summaryrefslogtreecommitdiffstats
path: root/basic_aco.py
diff options
context:
space:
mode:
Diffstat (limited to 'basic_aco.py')
-rw-r--r--basic_aco.py142
1 files changed, 142 insertions, 0 deletions
diff --git a/basic_aco.py b/basic_aco.py
new file mode 100644
index 0000000..3bdc24c
--- /dev/null
+++ b/basic_aco.py
@@ -0,0 +1,142 @@
+import numpy as np
+import random
+from vprtw_aco_figure import VrptwAcoFigure
+from vrptw_base import VrptwGraph, PathMessage
+from ant import Ant
+from threading import Thread
+from queue import Queue
+
+
+class BasicACO:
+ def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
+ super()
+ # graph 结点的位置、服务时间信息
+ self.graph = graph
+ # ants_num 蚂蚁数量
+ self.ants_num = ants_num
+ # max_iter 最大迭代次数
+ self.max_iter = max_iter
+ # vehicle_capacity 表示每辆车的最大载重
+ self.max_load = graph.vehicle_capacity
+ # 信息素强度
+ self.Q = 1
+ # alpha 信息素信息重要新
+ self.alpha = alpha
+ # beta 启发性信息重要性
+ self.beta = beta
+ # q0 表示直接选择概率最大的下一点的概率
+ self.q0 = 0.1
+ # best path
+ self.best_path_distance = None
+ self.best_path = None
+ self.best_vehicle_num = None
+
+ self.whether_or_not_to_show_figure = True
+
+ def run_basic_aco(self):
+ # 开启一个线程来跑_basic_aco,使用主线程来绘图
+ path_queue_for_figure = Queue()
+ basic_aco_thread = Thread(target=self._basic_aco, args=(path_queue_for_figure,))
+ basic_aco_thread.start()
+
+ # 是否要展示figure
+ if self.whether_or_not_to_show_figure:
+ figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
+ figure.run()
+ basic_aco_thread.join()
+
+ # 传入None作为结束标志
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(None, None))
+
+ def _basic_aco(self, path_queue_for_figure: Queue):
+ """
+ 最基本的蚁群算法
+ :return:
+ """
+ # 最大迭代次数
+ for iter in range(self.max_iter):
+
+ # 为每只蚂蚁设置当前车辆负载,当前旅行距离,当前时间
+ ants = list(Ant(self.graph) for _ in range(self.ants_num))
+ for k in range(self.ants_num):
+
+ # 蚂蚁需要访问完所有的客户
+ while not ants[k].index_to_visit_empty():
+ next_index = self.select_next_index(ants[k])
+ # 判断加入该位置后,是否还满足约束条件, 如果不满足,则再选择一次,然后再进行判断
+ if not ants[k].check_condition(next_index):
+ next_index = self.select_next_index(ants[k])
+ if not ants[k].check_condition(next_index):
+ next_index = 0
+
+ # 更新蚂蚁路径
+ ants[k].move_to_next_index(next_index)
+ self.graph.local_update_pheromone(ants[k].current_index, next_index)
+
+ # 最终回到0位置
+ ants[k].move_to_next_index(0)
+ self.graph.local_update_pheromone(ants[k].current_index, 0)
+
+ # 计算所有蚂蚁的路径长度
+ paths_distance = np.array([ant.total_travel_distance for ant in ants])
+
+ # 记录当前的最佳路径
+ best_index = np.argmin(paths_distance)
+ if self.best_path is None:
+ self.best_path = ants[int(best_index)].travel_path
+ self.best_path_distance = paths_distance[best_index]
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ elif paths_distance[best_index] < self.best_path_distance:
+ self.best_path = ants[int(best_index)].travel_path
+ self.best_path_distance = paths_distance[best_index]
+ if self.whether_or_not_to_show_figure:
+ path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
+
+ print('[iteration %d]: best distance %f' % (iter, self.best_path_distance))
+ # 更新信息素表
+ self.graph.global_update_pheromone(self.best_path, self.best_path_distance)
+
+ def select_next_index(self, ant):
+ """
+ 选择下一个结点
+ :param ant:
+ :return:
+ """
+ current_index = ant.current_index
+ index_to_visit = ant.index_to_visit
+
+ transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \
+ np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)
+
+ if np.random.rand() < self.q0:
+ max_prob_index = np.argmax(transition_prob)
+ next_index = index_to_visit[max_prob_index]
+ else:
+ # 使用轮盘赌算法
+ next_index = BasicACO.stochastic_accept(index_to_visit, transition_prob)
+ return next_index
+
+ @staticmethod
+ def stochastic_accept(index_to_visit, transition_prob):
+ """
+ 轮盘赌
+ :param index_to_visit: a list of N index (list or tuple)
+ :param transition_prob:
+ :return: selected index
+ """
+ # calculate N and max fitness value
+ N = len(index_to_visit)
+
+ # normalize
+ sum_tran_prob = np.sum(transition_prob)
+ norm_transition_prob = transition_prob/sum_tran_prob
+
+ # select: O(1)
+ while True:
+ # randomly select an individual with uniform probability
+ ind = int(N * random.random())
+ if random.random() <= norm_transition_prob[ind]:
+ return index_to_visit[ind]