Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: chahatsagarmain <[email protected]>
  • Loading branch information
chahatsagarmain committed Jan 21, 2025
1 parent 28c1231 commit 04c9aab
Showing 1 changed file with 103 additions and 130 deletions.
233 changes: 103 additions & 130 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,10 +854,13 @@ func TestOTLPReceiverWithV2Storage(t *testing.T) {
span.SetName("test-trace")

var receivedTraces atomic.Pointer[ptrace.Traces]
var recievedCtx atomic.Pointer[context.Context]
mockWriter.On("WriteTraces", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
storeContext := args.Get(0).(context.Context)
storeTrace := args.Get(1).(ptrace.Traces)
receivedTraces.Store(&storeTrace)
recievedCtx.Store(&storeContext)
}).Return(nil)

spanProcessor, err := NewSpanProcessor(
Expand All @@ -871,156 +874,126 @@ func TestOTLPReceiverWithV2Storage(t *testing.T) {
defer spanProcessor.Close()
logger := zaptest.NewLogger(t)

portHttp := "4318"
portGrpc := "4317"

// Can't send tenancy headers with http request to OTLP receiver
tenancyMgr := &tenancy.Manager{
Enabled: false,
}

// Create and start receiver
rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)
portHttp := "4317"
portGrpc := "4318"

// Send trace via HTTP
url := fmt.Sprintf("http://localhost:%v/v1/traces", portHttp)
client := &http.Client{}

marshaler := ptrace.JSONMarshaler{}
data, err := marshaler.MarshalTraces(traces)
require.NoError(t, err)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data))
require.NoError(t, err)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("x-tenant", "test-tenant")

resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
t.Run("Send trace data using HTTP connection", func(t *testing.T) {
// Can't send tenancy headers with http request to OTLP receiver
tenancyMgr := &tenancy.Manager{
Enabled: false,
}

require.Equal(t, http.StatusOK, resp.StatusCode)
// Create and start receiver
rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)

assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
if storedTraces == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
return receivedSpan.Name() == span.Name()
}, 1*time.Second, 1*time.Millisecond)
url := fmt.Sprintf("http://localhost:%v/v1/traces", portHttp)
client := &http.Client{}

mockWriter.AssertExpectations(t)
}
marshaler := ptrace.JSONMarshaler{}
data, err := marshaler.MarshalTraces(traces)
require.NoError(t, err)

func TestOTLPReceiverWithV2StorageWithGRPC(t *testing.T) {
// Setup mock writer and expectations
mockWriter := mocks.NewWriter(t)
traces := ptrace.NewTraces()
rSpans := traces.ResourceSpans().AppendEmpty()
sSpans := rSpans.ScopeSpans().AppendEmpty()
span := sSpans.Spans().AppendEmpty()
span.SetName("test-trace")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data))
require.NoError(t, err)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("x-tenant", "test-tenant")

var receivedTraces atomic.Pointer[ptrace.Traces]
var recievedCtx atomic.Pointer[context.Context]
mockWriter.On("WriteTraces", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
storeContext := args.Get(0).(context.Context)
storeTrace := args.Get(1).(ptrace.Traces)
receivedTraces.Store(&storeTrace)
recievedCtx.Store(&storeContext)
}).Return(nil)
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

spanProcessor, err := NewSpanProcessor(
mockWriter,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.ReportBusy(true),
)
require.NoError(t, err)
defer spanProcessor.Close()
logger := zaptest.NewLogger(t)
require.Equal(t, http.StatusOK, resp.StatusCode)

portHttp := "4318"
portGrpc := "4317"
assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
if storedTraces == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
return receivedSpan.Name() == span.Name()
}, 1*time.Second, 1*time.Millisecond)

// Setup proper tenancy manager using NewManager
tenancyMgr := tenancy.NewManager(&tenancy.Options{
Enabled: true,
Header: "x-tenant",
Tenants: []string{"test-tenant"},
mockWriter.AssertExpectations(t)
})

// Create and start receiver
rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)
t.Run("Send trace data using GRPC connection", func(t *testing.T) {
// Setup proper tenancy manager using NewManager
tenancyMgr := tenancy.NewManager(&tenancy.Options{
Enabled: true,
Header: "x-tenant",
Tenants: []string{"test-tenant"},
})

// Create gRPC client connection
conn, err := grpc.NewClient(
fmt.Sprintf("localhost:%v", portGrpc),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()
// Create and start receiver
rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)

// Create metadata with tenant information
md := metadata.New(map[string]string{
tenancyMgr.Header: "test-tenant",
})
ctxWithMD := metadata.NewOutgoingContext(ctx, md)
// Create gRPC client connection
conn, err := grpc.NewClient(
fmt.Sprintf("localhost:%v", portGrpc),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()

client := otlptrace.NewTraceServiceClient(conn)
req := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: []*tracepb.ResourceSpans{
{
ScopeSpans: []*tracepb.ScopeSpans{
{
Spans: []*tracepb.Span{
{
Name: "test-trace",
// Create metadata with tenant information
md := metadata.New(map[string]string{
tenancyMgr.Header: "test-tenant",
})
ctxWithMD := metadata.NewOutgoingContext(ctx, md)

client := otlptrace.NewTraceServiceClient(conn)
req := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: []*tracepb.ResourceSpans{
{
ScopeSpans: []*tracepb.ScopeSpans{
{
Spans: []*tracepb.Span{
{
Name: "test-trace",
},
},
},
},
},
},
},
}
}

// Send traces via gRPC
_, err = client.Export(ctxWithMD, req)
require.NoError(t, err)
// Send traces via gRPC
_, err = client.Export(ctxWithMD, req)
require.NoError(t, err)

assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
storedCtx := recievedCtx.Load()
if storedTraces == nil || storedCtx == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
receivedTenant := tenancy.GetTenant(*storedCtx)
return receivedSpan.Name() == span.Name() && receivedTenant == "test-tenant"
}, 1*time.Second, 1*time.Millisecond)

mockWriter.AssertExpectations(t)
assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
storedCtx := recievedCtx.Load()
if storedTraces == nil || storedCtx == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
receivedTenant := tenancy.GetTenant(*storedCtx)
return receivedSpan.Name() == span.Name() && receivedTenant == "test-tenant"
}, 1*time.Second, 1*time.Millisecond)

mockWriter.AssertExpectations(t)
})
}

0 comments on commit 04c9aab

Please sign in to comment.