Skip to content

Commit

Permalink
fix: be careful about nodes we run ops on for del dag (PostHog#28162)
Browse files: browse the repository at this point in the history
  • Loading branch information
fuziontech authored Jan 31, 2025
1 parent cf6e6be commit c6297d6
Showing 1 changed file with 7 additions and 12 deletions.
19 changes: 7 additions & 12 deletions dags/deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ def qualified_name(self):
@property
def create_table_query(self) -> str:
return f"""
CREATE TABLE IF NOT EXISTS {self.qualified_name} ON CLUSTER '{self.cluster}'
CREATE TABLE IF NOT EXISTS {self.qualified_name}
(
uuid UUID,
event String,
team_id Int64,
person_id UUID,
timestamp DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/noshard/{self.table_name}', '{{shard}}-{{replica}}')
ENGINE = ReplacingMergeTree()
ORDER BY (team_id, event, person_id, timestamp)
"""

Expand Down Expand Up @@ -356,7 +356,7 @@ def sync_replica(client: Client):
def load_pending_event_deletions(client: Client):
client.execute(create_pending_event_deletes_table.populate_query)

cluster.any_host_by_role(load_pending_event_deletions, NodeRole.WORKER).result()
cluster.map_hosts_by_role(load_pending_event_deletions, NodeRole.WORKER).result()

return create_pending_event_deletes_table

Expand All @@ -369,12 +369,6 @@ def delete_person_events(
) -> tuple[PendingEventDeletesTable, ShardMutations]:
"""Delete events from sharded_events table for persons pending deletion."""

# Wait for the table to be fully replicated
def sync_replica(client: Client):
client.execute(f"SYSTEM SYNC REPLICA {load_pending_event_deletions.qualified_name} STRICT")

cluster.map_hosts_by_role(sync_replica, NodeRole.WORKER).result()

def count_pending_deletes(client: Client) -> int:
result = client.execute(
f"""
Expand All @@ -384,15 +378,16 @@ def count_pending_deletes(client: Client) -> int:
)
return result[0][0] if result else 0

count_result = cluster.any_host_by_role(count_pending_deletes, NodeRole.WORKER).result()
count_result = cluster.map_hosts_by_role(count_pending_deletes, NodeRole.WORKER).result()

if count_result == 0:
all_zero = all(count == 0 for count in count_result.values())
if all_zero:
context.add_output_metadata({"events_deleted": MetadataValue.int(0), "message": "No pending deletions found"})
return (load_pending_event_deletions, {})

context.add_output_metadata(
{
"events_deleted": MetadataValue.int(count_result),
"events_deleted": MetadataValue.int(sum(count_result.values())),
}
)

Expand Down

0 comments on commit c6297d6

Please sign in to comment.