diff --git a/openprocurement/bot/identification/client.py b/openprocurement/bot/identification/client.py index 3d5a806..b4f9c7d 100644 --- a/openprocurement/bot/identification/client.py +++ b/openprocurement/bot/identification/client.py @@ -21,7 +21,7 @@ def verify(self, param, code, headers): return response - def details(self, id, headers): + def details(self, id, headers={}): """ Send request to Proxy server to get details.""" url = '{url}/{id}'.format(url=self.details_url, id=id) response = self.session.get(url=url, auth=(self.user, self.password), timeout=self.timeout, headers=headers) diff --git a/openprocurement/bot/identification/databridge/bridge.py b/openprocurement/bot/identification/databridge/bridge.py index 8a34727..a947dda 100644 --- a/openprocurement/bot/identification/databridge/bridge.py +++ b/openprocurement/bot/identification/databridge/bridge.py @@ -36,7 +36,7 @@ def __init__(self, config): self.config = config api_server = self.config_get('tenders_api_server') - api_version = self.config_get('tenders_api_version') + self.api_version = self.config_get('tenders_api_version') ro_api_server = self.config_get('public_tenders_api_server') or api_server buffers_size = self.config_get('buffers_size') or 500 self.delay = self.config_get('delay') or 15 @@ -46,8 +46,8 @@ def __init__(self, config): self.doc_service_port = self.config_get('doc_service_port') or 6555 # init clients - self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=api_version) - self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=api_version) + self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=self.api_version) + self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=self.api_version) self.proxyClient = ProxyClient(host=self.config_get('proxy_server'), user=self.config_get('proxy_user'), password=self.config_get('proxy_password'), @@ -112,9 +112,6 @@ def config_get(self, name): return self.config.get('main').get(name) def check_doc_service(self): - """Makes request to the doc_service, returns True if it's up, raises RequestError otherwise - Separated to allow for possible granular checks - """ try: request("{host}:{port}/".format(host=self.doc_service_host, port=self.doc_service_port)) except RequestError as e: @@ -124,10 +121,17 @@ def check_doc_service(self): else: return True + def check_openprocurement_api(self): + try: + self.client.head('/api/{}/spore'.format(self.api_version)) + except RequestError as e: + logger.info('TendersServer connection error, message {}'.format(e), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_DOC_SERVICE_CONN_ERROR}, {})) + raise e + else: + return True + def check_proxy(self): - """Makes request to the EDR proxy, returns True if it's up, raises RequestError otherwise - Separated to allow for possible granular checks - """ try: self.proxyClient.health() except RequestException as e: @@ -137,6 +141,20 @@ def check_proxy(self): else: return True + def set_sleep(self, new_status): + for job in self.jobs.values(): + job.exit = new_status + + def check_services(self): + try: + self.check_proxy() and self.check_openprocurement_api() and self.check_doc_service() + except Exception as e: + logger.info("Service is unavailable, message {}".format(e)) + self.set_sleep(True) + else: + logger.info("All services have become available, starting all workers") + self.set_sleep(False) + def _start_jobs(self): self.jobs = {'scanner': self.scanner(), 'filter_tender': self.filter_tender(), @@ -150,6 +168,7 @@ def run(self): try: while True: gevent.sleep(self.delay) + self.check_services() if counter == 20: logger.info('Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {}; ' 'Edr ids queue {}; Retry edr ids queue {}; Upload to doc service {}; Retry upload to doc service {}; ' @@ -167,7 +186,7 @@ def run(self): counter = 0 counter += 1 for name, job in self.jobs.items(): - if job.dead: + if job.dead and not job.exit: logger.warning('Restarting {} worker'.format(name), extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART_WORKER})) self.jobs[name] = gevent.spawn(getattr(self, name)) @@ -188,7 +207,7 @@ def main(): config = load(config_file_obj.read()) logging.config.dictConfig(config) bridge = EdrDataBridge(config) - bridge.check_proxy() and bridge.check_doc_service() + bridge.check_proxy() and bridge.check_doc_service() and bridge.check_openprocurement_api() bridge.run() else: logger.info('Invalid configuration file. Exiting...') diff --git a/openprocurement/bot/identification/databridge/edr_handler.py b/openprocurement/bot/identification/databridge/edr_handler.py index 14bee5d..e9e7d6b 100644 --- a/openprocurement/bot/identification/databridge/edr_handler.py +++ b/openprocurement/bot/identification/databridge/edr_handler.py @@ -228,7 +228,7 @@ def get_edr_details(self): self.retry_edr_ids_queue.put(Data(tender_data.tender_id, tender_data.item_id, tender_data.code, tender_data.item_name, [edr_id], file_content)) self.handle_status_response(response, tender_data.tender_id) - logger.info('Put tender {} with {} id {} {} to retry_edr_ids_queue'.format( + logger.info('Put tender {} with {} id {} document_id {} to retry_edr_ids_queue'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id), extra=journal_context(params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) self.edr_ids_queue.get() diff --git a/openprocurement/bot/identification/databridge/journal_msg_ids.py b/openprocurement/bot/identification/databridge/journal_msg_ids.py index 8930eb6..3e0e4e0 100644 --- a/openprocurement/bot/identification/databridge/journal_msg_ids.py +++ b/openprocurement/bot/identification/databridge/journal_msg_ids.py @@ -37,6 +37,7 @@ DATABRIDGE_RESTART_RETRY_GET_EDR_ID = 'edr_databridge_restart_retry_get_edr_id' DATABRIDGE_RESTART_RETRY_GET_EDR_DETAILS = 'edr_databridge_restart_retry_get_edr_details' DATABRIDGE_DOC_SERVICE_CONN_ERROR = 'edr_databridge_doc_service_conn_error' +DATABRIDGE_TENDERS_SERVER_CONN_ERROR = 'edr_databridge_tenders_server_conn_error' DATABRIDGE_PROXY_SERVER_CONN_ERROR = 'edr_databridge_proxy_server_conn_error' DATABRIDGE_422_UPLOAD_TO_TENDER = 'edr_databridge_422_upload_to_tender' DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING = 'edr_databridge_item_status_changed_while_processing' diff --git a/openprocurement/bot/identification/databridge/scanner.py b/openprocurement/bot/identification/databridge/scanner.py index 5185dc7..1d9138b 100644 --- a/openprocurement/bot/identification/databridge/scanner.py +++ b/openprocurement/bot/identification/databridge/scanner.py @@ -92,36 +92,38 @@ def get_tenders(self, params={}, direction=""): raise re def get_tenders_forward(self): - logger.info('Start forward data sync worker...') - params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'} - try: - for tender in self.get_tenders(params=params, direction="forward"): - logger.info('Forward sync: Put tender {} to process...'.format(tender['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, - {"TENDER_ID": tender['id']})) - self.filtered_tender_ids_queue.put(tender['id']) - except Exception as e: - logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {})) - logger.exception(e) - else: - logger.warning('Forward data sync finished!') + if not self.exit: + logger.info('Start forward data sync worker...') + params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'} + try: + for tender in self.get_tenders(params=params, direction="forward"): + logger.info('Forward sync: Put tender {} to process...'.format(tender['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, + {"TENDER_ID": tender['id']})) + self.filtered_tender_ids_queue.put(tender['id']) + except Exception as e: + logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {})) + logger.exception(e) + else: + logger.warning('Forward data sync finished!') def get_tenders_backward(self): - logger.info('Start backward data sync worker...') - params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'} - try: - for tender in self.get_tenders(params=params, direction="backward"): - logger.info('Backward sync: Put tender {} to process...'.format(tender['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, - {"TENDER_ID": tender['id']})) - self.filtered_tender_ids_queue.put(tender['id']) - except Exception as e: - logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {})) - logger.exception(e) - return False - else: - logger.info('Backward data sync finished.') - return True + if not self.exit: + logger.info('Start backward data sync worker...') + params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'} + try: + for tender in self.get_tenders(params=params, direction="backward"): + logger.info('Backward sync: Put tender {} to process...'.format(tender['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, + {"TENDER_ID": tender['id']})) + self.filtered_tender_ids_queue.put(tender['id']) + except Exception as e: + logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {})) + logger.exception(e) + return False + else: + logger.info('Backward data sync finished.') + return True def _start_synchronization_workers(self): logger.info('Scanner starting forward and backward sync workers') diff --git a/openprocurement/bot/identification/tests/bridge.py b/openprocurement/bot/identification/tests/bridge.py index b519c50..3a80b81 100644 --- a/openprocurement/bot/identification/tests/bridge.py +++ b/openprocurement/bot/identification/tests/bridge.py @@ -64,7 +64,7 @@ def setUpClass(cls): cls.proxy_server_bottle = Bottle() cls.doc_server_bottle = Bottle() cls.api_server = WSGIServer(('127.0.0.1', 20604), cls.api_server_bottle, log=None) - setup_routing(cls.api_server_bottle, response_spore) + setup_routing(cls.api_server_bottle, response_spore, method='HEAD') cls.public_api_server = WSGIServer(('127.0.0.1', 20605), cls.api_server_bottle, log=None) cls.doc_server = WSGIServer(('127.0.0.1', 20606), cls.doc_server_bottle, log=None) setup_routing(cls.doc_server_bottle, doc_response, path='/') @@ -84,14 +84,21 @@ def tearDownClass(cls): cls.doc_server.close() cls.proxy_server.close() + def setUp(self): + self.worker = EdrDataBridge(config) + workers = {'scanner': MagicMock(return_value=MagicMock(exit=False)), + 'filter_tender': MagicMock(return_value=MagicMock(exit=False)), + 'edr_handler': MagicMock(return_value=MagicMock(exit=False)), + 'upload_file': MagicMock(return_value=MagicMock(exit=False))} + for name, value in workers.items(): + setattr(self.worker, name, value) + def tearDown(self): del self.worker - def setup_routing(app, func, path='/api/0/spore', method='GET'): app.route(path, method, func) - def response_spore(): response.set_cookie("SERVER_ID", ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27" "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" @@ -102,15 +109,17 @@ def response_spore(): def doc_response(): return response - def proxy_response(): return response +def proxy_response_402(): + response.status = "402 Payment required" + return response + class TestBridgeWorker(BaseServersTest): def test_init(self): - # setup_routing(self.api_server_bottle, response_spore) self.worker = EdrDataBridge(config) self.assertEqual(self.worker.delay, 15) self.assertEqual(self.worker.increment_step, 1) @@ -161,7 +170,6 @@ def test_tender_sync_clients(self, sync_client, client, doc_service_client, prox 'version': config['main']['proxy_version']}) def test_start_jobs(self): - # setup_routing(self.api_server_bottle, response_spore) self.worker = EdrDataBridge(config) scanner, filter_tender, edr_handler, upload_file = [MagicMock(return_value=i) for i in range(4)] @@ -173,9 +181,9 @@ def test_start_jobs(self): self.worker._start_jobs() # check that all jobs were started self.assertTrue(scanner.called) - self.assertTrue(scanner.called) - self.assertTrue(scanner.called) - self.assertTrue(scanner.called) + self.assertTrue(filter_tender.called) + self.assertTrue(edr_handler.called) + self.assertTrue(upload_file.called) self.assertEqual(self.worker.jobs['scanner'], 0) self.assertEqual(self.worker.jobs['filter_tender'], 1) @@ -194,31 +202,89 @@ def test_run(self, sleep): with patch('__builtin__.True', AlmostAlwaysTrue(100)): self.worker.run() - self.assertEqual(scanner.call_count, 1) - self.assertEqual(filter_tender.call_count, 1) - self.assertEqual(edr_handler.call_count, 1) - self.assertEqual(upload_file.call_count, 1) + self.assertEqual(self.worker.scanner.call_count, 1) + self.assertEqual(self.worker.filter_tender.call_count, 1) + self.assertEqual(self.worker.edr_handler.call_count, 1) + self.assertEqual(self.worker.upload_file.call_count, 1) - def test_proxy_server_failure(self): + def test_proxy_server(self): self.proxy_server.stop() - self.worker = EdrDataBridge(config) with self.assertRaises(RequestException): self.worker.check_proxy() self.proxy_server.start() - self.assertEqual(self.worker.check_proxy(), True) + self.assertTrue(self.worker.check_proxy()) - def test_proxy_server_success(self): - self.worker = EdrDataBridge(config) - self.assertEqual(self.worker.check_proxy(), True) - - def test_doc_service_failure(self): + def test_doc_service(self): self.doc_server.stop() - self.worker = EdrDataBridge(config) with self.assertRaises(RequestError): self.worker.check_doc_service() self.doc_server.start() - self.assertEqual(self.worker.check_doc_service(), True) + self.assertTrue(self.worker.check_doc_service()) - def test_doc_service_success(self): - self.worker = EdrDataBridge(config) - self.assertEqual(self.worker.check_doc_service(), True) + def test_api(self): + self.api_server.stop() + with self.assertRaises(RequestError): + self.worker.check_openprocurement_api() + self.api_server.start() + self.assertTrue(self.worker.check_openprocurement_api()) + + def test_check_services_did_not_stop(self): + self.worker._start_jobs() + functions = {'check_proxy': MagicMock(return_value = True), + 'check_doc_service': MagicMock(return_value = True), + 'check_openprocurement_api': MagicMock(return_value = True)} + for name, value in functions.items(): + setattr(self.worker, name, value) + self.worker.check_services() + self.assertTrue(all([i.call_count == 1 for i in functions.values()])) + self.assertFalse(all([i.exit for i in self.worker.jobs.values()])) + + def test_check_services(self): + self.worker._start_jobs() + self.proxy_server.stop() + self.worker.check_services() + self.assertTrue(all([i.exit for i in self.worker.jobs.values()])) + self.proxy_server.start() + self.worker.set_sleep(False) + + self.doc_server.stop() + self.worker.check_services() + self.assertTrue(all([i.exit for i in self.worker.jobs.values()])) + self.doc_server.start() + self.worker.set_sleep(False) + + self.api_server.stop() + self.worker.check_services() + self.assertTrue(all([i.exit for i in self.worker.jobs.values()])) + self.api_server.start() + self.worker.set_sleep(False) + + def test_check_services_needs_all(self): + self.worker._start_jobs() + self.worker.set_sleep(True) + self.proxy_server.stop() + self.doc_server.stop() + self.api_server.stop() + + self.proxy_server.start() + self.worker.check_services() + self.assertTrue(all([i.exit for i in self.worker.jobs.values()])) + + self.doc_server.start() + self.worker.check_services() + self.assertTrue(all([i.exit for i in self.worker.jobs.values()])) + self.worker.set_sleep(False) + + self.api_server.start() + self.worker.check_services() + self.assertFalse(all([i.exit for i in self.worker.jobs.values()])) + + @patch('gevent.sleep') + def test_run_with_mock_check_services(self, sleep): + """Basic test to ensure run() goes into the while (and inside that for) loops and that jobs are called only once""" + self.worker.check_services = MagicMock() + self.worker.run() + self.assertEqual(self.worker.scanner.call_count, 1) + self.assertEqual(self.worker.filter_tender.call_count, 1) + self.assertEqual(self.worker.edr_handler.call_count, 1) + self.assertEqual(self.worker.upload_file.call_count, 1)