From f9811d5e08808e43c5407dee3e13000cb05569a4 Mon Sep 17 00:00:00 2001
From: Rohit Kumar
Date: Mon, 20 Jan 2025 23:20:05 +0530
Subject: [PATCH] Adds logs and labels in auto-import jobs to help create
 log-based metrics. (#1186)

* Adds logs and labels in auto-import jobs to help create log-based metrics.

* For structured logs, replaced Python logging with print()
---
 .../executor/app/executor/cloud_run.py | 10 +++++---
 import-automation/executor/main.py     | 25 +++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/import-automation/executor/app/executor/cloud_run.py b/import-automation/executor/app/executor/cloud_run.py
index 5d3fae30d5..0452d3cd21 100644
--- a/import-automation/executor/app/executor/cloud_run.py
+++ b/import-automation/executor/app/executor/cloud_run.py
@@ -61,11 +61,13 @@ def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,
     res = run_v2.types.ResourceRequirements(limits=resources)
     container = run_v2.Container(image=image, env=env, resources=res, args=args)
 
+    # Labels allow filtering of automated import Cloud Run jobs in log-based metrics.
     exe_template = run_v2.ExecutionTemplate(
-        template=run_v2.TaskTemplate(containers=[container],
-                                     max_retries=2,
-                                     timeout=duration_pb2.Duration(
-                                         seconds=timeout)))
+        labels={"datacommons_cloud_run_job_type": "auto_import_job"},
+        template=run_v2.TaskTemplate(
+            containers=[container],
+            max_retries=2,
+            timeout=duration_pb2.Duration(seconds=timeout)))
     new_job = run_v2.Job(template=exe_template)
 
     logging.info(f"Creating job: {job_name}")
diff --git a/import-automation/executor/main.py b/import-automation/executor/main.py
index 8f22945ccb..696c27f25c 100644
--- a/import-automation/executor/main.py
+++ b/import-automation/executor/main.py
@@ -16,6 +16,8 @@
 """
 import logging
 import json
+import os
+import time
 
 from absl import flags
 from absl import app
@@ -30,11 +32,20 @@
 flags.DEFINE_string('import_name', '', 'Absolute import name.')
 flags.DEFINE_string('import_config', '', 'Import executor configuration.')
 
+CLOUD_RUN_JOB_NAME = os.getenv("CLOUD_RUN_JOB")
+# The `log_type` label helps filter log lines, which is useful for creating
+# log-based metrics. Each log type shares a similar set of fields for easier parsing.
+LOG_TYPE_LABEL = "log_type"
+# log_type for capturing the status of auto-import Cloud Run jobs.
+# Required fields: log_type, message, status, latency_secs.
+AUTO_IMPORT_JOB_STATUS_LOG_TYPE = "auto-import-job-status"
+
 
 def scheduled_updates(absolute_import_name: str, import_config: str):
     """
     Invokes import update workflow.
     """
+    start_time = time.time()
     logging.info(absolute_import_name)
     cfg = json.loads(import_config)
     config = configs.ExecutorConfig(**cfg)
@@ -53,6 +64,20 @@ def scheduled_updates(absolute_import_name: str, import_config: str):
         local_repo_dir=config.local_repo_dir)
     result = executor.execute_imports_on_update(absolute_import_name)
     logging.info(result)
+    elapsed_time_secs = int(time.time() - start_time)
+    message = (f"Cloud Run Job [{CLOUD_RUN_JOB_NAME}] completed with status "
+               f"[{result.status}] in [{elapsed_time_secs}] seconds.")
+    # With the Python logging lib, JSON is interpreted as text (populates textPayload).
+    # print() emits the JSON as a structured log instead (populates jsonPayload).
+    # Ref: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
+    print(
+        json.dumps({
+            LOG_TYPE_LABEL: AUTO_IMPORT_JOB_STATUS_LOG_TYPE,
+            "message": message,
+            "severity": "INFO" if result.status == 'succeeded' else "ERROR",
+            "status": result.status,
+            "latency_secs": elapsed_time_secs,
+        }))
     if result.status == 'failed':
         return 1
     return 0
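Note: the structured status log above is what a log-based metric would key on.
Below is a minimal sketch of provisioning such a metric with the
google-cloud-logging client library; the metric name and exact filter are
illustrative assumptions, not something this patch creates.

# Sketch: count failed auto-import job runs from the structured status logs.
# Assumes the google-cloud-logging client library is installed and the caller
# can manage log-based metrics; "auto_import_job_failures" is a hypothetical name.
from google.cloud import logging

client = logging.Client()

# Match the jsonPayload fields emitted by scheduled_updates() in main.py.
# The datacommons_cloud_run_job_type label added in cloud_run.py can be used
# to scope filters to auto-import jobs in the same way.
FILTER = ('jsonPayload.log_type="auto-import-job-status" '
          'AND jsonPayload.status="failed"')

metric = client.metric("auto_import_job_failures",
                       filter_=FILTER,
                       description="Auto-import Cloud Run job runs that failed.")
if not metric.exists():
    metric.create()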