diff --git a/README.md b/README.md index 2cb0b26..5249082 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,9 @@ of_client.on_error = lambda x: print("Error:", x) of_client.on_connected = lambda x: print("Connected") of_client.on_disconnected = lambda x: print("Disconnected") +# attach a global message handler +of_client.on_message = lambda x: print("Global Message:", x) + # sub to markets by symbol def on_message(msg): print("Market Data: ", msg) diff --git a/example.py b/example.py index 464e285..75ce37e 100644 --- a/example.py +++ b/example.py @@ -9,7 +9,8 @@ args = parser.parse_args() # new client with credentials - of_client = openfeed.OpenfeedClient(args.u, args.p) + of_client = openfeed.OpenfeedClient( + args.u, args.p, debug=False, server="demo.openfeed.barchart.com") # app state handlers of_client.on_error = lambda x: print("OnError:", x) @@ -18,13 +19,18 @@ of_client.on_login = lambda x: print("OnLogin:", x) of_client.on_logout = lambda x: print("OnLogout:", x) - # sub to markets by symbol + # add a global message handler for all incoming OF messages + # of_client.on_message = lambda msg: print("Global Message:", msg) + + # sub message handler def on_message(msg): print("Market Data Message: ", msg) - #of_client.add_symbol_subscription("ADF.WS", callback=on_message, subscription_type=["OHLC"]) + # subscribe to UDS an filter by type (optional) + of_client.add_symbol_subscription( + "T8^UDS", callback=on_message, subscription_type=["QUOTE"], spread_type_filter=["RR", "JR"]) - of_client.request_instruments_for_exchange("AMEX", callback=on_message) + # of_client.request_instruments_for_exchange("AMEX", callback=on_message) # list exchanges diff --git a/openfeed/openfeed_client.py b/openfeed/openfeed_client.py index 9b4f4c1..32bba99 100644 --- a/openfeed/openfeed_client.py +++ b/openfeed/openfeed_client.py @@ -44,6 +44,7 @@ def __init__(self, username, password, server="openfeed.aws.barchart.com", debug self.on_error = None self.on_login = None self.on_logout = None + self.on_message = None websocket.enableTrace(self.debug) @@ -75,7 +76,7 @@ def add_heartbeat_subscription(self, callback): return self - def add_symbol_subscription(self, symbol: Union[str, list], callback, service="REAL_TIME", subscription_type=["QUOTE"], snapshot_interval_seconds=60): + def add_symbol_subscription(self, symbol: Union[str, list], callback, service="REAL_TIME", subscription_type=["QUOTE"], snapshot_interval_seconds=60, spread_type_filter=[]): """Subscribe to [Market Data] by Barchart Symbols Complete list of [SubscriptionTypes]. List of [Service] types. List of [InstrumentTypes]. @@ -90,7 +91,8 @@ def add_symbol_subscription(self, symbol: Union[str, list], callback, service="R Default is ['QUOTE']. Can contain any of: 'ALL', 'QUOTE', 'QUOTE_PARTICIPANT', 'DEPTH_PRICE', 'DEPTH_ORDER', 'TRADES', 'OHLC' [Market Data]: https://docs.barchart.com/openfeed/#/proto?id=marketupdate - [SubscriptionTypes]: https://docs.barchart.com/openfeed/#/proto?id=subscriptiontype + #/proto?id=subscriptiontype + [SubscriptionTypes]: https://docs.barchart.com/openfeed/ [Service]: https://docs.barchart.com/openfeed/#/proto?id=service """ symbols = [] @@ -105,11 +107,11 @@ def add_symbol_subscription(self, symbol: Union[str, list], callback, service="R self.symbol_handlers[sym] = [] self.symbol_handlers[sym].append( - Listener(symbol=sym, callback=callback, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds)) + Listener(symbol=sym, callback=callback, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds, spread_types=spread_type_filter)) if self.token is not None: self._send_message( - self.__create_subscription_request(symbols=symbols, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds)) + self.__create_subscription_request(symbols=symbols, service=service, subscription_type=subscription_type, snapshot_interval_seconds=snapshot_interval_seconds, spread_types=spread_type_filter)) return self @@ -136,10 +138,13 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic List of [BulkSubscriptionFilter] [Market Data]: https://docs.barchart.com/openfeed/#/proto?id=marketupdate - [SubscriptionTypes]: https://docs.barchart.com/openfeed/#/proto?id=subscriptiontype + #/proto?id=subscriptiontype + [SubscriptionTypes]: https://docs.barchart.com/openfeed/ [ServiceType]: https://docs.barchart.com/openfeed/#/proto?id=service - [InstrumentTypes]: https://docs.barchart.com/openfeed/#/proto?id=instrumentdefinitioninstrumenttype - [BulkSubscriptionFilter]: https://docs.barchart.com/openfeed/#/proto?id=bulksubscriptionfilter + #/proto?id=instrumentdefinitioninstrumenttype + [InstrumentTypes]: https://docs.barchart.com/openfeed/ + #/proto?id=bulksubscriptionfilter + [BulkSubscriptionFilter]: https://docs.barchart.com/openfeed/ """ exchanges = [] @@ -156,7 +161,7 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic exchange=exch, callback=callback, service=service, subscription_type=subscription_type, instrument_type=instrument_type, snapshot_interval_seconds=snapshot_interval_seconds, bulk_subscription_filters=bulk_subscription_filters)) if self.token is not None: - self._send_message( + self._send_message( self.__create_subscription_request(exchanges=exchanges, service=service, subscription_type=subscription_type, instrument_type=instrument_type, snapshot_interval_seconds=snapshot_interval_seconds, bulk_subscription_filters=bulk_subscription_filters)) return self @@ -164,7 +169,8 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic def request_available_exchanges(self, callback): """Request a list of available [Exchanges] for subscription. - [Exchanges]: https://docs.barchart.com/openfeed/#/openfeed_streaming?id=requesting-exchanges + #/openfeed_streaming?id=requesting-exchanges + [Exchanges]: https://docs.barchart.com/openfeed/ """ rid = random.getrandbits(32) req = self.__create_exchange_request(rid) @@ -179,7 +185,8 @@ def request_available_exchanges(self, callback): def request_instruments_for_exchange(self, exchange, callback): """Request a list of [Instrument Definitions] actively trading trading on an exchange. - [Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto + #/proto?id=openfeed_instrumentproto + [Instrument Definitions]: https://docs.barchart.com/openfeed/ """ rid = random.getrandbits(32) @@ -193,12 +200,14 @@ def request_instruments_for_exchange(self, exchange, callback): return self def request_instruments(self, callback, symbol=None, market_id=None, exchange=None): - """Request [Instrument Definitions] by `symbol`, `market_id`, or `exchange` + """Request [Instrument Definitions] by `symbol`, `market_id`, or `exchange` See [Instrument Request] - [Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto - [Instrument Request]: https://docs.barchart.com/openfeed/#/proto?id=instrumentrequest + #/proto?id=openfeed_instrumentproto + [Instrument Definitions]: https://docs.barchart.com/openfeed/ + #/proto?id=instrumentrequest + [Instrument Request]: https://docs.barchart.com/openfeed/ """ rid = random.getrandbits(32) @@ -215,22 +224,26 @@ def request_instruments(self, callback, symbol=None, market_id=None, exchange=No def get_instrument_definitions(self): """Returns a dict of Openfeed [Instrument Definitions] keyed by MarketID - [Instrument Definitions]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto + #/proto?id=openfeed_instrumentproto + [Instrument Definitions]: https://docs.barchart.com/openfeed/ """ return self.instrument_definitions - def get_instrument_definition(self, id): + def get_instrument_definition(self, _id): """Returns an [Instrument Definition] for a Market ID - [Instrument Definition]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto + #/proto?id=openfeed_instrumentproto + [Instrument Definition]: https://docs.barchart.com/openfeed/ """ - return self.instrument_definitions[id].instrumentDefinition + return self.instrument_definitions[_id].instrumentDefinition def get_instrument_definition_by_symbol(self, symbol): """Returns an [Instrument Definition] for a [Symbol] string - [Instrument Definition]: https://docs.barchart.com/openfeed/#/proto?id=openfeed_instrumentproto - [Symbol]: https://docs.barchart.com/openfeed/#/proto?id=instrumentdefinitionsymbol + #/proto?id=openfeed_instrumentproto + [Instrument Definition]: https://docs.barchart.com/openfeed/ + #/proto?id=instrumentdefinitionsymbol + [Symbol]: https://docs.barchart.com/openfeed/ """ return self.instruments_by_symbol[symbol].instrumentDefinition @@ -390,15 +403,19 @@ def on_message(ws: websocket.WebSocketApp, message): msg = openfeed_api_pb2.OpenfeedGatewayMessage() msg.ParseFromString(byte_buffer.read(msg_len)) + msg_type = msg.WhichOneof("data") + if self.debug: msg_count = msg_count+1 - print("msg len:", msg_len, "number of messages:", msg_count) - - msg_type = msg.WhichOneof("data") + print("msg len:", msg_len, "type:", + msg_type, "total messages:", msg_count) handler = handlers.get( msg_type, lambda x: print("Unhandled Message:", x)) + # global on_message callback + self.__callback(self.on_message, msg) + try: handler(msg) except Exception as e: @@ -443,6 +460,9 @@ def __notify_symbol_listeners(self, instrument, msg): if instrument.marketId in self.subscription_symbol_by_marketid: symbol_key = self.subscription_symbol_by_marketid[instrument.marketId] + if symbol_key == "": + symbol_key = instrument.subscriptionSymbol + for s in instrument.symbols: if s.symbol not in self.symbol_handlers or s.vendor != "Barchart": continue @@ -456,6 +476,9 @@ def __notify_symbol_listeners(self, instrument, msg): if self.debug: print("Failed to notify `symbol` callback:", symbol_key, e) self.__callback(self.on_error, e) + else: + if self.debug: + print("Failed to find symbol key:", instrument) def __notify_exchange_listeners(self, exchange, msg): if exchange not in self.exchange_handlers: @@ -492,7 +515,7 @@ def __send_existing_interest(self): listeners_by_service = interest[l.service] if l.key() not in listeners_by_service: listeners_by_service[l.key()] = Listener( - symbol=l.symbol, exchange=l.exchange, service=l.service, subscription_type=l.subscription_type, instrument_type=l.instrument_type, snapshot_interval_seconds=l.snapshot_interval_seconds, bulk_subscription_filters=l.bulk_subscription_filters) + symbol=l.symbol, exchange=l.exchange, service=l.service, subscription_type=l.subscription_type, instrument_type=l.instrument_type, snapshot_interval_seconds=l.snapshot_interval_seconds, bulk_subscription_filters=l.bulk_subscription_filters, spread_types=l.spread_types) else: existing = listeners_by_service[l.key()] existing.subscription_type = list(set( @@ -508,13 +531,14 @@ def __send_existing_interest(self): subscription_type=i.subscription_type, instrument_type=i.get_instrument_types(), snapshot_interval_seconds=i.snapshot_interval_seconds, - bulk_subscription_filters=i.bulk_subscription_filters)) + bulk_subscription_filters=i.bulk_subscription_filters, + spread_types=i.spread_types)) # send other rpc requests for req in self.request_id_handlers.values(): req.send(self) - def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[]): + def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[], spread_types=[]): requests = [] if len(exchanges) > 0: @@ -536,7 +560,8 @@ def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_ symbol=sym, subscriptionType=[openfeed_api_pb2.SubscriptionType.Value( t) for t in subscription_type], - snapshotIntervalSeconds=snapshot_interval_seconds + snapshotIntervalSeconds=snapshot_interval_seconds, + spreadTypeFilter=spread_types )) of_req = openfeed_api_pb2.OpenfeedGatewayRequest( @@ -611,7 +636,7 @@ def __callback(self, callback, *args): class Listener(object): - def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[]): + def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", subscription_type=["QUOTE"], instrument_type=[], snapshot_interval_seconds=60, bulk_subscription_filters=[], spread_types=[]): self.symbol = symbol self.exchange = exchange self.callback = callback @@ -620,6 +645,7 @@ def __init__(self, symbol="", exchange="", callback=None, service="REAL_TIME", s self.instrument_type = instrument_type self.snapshot_interval_seconds = snapshot_interval_seconds self.bulk_subscription_filters = bulk_subscription_filters + self.spread_types = spread_types def key(self): if len(self.exchange) > 0: diff --git a/openfeed/version.py b/openfeed/version.py index 1f9e5f9..d00fee1 100644 --- a/openfeed/version.py +++ b/openfeed/version.py @@ -1 +1 @@ -VERSION = '1.3.0' +VERSION = '1.4.0' diff --git a/setup.py b/setup.py index d790725..0543a05 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ name='openfeed', version='1.4.0', author='Barchart', - author_email='mike@barchart.com', + author_email='openfeed@barchart.com', license='MIT', url='https://github.com/openfeed-org/sdk-python', include_package_data=True,