Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for job resource filters when accessing hubstorage #372

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ matrix:
language: generic
env:
- TOX_ENV=py27
- PYTHON_VERSION='2.7'
- PYTHON_VERSION='2.7.9'
- os: osx
language: generic
env:
Expand All @@ -40,7 +40,8 @@ branches:
before_install: |
if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then
# From https://pythonhosted.org/CodeChat/.travis.yml.html
brew install pyenv-virtualenv
# `brew install` currently fails after Homebrew auto-updates itself, so skip the auto-update. See also: https://discuss.circleci.com/t/brew-install-fails-while-updating/32992/4
HOMEBREW_NO_AUTO_UPDATE=1 brew install pyenv-virtualenv
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
# See https://github.com/travis-ci/travis-ci/issues/4834, but
Expand Down
6 changes: 4 additions & 2 deletions requirements.in
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
click
docker-py
PyYAML
requests
#PyYAML
#requests
retrying
six
tqdm

scrapinghub>=2.0.3

pip<19.3

# address known vulnerabilities
requests>=2.20.0 # CVE-2018-18074
pyyaml>=4.2b1 # CVE-2017-18342
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ six==1.10.0
tqdm==4.11.2
urllib3==1.25.3 # via requests
websocket-client==0.37.0 # via docker-py

# The following packages are considered to be unsafe in a requirements file:
# pip==19.2.3
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
install_requires=[
'click',
'docker-py',
'pip',
'pip<19.3',
'PyYAML',
'retrying',
'requests',
Expand Down
13 changes: 11 additions & 2 deletions shub/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
by providing the -f flag:

shub items -f 2/15

Additional filters may be applied to the query:

shub items 12345/2/15 --filter '["foo","exists",[]]'
"""

SHORT_HELP = "Fetch items from Scrapy Cloud"
Expand All @@ -40,8 +44,13 @@
@click.option('-f', '--follow', help='output new items as they are scraped',
is_flag=True)
@click.option('-n', '--tail', help='output last N items only', type=int)
def cli(job_id, follow, tail):
@click.option('--filter', 'filter_', help='filter to be applied to the query')
@click.option('--filter_type', default='filter',
type=click.Choice(['filter', 'filterall', 'filterany']),
help='type of filter to be applied')
def cli(job_id, follow, tail, filter_, filter_type):
job = get_job(job_id)
for item in job_resource_iter(job, job.items, output_json=True,
follow=follow, tail=tail):
follow=follow, tail=tail, filter_=filter_,
filter_type=filter_type):
click.echo(item)
10 changes: 8 additions & 2 deletions shub/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
providing the -f flag:

shub log -f 2/15

Additional filters may be applied to the query:

shub log 12345/2/15 --filter '["level",">=",["20"]]' # loglevel>=INFO
"""

SHORT_HELP = "Fetch log from Scrapy Cloud"
Expand All @@ -43,9 +47,11 @@
'produced', is_flag=True)
@click.option('-n', '--tail', help='output last N log entries only', type=int)
@click.option('--json', 'json_', help='output log entries in JSON', is_flag=True, default=False)
def cli(job_id, follow, tail, json_):
@click.option('--filter', 'filter_', help='filter to be applied to the query')
def cli(job_id, follow, tail, json_, filter_):
job = get_job(job_id)
for item in job_resource_iter(job, job.logs, follow=follow, tail=tail, output_json=json_):
for item in job_resource_iter(job, job.logs, follow=follow, tail=tail, output_json=json_,
filter_=filter_):
if json_:
click.echo(item)
else:
Expand Down
9 changes: 7 additions & 2 deletions shub/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
by providing the -f flag:

shub requests -f 2/15

Additional filters may be applied to the query:

shub requests 12345/2/15 --filter '["url","icontains",["foo"]]'
"""

SHORT_HELP = "Fetch requests from Scrapy Cloud"
Expand All @@ -40,8 +44,9 @@
@click.option('-f', '--follow', help='output new requests as they are made',
is_flag=True)
@click.option('-n', '--tail', help='output last N requests only', type=int)
def cli(job_id, follow, tail):
@click.option('--filter', 'filter_', help='filter to be applied to the query')
def cli(job_id, follow, tail, filter_):
job = get_job(job_id)
for item in job_resource_iter(job, job.requests, output_json=True,
follow=follow, tail=tail):
follow=follow, tail=tail, filter_=filter_):
click.echo(item)
17 changes: 13 additions & 4 deletions shub/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def job_live(job, refresh_meta_after=60):


def job_resource_iter(job, resource, output_json=False, follow=True,
tail=None):
tail=None, filter_=None, filter_type=None):
"""
Given a python-hubstorage job and resource (e.g. job.items), return a
generator that periodically checks the job resource and yields its items.
Expand All @@ -549,17 +549,24 @@ def job_resource_iter(job, resource, output_json=False, follow=True,
last_item_key = '{}/{}'.format(job.key, last_item)
if not job_live(job):
follow = False
# XXX: Should we do some simple validation of the filter value here?
api_params = {
# It's okay to include None values here since the underlying
# package would have them removed
'startafter': last_item_key,
filter_type or 'filter': filter_,
}
resource_iter = resource.iter_json if output_json else resource.iter_values
if not follow:
for item in resource_iter(startafter=last_item_key):
for item in resource_iter(**api_params):
yield item
return
while True:
# XXX: Always use iter_json until Kumo team fixes iter_values to also
# return '_key'
for json_line in resource.iter_json(startafter=last_item_key):
for json_line in resource.iter_json(**api_params):
item = json.loads(json_line)
last_item_key = item['_key']
api_params['startafter'] = item['_key']
yield json_line if output_json else item
if not job_live(job):
break
Expand Down Expand Up @@ -637,6 +644,8 @@ def download_from_pypi(dest, pkg=None, reqfile=None, extra_args=None):
no_wheel = ['--no-binary=:all:']
if pip_version >= LooseVersion('8'):
cmd = 'download'
if pip_version >= LooseVersion('19.3'):
raise NotImplementedError('Expecting pip<19.3')
with patch_sys_executable():
pip_main([cmd, '-d', dest, '--no-deps'] + no_wheel + extra_args +
target)
Expand Down
15 changes: 15 additions & 0 deletions tests/test_jobresource.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,27 @@ def _test_forwards_follow(self, cmd_mod):
self.runner.invoke(cmd_mod.cli, ('1/2/3', '-f'))
self.assertTrue(mock_jri.call_args[1]['follow'])

def _test_resource_filter(self, cmd_mod, test_filter_type=False):
with mock.patch.object(cmd_mod, 'get_job'), \
mock.patch.object(cmd_mod, 'job_resource_iter', autospec=True) \
as mock_jri:
self.runner.invoke(cmd_mod.cli, ('1/2/3',))
self.assertFalse(mock_jri.call_args[1]['filter_'])
self.runner.invoke(cmd_mod.cli, ('1/2/3', '--filter', '["foo"]'))
self.assertEqual(mock_jri.call_args[1]['filter_'], '["foo"]')
if test_filter_type:
self.runner.invoke(cmd_mod.cli, ('1/2/3', '--filter', '["foo"]', '--filter_type', 'filterall'))
self.assertEqual(mock_jri.call_args[1]['filter_type'], 'filterall')

def test_items(self):
self._test_prints_objects(items, 'items')
self._test_forwards_follow(items)
self._test_resource_filter(items, test_filter_type=True)

def test_requests(self):
self._test_prints_objects(requests, 'requests')
self._test_forwards_follow(requests)
self._test_resource_filter(requests)

def test_log(self):
objects = [
Expand All @@ -65,6 +79,7 @@ def test_log(self):
for idx, line in enumerate(result.output.splitlines()):
self.assertEqual(json.loads(line), objects[idx])
self._test_forwards_follow(log)
self._test_resource_filter(log)

def test_log_unicode(self):
objects = [
Expand Down
16 changes: 12 additions & 4 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,23 +243,26 @@ def magic_iter(*args, **kwargs):
self.assertEqual(kwargs['startafter'], 'jobkey/996')
return iter([])

def jri_result(follow, tail=None):
def jri_result(follow, tail=None, filter_=None, filter_type=None):
return list(utils.job_resource_iter(
job,
job.resource,
follow=follow,
tail=tail,
filter_=filter_,
filter_type=filter_type,
output_json=True,
))

job.resource.iter_json = magic_iter
job.resource.iter_json = Mock(wraps=magic_iter)

magic_iter.stage = 0
self.assertEqual(jri_result(False), make_items([1, 2, 3]))
self.assertFalse(mock_sleep.called)

magic_iter.stage = 0
self.assertEqual(jri_result(True), make_items([1, 2, 3, 4, 5, 6]))
self.assertEqual(jri_result(True, filter_='["foo"]'), make_items([1, 2, 3, 4, 5, 6]))
self.assertEqual(job.resource.iter_json.call_args[1]['filter'], '["foo"]')
self.assertTrue(mock_sleep.called)

magic_iter.stage = 0
Expand All @@ -268,7 +271,8 @@ def jri_result(follow, tail=None):

magic_iter.stage = 2
job.resource.stats.return_value = {'totals': {'input_values': 1000}}
self.assertEqual(jri_result(True, tail=3), [])
self.assertEqual(jri_result(True, tail=3, filter_='["foo"]', filter_type='filterall'), [])
self.assertEqual(job.resource.iter_json.call_args[1]['filterall'], '["foo"]')

@patch('shub.utils.requests.get', autospec=True)
def test_latest_github_release(self, mock_get):
Expand Down Expand Up @@ -371,6 +375,10 @@ def _call(*args, **kwargs):
pipargs = _call('tmpdir', reqfile='req.txt')
self.assertEqual(pipargs.index('-r') + 1, pipargs.index('req.txt'))

# pip>=19.3 is not supported for now
mock_pip.__version__ = '19.3'
self.assertRaises(NotImplementedError, _call, ['tmpdir'], {'pkg': 'shub'})

# Replace deprecated commands in newer versions
mock_pip.__version__ = '7.1.2.dev0'
pipargs = _call('tmpdir', pkg='shub')
Expand Down