Skip to content

Commit

Permalink
add binance spot
Browse files Browse the repository at this point in the history
  • Loading branch information
uv-orbs committed Jul 24, 2022
1 parent 093d575 commit 6e067f1
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 94 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.venv/
__pycache__/
.env
24 changes: 24 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "Python: main",
"type": "python",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}
3 changes: 2 additions & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
worker: python3 main.py
deribit: python3 drbt.py
binance: python3 bnnc.py
42 changes: 42 additions & 0 deletions bnnc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

from influxdb_client.client.write_api import SYNCHRONOUS
from binance import AsyncClient, BinanceSocketManager
import os
import asyncio
from influx import get_influx_client, get_influx_write_api

api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_API_SEC')


def send_data_point(price):
p = get_influx_client().db_client.Point("spot").tag(
"version", "dev1.1").tag("exchange", "binance").tag("pair", "BTCUSDT")

p.field("price", price)
res = get_influx_write_api().write(
bucket='deribit-1', org='Orbs', record=p)


async def main():
last_price = 1
client = await AsyncClient.create()
bm = BinanceSocketManager(client)
# start any sockets here, i.e a trade socket
ts = bm.trade_socket('BTCUSDT')
# then start receiving messages
async with ts as tscm:
while True:
res = await tscm.recv()
price = float(res['p'])
if abs(price / last_price - 1) > .0001:
print(price)
last_price = price
send_data_point(price)

await client.close_connection()


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
93 changes: 93 additions & 0 deletions drbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import asyncio
import websockets
import json
import os
from collections.abc import MutableMapping
from datetime import datetime, timedelta
from influx import get_influx_client, get_influx_write_api


# get tomorrow
dt = datetime.now() + timedelta(days=1)
#tom = dt.strftime("%d%b%y").upper()
expiry = os.getenv('EXPIRY')

# 21JUL22
# get strike
strike = int(os.getenv('STRIKE'))
#################################################################
chans = [
f'ticker.BTC-{expiry}-{strike}-C.100ms',
f'ticker.BTC-{expiry}-{strike}-P.100ms',
]
msgSub = \
{
"jsonrpc": "2.0",
"id": 3600,
"method": "public/subscribe",
"params": {
"channels": chans
}
}


def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.') -> MutableMapping:
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)


def create_data_point(data):
p = get_influx_client().Point("options").tag("version", "dev1.1")
flat = flatten_dict(data)
for f in flat:
if isinstance(flat[f], str):
p = p.tag(f, flat[f])
elif type(flat[f]) == int or type(flat[f]) == float:
p = p.field(f, flat[f])

return p


def write_data(j):
params = j['params']
if params is None:
return
channel = params['channel']
if channel is None or channel not in chans:
return
data = params['data']
if data is None:
return

p = create_data_point(data)
res = get_influx_write_api().write(
bucket='deribit-1', org='Orbs', record=p)

if(res):
print(res)

print(data['instrument_name'], data['underlying_price'])


async def call_api(msg):
async with websockets.connect('wss://www.deribit.com/ws/api/v2') as websocket:
await websocket.send(msg)
while websocket.open:
response = await websocket.recv()
# do something with the response...
# print(response)
j = json.loads(response)
try:
write_data(j)
except Exception as e:
print('Exception:', e)


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msgSub)))
22 changes: 22 additions & 0 deletions influx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS


#################################################################
# influx
influx_bucket = 'deribit-1'
client = influxdb_client.InfluxDBClient(
url=os.getenv('INFLUX_URL'),
token=os.getenv('INFLUX_TOKEN'),
org=os.getenv('INFLUX_ORG')
)
write_api = client.write_api(write_options=SYNCHRONOUS)


def get_influx_client():
return client


def get_influx_write_api():
return write_api
110 changes: 17 additions & 93 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,27 @@
import asyncio
import websockets
import json
import influxdb_client
import os
from influxdb_client.client.write_api import SYNCHRONOUS
from collections.abc import MutableMapping
from dotenv import load_dotenv
from datetime import datetime, timedelta
load_dotenv()
from bnnc import start_binance
from drbt import start_deribit
from concurrent.futures import ProcessPoolExecutor

#################################################################
# influx
influx_bucket = 'deribit-1'
client = influxdb_client.InfluxDBClient(
url=os.getenv('INFLUX_URL'),
token=os.getenv('INFLUX_TOKEN'),
org=os.getenv('INFLUX_ORG')
)
write_api = client.write_api(write_options=SYNCHRONOUS)
# async def printi(s):
# print(s)

# get tomorrow
dt = datetime.now() + timedelta(days=1)
#tom = dt.strftime("%d%b%y").upper()
expiry = os.getenv('EXPIRY')

# 21JUL22
# get strike
strike = int(os.getenv('STRIKE'))
#################################################################
chans = [
f'ticker.BTC-{expiry}-{strike}-C.100ms',
f'ticker.BTC-{expiry}-{strike}-P.100ms',
]
msgSub = \
{
"jsonrpc": "2.0",
"id": 3600,
"method": "public/subscribe",
"params": {
"channels": chans
}
}
async def main():
load_dotenv()
#executor = ProcessPoolExecutor(2)

await start_binance(asyncio)
# a = loop.run_in_executor(executor, start_binance)
# b = loop.run_in_executor(executor, start_deribit)

def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.') -> MutableMapping:
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
# asyncio.get_event_loop().run_until_complete(start_all())


def create_data_point(data):
p = influxdb_client.Point("options").tag("version", "dev1.1")
flat = flatten_dict(data)
for f in flat:
if isinstance(flat[f], str):
p = p.tag(f, flat[f])
elif type(flat[f]) == int or type(flat[f]) == float:
p = p.field(f, flat[f])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
main()
loop.run_forever()

return p


def write_data(j):
params = j['params']
if params is None:
return
channel = params['channel']
if channel is None or channel not in chans:
return
data = params['data']
if data is None:
return

p = create_data_point(data)
res = write_api.write(
bucket='deribit-1', org='Orbs', record=p)

if(res):
print(res)

print(data['instrument_name'], data['underlying_price'])


async def call_api(msg):
async with websockets.connect('wss://www.deribit.com/ws/api/v2') as websocket:
await websocket.send(msg)
while websocket.open:
response = await websocket.recv()
# do something with the response...
# print(response)
j = json.loads(response)
try:
write_data(j)
except Exception as e:
print('Exception:', e)

asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msgSub)))
print('DONE!!!')

0 comments on commit 6e067f1

Please sign in to comment.