summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--basic_aco.py142
-rw-r--r--main.py433
-rw-r--r--multiple_ant_colony_system.py337
3 files changed, 487 insertions, 425 deletions
diff --git a/basic_aco.py b/basic_aco.py
new file mode 100644
index 0000000..3bdc24c
--- /dev/null
+++ b/basic_aco.py
@@ -0,0 +1,142 @@
+import numpy as np
+import random
+from vprtw_aco_figure import VrptwAcoFigure
+from vrptw_base import VrptwGraph, PathMessage
+from ant import Ant
+from threading import Thread
+from queue import Queue
+
+
+class BasicACO:
+ def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
+ super()
+ # graph 结点的位置、服务时间信息
+ self.graph = graph
+ # ants_num 蚂蚁数量
+ self.ants_num = ants_num
+ # max_iter 最大迭代次数
+ self.max_iter = max_iter
+ # vehicle_capacity 表示每辆车的最大载重
+ self.max_load = graph.vehicle_capacity
+ # 信息素强度
+ self.Q = 1
+ # alpha 信息素信息重要新
+ self.alpha = alpha
+ # beta 启发性信息重要性
+ self.beta = beta
+ # q0 表示直接选择概率最大的下一点的概率
+ self.q0 = 0.1
+ # best path
+ self.best_path_distance = None
+ self.best_path = None
+ self.best_vehicle_num = None
+
+ self.whether_or_not_to_show_figure = True
+
+ def run_basic_aco(self):
+ # 开启一个线程来跑_basic_aco,使用主线程来绘图
+ path_queue_for_figure = Queue()
+ basic_aco_thread = Thread(target=self._basic_aco, args=(path_queue_for_figure,))
+ basic_aco_thread.start()
+
+ # 是否要展示figure
+ if self.whether_or_not_to_show_figure:
+ figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
+ figure.run()
+ basic_aco_thread.join()
+
+ # 传入None作为结束标志
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(None, None))
+
+ def _basic_aco(self, path_queue_for_figure: Queue):
+ """
+ 最基本的蚁群算法
+ :return:
+ """
+ # 最大迭代次数
+ for iter in range(self.max_iter):
+
+ # 为每只蚂蚁设置当前车辆负载,当前旅行距离,当前时间
+ ants = list(Ant(self.graph) for _ in range(self.ants_num))
+ for k in range(self.ants_num):
+
+ # 蚂蚁需要访问完所有的客户
+ while not ants[k].index_to_visit_empty():
+ next_index = self.select_next_index(ants[k])
+ # 判断加入该位置后,是否还满足约束条件, 如果不满足,则再选择一次,然后再进行判断
+ if not ants[k].check_condition(next_index):
+ next_index = self.select_next_index(ants[k])
+ if not ants[k].check_condition(next_index):
+ next_index = 0
+
+ # 更新蚂蚁路径
+ ants[k].move_to_next_index(next_index)
+ self.graph.local_update_pheromone(ants[k].current_index, next_index)
+
+ # 最终回到0位置
+ ants[k].move_to_next_index(0)
+ self.graph.local_update_pheromone(ants[k].current_index, 0)
+
+ # 计算所有蚂蚁的路径长度
+ paths_distance = np.array([ant.total_travel_distance for ant in ants])
+
+ # 记录当前的最佳路径
+ best_index = np.argmin(paths_distance)
+ if self.best_path is None:
+ self.best_path = ants[int(best_index)].travel_path
+ self.best_path_distance = paths_distance[best_index]
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ elif paths_distance[best_index] < self.best_path_distance:
+ self.best_path = ants[int(best_index)].travel_path
+ self.best_path_distance = paths_distance[best_index]
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ print('[iteration %d]: best distance %f' % (iter, self.best_path_distance))
+ # 更新信息素表
+ self.graph.global_update_pheromone(self.best_path, self.best_path_distance)
+
+ def select_next_index(self, ant):
+ """
+ 选择下一个结点
+ :param ant:
+ :return:
+ """
+ current_index = ant.current_index
+ index_to_visit = ant.index_to_visit
+
+ transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \
+ np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)
+
+ if np.random.rand() < self.q0:
+ max_prob_index = np.argmax(transition_prob)
+ next_index = index_to_visit[max_prob_index]
+ else:
+ # 使用轮盘赌算法
+ next_index = BasicACO.stochastic_accept(index_to_visit, transition_prob)
+ return next_index
+
+ @staticmethod
+ def stochastic_accept(index_to_visit, transition_prob):
+ """
+ 轮盘赌
+ :param index_to_visit: a list of N index (list or tuple)
+ :param transition_prob:
+ :return: selected index
+ """
+ # calculate N and max fitness value
+ N = len(index_to_visit)
+
+ # normalize
+ sum_tran_prob = np.sum(transition_prob)
+ norm_transition_prob = transition_prob/sum_tran_prob
+
+ # select: O(1)
+ while True:
+ # randomly select an individual with uniform probability
+ ind = int(N * random.random())
+ if random.random() <= norm_transition_prob[ind]:
+ return index_to_visit[ind]
diff --git a/main.py b/main.py
index afa3d95..efb4951 100644
--- a/main.py
+++ b/main.py
@@ -1,431 +1,14 @@
-import numpy as np
-import random
-from vprtw_aco_figure import VrptwAcoFigure
-from vrptw_base import VrptwGraph, PathMessage
-from ant import Ant
-from threading import Thread, Event
-from queue import Queue
-from concurrent.futures import ThreadPoolExecutor
-import copy
-
-
-class VrptwAco:
- def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
- super()
- # graph 结点的位置、服务时间信息
- self.graph = graph
- # ants_num 蚂蚁数量
- self.ants_num = ants_num
- # max_iter 最大迭代次数
- self.max_iter = max_iter
- # vehicle_capacity 表示每辆车的最大载重
- self.max_load = graph.vehicle_capacity
- # 信息素强度
- self.Q = 1
- # alpha 信息素信息重要新
- self.alpha = alpha
- # beta 启发性信息重要性
- self.beta = beta
- # q0 表示直接选择概率最大的下一点的概率
- self.q0 = 0.1
- # best path
- self.best_path_distance = None
- self.best_path = None
- self.best_vehicle_num = None
-
- self.whether_or_not_to_show_figure = True
-
- def run_basic_aco(self):
- # 开启一个线程来跑_basic_aco,使用主线程来绘图
- path_queue_for_figure = Queue()
- basic_aco_thread = Thread(target=self._basic_aco, args=(path_queue_for_figure,))
- basic_aco_thread.start()
-
- # 是否要展示figure
- if self.whether_or_not_to_show_figure:
- figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
- figure.run()
- basic_aco_thread.join()
-
- # 传入None作为结束标志
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(None, None))
-
- def _basic_aco(self, path_queue_for_figure: Queue):
- """
- 最基本的蚁群算法
- :return:
- """
- # 最大迭代次数
- for iter in range(self.max_iter):
-
- # 为每只蚂蚁设置当前车辆负载,当前旅行距离,当前时间
- ants = list(Ant(self.graph) for _ in range(self.ants_num))
- for k in range(self.ants_num):
-
- # 蚂蚁需要访问完所有的客户
- while not ants[k].index_to_visit_empty():
- next_index = self.select_next_index(ants[k])
- # 判断加入该位置后,是否还满足约束条件, 如果不满足,则再选择一次,然后再进行判断
- if not ants[k].check_condition(next_index):
- next_index = self.select_next_index(ants[k])
- if not ants[k].check_condition(next_index):
- next_index = 0
-
- # 更新蚂蚁路径
- ants[k].move_to_next_index(next_index)
- self.graph.local_update_pheromone(ants[k].current_index, next_index)
-
- # 最终回到0位置
- ants[k].move_to_next_index(0)
- self.graph.local_update_pheromone(ants[k].current_index, 0)
-
- # 计算所有蚂蚁的路径长度
- paths_distance = np.array([ant.total_travel_distance for ant in ants])
-
- # 记录当前的最佳路径
- best_index = np.argmin(paths_distance)
- if self.best_path is None:
- self.best_path = ants[int(best_index)].travel_path
- self.best_path_distance = paths_distance[best_index]
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
-
- elif paths_distance[best_index] < self.best_path_distance:
- self.best_path = ants[int(best_index)].travel_path
- self.best_path_distance = paths_distance[best_index]
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
-
- print('[iteration %d]: best distance %f' % (iter, self.best_path_distance))
- # 更新信息素表
- self.graph.global_update_pheromone(self.best_path, self.best_path_distance)
-
- def select_next_index(self, ant):
- """
- 选择下一个结点
- :param ant:
- :return:
- """
- current_index = ant.current_index
- index_to_visit = ant.index_to_visit
-
- transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \
- np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)
-
- if np.random.rand() < self.q0:
- max_prob_index = np.argmax(transition_prob)
- next_index = index_to_visit[max_prob_index]
- else:
- # 使用轮盘赌算法
- next_index = VrptwAco.stochastic_accept(index_to_visit, transition_prob)
- return next_index
-
- @staticmethod
- def stochastic_accept(index_to_visit, transition_prob):
- """
- 轮盘赌
- :param index_to_visit: a list of N index (list or tuple)
- :param transition_prob:
- :return: selected index
- """
- # calculate N and max fitness value
- N = len(index_to_visit)
-
- # normalize
- sum_tran_prob = np.sum(transition_prob)
- norm_transition_prob = transition_prob/sum_tran_prob
-
- # select: O(1)
- while True:
- # randomly select an individual with uniform probability
- ind = int(N * random.random())
- if random.random() <= norm_transition_prob[ind]:
- return index_to_visit[ind]
-
- @staticmethod
- def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool, IN: np.numarray, q0: float, stop_event: Event):
- # print('[new_active_ant]: start, start_index %d' % ant.travel_path[0])
-
- # 在new_active_ant中,最多可以使用vehicle_num个车,即最多可以包含vehicle_num+1个depot结点,由于出发结点用掉了一个,所以只剩下vehicle个depot
- unused_depot_count = vehicle_num
-
- # 如果还有未访问的结点,并且还可以回到depot中
- while not ant.index_to_visit_empty() and unused_depot_count > 0:
- if stop_event.is_set():
- # print('[new_active_ant]: receive stop event')
- return
-
- # 计算所有满足载重等限制的下一个结点
- next_index_meet_constrains = ant.cal_next_index_meet_constrains()
-
- # 如果没有满足限制的下一个结点,则回到depot中
- if len(next_index_meet_constrains) == 0:
- ant.move_to_next_index(0)
- unused_depot_count -= 1
- continue
-
- # 开始计算满足限制的下一个结点,选择各个结点的概率
- index_num = len(next_index_meet_constrains)
- ready_time = np.zeros(index_num)
- due_time = np.zeros(index_num)
-
- for i in range(index_num):
- ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time
- due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time
-
- delivery_time = np.array([max(i, j) for i, j in zip(ant.vehicle_travel_time + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)])
-
- delta_time = delivery_time - ant.vehicle_travel_time
- distance = delta_time * (due_time - ant.vehicle_travel_time)
-
- distance = np.array([max(1.0, j) for j in distance-IN[next_index_meet_constrains]])
- closeness = 1/distance
-
- # 按照概率直接选择closeness最大的结点
- if np.random.rand() < q0:
- max_prob_index = np.argmax(closeness)
- next_index = next_index_meet_constrains[max_prob_index]
- else:
- # 使用轮盘赌算法
- next_index = VrptwAco.stochastic_accept(next_index_meet_constrains, closeness)
-
- # 更新信息素矩阵
- ant.graph.local_update_pheromone(ant.current_index, next_index)
- ant.move_to_next_index(next_index)
-
- # 如果走完所有的点了,需要回到depot
- if ant.index_to_visit_empty():
- ant.move_to_next_index(0)
-
- # 对未访问的点进行插入,保证path是可行的
- ant.insertion_procedure(stop_event)
-
- # ant.index_to_visit_empty()==True就是feasible的意思
- if local_search is True and ant.index_to_visit_empty():
- ant.local_search_procedure(stop_event)
-
- @staticmethod
- def acs_time(new_graph: VrptwGraph, vehicle_num, ants_num, q0, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
- # 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
- # vehicle_num设置为与当前的best_path一致
- print('[acs_time]: start, vehicle_num %d' % vehicle_num)
- # 初始化信息素矩阵
- global_best_path = None
- global_best_distance = None
- ants_pool = ThreadPoolExecutor(ants_num)
- ants_thread = []
- ants = []
- while True:
-
- if stop_event.is_set():
- print('[acs_time]: receive stop event')
- return
-
- for k in range(ants_num):
- ant = Ant(new_graph, 0)
- thread = ants_pool.submit(VrptwAco.new_active_ant, ant, vehicle_num, True, np.zeros(new_graph.node_num), q0, stop_event)
-
- ants_thread.append(thread)
- ants.append(ant)
-
- # 这里可以使用result方法,等待线程跑完
- for thread in ants_thread:
- thread.result()
-
- # 判断蚂蚁找出来的路径是否是feasible的,并且比全局的路径要好
- for ant in ants:
-
- if stop_event.is_set():
- print('[acs_time]: receive stop event')
- return
-
- # 如果比全局的路径要好,则要将该路径发送到macs中
- if not global_path_queue.empty():
- info = global_path_queue.get()
- while not global_path_queue.empty():
- info = global_path_queue.get()
- print('[acs_time]: receive global path info')
- global_best_path, global_best_distance = info.get_path_info()
-
- if ant.index_to_visit_empty() and ant.total_travel_distance < global_best_distance:
- print('[acs_time]: found a improved feasible path, send path info to macs')
- path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
-
- # 在这里执行信息素的全局更新
- new_graph.global_update_pheromone(global_best_path, global_best_distance)
-
- @staticmethod
- def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
- # 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
- # vehicle_num设置为比当前的best_path少一个
- print('[acs_vehicle]: start, vehicle_num %d' % vehicle_num)
- global_best_path = None
- global_best_distance = None
-
- # 使用邻近点算法初始化path 和distance
- current_path, current_path_distance = new_graph.nearest_neighbor_heuristic()
-
- # 找出当前path中未访问的结点
- current_index_to_visit = list(range(new_graph.node_num))
- for ind in set(current_path):
- current_index_to_visit.remove(ind)
-
- ants_pool = ThreadPoolExecutor(ants_num)
- ants_thread = []
- ants = []
- IN = np.zeros(new_graph.node_num)
- while True:
-
- if stop_event.is_set():
- print('[acs_vehicle]: receive stop event')
- return
-
- for k in range(ants_num):
- ant = Ant(new_graph, 0)
- thread = ants_pool.submit(VrptwAco.new_active_ant, ant, vehicle_num, True, IN, q0, stop_event)
-
- ants_thread.append(thread)
- ants.append(ant)
-
- # 这里可以使用result方法,等待线程跑完
- for thread in ants_thread:
- thread.result()
-
- for ant in ants:
-
- if stop_event.is_set():
- print('[acs_vehicle]: receive stop event')
- return
-
- index_to_visit = copy.deepcopy(ant.index_to_visit)
- IN[index_to_visit] = IN[index_to_visit]+1
-
- # 判断蚂蚁找出来的路径是否比current_path,能使用vehicle_num辆车访问到更多的结点
- if len(index_to_visit) < len(current_index_to_visit):
- current_path = copy.deepcopy(ant.travel_path)
- current_index_to_visit = index_to_visit
- current_path_distance = ant.total_travel_distance
- # 并且将IN设置为0
- IN = np.zeros(new_graph.node_num)
-
- # 如果这一条路径是feasible的话,就要发到macs_vrptw中
- if ant.index_to_visit_empty():
- print('[acs_vehicle]: found a feasible path, send path info to macs')
- path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
-
- # 更新new_graph中的信息素,global
- new_graph.global_update_pheromone(current_path, current_path_distance)
-
- if not global_path_queue.empty():
- info = global_path_queue.get()
- while not global_path_queue.empty():
- info = global_path_queue.get()
- print('[acs_vehicle]: receive global path info')
- global_best_path, global_best_distance = info.get_path_info()
-
- new_graph.global_update_pheromone(global_best_path, global_best_distance)
-
- def run_multiple_ant_colony_system(self):
- # _multiple_ant_colony_system,使用主线程来绘图
- path_queue_for_figure = Queue()
- multiple_ant_colony_system_thread = Thread(target=self._multiple_ant_colony_system, args=(path_queue_for_figure,))
- multiple_ant_colony_system_thread.start()
-
- # 是否要展示figure
- if self.whether_or_not_to_show_figure:
- figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
- figure.run()
- multiple_ant_colony_system_thread.join()
-
- # 传入None作为结束标志
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(None, None))
-
- def _multiple_ant_colony_system(self, path_queue_for_figure: Queue):
- # 在这里需要两个队列,time_what_to_do、vehicle_what_to_do, 用来告诉acs_time、acs_vehicle这两个线程,当前的best path是什么,或者让他们停止计算
- global_path_to_acs_time = Queue()
- global_path_to_acs_vehicle = Queue()
-
- # 另外的一个队列, path_found_queue就是接收acs_time 和acs_vehicle计算出来的比best path还要好的feasible path
- path_found_queue = Queue()
-
- # 使用近邻点算法初始化
- self.best_path, self.best_path_distance = self.graph.nearest_neighbor_heuristic()
- self.best_vehicle_num = self.best_path.count(0) - 1
-
- while True:
- # 当前best path的信息
- global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
- global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
-
- # acs_vehicle
- stop_event = Event()
- graph_for_acs_vehicle = self.graph.copy(self.graph.init_pheromone_val)
- acs_vehicle_thread = Thread(target=VrptwAco.acs_vehicle,
- args=(graph_for_acs_vehicle, self.best_vehicle_num-1, self.ants_num, self.q0,
- global_path_to_acs_vehicle, path_found_queue, stop_event))
-
- # acs_time
- graph_for_acs_time = self.graph.copy(self.graph.init_pheromone_val)
- acs_time_thread = Thread(target=VrptwAco.acs_time,
- args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0,
- global_path_to_acs_time, path_found_queue, stop_event))
-
- # 启动acs_vehicle_thread和acs_time_thread,当他们找到feasible、且是比best path好的路径时,就会发送到macs中来
- print('[macs]: start acs_vehicle and acs_time')
- acs_vehicle_thread.start()
- acs_time_thread.start()
-
- best_vehicle_num = self.best_vehicle_num
-
- while acs_vehicle_thread.is_alive() and acs_time_thread.is_alive():
-
- if path_found_queue.empty():
- continue
-
- path_info = path_found_queue.get()
- print('[macs]: receive found path info')
- found_path, found_path_distance = path_info.get_path_info()
-
- # 如果找到的路径(feasible)的距离更短,则更新当前的最佳path的信息
- if found_path_distance < self.best_path_distance:
- print('-' * 50)
- print('[macs]: distance of found path (%f) better than best path\'s (%f)' % (found_path_distance, self.best_path_distance))
- print('-' * 50)
- self.best_path = found_path
- self.best_vehicle_num = found_path.count(0) - 1
- self.best_path_distance = found_path_distance
-
- # 如果需要绘制图形,则要找到的best path发送给绘图程序
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
-
- # 通知acs_vehicle和acs_time两个线程,当前找到的best_path和best_path_distance
- global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
- global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
-
- # 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代
- # 向acs_time和acs_vehicle中发送停止信息
- if found_path.count(0)-1 < best_vehicle_num:
- print('-' * 50)
- print('[macs]: vehicle num of found path (%d) better than best path\'s (%d)' % (found_path.count(0)-1, best_vehicle_num))
- print('-' * 50)
- self.best_path = found_path
- self.best_vehicle_num = found_path.count(0)-1
- self.best_path_distance = found_path_distance
-
- if self.whether_or_not_to_show_figure:
- path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
-
- # 停止acs_time 和 acs_vehicle 两个线程
- print('[macs]: send stop info to acs_time and acs_vehicle')
- stop_event.set()
+from vrptw_base import VrptwGraph
+from basic_aco import BasicACO
+from multiple_ant_colony_system import MultipleAntColonySystem
if __name__ == '__main__':
file_path = './solomon-100/c101.txt'
graph = VrptwGraph(file_path)
- vrptw = VrptwAco(graph)
- vrptw.run_multiple_ant_colony_system()
+ # vrptw = BasicACO(graph)
+ # vrptw.run_basic_aco()
+
+ macs = MultipleAntColonySystem(graph)
+ macs.run_multiple_ant_colony_system()
diff --git a/multiple_ant_colony_system.py b/multiple_ant_colony_system.py
new file mode 100644
index 0000000..b292072
--- /dev/null
+++ b/multiple_ant_colony_system.py
@@ -0,0 +1,337 @@
+import numpy as np
+import random
+from vprtw_aco_figure import VrptwAcoFigure
+from vrptw_base import VrptwGraph, PathMessage
+from ant import Ant
+from threading import Thread, Event
+from queue import Queue
+from concurrent.futures import ThreadPoolExecutor
+import copy
+
+
+class MultipleAntColonySystem:
+ def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
+ super()
+ # graph 结点的位置、服务时间信息
+ self.graph = graph
+ # ants_num 蚂蚁数量
+ self.ants_num = ants_num
+ # max_iter 最大迭代次数
+ self.max_iter = max_iter
+ # vehicle_capacity 表示每辆车的最大载重
+ self.max_load = graph.vehicle_capacity
+ # 信息素强度
+ self.Q = 1
+ # alpha 信息素信息重要新
+ self.alpha = alpha
+ # beta 启发性信息重要性
+ self.beta = beta
+ # q0 表示直接选择概率最大的下一点的概率
+ self.q0 = 0.1
+ # best path
+ self.best_path_distance = None
+ self.best_path = None
+ self.best_vehicle_num = None
+
+ self.whether_or_not_to_show_figure = True
+
+ @staticmethod
+ def stochastic_accept(index_to_visit, transition_prob):
+ """
+ 轮盘赌
+ :param index_to_visit: a list of N index (list or tuple)
+ :param transition_prob:
+ :return: selected index
+ """
+ # calculate N and max fitness value
+ N = len(index_to_visit)
+
+ # normalize
+ sum_tran_prob = np.sum(transition_prob)
+ norm_transition_prob = transition_prob/sum_tran_prob
+
+ # select: O(1)
+ while True:
+ # randomly select an individual with uniform probability
+ ind = int(N * random.random())
+ if random.random() <= norm_transition_prob[ind]:
+ return index_to_visit[ind]
+
+ @staticmethod
+ def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool, IN: np.numarray, q0: float, stop_event: Event):
+ # print('[new_active_ant]: start, start_index %d' % ant.travel_path[0])
+
+ # 在new_active_ant中,最多可以使用vehicle_num个车,即最多可以包含vehicle_num+1个depot结点,由于出发结点用掉了一个,所以只剩下vehicle个depot
+ unused_depot_count = vehicle_num
+
+ # 如果还有未访问的结点,并且还可以回到depot中
+ while not ant.index_to_visit_empty() and unused_depot_count > 0:
+ if stop_event.is_set():
+ # print('[new_active_ant]: receive stop event')
+ return
+
+ # 计算所有满足载重等限制的下一个结点
+ next_index_meet_constrains = ant.cal_next_index_meet_constrains()
+
+ # 如果没有满足限制的下一个结点,则回到depot中
+ if len(next_index_meet_constrains) == 0:
+ ant.move_to_next_index(0)
+ unused_depot_count -= 1
+ continue
+
+ # 开始计算满足限制的下一个结点,选择各个结点的概率
+ index_num = len(next_index_meet_constrains)
+ ready_time = np.zeros(index_num)
+ due_time = np.zeros(index_num)
+
+ for i in range(index_num):
+ ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time
+ due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time
+
+ delivery_time = np.array([max(i, j) for i, j in zip(ant.vehicle_travel_time + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)])
+
+ delta_time = delivery_time - ant.vehicle_travel_time
+ distance = delta_time * (due_time - ant.vehicle_travel_time)
+
+ distance = np.array([max(1.0, j) for j in distance-IN[next_index_meet_constrains]])
+ closeness = 1/distance
+
+ # 按照概率直接选择closeness最大的结点
+ if np.random.rand() < q0:
+ max_prob_index = np.argmax(closeness)
+ next_index = next_index_meet_constrains[max_prob_index]
+ else:
+ # 使用轮盘赌算法
+ next_index = MultipleAntColonySystem.stochastic_accept(next_index_meet_constrains, closeness)
+
+ # 更新信息素矩阵
+ ant.graph.local_update_pheromone(ant.current_index, next_index)
+ ant.move_to_next_index(next_index)
+
+ # 如果走完所有的点了,需要回到depot
+ if ant.index_to_visit_empty():
+ ant.move_to_next_index(0)
+
+ # 对未访问的点进行插入,保证path是可行的
+ ant.insertion_procedure(stop_event)
+
+ # ant.index_to_visit_empty()==True就是feasible的意思
+ if local_search is True and ant.index_to_visit_empty():
+ ant.local_search_procedure(stop_event)
+
+ @staticmethod
+ def acs_time(new_graph: VrptwGraph, vehicle_num, ants_num, q0, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+ # 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
+ # vehicle_num设置为与当前的best_path一致
+ print('[acs_time]: start, vehicle_num %d' % vehicle_num)
+ # 初始化信息素矩阵
+ global_best_path = None
+ global_best_distance = None
+ ants_pool = ThreadPoolExecutor(ants_num)
+ ants_thread = []
+ ants = []
+ while True:
+
+ if stop_event.is_set():
+ print('[acs_time]: receive stop event')
+ return
+
+ for k in range(ants_num):
+ ant = Ant(new_graph, 0)
+ thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True, np.zeros(new_graph.node_num), q0, stop_event)
+
+ ants_thread.append(thread)
+ ants.append(ant)
+
+ # 这里可以使用result方法,等待线程跑完
+ for thread in ants_thread:
+ thread.result()
+
+ # 判断蚂蚁找出来的路径是否是feasible的,并且比全局的路径要好
+ for ant in ants:
+
+ if stop_event.is_set():
+ print('[acs_time]: receive stop event')
+ return
+
+ # 如果比全局的路径要好,则要将该路径发送到macs中
+ if not global_path_queue.empty():
+ info = global_path_queue.get()
+ while not global_path_queue.empty():
+ info = global_path_queue.get()
+ print('[acs_time]: receive global path info')
+ global_best_path, global_best_distance = info.get_path_info()
+
+ if ant.index_to_visit_empty() and ant.total_travel_distance < global_best_distance:
+ print('[acs_time]: found a improved feasible path, send path info to macs')
+ path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
+
+ # 在这里执行信息素的全局更新
+ new_graph.global_update_pheromone(global_best_path, global_best_distance)
+
+ @staticmethod
+ def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+ # 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
+ # vehicle_num设置为比当前的best_path少一个
+ print('[acs_vehicle]: start, vehicle_num %d' % vehicle_num)
+ global_best_path = None
+ global_best_distance = None
+
+ # 使用邻近点算法初始化path 和distance
+ current_path, current_path_distance = new_graph.nearest_neighbor_heuristic()
+
+ # 找出当前path中未访问的结点
+ current_index_to_visit = list(range(new_graph.node_num))
+ for ind in set(current_path):
+ current_index_to_visit.remove(ind)
+
+ ants_pool = ThreadPoolExecutor(ants_num)
+ ants_thread = []
+ ants = []
+ IN = np.zeros(new_graph.node_num)
+ while True:
+
+ if stop_event.is_set():
+ print('[acs_vehicle]: receive stop event')
+ return
+
+ for k in range(ants_num):
+ ant = Ant(new_graph, 0)
+ thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True, IN, q0, stop_event)
+
+ ants_thread.append(thread)
+ ants.append(ant)
+
+ # 这里可以使用result方法,等待线程跑完
+ for thread in ants_thread:
+ thread.result()
+
+ for ant in ants:
+
+ if stop_event.is_set():
+ print('[acs_vehicle]: receive stop event')
+ return
+
+ index_to_visit = copy.deepcopy(ant.index_to_visit)
+ IN[index_to_visit] = IN[index_to_visit]+1
+
+ # 判断蚂蚁找出来的路径是否比current_path,能使用vehicle_num辆车访问到更多的结点
+ if len(index_to_visit) < len(current_index_to_visit):
+ current_path = copy.deepcopy(ant.travel_path)
+ current_index_to_visit = index_to_visit
+ current_path_distance = ant.total_travel_distance
+ # 并且将IN设置为0
+ IN = np.zeros(new_graph.node_num)
+
+ # 如果这一条路径是feasible的话,就要发到macs_vrptw中
+ if ant.index_to_visit_empty():
+ print('[acs_vehicle]: found a feasible path, send path info to macs')
+ path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
+
+ # 更新new_graph中的信息素,global
+ new_graph.global_update_pheromone(current_path, current_path_distance)
+
+ if not global_path_queue.empty():
+ info = global_path_queue.get()
+ while not global_path_queue.empty():
+ info = global_path_queue.get()
+ print('[acs_vehicle]: receive global path info')
+ global_best_path, global_best_distance = info.get_path_info()
+
+ new_graph.global_update_pheromone(global_best_path, global_best_distance)
+
+ def run_multiple_ant_colony_system(self):
+ # _multiple_ant_colony_system,使用主线程来绘图
+ path_queue_for_figure = Queue()
+ multiple_ant_colony_system_thread = Thread(target=self._multiple_ant_colony_system, args=(path_queue_for_figure,))
+ multiple_ant_colony_system_thread.start()
+
+ # 是否要展示figure
+ if self.whether_or_not_to_show_figure:
+ figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
+ figure.run()
+ multiple_ant_colony_system_thread.join()
+
+ # 传入None作为结束标志
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(None, None))
+
+ def _multiple_ant_colony_system(self, path_queue_for_figure: Queue):
+ # 在这里需要两个队列,time_what_to_do、vehicle_what_to_do, 用来告诉acs_time、acs_vehicle这两个线程,当前的best path是什么,或者让他们停止计算
+ global_path_to_acs_time = Queue()
+ global_path_to_acs_vehicle = Queue()
+
+ # 另外的一个队列, path_found_queue就是接收acs_time 和acs_vehicle计算出来的比best path还要好的feasible path
+ path_found_queue = Queue()
+
+ # 使用近邻点算法初始化
+ self.best_path, self.best_path_distance = self.graph.nearest_neighbor_heuristic()
+ self.best_vehicle_num = self.best_path.count(0) - 1
+
+ while True:
+ # 当前best path的信息
+ global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
+ global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
+
+ # acs_vehicle
+ stop_event = Event()
+ graph_for_acs_vehicle = self.graph.copy(self.graph.init_pheromone_val)
+ acs_vehicle_thread = Thread(target=MultipleAntColonySystem.acs_vehicle,
+ args=(graph_for_acs_vehicle, self.best_vehicle_num-1, self.ants_num, self.q0,
+ global_path_to_acs_vehicle, path_found_queue, stop_event))
+
+ # acs_time
+ graph_for_acs_time = self.graph.copy(self.graph.init_pheromone_val)
+ acs_time_thread = Thread(target=MultipleAntColonySystem.acs_time,
+ args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0,
+ global_path_to_acs_time, path_found_queue, stop_event))
+
+ # 启动acs_vehicle_thread和acs_time_thread,当他们找到feasible、且是比best path好的路径时,就会发送到macs中来
+ print('[macs]: start acs_vehicle and acs_time')
+ acs_vehicle_thread.start()
+ acs_time_thread.start()
+
+ best_vehicle_num = self.best_vehicle_num
+
+ while acs_vehicle_thread.is_alive() and acs_time_thread.is_alive():
+
+ if path_found_queue.empty():
+ continue
+
+ path_info = path_found_queue.get()
+ print('[macs]: receive found path info')
+ found_path, found_path_distance = path_info.get_path_info()
+
+ # 如果找到的路径(feasible)的距离更短,则更新当前的最佳path的信息
+ if found_path_distance < self.best_path_distance:
+ print('-' * 50)
+ print('[macs]: distance of found path (%f) better than best path\'s (%f)' % (found_path_distance, self.best_path_distance))
+ print('-' * 50)
+ self.best_path = found_path
+ self.best_vehicle_num = found_path.count(0) - 1
+ self.best_path_distance = found_path_distance
+
+ # 如果需要绘制图形,则要找到的best path发送给绘图程序
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ # 通知acs_vehicle和acs_time两个线程,当前找到的best_path和best_path_distance
+ global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
+ global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
+
+ # 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代
+ # 向acs_time和acs_vehicle中发送停止信息
+ if found_path.count(0)-1 < best_vehicle_num:
+ print('-' * 50)
+ print('[macs]: vehicle num of found path (%d) better than best path\'s (%d)' % (found_path.count(0)-1, best_vehicle_num))
+ print('-' * 50)
+ self.best_path = found_path
+ self.best_vehicle_num = found_path.count(0)-1
+ self.best_path_distance = found_path_distance
+
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ # 停止acs_time 和 acs_vehicle 两个线程
+ print('[macs]: send stop info to acs_time and acs_vehicle')
+ stop_event.set()