Skip to content

Commit

Permalink
chore(weave): Cleanup a mem leak in ThreadExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
tssweeney authored Jan 24, 2025
1 parent 3aae320 commit 0f4a35d
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions weave/trace/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,38 @@ def _wrapped_fn(*args):

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.contexts: list[Context] = []

# ignoring type here for convenience because otherwise you have to write a bunch of overloads
# for py310+ and py39-
def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: # type: ignore
context = copy_context()
self.contexts.append(context)

wrapped_fn = partial(self._run_with_context, context, fn)
return super().submit(wrapped_fn, *args, **kwargs)
wrapped_fn = partial(self._run_with_context, fn)
return super().submit(wrapped_fn, copy_context(), *args, **kwargs)

def map(
self,
fn: Callable,
*iterables: Iterable[Iterable],
*iterables: Iterable[Any],
timeout: float | None = None,
chunksize: int = 1,
) -> Iterator:
contexts = [copy_context() for _ in range(len(list(iterables[0])))]
self.contexts.extend(contexts)
first_iterable = list(iterables[0])
map_len = len(first_iterable)

# Create a context for each item in the first iterable
contexts = [copy_context() for _ in range(map_len)]

wrapped_fn = partial(self._run_with_context, fn)

# Convert lists to iterables for map()
map_iterables = (contexts, first_iterable, *iterables[1:])

wrapped_fn = partial(self._run_with_context, None, fn)
return super().map(wrapped_fn, *iterables, timeout=timeout, chunksize=chunksize)
return super().map(
wrapped_fn, *map_iterables, timeout=timeout, chunksize=chunksize
)

def _run_with_context(
self, context: Context, fn: Callable, *args: Any, **kwargs: Any
self, fn: Callable, context: Context, *args: Any, **kwargs: Any
) -> Any:
if context is None:
context = self.contexts.pop(0)
return context.run(fn, *args, **kwargs)


Expand Down

0 comments on commit 0f4a35d

Please sign in to comment.