
Commit 0f20490

increase vm memory

vbusson-pass committed Jan 31, 2025
1 parent c87423c

Showing 2 changed files with 28 additions and 18 deletions.
jobs/etl_jobs/external/encrypted_exports/main.py (18 additions, 13 deletions)
@@ -5,6 +5,7 @@
 import logging
 from typing import List
 from google.cloud import storage
+from pathlib import Path


 run = typer.Typer()
@@ -16,35 +17,38 @@ def encrypt(
     gcs_bucket: str = typer.Option(..., help="GCS bucket name"),
     bucket_folder_gcs: str = typer.Option(..., help="Bucket folder in GCS"),
     export_date: str = typer.Option(..., help="Export date"),
-    table_list: List[str] = typer.Option(..., help="List of tables to encrypt"),
+    table_list: str = typer.Option(..., help="String list of tables to encrypt"),
     encryption_key: str = typer.Option(..., help="Encryption key"),
 ):
-    logging.info(f"{type(table)}, {table_list}")
-    if type(table_list) == str:
-        table_list = ast.literal_eval(table_list)
-
+    table_list = ast.literal_eval(table_list)
     # enforce key strength
-    assert len(encryption_key) == 32
+    assert len(encryption_key) == 32, "Encryption key must be a string of 32 integers"

     # connect to source bucket
     client = storage.Client()
     bucket = client.get_bucket(gcs_bucket)

     for table in table_list:
         # initialize duckDB connection
-        duckdb_conn = duckdb.connect(config={"threads": 1})
+        duckdb_conn = duckdb.connect(config={"threads": 2})
         duckdb_conn.execute(f"PRAGMA add_parquet_key('key256', '{encryption_key}');")

-        gcs_folder_path = f"{bucket_folder_gcs}/{partner_name}/{table}"
+        gcs_folder_path = f"{bucket_folder_gcs}/{partner_name}/{export_date}/{table}"
+        local_folder = f"{bucket_folder_gcs}/{partner_name}"
+        Path(local_folder).mkdir(parents=True, exist_ok=True)

         blobs = bucket.list_blobs(prefix=gcs_folder_path)
         parquet_file_list = [
             blob.name for blob in blobs if blob.name.endswith(".parquet")
         ]
+        assert len(parquet_file_list) != 0, f"{gcs_bucket}/{gcs_folder_path} is empty"

         for i, parquet_file in enumerate(parquet_file_list):
             print(f"{parquet_file}")
             gcs_encrypted_folder_path = (
                 f"{bucket_folder_gcs}/encrypted/{partner_name}/{export_date}/{table}"
             )
-            local_file_path = f"{partner_name}/{table}/{parquet_file}"
+            local_file_path = f"{local_folder}/{parquet_file}"
             blob = bucket.blob(parquet_file)
             blob.download_to_filename(local_file_path)
             encrypted_file_path = f"encrypted/{local_file_path}"
@@ -55,11 +59,12 @@ def encrypt(
                 f"{gcs_encrypted_folder_path}/{encrypted_file_path}"
             )
             encrypted_blob.upload_from_filename(encrypted_file_path)
-            os.remove(local_file_path)
-            os.remove(encrypted_file_path)
-            logging.info(
-                f"Successfully encrypted table {table} in {1+i} parquet files on gs://{gcs_bucket}/{gcs_encrypted_folder_path} "
+            # os.remove(local_file_path)
+            # os.remove(encrypted_file_path)
+            print(
+                f"Table {table} successfully encrypted in {1+i} files -> gs://{gcs_bucket}/{gcs_encrypted_folder_path} "
             )
         duckdb_conn.close()


 @run.command()
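Note on the change above: the encrypt command now receives --table-list as one string and parses it back with ast.literal_eval, and the encrypted write itself happens in a COPY statement outside the hunks shown here. Below is a minimal, self-contained sketch of both pieces, with hypothetical table names and a placeholder key; the COPY uses DuckDB's documented ENCRYPTION_CONFIG syntax, not necessarily the repo's exact statement.

import ast

import duckdb

# The DAG renders the XCom value into a single CLI string such as
# "['bookings', 'offers']" (hypothetical names); ast.literal_eval parses
# only Python literals, so it rebuilds the list without evaluating
# arbitrary code.
table_list = ast.literal_eval("['bookings', 'offers']")
assert isinstance(table_list, list)

# DuckDB Parquet encryption: register a 32-character key under a name,
# then reference that name when writing the file.
encryption_key = "0" * 32  # placeholder, not a real secret
conn = duckdb.connect(config={"threads": 2})
conn.execute(f"PRAGMA add_parquet_key('key256', '{encryption_key}');")
conn.execute(
    "COPY (SELECT 1 AS id) TO 'encrypted.parquet' "
    "(FORMAT PARQUET, ENCRYPTION_CONFIG {footer_key: 'key256'});"
)
conn.close()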
orchestration/dags/jobs/dbt/dbt_encrypted_export.py (10 additions, 5 deletions)
@@ -146,6 +146,7 @@ def prepare_bash_env(**kwargs):
         "obfuscation_config"
     )
     logging.info(f"obfuscation_config: {obfuscation_config}")
+
     if not table_list:
         raise ValueError("No tables found for export.")

@@ -165,7 +166,7 @@ def prepare_bash_env(**kwargs):


 def create_export_operator(
-    table_name: str, partner_name: str, dag: DAG
+    table_name: str, partner_name: str, export_date: str, dag: DAG
 ) -> BigQueryToGCSOperator:
     """
     Creates a single BigQueryToGCS export operator for a specific table.
@@ -174,7 +175,7 @@ def create_export_operator(
         task_id=f"export_bq_to_gcs_{table_name}",
         source_project_dataset_table=f"{GCP_PROJECT_ID}.export_{partner_name}.{table_name}",
         destination_cloud_storage_uris=[
-            f"gs://{PARQUET_STORAGE_GCS_BUCKET}/test_exports/{partner_name}/{table_name}/*.parquet"
+            f"gs://{PARQUET_STORAGE_GCS_BUCKET}/{folder}/encrypted/{partner_name}/{export_date}/{table_name}/*.parquet"
         ],
         export_format="PARQUET",
         field_delimiter=",",
@@ -204,14 +205,18 @@ def _create_tasks(self, **context):
         ti = context["ti"]
         table_list = ti.xcom_pull(task_ids=self.xcom_pull_task, key=self.xcom_pull_key)
         partner_name = ti.xcom_pull(task_ids=self.xcom_pull_task, key="partner_name")
+        export_date = ti.xcom_pull(task_ids="export_logs", key="export_date")
         if not table_list:
             raise ValueError(
                 f"No {self.xcom_pull_key} found in XCom {self.xcom_pull_task} task data"
             )

         for table_name in table_list:
             export_task = create_export_operator(
-                table_name=table_name, partner_name=partner_name, dag=self.dag
+                table_name=table_name,
+                partner_name=partner_name,
+                export_date=export_date,
+                dag=self.dag,
             )
             # Execute the task
             logging.info("")
@@ -318,7 +323,7 @@ def _create_tasks(self, **context):
     gce_instance_start = StartGCEOperator(
         instance_name=f"{GCE_INSTANCE}-{partner_name}",
         task_id="gce_start_task",
-        instance_type="n1-standard-1",
+        instance_type="n1-highmem-2",
         preemptible=True,
         disk_size_gb=100,
     )
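This hunk is the commit's headline change. Per Google Compute Engine's published machine specs, it roughly 3.5x's the available RAM and doubles vCPUs, which lines up with the DuckDB threads bump from 1 to 2 in main.py. A quick check of the numbers:

# Machine-type RAM, per GCE's published n1 specs:
machine_ram_gb = {
    "n1-standard-1": 3.75,  # 1 vCPU, the previous instance type
    "n1-highmem-2": 13.0,   # 2 vCPUs, the new instance type
}
print(machine_ram_gb["n1-highmem-2"] / machine_ram_gb["n1-standard-1"])  # ~3.47x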
@@ -340,7 +345,7 @@ def _create_tasks(self, **context):
         --gcs-bucket "{PARQUET_STORAGE_GCS_BUCKET}" \
         --bucket-folder-gcs "encrypted_exports" \
         --export-date "{{{{ ti.xcom_pull(task_ids='export_logs', key='export_date') }}}}" \
-        --table-list "{{{{ ti.xcom_pull(task_ids='prepare_bash_env', key='table_list_str') }}}}" \
+        --table-list '{{{{ ti.xcom_pull(task_ids='prepare_bash_env', key='table_list_str') }}}}' \
         --encryption-key "{{{{ ti.xcom_pull(task_ids='export_logs', key='encryption_key') }}}}" """,
     dag=dag,
 )
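Why the quoting flip on --table-list: the rendered XCom value itself contains quote characters. Assuming prepare_bash_env serializes the list with double quotes inside (e.g. via json.dumps; the diff does not show how table_list_str is built), wrapping the argument in double quotes lets the shell strip the inner quotes, and ast.literal_eval then fails on the receiving end; single quotes pass the value through verbatim. A sketch of the difference:

import ast
import shlex

table_list_str = '["bookings", "offers"]'  # hypothetical XCom value

# Double-quoted: the shell merges adjacent quoted pieces and strips the
# inner double quotes, leaving an unparseable pseudo-list.
bad = shlex.split(f'--table-list "{table_list_str}"')
print(bad)   # ['--table-list', '[bookings, offers]']  -> literal_eval fails

# Single-quoted: the inner double quotes survive verbatim.
good = shlex.split(f"--table-list '{table_list_str}'")
print(good)  # ['--table-list', '["bookings", "offers"]']
print(ast.literal_eval(good[1]))  # ['bookings', 'offers']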
