Skip to content

Instantly share code, notes, and snippets.

@Redoute
Created July 4, 2025 08:11
Show Gist options
  • Save Redoute/8460d4aa12c26a70d282bd9280c8d94f to your computer and use it in GitHub Desktop.
Save Redoute/8460d4aa12c26a70d282bd9280c8d94f to your computer and use it in GitHub Desktop.
QGIS algorithm: service area, using cached network
# processing algorithm to compute service area
# Kai Borgolte (FORPLAN DR. SCHMIEDEL GmbH) 2025-07-04
# Unlike native:serviceareafrompoint the used network will be cached.
# The network layer must be segmented - each line has to be an edge
# with exactly two vertices.
# ToDo: delete temporary start node from graph
# - doesn't work with QgsGraph, next time I'll use networkx
# hard coded field names
# these must contain precomputed costs (minutes),
# negative values (-1) for non-routable edges/directions
field_cost_forward = 'minutes_forward'
field_cost_backward = 'minutes_backward'
from qgis.PyQt.QtCore import QMetaType
from qgis.core import (
Qgis,
QgsFeature,
QgsFeatureRequest,
QgsFeatureSink,
QgsField,
QgsFields,
QgsGeometry,
QgsPointXY,
QgsProcessing,
QgsProcessingAlgorithm,
QgsProcessingParameterBoolean,
QgsProcessingParameterFeatureSink,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterNumber,
QgsProcessingParameterPoint,
QgsSettings,
QgsSpatialIndex,
)
from qgis.analysis import QgsGraph, QgsGraphAnalyzer
dijkstra = QgsGraphAnalyzer.dijkstra
# theses variables are singletons (bound to the module)
G = None
ix_coords = None
sp_index = None
class GSE_ServiceArea_Algorithm(QgsProcessingAlgorithm):
def processAlgorithm(self, parameters, context, feedback):
s = QgsSettings()
s.setValue('gse_servicearea/settings', self.asMap(parameters, context)['inputs'])
source = self.parameterAsSource(parameters, 'INPUT', context)
reload = self.parameterAsBool(parameters, 'RELOAD', context)
hub = self.parameterAsPoint(
parameters, 'HUB', context, crs=source.sourceCrs())
max_minutes = self.parameterAsDouble(parameters, 'MINUTES', context)
(hubout, dest2_id) = self.parameterAsSink(
parameters, 'HUBOUTPUT', context,
QgsFields(), Qgis.WkbType.Point, source.sourceCrs())
fields = QgsFields()
fields.append(QgsField('minutes_1', QMetaType.Double))
fields.append(QgsField('minutes_2', QMetaType.Double))
(sink, dest_id) = self.parameterAsSink(
parameters, 'OUTPUT', context,
fields, source.wkbType(), source.sourceCrs())
# read network into QgsGraph
progress = get_graph(source, reload, feedback)
# create temporary graph vertex for location and get id
idhub, phub, eh1_id, eh2_id, progress = insert_point(hub, feedback, progress)
if feedback.isCanceled():
return
# evaluate cost list into output layer
progress = do_servicearea(idhub, eh1_id, eh2_id, max_minutes, sink, feedback, progress)
if feedback.isCanceled():
return
# delete temporary graph vertex
# G.removeVertex(idhub)
# output start node
progress = output_hub(phub, hubout, feedback, progress)
if feedback.isCanceled():
return
return {"HUBOUTPUT": dest2_id, "OUTPUT": dest_id}
def createInstance(self):
return GSE_ServiceArea_Algorithm()
def name(self):
return 'gse_servicearea'
def displayName(self):
return 'Service area from point'
def group(self):
return 'gis.stackexchange.com'
def groupId(self):
return 'gse'
def shortDescription(self):
return ('Service area from point')
def shortHelpString(self):
return '''
This algorithm creates a new vector layer with all the edges or parts of edges of a network line layer that can be reached with limited travel cost (time), starting from a point.
The network line layer has to be prepared:
- segmented: every edge should consist of a straight line with exactly two nodes/vertices
- correctly noded: two edges connect if and only if they share a common node
- prepared costs as fields 'minutes_forward' and 'minutes_backward'
- if an edge is oneway, the field for the other direction gets the value '-1.0'
The output layer "service area" will contain one (directed) feature
for every edge in the service area with two fields "minutes_1" and
"minutes_2" for both endpoints.
'''
def initAlgorithm(self, config=None):
saved = QgsSettings().value('fds_servicearea/settings', {})
self.addParameter(
QgsProcessingParameterFeatureSource(
name='INPUT', description='network',
types=[QgsProcessing.TypeVectorLine],
defaultValue=saved.get('INPUT')))
self.addParameter(
QgsProcessingParameterBoolean(
name='RELOAD', description='delete cache',
defaultValue=False))
self.addParameter(
QgsProcessingParameterPoint(
name='HUB', description='start point',
defaultValue=saved.get('HUB')))
self.addParameter(
QgsProcessingParameterNumber(
name='MINUTES', description='minutes',
defaultValue=saved.get('MINUTES', 12.0)))
self.addParameter(
QgsProcessingParameterFeatureSink(
name='OUTPUT', description='service area'))
self.addParameter(
QgsProcessingParameterFeatureSink(
name='HUBOUTPUT', description='output start point'))
def get_graph(l_in, reload, feedback):
global G, ix_coords, sp_index
if G and not reload:
feedback.pushInfo(f'Kanten (gerichtet): {G.edgeCount()}')
feedback.pushInfo(f'Knoten: {G.vertexCount()}')
feedback.pushInfo('')
return 0
# read network into QgsGraph
feedback.setProgressText('Straßennetz einlesen')
progress = 0
feedback.setProgress(progress)
filter = f'{field_cost_forward} > 0 or {field_cost_backward} > 0'
attrs = [field_cost_forward, field_cost_backward]
req = QgsFeatureRequest().setFilterExpression(filter)\
.setSubsetOfAttributes(attrs, l_in.fields())
G = QgsGraph()
ix_coords = {}
total = 60.0 / fc if (fc:=l_in.featureCount()) else 60.0
for current, edge in enumerate(l_in.getFeatures(req)):
if feedback.isCanceled():
return
mforw = edge[field_cost_forward]
mbackw = edge[field_cost_backward]
g = edge.geometry()
g1, g2 = g.vertices()
c1 = g1.x(), g1.y()
c2 = g2.x(), g2.y()
try:
graph_id1 = ix_coords[c1]
except KeyError:
graph_id1 = G.addVertex(QgsPointXY(g1))
ix_coords[c1] = graph_id1
try:
graph_id2 = ix_coords[c2]
except KeyError:
graph_id2 = G.addVertex(QgsPointXY(g2))
ix_coords[c2] = graph_id2
if mforw >= 0:
G.addEdge(graph_id1, graph_id2, (mforw, ))
if mbackw >= 0:
G.addEdge(graph_id2, graph_id1, (mbackw, ))
feedback.setProgress(int(current * total))
progress = 60
feedback.setProgress(progress)
feedback.pushInfo(f'Kanten (gerichtet): {G.edgeCount()}')
feedback.pushInfo(f'Knoten: {G.vertexCount()}')
feedback.pushInfo('')
feedback.setProgressText('räumlichen Index erzeugen')
sp_index = QgsSpatialIndex(
l_in.getFeatures(req), flags=QgsSpatialIndex.FlagStoreFeatureGeometries)
progress = 80
feedback.setProgress(progress)
return progress
def insert_point(pt, feedback, progress):
# find next edge near given location
# and insert graph edges to its endpoints
global G, ix_coord, sp_index
feedback.setProgressText('Startpunkt in Graph einfügen')
# find next edge near given location
# https://gis.stackexchange.com/a/449288/18013
nearest_id = sp_index.nearestNeighbor(pt, 1)[0] # we need only one neighbour
# get endpoints and distance from p1 to location-nearest point
# get point projected on nearest_geom
nearest_geom = sp_index.geometry(nearest_id)
p1, p2 = nearest_geom.vertices()
pos = nearest_geom.lineLocatePoint(QgsGeometry.fromPointXY(pt))
pp = nearest_geom.interpolate(pos)
# get coordinates of endpoints
c1 = p1.x(), p1.y()
c2 = p2.x(), p2.y()
# get graph vertices from endpoints
ixv1 = ix_coords[c1]
v1 = G.vertex(ixv1)
ixv2 = ix_coords[c2]
v2 = G.vertex(ixv2)
# get graph edges between endpoints
for ix in v1.incomingEdges():
e2_id = ix
e2 = G.edge(ix)
if e2.fromVertex() == ixv2:
break
else:
e2 = None
for ix in v2.incomingEdges():
e1_id = ix
e1 = G.edge(ix)
if e1.fromVertex() == ixv1:
break
else:
e1 = None
# get cost from location to endpoints
len_edge = nearest_geom.length()
if e1:
cost1 = e1.cost(0) * pos / len_edge
if e2:
cost2 = e2.cost(0) * (1.0 - (pos / len_edge))
# insert new vertice in graph
graph_id = G.addVertex(pp.asPoint())
# insert new edges in graph
if e1:
G.addEdge(graph_id, ixv1, (cost1, ))
if e2:
G.addEdge(graph_id, ixv2, (cost2, ))
progress += 5
feedback.setProgress(progress)
return graph_id, pp, e1_id, e2_id, progress
def do_servicearea(idhub, eh1_id, eh2_id, max_minutes, sink, feedback, progress):
# evaluate cost list into output layer
# calculate costs from location
feedback.setProgressText('Erreichbarkeiten berechnen')
le, lc = dijkstra(G, idhub, 0)
progress += 5
feedback.setProgress(progress)
if feedback.isCanceled():
return
total = 95.0 - progress
if vc:=G.vertexCount():
total = total / vc
# lc has one entry per graph node: cost to this node
# if this is less then max_minutes: put outgoing edges to output
# le has one entry per graph node: edge leading to this node
for n1_id, (c1, e01_id) in enumerate(zip(lc, le)):
if c1 <= max_minutes:
n1 = G.vertex(n1_id)
p1 = n1.point()
for e12_id in n1.outgoingEdges():
# ignore splitted edge at hub
if e12_id in (eh1_id, eh2_id):
continue
# calculate cost to theoretic meeting point (cm)
# we want to fully draw all outgoing edges from n1, except:
# - not the opposite edge to e01
# - only part of edges where the other part is quickest
# from other direction (cm > c2)
# - only part of edges where max_minutes is exceeded
# (cm > max_minutes)
e12 = G.edge(e12_id)
c12 = e12.cost(0)
n2_id = e12.toVertex()
n2 = G.vertex(n2_id)
p2 = n2.point()
c2 = lc[n2_id]
e21_id = G.findOppositeEdge(e12_id)
if e21_id >= 0 and e21_id == e01_id:
continue # this is the edge we are coming from
if e21_id >= 0:
e21 = G.edge(e21_id)
c21 = e21.cost(0)
cm = (c1 + c2 + (c12 + c21) / 2) / 2
else:
cm = c1 + c12
c_end = min(cm, max_minutes)
# full edge
g = QgsGeometry.fromPolylineXY([p1, p2])
# part of edge
if c_end - c1 < c12:
posm = (c_end - c1) / c12
dm = g.length() * posm
pm = g.interpolate(dm).asPoint()
g = QgsGeometry.fromPolylineXY([p1, pm])
out_feature = QgsFeature()
out_feature.setGeometry(g)
out_feature.setAttributes([c1, c_end])
sink.addFeature(out_feature, QgsFeatureSink.FastInsert)
feedback.setProgress(progress + int(n1_id * total))
if feedback.isCanceled():
break
progress = 95.0
return progress
def output_hub(phub, hubout, feedback, progress):
# show projected location of hub
feedback.setProgressText('Startpunkt (projiziert) ausgeben')
out_feature = QgsFeature()
out_feature.setGeometry(phub)
hubout.addFeature(out_feature, QgsFeatureSink.FastInsert)
progress += 5
feedback.setProgress(progress)
return progress
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment