diff options
| -rw-r--r-- | ant.py | 186 | ||||
| -rw-r--r-- | multiple_ant_colony_system.py | 2 |
2 files changed, 103 insertions, 85 deletions
@@ -106,11 +106,12 @@ class Ant: return nearest_ind - def cal_total_travel_distance(self, travel_path): + @staticmethod + def cal_total_travel_distance(graph: VrptwGraph, travel_path): distance = 0 current_ind = travel_path[0] for next_ind in travel_path[1:]: - distance += self.graph.node_dist_mat[current_ind][next_ind] + distance += graph.node_dist_mat[current_ind][next_ind] current_ind = next_ind return distance @@ -216,104 +217,121 @@ class Ant: if best_insert_index is not None: self.travel_path.insert(best_insert_index, node_id) self.index_to_visit.remove(node_id) - print('[insertion_procedure]: success to insert %d(node id) in %d(index)' % (node_id, best_insert_index)) + # print('[insertion_procedure]: success to insert %d(node id) in %d(index)' % (node_id, best_insert_index)) success_to_insert = True del demand del ind_to_visit - self.total_travel_distance = self.cal_total_travel_distance(self.travel_path) - print('[insertion_procedure]: insertion finished') + if self.index_to_visit_empty(): + print('[insertion_procedure]: success in insertion') + + self.total_travel_distance = Ant.cal_total_travel_distance(self.graph, self.travel_path) + + @staticmethod + def local_search_once(graph: VrptwGraph, travel_path: list, travel_distance: float, i_start, stop_event: Event): - def local_search_procedure(self, stop_event: Event): - """ - 对当前的已经访问完graph中所有节点的travel_path使用cross进行局部搜索 - :return: - """ # 找出path中所有的depot的位置 depot_ind = [] - for ind in range(len(self.travel_path)): - if self.graph.nodes[self.travel_path[ind]].is_depot: + for ind in range(len(travel_path)): + if graph.nodes[travel_path[ind]].is_depot: depot_ind.append(ind) - new_path_travel_distance = self.total_travel_distance - new_path = self.travel_path - # 将self.travel_path分成多段,每段以depot开始,以depot结束,称为route - for i in range(1, len(depot_ind)): - for j in range(i+1, len(depot_ind)): + for i in range(i_start, len(depot_ind)): + for j in range(i + 1, len(depot_ind)): if stop_event.is_set(): - return + return None, None, None - position = [] - for start_a in range(depot_ind[i-1]+1, depot_ind[i]): - for end_a in range(start_a, min(depot_ind[i], start_a+6)): - for start_b in range(depot_ind[j-1]+1, depot_ind[j]): - for end_b in range(start_b, min(depot_ind[j], start_b+6)): + for start_a in range(depot_ind[i - 1] + 1, depot_ind[i]): + for end_a in range(start_a, min(depot_ind[i], start_a + 6)): + for start_b in range(depot_ind[j - 1] + 1, depot_ind[j]): + for end_b in range(start_b, min(depot_ind[j], start_b + 6)): if start_a == end_a and start_b == end_b: continue - position.append([start_a, end_a, start_b, end_b]) - - for posi in position: - start_a, end_a, start_b, end_b = posi - path = [] - path.extend(self.travel_path[:start_a]) - path.extend(self.travel_path[start_b:end_b+1]) - path.extend(self.travel_path[end_a:start_b]) - path.extend(self.travel_path[start_a:end_a]) - path.extend(self.travel_path[end_b+1:]) - - depot_before_start_a = depot_ind[i-1] - - depot_before_start_b = depot_ind[j-1] + (end_b-start_b) - (end_a-start_a) + 1 - if not self.graph.nodes[path[depot_before_start_b]].is_depot: - raise RuntimeError('error') - - # 判断发生改变的route a是否是feasible的 - success_route_a = False - check_ant = Ant(self.graph, path[depot_before_start_a]) - for ind in path[depot_before_start_a+1:]: - if check_ant.check_condition(ind): - check_ant.move_to_next_index(ind) - if self.graph.nodes[ind].is_depot: - success_route_a = True - break - else: - break - - check_ant.clear() - del check_ant - - # 判断发生改变的route b是否是feasible的 - success_route_b = False - check_ant = Ant(self.graph, path[depot_before_start_b]) - for ind in path[depot_before_start_b + 1:]: - if check_ant.check_condition(ind): - check_ant.move_to_next_index(ind) - if self.graph.nodes[ind].is_depot: - success_route_b = True - break - else: - break - check_ant.clear() - del check_ant - - if success_route_a and success_route_b: - total_travel_distance = self.cal_total_travel_distance(path) - if total_travel_distance < new_path_travel_distance: - # print('success to search') - new_path_travel_distance = total_travel_distance - new_path = path - print('[local_search_procedure]: found a path in local search, its distance is %f' % new_path_travel_distance) - else: - path.clear() - - # 判断new_path中是否存在连个连续的depot - for i in range(1, len(new_path)): - if self.graph.nodes[new_path[i]].is_depot and self.graph.nodes[new_path[i-1]].is_depot: - new_path.pop(i) + new_path = [] + new_path.extend(travel_path[:start_a]) + new_path.extend(travel_path[start_b:end_b + 1]) + new_path.extend(travel_path[end_a:start_b]) + new_path.extend(travel_path[start_a:end_a]) + new_path.extend(travel_path[end_b + 1:]) + + depot_before_start_a = depot_ind[i - 1] + + depot_before_start_b = depot_ind[j - 1] + (end_b - start_b) - (end_a - start_a) + 1 + if not graph.nodes[new_path[depot_before_start_b]].is_depot: + raise RuntimeError('error') + + # 判断发生改变的route a是否是feasible的 + success_route_a = False + check_ant = Ant(graph, new_path[depot_before_start_a]) + for ind in new_path[depot_before_start_a + 1:]: + if check_ant.check_condition(ind): + check_ant.move_to_next_index(ind) + if graph.nodes[ind].is_depot: + success_route_a = True + break + else: + break + + check_ant.clear() + del check_ant + + # 判断发生改变的route b是否是feasible的 + success_route_b = False + check_ant = Ant(graph, new_path[depot_before_start_b]) + for ind in new_path[depot_before_start_b + 1:]: + if check_ant.check_condition(ind): + check_ant.move_to_next_index(ind) + if graph.nodes[ind].is_depot: + success_route_b = True + break + else: + break + check_ant.clear() + del check_ant + + if success_route_a and success_route_b: + new_path_distance = Ant.cal_total_travel_distance(graph, new_path) + if new_path_distance < travel_distance: + # print('success to search') + + # 删除路径中连在一起的depot中的一个 + for temp_ind in range(1, len(new_path)): + if graph.nodes[new_path[temp_ind]].is_depot and graph.nodes[new_path[temp_ind - 1]].is_depot: + new_path.pop(temp_ind) + break + return new_path, new_path_distance, i + else: + new_path.clear() + + return None, None, None + + def local_search_procedure(self, stop_event: Event): + """ + 对当前的已经访问完graph中所有节点的travel_path使用cross进行局部搜索 + :return: + """ + new_path = copy.deepcopy(self.travel_path) + new_path_distance = self.total_travel_distance + times = 100 + count = 0 + i_start = 1 + while count < times: + temp_path, temp_distance, temp_i = Ant.local_search_once(self.graph, new_path, new_path_distance, i_start, stop_event) + if temp_path is not None: + count += 1 + + del new_path, new_path_distance + new_path = temp_path + new_path_distance = temp_distance + + # 设置i_start + i_start = (i_start + 1) % (new_path.count(0)-1) + i_start = max(i_start, 1) + else: break self.travel_path = new_path - self.total_travel_distance = new_path_travel_distance + self.total_travel_distance = new_path_distance print('[local_search_procedure]: local search finished')
\ No newline at end of file diff --git a/multiple_ant_colony_system.py b/multiple_ant_colony_system.py index b4e6f92..6dced28 100644 --- a/multiple_ant_colony_system.py +++ b/multiple_ant_colony_system.py @@ -203,7 +203,7 @@ class MultipleAntColonySystem: # 向macs发送计算得到的当前的最佳路径 if ant_best_travel_distance is not None and ant_best_travel_distance < global_best_distance: - print('[acs_time]: local search for global_path found a improved feasible path, send path info to macs') + print('[acs_time]: ants\' local search found a improved feasible path, send path info to macs') path_found_queue.put(PathMessage(ant_best_path, ant_best_travel_distance)) ants_thread.clear() |
