From 1e60e596872c0150fcd26eba8fcc6ae698696c58 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Mon, 24 Jul 2023 21:50:17 +0200
Subject: [PATCH 01/33] feat: initial implementation of multiple columns

---
 database/istsos_example_data.sql | 16 ++++++++++------
 database/istsos_schema.sql       | 17 ++++++++++++++++-
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/database/istsos_example_data.sql b/database/istsos_example_data.sql
index 89796ba..0babda6 100644
--- a/database/istsos_example_data.sql
+++ b/database/istsos_example_data.sql
@@ -81,11 +81,15 @@ VALUES ('Room 102', 'Feature of interest for Room 102', 'application/vnd.geo+jso
 
 -- observation
 
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 1, 1);
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 1, 1);
 
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 2, 2);
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 2, 2);
+
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
+VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 3, 3);
+
+INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultString", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
+VALUES ('2023-03-26 10:30:00-04', '2023-03-26 10:30:00-04', 0, "Test", NULL, NULL, NULL, 3, 3);
 
-INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "result", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id")
-VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 23.5, NULL, NULL, NULL, 3, 3);
 
diff --git a/database/istsos_schema.sql b/database/istsos_schema.sql
index 87d973b..5a8826f 100644
--- a/database/istsos_schema.sql
+++ b/database/istsos_schema.sql
@@ -78,7 +78,12 @@ CREATE TABLE IF NOT EXISTS sensorthings."Observation" (
     "id" BIGSERIAL PRIMARY KEY,
     "phenomenonTime" TIMESTAMPTZ NOT NULL,
     "resultTime" TIMESTAMPTZ NOT NULL,
-    "result" FLOAT NOT NULL,
+    "resultType" INT NOT NULL,
+    "resultString" TEXT,
+    "resultInteger" INT,
+    "resultDouble" DOUBLE PRECISION,
+    "resultBoolean" BOOLEAN,
+    "resultJSON" jsonb,
     "resultQuality" TEXT,
     "validTime" tstzrange DEFAULT NULL,
     "parameters" jsonb,
@@ -87,6 +92,16 @@ CREATE TABLE IF NOT EXISTS sensorthings."Observation" (
     UNIQUE ("datastream_id", "phenomenonTime")
 );
 
+CREATE OR REPLACE FUNCTION result(sensorthings."Observation") RETURNS text AS $$
+    SELECT CASE WHEN $1."resultType" = 0 THEN $1."resultString"
+                WHEN $1."resultType" = 1 THEN $1."resultInteger"::text
+                WHEN $1."resultType" = 2 THEN $1."resultDouble"::text
+                WHEN $1."resultType" = 3 THEN $1."resultBoolean"::text
+                WHEN $1."resultType" = 4 THEN $1."resultJSON"::text
+                ELSE NULL
+    END;
+$$ LANGUAGE SQL;
+
 CREATE OR REPLACE FUNCTION "@iot.id"(anyelement) RETURNS text AS $$
     SELECT $1.id;
 $$ LANGUAGE SQL;

From 3f9c5cd5fab9f1ae995ca344b953f7177547dec0 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Fri, 28 Jul 2023 21:18:37 +0200
Subject: [PATCH 02/33] fix: remove required properties

---
 database/istsos_schema.sql | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/database/istsos_schema.sql b/database/istsos_schema.sql
index 5a8826f..bb4d09b 100644
--- a/database/istsos_schema.sql
+++ b/database/istsos_schema.sql
@@ -13,14 +13,14 @@ CREATE TABLE IF NOT EXISTS sensorthings."Location" (
     "description" TEXT NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "location" geometry(geometry, 4326) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Thing" (
     "id" BIGSERIAL NOT NULL PRIMARY KEY,
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "description" TEXT NOT NULL,
-    "properties" jsonb NOT NULL,
+    "properties" jsonb,
     "location_id" BIGINT REFERENCES sensorthings."Location" (id)
 );
 
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."ObservedProperty" (
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "definition" VARCHAR(255) NOT NULL,
     "description" VARCHAR(255) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Sensor" (
@@ -46,7 +46,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."Sensor" (
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "description" VARCHAR(255) NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "metadata" jsonb NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Datastream" (
@@ -71,7 +71,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."FeaturesOfInterest" (
     "description" VARCHAR(255) NOT NULL,
     "encodingType" VARCHAR(100) NOT NULL,
     "feature" geometry(geometry, 4326) NOT NULL,
-    "properties" jsonb NOT NULL
+    "properties" jsonb
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."Observation" (
@@ -88,7 +88,7 @@ CREATE TABLE IF NOT EXISTS sensorthings."Observation" (
     "validTime" tstzrange DEFAULT NULL,
     "parameters" jsonb,
     "datastream_id" BIGINT REFERENCES sensorthings."Datastream"(id),
-    "feature_of_interest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id),
+    "featuresofinterest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id),
     UNIQUE ("datastream_id", "phenomenonTime")
 );

From ddf8bb16f2d6c99b40b0753f3041013f32f521f3 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Fri, 28 Jul 2023 22:10:48 +0200
Subject: [PATCH 03/33] feat: use recursive function

---
 fastapi/app/v1/endpoints/general.py | 146 ++++++++++++++++++----------
 1 file changed, 95 insertions(+), 51 deletions(-)

diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py
index dc67a69..3478c10 100644
--- a/fastapi/app/v1/endpoints/general.py
+++ b/fastapi/app/v1/endpoints/general.py
@@ -171,6 +171,91 @@ async def catch_all_get(request: Request, path_name: str):
                 "message": str(e)
             }
         )
+
+def get_result_type_and_column(input_string):
+    try:
+        value = eval(str(input_string))
+    except (SyntaxError, NameError):
+        result_type = 0
+        column_name = "resultString"
+    else:
+        if isinstance(value, int):
+            result_type = 1
+            column_name = "resultInteger"
+        elif isinstance(value, float):
+            result_type = 2
+            column_name = "resultDouble"
+        elif isinstance(value, dict):
+            result_type = 4
+            column_name = "resultJSON"
+        else:
+            result_type = None
+            column_name = None
+
+    if input_string == "true" or input_string == "false":
+        result_type = 3
+        column_name = "resultBoolean"
+
+    if result_type is not None:
+        return result_type, column_name
+    else:
+        raise Exception("Cannot cast result to a valid type")
+
+
+async def create_entity(entity_name, body):
+    url = "http://postgrest:3000/" + entity_name
+
+    formatted_body = {}
+
+    # Loop through all the keys in the body
+    for key in body:
+        if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING:
+            value = body[key]
+            if "@iot.id" in value:
+                # Convert the id
+                new_key = sta2rest.STA2REST.convert_to_database_id(key)
+                formatted_body[new_key] = value["@iot.id"]
+            else:
+                entity_name = sta2rest.STA2REST.convert_entity(key)
+                print("MUST CREATE ENTITY", entity_name)
+                # check if value is an array
+                if isinstance(value, list):
+                    # create an empty array
+                    formatted_body[key] = []
+                    # loop through all the values
+                    for item in value:
+                        # create the entity
+                        result = await create_entity(entity_name, item)
+                        # append the id to the array
+                        formatted_body[key].append(result)
+                else:
+                    formatted_body[key] = await create_entity(entity_name, value)
+
+                print("RESULT", formatted_body[key])
+
+        elif key == "result":
+            value = body["result"]
+            result_type, column_name = get_result_type_and_column(value)
+            formatted_body[column_name] = value
+            formatted_body["resultType"] = result_type
+        else:
+            formatted_body[key] = body[key]
+
+    print("FORMATTED BODY", formatted_body)
+
+    async with httpx.AsyncClient() as client:
+        # post to postgrest
+        r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
+
+        # get response
+        result = r.json()
+
+        # print r status
+        if r.status_code != 201:
+            raise PostgRESTError(result["message"])
+
+        return result
+
 
 # Handle POST requests
 @v1.api_route("/{path_name:path}", methods=["POST"])
@@ -188,62 +273,21 @@ async def catch_all_post(request: Request, path_name: str):
 
     try:
         full_path = request.url.path
-        # parse uri
         result = sta2rest.STA2REST.parse_uri(full_path)
-
-        # get json body
         body = await request.json()
-
-        print("original:\t", full_path)
-        print("parsed:\t", result)
-        print("body:\t", body)
-
         main_table = result["entity"][0]
-
-        url = "http://postgrest:3000/" + main_table
-
-
-        formatted_body = {}
-
-        # Loop through all the keys in the body
-        for key in body:
-            if key in sta2rest.STA2REST.ENTITY_MAPPING:
-                value = body[key]
-                if "@iot.id" in value:
-                    # Convert the id
-                    new_key = sta2rest.STA2REST.convert_to_database_id(key)
-                    formatted_body[new_key] = value["@iot.id"]
-                else:
-                    # TODO(@filippofinke): Create nested entities
-                    pass
-            else:
-                formatted_body[key] = body[key]
-
-        print("ORIGINAL BODY:", body)
-
-        print("FORMATTED BODY:", formatted_body)
-
-        async with httpx.AsyncClient() as client:
-            # post to postgrest
-            r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
-
-            # get response
-            result = r.json()
-
-            # print r status
-            if r.status_code != 201:
-                raise PostgRESTError(result["message"])
-
-            # Return okay
-            return JSONResponse(
-                status_code=status.HTTP_200_OK,
-                content={
-                    "code": 200,
-                    "type": "success",
-                    "message": result
-                }
-            )
+        result = await create_entity(main_table, body)
 
+        # Return okay
+        return JSONResponse(
+            status_code=status.HTTP_200_OK,
+            content={
+                "code": 200,
+                "type": "success",
+                "message": result
+            }
+        )
     except Exception as e:
         # print stack trace
From da067df821d3b1b946c7f48ef8561a38d509e609 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Thu, 3 Aug 2023 10:39:39 +0200
Subject: [PATCH 04/33] feat: flatten entity creation body

---
 fastapi/app/v1/endpoints/general.py | 101 +++++++++++++++++-----------
 1 file changed, 63 insertions(+), 38 deletions(-)

diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py
index 3478c10..bacd7b1 100644
--- a/fastapi/app/v1/endpoints/general.py
+++ b/fastapi/app/v1/endpoints/general.py
@@ -201,60 +201,85 @@ def get_result_type_and_column(input_string):
     else:
         raise Exception("Cannot cast result to a valid type")
 
+def flatten_entity_body(entity, body):
 
-async def create_entity(entity_name, body):
-    url = "http://postgrest:3000/" + entity_name
+    # check if entity is an array
+    if isinstance(entity, list):
+        # loop through all the values
+        for item in entity:
+            # create the entity
+            flatten_entity_body(item, body)
+        return body
 
-    formatted_body = {}
+
+    for key in list(entity):
+        if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING:
+            body[sta2rest.STA2REST.convert_entity(key)] = entity[key]
+            flatten_entity_body(entity[key], body)
+
+    return body
+
+def format_entity_body(entity_body):
+    # Check if entity_body is an array
+    if isinstance(entity_body, list):
+        # Loop through all the values
+        for i in range(len(entity_body)):
+            # Create the entity
+            entity_body[i] = format_entity_body(entity_body[i])
+        return entity_body
 
+    formatted_body = {}
     # Loop through all the keys in the body
-    for key in body:
+    for key in entity_body:
         if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING:
-            value = body[key]
-            if "@iot.id" in value:
-                # Convert the id
+            if "@iot.id" in entity_body[key]:
                 new_key = sta2rest.STA2REST.convert_to_database_id(key)
                 formatted_body[new_key] = value["@iot.id"]
-            else:
-                entity_name = sta2rest.STA2REST.convert_entity(key)
-                print("MUST CREATE ENTITY", entity_name)
-                # check if value is an array
-                if isinstance(value, list):
-                    # create an empty array
-                    formatted_body[key] = []
-                    # loop through all the values
-                    for item in value:
-                        # create the entity
-                        result = await create_entity(entity_name, item)
-                        # append the id to the array
-                        formatted_body[key].append(result)
-                else:
-                    formatted_body[key] = await create_entity(entity_name, value)
-
-                print("RESULT", formatted_body[key])
-
         elif key == "result":
-            value = body["result"]
+            value = entity_body["result"]
             result_type, column_name = get_result_type_and_column(value)
             formatted_body[column_name] = value
             formatted_body["resultType"] = result_type
         else:
-            formatted_body[key] = body[key]
+            formatted_body[key] = entity_body[key]
 
-    print("FORMATTED BODY", formatted_body)
+    return formatted_body
 
-    async with httpx.AsyncClient() as client:
-        # post to postgrest
-        r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
+async def create_entity(entity_name, body):
 
-        # get response
-        result = r.json()
+    body = flatten_entity_body(body, body)
 
-        # print r status
-        if r.status_code != 201:
-            raise PostgRESTError(result["message"])
-
-        return result
+    body[entity_name] = {}
+    # Loop through all keys in the body and if they are not an entity create a main entity
+    for key in list(body):
+        if isinstance(key, str) and key not in sta2rest.STA2REST.ENTITY_MAPPING:
+            body[entity_name][key] = body[key]
+            del body[key]
+
+    # Creation order
+    created_ids = {}
+    creation_order = ["Thing", "Location", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"]
+    for entity_name in creation_order:
+        if entity_name in body:
+            formatted_body = format_entity_body(body[entity_name])
+
+            url = "http://postgrest:3000/" + entity_name
+
+            print("CREATING ENTITY", entity_name, formatted_body)
+            '''
+            async with httpx.AsyncClient() as client:
+                # post to postgrest
+                r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
+
+                # get response
+                result = r.json()
+
+                # print r status
+                if r.status_code != 201:
+                    raise PostgRESTError(result["message"])
+
+                return result'''
+
+    return None
 
 
 # Handle POST requests

From 906dedabb3a8c006fca4e0d2a65b46e1618a5b90 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Thu, 3 Aug 2023 15:06:53 +0200
Subject: [PATCH 05/33] feat: add ids

---
 fastapi/app/v1/endpoints/general.py | 29 +++++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py
index bacd7b1..f2d5a61 100644
--- a/fastapi/app/v1/endpoints/general.py
+++ b/fastapi/app/v1/endpoints/general.py
@@ -201,21 +201,34 @@ def get_result_type_and_column(input_string):
     else:
         raise Exception("Cannot cast result to a valid type")
 
-def flatten_entity_body(entity, body):
+def flatten_entity_body(entity, body, name = None):
 
     # check if entity is an array
     if isinstance(entity, list):
         # loop through all the values
         for item in entity:
             # create the entity
-            flatten_entity_body(item, body)
+            flatten_entity_body(item, body, name)
         return body
 
     for key in list(entity):
         if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING:
-            body[sta2rest.STA2REST.convert_entity(key)] = entity[key]
-            flatten_entity_body(entity[key], body)
+            converted_key = sta2rest.STA2REST.convert_entity(key)
+            body[converted_key] = entity[key]
+
+            flatten_entity_body(entity[key], body, converted_key)
+
+            if name:
+                if isinstance(body[name], list):
+                    for item in body[name]:
+                        item[converted_key] = {
+                            "@iot.id": None
+                        }
+                else:
+                    body[name][converted_key] = {
+                        "@iot.id": None
+                    }
 
     return body
 
 def format_entity_body(entity_body):
@@ -233,7 +246,7 @@ def format_entity_body(entity_body):
         if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING:
             if "@iot.id" in entity_body[key]:
                 new_key = sta2rest.STA2REST.convert_to_database_id(key)
-                formatted_body[new_key] = value["@iot.id"]
+                formatted_body[new_key] = entity_body[key]["@iot.id"]
         elif key == "result":
             value = entity_body["result"]
             result_type, column_name = get_result_type_and_column(value)
@@ -246,8 +259,6 @@ def format_entity_body(entity_body):
 
 async def create_entity(entity_name, body):
 
-    body = flatten_entity_body(body, body)
-
     body[entity_name] = {}
     # Loop through all keys in the body and if they are not an entity create a main entity
     for key in list(body):
@@ -255,6 +266,8 @@ async def create_entity(entity_name, body):
             body[entity_name][key] = body[key]
             del body[key]
 
+    body = flatten_entity_body(body, body)
+
     # Creation order
     created_ids = {}
     creation_order = ["Thing", "Location", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"]

From 93a5860570ccdfc52dd2ea0091076878d21b904b Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Thu, 3 Aug 2023 16:58:34 +0200
Subject: [PATCH 06/33] feat: get ids

---
 fastapi/app/v1/endpoints/general.py | 28 +++++++++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py
index f2d5a61..bfd1a0a 100644
--- a/fastapi/app/v1/endpoints/general.py
+++ b/fastapi/app/v1/endpoints/general.py
@@ -268,6 +268,7 @@ async def create_entity(entity_name, body):
 
     body = flatten_entity_body(body, body)
 
+
     # Creation order
     created_ids = {}
     creation_order = ["Thing", "Location", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"]
@@ -275,10 +276,24 @@ async def create_entity(entity_name, body):
         if entity_name in body:
             formatted_body = format_entity_body(body[entity_name])
 
+            # check if the entity has sub entities and, if they are empty, check if the id is present
+            if isinstance(formatted_body, list):
+                for item in formatted_body:
+                    for key in item:
+                        if key in created_ids:
+                            item[key] = created_ids[key]
+            else:
+                for key in formatted_body:
+                    # check if key is present in created_ids
+                    if key in created_ids:
+                        formatted_body[key] = created_ids[key]
+
             url = "http://postgrest:3000/" + entity_name
 
-            print("CREATING ENTITY", entity_name, formatted_body)
-            '''
+            print("Creating entity: ", entity_name)
+            print("Body: ", formatted_body)
+
+            """
             async with httpx.AsyncClient() as client:
                 # post to postgrest
                 r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
@@ -290,7 +305,14 @@ async def create_entity(entity_name, body):
                 if r.status_code != 201:
                     raise PostgRESTError(result["message"])
 
-                return result'''
+                # get first element of the result
+                result = result[0]
+
+                # get the id of the created entity
+                id_key = sta2rest.STA2REST.convert_to_database_id(entity_name)
+                created_ids[id_key] = result["id"]
+
+                print("Created entity: ", id_key, " with id: ", result["id"])"""

From 6098d4cf09fd6c7fc9d7618ae86ec82fa0c28643 Mon Sep 17 00:00:00 2001
From: filippofinke
Date: Thu, 3 Aug 2023 17:05:22 +0200
Subject: [PATCH 07/33] fix: get correct ids

---
 fastapi/app/v1/endpoints/general.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py
index bfd1a0a..92fdbce 100644
--- a/fastapi/app/v1/endpoints/general.py
+++ b/fastapi/app/v1/endpoints/general.py
@@ -222,11 +222,11 @@ def flatten_entity_body(entity, body, name = None):
             if isinstance(body[name], list):
                 for item in body[name]:
                     item[converted_key] = {
-                        "@iot.id": None
+                        "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None
                     }
             else:
                 body[name][converted_key] = {
-                    "@iot.id": None
+                    "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None
                 }
 
     return body
@@ -259,14 +259,11 @@ def format_entity_body(entity_body):
 
 async def create_entity(entity_name, body):
 
-    body[entity_name] = {}
-    # Loop through all keys in the body and if they are not an entity create a main entity
-    for key in list(body):
-        if isinstance(key, str) and key not in sta2rest.STA2REST.ENTITY_MAPPING:
-            body[entity_name][key] = body[key]
-            del body[key]
+    entity_body = {
+        entity_name: body
+    }
 
-    body = flatten_entity_body(body, body)
+    body = flatten_entity_body(entity_body, entity_body)
 
     # Creation order
@@ -274,8 +271,12 @@ async def create_entity(entity_name, body):
     creation_order = ["Thing", "Location", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"]
     for entity_name in creation_order:
         if entity_name in body:
+
             formatted_body = format_entity_body(body[entity_name])
 
+            if "@iot.id" in formatted_body:
+                continue
+
             # check if the entity has sub entities and, if they are empty, check if the id is present
             if isinstance(formatted_body, list):
                 for item in formatted_body:
                     for key in item:
@@ -292,7 +293,6 @@ async def create_entity(entity_name, body):
 
             url = "http://postgrest:3000/" + entity_name
 
            print("Creating entity: ", entity_name)
            print("Body: ", formatted_body)
-
            """
            async with httpx.AsyncClient() as client:
                # post to postgrest
                r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
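Recap of the create flow as it now stands: a nested SensorThings POST body is flattened into one top-level bucket per entity, each parent keeps an "@iot.id" placeholder for its children, and create_entity walks the creation order filling placeholders from created_ids. The data below is illustrative only; in the real code entity names are resolved through sta2rest.STA2REST.ENTITY_MAPPING.

# Illustrative input: a Datastream posted with a nested Thing and Sensor.
nested_body = {
    "name": "Temperature in Room 101",
    "Thing": {"name": "Room 101", "description": "A room"},
    "Sensor": {"@iot.id": 5},  # existing entity: only the reference survives
}

# Shape flatten_entity_body is meant to produce for this body (inferred from
# the code above; placeholder ids are patched in after each child is created):
flattened = {
    "Datastream": {
        "name": "Temperature in Room 101",
        "Thing": {"@iot.id": None},   # created first, id filled in afterwards
        "Sensor": {"@iot.id": 5},
    },
    "Thing": {"name": "Room 101", "description": "A room"},
}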
print("Body: ", formatted_body) - """ async with httpx.AsyncClient() as client: # post to postgrest From f7349c250e170ae77f39e61c75dda70ea8aefb2d Mon Sep 17 00:00:00 2001 From: filippofinke Date: Thu, 3 Aug 2023 17:08:48 +0200 Subject: [PATCH 08/33] feat: create entities --- fastapi/app/v1/endpoints/general.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py index 92fdbce..00ee8dc 100644 --- a/fastapi/app/v1/endpoints/general.py +++ b/fastapi/app/v1/endpoints/general.py @@ -293,7 +293,7 @@ async def create_entity(entity_name, body): print("Creating entity: ", entity_name) print("Body: ", formatted_body) - """ + async with httpx.AsyncClient() as client: # post to postgrest r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"}) @@ -312,7 +312,7 @@ async def create_entity(entity_name, body): id_key = sta2rest.STA2REST.convert_to_database_id(entity_name) created_ids[id_key] = result["id"] - print("Created entity: ", id_key, " with id: ", result["id"])""" + print("Created entity: ", id_key, " with id: ", result["id"]) return None From 8782934c46382157c9b72dd8213bb8952e512ae5 Mon Sep 17 00:00:00 2001 From: filippofinke Date: Fri, 4 Aug 2023 21:45:58 +0200 Subject: [PATCH 09/33] fix: default data column name --- database/istsos_example_data.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/database/istsos_example_data.sql b/database/istsos_example_data.sql index 0babda6..20b8720 100644 --- a/database/istsos_example_data.sql +++ b/database/istsos_example_data.sql @@ -81,15 +81,15 @@ VALUES ('Room 102', 'Feature of interest for Room 102', 'application/vnd.geo+jso -- observation -INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id") +INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id") VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 1, 1); -INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id") +INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id") VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 2, 2); -INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id") +INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultDouble", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id") VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 3, 3); -INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultString", "resultQuality", "validTime", "parameters", "datastream_id", "feature_of_interest_id") +INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultString", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id") 
VALUES ('2023-03-26 10:30:00-04', '2023-03-26 10:30:00-04', 0, "Test", NULL, NULL, NULL, 3, 3); From 1b01b359908146681473a1f5585ecacb037d6e07 Mon Sep 17 00:00:00 2001 From: filippofinke Date: Fri, 4 Aug 2023 21:47:20 +0200 Subject: [PATCH 10/33] fix: column name --- database/version_observation.sql | 8 ++++---- fastapi/app/models/observation.py | 4 ++-- fastapi/app/v1/endpoints/observations.py | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/database/version_observation.sql b/database/version_observation.sql index 354cec3..69ce1e7 100644 --- a/database/version_observation.sql +++ b/database/version_observation.sql @@ -10,7 +10,7 @@ insert "validTime", "parameters", "datastream_id", - "feature_of_interest_id" + "featuresofinterest_id" ) values ( now(), @@ -33,7 +33,7 @@ set --"validTime" = null, --parameters = null, --datastream_id = 1, - feature_of_interest_id = 1 + featuresofinterest_id = 1 --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange where id = 2; @@ -48,7 +48,7 @@ set --"validTime" = null, --parameters = null, --datastream_id = 1, - feature_of_interest_id = 1 + featuresofinterest_id = 1 --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange where id = 2; @@ -63,7 +63,7 @@ set --"validTime" = null, --parameters = null, --datastream_id = 1, - feature_of_interest_id = 1 + featuresofinterest_id = 1 --system_time_validity = '["2023-03-25 14:35:11.272722+01",infinity)'::tstzrange where id = 2; diff --git a/fastapi/app/models/observation.py b/fastapi/app/models/observation.py index 03319b7..066aef1 100644 --- a/fastapi/app/models/observation.py +++ b/fastapi/app/models/observation.py @@ -14,7 +14,7 @@ class Observation(BaseModel): validTime: str | None = None parameters: Json | None = None datastream_id: int - feature_of_interest_id: int + featuresofinterest_id: int class put_Observation(BaseModel): id: int | None = None @@ -25,4 +25,4 @@ class put_Observation(BaseModel): validTime: str | None = None parameters: Json | None = None datastream_id: int | None = None - feature_of_interest_id: int | None = None \ No newline at end of file + featuresofinterest_id: int | None = None \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/observations.py b/fastapi/app/v1/endpoints/observations.py index 33b1164..9206384 100644 --- a/fastapi/app/v1/endpoints/observations.py +++ b/fastapi/app/v1/endpoints/observations.py @@ -34,7 +34,7 @@ async def get_observations(as_of_system_time: datetime | None = None): 'http://postgrest:3000/Observation', params={ 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, feature_of_interest_id', + 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, featuresofinterest_id', 'order': 'id.asc' } ) @@ -44,7 +44,7 @@ async def get_observations(as_of_system_time: datetime | None = None): 'http://postgrest:3000/Observation_traveltime' + '?limit=100&system_time_validity=' + f'cs.[{as_of_system_time.isoformat()}, {as_of_system_time.isoformat()}}}]'.replace('+','%2B')+ - '&select=id,phenomenonTime,resultTime,result,resultQuality,validTime,parameters,datastream_id,feature_of_interest_id' + + '&select=id,phenomenonTime,resultTime,result,resultQuality,validTime,parameters,datastream_id,featuresofinterest_id' + '&order=id.asc' ) print('result', ujson.loads(result.text)) @@ -73,7 +73,7 @@ async def get_observation(id: int, query_options: QueryParameters=Depends()): params={ 
'id': f'eq.{id}', 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, feature_of_interest_id', + 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, featuresofinterest_id', 'order': 'id.asc' }, headers={'Accept': 'application/vnd.pgrst.object+json'} From 6fd604b14a9f15b606a5b2b9ab0069a8640348de Mon Sep 17 00:00:00 2001 From: filippofinke Date: Fri, 4 Aug 2023 21:57:51 +0200 Subject: [PATCH 11/33] fix: select default columns with expand --- fastapi/app/sta2rest/sta2rest.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fastapi/app/sta2rest/sta2rest.py b/fastapi/app/sta2rest/sta2rest.py index 4d14f3c..cafc20d 100644 --- a/fastapi/app/sta2rest/sta2rest.py +++ b/fastapi/app/sta2rest/sta2rest.py @@ -567,6 +567,14 @@ def convert_query(full_path: str) -> str: if not entities: single_result = True + # Check if query has an expand but not a select + if query_ast.expand and not query_ast.select: + # Add default columns to the select node + default_columns = STA2REST.get_default_column_names(main_entity) + query_ast.select = ast.SelectNode([]) + for column in default_columns: + query_ast.select.identifiers.append(ast.IdentifierNode(column)) + # Visit the query ast to convert it visitor = NodeVisitor(main_entity) query_converted = visitor.visit(query_ast) From b90238cef5ed05faa32030b33257c317d317fdcd Mon Sep 17 00:00:00 2001 From: filippofinke Date: Sat, 5 Aug 2023 17:47:46 +0200 Subject: [PATCH 12/33] fix: creation order --- fastapi/app/v1/endpoints/general.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/general.py index 00ee8dc..8793772 100644 --- a/fastapi/app/v1/endpoints/general.py +++ b/fastapi/app/v1/endpoints/general.py @@ -268,7 +268,7 @@ async def create_entity(entity_name, body): # Creation order created_ids = {} - creation_order = ["Thing", "Location", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"] + creation_order = ["Location","Thing", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"] for entity_name in creation_order: if entity_name in body: From bfcf89c067bebd419ceb2b25a42f1946c2920379 Mon Sep 17 00:00:00 2001 From: filippofinke Date: Sat, 5 Aug 2023 17:49:12 +0200 Subject: [PATCH 13/33] fix: check for sub entities --- fastapi/app/sta2rest/sta2rest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastapi/app/sta2rest/sta2rest.py b/fastapi/app/sta2rest/sta2rest.py index cafc20d..d82b0dd 100644 --- a/fastapi/app/sta2rest/sta2rest.py +++ b/fastapi/app/sta2rest/sta2rest.py @@ -567,8 +567,8 @@ def convert_query(full_path: str) -> str: if not entities: single_result = True - # Check if query has an expand but not a select - if query_ast.expand and not query_ast.select: + # Check if query has an expand but not a select and does not have sub entities + if query_ast.expand and not query_ast.select and not entities: # Add default columns to the select node default_columns = STA2REST.get_default_column_names(main_entity) query_ast.select = ast.SelectNode([]) From 50b50affd8f99da9f733ad9d557b13923ac199be Mon Sep 17 00:00:00 2001 From: filippofinke Date: Sat, 5 Aug 2023 17:53:04 +0200 Subject: [PATCH 14/33] fix: default data --- database/istsos_example_data.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/istsos_example_data.sql b/database/istsos_example_data.sql index 
20b8720..32b7198 100644 --- a/database/istsos_example_data.sql +++ b/database/istsos_example_data.sql @@ -91,5 +91,5 @@ INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultT VALUES ('2023-03-25 10:30:00-04', '2023-03-25 10:30:00-04', 2, 23.5, NULL, NULL, NULL, 3, 3); INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultTime", "resultType", "resultString", "resultQuality", "validTime", "parameters", "datastream_id", "featuresofinterest_id") -VALUES ('2023-03-26 10:30:00-04', '2023-03-26 10:30:00-04', 0, "Test", NULL, NULL, NULL, 3, 3); +VALUES ('2023-03-26 10:30:00-04', '2023-03-26 10:30:00-04', 0, 'Test', NULL, NULL, NULL, 3, 3); From b6e3d23db7c631fa9cf542095219b4cb989f185b Mon Sep 17 00:00:00 2001 From: Filippo Finke Date: Mon, 7 Aug 2023 20:55:40 +0200 Subject: [PATCH 15/33] feat: add sta_parser module --- fastapi/app/sta2rest/sta_parser/__init__.py | 0 fastapi/app/sta2rest/sta_parser/ast.py | 292 +++++++++++++++++++ fastapi/app/sta2rest/sta_parser/lexer.py | 118 ++++++++ fastapi/app/sta2rest/sta_parser/parser.py | 308 ++++++++++++++++++++ fastapi/app/sta2rest/sta_parser/visitor.py | 69 +++++ fastapi/requirements.txt | 1 - 6 files changed, 787 insertions(+), 1 deletion(-) create mode 100644 fastapi/app/sta2rest/sta_parser/__init__.py create mode 100644 fastapi/app/sta2rest/sta_parser/ast.py create mode 100644 fastapi/app/sta2rest/sta_parser/lexer.py create mode 100644 fastapi/app/sta2rest/sta_parser/parser.py create mode 100644 fastapi/app/sta2rest/sta_parser/visitor.py diff --git a/fastapi/app/sta2rest/sta_parser/__init__.py b/fastapi/app/sta2rest/sta_parser/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastapi/app/sta2rest/sta_parser/ast.py b/fastapi/app/sta2rest/sta_parser/ast.py new file mode 100644 index 0000000..4300cb9 --- /dev/null +++ b/fastapi/app/sta2rest/sta_parser/ast.py @@ -0,0 +1,292 @@ +""" +AST for the SensorThings API. + +Author: Filippo Finke +""" + +class PrettyPrinter(object): + """ + A class used to pretty print objects. + + Methods: + __str__(): Returns a string representation of the object. + """ + + def __str__(self): + """ + Returns a string representation of the object. + + Returns: + str: The string representation of the object. + """ + lines = [] + for key, val in vars(self).items(): + if val is None: + continue + + if isinstance(val, list): + lines += ['\n {}:'.format(key)] + for item in val: + lines += [' {}'.format(line) for line in str(item).split('\n')] + else: + lines += '{}: {}'.format(key, val).split('\n') + return '\n'.join(lines) + + +class Node(PrettyPrinter): + """ + A base class for nodes in the AST. + + Inherits from PrettyPrinter. + + Attributes: + None + """ + + +class IdentifierNode(Node): + """ + A class representing an identifier node. + + Inherits from Node. + + Attributes: + name (str): The name of the identifier. + """ + + def __init__(self, name): + """ + Initializes an IdentifierNode object. + + Args: + name (str): The name of the identifier. + """ + self.name = name + + +class SelectNode(Node): + """ + A class representing a select node. + + Inherits from Node. + + Attributes: + identifiers (list): A list of identifier nodes. + """ + + def __init__(self, identifiers): + """ + Initializes a SelectNode object. + + Args: + identifiers (list): A list of identifier nodes. + """ + self.identifiers = identifiers + + +class FilterNode(Node): + """ + A class representing a filter node. + + Inherits from Node. + + Attributes: + filter (str): The filter string. 
+ """ + + def __init__(self, filter): + """ + Initializes a FilterNode object. + + Args: + filter (str): The filter string. + """ + self.filter = filter + + +class ExpandNodeIdentifier(Node): + """ + A class representing an expand node with an identifier. + + Inherits from Node. + + Attributes: + identifier (str): The identifier string. + subquery (QueryNode): The subquery associated with the expand node. + """ + + def __init__(self, identifier, subquery=None): + """ + Initializes an ExpandNodeIdentifier object. + + Args: + identifier (str): The identifier string. + subquery (QueryNode, optional): The subquery associated with the expand node. + """ + self.identifier = identifier + self.subquery = subquery + + +class ExpandNode(Node): + """ + A class representing an expand node. + + Inherits from Node. + + Attributes: + identifiers (list): A list of identifier nodes. + """ + + def __init__(self, identifiers): + """ + Initializes an ExpandNode object. + + Args: + identifiers (list): A list of identifier nodes. + """ + self.identifiers = identifiers + + +class OrderByNodeIdentifier(Node): + """ + A class representing an order by node with an identifier. + + Inherits from Node. + + Attributes: + identifier (str): The identifier string. + order (str): The order string. + """ + + def __init__(self, identifier, order): + """ + Initializes an OrderByNodeIdentifier object. + + Args: + identifier (str): The identifier string. + order (str): The order string. + """ + self.identifier = identifier + self.order = order + + +class OrderByNode(Node): + """ + A class representing an order by node. + + Inherits from Node. + + Attributes: + identifiers (list): A list of identifier nodes. + """ + + def __init__(self, identifiers): + """ + Initializes an OrderByNode object. + + Args: + identifiers (list): A list of identifier nodes. + """ + self.identifiers = identifiers + + +class SkipNode(Node): + """ + A class representing a skip node. + + Inherits from Node. + + Attributes: + count (int): The count value. + """ + + def __init__(self, count): + """ + Initializes a SkipNode object. + + Args: + count (int): The count value. + """ + self.count = count + + +class TopNode(Node): + """ + A class representing a top node. + + Inherits from Node. + + Attributes: + count (int): The count value. + """ + + def __init__(self, count): + """ + Initializes a TopNode object. + + Args: + count (int): The count value. + """ + self.count = count + + +class CountNode(Node): + """ + A class representing a count node. + + Inherits from Node. + + Attributes: + value (str): The value string. + """ + + def __init__(self, value): + """ + Initializes a CountNode object. + + Args: + value (str): The value string. + """ + self.value = value + + +class QueryNode(Node): + """ + A class representing a query node. + + Inherits from Node. + + Attributes: + select (SelectNode, optional): The select node. + filter (FilterNode, optional): The filter node. + expand (ExpandNode, optional): The expand node. + orderby (OrderByNode, optional): The order by node. + skip (SkipNode, optional): The skip node. + top (TopNode, optional): The top node. + count (CountNode, optional): The count node. + is_subquery (bool): Indicates if the query is a subquery. + """ + + def __init__(self, select=None, filter=None, expand=None, orderby=None, skip=None, top=None, count=None, + is_subquery=False): + """ + Initializes a QueryNode object. + + Args: + select (SelectNode, optional): The select node. + filter (FilterNode, optional): The filter node. 
+ expand (ExpandNode, optional): The expand node. + orderby (OrderByNode, optional): The order by node. + skip (SkipNode, optional): The skip node. + top (TopNode, optional): The top node. + count (CountNode, optional): The count node. + is_subquery (bool): Indicates if the query is a subquery. + """ + self.select = select + self.filter = filter + self.expand = expand + self.orderby = orderby + self.skip = skip + self.top = top + self.count = count + self.is_subquery = is_subquery diff --git a/fastapi/app/sta2rest/sta_parser/lexer.py b/fastapi/app/sta2rest/sta_parser/lexer.py new file mode 100644 index 0000000..06e86ef --- /dev/null +++ b/fastapi/app/sta2rest/sta_parser/lexer.py @@ -0,0 +1,118 @@ +""" +Lexer for the SensorThings API. + +Author: Filippo Finke +""" + +import re +import urllib.parse + + +# Define the token types +TOKEN_TYPES = { + 'COUNT': r'\$count=', + 'TOP': r'\$top=', + 'SKIP': r'\$skip=', + 'SELECT': r'\$select=', + 'FILTER': r'\$filter=', + 'EXPAND': r'\$expand=', + 'ORDERBY': r'\$orderby=', + 'SUBQUERY_SEPARATOR': r';', + 'VALUE_SEPARATOR': r',', + 'OPTIONS_SEPARATOR': r'&', + 'EQUALS': r'\beq\b', + 'AND': r'\band\b', + 'OR': r'\bor\b', + 'ORDER': r'\basc\b|\bdesc\b', + 'BOOL': r'\btrue\b|\bfalse\b', + 'IDENTIFIER': r'@{0,1}[a-zA-Z_][.a-zA-Z0-9_/]*', + 'FLOAT': r'[0-9]+\.[0-9]+', + 'INTEGER': r'[0-9]+', + 'STRING': r"'[^']*'", + 'LEFT_PAREN': r'\(', + 'RIGHT_PAREN': r'\)', + 'WHITESPACE': r'\s+', +} + + +class Token: + """A class representing a token.""" + + def __init__(self, type, value): + """ + Initialize a new Token object. + + Args: + type (str): The type of the token. + value (str): The value of the token. + """ + self.type = type + self.value = value + + def __str__(self): + """ + Return a string representation of the token. + + Returns: + str: The string representation of the token. + """ + return f'Token({self.type}, {self.value})' + + +class Lexer: + """A class for tokenizing SensorThings API queries.""" + + def __init__(self, text): + """ + Initialize a new Lexer object. + + Args: + text (str): The input text to be tokenized. + """ + self.text = urllib.parse.unquote_plus(text) + self.tokens = self.tokenize() + + def tokenize(self): + """ + Tokenize the input text. + + Returns: + list: A list of Token objects representing the tokens. + """ + tokens = [] + position = 0 + + while position < len(self.text): + match = None + + for token_type, pattern in TOKEN_TYPES.items(): + regex = re.compile(pattern) + match = regex.match(self.text, position) + + if match: + value = match.group(0) + token = Token(token_type, value) + tokens.append(token) + position = match.end(0) + break + + if not match: + raise Exception(f'Invalid character at position {position}: {self.text[position]}') + + return tokens + + def __str__(self): + """ + Return a string representation of the lexer. + + Returns: + str: The string representation of the lexer. + """ + return '\n'.join(str(token) for token in self.tokens) + + +# Example usage +if __name__ == '__main__': + text = '''$expand=Locations,Datastreams($select=id,name,unitOfMeasurement;$expand=ObservedProperty($select=name),Observations($select=result,phenomenonTime;$orderby=phenomenonTime desc;$top=1))''' + lexer = Lexer(text) + print(lexer) diff --git a/fastapi/app/sta2rest/sta_parser/parser.py b/fastapi/app/sta2rest/sta_parser/parser.py new file mode 100644 index 0000000..1fc8393 --- /dev/null +++ b/fastapi/app/sta2rest/sta_parser/parser.py @@ -0,0 +1,308 @@ +""" +Parser for the SensorThings API query language. 
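With the column renames in patches 09 and 10 settled, a read against PostgREST looks as follows. This sketch only assumes the endpoint, client, and select style already used throughout the series; note that "result" is no longer a physical column, PostgREST serves it as a computed field through the result(sensorthings."Observation") function from patch 01.

import asyncio
import httpx

async def fetch_observations():
    async with httpx.AsyncClient() as client:
        r = await client.get(
            "http://postgrest:3000/Observation",
            params={
                # "result" resolves to the computed column; the rest are real columns
                "select": "id,phenomenonTime,resultTime,result,datastream_id,featuresofinterest_id",
                "order": "id.asc",
            },
        )
        return r.json()

print(asyncio.run(fetch_observations()))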
From b6e3d23db7c631fa9cf542095219b4cb989f185b Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 20:55:40 +0200
Subject: [PATCH 15/33] feat: add sta_parser module

---
 fastapi/app/sta2rest/sta_parser/__init__.py |   0
 fastapi/app/sta2rest/sta_parser/ast.py      | 292 +++++++++++++++++++
 fastapi/app/sta2rest/sta_parser/lexer.py    | 118 ++++++++
 fastapi/app/sta2rest/sta_parser/parser.py   | 308 ++++++++++++++++++++
 fastapi/app/sta2rest/sta_parser/visitor.py  |  69 +++++
 fastapi/requirements.txt                    |   1 -
 6 files changed, 787 insertions(+), 1 deletion(-)
 create mode 100644 fastapi/app/sta2rest/sta_parser/__init__.py
 create mode 100644 fastapi/app/sta2rest/sta_parser/ast.py
 create mode 100644 fastapi/app/sta2rest/sta_parser/lexer.py
 create mode 100644 fastapi/app/sta2rest/sta_parser/parser.py
 create mode 100644 fastapi/app/sta2rest/sta_parser/visitor.py

diff --git a/fastapi/app/sta2rest/sta_parser/__init__.py b/fastapi/app/sta2rest/sta_parser/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fastapi/app/sta2rest/sta_parser/ast.py b/fastapi/app/sta2rest/sta_parser/ast.py
new file mode 100644
index 0000000..4300cb9
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/ast.py
@@ -0,0 +1,292 @@
+"""
+AST for the SensorThings API.
+
+Author: Filippo Finke
+"""
+
+class PrettyPrinter(object):
+    """
+    A class used to pretty print objects.
+
+    Methods:
+        __str__(): Returns a string representation of the object.
+    """
+
+    def __str__(self):
+        """
+        Returns a string representation of the object.
+
+        Returns:
+            str: The string representation of the object.
+        """
+        lines = []
+        for key, val in vars(self).items():
+            if val is None:
+                continue
+
+            if isinstance(val, list):
+                lines += ['\n    {}:'.format(key)]
+                for item in val:
+                    lines += ['    {}'.format(line) for line in str(item).split('\n')]
+            else:
+                lines += '{}: {}'.format(key, val).split('\n')
+        return '\n'.join(lines)
+
+
+class Node(PrettyPrinter):
+    """
+    A base class for nodes in the AST.
+
+    Inherits from PrettyPrinter.
+
+    Attributes:
+        None
+    """
+
+
+class IdentifierNode(Node):
+    """
+    A class representing an identifier node.
+
+    Inherits from Node.
+
+    Attributes:
+        name (str): The name of the identifier.
+    """
+
+    def __init__(self, name):
+        """
+        Initializes an IdentifierNode object.
+
+        Args:
+            name (str): The name of the identifier.
+        """
+        self.name = name
+
+
+class SelectNode(Node):
+    """
+    A class representing a select node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes a SelectNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class FilterNode(Node):
+    """
+    A class representing a filter node.
+
+    Inherits from Node.
+
+    Attributes:
+        filter (str): The filter string.
+    """
+
+    def __init__(self, filter):
+        """
+        Initializes a FilterNode object.
+
+        Args:
+            filter (str): The filter string.
+        """
+        self.filter = filter
+
+
+class ExpandNodeIdentifier(Node):
+    """
+    A class representing an expand node with an identifier.
+
+    Inherits from Node.
+
+    Attributes:
+        identifier (str): The identifier string.
+        subquery (QueryNode): The subquery associated with the expand node.
+    """
+
+    def __init__(self, identifier, subquery=None):
+        """
+        Initializes an ExpandNodeIdentifier object.
+
+        Args:
+            identifier (str): The identifier string.
+            subquery (QueryNode, optional): The subquery associated with the expand node.
+        """
+        self.identifier = identifier
+        self.subquery = subquery
+
+
+class ExpandNode(Node):
+    """
+    A class representing an expand node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes an ExpandNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class OrderByNodeIdentifier(Node):
+    """
+    A class representing an order by node with an identifier.
+
+    Inherits from Node.
+
+    Attributes:
+        identifier (str): The identifier string.
+        order (str): The order string.
+    """
+
+    def __init__(self, identifier, order):
+        """
+        Initializes an OrderByNodeIdentifier object.
+
+        Args:
+            identifier (str): The identifier string.
+            order (str): The order string.
+        """
+        self.identifier = identifier
+        self.order = order
+
+
+class OrderByNode(Node):
+    """
+    A class representing an order by node.
+
+    Inherits from Node.
+
+    Attributes:
+        identifiers (list): A list of identifier nodes.
+    """
+
+    def __init__(self, identifiers):
+        """
+        Initializes an OrderByNode object.
+
+        Args:
+            identifiers (list): A list of identifier nodes.
+        """
+        self.identifiers = identifiers
+
+
+class SkipNode(Node):
+    """
+    A class representing a skip node.
+
+    Inherits from Node.
+
+    Attributes:
+        count (int): The count value.
+    """
+
+    def __init__(self, count):
+        """
+        Initializes a SkipNode object.
+
+        Args:
+            count (int): The count value.
+        """
+        self.count = count
+
+
+class TopNode(Node):
+    """
+    A class representing a top node.
+
+    Inherits from Node.
+
+    Attributes:
+        count (int): The count value.
+    """
+
+    def __init__(self, count):
+        """
+        Initializes a TopNode object.
+
+        Args:
+            count (int): The count value.
+        """
+        self.count = count
+
+
+class CountNode(Node):
+    """
+    A class representing a count node.
+
+    Inherits from Node.
+
+    Attributes:
+        value (str): The value string.
+    """
+
+    def __init__(self, value):
+        """
+        Initializes a CountNode object.
+
+        Args:
+            value (str): The value string.
+        """
+        self.value = value
+
+
+class QueryNode(Node):
+    """
+    A class representing a query node.
+
+    Inherits from Node.
+
+    Attributes:
+        select (SelectNode, optional): The select node.
+        filter (FilterNode, optional): The filter node.
+        expand (ExpandNode, optional): The expand node.
+        orderby (OrderByNode, optional): The order by node.
+        skip (SkipNode, optional): The skip node.
+        top (TopNode, optional): The top node.
+        count (CountNode, optional): The count node.
+        is_subquery (bool): Indicates if the query is a subquery.
+    """
+
+    def __init__(self, select=None, filter=None, expand=None, orderby=None, skip=None, top=None, count=None,
+                 is_subquery=False):
+        """
+        Initializes a QueryNode object.
+
+        Args:
+            select (SelectNode, optional): The select node.
+            filter (FilterNode, optional): The filter node.
+            expand (ExpandNode, optional): The expand node.
+            orderby (OrderByNode, optional): The order by node.
+            skip (SkipNode, optional): The skip node.
+            top (TopNode, optional): The top node.
+            count (CountNode, optional): The count node.
+            is_subquery (bool): Indicates if the query is a subquery.
+        """
+        self.select = select
+        self.filter = filter
+        self.expand = expand
+        self.orderby = orderby
+        self.skip = skip
+        self.top = top
+        self.count = count
+        self.is_subquery = is_subquery
diff --git a/fastapi/app/sta2rest/sta_parser/lexer.py b/fastapi/app/sta2rest/sta_parser/lexer.py
new file mode 100644
index 0000000..06e86ef
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/lexer.py
@@ -0,0 +1,118 @@
+"""
+Lexer for the SensorThings API.
+
+Author: Filippo Finke
+"""
+
+import re
+import urllib.parse
+
+
+# Define the token types
+TOKEN_TYPES = {
+    'COUNT': r'\$count=',
+    'TOP': r'\$top=',
+    'SKIP': r'\$skip=',
+    'SELECT': r'\$select=',
+    'FILTER': r'\$filter=',
+    'EXPAND': r'\$expand=',
+    'ORDERBY': r'\$orderby=',
+    'SUBQUERY_SEPARATOR': r';',
+    'VALUE_SEPARATOR': r',',
+    'OPTIONS_SEPARATOR': r'&',
+    'EQUALS': r'\beq\b',
+    'AND': r'\band\b',
+    'OR': r'\bor\b',
+    'ORDER': r'\basc\b|\bdesc\b',
+    'BOOL': r'\btrue\b|\bfalse\b',
+    'IDENTIFIER': r'@{0,1}[a-zA-Z_][.a-zA-Z0-9_/]*',
+    'FLOAT': r'[0-9]+\.[0-9]+',
+    'INTEGER': r'[0-9]+',
+    'STRING': r"'[^']*'",
+    'LEFT_PAREN': r'\(',
+    'RIGHT_PAREN': r'\)',
+    'WHITESPACE': r'\s+',
+}
+
+
+class Token:
+    """A class representing a token."""
+
+    def __init__(self, type, value):
+        """
+        Initialize a new Token object.
+
+        Args:
+            type (str): The type of the token.
+            value (str): The value of the token.
+        """
+        self.type = type
+        self.value = value
+
+    def __str__(self):
+        """
+        Return a string representation of the token.
+
+        Returns:
+            str: The string representation of the token.
+        """
+        return f'Token({self.type}, {self.value})'
+
+
+class Lexer:
+    """A class for tokenizing SensorThings API queries."""
+
+    def __init__(self, text):
+        """
+        Initialize a new Lexer object.
+
+        Args:
+            text (str): The input text to be tokenized.
+        """
+        self.text = urllib.parse.unquote_plus(text)
+        self.tokens = self.tokenize()
+
+    def tokenize(self):
+        """
+        Tokenize the input text.
+
+        Returns:
+            list: A list of Token objects representing the tokens.
+        """
+        tokens = []
+        position = 0
+
+        while position < len(self.text):
+            match = None
+
+            for token_type, pattern in TOKEN_TYPES.items():
+                regex = re.compile(pattern)
+                match = regex.match(self.text, position)
+
+                if match:
+                    value = match.group(0)
+                    token = Token(token_type, value)
+                    tokens.append(token)
+                    position = match.end(0)
+                    break
+
+            if not match:
+                raise Exception(f'Invalid character at position {position}: {self.text[position]}')
+
+        return tokens
+
+    def __str__(self):
+        """
+        Return a string representation of the lexer.
+
+        Returns:
+            str: The string representation of the lexer.
+        """
+        return '\n'.join(str(token) for token in self.tokens)
+
+
+# Example usage
+if __name__ == '__main__':
+    text = '''$expand=Locations,Datastreams($select=id,name,unitOfMeasurement;$expand=ObservedProperty($select=name),Observations($select=result,phenomenonTime;$orderby=phenomenonTime desc;$top=1))'''
+    lexer = Lexer(text)
+    print(lexer)
diff --git a/fastapi/app/sta2rest/sta_parser/parser.py b/fastapi/app/sta2rest/sta_parser/parser.py
new file mode 100644
index 0000000..1fc8393
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/parser.py
@@ -0,0 +1,308 @@
+"""
+Parser for the SensorThings API query language.
+
+Author: Filippo Finke
+"""
+
+from .lexer import Lexer
+from . import ast
+
+class Parser:
+    def __init__(self, tokens):
+        """
+        Initialize the Parser instance.
+
+        Args:
+            tokens (list): List of tokens generated by the lexer.
+        """
+        self.tokens = tokens
+        self.current_token = None
+        self.next_token()
+
+    def next_token(self):
+        """
+        Get the next token from the list of tokens.
+
+        If there are no more tokens, set the current_token to None.
+        """
+        if self.tokens:
+            self.current_token = self.tokens.pop(0)
+        else:
+            self.current_token = None
+
+    def match(self, token_type):
+        """
+        Match the current token with the specified token type.
+
+        If the current token matches, move to the next token.
+        If the current token doesn't match, raise an exception.
+
+        Args:
+            token_type (str): The expected token type.
+
+        Raises:
+            Exception: If the current token doesn't match the expected type.
+        """
+        if self.current_token and self.current_token.type == token_type:
+            self.next_token()
+        else:
+            raise Exception(f"Expected '{token_type}', but found '{self.current_token.type}' ('{self.current_token.value}')")
+
+    def check_token(self, token_type):
+        """
+        Check if the current token matches the specified token type.
+
+        Args:
+            token_type (str): The token type to check.
+
+        Returns:
+            bool: True if the current token matches the token type, False otherwise.
+        """
+        return self.current_token and self.current_token.type == token_type
+
+    def parse_identifier_list(self):
+        """
+        Parse a list of identifiers.
+
+        Returns:
+            list: A list of ast.IdentifierNode objects representing the identifiers.
+        """
+        identifiers = []
+        identifiers.append(ast.IdentifierNode(self.current_token.value))
+        self.match('IDENTIFIER')
+        while self.check_token('VALUE_SEPARATOR'):
+            self.match('VALUE_SEPARATOR')
+            identifiers.append(ast.IdentifierNode(self.current_token.value))
+            self.match('IDENTIFIER')
+        return identifiers
+
+    def parse_filter(self, is_in_subquery=False):
+        """
+        Parse a filter expression.
+
+        Args:
+            is_in_subquery (bool, optional): Whether the filter is in a subquery. Defaults to False.
+
+        Returns:
+            ast.FilterNode: The parsed filter expression.
+        """
+        self.match('FILTER')
+        filter = ""
+
+        while not self.check_token('OPTIONS_SEPARATOR') and not self.check_token('SUBQUERY_SEPARATOR') and self.current_token != None and not (is_in_subquery and self.check_token('RIGHT_PAREN')):
+            filter += self.current_token.value
+            self.next_token()
+
+        return ast.FilterNode(filter)
+
+    def parse_expand(self):
+        """
+        Parse an expand expression.
+
+        Returns:
+            ast.ExpandNode: The parsed expand expression.
+        """
+        self.match('EXPAND')
+
+        identifiers = []
+        while self.current_token.type != 'OPTIONS_SEPARATOR':
+            identifier = ast.ExpandNodeIdentifier(self.current_token.value)
+            self.match('IDENTIFIER')
+
+            # Check if there is a subquery
+            if self.check_token('LEFT_PAREN'):
+                identifier.subquery = self.parse_subquery()
+
+            identifiers.append(identifier)
+
+            # Check if there is another option
+            if self.check_token('VALUE_SEPARATOR'):
+                self.match('VALUE_SEPARATOR')
+            else:
+                break
+
+        return ast.ExpandNode(identifiers)
+
+    def parse_select(self):
+        """
+        Parse a select expression.
+
+        Returns:
+            ast.SelectNode: The parsed select expression.
+        """
+        self.match('SELECT')
+        identifiers = self.parse_identifier_list()
+        return ast.SelectNode(identifiers)
+
+    def parse_orderby(self):
+        """
+        Parse an orderby expression.
+
+        Returns:
+            ast.OrderByNode: The parsed orderby expression.
+        """
+        self.match('ORDERBY')
+        # match identifiers separated by commas and check if there is a space and order
+        identifiers = []
+        while True:
+            identifier = self.current_token.value
+            self.match('IDENTIFIER')
+            order = 'asc'
+            if self.check_token('WHITESPACE'):
+                self.match('WHITESPACE')
+                order = self.current_token.value
+                self.match('ORDER')
+
+            identifiers.append(ast.OrderByNodeIdentifier(identifier, order))
+
+            if not self.check_token('VALUE_SEPARATOR'):
+                break
+
+            self.match('VALUE_SEPARATOR')
+
+        return ast.OrderByNode(identifiers)
+
+    def parse_skip(self):
+        """
+        Parse a skip expression.
+
+        Returns:
+            ast.SkipNode: The parsed skip expression.
+        """
+        self.match('SKIP')
+        count = int(self.current_token.value)
+        self.match('INTEGER')
+        return ast.SkipNode(count)
+
+    def parse_top(self):
+        """
+        Parse a top expression.
+
+        Returns:
+            ast.TopNode: The parsed top expression.
+
+        Raises:
+            Exception: If an integer value is expected but not found.
+        """
+        self.match('TOP')
+        if self.check_token("INTEGER"):
+            count = int(self.current_token.value)
+            self.match('INTEGER')
+            return ast.TopNode(count)
+        else:
+            raise Exception(f"Expected integer, but found '{self.current_token.type}' ('{self.current_token.value}')")
+
+    def parse_count(self):
+        """
+        Parse a count expression.
+
+        Returns:
+            ast.CountNode: The parsed count expression.
+        """
+        self.match('COUNT')
+        value = self.current_token.value.lower() == 'true'
+        self.match('BOOL')
+        return ast.CountNode(value)
+
+    def parse_subquery(self):
+        """
+        Parse a subquery.
+
+        Returns:
+            ast.QueryNode: The parsed subquery.
+        """
+        self.match('LEFT_PAREN')
+        select = None
+        filter = None
+        expand = None
+        orderby = None
+        skip = None
+        top = None
+        count = None
+
+        # continue parsing until we reach the end of the query
+        while True:
+            if self.current_token.type == 'SELECT':
+                select = self.parse_select()
+            elif self.current_token.type == 'FILTER':
+                filter = self.parse_filter(True)
+            elif self.current_token.type == 'EXPAND':
+                expand = self.parse_expand()
+            elif self.current_token.type == 'ORDERBY':
+                orderby = self.parse_orderby()
+            elif self.current_token.type == 'SKIP':
+                skip = self.parse_skip()
+            elif self.current_token.type == 'TOP':
+                top = self.parse_top()
+            elif self.current_token.type == 'COUNT':
+                count = self.parse_count()
+            else:
+                raise Exception(f"Unexpected token: {self.current_token.type}")
+
+            # check for other options
+            if self.check_token('SUBQUERY_SEPARATOR'):
+                self.match('SUBQUERY_SEPARATOR')
+            else:
+                break
+
+        self.match('RIGHT_PAREN')
+
+        return ast.QueryNode(select, filter, expand, orderby, skip, top, count, True)
+
+    def parse_query(self):
+        """
+        Parse a query.
+
+        Returns:
+            ast.QueryNode: The parsed query.
+        """
+        select = None
+        filter = None
+        expand = None
+        orderby = None
+        skip = None
+        top = None
+        count = None
+
+        # continue parsing until we reach the end of the query
+        while self.current_token != None:
+            if self.current_token.type == 'SELECT':
+                select = self.parse_select()
+            elif self.current_token.type == 'FILTER':
+                filter = self.parse_filter()
+            elif self.current_token.type == 'EXPAND':
+                expand = self.parse_expand()
+            elif self.current_token.type == 'ORDERBY':
+                orderby = self.parse_orderby()
+            elif self.current_token.type == 'SKIP':
+                skip = self.parse_skip()
+            elif self.current_token.type == 'TOP':
+                top = self.parse_top()
+            elif self.current_token.type == 'COUNT':
+                count = self.parse_count()
+            else:
+                raise Exception(f"Unexpected token: {self.current_token.type}")
+
+            if self.current_token != None:
+                self.match('OPTIONS_SEPARATOR')
+
+        return ast.QueryNode(select, filter, expand, orderby, skip, top, count)
+
+    def parse(self):
+        """
+        Parse the query.
+
+        Returns:
+            ast.QueryNode: The parsed query.
+        """
+        return self.parse_query()
+
+# Example usage
+if __name__ == '__main__':
+    text = '''$select=id,name,description,properties&$top=1000&$filter=properties/type eq 'station'&$expand=Locations,Datastreams($select=id,name,unitOfMeasurement;$expand=ObservedProperty($select=name),Observations($select=result,phenomenonTime;$orderby=phenomenonTime desc;$top=1))'''
+    lexer = Lexer(text)
+    tokens = lexer.tokens
+
+    parser = Parser(tokens)
+    ast = parser.parse()
+    print(ast)
diff --git a/fastapi/app/sta2rest/sta_parser/visitor.py b/fastapi/app/sta2rest/sta_parser/visitor.py
new file mode 100644
index 0000000..1259230
--- /dev/null
+++ b/fastapi/app/sta2rest/sta_parser/visitor.py
@@ -0,0 +1,69 @@
+"""
+Visitor for the SensorThings API.
+
+Author: Filippo Finke
+"""
+from . import ast
+
+class Visitor:
+    """
+    Visitor class for traversing the SensorThings API abstract syntax tree (AST).
+
+    This class provides a visit method that can be used to traverse the AST.
+    It dynamically calls visit methods based on the type of the node being visited.
+    If no specific visit method is available for a particular node type, it falls back
+    to the generic_visit method.
+
+    Attributes:
+        None
+
+    Methods:
+        visit(node): Traverse the AST by visiting each node in a depth-first manner.
+        generic_visit(node): Default visit method called when no specific visit method is available for a node type.
+    """
+    def visit(self, node):
+        """
+        Traverse the AST by visiting each node in a depth-first manner.
+
+        This method dynamically calls visit methods based on the type of the node being visited.
+
+        Args:
+            node: The current node being visited.
+
+        Returns:
+            The result of visiting the node.
+
+        Raises:
+            None
+        """
+        method_name = 'visit_' + type(node).__name__
+        visitor = getattr(self, method_name, self.generic_visit)
+        return visitor(node)
+
+    def generic_visit(self, node):
+        """
+        Default visit method called when no specific visit method is available for a node type.
+
+        This method is called for each field of the node and recursively visits any child nodes.
+
+        Args:
+            node: The current node being visited.
+
+        Returns:
+            None
+
+        Raises:
+            None
+        """
+        # get all the fields of the node
+        for field_name, field_value in vars(node).items():
+            # if the field is a list of nodes
+            if isinstance(field_value, list):
+                # visit all the nodes in the list
+                for item in field_value:
+                    if isinstance(item, ast.Node):
+                        self.visit(item)
+            # if the field is a node
+            elif isinstance(field_value, ast.Node):
+                # visit the node
+                self.visit(field_value)
diff --git a/fastapi/requirements.txt b/fastapi/requirements.txt
index 05b8cff..128bddc 100644
--- a/fastapi/requirements.txt
+++ b/fastapi/requirements.txt
@@ -7,5 +7,4 @@ pypika
 httpx
 asyncio
 postgrest
-sta_parser @ git+https://github.com/filippofinke/sta-parser@main
 odata-query
\ No newline at end of file
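The vendored modules keep their small __main__ self-tests, so the lexer/parser pair can be exercised on its own before the sta2rest glue in the next patch. The import path below assumes the fastapi/ app root is on the Python path, and the printed form comes from the PrettyPrinter in ast.py:

from app.sta2rest.sta_parser.lexer import Lexer
from app.sta2rest.sta_parser.parser import Parser

tokens = Lexer("$select=id,name&$top=10&$orderby=id desc").tokens
query = Parser(tokens).parse()  # a QueryNode with select/top/orderby filled in
print(query)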
From 5a05ecdec72aa6387d151bf460389c86fe1955aa Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 20:58:20 +0200
Subject: [PATCH 16/33] feat: use local module

---
 fastapi/app/sta2rest/sta2rest.py | 68 ++++++++++++++++----------------
 1 file changed, 34 insertions(+), 34 deletions(-)

diff --git a/fastapi/app/sta2rest/sta2rest.py b/fastapi/app/sta2rest/sta2rest.py
index d82b0dd..6bc6821 100644
--- a/fastapi/app/sta2rest/sta2rest.py
+++ b/fastapi/app/sta2rest/sta2rest.py
@@ -7,13 +7,13 @@ representations in a REST API.
 """
 
 import re
-import sta_parser.ast as ast
 from .filter_visitor import FilterVisitor
 from odata_query.grammar import ODataLexer
 from odata_query.grammar import ODataParser
-from sta_parser.lexer import Lexer
-from sta_parser.visitor import Visitor
-from sta_parser.parser import Parser
+from .sta_parser.ast import *
+from .sta_parser.lexer import Lexer
+from .sta_parser.visitor import Visitor
+from .sta_parser.parser import Parser
 
 # Create the OData lexer and parser
 odata_filter_lexer = ODataLexer()
@@ -34,7 +34,7 @@ def __init__(self, main_entity=None):
 
     This class provides a visitor to convert a STA query to a PostgREST query.
     """
-    def visit_IdentifierNode(self, node: ast.IdentifierNode):
+    def visit_IdentifierNode(self, node: IdentifierNode):
         """
         Visit an identifier node.
@@ -54,7 +54,7 @@ def visit_IdentifierNode(self, node: ast.IdentifierNode):
 
         return node.name
 
-    def visit_SelectNode(self, node: ast.SelectNode):
+    def visit_SelectNode(self, node: SelectNode):
         """
         Visit a select node.
@@ -68,7 +68,7 @@ def visit_SelectNode(self, node: ast.SelectNode):
         identifiers = ','.join([self.visit(identifier) for identifier in node.identifiers])
         return f'select={identifiers}'
 
-    def visit_FilterNode(self, node: ast.FilterNode):
+    def visit_FilterNode(self, node: FilterNode):
         """
         Visit a filter node.
@@ -88,7 +88,7 @@ def visit_FilterNode(self, node: ast.FilterNode):
         res = FilterVisitor().visit(ast)
         return res
 
-    def visit_OrderByNodeIdentifier(self, node: ast.OrderByNodeIdentifier):
+    def visit_OrderByNodeIdentifier(self, node: OrderByNodeIdentifier):
         """
         Visit an orderby node identifier.
@@ -102,7 +102,7 @@ def visit_OrderByNodeIdentifier(self, node: ast.OrderByNodeIdentifier):
         # Convert the identifier to the format name.order
         return f'{node.identifier}.{node.order}'
 
-    def visit_OrderByNode(self, node: ast.OrderByNode):
+    def visit_OrderByNode(self, node: OrderByNode):
         """
         Visit an orderby node.
@@ -115,7 +115,7 @@ def visit_OrderByNode(self, node: ast.OrderByNode):
         identifiers = ','.join([self.visit(identifier) for identifier in node.identifiers])
         return f'order={identifiers}'
 
-    def visit_SkipNode(self, node: ast.SkipNode):
+    def visit_SkipNode(self, node: SkipNode):
         """
         Visit a skip node.
@@ -127,7 +127,7 @@ def visit_SkipNode(self, node: ast.SkipNode):
         """
         return f'offset={node.count}'
 
-    def visit_TopNode(self, node: ast.TopNode):
+    def visit_TopNode(self, node: TopNode):
         """
         Visit a top node.
@@ -139,7 +139,7 @@ def visit_TopNode(self, node: ast.TopNode):
         """
         return f'limit={node.count}'
 
-    def visit_CountNode(self, node: ast.CountNode):
+    def visit_CountNode(self, node: CountNode):
         """
         Visit a count node.
@@ -151,7 +151,7 @@ def visit_CountNode(self, node: ast.CountNode):
         """
         return f'count={node.value}'
 
-    def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
+    def visit_ExpandNode(self, node: ExpandNode, parent=None):
         """
         Visit an expand node.
@@ -188,9 +188,9 @@ def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
             # check if we have a select, filter, orderby, skip, top or count in the subquery
             if expand_identifier.subquery.select:
                 if not select:
-                    select = ast.SelectNode([])
+                    select = SelectNode([])
                 identifiers = ','.join([self.visit(identifier) for identifier in expand_identifier.subquery.select.identifiers])
-                select.identifiers.append(ast.IdentifierNode(f'{expand_identifier.identifier}({identifiers})'))
+                select.identifiers.append(IdentifierNode(f'{expand_identifier.identifier}({identifiers})'))
             if expand_identifier.subquery.filter:
                 result = self.visit_FilterNode(expand_identifier.subquery.filter)
                 filter = prefix + result
@@ -210,7 +210,7 @@ def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
             # merge the results
             if result['select']:
                 if not select:
-                    select = ast.SelectNode([])
+                    select = SelectNode([])
                 select.identifiers.extend(result['select'].identifiers)
             if result['orderby']:
                 if orderby:
@@ -236,11 +236,11 @@ def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
         # If we don't have a subquery, we add the identifier to the select node
         if not expand_identifier.subquery or not expand_identifier.subquery.select:
             if not select:
-                select = ast.SelectNode([])
+                select = SelectNode([])
             default_columns = STA2REST.get_default_column_names(expand_identifier.identifier)
             # join default columns as single string
             default_columns = ','.join(default_columns)
-            select.identifiers.append(ast.IdentifierNode(f'{expand_identifier.identifier}({default_columns})'))
+            select.identifiers.append(IdentifierNode(f'{expand_identifier.identifier}({default_columns})'))
 
         # Return the converted expand node
         return {
@@ -252,7 +252,7 @@ def visit_ExpandNode(self, node: ast.ExpandNode, parent=None):
             'count': count
         }
 
-    def visit_QueryNode(self, node: ast.QueryNode):
+    def visit_QueryNode(self, node: QueryNode):
         """
         Visit a query node.
@@ -274,7 +274,7 @@ def visit_QueryNode(self, node: ast.QueryNode):
             # Merge the results with the other parts of the query
             if result['select']:
                 if not node.select:
-                    node.select = ast.SelectNode([])
+                    node.select = SelectNode([])
                 node.select.identifiers.extend(result['select'].identifiers)
             if result['orderby']:
                 query_parts.append(result['orderby'])
@@ -288,13 +288,13 @@ def visit_QueryNode(self, node: ast.QueryNode):
             query_parts.append(result['filter'])
 
         if not node.select:
-            node.select = ast.SelectNode([])
+            node.select = SelectNode([])
 
         # Add "@iot.id", "@iot.selfLink" and "*" to the select node
         # get default columns for main entity
         default_columns = STA2REST.get_default_column_names(self.main_entity)
         for column in default_columns:
-            node.select.identifiers.append(ast.IdentifierNode(column))
+            node.select.identifiers.append(IdentifierNode(column))
 
         # Check if we have a select, filter, orderby, skip, top or count in the query
         if node.select:
@@ -482,7 +482,7 @@ def convert_query(full_path: str) -> str:
 
         # Check if we have a query
-        query_ast = ast.QueryNode(None, None, None, None, None, None, None, False)
+        query_ast = QueryNode(None, None, None, None, None, None, None, False)
         if query:
             lexer = Lexer(query)
             tokens = lexer.tokenize()
@@ -506,17 +506,17 @@ def convert_query(full_path: str) -> str:
 
         if entities:
             if not query_ast.expand:
-                query_ast.expand = ast.ExpandNode([])
+                query_ast.expand = ExpandNode([])
 
             index = 0
 
             # Merge the entities with the query
             for entity in entities:
                 entity_name = entity[0]
-                sub_query = ast.QueryNode(None, None, None, None, None, None, None, True)
+                sub_query = QueryNode(None, None, None, None, None, None, None, True)
                 if entity[1]:
                     single_result = True
-                    sub_query.filter = ast.FilterNode(f"id eq {entity[1]}")
+                    sub_query.filter = FilterNode(f"id eq {entity[1]}")
 
                 # Check if we are the last entity
                 if index == len(entities) - 1:
@@ -524,8 +524,8 @@ def convert_query(full_path: str) -> str:
                     if uri['property_name']:
                         # Add the property name to the select node
                         if not sub_query.select:
-                            sub_query.select = ast.SelectNode([])
-                        sub_query.select.identifiers.append(ast.IdentifierNode(uri['property_name']))
+                            sub_query.select = SelectNode([])
+                        sub_query.select.identifiers.append(IdentifierNode(uri['property_name']))
 
                 # Merge the query with the subquery
                 if query_ast.select:
@@ -552,17 +552,17 @@ def convert_query(full_path: str) -> str:
                     sub_query.count = query_ast.count
                     query_ast.count = None
 
-                query_ast.expand.identifiers.append(ast.ExpandNodeIdentifier(entity_name, sub_query))
+                query_ast.expand.identifiers.append(ExpandNodeIdentifier(entity_name, sub_query))
                 index += 1
         else:
             if uri['property_name']:
                 if not query_ast.select:
-                    query_ast.select = ast.SelectNode([])
-                    query_ast.select.identifiers.append(ast.IdentifierNode(uri['property_name']))
+                    query_ast.select = SelectNode([])
+                    query_ast.select.identifiers.append(IdentifierNode(uri['property_name']))
 
         # Check if we have a filter in the query
         if main_entity_id:
-            query_ast.filter = ast.FilterNode(query_ast.filter.filter + f" and id eq {main_entity_id}" if query_ast.filter else f"id eq {main_entity_id}")
+            query_ast.filter = FilterNode(query_ast.filter.filter + f" and id eq {main_entity_id}" if query_ast.filter else f"id eq {main_entity_id}")
 
         if not entities:
             single_result = True
 
@@ -571,9 +571,9 @@ def convert_query(full_path: str) -> str:
 
         # Check if query has an expand but not a select and does not have sub entities
         if query_ast.expand and not query_ast.select and not entities:
             # Add default columns to the select node
             default_columns = STA2REST.get_default_column_names(main_entity)
-            query_ast.select = 
ast.SelectNode([]) + query_ast.select = SelectNode([]) for column in default_columns: - query_ast.select.identifiers.append(ast.IdentifierNode(column)) + query_ast.select.identifiers.append(IdentifierNode(column)) # Visit the query ast to convert it visitor = NodeVisitor(main_entity) From 69d27b2cb2348d04b6bd9412be1903658632266c Mon Sep 17 00:00:00 2001 From: Filippo Finke Date: Mon, 7 Aug 2023 21:10:43 +0200 Subject: [PATCH 17/33] feat: split endpoints --- fastapi/app/sta2pgrest.py | 30 -- fastapi/app/v1/api.py | 16 +- fastapi/app/v1/endpoints/contacts.py | 166 ----------- .../v1/endpoints/{general.py => insert.py} | 165 ----------- fastapi/app/v1/endpoints/observations.py | 279 ------------------ .../app/v1/endpoints/observed_properties.py | 136 --------- fastapi/app/v1/endpoints/read.py | 173 +++++++++++ fastapi/app/v1/endpoints/sensor_types.py | 136 --------- fastapi/app/v1/endpoints/sensors.py | 224 -------------- 9 files changed, 178 insertions(+), 1147 deletions(-) delete mode 100644 fastapi/app/sta2pgrest.py delete mode 100644 fastapi/app/v1/endpoints/contacts.py rename fastapi/app/v1/endpoints/{general.py => insert.py} (58%) delete mode 100644 fastapi/app/v1/endpoints/observations.py delete mode 100644 fastapi/app/v1/endpoints/observed_properties.py create mode 100644 fastapi/app/v1/endpoints/read.py delete mode 100644 fastapi/app/v1/endpoints/sensor_types.py delete mode 100644 fastapi/app/v1/endpoints/sensors.py diff --git a/fastapi/app/sta2pgrest.py b/fastapi/app/sta2pgrest.py deleted file mode 100644 index 82cb23d..0000000 --- a/fastapi/app/sta2pgrest.py +++ /dev/null @@ -1,30 +0,0 @@ -# This tools convert STA requests to postgREST requests. - -def stqa2pgr(sta_obj,sta_par): - pgr_obj = { - 'select': None, - 'limit': None, - 'properties': None, - 'order': None - } - - if sta_par.select: - for obj in select: - pgr_obj['select'][obj]: None - if sta_par.top: - pgr_obj['limit'] = sta_par.top - - if sta_par.filter: - - - sta_par = { - self.expand = expand, - self.select = select, - self.orderby = orderby, - self.top = top, - self.skip = skip, - self.count = count, - self.filter = filter, - self.resultFormat = resultFormat - self.asofsystime = as_of_system_time - } diff --git a/fastapi/app/v1/api.py b/fastapi/app/v1/api.py index 2761ae3..073e318 100644 --- a/fastapi/app/v1/api.py +++ b/fastapi/app/v1/api.py @@ -1,15 +1,9 @@ -from app.v1.endpoints import observed_properties as op -from app.v1.endpoints import sensors -from app.v1.endpoints import contacts -from app.v1.endpoints import observations -from app.v1.endpoints import general +from app.v1.endpoints import read +from app.v1.endpoints import insert from fastapi import FastAPI v1 = FastAPI() -v1.include_router(general.v1) - -#v1.include_router(op.v1) -# v1.include_router(sensors.v1) -# v1.include_router(contacts.v1) -#v1.include_router(observations.v1) \ No newline at end of file +# Register the endpoints +v1.include_router(read.v1) +v1.include_router(insert.v1) diff --git a/fastapi/app/v1/endpoints/contacts.py b/fastapi/app/v1/endpoints/contacts.py deleted file mode 100644 index 6cbbd6b..0000000 --- a/fastapi/app/v1/endpoints/contacts.py +++ /dev/null @@ -1,166 +0,0 @@ -from app.db.db import get_pool -from app.models.contact import Contact -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - -###################### -# CONTACTS TYPES # -###################### -@v1.get("/contact_types/") -async def get_contact_types(pgpool=Depends(get_pool)): - try: - async 
with pgpool.acquire() as conn: - result = await conn.fetchrow( - "SELECT enum_range(NULL:: public.contact_type) AS contact_types" - ) - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - except Exception as e: - return str(e) -################ -# CONTACTS # -################ -@v1.post("/contacts/") -async def create_contact(contact: Contact, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - INSERT INTO public.contact - (type, person, telephone, fax, email, web, - address, city, admin_area, postal_code, country) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING id - """, - contact.type, - contact.person, - contact.telephone, - contact.fax, - contact.email, - contact.web, - contact.address, - contact.city, - contact.admin_area, - contact.postal_code, - contact.country, - ) - if result: - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/contacts/{contact_id}") -async def get_contact(contact_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - SELECT * FROM public.contact - WHERE id = $1 - """, - contact_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/contacts/") -async def get_contacts(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch( - "SELECT * FROM public.contact" - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/contacts/{contact_id}") -async def update_contact(contact_id: int, contact: Contact, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.contact - SET type = $1, person = $2, telephone = $3, fax = $4, email = $5, web = $6, address = $7, city = $8, admin_area = $9, postal_code = $10, country = $11 - WHERE id = $12 - RETURNING id""", - contact.type, - contact.person, - contact.telephone, - contact.fax, - contact.email, - contact.web, - contact.address, - contact.city, - contact.admin_area, - contact.postal_code, - contact.country, - contact_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/contacts/{contact_id}") -async def delete_contact(contact_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""DELETE FROM public.contact - WHERE id = $1 - RETURNING id""", - contact_id, - ) - if result: - return 
Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/general.py b/fastapi/app/v1/endpoints/insert.py similarity index 58% rename from fastapi/app/v1/endpoints/general.py rename to fastapi/app/v1/endpoints/insert.py index 8793772..ee2f779 100644 --- a/fastapi/app/v1/endpoints/general.py +++ b/fastapi/app/v1/endpoints/insert.py @@ -6,171 +6,6 @@ from app.sta2rest import sta2rest v1 = APIRouter() - -tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"] -serverSettings = { - "conformance": [ - "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data", - ], -} - -class PostgRESTError(Exception): - pass - - -def __flatten_navigation_links(row): - if row and "@iot.navigationLink" in row: - # merge all the keys from the navigationLink - row.update(row["@iot.navigationLink"]) - del row["@iot.navigationLink"] - - # check if skip@iot.navigationLink is present and remove it - if "skip@iot.navigationLink" in row: - del row["skip@iot.navigationLink"] - -def __flatten_expand_entity(data): - # Check if it is an array - if not isinstance(data, list): - # throw an error - raise PostgRESTError(data) - - # check if data is empty - if not data: - return data - - # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module - if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: - # Get the value of the first key - key_name = list(data[0].keys())[0] - data = data[0][key_name] - - return data - -def __create_ref_format(data): - - rows = [data] - - # Check if it is an array - if isinstance(data, list): - key_name = list(data[0].keys())[0] - # Check if the key is in an ENTITY_MAPPING from the sta2rest module - if key_name in sta2rest.STA2REST.ENTITY_MAPPING: - rows = data[0][key_name] - if not isinstance(rows, list): - rows = [rows] - else: - rows = data - - data = { - "value": [] - } - - for row in rows: - data["value"].append({ - "@iot.selfLink": row["@iot.selfLink"] - }) - - return data - -def __handle_root(request: Request): - # Handle the root path - value = [] - # append the domain to the path for each table - for table in tables: - value.append( - { - "name": table, - "url": - request.url._url + table, - } - ) - - response = { - "value": value, - "serverSettings": serverSettings, - } - return response - -@v1.api_route("/{path_name:path}", methods=["GET"]) -async def catch_all_get(request: Request, path_name: str): - if not path_name: - # Handle the root path - return __handle_root(request) - - try: - # get full path from request - full_path = request.url.path - if request.url.query: - full_path += "?" 
+ request.url.query - - result = sta2rest.STA2REST.convert_query(full_path) - - path = result["url"] - - print("original:\t", full_path) - print("url:\t\t", path) - - url = "http://postgrest:3000" + path - - async with httpx.AsyncClient() as client: - r = await client.get(url) - data = r.json() - - # print r status - if r.status_code != 200: - raise PostgRESTError(data["message"]) - - if result['single_result']: - data = __flatten_expand_entity(data) - - # check if the result is an array - if isinstance(data, list): - data = data[0] - - if result['value']: - # get the value of the first key - data = data[list(data.keys())[0]] - elif result['ref']: - data = __create_ref_format(data) - elif "@iot.navigationLink" in data: - __flatten_navigation_links(data) - - elif result['ref']: - data = __create_ref_format(data) - else: - data = __flatten_expand_entity(data) - # check if the result is an array - if not isinstance(data, list): - data = [data] if data else [] - - for row in data: - __flatten_navigation_links(row) - - data = { - "value": data - } - return data - except PostgRESTError as pge: - traceback.print_exc() - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={ - "code": 404, - "type": "error", - "message": str(pge) - } - ) - except Exception as e: - # print stack trace - traceback.print_exc() - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content={ - "code": 400, - "type": "error", - "message": str(e) - } - ) def get_result_type_and_column(input_string): try: diff --git a/fastapi/app/v1/endpoints/observations.py b/fastapi/app/v1/endpoints/observations.py deleted file mode 100644 index 9206384..0000000 --- a/fastapi/app/v1/endpoints/observations.py +++ /dev/null @@ -1,279 +0,0 @@ -from app.db.db import get_pool -from app.models.observation import Observation -from app.models.query_parameters import QueryParameters -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse -from fastapi.encoders import jsonable_encoder - -v1 = APIRouter() - -from starlette.requests import Request -from starlette.responses import StreamingResponse -from starlette.background import BackgroundTask -from datetime import timedelta, datetime -import httpx -import json -import ujson - - -################### -# OBSERVATION # -################### -# from typing import List - -@v1.get("/Observations") -async def get_observations(as_of_system_time: datetime | None = None): - try: - url = "" - print("GO IT!!!") - print("s_of_system_time", as_of_system_time) - async with httpx.AsyncClient() as client: - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - if not as_of_system_time: - result = await client.get( - 'http://postgrest:3000/Observation', - params={ - 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, featuresofinterest_id', - 'order': 'id.asc' - } - ) - else: - print(f'cs.[{as_of_system_time.isoformat()}, {as_of_system_time.isoformat()}}}]'.replace('+','%2B')) - result = await client.get( - 'http://postgrest:3000/Observation_traveltime' + - '?limit=100&system_time_validity=' + - f'cs.[{as_of_system_time.isoformat()}, {as_of_system_time.isoformat()}}}]'.replace('+','%2B')+ - '&select=id,phenomenonTime,resultTime,result,resultQuality,validTime,parameters,datastream_id,featuresofinterest_id' + - '&order=id.asc' - ) - print('result', ujson.loads(result.text)) - - if result and result.status_code == 200: - print('GOT!') - return 
UJSONResponse(status_code=status.HTTP_200_OK, content=ujson.loads(result.text)) - # return UJSONResponse(status_code=status.HTTP_200_OK, content=result.text) - except Exception as e: - print('except', e) - return str(e) - - -@v1.get("/Observations({id})") -async def get_observation(id: int, query_options: QueryParameters=Depends()): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print(id) - async with httpx.AsyncClient() as client: - # print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.get( - 'http://postgrest:3000/Observation', - params={ - 'id': f'eq.{id}', - 'limit':100, - 'select':'id, phenomenonTime, resultTime, result, resultQuality, validTime, parameters, datastream_id, featuresofinterest_id', - 'order': 'id.asc' - }, - headers={'Accept': 'application/vnd.pgrst.object+json'} - ) - print('result', result) - if result and result.status_code == 200: - print('CREATED!') - # return UJSONResponse(status_code=status.HTTP_201_CREATED, content=result.text) - return UJSONResponse(status_code=status.HTTP_200_OK, content=ujson.loads(result.text)) - # return ujson.loads(result.text) - except Exception as e: - print('except', e) - return str(e) - -@v1.post("/Observation") -async def insert_observation(observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - async with httpx.AsyncClient() as client: - print(observation.dict(exclude_none=True)) - print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.post(url, data=observation.dict(exclude_none=True)) - print('result', result) - if result and result.status_code == 201: - print('CREATED!') - return UJSONResponse(status_code=status.HTTP_201_CREATED, content='result') - except Exception as e: - print('except', e) - return str(e) - -@v1.put("/Observation({id})") -async def insert_observation(id: int, observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print('ID:', id, {'id': f'eq.{id}'}) - async with httpx.AsyncClient() as client: - # print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.put( - url, - data=observation.dict(exclude_none=True), - params={'id': f'eq.{id}'} - ) - print('result', result) - if result and result.status_code == 204: - print('UPDATED!') - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - print('except', e) - return str(e) - -@v1.patch("/Observation({id})") -async def insert_observation(id: int, observation: Observation): - try: - url = "http://postgrest:3000/Observation" - print("GO IT!!!") - print('ID:', id, {'id': f'eq.{id}'}) - async with httpx.AsyncClient() as client: - print(observation.dict(exclude_none=True)) - # print(dict(observation)) - # result = await client.post(url, data=json.dumps(dict(observation),default=str )) - result = await client.put( - url, - data=observation.dict(exclude_none=True), - params={'id': f'eq.{id}'} - ) - print('result', result) - if result and result.status_code == 204: - print('UPDATED!') - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - print('except', e) - return str(e) - -# @v1.get("/contacts/{contact_id}") -# async def get_contact(contact_id: int, 
pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# f""" -# SELECT * FROM public.contact -# WHERE id = $1 -# """, -# contact_id, -# ) -# if result: -# return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.get("/contacts/") -# async def get_contacts(pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetch( -# "SELECT * FROM public.contact" -# ) -# if result: -# return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.put("/contacts/{contact_id}") -# async def update_contact(contact_id: int, contact: Contact, pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# """ UPDATE public.contact -# SET type = $1, person = $2, telephone = $3, fax = $4, email = $5, web = $6, address = $7, city = $8, admin_area = $9, postal_code = $10, country = $11 -# WHERE id = $12 -# RETURNING id""", -# contact.type, -# contact.person, -# contact.telephone, -# contact.fax, -# contact.email, -# contact.web, -# contact.address, -# contact.city, -# contact.admin_area, -# contact.postal_code, -# contact.country, -# contact_id, -# ) -# if result: -# return Response(status_code=status.HTTP_204_NO_CONTENT) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# @v1.delete("/contacts/{contact_id}") -# async def delete_contact(contact_id: int, pgpool=Depends(get_pool)): -# try: -# async with pgpool.acquire() as conn: -# result = await conn.fetchrow( -# f"""DELETE FROM public.contact -# WHERE id = $1 -# RETURNING id""", -# contact_id, -# ) -# if result: -# return Response(status_code=status.HTTP_204_NO_CONTENT) -# else: -# result = { -# "exceptionReport": { -# "code": "InvalidParameterValue", -# "locator": "id" -# } -# } -# return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) -# except Exception as e: -# return str(e) - - -# client = httpx.AsyncClient(base_url="http://localhost:3000/") - - -# async def _reverse_proxy(request: Request): -# url = httpx.URL(path=request.url.path, -# query=request.url.query.encode("utf-8")) -# rp_req = client.build_request(request.method, url, -# headers=request.headers.raw, -# content=await request.body()) -# rp_resp = await client.send(rp_req, stream=True) -# return StreamingResponse( -# rp_resp.aiter_raw(), -# status_code=rp_resp.status_code, -# headers=rp_resp.headers, -# background=BackgroundTask(rp_resp.aclose), -# ) - -# app.add_route("/titles/{path:path}", -# _reverse_proxy, ["GET", "POST"]) diff --git a/fastapi/app/v1/endpoints/observed_properties.py b/fastapi/app/v1/endpoints/observed_properties.py deleted file mode 100644 index 02578fb..0000000 --- a/fastapi/app/v1/endpoints/observed_properties.py +++ /dev/null @@ -1,136 +0,0 @@ -from app.db.db import get_pool -from 
app.models.observed_property import ObservedProperty -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - -########################### -# OBSERVED PROPERTIES # -########################### -@v1.post("/ObservedProperties") -async def create_observed_property( - observed_property: ObservedProperty, - pgpool=Depends(get_pool)): - print(observed_property) - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - INSERT INTO public.observed_property - (name, description, definition, constraint) - VALUES($1, $2, $3, $4) - """, - observed_property.name, - observed_property.description, - observed_property.definition, - observed_property.constraint - ) - print(result) - if result: - result = { - "@iot.selfLink": "http://localhost/v.1./ObservedProperties({})".format( - result[0][0] - ) - } - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/ObservedProperties({observed_property_id})") -async def get_observed_property(observed_property_id: str, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - SELECT * FROM public.observed_property - WHERE id::text = $1 - """, - observed_property_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - return str(e) - - -@v1.get("/ObservedProperties") -async def get_observed_properties(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch( - "SELECT * FROM public.observed_property" - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - return UJSONResponse(status_code=status.HTTP_204_NO_CONTENT) - except Exception as e: - return str(e) - - -@v1.put("/ObservedProperties({observed_property_id})") -async def update_observed_properties(observed_property_id: int, observed_property: ObservedProperty, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f""" - UPDATE public.observed_property - SET name = $1, description = $2, definition= $3, constraint = $4 - WHERE id = $5 - RETURNING id - """, - observed_property.name, - observed_property.description, - observed_property.definition, - observed_property.constraint, - observed_property_id - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/ObservedProperties({observed_property_id})") -async def delete_observed_property(observed_property_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """DELETE FROM public.observed_property - WHERE id = $1 - RETURNING *""", - observed_property_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - 
return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/read.py b/fastapi/app/v1/endpoints/read.py new file mode 100644 index 0000000..d9b6669 --- /dev/null +++ b/fastapi/app/v1/endpoints/read.py @@ -0,0 +1,173 @@ +import httpx +import traceback +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse +from fastapi import status +from app.sta2rest import sta2rest + +v1 = APIRouter() + +tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"] +serverSettings = { + "conformance": [ + "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data", + ], +} + +class PostgRESTError(Exception): + pass + + +def __flatten_navigation_links(row): + if row and "@iot.navigationLink" in row: + # merge all the keys from the navigationLink + row.update(row["@iot.navigationLink"]) + del row["@iot.navigationLink"] + + # check if skip@iot.navigationLink is present and remove it + if "skip@iot.navigationLink" in row: + del row["skip@iot.navigationLink"] + +def __flatten_expand_entity(data): + # Check if it is an array + if not isinstance(data, list): + # throw an error + raise PostgRESTError(data) + + # check if data is empty + if not data: + return data + + # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module + if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: + # Get the value of the first key + key_name = list(data[0].keys())[0] + data = data[0][key_name] + + return data + +def __create_ref_format(data): + + rows = [data] + + # Check if it is an array + if isinstance(data, list): + key_name = list(data[0].keys())[0] + # Check if the key is in an ENTITY_MAPPING from the sta2rest module + if key_name in sta2rest.STA2REST.ENTITY_MAPPING: + rows = data[0][key_name] + if not isinstance(rows, list): + rows = [rows] + else: + rows = data + + data = { + "value": [] + } + + for row in rows: + data["value"].append({ + "@iot.selfLink": row["@iot.selfLink"] + }) + + return data + +def __handle_root(request: Request): + # Handle the root path + value = [] + # append the domain to the path for each table + for table in tables: + value.append( + { + "name": table, + "url": + request.url._url + table, + } + ) + + response = { + "value": value, + "serverSettings": serverSettings, + } + return response + +@v1.api_route("/{path_name:path}", methods=["GET"]) +async def catch_all_get(request: Request, path_name: str): + if not path_name: + # Handle the root path + return __handle_root(request) + + try: + # get full path from request + full_path = request.url.path + if request.url.query: + full_path += "?" 
+ request.url.query + + result = sta2rest.STA2REST.convert_query(full_path) + + path = result["url"] + + print("original:\t", full_path) + print("url:\t\t", path) + + url = "http://postgrest:3000" + path + + async with httpx.AsyncClient() as client: + r = await client.get(url) + data = r.json() + + # print r status + if r.status_code != 200: + raise PostgRESTError(data["message"]) + + if result['single_result']: + data = __flatten_expand_entity(data) + + # check if the result is an array + if isinstance(data, list): + data = data[0] + + if result['value']: + # get the value of the first key + data = data[list(data.keys())[0]] + elif result['ref']: + data = __create_ref_format(data) + elif "@iot.navigationLink" in data: + __flatten_navigation_links(data) + + elif result['ref']: + data = __create_ref_format(data) + else: + data = __flatten_expand_entity(data) + # check if the result is an array + if not isinstance(data, list): + data = [data] if data else [] + + for row in data: + __flatten_navigation_links(row) + + data = { + "value": data + } + return data + except PostgRESTError as pge: + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={ + "code": 404, + "type": "error", + "message": str(pge) + } + ) + except Exception as e: + # print stack trace + traceback.print_exc() + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content={ + "code": 400, + "type": "error", + "message": str(e) + } + ) diff --git a/fastapi/app/v1/endpoints/sensor_types.py b/fastapi/app/v1/endpoints/sensor_types.py deleted file mode 100644 index ca10ff1..0000000 --- a/fastapi/app/v1/endpoints/sensor_types.py +++ /dev/null @@ -1,136 +0,0 @@ -from app.db.db import get_pool -from app.models.sensor import SensorType, Sensor -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - - -################ -# SENSOR TYPES # -################ -@v1.post("/sensor_types/") -async def create_sensor_type(sensorType: SensorType, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""INSERT INTO public.sensor_type - (description, metadata) - VALUES($1, $2) - RETURNING id""", - sensorType.description, - sensorType.metadata, - ) - if result: - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/sensor_types/{sensor_type_id}") -async def get_sensor_type(sensor_type_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""SELECT * FROM public.sensor_type - WHERE id = $1""", - sensor_type_id, - ) - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=dict(result)) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/sensor_types/") -async def get_sensor_types(pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetch("SELECT * FROM public.sensor_type") - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=[dict(r) for r in result]) - else: - result = { - 
"exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/sensor_types/{sensor_type_id}") -async def update_sensor_type(sensor_type_id: int, sensor_type: SensorType, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.sensor_type - SET description = $1, metadata = $2 - WHERE id = $3 - RETURNING *""", - sensor_type.description, - sensor_type.metadata, - sensor_type_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/sensor_types/{sensor_type_id}") -async def delete_sensor_type(sensor_type_id: int, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """ UPDATE public.sensor - SET sensor_type_id = null - WHERE sensor_type_id = $1 - RETURNING *""", - sensor_type_id, - ) - result = await conn.fetchrow( - f"""DELETE FROM public.sensor_type - WHERE id = $1 - RETURNING *""", - sensor_type_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/sensors.py b/fastapi/app/v1/endpoints/sensors.py deleted file mode 100644 index ac2e68b..0000000 --- a/fastapi/app/v1/endpoints/sensors.py +++ /dev/null @@ -1,224 +0,0 @@ -import json, uuid -from app.db.db import get_pool -from app.models.sensor import SensorType, Sensor -from app.models.contact import Contact -from app.models.query_parameters import QueryParameters -from app.v1.endpoints.contacts import * -from app.v1.endpoints.sensor_types import * -from fastapi import APIRouter, Depends, Response, status -from fastapi.responses import UJSONResponse - - -v1 = APIRouter() - - -################ -# SENSORS # -################ -@v1.post("/Sensors/") -async def create_sensor(sensor: Sensor, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - contact_id = None - sensorType_id = None - ID = None - if (type(sensor.contact)) is Contact: - result = await create_contact(sensor.contact, pgpool) - contact_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.contact)) is int: - contact_id = sensor.contact - if (type(sensor.sensor_type)) is SensorType: - result = await create_sensor_type(sensor.sensor_type, pgpool) - sensorType_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.sensor_type)) is int: - sensorType_id = sensor.sensor_type - - ID = str(uuid.uuid4()) - for key in dict(sensor): - if key == "id": - ID = sensor.id - result = await conn.fetchrow( - f"""INSERT INTO public.sensor - (id, name, description, encoding_type, sampling_time_resolution, - acquisition_time_resolution, sensor_type_id) - VALUES($1, $2, $3, $4, $5, $6, $7) - RETURNING *""", - ID, - sensor.name, - sensor.description, - sensor.encoding_type, - sensor.sampling_time_resolution, - sensor.acquisition_time_resolution, - sensorType_id - ) - - tmp_result = result - if contact_id is not None: - 
sensor_id = dict(result)["id"] - result = await conn.fetchrow( - f"""INSERT INTO public.sensor_contact - (sensor_id, contact_id) - VALUES($1, $2) - ON CONFLICT DO NOTHING - RETURNING *""", - sensor_id, - contact_id, - ) - if tmp_result: - for key in dict(tmp_result): - if key == "id": - tmp_result = {"id": str(dict(tmp_result)[key])} - return UJSONResponse(status_code=status.HTTP_201_CREATED, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=result) - except Exception as e: - return str(e) - - -@v1.get("/Sensors/{sensor_id}") -async def get_sensor(sensor_id: str, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - f"""SELECT * FROM public.sensor - WHERE id::text = $1""", - sensor_id, - ) - if result: - tmp_result = dict(result) - for key in tmp_result: - if key == "id" or key == "sampling_time_resolution" or key == "acquisition_time_resolution": - tmp_result[key] = str(tmp_result[key]) - return UJSONResponse(status_code=status.HTTP_200_OK, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.get("/Sensors/") -async def get_sensors(pgpool=Depends(get_pool), query_options: QueryParameters=Depends()): - try: - sql = "SELECT * FROM public.sensor" - - # return UJSONResponse(status_code=status.HTTP_200_OK, content=query_options.expand) - return UJSONResponse(status_code=status.HTTP_200_OK, content= query_options.to_sql(element='sensor')) - - async with pgpool.acquire() as conn: - result = await conn.fetch("SELECT * FROM public.sensor") - tmp_result = [dict(r) for r in result] - for key in tmp_result: - key["id"] = str(key["id"]) - key["sampling_time_resolution"] = str(key["sampling_time_resolution"]) - key["acquisition_time_resolution"] = str(key["acquisition_time_resolution"]) - - if result: - return UJSONResponse(status_code=status.HTTP_200_OK, content=tmp_result) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.put("/Sensors({sensor_id})") -async def update_sensor(sensor_id: str, sensor: Sensor, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - contact_id = None - sensorType_id = None - if (type(sensor.contact)) is Contact: - result = await create_contact(sensor.contact, pgpool) - contact_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.contact)) is int: - contact_id = sensor.contact - if (type(sensor.sensor_type)) is SensorType: - result = await create_sensor_type(sensor.sensor_type, pgpool) - sensorType_id = json.loads(result.body.decode("utf-8"))["id"] - if (type(sensor.sensor_type)) is int: - sensorType_id = sensor.sensor_type - result = await conn.fetchrow( - """ UPDATE public.sensor - SET name = $1, description = $2, encoding_type = $3, sampling_time_resolution = $4, acquisition_time_resolution = $5, sensor_type_id = $6 - WHERE id::text = $7 - RETURNING *""", - sensor.name, - sensor.description, - sensor.encoding_type, - sensor.sampling_time_resolution, - sensor.acquisition_time_resolution, - sensorType_id, - sensor_id, - ) - tmp_result = result - if 
contact_id is not None: - sensor_id = dict(result)["id"] - result = await conn.fetchrow( - f"""INSERT INTO public.sensor_contact - (sensor_id, contact_id) - VALUES($1, $2) - ON CONFLICT DO NOTHING - RETURNING *""", - sensor_id, - contact_id, - ) - - if tmp_result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) - - -@v1.delete("/Sensors({sensor_id})") -async def delete_sensor(sensor_id: str, pgpool=Depends(get_pool)): - try: - async with pgpool.acquire() as conn: - result = await conn.fetchrow( - """DELETE FROM public.sensor_contact - WHERE sensor_id::text = $1 - RETURNING *""", - sensor_id, - ) - result = await conn.fetchrow( - f"""DELETE FROM public.sensor - WHERE id::text = $1 - RETURNING *""", - sensor_id, - ) - if result: - return Response(status_code=status.HTTP_204_NO_CONTENT) - else: - result = { - "exceptionReport": { - "code": "InvalidParameterValue", - "locator": "id" - } - } - return UJSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=result) - except Exception as e: - return str(e) \ No newline at end of file From 8d9fbab2c41d1238024abfbf31761d2de226c414 Mon Sep 17 00:00:00 2001 From: Filippo Finke Date: Mon, 7 Aug 2023 21:16:30 +0200 Subject: [PATCH 18/33] feat: add utils --- fastapi/app/utils/__init__.py | 0 fastapi/app/utils/utils.py | 204 +++++++++++++++++++++++++++++ fastapi/app/v1/endpoints/insert.py | 147 +-------------------- fastapi/app/v1/endpoints/read.py | 66 +--------- 4 files changed, 207 insertions(+), 210 deletions(-) create mode 100644 fastapi/app/utils/__init__.py create mode 100644 fastapi/app/utils/utils.py diff --git a/fastapi/app/utils/__init__.py b/fastapi/app/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py new file mode 100644 index 0000000..079cc3e --- /dev/null +++ b/fastapi/app/utils/utils.py @@ -0,0 +1,204 @@ +import httpx +from app.sta2rest import sta2rest + +class PostgRESTError(Exception): + pass + +def get_result_type_and_column(input_string): + try: + value = eval(str(input_string)) + except (SyntaxError, NameError): + result_type = 0 + column_name = "resultString" + else: + if isinstance(value, int): + result_type = 1 + column_name = "resultInteger" + elif isinstance(value, float): + result_type = 2 + column_name = "resultDouble" + elif isinstance(value, dict): + result_type = 4 + column_name = "resultJSON" + else: + result_type = None + column_name = None + + if input_string == "true" or input_string == "false": + result_type = 3 + column_name = "resultBoolean" + + if result_type is not None: + return result_type, column_name + else: + raise Exception("Cannot cast result to a valid type") + +def flatten_entity_body(entity, body, name = None): + + # check if entity is an array + if isinstance(entity, list): + # loop trought all the values + for item in entity: + # create the entity + flatten_entity_body(item, body, name) + return body + + for key in list(entity): + if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: + converted_key = sta2rest.STA2REST.convert_entity(key) + body[converted_key] = entity[key] + + flatten_entity_body(entity[key], body, converted_key) + + if name: + if isinstance(body[name], list): + for item in body[name]: + item[converted_key] = { + "@iot.id": entity[key]["@iot.id"] if 
"@iot.id" in entity[key] else None + } + else: + body[name][converted_key] = { + "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None + } + + return body + +def format_entity_body(entity_body): + # Check if entity_body is an array + if isinstance(entity_body, list): + # Loop trought all the values + for i in range(len(entity_body)): + # Create the entity + entity_body[i] = format_entity_body(entity_body[i]) + return entity_body + + formatted_body = {} + # Loop trought all the keys in the body + for key in entity_body: + if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: + if "@iot.id" in entity_body[key]: + new_key = sta2rest.STA2REST.convert_to_database_id(key) + formatted_body[new_key] = entity_body[key]["@iot.id"] + elif key == "result": + value = entity_body["result"] + result_type, column_name = get_result_type_and_column(value) + formatted_body[column_name] = value + formatted_body["resultType"] = result_type + else: + formatted_body[key] = entity_body[key] + + return formatted_body + +async def create_entity(entity_name, body): + + entity_body = { + entity_name: body + } + + body = flatten_entity_body(entity_body, entity_body) + + + # Creation order + created_ids = {} + creation_order = ["Location","Thing", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"] + for entity_name in creation_order: + if entity_name in body: + + formatted_body = format_entity_body(body[entity_name]) + + if "@iot.id" in formatted_body: + continue + + # check if the entity has sub entities and if they are empty check if id is present + if isinstance(formatted_body, list): + for item in formatted_body: + for key in item: + if key in created_ids: + item[key] = created_ids[key] + else: + for key in formatted_body: + # check if key is present in created_ids + if key in created_ids: + formatted_body[key] = created_ids[key] + + url = "http://postgrest:3000/" + entity_name + + print("Creating entity: ", entity_name) + print("Body: ", formatted_body) + + async with httpx.AsyncClient() as client: + # post to postgrest + r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"}) + + # get response + result = r.json() + + # print r status + if r.status_code != 201: + raise PostgRESTError(result["message"]) + + # get first element of the result + result = result[0] + + # get the id of the created entity + id_key = sta2rest.STA2REST.convert_to_database_id(entity_name) + created_ids[id_key] = result["id"] + + print("Created entity: ", id_key, " with id: ", result["id"]) + + return None + + +def __flatten_navigation_links(row): + if row and "@iot.navigationLink" in row: + # merge all the keys from the navigationLink + row.update(row["@iot.navigationLink"]) + del row["@iot.navigationLink"] + + # check if skip@iot.navigationLink is present and remove it + if "skip@iot.navigationLink" in row: + del row["skip@iot.navigationLink"] + +def __flatten_expand_entity(data): + # Check if it is an array + if not isinstance(data, list): + # throw an error + raise PostgRESTError(data) + + # check if data is empty + if not data: + return data + + # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module + if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: + # Get the value of the first key + key_name = list(data[0].keys())[0] + data = data[0][key_name] + + return data + +def __create_ref_format(data): + + rows = [data] + + # Check if it is an array + if 
isinstance(data, list): + key_name = list(data[0].keys())[0] + # Check if the key is in an ENTITY_MAPPING from the sta2rest module + if key_name in sta2rest.STA2REST.ENTITY_MAPPING: + rows = data[0][key_name] + if not isinstance(rows, list): + rows = [rows] + else: + rows = data + + data = { + "value": [] + } + + for row in rows: + data["value"].append({ + "@iot.selfLink": row["@iot.selfLink"] + }) + + return data \ No newline at end of file diff --git a/fastapi/app/v1/endpoints/insert.py b/fastapi/app/v1/endpoints/insert.py index ee2f779..dd9135f 100644 --- a/fastapi/app/v1/endpoints/insert.py +++ b/fastapi/app/v1/endpoints/insert.py @@ -1,156 +1,11 @@ -import httpx import traceback from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from fastapi import status from app.sta2rest import sta2rest +from app.utils.utils import create_entity v1 = APIRouter() - -def get_result_type_and_column(input_string): - try: - value = eval(str(input_string)) - except (SyntaxError, NameError): - result_type = 0 - column_name = "resultString" - else: - if isinstance(value, int): - result_type = 1 - column_name = "resultInteger" - elif isinstance(value, float): - result_type = 2 - column_name = "resultDouble" - elif isinstance(value, dict): - result_type = 4 - column_name = "resultJSON" - else: - result_type = None - column_name = None - - if input_string == "true" or input_string == "false": - result_type = 3 - column_name = "resultBoolean" - - if result_type is not None: - return result_type, column_name - else: - raise Exception("Cannot cast result to a valid type") - -def flatten_entity_body(entity, body, name = None): - - # check if entity is an array - if isinstance(entity, list): - # loop trought all the values - for item in entity: - # create the entity - flatten_entity_body(item, body, name) - return body - - for key in list(entity): - if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: - converted_key = sta2rest.STA2REST.convert_entity(key) - body[converted_key] = entity[key] - - flatten_entity_body(entity[key], body, converted_key) - - if name: - if isinstance(body[name], list): - for item in body[name]: - item[converted_key] = { - "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None - } - else: - body[name][converted_key] = { - "@iot.id": entity[key]["@iot.id"] if "@iot.id" in entity[key] else None - } - - return body - -def format_entity_body(entity_body): - # Check if entity_body is an array - if isinstance(entity_body, list): - # Loop trought all the values - for i in range(len(entity_body)): - # Create the entity - entity_body[i] = format_entity_body(entity_body[i]) - return entity_body - - formatted_body = {} - # Loop trought all the keys in the body - for key in entity_body: - if isinstance(key, str) and key in sta2rest.STA2REST.ENTITY_MAPPING: - if "@iot.id" in entity_body[key]: - new_key = sta2rest.STA2REST.convert_to_database_id(key) - formatted_body[new_key] = entity_body[key]["@iot.id"] - elif key == "result": - value = entity_body["result"] - result_type, column_name = get_result_type_and_column(value) - formatted_body[column_name] = value - formatted_body["resultType"] = result_type - else: - formatted_body[key] = entity_body[key] - - return formatted_body - -async def create_entity(entity_name, body): - - entity_body = { - entity_name: body - } - - body = flatten_entity_body(entity_body, entity_body) - - - # Creation order - created_ids = {} - creation_order = ["Location","Thing", "Sensor", "ObservedProperty", 
"FeaturesOfInterest", "Datastream", "Observation"] - for entity_name in creation_order: - if entity_name in body: - - formatted_body = format_entity_body(body[entity_name]) - - if "@iot.id" in formatted_body: - continue - - # check if the entity has sub entities and if they are empty check if id is present - if isinstance(formatted_body, list): - for item in formatted_body: - for key in item: - if key in created_ids: - item[key] = created_ids[key] - else: - for key in formatted_body: - # check if key is present in created_ids - if key in created_ids: - formatted_body[key] = created_ids[key] - - url = "http://postgrest:3000/" + entity_name - - print("Creating entity: ", entity_name) - print("Body: ", formatted_body) - - async with httpx.AsyncClient() as client: - # post to postgrest - r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"}) - - # get response - result = r.json() - - # print r status - if r.status_code != 201: - raise PostgRESTError(result["message"]) - - # get first element of the result - result = result[0] - - # get the id of the created entity - id_key = sta2rest.STA2REST.convert_to_database_id(entity_name) - created_ids[id_key] = result["id"] - - print("Created entity: ", id_key, " with id: ", result["id"]) - - return None - # Handle POST requests @v1.api_route("/{path_name:path}", methods=["POST"]) diff --git a/fastapi/app/v1/endpoints/read.py b/fastapi/app/v1/endpoints/read.py index d9b6669..f310832 100644 --- a/fastapi/app/v1/endpoints/read.py +++ b/fastapi/app/v1/endpoints/read.py @@ -4,73 +4,11 @@ from fastapi.responses import JSONResponse from fastapi import status from app.sta2rest import sta2rest +from app.utils.utils import PostgRESTError, __create_ref_format, __flatten_expand_entity, __flatten_navigation_links +from app.settings import tables, serverSettings v1 = APIRouter() -tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"] -serverSettings = { - "conformance": [ - "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data", - ], -} - -class PostgRESTError(Exception): - pass - - -def __flatten_navigation_links(row): - if row and "@iot.navigationLink" in row: - # merge all the keys from the navigationLink - row.update(row["@iot.navigationLink"]) - del row["@iot.navigationLink"] - - # check if skip@iot.navigationLink is present and remove it - if "skip@iot.navigationLink" in row: - del row["skip@iot.navigationLink"] - -def __flatten_expand_entity(data): - # Check if it is an array - if not isinstance(data, list): - # throw an error - raise PostgRESTError(data) - - # check if data is empty - if not data: - return data - - # Check if there is only one key and it is in an ENTITY_MAPPING from the sta2rest module - if len(data[0].keys()) == 1 and list(data[0].keys())[0] in sta2rest.STA2REST.ENTITY_MAPPING: - # Get the value of the first key - key_name = list(data[0].keys())[0] - data = data[0][key_name] - - return data - -def __create_ref_format(data): - - rows = [data] - - # Check if it is an array - if isinstance(data, list): - key_name = list(data[0].keys())[0] - # Check if the key is in an ENTITY_MAPPING from the sta2rest module - if key_name in sta2rest.STA2REST.ENTITY_MAPPING: - rows = data[0][key_name] - if not isinstance(rows, list): - rows = [rows] - else: - rows = data - - data = { - "value": [] - } - - for row in rows: - data["value"].append({ - "@iot.selfLink": row["@iot.selfLink"] - }) - - return data def 
__handle_root(request: Request): # Handle the root path From c4279650c15617025f1be60c8060f249cd7f5fc8 Mon Sep 17 00:00:00 2001 From: Filippo Finke Date: Mon, 7 Aug 2023 21:16:35 +0200 Subject: [PATCH 19/33] feat: add settings --- fastapi/app/settings.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 fastapi/app/settings.py diff --git a/fastapi/app/settings.py b/fastapi/app/settings.py new file mode 100644 index 0000000..cdc3d13 --- /dev/null +++ b/fastapi/app/settings.py @@ -0,0 +1,6 @@ +tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"] +serverSettings = { + "conformance": [ + "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data", + ], +} From f37454779add1ca20e9a8a35b0509da740f5b565 Mon Sep 17 00:00:00 2001 From: Filippo Finke Date: Mon, 7 Aug 2023 21:23:00 +0200 Subject: [PATCH 20/33] feat: add comments --- fastapi/app/utils/utils.py | 73 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py index 079cc3e..d90a54c 100644 --- a/fastapi/app/utils/utils.py +++ b/fastapi/app/utils/utils.py @@ -2,9 +2,22 @@ from app.sta2rest import sta2rest class PostgRESTError(Exception): + """ + Exception raised for errors in the PostgREST response. + """ pass def get_result_type_and_column(input_string): + """ + Get the result type and the column name for the result + + Args: + input_string (str): The input string + + Returns: + int: The result type + str: The column name + """ try: value = eval(str(input_string)) except (SyntaxError, NameError): @@ -34,6 +47,17 @@ def get_result_type_and_column(input_string): raise Exception("Cannot cast result to a valid type") def flatten_entity_body(entity, body, name = None): + """ + Flatten the entity body + + Args: + entity (dict): The entity + body (dict): The body + name (str, optional): The name of the entity. Defaults to None. 
From 62112b69963ca24dbe7f1cae7a4b0dc0cb401e2c Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 21:40:42 +0200
Subject: [PATCH 21/33] feat: add delete route

---
 fastapi/app/v1/api.py              |  2 +
 fastapi/app/v1/endpoints/delete.py | 60 ++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+)
 create mode 100644 fastapi/app/v1/endpoints/delete.py

diff --git a/fastapi/app/v1/api.py b/fastapi/app/v1/api.py
index 073e318..7b1316b 100644
--- a/fastapi/app/v1/api.py
+++ b/fastapi/app/v1/api.py
@@ -1,5 +1,6 @@
 from app.v1.endpoints import read
 from app.v1.endpoints import insert
+from app.v1.endpoints import delete
 from fastapi import FastAPI
 
 v1 = FastAPI()
@@ -7,3 +8,4 @@
 # Register the endpoints
 v1.include_router(read.v1)
 v1.include_router(insert.v1)
+v1.include_router(delete.v1)
diff --git a/fastapi/app/v1/endpoints/delete.py b/fastapi/app/v1/endpoints/delete.py
new file mode 100644
index 0000000..251e256
--- /dev/null
+++ b/fastapi/app/v1/endpoints/delete.py
@@ -0,0 +1,60 @@
+import traceback
+import httpx
+from fastapi import APIRouter, Request
+from fastapi.responses import JSONResponse
+from fastapi import status
+from app.sta2rest import sta2rest
+from app.utils.utils import PostgRESTError
+
+v1 = APIRouter()
+
+# Handle DELETE requests
+@v1.api_route("/{path_name:path}", methods=["DELETE"])
+async def catch_all_delete(request: Request, path_name: str):
+
+    try:
+        full_path = request.url.path
+        # parse uri
+        result = sta2rest.STA2REST.parse_uri(full_path)
+
+        # Get main entity
+        [name, id] = result["entity"]
+
+        # Get the name and id
+        if not name:
+            raise Exception("No entity name provided")
+
+        if not id:
+            raise Exception("No entity id provided")
+
+        async with httpx.AsyncClient() as client:
+            url = "http://postgrest:3000/" + name + "?id=eq." + id
+
+            # post to postgrest
+            r = await client.delete(url)
+
+
+            if r.status_code != 204:
+                result = r.json()
+                raise PostgRESTError(result["message"])
+
+        # Return okay
+        return JSONResponse(
+            status_code=status.HTTP_200_OK,
+            content={
+                "code": 200,
+                "type": "success"
+            }
+        )
+
+    except Exception as e:
+        # print stack trace
+        traceback.print_exc()
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content={
+                "code": 400,
+                "type": "error",
+                "message": str(e)
+            }
+        )
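A quick way to exercise the new route from outside the stack (the host comes from docker-compose.yml; the path prefix and entity id are illustrative assumptions). Note that, by design, the endpoint wraps PostgREST's 204 in a JSON status envelope rather than returning an empty response:

    import httpx

    r = httpx.delete("http://localhost:8018/v1.1/Observations(1)")
    print(r.status_code, r.json())  # 200 {"code": 200, "type": "success"}
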
From 4a53858ec9ef44904450acd95661b6d3538cba5c Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 21:58:49 +0200
Subject: [PATCH 22/33] feat: fix cascade rules

---
 database/istsos_schema.sql | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/database/istsos_schema.sql b/database/istsos_schema.sql
index bb4d09b..bd1d188 100644
--- a/database/istsos_schema.sql
+++ b/database/istsos_schema.sql
@@ -21,15 +21,15 @@
     "name" VARCHAR(255) UNIQUE NOT NULL,
     "description" TEXT NOT NULL,
     "properties" jsonb,
-    "location_id" BIGINT REFERENCES sensorthings."Location" (id)
+    "location_id" BIGINT REFERENCES sensorthings."Location" (id) ON DELETE CASCADE
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."HistoricalLocation" (
     id BIGSERIAL NOT NULL PRIMARY KEY,
     time TIMESTAMPTZ NOT NULL,
-    thing_id BIGINT REFERENCES sensorthings."Thing"(id),
-    location_id BIGINT REFERENCES sensorthings."Location"(id)
+    thing_id BIGINT REFERENCES sensorthings."Thing"(id) ON DELETE CASCADE,
+    location_id BIGINT REFERENCES sensorthings."Location"(id) ON DELETE CASCADE
 );
 
 CREATE TABLE IF NOT EXISTS sensorthings."ObservedProperty" (
@@ -59,9 +59,9 @@
     "phenomenonTime" tstzrange,
     "resultTime" tstzrange,
     "properties" jsonb,
-    "thing_id" BIGINT REFERENCES sensorthings."Thing"(id) NOT NULL,
-    "sensor_id" BIGINT REFERENCES sensorthings."Sensor"(id) NOT NULL,
-    "observedproperty_id" BIGINT REFERENCES sensorthings."ObservedProperty"(id)
+    "thing_id" BIGINT REFERENCES sensorthings."Thing"(id) ON DELETE CASCADE,
+    "sensor_id" BIGINT REFERENCES sensorthings."Sensor"(id) ON DELETE CASCADE,
+    "observedproperty_id" BIGINT REFERENCES sensorthings."ObservedProperty"(id) ON DELETE CASCADE
 );
 
 
@@ -87,8 +87,8 @@
     "resultQuality" TEXT,
     "validTime" tstzrange DEFAULT NULL,
     "parameters" jsonb,
-    "datastream_id" BIGINT REFERENCES sensorthings."Datastream"(id),
-    "featuresofinterest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id),
+    "datastream_id" BIGINT REFERENCES sensorthings."Datastream"(id) ON DELETE CASCADE,
+    "featuresofinterest_id" BIGINT REFERENCES sensorthings."FeaturesOfInterest"(id) ON DELETE CASCADE,
     UNIQUE ("datastream_id", "phenomenonTime")
 );
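With the cascade rules in place, deleting a parent now cleans up its dependents transitively: removing a Thing drops its Datastreams, and removing a Datastream drops its Observations. A minimal sketch of a check, assuming the compose credentials and an existing Thing with id 1 (the connection string and id are illustrative):

    import asyncio
    import asyncpg

    async def main():
        conn = await asyncpg.connect("postgresql://admin:admin@localhost:5432/istsos")
        await conn.execute('DELETE FROM sensorthings."Thing" WHERE id = $1', 1)
        # dependent Datastreams (and, through them, Observations) are gone too
        left = await conn.fetchval(
            'SELECT count(*) FROM sensorthings."Datastream" WHERE thing_id = $1', 1)
        print(left)  # 0
        await conn.close()

    asyncio.run(main())
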
From 60ce084a6bcdea1df8fb2d04f3ce1ca15b62e8ac Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 21:58:59 +0200
Subject: [PATCH 23/33] feat: wait for database before starting

---
 docker-compose.yml | 22 ++--------------------
 1 file changed, 2 insertions(+), 20 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index bd944ba..85fa17a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -35,24 +35,6 @@
     depends_on:
       - database
 
-  # POSTGRESQL
-  # database_test:
-  #   build:
-  #     context: ./database_test
-  #     dockerfile: Dockerfile
-  #   restart: on-failure
-  #   ports:
-  #     - "55432:5432"
-  #   environment:
-  #     POSTGRES_DB: istsos_test
-  #     POSTGRES_USER: admin
-  #     POSTGRES_PASSWORD: admin
-  #     DATADIR: /var/lib/postgresql/data
-  #     POSTGRES_MULTIPLE_EXTENSIONS: postgis,hstore,postgis_topology,postgis_raster,pgrouting,ltree,pg_cron,uuid-ossp
-  #   volumes:
-  #     # - v-istsos-miu-postgis-sql-test:/docker-entrypoint-initdb.d
-  #     - v-istsos-miu-database-data-test:/var/lib/postgresql/data
-
   api:
     build:
       context: ./fastapi
@@ -63,13 +45,13 @@
     ports:
       - 8018:5000
     command: uvicorn --reload --workers 1 --host 0.0.0.0 --port 5000 app.main:app
+    depends_on:
+      - postgrest
 
 volumes:
   v-istsos-miu-postgis-sql:
     name: v-istsos-miu-postgis-sql
   v-istsos-miu-database-data:
     name: v-istsos-miu-database-data
-  # v-istsos-miu-postgis-sql-test:
-  #   name: v-istsos-miu-postgis-sql-test
   v-istsos-miu-database-data-test:
     name: v-istsos-miu-database-data-test
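One caveat on this change: depends_on only orders container start-up; it does not wait until PostgREST or the database actually accept connections. If boot races resurface, a small retry loop at API start-up is the usual remedy. A sketch under assumed settings, not part of this series:

    import asyncio
    import asyncpg

    async def wait_for_database(dsn, retries=30):
        for _ in range(retries):
            try:
                conn = await asyncpg.connect(dsn)
                await conn.close()
                return
            except (OSError, asyncpg.PostgresError):
                await asyncio.sleep(1)  # not accepting connections yet, retry
        raise RuntimeError("database never became ready")
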
From 728bc3537caa106b3f47a2306b06f00734db9a25 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 23:22:58 +0200
Subject: [PATCH 24/33] fix: move return

---
 fastapi/app/v1/endpoints/delete.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/fastapi/app/v1/endpoints/delete.py b/fastapi/app/v1/endpoints/delete.py
index 251e256..267a9c7 100644
--- a/fastapi/app/v1/endpoints/delete.py
+++ b/fastapi/app/v1/endpoints/delete.py
@@ -38,14 +38,14 @@
                 result = r.json()
                 raise PostgRESTError(result["message"])
 
-        # Return okay
-        return JSONResponse(
-            status_code=status.HTTP_200_OK,
-            content={
-                "code": 200,
-                "type": "success"
-            }
-        )
+            # Return okay
+            return JSONResponse(
+                status_code=status.HTTP_200_OK,
+                content={
+                    "code": 200,
+                    "type": "success"
+                }
+            )
 
     except Exception as e:
         # print stack trace

From c84b8a8917d66ca3c049b0bd0695e793ddc145c8 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 23:23:05 +0200
Subject: [PATCH 25/33] feat: add update endpoint

---
 fastapi/app/v1/api.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/fastapi/app/v1/api.py b/fastapi/app/v1/api.py
index 7b1316b..d68877e 100644
--- a/fastapi/app/v1/api.py
+++ b/fastapi/app/v1/api.py
@@ -1,6 +1,7 @@
 from app.v1.endpoints import read
 from app.v1.endpoints import insert
 from app.v1.endpoints import delete
+from app.v1.endpoints import update_patch
 from fastapi import FastAPI
 
 v1 = FastAPI()
@@ -9,3 +10,4 @@
 v1.include_router(read.v1)
 v1.include_router(insert.v1)
 v1.include_router(delete.v1)
+v1.include_router(update_patch.v1)

From ceac2598d49b7267f092983d1755ecd796f94db7 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Mon, 7 Aug 2023 23:23:13 +0200
Subject: [PATCH 26/33] feat: update route for PATCH operations

---
 fastapi/app/v1/endpoints/update_patch.py | 72 ++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 fastapi/app/v1/endpoints/update_patch.py

diff --git a/fastapi/app/v1/endpoints/update_patch.py b/fastapi/app/v1/endpoints/update_patch.py
new file mode 100644
index 0000000..850b4c1
--- /dev/null
+++ b/fastapi/app/v1/endpoints/update_patch.py
@@ -0,0 +1,72 @@
+import traceback
+import httpx
+from fastapi import APIRouter, Request
+from fastapi.responses import JSONResponse
+from fastapi import status
+from app.sta2rest import sta2rest
+from app.utils.utils import PostgRESTError
+
+v1 = APIRouter()
+
+# Handle UPDATE requests
+@v1.api_route("/{path_name:path}", methods=["PATCH"])
+async def catch_all_update(request: Request, path_name: str):
+    # Accept only content-type application/json
+    if not "content-type" in request.headers or request.headers["content-type"] != "application/json":
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content={
+                "code": 400,
+                "type": "error",
+                "message": "Only content-type application/json is supported."
+            }
+        )
+
+    try:
+        full_path = request.url.path
+        # parse uri
+        result = sta2rest.STA2REST.parse_uri(full_path)
+
+        # Get main entity
+        [name, id] = result["entity"]
+
+        # Get the name and id
+        if not name:
+            raise Exception("No entity name provided")
+
+        if not id:
+            raise Exception("No entity id provided")
+
+
+        body = await request.json()
+
+        async with httpx.AsyncClient() as client:
+            url = "http://postgrest:3000/" + name + "?id=eq." + id
+
+            # post to postgrest
+            r = await client.patch(url, json=body)
+
+            if r.status_code != 204:
+                result = r.json()
+                raise PostgRESTError(result["message"])
+
+            # Return okay
+            return JSONResponse(
+                status_code=status.HTTP_200_OK,
+                content={
+                    "code": 200,
+                    "type": "success"
+                }
+            )
+
+    except Exception as e:
+        # print stack trace
+        traceback.print_exc()
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content={
+                "code": 400,
+                "type": "error",
+                "message": str(e)
+            }
+        )
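Exercising the new PATCH route (host, path prefix, and id are illustrative):

    import httpx

    r = httpx.patch(
        "http://localhost:8018/v1.1/Things(1)",
        json={"description": "Updated description"},
        headers={"Content-Type": "application/json"},
    )
    print(r.json())  # {"code": 200, "type": "success"} on success

One detail worth noting: the strict equality check rejects a header such as "application/json; charset=utf-8", so clients must send the bare media type.
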
From 95166b3914ddd452dd74161696a5ec430e318e38 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Tue, 8 Aug 2023 20:57:19 +0200
Subject: [PATCH 27/33] feat: convert to SQL and use transactions

---
 fastapi/app/utils/utils.py         | 85 ++++++++++++++----------------
 fastapi/app/v1/endpoints/insert.py |  6 ++-
 2 files changed, 44 insertions(+), 47 deletions(-)

diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py
index d90a54c..efd9d3e 100644
--- a/fastapi/app/utils/utils.py
+++ b/fastapi/app/utils/utils.py
@@ -1,5 +1,5 @@
-import httpx
 from app.sta2rest import sta2rest
+import datetime
 
 class PostgRESTError(Exception):
     """
@@ -123,13 +123,14 @@
     return formatted_body
 
-async def create_entity(entity_name, body):
+async def create_entity(entity_name, body, pgpool):
     """
     Create an entity
 
     Args:
         entity_name (str): The entity name
         body (dict): The body
+        pgpool (asyncpg.pool.Pool): The database pool
 
     Raises:
         PostgRESTError: If the entity cannot be created
@@ -146,50 +147,44 @@
     # Creation order
     created_ids = {}
     creation_order = ["Location","Thing", "Sensor", "ObservedProperty", "FeaturesOfInterest", "Datastream", "Observation"]
-    for entity_name in creation_order:
-        if entity_name in body:
-
-            formatted_body = format_entity_body(body[entity_name])
-
-            if "@iot.id" in formatted_body:
-                continue
-
-            # check if the entity has sub entities and if they are empty check if id is present
-            if isinstance(formatted_body, list):
-                for item in formatted_body:
-                    for key in item:
-                        if key in created_ids:
-                            item[key] = created_ids[key]
-            else:
-                for key in formatted_body:
-                    # check if key is present in created_ids
-                    if key in created_ids:
-                        formatted_body[key] = created_ids[key]
-
-            url = "http://postgrest:3000/" + entity_name
-
-            print("Creating entity: ", entity_name)
-            print("Body: ", formatted_body)
-
-            async with httpx.AsyncClient() as client:
-                # post to postgrest
-                r = await client.post(url, json=formatted_body, headers={"Prefer": "return=representation"})
-
-                # get response
-                result = r.json()
-
-                # print r status
-                if r.status_code != 201:
-                    raise PostgRESTError(result["message"])
-
-                # get first element of the result
-                result = result[0]
-
-                # get the id of the created entity
-                id_key = sta2rest.STA2REST.convert_to_database_id(entity_name)
-                created_ids[id_key] = result["id"]
-
-                print("Created entity: ", id_key, " with id: ", result["id"])
+    async with pgpool.acquire() as conn:
+        async with conn.transaction():
+            for entity_name in creation_order:
+                if entity_name in body:
+
+                    formatted_body = format_entity_body(body[entity_name])
+
+                    if "@iot.id" in formatted_body:
+                        continue
+
+                    # check if the entity has sub entities and if they are empty check if id is present
+                    if isinstance(formatted_body, list):
+                        for item in formatted_body:
+                            for key in item:
+                                if key in created_ids:
+                                    item[key] = created_ids[key]
+                                elif "Time" in key:
+                                    formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
+                    else:
+                        for key in formatted_body:
+                            # check if key is present in created_ids
+                            if key in created_ids:
+                                formatted_body[key] = created_ids[key]
+                            elif "Time" in key:
+                                formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
+
+                    # Generate SQL from the body
+                    keys = ', '.join(f'"{key}"' for key in formatted_body.keys())
+                    values_placeholders = ', '.join(f'${i+1}' for i in range(len(formatted_body)))
+
+                    query = f'INSERT INTO sensorthings."{entity_name}" ({keys}) VALUES ({values_placeholders}) RETURNING id'
+
+                    print(query)
+                    new_id = await conn.fetchval(query, *formatted_body.values())
+
+                    id_key = sta2rest.STA2REST.convert_to_database_id(entity_name)
+                    created_ids[id_key] = new_id
 
     return None
diff --git a/fastapi/app/v1/endpoints/insert.py b/fastapi/app/v1/endpoints/insert.py
index dd9135f..d630578 100644
--- a/fastapi/app/v1/endpoints/insert.py
+++ b/fastapi/app/v1/endpoints/insert.py
@@ -4,12 +4,14 @@
 from fastapi import status
 from app.sta2rest import sta2rest
 from app.utils.utils import create_entity
+from fastapi import Depends
+from app.db.db import get_pool
 
 v1 = APIRouter()
 
 # Handle POST requests
 @v1.api_route("/{path_name:path}", methods=["POST"])
-async def catch_all_post(request: Request, path_name: str):
+async def catch_all_post(request: Request, path_name: str, pgpool=Depends(get_pool)):
     # Accept only content-type application/json
     if not "content-type" in request.headers or request.headers["content-type"] != "application/json":
         return JSONResponse(
@@ -28,7 +30,7 @@
         # get json body
         body = await request.json()
         main_table = result["entity"][0]
-        result = await create_entity(main_table, body)
+        result = await create_entity(main_table, body, pgpool)
         # Return okay
         return JSONResponse(
             status_code=status.HTTP_200_OK,
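For a sense of what the new code generates, a body like the one below becomes a single parameterized statement, executed inside conn.transaction() so a failing sub-entity rolls back the whole deep insert (the values are illustrative):

    formatted_body = {"name": "Room 104", "description": "A new room", "location_id": 7}

    keys = ', '.join(f'"{key}"' for key in formatted_body.keys())
    values_placeholders = ', '.join(f'${i+1}' for i in range(len(formatted_body)))
    query = f'INSERT INTO sensorthings."Thing" ({keys}) VALUES ({values_placeholders}) RETURNING id'

    print(query)
    # INSERT INTO sensorthings."Thing" ("name", "description", "location_id")
    #     VALUES ($1, $2, $3) RETURNING id

Passing the values separately via conn.fetchval(query, *formatted_body.values()) keeps user input out of the SQL string, unlike string concatenation.
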
From ec016061d5c83a4f069c3172f7648177ad0bd68a Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Tue, 8 Aug 2023 21:04:02 +0200
Subject: [PATCH 28/33] feat: fix json fields

---
 fastapi/app/utils/utils.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py
index efd9d3e..c983b4a 100644
--- a/fastapi/app/utils/utils.py
+++ b/fastapi/app/utils/utils.py
@@ -1,6 +1,6 @@
 from app.sta2rest import sta2rest
 import datetime
-
+import json
 class PostgRESTError(Exception):
     """
     Exception raised for errors in the PostgREST response.
@@ -166,6 +166,9 @@
                                     item[key] = created_ids[key]
                                 elif "Time" in key:
                                     formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
+                                # check if value is a dict and convert it to a str
+                                elif isinstance(formatted_body[key], dict):
+                                    formatted_body[key] = json.dumps(formatted_body[key])
                     else:
                         for key in formatted_body:
                             # check if key is present in created_ids
@@ -173,14 +176,18 @@
                                 formatted_body[key] = created_ids[key]
                             elif "Time" in key:
                                 formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
+                            # check if value is a dict and convert it to a str
+                            elif isinstance(formatted_body[key], dict):
+                                formatted_body[key] = json.dumps(formatted_body[key])
 
                     # Generate SQL from the body
                     keys = ', '.join(f'"{key}"' for key in formatted_body.keys())
                     values_placeholders = ', '.join(f'${i+1}' for i in range(len(formatted_body)))
+
 
                     query = f'INSERT INTO sensorthings."{entity_name}" ({keys}) VALUES ({values_placeholders}) RETURNING id'
 
-                    print(query)
+                    print(query, formatted_body)
                     new_id = await conn.fetchval(query, *formatted_body.values())
 
                     id_key = sta2rest.STA2REST.convert_to_database_id(entity_name)

From 500492ff82a07bbd3e1a990bf34aceee1c7c7de8 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Tue, 8 Aug 2023 21:07:24 +0200
Subject: [PATCH 29/33] refactor: duplicate code

---
 fastapi/app/utils/utils.py | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py
index c983b4a..cceec3a 100644
--- a/fastapi/app/utils/utils.py
+++ b/fastapi/app/utils/utils.py
@@ -123,6 +123,18 @@
     return formatted_body
 
+def prepare_entity_body_for_insert(entity_body, created_ids):
+    for key in entity_body:
+        # check if key is present in created_ids
+        if key in created_ids:
+            entity_body[key] = created_ids[key]
+        elif "Time" in key:
+            entity_body[key] = datetime.datetime.fromisoformat(entity_body[key])
+        # check if value is a dict and convert it to a str
+        elif isinstance(entity_body[key], dict):
+            entity_body[key] = json.dumps(entity_body[key])
+
+
 async def create_entity(entity_name, body, pgpool):
     """
     Create an entity
@@ -161,24 +173,9 @@
                     # check if the entity has sub entities and if they are empty check if id is present
                     if isinstance(formatted_body, list):
                         for item in formatted_body:
-                            for key in item:
-                                if key in created_ids:
-                                    item[key] = created_ids[key]
-                                elif "Time" in key:
-                                    formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
-                                # check if value is a dict and convert it to a str
-                                elif isinstance(formatted_body[key], dict):
-                                    formatted_body[key] = json.dumps(formatted_body[key])
+                            prepare_entity_body_for_insert(item, created_ids)
                     else:
-                        for key in formatted_body:
-                            # check if key is present in created_ids
-                            if key in created_ids:
-                                formatted_body[key] = created_ids[key]
-                            elif "Time" in key:
-                                formatted_body[key] = datetime.datetime.fromisoformat(formatted_body[key])
-                            # check if value is a dict and convert it to a str
-                            elif isinstance(formatted_body[key], dict):
-                                formatted_body[key] = json.dumps(formatted_body[key])
+                        prepare_entity_body_for_insert(formatted_body, created_ids)
 
                     # Generate SQL from the body
                     keys = ', '.join(f'"{key}"' for key in formatted_body.keys())
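The extracted helper mutates a body in place. With sample values (the import path matches this series; the data itself is illustrative), its three branches look like this:

    from app.utils.utils import prepare_entity_body_for_insert

    body = {
        "phenomenonTime": "2023-03-25T10:30:00-04:00",
        "parameters": {"sensor_state": "ok"},
        "datastream_id": None,
    }
    prepare_entity_body_for_insert(body, {"datastream_id": 3})
    # body["datastream_id"]  -> 3, linked to the entity created earlier in the loop
    # body["phenomenonTime"] -> datetime.datetime, as asyncpg expects for TIMESTAMPTZ
    # body["parameters"]     -> JSON string, ready for the jsonb column
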
From 62c9e74f9400910b02a0395b05f73023b2725929 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Sat, 19 Aug 2023 07:35:48 +0200
Subject: [PATCH 30/33] feat: add conformance

---
 fastapi/app/settings.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/fastapi/app/settings.py b/fastapi/app/settings.py
index cdc3d13..d5394ce 100644
--- a/fastapi/app/settings.py
+++ b/fastapi/app/settings.py
@@ -1,6 +1,29 @@
 tables = ["Datastreams", "FeaturesOfInterest", "HistoricalLocations", "Locations", "Observations", "ObservedProperties", "Sensors", "Things"]
 serverSettings = {
     "conformance": [
-        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/thing",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/location",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/historical-location",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/datastream",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/sensor",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/observed-property",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/observation",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/feature-of-interest",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/datamodel/entity-control-information",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/resource-path",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/order",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/expand",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/select",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/orderby",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/top",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/filter",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/built-in-filter-operations",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/create-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/link-to-existing-entities",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/deep-insert",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/deep-insert-status-code",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/update-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/delete-entity",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/create-update-delete/update-entity-jsonpatch",
     ],
 }

From c1bf0e4148623e12431e1303a9149b640a9c9a27 Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Sat, 19 Aug 2023 07:43:09 +0200
Subject: [PATCH 31/33] fix: ast import

---
 fastapi/app/sta2rest/sta_parser/parser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastapi/app/sta2rest/sta_parser/parser.py b/fastapi/app/sta2rest/sta_parser/parser.py
index 1fc8393..b2ef182 100644
--- a/fastapi/app/sta2rest/sta_parser/parser.py
+++ b/fastapi/app/sta2rest/sta_parser/parser.py
@@ -5,7 +5,7 @@
 """
 from .lexer import Lexer
-import ast as ast
+from . import ast
 
 class Parser:
     def __init__(self, tokens):
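The one-line import fix matters because a bare "import ast" binds Python's standard-library module, shadowing the parser's own AST definitions that live next to parser.py. A runnable illustration (the SelectNode attribute name is hypothetical, standing in for whatever the local module defines):

    import ast

    print(hasattr(ast, "literal_eval"))  # True  - this is the stdlib module
    print(hasattr(ast, "SelectNode"))    # False - the parser's node classes live in
                                         # fastapi/app/sta2rest/sta_parser/ast.py

Only the package-relative form, "from . import ast", resolves to the local module.
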
From f12c22f668d2bd938d69534d7a01f26b852b24bb Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Sat, 19 Aug 2023 07:43:26 +0200
Subject: [PATCH 32/33] feat: add skip conformance

---
 fastapi/app/settings.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fastapi/app/settings.py b/fastapi/app/settings.py
index d5394ce..c31533c 100644
--- a/fastapi/app/settings.py
+++ b/fastapi/app/settings.py
@@ -15,6 +15,7 @@
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/expand",
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/select",
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/orderby",
+        "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/skip",
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/top",
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/filter",
         "http://www.opengis.net/spec/iot_sensing/1.1/req/request-data/built-in-filter-operations",

From a219cc3f6f25b4887c7167cbf0ab64c38006c5ae Mon Sep 17 00:00:00 2001
From: Filippo Finke
Date: Sat, 19 Aug 2023 07:58:52 +0200
Subject: [PATCH 33/33] feat: prevent duplicates for ObservedProperty

---
 fastapi/app/utils/utils.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/fastapi/app/utils/utils.py b/fastapi/app/utils/utils.py
index cceec3a..a9af3b8 100644
--- a/fastapi/app/utils/utils.py
+++ b/fastapi/app/utils/utils.py
@@ -181,6 +181,17 @@
                     keys = ', '.join(f'"{key}"' for key in formatted_body.keys())
                     values_placeholders = ', '.join(f'${i+1}' for i in range(len(formatted_body)))
 
+                    # Prevent duplicates of the same entity
+                    if entity_name in ["ObservedProperty"]:
+                        # Check if the entity already exists with all the same values
+                        # Generate WHERE clause from the body
+                        where = ' AND '.join(f'"{key}" = ${i+1}' for i, key in enumerate(formatted_body.keys()))
+                        query = f'SELECT id FROM sensorthings."{entity_name}" WHERE {where}'
+                        existing_id = await conn.fetchval(query, *formatted_body.values())
+                        print(query, existing_id)
+                        if existing_id:
+                            created_ids[sta2rest.STA2REST.convert_to_database_id(entity_name)] = existing_id
+                            continue
 
                     query = f'INSERT INTO sensorthings."{entity_name}" ({keys}) VALUES ({values_placeholders}) RETURNING id'
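As committed, the duplicate check is a separate SELECT followed by an INSERT, so two concurrent deep inserts can still create the same ObservedProperty twice. Since istsos_schema.sql already declares "name" UNIQUE on this table, an upsert-style statement would close that window. A sketch of the idea, not part of this series; the DO UPDATE clause is a deliberate no-op that exists only so RETURNING always yields the row id:

    query = (
        'INSERT INTO sensorthings."ObservedProperty" ("name", "definition", "description") '
        'VALUES ($1, $2, $3) '
        'ON CONFLICT ("name") DO UPDATE SET "name" = EXCLUDED."name" '
        'RETURNING id'
    )
    # observedproperty_id = await conn.fetchval(query, name, definition, description)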