summaryrefslogtreecommitdiffstats
path: root/multiple_ant_colony_system.py
diff options
context:
space:
mode:
Diffstat (limited to 'multiple_ant_colony_system.py')
-rw-r--r--multiple_ant_colony_system.py187
1 files changed, 117 insertions, 70 deletions
diff --git a/multiple_ant_colony_system.py b/multiple_ant_colony_system.py
index 751a441..45c896a 100644
--- a/multiple_ant_colony_system.py
+++ b/multiple_ant_colony_system.py
@@ -13,17 +13,30 @@ from multiprocessing import Queue as MPQueue
class MultipleAntColonySystem:
+ """
+ Attributes
+ ----------
+ graph : VrptwGraph
+ ants_num : int
+ max_load : int
+ beta : float
+ Heuristic information importance (relative to pheromones)
+ q0 : float
+ Probability of directly selecting the next point with highest
+ probability ??
+ """
def __init__(self, graph: VrptwGraph, ants_num=10, beta=1, q0=0.1, whether_or_not_to_show_figure=True):
super()
- # graph 结点的位置、服务时间信息
+ # The location and service time information of graph nodes
self.graph = graph
- # ants_num 蚂蚁数量
+ # ants_num number of ants
self.ants_num = ants_num
- # vehicle_capacity 表示每辆车的最大载重
+ # vehicle_capacity represents the maximum load per vehicle
self.max_load = graph.vehicle_capacity
- # beta 启发性信息重要性
+ # beta heuristic information importance
self.beta = beta
- # q0 表示直接选择概率最大的下一点的概率
+ # q0 represents the probability of directly selecting the next point
+ # with the highest probability
self.q0 = q0
# best path
self.best_path_distance = None
@@ -35,7 +48,7 @@ class MultipleAntColonySystem:
@staticmethod
def stochastic_accept(index_to_visit, transition_prob):
"""
- 轮盘赌
+ Roulette
:param index_to_visit: a list of N index (list or tuple)
:param transition_prob:
:return: selected index
@@ -55,41 +68,60 @@ class MultipleAntColonySystem:
return index_to_visit[ind]
@staticmethod
- def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool, IN: np.numarray, q0: float, beta: int, stop_event: Event):
+ def new_active_ant(ant: Ant, vehicle_num: int, local_search: bool,
+ IN: np.numarray, q0: float, beta: int, stop_event: Event):
"""
- 按照指定的vehicle_num在地图上进行探索,所使用的vehicle num不能多于指定的数量,acs_time和acs_vehicle都会使用到这个方法
- 对于acs_time来说,需要访问完所有的结点(路径是可行的),尽量找到travel distance更短的路径
- 对于acs_vehicle来说,所使用的vehicle num会比当前所找到的best path所使用的车辆数少一辆,要使用更少的车辆,尽量去访问结点,如果访问完了所有的结点(路径是可行的),就将通知macs
- :param ant:
- :param vehicle_num:
- :param local_search:
- :param IN:
- :param q0:
- :param beta:
- :param stop_event:
- :return:
+ Explore the map according to the specified vehicle_num. The vehicle num
+ used cannot be more than the specified number. This method is used by
+ both acs_time and acs_vehicle
+
+ For acs_time, it is necessary to visit all nodes (the path is
+ feasible), and try to find a path with a shorter travel distance
+
+ For acs_vehicle, the vehicle num used will be one less vehicle than the
+ number of vehicles used by the currently found best path. To use fewer
+ vehicles, try to visit the nodes. If all nodes are visited (the path is
+ feasible), it will notify macs
+
+ Parameters
+ ----------
+ ant : Ant
+ vehicle_num : int
+ local_search : bool
+ IN : numpy.numarray
+ ??? (variable compartida entre acs_vehicle y acs_time a traves de
+ macs. Pero que info tiene???
+ q0 : float
+ beta : int
+ stop_event : threading.Event
+
+ Returns
+ -------
"""
# print('[new_active_ant]: start, start_index %d' % ant.travel_path[0])
- # 在new_active_ant中,最多可以使用vehicle_num个车,即最多可以包含vehicle_num+1个depot结点,由于出发结点用掉了一个,所以只剩下vehicle个depot
+ # In new_active_ant, up to vehicle_num vehicles can be used, that is, it
+ # can contain up to vehicle_num+1 depot nodes. Since one departure node
+ # is used up, only vehicle depots are left.
unused_depot_count = vehicle_num
- # 如果还有未访问的结点,并且还可以回到depot中
+ # If there are still unvisited nodes, and you can also return to the depot
while not ant.index_to_visit_empty() and unused_depot_count > 0:
if stop_event.is_set():
# print('[new_active_ant]: receive stop event')
return
- # 计算所有满足载重等限制的下一个结点
+ # Calculate all next nodes that satisfy constraints such as load
next_index_meet_constrains = ant.cal_next_index_meet_constrains()
- # 如果没有满足限制的下一个结点,则回到depot中
+ # If there is no next node that meets the limit, go back to the depot
if len(next_index_meet_constrains) == 0:
ant.move_to_next_index(0)
unused_depot_count -= 1
continue
- # 开始计算满足限制的下一个结点,选择各个结点的概率
+ # Start calculating the next node that satisfies the constraints,
+ # and select the probability of each node
length = len(next_index_meet_constrains)
ready_time = np.zeros(length)
due_time = np.zeros(length)
@@ -98,38 +130,39 @@ class MultipleAntColonySystem:
ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time
due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time
- delivery_time = np.maximum(ant.vehicle_travel_time + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)
+ delivery_time = np.maximum(ant.vehicle_travel_time \
+ + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)
delta_time = delivery_time - ant.vehicle_travel_time
distance = delta_time * (due_time - ant.vehicle_travel_time)
- distance = np.maximum(1.0, distance-IN[next_index_meet_constrains])
+ distance = np.maximum(1.0, distance - IN[next_index_meet_constrains])
closeness = 1/distance
transition_prob = ant.graph.pheromone_mat[ant.current_index][next_index_meet_constrains] * \
np.power(closeness, beta)
transition_prob = transition_prob / np.sum(transition_prob)
- # 按照概率直接选择closeness最大的结点
+ # Directly select the node with the largest closeness according to the probability
if np.random.rand() < q0:
max_prob_index = np.argmax(transition_prob)
next_index = next_index_meet_constrains[max_prob_index]
else:
- # 使用轮盘赌算法
+ # Use the roulette algorithm
next_index = MultipleAntColonySystem.stochastic_accept(next_index_meet_constrains, transition_prob)
- # 更新信息素矩阵
+ # update pheromone matrix
ant.graph.local_update_pheromone(ant.current_index, next_index)
ant.move_to_next_index(next_index)
- # 如果走完所有的点了,需要回到depot
+ # If you finish all the points, you need to go back to the depot
if ant.index_to_visit_empty():
ant.graph.local_update_pheromone(ant.current_index, 0)
ant.move_to_next_index(0)
- # 对未访问的点进行插入,保证path是可行的
+ # Insert unvisited points to ensure that the path is feasible
ant.insertion_procedure(stop_event)
- # ant.index_to_visit_empty()==True就是feasible的意思
+ # ant.index_to_visit_empty()==True means Feasible
if local_search is True and ant.index_to_visit_empty():
ant.local_search_procedure(stop_event)
@@ -137,7 +170,8 @@ class MultipleAntColonySystem:
def acs_time(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, beta: int,
global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
"""
- 对于acs_time来说,需要访问完所有的结点(路径是可行的),尽量找到travel distance更短的路径
+ For acs_time, it is necessary to visit all nodes (the path is feasible),
+ and try to find a path with a shorter travel distance
:param new_graph:
:param vehicle_num:
:param ants_num:
@@ -216,16 +250,17 @@ class MultipleAntColonySystem:
def acs_vehicle(new_graph: VrptwGraph, vehicle_num: int, ants_num: int, q0: float, beta: int,
global_path_queue: Queue, path_found_queue: Queue, stop_event: Event):
"""
- 对于acs_vehicle来说,所使用的vehicle num会比当前所找到的best path所使用的车辆数少一辆,要使用更少的车辆,尽量去访问结点,如果访问完了所有的结点(路径是可行的),就将通知macs
- :param new_graph:
- :param vehicle_num:
- :param ants_num:
- :param q0:
- :param beta:
- :param global_path_queue:
- :param path_found_queue:
- :param stop_event:
- :return:
+ For acs_vehicle, the vehicle num used will be one less vehicle than the
+ number of vehicles used by the currently found best path. To use fewer
+ vehicles, try to visit the nodes. If all nodes are visited (the path is
+ feasible), it will notify macs
+
+ Parameters
+ ----------
+ new_graph : VrptwGraph
+ global_path_queue : queue.Queue
+ path_found_queue : queue.Queue
+ stop_event : threading.Event
"""
# vehicle_num设置为比当前的best_path少一个
print('[acs_vehicle]: start, vehicle_num %d' % vehicle_num)
@@ -253,13 +288,14 @@ class MultipleAntColonySystem:
for k in range(ants_num):
ant = Ant(new_graph, 0)
- thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant, ant, vehicle_num, False, IN, q0,
+ thread = ants_pool.submit(MultipleAntColonySystem.new_active_ant,
+ ant, vehicle_num, False, IN, q0,
beta, stop_event)
ants_thread.append(thread)
ants.append(ant)
- # 这里可以使用result方法,等待线程跑完
+ # Here you can use the result method to wait for the thread to finish running
for thread in ants_thread:
thread.result()
@@ -271,20 +307,21 @@ class MultipleAntColonySystem:
IN[ant.index_to_visit] = IN[ant.index_to_visit]+1
- # 蚂蚁找出来的路径与current_path进行比较,是否能使用vehicle_num辆车访问到更多的结点
+ # Compare the path found by the ants with the current_path,
+ # whether it can use vehicle_num vehicles to visit more nodes
if len(ant.index_to_visit) < len(current_index_to_visit):
current_path = copy.deepcopy(ant.travel_path)
current_index_to_visit = copy.deepcopy(ant.index_to_visit)
current_path_distance = ant.total_travel_distance
- # 并且将IN设置为0
+ # and set IN to 0
IN = np.zeros(new_graph.node_num)
- # 如果这一条路径是feasible的话,就要发到macs_vrptw中
+ # If this path is feasible, it should be sent to macs_vrptw
if ant.index_to_visit_empty():
print('[acs_vehicle]: found a feasible path, send path info to macs')
path_found_queue.put(PathMessage(ant.travel_path, ant.total_travel_distance))
- # 更新new_graph中的信息素,global
+ # Update pheromone in new_graph, global
new_graph.global_update_pheromone(current_path, current_path_distance)
if not global_path_queue.empty():
@@ -304,14 +341,13 @@ class MultipleAntColonySystem:
def run_multiple_ant_colony_system(self, file_to_write_path=None):
"""
- 开启另外的线程来跑multiple_ant_colony_system, 使用主线程来绘图
- :return:
+ Start another thread to run multiple_ant_colony_system, use the main thread for drawing
"""
path_queue_for_figure = MPQueue()
multiple_ant_colony_system_thread = Process(target=self._multiple_ant_colony_system, args=(path_queue_for_figure, file_to_write_path, ))
multiple_ant_colony_system_thread.start()
- # 是否要展示figure
+ # Whether to show figure
if self.whether_or_not_to_show_figure:
figure = VrptwAcoFigure(self.graph.nodes, path_queue_for_figure)
figure.run()
@@ -319,7 +355,7 @@ class MultipleAntColonySystem:
def _multiple_ant_colony_system(self, path_queue_for_figure: MPQueue, file_to_write_path=None):
"""
- 调用acs_time 和 acs_vehicle进行路径的探索
+ Call acs_time and acs_vehicle for path exploration
:param path_queue_for_figure:
:return:
"""
@@ -330,14 +366,18 @@ class MultipleAntColonySystem:
start_time_total = time.time()
- # 在这里需要两个队列,time_what_to_do、vehicle_what_to_do, 用来告诉acs_time、acs_vehicle这两个线程,当前的best path是什么,或者让他们停止计算
+ # Two queues are needed here, time_what_to_do, vehicle_what_to_do, to
+ # tell the two threads acs_time and acs_vehicle what the current best
+ # path is, or let them stop computing
global_path_to_acs_time = Queue()
global_path_to_acs_vehicle = Queue()
- # 另外的一个队列, path_found_queue就是接收acs_time 和acs_vehicle计算出来的比best path还要好的feasible path
+ # Another queue, path_found_queue, is to receive acs_time and
+ # acs_vehicle calculated by acs_vehicle that is even better than the
+ # best path. Feasible path
path_found_queue = Queue()
- # 使用近邻点算法初始化
+ # Initialize using the nearest neighbor algorithm
self.best_path, self.best_path_distance, self.best_vehicle_num = self.graph.nearest_neighbor_heuristic()
path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
@@ -345,25 +385,28 @@ class MultipleAntColonySystem:
print('[multiple_ant_colony_system]: new iteration')
start_time_found_improved_solution = time.time()
- # 当前best path的信息,放在queue中以通知acs_time和acs_vehicle当前的best_path是什么
+ # The information of the current best path is placed in the queue to
+ # inform acs_time and acs_vehicle what the current best_path is
global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
stop_event = Event()
- # acs_vehicle,尝试以self.best_vehicle_num-1辆车去探索,访问更多的结点
+ # acs_vehicle, try to explore with self.best_vehicle_num-1 vehicles, visit more nodes
graph_for_acs_vehicle = self.graph.copy(self.graph.init_pheromone_val)
acs_vehicle_thread = Thread(target=MultipleAntColonySystem.acs_vehicle,
args=(graph_for_acs_vehicle, self.best_vehicle_num-1, self.ants_num, self.q0,
self.beta, global_path_to_acs_vehicle, path_found_queue, stop_event))
- # acs_time 尝试以self.best_vehicle_num辆车去探索,找到更短的路径
+ # acs_time try to explore with self.best_vehicle_num vehicles to find a shorter path
graph_for_acs_time = self.graph.copy(self.graph.init_pheromone_val)
acs_time_thread = Thread(target=MultipleAntColonySystem.acs_time,
args=(graph_for_acs_time, self.best_vehicle_num, self.ants_num, self.q0, self.beta,
global_path_to_acs_time, path_found_queue, stop_event))
- # 启动acs_vehicle_thread和acs_time_thread,当他们找到feasible、且是比best path好的路径时,就会发送到macs中来
+ # Start acs_vehicle_thread and acs_time_thread, when they find a
+ # feasible and better path than the best path, they will be sent to
+ # macs
print('[macs]: start acs_vehicle and acs_time')
acs_vehicle_thread.start()
acs_time_thread.start()
@@ -372,7 +415,7 @@ class MultipleAntColonySystem:
while acs_vehicle_thread.is_alive() and acs_time_thread.is_alive():
- # 如果在指定时间内没有搜索到更好的结果,则退出程序
+ # Exit the program if no better results are found within the specified time
given_time = 10
if time.time() - start_time_found_improved_solution > 60 * given_time:
stop_event.set()
@@ -384,7 +427,7 @@ class MultipleAntColonySystem:
self.print_and_write_in_file(file_to_write, 'best path distance is %f, best vehicle_num is %d' % (self.best_path_distance, self.best_vehicle_num))
self.print_and_write_in_file(file_to_write, '*' * 50)
- # 传入None作为结束标志
+ # Pass in None as the end flag
if self.whether_or_not_to_show_figure:
path_queue_for_figure.put(PathMessage(None, None))
@@ -408,10 +451,11 @@ class MultipleAntColonySystem:
if vehicle_num < found_path_used_vehicle_num:
found_path, found_path_distance, found_path_used_vehicle_num = path, distance, vehicle_num
- # 如果找到的路径(which is feasible)的距离更短,则更新当前的最佳path的信息
+ # If the distance of the found path (which is feasible) is
+ # shorter, update the current best path information
if found_path_distance < self.best_path_distance:
- # 搜索到更好的结果,更新start_time
+ # Better search results, update start_time
start_time_found_improved_solution = time.time()
self.print_and_write_in_file(file_to_write, '*' * 50)
@@ -425,19 +469,21 @@ class MultipleAntColonySystem:
self.best_vehicle_num = found_path_used_vehicle_num
self.best_path_distance = found_path_distance
- # 如果需要绘制图形,则要找到的best path发送给绘图程序
+ # If you need to draw graphics, send the best path to be found to the drawing program
if self.whether_or_not_to_show_figure:
path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
- # 通知acs_vehicle和acs_time两个线程,当前找到的best_path和best_path_distance
+ # Notify the two threads of acs_vehicle and acs_time, the
+ # currently found best_path and best_path_distance
global_path_to_acs_vehicle.put(PathMessage(self.best_path, self.best_path_distance))
global_path_to_acs_time.put(PathMessage(self.best_path, self.best_path_distance))
- # 如果,这两个线程找到的路径用的车辆更少了,就停止这两个线程,开始下一轮迭代
- # 向acs_time和acs_vehicle中发送停止信息
+ # If the paths found by the two threads use fewer vehicles, stop
+ # the two threads and start the next iteration
+ # Send stop messages to acs_time and acs_vehicle
if found_path_used_vehicle_num < best_vehicle_num:
- # 搜索到更好的结果,更新start_time
+ # Better search results, update start_time
start_time_found_improved_solution = time.time()
self.print_and_write_in_file(file_to_write, '*' * 50)
self.print_and_write_in_file(file_to_write, '[macs]: vehicle num of found path (%d) better than best path\'s (%d), found path distance is %f'
@@ -454,9 +500,10 @@ class MultipleAntColonySystem:
if self.whether_or_not_to_show_figure:
path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))
- # 停止acs_time 和 acs_vehicle 两个线程
+ # Stop the acs_time and acs_vehicle threads
print('[macs]: send stop info to acs_time and acs_vehicle')
- # 通知acs_vehicle和acs_time两个线程,当前找到的best_path和best_path_distance
+ # Notify the two threads of acs_vehicle and acs_time, the
+ # currently found best_path and best_path_distance
stop_event.set()
@staticmethod