Skip to content

Commit

Permalink
add spark df to log moved file; save that df as parquet in logs subdi…
Browse files Browse the repository at this point in the history
…rectory in ecr-rerun
  • Loading branch information
robertmitchellv committed Dec 1, 2023
1 parent b1cccdb commit d39ff84
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions scripts/Synapse/ReRunECRfromPostBundle.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,47 @@
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import lit, current_timestamp\n",
"from notebookutils import mssparkutils\n",
"\n",
"spark = SparkSession.builder.getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "05f823b1-ca98-42b1-8723-262e2a984016",
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# source and destination paths\n",
"storage_account = \"$STORAGE_ACCOUNT\"\n",
"ecr_post_bundle_file_path = f\"abfss://bundle-snapshots@{storage_account}.dfs.core.windows.net/post/ecr\"\n",
"ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net/ecr-rerun\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bdeb5ecc-6fd6-41c0-9174-d66fd727586f",
"metadata": {},
"outputs": [],
"source": [
"def move_file(src_path, dest_path):\n",
"ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net/ecr-rerun\"\n",
"\n",
"# parquet log file: timestamp, filename, and destination path\n",
"timestamp_str = datetime.datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
"parquet_file_name = f\"moved_files_log_{timestamp_str}.parquet\"\n",
"parquet_file_path = f\"{ecr_rerun_file_path}/logs/{parquet_file_name}\"\n",
"\n",
"# dataframe to track moved files\n",
"moved_files_log = spark.createDataFrame([], schema=\"filename string, source_path string, dest_path string, timestamp string\")\n",
"\n",
"# get directory contents\n",
"files = mssparkutils.fs.ls(ecr_post_bundle_file_path)\n",
"for file in files:\n",
" src_path = file.path\n",
" dest_path = f\"{ecr_rerun_file_path}/{file.name}\"\n",
"\n",
" # move the files\n",
" mssparkutils.fs.mv(src=src_path, dest=dest_path, create_path=True)\n",
"\n",
"# Example usage\n",
"move_file(ecr_post_bundle_file_path, destination_path_for_ecr_post_bundle)"
" # log the file move\n",
" new_row = spark.createDataFrame([(file.name, src_path, dest_path, str(file.modificationTime))])\n",
" moved_files_log = moved_files_log.union(new_row)\n",
"\n",
"# add current timestamp\n",
"moved_files_log = moved_files_log.withColumn(\"log_timestamp\", current_timestamp())\n",
"\n",
"# write log to parquet\n",
"moved_files_log.write.mode(\"append\").parquet(parquet_file_path)\n",
"\n",
"# inspect log of moved files\n",
"moved_files_log.show()"
]
}
],
Expand Down

0 comments on commit d39ff84

Please sign in to comment.