Skip to content

Commit

Permalink
Adds logs and labels in auto-import jobs to help create log-based metrics. (#1186)

Browse files Browse the repository at this point in the history

* Adds logs and labels in auto-import jobs to help create log-based metrics.
* For structured logs, replaced python logging with print
  • Loading branch information
rohitkumarbhagat authored Jan 20, 2025
1 parent 6430e49 commit f9811d5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
10 changes: 6 additions & 4 deletions import-automation/executor/app/executor/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,

res = run_v2.types.ResourceRequirements(limits=resources)
container = run_v2.Container(image=image, env=env, resources=res, args=args)
# Labels allow filtering of automated import cloud run jobs, used in log-based metrics.
exe_template = run_v2.ExecutionTemplate(
template=run_v2.TaskTemplate(containers=[container],
max_retries=2,
timeout=duration_pb2.Duration(
seconds=timeout)))
labels={"datacommons_cloud_run_job_type": "auto_import_job"},
template=run_v2.TaskTemplate(
containers=[container],
max_retries=2,
timeout=duration_pb2.Duration(seconds=timeout)))
new_job = run_v2.Job(template=exe_template)
logging.info(f"Creating job: {job_name}")

Expand Down
25 changes: 25 additions & 0 deletions import-automation/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
"""
import logging
import json
import os
import time

from absl import flags
from absl import app
Expand All @@ -30,11 +32,20 @@
flags.DEFINE_string('import_name', '', 'Absoluate import name.')
flags.DEFINE_string('import_config', '', 'Import executor configuration.')

CLOUD_RUN_JOB_NAME = os.getenv("CLOUD_RUN_JOB")
# The `log_type` label helps filter log lines, which is useful for creating
# log-based metrics. Each log type has a similar set of fields for easier parsing.
LOG_TYPE_LABEL = "log_type"
# log_type for capturing status of auto import cloud run jobs.
# Required fields - log_type, message, status, latency_secs.
AUTO_IMPORT_JOB_STATUS_LOG_TYPE = "auto-import-job-status"


def scheduled_updates(absolute_import_name: str, import_config: str):
"""
Invokes import update workflow.
"""
start_time = time.time()
logging.info(absolute_import_name)
cfg = json.loads(import_config)
config = configs.ExecutorConfig(**cfg)
Expand All @@ -53,6 +64,20 @@ def scheduled_updates(absolute_import_name: str, import_config: str):
local_repo_dir=config.local_repo_dir)
result = executor.execute_imports_on_update(absolute_import_name)
logging.info(result)
elapsed_time_secs = int(time.time() - start_time)
message = (f"Cloud Run Job [{CLOUD_RUN_JOB_NAME}] completed with status= "
f"[{result.status}] in [{elapsed_time_secs}] seconds.)")
# With Python logging lib, json is interpreted as text (populates textPayload field).
# Using print to populate json as structured logs (populate jsonPayload field).
# Ref: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
print(
json.dumps({
LOG_TYPE_LABEL: AUTO_IMPORT_JOB_STATUS_LOG_TYPE,
"message": message,
"severity": "INFO" if result.status == 'succeeded' else "ERROR",
"status": result.status,
"latency_secs": elapsed_time_secs,
}))
if result.status == 'failed':
return 1
return 0
Expand Down

0 comments on commit f9811d5

Please sign in to comment.