
dlt: Hello, World! #875


Merged 1 commit on Jun 13, 2025
5 changes: 5 additions & 0 deletions .github/dependabot.yml
@@ -147,6 +147,11 @@ updates:
    schedule:
      interval: "daily"

  - directory: "/framework/dlt"
    package-ecosystem: "pip"
    schedule:
      interval: "daily"

  - directory: "/framework/flink/kafka-jdbcsink-java"
    package-ecosystem: "docker-compose"
    schedule:
73 changes: 73 additions & 0 deletions .github/workflows/framework-dlt.yml
@@ -0,0 +1,73 @@
name: dlt

on:
  pull_request:
    paths:
    - '.github/workflows/framework-dlt.yml'
    - 'framework/dlt/**'
    - 'requirements.txt'
  push:
    branches: [ main ]
    paths:
    - '.github/workflows/framework-dlt.yml'
    - 'framework/dlt/**'
    - 'requirements.txt'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref_name }}

jobs:
  test:
    name: "
      Python: ${{ matrix.python-version }}
      CrateDB: ${{ matrix.cratedb-version }}
      on ${{ matrix.os }}"
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ 'ubuntu-latest' ]
        python-version: [ '3.9', '3.13' ]
        cratedb-version: [ 'nightly' ]

    services:
      cratedb:
        image: crate/crate:${{ matrix.cratedb-version }}
        ports:
          - 4200:4200
          - 5432:5432
        env:
          CRATE_HEAP_SIZE: 4g

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: |
            requirements.txt
            framework/dlt/requirements.txt
            framework/dlt/requirements-dev.txt

      - name: Install utilities
        run: |
          pip install -r requirements.txt

      - name: Validate framework/dlt
        run: |
          ngr test --accept-no-venv framework/dlt
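
The `strategy.matrix` above runs one job per combination of its values. As a rough illustration of how that expands, here is a minimal Python sketch; the `expand_matrix` helper is hypothetical and only mimics the cross-product behaviour, it is not part of GitHub Actions or of this repository.

```python
from itertools import product

# Matrix values copied from the workflow definition above.
matrix = {
    "os": ["ubuntu-latest"],
    "python-version": ["3.9", "3.13"],
    "cratedb-version": ["nightly"],
}


def expand_matrix(matrix):
    """Return one dict per job, i.e. the cross product of all matrix axes."""
    keys = list(matrix)
    return [dict(zip(keys, values)) for values in product(*(matrix[k] for k in keys))]


jobs = expand_matrix(matrix)
for job in jobs:
    print(job)
```

With the values above, this yields two jobs that differ only in the Python version, both running against the CrateDB nightly image.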
12 changes: 12 additions & 0 deletions framework/dlt/.dlt/config.toml
@@ -0,0 +1,12 @@
# Put your main configuration values here.
#add_dlt_id = false
#add_dlt_load_id = false

[runtime]

# The system log level of dlt.
log_level="DEBUG"

# Use the `dlthub_telemetry` setting to enable/disable anonymous
# usage data reporting, see https://dlthub.com/docs/reference/telemetry.
dlthub_telemetry = false
26 changes: 26 additions & 0 deletions framework/dlt/.dlt/secrets.toml
@@ -0,0 +1,26 @@
[destination.cratedb.credentials]
# CrateDB PostgreSQL interface
host = "localhost"
port = 5432
username = "crate"
password = ""

[destination.sqlalchemy.credentials]
# CrateDB HTTP interface
# https://dlthub.com/docs/dlt-ecosystem/destinations/sqlalchemy
drivername = "crate"
host = "localhost"
port = 4200
database = ""
username = "crate"
password = ""

[sources.sql_database.credentials]
# CrateDB HTTP interface
# https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database/setup
drivername = "crate"
host = "localhost"
port = 4200
database = ""
username = "crate"
password = ""
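
dlt can also read these settings from environment variables, where the TOML section path and the key are upper-cased and joined with double underscores. A minimal sketch of that naming rule follows; the `env_var_name` helper is hypothetical and only illustrates the convention, it is not a dlt function.

```python
def env_var_name(section: str, key: str) -> str:
    """Map a TOML section path plus key to the matching environment variable name."""
    parts = section.split(".") + [key]
    return "__".join(part.upper() for part in parts)


print(env_var_name("destination.cratedb.credentials", "host"))
print(env_var_name("sources.sql_database.credentials", "port"))
```

This can be convenient in CI, where credentials are usually injected as environment variables rather than committed to `secrets.toml`.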
11 changes: 11 additions & 0 deletions framework/dlt/.gitignore
@@ -0,0 +1,11 @@
# ignore virtual environments and typical python compilation artifacts
# note: secrets.toml is deliberately not ignored here, in order to provide
# out-of-the-box settings for localhost
# secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
70 changes: 70 additions & 0 deletions framework/dlt/README.md
@@ -0,0 +1,70 @@
# dlt with CrateDB example

## About
Demonstrate connectivity from dlt to CrateDB.

## Configuration
Configure database connection address and credentials in `.dlt/secrets.toml`.
Please make sure to use valid credentials matching your environment.

For [CrateDB] on localhost, a default configuration snippet looks like this.
```toml
[destination.cratedb.credentials]
host = "localhost" # CrateDB server host.
port = 5432 # CrateDB PostgreSQL TCP protocol port, default is 5432.
username = "crate" # CrateDB username, default is usually "crate".
password = "" # CrateDB password, if any.
```

For [CrateDB Cloud], a configuration snippet looks like this.
```toml
[destination.cratedb.credentials]
host = "<CLUSTERNAME>.eks1.eu-west-1.aws.cratedb.net" # CrateDB server host.
port = 5432 # CrateDB PostgreSQL TCP protocol port, default is 5432.
username = "admin" # CrateDB username, default is usually "admin".
password = "<PASSWORD>" # CrateDB password, if any.
```
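
The same settings can also be expressed as a connection URL, like the `CRATEDB_ADDRESS` default used in `basic.py`. A minimal sketch, assuming a hypothetical helper `build_cratedb_address` that is not part of dlt:

```python
from urllib.parse import quote


def build_cratedb_address(
    host: str, port: int = 5432, username: str = "crate", password: str = ""
) -> str:
    """Compose a PostgreSQL-style URL from the credential fields shown above."""
    # Percent-encode user and password so special characters do not break the URL.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"postgresql://{user}:{secret}@{host}:{port}/"


print(build_cratedb_address("localhost"))
```

For the localhost defaults, the result matches the fallback value of `CRATEDB_ADDRESS` in `basic.py`.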

## Usage

Install dependencies.
```shell
pip install -r requirements.txt
```

Invoke two example pipelines.
```shell
python basic.py
python pokemon.py
```

## Appendix

### CrateDB on localhost
Start a CrateDB instance on your machine.
```shell
docker run -it --rm \
--publish=4200:4200 --publish=5432:5432 \
--env=CRATE_HEAP_SIZE=2g \
crate:latest -Cdiscovery.type=single-node
```
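
Before running the pipelines, it can help to confirm that both published ports accept connections. A minimal sketch using only the standard library; the `is_port_open` helper is hypothetical, and a successful TCP connection does not guarantee that CrateDB has finished starting up.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports as published by the `docker run` command above.
for name, port in [("HTTP", 4200), ("PostgreSQL", 5432)]:
    state = "reachable" if is_port_open("localhost", port) else "not reachable"
    print(f"CrateDB {name} port {port}: {state}")
```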

### Sandbox
Acquire the `cratedb-examples` repository, and set up a development sandbox.
```shell
git clone https://github.com/crate/cratedb-examples
cd cratedb-examples
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

### Software tests
Invoke the integration test cases.
```shell
ngr test framework/dlt
```


[CrateDB]: https://github.com/crate/crate
[CrateDB Cloud]: https://console.cratedb.cloud/
148 changes: 148 additions & 0 deletions framework/dlt/basic.py
@@ -0,0 +1,148 @@
"""The Intro Pipeline Template contains the example from the docs intro page"""
import os
from typing import Optional
import pandas as pd
import sqlalchemy as sa

import dlt
from dlt.sources.helpers import requests


CRATEDB_ADDRESS = os.getenv("CRATEDB_ADDRESS", "postgresql://crate:@localhost:5432/")


def load_api_data() -> None:
    """Load data from the chess api, for more complex examples use our rest_api source"""

    # Create a dlt pipeline that will load
    # chess player data to the CrateDB destination
    pipeline = dlt.pipeline(
        pipeline_name="from_api",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )

    # Grab some player data from Chess.com API
    data = []
    for player in ["magnuscarlsen", "rpragchess"]:
        response = requests.get(f"https://api.chess.com/pub/player/{player}", timeout=30)
        response.raise_for_status()
        data.append(response.json())

    # Extract, normalize, and load the data
    load_info = pipeline.run(
        data=data,
        table_name="chess_players",
    )
    print(load_info)  # noqa: T201


def load_pandas_data() -> None:
    """Load data from a public csv via pandas"""

    owid_disasters_csv = (
        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
    )
    df = pd.read_csv(owid_disasters_csv)

    pipeline = dlt.pipeline(
        pipeline_name="from_csv",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(
        data=df,
        table_name="natural_disasters",
    )

    print(load_info)  # noqa: T201


def load_sql_data() -> None:
    """Load data from a sql database with sqlalchemy, for more complex examples use our sql_database source"""

    # Use any SQL database supported by SQLAlchemy, below we use a public
    # MySQL instance to get data.
    # NOTE: you'll need to install pymysql with `pip install pymysql`
    # NOTE: loading data from public mysql instance may take several seconds
    # NOTE: this relies on external public database availability
    engine = sa.create_engine(
        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    )

    with engine.connect() as conn:
        # Select genome table, stream data in batches of 100 elements
        query = "SELECT * FROM genome LIMIT 1000"
        rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

        pipeline = dlt.pipeline(
            pipeline_name="from_database",
            destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
            dataset_name="doc",
        )

        # Convert the rows into dictionaries on the fly with a generator expression.
        # The pipeline must run while the connection is still open,
        # because the rows are streamed lazily.
        load_info = pipeline.run(
            data=(dict(row._mapping) for row in rows),
            table_name="genome",
        )

    print(load_info)  # noqa: T201


@dlt.resource(write_disposition="replace")
def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

    url = "https://api.github.com/repos/dlt-hub/dlt/issues"

    # GitHub allows both authenticated and non-authenticated requests (with low rate limits)
    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
    for page in paginate(
        url,
        auth=auth,
        paginator=HeaderLinkPaginator(),
        params={"state": "open", "per_page": "100"},
    ):
        yield page


@dlt.source
def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
    return github_api_resource(api_secret_key=api_secret_key)


def load_github_data() -> None:
    """Load GitHub issues data using the github_api_source."""
    pipeline = dlt.pipeline(
        pipeline_name="github_api_pipeline",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(
        data=github_api_source(),
        table_name="github_api_data",
    )
    print(load_info)  # noqa: T201


def main():
    functions = [
        load_api_data,
        load_pandas_data,
        load_sql_data,
        load_github_data,
    ]
    for func in functions:
        try:
            func()
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")  # noqa: T201


if __name__ == "__main__":
    main()
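
The `load_sql_data` function streams rows in batches and converts each one into a dictionary on the fly. The same pattern can be tried without any external service. The sketch below swaps SQLAlchemy and MySQL for the standard library's `sqlite3` module; the helper name `iter_rows_as_dicts` and the two table columns are hypothetical and chosen only for illustration.

```python
import sqlite3
from typing import Any, Dict, Iterator


def iter_rows_as_dicts(
    conn: sqlite3.Connection, query: str, batch_size: int = 100
) -> Iterator[Dict[str, Any]]:
    """Lazily yield result rows as dictionaries, fetching `batch_size` rows at a time."""
    cursor = conn.execute(query)
    columns = [col[0] for col in cursor.description]
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield dict(zip(columns, row))


# In-memory stand-in for the remote `genome` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE genome (upid TEXT, total_length INTEGER)")
conn.executemany(
    "INSERT INTO genome VALUES (?, ?)",
    [("UP1", 10), ("UP2", 20), ("UP3", 30)],
)

rows = list(iter_rows_as_dicts(conn, "SELECT * FROM genome ORDER BY upid", batch_size=2))
print(rows)
conn.close()
```

A generator like this could be handed to `pipeline.run(data=...)` in the same way as the generator expression in `load_sql_data`, as long as the connection stays open while the pipeline consumes it.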