1.思路:
1.抽取路网(管网)各节点,点去重+点线关联;---根据数据及业务场景可选择线起始点/起中终点/全部节点;
2.构建路网(管网)各节点KDtree;
3.使用邻接矩阵存储点连通性;--不通:-1、自身点:0、相连:1(或权重/距离--可根据业务场景设置);
4.基于KDtree分别捕获到 [ 出发点 , 到达点] 最近的一个捕捉节点(k=1);
5.根据捕捉节点进行图的广度优先遍历--结合迪杰克斯特拉(Dijkstra)算法思路--因为节点重复,根据节点键值取存储线id后,需要根据前后节点筛选线;
6.前进方向确定(线遍历后排列顺序+线坐标点集合顺序)--根据返回数据格式确定;
7.路书制作( 八方位-["北", "东北", "东", "东南", "南", "西南", "西", "西北", "原点", ]+ 下一节点方向(左/右/直行/掉头))--方向-八方位-坐标大小/ 方向-左右-向量叉乘(注意:当前代码因数据区域大小原因,已忽略椭球体及地理坐标影响因素);
8.结果输出(在路书中增加出发点/到达点与最短路径的关系--详细导航信息)--当前输出json结构参考了高德最短路径分析api返回数据结构;
9.效率优化--(将数据提前加载进内存/api调用)--当前框架使用python/flask,一秒内返回结果(当前测试数据大概0.04-0.5)
2.实现
1.python
#!/usr/bin/python3 # -*- coding:utf-8 -*- ''' #################################################################################### #版本信息 ##最近修改时间:20220331 ##最近修改人:慢慢 #################################################################################### #功能说明 ##陇南路径分析 #################################################################################### #返回结果 ##返回格式-dict ##运行成功- ##运行失败-{"code": 404, "type": "fail", "msg": "服务错误", "box": "", "data": "需要重启"} #################################################################################### #使用说明 #################################################################################### #更新历史 ##20220421 新增最短路径导航详细信息 路书功能开发-(路口左右转、东南西北八个方向、距离) #################################################################################### ''' import json from shapely.geometry import LineString from shapely.geometry import mapping as geommapping # from lib.geotools import Features, Intersect, Clip, ToPolygons, CreatGrid, ToPoints try: from lib.flaskservices.config_app import __keyself__ except ImportError: __keyself__ = {"file_line": "/data/l_road.shp", "building": "/data/r_building.shp", "grid": "/data/r_grid.shp", "people": "/data/r_people.csv"} from geopandas import GeoSeries, GeoDataFrame __data__type = (list, tuple) __mapdata_type__ = (GeoSeries, GeoDataFrame) __kself__ = list(__keyself__.keys())[0] def GetRoadBookCourse(pointa, pointb): # 方向-八方位-坐标大小 txt = ["北", "东北", "东", "东南", "南", "西南", "西", "西北", "原点", ] x_p, y_p = round(pointb[0], 4) - round(pointa[0], 4), round(pointb[1], 4) - round(pointa[1], 4) if x_p == 0 and y_p > 0: course = txt[0] elif x_p > 0 and y_p > 0: course = txt[1] elif x_p > 0 and y_p == 0: course = txt[2] elif x_p > 0 and y_p < 0: course = txt[3] elif x_p == 0 and y_p < 0: course = txt[4] elif x_p < 0 and y_p < 0: course = txt[5] elif x_p < 0 and y_p == 0: course = txt[6] elif x_p < 0 and y_p > 0: course = txt[7] else: course = txt[-1] return course def GetRoadBookLR(origin, pointa, pointb): # 方向-左右-向量叉乘 x_o, y_o = round(pointa[0], 2) - round(origin[0], 2), round(pointa[1], 2) - round(origin[1], 2) x_l, y_l = round(pointb[0], 2) - round(origin[0], 2), round(pointb[1], 2) - round(origin[1], 2) k = (x_o * y_l) - (x_l * y_o) if k < 0: lr = "右转" elif k > 0: lr = "左转" else: # 左右-向量点乘 k = (x_o * x_l) + (y_o * y_l) if k < 0: lr = "掉头" else: lr = "直行" return lr def GetNodeInd(kdtree, point, k=1): try: __distances, __ind = kdtree.query(point, k=k) return True, __ind except: return False, "根据输入点获取最近线失败(经度,纬度) " + "%s" % point # 因为节点重复,根据节点键值取存储线id后,需要根据前后节点筛选线 def GetNodeIdLineId(poiss_ind, adjacency_matrix, node_dict, node_key): # 广度优先遍历--结合迪杰克斯特拉(Dijkstra)算法思路 try: loop_will = [poiss_ind[0]] loop_ed = set() loop_done = [] loop = 0 loop_stdout = [poiss_ind[1]] stdout = [] while loop_will: loop = loop_will.pop(0) loop_once = [] loop_ed.add(loop) for i, lind_id in enumerate(adjacency_matrix[loop, ...]): if lind_id > 0: loop_once.append(i) if i == poiss_ind[1]: loop_will.clear() loop_stdout.append(loop) break if i not in loop_ed: loop_will.append(i) else: loop_done.append([loop, loop_once]) loop_done.reverse() for key in loop_done: if loop in key[1]: loop_stdout.append(key[0]) loop = key[0] if loop == poiss_ind[0]: loop_stdout.reverse() break ################################################################################# # 可运行,可用于功能扩展 # while loop: # for key in keys: # if loop in loop_done[key]: # loop_stdout.append(key) # loop = key # if loop == poiss_ind[0]: # # loop_stdout.append(key) # loop_stdout.reverse() # loop = 0 # break ################################################################################# # 确定前进方向 for key in loop_stdout: stdout.extend(node_dict[node_key[key]]) else: stdout = list(set([v for v in stdout if stdout.count(v) > 1])) # 根据图遍历结果数据特性--间隔取值,逆间隔取值--转换到最终结果 if len(stdout) > 2: stdout = stdout[1::2] + [stdout[-1]] + stdout[-3::-2] except Exception as __ex: return False, "路径搜索失败:GetLineId()" + "%s" % __ex else: return True, (loop_stdout, stdout) def SetOutFormatGD(poiss, nodeid_lineid, node, line, to_crs=4544): def GetBook(poi, p_l, roadname, length): if not roadname: roadname = "无名道路" data["points"].extend(p_l) tmp = dict() tmp["instruction"] = "\tGetRoadBookLR\t.沿" + roadname + GetRoadBookCourse(poi[0], poi[1]) + "方向前进 %s" % round( length, 2) + " 米,\tGetRoadBookLRNext\t;" tmp["points"] = p_l data["steps"].append(tmp) data = {"steps": [], "points": []} for i, id in enumerate(nodeid_lineid[1]): points = list(geommapping(line["geometry"][id])["coordinates"]) if node[nodeid_lineid[0][i]] != points[0]: points.reverse() if i == 0: poi = [list(poiss[0]), points[0]] p_l = poi roadname = "出发点到最近路口的" length = GeoDataFrame(crs=to_crs, geometry=[LineString(poi)]).length[0] * 100000 else: poi = [points[0], points[-1]] p_l = points roadname = str(line["Name"][id]).replace("None", "") length = line["Shape_Leng"][id] GetBook(poi, p_l, roadname, length) else: poi = [points[-1], list(poiss[1])] p_l = poi roadname = "最近路口到达点的" length = GeoDataFrame(crs=to_crs, geometry=[LineString(poi)]).length[0] * 100000 GetBook(poi, p_l, roadname, length) # 路书 for i in range(len(data["steps"])): if i == 0: # 正北方向 origin = [list(poiss[0])[0], list(poiss[0])[1] - 1] pointa = data["steps"][i]["points"][0] pointb = data["steps"][i]["points"][1] lr = ",准备出发" insed = "到达主路" elif i == len(data["steps"]) - 1: origin = data["steps"][i]["points"][-2] pointa = data["steps"][i]["points"][-1] pointb = list(poiss[1]) lr = ",到达目的地附近" insed = "到达目的地" else: origin = data["steps"][i - 1]["points"][-2] pointa = data["steps"][i]["points"][0] pointb = data["steps"][i]["points"][1] lr = "" insed = "下个路口" + ins ins = GetRoadBookLR(origin, pointa, pointb) data["steps"][i]["instruction"] = data["steps"][i]["instruction"].replace("\tGetRoadBookLR\t", ins + lr).replace( "\tGetRoadBookLRNext\t", insed) return data class AnalysisPath: def __init__(self, data_dict): self._type = True self._msg = {"code": 200, "type": "success", "msg": "最短路径分析成功", "box": "", "data": None} self.data_dict = data_dict def __str__(self): return 'AnalysisPath类实现路径分析' def __Check(self, data): if data[0]: return data[1] else: self._type = False self._msg = {"code": 204, "type": "fail", "msg": "最短路径分析失败", "box": "", "data": "%s" % data[1]} return None def Run(self, crs_distance=4544): # ######################################################################################################## # # # 可运行--计算两点直线距离,以后功能扩展可用 # pois = [self.__Check(v) for v in # [ToPoints(data=[v], crs=self.data_dict[__kself__].crs).XYs() for v in # self.data_dict['file']]] # if self._type: # poi_distance = round(pois[0].to_crs(crs_distance).distance(pois[1].to_crs(crs_distance)).loc[0], 2) # # poiss_ind = [self.__Check(GetNodeInd(self.data_dict["kdtree"], __v)) for __v in self.data_dict['file'] if # self._type] # ######################################################################################################## poiss_ind = [self.__Check(GetNodeInd(self.data_dict["kdtree"], __v)) for __v in self.data_dict['file'] if self._type] if self._type: if poiss_ind[0] == poiss_ind[1]: self._msg["data"] = "起点和终点相同" else: nodeid_lineid = self.__Check( GetNodeIdLineId(poiss_ind, self.data_dict["adjacency_matrix"], self.data_dict["node_dict"], self.data_dict["node_key"])) if self._type: point_gd = SetOutFormatGD(self.data_dict['file'], nodeid_lineid, self.data_dict['node'], self.data_dict[__kself__].loc[nodeid_lineid[1]][ ["Name", "Shape_Leng", "geometry"]].to_dict()) self._msg["data"] = json.dumps(point_gd) return self._msg def Main(data_dict): return AnalysisPath(data_dict).Run() if __name__ == '__main__': import numpy as np from cartopy import crs as ccrs import matplotlib.pyplot as plt ax = plt.axes(projection=ccrs.PlateCarree(), aspect='auto') import time, datetime __start_time = time.time() print('开始处理:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) import geopandas as gpd from scipy.spatial import KDTree path_base = r"C:\myprojects\py_projects\get_polygon" data_dict = dict() nodes = set() data_dict[__kself__] = gpd.read_file(path_base + __keyself__[__kself__]) for id in range(len(data_dict[__kself__])): points = geommapping(data_dict[__kself__]["geometry"][id])["coordinates"] #################################################################################### # 获取所有点--生成前期基础数据很慢,需要优化速度 # for v in points: # nodes.append([v, data_dict[__kself__]["id"][id]]) #################################################################################### # 起点、中点、终点 nodes.add((points[0], data_dict[__kself__]["id"][id])) # nodes.add((points[int(len(points)/2)], data_dict[__kself__]["id"][id])) nodes.add((points[-1], data_dict[__kself__]["id"][id])) data_dict[__kself__].plot(ax=ax) node_dict = {} tmp_xy = [] node = [] # 点关联到线,字典--key:点id,value:线id for __i, __v in enumerate(nodes): # 过滤重复点--根据线共用点情况,多份存储 if __v[0] in tmp_xy: node_dict[tmp_xy.index(__v[0])].append(__v[1]) else: node_dict[__i] = [__v[1]] node.append(__v[0]) tmp_xy.append(__v[0]) else: del tmp_xy node_key = list(node_dict.keys()) data_dict["kdtree"] = KDTree(node) data_dict["node"] = node data_dict["node_dict"] = node_dict data_dict["node_key"] = node_key # 点连通性关系存储--邻接矩阵 adjacency_matrix = np.zeros(shape=(len(node), len(node)), dtype=int) - 1 for i in range(len(node_key)): for j in range(len(node_key)): if i == j: adjacency_matrix[i][j] = 0 else: for __vj in node_dict[node_key[j]]: if __vj in node_dict[node_key[i]]: adjacency_matrix[i][j] = __vj + 1 ###################################################################################### # 当前算法未用到距离 # adjacency_matrix[i][j] = data_dict[__kself__].loc[__vj]["Shape_Leng"] ###################################################################################### else: data_dict["adjacency_matrix"] = adjacency_matrix data_dict['file'] = [(104.907, 33.408), (104.916, 33.398)] # data_dict['file'] = [(104.028, 34.1034), (104.05, 34.1)] # data_dict['file'] = [(105.115, 32.6145), (105.2, 32.6306)] # data_dict['file'] = [(104.919, 33.394), (104.921, 33.391)] tmp = Main(data_dict) print(tmp) print('本次耗时:', datetime.timedelta(seconds=time.time() - __start_time), '\n')
2.c
待补充...
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)