Skip to content

Commit 16ef654

Browse files
[Logging] Create and add structured logs for Cleanup task (#4781)
### Description Adding structured logging to the cleanup cronjob. In order to enable this, the following changes were made to the logs module: * Added a new log context for cronjobs. * Added a filter to the logs handler for both GAE/GKE to propagate the `extras` argument from python logging to the `json_fields` metadata in google cloud logging ([GCP reference](https://cloud.google.com/python/docs/reference/logging/latest/std-lib-integration#automatic-metadata-detection)). (Note that this is a workaround to enable the observability project for now. As a long-term solution, we should try to centralize the logs handlers/filters for all environments). For the cleanup module: * Set env vars for task name and ID in the cron entrypoints (for k8s and appengine). * Instrumented the cleanup code with the cron-based log context and with the testcase-based context. The latter is done for each testcase within the clean up unneeded open testcases method. ### Tests Running the cronjob locally with the debugger and sending logs to GCP. * Setting `IS_K8S_ENV` to true ([link to logs](https://cloudlogging.app.goo.gl/Us8aHBMUyNoaPFbL8)): ![image](https://github.com/user-attachments/assets/4d268895-804d-483d-a426-174a3c8b5f4b)
1 parent b9e07d0 commit 16ef654

File tree

5 files changed

+120
-49
lines changed

5 files changed

+120
-49
lines changed

src/appengine/libs/handler.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ def wrapper(self):
9797
if not self.is_cron():
9898
raise helpers.AccessDeniedError('You are not a cron.')
9999

100+
# Add env vars used by logs context for cleanup/triage.
101+
environment.set_task_id_vars(self.__module__)
102+
100103
with monitor.wrap_with_monitoring():
101104
result = func(self)
102105
if result is None:

src/clusterfuzz/_internal/cron/cleanup.py

Lines changed: 59 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,60 @@ def cleanup_reports_metadata():
100100
ndb_utils.delete_multi(uploaded_reports)
101101

102102

103+
def _cleanup_testcases_and_issues(testcase, jobs,
                                  top_crashes_by_project_and_platform_map,
                                  empty_issue_tracker_policy):
  """Clean up unneeded open testcase and its associated issues.

  Applies, in a fixed order, issue-update rules, testcase-marking rules,
  notifications, triage completion and deletion rules for a single testcase.
  The order matters: notifications rely on state set by the marking rules,
  and triage marking must happen before the deletion rules.

  Args:
    testcase: The testcase entity to process (must have a datastore key).
    jobs: Collection of job names, passed to
      mark_testcase_as_closed_if_job_is_invalid — presumably the set of
      currently valid job types (TODO: confirm with caller).
    top_crashes_by_project_and_platform_map: Precomputed top-crash data used
      by update_fuzz_blocker_label.
    empty_issue_tracker_policy: Fallback policy used when no issue tracker
      policy exists for this testcase.
  """
  testcase_id = testcase.key.id()
  logs.info(f'Processing testcase {testcase_id}.')

  try:
    issue = issue_tracker_utils.get_issue_for_testcase(testcase)
    policy = issue_tracker_utils.get_issue_tracker_policy_for_testcase(testcase)
    if not policy:
      logs.info('No policy')
      policy = empty_issue_tracker_policy

    # Issue updates.
    update_os_labels(policy, testcase, issue)
    logs.info('maybe updated os')
    update_fuzz_blocker_label(policy, testcase, issue,
                              top_crashes_by_project_and_platform_map)
    logs.info('maybe updated fuzz blocker')
    update_component_labels_and_id(policy, testcase, issue)
    logs.info('maybe updated component labels and component id')
    update_issue_ccs_from_owners_file(policy, testcase, issue)
    logs.info('maybe updated issueccs')
    update_issue_owner_and_ccs_from_predator_results(policy, testcase, issue)
    logs.info('maybe updated update_issue_owner_and_ccs_from_predator_results')
    update_issue_labels_for_flaky_testcase(policy, testcase, issue)

    # Testcase marking rules.
    mark_duplicate_testcase_as_closed_with_no_issue(testcase)
    mark_issue_as_closed_if_testcase_is_fixed(policy, testcase, issue)
    mark_testcase_as_closed_if_issue_is_closed(policy, testcase, issue)
    mark_testcase_as_closed_if_job_is_invalid(testcase, jobs)
    mark_unreproducible_testcase_as_fixed_if_issue_is_closed(testcase, issue)
    mark_unreproducible_testcase_and_issue_as_closed_after_deadline(
        policy, testcase, issue)
    mark_na_testcase_issues_as_wontfix(policy, testcase, issue)

    # Notification, to be done at end after testcase state is updated from
    # previous rules.
    notify_closed_issue_if_testcase_is_open(policy, testcase, issue)
    notify_issue_if_testcase_is_invalid(policy, testcase, issue)
    notify_uploader_when_testcase_is_processed(policy, testcase, issue)

    # Mark testcase as triage complete if both testcase and associated issue
    # are closed. This also needs to be done before the deletion rules.
    mark_testcase_as_triaged_if_needed(testcase, issue)

    # Testcase deletion rules.
    delete_unreproducible_testcase_with_no_issue(testcase)
  except Exception:
    # Isolate failures per testcase so one bad entry does not abort the
    # whole cron run; logs.error records the active exception.
    logs.error(f'Failed to process testcase {testcase_id}.')
155+
156+
103157
def cleanup_testcases_and_issues():
104158
"""Clean up unneeded open testcases and their associated issues."""
105159
logs.info('Getting all job type names.')
@@ -125,55 +179,10 @@ def cleanup_testcases_and_issues():
125179
# Already deleted.
126180
continue
127181

128-
logs.info(f'Processing testcase {testcase_id}.')
129-
130-
try:
131-
issue = issue_tracker_utils.get_issue_for_testcase(testcase)
132-
policy = issue_tracker_utils.get_issue_tracker_policy_for_testcase(
133-
testcase)
134-
if not policy:
135-
logs.info('No policy')
136-
policy = empty_issue_tracker_policy
137-
138-
# Issue updates.
139-
update_os_labels(policy, testcase, issue)
140-
logs.info('maybe updated os')
141-
update_fuzz_blocker_label(policy, testcase, issue,
142-
top_crashes_by_project_and_platform_map)
143-
logs.info('maybe updated fuzz blocker')
144-
update_component_labels_and_id(policy, testcase, issue)
145-
logs.info('maybe updated component labels and component id')
146-
update_issue_ccs_from_owners_file(policy, testcase, issue)
147-
logs.info('maybe updated issueccs')
148-
update_issue_owner_and_ccs_from_predator_results(policy, testcase, issue)
149-
logs.info(
150-
'maybe updated update_issue_owner_and_ccs_from_predator_results')
151-
update_issue_labels_for_flaky_testcase(policy, testcase, issue)
152-
153-
# Testcase marking rules.
154-
mark_duplicate_testcase_as_closed_with_no_issue(testcase)
155-
mark_issue_as_closed_if_testcase_is_fixed(policy, testcase, issue)
156-
mark_testcase_as_closed_if_issue_is_closed(policy, testcase, issue)
157-
mark_testcase_as_closed_if_job_is_invalid(testcase, jobs)
158-
mark_unreproducible_testcase_as_fixed_if_issue_is_closed(testcase, issue)
159-
mark_unreproducible_testcase_and_issue_as_closed_after_deadline(
160-
policy, testcase, issue)
161-
mark_na_testcase_issues_as_wontfix(policy, testcase, issue)
162-
163-
# Notification, to be done at end after testcase state is updated from
164-
# previous rules.
165-
notify_closed_issue_if_testcase_is_open(policy, testcase, issue)
166-
notify_issue_if_testcase_is_invalid(policy, testcase, issue)
167-
notify_uploader_when_testcase_is_processed(policy, testcase, issue)
168-
169-
# Mark testcase as triage complete if both testcase and associated issue
170-
# are closed. This also need to be done before the deletion rules.
171-
mark_testcase_as_triaged_if_needed(testcase, issue)
172-
173-
# Testcase deletion rules.
174-
delete_unreproducible_testcase_with_no_issue(testcase)
175-
except Exception:
176-
logs.error(f'Failed to process testcase {testcase_id}.')
182+
with logs.testcase_log_context(testcase, testcase.get_fuzz_target()):
183+
_cleanup_testcases_and_issues(testcase, jobs,
184+
top_crashes_by_project_and_platform_map,
185+
empty_issue_tracker_policy)
177186

178187
testcases_processed += 1
179188
if testcases_processed % 100 == 0:
@@ -1406,6 +1415,7 @@ def cleanup_unused_heartbeats():
14061415
ndb_utils.delete_multi(unused_heartbeats)
14071416

14081417

1418+
@logs.cron_log_context()
14091419
def main():
14101420
"""Cleaning up unneeded testcases"""
14111421
cleanup_testcases_and_issues()

src/clusterfuzz/_internal/metrics/logs.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,23 @@ def uncaught_exception_handler(exception_type, exception_value,
325325
sys.__excepthook__(exception_type, exception_value, exception_traceback)
326326

327327

328+
def json_fields_filter(record):
  """Copy the logging `extras` into `json_fields` metadata for cloud logging.

  Installed as a logging filter; always returns True so no record is ever
  dropped — the filter only annotates.
  """
  # TODO(vtcosta): This is a workaround to allow structured logs for
  # cleanup/triage cronjobs in GKE/GAE. We should try to refactor and
  # centralize the logs configurations for all environments.
  if not hasattr(record, 'json_fields'):
    record.json_fields = {}

  truncated_extras = {}
  for key, value in getattr(record, 'extras', {}).items():
    # Keep each field under the Stackdriver size limit.
    truncated_extras[key] = truncate(value, STACKDRIVER_LOG_MESSAGE_LIMIT)
  record.json_fields['extras'] = truncated_extras
  return True
343+
344+
328345
def configure_appengine():
329346
"""Configure logging for App Engine."""
330347
logging.getLogger().setLevel(logging.INFO)
@@ -335,6 +352,7 @@ def configure_appengine():
335352
import google.cloud.logging
336353
client = google.cloud.logging.Client()
337354
handler = client.get_default_handler()
355+
handler.addFilter(json_fields_filter)
338356
logging.getLogger().addHandler(handler)
339357

340358

@@ -374,6 +392,7 @@ def record_factory(*args, **kwargs):
374392
return record
375393

376394
logging.setLogRecordFactory(record_factory)
395+
logging.getLogger().addFilter(json_fields_filter)
377396
logging.getLogger().setLevel(logging.INFO)
378397

379398

@@ -695,6 +714,11 @@ class TaskLogStruct(NamedTuple):
695714
stage: str
696715

697716

717+
class CronLogStruct(NamedTuple):
  """Structured log fields attached by the CRON log context."""
  # Unique ID of this cron run (read from the CF_TASK_ID env var).
  task_id: str
  # Cron task/module name (read from the CF_TASK_NAME env var).
  task_name: str
720+
721+
698722
class FuzzerLogStruct(NamedTuple):
699723
fuzz_target: str
700724
job: str
@@ -715,6 +739,7 @@ class LogContextType(enum.Enum):
715739
TASK = 'task'
716740
FUZZER = 'fuzzer'
717741
TESTCASE = 'testcase'
742+
CRON = 'cron'
718743

719744
def get_extras(self) -> NamedTuple:
720745
"""Get the structured log for a given context"""
@@ -783,6 +808,17 @@ def get_extras(self) -> NamedTuple:
783808
ignore_context=True)
784809
return GenericLogStruct()
785810

811+
if self == LogContextType.CRON:
812+
try:
813+
return CronLogStruct(
814+
task_name=os.getenv('CF_TASK_NAME', 'null'),
815+
task_id=os.getenv('CF_TASK_ID', 'null'))
816+
except:
817+
error(
818+
'Error retrieving context for cron-based logs.',
819+
ignore_context=True)
820+
return GenericLogStruct()
821+
786822
return GenericLogStruct()
787823

788824

@@ -922,3 +958,13 @@ def testcase_log_context(testcase: 'Testcase',
922958
log_contexts.delete_metadata('fuzzer_name')
923959
log_contexts.delete_metadata('job_type')
924960
log_contexts.delete_metadata('fuzz_target')
961+
962+
963+
@contextlib.contextmanager
def cron_log_context():
  """Context manager tagging all logs emitted inside with the CRON context.

  Any exception escaping the body is logged as a warning and re-raised
  unchanged.
  """
  with wrap_log_context(contexts=[LogContextType.CRON]):
    try:
      yield
    except Exception:
      warning(message='Error during cronjob context.')
      # Bare raise preserves the original exception and traceback exactly.
      raise

src/clusterfuzz/_internal/system/environment.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import socket
2121
import subprocess
2222
import sys
23+
import uuid
2324

2425
import yaml
2526

@@ -1014,6 +1015,14 @@ def set_tsan_max_history_size():
10141015
set_value('TSAN_OPTIONS', tsan_options)
10151016

10161017

1018+
def set_task_id_vars(task_name, task_id=None):
  """Sets environment vars for task name and ID.

  Args:
    task_name: Name of the task (e.g. the cron module name).
    task_id: Optional explicit task ID. When falsy, a random UUID is
      generated so logs from this run can be correlated.
  """
  if not task_id:
    # Environment variable values must be strings; a raw uuid.UUID object
    # would rely on implicit coercion downstream.
    task_id = str(uuid.uuid4())
  set_value('CF_TASK_ID', task_id)
  set_value('CF_TASK_NAME', task_name)
1024+
1025+
10171026
def set_value(environment_variable, value, env=None):
10181027
"""Set an environment variable."""
10191028
if env is None:

src/python/bot/startup/run_cron.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ def main():
5959
task = sys.argv[1]
6060

6161
task_module_name = f'clusterfuzz._internal.cron.{task}'
62+
63+
environment.set_task_id_vars(task)
64+
6265
with monitor.wrap_with_monitoring(), ndb_init.context():
6366
task_module = importlib.import_module(task_module_name)
6467
return 0 if task_module.main() else 1

0 commit comments

Comments
 (0)