Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[postgres] Emit raw query statements and plans for non prepared statements #19493

Merged
merged 9 commits into the base branch from the feature branch
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions postgres/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,22 @@ files:
type: boolean
example: false
display_default: false
- name: collect_raw_query_statement
hidden: true
description: |
Configure the collection of raw query statements in query activity and execution plans.
Raw query statements and execution plans may contain sensitive information in query text.
Enabling this option will allow the collection and ingestion of raw query statements and
execution plans into Datadog. This option is disabled by default.
Note: This option only applies when `dbm` is enabled.
options:
- name: enabled
description: |
Set to `true` to collect the raw query statements.
value:
type: boolean
example: false
display_default: false
- name: log_unobfuscated_queries
hidden: true
description: |
Expand Down
1 change: 1 addition & 0 deletions postgres/changelog.d/19493.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for collecting raw query statements and explain plans when `collect_raw_query_statement.enabled` is true.
21 changes: 16 additions & 5 deletions postgres/datadog_checks/postgres/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ def __init__(self, instance, init_config, check):
'"dbname" parameter must be set OR autodiscovery must be enabled when using the "relations" parameter.'
)
self.max_connections = instance.get('max_connections', 30)
self.tags = self._build_tags(
custom_tags=instance.get('tags', []),
propagate_agent_tags=self._should_propagate_agent_tags(instance, init_config),
)

ssl = instance.get('ssl', "allow")
if ssl in SSL_MODES:
Expand Down Expand Up @@ -162,6 +158,12 @@ def __init__(self, instance, init_config, check):
),
'keep_json_path': is_affirmative(obfuscator_options_config.get('keep_json_path', False)),
}
collect_raw_query_statement_config: dict = instance.get('collect_raw_query_statement', {}) or {}
self.collect_raw_query_statement = {
"enabled": is_affirmative(collect_raw_query_statement_config.get('enabled', False)),
"cache_max_size": int(collect_raw_query_statement_config.get('cache_max_size', 10000)),
"samples_per_hour_per_query": int(collect_raw_query_statement_config.get('samples_per_hour_per_query', 1)),
}
self.log_unobfuscated_queries = is_affirmative(instance.get('log_unobfuscated_queries', False))
self.log_unobfuscated_plans = is_affirmative(instance.get('log_unobfuscated_plans', False))
self.database_instance_collection_interval = instance.get('database_instance_collection_interval', 300)
Expand All @@ -171,7 +173,13 @@ def __init__(self, instance, init_config, check):
self.baseline_metrics_expiry = self.statement_metrics_config.get('baseline_metrics_expiry', 300)
self.service = instance.get('service') or init_config.get('service') or ''

def _build_tags(self, custom_tags, propagate_agent_tags):
self.tags = self._build_tags(
custom_tags=instance.get('tags', []),
propagate_agent_tags=self._should_propagate_agent_tags(instance, init_config),
additional_tags=["raw_query_statement:enabled"] if self.collect_raw_query_statement["enabled"] else [],
)

def _build_tags(self, custom_tags, propagate_agent_tags, additional_tags):
# Clean up tags in case there was a None entry in the instance
# e.g. if the yaml contains tags: but no actual tags
if custom_tags is None:
Expand Down Expand Up @@ -202,6 +210,9 @@ def _build_tags(self, custom_tags, propagate_agent_tags):
raise ConfigurationError(
'propagate_agent_tags enabled but there was an error fetching agent tags {}'.format(e)
)

if additional_tags:
tags.extend(additional_tags)
return tags

@staticmethod
Expand Down
9 changes: 9 additions & 0 deletions postgres/datadog_checks/postgres/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class Azure(BaseModel):
managed_authentication: Optional[ManagedAuthentication1] = None


class CollectRawQueryStatement(BaseModel):
    # Validation model for the `collect_raw_query_statement` instance option
    # (declared in assets/configuration/spec.yaml for this integration).
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=True,
    )
    # When true, raw (unobfuscated) query statements and execution plans are
    # collected — these may contain sensitive query text. None means "not set";
    # the check's config layer applies the default (false).
    enabled: Optional[bool] = None


class CollectSchemas(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
Expand Down Expand Up @@ -227,6 +235,7 @@ class InstanceConfig(BaseModel):
collect_database_size_metrics: Optional[bool] = None
collect_default_database: Optional[bool] = None
collect_function_metrics: Optional[bool] = None
collect_raw_query_statement: Optional[CollectRawQueryStatement] = None
collect_schemas: Optional[CollectSchemas] = None
collect_settings: Optional[CollectSettings] = None
collect_wal_metrics: Optional[bool] = None
Expand Down
128 changes: 99 additions & 29 deletions postgres/datadog_checks/postgres/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(self, check, config, shutdown_callback):
self._explain_function = config.statement_samples_config.get('explain_function', 'datadog.explain_statement')
self._explain_parameterized_queries = ExplainParameterizedQueries(check, config, self._explain_function)
self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options))
self._collect_raw_query_statement = config.collect_raw_query_statement.get("enabled", False)

self._collection_strategy_cache = TTLCache(
maxsize=config.statement_samples_config.get('collection_strategy_cache_maxsize', 1000),
Expand All @@ -206,6 +207,11 @@ def __init__(self, check, config, shutdown_callback):
ttl=60 * 60 / int(config.statement_samples_config.get('samples_per_hour_per_query', 15)),
)

self._raw_statement_text_cache = RateLimitingTTLCache(
maxsize=config.collect_raw_query_statement["cache_max_size"],
ttl=60 * 60 / config.collect_raw_query_statement["samples_per_hour_per_query"],
)

self._activity_coll_enabled = is_affirmative(self._config.statement_activity_config.get('enabled', True))
self._explain_plan_coll_enabled = is_affirmative(self._config.statement_samples_config.get('enabled', True))

Expand Down Expand Up @@ -476,8 +482,7 @@ def _collect_statement_samples(self):
rows = self._filter_and_normalize_statement_rows(rows)
submitted_count = 0
if self._explain_plan_coll_enabled:
event_samples = self._collect_plans(rows)
for e in event_samples:
for e in self._collect_plans(rows):
self._check.database_monitoring_query_sample(json.dumps(e, default=default_json_event_encoding))
submitted_count += 1

Expand Down Expand Up @@ -553,16 +558,74 @@ def _to_active_session(self, row, track_activity_query_size):
if (row.get('backend_type', 'client backend') != 'client backend') or (
row['state'] is not None and row['state'] != 'idle'
):
active_row = {key: val for key, val in row.items() if val is not None and key != 'query'}
# Create an active_row, for each session by
# 1. Removing all null key/value pairs and the original query
# 2. if row['statement'] is none, replace with ERROR: failed to obfuscate so we can still collect activity
active_row['query_truncated'] = self._get_truncation_state(
row['query_truncated'] = self._get_truncation_state(
track_activity_query_size, row['query'], row['query_signature']
).value
if row['statement'] is None:
active_row['statement'] = "ERROR: failed to obfuscate"
return active_row
row['statement'] = "ERROR: failed to obfuscate"
return row

def _create_active_sessions(self, rows):
active_sessions_count = 0
for row in rows:
active_row = self._to_active_session(row, self._get_track_activity_query_size())
if active_row:
active_sessions_count += 1
yield active_row
if active_sessions_count >= self._activity_max_rows:
break

@tracked_method(agent_check_getter=agent_check_getter)
def _row_to_raw_statement_event(self, row):
query_signature = row.get('query_signature')
if not query_signature:
return

raw_statement = row.get("query", None)
if not raw_statement:
self._log.debug("No raw statement found for query_signature=%s", query_signature)
return

if row.get('backend_type') != 'client backend':
# we only want to collect raw statements for client backends
return

raw_query_signature = compute_sql_signature(raw_statement)
row["raw_query_signature"] = raw_query_signature
raw_statement_key = (query_signature, raw_query_signature)

if not self._raw_statement_text_cache.acquire(raw_statement_key):
return

raw_query_event = {
"timestamp": time.time() * 1000,
"host": self._check.resolved_hostname,
"ddagentversion": datadog_agent.get_version(),
"ddsource": "postgres",
"dbm_type": "rqt",
"ddtags": ",".join(self._dbtags(row["datname"])),
'service': self._config.service,
"db": {
"instance": row['datname'],
"query_signature": row['query_signature'],
"raw_query_signature": raw_query_signature,
"statement": raw_statement,
"metadata": {
"tables": row['dd_tables'],
"commands": row['dd_commands'],
"comments": row['dd_comments'],
},
},
"postgres": {
"datname": row["datname"],
"rolname": row.get("rolname"),
},
}

self._check.database_monitoring_query_sample(json.dumps(raw_query_event, default=default_json_event_encoding))

def _can_explain_statement(self, obfuscated_statement):
if obfuscated_statement.startswith('SELECT {}'.format(self._explain_function)):
Expand Down Expand Up @@ -764,7 +827,7 @@ def _collect_plan_for_statement(self, row):
# limit the rate of explains done to the database
cache_key = (row['datname'], row['query_signature'])
if not self._explained_statements_ratelimiter.acquire(cache_key):
return None
return

# Plans have several important signatures to tag events with. Note that for postgres, the
# query_signature and resource_hash will be the same value.
Expand All @@ -778,24 +841,25 @@ def _collect_plan_for_statement(self, row):
if explain_err_code:
collection_errors = [{'code': explain_err_code.value, 'message': err_msg if err_msg else None}]

plan, normalized_plan, obfuscated_plan, plan_signature = None, None, None, None
raw_plan, normalized_plan, obfuscated_plan, plan_signature, raw_plan_signature = None, None, None, None, None
if plan_dict:
plan = json.dumps(plan_dict)
raw_plan = json.dumps(plan_dict)
# if we're using the orjson implementation then json.dumps returns bytes
plan = plan.decode('utf-8') if isinstance(plan, bytes) else plan
raw_plan = raw_plan.decode('utf-8') if isinstance(raw_plan, bytes) else raw_plan
try:
normalized_plan = datadog_agent.obfuscate_sql_exec_plan(plan, normalize=True)
obfuscated_plan = datadog_agent.obfuscate_sql_exec_plan(plan)
normalized_plan = datadog_agent.obfuscate_sql_exec_plan(raw_plan, normalize=True)
obfuscated_plan = datadog_agent.obfuscate_sql_exec_plan(raw_plan)
except Exception as e:
if self._config.log_unobfuscated_plans:
self._log.warning("Failed to obfuscate plan=[%s] | err=[%s]", plan, e)
self._log.warning("Failed to obfuscate plan=[%s] | err=[%s]", raw_plan, e)
raise e

plan_signature = compute_exec_plan_signature(normalized_plan)
raw_plan_signature = compute_exec_plan_signature(raw_plan)

statement_plan_sig = (row['query_signature'], plan_signature)
if self._seen_samples_ratelimiter.acquire(statement_plan_sig):
event = {
obfuscated_plan_event = {
"host": self._check.resolved_hostname,
"dbm_type": "plan",
"ddagentversion": datadog_agent.get_version(),
Expand Down Expand Up @@ -836,27 +900,35 @@ def _collect_plan_for_statement(self, row):
}
if row['state'] in {'idle', 'idle in transaction'}:
if row['state_change'] and row['query_start']:
event['duration'] = (row['state_change'] - row['query_start']).total_seconds() * 1e9
obfuscated_plan_event['duration'] = (row['state_change'] - row['query_start']).total_seconds() * 1e9
# If the transaction is idle then we have a more specific "end time" than the current time at
# which we're collecting this event. According to the postgres docs, all of the timestamps in
# pg_stat_activity are `timestamp with time zone` so the timezone should always be present. However,
# if there is something wrong and it's missing then we can't use `state_change` for the timestamp
# of the event else we risk the timestamp being significantly off and the event getting dropped
# during ingestion.
if row['state_change'].tzinfo:
event['timestamp'] = get_timestamp(row['state_change']) * 1000
return event
return None
obfuscated_plan_event['timestamp'] = get_timestamp(row['state_change']) * 1000

if self._collect_raw_query_statement and plan_signature:
# Emit RQP (raw query plan) event when raw query statement collection is enabled
raw_plan_event = copy.deepcopy(obfuscated_plan_event)
raw_plan_event["dbm_type"] = "rqp" # raw query plan
raw_plan_event["db"]["statement"] = row["query"] # raw query
raw_plan_event["db"]["plan"]["definition"] = raw_plan
raw_plan_event["db"]["plan"]["raw_signature"] = raw_plan_signature
# set the raw plan signature in obfuscated plan event
obfuscated_plan_event["db"]["plan"]["raw_signature"] = raw_plan_signature
yield raw_plan_event
yield obfuscated_plan_event
return

def _collect_plans(self, rows):
events = []
for row in rows:
try:
if row['statement'] is None or row.get('backend_type', 'client backend') != 'client backend':
continue
event = self._collect_plan_for_statement(row)
if event:
events.append(event)
yield from self._collect_plan_for_statement(row)
except Exception:
self._log.exception(
"Crashed trying to collect execution plan for statement in dbname=%s", row['datname']
Expand All @@ -868,17 +940,15 @@ def _collect_plans(self, rows):
hostname=self._check.resolved_hostname,
raw=True,
)
return events

def _create_activity_event(self, rows, active_connections):
self._time_since_last_activity_event = time.time()
active_sessions = []
for row in rows:
active_row = self._to_active_session(row, self._get_track_activity_query_size())
if active_row:
active_sessions.append(active_row)
if len(active_sessions) > self._activity_max_rows:
active_sessions = self._truncate_activity_rows(active_sessions, self._activity_max_rows)
for row in self._create_active_sessions(rows):
if self._collect_raw_query_statement:
self._row_to_raw_statement_event(row)
row = {key: val for key, val in row.items() if val is not None and key != 'query'}
active_sessions.append(row)
event = {
"host": self._check.resolved_hostname,
"ddagentversion": datadog_agent.get_version(),
Expand Down
Loading
Loading