summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ant.py250
-rw-r--r--main.py85
-rw-r--r--vrptw_base.py318
3 files changed, 357 insertions, 296 deletions
diff --git a/ant.py b/ant.py
new file mode 100644
index 0000000..3f2bdd2
--- /dev/null
+++ b/ant.py
@@ -0,0 +1,250 @@
+import numpy as np
+import copy
+import random
+from vrptw_base import VrptwGraph
+
+
+class Ant:
+ def __init__(self, graph: VrptwGraph, start_index=0):
+ super()
+ self.graph = graph
+ self.current_index = 0
+ self.vehicle_load = 0
+ self.vehicle_travel_time = 0
+ self.travel_path = [start_index]
+ self.arrival_time = [0]
+
+ self.index_to_visit = list(range(graph.node_num))
+ self.index_to_visit.remove(start_index)
+
+ self.total_travel_distance = 0
+
+ def move_to_next_index(self, next_index):
+ # 更新蚂蚁路径
+ self.travel_path.append(next_index)
+ self.total_travel_distance += self.graph.node_dist_mat[self.current_index][next_index]
+
+ dist = self.graph.node_dist_mat[self.current_index][next_index]
+ self.arrival_time.append(self.vehicle_travel_time + dist)
+
+ if self.graph.nodes[next_index].is_depot:
+ # 如果一下个位置为服务器点,则要将车辆负载等清空
+ self.vehicle_load = 0
+ self.vehicle_travel_time = 0
+
+ else:
+ # 更新车辆负载、行驶距离、时间
+ self.vehicle_load += self.graph.nodes[next_index].demand
+ # 如果早于客户要求的时间窗(ready_time),则需要等待
+
+ self.vehicle_travel_time += dist + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) + self.graph.nodes[next_index].service_time
+ self.index_to_visit.remove(next_index)
+
+ self.current_index = next_index
+
+ def index_to_visit_empty(self):
+ return len(self.index_to_visit) == 0
+
+ def get_active_vehicles_num(self):
+ return self.travel_path.count(0)-1
+
+ def check_condition(self, next_index) -> bool:
+ """
+ 检查移动到下一个点是否满足约束条件
+ :param next_index:
+ :return:
+ """
+ if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity:
+ return False
+
+ dist = self.graph.node_dist_mat[self.current_index][next_index]
+ wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0)
+ service_time = self.graph.nodes[next_index].service_time
+
+ # 检查访问某一个旅客之后,能否回到服务店
+ if self.vehicle_travel_time + dist + wait_time + service_time + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time:
+ return False
+
+ # 不可以服务due time之外的旅客
+ if self.vehicle_travel_time + dist > self.graph.nodes[next_index].due_time:
+ return False
+
+ return True
+
+ def cal_next_index_meet_constrains(self):
+ """
+ 找出所有从当前位置(ant.current_index)可达的customer
+ :return:
+ """
+ next_index_meet_constrains = []
+ for next_ind in self.index_to_visit:
+ if self.check_condition(next_ind):
+ next_index_meet_constrains.append(next_ind)
+ return next_index_meet_constrains
+
+ def cal_nearest_next_index(self, next_index_list):
+ """
+ 从待选的customers中选择,离当前位置(ant.current_index)最近的customer
+
+ :param next_index_list:
+ :return:
+ """
+ current_ind = self.current_index
+
+ nearest_ind = next_index_list[0]
+ min_dist = self.graph.node_dist_mat[current_ind][next_index_list[0]]
+
+ for next_ind in next_index_list[1:]:
+ dist = self.graph.node_dist_mat[current_ind][next_ind]
+ if dist < min_dist:
+ min_dist = dist
+ nearest_ind = next_ind
+
+ return nearest_ind
+
+ def cal_total_travel_distance(self, travel_path):
+ distance = 0
+ current_ind = travel_path[0]
+ for next_ind in travel_path[1:]:
+ distance += self.graph.node_dist_mat[current_ind][next_ind]
+ current_ind = next_ind
+ return distance
+
+ def try_insert_on_path(self, node_id):
+ """
+ 尝试性地将node_id插入当前的travel_path中
+ 插入的位置不能违反载重,时间,行驶距离的限制
+ 如果有多个位置,则找出最优的位置
+ :param node_id:
+ :return:
+ """
+ feasible_insert_index = []
+ feasible_distance = []
+
+ path = copy.deepcopy(self.travel_path)
+
+ for insert_index in range(len(path)):
+ if self.graph.nodes[path[insert_index]].is_depot:
+ continue
+
+ front_depot_index = insert_index
+ while front_depot_index >= 0 and not self.graph.nodes[self.travel_path[front_depot_index]].is_depot:
+ front_depot_index -= 1
+ front_depot_index = max(front_depot_index, 0)
+
+ check_ant = Ant(self.graph, path[0])
+
+ # 让check_ant 走过 path中下标从front_depot_index开始到insert_index-1的点
+ for i in range(front_depot_index, insert_index):
+ check_ant.move_to_next_index(path[i])
+
+ # 开始尝试性地对排序后的index_to_visit中的结点进行访问
+ if check_ant.check_condition(node_id):
+ check_ant.move_to_next_index(node_id)
+
+ # 如果可以到node_id,则要保证vehicle可以行驶回到depot
+ for next_ind in path[insert_index:]:
+ if check_ant.check_condition(next_ind):
+ check_ant.move_to_next_index(next_ind)
+ if self.graph.nodes[next_ind].is_depot:
+ feasible_insert_index.append(insert_index)
+ path.insert(insert_index, node_id)
+ feasible_distance.append(self.cal_total_travel_distance(path))
+ # 如果不可以回到depot,则返回上一层
+ else:
+ break
+
+ if len(feasible_distance) == 0:
+ return None
+ else:
+ feasible_distance = np.array(feasible_distance)
+ min_insert_ind = np.argmin(feasible_distance)
+ best_ind = feasible_insert_index[int(min_insert_ind)]
+ return best_ind
+
+ def insertion_procedure(self):
+ """
+ 为每个未访问的结点尝试性地找到一个合适的位置,插入到当前的travel_path
+ 插入的位置不能违反载重,时间,行驶距离的限制
+ :return:
+ """
+ if self.index_to_visit_empty():
+ return
+
+ ind_to_visit = copy.deepcopy(self.index_to_visit)
+
+ demand = np.zeros(len(ind_to_visit))
+ for i in range(len(ind_to_visit)):
+ demand[i] = self.graph.nodes[i].demand
+
+ sorted_ind = np.argsort(demand)
+ ind_to_visit = ind_to_visit[sorted_ind]
+
+ for node_id in ind_to_visit:
+ best_insert_index = self.try_insert_on_path(node_id)
+ if best_insert_index is not None:
+ self.travel_path.insert(best_insert_index, node_id)
+ self.index_to_visit.remove(node_id)
+
+ self.total_travel_distance = self.cal_total_travel_distance(self.travel_path)
+
+ def local_search_procedure(self):
+ """
+ 对当前的已经访问完graph中所有节点的travel_path使用cross进行局部搜索
+ :return:
+ """
+ # 找出path中所有的depot的位置
+ depot_ind = []
+ for ind in range(len(self.travel_path)):
+ if self.graph.nodes[self.travel_path[ind]].is_depot:
+ depot_ind.append(ind)
+
+ new_path_travel_distance = []
+ new_path = []
+ # 将self.travel_path分成多段,每段以depot开始,以depot结束,称为route
+ for i in range(1, len(depot_ind)):
+ for j in range(i+1, len(depot_ind)):
+
+ # 随机在两段route,各随机选择一段customer id,交换这两段customer id
+ start_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1)
+ end_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1)
+ if end_a < start_a:
+ start_a, end_a = end_a, start_a
+
+ start_b = random.randint(depot_ind[j-1]+1, depot_ind[j]-1)
+ end_b = random.randint(depot_ind[j - 1] + 1, depot_ind[j] - 1)
+ if end_b < start_b:
+ start_b, end_b = end_b, start_b
+
+ path = []
+ path.extend(self.travel_path[:start_a])
+ path.extend(self.travel_path[start_b:end_b+1])
+ path.extend(self.travel_path[end_a:start_b])
+ path.extend(self.travel_path[start_a:end_a+1])
+ path.extend(self.travel_path[end_b+1:])
+
+ if len(path) != self.travel_path:
+ raise RuntimeError('error')
+
+ # 判断新生成的path是否是可行的
+ check_ant = Ant(self.graph, path[0])
+ for ind in path[1:]:
+ if check_ant.check_condition(ind):
+ check_ant.move_to_next_index(ind)
+ else:
+ break
+ # 如果新生成的path是可行的
+ if check_ant.index_to_visit_empty():
+ # print('success to search')
+ new_path_travel_distance.append(check_ant.total_travel_distance)
+ new_path.append(path)
+
+ # 找出新生成的path中,路程最小的
+ new_path_travel_distance = np.array(new_path_travel_distance)
+ min_distance_ind = np.argmin(new_path_travel_distance)
+ min_distance = new_path_travel_distance[min_distance_ind]
+
+ if min_distance < self.total_travel_distance:
+ return new_path[int(min_distance_ind)]
+ else:
+ return None \ No newline at end of file
diff --git a/main.py b/main.py
index db5b34a..07cc98f 100644
--- a/main.py
+++ b/main.py
@@ -1,11 +1,12 @@
import numpy as np
-from vrptw_base import VrptwGraph, Ant, NearestNeighborHeuristic
import random
from vprtw_aco_figure import VrptwAcoFigure
+from vrptw_base import VrptwGraph
+from ant import Ant
class VrptwAco:
- def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2, rho=0.1):
+ def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2):
super()
# graph 结点的位置、服务时间信息
self.graph = graph
@@ -13,24 +14,16 @@ class VrptwAco:
self.ants_num = ants_num
# max_iter 最大迭代次数
self.max_iter = max_iter
+ # vehicle_capacity 表示每辆车的最大载重
+ self.max_load = graph.vehicle_capacity
+ # 信息素强度
+ self.Q = 1
# alpha 信息素信息重要新
self.alpha = alpha
# beta 启发性信息重要性
self.beta = beta
- # rho 信息素挥发速度
- self.rho = rho
# q0 表示直接选择概率最大的下一点的概率
self.q0 = 0.1
- # vehicle_capacity 表示每辆车的最大载重
- self.max_load = graph.vehicle_capacity
- # 信息素强度
- self.Q = 1
- # 创建信息素矩阵
- nn_heuristic = NearestNeighborHeuristic(self.graph)
- self.init_pheromone_val = nn_heuristic.cal_init_pheromone_val()
- self.pheromone_mat = np.ones((self.graph.node_num, self.graph.node_num)) * self.init_pheromone_val
- # 启发式信息矩阵
- self.heuristic_info_mat = 1 / graph.node_dist_mat
# best path
self.best_path_distance = None
self.best_path = None
@@ -64,11 +57,11 @@ class VrptwAco:
# 更新蚂蚁路径
ants[k].move_to_next_index(next_index)
- self.local_update_pheromone(ants[k].current_index, next_index)
+ self.graph.local_update_pheromone(ants[k].current_index, next_index)
# 最终回到0位置
ants[k].move_to_next_index(0)
- self.local_update_pheromone(ants[k].current_index, 0)
+ self.graph.local_update_pheromone(ants[k].current_index, 0)
# 计算所有蚂蚁的路径长度
paths_distance = np.array([ant.total_travel_distance for ant in ants])
@@ -76,22 +69,22 @@ class VrptwAco:
# 记录当前的最佳路径
best_index = np.argmin(paths_distance)
if self.best_path is None:
- self.best_path = ants[best_index].travel_path
+ self.best_path = ants[int(best_index)].travel_path
self.best_path_distance = paths_distance[best_index]
if self.whether_or_not_to_show_figure:
self.figure.init_figure(self.best_path)
elif paths_distance[best_index] < self.best_path_distance:
- self.best_path = ants[best_index].travel_path
+ self.best_path = ants[int(best_index)].travel_path
self.best_path_distance = paths_distance[best_index]
if self.whether_or_not_to_show_figure:
self.figure.update_figure(self.best_path)
print('[iteration %d]: best distance %f' % (iter, self.best_path_distance))
# 更新信息素表
- self.global_update_pheromone()
+ self.graph.global_update_pheromone(self.best_path, self.best_path_distance)
- def select_next_index(self, ant: Ant):
+ def select_next_index(self, ant):
"""
选择下一个结点
:param ant:
@@ -100,8 +93,8 @@ class VrptwAco:
current_index = ant.current_index
index_to_visit = ant.index_to_visit
- transition_prob = np.power(self.pheromone_mat[current_index][index_to_visit], self.alpha) * \
- np.power(self.heuristic_info_mat[current_index][index_to_visit], self.beta)
+ transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \
+ np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)
if np.random.rand() < self.q0:
max_prob_index = np.argmax(transition_prob)
@@ -111,22 +104,6 @@ class VrptwAco:
next_index = self.stochastic_accept(index_to_visit, transition_prob)
return next_index
- def local_update_pheromone(self, start_ind, end_ind):
- self.pheromone_mat[start_ind][end_ind] = (1-self.rho) * self.pheromone_mat[start_ind][end_ind] + \
- self.rho * self.init_pheromone_val
-
- def global_update_pheromone(self):
- """
- 更新信息素矩阵
- :return:
- """
- self.pheromone_mat = (1-self.rho) * self.pheromone_mat
-
- current_ind = self.best_path[0]
- for next_ind in self.best_path[1:]:
- self.pheromone_mat[current_ind][next_ind] += self.rho/self.best_path_distance
- current_ind = next_ind
-
def stochastic_accept(self, index_to_visit, transition_prob):
"""
轮盘赌
@@ -148,13 +125,7 @@ class VrptwAco:
if random.random() <= norm_transition_prob[ind]:
return index_to_visit[ind]
- def new_active_ant(self, k: int, local_search: bool, IN):
- # 对graph中的depot复制k-1,使得depot数量等于k
- new_graph = self.graph.construct_graph_with_duplicated_depot(k)
-
- # 从任意一个depot开始
- start_index = random.randint(0, k-1)
- ant = Ant(new_graph, start_index)
+ def new_active_ant(self, ant: Ant, local_search: bool, IN):
# 计算从当前位置可以达到的下一个位置
next_index_meet_constrains = ant.cal_next_index_meet_constrains()
@@ -163,10 +134,10 @@ class VrptwAco:
ready_time = np.zeros(index_num)
due_time = np.zeros(index_num)
for i in range(index_num):
- ready_time[i] = new_graph.nodes[next_index_meet_constrains[i]].ready_time
- due_time[i] = new_graph.nodes[next_index_meet_constrains[i]].due_time
+ ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time
+ due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time
- delivery_time = np.max(ant.vehicle_travel_time + new_graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)
+ delivery_time = np.max(ant.vehicle_travel_time + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time)
delat_time = delivery_time - ant.vehicle_travel_time
distance = delat_time * (due_time - ant.vehicle_travel_time)
distance = np.max(1.0, distance-IN)
@@ -174,12 +145,12 @@ class VrptwAco:
closeness = 1/distance
# 按照概率选择下一个点next_index
- if np.random.rand() < self.q0:
+ if np.random.rand() < ant.graph.q0:
max_prob_index = np.argmax(closeness)
next_index = next_index_meet_constrains[max_prob_index]
else:
# 使用轮盘赌算法
- next_index = self.stochastic_accept(next_index_meet_constrains, closeness)
+ next_index = ant.graph.stochastic_accept(next_index_meet_constrains, closeness)
ant.move_to_next_index(next_index)
# 更新信息素矩阵
@@ -194,7 +165,19 @@ class VrptwAco:
new_path = ant.local_search_procedure()
if new_path is not None:
pass
- print('fuck')
+
+ def acs_time(self, vehicle_num):
+ # how to calculate init_pheromone_val
+ new_graph = self.graph.construct_graph_with_duplicated_depot(vehicle_num, 1)
+
+ # 初始化信息素矩阵
+ while True:
+ for k in range(self.ants_num):
+ ant = Ant(new_graph, random.randint(0, vehicle_num-1))
+ self.new_active_ant(ant, True, 0)
+ # if ant.index_to_visit_empty() and ant.total_travel_distance < global_travel_distance:
+ # send ant.travel_path
+ # pass
if __name__ == '__main__':
diff --git a/vrptw_base.py b/vrptw_base.py
index ae9f232..9413731 100644
--- a/vrptw_base.py
+++ b/vrptw_base.py
@@ -1,6 +1,5 @@
import numpy as np
import copy
-import random
class Node:
@@ -22,15 +21,25 @@ class Node:
class VrptwGraph:
- def __init__(self, file_path):
+ def __init__(self, file_path, rho=0.1):
super()
# node_num 结点个数
# node_dist_mat 节点之间的距离(矩阵)
# pheromone_mat 节点之间路径上的信息度浓度
self.node_num, self.nodes, self.node_dist_mat, self.vehicle_num, self.vehicle_capacity \
= self.create_from_file(file_path)
+ # rho 信息素挥发速度
+ self.rho = rho
+ # 创建信息素矩阵
- def construct_graph_with_duplicated_depot(self, vehicle_num):
+ self.init_pheromone_val = self._nearest_neighbor_heuristic()
+ self.init_pheromone_val = 1/(self.init_pheromone_val * self.node_num)
+
+ self.pheromone_mat = np.ones((self.node_num, self.node_num)) * self.init_pheromone_val
+ # 启发式信息矩阵
+ self.heuristic_info_mat = 1 / self.node_dist_mat
+
+ def construct_graph_with_duplicated_depot(self, vehicle_num, init_pheromone_val):
new_graph = copy.deepcopy(self)
new_graph.node_num += vehicle_num-1
@@ -51,6 +60,12 @@ class VrptwGraph:
original_j = j - vehicle_num + 1
new_graph.node_dist_mat[j][i] = new_graph.node_dist_mat[i][j] = self.node_dist_mat[original_i][original_j]
+ # 启发式信息
+ new_graph.heuristic_info_mat = 1 / new_graph.node_dist_mat
+ # 信息素
+ new_graph.init_pheromone_val = init_pheromone_val
+ new_graph.pheromone_mat = np.ones((self.node_num, self.node_num)) * self.init_pheromone_val
+
return new_graph
def create_from_file(self, file_path):
@@ -84,265 +99,78 @@ class VrptwGraph:
def calculate_dist(node_a, node_b):
return np.linalg.norm((node_a.x - node_b.x, node_a.y - node_b.y))
+ def local_update_pheromone(self, start_ind, end_ind):
+ self.pheromone_mat[start_ind][end_ind] = (1-self.rho) * self.pheromone_mat[start_ind][end_ind] + \
+ self.rho * self.init_pheromone_val
-class Ant:
- def __init__(self, graph: VrptwGraph, start_index=0):
- super()
- self.graph = graph
- self.current_index = 0
- self.vehicle_load = 0
- self.vehicle_travel_time = 0
- self.travel_path = [start_index]
- self.arrival_time = [0]
-
- self.index_to_visit = list(range(graph.node_num))
- self.index_to_visit.remove(start_index)
-
- self.total_travel_distance = 0
-
- def move_to_next_index(self, next_index):
- # 更新蚂蚁路径
- self.travel_path.append(next_index)
- self.total_travel_distance += self.graph.node_dist_mat[self.current_index][next_index]
-
- dist = self.graph.node_dist_mat[self.current_index][next_index]
- self.arrival_time.append(self.vehicle_travel_time + dist)
-
- if self.graph.nodes[next_index].is_depot:
- # 如果一下个位置为服务器点,则要将车辆负载等清空
- self.vehicle_load = 0
- self.vehicle_travel_time = 0
-
- else:
- # 更新车辆负载、行驶距离、时间
- self.vehicle_load += self.graph.nodes[next_index].demand
- # 如果早于客户要求的时间窗(ready_time),则需要等待
-
- self.vehicle_travel_time += dist + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) + self.graph.nodes[next_index].service_time
- self.index_to_visit.remove(next_index)
-
- self.current_index = next_index
-
- def index_to_visit_empty(self):
- return len(self.index_to_visit) == 0
-
- def get_active_vehicles_num(self):
- return self.travel_path.count(0)-1
-
- def check_condition(self, next_index) -> bool:
- """
- 检查移动到下一个点是否满足约束条件
- :param next_index:
- :return:
- """
- if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity:
- return False
-
- dist = self.graph.node_dist_mat[self.current_index][next_index]
- wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0)
- service_time = self.graph.nodes[next_index].service_time
- # 检查访问某一个旅客之后,能否回到服务店
- if self.vehicle_travel_time + dist + wait_time + service_time + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time:
- return False
-
- # 不可以服务due time之外的旅客
- if self.vehicle_travel_time + dist > self.graph.nodes[next_index].due_time:
- return False
-
- return True
-
- def cal_next_index_meet_constrains(self):
+ def global_update_pheromone(self, best_path, best_path_distance):
"""
- 找出所有从当前位置(ant.current_index)可达的customer
+ 更新信息素矩阵
:return:
"""
- next_index_meet_constrains = []
- for next_ind in self.index_to_visit:
- if self.check_condition(next_ind):
- next_index_meet_constrains.append(next_ind)
- return next_index_meet_constrains
+ self.pheromone_mat = (1-self.rho) * self.pheromone_mat
- def cal_nearest_next_index(self, next_index_list):
- """
- 从待选的customers中选择,离当前位置(ant.current_index)最近的customer
+ current_ind = best_path[0]
+ for next_ind in best_path[1:]:
+ self.pheromone_mat[current_ind][next_ind] += self.rho/best_path_distance
+ current_ind = next_ind
- :param next_index_list:
- :return:
- """
- current_ind = self.current_index
+ def _nearest_neighbor_heuristic(self):
+ index_to_visit = list(range(1, self.node_num))
+ current_index = 0
+ current_load = 0
+ current_time = 0
+ travel_distance = 0
+ while len(index_to_visit) > 0:
+ nearest_next_index = self._cal_nearest_next_index(index_to_visit, current_index, current_load, current_time)
+
+ if nearest_next_index is None:
+ travel_distance += self.node_dist_mat[current_index][0]
+
+ current_load = 0
+ current_time = 0
+ current_index = 0
+ else:
+ current_load += self.nodes[nearest_next_index].demand
- nearest_ind = next_index_list[0]
- min_dist = self.graph.node_dist_mat[current_ind][next_index_list[0]]
+ dist = self.node_dist_mat[current_index][nearest_next_index]
+ wait_time = max(self.nodes[nearest_next_index].ready_time - current_time - dist, 0)
+ service_time = self.nodes[nearest_next_index].service_time
- for next_ind in next_index_list[1:]:
- dist = self.graph.node_dist_mat[current_ind][next_ind]
- if dist < min_dist:
- min_dist = dist
- nearest_ind = next_ind
+ current_time += dist + wait_time + service_time
+ index_to_visit.remove(nearest_next_index)
- return nearest_ind
+ travel_distance += self.node_dist_mat[current_index][nearest_next_index]
- def cal_total_travel_distance(self, travel_path):
- distance = 0
- current_ind = travel_path[0]
- for next_ind in travel_path[1:]:
- distance += self.graph.node_dist_mat[current_ind][next_ind]
- current_ind = next_ind
- return distance
+ current_index = nearest_next_index
+ return travel_distance
- def try_insert_on_path(self, node_id):
+ def _cal_nearest_next_index(self, index_to_visit, current_index, current_load, current_time):
"""
- 尝试性地将node_id插入当前的travel_path中
- 插入的位置不能违反载重,时间,行驶距离的限制
- 如果有多个位置,则找出最优的位置
- :param node_id:
+ 找到最近的可达的next_index
+ :param index_to_visit:
:return:
"""
- feasible_insert_index = []
- feasible_distance = []
+ nearest_ind = None
+ nearest_distance = None
- path = copy.deepcopy(self.travel_path)
-
- for insert_index in range(len(path)):
- if self.graph.nodes[path[insert_index]].is_depot:
+ for next_index in index_to_visit:
+ if current_load + self.nodes[next_index].demand > self.vehicle_capacity:
continue
- front_depot_index = insert_index
- while front_depot_index >= 0 and not self.graph.nodes[self.travel_path[front_depot_index]].is_depot:
- front_depot_index -= 1
- front_depot_index = max(front_depot_index, 0)
-
- check_ant = Ant(self.graph, path[0])
-
- # 让check_ant 走过 path中下标从front_depot_index开始到insert_index-1的点
- for i in range(front_depot_index, insert_index):
- check_ant.move_to_next_index(path[i])
-
- # 开始尝试性地对排序后的index_to_visit中的结点进行访问
- if check_ant.check_condition(node_id):
- check_ant.move_to_next_index(node_id)
-
- # 如果可以到node_id,则要保证vehicle可以行驶回到depot
- for next_ind in path[insert_index:]:
- if check_ant.check_condition(next_ind):
- check_ant.move_to_next_index(next_ind)
- if self.graph.nodes[next_ind].is_depot:
- feasible_insert_index.append(insert_index)
- path.insert(insert_index, node_id)
- feasible_distance.append(self.cal_total_travel_distance(path))
- # 如果不可以回到depot,则返回上一层
- else:
- break
-
- if len(feasible_distance) == 0:
- return None
- else:
- feasible_distance = np.array(feasible_distance)
- min_insert_ind = np.argmin(feasible_distance)
- best_ind = feasible_insert_index[int(min_insert_ind)]
- return best_ind
-
- def insertion_procedure(self):
- """
- 为每个未访问的结点尝试性地找到一个合适的位置,插入到当前的travel_path
- 插入的位置不能违反载重,时间,行驶距离的限制
- :return:
- """
- if self.index_to_visit_empty():
- return
-
- ind_to_visit = copy.deepcopy(self.index_to_visit)
-
- demand = np.zeros(len(ind_to_visit))
- for i in range(len(ind_to_visit)):
- demand[i] = self.graph.nodes[i].demand
-
- sorted_ind = np.argsort(demand)
- ind_to_visit = ind_to_visit[sorted_ind]
-
- for node_id in ind_to_visit:
- best_insert_index = self.try_insert_on_path(node_id)
- if best_insert_index is not None:
- self.travel_path.insert(best_insert_index, node_id)
- self.index_to_visit.remove(node_id)
-
- self.total_travel_distance = self.cal_total_travel_distance(self.travel_path)
-
- def local_search_procedure(self):
- depot_ind = []
- for ind in range(len(self.travel_path)):
- if self.graph.nodes[self.travel_path[ind]].is_depot:
- depot_ind.append(ind)
-
- new_path_travel_distance = []
- new_path = []
- for i in range(1, len(depot_ind)):
- for j in range(i+1, len(depot_ind)):
- start_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1)
- end_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1)
- if end_a < start_a:
- start_a, end_a = end_a, start_a
-
- start_b = random.randint(depot_ind[j-1]+1, depot_ind[j]-1)
- end_b = random.randint(depot_ind[j - 1] + 1, depot_ind[j] - 1)
- if end_b < start_b:
- start_b, end_b = end_b, start_b
-
- path = []
- path.extend(self.travel_path[:start_a])
- path.extend(self.travel_path[start_b:end_b+1])
- path.extend(self.travel_path[end_a:start_b])
- path.extend(self.travel_path[start_a:end_a+1])
- path.extend(self.travel_path[end_b+1:])
-
- if len(path) != self.travel_path:
- raise RuntimeError('error')
-
- check_ant = Ant(self.graph, path[0])
- for ind in path[1:]:
- if check_ant.check_condition(ind):
- check_ant.move_to_next_index(ind)
- else:
- break
- if check_ant.index_to_visit_empty():
- # print('success to search')
- new_path_travel_distance.append(check_ant.total_travel_distance)
- new_path.append(path)
-
- new_path_travel_distance = np.array(new_path_travel_distance)
- min_distance_ind = np.argmin(new_path_travel_distance)
- min_distance = new_path_travel_distance[min_distance_ind]
-
- if min_distance < self.total_travel_distance:
- return new_path[int(min_distance_ind)]
- else:
- return None
-
-
-class NearestNeighborHeuristic:
- def __init__(self, graph: VrptwGraph):
- self.graph = graph
-
- def construct_path(self):
- """
- 不考虑使用的车辆的数目,调用近邻点算法构造路径
- :return:
- """
- ant = Ant(self.graph)
- while not ant.index_to_visit_empty():
- next_index_meet_constrains = ant.cal_next_index_meet_constrains()
- if len(next_index_meet_constrains) == 0:
- next_index = 0
- else:
- next_index = ant.cal_nearest_next_index(next_index_meet_constrains)
-
- ant.move_to_next_index(next_index)
- ant.move_to_next_index(0)
+ dist = self.node_dist_mat[current_index][next_index]
+ wait_time = max(self.nodes[next_index].ready_time - current_time - dist, 0)
+ service_time = self.nodes[next_index].service_time
+ # 检查访问某一个旅客之后,能否回到服务店
+ if current_time + dist + wait_time + service_time + self.node_dist_mat[next_index][0] > self.nodes[0].due_time:
+ continue
- return ant
+ # 不可以服务due time之外的旅客
+ if current_time + dist > self.nodes[next_index].due_time:
+ continue
- def cal_init_pheromone_val(self):
- ant = self.construct_path()
- init_pheromone = (1 / (self.graph.node_num * ant.total_travel_distance))
- return init_pheromone
+ if nearest_distance is None or self.node_dist_mat[current_index][next_index] < nearest_distance:
+ nearest_distance = self.node_dist_mat[current_index][next_index]
+ nearest_ind = next_index
+ return nearest_ind