Skip to content

Commit

Permalink
introduce workflow and task
Browse files Browse the repository at this point in the history
  • Loading branch information
galkleinman committed Jan 30, 2024
1 parent 8dda467 commit 10c0b52
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 12 deletions.
9 changes: 9 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,40 @@ cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdi
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 h1:kkhsdkhsCvIsutKu5zLMgWtgh9YxGCNAw8Ad8hjwfYg=
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/skeema/knownhosts v1.2.1 h1:SHWdIUa82uGZz+F+47k8SY4QhhI291cXCpopT1lK2AQ=
github.com/skeema/knownhosts v1.2.1/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0=
Expand Down
7 changes: 3 additions & 4 deletions sample-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
tlp "github.com/traceloop/go-openllmetry/traceloop-sdk"
)

func main() {
func workflow_example() {
ctx := context.Background()

traceloop, err := tlp.NewClient(ctx, tlp.Config{
Expand Down Expand Up @@ -47,9 +47,8 @@ func main() {
Model: request.Model,
Messages: promptMsgs,
},
tlp.TraceloopAttributes{
WorkflowName: "example-workflow",
EntityName: "example-entity",
tlp.WorkflowAttributes{
Name: "example-workflow",
},
)
if err != nil {
Expand Down
117 changes: 117 additions & 0 deletions sample-app/workflow_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/sashabaranov/go-openai"
tlp "github.com/traceloop/go-openllmetry/traceloop-sdk"
)

func main() {
ctx := context.Background()

traceloop, err := tlp.NewClient(ctx, tlp.Config{
BaseURL: "api-staging.traceloop.com",
APIKey: os.Getenv("TRACELOOP_API_KEY"),
})
defer func() { traceloop.Shutdown(ctx) }()

if err != nil {
fmt.Printf("NewClient error: %v\n", err)
return
}

wf := traceloop.NewWorkflow(ctx, tlp.WorkflowAttributes{
Name: "history_generation",
})
defer wf.End()

factGenTask := wf.NewTask("current_date_fact_generation")
defer factGenTask.End()

request, err := traceloop.GetOpenAIChatCompletionRequest("example-prompt", map[string]interface{}{ "date": time.Now().Format("01/02") })
if err != nil {
fmt.Printf("GetOpenAIChatCompletionRequest error: %v\n", err)
return
}

var promptMsgs []tlp.Message
for i, message := range request.Messages {
promptMsgs = append(promptMsgs, tlp.Message{
Index: i,
Content: message.Content,
Role: message.Role,
})
}

llmSpan, err := factGenTask.LogPrompt(
tlp.Prompt{
Vendor: "openai",
Mode: "chat",
Model: request.Model,
Messages: promptMsgs,
},
)
if err != nil {
fmt.Printf("LogPrompt error: %v\n", err)
return
}

client := openai.NewClient(os.Getenv("OPENAI_API_KEY"))
resp, err := client.CreateChatCompletion(
context.Background(),
*request,
)
if err != nil {
fmt.Printf("ChatCompletion error: %v\n", err)
return
}

var completionMsgs []tlp.Message
for _, choice := range resp.Choices {
completionMsgs = append(completionMsgs, tlp.Message{
Index: choice.Index,
Content: choice.Message.Content,
Role: choice.Message.Role,
})
}

llmSpan.LogCompletion(ctx, tlp.Completion{
Model: resp.Model,
Messages: completionMsgs,
}, tlp.Usage{
TotalTokens: resp.Usage.TotalTokens,
CompletionTokens: resp.Usage.CompletionTokens,
PromptTokens: resp.Usage.PromptTokens,
})

someOtherTask := wf.NewTask("some_other_task")
defer someOtherTask.End()

otherPrompt, _ := someOtherTask.LogPrompt(tlp.Prompt{
Vendor: "openai",
Mode: "chat",
Model: request.Model,
Messages: []tlp.Message{
{
Index: 0,
Content: "some other prompt",
Role: "user",
},
},
})

otherPrompt.LogCompletion(ctx, tlp.Completion{
Model: resp.Model,
Messages: completionMsgs,
}, tlp.Usage{
TotalTokens: resp.Usage.TotalTokens,
CompletionTokens: resp.Usage.CompletionTokens,
PromptTokens: resp.Usage.PromptTokens,
})

fmt.Println(resp.Choices[0].Message.Content)
}
14 changes: 9 additions & 5 deletions traceloop-sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package traceloop
import (
"context"
"fmt"
"log"
"net/http"
"os"
"time"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (instance *Traceloop) initialize(ctx context.Context) error {
}
}

fmt.Printf("Traceloop %s SDK initialized. Connecting to %s\n", Version(), instance.config.BaseURL)
log.Printf("Traceloop %s SDK initialized. Connecting to %s\n", Version(), instance.config.BaseURL)

instance.pollPrompts()
err := instance.initTracer(ctx, instance.config.ServiceName)
Expand Down Expand Up @@ -91,16 +92,19 @@ func (instance *Traceloop) tracerName() string {
}
}

func (instance *Traceloop) LogPrompt(ctx context.Context, prompt Prompt, traceloopAttrs TraceloopAttributes) (LLMSpan, error) {
func (instance *Traceloop) getTracer() apitrace.Tracer {
return (*instance.tracerProvider).Tracer(instance.tracerName())
}

func (instance *Traceloop) LogPrompt(ctx context.Context, prompt Prompt, workflowAttrs WorkflowAttributes) (LLMSpan, error) {
spanName := fmt.Sprintf("%s.%s", prompt.Vendor, prompt.Mode)
_, span := (*instance.tracerProvider).Tracer(instance.tracerName()).Start(ctx, spanName)
_, span := instance.getTracer().Start(ctx, spanName)

span.SetAttributes(
semconvai.LLMVendor.String(prompt.Vendor),
semconvai.LLMRequestModel.String(prompt.Model),
semconvai.LLMRequestType.String(prompt.Mode),
semconvai.TraceloopWorkflowName.String(traceloopAttrs.WorkflowName),
semconvai.TraceloopEntityName.String(traceloopAttrs.EntityName),
semconvai.TraceloopWorkflowName.String(workflowAttrs.Name),
)

setMessagesAttribute(span, "llm.prompts", prompt.Messages)
Expand Down
5 changes: 2 additions & 3 deletions traceloop-sdk/tracing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ type Completion struct {
Messages []Message `json:"messages"`
}

type TraceloopAttributes struct {
WorkflowName string `json:"workflow_name"`
EntityName string `json:"entity_name"`
type WorkflowAttributes struct {
Name string `json:"workflow_name"`
AssociationProperties map[string]string `json:"association_properties"`
}

Expand Down
69 changes: 69 additions & 0 deletions traceloop-sdk/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package traceloop

import (
"context"
"fmt"

semconvai "github.com/traceloop/go-openllmetry/semconv-ai"
"go.opentelemetry.io/otel/trace"
)

type Workflow struct {
sdk *Traceloop
ctx context.Context
Attributes WorkflowAttributes `json:"workflow_attributes"`
}

type Task struct {
workflow *Workflow
ctx context.Context
Name string `json:"name"`
}

func (instance *Traceloop) NewWorkflow(ctx context.Context, attrs WorkflowAttributes) *Workflow {
wCtx, span := instance.getTracer().Start(ctx, fmt.Sprintf("%s.workflow", attrs.Name), trace.WithNewRoot())

span.SetAttributes(
semconvai.TraceloopWorkflowName.String(attrs.Name),
semconvai.TraceloopSpanKind.String("task"),
semconvai.TraceloopEntityName.String(attrs.Name),
)

return &Workflow{
sdk: instance,
ctx: wCtx,
Attributes: attrs,
}
}

func (workflow *Workflow) End() {
trace.SpanFromContext(workflow.ctx).End()
}

func (workflow *Workflow) LogPrompt(prompt Prompt) (LLMSpan, error) {
return workflow.sdk.LogPrompt(workflow.ctx, prompt, workflow.Attributes)
}

func (workflow *Workflow) NewTask(name string) *Task {
tCtx, span := workflow.sdk.getTracer().Start(workflow.ctx, fmt.Sprintf("%s.task", name))

span.SetAttributes(
semconvai.TraceloopWorkflowName.String(workflow.Attributes.Name),
semconvai.TraceloopSpanKind.String("task"),
semconvai.TraceloopEntityName.String(name),
)

return &Task{
workflow: workflow,
ctx: tCtx,
Name: name,
}
}

func (task *Task) End() {
trace.SpanFromContext(task.ctx).End()
}

func (task *Task) LogPrompt(prompt Prompt) (LLMSpan, error) {
return task.workflow.sdk.LogPrompt(task.ctx, prompt, task.workflow.Attributes)
}

0 comments on commit 10c0b52

Please sign in to comment.