diff --git a/framework/dlt/.dlt/config.toml b/framework/dlt/.dlt/config.toml
new file mode 100644
index 00000000..2db5fac8
--- /dev/null
+++ b/framework/dlt/.dlt/config.toml
@@ -0,0 +1,7 @@
+# put your configuration values here
+#add_dlt_load_id = false
+
+[runtime]
+log_level="WARNING" # the system log level of dlt
+# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/reference/telemetry
+dlthub_telemetry = false
diff --git a/framework/dlt/.gitignore b/framework/dlt/.gitignore
new file mode 100644
index 00000000..3b28aa3f
--- /dev/null
+++ b/framework/dlt/.gitignore
@@ -0,0 +1,10 @@
+# ignore secrets, virtual environments and typical python compilation artifacts
+secrets.toml
+# ignore basic python artifacts
+.env
+**/__pycache__/
+**/*.py[cod]
+**/*$py.class
+# ignore duckdb
+*.duckdb
+*.wal
\ No newline at end of file
diff --git a/framework/dlt/ddl.sql b/framework/dlt/ddl.sql
new file mode 100644
index 00000000..447e445f
--- /dev/null
+++ b/framework/dlt/ddl.sql
@@ -0,0 +1,41 @@
+CREATE TABLE IF NOT EXISTS "doc"."_dlt_version" (
+    "version" bigint NOT NULL,
+    "engine_version" bigint NOT NULL,
+    "inserted_at" timestamp with time zone NOT NULL,
+    "schema_name" varchar NOT NULL,
+    "version_hash" varchar NOT NULL,
+    "schema" varchar NOT NULL);
+CREATE TABLE IF NOT EXISTS "doc"."_dlt_loads" (
+    "load_id" varchar NOT NULL,
+    "schema_name" varchar,
+    "status" bigint NOT NULL,
+    "inserted_at" timestamp with time zone NOT NULL,
+    "schema_version_hash" varchar);
+CREATE TABLE IF NOT EXISTS "doc"."_dlt_pipeline_state" (
+    "version" bigint NOT NULL,
+    "engine_version" bigint NOT NULL,
+    "pipeline_name" varchar NOT NULL,
+    "state" varchar NOT NULL,
+    "created_at" timestamp with time zone NOT NULL,
+    "version_hash" varchar,
+    "_dlt_load_id" varchar NOT NULL,
+    "_dlt_id" varchar UNIQUE NOT NULL);
+CREATE TABLE "doc"."player" (
+    "avatar" varchar,
+    "player_id" bigint,
+    "aid" varchar,
+    "url" varchar,
+    "name" varchar,
+    "username" varchar,
+    "title" varchar,
+    "followers" bigint,
+    "country" varchar,
+    "location" varchar,
+    "last_online" bigint,
+    "joined" bigint,
+    "status" varchar,
+    "is_streamer" boolean,
+    "verified" boolean,
+    "league" varchar,
+    "_dlt_load_id" varchar NOT NULL,
+    "_dlt_id" varchar UNIQUE NOT NULL);
diff --git a/framework/dlt/example.py b/framework/dlt/example.py
new file mode 100644
index 00000000..a42ac2f9
--- /dev/null
+++ b/framework/dlt/example.py
@@ -0,0 +1,32 @@
+from dlt.common.schema import typing
+
+# rename dlt's default metadata columns
+typing.C_DLT_ID = "__dlt_id"
+"""unique id of current row"""
+typing.C_DLT_LOAD_ID = "__dlt_load_id"
+"""load id to identify records loaded in a single load package"""
+
+import dlt
+from dlt.sources.helpers import requests
+
+# Create a dlt pipeline that will load
+# chess player data to the CrateDB destination
+pipeline = dlt.pipeline(
+    pipeline_name='chess_pipeline',
+    destination=dlt.destinations.sqlalchemy("crate://crate:@localhost/"),
+    # destination=dlt.destinations.postgres("postgresql://postgres:@localhost/"),
+    dataset_name='doc',
+)
+
+# Grab some player data from the Chess.com API
+data = []
+for player in ['magnuscarlsen', 'rpragchess']:
+    response = requests.get(f'https://api.chess.com/pub/player/{player}')
+    response.raise_for_status()
+    data.append(response.json())
+
+# Extract, normalize, and load the data
+if __name__ == "__main__":
+    pipeline.run(data,
+        table_name='player',
+    )
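A quick way to check what `example.py` loaded is to query the `doc.player` table directly. The following is a minimal verification sketch, not part of the change set; it assumes CrateDB is reachable on localhost and that the `sqlalchemy-cratedb` dialect behind the `crate://` URL above is installed. The `REFRESH TABLE` statement makes freshly written rows visible, since CrateDB is eventually consistent:

```python
# Verification sketch: read back the rows example.py loaded into CrateDB.
# Assumptions: CrateDB on localhost, sqlalchemy-cratedb installed.
import sqlalchemy as sa

engine = sa.create_engine("crate://crate:@localhost/")
with engine.connect() as conn:
    # Make freshly loaded rows visible before querying.
    conn.execute(sa.text('REFRESH TABLE "doc"."player"'))
    rows = conn.execute(sa.text('SELECT "username", "followers" FROM "doc"."player"'))
    for row in rows:
        print(dict(row._mapping))
```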
diff --git a/framework/dlt/foo_pipeline.py b/framework/dlt/foo_pipeline.py
new file mode 100644
index 00000000..f8bca9a4
--- /dev/null
+++ b/framework/dlt/foo_pipeline.py
@@ -0,0 +1,111 @@
+"""The Intro Pipeline Template contains the example from the docs intro page"""
+
+# mypy: disable-error-code="no-untyped-def,arg-type"
+
+from typing import Optional
+import pandas as pd
+import sqlalchemy as sa
+
+import dlt
+from dlt.sources.helpers import requests
+
+
+def load_api_data() -> None:
+    """Load data from the chess api; for more complex examples use our rest_api source"""
+
+    # Create a dlt pipeline that will load
+    # chess player data to the configured SQLAlchemy destination
+    pipeline = dlt.pipeline(
+        pipeline_name="chess_pipeline", destination="sqlalchemy", dataset_name="player_data"
+    )
+    # Grab some player data from the Chess.com API
+    data = []
+    for player in ["magnuscarlsen", "rpragchess"]:
+        response = requests.get(f"https://api.chess.com/pub/player/{player}")
+        response.raise_for_status()
+        data.append(response.json())
+
+    # Extract, normalize, and load the data
+    load_info = pipeline.run(data, table_name="player")
+    print(load_info)  # noqa: T201
+
+
+def load_pandas_data() -> None:
+    """Load data from a public csv via pandas"""
+
+    owid_disasters_csv = (
+        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
+        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
+        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
+    )
+    df = pd.read_csv(owid_disasters_csv)
+
+    pipeline = dlt.pipeline(
+        pipeline_name="from_csv",
+        destination="sqlalchemy",
+        dataset_name="mydata",
+    )
+    load_info = pipeline.run(df, table_name="natural_disasters")
+
+    print(load_info)  # noqa: T201
+
+
+def load_sql_data() -> None:
+    """Load data from a sql database with sqlalchemy; for more complex examples use our sql_database source"""
+
+    # Use any SQL database supported by SQLAlchemy; below we use a public
+    # MySQL instance to get data.
+    # NOTE: you'll need to install pymysql with `pip install pymysql`
+    # NOTE: loading data from the public mysql instance may take several seconds
+    engine = sa.create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
+
+    with engine.connect() as conn:
+        # Select the genome table, stream data in batches of 100 elements
+        query = "SELECT * FROM genome LIMIT 1000"
+        rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
+
+        pipeline = dlt.pipeline(
+            pipeline_name="from_database",
+            destination="sqlalchemy",
+            dataset_name="genome_data",
+        )
+
+        # Convert the rows into dictionaries on the fly with a map function
+        load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")
+
+    print(load_info)  # noqa: T201
+
+
+@dlt.resource(write_disposition="replace")
+def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
+    from dlt.sources.helpers.rest_client import paginate
+    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
+    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
+
+    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
+
+    # GitHub allows both authenticated and non-authenticated requests (with low rate limits)
+    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
+    for page in paginate(
+        url, auth=auth, paginator=HeaderLinkPaginator(), params={"state": "open", "per_page": "100"}
+    ):
+        yield page
+
+
+@dlt.source
+def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
+    return github_api_resource(api_secret_key=api_secret_key)
+
+
+def load_data_from_source():
+    pipeline = dlt.pipeline(
+        pipeline_name="github_api_pipeline", destination="sqlalchemy", dataset_name="github_api_data"
+    )
+    load_info = pipeline.run(github_api_source())
+    print(load_info)  # noqa: T201
+
+
+if __name__ == "__main__":
+    load_api_data()
+    load_pandas_data()
+    load_sql_data()
diff --git a/framework/dlt/pokemon.py b/framework/dlt/pokemon.py
new file mode 100644
index 00000000..9ca44257
--- /dev/null
+++ b/framework/dlt/pokemon.py
@@ -0,0 +1,21 @@
+"""data load tool (dlt) — the open-source Python library for data loading
+
+How to create a data loading pipeline with dlt in 3 seconds:
+
+1. Write a pipeline script
+>>> import dlt
+>>> from dlt.sources.helpers import requests
+>>> dlt.run(requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"], destination="duckdb", table_name="pokemon")
+
+2. Run your pipeline script
+ > $ python pokemon.py
+
+3. See and query your data with the autogenerated Streamlit app
+ > $ dlt pipeline dlt_pokemon show
+
+Or start with our pipeline template with sample PokeAPI (pokeapi.co) data loaded to BigQuery
+
+ > $ dlt init pokemon bigquery
+
+For more detailed info, see https://dlthub.com/docs/getting-started
+"""
diff --git a/framework/dlt/requirements.txt b/framework/dlt/requirements.txt
new file mode 100644
index 00000000..13c3bbb2
--- /dev/null
+++ b/framework/dlt/requirements.txt
@@ -0,0 +1 @@
+dlt[sqlalchemy]>=1.3.0
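A closing note on configuration: unlike `example.py`, `foo_pipeline.py` refers to its destination by name (`destination="sqlalchemy"`) instead of constructing it, so dlt resolves credentials from its configuration at run time. One plausible way to point it at the same CrateDB instance is dlt's environment-variable convention, sketched below under the same localhost assumption as above; a `credentials` entry under `[destination.sqlalchemy]` in `.dlt/secrets.toml` works equally well:

```python
# Sketch: feed credentials to the named "sqlalchemy" destination via the
# environment (dlt joins configuration sections with double underscores).
import os

os.environ["DESTINATION__SQLALCHEMY__CREDENTIALS"] = "crate://crate:@localhost/"

import foo_pipeline  # noqa: E402  # set the env var before running any pipeline

foo_pipeline.load_api_data()
```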