Skip to content

Commit

Permalink
Fix timing issue where a sync task would finish before the other one …
Browse files Browse the repository at this point in the history
…was registered in futures dict (#3110)

- this was not possible in async where all done callbacks are called in
next tick
- in sync case this would manifest as the first task done callback
seeing counter == 1 and thus setting event
- the fix is to unset the event whenever a task is scheduled
  • Loading branch information
nfcampos authored Jan 21, 2025
2 parents e10b7c1 + d48b254 commit 3ec55b0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
5 changes: 5 additions & 0 deletions libs/langgraph/langgraph/pregel/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __setitem__(
super().__setitem__(key, value) # type: ignore[index]
if value is not None:
with self.lock:
self.event.clear()
self.counter += 1
key.add_done_callback(partial(self.on_done, value))

Expand Down Expand Up @@ -296,6 +297,8 @@ def call(
futures.event.wait(
timeout=(max(0, end_time - time.monotonic()) if end_time else None)
)
# give control back to the caller
yield
# panic on failure or timeout
_panic_or_proceed(
futures.done.union(f for f, t in futures.items() if t is not None),
Expand Down Expand Up @@ -517,6 +520,8 @@ def call(
futures.event.wait(),
timeout=(max(0, end_time - loop.time()) if end_time else None),
)
# give control back to the caller
yield
# cancel waiter task
for fut in futures:
fut.cancel()
Expand Down
11 changes: 7 additions & 4 deletions libs/langgraph/tests/test_large_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -2829,9 +2829,9 @@ def agent(data: AgentState) -> AgentState:
# Define decision-making logic
def should_continue(data: AgentState) -> str:
assert isinstance(data["session"], httpx.Client)
assert (
data["something_extra"] == "hi there"
), "nodes can pass extra data to their cond edges, which isn't saved in state"
assert data["something_extra"] == "hi there", (
"nodes can pass extra data to their cond edges, which isn't saved in state"
)
# Logic to decide whether to continue in the loop or exit
if tool_calls := data["messages"][-1].tool_calls:
return [Send("tools", tool_call) for tool_call in tool_calls]
Expand Down Expand Up @@ -5425,7 +5425,7 @@ class State(TypedDict, total=False):
docs: Annotated[list[str], sorted_add]

def rewrite_query(data: State) -> State:
return {"query": f'query: {data["query"]}'}
return {"query": f"query: {data['query']}"}

def retriever_one(data: State) -> State:
# timer ensures stream output order is stable
Expand Down Expand Up @@ -7277,6 +7277,9 @@ def __init__(self, name: str):
setattr(self, "__name__", name)

def __call__(self, state):
time.sleep(0)
# sleep makes it more likely to trigger edge case where 1st task
# finishes before 2nd is registered in futures dict
self.ticks += 1
update = (
[self.name]
Expand Down

0 comments on commit 3ec55b0

Please sign in to comment.