diff --git a/database/istsos_example_data.sql b/database/istsos_example_data.sql
index 89796ba..32b7198 100644
--- a/database/istsos_example_data.sql
+++ b/database/istsos_example_data.sql
@@ -81,11 +81,15 @@ VALUES ('Room 102', 'Feature of interest for Room 102', 'application/vnd.geo+jso
 -- observation
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 1, 1);
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 1, 1);
 
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 2, 2);
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 2, 2);
+
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 3, 3);
+
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultString", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id")
+VALUES ('2023-03-26 10:30:00-04', '2023-03-26 10:30:00-04', 0, 'Test', NULL, NULL, NULL, 3, 3);
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 3, 3);
diff --git a/database/istsos_schema.sql b/database/istsos_schema.sql
index 87d973b..bd1d188 100644
--- a/database/istsos_schema.sql
+++ b/database/istsos_schema.sql
@@ -13,23 +13,23 @@ CREATE TABLE IF NOT EXISTS sensorthings."Location" (
     "description" TEXT NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "location" geometry(geometry, 4326) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Thing" (
     "id" BIGSERIAL NOT NULL PRIMARY KEY,
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "description" TEXT NOT NULL,
-    "properties" jsonb NOT NULL,
-    "location_id" BIGINT REFERENCES sensorthings."Location" (id)
+    "properties" jsonb,
+    "location_id" BIGINT REFERENCES sensorthings."Location" (id) ON DELETE CASCADE
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."HistoricalLocation" (
     id BIGSERIAL NOT NULL PRIMARY KEY,
     time TIMESTAMPTZ NOT NULL,
-    thing_id BIGINT REFERENCES sensorthings."Thing"(id),
-    location_id BIGINT REFERENCES sensorthings."Location"(id)
+    thing_id BIGINT REFERENCES sensorthings."Thing"(id) ON DELETE CASCADE,
+    location_id BIGINT REFERENCES sensorthings."Location"(id) ON DELETE CASCADE
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."ObservedProperty" (
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."ObservedProperty" (
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "definition" VARCHAR(255) NOT NULL,
     "description" VARCHAR(255) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Sensor" (
@@ -46,7 +46,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."Sensor" (
     "description" VARCHAR(255) NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "metadata" jsonb NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Datastream" (
@@ -59,9 +59,9 @@ CREATE TABLE IF NOT EXISTS sensorthings."Datastream" (
     "phenomenonTime" tstzrange,
     "resultTime" tstzrange,
     "properties" jsonb,
-    "thing_id" BIGINT REFERENCES sensorthings."Thing"(id) NOT NULL,
-    "sensor_id" BIGINT REFERENCES sensorthings."Sensor"(id) NOT NULL,
-    "observedproperty_id" BIGINT REFERENCES sensorthings."ObservedProperty"(id)
+    "thing_id" BIGINT REFERENCES sensorthings."Thing"(id) ON DELETE CASCADE,
+    "sensor_id" BIGINT REFERENCES sensorthings."Sensor"(id) ON DELETE CASCADE,
+    "observedproperty_id" BIGINT REFERENCES sensorthings."ObservedProperty"(id) ON DELETE CASCADE
 );
 
@@ -71,22 +71,37 @@ CREATE TABLE IF NOT EXISTS sensorthings."FeaturesOfInterest" (
     "description" VARCHAR(255) NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "feature" geometry(geometry, 4326) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Observation" (
     "id" BIGSERIAL PRIMARY KEY,
     "phenomenonTime" TIMESTAMPTZ NOT NULL,
     "resultTime" TIMESTAMPTZ NOT NULL,
-    "result" FLOAT NOT NULL,
+    "resultType" INT NOT NULL,
+    "resultString" TEXT,
+    "resultInteger" INT,
+    "resultDouble" DOUBLE PRECISION,
+    "resultBoolean" BOOLEAN,
+    "resultJSON" jsonb,
     "resultQuality" TEXT,
     "validTime" tstzrange DEFAULT NULL,
     "parameters" jsonb,
-    "datastream_id" BIGINT REFERENCES sensorthings."Datastream"(id),
-    "feature_of_interest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id),
+    "datastream_id" BIGINT REFERENCES sensorthings."Datastream"(id) ON DELETE CASCADE,
+    "featuresofinterest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id) ON DELETE CASCADE,
     UNIQUE ("datastream_id", "phenomenonTime")
 );
 
+CREATE OR REPLACE FUNCTION result(sensorthings."Observation") RETURNS text AS $$
+    SELECT CASE WHEN $1."resultType" = 0 THEN $1."resultString"
+                WHEN $1."resultType" = 1 THEN $1."resultInteger"::text
+                WHEN $1."resultType" = 2 THEN $1."resultDouble"::text
+                WHEN $1."resultType" = 3 THEN $1."resultBoolean"::text
+                WHEN $1."resultType" = 4 THEN $1."resultJSON"::text
+                ELSE NULL
+    END;
+$$ LANGUAGE SQL;
+
 CREATE OR REPLACE FUNCTION "@iot.id"(anyelement) RETURNS text AS $$
     SELECT $1.id;
 $$ LANGUAGE SQL;
diff --git a/database/version_observation.sql b/database/version_observation.sql
index 354cec3..69ce1e7 100644
--- a/database/version_observation.sql
+++ b/database/version_observation.sql
@@ -10,7 +10,7 @@ insert
     "validTime",
     "parameters",
     "datastream_id",
-    "feature_of_interest_id"
+    "featuresofinterest_id"
 )
 values (
     now(),
@@ -33,7 +33,7 @@ set
     --"validTime" = null,
     --parameters = null,
     --datastream_id = 1,
-    feature_of_interest_id = 1
+    featuresofinterest_id = 1
     --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange
 where id = 2;
@@ -48,7 +48,7 @@ set
     --"validTime" = null,
     --parameters = null,
     --datastream_id = 1,
-    feature_of_interest_id = 1
+    featuresofinterest_id = 1
     --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange
 where id = 2;
@@ -63,7 +63,7 @@ set
     --"validTime" = null,
     --parameters = null,
     --datastream_id = 1,
-    feature_of_interest_id = 1
+    featuresofinterest_id = 1
     --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange
 where id = 2;
diff --git a/docker-compose.yml b/docker-compose.yml
index bd944ba..85fa17a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -35,24 +35,6 @@ services:
     depends_on:
       - database
 
-  # POSTGRESQL
-  # database_test:
-  #   build:
-  #     context: ./database_test
-  #     dockerfile: Dockerfile
-  #   restart: on-failure
-  #   ports:
-  #     - "55432:5432"
-  #   environment:
-  #     POSTGRES_DB: istsos_test
-  #     POSTGRES_USER: admin
-  #     POSTGRES_PASSWORD: admin
-  #     DATADIR: /var/lib/postgresql/data
-  #     POSTGRES_MULTIPLE_EXTENSIONS: postgis,hstore,postgis_topology,postgis_raster,pgrouting,ltree,pg_cron,uuid-ossp
-  #   volumes:
-  #     # - v-istsos-miu-postgis-sql-test:/docker-entrypoint-initdb.d
-  #     - v-istsos-miu-database-data-test:/var/lib/postgresql/data
-
   api:
     build:
       context: ./fastapi
@@ -63,13 +45,13 @@ services:
     ports:
       - 8018:5000
     command: uvicorn --reload --workers 1 --host 0.0.0.0 --port 5000 app.main:app
+    depends_on:
+      - postgrest
 
 volumes:
   v-istsos-miu-postgis-sql:
     name: v-istsos-miu-postgis-sql
   v-istsos-miu-database-data:
     name: v-istsos-miu-database-data
-  # v-istsos-miu-postgis-sql-test:
-  #   name: v-istsos-miu-postgis-sql-test
   v-istsos-miu-database-data-test:
     name: v-istsos-miu-database-data-test
diff --git a/fastapi/app/models/observation.py b/fastapi/app/models/observation.py
index 03319b7..066aef1 100644
--- a/fastapi/app/models/observation.py
+++ b/fastapi/app/models/observation.py
@@ -14,7 +14,7 @@ class Observation(BaseModel):
     validTime: str | None = None
     parameters: Json | None = None
     datastream_id: int
-    feature_of_interest_id: int
+    featuresofinterest_id: int
 
 class put_Observation(BaseModel):
     id: int | None = None
@@ -25,4 +25,4 @@ class put_Observation(BaseModel):
     validTime: str | None = None
     parameters: Json | None = None
     datastream_id: int | None = None
-    feature_of_interest_id: int | None = None
\ No newline at end of file
+    featuresofinterest_id: int | None = None
\ No newline at end of file
diff --git a/fastapi/app/settings.py b/fastapi/app/settings.py
new file mode 100644
index 0000000..c31533c
--- /dev/null
+++ b/fastapi/app/settings.py
@@ -0,0 +1,30 @@
+tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"]
+serverSettings = {
+    "conformance": [
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/thing",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/location",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/historical-location",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/datastream",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/sensor",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/observed-property",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/observation",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/feature-of-interest",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/entity-control-information",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/resource-path",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/order",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/expand",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/select",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/orderby",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/skip",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/top",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/filter",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/built-in-filter-operations",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/create-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/link-to-existing-entities",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/deep-insert",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/deep-insert-status-code",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/update-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/delete-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/update-entity-jsonpatch",
+    ],
+}
diff --git a/fastapi/app/sta2pgrest.py b/fastapi/app/sta2pgrest.py
deleted file mode 100644
index 82cb23d..0000000
--- a/fastapi/app/sta2pgrest.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# This tools convert STA requests to postgREST requests.
-
-def stqa2pgr(sta_obj,sta_par):
-    pgr_obj = {
-        'select': None,
-        'limit': None,
-        'properties': None,
-        'order': None
-    }
-
-    if sta_par.select:
-        for obj in select:
-            pgr_obj['select'][obj]: None
-    if sta_par.top:
-        pgr_obj['limit'] = sta_par.top
-
-    if sta_par.filter:
-
-
-    sta_par = {
-        self.expand = expand,
-        self.select = select,
-        self.orderby = orderby,
-        self.top = top,
-        self.skip = skip,
-        self.count = count,
-        self.filter = filter,
-        self.resultFormat = resultFormat
-        self.asofsystime = as_of_system_time
-    }
diff --git a/fastapi/app/sta2rest/sta2rest.py b/fastapi/app/sta2rest/sta2rest.py
index 4d14f3c..6bc6821 100644
--- a/fastapi/app/sta2rest/sta2rest.py
+++ b/fastapi/app/sta2rest/sta2rest.py
@@ -7,13 +7,13 @@ representations in a REST API.
 """
 
 import re
-import sta_parser.ast as ast
 from .filter_visitor import FilterVisitor
 from odata_query.grammar import ODataLexer
 from odata_query.grammar import ODataParser
-from sta_parser.lexer import Lexer
-from sta_parser.visitor import Visitor
-from sta_parser.parser import Parser
+from .sta_parser.ast import *
+from .sta_parser.lexer import Lexer
+from .sta_parser.visitor import Visitor
+from .sta_parser.parser import Parser
 
 # Create the OData lexer and parser
 odata_filter_lexer = ODataLexer()
@@ -34,7 +34,7 @@ def __init__(self, main_entity=None):
 
         This class provides a visitor to convert a STA query to a PostgREST query.
     """
-    def visit_IdentifierNode(self, node: ast.IdentifierNode):
+    def visit_IdentifierNode(self, node: IdentifierNode):
         """
         Visit an identifier node.
 
@@ -54,7 +54,7 @@ def visit_IdentifierNode(self, node: IdentifierNode):
 
         return node.name
 
-    def visit_SelectNode(self, node: ast.SelectNode):
+    def visit_SelectNode(self, node: SelectNode):
         """
         Visit a select node.
 
@@ -68,7 +68,7 @@ def visit_SelectNode(self, node: SelectNode):
         identifiers = ','.join([self.visit(identifier) for identifier in node.identifiers])
         return f'select={identifiers}'
 
-    def visit_FilterNode(self, node: ast.FilterNode):
+    def visit_FilterNode(self, node: FilterNode):
         """
         Visit a filter node.
 
@@ -88,7 +88,7 @@ def visit_FilterNode(self, node: FilterNode):
         res = FilterVisitor().visit(ast)
         return res
 
-    def visit_OrderByNodeIdentifier(self, node: ast.OrderByNodeIdentifier):
+    def visit_OrderByNodeIdentifier(self, node: OrderByNodeIdentifier):
         """
         Visit an orderby node identifier.
 
@@ -102,7 +102,7 @@ def visit_OrderByNodeIdentifier(self, node: OrderByNodeIdentifier):
         """
         # Convert the identifier to the format name.order
         return f'{node.identifier}.{node.order}'
 
-    def visit_OrderByNode(self, node: ast.OrderByNode):
+    def visit_OrderByNode(self, node: OrderByNode):
         """
         Visit an orderby node.
 
@@ -115,7 +115,7 @@ def visit_OrderByNode(self, node: OrderByNode):
         identifiers = ','.join([self.visit(identifier) for identifier in node.identifiers])
         return f'order={identifiers}'
 
-    def visit_SkipNode(self, node: ast.SkipNode):
+    def visit_SkipNode(self, node: SkipNode):
         """
         Visit a skip node.
 
@@ -127,7 +127,7 @@ def visit_SkipNode(self, node: SkipNode):
         """
         return f'offset={node.count}'
 
-    def visit_TopNode(self, node: ast.TopNode):
+    def visit_TopNode(self, node: TopNode):
         """
         Visit a top node.
 
@@ -139,7 +139,7 @@ def visit_TopNode(self, node: TopNode):
         """
         return f'limit={node.count}'
 
-    def visit_CountNode(self, node: ast.CountNode):
+    def visit_CountNode(self, node: CountNode):
         """
         Visit a count node.
 
@@ -151,7 +151,7 @@ def visit_CountNode(self, node: CountNode):
         """
         return f'count={node.value}'
 
-    def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
+    def visit_ExpandNode(self, node: ExpandNode, parent=None):
         """
         Visit an expand node.
 
@@ -188,9 +188,9 @@ def visit_ExpandNode(self, node: ExpandNode, parent=None):
             # check if we have a select, filter, orderby, skip, top or count in the subquery
             if expand_identifier.subquery.select:
                 if not select:
-                    select = ast.SelectNode([])
+                    select = SelectNode([])
                 identifiers = ','.join([self.visit(identifier) for identifier in expand_identifier.subquery.select.identifiers])
-                select.identifiers.append(ast.IdentifierNode(f'{expand_identifier.identifier}({identifiers})'))
+                select.identifiers.append(IdentifierNode(f'{expand_identifier.identifier}({identifiers})'))
             if expand_identifier.subquery.filter:
                 result = self.visit_FilterNode(expand_identifier.subquery.filter)
                 filter = prefix + result
@@ -210,7 +210,7 @@ def visit_ExpandNode(self, node: ExpandNode, parent=None):
                     # merge the results
                     if result['select']:
                         if not select:
-                            select = ast.SelectNode([])
+                            select = SelectNode([])
                         select.identifiers.extend(result['select'].identifiers)
                     if result['orderby']:
                         if orderby:
@@ -236,11 +236,11 @@ def visit_ExpandNode(self, node: ExpandNode, parent=None):
             # If we don't have a subquery, we add the identifier to the select node
             if not expand_identifier.subquery or not expand_identifier.subquery.select:
                 if not select:
-                    select = ast.SelectNode([])
+                    select = SelectNode([])
                 default_columns = STA2REST.get_default_column_names(expand_identifier.identifier)
                 # join default columns as single string
                 default_columns = ','.join(default_columns)
-                select.identifiers.append(ast.IdentifierNode(f'{expand_identifier.identifier}({default_columns})'))
+                select.identifiers.append(IdentifierNode(f'{expand_identifier.identifier}({default_columns})'))
 
         # Return the converted expand node
         return {
@@ -252,7 +252,7 @@ def visit_ExpandNode(self, node: ExpandNode, parent=None):
             'count': count
         }
 
-    def visit_QueryNode(self, node: ast.QueryNode):
+    def visit_QueryNode(self, node: QueryNode):
         """
         Visit a query node.
 
@@ -274,7 +274,7 @@ def visit_QueryNode(self, node: QueryNode):
             # Merge the results with the other parts of the query
             if result['select']:
                 if not node.select:
-                    node.select = ast.SelectNode([])
+                    node.select = SelectNode([])
                 node.select.identifiers.extend(result['select'].identifiers)
             if result['orderby']:
                 query_parts.append(result['orderby'])
@@ -288,13 +288,13 @@ def visit_QueryNode(self, node: QueryNode):
             query_parts.append(result['filter'])
 
         if not node.select:
-            node.select = ast.SelectNode([])
+            node.select = SelectNode([])
 
         # Add "@iot.id", "@iot.selfLink" and "*" to the select node
         # get default columns for main entity
         default_columns = STA2REST.get_default_column_names(self.main_entity)
         for column in default_columns:
-            node.select.identifiers.append(ast.IdentifierNode(column))
+            node.select.identifiers.append(IdentifierNode(column))
 
         # Check if we have a select, filter, orderby, skip, top or count in the query
         if node.select:
@@ -482,7 +482,7 @@ def convert_query(full_path: str) -> str:
 
     # Check if we have a query
-    query_ast = ast.QueryNode(None, None, None, None, None, None, None, False)
+    query_ast = QueryNode(None, None, None, None, None, None, None, False)
     if query:
         lexer = Lexer(query)
         tokens = lexer.tokenize()
@@ -506,17 +506,17 @@ def convert_query(full_path: str) -> str:
 
     if entities:
         if not query_ast.expand:
-            query_ast.expand = ast.ExpandNode([])
+            query_ast.expand = ExpandNode([])
 
         index = 0
 
         # Merge the entities with the query
         for entity in entities:
             entity_name = entity[0]
-            sub_query = ast.QueryNode(None, None, None, None, None, None, None, True)
+            sub_query = QueryNode(None, None, None, None, None, None, None, True)
             if entity[1]:
                 single_result = True
-                sub_query.filter = ast.FilterNode(f"id eq {entity[1]}")
+                sub_query.filter = FilterNode(f"id eq {entity[1]}")
 
             # Check if we are the last entity
             if index == len(entities) - 1:
@@ -524,8 +524,8 @@ def convert_query(full_path: str) -> str:
                 if uri['property_name']:
                     # Add the property name to the select node
                     if not sub_query.select:
-                        sub_query.select = ast.SelectNode([])
-                    sub_query.select.identifiers.append(ast.IdentifierNode(uri['property_name']))
+                        sub_query.select = SelectNode([])
+                    sub_query.select.identifiers.append(IdentifierNode(uri['property_name']))
 
             # Merge the query with the subquery
             if query_ast.select:
@@ -552,21 +552,29 @@ def convert_query(full_path: str) -> str:
                 sub_query.count = query_ast.count
                 query_ast.count = None
 
-            query_ast.expand.identifiers.append(ast.ExpandNodeIdentifier(entity_name, sub_query))
+            query_ast.expand.identifiers.append(ExpandNodeIdentifier(entity_name, sub_query))
             index += 1
     else:
         if uri['property_name']:
             if not query_ast.select:
-                query_ast.select = ast.SelectNode([])
-            query_ast.select.identifiers.append(ast.IdentifierNode(uri['property_name']))
+                query_ast.select = SelectNode([])
+            query_ast.select.identifiers.append(IdentifierNode(uri['property_name']))
 
     # Check if we have a filter in the query
     if main_entity_id:
-        query_ast.filter = ast.FilterNode(query_ast.filter.filter + f" and id eq {main_entity_id}" if query_ast.filter else f"id eq {main_entity_id}")
+        query_ast.filter = FilterNode(query_ast.filter.filter + f" and id eq {main_entity_id}" if query_ast.filter else f"id eq {main_entity_id}")
 
         if not entities:
             single_result = True
 
+    # Check if query has an expand but not a select and does not have sub entities
+    if query_ast.expand and not query_ast.select and not entities:
+        # Add default columns to the select node
+        default_columns = STA2REST.get_default_column_names(main_entity)
+        query_ast.select = SelectNode([])
+        for column in default_columns:
+            query_ast.select.identifiers.append(IdentifierNode(column))
+
     # Visit the query ast to convert it
     visitor = NodeVisitor(main_entity)
     query_converted = visitor.visit(query_ast)
diff --git a/fastapi/app/sta2rest/sta_parser/__init__.py b/fastapi/app/sta2rest/sta_parser/__init__.py
new file mode 100644
index 0000000..e69de29
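With the parser vendored under app/sta2rest/sta_parser, the conversion entry point is self-contained and can be exercised directly. A rough usage sketch; the exact PostgREST string depends on the entity and column mapping tables, so the output comments are indicative only:

    from app.sta2rest.sta2rest import STA2REST

    result = STA2REST.convert_query("/v1.1/Things(1)?$select=name")
    print(result["url"])            # a PostgREST path along the lines of /Thing?select=...&id=eq.1
    print(result["single_result"])  # True when the request addresses a single entity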
diff --git a/fastapi/app/sta2rest/sta_parser/ast.py b/fastapi/app/sta2rest/sta_parser/ast.py
new file mode 100644
index 0000000..4300cb9
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/ast.py
@@ -0,0 +1,292 @@
+"""
+AST for the SensorThings API.
+
+Author: Filippo Finke
+"""
+
+class PrettyPrinter(object):
+    """
+    A class used to pretty print objects.
+
+    Methods:
+        __str__(): Returns a string representation of the object.
+    """
+
+    def __str__(self):
+        """
+        Returns a string representation of the object.
+
+        Returns:
+            str: The string representation of the object.
+        """
+        lines = []
+        for key, val in vars(self).items():
+            if val is None:
+                continue
+
+            if isinstance(val, list):
+                lines += ['\n {}:'.format(key)]
+                for item in val:
+                    lines += [' {}'.format(line) for line in str(item).split('\n')]
+            else:
+                lines += '{}: {}'.format(key, val).split('\n')
+        return '\n'.join(lines)
+
+
+class Node(PrettyPrinter):
+    """
+    A base class for nodes in the AST.
+
+    Inherits from PrettyPrinter.
+
+    Attributes:
+        None
+    """
+
+
+class IdentifierNode(Node):
+    """
+    A class representing an identifier node.
+
+    Inherits from Node.
+
+    Attributes:
+        name (str): The name of the identifier.
+    """
+
+    def __init__(self, name):
+        """
+        Initializes an IdentifierNode object.
+
+        Args:
+            name (str): The name of the identifier.
+        """
+        self.name = name
+
+
+class SelectNode(Node):
+    """
+    A class representing a select node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes a SelectNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class FilterNode(Node):
+    """
+    A class representing a filter node.
+
+    Inherits from Node.
+
+    Attributes:
+        filter (str): The filter string.
+    """
+
+    def __init__(self, filter):
+        """
+        Initializes a FilterNode object.
+
+        Args:
+            filter (str): The filter string.
+        """
+        self.filter = filter
+
+
+class ExpandNodeIdentifier(Node):
+    """
+    A class representing an expand node with an identifier.
+
+    Inherits from Node.
+
+    Attributes:
+        identifier (str): The identifier string.
+        subquery (QueryNode): The subquery associated with the expand node.
+    """
+
+    def __init__(self, identifier, subquery=None):
+        """
+        Initializes an ExpandNodeIdentifier object.
+
+        Args:
+            identifier (str): The identifier string.
+            subquery (QueryNode, optional): The subquery associated with the expand node.
+        """
+        self.identifier = identifier
+        self.subquery = subquery
+
+
+class ExpandNode(Node):
+    """
+    A class representing an expand node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes an ExpandNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class OrderByNodeIdentifier(Node):
+    """
+    A class representing an order by node with an identifier.
+
+    Inherits from Node.
+
+    Attributes:
+        identifier (str): The identifier string.
+        order (str): The order string.
+    """
+
+    def __init__(self, identifier, order):
+        """
+        Initializes an OrderByNodeIdentifier object.
+
+        Args:
+            identifier (str): The identifier string.
+            order (str): The order string.
+        """
+        self.identifier = identifier
+        self.order = order
+
+
+class OrderByNode(Node):
+    """
+    A class representing an order by node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes an OrderByNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class SkipNode(Node):
+    """
+    A class representing a skip node.
+
+    Inherits from Node.
+
+    Attributes:
+        count (int): The count value.
+    """
+
+    def __init__(self, count):
+        """
+        Initializes a SkipNode object.
+
+        Args:
+            count (int): The count value.
+        """
+        self.count = count
+
+
+class TopNode(Node):
+    """
+    A class representing a top node.
+
+    Inherits from Node.
+
+    Attributes:
+        count (int): The count value.
+    """
+
+    def __init__(self, count):
+        """
+        Initializes a TopNode object.
+
+        Args:
+            count (int): The count value.
+        """
+        self.count = count
+
+
+class CountNode(Node):
+    """
+    A class representing a count node.
+
+    Inherits from Node.
+
+    Attributes:
+        value (str): The value string.
+    """
+
+    def __init__(self, value):
+        """
+        Initializes a CountNode object.
+
+        Args:
+            value (str): The value string.
+        """
+        self.value = value
+
+
+class QueryNode(Node):
+    """
+    A class representing a query node.
+
+    Inherits from Node.
+
+    Attributes:
+        select (SelectNode, optional): The select node.
+        filter (FilterNode, optional): The filter node.
+        expand (ExpandNode, optional): The expand node.
+        orderby (OrderByNode, optional): The order by node.
+        skip (SkipNode, optional): The skip node.
+        top (TopNode, optional): The top node.
+        count (CountNode, optional): The count node.
+        is_subquery (bool): Indicates if the query is a subquery.
+    """
+
+    def __init__(self, select=None, filter=None, expand=None, orderby=None, skip=None, top=None, count=None,
+                 is_subquery=False):
+        """
+        Initializes a QueryNode object.
+
+        Args:
+            select (SelectNode, optional): The select node.
+            filter (FilterNode, optional): The filter node.
+            expand (ExpandNode, optional): The expand node.
+            orderby (OrderByNode, optional): The order by node.
+            skip (SkipNode, optional): The skip node.
+            top (TopNode, optional): The top node.
+            count (CountNode, optional): The count node.
+            is_subquery (bool): Indicates if the query is a subquery.
+        """
+        self.select = select
+        self.filter = filter
+        self.expand = expand
+        self.orderby = orderby
+        self.skip = skip
+        self.top = top
+        self.count = count
+        self.is_subquery = is_subquery
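The node classes are plain containers, so a query tree can also be assembled by hand, and PrettyPrinter makes the result inspectable; a small sketch:

    from app.sta2rest.sta_parser.ast import IdentifierNode, QueryNode, SelectNode, TopNode

    # Equivalent of $select=id,name&$top=10, built manually
    query = QueryNode(
        select=SelectNode([IdentifierNode("id"), IdentifierNode("name")]),
        top=TopNode(10),
    )
    print(query)  # PrettyPrinter.__str__ renders only the populated fields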
diff --git a/fastapi/app/sta2rest/sta_parser/lexer.py b/fastapi/app/sta2rest/sta_parser/lexer.py
new file mode 100644
index 0000000..06e86ef
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/lexer.py
@@ -0,0 +1,118 @@
+"""
+Lexer for the SensorThings API.
+
+Author: Filippo Finke
+"""
+
+import re
+import urllib.parse
+
+
+# Define the token types
+TOKEN_TYPES = {
+    'COUNT': r'\$count=',
+    'TOP': r'\$top=',
+    'SKIP': r'\$skip=',
+    'SELECT': r'\$select=',
+    'FILTER': r'\$filter=',
+    'EXPAND': r'\$expand=',
+    'ORDERBY': r'\$orderby=',
+    'SUBQUERY_SEPARATOR': r';',
+    'VALUE_SEPARATOR': r',',
+    'OPTIONS_SEPARATOR': r'&',
+    'EQUALS': r'\beq\b',
+    'AND': r'\band\b',
+    'OR': r'\bor\b',
+    'ORDER': r'\basc\b|\bdesc\b',
+    'BOOL': r'\btrue\b|\bfalse\b',
+    'IDENTIFIER': r'@{0,1}[a-zA-Z_][.a-zA-Z0-9_/]*',
+    'FLOAT': r'[0-9]+\.[0-9]+',
+    'INTEGER': r'[0-9]+',
+    'STRING': r"'[^']*'",
+    'LEFT_PAREN': r'\(',
+    'RIGHT_PAREN': r'\)',
+    'WHITESPACE': r'\s+',
+}
+
+
+class Token:
+    """A class representing a token."""
+
+    def __init__(self, type, value):
+        """
+        Initialize a new Token object.
+
+        Args:
+            type (str): The type of the token.
+            value (str): The value of the token.
+        """
+        self.type = type
+        self.value = value
+
+    def __str__(self):
+        """
+        Return a string representation of the token.
+
+        Returns:
+            str: The string representation of the token.
+        """
+        return f'Token({self.type}, {self.value})'
+
+
+class Lexer:
+    """A class for tokenizing SensorThings API queries."""
+
+    def __init__(self, text):
+        """
+        Initialize a new Lexer object.
+
+        Args:
+            text (str): The input text to be tokenized.
+        """
+        self.text = urllib.parse.unquote_plus(text)
+        self.tokens = self.tokenize()
+
+    def tokenize(self):
+        """
+        Tokenize the input text.
+
+        Returns:
+            list: A list of Token objects representing the tokens.
+        """
+        tokens = []
+        position = 0
+
+        while position < len(self.text):
+            match = None
+
+            for token_type, pattern in TOKEN_TYPES.items():
+                regex = re.compile(pattern)
+                match = regex.match(self.text, position)
+
+                if match:
+                    value = match.group(0)
+                    token = Token(token_type, value)
+                    tokens.append(token)
+                    position = match.end(0)
+                    break
+
+            if not match:
+                raise Exception(f'Invalid character at position {position}: {self.text[position]}')
+
+        return tokens
+
+    def __str__(self):
+        """
+        Return a string representation of the lexer.
+
+        Returns:
+            str: The string representation of the lexer.
+        """
+        return '\n'.join(str(token) for token in self.tokens)
+
+
+# Example usage
+if __name__ == '__main__':
+    text = '''$expand=Locations,Datastreams($select=id,name,unitOfMeasurement;$expand=ObservedProperty($select=name),Observations($select=result,phenomenonTime;$orderby=phenomenonTime desc;$top=1))'''
+    lexer = Lexer(text)
+    print(lexer)
diff --git a/fastapi/app/sta2rest/sta_parser/parser.py b/fastapi/app/sta2rest/sta_parser/parser.py
new file mode 100644
index 0000000..b2ef182
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/parser.py
@@ -0,0 +1,308 @@
+"""
+Parser for the SensorThings API query language.
+
+Author: Filippo Finke
+"""
+
+from .lexer import Lexer
+from . import ast
+
+class Parser:
+    def __init__(self, tokens):
+        """
+        Initialize the Parser instance.
+
+        Args:
+            tokens (list): List of tokens generated by the lexer.
+        """
+        self.tokens = tokens
+        self.current_token = None
+        self.next_token()
+
+    def next_token(self):
+        """
+        Get the next token from the list of tokens.
+
+        If there are no more tokens, set the current_token to None.
+        """
+        if self.tokens:
+            self.current_token = self.tokens.pop(0)
+        else:
+            self.current_token = None
+
+    def match(self, token_type):
+        """
+        Match the current token with the specified token type.
+
+        If the current token matches, move to the next token.
+        If the current token doesn't match, raise an exception.
+
+        Args:
+            token_type (str): The expected token type.
+
+        Raises:
+            Exception: If the current token doesn't match the expected type.
+        """
+        if self.current_token and self.current_token.type == token_type:
+            self.next_token()
+        else:
+            raise Exception(f"Expected '{token_type}', but found '{self.current_token.type}' ('{self.current_token.value}')")
+
+    def check_token(self, token_type):
+        """
+        Check if the current token matches the specified token type.
+
+        Args:
+            token_type (str): The token type to check.
+
+        Returns:
+            bool: True if the current token matches the token type, False otherwise.
+        """
+        return self.current_token and self.current_token.type == token_type
+
+    def parse_identifier_list(self):
+        """
+        Parse a list of identifiers.
+
+        Returns:
+            list: A list of ast.IdentifierNode objects representing the identifiers.
+        """
+        identifiers = []
+        identifiers.append(ast.IdentifierNode(self.current_token.value))
+        self.match('IDENTIFIER')
+        while self.check_token('VALUE_SEPARATOR'):
+            self.match('VALUE_SEPARATOR')
+            identifiers.append(ast.IdentifierNode(self.current_token.value))
+            self.match('IDENTIFIER')
+        return identifiers
+
+    def parse_filter(self, is_in_subquery=False):
+        """
+        Parse a filter expression.
+
+        Args:
+            is_in_subquery (bool, optional): Whether the filter is in a subquery. Defaults to False.
+
+        Returns:
+            ast.FilterNode: The parsed filter expression.
+        """
+        self.match('FILTER')
+        filter = ""
+
+        while not self.check_token('OPTIONS_SEPARATOR') and not self.check_token('SUBQUERY_SEPARATOR') and self.current_token != None and not (is_in_subquery and self.check_token('RIGHT_PAREN')):
+            filter += self.current_token.value
+            self.next_token()
+
+        return ast.FilterNode(filter)
+
+    def parse_expand(self):
+        """
+        Parse an expand expression.
+
+        Returns:
+            ast.ExpandNode: The parsed expand expression.
+        """
+        self.match('EXPAND')
+
+        identifiers = []
+        while self.current_token.type != 'OPTIONS_SEPARATOR':
+            identifier = ast.ExpandNodeIdentifier(self.current_token.value)
+            self.match('IDENTIFIER')
+
+            # Check if there is a subquery
+            if self.check_token('LEFT_PAREN'):
+                identifier.subquery = self.parse_subquery()
+
+            identifiers.append(identifier)
+
+            # Check if there is another option
+            if self.check_token('VALUE_SEPARATOR'):
+                self.match('VALUE_SEPARATOR')
+            else:
+                break
+
+        return ast.ExpandNode(identifiers)
+
+    def parse_select(self):
+        """
+        Parse a select expression.
+
+        Returns:
+            ast.SelectNode: The parsed select expression.
+        """
+        self.match('SELECT')
+        identifiers = self.parse_identifier_list()
+        return ast.SelectNode(identifiers)
+
+    def parse_orderby(self):
+        """
+        Parse an orderby expression.
+
+        Returns:
+            ast.OrderByNode: The parsed orderby expression.
+        """
+        self.match('ORDERBY')
+        # match identifiers separated by commas and check if there is a space and order
+        identifiers = []
+        while True:
+            identifier = self.current_token.value
+            self.match('IDENTIFIER')
+            order = 'asc'
+            if self.check_token('WHITESPACE'):
+                self.match('WHITESPACE')
+                order = self.current_token.value
+                self.match('ORDER')
+
+            identifiers.append(ast.OrderByNodeIdentifier(identifier, order))
+
+            if not self.check_token('VALUE_SEPARATOR'):
+                break
+
+            self.match('VALUE_SEPARATOR')
+
+        return ast.OrderByNode(identifiers)
+
+    def parse_skip(self):
+        """
+        Parse a skip expression.
+
+        Returns:
+            ast.SkipNode: The parsed skip expression.
+        """
+        self.match('SKIP')
+        count = int(self.current_token.value)
+        self.match('INTEGER')
+        return ast.SkipNode(count)
+
+    def parse_top(self):
+        """
+        Parse a top expression.
+
+        Returns:
+            ast.TopNode: The parsed top expression.
+
+        Raises:
+            Exception: If an integer value is expected but not found.
+        """
+        self.match('TOP')
+        if self.check_token("INTEGER"):
+            count = int(self.current_token.value)
+            self.match('INTEGER')
+            return ast.TopNode(count)
+        else:
+            raise Exception(f"Expected integer, but found '{self.current_token.type}' ('{self.current_token.value}')")
+
+    def parse_count(self):
+        """
+        Parse a count expression.
+
+        Returns:
+            ast.CountNode: The parsed count expression.
+        """
+        self.match('COUNT')
+        value = self.current_token.value.lower() == 'true'
+        self.match('BOOL')
+        return ast.CountNode(value)
+
+    def parse_subquery(self):
+        """
+        Parse a subquery.
+
+        Returns:
+            ast.QueryNode: The parsed subquery.
+        """
+        self.match('LEFT_PAREN')
+        select = None
+        filter = None
+        expand = None
+        orderby = None
+        skip = None
+        top = None
+        count = None
+
+        # continue parsing until we reach the end of the query
+        while True:
+            if self.current_token.type == 'SELECT':
+                select = self.parse_select()
+            elif self.current_token.type == 'FILTER':
+                filter = self.parse_filter(True)
+            elif self.current_token.type == 'EXPAND':
+                expand = self.parse_expand()
+            elif self.current_token.type == 'ORDERBY':
+                orderby = self.parse_orderby()
+            elif self.current_token.type == 'SKIP':
+                skip = self.parse_skip()
+            elif self.current_token.type == 'TOP':
+                top = self.parse_top()
+            elif self.current_token.type == 'COUNT':
+                count = self.parse_count()
+            else:
+                raise Exception(f"Unexpected token: {self.current_token.type}")
+
+            # check for other options
+            if self.check_token('SUBQUERY_SEPARATOR'):
+                self.match('SUBQUERY_SEPARATOR')
+            else:
+                break
+
+        self.match('RIGHT_PAREN')
+
+        return ast.QueryNode(select, filter, expand, orderby, skip, top, count, True)
+
+    def parse_query(self):
+        """
+        Parse a query.
+
+        Returns:
+            ast.QueryNode: The parsed query.
+        """
+        select = None
+        filter = None
+        expand = None
+        orderby = None
+        skip = None
+        top = None
+        count = None
+
+        # continue parsing until we reach the end of the query
+        while self.current_token != None:
+            if self.current_token.type == 'SELECT':
+                select = self.parse_select()
+            elif self.current_token.type == 'FILTER':
+                filter = self.parse_filter()
+            elif self.current_token.type == 'EXPAND':
+                expand = self.parse_expand()
+            elif self.current_token.type == 'ORDERBY':
+                orderby = self.parse_orderby()
+            elif self.current_token.type == 'SKIP':
+                skip = self.parse_skip()
+            elif self.current_token.type == 'TOP':
+                top = self.parse_top()
+            elif self.current_token.type == 'COUNT':
+                count = self.parse_count()
+            else:
+                raise Exception(f"Unexpected token: {self.current_token.type}")
+
+            if self.current_token != None:
+                self.match('OPTIONS_SEPARATOR')
+
+        return ast.QueryNode(select, filter, expand, orderby, skip, top, count)
+
+    def parse(self):
+        """
+        Parse the query.
+
+        Returns:
+            ast.QueryNode: The parsed query.
+        """
+        return self.parse_query()
+
+# Example usage
+if __name__ == '__main__':
+    text = '''$select=id,name,description,properties&$top=1000&$filter=properties/type eq 'station'&$expand=Locations,Datastreams($select=id,name,unitOfMeasurement;$expand=ObservedProperty($select=name),Observations($select=result,phenomenonTime;$orderby=phenomenonTime desc;$top=1))'''
+    lexer = Lexer(text)
+    tokens = lexer.tokens
+
+    parser = Parser(tokens)
+    ast = parser.parse()
+    print(ast)
diff --git a/fastapi/app/sta2rest/sta_parser/visitor.py b/fastapi/app/sta2rest/sta_parser/visitor.py
new file mode 100644
index 0000000..1259230
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/visitor.py
@@ -0,0 +1,69 @@
+"""
+Visitor for the SensorThings API.
+
+Author: Filippo Finke
+"""
+# Import the local AST module (not the stdlib "ast"): generic_visit checks
+# isinstance(..., ast.Node), which only the sta_parser AST defines.
+from . import ast
+
+class Visitor:
+    """
+    Visitor class for traversing the SensorThings API abstract syntax tree (AST).
+
+    This class provides a visit method that can be used to traverse the AST.
+    It dynamically calls visit methods based on the type of the node being visited.
+    If no specific visit method is available for a particular node type, it falls back
+    to the generic_visit method.
+
+    Attributes:
+        None
+
+    Methods:
+        visit(node): Traverse the AST by visiting each node in a depth-first manner.
+        generic_visit(node): Default visit method called when no specific visit method is available for a node type.
+    """
+    def visit(self, node):
+        """
+        Traverse the AST by visiting each node in a depth-first manner.
+
+        This method dynamically calls visit methods based on the type of the node being visited.
+
+        Args:
+            node: The current node being visited.
+
+        Returns:
+            The result of visiting the node.
+
+        Raises:
+            None
+        """
+        method_name = 'visit_' + type(node).__name__
+        visitor = getattr(self, method_name, self.generic_visit)
+        return visitor(node)
+
+    def generic_visit(self, node):
+        """
+        Default visit method called when no specific visit method is available for a node type.
+
+        This method is called for each field of the node and recursively visits any child nodes.
+
+        Args:
+            node: The current node being visited.
+
+        Returns:
+            None
+
+        Raises:
+            None
+        """
+        # get all the fields of the node
+        for field_name, field_value in vars(node).items():
+            # if the field is a list of nodes
+            if isinstance(field_value, list):
+                # visit all the nodes in the list
+                for item in field_value:
+                    if isinstance(item, ast.Node):
+                        self.visit(item)
+            # if the field is a node
+            elif isinstance(field_value, ast.Node):
+                # visit the node
+                self.visit(field_value)
+ """ + pass + +def get_result_type_and_column(input_string): + """ + Get the result type and the column name for the result + + Args: + input_string (str): The input string + + Returns: + int: The result type + str: The column name + """ + try: + value = eval(str(input_string)) + except (SyntaxError, NameError): + result_type = 0 + column_name = "resultString" + else: + if isinstance(value, int): + result_type = 1 + column_name = "resultInteger" + elif isinstance(value, float): + result_type = 2 + column_name = "resultDouble" + elif isinstance(value, dict): + result_type = 4 + column_name = "resultJSON" + else: + result_type = None + column_name = None + + if input_string == "true" or input_string == "false": + result_type = 3 + column_name = "resultBoolean" + + if result_type is not None: + return result_type, column_name + else: + raise Exception("Cannot cast result to a valid type") + +def flatten_entity_body(entity, body, name = None): + """ + Flatten the entity body + + Args: + entity (dict): The entity + body (dict): The body + name (str, optional): The name of the entity. Defaults to None. + + Returns: + dict: The flattened entity body + """ + + # check if entity is an array + if isinstance(entity, list): + # loop trought all the values + for item in entity: + # create the entity + flatten_entity_body(item, body, name) + return body + + for key in list(entity): + if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: + converted_key = sta2rest.STA2REST.convert_entity(key) + body[converted_key] = entity[key] + + flatten_entity_body(entity[key], body, converted_key) + + if name: + if isinstance(body[name], list): + for item in body[name]: + item[converted_key] = { + "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None + } + else: + body[name][converted_key] = { + "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None + } + + return body + +def format_entity_body(entity_body): + """ + Format the entity body + + Args: + entity_body (dict): The entity body + + Returns: + dict: The formatted entity body + """ + + # Check if entity_body is an array + if isinstance(entity_body, list): + # Loop trought all the values + for i in range(len(entity_body)): + # Create the entity + entity_body[i] = format_entity_body(entity_body[i]) + return entity_body + + formatted_body = {} + # Loop trought all the keys in the body + for key in entity_body: + if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: + if "@iot.id" in entity_body[key]: + new_key = sta2rest.STA2REST.convert_to_database_id(key) + formatted_body[new_key] = entity_body[key]["@iot.id"] + elif key == "result": + value = entity_body["result"] + result_type, column_name = get_result_type_and_column(value) + formatted_body[column_name] = value + formatted_body["resultType"] = result_type + else: + formatted_body[key] = entity_body[key] + + return formatted_body + +def prepare_entity_body_for_insert(entity_body, created_ids): + for key in entity_body: + # check if key is present in created_ids + if key in created_ids: + entity_body[key] = created_ids[key] + elif "Time" in key: + entity_body[key] = datetime.datetime.fromisoformat(entity_body[key]) + # check if value is a dict and convert it to a str + elif isinstance(entity_body[key], dict): + entity_body[key] = json.dumps(entity_body[key]) + + +async def create_entity(entity_name, body, pgpool): + """ + Create an entity + + Args: + entity_name (str): The entity name + body (dict): The body + pgpool (asyncpg.pool.Pool): The 
database pool + + Raises: + PostgRESTError: If the entity cannot be created + + """ + + entity_body = { + entity_name: body + } + + body = flatten_entity_body(entity_body, entity_body) + + + # Creation order + created_ids = {} + creation_order = ["Location","Thing", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"] + + async with pgpool.acquire() as conn: + async with conn.transaction(): + for entity_name in creation_order: + if entity_name in body: + + formatted_body = format_entity_body(body[entity_name]) + + if "@iot.id" in formatted_body: + continue + + # check if the entity has sub entities and if they are empty check if id is present + if isinstance(formatted_body, list): + for item in formatted_body: + prepare_entity_body_for_insert(item, created_ids) + else: + prepare_entity_body_for_insert(formatted_body, created_ids) + + # Generate SQL from the body + keys = ', '.join(f'"{key}"' for key in formatted_body.keys()) + values_placeholders = ', '.join(f'${i+1}' for i in range(len(formatted_body))) + + # Prevent duplicates of the same entity + if entity_name in ["ObservedProperty"]: + # Check if the entity already exists with all the same values + # Generate WHERE clause from the body + where = ' AND '.join(f'"{key}" = ${i+1}' for i, key in enumerate(formatted_body.keys())) + query = f'SELECT id FROM sensorthings."{entity_name}" WHERE {where}' + existing_id = await conn.fetchval(query, *formatted_body.values()) + print(query, existing_id) + if existing_id: + created_ids[sta2rest.STA2REST.convert_to_database_id(entity_name)] = existing_id + continue + + query = f'INSERT INTO sensorthings."{entity_name}" ({keys}) VALUES ({values_placeholders}) RETURNING id' + + print(query, formatted_body) + new_id = await conn.fetchval(query, *formatted_body.values()) + + id_key = sta2rest.STA2REST.convert_to_database_id(entity_name) + created_ids[id_key] = new_id + + return None + + +def __flatten_navigation_links(row): + """ + Flatten the navigation links + + Args: + row (dict): The row + + Returns: + dict: The flattened navigation links + """ + if row and "@iot.navigationLink" in row: + # merge all the keys from the navigationLink + row.update(row["@iot.navigationLink"]) + del row["@iot.navigationLink"] + + # check if skip@iot.navigationLink is present and remove it + if "skip@iot.navigationLink" in row: + del row["skip@iot.navigationLink"] + +def __flatten_expand_entity(data): + """ + Flatten the entity + + Args: + data (dict): The data + + Returns: + dict: The flattened entity + """ + + # Check if it is an array + if not isinstance(data, list): + # throw an error + raise PostgRESTError(data) + + # check if data is empty + if not data: + return data + + # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module + if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: + # Get the value of the first key + key_name = list(data[0].keys())[0] + data = data[0][key_name] + + return data + +def __create_ref_format(data): + """ + Create the ref format + + Args: + data (dict): The data + + Returns: + dict: The ref format + """ + + rows = [data] + + # Check if it is an array + if isinstance(data, list): + key_name = list(data[0].keys())[0] + # Check if the key is in an ENTITY_MAPPING from the sta2rest module + if key_name in sta2rest.STA2REST.ENTITY_MAPPING: + rows = data[0][key_name] + if not isinstance(rows, list): + rows = [rows] + else: + rows = data + + data = { + "value": [] + } + + for row in rows: + 
data["value"].append({ + "@iot.selfLink": row["@iot.selfLink"] + }) + + return data \ No newline at end of file diff --git a/fastapi/app/v1/api.py b/fastapi/app/v1/api.py index 2761ae3..d68877e 100644 --- a/fastapi/app/v1/api.py +++ b/fastapi/app/v1/api.py @@ -1,15 +1,13 @@ -from app.v1.endpoints import observed_properties as op -from app.v1.endpoints import sensors -from app.v1.endpoints import contacts -from app.v1.endpoints import observations -from app.v1.endpoints import general +from app.v1.endpoints import read +from app.v1.endpoints import insert +from app.v1.endpoints import delete +from app.v1.endpoints import update_patch from fastapi import FastAPI v1 = FastAPI() -v1.include_router(general.v1) - -#v1.include_router(op.v1) -# v1.include_router(sensors.v1) -# v1.include_router(contacts.v1) -#v1.include_router(observations.v1) \ No newline at end of file +# Register the endpoints +v1.include_router(read.v1) +v1.include_router(insert.v1) +v1.include_router(delete.v1) +v1.include_router(update_patch.v1) diff --git a/fastapi/app/v1/endpoints/contacts.py b/fastapi/app/v1/endpoints/contacts.py deleted file mode 100644 index 6cbbd6b..0000000 --- a/fastapi/app/v1/endpoints/contacts.py +++ /dev/null @@ -1,166 +0,0 @@ -from app.db.db import get_pool -from app.models.contact import Contact -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - -###################### -# CONTACTS TYPES # -###################### -@v1.get("/contact_types/") -async def get_contact_types(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - "SELECT enum_range(NULL:: public.contact_type) AS contact_types" - ) - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - except Exception as e: - return str(e) -################ -# CONTACTS # -################ -@v1.post("/contacts/") -async def create_contact(contact: Contact, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - INSERT INTO public.contact - (type, person, telephone, fax, email, web, - address, city, admin_area, postal_code, country) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING id - """, - contact.type, - contact.person, - contact.telephone, - contact.fax, - contact.email, - contact.web, - contact.address, - contact.city, - contact.admin_area, - contact.postal_code, - contact.country, - ) - if result: - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/contacts/{contact_id}") -async def get_contact(contact_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - SELECT * FROM public.contact - WHERE id = $1 - """, - contact_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/contacts/") -async def get_contacts(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch( - 
"SELECT * FROM public.contact" - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/contacts/{contact_id}") -async def update_contact(contact_id: int, contact: Contact, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.contact - SET type = $1, person = $2, telephone = $3, fax = $4, email = $5, web = $6, address = $7, city = $8, admin_area = $9, postal_code = $10, country = $11 - WHERE id = $12 - RETURNING id""", - contact.type, - contact.person, - contact.telephone, - contact.fax, - contact.email, - contact.web, - contact.address, - contact.city, - contact.admin_area, - contact.postal_code, - contact.country, - contact_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/contacts/{contact_id}") -async def delete_contact(contact_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""DELETE FROM public.contact - WHERE id = $1 - RETURNING id""", - contact_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/delete.py b/fastapi/app/v1/endpoints/delete.py new file mode 100644 index 0000000..267a9c7 --- /dev/null +++ b/fastapi/app/v1/endpoints/delete.py @@ -0,0 +1,60 @@ +import traceback +import httpx +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse +from fastapi import status +from app.sta2rest import sta2rest +from app.utils.utils import PostgRESTError + +v1 = APIRouter() + +# Handle DELETE requests +@v1.api_route("/{path_name:path}", methods=["DELETE"]) +async def catch_all_delete(request: Request, path_name: str): + + try: + full_path = request.url.path + # parse uri + result = sta2rest.STA2REST.parse_uri(full_path) + + # Get main entity + [name, id] = result["entity"] + + # Get the name and id + if not name: + raise Exception("No entity name provided") + + if not id: + raise Exception("No entity id provided") + + async with httpx.AsyncClient() as client: + url = "http://postgrest:3000/" + name + "?id=eq." 
+ id + + # post to postgrest + r = await client.delete(url) + + + if r.status_code != 204: + result = r.json() + raise PostgRESTError(result["message"]) + + # Return okay + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "code": 200, + "type": "success" + } + ) + + except Exception as e: + # print stack trace + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "code": 400, + "type": "error", + "message": str(e) + } + ) diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py deleted file mode 100644 index dc67a69..0000000 --- a/fastapi/app/v1/endpoints/general.py +++ /dev/null @@ -1,258 +0,0 @@ -import httpx -import traceback -from fastapi import APIRouter, Request -from fastapi.responses import JSONResponse -from fastapi import status -from app.sta2rest import sta2rest - -v1 = APIRouter() - -tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"] -serverSettings = { - "conformance": [ - "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data", - ], -} - -class PostgRESTError(Exception): - pass - - -def __flatten_navigation_links(row): - if row and "@iot.navigationLink" in row: - # merge all the keys from the navigationLink - row.update(row["@iot.navigationLink"]) - del row["@iot.navigationLink"] - - # check if skip@iot.navigationLink is present and remove it - if "skip@iot.navigationLink" in row: - del row["skip@iot.navigationLink"] - -def __flatten_expand_entity(data): - # Check if it is an array - if not isinstance(data, list): - # throw an error - raise PostgRESTError(data) - - # check if data is empty - if not data: - return data - - # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module - if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: - # Get the value of the first key - key_name = list(data[0].keys())[0] - data = data[0][key_name] - - return data - -def __create_ref_format(data): - - rows = [data] - - # Check if it is an array - if isinstance(data, list): - key_name = list(data[0].keys())[0] - # Check if the key is in an ENTITY_MAPPING from the sta2rest module - if key_name in sta2rest.STA2REST.ENTITY_MAPPING: - rows = data[0][key_name] - if not isinstance(rows, list): - rows = [rows] - else: - rows = data - - data = { - "value": [] - } - - for row in rows: - data["value"].append({ - "@iot.selfLink": row["@iot.selfLink"] - }) - - return data - -def __handle_root(request: Request): - # Handle the root path - value = [] - # append the domain to the path for each table - for table in tables: - value.append( - { - "name": table, - "url": - request.url._url + table, - } - ) - - response = { - "value": value, - "serverSettings": serverSettings, - } - return response - -@v1.api_route("/{path_name:path}", methods=["GET"]) -async def catch_all_get(request: Request, path_name: str): - if not path_name: - # Handle the root path - return __handle_root(request) - - try: - # get full path from request - full_path = request.url.path - if request.url.query: - full_path += "?" 
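The delete handler simply forwards to PostgREST and normalises the outcome. Assuming the compose file above (API published on host port 8018) and that app.main mounts this router under /v1.1 (the mount prefix is not shown in this diff), a call could look like:

    import asyncio
    import httpx

    async def demo():
        # Hypothetical request; adjust host, port and prefix to your deployment
        async with httpx.AsyncClient() as client:
            r = await client.delete("http://localhost:8018/v1.1/Observations(1)")
            print(r.status_code, r.json())

    asyncio.run(demo())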
+ request.url.query - - result = sta2rest.STA2REST.convert_query(full_path) - - path = result["url"] - - print("original:\t", full_path) - print("url:\t\t", path) - - url = "http://postgrest:3000" + path - - async with httpx.AsyncClient() as client: - r = await client.get(url) - data = r.json() - - # print r status - if r.status_code != 200: - raise PostgRESTError(data["message"]) - - if result['single_result']: - data = __flatten_expand_entity(data) - - # check if the result is an array - if isinstance(data, list): - data = data[0] - - if result['value']: - # get the value of the first key - data = data[list(data.keys())[0]] - elif result['ref']: - data = __create_ref_format(data) - elif "@iot.navigationLink" in data: - __flatten_navigation_links(data) - - elif result['ref']: - data = __create_ref_format(data) - else: - data = __flatten_expand_entity(data) - # check if the result is an array - if not isinstance(data, list): - data = [data] if data else [] - - for row in data: - __flatten_navigation_links(row) - - data = { - "value": data - } - return data - except PostgRESTError as pge: - traceback.print_exc() - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={ - "code": 404, - "type": "error", - "message": str(pge) - } - ) - except Exception as e: - # print stack trace - traceback.print_exc() - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content={ - "code": 400, - "type": "error", - "message": str(e) - } - ) - -# Handle POST requests -@v1.api_route("/{path_name:path}", methods=["POST"]) -async def catch_all_post(request: Request, path_name: str): - # Accept only content-type application/json - if not "content-type" in request.headers or request.headers["content-type"] != "application/json": - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content={ - "code": 400, - "type": "error", - "message": "Only content-type application/json is supported." 
- } - ) - - try: - full_path = request.url.path - - # parse uri - result = sta2rest.STA2REST.parse_uri(full_path) - - # get json body - body = await request.json() - - print("original:\t", full_path) - print("parsed:\t", result) - print("body:\t", body) - - main_table = result["entity"][0] - - url = "http://postgrest:3000/" + main_table - - - formatted_body = {} - - # Loop trought all the keys in the body - for key in body: - if key in sta2rest.STA2REST.ENTITY_MAPPING: - value = body[key] - if "@iot.id" in value: - # Convert the id - new_key = sta2rest.STA2REST.convert_to_database_id(key) - formatted_body[new_key] = value["@iot.id"] - else: - # TODO(@filippofinke): Create nested entities - pass - else: - formatted_body[key] = body[key] - - print("ORIGINAL BODY:", body) - - print("FORMATTED BODY:", formatted_body) - - async with httpx.AsyncClient() as client: - # post to postgrest - r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"}) - - # get response - result = r.json() - - # print r status - if r.status_code != 201: - raise PostgRESTError(result["message"]) - - # Return okay - return JSONResponse( - status_code=status.HTTP_200_OK, - content={ - "code": 200, - "type": "success", - "message": result - } - ) - - except Exception as e: - # print stack trace - traceback.print_exc() - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content={ - "code": 400, - "type": "error", - "message": str(e) - } - ) diff --git a/fastapi/app/v1/endpoints/insert.py b/fastapi/app/v1/endpoints/insert.py new file mode 100644 index 0000000..d630578 --- /dev/null +++ b/fastapi/app/v1/endpoints/insert.py @@ -0,0 +1,54 @@ +import traceback +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse +from fastapi import status +from app.sta2rest import sta2rest +from app.utils.utils import create_entity +from fastapi import Depends +from app.db.db import get_pool + +v1 = APIRouter() + +# Handle POST requests +@v1.api_route("/{path_name:path}", methods=["POST"]) +async def catch_all_post(request: Request, path_name: str, pgpool=Depends(get_pool)): + # Accept only content-type application/json + if not "content-type" in request.headers or request.headers["content-type"] != "application/json": + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "code": 400, + "type": "error", + "message": "Only content-type application/json is supported." 
+ } + ) + + try: + full_path = request.url.path + # parse uri + result = sta2rest.STA2REST.parse_uri(full_path) + # get json body + body = await request.json() + main_table = result["entity"][0] + result = await create_entity(main_table, body, pgpool) + # Return okay + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "code": 200, + "type": "success", + "message": result + } + ) + + except Exception as e: + # print stack trace + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "code": 400, + "type": "error", + "message": str(e) + } + ) diff --git a/fastapi/app/v1/endpoints/observations.py b/fastapi/app/v1/endpoints/observations.py deleted file mode 100644 index 33b1164..0000000 --- a/fastapi/app/v1/endpoints/observations.py +++ /dev/null @@ -1,279 +0,0 @@ -from app.db.db import get_pool -from app.models.observation import Observation -from app.models.query_parameters import QueryParameters -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse -from fastapi.encoders import jsonable_encoder - -v1 = APIRouter() - -from starlette.requests import Request -from starlette.responses import StreamingResponse -from starlette.background import BackgroundTask -from datetime import timedelta, datetime -import httpx -import json -import ujson - - -################### -# OBSERVATION # -################### -# from typing import List - -@v1.get("/Observations") -async def get_observations(as_of_system_time: datetime | None = None): - try: - url = "" - print("GO IT!!!") - print("s_of_system_time", as_of_system_time) - async with httpx.AsyncClient() as client: - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - if not as_of_system_time: - result = await client.get( - 'http://postgrest:3000/Observation', - params={ - 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, feature_of_interest_id', - 'order': 'id.asc' - } - ) - else: - print(f'cs.[{as_of_system_time.isoformat()}, {as_of_system_time.isoformat()}}}]'.replace('+','%2B')) - result = await client.get( - 'http://postgrest:3000/Observation_traveltime' + - '?limit=100&system_time_validity=' + - f'cs.[{as_of_system_time.isoformat()}, {as_of_system_time.isoformat()}}}]'.replace('+','%2B')+ - '&select=id,phenomenonTime,resultTime,result,resultQuality,validTime,parameters,datastream_id,feature_of_interest_id' + - '&order=id.asc' - ) - print('result', ujson.loads(result.text)) - - if result and result.status_code == 200: - print('GOT!') - return UJSONResponse(status_code=status.HTTP_200_OK, content=ujson.loads(result.text)) - # return UJSONResponse(status_code=status.HTTP_200_OK, content=result.text) - except Exception as e: - print('except', e) - return str(e) - - -@v1.get("/Observations({id})") -async def get_observation(id: int, query_options: QueryParameters=Depends()): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print(id) - async with httpx.AsyncClient() as client: - # print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.get( - 'http://postgrest:3000/Observation', - params={ - 'id': f'eq.{id}', - 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, feature_of_interest_id', - 'order': 'id.asc' - }, - headers={'Accept': 
'application/vnd.pgrst.object+json'} - ) - print('result', result) - if result and result.status_code == 200: - print('CREATED!') - # return UJSONResponse(status_code=status.HTTP_201_CREATED, content=result.text) - return UJSONResponse(status_code=status.HTTP_200_OK, content=ujson.loads(result.text)) - # return ujson.loads(result.text) - except Exception as e: - print('except', e) - return str(e) - -@v1.post("/Observation") -async def insert_observation(observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - async with httpx.AsyncClient() as client: - print(observation.dict(exclude_none=True)) - print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.post(url, data=observation.dict(exclude_none=True)) - print('result', result) - if result and result.status_code == 201: - print('CREATED!') - return UJSONResponse(status_code=status.HTTP_201_CREATED, content='result') - except Exception as e: - print('except', e) - return str(e) - -@v1.put("/Observation({id})") -async def insert_observation(id: int, observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print('ID:', id, {'id': f'eq.{id}'}) - async with httpx.AsyncClient() as client: - # print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.put( - url, - data=observation.dict(exclude_none=True), - params={'id': f'eq.{id}'} - ) - print('result', result) - if result and result.status_code == 204: - print('UPDATED!') - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - print('except', e) - return str(e) - -@v1.patch("/Observation({id})") -async def insert_observation(id: int, observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print('ID:', id, {'id': f'eq.{id}'}) - async with httpx.AsyncClient() as client: - print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.put( - url, - data=observation.dict(exclude_none=True), - params={'id': f'eq.{id}'} - ) - print('result', result) - if result and result.status_code == 204: - print('UPDATED!') - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - print('except', e) - return str(e) - -# @v1.get("/contacts/{contact_id}") -# async def get_contact(contact_id: int, pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# f""" -# SELECT * FROM public.contact -# WHERE id = $1 -# """, -# contact_id, -# ) -# if result: -# return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.get("/contacts/") -# async def get_contacts(pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetch( -# "SELECT * FROM public.contact" -# ) -# if result: -# return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } 
-# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.put("/contacts/{contact_id}") -# async def update_contact(contact_id: int, contact: Contact, pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# """ UPDATE public.contact -# SET type = $1, person = $2, telephone = $3, fax = $4, email = $5, web = $6, address = $7, city = $8, admin_area = $9, postal_code = $10, country = $11 -# WHERE id = $12 -# RETURNING id""", -# contact.type, -# contact.person, -# contact.telephone, -# contact.fax, -# contact.email, -# contact.web, -# contact.address, -# contact.city, -# contact.admin_area, -# contact.postal_code, -# contact.country, -# contact_id, -# ) -# if result: -# return Response(status_code=status.HTTP_204_NO_CONTENT) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.delete("/contacts/{contact_id}") -# async def delete_contact(contact_id: int, pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# f"""DELETE FROM public.contact -# WHERE id = $1 -# RETURNING id""", -# contact_id, -# ) -# if result: -# return Response(status_code=status.HTTP_204_NO_CONTENT) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# client = httpx.AsyncClient(base_url="http://localhost:3000/") - - -# async def _reverse_proxy(request: Request): -# url = httpx.URL(path=request.url.path, -# query=request.url.query.encode("utf-8")) -# rp_req = client.build_request(request.method, url, -# headers=request.headers.raw, -# content=await request.body()) -# rp_resp = await client.send(rp_req, stream=True) -# return StreamingResponse( -# rp_resp.aiter_raw(), -# status_code=rp_resp.status_code, -# headers=rp_resp.headers, -# background=BackgroundTask(rp_resp.aclose), -# ) - -# app.add_route("/titles/{path:path}", -# _reverse_proxy, ["GET", "POST"]) diff --git a/fastapi/app/v1/endpoints/observed_properties.py b/fastapi/app/v1/endpoints/observed_properties.py deleted file mode 100644 index 02578fb..0000000 --- a/fastapi/app/v1/endpoints/observed_properties.py +++ /dev/null @@ -1,136 +0,0 @@ -from app.db.db import get_pool -from app.models.observed_property import ObservedProperty -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - -########################### -# OBSERVED PROPERTIES # -########################### -@v1.post("/ObservedProperties") -async def create_observed_property( - observed_property: ObservedProperty, - pgpool=Depends(get_pool)): - print(observed_property) - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - INSERT INTO public.observed_property - (name, description, definition, constraint) - VALUES($1, $2, $3, $4) - """, - observed_property.name, - observed_property.description, - observed_property.definition, - observed_property.constraint - ) - print(result) - if result: - result = { - "@iot.selfLink": "http://localhost/v.1./ObservedProperties({})".format( - result[0][0] - ) - } - return UJSONResponse(status_code=status.HTTP_201_CREATED, 
content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/ObservedProperties({observed_property_id})") -async def get_observed_property(observed_property_id: str, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - SELECT * FROM public.observed_property - WHERE id::text = $1 - """, - observed_property_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - return str(e) - - -@v1.get("/ObservedProperties") -async def get_observed_properties(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch( - "SELECT * FROM public.observed_property" - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - return str(e) - - -@v1.put("/ObservedProperties({observed_property_id})") -async def update_observed_properties(observed_property_id: int, observed_property: ObservedProperty, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - UPDATE public.observed_property - SET name = $1, description = $2, definition= $3, constraint = $4 - WHERE id = $5 - RETURNING id - """, - observed_property.name, - observed_property.description, - observed_property.definition, - observed_property.constraint, - observed_property_id - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/ObservedProperties({observed_property_id})") -async def delete_observed_property(observed_property_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """DELETE FROM public.observed_property - WHERE id = $1 - RETURNING *""", - observed_property_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/read.py b/fastapi/app/v1/endpoints/read.py new file mode 100644 index 0000000..f310832 --- /dev/null +++ b/fastapi/app/v1/endpoints/read.py @@ -0,0 +1,111 @@ +import httpx +import traceback +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse +from fastapi import status +from app.sta2rest import sta2rest +from app.utils.utils import PostgRESTError, __create_ref_format, __flatten_expand_entity, __flatten_navigation_links +from app.settings import tables, serverSettings + +v1 = APIRouter() + + +def __handle_root(request: Request): + # Handle the root path + value = [] + # append the domain to the path for each table + for table in tables: + value.append( + { + "name": table, + "url": + request.url._url + table, + } + ) + + response = { + 
"value": value, + "serverSettings": serverSettings, + } + return response + +@v1.api_route("/{path_name:path}", methods=["GET"]) +async def catch_all_get(request: Request, path_name: str): + if not path_name: + # Handle the root path + return __handle_root(request) + + try: + # get full path from request + full_path = request.url.path + if request.url.query: + full_path += "?" + request.url.query + + result = sta2rest.STA2REST.convert_query(full_path) + + path = result["url"] + + print("original:\t", full_path) + print("url:\t\t", path) + + url = "http://postgrest:3000" + path + + async with httpx.AsyncClient() as client: + r = await client.get(url) + data = r.json() + + # print r status + if r.status_code != 200: + raise PostgRESTError(data["message"]) + + if result['single_result']: + data = __flatten_expand_entity(data) + + # check if the result is an array + if isinstance(data, list): + data = data[0] + + if result['value']: + # get the value of the first key + data = data[list(data.keys())[0]] + elif result['ref']: + data = __create_ref_format(data) + elif "@iot.navigationLink" in data: + __flatten_navigation_links(data) + + elif result['ref']: + data = __create_ref_format(data) + else: + data = __flatten_expand_entity(data) + # check if the result is an array + if not isinstance(data, list): + data = [data] if data else [] + + for row in data: + __flatten_navigation_links(row) + + data = { + "value": data + } + return data + except PostgRESTError as pge: + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "code": 404, + "type": "error", + "message": str(pge) + } + ) + except Exception as e: + # print stack trace + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "code": 400, + "type": "error", + "message": str(e) + } + ) diff --git a/fastapi/app/v1/endpoints/sensor_types.py b/fastapi/app/v1/endpoints/sensor_types.py deleted file mode 100644 index ca10ff1..0000000 --- a/fastapi/app/v1/endpoints/sensor_types.py +++ /dev/null @@ -1,136 +0,0 @@ -from app.db.db import get_pool -from app.models.sensor import SensorType, Sensor -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - - -################ -# SENSOR TYPES # -################ -@v1.post("/sensor_types/") -async def create_sensor_type(sensorType: SensorType, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""INSERT INTO public.sensor_type - (description, metadata) - VALUES($1, $2) - RETURNING id""", - sensorType.description, - sensorType.metadata, - ) - if result: - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/sensor_types/{sensor_type_id}") -async def get_sensor_type(sensor_type_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""SELECT * FROM public.sensor_type - WHERE id = $1""", - sensor_type_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return 
UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/sensor_types/") -async def get_sensor_types(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch("SELECT * FROM public.sensor_type") - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/sensor_types/{sensor_type_id}") -async def update_sensor_type(sensor_type_id: int, sensor_type: SensorType, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.sensor_type - SET description = $1, metadata = $2 - WHERE id = $3 - RETURNING *""", - sensor_type.description, - sensor_type.metadata, - sensor_type_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/sensor_types/{sensor_type_id}") -async def delete_sensor_type(sensor_type_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.sensor - SET sensor_type_id = null - WHERE sensor_type_id = $1 - RETURNING *""", - sensor_type_id, - ) - result = await conn.fetchrow( - f"""DELETE FROM public.sensor_type - WHERE id = $1 - RETURNING *""", - sensor_type_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/sensors.py b/fastapi/app/v1/endpoints/sensors.py deleted file mode 100644 index ac2e68b..0000000 --- a/fastapi/app/v1/endpoints/sensors.py +++ /dev/null @@ -1,224 +0,0 @@ -import json, uuid -from app.db.db import get_pool -from app.models.sensor import SensorType, Sensor -from app.models.contact import Contact -from app.models.query_parameters import QueryParameters -from app.v1.endpoints.contacts import * -from app.v1.endpoints.sensor_types import * -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - - -################ -# SENSORS # -################ -@v1.post("/Sensors/") -async def create_sensor(sensor: Sensor, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - contact_id = None - sensorType_id = None - ID = None - if (type(sensor.contact)) is Contact: - result = await create_contact(sensor.contact, pgpool) - contact_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.contact)) is int: - contact_id = sensor.contact - if (type(sensor.sensor_type)) is SensorType: - result = await create_sensor_type(sensor.sensor_type, pgpool) - sensorType_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.sensor_type)) is int: - sensorType_id = sensor.sensor_type - - ID = str(uuid.uuid4()) - for key in dict(sensor): - if key == "id": - ID = sensor.id - result = await 
conn.fetchrow( - f"""INSERT INTO public.sensor - (id, name, description, encoding_type, sampling_time_resolution, - acquisition_time_resolution, sensor_type_id) - VALUES($1, $2, $3, $4, $5, $6, $7) - RETURNING *""", - ID, - sensor.name, - sensor.description, - sensor.encoding_type, - sensor.sampling_time_resolution, - sensor.acquisition_time_resolution, - sensorType_id - ) - - tmp_result = result - if contact_id is not None: - sensor_id = dict(result)["id"] - result = await conn.fetchrow( - f"""INSERT INTO public.sensor_contact - (sensor_id, contact_id) - VALUES($1, $2) - ON CONFLICT DO NOTHING - RETURNING *""", - sensor_id, - contact_id, - ) - if tmp_result: - for key in dict(tmp_result): - if key == "id": - tmp_result = {"id": str(dict(tmp_result)[key])} - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/Sensors/{sensor_id}") -async def get_sensor(sensor_id: str, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""SELECT * FROM public.sensor - WHERE id::text = $1""", - sensor_id, - ) - if result: - tmp_result = dict(result) - for key in tmp_result: - if key == "id" or key == "sampling_time_resolution" or key == "acquisition_time_resolution": - tmp_result[key] = str(tmp_result[key]) - return UJSONResponse(status_code=status.HTTP_200_OK, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/Sensors/") -async def get_sensors(pgpool=Depends(get_pool), query_options: QueryParameters=Depends()): - try: - sql = "SELECT * FROM public.sensor" - - # return UJSONResponse(status_code=status.HTTP_200_OK, content=query_options.expand) - return UJSONResponse(status_code=status.HTTP_200_OK, content= query_options.to_sql(element='sensor')) - - async with pgpool.acquire() as conn: - result = await conn.fetch("SELECT * FROM public.sensor") - tmp_result = [dict(r) for r in result] - for key in tmp_result: - key["id"] = str(key["id"]) - key["sampling_time_resolution"] = str(key["sampling_time_resolution"]) - key["acquisition_time_resolution"] = str(key["acquisition_time_resolution"]) - - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/Sensors({sensor_id})") -async def update_sensor(sensor_id: str, sensor: Sensor, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - contact_id = None - sensorType_id = None - if (type(sensor.contact)) is Contact: - result = await create_contact(sensor.contact, pgpool) - contact_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.contact)) is int: - contact_id = sensor.contact - if (type(sensor.sensor_type)) is SensorType: - result = await create_sensor_type(sensor.sensor_type, pgpool) - sensorType_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.sensor_type)) is int: - sensorType_id = sensor.sensor_type - result = 
await conn.fetchrow(
-                """ UPDATE public.sensor
-                    SET name = $1, description = $2, encoding_type = $3, sampling_time_resolution = $4, acquisition_time_resolution = $5, sensor_type_id = $6
-                    WHERE id::text = $7
-                    RETURNING *""",
-                sensor.name,
-                sensor.description,
-                sensor.encoding_type,
-                sensor.sampling_time_resolution,
-                sensor.acquisition_time_resolution,
-                sensorType_id,
-                sensor_id,
-            )
-            tmp_result = result
-            if contact_id is not None:
-                sensor_id = dict(result)["id"]
-                result = await conn.fetchrow(
-                    f"""INSERT INTO public.sensor_contact
-                        (sensor_id, contact_id)
-                        VALUES($1, $2)
-                        ON CONFLICT DO NOTHING
-                        RETURNING *""",
-                    sensor_id,
-                    contact_id,
-                )
-
-            if tmp_result:
-                return Response(status_code=status.HTTP_204_NO_CONTENT)
-            else:
-                result = {
-                    "exceptionReport": {
-                        "code": "InvalidParameterValue",
-                        "locator": "id"
-                    }
-                }
-                return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result)
-    except Exception as e:
-        return str(e)
-
-
-@v1.delete("/Sensors({sensor_id})")
-async def delete_sensor(sensor_id: str, pgpool=Depends(get_pool)):
-    try:
-        async with pgpool.acquire() as conn:
-            result = await conn.fetchrow(
-                """DELETE FROM public.sensor_contact
-                   WHERE sensor_id::text = $1
-                   RETURNING *""",
-                sensor_id,
-            )
-            result = await conn.fetchrow(
-                f"""DELETE FROM public.sensor
-                    WHERE id::text = $1
-                    RETURNING *""",
-                sensor_id,
-            )
-            if result:
-                return Response(status_code=status.HTTP_204_NO_CONTENT)
-            else:
-                result = {
-                    "exceptionReport": {
-                        "code": "InvalidParameterValue",
-                        "locator": "id"
-                    }
-                }
-                return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result)
-    except Exception as e:
-        return str(e)
\ No newline at end of file
diff --git a/fastapi/app/v1/endpoints/update_patch.py b/fastapi/app/v1/endpoints/update_patch.py
new file mode 100644
index 0000000..850b4c1
--- /dev/null
+++ b/fastapi/app/v1/endpoints/update_patch.py
@@ -0,0 +1,72 @@
+import traceback
+import httpx
+from fastapi import APIRouter, Request
+from fastapi.responses import JSONResponse
+from fastapi import status
+from app.sta2rest import sta2rest
+from app.utils.utils import PostgRESTError
+
+v1 = APIRouter()
+
+# Handle PATCH (update) requests
+@v1.api_route("/{path_name:path}", methods=["PATCH"])
+async def catch_all_update(request: Request, path_name: str):
+    # Accept only content-type application/json
+    if "content-type" not in request.headers or request.headers["content-type"] != "application/json":
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content={
+                "code": 400,
+                "type": "error",
+                "message": "Only content-type application/json is supported."
+            }
+        )
+
+    try:
+        full_path = request.url.path
+        # parse uri
+        result = sta2rest.STA2REST.parse_uri(full_path)
+
+        # Get main entity
+        [name, id] = result["entity"]
+
+        # Validate the entity name and id
+        if not name:
+            raise Exception("No entity name provided")
+
+        if not id:
+            raise Exception("No entity id provided")
+
+
+        body = await request.json()
+
+        async with httpx.AsyncClient() as client:
+            url = "http://postgrest:3000/" + name + "?id=eq." + id
+
+            # send the PATCH to PostgREST
+            r = await client.patch(url, json=body)
+
+            if r.status_code != 204:
+                result = r.json()
+                raise PostgRESTError(result["message"])
+
+        # Return okay
+        return JSONResponse(
+            status_code=status.HTTP_200_OK,
+            content={
+                "code": 200,
+                "type": "success"
+            }
+        )
+
+    except Exception as e:
+        # print stack trace
+        traceback.print_exc()
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content={
+                "code": 400,
+                "type": "error",
+                "message": str(e)
+            }
+        )
diff --git a/fastapi/requirements.txt b/fastapi/requirements.txt
index 05b8cff..128bddc 100644
--- a/fastapi/requirements.txt
+++ b/fastapi/requirements.txt
@@ -7,5 +7,4 @@ pypika
 httpx
 asyncio
 postgrest
-sta_parser @ git+https://github.com/filippofinke/sta-parser@main
 odata-query
\ No newline at end of file
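
Reviewer note: the split read/insert/update_patch/delete routers can be smoke-tested end to end with a short script such as the one below. This is an illustrative sketch, not part of the change: the base URL and /v1.1 prefix, the sample Thing payload, and the entity id 1 are all assumptions, since the diff does not show where the v1 routers are mounted.

import asyncio
import httpx

BASE = "http://localhost:8000/v1.1"  # assumed mount point of the v1 routers

async def main() -> None:
    async with httpx.AsyncClient() as client:
        # read.py: GET a collection (the service root works the same way)
        r = await client.get(f"{BASE}/Things")
        print("GET", r.status_code, r.json())

        # insert.py: POST a new entity; only application/json bodies are accepted
        r = await client.post(f"{BASE}/Things",
                              json={"name": "Kitchen", "description": "Demo thing"})
        print("POST", r.status_code, r.json())

        # update_patch.py: PATCH a single entity addressed by id
        r = await client.patch(f"{BASE}/Things(1)",
                               json={"description": "Renamed demo thing"})
        print("PATCH", r.status_code, r.json())

        # delete.py: DELETE a single entity addressed by id
        r = await client.delete(f"{BASE}/Things(1)")
        print("DELETE", r.status_code, r.json())

asyncio.run(main())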
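Reviewer note: insert.py now delegates creation to create_entity from app.utils.utils, whose implementation is not shown in this diff. For context, here is a minimal sketch of what such a helper might look like, reconstructed from the @iot.id flattening logic of the removed general.py POST handler. The function name and signature match the import in insert.py, but the body, the sensorthings schema qualification, and the returned value are assumptions, not the actual implementation.

from app.sta2rest import sta2rest

async def create_entity(main_table: str, body: dict, pgpool):
    # Flatten related entities referenced by id, e.g.
    # {"Datastream": {"@iot.id": 1}} -> {"datastream_id": 1},
    # as the removed general.py POST handler did.
    formatted_body = {}
    for key, value in body.items():
        if key in sta2rest.STA2REST.ENTITY_MAPPING and isinstance(value, dict) and "@iot.id" in value:
            formatted_body[sta2rest.STA2REST.convert_to_database_id(key)] = value["@iot.id"]
        else:
            formatted_body[key] = value

    # main_table comes from STA2REST.parse_uri, not raw user input, so it is
    # assumed safe to interpolate as a quoted identifier here.
    columns = ", ".join(f'"{c}"' for c in formatted_body)
    placeholders = ", ".join(f"${i}" for i in range(1, len(formatted_body) + 1))
    query = f'INSERT INTO sensorthings."{main_table}" ({columns}) VALUES ({placeholders}) RETURNING id'

    async with pgpool.acquire() as conn:
        row = await conn.fetchrow(query, *formatted_body.values())
        return dict(row)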