summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md3
-rw-r--r--basic_aco.py18
-rw-r--r--example1.py (renamed from main.py)4
-rw-r--r--example2.py10
-rw-r--r--multiple_ant_colony_system.py84
-rw-r--r--vprtw_aco_figure.py25
-rw-r--r--vrptw_base.py9
7 files changed, 106 insertions, 47 deletions
diff --git a/README.md b/README.md
index 7d8e164..16d2a4a 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,5 @@
# VRPTW-ACO-python
A **python** implementation of a **ant colony optimization** based solution to **Vehicle Routing Problem with Time Windows**.
+
+## Reference
+1. `Gambardella L M, Taillard É, Agazzi G. Macs-vrptw: A multiple colony system for vehicle routing problems with time windows[C]//New ideas in optimization. 1999.` \ No newline at end of file
diff --git a/basic_aco.py b/basic_aco.py
index 3bdc24c..bbadb52 100644
--- a/basic_aco.py
+++ b/basic_aco.py
@@ -8,7 +8,7 @@ from queue import Queue
class BasicACO:
- def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
+ def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, beta=2):
super()
# graph 结点的位置、服务时间信息
self.graph = graph
@@ -20,8 +20,6 @@ class BasicACO:
self.max_load = graph.vehicle_capacity
# 信息素强度
self.Q = 1
- # alpha 信息素信息重要新
- self.alpha = alpha
# beta 启发性信息重要性
self.beta = beta
# q0 表示直接选择概率最大的下一点的概率
@@ -41,7 +39,7 @@ class BasicACO:
# 是否要展示figure
if self.whether_or_not_to_show_figure:
- figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
+ figure = VrptwAcoFigure(self.graph.nodes, path_queue_for_figure)
figure.run()
basic_aco_thread.join()
@@ -55,6 +53,7 @@ class BasicACO:
:return:
"""
# 最大迭代次数
+ start_iteration = 0
for iter in range(self.max_iter):
# 为每只蚂蚁设置当前车辆负载,当前旅行距离,当前时间
@@ -86,12 +85,16 @@ class BasicACO:
if self.best_path is None:
self.best_path = ants[int(best_index)].travel_path
self.best_path_distance = paths_distance[best_index]
+
+ # 图形化展示
if self.whether_or_not_to_show_figure:
path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
elif paths_distance[best_index] < self.best_path_distance:
self.best_path = ants[int(best_index)].travel_path
self.best_path_distance = paths_distance[best_index]
+
+ # 图形化展示
if self.whether_or_not_to_show_figure:
path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
@@ -99,6 +102,10 @@ class BasicACO:
# 更新信息素表
self.graph.global_update_pheromone(self.best_path, self.best_path_distance)
+ given_iteration = 50
+ if iter - start_iteration > given_iteration:
+ print('iteration is up: can not find better solution in %d iteration' % given_iteration)
+
def select_next_index(self, ant):
"""
选择下一个结点
@@ -108,8 +115,9 @@ class BasicACO:
current_index = ant.current_index
index_to_visit = ant.index_to_visit
- transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \
+ transition_prob = self.graph.pheromone_mat[current_index][index_to_visit] * \
np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)
+ transition_prob = transition_prob / np.sum(transition_prob)
if np.random.rand() < self.q0:
max_prob_index = np.argmax(transition_prob)
diff --git a/main.py b/example1.py
index efb4951..c7629e8 100644
--- a/main.py
+++ b/example1.py
@@ -1,5 +1,4 @@
from vrptw_base import VrptwGraph
-from basic_aco import BasicACO
from multiple_ant_colony_system import MultipleAntColonySystem
@@ -7,8 +6,5 @@ if __name__ == '__main__':
file_path = './solomon-100/c101.txt'
graph = VrptwGraph(file_path)
- # vrptw = BasicACO(graph)
- # vrptw.run_basic_aco()
-
macs = MultipleAntColonySystem(graph)
macs.run_multiple_ant_colony_system()
diff --git a/example2.py b/example2.py
new file mode 100644
index 0000000..16e12b2
--- /dev/null
+++ b/example2.py
@@ -0,0 +1,10 @@
+from vrptw_base import VrptwGraph
+from basic_aco import BasicACO
+
+
+if __name__ == '__main__':
+ file_path = './solomon-100/c101.txt'
+ graph = VrptwGraph(file_path)
+
+ vrptw = BasicACO(graph)
+ vrptw.run_basic_aco() \ No newline at end of file
diff --git a/multiple_ant_colony_system.py b/multiple_ant_colony_system.py
index b292072..9a9dcb1 100644
--- a/multiple_ant_colony_system.py
+++ b/multiple_ant_colony_system.py
@@ -7,6 +7,7 @@ from threading import Thread, Event
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import copy
+import time
class MultipleAntColonySystem:
@@ -58,7 +59,7 @@ class MultipleAntColonySystem:
return index_to_visit[ind]
@staticmethod
- def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool, IN: np.numarray, q0: float, stop_event: Event):
+ def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool, IN: np.numarray, q0: float, beta: int, stop_event: Event):
# print('[new_active_ant]: start, start_index %d' % ant.travel_path[0])
# 在new_active_ant中,最多可以使用vehicle_num个车,即最多可以包含vehicle_num+1个depot结点,由于出发结点用掉了一个,所以只剩下vehicle个depot
@@ -96,13 +97,18 @@ class MultipleAntColonySystem:
distance = np.array([max(1.0, j) for j in distance-IN[next_index_meet_constrains]])
closeness = 1/distance
+ transition_prob = ant.graph.pheromone_mat[ant.current_index][next_index_meet_constrains] * \
+ np.power(closeness, beta)
+
+ transition_prob = transition_prob / np.sum(transition_prob)
+
# 按照概率直接选择closeness最大的结点
if np.random.rand() < q0:
- max_prob_index = np.argmax(closeness)
+ max_prob_index = np.argmax(transition_prob)
next_index = next_index_meet_constrains[max_prob_index]
else:
# 使用轮盘赌算法
- next_index = MultipleAntColonySystem.stochastic_accept(next_index_meet_constrains, closeness)
+ next_index = MultipleAntColonySystem.stochastic_accept(next_index_meet_constrains, transition_prob)
# 更新信息素矩阵
ant.graph.local_update_pheromone(ant.current_index, next_index)
@@ -120,7 +126,9 @@ class MultipleAntColonySystem:
ant.local_search_procedure(stop_event)
@staticmethod
- def acs_time(new_graph: VrptwGraph, vehicle_num, ants_num, q0, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+ def acs_time(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, beta: int,
+ global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+
# 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
# vehicle_num设置为与当前的best_path一致
print('[acs_time]: start, vehicle_num %d' % vehicle_num)
@@ -138,7 +146,8 @@ class MultipleAntColonySystem:
for k in range(ants_num):
ant = Ant(new_graph, 0)
- thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True, np.zeros(new_graph.node_num), q0, stop_event)
+ thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True,
+ np.zeros(new_graph.node_num), q0, beta, stop_event)
ants_thread.append(thread)
ants.append(ant)
@@ -154,14 +163,15 @@ class MultipleAntColonySystem:
print('[acs_time]: receive stop event')
return
- # 如果比全局的路径要好,则要将该路径发送到macs中
+ # 获取当前的best path
if not global_path_queue.empty():
info = global_path_queue.get()
while not global_path_queue.empty():
info = global_path_queue.get()
print('[acs_time]: receive global path info')
- global_best_path, global_best_distance = info.get_path_info()
+ global_best_path, global_best_distance, global_used_vehicle_num = info.get_path_info()
+ # 如果比全局的路径要好,则要将该路径发送到macs中
if ant.index_to_visit_empty() and ant.total_travel_distance < global_best_distance:
print('[acs_time]: found a improved feasible path, send path info to macs')
path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
@@ -170,15 +180,17 @@ class MultipleAntColonySystem:
new_graph.global_update_pheromone(global_best_path, global_best_distance)
@staticmethod
- def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+ def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, beta: int,
+ global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
+
# 最多可以使用vehicle_num辆车,即在path中最多包含vehicle_num+1个depot中,找到路程最短的路径,
# vehicle_num设置为比当前的best_path少一个
print('[acs_vehicle]: start, vehicle_num %d' % vehicle_num)
global_best_path = None
global_best_distance = None
- # 使用邻近点算法初始化path 和distance
- current_path, current_path_distance = new_graph.nearest_neighbor_heuristic()
+ # 使用nearest_neighbor_heuristic算法初始化path 和distance
+ current_path, current_path_distance, _ = new_graph.nearest_neighbor_heuristic()
# 找出当前path中未访问的结点
current_index_to_visit = list(range(new_graph.node_num))
@@ -197,7 +209,8 @@ class MultipleAntColonySystem:
for k in range(ants_num):
ant = Ant(new_graph, 0)
- thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True, IN, q0, stop_event)
+ thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, True, IN, q0,
+ beta, stop_event)
ants_thread.append(thread)
ants.append(ant)
@@ -215,7 +228,7 @@ class MultipleAntColonySystem:
index_to_visit = copy.deepcopy(ant.index_to_visit)
IN[index_to_visit] = IN[index_to_visit]+1
- # 判断蚂蚁找出来的路径是否比current_path,能使用vehicle_num辆车访问到更多的结点
+ # 蚂蚁找出来的路径与current_path进行比较,是否能使用vehicle_num辆车访问到更多的结点
if len(index_to_visit) < len(current_index_to_visit):
current_path = copy.deepcopy(ant.travel_path)
current_index_to_visit = index_to_visit
@@ -236,7 +249,7 @@ class MultipleAntColonySystem:
while not global_path_queue.empty():
info = global_path_queue.get()
print('[acs_vehicle]: receive global path info')
- global_best_path, global_best_distance = info.get_path_info()
+ global_best_path, global_best_distance, global_used_vehicle_num = info.get_path_info()
new_graph.global_update_pheromone(global_best_path, global_best_distance)
@@ -248,7 +261,7 @@ class MultipleAntColonySystem:
# 是否要展示figure
if self.whether_or_not_to_show_figure:
- figure = VrptwAcoFigure(self.graph, path_queue_for_figure)
+ figure = VrptwAcoFigure(self.graph.nodes, path_queue_for_figure)
figure.run()
multiple_ant_colony_system_thread.join()
@@ -265,25 +278,27 @@ class MultipleAntColonySystem:
path_found_queue = Queue()
# 使用近邻点算法初始化
- self.best_path, self.best_path_distance = self.graph.nearest_neighbor_heuristic()
- self.best_vehicle_num = self.best_path.count(0) - 1
+ self.best_path, self.best_path_distance, self.best_vehicle_num = self.graph.nearest_neighbor_heuristic()
while True:
- # 当前best path的信息
+ start_time = time.time()
+
+ # 当前best path的信息,放在queue中以通知acs_time和acs_vehicle当前的best_path是什么
global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
- # acs_vehicle
stop_event = Event()
+
+ # acs_vehicle,尝试以self.best_vehicle_num-1辆车去探索,访问更多的结点
graph_for_acs_vehicle = self.graph.copy(self.graph.init_pheromone_val)
acs_vehicle_thread = Thread(target=MultipleAntColonySystem.acs_vehicle,
args=(graph_for_acs_vehicle, self.best_vehicle_num-1, self.ants_num, self.q0,
- global_path_to_acs_vehicle, path_found_queue, stop_event))
+ self.beta, global_path_to_acs_vehicle, path_found_queue, stop_event))
- # acs_time
+ # acs_time 尝试以self.best_vehicle_num辆车去探索,找到更短的路径
graph_for_acs_time = self.graph.copy(self.graph.init_pheromone_val)
acs_time_thread = Thread(target=MultipleAntColonySystem.acs_time,
- args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0,
+ args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0, self.beta,
global_path_to_acs_time, path_found_queue, stop_event))
# 启动acs_vehicle_thread和acs_time_thread,当他们找到feasible、且是比best path好的路径时,就会发送到macs中来
@@ -295,20 +310,31 @@ class MultipleAntColonySystem:
while acs_vehicle_thread.is_alive() and acs_time_thread.is_alive():
+ # 如果在指定时间内没有搜索到更好的结果,则退出程序
+ end_time = time.time()
+ if end_time - start_time > 60 * 5:
+ stop_event.set()
+ print('time is up: cannot find a better solution in given time')
+ return
+
if path_found_queue.empty():
continue
path_info = path_found_queue.get()
print('[macs]: receive found path info')
- found_path, found_path_distance = path_info.get_path_info()
+ found_path, found_path_distance, found_path_used_vehicle_num = path_info.get_path_info()
- # 如果找到的路径(feasible)的距离更短,则更新当前的最佳path的信息
+ # 如果找到的路径(which is feasible)的距离更短,则更新当前的最佳path的信息
if found_path_distance < self.best_path_distance:
+
+ # 搜索到更好的结果,更新start_time
+ start_time = time.time()
+
print('-' * 50)
print('[macs]: distance of found path (%f) better than best path\'s (%f)' % (found_path_distance, self.best_path_distance))
print('-' * 50)
self.best_path = found_path
- self.best_vehicle_num = found_path.count(0) - 1
+ self.best_vehicle_num = found_path_used_vehicle_num
self.best_path_distance = found_path_distance
# 如果需要绘制图形,则要找到的best path发送给绘图程序
@@ -321,12 +347,16 @@ class MultipleAntColonySystem:
# 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代
# 向acs_time和acs_vehicle中发送停止信息
- if found_path.count(0)-1 < best_vehicle_num:
+ if found_path_used_vehicle_num < best_vehicle_num:
+
+ # 搜索到更好的结果,更新start_time
+ start_time = time.time()
+
print('-' * 50)
- print('[macs]: vehicle num of found path (%d) better than best path\'s (%d)' % (found_path.count(0)-1, best_vehicle_num))
+ print('[macs]: vehicle num of found path (%d) better than best path\'s (%d)' % (found_path_used_vehicle_num, best_vehicle_num))
print('-' * 50)
self.best_path = found_path
- self.best_vehicle_num = found_path.count(0)-1
+ self.best_vehicle_num = found_path_used_vehicle_num
self.best_path_distance = found_path_distance
if self.whether_or_not_to_show_figure:
diff --git a/vprtw_aco_figure.py b/vprtw_aco_figure.py
index 99e71f4..01dbc80 100644
--- a/vprtw_aco_figure.py
+++ b/vprtw_aco_figure.py
@@ -1,11 +1,20 @@
import matplotlib.pyplot as plt
-from vrptw_base import VrptwGraph
from queue import Queue
class VrptwAcoFigure:
- def __init__(self, graph: VrptwGraph, path_queue: Queue):
- self.graph = graph
+ def __init__(self, nodes: list, path_queue: Queue):
+ """
+ matplotlib绘图计算需要放在主线程,寻找路径的工作建议另外开一个线程,
+ 当寻找路径的线程找到一个新的path的时候,将path放在path_queue中,图形绘制线程就会自动进行绘制
+ queue中存放的path以PathMessage(class)的形式存在
+ nodes中存放的结点以Node(class)的形式存在,主要使用到Node.x, Node.y 来获取到结点的坐标
+
+ :param nodes: nodes是各个结点的list,包括depot
+ :param path_queue: queue用来存放工作线程计算得到的path,队列中的每一个元素都是一个path,path中存放的是各个结点的id
+ """
+
+ self.nodes = nodes
self.figure = plt.figure(figsize=(10, 10))
self.figure_ax = self.figure.add_subplot(1, 1, 1)
self.path_queue = path_queue
@@ -14,8 +23,8 @@ class VrptwAcoFigure:
def _draw_point(self):
# 先画出图中的点
- self.figure_ax.plot(list(node.x for node in self.graph.nodes),
- list(node.y for node in self.graph.nodes), '%s.' % self._dot_color)
+ self.figure_ax.plot(list(node.x for node in self.nodes),
+ list(node.y for node in self.nodes), '%s.' % self._dot_color)
self.figure.show()
plt.pause(0.5)
@@ -28,7 +37,7 @@ class VrptwAcoFigure:
info = self.path_queue.get()
while not self.path_queue.empty():
info = self.path_queue.get()
- path, distance = info.get_path_info()
+ path, distance, used_vehicle_num = info.get_path_info()
if path is None:
break
@@ -44,7 +53,7 @@ class VrptwAcoFigure:
def _draw_line(self, path):
# 根据path中index进行路劲的绘制
for i in range(1, len(path)):
- x_list = [self.graph.nodes[path[i - 1]].x, self.graph.nodes[path[i]].x]
- y_list = [self.graph.nodes[path[i - 1]].y, self.graph.nodes[path[i]].y]
+ x_list = [self.nodes[path[i - 1]].x, self.nodes[path[i]].x]
+ y_list = [self.nodes[path[i - 1]].y, self.nodes[path[i]].y]
self.figure_ax.plot(x_list, y_list, '%s-' % self._line_color_list[0])
plt.pause(0.05)
diff --git a/vrptw_base.py b/vrptw_base.py
index 645b114..988713e 100644
--- a/vrptw_base.py
+++ b/vrptw_base.py
@@ -32,7 +32,7 @@ class VrptwGraph:
self.rho = rho
# 创建信息素矩阵
- self.nnh_travel_path, self.init_pheromone_val = self.nearest_neighbor_heuristic()
+ self.nnh_travel_path, self.init_pheromone_val, _ = self.nearest_neighbor_heuristic()
self.init_pheromone_val = 1/(self.init_pheromone_val * self.node_num)
self.pheromone_mat = np.ones((self.node_num, self.node_num)) * self.init_pheromone_val
@@ -129,7 +129,9 @@ class VrptwGraph:
# 最后要回到depot
travel_distance += self.node_dist_mat[current_index][0]
travel_path.append(0)
- return travel_path, travel_distance
+
+ vehicle_num = travel_path.count(0)-1
+ return travel_path, travel_distance, vehicle_num
def _cal_nearest_next_index(self, index_to_visit, current_index, current_load, current_time):
"""
@@ -166,6 +168,7 @@ class PathMessage:
def __init__(self, path, distance):
self.path = copy.deepcopy(path)
self.distance = copy.deepcopy(distance)
+ self.used_vehicle_num = self.path.count(0) - 1
def get_path_info(self):
- return self.path, self.distance
+ return self.path, self.distance, self.used_vehicle_num