Skip to content

Commit

Permalink
feat: revise trace and record data (#3110)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Aug 20, 2024
1 parent 3c156de commit efb517e
Show file tree
Hide file tree
Showing 17 changed files with 164 additions and 142 deletions.
19 changes: 10 additions & 9 deletions internal/io/mqtt/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/io/mqtt/client"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

// SourceConnector is the connector for mqtt source
Expand Down Expand Up @@ -125,17 +125,18 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message
ms.eof(ctx)
return
}
ingestCtx := ctx
if ctx.IsTraceEnabled() {
spanCtx, span := tracer.GetTracer().Start(context.Background(), "mqtt_source")
ingestCtx = context.WithContext(spanCtx)
defer span.End()
}
ingest(ingestCtx, msg.Payload(), map[string]interface{}{
traced, spanCtx, span := tracenode.StartTrace(ctx, ctx.GetOpId())
meta := map[string]interface{}{
"topic": msg.Topic(),
"qos": msg.Qos(),
"messageId": msg.MessageID(),
}, rcvTime)
}
if traced {
meta["traceId"] = span.SpanContext().TraceID()
meta["traceCtx"] = spanCtx
defer span.End()
}
ingest(ctx, msg.Payload(), meta, rcvTime)
}

func (ms *SourceConnector) Close(ctx api.StreamContext) error {
Expand Down
30 changes: 16 additions & 14 deletions internal/topo/node/decode_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"go.opentelemetry.io/otel/attribute"

"github.com/lf-edge/ekuiper/v2/internal/converter"
schemaLayer "github.com/lf-edge/ekuiper/v2/internal/converter/schema"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/message"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

// DecodeOp manages the format decoding (employ schema) and sending frequency (for batch decode, like a json array)
Expand Down Expand Up @@ -141,32 +141,27 @@ func (o *DecodeOp) Worker(ctx api.StreamContext, item any) []any {
defer o.statManager.ProcessTimeEnd()
switch d := item.(type) {
case *xsql.RawTuple:
tupleCtx := ctx
if ctx.IsTraceEnabled() {
spanCtx, span := tracer.GetTracer().Start(d.Ctx, "decode_op")
tupleCtx = context.WithContext(spanCtx)
defer span.End()
d.Ctx = tupleCtx
}
result, err := o.converter.Decode(ctx, d.Raw())
if err != nil {
return []any{err}
}

switch r := result.(type) {
case map[string]interface{}:
return []any{toTupleFromRawTuple(r, d)}
tuple := toTupleFromRawTuple(ctx, r, d)
return []any{tuple}
case []map[string]interface{}:
rr := make([]any, len(r))
for i, v := range r {
rr[i] = toTupleFromRawTuple(v, d)
tuple := toTupleFromRawTuple(ctx, v, d)
rr[i] = tuple
}
return rr
case []interface{}:
rr := make([]any, len(r))
for i, v := range r {
if vc, ok := v.(map[string]interface{}); ok {
rr[i] = toTupleFromRawTuple(vc, d)
rr[i] = toTupleFromRawTuple(ctx, vc, d)
} else {
rr[i] = fmt.Errorf("only map[string]any inside a list is supported but got: %v", v)
}
Expand Down Expand Up @@ -391,14 +386,21 @@ func mergeTuple(ctx api.StreamContext, d *xsql.Tuple, result any) {
}
}

func toTupleFromRawTuple(v map[string]any, d *xsql.RawTuple) *xsql.Tuple {
return &xsql.Tuple{
func toTupleFromRawTuple(ctx api.StreamContext, v map[string]any, d *xsql.RawTuple) *xsql.Tuple {
traced, spanCtx, span := tracenode.TraceRowTuple(ctx, d, ctx.GetOpId())
t := &xsql.Tuple{
Ctx: d.Ctx,
Message: v,
Metadata: d.Metadata,
Timestamp: d.Timestamp,
Emitter: d.Emitter,
}
if traced {
t.Ctx = spanCtx
span.SetAttributes(attribute.String(tracenode.DataKey, tracenode.ToStringRow(t)))
defer span.End()
}
return t
}

func cloneTuple(d *xsql.Tuple) *xsql.Tuple {
Expand Down
29 changes: 28 additions & 1 deletion internal/topo/node/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package node

import (
"fmt"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
topoContext "github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

// UnOperation interface represents unary operations (i.e. Map, Filter, etc)
Expand Down Expand Up @@ -105,20 +109,29 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
}
o.statManager.IncTotalRecordsIn()
o.statManager.ProcessTimeStart()
traced, _, span := tracenode.TraceInput(ctx, data, ctx.GetOpId())
result := o.op.Apply(exeCtx, data, fv, afv)

switch val := result.(type) {
case nil:
if traced {
span.End()
}
o.statManager.IncTotalMessagesProcessed(1)
continue
case error:
logger.Errorf("Operation %s error: %s", ctx.GetOpId(), val)
if traced {
span.End()
}
o.Broadcast(val)
o.statManager.IncTotalMessagesProcessed(1)
o.statManager.IncTotalExceptions(val.Error())
continue
case []xsql.Row:
o.statManager.ProcessTimeEnd()
if traced {
span.End()
}
for _, v := range val {
o.Broadcast(v)
o.statManager.IncTotalMessagesProcessed(1)
Expand All @@ -127,6 +140,10 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
o.statManager.SetBufferLength(int64(len(o.input)))
default:
o.statManager.ProcessTimeEnd()
if traced {
tracenode.RecordRowOrCollection(val, span)
span.End()
}
o.Broadcast(val)
o.statManager.IncTotalMessagesProcessed(1)
o.statManager.IncTotalRecordsOut()
Expand All @@ -140,3 +157,13 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
}
}
}

func (o *UnaryOperator) traceUnarySplitRow(ctx, spanCtx api.StreamContext, row xsql.Row) {
if !ctx.IsTraceEnabled() || row == nil {
return
}
subCtx, span := tracer.GetTracer().Start(spanCtx, fmt.Sprintf("%s_split", ctx.GetOpId()))
defer span.End()
row.SetTracerCtx(topoContext.WithContext(subCtx))
tracenode.RecordRowOrCollection(row, span)
}
9 changes: 8 additions & 1 deletion internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,14 @@ func (m *SourceNode) ingestBytes(ctx api.StreamContext, data []byte, meta map[st
ctx.GetLogger().Debugf("source connector %s receive data %+v", m.name, data)
m.statManager.ProcessTimeStart()
m.statManager.IncTotalRecordsIn()
tuple := &xsql.RawTuple{Ctx: ctx, Emitter: m.name, Rawdata: data, Timestamp: ts, Metadata: meta}
tuple := &xsql.RawTuple{Emitter: m.name, Rawdata: data, Timestamp: ts, Metadata: meta}
if ctx.IsTraceEnabled() {
traceCtx, ok := meta["traceCtx"].(api.StreamContext)
if ok {
tuple.SetTracerCtx(traceCtx)
delete(meta, "traceCtx")
}
}
m.Broadcast(tuple)
m.statManager.IncTotalRecordsOut()
m.statManager.IncTotalMessagesProcessed(1)
Expand Down
81 changes: 81 additions & 0 deletions internal/topo/node/tracenode/trace_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package tracenode

import (
"context"
"encoding/json"

"github.com/lf-edge/ekuiper/contract/v2/api"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

topoContext "github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

const DataKey = "data"

func TraceRowTuple(ctx api.StreamContext, input *xsql.RawTuple, opName string) (bool, api.StreamContext, trace.Span) {
if !ctx.IsTraceEnabled() {
return false, nil, nil
}
spanCtx, span := tracer.GetTracer().Start(input.GetTracerCtx(), opName)
x := topoContext.WithContext(spanCtx)
return true, x, span
}

func RecordRowOrCollection(input interface{}, span trace.Span) {
switch d := input.(type) {
case xsql.Row:
span.SetAttributes(attribute.String(DataKey, ToStringRow(d)))
case xsql.Collection:
if d.Len() > 0 {
span.SetAttributes(attribute.String(DataKey, ToStringCollection(d)))
}
}
}

func TraceInput(ctx api.StreamContext, d interface{}, opName string, opts ...trace.SpanStartOption) (bool, api.StreamContext, trace.Span) {
if !ctx.IsTraceEnabled() {
return false, nil, nil
}
input, ok := d.(xsql.HasTracerCtx)
if !ok {
return false, nil, nil
}
spanCtx, span := tracer.GetTracer().Start(input.GetTracerCtx(), opName, opts...)
x := topoContext.WithContext(spanCtx)
input.SetTracerCtx(x)
return true, x, span
}

func TraceRow(ctx api.StreamContext, input xsql.Row, opName string, opts ...trace.SpanStartOption) (bool, api.StreamContext, trace.Span) {
if !ctx.IsTraceEnabled() {
return false, nil, nil
}
spanCtx, span := tracer.GetTracer().Start(input.GetTracerCtx(), opName, opts...)
x := topoContext.WithContext(spanCtx)
input.SetTracerCtx(x)
return true, x, span
}

func StartTrace(ctx api.StreamContext, opName string) (bool, api.StreamContext, trace.Span) {
if !ctx.IsTraceEnabled() {
return false, nil, nil
}
spanCtx, span := tracer.GetTracer().Start(context.Background(), opName)
ingestCtx := topoContext.WithContext(spanCtx)
return true, ingestCtx, span
}

func ToStringRow(r xsql.Row) string {
d := r.Clone().ToMap()
b, _ := json.Marshal(d)
return string(b)
}

func ToStringCollection(r xsql.Collection) string {
d := r.Clone().ToMaps()
b, _ := json.Marshal(d)
return string(b)
}
19 changes: 10 additions & 9 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (

"github.com/benbjohnson/clock"
"github.com/lf-edge/ekuiper/contract/v2/api"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
topoContext "github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

type WindowConfig struct {
Expand Down Expand Up @@ -698,10 +699,10 @@ func (o *WindowOperator) isMatchCondition(ctx api.StreamContext, d *xsql.Tuple)
}

func (o *WindowOperator) handleTraceIngestTuple(ctx api.StreamContext, t *xsql.Tuple) {
if ctx.IsTraceEnabled() {
spanCtx, span := tracer.GetTracer().Start(t.GetTracerCtx(), "window_op_ingest")
traced, _, span := tracenode.TraceRow(ctx, t, "window_op_ingest")
if traced {
span.SetAttributes(attribute.String(tracenode.DataKey, tracenode.ToStringRow(t)))
span.End()
t.SetTracerCtx(topoContext.WithContext(spanCtx))
o.tupleSpanMap[t] = struct{}{}
}
}
Expand All @@ -724,24 +725,24 @@ func (o *WindowOperator) handleTraceEmitTuple(ctx api.StreamContext, wt *xsql.Wi
if ok {
_, stored := o.tupleSpanMap[t]
if stored {
spanCtx, newSpan := tracer.GetTracer().Start(t.GetTracerCtx(), "window_op_emit", trace.WithLinks(o.nextLink))
newSpan.End()
t.SetTracerCtx(topoContext.WithContext(spanCtx))
_, _, span := tracenode.TraceRow(ctx, t, "window_op_emit", trace.WithLinks(o.nextLink))
span.End()
}
}
}
wt.SetTracerCtx(topoContext.WithContext(o.nextSpanCtx))
// discard span if windowTuple is empty
if len(wt.Content) > 0 {
tracenode.RecordRowOrCollection(wt, o.nextSpan)
o.nextSpan.End()
}
o.handleNextWindowTupleSpan(ctx)
}
}

func (o *WindowOperator) handleNextWindowTupleSpan(ctx api.StreamContext) {
if ctx.IsTraceEnabled() {
spanCtx, span := tracer.GetTracer().Start(context.Background(), "window_op")
traced, spanCtx, span := tracenode.StartTrace(ctx, "window_op")
if traced {
o.nextSpanCtx = spanCtx
o.nextSpan = span
o.nextLink = trace.Link{
Expand Down
7 changes: 0 additions & 7 deletions internal/topo/operator/aggregate_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/tracer"
)

type AggregateOp struct {
Expand All @@ -42,11 +40,6 @@ func (p *AggregateOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fu
case error:
return input
case xsql.Collection:
if ctx.IsTraceEnabled() {
spanCtx, span := tracer.GetTracer().Start(input.GetTracerCtx(), "aggregate_op")
input.SetTracerCtx(context.WithContext(spanCtx))
defer span.End()
}
wr := input.GetWindowRange()
result := make(map[string]*xsql.GroupedTuples)
err := input.Range(func(i int, ir xsql.ReadonlyRow) (bool, error) {
Expand Down
Loading

0 comments on commit efb517e

Please sign in to comment.