Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

Commit 6017872

Browse files
committed
- Added START & END stream messages, which get sent at the end / beginning of each day.
- Added error handling if a stream request fails. - Various Bug Fixes
1 parent 00df25e commit 6017872

File tree

3 files changed

+123
-19
lines changed

3 files changed

+123
-19
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "thetadata"
7-
version = "0.7.7"
7+
version = "0.7.8"
88
authors = [
99
{ name="Bailey Danseglio", email="[email protected]" },
1010
{ name="Adler Weber", email="[email protected]" },

thetadata/client.py

+93-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Module that contains Theta Client class."""
2+
import datetime
23
import struct
4+
import threading
35
from decimal import Decimal
46
from threading import Thread
57
from time import sleep
@@ -21,7 +23,7 @@
2123
from .terminal import check_download, launch_terminal
2224

2325
_NOT_CONNECTED_MSG = "You must establish a connection first."
24-
_VERSION = '0.7.7'
26+
_VERSION = '0.7.8'
2527

2628

2729
def _format_strike(strike: float) -> int:
@@ -193,6 +195,7 @@ def __init__(self):
193195
self.quote = Quote()
194196
self.open_interest = OpenInterest()
195197
self.contract = Contract()
198+
self.date = None
196199

197200

198201
class ThetaClient:
@@ -223,7 +226,11 @@ def __init__(self, port: int = 11000, timeout: Optional[float] = 60, launch: boo
223226
self._stream_server: Optional[socket.socket] = None # None while disconnected
224227
self.launch = launch
225228
self._stream_impl = None
229+
self._stream_responses = {}
230+
self._counter_lock = threading.Lock()
231+
self._stream_req_id = 0
226232

233+
print('If you require API support, feel free to join our discord server! https://discord.thetadata.us')
227234
if launch:
228235
if username == "default" or passwd == "default":
229236
print('------------------------------------------------------------------------------------------------')
@@ -290,56 +297,123 @@ def connect_stream(self, callback):
290297
def close_stream(self):
291298
self._stream_server.close()
292299

293-
def req_full_trade_stream_opt(self):
300+
def req_full_trade_stream_opt(self, timeout: int = 5):
294301
"""from_bytes
295302
"""
296303
assert self._stream_server is not None, _NOT_CONNECTED_MSG
297304

305+
with self._counter_lock:
306+
req_id = self._stream_req_id
307+
self._stream_responses[req_id] = None
308+
self._stream_req_id += 1
309+
298310
# send request
299-
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}\n"
311+
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}" \
312+
f"&req={OptionReqType.TRADE.value}&id={req_id}\n"
300313
self._stream_server.sendall(hist_msg.encode("utf-8"))
301314

302-
def req_full_open_interest_stream(self):
315+
tries = 0
316+
lim = timeout * 100
317+
while self._stream_responses[req_id] is None: # This is kind of dumb.
318+
sleep(.01)
319+
tries += 1
320+
if tries >= lim:
321+
return StreamResponseType.TIMED_OUT
322+
323+
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
324+
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
325+
326+
def req_full_open_interest_stream(self, timeout: int = 5):
303327
"""from_bytes
304328
"""
305329
assert self._stream_server is not None, _NOT_CONNECTED_MSG
306330

331+
with self._counter_lock:
332+
req_id = self._stream_req_id
333+
self._stream_responses[req_id] = None
334+
self._stream_req_id += 1
335+
307336
# send request
308337
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}" \
309-
f"&req={OptionReqType.OPEN_INTEREST.value}\n"
338+
f"&req={OptionReqType.OPEN_INTEREST.value}&id={req_id}\n"
310339
self._stream_server.sendall(hist_msg.encode("utf-8"))
311340

312-
def req_trade_stream_opt(self, root: str, exp: date, strike: float, right: OptionRight):
341+
tries = 0
342+
lim = timeout * 100
343+
while self._stream_responses[req_id] is None: # This is kind of dumb.
344+
sleep(.01)
345+
tries += 1
346+
if tries >= lim:
347+
return StreamResponseType.TIMED_OUT
348+
349+
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
350+
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
351+
352+
def req_trade_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C', timeout: int = 5):
313353
"""from_bytes
314354
"""
315355
assert self._stream_server is not None, _NOT_CONNECTED_MSG
316356
# format data
317357
strike = _format_strike(strike)
318358
exp_fmt = _format_date(exp)
319359

360+
with self._counter_lock:
361+
req_id = self._stream_req_id
362+
self._stream_responses[req_id] = None
363+
self._stream_req_id += 1
364+
320365
# send request
321366
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}" \
322-
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}\n"
367+
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}&id={req_id}\n"
323368
self._stream_server.sendall(hist_msg.encode("utf-8"))
324369

325-
def req_quote_stream_opt(self, root: str, exp: date, strike: float, right: OptionRight):
370+
tries = 0
371+
lim = timeout * 100
372+
while self._stream_responses[req_id] is None: # This is kind of dumb.
373+
sleep(.01)
374+
tries += 1
375+
if tries >= lim:
376+
return StreamResponseType.TIMED_OUT
377+
378+
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
379+
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
380+
381+
def req_quote_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C', timeout: int = 5):
326382
"""from_bytes
327383
"""
328384
assert self._stream_server is not None, _NOT_CONNECTED_MSG
329385
# format data
330386
strike = _format_strike(strike)
331387
exp_fmt = _format_date(exp)
332388

389+
with self._counter_lock:
390+
req_id = self._stream_req_id
391+
self._stream_responses[req_id] = None
392+
self._stream_req_id += 1
393+
333394
# send request
334395
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}" \
335-
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.QUOTE.value}\n"
396+
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.QUOTE.value}&id={req_id}\n"
336397
self._stream_server.sendall(hist_msg.encode("utf-8"))
337398

399+
tries = 0
400+
lim = timeout * 100
401+
while self._stream_responses[req_id] is None: # This is kind of dumb.
402+
sleep(.01)
403+
tries += 1
404+
if tries >= lim:
405+
return StreamResponseType.TIMED_OUT
406+
407+
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
408+
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
409+
338410
def _recv_stream(self):
339411
"""from_bytes
340412
"""
341413
msg = StreamMsg()
414+
342415
parse_int = lambda d: int.from_bytes(d, "big")
416+
343417
while True:
344418
msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
345419
msg.contract.from_bytes(self._read_stream(parse_int(self._read_stream(1)[:1])))
@@ -351,11 +425,20 @@ def _recv_stream(self):
351425
msg.trade.from_bytes(data)
352426
elif msg.type == StreamMsgType.PING:
353427
self._read_stream(n_bytes=4)
428+
continue
354429
elif msg.type == StreamMsgType.OPEN_INTEREST:
355430
data = self._read_stream(n_bytes=8)
356431
msg.open_interest.from_bytes(data)
357-
else:
432+
elif msg.type == StreamMsgType.REQ_RESPONSE:
433+
msg_id = parse_int(self._read_stream(4))
434+
msg_rep = StreamResponseType.from_code(parse_int(self._read_stream(4)))
435+
self._stream_responses[msg_id] = msg_rep
358436
continue
437+
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
438+
msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date()
439+
else:
440+
raise ValueError('undefined msg type')
441+
359442
self._stream_impl(msg)
360443

361444
def _read_stream(self, n_bytes: int) -> bytearray:

thetadata/enums.py

+29-8
Original file line numberDiff line numberDiff line change
@@ -296,15 +296,13 @@ class StreamMsgType(enum.Enum):
296296
TRADE = 22
297297
OPEN_INTEREST = 23
298298

299-
STOP = 30
299+
# Tape commands
300+
START = 30
301+
RESTART = 31
302+
STOP = 32
300303

301-
# Experimental
302-
REQUEST_SERVER_LIST = 300
303-
REQUEST_OPTIMAL_SERVER = 301
304-
OPTIMAL_SERVER = 302
305-
PACKET = 303
306-
BAN_IP = 304
307-
POPULATION = 305
304+
# Misc
305+
REQ_RESPONSE = 40
308306

309307
@classmethod
310308
def from_code(cls, code: int) -> StreamMsgType:
@@ -668,3 +666,26 @@ def from_code(cls, code: int) -> QuoteCondition:
668666
if code == member.value:
669667
return member
670668
return QuoteCondition.UNDEFINED
669+
670+
671+
@enum.unique
672+
class StreamResponseType(enum.Enum):
673+
"""Codes used to ID types of requests/responses."""
674+
675+
# Internal client communication
676+
SUBSCRIBED = 0 # This doesn't guarantee you will get data back for the contract. It just means that if this contract exists, you will get data for it.
677+
TIMED_OUT = 1 # The request to stream something timed out.
678+
MAX_STREAMS_REACHED = 2 # Returned when you are streaming too many contracts.
679+
INVALID_PERMS = 3 # If you do not have permissions for the stream request.
680+
681+
@classmethod
682+
def from_code(cls, code: int) -> StreamResponseType:
683+
"""Create a MessageType by its associated code.
684+
685+
:raises EnumParseError: If the code does not match a MessageType
686+
"""
687+
for member in cls:
688+
if code == member.value:
689+
return member
690+
raise exceptions._EnumParseError(code, cls)
691+

0 commit comments

Comments
 (0)