Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

Commit e114823

Browse files
committed
- Performance improvement for handling streaming requests.
1 parent 6017872 commit e114823

File tree

1 file changed

+19
-51
lines changed

1 file changed

+19
-51
lines changed

thetadata/client.py

+19-51
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ class StreamMsg:
191191
"""Stream Msg"""
192192
def __init__(self):
193193
self.type = StreamMsgType.ERROR
194+
self.req_response = None
195+
self.req_response_id = None
194196
self.trade = Trade()
195197
self.quote = Quote()
196198
self.open_interest = OpenInterest()
@@ -297,7 +299,7 @@ def connect_stream(self, callback):
297299
def close_stream(self):
298300
self._stream_server.close()
299301

300-
def req_full_trade_stream_opt(self, timeout: int = 5):
302+
def req_full_trade_stream_opt(self) -> int:
301303
"""from_bytes
302304
"""
303305
assert self._stream_server is not None, _NOT_CONNECTED_MSG
@@ -308,22 +310,11 @@ def req_full_trade_stream_opt(self, timeout: int = 5):
308310
self._stream_req_id += 1
309311

310312
# send request
311-
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}" \
312-
f"&req={OptionReqType.TRADE.value}&id={req_id}\n"
313+
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}&id={req_id}\n"
313314
self._stream_server.sendall(hist_msg.encode("utf-8"))
315+
return req_id
314316

315-
tries = 0
316-
lim = timeout * 100
317-
while self._stream_responses[req_id] is None: # This is kind of dumb.
318-
sleep(.01)
319-
tries += 1
320-
if tries >= lim:
321-
return StreamResponseType.TIMED_OUT
322-
323-
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
324-
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
325-
326-
def req_full_open_interest_stream(self, timeout: int = 5):
317+
def req_full_open_interest_stream(self) -> id:
327318
"""from_bytes
328319
"""
329320
assert self._stream_server is not None, _NOT_CONNECTED_MSG
@@ -334,22 +325,11 @@ def req_full_open_interest_stream(self, timeout: int = 5):
334325
self._stream_req_id += 1
335326

336327
# send request
337-
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}" \
338-
f"&req={OptionReqType.OPEN_INTEREST.value}&id={req_id}\n"
328+
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&sec={SecType.OPTION.value}&req={OptionReqType.OPEN_INTEREST.value}&id={req_id}\n"
339329
self._stream_server.sendall(hist_msg.encode("utf-8"))
330+
return req_id
340331

341-
tries = 0
342-
lim = timeout * 100
343-
while self._stream_responses[req_id] is None: # This is kind of dumb.
344-
sleep(.01)
345-
tries += 1
346-
if tries >= lim:
347-
return StreamResponseType.TIMED_OUT
348-
349-
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
350-
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
351-
352-
def req_trade_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C', timeout: int = 5):
332+
def req_trade_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C') -> int:
353333
"""from_bytes
354334
"""
355335
assert self._stream_server is not None, _NOT_CONNECTED_MSG
@@ -363,22 +343,11 @@ def req_trade_stream_opt(self, root: str, exp: date = 0, strike: float = 0, righ
363343
self._stream_req_id += 1
364344

365345
# send request
366-
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}" \
367-
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}&id={req_id}\n"
346+
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.TRADE.value}&id={req_id}\n"
368347
self._stream_server.sendall(hist_msg.encode("utf-8"))
348+
return req_id
369349

370-
tries = 0
371-
lim = timeout * 100
372-
while self._stream_responses[req_id] is None: # This is kind of dumb.
373-
sleep(.01)
374-
tries += 1
375-
if tries >= lim:
376-
return StreamResponseType.TIMED_OUT
377-
378-
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
379-
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
380-
381-
def req_quote_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C', timeout: int = 5):
350+
def req_quote_stream_opt(self, root: str, exp: date = 0, strike: float = 0, right: OptionRight = 'C') -> int:
382351
"""from_bytes
383352
"""
384353
assert self._stream_server is not None, _NOT_CONNECTED_MSG
@@ -392,10 +361,11 @@ def req_quote_stream_opt(self, root: str, exp: date = 0, strike: float = 0, righ
392361
self._stream_req_id += 1
393362

394363
# send request
395-
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}" \
396-
f"&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.QUOTE.value}&id={req_id}\n"
364+
hist_msg = f"MSG_CODE={MessageType.STREAM_REQ.value}&root={root}&exp={exp_fmt}&strike={strike}&right={right.value}&sec={SecType.OPTION.value}&req={OptionReqType.QUOTE.value}&id={req_id}\n"
397365
self._stream_server.sendall(hist_msg.encode("utf-8"))
366+
return req_id
398367

368+
def verify(self, req_id: int, timeout: int = 5) -> StreamResponseType:
399369
tries = 0
400370
lim = timeout * 100
401371
while self._stream_responses[req_id] is None: # This is kind of dumb.
@@ -404,8 +374,7 @@ def req_quote_stream_opt(self, root: str, exp: date = 0, strike: float = 0, righ
404374
if tries >= lim:
405375
return StreamResponseType.TIMED_OUT
406376

407-
if self._stream_responses[req_id] is not StreamResponseType.SUBSCRIBED:
408-
raise PermissionError("Invalid permissions for stream request: " + self._stream_responses[req_id].name)
377+
return self._stream_responses[req_id]
409378

410379
def _recv_stream(self):
411380
"""from_bytes
@@ -430,10 +399,9 @@ def _recv_stream(self):
430399
data = self._read_stream(n_bytes=8)
431400
msg.open_interest.from_bytes(data)
432401
elif msg.type == StreamMsgType.REQ_RESPONSE:
433-
msg_id = parse_int(self._read_stream(4))
434-
msg_rep = StreamResponseType.from_code(parse_int(self._read_stream(4)))
435-
self._stream_responses[msg_id] = msg_rep
436-
continue
402+
msg.req_response_id = parse_int(self._read_stream(4))
403+
msg.req_response = StreamResponseType.from_code(parse_int(self._read_stream(4)))
404+
self._stream_responses[msg.req_response_id] = msg.req_response
437405
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
438406
msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date()
439407
else:

0 commit comments

Comments
 (0)