Skip to main content

Python

Installation

The name of the required library is in the description of each service.

All available libraries can be viewed on PyPI.

Installing the required library:

pip install vrt_lss_universal

Update multiple libraries at once:

pip install --upgrade vrt_lss_universal vrt_lss_routing vrt_lss_account

Installing the required version of the library:

pip install vrt_lss_universal==6.10.2076

It is important to check the version of the service being accessed (it is specified in the documentation) and to install a compatible version of the library.

Example

An example of using the universal library, in which:

  • a planning task is created
  • the task is validated
  • asynchronous scheduling is started and the result is downloaded
  • the result is converted to xlsx
import datetime
from time import sleep
from typing import List

from vrt_lss_universal import (
Configuration,
ApiClient,
SystemApi,
PlanTask,
Location,
Order,
Performer,
Transport,
Geopoint,
Demand,
PossibleEvent,
TimeWindow,
PerformerShift,
TransportShift,
PlanApi,
CalculationStatus,
Cargo,
Capacity,
Box,
TransportType,
ConvertApi,
DemandType,
UniversalData,
OrderCompatibilities )

# Connection settings. "BEARER_TOKEN" is presumably a placeholder to be
# replaced with a real access token before running the example.
TOKEN = "BEARER_TOKEN"
HOST = "https://api.edge.veeroute.tech"

# A single API client, built from the settings above and shared below.
configuration = Configuration(host=HOST, access_token=TOKEN)
client = ApiClient(configuration=configuration)

# One API object per group of calls used in this example; all of them
# are constructed from the same client.
convert_api = ConvertApi(client)
plan_api = PlanApi(client)
system_api = SystemApi(client)


class PlanTaskGenerator:
    """Build a small demo ``PlanTask``.

    The task contains two locations, two orders (one work order and one
    pickup/drop order with a single cargo), one performer and one transport.
    Every time window used anywhere in the task is the same ``TimeWindow``
    instance created in ``__init__``.
    """

    def __init__(self):
        # The shared window runs from one day before to one day after "now";
        # ``astimezone()`` makes both bounds timezone-aware (local zone).
        one_day = datetime.timedelta(days=1)
        window_start = datetime.datetime.now().astimezone() - one_day
        window_end = datetime.datetime.now().astimezone() + one_day
        self.time_window = TimeWindow(var_from=window_start, to=window_end)

    @staticmethod
    def _uniform_capacity(value: float) -> Capacity:
        """Return a ``Capacity`` with every dimension set to ``value``."""
        return Capacity(
            mass=value,
            volume=value,
            capacity_a=value,
            capacity_b=value,
            capacity_c=value,
        )

    def __get_locations(self) -> List[Location]:
        """Return the two demo locations; they differ only in key and longitude."""
        points = (
            ("location_1", 59.464332, 31.991713),
            ("location_2", 59.464332, 31.991714),
        )
        return [
            Location(
                key=location_key,
                geopoint=Geopoint(latitude=latitude, longitude=longitude),
                arrival_duration="PT10S",
                departure_duration="PT20S",
                work_windows=[self.time_window],
            )
            for location_key, latitude, longitude in points
        ]

    def __get_orders(self) -> List[Order]:
        """Return the two demo orders: a work order and a pickup/drop order."""
        window = self.time_window

        # --- order_1: a single WORK demand with one possible event ---
        work_event = PossibleEvent(
            key="event_1",
            location_key="location_1",
            duration="PT10M",
            reward=1000.1,
            hard_time_window=window,
        )
        work_demand = Demand(
            key="demand_1",
            demand_type=DemandType.WORK,
            precedence_in_trip=0,
            precedence_in_order=0,
            possible_events=[work_event],
        )
        # NOTE(review): "asda" looks like a placeholder value, and the demo
        # performer is created with compatibilities=None - confirm intended.
        order_work = Order(
            key="order_1",
            demands=[work_demand],
            compatibilities=OrderCompatibilities(
                performer_restrictions=["asda"],
            ),
        )

        # --- order_2: one cargo with a PICKUP demand and a DROP demand ---
        cargo = Cargo(key="cargo_1", capacity=self._uniform_capacity(1.0))
        # NOTE(review): pickup and drop both reference "location_1" -
        # confirm this is intended for the example.
        cargo_demand_specs = (
            ("demand_2_pickup", DemandType.PICKUP, "possible_event_2_pickup"),
            ("demand_2_drop", DemandType.DROP, "possible_event_2_drop"),
        )
        cargo_demands = [
            Demand(
                key=demand_key,
                demand_type=demand_type,
                target_cargos=["cargo_1"],
                possible_events=[
                    PossibleEvent(
                        key=event_key,
                        location_key="location_1",
                        duration="PT25S",
                        reward=1000.1,
                        hard_time_window=window,
                        soft_time_window=window,
                    )
                ],
            )
            for demand_key, demand_type, event_key in cargo_demand_specs
        ]
        order_pickup_drop = Order(
            key="order_2",
            cargos=[cargo],
            demands=cargo_demands,
        )

        return [order_work, order_pickup_drop]

    def __get_performers(self) -> List[Performer]:
        """Return the single demo performer, whose one shift starts at ``location_2``."""
        window = self.time_window
        shift = PerformerShift(
            key="perf_shift_1",
            availability_time=window,
            working_time=window,
            start_location_key="location_2",
            finish_location_key=None,
            max_locations=None,
            max_stops=None,
            tariff=None,
            work_and_rest_rules=None,
        )
        performer = Performer(
            key="perf_1",
            shifts=[shift],
            own_transport_type=TransportType.CAR,
            compatibilities=None,
            limits=None,
        )
        return [performer]

    def __get_transports(self) -> List[Transport]:
        """Return the single demo transport with one shift and one box."""
        window = self.time_window
        shift = TransportShift(
            key="tr_shift_1",
            availability_time=window,
            working_time=window,
            start_location_key=None,
            finish_location_key=None,
            tariff=None,
        )
        box = Box(
            key="box_1",
            capacity=self._uniform_capacity(2.0),
            compatibilities=None,
            limits=None,
        )
        transport = Transport(
            key="transport_1",
            shifts=[shift],
            transport_type=TransportType.TRUCK_1500,
            boxes=[box],
            compatibilities=None,
            limits=None,
        )
        return [transport]

    def get_plan_task(self) -> PlanTask:
        """Assemble and return the complete demo ``PlanTask``."""
        return PlanTask(
            locations=self.__get_locations(),
            orders=self.__get_orders(),
            performers=self.__get_performers(),
            transports=self.__get_transports(),
        )


def is_calculation_complete(_plan_id: str) -> bool:
    """Tell whether the calculation identified by ``_plan_id`` has finished.

    Reads the calculation state through the module-level ``plan_api`` and
    returns True only when its status is ``FINISHED_IN_TIME`` or
    ``FINISHED_OUT_OF_TIME``; any other status yields False.
    """
    finished_statuses = (
        CalculationStatus.FINISHED_IN_TIME,
        CalculationStatus.FINISHED_OUT_OF_TIME,
    )
    state = plan_api.read_plan_calculation_state(_plan_id)
    # NOTE(review): if the service can report other terminal statuses
    # (e.g. a failure or cancellation), a caller polling on this function
    # would never see True for them - confirm against the API reference.
    return state.info.status in finished_statuses


if __name__ == "__main__":
    # 1. Query the service check and version endpoints and print both.
    check_response = system_api.check()
    version = system_api.version()
    print(check_response, version)

    # 2. Build the demo task and print its serialized form.
    task = PlanTaskGenerator().get_plan_task()
    serialized_task = client.sanitize_for_serialization(task)
    print(serialized_task)

    # 3. Validate the task; the result is only printed, not inspected.
    print("--> start validate plan_task")
    v_result = plan_api.run_plan_validation(task)
    print(f"--> finish validate plan_task with validation result: {v_result}")

    # 4. Start the asynchronous calculation and remember its id.
    print("--> start planning...")
    started_calculation = plan_api.run_plan_calculation_async(task)
    plan_id = started_calculation.id
    print(f"-> plan_id: {plan_id}")

    # 5. Poll every 5 seconds until the calculation reports a finished status.
    #    There is no timeout: the loop runs for as long as the check is False.
    while True:
        if is_calculation_complete(plan_id):
            break
        sleep(5)

    # 6. Fetch the result and print its total statistics.
    plan_result = plan_api.read_plan_result(plan_id)
    print("--> finish planning")
    print(f"--> total statistics: {plan_result.statistics.total_statistics}")

    # 7. Convert the result to xlsx and save the returned bytes to disk.
    print("--> start convert plan_result to xlsx")
    universal_data = UniversalData(**plan_result.dict())
    xlsx_bytes = convert_api.run_convert_json_to_xlsx(universal_data)

    with open("plan_result.xlsx", "wb") as xlsx_file:
        xlsx_file.write(xlsx_bytes)
    print("--> finish convert plan_result to xlsx")