Skip to content

Commit

Permalink
fix: inspect laneStatusCoded to deduce direction (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
hbruch authored Apr 17, 2024
1 parent cf72d95 commit 72d4241
Show file tree
Hide file tree
Showing 7 changed files with 3,869 additions and 19 deletions.
109 changes: 90 additions & 19 deletions pipeline/transformer/cifs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import argparse
import datetime
import json
import logging
import re
from datetime import datetime
from typing import Optional

import defusedxml.ElementTree as ET
import requests
Expand All @@ -18,13 +20,17 @@


class DatexII2CifsTransformer:
# Pattern to validate lanestatus encoding. For expected values, see https://www.mdm-portal.de/wp-content/uploads/2019/03/mdm_datenmodell_baustellen_04-00-00.zip
LANE_STATUS_PATTERN = re.compile('^([sxui]*l?)?[suioewx]*(1|2)[suioewx]*(r?[xsoe]*)?$')

should_skip_roadworks_in_past = True

def __init__(self, reference, should_skip_roadworks_in_past: bool = True):
def __init__(self, reference, should_skip_roadworks_in_past: bool = True, current_time: datetime = datetime.now()):
self.reference = reference
self.should_skip_roadworks_in_past = should_skip_roadworks_in_past
self.current_time = current_time

def _roadworks_name(self, situationRecord):
def _roadworks_name(self, situationRecord: ET) -> str | None:
"""
Extracts roadworks name from generalPublicComment with commentType2 equal to roadworksName.
Expand All @@ -50,7 +56,7 @@ def _roadworks_name(self, situationRecord):
return generalPublicComment.find('d:comment/d:values/d:value', ns).text
return None

def _road_name(self, situationRecord):
def _road_name(self, situationRecord: ET) -> str:
"""
Extracts road name from linearElement within groupOfLocations:
Expand Down Expand Up @@ -78,7 +84,7 @@ def _road_name(self, situationRecord):
roadnumber = roadNumberElement.text if roadNumberElement is not None else ''
return f'{roadnumber} {roadname}'.strip()

def _incident_type(self, situationRecord):
def _incident_type(self, situationRecord: ET) -> str:
roadworkType = situationRecord.find('d:roadOrCarriagewayOrLaneManagementType', ns)
if roadworkType is None:
roadworkType = situationRecord.find('d:roadMaintenanceType', ns)
Expand All @@ -89,7 +95,7 @@ def _incident_type(self, situationRecord):

return type

def _incident_subtype(self, situationRecord):
def _incident_subtype(self, situationRecord: ET) -> str:
"""
Returns ROAD_CLOSED_CONSTRUCTION in case the road is incident_type is ROAD_CLOSED
"""
Expand All @@ -103,7 +109,7 @@ def _is_referenced_as_cause(self, situation, situationRecord):

return managedCause is not None

def _should_skip(self, situation, situationRecord):
def _should_skip(self, situation: ET, situationRecord: ET) -> bool:
"""
Skips a situationRecord if one of the following criteris is met:
* suffix ends on '-gegen' (BEMaS/BIS specific encoding of opposite direction, which will be handled by setting direction as BOTH_DIRECTIONS)
Expand All @@ -120,7 +126,7 @@ def _should_skip(self, situation, situationRecord):

if self.should_skip_roadworks_in_past:
(starttime, endtime) = self._get_start_end_time(situationRecord)
if datetime.datetime.now().astimezone() > datetime.datetime.fromisoformat(endtime):
if self.current_time.astimezone() > datetime.fromisoformat(endtime):
logging.debug('skip situationRecord %s as it is in the past', situationRecord.get('id'))
return True

Expand All @@ -130,21 +136,86 @@ def _should_skip(self, situation, situationRecord):

return False

def _detect_direction(self, situation, situationRecord):
def _laneStatusCoded(self, situationRecord: ET) -> Optional[str]:
"""
Extracts the laneStatusCoded value of the given situationRecord or None, if not available.
Example: for the following examplary, partial situationRecord, `o2xx` would be returned:
<situationRecord id="xxx">
<impact>
<impactExtension>
<impactExtended>
<laneStatusCoded>o2xx</laneStatusCoded>
<laneRestriction>
<lane>allLanesCompleteCarriageway</lane>
</laneRestriction>
</impactExtended>
</impactExtension>
</impact>
</situationRecord>
"""
lsElement = situationRecord.find('d:impact/d:impactExtension/d:impactExtended/d:laneStatusCoded', ns)

if lsElement is None:
return None
if self.LANE_STATUS_PATTERN.match(lsElement.text):
return lsElement.text

logging.warn(
'ignore laneStatus %s for situatinoRecord %s as it has unexpected encoding',
lsElement.text,
situationRecord.get('id'),
)
return None

@staticmethod
def _is_opposite_direction_concerned(lanestatus: str) -> bool:
# lanes can be single carriageways (encoded by a centre line indicated by '1', or dual carriageways (encoded by separating centreline indicated '2'))
# we split at both.
lanesPerDirection = lanestatus.replace('2', '1').split('1')

leftLanes = lanesPerDirection[0]
rightLanes = lanesPerDirection[1]

# if leftLanes include more than unnrestricted lane, should, shoulder separatore, or
# some lanes of opposite directions are switched to the right lines, opposite direction is concerned
hasAnyLaneLeftToCentreLine = 0 < len(leftLanes)
hasLeftLanesOtherThanUnrestrictedAndSideLanes = len(re.sub('[usl]', '', leftLanes)) > 0
hasOppositeLanesRightToCentreLine = len(re.sub('[^uiw]', '', rightLanes)) > 0
return (
hasAnyLaneLeftToCentreLine
and hasLeftLanesOtherThanUnrestrictedAndSideLanes
or hasOppositeLanesRightToCentreLine
)

def _detect_direction(self, situation: ET, situationRecord: ET) -> str:
"""
For BIS/BEMaS generated DATEX, a road closure has also an opposite direction,
if for a situationRecord with id suffix -sperrung a situation with
id suffix '-gegen-sperrung' exists.
For constructions, we rely on existance of laneStatusCoded to deduce if
any lane left of the centre line is blocked or dedicated to traffic
in this record's direction.
"""
situationRecordId = situationRecord.get('id')
inverse_direction_id = situationRecordId.replace('-sperrung', '-gegen-sperrung')
return (
'BOTH_DIRECTIONS'
if situation.find("d:situationRecord[@id='{}']".format(inverse_direction_id), ns)
else 'ONE_DIRECTION'
)

def _get_start_end_time(self, situationRecord):
situationRecordId = situationRecord.get('id')
if situationRecordId.endswith('-sperrung'):
inverse_direction_id = situationRecordId.replace('-sperrung', '-gegen-sperrung')
return (
'BOTH_DIRECTIONS'
if situation.find("d:situationRecord[@id='{}']".format(inverse_direction_id), ns)
else 'ONE_DIRECTION'
)

laneStatusCoded = self._laneStatusCoded(situationRecord)
if laneStatusCoded is not None:
return 'BOTH_DIRECTIONS' if self._is_opposite_direction_concerned(laneStatusCoded) else 'ONE_DIRECTION'

# be defensive, if we don't know, be assume both are concerned
return 'BOTH_DIRECTIONS'

def _get_start_end_time(self, situationRecord: ET) -> tuple[str, str]:
"""
Extracts daate/time intervaal from validityTimeSpecification.
"""
Expand All @@ -154,7 +225,7 @@ def _get_start_end_time(self, situationRecord):

return (starttime, endtime)

def _parse(self, datex2file):
def _parse(self, datex2file: str) -> ET:
if datex2file.startswith('http'):
r = requests.get(datex2file, timeout=10)
r.encoding = 'UTF-8'
Expand Down Expand Up @@ -246,7 +317,7 @@ def transform_datex2(self, datex2doc: ET, format: str = 'cifs') -> dict:
geojson = {'type': 'FeatureCollection', 'features': features}
json_result = geojson
else:
incidents = {'incidents': closures, 'timestamp': datetime.datetime.now().isoformat()}
incidents = {'incidents': closures, 'timestamp': self.current_time.isoformat()}
json_result = incidents

return json_result
Expand Down
13 changes: 13 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[pytest]
env =
IPL_GTFS_DB_POSTGRES_HOST=''
IPL_GTFS_DB_POSTGRES_USER=''
IPL_GTFS_DB_POSTGRES_PASSWORD=''
IPL_GTFS_DB_POSTGRES_DB=''
IPL_GTFS_IMPORTER_GTFS_DOWNLOAD_URL=''
IPL_GTFS_IMPORTER_GTFS_DOWNLOAD_USER_AGENT=''
IPL_GTFS_DB_POSTGRES_DB_PREFIX=''
IPL_GTFS_DB_POSTGREST_USER=''
IPL_GTFS_DB_POSTGREST_PASSWORD=''
IPL_GTFS_IMPORTER_HOST_GTFS_OUTPUT_DIR=''
IPL_GTFS_IMPORTER_HOST_CUSTOM_SCRIPTS_DIR=''
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pandas-stubs==2.1.1.230928
types-SQLAlchemy==1.4.53.38
# for docker currently no official stubs exist, see https://github.com/docker/docker-py/issues/2796
docker-stubs @ git+https://github.com/rdozier-work/docker-stubs@9de7906804ae912f1d644c97b617ac77e784fca8
pytest==7.4.3
pytest-env==1.1.3

ruff~=0.1.6
black~=23.10.1
Expand Down
Empty file added tests/transformer/__init__.py
Empty file.
Loading

0 comments on commit 72d4241

Please sign in to comment.