BFD-3854: IDR pipeline POC #2548

Open · wants to merge 8 commits into base: master
3 changes: 3 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/.gitignore
@@ -0,0 +1,3 @@
.venv
.mypy_cache
__pycache__
27 changes: 27 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/README.md
@@ -0,0 +1,27 @@
# Setup

Install packages

```sh
uv sync
```

# Development

Initialize the database with test data

```sh
./run-db.sh
```

Run the app

```sh
uv run ./pipeline.py
```

Run tests

```sh
uv run pytest
```
12 changes: 12 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/bfd.sql
@@ -0,0 +1,12 @@
DROP TABLE IF EXISTS idr.beneficiary;
DROP SCHEMA IF EXISTS idr;

CREATE SCHEMA idr
CREATE TABLE beneficiary(
bene_sk BIGINT NOT NULL PRIMARY KEY,
bene_id UUID NOT NULL DEFAULT gen_random_uuid(),
bene_mbi_id VARCHAR(11),
bene_1st_name VARCHAR(30),
bene_last_name VARCHAR(40),
created_timestamp TIMESTAMPTZ NOT NULL,
updated_timestamp TIMESTAMPTZ NOT NULL);
58 changes: 58 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/extractor.py
@@ -0,0 +1,58 @@
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Iterator, Mapping, TypeVar
import psycopg
from psycopg.rows import class_row
from pydantic import BaseModel
import snowflake.connector


T = TypeVar("T", bound=BaseModel)


class Extractor(ABC):
@abstractmethod
def extract(
self, cls: type[T], sql: str, params: dict[str, Any]
) -> Iterator[list[T]]:
pass


class PostgresExtractor(Extractor):
def __init__(self, connection_string: str, batch_size: int):
super().__init__()
self.conn = psycopg.connect(connection_string)
self.batch_size = batch_size

def extract(
self, cls: type[T], sql: str, params: Mapping[str, Any]
) -> Iterator[list[T]]:
with self.conn.cursor(row_factory=class_row(cls)) as cur:
cur.execute(sql, params)
batch: list[T] = cur.fetchmany(self.batch_size)
while len(batch) > 0:
yield batch
batch = cur.fetchmany(self.batch_size)


class SnowflakeExtractor(Extractor):
def __init__(self, batch_size: int):
super().__init__()
self.conn = snowflake.connector.connect(user="", password="", account="")
Contributor Author:
This is untested for now because we don't have Snowflake access yet. It will be used for prod data; the PostgresExtractor above can be used for test data.

self.batch_size = batch_size

def extract(
self, cls: type[T], sql: str, params: dict[str, Any]
) -> Iterator[list[T]]:
try:
cur = self.conn.cursor()
cur.execute(sql, params)

# fetchmany can return list[dict] or list[tuple] but we'll only use queries that return dicts
batch: list[dict] = cur.fetchmany(self.batch_size) # type: ignore[assignment]
while len(batch) > 0:
data = [cls(**row) for row in batch]
yield data
batch = cur.fetchmany(self.batch_size) # type: ignore[assignment]
finally:
cur.close()
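
For reference, a minimal sketch of how an `Extractor`'s batched iteration might be consumed. The connection string and query mirror values used elsewhere in this PR; the script itself is illustrative and not part of the change:

```python
# Hypothetical usage sketch: stream beneficiaries out of the mock IDR database
# in batches of 100,000 and count the rows, using the local dev credentials
# from run-db.sh.
from extractor import PostgresExtractor
from model import IdrBeneficiary

extractor = PostgresExtractor(
    "host=localhost dbname=idr user=bfd password=InsecureLocalDev", 100_000
)
total = 0
for batch in extractor.extract(
    IdrBeneficiary,
    "SELECT bene_sk, bene_mbi_id, bene_1st_name, bene_last_name "
    "FROM cms_vdm_view_mdcr_prd.v2_mdcr_bene",
    {},
):
    # Each batch is a list of validated IdrBeneficiary models.
    total += len(batch)
print(f"extracted {total} rows")
```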
6 changes: 0 additions & 6 deletions apps/bfd-pipeline/bfd-pipeline-idr/hello.py

This file was deleted.

81 changes: 81 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/loader.py
@@ -0,0 +1,81 @@
from datetime import datetime, timezone
from typing import Iterator, TypeVar
import psycopg
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class PostgresLoader:
def __init__(
self,
connection_string: str,
table: str,
temp_table: str,
primary_key: str,
exclude_cols: list[str],
insert_cols: list[str],
):
self.conn = psycopg.connect(connection_string)
self.table = table
self.temp_table = temp_table
self.primary_key = primary_key
self.exclude_cols = exclude_cols
self.insert_cols = insert_cols

self.insert_cols.sort()

def load(
self,
fetch_results: Iterator[list[T]],
):
cols_str = ", ".join(self.insert_cols)

update_set = ", ".join(
[f"{v}=EXCLUDED.{v}" for v in self.insert_cols if v != self.primary_key]
)
timestamp = datetime.now(timezone.utc)
with self.conn.cursor() as cur:
# load each batch in a separate transaction
for results in fetch_results:
# Load each batch into a temp table
# This is necessary because we want to use COPY to quickly transfer everything into Postgres
# but COPY can't handle constraint conflicts natively.
# Note that temp tables don't use WAL so that helps with throughput as well.
#
# For simplicity's sake, we'll create our temp tables using the existing schema and just drop the columns we need to ignore
cur.execute(
f"CREATE TEMPORARY TABLE {self.temp_table} (LIKE {self.table}) ON COMMIT DROP"
)
# Created/updated columns don't need to be loaded from the source.
exclude_cols = [
"created_timestamp",
"updated_timestamp",
] + self.exclude_cols
for col in exclude_cols:
cur.execute(f"ALTER TABLE {self.temp_table} DROP COLUMN {col}")

# Use COPY to load the batch into Postgres.
# COPY has a number of optimizations that make bulk loading more efficient than a bunch of INSERTs.
# The entire operation is performed in a single statement, resulting in fewer network round-trips,
# less WAL activity, and less context switching.

# Even though we need to move the data from the temp table in the next step, it should still be
# faster than alternatives.
with cur.copy(
f"COPY {self.temp_table} ({cols_str}) FROM STDIN"
) as copy:
for row in results:
model_dump = row.model_dump()
copy.write_row([model_dump[k] for k in self.insert_cols])
# Upsert into the main table
cur.execute(
f"""
INSERT INTO {self.table}({cols_str}, created_timestamp, updated_timestamp)
SELECT {cols_str}, %(timestamp)s, %(timestamp)s FROM {self.temp_table}
ON CONFLICT ({self.primary_key}) DO UPDATE SET {update_set}, updated_timestamp=%(timestamp)s
""",
{"timestamp": timestamp},
)
self.conn.commit()
# TODO: probably should track progress here so the data load can be stopped and resumed
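
To make the temp-table/upsert mechanics concrete, here is a sketch of the statements `load()` would build for the `idr.beneficiary` configuration used in `pipeline.py`. The column and table names come from this PR; the generated SQL is reproduced as comments for illustration only:

```python
# Illustrative only: what PostgresLoader.load() generates for idr.beneficiary.
insert_cols = sorted(["bene_sk", "bene_mbi_id", "bene_1st_name", "bene_last_name"])
primary_key = "bene_sk"
cols_str = ", ".join(insert_cols)
update_set = ", ".join(f"{c}=EXCLUDED.{c}" for c in insert_cols if c != primary_key)

# Per batch, roughly:
# 1. CREATE TEMPORARY TABLE beneficiary_temp (LIKE idr.beneficiary) ON COMMIT DROP
# 2. ALTER TABLE beneficiary_temp DROP COLUMN created_timestamp
#    ALTER TABLE beneficiary_temp DROP COLUMN updated_timestamp
#    ALTER TABLE beneficiary_temp DROP COLUMN bene_id
# 3. COPY beneficiary_temp (bene_1st_name, bene_last_name, bene_mbi_id, bene_sk) FROM STDIN
# 4. INSERT INTO idr.beneficiary(bene_1st_name, bene_last_name, bene_mbi_id, bene_sk,
#        created_timestamp, updated_timestamp)
#    SELECT bene_1st_name, bene_last_name, bene_mbi_id, bene_sk, %(timestamp)s, %(timestamp)s
#    FROM beneficiary_temp
#    ON CONFLICT (bene_sk) DO UPDATE SET bene_1st_name=EXCLUDED.bene_1st_name,
#        bene_last_name=EXCLUDED.bene_last_name, bene_mbi_id=EXCLUDED.bene_mbi_id,
#        updated_timestamp=%(timestamp)s
```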
34 changes: 34 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/mock-idr.sql
@@ -0,0 +1,34 @@
DROP TABLE IF EXISTS cms_vdm_view_mdcr_prd.v2_mdcr_bene;
DROP SCHEMA IF EXISTS cms_vdm_view_mdcr_prd;

CREATE SCHEMA cms_vdm_view_mdcr_prd
CREATE TABLE v2_mdcr_bene (
bene_sk BIGINT,
bene_xref_efctv_sk BIGINT,
bene_mbi_id VARCHAR(11),
bene_1st_name VARCHAR(30),
bene_last_name VARCHAR(40),
idr_trans_efctv_ts TIMESTAMPTZ,
idr_trans_obslt_ts TIMESTAMPTZ
);

INSERT INTO cms_vdm_view_mdcr_prd.v2_mdcr_bene(
bene_sk,
bene_xref_efctv_sk,
bene_mbi_id,
bene_1st_name,
bene_last_name,
idr_trans_efctv_ts,
idr_trans_obslt_ts)
VALUES(1, 1, '1S000000000', 'Chuck', 'Norris', NOW(), '9999-12-31');

INSERT INTO cms_vdm_view_mdcr_prd.v2_mdcr_bene(
bene_sk,
bene_xref_efctv_sk,
bene_mbi_id,
bene_1st_name,
bene_last_name,
idr_trans_efctv_ts,
idr_trans_obslt_ts)
VALUES(2, 2, '1S000000001', 'Bob', 'Saget', NOW(), '9999-12-31');

8 changes: 8 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/model.py
@@ -0,0 +1,8 @@
from pydantic import BaseModel


class IdrBeneficiary(BaseModel):
Contributor Author:
pydantic's model classes make it easy to work with typed data coming out of the database. They do introduce some overhead due to runtime validation, though, so we may want to tweak this in the future if the performance hit turns out to be large.

bene_sk: int
bene_mbi_id: str
bene_1st_name: str
bene_last_name: str
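
As a rough illustration of that trade-off, the normal constructor runs pydantic's runtime validation, while `model_construct()` (a standard pydantic v2 escape hatch, not used in this PR) skips it. Whether skipping validation is worthwhile here would need to be measured:

```python
from model import IdrBeneficiary

# Example row shaped like the mock IDR data.
row = {
    "bene_sk": 1,
    "bene_mbi_id": "1S000000000",
    "bene_1st_name": "Chuck",
    "bene_last_name": "Norris",
}

# Normal path: field types are validated/coerced at runtime.
bene = IdrBeneficiary(**row)

# Possible optimization if validation overhead proves significant:
# model_construct() builds the model without validation, so it should only be
# used on rows already trusted to match the schema.
bene_fast = IdrBeneficiary.model_construct(**row)
```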
58 changes: 58 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/pipeline.py
@@ -0,0 +1,58 @@
import logging
from loader import PostgresLoader
from model import IdrBeneficiary
from extractor import Extractor, PostgresExtractor

logger = logging.getLogger(__name__)

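# Only pull rows that have not been obsoleted (idr_trans_obslt_ts still holds the
# 9999-12-31 sentinel) and whose effective cross-reference key points at the row
# itself (bene_xref_efctv_sk = bene_sk).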
fetch_query = """
SELECT
bene.bene_sk,
bene.bene_mbi_id,
bene.bene_1st_name,
bene.bene_last_name
FROM
cms_vdm_view_mdcr_prd.v2_mdcr_bene bene
WHERE
bene.idr_trans_obslt_ts > '9999-12-30'
AND bene.bene_xref_efctv_sk = bene.bene_sk
"""


def init_logger():
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter("[%(levelname)s] %(asctime)s %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


def main():
init_logger()

run_pipeline(
PostgresExtractor(
"host=localhost dbname=idr user=bfd password=InsecureLocalDev", 100_000
),
"host=localhost dbname=fhirdb user=bfd password=InsecureLocalDev",
)


def run_pipeline(extractor: Extractor, connection_string: str):
logger.info("fetching IDR data")
iter = extractor.extract(IdrBeneficiary, fetch_query, {})

logger.info("loading data")
PostgresLoader(
connection_string=connection_string,
table="idr.beneficiary",
temp_table="beneficiary_temp",
primary_key="bene_sk",
exclude_cols=["bene_id"],
insert_cols=["bene_sk", "bene_mbi_id", "bene_1st_name", "bene_last_name"],
).load(iter)
logger.info("done")


if __name__ == "__main__":
main()
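
A hedged sketch of how the end-to-end path could be exercised with the dev dependencies declared in pyproject.toml (pytest and testcontainers). The `PostgresContainer` wiring and the `apply_sql()` helper are assumptions for illustration, not code from this PR:

```python
# test_pipeline.py -- illustrative sketch only; the container attributes and the
# apply_sql() helper are assumptions, not part of this change.
from pathlib import Path

import psycopg
from testcontainers.postgres import PostgresContainer

from extractor import PostgresExtractor
from pipeline import run_pipeline


def apply_sql(conn_str: str, path: str) -> None:
    # Hypothetical helper: run a SQL file against the given database.
    with psycopg.connect(conn_str, autocommit=True) as conn:
        conn.execute(Path(path).read_text())


def test_run_pipeline():
    with PostgresContainer("postgres:16.4") as pg:
        conn_str = (
            f"host={pg.get_container_host_ip()} port={pg.get_exposed_port(5432)} "
            f"dbname={pg.dbname} user={pg.username} password={pg.password}"
        )
        # Load both the mock IDR schema and the BFD schema into the test database.
        apply_sql(conn_str, "mock-idr.sql")
        apply_sql(conn_str, "bfd.sql")

        run_pipeline(PostgresExtractor(conn_str, 100_000), conn_str)

        with psycopg.connect(conn_str) as conn:
            count = conn.execute("SELECT COUNT(*) FROM idr.beneficiary").fetchone()[0]
        assert count == 2
```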
9 changes: 9 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/pyproject.toml
@@ -5,5 +5,14 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"psycopg>=3.2.4",
"pydantic>=2.10.6",
"snowflake-connector-python>=3.13.2",
]

[dependency-groups]
dev = ["mypy>=1.15.0", "pytest>=8.3.4", "testcontainers>=4.9.1"]

[[tool.mypy.overrides]]
module = ["testcontainers.*"]
follow_untyped_imports = true
45 changes: 45 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-idr/run-db.sh
@@ -0,0 +1,45 @@
#!/bin/bash

set -e

image=postgres:16.4
max_connections=500
docker pull $image

docker rm -f bfd-idr-db
docker volume rm -f bfd-idr-db

docker run \
-d \
--name 'bfd-idr-db' \
-e 'POSTGRES_USER=bfd' \
-e 'POSTGRES_PASSWORD=InsecureLocalDev' \
-p '5432:5432' \
-v 'bfd-idr-db:/var/lib/postgresql/data' \
$image \
postgres -N $max_connections

echo
echo Waiting for port 5432 to become available.
sleep 2
docker exec bfd-idr-db timeout 15 bash -c 'until echo > /dev/tcp/localhost/5432; do sleep 1; done'

echo
echo Creating database
docker exec bfd-idr-db createdb --host localhost --username bfd --owner bfd idr
docker exec bfd-idr-db createdb --host localhost --username bfd --owner bfd fhirdb

echo
echo Database created successfully.

docker cp ./mock-idr.sql bfd-idr-db:/docker-entrypoint-initdb.d/mock-idr.sql
docker cp ./bfd.sql bfd-idr-db:/docker-entrypoint-initdb.d/bfd.sql

echo
echo Creating schema.

docker exec -u postgres bfd-idr-db psql idr bfd -f docker-entrypoint-initdb.d/mock-idr.sql
docker exec -u postgres bfd-idr-db psql fhirdb bfd -f docker-entrypoint-initdb.d/bfd.sql

echo
echo Schema created successfully.