From b603e29900ac3b7783e9a1d1c82b203067d5f9eb Mon Sep 17 00:00:00 2001 From: Pablo Aguiar Date: Thu, 10 Apr 2014 16:12:13 -0300 Subject: [PATCH] Add support to search providers (fix #79) To wrap up changes: * Default provider is just a bare-bones-search-in-mysql-using-like * Support ElasticSearch for a faster search * Create a console to setup, initialize and clear a search provider * Adjust *handlers*, models and tests to changes * Create some config entries * Update docs (re #19) Thanks @luizgpsantos for the support :+1: --- .gitignore | 3 + CONTRIBUTING.md | 72 +++++ Makefile | 32 +- holmes/cli.py | 7 + holmes/config/__init__.py | 6 + holmes/handlers/domains.py | 35 +-- holmes/handlers/violation.py | 34 +-- holmes/models/domain.py | 12 +- holmes/models/review.py | 70 ++--- holmes/reviewer.py | 6 +- holmes/search.py | 36 +++ holmes/search_providers/__init__.py | 85 ++++++ holmes/search_providers/elastic.py | 411 ++++++++++++++++++++++++++ holmes/search_providers/noexternal.py | 70 +++++ holmes/server.py | 8 + holmes/worker.py | 4 + setup.py | 3 + tests/unit/base.py | 19 ++ tests/unit/handlers/test_domains.py | 58 +++- tests/unit/handlers/test_violation.py | 127 +++++++- 20 files changed, 979 insertions(+), 119 deletions(-) create mode 100644 holmes/search.py create mode 100644 holmes/search_providers/__init__.py create mode 100644 holmes/search_providers/elastic.py create mode 100644 holmes/search_providers/noexternal.py diff --git a/.gitignore b/.gitignore index 3942d88..38f260a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ *.py[cod] +# PIDs +*.pid + # C extensions *.so diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2814ca9..d396375 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,3 +36,75 @@ To test: To test without migrations: make redis_test unit + + +ElasticSearch +------------- + +Holmes supports ElasticSearch for faster searches on big databases. + +### Installing and running + +After having ES properly installed and configured, *optionally* run: + +```bash +make elasticsearch # to start the ES server as daemon +``` + +To shut it down later, run: + +```bash +make kill_elasticsearch # to kill the ES server daemon +``` + +### Overriding default configuration (local.conf) + +Bear in mind that, for testing purposes, overriding these variables is optional. + +#### Optional configurations + +To set it as the default search provider: + +```conf +SEARCH_PROVIDER = 'holmes.search_providers.elastic.ElasticSearchProvider' +``` + +If -- and only if -- ES runs on a host and/or port other than localhost:9200, set one of or both the following variables accordingly: + +```conf +ELASTIC_SEARCH_HOST = 'HOST' # hostname or IP address +ELASTIC_SEARCH_PORT = PORT # default is 9200 +``` + +Should you need or want to use a different index name, just set it at your own will: + +```conf +ELASTIC_SEARCH_INDEX = 'INDEX' # name of the index +``` + +### Setting up + +Prior to running the API, setup the index and optionally index all the active reviews: + +```bash +make elasticsearch_setup # to create the index +make elasticsearch_index # to index all active reviews (optional, may take too long) +``` + +### Testing + +Tests **expect** elasticsearch to be **running** on the default port **9200**. The index name is `holmes-test`. So, to test: + +```bash +make test # this creates the test index for you +``` + +or + +```bash +make elasticsearch_drop_test # to delete the test index +make elasticsearch_setup_test # to create the test index +make unit # to run unit tests +``` + +**Happy contributing!** diff --git a/Makefile b/Makefile index 3829742..6427aa5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -test: redis_test drop_test data_test unit integration kill_run +test: redis_test drop_test data_test elasticsearch_drop_test elasticsearch_setup_test unit integration kill_run unit: @coverage run --branch `which nosetests` -vv --with-yanc -s tests/unit/ @@ -55,6 +55,36 @@ data db: data_test: @cd tests/ && alembic upgrade head +kill_elasticsearch: + -@pkill -F elasticsearch.pid + +elasticsearch: kill_elasticsearch + elasticsearch -d -p elasticsearch.pid + +elasticsearch_setup: + @python holmes/search_providers/elastic.py -vv -c ./holmes/config/local.conf --create + +elasticsearch_drop: + @python holmes/search_providers/elastic.py -vv -c ./holmes/config/local.conf --delete + +elasticsearch_index: + @python holmes/search_providers/elastic.py -vv -c ./holmes/config/local.conf --all-keys + +elasticsearch_setup_test: + @python holmes/search_providers/elastic.py -vv -c ./holmes/config/local.conf --create --index holmes-test + +elasticsearch_drop_test: + @python holmes/search_providers/elastic.py -vv -c ./holmes/config/local.conf --delete --index holmes-test + +search_setup: + @holmes-search -vv -c ./holmes/config/local.conf --create + +search_drop: + @holmes-search -vv -c ./holmes/config/local.conf --delete + +search_index: + @holmes-search -vv -c ./holmes/config/local.conf --all-keys + migration: @cd holmes/ && alembic revision -m "$(DESC)" diff --git a/holmes/cli.py b/holmes/cli.py index 3b5df59..1fe998a 100644 --- a/holmes/cli.py +++ b/holmes/cli.py @@ -37,6 +37,13 @@ def log(self, message, level=logging.info): def load_error_handlers(self): return load_classes(default=self.config.ERROR_HANDLERS) + def load_search_provider(self): + search_provider = load_classes(default=[self.config.SEARCH_PROVIDER]) + if isinstance(search_provider, list) and len(search_provider) == 1: + return search_provider.pop() + else: + raise Exception('A search provider must be defined!') + def get_config_class(self): return Config diff --git a/holmes/config/__init__.py b/holmes/config/__init__.py index 1b5bdcf..db5cd72 100644 --- a/holmes/config/__init__.py +++ b/holmes/config/__init__.py @@ -97,6 +97,12 @@ Config.define('ERROR_HANDLERS', [], 'List of classes to handle errors', 'General') +Config.define('SEARCH_PROVIDER', 'holmes.search_providers.noexternal.NoExternalSearchProvider', 'Class to handle searching', 'Models') + +Config.define('ELASTIC_SEARCH_HOST', 'localhost', 'ElasticSearch host', 'ElasticSearchProvider') +Config.define('ELASTIC_SEARCH_PORT', 9200, 'ElasticSearch port', 'ElasticSearchProvider') +Config.define('ELASTIC_SEARCH_INDEX', 'holmes', 'ElasticSearch index name', 'ElasticSearchProvider') + # SENTRY ERROR HANDLER Config.define('USE_SENTRY', False, 'If set to true errors will be sent to sentry.', 'Sentry') Config.define('SENTRY_DSN_URL', '', 'URL to use as sentry DSN.', 'Sentry') diff --git a/holmes/handlers/domains.py b/holmes/handlers/domains.py index 4988176..9fbd810 100644 --- a/holmes/handlers/domains.py +++ b/holmes/handlers/domains.py @@ -113,33 +113,22 @@ def get(self, domain_name): self.set_status(404, 'Domain %s not found' % domain_name) return - reviews = domain.get_active_reviews( - self.db, - url_starts_with=term, + reviews = yield self.application.search_provider.get_domain_active_reviews( + domain=domain, current_page=current_page, - page_size=page_size + page_size=page_size, + page_filter=term, ) - if term: - review_count = len(reviews) - else: - review_count = yield self.cache.get_active_review_count(domain) + if 'reviewsCount' not in reviews: + if term: + reviews['reviewsCount'] = len(reviews['pages']) + else: + # TODO: Materialize this + reviews['reviewsCount'] = yield self.cache.get_active_review_count(domain) - result = { - 'reviewCount': review_count, - 'pages': [], - } - - for page in reviews: - result['pages'].append({ - "url": page.url, - "uuid": str(page.uuid), - "violationCount": page.violations_count, - "completedAt": page.last_review_date, - "reviewId": str(page.last_review_uuid) - }) - - self.write_json(result) + self.write_json(reviews) + self.finish() class DomainGroupedViolationsHandler(BaseHandler): diff --git a/holmes/handlers/violation.py b/holmes/handlers/violation.py index 3882403..4d102e5 100644 --- a/holmes/handlers/violation.py +++ b/holmes/handlers/violation.py @@ -40,38 +40,22 @@ def get(self, key_name): violation_title = violations[key_name]['title'] key_id = violations[key_name]['key'].id - reviews = Review.get_by_violation_key_name( - self.db, - key_id, + violation = yield self.application.search_provider.get_by_violation_key_name( + key_id=key_id, current_page=current_page, page_size=page_size, domain_filter=domain_filter, page_filter=page_filter, ) - reviews_count = Review.count_by_violation_key_name( - self.db, - key_id, - domain_filter=domain_filter, - page_filter=page_filter - ) - - reviews_data = [] - for item in reviews: - reviews_data.append({ - 'uuid': item.review_uuid, - 'page': { - 'uuid': item.page_uuid, - 'url': item.url, - 'completedAt': item.completed_date - } - }) + if 'reviewsCount' not in violation: + if domain_filter or page_filter: + violation['reviewsCount'] = len(violation['reviews']) + else: + # TODO: Materialize this + violation['reviewsCount'] = Review.count_by_violation_key_name(self.db, key_id) - violation = { - 'title': violation_title, - 'reviews': reviews_data, - 'reviewsCount': reviews_count - } + violation['title'] = violation_title self.write_json(violation) self.finish() diff --git a/holmes/models/domain.py b/holmes/models/domain.py index 4d37685..90a64a7 100644 --- a/holmes/models/domain.py +++ b/holmes/models/domain.py @@ -102,7 +102,7 @@ def get_violations_per_day(self, db): return result - def get_active_reviews(self, db, url_starts_with=None, current_page=1, page_size=10): + def get_active_reviews(self, db, page_filter=None, current_page=1, page_size=10): from holmes.models import Page # Prevent circular dependency lower_bound = (current_page - 1) * page_size @@ -116,8 +116,8 @@ def get_active_reviews(self, db, url_starts_with=None, current_page=1, page_size .filter(Page.last_review_date != None) \ .filter(Page.domain == self) - if url_starts_with: - items_query = items_query.filter(Page.url.like('%s%%' % url_starts_with)) + if page_filter: + items_query = items_query.filter(Page.url.like('%s/%s%%' % (self.url, page_filter))) items = items_query.order_by('violations_count desc')[lower_bound:upper_bound] @@ -127,14 +127,14 @@ def get_active_reviews(self, db, url_starts_with=None, current_page=1, page_size def get_domain_by_name(self, domain_name, db): return db.query(Domain).filter(Domain.name == domain_name).first() - def get_active_review_count(self, db, url_starts_with=None): + def get_active_review_count(self, db, page_filter=None): from holmes.models import Review, Page query = db.query(func.count(Review.id)) - if url_starts_with: + if page_filter: query = query.join(Page, Page.id == Review.page_id) \ - .filter(Page.url.like('%s%%' % url_starts_with)) + .filter(Page.url.like('%s/%s%%' % (self.url, page_filter))) query = query.filter(Review.is_active == True, Review.domain_id == self.id) diff --git a/holmes/models/review.py b/holmes/models/review.py index b895e39..e1556fe 100644 --- a/holmes/models/review.py +++ b/holmes/models/review.py @@ -120,34 +120,6 @@ def by_uuid(cls, uuid, db): def violation_count(self): return len(self.violations) - @classmethod - def _filter_violation_key_name(cls, db, query, key_id, domain_filter=None, page_filter=None): - from holmes.models.domain import Domain # to avoid circular dependency - from holmes.models.page import Page # to avoid circular dependency - from holmes.models.violation import Violation # to avoid circular dependency - - query = query \ - .filter(Page.id == Review.page_id) \ - .filter(Violation.review_id == Review.id) \ - .filter(Review.is_active == 1) \ - .filter(Violation.key_id == key_id) - - page_filter_prefix = '%' - if domain_filter: - domain = Domain.get_domain_by_name(domain_filter, db) - if domain: - query = query.filter(Review.domain_id == domain.id) - page_filter_prefix = domain.url - - if page_filter: - query = query.filter( - Page.url.like( - '{0}{1}%'.format(page_filter_prefix, page_filter) - ) - ) - - return query - @classmethod def count_by_violation_key_name(cls, db, key_id, domain_filter=None, page_filter=None): from holmes.models.review import Review # to avoid circular dependency @@ -158,23 +130,21 @@ def count_by_violation_key_name(cls, db, key_id, domain_filter=None, page_filter .filter(Violation.review_is_active == 1) \ .filter(Violation.key_id == key_id) - page_filter_prefix = '%' if domain_filter: from holmes.models.domain import Domain # to avoid circular dependency domain = Domain.get_domain_by_name(domain_filter, db) if domain: - query = query.filter(Review.domain_id == domain.id) - page_filter_prefix = domain.url - - if page_filter: - from holmes.models.page import Page # to avoid circular dependency - query = query.filter(Review.id == Violation.review_id) \ - .filter(Page.id == Review.page_id) \ - .filter( - Page.url.like( - '{0}{1}%'.format(page_filter_prefix, page_filter) - ) - ) + query = query.filter(Violation.domain_id == domain.id) + + if page_filter: + from holmes.models.page import Page # to avoid circular dependency + query = query.filter(Review.id == Violation.review_id) \ + .filter(Page.id == Review.page_id) \ + .filter( + Page.url.like( + '{0}/{1}%'.format(domain.url, page_filter) + ) + ) # FIXME: Maybe group by review? Considering there should be only one # Violation of a given Key for every Review -- weither active or not @@ -202,20 +172,18 @@ def get_by_violation_key_name(cls, db, key_id, current_page=1, page_size=10, dom .filter(Review.is_active == 1) \ .filter(Page.id == Review.page_id) \ - page_filter_prefix = '%' if domain_filter: from holmes.models.domain import Domain # to avoid circular dependency domain = Domain.get_domain_by_name(domain_filter, db) if domain: query = query.filter(Review.domain_id == domain.id) - page_filter_prefix = domain.url - if page_filter: - query = query.filter( - Page.url.like( - '{0}{1}%'.format(page_filter_prefix, page_filter) - ) - ) + if page_filter: + query = query.filter( + Page.url.like( + '{0}/{1}%'.format(domain.url, page_filter) + ) + ) # FIXME: Maybe group by review? Considering there should be only one # Violation of a given Key for every Review -- weither active or not @@ -223,7 +191,7 @@ def get_by_violation_key_name(cls, db, key_id, current_page=1, page_size=10, dom return query.order_by(Review.completed_date.desc())[lower_bound:upper_bound] @classmethod - def save_review(cls, page_uuid, review_data, db, fact_definitions, violation_definitions, cache, publish, config): + def save_review(cls, page_uuid, review_data, db, search_provider, fact_definitions, violation_definitions, cache, publish, config): from holmes.models import Page page = Page.by_uuid(page_uuid, db) @@ -290,6 +258,8 @@ def save_review(cls, page_uuid, review_data, db, fact_definitions, violation_def Review.delete_old_reviews(db, config, page) + search_provider.index_review(review) + publish(dumps({ 'type': 'new-review', 'reviewId': str(review.uuid) diff --git a/holmes/reviewer.py b/holmes/reviewer.py index 244c717..eecdc13 100644 --- a/holmes/reviewer.py +++ b/holmes/reviewer.py @@ -64,7 +64,7 @@ def to_dict(self): class Reviewer(object): def __init__( self, api_url, page_uuid, page_url, page_score, - config=None, validators=[], facters=[], async_get=None, + config=None, validators=[], facters=[], search_provider=None, async_get=None, wait=None, wait_timeout=None, db=None, cache=None, publish=None, fact_definitions=None, violation_definitions=None): @@ -98,6 +98,8 @@ def __init__( self.validators = validators self.facters = facters + self.search_provider = search_provider + self.responses = {} self.raw_responses = {} self.status_codes = {} @@ -283,7 +285,7 @@ def save_review(self): data = self.review_dao.to_dict() Review.save_review( - self.page_uuid, data, self.db, + self.page_uuid, data, self.db, self.search_provider, self.fact_definitions, self.violation_definitions, self.cache, self.publish, self.config ) diff --git a/holmes/search.py b/holmes/search.py new file mode 100644 index 0000000..9d02559 --- /dev/null +++ b/holmes/search.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import sys +import logging + +from derpconf.config import ConfigurationError + +from holmes.config import Config +from holmes.utils import load_classes +from holmes.search_providers import SearchProvider + + +def main(): + parser = SearchProvider.argparser() + args = parser.parse_args() + try: + config = Config() + if args.conf: + config = config.load(args.conf[0]) + search_providers = load_classes(default=[config['SEARCH_PROVIDER']]) + if isinstance(search_providers, list) and len(search_providers) == 1: + search_provider = search_providers.pop() + search_provider.main() + else: + logging.error('Could not instantiate search provider!') + sys.exit(1) + except ConfigurationError: + logging.error('Could not load config! Use --conf conf_file') + sys.exit(1) + except KeyError: + logging.error('Could not parse config! Check it\'s contents') + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/holmes/search_providers/__init__.py b/holmes/search_providers/__init__.py new file mode 100644 index 0000000..2ee8fdc --- /dev/null +++ b/holmes/search_providers/__init__.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from tornado.concurrent import return_future + + +class SearchProvider(object): + def __init__(self, config, db, io_loop=None): + raise NotImplementedError() + + def index_review(self, review): + raise NotImplementedError() + + @return_future + def get_by_violation_key_name(self, key_id, current_page=1, page_size=10, domain_filter=None, page_filter=None, callback=None): + raise NotImplementedError() + + @return_future + def get_domain_active_reviews(self, domain, current_page=1, page_size=10, page_filter=None, callback=None): + raise NotImplementedError() + + @classmethod + def argparser(cls): + import argparse + + parser = argparse.ArgumentParser(description='Setup Holmes index on an ElasticSearch server') + parser.add_argument( + '-c', '--conf', + nargs=1, + metavar='conf_file', + help='path to configuration file' + ) + parser.add_argument( + '-s', '--server', + nargs=1, + metavar='host:port', + help='elastic search server host and port' + ) + parser.add_argument( + '-i', '--index', + nargs=1, + metavar='index_name', + help='name of the index' + ) + parser.add_argument( + '--create', + action='store_true', + help='create the index' + ) + parser.add_argument( + '--recreate', + action='store_true', + help='recreate the index (use with caution)' + ) + parser.add_argument( + '--delete', + action='store_true', + help='delete the index (use with caution)' + ) + parser.add_argument( + '-k', '--keys', + nargs='+', + metavar='key', + help='index reviews with violation of such keys' + ) + parser.add_argument( + '-a', '--all-keys', + action='store_true', + help='index all reviews with at least one violation of any key (might take long)' + ) + parser.add_argument( + '-v', '--verbose', + action='count', + default=0, + help='log level: v=warning, vv=info, vvv=debug' + ) + + return parser + + @classmethod + def main(cls): + raise NotImplementedError() + +if __name__ == '__main__': + SearchProvider.main() diff --git a/holmes/search_providers/elastic.py b/holmes/search_providers/elastic.py new file mode 100644 index 0000000..51a1b7e --- /dev/null +++ b/holmes/search_providers/elastic.py @@ -0,0 +1,411 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from holmes.search_providers import SearchProvider + +from holmes.models.keys import Key +from holmes.models.review import Review +from holmes.models.violation import Violation + +from pyelasticsearch import ElasticSearch +from tornado.concurrent import return_future +from tornadoes import ESConnection +from ujson import loads +from datetime import datetime +from sqlalchemy import func + +import logging + + +class ElasticSearchProvider(SearchProvider): + def __init__(self, config, db=None, io_loop=None): + self.debug = False + self.config = config + if db is not None: + self.db = db + self.syncES = ElasticSearch('http://%(ELASTIC_SEARCH_HOST)s:%(ELASTIC_SEARCH_PORT)s' % config) + self.asyncES = ESConnection( + config.get('ELASTIC_SEARCH_HOST'), + config.get('ELASTIC_SEARCH_PORT'), + io_loop=io_loop + ) + self.index = config.get('ELASTIC_SEARCH_INDEX') + + def activate_debug(self): + self.debug = True + + def connect_to_db(self): + from sqlalchemy import create_engine + from sqlalchemy.orm import scoped_session, sessionmaker + conn_string = self.config.get('SQLALCHEMY_CONNECTION_STRING') + engine = create_engine( + conn_string, + convert_unicode=True, + pool_size=1, + max_overflow=0, + echo=self.debug + ) + maker = sessionmaker(bind=engine, autoflush=True) + self.db = scoped_session(maker) + + def _get_domain_by_name(self, domain_filter): + if domain_filter: + from holmes.models.domain import Domain + return Domain.get_domain_by_name(domain_filter, self.db) or None + return None + + def _assemble_inner_query(self, domain=None, page_filter=None): + if page_filter and domain: + page_prefix = '%s/%s' % (domain.url, page_filter) + else: + page_prefix = None + + if page_prefix: + return { + 'prefix': { + 'page_url': page_prefix + } + } + else: + return { + 'match_all': {} + } + + def _assemble_outer_query(self, inner_query, filter_terms): + return { + 'filtered': { + 'query': inner_query, + 'filter': { + 'and': [{ + 'term': filter_term + } for filter_term in filter_terms] + } + } + } + + def _assemble_filter_terms(self, key_id=None, domain=None): + filter_terms = [] + + if key_id: + filter_terms.append({'keys.id': key_id}) + + if domain: + filter_terms.append({'domain_id': domain.id}) + + return filter_terms + + def _assemble_function_score(self, inner_query, filter_terms, params=None, script=None): + query = { + 'function_score': { + 'query': self._assemble_outer_query(inner_query, filter_terms) + } + } + + if params or script: + query['function_score']['script_score'] = {} + + if params: + query['function_score']['script_score']['params'] = params + + if script: + query['function_score']['script_score']['script'] = script + + return query + + def gen_doc(self, review): + return { + 'keys': [{'id': violation.key_id} for violation in review.violations], + 'uuid': str(review.uuid), + 'completed_date': review.completed_date, + 'violation_count': review.violation_count, + 'page_id': review.page_id, + 'page_uuid': str(review.page.uuid), + 'page_url': review.page.url, + 'page_last_review_date': review.page.last_review_date, + 'domain_id': review.domain_id, + } + + def index_review(self, review): + self.syncES.index(index=self.index, doc_type='review', id=review.page_id, doc=self.gen_doc(review)) + + def index_reviews(self, reviews, reviews_count, batch_size): + for i in xrange(0, reviews_count, batch_size): + + docs = [] + for review in reviews[i:i + batch_size]: + docs.append(self.gen_doc(review)) + + self.syncES.bulk_index(index=self.index, doc_type='review', docs=docs, id_field='page_id') + + logging.info('Done!') + + @return_future + def get_by_violation_key_name(self, key_id, current_page=1, page_size=10, domain_filter=None, page_filter=None, callback=None): + def treat_response(response): + if response.error is None: + hits = loads(response.body).get('hits', {'hits': []}) + + reviews_data = [] + for hit in hits['hits']: + reviews_data.append({ + 'uuid': hit['_source']['uuid'], + 'page': { + 'uuid': hit['_source']['page_uuid'], + 'url': hit['_source']['page_url'], + 'completedAt': datetime.strptime(hit['_source']['completed_date'], '%Y-%m-%dT%H:%M:%S') + } + }) + + reviews_count = hits.get('total', 0) + + callback({ + 'reviews': reviews_data, + 'reviewsCount': reviews_count + }) + else: + logging.error('ElasticSearchProvider error: %s (%s)' % (response.error.message, response.body)) + raise response.error + + domain = self._get_domain_by_name(domain_filter) + + inner_query = self._assemble_inner_query(domain, page_filter) + filter_terms = self._assemble_filter_terms(key_id, domain) + + params = {'half_max_size': 23} + script = '_score * exp(doc[\"violation_count\"].value / half_max_size)' + + query = self._assemble_function_score(inner_query, filter_terms, params, script) + + self.asyncES.search( + callback=treat_response, + index=self.index, + type='review', + source={'query': query}, + page=current_page, + size=page_size, + ) + + @return_future + def get_domain_active_reviews(self, domain, current_page=1, page_size=10, page_filter=None, callback=None): + def treat_response(response): + if response.error is None: + hits = loads(response.body).get('hits', {'hits': []}) + + pages = [] + for hit in hits['hits']: + pages.append({ + 'url': hit['_source']['page_url'], + 'uuid': hit['_source']['page_uuid'], + 'violationCount': len(hit['_source']['keys']), + 'completedAt': datetime.strptime(hit['_source']['page_last_review_date'], '%Y-%m-%dT%H:%M:%S'), + 'reviewId': hit['_source']['uuid'] + }) + + reviews_count = hits.get('total', 0) + + callback({ + 'reviewsCount': reviews_count, + 'pages': pages + }) + else: + logging.error('ElasticSearchProvider error: %s' % response.error.message) + raise response.error + + inner_query = self._assemble_inner_query(domain=domain, page_filter=page_filter) + filter_terms = self._assemble_filter_terms(domain=domain) + + params = {'half_max_size': 23} + script = '_score * exp(doc[\"violation_count\"].value / half_max_size)' + + query = self._assemble_function_score(inner_query, filter_terms, params, script) + + self.asyncES.search( + callback=treat_response, + index=self.index, + type='review', + source={'query': query}, + page=current_page, + size=page_size, + ) + + def refresh(self): + self.syncES.refresh(index=self.index) + + def get_index_settings(cls): + return { + 'index': { + 'number_of_shards': 4 + } + } + + def get_index_mapping(cls): + return { + 'review': { + 'properties': { + 'keys': { + 'properties': { + 'id': { + 'type': 'integer' + } + } + }, + 'uuid': { + 'type': 'string', + 'index': 'not_analyzed' + }, + 'completed_date': { + 'type': 'date' + }, + 'violation_count': { + 'type': 'integer' + }, + 'page_id': { + 'type': 'integer' + }, + 'page_uuid': { + 'type': 'string', + 'index': 'not_analyzed' + }, + 'page_url': { + 'type': 'string', + 'index': 'not_analyzed' + }, + 'page_last_review_date': { + 'type': 'date' + }, + 'domain_id': { + 'type': 'integer' + } + } + } + } + + def setup_index(self): + try: + settings = self.get_index_settings() + self.syncES.create_index(index=self.index, settings=settings) + mapping = self.get_index_mapping() + self.syncES.put_mapping(index=self.index, doc_type='review', mapping=mapping) + logging.info('Index %s created.' % self.index) + except Exception, e: + raise e + + def delete_index(self): + try: + self.syncES.delete_index(index=self.index) + logging.info('Index %s deleted.' % self.index) + except Exception, e: + raise e + + def index_all_reviews(self, keys=None, batch_size=200): + logging.info('Querying database...') + self.connect_to_db() + + if keys is not None: + keys = [k.id for k in self.db.query(Key.id).filter(Key.name.in_(keys)).all()] + + def _filter(query): + if keys is not None: + query = query.filter(Review.id == Violation.review_id).filter(Violation.key_id.in_(keys)) + return query + + reviews_count = _filter(self.db.query(func.count(Review.id)).filter(Review.is_active == True)).scalar() + + reviews = _filter(self.db.query(Review).filter(Review.is_active == True)) + + logging.info('Indexing %d reviews...' % reviews_count) + + self.index_reviews(reviews, reviews_count, batch_size) + + @classmethod + def main(cls): + import sys + + parser = cls.argparser() + args = parser.parse_args() + + config = {} + host = None + port = None + index = None + es = None + + levels = ['ERROR', 'WARNING', 'INFO', 'DEBUG'] + log_level = levels[args.verbose] + logging.basicConfig(level=getattr(logging, log_level), format='%(levelname)s - %(message)s') + + if not (args.create or args.recreate or args.delete or args.keys or args.all_keys): + parser.print_help() + sys.exit(1) + + if args.conf: + from derpconf.config import ConfigurationError + from holmes.config import Config + try: + config = Config().load(args.conf[0]) + host = config['ELASTIC_SEARCH_HOST'] + port = config['ELASTIC_SEARCH_PORT'] + index = config['ELASTIC_SEARCH_INDEX'] + except ConfigurationError: + logging.error('Could not load config! Use --conf conf_file') + sys.exit(1) + except KeyError: + logging.error('Could not parse config! Check it\'s contents') + sys.exit(1) + + if args.server: + try: + host, port = args.server[0].split(':') + config['ELASTIC_SEARCH_HOST'] = host + config['ELASTIC_SEARCH_PORT'] = port + except Exception: + logging.error('Could not parse server host and port! Use --server host:port') + sys.exit(1) + + if args.index: + index = args.index[0] + config['ELASTIC_SEARCH_INDEX'] = index + + from pyelasticsearch.exceptions import IndexAlreadyExistsError, ElasticHttpNotFoundError + from requests.exceptions import ConnectionError + try: + + if args.create or args.recreate or args.delete: + if host is None or port is None: + logging.error('Need either a host and port or a config file to perform such operation!') + sys.exit(1) + if index is None: + logging.error('Need either an index name or a config file to perform such operation!') + sys.exit(1) + else: + es = ElasticSearchProvider(config) + if args.recreate or args.delete: + try: + es.delete_index() + except ElasticHttpNotFoundError: + pass + if args.create or args.recreate: + es.setup_index() + + if args.keys or args.all_keys: + if config is None: + logging.error('Need a config file to perform such operation! Use --conf conf_file') + else: + es = ElasticSearchProvider(config) if not es else es + if args.verbose > 2: + es.activate_debug() + if args.keys: + es.index_all_reviews(args.keys) + elif args.all_keys: + es.index_all_reviews() + + except IndexAlreadyExistsError: + logging.error('Index %s already exists! Use --recreate (with caution) to recreate' % index) + except ConnectionError: + logging.error('Could not connect to server at %s:%s' % (host, port)) + except KeyError: + logging.error('Could not get host nor port! Use either -conf or --server') + sys.exit(1) + +if __name__ == '__main__': + ElasticSearchProvider.main() diff --git a/holmes/search_providers/noexternal.py b/holmes/search_providers/noexternal.py new file mode 100644 index 0000000..b30d805 --- /dev/null +++ b/holmes/search_providers/noexternal.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from holmes.search_providers import SearchProvider +from holmes.models.review import Review + +from tornado.concurrent import return_future + + +class NoExternalSearchProvider(SearchProvider): + def __init__(self, config, db, io_loop=None): + self.db = db + + def index_review(self, review): + pass + + @return_future + def get_by_violation_key_name(self, key_id, current_page=1, page_size=10, domain_filter=None, page_filter=None, callback=None): + reviews = Review.get_by_violation_key_name( + db=self.db, + key_id=key_id, + current_page=current_page, + page_size=page_size, + domain_filter=domain_filter, + page_filter=page_filter, + ) + + reviews_data = [] + for item in reviews: + reviews_data.append({ + 'uuid': item.review_uuid, + 'page': { + 'uuid': item.page_uuid, + 'url': item.url, + 'completedAt': item.completed_date + } + }) + + callback({ + 'reviews': reviews_data + }) + + @return_future + def get_domain_active_reviews(self, domain, current_page=1, page_size=10, page_filter=None, callback=None): + reviews = domain.get_active_reviews( + db=self.db, + page_filter=page_filter, + current_page=current_page, + page_size=page_size, + ) + + pages = [] + + for page in reviews: + pages.append({ + 'url': page.url, + 'uuid': str(page.uuid), + 'violationCount': page.violations_count, + 'completedAt': page.last_review_date, + 'reviewId': str(page.last_review_uuid) + }) + + callback({'pages': pages}) + + @classmethod + def main(cls): + pass + +if __name__ == '__main__': + NoExternalSearchProvider.main() diff --git a/holmes/server.py b/holmes/server.py index 61790d7..7948c28 100644 --- a/holmes/server.py +++ b/holmes/server.py @@ -136,6 +136,7 @@ def after_start(self, io_loop): self.application.facters = self._load_facters() self.application.validators = self._load_validators() self.application.error_handlers = [handler(self.application.config) for handler in self._load_error_handlers()] + self.application.search_provider = self._load_search_provider()(self.application.config, self.application.db, io_loop) self.application.fact_definitions = {} self.application.violation_definitions = {} @@ -200,6 +201,13 @@ def _load_facters(self): def _load_error_handlers(self): return load_classes(default=self.config.ERROR_HANDLERS) + def _load_search_provider(self): + search_provider = load_classes(default=[self.config.SEARCH_PROVIDER]) + if isinstance(search_provider, list) and len(search_provider) == 1: + return search_provider.pop() + else: + raise Exception('A search provider must be defined!') + def before_end(self, io_loop): self.application.db.remove() diff --git a/holmes/worker.py b/holmes/worker.py index 00ac820..a49caa1 100644 --- a/holmes/worker.py +++ b/holmes/worker.py @@ -115,6 +115,9 @@ def initialize(self): self.error_handlers = [handler(self.config) for handler in self.load_error_handlers()] self.connect_sqlalchemy() + + self.search_provider = self.load_search_provider()(self.config, self.db) + self.connect_to_redis() self.start_otto() @@ -218,6 +221,7 @@ def _start_reviewer(self, job): config=self.config, validators=self.validators, facters=self.facters, + search_provider=self.search_provider, async_get=self.async_get, wait=self.otto.wait, wait_timeout=0, # max time to wait for all requests to finish diff --git a/setup.py b/setup.py index 4bd9361..fa3db2f 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,8 @@ 'raven>=4.1.1,<4.2.0', 'rotunicode>=1.0.1,<1.1.0', 'materialgirl>=0.4.2,<0.5.0', + 'pyelasticsearch>=0.6.1,<0.7.0', + 'tornadoes>=2.0.0,<2.1.0', ], extras_require={ 'tests': tests_require, @@ -70,6 +72,7 @@ 'holmes-api=holmes.server:main', 'holmes-worker=holmes.worker:main', 'holmes-material=holmes.material:main', + 'holmes-search=holmes.search:main', ], }, ) diff --git a/tests/unit/base.py b/tests/unit/base.py index 6f31d68..6d8b5ae 100644 --- a/tests/unit/base.py +++ b/tests/unit/base.py @@ -67,6 +67,9 @@ def get_config(self): REDISPORT=57575, MATERIAL_GIRL_REDISHOST='localhost', MATERIAL_GIRL_REDISPORT=57575, + ELASTIC_SEARCH_HOST='localhost', + ELASTIC_SEARCH_PORT=9200, + ELASTIC_SEARCH_INDEX='holmes-test', ) def get_server(self): @@ -94,6 +97,22 @@ def connect_to_sync_redis(self): return SyncCache(self.db, redis, self.server.application.config) + def use_no_external_search_provider(self): + from holmes.search_providers.noexternal import NoExternalSearchProvider + self.server.application.search_provider = NoExternalSearchProvider( + self.server.application.config, + self.db, + self.io_loop + ) + + def use_elastic_search_provider(self): + from holmes.search_providers.elastic import ElasticSearchProvider + self.server.application.search_provider = ElasticSearchProvider( + self.server.application.config, + self.db, + self.io_loop + ) + FILES_ROOT_PATH = abspath(join(dirname(__file__), 'files')) diff --git a/tests/unit/handlers/test_domains.py b/tests/unit/handlers/test_domains.py index ce15b73..6a3cb2a 100644 --- a/tests/unit/handlers/test_domains.py +++ b/tests/unit/handlers/test_domains.py @@ -196,7 +196,9 @@ def test_can_get_violations_per_day(self): class TestDomainReviewsHandler(ApiTestCase): @gen_test - def test_can_get_domain_reviews(self): + def test_can_get_domain_reviews_using_no_external_search_provider(self): + self.use_no_external_search_provider() + dt = datetime(2010, 11, 12, 13, 14, 15) dt_timestamp = calendar.timegm(dt.utctimetuple()) @@ -208,10 +210,10 @@ def test_can_get_domain_reviews(self): page = PageFactory.create(domain=domain, last_review_date=dt) page2 = PageFactory.create(domain=domain, last_review_date=dt2) - ReviewFactory.create(page=page, is_active=True, is_complete=True, completed_date=dt, number_of_violations=20) - ReviewFactory.create(page=page, is_active=False, is_complete=True, completed_date=dt2, number_of_violations=30) - ReviewFactory.create(page=page2, is_active=True, is_complete=True, completed_date=dt2, number_of_violations=30) - ReviewFactory.create(page=page2, is_active=False, is_complete=True, completed_date=dt, number_of_violations=20) + ReviewFactory.create(page=page, is_active=True, is_complete=True, completed_date=dt, number_of_violations=13) + ReviewFactory.create(page=page, is_active=False, is_complete=True, completed_date=dt2, number_of_violations=12) + ReviewFactory.create(page=page2, is_active=True, is_complete=True, completed_date=dt2, number_of_violations=11) + ReviewFactory.create(page=page2, is_active=False, is_complete=True, completed_date=dt, number_of_violations=10) response = yield self.http_client.fetch( self.get_url('/domains/%s/reviews/' % domain.name) @@ -231,6 +233,52 @@ def test_can_get_domain_reviews(self): expect(domain_details['pages'][0]['uuid']).to_equal(str(page.uuid)) expect(domain_details['pages'][0]['completedAt']).to_equal(dt_timestamp) + @gen_test + def test_can_get_domain_reviews_using_elastic_search_provider(self): + self.use_elastic_search_provider() + + dt = datetime(2010, 11, 12, 13, 14, 15) + dt_timestamp = calendar.timegm(dt.utctimetuple()) + + dt2 = datetime(2011, 12, 13, 14, 15, 16) + dt2_timestamp = calendar.timegm(dt2.utctimetuple()) + + domain = DomainFactory.create(url="http://www.domain-details.com", name="domain-details.com") + + page = PageFactory.create(domain=domain, last_review_date=dt) + page2 = PageFactory.create(domain=domain, last_review_date=dt2) + + review = ReviewFactory.create(page=page, is_active=False, is_complete=True, completed_date=dt, number_of_violations=13) + self.server.application.search_provider.index_review(review) + review = ReviewFactory.create(page=page, is_active=True, is_complete=True, completed_date=dt, number_of_violations=12) + self.server.application.search_provider.index_review(review) + review = ReviewFactory.create(page=page2, is_active=False, is_complete=True, completed_date=dt2, number_of_violations=11) + self.server.application.search_provider.index_review(review) + review = ReviewFactory.create(page=page2, is_active=True, is_complete=True, completed_date=dt2, number_of_violations=10) + self.server.application.search_provider.index_review(review) + + self.server.application.search_provider.refresh() + + response = yield self.http_client.fetch( + self.get_url('/domains/%s/reviews/' % domain.name) + ) + + expect(response.code).to_equal(200) + + domain_details = loads(response.body) + + expect(domain_details['pages']).to_length(2) + + expect(domain_details['pages'][0]['url']).to_equal(page.url) + expect(domain_details['pages'][0]['uuid']).to_equal(str(page.uuid)) + expect(domain_details['pages'][0]['completedAt']).to_equal(dt_timestamp) + expect(domain_details['pages'][0]['violationCount']).to_equal(12) + + expect(domain_details['pages'][1]['url']).to_equal(page2.url) + expect(domain_details['pages'][1]['uuid']).to_equal(str(page2.uuid)) + expect(domain_details['pages'][1]['completedAt']).to_equal(dt2_timestamp) + expect(domain_details['pages'][1]['violationCount']).to_equal(10) + @gen_test def test_can_get_domain_reviews_for_next_page(self): dt = datetime(2010, 11, 12, 13, 14, 15) diff --git a/tests/unit/handlers/test_violation.py b/tests/unit/handlers/test_violation.py index 369a123..c642cf8 100644 --- a/tests/unit/handlers/test_violation.py +++ b/tests/unit/handlers/test_violation.py @@ -5,6 +5,8 @@ from tests.unit.base import ApiTestCase from preggy import expect from tornado.testing import gen_test +from datetime import datetime +import calendar from tests.fixtures import ReviewFactory, PageFactory, DomainFactory, KeyFactory, ViolationFactory from holmes.models import Key, Violation @@ -64,19 +66,27 @@ def test_can_get_most_common_violations(self): class TestViolationHandler(ApiTestCase): @gen_test - def test_can_get_violation_by_key_name(self): + def test_can_get_violation_by_key_name_using_no_external_search_provider(self): + self.use_no_external_search_provider() + domains = [DomainFactory.create( name='g%s.com' % chr(i), - url='http://g%s.com/' % chr(i) + url='http://g%s.com' % chr(i) ) for i in xrange(ord('a'), ord('d'))] pages = [PageFactory.create( domain=domains[i % 3], - url='%s%d' % (domains[i % 3].url, i % 2) + url='%s/%d' % (domains[i % 3].url, i % 2) ) for i in xrange(6)] for i, page in enumerate(pages): - ReviewFactory.create(page=page, is_active=True, number_of_violations=i) + ReviewFactory.create( + page=page, + is_active=True, + number_of_violations=i, + created_date=datetime(2014, 04, 15, 11, 44, i), + completed_date=datetime(2014, 04, 15, 11, 44, i * 2), + ) self.db.flush() @@ -88,6 +98,9 @@ def test_can_get_violation_by_key_name(self): } for i in xrange(6) } + dt = datetime(2014, 04, 15, 11, 44, 4) + dt_timestamp = calendar.timegm(dt.utctimetuple()) + response = yield self.http_client.fetch( self.get_url('/violation/key.1') ) @@ -97,6 +110,7 @@ def test_can_get_violation_by_key_name(self): expect(violations['title']).to_equal('title.1') expect(violations['reviews']).to_length(4) expect(violations['reviewsCount']).to_equal(4) + expect(violations['reviews'][3]['page']['completedAt']).to_equal(dt_timestamp) response = yield self.http_client.fetch( self.get_url('/violation/key.1?page_size=2¤t_page=1') @@ -115,8 +129,8 @@ def test_can_get_violation_by_key_name(self): expect(response.code).to_equal(200) expect(violations).to_length(3) expect(violations['title']).to_equal('title.1') - expect(violations['reviews']).to_length(2) - expect(violations['reviewsCount']).to_equal(2) + expect(violations['reviews']).to_length(4) + expect(violations['reviewsCount']).to_equal(4) response = yield self.http_client.fetch( self.get_url('/violation/key.1?domain_filter=gc.com') @@ -126,7 +140,7 @@ def test_can_get_violation_by_key_name(self): expect(violations).to_length(3) expect(violations['title']).to_equal('title.1') expect(violations['reviews']).to_length(2) - expect(violations['reviewsCount']).to_equal(8) + expect(violations['reviewsCount']).to_equal(2) response = yield self.http_client.fetch( self.get_url('/violation/key.1?domain_filter=foobar') @@ -148,6 +162,105 @@ def test_can_get_violation_by_key_name(self): expect(violations['reviews']).to_length(1) expect(violations['reviewsCount']).to_equal(1) + @gen_test + def test_can_get_violation_by_key_name_using_elastic_search_provider(self): + self.use_elastic_search_provider() + + domains = [DomainFactory.create( + name='g%s.com' % chr(i), + url='http://g%s.com' % chr(i) + ) for i in xrange(ord('a'), ord('d'))] + + pages = [PageFactory.create( + domain=domains[i % 3], + url='%s/%d' % (domains[i % 3].url, i % 2) + ) for i in xrange(6)] + + for i, page in enumerate(pages): + review = ReviewFactory.create( + page=page, + is_active=True, + number_of_violations=6 - i, + created_date=datetime(2014, 04, 15, 11, 44, i), + completed_date=datetime(2014, 04, 15, 11, 44, i * 2), + ) + self.server.application.search_provider.index_review(review) + + self.db.flush() + self.server.application.search_provider.refresh() + + self.server.application.violation_definitions = { + 'key.%s' % i: { + 'title': 'title.%s' % i, + 'category': 'category.%s' % (i % 3), + 'key': Key.get_or_create(self.db, 'key.%d' % i, 'category.%d' % (i % 3)) + } for i in xrange(6) + } + + dt = datetime(2014, 04, 15, 11, 44, 6) + dt_timestamp = calendar.timegm(dt.utctimetuple()) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(5) + expect(violations['reviewsCount']).to_equal(5) + expect(violations['reviews'][3]['page']['completedAt']).to_equal(dt_timestamp) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1?page_size=2¤t_page=1') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(2) + expect(violations['reviewsCount']).to_equal(5) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1?page_filter=1') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(5) + expect(violations['reviewsCount']).to_equal(5) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1?domain_filter=gb.com') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(2) + expect(violations['reviewsCount']).to_equal(2) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1?domain_filter=foobar') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(5) + expect(violations['reviewsCount']).to_equal(5) + + response = yield self.http_client.fetch( + self.get_url('/violation/key.1?domain_filter=gb.com&page_filter=1') + ) + violations = loads(response.body) + expect(response.code).to_equal(200) + expect(violations).to_length(3) + expect(violations['title']).to_equal('title.1') + expect(violations['reviews']).to_length(1) + expect(violations['reviewsCount']).to_equal(1) + @gen_test def test_can_get_blacklist_domains(self): key = KeyFactory.create(name='blacklist.domains')