-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopb.py
150 lines (140 loc) · 4.96 KB
/
opb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
""" Options Strangle Builder """
from sqlalchemy import create_engine, Table, MetaData, and_
from sqlalchemy.sql import select
from datetime import timedelta, datetime
import pandas as pd
import numpy as np
class opb(object):
""" Options strangel builder class"""
def __init__(self):
self.db = create_engine("sqlite:///data1.db")
self.meta_data = MetaData(self.db)
self.table = Table("DATA", self.meta_data, autoload=True)
self.strategy = None
def push_to_db(self, data_in_dict):
"""
Pushes data in dict to sqlite db
-------------------------------------------------
Parameters
----------
data_in_dict -> (dict) instrument data
-------------------------------------------------
Returns
-------
None
"""
if data_in_dict is None:
raise Exception("No input data to push.")
df = pd.DataFrame.from_dict(data_in_dict, orient="index")
df = df.T
df.to_sql("DATA", self.db, if_exists="append")
def get_strikes_latest_data(self, strike, opt):
"""
Get the strikes latest record available in the db
-------------------------------------------------
Parameters
----------
strike -> (int) strike price
opt -> (str) option type - CE or PE
-------------------------------------------------
Returns
-------
return -> (pandas data frame) having one row of data
"""
dts = self.table
stm = select(["*"]).where(and_(dts.c.strike == strike, dts.c.type == opt))
sdf = pd.read_sql_query(stm, con=self.db, parse_dates=["exchange_timestamp"])
sdf = sdf.sort_values("exchange_timestamp")
if len(sdf) > 0:
return sdf.tail(1)
else:
raise Exception("No data available for given inputs.")
def get_strikes_data_at(self, strike, opt, time):
"""
Get the strikes record at a given time
-------------------------------------------------
Parameters
----------
strike -> (int) strike price
opt -> (str) option type - CE or PE
time -> (int) time in int format
-------------------------------------------------
Returns
-------
return -> (pandas data frame) having one row of data
"""
dts = self.table
stm = select(["*"]).where(
and_(
dts.c.strike == strike,
dts.c.type == opt,
dts.c.exchange_timestamp >= time,
)
)
sdf = pd.read_sql_query(stm, con=self.db, parse_dates=["exchange_timestamp"])
sdf = sdf.sort_values("exchange_timestamp")
if len(sdf) > 0:
return sdf.head(1)
else:
raise Exception("No data available for given inputs.")
def create_strangle_position(self, cs, ps):
"""
Creates strangle position
-------------------------------------------------
Parameters
----------
cs -> (int) Call strike
ps -> (int) Put strike
"""
csdf = self.get_strikes_latest_data(cs, "CE")
psdf = self.get_strikes_latest_data(ps, "PE")
posdf = self.build_strangle_position(csdf, psdf)
return posdf
def create_strangle_position_at_time(self, cs, ps, at):
"""
Creates strangle position
-------------------------------------------------
Parameters
----------
cs -> (int) Call strike
ps -> (int) Put strike
at -> (int) Time
"""
csdf = self.get_strikes_data_at(cs, "CE", at)
psdf = self.get_strikes_data_at(ps, "PE", at)
posdf = self.build_strangle_position(csdf, psdf)
return posdf
def build_strangle_position(self, csdf, psdf):
"""
Builds strangle position from given data frames
-------------------------------------------------
Parameters
----------
csdf -> (pandas.DataFrame) Call DataFrame
psdf -> (pandas.DataFrame) Put DataFrame
-------------------------------------------------
Returns
-------
return -> (pandas data frame) having one row of data
"""
csd = csdf[["strike", "last_traded_price", "exchange_timestamp"]]
csd = csd.rename(
columns={
"strike": "CALL_STRIKE",
"last_traded_price": "CALL_LTP",
"exchange_timestamp": "CALL_ET",
}
)
psd = psdf[["strike", "last_traded_price", "exchange_timestamp"]]
psd = psd.rename(
columns={
"strike": "PUT_STRIKE",
"last_traded_price": "PUT_LTP",
"exchange_timestamp": "PUT_ET",
}
)
posdf = csd.join(psd)
posdf = posdf.assign(PNL=(posdf.CALL_LTP + posdf.PUT_LTP))
self.strategy = posdf
posdf.to_json("strategy.json")
return posdf