summaryrefslogtreecommitdiffstats
path: root/ant.py
blob: 9887847b8f1028c22065af121fab6aca06d56457 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
import numpy as np
import copy
from vrptw_base import VrptwGraph
from threading import Event


class Ant:
    """Agent instance in the Ant Colony System.  Parallelizable.

    Attributes
    ----------
    graph : VrptwGraph
        VRPTW problem instance (parameters + graph + pheromones)
    current_index : int
        Current route's vehicle's location
    vehicle_load : float
        Demand already loaded on the current route's vehicle (it is compared
        against the graph's vehicle capacity)
    vehicle_travel_time : float
        Current route's vehicle's travel time
    travel_path : list
        Representation of the solution.  A sequence of node indices.
    arrival_time : list
        Arrival time recorded for each entry of travel_path, measured on the
        clock of the vehicle route in progress and before any waiting.
    index_to_visit : list
        Remaining node indices not yet included in travel_path
    total_travel_distance : float
        Distance accumulated along travel_path
    """
    def __init__(self, graph: VrptwGraph, start_index=0):
        super()
        self.graph = graph
        self.current_index = start_index
        self.vehicle_load = 0
        self.vehicle_travel_time = 0
        self.travel_path = [start_index]
        self.arrival_time = [0]

        self.index_to_visit = list(range(graph.node_num))
        self.index_to_visit.remove(start_index)

        self.total_travel_distance = 0

    def clear(self):
        """Restart Ant solution (Kill)"""
        self.travel_path.clear()
        self.index_to_visit.clear()

    def move_to_next_index(self, next_index):
        """Update ant path + "current simulated vehicle route parameters"."""
        self.travel_path.append(next_index)
        dist = self.graph.node_dist_mat[self.current_index][next_index]
        self.total_travel_distance += dist

        self.arrival_time.append(self.vehicle_travel_time + dist * 1)

        if self.graph.nodes[next_index].is_depot:
            # If the next location is a depot, the vehicle load, etc., must be cleared
            # (start next vehicle route)
            self.vehicle_load = 0
            self.vehicle_travel_time = 0

        else:
            # Update vehicle load, distance traveled, time
            self.vehicle_load += self.graph.nodes[next_index].demand
            # If it is earlier than the time window (ready_time) requested by the client, you need to wait
            self.vehicle_travel_time += dist * 1 \
                    + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist * 1, 0) \
                    + self.graph.nodes[next_index].service_time
            self.index_to_visit.remove(next_index)

        self.current_index = next_index

    def index_to_visit_empty(self):
        return len(self.index_to_visit) == 0

    def get_active_vehicles_num(self):
        return self.travel_path.count(0)-1

    def check_condition(self, next_index) -> bool:
        """Check if moving to next point satisfies constraints

        Returns
        -------
        bool
            Conditions satisfied.
        """
        if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity:
            # Capacity is not enough to serve whole customer demand
            return False

        # Time that takes traversing the arc ("tramo") (vehicle mean speed may vary)
        dist_time = self.graph.node_dist_mat[self.current_index][next_index] * 1
        wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist_time, 0)
        service_time = self.graph.nodes[next_index].service_time

        # Check whether a traveler can return to the service shop after visiting a certain traveler
        if self.vehicle_travel_time + dist_time + wait_time + service_time \
           + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time:
            return False

        # Passengers outside the due time cannot be served
        if self.vehicle_travel_time + dist_time > self.graph.nodes[next_index].due_time:
            return False

        return True

    def cal_next_index_meet_constrains(self):
        """Find all customers reachable from the current location (ant.current_index)

        Returns
        -------
        list
            Neighbor nodes' indices that satisfy constraints
        """
        next_index_meet_constrains = []
        for next_index in self.index_to_visit:
            if self.check_condition(next_index):
                next_index_meet_constrains.append(next_index)
        return next_index_meet_constrains

    def cal_nearest_next_index(self, next_index_list):
        """
        Select from the available customers, the customer closest to the current
        location (ant.current_index)

        Parameters
        ----------
        next_index_list : list
            ???

        Returns
        -------
        int
        """
        current_index = self.current_index

        nearest_index = next_index_list[0]
        min_dist = self.graph.node_dist_mat[current_index][nearest_index]

        for next_index in next_index_list[1:]:
            dist = self.graph.node_dist_mat[current_index][next_index]
            if dist < min_dist:
                min_dist = dist
                nearest_index = next_index

        return nearest_index

    @staticmethod
    def cal_total_travel_distance(graph: VrptwGraph, travel_path):
        """
        (Update to ignore depot-to-depot distances later)
        """
        distance = 0
        current_index = travel_path[0]
        for next_index in travel_path[1:]:
            distance += graph.node_dist_mat[current_index][next_index]
            current_index = next_index
        return distance

    def try_insert_on_path(self, node_id, stop_event: Event):
        """Find the best feasible position at which to insert ``node_id``.

        Every position ``insert_index`` whose current occupant is not a depot
        is tried.  A scratch ``Ant`` replays the affected vehicle route from
        the depot that precedes the position, visits ``node_id`` just before
        ``travel_path[insert_index]`` and then continues along the rest of
        the path until it is back at a depot.  Every step must pass
        ``check_condition``.  Among the feasible positions the one giving the
        smallest resulting total distance wins (the first one on ties).

        ``travel_path`` itself is not modified by this method.

        Parameters
        ----------
        node_id : int
            Index of the node to insert.
        stop_event : Event
            Polled during the search; once it is set the search is abandoned.

        Returns
        -------
        int or None
            Index in ``travel_path`` before which ``node_id`` should be
            inserted, or ``None`` if no feasible position was found or
            ``stop_event`` was set.

        Notes
        -----
        Positions occupied by a depot are skipped, so ``node_id`` is never
        placed directly in front of a depot (i.e. at the end of a route) and
        a path consisting only of depots yields ``None``.
        NOTE(review): the replay assumes ``travel_path[0]`` is a depot --
        confirm against callers.
        """
        best_insert_index = None
        best_distance = None

        for insert_index in range(len(self.travel_path)):

            if stop_event.is_set():
                # Search cancelled: report "no position" (implicit None).
                return

            # Only positions currently held by a non-depot node are tried.
            if self.graph.nodes[self.travel_path[insert_index]].is_depot:
                continue

            # Walk backwards to the nearest depot before insert_index; that
            # depot is where the affected vehicle route starts.
            last_depot_index = insert_index
            while last_depot_index > 0 and not self.graph.nodes[self.travel_path[last_depot_index]].is_depot:
                last_depot_index -= 1

            # Scratch ant that starts at that depot with an empty vehicle.
            check_ant = Ant(self.graph, self.travel_path[last_depot_index])

            # Replay the unchanged prefix of the route: the nodes from
            # last_depot_index+1 up to insert_index-1.
            for i in range(last_depot_index+1, insert_index):
                check_ant.move_to_next_index(self.travel_path[i])

            # Tentatively visit node_id at this position; skip the position
            # if that already violates a constraint.
            if check_ant.check_condition(node_id):
                check_ant.move_to_next_index(node_id)
            else:
                continue

            # node_id is reachable; now make sure the remainder of the route
            # stays feasible all the way back to a depot.
            for next_index in self.travel_path[insert_index:]:
                if stop_event.is_set():
                    # Search cancelled: report "no position" (implicit None).
                    return

                if check_ant.check_condition(next_index):
                    check_ant.move_to_next_index(next_index)

                    # Reaching a depot means the modified vehicle route is
                    # complete *and valid*.  Compute the total distance the
                    # path would have if node_id were added just before
                    # travel_path[insert_index].
                    if self.graph.nodes[next_index].is_depot:
                        insert_node_id_prev = self.travel_path[insert_index-1]
                        insert_node_id = self.travel_path[insert_index]

                        # Replace arc prev->cur by prev->node_id->cur.
                        check_ant_distance = self.total_travel_distance \
                                             - self.graph.node_dist_mat[insert_node_id_prev][insert_node_id] \
                                             + self.graph.node_dist_mat[insert_node_id_prev][node_id] \
                                             + self.graph.node_dist_mat[node_id][insert_node_id]

                        # Strict comparison: the earliest position wins ties.
                        if best_distance is None or check_ant_distance < best_distance:
                            best_distance = check_ant_distance
                            best_insert_index = insert_index
                        break

                # A later node became infeasible, so the simulated route is
                # invalid for this position.
                else:
                    break

        return best_insert_index

    def insertion_procedure(self, stop_event: Event):
        """
        Try to find a suitable location for each unvisited node, insert into the
        current travel_path and insert the location without violating load,
        time, travel distance constraints
        :return:
        """
        if self.index_to_visit_empty():
            return

        success_to_insert = True
        # Until none of the unvisited nodes can be inserted successfully
        while success_to_insert:

            success_to_insert = False
            # Get unvisited nodes
            ind_to_visit = np.array(copy.deepcopy(self.index_to_visit))

            # Sort ind_to_visit by customer demand in descending order
            demand = np.zeros(len(ind_to_visit))
            for i, ind in zip(range(len(ind_to_visit)), ind_to_visit):
                demand[i] = self.graph.nodes[ind].demand

            arg_index = np.argsort(demand)[::-1]
            ind_to_visit = ind_to_visit[arg_index]

            # Try to insert at least one from the ordered list
            for node_id in ind_to_visit:
                if stop_event.is_set():
                    # print('[insertion_procedure]: receive stop event')
                    return

                best_insert_index = self.try_insert_on_path(node_id, stop_event)
                if best_insert_index is not None:
                    self.travel_path.insert(best_insert_index, node_id)
                    self.index_to_visit.remove(node_id)
                    # print('[insertion_procedure]: success to insert %d(node id) in %d(index)' % (node_id, best_insert_index))
                    success_to_insert = True

            del demand
            del ind_to_visit
        if self.index_to_visit_empty():
            print('[insertion_procedure]: success in insertion')

        self.total_travel_distance = Ant.cal_total_travel_distance(self.graph, self.travel_path)

    @staticmethod
    def local_search_once(graph: VrptwGraph, travel_path: list, travel_distance: float, i_start, stop_event: Event):
        """Look for one improving segment exchange between two vehicle routes.

        The path is split at its depots into routes; route ``k`` is the span
        between ``depot_index[k - 1]`` and ``depot_index[k]``.  For every pair
        of routes ``(i, j)`` with ``i_start <= i < j`` a segment of route
        ``i`` (``travel_path[start_a:end_a]``, 0 to 5 nodes) is exchanged
        with a segment of route ``j`` (``travel_path[start_b:end_b + 1]``,
        1 to 6 nodes).  The first exchange that keeps both modified routes
        feasible and shortens the path is returned.

        ``travel_path`` is only read; candidates are built as new lists.

        Parameters
        ----------
        graph : VrptwGraph
            Problem instance (``nodes``, ``node_dist_mat``, ...).
        travel_path : list
            Current solution as a sequence of node indices.
        travel_distance : float
            Distance of ``travel_path``; a candidate must be strictly
            shorter to be accepted.
        i_start : int
            Position in the depot list of the first route used as route a.
            NOTE(review): presumably must be >= 1, since ``i_start - 1`` is
            used as an index into the depot list -- confirm.
        stop_event : Event
            Polled once per route pair; once set the search is abandoned.

        Returns
        -------
        tuple
            ``(new_path, new_path_distance, i)`` for the first improving
            exchange, where ``i`` is the route-a position it was found at;
            ``(None, None, None)`` if none exists or ``stop_event`` was set.

        Raises
        ------
        RuntimeError
            If the recomputed position of route b's starting depot does not
            hold a depot (internal consistency check).
        """

        # Find the locations of all depots in the path
        depot_index = []
        for ind in range(len(travel_path)):
            if graph.nodes[travel_path[ind]].is_depot:
                depot_index.append(ind)

        # Divide travel_path into multiple segments, each segment starts
        # with depot and ends with depot, called route
        for i in range(i_start, len(depot_index)):
            for j in range(i + 1, len(depot_index)):

                if stop_event.is_set():
                    return None, None, None

                # Segment a is travel_path[start_a:end_a] (end exclusive, may
                # be empty); segment b is travel_path[start_b:end_b + 1] (end
                # inclusive).  Both are capped by the "+ 6" bounds.
                for start_a in range(depot_index[i - 1] + 1, depot_index[i]):
                    for end_a in range(start_a, min(depot_index[i], start_a + 6)):
                        for start_b in range(depot_index[j - 1] + 1, depot_index[j]):
                            for end_b in range(start_b, min(depot_index[j], start_b + 6)):
                                if start_a == end_a and start_b == end_b:
                                    continue
                                # Build the candidate path with segments a
                                # and b exchanged
                                new_path = []
                                new_path.extend(travel_path[:start_a])
                                new_path.extend(travel_path[start_b:end_b + 1])
                                new_path.extend(travel_path[end_a:start_b])
                                new_path.extend(travel_path[start_a:end_a])
                                new_path.extend(travel_path[end_b + 1:])

                                # Route a's starting depot lies before both
                                # segments, so its position is unchanged.
                                depot_before_start_a = depot_index[i - 1]

                                # Route b's starting depot is shifted by the
                                # length difference of the two segments
                                # (len b = end_b - start_b + 1, len a =
                                # end_a - start_a).
                                depot_before_start_b = depot_index[j - 1] + (end_b - start_b) - (end_a - start_a) + 1
                                if not graph.nodes[new_path[depot_before_start_b]].is_depot:
                                    raise RuntimeError('error')

                                # Determine whether the changed route a is
                                # feasible: replay it with a scratch ant
                                # until it is back at a depot
                                success_route_a = False
                                check_ant = Ant(graph, new_path[depot_before_start_a])
                                for ind in new_path[depot_before_start_a + 1:]:
                                    if check_ant.check_condition(ind):
                                        check_ant.move_to_next_index(ind)
                                        if graph.nodes[ind].is_depot:
                                            success_route_a = True
                                            break
                                    else:
                                        break

                                check_ant.clear()
                                del check_ant

                                # Determine whether the changed route b is
                                # feasible, in the same way
                                success_route_b = False
                                check_ant = Ant(graph, new_path[depot_before_start_b])
                                for ind in new_path[depot_before_start_b + 1:]:
                                    if check_ant.check_condition(ind):
                                        check_ant.move_to_next_index(ind)
                                        if graph.nodes[ind].is_depot:
                                            success_route_b = True
                                            break
                                    else:
                                        break
                                check_ant.clear()
                                del check_ant

                                if success_route_a and success_route_b:
                                    new_path_distance = Ant.cal_total_travel_distance(graph, new_path)
                                    if new_path_distance < travel_distance:
                                        # Improving exchange found.

                                        # Delete one of the depots linked
                                        # together in the path (only the
                                        # first adjacent pair is collapsed)
                                        for temp_index in range(1, len(new_path)):
                                            if graph.nodes[new_path[temp_index]].is_depot and graph.nodes[new_path[temp_index - 1]].is_depot:
                                                new_path.pop(temp_index)
                                                break
                                        return new_path, new_path_distance, i
                                else:
                                    # Infeasible candidate: discard it.
                                    new_path.clear()

        return None, None, None

    def local_search_procedure(self, stop_event: Event):
        """
        Use cross to perform a local search on the current travel_path that has visited all nodes in the graph
        """
        new_path = copy.deepcopy(self.travel_path)
        new_path_distance = self.total_travel_distance
        times = 100
        count = 0
        i_start = 1
        while count < times:
            temp_path, temp_distance, temp_i = Ant.local_search_once(self.graph, new_path, new_path_distance, i_start, stop_event)
            if temp_path is not None:
                count += 1

                del new_path, new_path_distance
                new_path = temp_path
                new_path_distance = temp_distance

                # set i_start
                i_start = (i_start + 1) % (new_path.count(0)-1)
                i_start = max(i_start, 1)
            else:
                break

        self.travel_path = new_path
        self.total_travel_distance = new_path_distance
        print('[local_search_procedure]: local search finished')