Skip to content

Commit

Permalink
nb data pipeline to new wave db complete
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbull committed Jun 12, 2024
1 parent a200aea commit def9fe3
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 54 deletions.
277 changes: 255 additions & 22 deletions nbs/python/240612_match_bodhi_sl_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import pandas as pd\n",
"import requests\n",
"from utils.sl_models import engine as sl_engine\n",
"from utils.bodhi_models import engine as bodhi_engine\n",
"from utils.bodhi_models import get_session, SpotsModel, WaveForecastModel\n",
"from sqlalchemy import select, and_, not_, text\n",
"from pydantic import BaseModel, ConfigDict\n",
"import pandas as pd"
"from sqlalchemy import and_, insert, not_, select, text, tuple_\n",
"from utils.bodhi_models import BodhiWaves, BohdiWavesModel\n",
"from utils.bodhi_models import engine as bodhi_engine\n",
"from utils.bodhi_models import get_session\n",
"from typing import List, Dict, Any, Optional\n",
"from utils.sl_models import engine as sl_engine"
]
},
{
Expand All @@ -32,7 +35,7 @@
"name": "stderr",
"output_type": "stream",
"text": [
"/tmp/ipykernel_84535/62947810.py:3: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to \"sqlalchemy<2.0\". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)\n",
"/tmp/ipykernel_90876/62947810.py:3: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to \"sqlalchemy<2.0\". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)\n",
" results = db.execute(stmt).fetchall()\n"
]
}
Expand All @@ -51,7 +54,7 @@
{
"data": {
"text/plain": [
"[(20132278, '0101000020E610000000000000000023400000000000F05340', 79.75, 9.5, datetime.datetime(2024, 6, 10, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 10, 0, 0, tzinfo=datetime.timezone.utc), 1.090000033378601, 5.170000076293945, 46.459999084472656, 1.0399999618530273, 5.139999866485596, 44.43000030517578, 8.420000076293945, 63.06999969482422, 0.33000001311302185, 6.800000190734863, datetime.datetime(2024, 6, 10, 16, 22, 15, 153926, tzinfo=datetime.timezone.utc))]"
"[(4182236, '0101000020E610000000000000000024C00000000000001EC0', -7.5, -10.0, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(seconds=75600), datetime.datetime(2024, 6, 12, 21, 0, tzinfo=datetime.timezone.utc), 1.7599999904632568, 7.329999923706055, 133.85000610351562, 1.6200000047683716, 7.119999885559082, 126.06999969482422, 7.349999904632568, 122.31999969482422, 0.6499999761581421, 11.680000305175781, datetime.datetime(2024, 6, 12, 13, 27, 53, 61400, tzinfo=datetime.timezone.utc))]"
]
},
"execution_count": 3,
Expand All @@ -71,7 +74,7 @@
{
"data": {
"text/plain": [
"[WaveForecastModel(id=20132278, location='0101000020E610000000000000000023400000000000F05340', latitude=79.75, longitude=9.5, time=datetime.datetime(2024, 6, 10, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 10, 0, 0, tzinfo=datetime.timezone.utc), swh=1.090000033378601, perpw=5.170000076293945, dirpw=46.459999084472656, shww=1.0399999618530273, mpww=5.139999866485596, wvdir=44.43000030517578, ws=8.420000076293945, wdir=63.06999969482422, swell=0.33000001311302185, swper=6.800000190734863, entry_updated=datetime.datetime(2024, 6, 10, 16, 22, 15, 153926, tzinfo=datetime.timezone.utc))]"
"[BohdiWavesModel(id=4182236, location='0101000020E610000000000000000024C00000000000001EC0', latitude=-7.5, longitude=-10.0, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(seconds=75600), valid_time=datetime.datetime(2024, 6, 12, 21, 0, tzinfo=datetime.timezone.utc), swh=1.7599999904632568, perpw=7.329999923706055, dirpw=133.85000610351562, shww=1.6200000047683716, mpww=7.119999885559082, wvdir=126.06999969482422, ws=7.349999904632568, wdir=122.31999969482422, swell=0.6499999761581421, swper=11.680000305175781, entry_updated=datetime.datetime(2024, 6, 12, 13, 27, 53, 61400, tzinfo=datetime.timezone.utc))]"
]
},
"execution_count": 4,
Expand All @@ -80,7 +83,14 @@
}
],
"source": [
"[WaveForecastModel.model_validate(entry._asdict()) for entry in results]"
"[BohdiWavesModel.model_validate(entry._asdict()) for entry in results]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## SL spots by id, latitude, and longitude"
]
},
{
Expand Down Expand Up @@ -117,6 +127,13 @@
"spatial_idxs = [SpotSpatialIdx.model_validate(entry) for entry in results]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get the associated Offshore Location for each spot"
]
},
{
"cell_type": "code",
"execution_count": 8,
Expand Down Expand Up @@ -160,6 +177,20 @@
"data_dicts = [SlOffshoreIdx.model_dump(entry) for entry in data]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Transform to dataframe"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Filter to include only the spots whose offshore location matches bodhi-cast's offshore location"
]
},
{
"cell_type": "code",
"execution_count": 12,
Expand Down Expand Up @@ -312,7 +343,7 @@
{
"data": {
"text/plain": [
"582"
"579"
]
},
"execution_count": 18,
Expand Down Expand Up @@ -342,11 +373,37 @@
"lat_lon_str = ', '.join(map(str, lat_lon_list))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Enable PostGIS"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"with get_session(sl_engine) as db:\n",
" stmt = text(\"\"\"CREATE EXTENSION IF NOT EXISTS postgis\"\"\")\n",
" results = db.execute(stmt)\n",
" db.commit()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create indexes and reindex"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"with get_session(bodhi_engine) as db:\n",
" stmt = text(\"\"\"CREATE INDEX if not exists idx_wave_forecast_lat_lon ON wave_forecast (latitude, longitude)\"\"\")\n",
Expand All @@ -356,7 +413,7 @@
},
{
"cell_type": "code",
"execution_count": 22,
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -366,39 +423,215 @@
" db.commit()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Getting matching bodhi wave data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get the wave forecasts from bodhi for the current day whose (latitude, longitude) matches one of the filtered sl spots (the query below is capped at 5 rows)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"with get_session(bodhi_engine) as db:\n",
" stmt = text(f\"\"\"select from wave_forecast where time = CURRENT_DATE AND (latitude, longitude) in ({lat_lon_str}) limit 5\"\"\")\n",
" stmt = text(f\"\"\"select * from wave_forecast where time = CURRENT_DATE AND (latitude, longitude) in ({lat_lon_str}) limit 5\"\"\")\n",
" results = db.execute(stmt).fetchall()"
]
},
{
"cell_type": "code",
"execution_count": 24,
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"data = [BohdiWavesModel.model_validate(entry) for entry in results]"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(32601315, '0101000020E610000000000000008061C00000000000C04D40', 59.5, -140.0, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), 1.9600000381469727, 10.489999771118164, 218.67999267578125, None, None, None, 4.110000133514404, 269.8800048828125, 1.8700000047683716, 10.489999771118164, datetime.datetime(2024, 6, 12, 10, 56, 37, 64720, tzinfo=datetime.timezone.utc)),\n",
" (32605384, '0101000020E610000000000000000063C00000000000E04C40', 57.75, -152.0, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), 1.3300000429153442, 8.25, 162.5, 0.009999999776482582, 0.20000000298023224, 2.1500000953674316, 3.809999942779541, 84.05000305175781, 1.2000000476837158, 8.210000038146973, datetime.datetime(2024, 6, 12, 10, 56, 37, 64720, tzinfo=datetime.timezone.utc)),\n",
" (32605984, '0101000020E610000000000000000863C00000000000C04C40', 57.5, -152.25, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), 1.2699999809265137, 8.149999618530273, 159.33999633789062, 0.029999999329447746, 0.4099999964237213, 5.099999904632568, 4.190000057220459, 82.70999908447266, 1.2000000476837158, 8.15999984741211, datetime.datetime(2024, 6, 12, 10, 56, 37, 64720, tzinfo=datetime.timezone.utc)),\n",
" (32605986, '0101000020E61000000000000000F862C00000000000C04C40', 57.5, -151.75, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), 1.5499999523162842, 8.260000228881836, 173.07000732421875, None, None, None, 4.050000190734863, 86.37000274658203, 1.440000057220459, 8.260000228881836, datetime.datetime(2024, 6, 12, 10, 56, 37, 64720, tzinfo=datetime.timezone.utc)),\n",
" (32606642, '0101000020E610000000000000000061C00000000000A04C40', 57.25, -136.0, datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.timedelta(0), datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), 1.9600000381469727, 10.800000190734863, 237.75, 0.07999999821186066, 0.6100000143051147, 47.45000076293945, 6.400000095367432, 304.1400146484375, 1.909999966621399, 10.800000190734863, datetime.datetime(2024, 6, 12, 10, 56, 37, 64720, tzinfo=datetime.timezone.utc))]"
"[BohdiWavesModel(id=30254, location='0101000020E610000000000000008061C00000000000C04D40', latitude=59.5, longitude=-140.0, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), swh=1.9600000381469727, perpw=10.489999771118164, dirpw=218.67999267578125, shww=None, mpww=None, wvdir=None, ws=4.110000133514404, wdir=269.8800048828125, swell=1.8700000047683716, swper=10.489999771118164, entry_updated=datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc)),\n",
" BohdiWavesModel(id=34323, location='0101000020E610000000000000000063C00000000000E04C40', latitude=57.75, longitude=-152.0, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), swh=1.3300000429153442, perpw=8.25, dirpw=162.5, shww=0.009999999776482582, mpww=0.20000000298023224, wvdir=2.1500000953674316, ws=3.809999942779541, wdir=84.05000305175781, swell=1.2000000476837158, swper=8.210000038146973, entry_updated=datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc)),\n",
" BohdiWavesModel(id=34923, location='0101000020E610000000000000000863C00000000000C04C40', latitude=57.5, longitude=-152.25, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), swh=1.2699999809265137, perpw=8.149999618530273, dirpw=159.33999633789062, shww=0.029999999329447746, mpww=0.4099999964237213, wvdir=5.099999904632568, ws=4.190000057220459, wdir=82.70999908447266, swell=1.2000000476837158, swper=8.15999984741211, entry_updated=datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc)),\n",
" BohdiWavesModel(id=34925, location='0101000020E61000000000000000F862C00000000000C04C40', latitude=57.5, longitude=-151.75, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), swh=1.5499999523162842, perpw=8.260000228881836, dirpw=173.07000732421875, shww=None, mpww=None, wvdir=None, ws=4.050000190734863, wdir=86.37000274658203, swell=1.440000057220459, swper=8.260000228881836, entry_updated=datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc)),\n",
" BohdiWavesModel(id=35581, location='0101000020E610000000000000000061C00000000000A04C40', latitude=57.25, longitude=-136.0, time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), step=datetime.timedelta(0), valid_time=datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc), swh=1.9600000381469727, perpw=10.800000190734863, dirpw=237.75, shww=0.07999999821186066, mpww=0.6100000143051147, wvdir=47.45000076293945, ws=6.400000095367432, wdir=304.1400146484375, swell=1.909999966621399, swper=10.800000190734863, entry_updated=datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc))]"
]
},
"execution_count": 24,
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"results"
"data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a new table in the same Postgres db as the other sl data"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"from utils.bodhi_models import create_tables as create_bodhi_tables\n"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"create_bodhi_tables(sl_engine)"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"data_dicts = [entry.model_dump() for entry in data]"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"for d in data_dicts:\n",
" d.pop(\"location\", None)"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"dict_keys(['id', 'latitude', 'longitude', 'time', 'step', 'valid_time', 'swh', 'perpw', 'dirpw', 'shww', 'mpww', 'wvdir', 'ws', 'wdir', 'swell', 'swper', 'entry_updated'])"
]
},
"execution_count": 41,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data_dicts[0].keys()"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"with get_session(sl_engine) as db:\n",
" stmt = insert(BodhiWaves).values(data_dicts)\n",
" db.execute(stmt)\n",
" db.commit()"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"def fetch_wave_data(bs: int = 10):\n",
" with get_session(bodhi_engine) as db:\n",
" stmt = text(\n",
" f\"\"\"select * from wave_forecast where time = CURRENT_DATE AND (latitude, longitude) in ({lat_lon_str}) limit {bs}\"\"\"\n",
" )\n",
" results = db.execute(stmt).fetchall()\n",
" data = [BohdiWavesModel.model_validate(entry) for entry in results]\n",
" data_dict = [entry.model_dump() for entry in data]\n",
" for d in data_dict:\n",
" d.pop(\"location\", None)\n",
" return data_dict"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [
{
"ename": "SyntaxError",
"evalue": "non-default argument follows default argument (2102384898.py, line 1)",
"output_type": "error",
"traceback": [
"\u001b[0;36m Cell \u001b[0;32mIn[58], line 1\u001b[0;36m\u001b[0m\n\u001b[0;31m def wave_data_to_db(bs: int = 10, data: List[Dict]):\u001b[0m\n\u001b[0m ^\u001b[0m\n\u001b[0;31mSyntaxError\u001b[0m\u001b[0;31m:\u001b[0m non-default argument follows default argument\n"
]
}
],
"source": [
"def wave_data_to_db(data: List[Dict], bs: int = 10):\n",
" with get_session(sl_engine) as db:\n",
" stmt = insert(BodhiWaves).values(data)\n",
" db.execute(stmt)\n",
" db.commit()\n"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[{'id': 30254,\n",
" 'latitude': 59.5,\n",
" 'longitude': -140.0,\n",
" 'time': datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc),\n",
" 'step': datetime.timedelta(0),\n",
" 'valid_time': datetime.datetime(2024, 6, 12, 0, 0, tzinfo=datetime.timezone.utc),\n",
" 'swh': 1.9600000381469727,\n",
" 'perpw': 10.489999771118164,\n",
" 'dirpw': 218.67999267578125,\n",
" 'shww': None,\n",
" 'mpww': None,\n",
" 'wvdir': None,\n",
" 'ws': 4.110000133514404,\n",
" 'wdir': 269.8800048828125,\n",
" 'swell': 1.8700000047683716,\n",
" 'swper': 10.489999771118164,\n",
" 'entry_updated': datetime.datetime(2024, 6, 12, 13, 19, 52, 744570, tzinfo=datetime.timezone.utc)}]"
]
},
"execution_count": 56,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fetch_wave_data(1)"
]
},
{
Expand Down
Loading

0 comments on commit def9fe3

Please sign in to comment.