Skip to content

Commit

Permalink
feat: add node roles to del dag (PostHog#28155)
Browse files Browse the repository at this point in the history
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
  • Loading branch information
fuziontech and greptile-apps[bot] authored Jan 31, 2025
1 parent 1d63972 commit 5d6ab53
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
5 changes: 3 additions & 2 deletions dags/deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ClickhouseCluster,
Mutation,
MutationRunner,
NodeRole,
get_cluster,
)
from posthog.models.event.sql import EVENTS_DATA_TABLE
Expand Down Expand Up @@ -256,7 +257,7 @@ def delete_person_events(
def sync_replica(client: Client):
client.execute(f"SYSTEM SYNC REPLICA {load_pending_person_deletions.qualified_name} STRICT")

cluster.map_all_hosts(sync_replica).result()
cluster.map_hosts_by_role(sync_replica, NodeRole.WORKER).result()

def count_pending_deletes(client: Client) -> int:
result = client.execute(
Expand All @@ -267,7 +268,7 @@ def count_pending_deletes(client: Client) -> int:
)
return result[0][0] if result else 0

count_result = cluster.any_host(count_pending_deletes).result()
count_result = cluster.any_host_by_role(count_pending_deletes, NodeRole.WORKER).result()

if count_result == 0:
context.add_output_metadata({"events_deleted": MetadataValue.int(0), "message": "No pending deletions found"})
Expand Down
11 changes: 11 additions & 0 deletions posthog/clickhouse/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ def any_host(self, fn: Callable[[Client], T]) -> Future[T]:
host = self.__hosts[0]
return executor.submit(self.__get_task_function(host, fn))

def any_host_by_role(self, fn: Callable[[Client], T], node_role: NodeRole) -> Future[T]:
"""
Execute the callable once for any host with the given node role.
"""
with ThreadPoolExecutor() as executor:
try:
host = next(host for host in self.__hosts if host.host_cluster_role == node_role.value.lower())
except StopIteration:
raise ValueError(f"No hosts found with role {node_role.value}")
return executor.submit(self.__get_task_function(host, fn))

def map_all_hosts(self, fn: Callable[[Client], T], concurrency: int | None = None) -> FuturesMap[HostInfo, T]:
"""
Execute the callable once for each host in the cluster.
Expand Down

0 comments on commit 5d6ab53

Please sign in to comment.