Skip to content

Commit

Permalink
Merge pull request #66 from ITVaan/add_sleep_mode
Browse files Browse the repository at this point in the history
Реалізували сплячий режим для бота, перевіряємо працездатність proxy, api, doc service
  • Loading branch information
kroman0 authored Jun 7, 2017
2 parents 9f7723c + 472895f commit 22e9572
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 67 deletions.
2 changes: 1 addition & 1 deletion openprocurement/bot/identification/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def verify(self, param, code, headers):

return response

def details(self, id, headers):
def details(self, id, headers={}):
""" Send request to Proxy server to get details."""
url = '{url}/{id}'.format(url=self.details_url, id=id)
response = self.session.get(url=url, auth=(self.user, self.password), timeout=self.timeout, headers=headers)
Expand Down
41 changes: 30 additions & 11 deletions openprocurement/bot/identification/databridge/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, config):
self.config = config

api_server = self.config_get('tenders_api_server')
api_version = self.config_get('tenders_api_version')
self.api_version = self.config_get('tenders_api_version')
ro_api_server = self.config_get('public_tenders_api_server') or api_server
buffers_size = self.config_get('buffers_size') or 500
self.delay = self.config_get('delay') or 15
Expand All @@ -46,8 +46,8 @@ def __init__(self, config):
self.doc_service_port = self.config_get('doc_service_port') or 6555

# init clients
self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=api_version)
self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=api_version)
self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=self.api_version)
self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=self.api_version)
self.proxyClient = ProxyClient(host=self.config_get('proxy_server'),
user=self.config_get('proxy_user'),
password=self.config_get('proxy_password'),
Expand Down Expand Up @@ -112,9 +112,6 @@ def config_get(self, name):
return self.config.get('main').get(name)

def check_doc_service(self):
"""Makes request to the doc_service, returns True if it's up, raises RequestError otherwise
Separated to allow for possible granular checks
"""
try:
request("{host}:{port}/".format(host=self.doc_service_host, port=self.doc_service_port))
except RequestError as e:
Expand All @@ -124,10 +121,17 @@ def check_doc_service(self):
else:
return True

def check_openprocurement_api(self):
try:
self.client.head('/api/{}/spore'.format(self.api_version))
except RequestError as e:
logger.info('TendersServer connection error, message {}'.format(e),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_DOC_SERVICE_CONN_ERROR}, {}))
raise e
else:
return True

def check_proxy(self):
"""Makes request to the EDR proxy, returns True if it's up, raises RequestError otherwise
Separated to allow for possible granular checks
"""
try:
self.proxyClient.health()
except RequestException as e:
Expand All @@ -137,6 +141,20 @@ def check_proxy(self):
else:
return True

def set_sleep(self, new_status):
for job in self.jobs.values():
job.exit = new_status

def check_services(self):
try:
self.check_proxy() and self.check_openprocurement_api() and self.check_doc_service()
except Exception as e:
logger.info("Service is unavailable, message {}".format(e))
self.set_sleep(True)
else:
logger.info("All services have become available, starting all workers")
self.set_sleep(False)

def _start_jobs(self):
self.jobs = {'scanner': self.scanner(),
'filter_tender': self.filter_tender(),
Expand All @@ -150,6 +168,7 @@ def run(self):
try:
while True:
gevent.sleep(self.delay)
self.check_services()
if counter == 20:
logger.info('Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {}; '
'Edr ids queue {}; Retry edr ids queue {}; Upload to doc service {}; Retry upload to doc service {}; '
Expand All @@ -167,7 +186,7 @@ def run(self):
counter = 0
counter += 1
for name, job in self.jobs.items():
if job.dead:
if job.dead and not job.exit:
logger.warning('Restarting {} worker'.format(name),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART_WORKER}))
self.jobs[name] = gevent.spawn(getattr(self, name))
Expand All @@ -188,7 +207,7 @@ def main():
config = load(config_file_obj.read())
logging.config.dictConfig(config)
bridge = EdrDataBridge(config)
bridge.check_proxy() and bridge.check_doc_service()
bridge.check_proxy() and bridge.check_doc_service() and bridge.check_openprocurement_api()
bridge.run()
else:
logger.info('Invalid configuration file. Exiting...')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def get_edr_details(self):
self.retry_edr_ids_queue.put(Data(tender_data.tender_id, tender_data.item_id, tender_data.code,
tender_data.item_name, [edr_id], file_content))
self.handle_status_response(response, tender_data.tender_id)
logger.info('Put tender {} with {} id {} {} to retry_edr_ids_queue'.format(
logger.info('Put tender {} with {} id {} document_id {} to retry_edr_ids_queue'.format(
tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id),
extra=journal_context(params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id}))
self.edr_ids_queue.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DATABRIDGE_RESTART_RETRY_GET_EDR_ID = 'edr_databridge_restart_retry_get_edr_id'
DATABRIDGE_RESTART_RETRY_GET_EDR_DETAILS = 'edr_databridge_restart_retry_get_edr_details'
DATABRIDGE_DOC_SERVICE_CONN_ERROR = 'edr_databridge_doc_service_conn_error'
DATABRIDGE_TENDERS_SERVER_CONN_ERROR = 'edr_databridge_tenders_server_conn_error'
DATABRIDGE_PROXY_SERVER_CONN_ERROR = 'edr_databridge_proxy_server_conn_error'
DATABRIDGE_422_UPLOAD_TO_TENDER = 'edr_databridge_422_upload_to_tender'
DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING = 'edr_databridge_item_status_changed_while_processing'
58 changes: 30 additions & 28 deletions openprocurement/bot/identification/databridge/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,38 @@ def get_tenders(self, params={}, direction=""):
raise re

def get_tenders_forward(self):
logger.info('Start forward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
else:
logger.warning('Forward data sync finished!')
if not self.exit:
logger.info('Start forward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
else:
logger.warning('Forward data sync finished!')

def get_tenders_backward(self):
logger.info('Start backward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="backward"):
logger.info('Backward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
return False
else:
logger.info('Backward data sync finished.')
return True
if not self.exit:
logger.info('Start backward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="backward"):
logger.info('Backward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
return False
else:
logger.info('Backward data sync finished.')
return True

def _start_synchronization_workers(self):
logger.info('Scanner starting forward and backward sync workers')
Expand Down
118 changes: 92 additions & 26 deletions openprocurement/bot/identification/tests/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def setUpClass(cls):
cls.proxy_server_bottle = Bottle()
cls.doc_server_bottle = Bottle()
cls.api_server = WSGIServer(('127.0.0.1', 20604), cls.api_server_bottle, log=None)
setup_routing(cls.api_server_bottle, response_spore)
setup_routing(cls.api_server_bottle, response_spore, method='HEAD')
cls.public_api_server = WSGIServer(('127.0.0.1', 20605), cls.api_server_bottle, log=None)
cls.doc_server = WSGIServer(('127.0.0.1', 20606), cls.doc_server_bottle, log=None)
setup_routing(cls.doc_server_bottle, doc_response, path='/')
Expand All @@ -84,14 +84,21 @@ def tearDownClass(cls):
cls.doc_server.close()
cls.proxy_server.close()

def setUp(self):
self.worker = EdrDataBridge(config)
workers = {'scanner': MagicMock(return_value=MagicMock(exit=False)),
'filter_tender': MagicMock(return_value=MagicMock(exit=False)),
'edr_handler': MagicMock(return_value=MagicMock(exit=False)),
'upload_file': MagicMock(return_value=MagicMock(exit=False))}
for name, value in workers.items():
setattr(self.worker, name, value)

def tearDown(self):
del self.worker


def setup_routing(app, func, path='/api/0/spore', method='GET'):
app.route(path, method, func)


def response_spore():
response.set_cookie("SERVER_ID", ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27"
"8cf9b7adf0fae467a524747e3c6c6973262130fac2b"
Expand All @@ -102,15 +109,17 @@ def response_spore():
def doc_response():
return response


def proxy_response():
return response

def proxy_response_402():
response.status = "402 Payment required"
return response


class TestBridgeWorker(BaseServersTest):

def test_init(self):
# setup_routing(self.api_server_bottle, response_spore)
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.delay, 15)
self.assertEqual(self.worker.increment_step, 1)
Expand Down Expand Up @@ -161,7 +170,6 @@ def test_tender_sync_clients(self, sync_client, client, doc_service_client, prox
'version': config['main']['proxy_version']})

def test_start_jobs(self):
# setup_routing(self.api_server_bottle, response_spore)
self.worker = EdrDataBridge(config)

scanner, filter_tender, edr_handler, upload_file = [MagicMock(return_value=i) for i in range(4)]
Expand All @@ -173,9 +181,9 @@ def test_start_jobs(self):
self.worker._start_jobs()
# check that all jobs were started
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(filter_tender.called)
self.assertTrue(edr_handler.called)
self.assertTrue(upload_file.called)

self.assertEqual(self.worker.jobs['scanner'], 0)
self.assertEqual(self.worker.jobs['filter_tender'], 1)
Expand All @@ -194,31 +202,89 @@ def test_run(self, sleep):

with patch('__builtin__.True', AlmostAlwaysTrue(100)):
self.worker.run()
self.assertEqual(scanner.call_count, 1)
self.assertEqual(filter_tender.call_count, 1)
self.assertEqual(edr_handler.call_count, 1)
self.assertEqual(upload_file.call_count, 1)
self.assertEqual(self.worker.scanner.call_count, 1)
self.assertEqual(self.worker.filter_tender.call_count, 1)
self.assertEqual(self.worker.edr_handler.call_count, 1)
self.assertEqual(self.worker.upload_file.call_count, 1)

def test_proxy_server_failure(self):
def test_proxy_server(self):
self.proxy_server.stop()
self.worker = EdrDataBridge(config)
with self.assertRaises(RequestException):
self.worker.check_proxy()
self.proxy_server.start()
self.assertEqual(self.worker.check_proxy(), True)
self.assertTrue(self.worker.check_proxy())

def test_proxy_server_success(self):
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.check_proxy(), True)

def test_doc_service_failure(self):
def test_doc_service(self):
self.doc_server.stop()
self.worker = EdrDataBridge(config)
with self.assertRaises(RequestError):
self.worker.check_doc_service()
self.doc_server.start()
self.assertEqual(self.worker.check_doc_service(), True)
self.assertTrue(self.worker.check_doc_service())

def test_doc_service_success(self):
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.check_doc_service(), True)
def test_api(self):
self.api_server.stop()
with self.assertRaises(RequestError):
self.worker.check_openprocurement_api()
self.api_server.start()
self.assertTrue(self.worker.check_openprocurement_api())

def test_check_services_did_not_stop(self):
self.worker._start_jobs()
functions = {'check_proxy': MagicMock(return_value = True),
'check_doc_service': MagicMock(return_value = True),
'check_openprocurement_api': MagicMock(return_value = True)}
for name, value in functions.items():
setattr(self.worker, name, value)
self.worker.check_services()
self.assertTrue(all([i.call_count == 1 for i in functions.values()]))
self.assertFalse(all([i.exit for i in self.worker.jobs.values()]))

def test_check_services(self):
self.worker._start_jobs()
self.proxy_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.proxy_server.start()
self.worker.set_sleep(False)

self.doc_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.doc_server.start()
self.worker.set_sleep(False)

self.api_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.api_server.start()
self.worker.set_sleep(False)

def test_check_services_needs_all(self):
self.worker._start_jobs()
self.worker.set_sleep(True)
self.proxy_server.stop()
self.doc_server.stop()
self.api_server.stop()

self.proxy_server.start()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))

self.doc_server.start()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.worker.set_sleep(False)

self.api_server.start()
self.worker.check_services()
self.assertFalse(all([i.exit for i in self.worker.jobs.values()]))

@patch('gevent.sleep')
def test_run_with_mock_check_services(self, sleep):
"""Basic test to ensure run() goes into the while (and inside that for) loops and that jobs are called only once"""
self.worker.check_services = MagicMock()
self.worker.run()
self.assertEqual(self.worker.scanner.call_count, 1)
self.assertEqual(self.worker.filter_tender.call_count, 1)
self.assertEqual(self.worker.edr_handler.call_count, 1)
self.assertEqual(self.worker.upload_file.call_count, 1)

0 comments on commit 22e9572

Please sign in to comment.