-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGetCoinDataToS3.py
138 lines (100 loc) · 4.18 KB
/
GetCoinDataToS3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from pycoingecko import CoinGeckoAPI
from datetime import datetime
import pandas as pd
import time
import boto3
from io import StringIO
import configparser
# Module-level CoinGecko API client used by the fetch functions in this file.
cg = CoinGeckoAPI()
# Fetch the list of coins with their market data and collect the results into one DataFrame
def getCoinListData(pages=1, start_page=20, per_page=25):
    """
    Purpose: Get coin market data (JSON) from the CoinGecko API and convert
             it into a single dataframe, one row per coin.
    Arguments:
        pages -- Number of consecutive result pages to fetch (default 1)
        start_page -- First page number to request (default 20)
        per_page -- Number of coins requested per page (default 25)
    Returns:
        A dataframe holding the rows of every fetched page, in page order.
        An empty dataframe is returned when pages is 0 or negative.

    The defaults reproduce the small sample request this script was written
    with. NOTE(review): the inline hints in the first version of this
    function suggested pages=10 / start_page=3 / per_page=250 for a fuller
    run -- confirm the intended production values.
    """
    pageFrames = []
    # Advance the page number on every iteration so that asking for several
    # pages never fetches the same page twice.
    for page in range(start_page, start_page + pages):
        coinsMarketListJSON = cg.get_coins_markets(
            vs_currency='usd',
            page=page,
            per_page=per_page,
            price_change_percentage='24h,7d,30d',
        )
        pageFrames.append(pd.DataFrame(coinsMarketListJSON))

    # pd.concat raises on an empty list, so handle "nothing fetched" explicitly.
    if not pageFrames:
        return pd.DataFrame()
    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    # Row labels of each page are kept as-is (no re-indexing).
    return pd.concat(pageFrames)
# ********* Fetch the price history of every coin as JSON, then build a DataFrame that is uploaded to S3 as CSV
# Fetch the complete price-history JSON for each coin
def getCoinPriceData(coinListdf):
    """
    Purpose: Get the 90-day daily market chart (JSON) of every coin from the
             CoinGecko API
    Arguments:
        coinListdf -- The output df from getCoinListData function; only its
                      'id' column is read
    Returns:
        A list with one market-chart dict per coin, in the order of the 'id'
        column. Each dict gets an extra 'id' key holding the coin id.
    """
    coinIds = list(coinListdf['id'])
    coinPriceListJson = []
    numberOfRequests = 0
    for position, coinId in enumerate(coinIds, start=1):
        coinData = cg.get_coin_market_chart_by_id(id=coinId, vs_currency='usd', days=90, interval='daily')
        numberOfRequests += 1
        # add the id from the coinsMarketList because it does not come with the id
        coinData['id'] = coinId
        coinPriceListJson.append(coinData)
        # Pause after every 45 requests to stay under the request limit, but
        # not after the final coin: there is nothing left to fetch, so the
        # wait would only delay the caller.
        if numberOfRequests == 45 and position < len(coinIds):
            print("Waiting 90 seconds to not exceed request limit...")
            numberOfRequests = 0
            time.sleep(90)
    return coinPriceListJson
# create df from the complete json
def coinListToDF(coinPriceListJson):
    """
    Purpose: Convert JSON list obtained from getCoinPriceData function into a dataframe
    Arguments:
        coinPriceListJson -- The output json from getCoinPriceData function: a
                             list of dicts, each with an 'id', a 'prices' list
                             of [timestamp_ms, price] pairs and a
                             'total_volumes' list of [timestamp_ms, volume]
                             pairs
    Returns:
        A dataframe with the columns 'date', 'price', 'Id' and 'volume'
        holding the rows of every coin, in input order. An empty dataframe is
        returned for an empty input list.
    Raises:
        ValueError -- if a coin's 'total_volumes' list does not have the same
                      length as its 'prices' list

    The input is not modified, so the same JSON can be converted repeatedly.
    """
    coinFrames = []
    for coin in coinPriceListJson:
        print(coin['id'])
        dfCoins = pd.DataFrame(coin['prices'], columns=['date', 'price'])
        # Millisecond epoch timestamps -> calendar dates. The conversion is
        # done in UTC so the resulting date does not depend on the local
        # timezone of the machine running the script.
        dfCoins['date'] = pd.to_datetime(dfCoins['date'], unit='ms').dt.date
        # set a column to be the id of the coin
        dfCoins['Id'] = coin['id']
        # Volume data: keep only the volume value of each [timestamp, volume]
        # pair; rows are matched to the prices by position.
        dfCoins['volume'] = [volume[1] for volume in coin['total_volumes']]
        coinFrames.append(dfCoins)

    # pd.concat raises on an empty list, so handle "no coins" explicitly.
    if not coinFrames:
        return pd.DataFrame()
    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    # Row labels of each coin's frame are kept as-is (no re-indexing).
    return pd.concat(coinFrames)
# **************************Insert into AWS s3 bucket***************************
# function for uploading the dfs
def copy_to_s3(client, df, bucket, filepath):
    """
    Purpose: Serialize a dataframe to CSV text (header row included, index
             column omitted) and store it as one object in an S3 bucket
    Arguments:
        client -- Object exposing put_object(Bucket=..., Body=..., Key=...)
        df -- The dataframe to upload
        bucket -- Name of the target bucket
        filepath -- Object key under which the CSV text is stored
    """
    # With no target given, to_csv hands back the CSV text directly.
    csv_text = df.to_csv(header=True, index=False)
    client.put_object(Bucket=bucket, Body=csv_text, Key=filepath)
    row_count = df.shape[0]
    print(f'Copy {row_count} rows to S3 Bucket {bucket} at {filepath}, Done!')
def main():
    """
    Purpose: Run the whole pipeline -- read the AWS credentials from aws.cfg,
             fetch the coin list and the price history of every listed coin,
             and upload both dataframes as CSV to the 'coingeckobucket' bucket
    """
    # Credentials live in the [AWS] section (KEY / SECRET) of aws.cfg.
    aws_settings = configparser.ConfigParser()
    aws_settings.read('aws.cfg')
    access_key = aws_settings.get('AWS', 'KEY')
    secret_key = aws_settings.get('AWS', 'SECRET')
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Fetch: coin list first, then the price history of every listed coin.
    market_df = getCoinListData()
    price_df = coinListToDF(getCoinPriceData(market_df))

    # Upload the coin list first, then the price data; each dataframe is
    # converted to CSV by copy_to_s3 before it is stored.
    uploads = (
        (market_df, 'coinListData'),
        (price_df, 'coinPriceData'),
    )
    for frame, object_key in uploads:
        copy_to_s3(client=s3_client, df=frame, bucket='coingeckobucket', filepath=object_key)
# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()