Skip to content

Commit

Permalink
Add custom metrics to the Flask Prometheus exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
charlienegri committed Oct 7, 2024
1 parent 08ed25e commit 126defa
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 49 deletions.
7 changes: 7 additions & 0 deletions container/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import sys

from prometheus_flask_exporter.multiprocess import GunicornPrometheusMetrics
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from prometheus_client import make_wsgi_app, CollectorRegistry
from prometheus_client.core import REGISTRY

from dmci.api import App
from dmci import CONFIG
Expand All @@ -29,4 +32,8 @@
sys.exit(1)

# Create the WSGI application for Gunicorn to serve.
app = App()
# NOTE(review): registering a fresh, empty CollectorRegistry into the global
# REGISTRY looks like a no-op — confirm whether this line is actually needed.
REGISTRY.register(CollectorRegistry())
# Expose the global registry's metrics under /metrics via plain WSGI dispatch.
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
    '/metrics': make_wsgi_app(REGISTRY)
})
# Attach the Gunicorn multiprocess Flask exporter (per the import from
# prometheus_flask_exporter.multiprocess) so request metrics are collected.
GunicornPrometheusMetrics(app)
94 changes: 66 additions & 28 deletions dmci/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@

import dmci
from dmci.api.worker import Worker
from prometheus_client import Counter

logger = logging.getLogger(__name__)

# Message body returned to clients on successful API calls.
OK_RETURN = "Everything is OK"

# Prometheus counters tracking failed distributor back-ends ("file", "csw",
# "solr"), labelled by the request path that triggered the failure.
FILE_DIST_FAIL = Counter("failed_file_dist", "Number of failed file_dist", ["path"])
CSW_DIST_FAIL = Counter("failed_csw_dist", "Number of failed csw_dist", ["path"])
SOLR_DIST_FAIL = Counter("failed_solr_dist", "Number of failed solr_dist", ["path"])


class App(Flask):

Expand All @@ -54,11 +59,11 @@ def __init__(self):

# Create the XML Validator Object
try:
self._xsd_obj = etree.XMLSchema(
etree.parse(self._conf.mmd_xsd_path))
self._xsd_obj = etree.XMLSchema(etree.parse(self._conf.mmd_xsd_path))
except Exception as e:
logger.critical("XML Schema could not be parsed: %s" %
str(self._conf.mmd_xsd_path))
logger.critical(
"XML Schema could not be parsed: %s" % str(self._conf.mmd_xsd_path)
)
logger.critical(str(e))
sys.exit(1)

Expand All @@ -67,25 +72,47 @@ def __init__(self):
@self.route("/v1/insert", methods=["POST"])
def post_insert():
    """Handle POST /v1/insert.

    Runs the insert pipeline and increments the per-distributor
    Prometheus failure counter for every back-end that failed,
    labelled with the request path.
    """
    msg, code, failed = self._insert_update_method_post("insert", request)
    # Lazy %-style args: no string formatting unless INFO is enabled.
    logger.info("failed %s", failed)
    if failed:
        # Data-driven dispatch replaces three duplicated if-blocks.
        for dist_name, counter in (
            ("file", FILE_DIST_FAIL),
            ("csw", CSW_DIST_FAIL),
            ("solr", SOLR_DIST_FAIL),
        ):
            if dist_name in failed:
                counter.labels(path=request.path).inc()
    return self._formatMsgReturn(msg), code

@self.route("/v1/update", methods=["POST"])
def post_update():
    """Handle POST /v1/update.

    Runs the update pipeline and increments the per-distributor
    Prometheus failure counter for every back-end that failed,
    labelled with the request path.
    """
    msg, code, failed = self._insert_update_method_post("update", request)
    # Lazy %-style args: no string formatting unless INFO is enabled.
    logger.info("failed %s", failed)
    if failed:
        # Data-driven dispatch replaces three duplicated if-blocks.
        for dist_name, counter in (
            ("file", FILE_DIST_FAIL),
            ("csw", CSW_DIST_FAIL),
            ("solr", SOLR_DIST_FAIL),
        ):
            if dist_name in failed:
                counter.labels(path=request.path).inc()
    return self._formatMsgReturn(msg), code

@self.route("/v1/delete/<metadata_id>", methods=["POST"])
def post_delete(metadata_id=None):
"""Process delete command."""
md_namespace, md_uuid, err = self._check_metadata_id(metadata_id,
self._conf.env_string)
md_namespace, md_uuid, err = self._check_metadata_id(
metadata_id, self._conf.env_string
)

if err is not None:
logger.error(err)

if md_uuid is not None:
worker = Worker("delete", None, self._xsd_obj,
md_uuid=md_uuid, md_namespace=md_namespace)
worker = Worker(
"delete",
None,
self._xsd_obj,
md_uuid=md_uuid,
md_namespace=md_namespace,
)
err, failed = self._distributor_wrapper(worker)
else:
return self._formatMsgReturn(err), 400
Expand Down Expand Up @@ -123,23 +150,29 @@ def _insert_update_method_post(self, cmd, request):
if request.content_length is None:
return "There is no data sent to the api", 202, None
if request.content_length > self._conf.max_permitted_size:
return f"The file is larger than maximum size: {self._conf.max_permitted_size}", 413, None
return (
f"The file is larger than maximum size: {self._conf.max_permitted_size}",
413,
None,
)

data = request.get_data()

# Cache the job file
file_uuid = uuid.uuid4()
full_path = os.path.join(
self._conf.distributor_cache, f"{file_uuid}.xml")
reject_path = os.path.join(
self._conf.rejected_jobs_path, f"{file_uuid}.xml")
full_path = os.path.join(self._conf.distributor_cache, f"{file_uuid}.xml")
reject_path = os.path.join(self._conf.rejected_jobs_path, f"{file_uuid}.xml")
msg, code = self._persist_file(data, full_path)
if code != 200:
return msg, code, None

# Run the validator
worker = Worker(cmd, full_path, self._xsd_obj,
path_to_parent_list=self._conf.path_to_parent_list)
worker = Worker(
cmd,
full_path,
self._xsd_obj,
path_to_parent_list=self._conf.path_to_parent_list,
)
valid, msg, data_ = worker.validate(data)
if not valid:
msg += f"\n Rejected persistent file : {file_uuid}.xml \n "
Expand Down Expand Up @@ -167,22 +200,28 @@ def _insert_update_method_post(self, cmd, request):
def _validate_method_post(self, request):
"""Only run the validator for submitted file."""
if request.content_length > self._conf.max_permitted_size:
return f"The file is larger than maximum size: {self._conf.max_permitted_size}", 413
return (
f"The file is larger than maximum size: {self._conf.max_permitted_size}",
413,
)

data = request.get_data()

# Cache the job file
file_uuid = uuid.uuid4()
full_path = os.path.join(
self._conf.distributor_cache, f"{file_uuid}.xml")
full_path = os.path.join(self._conf.distributor_cache, f"{file_uuid}.xml")
msg, code = self._persist_file(data, full_path)
if code != 200:
self._handle_persist_file(True, full_path)
return msg, code

# Run the validator
worker = Worker("none", full_path, self._xsd_obj,
path_to_parent_list=self._conf.path_to_parent_list)
worker = Worker(
"none",
full_path,
self._xsd_obj,
path_to_parent_list=self._conf.path_to_parent_list,
)
valid, msg, data = worker.validate(data)
self._handle_persist_file(True, full_path)
if valid:
Expand All @@ -197,20 +236,18 @@ def _distributor_wrapper(self, worker):
err = []
status, valid, _, failed, skipped, failed_msg = worker.distribute()
if not status:
err.append("The following distributors failed: %s" %
", ".join(failed))
err.append("The following distributors failed: %s" % ", ".join(failed))
for name, reason in zip(failed, failed_msg):
err.append(" - %s: %s" % (name, reason))

if not valid:
err.append("The following jobs were skipped: %s" %
", ".join(skipped))
err.append("The following jobs were skipped: %s" % ", ".join(skipped))

return err, failed

@staticmethod
def _check_metadata_id(metadata_id, env_string=None):
""" Check that the metadata_id is structured as
"""Check that the metadata_id is structured as
namespace:UUID, that the uuid part is of type UUID, and that
the namespace is correct.
"""
Expand Down Expand Up @@ -273,7 +310,8 @@ def _handle_persist_file(status, full_path, reject_path=None, reject_reason=""):
except shutil.SameFileError as e:
logger.error(
"Source and destination represents the same file. %s -> %s"
% (full_path, reject_path))
% (full_path, reject_path)
)
logger.error(str(e))
return False

Expand All @@ -297,8 +335,7 @@ def _handle_persist_file(status, full_path, reject_path=None, reject_reason=""):
with open(reason_path, mode="w", encoding="utf-8") as ofile:
ofile.write(reject_reason)
except Exception as e:
logger.error(
"Failed to write rejected reason to file: %s", reason_path)
logger.error("Failed to write rejected reason to file: %s", reason_path)
logger.error(str(e))
return False

Expand All @@ -317,4 +354,5 @@ def _persist_file(data, full_path):

return OK_RETURN, 200


# END Class App
Loading

0 comments on commit 126defa

Please sign in to comment.