From 0f2049096b39f1d1d24149e05fc158416a4eb077 Mon Sep 17 00:00:00 2001
From: vbusson-pass <142802218+vbusson-pass@users.noreply.github.com>
Date: Fri, 31 Jan 2025 02:42:36 +0100
Subject: [PATCH] increase vm memory

---
 .../external/encrypted_exports/main.py    | 31 +++++++++++--------
 .../dags/jobs/dbt/dbt_encrypted_export.py | 15 ++++++---
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/jobs/etl_jobs/external/encrypted_exports/main.py b/jobs/etl_jobs/external/encrypted_exports/main.py
index ba154a1b8b..e333c955a6 100644
--- a/jobs/etl_jobs/external/encrypted_exports/main.py
+++ b/jobs/etl_jobs/external/encrypted_exports/main.py
@@ -5,6 +5,7 @@
 import logging
 from typing import List

 from google.cloud import storage
+from pathlib import Path

 run = typer.Typer()

@@ -16,15 +17,12 @@ def encrypt(
     gcs_bucket: str = typer.Option(..., help="GCS bucket name"),
     bucket_folder_gcs: str = typer.Option(..., help="Bucket folder in GCS"),
     export_date: str = typer.Option(..., help="Export date"),
-    table_list: List[str] = typer.Option(..., help="List of tables to encrypt"),
+    table_list: str = typer.Option(..., help="String list of tables to encrypt"),
     encryption_key: str = typer.Option(..., help="Encryption key"),
 ):
-    logging.info(f"{type(table)}, {table_list}")
-    if type(table_list) == str:
-        table_list = ast.literal_eval(table_list)
-
+    table_list = ast.literal_eval(table_list)
     # enforce key strength
-    assert len(encryption_key) == 32
+    assert len(encryption_key) == 32, "Encryption key must be a string of 32 characters"

     # connect to source bucket
     client = storage.Client()
@@ -32,19 +30,25 @@
     for table in table_list:
         # initialize duckDB connextion
-        duckdb_conn = duckdb.connect(config={"threads": 1})
+        duckdb_conn = duckdb.connect(config={"threads": 2})
         duckdb_conn.execute(f"PRAGMA add_parquet_key('key256', '{encryption_key}');")

-        gcs_folder_path = f"{bucket_folder_gcs}/{partner_name}/{table}"
+        gcs_folder_path = f"{bucket_folder_gcs}/{partner_name}/{export_date}/{table}"
+        local_folder = f"{bucket_folder_gcs}/{partner_name}"
+        Path(local_folder).mkdir(parents=True, exist_ok=True)
+
         blobs = bucket.list_blobs(prefix=gcs_folder_path)
         parquet_file_list = [
             blob.name for blob in blobs if blob.name.endswith(".parquet")
         ]
+        assert len(parquet_file_list) != 0, f"{gcs_bucket}/{gcs_folder_path} is empty"
+
         for i, parquet_file in enumerate(parquet_file_list):
+            print(f"{parquet_file}")
             gcs_encrypted_folder_path = (
                 f"{bucket_folder_gcs}/encrypted/{partner_name}/{export_date}/{table}"
             )
-            local_file_path = f"{partner_name}/{table}/{parquet_file}"
+            local_file_path = f"{local_folder}/{parquet_file}"
             blob = bucket.blob(parquet_file)
             blob.download_to_filename(local_file_path)
             encrypted_file_path = f"encrypted/{local_file_path}"
@@ -55,11 +59,12 @@
                 f"{gcs_encrypted_folder_path}/{encrypted_file_path}"
             )
             encrypted_blob.upload_from_filename(encrypted_file_path)
-            os.remove(local_file_path)
-            os.remove(encrypted_file_path)
-        logging.info(
-            f"Succesfully encrypted table {table} in {1+i} parquet files on gs://{gcs_bucket}/{gcs_encrypted_folder_path} "
+            # os.remove(local_file_path)
+            # os.remove(encrypted_file_path)
+        print(
+            f"Table {table} successfully encrypted in {1+i} files -> gs://{gcs_bucket}/{gcs_encrypted_folder_path} "
         )
+        duckdb_conn.close()


 @run.command()
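Note on the DuckDB step: the COPY statement that performs the actual re-encryption sits between the download and upload calls and is not visible in these hunks. For reviewers unfamiliar with DuckDB's Parquet encryption, here is a minimal sketch of that round trip, assuming hypothetical local paths (in.parquet, out.parquet) and a placeholder key:

    import duckdb

    # Placeholder: the real key comes from the --encryption-key option; the
    # assert above requires a 32-character (256-bit) string.
    encryption_key = "0" * 32

    conn = duckdb.connect(config={"threads": 2})
    # Register the key under the alias 'key256', as main.py does.
    conn.execute(f"PRAGMA add_parquet_key('key256', '{encryption_key}');")
    # Rewrite a plaintext Parquet file as an encrypted one.
    conn.execute(
        "COPY (SELECT * FROM read_parquet('in.parquet')) "
        "TO 'out.parquet' (FORMAT 'parquet', ENCRYPTION_CONFIG {footer_key: 'key256'});"
    )
    conn.close()

Reading the output back then requires the same key, e.g. read_parquet('out.parquet', encryption_config={footer_key: 'key256'}).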
diff --git a/orchestration/dags/jobs/dbt/dbt_encrypted_export.py b/orchestration/dags/jobs/dbt/dbt_encrypted_export.py
index fa29173d15..6338ae0884 100644
--- a/orchestration/dags/jobs/dbt/dbt_encrypted_export.py
+++ b/orchestration/dags/jobs/dbt/dbt_encrypted_export.py
@@ -146,6 +146,7 @@ def prepare_bash_env(**kwargs):
         "obfuscation_config"
     )
     logging.info(f"obfuscation_config: {obfuscation_config}")
+
     if not table_list:
         raise ValueError("No tables found for export.")

@@ -165,7 +166,7 @@ def prepare_bash_env(**kwargs):

 def create_export_operator(
-    table_name: str, partner_name: str, dag: DAG
+    table_name: str, partner_name: str, export_date: str, dag: DAG
 ) -> BigQueryToGCSOperator:
     """
     Creates a single BigQueryToGCS export operator for a specific table.
     """
@@ -174,7 +175,7 @@ def create_export_operator(
     return BigQueryToGCSOperator(
         task_id=f"export_bq_to_gcs_{table_name}",
         source_project_dataset_table=f"{GCP_PROJECT_ID}.export_{partner_name}.{table_name}",
         destination_cloud_storage_uris=[
-            f"gs://{PARQUET_STORAGE_GCS_BUCKET}/test_exports/{partner_name}/{table_name}/*.parquet"
+            f"gs://{PARQUET_STORAGE_GCS_BUCKET}/{folder}/encrypted/{partner_name}/{export_date}/{table_name}/*.parquet"
         ],
         export_format="PARQUET",
         field_delimiter=",",
@@ -204,6 +205,7 @@ def _create_tasks(self, **context):
         ti = context["ti"]
         table_list = ti.xcom_pull(task_ids=self.xcom_pull_task, key=self.xcom_pull_key)
         partner_name = ti.xcom_pull(task_ids=self.xcom_pull_task, key="partner_name")
+        export_date = ti.xcom_pull(task_ids="export_logs", key="export_date")
         if not table_list:
             raise ValueError(
                 f"No {self.xcom_pull_key} found in XCom {self.xcom_pull_task} task data"
@@ -211,7 +213,10 @@ def _create_tasks(self, **context):

         for table_name in table_list:
             export_task = create_export_operator(
-                table_name=table_name, partner_name=partner_name, dag=self.dag
+                table_name=table_name,
+                partner_name=partner_name,
+                export_date=export_date,
+                dag=self.dag,
             )
             # Execute the task
             logging.info("")
@@ -318,7 +323,7 @@
     gce_instance_start = StartGCEOperator(
         instance_name=f"{GCE_INSTANCE}-{partner_name}",
         task_id="gce_start_task",
-        instance_type="n1-standard-1",
+        instance_type="n1-highmem-2",
         preemptible=True,
         disk_size_gb=100,
     )
@@ -340,7 +345,7 @@
         --gcs-bucket "{PARQUET_STORAGE_GCS_BUCKET}" \
         --bucket-folder-gcs "encrypted_exports" \
         --export-date "{{{{ ti.xcom_pull(task_ids='export_logs', key='export_date') }}}}" \
-        --table-list "{{{{ ti.xcom_pull(task_ids='prepare_bash_env', key='table_list_str') }}}}" \
+        --table-list '{{{{ ti.xcom_pull(task_ids='prepare_bash_env', key='table_list_str') }}}}' \
         --encryption-key "{{{{ ti.xcom_pull(task_ids='export_logs', key='encryption_key') }}}}"
     """,
     dag=dag,
 )
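Note on the --table-list quoting change: it pairs with the switch to ast.literal_eval in main.py. The rendered XCom value arrives as one shell argument holding a Python-literal list, which the CLI parses back into a list; the single quotes presumably protect double quotes inside the rendered literal (the inner single quotes in the template are consumed by Jinja before bash ever sees the command). A quick sketch of the round trip, with hypothetical table names:

    import ast

    # Hypothetical rendered value of the 'table_list_str' XCom.
    table_list_str = '["bookings", "collective_offers"]'

    # main.py receives this as a single string option and parses it into a list.
    table_list = ast.literal_eval(table_list_str)
    assert table_list == ["bookings", "collective_offers"]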