From 5d6ab53810778c4477c53617e1345c6557fe9114 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Fri, 31 Jan 2025 11:05:43 -0800 Subject: [PATCH] feat: add node roles to del dag (#28155) Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- dags/deletes.py | 5 +++-- posthog/clickhouse/cluster.py | 11 +++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dags/deletes.py b/dags/deletes.py index 00b62c438f2c1..b426325232dda 100644 --- a/dags/deletes.py +++ b/dags/deletes.py @@ -18,6 +18,7 @@ ClickhouseCluster, Mutation, MutationRunner, + NodeRole, get_cluster, ) from posthog.models.event.sql import EVENTS_DATA_TABLE @@ -256,7 +257,7 @@ def delete_person_events( def sync_replica(client: Client): client.execute(f"SYSTEM SYNC REPLICA {load_pending_person_deletions.qualified_name} STRICT") - cluster.map_all_hosts(sync_replica).result() + cluster.map_hosts_by_role(sync_replica, NodeRole.WORKER).result() def count_pending_deletes(client: Client) -> int: result = client.execute( @@ -267,7 +268,7 @@ def count_pending_deletes(client: Client) -> int: ) return result[0][0] if result else 0 - count_result = cluster.any_host(count_pending_deletes).result() + count_result = cluster.any_host_by_role(count_pending_deletes, NodeRole.WORKER).result() if count_result == 0: context.add_output_metadata({"events_deleted": MetadataValue.int(0), "message": "No pending deletions found"}) diff --git a/posthog/clickhouse/cluster.py b/posthog/clickhouse/cluster.py index 95ad0eb974dc2..a4c3bacac302b 100644 --- a/posthog/clickhouse/cluster.py +++ b/posthog/clickhouse/cluster.py @@ -155,6 +155,17 @@ def any_host(self, fn: Callable[[Client], T]) -> Future[T]: host = self.__hosts[0] return executor.submit(self.__get_task_function(host, fn)) + def any_host_by_role(self, fn: Callable[[Client], T], node_role: NodeRole) -> Future[T]: + """ + Execute the callable once for any host with the given node role. + """ + with ThreadPoolExecutor() as executor: + try: + host = next(host for host in self.__hosts if host.host_cluster_role == node_role.value.lower()) + except StopIteration: + raise ValueError(f"No hosts found with role {node_role.value}") + return executor.submit(self.__get_task_function(host, fn)) + def map_all_hosts(self, fn: Callable[[Client], T], concurrency: int | None = None) -> FuturesMap[HostInfo, T]: """ Execute the callable once for each host in the cluster.