Commit aa6a4ac: export loop

vbusson-pass committed Jan 31, 2025
1 parent 28ecf49
Showing 2 changed files with 59 additions and 4 deletions.
61 changes: 58 additions & 3 deletions jobs/etl_jobs/external/encrypted_exports/main.py
@@ -98,27 +98,82 @@ def transfer(
    table_list: str = typer.Option(..., help="String list of tables to encrypt"),
):
    table_list = ast.literal_eval(table_list)

    try:
        s3_config = json.loads(target_bucket_config)
    except json.JSONDecodeError:
        # Fall back to ast.literal_eval(), which is safer than eval() for
        # parsing a Python-literal dict string
        try:
            s3_config = ast.literal_eval(target_bucket_config)
        except (ValueError, SyntaxError):
            raise typer.BadParameter(f"Invalid config format: {target_bucket_config}")

print(f"Parsed configuration: {s3_config}")
session = boto3.session.Session()

# Create an S3 client for OVH
session = boto3.session.Session()
s3_client = boto3.client(
"s3",
aws_access_key_id=s3_config["target_acces_key"],
aws_access_key_id=s3_config["target_access_key"],
aws_secret_access_key=s3_config["target_secret_key"],
endpoint_url=s3_config["target_endpoint_url"],
region_name=s3_config["target_s3_region"],
config=Config(signature_version="s3v4"),
)

    # Initialize GCS client
    client = storage.Client()
    bucket = client.get_bucket(gcs_bucket)

    for i, table in enumerate(table_list):
        # Define GCS paths
        gcs_folder_path = (
            f"{bucket_folder_gcs}/encrypted/{partner_name}/{export_date}/{table}"
        )
        print(f"gcs folder: {gcs_folder_path}")

        # Materialize the listing as a list so the iterator is not exhausted on first use
        blobs = list(bucket.list_blobs(prefix=gcs_folder_path))
        parquet_file_list = [
            blob.name for blob in blobs if blob.name.endswith(".parquet")
        ]

        assert len(parquet_file_list) != 0, f"{gcs_bucket}/{gcs_folder_path} is empty"

        s3_prefix_export = f"{export_date}/{table}"

        success_count = 0
        fail_count = 0
        for blob in blobs:
            if not blob.name.endswith("/"):  # Skip directories
                # Create a stream for the GCS object
                gcs_blob = bucket.blob(blob.name)
                gcs_stream = io.BytesIO(gcs_blob.download_as_bytes())
                if gcs_stream.getbuffer().nbytes == 0:
                    print(f"ERROR: {blob.name} is empty. Skipping upload.")
                    continue

                s3_object_name = (
                    f"{s3_prefix_export}/{blob.name.split('/')[-1]}".lstrip("/")
                )
                try:
                    s3_client.upload_fileobj(
                        gcs_stream, s3_config["target_s3_name"], s3_object_name
                    )  # upload to the target bucket named in the config
                    success_count += 1
                except Exception as e:
                    fail_count += 1
                    print(f"Failed to upload {blob.name}: {e}")

        total = success_count + fail_count
        if success_count != 0:
            print(
                f"SUCCESS {table}: {success_count}/{total} files transferred to {s3_config['target_s3_name']}"
            )
        if fail_count != 0:
            print(
                f"FAIL {table}: {fail_count}/{total} files NOT transferred to {s3_config['target_s3_name']}"
            )


if __name__ == "__main__":
    run()
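
For reference, transfer() receives both table_list and target_bucket_config as strings and parses them into Python objects before any upload happens. Below is a minimal sketch of how a caller might build and parse these arguments; the table names, bucket name, endpoint, and credentials are placeholders, not values from this repository.

import ast
import json

# Hypothetical inputs, normally passed as Typer CLI options.
table_list_arg = '["table_a", "table_b"]'
target_bucket_config_arg = json.dumps(
    {
        "target_access_key": "<ovh-access-key>",
        "target_secret_key": "<ovh-secret-key>",
        "target_endpoint_url": "<ovh-s3-endpoint-url>",
        "target_s3_region": "<ovh-s3-region>",
        "target_s3_name": "<target-bucket-name>",
    }
)

# Mirrors the parsing in transfer(): JSON first, ast.literal_eval() as a fallback.
tables = ast.literal_eval(table_list_arg)
try:
    s3_config = json.loads(target_bucket_config_arg)
except json.JSONDecodeError:
    s3_config = ast.literal_eval(target_bucket_config_arg)

print(tables)
print(sorted(s3_config))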
2 changes: 1 addition & 1 deletion orchestration/dags/jobs/dbt/dbt_encrypted_export.py
@@ -207,7 +207,7 @@ def _create_tasks(self, **context):
            # Execute the task
            logging.info("")
            logging.info(f"Exporting table {table_name}")
            export_task  # previously called export_task.execute(context=context)


####################################################################################
