diff --git a/README.md b/README.md index 4a9b3a9..43f6526 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ #### (Work in Progress!) A port of apollographql subscriptions for python, using gevent websockets and redis -This is a implementation of apollographql [subscriptions-transport-ws](https://github.com/apollographql/subscriptions-transport-ws) and [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) in Python. It currently implements a pubsub using [redis-py](https://github.com/andymccurdy/redis-py) and uses [gevent-websockets](https://bitbucket.org/noppo/gevent-websocket) for concurrency. It also makes heavy use of [syrusakbary/promise](https://github.com/syrusakbary/promise) python implementation to mirror the logic in the apollo-graphql libraries. +This is an implementation of graphql subscriptions in Python. It uses the apollographql [subscriptions-transport-ws](https://github.com/apollographql/subscriptions-transport-ws) and [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) packages as its basis. It currently implements a pubsub using [redis-py](https://github.com/andymccurdy/redis-py) and uses [gevent-websockets](https://bitbucket.org/noppo/gevent-websocket) for concurrency. It also makes heavy use of [syrusakbary/promise](https://github.com/syrusakbary/promise) python implementation to mirror the logic in the apollo-graphql libraries. -Meant to be used in conjunction with [graphql-python](https://github.com/graphql-python) / [graphene](http://graphene-python.org/) server and [apollo-client](http://dev.apollodata.com/) for graphql. The api is below, but if you want more information, consult the apollo graphql libraries referenced above. +Meant to be used in conjunction with [graphql-python](https://github.com/graphql-python) / [graphene](http://graphene-python.org/) server and [apollo-client](http://dev.apollodata.com/) for graphql. The api is below, but if you want more information, consult the apollo graphql libraries referenced above, and specifcally as it relates to using their graphql subscriptions client. -Initial implementation. Currently only works with Python 2. +Initial implementation. Good test coverage. Currently only works with Python 2. ## Installation ``` @@ -48,10 +48,10 @@ $ pip install graphql-subscriptions #### Methods - `publish(trigger_name, payload)`: Trigger name is the subscription or pubsub channel; payload is the mutation object or message that will end up being passed to the subscription root_value; method called inside of mutation resolve function -- `subscribe(query, operation_name, callback, variables, context, format_error, format_response)`: Called by ApolloSubscriptionServer upon receiving a new subscription from a websocket. Arguments are parsed by ApolloSubscriptionServer from the graphql subscription query -- `unsubscribe(sub_id)`: Sub_id is the subscription ID that is being tracked by the subscription manager instance -- returned from the `subscribe()` method and called by the ApolloSubscriptionServer +- `subscribe(query, operation_name, callback, variables, context, format_error, format_response)`: Called by SubscriptionServer upon receiving a new subscription from a websocket. Arguments are parsed by SubscriptionServer from the graphql subscription query +- `unsubscribe(sub_id)`: Sub_id is the subscription ID that is being tracked by the subscription manager instance -- returned from the `subscribe()` method and called by the SubscriptionServer -### ApolloSubscriptionServer(subscription_manager, websocket, keep_alive=None, on_subscribe=None, on_unsubscribe=None, on_connect=None, on_disconnect=None) +### SubscriptionServer(subscription_manager, websocket, keep_alive=None, on_subscribe=None, on_unsubscribe=None, on_connect=None, on_disconnect=None) #### Arguments - `subscription_manager`: A subscripton manager instance (required). - `websocket`: The websocket object passed in from your route handler (required). @@ -78,7 +78,7 @@ from flask_sockets import Sockets from graphql_subscriptions import ( SubscriptionManager, RedisPubsub, - ApolloSubscriptionServer + SubscriptionServer ) app = Flask(__name__) @@ -106,7 +106,7 @@ subscription_mgr = SubscriptionManager(schema, pubsub) # subscription app / server -- passing in subscription manager and websocket @sockets.route('/socket') def socket_channel(websocket): - subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket) + subscription_server = SubscriptionServer(subscription_mgr, websocket) subscription_server.handle() return [] diff --git a/graphql_subscriptions/__init__.py b/graphql_subscriptions/__init__.py index 727c760..2b8d152 100644 --- a/graphql_subscriptions/__init__.py +++ b/graphql_subscriptions/__init__.py @@ -1,4 +1,5 @@ from subscription_manager import RedisPubsub, SubscriptionManager -from subscription_transport_ws import ApolloSubscriptionServer +from subscription_transport_ws import SubscriptionServer + +__all__ = ['RedisPubsub', 'SubscriptionManager', 'SubscriptionServer'] -__all__ = ['RedisPubsub', 'SubscriptionManager', 'ApolloSubscriptionServer'] diff --git a/graphql_subscriptions/subscription_transport_ws.py b/graphql_subscriptions/subscription_transport_ws.py index bc30b09..b88a73b 100644 --- a/graphql_subscriptions/subscription_transport_ws.py +++ b/graphql_subscriptions/subscription_transport_ws.py @@ -15,7 +15,7 @@ GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions' -class ApolloSubscriptionServer(WebSocketApplication): +class SubscriptionServer(WebSocketApplication): def __init__(self, subscription_manager, websocket, @@ -37,7 +37,7 @@ def __init__(self, self.connection_subscriptions = {} self.connection_context = {} - super(ApolloSubscriptionServer, self).__init__(websocket) + super(SubscriptionServer, self).__init__(websocket) def timer(self, callback, period): while True: @@ -77,13 +77,11 @@ def on_message(self, msg): if msg is None: return - class nonlocal: - on_init_resolve = None - on_init_reject = None + non_local = {'on_init_resolve': None, 'on_init_reject': None} def init_promise_handler(resolve, reject): - nonlocal.on_init_resolve = resolve - nonlocal.on_init_reject = reject + non_local['on_init_resolve'] = resolve + non_local['on_init_reject'] = reject self.connection_context['init_promise'] = Promise(init_promise_handler) @@ -107,7 +105,7 @@ def on_message_return_handler(message): self.on_connect( parsed_message.get('payload'), self.ws)) - nonlocal.on_init_resolve(on_connect_promise) + non_local['on_init_resolve'](on_connect_promise) def init_success_promise_handler(result): if not result: @@ -133,7 +131,8 @@ def subscription_start_promise_handler(init_result): 'callback': None, 'variables': parsed_message.get('variables'), 'context': init_result if isinstance( - init_result, dict) else {}, + init_result, dict) else + parsed_message.get('context', {}), 'format_error': None, 'format_response': None } @@ -151,8 +150,7 @@ def subscription_start_promise_handler(init_result): def promised_params_handler(params): if not isinstance(params, dict): error = 'Invalid params returned from\ - OnSubscribe! Return value must\ - be an dict' +OnSubscribe! Return value must be an dict' self.send_subscription_fail( sub_id, {'errors': [{ @@ -164,15 +162,15 @@ def params_callback(error, result): if not error: self.send_subscription_data( sub_id, {'data': result.data}) - elif error.errors: - self.send_subscription_data( - sub_id, {'errors': error.errors}) elif error.message: self.send_subscription_data( sub_id, {'errors': [{ 'message': error.message }]}) + elif error.errors: + self.send_subscription_data( + sub_id, {'errors': error.errors}) else: self.send_subscription_data( sub_id, @@ -218,7 +216,7 @@ def error_catch_handler(e): # not sure if this behavior is correct or # not per promises A spec...need to # investigate - nonlocal.on_init_resolve(Promise.resolve(True)) + non_local['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_start_promise_handler) @@ -231,7 +229,7 @@ def subscription_end_promise_handler(result): del self.connection_subscriptions[sub_id] # same rationale as above - nonlocal.on_init_resolve(Promise.resolve(True)) + non_local['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_end_promise_handler) diff --git a/setup.py b/setup.py index c570c22..1caef1b 100644 --- a/setup.py +++ b/setup.py @@ -6,9 +6,14 @@ except (IOError, ImportError): long_description = open('README.md').read() +tests_dep = [ + 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', + 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess', 'requests' +] + setup( name='graphql-subscriptions', - version='0.1.7', + version='0.1.8', author='Heath Ballard', author_email='heath.ballard@gmail.com', description=('A port of apollo-graphql subscriptions for python, using\ @@ -26,6 +31,12 @@ 'Programming Language :: Python :: 2.7', 'License :: OSI Approved :: MIT License' ], - install_requires=['gevent-websocket', 'redis', 'promise', 'graphql-core'], - tests_require=['pytest', 'pytest-mock', 'fakeredis', 'graphene'], + install_requires=[ + 'gevent-websocket', 'redis', 'graphql-core', 'promise<=1.0.1' + ], + test_suite='pytest', + tests_require=tests_dep, + extras_require={ + 'test': tests_dep + }, include_package_data=True) diff --git a/tests/package.json b/tests/package.json new file mode 100644 index 0000000..6f1e3da --- /dev/null +++ b/tests/package.json @@ -0,0 +1,6 @@ +{ + "dependencies": { + "graphql": "^0.9.6", + "subscriptions-transport-ws": "0.5.4" + } +} diff --git a/tests/test_subscription_manager.py b/tests/test_subscription_manager.py index 414787b..0d86b3e 100644 --- a/tests/test_subscription_manager.py +++ b/tests/test_subscription_manager.py @@ -13,12 +13,8 @@ @pytest.fixture -def mock_redis(monkeypatch): +def pubsub(monkeypatch): monkeypatch.setattr(redis, 'StrictRedis', fakeredis.FakeStrictRedis) - - -@pytest.fixture -def pubsub(mock_redis): return RedisPubsub() @@ -266,8 +262,7 @@ def publish_and_unsubscribe_handler(sub_id): def test_can_subscribe_to_more_than_one_trigger(sub_mgr): - class nonlocal: - trigger_count = 0 + non_local = {'trigger_count': 0} query = 'subscription multiTrigger($filterBoolean: Boolean,\ $uga: String){testFilterMulti(filterBoolean: $filterBoolean,\ @@ -282,10 +277,10 @@ def callback(err, payload): assert True else: assert payload.data.get('testFilterMulti') == 'good_filter' - nonlocal.trigger_count += 1 + non_local['trigger_count'] += 1 except AssertionError as e: sys.exit(e) - if nonlocal.trigger_count == 2: + if non_local['trigger_count'] == 2: sub_mgr.pubsub.greenlet.kill() def publish_and_unsubscribe_handler(sub_id): diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index e69de29..13d9361 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -0,0 +1,1252 @@ +# Many, if not most of these tests rely on using a graphql subscriptions +# client. "apollographql/subscriptions-transport-ws" is used here for testing +# the graphql subscriptions server implementation. In order to run these tests, +# "cd" to the "tests" directory and "npm install". Make sure you have nodejs +# installed in your $PATH. + +from functools import wraps +import copy +import json +import os +import sys +import threading +import time + +from flask import Flask, request, jsonify +from flask_graphql import GraphQLView +from flask_sockets import Sockets +from geventwebsocket import WebSocketServer +from promise import Promise +import fakeredis +import graphene +import multiprocess +import pytest +import redis +import requests + +from graphql_subscriptions import (RedisPubsub, SubscriptionManager, + SubscriptionServer) + +from graphql_subscriptions.subscription_transport_ws import (SUBSCRIPTION_FAIL, + SUBSCRIPTION_DATA) + +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + +if sys.version_info[0] < 3: + import Queue +else: + import queue as Queue + +TEST_PORT = 5000 + + +class PickableMock(): + def __init__(self, return_value=None, side_effect=None, name=None): + self._return_value = return_value + self._side_effect = side_effect + self.name = name + self.called = False + self.call_count = 0 + self.call_args = set() + + def __call__(mock_self, *args, **kwargs): + mock_self.called = True + mock_self.call_count += 1 + call_args = {repr(arg) for arg in args} + call_kwargs = {repr(item) for item in kwargs} + mock_self.call_args = call_args | call_kwargs | mock_self.call_args + + if mock_self._side_effect and mock_self._return_value: + mock_self._side_effect(mock_self, *args, **kwargs) + return mock_self._return_value + elif mock_self._side_effect: + return mock_self._side_effect(mock_self, *args, **kwargs) + elif mock_self._return_value: + return mock_self._return_value + + def assert_called_once(self): + assert self.call_count == 1 + + def assert_called_with(self, *args, **kwargs): + call_args = {repr(json.loads(json.dumps(arg))) for arg in args} + call_kwargs = {repr(json.loads(json.dumps(item))) for item in kwargs} + all_call_args = call_args | call_kwargs + assert all_call_args.issubset(self.call_args) + + def assert_called_with_contains(self, arg_fragment): + assert any([arg_fragment in item for item in self.call_args]) + + +def promisify(f): + @wraps(f) + def wrapper(*args, **kwargs): + def executor(resolve, reject): + return resolve(f(*args, **kwargs)) + + return Promise(executor) + + return wrapper + + +def enqueue_output(out, queue): + with out: + for line in iter(out.readline, b''): + queue.put(line) + + +@pytest.fixture +def data(): + return { + '1': { + 'id': '1', + 'name': 'Dan' + }, + '2': { + 'id': '2', + 'name': 'Marie' + }, + '3': { + 'id': '3', + 'name': 'Jessie' + } + } + + +@pytest.fixture +def pubsub(monkeypatch): + monkeypatch.setattr(redis, 'StrictRedis', fakeredis.FakeStrictRedis) + return RedisPubsub() + + +@pytest.fixture +def schema(data): + class UserType(graphene.ObjectType): + id = graphene.String() + name = graphene.String() + + class Query(graphene.ObjectType): + test_string = graphene.String() + + class Subscription(graphene.ObjectType): + user = graphene.Field(UserType, id=graphene.String()) + user_filtered = graphene.Field(UserType, id=graphene.String()) + context = graphene.String() + error = graphene.String() + + def resolve_user(self, args, context, info): + id = args['id'] + name = data[args['id']]['name'] + return UserType(id=id, name=name) + + def resolve_user_filtered(self, args, context, info): + id = args['id'] + name = data[args['id']]['name'] + return UserType(id=id, name=name) + + def resolve_context(self, args, context, info): + return context + + def resolve_error(self, args, context, info): + raise Exception('E1') + + return graphene.Schema(query=Query, subscription=Subscription) + + +@pytest.fixture +def sub_mgr(pubsub, schema): + def user_filtered(**kwargs): + args = kwargs.get('args') + return { + 'user_filtered': { + 'filter': lambda root, ctx: root.get('id') == args.get('id') + } + } + + setup_funcs = {'user_filtered': user_filtered} + + return SubscriptionManager(schema, pubsub, setup_funcs) + + +@pytest.fixture +def on_sub_handler(): + def context_handler(): + raise Exception('bad') + + def on_subscribe(msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': context_handler}) + return new_params + + return {'on_subscribe': promisify(on_subscribe)} + + +@pytest.fixture +def on_sub_mock(mocker): + + mgr = multiprocess.Manager() + q = mgr.Queue() + + def on_subscribe(self, msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': msg.get('context', {})}) + q.put(self) + return new_params + + on_sub_mock = { + 'on_subscribe': + PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe') + } + + return on_sub_mock, q + + +@pytest.fixture +def options_mocks(mocker): + + mgr = multiprocess.Manager() + q = mgr.Queue() + + def on_subscribe(self, msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': msg.get('context', {})}) + q.put(self) + return new_params + + def on_connect(self, message, websocket): + q.put(self) + + def on_disconnect(self, websocket): + q.put(self) + + def on_unsubscribe(self, websocket): + q.put(self) + + options_mocks = { + 'on_subscribe': + PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe'), + 'on_unsubscribe': + PickableMock(side_effect=on_unsubscribe, name='on_unsubscribe'), + 'on_connect': + PickableMock( + return_value={'test': 'test_context'}, + side_effect=on_connect, + name='on_connect'), + 'on_disconnect': + PickableMock(side_effect=on_disconnect, name='on_disconnect') + } + + return options_mocks, q + + +def create_app(sub_mgr, schema, options): + app = Flask(__name__) + sockets = Sockets(app) + + app.app_protocol = lambda environ_path_info: 'graphql-subscriptions' + + app.add_url_rule( + '/graphql', + view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True)) + + @app.route('/publish', methods=['POST']) + def sub_mgr_publish(): + sub_mgr.publish(*request.get_json()) + return jsonify(request.get_json()) + + @sockets.route('/socket') + def socket_channel(websocket): + subscription_server = SubscriptionServer(sub_mgr, websocket, **options) + subscription_server.handle() + return [] + + return app + + +def app_worker(app, port): + server = WebSocketServer(('', port), app) + server.serve_forever() + + +@pytest.fixture() +def server(sub_mgr, schema, on_sub_mock): + + options, q = on_sub_mock + app = create_app(sub_mgr, schema, options) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) + process.start() + yield q + process.terminate() + + +@pytest.fixture() +def server_with_mocks(sub_mgr, schema, options_mocks): + + options, q = options_mocks + app = create_app(sub_mgr, schema, options) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) + + process.start() + yield q + process.terminate() + + +@pytest.fixture() +def server_with_on_sub_handler(sub_mgr, schema, on_sub_handler): + + app = create_app(sub_mgr, schema, on_sub_handler) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) + process.start() + yield + process.terminate() + + +@pytest.fixture() +def server_with_keep_alive(sub_mgr, schema): + + app = create_app(sub_mgr, schema, {'keep_alive': .250}) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) + process.start() + yield + process.terminate() + + +def test_raise_exception_when_create_server_and_no_sub_mgr(): + with pytest.raises(AssertionError): + SubscriptionServer(None, None) + + +def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + new SubscriptionClient('ws://localhost:{1}/socket') + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + mock = server_with_mocks.get_nowait() + assert mock.name == 'on_connect' + mock.assert_called_once() + + +def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const connectionParams = {{test: true}} + new SubscriptionClient('ws://localhost:{1}/socket', {{ + connectionParams, + }}) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + mock = server_with_mocks.get_nowait() + assert mock.name == 'on_connect' + mock.assert_called_once() + mock.assert_called_with({'test': True}) + + +def test_trigger_on_disconnect_when_client_disconnects(server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.client.close() + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + subprocess.check_output(['node', '-e', node_script]) + mock = server_with_mocks.get_nowait() + assert mock.name == 'on_disconnect' + mock.assert_called_once() + + +def test_should_call_unsubscribe_when_client_closes_cxn(server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }} + ) + setTimeout(() => {{ + client.client.close() + }}, 500) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1) + except: + while True: + mock = server_with_mocks.get_nowait() + if mock.name == 'on_unsubscribe': + mock.assert_called_once() + break + + +def test_should_trigger_on_subscribe_when_client_subscribes(server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }}) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + mock = server_with_mocks.get_nowait() + if mock.name == 'on_subscribe': + mock.assert_called_once() + break + + +def test_should_trigger_on_unsubscribe_when_client_unsubscribes( + server_with_mocks): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const subId = client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }}) + client.unsubscribe(subId) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + mock = server_with_mocks.get_nowait() + if mock.name == 'on_unsubscribe': + mock.assert_called_once() + break + + +def test_should_send_correct_results_to_multiple_client_subscriptions(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const client1 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults = 0; + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults++; + console.log(JSON.stringify({{ + client: {{ + result: result, + numResults: numResults + }} + }})); + }} else {{ + // pass + }} + }} + ); + const client2 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults1 = 0; + client2.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 2, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults1++; + console.log(JSON.stringify({{ + client2: {{ + result: result, + numResults: numResults1 + }} + }})); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['user', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['user'] + assert client['result']['user']['id'] == '3' + assert client['result']['user']['name'] == 'Jessie' + assert client['numResults'] == 1 + client2 = ret_values['client2'] + assert client2['result']['user'] + assert client2['result']['user']['id'] == '2' + assert client2['result']['user']['name'] == 'Marie' + assert client2['numResults'] == 1 + + +# TODO: Graphene subscriptions implementation does not currently return an +# error for missing or incorrect field(s); this test will continue to fail +# until that is fixed +def test_send_subscription_fail_message_to_client_with_invalid_query(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + setTimeout(function () {{ + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + birthday + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + }} + ); + }}, 100); + client.client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['type'] == SUBSCRIPTION_FAIL + assert len(ret_values['payload']['errors']) > 0 + + +# TODO: troubleshoot this a bit...passes, but receives extra messages which I'm +# filtering out w/ the "AttributeError" exception clause +def test_should_setup_the_proper_filters_when_subscribing(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const client2 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults = 0; + client.subscribe({{ + query: `subscription useInfoFilter1($id: String) {{ + userFiltered(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfoFilter1', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults += 1; + console.log(JSON.stringify({{ + client: {{ + result: result, + numResults: numResults + }} + }})); + }} else {{ + // pass + }} + }} + ); + client2.subscribe({{ + query: `subscription useInfoFilter1($id: String) {{ + userFiltered(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfoFilter1', + variables: {{ + id: 1, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults += 1; + console.log(JSON.stringify({{ + client2: {{ + result: result, + numResults: numResults + }} + }})); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', { + 'id': 1 + }]) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', { + 'id': 2 + }]) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', { + 'id': 3 + }]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except AttributeError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['userFiltered'] + assert client['result']['userFiltered']['id'] == '3' + assert client['result']['userFiltered']['name'] == 'Jessie' + assert client['numResults'] == 2 + client2 = ret_values['client2'] + assert client2['result']['userFiltered'] + assert client2['result']['userFiltered']['id'] == '1' + assert client2['result']['userFiltered']['name'] == 'Dan' + assert client2['numResults'] == 1 + + +def test_correctly_sets_the_context_in_on_subscribe(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const CTX = 'testContext'; + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + context: CTX, + }}, (error, result) => {{ + client.unsubscribeAll(); + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + console.log(JSON.stringify({{ + client: {{ + result: result, + }} + }})); + }} else {{ + // pass + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['context', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['context'] + assert client['result']['context'] == 'testContext' + + +def test_passes_through_websocket_request_to_on_subscribe(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + }}, (error, result) => {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + mock = server.get_nowait() + if mock.name == 'on_subscribe': + mock.assert_called_once() + mock.assert_called_with_contains('websocket') + break + + +def test_does_not_send_subscription_data_after_client_unsubscribes(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + setTimeout(function () {{ + let subId = client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + }} + ); + client.unsubscribe(subId); + }}, 100); + client.client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['user', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + with pytest.raises(KeyError): + assert ret_values[SUBSCRIPTION_DATA] + + +# TODO: Need to look into why this test is throwing code 1006, not 1002 like +# it should be (1006 more general than 1002 protocol error) +def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const client = new WebSocket('ws://localhost:{1}/socket') + client.on('close', (code) => {{ + console.log(JSON.stringify(code)) + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = [] + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values.append(line) + except ValueError: + pass + except Queue.Empty: + break + assert ret_values[0] == 1002 or 1006 + + +def test_rejects_unparsable_message(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + client.close(); + }}; + client.onopen = () => {{ + client.send('HI'); + }} + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['subscription_fail'] + assert len(ret_values['subscription_fail']['payload']['errors']) > 0 + + +def test_rejects_nonsense_message(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + client.close(); + }}; + client.onopen = () => {{ + client.send(JSON.stringify({{}})); + }} + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['subscription_fail'] + assert len(ret_values['subscription_fail']['payload']['errors']) > 0 + + +def test_does_not_crash_on_unsub_from_unknown_sub(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + setTimeout(function () {{ + client.onopen = () => {{ + const SUBSCRIPTION_END = 'subscription_end'; + let subEndMsg = {{type: SUBSCRIPTION_END, id: 'toString'}} + client.send(JSON.stringify(subEndMsg)); + }} + }}, 200); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = [] + while True: + try: + line = q.get_nowait() + ret_values.append(line) + except ValueError: + pass + except Queue.Empty: + break + assert len(ret_values) == 0 + + +def test_sends_back_any_type_of_error(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `invalid useInfo {{ + error + }}`, + variables: {{}}, + }}, function (errors, result) {{ + client.unsubscribeAll(); + if (errors) {{ + console.log(JSON.stringify({{'errors': errors}})) + }} + if (result) {{ + console.log(JSON.stringify({{'result': result}})) + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(5) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert len(ret_values['errors']) > 0 + + +def test_handles_errors_prior_to_graphql_execution(server_with_on_sub_handler): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + context: {{}}, + }}, function (errors, result) {{ + client.unsubscribeAll(); + if (errors) {{ + console.log(JSON.stringify({{'errors': errors}})) + }} + if (result) {{ + console.log(JSON.stringify({{'result': result}})) + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['context', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert isinstance(ret_values['errors'], list) + assert ret_values['errors'][0]['message'] == 'bad' + + +def test_sends_a_keep_alive_signal_in_the_socket(server_with_keep_alive): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const KEEP_ALIVE = 'keepalive'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + let yieldCount = 0; + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + if (msg.type === KEEP_ALIVE) {{ + yieldCount += 1; + if (yieldCount > 1) {{ + let returnMsg = {{'type': msg.type, 'yieldCount': yieldCount}} + console.log(JSON.stringify(returnMsg)) + client.close(); + }} + }} + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.5) + while True: + try: + line = q.get_nowait() + ret_value = json.loads(line) + except ValueError: + pass + except Queue.Empty: + break + assert ret_value['type'] == 'keepalive' + assert ret_value['yieldCount'] > 1