Skip to content

Commit

Permalink
feat: server support skywalking integration
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Oct 11, 2024
1 parent 734f831 commit 3b09a85
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 1 deletion.
54 changes: 54 additions & 0 deletions server/ingester/flow_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
logging "github.com/op/go-logging"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"

"github.com/deepflowio/deepflow/server/ingester/common"
"github.com/deepflowio/deepflow/server/ingester/exporters"
Expand Down Expand Up @@ -153,6 +154,8 @@ func (d *Decoder) Run() {
decoder := &codec.SimpleDecoder{}
pbTaggedFlow := pb.NewTaggedFlow()
pbTracesData := &v1.TracesData{}
pbSkywalkingData := &pb.SkyWalkingExtra{}
pbSegmentData := &agentV3.SegmentObject{}
for {
n := d.inQueue.Gets(buffer)
start := time.Now()
Expand Down Expand Up @@ -181,6 +184,8 @@ func (d *Decoder) Run() {
d.handleOpenTelemetry(decoder, pbTracesData, true)
case datatype.MESSAGE_TYPE_PACKETSEQUENCE:
d.handleL4Packet(decoder)
case datatype.MESSAGE_TYPE_SKYWALKING:
d.handleSkyWalking(decoder, pbSkywalkingData, pbSegmentData, false)
default:
log.Warningf("unknown msg type: %d", d.msgType)

Expand Down Expand Up @@ -258,6 +263,55 @@ func (d *Decoder) handleOpenTelemetry(decoder *codec.SimpleDecoder, pbTracesData
}
}

// handleSkyWalking decodes every record remaining in decoder and forwards each
// successfully decoded segment to sendSkyWalking.
//
// Each record is unmarshalled into pbSkyWalkingData, and its Data field is then
// unmarshalled into pbSegmentData. Both messages are scratch objects supplied
// by the caller and reused for every record. When compressed is true the raw
// record is first passed through decompressOpenTelemetry.
//
// Records that fail to decode are counted in d.counter.ErrorCount and skipped;
// only the first failure is logged.
func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, pbSegmentData *agentV3.SegmentObject, compressed bool) {
	for !decoder.IsEnd() {
		// err is scoped to a single record: a failure on one record must not
		// cause the following records of the batch to be skipped.
		var err error

		// Clear both scratch messages so that an empty record can never
		// re-send the segment decoded from the previous record.
		pbSkyWalkingData.Reset()
		pbSegmentData.Reset()

		bytes := decoder.ReadBytes()
		if d.debugEnabled {
			log.Debugf("skywalking data decode bytes: %v", bytes)
		}
		if len(bytes) > 0 {
			if compressed {
				bytes, err = decompressOpenTelemetry(bytes)
			}
			if err == nil {
				err = proto.Unmarshal(bytes, pbSkyWalkingData)
			}
			if err == nil {
				err = proto.Unmarshal(pbSkyWalkingData.Data, pbSegmentData)
			}
		}
		if decoder.Failed() || err != nil {
			if d.counter.ErrorCount == 0 {
				// %v because err is nil when only decoder.Failed() is set.
				log.Errorf("skywalking data decode failed, offset=%d len=%d err: %v", decoder.Offset(), len(decoder.Bytes()), err)
			}
			d.counter.ErrorCount++
			continue
		}
		d.sendSkyWalking(pbSegmentData, pbSkyWalkingData.PeerIp)
	}
}

// sendSkyWalking converts one decoded SkyWalking segment into L7 flow logs and
// pushes each of them through the throttler.
//
// skywalkingExtra is the decoded segment and peerIP is the peer address that
// accompanied it. A flow log accepted by the throttler additionally has its
// flow tags, app-service tag and span written; a rejected one only increments
// d.counter.DropCount. Every flow log gets one AddReferenceCount before the
// throttler call and one Release afterwards.
func (d *Decoder) sendSkyWalking(skywalkingExtra *agentV3.SegmentObject, peerIP []byte) {
	if d.debugEnabled {
		log.Debugf("decoder %d vtap %d recv skywalking data: %s", d.index, d.agentId, skywalkingExtra)
	}
	d.counter.Count++

	flowLogs := log_data.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, skywalkingExtra, peerIP, d.platformData, d.cfg)
	for i := range flowLogs {
		item := flowLogs[i]
		item.AddReferenceCount()
		if accepted := d.throttler.SendWithThrottling(item); accepted {
			// Reuse the tag buffers for this flow log.
			d.fieldsBuf = d.fieldsBuf[:0]
			d.fieldValuesBuf = d.fieldValuesBuf[:0]
			item.GenerateNewFlowTags(d.flowTagWriter.Cache)
			d.flowTagWriter.WriteFieldsAndFieldValuesInCache()
			d.appServiceTagWrite(item)
			d.spanWrite(item)
		} else {
			d.counter.DropCount++
		}
		item.Release()
	}
}

func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) {
if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv otel: %s", d.index, d.agentId, tracesData)
Expand Down
12 changes: 12 additions & 0 deletions server/ingester/flow_log/flow_log/flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type FlowLog struct {
OtelLogger *Logger
OtelCompressedLogger *Logger
L4PacketLogger *Logger
SkyWalkingLogger *Logger
Exporters *exporters.Exporters
SpanWriter *dbwriter.SpanWriter
TraceTreeWriter *dbwriter.TraceTreeWriter
Expand Down Expand Up @@ -114,6 +115,10 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec
if err != nil {
return nil, err
}
skywalkingLogger, err := NewLogger(datatype.MESSAGE_TYPE_SKYWALKING, config, platformDataManager, manager, recv, flowLogWriter, common.L7_FLOW_ID, nil, spanWriter)
if err != nil {
return nil, err
}
l4PacketLogger, err := NewLogger(datatype.MESSAGE_TYPE_PACKETSEQUENCE, config, nil, manager, recv, flowLogWriter, common.L4_PACKET_ID, nil, nil)
if err != nil {
return nil, err
Expand All @@ -124,6 +129,7 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec
L7FlowLogger: l7FlowLogger,
OtelLogger: otelLogger,
OtelCompressedLogger: otelCompressedLogger,
SkyWalkingLogger: skywalkingLogger,
L4PacketLogger: l4PacketLogger,
Exporters: exporters,
SpanWriter: spanWriter,
Expand Down Expand Up @@ -356,6 +362,9 @@ func (s *FlowLog) Start() {
if s.OtelCompressedLogger != nil {
s.OtelCompressedLogger.Start()
}
if s.SkyWalkingLogger != nil {
s.SkyWalkingLogger.Start()
}
if s.SpanWriter != nil {
s.SpanWriter.Start()
}
Expand All @@ -380,6 +389,9 @@ func (s *FlowLog) Close() error {
if s.OtelCompressedLogger != nil {
s.OtelCompressedLogger.Close()
}
if s.SkyWalkingLogger != nil {
s.SkyWalkingLogger.Close()
}
if s.SpanWriter != nil {
s.SpanWriter.Close()
}
Expand Down
262 changes: 262 additions & 0 deletions server/ingester/flow_log/log_data/sw_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package log_data

import (
"fmt"
"net"
"strconv"
"time"

flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config"
"github.com/deepflowio/deepflow/server/libs/datatype"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/grpc"
"github.com/deepflowio/deepflow/server/libs/utils"
otelV1 "go.opentelemetry.io/proto/otlp/trace/v1"
agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

// Attribute names written into L7FlowLog.AttributeNames when converting
// SkyWalking spans and segment references.
const (
// Fields copied from a SkyWalking segment reference (see swReferencesToAttrs).
AttributeRefType = "refType"
AttributeParentService = "parent.service"
AttributeParentInstance = "parent.service.instance"
AttributeParentEndpoint = "parent.endpoint"
// Original SkyWalking identifiers, kept alongside the "segmentid-spanid"
// span id format built by swSpanIDToFlowLogSpanID.
AttributeSkywalkingSpanID = "sw8.span_id"
AttributeSkywalkingTraceID = "sw8.trace_id"
AttributeSkywalkingSegmentID = "sw8.segment_id"
AttributeSkywalkingParentSpanID = "sw8.parent_span_id"
AttributeSkywalkingParentSegmentID = "sw8.parent_segment_id"
// Value of SegmentReference.NetworkAddressUsedAtPeer.
AttributeNetworkAddressUsedAtPeer = "network.AddressUsedAtPeer"
)

// SkyWalkingDataToL7FlowLogs converts every non-nil span of a SkyWalking
// segment into an L7FlowLog.
//
// vtapID, orgId and teamId are copied onto each flow log. The trace id,
// segment id, service and service instance are taken from segment and shared
// by all of its spans. peerIP is parsed once and applied to every span.
// platformData and cfg are passed through to swSpanToL7FlowLog.
//
// It returns nil when the segment carries no spans.
func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segment *agentV3.SegmentObject, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*L7FlowLog {
	spans := segment.GetSpans()
	// len() of a nil slice is 0, so this single check covers both the nil and
	// the empty case.
	if len(spans) == 0 {
		return nil
	}

	// NOTE(review): peerIP is parsed as a textual address ("10.0.0.1"); if the
	// sender transmits raw 4/16-byte addresses instead, ParseIP returns nil and
	// no IP is recorded - confirm against the producer of SkyWalkingExtra.
	ip := net.ParseIP(string(peerIP))

	traceID := segment.GetTraceId()
	segmentID := segment.GetTraceSegmentId()
	service := segment.GetService()
	instance := segment.GetServiceInstance()

	ret := make([]*L7FlowLog, 0, len(spans))
	for _, span := range spans {
		if span == nil {
			continue
		}
		ret = append(ret, swSpanToL7FlowLog(vtapID, orgId, teamId, traceID, segmentID, service, instance, span, ip, platformData, cfg))
	}
	return ret
}

// swSpanToL7FlowLog builds one L7FlowLog from a single SkyWalking span.
//
// traceID, segmentID, appService and appInstance come from the segment that
// owns span. ip, when non-nil, is stored as the client-side address for spans
// classified as ClientApp and as the server-side address otherwise. span must
// not be nil.
func swSpanToL7FlowLog(vtapID, orgId, teamId uint16, traceID, segmentID, appService, appInstance string, span *agentV3.SpanObject, ip net.IP, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
	h := AcquireL7FlowLog()

	// metadata
	// SkyWalking span times are in milliseconds, so convert ms -> s here.
	// Dividing by int64(time.Second) (1e9) would treat the value as nanoseconds.
	endTimeSec := span.GetEndTime() / int64(time.Second/time.Millisecond)
	h._id = genID(uint32(endTimeSec), &L7FlowLogCounter, platformData.QueryAnalyzerID())
	h.VtapID, h.OrgId, h.TeamID = vtapID, orgId, teamId
	h.TraceId, h.AppService, h.AppInstance = traceID, appService, appInstance
	h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)

	// sw.span.OperationName -> otel.Span.Name -> l7_flow_log.Endpoint
	h.Endpoint = span.GetOperationName()
	// unit: ms
	// NOTE(review): stored exactly as received (milliseconds); confirm the unit
	// L7Base.StartTime/EndTime is expected to carry.
	h.L7Base.StartTime = span.GetStartTime()
	h.L7Base.EndTime = span.GetEndTime()

	h.AttributeNames = make([]string, 0, len(span.GetTags()))
	h.AttributeValues = make([]string, 0, len(span.GetTags()))

	// Order matters: swInternalErrorToStatus reads the response code that
	// swTagsToAttr may have set, and the IP assignment below reads the
	// TapSideEnum set by swSpanLayerToSpanKind.
	swSpanIDToFlowLogSpanID(segmentID, span.GetSpanId(), span.GetParentSpanId(), span.GetRefs(), h)
	swTagsToAttr(span, h)
	swInternalErrorToStatus(span, h)
	swSpanLayerToSpanKind(span, h)

	swLogsToAttrs(span.GetLogs(), h)
	swReferencesToAttrs(span.GetRefs(), h)

	if ip != nil {
		isClient := h.TapSideEnum == uint8(flow_metrics.ClientApp)
		if ip4 := ip.To4(); ip4 != nil {
			// Set explicitly instead of relying on whatever value the acquired
			// flow log already holds.
			h.IsIPv4 = true
			if isClient {
				h.IP40 = utils.IpToUint32(ip4)
			} else {
				h.IP41 = utils.IpToUint32(ip4)
			}
		} else {
			h.IsIPv4 = false
			if isClient {
				h.IP60 = ip
			} else {
				h.IP61 = ip
			}
		}
	}

	return h
}

// swTagsToAttr copies every tag of span into h.AttributeNames and
// h.AttributeValues, and additionally maps a few well-known tag keys onto
// dedicated L7FlowLog fields:
//
//   - "url"         -> h.RequestResource (path parsed by parseUrlPath)
//   - "status_code" -> h.responseCode / h.ResponseCode
//   - "db.type"     -> h.L7ProtocolStr
//
// A nil span, a span without tags and nil tag entries are ignored. A tag whose
// value cannot be parsed is still recorded as an attribute but does not
// populate its dedicated field.
func swTagsToAttr(span *agentV3.SpanObject, h *L7FlowLog) {
	for _, tag := range span.GetTags() {
		if tag == nil {
			continue
		}
		key, value := tag.GetKey(), tag.GetValue()
		h.AttributeNames = append(h.AttributeNames, key)
		h.AttributeValues = append(h.AttributeValues, value)

		// some key parse to specific columns
		switch key {
		case "url":
			// sw.tags.`url` -> otel.resource_span.attributes.`http.url` -> l7_flow_log.RequestResource
			parsedURLPath, err := parseUrlPath(value)
			if err != nil {
				log.Debugf("http.url (%s) parse failed: %s", value, err)
			} else {
				h.RequestResource = parsedURLPath
			}
		case "status_code":
			// sw.tags.`status_code` -> otel.resource_span.attributes.`http.status_code` -> l7_flow_log.ResponseCode
			statusCode, err := strconv.Atoi(value)
			if err != nil {
				// Leave ResponseCode unset rather than publishing a
				// meaningless zero for a malformed tag.
				log.Debugf("status_code (%s) parse failed: %s", value, err)
				break
			}
			h.responseCode = int32(statusCode)
			h.ResponseCode = &h.responseCode
		case "db.type":
			// sw.tags.`db.type` -> otel.resource_span.attributes.`db.system` -> l7_flow_log.L7ProtocolStr
			h.L7ProtocolStr = value
		case "db.instance":
			// sw.tags.`db.instance` -> otel.resource_span.attributes.`db.name`
			// TODO: undefined in l7_flow_log
		case "mq.broker":
			// sw.tags.`mq.broker` -> otel.resource_span.attributes.`net.peer.name`
			// TODO: undefined in l7_flow_log
		default:
		}
	}
}

// swSpanIDToFlowLogSpanID fills dest.SpanId and dest.ParentSpanId using the
// "<segmentID>-<spanID>" format, and records the original SkyWalking
// identifiers as attributes on dest.
//
// When parentSpanID is not -1 the parent lives in the same segment. Otherwise,
// if refs holds exactly one cross-process or cross-thread reference with a
// non-empty parent segment id and a non-negative parent span id, the parent is
// taken from that reference. In every other case dest.ParentSpanId is left
// untouched.
func swSpanIDToFlowLogSpanID(segmentID string, spanID, parentSpanID int32, refs []*agentV3.SegmentReference, dest *L7FlowLog) {
	// expected format: segmentid-spanid
	dest.SpanId = fmt.Sprintf("%s-%d", segmentID, spanID)

	if parentSpanID != -1 {
		// The parent span belongs to the current segment.
		dest.ParentSpanId = fmt.Sprintf("%s-%d", segmentID, parentSpanID)
	} else if len(refs) == 1 {
		// parent spanid = -1 means root span without parent span in current
		// skywalking segment, so it is necessary to search for the parent
		// segment.
		//
		// Note: since OTel v0.79.0 the skywalkingreceiver implementation turns
		// segmentIDToSpanID(ref[0].ParentTraceSegmentId, ref[0].ParentSpanId)
		// (an XOR computation) into ParentSpanID. DeepFlow would then consider
		// ParentSpanID populated and write it as-is, although that value is
		// wrong; the value expected here is
		// "ref[0].ParentTraceSegmentId"-"ref[0].ParentSpanId".
		// Therefore the skywalkingreceiver implementation cannot be reused here.
		// TODO: SegmentReference references usually have only one element, but in batch consumer case, such as in MQ or async batch process, it could be multiple.
		// We only handle one element for now.
		ref0 := refs[0]
		// Getters are nil-safe, so a nil reference simply yields no parent.
		switch ref0.GetRefType() {
		case agentV3.RefType_CrossProcess, agentV3.RefType_CrossThread:
			if ref0.GetParentTraceSegmentId() != "" && ref0.GetParentSpanId() >= 0 {
				dest.ParentSpanId = fmt.Sprintf("%s-%d", ref0.GetParentTraceSegmentId(), ref0.GetParentSpanId())
			}
		}
	}

	// original segmentID & spanID
	dest.AttributeNames = append(dest.AttributeNames, AttributeSkywalkingSegmentID, AttributeSkywalkingSpanID)
	dest.AttributeValues = append(dest.AttributeValues, segmentID, strconv.Itoa(int(spanID)))
	if parentSpanID != -1 {
		dest.AttributeNames = append(dest.AttributeNames, AttributeSkywalkingParentSpanID)
		dest.AttributeValues = append(dest.AttributeValues, strconv.Itoa(int(parentSpanID)))
	}
}

// swInternalErrorToStatus derives dest.ResponseStatus (and, for errors,
// dest.ResponseException) for a SkyWalking span.
//
// A non-zero dest.responseCode takes precedence and is mapped through
// httpCodeToResponseStatus. When it is zero, the span's IsError flag decides:
// dest.responseCode is overwritten with STATUS_SERVER_ERROR or STATUS_OK and
// then mapped the same way.
//
// NOTE(review): the fallback path stores a datatype.STATUS_* value in
// responseCode and passes it to helpers whose names suggest they expect an
// HTTP code - confirm this is intended.
func swInternalErrorToStatus(span *agentV3.SpanObject, dest *L7FlowLog) {
	if dest.responseCode == 0 {
		// if not set, use span Error instead
		if !span.GetIsError() {
			dest.responseCode = int32(datatype.STATUS_OK)
		} else {
			dest.responseCode = int32(datatype.STATUS_SERVER_ERROR)
			dest.ResponseException = GetHTTPExceptionDesc(uint16(dest.responseCode))
		}
		dest.ResponseStatus = uint8(httpCodeToResponseStatus(dest.responseCode))
		return
	}

	// if dest.responseCode set by `status_code`, use it first
	dest.ResponseStatus = uint8(httpCodeToResponseStatus(dest.responseCode))
	isClientErr := dest.ResponseStatus == uint8(datatype.STATUS_CLIENT_ERROR)
	isServerErr := dest.ResponseStatus == uint8(datatype.STATUS_SERVER_ERROR)
	if isClientErr || isServerErr {
		dest.ResponseException = GetHTTPExceptionDesc(uint16(dest.responseCode))
	}
}

// swSpanLayerToSpanKind sets dest.TapSideEnum, dest.TapSide, dest.SpanKind and
// dest.spanKind from the span's type and layer:
//
//   - Exit  -> ClientApp, CLIENT   (PRODUCER when the layer is MQ)
//   - Entry -> ServerApp, SERVER   (CONSUMER when the layer is MQ)
//   - Local -> App, INTERNAL
//   - other -> Rest, UNSPECIFIED
//
// Every span type assigns both fields, including MQ-layer spans that are
// neither Entry nor Exit, so dest never keeps a previous value.
func swSpanLayerToSpanKind(span *agentV3.SpanObject, dest *L7FlowLog) {
	isMQ := span.GetSpanLayer() == agentV3.SpanLayer_MQ

	switch span.GetSpanType() {
	case agentV3.SpanType_Exit:
		dest.TapSideEnum = uint8(flow_metrics.ClientApp)
		if isMQ {
			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_PRODUCER)
		} else {
			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_CLIENT)
		}
	case agentV3.SpanType_Entry:
		dest.TapSideEnum = uint8(flow_metrics.ServerApp)
		if isMQ {
			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_CONSUMER)
		} else {
			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_SERVER)
		}
	case agentV3.SpanType_Local:
		dest.TapSideEnum = uint8(flow_metrics.App)
		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_INTERNAL)
	default:
		dest.TapSideEnum = uint8(flow_metrics.Rest)
		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_UNSPECIFIED)
	}

	dest.TapSide = flow_metrics.TAPSideEnum(dest.TapSideEnum).String()
	dest.spanKind = &dest.SpanKind
}

// swReferencesToAttrs appends, for every segment reference in refs, eight
// attribute name/value pairs to dest: parent service, parent instance, parent
// endpoint, network address used at peer, reference type, trace id, parent
// segment id and parent span id. Names and values are appended in the same
// order so their positions line up. Nothing is appended when refs is empty.
func swReferencesToAttrs(refs []*agentV3.SegmentReference, dest *L7FlowLog) {
	for i := range refs {
		r := refs[i]

		names := [...]string{
			AttributeParentService,
			AttributeParentInstance,
			AttributeParentEndpoint,
			AttributeNetworkAddressUsedAtPeer,
			AttributeRefType,
			AttributeSkywalkingTraceID,
			AttributeSkywalkingParentSegmentID,
			AttributeSkywalkingParentSpanID,
		}
		dest.AttributeNames = append(dest.AttributeNames, names[:]...)

		values := [...]string{
			r.ParentService,
			r.ParentServiceInstance,
			r.ParentEndpoint,
			r.NetworkAddressUsedAtPeer,
			r.RefType.String(),
			r.TraceId,
			r.ParentTraceSegmentId,
			strconv.Itoa(int(r.ParentSpanId)),
		}
		dest.AttributeValues = append(dest.AttributeValues, values[:]...)
	}
}

// swLogsToAttrs flattens the key/value data of every SkyWalking span log into
// dest.AttributeNames and dest.AttributeValues. Logs without data contribute
// nothing.
func swLogsToAttrs(logs []*agentV3.Log, dest *L7FlowLog) {
	// swLogs -> otel spanEvents
	for i := range logs {
		// TODO: log.GetTime() unset
		pairs := logs[i].GetData()
		for j := range pairs {
			kv := pairs[j]
			dest.AttributeNames = append(dest.AttributeNames, kv.Key)
			dest.AttributeValues = append(dest.AttributeValues, kv.Value)
		}
	}
}
Loading

0 comments on commit 3b09a85

Please sign in to comment.