Skip to content

Commit

Permalink
encryption step
Browse files Browse the repository at this point in the history
  • Loading branch information
vbusson-pass committed Jan 31, 2025
1 parent eedb5b2 commit caa53ab
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 53 deletions.
68 changes: 57 additions & 11 deletions jobs/etl_jobs/external/encrypted_exports/main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,73 @@
import ast
import logging
import os
from typing import List

import duckdb
import typer
from google.cloud import storage


run = typer.Typer()


@run.command()
def encrypt(
    partner_name: str = typer.Option(..., help="Partner name"),
    gcs_bucket: str = typer.Option(..., help="GCS bucket name"),
    bucket_folder_gcs: str = typer.Option(..., help="Bucket folder in GCS"),
    export_date: str = typer.Option(..., help="Export date"),
    table_list: List[str] = typer.Option(..., help="List of tables to encrypt"),
    encryption_key: str = typer.Option(..., help="Encryption key"),
):
    """Encrypt partner parquet exports stored in GCS.

    For each table, every ``.parquet`` blob under
    ``{bucket_folder_gcs}/{partner_name}/{table}`` is downloaded, rewritten
    with DuckDB parquet modular encryption (AES-256 footer key), uploaded to
    ``{bucket_folder_gcs}/encrypted/{partner_name}/{export_date}/{table}``,
    and the local copies are deleted.

    Raises:
        ValueError: if ``encryption_key`` is not exactly 32 characters, or if
            ``table_list`` is a string that is not a valid Python list literal.
    """
    # The DAG passes the table list as one string (a list repr). Parse it
    # safely: eval() on CLI input would execute arbitrary code.
    if isinstance(table_list, str):
        table_list = ast.literal_eval(table_list)

    # AES-256 needs exactly a 32-byte key. Raise instead of assert, which is
    # silently stripped when Python runs with -O.
    if len(encryption_key) != 32:
        raise ValueError("encryption_key must be exactly 32 characters (AES-256)")

    # connect to source bucket
    client = storage.Client()
    bucket = client.get_bucket(gcs_bucket)

    for table in table_list:
        # initialize duckDB connection and register the encryption key
        duckdb_conn = duckdb.connect(config={"threads": 1})
        duckdb_conn.execute(f"PRAGMA add_parquet_key('key256', '{encryption_key}');")

        gcs_folder_path = f"{bucket_folder_gcs}/{partner_name}/{table}"
        blobs = bucket.list_blobs(prefix=gcs_folder_path)
        parquet_file_list = [
            blob.name for blob in blobs if blob.name.endswith(".parquet")
        ]
        # destination prefix is the same for every file of this table
        gcs_encrypted_folder_path = (
            f"{bucket_folder_gcs}/encrypted/{partner_name}/{export_date}/{table}"
        )
        for parquet_file in parquet_file_list:
            local_file_path = f"{partner_name}/{table}/{parquet_file}"
            encrypted_file_path = f"encrypted/{local_file_path}"
            # blob names contain slashes: create the local directories first,
            # otherwise download_to_filename / COPY fail with FileNotFoundError
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
            os.makedirs(os.path.dirname(encrypted_file_path), exist_ok=True)

            bucket.blob(parquet_file).download_to_filename(local_file_path)
            duckdb_conn.execute(
                f"COPY (SELECT * FROM read_parquet('{local_file_path}')) "
                f"TO '{encrypted_file_path}' "
                "(FORMAT 'parquet', ENCRYPTION_CONFIG {footer_key: 'key256'});"
            )
            encrypted_blob = bucket.blob(
                f"{gcs_encrypted_folder_path}/{encrypted_file_path}"
            )
            encrypted_blob.upload_from_filename(encrypted_file_path)
            os.remove(local_file_path)
            os.remove(encrypted_file_path)
        duckdb_conn.close()
        # len() instead of 1+i: i is undefined when a table has no files,
        # and gcs_bucket (not the undefined source_gcs_bucket) is the bucket
        logging.info(
            f"Successfully encrypted table {table} in "
            f"{len(parquet_file_list)} parquet files on "
            f"gs://{gcs_bucket}/{gcs_encrypted_folder_path}"
        )


@run.command()
def transfer(
    partner_name: str = typer.Option(..., help="Partner name"),
    target_endpoint_url: str = typer.Option(..., help="Target endpoint URL"),
    target_s3_name: str = typer.Option(..., help="Target S3 bucket name"),
    target_folder: str = typer.Option(..., help="Target folder path"),
    target_s3_region: str = typer.Option(..., help="Target S3 region"),
    # NOTE(review): "acces" is a typo for "access", but the DAG builds the
    # CLI flag --target-acces-key from this name; rename both sides together.
    target_acces_key: str = typer.Option(..., help="Target access key"),
):
    """Transfer encrypted exports to the partner's S3 bucket.

    Not implemented yet: this placeholder keeps the CLI surface stable so the
    orchestration DAG task that will call it can be wired up first.
    """
    pass

Expand Down
109 changes: 67 additions & 42 deletions orchestration/dags/jobs/dbt/dbt_encrypted_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
InstallDependenciesOperator,
SSHGCEOperator,
StartGCEOperator,
StopGCEOperator,
)
from jobs.crons import ENCRYPTED_EXPORT_DICT

Expand All @@ -37,8 +36,9 @@
"project_id": GCP_PROJECT_ID,
}


GCE_INSTANCE = f"encrypted-export-{ENV_SHORT_NAME}"
BASE_PATH = "data-gcp/jobs/etl_jobs/external/ecrypted_exports"
BASE_PATH = "data-gcp/jobs/etl_jobs/external/encrypted_exports"
logs_bucket = "data-bucket-dev"
folder = "encrypted_exports"
PARQUET_STORAGE_GCS_BUCKET = "data-bucket-dev" # same for now
Expand Down Expand Up @@ -67,15 +67,36 @@ def gather_export_info(**kwargs):
partner_salt = access_secret_data(
GCP_PROJECT_ID, f"{partner_name}_unique_salt", default=None
)

if not partner_salt:
raise KeyError(f"No salt found for {partner_name} in Secret Manager.")
logging.info(f"partner salt: {partner_salt} ")

# get partner bucket infos:
target_s3_infos = get_json_from_gcs(
logs_bucket, f"{export_folder}/{partner_name}/next_exported_tables.json"
).get("partner_s3", {})

# Push results to XCom
kwargs["ti"].xcom_push(key="partner_name", value=partner_name)
kwargs["ti"].xcom_push(key="table_list", value=table_list)
kwargs["ti"].xcom_push(key="partner_salt", value=partner_salt)
kwargs["ti"].xcom_push(key="obfuscation_config", value=obfuscation_config)

kwargs["ti"].xcom_push(
key="target_endpoint_url", value=target_s3_infos.get("target_endpoint_url")
)
kwargs["ti"].xcom_push(
key="target_s3_name", value=target_s3_infos.get("target_s3_name")
)
kwargs["ti"].xcom_push(
key="target_folder", value=target_s3_infos.get("target_folder")
)
kwargs["ti"].xcom_push(
key="target_s3_region", value=target_s3_infos.get("target_s3_region")
)
kwargs["ti"].xcom_push(
key="target_acces_key", value=target_s3_infos.get("target_acces_key")
)
return table_list


Expand Down Expand Up @@ -136,15 +157,15 @@ def prepare_bash_env(**kwargs):

obfuscation_fields = obfuscation_config.get("obfuscated_fields", {})
obfuscation_fields_str = json.dumps(obfuscation_fields)
logging.info(f"obfuscation_fields sring: {obfuscation_fields_str}")
logging.info(f"obfuscation_fields string: {obfuscation_fields_str}")
# Push values back to XCom for BashOperator usage
ti.xcom_push(key="table_list_str", value=table_list_str)
ti.xcom_push(key="partner_salt", value=partner_salt)
ti.xcom_push(key="obfuscation_fields_str", value=obfuscation_fields_str)


def create_export_operator(
table_name: str, partner_name: str, task_group: TaskGroup, dag: DAG
table_name: str, partner_name: str, dag: DAG
) -> BigQueryToGCSOperator:
"""
Creates a single BigQueryToGCS export operator for a specific table.
Expand All @@ -159,15 +180,14 @@ def create_export_operator(
field_delimiter=",",
print_header=True,
dag=dag,
task_group=task_group,
)


class DynamicExportBulk:
def __init__(self, task_id: str, xcom_pull_task: str, xcom_pull_key: str, dag: DAG):
self.task_id = task_id
self.dag = dag
self.pull_task = xcom_pull_task
self.xcom_pull_task = xcom_pull_task
self.xcom_pull_key = xcom_pull_key

self.create_export_tasks = PythonOperator(
Expand All @@ -183,14 +203,16 @@ def _create_tasks(self, **context):
"""
ti = context["ti"]
table_list = ti.xcom_pull(task_ids=self.xcom_pull_task, key=self.xcom_pull_key)

partner_name = ti.xcom_pull(task_ids=self.xcom_pull_task, key="partner_name")
if not table_list:
raise ValueError(
f"No {self.xcom_pull_key} found in XCom {self.xcom_pull_task} task data"
)

for table_name in table_list:
export_task = create_export_operator(table_name=table_name, dag=self.dag)
export_task = create_export_operator(
table_name=table_name, partner_name=partner_name, dag=self.dag
)
# Execute the task
logging.info("")
logging.info(f"Exporting table {table_name}")
Expand All @@ -214,7 +236,9 @@ def _create_tasks(self, **context):
),
params={
"branch": Param(
default="production" if ENV_SHORT_NAME == "prod" else "master",
default="production"
if ENV_SHORT_NAME == "prod"
else "DE-1025-obfuscation",
type="string",
),
"target": Param(
Expand Down Expand Up @@ -292,52 +316,52 @@ def _create_tasks(self, **context):
)

gce_instance_start = StartGCEOperator(
instance_name=f"{GCE_INSTANCE}_{partner_name}", task_id="gce_start_task"
instance_name=f"{GCE_INSTANCE}-{partner_name}",
task_id="gce_start_task",
instance_type="n1-standard-1",
preemptible=True,
disk_size_gb=100,
)

fetch_install_code = InstallDependenciesOperator(
task_id="fetch_install_code",
instance_name=f"{GCE_INSTANCE}_{partner_name}",
instance_name=f"{GCE_INSTANCE}-{partner_name}",
branch="{{ params.branch }}",
python_version="3.10",
base_dir=BASE_PATH,
)

parquet_encryption = SSHGCEOperator(
task_id="parquet_encryption",
instance_name=f"{GCE_INSTANCE}_{partner_name}",
instance_name=f"{GCE_INSTANCE}-{partner_name}",
base_dir=BASE_PATH,
command=(
"python main.py encrypt "
"--partner_name {{ ti.xcom_pull(task_ids='export_logs', key='partner_name') }} "
f"--source_s3_name {PARQUET_STORAGE_GCS_BUCKET} "
"--source_export_folder test_exports "
"--encryption_key {{ ti.xcom_pull(task_ids='export_logs', key='encryption_key') }}"
),
command=f"""python main.py encrypt \
--partner-name "{{{{ ti.xcom_pull(task_ids='export_logs', key='partner_name') }}}}" \
--gcs-bucket "{PARQUET_STORAGE_GCS_BUCKET}" \
--bucket-folder-gcs "encrypted_exports" \
--export-date "{{{{ ti.xcom_pull(task_ids='export_logs', key='export_date') }}}}" \
--table-list "{{{{ ti.xcom_pull(task_ids='prepare_bash_env', key='table_list_str') }}}}" \
--encryption-key "{{{{ ti.xcom_pull(task_ids='export_logs', key='encryption_key') }}}}" """,
dag=dag,
provide_context=True,
)

parquet_transfer = SSHGCEOperator(
task_id="parquet_transfer",
instance_name=f"{GCE_INSTANCE}_{partner_name}",
base_dir=BASE_PATH,
command=(
"python main.py transfer "
"--partner_name {{ ti.xcom_pull(task_ids='export_logs', key='partner_name') }} "
"--target_endpoint_url "
"--target_s3_name "
"--target_folder "
"--target_s3_region "
"--target_acces_key "
),
dag=dag,
provide_context=True,
)

gce_instance_stop = StopGCEOperator(
task_id="gce_stop_task", instance_name=f"{GCE_INSTANCE}_{partner_name}"
)
# parquet_transfer = SSHGCEOperator(
# task_id="parquet_transfer",
# instance_name=f"{GCE_INSTANCE}-{partner_name}",
# base_dir=BASE_PATH,
# command="""python main.py transfer \
# --partner-name "{{{{ ti.xcom_pull(task_ids='export_logs', key='partner_name') }}}}" \
# --target-endpoint-url "{{{{ ti.xcom_pull(task_ids='retrieve_export_infos',key='target_endpoint_url' }}}}" \
# --target-s3-name "{{{{ ti.xcom_pull(task_ids='retrieve_export_infos',key='target_s3_name' }}}}" \
# --target-folder "{{{{ ti.xcom_pull(task_ids='retrieve_export_infos',key='target_folder' }}}}" \
# --target-s3-region "{{{{ ti.xcom_pull(task_ids='retrieve_export_infos',key='target_s3_region' }}}}" \
# --target-acces-key "{{{{ ti.xcom_pull(task_ids='retrieve_export_infos',key='target_acces_key' }}}}" """,
# dag=dag,
# )

# gce_instance_stop = StopGCEOperator(
# task_id="gce_stop_task", instance_name=f"{GCE_INSTANCE}-{partner_name}"
# )

(
start
Expand All @@ -350,6 +374,7 @@ def _create_tasks(self, **context):
>> gce_instance_start
>> fetch_install_code
>> parquet_encryption
>> parquet_transfer
# >> parquet_transfer
# >> gce_instance_stop
>> end
)

0 comments on commit caa53ab

Please sign in to comment.