diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index a3aefbce..69d6b902 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -147,6 +147,11 @@ updates:
     schedule:
       interval: "daily"
+  - directory: "/framework/dlt"
+    package-ecosystem: "pip"
+    schedule:
+      interval: "daily"
+
   - directory: "/framework/flink/kafka-jdbcsink-java"
     package-ecosystem: "docker-compose"
     schedule:
diff --git a/.github/workflows/framework-dlt.yml b/.github/workflows/framework-dlt.yml
new file mode 100644
index 00000000..32374275
--- /dev/null
+++ b/.github/workflows/framework-dlt.yml
@@ -0,0 +1,73 @@
+name: dlt
+
+on:
+  pull_request:
+    paths:
+    - '.github/workflows/framework-dlt.yml'
+    - 'framework/dlt/**'
+    - 'requirements.txt'
+  push:
+    branches: [ main ]
+    paths:
+    - '.github/workflows/framework-dlt.yml'
+    - 'framework/dlt/**'
+    - 'requirements.txt'
+
+  # Allow the job to be triggered manually.
+  workflow_dispatch:
+
+  # Run the job each night, after the CrateDB nightly has been published.
+  schedule:
+    - cron: '0 3 * * *'
+
+# Cancel in-progress jobs when pushing to the same branch.
+concurrency:
+  cancel-in-progress: true
+  group: ${{ github.workflow }}-${{ github.ref_name }}
+
+jobs:
+  test:
+    name: "
+      Python: ${{ matrix.python-version }}
+      CrateDB: ${{ matrix.cratedb-version }}
+      on ${{ matrix.os }}"
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ 'ubuntu-latest' ]
+        python-version: [ '3.9', '3.13' ]
+        cratedb-version: [ 'nightly' ]
+
+    services:
+      cratedb:
+        image: crate/crate:${{ matrix.cratedb-version }}
+        ports:
+          - 4200:4200
+          - 5432:5432
+        env:
+          CRATE_HEAP_SIZE: 4g
+
+    steps:
+
+      - name: Acquire sources
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+          architecture: x64
+          cache: 'pip'
+          cache-dependency-path: |
+            requirements.txt
+            framework/dlt/requirements.txt
+            framework/dlt/requirements-dev.txt
+
+      - name: Install utilities
+        run: |
+          pip install -r requirements.txt
+
+      - name: Validate framework/dlt
+        run: |
+          ngr test --accept-no-venv framework/dlt
diff --git a/framework/dlt/.dlt/config.toml b/framework/dlt/.dlt/config.toml
new file mode 100644
index 00000000..466650e5
--- /dev/null
+++ b/framework/dlt/.dlt/config.toml
@@ -0,0 +1,12 @@
+# Put your main configuration values here.
+#add_dlt_id = false
+#add_dlt_load_id = false
+
+[runtime]
+
+# The system log level of dlt.
+log_level = "DEBUG"
+
+# Use the `dlthub_telemetry` setting to enable/disable anonymous
+# usage data reporting, see https://dlthub.com/docs/reference/telemetry.
+dlthub_telemetry = false
diff --git a/framework/dlt/.dlt/secrets.toml b/framework/dlt/.dlt/secrets.toml
new file mode 100644
index 00000000..40026ec0
--- /dev/null
+++ b/framework/dlt/.dlt/secrets.toml
@@ -0,0 +1,26 @@
+[destination.cratedb.credentials]
+# CrateDB PostgreSQL interface
+host = "localhost"
+port = 5432
+username = "crate"
+password = ""
+
+[destination.sqlalchemy.credentials]
+# CrateDB HTTP interface
+# https://dlthub.com/docs/dlt-ecosystem/destinations/sqlalchemy
+drivername = "crate"
+host = "localhost"
+port = 4200
+database = ""
+username = "crate"
+password = ""
+
+[sources.sql_database.credentials]
+# CrateDB HTTP interface
+# https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database/setup
+drivername = "crate"
+host = "localhost"
+port = 4200
+database = ""
+username = "crate"
+password = ""
diff --git a/framework/dlt/.gitignore b/framework/dlt/.gitignore
new file mode 100644
index 00000000..1196403b
--- /dev/null
+++ b/framework/dlt/.gitignore
@@ -0,0 +1,11 @@
+# Ignore secrets, virtual environments, and typical Python compilation artifacts.
+# Remark: secrets.toml is intentionally committed here, to provide out-of-the-box settings for localhost.
+# secrets.toml
+# Ignore basic Python artifacts.
+.env
+**/__pycache__/
+**/*.py[cod]
+**/*$py.class
+# Ignore DuckDB artifacts.
+*.duckdb
+*.wal
diff --git a/framework/dlt/README.md b/framework/dlt/README.md
new file mode 100644
index 00000000..bc072ebb
--- /dev/null
+++ b/framework/dlt/README.md
@@ -0,0 +1,70 @@
+# dlt with CrateDB example
+
+## About
+Demonstrate connectivity from dlt to CrateDB.
+
+## Configuration
+Configure the database connection address and credentials in `.dlt/secrets.toml`.
+Please make sure to use valid credentials matching your environment.
+
+For [CrateDB] on localhost, a default configuration snippet looks like this.
+```toml
+[destination.cratedb.credentials]
+host = "localhost"  # CrateDB server host.
+port = 5432         # CrateDB PostgreSQL TCP protocol port, default is 5432.
+username = "crate"  # CrateDB username, default is usually "crate".
+password = ""       # CrateDB password, if any.
+```
+
+For [CrateDB Cloud], a configuration snippet looks like this.
+```toml
+[destination.cratedb.credentials]
+host = ".eks1.eu-west-1.aws.cratedb.net"  # CrateDB server host.
+port = 5432                               # CrateDB PostgreSQL TCP protocol port, default is 5432.
+username = "admin"                        # CrateDB username, default is usually "admin".
+password = ""                             # CrateDB password, if any.
+```
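+
+The committed `.dlt/secrets.toml` also carries a `[destination.sqlalchemy.credentials]`
+section for CrateDB's HTTP interface on port 4200. As a minimal sketch, assuming the
+`sqlalchemy-cratedb` dialect is installed alongside dlt, a pipeline can address that
+destination by name, with credentials resolved from `.dlt/secrets.toml`. The pipeline
+and table names below are hypothetical.
+```python
+import dlt
+
+# Credentials are resolved from `[destination.sqlalchemy.credentials]`
+# in `.dlt/secrets.toml`. The pipeline name is arbitrary.
+pipeline = dlt.pipeline(
+    pipeline_name="via_http",
+    destination="sqlalchemy",
+    dataset_name="doc",
+)
+
+# Load a toy record into a hypothetical demo table.
+load_info = pipeline.run(
+    data=[{"id": 1, "name": "hello"}],
+    table_name="sqlalchemy_demo",
+)
+print(load_info)
+```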
+
+## Usage
+
+Install dependencies.
+```shell
+pip install -r requirements.txt
+```
+
+Invoke two example pipelines.
+```shell
+python basic.py
+python pokemon.py
+```
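+
+To verify the loaded data, assuming the default localhost setup and the table
+names created by the example pipelines, submit a query to CrateDB's HTTP
+endpoint. The statement below is just one example probe.
+```shell
+curl -sS -H 'Content-Type: application/json' \
+  -X POST 'http://localhost:4200/_sql' \
+  -d '{"stmt": "SELECT COUNT(*) AS total FROM doc.pokemon"}'
+```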
+
+## Appendix
+
+### CrateDB on localhost
+Start a CrateDB instance on your machine.
+```shell
+docker run -it --rm \
+  --publish=4200:4200 --publish=5432:5432 \
+  --env=CRATE_HEAP_SIZE=2g \
+  crate:latest -Cdiscovery.type=single-node
+```
+
+### Sandbox
+Acquire the `cratedb-examples` repository, and set up a development sandbox.
+```shell
+git clone https://github.com/crate/cratedb-examples
+cd cratedb-examples
+python3 -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+
+### Software tests
+Invoke the integration test cases.
+```shell
+ngr test framework/dlt
+```
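+
+Alternatively, assuming the packages from `requirements.txt` and
+`requirements-dev.txt` of this folder are installed, the test suite
+can be invoked with pytest directly, from within the `framework/dlt`
+directory.
+```shell
+cd framework/dlt
+pytest
+```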
+
+
+[CrateDB]: https://github.com/crate/crate
+[CrateDB Cloud]: https://console.cratedb.cloud/
diff --git a/framework/dlt/basic.py b/framework/dlt/basic.py
new file mode 100644
index 00000000..5beb741f
--- /dev/null
+++ b/framework/dlt/basic.py
@@ -0,0 +1,148 @@
+"""The intro pipeline template, containing the example from the documentation's intro page."""
+import os
+from typing import Optional
+
+import pandas as pd
+import sqlalchemy as sa
+
+import dlt
+from dlt.sources.helpers import requests
+
+
+CRATEDB_ADDRESS = os.getenv("CRATEDB_ADDRESS", "postgresql://crate:@localhost:5432/")
+
+
+def load_api_data() -> None:
+    """Load data from the chess API. For more complex examples, use the `rest_api` source."""
+
+    # Create a dlt pipeline that will load
+    # chess player data to the CrateDB destination.
+    pipeline = dlt.pipeline(
+        pipeline_name="from_api",
+        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
+        dataset_name="doc",
+    )
+
+    # Grab some player data from the Chess.com API.
+    data = []
+    for player in ["magnuscarlsen", "rpragchess"]:
+        response = requests.get(f"https://api.chess.com/pub/player/{player}", timeout=30)
+        response.raise_for_status()
+        data.append(response.json())
+
+    # Extract, normalize, and load the data.
+    load_info = pipeline.run(
+        data=data,
+        table_name="chess_players",
+    )
+    print(load_info)  # noqa: T201
+
+
+def load_pandas_data() -> None:
+    """Load data from a public CSV file via pandas."""
+
+    owid_disasters_csv = (
+        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
+        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
+        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
+    )
+    df = pd.read_csv(owid_disasters_csv)
+
+    pipeline = dlt.pipeline(
+        pipeline_name="from_csv",
+        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
+        dataset_name="doc",
+    )
+    load_info = pipeline.run(
+        data=df,
+        table_name="natural_disasters",
+    )
+
+    print(load_info)  # noqa: T201
+
+
+def load_sql_data() -> None:
+    """Load data from an SQL database with SQLAlchemy. For more complex examples, use the `sql_database` source."""
+
+    # Use any SQL database supported by SQLAlchemy; below, a public
+    # MySQL instance provides the data.
+    # NOTE: You will need to install pymysql with `pip install pymysql`.
+    # NOTE: Loading data from the public MySQL instance may take several seconds.
+    # NOTE: This relies on the availability of an external public database.
+    engine = sa.create_engine(
+        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
+    )
+
+    with engine.connect() as conn:
+        # Select the genome table, streaming data in batches of 100 elements.
+        query = "SELECT * FROM genome LIMIT 1000"
+        rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
+
+        pipeline = dlt.pipeline(
+            pipeline_name="from_database",
+            destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
+            dataset_name="doc",
+        )
+
+        # Convert the rows into dictionaries on the fly with a generator expression.
+        load_info = pipeline.run(
+            data=(dict(row._mapping) for row in rows),
+            table_name="genome",
+        )
+
+    print(load_info)  # noqa: T201
+
+
+@dlt.resource(write_disposition="replace")
+def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
+    from dlt.sources.helpers.rest_client import paginate
+    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
+    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
+
+    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
+
+    # GitHub allows both authenticated and non-authenticated requests (with low rate limits).
+    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
+    for page in paginate(
+        url,
+        auth=auth,
+        paginator=HeaderLinkPaginator(),
+        params={"state": "open", "per_page": "100"},
+    ):
+        yield page
+
+
+@dlt.source
+def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
+    return github_api_resource(api_secret_key=api_secret_key)
+
+
+def load_github_data() -> None:
+    """Load GitHub issues data using the github_api_source."""
+    pipeline = dlt.pipeline(
+        pipeline_name="github_api_pipeline",
+        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
+        dataset_name="doc",
+    )
+    load_info = pipeline.run(
+        data=github_api_source(),
+        table_name="github_api_data",
+    )
+    print(load_info)  # noqa: T201
+
+
+def main():
+    functions = [
+        load_api_data,
+        load_pandas_data,
+        load_sql_data,
+        load_github_data,
+    ]
+    for func in functions:
+        try:
+            func()
+        except Exception as e:
+            print(f"Error in {func.__name__}: {e}")  # noqa: T201
+
+
+if __name__ == "__main__":
+    main()
diff --git a/framework/dlt/pokemon.py b/framework/dlt/pokemon.py
new file mode 100644
index 00000000..c8375c50
--- /dev/null
+++ b/framework/dlt/pokemon.py
@@ -0,0 +1,49 @@
+"""data load tool (dlt) — the open-source Python library for data loading
+
+How to create a data loading pipeline with dlt and CrateDB in 3 seconds:
+
+0. Configure the `cratedb` destination in `.dlt/secrets.toml`.
+   ```toml
+   [destination.cratedb.credentials]
+   host = "localhost"
+   port = 5432
+   username = "crate"
+   password = ""
+   ```
+
+1. Write a pipeline script.
+>>> import dlt
+>>> from dlt.sources.helpers import requests
+>>> dlt.run(
+...     data=requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"],
+...     destination="cratedb",
+...     dataset_name="doc",
+...     table_name="pokemon")
+
+2. Run your pipeline script.
+   > $ python pokemon.py
+
+3. See and query your data with the autogenerated Streamlit app.
+   > $ dlt pipeline dlt_pokemon show
+
+Or start with the dlt pipeline template, with sample PokeAPI (pokeapi.co) data loaded to BigQuery.
+
+   > $ dlt init pokemon bigquery
+
+For more detailed info, see https://dlthub.com/docs/intro
+"""
+
+
+def main():
+    import dlt
+    from dlt.sources.helpers import requests
+
+    dlt.run(
+        data=requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"],
+        destination="cratedb",
+        dataset_name="doc",
+        table_name="pokemon")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/framework/dlt/pyproject.toml b/framework/dlt/pyproject.toml
new file mode 100644
index 00000000..8cd2294a
--- /dev/null
+++ b/framework/dlt/pyproject.toml
@@ -0,0 +1,12 @@
+[tool.pytest.ini_options]
+minversion = "2.0"
+addopts = """
+  -rfEX -p pytester --strict-markers --verbosity=3
+  --capture=no
+  """
+log_level = "DEBUG"
+log_cli_level = "DEBUG"
+testpaths = ["*.py"]
+xfail_strict = true
+markers = [
+]
diff --git a/framework/dlt/requirements-dev.txt b/framework/dlt/requirements-dev.txt
new file mode 100644
index 00000000..f24557a6
--- /dev/null
+++ b/framework/dlt/requirements-dev.txt
@@ -0,0 +1,3 @@
+cratedb-toolkit>=0.0.33
+pueblo[sfa]
+pytest<9
diff --git a/framework/dlt/requirements.txt b/framework/dlt/requirements.txt
new file mode 100644
index 00000000..f4ef8fb9
--- /dev/null
+++ b/framework/dlt/requirements.txt
@@ -0,0 +1,9 @@
+# Development
+dlt[cratedb,parquet] @ git+https://github.com/crate-workbench/dlt@cratedb
+
+# Production
+# TODO: Switch to the official dlt[cratedb]>=1.12.0 once it is published with CrateDB support.
+# dlt[cratedb,parquet]>=1.12.0
+
+pandas>=2,<2.4
+pymysql>=1,<1.2
diff --git a/framework/dlt/test.py b/framework/dlt/test.py
new file mode 100644
index 00000000..d72e5657
--- /dev/null
+++ b/framework/dlt/test.py
@@ -0,0 +1,89 @@
+import logging
+
+import pytest
+from cratedb_toolkit.util.database import DatabaseAdapter
+from pueblo.sfa.core import run
+
+logger = logging.getLogger()
+
+
+@pytest.fixture(scope="session")
+def db():
+    return DatabaseAdapter("crate://")
+
+
+def test_basic_load_api(db):
+    """
+    Verify a dlt pipeline loading data from an API.
+    """
+
+    # Invoke the database import.
+    run("basic.py:load_api_data", {})
+
+    # Validate the database content.
+    db.refresh_table("doc.chess_players")
+    records = db.run_sql("SELECT * FROM doc.chess_players", records=True)
+    assert len(records) >= 2
+    names = [record["name"] for record in records]
+    assert "Magnus Carlsen" in names
+    assert "Praggnanandhaa Rameshbabu" in names
+
+
+def test_basic_load_pandas(db):
+    """
+    Verify a dlt pipeline loading data from a pandas dataframe populated from CSV data.
+    """
+
+    # Invoke the database import.
+    run("basic.py:load_pandas_data", {})
+
+    # Validate the database content.
+    db.refresh_table("doc.natural_disasters")
+    records = db.run_sql("SELECT * FROM doc.natural_disasters", records=True)
+    assert len(records) >= 837
+    entities = [record["entity"] for record in records]
+    assert "Flood" in entities
+    assert "Landslide" in entities
+    assert "Wildfire" in entities
+
+
+def test_basic_load_sql(db):
+    """
+    Verify a dlt pipeline loading data from an SQL database.
+    """
+
+    # Invoke the database import.
+    run("basic.py:load_sql_data", {})
+
+    # Validate the database content.
+    db.refresh_table("doc.genome")
+    records = db.run_sql("SELECT * FROM doc.genome", records=True)
+    assert len(records) >= 1000
+
+
+def test_basic_load_github(db):
+    """
+    Verify a dlt pipeline loading data from GitHub.
+    """
+
+    # Invoke the database import.
+    run("basic.py:load_github_data", {})
+
+    # Validate the database content.
+    db.refresh_table("doc.github_api_data")
+    records = db.run_sql("SELECT * FROM doc.github_api_data", records=True)
+    assert len(records) >= 277
+
+
+def test_pokemon(db):
+    """
+    Verify the dlt pokemon example.
+    """
+
+    # Invoke the database import.
+    run("pokemon.py:main", {})
+
+    # Validate the database content.
+    db.refresh_table("doc.pokemon")
+    records = db.run_sql("SELECT * FROM doc.pokemon", records=True)
+    assert len(records) >= 20