BFD-3854: IDR pipeline POC #2548
base: master
Changes from all commits
29a47e2
0c86b37
a36be64
1a05867
6cb04b5
72b7b6e
5ec4fda
f39aa48
@@ -0,0 +1,3 @@
```
.venv
.mypy_cache
__pycache__
```
@@ -0,0 +1,27 @@
# Setup

Install packages

```sh
uv sync
```

# Development

Initialize the database with test data

```sh
./run-db.sh
```

Run the app

```sh
uv run ./pipeline.py
```

Run tests

```sh
uv run pytest
```
@@ -0,0 +1,12 @@
```sql
DROP TABLE IF EXISTS idr.beneficiary;
DROP SCHEMA IF EXISTS idr;

CREATE SCHEMA idr
    CREATE TABLE beneficiary(
        bene_sk BIGINT NOT NULL PRIMARY KEY,
        bene_id UUID NOT NULL DEFAULT gen_random_uuid(),
        bene_mbi_id VARCHAR(11),
        bene_1st_name VARCHAR(30),
        bene_last_name VARCHAR(40),
        created_timestamp TIMESTAMPTZ NOT NULL,
        updated_timestamp TIMESTAMPTZ NOT NULL);
```
@@ -0,0 +1,58 @@
```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Iterator, Mapping, TypeVar
import psycopg
from psycopg.rows import class_row
from pydantic import BaseModel
import snowflake.connector


T = TypeVar("T", bound=BaseModel)


class Extractor(ABC):
    @abstractmethod
    def extract(
        self, cls: type[T], sql: str, params: dict[str, Any]
    ) -> Iterator[list[T]]:
        pass


class PostgresExtractor(Extractor):
    def __init__(self, connection_string: str, batch_size: int):
        super().__init__()
        self.conn = psycopg.connect(connection_string)
        self.batch_size = batch_size

    def extract(
        self, cls: type[T], sql: str, params: Mapping[str, Any]
    ) -> Iterator[list[T]]:
        with self.conn.cursor(row_factory=class_row(cls)) as cur:
            cur.execute(sql, params)
            batch: list[T] = cur.fetchmany(self.batch_size)
            while len(batch) > 0:
                yield batch
                batch = cur.fetchmany(self.batch_size)


class SnowflakeExtractor(Extractor):
    def __init__(self, batch_size: int):
        super().__init__()
        self.conn = snowflake.connector.connect(user="", password="", account="")
        self.batch_size = batch_size

    def extract(
        self, cls: type[T], sql: str, params: dict[str, Any]
    ) -> Iterator[list[T]]:
        try:
            cur = self.conn.cursor()
            cur.execute(sql, params)

            # fetchmany can return list[dict] or list[tuple], but we'll only use queries that return dicts
            batch: list[dict] = cur.fetchmany(self.batch_size)  # type: ignore[assignment]
            while len(batch) > 0:
                data = [cls(**row) for row in batch]
                yield data
                batch = cur.fetchmany(self.batch_size)  # type: ignore[assignment]
        finally:
            cur.close()
```
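For illustration, here is a minimal usage sketch of PostgresExtractor against the mock IDR database created by run-db.sh. The batch size and the ad-hoc query are assumptions for the example, not part of the PR.

```python
# Sketch only: assumes the local mock database from run-db.sh is running; batch size is arbitrary.
from extractor import PostgresExtractor
from model import IdrBeneficiary

extractor = PostgresExtractor(
    "host=localhost dbname=idr user=bfd password=InsecureLocalDev", batch_size=1000
)

# extract() lazily streams lists of validated IdrBeneficiary models, one batch at a time.
query = """
SELECT bene_sk, bene_mbi_id, bene_1st_name, bene_last_name
FROM cms_vdm_view_mdcr_prd.v2_mdcr_bene
"""
for batch in extractor.extract(IdrBeneficiary, query, {}):
    print(f"fetched {len(batch)} rows, first MBI: {batch[0].bene_mbi_id}")
```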
This file was deleted.
@@ -0,0 +1,81 @@
```python
from datetime import datetime, timezone
from typing import Iterator, TypeVar
import psycopg
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class PostgresLoader:
    def __init__(
        self,
        connection_string: str,
        table: str,
        temp_table: str,
        primary_key: str,
        exclude_cols: list[str],
        insert_cols: list[str],
    ):
        self.conn = psycopg.connect(connection_string)
        self.table = table
        self.temp_table = temp_table
        self.primary_key = primary_key
        self.exclude_cols = exclude_cols
        self.insert_cols = insert_cols

        self.insert_cols.sort()

    def load(
        self,
        fetch_results: Iterator[list[T]],
    ):
        cols_str = ", ".join(self.insert_cols)

        update_set = ", ".join(
            [f"{v}=EXCLUDED.{v}" for v in self.insert_cols if v != self.primary_key]
        )
        timestamp = datetime.now(timezone.utc)
        with self.conn.cursor() as cur:
            # Load each batch in a separate transaction.
            for results in fetch_results:
                # Load each batch into a temp table.
                # This is necessary because we want to use COPY to quickly transfer everything into Postgres,
                # but COPY can't handle constraint conflicts natively.
                # Note that temp tables don't use WAL, so that helps with throughput as well.
                #
                # For simplicity's sake, we'll create our temp tables using the existing schema and just drop
                # the columns we need to ignore.
                cur.execute(
                    f"CREATE TEMPORARY TABLE {self.temp_table} (LIKE {self.table}) ON COMMIT DROP"
                )
                # Created/updated columns don't need to be loaded from the source.
                exclude_cols = [
                    "created_timestamp",
                    "updated_timestamp",
                ] + self.exclude_cols
                for col in exclude_cols:
                    cur.execute(f"ALTER TABLE {self.temp_table} DROP COLUMN {col}")

                # Use COPY to load the batch into Postgres.
                # COPY has a number of optimizations that make bulk loading more efficient than a bunch of INSERTs.
                # The entire operation is performed in a single statement, resulting in fewer network round-trips,
                # less WAL activity, and less context switching.
                #
                # Even though we need to move the data from the temp table in the next step, it should still be
                # faster than alternatives.
                with cur.copy(
                    f"COPY {self.temp_table} ({cols_str}) FROM STDIN"
                ) as copy:
                    for row in results:
                        model_dump = row.model_dump()
                        copy.write_row([model_dump[k] for k in self.insert_cols])
                # Upsert into the main table.
                cur.execute(
                    f"""
                    INSERT INTO {self.table}({cols_str}, created_timestamp, updated_timestamp)
                    SELECT {cols_str}, %(timestamp)s, %(timestamp)s FROM {self.temp_table}
                    ON CONFLICT ({self.primary_key}) DO UPDATE SET {update_set}, updated_timestamp=%(timestamp)s
                    """,
                    {"timestamp": timestamp},
                )
                self.conn.commit()
        # TODO: probably should track progress here so the data load can be stopped and resumed
```
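As a quick illustration of the loader's contract, a single hand-built batch can be upserted as below. This is a sketch under assumptions: the sample values and local connection string are made up for the example and are not part of the PR.

```python
# Sketch only: upsert one small hand-built batch into the local idr.beneficiary table.
from loader import PostgresLoader
from model import IdrBeneficiary

batch = [
    IdrBeneficiary(
        bene_sk=3,
        bene_mbi_id="1S000000002",
        bene_1st_name="Test",
        bene_last_name="Bene",
    )
]

PostgresLoader(
    connection_string="host=localhost dbname=fhirdb user=bfd password=InsecureLocalDev",
    table="idr.beneficiary",
    temp_table="beneficiary_temp",
    primary_key="bene_sk",
    exclude_cols=["bene_id"],
    insert_cols=["bene_sk", "bene_mbi_id", "bene_1st_name", "bene_last_name"],
).load(iter([batch]))
```

Because the loader drops bene_id plus the timestamp columns from the temp table and upserts on bene_sk, rerunning the same load simply updates the existing row and refreshes updated_timestamp.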
@@ -0,0 +1,34 @@
```sql
DROP TABLE IF EXISTS cms_vdm_view_mdcr_prd.v2_mdcr_bene;
DROP SCHEMA IF EXISTS cms_vdm_view_mdcr_prd;

CREATE SCHEMA cms_vdm_view_mdcr_prd
    CREATE TABLE v2_mdcr_bene (
        bene_sk BIGINT,
        bene_xref_efctv_sk BIGINT,
        bene_mbi_id VARCHAR(11),
        bene_1st_name VARCHAR(30),
        bene_last_name VARCHAR(40),
        idr_trans_efctv_ts TIMESTAMPTZ,
        idr_trans_obslt_ts TIMESTAMPTZ
    );

INSERT INTO cms_vdm_view_mdcr_prd.v2_mdcr_bene(
    bene_sk,
    bene_xref_efctv_sk,
    bene_mbi_id,
    bene_1st_name,
    bene_last_name,
    idr_trans_efctv_ts,
    idr_trans_obslt_ts)
VALUES(1, 1, '1S000000000', 'Chuck', 'Norris', NOW(), '9999-12-31');

INSERT INTO cms_vdm_view_mdcr_prd.v2_mdcr_bene(
    bene_sk,
    bene_xref_efctv_sk,
    bene_mbi_id,
    bene_1st_name,
    bene_last_name,
    idr_trans_efctv_ts,
    idr_trans_obslt_ts)
VALUES(2, 2, '1S000000001', 'Bob', 'Saget', NOW(), '9999-12-31');
```
@@ -0,0 +1,8 @@
```python
from pydantic import BaseModel


class IdrBeneficiary(BaseModel):
    bene_sk: int
    bene_mbi_id: str
    bene_1st_name: str
    bene_last_name: str
```

Review comment on this model: pydantic's model classes make it easy to work with typed data coming out of the database. It does introduce some overhead due to runtime type checking, though, so we may want to tweak this in the future if the performance hit is large.
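One possible form of that future tweak, sketched here as an assumption rather than anything this PR does: pydantic v2's model_construct() builds instances without running validation, trading the runtime type checking overhead for speed on trusted rows.

```python
# Hypothetical sketch of the "tweak" mentioned in the review comment above; not part of this PR.
from model import IdrBeneficiary

row = {
    "bene_sk": 1,
    "bene_mbi_id": "1S000000000",
    "bene_1st_name": "Chuck",
    "bene_last_name": "Norris",
}

validated = IdrBeneficiary(**row)  # normal path: fields are validated at construction time
unvalidated = IdrBeneficiary.model_construct(**row)  # skips validation entirely; only safe for trusted data
```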
@@ -0,0 +1,58 @@
```python
import logging
from loader import PostgresLoader
from model import IdrBeneficiary
from extractor import Extractor, PostgresExtractor

logger = logging.getLogger(__name__)

fetch_query = """
SELECT
    bene.bene_sk,
    bene.bene_mbi_id,
    bene.bene_1st_name,
    bene.bene_last_name
FROM
    cms_vdm_view_mdcr_prd.v2_mdcr_bene bene
WHERE
    bene.idr_trans_obslt_ts > '9999-12-30'
    AND bene.bene_xref_efctv_sk = bene.bene_sk
"""


def init_logger():
    logger.setLevel(logging.INFO)
    console_handler = logging.StreamHandler()
    formatter = logging.Formatter("[%(levelname)s] %(asctime)s %(message)s")
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)


def main():
    init_logger()

    run_pipeline(
        PostgresExtractor(
            "host=localhost dbname=idr user=bfd password=InsecureLocalDev", 100_000
        ),
        "host=localhost dbname=fhirdb user=bfd password=InsecureLocalDev",
    )


def run_pipeline(extractor: Extractor, connection_string: str):
    logger.info("fetching IDR data")
    iter = extractor.extract(IdrBeneficiary, fetch_query, {})

    logger.info("loading data")
    PostgresLoader(
        connection_string=connection_string,
        table="idr.beneficiary",
        temp_table="beneficiary_temp",
        primary_key="bene_sk",
        exclude_cols=["bene_id"],
        insert_cols=["bene_sk", "bene_mbi_id", "bene_1st_name", "bene_last_name"],
    ).load(iter)
    logger.info("done")


if __name__ == "__main__":
    main()
```
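Because run_pipeline() only depends on the Extractor interface, switching to the Snowflake path for prod data should be a small change. The snippet below is a hypothetical sketch, not something this PR exercises: the credentials in SnowflakeExtractor are still stubbed out, and the query is assumed to work unchanged against the real IDR view.

```python
# Hypothetical sketch, not part of this PR: use the (currently untested) SnowflakeExtractor for prod data.
from extractor import SnowflakeExtractor

run_pipeline(
    SnowflakeExtractor(100_000),
    "host=localhost dbname=fhirdb user=bfd password=InsecureLocalDev",
)
```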
@@ -0,0 +1,45 @@
```sh
#!/bin/bash

set -e

image=postgres:16.4
max_connections=500
docker pull $image

docker rm -f bfd-idr-db
docker volume rm -f bfd-idr-db

docker run \
    -d \
    --name 'bfd-idr-db' \
    -e 'POSTGRES_USER=bfd' \
    -e 'POSTGRES_PASSWORD=InsecureLocalDev' \
    -p '5432:5432' \
    -v 'bfd-idr-db:/var/lib/postgresql/data' \
    $image \
    postgres -N $max_connections

echo
echo Waiting for port 5432 to become available.
sleep 2
docker exec bfd-idr-db timeout 15 bash -c 'until echo > /dev/tcp/localhost/5432; do sleep 1; done'

echo
echo Creating database
docker exec bfd-idr-db createdb --host localhost --username bfd --owner bfd idr
docker exec bfd-idr-db createdb --host localhost --username bfd --owner bfd fhirdb

echo
echo Database created successfully.

docker cp ./mock-idr.sql bfd-idr-db:/docker-entrypoint-initdb.d/mock-idr.sql
docker cp ./bfd.sql bfd-idr-db:/docker-entrypoint-initdb.d/bfd.sql

echo
echo Creating schema.

docker exec -u postgres bfd-idr-db psql idr bfd -f docker-entrypoint-initdb.d/mock-idr.sql
docker exec -u postgres bfd-idr-db psql fhirdb bfd -f docker-entrypoint-initdb.d/bfd.sql

echo
echo Schema created successfully.
```
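A small smoke test can confirm the script populated the mock IDR schema. This is a sketch under assumptions, not part of the PR, reusing the local connection settings that pipeline.py uses.

```python
# Sketch only (not part of the PR): verify the mock IDR data created by run-db.sh is queryable.
import psycopg

with psycopg.connect("host=localhost dbname=idr user=bfd password=InsecureLocalDev") as conn:
    rows = conn.execute(
        "SELECT bene_sk, bene_mbi_id FROM cms_vdm_view_mdcr_prd.v2_mdcr_bene"
    ).fetchall()
    print(rows)  # expect the two mock beneficiaries inserted by mock-idr.sql
```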
Review comment on SnowflakeExtractor: This is untested for now because we don't have Snowflake access. It will be used for prod data, and the PostgresExtractor above can be used for test data.