---
title: Deploy with Modal
description: How to deploy a pipeline with Modal
keywords: [how to, deploy a pipeline, Modal]
canonical: https://modal.com/blog/analytics-stack
---

# Deploy with Modal

## Introduction to Modal

[Modal](https://modal.com/) is a serverless platform designed for developers. It allows you to run and deploy code in the cloud without managing infrastructure.

With Modal, you can perform tasks like running generative models, large-scale batch jobs, and job queues, all while easily scaling compute resources.

### Modal features

- Serverless compute: no infrastructure management; scales automatically from zero to thousands of CPUs/GPUs.
- Cloud functions: run Python code in the cloud instantly and scale horizontally.
- GPU/CPU scaling: attach GPUs for heavy tasks like AI model training with a single line of code.
- Web endpoints: expose any function as an HTTPS API endpoint quickly.
- Scheduled jobs: convert Python functions into scheduled tasks effortlessly.

To learn more, please refer to [Modal's documentation](https://modal.com/docs).
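
To make the programming model concrete, here is a minimal sketch of a Modal function (a hypothetical hello-world, not part of the pipeline example below):
```py
import modal

# a minimal Modal app: the decorated function runs in Modal's cloud, not locally
app = modal.App("hello-modal")

@app.function()
def hello(name: str) -> str:
    return f"Hello from the cloud, {name}!"

@app.local_entrypoint()
def main():
    # .remote() ships the call to a Modal container and returns the result
    print(hello.remote("dlt"))
```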

## Building data pipelines with dlt

dlt is an open-source Python library that allows you to declaratively load data sources into well-structured tables or datasets. It does this through automatic schema inference and evolution. The library simplifies building data pipelines by providing functionality to support the entire extract and load process.
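
As a minimal, self-contained illustration (a sketch using the bundled DuckDB destination, separate from the Modal example that follows), a dlt pipeline can load a plain list of Python dicts:
```py
import dlt

# schema (column names and types) is inferred automatically from the data
pipeline = dlt.pipeline(
    pipeline_name="quickstart",
    destination="duckdb",
    dataset_name="demo",
)

# any iterable of dicts can serve as a source
info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
)
print(info)
```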

### How does dlt integrate with Modal for pipeline orchestration?

To illustrate setting up a pipeline in Modal, we’ll use the following example: [Building a cost-effective analytics stack with Modal, dlt, and dbt](https://modal.com/blog/analytics-stack).

The example demonstrates automating a workflow to load data from Postgres to Snowflake using dlt.

## How to run dlt on Modal

Here’s the dlt setup for copying data from a Postgres read replica into Snowflake:

1. Run the dlt SQL database setup to initialize the `sql_database_pipeline.py` template:
```sh
dlt init sql_database snowflake
```
2. Open the file and define the Modal Image you want to run dlt in:
```py
import dlt
import pendulum

from dlt.sources.credentials import ConnectionStringCredentials
from dlt.sources.sql_database import sql_database

import modal
import os

image = (
    modal.Image.debian_slim()
    .apt_install(["libpq-dev"])  # system requirement for the postgres driver
    .pip_install(
        "sqlalchemy>=1.4",  # how dlt establishes connections
        "dlt[snowflake]>=1.1.0",
        "psycopg2-binary",  # postgres driver
        "dlt[parquet]",
        "psutil==6.0.0",  # for dlt logging
        "connectorx",  # creates arrow tables from the database for fast data extraction
    )
)

app = modal.App("dlt-postgres-pipeline", image=image)
```

3. Wrap the provided `load_table_from_database` with the Modal Function decorator, Modal Secrets containing your database credentials, and a daily cron schedule:
```py
# Function to load the table from the database, scheduled to run daily
@app.function(
    secrets=[
        modal.Secret.from_name("snowflake-secret"),
        modal.Secret.from_name("postgres-read-replica-prod"),
    ],
    # run this pipeline daily at 6:24 AM
    schedule=modal.Cron("24 6 * * *"),
    timeout=3000,
)
def load_table_from_database(
    table: str,
    incremental_col: str,
    dev: bool = False,
) -> None:
    # the pipeline code from the next step goes inside this function
    pass
```
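
Note: Modal Secrets are exposed to the function as environment variables. Instead of assembling connection strings by hand (as in the next step), you can also name the secret keys following dlt's environment-variable convention so that dlt picks the credentials up natively; a hypothetical sketch of such keys:
```sh
# hypothetical secret keys following dlt's environment-variable naming convention
SOURCES__SQL_DATABASE__CREDENTIALS="postgresql://user:password@host:5432/database"
DESTINATION__SNOWFLAKE__CREDENTIALS="snowflake://user:password@account/database"
```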

4. Write your dlt pipeline inside `load_table_from_database`:
```py
# Modal Secrets are loaded as environment variables, which are used here to create the SQLAlchemy connection string
pg_url = f'postgresql://{os.environ["PGUSER"]}:{os.environ["PGPASSWORD"]}@localhost:{os.environ["PGPORT"]}/{os.environ["PGDATABASE"]}'
snowflake_url = f'snowflake://{os.environ["SNOWFLAKE_USER"]}:{os.environ["SNOWFLAKE_PASSWORD"]}@{os.environ["SNOWFLAKE_ACCOUNT"]}/{os.environ["SNOWFLAKE_DATABASE"]}'

# Create a pipeline
schema = "POSTGRES_DLT_DEV" if dev else "POSTGRES_DLT"
pipeline = dlt.pipeline(
    pipeline_name="task",
    destination=dlt.destinations.snowflake(snowflake_url),
    dataset_name=schema,
    progress="log",
)

credentials = ConnectionStringCredentials(pg_url)

# defines the postgres table to sync (in this case, the "task" table)
source_1 = sql_database(credentials, backend="connectorx").with_resources("task")

# defines which column to reference for incremental loading (i.e. only load newer rows)
source_1.task.apply_hints(
    incremental=dlt.sources.incremental(
        "enqueued_at",
        initial_value=pendulum.datetime(2024, 7, 24, 0, 0, 0, tz="UTC"),
    )
)

# if there are duplicates, merge the latest values
info = pipeline.run(source_1, write_disposition="merge")
print(info)
```
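
5. Run or deploy the pipeline. `pipeline.run` executes once per invocation, so the cron schedule only takes effect once the app is deployed. Assuming the code lives in `sql_database_pipeline.py`, a minimal sketch of the commands looks like this:
```sh
# execute the scheduled function once, as an ad-hoc test run
modal run sql_database_pipeline.py::load_table_from_database --table task --incremental-col enqueued_at

# deploy the app so the modal.Cron schedule runs it daily
modal deploy sql_database_pipeline.py
```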

## Advanced configuration

### Modal Proxy

If your database is in a private VPN, you can use [Modal Proxy](https://modal.com/docs/reference/modal.Proxy) as a bastion server (only available to Enterprise customers). To connect to a production read replica, attach the Proxy to the function definition and change the hostname to localhost:
```py
@app.function(
    secrets=[
        modal.Secret.from_name("snowflake-secret"),
        modal.Secret.from_name("postgres-read-replica-prod"),
    ],
    schedule=modal.Cron("24 6 * * *"),
    proxy=modal.Proxy.from_name("prod-postgres-proxy", environment_name="main"),
    timeout=3000,
)
def task_pipeline(dev: bool = False) -> None:
    pg_url = f'postgresql://{os.environ["PGUSER"]}:{os.environ["PGPASSWORD"]}@localhost:{os.environ["PGPORT"]}/{os.environ["PGDATABASE"]}'
```

### Capturing deletes

One limitation of the simple approach above is that it does not capture updates or deletions of data. If you need that, dlt offers a [Postgres CDC replication feature](../../dlt-ecosystem/verified-sources/pg_replication) that is worth considering.

### Scaling out

The example above syncs one table from the Postgres data source. In practice, you might sync multiple tables, mapping each table copy job to a single container using [Modal.starmap](https://modal.com/docs/reference/modal.Function#starmap):
```py
@app.function(timeout=3000, schedule=modal.Cron("29 11 * * *"))
def main(dev: bool = False):
    tables = [
        ("task", "enqueued_at", dev),
        ("worker", "launched_at", dev),
        ...
    ]
    list(load_table_from_database.starmap(tables))
```
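
Each tuple in `tables` is unpacked into the arguments of `load_table_from_database`, and Modal fans each call out to its own container, so the tables sync in parallel; wrapping the generator in `list()` forces all of the calls to complete before `main` returns.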