1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
|
import numpy as np
import copy
from vrptw_base import VrptwGraph
from threading import Event
class Ant:
    """Agent (ant) of an Ant Colony System that builds one VRPTW solution.

    The original author notes that ants are meant to be run in parallel;
    long-running procedures therefore poll a ``threading.Event`` and stop
    early when it is set.

    Node index 0 is treated as *the* depot in several places
    (``get_active_vehicles_num``, ``check_condition``,
    ``local_search_procedure``).

    Attributes
    ----------
    graph : VrptwGraph
        VRPTW problem instance. This class reads ``graph.nodes`` (objects
        with ``is_depot``, ``demand``, ``ready_time``, ``due_time`` and
        ``service_time``), ``graph.node_dist_mat``, ``graph.node_num`` and
        ``graph.vehicle_capacity``.
    current_index : int
        Node at which the current vehicle is located.
    vehicle_load : float
        Demand accumulated by the current vehicle since it last left a
        depot (compared against ``graph.vehicle_capacity``).
    vehicle_travel_time : float
        Time elapsed on the current vehicle's route, including waiting and
        service time. Arc travel time equals arc distance (unit speed).
    travel_path : list
        Representation of the solution: a sequence of node indices.
    arrival_time : list
        Arrival time recorded for every node appended through
        ``move_to_next_index`` (starts as ``[0]``). It is NOT updated by
        ``insertion_procedure`` or ``local_search_procedure``.
    index_to_visit : list
        Node indices not yet included in ``travel_path``.
    total_travel_distance : float
        Sum of arc distances along ``travel_path``.
    """

    # Maximum length bound used for the segments exchanged between two
    # routes during local search (see ``local_search_once``).
    _MAX_EXCHANGE_SPAN = 6

    # NOTE: ``VrptwGraph`` annotations are quoted so they are not evaluated
    # when the class body is executed.
    def __init__(self, graph: "VrptwGraph", start_index=0):
        """Create an ant standing on ``start_index`` with an empty vehicle.

        Parameters
        ----------
        graph : VrptwGraph
            Problem instance shared (not copied) by the ant.
        start_index : int
            Node the ant starts from; it is excluded from ``index_to_visit``.
        """
        super().__init__()
        self.graph = graph
        self.current_index = start_index
        self.vehicle_load = 0
        self.vehicle_travel_time = 0
        self.travel_path = [start_index]
        self.arrival_time = [0]
        self.index_to_visit = list(range(graph.node_num))
        self.index_to_visit.remove(start_index)
        self.total_travel_distance = 0

    def clear(self):
        """Discard the ant's solution: empty ``travel_path`` and ``index_to_visit``."""
        self.travel_path.clear()
        self.index_to_visit.clear()

    def move_to_next_index(self, next_index):
        """Append ``next_index`` to the path and update the vehicle state.

        Moving to a depot resets load and travel time (a new vehicle route
        starts). Moving to a customer adds its demand, travel, waiting and
        service time, and removes it from ``index_to_visit``.

        Raises
        ------
        ValueError
            If ``next_index`` is a customer that is not in ``index_to_visit``.
        """
        graph = self.graph
        dist = graph.node_dist_mat[self.current_index][next_index]
        node = graph.nodes[next_index]

        self.travel_path.append(next_index)
        self.total_travel_distance += dist
        self.arrival_time.append(self.vehicle_travel_time + dist)

        if node.is_depot:
            # Back at a depot: the next route starts with a fresh vehicle.
            self.vehicle_load = 0
            self.vehicle_travel_time = 0
        else:
            self.vehicle_load += node.demand
            # Arriving before ready_time means waiting until the window opens.
            wait_time = max(node.ready_time - self.vehicle_travel_time - dist, 0)
            self.vehicle_travel_time += dist + wait_time + node.service_time
            self.index_to_visit.remove(next_index)

        self.current_index = next_index

    def index_to_visit_empty(self):
        """Return True when every node has been placed in ``travel_path``."""
        return len(self.index_to_visit) == 0

    def get_active_vehicles_num(self):
        """Return the number of occurrences of node 0 in ``travel_path`` minus one."""
        return self.travel_path.count(0) - 1

    def check_condition(self, next_index) -> bool:
        """Check whether moving to ``next_index`` satisfies all constraints.

        Returns
        -------
        bool
            False if the move would exceed the vehicle capacity, arrive
            after the node's ``due_time``, or make it impossible to be back
            at node 0 before node 0's ``due_time``; True otherwise.
        """
        graph = self.graph
        node = graph.nodes[next_index]

        if self.vehicle_load + node.demand > graph.vehicle_capacity:
            # Not enough capacity left to serve the whole demand.
            return False

        # Time needed to traverse the arc (unit speed: time == distance).
        dist_time = graph.node_dist_mat[self.current_index][next_index]
        wait_time = max(node.ready_time - self.vehicle_travel_time - dist_time, 0)
        service_time = node.service_time

        # After serving next_index the vehicle must still reach the depot in time.
        if self.vehicle_travel_time + dist_time + wait_time + service_time \
                + graph.node_dist_mat[next_index][0] > graph.nodes[0].due_time:
            return False

        # Customers cannot be served after their due time.
        if self.vehicle_travel_time + dist_time > node.due_time:
            return False

        return True

    def cal_next_index_meet_constrains(self):
        """Find all unvisited nodes reachable from ``current_index``.

        Returns
        -------
        list
            Indices from ``index_to_visit`` for which ``check_condition``
            holds, in ``index_to_visit`` order.
        """
        return [ind for ind in self.index_to_visit if self.check_condition(ind)]

    def cal_nearest_next_index(self, next_index_list):
        """Return the candidate closest to ``current_index``.

        Parameters
        ----------
        next_index_list : list
            Non-empty list of candidate node indices.

        Returns
        -------
        int
            The candidate with the smallest distance from the current
            node; the earliest one in the list wins ties.

        Raises
        ------
        IndexError
            If ``next_index_list`` is empty.
        """
        if not next_index_list:
            raise IndexError('next_index_list is empty')

        row = self.graph.node_dist_mat[self.current_index]
        nearest_index = next_index_list[0]
        min_dist = row[nearest_index]
        for candidate in next_index_list[1:]:
            if row[candidate] < min_dist:
                min_dist = row[candidate]
                nearest_index = candidate
        return nearest_index

    @staticmethod
    def cal_total_travel_distance(graph: "VrptwGraph", travel_path):
        """Return the sum of arc distances along ``travel_path``.

        Depot-to-depot arcs are included like any other arc. A path with
        fewer than two nodes has distance 0.
        """
        distance = 0
        for current_index, next_index in zip(travel_path, travel_path[1:]):
            distance += graph.node_dist_mat[current_index][next_index]
        return distance

    def try_insert_on_path(self, node_id, stop_event: Event):
        """Find the best feasible position to insert ``node_id`` in ``travel_path``.

        For every position whose current occupant is a customer, the route
        containing it is replayed by a scratch ant with ``node_id`` inserted
        just before that occupant. Positions for which the replay reaches
        the route's closing depot without violating ``check_condition`` are
        feasible; the one yielding the smallest total distance is returned.

        Parameters
        ----------
        node_id : int
            Unvisited node to insert.
        stop_event : Event
            When set, the search is abandoned.

        Returns
        -------
        int or None
            Index suitable for ``travel_path.insert``; None when no feasible
            position exists or ``stop_event`` was set.

        Note
        ----
        Positions directly before a depot (i.e. the end of a route) are
        never tried, so ``travel_path`` must already contain customers for
        an insertion to be possible.
        """
        nodes = self.graph.nodes
        dist_mat = self.graph.node_dist_mat
        path = self.travel_path

        best_insert_index = None
        best_distance = None

        for insert_index, successor in enumerate(path):
            if stop_event.is_set():
                return None
            if nodes[successor].is_depot:
                continue

            # Walk back to the depot that opens the route holding insert_index.
            depot_pos = insert_index
            while depot_pos > 0 and not nodes[path[depot_pos]].is_depot:
                depot_pos -= 1

            # Replay the untouched prefix of the route on a scratch ant.
            check_ant = Ant(self.graph, path[depot_pos])
            for visited in path[depot_pos + 1:insert_index]:
                check_ant.move_to_next_index(visited)

            if not check_ant.check_condition(node_id):
                continue
            check_ant.move_to_next_index(node_id)

            # The remainder of the route must stay feasible up to its depot.
            for next_index in path[insert_index:]:
                if stop_event.is_set():
                    return None
                if not check_ant.check_condition(next_index):
                    break  # route became infeasible
                check_ant.move_to_next_index(next_index)
                if nodes[next_index].is_depot:
                    # Route closed and valid: price the insertion.
                    predecessor = path[insert_index - 1]
                    candidate_distance = self.total_travel_distance \
                        - dist_mat[predecessor][successor] \
                        + dist_mat[predecessor][node_id] \
                        + dist_mat[node_id][successor]
                    if best_distance is None or candidate_distance < best_distance:
                        best_distance = candidate_distance
                        best_insert_index = insert_index
                    break

        return best_insert_index

    def insertion_procedure(self, stop_event: Event):
        """Insert as many unvisited nodes as possible into ``travel_path``.

        Unvisited nodes are tried in order of decreasing demand; each is
        placed at the position chosen by ``try_insert_on_path``. Passes are
        repeated until a full pass inserts nothing. Afterwards
        ``total_travel_distance`` is recomputed from ``travel_path`` (also
        when interrupted by ``stop_event``, so the two never disagree).
        ``arrival_time`` is not updated.

        Parameters
        ----------
        stop_event : Event
            When set, the procedure stops before trying the next node.
        """
        if self.index_to_visit_empty():
            return

        nodes = self.graph.nodes
        stopped = False
        inserted_any = True

        # Repeat until a whole pass fails to insert any node.
        while inserted_any and not stopped:
            inserted_any = False

            # Snapshot, because index_to_visit shrinks while we iterate.
            pending = list(self.index_to_visit)
            demand = np.array([nodes[ind].demand for ind in pending], dtype=float)

            # Largest demand first.
            for pos in np.argsort(demand)[::-1]:
                if stop_event.is_set():
                    stopped = True
                    break

                # Index the Python list so plain ints (not numpy scalars)
                # end up in travel_path.
                node_id = pending[pos]
                best_insert_index = self.try_insert_on_path(node_id, stop_event)
                if best_insert_index is not None:
                    self.travel_path.insert(best_insert_index, node_id)
                    self.index_to_visit.remove(node_id)
                    inserted_any = True

        if not stopped and self.index_to_visit_empty():
            print('[insertion_procedure]: success in insertion')

        self.total_travel_distance = Ant.cal_total_travel_distance(self.graph, self.travel_path)

    @staticmethod
    def _route_is_feasible(graph, path, depot_position):
        """Return True if the route starting at ``path[depot_position]`` reaches its next depot without violating constraints."""
        check_ant = Ant(graph, path[depot_position])
        for ind in path[depot_position + 1:]:
            if not check_ant.check_condition(ind):
                return False
            check_ant.move_to_next_index(ind)
            if graph.nodes[ind].is_depot:
                return True
        return False

    @staticmethod
    def local_search_once(graph: "VrptwGraph", travel_path: list, travel_distance: float, i_start, stop_event: Event):
        """Search for one improving segment exchange between two routes.

        ``travel_path`` is split at its depots into routes. For every pair
        of routes (``i`` from ``i_start``, ``j`` after ``i``) a segment
        ``[start_a, end_a)`` of route ``i`` (possibly empty) is exchanged
        with a segment ``[start_b, end_b]`` of route ``j``. The first
        exchange that keeps both routes feasible and strictly shortens the
        path is returned. ``travel_path`` itself is not modified.

        Parameters
        ----------
        graph : VrptwGraph
        travel_path : list
            Depot-delimited sequence of node indices.
        travel_distance : float
            Distance of ``travel_path``; a candidate must beat it.
        i_start : int
            1-based index (into the list of depot positions) of the first
            route to consider; expected to be >= 1.
        stop_event : Event
            When set, the search is abandoned.

        Returns
        -------
        tuple
            ``(new_path, new_path_distance, i)`` on success, where one pair
            of adjacent depots (an emptied route) has been collapsed in
            ``new_path``; ``(None, None, None)`` if no improvement was found
            or ``stop_event`` was set.

        Raises
        ------
        RuntimeError
            If the internal bookkeeping of route b's depot position is wrong.
        """
        span = Ant._MAX_EXCHANGE_SPAN

        # Positions of all depots in the path.
        depot_index = [pos for pos, node in enumerate(travel_path) if graph.nodes[node].is_depot]

        for i in range(i_start, len(depot_index)):
            for j in range(i + 1, len(depot_index)):
                if stop_event.is_set():
                    return None, None, None

                for start_a in range(depot_index[i - 1] + 1, depot_index[i]):
                    for end_a in range(start_a, min(depot_index[i], start_a + span)):
                        for start_b in range(depot_index[j - 1] + 1, depot_index[j]):
                            for end_b in range(start_b, min(depot_index[j], start_b + span)):
                                if start_a == end_a and start_b == end_b:
                                    continue

                                # Path with segment a and segment b exchanged.
                                new_path = [
                                    *travel_path[:start_a],
                                    *travel_path[start_b:end_b + 1],
                                    *travel_path[end_a:start_b],
                                    *travel_path[start_a:end_a],
                                    *travel_path[end_b + 1:],
                                ]

                                depot_before_start_a = depot_index[i - 1]
                                # Route b's opening depot shifts by the length difference.
                                depot_before_start_b = depot_index[j - 1] + (end_b - start_b) - (end_a - start_a) + 1
                                if not graph.nodes[new_path[depot_before_start_b]].is_depot:
                                    raise RuntimeError('error')

                                # Both modified routes must remain feasible.
                                if not Ant._route_is_feasible(graph, new_path, depot_before_start_a):
                                    continue
                                if not Ant._route_is_feasible(graph, new_path, depot_before_start_b):
                                    continue

                                new_path_distance = Ant.cal_total_travel_distance(graph, new_path)
                                if new_path_distance < travel_distance:
                                    # Drop one of the first two adjacent depots, if any.
                                    for temp_index in range(1, len(new_path)):
                                        if graph.nodes[new_path[temp_index]].is_depot \
                                                and graph.nodes[new_path[temp_index - 1]].is_depot:
                                            new_path.pop(temp_index)
                                            break
                                    return new_path, new_path_distance, i

        return None, None, None

    def local_search_procedure(self, stop_event: Event, max_improvements: int = 100):
        """Improve ``travel_path`` by repeated segment exchanges.

        Calls ``local_search_once`` until it finds no further improvement,
        ``stop_event`` is set, or ``max_improvements`` improvements have
        been applied; then stores the resulting path and distance.

        Parameters
        ----------
        stop_event : Event
            Forwarded to ``local_search_once``.
        max_improvements : int
            Upper bound on the number of accepted improvements (default 100).
        """
        new_path = list(self.travel_path)
        new_path_distance = self.total_travel_distance
        i_start = 1

        for _round in range(max_improvements):
            temp_path, temp_distance, _route = Ant.local_search_once(
                self.graph, new_path, new_path_distance, i_start, stop_event)
            if temp_path is None:
                break

            new_path = temp_path
            new_path_distance = temp_distance

            # Rotate the starting route, staying within 1..route_count-1.
            route_count = new_path.count(0) - 1
            i_start = max((i_start + 1) % max(route_count, 1), 1)

        self.travel_path = new_path
        self.total_travel_distance = new_path_distance
        print('[local_search_procedure]: local search finished')
|