# graph-interactive.py
import subprocess
import sys

# Function to install dependencies if not already installed
def install_dependencies(requirements_path='requirements.txt'):
    try:
        import matplotlib
        import pandas
        import requests
        # Add any other modules you want to check for here
        print("All dependencies are already installed.")
    except ImportError:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', requirements_path])
            print("All dependencies installed successfully.")
        except subprocess.CalledProcessError:
            print("Failed to install dependencies. Ensure you have the necessary permissions.")

# Call the function at the start of your script execution
install_dependencies()
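# Note: requirements.txt is assumed to list at least the packages probed for
# above (matplotlib, pandas, requests); its exact contents are not shown here.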
import hashlib
import json
import pickle
import requests
import time
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 -- imported for its side effect of registering the '3d' projection (needed on older matplotlib)
import os
import argparse
# Constants
default_url = "https://rest-osmosis.ecostake.com:443"
query_path = "/osmosis/concentratedliquidity/v1beta1/liquidity_per_tick_range"
# Function to read data from the API
def read_data_from_api(url, pool_id, block_height=None):
    headers = {"Content-Type": "application/json"}
    if block_height is not None:
        headers['x-cosmos-block-height'] = str(block_height)
    # timeout keeps the script from hanging indefinitely on a stalled node
    response = requests.get(url + query_path, params={"pool_id": pool_id}, headers=headers, timeout=30)
    print(f"API response status: {response.status_code}")  # Debugging print
    if response.status_code != 200:
        raise ValueError(f"Error fetching data: HTTP status {response.status_code}")
    data = response.json()
    print(f"Data received from API: {data}")  # Debugging print
    return data
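# For reference, the endpoint is expected to return JSON shaped roughly like
# the sketch below (inferred from how preprocess_data consumes it; the field
# values are purely illustrative):
#
#   {
#       "liquidity": [
#           {"liquidity_amount": "123456.789", "lower_tick": "-108000000", "upper_tick": "342000000"},
#           ...
#       ]
#   }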
# Function to preprocess data
def preprocess_data(data):
    if 'liquidity' not in data:
        raise ValueError("Data format error: 'liquidity' key not found in response data")
    df = pd.DataFrame(data['liquidity'])
    df['liquidity_amount'] = pd.to_numeric(df['liquidity_amount'])
    df['lower_tick'] = pd.to_numeric(df['lower_tick'])
    df['upper_tick'] = pd.to_numeric(df['upper_tick'])
    df['tick_range'] = df['upper_tick'] - df['lower_tick']
    print(f"Dataframe after preprocessing: {df.head()}")  # Debugging print
    return df
# Function to export data to CSV; file_base already carries the pool id,
# height, and timestamp, so it is used as-is
def export_to_csv(df, output_path, file_base):
    csv_file_path = f"{output_path}/{file_base}.csv"
    df.to_csv(csv_file_path, index=False)
    print(f"Data exported to CSV file at {csv_file_path}")
# Function to plot 3D liquidity data
def plot_3d_liquidity(df, output_file, dot_size, block_height):
    fig = plt.figure(figsize=(14, 8))  # Slightly larger to accommodate the table
    ax = fig.add_subplot(111, projection='3d')
    # Customize the title to include block height if provided
    title = f'Liquidity Amount Per Tick Range\nHeight: {block_height}' if block_height else 'Liquidity Amount Per Tick Range'
    ax.set_title(title)
    # Plot the 3D scatter, dropping the top 1% of outliers on both axes so the
    # bulk of the data stays readable
    focus_df = df[(df['liquidity_amount'] < df['liquidity_amount'].quantile(0.99)) &
                  (df['tick_range'] < df['tick_range'].quantile(0.99))]
    img = ax.scatter(focus_df['lower_tick'], focus_df['upper_tick'], focus_df['liquidity_amount'],
                     c=focus_df['liquidity_amount'], cmap='viridis', marker='o', s=dot_size, depthshade=True)
    ax.set_xlabel('Lower Tick Value')
    ax.set_ylabel('Upper Tick Value')
    ax.set_zlabel('Liquidity Amount')
    # Add a color bar for the scatter plot
    fig.colorbar(img, ax=ax, label='Liquidity Amount')
    # Create an inset axes for the summary-statistics table
    table_ax = fig.add_axes([0.05, 0.5, 0.2, 0.15])  # Adjust as needed for placement and size of the table
    table_ax.axis('tight')
    table_ax.axis('off')
    # Prepare the data for the table (statistics are computed on the full,
    # unfiltered dataframe)
    table_data = [
        ['Mean', df['liquidity_amount'].mean()],
        ['Median', df['liquidity_amount'].median()],
        ['Std Dev', df['liquidity_amount'].std()],
        ['Total Count', df['liquidity_amount'].count()]
    ]
    # Create the table and plot it
    table = table_ax.table(
        cellText=table_data,
        colLabels=['Metric', 'Value'],
        cellLoc='center',
        loc='center'
    )
    table.auto_set_font_size(False)
    table.set_fontsize(8)
    table.scale(1, 1.5)
    # Save the plot to a file
    plt.savefig(output_file)
    plt.close()
    print(f"Plot saved to {output_file}")
# Function to process a range or list of heights
def process_heights(heights_arg):
    if '-' in heights_arg:  # It's a range
        start, end = map(int, heights_arg.split('-'))
        return list(range(start, end + 1))
    elif ',' in heights_arg:  # It's a list
        return list(map(int, heights_arg.split(',')))
    elif heights_arg:  # It's a single height
        return [int(heights_arg)]
    else:
        return []  # No heights specified
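# For reference, some example inputs and outputs:
#   process_heights("1000-1003")  -> [1000, 1001, 1002, 1003]
#   process_heights("1000,1005")  -> [1000, 1005]
#   process_heights("1000")       -> [1000]
#   process_heights("")           -> []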
# Function to compute a hash for the data; sort_keys=True makes the hash
# independent of JSON key order, so identical content always hashes the same
def compute_hash(data):
    data_string = json.dumps(data, sort_keys=True)
    data_hash = hashlib.sha256(data_string.encode('utf-8')).hexdigest()
    return data_hash
# Dictionary to store the last hash for a given pool_id (reloaded from disk in the main block)
last_hashes = {}

# Functions to load and save the last hashes for persistence across script runs
def save_hashes(hashes, filename='last_hashes.pkl'):
    with open(filename, 'wb') as f:
        pickle.dump(hashes, f)

def load_hashes(filename='last_hashes.pkl'):
    try:
        with open(filename, 'rb') as f:
            return pickle.load(f)
    except FileNotFoundError:
        return {}
# Main Execution Block
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate a 3D scatter plot and/or CSV from pool data.")
    parser.add_argument('--pool_id', type=str, help='Pool ID.')
    parser.add_argument('--heights', type=str, default='', help='Range of heights (e.g., 1000-1200) or specific heights (e.g., 1000,1005,1249).')
    parser.add_argument('--csv', choices=['yes', 'no', 'exclusively'], help='Output a CSV file in addition to, or instead of, the plot.')
    parser.add_argument('--dot_size', type=int, default=30, help='Size of the dots in the plot, ranging from 1 to 100. Default is 30.')
    parser.add_argument('--url', type=str, default=default_url, help='Node REST URL.')
    args = parser.parse_args()

    # Prompt for pool_id if not provided as an argument
    if not args.pool_id:
        args.pool_id = input("Enter the Pool ID: ")

    # Prompt for the csv option if not provided as an argument
    if not args.csv:
        csv_choice = input("Do you want to create a CSV file? (yes, no, exclusively): ").strip().lower()
        if csv_choice not in ['yes', 'no', 'exclusively']:
            print("Invalid choice. Defaulting to 'yes'.")
            csv_choice = 'yes'
        args.csv = csv_choice

    # Load last hashes, or initialize if not found
    last_hashes = load_hashes()

    # Ensure the data directory exists
    output_path = './data'
    if not os.path.exists(output_path):
        os.makedirs(output_path)

    # Process a range of heights or specific heights; None triggers the interactive prompt below
    heights_to_check = process_heights(args.heights) or [None]

    # Iterate over each specified block height
    for block_height in heights_to_check:
        if block_height is None:  # Interactive mode if no height is provided
            block_height = input("Enter the Block Height (optional, press Enter to skip): ")
            block_height = int(block_height) if block_height else None
        try:
            print(f"Fetching data for pool ID {args.pool_id} at block height {block_height}...")  # Debugging print
            data = read_data_from_api(args.url, args.pool_id, block_height)
            print(f"Data fetched: {data}")  # Debugging print
            data_hash = compute_hash(data)
            # Skip this height if the data is identical to the last run
            if args.pool_id in last_hashes and last_hashes[args.pool_id] == data_hash:
                print(f"No changes in data for pool ID {args.pool_id} at block height {block_height}.")
                continue
            # The data is new or has changed: process it and update the hash
            last_hashes[args.pool_id] = data_hash
            df = preprocess_data(data)
            print(f"Data preprocessed: {df}")  # Debugging print
            # Determine file names and paths
            timestamp = time.strftime('%Y%m%d-%H%M%S')
            output_file_base = f"pool_{args.pool_id}_height_{block_height}_{timestamp}"
            if args.csv in ['yes', 'exclusively']:
                export_to_csv(df, output_path, output_file_base)
            if args.csv != 'exclusively':
                output_file_path = os.path.join(output_path, f"{output_file_base}.png")
                plot_3d_liquidity(df, output_file_path, args.dot_size, block_height)
        except Exception as e:
            print(f"An error occurred at block height {block_height}: {e}")
            continue  # Continue to the next height

    # Save updated hashes after all operations
    save_hashes(last_hashes)
    print("Hashes updated and saved.")  # Debugging print