-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pilotage: Partage du fichier import fluxIAE via nouveau bucket S3 #5249
Conversation
7f25046
to
dbf7051
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je te propose un plan en 3 PR/étapes
Étape 1 - PR emplois : Téléversement du flux IAE
PR avec juste un commit qui ajoute (sans toucher à l'existant) une tâche cron qui copie les archives ZIP FluxIAE inexistantes du dossier asp_riae_shared_bucket
vers le bucket pilotage-datastore-les-emplois
.
Comme ça une fois fusionnée on aura les archives FluxIAE de disponibles dans le bucket pour tout le monde, tout le temps.
Étape 2 - PR pilotage : DAG flux IAE
Avec l'étape 1 de faite, la PR devient plus facilement testable car les fichiers sont disponible sur le S3, et aussi pas besoin de grosse configuration à faire pour la faire fonctionner sur notre local dev ou sur la prod.
Truc auquel je pense dès à présent c'est que c'est le DAG qui devra choisir l'archive à utiliser, je pense que tu peux reprendre la logique que j'ai faite pour l'import des EA/EATT et qui se base aussi sur une archive hebdomadaire.
Étape 3 - PR emplois : Nettoyage de populate_metabase_flux_iae
C'est grosso modo le commit initial de la PR actuelle où on vire tout ce qui lui est lié car maintenant c'est géré par un DAG.
dbf7051
to
167c62e
Compare
clevercloud/cron.json
Outdated
@@ -31,6 +31,7 @@ | |||
"0 0 * * 1 $ROOT/clevercloud/run_management_command.sh shorten_active_sessions", | |||
"0 2 * * 1 $ROOT/clevercloud/crons/populate_metabase_matomo.sh", | |||
"0 12 * * 1 $ROOT/clevercloud/run_management_command.sh import_ea_eatt --from-asp --wet-run", | |||
"0 12 * * 1 $ROOT/clevercloud/upload_to_pilotage.sh", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
12h chaque lundi - j'ai compris que c'est hebdomanaire mais j'imagine il faudra changer l'heure ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je pense que midi c'est bien, on changera si on rencontre des problèmes.
A noter que l'heure système des machines Clever est en UTC et que l'ASP est en heure de métropole (CET/CEST suivant la saison) donc on devrait être bon, et avoir en plus de la marge :).
... Dec 2 11:14 fluxIAE_ITOU_20241202_115450609.tar.gz
... Dec 9 09:48 fluxIAE_ITOU_20241209_102749998.tar.gz
... Dec 16 09:51 fluxIAE_ITOU_20241216_103042036.tar.gz
clevercloud/upload_to_pilotage.sh
Outdated
exit 0 | ||
fi | ||
|
||
/bin/bash $ROOT/clevercloud/run_management_command.sh upload_data_to_pilotage "$FLUX_IAE_FILE" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je n'ai pas testé sur prod. On peut faire un dry-run sur une recette jetable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(J'appele run_management_command.sh
depuis ce script pour minimiser la duplication de code)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je n'ai pas testé sur prod. On peut faire un dry-run sur une recette jetable ?
Oui tu pourras tester sur une recette jetable, ceci devrais fonctionner :
- Tu te connectes en SSH sur ta recette jetable
- Puis
mkdir "$APP_HOME/asp_riae_shared_bucket && touch "$APP_HOME/asp_riae_shared_bucket/fluxIAE_ITOU_$(date '+%Y%m%d_%H%M%S').tar.gz"
pour créer un dummy file - Tu lances ton script et vérifie que le fichier est bien reçu dans le bucket S3
pilotage-datastorage-les-emplois
Pour run_management_command.sh
c'est bien tenté mais tu pourras pas trop empêcher la duplication de code, donc dans le cas actuel c'est presque préférable d'accepter cette duplication que de créer un mille-feuille de script s’appelant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merci pour l'astuce ! J'ai testé la commande et tout va bien
513ba00
to
95574d0
Compare
clevercloud/cron.json
Outdated
@@ -31,6 +31,7 @@ | |||
"0 0 * * 1 $ROOT/clevercloud/run_management_command.sh shorten_active_sessions", | |||
"0 2 * * 1 $ROOT/clevercloud/crons/populate_metabase_matomo.sh", | |||
"0 12 * * 1 $ROOT/clevercloud/run_management_command.sh import_ea_eatt --from-asp --wet-run", | |||
"0 12 * * 1 $ROOT/clevercloud/upload_to_pilotage.sh", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je pense que midi c'est bien, on changera si on rencontre des problèmes.
A noter que l'heure système des machines Clever est en UTC et que l'ASP est en heure de métropole (CET/CEST suivant la saison) donc on devrait être bon, et avoir en plus de la marge :).
... Dec 2 11:14 fluxIAE_ITOU_20241202_115450609.tar.gz
... Dec 9 09:48 fluxIAE_ITOU_20241209_102749998.tar.gz
... Dec 16 09:51 fluxIAE_ITOU_20241216_103042036.tar.gz
filename = os.path.basename(filename) | ||
filepath = os.path.join("asp_riae_shared_bucket", filename) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sauf erreur de ma part, ces 2 lignes ne servent à rien, tu ne fait que recréer ce qui t'es passé en paramètre donc filename
filename = os.path.basename(filename) | |
filepath = os.path.join("asp_riae_shared_bucket", filename) |
Mais si tu veux garder filename
alors faut changer le nom du paramètre du script en filepath
vu que c'est ce qui est passé en argument :).
filename = os.path.basename(filename) | |
filepath = os.path.join("asp_riae_shared_bucket", filename) | |
filename = os.path.basename(filepath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je prends le deuxième. J'avais l'intention de forcer l'utilisation du asp_riae_shared_bucket
, mais la deuxième solution marche pour moi 👍
clevercloud/upload_to_pilotage.sh
Outdated
exit 0 | ||
fi | ||
|
||
/bin/bash $ROOT/clevercloud/run_management_command.sh upload_data_to_pilotage "$FLUX_IAE_FILE" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je n'ai pas testé sur prod. On peut faire un dry-run sur une recette jetable ?
Oui tu pourras tester sur une recette jetable, ceci devrais fonctionner :
- Tu te connectes en SSH sur ta recette jetable
- Puis
mkdir "$APP_HOME/asp_riae_shared_bucket && touch "$APP_HOME/asp_riae_shared_bucket/fluxIAE_ITOU_$(date '+%Y%m%d_%H%M%S').tar.gz"
pour créer un dummy file - Tu lances ton script et vérifie que le fichier est bien reçu dans le bucket S3
pilotage-datastorage-les-emplois
Pour run_management_command.sh
c'est bien tenté mais tu pourras pas trop empêcher la duplication de code, donc dans le cas actuel c'est presque préférable d'accepter cette duplication que de créer un mille-feuille de script s’appelant.
95574d0
to
187f561
Compare
🥁 La recette jetable est prête ! 👉 Je veux tester cette PR ! |
187f561
to
48cef10
Compare
48cef10
to
58c5edf
Compare
Pour info, suite à mes tests je viens de push des changements afin de :
|
Use a cronjob to share imported data with the pilotage via an S3 bucket.
58c5edf
to
9128832
Compare
parser.add_argument("--wet-run", dest="wet_run", action="store_true") | ||
|
||
def _upload_file(self, file: pathlib.Path, *, wet_run=False): | ||
lock = threading.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 🔒
Si j'ai bien compris ce lock protège contre le cas où log_progress
est plus lente que le téléversement des fichiers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C'est pas tout à fait ça, c'est bien lié à une histoire de temps mais c'est plus de la synchronicité qu'une durée, même si tu as raison que plus log_progress
est rapide (et aussi une histoire de comment elle est codé) moins le problème est visible.
L'appel de la callback de .upload_file()
est asynchrone, sûrement pour ne pas bloquer le téléversement inutilement, mais donc on peux se retrouver avec de multiples appels en parallèle et vu qu'on a des variables non-locale on doit forcer la synchronicité lors de leur modification sinon leur état peux changer entre chaque ligne.
D'ailleurs dans l'exemple de la documentation le lock est mis par défaut : https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-callback-parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Le variable lock
ici est locale au scope du fonction _upload_file
, on n'a pas besoin que ce soit self.lock
et définit dans __init__
comme dans l'example ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On pourrais mais pas besoin non, et vu que la variable n'est utilisée que dans ._upload_file()
autant la garder au plus proche de ses utilisations.
On aurais eu besoin d'une variable d'instance si log_progress()
avait été une méthode de classe, mais vu que là c'est une inner function c'est pas nécessaire ;).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, je vais le merger :) merci pour les explications !
progress = int((bytes_transferred / file_size) * 100) | ||
if progress > previous_progress and progress % 5 == 0: | ||
self.stdout.write( | ||
f"> {file.name}: {filesizeformat(bytes_transferred)}/{filesizeformat(file_size)} transferred ({progress}%)." # noqa: E501 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filesizeformat
est un nouveau filtre pour moi 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merci pour tes améliorations ! Quand prévoit-on le merger ? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merci pour tes améliorations ! Quand prévoit-on le merger ? :)
Quand tu veux :), je ne voulais pas fusionner sans que tu repasses sur mes modifications dans le cas où j'aurais raté un truc.
parser.add_argument("--wet-run", dest="wet_run", action="store_true") | ||
|
||
def _upload_file(self, file: pathlib.Path, *, wet_run=False): | ||
lock = threading.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C'est pas tout à fait ça, c'est bien lié à une histoire de temps mais c'est plus de la synchronicité qu'une durée, même si tu as raison que plus log_progress
est rapide (et aussi une histoire de comment elle est codé) moins le problème est visible.
L'appel de la callback de .upload_file()
est asynchrone, sûrement pour ne pas bloquer le téléversement inutilement, mais donc on peux se retrouver avec de multiples appels en parallèle et vu qu'on a des variables non-locale on doit forcer la synchronicité lors de leur modification sinon leur état peux changer entre chaque ligne.
D'ailleurs dans l'exemple de la documentation le lock est mis par défaut : https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-callback-parameter
🤔 Pourquoi ?
On remplace la commande
populate_metabase_flux_iae
avec un DAG géré au côté Pilotage. Avec ce PR, on supprime ce cher processus en le remplaçant avec une commande plus simple à téléversé l'archive vers un S3 accessible par le Pilotage.🚨 À vérifier