Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed and extended ScheduledProject internals, various fixes #67

Merged
merged 8 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sampo"
version = "0.1.1.231"
version = "0.1.1.249"
description = "Open-source framework for adaptive manufacturing processes scheduling"
authors = ["iAirLab <[email protected]>"]
license = "BSD-3-Clause"
Expand Down
57 changes: 30 additions & 27 deletions sampo/pipeline/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@
import pandas as pd


def contractors_can_perform_work_graph(contractors: list[Contractor], wg: WorkGraph) -> bool:
    """
    Check that every node of the work graph can be performed by at least one contractor.

    A contractor can perform a node when, for every worker requirement of the
    node's work unit, the contractor offers at least ``min_count`` workers of
    the required kind.

    :param contractors: contractors whose worker pools are checked
    :param wg: the work graph to validate
    :return: True if each node has at least one capable contractor, False otherwise
    """
    for node in wg.nodes:
        reqs = node.work_unit.worker_reqs
        # short-circuit: at least one contractor must satisfy every requirement
        # of this node, otherwise the whole graph is infeasible
        if not any(
            all(contractor.workers[req.kind].count >= req.min_count for req in reqs)
            for contractor in contractors
        ):
            return False
    return True


class DefaultInputPipeline(InputPipeline):
"""
Default pipeline, that help to use the framework
Expand Down Expand Up @@ -64,8 +87,6 @@ def contractors(self, contractors: list[Contractor] | pd.DataFrame | str) -> 'In
:param contractors: the contractors list for scheduling task
:return: the pipeline object
"""
if not DefaultInputPipeline._check_is_contractors_can_perform_work_graph(contractors, self._wg):
raise NoSufficientContractorError('Contractors are not able to perform the graph of works')
self._contractors = contractors
return self

Expand Down Expand Up @@ -152,6 +173,10 @@ def schedule(self, scheduler: Scheduler) -> 'SchedulePipeline':
work_resource_estimator=self._work_estimator,
unique_work_names_mapper=self._name_mapper
)

if not contractors_can_perform_work_graph(self._contractors, self._wg):
raise NoSufficientContractorError('Contractors are not able to perform the graph of works')

if isinstance(scheduler, GenericScheduler):
# if scheduler is generic, it supports injecting local optimizations
# cache upper-layer self to another variable to get it from inner class
Expand Down Expand Up @@ -205,7 +230,7 @@ def prioritization(wg: WorkGraph, work_estimator: WorkTimeEstimator):
schedule = schedule2

case _:
wg = graph_restructuring(self._wg, self._lag_optimize)
wg = graph_restructuring(self._wg, self._lag_optimize.value)
schedule, _, _, node_order = scheduler.schedule_with_cache(wg, self._contractors,
self._landscape_config,
self._spec,
Expand All @@ -214,29 +239,6 @@ def prioritization(wg: WorkGraph, work_estimator: WorkTimeEstimator):

return DefaultSchedulePipeline(self, wg, schedule)

@staticmethod
def _check_is_contractors_can_perform_work_graph(contractors: list[Contractor], wg: WorkGraph) -> bool:
    """
    Check that every node of the work graph has at least one contractor able to
    satisfy all of the node's worker requirements.

    :param contractors: contractors whose worker pools are checked
    :param wg: the work graph to validate
    :return: True if each node can be performed by some contractor, False otherwise
    """
    is_at_least_one_contractor_can_perform = True
    num_contractors_can_perform_node = 0

    for node in wg.nodes:
        reqs = node.work_unit.worker_reqs
        for contractor in contractors:
            offers = contractor.workers
            for req in reqs:
                # contractor must offer at least `min_count` workers of this kind
                if req.min_count > offers[req.kind].count:
                    is_at_least_one_contractor_can_perform = False
                    break
            if is_at_least_one_contractor_can_perform:
                # this contractor satisfies every requirement of the node
                num_contractors_can_perform_node += 1
                break
            # reset the flag before trying the next contractor
            is_at_least_one_contractor_can_perform = True
        if num_contractors_can_perform_node == 0:
            # no contractor can perform this node -> the graph is infeasible
            return False
        # reset the per-node counter before the next node
        num_contractors_can_perform_node = 0

    return True


# noinspection PyProtectedMember
class DefaultSchedulePipeline(SchedulePipeline):
Expand All @@ -249,6 +251,7 @@ def __init__(self, s_input: DefaultInputPipeline, wg: WorkGraph, schedule: Sched
self._scheduled_works = {wg[swork.id]:
swork for swork in schedule.to_schedule_work_dict.values()}
self._local_optimize_stack = ApplyQueue()
self._start_date = None

def optimize_local(self, optimizer: ScheduleLocalOptimizer, area: range) -> 'SchedulePipeline':
self._local_optimize_stack.add(optimizer.optimize,
Expand All @@ -262,4 +265,4 @@ def optimize_local(self, optimizer: ScheduleLocalOptimizer, area: range) -> 'Sch
def finish(self) -> ScheduledProject:
processed_sworks = self._local_optimize_stack.apply(self._scheduled_works)
schedule = Schedule.from_scheduled_works(processed_sworks.values(), self._wg)
return ScheduledProject(self._wg, self._input._contractors, schedule)
return ScheduledProject(self._input._wg, self._wg, self._input._contractors, schedule)
6 changes: 5 additions & 1 deletion sampo/scheduler/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,14 @@ def build_scheduler(self,
finish_time += start_time

if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project`
start_time = max(start_time, timeline.zone_timeline.finish_statuses())
finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses()
start_time = max(start_time, finish_time)

# apply work to scheduling
timeline.schedule(node, node2swork, best_worker_team, contractor, work_spec,
start_time, work_spec.assigned_time, assigned_parent_time, work_estimator)

if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project`
node2swork[node].zones_pre = finalizing_zones

return node2swork.values(), assigned_parent_time, timeline
6 changes: 5 additions & 1 deletion sampo/scheduler/genetic/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ def work_scheduled(args) -> bool:
st = assigned_parent_time # this work should always have st = 0, so we just re-assign it

if idx == len(works_order) - 1: # we are scheduling the work `end of the project`
st = max(st, timeline.zone_timeline.finish_statuses())
finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses()
st = max(start_time, finish_time)

# finish using time spec
timeline.schedule(node, node2swork, worker_team, contractor, work_spec,
st, exec_time, assigned_parent_time, work_estimator)

if idx == len(works_order) - 1: # we are scheduling the work `end of the project`
node2swork[node].zones_pre = finalizing_zones

work_timeline.update_timeline(st, exec_time, None)
return True
return False
Expand Down
15 changes: 12 additions & 3 deletions sampo/scheduler/timeline/zone_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ def update_timeline(self, index: int, zones: list[Zone], start_time: Time, exec_
end_time=start_time))
return sworks

def append_statuses(self, zones: list[Zone]) -> Time:
def append_statuses(self, zones: list[Zone]) -> tuple[Time, list[ZoneTransition]]:
global_finish_time = Time(0)
sworks = []

for zone in zones:
state = self._timeline[zone.name]
Expand All @@ -286,9 +287,17 @@ def append_statuses(self, zones: list[Zone]) -> Time:
state.add(ScheduleEvent(Time.inf().value, EventType.START, latest_time, None, zone.status))
state.add(ScheduleEvent(Time.inf().value, EventType.END, finish_time, None, zone.status))

if latest_status != zone.status and zone.status != 0:
# if we need to change status, record it
sworks.append(ZoneTransition(name=zone.name,
from_status=latest_status,
to_status=zone.status,
start_time=latest_time - change_cost,
end_time=latest_time))

global_finish_time = max(global_finish_time, finish_time)

return global_finish_time
return global_finish_time, sworks

def finish_statuses(self) -> Time:
def finish_statuses(self) -> tuple[Time, list[ZoneTransition]]:
    """
    Close out all configured zones with their required end statuses.

    Delegates to `append_statuses`, building one `Zone` per entry of
    `self._config.end_statuses`.

    :return: the latest finish time across all end-status zones together with
        the zone transitions recorded while applying them
    """
    return self.append_statuses([Zone(*v) for v in self._config.end_statuses])
20 changes: 18 additions & 2 deletions sampo/schemas/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@


class ScheduledProject(AutoJSONSerializable['ScheduledProject']):
def __init__(self, wg: WorkGraph, contractors: list[Contractor], schedule: Schedule):
self.schedule = schedule

ignored_fields = ['raw_schedule', 'raw_wg']

def __init__(self, wg: WorkGraph, raw_wg: WorkGraph, contractors: list[Contractor], schedule: Schedule):
"""
Contains schedule and all information about its creation
:param wg: the original work graph
:param raw_wg: restructured work graph, which was given directly to scheduler to produce this schedule
:param contractors: list of contractors
:param schedule: the raw schedule received directly from scheduler
"""
# the final variant of schedule, without any technical issues
self.schedule = schedule.unite_stages()
# the raw schedule, with inseparables
self.raw_schedule = schedule
# the original work graph
self.wg = wg
# internally processed work graph, with inseparables
self.raw_wg = raw_wg
self.contractors = contractors

@custom_serializer('contractors')
Expand Down
35 changes: 21 additions & 14 deletions sampo/schemas/schedule.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
from datetime import datetime
from functools import partial, lru_cache
from typing import Iterable, Union
Expand All @@ -10,7 +11,7 @@
from sampo.schemas.time import Time
from sampo.schemas.works import WorkUnit
from sampo.utilities.datetime_util import add_time_delta
from sampo.utilities.schedule import fix_split_tasks
from sampo.utilities.schedule import fix_split_tasks, offset_schedule

ResourceSchedule = dict[str, list[tuple[Time, Time]]]
ScheduleWorkDict = dict[str, ScheduledWork]
Expand Down Expand Up @@ -106,22 +107,28 @@ def merged_stages_datetime_df(self, offset: Union[datetime, str]) -> DataFrame:
:param offset: Start of schedule, to add as an offset.
:return: Shifted schedule DataFrame with merged tasks.
"""
result = fix_split_tasks(self.offset_schedule(offset))
result = fix_split_tasks(offset_schedule(self._schedule, offset))
return result

def offset_schedule(self, offset: Union[datetime, str]) -> DataFrame:
def unite_stages(self) -> 'Schedule':
"""
Returns full schedule object with `start` and `finish` columns pushed by date in `offset` argument.
:param offset: Start of schedule, to add as an offset.
:return: Shifted schedule DataFrame.
"""
r = self._schedule.loc[:, :]
r['start_offset'] = r['start'].apply(partial(add_time_delta, offset))
r['finish_offset'] = r['finish'].apply(partial(add_time_delta, offset))
r = r.rename({'start': 'start_', 'finish': 'finish_',
'start_offset': 'start', 'finish_offset': 'finish'}, axis=1) \
.drop(['start_', 'finish_'], axis=1)
return r
Merge stages and reconstruct the `Schedule`
:return: `Schedule` with inseparable chains united
"""
merged_df = fix_split_tasks(self._schedule)

def f(row):
swork: ScheduledWork = deepcopy(row[self._scheduled_work_column])
row[self._scheduled_work_column] = swork
swork.name = row['task_name_mapped']
swork.display_name = row['task_name']
swork.volume = float(row['volume'])
swork.start_end_time = Time(int(row['start'])), Time(int(row['finish']))
return row

merged_df = merged_df.apply(f, axis=1)

return Schedule.from_scheduled_works(works=merged_df[self._scheduled_work_column])

@staticmethod
def from_scheduled_works(works: Iterable[ScheduledWork],
Expand Down
29 changes: 27 additions & 2 deletions sampo/utilities/schedule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
from datetime import datetime
from functools import partial

import numpy as np
import pandas as pd
from pandas import DataFrame

from sampo.structurator import STAGE_SEP
from sampo.utilities.datetime_util import add_time_delta


def offset_schedule(schedule: DataFrame, offset: datetime | str) -> DataFrame:
    """
    Returns full schedule object with `start` and `finish` columns pushed by date in `offset` argument.

    :param schedule: the schedule itself
    :param offset: Start of schedule, to add as an offset.
    :return: Shifted schedule DataFrame; the input frame is left untouched.
    """
    # `.copy()` guarantees an independent frame: `.loc[:, :]` may return a view,
    # in which case the column assignments below would mutate the caller's data
    # (and trigger SettingWithCopyWarning).
    shifted = schedule.copy()
    shifted['start_offset'] = shifted['start'].apply(partial(add_time_delta, offset))
    shifted['finish_offset'] = shifted['finish'].apply(partial(add_time_delta, offset))
    # swap the shifted columns in under the original `start`/`finish` names
    shifted = shifted.rename({'start': 'start_', 'finish': 'finish_',
                              'start_offset': 'start', 'finish_offset': 'finish'}, axis=1) \
        .drop(['start_', 'finish_'], axis=1)
    return shifted


def fix_split_tasks(baps_schedule_df: pd.DataFrame) -> pd.DataFrame:
Expand Down Expand Up @@ -43,7 +65,7 @@ def merge_split_stages(task_df: pd.DataFrame) -> pd.Series:
df = task_df.copy()

df = df.iloc[-1:].reset_index(drop=True)
for column in ['task_id', 'task_name']:
for column in ['task_id', 'task_name', 'task_name_mapped']:
df.loc[0, column] = df.loc[0, column].split(STAGE_SEP)[0] # fix task id and name

# sum up volumes through all stages
Expand All @@ -53,7 +75,10 @@ def merge_split_stages(task_df: pd.DataFrame) -> pd.Series:
# fix task's start time and duration
df.loc[0, 'start'] = task_df.loc[0, 'start']
df.loc[0, 'finish'] = task_df.loc[len(task_df) - 1, 'finish']
df.loc[0, 'duration'] = (df.loc[0, 'finish'] - df.loc[0, 'start']).days + 1
if isinstance(df.loc[0, 'start'], np.int64) or isinstance(df.loc[0, 'start'], np.int32):
df.loc[0, 'duration'] = df.loc[0, 'finish'] - df.loc[0, 'start'] + 1
else:
df.loc[0, 'duration'] = (df.loc[0, 'finish'] - df.loc[0, 'start']).days + 1
else:
df = task_df.copy()

Expand Down
2 changes: 1 addition & 1 deletion sampo/utilities/visualization/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ def get_zone_usage_info(swork) -> str:
title_text='Date')

fig.update_layout(autosize=True, font_size=12)
fig.update_layout(height=1000)
# fig.update_layout(height=1000)

return visualize(fig, mode=visualization, file_name=fig_file_name)
4 changes: 3 additions & 1 deletion tests/pipeline/basic_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ def test_plain_scheduling_with_no_sufficient_number_of_contractors(setup_wg, set
try:
SchedulingPipeline.create() \
.wg(setup_wg) \
.contractors(setup_empty_contractors)
.contractors(setup_empty_contractors) \
.schedule(HEFTScheduler()) \
.finish()
except NoSufficientContractorError:
thrown = True

Expand Down