diff --git a/openprocurement/bot/identification/databridge/journal_msg_ids.py b/openprocurement/bot/identification/databridge/journal_msg_ids.py index d5e155d..8930eb6 100644 --- a/openprocurement/bot/identification/databridge/journal_msg_ids.py +++ b/openprocurement/bot/identification/databridge/journal_msg_ids.py @@ -39,3 +39,4 @@ DATABRIDGE_DOC_SERVICE_CONN_ERROR = 'edr_databridge_doc_service_conn_error' DATABRIDGE_PROXY_SERVER_CONN_ERROR = 'edr_databridge_proxy_server_conn_error' DATABRIDGE_422_UPLOAD_TO_TENDER = 'edr_databridge_422_upload_to_tender' +DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING = 'edr_databridge_item_status_changed_while_processing' diff --git a/openprocurement/bot/identification/databridge/upload_file.py b/openprocurement/bot/identification/databridge/upload_file.py index fef95cc..1be9f39 100644 --- a/openprocurement/bot/identification/databridge/upload_file.py +++ b/openprocurement/bot/identification/databridge/upload_file.py @@ -15,8 +15,7 @@ DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE, DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE, DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_DOC_SERVICE, DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER, DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER, DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_TENDER, DATABRIDGE_START_UPLOAD, - DATABRIDGE_422_UPLOAD_TO_TENDER -) + DATABRIDGE_422_UPLOAD_TO_TENDER, DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING) from openprocurement.bot.identification.databridge.constants import file_name logger = logging.getLogger(__name__) @@ -53,7 +52,7 @@ def upload_to_doc_service(self): object to upload_file_to_tender, otherwise put Data to retry_upload_file_queue.""" while not self.exit: try: - tender_data = self.upload_to_doc_service_queue.get() + tender_data = self.upload_to_doc_service_queue.peek() document_id = tender_data.file_content.get('meta', {}).get('id') except LoopExit: gevent.sleep(0) @@ -70,11 +69,13 @@ def upload_to_doc_service(self): "ITEM_ID": tender_data.item_id, "DOCUMENT_ID": document_id})) logger.exception(e) self.retry_upload_to_doc_service_queue.put(tender_data) + self.upload_to_doc_service_queue.get() else: if response.status_code == 200: data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, tender_data.item_name, tender_data.edr_ids, dict(response.json(), **{'meta': {'id': document_id}})) self.upload_to_tender_queue.put(data) + self.upload_to_doc_service_queue.get() logger.info('Successfully uploaded file to doc service {} {} {} {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE}, @@ -86,6 +87,7 @@ def upload_to_doc_service(self): params={"TENDER_ID": tender_data.tender_id, "ITEM_ID": tender_data.item_id, "DOCUMENT_ID": document_id})) self.retry_upload_to_doc_service_queue.put(tender_data) + self.upload_to_doc_service_queue.get() gevent.sleep(0) def retry_upload_to_doc_service(self): @@ -93,7 +95,7 @@ def retry_upload_to_doc_service(self): upload_to_tender_queue, otherwise put Data obj back to retry_upload_file_queue""" while not self.exit: try: - tender_data = self.retry_upload_to_doc_service_queue.get() + tender_data = self.retry_upload_to_doc_service_queue.peek() document_id = tender_data.file_content.get('meta', {}).get('id') except LoopExit: gevent.sleep(0) @@ -110,12 +112,15 @@ def retry_upload_to_doc_service(self): params={"TENDER_ID": tender_data.tender_id, "ITEM_ID": tender_data.item_id, "DOCUMENT_ID": document_id})) logger.exception(e) + self.retry_upload_to_doc_service_queue.get() + self.update_processing_items(tender_data.tender_id, tender_data.item_id) raise e else: if response.status_code == 200: data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, tender_data.item_name, tender_data.edr_ids, dict(response.json(), **{'meta': {'id': document_id}})) self.upload_to_tender_queue.put(data) + self.retry_upload_to_doc_service_queue.get() logger.info('Successfully uploaded file to doc service {} {} {} {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE}, @@ -126,7 +131,6 @@ def retry_upload_to_doc_service(self): extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_DOC_SERVICE}, params={"TENDER_ID": tender_data.tender_id, "ITEM_ID": tender_data.item_id, "DOCUMENT_ID": document_id})) - self.retry_upload_to_doc_service_queue.put(tender_data) gevent.sleep(0) @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000) @@ -141,7 +145,7 @@ def upload_to_tender(self): award/qualification from processing_items.""" while not self.exit: try: - tender_data = self.upload_to_tender_queue.get() + tender_data = self.upload_to_tender_queue.peek() except LoopExit: gevent.sleep(0) continue @@ -156,21 +160,36 @@ def upload_to_tender(self): tender_data.item_id)) except ResourceError as re: if re.status_int == 422: # WARNING and don't retry - logger.warning("Accept 422, skip tender {} {} {} {}.".format(tender_data.tender_id, + logger.warning("Accept 422, skip tender {} {} {} {}. Message: {}".format(tender_data.tender_id, tender_data.item_name, - tender_data.item_id, document_id), + tender_data.item_id, document_id, re.msg), extra=journal_context({"MESSAGE_ID": DATABRIDGE_422_UPLOAD_TO_TENDER}, {"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.upload_to_tender_queue.get() continue + elif re.status_int == 403: + logger.warning("Accept 403 while uploading to tender {} {} {} doc_id: {}. Message {}".format( + tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, re.msg), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, + {"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id}) + ) + self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.upload_to_tender_queue.get() else: + logger.warning('Exception while retry uploading file to tender {} {} {} {}. Message: {}'.format( + tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, re.message), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_TENDER}, + params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) self.retry_upload_to_tender_queue.put(tender_data) + self.upload_to_tender_queue.get() except Exception as e: logger.info('Exception while uploading file to tender {} {} {} {}. Message: {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, e.message), extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER}, params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) self.retry_upload_to_tender_queue.put(tender_data) + self.upload_to_tender_queue.get() else: logger.info('Successfully uploaded file to tender {} {} {} {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id), @@ -178,6 +197,7 @@ def upload_to_tender(self): params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) # delete current tender after successful upload file (to avoid reloading file) self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.upload_to_tender_queue.get() gevent.sleep(0) def retry_upload_to_tender(self): @@ -185,7 +205,7 @@ def retry_upload_to_tender(self): retry_upload_to_tender_queue""" while not self.exit: try: - tender_data = self.retry_upload_to_tender_queue.get() + tender_data = self.retry_upload_to_tender_queue.peek() except LoopExit: gevent.sleep(0) continue @@ -195,26 +215,33 @@ def retry_upload_to_tender(self): self.client_upload_to_tender(tender_data) except ResourceError as re: if re.status_int == 422: # WARNING and don't retry - logger.warning("Accept 422, skip tender {} {} {} {}.".format(tender_data.tender_id, tender_data.item_name, - tender_data.item_id, document_id), + logger.warning("Accept 422, skip tender {} {} {} {}. Message {}".format(tender_data.tender_id, tender_data.item_name, + tender_data.item_id, document_id, re.msg), extra=journal_context({"MESSAGE_ID": DATABRIDGE_422_UPLOAD_TO_TENDER}, {"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.retry_upload_to_tender_queue.get() + continue + elif re.status_int == 403: + logger.warning("Accept 403 while uploading to tender {} {} {} doc_id: {}. Message {}".format( + tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, re.msg), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, + {"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id}) + ) + self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.retry_upload_to_tender_queue.get() continue else: logger.info('Exception while retry uploading file to tender {} {} {} {}. Message: {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, re.message), extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_TENDER}, params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) - logger.exception(re) - self.retry_upload_to_tender_queue.put(tender_data) except Exception as e: logger.info('Exception while retry uploading file to tender {} {} {} {}. Message: {}'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, e.message), extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_TENDER}, params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) logger.exception(e) - self.retry_upload_to_tender_queue.put(tender_data) else: logger.info('Successfully uploaded file to tender {} {} {} {} in retry'.format( tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id), @@ -222,6 +249,7 @@ def retry_upload_to_tender(self): params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id})) # delete current tender after successful upload file (to avoid reloading file) self.update_processing_items(tender_data.tender_id, tender_data.item_id) + self.retry_upload_to_tender_queue.get() gevent.sleep(0) @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000) diff --git a/openprocurement/bot/identification/tests/upload_file.py b/openprocurement/bot/identification/tests/upload_file.py index da0e668..22ec3e8 100644 --- a/openprocurement/bot/identification/tests/upload_file.py +++ b/openprocurement/bot/identification/tests/upload_file.py @@ -66,7 +66,9 @@ def test_successful_upload(self, mrequest, gevent_sleep): self.assertItemsEqual(processing_items.keys(), [key]) self.assertEqual(upload_to_doc_service_queue.qsize(), 1) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(4) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @@ -109,7 +111,9 @@ def test_retry_doc_service(self, mrequest, gevent_sleep): self.assertItemsEqual(processing_items.keys(), [key]) self.assertEqual(upload_to_doc_service_queue.qsize(), 1) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(7) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @@ -131,11 +135,11 @@ def test_retry_upload_to_tender(self, mrequest, gevent_sleep): 'title': file_name}}, status_code=200) client = MagicMock() - client._create_tender_resource_item.side_effect = [Unauthorized(http_code=403), - Unauthorized(http_code=403), - Unauthorized(http_code=403), - Unauthorized(http_code=403), - Unauthorized(http_code=403), + client._create_tender_resource_item.side_effect = [Unauthorized(http_code=401), + Unauthorized(http_code=401), + Unauthorized(http_code=401), + Unauthorized(http_code=401), + Unauthorized(http_code=401), Exception(), {'data': {'id': uuid.uuid4().hex, 'documentOf': 'tender', @@ -152,7 +156,9 @@ def test_retry_upload_to_tender(self, mrequest, gevent_sleep): self.assertItemsEqual(processing_items.keys(), [key]) self.assertEqual(upload_to_doc_service_queue.qsize(), 1) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(60) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @@ -184,7 +190,9 @@ def test_request_failed(self, mrequest, gevent_sleep): upload_to_tender_queue = Queue(10) upload_to_doc_service_queue.put(Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(10) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @@ -196,7 +204,7 @@ def test_request_failed(self, mrequest, gevent_sleep): @requests_mock.Mocker() @patch('gevent.sleep') - def test_request_failed_in_retry(self, mrequest, gevent_sleep): + def test_request_failed_item_status_change(self, mrequest, gevent_sleep): gevent_sleep.side_effect = custom_sleep doc_service_client = DocServiceClient(host='127.0.0.1', port='80', user='', password='') mrequest.post('{url}'.format(url=doc_service_client.url), @@ -206,6 +214,44 @@ def test_request_failed_in_retry(self, mrequest, gevent_sleep): 'title': file_name}}, status_code=200) client = MagicMock() + client._create_tender_resource_item.side_effect = [ResourceError(http_code=403), + ResourceError(http_code=403)] + tender_id = uuid.uuid4().hex + award_id = uuid.uuid4().hex + qualification_id = uuid.uuid4().hex + document_id = generate_doc_id() + keys = ['{}_{}'.format(tender_id, award_id), '{}_{}'.format(tender_id, qualification_id)] + processing_items = {keys[0]: 1, keys[1]: 1} + upload_to_doc_service_queue = Queue(10) + upload_to_tender_queue = Queue(10) + upload_to_doc_service_queue.put(Data(tender_id, award_id, '123', 'qualifications', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) + upload_to_doc_service_queue.put(Data(tender_id, qualification_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) + worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty + worker.shutdown() + self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(mrequest.call_count, 2) + self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') + self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) + self.assertEqual(processing_items, {}) + self.assertEqual(client._create_tender_resource_item.call_count, 2) # check that processed just 1 request + + @requests_mock.Mocker() + @patch('gevent.sleep') + def test_request_failed_in_retry(self, mrequest, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + doc_service_client = DocServiceClient(host='127.0.0.1', port='80', user='', password='') + mrequest.post('{url}'.format(url=doc_service_client.url), + json={'data': { + 'url': 'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', + 'format': 'application/yaml', + 'hash': 'md5:9a0364b9e99bb480dd25e1f0284c8555', + 'title': file_name}}, + status_code=200) + client = MagicMock() client._create_tender_resource_item.side_effect = [ResourceError(http_code=425), ResourceError(http_code=422), ResourceError(http_code=422), @@ -219,9 +265,13 @@ def test_request_failed_in_retry(self, mrequest, gevent_sleep): processing_items = {key: 1} upload_to_doc_service_queue = Queue(10) upload_to_tender_queue = Queue(10) - upload_to_doc_service_queue.put(Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) - worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(60) + upload_to_doc_service_queue.put( + Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) + worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, + doc_service_client) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @@ -231,6 +281,53 @@ def test_request_failed_in_retry(self, mrequest, gevent_sleep): self.assertEqual(processing_items, {}) self.assertEqual(client._create_tender_resource_item.call_count, 6) # check that processed just 1 request + @requests_mock.Mocker() + @patch('gevent.sleep') + def test_request_failed_in_retry_item_status(self, mrequest, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + doc_service_client = DocServiceClient(host='127.0.0.1', port='80', user='', password='') + mrequest.post('{url}'.format(url=doc_service_client.url), + json={'data': {'url': 'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', + 'format': 'application/yaml', + 'hash': 'md5:9a0364b9e99bb480dd25e1f0284c8555', + 'title': file_name}}, + status_code=200) + client = MagicMock() + client._create_tender_resource_item.side_effect = [ResourceError(http_code=425), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ResourceError(http_code=403), + ] + tender_id = uuid.uuid4().hex + award_id = uuid.uuid4().hex + qualification_id = uuid.uuid4().hex + document_id = generate_doc_id() + keys = ['{}_{}'.format(tender_id, award_id), '{}_{}'.format(tender_id, qualification_id)] + processing_items = {keys[0]: 1, keys[1]: 1} + upload_to_doc_service_queue = Queue(10) + upload_to_tender_queue = Queue(10) + upload_to_doc_service_queue.put(Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) + upload_to_doc_service_queue.put(Data(tender_id, qualification_id, '123', 'qualifications', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) + worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty + worker.shutdown() + self.assertEqual(upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(mrequest.call_count, 2) + self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') + self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) + self.assertEqual(processing_items, {}) + self.assertEqual(client._create_tender_resource_item.call_count, 7) # check that processed just 1 request + @requests_mock.Mocker() @patch('gevent.sleep') def test_processing_items(self, mrequest, gevent_sleep): @@ -268,7 +365,9 @@ def test_processing_items(self, mrequest, gevent_sleep): upload_to_doc_service_queue.put(Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) - sleep(10) + while (upload_to_doc_service_queue.qsize() or upload_to_tender_queue.qsize() or + worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()): + sleep(1) # sleep while at least one queue is not empty worker.shutdown() self.assertEqual(upload_to_tender_queue.qsize(), 0, 'Queue should be empty') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) @@ -312,8 +411,10 @@ def test_upload_to_doc_service_queue_loop_exit(self, mrequest, gevent_sleep): processing_items = {key: 2} upload_to_doc_service_queue = MagicMock() upload_to_tender_queue = Queue(10) - upload_to_doc_service_queue.get.side_effect = generate_answers( - answers=[LoopExit(), Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'}), Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})], + upload_to_doc_service_queue.peek.side_effect = generate_answers( + answers=[LoopExit(), + Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'}), + Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})], default=LoopExit()) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) sleep(10) @@ -347,7 +448,7 @@ def test_upload_to_tender_queue_loop_exit(self, mrequest, gevent_sleep): processing_items = {key: 2} upload_to_doc_service_queue = Queue(1) upload_to_tender_queue = MagicMock() - upload_to_tender_queue.get.side_effect = generate_answers( + upload_to_tender_queue.peek.side_effect = generate_answers( answers=[LoopExit(), Data(tender_id=tender_id, item_id=award_id, @@ -402,7 +503,7 @@ def test_retry_upload_to_tender_queue_loop_exit(self, mrequest, gevent_sleep): upload_to_tender_queue = Queue(1) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) worker.retry_upload_to_tender_queue = MagicMock() - worker.retry_upload_to_tender_queue.get.side_effect = generate_answers( + worker.retry_upload_to_tender_queue.peek.side_effect = generate_answers( answers=[LoopExit(), Data(tender_id=tender_id, item_id=award_id, @@ -423,7 +524,6 @@ def test_retry_upload_to_tender_queue_loop_exit(self, mrequest, gevent_sleep): u'hash': u'md5:9a0364b9e99bb480dd25e1f0284c8555', u'title': file_name})], default=LoopExit()) - sleep(10) worker.shutdown() self.assertEqual(processing_items, {}) @@ -469,7 +569,7 @@ def test_retry_upload_to_doc_service_queue_loop_exit(self, mrequest, gevent_slee upload_to_doc_service_queue = Queue(1) worker = UploadFile.spawn(client, upload_to_doc_service_queue, upload_to_tender_queue, processing_items, doc_service_client) worker.retry_upload_to_doc_service_queue = MagicMock() - worker.retry_upload_to_doc_service_queue.get.side_effect = generate_answers( + worker.retry_upload_to_doc_service_queue.peek.side_effect = generate_answers( answers=[LoopExit(), Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'}), Data(tender_id, award_id, '123', 'awards', None, {'meta': {'id': document_id}, 'test_data': 'test_data'})],