Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): add pause and resume functionality to team workflow #180

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
64f815e
feat(workflow): add pause and resume functionality to team workflow
anthonydevs17 Jan 4, 2025
9662c83
remove logs
anthonydevs17 Jan 4, 2025
1ca477c
feat(agent): add pause handling to ReactChampionAgent workflow
anthonydevs17 Jan 5, 2025
94c5e78
feat(workflow): implement stop functionality for team workflow
anthonydevs17 Jan 6, 2025
ac46726
feat(workflow): enhance workflow control with pause, resume, and stop…
anthonydevs17 Jan 8, 2025
c30a39d
Merge branch 'main' of https://github.com/kaiban-ai/KaibanJS into pau…
anthonydevs17 Jan 8, 2025
5ca097f
fix(tools): update README formatting for better readability
anthonydevs17 Jan 8, 2025
ea3f631
Merge branch 'main' of https://github.com/kaiban-ai/KaibanJS into pau…
anthonydevs17 Jan 13, 2025
5b15c1f
feat(workflow): streamline workflow management with enhanced pause, r…
anthonydevs17 Jan 16, 2025
ce70eb3
feat(tests): update snapshots to include currentIterations and lastFe…
anthonydevs17 Jan 16, 2025
d4a7e7f
docs: remove redundant newline in README.md
anthonydevs17 Jan 16, 2025
b10e551
refactor(reactChampionAgent): remove redundant pause check in handleL…
anthonydevs17 Jan 20, 2025
ca0b70a
feat(workflow): implement task pause functionality and enhance agent …
anthonydevs17 Jan 21, 2025
49b6101
feat(subscriber): add PAUSED state to task status updates
anthonydevs17 Jan 21, 2025
ef4a6a0
feat(workflow): enhance agent state management and task queue handling
anthonydevs17 Jan 22, 2025
b6fc578
feat(tests): update snapshots with detailed lastFeedbackMessage for v…
anthonydevs17 Jan 22, 2025
db13940
feat(agent): add workOnTaskResume method and integrate into workflow …
anthonydevs17 Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(workflow): streamline workflow management with enhanced pause, r…
…esume, and stop methods

- Refactored Team class methods (pause, resume, stop) to utilize new workflow management functions directly from the store, improving code clarity and reducing redundancy.
- Updated ReactChampionAgent to track the last feedback message and handle task execution more effectively, including abort handling.
- Introduced new error classes (StopAbortError, PauseAbortError) for better error management during workflow interruptions.
- Enhanced task logging for aborted tasks, capturing relevant statistics and error details for improved debugging.
- Integrated workflow action enums to standardize workflow control actions across the application.
anthonydevs17 committed Jan 16, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 5b15c1ff72dfdb2a0debd4b944743bc30575bdcc
165 changes: 90 additions & 75 deletions src/agents/reactChampionAgent.js
Original file line number Diff line number Diff line change
@@ -82,8 +82,8 @@ class ReactChampionAgent extends BaseAgent {
};
this.llmConfig = extractedLlmConfig;
}

this.interactionsHistory = new ChatMessageHistory();
this.lastFeedbackMessage = null;
}

async workOnTask(task, inputs, context) {
@@ -147,7 +147,7 @@ class ReactChampionAgent extends BaseAgent {

return {
executableAgent: chainAgentWithHistory,
initialFeedbackMessage: feedbackMessage,
initialFeedbackMessage: this.lastFeedbackMessage || feedbackMessage,
};
}

@@ -164,22 +164,20 @@ class ReactChampionAgent extends BaseAgent {
iterations < maxAgentIterations &&
!loopCriticalError
) {
while (
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.PAUSED ||
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.STOPPED
// Save the feedback message as the last feedback message
this.lastFeedbackMessage = feedbackMessage;

// Check workflow status
const workflowStatus = agent.store.getState().teamWorkflowStatus;

if (
workflowStatus === WORKFLOW_STATUS_enum.STOPPED ||
workflowStatus === WORKFLOW_STATUS_enum.STOPPING
) {
if (
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.STOPPED
) {
return {
result: parsedResultWithFinalAnswer,
metadata: { iterations, maxAgentIterations },
};
}
await new Promise((resolve) => setTimeout(resolve, 100)); // Wait until resumed or stopped
return {
result: parsedResultWithFinalAnswer,
metadata: { iterations, maxAgentIterations },
};
}

try {
@@ -513,63 +511,48 @@ class ReactChampionAgent extends BaseAgent {
}

async executeThinking(agent, task, ExecutableAgent, feedbackMessage) {
const abortController =
agent.store.getState().workflowController.abortController;

return new Promise((resolve, reject) => {
// Check if already aborted
if (abortController.signal.aborted) {
reject(new AbortError());
return;
}

// Use once: true to ensure the listener is removed after firing
abortController.signal.addEventListener(
'abort',
() => {
reject(new AbortError());
},
{ once: true }
);
const promiseObj = {};
let rejectFn; // Declare reject function outside Promise
// Create an AbortController for this invocation
const abortController = new AbortController();
const thinkingPromise = new Promise((resolve, reject) => {
rejectFn = reject; // Capture the reject function

ExecutableAgent.invoke(
{ feedbackMessage },
{
configurable: {
sessionId: 'foo-bar-baz',
signal: abortController.signal,
},
configurable: { sessionId: task.id },
callbacks: [
{
handleChatModelStart: (llm, messages) => {
if (abortController.signal.aborted) return;
agent
.handleThinkingStart({ agent, task, messages })
.catch((error) => {
reject(error);
});
handleChatModelStart: async (llm, messages) => {
await agent.handleThinkingStart({ agent, task, messages });
},

handleLLMEnd: async (output) => {
if (abortController.signal.aborted) return;
agent
.handleThinkingEnd({ agent, task, output })
.then((thinkingResult) => resolve(thinkingResult))
.catch((error) => {
reject(error);
});
if (
this.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.PAUSED
) {
return;
}
const result = await agent.handleThinkingEnd({
agent,
task,
output,
});
resolve(result);
},
},
],
], // Add the signal to the options
signal: abortController.signal,
}
).catch((error) => {
logger.error(
`LLM_INVOCATION_ERROR: Error during LLM API call for Agent: ${agent.name}, Task: ${task.id}. Details:`,
error
);
if (error.name === 'AbortError' || abortController.signal.aborted) {
reject(new AbortError());
if (error.name === 'AbortError') {
reject(new AbortError('Task was cancelled'));
} else {
logger.error(
`LLM_INVOCATION_ERROR: Error during LLM API call for Agent: ${agent.name}, Task: ${task.id}. Details:`,
error
);
reject(
new LLMInvocationError(
`LLM API Error during executeThinking for Agent: ${agent.name}, Task: ${task.id}`,
@@ -579,6 +562,35 @@ class ReactChampionAgent extends BaseAgent {
}
});
});

// Assign both the promise and the captured reject function
Object.assign(promiseObj, {
promise: thinkingPromise,
// reject: rejectFn,
reject: (e) => {
abortController.abort();
rejectFn(e);
},
});

// Track promise in store
this.store.getState().trackPromise(this.id, promiseObj);

try {
return await thinkingPromise;
} catch (error) {
// Ensure we properly handle and rethrow the error
if (error instanceof AbortError) {
throw error; // Rethrow AbortError
}
// Wrap unexpected errors
throw new LLMInvocationError(
`LLM API Error during executeThinking for Agent: ${agent.name}, Task: ${task.id}`,
error
);
} finally {
this.store.getState().removePromise(this.id, promiseObj);
}
}

handleIssuesParsingLLMOutput({ agent, task, output }) {
@@ -684,28 +696,29 @@ class ReactChampionAgent extends BaseAgent {
}

async executeUsingTool({ agent, task, parsedLLMOutput, tool }) {
const abortController =
agent.store.getState().workflowController.abortController;

const toolInput = parsedLLMOutput.actionInput;
agent.handleUsingToolStart({ agent, task, tool, input: toolInput });

try {
const toolResult = await Promise.race([
tool.call(toolInput),
new Promise((_, reject) => {
abortController.signal.addEventListener('abort', () => {
reject(new AbortError());
});
}),
]);
const promiseObj = {};
let rejectFn; // Declare reject function outside Promise

agent.handleUsingToolEnd({ agent, task, tool, output: toolResult });
const toolPromise = new Promise((resolve, reject) => {
rejectFn = reject; // Capture the reject function
tool.call(toolInput).then(resolve).catch(reject);
});

// Track promise in store
Object.assign(promiseObj, { promise: toolPromise, reject: rejectFn });

this.store.getState().trackPromise(this.id, promiseObj);

try {
const result = await toolPromise;
agent.handleUsingToolEnd({ agent, task, tool, output: result });
return this.promptTemplates.TOOL_RESULT_FEEDBACK({
agent,
task,
toolResult,
toolResult: result,
parsedLLMOutput,
});
} catch (error) {
@@ -719,6 +732,8 @@ class ReactChampionAgent extends BaseAgent {
tool,
error,
});
} finally {
this.store.getState().removePromise(this.id, promiseObj);
}
}

21 changes: 3 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -163,11 +163,7 @@ class Team {
* @returns {void}
*/
pause() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.RUNNING) {
throw new Error('Cannot pause workflow unless it is running');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.PAUSED });
return this.store.getState().pauseWorkflow();
}

/**
@@ -176,26 +172,15 @@ class Team {
* @returns {void}
*/
resume() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.PAUSED) {
throw new Error('Cannot resume workflow unless it is paused');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.RESUMED });
return this.store.getState().resumeWorkflow();
}
/**
* Stops the team's workflow.
* This method stops the workflow, preventing any further task execution.
* @returns {void}
*/
stop() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (
currentStatus !== WORKFLOW_STATUS_enum.RUNNING &&
currentStatus !== WORKFLOW_STATUS_enum.PAUSED
) {
throw new Error('Cannot stop workflow unless it is running or paused');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPING });
return this.store.getState().stopWorkflow();
}

/**
40 changes: 38 additions & 2 deletions src/stores/taskStore.js
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ import {
} from '../utils/enums';
import { getTaskTitleForLogs } from '../utils/tasks';
import { logger } from '../utils/logger';
import { PrettyError } from '../utils/errors';
import { PrettyError, StopAbortError } from '../utils/errors';
import { calculateTaskCost } from '../utils/llmCostCalculator';

export const useTaskStore = (set, get) => ({
@@ -288,6 +288,43 @@ export const useTaskStore = (set, get) => ({
get().handleWorkflowBlocked({ task, error });
},
handleTaskAborted: ({ task, error }) => {
if (error instanceof StopAbortError) {
//create task log
const stats = get().getTaskStats(task, get);
const modelCode = task.agent.llmConfig.model; // Assuming this is where the model code is stored
// Calculate costs directly using stats
const costDetails = calculateTaskCost(modelCode, stats.llmUsageStats);

const taskLog = get().prepareNewLog({
agent: task.agent,
task,
logDescription: `Task aborted: ${getTaskTitleForLogs(task)}, Reason: ${
error.message
}`,
metadata: {
...stats,
costDetails,
error,
},
logType: 'TaskStatusUpdate',
});
// create pretty error
const prettyError = new PrettyError({
name: 'TASK STOPPED',
message: 'Task manually stopped by user.',
recommendedAction:
'Enable logLevel: "debug" during team initialization to obtain more detailed logs and facilitate troubleshooting.',
rootError: error,
context: { task, error },
});
logger.warn(prettyError.prettyMessage);
logger.debug(prettyError.context);

set((state) => ({
workflowLogs: [...state.workflowLogs, taskLog],
}));
return;
}
const stats = get().getTaskStats(task, get);
task.status = TASK_STATUS_enum.BLOCKED;
const modelCode = task.agent.llmConfig.model; // Assuming this is where the model code is stored
@@ -338,6 +375,5 @@ export const useTaskStore = (set, get) => ({
),
workflowLogs: [...state.workflowLogs, taskLog],
}));
get().handleWorkflowAborted({ task, error });
},
});
3 changes: 2 additions & 1 deletion src/stores/teamStore.js
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import { create } from 'zustand';
import { devtools, subscribeWithSelector } from 'zustand/middleware';
import { useAgentStore } from './agentStore';
import { useTaskStore } from './taskStore';
import { useWorkflowLoopStore } from './workflowLoopStore';
import {
TASK_STATUS_enum,
AGENT_STATUS_enum,
@@ -52,7 +53,7 @@ const createTeamStore = (initialState = {}) => {
(set, get) => ({
...useAgentStore(set, get),
...useTaskStore(set, get),

...useWorkflowLoopStore(set, get),
teamWorkflowStatus:
initialState.teamWorkflowStatus || WORKFLOW_STATUS_enum.INITIAL,
workflowResult: initialState.workflowResult || null,
85 changes: 11 additions & 74 deletions src/stores/workflowController.js
Original file line number Diff line number Diff line change
@@ -8,11 +8,12 @@
* Integrate this controller to manage the flow of tasks within your application, ensuring tasks are executed in an orderly and efficient manner.
*/

import PQueue from 'p-queue';
import { TASK_STATUS_enum, WORKFLOW_STATUS_enum } from '../utils/enums';
import { logger } from '../utils/logger';
// import PQueue from 'p-queue';
import { TASK_STATUS_enum /*WORKFLOW_STATUS_enum*/ } from '../utils/enums';
// import { logger } from '../utils/logger';
export const setupWorkflowController = (useTeamStore) => {
const taskQueue = new PQueue({ concurrency: 1 });
// const taskQueue = new PQueue({ concurrency: 1 });
const taskQueue = useTeamStore.getState().taskQueue;
useTeamStore.setState({
workflowController: {
abortController: new AbortController(),
@@ -89,78 +90,14 @@ export const setupWorkflowController = (useTeamStore) => {
}
);

// Managing workflow status changes
//Managing tasks moving to 'DONE'
useTeamStore.subscribe(
(state) => state.teamWorkflowStatus,
async (status) => {
if (status === WORKFLOW_STATUS_enum.PAUSED) {
taskQueue.pause();
} else if (status === WORKFLOW_STATUS_enum.RESUMED) {
taskQueue.start();
useTeamStore.setState({
teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING,
(state) => state.tasks.filter((t) => t.status === TASK_STATUS_enum.DONE),
(doneTasks, previousDoneTasks) => {
if (doneTasks.length > previousDoneTasks.length) {
doneTasks.forEach((task) => {
useTeamStore.getState().clearAgentLoopState(task.agent.id);
});
} else if (status === WORKFLOW_STATUS_enum.STOPPING) {
try {
const abortController =
useTeamStore.getState().workflowController.abortController;

// Create a promise that resolves when all ongoing tasks are aborted
const abortPromise = new Promise((resolve) => {
// Use 'aborted' event instead of 'abort'
if (abortController.signal.aborted) {
resolve();
} else {
abortController.signal.addEventListener(
'abort',
() => {
resolve();
},
{ once: true }
);
}
});

// Trigger the abort
abortController.abort();

// Wait for abort to complete with a timeout
await Promise.race([
abortPromise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Abort timeout')), 5000)
),
]);

// Clear the task queue
taskQueue.clear();

// Update all DOING tasks to TODO
const tasks = useTeamStore.getState().tasks;
tasks.forEach((task) => {
if (task.status === TASK_STATUS_enum.DOING) {
useTeamStore
.getState()
.updateTaskStatus(task.id, TASK_STATUS_enum.TODO);
}
});

// Set final stopped status and create new abortController
useTeamStore.setState({
teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPED,
workflowController: {
abortController: new AbortController(),
},
});
} catch (error) {
logger.error('Error while stopping workflow:', error);
useTeamStore.setState({
teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPED,
workflowController: {
abortController: new AbortController(),
},
});
}
}
}
);
168 changes: 168 additions & 0 deletions src/stores/workflowLoopStore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { ChatMessageHistory } from 'langchain/stores/message/in_memory';
import {
WORKFLOW_STATUS_enum,
TASK_STATUS_enum,
WORKFLOW_ACTION_enum,
} from '../utils/enums';
import {
StopAbortError,
PauseAbortError,
WorkflowError,
} from '../utils/errors';
import { logger } from '../utils/logger';
import PQueue from 'p-queue';

export const useWorkflowLoopStore = (set, get) => ({
taskQueue: new PQueue({ concurrency: 1 }),
activePromises: new Map(),
clearAgentLoopState: (agentId) =>
set((store) => {
const newAgents = [...store.agents];
newAgents.forEach(({ agentInstance }) => {
if (agentInstance.id === agentId) {
agentInstance.interactionsHistory = new ChatMessageHistory();
agentInstance.lastFeedbackMessage = null;
agentInstance.currentIterations = 0;
}
});
logger.info('cleared agent loop state', agentId);
return { agents: newAgents };
}),

// Initialize
initializeWorkflow: () => {
set((state) => ({
...state,
teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING,
taskQueue: new PQueue({ concurrency: 1 }),
}));
},

// Promise Management
trackPromise: (agentId, promiseObj) => {
set((state) => {
const agentPromises = state.activePromises.get(agentId) || new Set();
agentPromises.add(promiseObj);
return {
activePromises: new Map(state.activePromises).set(
agentId,
agentPromises
),
};
});
},

removePromise: (agentId, promiseObj) => {
set((state) => {
const agentPromises = state.activePromises.get(agentId);
if (agentPromises) {
agentPromises.delete(promiseObj);
}
return {
activePromises: new Map(state.activePromises),
};
});
},

abortAgentPromises: (agentId, action) => {
const agentPromises = get().activePromises.get(agentId);
if (agentPromises) {
for (const { reject } of agentPromises) {
switch (action) {
case WORKFLOW_ACTION_enum.STOP:
reject(new StopAbortError());
break;
case WORKFLOW_ACTION_enum.PAUSE:
reject(new PauseAbortError());
break;
default:
break;
}
}
set((state) => ({
activePromises: new Map(state.activePromises).set(agentId, new Set()),
}));
}
},

// Workflow Control Actions
pauseWorkflow: async () => {
const currentStatus = get().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.RUNNING) {
throw new WorkflowError('Cannot pause workflow unless it is running');
}

// Pause task queue
get().taskQueue.pause();
// Abort all active agent promises
for (const agentId of get().activePromises.keys()) {
get().abortAgentPromises(agentId, WORKFLOW_ACTION_enum.PAUSE);
}

set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.PAUSED });
logger.info('Workflow paused');
console.log(get().agents);
},

resumeWorkflow: async () => {
const currentStatus = get().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.PAUSED) {
throw new WorkflowError('Cannot resume workflow unless it is paused');
}
set({
teamWorkflowStatus: WORKFLOW_STATUS_enum.RESUMED,
});
const tasks = get().tasks;

tasks.forEach((task) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Blocked tasks are being reset here to the Doing state?

if (task.status === TASK_STATUS_enum.BLOCKED) {
get().updateTaskStatus(task.id, TASK_STATUS_enum.DOING);
}
});

// Resume task queue
get().taskQueue.start();

logger.info('Workflow resumed');
set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING });
},

stopWorkflow: async () => {
const currentStatus = get().teamWorkflowStatus;

if (
currentStatus !== WORKFLOW_STATUS_enum.RUNNING &&
currentStatus !== WORKFLOW_STATUS_enum.PAUSED
) {
throw new WorkflowError(
'Cannot stop workflow unless it is running or paused'
);
}

set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPING });

try {
// Abort all active agent promises
for (const agentId of get().activePromises.keys()) {
get().abortAgentPromises(agentId, WORKFLOW_ACTION_enum.STOP);
get().clearAgentLoopState(agentId);
}

// Clear task queue
get().taskQueue.clear();

// Update all DOING tasks to TODO
const tasks = get().tasks;
tasks.forEach((task) => {
get().updateTaskStatus(task.id, TASK_STATUS_enum.TODO);
});

set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPED });
logger.info('Workflow stopped successfully');
} catch (error) {
logger.error('Error stopping workflow:', error);
set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPED });
throw error;
}
},
});
8 changes: 8 additions & 0 deletions src/utils/enums.js
Original file line number Diff line number Diff line change
@@ -108,9 +108,17 @@ const FEEDBACK_STATUS_enum = {
PROCESSED: 'PROCESSED',
};

const WORKFLOW_ACTION_enum = {
STOP: 'STOP',
PAUSE: 'PAUSE',
RESUME: 'RESUME',
INITIATE: 'INITIATE',
};

export {
AGENT_STATUS_enum,
TASK_STATUS_enum,
WORKFLOW_STATUS_enum,
FEEDBACK_STATUS_enum,
WORKFLOW_ACTION_enum,
};
30 changes: 27 additions & 3 deletions src/utils/errors.js
Original file line number Diff line number Diff line change
@@ -67,11 +67,35 @@ class PrettyError extends Error {
}
}

export class AbortError extends Error {
class AbortError extends Error {
constructor(message = 'Operation was aborted') {
super(message);
this.name = 'AbortError';
}
}

export { LLMInvocationError, PrettyError };
class StopAbortError extends AbortError {
constructor(message = 'Operation was aborted and stopped') {
super(message);
this.name = 'StopAbortError';
}
}
class PauseAbortError extends AbortError {
constructor(message = 'Operation was aborted and paused') {
super(message);
this.name = 'PauseAbortError';
}
}
class WorkflowError extends Error {
constructor(message) {
super(message);
this.name = 'WorkflowError';
}
}
export {
LLMInvocationError,
PrettyError,
AbortError,
StopAbortError,
PauseAbortError,
WorkflowError,
};