-
Notifications
You must be signed in to change notification settings - Fork 0
/
search.py
342 lines (245 loc) · 10.7 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
""" Contains all local search algorithms except parallel hillclimbing, as well as an abstract class for search algorithms. """
import numpy as np
from multiprocessing import Process, Manager
import time
from itertools import count, compress
import random
from math import exp
from searchutils import value_function, neighbors_func
class Abstract_Search():
    """
    Base class that all search algorithms in this module inherit from.

    It parses the warehouse and order files, keeps only the PSUs that carry at
    least one ordered item, draws a random start state and provides the shared
    helpers (value function, neighbourhood, termination test, solution report).
    """

    def __init__(self, warehouse, order, log_var=None, window=None):
        """
        Loads the problem and draws a random start state.

        :param warehouse: path of the warehouse file (item names on the first
                          line, one PSU per line from the third line on)
        :param order: path of the order file (whitespace separated item names)
        :param log_var: optional object with a ``set(value)`` method that the
                        searches use to publish progress; ``None`` means the
                        searches print instead
        :param window: optional object with an ``update()`` method that is
                       called after every published value
        """
        self.directories = [warehouse, order]
        self.items = self.get_items(warehouse)
        self.order = self.open_order(order, self.items)
        self.psus, self.psu_nrs = self.get_psus(warehouse, self.items, self.order)
        # init start_state: every relevant PSU is selected with probability
        # (number of ordered items / number of all items)
        ordered_share = np.count_nonzero(self.order) / len(self.items)
        self.start_state = np.random.choice([True, False], len(self.psus),
                                            p=[ordered_share, 1 - ordered_share])
        self.log_var = log_var
        self.window = window

    @staticmethod
    def _item_positions(items):
        """
        Maps every item name to its first position in ``items``.

        :param items: list of all items
        :return: dict name -> index (first occurrence wins, like ``list.index``)
        """
        positions = {}
        for position, name in enumerate(items):
            positions.setdefault(name, position)
        return positions

    def start(self):
        """
        Initializes the start states of a search
        :return: current-state, value, neighbors, value_neighbors
        """
        current = self.start_state
        value = self.value_function(current)
        neighbors = self.neighbors(current)
        value_neighbors = np.apply_along_axis(self.value_function, 1, neighbors)
        return current, value, neighbors, value_neighbors

    def get_items(self, path):
        """
        Retrieves a list of all items from the first line of the given file
        :param path: file path
        :return: list of items
        """
        with open(path) as f:
            first_line = f.read().split("\n")[0]
        # split() without argument ignores repeated and trailing whitespace,
        # so no empty pseudo-items end up in the list
        return first_line.split()

    def get_psus(self, path, items, order):
        """
        Retrieves a list of the PSUs that contain items of the current order
        A PSU is described by a binary array that contains the order items
        :param path: file path
        :param items: list of all items
        :param order: boolean array over ``items`` marking the ordered items
        :return: 2D array containing the psus (one column per ordered item) and
                 an array with the zero-based number of every kept PSU
        :raises ValueError: if a PSU line names an item that is not in ``items``
        """
        with open(path) as f:
            lines = f.read().split("\n")
        positions = self._item_positions(items)
        # collect the important psus for our search and remember their psu-nr
        psus = []
        psu_nrs = []
        order_index = np.nonzero(order)
        # PSU rows start on the third line; the second line is skipped
        for index, line in enumerate(lines[2:]):
            psu_temp = np.zeros(len(items), dtype=bool)
            # a blank (e.g. trailing) line yields no tokens and therefore an
            # empty PSU, which is simply not kept below
            for item in line.split():
                try:
                    psu_temp[positions[item]] = True
                except KeyError:
                    raise ValueError(
                        f"unknown item {item!r} in PSU line {index + 3} of {path}") from None
            if np.any(psu_temp[order_index]):
                psus.append(psu_temp[order_index])
                psu_nrs.append(index)
        psus = np.asarray(psus)
        psu_nrs = np.asarray(psu_nrs)
        return psus, psu_nrs

    def open_order(self, path, items):
        """
        Retrieves the order from the given file
        :param path: file path
        :param items: list of all items
        :return: boolean array over ``items`` that is True for ordered items
        :raises ValueError: if the order is empty or names an unknown item
        """
        with open(path) as f:
            # any whitespace separates items, so a trailing newline is harmless
            order_raw = f.read().split()
        if not order_raw:
            raise ValueError(f"order file {path} contains no items")
        positions = self._item_positions(items)
        order = np.zeros(len(items), dtype=bool)
        for item_in_order in order_raw:
            try:
                order[positions[item_in_order]] = True
            except KeyError:
                raise ValueError(
                    f"unknown item {item_in_order!r} in order file {path}") from None
        return order

    def print_solution(self, final_state):
        """
        Returns a string representation of the final state with Information of the order, the value of the end
        state and the PSUs needed including their used items
        :param final_state: binary array describing used PSUs
        :return: multi-line report string
        """
        # rows of the selected PSUs and names of the ordered items
        psus_state = np.compress(final_state, self.psus, axis=0)
        order_raw = np.compress(self.order, self.items)
        # numbers of the selected PSUs, aligned with the rows of psus_state
        # (indexing psu_nrs by row position would mislabel them)
        psu_nrs_state = np.compress(final_state, self.psu_nrs)
        parts = ["Order: " + str(set(order_raw)) + '\n\n']
        # Numbers of psus needed
        number_of_psus = np.count_nonzero(final_state)
        parts.append("Number of PSUs needed: {}\n\n".format(number_of_psus))
        # display psus used (PSU numbers are shown one-based)
        for psu_nr, psu in zip(psu_nrs_state, psus_state):
            items_in_psu = np.compress(psu, order_raw)
            parts.append(f"PSU Nr.{psu_nr + 1}: {items_in_psu}" + '\n')
        # display value
        parts.append(f"\nValue of end state: {self.value_function(final_state)}\n")
        return "".join(parts)

    def value_function(self, state):
        """
        Evaluates how good a subset of PSU fulfills the order
        (delegates to ``searchutils.value_function``)
        :param state: binary array describing used PSUs
        :return: value of state
        """
        return value_function(state, self.order, self.psus)

    def neighbors(self, state):
        """
        Creates all neighbors of a given state
        (delegates to ``searchutils.neighbors_func``)
        A state's neighbor is identical to the state except at exactly one
        position
        :param state: binary array describing used PSUs
        :return: 2D array containing all state's neighbors
        """
        return neighbors_func(state)

    def termination(self, value, value_neighbors):
        """
        Checks whether the search is finished, i.e. no neighbor has a higher value
        :param value: value of the current state
        :param value_neighbors: array of the neighbors' values
        :return: True if no entry of value_neighbors is greater than value
        """
        return not np.any(value_neighbors > value)
class Hill_Climbing(Abstract_Search):
    """
    Starts with a random state and continues with the best state of its neighborhood until there is no improvement possible
    in the neighborhood (local maximum).
    """

    def search(self, start_state=None):
        """
        Runs steepest-ascent hill climbing.

        :param start_state: optional binary array describing used PSUs to start
                            from; if omitted, ``self.start_state`` is used
        :return: the state at which no neighbor has a higher value
        """
        if start_state is None:
            current, value, neighbors, value_neighbors = self.start()
        else:
            # honour an explicitly supplied start state without touching
            # self.start_state
            current = np.asarray(start_state)
            value = self.value_function(current)
            neighbors = self.neighbors(current)
            value_neighbors = np.apply_along_axis(self.value_function, 1, neighbors)

        iteration = 0
        while not self.termination(value, value_neighbors):
            iteration += 1

            # Choose the biggest neighbour; its values are already known, so
            # there is no need to evaluate the whole neighbourhood again
            current = neighbors[np.argmax(value_neighbors)]
            value = self.value_function(current)

            # Publish progress: print without a log variable, otherwise push
            # the value and refresh the window if there is one
            if self.log_var is None:
                print(iteration, value)
            else:
                self.log_var.set(value)
                if self.window is not None:
                    self.window.update()

            # Create new neighbours and their values
            neighbors = self.neighbors(current)
            value_neighbors = np.apply_along_axis(self.value_function, 1, neighbors)

        return current
class First_Choice_Hill_Climbing(Abstract_Search):
    """
    Hill climbing that moves to the first neighbor that beats the current
    state instead of the best one. It begins at the random start state and
    stops once no neighbor has a higher value (local maximum).
    """

    def search(self):
        """
        Runs first-choice hill climbing from ``self.start_state``.

        :return: the state at which no neighbor has a higher value
        """
        state, score, candidates, candidate_scores = self.start()

        while True:
            # Stop as soon as the neighbourhood offers no improvement
            if self.termination(score, candidate_scores):
                return state

            # argmax over a boolean mask yields the position of the first True,
            # i.e. the first neighbour that is better than the current state
            is_better = candidate_scores > score
            state = candidates[np.argmax(is_better)]
            score = self.value_function(state)

            # Publish the new value and refresh the window, each only if set
            if self.log_var is not None:
                self.log_var.set(score)
            if self.window is not None:
                self.window.update()

            # Build the next neighbourhood together with its values
            candidates = self.neighbors(state)
            candidate_scores = np.apply_along_axis(self.value_function, 1, candidates)
class Local_Beam_Search(Abstract_Search):
    """
    Starts with k states, does hillclimbing and continues with k best values of the union of the neighborhoods.
    """

    def _expand(self, states):
        """
        Builds the union of the neighborhoods of all given states.

        :param states: 2D array with one state per row
        :return: 2D array with one neighbor per row and the value of each row
        """
        all_neighbors = np.apply_along_axis(self.neighbors, 1, states)
        all_neighbors = all_neighbors.reshape(-1, all_neighbors.shape[-1])
        value_neighbors = np.apply_along_axis(self.value_function, 1, all_neighbors)
        return all_neighbors, value_neighbors

    def search(self, k):
        """
        Runs local beam search with a beam of ``k`` states.

        :param k: number of states kept in the beam (at least 1)
        :return: the best state of the final beam
        :raises ValueError: if ``k`` is smaller than 1
        """
        if k < 1:
            raise ValueError("k must be at least 1")

        # initializes k start states; the ratio can exceed 1 when fewer PSUs
        # than ordered items are relevant, so it is clamped to stay a valid
        # probability for np.random.choice
        p_selected = min(1.0, np.count_nonzero(self.order) / len(self.psus))
        k_states = np.random.choice([True, False], (k, len(self.psus)),
                                    p=[p_selected, 1 - p_selected])

        # Keep the beam sorted ascending by value so that k_states[-1] is the
        # best member even if the loop below never runs
        values = np.apply_along_axis(self.value_function, 1, k_states)
        ascending = np.argsort(values)
        k_states = k_states[ascending]
        values = values[ascending]

        # Generate neighbours of current states
        all_neighbors, value_neighbors = self._expand(k_states)

        # If no neighbour is better than worst current state return
        value = np.amin(values)

        iteration = 0
        while not self.termination(value, value_neighbors):
            iteration += 1

            # Continue with k best neighbours (ascending, best one last);
            # their values are already known from the neighbourhood scan
            best = np.argsort(value_neighbors)[-k:]
            k_states = all_neighbors[best]
            values = value_neighbors[best]
            value = np.amin(values)

            # Generate neighbours of current states
            all_neighbors, value_neighbors = self._expand(k_states)

            # Update graph: print without a log variable, otherwise push the
            # beam values and refresh the window if there is one
            if self.log_var is None:
                print(iteration, values)
            else:
                self.log_var.set(list(values))
                if self.window is not None:
                    self.window.update()

        return k_states[-1]
class Simulated_Annealing(Abstract_Search):
    """
    Simulated annealing: moves to random neighbors, always accepting better
    ones and accepting worse ones with a probability that shrinks with the
    temperature. After the step budget is used up, the result is refined with
    a final hill-climbing run.
    """

    def schedule(self, t):
        """
        Temperature schedule
        :param t: time step (0, 1, 2, ...)
        :return: temperature, starting at 100 and multiplied by 0.9 per step
        """
        return 100 * 0.9 ** t

    def search(self, max_steps=500):
        """
        Runs simulated annealing followed by hill climbing.

        :param max_steps: number of annealing steps before the final
                          hill-climbing run (default 500)
        :return: the state returned by the final hill-climbing run
        """
        current = self.start_state
        # The value of the current state is carried along instead of being
        # re-evaluated for every comparison and every log entry
        current_value = self.value_function(current)

        for t in range(max_steps):
            # Updates temperature using the time schedule
            temp = self.schedule(t)

            # Choose random neighbour and calculates ∆E
            next_neighbor = random.choice(self.neighbors(current))
            next_value = self.value_function(next_neighbor)
            delta_e = next_value - current_value

            if delta_e > 0:
                # If the random neighbour is better, continue search with it
                accept = True
            else:
                # If it is not better, continue with the random neighbour
                # with probability e^(∆E / temperature); a temperature that
                # has decayed to 0 never accepts (and avoids dividing by 0)
                accept = temp > 0 and random.random() < exp(delta_e / temp)

            if accept:
                current, current_value = next_neighbor, next_value

            # Update graph: print without a log variable, otherwise push the
            # value and refresh the window if there is one
            if self.log_var is None:
                print(t, current_value)
            else:
                self.log_var.set(current_value)
                if self.window is not None:
                    self.window.update()

        # Step budget used up: climb from the annealed state to a local maximum
        final_hc = Hill_Climbing(self.directories[0], self.directories[1], self.log_var, self.window)
        final_hc.start_state = current
        return final_hc.search()
# Testing the Search: manual smoke run of simulated annealing on sample data
if __name__ == '__main__':
    Simulated_Annealing('data/problem1.txt', 'data/order11.txt').search()