From 10c0b529e31cfbab51f0fcccb89f6cf5a79fa6f1 Mon Sep 17 00:00:00 2001 From: Gal Kleinman Date: Tue, 30 Jan 2024 14:49:06 +0200 Subject: [PATCH] introduce workflow and task --- go.work.sum | 9 +++ sample-app/main.go | 7 +- sample-app/workflow_example.go | 117 +++++++++++++++++++++++++++++++++ traceloop-sdk/sdk.go | 14 ++-- traceloop-sdk/tracing_types.go | 5 +- traceloop-sdk/workflow.go | 69 +++++++++++++++++++ 6 files changed, 209 insertions(+), 12 deletions(-) create mode 100644 sample-app/workflow_example.go create mode 100644 traceloop-sdk/workflow.go diff --git a/go.work.sum b/go.work.sum index 869a847..fbbefe0 100644 --- a/go.work.sum +++ b/go.work.sum @@ -2,31 +2,40 @@ cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdi cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 h1:kkhsdkhsCvIsutKu5zLMgWtgh9YxGCNAw8Ad8hjwfYg= github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/skeema/knownhosts v1.2.1 h1:SHWdIUa82uGZz+F+47k8SY4QhhI291cXCpopT1lK2AQ= github.com/skeema/knownhosts v1.2.1/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo= +github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= diff --git a/sample-app/main.go b/sample-app/main.go index 9f07617..a1708f4 100644 --- a/sample-app/main.go +++ b/sample-app/main.go @@ -10,7 +10,7 @@ import ( tlp "github.com/traceloop/go-openllmetry/traceloop-sdk" ) -func main() { +func workflow_example() { ctx := context.Background() traceloop, err := tlp.NewClient(ctx, tlp.Config{ @@ -47,9 +47,8 @@ func main() { Model: request.Model, Messages: promptMsgs, }, - tlp.TraceloopAttributes{ - WorkflowName: "example-workflow", - EntityName: "example-entity", + tlp.WorkflowAttributes{ + Name: "example-workflow", }, ) if err != nil { diff --git a/sample-app/workflow_example.go b/sample-app/workflow_example.go new file mode 100644 index 0000000..5022985 --- /dev/null +++ b/sample-app/workflow_example.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/sashabaranov/go-openai" + tlp "github.com/traceloop/go-openllmetry/traceloop-sdk" +) + +func main() { + ctx := context.Background() + + traceloop, err := tlp.NewClient(ctx, tlp.Config{ + BaseURL: "api-staging.traceloop.com", + APIKey: os.Getenv("TRACELOOP_API_KEY"), + }) + defer func() { traceloop.Shutdown(ctx) }() + + if err != nil { + fmt.Printf("NewClient error: %v\n", err) + return + } + + wf := traceloop.NewWorkflow(ctx, tlp.WorkflowAttributes{ + Name: "history_generation", + }) + defer wf.End() + + factGenTask := wf.NewTask("current_date_fact_generation") + defer factGenTask.End() + + request, err := traceloop.GetOpenAIChatCompletionRequest("example-prompt", map[string]interface{}{ "date": time.Now().Format("01/02") }) + if err != nil { + fmt.Printf("GetOpenAIChatCompletionRequest error: %v\n", err) + return + } + + var promptMsgs []tlp.Message + for i, message := range request.Messages { + promptMsgs = append(promptMsgs, tlp.Message{ + Index: i, + Content: message.Content, + Role: message.Role, + }) + } + + llmSpan, err := factGenTask.LogPrompt( + tlp.Prompt{ + Vendor: "openai", + Mode: "chat", + Model: request.Model, + Messages: promptMsgs, + }, + ) + if err != nil { + fmt.Printf("LogPrompt error: %v\n", err) + return + } + + client := openai.NewClient(os.Getenv("OPENAI_API_KEY")) + resp, err := client.CreateChatCompletion( + context.Background(), + *request, + ) + if err != nil { + fmt.Printf("ChatCompletion error: %v\n", err) + return + } + + var completionMsgs []tlp.Message + for _, choice := range resp.Choices { + completionMsgs = append(completionMsgs, tlp.Message{ + Index: choice.Index, + Content: choice.Message.Content, + Role: choice.Message.Role, + }) + } + + llmSpan.LogCompletion(ctx, tlp.Completion{ + Model: resp.Model, + Messages: completionMsgs, + }, tlp.Usage{ + TotalTokens: resp.Usage.TotalTokens, + CompletionTokens: resp.Usage.CompletionTokens, + PromptTokens: resp.Usage.PromptTokens, + }) + + someOtherTask := wf.NewTask("some_other_task") + defer someOtherTask.End() + + otherPrompt, _ := someOtherTask.LogPrompt(tlp.Prompt{ + Vendor: "openai", + Mode: "chat", + Model: request.Model, + Messages: []tlp.Message{ + { + Index: 0, + Content: "some other prompt", + Role: "user", + }, + }, + }) + + otherPrompt.LogCompletion(ctx, tlp.Completion{ + Model: resp.Model, + Messages: completionMsgs, + }, tlp.Usage{ + TotalTokens: resp.Usage.TotalTokens, + CompletionTokens: resp.Usage.CompletionTokens, + PromptTokens: resp.Usage.PromptTokens, + }) + + fmt.Println(resp.Choices[0].Message.Content) +} diff --git a/traceloop-sdk/sdk.go b/traceloop-sdk/sdk.go index ddf02da..b953f52 100644 --- a/traceloop-sdk/sdk.go +++ b/traceloop-sdk/sdk.go @@ -3,6 +3,7 @@ package traceloop import ( "context" "fmt" + "log" "net/http" "os" "time" @@ -62,7 +63,7 @@ func (instance *Traceloop) initialize(ctx context.Context) error { } } - fmt.Printf("Traceloop %s SDK initialized. Connecting to %s\n", Version(), instance.config.BaseURL) + log.Printf("Traceloop %s SDK initialized. Connecting to %s\n", Version(), instance.config.BaseURL) instance.pollPrompts() err := instance.initTracer(ctx, instance.config.ServiceName) @@ -91,16 +92,19 @@ func (instance *Traceloop) tracerName() string { } } -func (instance *Traceloop) LogPrompt(ctx context.Context, prompt Prompt, traceloopAttrs TraceloopAttributes) (LLMSpan, error) { +func (instance *Traceloop) getTracer() apitrace.Tracer { + return (*instance.tracerProvider).Tracer(instance.tracerName()) +} + +func (instance *Traceloop) LogPrompt(ctx context.Context, prompt Prompt, workflowAttrs WorkflowAttributes) (LLMSpan, error) { spanName := fmt.Sprintf("%s.%s", prompt.Vendor, prompt.Mode) - _, span := (*instance.tracerProvider).Tracer(instance.tracerName()).Start(ctx, spanName) + _, span := instance.getTracer().Start(ctx, spanName) span.SetAttributes( semconvai.LLMVendor.String(prompt.Vendor), semconvai.LLMRequestModel.String(prompt.Model), semconvai.LLMRequestType.String(prompt.Mode), - semconvai.TraceloopWorkflowName.String(traceloopAttrs.WorkflowName), - semconvai.TraceloopEntityName.String(traceloopAttrs.EntityName), + semconvai.TraceloopWorkflowName.String(workflowAttrs.Name), ) setMessagesAttribute(span, "llm.prompts", prompt.Messages) diff --git a/traceloop-sdk/tracing_types.go b/traceloop-sdk/tracing_types.go index b81d54a..41f9960 100644 --- a/traceloop-sdk/tracing_types.go +++ b/traceloop-sdk/tracing_types.go @@ -23,9 +23,8 @@ type Completion struct { Messages []Message `json:"messages"` } -type TraceloopAttributes struct { - WorkflowName string `json:"workflow_name"` - EntityName string `json:"entity_name"` +type WorkflowAttributes struct { + Name string `json:"workflow_name"` AssociationProperties map[string]string `json:"association_properties"` } diff --git a/traceloop-sdk/workflow.go b/traceloop-sdk/workflow.go new file mode 100644 index 0000000..246c4bd --- /dev/null +++ b/traceloop-sdk/workflow.go @@ -0,0 +1,69 @@ +package traceloop + +import ( + "context" + "fmt" + + semconvai "github.com/traceloop/go-openllmetry/semconv-ai" + "go.opentelemetry.io/otel/trace" +) + +type Workflow struct { + sdk *Traceloop + ctx context.Context + Attributes WorkflowAttributes `json:"workflow_attributes"` +} + +type Task struct { + workflow *Workflow + ctx context.Context + Name string `json:"name"` +} + +func (instance *Traceloop) NewWorkflow(ctx context.Context, attrs WorkflowAttributes) *Workflow { + wCtx, span := instance.getTracer().Start(ctx, fmt.Sprintf("%s.workflow", attrs.Name), trace.WithNewRoot()) + + span.SetAttributes( + semconvai.TraceloopWorkflowName.String(attrs.Name), + semconvai.TraceloopSpanKind.String("task"), + semconvai.TraceloopEntityName.String(attrs.Name), + ) + + return &Workflow{ + sdk: instance, + ctx: wCtx, + Attributes: attrs, + } +} + +func (workflow *Workflow) End() { + trace.SpanFromContext(workflow.ctx).End() +} + +func (workflow *Workflow) LogPrompt(prompt Prompt) (LLMSpan, error) { + return workflow.sdk.LogPrompt(workflow.ctx, prompt, workflow.Attributes) +} + +func (workflow *Workflow) NewTask(name string) *Task { + tCtx, span := workflow.sdk.getTracer().Start(workflow.ctx, fmt.Sprintf("%s.task", name)) + + span.SetAttributes( + semconvai.TraceloopWorkflowName.String(workflow.Attributes.Name), + semconvai.TraceloopSpanKind.String("task"), + semconvai.TraceloopEntityName.String(name), + ) + + return &Task{ + workflow: workflow, + ctx: tCtx, + Name: name, + } +} + +func (task *Task) End() { + trace.SpanFromContext(task.ctx).End() +} + +func (task *Task) LogPrompt(prompt Prompt) (LLMSpan, error) { + return task.workflow.sdk.LogPrompt(task.ctx, prompt, task.workflow.Attributes) +}