Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8 forbid extra fields in payload #125

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ce6b283
chore: migrate from Pydantic v1 to v2
basarbyz Jul 23, 2024
71e76a8
fix(validation): fix response model issues
basarbyz Jul 23, 2024
0890028
fix(datapoint): remove subscribe field from Datapoint model and handl…
basarbyz Jul 30, 2024
bbaacf4
Merge branch 'main' into 8-forbid-extra-fields-in-payload
basarbyz Sep 10, 2024
29e2333
fixing the merge conflict
basarbyz Sep 10, 2024
3758760
fix(merge conflict): trying to fix pydantic migration
basarbyz Sep 10, 2024
ac3c742
Merge remote-tracking branch 'origin/8-forbid-extra-fields-in-payload…
basarbyz Sep 10, 2024
4f39c9f
fix(merge conflict): trying to fix pydantic migration
basarbyz Sep 10, 2024
e03e779
fix(merge conflict): trying to fix errors related to validation_alias
basarbyz Sep 10, 2024
a0b0d51
fix(merge conflict): trying to resolve the issue with unsupported dum…
basarbyz Sep 10, 2024
d565415
fix(merge conflict): trying to resolve the issue with unsupported dum…
basarbyz Sep 10, 2024
f6c023d
fix(merge conflict): trying to resolve the issue with unsupported dum…
basarbyz Sep 10, 2024
38c1ab2
fix(merge conflict): trying to resolve the issue with unsupported dum…
basarbyz Sep 10, 2024
8f19af1
fix(merge conflict): updated the pydantic version regarding filip
basarbyz Sep 10, 2024
2d52bd1
fix(merge conflict): updated the pydantic version regarding filip
basarbyz Sep 10, 2024
166b04d
fix(merge conflict): updated the pydantic version regarding filip
basarbyz Sep 10, 2024
d88ca39
fix(merge conflict): trying to fix merge conflict
basarbyz Sep 10, 2024
f13b15a
fix(merge conflict): trying to fix merge conflict
basarbyz Sep 10, 2024
76b2d1c
fix(type error): fixing pydantic core url can't concatenate with a st…
basarbyz Sep 11, 2024
e67560c
fix(type error): fixing pydantic core url can't concatenate with a st…
basarbyz Sep 11, 2024
6352888
fix(test errors): in pydantic 2.x json() is deprecated in favor of mo…
basarbyz Sep 11, 2024
d9ebd62
fix(test errors): in pydantic 2.x copy() is deprecated in favor of mo…
basarbyz Sep 11, 2024
286e7de
fix(test errors): fixed version conflicts
basarbyz Sep 12, 2024
804161a
Merge branch 'main' into 8-forbid-extra-fields-in-payload
basarbyz Oct 7, 2024
3985b74
fix(test error)
basarbyz Oct 7, 2024
9cbe282
fixed the merge conflict
basarbyz Oct 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 82 additions & 43 deletions backend/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from uuid import uuid4
import asyncpg
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, validator, field_validator, ConfigDict
from redis import asyncio as aioredis
import aiohttp
import logging
Expand Down Expand Up @@ -36,8 +36,11 @@
logging.basicConfig(level=settings.LOG_LEVEL.upper(),
format='%(asctime)s %(name)s %(levelname)s: %(message)s')


# Pydantic model
class Datapoint(BaseModel):
model_config = ConfigDict(extra="forbid")

object_id: Optional[str] = Field(None, min_length=1, max_length=255)
jsonpath: str
topic: str
Expand All @@ -49,14 +52,17 @@ class Datapoint(BaseModel):
fiware_service: Optional[str] = Field(default=settings.FIWARE_SERVICE, min_length=1,
max_length=255)

@validator('object_id')
@field_validator('object_id')
@classmethod
def validate_object_id(cls, value):
if value is not None:
if not re.match(r'^[a-zA-Z0-9_\-:]+$', value):
raise ValueError('object_id contains invalid characters')
return value

class DatapointUpdate(BaseModel):
model_config = ConfigDict(extra="forbid")

entity_id: Optional[str] = Field(None, min_length=1, max_length=255)
entity_type: Optional[str] = Field(None, min_length=1, max_length=255)
attribute_name: Optional[str] = Field(None, min_length=1, max_length=255)
Expand Down Expand Up @@ -147,13 +153,14 @@ async def get_connection():
description="Get datapoints based on filters. This is to allow the frontend to search datapoints based on any attribute.",
)
async def get_datapoints(
conn: asyncpg.Connection = Depends(get_connection),
object_id: Optional[str] = None,
topic: Optional[str] = None,
jsonpath: Optional[str] = None,
entity_id: Optional[str] = None,
entity_type: Optional[str] = None,
attribute_name: Optional[str] = None
response: Response,
conn: asyncpg.Connection = Depends(get_connection),
object_id: Optional[str] = None,
topic: Optional[str] = None,
jsonpath: Optional[str] = None,
entity_id: Optional[str] = None,
entity_type: Optional[str] = None,
attribute_name: Optional[str] = None
):
"""
Get datapoints based on filters. This is to allow the frontend to search datapoints based on any attribute.
Expand All @@ -173,28 +180,29 @@ async def get_datapoints(
query = "SELECT * FROM datapoints WHERE 1=1"
params = []
if object_id is not None:
query += f" AND object_id=${len(params)+1}"
query += f" AND object_id=${len(params) + 1}"
params.append(object_id)
if topic is not None:
query += f" AND topic=${len(params)+1}"
query += f" AND topic=${len(params) + 1}"
params.append(topic)
if jsonpath is not None:
query += f" AND jsonpath=${len(params)+1}"
query += f" AND jsonpath=${len(params) + 1}"
params.append(jsonpath)
if entity_id is not None:
query += f" AND entity_id=${len(params)+1}"
query += f" AND entity_id=${len(params) + 1}"
params.append(entity_id)
if entity_type is not None:
query += f" AND entity_type=${len(params)+1}"
query += f" AND entity_type=${len(params) + 1}"
params.append(entity_type)
if attribute_name is not None:
query += f" AND attribute_name=${len(params)+1}"
query += f" AND attribute_name=${len(params) + 1}"
params.append(attribute_name)
try:
rows = await conn.fetch(query, *params)
datapoints = [Datapoint(**row) for row in rows]
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return rows
return datapoints

@app.get(
"/data/{object_id}",
Expand All @@ -204,7 +212,7 @@ async def get_datapoints(
If the datapoint is not found, an error will be raised.",
)
async def get_datapoint(
object_id: str, conn: asyncpg.Connection = Depends(get_connection)
object_id: str, conn: asyncpg.Connection = Depends(get_connection)
):
"""
Get a specific datapoint from the gateway. This is to allow the frontend to display a specific datapoint in the database.
Expand All @@ -223,9 +231,7 @@ async def get_datapoint(
)
if row is None:
raise HTTPException(status_code=404, detail="Device not found!")
return row


return Datapoint(**row)

@app.post(
"/data",
Expand All @@ -238,7 +244,7 @@ async def get_datapoint(
database that a new datapoint has been added as well as whether the topic needs to be subscribed to.",
)
async def add_datapoint(
request: Request, datapoint: Datapoint, conn: asyncpg.Connection = Depends(get_connection)
request: Request, response: Response, datapoint: Datapoint, conn: asyncpg.Connection = Depends(get_connection)
):
"""
Add a new datapoint to the gateway. This is to allow to add new datapoints to the gateway via the frontend.
Expand All @@ -264,7 +270,8 @@ async def add_datapoint(
# Validate the presence of required fields if connected is True
if datapoint.connected:
if not datapoint.entity_id or not datapoint.entity_type or not datapoint.attribute_name:
raise HTTPException(status_code=400, detail="entity_id, entity_type, and attribute_name cannot be null if connected is True")
raise HTTPException(status_code=400,
detail="entity_id, entity_type, and attribute_name cannot be null if connected is True")

# Remove 'connected' field if it is set
datapoint.connected = None
Expand Down Expand Up @@ -345,10 +352,14 @@ async def add_datapoint(
stream_name,
{'subscribe': datapoint.topic},
)
response.headers["mqtt-subscription"] = "added"
else:
response.headers["mqtt-subscription"] = "already subscribed"

# Check if the datapoint can be connected
await check_and_update_connected(datapoint.object_id, conn)

return {**datapoint.dict(), "subscribe": subscribed is None}
return {**datapoint.dict()}

except asyncpg.exceptions.UniqueViolationError:
raise HTTPException(status_code=409, detail="Device already exists!")
Expand Down Expand Up @@ -391,7 +402,8 @@ async def update_datapoint(

# Add validation to ensure entity_id, entity_type, and attribute_name are not None
if datapoint.entity_id is None or datapoint.entity_type is None or datapoint.attribute_name is None:
raise HTTPException(status_code=400, detail="entity_id, entity_type, and attribute_name cannot be null")
raise HTTPException(status_code=400,
detail="entity_id, entity_type, and attribute_name cannot be null")

try:
# Start a transaction to ensure atomicity
Expand All @@ -407,9 +419,10 @@ async def update_datapoint(

# Check if the topic or jsonpath field is being updated
if datapoint.topic != existing_datapoint['topic'] or datapoint.jsonpath != existing_datapoint['jsonpath']:
raise HTTPException(status_code=422, detail="Updating the topic or jsonpath field is not allowed!")
raise HTTPException(status_code=422,
detail="Updating the topic or jsonpath field is not allowed!")

# Update the datapoint in the database
# Update the datapoint in the database
await conn.execute(
"""UPDATE datapoints SET entity_id=$1, entity_type=$2, attribute_name=$3, description=$4 WHERE object_id=$5""",
datapoint.entity_id,
Expand Down Expand Up @@ -468,13 +481,15 @@ async def partial_update_datapoint(

update_data = datapoint_update.dict(exclude_unset=True)

if 'entity_id' in update_data and 'attribute_name' not in update_data and existing_datapoint['attribute_name'] is None:
if 'entity_id' in update_data and 'attribute_name' not in update_data and \
existing_datapoint['attribute_name'] is None:
raise HTTPException(
status_code=400,
detail="attribute_name must be set if entity_id is provided!",
)

if 'attribute_name' in update_data and 'entity_id' not in update_data and existing_datapoint['entity_id'] is None:
if 'attribute_name' in update_data and 'entity_id' not in update_data and \
existing_datapoint['entity_id'] is None:
raise HTTPException(
status_code=400,
detail="entity_id must be set if attribute_name is provided!",
Expand All @@ -492,7 +507,8 @@ async def partial_update_datapoint(
try:
async with conn.transaction():
# Dynamically build the SQL query to update only provided fields
set_clauses = ", ".join([f"{key} = ${i + 1}" for i, key in enumerate(update_data.keys())])
set_clauses = ", ".join(
[f"{key} = ${i + 1}" for i, key in enumerate(update_data.keys())])
values = list(update_data.values()) + [object_id]
query = f"UPDATE datapoints SET {set_clauses} WHERE object_id = ${len(values)}"
await conn.execute(query, *values)
Expand Down Expand Up @@ -521,7 +537,7 @@ async def partial_update_datapoint(
# Check if the datapoint can be connected
await check_and_update_connected(object_id, conn)

return updated_datapoint
return Datapoint(**updated_datapoint)

except Exception as e:
logging.error(str(e))
Expand All @@ -535,7 +551,7 @@ async def partial_update_datapoint(
description="Delete a specific datapoint from the gateway. This is to allow the frontend to delete a datapoint from the gateway.",
)
async def delete_datapoint(
object_id: str, conn: asyncpg.Connection = Depends(get_connection)
object_id: str, response: Response, conn: asyncpg.Connection = Depends(get_connection)
):
"""
Delete a specific datapoint from the gateway. This is to allow the frontend to delete a datapoint from the gateway and unsubscribe from the topic if it is the last subscriber.
Expand Down Expand Up @@ -574,11 +590,16 @@ async def delete_datapoint(
stream_name,
{'unsubscribe': datapoint["topic"]},
)
response.headers["mqtt-subscription"] = "unsubscribed"
else:
response.headers["mqtt-subscription"] = "still subscribed"

return None
except Exception as e:
logging.error(str(e))
raise HTTPException(status_code=500, detail="Internal Server Error!")


@app.delete(
"/data",
status_code=204,
Expand Down Expand Up @@ -614,6 +635,7 @@ async def delete_all_datapoints(conn: asyncpg.Connection = Depends(get_connectio
logging.error(str(e))
raise HTTPException(status_code=500, detail="Internal Server Error!")


@app.get(
"/data/{object_id}/status",
response_model=bool,
Expand All @@ -623,7 +645,7 @@ async def delete_all_datapoints(conn: asyncpg.Connection = Depends(get_connectio
"entity/attribute pair in the Context Broker.",
)
async def get_match_status(
object_id: str, conn: asyncpg.Connection = Depends(get_connection)
object_id: str, conn: asyncpg.Connection = Depends(get_connection)
):
"""
Get the match status of a specific datapoint. This is to allow the frontend to check whether a datapoint is matched to an existing entity/attribute pair in the Context Broker.
Expand Down Expand Up @@ -722,36 +744,42 @@ async def get_status():
"redis": await check_redis(),
}

overall_status = "healthy" if all(check["status"] for check in checks.values()) else "unhealthy"
overall_status = "healthy" if all(
check["status"] for check in checks.values()) else "unhealthy"

system_status = {
"overall_status": overall_status,
"checks": checks,
}
return system_status


@app.get("/system/version",
response_model=dict,
summary="Get the version of the system and the dependencies",
description="Get the version of the system. This is to allow the frontend to check the version of the system and its dependencies."
)
)
async def get_version_info():
"""
Return version information for the application and its dependencies.
"""
dependencies = ["fastapi", "aiohttp", "asyncpg", "pydantic", "redis", "uvicorn"]
dependencies = ["fastapi", "aiohttp", "asyncpg", "pydantic", "pydantic-settings",
"redis", "uvicorn"]

def get_dependency_version(package: str):
"""
Get the version of a package.
"""
return importlib.metadata.version(package)

version_results = [get_dependency_version(dep) for dep in dependencies]
version_info = {
"application_version": __version__,
"dependencies": dict(zip(dependencies, version_results))
}
return version_info


async def check_orion():
"""
Check whether the Orion Context Broker is running properly.
Expand All @@ -761,12 +789,15 @@ async def check_orion():
async with aiohttp.ClientSession() as session:
response = await session.get(f"{ORION_URL}/version")
status = response.status == 200
latency = (time.time() - start_time)*1000
latency = (time.time() - start_time) * 1000
return {"status": status, "latency": latency, "latency_unit": "ms",
"message": None if status else "Failed to connect"}
except Exception as e:
logging.error(f"Error checking Orion: {e}")
return {"status": False, "message": str(e)}
return {"status": False, "latency": latency,
"latency_unit": "ms", "message": str(e)}


async def check_postgres():
"""
Check whether the PostgreSQL database is running properly.
Expand All @@ -775,25 +806,33 @@ async def check_postgres():
try:
async with app.state.pool.acquire() as connection:
await connection.execute("SELECT 1")
latency = (time.time() - start_time)*1000
latency = (time.time() - start_time) * 1000
return {"status": True, "latency": latency,
"latency_unit": "ms", "message": None}
except Exception as e:
latency = (time.time() - start_time) * 1000
logging.error(f"Error checking PostgreSQL: {e}")
return {"status": False, "message": str(e)}
return {"status": False, "latency": latency,
"latency_unit": "ms", "message": str(e)}


async def check_redis():
"""
Check whether the Redis cache is running properly.
"""
start_time = time.time()
try:
await app.state.redis.ping()
latency = (time.time() - start_time)*1000
latency = (time.time() - start_time) * 1000
return {"status": True, "latency": latency,
"latency_unit": "ms", "message": None}
except Exception as e:
latency = (time.time() - start_time) * 1000
logging.error(f"Error checking Redis: {e}")
return {"status": False, "message": str(e)}
return {"status": False, "latency": latency,
"latency_unit": "ms", "message": str(e)}


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True,
log_level=settings.LOG_LEVEL.lower())
log_level=settings.LOG_LEVEL.lower())
3 changes: 2 additions & 1 deletion backend/api/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
asyncpg==0.27.0
fastapi==0.109.1
aiohttp==3.8.4
pydantic==1.10.7
pydantic==2.5.3
pydantic-settings==2.0.0
redis==4.5.4
uvicorn==0.22.0
requests==2.26.0
3 changes: 2 additions & 1 deletion backend/gateway/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ jsonpath_ng==1.5.3
redis==4.5.4
aiologger==0.7.0
aiofiles==23.1.0
pydantic==1.10.7
pydantic==2.5.3
pydantic-settings==2.0.0
Loading
Loading