diff --git a/src/genet/input/gtfs_reader.py b/src/genet/input/gtfs_reader.py index 3509e9a9..834a094e 100644 --- a/src/genet/input/gtfs_reader.py +++ b/src/genet/input/gtfs_reader.py @@ -13,12 +13,19 @@ from genet.utils import persistence, spatial -def read_services_from_calendar(path, day): - """ - return list of services to be included - :param path: path to GTFS folder - :param day: 'YYYYMMDD' for specific day - :return: +def read_services_from_calendar(path: str, day: str) -> list: + """Return list of services to be included. + + Args: + path (str): Path to GTFS directory. + day (str): 'YYYYMMDD' for specific day in which to find services. + + Raises: + RuntimeError: There must be at least one service on the day selected. + RuntimeError: GTFS directory must include a calendar file. + + Returns: + list: Included service IDs. """ logging.info("Reading the calendar for GTFS") diff --git a/src/genet/input/matsim_reader.py b/src/genet/input/matsim_reader.py index e67c11d2..13be1689 100644 --- a/src/genet/input/matsim_reader.py +++ b/src/genet/input/matsim_reader.py @@ -1,6 +1,7 @@ import logging import re import xml.etree.cElementTree as ET +from typing import Optional import networkx as nx from pyproj import Proj, Transformer @@ -9,14 +10,24 @@ from genet.utils import dict_support, java_dtypes, spatial -def read_node(elem, g, node_id_mapping, node_attribs, transformer): - """ - Adds node elem of the stream to the network - :param elem: - :param g: nx.MultiDiGraph - :param node_id_mapping: - :param transformer: - :return: +def read_node( + elem: ET.Element, + g: nx.MultiDiGraph, + node_id_mapping: dict, + node_attribs: dict, + transformer: Transformer, +) -> tuple[nx.MultiDiGraph, dict]: + """Adds node elem of the stream to the network. + + Args: + elem (ET.Element): Element of the stream. + g (nx.MultiDiGraph): Network. + node_id_mapping (dict): Mapping from node IDs to S2 IDs. + node_attribs (dict): Node attributes to attach to the node in the network object. 
+ transformer (Transformer): PyProj CRS Transformer to update the `elem` `x`/`y` coordinates to `lat`/`lon`. + + Returns: + tuple[nx.MultiDiGraph, dict]: Network with added node; found duplicated node IDs. """ duplicated_node_id = {} attribs = elem.attrib @@ -49,18 +60,35 @@ def read_node(elem, g, node_id_mapping, node_attribs, transformer): return g, duplicated_node_id -def read_link(elem, g, u, v, node_id_mapping, link_id_mapping, link_attribs): - """ - Reads link elem of the stream to the network - :param elem: - :param g: nx.MultiDiGraph - :param u: from node of the previous link - :param v: to node of the previous link - :param node_id_mapping: - :param link_id_mapping: - :param link_attribs: link attributes of the previous link - :return: +def read_link( + elem: ET.Element, + g: nx.MultiDiGraph, + u: str, + v: str, + node_id_mapping: dict, + link_id_mapping: dict, + link_attribs: dict, +) -> tuple[nx.MultiDiGraph, str, str, dict, dict]: + """Adds link elem of the stream to the network + + Args: + elem (ET.Element): Element of the stream. + g (nx.MultiDiGraph): Network. + u (str): `from` node of the previous link. + v (str): `to` node of the previous link. + node_id_mapping (dict): Mapping from node IDs to S2 IDs. + link_id_mapping (dict): Mapping from link ID to node to and from IDs. + link_attribs (dict): link attributes of the previous link. + + Returns: + tuple[nx.MultiDiGraph, str, str, dict, dict]: + Network with added link; + `from` node ID of link; + `to` node ID of link; + `link_id_mapping` with mapping for the newly added link; + found duplicated link IDs. 
""" + duplicated_link_id = {} attribs = elem.attrib @@ -105,13 +133,19 @@ def read_link(elem, g, u, v, node_id_mapping, link_id_mapping, link_attribs): return g, u, v, link_id_mapping, duplicated_link_id -def update_additional_attrib(elem, attribs, force_long_form_attributes=False): - """ - Reads additional attributes - :param elem: - :param attribs: current additional attributes - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into long form - :return: +def update_additional_attrib( + elem: ET.Element, attribs: dict, force_long_form_attributes: bool = False +) -> dict: + """Updates additional attributes dictionary with data read from stream. + + Args: + elem (ET.Element): Stream element for which additional attributes are to be read and updated. + attribs (dict): Current additional attributes. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into long form. Defaults to False. + + Returns: + dict: Updated `attribs` dictionary with `elem` additional attributes. """ attribs[elem.attrib["name"]] = read_additional_attrib( elem, force_long_form_attributes=force_long_form_attributes @@ -119,11 +153,16 @@ def update_additional_attrib(elem, attribs, force_long_form_attributes=False): return attribs -def read_additional_attrib(elem, force_long_form_attributes=False): - """ - :param elem: - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into long form - :return: +def read_additional_attrib(elem: ET.Element, force_long_form_attributes: bool = False) -> dict: + """Reads additional attributes dictionary from stream. + + Args: + elem (ET.Element): Stream element for which additional attributes are to be read and updated. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into long form. Defaults to False. + + Returns: + dict: `elem` additional attributes. 
""" if force_long_form_attributes: return _read_additional_attrib_to_long_form(elem) @@ -165,7 +204,7 @@ def _read_additional_attrib_to_short_form(elem): return t -def _read_additional_attrib_to_long_form(elem): +def _read_additional_attrib_to_long_form(elem: ET.Element) -> dict: return { "text": _read_additional_attrib_text(elem), "class": _read_additional_attrib_class(elem), @@ -188,37 +227,50 @@ def unique_link_id(link_id, link_id_mapping): return link_id, duplicated_link_id -def read_network(network_path, transformer: Transformer, force_long_form_attributes=False): - """ - Read MATSim network - :param network_path: path to the network.xml file - :param transformer: pyproj crs transformer - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into verbose - format: - { - 'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': 'attrib_value'} - } - where 'attrib_value' is always a python string; instead of the default short form: - { - 'additional_attrib': 'attrib_value' - } - where the type of attrib_value is mapped to a python type using the declared java class. - NOTE! Network level attributes cannot be forced to be read into long form. - :return: g (nx.MultiDiGraph representing the multimodal network), - node_id_mapping (dict {matsim network node ids : s2 spatial ids}), - link_id_mapping (dict {matsim network link ids : {'from': matsim id from node, ,'to': matsim id to - node, 's2_from' : s2 spatial ids from node, 's2_to': s2 spatial ids to node}}) +def read_network( + network_path: str, transformer: Transformer, force_long_form_attributes: bool = False +) -> tuple[nx.MultiDiGraph, dict, dict, dict, dict]: + """Read MATSim network. + + Args: + network_path (str): path to the network.xml file. + transformer (Transformer): PyProj CRS Transformer to update the `elem` `x`/`y` coordinates to `lat`/`lon`. 
+ force_long_form_attributes (bool, optional): + If True the additional attributes will be read into verbose format: + ```dict + {'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': attrib_value}} + ``` + where `attrib_value` is always a python string. + + If False, defaults to short-form: + ```python + {'additional_attrib': attrib_value} + ``` + where the type of `attrib_value` is mapped to a python type using the declared java class. + + !!! note + Network level attributes cannot be forced to be read into long form. + + Defaults to False. + + Returns: + tuple[nx.MultiDiGraph, dict, dict, dict, dict]: + Representation of the multimodal network; + MATSim node ID mapping: `{network node ID : s2 spatial ID}`; + MATSIM link ID to node ID mapping: `{network link ID : {'from': from node ID, 'to': to node ID, 's2_from' : from node S2 spatial ID, 's2_to': to node S2 spatial ID}}` + Network additional attribute dictionary. """ g = nx.MultiDiGraph() - network_attributes = {} - node_id_mapping = {} - node_attribs = {} - link_id_mapping = {} - link_attribs = {} - duplicated_link_ids = {} - duplicated_node_ids = {} - u, v = None, None + network_attributes: dict = {} + node_id_mapping: dict = {} + node_attribs: dict = {} + link_id_mapping: dict = {} + link_attribs: dict = {} + duplicated_link_ids: dict = {} + duplicated_node_ids: dict = {} + u: Optional[str] = None + v: Optional[str] = None elem_themes_for_additional_attributes = {"network", "nodes", "links"} elem_type_for_additional_attributes = None @@ -272,23 +324,39 @@ def read_network(network_path, transformer: Transformer, force_long_form_attribu return g, link_id_mapping, duplicated_node_ids, duplicated_link_ids, network_attributes -def read_schedule(schedule_path, epsg, force_long_form_attributes=False): - """ - Read MATSim schedule - :param schedule_path: path to the schedule.xml file - :param epsg: 'epsg:12345' - :param force_long_form_attributes: Defaults to False, if True the 
additional attributes will be read into verbose - format: - { - 'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': 'attrib_value'} - } - where 'attrib_value' is always a python string; instead of the default short form: - { - 'additional_attrib': 'attrib_value' - } - where the type of attrib_value is mapped to a python type using the declared java class. - NOTE! Schedule level attributes cannot be forced to be read into long form. - :return: list of Service objects +def read_schedule( + schedule_path: str, epsg: str, force_long_form_attributes: bool = False +) -> tuple[list, dict, dict, dict]: + """Read MATSim schedule. + + Args: + schedule_path (str): Path to the `schedule.xml` file. + epsg (str): Schedule projection CRS, e.g. `epsg:4326`. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into verbose format: + ```dict + {'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': attrib_value}} + ``` + where `attrib_value` is always a python string. + + If False, defaults to short-form: + ```python + {'additional_attrib': attrib_value} + ``` + where the type of `attrib_value` is mapped to a python type using the declared java class. + + !!! note + Schedule level attributes cannot be forced to be read into long form. + + Defaults to False. + + Returns: + tuple[list, dict, dict, dict]: + list of Service objects; + Minimal transfer times between stops; + Transit stop ID mapping; + Schedule additional attributes. 
+ """ services = [] transformer = Transformer.from_proj(Proj(epsg), Proj("epsg:4326"), always_xy=True) @@ -359,14 +427,14 @@ def write_transitLinesTransitRoute(transitLine, transitRoutes, transportMode): _service.add_additional_attributes({"attributes": transitLine["attributes"]}) services.append(_service) - transitLine = {} - transitRoutes = {} - transportMode = {} - transit_stop_id_mapping = {} + transitLine: dict = {} + transitRoutes: dict = {} + transportMode: dict = {} + transit_stop_id_mapping: dict = {} is_minimalTransferTimes = False - minimalTransferTimes = ( - {} - ) # {'stop_id_1': {'stop_id_2': 0.0}} seconds_to_transfer between stop_id_1 and stop_id_2 + + # {'stop_id_1': {'stop_id_2': 0.0}} seconds_to_transfer between stop_id_1 and stop_id_2 + minimalTransferTimes: dict = {} elem_themes_for_additional_attributes = { "transitSchedule", @@ -375,7 +443,7 @@ def write_transitLinesTransitRoute(transitLine, transitRoutes, transportMode): "transitRoute", } elem_type_for_additional_attributes = None - schedule_attribs = {} + schedule_attribs: dict = {} # Track IDs through the stream current_stop_id = None current_route_id = None diff --git a/src/genet/input/osmnx_customised.py b/src/genet/input/osmnx_customised.py index 1baa596c..15880201 100644 --- a/src/genet/input/osmnx_customised.py +++ b/src/genet/input/osmnx_customised.py @@ -6,21 +6,17 @@ # rip and monkey patch of a few functions from osmnx.core to customise the tags being saved to the graph -def parse_osm_nodes_paths(osm_data, config): - """ - function from osmnx, adding our own spin on this - need extra tags +def parse_osm_nodes_paths(osm_data: dict, config: "osm_reader.Config") -> tuple[dict, dict]: + """Construct dicts of nodes and paths with key=osmid and value=dict of attributes. - Construct dicts of nodes and paths with key=osmid and value=dict of - attributes. + Function from osmnx. 
Adding our own spin on this - need extra tags - Parameters - ---------- - osm_data : dict - JSON response from from the Overpass API + Args: + osm_data (dict): JSON response from from the Overpass API + config (genet.input.osm_reader.Config): OSM reader configuration. - Returns - ------- - nodes, paths : tuple + Returns: + tuple[dict, dict]: Nodes; Paths. """ nodes = {} @@ -40,18 +36,15 @@ def parse_osm_nodes_paths(osm_data, config): return nodes, paths -def get_node(element, config): - """ - Convert an OSM node element into the format for a networkx node. +def get_node(element: dict, config: "osm_reader.Config") -> dict: + """Convert an OSM node element into the format for a networkx node. - Parameters - ---------- - element : dict - an OSM node element + Args: + element (dict): An OSM node element. + config (genet.input.osm_reader.Config): OSM reader configuration. - Returns - ------- - dict + Returns: + dict: OSM node element converted to the networkx node format. """ node = {} @@ -65,20 +58,17 @@ def get_node(element, config): return node -def get_path(element, config): - """ - function from osmnx, adding our own spin on this - need extra tags +def get_path(element: dict, config: "osm_reader.Config") -> dict: + """Convert an OSM way element into the format for a networkx graph path. - Convert an OSM way element into the format for a networkx graph path. + Function from osmnx, adding our own spin on this - need extra tags - Parameters - ---------- - element : dict - an OSM way element + Args: + element (dict): An OSM way element. + config (genet.input.osm_reader.Config): OSM reader configuration. - Returns - ------- - dict + Returns: + dict: OSM way element converted to the networkx graph path format. 
""" path = {} @@ -97,13 +87,13 @@ def get_path(element, config): return path -def return_edges(paths, config, bidirectional=False): - """ - Makes graph edges from osm paths - :param paths: dictionary {osm_way_id: {osmid: x, nodes:[a,b], osmtags: vals}} - :param config: genet.input.osm_reader.Config object - :param bidirectional: bool value if True, reads all paths as both ways - :return: +def return_edges(paths: dict, config: "osm_reader.Config", bidirectional: bool = False): + """Makes graph edges from osm paths. + + Args: + paths (dict): OSM paths, e.g. `{osm_way_id: {osmid: x, nodes:[a,b], osmtags: vals}}` + config (genet.input.osm_reader.Config): OSM reader configuration object. + bidirectional (bool, optional): If True, reads all paths as both ways. Defaults to False. """ def extract_osm_data(data, es): diff --git a/src/genet/input/read.py b/src/genet/input/read.py index 3bb673cb..69f33cc6 100644 --- a/src/genet/input/read.py +++ b/src/genet/input/read.py @@ -1,11 +1,13 @@ import ast import json import logging +from typing import Optional import geopandas as gpd import networkx as nx import pandas as pd +import genet import genet.core as core import genet.input.gtfs_reader as gtfs_reader import genet.input.matsim_reader as matsim_reader @@ -21,29 +23,40 @@ def read_matsim( path_to_network: str, epsg: str, - path_to_schedule: str = None, - path_to_vehicles: str = None, - force_long_form_attributes=False, -): - """ - Reads MATSim's network.xml to genet.Network object and if give, also the schedule.xml and vehicles.xml into - genet.Schedule object, part of the genet.Network object. - :param path_to_network: path to MATSim's network.xml file - :param path_to_schedule: path to MATSim's schedule.xml file, optional - :param path_to_vehicles: path to MATSim's vehicles.xml file, optional, expected to be passed with a schedule - :param epsg: projection for the network, e.g. 
'epsg:27700' - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into verbose - format: - { - 'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': 'attrib_value'} - } - where 'attrib_value' is always a python string; instead of the default short form: - { - 'additional_attrib': 'attrib_value' - } - where the type of attrib_value is mapped to a python type using the declared java class. - NOTE! Network and Schedule level attributes cannot be forced to be read into long form. - :return: genet.Network object + path_to_schedule: Optional[str] = None, + path_to_vehicles: Optional[str] = None, + force_long_form_attributes: bool = False, +) -> core.Network: + """Creates a GeNet Network from MATSim's network.xml and (optionally) schedule.xml and vehicles.xml files. + + If given, schedule and vehicles files will be used to create a `genet.Schedule` object, which will be added to the generated `genet.Network` object. + the schedule file needs to be given if the vehicles file is given. + + Args: + path_to_network (str): Path to MATSim's `network.xml` file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + path_to_schedule (Optional[str], optional): Path to MATSim's `schedule.xml` file. Defaults to None. + path_to_vehicles (Optional[str], optional): Path to MATSim's `vehicles.xml` file,. Defaults to None. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into verbose format: + ```dict + {'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': attrib_value}} + ``` + where `attrib_value` is always a python string. + + If False, defaults to short-form: + ```python + {'additional_attrib': attrib_value} + ``` + where the type of `attrib_value` is mapped to a python type using the declared java class. + + !!! note + Network level attributes cannot be forced to be read into long form. + + Defaults to False. 
+ + Returns: + core.Network: GeNet Network object. """ n = read_matsim_network( path_to_network=path_to_network, @@ -60,23 +73,34 @@ def read_matsim( return n -def read_matsim_network(path_to_network: str, epsg: str, force_long_form_attributes=False): - """ - Reads MATSim's network.xml to genet.Network object - :param path_to_network: path to MATSim's network.xml file - :param epsg: projection for the network, e.g. 'epsg:27700' - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into verbose - format: - { - 'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': 'attrib_value'} - } - where 'attrib_value' is always a python string; instead of the default short form: - { - 'additional_attrib': 'attrib_value' - } - where the type of attrib_value is mapped to a python type using the declared java class. - NOTE! Network level attributes cannot be forced to be read into long form. - :return: genet.Network object +def read_matsim_network( + path_to_network: str, epsg: str, force_long_form_attributes: bool = False +) -> core.Network: + """Reads MATSim's network.xml to genet.Network object. + + Args: + path_to_network (str): Path to MATSim's `network.xml` file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into verbose format: + ```dict + {'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': attrib_value}} + ``` + where `attrib_value` is always a python string. + + If False, defaults to short-form: + ```python + {'additional_attrib': attrib_value} + ``` + where the type of `attrib_value` is mapped to a python type using the declared java class. + + !!! note + Network level attributes cannot be forced to be read into long form. + + Defaults to False. + + Returns: + core.Network: GeNet Network object. 
""" n = core.Network(epsg=epsg) (n.graph, n.link_id_mapping, duplicated_nodes, duplicated_links, network_attributes) = ( @@ -105,25 +129,37 @@ def read_matsim_network(path_to_network: str, epsg: str, force_long_form_attribu def read_matsim_schedule( - path_to_schedule: str, epsg: str, path_to_vehicles: str = None, force_long_form_attributes=False -): - """ - Reads MATSim's schedule.xml (and possibly vehicles.xml) to genet.Schedule object - :param path_to_schedule: path to MATSim's schedule.xml file, - :param path_to_vehicles: path to MATSim's vehicles.xml file, optional but encouraged - :param epsg: projection for the schedule, e.g. 'epsg:27700' - :param force_long_form_attributes: Defaults to False, if True the additional attributes will be read into verbose - format: - { - 'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': 'attrib_value'} - } - where 'attrib_value' is always a python string; instead of the default short form: - { - 'additional_attrib': 'attrib_value' - } - where the type of attrib_value is mapped to a python type using the declared java class. - NOTE! Schedule level attributes cannot be forced to be read into long form. - :return: genet.Schedule object + path_to_schedule: str, + epsg: str, + path_to_vehicles: Optional[str] = None, + force_long_form_attributes: bool = False, +) -> schedule_elements.Schedule: + """Reads MATSim's schedule.xml (and possibly vehicles.xml) to genet.Schedule object. + + Args: + path_to_schedule (str): Path to MATSim's `schedule.xml` file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + path_to_vehicles (Optional[str], optional): Path to MATSim's `vehicles.xml` file,. Defaults to None. + force_long_form_attributes (bool, optional): + If True the additional attributes will be read into verbose format: + ```dict + {'additional_attrib': {'name': 'additional_attrib', 'class': 'java.lang.String', 'text': attrib_value}} + ``` + where `attrib_value` is always a python string. 
+ + If False, defaults to short-form: + ```python + {'additional_attrib': attrib_value} + ``` + where the type of `attrib_value` is mapped to a python type using the declared java class. + + !!! note + Network level attributes cannot be forced to be read into long form. + + Defaults to False. + + Returns: + schedule_elements.Schedule: GeNet Schedule object. """ (services, minimal_transfer_times, transit_stop_id_mapping, schedule_attributes) = ( matsim_reader.read_schedule( @@ -155,27 +191,33 @@ def read_matsim_schedule( return matsim_schedule -def read_json(network_path: str, epsg: str, schedule_path: str = ""): - """ - Reads Network and, if passed, Schedule JSON files in to a genet.Network - :param network_path: path to json network file - :param schedule_path: path to json schedule file - :param epsg: projection for the network, e.g. 'epsg:27700' - :return: genet.Network object +def read_json(network_path: str, epsg: str, schedule_path: Optional[str] = None) -> core.Network: + """Reads Network and, if passed, Schedule JSON files in to a genet.Network. + + Args: + network_path (str): path to JSON network file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + schedule_path (Optional[str], optional): Path to json schedule file. Defaults to None. + + Returns: + core.Network: GeNet network object. """ n = read_json_network(network_path, epsg) - if schedule_path: + if schedule_path is not None: n.schedule = read_json_schedule(schedule_path, epsg) return n -def read_geojson_network(nodes_path: str, links_path: str, epsg: str): - """ - Reads Network graph from JSON file. - :param nodes_path: path to geojson network nodes file - :param links_path: path to geojson network links file - :param epsg: projection for the network, e.g. 'epsg:27700' - :return: genet.Network object +def read_geojson_network(nodes_path: str, links_path: str, epsg: str) -> core.Network: + """Reads Network graph from JSON file. 
+ + Args: + nodes_path (str): Path to geojson network nodes file. + links_path (str): Path to geojson network links file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + + Returns: + core.Network: GeNet network object. """ logging.info(f"Reading Network nodes from {nodes_path}") nodes = gpd.read_file(nodes_path) @@ -200,23 +242,26 @@ def read_geojson_network(nodes_path: str, links_path: str, epsg: str): return n -def read_json_network(network_path: str, epsg: str): - """ - Reads Network graph from JSON file. - :param network_path: path to json or geojson network file - :param epsg: projection for the network, e.g. 'epsg:27700' - :return: genet.Network object +def read_json_network(network_path: str, epsg: str) -> core.Network: + """Reads network JSON file in to a genet.Network. + + Args: + network_path (str): path to JSON or GeoJSON network file. + epsg (str): Projection for the network, e.g. 'epsg:27700'. + + Returns: + core.Network: GeNet network object. """ logging.info(f"Reading Network from {network_path}") with open(network_path) as json_file: json_data = json.load(json_file) - for node, data in json_data["nodes"].items(): + for _, data in json_data["nodes"].items(): try: del data["geometry"] except KeyError: pass - for link, data in json_data["links"].items(): + for _, data in json_data["links"].items(): try: data["geometry"] = spatial.decode_polyline_to_shapely_linestring(data["geometry"]) except KeyError: @@ -233,12 +278,15 @@ def read_json_network(network_path: str, epsg: str): return n -def read_json_schedule(schedule_path: str, epsg: str): - """ - Reads Schedule from a JSON file. - :param schedule_path: path to json or geojson schedule file - :param epsg: projection for the network, e.g. 'epsg:27700' - :return: genet.Schedule object +def read_json_schedule(schedule_path: str, epsg: str) -> schedule_elements.Schedule: + """Reads Schedule from a JSON file. + + Args: + schedule_path (str): path to JSON or GeoJSON schedule file. 
+ epsg (str): Projection for the network, e.g. 'epsg:27700'. + + Returns: + schedule_elements.Schedule: GeNet schedule object. """ logging.info(f"Reading Schedule from {schedule_path}") with open(schedule_path) as json_file: @@ -279,26 +327,38 @@ def _literal_eval_col(df_col): return df_col -def read_csv(path_to_network_nodes: str, path_to_network_links: str, epsg: str): - """ - Reads CSV data into a genet.Network object - :param path_to_network_nodes: CSV file describing nodes. Should at least include columns: - - id: unique ID for the node - - x: spatial coordinate in given epsg - - y: spatial coordinate in given epsg - :param path_to_network_links: CSV file describing links. - Should at least include columns: - - from - source Node ID - - to - target Node ID - Optional columns, but strongly encouraged - - id - unique ID for link - - length - link length in metres - - freespeed - meter/seconds speed - - capacity - vehicles/hour - - permlanes - number of lanes - - modes - set of modes - :param epsg: projection for the network, e.g. 'epsg:27700' - :return: genet.Network object +def read_csv(path_to_network_nodes: str, path_to_network_links: str, epsg: str) -> core.Network: + """Reads CSV data into a genet.Network object + + Args: + path_to_network_nodes (str): + CSV file describing nodes. + Should at least include columns: + - id: unique ID for the node + - x: spatial coordinate in given epsg + - y: spatial coordinate in given epsg + + path_to_network_links (str): + CSV file describing links. + Should at least include columns: + - from - source Node ID + - to - target Node ID + + Optional columns, but strongly encouraged: + - id - unique ID for link + - length - link length in metres + - freespeed - meter/seconds speed + - capacity - vehicles/hour + - permlanes - number of lanes + - modes - set of modes + + epsg (str): Projection for the network, e.g. 'epsg:27700'. + + Raises: + NetworkSchemaError: Network nodes must have at least the columns specified above. 
+ + Returns: + core.Network: GeNet network object. """ logging.info(f"Reading nodes from {path_to_network_nodes}") df_nodes = pd.read_csv(path_to_network_nodes) @@ -350,14 +410,22 @@ def read_csv(path_to_network_nodes: str, path_to_network_links: str, epsg: str): return n -def read_gtfs(path, day, epsg=None): - """ - Reads from GTFS. The resulting services will not have network routes. Assumed to be in lat lon epsg:4326. - :param path: to GTFS folder or a zip file - :param day: 'YYYYMMDD' to use from the gtfs - :param epsg: projection for the output Schedule, e.g. 'epsg:27700'. If not provided, the Schedule remains in - epsg:4326 - :return: +def read_gtfs(path: str, day: str, epsg: Optional[str] = None) -> schedule_elements.Schedule: + """Reads schedule from GTFS. + + The resulting services will not have network routes. + Input GTFS is assumed to be using the 'epsg:4326' projection. + + Args: + path (str): Path to GTFS folder or a zip file. + day (str): 'YYYYMMDD' to use from the GTFS. + epsg (Optional[str], optional): + Projection for the output Schedule, e.g. 'epsg:27700'. + If not provided, defaults to 'epsg:4326'. + Defaults to None. + + Returns: + schedule_elements.Schedule: GeNet schedule. """ logging.info(f"Reading GTFS from {path}") schedule_graph = gtfs_reader.read_gtfs_to_schedule_graph(path, day) @@ -367,18 +435,25 @@ def read_gtfs(path, day, epsg=None): return s -def read_osm(osm_file_path, osm_read_config, num_processes: int = 1, epsg=None): - """ - Reads OSM data into a graph of the Network object - :param osm_file_path: path to .osm or .osm.pbf file - :param osm_read_config: config file (see configs folder in genet for examples) which informs for example which - highway types to read (in case of road network) and what modes to assign to them - :param num_processes: number of processes to split parallelisable operations across - :param epsg: projection for the output Network, e.g. 'epsg:27700'. 
If not provided, defaults to epsg:4326 - :return: genet.Network object +def read_osm( + osm_file_path: str, osm_read_config: str, num_processes: int = 1, epsg: str = "epsg:4326" +) -> core.Network: + """Reads OSM data into a graph of the Network object. + + Args: + osm_file_path (str): path to .osm or .osm.pbf file + osm_read_config (str): + Path to config file, which informs e.g., which highway types to read (in case of road network) and what modes to assign to them. + See configs folder in genet for examples. + num_processes (int, optional): Number of processes to split parallelisable operations across. Defaults to 1. + epsg (Optional[str], optional): + Projection for the output Network, e.g. 'epsg:27700'. + Defaults to "epsg:4326". + + Returns: + core.Network: GeNet network object. """ - if epsg is None: - epsg = "epsg:4326" + config = osm_reader.Config(osm_read_config) n = core.Network(epsg) nodes, edges = osm_reader.generate_osm_graph_edges_from_file( @@ -414,10 +489,13 @@ def read_osm(osm_file_path, osm_read_config, num_processes: int = 1, epsg=None): return n -def read_matsim_road_pricing(path_to_file): - """ - TODO: implement - :param path_to_file: path to MATSim's road_pricing.xml file - :return: genet.Toll.. 
or other if applicable though not yet implemented (eg distance or area tolling) +def read_matsim_road_pricing(path_to_file: str) -> "genet.use.road_pricing.Toll": + """TODO: implement + + Args: + path_to_file (str): path to MATSim's road_pricing.xml file + + Returns: + genet.Toll: or other if applicable though not yet implemented (eg distance or area tolling) """ - pass + raise NotImplementedError() diff --git a/src/genet/modify/change_log.py b/src/genet/modify/change_log.py index 8f622a04..0c438988 100644 --- a/src/genet/modify/change_log.py +++ b/src/genet/modify/change_log.py @@ -1,21 +1,26 @@ from datetime import datetime -from typing import List, Union +from typing import Optional, Union import dictdiffer import pandas as pd +from typing_extensions import Self class ChangeLog(pd.DataFrame): - """ - Records changes in genet.core.Network into a pandas.DataFrame + def __init__(self, df: Optional[pd.DataFrame] = None): + """Records changes in genet.core.Network in a pandas.DataFrame - Change Events: - • Add : - • Modify : - • Remove : - """ + Change Events: + • Add : + • Modify : + • Remove : - def __init__(self, df=None): + Args: + df (Optional[pd.DataFrame], optional): + If given, initialise with `df`. + If not given, initialise with an empty DataFrame. + Defaults to None. + """ if df is None: super().__init__( columns=[ @@ -53,13 +58,16 @@ def add(self, object_type: str, object_id: Union[int, str], object_attributes: d ) def add_bunch( - self, object_type: str, id_bunch: List[Union[int, str]], attributes_bunch: List[dict] - ): + self, object_type: str, id_bunch: list[Union[int, str]], attributes_bunch: list[dict] + ) -> Self: """ - :param object_type: - :param id_bunch: same len as attributes_bunch - :param attributes_bunch: same len as id_bunch - :return: + Args: + object_type (str): GeNet object type. + id_bunch (list[Union[int, str]]): same len as `attributes_bunch`. + attributes_bunch (list[dict]): same len as `id_bunch`. 
+
+        Returns:
+            Self: Existing change log concatenated with input bunch.
         """
         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         return self.__class__(
@@ -110,18 +118,21 @@ def modify(
     def modify_bunch(
         self,
         object_type: str,
-        old_id_bunch: List[Union[int, str]],
-        old_attributes: List[dict],
-        new_id_bunch: List[Union[int, str]],
-        new_attributes: List[dict],
-    ):
+        old_id_bunch: list[Union[int, str]],
+        old_attributes: list[dict],
+        new_id_bunch: list[Union[int, str]],
+        new_attributes: list[dict],
+    ) -> Self:
         """
-        :param object_type:
-        :param old_id_bunch: same len as attributes_bunch
-        :param old_attributes: same len as attributes_bunch
-        :param new_id_bunch: same len as id_bunch
-        :param new_attributes: same len as id_bunch
-        :return:
+        Args:
+            object_type (str): GeNet object type.
+            old_id_bunch (list[Union[int, str]]): Same len as `old_attributes`.
+            old_attributes (list[dict]): Same len as `old_id_bunch`.
+            new_id_bunch (list[Union[int, str]]): Same len as `new_attributes`.
+            new_attributes (list[dict]): Same len as `new_id_bunch`.
+
+        Returns:
+            Self: Existing change log concatenated with modified bunch.
         """
         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         return self.__class__(
@@ -151,16 +162,25 @@ def modify_bunch(
     def simplify_bunch(
-        self, old_ids_list_bunch, new_id_bunch, indexed_paths_to_simplify, links_to_add
-    ):
-        """Series of ordered lists of indecies and attributes to log simplification of links, data prior to
-        simplification and the nodes simplified
-        :param old_ids_list_bunch: old ids list
-        :param new_id_bunch: same new ids list
-        :param indexed_paths_to_simplify: same len as id_bunch
-        :param links_to_add: lists of nodes deleted in order e.g. is path_before = [A, B, C, D] and path_after = [A, D]
-        path_diff = [B, C], list of those for all links
-        :return:
+        self,
+        old_ids_list_bunch: list,
+        new_id_bunch: list,
+        indexed_paths_to_simplify: dict,
+        links_to_add: dict,
+    ) -> Self:
+        """Series of ordered lists of indices and attributes to log simplification of links, data prior to simplification and the nodes simplified.
+
+        Args:
+            old_ids_list_bunch (list): old ids list.
+            new_id_bunch (list): new ids list.
+            indexed_paths_to_simplify (dict): same len as ID bunches.
+            links_to_add (dict):
+                Lists of nodes deleted in order.
+                E.g. if path_before = [A, B, C, D] and path_after = [A, D], then path_diff = [B, C].
+                Dictionary is list of `path_diff` for all links.
+
+        Returns:
+            Self: Existing change log concatenated with simplified bunch.
         """
         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         return self.__class__(
@@ -205,13 +225,17 @@ def remove(self, object_type: str, object_id: Union[int, str], object_attributes
     def remove_bunch(
-        self, object_type: str, id_bunch: List[Union[int, str]], attributes_bunch: List[dict]
-    ):
+        self, object_type: str, id_bunch: list[Union[int, str]], attributes_bunch: list[dict]
+    ) -> Self:
         """
-        :param object_type:
-        :param id_bunch: same len as attributes_bunch
-        :param attributes_bunch: same len as id_bunch
-        :return:
+
+        Args:
+            object_type (str): GeNet object type.
+            id_bunch (list[Union[int, str]]): same len as `attributes_bunch`.
+            attributes_bunch (list[dict]): same len as `id_bunch`.
+
+        Returns:
+            Self: Existing change log concatenated with removed bunch. 
""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return self.__class__( diff --git a/src/genet/modify/schedule.py b/src/genet/modify/schedule.py index 3561e63d..154b8952 100644 --- a/src/genet/modify/schedule.py +++ b/src/genet/modify/schedule.py @@ -8,11 +8,15 @@ from genet.max_stable_set import MaxStableSet -def reproj_stops(schedule_element_nodes: dict, new_epsg): +def reproj_stops(schedule_element_nodes: dict, new_epsg: str) -> dict: """ - :param schedule_element_nodes: dict stop ids : stop data including x, y, epsg - :param new_epsg: 'epsg:1234', the epsg stops are being projected to - :return: dict: stop ids from schedule_element_nodes: changed stop data in dict format new x, y and epsg + + Args: + schedule_element_nodes (dict): stop IDs : stop data including `x`, `y`, `epsg`. + new_epsg (str): the epsg stops are being projected to, e.g. 'epsg:4326'. + + Returns: + dict: Stop IDs from `schedule_element_nodes`: changed stop data in dict format new `x`, `y` and `epsg`. """ transformers = { epsg: Transformer.from_crs(epsg, new_epsg, always_xy=True) diff --git a/src/genet/output/matsim_xml_writer.py b/src/genet/output/matsim_xml_writer.py index 74447e75..7eb2a53b 100644 --- a/src/genet/output/matsim_xml_writer.py +++ b/src/genet/output/matsim_xml_writer.py @@ -9,6 +9,7 @@ import genet.variables as variables from genet.exceptions import MalformedAdditionalAttributeError from genet.output import sanitiser +from genet.schedule_elements import Schedule from genet.utils.spatial import encode_shapely_linestring_to_polyline from genet.validate.network import validate_attribute_data @@ -187,14 +188,16 @@ def write_matsim_network(output_dir, network): save_attributes(link_attributes, xf, elem_type="link") -def write_matsim_schedule(output_dir, schedule, reproj_processes=1): - """ - Save to MATSim XML format. 
- :param output_dir: path to output directory - :param schedule: genet.Schedule object - :param reproj_processes: you can set this in case you have a lot of stops and your stops need to be reprojected - it splits the process across given number of processes. - :return: +def write_matsim_schedule(output_dir: str, schedule: Schedule, reproj_processes: int = 1): + """Save to MATSim XML format. + + Args: + output_dir (str): path to output directory. + schedule (Schedule): Schedule object to write. + reproj_processes (int, optional): + You can set this in case you have a lot of stops and your stops need to be reprojected. + It splits the process across given number of processes. + Defaults to 1. """ fname = os.path.join(output_dir, "schedule.xml") logging.info("Writing {}".format(fname)) diff --git a/src/genet/schedule_elements.py b/src/genet/schedule_elements.py index 7869fa55..9b611de5 100644 --- a/src/genet/schedule_elements.py +++ b/src/genet/schedule_elements.py @@ -3008,14 +3008,14 @@ def apply_function_to_services(self, function: Callable, location: str): ) self.apply_attributes_to_services(new_attributes) - def apply_function_to_routes(self, function: Callable, location: str): + def apply_function_to_routes(self, function: Union[dict, Callable], location: str): """Applies a function or mapping to Routes within the Schedule. Fails silently, if the keys referred to by the function are not present, they will not be considered. The function will only be applied where it is possible. Args: - function (Callable): + function (Union[dict, Callable]): Function of Service attributes dictionary returning a value that should be stored under `location` or a dictionary mapping. In the case of a dictionary all values stored under `location` will be mapped to new values given by the mapping, if they are present. 
@@ -3026,14 +3026,14 @@ def apply_function_to_routes(self, function: Callable, location: str): ) self.apply_attributes_to_routes(new_attributes) - def apply_function_to_stops(self, function: Callable, location: str): + def apply_function_to_stops(self, function: Union[dict, Callable], location: str): """Applies a function or mapping to Stops within the Schedule. Fails silently, if the keys referred to by the function are not present, they will not be considered. The function will only be applied where it is possible. Args: - function (Callable): + function (Union[dict, Callable]): Function of Service attributes dictionary returning a value that should be stored under `location` or a dictionary mapping. In the case of a dictionary all values stored under `location` will be mapped to new values given by the mapping, if they are present. diff --git a/src/genet/use/road_pricing.py b/src/genet/use/road_pricing.py index 5c5e9598..525affc1 100644 --- a/src/genet/use/road_pricing.py +++ b/src/genet/use/road_pricing.py @@ -5,9 +5,11 @@ import numpy as np import pandas as pd from lxml import etree as et -from lxml.etree import Comment, Element, SubElement +from lxml.etree import Comment, Element, SubElement, _Element from tqdm import tqdm +from genet.core import Network + class Toll: def __init__(self, df_tolls: pd.DataFrame = None): @@ -27,30 +29,38 @@ def __init__(self, df_tolls: pd.DataFrame = None): else: self.df_tolls = df_tolls - def write_to_csv(self, output_dir, filename="road_pricing.csv"): - """ - Exports all tolls to csv file - :param output_dir: path to folder to receive the file - :return: None + def write_to_csv(self, output_dir: str, filename: str = "road_pricing.csv"): + """Exports all tolls to csv file. + + Args: + output_dir (str): Path to folder in which to save `filename`. + filename (str, optional): CSV filename. Defaults to "road_pricing.csv". 
""" self.df_tolls.to_csv(os.path.join(output_dir, filename), index=False) def write_to_xml( self, - output_dir, - filename="roadpricing-file.xml", - toll_type="link", - toll_scheme_name="simple-toll", - toll_description="A simple toll scheme", + output_dir: str, + filename: str = "roadpricing-file.xml", + toll_type: str = "link", + toll_scheme_name: str = "simple-toll", + toll_description: str = "A simple toll scheme", ): - """ - Write toll to MATSim xml file - :param output_dir: path to folder to receive the file - :param toll_type: default 'link', other supported MATSim toll types: 'distance', 'cordon', 'area', - more info: https://www.matsim.org/apidocs/core/0.3.0/org/matsim/roadpricing/package-summary.html - :param toll_scheme_name: name to pass to xml file, useful for identifying multiple toll schemes - :param toll_description: additional description of the toll to pass to the xml file - :return: None + """Write toll to MATSim xml file. + + Args: + output_dir (str): Path to folder in which to save `filename`. + filename (str, optional): Matsim XML filename. Defaults to "roadpricing-file.xml". + toll_type (str, optional): + Supported MATSim toll types: 'distance', 'cordon', 'area', 'link'. + More info: https://www.matsim.org/apidocs/core/0.3.0/org/matsim/roadpricing/package-summary.html. + Defaults to "link". + toll_scheme_name (str, optional): + Name to pass to xml file, useful for identifying multiple toll schemes. + Defaults to "simple-toll". + toll_description (str, optional): + Additional description of the toll to pass to the xml file. + Defaults to "A simple toll scheme". 
""" xml_tree = build_tree( self.df_tolls, @@ -61,19 +71,21 @@ def write_to_xml( write_xml(xml_tree, output_dir, filename=filename) -def road_pricing_from_osm(network, attribute_name, osm_csv_path, outpath): - """ - Instantiates a Toll object from OSM csv config and network inputs - - Parse a genet.Network object and find edges whose - ['attributes'][attribute_name]['text'] is present in a list of OSM way ids - :param network: a genet.Network object with attribute_name tags - :param attribute_name: a string corresponding to the name of the link attribute of interest - :param osm_csv_path: path to a .csv config file where OSM way ids are stored in column `osm_ids` - :param outpath: path to an outputs folder - :return: osm_df which is also written to .csv and a mapping between OSM IDs and network link IDs osm_to_network_dict - which is also saved to .json in the `outpath` location - :return: +def road_pricing_from_osm( + network: Network, attribute_name: str, osm_csv_path: str, outpath: str +) -> Toll: + """Instantiates a Toll object from OSM csv config and network inputs. + + Parse a genet.Network object and find edges whose `['attributes'][attribute_name]['text']` is present in a list of OSM way ids. + + Args: + network (Network): a genet.Network object with attribute_name tags. + attribute_name (str): a string corresponding to the name of the link attribute of interest + osm_csv_path (str): path to a .csv config file where OSM way ids are stored in column `osm_ids` + outpath (str): path to an outputs folder + + Returns: + Toll: Contains OSM dataframe (which is also written to CSV file), and a mapping between OSM IDs and network link IDs osm_to_network_dict (which is also saved to JSON file in the `outpath` location). 
""" osm_df, osm_to_network_dict = extract_network_id_from_osm_csv( network, attribute_name, osm_csv_path, outpath @@ -98,18 +110,25 @@ def merge_osm_tolls_and_network_snapping(osm_df, osm_to_network_dict): return df -def extract_network_id_from_osm_csv(network, attribute_name, osm_csv_path, outpath, osm_dtype=str): - """ - Parse a genet.Network object and find edges whose ['attributes'][attribute_name] is present in a list - of OSM way ids - :param network: a genet.Network object with attribute_name tags - :param attribute_name: a string corresponding to the name of the link attribute of interest - :param osm_csv_path: path to a .csv config file where OSM way ids are stored in column `osm_ids` - :param outpath: path to a folder - :param osm_dtype: data type to pass to pandas.read_csv method. Should match the python dtype for OSM data tags - stored in the network as they are being matched - :return: osm_df which is also written to .csv and a mapping between OSM IDs and network link IDs - osm_to_network_dict which is also saved to .json in the `outpath` location +def extract_network_id_from_osm_csv( + network: Network, attribute_name: str, osm_csv_path: str, outpath: str, osm_dtype: type = str +) -> tuple[pd.DataFrame, dict]: + """Parse a genet.Network object and find edges whose `['attributes'][attribute_name]` is present in a list of OSM way ids. + + Args: + network (Network): a genet.Network object with `attribute_name` tags, + attribute_name (str): a string corresponding to the name of the link attribute of interest + osm_csv_path (str): path to a .csv config file where OSM way ids are stored in column `osm_ids` + outpath (str): path to a folder + osm_dtype (type, optional): + Data type to pass to pandas.read_csv method. + Should match the python dtype for OSM data tags stored in the network as they are being matched. + Defaults to str. 
+
+    Returns:
+        tuple[pd.DataFrame, dict]:
+            `osm_df`, which is also written to .csv in the `outpath` location;
+            `osm_to_network_dict`, a mapping between OSM IDs and network link IDs, which is also saved to .json in the `outpath` location.
     """
     osm_df = pd.read_csv(osm_csv_path, dtype={"osm_id": osm_dtype})
@@ -154,12 +173,13 @@ def extract_network_id_from_osm_csv(network, attribute_name, osm_csv_path, outpa
     return osm_df, osm_to_network_dict
 
 
-def write_xml(root, path, filename="roadpricing-file.xml"):
-    """
-    Write XML config for MATSim Road Pricing a given folder location.
-    :param root: an 'lxml.etree._Element' object corresponding to the root of an XML tree
-    :param path: location of destination folder for Road Pricing config
-    :return: None
+def write_xml(root: _Element, path: str, filename: str = "roadpricing-file.xml"):
+    """Write XML config for MATSim Road Pricing to a given folder location.
+
+    Args:
+        root (_Element): root of an XML tree.
+        path (str): location of destination folder for Road Pricing config.
+        filename (str, optional): Road pricing XML filename. Defaults to "roadpricing-file.xml".
     """
     tree = et.tostring(root, pretty_print=True, xml_declaration=False, encoding="UTF-8")
     with open(os.path.join(path, filename), "wb") as file:
@@ -171,19 +191,33 @@ def write_xml(root, path, filename="roadpricing-file.xml"):
 
 
 def build_tree_from_csv_json(
-    csv_input,
-    json_input,
-    toll_type="link",
-    toll_scheme_name="simple-toll",
-    toll_description="A simple toll scheme",
-):
-    """
-    Build XML config for MATSim Road Pricing from .csv and .json input
-    :param csv_input: csv output from `extract_network_id_from_osm_csv` with additional columns: `vehicle_type`,
-        `toll_amount`, `start_time` and `end_time` for each of the tolls required. 
- :param json_input: json output from `extract_network_id_from_osm_csv` - :return: an 'lxml.etree._Element' object + csv_input: str, + json_input: str, + toll_type: str = "link", + toll_scheme_name: str = "simple-toll", + toll_description: str = "A simple toll scheme", +) -> _Element: + """Build XML config for MATSim Road Pricing from .csv and .json input. + + Args: + csv_input (str): + csv output from `extract_network_id_from_osm_csv` with additional columns: `vehicle_type`, `toll_amount`, `start_time` and `end_time` for each of the tolls required. + json_input (str): json output from `extract_network_id_from_osm_csv`. + toll_type (str, optional): + Supported MATSim toll types: 'distance', 'cordon', 'area', 'link'. + More info: https://www.matsim.org/apidocs/core/0.3.0/org/matsim/roadpricing/package-summary.html. + Defaults to "link". + toll_scheme_name (str, optional): + Name to pass to xml file, useful for identifying multiple toll schemes. + Defaults to "simple-toll". + toll_description (str, optional): + Additional description of the toll to pass to the xml file. + Defaults to "A simple toll scheme". + + Returns: + _Element: XML root element. 
""" + # CSV input osm_df = pd.read_csv(csv_input, dtype={"osm_id": str}) # JSON input @@ -198,14 +232,17 @@ def build_tree_from_csv_json( def build_tree( - df_tolls, - toll_type="link", - toll_scheme_name="simple-toll", - toll_description="A simple toll scheme", -): - """ - Build XML config for MATSim Road Pricing from tolls DataFrame input - :param df_tolls: pd.DataFrame( + df_tolls: pd.DataFrame, + toll_type: str = "link", + toll_scheme_name: str = "simple-toll", + toll_description: str = "A simple toll scheme", +) -> _Element: + """Build XML config for MATSim Road Pricing from tolls DataFrame input + + Args: + df_tolls (pd.DataFrame): Of the form: + ```python + pd.DataFrame( columns=[ 'toll_id', # optional, unique ID of the toll, based off OSM ref if applicable 'network_link_id', # network link ID to be charged @@ -216,11 +253,21 @@ def build_tree( 'osm_name', # optional, if derived from OSM, human readable name of the road 'notes' # optional, user notes ] - :param toll_type: default 'link', other supported MATSim toll types: 'distance', 'cordon', 'area', - more info: https://www.matsim.org/apidocs/core/0.3.0/org/matsim/roadpricing/package-summary.html - :param toll_scheme_name: name to pass to xml file, useful for identifying multiple toll schemes - :param toll_description: additional description of the toll to pass to the xml file - :return: an 'lxml.etree._Element' object + ) + ``` + toll_type (str, optional): + Supported MATSim toll types: 'distance', 'cordon', 'area', 'link'. + More info: https://www.matsim.org/apidocs/core/0.3.0/org/matsim/roadpricing/package-summary.html. + Defaults to "link". + toll_scheme_name (str, optional): + Name to pass to xml file, useful for identifying multiple toll schemes. + Defaults to "simple-toll". + toll_description (str, optional): + Additional description of the toll to pass to the xml file. + Defaults to "A simple toll scheme". + + Returns: + _Element: XML root element. 
""" roadpricing = Element("roadpricing", type=toll_type, name=toll_scheme_name) diff --git a/src/genet/use/schedule.py b/src/genet/use/schedule.py index 144366ef..ebb45fe6 100644 --- a/src/genet/use/schedule.py +++ b/src/genet/use/schedule.py @@ -2,7 +2,7 @@ import logging import os from datetime import datetime, timedelta -from typing import List +from typing import Optional import geopandas as gpd import numpy as np @@ -34,12 +34,17 @@ def get_offset(time): ) -def generate_edge_vph_geodataframe(df, gdf_links): - """ - Generates vehicles per hour for a trips dataframe - :param df: trips dataframe - :param gdf_links: geodataframe containing links of the schedule (element) graph - :return: +def generate_edge_vph_geodataframe( + df: pd.DataFrame, gdf_links: gpd.GeoDataFrame +) -> gpd.GeoDataFrame: + """Generates vehicles per hour for a trips dataframe. + + Args: + df (pd.DataFrame): trips dataframe. + gdf_links (gpd.GeoDataFrame): geodataframe containing links of the schedule (element) graph. + + Returns: + gpd.GeoDataFrame: Geodataframe which merges the two input arguments and keeps the `vph` column. """ df.loc[:, "hour"] = df["departure_time"].dt.round("H") groupby_cols = ["hour", "trip_id", "from_stop", "from_stop_name", "to_stop", "to_stop_name"] @@ -54,13 +59,20 @@ def generate_edge_vph_geodataframe(df, gdf_links): return gdf -def vehicles_per_hour(df, aggregate_by: list, output_path=""): - """ - Generates vehicles per hour for a trips dataframe - :param df: trips dataframe - :param aggregate_by: - :param output_path: path for the frame with .csv extension - :return: +def vehicles_per_hour( + df: pd.DataFrame, aggregate_by: list, output_path: Optional[str] = None +) -> pd.DataFrame: + """Generates vehicles per hour for a trips dataframe + + Args: + df (pd.DataFrame): trips dataframe. + aggregate_by (list): trip metadata to aggregate trips by. + output_path (Optional[str], optional): + If given, path for the frame with .csv extension. + Defaults to None. 
+ + Returns: + pd.DataFrame: Vehicles per hour. """ df.loc[:, "hour"] = df["departure_time"].dt.round("H") df.loc[:, "hour"] = df["hour"].dt.hour @@ -75,12 +87,17 @@ def vehicles_per_hour(df, aggregate_by: list, output_path=""): return df -def trips_per_day_per_service(df, output_dir=""): - """ - Generates trips per day per service for a trips dataframe - :param df: trips dataframe - :param output_dir: directory to save `trips_per_day_per_service.csv` - :return: +def trips_per_day_per_service(df: pd.DataFrame, output_dir: Optional[str] = None) -> pd.DataFrame: + """Generates trips per day per service for a trips dataframe. + + Args: + df (pd.DataFrame): trips dataframe. + output_dir (Optional[str], optional): + If given, directory to save `trips_per_day_per_service.csv`. + Defaults to None. + + Returns: + pd.DataFrame: Trips per day. """ trips_per_day = ( df.groupby(["service_id", "service_name", "route_id", "mode"]) @@ -96,12 +113,17 @@ def trips_per_day_per_service(df, output_dir=""): return trips_per_day -def trips_per_day_per_route(df, output_dir=""): - """ - Generates trips per day per route for a trips dataframe - :param df: trips dataframe - :param output_dir: directory to save `trips_per_day_per_service.csv` - :return: +def trips_per_day_per_route(df: pd.DataFrame, output_dir: Optional[str] = None) -> pd.DataFrame: + """Generates trips per day per route for a trips dataframe. + + Args: + df (pd.DataFrame): trips dataframe + output_dir (Optional[str], optional): + If given, directory to save `trips_per_day_per_service.csv`. + Defaults to None. + + Returns: + pd.DataFrame: trips per day per route. 
""" trips_per_day = ( df.groupby(["route_id", "route_name", "mode"]).nunique()["trip_id"].reset_index() @@ -163,21 +185,30 @@ def aggregate_by_stop_names(df_aggregate_trips_per_day_per_route_by_end_stop_pai return df -def divide_network_route(route: List[str], stops_linkrefids: List[str]) -> List[List[str]]: - """ - Divides into list of lists, the network route traversed by a PT service. - E.g. - route = ['a-a', 'a-b', 'b-b', 'b-c', 'c-c', 'c-d'] - stops_linkrefids = ['a-a', 'b-b', 'c-c'] - For a service with stops A, B, C, where the stops are snapped to network links 'a-a', 'b-b', 'c-c' respectively. - This method will give you teh answer: - [['a-a', 'a-b', 'b-b'], ['b-b', 'b-c', 'c-c']] - i.e. the route between stops A and B, and B and C, in order. - :param route: list of network link IDs (str) - :param stops_linkrefids: List of network link IDs (str) that the stops on route are snapped to - :return: +def divide_network_route(route: list[str], stops_linkrefids: list[str]) -> list[list[str]]: + """Divides the network route traversed by a PT service into list of lists. + + Examples: + ```python + route = ['a-a', 'a-b', 'b-b', 'b-c', 'c-c', 'c-d'] + stops_linkrefids = ['a-a', 'b-b', 'c-c'] + ``` + + For a service with stops A, B, C, where the stops are snapped to network links 'a-a', 'b-b', 'c-c' respectively. + This method will give you the answer: + ```python + [['a-a', 'a-b', 'b-b'], ['b-b', 'b-c', 'c-c']] + ``` + i.e. the route between stops A and B, and B and C, in order. + + Args: + route (list[str]): list of network link IDs. + stops_linkrefids (list[str]): List of network link IDs (str) that the stops on route are snapped to. + + Returns: + list[list[str]]: Divided route. 
""" - divided_route = [[]] + divided_route: list[list[str]] = [[]] for link_id in route: divided_route[-1].append(link_id) while stops_linkrefids and (link_id == stops_linkrefids[0]): diff --git a/src/genet/utils/dict_support.py b/src/genet/utils/dict_support.py index 7e3fb5b4..67d3d31d 100644 --- a/src/genet/utils/dict_support.py +++ b/src/genet/utils/dict_support.py @@ -7,12 +7,15 @@ import genet.utils.graph_operations as graph_operations -def set_nested_value(d: dict, value: dict): - """ - Changes or, if not present injects, `different_value` into nested dictionary d at key `key: key_2` - :param d: {key: {key_2: value, key_1: 1234} - :param value: {key: {key_2: different_value}} - :return: +def set_nested_value(d: dict, value: dict) -> dict: + """Changes or, if not present injects, `different_value` into nested dictionary d at key `key: key_2`. + + Args: + d (dict): `{key: {key_2: value, key_1: 1234}` + value (dict): `{key: {key_2: different_value}}` + + Returns: + dict: `d` with nested dictionary value updated. """ if isinstance(value, dict): for k, v in value.items(): @@ -28,21 +31,29 @@ def set_nested_value(d: dict, value: dict): return d -def get_nested_value(d: dict, path: dict): - """ - Retrieves value from nested dictionary - :param d: {key: {key_2: {key_2_1: hey}, key_1: 1234} - :param path: {key: {key_2: key_2_1}} path to take through the dictionary d - :return: d[key][key_2][key_2_1] +def get_nested_value(d: dict, path: Union[dict, str]) -> dict: + """Retrieves value from nested dictionary. + + Args: + d (dict): `{key: {key_2: {key_2_1: hey}, key_1: 1234}`. + path (Union[dict, str]): `{key: {key_2: key_2_1}} path to take through the dictionary d`. + + Raises: + KeyError: All nested keys in the path must exist. 
+ + Returns: + dict: `d[key][key_2][key_2_1]` """ + if isinstance(path, dict): for k, v in path.items(): if k in d: - return get_nested_value(d[k], v) + val = get_nested_value(d[k], v) else: raise KeyError(f"Dictionary {d} does not have key {k}") else: - return d[path] + val = d[path] + return val def find_nested_paths_to_value(d: dict, value: Union[str, int, float, set, list]): @@ -81,17 +92,25 @@ def nest_at_leaf(d: dict, value): return d -def merge_complex_dictionaries(d1, d2): +def merge_complex_dictionaries(d1: dict, d2: dict) -> dict: """ Merges two dictionaries where the values can be lists, sets or other dictionaries with the same behaviour. + If values are not list, set or dict then d2 values prevail. - If the values are lists, the two merge, retaining all elements of both lists and preserving their order - the result is: d1_list + d2_list. + + If the values are lists, the two merge, retaining all elements of both lists and preserving their order. + The result is: `d1_list` + `d2_list`. + If the values are sets, the two combine with the OR operator. + If the values are dicts, the two merge using this method. - :param d1: - :param d2: - :return: + + Args: + d1 (dict): First dictionary in the merge + d2 (dict): Second dictionary in the merge + + Returns: + dict: Merged dictionary. """ d = deepcopy(d1) clashing_keys = set(d1) & set(d2) @@ -109,12 +128,15 @@ def merge_complex_dictionaries(d1, d2): return d -def combine_edge_data_lists(l1, l2): - """ - Merges two lists where each elem is of the form (from_node, to_node, list) - :param l1: - :param l2: - :return: +def combine_edge_data_lists(l1: list, l2: list) -> list: + """Merges two lists where each elem is of the form (from_node, to_node, list). + + Args: + l1 (list): First list in merge. + l2 (list): Second list in merge. + + Returns: + list: List of merged dictionaries. 
""" edges = merge_complex_dictionaries( {(u, v): dat for u, v, dat in l1}, {(u, v): dat for u, v, dat in l2} diff --git a/src/genet/utils/elevation.py b/src/genet/utils/elevation.py index 80c017c3..12259533 100644 --- a/src/genet/utils/elevation.py +++ b/src/genet/utils/elevation.py @@ -22,15 +22,18 @@ def get_elevation_data(img, lat, lon): return elevation_meters -def validation_report_for_node_elevation(elev_dict, low_limit=-50, mont_blanc_height=4809): - """ - Generates a validation report for the node elevation dictionary. - :param elev_dict: contains node_id as key and elevation in meters as value - :param low_limit: values below this param get flagged as possibly wrong; set at -50m (below sea level) by default, - can optionally set a different value - :param mont_blanc_height: values above this param get flagged as possibly wrong; defaults to 4809m, - the height of Mont Blank, can optionally set a different value - :return: dict, with 2 data subsets - summary statistics, and extreme values lists +def validation_report_for_node_elevation( + elev_dict: dict, low_limit: int = -50, mont_blanc_height: int = 4809 +) -> dict: + """Generates a validation report for the node elevation dictionary. + + Args: + elev_dict (dict): contains node_id as key and elevation in meters as value. + low_limit (int, optional): values below this get flagged as possibly wrong. Defaults to -50 (below sea level). + mont_blanc_height (int, optional): values above this get flagged as possibly wrong. Defaults to 4809 (the height of Mont Blanc). + + Returns: + dict: Contains summary statistics, and extreme values lists. """ elevation_list = [] @@ -68,11 +71,12 @@ def validation_report_for_node_elevation(elev_dict, low_limit=-50, mont_blanc_he return report -def write_slope_xml(link_slope_dictionary, output_dir): - """ - Generates a link_slopes XML file. 
- :param link_slope_dictionary: dictionary of link slopes in format {link_id: {'slope': slope_value}} - :param output_dir: directory where the XML file will be written to +def write_slope_xml(link_slope_dictionary: dict, output_dir: str): + """Generates a link_slopes XML file. + + Args: + link_slope_dictionary (dict): dictionary of link slopes in format `{link_id: {'slope': slope_value}}` + output_dir (str): directory where the XML file will be written to. """ fname = os.path.join(output_dir, "link_slopes.xml") logging.info(f"Writing {fname}") diff --git a/src/genet/utils/google_directions.py b/src/genet/utils/google_directions.py index bb1eda58..df315317 100644 --- a/src/genet/utils/google_directions.py +++ b/src/genet/utils/google_directions.py @@ -4,8 +4,10 @@ import logging import os import time +from typing import Callable, Optional, Union import polyline +import requests from requests_futures.sessions import FuturesSession import genet.output.geojson as geojson @@ -13,36 +15,49 @@ import genet.utils.secrets_vault as secrets_vault import genet.utils.simplification as simplification import genet.utils.spatial as spatial +from genet.core import Network session = FuturesSession(max_workers=2) def send_requests_for_network( - n, + n: Network, request_number_threshold: int, - output_dir, - departure_time, - traffic_model: str = None, - key: str = None, - secret_name: str = None, - region_name: str = None, -): - """ - Generates, sends and parses results from Google Directions API for the car modal subgraph for network n. - You can pass your API key to this function under `key` variable. Alternatively, you can use AWS Secrets manager - for storing your API and pass secret_name and region_name (make sure you are authenticated to your AWS account). 
- You can also export an environmental variable in your terminal $ export GOOGLE_DIR_API_KEY='your key' - :param n: genet.Network - :param request_number_threshold: max number of requests - :param output_dir: output directory where to save the google directions api parsed data - :param departure_time: specifies the desired time of departure, in seconds since midnight, January 1, 1970 UTC, - i.e. unix time; if set to None, API will return results for average time-independent traffic conditions - :param traffic_model: str, specifies the assumptions to use when calculating time in traffic for choices see - https://developers.google.com/maps/documentation/directions/get-directions#traffic_model - :param key: API key - :param secret_name: if using aws secrets manager, the name where your directions api key is stored - :param region_name: the aws region you operate in - :return: api requests + output_dir: str, + departure_time: int, + traffic_model: Optional[str] = None, + key: Optional[str] = None, + secret_name: Optional[str] = None, + region_name: Optional[str] = None, +) -> dict: + """Generates, sends and parses results from Google Directions API for the car modal subgraph for network n. + + You can pass your API key to this function under `key` variable. + + Alternatively, you can use AWS Secrets manager for storing your API and pass secret_name and region_name (make sure you are authenticated to your AWS account). + + You can also export an environmental variable in your terminal `$ export GOOGLE_DIR_API_KEY='your key'`. + + Args: + n (Network): GeNet Network. + request_number_threshold (int): max number of requests. + output_dir (str): output directory where to save the google directions api parsed data. + departure_time (int): + specifies the desired time of departure, in seconds since midnight, January 1, 1970 UTC. + i.e. unix time; if set to None, API will return results for average time-independent traffic conditions. 
+        traffic_model (Optional[str], optional):
+            If given, specifies the assumptions to use when calculating time in traffic.
+            For choices see https://developers.google.com/maps/documentation/directions/get-directions#traffic_model.
+            Defaults to None.
+        key (Optional[str], optional): API key. Defaults to None.
+        secret_name (Optional[str], optional): If using AWS secrets manager, the name where your directions api key is stored. Defaults to None.
+        region_name (Optional[str], optional): The AWS region you operate in. Defaults to None.
+
+    Raises:
+        RuntimeError: Can only make as many requests as `request_number_threshold`.
+
+    Returns:
+        dict: API request results.
     """
     logging.info("Generating Google Directions API requests")
     api_requests = generate_requests(n)
@@ -65,12 +80,14 @@ def send_requests_for_network(
     return api_requests
 
 
-def read_api_requests(file_path):
-    """
-    Read the Google Directions API requests, generated to be sent and received back from the API, in the `file_path`
-    JSON file.
-    :param file_path: path to the JSON file where the google directions api requests were saved
-    :return:
+def read_api_requests(file_path: str) -> dict:
+    """Read the Google Directions API request results stored in the `file_path` JSON file.
+
+    Args:
+        file_path (str): path to the JSON file where the google directions api requests were saved. 
+ + Returns: + dict: Loaded API request results """ api_requests = {} with open(file_path, "rb") as handle: @@ -118,11 +135,11 @@ def make_request(origin_attributes, destination_attributes, key, departure_time, def send_requests( api_requests: dict, - departure_time, - traffic_model: str = None, - key: str = None, - secret_name: str = None, - region_name: str = None, + departure_time: int, + traffic_model: Optional[str] = None, + key: Optional[str] = None, + secret_name: Optional[str] = None, + region_name: Optional[str] = None, ): if key is None: key = secrets_vault.get_google_directions_api_key(secret_name, region_name) @@ -148,12 +165,18 @@ def send_requests( return api_requests -def generate_requests(n, osm_tags=all): - """ - Generates a dictionary describing pairs of nodes for which we need to request - directions from Google directions API. - :param n: genet.Network - :return: +def generate_requests(n: Network, osm_tags: Union[Callable, list[str]] = all) -> dict: + """Generates a dictionary describing pairs of nodes for which we need to request directions from Google directions API. + + Args: + n (Network): GeNet network. + osm_tags (Union[Callable, list[str]], optional): OSM tags to subset the network on. Defaults to all (no subsetting). + + Raises: + RuntimeError: Can only subset on tags for non-simplified networks. + + Returns: + dict: Generated requests. """ if n.is_simplified(): logging.info("Generating Google Directions API requests for a simplified network.") @@ -168,13 +191,21 @@ def generate_requests(n, osm_tags=all): return _generate_requests_for_non_simplified_network(n, osm_tags) -def _generate_requests_for_non_simplified_network(n, osm_tags=all): - """ - Generates a dictionary describing pairs of nodes for which we need to request - directions from Google directions API. For a non-simplified network n - :param n: genet.Network - :param osm_tags: takes a list of OSM tags to subset the network on, e.g. 
['primary', 'secondary', 'tertiary'] - :return: +def _generate_requests_for_non_simplified_network( + n: Network, osm_tags: Union[Callable, list[str]] = all +) -> dict: + """Generates a dictionary describing pairs of nodes for which we need to request directions from Google directions API. + + For a non-simplified network. + + Args: + n (Network): Non-simplified network n. + osm_tags (Union[Callable, list[str]], optional): + If given, a list of OSM tags to subset the network on, e.g. ['primary', 'secondary', 'tertiary']. + Defaults to all (no subsetting). + + Returns: + dict: Generated requests. """ if osm_tags == all: g = n.modal_subgraph(modes="car") @@ -205,12 +236,16 @@ def _generate_requests_for_non_simplified_network(n, osm_tags=all): return api_requests -def _generate_requests_for_simplified_network(n): - """ - Generates a dictionary describing pairs of nodes for which we need to request - directions from Google directions API. For a simplified network n - :param n: genet.Network - :return: +def _generate_requests_for_simplified_network(n: Network) -> dict: + """Generates a dictionary describing pairs of nodes for which we need to request directions from Google directions API. + + For a simplified network. + + Args: + n (Network): Simplified network n. + + Returns: + dict: Generated requests. """ gdf_links = geojson.generate_geodataframes(n.modal_subgraph(modes="car"))["links"].to_crs( "epsg:4326" @@ -265,15 +300,19 @@ def compute_speed(): return data -def parse_routes(response, path_polyline): - """ - Parses response contents to infer speed. If response returned more than one route, it picks the one closest on - average to the original request - :param response: request content - :param path_polyline: original request path encoded list of lat lon tuples - :return: +def parse_routes(response: requests.Response, path_polyline: str) -> dict: + """Parses response contents to infer speed. 
+ + If response returned more than one route, it picks the one closest on average to the original request. + + Args: + response (requests.Response): request content + path_polyline (str): original request path encoded list of lat lon tuples + + Returns: + dict: Parsed routes. """ - data = {} + data: dict = {} if response.status_code == 200: content = response.json() @@ -306,11 +345,14 @@ def parse_routes(response, path_polyline): return data -def parse_results(api_requests): - """ - Goes through all api requests and parses results - :param api_requests: generated and 'sent' api requests - :return: +def parse_results(api_requests: dict) -> dict: + """Goes through all api requests and parses results. + + Args: + api_requests (dict): generated and 'sent' api requests. + + Returns: + dict: Requests with parsed results. """ api_requests_with_response = {} for node_request_pair, api_requests_attribs in api_requests.items(): diff --git a/src/genet/utils/graph_operations.py b/src/genet/utils/graph_operations.py index 01cdcce9..9516bb15 100644 --- a/src/genet/utils/graph_operations.py +++ b/src/genet/utils/graph_operations.py @@ -1,71 +1,64 @@ import logging from itertools import count, filterfalse -from typing import Callable, Dict, Iterable, Optional, Union +from typing import Callable, Iterable, Iterator, Optional, Union import pandas as pd from anytree import Node, RenderTree import genet.utils.dict_support as dict_support +from genet.core import Network from genet.utils import pandas_helpers as pd_helpers class Filter: - """ - Helps filtering on specified attributes + def __init__( + self, + conditions: Optional[ + Union[ + list, + dict[str, Union[dict, str, int, float, list, Callable[[str, int, float], bool]]], + ] + ] = None, + how: Callable = any, + mixed_dtypes: bool = True, + ): + """Helps filtering on specified attributes. - Parameters - ---------- - :param conditions e.g. 
{'attributes': {'osm:way:osmid': {'text': 12345}}} + Args: + conditions (Union[list, dict]): + {'attribute_key': 'target_value'} or nested {'attribute_key': {'another_key': {'yet_another_key': 'target_value'}}}, + where 'target_value' could be: - Dictionary of (or list of such dictionaries) - key = edge attribute key - value = either another key, if the edge data is nested or the target condition for what the value should be. - That is: - - single value, string, int, float, where the edge_data[key] == value + - single value, string, int, float, where the edge_data[key] == value (if mixed_dtypes==True and in case of set/list edge_data[key], value is in edge_data[key]) - - list or set of single values as above, where edge_data[key] in [value1, value2] + - list or set of single values as above, where edge_data[key] in [value1, value2] (if mixed_dtypes==True and in case of set/list edge_data[key], set(edge_data[key]) & set([value1, value2]) is non-empty) - - for int or float values, two-tuple bound (lower_bound, upper_bound) where - lower_bound <= edge_data[key] <= upper_bound + - for int or float values, two-tuple bound (lower_bound, upper_bound) where + lower_bound <= edge_data[key] <= upper_bound (if mixed_dtypes==True and in case of set/list edge_data[key], at least one item in edge_data[key] satisfies lower_bound <= item <= upper_bound) - - function that returns a boolean given the value e.g. - - def below_exclusive_upper_bound(value): - return value < 100 - + - function that returns a boolean given the value e.g. + ```python + def below_exclusive_upper_bound(value): + return value < 100 + ``` (if mixed_dtypes==True and in case of set/list edge_data[key], at least one item in edge_data[key] returns True after applying function) - :param how : {all, any}, default any + how (Callable, optional): + The level of rigour used to match conditions. Defaults to any. 
+ - all: means all conditions need to be met + - any: means at least one condition needs to be met - The level of rigour used to match conditions + mixed_dtypes (bool, optional): + If True, will consider the intersection of single values or lists of values in queried dictionary keys, e.g. as in simplified networks. + Defaults to True. + """ - * all: means all conditions need to be met - * any: means at least one condition needs to be met - - :param mixed_dtypes: True by default, used if values under dictionary keys queried are single values or lists of - values e.g. as in simplified networks. - """ - - def __init__( - self, - conditions: Optional[ - Union[ - list, - Dict[ - str, - Union[dict, Union[str, int, float], list, Callable[[str, int, float], bool]], - ], - ] - ] = None, - how=any, - mixed_dtypes=True, - ): self.conditions = conditions self.how = how self.mixed_dtypes = mixed_dtypes @@ -124,47 +117,57 @@ def evaluate_condition(self, condition, data_dict): return satisfies -def extract_on_attributes(iterator, conditions: Union[list, dict], how=any, mixed_dtypes=True): - """ - Extracts ids in iterator based on values of attributes attached to the items. Fails silently, - assumes not all items have those attributes. In the case were the attributes stored are - a list or set, like in the case of a simplified network (there will be a mix of objects that are sets and not) - an intersection of values satisfying condition(s) is considered in case of iterable value, if not empty, it is - deemed successful by default. To disable this behaviour set mixed_dtypes to False. 
-    :param iterator: generator, list or set of two-tuples: (id of the item, attributes of the item)
-    :param conditions: {'attribute_key': 'target_value'} or nested
-    {'attribute_key': {'another_key': {'yet_another_key': 'target_value'}}}, where 'target_value' could be
+def extract_on_attributes(
+    iterator: Iterator[tuple[str, dict]],
+    conditions: Union[list, dict],
+    how: Callable = any,
+    mixed_dtypes: bool = True,
+) -> list:
+    """Extracts ids in iterator based on values of attributes attached to the items.
+
+    Fails silently, assumes not all items have those attributes.
+    In the case where the attributes stored are a list or set,
+    like in the case of a simplified network (there will be a mix of objects that are sets and not),
+    an intersection of values satisfying condition(s) is considered in case of iterable value, if not empty, it is deemed successful by default.
+    To disable this behaviour set mixed_dtypes to False.
+
+    Args:
+        iterator (Iterator[tuple[str, dict]]): list or set of two-tuples: (id of the item, attributes of the item)
+        conditions (Union[list, dict]):
+            {'attribute_key': 'target_value'} or nested {'attribute_key': {'another_key': {'yet_another_key': 'target_value'}}},
+            where 'target_value' could be:
             - single value, string, int, float, where the edge_data[key] == value
-            (if mixed_dtypes==True and in case of set/list edge_data[key], value is in edge_data[key])
+              (if mixed_dtypes==True and in case of set/list edge_data[key], value is in edge_data[key])
             - list or set of single values as above, where edge_data[key] in [value1, value2]
-            (if mixed_dtypes==True and in case of set/list edge_data[key],
-            set(edge_data[key]) & set([value1, value2]) is non-empty)
+              (if mixed_dtypes==True and in case of set/list edge_data[key],
+              set(edge_data[key]) & set([value1, value2]) is non-empty)
             - for int or float values, two-tuple bound (lower_bound, upper_bound) where
-            lower_bound <= edge_data[key] <= upper_bound
-            (if mixed_dtypes==True and in case of 
set/list edge_data[key], at least one item in - edge_data[key] satisfies lower_bound <= item <= upper_bound) + lower_bound <= edge_data[key] <= upper_bound + (if mixed_dtypes==True and in case of set/list edge_data[key], at least one item in + edge_data[key] satisfies lower_bound <= item <= upper_bound) - function that returns a boolean given the value e.g. - + ```python def below_exclusive_upper_bound(value): return value < 100 + ``` + (if mixed_dtypes==True and in case of set/list edge_data[key], at least one item in + edge_data[key] returns True after applying function) - (if mixed_dtypes==True and in case of set/list edge_data[key], at least one item in - edge_data[key] returns True after applying function) + how (Callable, optional): + The level of rigour used to match conditions. Defaults to any. + - all: means all conditions need to be met + - any: means at least one condition needs to be met - :param how : {all, any}, default any + mixed_dtypes (bool, optional): + If True, will consider the intersection of single values or lists of values in queried dictionary keys, e.g. as in simplified networks. + Defaults to True. - The level of rigour used to match conditions - - * all: means all conditions need to be met - * any: means at least one condition needs to be met - - :param mixed_dtypes: True by default, used if values under dictionary keys queried are single values or lists of - values e.g. as in simplified networks. 
- :return: list of ids in input iterator satisfying conditions + Returns: + list: list of ids in input iterator satisfying conditions """ filter = Filter(conditions, how, mixed_dtypes) return [_id for _id, attribs in iterator if filter.satisfies_conditions(attribs)] @@ -217,10 +220,14 @@ def render_tree(root, data=False): print("%s%s" % (pre, node.name)) -def parse_leaf(leaf): +def parse_leaf(leaf: Node) -> Union[str, dict]: """ - :param leaf: anytree.node.node.Node - :return: str or dictionary with string key value pairs, for use as keys to extraction methods + + Args: + leaf (Node): Leaf node. + + Returns: + Union[str, dict]: str or dictionary with string key value pairs, for use as keys to extraction methods. """ if leaf.depth > 1: dict_path = {leaf.path[1].name: leaf.path[2].name} @@ -232,14 +239,19 @@ def parse_leaf(leaf): return leaf.name -def get_attribute_data_under_key(iterator: Iterable, key: Union[str, dict]): - """ - Returns all data stored under key in attribute dictionaries for iterators yielding (index, attribute_dictionary), - inherits index from the iterator. - :param iterator: list or iterator yielding (index, attribute_dictionary) - :param key: either a string e.g. 'modes', or if accessing nested information, a dictionary - e.g. {'attributes': 'osm:way:name'} or {'attributes': {'osm:way:name': 'text'}} - :return: dictionary where keys are indices and values are data stored under the key +def get_attribute_data_under_key(iterator: Iterable, key: Union[str, dict]) -> dict: + """Returns all data stored under key in attribute dictionaries for iterators yielding (index, attribute_dictionary). + + Inherits index from the iterator. + + Args: + iterator (Iterable): list or iterator yielding (index, attribute_dictionary) + key (Union[str, dict]): + A string, e.g. 'modes'. + A dictionary, if accessing nested information, e.g. `{'attributes': 'osm:way:name'}` or `{'attributes': {'osm:way:name': 'text'}}`. 
+ + Returns: + dict: dictionary where keys are indices and values are data stored under the key """ def get_the_data(attributes, key): @@ -254,7 +266,7 @@ def get_the_data(attributes, key): if key in attributes: data[_id] = attributes[key] - data = {} + data: dict = {} for _id, _attribs in iterator: get_the_data(_attribs, key) @@ -263,17 +275,21 @@ def get_the_data(attributes, key): def build_attribute_dataframe( - iterator, keys: Union[Iterable, str], index_name: Optional[str] = None -): + iterator: Iterable, keys: Union[list, dict, str], index_name: Optional[str] = None +) -> pd.DataFrame: + """Builds a pandas.DataFrame from data in iterator. + + Args: + iterator (Iterable): iterator or list of tuples (id, dictionary data with keys of interest). + keys (Union[list, dict, str]): + keys to extract data from. + Can be a string, list or dictionary/list of dictionaries if accessing nested dictionaries, for example on using dictionaries see `get_attribute_data_under_key` docstring. + index_name (Optional[str], optional): Name of returned dataframe index. Defaults to None. + + Returns: + pd.DataFrame: Attribute dataframe. """ - Builds a pandas.DataFrame from data in iterator. - :param iterator: iterator or list of tuples (id, dictionary data with keys of interest) - :param keys: keys to extract data from. Can be a string, list or dictionary/list of dictionaries if accessing - nested dictionaries, for example on using dictionaries see `get_attribute_data_under_key` docstring. 
- :param index_name: - :return: - """ - df = None + df = pd.DataFrame() if isinstance(keys, str): keys = [keys] if len(keys) > 1: @@ -289,11 +305,8 @@ def build_attribute_dataframe( col_series = pd.Series(attribute_data, dtype=pd_helpers.get_pandas_dtype(attribute_data)) col_series.name = name - if df is not None: - df = df.merge(pd.DataFrame(col_series), left_index=True, right_index=True, how="outer") - else: - df = pd.DataFrame(col_series) - if index_name: + df = df.merge(pd.DataFrame(col_series), left_index=True, right_index=True, how="outer") + if index_name is not None: df.index = df.index.set_names([index_name]) return df @@ -336,13 +349,20 @@ def apply_function_to_attributes(iterator, function, location): return new_attributes -def consolidate_node_indices(left, right): - """ - Changes the node indexing in right to match left spatially and resolves clashing node ids if they don't match - spatially. The two networks need to be in matching coordinate systems. - :param left: genet.core.Network - :param right: genet.core.Network that needs to be updated to match left network - :return: updated right +def consolidate_node_indices(left: Network, right: Network) -> Network: + """Changes the node indexing in right to match left spatially and resolves clashing node ids if they don't match spatially. + + The two networks need to be in matching coordinate systems. + + Args: + left (Network): GeNet network. + right (Network): GeNet network that needs to be updated to match left network. + + Raises: + RuntimeError: Nodes must be spatially unique. + + Returns: + Network: Updated `right` network. 
""" # find spatially overlapping nodes by extracting all of the s2 spatial ids from right s2_ids_right = right.node_attribute_data_under_key("s2_id") @@ -388,14 +408,17 @@ def consolidate_node_indices(left, right): return right -def consolidate_link_indices(left, right): - """ - Changes the link indexing in right to match left on modes stored on the links and resolves clashing link ids if - they don't match. This method assumes that the node ids of left vs right have already been consolidated (see - the method above with consolidates node ids) - :param left: genet.core.Network - :param right: genet.core.Network that needs to be updated to match left network - :return: updated right +def consolidate_link_indices(left: Network, right: Network) -> Network: + """Changes the link indexing in right to match left on modes stored on the links and resolves clashing link ids if they don't match. + + This method assumes that the node ids of left vs right have already been consolidated (see `consolidate_node_indices`, which consolidates node ids). + + Args: + left (Network): GeNet network. + right (Network): GeNet network that needs to be updated to match left network. + + Returns: + Network: Updated `right` network. """ def sort_and_hash(modes_list): @@ -468,8 +491,8 @@ def append_data_to_unique_clashing_links_data(row): df.groupby(["from", "to"]).apply(get_edges_with_clashing_ids).reset_index(drop=True) ) # store the edge data from right - overlapping_links_data = {} - unique_clashing_links_data = {} + overlapping_links_data: dict = {} + unique_clashing_links_data: dict = {} if not clashing_overlapping_edges.empty: clashing_overlapping_edges[ clashing_overlapping_edges["link_id_right"].notna() @@ -523,15 +546,20 @@ def append_data_to_unique_clashing_links_data(row): return right -def convert_list_of_link_ids_to_network_nodes(network, link_ids: list): - """ - Extracts nodes corresponding to link ids in the order of given link_ids list. Useful for extracting network routes. 
- :param network: - :param link_ids: - :return: +def convert_list_of_link_ids_to_network_nodes(network: Network, link_ids: list) -> list: + """Extracts nodes corresponding to link ids in the order of given link_ids list. + + Useful for extracting network routes. + + Args: + network (Network): GeNet network. + link_ids (list): Link IDs whose nodes are to be extracted. + + Returns: + list: Node IDs. """ - paths = [] - connected_path = [] + paths: list = [] + connected_path: list = [] for link_id in link_ids: x, y = network.link_id_mapping[link_id]["from"], network.link_id_mapping[link_id]["to"] if not connected_path: @@ -545,15 +573,25 @@ def convert_list_of_link_ids_to_network_nodes(network, link_ids: list): return paths -def find_shortest_path_link(link_attribute_dictionary, modes=None): - """ - Finds link that is deemed quickest if freespeed present. Relies on (link) id being stored on edge data (default - if using genet Network's `add_link` or `add_edge` methods or reading data using genet's Network methods.) +def find_shortest_path_link( + link_attribute_dictionary: dict, modes: Optional[Union[list, str]] = None +) -> str: + """Finds link that is deemed quickest if freespeed present. + + Relies on (link) id being stored on edge data (default if using genet Network's `add_link` or `add_edge` methods or reading data using genet's Network methods.) Throws a `RuntimeError` if a link id is not found. - :param link_attribute_dictionary: {multi_index_id: {'length': 10}} - :param modes: optional, if passed and there are more than one possible edge that has the same length and speed, - will also check if there is a link with modes that match exactly with `modes`. - :return: + + Args: + link_attribute_dictionary (dict): Link attribute dictionary, e.g. `{multi_index_id: {'length': 10}}`. 
+ modes (Optional[Union[list, str]], optional): + If passed and there is more than one possible edge that has the same length and speed, will also check if there is a link with modes that match exactly with `modes`. + Defaults to None. + + Raises: + RuntimeError: Link ID must exist. + + Returns: + str: Shortest path link ID """ selected_link = None if len(link_attribute_dictionary) > 1: diff --git a/src/genet/utils/parallel.py b/src/genet/utils/parallel.py index 9ec4222f..fd1c54db 100644 --- a/src/genet/utils/parallel.py +++ b/src/genet/utils/parallel.py @@ -1,15 +1,20 @@ import logging import multiprocessing as mp from math import ceil +from typing import Callable, Iterable, TypeVar +T = TypeVar("T", bound=Iterable) -def split_list(_list, processes=1): - """ - Split type function. Partitions list into list of subsets of _list - :param _list: any list - :param processes: number of processes to split data across, takes precedence over k to split data evenly over - exactly the number of processes being used but is optional - :return: list of lists + +def split_list(_list: list, processes: int = 1) -> list[list]: + """Partitions list into list of subsets of _list. + + Args: + _list (list): Input list + processes (int, optional): Number of processes to split data across. Defaults to 1. + + Returns: + list: List of lists """ k = ceil(len(_list) / processes) if len(_list) <= k: @@ -24,11 +29,14 @@ def split_list(_list, processes=1): return l_partitioned -def combine_list(list_list): - """ - Combine type function. Combines list of lists into a single list - :param list_list: list of lists - :return: single list +def combine_list(list_list: list[list]) -> list: + """Flattens list of lists into a single list. + + Args: + list_list (list[list]): list of lists to flatten. + + Returns: + list: Flattened list. 
    """
     return_list = []
     for res in list_list:
@@ -36,13 +44,15 @@ def combine_list(list_list):
     return return_list
 
 
-def split_dict(_dict, processes=1):
-    """
-    Split type function. Partitions dict into list of subset dicts of _dict
-    :param _dict: any dict
-    :param processes: number of processes to split data across, takes precedence over k to split data evenly over
-    exactly the number of processes being used but is optional
-    :return: list of dicts
+def split_dict(_dict: dict, processes: int = 1) -> list[dict]:
+    """Partitions dict into list of subset dicts of _dict.
+
+    Args:
+        _dict (dict): Input dictionary to split.
+        processes (int, optional): Number of processes to split data across. Defaults to 1.
+
+    Returns:
+        list[dict]: List of dictionaries.
     """
     k = ceil(len(_dict) / processes)
     if len(_dict) <= k:
@@ -53,32 +63,42 @@
     return [{key: _dict[key] for key in keys_bunch} for keys_bunch in keys_partitioned]
 
 
-def combine_dict(list_dict):
-    """
-    Combine type function. Combines list of dicts into a single dict. Assumes keys don't clash
-    :param list_dict: list of lists
-    :return: single list
+def combine_dict(list_dict: list[dict]) -> dict:
+    """Flattens list of dicts into a single dict. Assumes keys don't clash.
+
+    Args:
+        list_dict (list[dict]): list of dicts to flatten.
+
+    Returns:
+        dict: Flattened dict.
     """
-    return_dict = {}
+    return_dict: dict = {}
     for res in list_dict:
         return_dict = {**return_dict, **res}
     return return_dict
 
 
-def multiprocess_wrap(data, split, apply, combine, processes=1, **kwargs):
-    """
-    Split up data into batches using `split` function and process in parallel using `apply(data, kwargs)` function,
-    kwargs is a dictionary of arguments. Results of all parallel processes are consolidated using the given `combine`
-    function. 
- :param data: data the function expects, which should be partitioned by split function to be processed in parallel - :param split: function which partitions `data` into list of bunches of same type as `data` to be processed in - parallel. Include `processes` variable in this function if you want to use this variable for splitting data - :param apply: function that expects `data`, process to be applied to `data` in parallel - :param combine: function which expects a list of the returns of function `apply` and combines it back into - what `apply` would have returned if it had been ran in a single process - :param processes: max number of processes to use for computations - :param kwargs: that need to be passed to the function `apply` which remain constant across all data - :return: output of the combine function +def multiprocess_wrap( + data: T, split: Callable, apply: Callable, combine: Callable, processes: int = 1, **kwargs +) -> T: + """Batch process data using a `split-apply-combine` approach. + + Results of all parallel processes are consolidated using the given `combine` function. + + Args: + data (Iterable): Data the `apply` function expects, which will be partitioned by `split` function if the number of parallel `processes` > 1. + split (Callable): + Function which partitions `data` into list of batches of same type as `data` to be processed in parallel. + `processes` argument must be greater than 1 if you want data to be split. + apply (Callable): Function that expects `data` or a subset of it (if `data` has been split). + combine (Callable): + Function which expects a list of the returns of function `apply` and combines it back into what `apply` would have returned if it had been run in a single process. + processes (int, optional): Max number of processes to use for computations. Defaults to 1. + + Keyword Args: will be passed to the `apply` function. + + Returns: + Iterable: output of (in order of application) `split`, `apply`, then `combine` functions. 
""" if processes == 1: return apply(data, **kwargs) diff --git a/src/genet/utils/plot.py b/src/genet/utils/plot.py index 71a52d5c..66447837 100644 --- a/src/genet/utils/plot.py +++ b/src/genet/utils/plot.py @@ -1,21 +1,26 @@ from copy import deepcopy -from typing import Dict +from typing import Optional, Union import geopandas as gpd import keplergl def plot_geodataframes_on_kepler_map( - gdfs: Dict[str, gpd.GeoDataFrame], height=750, kepler_config=None -): - """ - Plots geodataframes on a kepler map. - :param gdfs: {'gdf name': gdf} dictionary of geodataframes - :param height: optional, height for the kepler map - :param kepler_config: optional, dict kepler config or one of the keys in the predefined configs in KEPLER_CONFIGS - :return: + gdfs: dict[str, gpd.GeoDataFrame], + height: int = 750, + kepler_config: Optional[Union[dict, str]] = None, +) -> keplergl.KeplerGl: + """Plots geodataframes on a kepler map. + + Args: + gdfs (dict[str, gpd.GeoDataFrame]): {'gdf name': gdf} dictionary of geodataframes. + height (int, optional): height for the kepler map. Defaults to 750. + kepler_config (Optional[dict | str], optional): If given, kepler config or one of the keys in the predefined configs in KEPLER_CONFIGS. Defaults to None. 
+ + Returns: + keplergl.KeplerGl: Kepler plot object """ - if kepler_config in KEPLER_CONFIGS: + if isinstance(kepler_config, str) and kepler_config in KEPLER_CONFIGS: kepler_config = KEPLER_CONFIGS[kepler_config] elif isinstance(kepler_config, dict): kepler_config = kepler_config diff --git a/src/genet/utils/secrets_vault.py b/src/genet/utils/secrets_vault.py index 111108e0..73d914db 100644 --- a/src/genet/utils/secrets_vault.py +++ b/src/genet/utils/secrets_vault.py @@ -1,34 +1,49 @@ import json import os +from typing import Optional import boto3 -def get_google_directions_api_key(secret_name: str = None, region_name: str = None): - """ - Extracts google directions api key from environmental variable or secrets manager - :param secret_name: - :param region_name: - :return: +def get_google_directions_api_key( + secret_name: Optional[str] = None, region_name: Optional[str] = None +) -> Optional[str]: + """Extracts google directions api key from environmental variable or secrets manager. + + Args: + secret_name (Optional[str], optional): + If given and API key is not an environment variable, will search for the secret in the AWS secrets manager. + Defaults to None. + region_name (Optional[str], optional): + If given and API key is not an environment variable, will search for the secret in the given AWS region account. + Defaults to None. + + Returns: + Optional[str]: Google API key, if there is one to find. 
""" - key = None - if os.getenv("GOOGLE_DIR_API_KEY"): - key = os.getenv("GOOGLE_DIR_API_KEY") - elif secret_name and region_name: - key = get_secret_as_dict(secret_name, region_name) - if "key" in key: - key = key["key"] - elif "api_key" in key: - key = key["api_key"] + key: Optional[str] = os.getenv("GOOGLE_DIR_API_KEY") + + if key is None and (secret_name is not None and region_name is not None): + key_dict = get_secret_as_dict(secret_name, region_name) + if "key" in key_dict: + key = key_dict["key"] + elif "api_key" in key_dict: + key = key_dict["api_key"] return key -def get_secret(secret_name, region_name): - """ - Extracts api key from aws secrets manager - :param secret_name: - :param region_name: - :return: +def get_secret(secret_name: str, region_name: str) -> str: + """Extracts api key from aws secrets manager. + + Args: + secret_name (str): + Will search for the secret in the AWS secrets manager. + region_name (str): + Will search for the secret in the given AWS region account. + + Returns: + str: JSON response string. 
+ """ client = boto3.client("secretsmanager", region_name=region_name) @@ -46,7 +61,7 @@ def get_secret(secret_name, region_name): return response["SecretBinary"] -def get_secret_as_dict(secret_name, region_name): +def get_secret_as_dict(secret_name: str, region_name: str) -> dict: string_secret = get_secret(secret_name, region_name) if string_secret is not None: return json.loads(string_secret) diff --git a/src/genet/utils/simplification.py b/src/genet/utils/simplification.py index 7a0e7264..59e18ecc 100644 --- a/src/genet/utils/simplification.py +++ b/src/genet/utils/simplification.py @@ -4,6 +4,7 @@ from shapely.geometry import LineString, Point +import genet import genet.utils.parallel as parallel from genet.utils.persistence import setify @@ -116,13 +117,21 @@ def _assemble_path_data(n, indexed_paths_to_simplify): return return_d -def _is_endpoint(node_neighbours): +def _is_endpoint(node_neighbours: dict) -> list[str]: """ - :param node_neighbours: dict {node: { - successors: {set of nodes that you can reach from node}, - predecessors: {set of nodes that lead to node} - }} - :return: + + Args: + node_neighbours (dict): + E.g., + ```python + {node: { + successors: {set of nodes that you can reach from node}, + predecessors: {set of nodes that lead to node} + }} + ``` + + Returns: + list[str]: Nodes that are endpoints. """ return [ node @@ -198,26 +207,22 @@ def _get_edge_groups_to_simplify(G, no_processes=1): ) -def simplify_graph(n, no_processes=1): - """ - MONKEY PATCH OF OSMNX'S GRAPH SIMPLIFICATION ALGO +def simplify_graph(n: "genet.core.Network", no_processes=1): + """Simplify a graph's topology by removing interstitial nodes. - Simplify a graph's topology by removing interstitial nodes. + MONKEY PATCH OF OSMNX'S GRAPH SIMPLIFICATION ALGO - Simplify graph topology by removing all nodes that are not intersections - or dead-ends. 
def decode_polyline_to_s2_points(_polyline: str) -> list[int]:
    """Convert a Google encoded polyline into a sequence of S2 cell IDs.

    Args:
        _polyline (str): Google encoded polyline.

    Returns:
        list[int]: S2 points describing the polyline.
    """
    latlngs = polyline.decode(_polyline)
    return [generate_index_s2(lat, lon) for lat, lon in latlngs]


def encode_shapely_linestring_to_polyline(linestring: LineString) -> str:
    """Encode a Shapely LineString as a Google polyline.

    Args:
        linestring (LineString): Shapely LineString to encode.

    Returns:
        str: Google encoded polyline.
    """
    return polyline.encode(linestring.coords)


def swap_x_y_in_linestring(linestring: LineString) -> LineString:
    """Swap x with y in a shapely linestring.

    E.g. from LineString([(1,2), (3,4)]) to LineString([(2,1), (4,3)]).

    Args:
        linestring (LineString): Input linestring.

    Returns:
        LineString: Input linestring with swapped x and y coordinates.
    """
    swapped = [(p[1], p[0]) for p in linestring.coords]
    return LineString(swapped)


def merge_linestrings(linestring_list: list[LineString]) -> Union[LineString, MultiLineString]:
    """Merge an ordered list of linestrings into a single geometry.

    Args:
        linestring_list (list[LineString]):
            Ordered list of shapely.geometry.LineString objects. Assumed to be
            contiguous; if they are not, the result is a MultiLineString.

    Returns:
        Union[LineString, MultiLineString]: Merged geometry.
    """
    return linemerge(MultiLineString(linestring_list))
+def snap_point_to_line(point: Point, line: LineString, distance_threshold: float = 1e-8) -> Point: + """Snap a point to a line, if over a distance threshold. + Not using 'contains' method due to too high accuracy required to evaluate to True. - :param point: Point to be snapped to line, IF not close enough - :param line: Line to use for the Point to snap to - :param distance_threshold: default 1e-8, acceptable distance of point from line before snapping - :return: + + Args: + point (Point): Point to be potentially snapped to line. + line (LineString): Line to use for the Point to snap to + distance_threshold (float, optional): Acceptable distance of point from line before snapping. Defaults to 1e-8. + + Returns: + Point: Point on line that is closest to input `point` or `point` itself, if it is within `distance_threshold`. """ if line.distance(point) > distance_threshold: point = line.interpolate(line.project(point)) return point -def continue_line_from_two_points(p1, p2) -> LineString: - """ - Builds a line from p1, p2 and another point, ahead, the same distance and direction from p2 as p1 - :param p1: - :param p2: - :return: +def continue_line_from_two_points(p1: Point, p2: Point) -> LineString: + """Builds a line from p1, p2 and another point, ahead, the same distance and direction from p2 as p1. + + Args: + p1 (Point): Start point of line. + p2 (Point): End point of line. + + Returns: + LineString: Line from p1 to p2. """ return LineString([p1, p2, (p2.x + (p2.x - p1.x), p2.y + (p2.y - p1.y))]) -def split_line_at_point(point: Point, line: LineString) -> Tuple[LineString, LineString]: - """ +def split_line_at_point(point: Point, line: LineString) -> tuple[LineString, LineString]: + """Returns a two-tuple of linestring slices of given line, split at the given point. + If the point is not close enough to the line, it will be snapped. - Returns a two-tuple of linestring slices of given line, split at the given point. 
The order in the tuple preserves - the given line. - :param point: point used for dividing the line - :param line: line to divide - :return: if given line from A - B, the output will be (A - point, point - B) - subject to point needing to - snap closer to the line + + The order in the returned tuple preserves the given line. + + Args: + point (Point): point used for dividing the line + line (LineString): line to divide + + Returns: + tuple[LineString, LineString]: + If given line from A - B, the output will be (A - point, point - B) - subject to point needing to snap closer to the line. """ # the point has to be on the line for shapely split # https://shapely.readthedocs.io/en/stable/manual.html#splitting @@ -110,22 +140,30 @@ def split_line_at_point(point: Point, line: LineString) -> Tuple[LineString, Lin return result -def decode_polyline_to_shapely_linestring(_polyline): +def decode_polyline_to_shapely_linestring(_polyline: str) -> LineString: """ - :param _polyline: google encoded polyline - :return: shapely.geometry.LineString + + Args: + _polyline (str): google encoded polyline + + Returns: + LineString: Shapely linestring representation of input polyline. """ decoded = polyline.decode(_polyline) return LineString(decoded) -def compute_average_proximity_to_polyline(poly_1, poly_2): - """ - Computes average distance between points in poly_1 and closest points in poly_2. Works best when poly_1 is less - dense with points than poly_2. - :param poly_1: google encoded polyline - :param poly_2: google encoded polyline - :return: +def compute_average_proximity_to_polyline(poly_1: str, poly_2: str) -> float: + """Computes average distance between points in poly_1 and closest points in poly_2. + + Works best when poly_1 is less dense with points than poly_2. + + Args: + poly_1 (str): google encoded polyline. + poly_2 (str): google encoded polyline + + Returns: + float: Average distance between points in poly_1 and their respective closest points in poly_2. 
""" s2_poly_list_1 = decode_polyline_to_s2_points(poly_1) s2_poly_list_2 = decode_polyline_to_s2_points(poly_2) @@ -156,21 +194,29 @@ def s2_hex_to_cell_union(hex_area): return s2.CellUnion(cell_ids=cell_ids) -def generate_index_s2(lat, lng): - """ - Returns s2.CellId from lat and lon - :param lat - :param lng - :return: +def generate_index_s2(lat: float, lng: float) -> int: + """Returns s2.CellId from lat and lon + + Args: + lat (float): Latitude. + lng (float): Longitude. + + Returns: + int: S2 cell ID. """ return s2.CellId.from_lat_lng(s2.LatLng.from_degrees(lat, lng)).id() -def generate_s2_geometry(points): - """ - Generate ordered list of s2.CellIds - :param points: list of (lat,lng) tuples, list of shapely.geometry.Points or LineString - :return: +def generate_s2_geometry( + points: Union[LineString, list[tuple[float, float]], list[Point]] +) -> list[int]: + """Generate ordered list of s2.CellIds + + Args: + points (Union[LineString, list[tuple[float, float]], list[Point]]): Points to convert to S2 Cell IDs + + Returns: + list[int]: List of S2 Cell IDs """ if isinstance(points, LineString): points = list(points.coords) @@ -201,10 +247,17 @@ def grow_point(x, distance): return x.buffer(distance) -def map_azimuth_to_name(azimuth): +def map_azimuth_to_name(azimuth: float) -> str: """ - assumes -180 =< azimuth =< 180 - degrees from North (0) + + Args: + azimuth (float): degrees from North (0). + + Raises: + NotImplementedError: assumes -180 =< azimuth =< 180. + + Returns: + str: String defining compass direction, e.g. "North Bound". """ azimuth_to_name = { (-22.5, 22.5): "North Bound", @@ -312,12 +365,15 @@ def __init__(self, n=None): if n is not None: self.add_links(n) - def add_links(self, n): - """ - Generates the spatial tree where all links in `n` are nodes and edges exists between nodes if the two links - share to and from (`n`) nodes; i.e. 
the two links are connected at a node - :param n: genet.Network object - :return: + def add_links(self, n: "genet.core.Network"): + """Generates a spatial tree from links in a network. + + Nodes of the spatial tree are generated to represent the links of the network. + Edges of the spatial tree are generated between the network links which share `to` and `from` nodes; + i.e. the two links are connected at a node. + + Args: + n (genet.core.Network): GeNet network. """ self.links = n.to_geodataframe()["links"].to_crs("epsg:4326") self.links = self.links.rename(columns={"id": "link_id"}) @@ -348,11 +404,17 @@ def add_links(self, n): ) ) - def modal_links_geodataframe(self, modes): - """ - Subsets the links geodataframe on modes - :param modes: str or set of str - :return: + def modal_links_geodataframe(self, modes: Union[str, set[str]]) -> gpd.GeoDataFrame: + """Subsets the links geodataframe on modes + + Args: + modes (Union[str, set[str]]): single or set of modes. + + Raises: + EmptySpatialTree: At least one link must include one of the input modes. + + Returns: + gpd.GeoDataFrame: links that include subset of modes. """ if isinstance(modes, str): modes = {modes} @@ -361,25 +423,38 @@ def modal_links_geodataframe(self, modes): raise EmptySpatialTree(f"No links found satisfying modes: {modes}") return _df - def modal_subtree(self, modes): - """ - :param modes: str of set of strings to consider modal subgraph - :return: + def modal_subtree(self, modes: Union[str, set[str]]) -> nx.Graph: + """Create a networkx subgraph from subset of links which match the input modes. + + Args: + modes (Union[str, set[str]]): single or set of modes. + + Returns: + nx.Graph: Subgraph of Self. 
""" + sub_tree = self.__class__() links = gpd.GeoDataFrame(self.modal_links_geodataframe(modes)) sub_tree = self.subgraph(links["link_id"]) sub_tree.links = links return sub_tree - def closest_links(self, gdf_points, distance_radius): - """ - Given a GeoDataFrame `gdf_points` with a`geometry` column with shapely.geometry.Points, + def closest_links( + self, gdf_points: gpd.GeoDataFrame, distance_radius: float + ) -> gpd.GeoDataFrame: + """Finds closest links from a list of points within a given radius. + + Given a GeoDataFrame `gdf_points` with a `geometry` column of shapely.geometry.Points, finds closest links within `distance_radius` from the spatial tree which accept `mode`. + Does not work very close to the poles. - :param gdf_points: GeoDataFrame, uniquely indexed, in crs: EPSG:4326 shapely.geometry.Points (lon,lat) - :param distance_radius: metres - :return: GeoDataFrame + + Args: + gdf_points (gpd.GeoDataFrame): Uniquely indexed, in crs: EPSG:4326 and only containing shapely.geometry.Points (lon,lat). + distance_radius (float): Metres in which to consider possible links. + + Returns: + gpd.GeoDataFrame: Closest links to points. 
""" bdds = gdf_points["geometry"].bounds approx_lat = (bdds["miny"].mean() + bdds["maxy"].mean()) / 2 @@ -406,14 +481,24 @@ def path(self, G, source, target, weight=None): except (nx.NetworkXNoPath, nx.NodeNotFound): pass - def shortest_paths(self, df_pt_edges, from_col="u", to_col="v", weight="length"): + def shortest_paths( + self, + df_pt_edges: pd.DataFrame, + from_col: str = "u", + to_col: str = "v", + weight: str = "length", + ) -> pd.DataFrame: """ - :param df_pt_edges: pandas DataFrame with a `from_col` and `to_col` defining links stored in the graph for - which a path is required - :param from_col: name of the column which gives ID for the source link - :param to_col: name of the column which gives ID for the target link - :param weight: weight for routing, defaults ot length - :return: df_pt_edges with an extra column 'shortest_path' + + Args: + df_pt_edges (pd.DataFrame): + DataFrame with a `from_col` and `to_col` defining links stored in the graph for which a path is required + from_col (str, optional): Name of the column which gives ID for the source link. Defaults to "u". + to_col (str, optional): Name of the column which gives ID for the target link. Defaults to "v". + weight (str, optional): Weight for routing. Defaults to "length". 
+ + Returns: + pd.DataFrame: `df_pt_edges` with an extra column 'shortest_path' """ if df_pt_edges.empty: df_pt_edges["shortest_path"] = None @@ -436,14 +521,24 @@ def path_length(self, G, source, target, weight=None): except nx.NetworkXNoPath: pass - def shortest_path_lengths(self, df_pt_edges, from_col="u", to_col="v", weight="length"): + def shortest_path_lengths( + self, + df_pt_edges: pd.DataFrame, + from_col: str = "u", + to_col: str = "v", + weight: str = "length", + ) -> pd.DataFrame: """ - :param df_pt_edges: pandas DataFrame with a `from_col` and `to_col` defining links stored in the graph for - which a path length is required - :param from_col: name of the column which gives ID for the source link - :param to_col: name of the column which gives ID for the target link - :param weight: weight for routing, defaults ot length - :return: df_pt_edges with an extra column 'shortest_path' + + Args: + df_pt_edges (pd.DataFrame): + DataFrame with a `from_col` and `to_col` defining links stored in the graph for which a path length is required. + from_col (str, optional): Name of the column which gives ID for the source link. Defaults to "u". + to_col (str, optional): Name of the column which gives ID for the target link. Defaults to "v". + weight (str, optional): Weight for routing. Defaults to "length". + + Returns: + pd.DataFrame: `df_pt_edges` with an extra column 'shortest_path' """ if df_pt_edges.empty: df_pt_edges["path_lengths"] = None diff --git a/src/genet/validate/network.py b/src/genet/validate/network.py index 70441f6b..41aa302e 100644 --- a/src/genet/validate/network.py +++ b/src/genet/validate/network.py @@ -29,11 +29,14 @@ def find_connected_subgraphs(G): ] -def describe_graph_connectivity(G): - """ - Computes dead ends and unreachable nodes in G. Computes strongly connected components of G - :param G: - :return: +def describe_graph_connectivity(G: nx.Graph) -> dict: + """Computes dead ends, unreachable nodes, and strongly connected components of G. 
+ + Args: + G (nx.Graph): Network graph. + + Returns: + dict: Summary of problem nodes and strongly connected components of G. """ dict_to_return = {} # find dead ends or unreachable nodes