From 7dd68707124da17bc0de801de4c690be515926ab Mon Sep 17 00:00:00 2001 From: JonZhao <1044264932@qq.com> Date: Tue, 28 May 2019 21:19:08 +0800 Subject: =?UTF-8?q?=E5=B0=86=E4=B8=A4=E4=B8=AA=E8=9A=81=E7=BE=A4=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E5=88=92=E5=88=86=E6=88=90=E4=B8=A4=E4=B8=AA=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- basic_aco.py | 142 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 basic_aco.py (limited to 'basic_aco.py') diff --git a/basic_aco.py b/basic_aco.py new file mode 100644 index 0000000..3bdc24c --- /dev/null +++ b/basic_aco.py @@ -0,0 +1,142 @@ +import numpy as np +import random +from vprtw_aco_figure import VrptwAcoFigure +from vrptw_base import VrptwGraph, PathMessage +from ant import Ant +from threading import Thread +from queue import Queue + + +class BasicACO: + def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2): + super() + # graph 结点的位置、服务时间信息 + self.graph = graph + # ants_num 蚂蚁数量 + self.ants_num = ants_num + # max_iter 最大迭代次数 + self.max_iter = max_iter + # vehicle_capacity 表示每辆车的最大载重 + self.max_load = graph.vehicle_capacity + # 信息素强度 + self.Q = 1 + # alpha 信息素信息重要新 + self.alpha = alpha + # beta 启发性信息重要性 + self.beta = beta + # q0 表示直接选择概率最大的下一点的概率 + self.q0 = 0.1 + # best path + self.best_path_distance = None + self.best_path = None + self.best_vehicle_num = None + + self.whether_or_not_to_show_figure = True + + def run_basic_aco(self): + # 开启一个线程来跑_basic_aco,使用主线程来绘图 + path_queue_for_figure = Queue() + basic_aco_thread = Thread(target=self._basic_aco, args=(path_queue_for_figure,)) + basic_aco_thread.start() + + # 是否要展示figure + if self.whether_or_not_to_show_figure: + figure = VrptwAcoFigure(self.graph, path_queue_for_figure) + figure.run() + basic_aco_thread.join() + + # 传入None作为结束标志 + if self.whether_or_not_to_show_figure: + path_queue_for_figure.put(PathMessage(None, None)) + + def _basic_aco(self, path_queue_for_figure: Queue): + """ + 最基本的蚁群算法 + :return: + """ + # 最大迭代次数 + for iter in range(self.max_iter): + + # 为每只蚂蚁设置当前车辆负载,当前旅行距离,当前时间 + ants = list(Ant(self.graph) for _ in range(self.ants_num)) + for k in range(self.ants_num): + + # 蚂蚁需要访问完所有的客户 + while not ants[k].index_to_visit_empty(): + next_index = self.select_next_index(ants[k]) + # 判断加入该位置后,是否还满足约束条件, 如果不满足,则再选择一次,然后再进行判断 + if not ants[k].check_condition(next_index): + next_index = self.select_next_index(ants[k]) + if not ants[k].check_condition(next_index): + next_index = 0 + + # 更新蚂蚁路径 + ants[k].move_to_next_index(next_index) + self.graph.local_update_pheromone(ants[k].current_index, next_index) + + # 最终回到0位置 + ants[k].move_to_next_index(0) + self.graph.local_update_pheromone(ants[k].current_index, 0) + + # 计算所有蚂蚁的路径长度 + paths_distance = np.array([ant.total_travel_distance for ant in ants]) + + # 记录当前的最佳路径 + best_index = np.argmin(paths_distance) + if self.best_path is None: + self.best_path = ants[int(best_index)].travel_path + self.best_path_distance = paths_distance[best_index] + if self.whether_or_not_to_show_figure: + path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance)) + + elif paths_distance[best_index] < self.best_path_distance: + self.best_path = ants[int(best_index)].travel_path + self.best_path_distance = paths_distance[best_index] + if self.whether_or_not_to_show_figure: + path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance)) + + print('[iteration %d]: best distance %f' % (iter, self.best_path_distance)) + # 更新信息素表 + self.graph.global_update_pheromone(self.best_path, self.best_path_distance) + + def select_next_index(self, ant): + """ + 选择下一个结点 + :param ant: + :return: + """ + current_index = ant.current_index + index_to_visit = ant.index_to_visit + + transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \ + np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta) + + if np.random.rand() < self.q0: + max_prob_index = np.argmax(transition_prob) + next_index = index_to_visit[max_prob_index] + else: + # 使用轮盘赌算法 + next_index = BasicACO.stochastic_accept(index_to_visit, transition_prob) + return next_index + + @staticmethod + def stochastic_accept(index_to_visit, transition_prob): + """ + 轮盘赌 + :param index_to_visit: a list of N index (list or tuple) + :param transition_prob: + :return: selected index + """ + # calculate N and max fitness value + N = len(index_to_visit) + + # normalize + sum_tran_prob = np.sum(transition_prob) + norm_transition_prob = transition_prob/sum_tran_prob + + # select: O(1) + while True: + # randomly select an individual with uniform probability + ind = int(N * random.random()) + if random.random() <= norm_transition_prob[ind]: + return index_to_visit[ind] -- cgit v1.2.3