From c58525bbf45fd20e6102edf639d4484dc2bf5f78 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:49:25 -0500 Subject: [PATCH] Feat: add state snapshot events, marker task tracking, etc. (#713) - Add network state polling - Consolidate events - Tie markers to specifics tasks for easy tracing --- pkg/abstractions/experimental/bot/bot.go | 7 +- pkg/abstractions/experimental/bot/bot.proto | 3 + pkg/abstractions/experimental/bot/http.go | 25 +-- pkg/abstractions/experimental/bot/instance.go | 158 +++++++++++++-- .../experimental/bot/interface.go | 48 ++++- pkg/abstractions/experimental/bot/prompt.yaml | 41 ++++ pkg/abstractions/experimental/bot/state.go | 30 ++- pkg/abstractions/experimental/bot/task.go | 23 ++- pkg/abstractions/experimental/bot/types.go | 62 ++++-- pkg/common/config.default.yaml | 9 +- pkg/repository/base.go | 1 + pkg/repository/task_redis.go | 11 + pkg/task/dispatch.go | 19 ++ pkg/types/config.go | 1 - proto/bot.pb.go | 189 +++++++++++------- .../abstractions/experimental/bot/bot.py | 39 +++- .../abstractions/experimental/bot/types.py | 38 ++++ sdk/src/beta9/clients/bot/__init__.py | 5 + sdk/src/beta9/runner/bot/transition.py | 21 +- 19 files changed, 562 insertions(+), 168 deletions(-) create mode 100644 pkg/abstractions/experimental/bot/prompt.yaml diff --git a/pkg/abstractions/experimental/bot/bot.go b/pkg/abstractions/experimental/bot/bot.go index c9565aeab..f39b07ff5 100644 --- a/pkg/abstractions/experimental/bot/bot.go +++ b/pkg/abstractions/experimental/bot/bot.go @@ -204,8 +204,9 @@ func (s *PetriBotService) PushBotEvent(ctx context.Context, in *pb.PushBotEventR } err = instance.botStateManager.pushEvent(instance.workspace.Name, instance.stub.ExternalId, in.SessionId, &BotEvent{ - Type: BotEventType(in.EventType), - Value: in.EventValue, + Type: BotEventType(in.EventType), + Value: in.EventValue, + Metadata: in.Metadata, }) if err != nil { return &pb.PushBotEventResponse{Ok: false}, nil @@ -233,7 +234,9 @@ func (s *PetriBotService) PushBotMarkers(ctx context.Context, in *pb.PushBotMark marker := Marker{ LocationName: marker.LocationName, Fields: fields, + SourceTaskId: in.SourceTaskId, } + err = s.botStateManager.pushMarker(instance.workspace.Name, instance.stub.ExternalId, in.SessionId, locationName, marker) if err != nil { log.Printf(" Failed to push marker: %s", instance.stub.ExternalId, err) diff --git a/pkg/abstractions/experimental/bot/bot.proto b/pkg/abstractions/experimental/bot/bot.proto index 465cdad4e..ae5492fce 100644 --- a/pkg/abstractions/experimental/bot/bot.proto +++ b/pkg/abstractions/experimental/bot/bot.proto @@ -31,6 +31,7 @@ message MarkerField { message Marker { string location_name = 1; repeated MarkerField fields = 2; + string source_task_id = 3; } message PushBotMarkersRequest { @@ -38,6 +39,7 @@ message PushBotMarkersRequest { string session_id = 2; map markers = 3; message MarkerList { repeated Marker markers = 4; } + string source_task_id = 5; } message PushBotMarkersResponse { bool ok = 1; } @@ -47,6 +49,7 @@ message PushBotEventRequest { string session_id = 2; string event_type = 3; string event_value = 4; + map metadata = 5; } message PushBotEventResponse { bool ok = 1; } diff --git a/pkg/abstractions/experimental/bot/http.go b/pkg/abstractions/experimental/bot/http.go index f6082c945..e4a6eb951 100644 --- a/pkg/abstractions/experimental/bot/http.go +++ b/pkg/abstractions/experimental/bot/http.go @@ -45,7 +45,7 @@ func registerBotRoutes(g *echo.Group, pbs *PetriBotService) *botGroup { } const keepAliveInterval = 1 * time.Second -const eventPollingInterval = 1 * time.Second +const eventPollingInterval = 500 * time.Millisecond func (g *botGroup) BotOpenSession(ctx echo.Context) error { cc, _ := ctx.(*auth.HttpAuthContext) @@ -125,6 +125,9 @@ func (g *botGroup) BotOpenSession(ctx echo.Context) error { err = instance.botStateManager.pushEvent(instance.workspace.Name, instance.stub.ExternalId, sessionId, &BotEvent{ Type: BotEventTypeSessionCreated, Value: sessionId, + Metadata: map[string]string{ + string(MetadataSessionId): sessionId, + }, }) if err != nil { return err @@ -183,17 +186,8 @@ func (g *botGroup) BotOpenSession(ctx echo.Context) error { continue } - if event.Type == BotEventTypeUserMessage { - instance.botInterface.SendPrompt(sessionId, PromptTypeUser, event.Value) - continue - } else if event.Type == BotEventTypeTransitionMessage { - instance.botInterface.SendPrompt(sessionId, PromptTypeTransition, event.Value) - continue - } else if event.Type == BotEventTypeMemoryMessage { - instance.botInterface.SendPrompt(sessionId, PromptTypeMemory, event.Value) - continue - } - + // Handle event and echo it back to the client + instance.eventChan <- event serializedEvent, err := json.Marshal(event) if err != nil { continue @@ -216,12 +210,13 @@ func (g *botGroup) BotOpenSession(ctx echo.Context) error { break } - var userRequest UserRequest - if err := json.Unmarshal(message, &userRequest); err != nil { + var event BotEvent + if err := json.Unmarshal(message, &event); err != nil { continue } - if err := instance.botStateManager.pushInputMessage(instance.workspace.Name, instance.stub.ExternalId, sessionId, userRequest.Msg); err != nil { + event.Metadata[string(MetadataSessionId)] = sessionId + if err := instance.botStateManager.pushUserEvent(instance.workspace.Name, instance.stub.ExternalId, sessionId, &event); err != nil { continue } } diff --git a/pkg/abstractions/experimental/bot/instance.go b/pkg/abstractions/experimental/bot/instance.go index f5bde27c7..e672517dc 100644 --- a/pkg/abstractions/experimental/bot/instance.go +++ b/pkg/abstractions/experimental/bot/instance.go @@ -2,6 +2,7 @@ package bot import ( "context" + "encoding/json" "errors" "fmt" "log" @@ -36,6 +37,7 @@ type botInstance struct { taskDispatcher *task.Dispatcher authInfo *auth.AuthInfo containerRepo repository.ContainerRepository + eventChan chan *BotEvent } type botInstanceOpts struct { @@ -65,7 +67,7 @@ func newBotInstance(ctx context.Context, opts botInstanceOpts) (*botInstance, er return nil, err } - return &botInstance{ + instance := &botInstance{ ctx: ctx, appConfig: opts.AppConfig, token: opts.Token, @@ -83,7 +85,12 @@ func newBotInstance(ctx context.Context, opts botInstanceOpts) (*botInstance, er Token: opts.Token, }, containerRepo: opts.ContainerRepo, - }, nil + eventChan: make(chan *BotEvent), + } + + go instance.monitorEvents() + go instance.sendNetworkState() + return instance, nil } func (i *botInstance) containersBySessionId() (map[string][]string, error) { @@ -141,10 +148,8 @@ func (i *botInstance) Start() error { lastActiveSessionAt = time.Now().Unix() for _, session := range activeSessions { - if msg, err := i.botStateManager.popInputMessage(i.workspace.Name, i.stub.ExternalId, session.Id); err == nil { - if err := i.botInterface.SendPrompt(session.Id, PromptTypeUser, msg); err != nil { - continue - } + if event, err := i.botStateManager.popUserEvent(i.workspace.Name, i.stub.ExternalId, session.Id); err == nil { + i.eventChan <- event } // Run any network transitions that can run @@ -218,20 +223,148 @@ func (i *botInstance) step(sessionId string) { }, } + // If this transition requires explicit confirmation, we need to send a confirmation request before executing the task + if transition.Confirm { + t, err := i.taskDispatcher.Send(i.ctx, string(types.ExecutorBot), i.authInfo, i.stub.ExternalId, taskPayload, getDefaultTaskPolicy()) + if err != nil { + i.handleTransitionFailed(sessionId, transition.Name, err) + continue + } + + i.botStateManager.pushEvent(i.workspace.Name, i.stub.ExternalId, sessionId, &BotEvent{ + Type: BotEventTypeConfirmRequest, + Value: transition.Name, + Metadata: map[string]string{ + string(MetadataSessionId): sessionId, + string(MetadataTransitionName): transition.Name, + string(MetadataTaskId): t.Metadata().TaskId, + }, + }) + + continue + } + + t, err := i.taskDispatcher.SendAndExecute(i.ctx, string(types.ExecutorBot), i.authInfo, i.stub.ExternalId, taskPayload, getDefaultTaskPolicy()) + if err != nil { + i.handleTransitionFailed(sessionId, transition.Name, err) + continue + } + i.botStateManager.pushEvent(i.workspace.Name, i.stub.ExternalId, sessionId, &BotEvent{ Type: BotEventTypeTransitionFired, Value: transition.Name, + Metadata: map[string]string{ + string(MetadataSessionId): sessionId, + string(MetadataTransitionName): transition.Name, + string(MetadataTaskId): t.Metadata().TaskId, + }, }) + } + } + + }() +} + +func (i *botInstance) sendNetworkState() { + for { + select { + case <-i.ctx.Done(): + return + case <-time.After(time.Second): + activeSessions, err := i.botStateManager.getActiveSessions(i.workspace.Name, i.stub.ExternalId) + if err != nil || len(activeSessions) == 0 { + continue + } - i.taskDispatcher.SendAndExecute(i.ctx, string(types.ExecutorBot), i.authInfo, i.stub.ExternalId, taskPayload, types.TaskPolicy{ - MaxRetries: 0, - Timeout: 3600, - TTL: 3600, - Expires: time.Now().Add(time.Duration(3600) * time.Second), + for _, session := range activeSessions { + state := &BotNetworkSnapshot{ + SessionId: session.Id, + LocationMarkerCounts: make(map[string]int64), + Config: i.botConfig, + } + + for locationName := range i.botConfig.Locations { + count, err := i.botStateManager.countMarkers(i.workspace.Name, i.stub.ExternalId, session.Id, locationName) + if err != nil { + continue + } + state.LocationMarkerCounts[locationName] = count + } + + stateJson, err := json.Marshal(state) + if err != nil { + continue + } + + i.botStateManager.pushEvent(i.workspace.Name, i.stub.ExternalId, session.Id, &BotEvent{ + Type: BotEventTypeNetworkState, + Value: string(stateJson), + Metadata: map[string]string{ + string(MetadataSessionId): session.Id, + }, }) } } - }() + } +} + +func (i *botInstance) handleTransitionFailed(sessionId, transitionName string, err error) { + i.botStateManager.pushEvent(i.workspace.Name, i.stub.ExternalId, sessionId, &BotEvent{ + Type: BotEventTypeTransitionFailed, + Value: transitionName, + Metadata: map[string]string{ + string(MetadataSessionId): sessionId, + string(MetadataTransitionName): transitionName, + string(MetadataErrorMsg): err.Error(), + }, + }) +} + +func getDefaultTaskPolicy() types.TaskPolicy { + return types.TaskPolicy{ + MaxRetries: 0, + Timeout: 3600, + TTL: 3600, + Expires: time.Now().Add(time.Duration(3600) * time.Second), + } +} + +func (i *botInstance) monitorEvents() error { + for { + select { + case <-i.ctx.Done(): + return nil + case event := <-i.eventChan: + sessionId := event.Metadata[string(MetadataSessionId)] + + switch event.Type { + case BotEventTypeUserMessage: + i.botInterface.SendPrompt(sessionId, PromptTypeUser, &PromptRequest{Msg: event.Value, RequestId: event.Metadata[string(MetadataRequestId)]}) + case BotEventTypeTransitionMessage: + i.botInterface.SendPrompt(sessionId, PromptTypeTransition, &PromptRequest{Msg: event.Value}) + case BotEventTypeMemoryMessage: + i.botInterface.SendPrompt(sessionId, PromptTypeMemory, &PromptRequest{Msg: event.Value}) + case BotEventTypeConfirmResponse: + taskId := event.Metadata[string(MetadataTaskId)] + accepts := event.Metadata[string(MetadataAccept)] == "true" + transitionName := event.Metadata[string(MetadataTransitionName)] + + task, err := i.taskDispatcher.Retrieve(i.ctx, i.workspace.Name, i.stub.ExternalId, taskId) + if err != nil { + continue + } + + if accepts { + err = task.Execute(i.ctx) + if err != nil { + i.handleTransitionFailed(sessionId, transitionName, err) + } + } else { + task.Cancel(i.ctx, types.TaskRequestCancelled) + } + } + } + } } func (i *botInstance) run(transitionName, sessionId, taskId string) error { @@ -290,7 +423,6 @@ func (i *botInstance) run(transitionName, sessionId, taskId string) error { Stub: *i.stub, }) if err != nil { - log.Printf(" Error running transition %s: %s", i.stub.ExternalId, transitionName, err) return err } diff --git a/pkg/abstractions/experimental/bot/interface.go b/pkg/abstractions/experimental/bot/interface.go index 15f11b532..b297b9c80 100644 --- a/pkg/abstractions/experimental/bot/interface.go +++ b/pkg/abstractions/experimental/bot/interface.go @@ -2,12 +2,14 @@ package bot import ( "context" + _ "embed" "encoding/json" "fmt" "github.com/beam-cloud/beta9/pkg/types" openai "github.com/sashabaranov/go-openai" "github.com/sashabaranov/go-openai/jsonschema" + "gopkg.in/yaml.v2" ) type BotInterface struct { @@ -31,18 +33,38 @@ type botInterfaceOpts struct { Stub *types.StubWithRelated } +var ( + //go:embed prompt.yaml + defaultBotSystemPrompt string + defaultBotSystemPromptKey = "prompt" +) + func NewBotInterface(opts botInterfaceOpts) (*BotInterface, error) { + systemPrompt := opts.AppConfig.Abstractions.Bot.SystemPrompt + if systemPrompt == "" { + var promptData map[string]interface{} + + err := yaml.Unmarshal([]byte(defaultBotSystemPrompt), &promptData) + if err != nil { + return nil, err + } + + if prompt, ok := promptData[defaultBotSystemPromptKey].(string); ok { + systemPrompt = prompt + } + } + bi := &BotInterface{ client: openai.NewClient(opts.BotConfig.ApiKey), botConfig: opts.BotConfig, model: opts.BotConfig.Model, - systemPrompt: opts.AppConfig.Abstractions.Bot.SystemPrompt, + systemPrompt: systemPrompt, stateManager: opts.StateManager, workspace: opts.Workspace, stub: opts.Stub, } - // Generate the schema for each response + // Generate the schemas for each response type var userResponse BotUserResponse schema, err := jsonschema.GenerateSchemaForType(userResponse) if err != nil { @@ -154,7 +176,7 @@ func (bi *BotInterface) getSessionHistory(sessionId string) ([]openai.ChatComple return state.GetMessagesInOpenAIFormat(), nil } -func (bi *BotInterface) SendPrompt(sessionId, messageType, prompt string) error { +func (bi *BotInterface) SendPrompt(sessionId, messageType string, req *PromptRequest) error { messages, err := bi.getSessionHistory(sessionId) if err != nil { return err @@ -162,7 +184,7 @@ func (bi *BotInterface) SendPrompt(sessionId, messageType, prompt string) error role := openai.ChatMessageRoleUser promptMessage := openai.ChatCompletionMessage{ - Content: prompt, + Content: req.Msg, } var schema *jsonschema.Definition = bi.userSchema @@ -170,14 +192,14 @@ func (bi *BotInterface) SendPrompt(sessionId, messageType, prompt string) error switch messageType { case PromptTypeUser: role = openai.ChatMessageRoleUser - promptMessage.Content = wrapPrompt(PromptTypeUser, prompt) + promptMessage.Content = wrapPrompt(PromptTypeUser, req.Msg) case PromptTypeTransition: role = openai.ChatMessageRoleUser - promptMessage.Content = wrapPrompt(PromptTypeTransition, prompt) + promptMessage.Content = wrapPrompt(PromptTypeTransition, req.Msg) schema = bi.transitionSchema case PromptTypeMemory: role = openai.ChatMessageRoleUser - promptMessage.Content = wrapPrompt(PromptTypeMemory, prompt) + promptMessage.Content = wrapPrompt(PromptTypeMemory, req.Msg) schema = bi.memorySchema default: return fmt.Errorf("invalid message type: %s", messageType) @@ -206,10 +228,11 @@ func (bi *BotInterface) SendPrompt(sessionId, messageType, prompt string) error } responseMessage := resp.Choices[0].Message + err = bi.addMessagesToSessionHistory(sessionId, []BotChatCompletionMessage{ { Role: role, - Content: prompt, + Content: req.Msg, }, { Role: responseMessage.Role, @@ -248,13 +271,20 @@ func (bi *BotInterface) SendPrompt(sessionId, messageType, prompt string) error } else if messageType == PromptTypeMemory { return bi.stateManager.pushEvent(bi.workspace.Name, bi.stub.ExternalId, sessionId, &BotEvent{ Type: BotEventTypeMemoryUpdated, - Value: prompt, + Value: req.Msg, + Metadata: map[string]string{ + string(MetadataSessionId): sessionId, + }, }) } event := &BotEvent{ Type: BotEventTypeAgentMessage, Value: msg, + Metadata: map[string]string{ + string(MetadataRequestId): req.RequestId, + string(MetadataSessionId): sessionId, + }, } return bi.stateManager.pushEvent(bi.workspace.Name, bi.stub.ExternalId, sessionId, event) diff --git a/pkg/abstractions/experimental/bot/prompt.yaml b/pkg/abstractions/experimental/bot/prompt.yaml new file mode 100644 index 000000000..7878320e3 --- /dev/null +++ b/pkg/abstractions/experimental/bot/prompt.yaml @@ -0,0 +1,41 @@ +prompt: | + You are a useful AI agent. You are given a petri net structure and a set of possible transitions. + + You have three ways of responding to user messages: + + If messages are wrapped with the tag, follow these instructions: + + + When asked what you can do, refer to the configuration provided to you in the next two messages to help guide the user towards a particular set of inputs. + Your job is to take the users input, and format it into markers that can be placed on the petri net. + Use the transition descriptions to explain your capabilties, if they are provided. + Do not ever explicitly mention markers, petri nets, or marker data to the user! + + Use the following rules to respond to user messages: + + 1. Focus on gathering all necessary user inputs to populate markers. If any input is missing, ask the user for it. + 2. Use the "inputs" field of a transition to determine required data for each transition. + 3. Only set complete_marker to true and populate marker_data when all required data for a marker is available and verified. + 4. Return complete markers in marker_data as JSON, with complete_marker set to true, only when all required fields are present. + 5. Avoid resending a complete marker unless specifically requested. + 6. If some markers are complete and others are not, return the complete ones and request missing data for the others. + 7. Do not fabricate data; only use user-provided data formatted according to marker definitions. + 8. If no new marker_data is ready, return an empty JSON object. + 9. Remember previous data and only request missing information for incomplete markers. + 10. Never set default values for marker fields unless they are optional. + 11. Always set source_task_id to an empty string. + 12. Do not mention petri nets to the user. + + + + If the messages are wrapped with tag, follow these instructions: + + + 1. Response as you deem appropriate. + + + If the messages are wrapped with tag, follow these instructions: + + + 1. Do not respond to the user, just remember the information provided. + \ No newline at end of file diff --git a/pkg/abstractions/experimental/bot/state.go b/pkg/abstractions/experimental/bot/state.go index bdb6816f3..a0b8ae26a 100644 --- a/pkg/abstractions/experimental/bot/state.go +++ b/pkg/abstractions/experimental/bot/state.go @@ -23,12 +23,6 @@ func newBotStateManager(rdb *common.RedisClient) *botStateManager { } func (m *botStateManager) loadSession(workspaceName, stubId, sessionId string) (*BotSession, error) { - err := m.lock.Acquire(context.TODO(), Keys.botLock(workspaceName, stubId, sessionId), common.RedisLockOptions{TtlS: 10, Retries: 0}) - if err != nil { - return nil, err - } - defer m.lock.Release(Keys.botLock(workspaceName, stubId, sessionId)) - stateKey := Keys.botSessionState(workspaceName, stubId, sessionId) jsonData, err := m.rdb.Get(context.TODO(), stateKey).Result() if err != nil { @@ -205,13 +199,29 @@ func (m *botStateManager) popTask(workspaceName, stubId, sessionId, transitionNa return markers, nil } -func (m *botStateManager) pushInputMessage(workspaceName, stubId, sessionId, msg string) error { +func (m *botStateManager) pushUserEvent(workspaceName, stubId, sessionId string, event *BotEvent) error { + jsonData, err := json.Marshal(event) + if err != nil { + return err + } + messageKey := Keys.botInputBuffer(workspaceName, stubId, sessionId) - return m.rdb.RPush(context.TODO(), messageKey, msg).Err() + return m.rdb.RPush(context.TODO(), messageKey, jsonData).Err() } -func (m *botStateManager) popInputMessage(workspaceName, stubId, sessionId string) (string, error) { - return m.rdb.LPop(context.TODO(), Keys.botInputBuffer(workspaceName, stubId, sessionId)).Result() +func (m *botStateManager) popUserEvent(workspaceName, stubId, sessionId string) (*BotEvent, error) { + val, err := m.rdb.LPop(context.TODO(), Keys.botInputBuffer(workspaceName, stubId, sessionId)).Result() + if err != nil { + return nil, err + } + + event := &BotEvent{} + err = json.Unmarshal([]byte(val), event) + if err != nil { + return nil, err + } + + return event, nil } func (m *botStateManager) pushEvent(workspaceName, stubId, sessionId string, event *BotEvent) error { diff --git a/pkg/abstractions/experimental/bot/task.go b/pkg/abstractions/experimental/bot/task.go index 9f4aa4605..5ccd9dbd9 100644 --- a/pkg/abstractions/experimental/bot/task.go +++ b/pkg/abstractions/experimental/bot/task.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/beam-cloud/beta9/pkg/types" + "github.com/mitchellh/mapstructure" ) type BotTask struct { @@ -29,7 +30,27 @@ func (t *BotTask) Execute(ctx context.Context, options ...interface{}) error { transitionName := t.msg.Kwargs["transition_name"].(string) sessionId := t.msg.Kwargs["session_id"].(string) - markers := t.msg.Kwargs["markers"].([]Marker) + + // Try to cast the markers to a slice of Marker + var markers []Marker + rawMarkers, ok := t.msg.Kwargs["markers"].([]Marker) + if !ok { + // If that fails, manually decode using mapstructure + rawMarkersInterface, ok := t.msg.Kwargs["markers"].([]interface{}) + if !ok { + return errors.New("invalid markers format") + } + + markers = make([]Marker, len(rawMarkersInterface)) + for i, rawMarker := range rawMarkersInterface { + err := mapstructure.Decode(rawMarker, &markers[i]) + if err != nil { + return err + } + } + } else { + markers = rawMarkers + } err = t.pbs.botStateManager.pushTask(instance.workspace.Name, instance.stub.ExternalId, sessionId, transitionName, t.msg.TaskId, markers) if err != nil { diff --git a/pkg/abstractions/experimental/bot/types.go b/pkg/abstractions/experimental/bot/types.go index 2b3910392..78cd41669 100644 --- a/pkg/abstractions/experimental/bot/types.go +++ b/pkg/abstractions/experimental/bot/types.go @@ -68,24 +68,28 @@ type BotChatCompletionMessage struct { const botSchemaName = "beam_bot" -type UserRequest struct { - Msg string `json:"msg" redis:"msg"` +type PromptRequest struct { + Msg string `json:"msg" redis:"msg"` + RequestId string `json:"request_id" redis:"request_id"` } type BotEventType string const ( - BotEventTypeError BotEventType = "error" - BotEventTypeAgentMessage BotEventType = "agent_message" - BotEventTypeUserMessage BotEventType = "user_message" - BotEventTypeTransitionMessage BotEventType = "transition_message" - BotEventTypeMemoryMessage BotEventType = "memory_message" - BotEventTypeMemoryUpdated BotEventType = "memory_updated" - BotEventTypeSessionCreated BotEventType = "session_created" - BotEventTypeTransitionFired BotEventType = "transition_fired" - BotEventTypeTaskStarted BotEventType = "task_started" - BotEventTypeTaskCompleted BotEventType = "task_completed" - BotEventTypeState BotEventType = "current_state" + BotEventTypeError BotEventType = "error" + BotEventTypeAgentMessage BotEventType = "agent_message" + BotEventTypeUserMessage BotEventType = "user_message" + BotEventTypeTransitionMessage BotEventType = "transition_message" + BotEventTypeMemoryMessage BotEventType = "memory_message" + BotEventTypeMemoryUpdated BotEventType = "memory_updated" + BotEventTypeSessionCreated BotEventType = "session_created" + BotEventTypeTransitionFired BotEventType = "transition_fired" + BotEventTypeTransitionFailed BotEventType = "transition_failed" + BotEventTypeTransitionStarted BotEventType = "transition_started" + BotEventTypeTransitionCompleted BotEventType = "transition_completed" + BotEventTypeNetworkState BotEventType = "network_state" + BotEventTypeConfirmRequest BotEventType = "confirm_request" + BotEventTypeConfirmResponse BotEventType = "confirm_response" ) const PromptTypeUser = "user_message" @@ -93,10 +97,22 @@ const PromptTypeTransition = "transition_message" const PromptTypeMemory = "memory_message" type BotEvent struct { - Type BotEventType `json:"type" redis:"type"` - Value string `json:"value" redis:"value"` + Type BotEventType `json:"type" redis:"type"` + Value string `json:"value" redis:"value"` + Metadata map[string]string `json:"metadata" redis:"metadata"` } +type MetadataKey string + +const ( + MetadataRequestId MetadataKey = "request_id" + MetadataSessionId MetadataKey = "session_id" + MetadataTransitionName MetadataKey = "transition_name" + MetadataTaskId MetadataKey = "task_id" + MetadataErrorMsg MetadataKey = "error_msg" + MetadataAccept MetadataKey = "accept" +) + type BotUserResponse struct { Msg string `json:"msg" redis:"msg"` MarkerData Marker `json:"marker_data" redis:"marker_data"` @@ -112,13 +128,14 @@ type BotMemoryResponse struct { } type Marker struct { - LocationName string `json:"location_name" redis:"location_name"` - Fields []MarkerField `json:"marker_data" redis:"marker_data"` + LocationName string `json:"location_name" mapstructure:"location_name"` + Fields []MarkerField `json:"marker_data" mapstructure:"marker_data"` + SourceTaskId string `json:"source_task_id" mapstructure:"source_task_id"` } type MarkerField struct { - FieldName string `json:"field_name" redis:"field_name"` - FieldValue string `json:"field_value" redis:"field_value"` + FieldName string `json:"field_name" mapstructure:"field_name"` + FieldValue string `json:"field_value" mapstructure:"field_value"` } // BotConfig holds the overall config for the bot @@ -195,6 +212,7 @@ type BotTransitionConfig struct { Outputs []string `json:"outputs" redis:"outputs"` Description string `json:"description" redis:"description"` Expose bool `json:"expose" redis:"expose"` + Confirm bool `json:"confirm" redis:"confirm"` } func (t *BotTransitionConfig) FormatTransition() string { @@ -218,3 +236,9 @@ func (t *BotTransitionConfig) FormatTransition() string { t.Description, ) } + +type BotNetworkSnapshot struct { + SessionId string `json:"session_id" redis:"session_id"` + LocationMarkerCounts map[string]int64 `json:"location_marker_counts" redis:"location_marker_counts"` + Config BotConfig `json:"config" redis:"config"` +} diff --git a/pkg/common/config.default.yaml b/pkg/common/config.default.yaml index 0fd59d55f..7d4881cfd 100644 --- a/pkg/common/config.default.yaml +++ b/pkg/common/config.default.yaml @@ -214,13 +214,6 @@ monitoring: apiKey: "" abstractions: bot: - systemPrompt: | - You are a useful AI agent. You are given a petri net structure and a set of possible transitions. - Convert user input data into markers that can be used to populate the petri net. - Use the transition descriptions to help guide the user towards the correct input data. - - Only return marker_complete=true if you have all the data you need to populate a marker in that response. If you are missing data for a marker, return marker_complete=false. - Do not return the same marker data back to the user in consecutive responses. - + systemPrompt: "" stepIntervalS: 1 sessionInactivityTimeoutS: 10 diff --git a/pkg/repository/base.go b/pkg/repository/base.go index d2087d558..4d8ee4f89 100755 --- a/pkg/repository/base.go +++ b/pkg/repository/base.go @@ -132,6 +132,7 @@ type BackendRepository interface { } type TaskRepository interface { + GetTaskState(ctx context.Context, workspaceName, stubId, taskId string) (*types.TaskMessage, error) SetTaskState(ctx context.Context, workspaceName, stubId, taskId string, msg []byte) error DeleteTaskState(ctx context.Context, workspaceName, stubId, taskId string) error GetTasksInFlight(ctx context.Context) ([]*types.TaskMessage, error) diff --git a/pkg/repository/task_redis.go b/pkg/repository/task_redis.go index 69077cf96..3b3629ff8 100644 --- a/pkg/repository/task_redis.go +++ b/pkg/repository/task_redis.go @@ -96,6 +96,17 @@ func (r *TaskRedisRepository) SetTaskState(ctx context.Context, workspaceName, s return nil } +func (r *TaskRedisRepository) GetTaskState(ctx context.Context, workspaceName, stubId, taskId string) (*types.TaskMessage, error) { + msg, err := r.rdb.Get(ctx, common.RedisKeys.TaskEntry(workspaceName, stubId, taskId)).Bytes() + if err != nil { + return nil, err + } + + taskMessage := &types.TaskMessage{} + taskMessage.Decode(msg) + return taskMessage, nil +} + func (r *TaskRedisRepository) TasksInFlight(ctx context.Context, workspaceName, stubId string) (int, error) { tasks, err := r.rdb.SMembers(ctx, common.RedisKeys.TaskIndexByStub(workspaceName, stubId)).Result() if err != nil { diff --git a/pkg/task/dispatch.go b/pkg/task/dispatch.go index 05d58ae10..3ef9801a2 100644 --- a/pkg/task/dispatch.go +++ b/pkg/task/dispatch.go @@ -108,6 +108,25 @@ func (d *Dispatcher) Send(ctx context.Context, executor string, authInfo *auth.A return task, nil } +func (d *Dispatcher) Retrieve(ctx context.Context, workspaceName, stubId, taskId string) (types.TaskInterface, error) { + taskMessage, err := d.taskRepo.GetTaskState(ctx, workspaceName, stubId, taskId) + if err != nil { + return nil, err + } + + taskFactory, exists := d.executors.Get(taskMessage.Executor) + if !exists { + return nil, fmt.Errorf("invalid task executor: %v", taskMessage.Executor) + } + + task, err := taskFactory(ctx, *taskMessage) + if err != nil { + return nil, err + } + + return task, nil +} + func (d *Dispatcher) Complete(ctx context.Context, workspaceName, stubId, taskId string) error { return d.taskRepo.DeleteTaskState(ctx, workspaceName, stubId, taskId) } diff --git a/pkg/types/config.go b/pkg/types/config.go index be5f7954f..1799724b0 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -102,7 +102,6 @@ type GatewayServiceConfig struct { type ImageServiceConfig struct { LocalCacheEnabled bool `key:"localCacheEnabled" json:"local_cache_enabled"` - BlobCacheEnabled bool `key:"blobCacheEnabled" json:"blob_cache_enabled"` // TODO: remove this once all workers cycle with the new config RegistryStore string `key:"registryStore" json:"registry_store"` RegistryCredentialProviderName string `key:"registryCredentialProvider" json:"registry_credential_provider_name"` Registries ImageRegistriesConfig `key:"registries" json:"registries"` diff --git a/proto/bot.pb.go b/proto/bot.pb.go index d072c2303..3e5549ea0 100644 --- a/proto/bot.pb.go +++ b/proto/bot.pb.go @@ -208,6 +208,7 @@ type Marker struct { LocationName string `protobuf:"bytes,1,opt,name=location_name,json=locationName,proto3" json:"location_name,omitempty"` Fields []*MarkerField `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"` + SourceTaskId string `protobuf:"bytes,3,opt,name=source_task_id,json=sourceTaskId,proto3" json:"source_task_id,omitempty"` } func (x *Marker) Reset() { @@ -256,14 +257,22 @@ func (x *Marker) GetFields() []*MarkerField { return nil } +func (x *Marker) GetSourceTaskId() string { + if x != nil { + return x.SourceTaskId + } + return "" +} + type PushBotMarkersRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - StubId string `protobuf:"bytes,1,opt,name=stub_id,json=stubId,proto3" json:"stub_id,omitempty"` - SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` - Markers map[string]*PushBotMarkersRequest_MarkerList `protobuf:"bytes,3,rep,name=markers,proto3" json:"markers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + StubId string `protobuf:"bytes,1,opt,name=stub_id,json=stubId,proto3" json:"stub_id,omitempty"` + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` + Markers map[string]*PushBotMarkersRequest_MarkerList `protobuf:"bytes,3,rep,name=markers,proto3" json:"markers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + SourceTaskId string `protobuf:"bytes,5,opt,name=source_task_id,json=sourceTaskId,proto3" json:"source_task_id,omitempty"` } func (x *PushBotMarkersRequest) Reset() { @@ -319,6 +328,13 @@ func (x *PushBotMarkersRequest) GetMarkers() map[string]*PushBotMarkersRequest_M return nil } +func (x *PushBotMarkersRequest) GetSourceTaskId() string { + if x != nil { + return x.SourceTaskId + } + return "" +} + type PushBotMarkersResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -371,10 +387,11 @@ type PushBotEventRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - StubId string `protobuf:"bytes,1,opt,name=stub_id,json=stubId,proto3" json:"stub_id,omitempty"` - SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` - EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` - EventValue string `protobuf:"bytes,4,opt,name=event_value,json=eventValue,proto3" json:"event_value,omitempty"` + StubId string `protobuf:"bytes,1,opt,name=stub_id,json=stubId,proto3" json:"stub_id,omitempty"` + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` + EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + EventValue string `protobuf:"bytes,4,opt,name=event_value,json=eventValue,proto3" json:"event_value,omitempty"` + Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *PushBotEventRequest) Reset() { @@ -437,6 +454,13 @@ func (x *PushBotEventRequest) GetEventValue() string { return "" } +func (x *PushBotEventRequest) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + type PushBotEventResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -611,63 +635,76 @@ var file_bot_proto_rawDesc = []byte{ 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0a, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x57, 0x0a, 0x06, 0x4d, + 0x0a, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d, 0x0a, 0x06, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x23, 0x0a, 0x0d, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, - 0x65, 0x6c, 0x64, 0x73, 0x22, 0xaa, 0x02, 0x0a, 0x15, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, - 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, - 0x0a, 0x07, 0x73, 0x74, 0x75, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x73, 0x74, 0x75, 0x62, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x41, 0x0a, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, - 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, - 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x52, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x1a, 0x61, 0x0a, 0x0c, 0x4d, 0x61, 0x72, - 0x6b, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x3b, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x62, 0x6f, 0x74, - 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, - 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x33, 0x0a, 0x0a, - 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x07, 0x6d, 0x61, - 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x62, 0x6f, - 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, - 0x73, 0x22, 0x28, 0x0a, 0x16, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, - 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x22, 0x8d, 0x01, 0x0a, 0x13, + 0x65, 0x6c, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, + 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x22, 0xd0, 0x02, 0x0a, 0x15, 0x50, + 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x74, 0x75, 0x62, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x75, 0x62, 0x49, 0x64, 0x12, 0x1d, 0x0a, + 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x41, 0x0a, 0x07, + 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, + 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, + 0x24, 0x0a, 0x0e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, + 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, + 0x61, 0x73, 0x6b, 0x49, 0x64, 0x1a, 0x61, 0x0a, 0x0c, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x3b, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, + 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x33, 0x0a, 0x0a, 0x4d, 0x61, 0x72, 0x6b, + 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, + 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x4d, 0x61, + 0x72, 0x6b, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x28, 0x0a, + 0x16, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x22, 0x8e, 0x02, 0x0a, 0x13, 0x50, 0x75, 0x73, 0x68, + 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x17, 0x0a, 0x07, 0x73, 0x74, 0x75, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x74, 0x75, 0x62, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x42, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x74, 0x75, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x75, 0x62, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, - 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x26, 0x0a, 0x14, 0x50, - 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x02, 0x6f, 0x6b, 0x32, 0xe1, 0x01, 0x0a, 0x0a, 0x42, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x3f, 0x0a, 0x0a, 0x50, 0x6f, 0x70, 0x42, 0x6f, 0x74, 0x54, 0x61, 0x73, 0x6b, - 0x12, 0x16, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x6f, 0x70, 0x42, 0x6f, 0x74, 0x54, 0x61, 0x73, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, - 0x6f, 0x70, 0x42, 0x6f, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, - 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x1a, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, - 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1b, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, - 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x45, 0x0a, 0x0c, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x12, 0x18, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x62, 0x6f, 0x74, - 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, - 0x2f, 0x62, 0x65, 0x74, 0x61, 0x39, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x73, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, + 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x26, 0x0a, 0x14, 0x50, 0x75, 0x73, 0x68, + 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, + 0x32, 0xe1, 0x01, 0x0a, 0x0a, 0x42, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x3f, 0x0a, 0x0a, 0x50, 0x6f, 0x70, 0x42, 0x6f, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x16, 0x2e, + 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x6f, 0x70, 0x42, 0x6f, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x6f, 0x70, 0x42, + 0x6f, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x4b, 0x0a, 0x0e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, 0x65, + 0x72, 0x73, 0x12, 0x1a, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, + 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, + 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x4d, 0x61, 0x72, 0x6b, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, + 0x0c, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x2e, + 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x62, 0x6f, 0x74, 0x2e, 0x50, 0x75, + 0x73, 0x68, 0x42, 0x6f, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x62, 0x65, + 0x74, 0x61, 0x39, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -682,7 +719,7 @@ func file_bot_proto_rawDescGZIP() []byte { return file_bot_proto_rawDescData } -var file_bot_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_bot_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_bot_proto_goTypes = []interface{}{ (*PopBotTaskRequest)(nil), // 0: bot.PopBotTaskRequest (*PopBotTaskResponse)(nil), // 1: bot.PopBotTaskResponse @@ -696,26 +733,28 @@ var file_bot_proto_goTypes = []interface{}{ (*PopBotTaskResponse_MarkerList)(nil), // 9: bot.PopBotTaskResponse.MarkerList nil, // 10: bot.PushBotMarkersRequest.MarkersEntry (*PushBotMarkersRequest_MarkerList)(nil), // 11: bot.PushBotMarkersRequest.MarkerList + nil, // 12: bot.PushBotEventRequest.MetadataEntry } var file_bot_proto_depIdxs = []int32{ 8, // 0: bot.PopBotTaskResponse.markers:type_name -> bot.PopBotTaskResponse.MarkersEntry 2, // 1: bot.Marker.fields:type_name -> bot.MarkerField 10, // 2: bot.PushBotMarkersRequest.markers:type_name -> bot.PushBotMarkersRequest.MarkersEntry - 9, // 3: bot.PopBotTaskResponse.MarkersEntry.value:type_name -> bot.PopBotTaskResponse.MarkerList - 3, // 4: bot.PopBotTaskResponse.MarkerList.markers:type_name -> bot.Marker - 11, // 5: bot.PushBotMarkersRequest.MarkersEntry.value:type_name -> bot.PushBotMarkersRequest.MarkerList - 3, // 6: bot.PushBotMarkersRequest.MarkerList.markers:type_name -> bot.Marker - 0, // 7: bot.BotService.PopBotTask:input_type -> bot.PopBotTaskRequest - 4, // 8: bot.BotService.PushBotMarkers:input_type -> bot.PushBotMarkersRequest - 6, // 9: bot.BotService.PushBotEvent:input_type -> bot.PushBotEventRequest - 1, // 10: bot.BotService.PopBotTask:output_type -> bot.PopBotTaskResponse - 5, // 11: bot.BotService.PushBotMarkers:output_type -> bot.PushBotMarkersResponse - 7, // 12: bot.BotService.PushBotEvent:output_type -> bot.PushBotEventResponse - 10, // [10:13] is the sub-list for method output_type - 7, // [7:10] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 12, // 3: bot.PushBotEventRequest.metadata:type_name -> bot.PushBotEventRequest.MetadataEntry + 9, // 4: bot.PopBotTaskResponse.MarkersEntry.value:type_name -> bot.PopBotTaskResponse.MarkerList + 3, // 5: bot.PopBotTaskResponse.MarkerList.markers:type_name -> bot.Marker + 11, // 6: bot.PushBotMarkersRequest.MarkersEntry.value:type_name -> bot.PushBotMarkersRequest.MarkerList + 3, // 7: bot.PushBotMarkersRequest.MarkerList.markers:type_name -> bot.Marker + 0, // 8: bot.BotService.PopBotTask:input_type -> bot.PopBotTaskRequest + 4, // 9: bot.BotService.PushBotMarkers:input_type -> bot.PushBotMarkersRequest + 6, // 10: bot.BotService.PushBotEvent:input_type -> bot.PushBotEventRequest + 1, // 11: bot.BotService.PopBotTask:output_type -> bot.PopBotTaskResponse + 5, // 12: bot.BotService.PushBotMarkers:output_type -> bot.PushBotMarkersResponse + 7, // 13: bot.BotService.PushBotEvent:output_type -> bot.PushBotEventResponse + 11, // [11:14] is the sub-list for method output_type + 8, // [8:11] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_bot_proto_init() } @@ -851,7 +890,7 @@ func file_bot_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_bot_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/sdk/src/beta9/abstractions/experimental/bot/bot.py b/sdk/src/beta9/abstractions/experimental/bot/bot.py index e9ee94649..785ff28af 100644 --- a/sdk/src/beta9/abstractions/experimental/bot/bot.py +++ b/sdk/src/beta9/abstractions/experimental/bot/bot.py @@ -2,6 +2,7 @@ import json import os import threading +import uuid from enum import Enum from typing import Any, Callable, Dict, List, Optional, Union @@ -34,9 +35,18 @@ class BotEventType(str, Enum): MEMORY_UPDATED = "memory_updated" SESSION_CREATED = "session_created" TRANSITION_FIRED = "transition_fired" - TASK_STARTED = "task_started" - TASK_COMPLETED = "task_completed" - TASK_FAILED = "task_failed" + TRANSITION_STARTED = "transition_started" + TRANSITION_COMPLETED = "transition_completed" + TRANSITION_FAILED = "transition_failed" + NETWORK_STATE = "network_state" + CONFIRM_REQUEST = "confirm_request" + CONFIRM_RESPONSE = "confirm_response" + + +class BotEvent(BaseModel): + type: BotEventType + value: str + metadata: dict = {} class BotTransition: @@ -62,6 +72,8 @@ class BotTransition: A description of the transition. Default is None. expose (bool): Whether or not to give the model awareness of this transition. Default is True. + confirm (bool): + Whether or not to ask the user for confirmation before running the transition. Default is False. task_policy (Optional[str]): The task policy to use for the transition. Default is None. """ @@ -78,6 +90,7 @@ def __init__( outputs: list = [], description: Optional[str] = None, expose: bool = True, + confirm: bool = False, bot_instance: Optional["Bot"] = None, # Reference to parent Bot instance task_policy: Optional[str] = None, handler: Optional[str] = None, @@ -127,6 +140,7 @@ def __init__( ], "description": description or "", "expose": expose, + "confirm": confirm, } self.bot_instance: Optional["Bot"] = bot_instance @@ -304,18 +318,19 @@ def _connect_to_session(): from prompt_toolkit import PromptSession def on_message(ws, message): - event = json.loads(message) - event_type = event.get("type") - event_value = event.get("value") + event = BotEvent(**json.loads(message)) + event_type = event.type + event_value = event.value if event_type == BotEventType.SESSION_CREATED: session_id = event_value terminal.header(f"Session started: {session_id}") terminal.header("💬 Chat with your bot below...") session_event.set() # Signal that session is ready - + elif event_type == BotEventType.NETWORK_STATE: + pass else: - terminal.print(f"\n{json.dumps(event, indent=2)}") + terminal.print(f"\n{json.dumps(event.model_dump(), indent=2)}") def on_error(ws, error): terminal.error(f"Error: {error}") @@ -334,7 +349,13 @@ def _send_user_input(): try: msg = session.prompt("# ") if msg: - ws.send(json.dumps({"msg": msg})) + ws.send( + BotEvent( + type=BotEventType.USER_MESSAGE, + value=msg, + metadata={"request_id": str(uuid.uuid4())}, + ).model_dump_json() + ) except KeyboardInterrupt: confirm = session.prompt("# Exit chat session (y/n) ") if confirm.strip().lower() == "y": diff --git a/sdk/src/beta9/abstractions/experimental/bot/types.py b/sdk/src/beta9/abstractions/experimental/bot/types.py index 9ed7cd455..5266fea9b 100644 --- a/sdk/src/beta9/abstractions/experimental/bot/types.py +++ b/sdk/src/beta9/abstractions/experimental/bot/types.py @@ -47,6 +47,8 @@ def new( return instance def push_event(cls, *, event_type: BotEventType, event_value: str): + """Send an event to the bot (supports all event types)""" + print(f"Sending bot event<{event_type}> {event_value}") cls.bot_stub.push_bot_event( PushBotEventRequest( @@ -54,25 +56,61 @@ def push_event(cls, *, event_type: BotEventType, event_value: str): session_id=cls.session_id, event_type=event_type, event_value=event_value, + metadata={ + "task_id": cls.task_id, + "session_id": cls.session_id, + "transition_name": cls.transition_name, + }, ) ) def prompt(cls, msg: str): + """Send a prompt to the user from the bot""" + cls.bot_stub.push_bot_event( PushBotEventRequest( stub_id=cls.stub_id, session_id=cls.session_id, event_type=BotEventType.TRANSITION_MESSAGE, event_value=msg, + metadata={ + "task_id": cls.task_id, + "session_id": cls.session_id, + "transition_name": cls.transition_name, + }, + ) + ) + + def say(cls, msg: str): + """Send a message to the user from the bot""" + + cls.bot_stub.push_bot_event( + PushBotEventRequest( + stub_id=cls.stub_id, + session_id=cls.session_id, + event_type=BotEventType.AGENT_MESSAGE, + event_value=msg, + metadata={ + "task_id": cls.task_id, + "session_id": cls.session_id, + "transition_name": cls.transition_name, + }, ) ) def remember(cls, obj: Any): + """Store an arbitrary object in the bot's memory (must be JSON serializable)""" + cls.bot_stub.push_bot_event( PushBotEventRequest( stub_id=cls.stub_id, session_id=cls.session_id, event_type=BotEventType.MEMORY_MESSAGE, event_value=json.dumps(obj), + metadata={ + "task_id": cls.task_id, + "session_id": cls.session_id, + "transition_name": cls.transition_name, + }, ) ) diff --git a/sdk/src/beta9/clients/bot/__init__.py b/sdk/src/beta9/clients/bot/__init__.py index 56831c905..d670523a2 100644 --- a/sdk/src/beta9/clients/bot/__init__.py +++ b/sdk/src/beta9/clients/bot/__init__.py @@ -54,6 +54,7 @@ class MarkerField(betterproto.Message): class Marker(betterproto.Message): location_name: str = betterproto.string_field(1) fields: List["MarkerField"] = betterproto.message_field(2) + source_task_id: str = betterproto.string_field(3) @dataclass(eq=False, repr=False) @@ -63,6 +64,7 @@ class PushBotMarkersRequest(betterproto.Message): markers: Dict[str, "PushBotMarkersRequestMarkerList"] = betterproto.map_field( 3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE ) + source_task_id: str = betterproto.string_field(5) @dataclass(eq=False, repr=False) @@ -81,6 +83,9 @@ class PushBotEventRequest(betterproto.Message): session_id: str = betterproto.string_field(2) event_type: str = betterproto.string_field(3) event_value: str = betterproto.string_field(4) + metadata: Dict[str, str] = betterproto.map_field( + 5, betterproto.TYPE_STRING, betterproto.TYPE_STRING + ) @dataclass(eq=False, repr=False) diff --git a/sdk/src/beta9/runner/bot/transition.py b/sdk/src/beta9/runner/bot/transition.py index 51d1b0ce6..6bb632c8c 100644 --- a/sdk/src/beta9/runner/bot/transition.py +++ b/sdk/src/beta9/runner/bot/transition.py @@ -102,7 +102,6 @@ def _format_outputs(self, outputs: Dict[str, Any]) -> Dict[str, Any]: for field_name, field_value in marker_dict.items(): field_type = marker_annotations.get(field_name, str) - print(field_type) # Decompose the field_type to handle complex types like Optional[int] origin_type = get_origin(field_type) or field_type @@ -141,7 +140,9 @@ def run(self, inputs: Dict[str, Any]) -> BotTransitionResult: bot_stub=self.bot_stub, ) - context.push_event(event_type=BotEventType.TASK_STARTED, event_value=config.task_id) + context.push_event( + event_type=BotEventType.TRANSITION_STARTED, event_value=self.transition_name + ) try: outputs = self.handler(context=context, inputs=self._format_inputs(inputs)) @@ -200,7 +201,10 @@ def main(channel: Channel): else: push_bot_markers_response: PushBotMarkersResponse = bot_stub.push_bot_markers( PushBotMarkersRequest( - stub_id=config.stub_id, session_id=session_id, markers=result.outputs + stub_id=config.stub_id, + session_id=session_id, + markers=result.outputs, + source_task_id=task_id, ) ) if not push_bot_markers_response.ok: @@ -226,10 +230,15 @@ def main(channel: Channel): PushBotEventRequest( stub_id=config.stub_id, session_id=session_id, - event_type=BotEventType.TASK_COMPLETED + event_type=BotEventType.TRANSITION_COMPLETED if task_status == TaskStatus.Complete - else BotEventType.TASK_FAILED, - event_value=task_id, + else BotEventType.TRANSITION_FAILED, + event_value=transition_name, + metadata={ + "task_id": task_id, + "session_id": session_id, + "transition_name": transition_name, + }, ) )