Pilotage: sharing the fluxIAE import file via a new S3 bucket #5249

Merged 1 commit on Jan 7, 2025
Changes from all commits
1 change: 1 addition & 0 deletions clevercloud/cron.json
@@ -28,6 +28,7 @@
"5 23 * * * $ROOT/clevercloud/run_management_command.sh archive_employee_records --wet-run",
"20 23 * * * $ROOT/clevercloud/run_management_command.sh archive_job_applications",

"0 9-12 * * 1 $ROOT/clevercloud/run_management_command.sh upload_data_to_pilotage asp_riae_shared_bucket/",
"0 0 * * 1 $ROOT/clevercloud/run_management_command.sh shorten_active_sessions",
"0 2 * * 1 $ROOT/clevercloud/crons/populate_metabase_matomo.sh",
"0 12 * * 1 $ROOT/clevercloud/run_management_command.sh import_ea_eatt --from-asp --wet-run",
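By standard cron syntax, the added entry runs the upload at minute 0 of hours 9 through 12 every Monday, i.e. four slots per week; presumably the repeated slots give the share several chances in case the fluxIAE archive arrives late (that rationale is an assumption):

```
# ┌ minute   ┌ hour   ┌ day of month   ┌ month   ┌ day of week (1 = Monday)
  0          9-12     *               *          1
```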
7 changes: 7 additions & 0 deletions config/settings/base.py
@@ -597,6 +597,13 @@
AWS_S3_ACCESS_KEY_ID = os.getenv("CELLAR_ADDON_KEY_ID")
AWS_S3_SECRET_ACCESS_KEY = os.getenv("CELLAR_ADDON_KEY_SECRET")
AWS_STORAGE_BUCKET_NAME = os.getenv("S3_STORAGE_BUCKET_NAME")

# S3 store for communicating with the Pilotage.
PILOTAGE_DATASTORE_S3_ENDPOINT_URL = os.getenv("PILOTAGE_DATASTORE_S3_ENDPOINT_URL")
PILOTAGE_DATASTORE_S3_ACCESS_KEY = os.getenv("PILOTAGE_DATASTORE_S3_ACCESS_KEY")
PILOTAGE_DATASTORE_S3_SECRET_KEY = os.getenv("PILOTAGE_DATASTORE_S3_SECRET_KEY")
PILOTAGE_DATASTORE_S3_BUCKET_NAME = os.getenv("PILOTAGE_DATASTORE_S3_BUCKET_NAME")

# The maximum amount of memory (in bytes) a file can take up before being rolled over into a temporary file on disk.
# Picked 5 MB, the max size for a resume. Keep it fast for files under that size, and avoid filling up the RAM.
AWS_S3_MAX_MEMORY_SIZE = 5 * 1024 * 1024
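For local development, the new settings might be fed with something like the following; this is a sketch only: the minioadmin credentials mirror config/settings/test.py below, the endpoint is the usual local MinIO default, and the bucket name is hypothetical:

```
PILOTAGE_DATASTORE_S3_ENDPOINT_URL=http://localhost:9000
PILOTAGE_DATASTORE_S3_ACCESS_KEY=minioadmin
PILOTAGE_DATASTORE_S3_SECRET_KEY=minioadmin
PILOTAGE_DATASTORE_S3_BUCKET_NAME=pilotage-datastore
```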
5 changes: 5 additions & 0 deletions config/settings/test.py
@@ -39,6 +39,11 @@
AWS_S3_SECRET_ACCESS_KEY = "minioadmin"
AWS_STORAGE_BUCKET_NAME = "tests"

PILOTAGE_DATASTORE_S3_ENDPOINT_URL = AWS_S3_ENDPOINT_URL
PILOTAGE_DATASTORE_S3_ACCESS_KEY = AWS_S3_ACCESS_KEY_ID
PILOTAGE_DATASTORE_S3_SECRET_KEY = AWS_S3_SECRET_ACCESS_KEY
PILOTAGE_DATASTORE_S3_BUCKET_NAME = AWS_STORAGE_BUCKET_NAME

API_DATADOG_API_KEY = "abcde"
API_DATADOG_APPLICATION_KEY = "fghij"

78 changes: 78 additions & 0 deletions itou/metabase/management/commands/upload_data_to_pilotage.py
@@ -0,0 +1,78 @@
"""
The FluxIAE file contains data used by les emplois and is uploaded to us directly by a supporting organization.
The same file is also parsed by the Pilotage, shared via an S3 bucket.

This command uploads the file from where it has been stored to the S3 bucket for sharing.
"""

import pathlib
import threading
from pathlib import Path

from django.conf import settings
from django.template.defaultfilters import filesizeformat

from itou.utils.command import BaseCommand
from itou.utils.storage.s3 import pilotage_s3_client


class Command(BaseCommand):
    help = "Upload FluxIAE to S3 for sharing."

    FILENAME_PREFIX = "fluxIAE_ITOU_"
    DATASTORE_DIRECTORY = "flux-iae/"

    def add_arguments(self, parser):
        parser.add_argument("directory", type=Path, help="Directory containing FluxIAE files")
        parser.add_argument("--wet-run", dest="wet_run", action="store_true")

    def _upload_file(self, file: pathlib.Path, *, wet_run=False):
        lock = threading.Lock()
Contributor Author:
👍 🔒
If I understand correctly, this lock protects against the case where log_progress is slower than the upload of the files.

Contributor:
Not quite. It is indeed about timing, but it's more a matter of synchronization than of duration, although you're right that the faster log_progress is (and depending on how it's written), the less visible the problem becomes.
The .upload_file() callback is invoked asynchronously, presumably so as not to block the upload needlessly, so we can end up with multiple calls running in parallel; and since we mutate non-local variables, we have to enforce synchronization when modifying them, otherwise their state can change between one line and the next.
Incidentally, in the example from the documentation the lock is there by default: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-callback-parameter

Contributor Author:
The lock variable here is local to the scope of the _upload_file function; don't we need it to be self.lock, defined in __init__ as in the example?

Contributor:
We could, but there's no need. Since the variable is only used in ._upload_file(), we might as well keep it as close as possible to where it's used.
We would have needed an instance variable if log_progress() had been a method of the class, but since it's an inner function here, it isn't necessary ;).

Contributor Author:
OK, I'll merge it :) thanks for the explanations!
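For reference, the pattern from the boto3 documentation linked above looks roughly like this: upload_file() may invoke the Callback from several threads, so the shared counter is only mutated under the lock (class name, filenames and bucket here are illustrative, not part of the PR):

```python
import os
import threading


class ProgressPercentage:
    """Thread-safe progress callback, in the style of the boto3 docs example."""

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # upload_file() can call this from multiple threads: serialize the updates.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            print(f"{self._filename}: {self._seen_so_far}/{self._size:.0f} bytes ({percentage:.2f}%)")


# Illustrative usage:
# import boto3
# boto3.client("s3").upload_file(
#     "archive.tar.gz", "some-bucket", "archive.tar.gz",
#     Callback=ProgressPercentage("archive.tar.gz"),
# )
```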

        file_size = file.stat().st_size
        bytes_transferred = 0
        previous_progress = 0

        def log_progress(chunk_size):
            """Log the progress of the byte transfer to the console."""
            nonlocal bytes_transferred
            nonlocal previous_progress

            with lock:
                bytes_transferred += chunk_size
                progress = int((bytes_transferred / file_size) * 100)
                if progress > previous_progress and progress % 5 == 0:
                    self.stdout.write(
                        f"> {file.name}: {filesizeformat(bytes_transferred)}/{filesizeformat(file_size)} transferred ({progress}%)."  # noqa: E501
Contributor Author (@calummackervoy, Jan 7, 2025):
filesizeformat is a new filter for me 👍

                    )
                    previous_progress = progress

        if wet_run:
            pilotage_s3_client().upload_file(
                Filename=file.absolute(),
                Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
                Key=f"{self.DATASTORE_DIRECTORY}{file.name}",
                Callback=log_progress,
            )

    def handle(self, *, directory: pathlib.Path, wet_run, **options):
        client = pilotage_s3_client()
        response = client.list_objects_v2(
            Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
            Prefix=self.DATASTORE_DIRECTORY,
        )
        datastore_files = set()
        if response["KeyCount"]:
            datastore_files.update(
                metadata["Key"].replace(self.DATASTORE_DIRECTORY, "") for metadata in response["Contents"]
            )
        self.stdout.write(f"Files in datastore's {self.DATASTORE_DIRECTORY!r}: {sorted(datastore_files)}")

        local_files = set(file.name for file in directory.glob(f"{self.FILENAME_PREFIX}*.tar.gz"))
        self.stdout.write(f"Files in local's {directory.name!r}: {sorted(local_files)}")

        files_to_upload = local_files - datastore_files
        self.stdout.write(f"Files to upload: {sorted(files_to_upload)}")

        for filename in files_to_upload:
            self.stdout.write(f"Uploading {filename!r}...")
            self._upload_file(directory / filename, wet_run=wet_run)
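Taken together, a wet run might look like the following (filename and directory are illustrative; the output lines mirror the stdout.write calls above, and a second run uploads nothing because the filename is no longer in local_files - datastore_files):

```console
$ python manage.py upload_data_to_pilotage asp_riae_shared_bucket/ --wet-run
Files in datastore's 'flux-iae/': []
Files in local's 'asp_riae_shared_bucket': ['fluxIAE_ITOU_20250106_120000.tar.gz']
Files to upload: ['fluxIAE_ITOU_20250106_120000.tar.gz']
Uploading 'fluxIAE_ITOU_20250106_120000.tar.gz'...
```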
10 changes: 10 additions & 0 deletions itou/utils/storage/s3.py
@@ -18,6 +18,16 @@ def s3_client():
    )


def pilotage_s3_client():
    """There is an S3 bucket dedicated to sharing files with Pilotage"""
    return boto3.client(
        "s3",
        endpoint_url=settings.PILOTAGE_DATASTORE_S3_ENDPOINT_URL,
        aws_access_key_id=settings.PILOTAGE_DATASTORE_S3_ACCESS_KEY,
        aws_secret_access_key=settings.PILOTAGE_DATASTORE_S3_SECRET_KEY,
    )


class PublicStorage(S3Boto3Storage):
    # Not using the S3StaticStorage backend to ensure the listdir() operation remains forbidden.
    # Don’t sign URLs, objects are public.
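A short sketch of the new helper in use, mirroring the list_objects_v2 call made by the management command above (the prefix value comes from the command's DATASTORE_DIRECTORY):

```python
from django.conf import settings

from itou.utils.storage.s3 import pilotage_s3_client

client = pilotage_s3_client()
# List what has already been shared with the Pilotage under the flux-iae/ prefix.
response = client.list_objects_v2(
    Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
    Prefix="flux-iae/",
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"], obj["LastModified"])
```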
3 changes: 2 additions & 1 deletion tests/conftest.py
@@ -214,7 +214,8 @@ def failing_cache():

 @pytest.fixture
 def temporary_bucket():
-    with override_settings(AWS_STORAGE_BUCKET_NAME=f"tests-{uuid.uuid4()}"):
+    bucket_name = f"tests-{uuid.uuid4()}"
+    with override_settings(AWS_STORAGE_BUCKET_NAME=bucket_name, PILOTAGE_DATASTORE_S3_BUCKET_NAME=bucket_name):
         call_command("configure_bucket")
         yield
         client = s3_client()
@@ -0,0 +1,11 @@
# serializer version: 1
# name: test_command_call
'''
Files in datastore's 'flux-iae/': []
Files in local's 'test_command_call0': ['fluxIAE_ITOU_.tar.gz']
Files to upload: ['fluxIAE_ITOU_.tar.gz']
Uploading 'fluxIAE_ITOU_.tar.gz'...
> fluxIAE_ITOU_.tar.gz: 11 octets/11 octets transferred (100%).

'''
# ---
39 changes: 39 additions & 0 deletions tests/metabase/management/test_upload_data_to_pilotage.py
@@ -0,0 +1,39 @@
import io

import pytest
from django.core.management import call_command
from django.utils import timezone

from itou.utils.storage.s3 import pilotage_s3_client


@pytest.mark.usefixtures("temporary_bucket")
def test_command_call(tmp_path, snapshot):
    tmp_path.joinpath("fluxIAE_ITOU_.tar.gz").write_bytes(b"Hello World")
    stdout = io.StringIO()

    call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run", stdout=stdout)
    assert stdout.getvalue() == snapshot


@pytest.mark.usefixtures("temporary_bucket")
def test_command_idempotence(tmp_path, settings):
    filename = f"fluxIAE_ITOU_{timezone.now():%Y%m%d_%H%M%S}.tar.gz"
    tmp_path.joinpath(filename).touch()

    call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run")
    response = pilotage_s3_client().list_objects_v2(
        Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
    )
    assert response["KeyCount"] == 1
    assert response["Contents"][0]["Key"] == f"flux-iae/{filename}"
    modified_at = response["Contents"][0]["LastModified"]

    call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run")
    response = pilotage_s3_client().list_objects_v2(
        Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
    )
    assert response["KeyCount"] == 1
    assert response["Contents"][0]["Key"] == f"flux-iae/{filename}"
    assert response["Contents"][0]["LastModified"] == modified_at