diff --git a/snuba/cli/cleanup.py b/snuba/cli/cleanup.py index e09d729fdb9..10ce232eb62 100644 --- a/snuba/cli/cleanup.py +++ b/snuba/cli/cleanup.py @@ -18,6 +18,24 @@ type=int, help="Clickhouse native port to write to.", ) +@click.option( + "--clickhouse-secure", + type=bool, + default=False, + help="If true, an encrypted connection will be used", +) +@click.option( + "--clickhouse-ca-certs", + type=str, + default=None, + help="An optional path to certificates directory.", +) +@click.option( + "--clickhouse-verify", + type=bool, + default=False, + help="Verify ClickHouse SSL cert.", +) @click.option( "--dry-run", type=bool, @@ -36,6 +54,9 @@ def cleanup( *, clickhouse_host: Optional[str], clickhouse_port: Optional[int], + clickhouse_secure: bool, + clickhouse_ca_certs: Optional[str], + clickhouse_verify: Optional[bool], dry_run: bool, storage_name: str, log_level: Optional[str] = None, @@ -67,6 +88,9 @@ def cleanup( clickhouse_user, clickhouse_password, database, + clickhouse_secure, + clickhouse_ca_certs, + clickhouse_verify, ) elif not cluster.is_single_node(): raise click.ClickException("Provide ClickHouse host and port for cleanup") diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 768b9e8cd0b..d27cc216502 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -318,12 +318,33 @@ def reverse_in_progress( required=True, default=os.environ.get("CLICKHOUSE_DATABASE", "default"), ) +@click.option( + "--secure", + type=bool, + default=False, + help="If true, an encrypted connection will be used", +) +@click.option( + "--ca-certs", + type=str, + default=None, + help="An optional path to certificates directory.", +) +@click.option( + "--verify", + type=bool, + default=False, + help="Verify ClickHouse SSL cert.", +) def add_node( node_type: str, storage_set_names: Sequence[str], host_name: str, port: int, database: str, + secure: bool, + ca_certs: Optional[str], + verify: Optional[bool], ) -> None: """ Runs all migrations on a brand new ClickHouse node. This should be performed @@ -364,6 +385,9 @@ def add_node( user=user, password=password, database=database, + secure=secure, + ca_certs=ca_certs, + verify=verify, ) diff --git a/snuba/cli/optimize.py b/snuba/cli/optimize.py index 055a0371518..166f54a0274 100644 --- a/snuba/cli/optimize.py +++ b/snuba/cli/optimize.py @@ -25,6 +25,24 @@ type=int, help="Clickhouse native port to write to.", ) +@click.option( + "--clickhouse-secure", + type=bool, + default=False, + help="If true, an encrypted connection will be used", +) +@click.option( + "--clickhouse-ca-certs", + type=str, + default=None, + help="An optional path to certificates directory.", +) +@click.option( + "--clickhouse-verify", + type=bool, + default=False, + help="Verify ClickHouse SSL cert.", +) @click.option( "--storage", "storage_name", @@ -44,6 +62,9 @@ def optimize( *, clickhouse_host: Optional[str], clickhouse_port: Optional[int], + clickhouse_secure: bool, + clickhouse_ca_certs: Optional[str], + clickhouse_verify: Optional[bool], storage_name: str, default_parallel_threads: int, log_level: Optional[str] = None, @@ -79,6 +100,9 @@ def optimize( clickhouse_user, clickhouse_password, database, + clickhouse_secure, + clickhouse_ca_certs, + clickhouse_verify, send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout, ) elif not storage.get_cluster().is_single_node(): diff --git a/snuba/clickhouse/http.py b/snuba/clickhouse/http.py index 9a1d35f89b0..7bec44f20b9 100644 --- a/snuba/clickhouse/http.py +++ b/snuba/clickhouse/http.py @@ -20,7 +20,7 @@ import rapidjson import sentry_sdk -from urllib3.connectionpool import HTTPConnectionPool +from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool from urllib3.exceptions import HTTPError from snuba import settings, state @@ -161,7 +161,7 @@ class HTTPWriteBatch: def __init__( self, executor: ThreadPoolExecutor, - pool: HTTPConnectionPool, + pool: HTTPConnectionPool | HTTPSConnectionPool, metrics: MetricsBackend, user: str, password: str, @@ -293,6 +293,9 @@ def __init__( port: int, user: str, password: str, + secure: bool, + ca_certs: Optional[str], + verify: Optional[bool], metrics: MetricsBackend, statement: InsertStatement, encoding: Optional[str], @@ -302,9 +305,13 @@ def __init__( max_connections: int = 1, block_connections: bool = False, ): - self.__pool = HTTPConnectionPool( - host, port, maxsize=max_connections, block=block_connections - ) + self.__pool: HTTPSConnectionPool | HTTPConnectionPool + if secure: + self.__pool = HTTPSConnectionPool( + host, port, ca_certs=ca_certs, verify=verify + ) + else: + self.__pool = HTTPConnectionPool(host, port) self.__executor = ThreadPoolExecutor() self.__metrics = metrics diff --git a/snuba/clickhouse/native.py b/snuba/clickhouse/native.py index 1f14639b8b4..6d65d124e84 100644 --- a/snuba/clickhouse/native.py +++ b/snuba/clickhouse/native.py @@ -81,6 +81,9 @@ def __init__( user: str, password: str, database: str, + secure: bool = False, + ca_certs: Optional[str] = None, + verify: Optional[bool] = False, connect_timeout: int = 1, send_receive_timeout: Optional[int] = 300, max_pool_size: int = settings.CLICKHOUSE_MAX_POOL_SIZE, @@ -91,6 +94,9 @@ def __init__( self.user = user self.password = password self.database = database + self.secure = secure + self.ca_certs = ca_certs + self.verify = verify self.connect_timeout = connect_timeout self.send_receive_timeout = send_receive_timeout self.client_settings = client_settings @@ -380,6 +386,9 @@ def _create_conn(self, use_fallback_host: bool = False) -> Client: user=self.user, password=self.password, database=self.database, + secure=self.secure, + ca_certs=self.ca_certs, + verify=self.verify, connect_timeout=self.connect_timeout, send_receive_timeout=self.send_receive_timeout, settings=self.client_settings, diff --git a/snuba/clusters/cluster.py b/snuba/clusters/cluster.py index 86adbab456a..57cba8f4820 100644 --- a/snuba/clusters/cluster.py +++ b/snuba/clusters/cluster.py @@ -162,7 +162,16 @@ def get_batch_writer( ClickhouseWriterOptions = Optional[Mapping[str, Any]] -CacheKey = Tuple[ClickhouseNode, ClickhouseClientSettings, str, str, str] +CacheKey = Tuple[ + ClickhouseNode, + ClickhouseClientSettings, + str, + str, + str, + bool, + Optional[str], + Optional[bool], +] class ConnectionCache: @@ -177,10 +186,22 @@ def get_node_connection( user: str, password: str, database: str, + secure: bool, + ca_certs: Optional[str], + verify: Optional[bool], ) -> ClickhousePool: with self.__lock: settings, timeout = client_settings.value - cache_key = (node, client_settings, user, password, database) + cache_key = ( + node, + client_settings, + user, + password, + database, + secure, + ca_certs, + verify, + ) if cache_key not in self.__cache: self.__cache[cache_key] = ClickhousePool( node.host_name, @@ -190,6 +211,9 @@ def get_node_connection( database, client_settings=settings, send_receive_timeout=timeout, + secure=secure, + ca_certs=ca_certs, + verify=verify, ) return self.__cache[cache_key] @@ -226,6 +250,9 @@ def __init__( password: str, database: str, http_port: int, + secure: bool, + ca_certs: Optional[str], + verify: Optional[bool], storage_sets: Set[str], single_node: bool, # The cluster name and distributed cluster name only apply if single_node is set to False @@ -246,6 +273,9 @@ def __init__( self.__password = password self.__database = database self.__http_port = http_port + self.__secure = secure + self.__ca_certs = ca_certs + self.__verify = verify self.__single_node = single_node self.__cluster_name = cluster_name self.__distributed_cluster_name = distributed_cluster_name @@ -290,6 +320,9 @@ def get_node_connection( self.__user, self.__password, self.__database, + self.__secure, + self.__ca_certs, + self.__verify, ) def get_deleter(self) -> Reader: @@ -331,6 +364,9 @@ def get_batch_writer( block_connections=self.__block_connections, user=self.__user, password=self.__password, + secure=self.__secure, + ca_certs=self.__ca_certs, + verify=self.__verify, metrics=metrics, statement=insert_statement.with_database(self.__database), encoding=encoding, @@ -413,6 +449,9 @@ def get_http_port(self) -> int: password=cluster.get("password", ""), database=cluster.get("database", "default"), http_port=cluster["http_port"], + secure=cluster.get("secure", False), + ca_certs=cluster.get("ca_certs", None), + verify=cluster.get("verify", False), storage_sets=cluster["storage_sets"], single_node=cluster["single_node"], cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None, @@ -459,6 +498,9 @@ def _build_sliced_cluster(cluster: Mapping[str, Any]) -> ClickhouseCluster: password=cluster.get("password", ""), database=cluster.get("database", "default"), http_port=cluster["http_port"], + secure=cluster.get("secure", False), + ca_certs=cluster.get("ca_certs", None), + verify=cluster.get("verify", False), storage_sets={ storage_tuple[0] for storage_tuple in cluster["storage_set_slices"] }, diff --git a/snuba/migrations/runner.py b/snuba/migrations/runner.py index 4a7ad0771eb..69e01d01e5b 100644 --- a/snuba/migrations/runner.py +++ b/snuba/migrations/runner.py @@ -572,6 +572,9 @@ def add_node( user: str, password: str, database: str, + secure: bool = False, + ca_certs: Optional[str] = None, + verify: Optional[bool] = False, ) -> None: client_settings = ClickhouseClientSettings.MIGRATE.value clickhouse = ClickhousePool( @@ -580,6 +583,9 @@ def add_node( user, password, database, + secure, + ca_certs, + verify, client_settings=client_settings.settings, send_receive_timeout=client_settings.timeout, ) diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 61db6c84fa6..c952b57c15c 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -93,6 +93,9 @@ "password": os.environ.get("CLICKHOUSE_PASSWORD", ""), "database": os.environ.get("CLICKHOUSE_DATABASE", "default"), "http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)), + "secure": os.environ.get("CLICKHOUSE_SECURE", "False").lower() in ("true", "1"), + "ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"), + "verify": os.environ.get("CLICKHOUSE_VERIFY"), "storage_sets": { "cdc", "discover", diff --git a/snuba/settings/settings_distributed.py b/snuba/settings/settings_distributed.py index ee01be1a8fe..b0f5cc9bf44 100644 --- a/snuba/settings/settings_distributed.py +++ b/snuba/settings/settings_distributed.py @@ -12,6 +12,9 @@ "password": os.environ.get("CLICKHOUSE_PASSWORD", ""), "database": os.environ.get("CLICKHOUSE_DATABASE", "default"), "http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)), + "secure": os.environ.get("CLICKHOUSE_SECURE", "False").lower() in ("true", "1"), + "ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"), + "verify": os.environ.get("CLICKHOUSE_VERIFY"), "storage_sets": { "cdc", "discover", diff --git a/test_distributed_migrations/conftest.py b/test_distributed_migrations/conftest.py index c8e4daca201..a69e5fad60b 100644 --- a/test_distributed_migrations/conftest.py +++ b/test_distributed_migrations/conftest.py @@ -58,6 +58,9 @@ def pytest_configure() -> None: distributed_cluster_name=cluster_node["distributed_cluster_name"] if "distributed_cluster_name" in cluster_node else None, + secure=cluster_node.get("secure", False), + ca_certs=cluster_node.get("ca_certs", None), + verify=cluster_node.get("verify", False), ) database_name = cluster_node["database"] diff --git a/tests/clusters/fake_cluster.py b/tests/clusters/fake_cluster.py index 2d175fc114c..e42adc6c742 100644 --- a/tests/clusters/fake_cluster.py +++ b/tests/clusters/fake_cluster.py @@ -69,6 +69,9 @@ def __init__( password: str, database: str, http_port: int, + secure: bool, + ca_certs: Optional[str], + verify: Optional[bool], storage_sets: Set[str], single_node: bool, # The cluster name and distributed cluster name only apply if single_node is set to False @@ -83,6 +86,9 @@ def __init__( password=password, database=database, http_port=http_port, + secure=secure, + ca_certs=ca_certs, + verify=verify, storage_sets=storage_sets, single_node=single_node, cluster_name=cluster_name, diff --git a/tests/clusters/test_cluster.py b/tests/clusters/test_cluster.py index 967a5d02d93..b5d7a227815 100644 --- a/tests/clusters/test_cluster.py +++ b/tests/clusters/test_cluster.py @@ -60,6 +60,9 @@ "password": "", "database": "default", "http_port": 8123, + "secure": False, + "ca_certs": None, + "verify": False, "storage_sets": ALL_STORAGE_SETS, "single_node": True, }, @@ -70,6 +73,9 @@ "password": "", "database": "default", "http_port": 8123, + "secure": False, + "ca_certs": None, + "verify": False, "storage_sets": {"transactions"}, "single_node": False, "cluster_name": "clickhouse_hosts", @@ -87,6 +93,7 @@ "password": "", "database": "default", "http_port": 8123, + "secure": False, "storage_set_slices": {("generic_metrics_distributions", 0)}, "single_node": True, }, @@ -97,6 +104,7 @@ "password": "", "database": "slice_1_default", "http_port": 8124, + "secure": False, "storage_set_slices": {("generic_metrics_distributions", 1)}, "single_node": True, }, @@ -195,16 +203,46 @@ def test_get_local_nodes() -> None: @pytest.mark.clickhouse_db def test_cache_connections() -> None: cluster_1 = cluster.ClickhouseCluster( - "127.0.0.1", 8000, "default", "", "default", 8001, {"events"}, True + "127.0.0.1", + 8000, + "default", + "", + "default", + 8001, + False, + None, + False, + {"events"}, + True, ) cluster_2 = cluster.ClickhouseCluster( - "127.0.0.1", 8000, "default", "", "default", 8001, {"transactions"}, True + "127.0.0.1", + 8000, + "default", + "", + "default", + 8001, + False, + None, + False, + {"transactions"}, + True, ) # Same node but different user cluster_3 = cluster.ClickhouseCluster( - "127.0.0.1", 8000, "readonly", "", "default", 8001, {"metrics"}, True + "127.0.0.1", + 8000, + "readonly", + "", + "default", + 8001, + False, + None, + False, + {"metrics"}, + True, ) assert cluster_1.get_query_connection( diff --git a/tests/conftest.py b/tests/conftest.py index a913e7376a7..45749648f43 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,6 +45,9 @@ def create_databases() -> None: password="", database="default", http_port=cluster["http_port"], + secure=cluster["secure"], + ca_certs=cluster["ca_certs"], + verify=cluster["verify"], storage_sets=cluster["storage_sets"], single_node=cluster["single_node"], cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None, diff --git a/tests/migrations/test_connect.py b/tests/migrations/test_connect.py index 20d039ffcfe..c9fb045f0ba 100644 --- a/tests/migrations/test_connect.py +++ b/tests/migrations/test_connect.py @@ -24,6 +24,9 @@ password="", database="default", http_port=420, + secure=False, + ca_certs=None, + verify=False, storage_sets={ "querylog", }, @@ -37,6 +40,9 @@ password="", database="default", http_port=420, + secure=False, + ca_certs=None, + verify=False, storage_sets={ "events", }, @@ -50,6 +56,9 @@ password="", database="default", http_port=420, + secure=False, + ca_certs=None, + verify=False, storage_sets=_REMAINING_STORAGE_SET_KEYS, single_node=True, ) diff --git a/tests/migrations/test_table_engines.py b/tests/migrations/test_table_engines.py index acb4f6e25fa..c71b567feb0 100644 --- a/tests/migrations/test_table_engines.py +++ b/tests/migrations/test_table_engines.py @@ -12,6 +12,9 @@ password="", database="default", http_port=8123, + secure=False, + ca_certs=None, + verify=False, storage_sets={"events"}, single_node=True, ) @@ -23,6 +26,9 @@ password="", database="default", http_port=8123, + secure=False, + ca_certs=None, + verify=False, storage_sets={"events"}, single_node=False, cluster_name="cluster_1", diff --git a/tests/replacer/test_cluster_replacements.py b/tests/replacer/test_cluster_replacements.py index 003d15bbeb1..8f36ae51ec5 100644 --- a/tests/replacer/test_cluster_replacements.py +++ b/tests/replacer/test_cluster_replacements.py @@ -57,6 +57,9 @@ def _build_cluster(healthy: bool = True) -> FakeClickhouseCluster: password="", database="default", http_port=8123, + secure=False, + ca_certs=None, + verify=False, storage_sets={"events"}, single_node=False, cluster_name="my_cluster", @@ -389,6 +392,9 @@ def run_query( password="", database="default", http_port=8123, + secure=False, + ca_certs=None, + verify=False, storage_sets={"events"}, single_node=False, cluster_name="my_cluster", diff --git a/tests/replacer/test_load_balancer.py b/tests/replacer/test_load_balancer.py index bd061a85dc8..19a8e3bec4d 100644 --- a/tests/replacer/test_load_balancer.py +++ b/tests/replacer/test_load_balancer.py @@ -103,6 +103,9 @@ def test_load_balancer( password="", database="default", http_port=8123, + secure=True, + ca_certs=None, + verify=True, storage_sets={"events"}, single_node=False, cluster_name="my_cluster",