forked from pydiverse/pydiverse.pipedag
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_pipeline.py
89 lines (66 loc) · 2.36 KB
/
run_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import annotations
import pandas as pd
import sqlalchemy as sa
from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.util.structlog import setup_logging
@materialize(lazy=True)
def lazy_task_1():
return sa.select(
sa.literal(1).label("x"),
sa.literal(2).label("y"),
)
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.sql.expression.Alias, input2: sa.sql.expression.Alias):
query = sa.select(
(input1.c.x * 5).label("x5"),
input2.c.a,
).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))
return Table(query, name="task_2_out", primary_key=["a"])
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.sql.expression.Alias):
return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.sql.expression.Alias):
return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")
@materialize(nout=2, version="1.0.0")
def eager_inputs():
dfA = pd.DataFrame(
{
"a": [0, 1, 2, 4],
"b": [9, 8, 7, 6],
}
)
dfB = pd.DataFrame(
{
"a": [2, 1, 0, 1],
"x": [1, 1, 2, 2],
}
)
return Table(dfA, "dfA"), Table(dfB, "dfB_%%")
@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
return tbl1.merge(tbl2, on="x")
def main():
with Flow() as f:
with Stage("stage_1"):
lazy_1 = lazy_task_1()
a, b = eager_inputs()
with Stage("stage_2"):
lazy_2 = lazy_task_2(lazy_1, b)
lazy_3 = lazy_task_3(lazy_2)
eager = eager_task(lazy_1, b)
with Stage("stage_3"):
lazy_4 = lazy_task_4(lazy_2)
_ = lazy_3, lazy_4, eager # unused terminal output tables
# Run flow
result = f.run()
assert result.successful
# Run in a different way for testing
with StageLockContext():
result = f.run()
assert result.successful
assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1
if __name__ == "__main__":
setup_logging() # you can setup the logging and/or structlog libraries as you wish
main()