Skip to content

Commit

Permalink
Refactor all gRPC proxy code into a single class.
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianludwin authored and Commit Bot committed Aug 4, 2017
1 parent aa0910e commit a0cd804
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 214 deletions.
12 changes: 12 additions & 0 deletions appengine/swarming/doc/Magic-Values.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ always set:
The following environment variables may be set to alter bot behavior:

- `SWARMING_EXTERNAL_BOT_SETUP=1` disables bot_config.setup_bot hook.
- `SWARMING_GRPC_PROXY=<url>` and `ISOLATED_GRPC_PROXY=<url>` override the
equivalent value in the bot config.
- `LUCI_GRPC_PROXY_VERBOSE` dumps out additional gRPC proxy information if set
to a truthy value (e.g. `1`).
- `LUCI_GRPC_PROXY_TLS_ROOTS=<file>` and points to a .crt file containing
certificate authorities. `LUCI_GRPC_PROXY_TLS_OVERRIDE=<name>` specifies the
name of the server in the certificate. These are useful for testing a gRPC
proxy running on localhost but with TLS enabled. Unlike the `*_GRPC_PROXY`
env vars, these are shared between Swarming and Isolated since they're only
used in the limited case when you need to override TLS. See
[/client/utils/grpc_proxy.py](../../../client/utils/grpc_proxy.py) for more
information.


### dimensions
Expand Down
1 change: 1 addition & 0 deletions appengine/swarming/server/bot_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@
'utils/cacert.pem',
'utils/file_path.py',
'utils/fs.py',
'utils/grpc_proxy.py',
'utils/large.py',
'utils/logging_utils.py',
'utils/lru.py',
Expand Down
2 changes: 2 additions & 0 deletions appengine/swarming/swarming_bot/bot_code/remote_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import base64
import logging
import os
import threading
import time
import traceback
Expand Down Expand Up @@ -42,6 +43,7 @@


def createRemoteClient(server, auth, grpc_proxy):
grpc_proxy = os.environ.get('SWARMING_GRPC_PROXY', grpc_proxy)
if grpc_proxy:
import remote_client_grpc
return remote_client_grpc.RemoteClientGrpc(grpc_proxy)
Expand Down
48 changes: 11 additions & 37 deletions appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,13 @@
import logging
import time

import grpc
import google.protobuf.json_format
from proto_bot import swarming_bot_pb2
from remote_client_errors import InternalError
from remote_client_errors import MintOAuthTokenError
from remote_client_errors import PollError
from utils import net


# How long to wait for a response from the server. Keeping the same as
# the equivalent in remote_client.py for now.
NET_CONNECTION_TIMEOUT_SEC = 5*60


# How many times to retry a gRPC call
MAX_GRPC_ATTEMPTS = 30


# Longest time to sleep between gRPC calls
MAX_GRPC_SLEEP = 10.
from utils import grpc_proxy


class RemoteClientGrpc(object):
Expand All @@ -40,11 +27,13 @@ class RemoteClientGrpc(object):
might be garbage-collected before the values are used.
"""

def __init__(self, server):
def __init__(self, server, fake_proxy=None):
logging.info('Communicating with host %s via gRPC', server)
if fake_proxy:
self._proxy = fake_proxy
else:
self._proxy = grpc_proxy.Proxy(server, swarming_bot_pb2.BotServiceStub)
self._server = server
self._channel = grpc.insecure_channel(server)
self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
self._log_is_asleep = False

def is_grpc(self):
Expand Down Expand Up @@ -84,7 +73,7 @@ def post_task_update(self, task_id, bot_id, params,
google.protobuf.json_format.ParseDict(params, request)

# Perform update
response = call_grpc(self._stub.TaskUpdate, request)
response = self._proxy.call_unary('TaskUpdate', request)
logging.debug('post_task_update() = %s', request)
if response.error:
raise InternalError(response.error)
Expand All @@ -97,7 +86,7 @@ def post_task_error(self, task_id, bot_id, message):
request.msg = message
logging.error('post_task_error() = %s', request)

response = call_grpc(self._stub.TaskError, request)
response = self._proxy.call_unary('TaskError', request)
return response.ok

def _attributes_json_to_proto(self, json_attr, msg):
Expand All @@ -111,7 +100,7 @@ def _attributes_json_to_proto(self, json_attr, msg):
def do_handshake(self, attributes):
request = swarming_bot_pb2.HandshakeRequest()
self._attributes_json_to_proto(attributes, request.attributes)
response = call_grpc(self._stub.Handshake, request)
response = self._proxy.call_unary('Handshake', request)
resp = {
'server_version': response.server_version,
'bot_version': response.bot_version,
Expand All @@ -129,7 +118,7 @@ def poll(self, attributes):
request = swarming_bot_pb2.PollRequest()
self._attributes_json_to_proto(attributes, request.attributes)
# TODO(aludwin): gRPC-specific exception handling (raise PollError).
response = call_grpc(self._stub.Poll, request)
response = self._proxy.call_unary('Poll', request)

if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
return 'update', response.version
Expand Down Expand Up @@ -189,7 +178,7 @@ def get_bot_code(self, new_zip_fn, bot_version, _bot_id):
logging.info('Updating to version: %s', bot_version)
request = swarming_bot_pb2.BotUpdateRequest()
request.bot_version = bot_version
response = call_grpc(self._stub.BotUpdate, request)
response = self._proxy.call_unary('BotUpdate', request)
with open(new_zip_fn, 'wb') as f:
f.write(response.bot_code)

Expand Down Expand Up @@ -244,18 +233,3 @@ def insert_dict_as_submessage(message, keyname, value):
google.protobuf.json_format.Parse(json.dumps(value), sub_msg)


def call_grpc(method, request):
"""Retries a command a set number of times"""
for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
try:
return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
except grpc.RpcError as g:
if g.code() is not grpc.StatusCode.UNAVAILABLE:
raise
logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
attempt, MAX_GRPC_ATTEMPTS)
grpc_error = g
time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
# If we get here, it must be because we got (and saved) an error
assert grpc_error is not None
raise grpc_error
106 changes: 7 additions & 99 deletions appengine/swarming/swarming_bot/bot_code/remote_client_grpc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,15 @@
test_env_bot_code.setup_test_env()

from depot_tools import auto_stub
import remote_client_grpc

try:
import remote_client_grpc
except ImportError as e:
print('Could not import gRPC remote client, likely due to missing grpc '
'library. Skipping tests.')
sys.exit(0)


class FakeBotServiceStub(object):
class FakeGrpcProxy(object):
def __init__(self, testobj):
self._testobj = testobj

def TaskUpdate(self, request, **_kwargs):
return self._testobj._handle_call('TaskUpdate', request)

def Handshake(self, request, **_kwargs):
return self._testobj._handle_call('Handshake', request)

def Poll(self, request, **_kwargs):
return self._testobj._handle_call('Poll', request)


# If gRPC isn't successfully imported, this will be seen as a nonstandard
# exception because it won't appear to be derived from Exception. This
# only affects PyLint because the test will never be run if gRPC import
# fails.
# pylint: disable=W0710
class FakeGrpcError(remote_client_grpc.grpc.RpcError):
"""Duplicates a basic UNAVAILABLE error"""
def __init__(self, code):
self._code = code
super(FakeGrpcError, self).__init__('something terrible happened')

def code(self):
return self._code
def call_unary(self, name, request):
return self._testobj._handle_call(name, request)


class TestRemoteClientGrpc(auto_stub.TestCase):
Expand All @@ -58,22 +31,13 @@ def setUp(self):
def fake_sleep(_time):
self._num_sleeps += 1
self.mock(time, 'sleep', fake_sleep)
self._client = remote_client_grpc.RemoteClientGrpc('1.2.3.4:90')
self._client._stub = FakeBotServiceStub(self)
self._client = remote_client_grpc.RemoteClientGrpc('1.2.3.4:90',
FakeGrpcProxy(self))
self._expected = []
self._error_codes = []

def _handle_call(self, method, request):
"""This is called by FakeBotServiceStub to implement fake calls"""
if len(self._error_codes) > 0:
code, self._error_codes = self._error_codes[0], self._error_codes[1:]
raise FakeGrpcError(code)

if self._error_codes:
text = self._error_codes
self._error_codes = ''
raise FakeGrpcError(text)

"""This is called by FakeGrpcProxy to implement fake calls"""
# Pop off the first item on the list
self.assertTrue(len(self._expected) > 0)
expected, self._expected = self._expected[0], self._expected[1:]
Expand Down Expand Up @@ -154,62 +118,6 @@ def test_handshake(self):
},
})

def test_handshake_grpc_unavailable(self):
"""Ensures that the handshake function sleeps after a gRPC error"""
msg_req = remote_client_grpc.swarming_bot_pb2.HandshakeRequest()
msg_req.attributes.CopyFrom(self.get_bot_attributes_proto())

# Create proto response
msg_rsp = remote_client_grpc.swarming_bot_pb2.HandshakeResponse()
msg_rsp.server_version = '101'
msg_rsp.bot_version = '102'
d1 = msg_rsp.bot_group_cfg.dimensions.add()
d1.name = 'mammal'
d1.values.extend(['kangaroo', 'emu'])

# Execute call and verify response
expected_call = ('Handshake', msg_req, msg_rsp)
self._expected.append(expected_call)
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
response = self._client.do_handshake(self.get_bot_attributes_dict())
self.assertEqual(self._num_sleeps, 2)
self.assertEqual(response, {
'server_version': u'101',
'bot_version': u'102',
'bot_group_cfg_version': u'',
'bot_group_cfg': {
'dimensions': {
u'mammal': [u'kangaroo', u'emu'],
},
},
})

def test_handshake_grpc_other_error(self):
"""Ensures that the handshake function only catches UNAVAILABLE"""
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
self._error_codes.append(remote_client_grpc.grpc.StatusCode.INTERNAL)
got_exception = None
try:
self._client.do_handshake(self.get_bot_attributes_dict())
except remote_client_grpc.grpc.RpcError as g:
got_exception = g
self.assertEqual(got_exception.code(),
remote_client_grpc.grpc.StatusCode.INTERNAL)
self.assertEqual(self._num_sleeps, 1)

def test_handshake_grpc_too_many_errors(self):
"""Ensures that the handshake function only catches UNAVAILABLE"""
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
self._error_codes.append(remote_client_grpc.grpc.StatusCode.UNAVAILABLE)
self.mock(remote_client_grpc, 'MAX_GRPC_ATTEMPTS', 2)
with self.assertRaises(remote_client_grpc.grpc.RpcError) as g:
self._client.do_handshake(self.get_bot_attributes_dict())
self.assertEqual(self._num_sleeps, 2)
self.assertEqual(g.exception.code(),
remote_client_grpc.grpc.StatusCode.UNAVAILABLE)

def test_poll_manifest(self):
"""Verifies that we can generate a reasonable manifest from a proto"""
msg_req = remote_client_grpc.swarming_bot_pb2.PollRequest()
Expand Down
Loading

0 comments on commit a0cd804

Please sign in to comment.