dlt: Hello, World! #875

Draft: wants to merge 1 commit into base: main
7 changes: 7 additions & 0 deletions framework/dlt/.dlt/config.toml
@@ -0,0 +1,7 @@
# put your configuration values here
#add_dlt_load_id = false

[runtime]
log_level="WARNING" # the system log level of dlt
# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/reference/telemetry
dlthub_telemetry = false
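
For reference, the same [runtime] options can also be supplied through environment variables, which dlt resolves alongside config.toml; a minimal sketch, assuming dlt's documented SECTION__OPTION naming convention (not something this PR adds):

import os

# a sketch of the [runtime] settings above expressed as environment variables,
# set before the pipeline is created so dlt picks them up during config resolution
os.environ["RUNTIME__LOG_LEVEL"] = "WARNING"
os.environ["RUNTIME__DLTHUB_TELEMETRY"] = "false"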
10 changes: 10 additions & 0 deletions framework/dlt/.gitignore
@@ -0,0 +1,10 @@
# ignore secrets
secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
41 changes: 41 additions & 0 deletions framework/dlt/ddl.sql
@@ -0,0 +1,41 @@
CREATE TABLE IF NOT EXISTS "doc"."_dlt_version" (
    "version" bigint NOT NULL,
    "engine_version" bigint NOT NULL,
    "inserted_at" timestamp with time zone NOT NULL,
    "schema_name" varchar NOT NULL,
    "version_hash" varchar NOT NULL,
    "schema" varchar NOT NULL
);
CREATE TABLE IF NOT EXISTS "doc"."_dlt_loads" (
    "load_id" varchar NOT NULL,
    "schema_name" varchar,
    "status" bigint NOT NULL,
    "inserted_at" timestamp with time zone NOT NULL,
    "schema_version_hash" varchar
);
CREATE TABLE IF NOT EXISTS "doc"."_dlt_pipeline_state" (
    "version" bigint NOT NULL,
    "engine_version" bigint NOT NULL,
    "pipeline_name" varchar NOT NULL,
    "state" varchar NOT NULL,
    "created_at" timestamp with time zone NOT NULL,
    "version_hash" varchar,
    "_dlt_load_id" varchar NOT NULL,
    "_dlt_id" varchar UNIQUE NOT NULL
);
CREATE TABLE "doc"."player" (
    "avatar" varchar,
    "player_id" bigint,
    "aid" varchar,
    "url" varchar,
    "name" varchar,
    "username" varchar,
    "title" varchar,
    "followers" bigint,
    "country" varchar,
    "location" varchar,
    "last_online" bigint,
    "joined" bigint,
    "status" varchar,
    "is_streamer" boolean,
    "verified" boolean,
    "league" varchar,
    "_dlt_load_id" varchar NOT NULL,
    "_dlt_id" varchar UNIQUE NOT NULL
);
32 changes: 32 additions & 0 deletions framework/dlt/example.py
@@ -0,0 +1,32 @@
from dlt.common.schema import typing

# Override dlt's default metadata column names before running the pipeline
typing.C_DLT_ID = "__dlt_id"
"""unique id of the current row"""
typing.C_DLT_LOAD_ID = "__dlt_load_id"
"""load id identifying the records loaded in a single load package"""

import dlt
from dlt.sources.helpers import requests

# Create a dlt pipeline that will load
# chess player data to CrateDB via the SQLAlchemy destination
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination=dlt.destinations.sqlalchemy("crate://crate:@localhost/"),
    # destination=dlt.destinations.postgres("postgresql://postgres:@localhost/"),
    dataset_name='doc',
)

# Grab some player data from the Chess.com API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
    response = requests.get(f'https://api.chess.com/pub/player/{player}')
    response.raise_for_status()
    data.append(response.json())

# Extract, normalize, and load the data
if __name__ == "__main__":
    pipeline.run(data, table_name='player')
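
To check what example.py actually wrote, the doc.player table can be queried back with plain SQLAlchemy; a quick sketch, assuming the CrateDB SQLAlchemy dialect is installed and reusing the connection string from example.py (the REFRESH TABLE step is only there to make freshly inserted rows visible immediately):

import sqlalchemy as sa

# connect with the same credentials example.py uses
engine = sa.create_engine("crate://crate:@localhost/")
with engine.connect() as conn:
    # CrateDB refreshes tables periodically; force a refresh so the rows just loaded show up
    conn.execute(sa.text('REFRESH TABLE "doc"."player"'))
    result = conn.execute(sa.text('SELECT username, followers, country FROM "doc"."player"'))
    for row in result:
        print(row)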
111 changes: 111 additions & 0 deletions framework/dlt/foo_pipeline.py
@@ -0,0 +1,111 @@
"""The Intro Pipeline Template contains the example from the docs intro page"""

# mypy: disable-error-code="no-untyped-def,arg-type"

from typing import Optional
import pandas as pd
import sqlalchemy as sa

import dlt
from dlt.sources.helpers import requests


def load_api_data() -> None:
    """Load data from the Chess.com API; for more complex examples use our rest_api source"""

    # Create a dlt pipeline that will load
    # chess player data to the configured SQLAlchemy destination
    pipeline = dlt.pipeline(
        pipeline_name="chess_pipeline", destination='sqlalchemy', dataset_name="player_data"
    )
    # Grab some player data from the Chess.com API
    data = []
    for player in ["magnuscarlsen", "rpragchess"]:
        response = requests.get(f"https://api.chess.com/pub/player/{player}")
        response.raise_for_status()
        data.append(response.json())

    # Extract, normalize, and load the data
    load_info = pipeline.run(data, table_name="player")
    print(load_info)  # noqa: T201


def load_pandas_data() -> None:
    """Load data from a public csv via pandas"""

    owid_disasters_csv = (
        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
    )
    df = pd.read_csv(owid_disasters_csv)

    pipeline = dlt.pipeline(
        pipeline_name="from_csv",
        destination='sqlalchemy',
        dataset_name="mydata",
    )
    load_info = pipeline.run(df, table_name="natural_disasters")

    print(load_info)  # noqa: T201


def load_sql_data() -> None:
    """Load data from a sql database with sqlalchemy, for more complex examples use our sql_database source"""

    # Use any SQL database supported by SQLAlchemy, below we use a public
    # MySQL instance to get data.
    # NOTE: you'll need to install pymysql with `pip install pymysql`
    # NOTE: loading data from the public mysql instance may take several seconds
    engine = sa.create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")

    with engine.connect() as conn:
        # Select genome table, stream data in batches of 100 elements
        query = "SELECT * FROM genome LIMIT 1000"
        rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

        pipeline = dlt.pipeline(
            pipeline_name="from_database",
            destination='sqlalchemy',
            dataset_name="genome_data",
        )

        # Convert the rows into dictionaries on the fly with a map function
        load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")

    print(load_info)  # noqa: T201


@dlt.resource(write_disposition="replace")
def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

    url = "https://api.github.com/repos/dlt-hub/dlt/issues"

    # GitHub allows both authenticated and non-authenticated requests (with low rate limits)
    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
    for page in paginate(
        url, auth=auth, paginator=HeaderLinkPaginator(), params={"state": "open", "per_page": "100"}
    ):
        yield page


@dlt.source
def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
    return github_api_resource(api_secret_key=api_secret_key)


def load_data_from_source() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="github_api_pipeline", destination='sqlalchemy', dataset_name="github_api_data"
    )
    load_info = pipeline.run(github_api_source())
    print(load_info)  # noqa: T201


if __name__ == "__main__":
    load_api_data()
    load_pandas_data()
    load_sql_data()
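
load_data_from_source() is defined above but not wired into __main__, since github_api_resource expects a token through dlt.secrets.value. A hedged sketch of calling it with an explicitly supplied token; the GITHUB_TOKEN variable name is an assumption, not something this PR defines:

import os

import dlt
from foo_pipeline import github_api_source

# read a personal access token from the environment; unauthenticated requests also work,
# just with a much lower rate limit
token = os.environ.get("GITHUB_TOKEN")

pipeline = dlt.pipeline(
    pipeline_name="github_api_pipeline", destination="sqlalchemy", dataset_name="github_api_data"
)
load_info = pipeline.run(github_api_source(api_secret_key=token))
print(load_info)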
21 changes: 21 additions & 0 deletions framework/dlt/pokemon.py
@@ -0,0 +1,21 @@
"""data load tool (dlt) — the open-source Python library for data loading

How to create a data loading pipeline with dlt in 3 seconds:

1. Write a pipeline script
>>> import dlt
>>> from dlt.sources.helpers import requests
>>> dlt.run(requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"], destination="duckdb", table_name="pokemon")

2. Run your pipeline script
> $ python pokemon.py

3. See and query your data with the autogenerated Streamlit app
> $ dlt pipeline dlt_pokemon show

Or start with our pipeline template, with sample PokeAPI (pokeapi.co) data loaded to BigQuery

> $ dlt init pokemon bigquery

For more detailed info, see https://dlthub.com/docs/getting-started
"""
1 change: 1 addition & 0 deletions framework/dlt/requirements.txt
@@ -0,0 +1 @@
dlt[sqlalchemy]>=1.3.0