Skip to content

Commit

Permalink
add spread_typefilter for UDS. added global message hanlder on_me…
Browse files Browse the repository at this point in the history
…ssage`
  • Loading branch information
mvberg committed Sep 20, 2023
1 parent ed2f3f4 commit a12a2ca
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ of_client.on_error = lambda x: print("Error:", x)
of_client.on_connected = lambda x: print("Connected")
of_client.on_disconnected = lambda x: print("Disconnected")

# attach a global message handler
of_client.on_message = lambda x: print("Global Message:", x)

# sub to markets by symbol
def on_message(msg):
print("Market Data: ", msg)
Expand Down
14 changes: 10 additions & 4 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
args = parser.parse_args()

# new client with credentials
of_client = openfeed.OpenfeedClient(args.u, args.p)
of_client = openfeed.OpenfeedClient(
args.u, args.p, debug=False, server="demo.openfeed.barchart.com")

# app state handlers
of_client.on_error = lambda x: print("OnError:", x)
Expand All @@ -18,13 +19,18 @@
of_client.on_login = lambda x: print("OnLogin:", x)
of_client.on_logout = lambda x: print("OnLogout:", x)

# sub to markets by symbol
# add a global message handler for all incoming OF messages
# of_client.on_message = lambda msg: print("Global Message:", msg)

# sub message handler
def on_message(msg):
print("Market Data Message: ", msg)

#of_client.add_symbol_subscription("ADF.WS", callback=on_message, subscription_type=["OHLC"])
# subscribe to UDS an filter by type (optional)
of_client.add_symbol_subscription(
"T8^UDS", callback=on_message, subscription_type=["QUOTE"], spread_type_filter=["RR", "JR"])

of_client.request_instruments_for_exchange("AMEX", callback=on_message)
# of_client.request_instruments_for_exchange("AMEX", callback=on_message)

# list exchanges

Expand Down
80 changes: 53 additions & 27 deletions openfeed/openfeed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, username, password, server="openfeed.aws.barchart.com", debug
self.on_error = None
self.on_login = None
self.on_logout = None
self.on_message = None

websocket.enableTrace(self.debug)

Expand Down Expand Up @@ -75,7 +76,7 @@ def add_heartbeat_subscription(self, callback):

return self

def add_symbol_subscription(self, symbol: Union[str, list], callback, service="REAL_TIME", subscription_type=["QUOTE"], snapshot_interval_seconds=60):
def add_symbol_subscription(self, symbol: Union[str, list], callback, service="REAL_TIME", subscription_type=["QUOTE"], snapshot_interval_seconds=60, spread_type_filter=[]):
"""Subscribe to [Market Data] by Barchart Symbols
Complete list of [SubscriptionTypes]. List of [Service] types. List of [InstrumentTypes].
Expand All @@ -90,7 +91,8 @@ def add_symbol_subscription(self, symbol: Union[str, list], callback, service="R
Default is ['QUOTE']. Can contain any of: 'ALL', 'QUOTE', 'QUOTE_PARTICIPANT', 'DEPTH_PRICE', 'DEPTH_ORDER', 'TRADES', 'OHLC'
[Market Data]: https://docs.barchart.com/openfeed/#/proto?id=marketupdate
[SubscriptionTypes]: https://docs.barchart.com/openfeed/#/proto?id=subscriptiontype
#/proto?id=subscriptiontype
[SubscriptionTypes]: https://docs.barchart.com/openfeed/
[Service]: https://docs.barchart.com/openfeed/#/proto?id=service
"""
symbols = []
Expand All @@ -105,11 +107,11 @@ def add_symbol_subscription(self, symbol: Union[str, list], callback, service="R
self.symbol_handlers[sym] = []

self.symbol_handlers[sym].append(
Listener(symbol=sym, callback=callback, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds))
Listener(symbol=sym, callback=callback, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds, spread_types=spread_type_filter))

if self.token is not None:
self._send_message(
self.__create_subscription_request(symbols=symbols, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds))
self.__create_subscription_request(symbols=symbols, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds, spread_types=spread_type_filter))

return self

Expand All @@ -136,10 +138,13 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic
List of [BulkSubscriptionFilter]
[Market Data]: https://docs.barchart.com/openfeed/#/proto?id=marketupdate
[SubscriptionTypes]: https://docs.barchart.com/openfeed/#/proto?id=subscriptiontype
#/proto?id=subscriptiontype
[SubscriptionTypes]: https://docs.barchart.com/openfeed/
[ServiceType]: https://docs.barchart.com/openfeed/#/proto?id=service
[InstrumentTypes]: https://docs.barchart.com/openfeed/#/proto?id=instrumentdefinitioninstrumenttype
[BulkSubscriptionFilter]: https://docs.barchart.com/openfeed/#/proto?id=bulksubscriptionfilter
#/proto?id=instrumentdefinitioninstrumenttype
[InstrumentTypes]: https://docs.barchart.com/openfeed/
#/proto?id=bulksubscriptionfilter
[BulkSubscriptionFilter]: https://docs.barchart.com/openfeed/
"""
exchanges = []

Expand All @@ -156,15 +161,16 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic
exchange=exch, callback=callback, service=service, subscription_type=subscription_type, instrument_type=instrument_type, snapshot_interval_seconds=snapshot_interval_seconds, bulk_subscription_filters=bulk_subscription_filters))

if self.token is not None:
self._send_message(
self._send_message(
self.__create_subscription_request(exchanges=exchanges, service=service, subscription_type=subscription_type, instrument_type=instrument_type, snapshot_interval_seconds=snapshot_interval_seconds, bulk_subscription_filters=bulk_subscription_filters))

return self

def request_available_exchanges(self, callback):
"""Request a list of available [Exchanges] for subscription.
[Exchanges]: https://docs.barchart.com/openfeed/#/openfeed_streaming?id=requesting-exchanges
#/openfeed_streaming?id=requesting-exchanges
[Exchanges]: https://docs.barchart.com/openfeed/
"""
rid = random.getrandbits(32)
req = self.__create_exchange_request(rid)
Expand All @@ -179,7 +185,8 @@ def request_available_exchanges(self, callback):
def request_instruments_for_exchange(self, exchange, callback):
"""Request a list of [Instrument Definitions] actively trading trading on an exchange.
[Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto
#/proto?id=openfeed_instrumentproto
[Instrument Definitions]: https://docs.barchart.com/openfeed/
"""

rid = random.getrandbits(32)
Expand All @@ -193,12 +200,14 @@ def request_instruments_for_exchange(self, exchange, callback):
return self

def request_instruments(self, callback, symbol=None, market_id=None, exchange=None):
"""Request [Instrument Definitions] by `symbol`, `market_id`, or `exchange`
"""Request [Instrument Definitions] by `symbol`, `market_id`, or `exchange`
See [Instrument Request]
[Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto
[Instrument Request]: https://docs.barchart.com/openfeed/#/proto?id=instrumentrequest
#/proto?id=openfeed_instrumentproto
[Instrument Definitions]: https://docs.barchart.com/openfeed/
#/proto?id=instrumentrequest
[Instrument Request]: https://docs.barchart.com/openfeed/
"""

rid = random.getrandbits(32)
Expand All @@ -215,22 +224,26 @@ def request_instruments(self, callback, symbol=None, market_id=None, exchange=No
def get_instrument_definitions(self):
"""Returns a dict of Openfeed [Instrument Definitions] keyed by MarketID
[Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto
#/proto?id=openfeed_instrumentproto
[Instrument Definitions]: https://docs.barchart.com/openfeed/
"""
return self.instrument_definitions

def get_instrument_definition(self, id):
def get_instrument_definition(self, _id):
"""Returns an [Instrument Definition] for a Market ID
[Instrument Definition]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto
#/proto?id=openfeed_instrumentproto
[Instrument Definition]: https://docs.barchart.com/openfeed/
"""
return self.instrument_definitions[id].instrumentDefinition
return self.instrument_definitions[_id].instrumentDefinition

def get_instrument_definition_by_symbol(self, symbol):
"""Returns an [Instrument Definition] for a [Symbol] string
[Instrument Definition]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto
[Symbol]: https://docs.barchart.com/openfeed/#/proto?id=instrumentdefinitionsymbol
#/proto?id=openfeed_instrumentproto
[Instrument Definition]: https://docs.barchart.com/openfeed/
#/proto?id=instrumentdefinitionsymbol
[Symbol]: https://docs.barchart.com/openfeed/
"""
return self.instruments_by_symbol[symbol].instrumentDefinition

Expand Down Expand Up @@ -390,15 +403,19 @@ def on_message(ws: websocket.WebSocketApp, message):
msg = openfeed_api_pb2.OpenfeedGatewayMessage()
msg.ParseFromString(byte_buffer.read(msg_len))

msg_type = msg.WhichOneof("data")

if self.debug:
msg_count = msg_count+1
print("msg len:", msg_len, "number of messages:", msg_count)

msg_type = msg.WhichOneof("data")
print("msg len:", msg_len, "type:",
msg_type, "total messages:", msg_count)

handler = handlers.get(
msg_type, lambda x: print("Unhandled Message:", x))

# global on_message callback
self.__callback(self.on_message, msg)

try:
handler(msg)
except Exception as e:
Expand Down Expand Up @@ -443,6 +460,9 @@ def __notify_symbol_listeners(self, instrument, msg):
if instrument.marketId in self.subscription_symbol_by_marketid:
symbol_key = self.subscription_symbol_by_marketid[instrument.marketId]

if symbol_key == "":
symbol_key = instrument.subscriptionSymbol

for s in instrument.symbols:
if s.symbol not in self.symbol_handlers or s.vendor != "Barchart":
continue
Expand All @@ -456,6 +476,9 @@ def __notify_symbol_listeners(self, instrument, msg):
if self.debug:
print("Failed to notify `symbol` callback:", symbol_key, e)
self.__callback(self.on_error, e)
else:
if self.debug:
print("Failed to find symbol key:", instrument)

def __notify_exchange_listeners(self, exchange, msg):
if exchange not in self.exchange_handlers:
Expand Down Expand Up @@ -492,7 +515,7 @@ def __send_existing_interest(self):
listeners_by_service = interest[l.service]
if l.key() not in listeners_by_service:
listeners_by_service[l.key()] = Listener(
symbol=l.symbol, exchange=l.exchange, service=l.service, subscription_type=l.subscription_type, instrument_type=l.instrument_type, snapshot_interval_seconds=l.snapshot_interval_seconds, bulk_subscription_filters=l.bulk_subscription_filters)
symbol=l.symbol, exchange=l.exchange, service=l.service, subscription_type=l.subscription_type, instrument_type=l.instrument_type, snapshot_interval_seconds=l.snapshot_interval_seconds, bulk_subscription_filters=l.bulk_subscription_filters, spread_types=l.spread_types)
else:
existing = listeners_by_service[l.key()]
existing.subscription_type = list(set(
Expand All @@ -508,13 +531,14 @@ def __send_existing_interest(self):
subscription_type=i.subscription_type,
instrument_type=i.get_instrument_types(),
snapshot_interval_seconds=i.snapshot_interval_seconds,
bulk_subscription_filters=i.bulk_subscription_filters))
bulk_subscription_filters=i.bulk_subscription_filters,
spread_types=i.spread_types))

# send other rpc requests
for req in self.request_id_handlers.values():
req.send(self)

def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[]):
def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[], spread_types=[]):
requests = []

if len(exchanges) > 0:
Expand All @@ -536,7 +560,8 @@ def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_
symbol=sym,
subscriptionType=[openfeed_api_pb2.SubscriptionType.Value(
t) for t in subscription_type],
snapshotIntervalSeconds=snapshot_interval_seconds
snapshotIntervalSeconds=snapshot_interval_seconds,
spreadTypeFilter=spread_types
))

of_req = openfeed_api_pb2.OpenfeedGatewayRequest(
Expand Down Expand Up @@ -611,7 +636,7 @@ def __callback(self, callback, *args):


class Listener(object):
def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[]):
def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[], spread_types=[]):
self.symbol = symbol
self.exchange = exchange
self.callback = callback
Expand All @@ -620,6 +645,7 @@ def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", s
self.instrument_type = instrument_type
self.snapshot_interval_seconds = snapshot_interval_seconds
self.bulk_subscription_filters = bulk_subscription_filters
self.spread_types = spread_types

def key(self):
if len(self.exchange) > 0:
Expand Down
2 changes: 1 addition & 1 deletion openfeed/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '1.3.0'
VERSION = '1.4.0'
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name='openfeed',
version='1.4.0',
author='Barchart',
author_email='mike@barchart.com',
author_email='openfeed@barchart.com',
license='MIT',
url='https://github.com/openfeed-org/sdk-python',
include_package_data=True,
Expand Down

0 comments on commit a12a2ca

Please sign in to comment.