Created
July 4, 2025 08:11
-
-
Save Redoute/8460d4aa12c26a70d282bd9280c8d94f to your computer and use it in GitHub Desktop.
QGIS algorithm: service area, using cached network
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# processing algorithm to compute service area | |
# Kai Borgolte (FORPLAN DR. SCHMIEDEL GmbH) 2025-07-04 | |
# Unlike native:serviceareafrompoint the used network will be cached. | |
# The network layer must be segmented - each line has to be an edge | |
# with exactly two vertices. | |
# ToDo: delete temporary start node from graph | |
# - doesn't work with QgsGraph, next time I'll use networkx | |
# hard coded field names | |
# these must contain precomputed costs (minutes), | |
# negative values (-1) for non-routable edges/directions | |
field_cost_forward = 'minutes_forward' | |
field_cost_backward = 'minutes_backward' | |
from qgis.PyQt.QtCore import QMetaType | |
from qgis.core import ( | |
Qgis, | |
QgsFeature, | |
QgsFeatureRequest, | |
QgsFeatureSink, | |
QgsField, | |
QgsFields, | |
QgsGeometry, | |
QgsPointXY, | |
QgsProcessing, | |
QgsProcessingAlgorithm, | |
QgsProcessingParameterBoolean, | |
QgsProcessingParameterFeatureSink, | |
QgsProcessingParameterFeatureSource, | |
QgsProcessingParameterNumber, | |
QgsProcessingParameterPoint, | |
QgsSettings, | |
QgsSpatialIndex, | |
) | |
from qgis.analysis import QgsGraph, QgsGraphAnalyzer | |
dijkstra = QgsGraphAnalyzer.dijkstra | |
# theses variables are singletons (bound to the module) | |
G = None | |
ix_coords = None | |
sp_index = None | |
class GSE_ServiceArea_Algorithm(QgsProcessingAlgorithm): | |
def processAlgorithm(self, parameters, context, feedback): | |
s = QgsSettings() | |
s.setValue('gse_servicearea/settings', self.asMap(parameters, context)['inputs']) | |
source = self.parameterAsSource(parameters, 'INPUT', context) | |
reload = self.parameterAsBool(parameters, 'RELOAD', context) | |
hub = self.parameterAsPoint( | |
parameters, 'HUB', context, crs=source.sourceCrs()) | |
max_minutes = self.parameterAsDouble(parameters, 'MINUTES', context) | |
(hubout, dest2_id) = self.parameterAsSink( | |
parameters, 'HUBOUTPUT', context, | |
QgsFields(), Qgis.WkbType.Point, source.sourceCrs()) | |
fields = QgsFields() | |
fields.append(QgsField('minutes_1', QMetaType.Double)) | |
fields.append(QgsField('minutes_2', QMetaType.Double)) | |
(sink, dest_id) = self.parameterAsSink( | |
parameters, 'OUTPUT', context, | |
fields, source.wkbType(), source.sourceCrs()) | |
# read network into QgsGraph | |
progress = get_graph(source, reload, feedback) | |
# create temporary graph vertex for location and get id | |
idhub, phub, eh1_id, eh2_id, progress = insert_point(hub, feedback, progress) | |
if feedback.isCanceled(): | |
return | |
# evaluate cost list into output layer | |
progress = do_servicearea(idhub, eh1_id, eh2_id, max_minutes, sink, feedback, progress) | |
if feedback.isCanceled(): | |
return | |
# delete temporary graph vertex | |
# G.removeVertex(idhub) | |
# output start node | |
progress = output_hub(phub, hubout, feedback, progress) | |
if feedback.isCanceled(): | |
return | |
return {"HUBOUTPUT": dest2_id, "OUTPUT": dest_id} | |
def createInstance(self): | |
return GSE_ServiceArea_Algorithm() | |
def name(self): | |
return 'gse_servicearea' | |
def displayName(self): | |
return 'Service area from point' | |
def group(self): | |
return 'gis.stackexchange.com' | |
def groupId(self): | |
return 'gse' | |
def shortDescription(self): | |
return ('Service area from point') | |
def shortHelpString(self): | |
return ''' | |
This algorithm creates a new vector layer with all the edges or parts of edges of a network line layer that can be reached with limited travel cost (time), starting from a point. | |
The network line layer has to be prepared: | |
- segmented: every edge should consist of a straight line with exactly two nodes/vertices | |
- correctly noded: two edges connect if and only if they share a common node | |
- prepared costs as fields 'minutes_forward' and 'minutes_backward' | |
- if an edge is oneway, the field for the other direction gets the value '-1.0' | |
The output layer "service area" will contain one (directed) feature | |
for every edge in the service area with two fields "minutes_1" and | |
"minutes_2" for both endpoints. | |
''' | |
def initAlgorithm(self, config=None): | |
saved = QgsSettings().value('fds_servicearea/settings', {}) | |
self.addParameter( | |
QgsProcessingParameterFeatureSource( | |
name='INPUT', description='network', | |
types=[QgsProcessing.TypeVectorLine], | |
defaultValue=saved.get('INPUT'))) | |
self.addParameter( | |
QgsProcessingParameterBoolean( | |
name='RELOAD', description='delete cache', | |
defaultValue=False)) | |
self.addParameter( | |
QgsProcessingParameterPoint( | |
name='HUB', description='start point', | |
defaultValue=saved.get('HUB'))) | |
self.addParameter( | |
QgsProcessingParameterNumber( | |
name='MINUTES', description='minutes', | |
defaultValue=saved.get('MINUTES', 12.0))) | |
self.addParameter( | |
QgsProcessingParameterFeatureSink( | |
name='OUTPUT', description='service area')) | |
self.addParameter( | |
QgsProcessingParameterFeatureSink( | |
name='HUBOUTPUT', description='output start point')) | |
def get_graph(l_in, reload, feedback): | |
global G, ix_coords, sp_index | |
if G and not reload: | |
feedback.pushInfo(f'Kanten (gerichtet): {G.edgeCount()}') | |
feedback.pushInfo(f'Knoten: {G.vertexCount()}') | |
feedback.pushInfo('') | |
return 0 | |
# read network into QgsGraph | |
feedback.setProgressText('Straßennetz einlesen') | |
progress = 0 | |
feedback.setProgress(progress) | |
filter = f'{field_cost_forward} > 0 or {field_cost_backward} > 0' | |
attrs = [field_cost_forward, field_cost_backward] | |
req = QgsFeatureRequest().setFilterExpression(filter)\ | |
.setSubsetOfAttributes(attrs, l_in.fields()) | |
G = QgsGraph() | |
ix_coords = {} | |
total = 60.0 / fc if (fc:=l_in.featureCount()) else 60.0 | |
for current, edge in enumerate(l_in.getFeatures(req)): | |
if feedback.isCanceled(): | |
return | |
mforw = edge[field_cost_forward] | |
mbackw = edge[field_cost_backward] | |
g = edge.geometry() | |
g1, g2 = g.vertices() | |
c1 = g1.x(), g1.y() | |
c2 = g2.x(), g2.y() | |
try: | |
graph_id1 = ix_coords[c1] | |
except KeyError: | |
graph_id1 = G.addVertex(QgsPointXY(g1)) | |
ix_coords[c1] = graph_id1 | |
try: | |
graph_id2 = ix_coords[c2] | |
except KeyError: | |
graph_id2 = G.addVertex(QgsPointXY(g2)) | |
ix_coords[c2] = graph_id2 | |
if mforw >= 0: | |
G.addEdge(graph_id1, graph_id2, (mforw, )) | |
if mbackw >= 0: | |
G.addEdge(graph_id2, graph_id1, (mbackw, )) | |
feedback.setProgress(int(current * total)) | |
progress = 60 | |
feedback.setProgress(progress) | |
feedback.pushInfo(f'Kanten (gerichtet): {G.edgeCount()}') | |
feedback.pushInfo(f'Knoten: {G.vertexCount()}') | |
feedback.pushInfo('') | |
feedback.setProgressText('räumlichen Index erzeugen') | |
sp_index = QgsSpatialIndex( | |
l_in.getFeatures(req), flags=QgsSpatialIndex.FlagStoreFeatureGeometries) | |
progress = 80 | |
feedback.setProgress(progress) | |
return progress | |
def insert_point(pt, feedback, progress): | |
# find next edge near given location | |
# and insert graph edges to its endpoints | |
global G, ix_coord, sp_index | |
feedback.setProgressText('Startpunkt in Graph einfügen') | |
# find next edge near given location | |
# https://gis.stackexchange.com/a/449288/18013 | |
nearest_id = sp_index.nearestNeighbor(pt, 1)[0] # we need only one neighbour | |
# get endpoints and distance from p1 to location-nearest point | |
# get point projected on nearest_geom | |
nearest_geom = sp_index.geometry(nearest_id) | |
p1, p2 = nearest_geom.vertices() | |
pos = nearest_geom.lineLocatePoint(QgsGeometry.fromPointXY(pt)) | |
pp = nearest_geom.interpolate(pos) | |
# get coordinates of endpoints | |
c1 = p1.x(), p1.y() | |
c2 = p2.x(), p2.y() | |
# get graph vertices from endpoints | |
ixv1 = ix_coords[c1] | |
v1 = G.vertex(ixv1) | |
ixv2 = ix_coords[c2] | |
v2 = G.vertex(ixv2) | |
# get graph edges between endpoints | |
for ix in v1.incomingEdges(): | |
e2_id = ix | |
e2 = G.edge(ix) | |
if e2.fromVertex() == ixv2: | |
break | |
else: | |
e2 = None | |
for ix in v2.incomingEdges(): | |
e1_id = ix | |
e1 = G.edge(ix) | |
if e1.fromVertex() == ixv1: | |
break | |
else: | |
e1 = None | |
# get cost from location to endpoints | |
len_edge = nearest_geom.length() | |
if e1: | |
cost1 = e1.cost(0) * pos / len_edge | |
if e2: | |
cost2 = e2.cost(0) * (1.0 - (pos / len_edge)) | |
# insert new vertice in graph | |
graph_id = G.addVertex(pp.asPoint()) | |
# insert new edges in graph | |
if e1: | |
G.addEdge(graph_id, ixv1, (cost1, )) | |
if e2: | |
G.addEdge(graph_id, ixv2, (cost2, )) | |
progress += 5 | |
feedback.setProgress(progress) | |
return graph_id, pp, e1_id, e2_id, progress | |
def do_servicearea(idhub, eh1_id, eh2_id, max_minutes, sink, feedback, progress): | |
# evaluate cost list into output layer | |
# calculate costs from location | |
feedback.setProgressText('Erreichbarkeiten berechnen') | |
le, lc = dijkstra(G, idhub, 0) | |
progress += 5 | |
feedback.setProgress(progress) | |
if feedback.isCanceled(): | |
return | |
total = 95.0 - progress | |
if vc:=G.vertexCount(): | |
total = total / vc | |
# lc has one entry per graph node: cost to this node | |
# if this is less then max_minutes: put outgoing edges to output | |
# le has one entry per graph node: edge leading to this node | |
for n1_id, (c1, e01_id) in enumerate(zip(lc, le)): | |
if c1 <= max_minutes: | |
n1 = G.vertex(n1_id) | |
p1 = n1.point() | |
for e12_id in n1.outgoingEdges(): | |
# ignore splitted edge at hub | |
if e12_id in (eh1_id, eh2_id): | |
continue | |
# calculate cost to theoretic meeting point (cm) | |
# we want to fully draw all outgoing edges from n1, except: | |
# - not the opposite edge to e01 | |
# - only part of edges where the other part is quickest | |
# from other direction (cm > c2) | |
# - only part of edges where max_minutes is exceeded | |
# (cm > max_minutes) | |
e12 = G.edge(e12_id) | |
c12 = e12.cost(0) | |
n2_id = e12.toVertex() | |
n2 = G.vertex(n2_id) | |
p2 = n2.point() | |
c2 = lc[n2_id] | |
e21_id = G.findOppositeEdge(e12_id) | |
if e21_id >= 0 and e21_id == e01_id: | |
continue # this is the edge we are coming from | |
if e21_id >= 0: | |
e21 = G.edge(e21_id) | |
c21 = e21.cost(0) | |
cm = (c1 + c2 + (c12 + c21) / 2) / 2 | |
else: | |
cm = c1 + c12 | |
c_end = min(cm, max_minutes) | |
# full edge | |
g = QgsGeometry.fromPolylineXY([p1, p2]) | |
# part of edge | |
if c_end - c1 < c12: | |
posm = (c_end - c1) / c12 | |
dm = g.length() * posm | |
pm = g.interpolate(dm).asPoint() | |
g = QgsGeometry.fromPolylineXY([p1, pm]) | |
out_feature = QgsFeature() | |
out_feature.setGeometry(g) | |
out_feature.setAttributes([c1, c_end]) | |
sink.addFeature(out_feature, QgsFeatureSink.FastInsert) | |
feedback.setProgress(progress + int(n1_id * total)) | |
if feedback.isCanceled(): | |
break | |
progress = 95.0 | |
return progress | |
def output_hub(phub, hubout, feedback, progress): | |
# show projected location of hub | |
feedback.setProgressText('Startpunkt (projiziert) ausgeben') | |
out_feature = QgsFeature() | |
out_feature.setGeometry(phub) | |
hubout.addFeature(out_feature, QgsFeatureSink.FastInsert) | |
progress += 5 | |
feedback.setProgress(progress) | |
return progress |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment