Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix mutation types in delete DAG #28110

Merged
merged 9 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions dags/deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,28 @@ def count_pending_deletes(client: Client) -> int:

@op
def wait_for_delete_mutations(
    context: OpExecutionContext,
    cluster: ResourceParam[ClickhouseCluster],
    delete_person_events: tuple[PendingPersonEventDeletesTable, ShardMutations],
) -> PendingPersonEventDeletesTable:
    """
    Wait for the per-shard delete mutations to finish on every host of their shard.

    ``delete_person_events`` is the pair produced by the upstream op: the pending
    deletes table and a mapping of shard -> mutation. Each mutation's ``wait``
    callable is run on all hosts of its shard via
    ``cluster.map_all_hosts_in_shards``.

    Returns the pending deletes table unchanged so downstream ops can chain on it.

    Raises whatever a host's ``wait`` raised; the failure is logged with the
    shard/replica it came from before being re-raised.
    """
    pending_person_deletions, shard_mutations = delete_person_events

    # Nothing was scheduled, so there is nothing to wait for.
    if not shard_mutations:
        return pending_person_deletions

    results = cluster.map_all_hosts_in_shards(
        {shard: mutation.wait for shard, mutation in shard_mutations.items()}
    ).as_completed()

    # NOTE(review): assumes ``as_completed()`` yields ``(host, future)`` pairs,
    # as the ``.result()`` call on the second element implies - confirm against
    # ``FuturesMap``.
    for host, result in results:
        host_label = (
            f"Shard {host.shard_num if host.shard_num else 'unknown'} "
            f"replica {host.replica_num if host.replica_num else 'unknown'}"
        )
        # Keep the ``try`` scoped to the single future so the handler always
        # refers to the host that actually failed (and never to an unbound or
        # stale loop variable).
        try:
            result.result()
        except Exception as e:
            context.log.exception(f"{host_label} failed: {e}")
            raise
        context.log.info(f"{host_label} completed ")

    return pending_person_deletions


@op
Expand Down
23 changes: 23 additions & 0 deletions posthog/clickhouse/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ def map_all_hosts_in_shard(
}
)

def map_all_hosts_in_shards(
    self, shard_fns: dict[int, Callable[[Client], T]], concurrency: int | None = None
) -> FuturesMap[HostInfo, T]:
    """
    Execute a shard-specific callable once for each host in the specified shards.

    ``shard_fns`` maps a shard number to the callable that should run on every
    host whose ``shard_num`` equals that number. Hosts belonging to shards that
    are not listed are left untouched.

    The number of concurrent queries can be limited with the ``concurrency``
    parameter, or set to ``None`` to use the default limit of the executor.

    The executor is used as a context manager, so every submitted task has
    finished by the time this method returns the futures map.
    """
    # Pair each matching host with its shard's callable, visiting shards in
    # ``shard_fns`` order and hosts in cluster order within each shard.
    fn_by_host = {
        host: fn
        for shard_num, fn in shard_fns.items()
        for host in self.__hosts
        if host.shard_num == shard_num
    }

    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = {
            host: executor.submit(self.__get_task_function(host, fn))
            for host, fn in fn_by_host.items()
        }
        return FuturesMap(futures)

def map_one_host_per_shard(
self, fn: Callable[[Client], T], concurrency: int | None = None
) -> FuturesMap[HostInfo, T]:
Expand Down
Loading