跳到主要内容

Python

Установка

Название библиотеки совпадает с названием сервиса.

Все доступные библиотеки можно найти на PYPI.

Установка необходимой библиотеки:

pip install vrt_lss_universal

Обновление сразу нескольких библиотек:

pip install --upgrade vrt_lss_universal vrt_lss_routing vrt_lss_account

Установка библиотеки определенной версии

pip install vrt_lss_universal==6.10.2076

Важно проверять версию сервиса, к которому осуществляется обращение, и устанавливать библиотеку совместимой версии.

Пример

Пример использования библиотеки universal, в котором:

  • создается задача на планирование
  • проводится валидация задачи
  • запускается асинхронное планирование и скачивается результат
  • полученный результат конвертируется в xlsx
import datetime
from time import sleep
from typing import List

from vrt_lss_universal import (
Configuration,
ApiClient,
SystemApi,
PlanTask,
Location,
Order,
Performer,
Transport,
Geopoint,
Demand,
PossibleEvent,
TimeWindow,
PerformerShift,
TransportShift,
PlanApi,
CalculationStatus,
Cargo,
Capacity,
Box,
TransportType,
ConvertApi,
DemandType,
UniversalData,
OrderCompatibilities )

# settings
HOST = "https://api.edge.veeroute.tech"
TOKEN = "BEARER_TOKEN"

# create client
configuration = Configuration(access_token=TOKEN, host=HOST)
client = ApiClient(configuration=configuration)

# create api instances
system_api = SystemApi(client)
plan_api = PlanApi(client)
convert_api = ConvertApi(client)


class PlanTaskGenerator:
def __init__(self):
self.time_window = TimeWindow(
var_from=datetime.datetime.now().astimezone() - datetime.timedelta(days=1),
to=datetime.datetime.now().astimezone() + datetime.timedelta(days=1),
)

def __get_locations(self) -> List[Location]:
return [
Location(
key="location_1",
geopoint=Geopoint(
latitude=59.464332,
longitude=31.991713,
),
arrival_duration="PT10S",
departure_duration="PT20S",
work_windows=[self.time_window],
),
Location(
key="location_2",
geopoint=Geopoint(
latitude=59.464332,
longitude=31.991714,
),
arrival_duration="PT10S",
departure_duration="PT20S",
work_windows=[self.time_window],
)
]

def __get_orders(self) -> List[Order]:
order_work = Order(
key="order_1",
demands=[
Demand(
key="demand_1",
demand_type=DemandType.WORK,
precedence_in_trip=0,
precedence_in_order=0,
possible_events=[
PossibleEvent(
key="event_1",
location_key="location_1",
duration="PT10M",
reward=1000.1,
hard_time_window=self.time_window,
)
],
)
],
compatibilities=OrderCompatibilities(
performer_restrictions=["asda"],
),
)
order_pickup_drop = Order(
key="order_2",
cargos=[
Cargo(
key="cargo_1",
capacity=Capacity(
mass=1.0,
volume=1.0,
capacity_a=1.0,
capacity_b=1.0,
capacity_c=1.0,
),
)
],
demands=[
Demand(
key="demand_2_pickup",
demand_type=DemandType.PICKUP,
target_cargos=["cargo_1"],
possible_events=[
PossibleEvent(
key="possible_event_2_pickup",
location_key="location_1",
duration="PT25S",
reward=1000.1,
hard_time_window=self.time_window,
soft_time_window=self.time_window,
)
],
),
Demand(
key="demand_2_drop",
demand_type=DemandType.DROP,
target_cargos=["cargo_1"],
possible_events=[
PossibleEvent(
key="possible_event_2_drop",
location_key="location_1",
duration="PT25S",
reward=1000.1,
hard_time_window=self.time_window,
soft_time_window=self.time_window,
)
],
),
],
)
return [
order_work,
order_pickup_drop,
]

def __get_performers(self) -> List[Performer]:
return [
Performer(
key="perf_1",
shifts=[
PerformerShift(
key="perf_shift_1",
availability_time=self.time_window,
working_time=self.time_window,
start_location_key="location_2",
finish_location_key=None,
max_locations=None,
max_stops=None,
tariff=None,
work_and_rest_rules=None,
)
],
own_transport_type=TransportType.CAR,
compatibilities=None,
limits=None,
)
]

def __get_transports(self) -> List[Transport]:
return [
Transport(
key="transport_1",
shifts=[
TransportShift(
key="tr_shift_1",
availability_time=self.time_window,
working_time=self.time_window,
start_location_key=None,
finish_location_key=None,
tariff=None,
)
],
transport_type=TransportType.TRUCK_1500,
boxes=[
Box(
key="box_1",
capacity=Capacity(
mass=2.0,
volume=2.0,
capacity_a=2.0,
capacity_b=2.0,
capacity_c=2.0,
),
compatibilities=None,
limits=None,
)
],
compatibilities=None,
limits=None,
)
]

def get_plan_task(self) -> PlanTask:
locations = self.__get_locations()
orders = self.__get_orders()
performers = self.__get_performers()
transports = self.__get_transports()

return PlanTask(
locations=locations,
orders=orders,
transports=transports,
performers=performers,
)


def is_calculation_complete(_plan_id: str) -> bool:
calculation_state = plan_api.read_plan_calculation_state(_plan_id)
_status = calculation_state.info.status
is_complete = (
_status == CalculationStatus.FINISHED_IN_TIME
or
_status == CalculationStatus.FINISHED_OUT_OF_TIME
)
return is_complete


if __name__ == "__main__":
check_response = system_api.check()
version = system_api.version()
print(check_response, version)

task = PlanTaskGenerator().get_plan_task()
k = client.sanitize_for_serialization(task)
print(k)

print("--> start validate plan_task")
v_result = plan_api.run_plan_validation(task)
print(f"--> finish validate plan_task with validation result: {v_result}")

print("--> start planning...")
plan_result_async = plan_api.run_plan_calculation_async(task)
plan_id = plan_result_async.id
print(f"-> plan_id: {plan_id}")

while not is_calculation_complete(plan_id):
sleep(5)

plan_result = plan_api.read_plan_result(plan_id)
print("--> finish planning")
print(f"--> total statistics: {plan_result.statistics.total_statistics}")

print("--> start convert plan_result to xlsx")
converted_plan_result = convert_api.run_convert_json_to_xlsx(UniversalData(**plan_result.dict()))

with open("plan_result.xlsx", "wb") as data:
data.write(converted_plan_result)
print("--> finish convert plan_result to xlsx")