-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pilotage: Partage du fichier import fluxIAE via nouveau bucket S3 #5249
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
78 changes: 78 additions & 0 deletions
78
itou/metabase/management/commands/upload_data_to_pilotage.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
""" | ||
The FluxIAE file contains data used by les emplois and is uploaded to us directly by a supporting organization. | ||
The same file is also parsed by the Pilotage, shared via an S3 bucket. | ||
|
||
This command uploads the file from where it has been stored to the S3 bucket for sharing. | ||
""" | ||
|
||
import pathlib | ||
import threading | ||
from pathlib import Path | ||
|
||
from django.conf import settings | ||
from django.template.defaultfilters import filesizeformat | ||
|
||
from itou.utils.command import BaseCommand | ||
from itou.utils.storage.s3 import pilotage_s3_client | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Upload FluxIAE to S3 for sharing." | ||
|
||
FILENAME_PREFIX = "fluxIAE_ITOU_" | ||
DATASTORE_DIRECTORY = "flux-iae/" | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument("directory", type=Path, help="Directory containing FluxIAE files") | ||
parser.add_argument("--wet-run", dest="wet_run", action="store_true") | ||
|
||
def _upload_file(self, file: pathlib.Path, *, wet_run=False): | ||
lock = threading.Lock() | ||
file_size = file.stat().st_size | ||
bytes_transferred = 0 | ||
previous_progress = 0 | ||
|
||
def log_progress(chunk_size): | ||
"""Logs to console or logs the progress of byte transfer""" | ||
nonlocal bytes_transferred | ||
nonlocal previous_progress | ||
|
||
with lock: | ||
bytes_transferred += chunk_size | ||
progress = int((bytes_transferred / file_size) * 100) | ||
if progress > previous_progress and progress % 5 == 0: | ||
self.stdout.write( | ||
f"> {file.name}: {filesizeformat(bytes_transferred)}/{filesizeformat(file_size)} transferred ({progress}%)." # noqa: E501 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
) | ||
previous_progress = progress | ||
|
||
if wet_run: | ||
pilotage_s3_client().upload_file( | ||
Filename=file.absolute(), | ||
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME, | ||
Key=f"{self.DATASTORE_DIRECTORY}{file.name}", | ||
Callback=log_progress, | ||
) | ||
|
||
def handle(self, *, directory: pathlib.Path, wet_run, **options): | ||
client = pilotage_s3_client() | ||
response = client.list_objects_v2( | ||
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME, | ||
Prefix=self.DATASTORE_DIRECTORY, | ||
) | ||
datastore_files = set() | ||
if response["KeyCount"]: | ||
datastore_files.update( | ||
metadata["Key"].replace(self.DATASTORE_DIRECTORY, "") for metadata in response["Contents"] | ||
) | ||
self.stdout.write(f"Files in datastore's {self.DATASTORE_DIRECTORY!r}: {sorted(datastore_files)}") | ||
|
||
local_files = set(file.name for file in directory.glob(f"{self.FILENAME_PREFIX}*.tar.gz")) | ||
self.stdout.write(f"Files in local's {directory.name!r}: {sorted(local_files)}") | ||
|
||
files_to_upload = local_files - datastore_files | ||
self.stdout.write(f"Files to upload: {sorted(files_to_upload)}") | ||
|
||
for filename in files_to_upload: | ||
self.stdout.write(f"Uploading {filename!r}...") | ||
self._upload_file(directory / filename, wet_run=wet_run) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
tests/metabase/management/__snapshots__/test_upload_data_to_pilotage.ambr
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# serializer version: 1 | ||
# name: test_command_call | ||
''' | ||
Files in datastore's 'flux-iae/': [] | ||
Files in local's 'test_command_call0': ['fluxIAE_ITOU_.tar.gz'] | ||
Files to upload: ['fluxIAE_ITOU_.tar.gz'] | ||
Uploading 'fluxIAE_ITOU_.tar.gz'... | ||
> fluxIAE_ITOU_.tar.gz: 11 octets/11 octets transferred (100%). | ||
|
||
''' | ||
# --- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import io | ||
|
||
import pytest | ||
from django.core.management import call_command | ||
from django.utils import timezone | ||
|
||
from itou.utils.storage.s3 import pilotage_s3_client | ||
|
||
|
||
@pytest.mark.usefixtures("temporary_bucket") | ||
def test_command_call(tmp_path, snapshot): | ||
tmp_path.joinpath("fluxIAE_ITOU_.tar.gz").write_bytes(b"Hello World") | ||
stdout = io.StringIO() | ||
|
||
call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run", stdout=stdout) | ||
assert stdout.getvalue() == snapshot | ||
|
||
|
||
@pytest.mark.usefixtures("temporary_bucket") | ||
def test_command_idempotence(tmp_path, settings): | ||
print("BUCKET", settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME) | ||
filename = f"fluxIAE_ITOU_{timezone.now():%Y%m%d_%H%M%S}.tar.gz" | ||
tmp_path.joinpath(filename).touch() | ||
|
||
call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run") | ||
response = pilotage_s3_client().list_objects_v2( | ||
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME, | ||
) | ||
assert response["KeyCount"] == 1 | ||
assert response["Contents"][0]["Key"] == f"flux-iae/{filename}" | ||
modified_at = response["Contents"][0]["LastModified"] | ||
|
||
call_command("upload_data_to_pilotage", tmp_path.as_posix(), "--wet-run") | ||
response = pilotage_s3_client().list_objects_v2( | ||
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME, | ||
) | ||
assert response["KeyCount"] == 1 | ||
assert response["Contents"][0]["Key"] == f"flux-iae/{filename}" | ||
assert response["Contents"][0]["LastModified"] == modified_at |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 🔒
Si j'ai bien compris ce lock protège contre le cas où
log_progress
est plus lente que le téléversement des fichiersThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C'est pas tout à fait ça, c'est bien lié à une histoire de temps mais c'est plus de la synchronicité qu'une durée, même si tu as raison que plus
log_progress
est rapide (et aussi une histoire de comment elle est codé) moins le problème est visible.L'appel de la callback de
.upload_file()
est asynchrone, sûrement pour ne pas bloquer le téléversement inutilement, mais donc on peux se retrouver avec de multiples appels en parallèle et vu qu'on a des variables non-locale on doit forcer la synchronicité lors de leur modification sinon leur état peux changer entre chaque ligne.D'ailleurs dans l'exemple de la documentation le lock est mis par défaut : https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-callback-parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Le variable
lock
ici est locale au scope du fonction_upload_file
, on n'a pas besoin que ce soitself.lock
et définit dans__init__
comme dans l'example ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On pourrais mais pas besoin non, et vu que la variable n'est utilisée que dans
._upload_file()
autant la garder au plus proche de ses utilisations.On aurais eu besoin d'une variable d'instance si
log_progress()
avait été une méthode de classe, mais vu que là c'est une inner function c'est pas nécessaire ;).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, je vais le merger :) merci pour les explications !