Skip to content

Commit

Permalink
add metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Dec 11, 2023
1 parent daf3c8a commit db3c392
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
4 changes: 4 additions & 0 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,14 @@ def __init__(
self.__num_processes = num_processes
self.__initializer = initializer
self.__pool: Optional[Pool] = None
self.__metrics = get_metrics()
self.maybe_create_pool()

def maybe_create_pool(self) -> None:
if self.__pool is None:
self.__metrics.increment(
"arroyo.strategies.run_task_with_multiprocessing.pool.create"
)
self.__pool = Pool(
self.__num_processes,
initializer=partial(parallel_worker_initializer, self.__initializer),
Expand Down
2 changes: 2 additions & 0 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
# Gauge. Shows how many processes the multiprocessing strategy is
# configured with.
"arroyo.strategies.run_task_with_multiprocessing.processes",
# Counter. Incremented when the multiprocessing pool is created (or re-created).
"arroyo.strategies.run_task_with_multiprocessing.pool.create",
# Time (unitless) spent polling librdkafka for new messages.
"arroyo.consumer.poll.time",
# Time (unitless) spent in strategies (blocking in strategy.submit or
Expand Down

0 comments on commit db3c392

Please sign in to comment.