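"""Example dlt pipeline that loads Pipedrive CRM data into BigQuery.

Demonstrates a full load with table references for the dlt+ dbt generator,
loading only selected resources, and incremental loading from a start date.
"""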
import dlt
from pipedrive import pipedrive_source
from dlt_plus.dbt_generator.utils import table_reference_adapter
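# `pipedrive_source` comes from the dlt verified source for Pipedrive (typically
# scaffolded into the project with `dlt init pipedrive bigquery`);
# `table_reference_adapter` is provided by the dlt+ (`dlt_plus`) package.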


def load_pipedrive() -> None:
    """Constructs a pipeline that loads all Pipedrive data."""
    # configure the pipeline with your destination details
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive", destination="bigquery", dataset_name="pipedrive_data"
    )
    load_info = pipeline.run(pipedrive_source())
    print(load_info)
    print(pipeline.last_trace.last_normalize_info)
    # Annotate the loaded tables with reference hints (child column -> parent
    # column) so that downstream tooling, such as the dlt+ dbt generator, can
    # generate joins between the Pipedrive tables.
    table_reference_adapter(
        pipeline,
        "activities",
        references=[
            {
                "referenced_table": "users",
                "columns": ["user_id"],
                "referenced_columns": ["id"],
            },
            {
                "referenced_table": "organizations",
                "columns": ["org_id"],
                "referenced_columns": ["id"],
            },
            {
                "referenced_table": "persons",
                "columns": ["person_id"],
                "referenced_columns": ["id"],
            },
        ],
    )
    table_reference_adapter(
        pipeline,
        "deals",
        references=[
            {
                "referenced_table": "users",
                "columns": ["user_id"],
                "referenced_columns": ["id"],
            },
            {
                "referenced_table": "organizations",
                "columns": ["org_id"],
                "referenced_columns": ["id"],
            },
            {
                "referenced_table": "persons",
                "columns": ["person_id"],
                "referenced_columns": ["id"],
            },
        ],
    )


def load_selected_data() -> None:
    """Shows how to load just selected tables using `with_resources`."""
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive", destination="bigquery", dataset_name="pipedrive_data"
    )
    # Use with_resources to select which entities to load
    # Note: `custom_fields_mapping` must be included to translate custom field
    # hashes to their corresponding names
    load_info = pipeline.run(
        pipedrive_source().with_resources(
            "products", "deals", "deals_participants", "custom_fields_mapping"
        )
    )
    print(load_info)

    # just to show how to access resources within the source
    pipedrive_data = pipedrive_source()
    # print source info
    print(pipedrive_data)
    print()
    # list resource names
    print(pipedrive_data.resources.keys())
    print()
    # print `persons` resource info
    print(pipedrive_data.resources["persons"])
    print()
    # alternatively
    print(pipedrive_data.persons)


def load_from_start_date() -> None:
    """Example of incrementally loading activities, limited to items updated after a given date."""
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive", destination="bigquery", dataset_name="pipedrive_data"
    )
    # The first source is configured to load everything except activities from the beginning
    source = pipedrive_source()
    source.resources["activities"].selected = False

    # Another source is configured to load activities starting at the given date
    # (custom_fields_mapping is included to translate custom field hashes to names)
    activities_source = pipedrive_source(
        since_timestamp="2023-03-01 00:00:00Z"
    ).with_resources("activities", "custom_fields_mapping")

    # Run the pipeline with both sources
    load_info = pipeline.run([source, activities_source])
    print(load_info)
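

# Optional extra sketch, not part of the original example: load a small subset of
# Pipedrive data into a local DuckDB file for quick inspection before running
# against BigQuery. Assumes dlt's built-in `duckdb` destination is installed
# (e.g. `pip install "dlt[duckdb]"`); the resource names match those used above.
def load_local_preview() -> None:
    """Loads selected Pipedrive resources into a local DuckDB database for inspection."""
    pipeline = dlt.pipeline(
        pipeline_name="pipedrive_local", destination="duckdb", dataset_name="pipedrive_data"
    )
    load_info = pipeline.run(
        pipedrive_source().with_resources("persons", "custom_fields_mapping")
    )
    print(load_info)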


if __name__ == "__main__":
    # run our main example
    load_pipedrive()
    # load selected tables and display resource info
    # load_selected_data()
    # load activities updated since a given date
    # load_from_start_date()
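    # load a small local preview into DuckDB (optional sketch above)
    # load_local_preview()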