From 55043a1145a68e01805ea5684ac28a891f5d15ce Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 30 Apr 2017 10:17:00 -0500 Subject: [PATCH 01/12] Complete initial subscription_transport tests See notes in tests/test_subscription_transport.py for how to run tests. This implementation is based on the apollographql/subscriptions-transport-ws library and uses their graphql subscriptions client for testing. Specifically, these tests are compatable with npm package released version subscriptions-transport-ws@0.5.4. Tests are about halfway complete for that release version. --- .../subscription_transport_ws.py | 14 +- setup.py | 11 +- tests/package.json | 6 + tests/test_subscription_manager.py | 6 +- tests/test_subscription_transport.py | 458 ++++++++++++++++++ 5 files changed, 479 insertions(+), 16 deletions(-) create mode 100644 tests/package.json diff --git a/graphql_subscriptions/subscription_transport_ws.py b/graphql_subscriptions/subscription_transport_ws.py index bc30b09..6bc2988 100644 --- a/graphql_subscriptions/subscription_transport_ws.py +++ b/graphql_subscriptions/subscription_transport_ws.py @@ -77,13 +77,11 @@ def on_message(self, msg): if msg is None: return - class nonlocal: - on_init_resolve = None - on_init_reject = None + nonlocal = {'on_init_resolve': None, 'on_init_reject': None} def init_promise_handler(resolve, reject): - nonlocal.on_init_resolve = resolve - nonlocal.on_init_reject = reject + nonlocal['on_init_resolve'] = resolve + nonlocal['on_init_reject'] = reject self.connection_context['init_promise'] = Promise(init_promise_handler) @@ -107,7 +105,7 @@ def on_message_return_handler(message): self.on_connect( parsed_message.get('payload'), self.ws)) - nonlocal.on_init_resolve(on_connect_promise) + nonlocal['on_init_resolve'](on_connect_promise) def init_success_promise_handler(result): if not result: @@ -218,7 +216,7 @@ def error_catch_handler(e): # not sure if this behavior is correct or # not per promises A spec...need to # investigate - nonlocal.on_init_resolve(Promise.resolve(True)) + nonlocal['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_start_promise_handler) @@ -231,7 +229,7 @@ def subscription_end_promise_handler(result): del self.connection_subscriptions[sub_id] # same rationale as above - nonlocal.on_init_resolve(Promise.resolve(True)) + nonlocal['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_end_promise_handler) diff --git a/setup.py b/setup.py index c570c22..aec2bfd 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='graphql-subscriptions', - version='0.1.7', + version='0.1.8', author='Heath Ballard', author_email='heath.ballard@gmail.com', description=('A port of apollo-graphql subscriptions for python, using\ @@ -26,6 +26,11 @@ 'Programming Language :: Python :: 2.7', 'License :: OSI Approved :: MIT License' ], - install_requires=['gevent-websocket', 'redis', 'promise', 'graphql-core'], - tests_require=['pytest', 'pytest-mock', 'fakeredis', 'graphene'], + install_requires=[ + 'gevent-websocket', 'redis', 'promise==1.0.1', 'graphql-core' + ], + tests_require=[ + 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', + 'flask', 'flask-graphql', 'flask-sockets' + ], include_package_data=True) diff --git a/tests/package.json b/tests/package.json new file mode 100644 index 0000000..6f1e3da --- /dev/null +++ b/tests/package.json @@ -0,0 +1,6 @@ +{ + "dependencies": { + "graphql": "^0.9.6", + "subscriptions-transport-ws": "0.5.4" + } +} diff --git a/tests/test_subscription_manager.py b/tests/test_subscription_manager.py index 414787b..ea75de5 100644 --- a/tests/test_subscription_manager.py +++ b/tests/test_subscription_manager.py @@ -13,12 +13,8 @@ @pytest.fixture -def mock_redis(monkeypatch): +def pubsub(monkeypatch): monkeypatch.setattr(redis, 'StrictRedis', fakeredis.FakeStrictRedis) - - -@pytest.fixture -def pubsub(mock_redis): return RedisPubsub() diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index e69de29..a453994 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -0,0 +1,458 @@ +# Many, if not most of these tests rely on using a graphql subscriptions +# client. "apollographql/subscriptions-transport-ws" is used here for testing +# the graphql subscriptions server implementation. In order to run these tests, +# "cd" to the "tests" directory and "npm install". Make sure you have nodejs +# installed in your $PATH. + +from functools import wraps +import copy +import json +import multiprocess +import os +import sys + +from flask import Flask +from flask_graphql import GraphQLView +from flask_sockets import Sockets +from geventwebsocket import WebSocketServer +from promise import Promise +import fakeredis +import graphene +import pytest +import redis + +from graphql_subscriptions import (RedisPubsub, SubscriptionManager, + ApolloSubscriptionServer) + +from graphql_subscriptions.subscription_transport_ws import ( + SUBSCRIPTION_START, SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, KEEPALIVE, + SUBSCRIPTION_END) + +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + +TEST_PORT = 5000 +KEEP_ALIVE_TEST_PORT = TEST_PORT + 1 +DELAYED_TEST_PORT = TEST_PORT + 2 +RAW_TEST_PORT = TEST_PORT + 4 +EVENTS_TEST_PORT = TEST_PORT + 5 + + +class PickableMock(): + def __init__(self, return_value=None, side_effect=None, name=None): + self._return_value = return_value + self._side_effect = side_effect + self.name = name + self.called = False + self.call_count = 0 + self.call_args = set() + + def __call__(mock_self, *args, **kwargs): + mock_self.called = True + mock_self.call_count += 1 + call_args = {repr(arg) for arg in args} + call_kwargs = {repr(item) for item in kwargs} + mock_self.call_args = call_args | call_kwargs | mock_self.call_args + + if mock_self._side_effect and mock_self._return_value: + mock_self._side_effect(mock_self, *args, **kwargs) + return mock_self._return_value + elif mock_self._side_effect: + return mock_self._side_effect(mock_self, *args, **kwargs) + elif mock_self._return_value: + return mock_self._return_value + + def assert_called_once(self): + assert self.call_count == 1 + + def assert_called_with(self, *args, **kwargs): + call_args = {repr(json.loads(json.dumps(arg))) for arg in args} + call_kwargs = {repr(json.loads(json.dumps(item))) for item in kwargs} + all_call_args = call_args | call_kwargs + assert all_call_args.issubset(self.call_args) + + +def promisify(f): + @wraps(f) + def wrapper(*args, **kwargs): + def executor(resolve, reject): + return resolve(f(*args, **kwargs)) + + return Promise(executor) + + return wrapper + + +@pytest.fixture +def data(): + return { + '1': { + 'id': '1', + 'name': 'Dan' + }, + '2': { + 'id': '2', + 'name': 'Marie' + }, + '3': { + 'id': '3', + 'name': 'Jessie' + } + } + + +@pytest.fixture +def pubsub(monkeypatch): + monkeypatch.setattr(redis, 'StrictRedis', fakeredis.FakeStrictRedis) + return RedisPubsub() + + +@pytest.fixture +def schema(data): + class UserType(graphene.ObjectType): + id = graphene.String() + name = graphene.String() + + class Query(graphene.ObjectType): + test_string = graphene.String() + + class Subscription(graphene.ObjectType): + user = graphene.Field(UserType, id=graphene.String()) + user_filtered = graphene.Field(UserType, id=graphene.String()) + context = graphene.String() + error = graphene.String() + + def resolve_user(self, args, context, info): + id = args['id'] + name = data[args['id']]['name'] + return UserType(id=id, name=name) + + def resolve_user_filtered(self, args, context, info): + id = args['id'] + name = data[args['id']]['name'] + return UserType(id=id, name=name) + + def resolve_context(self, args, context, info): + return context + + def resolve_error(self, args, context, info): + raise Exception('E1') + + return graphene.Schema(query=Query, subscription=Subscription) + + +@pytest.fixture +def sub_mgr(pubsub, schema): + def user_filtered_func(**kwargs): + args = kwargs.get('args') + return { + 'user_filtered': { + 'filter': lambda root, ctx: root.get('id') == args.get('id') + } + } + + setup_funcs = {'user_filtered': user_filtered_func} + + return SubscriptionManager(schema, pubsub, setup_funcs) + + +@pytest.fixture +def handlers(): + def copy_and_update_dict(msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': msg['context']}) + return new_params + + return {'on_subscribe': promisify(copy_and_update_dict)} + + +@pytest.fixture +def options(handlers): + return { + 'on_subscribe': + lambda msg, params, ws: handlers['on_subscribe'](msg, params, ws) + } + + +@pytest.fixture +def events_options(mocker): + + mgr = multiprocess.Manager() + q = mgr.Queue() + + def on_subscribe(self, msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': msg.get('context', {})}) + q.put(self) + return new_params + + def on_connect(self, message, websocket): + q.put(self) + + def on_disconnect(self, websocket): + q.put(self) + + def on_unsubscribe(self, websocket): + q.put(self) + + events_options = { + 'on_subscribe': + PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe'), + 'on_unsubscribe': + PickableMock(side_effect=on_unsubscribe, name='on_unsubscribe'), + 'on_connect': + PickableMock( + return_value={'test': 'test_context'}, + side_effect=on_connect, + name='on_connect'), + 'on_disconnect': + PickableMock(side_effect=on_disconnect, name='on_disconnect') + } + + return events_options, q + + +def create_app(sub_mgr, schema, options): + app = Flask(__name__) + sockets = Sockets(app) + + app.app_protocol = lambda environ_path_info: 'graphql-subscriptions' + + app.add_url_rule( + '/graphql', + view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True)) + + @sockets.route('/socket') + def socket_channel(websocket): + subscription_server = ApolloSubscriptionServer(sub_mgr, websocket, + **options) + subscription_server.handle() + return [] + + return app + + +def app_worker(app, port): + server = WebSocketServer(('', port), app) + server.serve_forever() + + +@pytest.fixture() +def server(sub_mgr, schema, options): + + app = create_app(sub_mgr, schema, options) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) + process.start() + yield + process.terminate() + + +@pytest.fixture() +def server_with_keep_alive(sub_mgr, schema, options): + + options_with_keep_alive = options.copy() + options_with_keep_alive.update({'keep_alive': 10}) + app = create_app(sub_mgr, schema, options_with_keep_alive) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': KEEP_ALIVE_TEST_PORT}) + process.start() + yield + process.terminate() + + +@pytest.fixture() +def server_with_events(sub_mgr, schema, events_options): + + options, q = events_options + app = create_app(sub_mgr, schema, options) + + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': EVENTS_TEST_PORT}) + + process.start() + yield q + process.terminate() + + +def test_raise_exception_when_create_server_and_no_sub_mgr(): + with pytest.raises(AssertionError): + ApolloSubscriptionServer(None, None) + + +def test_should_trigger_on_connect_if_client_connect_valid(server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + new SubscriptionClient('ws://localhost:{1}/socket') + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + ret_value = server_with_events.get_nowait() + assert ret_value.name == 'on_connect' + ret_value.assert_called_once() + + +def test_should_trigger_on_connect_with_correct_cxn_params(server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const connectionParams = {{test: true}} + new SubscriptionClient('ws://localhost:{1}/socket', {{ + connectionParams, + }}) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + ret_value = server_with_events.get_nowait() + assert ret_value.name == 'on_connect' + ret_value.assert_called_once() + ret_value.assert_called_with({'test': True}) + + +def test_trigger_on_disconnect_when_client_disconnects(server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.client.close() + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + subprocess.check_output(['node', '-e', node_script]) + ret_value = server_with_events.get_nowait() + assert ret_value.name == 'on_disconnect' + ret_value.assert_called_once() + + +def test_should_call_unsubscribe_when_client_closes_cxn(server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }} + ) + setTimeout(() => {{ + client.client.close() + }}, 500) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1) + except: + while True: + ret_value = server_with_events.get_nowait() + if ret_value.name == 'on_unsubscribe': + ret_value.assert_called_once() + break + + +def test_should_trigger_on_subscribe_when_client_subscribes( + server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }}) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + ret_value = server_with_events.get_nowait() + if ret_value.name == 'on_subscribe': + ret_value.assert_called_once() + break + + +def test_should_trigger_on_unsubscribe_when_client_unsubscribes( + server_with_events): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const subId = client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + }}, function (error, result) {{ + // nothing + }}) + client.unsubscribe(subId) + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + EVENTS_TEST_PORT) + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + ret_value = server_with_events.get_nowait() + if ret_value.name == 'on_unsubscribe': + ret_value.assert_called_once() + break From 6b31dcecef58c0fbe782fb87ba9766e5d568713e Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 28 May 2017 23:16:23 -0500 Subject: [PATCH 02/12] Add "multiprocess" lib to setup.py "tests_require" --- setup.py | 2 +- tests/test_subscription_transport.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index aec2bfd..4f8e597 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,6 @@ ], tests_require=[ 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', - 'flask', 'flask-graphql', 'flask-sockets' + 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess' ], include_package_data=True) diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index a453994..a1e8c26 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -7,7 +7,6 @@ from functools import wraps import copy import json -import multiprocess import os import sys @@ -18,6 +17,7 @@ from promise import Promise import fakeredis import graphene +import multiprocess import pytest import redis From 385b1bb95617b41df36b0784a0ccf668045046f1 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 4 Jun 2017 23:04:17 -0500 Subject: [PATCH 03/12] Complete all but 2 of subscription_transport tests Only two remaining tests to complete and this will bring the test coverage up the Apollo subscription-transport-ws libary 0.5.4 release. --- tests/test_subscription_transport.py | 766 +++++++++++++++++++++++++-- 1 file changed, 734 insertions(+), 32 deletions(-) diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index a1e8c26..3229823 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -8,9 +8,12 @@ import copy import json import os +import Queue import sys +import threading +import time -from flask import Flask +from flask import Flask, request from flask_graphql import GraphQLView from flask_sockets import Sockets from geventwebsocket import WebSocketServer @@ -20,13 +23,13 @@ import multiprocess import pytest import redis +import requests from graphql_subscriptions import (RedisPubsub, SubscriptionManager, ApolloSubscriptionServer) from graphql_subscriptions.subscription_transport_ws import ( - SUBSCRIPTION_START, SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, KEEPALIVE, - SUBSCRIPTION_END) + SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, KEEPALIVE) if os.name == 'posix' and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -35,8 +38,6 @@ TEST_PORT = 5000 KEEP_ALIVE_TEST_PORT = TEST_PORT + 1 -DELAYED_TEST_PORT = TEST_PORT + 2 -RAW_TEST_PORT = TEST_PORT + 4 EVENTS_TEST_PORT = TEST_PORT + 5 @@ -73,6 +74,9 @@ def assert_called_with(self, *args, **kwargs): all_call_args = call_args | call_kwargs assert all_call_args.issubset(self.call_args) + def assert_called_with_contains(self, arg_fragment): + assert any([arg_fragment in item for item in self.call_args]) + def promisify(f): @wraps(f) @@ -85,6 +89,12 @@ def executor(resolve, reject): return wrapper +def enqueue_output(out, queue): + with out: + for line in iter(out.readline, b''): + queue.put(line) + + @pytest.fixture def data(): return { @@ -145,7 +155,7 @@ def resolve_error(self, args, context, info): @pytest.fixture def sub_mgr(pubsub, schema): - def user_filtered_func(**kwargs): + def user_filtered(**kwargs): args = kwargs.get('args') return { 'user_filtered': { @@ -153,29 +163,49 @@ def user_filtered_func(**kwargs): } } - setup_funcs = {'user_filtered': user_filtered_func} + setup_funcs = {'user_filtered': user_filtered} return SubscriptionManager(schema, pubsub, setup_funcs) @pytest.fixture def handlers(): - def copy_and_update_dict(msg, params, websocket): + def on_subscribe(msg, params, websocket): new_params = copy.deepcopy(params) - new_params.update({'context': msg['context']}) + new_params.update({'context': msg.get('context', {})}) return new_params - return {'on_subscribe': promisify(copy_and_update_dict)} + return {'on_subscribe': promisify(on_subscribe)} @pytest.fixture -def options(handlers): +def options1(handlers): return { 'on_subscribe': lambda msg, params, ws: handlers['on_subscribe'](msg, params, ws) } +@pytest.fixture +def options(mocker): + + mgr = multiprocess.Manager() + q = mgr.Queue() + + def on_subscribe(self, msg, params, websocket): + new_params = copy.deepcopy(params) + new_params.update({'context': msg.get('context', {})}) + q.put(self) + return new_params + + options = { + 'on_subscribe': + PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe') + } + + return options, q + + @pytest.fixture def events_options(mocker): @@ -224,6 +254,10 @@ def create_app(sub_mgr, schema, options): '/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True)) + @app.route('/publish', methods=['POST']) + def sub_mgr_publish(): + sub_mgr.publish(*request.get_json()) + @sockets.route('/socket') def socket_channel(websocket): subscription_server = ApolloSubscriptionServer(sub_mgr, websocket, @@ -242,13 +276,14 @@ def app_worker(app, port): @pytest.fixture() def server(sub_mgr, schema, options): - app = create_app(sub_mgr, schema, options) + opt, q = options + app = create_app(sub_mgr, schema, opt) process = multiprocess.Process( target=app_worker, kwargs={'app': app, 'port': TEST_PORT}) process.start() - yield + yield q process.terminate() @@ -301,9 +336,9 @@ def test_should_trigger_on_connect_if_client_connect_valid(server_with_events): subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: - ret_value = server_with_events.get_nowait() - assert ret_value.name == 'on_connect' - ret_value.assert_called_once() + mock = server_with_events.get_nowait() + assert mock.name == 'on_connect' + mock.assert_called_once() def test_should_trigger_on_connect_with_correct_cxn_params(server_with_events): @@ -323,10 +358,10 @@ def test_should_trigger_on_connect_with_correct_cxn_params(server_with_events): subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: - ret_value = server_with_events.get_nowait() - assert ret_value.name == 'on_connect' - ret_value.assert_called_once() - ret_value.assert_called_with({'test': True}) + mock = server_with_events.get_nowait() + assert mock.name == 'on_connect' + mock.assert_called_once() + mock.assert_called_with({'test': True}) def test_trigger_on_disconnect_when_client_disconnects(server_with_events): @@ -341,9 +376,9 @@ def test_trigger_on_disconnect_when_client_disconnects(server_with_events): os.path.join(os.path.dirname(__file__), 'node_modules'), EVENTS_TEST_PORT) subprocess.check_output(['node', '-e', node_script]) - ret_value = server_with_events.get_nowait() - assert ret_value.name == 'on_disconnect' - ret_value.assert_called_once() + mock = server_with_events.get_nowait() + assert mock.name == 'on_disconnect' + mock.assert_called_once() def test_should_call_unsubscribe_when_client_closes_cxn(server_with_events): @@ -379,9 +414,9 @@ def test_should_call_unsubscribe_when_client_closes_cxn(server_with_events): ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1) except: while True: - ret_value = server_with_events.get_nowait() - if ret_value.name == 'on_unsubscribe': - ret_value.assert_called_once() + mock = server_with_events.get_nowait() + if mock.name == 'on_unsubscribe': + mock.assert_called_once() break @@ -415,9 +450,9 @@ def test_should_trigger_on_subscribe_when_client_subscribes( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: while True: - ret_value = server_with_events.get_nowait() - if ret_value.name == 'on_subscribe': - ret_value.assert_called_once() + mock = server_with_events.get_nowait() + if mock.name == 'on_subscribe': + mock.assert_called_once() break @@ -452,7 +487,674 @@ def test_should_trigger_on_unsubscribe_when_client_unsubscribes( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: while True: - ret_value = server_with_events.get_nowait() - if ret_value.name == 'on_unsubscribe': - ret_value.assert_called_once() + mock = server_with_events.get_nowait() + if mock.name == 'on_unsubscribe': + mock.assert_called_once() break + + +def test_should_send_correct_results_to_multiple_client_subscriptions( + server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const client1 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults = 0; + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults++; + console.log(JSON.stringify({{ + client: {{ + result: result, + numResults: numResults + }} + }})); + }} else {{ + // pass + }} + }} + ); + const client2 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults1 = 0; + client2.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 2, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults1++; + console.log(JSON.stringify({{ + client2: {{ + result: result, + numResults: numResults1 + }} + }})); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['user'] + assert client['result']['user']['id'] == '3' + assert client['result']['user']['name'] == 'Jessie' + assert client['numResults'] == 1 + client2 = ret_values['client2'] + assert client2['result']['user'] + assert client2['result']['user']['id'] == '2' + assert client2['result']['user']['name'] == 'Marie' + assert client2['numResults'] == 1 + + +# Graphene subscriptions implementation does not currently return an error for +# missing or incorrect field(s); this test will continue to fail until that is +# fixed +def test_send_subscription_fail_message_to_client_with_invalid_query(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + setTimeout(function () {{ + client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + birthday + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + }} + ); + }}, 100); + client.client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['type'] == SUBSCRIPTION_FAIL + assert len(ret_values['payload']['errors']) > 0 + + +def test_should_setup_the_proper_filters_when_subscribing(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + const client2 = new SubscriptionClient('ws://localhost:{1}/socket') + let numResults = 0; + client.subscribe({{ + query: `subscription useInfoFilter1($id: String) {{ + userFiltered(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfoFilter1', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults += 1; + console.log(JSON.stringify({{ + client: {{ + result: result, + numResults: numResults + }} + }})); + }} else {{ + // pass + }} + }} + ); + client2.subscribe({{ + query: `subscription useInfoFilter1($id: String) {{ + userFiltered(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfoFilter1', + variables: {{ + id: 1, + }}, + + }}, function (error, result) {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + numResults += 1; + console.log(JSON.stringify({{ + client2: {{ + result: result, + numResults: numResults + }} + }})); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', {'id': 1}]) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', {'id': 2}]) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user_filtered', {'id': 3}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['userFiltered'] + assert client['result']['userFiltered']['id'] == '3' + assert client['result']['userFiltered']['name'] == 'Jessie' + assert client['numResults'] == 2 + client2 = ret_values['client2'] + assert client2['result']['userFiltered'] + assert client2['result']['userFiltered']['id'] == '1' + assert client2['result']['userFiltered']['name'] == 'Dan' + assert client2['numResults'] == 1 + + +def test_correctly_sets_the_context_in_on_subscribe(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const CTX = 'testContext'; + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + context: CTX, + }}, (error, result) => {{ + client.unsubscribeAll(); + if (error) {{ + console.log(JSON.stringify(error)); + }} + if (result) {{ + console.log(JSON.stringify({{ + client: {{ + result: result, + }} + }})); + }} else {{ + // pass + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['context', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + client = ret_values['client'] + assert client['result']['context'] + assert client['result']['context'] == 'testContext' + + +def test_passes_through_websocket_request_to_on_subscribe(server): + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + }}, (error, result) => {{ + if (error) {{ + console.log(JSON.stringify(error)); + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + try: + subprocess.check_output( + ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) + except: + while True: + mock = server.get_nowait() + if mock.name == 'on_subscribe': + mock.assert_called_once() + mock.assert_called_with_contains('websocket') + break + + +def test_does_not_send_subscription_data_after_client_unsubscribes(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + setTimeout(function () {{ + let subId = client.subscribe({{ + query: `subscription useInfo($id: String) {{ + user(id: $id) {{ + id + name + }} + }}`, + operationName: 'useInfo', + variables: {{ + id: 3, + }}, + + }}, function (error, result) {{ + }} + ); + client.unsubscribe(subId); + }}, 100); + client.client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), + json=['user', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + with pytest.raises(KeyError): + assert ret_values[SUBSCRIPTION_DATA] + + +# Need to look into why this test is failing; current thrown code is 1006 not +# 1002 like it should be (1006 more general than 1002 protocol error) +def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const client = new WebSocket('ws://localhost:{1}/socket') + client.on('close', (code) => {{ + console.log(JSON.stringify(code)) + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = [] + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values.append(line) + except ValueError: + pass + except Queue.Empty: + break + assert ret_values[0] == 1002 or 1006 + + +def test_rejects_unparsable_message(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + client.close(); + }}; + client.onopen = () => {{ + client.send('HI'); + }} + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['subscription_fail'] + assert len(ret_values['subscription_fail']['payload']['errors']) > 0 + + +def test_rejects_nonsense_message(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + client.close(); + }}; + client.onopen = () => {{ + client.send(JSON.stringify({{}})); + }} + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert ret_values['subscription_fail'] + assert len(ret_values['subscription_fail']['payload']['errors']) > 0 + + +def test_does_not_crash_on_unsub_from_unknown_sub(server): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + setTimeout(function () {{ + client.onopen = () => {{ + const SUBSCRIPTION_END = 'subscription_end'; + let subEndMsg = {{type: SUBSCRIPTION_END, id: 'toString'}} + client.send(JSON.stringify(subEndMsg)); + }} + }}, 200); + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + console.log(JSON.stringify({{[msg.type]: msg}})) + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = [] + while True: + try: + line = q.get_nowait() + ret_values.append(line) + except ValueError: + pass + except Queue.Empty: + break + assert len(ret_values) == 0 + + +def test_sends_back_any_type_of_error(server): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `invalid useInfo {{ + error + }}`, + variables: {{}}, + }}, function (errors, result) {{ + client.unsubscribeAll(); + if (errors) {{ + console.log(JSON.stringify({{'errors': errors}})) + }} + if (result) {{ + console.log(JSON.stringify({{'result': result}})) + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), + TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(5) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert len(ret_values['errors']) > 0 From 39a394b2bd3283cf7384d877bda2843e4c3565a9 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 4 Jun 2017 23:14:37 -0500 Subject: [PATCH 04/12] Add requests library to setup.py for tests --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4f8e597..74b0f1e 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,6 @@ ], tests_require=[ 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', - 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess' + 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess', 'requests' ], include_package_data=True) From 13af026267e2ad028050b5b01e6c151a12204967 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 4 Jun 2017 23:35:38 -0500 Subject: [PATCH 05/12] Modify Queue module import to support python 2/3 --- tests/test_subscription_transport.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index 3229823..f1e92be 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -8,7 +8,6 @@ import copy import json import os -import Queue import sys import threading import time @@ -36,6 +35,11 @@ else: import subprocess +if sys.version_info[0] < 3: + import Queue +else: + import queue as Queue + TEST_PORT = 5000 KEEP_ALIVE_TEST_PORT = TEST_PORT + 1 EVENTS_TEST_PORT = TEST_PORT + 5 From 8e20828de6f6e0fa1d9224114c2a72062978794b Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 4 Jun 2017 23:40:53 -0500 Subject: [PATCH 06/12] Rename ApolloSubscriptionServer class Change class name to just "SubscriptionServer" --- README.md | 10 +++++----- graphql_subscriptions/__init__.py | 5 +++-- graphql_subscriptions/subscription_transport_ws.py | 4 ++-- tests/test_subscription_transport.py | 6 +++--- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 4a9b3a9..2a15170 100644 --- a/README.md +++ b/README.md @@ -48,10 +48,10 @@ $ pip install graphql-subscriptions #### Methods - `publish(trigger_name, payload)`: Trigger name is the subscription or pubsub channel; payload is the mutation object or message that will end up being passed to the subscription root_value; method called inside of mutation resolve function -- `subscribe(query, operation_name, callback, variables, context, format_error, format_response)`: Called by ApolloSubscriptionServer upon receiving a new subscription from a websocket. Arguments are parsed by ApolloSubscriptionServer from the graphql subscription query -- `unsubscribe(sub_id)`: Sub_id is the subscription ID that is being tracked by the subscription manager instance -- returned from the `subscribe()` method and called by the ApolloSubscriptionServer +- `subscribe(query, operation_name, callback, variables, context, format_error, format_response)`: Called by SubscriptionServer upon receiving a new subscription from a websocket. Arguments are parsed by SubscriptionServer from the graphql subscription query +- `unsubscribe(sub_id)`: Sub_id is the subscription ID that is being tracked by the subscription manager instance -- returned from the `subscribe()` method and called by the SubscriptionServer -### ApolloSubscriptionServer(subscription_manager, websocket, keep_alive=None, on_subscribe=None, on_unsubscribe=None, on_connect=None, on_disconnect=None) +### SubscriptionServer(subscription_manager, websocket, keep_alive=None, on_subscribe=None, on_unsubscribe=None, on_connect=None, on_disconnect=None) #### Arguments - `subscription_manager`: A subscripton manager instance (required). - `websocket`: The websocket object passed in from your route handler (required). @@ -78,7 +78,7 @@ from flask_sockets import Sockets from graphql_subscriptions import ( SubscriptionManager, RedisPubsub, - ApolloSubscriptionServer + SubscriptionServer ) app = Flask(__name__) @@ -106,7 +106,7 @@ subscription_mgr = SubscriptionManager(schema, pubsub) # subscription app / server -- passing in subscription manager and websocket @sockets.route('/socket') def socket_channel(websocket): - subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket) + subscription_server = SubscriptionServer(subscription_mgr, websocket) subscription_server.handle() return [] diff --git a/graphql_subscriptions/__init__.py b/graphql_subscriptions/__init__.py index 727c760..2b8d152 100644 --- a/graphql_subscriptions/__init__.py +++ b/graphql_subscriptions/__init__.py @@ -1,4 +1,5 @@ from subscription_manager import RedisPubsub, SubscriptionManager -from subscription_transport_ws import ApolloSubscriptionServer +from subscription_transport_ws import SubscriptionServer + +__all__ = ['RedisPubsub', 'SubscriptionManager', 'SubscriptionServer'] -__all__ = ['RedisPubsub', 'SubscriptionManager', 'ApolloSubscriptionServer'] diff --git a/graphql_subscriptions/subscription_transport_ws.py b/graphql_subscriptions/subscription_transport_ws.py index 6bc2988..552b09c 100644 --- a/graphql_subscriptions/subscription_transport_ws.py +++ b/graphql_subscriptions/subscription_transport_ws.py @@ -15,7 +15,7 @@ GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions' -class ApolloSubscriptionServer(WebSocketApplication): +class SubscriptionServer(WebSocketApplication): def __init__(self, subscription_manager, websocket, @@ -37,7 +37,7 @@ def __init__(self, self.connection_subscriptions = {} self.connection_context = {} - super(ApolloSubscriptionServer, self).__init__(websocket) + super(SubscriptionServer, self).__init__(websocket) def timer(self, callback, period): while True: diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index f1e92be..d8794a9 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -25,7 +25,7 @@ import requests from graphql_subscriptions import (RedisPubsub, SubscriptionManager, - ApolloSubscriptionServer) + SubscriptionServer) from graphql_subscriptions.subscription_transport_ws import ( SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, KEEPALIVE) @@ -264,7 +264,7 @@ def sub_mgr_publish(): @sockets.route('/socket') def socket_channel(websocket): - subscription_server = ApolloSubscriptionServer(sub_mgr, websocket, + subscription_server = SubscriptionServer(sub_mgr, websocket, **options) subscription_server.handle() return [] @@ -323,7 +323,7 @@ def server_with_events(sub_mgr, schema, events_options): def test_raise_exception_when_create_server_and_no_sub_mgr(): with pytest.raises(AssertionError): - ApolloSubscriptionServer(None, None) + SubscriptionServer(None, None) def test_should_trigger_on_connect_if_client_connect_valid(server_with_events): From ad2526572b68574083b732166a720c34c9579948 Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Jun 2017 23:28:29 -0500 Subject: [PATCH 07/12] Complete subscription_transport_ws tests --- tests/test_subscription_transport.py | 285 ++++++++++++++++++--------- 1 file changed, 192 insertions(+), 93 deletions(-) diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index d8794a9..d201a9d 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -12,7 +12,7 @@ import threading import time -from flask import Flask, request +from flask import Flask, request, jsonify from flask_graphql import GraphQLView from flask_sockets import Sockets from geventwebsocket import WebSocketServer @@ -28,7 +28,7 @@ SubscriptionServer) from graphql_subscriptions.subscription_transport_ws import ( - SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, KEEPALIVE) + SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA) if os.name == 'posix' and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -41,8 +41,6 @@ import queue as Queue TEST_PORT = 5000 -KEEP_ALIVE_TEST_PORT = TEST_PORT + 1 -EVENTS_TEST_PORT = TEST_PORT + 5 class PickableMock(): @@ -173,25 +171,20 @@ def user_filtered(**kwargs): @pytest.fixture -def handlers(): +def on_sub_handler(): + def context_handler(): + raise Exception('bad') + def on_subscribe(msg, params, websocket): new_params = copy.deepcopy(params) - new_params.update({'context': msg.get('context', {})}) + new_params.update({'context': context_handler}) return new_params return {'on_subscribe': promisify(on_subscribe)} @pytest.fixture -def options1(handlers): - return { - 'on_subscribe': - lambda msg, params, ws: handlers['on_subscribe'](msg, params, ws) - } - - -@pytest.fixture -def options(mocker): +def on_sub_mock(mocker): mgr = multiprocess.Manager() q = mgr.Queue() @@ -202,16 +195,16 @@ def on_subscribe(self, msg, params, websocket): q.put(self) return new_params - options = { + on_sub_mock = { 'on_subscribe': PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe') } - return options, q + return on_sub_mock, q @pytest.fixture -def events_options(mocker): +def options_mocks(mocker): mgr = multiprocess.Manager() q = mgr.Queue() @@ -231,7 +224,7 @@ def on_disconnect(self, websocket): def on_unsubscribe(self, websocket): q.put(self) - events_options = { + options_mocks = { 'on_subscribe': PickableMock(side_effect=promisify(on_subscribe), name='on_subscribe'), 'on_unsubscribe': @@ -245,7 +238,7 @@ def on_unsubscribe(self, websocket): PickableMock(side_effect=on_disconnect, name='on_disconnect') } - return events_options, q + return options_mocks, q def create_app(sub_mgr, schema, options): @@ -261,11 +254,11 @@ def create_app(sub_mgr, schema, options): @app.route('/publish', methods=['POST']) def sub_mgr_publish(): sub_mgr.publish(*request.get_json()) + return jsonify(request.get_json()) @sockets.route('/socket') def socket_channel(websocket): - subscription_server = SubscriptionServer(sub_mgr, websocket, - **options) + subscription_server = SubscriptionServer(sub_mgr, websocket, **options) subscription_server.handle() return [] @@ -278,10 +271,10 @@ def app_worker(app, port): @pytest.fixture() -def server(sub_mgr, schema, options): +def server(sub_mgr, schema, on_sub_mock): - opt, q = options - app = create_app(sub_mgr, schema, opt) + options, q = on_sub_mock + app = create_app(sub_mgr, schema, options) process = multiprocess.Process( target=app_worker, kwargs={'app': app, @@ -292,32 +285,43 @@ def server(sub_mgr, schema, options): @pytest.fixture() -def server_with_keep_alive(sub_mgr, schema, options): +def server_with_mocks(sub_mgr, schema, options_mocks): - options_with_keep_alive = options.copy() - options_with_keep_alive.update({'keep_alive': 10}) - app = create_app(sub_mgr, schema, options_with_keep_alive) + options, q = options_mocks + app = create_app(sub_mgr, schema, options) process = multiprocess.Process( target=app_worker, kwargs={'app': app, - 'port': KEEP_ALIVE_TEST_PORT}) + 'port': TEST_PORT}) + process.start() - yield + yield q process.terminate() @pytest.fixture() -def server_with_events(sub_mgr, schema, events_options): +def server_with_on_sub_handler(sub_mgr, schema, on_sub_handler): - options, q = events_options - app = create_app(sub_mgr, schema, options) + app = create_app(sub_mgr, schema, on_sub_handler) process = multiprocess.Process( target=app_worker, kwargs={'app': app, - 'port': EVENTS_TEST_PORT}) + 'port': TEST_PORT}) + process.start() + yield + process.terminate() + + +@pytest.fixture() +def server_with_keep_alive(sub_mgr, schema): + + app = create_app(sub_mgr, schema, {'keep_alive': .250}) + process = multiprocess.Process( + target=app_worker, kwargs={'app': app, + 'port': TEST_PORT}) process.start() - yield q + yield process.terminate() @@ -326,7 +330,7 @@ def test_raise_exception_when_create_server_and_no_sub_mgr(): SubscriptionServer(None, None) -def test_should_trigger_on_connect_if_client_connect_valid(server_with_events): +def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -335,17 +339,17 @@ def test_should_trigger_on_connect_if_client_connect_valid(server_with_events): new SubscriptionClient('ws://localhost:{1}/socket') '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() assert mock.name == 'on_connect' mock.assert_called_once() -def test_should_trigger_on_connect_with_correct_cxn_params(server_with_events): +def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -357,18 +361,18 @@ def test_should_trigger_on_connect_with_correct_cxn_params(server_with_events): }}) '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() assert mock.name == 'on_connect' mock.assert_called_once() mock.assert_called_with({'test': True}) -def test_trigger_on_disconnect_when_client_disconnects(server_with_events): +def test_trigger_on_disconnect_when_client_disconnects(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -378,14 +382,14 @@ def test_trigger_on_disconnect_when_client_disconnects(server_with_events): client.client.close() '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) subprocess.check_output(['node', '-e', node_script]) - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() assert mock.name == 'on_disconnect' mock.assert_called_once() -def test_should_call_unsubscribe_when_client_closes_cxn(server_with_events): +def test_should_call_unsubscribe_when_client_closes_cxn(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -412,20 +416,19 @@ def test_should_call_unsubscribe_when_client_closes_cxn(server_with_events): }}, 500) '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1) except: while True: - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() if mock.name == 'on_unsubscribe': mock.assert_called_once() break -def test_should_trigger_on_subscribe_when_client_subscribes( - server_with_events): +def test_should_trigger_on_subscribe_when_client_subscribes(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -448,20 +451,20 @@ def test_should_trigger_on_subscribe_when_client_subscribes( }}) '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: while True: - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() if mock.name == 'on_subscribe': mock.assert_called_once() break def test_should_trigger_on_unsubscribe_when_client_unsubscribes( - server_with_events): + server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') @@ -485,20 +488,19 @@ def test_should_trigger_on_unsubscribe_when_client_unsubscribes( client.unsubscribe(subId) '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), - EVENTS_TEST_PORT) + TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: while True: - mock = server_with_events.get_nowait() + mock = server_with_mocks.get_nowait() if mock.name == 'on_unsubscribe': mock.assert_called_once() break -def test_should_send_correct_results_to_multiple_client_subscriptions( - server): +def test_should_send_correct_results_to_multiple_client_subscriptions(server): node_script = ''' module.paths.push('{0}') @@ -567,8 +569,7 @@ def test_should_send_correct_results_to_multiple_client_subscriptions( }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -577,8 +578,7 @@ def test_should_send_correct_results_to_multiple_client_subscriptions( stderr=subprocess.STDOUT) time.sleep(.2) requests.post( - 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['user', {}]) + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['user', {}]) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -606,9 +606,9 @@ def test_should_send_correct_results_to_multiple_client_subscriptions( assert client2['numResults'] == 1 -# Graphene subscriptions implementation does not currently return an error for -# missing or incorrect field(s); this test will continue to fail until that is -# fixed +# TODO: Graphene subscriptions implementation does not currently return an +# error for missing or incorrect field(s); this test will continue to fail +# until that is fixed def test_send_subscription_fail_message_to_client_with_invalid_query(server): node_script = ''' @@ -639,8 +639,7 @@ def test_send_subscription_fail_message_to_client_with_invalid_query(server): console.log(JSON.stringify({{[msg.type]: msg}})) }}; '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -667,6 +666,8 @@ def test_send_subscription_fail_message_to_client_with_invalid_query(server): assert len(ret_values['payload']['errors']) > 0 +# TODO: troubleshoot this a bit...passes, but receives extra messages which I'm +# filtering out w/ the "AttributeError" exception clause def test_should_setup_the_proper_filters_when_subscribing(server): node_script = ''' module.paths.push('{0}') @@ -733,8 +734,7 @@ def test_should_setup_the_proper_filters_when_subscribing(server): }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -744,13 +744,19 @@ def test_should_setup_the_proper_filters_when_subscribing(server): time.sleep(.2) requests.post( 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['user_filtered', {'id': 1}]) + json=['user_filtered', { + 'id': 1 + }]) requests.post( 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['user_filtered', {'id': 2}]) + json=['user_filtered', { + 'id': 2 + }]) requests.post( 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['user_filtered', {'id': 3}]) + json=['user_filtered', { + 'id': 3 + }]) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -764,6 +770,8 @@ def test_should_setup_the_proper_filters_when_subscribing(server): ret_values[line.keys()[0]] = line[line.keys()[0]] except ValueError: pass + except AttributeError: + pass except Queue.Empty: break client = ret_values['client'] @@ -809,8 +817,7 @@ def test_correctly_sets_the_context_in_on_subscribe(server): }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -819,8 +826,7 @@ def test_correctly_sets_the_context_in_on_subscribe(server): stderr=subprocess.STDOUT) time.sleep(.2) requests.post( - 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['context', {}]) + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['context', {}]) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -860,8 +866,7 @@ def test_passes_through_websocket_request_to_on_subscribe(server): }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( @@ -906,8 +911,7 @@ def test_does_not_send_subscription_data_after_client_unsubscribes(server): console.log(JSON.stringify({{[msg.type]: msg}})) }}; '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -916,8 +920,7 @@ def test_does_not_send_subscription_data_after_client_unsubscribes(server): stderr=subprocess.STDOUT) time.sleep(.2) requests.post( - 'http://localhost:{0}/publish'.format(TEST_PORT), - json=['user', {}]) + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['user', {}]) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -937,8 +940,8 @@ def test_does_not_send_subscription_data_after_client_unsubscribes(server): assert ret_values[SUBSCRIPTION_DATA] -# Need to look into why this test is failing; current thrown code is 1006 not -# 1002 like it should be (1006 more general than 1002 protocol error) +# TODO: Need to look into why this test is throwing code 1006, not 1002 like +# it should be (1006 more general than 1002 protocol error) def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server): node_script = ''' @@ -950,8 +953,7 @@ def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server): }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -994,8 +996,7 @@ def test_rejects_unparsable_message(server): client.send('HI'); }} '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -1039,8 +1040,7 @@ def test_rejects_nonsense_message(server): client.send(JSON.stringify({{}})); }} '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -1087,8 +1087,7 @@ def test_does_not_crash_on_unsub_from_unknown_sub(server): console.log(JSON.stringify({{[msg.type]: msg}})) }}; '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -1137,8 +1136,7 @@ def test_sends_back_any_type_of_error(server): }} ); '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) p = subprocess.Popen( ['node', '-e', node_script], @@ -1162,3 +1160,104 @@ def test_sends_back_any_type_of_error(server): except Queue.Empty: break assert len(ret_values['errors']) > 0 + + +def test_handles_errors_prior_to_graphql_execution(server_with_on_sub_handler): + + node_script = ''' + module.paths.push('{0}') + WebSocket = require('ws') + const SubscriptionClient = + require('subscriptions-transport-ws').SubscriptionClient + const client = new SubscriptionClient('ws://localhost:{1}/socket') + client.subscribe({{ + query: `subscription context {{ + context + }}`, + variables: {{}}, + context: {{}}, + }}, function (errors, result) {{ + client.unsubscribeAll(); + if (errors) {{ + console.log(JSON.stringify({{'errors': errors}})) + }} + if (result) {{ + console.log(JSON.stringify({{'result': result}})) + }} + }} + ); + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.2) + requests.post( + 'http://localhost:{0}/publish'.format(TEST_PORT), json=['context', {}]) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.2) + ret_values = {} + while True: + try: + line = q.get_nowait() + line = json.loads(line) + ret_values[line.keys()[0]] = line[line.keys()[0]] + except ValueError: + pass + except Queue.Empty: + break + assert isinstance(ret_values['errors'], list) + assert ret_values['errors'][0]['message'] == 'bad' + + +def test_sends_a_keep_alive_signal_in_the_socket(server_with_keep_alive): + + node_script = ''' + module.paths.push('{0}'); + WebSocket = require('ws'); + const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'; + const KEEP_ALIVE = 'keepalive'; + const client = new WebSocket('ws://localhost:{1}/socket', + GRAPHQL_SUBSCRIPTIONS); + let yieldCount = 0; + client.onmessage = (message) => {{ + let msg = JSON.parse(message.data) + if (msg.type === KEEP_ALIVE) {{ + yieldCount += 1; + if (yieldCount > 1) {{ + let returnMsg = {{'type': msg.type, 'yieldCount': yieldCount}} + console.log(JSON.stringify(returnMsg)) + client.close(); + }} + }} + }}; + '''.format( + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) + + p = subprocess.Popen( + ['node', '-e', node_script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(.5) + q = Queue.Queue() + t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) + t.daemon = True + t.start() + time.sleep(.5) + while True: + try: + line = q.get_nowait() + ret_value = json.loads(line) + except ValueError: + pass + except Queue.Empty: + break + assert ret_value['type'] == 'keepalive' + assert ret_value['yieldCount'] > 1 From 895c5753eae490358e552c5bc1ad08b5fb7b8c56 Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Jun 2017 23:30:01 -0500 Subject: [PATCH 08/12] Change "nonlocal" class to "non_local" named dict --- tests/test_subscription_manager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_subscription_manager.py b/tests/test_subscription_manager.py index ea75de5..0d86b3e 100644 --- a/tests/test_subscription_manager.py +++ b/tests/test_subscription_manager.py @@ -262,8 +262,7 @@ def publish_and_unsubscribe_handler(sub_id): def test_can_subscribe_to_more_than_one_trigger(sub_mgr): - class nonlocal: - trigger_count = 0 + non_local = {'trigger_count': 0} query = 'subscription multiTrigger($filterBoolean: Boolean,\ $uga: String){testFilterMulti(filterBoolean: $filterBoolean,\ @@ -278,10 +277,10 @@ def callback(err, payload): assert True else: assert payload.data.get('testFilterMulti') == 'good_filter' - nonlocal.trigger_count += 1 + non_local['trigger_count'] += 1 except AssertionError as e: sys.exit(e) - if nonlocal.trigger_count == 2: + if non_local['trigger_count'] == 2: sub_mgr.pubsub.greenlet.kill() def publish_and_unsubscribe_handler(sub_id): From fd48b05ac9b94e6161b41a71ae925cb7fcf61f59 Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Jun 2017 23:32:48 -0500 Subject: [PATCH 09/12] Make several minor changes to subcription_transport Modified "nonlocal" class to "non_local" dict and changed parameter['context'] defination statement --- .../subscription_transport_ws.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/graphql_subscriptions/subscription_transport_ws.py b/graphql_subscriptions/subscription_transport_ws.py index 552b09c..b88a73b 100644 --- a/graphql_subscriptions/subscription_transport_ws.py +++ b/graphql_subscriptions/subscription_transport_ws.py @@ -77,11 +77,11 @@ def on_message(self, msg): if msg is None: return - nonlocal = {'on_init_resolve': None, 'on_init_reject': None} + non_local = {'on_init_resolve': None, 'on_init_reject': None} def init_promise_handler(resolve, reject): - nonlocal['on_init_resolve'] = resolve - nonlocal['on_init_reject'] = reject + non_local['on_init_resolve'] = resolve + non_local['on_init_reject'] = reject self.connection_context['init_promise'] = Promise(init_promise_handler) @@ -105,7 +105,7 @@ def on_message_return_handler(message): self.on_connect( parsed_message.get('payload'), self.ws)) - nonlocal['on_init_resolve'](on_connect_promise) + non_local['on_init_resolve'](on_connect_promise) def init_success_promise_handler(result): if not result: @@ -131,7 +131,8 @@ def subscription_start_promise_handler(init_result): 'callback': None, 'variables': parsed_message.get('variables'), 'context': init_result if isinstance( - init_result, dict) else {}, + init_result, dict) else + parsed_message.get('context', {}), 'format_error': None, 'format_response': None } @@ -149,8 +150,7 @@ def subscription_start_promise_handler(init_result): def promised_params_handler(params): if not isinstance(params, dict): error = 'Invalid params returned from\ - OnSubscribe! Return value must\ - be an dict' +OnSubscribe! Return value must be an dict' self.send_subscription_fail( sub_id, {'errors': [{ @@ -162,15 +162,15 @@ def params_callback(error, result): if not error: self.send_subscription_data( sub_id, {'data': result.data}) - elif error.errors: - self.send_subscription_data( - sub_id, {'errors': error.errors}) elif error.message: self.send_subscription_data( sub_id, {'errors': [{ 'message': error.message }]}) + elif error.errors: + self.send_subscription_data( + sub_id, {'errors': error.errors}) else: self.send_subscription_data( sub_id, @@ -216,7 +216,7 @@ def error_catch_handler(e): # not sure if this behavior is correct or # not per promises A spec...need to # investigate - nonlocal['on_init_resolve'](Promise.resolve(True)) + non_local['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_start_promise_handler) @@ -229,7 +229,7 @@ def subscription_end_promise_handler(result): del self.connection_subscriptions[sub_id] # same rationale as above - nonlocal['on_init_resolve'](Promise.resolve(True)) + non_local['on_init_resolve'](Promise.resolve(True)) self.connection_context['init_promise'].then( subscription_end_promise_handler) From ed6b71a3458f6bbb5c19aca567bb5d5e01a6106e Mon Sep 17 00:00:00 2001 From: hballard Date: Wed, 7 Jun 2017 16:45:37 -0500 Subject: [PATCH 10/12] Add "test_suite"="pytest" to setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 74b0f1e..5e03206 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ install_requires=[ 'gevent-websocket', 'redis', 'promise==1.0.1', 'graphql-core' ], + test_suite='pytest', tests_require=[ 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess', 'requests' From 770600b7e696bad98462cc45b05a36f0df8077f9 Mon Sep 17 00:00:00 2001 From: hballard Date: Wed, 7 Jun 2017 21:32:56 -0500 Subject: [PATCH 11/12] Refactor setup.py test dependencies Also minor changes to test_susbcription_transport module --- setup.py | 15 ++++++++++----- tests/test_subscription_transport.py | 27 ++++++++------------------- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/setup.py b/setup.py index 5e03206..1caef1b 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,11 @@ except (IOError, ImportError): long_description = open('README.md').read() +tests_dep = [ + 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', + 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess', 'requests' +] + setup( name='graphql-subscriptions', version='0.1.8', @@ -27,11 +32,11 @@ 'License :: OSI Approved :: MIT License' ], install_requires=[ - 'gevent-websocket', 'redis', 'promise==1.0.1', 'graphql-core' + 'gevent-websocket', 'redis', 'graphql-core', 'promise<=1.0.1' ], test_suite='pytest', - tests_require=[ - 'pytest', 'pytest-mock', 'fakeredis', 'graphene', 'subprocess32', - 'flask', 'flask-graphql', 'flask-sockets', 'multiprocess', 'requests' - ], + tests_require=tests_dep, + extras_require={ + 'test': tests_dep + }, include_package_data=True) diff --git a/tests/test_subscription_transport.py b/tests/test_subscription_transport.py index d201a9d..13d9361 100644 --- a/tests/test_subscription_transport.py +++ b/tests/test_subscription_transport.py @@ -27,8 +27,8 @@ from graphql_subscriptions import (RedisPubsub, SubscriptionManager, SubscriptionServer) -from graphql_subscriptions.subscription_transport_ws import ( - SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA) +from graphql_subscriptions.subscription_transport_ws import (SUBSCRIPTION_FAIL, + SUBSCRIPTION_DATA) if os.name == 'posix' and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -338,8 +338,7 @@ def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks): require('subscriptions-transport-ws').SubscriptionClient new SubscriptionClient('ws://localhost:{1}/socket') '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) @@ -360,8 +359,7 @@ def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks): connectionParams, }}) '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) @@ -381,8 +379,7 @@ def test_trigger_on_disconnect_when_client_disconnects(server_with_mocks): const client = new SubscriptionClient('ws://localhost:{1}/socket') client.client.close() '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) subprocess.check_output(['node', '-e', node_script]) mock = server_with_mocks.get_nowait() assert mock.name == 'on_disconnect' @@ -415,8 +412,7 @@ def test_should_call_unsubscribe_when_client_closes_cxn(server_with_mocks): client.client.close() }}, 500) '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1) @@ -450,8 +446,7 @@ def test_should_trigger_on_subscribe_when_client_subscribes(server_with_mocks): // nothing }}) '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) @@ -487,8 +482,7 @@ def test_should_trigger_on_unsubscribe_when_client_unsubscribes( }}) client.unsubscribe(subId) '''.format( - os.path.join(os.path.dirname(__file__), 'node_modules'), - TEST_PORT) + os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) @@ -960,7 +954,6 @@ def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - time.sleep(.2) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -1003,7 +996,6 @@ def test_rejects_unparsable_message(server): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - time.sleep(.2) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -1047,7 +1039,6 @@ def test_rejects_nonsense_message(server): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - time.sleep(.2) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -1094,7 +1085,6 @@ def test_does_not_crash_on_unsub_from_unknown_sub(server): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - time.sleep(.2) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True @@ -1245,7 +1235,6 @@ def test_sends_a_keep_alive_signal_in_the_socket(server_with_keep_alive): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - time.sleep(.5) q = Queue.Queue() t = threading.Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True From 19562e7b79061bddd0f7336cfec73bc34ee18949 Mon Sep 17 00:00:00 2001 From: hballard Date: Wed, 7 Jun 2017 21:52:00 -0500 Subject: [PATCH 12/12] Modify first couple paragraphs of README slightly --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2a15170..43f6526 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ #### (Work in Progress!) A port of apollographql subscriptions for python, using gevent websockets and redis -This is a implementation of apollographql [subscriptions-transport-ws](https://github.com/apollographql/subscriptions-transport-ws) and [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) in Python. It currently implements a pubsub using [redis-py](https://github.com/andymccurdy/redis-py) and uses [gevent-websockets](https://bitbucket.org/noppo/gevent-websocket) for concurrency. It also makes heavy use of [syrusakbary/promise](https://github.com/syrusakbary/promise) python implementation to mirror the logic in the apollo-graphql libraries. +This is an implementation of graphql subscriptions in Python. It uses the apollographql [subscriptions-transport-ws](https://github.com/apollographql/subscriptions-transport-ws) and [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) packages as its basis. It currently implements a pubsub using [redis-py](https://github.com/andymccurdy/redis-py) and uses [gevent-websockets](https://bitbucket.org/noppo/gevent-websocket) for concurrency. It also makes heavy use of [syrusakbary/promise](https://github.com/syrusakbary/promise) python implementation to mirror the logic in the apollo-graphql libraries. -Meant to be used in conjunction with [graphql-python](https://github.com/graphql-python) / [graphene](http://graphene-python.org/) server and [apollo-client](http://dev.apollodata.com/) for graphql. The api is below, but if you want more information, consult the apollo graphql libraries referenced above. +Meant to be used in conjunction with [graphql-python](https://github.com/graphql-python) / [graphene](http://graphene-python.org/) server and [apollo-client](http://dev.apollodata.com/) for graphql. The api is below, but if you want more information, consult the apollo graphql libraries referenced above, and specifcally as it relates to using their graphql subscriptions client. -Initial implementation. Currently only works with Python 2. +Initial implementation. Good test coverage. Currently only works with Python 2. ## Installation ```