# ping_expiry_alert.py
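"""Send alert emails for Glean ping tables that will soon start expiring data.

Queries BigQuery partition-expiration monitoring data, cross-checks it against
the retention settings published in probeinfo, and emails the owners of pings
whose tables will begin deleting old partitions within the notification window.

Example invocation (a sketch; the date must be in ISO format):

    python ping_expiry_alert.py --run-date 2025-01-01 --dry-run
"""
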
import argparse
import re
import sys
from collections import defaultdict
from datetime import date
from typing import Any, Dict, List, Tuple
import requests
from google.cloud import bigquery
from probe_scraper.emailer import send_ses
from probe_scraper.parsers.utils import HTTP_HEADERS

EMAIL_SUBJECT_TEMPLATE = "Glean Pings Expiring for {app_name}"
EMAIL_TEMPLATE = """
The following BigQuery tables are set to start expiring collected data soon based on their retention policies.

Note that this expiration will only delete data from the raw telemetry ("ping") tables listed below.
Aggregated/analytics tables derived from the telemetry data, such as "clients_daily" and "clients_last_seen" tables, will not be affected and will continue to retain all historical data.

{tables}

What to do about this:

1. If this is expected and you do not need data for any of these tables past the listed dates, then no action is needed.
2. If you wish to continue collecting data for a longer period of time for any of these tables, please file a Jira ticket for data engineering requesting the changes to retention settings at https://mozilla-hub.atlassian.net/secure/CreateIssue.jspa?issuetype=10007&pid=10056

Requests should be triaged at least weekly, but if there's urgency or if you have any questions, please ask on the #data-help Slack channel. We'll give you a hand.

Retention policies are defined in probe-scraper [1], with options to either stop collecting data after a certain date or to delete data older than the specified number of days.

Your Friendly Neighborhood Data Team

[1] - https://github.com/mozilla/probe-scraper/blob/main/repositories.yaml

This is an automated message sent from probe-scraper. See https://github.com/mozilla/probe-scraper for details.
""" # noqa
APP_GROUP_TEMPLATE = """
{app_name}:
{messages}
"""

RETENTION_DAYS_MESSAGE_TEMPLATE = '\t- The "{table_name}" table will start expiring data older than {retention_days} days starting on {expiry_date} ({num_weeks} week{plural_weeks} from now)'  # noqa

# emails in this list will receive alerts for all pings
DEFAULT_EMAILS = ["[email protected]", "[email protected]"]

NOTIFICATION_DAYS_MAX = 25
NOTIFICATION_DAYS_MIN = 5
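
# Only tables whose next deletion date falls between NOTIFICATION_DAYS_MIN and
# NOTIFICATION_DAYS_MAX days from the run date trigger a notification email.

# Pairs each _stable table's next partition deletion date (from the monitoring
# table) with the partition expiration currently applied in BigQuery.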
EXPIRATIONS_QUERY_TEMPLATE = """
WITH actual_expiration_days AS (
SELECT
table_schema AS dataset_id,
table_name AS table_id,
CAST(REGEXP_EXTRACT(option_value, "^[0-9]+") AS INT) AS actual_partition_expiration_days,
FROM
`moz-fx-data-shared-prod.region-us.INFORMATION_SCHEMA.TABLE_OPTIONS`
WHERE
option_name = "partition_expiration_days"
)
SELECT
project_id,
dataset_id,
ARRAY_AGG(
STRUCT(
table_id, partition_expiration_days,
actual_partition_expiration_days,
next_deletion_date,
expiration_changed
) ORDER BY table_id
) AS tables,
FROM
`moz-fx-data-shared-prod.monitoring_derived.table_partition_expirations_v1`
FULL JOIN
actual_expiration_days
USING
(dataset_id, table_id)
WHERE
run_date = "{run_date}"
AND project_id = "{project}"
AND ENDS_WITH(dataset_id, "_stable")
GROUP BY
project_id,
dataset_id
ORDER BY
dataset_id
"""


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--run-date",
"--run_date",
type=date.fromisoformat,
required=True,
help="The date to use to check for expiring pings",
)
parser.add_argument(
"--dry-run",
"--dry_run",
"--dryrun",
action="store_true",
help="Whether emails should be sent, used for testing",
)
parser.add_argument(
"--bigquery-project",
"--bigquery_project",
default="moz-fx-data-shared-prod",
help="Bigquery project in which ping tables are located",
)
return parser.parse_args()


def request_get(url: str) -> Any:
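    """Fetch a URL and return its decoded JSON payload, raising on HTTP errors."""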
response = requests.get(url, headers=HTTP_HEADERS)
response.raise_for_status()
return response.json()


def send_emails(messages_by_email: Dict[str, Dict[str, List[str]]], dry_run: bool):
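    """Send one email per recipient, with that recipient's messages grouped by app."""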
for email, messages_by_app in messages_by_email.items():
combined_messages = [
APP_GROUP_TEMPLATE.format(app_name=app, messages="\n".join(messages))
for app, messages in messages_by_app.items()
]
email_body = EMAIL_TEMPLATE.format(tables="".join(combined_messages))
email_subject = EMAIL_SUBJECT_TEMPLATE.format(
app_name=(
f"{len(messages_by_app)} apps"
if len(messages_by_app) > 1
else list(messages_by_app.keys())[0]
)
)
send_ses(
fromaddr="[email protected]",
subject=email_subject,
body=email_body,
recipients=[email],
dryrun=dry_run,
)


def send_error_email(message: str, run_date: date, dry_run: bool):
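    """Send a summary of errors encountered during the expiry check."""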
email_subject = f"Ping expiry alert errors on {run_date.isoformat()}"
send_ses(
fromaddr="[email protected]",
subject=email_subject,
body=message,
recipients=["[email protected]"],
dryrun=dry_run,
)


def table_name_to_doctype(table_name: str) -> str:
    """Convert a BigQuery table name to the associated telemetry document type.

    For example, "custom_ping_v1" becomes "custom-ping".
    """
    return re.sub("_v[0-9]+$", "", table_name).replace("_", "-")


def validate_retention_settings(
dataset_info: bigquery.Row, app_info: Dict[str, Any]
) -> List[Tuple[str, int, int]]:
"""Return a list of tables that have retention settings that do not match metadata.
:param dataset_info: Row from the query on monitoring_derived.table_partition_expirations_v1.
:param app_info: Entry from probeinfo app listings
:return: List of tuples of (table_id, retention set in metadata, retention set in bigquery)
"""
errors = []
pipeline_metadata = app_info.get("moz_pipeline_metadata", {})
default_retention_days = (
app_info.get("moz_pipeline_metadata_defaults", {})
.get("expiration_policy", {})
.get("delete_after_days")
)
for table_info in dataset_info["tables"]:
document_type = table_name_to_doctype(table_info["table_id"])
applied_retention_days = table_info["actual_partition_expiration_days"]
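        # A ping-specific delete_after_days setting overrides the app default.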
if (
delete_after_days := pipeline_metadata.get(document_type, {})
.get("expiration_policy", {})
.get("delete_after_days")
) is not None:
metadata_retention_days = delete_after_days
else:
metadata_retention_days = default_retention_days
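        # Mismatches are not flagged when the monitoring table marks the
        # expiration as changed, presumably to avoid alerting mid-update.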
if (
metadata_retention_days != applied_retention_days
and table_info["expiration_changed"] is False
):
errors.append(
(
f"{dataset_info['dataset_id']}.{table_info['table_id']}",
metadata_retention_days,
applied_retention_days,
)
)
return errors


def get_expiring_pings(
    run_date: date,
project_id: str,
) -> Tuple[Dict[str, Dict[str, List[str]]], Dict[str, List]]:
"""
    Return expiring pings across all apps along with any errors encountered.

    Expiring pings are stored in a dict where the key is an email to send to and the
    value is a dict with a list of messages per app.

    Errors are stored in a dict where the key is a BigQuery table name and the value
    is a list of errors related to the associated ping.

    Ping expiration is based on the most recent data in the
    moz-fx-data-shared-prod.monitoring_derived.table_partition_expirations_v1 BigQuery table.
"""
client = bigquery.Client()
app_listings = request_get(
"https://probeinfo.telemetry.mozilla.org/v2/glean/app-listings"
)
bq_dataset_to_app_listing = {app["bq_dataset_family"]: app for app in app_listings}
errors = defaultdict(list)
# dict structure: {email: {app_name: [messages]}}
expiring_pings_by_email = defaultdict(lambda: defaultdict(list))
current_expirations = list(
client.query_and_wait(
EXPIRATIONS_QUERY_TEMPLATE.format(run_date=run_date, project=project_id)
)
)
for dataset_info in current_expirations:
document_namespace = re.sub("_stable$", "", dataset_info["dataset_id"])
app_info = bq_dataset_to_app_listing.get(document_namespace)
# check if retention settings in metadata match applied settings (glean apps only)
if app_info is not None:
for (
table_id,
metadata_retention_days,
applied_retention_days,
) in validate_retention_settings(dataset_info, app_info):
errors[f"{project_id}.{table_id}"].append(
f"Retention period in metadata ({metadata_retention_days} days) "
f"does not match period applied to table ({applied_retention_days} days)"
)
app_pings = request_get(
f"https://probeinfo.telemetry.mozilla.org/glean/{app_info['v1_name']}/pings"
)
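            # app_pings maps document types to their history; the most recent
            # history entry's notification_emails are used below.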
# Find expiring pings and create list of emails to send
for table_info in dataset_info["tables"]:
# Send to app and ping owners for glean apps
if app_info is not None:
app_name = app_info["app_name"]
document_type = table_name_to_doctype(table_info["table_id"])
email_list = {
*(
app_pings[document_type]["history"][-1]["notification_emails"]
if document_type in app_pings
else []
),
*app_info["notification_emails"],
*DEFAULT_EMAILS,
}
# send to [email protected] for legacy telemetry
else:
app_name = "legacy telemetry"
email_list = {
*DEFAULT_EMAILS,
}
expires_in_days = (
(table_info["next_deletion_date"] or run_date) - run_date
).days
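            # A null next_deletion_date (no scheduled deletion) yields 0 days,
            # which falls below NOTIFICATION_DAYS_MIN and never alerts.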
if NOTIFICATION_DAYS_MIN <= expires_in_days <= NOTIFICATION_DAYS_MAX:
message = RETENTION_DAYS_MESSAGE_TEMPLATE.format(
table_name=f"{document_namespace}."
f"{re.sub('_v[0-9]+$', '', table_info['table_id'])}",
retention_days=table_info["partition_expiration_days"],
expiry_date=table_info["next_deletion_date"],
num_weeks=expires_in_days // 7,
plural_weeks="" if expires_in_days // 7 == 1 else "s",
)
for email in email_list:
expiring_pings_by_email[email][app_name].append(message)
return expiring_pings_by_email, errors


def main():
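    """Run the expiry check, send notification and error emails, and echo errors to stderr."""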
args = parse_args()
expiring_pings_by_email, errors = get_expiring_pings(
run_date=args.run_date,
project_id=args.bigquery_project,
)
    # Only send emails on Wednesday (weekday 2), dry run on other days for error checking
    dry_run = args.dry_run or args.run_date.weekday() != 2
    if len(errors) > 0:
        error_string = "\n".join(
            f"{table}: {'; '.join(msgs)}" for table, msgs in errors.items()
        )
        full_message = f"Encountered {len(errors)} errors:\n{error_string}"
send_error_email(
message=full_message,
run_date=args.run_date,
dry_run=args.dry_run, # send error emails regardless of day
)
else:
full_message = None
send_emails(expiring_pings_by_email, dry_run)
if full_message is not None:
print(full_message, file=sys.stderr)
if __name__ == "__main__":
main()