好记性不如烂笔头。很多学到的东西还是迅速而系统的整理出来才是对自己时间的最大的节约。还记得2019年末的时候整理过这个算法,虽然当时没有完全的明白其精髓,但是至少把使用流程搞清楚了。才过了半年不到,最近想重新的拾起来该算法做点东西。居然忘的一干二净,大致又在上面前前后后耗费了一周,终于形成了代码。算是将这个算法拆解零碎了。也感谢这个过程,让我再一次深入的研究,并加深了理解。
问题描述QPPTW全称为:the Quickest Path Problem with Time Windows即带时间窗路径问题。可能提到时间窗的最短路问题大家第一想到的是SPPTW算法(shourtest path problem with time windows)。其实QPPTW就是SPPTW的一个变体,不同的是SPPTW时间窗是在节点处,而QPPTW的时间窗是在线路上。
问题可以大致表述为下图的(a)。图中总共有4个节点,3条路径。自由体从第一个节点处到第四个节点处,但是路径并不是所有的时间都开放(绿色表示开放,红色表示路径关闭),自由体可以在节点处无限的等待,那么自由体最早在什么时间内能够到达终点?自由体走过的路程要最短。
其实,我们可以借鉴dijkstra算法(https://blog.csdn.net/weixin_41806489/article/details/105962788)的处理思路,即,将到达节点的时间段也看做一个维度(或者叫虚拟节点),也就是原图的基础上增加若干的虚拟节点将该问题转化为一个最短路问题。虽然比较绕,但是我觉得这个想法真的特别的cool。某种意义上是时间和空间之间的转化。
算法描述 1. 原始描述Gawrilow, Ewgenij,K O Hler, Ekkehard,M O Hring, Rolf H
Stenzel, Bj O Rn,等人在2008年提出的该种方法,感谢诸位大佬。
《Dynamic routing of automated guided vehicles in real-time》
翻译一下:
- 初始化:
从出发点生成一个初始标签L=(目前的路段,从初始点到该点的距离,到达该路段尾部的时间窗,前一个标签)。并将该标签加入队列H。 - loop循环:
- 退出条件:(1)H为空,说明没有路径可以到达。(2)延申到了目标节点,此时可以回溯出最优路径。
- 对于每个时间窗:
- (1)尽量的延申。如果所有的延申都做完了,那么进入下一个时间窗。
- (2)支配判断。对于某个到达节点u,如果当前的label“被支配了”(有一个label,到这里走的路程又少,到达我这里之后时间窗还充足,充足到能把你完全的包住),那么当前的label就被废弃。反之,如果当前的label支配了其他的label那其他的label也就没有再延申的必要了。
注意: 上面的4.2特别的重要,这牵扯到了怎样唯一的表征一个label的问题,不能只通过路段开始和结束节点(u,v),还应该包括某个时间窗t=[t(0),t(1)]。所以一个label我们可以用进行唯一的表征,这一点在编程中特别的重要。
QPPTW算法#-*-coding:utf-8-*-
from taxiProcessORFram.models.information import myOwnGrapyWithTimeWindow
import pandas as pd
import heapq
import datetime
import collections
myOwnGrapyTw=myOwnGrapyWithTimeWindow()
weight_edges_with_time_window=[]
def getNextI(vilateNextTimeWindows,vilateBlockTimeWindows):
finalResult=[]
vilateNextTimeWindowsDict = collections.defaultdict(list)
for idx, twn in enumerate(vilateNextTimeWindows):
twn_begin, twn_end = twn
vilateNextTimeWindowsDict[twn_begin].append(("vtb", str(idx)))
vilateNextTimeWindowsDict[twn_end].append(("vte", str(idx)))
vilateBlockTimeWindowsDict = collections.defaultdict(list)
for idx, twb in enumerate(vilateBlockTimeWindows):
twb_begin, twb_end = twb
vilateBlockTimeWindowsDict[twb_begin].append(("vbb", str(idx)))
vilateBlockTimeWindowsDict[twb_end].append(("vbe", str(idx)))
timeLineQuen = []
heapq.heapify(timeLineQuen)
for temp in vilateNextTimeWindowsDict:
heapq.heappush(timeLineQuen, (temp, vilateNextTimeWindowsDict[temp]))
for temp in vilateBlockTimeWindowsDict:
heapq.heappush(timeLineQuen, (temp, vilateBlockTimeWindowsDict[temp]))
timeLineQuenCopy = []
beginBlockTimeNumber = 0
# 有效时间是这样的定义:blockBeginTime是None,并且timeWindowBegin有效,并出现了有效的结束,则该段时间生效。对所有的节点排序
while timeLineQuen:
thisData = heapq.heappop(timeLineQuen)
timeLineQuenCopy.append(thisData)
for i in range(len(timeLineQuenCopy) - 1):
thisData = timeLineQuenCopy[i]
nextData = timeLineQuenCopy[i + 1]
# 记录当前是否有还在生效的block
for type, idx in thisData[1]:
# print(thisTime,type,idx)
if type == "vbb":
beginBlockTimeNumber += 1
elif type == "vbe":
beginBlockTimeNumber -= 1
if beginBlockTimeNumber == 0:
# 确定在此刻的前面,所有的block都是关闭的,才可以判断当前的小时间窗是否有效
if "vtb" in [item[0] for item in thisData[1]] and (
("vte" in [item[0] for item in nextData[1]]) or ("vbb" in [item[0] for item in nextData[1]])) and \
thisData[0]<nextData[0]:
finalResult.append((thisData[0], nextData[0]))
# print(thisData, nextData)
if "vbe" in [item[0] for item in thisData[1]] and \
"vte" in [item[0] for item in nextData[1]] and \
thisData[0]<nextData[0]:
finalResult.append((thisData[0], nextData[0]))
if "vbe" in [item[0] for item in thisData[1]] and \
"vte" in [item[0] for item in nextData[1]] and \
thisData[0] < nextData[0]:
finalResult.append((thisData[0], nextData[0]))
return finalResult
if __name__ == '__main__':
#s是一个虚拟节点,表示出发节点
s=0
st=1#代表实际出发地点
weight_edges_with_time_window.append((s,st,0,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((st,s,10000,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((1,2,1,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((2,1,1,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((2,3,2,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((3,2,2,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((3,4,1,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
weight_edges_with_time_window.append((4,3,1,pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:10")))
myOwnGrapyTw.addWeightedEdgesWithTimeWindowFrom(weight_edges_with_time_window,graphy_type="direction")
##添加blocktime
myOwnGrapyTw.addBlockTimeForEdge(2,3, (pd.Timestamp("2019-11-26 00:00:05"),pd.Timestamp("2019-11-26 00:00:06")))
myOwnGrapyTw.addBlockTimeForEdge(3,4, (pd.Timestamp("2019-11-26 00:00:00"),pd.Timestamp("2019-11-26 00:00:02")))
myOwnGrapyTw.addBlockTimeForEdge(3,4, (pd.Timestamp("2019-11-26 00:00:08"),pd.Timestamp("2019-11-26 00:00:10")))
#添加I_times,即到达弧度(u,v)尾部的时间窗
myOwnGrapyTw.addITimeWindowForEdge(0,1,(pd.Timestamp("2019-11-26 00:00:01"),pd.Timestamp("2019-11-26 00:00:09")))
#以节点为单位的label
allLlist={}
for u in myOwnGrapyTw.adj:
allLlist[u] = {}
for v in myOwnGrapyTw.adj[u]:
allLlist[u][v]={}
H=[]
heapq.heapify(H)
for ti in myOwnGrapyTw.getITimeWindowsForEdge(s,st):
thisLable=[myOwnGrapyTw.getEdgeData(s,st),(s,st),myOwnGrapyTw.getITimeWindowsForEdge(s,st)[0],None]
#ti时刻从s出发,到st形成的lable
allLlist[st][s][ti[0]]=thisLable
#这几个参数的意思:cost、弧、可能到达的时间(这里表示到这个弧的尾部的时间)、上一个弧(cL,aL,IL,predL)
heapq.heappush(H,thisLable)
target=4
while H:
tempLabel=heapq.heappop(H)
if tempLabel[1][1]==target:
print("we have found the best solution")
print(tempLabel)
break
print("---------------------------当前的弧线:",tempLabel[1],"-----------------------")
print("当前弧线的Lable",tempLabel)
#step1.应该是以每一个时间窗为对象。在同一个arc上,时间窗会被dominance,因为会造成花的时间又多,到达的时候时间段又不好的情况,即到达的时间窗也被被人(支配了)
# for temTimeWindow in tempLabel[2]:
temTimeWindow=tempLabel[2]
#对于dominated功能,我们用allLlist这个变量来实现。对于被支配的我们进行特殊标记,并在这里进行跳过
if allLlist[tempLabel[1][1]][tempLabel[1][0]].get(temTimeWindow[0],None)=="Null":#说明已经被支配
print("该条线上的,",temTimeWindow,"被支配了")
break
"""
"""
##由于可以等待,所以temTimeWindow会被拉伸
temTimeWindow = [temTimeWindow[0], myOwnGrapyTw.getUpTimeWindowForEdge(tempLabel[1][0], tempLabel[1][1])]
#step2.从这个时间窗到其他的arc上去
for nextNode in myOwnGrapyTw.adj[tempLabel[1][1]]:
# 第二步,取出这个tempArc上的所有I
vilateNextTimeWindows=[]
vilateNextTimeWindow = [ min(temTimeWindow[0] + datetime.timedelta(seconds=myOwnGrapyTw.getEdgeData(tempLabel[1][1], nextNode)),
myOwnGrapyTw.getUpTimeWindowForEdge(tempLabel[1][0], tempLabel[1][1])),
min(temTimeWindow[1] + datetime.timedelta(seconds=myOwnGrapyTw.getEdgeData(tempLabel[1][1], nextNode)),
myOwnGrapyTw.getUpTimeWindowForEdge(tempLabel[1][0], tempLabel[1][1]))]
vilateNextTimeWindows.append(vilateNextTimeWindow)
vilateBlockTimeWindows = []
for thisBlockTime in myOwnGrapyTw.getBlockTimeForEdge(tempLabel[1][1], nextNode):
vilateBlockTimeWindow = [thisBlockTime[0], min(thisBlockTime[1] + datetime.timedelta(
seconds=myOwnGrapyTw.getEdgeData(tempLabel[1][1], nextNode)), myOwnGrapyTw.getUpTimeWindowForEdge(tempLabel[1][0],tempLabel[1][1]))]
vilateBlockTimeWindows.append(vilateBlockTimeWindow)
nextAllI = getNextI(vilateNextTimeWindows, vilateBlockTimeWindows)
if not nextAllI:
#说明无法到达
continue
Lpian = [tempLabel[0] + myOwnGrapyTw.getEdgeData(tempLabel[1][1], nextNode),
(tempLabel[1][1], nextNode),
nextAllI[0],
tempLabel]
""""
thisLabel的第四位存的是前一个label,而不是arc
"""
print("nextAllI", nextAllI)
#在这里询问Lpian是支配别人还是被别人支配
goStep2 = False
for u in allLlist:
if nextNode in allLlist[u]:
for t in allLlist[u][nextNode]:
oldLabel=allLlist[u][nextNode][t]
#Lpian支配别人
if Lpian[0]<oldLabel[0] and Lpian[2][0]<=oldLabel[2][0] and Lpian[2][1]>=oldLabel[2][1]:
oldLabel[nextNode][u][t]="Null"
print("Lpian支配了别人",Lpian)
if Lpian[0]>oldLabel[0] and Lpian[2][0]>=oldLabel[2][0] and Lpian[2][1]<=oldLabel[2][1]:
#将Lpian废弃,并返回step2
print("Lpian被支配了", Lpian)
goStep2=True
break
if goStep2:
break
if goStep2:
continue#下一个
##将Lpian插入到H和allLlist
heapq.heappush(H,Lpian)
allLlist[nextNode][tempLabel[1][1]][temTimeWindow[0]]=Lpian
else:
print("说明没有到达目标点的路径")
依赖的类
class myOwnGrapyWithTimeWindow:
# "Abstract all variable types to one level"
node_dict_factory = collections.defaultdict(dict)
node_attr_dict_factory = dict
adjlist_outer_dict_factory = collections.defaultdict(dict)#information in outer of the adj
adjlist_inner_dict_factory = dict#information in innter of the adi
edge_attr_dict_factory = dict
graph_attr_dict_factory = dict
be_used_time_window_factory=list
def __init__(self):
self.graph = self.graph_attr_dict_factory() # dictionary for graph attributes
self._node = self.node_dict_factory # empty node attribute dict
self._adj = self.adjlist_outer_dict_factory # empty adjacency dict
@property
def adj(self):
return self._adj
def addEdge(self,u_of_edge, v_of_edge,graphy_type,**attr):
u, v = u_of_edge, v_of_edge
datadict = self._adj[u].get(v, self.edge_attr_dict_factory())
datadict.update(attr)
# print(graphy_type)
if graphy_type == "bidirection":
self._adj[u][v] = datadict
#一定要注意变量引用的底层逻辑
self._adj[v][u] = copy.deepcopy(datadict)
else:
self._adj[u][v] = datadict
# self._adj[v][v] = datadict
def removeEdge(self,u,v):
del self._adj[u][v]
def addBlockTimeForEdge(self,u,v,block_time,type="bidiriciton"):
# if self._adj[v][u].get("block_time","Null")=="Null":
# self._adj[v][u]["block_time"]=[block_time]
# self._adj[u][v]["block_time"] = [block_time]
# else:
if type=="bidiriction":
self._adj[u][v]["block_time"].append(block_time)
self._adj[v][u]["block_time"].append(block_time)
else:
self._adj[u][v]["block_time"].append(block_time)
def getBlockTimeForEdge(self,u,v):
return self._adj[u][v]["block_time"]
def getUpTimeWindowForEdge(self,u,v):
return self._adj[u][v]["time_end"]
def addITimeWindowForEdge(self,u,v,Itime):
self._adj[u][v]["I_times"].append(Itime)
def setITimeWindowsForEdge(self, u, v, Itimes):
self._adj[u][v]["I_times"]=copy.deepcopy(Itimes)
def getITimeWindowsForEdge(self, u, v):
return self._adj[u][v]["I_times"]
def getTimeWindowWithOutBlockTime(self,u,v):
time_points=[]
# item[0], item[1]
for item in self._adj[u][v]["block_time"]:
time_points.append(item[0])
time_points.append(item[1])
time_points.append(self._adj[u][v]["time_begin"])
time_points.append(self._adj[u][v]["time_end"])
i=0
results=[]
while i<len(time_points)-1:
results.append((time_points[i],time_points[i+1]))
i+=2
return results
def getEdgeData(self,u,v,type="weight",default=None):
try:
return self._adj[u][v][type]
except KeyError:
return default
def addEdgesFrom(self, ebunch_to_add,graphy_type="bidirection", **attr):
# print("----2>",graphy_type)
for e in ebunch_to_add:
ne = len(e)
if ne == 3:
u, v, dd = e
elif ne == 2:
u, v = e
dd = {} # doesn't need edge_attr_dict_factory
else:
raise Exception(f"Edge tuple {e} must be a 2-tuple or 3-tuple.")
self.addEdge(u,v,graphy_type,**dd,**attr)
def addWeightedEdgesFrom(self, ebunch_to_add,weight="weight", **attr):
self.addEdgesFrom(((u, v, {weight: d}) for u, v, d in ebunch_to_add), **attr)
def addWeightedEdgesWithTimeWindowFrom(self, ebunch_to_add, weight="weight", time_begin="time_begin", time_end="time_end",graphy_type="bidirection", **attr):
""""
graphy_type:表示是单项还是双向图
"""
# 怎样存储这个时间窗哪?
# print("1---->",graphy_type)
self.addEdgesFrom(((u, v, {weight: d, time_begin: t_b, time_end: t_e,"block_time":[],"I_times":[]}) for u, v, d, t_b, t_e in ebunch_to_add),graphy_type=graphy_type,**attr)
def addBeUsedTimeWindowToEdge(self,u_be_added,v_be_added,be_used_window):
u,v=u_be_added,v_be_added
if not self.getEdgeData(u,v,type="be_used_windows"):
init_be_used_time_window_factory =[be_used_window]
init_be_used_time_window_factory2 = [copy.deepcopy(be_used_window)]
heapq.heapify(init_be_used_time_window_factory)
heapq.heapify(init_be_used_time_window_factory2)
self._adj[u][v].update({"be_used_windows":init_be_used_time_window_factory})
self._adj[v][u].update({"be_used_windows":init_be_used_time_window_factory2})
else:
#这里需要写一个函数,重新的归并已经用的时间窗。
print("==============touble")
pass
# heapq.heappush(self._adj[u][v]["be_used_windows"],be_used_window)
# heapq.heappush(self._adj[v][u]["be_used_windows"], be_used_window)
def updateBeUsedTimeWindowToEdge(self,u_be_added,v_be_added,be_update_window,detatT=10):
u,v=u_be_added,v_be_added
start_time,end_time=be_update_window
len_time_window=(end_time-start_time).seconds
if len(self._adj[u][v]["be_used_windows"])==1:
start_time1, end_time1=self._adj[u][v]["be_used_windows"][0]
end_time1_strip=time.mktime(time.strptime(str(end_time1),"%Y-%m-%d %H:%M:%S"))
start_time_strip=time.mktime(time.strptime(str(start_time),"%Y-%m-%d %H:%M:%S"))
start_time1_strip = time.mktime(time.strptime(str(start_time1), "%Y-%m-%d %H:%M:%S"))
end_time_strip = time.mktime(time.strptime(str(end_time), "%Y-%m-%d %H:%M:%S"))
if (end_time1_strip-start_time_strip)*(start_time1_strip-end_time_strip)<=0:#等于0的时候说明重合,要等上一个飞机滑行完
# print("重合")
#当发生重合的时候,留一个缓冲时间
departure_time=end_time1+datetime.timedelta(seconds=detatT)
arrival_time=departure_time+datetime.timedelta(seconds=len_time_window)
heapq.heappush(self._adj[u][v]["be_used_windows"], (departure_time-datetime.timedelta(seconds=detatT),arrival_time))
# print("vu插入前->", self._adj[u][v]["be_used_windows"])
heapq.heappush(self._adj[v][u]["be_used_windows"], (departure_time-datetime.timedelta(seconds=detatT), arrival_time))
# print("vu插入后->", self._adj[u][v]["be_used_windows"])
# 返回的是这个点的出发时间和下个点的到达时间。
# print("插入后->", self._adj[u][v]["be_used_windows"])
# print("退出")
return departure_time,arrival_time
else:
heapq.heappush(self._adj[u][v]["be_used_windows"],be_update_window)
heapq.heappush(self._adj[v][u]["be_used_windows"], be_update_window)
return start_time,end_time
# print("向下进行")
#时间窗是有序的
for i in range(len(self._adj[u][v]["be_used_windows"])-1):
slot_start,slot_end=self._adj[u][v]["be_used_windows"][i][1],self._adj[u][v]["be_used_windows"][i+1][0]
#能插入就插入,插入不了就插在所有时间窗的最尾端
if slot_end>=end_time:#只有这种情况下才需要判断,否则都不行
if slot_start<=start_time:#这种情况下可以直接插入
heapq.heappush(self._adj[u][v]["be_used_windows"], be_update_window)
heapq.heappush(self._adj[v][u]["be_used_windows"], be_update_window)
return start_time, end_time
else:#说明有交叉
len_slot=(slot_end-slot_start).seconds
if len_slot>=len_time_window:
return slot_start,slot_start+datetime.timedelta(seconds=len_time_window)
else:
continue
else:
continue
if end_time>=self._adj[u][v]["be_used_windows"][-1][1]:
heapq.heappush(self._adj[u][v]["be_used_windows"], be_update_window)
heapq.heappush(self._adj[v][u]["be_used_windows"], be_update_window)
return start_time, end_time
else:
departure_time = self._adj[u][v]["be_used_windows"][-1][1]
arrival_time = departure_time + datetime.timedelta(seconds=len_time_window)
heapq.heappush(self._adj[u][v]["be_used_windows"], (departure_time, arrival_time))
heapq.heappush(self._adj[v][u]["be_used_windows"], (departure_time, arrival_time))
return departure_time, arrival_time
# if not conflict:#不冲突
def addNode(self, node_for_adding, **attr):
self._adj[node_for_adding] = self.adjlist_inner_dict_factory()
self._node[node_for_adding].update(attr)
后记
留个悬念,getNextI是一个很巧妙的函数。闲了专门写个博客刨析一下它。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)