refactor(organization): switch to one-bucket-per-org policy (#336)
* feat: first commit for Site table creation

* fix: error when launching backend

* fix: sites tests

* fix mypy

* fix tests

* fix: add site_id in user table

* feat: implement new security behavior using site_id

* feat: add CRUD endpoints for the site path + tests

* refactor: Site -> Organization

* feat: refactor security behavior

* feat: fix tests

* fix e2e tests

* fix client tests

* fix "role" column

* fix: remove useless security in case of create_detection

* fix: resolve first comments

* fix error in detection endpoint

* take feedback into account

* feat: add crud function to avoid for loop

* feedback PR

* fix lint

* fix linting

* feat: add acknowledged endpoint

* feat: no need to have warning level of error

* feat: use is_wildfire instead of a new boolean

* refactor fetch_all crud

* fix: rm Exception

* fix mypy

* feat: start implementing new payload from_date

* fix: error date -> datetime

* fix unlabeled endpoints

* feat: add localization in Detection table

* fix: forgot update client function for bbox

* fix: many errors in localization usage

* feat: rm Exception as e

* refactor: rm Optional

* fix: style

* feat: add regexp

* localization can be equal to []

* fix error after rebase

* fix: many errors in localization usage

* refactor: rm Optional

* fix: style

* feat: add regexp

* fix: many errors in localization usage

* feat: add localization in Detection table

* fix: many errors in localization usage

* feat: use a bucket per organization

* feat: update diagram

* fix: some errors

* feat: use id instead of name for bucket_name

* fix missing function params

* feat: automatic creation of the S3 bucket

* fix the diagram

* fix: linting

* fix after merge

* fix: don't create bucket in Detection, better in Organization

* fix error when using ADMIN scope

* fix lint

* use global var for admin name

* some fixes after rebase

* fix test storage

* Add localization in Detection table (#342)

* feat: add localization in Detection table

* fix: forgot update client function for bbox

* fix: many errors in localization usage

* feat: rm Exception as e

* refactor: rm Optional

* fix: style

* feat: add regexp

* localization can be equal to []

* fix error after rebase

---------

Co-authored-by: Ronan <[email protected]>

* Send url with detection (#346)

* fix: many errors in localization usage

* refactor: rm Optional

* fix: style

* fix: many errors in localization usage

* feat: use a bucket per organization

* fix missing function params

* feat: automatic creation of the S3 bucket

* fix after merge

* fix: don't create bucket in Detection, better in Organization

* fix: many errors in localization usage

* feat: use a bucket per organization

* feat: use id instead of name for bucket_name

* fix missing function params

* fix: don't create bucket in Detection, better in Organization

* feat: sendback tuple

* fix lint

* fix error when creating org

* fix localization

* fix localization

* linting

* we lost a fix during rebase

* fix test

* fix after rebase

* fix after merge

---------

Co-authored-by: Ronan <[email protected]>

* clean up comments

* feat: refactor fetch_all function

* clean up comments

* fix typing

* fix error in delete bucket

* revert(organization): revert unnecessary changes

* fix(detections): fix unlabeled detections

* fix(organizations): clean org creation

* revert(migration): remove alembic op

* refactor(storage): update S3 management

* refactor(storage): update s3

* style(detections): remove unused import

* style(mypy): fix typing

* fix(storage): fix async issues

* test(storage): fix async troubles

* fix(detections): fix interaction with s3

* test(conftest): update config

* test(conftest): fixing init

* fix(docker): fix docker orchestration

---------

Co-authored-by: Ronan <[email protected]>
Co-authored-by: F-G Fernandez <[email protected]>
3 people authored Aug 24, 2024
1 parent 7441114 commit 3f7b5f9
Showing 14 changed files with 261 additions and 91 deletions.
1 change: 0 additions & 1 deletion .env.example
@@ -8,7 +8,6 @@ S3_ACCESS_KEY='na'
S3_SECRET_KEY='na'
S3_REGION='us-east-1'
S3_ENDPOINT_URL='http://localstack:4566'
S3_BUCKET_NAME=bucket

# Initialization
SUPERADMIN_LOGIN='pyroadmin'
1 change: 0 additions & 1 deletion CONTRIBUTING.md
@@ -140,7 +140,6 @@ None :)
- `S3_REGION`: your S3 bucket is geographically identified by its location's region
- `S3_ENDPOINT_URL`: the URL providing a S3 endpoint by your cloud provider
- `S3_PROXY_URL`: the url of the proxy to hide the real s3 url behind, do not use proxy if ""
- `S3_BUCKET_NAME`: the name of the storage bucket

#### Production-only values
- `ACME_EMAIL`: the email linked to your certificate for HTTPS
2 changes: 1 addition & 1 deletion client/pyroclient/client.py
@@ -27,7 +27,7 @@
#################
# DETECTIONS
#################
"detections-create": "/detections",
"detections-create": "/detections/",
"detections-label": "/detections/{det_id}/label",
"detections-fetch": "/detections",
"detections-fetch-unl": "/detections/unlabeled/fromdate",
5 changes: 2 additions & 3 deletions docker-compose.dev.yml
@@ -26,7 +26,7 @@ services:
volumes:
- ./scripts/localstack:/etc/localstack/init/ready.d
healthcheck:
test: ["CMD-SHELL", "awslocal --endpoint-url=http://localhost:4566 s3 ls s3://bucket"]
test: ["CMD-SHELL", "awslocal --endpoint-url=http://localhost:4566 s3 ls s3://admin"]
interval: 10s
timeout: 5s
retries: 10
@@ -46,12 +46,11 @@
- POSTGRES_URL=postgresql+asyncpg://dummy_pg_user:dummy_pg_pwd@db/dummy_pg_db
- SUPERADMIN_LOGIN=superadmin_login
- SUPERADMIN_PWD=superadmin_pwd
- SUPERADMIN_ORG=superadmin_org
- SUPERADMIN_ORG=admin
- JWT_SECRET=${JWT_SECRET}
- SUPPORT_EMAIL=${SUPPORT_EMAIL}
- DEBUG=true
- SQLALCHEMY_SILENCE_UBER_WARNING=1
- S3_BUCKET_NAME=bucket
- S3_ENDPOINT_URL=http://localstack:4566
- S3_ACCESS_KEY=fake
- S3_SECRET_KEY=fake
3 changes: 1 addition & 2 deletions docker-compose.yml
@@ -31,7 +31,7 @@ services:
- ./scripts/localstack:/etc/localstack/init/ready.d
- localstack_data:/tmp/localstack
healthcheck:
test: ["CMD-SHELL", "awslocal --endpoint-url=http://localhost:4566 s3 ls s3://bucket"]
test: ["CMD-SHELL", "awslocal --endpoint-url=http://localhost:4566 s3 ls s3://admin"]
interval: 10s
timeout: 5s
retries: 10
@@ -58,7 +58,6 @@
- DEBUG=true
- PROMETHEUS_ENABLED=true
- SQLALCHEMY_SILENCE_UBER_WARNING=1
- S3_BUCKET_NAME=${S3_BUCKET_NAME:-bucket}
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://localstack:4566}
- S3_ACCESS_KEY=${S3_ACCESS_KEY:-na}
- S3_SECRET_KEY=${S3_SECRET_KEY:-na}
1 change: 0 additions & 1 deletion scripts/dbdiagram.txt
@@ -1,4 +1,3 @@

Enum "userrole" {
"admin"
"agent"
4 changes: 2 additions & 2 deletions scripts/localstack/setup-s3.sh
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
awslocal s3 mb s3://bucket
awslocal s3 mb s3://admin
echo -n "" > my_file
awslocal s3 cp my_file s3://bucket/my_file
awslocal s3 cp my_file s3://admin/my_file
38 changes: 24 additions & 14 deletions src/app/api/api_v1/endpoints/detections.py
@@ -3,8 +3,8 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

import asyncio
import hashlib
import logging
from datetime import datetime
from mimetypes import guess_extension
from typing import List, cast
@@ -25,7 +25,7 @@
DetectionWithUrl,
)
from app.schemas.login import TokenPayload
from app.services.storage import s3_bucket
from app.services.storage import s3_service
from app.services.telemetry import telemetry_client

router = APIRouter()
@@ -67,15 +67,19 @@ async def create_detection(
bucket_key = f"{token_payload.sub}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{sha_hash[:8]}{extension}"
# Reset byte position of the file (cf. https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile)
await file.seek(0)
# Failed upload
if not (await s3_bucket.upload_file(bucket_key, file.file)): # type: ignore[arg-type]
bucket_name = s3_service.resolve_bucket_name(token_payload.organization_id)
bucket = s3_service.get_bucket(bucket_name)
# Upload the file
if not bucket.upload_file(bucket_key, file.file): # type: ignore[arg-type]
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed upload")
logging.info(f"File uploaded to bucket {bucket_name} with key {bucket_key}.")

# Data integrity check
file_meta = await s3_bucket.get_file_metadata(bucket_key)
file_meta = bucket.get_file_metadata(bucket_key)
# Corrupted file
if md5_hash != file_meta["ETag"].replace('"', ""):
# Delete the corrupted upload
await s3_bucket.delete_file(bucket_key)
bucket.delete_file(bucket_key)
# Raise the exception
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
@@ -119,13 +123,16 @@ async def get_detection_url(
detection = cast(Detection, await detections.get(detection_id, strict=True))

if UserRole.ADMIN in token_payload.scopes:
return DetectionUrl(url=await s3_bucket.get_public_url(detection.bucket_key))
camera = cast(Camera, await cameras.get(detection.camera_id, strict=True))
bucket = s3_service.get_bucket(s3_service.resolve_bucket_name(camera.organization_id))
return DetectionUrl(url=bucket.get_public_url(detection.bucket_key))

camera = cast(Camera, await cameras.get(detection.camera_id, strict=True))
if token_payload.organization_id != camera.organization_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access forbidden.")
# Check in bucket
return DetectionUrl(url=await s3_bucket.get_public_url(detection.bucket_key))
bucket = s3_service.get_bucket(s3_service.resolve_bucket_name(camera.organization_id))
return DetectionUrl(url=bucket.get_public_url(detection.bucket_key))


@router.get("/", status_code=status.HTTP_200_OK, summary="Fetch all the detections")
@@ -153,8 +160,10 @@ async def fetch_unlabeled_detections(
) -> List[DetectionWithUrl]:
telemetry_client.capture(token_payload.sub, event="unacknowledged-fetch")

async def get_url(detection: Detection) -> str:
return await s3_bucket.get_public_url(detection.bucket_key)
bucket = s3_service.get_bucket(s3_service.resolve_bucket_name(token_payload.organization_id))

def get_url(detection: Detection) -> str:
return bucket.get_public_url(detection.bucket_key)

if UserRole.ADMIN in token_payload.scopes:
all_unck_detections = await detections.fetch_all(
@@ -168,9 +177,7 @@ async def get_url(detection: Detection) -> str:
inequality_pair=("created_at", ">=", from_date),
)

# Launch all get_url calls in parallel
url_tasks = [get_url(detection) for detection in all_unck_detections]
urls = await asyncio.gather(*url_tasks)
urls = (get_url(detection) for detection in all_unck_detections)

return [DetectionWithUrl(**detection.model_dump(), url=url) for detection, url in zip(all_unck_detections, urls)]

@@ -199,9 +206,12 @@ async def label_detection(
async def delete_detection(
detection_id: int = Path(..., gt=0),
detections: DetectionCRUD = Depends(get_detection_crud),
cameras: CameraCRUD = Depends(get_camera_crud),
token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN]),
) -> None:
telemetry_client.capture(token_payload.sub, event="detections-deletion", properties={"detection_id": detection_id})
detection = cast(Detection, await detections.get(detection_id, strict=True))
await s3_bucket.delete_file(detection.bucket_key)
camera = cast(Camera, await cameras.get(detection.camera_id, strict=True))
bucket = s3_service.get_bucket(s3_service.resolve_bucket_name(camera.organization_id))
bucket.delete_file(detection.bucket_key)
await detections.delete(detection_id)
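The diff above replaces the single module-level `s3_bucket` with an `s3_service` that resolves one bucket per organization. A minimal sketch of what such a service could look like — the class layout, the thin bucket wrapper, and the id-based naming are assumptions inferred from the calls in this diff, not the project's actual implementation:

```python
from dataclasses import dataclass
from typing import Any, BinaryIO


@dataclass
class S3Bucket:
    """Thin wrapper around one bucket; method names mirror the diff's calls."""

    client: Any  # a boto3-style S3 client
    name: str

    def upload_file(self, key: str, fileobj: BinaryIO) -> bool:
        try:
            self.client.upload_fileobj(fileobj, self.name, key)
            return True
        except Exception:
            return False

    def get_public_url(self, key: str, expires: int = 3600) -> str:
        return self.client.generate_presigned_url(
            "get_object", Params={"Bucket": self.name, "Key": key}, ExpiresIn=expires
        )

    def delete_file(self, key: str) -> None:
        self.client.delete_object(Bucket=self.name, Key=key)


class S3Service:
    """Hands out per-organization buckets instead of one shared bucket."""

    def __init__(self, client: Any) -> None:
        self.client = client

    @staticmethod
    def resolve_bucket_name(organization_id: int) -> str:
        # Per the commit history: "use id instead of name for bucket_name"
        return str(organization_id)

    def get_bucket(self, bucket_name: str) -> S3Bucket:
        return S3Bucket(self.client, bucket_name)
```

Note that with this shape the bucket methods are synchronous, which matches the diff dropping `await` (and the `asyncio.gather`) around the storage calls.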
14 changes: 12 additions & 2 deletions src/app/api/api_v1/endpoints/organizations.py
@@ -6,13 +6,14 @@

from typing import List, cast

from fastapi import APIRouter, Depends, Path, Security, status
from fastapi import APIRouter, Depends, HTTPException, Path, Security, status

from app.api.dependencies import get_jwt, get_organization_crud
from app.crud import OrganizationCRUD
from app.models import Organization, UserRole
from app.schemas.login import TokenPayload
from app.schemas.organizations import OrganizationCreate
from app.services.storage import s3_service
from app.services.telemetry import telemetry_client

router = APIRouter()
@@ -27,7 +28,13 @@ async def register_organization(
telemetry_client.capture(
token_payload.sub, event="organization-create", properties={"organization_name": payload.name}
)
return await organizations.create(payload)
organization = await organizations.create(payload)
bucket_name = s3_service.resolve_bucket_name(organization.id)
if not s3_service.create_bucket(bucket_name):
# Delete the organization if the bucket creation failed
await organizations.delete(organization.id)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create bucket")
return organization


@router.get(
@@ -62,4 +69,7 @@ async def delete_organization(
telemetry_client.capture(
token_payload.sub, event="organizations-deletion", properties={"organization_id": organization_id}
)
bucket_name = s3_service.resolve_bucket_name(organization_id)
if not (await s3_service.delete_bucket(bucket_name)):
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete bucket")
await organizations.delete(organization_id)
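`register_organization` above pairs the DB insert with a compensating delete when bucket creation fails, so an organization row never outlives a failed bucket. The pattern in isolation — the CRUD and service objects here are hypothetical stand-ins, not the real classes:

```python
class BucketCreationError(RuntimeError):
    """Raised when the organization's bucket cannot be provisioned."""


async def create_org_with_bucket(organizations, s3_service, payload):
    # Step 1: create the DB row first, so there is an id to name the bucket after.
    org = await organizations.create(payload)
    # Step 2: provision the bucket; on failure, roll the row back.
    if not s3_service.create_bucket(str(org.id)):
        await organizations.delete(org.id)  # compensating action
        raise BucketCreationError("bucket provisioning failed")
    return org
```

The ordering matters: creating the row first makes the bucket name deterministic, and the compensating delete keeps the two stores consistent without a distributed transaction.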
1 change: 0 additions & 1 deletion src/app/core/config.py
@@ -51,7 +51,6 @@ def sqlachmey_uri(cls, v: str) -> str:
)

# Storage
S3_BUCKET_NAME: str = os.environ["S3_BUCKET_NAME"]
S3_ACCESS_KEY: str = os.environ["S3_ACCESS_KEY"]
S3_SECRET_KEY: str = os.environ["S3_SECRET_KEY"]
S3_REGION: str = os.environ["S3_REGION"]
4 changes: 4 additions & 0 deletions src/app/db.py
@@ -14,6 +14,7 @@
from app.core.config import settings
from app.core.security import hash_password
from app.models import Organization, User, UserRole
from app.services.storage import s3_service

__all__ = ["get_session", "init_db"]

@@ -34,6 +35,7 @@ async def init_db() -> None:
async with AsyncSession(engine) as session:
logger.info("Initializing PostgreSQL database...")

# Create the superadmin organization
statement = select(Organization).where(Organization.name == settings.SUPERADMIN_ORG) # type: ignore[var-annotated]
results = await session.execute(statement=statement)
organization = results.scalar_one_or_none()
@@ -45,6 +47,8 @@
organization_id = new_orga.id
else:
organization_id = organization.id
# Create the bucket
s3_service.create_bucket(s3_service.resolve_bucket_name(organization_id))

# Check if admin exists
statement = select(User).where(User.login == settings.SUPERADMIN_LOGIN)
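Since `init_db` runs on every startup and unconditionally calls `s3_service.create_bucket(...)` for the superadmin organization, bucket creation has to be idempotent. One hedged way to get that with a boto3-style client — the exact error handling in the project's service may differ:

```python
def create_bucket_idempotent(client, bucket_name: str) -> bool:
    """Create the bucket if missing; treat 'already ours' as success."""
    try:
        client.create_bucket(Bucket=bucket_name)
        return True
    except client.exceptions.BucketAlreadyOwnedByYou:
        # Left over from a previous startup: nothing to do.
        return True
    except Exception:
        # Genuine failure (permissions, connectivity, name taken by someone else).
        return False
```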