-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCustomers and Addresses.txt
118 lines (82 loc) · 3.34 KB
/
Customers and Addresses.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import shopify
import pandas as pd
import requests
import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
from sqlalchemy.sql import text as sa_text
import logging
# Module-level logger, used to report failures while loading into Snowflake.
logger = logging.getLogger(__name__)

# --- Snowflake ---------------------------------------------------------------
# Every connection setting is a blank placeholder and has to be filled in
# before the script can connect.
snowflake_url = URL(
    account='',
    user='',
    password='',
    warehouse='',
    database='',
    schema='',
    # role='',
)
engine = create_engine(snowflake_url)

# Raw DBAPI connection and cursor. Nothing active uses them at the moment;
# they only serve the optional incremental-load query kept below.
connection = engine.raw_connection()
cursor = connection.cursor()
# Optional incremental load: read the newest UPDATED_AT already stored and
# send it to Shopify as 'updated_at_min' (see `parameters`).
# cursor.execute(f"SELECT MAX(UPDATED_AT) FROM CUSTOMERS;")
# results = str(cursor.fetchall()[0][0])

# --- Shopify -----------------------------------------------------------------
# Store name and app credentials; blank placeholders as well.
store_name = ''
access_key = ''
access_secret = ''
access_token = ''
shopify.Session.setup(api_key=access_key, secret=access_secret)

# HTTP headers passed to the customers request.
headers = {
    'Content-Type': 'application/json',
    'X-Shopify-Access-Token': access_token,
}

# Query-string parameters passed to the customers request.
parameters = {
    # 'updated_at_min': results,
    'limit': 50,
}
def remove_timezone_offset(date_string):
    """Parse a timestamp string and return it without timezone information.

    Any UTC offset in the input (``+05:30``, ``-04:00`` or a trailing ``Z``)
    is discarded and the wall-clock time is kept as written, so
    ``"2021-10-01T12:00:00-04:00"`` becomes ``2021-10-01 12:00:00``.
    Splitting on ``'+'`` alone only handled positive offsets and left
    negative-offset and ``Z`` timestamps timezone-aware.

    Args:
        date_string: Timestamp text accepted by ``pandas.to_datetime``.

    Returns:
        The timezone-naive result of ``pandas.to_datetime``; values that
        pandas parses as missing (``NaT``) are returned unchanged.
    """
    parsed = pd.to_datetime(date_string)
    # NaT and already-naive timestamps carry no tzinfo and pass through as-is.
    if getattr(parsed, 'tzinfo', None) is not None:
        # tz_localize(None) drops the zone while preserving the local time.
        parsed = parsed.tz_localize(None)
    return parsed
# Download every customer from the Shopify Admin REST API, following the
# Link-header pagination until no 'next' page is advertised.
url = f"https://{access_key}:{access_secret}@{store_name}.myshopify.com/admin/api/2021-10/customers.json"

# One DataFrame per page; concatenated once after the loop so earlier pages
# are not re-copied on every iteration.
customer_pages = []

# The query parameters belong to the first request only. Each 'next' link
# already carries its own limit and page_info, and Shopify's pagination docs
# say a page_info request accepts no other parameters besides limit and
# fields, so filters such as 'updated_at_min' must not be re-sent.
page_params = parameters
while True:
    response = requests.get(url, params=page_params, headers=headers, timeout=60)
    # Surface HTTP errors (bad credentials, rate limiting, ...) directly
    # instead of failing later with a KeyError on a missing 'customers' key.
    response.raise_for_status()
    data = response.json()
    customer_pages.append(pd.DataFrame(data['customers']))
    if 'next' not in response.links:
        break
    url = response.links['next']['url']
    page_params = None

# ignore_index gives each customer row a unique label across pages.
customer_df = pd.concat(customer_pages, ignore_index=True)
# Transform the downloaded customers into two tables (customers and their
# addresses) and append both to Snowflake.
print("Customers Data Before Transaformation: ", customer_df.shape)
if customer_df.shape[0] > 0:
    # Keep the nested address lists before every column is cast to str.
    addresses = customer_df[['addresses']]

    # --- Customers ---
    customer_df = customer_df.astype('str')
    customer_df.rename(columns={'id': 'customer_id', 'state': 'customer_state'}, inplace=True)
    customer_df = customer_df.astype({'customer_id': 'Int64'})
    for cdate in ('created_at', 'updated_at'):
        customer_df[cdate] = customer_df[cdate].apply(remove_timezone_offset)
    customer_df.columns = customer_df.columns.str.upper()
    print("Customers: ", customer_df.shape)

    # --- Addresses ---
    # explode() yields one row per address and a missing value for customers
    # that have none; drop those so only real address dicts are normalized.
    address_records = addresses.explode('addresses')['addresses'].dropna()
    if address_records.empty:
        # No customer has an address: there are no id columns to cast.
        addresses_exploded = pd.DataFrame()
    else:
        addresses_exploded = pd.json_normalize(address_records.tolist())
        addresses_exploded.rename(columns={'id': 'address_id', 'default': 'default_address'}, inplace=True)
        addresses_exploded = addresses_exploded.astype({'address_id': 'Int64', 'customer_id': 'Int64'})
        addresses_exploded = addresses_exploded.dropna(how='all')
        addresses_exploded.columns = addresses_exploded.columns.str.upper()
    print("Addresses: ", addresses_exploded.shape)

    print("Customers and Addresses data is ready to be loaded.")
    try:
        customer_df.to_sql('customers', con=engine, index=False, if_exists='append', chunksize=15000)
        print("Customers Data is Loaded")
        if addresses_exploded.empty:
            print("No customer addresses to load.")
        else:
            addresses_exploded.to_sql('customer_addresses', con=engine, index=False, if_exists='append', chunksize=15000)
            print("Customer Addresses are loaded")
    except Exception as e:
        # Best-effort load: report the failure, with traceback, and carry on.
        logger.exception("ERROR: %s", str(e))
else:
    print("As Data Frame is empty, so nothing to load.")

# Release the Snowflake resources opened at the top of the script.
cursor.close()
connection.close()
engine.dispose()