From 0517b3ffdec025c8889ca12a62fa7dd8822b1738 Mon Sep 17 00:00:00 2001 From: "datadog-agent-integrations-bot[bot]" <159767151+datadog-agent-integrations-bot[bot]@users.noreply.github.com> Date: Tue, 28 Jan 2025 18:36:54 +0100 Subject: [PATCH] Revert "Update Posgres to psycopg3 (#19322)" (#19499) (#19504) Co-authored-by: Enrico Donnici Co-authored-by: Seth Samuel --- .builders/images/helpers.ps1 | 8 - .builders/images/linux-aarch64/Dockerfile | 2 +- .builders/images/linux-x86_64/Dockerfile | 2 +- .../images/macos-x86_64/builder_setup.sh | 7 - .builders/images/windows-x86_64/Dockerfile | 36 --- .builders/images/windows-x86_64/pg_config.pl | 4 - .../postgres/linux/50_install_postgres.sh | 6 - .ddev/config.toml | 2 - LICENSE-3rdparty.csv | 1 - agent_requirements.in | 1 - postgres/changelog.d/19499.fixed | 1 + .../datadog_checks/postgres/connections.py | 18 +- postgres/datadog_checks/postgres/cursor.py | 7 +- postgres/datadog_checks/postgres/discovery.py | 3 +- .../postgres/explain_parameterized_queries.py | 11 +- postgres/datadog_checks/postgres/metadata.py | 75 +++--- postgres/datadog_checks/postgres/postgres.py | 59 +++-- .../postgres/statement_samples.py | 41 +-- .../datadog_checks/postgres/statements.py | 40 ++- .../datadog_checks/postgres/version_utils.py | 5 +- postgres/pyproject.toml | 2 +- postgres/tests/conftest.py | 20 +- postgres/tests/test_connections.py | 21 +- postgres/tests/test_cursor.py | 7 +- postgres/tests/test_deadlock.py | 56 ++-- postgres/tests/test_discovery.py | 10 +- .../test_explain_parameterized_queries.py | 12 +- postgres/tests/test_pg_integration.py | 27 +- postgres/tests/test_pg_replication.py | 6 +- postgres/tests/test_relations.py | 4 +- postgres/tests/test_replication_slot.py | 28 +- postgres/tests/test_statements.py | 247 ++++++++++-------- postgres/tests/test_unit.py | 6 +- postgres/tests/utils.py | 41 +-- 34 files changed, 376 insertions(+), 440 deletions(-) delete mode 100644 .builders/images/windows-x86_64/pg_config.pl delete mode 100755 .ddev/ci/scripts/postgres/linux/50_install_postgres.sh create mode 100644 postgres/changelog.d/19499.fixed diff --git a/.builders/images/helpers.ps1 b/.builders/images/helpers.ps1 index 70643bed99d03..cdd7ee8439a0e 100644 --- a/.builders/images/helpers.ps1 +++ b/.builders/images/helpers.ps1 @@ -38,11 +38,3 @@ function Add-ToPath() { $target="$oldPath;$Append" [Environment]::SetEnvironmentVariable("Path", $target, [System.EnvironmentVariableTarget]::User) } - -function RunOnVSConsole() { - param( - [Parameter(Mandatory = $true)][string] $Command - ) - Write-Host "Running $Command" - Start-Process -Wait -NoNewWindow "cmd.exe" -ArgumentList "/c ""$Env:VCVARSALL_BAT"" $Env:DD_TARGET_ARCH && $Command" -} diff --git a/.builders/images/linux-aarch64/Dockerfile b/.builders/images/linux-aarch64/Dockerfile index 21781de9c2342..baa904a8559d1 100644 --- a/.builders/images/linux-aarch64/Dockerfile +++ b/.builders/images/linux-aarch64/Dockerfile @@ -91,7 +91,7 @@ RUN \ --without-debugger \ --disable-static -# libpq and pg_config as needed by psycopg +# libpq and pg_config as needed by psycopg2 RUN \ DOWNLOAD_URL="https://ftp.postgresql.org/pub/source/v{{version}}/postgresql-{{version}}.tar.bz2" \ VERSION="16.0" \ diff --git a/.builders/images/linux-x86_64/Dockerfile b/.builders/images/linux-x86_64/Dockerfile index dc07ca511dc23..332405a713efb 100644 --- a/.builders/images/linux-x86_64/Dockerfile +++ b/.builders/images/linux-x86_64/Dockerfile @@ -95,7 +95,7 @@ RUN \ --without-debugger \ --disable-static -# libpq and pg_config as needed 
by psycopg +# libpq and pg_config as needed by psycopg2 RUN \ DOWNLOAD_URL="https://ftp.postgresql.org/pub/source/v{{version}}/postgresql-{{version}}.tar.bz2" \ VERSION="16.0" \ diff --git a/.builders/images/macos-x86_64/builder_setup.sh b/.builders/images/macos-x86_64/builder_setup.sh index b1ecbc9ec4322..51e152c227d69 100644 --- a/.builders/images/macos-x86_64/builder_setup.sh +++ b/.builders/images/macos-x86_64/builder_setup.sh @@ -94,13 +94,6 @@ RELATIVE_PATH="curl-{{version}}" \ # Remove the binary installed so that we consistenly use the same original `curl` binary rm "${DD_PREFIX_PATH}/bin/curl" -# libpq and pg_config as needed by psycopg -DOWNLOAD_URL="https://ftp.postgresql.org/pub/source/v{{version}}/postgresql-{{version}}.tar.bz2" \ -VERSION="16.0" \ -SHA256="df9e823eb22330444e1d48e52cc65135a652a6fdb3ce325e3f08549339f51b99" \ -RELATIVE_PATH="postgresql-{{version}}" \ - install-from-source --without-readline --with-openssl --without-icu - # Dependencies needed to build librdkafka (and thus, confluent-kafka) with kerberos support DOWNLOAD_URL="https://github.com/LMDB/lmdb/archive/LMDB_{{version}}.tar.gz" \ VERSION="0.9.29" \ diff --git a/.builders/images/windows-x86_64/Dockerfile b/.builders/images/windows-x86_64/Dockerfile index 5701aa6fdac51..0fc86a4bdc223 100644 --- a/.builders/images/windows-x86_64/Dockerfile +++ b/.builders/images/windows-x86_64/Dockerfile @@ -25,7 +25,6 @@ RUN curl -SL --output vs_buildtools.exe https://aka.ms/vs/17/release/vs_buildtoo --add Microsoft.VisualStudio.Workload.VCTools ` || IF "%ERRORLEVEL%"=="3010" EXIT 0) ` && del /q vs_buildtools.exe -ENV VCVARSALL_BAT="C:\Program Files (x86)\Microsoft Visual Studio\2022\BuildTools\VC\Auxiliary\Build\vcvarsall.bat" # Upgrade PowerShell ENV POWERSHELL_VERSION="7.4.0" @@ -115,42 +114,7 @@ RUN Get-RemoteFile ` Add-ToPath -Append "C:\perl\perl\bin" && ` Remove-Item "strawberry-perl-$Env:PERL_VERSION-64bit.zip" -# Nasm -ENV NASM_VERSION="2.16.03" -RUN Get-RemoteFile ` - -Uri https://www.nasm.us/pub/nasm/releasebuilds/$Env:NASM_VERSION/win64/nasm-$Env:NASM_VERSION-win64.zip ` - -Path "nasm-$Env:NASM_VERSION-win64.zip" ` - -Hash '3ee4782247bcb874378d02f7eab4e294a84d3d15f3f6ee2de2f47a46aa7226e6' && ` - 7z x "nasm-$Env:NASM_VERSION-win64.zip" -o"C:\nasm" && ` - Add-ToPath -Append "C:\nasm\nasm-$Env:NASM_VERSION" && ` - Remove-Item "nasm-$Env:NASM_VERSION-win64.zip" -# openssl ENV OPENSSL_VERSION="3.3.2" -RUN Get-RemoteFile ` - -Uri https://www.openssl.org/source/openssl-$Env:OPENSSL_VERSION.tar.gz ` - -Path openssl-$Env:OPENSSL_VERSION.tar.gz ` - -Hash '2e8a40b01979afe8be0bbfb3de5dc1c6709fedb46d6c89c10da114ab5fc3d281'; ` - 7z x openssl-$Env:OPENSSL_VERSION.tar.gz -r -y && ` - 7z x openssl-$Env:OPENSSL_VERSION.tar -oC:\openssl_3 && ` - cd C:\openssl_3\openssl-$Env:OPENSSL_VERSION && ` - RunOnVSConsole -Command ` - 'C:\perl\perl\bin\perl.exe Configure && ` - nmake && ` - nmake install_sw' -# libpq and pg_config as needed by psycopg -ENV PG_VERSION="16.0" -COPY pg_config.pl C:\pg_config.pl -RUN Get-RemoteFile ` - -Uri https://ftp.postgresql.org/pub/source/v$Env:PG_VERSION/postgresql-$Env:PG_VERSION.tar.bz2 ` - -Path postgresql-$Env:PG_VERSION.tar.bz2 ` - -Hash 'df9e823eb22330444e1d48e52cc65135a652a6fdb3ce325e3f08549339f51b99'; ` - 7z x postgresql-$Env:PG_VERSION.tar.bz2 -r -y && ` - 7z x postgresql-$Env:PG_VERSION.tar -oC:\postgresql_src && ` - cd C:\postgresql_src\postgresql-$Env:PG_VERSION\src\tools\msvc && ` - Copy-Item C:\pg_config.pl -Destination .\config.pl && ` - RunOnVSConsole -Command 'C:\perl\perl\bin\perl.exe 
build.pl' && ` - RunOnVSConsole -Command 'C:\perl\perl\bin\perl.exe install.pl C:\postgresql' && ` - Add-ToPath -Append "C:\postgresql\bin" ENV CURL_VERSION="8.11.1" diff --git a/.builders/images/windows-x86_64/pg_config.pl b/.builders/images/windows-x86_64/pg_config.pl deleted file mode 100644 index c7933bd808bca..0000000000000 --- a/.builders/images/windows-x86_64/pg_config.pl +++ /dev/null @@ -1,4 +0,0 @@ -# Overrides over default postgres config -# https://github.com/postgres/postgres/blob/REL_16_0/src/tools/msvc/config_default.pl - -$config->{openssl} = 'c:\Program Files\OpenSSL'; diff --git a/.ddev/ci/scripts/postgres/linux/50_install_postgres.sh b/.ddev/ci/scripts/postgres/linux/50_install_postgres.sh deleted file mode 100755 index bc6125a6616e1..0000000000000 --- a/.ddev/ci/scripts/postgres/linux/50_install_postgres.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -set -ex - -sudo apt update -sudo apt install -y --no-install-recommends build-essential python3-dev libpq-dev \ No newline at end of file diff --git a/.ddev/config.toml b/.ddev/config.toml index abaf726a673f2..3e480bfac83b2 100644 --- a/.ddev/config.toml +++ b/.ddev/config.toml @@ -83,8 +83,6 @@ oauthlib = ['BSD-3-Clause'] mmh3 = ['CC0-1.0'] # https://github.com/paramiko/paramiko/blob/master/LICENSE paramiko = ['LGPL-2.1-only'] -# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt -psycopg = ['LGPL-3.0-only'] # https://github.com/psycopg/psycopg2/blob/master/LICENSE # https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER psycopg2-binary = ['LGPL-3.0-only', 'BSD-3-Clause'] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 1fad1f21bdb43..36883fad061c4 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -45,7 +45,6 @@ ply,PyPI,BSD-3-Clause,Copyright (C) 2001-2018 prometheus-client,PyPI,Apache-2.0,Copyright 2015 The Prometheus Authors protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc. All rights reserved. 
psutil,PyPI,BSD-3-Clause,"Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola" -psycopg,PyPI,LGPL-3.0-only,Copyright (C) 2020 The Psycopg Team psycopg2-binary,PyPI,BSD-3-Clause,Copyright 2013 Federico Di Gregorio psycopg2-binary,PyPI,LGPL-3.0-only,Copyright (C) 2013 Federico Di Gregorio pyOpenSSL,PyPI,Apache-2.0,Copyright The pyOpenSSL developers diff --git a/agent_requirements.in b/agent_requirements.in index e8983d4ffefaa..dc1755f63914e 100644 --- a/agent_requirements.in +++ b/agent_requirements.in @@ -32,7 +32,6 @@ prometheus-client==0.21.1 protobuf==5.29.3 psutil==6.0.0 psycopg2-binary==2.9.9 -psycopg[c]==3.2.3 pyasn1==0.4.8 pycryptodomex==3.21.0 pydantic==2.10.5 diff --git a/postgres/changelog.d/19499.fixed b/postgres/changelog.d/19499.fixed new file mode 100644 index 0000000000000..03a7138d6239a --- /dev/null +++ b/postgres/changelog.d/19499.fixed @@ -0,0 +1 @@ +Revert "Upgrade postgres to psycopg3" due to instability in testing \ No newline at end of file diff --git a/postgres/datadog_checks/postgres/connections.py b/postgres/datadog_checks/postgres/connections.py index b9c396d18cbed..5c78e5d716042 100644 --- a/postgres/datadog_checks/postgres/connections.py +++ b/postgres/datadog_checks/postgres/connections.py @@ -8,7 +8,7 @@ import time from typing import Callable, Dict -import psycopg +import psycopg2 class ConnectionPoolFullError(Exception): @@ -23,7 +23,7 @@ def __str__(self): class ConnectionInfo: def __init__( self, - connection: psycopg.Connection, + connection: psycopg2.extensions.connection, deadline: int, active: bool, last_accessed: int, @@ -86,9 +86,9 @@ def _get_connection_raw( dbname: str, ttl_ms: int, timeout: int = None, - startup_fn: Callable[[psycopg.Connection], None] = None, + startup_fn: Callable[[psycopg2.extensions.connection], None] = None, persistent: bool = False, - ) -> psycopg.Connection: + ) -> psycopg2.extensions.connection: """ Return a connection from the pool. Pass a function to startup_func if there is an action needed with the connection @@ -117,7 +117,7 @@ def _get_connection_raw( # if already in pool, retain persistence status persistent = conn.persistent - if db.info.status != psycopg.pq.ConnStatus.OK: + if db.status != psycopg2.extensions.STATUS_READY: # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that db.rollback() @@ -138,7 +138,7 @@ def get_connection( dbname: str, ttl_ms: int, timeout: int = None, - startup_fn: Callable[[psycopg.Connection], None] = None, + startup_fn: Callable[[psycopg2.extensions.connection], None] = None, persistent: bool = False, ): """ @@ -147,14 +147,12 @@ def get_connection( make a new connection if the max_conn limit hasn't been reached. Blocks until a connection can be added to the pool, and optionally takes a timeout in seconds. - Note that leaving a connection context here does NOT close the connection in psycopg; + Note that leaving a connection context here does NOT close the connection in psycopg2; connections must be manually closed by `close_all_connections()`. 
""" try: with self._mu: - db = self._get_connection_raw( - dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, startup_fn=startup_fn, persistent=persistent - ) + db = self._get_connection_raw(dbname, ttl_ms, timeout, startup_fn, persistent) yield db finally: with self._mu: diff --git a/postgres/datadog_checks/postgres/cursor.py b/postgres/datadog_checks/postgres/cursor.py index b5b9fc870a4bf..a122790231b6d 100644 --- a/postgres/datadog_checks/postgres/cursor.py +++ b/postgres/datadog_checks/postgres/cursor.py @@ -2,7 +2,8 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -import psycopg +import psycopg2.extensions +import psycopg2.extras from datadog_checks.base.utils.db.sql_commenter import add_sql_comment @@ -27,9 +28,9 @@ def execute(self, query, vars=None, ignore_query_metric=False): return super().execute(query, vars) -class CommenterCursor(BaseCommenterCursor, psycopg.ClientCursor): +class CommenterCursor(BaseCommenterCursor, psycopg2.extensions.cursor): pass -class CommenterDictCursor(BaseCommenterCursor, psycopg.ClientCursor): +class CommenterDictCursor(BaseCommenterCursor, psycopg2.extras.DictCursor): pass diff --git a/postgres/datadog_checks/postgres/discovery.py b/postgres/datadog_checks/postgres/discovery.py index 46ce5de4d3ad4..3ec4b9fab86fa 100644 --- a/postgres/datadog_checks/postgres/discovery.py +++ b/postgres/datadog_checks/postgres/discovery.py @@ -6,6 +6,7 @@ from datadog_checks.base import AgentCheck from datadog_checks.base.utils.discovery import Discovery +from datadog_checks.postgres.cursor import CommenterCursor from datadog_checks.postgres.util import DatabaseConfigurationError, warning_with_tags AUTODISCOVERY_QUERY: str = """select datname from pg_catalog.pg_database where datistemplate = false;""" @@ -71,7 +72,7 @@ def get_items(self) -> List[str]: def _get_databases(self) -> List[str]: with self.db_pool.get_connection(self._db, self._default_ttl) as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute(AUTODISCOVERY_QUERY) databases = list(cursor.fetchall()) databases = [ diff --git a/postgres/datadog_checks/postgres/explain_parameterized_queries.py b/postgres/datadog_checks/postgres/explain_parameterized_queries.py index 4e095c8ab3562..bfb2a54d12940 100644 --- a/postgres/datadog_checks/postgres/explain_parameterized_queries.py +++ b/postgres/datadog_checks/postgres/explain_parameterized_queries.py @@ -5,10 +5,11 @@ import logging import re -import psycopg +import psycopg2 from datadog_checks.base.utils.db.sql import compute_sql_signature from datadog_checks.base.utils.tracking import tracked_method +from datadog_checks.postgres.cursor import CommenterDictCursor from .util import DBExplainError from .version_utils import V12 @@ -76,7 +77,7 @@ def explain_statement(self, dbname, statement, obfuscated_statement): if self._check.version < V12: # if pg version < 12, skip explaining parameterized queries because # plan_cache_mode is not supported - e = psycopg.errors.UndefinedParameter("Unable to explain parameterized query") + e = psycopg2.errors.UndefinedParameter("Unable to explain parameterized query") logger.debug( "Unable to explain parameterized query. 
Postgres version %s does not support plan_cache_mode", self._check.version, @@ -179,14 +180,16 @@ def _deallocate_prepared_statement(self, dbname, query_signature): ) def _execute_query(self, dbname, query): + # Psycopg2 connections do not get closed when context ends; + # leaving context will just mark the connection as inactive in MultiDatabaseConnectionPool with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: logger.debug('Executing query=[%s]', query) cursor.execute(query, ignore_query_metric=True) def _execute_query_and_fetch_rows(self, dbname, query): with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: cursor.execute(query, ignore_query_metric=True) return cursor.fetchall() diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index 95261a548e89f..038346389ba13 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -6,8 +6,9 @@ import time from typing import Dict, List, Optional, Tuple, Union # noqa: F401 -import psycopg -from psycopg.rows import dict_row +import psycopg2 + +from datadog_checks.postgres.cursor import CommenterDictCursor try: import datadog_agent @@ -234,7 +235,7 @@ def __init__(self, check, config, shutdown_callback): enabled=is_affirmative(config.resources_metadata_config.get("enabled", True)), dbms="postgres", min_collection_interval=config.min_collection_interval, - expected_db_exceptions=(psycopg.errors.DatabaseError,), + expected_db_exceptions=(psycopg2.errors.DatabaseError,), job_name="database-metadata", shutdown_callback=shutdown_callback, ) @@ -332,7 +333,7 @@ def _collect_postgres_schemas(self): continue with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: for schema in database["schemas"]: if not self._should_collect_metadata(schema["name"], "schema"): continue @@ -447,7 +448,9 @@ def _collect_schema_info(self): self._last_schemas_query_time = time.time() return metadata - def _query_database_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, Union[str, int]]: + def _query_database_information( + self, cursor: psycopg2.extensions.cursor, dbname: str + ) -> Dict[str, Union[str, int]]: """ Collect database info. Returns description: str @@ -460,7 +463,7 @@ def _query_database_information(self, cursor: psycopg.Cursor, dbname: str) -> Di row = cursor.fetchone() return row - def _query_schema_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, str]: + def _query_schema_information(self, cursor: psycopg2.extensions.cursor, dbname: str) -> Dict[str, str]: """ Collect user schemas. Returns id: str @@ -566,7 +569,7 @@ def sort_tables(info): return table_info[:limit] def _query_tables_for_schema( - self, cursor: psycopg.Cursor, schema_id: str, dbname: str + self, cursor: psycopg2.extensions.cursor, schema_id: str, dbname: str ) -> List[Dict[str, Union[str, Dict]]]: """ Collect list of tables for a schema. 
Returns a list of dictionaries @@ -597,7 +600,7 @@ def _query_tables_for_schema( return table_payloads def _query_table_information( - self, cursor: psycopg.Cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]] + self, cursor: psycopg2.extensions.cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]] ) -> List[Dict[str, Union[str, Dict]]]: """ Collect table information . Returns a dictionary @@ -677,7 +680,7 @@ def _query_table_information( def _collect_metadata_for_database(self, dbname): metadata = {} with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: database_info = self._query_database_information(cursor, dbname) metadata.update( { @@ -698,32 +701,28 @@ def _collect_metadata_for_database(self, dbname): @tracked_method(agent_check_getter=agent_check_getter) def _collect_postgres_settings(self): with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: - with conn.transaction(): - # Get loaded extensions - cursor.execute(PG_EXTENSIONS_QUERY) - rows = cursor.fetchall() - query = PG_SETTINGS_QUERY - for row in rows: - extension = row['extname'] - # Run query to force loading of extension - # This allow us to reliably collect extension settings - if extension in PG_EXTENSION_LOADER_QUERY: - cursor.execute(PG_EXTENSION_LOADER_QUERY[extension]) - else: - self._log.warning("unable to collect settings for unknown extension %s", extension) - - if self.pg_settings_ignored_patterns: - query = query + " WHERE name NOT LIKE ALL(%s)" - - self._log.debug( - "Running query [%s] and patterns are %s", - query, - self.pg_settings_ignored_patterns, - ) - self._time_since_last_settings_query = time.time() - cursor.execute(query, (self.pg_settings_ignored_patterns,)) - rows = cursor.fetchall() - self._log.warning("Loaded %s rows from pg_settings", rows) - self._log.debug("Loaded %s rows from pg_settings", len(rows)) - return rows + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: + # Get loaded extensions + cursor.execute(PG_EXTENSIONS_QUERY) + rows = cursor.fetchall() + query = PG_SETTINGS_QUERY + for row in rows: + extension = row['extname'] + if extension in PG_EXTENSION_LOADER_QUERY: + query = PG_EXTENSION_LOADER_QUERY[extension] + "\n" + query + else: + self._log.warning("unable to collect settings for unknown extension %s", extension) + + if self.pg_settings_ignored_patterns: + query = query + " WHERE name NOT LIKE ALL(%s)" + + self._log.debug( + "Running query [%s] and patterns are %s", + query, + self.pg_settings_ignored_patterns, + ) + self._time_since_last_settings_query = time.time() + cursor.execute(query, (self.pg_settings_ignored_patterns,)) + rows = cursor.fetchall() + self._log.debug("Loaded %s rows from pg_settings", len(rows)) + return [dict(row) for row in rows] diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index 4573f5d001b57..3f20801a9a399 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -7,7 +7,7 @@ import os from time import time -import psycopg +import psycopg2 from cachetools import TTLCache from datadog_checks.base import AgentCheck @@ -21,7 +21,7 @@ from datadog_checks.base.utils.serialization import json from datadog_checks.postgres import aws, azure from datadog_checks.postgres.connections import 
MultiDatabaseConnectionPool -from datadog_checks.postgres.cursor import CommenterCursor +from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor from datadog_checks.postgres.discovery import PostgresAutodiscovery from datadog_checks.postgres.metadata import PostgresMetadata from datadog_checks.postgres.metrics_cache import PostgresMetricsCache @@ -214,7 +214,7 @@ def _new_query_executor(self, queries, db): def execute_query_raw(self, query, db): with db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute(query) rows = cursor.fetchall() return rows @@ -229,11 +229,11 @@ def db(self): self._db = self._new_connection(self._config.dbname) # once the connection is reinitialized, we need to reload the pg_settings self._load_pg_settings(self._db) - if self._db.info.status != psycopg.pq.ConnStatus.OK: + if self._db.status != psycopg2.extensions.STATUS_READY: self._db.rollback() try: yield self._db - except (psycopg.InterfaceError, InterruptedError): + except (psycopg2.InterfaceError, InterruptedError): # if we get an interface error or an interrupted error, # we gracefully close the connection self.log.warning( @@ -254,10 +254,10 @@ def _connection_health_check(self, conn): try: # run a simple query to check if the connection is healthy # health check should run after a connection is established - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute("SELECT 1") cursor.fetchall() - except psycopg.OperationalError as e: + except psycopg2.OperationalError as e: err_msg = f"Database {self._config.dbname} connection health check failed: {str(e)}" self.log.error(err_msg) raise DatabaseHealthCheckError(err_msg) @@ -382,7 +382,7 @@ def _get_debug_tags(self): def _get_replication_role(self): with self.db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute('SELECT pg_is_in_recovery();') role = cursor.fetchone()[0] # value fetched for role is of @@ -433,13 +433,13 @@ def _get_local_wal_file_age(self): def load_system_identifier(self): with self.db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute('SELECT system_identifier FROM pg_control_system();') self.system_identifier = cursor.fetchone()[0] def load_cluster_name(self): with self.db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute('SHOW cluster_name;') self.cluster_name = cursor.fetchone()[0] @@ -512,20 +512,20 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors): cursor.execute(query.replace(r'%', r'%%')) results = cursor.fetchall() - except psycopg.errors.FeatureNotSupported as e: + except psycopg2.errors.FeatureNotSupported as e: # This happens for example when trying to get replication metrics from readers in Aurora. Let's ignore it. log_func(e) self.log.debug("Disabling replication metrics") self.is_aurora = False self.metrics_cache.replication_metrics = {} - except psycopg.errors.UndefinedFunction as e: + except psycopg2.errors.UndefinedFunction as e: log_func(e) log_func( "It seems the PG version has been incorrectly identified as %s. " "A reattempt to identify the right version will happen on next agent run." 
% self.version ) self._clean_state() - except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e: + except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e: log_func("Not all metrics may be available: %s" % str(e)) if not results: @@ -645,7 +645,7 @@ def _collect_metric_autodiscovery(self, instance_tags, scopes, scope_type): databases = self.autodiscovery.get_items() for db in databases: with self.db_pool.get_connection(db, self._config.idle_connection_timeout) as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: for scope in scopes: self._query_scope(cursor, scope, instance_tags, False, db) elapsed_ms = (time() - start_time) * 1000 @@ -736,7 +736,7 @@ def _collect_stats(self, instance_tags): metric_scope.append(replication_stats_metrics) with self.db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: results_len = self._query_scope(cursor, db_instance_metrics, instance_tags, False) if results_len is not None: self.gauge( @@ -746,14 +746,14 @@ def _collect_stats(self, instance_tags): hostname=self.resolved_hostname, ) - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._query_scope(cursor, bgw_instance_metrics, instance_tags, False) - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._query_scope(cursor, archiver_instance_metrics, instance_tags, False) if self._config.collect_checksum_metrics and self.version >= V12: # SHOW queries need manual cursor execution so can't be bundled with the metrics - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute("SHOW data_checksums;") enabled = cursor.fetchone()[0] self.count( @@ -764,7 +764,7 @@ def _collect_stats(self, instance_tags): ) if self._config.collect_activity_metrics: activity_metrics = self.metrics_cache.get_activity_metrics(self.version) - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._query_scope(cursor, activity_metrics, instance_tags, False) if per_database_metric_scope: @@ -780,11 +780,11 @@ def _collect_stats(self, instance_tags): metric_scope.extend(per_database_metric_scope) for scope in list(metric_scope): - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._query_scope(cursor, scope, instance_tags, False) for scope in self._config.custom_metrics: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._query_scope(cursor, scope, instance_tags, True) if self.dynamic_queries: @@ -794,12 +794,12 @@ def _collect_stats(self, instance_tags): def _new_connection(self, dbname): if self._config.host == 'localhost' and self._config.password == '': # Use ident method - connection_string = "dbname=%s user=%s application_name=%s" % ( + connection_string = "user=%s dbname=%s application_name=%s" % ( self._config.user, dbname, self._config.application_name, ) - conn = psycopg.connect(conninfo=connection_string, autocommit=True, cursor_factory=CommenterCursor) + conn = psycopg2.connect(connection_string) else: password = self._config.password if 'aws' in self.cloud_metadata: @@ -832,7 +832,7 @@ def _new_connection(self, dbname): 'host': self._config.host, 'user': self._config.user, 'password': password, - 'dbname': dbname, + 'database': dbname, 'sslmode': self._config.ssl_mode, 'application_name': self._config.application_name, } @@ 
-846,8 +846,9 @@ def _new_connection(self, dbname): args['sslkey'] = self._config.ssl_key if self._config.ssl_password: args['sslpassword'] = self._config.ssl_password - conn = psycopg.connect(**args, autocommit=True, cursor_factory=CommenterCursor) + conn = psycopg2.connect(**args) # Autocommit is enabled by default for safety for all new connections (to prevent long-lived transactions). + conn.set_session(autocommit=True, readonly=True) if self._config.query_timeout: # Set the statement_timeout for the session with conn.cursor() as cursor: @@ -866,7 +867,7 @@ def _connect(self): # Reload pg_settings on a new connection to the main db def _load_pg_settings(self, db): try: - with db.cursor() as cursor: + with db.cursor(cursor_factory=CommenterDictCursor) as cursor: self.log.debug("Running query [%s]", PG_SETTINGS_QUERY) cursor.execute( PG_SETTINGS_QUERY, @@ -877,7 +878,7 @@ def _load_pg_settings(self, db): for setting in rows: name, val = setting self.pg_settings[name] = val - except (psycopg.DatabaseError, psycopg.OperationalError) as err: + except (psycopg2.DatabaseError, psycopg2.OperationalError) as err: self.log.warning("Failed to query for pg_settings: %s", repr(err)) self.count( "dd.postgres.error", @@ -889,9 +890,9 @@ def _load_pg_settings(self, db): def _get_main_db(self): """ - Returns a memoized, persistent psycopg connection to `self.dbname`. + Returns a memoized, persistent psycopg2 connection to `self.dbname`. Threadsafe as long as no transactions are used - :return: a psycopg connection + :return: a psycopg2 connection """ # reload settings for the main DB only once every time the connection is reestablished return self.db_pool.get_connection( diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py index 07ce8f27331da..7a9baf45fef55 100644 --- a/postgres/datadog_checks/postgres/statement_samples.py +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -8,9 +8,10 @@ from enum import Enum from typing import Dict, Optional, Tuple # noqa: F401 -import psycopg +import psycopg2 from cachetools import TTLCache -from psycopg.rows import dict_row + +from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor try: import datadog_agent @@ -165,7 +166,7 @@ def __init__(self, check, config, shutdown_callback): ), dbms="postgres", min_collection_interval=config.min_collection_interval, - expected_db_exceptions=(psycopg.errors.DatabaseError,), + expected_db_exceptions=(psycopg2.errors.DatabaseError,), job_name="query-samples", shutdown_callback=shutdown_callback, ) @@ -237,7 +238,7 @@ def _get_active_connections(self): pg_stat_activity_view=self._config.pg_stat_activity_view, extra_filters=extra_filters ) with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: self._log.debug("Running query [%s] %s", query, params) cursor.execute(query, params) rows = cursor.fetchall() @@ -271,7 +272,7 @@ def _get_new_pg_stat_activity(self, available_activity_columns, activity_columns ) with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: self._log.debug("Running query [%s] %s", query, params) cursor.execute(query, params) rows = cursor.fetchall() @@ -290,7 +291,7 @@ def _get_pg_stat_activity_cols_cached(self, expected_cols): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) 
def _get_available_activity_columns(self, all_expected_columns): with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: try: cursor.execute( "select * from {pg_stat_activity_view} LIMIT 0".format( @@ -305,12 +306,12 @@ def _get_available_activity_columns(self, all_expected_columns): "missing the following expected columns from pg_stat_activity: %s", missing_columns ) self._log.debug("found available pg_stat_activity columns: %s", available_columns) - except psycopg.errors.InvalidSchemaName as e: + except psycopg2.errors.InvalidSchemaName as e: self._log.warning( "cannot collect activity due to invalid schema in dbname=%s: %s", self._config.dbname, repr(e) ) return None - except psycopg.DatabaseError as e: + except psycopg2.DatabaseError as e: # if the schema is valid then it's some problem with the function (missing, or invalid permissions, # incorrect definition) self._check.record_warning( @@ -340,7 +341,7 @@ def _filter_and_normalize_statement_rows(self, rows): total_count += 1 if row.get('backend_type') is not None: try: - row['backend_type'] = row['backend_type'].decode('utf-8') + row['backend_type'] = row['backend_type'].tobytes().decode('utf-8') except UnicodeDecodeError: row['backend_type'] = 'unknown' if (not row['datname'] or not row['query']) and row.get( @@ -577,12 +578,12 @@ def _get_db_explain_setup_state(self, dbname): # type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]] try: self.db_pool.get_connection(dbname, self._conn_ttl_ms) - except psycopg.OperationalError as e: + except psycopg2.OperationalError as e: self._log.warning( "cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e) ) return DBExplainError.connection_error, e - except psycopg.DatabaseError as e: + except psycopg2.DatabaseError as e: self._log.warning( "cannot collect execution plans due to a database error in dbname=%s: %s", dbname, repr(e) ) @@ -590,14 +591,14 @@ def _get_db_explain_setup_state(self, dbname): try: result = self._run_explain(dbname, EXPLAIN_VALIDATION_QUERY, EXPLAIN_VALIDATION_QUERY) - except psycopg.errors.InvalidSchemaName as e: + except psycopg2.errors.InvalidSchemaName as e: self._log.warning("cannot collect execution plans due to invalid schema in dbname=%s: %s", dbname, repr(e)) self._emit_run_explain_error(dbname, DBExplainError.invalid_schema, e) return DBExplainError.invalid_schema, e - except psycopg.errors.DatatypeMismatch as e: + except psycopg2.errors.DatatypeMismatch as e: self._emit_run_explain_error(dbname, DBExplainError.datatype_mismatch, e) return DBExplainError.datatype_mismatch, e - except psycopg.DatabaseError as e: + except psycopg2.DatabaseError as e: # if the schema is valid then it's some problem with the function (missing, or invalid permissions, # incorrect definition) self._emit_run_explain_error(dbname, DBExplainError.failed_function, e) @@ -641,7 +642,7 @@ def _get_db_explain_setup_state_cached(self, dbname): def _run_explain(self, dbname, statement, obfuscated_statement): start_time = time.time() with self.db_pool.get_connection(dbname, ttl_ms=self._conn_ttl_ms) as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: self._log.debug( "Running query on dbname=%s: %s(%s)", dbname, self._explain_function, obfuscated_statement ) @@ -716,7 +717,7 @@ def _run_explain_safe(self, dbname, statement, obfuscated_statement, query_signa return 
self._explain_parameterized_queries.explain_statement( dbname, statement, obfuscated_statement ) - e = psycopg.errors.UndefinedParameter("Unable to explain parameterized query") + e = psycopg2.errors.UndefinedParameter("Unable to explain parameterized query") self._log.debug( "Unable to collect execution plan, clients using the extended query protocol or prepared statements" " can't be explained due to the separation of the parsed query and raw bind parameters: %s", @@ -727,18 +728,18 @@ def _run_explain_safe(self, dbname, statement, obfuscated_statement, query_signa self._emit_run_explain_error(dbname, DBExplainError.parameterized_query, e) return error_response return self._run_explain(dbname, statement, obfuscated_statement), None, None - except psycopg.errors.UndefinedTable as e: + except psycopg2.errors.UndefinedTable as e: self._log.debug("Failed to collect execution plan: %s", repr(e)) error_response = None, DBExplainError.undefined_table, '{}'.format(type(e)) self._explain_errors_cache[query_signature] = error_response self._emit_run_explain_error(dbname, DBExplainError.undefined_table, e) return error_response - except psycopg.errors.DatabaseError as e: + except psycopg2.errors.DatabaseError as e: self._log.debug("Failed to collect execution plan: %s", repr(e)) error_response = None, DBExplainError.database_error, '{}'.format(type(e)) self._emit_run_explain_error(dbname, DBExplainError.database_error, e) - if isinstance(e, psycopg.errors.ProgrammingError) and not isinstance( - e, psycopg.errors.InsufficientPrivilege + if isinstance(e, psycopg2.errors.ProgrammingError) and not isinstance( + e, psycopg2.errors.InsufficientPrivilege ): # ProgrammingError is things like InvalidName, InvalidSchema, SyntaxError # we don't want to cache things like permission errors for a very long time because they can be fixed diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index eb494306fd892..5ff06b200a550 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -6,9 +6,9 @@ import copy import time -import psycopg +import psycopg2 +import psycopg2.extras from cachetools import TTLCache -from psycopg.rows import dict_row from datadog_checks.base import is_affirmative from datadog_checks.base.utils.common import to_native_string @@ -17,6 +17,7 @@ from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding, obfuscate_sql_with_metadata from datadog_checks.base.utils.serialization import json from datadog_checks.base.utils.tracking import tracked_method +from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor from .query_calls_cache import QueryCallsCache from .util import DatabaseConfigurationError, payload_pg_version, warning_with_tags @@ -158,7 +159,7 @@ def __init__(self, check, config, shutdown_callback): check, run_sync=is_affirmative(config.statement_metrics_config.get('run_sync', False)), enabled=is_affirmative(config.statement_metrics_config.get('enabled', True)), - expected_db_exceptions=(psycopg.errors.DatabaseError,), + expected_db_exceptions=(psycopg2.errors.DatabaseError,), min_collection_interval=config.min_collection_interval, dbms="postgres", rate_limit=1 / float(collection_interval), @@ -188,13 +189,12 @@ def __init__(self, check, config, shutdown_callback): def _execute_query(self, cursor, query, params=()): try: - self._log.warning("Running query [%s] %s", query, params) + self._log.debug("Running query [%s] %s", 
query, params) cursor.execute(query, params) return cursor.fetchall() - except (psycopg.ProgrammingError, psycopg.errors.QueryCanceled) as e: + except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e: # A failed query could've derived from incorrect columns within the cache. It's a rare edge case, # but the next time the query is run, it will retrieve the correct columns. - self._log.warning("Failed to run query [%s] %s", query, params) self._stat_column_cache = [] raise e @@ -206,7 +206,6 @@ def _get_pg_stat_statements_columns(self): be upgraded without upgrading extensions, even when the extension is included by default. """ if self._stat_column_cache: - self._log.warning("Returning cached columns %s", self._stat_column_cache) return self._stat_column_cache # Querying over '*' with limit 0 allows fetching only the column names from the cursor without data @@ -216,11 +215,10 @@ def _get_pg_stat_statements_columns(self): extra_clauses="LIMIT 0", ) with self._check._get_main_db() as conn: - with conn.cursor() as cursor: - self._execute_query(cursor, query) + with conn.cursor(cursor_factory=CommenterCursor) as cursor: + self._execute_query(cursor, query, params=(self._config.dbname,)) col_names = [desc[0] for desc in cursor.description] if cursor.description else [] self._stat_column_cache = col_names - self._log.warning("Fetched columns %s", col_names) return col_names def _check_called_queries(self): @@ -233,9 +231,9 @@ def _check_called_queries(self): pgss_view_without_query_text = "pg_stat_statements(false)" with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) - rows = self._execute_query(cursor, query) + rows = self._execute_query(cursor, query, params=(self._config.dbname,)) self._query_calls_cache.set_calls(rows) self._check.gauge( "dd.postgresql.pg_stat_statements.calls_changed", @@ -354,7 +352,7 @@ def _load_pg_stat_statements(self): ) params = params + tuple(self._config.ignore_databases) with self._check._get_main_db() as conn: - with conn.cursor(row_factory=dict_row) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: if len(self._query_calls_cache.cache) > 0: return self._execute_query( cursor, @@ -376,11 +374,11 @@ def _load_pg_stat_statements(self): ), params=params, ) - except psycopg.Error as e: + except psycopg2.Error as e: error_tag = "error:database-{}".format(type(e).__name__) if ( - isinstance(e, psycopg.errors.ObjectNotInPrerequisiteState) + isinstance(e, psycopg2.errors.ObjectNotInPrerequisiteState) ) and 'pg_stat_statements must be loaded' in str(e.pgerror): error_tag = "error:database-{}-pg_stat_statements_not_loaded".format(type(e).__name__) self._check.record_warning( @@ -397,7 +395,7 @@ def _load_pg_stat_statements(self): code=DatabaseConfigurationError.pg_stat_statements_not_loaded.value, ), ) - elif isinstance(e, psycopg.errors.UndefinedTable) and 'pg_stat_statements' in str(e.pgerror): + elif isinstance(e, psycopg2.errors.UndefinedTable) and 'pg_stat_statements' in str(e.pgerror): error_tag = "error:database-{}-pg_stat_statements_not_created".format(type(e).__name__) self._check.record_warning( DatabaseConfigurationError.pg_stat_statements_not_created, @@ -440,7 +438,7 @@ def _emit_pg_stat_statements_dealloc(self): return try: with self._check._get_main_db() as conn: - with conn.cursor() as cursor: + with 
conn.cursor(cursor_factory=CommenterDictCursor) as cursor: rows = self._execute_query( cursor, PG_STAT_STATEMENTS_DEALLOC, @@ -453,7 +451,7 @@ def _emit_pg_stat_statements_dealloc(self): tags=self.tags, hostname=self._check.resolved_hostname, ) - except psycopg.Error as e: + except psycopg2.Error as e: self._log.warning("Failed to query for pg_stat_statements_info: %s", e) @tracked_method(agent_check_getter=agent_check_getter) @@ -461,7 +459,7 @@ def _emit_pg_stat_statements_metrics(self): query = PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 if self._check.version < V9_4 else PG_STAT_STATEMENTS_COUNT_QUERY try: with self._check._get_main_db() as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: rows = self._execute_query( cursor, query, @@ -481,7 +479,7 @@ def _emit_pg_stat_statements_metrics(self): tags=self.tags, hostname=self._check.resolved_hostname, ) - except psycopg.Error as e: + except psycopg2.Error as e: self._log.warning("Failed to query for pg_stat_statements count: %s", e) def _baseline_metrics_query_key(self, row): @@ -521,7 +519,7 @@ def _check_baseline_metrics_expiry(self): if ( self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time() - or len(self._baseline_metrics) > 3 * int(self._check.pg_settings.get("pg_stat_statements.max", 10000)) + or len(self._baseline_metrics) > 3 * int(self._check.pg_settings.get("pg_stat_statements.max")) ): self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() diff --git a/postgres/datadog_checks/postgres/version_utils.py b/postgres/datadog_checks/postgres/version_utils.py index 0eb38b573cb6f..68ebf3e26c945 100644 --- a/postgres/datadog_checks/postgres/version_utils.py +++ b/postgres/datadog_checks/postgres/version_utils.py @@ -6,6 +6,7 @@ from semver import VersionInfo from datadog_checks.base.log import get_check_logger +from datadog_checks.postgres.cursor import CommenterCursor V8_3 = VersionInfo.parse("8.3.0") V9 = VersionInfo.parse("9.0.0") @@ -30,7 +31,7 @@ def __init__(self): @staticmethod def get_raw_version(db): with db as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute('SHOW SERVER_VERSION;') raw_version = cursor.fetchone()[0] return raw_version @@ -39,7 +40,7 @@ def is_aurora(self, db): if self._seen_aurora_exception: return False with db as conn: - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: # This query will pollute PG logs in non aurora versions, # but is the only reliable way to detect aurora try: diff --git a/postgres/pyproject.toml b/postgres/pyproject.toml index e0fcd40090a4e..0c7e7a1dc0866 100644 --- a/postgres/pyproject.toml +++ b/postgres/pyproject.toml @@ -40,7 +40,7 @@ deps = [ "azure-identity==1.19.0", "boto3==1.36.1", "cachetools==5.5.0", - "psycopg[c]==3.2.3", + "psycopg2-binary==2.9.9", "semver==3.0.2", ] diff --git a/postgres/tests/conftest.py b/postgres/tests/conftest.py index a0081ffe3fc7d..91ad37bdde6ab 100644 --- a/postgres/tests/conftest.py +++ b/postgres/tests/conftest.py @@ -4,7 +4,7 @@ import copy import os -import psycopg +import psycopg2 import pytest from semver import VersionInfo @@ -38,20 +38,12 @@ } -E2E_METADATA = { - 'start_commands': [ - 'apt update', - 'apt install -y --no-install-recommends build-essential python3-dev libpq-dev', - ], -} - - def connect_to_pg(): - psycopg.connect(host=HOST, dbname=DB_NAME, user=USER, password=PASSWORD) + 
psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, password=PASSWORD) if float(POSTGRES_VERSION) >= 10.0: - psycopg.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA, password=PASSWORD) - psycopg.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA2, password=PASSWORD) - psycopg.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA_LOGICAL, password=PASSWORD) + psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA, password=PASSWORD) + psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA2, password=PASSWORD) + psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, port=PORT_REPLICA_LOGICAL, password=PASSWORD) @pytest.fixture(scope='session') @@ -67,7 +59,7 @@ def dd_environment(e2e_instance): conditions=[WaitFor(connect_to_pg)], env_vars={"POSTGRES_IMAGE": POSTGRES_IMAGE}, ): - yield e2e_instance, E2E_METADATA + yield e2e_instance @pytest.fixture diff --git a/postgres/tests/test_connections.py b/postgres/tests/test_connections.py index 969fbd8eafdba..670373fd772cb 100644 --- a/postgres/tests/test_connections.py +++ b/postgres/tests/test_connections.py @@ -7,9 +7,8 @@ import time import uuid -import psycopg +import psycopg2 import pytest -from psycopg.rows import dict_row from datadog_checks.postgres import PostgreSql from datadog_checks.postgres.connections import ConnectionPoolFullError, MultiDatabaseConnectionPool @@ -35,7 +34,7 @@ def test_conn_pool(pg_instance): assert len(pool._conns) == 1 assert pool._stats.connection_closed == 0 - with db.cursor() as cursor: + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute("select 1") rows = cursor.fetchall() assert len(rows) == 1 and rows[0][0] == 1 @@ -71,7 +70,7 @@ def test_conn_pool_no_leaks_on_close(pg_instance): # Used to make verification queries pool2 = MultiDatabaseConnectionPool( - lambda dbname: psycopg.connect(host=HOST, dbname=dbname, user=USER_ADMIN, password=PASSWORD_ADMIN) + lambda dbname: psycopg2.connect(host=HOST, dbname=dbname, user=USER_ADMIN, password=PASSWORD_ADMIN) ) # Iterate in the test many times to detect flakiness @@ -83,7 +82,7 @@ def get_activity(): Fetches all pg_stat_activity rows generated by this test and connection to a "dogs%" database """ with pool2.get_connection('postgres', 1) as conn: - cursor = conn.cursor() + cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor.execute( "SELECT pid, datname, usename, state, query_start, state_change, application_name" " FROM pg_stat_activity" @@ -96,7 +95,7 @@ def get_activity(): for i in range(0, conn_count): dbname = 'dogs_{}'.format(i) db = pool._get_connection_raw(dbname, 10 * 1000) - with db.cursor() as cursor: + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute("select current_database()") rows = cursor.fetchall() assert len(rows) == 1 @@ -137,7 +136,7 @@ def test_conn_pool_no_leaks_on_prune(pg_instance): pool = MultiDatabaseConnectionPool(check._new_connection) # Used to make verification queries pool2 = MultiDatabaseConnectionPool( - lambda dbname: psycopg.connect(host=HOST, dbname=dbname, user=USER_ADMIN, password=PASSWORD_ADMIN) + lambda dbname: psycopg2.connect(host=HOST, dbname=dbname, user=USER_ADMIN, password=PASSWORD_ADMIN) ) ttl_long = 90 * 1000 ttl_short = 1 @@ -147,7 +146,7 @@ def get_activity(): Fetches all pg_stat_activity rows generated by this test and connection to a "dogs%" database """ with pool2.get_connection('postgres', 1) as conn: - cursor = conn.cursor(row_factory=dict_row) + 
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor.execute( "SELECT pid, datname, usename, state, query_start, state_change, application_name" " FROM pg_stat_activity" @@ -163,7 +162,7 @@ def get_many_connections(count, ttl): for i in range(0, count): dbname = 'dogs_{}'.format(i) db = pool._get_connection_raw(dbname, ttl) - with db.cursor() as cursor: + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute("select current_database()") rows = cursor.fetchall() assert len(rows) == 1 @@ -190,7 +189,7 @@ def get_many_connections(count, ttl): < approximate_deadline + datetime.timedelta(seconds=2) ) assert not db.closed - assert db.info.status == psycopg.pq.ConnStatus.OK + assert db.status == psycopg2.extensions.STATUS_READY # Check that those pooled connections do exist on the database rows = get_activity() assert len(rows) == 50 @@ -367,7 +366,7 @@ def test_conn_statement_timeout(pg_instance): pg_instance["query_timeout"] = 500 check = PostgreSql('postgres', {}, [pg_instance]) check._connect() - with pytest.raises(psycopg.errors.QueryCanceled): + with pytest.raises(psycopg2.errors.QueryCanceled): with check.db() as conn: with conn.cursor() as cursor: cursor.execute("SELECT pg_sleep(1)") diff --git a/postgres/tests/test_cursor.py b/postgres/tests/test_cursor.py index 412ac737e436f..97e81300e2715 100644 --- a/postgres/tests/test_cursor.py +++ b/postgres/tests/test_cursor.py @@ -3,6 +3,8 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import pytest +from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor + from .utils import _get_superconn @@ -14,7 +16,8 @@ def test_integration_connection_with_commenter_cursor(integration_check, pg_inst check = integration_check(pg_instance) with check.db() as conn: - with conn.cursor() as cursor: + # verify CommenterCursor and CommenterDictCursor prepend the query with /* service='datadog-agent' */ + with conn.cursor(cursor_factory=CommenterCursor) as cursor: cursor.execute( 'SELECT generate_series(1, 10) AS number', ignore_query_metric=ignore, @@ -23,7 +26,7 @@ def test_integration_connection_with_commenter_cursor(integration_check, pg_inst assert isinstance(result[0], int) __check_prepand_sql_comment(pg_instance, ignore) - with conn.cursor() as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: cursor.execute( 'SELECT generate_series(1, 10) AS number', ignore_query_metric=ignore, diff --git a/postgres/tests/test_deadlock.py b/postgres/tests/test_deadlock.py index e003a017e3049..b992b65360df4 100644 --- a/postgres/tests/test_deadlock.py +++ b/postgres/tests/test_deadlock.py @@ -2,12 +2,12 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) -import threading +import copy +import select import time -import psycopg +import psycopg2 import pytest -from psycopg import ClientCursor from .common import DB_NAME, HOST, POSTGRES_VERSION, _get_expected_tags @@ -48,17 +48,27 @@ def test_deadlock(aggregator, dd_run_check, integration_check, pg_instance): conn = check._new_connection(pg_instance['dbname']) cursor = conn.cursor() - def execute_in_thread(q, args): - with psycopg.connect( - host=HOST, dbname=DB_NAME, user="bob", password="bob", cursor_factory=ClientCursor - ) as tconn: - with tconn.cursor() as cur: - # this will block, and eventually throw when - # the deadlock is created - try: - cur.execute(q, args) - except psycopg.errors.DeadlockDetected: - pass + def wait(conn): + while True: + state = conn.poll() + if state == 
psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) + time.sleep(0.1) + + conn_args = {'host': HOST, 'dbname': DB_NAME, 'user': "bob", 'password': "bob"} + conn_args_async = copy.copy(conn_args) + conn_args_async["async_"] = 1 + conn1 = psycopg2.connect(**conn_args) + conn1.autocommit = False + + conn2 = psycopg2.connect(**conn_args_async) + wait(conn2) appname = 'deadlock sess' appname1 = appname + '1' @@ -70,25 +80,22 @@ def execute_in_thread(q, args): cursor.execute(deadlock_count_sql, (DB_NAME,)) deadlocks_before = cursor.fetchone()[0] - conn_args = {'host': HOST, 'dbname': DB_NAME, 'user': "bob", 'password': "bob"} - conn1 = psycopg.connect(**conn_args, autocommit=False, cursor_factory=ClientCursor) - cur1 = conn1.cursor() cur1.execute(appname_sql, (appname1,)) cur1.execute(update_sql, (1,)) - args = (appname2, 2, 1) - query = """SET application_name=%s; + cur2 = conn2.cursor() + cur2.execute( + """SET application_name=%s; begin transaction; {}; {}; commit; """.format( - update_sql, update_sql + update_sql, update_sql + ), + (appname2, 2, 1), ) - # ... now execute the test query in a separate thread - lock_task = threading.Thread(target=execute_in_thread, args=(query, args)) - lock_task.start() lock_count_sql = """SELECT COUNT(1) FROM pg_catalog.pg_locks blocked_locks @@ -118,10 +125,11 @@ def execute_in_thread(q, args): try: cur1.execute(update_sql, (2,)) cur1.execute("commit") - except psycopg.errors.DeadlockDetected: + except psycopg2.errors.DeadlockDetected: pass conn1.close() + conn2.close() dd_run_check(check) diff --git a/postgres/tests/test_discovery.py b/postgres/tests/test_discovery.py index 33a619fe3d6a9..31608a9664d27 100644 --- a/postgres/tests/test_discovery.py +++ b/postgres/tests/test_discovery.py @@ -8,8 +8,8 @@ import time from contextlib import contextmanager -import psycopg -import psycopg.sql +import psycopg2 +import psycopg2.sql import pytest from datadog_checks.base import ConfigurationError @@ -75,7 +75,7 @@ @contextmanager def get_postgres_connection(dbname="postgres"): conn_args = {'host': HOST, 'dbname': dbname, 'user': USER_ADMIN, 'password': PASSWORD_ADMIN} - conn = psycopg.connect(**conn_args) + conn = psycopg2.connect(**conn_args) conn.autocommit = True yield conn @@ -172,7 +172,7 @@ def test_autodiscovery_refresh(integration_check, pg_instance): with get_postgres_connection() as conn: cursor = conn.cursor() try: - cursor.execute(psycopg.sql.SQL("CREATE DATABASE {}").format(psycopg.sql.Identifier(database_to_find))) + cursor.execute(psycopg2.sql.SQL("CREATE DATABASE {}").format(psycopg2.sql.Identifier(database_to_find))) time.sleep(pg_instance["database_autodiscovery"]['refresh']) databases = check.autodiscovery.get_items() @@ -180,7 +180,7 @@ def test_autodiscovery_refresh(integration_check, pg_instance): finally: # Need to drop the new database to clean up the environment for next tests. 
cursor.execute(
-                psycopg.sql.SQL("DROP DATABASE {} WITH (FORCE);").format(psycopg.sql.Identifier(database_to_find))
+                psycopg2.sql.SQL("DROP DATABASE {} WITH (FORCE);").format(psycopg2.sql.Identifier(database_to_find))
             )
diff --git a/postgres/tests/test_explain_parameterized_queries.py b/postgres/tests/test_explain_parameterized_queries.py
index c14263fd33a20..1b8d0a87abc1f 100644
--- a/postgres/tests/test_explain_parameterized_queries.py
+++ b/postgres/tests/test_explain_parameterized_queries.py
@@ -4,7 +4,7 @@
 
 from unittest import mock
 
-import psycopg
+import psycopg2
 import pytest
 
 from datadog_checks.base.utils.db.sql import compute_sql_signature
@@ -116,7 +116,7 @@ def test_explain_parameterized_queries_version_below_12(integration_check, dbm_i
     assert plan_dict is None
     assert explain_err_code == DBExplainError.parameterized_query
     assert err is not None
-    assert err == "<class 'psycopg.errors.UndefinedParameter'>"
+    assert err == "<class 'psycopg2.errors.UndefinedParameter'>"
 
 
 @pytest.mark.integration
@@ -130,7 +130,7 @@ def test_explain_parameterized_queries_create_prepared_statement_exception(integ
 
     with mock.patch(
         'datadog_checks.postgres.explain_parameterized_queries.ExplainParameterizedQueries._create_prepared_statement',
-        side_effect=psycopg.errors.DatabaseError("unexpected exception"),
+        side_effect=psycopg2.errors.DatabaseError("unexpected exception"),
     ):
         plan_dict, explain_err_code, err = check.statement_samples._run_and_track_explain(
             DB_NAME, "SELECT * FROM pg_settings WHERE name = $1", "SELECT * FROM pg_settings WHERE name = $1", ""
@@ -138,7 +138,7 @@ def test_explain_parameterized_queries_create_prepared_statement_exception(integ
     assert plan_dict is None
     assert explain_err_code == DBExplainError.failed_to_explain_with_prepared_statement
     assert err is not None
-    assert err == "<class 'psycopg.errors.DatabaseError'>"
+    assert err == "<class 'psycopg2.errors.DatabaseError'>"
 
 
 @pytest.mark.integration
@@ -152,14 +152,14 @@ def test_explain_parameterized_queries_explain_prepared_statement_exception(inte
 
     with mock.patch(
         'datadog_checks.postgres.explain_parameterized_queries.ExplainParameterizedQueries._explain_prepared_statement',
-        side_effect=psycopg.errors.DatabaseError("unexpected exception"),
+        side_effect=psycopg2.errors.DatabaseError("unexpected exception"),
     ):
         query = "SELECT * FROM pg_settings WHERE name = $1"
         plan_dict, explain_err_code, err = check.statement_samples._run_and_track_explain(DB_NAME, query, query, "")
     assert plan_dict is None
     assert explain_err_code == DBExplainError.failed_to_explain_with_prepared_statement
     assert err is not None
-    assert err == "<class 'psycopg.errors.DatabaseError'>"
+    assert err == "<class 'psycopg2.errors.DatabaseError'>"
 
     # check that we deallocated the prepared statement after explaining
     rows = check.statement_samples._explain_parameterized_queries._execute_query_and_fetch_rows(
         DB_NAME,
diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py
index da16f2270a4ac..b5c27a830530f 100644
--- a/postgres/tests/test_pg_integration.py
+++ b/postgres/tests/test_pg_integration.py
@@ -6,7 +6,7 @@ import time
 
 import mock
-import psycopg
+import psycopg2
 import pytest
 
 from datadog_checks.base.errors import ConfigurationError
@@ -115,7 +115,8 @@ def test_initialization_tags(integration_check, pg_instance):
 
 def test_snapshot_xmin(aggregator, integration_check, pg_instance):
-    with psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g", autocommit=True) as conn:
+    with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn:
+        conn.set_session(autocommit=True)
         with conn.cursor() as cur:
             if float(POSTGRES_VERSION) >= 13.0:
                 query = 'select pg_snapshot_xmin(pg_current_snapshot());'
@@ -132,7 
+133,9 @@ def test_snapshot_xmin(aggregator, integration_check, pg_instance): aggregator.assert_metric('postgresql.snapshot.xmax', count=1, tags=expected_tags) assert aggregator.metrics('postgresql.snapshot.xmax')[0].value >= xmin - with psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g", autocommit=True) as conn: + with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: + # Force autocommit + conn.set_session(autocommit=True) with conn.cursor() as cur: _increase_txid(cur) @@ -274,7 +277,7 @@ def test_unsupported_replication(aggregator, integration_check, pg_instance): def format_with_error(value, **kwargs): if 'pg_is_in_recovery' in value: called.append(True) - raise psycopg.errors.FeatureNotSupported("Not available") + raise psycopg2.errors.FeatureNotSupported("Not available") return unpatched_fmt.format(value, **kwargs) # This simulate an error in the fmt function, as it's a bit hard to mock psycopg @@ -322,7 +325,7 @@ def test_can_connect_service_check(aggregator, integration_check, pg_instance): # Forth: connection health check failed with pytest.raises(DatabaseHealthCheckError): db = mock.MagicMock() - db.cursor().__enter__().execute.side_effect = psycopg.OperationalError('foo') + db.cursor().__enter__().execute.side_effect = psycopg2.OperationalError('foo') @contextlib.contextmanager def mock_db(): @@ -380,7 +383,7 @@ def test_locks_metrics_no_relations(aggregator, integration_check, pg_instance): Since 4.0.0, to prevent tag explosion, lock metrics are not collected anymore unless relations are specified """ check = integration_check(pg_instance) - with psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: + with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: with conn.cursor() as cur: cur.execute('LOCK persons') check.run() @@ -637,7 +640,7 @@ def test_query_timeout(integration_check, pg_instance): pg_instance['query_timeout'] = 1000 check = integration_check(pg_instance) check._connect() - with pytest.raises(psycopg.errors.QueryCanceled): + with pytest.raises(psycopg2.errors.QueryCanceled): with check.db() as conn: with conn.cursor() as cursor: cursor.execute("select pg_sleep(2000)") @@ -823,7 +826,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report "enabled": True, }, }, - psycopg.OperationalError, + psycopg2.OperationalError, 'password authentication failed', True, ), @@ -853,7 +856,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report "enabled": 'true', }, }, - psycopg.OperationalError, + psycopg2.OperationalError, 'password authentication failed', True, ), @@ -927,7 +930,7 @@ def test_database_instance_cloud_metadata_aws( { "client_id": "my-client-id", }, - psycopg.OperationalError, + psycopg2.OperationalError, 'password authentication failed', True, ), @@ -973,7 +976,7 @@ def test_database_instance_cloud_metadata_aws( { "client_id": "my-client-id", }, - psycopg.OperationalError, + psycopg2.OperationalError, 'password authentication failed', True, ), @@ -988,7 +991,7 @@ def test_database_instance_cloud_metadata_aws( }, }, None, - psycopg.OperationalError, + psycopg2.OperationalError, 'password authentication failed', True, ), diff --git a/postgres/tests/test_pg_replication.py b/postgres/tests/test_pg_replication.py index a7f5a2983e98e..c0307b6337bf3 100644 --- a/postgres/tests/test_pg_replication.py +++ b/postgres/tests/test_pg_replication.py @@ -118,11 +118,13 @@ def 
test_conflicts_lock(aggregator, integration_check, pg_instance, pg_replica_i check = integration_check(pg_replica_instance2) replica_con = _get_superconn(pg_replica_instance2) + replica_con.set_session(autocommit=False) replica_cur = replica_con.cursor() replica_cur.execute('BEGIN;') replica_cur.execute('select * from persons;') conn = _get_superconn(pg_instance) + conn.set_session(autocommit=True) cur = conn.cursor() cur.execute('update persons SET personid = 1 where personid = 1;') cur.execute('vacuum full persons;') @@ -148,11 +150,13 @@ def test_conflicts_snapshot(aggregator, integration_check, pg_instance, pg_repli check = integration_check(pg_replica_instance2) replica2_con = _get_superconn(pg_replica_instance2) + replica2_con.set_session(autocommit=False) replica2_cur = replica2_con.cursor() replica2_cur.execute('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;') replica2_cur.execute('select * from persons;') - conn = _get_superconn(pg_instance, autocommit=True) + conn = _get_superconn(pg_instance) + conn.set_session(autocommit=True) cur = conn.cursor() cur.execute('update persons SET personid = 1 where personid = 1;') time.sleep(1.2) diff --git a/postgres/tests/test_relations.py b/postgres/tests/test_relations.py index 3419fb26a036f..1670be09f7b43 100644 --- a/postgres/tests/test_relations.py +++ b/postgres/tests/test_relations.py @@ -4,7 +4,7 @@ import threading -import psycopg +import psycopg2 import pytest from datadog_checks.base import ConfigurationError @@ -435,7 +435,7 @@ def check_with_lock(check, instance, lock_table=None): lock_statement = 'LOCK persons' if lock_table is not None: lock_statement = 'LOCK {}'.format(lock_table) - with psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: + with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: with conn.cursor() as cur: cur.execute(lock_statement) check.check(instance) diff --git a/postgres/tests/test_replication_slot.py b/postgres/tests/test_replication_slot.py index dc88bb0798e55..c33fa66cc326c 100644 --- a/postgres/tests/test_replication_slot.py +++ b/postgres/tests/test_replication_slot.py @@ -3,7 +3,7 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import time -import psycopg +import psycopg2 import pytest from datadog_checks.postgres.util import QUERY_PG_REPLICATION_SLOTS_STATS @@ -22,17 +22,17 @@ def test_physical_replication_slots(aggregator, integration_check, pg_instance): time.sleep(5) redo_lsn_age = 0 xmin_age_higher_bound = 1 - conn = psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") - with conn.cursor() as cur: - cur.execute("select pg_wal_lsn_diff(pg_current_wal_lsn(), redo_lsn) from pg_control_checkpoint();") - redo_lsn_age = int(cur.fetchall()[0][0]) - cur.execute('select age(xmin) FROM pg_replication_slots;') - bound = cur.fetchall()[0][0] - xmin_age_higher_bound += int(bound) if bound is not None else 0 - - cur.execute("select * from pg_create_physical_replication_slot('phys_1');") - cur.execute("select * from pg_create_physical_replication_slot('phys_2', true);") - cur.execute("select * from pg_create_physical_replication_slot('phys_3', true, true);") + with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: + with conn.cursor() as cur: + cur.execute("select pg_wal_lsn_diff(pg_current_wal_lsn(), redo_lsn) from pg_control_checkpoint();") + redo_lsn_age = int(cur.fetchall()[0][0]) + cur.execute('select age(xmin) FROM pg_replication_slots;') + bound 
= cur.fetchall()[0][0] + xmin_age_higher_bound += int(bound) if bound is not None else 0 + + cur.execute("select * from pg_create_physical_replication_slot('phys_1');") + cur.execute("select * from pg_create_physical_replication_slot('phys_2', true);") + cur.execute("select * from pg_create_physical_replication_slot('phys_3', true, true);") check.check(pg_instance) # slot_name | slot_type | temporary | active | active_pid | xmin | restart_lsn @@ -86,13 +86,11 @@ def test_physical_replication_slots(aggregator, integration_check, pg_instance): count=1, ) - conn.close() - @requires_over_10 def test_logical_replication_slots(aggregator, integration_check, pg_instance): check = integration_check(pg_instance) - with psycopg.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: + with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn: with conn.cursor() as cur: cur.execute("SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) FROM pg_replication_slots;") restart_age = cur.fetchall()[0][0] diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 42703369bd262..f486cbf57f3ba 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -3,16 +3,15 @@ # Licensed under Simplified BSD License (see LICENSE) import datetime import re -import threading +import select import time from collections import Counter from concurrent.futures.thread import ThreadPoolExecutor import mock -import psycopg +import psycopg2 import pytest from dateutil import parser -from psycopg import ClientCursor from semver import VersionInfo from datadog_checks.base.utils.db.sql import compute_sql_signature @@ -23,7 +22,7 @@ DBExplainError, StatementTruncationState, ) -from datadog_checks.postgres.statements import PG_STAT_STATEMENTS_METRICS_COLUMNS +from datadog_checks.postgres.statements import PG_STAT_STATEMENTS_METRICS_COLUMNS, PG_STAT_STATEMENTS_TIMING_COLUMNS from datadog_checks.postgres.util import payload_pg_version from datadog_checks.postgres.version_utils import V12 @@ -35,7 +34,7 @@ _get_expected_replication_tags, _get_expected_tags, ) -from .utils import WaitGroup, _get_conn, _get_superconn, requires_over_10, requires_over_13, run_one_check +from .utils import _get_conn, _get_superconn, requires_over_10, requires_over_13, run_one_check pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] @@ -115,9 +114,7 @@ def _run_query(idx): password = "bob" dbname = "datadog_test" if dbname not in connections: - connections[dbname] = psycopg.connect( - host=HOST, dbname=dbname, user=user, password=password, cursor_factory=ClientCursor - ) + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) args = ('two',) if idx == 1: @@ -233,9 +230,7 @@ def test_statement_metrics( def _run_queries(): for user, password, dbname, query, arg in SAMPLE_QUERIES: if dbname not in connections: - connections[dbname] = psycopg.connect( - host=HOST, dbname=dbname, user=user, password=password, autocommit=True, cursor_factory=ClientCursor - ) + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) connections[dbname].cursor().execute(query, (arg,)) check = integration_check(dbm_instance) @@ -280,13 +275,9 @@ def _should_catch_query(dbname): assert len(aggregator.metrics("postgresql.pg_stat_statements.count")) != 0 dbm_samples = aggregator.get_event_platform_events("dbm-samples") - print(event) for username, _, 
dbname, query, _ in SAMPLE_QUERIES: - print(query) expected_query = query % obfuscated_param - print(expected_query) query_signature = compute_sql_signature(expected_query) - print("query_signature: ", query_signature) matching_rows = [r for r in event['postgres_rows'] if r['query_signature'] == query_signature] if not _should_catch_query(dbname): assert len(matching_rows) == 0 @@ -300,10 +291,10 @@ def _should_catch_query(dbname): assert row['query'] == expected_query available_columns = set(row.keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS - # if track_io_timing_enabled: - # assert (available_columns & PG_STAT_STATEMENTS_TIMING_COLUMNS) == PG_STAT_STATEMENTS_TIMING_COLUMNS - # else: - # assert (available_columns & PG_STAT_STATEMENTS_TIMING_COLUMNS) == set() + if track_io_timing_enabled: + assert (available_columns & PG_STAT_STATEMENTS_TIMING_COLUMNS) == PG_STAT_STATEMENTS_TIMING_COLUMNS + else: + assert (available_columns & PG_STAT_STATEMENTS_TIMING_COLUMNS) == set() for col in metric_columns: assert type(row[col]) in (float, int) @@ -417,7 +408,7 @@ def test_statement_metrics_cloud_metadata( def _run_queries(): for user, password, dbname, query, arg in SAMPLE_QUERIES: if dbname not in connections: - connections[dbname] = psycopg.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) connections[dbname].cursor().execute(query, (arg,)) check = integration_check(dbm_instance) @@ -457,7 +448,7 @@ def test_wal_metrics(aggregator, integration_check, dbm_instance): def _run_queries(): for user, password, dbname, query, arg in SAMPLE_QUERIES: if dbname not in connections: - connections[dbname] = psycopg.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) connections[dbname].cursor().execute(query, (arg,)) check = integration_check(dbm_instance) @@ -494,7 +485,7 @@ def test_statement_metrics_with_duplicates(aggregator, integration_check, dbm_in normalized_query = 'select * from pg_stat_activity where application_name = ANY(array [ ? 
])'
 
     def obfuscate_sql(query, options=None):
-        if 'select * from pg_stat_activity where application_name' in query:
+        if query.startswith('select * from pg_stat_activity where application_name'):
             return normalized_query
         return query
 
@@ -519,8 +510,6 @@ def obfuscate_sql(query, options=None):
     assert len(events) == 1
     event = events[0]
 
-    print(event)
-
     matching = [e for e in event['postgres_rows'] if e['query_signature'] == query_signature]
     assert len(matching) == 1
     row = matching[0]
@@ -529,7 +518,7 @@ def obfuscate_sql(query, options=None):
 
 @pytest.fixture
 def bob_conn():
-    conn = psycopg.connect(host=HOST, dbname=DB_NAME, user="bob", password="bob")
+    conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user="bob", password="bob")
     yield conn
     conn.close()
@@ -586,42 +575,42 @@ def test_get_db_explain_setup_state(integration_check, dbm_instance, dbname, exp
     [
         (
             "select * from fake_table",
-            "error:explain-undefined_table-<class 'psycopg.errors.UndefinedTable'>",
+            "error:explain-undefined_table-<class 'psycopg2.errors.UndefinedTable'>",
             None,
             1,
             None,
         ),
         (
             "select * from fake_schema.fake_table",
-            "error:explain-undefined_table-<class 'psycopg.errors.UndefinedTable'>",
+            "error:explain-undefined_table-<class 'psycopg2.errors.UndefinedTable'>",
             None,
             1,
             None,
         ),
         (
             "select * from pg_settings where name = $1",
-            "error:explain-parameterized_query-<class 'psycopg.errors.UndefinedParameter'>",
+            "error:explain-parameterized_query-<class 'psycopg2.errors.UndefinedParameter'>",
             None,
             1,
             None,
         ),
         (
             "select * from pg_settings where name = 'this query is truncated' limi",
-            "error:explain-database_error-<class 'psycopg.errors.SyntaxError'>",
+            "error:explain-database_error-<class 'psycopg2.errors.SyntaxError'>",
             None,
             1,
             None,
         ),
         (
             "select * from persons",
-            "error:explain-database_error-<class 'psycopg.errors.InsufficientPrivilege'>",
+            "error:explain-database_error-<class 'psycopg2.errors.InsufficientPrivilege'>",
             "datadog.explain_statement_noaccess",
             failed_explain_test_repeat_count,
             None,
         ),
         (
             "update persons set firstname='firstname' where personid in (2, 1); select pg_sleep(1);",
-            "error:explain-database_error-<class 'psycopg.errors.SyntaxError'>",
+            "error:explain-database_error-<class 'psycopg2.errors.SyntaxError'>",
             None,
             1,
             None,
@@ -716,7 +705,7 @@ def test_failed_explain_handling(
             "SELECT * FROM kennel WHERE id = %s",
             123,
             "error:explain-no_plans_possible",
-            [{'code': 'invalid_schema', 'message': "<class 'psycopg.errors.InvalidSchemaName'>"}],
+            [{'code': 'invalid_schema', 'message': "<class 'psycopg2.errors.InvalidSchemaName'>"}],
             StatementTruncationState.not_truncated.value,
             [],
         ),
@@ -726,8 +715,8 @@ def test_failed_explain_handling(
             "dogs_nofunc",
             "SELECT * FROM kennel WHERE id = %s",
             123,
-            "error:explain-failed_function-<class 'psycopg.errors.UndefinedFunction'>",
-            [{'code': 'failed_function', 'message': "<class 'psycopg.errors.UndefinedFunction'>"}],
+            "error:explain-failed_function-<class 'psycopg2.errors.UndefinedFunction'>",
+            [{'code': 'failed_function', 'message': "<class 'psycopg2.errors.UndefinedFunction'>"}],
             StatementTruncationState.not_truncated.value,
             [
                 "Unable to collect execution plans in dbname=dogs_nofunc. Check that the function "
@@ -736,7 +725,7 @@ def test_failed_explain_handling(
                 " for more details: function datadog.explain_statement(unknown) does not exist\nLINE 1: "
                 "... DDIGNORE */ /* service='datadog-agent' */ SELECT datadog.ex...\n"
                 "                                                             ^\nHINT:  No function matches the given "
-                "name and argument types. You might need to add explicit type casts.\ncode=undefined-explain-function"
+                "name and argument types. 
You might need to add explicit type casts.\n\ncode=undefined-explain-function" " dbname=dogs_nofunc host=stubbed.hostname", ], ), @@ -795,10 +784,8 @@ def test_statement_samples_collect( check = integration_check(dbm_instance) check._connect() - conn = psycopg.connect( - host=HOST, dbname=dbname, user=user, password=password, autocommit=True, cursor_factory=ClientCursor - ) - # we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg uses + conn = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) + # we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg2 uses # the simple query protocol, sending the whole query as a plain string to postgres. # if a client is using the extended query protocol with prepare then the query would appear as # leave connection open until after the check has run to ensure we're able to see the query in @@ -812,9 +799,11 @@ def test_statement_samples_collect( expected_query = query % ('\'' + arg + '\'' if isinstance(arg, str) else arg) - # Find matching events by checking if the expected query is continaed the event statement. Using this - # instead of a direct equality check covers cases of truncated statements and leading comments - matching = [e for e in dbm_samples if e['db']['statement'].encode("utf-8") in expected_query.encode("utf-8")] + # Find matching events by checking if the expected query starts with the event statement. Using this + # instead of a direct equality check covers cases of truncated statements + matching = [ + e for e in dbm_samples if expected_query.encode("utf-8").startswith(e['db']['statement'].encode("utf-8")) + ] if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity": # pg_monitor role exists only in version 10+ @@ -831,7 +820,7 @@ def test_statement_samples_collect( hostname='stubbed.hostname', ) else: - assert len(matching) == 1, "missing captured event for query: {query}".format(query=query) + assert len(matching) == 1, "missing captured event" event = matching[0] assert event['db']['query_truncated'] == expected_statement_truncated assert set(event['ddtags'].split(',')) == set(tags) @@ -904,7 +893,7 @@ def obfuscate_sql(query, options=None): check = integration_check(dbm_instance) check._connect() - conn = psycopg.connect(host=HOST, dbname="datadog_test", user="bob", password="bob") + conn = psycopg2.connect(host=HOST, dbname="datadog_test", user="bob", password="bob") cursor = conn.cursor() # Execute the query with the mocked obfuscate_sql. The result should produce an event payload with the metadata. 
with mock.patch.object(datadog_agent, 'obfuscate_sql', passthrough=True) as mock_agent: @@ -993,27 +982,29 @@ def test_statement_reported_hostname( @pytest.mark.parametrize("pg_stat_activity_view", ["pg_stat_activity", "datadog.pg_stat_activity()"]) @pytest.mark.parametrize( - "user,password,dbname,query,blocking_query,expected_out,expected_keys,expected_conn_out", + "user,password,dbname,query,blocking_query,arg,expected_out,expected_keys,expected_conn_out", [ ( "bob", "bob", "datadog_test", - "BEGIN TRANSACTION; SELECT city FROM persons WHERE city = 'hello';", + "BEGIN TRANSACTION; SET application_name='test_snapshot'; SELECT city FROM persons WHERE city = %s;", "LOCK TABLE persons IN ACCESS EXCLUSIVE MODE", + "hello", { 'datname': 'datadog_test', 'usename': 'bob', 'state': 'active', - 'query_signature': '9382c42e92099c04', - 'statement': "BEGIN TRANSACTION; SELECT city FROM persons WHERE city = 'hello';", + 'query_signature': '4bd870d5ce614fd', + 'statement': "BEGIN TRANSACTION; SET application_name='test_snapshot'; " + "SELECT city FROM persons WHERE city = 'hello';", 'query_truncated': StatementTruncationState.not_truncated.value, }, ["now", "xact_start", "query_start", "pid", "client_port", "client_addr", "backend_type", "blocking_pids"], { 'usename': 'bob', 'state': 'active', - 'application_name': '', + 'application_name': 'test_snapshot', 'datname': 'datadog_test', 'connections': 1, }, @@ -1022,50 +1013,49 @@ def test_statement_reported_hostname( "bob", "bob", "datadog_test", - "BEGIN TRANSACTION; SELECT city as city0, city as city1, city as city2, city as city3, " - "city as city4, city as city5, city as city6, city as city7, city as city8, city as city9, " - "city as city10, city as city11, city as city12, city as city13, city as city14, city as city15, " - "city as city16, city as city17, city as city18, city as city19, city as city20, city as city21, " - "city as city22, city as city23, city as city24, city as city25, city as city26, city as city27, " - "city as city28, city as city29, city as city30, city as city31, city as city32, city as city33, " - "city as city34, city as city35, city as city36, city as city37, city as city38, city as city39, " - "city as city40, city as city41, city as city42, city as city43, city as city44, city as city45, " - "city as city46, city as city47, city as city48, city as city49, city as city50, city as city51, " - "city as city52, city as city53, city as city54, city as city55, city as city56, city as city57, " - "city as city58, city as city59, city as city60, city as city61 " - "FROM persons WHERE city = 'hello';", + "BEGIN TRANSACTION; SET application_name='test_snapshot'; SELECT city as city0, city as city1, " + "city as city2, city as city3, city as city4, city as city5, city as city6, city as city7, " + "city as city8, city as city9, city as city10, city as city11, city as city12, city as city13, " + "city as city14, city as city15, city as city16, city as city17, city as city18, city as city19, " + "city as city20, city as city21, city as city22, city as city23, city as city24, city as city25, " + "city as city26, city as city27, city as city28, city as city29, city as city30, city as city31, " + "city as city32, city as city33, city as city34, city as city35, city as city36, city as city37, " + "city as city38, city as city39, city as city40, city as city41, city as city42, city as city43, " + "city as city44, city as city45, city as city46, city as city47, city as city48, city as city49, " + "city as city50, city as city51, city as 
city52, city as city53, city as city54, city as city55, " + "city as city56, city as city57, city as city58, city as city59, city as city60, city as city61 " + "FROM persons WHERE city = %s;", "LOCK TABLE persons IN ACCESS EXCLUSIVE MODE", + "hello", { 'datname': 'datadog_test', 'usename': 'bob', 'state': 'active', - 'query_signature': 'e1429b86c013a78e', - 'statement': "BEGIN TRANSACTION; SELECT city as city0, city as city1, city as city2, city as city3, " - "city as city4, city as city5, city as city6, city as city7, city as city8, city as city9, " - "city as city10, city as city11, city as city12, city as city13, city as city14, city as city15, " - "city as city16, city as city17, city as city18, city as city19, city as city20, city as city21, " - "city as city22, city as city23, city as city24, city as city25, city as city26, city as city27, " - "city as city28, city as city29, city as city30, city as city31, city as city32, city as city33, " - "city as city34, city as city35, city as city36, city as city37, city as city38, city as city39, " - "city as city40, city as city41, city as city42, city as city43, city as city44, city as city45, " - "city as city46, city as city47, city as city48, city as city49, city as city50, city as city51, " - "city as city52, city as city53, city as city54, city as city55, city as city56, city as city57, " - "city as city58, city as city59, city as city60, city as city61 " - "FROM persons WHE", + 'query_signature': 'f79596b3cba3247a', + 'statement': "BEGIN TRANSACTION; SET application_name='test_snapshot'; SELECT city as city0, " + "city as city1, city as city2, city as city3, city as city4, city as city5, city as city6, " + "city as city7, city as city8, city as city9, city as city10, city as city11, city as city12, " + "city as city13, city as city14, city as city15, city as city16, city as city17, city as city18, " + "city as city19, city as city20, city as city21, city as city22, city as city23, city as city24, " + "city as city25, city as city26, city as city27, city as city28, city as city29, city as city30, " + "city as city31, city as city32, city as city33, city as city34, city as city35, city as city36, " + "city as city37, city as city38, city as city39, city as city40, city as city41, city as city42, " + "city as city43, city as city44, city as city45, city as city46, city as city47, city as city48, " + "city as city49, city as city50, city as city51, city as city52, city as city53, city as city54, " + "city as city55, city as city56, city as city57, city as city58, city as city59, city as", 'query_truncated': StatementTruncationState.truncated.value, }, ["now", "xact_start", "query_start", "pid", "client_port", "client_addr", "backend_type", "blocking_pids"], { 'usename': 'bob', 'state': 'active', - 'application_name': '', + 'application_name': 'test_snapshot', 'datname': 'datadog_test', 'connections': 1, }, ), ], ) -@pytest.mark.unit def test_activity_snapshot_collection( aggregator, integration_check, @@ -1077,11 +1067,11 @@ def test_activity_snapshot_collection( dbname, query, blocking_query, + arg, expected_out, expected_keys, expected_conn_out, ): - if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity": # cannot catch any queries from other users # only can see own queries @@ -1093,30 +1083,41 @@ def test_activity_snapshot_collection( check = integration_check(dbm_instance) check._connect() - blocking_conn = psycopg.connect(host=HOST, dbname=dbname, user="blocking_bob", password=password, 
autocommit=False) - conn = psycopg.connect(host=HOST, dbname=dbname, user=user, password=password, autocommit=False) - wg = WaitGroup() + conn = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password, async_=1) + blocking_conn = psycopg2.connect(host=HOST, dbname=dbname, user="blocking_bob", password=password) - def execute_in_thread(q): - with conn.cursor() as cursor: - cursor.execute(q) - wg.done() + def wait(conn): + while True: + state = conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) - # we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg uses + # we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg2 uses # the simple query protocol, sending the whole query as a plain string to postgres. # if a client is using the extended query protocol with prepare then the query would appear as # leave connection open until after the check has run to ensure we're able to see the query in # pg_stat_activity try: # first lock the table, which will cause the test query to be blocked + blocking_conn.autocommit = False blocking_conn.cursor().execute(blocking_query) - # ... now execute the test query in a separate thread - t = threading.Thread(target=execute_in_thread, args=(query,)) - wg.add(1) - t.start() - check.check(dbm_instance) + # ... now execute the test query + wait(conn) + conn.cursor().execute(query, (arg,)) + run_one_check(check) dbm_activity_event = aggregator.get_event_platform_events("dbm-activity") + if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity": + # cannot catch any queries from other users + # only can see own queries + return + event = dbm_activity_event[0] assert event['host'] == "stubbed.hostname" assert event['ddsource'] == "postgres" @@ -1124,7 +1125,14 @@ def execute_in_thread(q): assert event['ddagentversion'] == datadog_agent.get_version() assert len(event['postgres_activity']) > 0 # find bob's query and blocking_bob's query - bobs_query = next((q for q in event['postgres_activity'] if q.get('usename', None) == "bob"), None) + bobs_query = next( + ( + q + for q in event['postgres_activity'] + if q.get('usename', None) == "bob" and q.get('application_name', None) == 'test_snapshot' + ), + None, + ) blocking_bobs_query = next( (q for q in event['postgres_activity'] if q.get('usename', None) == "blocking_bob"), None ) @@ -1169,43 +1177,53 @@ def execute_in_thread(q): # find bob's connections. bobs_conns = None for query_json in event['postgres_connections']: - if 'usename' in query_json and query_json['usename'] == "bob": + if ( + 'usename' in query_json + and query_json['usename'] == "bob" + and query_json['application_name'] == 'test_snapshot' + ): bobs_conns = query_json + break assert bobs_conns is not None for key in expected_conn_out: assert expected_conn_out[key] == bobs_conns[key] - assert sorted(event['ddtags']) == sorted(expected_tags) + assert set(event['ddtags']) == set(expected_tags) + + if POSTGRES_VERSION == '9.5': + # rest of test is to confirm blocking behavior + # which we cannot collect in pg v9.5 at this time + return # ... now run the check again after closing blocking_bob's conn. 
# this means we should report bob as no longer blocked
         # close blocking_bob's tx
         blocking_conn.close()
-
-        # wait for query to complete, but commit has not been called,
-        # so it should remain open and idle
-        wg.wait()
-
         # Wait collection interval to make sure dbm events are reported
         time.sleep(dbm_instance['query_activity']['collection_interval'])
-        check.check(dbm_instance)
+        run_one_check(check)
         dbm_activity_event = aggregator.get_event_platform_events("dbm-activity")
         event = dbm_activity_event[1]
         assert len(event['postgres_activity']) > 0
-        # find bob's query
         bobs_query = None
         for query_json in event['postgres_activity']:
-            if 'usename' in query_json and query_json['usename'] == "bob":
+            if (
+                'usename' in query_json
+                and query_json['usename'] == "bob"
+                and query_json['application_name'] == 'test_snapshot'
+            ):
                 bobs_query = query_json
+                break
         assert bobs_query is not None
         assert len(bobs_query['blocking_pids']) == 0
         # state should be idle now that it's no longer blocked
         assert bobs_query['state'] == "idle in transaction"
+
     finally:
-        blocking_conn.close()
         conn.close()
+        blocking_conn.close()
 
 
 @pytest.mark.parametrize(
@@ -1336,15 +1354,15 @@ def test_truncate_activity_rows(integration_check, dbm_instance, active_rows, ex
     [
         (
             "select * from fake_table",
-            "error:explain-undefined_table-<class 'psycopg.errors.UndefinedTable'>",
+            "error:explain-undefined_table-<class 'psycopg2.errors.UndefinedTable'>",
             DBExplainError.undefined_table,
-            "<class 'psycopg.errors.UndefinedTable'>",
+            "<class 'psycopg2.errors.UndefinedTable'>",
        ),
         (
             "select * from pg_settings where name = $1",
-            "error:explain-parameterized_query-<class 'psycopg.errors.UndefinedParameter'>",
+            "error:explain-parameterized_query-<class 'psycopg2.errors.UndefinedParameter'>",
             DBExplainError.parameterized_query,
-            "<class 'psycopg.errors.UndefinedParameter'>",
+            "<class 'psycopg2.errors.UndefinedParameter'>",
         ),
         (
             "SELECT city as city0, city as city1, city as city2, city as city3, "
@@ -1452,9 +1470,7 @@ def test_statement_samples_dbstrict(aggregator, integration_check, dbm_instance,
 
     connections = []
     for user, password, dbname, query, arg in SAMPLE_QUERIES:
-        conn = psycopg.connect(
-            host=HOST, dbname=dbname, user=user, password=password, cursor_factory=ClientCursor, autocommit=True
-        )
+        conn = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password)
         conn.cursor().execute(query, (arg,))
         connections.append(conn)
 
@@ -1645,7 +1661,7 @@ def _sample_key(e):
 @pytest.mark.parametrize("query_activity_enabled", [True, False])
 @pytest.mark.parametrize(
     "user,password,dbname,query,arg",
-    [("bob", "bob", "datadog_test", "SELECT city FROM persons WHERE city = %s;", "hello")],
+    [("bob", "bob", "datadog_test", "BEGIN TRANSACTION; SELECT city FROM persons WHERE city = %s;", "hello")],
 )
 def test_disabled_activity_or_explain_plans(
     aggregator,
@@ -1673,9 +1689,10 @@ def test_disabled_activity_or_explain_plans(
 
     check = integration_check(dbm_instance)
     check._connect()
-    conn = psycopg.connect(host=HOST, dbname=dbname, user=user, password=password, cursor_factory=ClientCursor)
+    conn = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password)
 
     try:
+        conn.autocommit = True
         conn.cursor().execute(query, (arg,))
         run_one_check(check)
         dbm_activity = aggregator.get_event_platform_events("dbm-activity")
@@ -1769,7 +1786,7 @@ def test_statement_samples_config_invalid_number(integration_check, pg_instance,
     integration_check(pg_instance)
 
 
-class 
UndefinedTable(psycopg.errors.UndefinedTable): +class UndefinedTable(psycopg2.errors.UndefinedTable): """ A fake UndefinedTable that allows setting pg_error on construction since UndefinedTable has it as read-only and not settable at construction-time @@ -1843,7 +1860,7 @@ def __str__(self): ], ), ( - psycopg.errors.DatabaseError('connection reset'), + psycopg2.errors.DatabaseError('connection reset'), [], 'error:database-DatabaseError', [ @@ -1979,7 +1996,7 @@ def test_plan_time_metrics(aggregator, integration_check, dbm_instance): def _run_queries(): for user, password, dbname, query, arg in SAMPLE_QUERIES: if dbname not in connections: - connections[dbname] = psycopg.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) connections[dbname].cursor().execute(query, (arg,)) check = integration_check(dbm_instance) diff --git a/postgres/tests/test_unit.py b/postgres/tests/test_unit.py index 95e2e2fe227c8..79ea811764ca7 100644 --- a/postgres/tests/test_unit.py +++ b/postgres/tests/test_unit.py @@ -4,7 +4,7 @@ import copy import mock -import psycopg +import psycopg2 import pytest from pytest import fail from semver import VersionInfo @@ -125,9 +125,9 @@ def test_query_timeout_connection_string(aggregator, integration_check, pg_insta check = integration_check(pg_instance) try: check.db_pool.get_connection(pg_instance['dbname'], 100) - except psycopg.ProgrammingError as e: + except psycopg2.ProgrammingError as e: fail(str(e)) - except psycopg.OperationalError: + except psycopg2.OperationalError: # could not connect to server because there is no server running pass diff --git a/postgres/tests/utils.py b/postgres/tests/utils.py index 442b11f2d904b..685f2d90021bb 100644 --- a/postgres/tests/utils.py +++ b/postgres/tests/utils.py @@ -4,7 +4,7 @@ import threading import time -import psycopg +import psycopg2 import pytest from .common import PASSWORD_ADMIN, POSTGRES_VERSION, USER_ADMIN @@ -39,24 +39,22 @@ ) -def _get_conn(db_instance, dbname=None, user=None, password=None, application_name='test', autocommit=True): - conn = psycopg.connect( +def _get_conn(db_instance, dbname=None, user=None, password=None, application_name='test'): + conn = psycopg2.connect( host=db_instance['host'], port=db_instance['port'], dbname=dbname or db_instance['dbname'], user=user or db_instance['username'], password=password or db_instance['password'], application_name=application_name, - autocommit=autocommit, ) + conn.autocommit = True return conn # Get a connection with superuser -def _get_superconn(db_instance, application_name='test', autocommit=True): - return _get_conn( - db_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name=application_name, autocommit=autocommit - ) +def _get_superconn(db_instance, application_name='test'): + return _get_conn(db_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name=application_name) def lock_table(pg_instance, table, lock_mode): @@ -108,7 +106,7 @@ def run_query(): cur.execute(stmt) try: cur.execute(query) - except psycopg.errors.QueryCanceled: + except psycopg2.errors.QueryCanceled: pass conn.close() @@ -137,28 +135,3 @@ def run_one_check(check, cancel=True): check.statement_metrics._job_loop_future.result() if check.metadata_samples._job_loop_future is not None: check.metadata_samples._job_loop_future.result() - - -# WaitGroup is used like go's sync.WaitGroup -class WaitGroup(object): - def __init__(self): - self.count = 0 - self.cv = 
threading.Condition() - - def add(self, n): - self.cv.acquire() - self.count += n - self.cv.release() - - def done(self): - self.cv.acquire() - self.count -= 1 - if self.count == 0: - self.cv.notify_all() - self.cv.release() - - def wait(self): - self.cv.acquire() - while self.count > 0: - self.cv.wait() - self.cv.release()
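
Note on the asynchronous pattern restored above: this revert reintroduces the same poll()/select() wait loop in both test_deadlock.py and test_statements.py, because connections opened with psycopg2's async_=1 flag never block on their own. This is psycopg2's documented wait pattern for async connections; a minimal self-contained sketch follows (host/dbname here are placeholders, not the test suite's fixtures):

    import select

    import psycopg2
    import psycopg2.extensions

    def wait(conn):
        # Drive libpq's state machine until the pending operation completes:
        # poll() reports which direction the socket is waiting on, and
        # select() blocks until the socket is ready in that direction.
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)

    # Usage: wait once for the connection, then once per execute().
    conn = psycopg2.connect(host="localhost", dbname="postgres", async_=1)
    wait(conn)
    cur = conn.cursor()
    cur.execute("SELECT 1")
    wait(conn)
    print(cur.fetchone())

Unlike the threaded helpers removed by this revert, an async connection lets the tests issue a statement that will block (or deadlock) from the main thread and then observe it in pg_stat_activity while poll() is still pending.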
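The cursor_factory changes above (CommenterCursor, CommenterDictCursor, psycopg2.extras.DictCursor) all rely on psycopg2's cursor-subclassing hook. A hedged sketch of the idea behind CommenterCursor, as a simplified hypothetical stand-in rather than the integration's actual class:

    import psycopg2
    import psycopg2.extensions

    class CommentPrependingCursor(psycopg2.extensions.cursor):
        # Prepend a marker comment so the query can be attributed to the
        # agent when it appears in pg_stat_activity / pg_stat_statements.
        def execute(self, query, vars=None):
            return super().execute("/* service='datadog-agent' */ " + query, vars)

    # conn = psycopg2.connect(...)
    # cur = conn.cursor(cursor_factory=CommentPrependingCursor)
    # cur.execute("SELECT 1")  # runs as: /* service='datadog-agent' */ SELECT 1

Passing the class per-cursor via cursor_factory, as the tests above do, keeps the default cursor behavior for every other query on the connection.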