From 0e6ebb04fc63498bac25adef14826e305ee1f634 Mon Sep 17 00:00:00 2001 From: JonZhao <1044264932@qq.com> Date: Mon, 27 May 2019 20:24:59 +0800 Subject: =?UTF-8?q?=E4=BD=BF=E7=94=A8event=E6=9D=A5=E4=BD=9C=E4=B8=BA?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E7=BB=88=E6=AD=A2=E4=BF=A1=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ant.py | 37 ++++++------- main.py | 172 ++++++++++++++++++++++++++-------------------------------- vrptw_base.py | 13 +---- 3 files changed, 98 insertions(+), 124 deletions(-) diff --git a/ant.py b/ant.py index 2294377..5cd2ad9 100644 --- a/ant.py +++ b/ant.py @@ -2,6 +2,7 @@ import numpy as np import copy import random from vrptw_base import VrptwGraph +from threading import Event class Ant: @@ -110,7 +111,7 @@ class Ant: current_ind = next_ind return distance - def try_insert_on_path(self, node_id, what_to_do_list: list): + def try_insert_on_path(self, node_id, stop_event: Event): """ 尝试性地将node_id插入当前的travel_path中 插入的位置不能违反载重,时间,行驶距离的限制 @@ -125,10 +126,9 @@ class Ant: for insert_index in range(len(path)): - if len(what_to_do_list) > 0: - info = what_to_do_list[0] - if info.is_to_stop(): - return + if stop_event.is_set(): + print('[try_insert_on_path]: receive stop event') + return if self.graph.nodes[path[insert_index]].is_depot: continue @@ -151,10 +151,9 @@ class Ant: # 如果可以到node_id,则要保证vehicle可以行驶回到depot for next_ind in path[insert_index:]: - if len(what_to_do_list) > 0: - info = what_to_do_list[0] - if info.is_to_stop(): - return + if stop_event.is_set(): + print('[try_insert_on_path]: receive stop event') + return if check_ant.check_condition(next_ind): check_ant.move_to_next_index(next_ind) @@ -181,7 +180,7 @@ class Ant: path[i] = 0 return path - def insertion_procedure(self, what_to_do_list: list): + def insertion_procedure(self, stop_even: Event): """ 为每个未访问的结点尝试性地找到一个合适的位置,插入到当前的travel_path 插入的位置不能违反载重,时间,行驶距离的限制 @@ -201,19 +200,18 @@ class Ant: for node_id in ind_to_visit: - if len(what_to_do_list) > 0: - info = what_to_do_list[0] - if info.is_to_stop(): - return + if stop_even.is_set(): + print('[insertion_procedure]: receive stop event') + return - best_insert_index = self.try_insert_on_path(node_id, what_to_do_list) + best_insert_index = self.try_insert_on_path(node_id, stop_even) if best_insert_index is not None: self.travel_path.insert(best_insert_index, node_id) self.index_to_visit.remove(node_id) self.total_travel_distance = self.cal_total_travel_distance(self.travel_path) - def local_search_procedure(self, what_to_do_list): + def local_search_procedure(self, stop_event: Event): """ 对当前的已经访问完graph中所有节点的travel_path使用cross进行局部搜索 :return: @@ -230,10 +228,9 @@ class Ant: for i in range(1, len(depot_ind)): for j in range(i+1, len(depot_ind)): - if len(what_to_do_list) > 0: - info = what_to_do_list[0] - if info.is_to_stop(): - return + if stop_event.is_set(): + print('[local_search_procedure]: receive stop event') + return # 随机在两段route,各随机选择一段customer id,交换这两段customer id start_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1) diff --git a/main.py b/main.py index 30d8a8d..5e83ebd 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,11 @@ import numpy as np import random from vprtw_aco_figure import VrptwAcoFigure -from vrptw_base import VrptwGraph, VrptwMessage +from vrptw_base import VrptwGraph, PathMessage from ant import Ant -from threading import Thread +from threading import Thread, Event from queue import Queue from concurrent.futures import ThreadPoolExecutor -import copy class VrptwAco: @@ -31,7 +30,7 @@ class VrptwAco: # best path self.best_path_distance = None self.best_path = None - self.best_vehicle_num = self.graph.nnh_travel_path.count(0)-1 + self.best_vehicle_num = None self.whether_or_not_to_show_figure = False @@ -132,21 +131,21 @@ class VrptwAco: return index_to_visit[ind] @staticmethod - def new_active_ant(ant: Ant, local_search: bool, IN: np.numarray, q0: float, what_to_do_list: list): + def new_active_ant(ant: Ant, local_search: bool, IN: np.numarray, q0: float, stop_event: Event): print('[new_active_ant]: start, start_index %d' % ant.travel_path[0]) # 计算从当前位置可以达到的下一个位置 next_index_meet_constrains = ant.cal_next_index_meet_constrains() while len(next_index_meet_constrains) > 0: - if len(what_to_do_list) > 0: - info = what_to_do_list[0] - if info.is_to_stop(): - return + if stop_event.is_set(): + print('[new_active_ant]: receive stop event') + return index_num = len(next_index_meet_constrains) ready_time = np.zeros(index_num) due_time = np.zeros(index_num) + for i in range(index_num): ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time @@ -173,79 +172,69 @@ class VrptwAco: # 重新计算可选的下一个点 next_index_meet_constrains = ant.cal_next_index_meet_constrains() - ant.insertion_procedure(what_to_do_list) + # 如果走完所有的点了,需要回到depot + if ant.index_to_visit_empty(): + ant.move_to_next_index(0) + + # 对未访问的点进行插入,保证path是可行的 + ant.insertion_procedure(stop_event) # ant.index_to_visit_empty()==True就是feasible的意思 if local_search is True and ant.index_to_visit_empty(): - ant.local_search_procedure(what_to_do_list) + ant.local_search_procedure(stop_event) return ant.get_path_without_duplicated_depot() @staticmethod - def acs_time(new_graph: VrptwGraph, vehicle_num, ants_num, q0, what_to_do: Queue, what_is_found: Queue): + def acs_time(new_graph: VrptwGraph, vehicle_num, ants_num, q0, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event): print('[acs_time]: start, vehicle_num %d' % vehicle_num) # 初始化信息素矩阵 - global_best_path = [] + global_best_path = None global_best_distance = None ants_pool = ThreadPoolExecutor(ants_num) ants_thread = [] ants = [] - ants_what_to_do_list = [] while True: + + if stop_event.is_set(): + print('[acs_time]: receive stop event') + return + for k in range(ants_num): ant = Ant(new_graph, random.randint(0, vehicle_num - 1)) - what_to_do_list = [] - thread = ants_pool.submit(VrptwAco.new_active_ant, ant, True, np.zeros(new_graph.node_num), q0, what_to_do_list) + thread = ants_pool.submit(VrptwAco.new_active_ant, ant, True, np.zeros(new_graph.node_num), q0, stop_event) ants_thread.append(thread) ants.append(ant) - ants_what_to_do_list.append(what_to_do_list) # 这里可以使用result方法,等待线程跑完 for thread in ants_thread: - - while not what_to_do.empty(): - info = what_to_do.get() - if info.is_to_stop(): - print('[acs_time]: receive stop info') - # 顺便要让蚂蚁线程都结束 - for what_to_do_list in ants_what_to_do_list: - what_to_do_list.append(VrptwMessage('stop', None, None)) - ants_pool.shutdown() - return - else: - print('[acs_time]: receive global path info') - global_best_path, global_best_distance = info.get_path_info() - thread.result() # 判断蚂蚁找出来的路径是否是feasible的,并且比全局的路径要好 for ant in ants: + + if stop_event.is_set(): + print('[acs_time]: receive stop event') + return + # 如果比全局的路径要好,则要将该路径发送到macs中 - while not what_to_do.empty(): - info = what_to_do.get() - if info.is_to_stop(): - print('[acs_time]: receive stop info') - # 顺便要让蚂蚁线程都结束 - for what_to_do_list in ants_what_to_do_list: - what_to_do_list.append(VrptwMessage('stop', None, None)) - ants_pool.shutdown() - return - else: - print('[acs_time]: receive global path info') - global_best_path, global_best_distance = info.get_path_info() + while not global_path_queue.empty(): + info = global_path_queue.get() + print('[acs_time]: receive global path info') + global_best_path, global_best_distance = info.get_path_info() if ant.index_to_visit_empty() and ant.total_travel_distance < global_best_distance: print('[acs_time]: found a improved feasible path, send path info to macs') - what_is_found.put(VrptwMessage('path_info', ant.travel_path, ant.total_travel_distance)) + path_found_queue.put(PathMessage(ant.get_path_without_duplicated_depot(), ant.total_travel_distance)) # 在这里执行信息素的全局更新 new_graph.global_update_pheromone(global_best_path, global_best_distance) @staticmethod - def acs_vehicle(new_graph: VrptwGraph, vehicle_num, ants_num, q0, what_to_do: Queue, what_is_found: Queue): + def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event): print('[acs_vehicle]: start, vehicle_num %d' % vehicle_num) - global_best_path = [] + global_best_path = None global_best_distance = None # 使用邻近点算法初始化path 和distance @@ -259,37 +248,30 @@ class VrptwAco: ants_pool = ThreadPoolExecutor(ants_num) ants_thread = [] ants = [] - ants_what_to_do_list = [] IN = np.zeros(new_graph.node_num) while True: + + if stop_event.is_set(): + print('[acs_vehicle]: receive stop event') + return + for k in range(ants_num): ant = Ant(new_graph, random.randint(0, vehicle_num-1)) - what_to_do_list = [] - thread = ants_pool.submit(VrptwAco.new_active_ant, ant, True, IN, q0, what_to_do_list) + thread = ants_pool.submit(VrptwAco.new_active_ant, ant, True, IN, q0, stop_event) ants_thread.append(thread) ants.append(ant) - ants_what_to_do_list.append(what_to_do_list) # 这里可以使用result方法,等待线程跑完 for thread in ants_thread: - - while not what_to_do.empty(): - info = what_to_do.get() - if info.is_to_stop(): - # 顺便要让蚂蚁线程都结束 - print('[acs_vehicle]: receive stop info') - for what_to_do_list in ants_what_to_do_list: - what_to_do_list.append(VrptwMessage('stop', None, None)) - ants_pool.shutdown() - return - else: - print('[acs_vehicle]: receive global path info') - global_best_path, global_best_distance = info.get_path_info() - thread.result() for ant in ants: + + if stop_event.is_set(): + print('[acs_vehicle]: receive stop event') + return + index_to_visit = ant.index_to_visit IN[index_to_visit] = IN[index_to_visit]+1 path = ant.get_path_without_duplicated_depot() @@ -305,46 +287,44 @@ class VrptwAco: # 如果这一条路径是feasible的话,就要发到macs_vrptw中 if ant.index_to_visit_empty(): print('[acs_vehicle]: found a feasible path, send path info to macs') - what_is_found.put(VrptwMessage('path_info', ant.travel_path, ant.total_travel_distance)) + path_found_queue.put(PathMessage(ant.get_path_without_duplicated_depot(), ant.total_travel_distance)) # 更新new_graph中的信息素,global new_graph.global_update_pheromone(current_path, current_path_distance) - while not what_to_do.empty(): - info = what_to_do.get() - if info.is_to_stop(): - # 顺便要让蚂蚁线程都结束 - print('[acs_vehicle]: receive stop info') - for what_to_do_list in ants_what_to_do_list: - what_to_do_list.append(VrptwMessage('stop', None, None)) - ants_pool.shutdown() - return - else: - print('[acs_vehicle]: receive global path info') - global_best_path, global_best_distance = info.get_path_info() + while not global_path_queue.empty(): + info = global_path_queue.get() + print('[acs_vehicle]: receive global path info') + global_best_path, global_best_distance = info.get_path_info() new_graph.global_update_pheromone(global_best_path, global_best_distance) def multiple_ant_colony_system(self): - # 在这里需要两个队列,一个队列是macs告诉acs_time和acs_vehicle这两个线程,当前的best path是什么、告诉他们停止计算 - # 另外的一个线程就是用来,接收他们两个计算出来的比best path还要好的feasible path - time_what_to_do = Queue() - vehicle_what_to_do = Queue() - what_is_found = Queue() + # 在这里需要两个队列,time_what_to_do、vehicle_what_to_do, 用来告诉acs_time、acs_vehicle这两个线程,当前的best path是什么,或者让他们停止计算 + global_path_to_acs_time = Queue() + global_path_to_acs_vehicle = Queue() + + # 另外的一个队列, path_found_queue就是接收acs_time 和acs_vehicle计算出来的比best path还要好的feasible path + path_found_queue = Queue() + # 使用近邻点算法初始化 + self.best_path, self.best_path_distance = self.graph.nearest_neighbor_heuristic() + self.best_vehicle_num = self.best_path.count(0) - 1 while True: # acs_vehicle + stop_event = Event() graph_for_acs_vehicle = self.graph.construct_graph_with_duplicated_depot(self.best_vehicle_num-1, self.graph.init_pheromone_val) - acs_vehicle_thread = Thread(target=self.acs_vehicle, args=(graph_for_acs_vehicle, self.best_vehicle_num-1, - self.ants_num, self.q0, vehicle_what_to_do, - what_is_found)) + acs_vehicle_thread = Thread(target=VrptwAco.acs_vehicle, + args=(graph_for_acs_vehicle, self.best_vehicle_num-1, self.ants_num, self.q0, + global_path_to_acs_vehicle, path_found_queue, stop_event)) # acs_time graph_for_acs_time = self.graph.construct_graph_with_duplicated_depot(self.best_vehicle_num, self.graph.init_pheromone_val) - acs_time_thread = Thread(target=self.acs_time, args=(graph_for_acs_time, self.best_vehicle_num, - self.ants_num, self.q0, time_what_to_do, what_is_found)) + acs_time_thread = Thread(target=VrptwAco.acs_time, + args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0, + global_path_to_acs_time, path_found_queue, stop_event)) # 启动acs_vehicle_thread和acs_time_thread,当他们找到feasible、且是比best path好的路径时,就会发送到macs中来 print('[macs]: start acs_vehicle and acs_time') @@ -354,28 +334,32 @@ class VrptwAco: best_vehicle_num = self.best_vehicle_num while acs_vehicle_thread.is_alive() and acs_time_thread.is_alive(): - path_info = what_is_found.get() + + if path_found_queue.empty(): + continue + + path_info = path_found_queue.get() print('[macs]: receive found path info') found_path, found_path_distance = path_info.get_path_info() - # 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代 - # 向acs_time和acs_vehicle中发送停止信息 + # 如果找到的路径(feasible)的距离更短,则更新当前的最佳path的信息 if found_path_distance < self.best_path_distance: print('[macs]: distance of found path better than best path\'s') self.best_path = found_path self.best_vehicle_num = found_path.count(0) - 1 - self.best_path_distance = 0 + self.best_path_distance = found_path_distance + # 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代 + # 向acs_time和acs_vehicle中发送停止信息 if found_path.count(0)-1 < best_vehicle_num: print('[macs]: vehicle num of found path better than best path\'s') self.best_path = found_path self.best_vehicle_num = found_path.count(0)-1 - self.best_path_distance = 0 + self.best_path_distance = found_path_distance # 停止acs_time 和 acs_vehicle 两个线程 print('[macs]: send stop info to acs_time and acs_vehicle') - time_what_to_do.put(VrptwMessage(info_type='stop', path=None, distance=None)) - vehicle_what_to_do.put(VrptwMessage(info_type='stop', path=None, distance=None)) + stop_event.set() if __name__ == '__main__': diff --git a/vrptw_base.py b/vrptw_base.py index 5837e10..4c2e7e1 100644 --- a/vrptw_base.py +++ b/vrptw_base.py @@ -178,17 +178,10 @@ class VrptwGraph: return nearest_ind -class VrptwMessage: - def __init__(self, info_type, path, distance): - super() - if info_type != 'stop' and info_type != 'path_info': - raise RuntimeError('info_type should be: stop or path_info') - self.info_type = info_type - self.path = copy.deepcopy(path) +class PathMessage: + def __init__(self, path, distance): + self.path = path self.distance = distance def get_path_info(self): return self.path, self.distance - - def is_to_stop(self): - return self.info_type == 'stop' -- cgit v1.2.3