Python
Installation
The name of the required library is in the description of each service.
All available libraries can be found on PyPI.
Installing the required library:
pip install vrt_lss_universal
Update multiple libraries at once:
pip install --upgrade vrt_lss_universal vrt_lss_routing vrt_lss_account
Installing a specific version of the library:
pip install vrt_lss_universal==6.10.2076
Important: check the version of the service you are accessing (it is specified in the documentation) and install a compatible version of the library.
Example
An example of using the universal library, in which:
- a planning task is created
- the task is validated
- asynchronous planning is started and the result is downloaded
- the result is converted to xlsx
import datetime
from time import sleep
from typing import List
from vrt_lss_universal import (
Configuration,
ApiClient,
SystemApi,
PlanTask,
Location,
Order,
Performer,
Transport,
Geopoint,
Demand,
PossibleEvent,
TimeWindow,
PerformerShift,
TransportShift,
PlanApi,
CalculationStatus,
Cargo,
Capacity,
Box,
TransportType,
ConvertApi,
DemandType,
UniversalData,
OrderCompatibilities )
# Connection settings: HOST is passed to Configuration as the API host and
# TOKEN as the access token.
# NOTE(review): "BEARER_TOKEN" looks like a placeholder for a real token — confirm.
HOST = "https://api.edge.veeroute.tech"
TOKEN = "BEARER_TOKEN"
# Create one API client from the configuration; it is shared by every API
# wrapper created below.
configuration = Configuration(access_token=TOKEN, host=HOST)
client = ApiClient(configuration=configuration)
# Create the API instances used by the example:
#   system_api  - check() and version() calls
#   plan_api    - plan validation, asynchronous calculation, state and result
#   convert_api - conversion of the result to xlsx
system_api = SystemApi(client)
plan_api = PlanApi(client)
convert_api = ConvertApi(client)
class PlanTaskGenerator:
    """Assemble a small demonstration ``PlanTask``.

    The task consists of two locations, two orders (one work order and one
    pickup/drop order), one performer and one transport. Every time window
    used in the task is the single window created in ``__init__``.
    """

    def __init__(self):
        """Create the time window shared by all entities of the task.

        The window runs from one day before to one day after the current
        moment; both bounds are built from ``datetime.now().astimezone()``.
        """
        one_day = datetime.timedelta(days=1)
        window_start = datetime.datetime.now().astimezone() - one_day
        window_end = datetime.datetime.now().astimezone() + one_day
        self.time_window = TimeWindow(var_from=window_start, to=window_end)

    @staticmethod
    def _uniform_capacity(value: float) -> Capacity:
        """Return a ``Capacity`` whose five dimensions all equal ``value``."""
        return Capacity(
            mass=value,
            volume=value,
            capacity_a=value,
            capacity_b=value,
            capacity_c=value,
        )

    def __get_locations(self) -> List[Location]:
        """Return the two locations of the task.

        Both share the same latitude, durations and work window; they differ
        only in key and longitude.
        """
        points = (
            ("location_1", 59.464332, 31.991713),
            ("location_2", 59.464332, 31.991714),
        )
        return [
            Location(
                key=location_key,
                geopoint=Geopoint(latitude=lat, longitude=lon),
                arrival_duration="PT10S",
                departure_duration="PT20S",
                work_windows=[self.time_window],
            )
            for location_key, lat, lon in points
        ]

    def __get_orders(self) -> List[Order]:
        """Return the two orders of the task.

        ``order_1`` carries a single WORK demand; ``order_2`` carries one
        cargo with a PICKUP and a DROP demand targeting it.
        """
        window = self.time_window

        # --- order_1: a single work demand -------------------------------
        work_event = PossibleEvent(
            key="event_1",
            location_key="location_1",
            duration="PT10M",
            reward=1000.1,
            hard_time_window=window,
        )
        work_demand = Demand(
            key="demand_1",
            demand_type=DemandType.WORK,
            precedence_in_trip=0,
            precedence_in_order=0,
            possible_events=[work_event],
        )
        work_order = Order(
            key="order_1",
            demands=[work_demand],
            compatibilities=OrderCompatibilities(
                performer_restrictions=["asda"],
            ),
        )

        # --- order_2: pickup and drop of one cargo -----------------------
        cargo = Cargo(key="cargo_1", capacity=self._uniform_capacity(1.0))
        cargo_demand_specs = (
            ("demand_2_pickup", "possible_event_2_pickup", DemandType.PICKUP),
            ("demand_2_drop", "possible_event_2_drop", DemandType.DROP),
        )
        cargo_demands = [
            Demand(
                key=demand_key,
                demand_type=demand_type,
                target_cargos=["cargo_1"],
                possible_events=[
                    PossibleEvent(
                        key=event_key,
                        location_key="location_1",
                        duration="PT25S",
                        reward=1000.1,
                        hard_time_window=window,
                        soft_time_window=window,
                    )
                ],
            )
            for demand_key, event_key, demand_type in cargo_demand_specs
        ]
        cargo_order = Order(
            key="order_2",
            cargos=[cargo],
            demands=cargo_demands,
        )

        return [work_order, cargo_order]

    def __get_performers(self) -> List[Performer]:
        """Return the single performer, whose one shift starts at ``location_2``."""
        shift = PerformerShift(
            key="perf_shift_1",
            availability_time=self.time_window,
            working_time=self.time_window,
            start_location_key="location_2",
            # The remaining optional fields are passed explicitly as None.
            finish_location_key=None,
            max_locations=None,
            max_stops=None,
            tariff=None,
            work_and_rest_rules=None,
        )
        performer = Performer(
            key="perf_1",
            shifts=[shift],
            own_transport_type=TransportType.CAR,
            compatibilities=None,
            limits=None,
        )
        return [performer]

    def __get_transports(self) -> List[Transport]:
        """Return the single transport with one shift and one box."""
        shift = TransportShift(
            key="tr_shift_1",
            availability_time=self.time_window,
            working_time=self.time_window,
            start_location_key=None,
            finish_location_key=None,
            tariff=None,
        )
        box = Box(
            key="box_1",
            capacity=self._uniform_capacity(2.0),
            compatibilities=None,
            limits=None,
        )
        transport = Transport(
            key="transport_1",
            shifts=[shift],
            transport_type=TransportType.TRUCK_1500,
            boxes=[box],
            compatibilities=None,
            limits=None,
        )
        return [transport]

    def get_plan_task(self) -> PlanTask:
        """Build and return the complete ``PlanTask``."""
        return PlanTask(
            locations=self.__get_locations(),
            orders=self.__get_orders(),
            performers=self.__get_performers(),
            transports=self.__get_transports(),
        )
def is_calculation_complete(_plan_id: str) -> bool:
    """Tell whether the calculation identified by ``_plan_id`` has finished.

    Reads the calculation state through ``plan_api`` and returns True only
    when its status is ``FINISHED_IN_TIME`` or ``FINISHED_OUT_OF_TIME``;
    any other status yields False.
    """
    current_status = plan_api.read_plan_calculation_state(_plan_id).info.status
    finished_statuses = (
        CalculationStatus.FINISHED_IN_TIME,
        CalculationStatus.FINISHED_OUT_OF_TIME,
    )
    return current_status in finished_statuses
if __name__ == "__main__":
    # Script entry point: check the service, build and validate a plan task,
    # run an asynchronous calculation, wait for it, then save the result
    # converted to xlsx.

    # Polling limits for the asynchronous calculation: one state request
    # every POLL_INTERVAL_SECONDS, at most MAX_POLL_ATTEMPTS requests
    # (720 * 5 s = about one hour).
    POLL_INTERVAL_SECONDS = 5
    MAX_POLL_ATTEMPTS = 720

    check_response = system_api.check()
    version = system_api.version()
    print(check_response, version)

    task = PlanTaskGenerator().get_plan_task()
    serialized_task = client.sanitize_for_serialization(task)
    print(serialized_task)

    print("--> start validate plan_task")
    v_result = plan_api.run_plan_validation(task)
    print(f"--> finish validate plan_task with validation result: {v_result}")

    print("--> start planning...")
    plan_result_async = plan_api.run_plan_calculation_async(task)
    plan_id = plan_result_async.id
    print(f"-> plan_id: {plan_id}")

    # is_calculation_complete() returns True only for the two FINISHED_*
    # statuses. An unbounded `while not ...` loop would therefore never end
    # if the calculation stopped in any other state (e.g. a failed or
    # cancelled calculation, if the service reports one), so the wait is
    # bounded and a timeout is reported explicitly.
    for _ in range(MAX_POLL_ATTEMPTS):
        if is_calculation_complete(plan_id):
            break
        sleep(POLL_INTERVAL_SECONDS)
    else:
        raise TimeoutError(
            f"calculation {plan_id} did not finish within "
            f"{MAX_POLL_ATTEMPTS * POLL_INTERVAL_SECONDS} seconds"
        )

    plan_result = plan_api.read_plan_result(plan_id)
    print("--> finish planning")
    print(f"--> total statistics: {plan_result.statistics.total_statistics}")

    print("--> start convert plan_result to xlsx")
    converted_plan_result = convert_api.run_convert_json_to_xlsx(UniversalData(**plan_result.dict()))
    # The converted result is written to the file in binary mode.
    with open("plan_result.xlsx", "wb") as data:
        data.write(converted_plan_result)
    print("--> finish convert plan_result to xlsx")