ODBC is slow 🐢 bcp is fast! 🐰
This is a custom dagster IO manager for loading data into SQL Server using the bcp utility.
pip install dagster-mssql-bcp
The bcp utility must be installed on the machine that is running the dagster pipeline.
See Microsoft's documentation for more information.
Ideally, bcp should be on your PATH, but you can also specify its location in the IO manager configuration.
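As a quick check before configuring the IO manager, the following sketch looks for bcp on the PATH and otherwise uses an explicit location. The helper name `resolve_bcp_path` is hypothetical and not part of dagster-mssql-bcp; it only illustrates the lookup order described above.

```python
import shutil
from typing import Optional


def resolve_bcp_path(explicit_path: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: prefer an explicit path, else search the PATH."""
    if explicit_path:
        return explicit_path
    # shutil.which returns None when no "bcp" executable is found on the PATH
    return shutil.which("bcp")


bcp_path = resolve_bcp_path()
if bcp_path is None:
    print("bcp was not found on the PATH; set bcp_path in the IO manager configuration")
else:
    print(f"bcp found at {bcp_path}")
```

If the lookup comes back empty, passing the full path through the `bcp_path` argument shown in the examples (for instance `/opt/mssql-tools18/bin/bcp`) is the alternative.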
You need the ODBC drivers installed on the machine that is running the dagster pipeline.
See Microsoft's documentation for more information.
The user running the dagster pipeline must have the necessary permissions to load data into the SQL Server database.
CREATE SCHEMA
CREATE/ALTER TABLES
Polars data is processed as a LazyFrame. Your asset can return either a DataFrame or a LazyFrame; a DataFrame is automatically cast to lazy before processing.
from dagster import asset, Definitions
from dagster_mssql_bcp import PolarsBCPIOManager
import polars as pl

io_manager = PolarsBCPIOManager(
    host="my_mssql_server",
    database="my_database",
    user="username",
    password="password",
    query_props={
        "TrustServerCertificate": "yes",
    },
    bcp_arguments={"-u": ""},
    bcp_path="/opt/mssql-tools18/bin/bcp",
)


@asset(
    metadata={
        "asset_schema": [
            {"name": "id", "type": "INT"},
        ],
        "schema": "my_schema",
    }
)
def my_polars_asset(context):
    return pl.DataFrame({"id": [1, 2, 3]})


@asset(
    metadata={
        "asset_schema": [
            {"name": "id", "type": "INT"},
        ],
        "schema": "my_schema",
    }
)
def my_polars_asset_lazy(context):
    return pl.LazyFrame({"id": [1, 2, 3]})


defs = Definitions(
    assets=[my_polars_asset, my_polars_asset_lazy],
    io_managers={
        "io_manager": io_manager,
    },
)
from dagster import asset, Definitions
from dagster_mssql_bcp import PandasBCPIOManager
import pandas as pd

io_manager = PandasBCPIOManager(
    host="my_mssql_server",
    database="my_database",
    user="username",
    password="password",
    query_props={
        "TrustServerCertificate": "yes",
    },
    bcp_arguments={"-u": ""},
    bcp_path="/opt/mssql-tools18/bin/bcp",
)


@asset(
    metadata={
        "asset_schema": [
            {"name": "id", "type": "INT"},
        ],
        "schema": "my_schema",
    }
)
def my_pandas_asset(context):
    return pd.DataFrame({"id": [1, 2, 3]})


defs = Definitions(
    assets=[my_pandas_asset],
    io_managers={
        "io_manager": io_manager,
    },
)
The asset schema defines your table structure and your asset returns your data to load.
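To make the relationship between the asset schema and the table structure concrete, here is a rough sketch of how a schema list like the ones above could translate into a table definition. This is an illustration only and not the library's actual implementation: the function `sketch_create_table` is hypothetical, and the assumption that the table is named after the asset is ours.

```python
from typing import Dict, List


def sketch_create_table(schema: str, table: str, asset_schema: List[Dict[str, str]]) -> str:
    """Hypothetical helper: render a CREATE TABLE statement from an asset schema."""
    # One column definition per schema entry, e.g. "[id] INT"
    columns = ",\n".join(f"    [{col['name']}] {col['type']}" for col in asset_schema)
    return f"CREATE TABLE [{schema}].[{table}] (\n{columns}\n)"


# Same metadata values as the example assets; the table name is an assumption
ddl = sketch_create_table(
    "my_schema",
    "my_polars_asset",
    [{"name": "id", "type": "INT"}],
)
print(ddl)
```

Each entry in `asset_schema` corresponds to one column, and the data returned by the asset fills the rows.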
For more details, see the assets doc and the IO manager doc; for how it is implemented, see the dev doc.