Skip to content

Commit

Permalink
Replace custom blob gRPC API with ByteStream
Browse files Browse the repository at this point in the history
Also allow bot code to be sharded in memcached to get around the 1MB
memcached limit.

BUG=

Review-Url: https://codereview.chromium.org/2953253003
  • Loading branch information
adrianludwin authored and Commit Bot committed Jun 28, 2017
1 parent d96432e commit e3c4ba9
Show file tree
Hide file tree
Showing 50 changed files with 6,605 additions and 1,147 deletions.
37 changes: 36 additions & 1 deletion appengine/swarming/server/bot_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
'client/isolateserver.py',
'client/named_cache.py',
'client/proto/__init__.py',
'client/proto/isolate_bot_pb2.py',
'client/proto/bytestream_pb2.py',
'client/run_isolated.py',
'config/__init__.py',
'infra_libs/__init__.py',
Expand Down Expand Up @@ -120,6 +120,15 @@
'python_libusb1/libusb1.py',
'python_libusb1/usb1.py',
'third_party/__init__.py',
'third_party/cachetools/abc.py',
'third_party/cachetools/cache.py',
'third_party/cachetools/func.py',
'third_party/cachetools/__init__.py',
'third_party/cachetools/keys.py',
'third_party/cachetools/lfu.py',
'third_party/cachetools/lru.py',
'third_party/cachetools/rr.py',
'third_party/cachetools/ttl.py',
'third_party/colorama/__init__.py',
'third_party/colorama/ansi.py',
'third_party/colorama/ansitowin32.py',
Expand All @@ -130,6 +139,32 @@
'third_party/depot_tools/fix_encoding.py',
'third_party/depot_tools/subcommand.py',
'third_party/google/__init__.py',
'third_party/google/auth/app_engine.py',
'third_party/google/auth/_cloud_sdk.py',
'third_party/google/auth/credentials.py',
'third_party/google/auth/crypt.py',
'third_party/google/auth/_default.py',
'third_party/google/auth/environment_vars.py',
'third_party/google/auth/exceptions.py',
'third_party/google/auth/_helpers.py',
'third_party/google/auth/iam.py',
'third_party/google/auth/__init__.py',
'third_party/google/auth/jwt.py',
'third_party/google/auth/_oauth2client.py',
'third_party/google/auth/_service_account_info.py',
'third_party/google/auth/compute_engine/credentials.py',
'third_party/google/auth/compute_engine/__init__.py',
'third_party/google/auth/compute_engine/_metadata.py',
'third_party/google/auth/transport/grpc.py',
'third_party/google/auth/transport/_http_client.py',
'third_party/google/auth/transport/__init__.py',
'third_party/google/auth/transport/requests.py',
'third_party/google/auth/transport/urllib3.py',
'third_party/google/oauth2/_client.py',
'third_party/google/oauth2/credentials.py',
'third_party/google/oauth2/id_token.py',
'third_party/google/oauth2/__init__.py',
'third_party/google/oauth2/service_account.py',
'third_party/google/protobuf/__init__.py',
'third_party/google/protobuf/descriptor.py',
'third_party/google/protobuf/descriptor_database.py',
Expand Down
76 changes: 73 additions & 3 deletions appengine/swarming/server/bot_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

MAX_MEMCACHED_SIZE_BYTES = 1000000
BOT_CODE_NS = 'bot_code'

### Models.

Expand Down Expand Up @@ -222,7 +224,7 @@ def get_swarming_bot_zip(host):
A string representing the zipped file's contents.
"""
version, additionals = get_bot_version(host)
content = memcache.get('code-' + version, namespace='bot_code')
content = get_cached_swarming_bot_zip(version)
if content:
logging.debug('memcached bot code %s; %d bytes', version, len(content))
return content
Expand All @@ -236,12 +238,80 @@ def get_swarming_bot_zip(host):
content, version = bot_archive.get_swarming_bot_zip(
bot_dir, host, utils.get_app_version(), additionals,
local_config.settings().enable_ts_monitoring)
# This is immutable so not no need to set expiration time.
memcache.set('code-' + version, content, namespace='bot_code')
logging.info('generated bot code %s; %d bytes', version, len(content))
cache_swarming_bot_zip(version, content)
return content


def get_cached_swarming_bot_zip(version):
"""Returns the bot contents if its been cached, or None if missing."""
# see cache_swarming_bot_zip for how the "meta" entry is set
meta = bot_memcache_get(version, 'meta').get_result()
if meta is None:
logging.info('memcache did not include metadata for version %s', version)
return None
num_parts, true_sig = meta.split(':')

# Get everything asynchronously. If something's missing, the hash will be
# wrong so no need to check that we got something from each call.
futures = [bot_memcache_get(version, 'content', p)
for p in range(int(num_parts))]
content = ''
for f in futures:
chunk = f.get_result()
if chunk is None:
logging.error('bot code %s was missing some of its contents', version)
return None
content += chunk
h = hashlib.sha256()
h.update(content)
if h.hexdigest() != true_sig:
logging.error('bot code %s had signature %s instead of expected %s',
version, h.hexdigest(), true_sig)
return None
return content


def cache_swarming_bot_zip(version, content):
"""Caches the bot code to memcache."""
h = hashlib.sha256()
h.update(content)
p = 0
futures = []
while len(content) > 0:
chunk_size = min(MAX_MEMCACHED_SIZE_BYTES, len(content))
futures.append(bot_memcache_set(content[0:chunk_size],
version, 'content', p))
content = content[chunk_size:]
p += 1
meta = "%s:%s" % (p, h.hexdigest())
for f in futures:
f.check_success()
bot_memcache_set(meta, version, 'meta').check_success()
logging.info('bot %s with sig %s saved in memcached in %d chunks',
version, h.hexdigest(), p)


def bot_memcache_get(version, desc, part=None):
"""Mockable async memcache getter."""
return ndb.get_context().memcache_get(bot_key(version, desc, part),
namespace=BOT_CODE_NS)


def bot_memcache_set(value, version, desc, part=None):
"""Mockable async memcache setter."""
return ndb.get_context().memcache_set(bot_key(version, desc, part),
value, namespace=BOT_CODE_NS)


def bot_key(version, desc, part=None):
"""Returns a memcache key for bot entries."""
key = 'code-%s-%s' % (version, desc)
if part is not None:
key = '%s-%d' % (key, part)
return key


### Bootstrap token.


Expand Down
34 changes: 34 additions & 0 deletions appengine/swarming/server/bot_code_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import subprocess
import sys
import tempfile
import time
import unittest
import zipfile

import test_env
test_env.setup_test_env()

from google.appengine.ext import ndb
from components import auth
from test_support import test_case

Expand All @@ -40,6 +42,7 @@ def setUp(self):
auth, 'get_current_identity',
lambda: auth.Identity(auth.IDENTITY_USER, 'joe@localhost'))


def test_get_bootstrap(self):
def get_self_config_mock(path, revision=None, store_last_good=False):
self.assertEqual('scripts/bootstrap.py', path)
Expand Down Expand Up @@ -94,7 +97,37 @@ def test_get_bot_version(self):
self.assertEqual(expected, additionals)

def test_get_swarming_bot_zip(self):
local_mc = {'store': {}, 'reads': 0, 'writes': 0}

@ndb.tasklet
def mock_memcache_get(version, desc, part=None):
value = local_mc['store'].get(bot_code.bot_key(version, desc, part))
if value is not None:
local_mc['reads'] += 1
raise ndb.Return(value)

@ndb.tasklet
def mock_memcache_set(value, version, desc, part=None):
local_mc['writes'] += 1
key = bot_code.bot_key(version, desc, part)
local_mc['store'][key] = value
return ndb.Return(None)

self.mock(bot_code, 'bot_memcache_set', mock_memcache_set)
self.mock(bot_code, 'bot_memcache_get', mock_memcache_get)
self.mock(bot_code, 'MAX_MEMCACHED_SIZE_BYTES', 100000)

self.assertEqual(0, local_mc['writes'])
zipped_code = bot_code.get_swarming_bot_zip('http://localhost')
self.assertEqual(0, local_mc['reads'])
self.assertNotEqual(0, local_mc['writes'])

# Make sure that we read from memcached if we get it again
zipped_code_copy = bot_code.get_swarming_bot_zip('http://localhost')
self.assertEqual(local_mc['writes'], local_mc['reads'])
# Why not assertEqual? Don't want to dump ~1MB of data if this fails.
self.assertTrue(zipped_code == zipped_code_copy)

# Ensure the zip is valid and all the expected files are present.
with zipfile.ZipFile(StringIO.StringIO(zipped_code), 'r') as zip_file:
for i in bot_archive.FILES:
Expand All @@ -120,6 +153,7 @@ def test_get_swarming_bot_zip(self):
finally:
file_path.rmtree(temp_dir)


def test_bootstrap_token(self):
tok = bot_code.generate_bootstrap_token()
self.assertEqual(
Expand Down
Loading

0 comments on commit e3c4ba9

Please sign in to comment.