-
Notifications
You must be signed in to change notification settings - Fork 0
/
algorithm.py
256 lines (204 loc) · 6.62 KB
/
algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import copy
import random
from typing import List
from constants import Constants
from errors import BadArgument
from models.elevator import Elevator
from models.load import Load
from models.stats import GeneratedStats, SimulationStats
class ElevatorAlgorithm:
    """A global class that houses the elevators and loads of a simulation

    This is a base class: ``get_new_destination`` raises
    ``NotImplementedError`` here, and every ``pre_*`` / ``post_*`` / ``on_*``
    method is a default hook that subclasses may override.
    """

    # Name of the algorithm; read by ``stats``. It is only annotated here, so
    # it must be assigned elsewhere (e.g. by a subclass) before use.
    name: str

    def __init__(self, manager, floors=None, *, elevators=None, loads=None) -> None:
        """Creates the algorithm state

        manager:
            Object handed to every elevator created by ``create_elevator``
        floors: int[Optional]
            Value stored as the floor setting
            Default: Constants.DEFAULT_FLOORS
        elevators: List[Elevator][Optional]
            Initial elevators
        loads: List[Load][Optional]
            Initial loads
        """
        self.manager = manager
        self._floors = floors if floors is not None else Constants.DEFAULT_FLOORS

        # A falsy argument (None or an empty list) is replaced by a new list.
        self.elevators: List['Elevator'] = elevators or []
        self.loads: List['Load'] = loads or []

        # NOTE(review): presumably 15 loads of 60 each (add_passenger creates
        # loads with the value 60) -- confirm the unit against Load/Elevator.
        self.max_load = 15 * 60
        self.rnd = random.Random()
        self.active = False

        self.tick_count = 0
        self.wait_times = GeneratedStats()
        self.time_in_lift = GeneratedStats()
        self.occupancy = GeneratedStats()

    def copy(self):
        """Creates a copy of the algorithm

        The copy is an instance of the same class and shares ``manager``.
        Elevators, loads and the three statistics objects are duplicated
        through their own ``copy()`` methods, and the random generator is
        duplicated with ``copy.copy``.
        """
        ev_algo = self.__class__(
            self.manager,
            floors=self.floors,
            elevators=[elevator.copy() for elevator in self.elevators],
            loads=[load.copy() for load in self.loads],
        )
        ev_algo.max_load = self.max_load
        ev_algo.rnd = copy.copy(self.rnd)
        ev_algo.tick_count = self.tick_count
        ev_algo.wait_times = self.wait_times.copy()
        ev_algo.time_in_lift = self.time_in_lift.copy()
        ev_algo.occupancy = self.occupancy.copy()
        ev_algo.active = self.active
        return ev_algo

    @property
    def floors(self):
        """The current floor setting"""
        return self._floors

    @floors.setter
    def floors(self, value):
        """Stores the new floor setting, then calls ``on_floors_changed``"""
        self._floors = value
        self.on_floors_changed()

    @property
    def stats(self):
        """A SimulationStats built from the tick count, name and statistics"""
        return SimulationStats(
            ticks=self.tick_count,
            algorithm_name=self.name,
            wait_time=self.wait_times,
            time_in_lift=self.time_in_lift,
            occupancy=self.occupancy,
        )

    @property
    def pending_loads(self) -> List['Load']:
        """Returns the loads whose ``elevator`` attribute is None"""
        return [load for load in self.loads if load.elevator is None]

    @property
    def simulation_running(self) -> bool:
        """Returns True if there are loads in the system"""
        return bool(self.loads)

    def get_new_destination(self, elevator):
        """Gets a new destination for an elevator

        Must be implemented in a subclass; this base version always raises
        NotImplementedError.

        elevator: Elevator
            The elevator to get a new destination for
        """
        raise NotImplementedError('get_new_destination must be implemented in a subclass')

    def pre_load_check(self, load, elevator):
        """Checks if a load is allowed to enter the elevator

        The default implementation always returns True.

        load: Load
            The load to check
        elevator: Elevator
            The elevator to check
        """
        return True

    def pre_unload_check(self, load, elevator):
        """Checks if a load is allowed to leave the elevator

        The default implementation always returns True.

        load: Load
            The load to check
        elevator: Elevator
            The elevator to check
        """
        return True

    # region Event Handlers
    def pre_loop(self):
        """Runs at the start of every tick (first step of ``loop``)"""

    def post_loop(self):
        """Runs at the end of every tick (last step of ``loop``)"""

    def on_load_load(self, load, elevator):
        """Runs when a load is added to an elevator

        load: Load
            The load that was added to the elevator
        elevator: Elevator
            The elevator the load was added to
        """

    def on_load_unload(self, load, elevator):
        """Runs after a load is unloaded

        load: Load
            The load that was unloaded
        elevator: Elevator
            The elevator the load was unloaded from
        """

    def on_elevator_move(self, elevator):
        """Runs when an elevator moves

        elevator: Elevator
            The elevator that moved
        """

    def on_elevator_added(self, elevator):
        """Runs when an elevator is added (called by ``create_elevator``)

        elevator: Elevator
            The elevator that was added
        """

    def on_elevator_removed(self, elevator):
        """Runs when an elevator is removed (called by ``remove_elevator``)

        elevator: Elevator
            The elevator that was removed
        """

    def on_floors_changed(self):
        """Runs when the number of floors is changed via the ``floors`` setter"""

    def on_load_added(self, load):
        """Runs when a load is added (called by ``add_load``)

        load: Load
            The load that was added
        """

    def on_load_removed(self, load):
        """Runs when a load is removed (called by ``remove_load``)

        load: Load
            The load that was removed
        """

    def on_simulation_end(self):
        """Runs when the simulation ends"""
    # endregion

    def add_load(self, load):
        """Adds a load to the system, then calls ``on_load_added``

        load: Load
            The load to add
        """
        self.loads.append(load)
        self.on_load_added(load)

    def remove_load(self, load):
        """Removes a load from the system, then calls ``on_load_removed``

        ``list.remove`` raises ValueError if the load is not in the system.

        load: Load
            The load to remove
        """
        self.loads.remove(load)
        self.on_load_removed(load)

    def create_elevator(self, current_floor=1):
        """Creates a new elevator, registers it and returns it

        The new id is one more than the highest existing elevator id (1 when
        there are no elevators), so it stays unique even if the elevator list
        is not ordered by id.

        current_floor: int[Optional]
            The current floor of the elevator
            Default: 1
        """
        new_id = max((existing.id for existing in self.elevators), default=0) + 1
        elevator = Elevator(self.manager, new_id, current_floor)
        self.elevators.append(elevator)
        self.on_elevator_added(elevator)
        return elevator

    def remove_elevator(self, elevator_id):
        """Removes the first elevator whose id matches

        ``on_elevator_removed`` is called with the removed Elevator object,
        matching that hook's documented parameter.

        elevator_id: int
            The id of the elevator to remove

        Raises BadArgument if no elevator has that id.
        """
        for elevator in self.elevators:
            if elevator.id == elevator_id:
                # Safe to mutate while iterating: we return immediately after.
                self.elevators.remove(elevator)
                self.on_elevator_removed(elevator)
                return
        raise BadArgument(f'No elevator with id {elevator_id}')

    def add_passenger(self, initial, destination):
        """Adds a passenger

        Builds a Load stamped with the current tick and adds it to the system.

        initial: int
            The floor the passenger is on
        destination: int
            The floor the passenger wants to go to
        """
        # NOTE(review): 60 is presumably the weight of one passenger -- confirm
        # against the Load constructor.
        load = Load(initial, destination, 60)
        load.tick_created = self.tick_count
        self.add_load(load)

    def loop(self):
        """Runs a cycle of the elevator algorithm

        Order: ``pre_loop``, every elevator's ``loop()``, tick counter
        increment, ``post_loop``.
        """
        self.pre_loop()
        for elevator in self.elevators:
            elevator.loop()
        self.tick_count += 1
        self.post_loop()

    def __getstate__(self):
        """Returns the instance state for pickling, without ``manager``

        The instance itself is not modified; only the returned shallow copy of
        ``__dict__`` has the ``manager`` entry dropped.
        """
        state = self.__dict__.copy()
        state.pop('manager', None)
        return state