diff --git a/.github/workflows/setup-docker-and-run-tests.yml b/.github/workflows/setup-docker-and-run-tests.yml new file mode 100644 index 0000000..d650f92 --- /dev/null +++ b/.github/workflows/setup-docker-and-run-tests.yml @@ -0,0 +1,68 @@ +name: setup-docker-and-run-tests.yml + +on: + pull_request: + branches: + - main + push: + +jobs: + setup-and-start-services: + name: Setup Docker and Start Services + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Fiware Environment + run: | + cd fiware-environment + docker compose pull + docker compose up -d + + - name: Create .env File + run: | + cp .env.EXAMPLE .env + + - name: Build docker images + run: | + docker compose build + docker compose up -d + + - name: Wait for service to be ready + run: | + until curl -s http://localhost:5173; do + echo "Waiting for service to be ready..." + sleep 5 + done + + - name: Create .env file for tests + run: cp tests/.env.test.example tests/.env + + - name: Set up Python 3.8 + uses: actions/setup-python@v4 + with: + python-version: '3.8' + + - name: Set up virtual environment + run: | + python -m venv venv + source venv/bin/activate + pip install --upgrade pip + + - name: Install requirements + run: | + source venv/bin/activate + pip install -r tests/requirements.txt + pip install -r backend/api/requirements.txt + pip install -r backend/gateway/requirements.txt + + - name: Set PYTHONPATH + run: echo "PYTHONPATH=$(pwd)" >> $GITHUB_ENV + + - name: Run tests + run: | + source venv/bin/activate + cd tests + python -m unittest discover --verbose . "test_*.py" \ No newline at end of file diff --git a/backend/api/main.py b/backend/api/main.py index 16f5855..b350b2e 100644 --- a/backend/api/main.py +++ b/backend/api/main.py @@ -1,12 +1,12 @@ import importlib import json -from typing import List, Optional, Dict +from typing import List, Optional from uuid import uuid4 import asyncpg import uvicorn -from fastapi import Depends, FastAPI, HTTPException +from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel, Field, Extra, validator +from pydantic import BaseModel, Field, validator from redis import asyncio as aioredis import aiohttp import logging @@ -14,7 +14,6 @@ import time from settings import settings - __version__ = "0.2.0" app = FastAPI() # enable CORS for the frontend @@ -47,6 +46,8 @@ class Datapoint(BaseModel): attribute_name: Optional[str] = Field(None, min_length=1, max_length=255) description: Optional[str] = "" connected: Optional[bool] = None + fiware_service: Optional[str] = Field(default=settings.FIWARE_SERVICE, min_length=1, + max_length=255) @validator('object_id') def validate_object_id(cls, value): @@ -55,20 +56,13 @@ def validate_object_id(cls, value): raise ValueError('object_id contains invalid characters') return value - class DatapointUpdate(BaseModel): entity_id: Optional[str] = Field(None, min_length=1, max_length=255) entity_type: Optional[str] = Field(None, min_length=1, max_length=255) attribute_name: Optional[str] = Field(None, min_length=1, max_length=255) description: Optional[str] = "" connected: Optional[bool] = None - -class DatapointPartialUpdate(BaseModel): - entity_id: Optional[str] = Field(None, min_length=1, max_length=255) - entity_type: Optional[str] = Field(None, min_length=1, max_length=255) - attribute_name: Optional[str] = Field(None, min_length=1, max_length=255) - description: Optional[str] = "" - connected: Optional[bool] = None + fiware_service: Optional[str] = None # Add this line @app.on_event("startup") @@ -88,7 +82,7 @@ async def startup(): ) # different db for notifications async with app.state.pool.acquire() as connection: - # async with is used to ensure that the connection is released back to the pool after the request is done + # Ensure the datapoints table exists await connection.execute( """CREATE TABLE IF NOT EXISTS datapoints ( object_id TEXT PRIMARY KEY, @@ -98,7 +92,8 @@ async def startup(): entity_type TEXT, attribute_name TEXT, description TEXT, - connected BOOLEAN DEFAULT FALSE + connected BOOLEAN DEFAULT FALSE, + fiware_service TEXT )""" ) @@ -112,6 +107,19 @@ async def startup(): """ALTER TABLE datapoints ADD COLUMN connected BOOLEAN DEFAULT FALSE""" ) + # Check if the fiware_service column exists, and add it if it doesn't + column_exists = await connection.fetchval( + """SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_name='datapoints' AND column_name='fiware_service' + )""" + ) + if not column_exists: + await connection.execute( + """ALTER TABLE datapoints ADD COLUMN fiware_service TEXT""" + ) + @app.on_event("shutdown") async def shutdown(): @@ -129,8 +137,6 @@ async def get_connection(): to the database for every request. Instead, it can reuse an existing connection from the pool for efficiency. """ async with app.state.pool.acquire() as connection: - # async with is used to ensure that the connection is released back to the pool after the request is done - # a yield statement is used to return the connection to the caller yield connection @@ -189,6 +195,7 @@ async def get_datapoints( except Exception as e: raise HTTPException(status_code=400, detail=str(e)) return rows + @app.get( "/data/{object_id}", response_model=Datapoint, @@ -231,7 +238,7 @@ async def get_datapoint( database that a new datapoint has been added as well as whether the topic needs to be subscribed to.", ) async def add_datapoint( - datapoint: Datapoint, conn: asyncpg.Connection = Depends(get_connection) + request: Request, datapoint: Datapoint, conn: asyncpg.Connection = Depends(get_connection) ): """ Add a new datapoint to the gateway. This is to allow to add new datapoints to the gateway via the frontend. @@ -242,12 +249,15 @@ async def add_datapoint( Args: datapoint (Datapoint): The datapoint to be added to the gateway. conn (asyncpg.Connection, optional): The connection to the database. Defaults to Depends(get_connection) which is a connection from the pool of connections to the database. + request (Request): The request object to get the FIWARE-Service header. Raises: HTTPException: If the datapoint is supposed to be matched but the corresponding information is not provided, a 400 error will be raised. UniqueViolationError: If the object_id of the datapoint already exists in the database, a 409 error will be raised. Exception: If some other error occurs, a 500 error will be raised. """ + if not datapoint.fiware_service: + datapoint.fiware_service = settings.FIWARE_SERVICE logging.info(f"Received datapoint for addition: {datapoint.json()}") @@ -255,11 +265,10 @@ async def add_datapoint( if datapoint.connected: if not datapoint.entity_id or not datapoint.entity_type or not datapoint.attribute_name: raise HTTPException(status_code=400, detail="entity_id, entity_type, and attribute_name cannot be null if connected is True") - + # Remove 'connected' field if it is set datapoint.connected = None - - # Generate a new 6-character object_id if not provided + if datapoint.object_id is None: while True: new_id = str(uuid4())[:6] @@ -272,7 +281,6 @@ async def add_datapoint( break else: - # Check if the provided object_id is unique existing = await conn.fetchrow( """SELECT object_id FROM datapoints WHERE object_id=$1""", datapoint.object_id @@ -280,17 +288,22 @@ async def add_datapoint( if existing: raise HTTPException(status_code=409, detail="object_id already exists!") + if datapoint.connected and ( + datapoint.entity_id is None or datapoint.attribute_name is None + ): + raise HTTPException( + status_code=400, + detail="entity_id and attribute_name must be set if connected is enabled!", + ) try: async with conn.transaction(): - # check if there is already a device subscribed to the same topic - # if so, the gateway will not subscribe to the topic again subscribed = await conn.fetchrow( """SELECT object_id FROM datapoints WHERE topic=$1""", datapoint.topic ) await conn.execute( - """INSERT INTO datapoints (object_id, jsonpath, topic, entity_id, entity_type, attribute_name, description, connected) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""", + """INSERT INTO datapoints (object_id, jsonpath, topic, entity_id, entity_type, attribute_name, description, connected, fiware_service) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)""", datapoint.object_id, datapoint.jsonpath, datapoint.topic, @@ -298,7 +311,8 @@ async def add_datapoint( datapoint.entity_type, datapoint.attribute_name, datapoint.description, - False, # Initially set connected to False + False, # Set connected to False initially + datapoint.fiware_service, ) # store the jsonpath and topic in redis for easy retrieval later @@ -319,6 +333,7 @@ async def add_datapoint( "attribute_name": datapoint.attribute_name, "description": datapoint.description, "connected": False, + "fiware_service": datapoint.fiware_service, } ), ) @@ -345,13 +360,13 @@ async def add_datapoint( @app.put( "/data/{object_id}", - response_model=DatapointUpdate, + response_model=Datapoint, summary="Update a specific datapoint in the gateway", description="Update a specific datapoint in the gateway. This is to allow the frontend to match a datapoint to an existing entity/attribute pair in the Context Broker.", ) async def update_datapoint( object_id: str, - datapoint: DatapointUpdate, + datapoint: Datapoint, conn: asyncpg.Connection = Depends(get_connection), ): """ @@ -359,21 +374,21 @@ async def update_datapoint( Args: object_id (str): The object_id of the datapoint to be updated. - datapoint (DatapointUpdate): The updated datapoint. + datapoint (Datapoint): The updated datapoint. conn (asyncpg.Connection, optional): The connection to the database. Defaults to Depends(get_connection) which is a connection from the pool of connections to the database. Raises: - HTTPException: If the datapoint is supposed to be matched but the corresponding information is not provided, a 400 error will be raised. - HTTPException: If the datapoint to be updated is not found, a 404 error will be raised. - HTTPException: Raises a 422 error if attempts are made to modify the original datapoint's jsonpath or topic. - Exception: If some other error occurs, a 500 error will be raised. + HTTPException: If the datapoint is supposed to be matched but the corresponding information is not provided, a 400 error will be raised. + HTTPException: If the datapoint to be updated is not found, a 404 error will be raised. + HTTPException: Raises a 422 error if attempts are made to modify the original datapoint's jsonpath or topic. + Exception: If some other error occurs, a 500 error will be raised. """ # Remove 'connected' field if it is set update_data = datapoint.dict(exclude_unset=True) if 'connected' in update_data: update_data.pop('connected') - + # Add validation to ensure entity_id, entity_type, and attribute_name are not None if datapoint.entity_id is None or datapoint.entity_type is None or datapoint.attribute_name is None: raise HTTPException(status_code=400, detail="entity_id, entity_type, and attribute_name cannot be null") @@ -381,7 +396,20 @@ async def update_datapoint( try: # Start a transaction to ensure atomicity async with conn.transaction(): - + # Fetch the existing datapoint from the database + existing_datapoint = await conn.fetchrow( + """SELECT * FROM datapoints WHERE object_id=$1""", + object_id + ) + # Raise a 404 error if the datapoint does not exist + if not existing_datapoint: + raise HTTPException(status_code=404, detail="Datapoint not found!") + + # Check if the topic or jsonpath field is being updated + if datapoint.topic != existing_datapoint['topic'] or datapoint.jsonpath != existing_datapoint['jsonpath']: + raise HTTPException(status_code=422, detail="Updating the topic or jsonpath field is not allowed!") + + # Update the datapoint in the database await conn.execute( """UPDATE datapoints SET entity_id=$1, entity_type=$2, attribute_name=$3, description=$4 WHERE object_id=$5""", datapoint.entity_id, @@ -413,6 +441,7 @@ async def update_datapoint( # Check if the datapoint can be connected await check_and_update_connected(object_id, conn) + # Return the updated datapoint as a dictionary return {**datapoint.dict()} except Exception as e: @@ -428,7 +457,7 @@ async def update_datapoint( ) async def partial_update_datapoint( object_id: str, - datapoint_update: DatapointPartialUpdate, + datapoint_update: DatapointUpdate, conn: asyncpg.Connection = Depends(get_connection), ): existing_datapoint = await conn.fetchrow( @@ -450,7 +479,7 @@ async def partial_update_datapoint( status_code=400, detail="entity_id must be set if attribute_name is provided!", ) - + if 'connected' in update_data: update_data.pop('connected') @@ -580,7 +609,7 @@ async def delete_all_datapoints(conn: asyncpg.Connection = Depends(get_connectio for datapoint in datapoints: await app.state.redis.hdel(datapoint["topic"], datapoint["object_id"]) return None - + except Exception as e: logging.error(str(e)) raise HTTPException(status_code=500, detail="Internal Server Error!") @@ -610,7 +639,7 @@ async def get_match_status( bool: True if the datapoint is matched to an existing entity/attribute pair in the Context Broker, False otherwise. """ row = await conn.fetchrow( - """SELECT entity_id, entity_type, attribute_name FROM datapoints WHERE object_id=$1""", + """SELECT entity_id, entity_type, attribute_name, fiware_service FROM datapoints WHERE object_id=$1""", object_id, ) if row is None: @@ -620,12 +649,13 @@ async def get_match_status( entity_id = row['entity_id'] attribute_name = row['attribute_name'] entity_type = row['entity_type'] + fiware_service = row['fiware_service'] url = f"{ORION_URL}/v2/entities/{entity_id}/attrs/{attribute_name}/?type={entity_type}" - headers = {'Fiware-Service': settings.FIWARE_SERVICE} + headers = {'Fiware-Service': fiware_service} async with session.get(url, headers=headers) as response: response_text = await response.text() match_status = response.status == 200 - logging.info(f"Checking match status for entity_id: {entity_id}, attribute_name: {attribute_name}, entity_type: {entity_type}") + logging.info(f"Checking match status for entity_id: {entity_id}, attribute_name: {attribute_name}, entity_type: {entity_type}, fiware_service: {fiware_service}") logging.info(f"Request URL: {url}") logging.info(f"Response status: {response.status}") logging.info(f"Response text: {response_text}") @@ -637,14 +667,16 @@ async def check_and_update_connected(object_id: str, conn: asyncpg.Connection): Check if the datapoint can be marked as connected based on the presence of entity_id and attribute_name, and update the connected status accordingly. """ + # Fetch the entity_id, attribute_name, and entity_type from the datapoints table row = await conn.fetchrow( - """SELECT entity_id, attribute_name, entity_type FROM datapoints WHERE object_id=$1""", object_id + """SELECT entity_id, attribute_name, entity_type FROM datapoints WHERE object_id=$1""", object_id ) # Check if entity_id, attribute_name, and entity_type are all present if row['entity_id'] and row['attribute_name'] and row['entity_type']: async with aiohttp.ClientSession() as session: + # Construct the URL to query the FIWARE Context Broker url = f"{settings.ORION_URL}/v2/entities/{row['entity_id']}/attrs/{row['attribute_name']}?type={row['entity_type']}" headers = { @@ -660,6 +692,7 @@ async def check_and_update_connected(object_id: str, conn: asyncpg.Connection): True, object_id, ) + # If the response status is not 200, the entity or attribute does not exist else: await conn.execute( @@ -667,6 +700,7 @@ async def check_and_update_connected(object_id: str, conn: asyncpg.Connection): False, object_id, ) + # If any of the entity_id, attribute_name, or entity_type are missing else: await conn.execute( @@ -675,12 +709,11 @@ async def check_and_update_connected(object_id: str, conn: asyncpg.Connection): object_id, ) - @app.get("/system/status", - response_model=dict, - summary="Get the status of the system", - description="Get the status of the system. This is to allow the frontend to check " - "whether the system is running properly.", + response_model=dict, + summary="Get the status of the system", + description="Get the status of the system. This is to allow the frontend to " + "check whether the system is running properly.", ) async def get_status(): checks = { @@ -732,7 +765,6 @@ async def check_orion(): return {"status": status, "latency": latency, "latency_unit": "ms", "message": None if status else "Failed to connect"} except Exception as e: - latency = time.time() - start_time logging.error(f"Error checking Orion: {e}") return {"status": False, "latency": latency, "latency_unit": "ms", "message": str(e)} diff --git a/backend/gateway/gateway.py b/backend/gateway/gateway.py index 180c25b..0cfcb80 100644 --- a/backend/gateway/gateway.py +++ b/backend/gateway/gateway.py @@ -31,9 +31,6 @@ tls_context = None REDIS_URL = settings.REDIS_URL orion_url = settings.ORION_URL -service = settings.FIWARE_SERVICE -service_path = settings.FIWARE_SERVICEPATH - host = settings.POSTGRES_HOST user = settings.POSTGRES_USER password = settings.POSTGRES_PASSWORD @@ -169,6 +166,12 @@ async def process_mqtt_message( json.loads(message.decode("utf-8")) ) if len(parsed_result) > 0: + # use the fiware_service from the datapoint if it exists, + # otherwise use the default one + if datapoint["fiware_service"]: + service = datapoint["fiware_service"] + else: + service = settings.FIWARE_SERVICE value = parsed_result[0].value payload = { datapoint["attribute_name"]: value @@ -178,10 +181,9 @@ async def process_mqtt_message( await session.patch( url=f"{orion_url}/v2/entities/{datapoint['entity_id']}/attrs?type={datapoint['entity_type']}&options=keyValues", json=payload, - # TODO support other headers headers={ "fiware-service": service, - "fiware-servicepath": service_path, + "fiware-servicepath": settings.FIWARE_SERVICEPATH, }, ) logging.info(f"Sent {payload} to Orion Context Broker") @@ -265,7 +267,7 @@ async def get_datapoints_by_topic(self, topic: str): """ async with self.conn.transaction(): records = await self.conn.fetch( - "SELECT object_id, jsonpath, entity_id, entity_type, attribute_name FROM datapoints WHERE topic = $1", + "SELECT object_id, jsonpath, entity_id, entity_type, attribute_name, fiware_service FROM datapoints WHERE topic = $1", topic, ) return [dict(record) for record in records] diff --git a/frontend/src/components/Form.svelte b/frontend/src/components/Form.svelte index f48c0ed..2dde4ce 100644 --- a/frontend/src/components/Form.svelte +++ b/frontend/src/components/Form.svelte @@ -13,14 +13,20 @@ entity_id: null, entity_type: null, attribute_name: null, - connected: false + connected: false, + fiware_service: '', }; + let isMultiTenancy = false; // New state for multi-tenancy checkbox + // Reactive statement that updates whenever formState changes $: newDatapoint.set(formState as Datapoint); const handleSubmit = async (event: Event) => { event.preventDefault(); + if (!isMultiTenancy) { + formState.fiware_service = ''; // Clear fiware_service if multi-tenancy is not enabled + } try { await addData($newDatapoint); await refreshData(); // Refresh the data after adding a new datapoint @@ -33,8 +39,10 @@ entity_id: null, entity_type: null, attribute_name: null, - connected: false + connected: false, + fiware_service: '', }; + isMultiTenancy = false; // Reset the multi-tenancy checkbox } catch (e) { console.error('An error occurred while adding the data:', e); } @@ -53,6 +61,18 @@ + + +
Services status:
{#if systemStatus} -Orion
-Postgres
-Redis
+Orion
+Postgres
+Redis
{:else}Checking...
{/if}