From 3b09a8515a9dcae647dc729455b397dd512fcf32 Mon Sep 17 00:00:00 2001
From: taloric
Date: Fri, 11 Oct 2024 12:29:03 +0800
Subject: [PATCH] feat: server support skywalking integration

---
 server/ingester/flow_log/decoder/decoder.go   |  66 ++++
 server/ingester/flow_log/flow_log/flow_log.go |  12 +
 .../ingester/flow_log/log_data/sw_import.go   | 262 ++++++++++++++++++
 server/libs/datatype/droplet-message.go       |   5 +-
 4 files changed, 344 insertions(+), 1 deletion(-)
 create mode 100644 server/ingester/flow_log/log_data/sw_import.go

diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go
index 7a61e1d319d..df6ffbd0199 100644
--- a/server/ingester/flow_log/decoder/decoder.go
+++ b/server/ingester/flow_log/decoder/decoder.go
@@ -26,6 +26,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	logging "github.com/op/go-logging"
 	v1 "go.opentelemetry.io/proto/otlp/trace/v1"
+	agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 
 	"github.com/deepflowio/deepflow/server/ingester/common"
 	"github.com/deepflowio/deepflow/server/ingester/exporters"
@@ -153,6 +154,8 @@ func (d *Decoder) Run() {
 	decoder := &codec.SimpleDecoder{}
 	pbTaggedFlow := pb.NewTaggedFlow()
 	pbTracesData := &v1.TracesData{}
+	pbSkywalkingData := &pb.SkyWalkingExtra{}
+	pbSegmentData := &agentV3.SegmentObject{}
 	for {
 		n := d.inQueue.Gets(buffer)
 		start := time.Now()
@@ -181,6 +184,8 @@ func (d *Decoder) Run() {
 				d.handleOpenTelemetry(decoder, pbTracesData, true)
 			case datatype.MESSAGE_TYPE_PACKETSEQUENCE:
 				d.handleL4Packet(decoder)
+			case datatype.MESSAGE_TYPE_SKYWALKING:
+				d.handleSkyWalking(decoder, pbSkywalkingData, pbSegmentData, false)
 			default:
 				log.Warningf("unknown msg type: %d", d.msgType)
 
@@ -258,6 +263,67 @@ func (d *Decoder) handleOpenTelemetry(decoder *codec.SimpleDecoder, pbTracesData
 	}
 }
 
+// handleSkyWalking decodes every record read from decoder as a
+// pb.SkyWalkingExtra envelope, unmarshals its Data field into a SkyWalking
+// segment and passes the segment with the envelope's PeerIp to sendSkyWalking.
+// pbSkyWalkingData and pbSegmentData are scratch messages reused per record;
+// compressed selects decompression of each record before unmarshalling.
+func (d *Decoder) handleSkyWalking(decoder *codec.SimpleDecoder, pbSkyWalkingData *pb.SkyWalkingExtra, pbSegmentData *agentV3.SegmentObject, compressed bool) {
+	for !decoder.IsEnd() {
+		// Start every record from a clean slate: a stale error would reject all
+		// following records and a stale segment would be re-sent for an empty one.
+		var err error
+		pbSkyWalkingData.Reset()
+		pbSegmentData.Reset()
+		bytes := decoder.ReadBytes()
+		if len(bytes) > 0 {
+			if compressed {
+				bytes, err = decompressOpenTelemetry(bytes)
+			}
+			if err == nil {
+				err = proto.Unmarshal(bytes, pbSkyWalkingData)
+			}
+			if err == nil {
+				err = proto.Unmarshal(pbSkyWalkingData.Data, pbSegmentData)
+			}
+		}
+		if decoder.Failed() || err != nil {
+			// Log only while the error counter is still zero.
+			if d.counter.ErrorCount == 0 {
+				log.Errorf("skywalking data decode failed, offset=%d len=%d err: %v", decoder.Offset(), len(decoder.Bytes()), err)
+			}
+			d.counter.ErrorCount++
+			continue
+		}
+		d.sendSkyWalking(pbSegmentData, pbSkyWalkingData.PeerIp)
+	}
+}
+
+// sendSkyWalking converts one SkyWalking segment into L7 flow logs and offers
+// each of them to the throttler. Logs the throttler accepts are additionally
+// passed to the flow-tag writer, appServiceTagWrite and spanWrite; rejected
+// ones are counted in DropCount. peerIP is handed on unchanged to
+// log_data.SkyWalkingDataToL7FlowLogs.
+func (d *Decoder) sendSkyWalking(segment *agentV3.SegmentObject, peerIP []byte) {
+	if d.debugEnabled {
+		log.Debugf("decoder %d vtap %d recv skywalking data: %s", d.index, d.agentId, segment)
+	}
+	d.counter.Count++
+	ls := log_data.SkyWalkingDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, segment, peerIP, d.platformData, d.cfg)
+	for _, l := range ls {
+		l.AddReferenceCount()
+		if !d.throttler.SendWithThrottling(l) {
+			d.counter.DropCount++
+		} else {
+			d.fieldsBuf, d.fieldValuesBuf = d.fieldsBuf[:0], d.fieldValuesBuf[:0]
+			l.GenerateNewFlowTags(d.flowTagWriter.Cache)
+			d.flowTagWriter.WriteFieldsAndFieldValuesInCache()
+			d.appServiceTagWrite(l)
+			d.spanWrite(l)
+		}
+		l.Release()
+	}
+}
+
 func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) {
 	if d.debugEnabled {
 		log.Debugf("decoder %d vtap %d recv otel: %s", d.index, d.agentId, tracesData)
diff --git a/server/ingester/flow_log/flow_log/flow_log.go b/server/ingester/flow_log/flow_log/flow_log.go index 839d78c3af5..f9ca494acdb 100644 --- a/server/ingester/flow_log/flow_log/flow_log.go +++ b/server/ingester/flow_log/flow_log/flow_log.go @@ -54,6 +54,7 @@ type FlowLog struct { OtelLogger *Logger OtelCompressedLogger *Logger L4PacketLogger *Logger + SkyWalkingLogger *Logger Exporters *exporters.Exporters
SpanWriter *dbwriter.SpanWriter TraceTreeWriter *dbwriter.TraceTreeWriter @@ -114,6 +115,10 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec if err != nil { return nil, err } + skywalkingLogger, err := NewLogger(datatype.MESSAGE_TYPE_SKYWALKING, config, platformDataManager, manager, recv, flowLogWriter, common.L7_FLOW_ID, nil, spanWriter) + if err != nil { + return nil, err + } l4PacketLogger, err := NewLogger(datatype.MESSAGE_TYPE_PACKETSEQUENCE, config, nil, manager, recv, flowLogWriter, common.L4_PACKET_ID, nil, nil) if err != nil { return nil, err @@ -124,6 +129,7 @@ func NewFlowLog(config *config.Config, traceTreeQueue *queue.OverwriteQueue, rec L7FlowLogger: l7FlowLogger, OtelLogger: otelLogger, OtelCompressedLogger: otelCompressedLogger, + SkyWalkingLogger: skywalkingLogger, L4PacketLogger: l4PacketLogger, Exporters: exporters, SpanWriter: spanWriter, @@ -356,6 +362,9 @@ func (s *FlowLog) Start() { if s.OtelCompressedLogger != nil { s.OtelCompressedLogger.Start() } + if s.SkyWalkingLogger != nil { + s.SkyWalkingLogger.Start() + } if s.SpanWriter != nil { s.SpanWriter.Start() } @@ -380,6 +389,9 @@ func (s *FlowLog) Close() error { if s.OtelCompressedLogger != nil { s.OtelCompressedLogger.Close() } + if s.SkyWalkingLogger != nil { + s.SkyWalkingLogger.Close() + } if s.SpanWriter != nil { s.SpanWriter.Close() } diff --git a/server/ingester/flow_log/log_data/sw_import.go b/server/ingester/flow_log/log_data/sw_import.go new file mode 100644 index 00000000000..e1b6ab59aee --- /dev/null +++ b/server/ingester/flow_log/log_data/sw_import.go @@ -0,0 +1,262 @@ +package log_data + +import ( + "fmt" + "net" + "strconv" + "time" + + flowlogCfg "github.com/deepflowio/deepflow/server/ingester/flow_log/config" + "github.com/deepflowio/deepflow/server/libs/datatype" + flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" + "github.com/deepflowio/deepflow/server/libs/grpc" + "github.com/deepflowio/deepflow/server/libs/utils" 
+	otelV1 "go.opentelemetry.io/proto/otlp/trace/v1"
+	agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
+
+// Attribute names used for SkyWalking-specific span and reference fields.
+const (
+	AttributeRefType                   = "refType"
+	AttributeParentService             = "parent.service"
+	AttributeParentInstance            = "parent.service.instance"
+	AttributeParentEndpoint            = "parent.endpoint"
+	AttributeSkywalkingSpanID          = "sw8.span_id"
+	AttributeSkywalkingTraceID         = "sw8.trace_id"
+	AttributeSkywalkingSegmentID       = "sw8.segment_id"
+	AttributeSkywalkingParentSpanID    = "sw8.parent_span_id"
+	AttributeSkywalkingParentSegmentID = "sw8.parent_segment_id"
+	AttributeNetworkAddressUsedAtPeer  = "network.AddressUsedAtPeer"
+)
+
+// SkyWalkingDataToL7FlowLogs converts every non-nil span of segment into an
+// L7FlowLog and returns nil when the segment has no spans.
+// NOTE(review): peerIP is parsed as a textual IP; if agents send raw 4- or
+// 16-byte addresses, net.ParseIP returns nil and no IP is stored - confirm.
+func SkyWalkingDataToL7FlowLogs(vtapID, orgId, teamId uint16, segment *agentV3.SegmentObject, peerIP []byte, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*L7FlowLog {
+	spans := segment.GetSpans()
+	if len(spans) == 0 {
+		return nil
+	}
+
+	ip := net.ParseIP(string(peerIP))
+	ret := make([]*L7FlowLog, 0, len(spans))
+	for _, span := range spans {
+		if span == nil {
+			continue
+		}
+		ret = append(ret, swSpanToL7FlowLog(vtapID, orgId, teamId,
+			segment.GetTraceId(), segment.GetTraceSegmentId(),
+			segment.GetService(), segment.GetServiceInstance(),
+			span, ip, platformData, cfg))
+	}
+	return ret
+}
+
+// swSpanToL7FlowLog builds one L7FlowLog from a SkyWalking span together with
+// the trace ID, segment ID and service identity of its segment. A non-nil ip
+// is written to IP40/IP60 for ClientApp spans and to IP41/IP61 otherwise.
+func swSpanToL7FlowLog(vtapID, orgId, teamId uint16, traceID, segmentID, appService, appInstance string, span *agentV3.SpanObject, ip net.IP, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
+	h := AcquireL7FlowLog()
+
+	// metadata
+	// SkyWalking timestamps are milliseconds, so divide by 1000 rather than by
+	// time.Second (nanoseconds). NOTE(review): assumes genID wants seconds.
+	h._id = genID(uint32(span.GetEndTime()/int64(time.Second/time.Millisecond)), &L7FlowLogCounter, platformData.QueryAnalyzerID())
+	h.VtapID, h.OrgId, h.TeamID = vtapID, orgId, teamId
+	h.TraceId, h.AppService, h.AppInstance = traceID, appService, appInstance
+	h.TraceIdIndex = parseTraceIdIndex(h.TraceId, &cfg.Base.TraceIdWithIndex)
+
+	// sw.span.OperationName -> otel.Span.Name -> l7_flow_log.Endpoint
+	h.Endpoint = span.OperationName
+	// unit: ms. NOTE(review): confirm that L7Base expects milliseconds.
+	h.L7Base.StartTime = span.GetStartTime()
+	h.L7Base.EndTime = span.GetEndTime()
+
+	h.AttributeNames = make([]string, 0, len(span.Tags))
+	h.AttributeValues = make([]string, 0, len(span.Tags))
+
+	// Keep this order: tags may set responseCode before the status is derived.
+	swSpanIDToFlowLogSpanID(segmentID, span.GetSpanId(), span.GetParentSpanId(), span.GetRefs(), h)
+	swTagsToAttr(span, h)
+	swInternalErrorToStatus(span, h)
+	swSpanLayerToSpanKind(span, h)
+	swLogsToAttrs(span.GetLogs(), h)
+	swReferencesToAttrs(span.Refs, h)
+
+	if ip != nil {
+		isClient := h.TapSideEnum == uint8(flow_metrics.ClientApp)
+		ip4 := ip.To4()
+		switch {
+		case ip4 != nil && isClient:
+			h.IP40 = utils.IpToUint32(ip4)
+		case ip4 != nil:
+			h.IP41 = utils.IpToUint32(ip4)
+		case isClient:
+			h.IsIPv4, h.IP60 = false, ip
+		default:
+			h.IsIPv4, h.IP61 = false, ip
+		}
+	}
+
+	return h
+}
+
+// swTagsToAttr copies every span tag into the attribute lists of h and maps
+// the tags `url`, `status_code` and `db.type` onto dedicated fields.
+func swTagsToAttr(span *agentV3.SpanObject, h *L7FlowLog) {
+	if span == nil {
+		return
+	}
+
+	for _, tag := range span.Tags {
+		h.AttributeNames = append(h.AttributeNames, tag.Key)
+		h.AttributeValues = append(h.AttributeValues, tag.Value)
+
+		// some key parse to specific columns
+		switch tag.Key {
+		case "url":
+			// sw.tags.`url` -> otel.resource_span.attributes.`http.url` -> l7_flow_log.RequestResource
+			parsedURLPath, err := parseUrlPath(tag.Value)
+			if err != nil {
+				log.Debugf("http.url (%s) parse failed: %s", tag.Value, err)
+			} else {
+				h.RequestResource = parsedURLPath
+			}
+		case "status_code":
+			// sw.tags.`status_code` -> otel.resource_span.attributes.`http.status_code` -> l7_flow_log.ResponseCode
+			// A non-numeric value is skipped instead of being stored as code 0.
+			if statusCode, err := strconv.Atoi(tag.Value); err == nil {
+				h.responseCode = int32(statusCode)
+				h.ResponseCode = &h.responseCode
+			}
+		case "db.type":
+			// sw.tags.`db.type` -> otel.resource_span.attributes.`db.system` -> l7_flow_log.L7ProtocolStr
+			h.L7ProtocolStr = tag.Value
+		case "db.instance":
+			// sw.tags.`db.instance` -> otel.resource_span.attributes.`db.name`
+			// TODO: undefined in l7_flow_log
+		case "mq.broker":
+			// sw.tags.`mq.broker` -> otel.resource_span.attributes.`net.peer.name`
+			// TODO: undefined in l7_flow_log
+		}
+	}
+}
+
+// swSpanIDToFlowLogSpanID fills dest.SpanId and dest.ParentSpanId in the
+// "<segmentID>-<spanID>" format and records the original SkyWalking segment
+// ID, span ID and (when present) in-segment parent span ID as attributes.
+func swSpanIDToFlowLogSpanID(segmentID string, spanID, parentSpanID int32, refs []*agentV3.SegmentReference, dest *L7FlowLog) {
+	// expected format: segmentid-spanid
+	dest.SpanId = fmt.Sprintf("%s-%d", segmentID, spanID)
+
+	if parentSpanID != -1 {
+		// the parent span belongs to the current segment
+		dest.ParentSpanId = fmt.Sprintf("%s-%d", segmentID, parentSpanID)
+	} else if len(refs) == 1 {
+		// parent spanid = -1, means root span without parent span in current skywalking segment, so it is necessary to search for the parent segment.
+		// Note: since OTel v0.79.0 the skywalkingreceiver stores the XOR-based result of segmentIDToSpanID(ref[0].ParentTraceSegmentId, ref[0].ParentSpanId) as ParentSpanID.
+		// DeepFlow would take that value as valid and store it although it is wrong; the expected value is "ref[0].ParentTraceSegmentId"-"ref[0].ParentSpanId", so that implementation cannot be reused here.
+		// TODO: SegmentReference references usually have only one element, but in batch consumer case, such as in MQ or async batch process, it could be multiple.
+		// We only handle one element for now.
+		ref0 := refs[0]
+		switch ref0.GetRefType() {
+		case agentV3.RefType_CrossProcess, agentV3.RefType_CrossThread:
+			if ref0.GetParentTraceSegmentId() != "" && ref0.GetParentSpanId() >= 0 {
+				dest.ParentSpanId = fmt.Sprintf("%s-%d", ref0.GetParentTraceSegmentId(), ref0.GetParentSpanId())
+			}
+		}
+	}
+
+	// original segmentID & spanID
+	dest.AttributeNames = append(dest.AttributeNames, AttributeSkywalkingSegmentID, AttributeSkywalkingSpanID)
+	dest.AttributeValues = append(dest.AttributeValues, segmentID, strconv.Itoa(int(spanID)))
+	if parentSpanID != -1 {
+		dest.AttributeNames = append(dest.AttributeNames, AttributeSkywalkingParentSpanID)
+		dest.AttributeValues = append(dest.AttributeValues, strconv.Itoa(int(parentSpanID)))
+	}
+}
+
+// swInternalErrorToStatus derives dest.ResponseStatus. A non-zero
+// dest.responseCode (set from the `status_code` tag) takes precedence;
+// otherwise the span's IsError flag selects server error or OK.
+func swInternalErrorToStatus(span *agentV3.SpanObject, dest *L7FlowLog) {
+	if dest.responseCode != 0 {
+		// if dest.responseCode set by `status_code`, use it first
+		dest.ResponseStatus = uint8(httpCodeToResponseStatus(dest.responseCode))
+		if dest.ResponseStatus == uint8(datatype.STATUS_CLIENT_ERROR) || dest.ResponseStatus == uint8(datatype.STATUS_SERVER_ERROR) {
+			dest.ResponseException = GetHTTPExceptionDesc(uint16(dest.responseCode))
+		}
+		return
+	}
+	// No HTTP code is known: set the status directly. STATUS_* values are
+	// statuses, not HTTP codes, so they must not be stored in responseCode.
+	if span.GetIsError() {
+		dest.ResponseStatus = uint8(datatype.STATUS_SERVER_ERROR)
+	} else {
+		dest.ResponseStatus = uint8(datatype.STATUS_OK)
+	}
+}
+
+// swSpanLayerToSpanKind maps the span layer and type onto dest.TapSideEnum,
+// dest.TapSide and dest.SpanKind. An MQ span that is neither Entry nor Exit
+// leaves TapSideEnum and SpanKind at the values they already had.
+func swSpanLayerToSpanKind(span *agentV3.SpanObject, dest *L7FlowLog) {
+	switch {
+	case span.SpanLayer == agentV3.SpanLayer_MQ:
+		if span.SpanType == agentV3.SpanType_Entry {
+			dest.TapSideEnum = uint8(flow_metrics.ServerApp)
+			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_CONSUMER)
+		} else if span.SpanType == agentV3.SpanType_Exit {
+			dest.TapSideEnum = uint8(flow_metrics.ClientApp)
+			dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_PRODUCER)
+		}
+	case span.GetSpanType() == agentV3.SpanType_Exit:
+		dest.TapSideEnum = uint8(flow_metrics.ClientApp)
+		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_CLIENT)
+	case span.GetSpanType() == agentV3.SpanType_Entry:
+		dest.TapSideEnum = uint8(flow_metrics.ServerApp)
+		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_SERVER)
+	case span.GetSpanType() == agentV3.SpanType_Local:
+		dest.TapSideEnum = uint8(flow_metrics.App)
+		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_INTERNAL)
+	default:
+		dest.TapSideEnum = uint8(flow_metrics.Rest)
+		dest.SpanKind = uint8(otelV1.Span_SPAN_KIND_UNSPECIFIED)
+	}
+
+	dest.TapSide = flow_metrics.TAPSideEnum(dest.TapSideEnum).String()
+	dest.spanKind = &dest.SpanKind
+}
+
+// swReferencesToAttrs appends, for every segment reference, its parent
+// service/instance/endpoint, peer address, ref type and parent IDs to dest.
+func swReferencesToAttrs(refs []*agentV3.SegmentReference, dest *L7FlowLog) {
+	add := func(name, value string) {
+		dest.AttributeNames = append(dest.AttributeNames, name)
+		dest.AttributeValues = append(dest.AttributeValues, value)
+	}
+	for _, ref := range refs {
+		add(AttributeParentService, ref.GetParentService())
+		add(AttributeParentInstance, ref.GetParentServiceInstance())
+		add(AttributeParentEndpoint, ref.GetParentEndpoint())
+		add(AttributeNetworkAddressUsedAtPeer, ref.GetNetworkAddressUsedAtPeer())
+		add(AttributeRefType, ref.GetRefType().String())
+		add(AttributeSkywalkingTraceID, ref.GetTraceId())
+		add(AttributeSkywalkingParentSegmentID, ref.GetParentTraceSegmentId())
+		add(AttributeSkywalkingParentSpanID, strconv.Itoa(int(ref.GetParentSpanId())))
+	}
+}
+
+// swLogsToAttrs appends every key/value pair of every span log entry to the
+// attribute lists of dest. The loop variable must not shadow the logger `log`.
+func swLogsToAttrs(logs []*agentV3.Log, dest *L7FlowLog) {
+	// swLogs -> otel spanEvents
+	for _, entry := range logs {
+		// TODO: entry.GetTime() unset
+		for _, v := range entry.GetData() {
+			dest.AttributeNames = append(dest.AttributeNames, v.Key)
+			dest.AttributeValues = append(dest.AttributeValues, v.Value)
+		}
+	}
+}
diff --git a/server/libs/datatype/droplet-message.go b/server/libs/datatype/droplet-message.go index 70cd12b9e0a..b529fdf7f1e 100644 --- 
a/server/libs/datatype/droplet-message.go +++ b/server/libs/datatype/droplet-message.go @@ -54,7 +54,8 @@ const ( MESSAGE_TYPE_ALERT_EVENT MESSAGE_TYPE_K8S_EVENT MESSAGE_TYPE_APPLICATION_LOG - MESSAGE_TYPE_AGENT_LOG // 18 + MESSAGE_TYPE_AGENT_LOG + MESSAGE_TYPE_SKYWALKING // 19 MESSAGE_TYPE_MAX ) @@ -80,6 +81,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{ MESSAGE_TYPE_K8S_EVENT: "k8s_event", MESSAGE_TYPE_APPLICATION_LOG: "application_log", MESSAGE_TYPE_AGENT_LOG: "agent_log", + MESSAGE_TYPE_SKYWALKING: "skywalking", } func (m MessageType) String() string { @@ -123,6 +125,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{ MESSAGE_TYPE_K8S_EVENT: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_AGENT_LOG: HEADER_TYPE_LT_VTAP, + MESSAGE_TYPE_SKYWALKING: HEADER_TYPE_LT_VTAP, } func (m MessageType) HeaderType() MessageHeaderType {