From 5d06dfde0602def1d39afd6907bc80d4b19b97af Mon Sep 17 00:00:00 2001 From: JonZhao <1044264932@qq.com> Date: Sun, 26 May 2019 21:02:29 +0800 Subject: =?UTF-8?q?1.=20=E5=B0=86=E4=BF=A1=E6=81=AF=E7=B4=A0=E6=94=BE?= =?UTF-8?q?=E5=9C=A8graph=E4=B8=AD=202.=20=E5=88=A0=E9=99=A4=20class=20Nea?= =?UTF-8?q?restNeighborHeuristic=203.=20=E5=9C=A8graph=E4=B8=AD=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20nearest=5Fneighbor=5Fheuristic=E6=96=B9=E6=B3=95=20?= =?UTF-8?q?4.=20=E5=B0=86Ant=20class=E7=A7=BB=E5=8A=A8=E5=88=B0=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E6=96=B0=E6=96=87=E4=BB=B6=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ant.py | 250 +++++++++++++++++++++++++++++++++++++++++++++ main.py | 85 +++++++--------- vrptw_base.py | 318 ++++++++++++++-------------------------------------------- 3 files changed, 357 insertions(+), 296 deletions(-) create mode 100644 ant.py diff --git a/ant.py b/ant.py new file mode 100644 index 0000000..3f2bdd2 --- /dev/null +++ b/ant.py @@ -0,0 +1,250 @@ +import numpy as np +import copy +import random +from vrptw_base import VrptwGraph + + +class Ant: + def __init__(self, graph: VrptwGraph, start_index=0): + super() + self.graph = graph + self.current_index = 0 + self.vehicle_load = 0 + self.vehicle_travel_time = 0 + self.travel_path = [start_index] + self.arrival_time = [0] + + self.index_to_visit = list(range(graph.node_num)) + self.index_to_visit.remove(start_index) + + self.total_travel_distance = 0 + + def move_to_next_index(self, next_index): + # 更新蚂蚁路径 + self.travel_path.append(next_index) + self.total_travel_distance += self.graph.node_dist_mat[self.current_index][next_index] + + dist = self.graph.node_dist_mat[self.current_index][next_index] + self.arrival_time.append(self.vehicle_travel_time + dist) + + if self.graph.nodes[next_index].is_depot: + # 如果一下个位置为服务器点,则要将车辆负载等清空 + self.vehicle_load = 0 + self.vehicle_travel_time = 0 + + else: + # 更新车辆负载、行驶距离、时间 + self.vehicle_load += self.graph.nodes[next_index].demand + # 如果早于客户要求的时间窗(ready_time),则需要等待 + + self.vehicle_travel_time += dist + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) + self.graph.nodes[next_index].service_time + self.index_to_visit.remove(next_index) + + self.current_index = next_index + + def index_to_visit_empty(self): + return len(self.index_to_visit) == 0 + + def get_active_vehicles_num(self): + return self.travel_path.count(0)-1 + + def check_condition(self, next_index) -> bool: + """ + 检查移动到下一个点是否满足约束条件 + :param next_index: + :return: + """ + if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity: + return False + + dist = self.graph.node_dist_mat[self.current_index][next_index] + wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) + service_time = self.graph.nodes[next_index].service_time + + # 检查访问某一个旅客之后,能否回到服务店 + if self.vehicle_travel_time + dist + wait_time + service_time + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time: + return False + + # 不可以服务due time之外的旅客 + if self.vehicle_travel_time + dist > self.graph.nodes[next_index].due_time: + return False + + return True + + def cal_next_index_meet_constrains(self): + """ + 找出所有从当前位置(ant.current_index)可达的customer + :return: + """ + next_index_meet_constrains = [] + for next_ind in self.index_to_visit: + if self.check_condition(next_ind): + next_index_meet_constrains.append(next_ind) + return next_index_meet_constrains + + def cal_nearest_next_index(self, next_index_list): + """ + 从待选的customers中选择,离当前位置(ant.current_index)最近的customer + + :param next_index_list: + :return: + """ + current_ind = self.current_index + + nearest_ind = next_index_list[0] + min_dist = self.graph.node_dist_mat[current_ind][next_index_list[0]] + + for next_ind in next_index_list[1:]: + dist = self.graph.node_dist_mat[current_ind][next_ind] + if dist < min_dist: + min_dist = dist + nearest_ind = next_ind + + return nearest_ind + + def cal_total_travel_distance(self, travel_path): + distance = 0 + current_ind = travel_path[0] + for next_ind in travel_path[1:]: + distance += self.graph.node_dist_mat[current_ind][next_ind] + current_ind = next_ind + return distance + + def try_insert_on_path(self, node_id): + """ + 尝试性地将node_id插入当前的travel_path中 + 插入的位置不能违反载重,时间,行驶距离的限制 + 如果有多个位置,则找出最优的位置 + :param node_id: + :return: + """ + feasible_insert_index = [] + feasible_distance = [] + + path = copy.deepcopy(self.travel_path) + + for insert_index in range(len(path)): + if self.graph.nodes[path[insert_index]].is_depot: + continue + + front_depot_index = insert_index + while front_depot_index >= 0 and not self.graph.nodes[self.travel_path[front_depot_index]].is_depot: + front_depot_index -= 1 + front_depot_index = max(front_depot_index, 0) + + check_ant = Ant(self.graph, path[0]) + + # 让check_ant 走过 path中下标从front_depot_index开始到insert_index-1的点 + for i in range(front_depot_index, insert_index): + check_ant.move_to_next_index(path[i]) + + # 开始尝试性地对排序后的index_to_visit中的结点进行访问 + if check_ant.check_condition(node_id): + check_ant.move_to_next_index(node_id) + + # 如果可以到node_id,则要保证vehicle可以行驶回到depot + for next_ind in path[insert_index:]: + if check_ant.check_condition(next_ind): + check_ant.move_to_next_index(next_ind) + if self.graph.nodes[next_ind].is_depot: + feasible_insert_index.append(insert_index) + path.insert(insert_index, node_id) + feasible_distance.append(self.cal_total_travel_distance(path)) + # 如果不可以回到depot,则返回上一层 + else: + break + + if len(feasible_distance) == 0: + return None + else: + feasible_distance = np.array(feasible_distance) + min_insert_ind = np.argmin(feasible_distance) + best_ind = feasible_insert_index[int(min_insert_ind)] + return best_ind + + def insertion_procedure(self): + """ + 为每个未访问的结点尝试性地找到一个合适的位置,插入到当前的travel_path + 插入的位置不能违反载重,时间,行驶距离的限制 + :return: + """ + if self.index_to_visit_empty(): + return + + ind_to_visit = copy.deepcopy(self.index_to_visit) + + demand = np.zeros(len(ind_to_visit)) + for i in range(len(ind_to_visit)): + demand[i] = self.graph.nodes[i].demand + + sorted_ind = np.argsort(demand) + ind_to_visit = ind_to_visit[sorted_ind] + + for node_id in ind_to_visit: + best_insert_index = self.try_insert_on_path(node_id) + if best_insert_index is not None: + self.travel_path.insert(best_insert_index, node_id) + self.index_to_visit.remove(node_id) + + self.total_travel_distance = self.cal_total_travel_distance(self.travel_path) + + def local_search_procedure(self): + """ + 对当前的已经访问完graph中所有节点的travel_path使用cross进行局部搜索 + :return: + """ + # 找出path中所有的depot的位置 + depot_ind = [] + for ind in range(len(self.travel_path)): + if self.graph.nodes[self.travel_path[ind]].is_depot: + depot_ind.append(ind) + + new_path_travel_distance = [] + new_path = [] + # 将self.travel_path分成多段,每段以depot开始,以depot结束,称为route + for i in range(1, len(depot_ind)): + for j in range(i+1, len(depot_ind)): + + # 随机在两段route,各随机选择一段customer id,交换这两段customer id + start_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1) + end_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1) + if end_a < start_a: + start_a, end_a = end_a, start_a + + start_b = random.randint(depot_ind[j-1]+1, depot_ind[j]-1) + end_b = random.randint(depot_ind[j - 1] + 1, depot_ind[j] - 1) + if end_b < start_b: + start_b, end_b = end_b, start_b + + path = [] + path.extend(self.travel_path[:start_a]) + path.extend(self.travel_path[start_b:end_b+1]) + path.extend(self.travel_path[end_a:start_b]) + path.extend(self.travel_path[start_a:end_a+1]) + path.extend(self.travel_path[end_b+1:]) + + if len(path) != self.travel_path: + raise RuntimeError('error') + + # 判断新生成的path是否是可行的 + check_ant = Ant(self.graph, path[0]) + for ind in path[1:]: + if check_ant.check_condition(ind): + check_ant.move_to_next_index(ind) + else: + break + # 如果新生成的path是可行的 + if check_ant.index_to_visit_empty(): + # print('success to search') + new_path_travel_distance.append(check_ant.total_travel_distance) + new_path.append(path) + + # 找出新生成的path中,路程最小的 + new_path_travel_distance = np.array(new_path_travel_distance) + min_distance_ind = np.argmin(new_path_travel_distance) + min_distance = new_path_travel_distance[min_distance_ind] + + if min_distance < self.total_travel_distance: + return new_path[int(min_distance_ind)] + else: + return None \ No newline at end of file diff --git a/main.py b/main.py index db5b34a..07cc98f 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,12 @@ import numpy as np -from vrptw_base import VrptwGraph, Ant, NearestNeighborHeuristic import random from vprtw_aco_figure import VrptwAcoFigure +from vrptw_base import VrptwGraph +from ant import Ant class VrptwAco: - def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2, rho=0.1): + def __init__(self, graph: VrptwGraph, ants_num=10, max_iter=200, alpha=1, beta=2): super() # graph 结点的位置、服务时间信息 self.graph = graph @@ -13,24 +14,16 @@ class VrptwAco: self.ants_num = ants_num # max_iter 最大迭代次数 self.max_iter = max_iter + # vehicle_capacity 表示每辆车的最大载重 + self.max_load = graph.vehicle_capacity + # 信息素强度 + self.Q = 1 # alpha 信息素信息重要新 self.alpha = alpha # beta 启发性信息重要性 self.beta = beta - # rho 信息素挥发速度 - self.rho = rho # q0 表示直接选择概率最大的下一点的概率 self.q0 = 0.1 - # vehicle_capacity 表示每辆车的最大载重 - self.max_load = graph.vehicle_capacity - # 信息素强度 - self.Q = 1 - # 创建信息素矩阵 - nn_heuristic = NearestNeighborHeuristic(self.graph) - self.init_pheromone_val = nn_heuristic.cal_init_pheromone_val() - self.pheromone_mat = np.ones((self.graph.node_num, self.graph.node_num)) * self.init_pheromone_val - # 启发式信息矩阵 - self.heuristic_info_mat = 1 / graph.node_dist_mat # best path self.best_path_distance = None self.best_path = None @@ -64,11 +57,11 @@ class VrptwAco: # 更新蚂蚁路径 ants[k].move_to_next_index(next_index) - self.local_update_pheromone(ants[k].current_index, next_index) + self.graph.local_update_pheromone(ants[k].current_index, next_index) # 最终回到0位置 ants[k].move_to_next_index(0) - self.local_update_pheromone(ants[k].current_index, 0) + self.graph.local_update_pheromone(ants[k].current_index, 0) # 计算所有蚂蚁的路径长度 paths_distance = np.array([ant.total_travel_distance for ant in ants]) @@ -76,22 +69,22 @@ class VrptwAco: # 记录当前的最佳路径 best_index = np.argmin(paths_distance) if self.best_path is None: - self.best_path = ants[best_index].travel_path + self.best_path = ants[int(best_index)].travel_path self.best_path_distance = paths_distance[best_index] if self.whether_or_not_to_show_figure: self.figure.init_figure(self.best_path) elif paths_distance[best_index] < self.best_path_distance: - self.best_path = ants[best_index].travel_path + self.best_path = ants[int(best_index)].travel_path self.best_path_distance = paths_distance[best_index] if self.whether_or_not_to_show_figure: self.figure.update_figure(self.best_path) print('[iteration %d]: best distance %f' % (iter, self.best_path_distance)) # 更新信息素表 - self.global_update_pheromone() + self.graph.global_update_pheromone(self.best_path, self.best_path_distance) - def select_next_index(self, ant: Ant): + def select_next_index(self, ant): """ 选择下一个结点 :param ant: @@ -100,8 +93,8 @@ class VrptwAco: current_index = ant.current_index index_to_visit = ant.index_to_visit - transition_prob = np.power(self.pheromone_mat[current_index][index_to_visit], self.alpha) * \ - np.power(self.heuristic_info_mat[current_index][index_to_visit], self.beta) + transition_prob = np.power(self.graph.pheromone_mat[current_index][index_to_visit], self.alpha) * \ + np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta) if np.random.rand() < self.q0: max_prob_index = np.argmax(transition_prob) @@ -111,22 +104,6 @@ class VrptwAco: next_index = self.stochastic_accept(index_to_visit, transition_prob) return next_index - def local_update_pheromone(self, start_ind, end_ind): - self.pheromone_mat[start_ind][end_ind] = (1-self.rho) * self.pheromone_mat[start_ind][end_ind] + \ - self.rho * self.init_pheromone_val - - def global_update_pheromone(self): - """ - 更新信息素矩阵 - :return: - """ - self.pheromone_mat = (1-self.rho) * self.pheromone_mat - - current_ind = self.best_path[0] - for next_ind in self.best_path[1:]: - self.pheromone_mat[current_ind][next_ind] += self.rho/self.best_path_distance - current_ind = next_ind - def stochastic_accept(self, index_to_visit, transition_prob): """ 轮盘赌 @@ -148,13 +125,7 @@ class VrptwAco: if random.random() <= norm_transition_prob[ind]: return index_to_visit[ind] - def new_active_ant(self, k: int, local_search: bool, IN): - # 对graph中的depot复制k-1,使得depot数量等于k - new_graph = self.graph.construct_graph_with_duplicated_depot(k) - - # 从任意一个depot开始 - start_index = random.randint(0, k-1) - ant = Ant(new_graph, start_index) + def new_active_ant(self, ant: Ant, local_search: bool, IN): # 计算从当前位置可以达到的下一个位置 next_index_meet_constrains = ant.cal_next_index_meet_constrains() @@ -163,10 +134,10 @@ class VrptwAco: ready_time = np.zeros(index_num) due_time = np.zeros(index_num) for i in range(index_num): - ready_time[i] = new_graph.nodes[next_index_meet_constrains[i]].ready_time - due_time[i] = new_graph.nodes[next_index_meet_constrains[i]].due_time + ready_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].ready_time + due_time[i] = ant.graph.nodes[next_index_meet_constrains[i]].due_time - delivery_time = np.max(ant.vehicle_travel_time + new_graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time) + delivery_time = np.max(ant.vehicle_travel_time + ant.graph.node_dist_mat[ant.current_index][next_index_meet_constrains], ready_time) delat_time = delivery_time - ant.vehicle_travel_time distance = delat_time * (due_time - ant.vehicle_travel_time) distance = np.max(1.0, distance-IN) @@ -174,12 +145,12 @@ class VrptwAco: closeness = 1/distance # 按照概率选择下一个点next_index - if np.random.rand() < self.q0: + if np.random.rand() < ant.graph.q0: max_prob_index = np.argmax(closeness) next_index = next_index_meet_constrains[max_prob_index] else: # 使用轮盘赌算法 - next_index = self.stochastic_accept(next_index_meet_constrains, closeness) + next_index = ant.graph.stochastic_accept(next_index_meet_constrains, closeness) ant.move_to_next_index(next_index) # 更新信息素矩阵 @@ -194,7 +165,19 @@ class VrptwAco: new_path = ant.local_search_procedure() if new_path is not None: pass - print('fuck') + + def acs_time(self, vehicle_num): + # how to calculate init_pheromone_val + new_graph = self.graph.construct_graph_with_duplicated_depot(vehicle_num, 1) + + # 初始化信息素矩阵 + while True: + for k in range(self.ants_num): + ant = Ant(new_graph, random.randint(0, vehicle_num-1)) + self.new_active_ant(ant, True, 0) + # if ant.index_to_visit_empty() and ant.total_travel_distance < global_travel_distance: + # send ant.travel_path + # pass if __name__ == '__main__': diff --git a/vrptw_base.py b/vrptw_base.py index ae9f232..9413731 100644 --- a/vrptw_base.py +++ b/vrptw_base.py @@ -1,6 +1,5 @@ import numpy as np import copy -import random class Node: @@ -22,15 +21,25 @@ class Node: class VrptwGraph: - def __init__(self, file_path): + def __init__(self, file_path, rho=0.1): super() # node_num 结点个数 # node_dist_mat 节点之间的距离(矩阵) # pheromone_mat 节点之间路径上的信息度浓度 self.node_num, self.nodes, self.node_dist_mat, self.vehicle_num, self.vehicle_capacity \ = self.create_from_file(file_path) + # rho 信息素挥发速度 + self.rho = rho + # 创建信息素矩阵 - def construct_graph_with_duplicated_depot(self, vehicle_num): + self.init_pheromone_val = self._nearest_neighbor_heuristic() + self.init_pheromone_val = 1/(self.init_pheromone_val * self.node_num) + + self.pheromone_mat = np.ones((self.node_num, self.node_num)) * self.init_pheromone_val + # 启发式信息矩阵 + self.heuristic_info_mat = 1 / self.node_dist_mat + + def construct_graph_with_duplicated_depot(self, vehicle_num, init_pheromone_val): new_graph = copy.deepcopy(self) new_graph.node_num += vehicle_num-1 @@ -51,6 +60,12 @@ class VrptwGraph: original_j = j - vehicle_num + 1 new_graph.node_dist_mat[j][i] = new_graph.node_dist_mat[i][j] = self.node_dist_mat[original_i][original_j] + # 启发式信息 + new_graph.heuristic_info_mat = 1 / new_graph.node_dist_mat + # 信息素 + new_graph.init_pheromone_val = init_pheromone_val + new_graph.pheromone_mat = np.ones((self.node_num, self.node_num)) * self.init_pheromone_val + return new_graph def create_from_file(self, file_path): @@ -84,265 +99,78 @@ class VrptwGraph: def calculate_dist(node_a, node_b): return np.linalg.norm((node_a.x - node_b.x, node_a.y - node_b.y)) + def local_update_pheromone(self, start_ind, end_ind): + self.pheromone_mat[start_ind][end_ind] = (1-self.rho) * self.pheromone_mat[start_ind][end_ind] + \ + self.rho * self.init_pheromone_val -class Ant: - def __init__(self, graph: VrptwGraph, start_index=0): - super() - self.graph = graph - self.current_index = 0 - self.vehicle_load = 0 - self.vehicle_travel_time = 0 - self.travel_path = [start_index] - self.arrival_time = [0] - - self.index_to_visit = list(range(graph.node_num)) - self.index_to_visit.remove(start_index) - - self.total_travel_distance = 0 - - def move_to_next_index(self, next_index): - # 更新蚂蚁路径 - self.travel_path.append(next_index) - self.total_travel_distance += self.graph.node_dist_mat[self.current_index][next_index] - - dist = self.graph.node_dist_mat[self.current_index][next_index] - self.arrival_time.append(self.vehicle_travel_time + dist) - - if self.graph.nodes[next_index].is_depot: - # 如果一下个位置为服务器点,则要将车辆负载等清空 - self.vehicle_load = 0 - self.vehicle_travel_time = 0 - - else: - # 更新车辆负载、行驶距离、时间 - self.vehicle_load += self.graph.nodes[next_index].demand - # 如果早于客户要求的时间窗(ready_time),则需要等待 - - self.vehicle_travel_time += dist + max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) + self.graph.nodes[next_index].service_time - self.index_to_visit.remove(next_index) - - self.current_index = next_index - - def index_to_visit_empty(self): - return len(self.index_to_visit) == 0 - - def get_active_vehicles_num(self): - return self.travel_path.count(0)-1 - - def check_condition(self, next_index) -> bool: - """ - 检查移动到下一个点是否满足约束条件 - :param next_index: - :return: - """ - if self.vehicle_load + self.graph.nodes[next_index].demand > self.graph.vehicle_capacity: - return False - - dist = self.graph.node_dist_mat[self.current_index][next_index] - wait_time = max(self.graph.nodes[next_index].ready_time - self.vehicle_travel_time - dist, 0) - service_time = self.graph.nodes[next_index].service_time - # 检查访问某一个旅客之后,能否回到服务店 - if self.vehicle_travel_time + dist + wait_time + service_time + self.graph.node_dist_mat[next_index][0] > self.graph.nodes[0].due_time: - return False - - # 不可以服务due time之外的旅客 - if self.vehicle_travel_time + dist > self.graph.nodes[next_index].due_time: - return False - - return True - - def cal_next_index_meet_constrains(self): + def global_update_pheromone(self, best_path, best_path_distance): """ - 找出所有从当前位置(ant.current_index)可达的customer + 更新信息素矩阵 :return: """ - next_index_meet_constrains = [] - for next_ind in self.index_to_visit: - if self.check_condition(next_ind): - next_index_meet_constrains.append(next_ind) - return next_index_meet_constrains + self.pheromone_mat = (1-self.rho) * self.pheromone_mat - def cal_nearest_next_index(self, next_index_list): - """ - 从待选的customers中选择,离当前位置(ant.current_index)最近的customer + current_ind = best_path[0] + for next_ind in best_path[1:]: + self.pheromone_mat[current_ind][next_ind] += self.rho/best_path_distance + current_ind = next_ind - :param next_index_list: - :return: - """ - current_ind = self.current_index + def _nearest_neighbor_heuristic(self): + index_to_visit = list(range(1, self.node_num)) + current_index = 0 + current_load = 0 + current_time = 0 + travel_distance = 0 + while len(index_to_visit) > 0: + nearest_next_index = self._cal_nearest_next_index(index_to_visit, current_index, current_load, current_time) + + if nearest_next_index is None: + travel_distance += self.node_dist_mat[current_index][0] + + current_load = 0 + current_time = 0 + current_index = 0 + else: + current_load += self.nodes[nearest_next_index].demand - nearest_ind = next_index_list[0] - min_dist = self.graph.node_dist_mat[current_ind][next_index_list[0]] + dist = self.node_dist_mat[current_index][nearest_next_index] + wait_time = max(self.nodes[nearest_next_index].ready_time - current_time - dist, 0) + service_time = self.nodes[nearest_next_index].service_time - for next_ind in next_index_list[1:]: - dist = self.graph.node_dist_mat[current_ind][next_ind] - if dist < min_dist: - min_dist = dist - nearest_ind = next_ind + current_time += dist + wait_time + service_time + index_to_visit.remove(nearest_next_index) - return nearest_ind + travel_distance += self.node_dist_mat[current_index][nearest_next_index] - def cal_total_travel_distance(self, travel_path): - distance = 0 - current_ind = travel_path[0] - for next_ind in travel_path[1:]: - distance += self.graph.node_dist_mat[current_ind][next_ind] - current_ind = next_ind - return distance + current_index = nearest_next_index + return travel_distance - def try_insert_on_path(self, node_id): + def _cal_nearest_next_index(self, index_to_visit, current_index, current_load, current_time): """ - 尝试性地将node_id插入当前的travel_path中 - 插入的位置不能违反载重,时间,行驶距离的限制 - 如果有多个位置,则找出最优的位置 - :param node_id: + 找到最近的可达的next_index + :param index_to_visit: :return: """ - feasible_insert_index = [] - feasible_distance = [] + nearest_ind = None + nearest_distance = None - path = copy.deepcopy(self.travel_path) - - for insert_index in range(len(path)): - if self.graph.nodes[path[insert_index]].is_depot: + for next_index in index_to_visit: + if current_load + self.nodes[next_index].demand > self.vehicle_capacity: continue - front_depot_index = insert_index - while front_depot_index >= 0 and not self.graph.nodes[self.travel_path[front_depot_index]].is_depot: - front_depot_index -= 1 - front_depot_index = max(front_depot_index, 0) - - check_ant = Ant(self.graph, path[0]) - - # 让check_ant 走过 path中下标从front_depot_index开始到insert_index-1的点 - for i in range(front_depot_index, insert_index): - check_ant.move_to_next_index(path[i]) - - # 开始尝试性地对排序后的index_to_visit中的结点进行访问 - if check_ant.check_condition(node_id): - check_ant.move_to_next_index(node_id) - - # 如果可以到node_id,则要保证vehicle可以行驶回到depot - for next_ind in path[insert_index:]: - if check_ant.check_condition(next_ind): - check_ant.move_to_next_index(next_ind) - if self.graph.nodes[next_ind].is_depot: - feasible_insert_index.append(insert_index) - path.insert(insert_index, node_id) - feasible_distance.append(self.cal_total_travel_distance(path)) - # 如果不可以回到depot,则返回上一层 - else: - break - - if len(feasible_distance) == 0: - return None - else: - feasible_distance = np.array(feasible_distance) - min_insert_ind = np.argmin(feasible_distance) - best_ind = feasible_insert_index[int(min_insert_ind)] - return best_ind - - def insertion_procedure(self): - """ - 为每个未访问的结点尝试性地找到一个合适的位置,插入到当前的travel_path - 插入的位置不能违反载重,时间,行驶距离的限制 - :return: - """ - if self.index_to_visit_empty(): - return - - ind_to_visit = copy.deepcopy(self.index_to_visit) - - demand = np.zeros(len(ind_to_visit)) - for i in range(len(ind_to_visit)): - demand[i] = self.graph.nodes[i].demand - - sorted_ind = np.argsort(demand) - ind_to_visit = ind_to_visit[sorted_ind] - - for node_id in ind_to_visit: - best_insert_index = self.try_insert_on_path(node_id) - if best_insert_index is not None: - self.travel_path.insert(best_insert_index, node_id) - self.index_to_visit.remove(node_id) - - self.total_travel_distance = self.cal_total_travel_distance(self.travel_path) - - def local_search_procedure(self): - depot_ind = [] - for ind in range(len(self.travel_path)): - if self.graph.nodes[self.travel_path[ind]].is_depot: - depot_ind.append(ind) - - new_path_travel_distance = [] - new_path = [] - for i in range(1, len(depot_ind)): - for j in range(i+1, len(depot_ind)): - start_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1) - end_a = random.randint(depot_ind[i-1]+1, depot_ind[i]-1) - if end_a < start_a: - start_a, end_a = end_a, start_a - - start_b = random.randint(depot_ind[j-1]+1, depot_ind[j]-1) - end_b = random.randint(depot_ind[j - 1] + 1, depot_ind[j] - 1) - if end_b < start_b: - start_b, end_b = end_b, start_b - - path = [] - path.extend(self.travel_path[:start_a]) - path.extend(self.travel_path[start_b:end_b+1]) - path.extend(self.travel_path[end_a:start_b]) - path.extend(self.travel_path[start_a:end_a+1]) - path.extend(self.travel_path[end_b+1:]) - - if len(path) != self.travel_path: - raise RuntimeError('error') - - check_ant = Ant(self.graph, path[0]) - for ind in path[1:]: - if check_ant.check_condition(ind): - check_ant.move_to_next_index(ind) - else: - break - if check_ant.index_to_visit_empty(): - # print('success to search') - new_path_travel_distance.append(check_ant.total_travel_distance) - new_path.append(path) - - new_path_travel_distance = np.array(new_path_travel_distance) - min_distance_ind = np.argmin(new_path_travel_distance) - min_distance = new_path_travel_distance[min_distance_ind] - - if min_distance < self.total_travel_distance: - return new_path[int(min_distance_ind)] - else: - return None - - -class NearestNeighborHeuristic: - def __init__(self, graph: VrptwGraph): - self.graph = graph - - def construct_path(self): - """ - 不考虑使用的车辆的数目,调用近邻点算法构造路径 - :return: - """ - ant = Ant(self.graph) - while not ant.index_to_visit_empty(): - next_index_meet_constrains = ant.cal_next_index_meet_constrains() - if len(next_index_meet_constrains) == 0: - next_index = 0 - else: - next_index = ant.cal_nearest_next_index(next_index_meet_constrains) - - ant.move_to_next_index(next_index) - ant.move_to_next_index(0) + dist = self.node_dist_mat[current_index][next_index] + wait_time = max(self.nodes[next_index].ready_time - current_time - dist, 0) + service_time = self.nodes[next_index].service_time + # 检查访问某一个旅客之后,能否回到服务店 + if current_time + dist + wait_time + service_time + self.node_dist_mat[next_index][0] > self.nodes[0].due_time: + continue - return ant + # 不可以服务due time之外的旅客 + if current_time + dist > self.nodes[next_index].due_time: + continue - def cal_init_pheromone_val(self): - ant = self.construct_path() - init_pheromone = (1 / (self.graph.node_num * ant.total_travel_distance)) - return init_pheromone + if nearest_distance is None or self.node_dist_mat[current_index][next_index] < nearest_distance: + nearest_distance = self.node_dist_mat[current_index][next_index] + nearest_ind = next_index + return nearest_ind -- cgit v1.2.3