From 3ce1b0e2ad26df4fb8a2886201c6cf8d23ea9207 Mon Sep 17 00:00:00 2001 From: Weiqiang Date: Tue, 24 Jan 2023 13:58:37 -0800 Subject: [PATCH] delete v2 workflow --- kubeflow/workflow_v2.ipynb | 2359 ------------------------------------ 1 file changed, 2359 deletions(-) delete mode 100644 kubeflow/workflow_v2.ipynb diff --git a/kubeflow/workflow_v2.ipynb b/kubeflow/workflow_v2.ipynb deleted file mode 100644 index 3265b9f..0000000 --- a/kubeflow/workflow_v2.ipynb +++ /dev/null @@ -1,2359 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Earthquake Detection Workflow\n", - "\n", - "## Outline\n", - "\n", - "Here we show an example of the current modules in QuakeFlow.\n", - "\n", - "1. Download data using ObsPy:\n", - "\n", - " [FDSN web service client for ObsPy](https://docs.obspy.org/packages/obspy.clients.fdsn.html#module-obspy.clients.fdsn)\n", - " \n", - " [Mass Downloader for FDSN Compliant Web Services](https://docs.obspy.org/packages/autogen/obspy.clients.fdsn.mass_downloader.html#module-obspy.clients.fdsn.mass_downloader)\n", - "\n", - "2. PhaseNet for picking P/S phases\n", - "\n", - " Find more details on the [PhaseNet github page](https://wayneweiqiang.github.io/PhaseNet/)\n", - "\n", - "3. GaMMA for associating picks and estimating approximate locations and magnitudes\n", - "\n", - " Find more details on the [GaMMA github page](https://wayneweiqiang.github.io/GMMA/)\n", - "\n", - "4. Earthquake location, magnitude estimation, etc. (to be continued)\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 1. Install [miniconda](https://docs.conda.io/en/latest/miniconda.html) and download packages" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:29.845680Z", - "iopub.status.busy": "2021-07-23T07:24:29.845272Z", - "iopub.status.idle": "2021-07-23T07:24:29.922435Z", - "shell.execute_reply": "2021-07-23T07:24:29.917867Z", - "shell.execute_reply.started": "2021-07-23T07:24:29.845649Z" - }, - "tags": [] - }, - "source": [ - "**First option: install the packages into the base environment**\n", - "```bash\n", - "git clone https://github.com/wayneweiqiang/PhaseNet.git\n", - "git clone https://github.com/wayneweiqiang/GaMMA.git\n", - "conda env update -f=env.yml -n base\n", - "```\n", - "\n", - "**Second option: install into a separate quakeflow environment; the Jupyter notebook kernel then needs to be set to quakeflow**\n", - "```bash\n", - "conda env create -f=env.yml -n quakeflow\n", - "python -m ipykernel install --user --name=quakeflow\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": 816, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:29.932092Z", - "iopub.status.busy": "2021-07-23T07:24:29.931653Z", - "iopub.status.idle": "2021-07-23T07:24:31.371242Z", - "shell.execute_reply": "2021-07-23T07:24:31.370340Z", - "shell.execute_reply.started": "2021-07-23T07:24:29.932059Z" - } - }, - "outputs": [], - "source": [ - "import warnings\n", - "\n", - "import kfp\n", - "import kfp.dsl as dsl\n", - "import kfp.components as comp\n", - "from kfp.components import InputPath, OutputPath\n", - "\n", - "warnings.filterwarnings(\"ignore\")\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 2. 
Set configurations" - ] - }, - { - "cell_type": "code", - "execution_count": 817, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.373260Z", - "iopub.status.busy": "2021-07-23T07:24:31.373027Z", - "iopub.status.idle": "2021-07-23T07:24:31.410541Z", - "shell.execute_reply": "2021-07-23T07:24:31.409735Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.373232Z" - } - }, - "outputs": [], - "source": [ - "import os\n", - "import matplotlib\n", - "\n", - "# matplotlib.use(\"agg\")\n", - "import matplotlib.pyplot as plt\n", - "\n", - "region_name = \"Ridgecrest_demo\"\n", - "# region_name = \"Ridgecrest_oneweek\"\n", - "# region_name = \"SaltonSea\"\n", - "# region_name = \"Ridgecrest\"\n", - "# region_name = \"SanSimeon\"\n", - "# region_name = \"Italy\"\n", - "# region_name = \"PNSN\"\n", - "# region_name = \"Hawaii\"\n", - "# region_name = \"Hawaii_202111_to_202205\"\n", - "# region_name = \"PuertoRico\"\n", - "# region_name = \"SmithValley\"\n", - "# region_name = \"Antilles\"\n", - "# region_name = \"Test\"\n", - "dir_name = region_name\n", - "if not os.path.exists(dir_name):\n", - " os.mkdir(dir_name)\n", - "root_dir = lambda x: os.path.join(dir_name, x)\n", - "\n", - "run_local = False\n" - ] - }, - { - "cell_type": "code", - "execution_count": 818, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.412031Z", - "iopub.status.busy": "2021-07-23T07:24:31.411820Z", - "iopub.status.idle": "2021-07-23T07:24:31.463023Z", - "shell.execute_reply": "2021-07-23T07:24:31.462151Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.412008Z" - } - }, - "outputs": [], - "source": [ - "def set_config(\n", - " index_json: OutputPath(\"json\"),\n", - " config_json: OutputPath(\"json\"),\n", - " datetime_json: OutputPath(\"json\"),\n", - " num_parallel: int = 1,\n", - ") -> list:\n", - "\n", - " import obspy\n", - " import os\n", - " import pickle\n", - " import datetime\n", - " import numpy as np\n", - " import json\n", - "\n", - " pi = 3.1415926\n", - " degree2km = pi * 6371 / 180\n", - "\n", - " region_name = \"Ridgecrest_demo\"\n", - " center = (-117.504, 35.705)\n", - " horizontal_degree = 1.0\n", - " vertical_degree = 1.0\n", - " starttime = obspy.UTCDateTime(\"2019-07-04T17\")\n", - " endtime = obspy.UTCDateTime(\"2019-07-04T19\")\n", - " client = \"SCEDC\"\n", - " network_list = [\"CI\"]\n", - " channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Ridgecrest_oneweek\"\n", - " # center = (-117.504, 35.705)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2019-07-04T00\")\n", - " # endtime = obspy.UTCDateTime(\"2019-07-10T00\")\n", - " # client = \"SCEDC\"\n", - " # network_list = [\"CI\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Test\"\n", - " # center = (-119.296, 34.443)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2022-02-25T00\")\n", - " # endtime = obspy.UTCDateTime(\"2022-03-02T22\")\n", - " # client = \"SCEDC\"\n", - " # network_list = [\"CI\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Ridgecrest\"\n", - " # center = (-117.504, 35.705)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2019-07-04T17\")\n", - " # endtime = obspy.UTCDateTime(\"2019-07-05T00\")\n", - " # # starttime = obspy.UTCDateTime(\"2019-07-04T00\")\n", - " # # endtime = 
obspy.UTCDateTime(\"2019-07-11T00\")\n", - " # client = \"SCEDC\"\n", - " # network_list = [\"CI\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Hawaii\"\n", - " # center = (-155.32, 19.39)\n", - " # horizontal_degree = 2.0\n", - " # vertical_degree = 2.0\n", - " # starttime = obspy.UTCDateTime(\"2021-04-01T00\")\n", - " # endtime = obspy.UTCDateTime(\"2021-11-01T00\")\n", - " # client = \"IRIS\"\n", - " # network_list = [\"HV\", \"PT\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - "\n", - " # region_name = \"Hawaii_202111_to_202205\"\n", - " # center = (-155.32, 19.39)\n", - " # horizontal_degree = 2.0\n", - " # vertical_degree = 2.0\n", - " # starttime = obspy.UTCDateTime(\"2021-11-01T00\")\n", - " # endtime = obspy.UTCDateTime(\"2022-05-01T00\")\n", - " # client = \"IRIS\"\n", - " # network_list = [\"HV\", \"PT\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"PuertoRico\"\n", - " # center = (-66.5, 18)\n", - " # horizontal_degree = 3.0\n", - " # vertical_degree = 2.0\n", - " # # starttime = obspy.UTCDateTime(\"2020-01-07T00\")\n", - " # # endtime = obspy.UTCDateTime(\"2020-01-14T00\")\n", - " # starttime = obspy.UTCDateTime(\"2018-05-01T00\")\n", - " # endtime = obspy.UTCDateTime(\"2021-11-01T00\")\n", - " # client = \"IRIS\"\n", - " # network_list = [\"*\"]\n", - " # # channel_list = \"HH*,BH*,EH*,HN*\"\n", - " # # channel_list = \"HH*,BH*,HN*\"\n", - " # channel_list = \"HH*,BH*,HN*\"\n", - "\n", - " # region_name = \"SaltonSea\"\n", - " # center = (-115.53, 32.98)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2020-10-01T00\")\n", - " # endtime = obspy.UTCDateTime(\"2020-10-01T02\")\n", - " # client = \"SCEDC\"\n", - " # network_list = [\"CI\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"2003SanSimeon\"\n", - " # center = (-121.101, 35.701)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2003-12-22T00\")\n", - " # endtime = obspy.UTCDateTime(\"2003-12-24T00\")\n", - " # client = \"NCEDC\"\n", - " # network_list = [\"*\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Italy\"\n", - " # center = (13.188, 42.723)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2016-08-24T00\")\n", - " # endtime = obspy.UTCDateTime(\"2016-08-26T00\")\n", - " # client = \"INGV\"\n", - " # network_list = [\"*\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"SmithValley\"\n", - " # center = (-119.5, 38.51)\n", - " # horizontal_degree = 1.0\n", - " # vertical_degree = 1.0\n", - " # starttime = obspy.UTCDateTime(\"2021-07-08T00:00\")\n", - " # endtime = obspy.UTCDateTime(\"2021-07-16T00:00\")\n", - " # client = \"NCEDC\"\n", - " # network_list = [\"*\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " # region_name = \"Antilles\"\n", - " # center = (-61.14867, 14.79683)\n", - " # horizontal_degree = 0.2\n", - " # vertical_degree = 0.2\n", - " # starttime = obspy.UTCDateTime(\"2021-04-10T00\")\n", - " # endtime = obspy.UTCDateTime(\"2021-04-15T00\")\n", - " # client = \"RESIF\"\n", - " # network_list = [\"*\"]\n", - " # channel_list = \"HH*,BH*,EH*,HN*\"\n", - "\n", - " ####### save config ########\n", - " config = {}\n", - " config[\"region\"] = region_name\n", - " config[\"center\"] = center\n", - " config[\"xlim_degree\"] = [\n", - " 
center[0] - horizontal_degree / 2,\n", - " center[0] + horizontal_degree / 2,\n", - " ]\n", - " config[\"ylim_degree\"] = [\n", - " center[1] - vertical_degree / 2,\n", - " center[1] + vertical_degree / 2,\n", - " ]\n", - " config[\"degree2km\"] = degree2km\n", - " config[\"starttime\"] = starttime.datetime.isoformat()\n", - " config[\"endtime\"] = endtime.datetime.isoformat()\n", - " config[\"networks\"] = network_list\n", - " config[\"channels\"] = channel_list\n", - " config[\"client\"] = client\n", - "\n", - " with open(config_json, 'w') as fp:\n", - " json.dump(config, fp)\n", - "\n", - " one_day = datetime.timedelta(days=1)\n", - " one_hour = datetime.timedelta(hours=1)\n", - " starttimes = []\n", - " tmp_start = starttime\n", - " while tmp_start < endtime:\n", - " starttimes.append(tmp_start.datetime.isoformat())\n", - " tmp_start += one_hour\n", - "\n", - " with open(datetime_json, \"w\") as fp:\n", - " json.dump({\"starttimes\": starttimes, \"interval\": one_hour.total_seconds()}, fp)\n", - "\n", - " if num_parallel == 0:\n", - " # num_parallel = min(60, len(starttimes)//6)\n", - " num_parallel = min(60, len(starttimes))\n", - " # num_parallel = 24\n", - "\n", - " idx = [[] for i in range(num_parallel)]\n", - " for i in range(len(starttimes)):\n", - " idx[i - i // num_parallel * num_parallel].append(i)\n", - "\n", - " with open(index_json, 'w') as fp:\n", - " json.dump(idx, fp)\n", - "\n", - " return list(range(num_parallel))\n" - ] - }, - { - "cell_type": "code", - "execution_count": 819, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.465407Z", - "iopub.status.busy": "2021-07-23T07:24:31.464744Z", - "iopub.status.idle": "2021-07-23T07:24:31.510297Z", - "shell.execute_reply": "2021-07-23T07:24:31.509494Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.465364Z" - } - }, - "outputs": [], - "source": [ - "if run_local:\n", - " idx = set_config(root_dir(\"index.json\"), root_dir(\"config.json\"), root_dir(\"datetimes.json\"), num_parallel=1,)" - ] - }, - { - "cell_type": "code", - "execution_count": 820, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.512170Z", - "iopub.status.busy": "2021-07-23T07:24:31.511860Z", - "iopub.status.idle": "2021-07-23T07:24:31.636918Z", - "shell.execute_reply": "2021-07-23T07:24:31.636196Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.512138Z" - } - }, - "outputs": [], - "source": [ - "config_op = comp.func_to_container_op(\n", - " set_config,\n", - " # base_image='zhuwq0/quakeflow-env:latest',\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"numpy\", \"obspy\",],\n", - ")\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 3. Download events in the routine catalog\n", - "\n", - "This catalog is not used by QuakeFlow. It is only used for comparing detection results."
- ] - }, - { - "cell_type": "code", - "execution_count": 821, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.638319Z", - "iopub.status.busy": "2021-07-23T07:24:31.638063Z", - "iopub.status.idle": "2021-07-23T07:24:31.702875Z", - "shell.execute_reply": "2021-07-23T07:24:31.701768Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.638291Z" - } - }, - "outputs": [], - "source": [ - "def download_events(config_json: InputPath(\"json\"), event_csv: OutputPath(str)):\n", - "\n", - " import pickle, os\n", - " import obspy\n", - " from obspy.clients.fdsn import Client\n", - " from collections import defaultdict\n", - " import pandas as pd\n", - " import json\n", - " import matplotlib\n", - "\n", - " # matplotlib.use(\"agg\")\n", - " import matplotlib.pyplot as plt\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " ####### IRIS catalog ########\n", - " try:\n", - " events = Client(config[\"client\"]).get_events(\n", - " starttime=config[\"starttime\"],\n", - " endtime=config[\"endtime\"],\n", - " minlongitude=config[\"xlim_degree\"][0],\n", - " maxlongitude=config[\"xlim_degree\"][1],\n", - " minlatitude=config[\"ylim_degree\"][0],\n", - " maxlatitude=config[\"ylim_degree\"][1],\n", - " # filename='events.xml',\n", - " )\n", - " except:\n", - " events = Client(\"iris\").get_events(\n", - " starttime=config[\"starttime\"],\n", - " endtime=config[\"endtime\"],\n", - " minlongitude=config[\"xlim_degree\"][0],\n", - " maxlongitude=config[\"xlim_degree\"][1],\n", - " minlatitude=config[\"ylim_degree\"][0],\n", - " maxlatitude=config[\"ylim_degree\"][1],\n", - " # filename='events.xml',\n", - " )\n", - "\n", - " # events = obspy.read_events('events.xml')\n", - " print(f\"Number of events: {len(events)}\")\n", - " # events.plot('local', outfile=\"events.png\")\n", - " # events.plot('local')\n", - "\n", - " ####### Save catalog ########\n", - " catalog = defaultdict(list)\n", - " for event in events:\n", - " if len(event.magnitudes) > 0:\n", - " catalog[\"time\"].append(event.origins[0].time.datetime)\n", - " catalog[\"magnitude\"].append(event.magnitudes[0].mag)\n", - " catalog[\"longitude\"].append(event.origins[0].longitude)\n", - " catalog[\"latitude\"].append(event.origins[0].latitude)\n", - " catalog[\"depth(m)\"].append(event.origins[0].depth)\n", - " catalog = pd.DataFrame.from_dict(catalog).sort_values([\"time\"])\n", - " catalog.to_csv(\n", - " event_csv,\n", - " sep=\"\\t\",\n", - " index=False,\n", - " float_format=\"%.3f\",\n", - " date_format='%Y-%m-%dT%H:%M:%S.%f',\n", - " columns=[\"time\", \"magnitude\", \"longitude\", \"latitude\", \"depth(m)\"],\n", - " )\n", - "\n", - " ####### Plot catalog ########\n", - " plt.figure()\n", - " plt.plot(catalog[\"longitude\"], catalog[\"latitude\"], '.', markersize=1)\n", - " plt.xlabel(\"Longitude\")\n", - " plt.ylabel(\"Latitude\")\n", - " plt.axis(\"scaled\")\n", - " plt.xlim(config[\"xlim_degree\"])\n", - " plt.ylim(config[\"ylim_degree\"])\n", - " # plt.savefig(os.path.join(data_path, \"events_loc.png\"))\n", - " plt.show()\n", - "\n", - " plt.figure()\n", - " plt.plot_date(catalog[\"time\"], catalog[\"magnitude\"], '.', markersize=1)\n", - " plt.gcf().autofmt_xdate()\n", - " plt.ylabel(\"Magnitude\")\n", - " plt.title(f\"Number of events: {len(events)}\")\n", - " plt.savefig(os.path.join(\"events_mag_time.png\"))\n", - " plt.show()\n" - ] - }, - { - "cell_type": "code", - "execution_count": 822, - "metadata": { - "execution": { - "iopub.execute_input": 
"2021-07-23T07:24:31.708478Z", - "iopub.status.busy": "2021-07-23T07:24:31.708086Z", - "iopub.status.idle": "2021-07-23T07:24:31.763254Z", - "shell.execute_reply": "2021-07-23T07:24:31.762255Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.708431Z" - } - }, - "outputs": [], - "source": [ - "if run_local:\n", - " download_events(root_dir(\"config.json\"), root_dir(\"events.csv\"))\n" - ] - }, - { - "cell_type": "code", - "execution_count": 823, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.767253Z", - "iopub.status.busy": "2021-07-23T07:24:31.766926Z", - "iopub.status.idle": "2021-07-23T07:24:31.858952Z", - "shell.execute_reply": "2021-07-23T07:24:31.857871Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.767217Z" - } - }, - "outputs": [], - "source": [ - "download_events_op = comp.func_to_container_op(\n", - " download_events,\n", - " # base_image='zhuwq0/quakeflow-env:latest',\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"obspy\", \"pandas\", \"matplotlib\",],\n", - ")\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 4. Download stations" - ] - }, - { - "cell_type": "code", - "execution_count": 824, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.861343Z", - "iopub.status.busy": "2021-07-23T07:24:31.861019Z", - "iopub.status.idle": "2021-07-23T07:24:31.939716Z", - "shell.execute_reply": "2021-07-23T07:24:31.938058Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.861310Z" - } - }, - "outputs": [], - "source": [ - "def download_stations(\n", - " config_json: InputPath(\"json\"), station_csv: OutputPath(str), station_pkl: OutputPath(\"pickle\"),\n", - "):\n", - "\n", - " import pickle, os\n", - " import obspy\n", - " from obspy.clients.fdsn import Client\n", - " from collections import defaultdict\n", - " import pandas as pd\n", - " import json\n", - " import matplotlib\n", - "\n", - " # matplotlib.use(\"agg\")\n", - " import matplotlib.pyplot as plt\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " print(\"Network:\", \",\".join(config[\"networks\"]))\n", - " ####### Download stations ########\n", - " stations = Client(config[\"client\"]).get_stations(\n", - " network=\",\".join(config[\"networks\"]),\n", - " station=\"*\",\n", - " starttime=config[\"starttime\"],\n", - " endtime=config[\"endtime\"],\n", - " minlongitude=config[\"xlim_degree\"][0],\n", - " maxlongitude=config[\"xlim_degree\"][1],\n", - " minlatitude=config[\"ylim_degree\"][0],\n", - " maxlatitude=config[\"ylim_degree\"][1],\n", - " channel=config[\"channels\"],\n", - " level=\"response\",\n", - " ) # ,\n", - " # filename=\"stations.xml\")\n", - "\n", - " # stations = obspy.read_inventory(\"stations.xml\")\n", - " print(\"Number of stations: {}\".format(sum([len(x) for x in stations])))\n", - " # stations.plot('local', outfile=\"stations.png\")\n", - " # stations.plot('local')\n", - "\n", - " ####### Save stations ########\n", - " station_locs = defaultdict(dict)\n", - " for network in stations:\n", - " for station in network:\n", - " for chn in station:\n", - " sid = f\"{network.code}.{station.code}.{chn.location_code}.{chn.code[:-1]}\"\n", - " if sid in station_locs:\n", - " station_locs[sid][\"component\"] += f\",{chn.code[-1]}\"\n", - " station_locs[sid][\"response\"] += f\",{chn.response.instrument_sensitivity.value:.2f}\"\n", - " else:\n", - " component = f\"{chn.code[-1]}\"\n", - " response = 
f\"{chn.response.instrument_sensitivity.value:.2f}\"\n", - " dtype = chn.response.instrument_sensitivity.input_units.lower()\n", - " tmp_dict = {}\n", - " (tmp_dict[\"longitude\"], tmp_dict[\"latitude\"], tmp_dict[\"elevation(m)\"],) = (\n", - " chn.longitude,\n", - " chn.latitude,\n", - " chn.elevation,\n", - " )\n", - " tmp_dict[\"component\"], tmp_dict[\"response\"], tmp_dict[\"unit\"] = (\n", - " component,\n", - " response,\n", - " dtype,\n", - " )\n", - " station_locs[sid] = tmp_dict\n", - "\n", - " station_locs = pd.DataFrame.from_dict(station_locs, orient='index')\n", - " station_locs.to_csv(\n", - " station_csv,\n", - " sep=\"\\t\",\n", - " float_format=\"%.3f\",\n", - " index_label=\"station\",\n", - " columns=[\"longitude\", \"latitude\", \"elevation(m)\", \"unit\", \"component\", \"response\",],\n", - " )\n", - "\n", - " with open(station_pkl, \"wb\") as fp:\n", - " pickle.dump(stations, fp)\n", - "\n", - " # ####### Plot stations ########\n", - " plt.figure()\n", - " plt.plot(station_locs[\"longitude\"], station_locs[\"latitude\"], \"^\", label=\"Stations\")\n", - " plt.xlabel(\"X (km)\")\n", - " plt.ylabel(\"Y (km)\")\n", - " plt.axis(\"scaled\")\n", - " plt.xlim(config[\"xlim_degree\"])\n", - " plt.ylim(config[\"ylim_degree\"])\n", - " plt.legend()\n", - " plt.title(f\"Number of stations: {len(station_locs)}\")\n", - " # plt.savefig(os.path.join(data_path, \"stations_loc.png\"))\n", - " plt.show()\n" - ] - }, - { - "cell_type": "code", - "execution_count": 825, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.942085Z", - "iopub.status.busy": "2021-07-23T07:24:31.941654Z", - "iopub.status.idle": "2021-07-23T07:24:31.991241Z", - "shell.execute_reply": "2021-07-23T07:24:31.989989Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.941898Z" - } - }, - "outputs": [], - "source": [ - "if run_local:\n", - " download_stations(root_dir(\"config.json\"), root_dir(\"stations.csv\"), root_dir(\"stations.pkl\"))\n" - ] - }, - { - "cell_type": "code", - "execution_count": 826, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:31.993707Z", - "iopub.status.busy": "2021-07-23T07:24:31.992949Z", - "iopub.status.idle": "2021-07-23T07:24:32.100151Z", - "shell.execute_reply": "2021-07-23T07:24:32.099340Z", - "shell.execute_reply.started": "2021-07-23T07:24:31.993636Z" - } - }, - "outputs": [], - "source": [ - "download_stations_op = comp.func_to_container_op(\n", - " download_stations,\n", - " # base_image='zhuwq0/quakeflow-env:latest',\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"obspy\", \"pandas\", \"matplotlib\",],\n", - ")\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 5. 
Download waveform data" - ] - }, - { - "cell_type": "code", - "execution_count": 827, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.103202Z", - "iopub.status.busy": "2021-07-23T07:24:32.102609Z", - "iopub.status.idle": "2021-07-23T07:24:32.179621Z", - "shell.execute_reply": "2021-07-23T07:24:32.178724Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.103138Z" - } - }, - "outputs": [], - "source": [ - "def download_waveform(\n", - " i: int,\n", - " index_json: InputPath(\"json\"),\n", - " config_json: InputPath(\"json\"),\n", - " datetime_json: InputPath(\"json\"),\n", - " station_pkl: InputPath(\"pickle\"),\n", - " fname_csv: OutputPath(str),\n", - " data_path: str,\n", - " bucket_name: str = \"waveforms\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - ") -> str:\n", - "\n", - " import pickle, os\n", - " import obspy\n", - " from obspy.clients.fdsn import Client\n", - " import time\n", - " import json\n", - " import random\n", - " import threading\n", - "\n", - " lock = threading.Lock()\n", - "\n", - " upload_minio = False\n", - " # try:\n", - " # from minio import Minio\n", - "\n", - " # minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - " # if not minioClient.bucket_exists(bucket_name):\n", - " # minioClient.make_bucket(bucket_name)\n", - " # upload_minio = True\n", - " # except Exception as err:\n", - " # # print(f\"ERROR: can not access minio service! \\n{err}\")\n", - " # pass\n", - "\n", - " with open(index_json, \"r\") as fp:\n", - " index = json.load(fp)\n", - " idx = index[i]\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - " with open(datetime_json, \"r\") as fp:\n", - " tmp = json.load(fp)\n", - " starttimes = tmp[\"starttimes\"]\n", - " interval = tmp[\"interval\"]\n", - " with open(station_pkl, \"rb\") as fp:\n", - " stations = pickle.load(fp)\n", - "\n", - " waveform_dir = os.path.join(data_path, config[\"region\"], \"waveforms\")\n", - " if not os.path.exists(waveform_dir):\n", - " os.makedirs(waveform_dir)\n", - "\n", - " ####### Download data ########\n", - " client = Client(config[\"client\"])\n", - " fname_list = [\"fname\"]\n", - "\n", - " def download(i):\n", - " # for i in idx:\n", - " starttime = obspy.UTCDateTime(starttimes[i])\n", - " endtime = starttime + interval\n", - " fname = \"{}.mseed\".format(starttime.datetime.strftime(\"%Y-%m-%dT%H:%M:%S\"))\n", - " if not upload_minio:\n", - " if os.path.exists(os.path.join(waveform_dir, fname)):\n", - " print(f\"{fname} exists\")\n", - " fname_list.append(fname)\n", - " return\n", - " else:\n", - " try:\n", - " minioClient.fget_object(\n", - " bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname),\n", - " )\n", - " print(\n", - " f\"{bucket_name}/{os.path.join(config['region'], fname)} download to {os.path.join(waveform_dir, fname)}\"\n", - " )\n", - " fname_list.append(fname)\n", - " return\n", - " except Exception as err:\n", - " print(err)\n", - "\n", - " max_retry = 10\n", - " stream = obspy.Stream()\n", - " print(f\"{fname} download starts\")\n", - " num_sta = 0\n", - " for network in stations:\n", - " for station in network:\n", - " print(f\"********{network.code}.{station.code}********\")\n", - " retry = 0\n", - " while retry < max_retry:\n", - " try:\n", - " tmp = client.get_waveforms(\n", - " network.code, station.code, \"*\", config[\"channels\"], starttime, endtime,\n", - " )\n", - " # for trace in tmp:\n", - " # if 
trace.stats.sampling_rate != 100:\n", - " # print(trace)\n", - " # trace = trace.interpolate(100, method=\"linear\")\n", - " # trace = trace.detrend(\"spline\", order=2, dspline=5*trace.stats.sampling_rate)\n", - " # stream.append(trace)\n", - " stream += tmp\n", - " num_sta += len(tmp)\n", - " break\n", - " except Exception as err:\n", - " print(\"Error {}.{}: {}\".format(network.code, station.code, err))\n", - " message = \"No data available for request.\"\n", - " if str(err)[: len(message)] == message:\n", - " break\n", - " retry += 1\n", - " time.sleep(5)\n", - " continue\n", - " if retry == max_retry:\n", - " print(f\"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code}\")\n", - "\n", - " if len(stream) > 0:\n", - " # stream = stream.merge(fill_value=0)\n", - " # stream = stream.trim(starttime, endtime, pad=True, fill_value=0)\n", - " stream.write(os.path.join(waveform_dir, fname))\n", - " print(f\"{fname} download succeeds\")\n", - " # if upload_minio:\n", - " # minioClient.fput_object(bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))\n", - " # print(f\"{fname} upload to minio {os.path.join(config['region'], fname)}\")\n", - " else:\n", - " print(f\"{fname} empty data\")\n", - " lock.acquire()\n", - " fname_list.append(fname)\n", - " lock.release()\n", - "\n", - " threads = []\n", - " MAX_THREADS = 4\n", - " # MAX_THREADS = 1\n", - " for ii, i in enumerate(idx):\n", - " t = threading.Thread(target=download, args=(i,))\n", - " t.start()\n", - " time.sleep(1)\n", - " threads.append(t)\n", - " if ii % MAX_THREADS == MAX_THREADS - 1:\n", - " for t in threads:\n", - " t.join()\n", - " threads = []\n", - " for t in threads:\n", - " t.join()\n", - "\n", - " with open(fname_csv, \"w\") as fp:\n", - " fp.write(\"\\n\".join(fname_list))\n", - "\n", - " return waveform_dir\n" - ] - }, - { - "cell_type": "code", - "execution_count": 828, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.181167Z", - "iopub.status.busy": "2021-07-23T07:24:32.180847Z", - "iopub.status.idle": "2021-07-23T07:24:32.236207Z", - "shell.execute_reply": "2021-07-23T07:24:32.235446Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.181139Z" - }, - "tags": [] - }, - "outputs": [], - "source": [ - "if run_local:\n", - " waveform_path = download_waveform(\n", - " 0,\n", - " root_dir(\"index.json\"),\n", - " root_dir(\"config.json\"),\n", - " root_dir(\"datetimes.json\"),\n", - " root_dir(\"stations.pkl\"),\n", - " root_dir(\"fname.csv\"),\n", - " data_path=root_dir(\"\"),\n", - " )\n" - ] - }, - { - "cell_type": "code", - "execution_count": 829, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.237575Z", - "iopub.status.busy": "2021-07-23T07:24:32.237229Z", - "iopub.status.idle": "2021-07-23T07:24:32.304622Z", - "shell.execute_reply": "2021-07-23T07:24:32.303775Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.237538Z" - } - }, - "outputs": [], - "source": [ - "download_waveform_op = comp.func_to_container_op(\n", - " download_waveform,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"obspy\", \"minio\"],\n", - ")\n" - ] - }, - { - "cell_type": "code", - "execution_count": 830, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.306222Z", - "iopub.status.busy": "2021-07-23T07:24:32.305927Z", - "iopub.status.idle": "2021-07-23T07:24:32.380151Z", - "shell.execute_reply": "2021-07-23T07:24:32.376951Z", - "shell.execute_reply.started": 
"2021-07-23T07:24:32.306184Z" - } - }, - "outputs": [], - "source": [ - "def phasenet_op(data_path: str, data_list: str, stations: str):\n", - "\n", - " return dsl.ContainerOp(\n", - " name='PhaseNet Picking',\n", - " image=\"zhuwq0/phasenet-api:1.0\",\n", - " command=['python'],\n", - " arguments=[\n", - " 'phasenet/predict.py',\n", - " '--model',\n", - " \"model/190703-214543\",\n", - " '--data_dir',\n", - " data_path,\n", - " '--data_list',\n", - " dsl.InputArgumentPath(data_list),\n", - " '--stations',\n", - " dsl.InputArgumentPath(stations),\n", - " # '--result_dir', \"results\",\n", - " '--format',\n", - " \"mseed_array\",\n", - " '--amplitude',\n", - " '--upload_waveform',\n", - " ],\n", - " # file_outputs={\"picks\": \"/opt/results/picks.json\"},\n", - " file_outputs={\"picks\": \"/opt/results/picks.csv\"},\n", - " )\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 6. Run PhaseNet to pick P/S picks" - ] - }, - { - "cell_type": "code", - "execution_count": 831, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.382805Z", - "iopub.status.busy": "2021-07-23T07:24:32.382393Z", - "iopub.status.idle": "2021-07-23T07:24:32.448401Z", - "shell.execute_reply": "2021-07-23T07:24:32.447595Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.382761Z" - }, - "tags": [] - }, - "outputs": [], - "source": [ - "# %%capture\n", - "if run_local:\n", - " command = f\"python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir={root_dir(root_dir('waveforms'))} --data_list={root_dir('fname.csv')} --stations={root_dir('stations.csv')} --result_dir={root_dir('phasenet')} --format=mseed_array --amplitude --upload_waveform\"\n", - " print(command)\n", - " !{command}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 7. 
Run GaMMA to associate P/S picks" - ] - }, - { - "cell_type": "code", - "execution_count": 832, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.450081Z", - "iopub.status.busy": "2021-07-23T07:24:32.449764Z", - "iopub.status.idle": "2021-07-23T07:24:32.541346Z", - "shell.execute_reply": "2021-07-23T07:24:32.540537Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.450054Z" - }, - "tags": [] - }, - "outputs": [], - "source": [ - "def gamma(\n", - " i: int,\n", - " index_json: InputPath(\"json\"),\n", - " config_json: InputPath(\"json\"),\n", - " pick_csv: InputPath(\"csv\"),\n", - " station_csv: InputPath(str),\n", - " catalog_csv: OutputPath(str),\n", - " picks_csv: OutputPath(str),\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"localhost:9000\",\n", - " secure: bool = True,\n", - ") -> str:\n", - "\n", - " import pandas as pd\n", - " from datetime import datetime, timedelta\n", - " import numpy as np\n", - " from datetime import datetime, timedelta\n", - " import os\n", - " import json\n", - " import pickle\n", - " from tqdm import tqdm\n", - " from gamma.utils import from_seconds, convert_picks_csv, association\n", - "\n", - " catalog_dir = os.path.join(\"/tmp/\", bucket_name)\n", - " if not os.path.exists(catalog_dir):\n", - " os.makedirs(catalog_dir)\n", - "\n", - " ## read config\n", - " with open(index_json, \"r\") as fp:\n", - " index = json.load(fp)\n", - " idx = index[i]\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - " config[\"x(km)\"] = (np.array(config[\"xlim_degree\"]) - np.array(config[\"center\"][0])) * config[\"degree2km\"]\n", - " config[\"y(km)\"] = (np.array(config[\"ylim_degree\"]) - np.array(config[\"center\"][1])) * config[\"degree2km\"]\n", - " config[\"z(km)\"] = (0, 60)\n", - "\n", - " ## read picks\n", - " picks = pd.read_csv(pick_csv, parse_dates=[\"timestamp\"])\n", - " picks[\"id\"] = picks[\"station_id\"]\n", - " picks[\"time_idx\"] = picks[\"timestamp\"].apply(lambda x: x.strftime(\"%Y-%m-%dT%H\")) ## process by hours\n", - "\n", - " ## read stations\n", - " stations = pd.read_csv(station_csv, delimiter=\"\\t\")\n", - " stations = stations.rename(columns={\"station\": \"id\"})\n", - " stations[\"x(km)\"] = stations[\"longitude\"].apply(lambda x: (x - config[\"center\"][0]) * config[\"degree2km\"])\n", - " stations[\"y(km)\"] = stations[\"latitude\"].apply(lambda x: (x - config[\"center\"][1]) * config[\"degree2km\"])\n", - " stations[\"z(km)\"] = stations[\"elevation(m)\"].apply(lambda x: -x / 1e3)\n", - "\n", - " ### setting GMMA configs\n", - " config[\"dims\"] = ['x(km)', 'y(km)', 'z(km)']\n", - " config[\"use_amplitude\"] = True\n", - " config[\"vel\"] = {\"p\": 6.0, \"s\": 6.0 / 1.73}\n", - " config[\"method\"] = \"BGMM\"\n", - " if config[\"method\"] == \"BGMM\":\n", - " config[\"oversample_factor\"] = 4\n", - " if config[\"method\"] == \"GMM\":\n", - " config[\"oversample_factor\"] = 1\n", - "\n", - " # DBSCAN\n", - " config[\"bfgs_bounds\"] = (\n", - " (config[\"x(km)\"][0] - 1, config[\"x(km)\"][1] + 1), # x\n", - " (config[\"y(km)\"][0] - 1, config[\"y(km)\"][1] + 1), # y\n", - " (0, config[\"z(km)\"][1] + 1), # x\n", - " (None, None),\n", - " ) # t\n", - " config[\"dbscan_eps\"] = min(\n", - " 15,\n", - " np.sqrt(\n", - " (stations[\"x(km)\"].max() - stations[\"x(km)\"].min()) ** 2\n", - " + (stations[\"y(km)\"].max() - stations[\"y(km)\"].min()) ** 2\n", - " )\n", - " / (6.0 / 1.75),\n", - " ) # s\n", - " config[\"dbscan_min_samples\"] = min(3, 
len(stations))\n", - "\n", - " # Filtering\n", - " config[\"min_picks_per_eq\"] = min(8, len(stations) // 2)\n", - " config[\"max_sigma11\"] = 2.0 # s\n", - " config[\"max_sigma22\"] = 2.0 # m/s\n", - " config[\"max_sigma12\"] = 1.0 # covariance\n", - "\n", - " # print(config)\n", - " for k, v in config.items():\n", - " print(f\"{k}: {v}\")\n", - "\n", - " ## if use amplitude\n", - " if config[\"use_amplitude\"]:\n", - " picks = picks[picks[\"amp\"] != 0]\n", - "\n", - " ## run GMMA association\n", - " pbar = tqdm(sorted(list(set(picks[\"time_idx\"]))))\n", - " event_idx0 = 1 ## current earthquake index\n", - " assignments = []\n", - " if (len(picks) > 0) and (len(picks) < 5000):\n", - " catalogs, assignments = association(picks, stations, config, event_idx0, method=config[\"method\"], pbar=pbar,)\n", - " event_idx0 += len(catalogs)\n", - " else:\n", - " catalogs = []\n", - " for i, segment in enumerate(pbar):\n", - " picks_ = picks[picks[\"time_idx\"] == segment]\n", - " if len(picks_) == 0:\n", - " continue\n", - " catalog, assign = association(picks_, stations, config, event_idx0, method=config[\"method\"], pbar=pbar,)\n", - " event_idx0 += len(catalog)\n", - " catalogs.extend(catalog)\n", - " assignments.extend(assign)\n", - "\n", - " ## create catalog\n", - " catalogs = pd.DataFrame(\n", - " catalogs,\n", - " columns=[\"time(s)\"]\n", - " + config[\"dims\"]\n", - " + [\"magnitude\", \"sigma_time\", \"sigma_amp\", \"cov_time_amp\", \"event_idx\", \"prob_gamma\",],\n", - " )\n", - " catalogs[\"time\"] = catalogs[\"time(s)\"].apply(lambda x: from_seconds(x))\n", - " catalogs[\"longitude\"] = catalogs[\"x(km)\"].apply(lambda x: x / config[\"degree2km\"] + config[\"center\"][0])\n", - " catalogs[\"latitude\"] = catalogs[\"y(km)\"].apply(lambda x: x / config[\"degree2km\"] + config[\"center\"][1])\n", - " catalogs[\"depth(m)\"] = catalogs[\"z(km)\"].apply(lambda x: x * 1e3)\n", - "\n", - " catalogs.sort_values(by=[\"time\"], inplace=True)\n", - " with open(catalog_csv, 'w') as fp:\n", - " catalogs.to_csv(\n", - " fp,\n", - " sep=\"\\t\",\n", - " index=False,\n", - " float_format=\"%.3f\",\n", - " date_format='%Y-%m-%dT%H:%M:%S.%f',\n", - " columns=[\n", - " \"time\",\n", - " \"magnitude\",\n", - " \"longitude\",\n", - " \"latitude\",\n", - " \"depth(m)\",\n", - " \"sigma_time\",\n", - " \"sigma_amp\",\n", - " \"cov_time_amp\",\n", - " \"prob_gamma\",\n", - " \"event_idx\",\n", - " ],\n", - " )\n", - " # catalogs = catalogs[\n", - " # ['time', 'magnitude', 'longitude', 'latitude', 'depth(m)', 'sigma_time', 'sigma_amp']\n", - " # ]\n", - "\n", - " ## add assignment to picks\n", - " assignments = pd.DataFrame(assignments, columns=[\"pick_idx\", \"event_idx\", \"prob_gamma\"])\n", - " picks = picks.join(assignments.set_index(\"pick_idx\")).fillna(-1).astype({'event_idx': int})\n", - " picks.sort_values(by=[\"timestamp\"], inplace=True)\n", - " with open(picks_csv, 'w') as fp:\n", - " picks.to_csv(\n", - " fp,\n", - " sep=\"\\t\",\n", - " index=False,\n", - " date_format='%Y-%m-%dT%H:%M:%S.%f',\n", - " columns=[\"id\", \"timestamp\", \"type\", \"prob\", \"amp\", \"prob_gamma\", \"event_idx\",],\n", - " )\n", - "\n", - " ## upload to mongodb\n", - " try:\n", - " from pymongo import MongoClient\n", - "\n", - " username = \"root\"\n", - " password = \"quakeflow123\"\n", - " mongodb_url = \"quakeflow-mongodb.default.svc.cluster.local:27017\"\n", - " client = MongoClient(f\"mongodb://{username}:{password}@{mongodb_url}\")\n", - " db = client[\"quakeflow\"]\n", - " collection = db[\"waveform\"]\n", - 
" for i, p in tqdm(picks.iterrows(), desc=\"Uploading to mongodb\"):\n", - " collection.update(\n", - " {\"_id\": f\"{p['station_id']}_{p['timestamp'].isoformat(timespec='milliseconds')}_{p['type']}\"},\n", - " {\"$set\": {\"event_index\": p[\"event_idx\"]}},\n", - " )\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access mongodb service! \\n{err}\")\n", - " pass\n", - "\n", - " ## upload to s3 bucket\n", - " try:\n", - " from minio import Minio\n", - "\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - " if not minioClient.bucket_exists(bucket_name):\n", - " minioClient.make_bucket(bucket_name)\n", - "\n", - " with open(os.path.join(catalog_dir, f\"catalog_{idx[0]:04d}.csv\"), 'w') as fp:\n", - " catalogs.to_csv(\n", - " fp,\n", - " sep=\"\\t\",\n", - " index=False,\n", - " float_format=\"%.3f\",\n", - " date_format='%Y-%m-%dT%H:%M:%S.%f',\n", - " columns=[\n", - " \"time\",\n", - " \"magnitude\",\n", - " \"longitude\",\n", - " \"latitude\",\n", - " \"depth(m)\",\n", - " \"sigma_time\",\n", - " \"sigma_amp\",\n", - " \"cov_time_amp\",\n", - " \"prob_gamma\",\n", - " \"event_idx\",\n", - " ],\n", - " )\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/catalog_{idx[0]:04d}.csv\",\n", - " os.path.join(catalog_dir, f\"catalog_{idx[0]:04d}.csv\"),\n", - " )\n", - "\n", - " with open(os.path.join(catalog_dir, f\"picks_{idx[0]:04d}.csv\"), 'w') as fp:\n", - " picks.to_csv(\n", - " fp,\n", - " sep=\"\\t\",\n", - " index=False,\n", - " date_format='%Y-%m-%dT%H:%M:%S.%f',\n", - " columns=[\"id\", \"timestamp\", \"type\", \"prob\", \"amp\", \"prob_gamma\", \"event_idx\",],\n", - " )\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/picks_{idx[0]:04d}.csv\",\n", - " os.path.join(catalog_dir, f\"picks_{idx[0]:04d}.csv\"),\n", - " )\n", - "\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access minio service! 
\\n{err}\")\n", - " pass\n", - "\n", - " return f\"catalog_{idx[0]:04d}.csv\"" - ] - }, - { - "cell_type": "code", - "execution_count": 833, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.542933Z", - "iopub.status.busy": "2021-07-23T07:24:32.542555Z", - "iopub.status.idle": "2021-07-23T07:24:32.602390Z", - "shell.execute_reply": "2021-07-23T07:24:32.601248Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.542875Z" - }, - "scrolled": true, - "tags": [] - }, - "outputs": [], - "source": [ - "if run_local:\n", - " catalog = gamma(\n", - " 0,\n", - " root_dir(\"index.json\"),\n", - " root_dir(\"config.json\"),\n", - " root_dir(\"phasenet/picks.csv\"),\n", - " root_dir(\"stations.csv\"),\n", - " root_dir(\"catalog.csv\"),\n", - " root_dir(\"picks.csv\"),\n", - " bucket_name=\"catalogs\",\n", - " s3_url=\"localhost:9000\",\n", - " secure=False,\n", - " )\n" - ] - }, - { - "cell_type": "code", - "execution_count": 834, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.604301Z", - "iopub.status.busy": "2021-07-23T07:24:32.603898Z", - "iopub.status.idle": "2021-07-23T07:24:32.742769Z", - "shell.execute_reply": "2021-07-23T07:24:32.742059Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.604261Z" - }, - "tags": [] - }, - "outputs": [], - "source": [ - "gamma_op = comp.func_to_container_op(\n", - " gamma,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"numpy\", \"scikit-learn\", \"tqdm\", \"minio\", \"gmma\", \"pymongo\"],\n", - ")\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 8. Plot catalogs" - ] - }, - { - "cell_type": "code", - "execution_count": 835, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.744876Z", - "iopub.status.busy": "2021-07-23T07:24:32.744517Z", - "iopub.status.idle": "2021-07-23T07:24:32.799683Z", - "shell.execute_reply": "2021-07-23T07:24:32.798887Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.744833Z" - }, - "tags": [] - }, - "outputs": [], - "source": [ - "if run_local:\n", - " # %run plot_catalog.ipynb\n", - " import pandas \n", - " import matplotlib.pyplot as plt\n", - " gamma_catalog = pandas.read_csv(root_dir(\"catalog.csv\"), sep=\"\\t\")\n", - " plt.figure()\n", - " plt.plot(gamma_catalog[\"longitude\"], gamma_catalog[\"latitude\"], '.')\n", - " plt.show()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## 9. Parallel processing on cloud\n", - "\n", - "Only run this section for parallel jobs on cloud. Setting cloud environment is needed." 
- ] - }, - { - "cell_type": "code", - "execution_count": 836, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.801895Z", - "iopub.status.busy": "2021-07-23T07:24:32.801552Z", - "iopub.status.idle": "2021-07-23T07:24:32.853533Z", - "shell.execute_reply": "2021-07-23T07:24:32.852549Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.801857Z" - } - }, - "outputs": [], - "source": [ - "def merge_catalog(\n", - " config_json: InputPath(\"json\"),\n", - " catalog_csv: OutputPath(str),\n", - " picks_csv: OutputPath(str),\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - "):\n", - "\n", - " import pandas as pd\n", - " from glob import glob\n", - " import os\n", - " import json\n", - "\n", - " from minio import Minio\n", - "\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " objects = minioClient.list_objects(bucket_name, prefix=config[\"region\"], recursive=True)\n", - "\n", - " tmp_path = lambda x: os.path.join(\"/tmp/\", x)\n", - " for obj in objects:\n", - " print(obj._object_name)\n", - " minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split(\"/\")[-1]))\n", - "\n", - " files_catalog = sorted(glob(tmp_path(\"catalog_*.csv\")))\n", - " files_picks = sorted(glob(tmp_path(\"picks_*.csv\")))\n", - "\n", - " if len(files_catalog) > 0:\n", - " catalog_list = []\n", - " for f in files_catalog:\n", - " tmp = pd.read_csv(f, sep=\"\\t\", dtype=str)\n", - " tmp[\"file_idx\"] = f.rstrip(\".csv\").split(\"_\")[-1]\n", - " catalog_list.append(tmp)\n", - " merged_catalog = pd.concat(catalog_list).sort_values(by=\"time\")\n", - "\n", - " pick_list = []\n", - " for f in files_picks:\n", - " tmp = pd.read_csv(f, sep=\"\\t\", dtype=str)\n", - " tmp[\"file_idx\"] = f.rstrip(\".csv\").split(\"_\")[-1]\n", - " pick_list.append(tmp)\n", - " merged_picks = pd.concat(pick_list).sort_values(by=\"timestamp\")\n", - "\n", - " merged_catalog[\"match_id\"] = merged_catalog.apply(lambda x: f'{x[\"event_idx\"]}_{x[\"file_idx\"]}', axis=1)\n", - " merged_picks[\"match_id\"] = merged_picks.apply(lambda x: f'{x[\"event_idx\"]}_{x[\"file_idx\"]}', axis=1)\n", - " merged_catalog.sort_values(by=\"time\", inplace=True, ignore_index=True)\n", - "\n", - " merged_catalog.drop(columns=[\"event_idx\", \"file_idx\"], inplace=True)\n", - " merged_picks.drop(columns=[\"event_idx\", \"file_idx\"], inplace=True)\n", - " merged_catalog[\"event_idx\"] = merged_catalog.index.values\n", - " mapping = dict(zip(merged_catalog[\"match_id\"], merged_catalog[\"event_idx\"]))\n", - " merged_picks[\"event_idx\"] = merged_picks[\"match_id\"].apply(lambda x: mapping[x] if x in mapping else -1)\n", - " merged_catalog.drop(columns=[\"match_id\"], inplace=True)\n", - " merged_picks.drop(columns=[\"match_id\"], inplace=True)\n", - "\n", - " merged_catalog.to_csv(tmp_path(\"gamma_catalog.csv\"), sep=\"\\t\", index=False)\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/gamma_catalog.csv\",\n", - " tmp_path(\"gamma_catalog.csv\"),\n", - " )\n", - " merged_picks.to_csv(tmp_path(\"gamma_picks.csv\"), sep=\"\\t\", index=False)\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/gamma_picks.csv\",\n", - " tmp_path(\"gamma_picks.csv\"),\n", - " )\n", - "\n", - " with open(catalog_csv, \"w\") as fout:\n", - " with 
open(tmp_path(\"gamma_catalog.csv\"), \"r\") as fin:\n", - " for line in fin:\n", - " fout.write(line)\n", - " with open(picks_csv, \"w\") as fout:\n", - " with open(tmp_path(\"gamma_picks.csv\"), \"r\") as fin:\n", - " for line in fin:\n", - " fout.write(line)\n", - " else:\n", - " with open(catalog_csv, \"w\") as fout:\n", - " pass\n", - " print(\"No catalog.csv found!\")\n", - " with open(picks_csv, \"w\") as fout:\n", - " pass\n", - " print(\"No picks.csv found!\")\n" - ] - }, - { - "cell_type": "code", - "execution_count": 837, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.897131Z", - "iopub.status.busy": "2021-07-23T07:24:32.896881Z", - "iopub.status.idle": "2021-07-23T07:24:32.983081Z", - "shell.execute_reply": "2021-07-23T07:24:32.982120Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.897104Z" - } - }, - "outputs": [], - "source": [ - "merge_op = comp.func_to_container_op(\n", - " merge_catalog,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"minio\"],\n", - ")\n" - ] - }, - { - "cell_type": "code", - "execution_count": 838, - "metadata": {}, - "outputs": [], - "source": [ - "def split_hypodd(\n", - " config_json: InputPath(\"json\"),\n", - " picks_csv: InputPath(str),\n", - " catalog_csv: InputPath(str),\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - ") -> list:\n", - "\n", - " import pandas as pd\n", - " import json\n", - " from tqdm import tqdm\n", - " from datetime import datetime\n", - " import os\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " picks = pd.read_csv(picks_csv, sep=\"\\t\")\n", - " events = pd.read_csv(catalog_csv, sep=\"\\t\")\n", - " tmp_path = lambda x: os.path.join(\"/tmp/\", x)\n", - "\n", - " events[\"match_id\"] = events[\"event_idx\"]\n", - " picks[\"match_id\"] = picks[\"event_idx\"]\n", - "\n", - " # %%\n", - " # MAXEVENT = len(events)\n", - " MAXEVENT = 1e4 ## segment by time\n", - " MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1\n", - "\n", - " # %% convert format\n", - " idx = 0\n", - " out_file = open(tmp_path(f\"hypoDD_{idx:03d}.pha\"), \"w\")\n", - "\n", - " picks_by_event = picks.groupby(\"match_id\").groups\n", - " for i in tqdm(range(len(events))):\n", - " if i % MAXEVENT == MAXEVENT - 1:\n", - " out_file.close()\n", - " idx = int((i + 1) // MAXEVENT)\n", - " out_file = open(tmp_path(f\"hypoDD_{idx:03d}.pha\"), \"w\")\n", - "\n", - " event = events.iloc[i]\n", - " event_time = datetime.strptime(event[\"time\"], \"%Y-%m-%dT%H:%M:%S.%f\")\n", - " lat = event[\"latitude\"]\n", - " lng = event[\"longitude\"]\n", - " dep = event[\"depth(m)\"] / 1e3\n", - " mag = event[\"magnitude\"]\n", - " EH = 0\n", - " EZ = 0\n", - " RMS = event[\"sigma_time\"]\n", - "\n", - " year, month, day, hour, min, sec = (\n", - " event_time.year,\n", - " event_time.month,\n", - " event_time.day,\n", - " event_time.hour,\n", - " event_time.minute,\n", - " float(event_time.strftime(\"%S.%f\")),\n", - " )\n", - " event_line = f\"# {year:4d} {month:2d} {day:2d} {hour:2d} {min:2d} {sec:5.2f} {lat:7.4f} {lng:9.4f} {dep:5.2f} {mag:5.2f} {EH:5.2f} {EZ:5.2f} {RMS:5.2f} {event['event_idx']:9d}\\n\"\n", - " out_file.write(event_line)\n", - "\n", - " picks_idx = picks_by_event[event[\"match_id\"]]\n", - " for j in picks_idx:\n", - " pick = picks.iloc[j]\n", - " network_code, station_code, comp_code, channel_code = pick['id'].split('.')\n", - " phase_type = 
pick['type'].upper()\n", - " phase_weight = pick['prob']\n", - " pick_time = (datetime.strptime(pick[\"timestamp\"], \"%Y-%m-%dT%H:%M:%S.%f\") - event_time).total_seconds()\n", - " # if pick_time <= 0:\n", - " # continue\n", - " # pick_line = f\"{station_code:<5s} {pick_time:8.3f} {phase_weight:5.4f} {phase_type}\\n\"\n", - " tmp_code = f\"{station_code}{channel_code}\"\n", - " pick_line = f\"{tmp_code:<7s} {pick_time:6.3f} {phase_weight:5.4f} {phase_type}\\n\"\n", - " out_file.write(pick_line)\n", - "\n", - " out_file.close()\n", - "\n", - " try:\n", - " from minio import Minio\n", - "\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - " for i in range(idx + 1):\n", - " minioClient.fput_object(\n", - " bucket_name, f\"{config['region']}/hypoDD_{i:03d}.pha\", tmp_path(f\"hypoDD_{i:03d}.pha\")\n", - " )\n", - "\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access minio service! \\n{err}\")\n", - " pass\n", - "\n", - " return list(range(idx + 1))\n" - ] - }, - { - "cell_type": "code", - "execution_count": 839, - "metadata": {}, - "outputs": [], - "source": [ - "if run_local:\n", - " num_split = split_hypodd(root_dir(\"config.json\"), root_dir(\"picks.csv\"), root_dir(\"catalog.csv\"))\n", - " # print(split_hypodd(root_dir(\"config.json\"), root_dir(\"gamma_picks.csv\"), root_dir(\"gamma_catalog.csv\")))" - ] - }, - { - "cell_type": "code", - "execution_count": 840, - "metadata": {}, - "outputs": [], - "source": [ - "split_hypodd_op = comp.func_to_container_op(\n", - " split_hypodd,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"tqdm\", \"minio\"],\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 841, - "metadata": {}, - "outputs": [], - "source": [ - "def convert_station(\n", - " station_csv: InputPath(str),\n", - " hypoinverse_station: OutputPath(str),\n", - " hypodd_station: OutputPath(str),\n", - "):\n", - "\n", - " import pandas as pd\n", - " from tqdm import tqdm\n", - "\n", - " # %%\n", - " # stations = pd.read_csv('stations.csv', sep=\"\\t\")\n", - " stations = pd.read_csv(station_csv, sep=\"\\t\")\n", - "\n", - " converted_hypoinverse = []\n", - " converted_hypoDD = {}\n", - "\n", - " for i in tqdm(range(len(stations))):\n", - "\n", - " network_code, station_code, comp_code, channel_code = stations.iloc[i]['station'].split('.')\n", - " station_weight = \" \"\n", - " lat_degree = int(stations.iloc[i]['latitude'])\n", - " lat_minute = (stations.iloc[i]['latitude'] - lat_degree) * 60\n", - " north = \"N\" if lat_degree >= 0 else \"S\"\n", - " lng_degree = int(stations.iloc[i]['longitude'])\n", - " lng_minute = (stations.iloc[i]['longitude'] - lng_degree) * 60\n", - " west = \"W\" if lng_degree <= 0 else \"E\"\n", - " elevation = stations.iloc[i]['elevation(m)']\n", - " line_hypoinverse = f\"{station_code:<5} {network_code:<2} {comp_code[:-1]:<1}{channel_code:<3} {station_weight}{abs(lat_degree):2.0f} {abs(lat_minute):7.4f}{north}{abs(lng_degree):3.0f} {abs(lng_minute):7.4f}{west}{elevation:4.0f}\\n\"\n", - " # line_hypoDD = f\"{network_code:<2}.{station_code:<5} {stations.iloc[i]['latitude']:.3f}, {stations.iloc[i]['longitude']:.3f}\\n\"\n", - " #line_hypoDD = f\"{station_code} {stations.iloc[i]['latitude']:.3f} {stations.iloc[i]['longitude']:.3f}\\n\"\n", - " converted_hypoinverse.append(line_hypoinverse)\n", - " #converted_hypoDD.append(line_hypoDD)\n", - " # converted_hypoDD[f\"{station_code}\"] = f\"{station_code} {stations.iloc[i]['latitude']:.3f} 
{stations.iloc[i]['longitude']:.3f}\\n\"\n", - " tmp_code = f\"{station_code}{channel_code}\"\n", - " converted_hypoDD[f\"{station_code}{channel_code}\"] = f\"{tmp_code:<8s} {stations.iloc[i]['latitude']:.3f} {stations.iloc[i]['longitude']:.3f}\\n\"\n", - "\n", - " # %%\n", - " # out_file = 'stations_hypoinverse.dat'\n", - " with open(hypoinverse_station, 'w') as f:\n", - " f.writelines(converted_hypoinverse)\n", - "\n", - " # out_file = 'stations_hypoDD.dat'\n", - " with open(hypodd_station, 'w') as f:\n", - " for k, v in converted_hypoDD.items():\n", - " f.write(v)" - ] - }, - { - "cell_type": "code", - "execution_count": 842, - "metadata": {}, - "outputs": [], - "source": [ - "if run_local:\n", - " convert_station(root_dir(\"stations.csv\"), root_dir(\"stations_hypoinverse.dat\"), root_dir(\"stations_hypoDD.dat\"))" - ] - }, - { - "cell_type": "code", - "execution_count": 843, - "metadata": {}, - "outputs": [], - "source": [ - "convert_station_op = comp.func_to_container_op(\n", - " convert_station,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"tqdm\"],\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 844, - "metadata": {}, - "outputs": [], - "source": [ - "def ph2dt(\n", - " i: int,\n", - " config_json: InputPath(\"json\"),\n", - " station_dat: InputPath(str),\n", - " ct_file: OutputPath(str),\n", - " hypodd_event: OutputPath(str),\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - ") -> str:\n", - " import json\n", - " from datetime import datetime\n", - " import os\n", - " from minio import Minio\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - " tmp_path = lambda x: os.path.join(\"/tmp/\", x)\n", - "\n", - " try:\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - " minioClient.fget_object(bucket_name, f\"{config['region']}/hypoDD_{i:03d}.pha\", tmp_path(f\"hypoDD_{i:03d}.pha\"))\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access minio service! \\n{err}\")\n", - " pass\n", - " \n", - " # print(f\"cat {tmp_path(f'hypoDD_{i:03d}.pha')} > hypoDD.pha\")\n", - " os.system(f\"cat {tmp_path(f'hypoDD_{i:03d}.pha')} > hypoDD.pha\")\n", - " # os.system(f\"cat {station_csv} > stations.csv\")\n", - " # os.system(\"python convert_stations.py\")\n", - " os.system(f\"cat {station_dat} > stations_hypoDD.dat\")\n", - "\n", - " PH2DT_CMD = f\"HYPODD/src/ph2dt/ph2dt ph2dt.inp\"\n", - " if os.system(PH2DT_CMD) != 0:\n", - " raise (\"{PH2DT_CMD}\" + \" failed!\")\n", - "\n", - " os.system(f\"cat dt.ct > {ct_file}\")\n", - " os.system(f\"cat event.sel > {hypodd_event}\")\n", - " os.system(f\"mv dt.ct dt_{i:03d}.ct\")\n", - " os.system(f\"mv event.dat event_{i:03d}.dat\")\n", - " os.system(f\"mv event.sel event_{i:03d}.sel\")\n", - " os.system(\"rm -f hypoDD.reloc.*\")\n", - "\n", - " try:\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/dt_{i:03d}.ct\",\n", - " f\"dt_{i:03d}.ct\",\n", - " )\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/event_{i:03d}.dat\",\n", - " f\"event_{i:03d}.dat\",\n", - " )\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/event_{i:03d}.sel\",\n", - " f\"event_{i:03d}.sel\",\n", - " )\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access minio service! 
\\n{err}\")\n", - " pass\n", - " \n", - "\n", - " return f\"dt_{i:03d}.ct\"\n" - ] - }, - { - "cell_type": "code", - "execution_count": 845, - "metadata": {}, - "outputs": [], - "source": [ - "if run_local:\n", - " for i in num_split:\n", - " ph2dt(i, root_dir(\"config.json\"), root_dir(\"stations_hypoDD.dat\"), root_dir(\"dt.ct\"), root_dir(\"event.sel\"))" - ] - }, - { - "cell_type": "code", - "execution_count": 846, - "metadata": {}, - "outputs": [], - "source": [ - "ph2dt_op = comp.func_to_container_op(\n", - " ph2dt,\n", - " base_image='zhuwq0/hypodd-api:1.0'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 847, - "metadata": {}, - "outputs": [], - "source": [ - "def hypodd_ct(\n", - " i: int,\n", - " config_json: InputPath(\"json\"),\n", - " ct_file: InputPath(str),\n", - " event: InputPath(str),\n", - " station: InputPath(str),\n", - " inp_file: str = \"hypoDD_ct.inp\",\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - "):\n", - " import json\n", - " from datetime import datetime\n", - " import os\n", - " from minio import Minio\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - " tmp_path = lambda x: os.path.join(\"/tmp/\", x)\n", - "\n", - " os.system(f\"cat {ct_file} > dt.ct\")\n", - " os.system(f\"cat {event} > event.sel\")\n", - " os.system(f\"cat {station} > stations_hypoDD.dat\")\n", - "\n", - " HYPODD_CMD = f\"HYPODD/src/hypoDD/hypoDD {inp_file}\"\n", - " if os.system(HYPODD_CMD) != 0:\n", - " raise (\"{HYPODD_CMD}\" + \" failed!\")\n", - " os.system(f\"cat hypoDD.reloc > {tmp_path(f'hypoDD_ct_{i:03d}.reloc')}\")\n", - "\n", - " try:\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/hypoDD_ct_{i:03d}.reloc\",\n", - " tmp_path(f\"hypoDD_ct_{i:03d}.reloc\"),\n", - " )\n", - " except Exception as err:\n", - " print(f\"ERROR: can not access minio service! 
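The MinIO access pattern used here (build a client with the hard-coded credentials, `fput_object`/`fget_object`, and a try/except that only logs the failure) is repeated across several components. A small helper along these lines (names are illustrative, not part of the notebook) would keep that logic in one place:

```python
from minio import Minio

def get_minio_client(s3_url: str = "minio-service:9000", secure: bool = True) -> Minio:
    # Same hard-coded credentials used throughout the notebook's components.
    return Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)

def safe_fput(client: Minio, bucket: str, object_name: str, file_path: str) -> None:
    # Upload, but never fail the component when the object store is unreachable,
    # mirroring the try/except-pass behaviour of the original code.
    try:
        client.fput_object(bucket, object_name, file_path)
    except Exception as err:
        print(f"ERROR: cannot access minio service!\n{err}")
```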
\\n{err}\")\n", - " pass\n" - ] - }, - { - "cell_type": "code", - "execution_count": 848, - "metadata": {}, - "outputs": [], - "source": [ - "if run_local:\n", - " hypodd_ct(0, root_dir(\"config.json\"), root_dir(\"dt.ct\"), root_dir(\"event.sel\"), root_dir(\"stations_hypoDD.dat\"))" - ] - }, - { - "cell_type": "code", - "execution_count": 849, - "metadata": {}, - "outputs": [], - "source": [ - "hypodd_ct_op = comp.func_to_container_op(\n", - " hypodd_ct,\n", - " base_image='zhuwq0/hypodd-api:1.0'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 850, - "metadata": {}, - "outputs": [], - "source": [ - "if run_local:\n", - " import pandas as pd\n", - " import matplotlib.pyplot as plt\n", - " from datetime import datetime\n", - " catalog_hypoDD = pd.read_csv(f\"/tmp/hypoDD_ct_{0:03d}.reloc\", sep=\"\\s+\", names=[\"ID\", \"LAT\", \"LON\", \"DEPTH\", \"X\", \"Y\", \"Z\", \"EX\", \"EY\", \"EZ\", \"YR\", \"MO\", \"DY\", \"HR\", \"MI\", \"SC\", \"MAG\", \"NCCP\", \"NCCS\", \"NCTP\",\n", - " \"NCTS\", \"RCC\", \"RCT\", \"CID\"])\n", - " catalog_hypoDD[\"time\"] = catalog_hypoDD.apply(lambda x: f'{x[\"YR\"]:04.0f}-{x[\"MO\"]:02.0f}-{x[\"DY\"]:02.0f}T{x[\"HR\"]:02.0f}:{x[\"MI\"]:02.0f}:{min(x[\"SC\"], 59.999):05.3f}', axis=1)\n", - " catalog_hypoDD[\"time\"] = catalog_hypoDD[\"time\"].apply(lambda x: datetime.strptime(x, \"%Y-%m-%dT%H:%M:%S.%f\"))\n", - " plt.figure()\n", - " plt.plot(catalog_hypoDD[\"LON\"], catalog_hypoDD[\"LAT\"], '.')\n" - ] - }, - { - "cell_type": "code", - "execution_count": 851, - "metadata": {}, - "outputs": [], - "source": [ - "def cross_correlation(\n", - " ct_file: InputPath(str),\n", - " catalog_file: InputPath(str),\n", - " picks_file: InputPath(str),\n", - " cc_file: OutputPath(str),\n", - "):\n", - "\n", - " import pandas as pd\n", - " from multiprocessing import Process, Manager\n", - " from pymongo import MongoClient\n", - " import numpy as np\n", - " from tqdm import tqdm\n", - " import time\n", - "\n", - " catalog = pd.read_csv(\n", - " catalog_file,\n", - " sep=\"\\t\",\n", - " parse_dates=[\"time\"],\n", - " index_col=[\"event_idx\"],\n", - " dtype={\"event_idx\": str},\n", - " )\n", - " picks = pd.read_csv(picks_file, sep=\"\\t\", parse_dates=[\"timestamp\"], dtype={\"event_idx\": str})\n", - " picks[\"station\"] = picks[\"id\"].apply(lambda x: x.split(\".\")[1] + x.split(\".\")[3])\n", - " picks = picks.set_index([\"event_idx\", \"station\", \"type\"])\n", - " picks = picks.sort_index()\n", - "\n", - " pick_index = 100\n", - " lo = pick_index - 50\n", - " hi = pick_index + 100\n", - " dt = 0.01\n", - "\n", - " ct_dict = Manager().dict()\n", - " cc_dict = Manager().dict()\n", - " with open(ct_file) as fct:\n", - " meta = fct.readlines()\n", - " for i, line in enumerate(meta):\n", - " if line[0] == \"#\":\n", - " if i > 0:\n", - " ct_dict[key] = value\n", - " key = line\n", - " value = []\n", - " continue\n", - " value.append(line)\n", - " ct_dict[key] = value\n", - " keys = sorted(list(ct_dict.keys()))\n", - "\n", - " def calc_cross_correlation(keys, ct_dict, cc_dict):\n", - " username = \"root\"\n", - " password = \"quakeflow123\"\n", - " # client = MongoClient(f\"mongodb://{username}:{password}@127.0.0.1:27017\")\n", - " client = MongoClient(f\"mongodb://{username}:{password}@quakeflow-mongodb.default.svc.cluster.local:27017\")\n", - " db = client[\"quakeflow\"]\n", - " collection = db[\"waveform\"]\n", - " # normalize = lambda x: (x - np.mean(x, axis=0, keepdims=True)) / np.std(x, axis=0, keepdims=True)\n", - "\n", - " for key in keys:\n", 
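Each `key` iterated here is a dt.ct header of the form `# ID1 ID2`, and `ct_dict[key]` holds the observation lines ph2dt wrote for that event pair: station, the two absolute travel times, a weight, and the phase. For illustration, parsing one made-up entry (the values are invented, not from the demo data):

```python
# A made-up dt.ct entry, only to show the layout consumed by the loop below.
key = "#     1001     1002\n"
ct_dict = {
    key: [
        "CCC    3.340   3.310  1.000 P\n",
        "JRC2   5.120   5.090  1.000 S\n",
    ]
}

ID1, ID2 = key.split()[1], key.split()[2]
for line in ct_dict[key]:
    STA, TT1, TT2, WGT, PHA = line.split()
    # TT1 - TT2 is the catalog differential time that the cross-correlation
    # shift is later added to.
    print(ID1, ID2, STA, float(TT1) - float(TT2), WGT, PHA)
```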
- " tmp = key.split()\n", - " ID1, ID2 = tmp[1], tmp[2]\n", - " key_cc = f\"# {ID1} {ID2} 0.0\\n\"\n", - " lines_cc = []\n", - " for line in ct_dict[key]:\n", - " tmp = line.split()\n", - " STA, TT1, TT2, WGT, PHA = (\n", - " tmp[0],\n", - " tmp[1],\n", - " tmp[2],\n", - " tmp[3],\n", - " tmp[4],\n", - " ) ##HypoDD format\n", - "\n", - " for i, row1 in picks.loc[(ID1, STA, PHA)].iterrows():\n", - "\n", - " data1 = collection.find_one(\n", - " {\"_id\": f\"{row1['id']}_{row1['timestamp'].isoformat(timespec='milliseconds')}_{PHA}\"}\n", - " )\n", - "\n", - " for j, row2 in picks.loc[(ID2, STA, PHA)].iterrows():\n", - "\n", - " data2 = collection.find_one(\n", - " {\"_id\": f\"{row2['id']}_{row2['timestamp'].isoformat(timespec='milliseconds')}_{PHA}\"}\n", - " )\n", - "\n", - " # if PHA == \"P\": # Z\n", - " # waveform1 = np.array(data1[\"waveform\"])[lo:hi, -1:]\n", - " # waveform2 = np.array(data2[\"waveform\"])[lo:hi, -1:]\n", - " # elif PHA == \"S\": # E, N\n", - " # waveform1 = np.array(data1[\"waveform\"])[lo:hi, :-1]\n", - " # waveform2 = np.array(data2[\"waveform\"])[lo:hi, :-1]\n", - " # else:\n", - " # raise (Exception(\"PHA must be P or S\"))\n", - " waveform1 = np.array(data1[\"waveform\"])[lo:hi, :]\n", - " waveform2 = np.array(data2[\"waveform\"])[lo:hi, :]\n", - "\n", - " cc = np.zeros(waveform1.shape[0])\n", - " for k in range(waveform1.shape[1]):\n", - " cc += np.correlate(waveform1[:, k], waveform2[:, k], mode=\"same\")\n", - " norm = np.sqrt(np.sum(waveform1**2) * np.sum(waveform2**2))\n", - " if norm == 0:\n", - " continue\n", - " else:\n", - " cc /= norm\n", - " shift = (np.argmax(np.abs(cc)) - waveform1.shape[0] // 2) * dt + float(TT1) - float(TT2)\n", - " coeff = np.max(np.abs(cc))\n", - "\n", - " if not np.isnan(coeff):\n", - " lines_cc.append(f\"{STA:<7s} {shift:.5f} {coeff:.3f} {PHA}\\n\")\n", - "\n", - " cc_dict[key_cc] = lines_cc\n", - "\n", - " return 0\n", - "\n", - " t0 = time.time()\n", - " processes = []\n", - " num_process = 16\n", - " for i in range(num_process):\n", - " p = Process(target=calc_cross_correlation, args=(keys[i::num_process], ct_dict, cc_dict))\n", - " p.start()\n", - " processes.append(p)\n", - " for p in processes:\n", - " p.join()\n", - " print(f\"{num_process} process: time = {time.time()-t0:.1f}\")\n", - "\n", - " with open(cc_file, \"w\") as fcc:\n", - " for key in cc_dict:\n", - " fcc.write(key)\n", - " for line in cc_dict[key]:\n", - " fcc.write(line)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 852, - "metadata": {}, - "outputs": [], - "source": [ - "cc_op = comp.func_to_container_op(\n", - " cross_correlation,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"tqdm\", \"minio\", \"pymongo\"],\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 853, - "metadata": {}, - "outputs": [], - "source": [ - "def hypodd_cc(\n", - " i: int,\n", - " config_json: InputPath(\"json\"),\n", - " ct_file: InputPath(str),\n", - " cc_file: InputPath(str),\n", - " event: InputPath(str),\n", - " station: InputPath(str),\n", - " inp_file: str = \"hypoDD_cc.inp\",\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - "):\n", - " import json\n", - " import os\n", - " from minio import Minio\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - "\n", - " os.system(f\"cat {ct_file} > dt.ct\")\n", - " 
os.system(f\"cat {cc_file} > dt.cc\")\n", - " os.system(f\"cat {event} > event.sel\")\n", - " os.system(f\"cat {station} > stations_hypoDD.dat \")\n", - "\n", - " HYPODD_CMD = f\"HYPODD/src/hypoDD/hypoDD {inp_file}\"\n", - " if os.system(HYPODD_CMD) != 0:\n", - " raise (\"{HYPODD_CMD}\" + \" failed!\")\n", - " os.system(f\"mv hypoDD.reloc hypoDD_cc_{i:03d}.reloc\")\n", - "\n", - " minioClient.fput_object(\n", - " bucket_name,\n", - " f\"{config['region']}/hypoDD_cc_{i:03d}.reloc\",\n", - " f\"hypoDD_cc_{i:03d}.reloc\",\n", - " )\n" - ] - }, - { - "cell_type": "code", - "execution_count": 854, - "metadata": {}, - "outputs": [], - "source": [ - "hypodd_cc_op = comp.func_to_container_op(\n", - " hypodd_cc,\n", - " base_image='zhuwq0/hypodd-api:1.0'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 855, - "metadata": {}, - "outputs": [], - "source": [ - "def merge_hypodd(\n", - " config_json: InputPath(\"json\"),\n", - " catalog_txt: OutputPath(str),\n", - " bucket_name: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = True,\n", - "):\n", - " import json\n", - " from glob import glob\n", - " import os\n", - " from minio import Minio\n", - "\n", - " minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)\n", - "\n", - " with open(config_json, \"r\") as fp:\n", - " config = json.load(fp)\n", - "\n", - " objects = minioClient.list_objects(bucket_name, prefix=f\"{config['region']}/hypoDD_\", recursive=True)\n", - "\n", - " tmp_path = lambda x: os.path.join(\"/tmp/\", x)\n", - " for obj in objects:\n", - " print(obj._object_name)\n", - " minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split(\"/\")[-1]))\n", - "\n", - " hypoDD_ct_catalogs = sorted(glob(tmp_path(\"hypoDD_ct_*.reloc\")))\n", - " print(f\"cat {' '.join(hypoDD_ct_catalogs)} > {tmp_path('hypoDD_ct_catalog.txt')}\")\n", - " os.system(f\"cat {' '.join(hypoDD_ct_catalogs)} > {tmp_path('hypoDD_ct_catalog.txt')}\")\n", - " minioClient.fput_object(\n", - " bucket_name, f\"{config['region']}/hypoDD_ct_catalog.txt\", tmp_path(\"hypoDD_ct_catalog.txt\")\n", - " )\n", - " os.system(f\"mv {tmp_path('hypoDD_ct_catalog.txt')} {catalog_txt}\")\n", - "\n", - " hypoDD_cc_catalogs = sorted(glob(tmp_path(\"hypoDD_cc_*.reloc\")))\n", - " print(f\"cat {' '.join(hypoDD_cc_catalogs)} > {tmp_path('hypoDD_cc_catalog.txt')}\")\n", - " os.system(f\"cat {' '.join(hypoDD_cc_catalogs)} > {tmp_path('hypoDD_cc_catalog.txt')}\")\n", - " minioClient.fput_object(\n", - " bucket_name, f\"{config['region']}/hypoDD_cc_catalog.txt\", tmp_path(\"hypoDD_cc_catalog.txt\")\n", - " )\n", - " os.system(f\"mv {tmp_path('hypoDD_cc_catalog.txt')} {catalog_txt}\")" - ] - }, - { - "cell_type": "code", - "execution_count": 856, - "metadata": {}, - "outputs": [], - "source": [ - "merge_hypodd_op = comp.func_to_container_op(\n", - " merge_hypodd,\n", - " base_image='python:3.8',\n", - " packages_to_install=[\"pandas\", \"tqdm\", \"minio\"],\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 857, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:32.985184Z", - "iopub.status.busy": "2021-07-23T07:24:32.984821Z", - "iopub.status.idle": "2021-07-23T07:24:33.040031Z", - "shell.execute_reply": "2021-07-23T07:24:33.039300Z", - "shell.execute_reply.started": "2021-07-23T07:24:32.985158Z" - } - }, - "outputs": [], - "source": [ - "@dsl.pipeline(name='QuakeFlow', description='')\n", - "def quakeflow_pipeline(\n", - " data_path: str = 
\"/tmp/\",\n", - " num_parallel=0,\n", - " bucket_catalog: str = \"catalogs\",\n", - " s3_url: str = \"minio-service:9000\",\n", - " secure: bool = False,\n", - "):\n", - "\n", - " config = config_op(num_parallel)\n", - "\n", - " events = download_events_op(config.outputs[\"config_json\"]).set_display_name('Download Events')\n", - "\n", - " stations = download_stations_op(config.outputs[\"config_json\"]).set_display_name('Download Stations')\n", - "\n", - " with kfp.dsl.ParallelFor(config.outputs[\"output\"]) as i:\n", - "\n", - " vop_ = dsl.VolumeOp(\n", - " name=f\"Create volume\",\n", - " resource_name=f\"data-volume-{str(i)}\",\n", - " size=\"50Gi\",\n", - " modes=dsl.VOLUME_MODE_RWO,\n", - " ).set_retry(3)\n", - "\n", - " download_op_ = (\n", - " download_waveform_op(\n", - " i,\n", - " config.outputs[\"index_json\"],\n", - " config.outputs[\"config_json\"],\n", - " config.outputs[\"datetime_json\"],\n", - " stations.outputs[\"station_pkl\"],\n", - " data_path=data_path,\n", - " bucket_name=f\"waveforms\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " )\n", - " .add_pvolumes({data_path: vop_.volume})\n", - " .set_cpu_request(\"800m\")\n", - " .set_retry(3)\n", - " .set_display_name('Download Waveforms')\n", - " )\n", - " download_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - "\n", - " phasenet_op_ = (\n", - " phasenet_op(\n", - " download_op_.outputs[\"Output\"],\n", - " download_op_.outputs[\"fname_csv\"],\n", - " stations.outputs[\"station_csv\"],\n", - " )\n", - " .add_pvolumes({data_path: download_op_.pvolume})\n", - " .set_memory_request(\"9G\")\n", - " .set_retry(3)\n", - " .set_display_name('PhaseNet Picking')\n", - " )\n", - " phasenet_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " phasenet_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " gamma_op_ = (\n", - " gamma_op(\n", - " i,\n", - " config.outputs[\"index_json\"],\n", - " config.outputs[\"config_json\"],\n", - " phasenet_op_.outputs[\"picks\"],\n", - " stations.outputs[\"station_csv\"],\n", - " bucket_name=f\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " )\n", - " .set_cpu_request(\"800m\")\n", - " .set_retry(3)\n", - " .set_display_name('GaMMA Association')\n", - " )\n", - " gamma_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - "\n", - " merge_op_ = (\n", - " merge_op(\n", - " config.outputs[\"config_json\"],\n", - " bucket_name=f\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " )\n", - " .after(gamma_op_)\n", - " .set_display_name('Merge Catalog')\n", - " )\n", - " merge_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - "\n", - " convert_station_op_ = convert_station_op(station_csv=stations.outputs[\"station_csv\"])\n", - " split_hypodd_op_ = (\n", - " split_hypodd_op(\n", - " config.outputs[\"config_json\"],\n", - " picks_csv=merge_op_.outputs[\"picks_csv\"],\n", - " catalog_csv=merge_op_.outputs[\"catalog_csv\"],\n", - " bucket_name=\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " )\n", - " .after(merge_op_)\n", - " .set_display_name('Split Catalog')\n", - " )\n", - " split_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " split_hypodd_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " with kfp.dsl.ParallelFor(split_hypodd_op_.outputs[\"output\"]) as i:\n", - "\n", - " ph2dt_op_ = ph2dt_op(\n", - " i,\n", - " config_json=config.outputs[\"config_json\"],\n", - " 
station_dat=convert_station_op_.outputs[\"hypodd_station\"],\n", - " bucket_name=\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " ).set_display_name('HypoDD PH2DT')\n", - " ph2dt_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " ph2dt_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " cc_op_ = cc_op(\n", - " ct=ph2dt_op_.outputs[\"ct\"],\n", - " picks=merge_op_.outputs[\"picks_csv\"],\n", - " catalog=merge_op_.outputs[\"catalog_csv\"],\n", - " ).set_display_name('Cross Correlation')\n", - " cc_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " cc_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " hypodd_ct_op_ = hypodd_ct_op(\n", - " i,\n", - " config_json=config.outputs[\"config_json\"],\n", - " ct=ph2dt_op_.outputs[\"ct\"],\n", - " event=ph2dt_op_.outputs[\"hypodd_event\"],\n", - " station=convert_station_op_.outputs[\"hypodd_station\"],\n", - " bucket_name=\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " ).set_display_name('HypoDD')\n", - " hypodd_ct_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " hypodd_ct_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " hypodd_cc_op_ = hypodd_cc_op(\n", - " i,\n", - " config_json=config.outputs[\"config_json\"],\n", - " ct=ph2dt_op_.outputs[\"ct\"],\n", - " cc=cc_op_.outputs[\"cc\"],\n", - " event=ph2dt_op_.outputs[\"hypodd_event\"],\n", - " station=convert_station_op_.outputs[\"hypodd_station\"],\n", - " bucket_name=\"catalogs\",\n", - " s3_url=s3_url,\n", - " secure=secure,\n", - " ).set_display_name('HypoDD + CC')\n", - " hypodd_cc_op_.execution_options.caching_strategy.max_cache_staleness = \"P30D\"\n", - " hypodd_cc_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " merge_hypodd_op_ = (\n", - " merge_hypodd_op(\n", - " config_json=config.outputs[\"config_json\"], bucket_name=f\"catalogs\", s3_url=s3_url, secure=secure\n", - " )\n", - " .after(hypodd_ct_op_)\n", - " .after(hypodd_cc_op_)\n", - " .set_display_name('Merge Catalog')\n", - " )\n", - " merge_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n", - " merge_hypodd_op_.set_image_pull_policy(\"Always\")\n", - "\n", - " # vop_.delete().after(merge_hypodd_op_)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 858, - "metadata": { - "execution": { - "iopub.execute_input": "2021-07-23T07:24:33.046370Z", - "iopub.status.busy": "2021-07-23T07:24:33.046091Z", - "iopub.status.idle": "2021-07-23T07:24:36.738285Z", - "shell.execute_reply": "2021-07-23T07:24:36.737618Z", - "shell.execute_reply.started": "2021-07-23T07:24:33.046345Z" - } - }, - "outputs": [ - { - "data": { - "text/html": [ - "Experiment details." - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/html": [ - "Run details." 
- ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "import os\n", - "\n", - "os.environ[\"GOOGLE_APPLICATION_CREDENTIALS\"] = \"/home/weiqiang/.dotbot/cloud/quakeflow_zhuwq.json\"\n", - "experiment_name = 'QuakeFlow'\n", - "pipeline_func = quakeflow_pipeline\n", - "run_name = pipeline_func.__name__ + '_run'\n", - "\n", - "arguments = {\n", - " \"data_path\": \"/tmp\",\n", - " \"num_parallel\": 0,\n", - " \"bucket_catalog\": \"catalogs\",\n", - " \"s3_url\": \"minio-service:9000\",\n", - " \"secure\": False,\n", - "}\n", - "\n", - "if not run_local:\n", - " pipeline_conf = kfp.dsl.PipelineConf()\n", - " pipeline_conf.set_image_pull_policy(\"Always\")\n", - " pipeline_conf.ttl_seconds_after_finished = 60 * 10\n", - " # client = kfp.Client(host=\"2dbc4e1ef495773d-dot-us-west1.pipelines.googleusercontent.com\")\n", - " client = kfp.Client(host=\"http://localhost:8080\")\n", - " kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name), pipeline_conf=pipeline_conf)\n", - " results = client.create_run_from_pipeline_func(\n", - " pipeline_func, experiment_name=experiment_name, run_name=run_name, arguments=arguments,\n", - " )\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "interpreter": { - "hash": "cd49b9d623d06aa0c5f872a997e70207e179b28bd8e4cd8fec363e5d29096c9c" - }, - "kernelspec": { - "display_name": "Python 3.8.12 ('base')", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.12" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -}