diff --git a/.gitignore b/.gitignore
index 41eb114e..dddc40c1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ hs_err_pid*
 .venv
 .dev_venv
 .python-version
+.ipynb_checkpoints
 
 # Azure Functions
 *local.settings.json
diff --git a/scripts/Synapse/ReRunECRfromPostBundle.ipynb b/scripts/Synapse/ReRunECRfromPostBundle.ipynb
new file mode 100644
index 00000000..50519fcc
--- /dev/null
+++ b/scripts/Synapse/ReRunECRfromPostBundle.ipynb
@@ -0,0 +1,102 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "962a47d0-b2c4-4516-84c0-b947280645fe",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import datetime\n",
+    "from pyspark.sql import SparkSession\n",
+    "from pyspark.sql.functions import current_timestamp\n",
+    "from notebookutils import mssparkutils\n",
+    "\n",
+    "spark = SparkSession.builder.getOrCreate()\n",
+    "\n",
+    "# source and destination paths\n",
+    "storage_account = \"$STORAGE_ACCOUNT\"\n",
+    "ecr_post_bundle_file_path = f\"abfss://bundle-snapshots@{storage_account}.dfs.core.windows.net/post/ecr\"\n",
+    "ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net/ecr-rerun\"\n",
+    "\n",
+    "# parquet log file: timestamp, filename, and destination path\n",
+    "timestamp_str = datetime.datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
+    "parquet_file_name = f\"copied_files_log_{timestamp_str}.parquet\"\n",
+    "delta_tables = f\"abfss://delta-tables@{storage_account}.dfs.core.windows.net\"\n",
+    "parquet_file_path = f\"{delta_tables}/ecr-rerun-logs/{parquet_file_name}\"\n",
+    "\n",
+    "# dataframe to track copied files\n",
+    "copied_files_log = spark.createDataFrame([], schema=\"filename string, source_path string, dest_path string, timestamp string, file_exists_skip boolean, success boolean\")\n",
+    "\n",
+    "# outer try/except for errors accessing the file list\n",
+    "# inner try/except for issues copying files and marking success or failure\n",
+    "try:\n",
+    "    # get list of files\n",
+    "    files = mssparkutils.fs.ls(ecr_post_bundle_file_path)\n",
+    "\n",
+    "    for file in files:\n",
+    "        # initialize per-file tracking flags\n",
+    "        success = True\n",
+    "        file_exists = False\n",
+    "        try:\n",
+    "            src_path = file.path\n",
+    "            dest_path = f\"{ecr_rerun_file_path}/{file.name}\"\n",
+    "\n",
+    "            # capture the timestamp before copying the file\n",
+    "            copy_timestamp = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n",
+    "\n",
+    "            # check if the file already exists at the destination\n",
+    "            file_exists = mssparkutils.fs.exists(dest_path)\n",
+    "\n",
+    "            # copy the file if it doesn't exist\n",
+    "            if not file_exists:\n",
+    "                mssparkutils.fs.cp(src=src_path, dest=dest_path)\n",
+    "            else:\n",
+    "                # if the file already exists, set 'success' to false\n",
+    "                success = False\n",
+    "\n",
+    "        except Exception as e:\n",
+    "            # if there's an error copying, set 'success' to false\n",
+    "            success = False\n",
+    "            print(f\"Error copying file {file.name}: {str(e)}\")\n",
+    "\n",
+    "        # log the copy attempt\n",
+    "        new_row = spark.createDataFrame([(file.name, src_path, dest_path, copy_timestamp, file_exists, success)], schema=copied_files_log.schema)\n",
+    "        copied_files_log = copied_files_log.union(new_row)\n",
+    "\n",
+    "except Exception as e:\n",
+    "    print(f\"Error retrieving file list: {str(e)}\")\n",
+    "\n",
+    "# add current timestamp\n",
+    "copied_files_log = copied_files_log.withColumn(\"log_timestamp\", current_timestamp())\n",
+    "\n",
+    "# write log to parquet\n",
+    "copied_files_log.write.mode(\"append\").parquet(parquet_file_path)\n",
+    "\n",
+    "# inspect the log of copied files\n",
+    "copied_files_log.show()"
+   ]
+  }
+ ],
+ "metadata": {
"kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}