import numpy as np import copy from vrptw_base import VrptwGraph from threading import Event class Ant: """Instancia de agente en Ant Colony System. Paralelizable. Attributes ---------- graph : VrptwGraph VRPTW problem instance (parameters + graph + pheromones) current_index : int Current route's vehicle's location vehicle_load : float Current route's vehicle's spare capacity vehicle_travel_time : float Current route's vehicle's travel time travel_path : list Representation of the solution. A sequence of node indeces. arrival_time : list ? index_to_visit : list Remaining node indices not yet included in travel_path total_travel_distance : float """ def __init__(self, graph: VrptwGraph, start_index=0): super() self.graph = graph self.current_index = start_index self.vehicle_load = 0 self.vehicle_travel_time = 0 self.travel_path = [start_index] self.arrival_time = [0] self.index_to_visit = list(range(graph.node_num)) self.index_to_visit.remove(start_index) self.total_travel_distance = 0 def clear(self): """Restart Ant solution (Kill)""" self.travel_path.clear() self.index_to_visit.clear() def move_to_next_index(self, next_index): """Update ant path + "current simulated vehicle route parameters".""" self.travel_path.append(next_index) dist = self.graph.node_dist_mat[self.current_index][next_index] self.total_travel_distance += dist self.arrival_time.append(self.vehicle_travel_time + dist * 1) if self.graph.nodes[next_index].is_depot: # If the next location is a depot, the vehicle load, etc., must be cleared # (start next vehicle route) self.vehicle_load = 0 self.vehicle_travel_time = 0 else: # Update vehicle load, distance traveled, time self.vehicle_load += self.graph.nodes[next_index].demand # If it is earlier than the time window (ready_time) requested by the client, you need to wait self.vehicle_travel_time += dist * 1 \ + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist * 1, 0) \ + self.graph.nodes[next_index].service_time self.index_to_visit.remove(next_index) self.current_index = next_index def index_to_visit_empty(self): return len(self.index_to_visit) == 0 def get_active_vehicles_num(self): return self.travel_path.count(0)-1 def check_condition(self, next_index) -> bool: """Check if moving to next point satisfies constraints Returns ------- bool Conditions satisfied. """ if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity: # Capacity is not enough to serve whole customer demand return False # Time that takes traversing the arc ("tramo") (vehicle mean speed may vary) dist_time = self.graph.node_dist_mat[self.current_index][next_index] * 1 wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist_time, 0) service_time = self.graph.nodes[next_index].service_time # Check whether a traveler can return to the service shop after visiting a certain traveler if self.vehicle_travel_time + dist_time + wait_time + service_time \ + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time: return False # Passengers outside the due time cannot be served if self.vehicle_travel_time + dist_time > self.graph.nodes[next_index].due_time: return False return True def cal_next_index_meet_constrains(self): """Find all customers reachable from the current location (ant.current_index) Returns ------- list Neighbor nodes' indices that satisfy constraints """ next_index_meet_constrains = [] for next_index in self.index_to_visit: if self.check_condition(next_index): next_index_meet_constrains.append(next_index) return next_index_meet_constrains def cal_nearest_next_index(self, next_index_list): """ Select from the available customers, the customer closest to the current location (ant.current_index) Parameters ---------- next_index_list : list ??? Returns ------- int """ current_index = self.current_index nearest_index = next_index_list[0] min_dist = self.graph.node_dist_mat[current_index][nearest_index] for next_index in next_index_list[1:]: dist = self.graph.node_dist_mat[current_index][next_index] if dist < min_dist: min_dist = dist nearest_index = next_index return nearest_index @staticmethod def cal_total_travel_distance(graph: VrptwGraph, travel_path): """ (Update to ignore depot-to-depot distances later) """ distance = 0 current_index = travel_path[0] for next_index in travel_path[1:]: distance += graph.node_dist_mat[current_index][next_index] current_index = next_index return distance def try_insert_on_path(self, node_id, stop_event: Event): """ Attempt to insert node_id into current travel_path. Inserted locations cannot violate load, time, travel distance restrictions. If there are multiple positions, find the best position. Parameters ---------- Returns ------- Note ---- Seems like at least one vehicle route (depot-to-depot substring) in travel_path is required. This is OK since """ best_insert_index = None best_distance = None for insert_index in range(len(self.travel_path)): if stop_event.is_set(): # print('[try_insert_on_path]: receive stop event') return if self.graph.nodes[self.travel_path[insert_index]].is_depot: continue # Find the nearest depot before insert_index last_depot_index = insert_index while last_depot_index > 0 and not self.graph.nodes[self.travel_path[last_depot_index]].is_depot: last_depot_index -= 1 # check_ant starts from last_depot_index check_ant = Ant(self.graph, self.travel_path[last_depot_index]) # Let check_ant walk through the point in the path from # last_depot_index to insert_index-1 for i in range(last_depot_index+1, insert_index): # (only one depot, so no start point needs to be passed) check_ant.move_to_next_index(self.travel_path[i]) # Begin to tentatively visit the nodes in the sorted index_to_visit if check_ant.check_condition(node_id): check_ant.move_to_next_index(node_id) else: continue # If you can get to the node_id, make sure that the vehicle can drive back to the depot for next_index in self.travel_path[insert_index:]: if stop_event.is_set(): # print('[try_insert_on_path]: receive stop event') return if check_ant.check_condition(next_index): check_ant.move_to_next_index(next_index) # If back to the depot, then it means the vehicle whose # route would be modified is over *and valid*. So now, # compute the new total_travel_distance if node_id where # added just before travel_path[insert_index] if self.graph.nodes[next_index].is_depot: insert_node_id_prev = self.travel_path[insert_index-1] insert_node_id = self.travel_path[insert_index] check_ant_distance = self.total_travel_distance \ - self.graph.node_dist_mat[insert_node_id_prev][insert_node_id] \ + self.graph.node_dist_mat[insert_node_id_prev][node_id] \ + self.graph.node_dist_mat[node_id][insert_node_id] if best_distance is None or check_ant_distance < best_distance: best_distance = check_ant_distance best_insert_index = insert_index break # If you can't go back to the depot, simmulated route was invalid else: break return best_insert_index def insertion_procedure(self, stop_event: Event): """ Try to find a suitable location for each unvisited node, insert into the current travel_path and insert the location without violating load, time, travel distance constraints :return: """ if self.index_to_visit_empty(): return success_to_insert = True # Until none of the unvisited nodes can be inserted successfully while success_to_insert: success_to_insert = False # Get unvisited nodes ind_to_visit = np.array(copy.deepcopy(self.index_to_visit)) # Sort ind_to_visit by customer demand in descending order demand = np.zeros(len(ind_to_visit)) for i, ind in zip(range(len(ind_to_visit)), ind_to_visit): demand[i] = self.graph.nodes[ind].demand arg_index = np.argsort(demand)[::-1] ind_to_visit = ind_to_visit[arg_index] # Try to insert at least one from the ordered list for node_id in ind_to_visit: if stop_event.is_set(): # print('[insertion_procedure]: receive stop event') return best_insert_index = self.try_insert_on_path(node_id, stop_event) if best_insert_index is not None: self.travel_path.insert(best_insert_index, node_id) self.index_to_visit.remove(node_id) # print('[insertion_procedure]: success to insert %d(node id) in %d(index)' % (node_id, best_insert_index)) success_to_insert = True del demand del ind_to_visit if self.index_to_visit_empty(): print('[insertion_procedure]: success in insertion') self.total_travel_distance = Ant.cal_total_travel_distance(self.graph, self.travel_path) @staticmethod def local_search_once(graph: VrptwGraph, travel_path: list, travel_distance: float, i_start, stop_event: Event): """ Returns ------- list None, None, None """ # Find the locations of all depots in the path depot_index = [] for ind in range(len(travel_path)): if graph.nodes[travel_path[ind]].is_depot: depot_index.append(ind) # Divide self.travel_path into multiple segments, each segment starts # with depot and ends with depot, called route for i in range(i_start, len(depot_index)): for j in range(i + 1, len(depot_index)): if stop_event.is_set(): return None, None, None for start_a in range(depot_index[i - 1] + 1, depot_index[i]): for end_a in range(start_a, min(depot_index[i], start_a + 6)): for start_b in range(depot_index[j - 1] + 1, depot_index[j]): for end_b in range(start_b, min(depot_index[j], start_b + 6)): if start_a == end_a and start_b == end_b: continue # Create new travel_path changing swaping routes # a and b new_path = [] new_path.extend(travel_path[:start_a]) new_path.extend(travel_path[start_b:end_b + 1]) new_path.extend(travel_path[end_a:start_b]) new_path.extend(travel_path[start_a:end_a]) new_path.extend(travel_path[end_b + 1:]) depot_before_start_a = depot_index[i - 1] depot_before_start_b = depot_index[j - 1] + (end_b - start_b) - (end_a - start_a) + 1 if not graph.nodes[new_path[depot_before_start_b]].is_depot: raise RuntimeError('error') # Determine whether the changed route a is feasible success_route_a = False check_ant = Ant(graph, new_path[depot_before_start_a]) for ind in new_path[depot_before_start_a + 1:]: if check_ant.check_condition(ind): check_ant.move_to_next_index(ind) if graph.nodes[ind].is_depot: success_route_a = True break else: break check_ant.clear() del check_ant # Determine whether the changed route b is feasible success_route_b = False check_ant = Ant(graph, new_path[depot_before_start_b]) for ind in new_path[depot_before_start_b + 1:]: if check_ant.check_condition(ind): check_ant.move_to_next_index(ind) if graph.nodes[ind].is_depot: success_route_b = True break else: break check_ant.clear() del check_ant if success_route_a and success_route_b: new_path_distance = Ant.cal_total_travel_distance(graph, new_path) if new_path_distance < travel_distance: # print('success to search') # Delete one of the depots linked together in the path for temp_index in range(1, len(new_path)): if graph.nodes[new_path[temp_index]].is_depot and graph.nodes[new_path[temp_index - 1]].is_depot: new_path.pop(temp_index) break return new_path, new_path_distance, i else: new_path.clear() return None, None, None def local_search_procedure(self, stop_event: Event): """ Use cross to perform a local search on the current travel_path that has visited all nodes in the graph """ new_path = copy.deepcopy(self.travel_path) new_path_distance = self.total_travel_distance times = 100 count = 0 i_start = 1 while count < times: temp_path, temp_distance, temp_i = Ant.local_search_once(self.graph, new_path, new_path_distance, i_start, stop_event) if temp_path is not None: count += 1 del new_path, new_path_distance new_path = temp_path new_path_distance = temp_distance # set i_start i_start = (i_start + 1) % (new_path.count(0)-1) i_start = max(i_start, 1) else: break self.travel_path = new_path self.total_travel_distance = new_path_distance print('[local_search_procedure]: local search finished')