diff --git a/scripts/Synapse/ReRunECRfromPostBundle.ipynb b/scripts/Synapse/ReRunECRfromPostBundle.ipynb index 7ac3a968..23fc7ef1 100644 --- a/scripts/Synapse/ReRunECRfromPostBundle.ipynb +++ b/scripts/Synapse/ReRunECRfromPostBundle.ipynb @@ -7,36 +7,47 @@ "metadata": {}, "outputs": [], "source": [ + "import datetime\n", "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import lit, current_timestamp\n", "from notebookutils import mssparkutils\n", "\n", - "spark = SparkSession.builder.getOrCreate()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "05f823b1-ca98-42b1-8723-262e2a984016", - "metadata": {}, - "outputs": [], - "source": [ + "spark = SparkSession.builder.getOrCreate()\n", + "\n", + "# source and destination paths\n", "storage_account = \"$STORAGE_ACCOUNT\"\n", "ecr_post_bundle_file_path = f\"abfss://bundle-snapshots@{storage_account}.dfs.core.windows.net/post/ecr\"\n", - "ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net/ecr-rerun\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bdeb5ecc-6fd6-41c0-9174-d66fd727586f", - "metadata": {}, - "outputs": [], - "source": [ - "def move_file(src_path, dest_path):\n", + "ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net/ecr-rerun\"\n", + "\n", + "# parquet log file: timestamp, filename, and destination path\n", + "timestamp_str = datetime.datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n", + "parquet_file_name = f\"moved_files_log_{timestamp_str}.parquet\"\n", + "parquet_file_path = f\"{ecr_rerun_file_path}/logs/{parquet_file_name}\"\n", + "\n", + "# dataframe to track moved files\n", + "moved_files_log = spark.createDataFrame([], schema=\"filename string, source_path string, dest_path string, timestamp string\")\n", + "\n", + "# get directory contents\n", + "files = mssparkutils.fs.ls(ecr_post_bundle_file_path)\n", + "for file in files:\n", + " src_path = file.path\n", + " dest_path = f\"{ecr_rerun_file_path}/{file.name}\"\n", + "\n", + " # move the files\n", " mssparkutils.fs.mv(src=src_path, dest=dest_path, create_path=True)\n", "\n", - "# Example usage\n", - "move_file(ecr_post_bundle_file_path, destination_path_for_ecr_post_bundle)" + " # log the file move\n", + " new_row = spark.createDataFrame([(file.name, src_path, dest_path, str(file.modificationTime))])\n", + " moved_files_log = moved_files_log.union(new_row)\n", + "\n", + "# add current timestamp\n", + "moved_files_log = moved_files_log.withColumn(\"log_timestamp\", current_timestamp())\n", + "\n", + "# write log to parquet\n", + "moved_files_log.write.mode(\"append\").parquet(parquet_file_path)\n", + "\n", + "# inspect log of moved files\n", + "moved_files_log.show()" ] } ],