diff --git a/container/wsgi.py b/container/wsgi.py index 7f4ea3f..2c4ac68 100644 --- a/container/wsgi.py +++ b/container/wsgi.py @@ -21,6 +21,9 @@ import sys from prometheus_flask_exporter.multiprocess import GunicornPrometheusMetrics +from werkzeug.middleware.dispatcher import DispatcherMiddleware +from prometheus_client import make_wsgi_app, CollectorRegistry +from prometheus_client.core import REGISTRY from dmci.api import App from dmci import CONFIG @@ -29,4 +32,8 @@ sys.exit(1) app = App() +REGISTRY.register(CollectorRegistry()) +app.wsgi_app = DispatcherMiddleware(app.wsgi_app, { + '/metrics': make_wsgi_app(REGISTRY) +}) GunicornPrometheusMetrics(app) diff --git a/dmci/api/app.py b/dmci/api/app.py index 8ea729b..929b014 100644 --- a/dmci/api/app.py +++ b/dmci/api/app.py @@ -28,11 +28,16 @@ import dmci from dmci.api.worker import Worker +from prometheus_client import Counter logger = logging.getLogger(__name__) OK_RETURN = "Everything is OK" +FILE_DIST_FAIL = Counter("failed_file_dist", "Number of failed file_dist", ["path"]) +CSW_DIST_FAIL = Counter("failed_csw_dist", "Number of failed csw_dist", ["path"]) +SOLR_DIST_FAIL = Counter("failed_solr_dist", "Number of failed solr_dist", ["path"]) + class App(Flask): @@ -54,11 +59,11 @@ def __init__(self): # Create the XML Validator Object try: - self._xsd_obj = etree.XMLSchema( - etree.parse(self._conf.mmd_xsd_path)) + self._xsd_obj = etree.XMLSchema(etree.parse(self._conf.mmd_xsd_path)) except Exception as e: - logger.critical("XML Schema could not be parsed: %s" % - str(self._conf.mmd_xsd_path)) + logger.critical( + "XML Schema could not be parsed: %s" % str(self._conf.mmd_xsd_path) + ) logger.critical(str(e)) sys.exit(1) @@ -66,27 +71,49 @@ def __init__(self): @self.route("/v1/create", methods=["POST"]) @self.route("/v1/insert", methods=["POST"]) def post_insert(): - msg, code = self._insert_update_method_post("insert", request) + msg, code, failed = self._insert_update_method_post("insert", request) + logger.info(f"failed {failed}") + if failed: + if "file" in failed: + FILE_DIST_FAIL.labels(path=request.path).inc() + if "csw" in failed: + CSW_DIST_FAIL.labels(path=request.path).inc() + if "solr" in failed: + SOLR_DIST_FAIL.labels(path=request.path).inc() return self._formatMsgReturn(msg), code @self.route("/v1/update", methods=["POST"]) def post_update(): - msg, code = self._insert_update_method_post("update", request) + msg, code, failed = self._insert_update_method_post("update", request) + logger.info(f"failed {failed}") + if failed: + if "file" in failed: + FILE_DIST_FAIL.labels(path=request.path).inc() + if "csw" in failed: + CSW_DIST_FAIL.labels(path=request.path).inc() + if "solr" in failed: + SOLR_DIST_FAIL.labels(path=request.path).inc() return self._formatMsgReturn(msg), code @self.route("/v1/delete/", methods=["POST"]) def post_delete(metadata_id=None): """Process delete command.""" - md_namespace, md_uuid, err = self._check_metadata_id(metadata_id, - self._conf.env_string) + md_namespace, md_uuid, err = self._check_metadata_id( + metadata_id, self._conf.env_string + ) if err is not None: logger.error(err) if md_uuid is not None: - worker = Worker("delete", None, self._xsd_obj, - md_uuid=md_uuid, md_namespace=md_namespace) - err = self._distributor_wrapper(worker) + worker = Worker( + "delete", + None, + self._xsd_obj, + md_uuid=md_uuid, + md_namespace=md_namespace, + ) + err, failed = self._distributor_wrapper(worker) else: return self._formatMsgReturn(err), 400 @@ -121,68 +148,80 @@ def _formatMsgReturn(self, msg): def _insert_update_method_post(self, cmd, request): """Process insert or update command requests.""" if request.content_length is None: - return "There is no data sent to the api", 202 + return "There is no data sent to the api", 202, None if request.content_length > self._conf.max_permitted_size: - return f"The file is larger than maximum size: {self._conf.max_permitted_size}", 413 + return ( + f"The file is larger than maximum size: {self._conf.max_permitted_size}", + 413, + None, + ) data = request.get_data() # Cache the job file file_uuid = uuid.uuid4() - full_path = os.path.join( - self._conf.distributor_cache, f"{file_uuid}.xml") - reject_path = os.path.join( - self._conf.rejected_jobs_path, f"{file_uuid}.xml") + full_path = os.path.join(self._conf.distributor_cache, f"{file_uuid}.xml") + reject_path = os.path.join(self._conf.rejected_jobs_path, f"{file_uuid}.xml") msg, code = self._persist_file(data, full_path) if code != 200: - return msg, code + return msg, code, None # Run the validator - worker = Worker(cmd, full_path, self._xsd_obj, - path_to_parent_list=self._conf.path_to_parent_list) + worker = Worker( + cmd, + full_path, + self._xsd_obj, + path_to_parent_list=self._conf.path_to_parent_list, + ) valid, msg, data_ = worker.validate(data) if not valid: msg += f"\n Rejected persistent file : {file_uuid}.xml \n " self._handle_persist_file(False, full_path, reject_path, msg) - return msg, 400 + return msg, 400, None # Check if the data from the request was modified in worker.validate(). # If so we will need to write the modified data to disk. if not data == data_: msg, code = self._persist_file(data_, full_path) if code != 200: - return msg, code + return msg, code, None # Run the distributors - err = self._distributor_wrapper(worker) + err, failed = self._distributor_wrapper(worker) if err: msg = "\n".join(err) self._handle_persist_file(False, full_path, reject_path, msg) - return msg, 500 + return msg, 500, failed else: self._handle_persist_file(True, full_path) - return OK_RETURN, 200 + return OK_RETURN, 200, None def _validate_method_post(self, request): """Only run the validator for submitted file.""" if request.content_length > self._conf.max_permitted_size: - return f"The file is larger than maximum size: {self._conf.max_permitted_size}", 413 + return ( + f"The file is larger than maximum size: {self._conf.max_permitted_size}", + 413, + ) data = request.get_data() # Cache the job file file_uuid = uuid.uuid4() - full_path = os.path.join( - self._conf.distributor_cache, f"{file_uuid}.xml") + full_path = os.path.join(self._conf.distributor_cache, f"{file_uuid}.xml") msg, code = self._persist_file(data, full_path) if code != 200: self._handle_persist_file(True, full_path) return msg, code # Run the validator - worker = Worker("none", full_path, self._xsd_obj, - path_to_parent_list=self._conf.path_to_parent_list) + worker = Worker( + "none", + full_path, + self._xsd_obj, + path_to_parent_list=self._conf.path_to_parent_list, + ) valid, msg, data = worker.validate(data) self._handle_persist_file(True, full_path) if valid: @@ -197,20 +236,18 @@ def _distributor_wrapper(self, worker): err = [] status, valid, _, failed, skipped, failed_msg = worker.distribute() if not status: - err.append("The following distributors failed: %s" % - ", ".join(failed)) + err.append("The following distributors failed: %s" % ", ".join(failed)) for name, reason in zip(failed, failed_msg): err.append(" - %s: %s" % (name, reason)) if not valid: - err.append("The following jobs were skipped: %s" % - ", ".join(skipped)) + err.append("The following jobs were skipped: %s" % ", ".join(skipped)) - return err + return err, failed @staticmethod def _check_metadata_id(metadata_id, env_string=None): - """ Check that the metadata_id is structured as + """Check that the metadata_id is structured as namespace:UUID, that the uuid part is of type UUID, and that the namespace is correct. """ @@ -273,7 +310,8 @@ def _handle_persist_file(status, full_path, reject_path=None, reject_reason=""): except shutil.SameFileError as e: logger.error( "Source and destination represents the same file. %s -> %s" - % (full_path, reject_path)) + % (full_path, reject_path) + ) logger.error(str(e)) return False @@ -297,8 +335,7 @@ def _handle_persist_file(status, full_path, reject_path=None, reject_reason=""): with open(reason_path, mode="w", encoding="utf-8") as ofile: ofile.write(reject_reason) except Exception as e: - logger.error( - "Failed to write rejected reason to file: %s", reason_path) + logger.error("Failed to write rejected reason to file: %s", reason_path) logger.error(str(e)) return False @@ -317,4 +354,5 @@ def _persist_file(data, full_path): return OK_RETURN, 200 + # END Class App diff --git a/tests/test_api/test_app.py b/tests/test_api/test_app.py index 3f3460f..67e1131 100644 --- a/tests/test_api/test_app.py +++ b/tests/test_api/test_app.py @@ -30,6 +30,8 @@ from tools import causeSameFileError from tools import causeShUtilError +from prometheus_client import REGISTRY + from dmci.api import App MOCK_XML = b"" @@ -88,6 +90,7 @@ def testApiApp_Init(tmpConf, tmpDir, monkeypatch): with pytest.raises(SystemExit) as sysExit: App() + # END Test testApiApp_Init @@ -105,6 +108,7 @@ def testApiApp_EndPoints(client): # Bare delete command is not allowed assert client.post("/v1/delete").status_code == 404 + # END Test testApiApp_EndPoints @@ -132,6 +136,7 @@ def testApiApp_EndPoints_Exception(tmpDir, tmpConf, mockXsd, monkeypatch): with pytest.raises(SystemExit): App() + # END Test testApiApp_EndPoints_Exception @@ -170,14 +175,19 @@ def testApiApp_InsertUpdateRequests(client, monkeypatch): # first _persist_file fails with monkeypatch.context() as mp: mp.setattr("dmci.api.app.Worker.validate", lambda *a: (True, "", MOCK_XML)) - mp.setattr("dmci.api.app.App._persist_file", lambda *a: ("Failed to write the file", 666)) + mp.setattr( + "dmci.api.app.App._persist_file", + lambda *a: ("Failed to write the file", 666), + ) assert client.post("/v1/insert", data=MOCK_XML).status_code == 666 assert client.post("/v1/update", data=MOCK_XML).status_code == 666 # first _persist_file works with monkeypatch.context() as mp: mp.setattr("dmci.api.app.Worker.validate", lambda *a: (True, "", MOCK_XML)) - mp.setattr("dmci.api.app.App._persist_file", lambda *a: ("Everything is OK", 200)) + mp.setattr( + "dmci.api.app.App._persist_file", lambda *a: ("Everything is OK", 200) + ) assert client.post("/v1/insert", data=MOCK_XML).status_code == 200 assert client.post("/v1/update", data=MOCK_XML).status_code == 200 @@ -193,7 +203,9 @@ def testApiApp_InsertUpdateRequests(client, monkeypatch): s = ["C"] e = ["Reason A", "Reason B"] mp.setattr("dmci.api.app.Worker.validate", lambda *a: (True, "", MOCK_XML)) - mp.setattr("dmci.api.app.Worker.distribute", lambda *a: (False, False, [], f, s, e)) + mp.setattr( + "dmci.api.app.Worker.distribute", lambda *a: (False, False, [], f, s, e) + ) response = client.post("/v1/insert", data=MOCK_XML) assert response.status_code == 500 @@ -213,24 +225,70 @@ def testApiApp_InsertUpdateRequests(client, monkeypatch): b"The following jobs were skipped: C\n" ) + # Distribution fails, metrics are incremented + with monkeypatch.context() as mp: + f = ["file", "solr", "csw"] + s = ["C"] + e = ["Reason A", "Reason B", "Reason C"] + mp.setattr("dmci.api.app.Worker.validate", lambda *a: (True, "", MOCK_XML)) + mp.setattr( + "dmci.api.app.Worker.distribute", lambda *a: (False, False, [], f, s, e) + ) + + response = client.post("/v1/insert", data=MOCK_XML) + after_file = REGISTRY.get_sample_value( + "failed_file_dist_total", {"path": "/v1/insert"} + ) + after_csw = REGISTRY.get_sample_value( + "failed_csw_dist_total", {"path": "/v1/insert"} + ) + after_solr = REGISTRY.get_sample_value( + "failed_solr_dist_total", {"path": "/v1/insert"} + ) + assert after_file == 1 + assert after_csw == 1 + assert after_solr == 1 + + response = client.post("/v1/update", data=MOCK_XML) + after_file = REGISTRY.get_sample_value( + "failed_file_dist_total", {"path": "/v1/update"} + ) + after_csw = REGISTRY.get_sample_value( + "failed_csw_dist_total", {"path": "/v1/update"} + ) + after_solr = REGISTRY.get_sample_value( + "failed_solr_dist_total", {"path": "/v1/update"} + ) + assert after_file == 1 + assert after_csw == 1 + assert after_solr == 1 + # Data is valid, distribute OK. with monkeypatch.context() as mp: mp.setattr("dmci.api.app.Worker.validate", lambda *a: (True, "", MOCK_XML)) - mp.setattr("dmci.api.app.Worker.distribute", lambda *a: ( - True, True, [], [], [], [])) + mp.setattr( + "dmci.api.app.Worker.distribute", lambda *a: (True, True, [], [], [], []) + ) response = client.post("/v1/insert", data=MOCK_XML) assert response.status_code == 200 assert response.data == (b"Everything is OK\n") + # END Test testApiApp_InsertUpdateRequests @pytest.mark.api def testApiApp_PersistAgainAfterModification(client, monkeypatch): - outputs = iter([("Everything is OK", 200), ("Failure in persisting", 666), - ("Everything is OK", 200), ("Failure in persisting", 666)]) + outputs = iter( + [ + ("Everything is OK", 200), + ("Failure in persisting", 666), + ("Everything is OK", 200), + ("Failure in persisting", 666), + ] + ) @staticmethod def fake_output(data, full_path): @@ -243,6 +301,7 @@ def fake_output(data, full_path): assert client.post("/v1/insert", data=MOCK_XML).status_code == 666 assert client.post("/v1/update", data=MOCK_XML).status_code == 666 + # END Test testApiApp_PersistAgainAfterModification @@ -262,7 +321,9 @@ def testApiApp_DeleteRequests(client, monkeypatch): f = ["A", "B"] s = ["C"] e = ["Reason A", "Reason B"] - mp.setattr("dmci.api.app.Worker.distribute", lambda *a: (False, False, [], f, s, e)) + mp.setattr( + "dmci.api.app.Worker.distribute", lambda *a: (False, False, [], f, s, e) + ) response = client.post("/v1/delete/%s" % testUUID, data=MOCK_XML) assert response.status_code == 500 @@ -275,11 +336,14 @@ def testApiApp_DeleteRequests(client, monkeypatch): # Distribute ok with monkeypatch.context() as mp: - mp.setattr("dmci.api.app.Worker.distribute", lambda *a: (True, True, [], [], [], [])) + mp.setattr( + "dmci.api.app.Worker.distribute", lambda *a: (True, True, [], [], [], []) + ) response = client.post("/v1/delete/%s" % testUUID, data=MOCK_XML) assert response.status_code == 200 assert response.data == b"Everything is OK\n" + # END Test testApiApp_DeleteRequests @@ -307,6 +371,7 @@ def testApiApp_ValidateRequests(client, monkeypatch): mp.setattr("dmci.api.app.Worker.validate", lambda *a: (False, "", MOCK_XML)) assert client.post("/v1/validate", data=MOCK_XML).status_code == 400 + # END Test testApiApp_ValidateRequests @@ -325,6 +390,7 @@ def testApiApp_PersistFile(tmpDir, monkeypatch): assert App._persist_file(MOCK_XML, outFile)[1] == 200 assert os.path.isfile(outFile) + # END Test testApiApp_PersistFile @@ -334,21 +400,27 @@ def testApiApp_CheckMetadataId(): correct_UUID = uuid.UUID(testUUID) # Correct with namespace - assert App._check_metadata_id("test:"+testUUID) == ("test", correct_UUID, None) + assert App._check_metadata_id("test:" + testUUID) == ("test", correct_UUID, None) # Without namespace - assert App._check_metadata_id(testUUID) == (None, None, - "Input must be structured as :.") + assert App._check_metadata_id(testUUID) == ( + None, + None, + "Input must be structured as :.", + ) # With namespace, but not in accordance with the defined env_string - assert App._check_metadata_id("test:"+testUUID, env_string="TEST") == (None, None, - "Dataset metadata_id " - "namespace is wrong: " - "test") + assert App._check_metadata_id("test:" + testUUID, env_string="TEST") == ( + None, + None, + "Dataset metadata_id " "namespace is wrong: " "test", + ) # With namespace, defined env_string, but present in call - assert App._check_metadata_id("test.TEST:"+testUUID, env_string="TEST") == ("test.TEST", - correct_UUID, - None) + assert App._check_metadata_id("test.TEST:" + testUUID, env_string="TEST") == ( + "test.TEST", + correct_UUID, + None, + ) # Test with namespace, but malformed UUID out = App._check_metadata_id("test:blabla") @@ -429,7 +501,7 @@ def testApiApp_HandlePersistFile(caplog, fncDir, monkeypatch): @pytest.mark.api def testApiApp_HandlePersistFile_fail2write_reason(caplog, fncDir, monkeypatch): - """ Test that _handle_persist_file catches the error if it fails + """Test that _handle_persist_file catches the error if it fails to open the reject file (that should provide the reason for failing to write the persist file to the rejected folder). """ @@ -457,7 +529,11 @@ def patched_open(*args, **kwargs): with monkeypatch.context() as mp: mp.setattr("builtins.open", patched_open) - assert App._handle_persist_file(False, full_path, reject_path, "Some reason") is False + assert ( + App._handle_persist_file(False, full_path, reject_path, "Some reason") + is False + ) assert "Failed to write rejected reason to file" in caplog.text + # END Test testApiApp_HandlePersistFile