diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index a06ecdff681..b6d35244c74 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -79,10 +79,7 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) + s.ArchiveTraceReader, s.ArchiveTraceWriter = v1adapter.InitializeArchiveStorage(s.factory, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) s.SamplingStore, err = f.CreateSamplingStore(0) require.NoError(t, err) s.initializeDependencyReaderAndWriter(t, f) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 007380733d7..92b24619c90 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -139,9 +139,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + s.ArchiveTraceReader, s.ArchiveTraceWriter = v1adapter.InitializeArchiveStorage(s.factory, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) require.NoError(t, err) s.DependencyReader, err = f.CreateDependencyReader() diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 9d8e85bc29e..784929f993e 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -43,10 +43,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { spanReader, err := f.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) + s.ArchiveTraceReader, s.ArchiveTraceWriter = v1adapter.InitializeArchiveStorage(s.factory, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) // TODO DependencyWriter is not implemented in grpc store diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 3e3a377c630..e010edea604 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -26,7 +26,6 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" - "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) @@ -45,8 +44,8 @@ var fixtures embed.FS type StorageIntegration struct { TraceWriter tracestore.Writer TraceReader tracestore.Reader - ArchiveSpanReader spanstore.Reader - ArchiveSpanWriter spanstore.Writer + ArchiveTraceReader tracestore.Reader + ArchiveTraceWriter tracestore.Writer DependencyWriter dependencystore.Writer DependencyReader dependencystore.Reader SamplingStore samplingstore.Store @@ -189,7 +188,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { } defer s.cleanUp(t) tID := model.NewTraceID(uint64(11), uint64(22)) - expected := &model.Span{ + expectedSpan := &model.Span{ OperationName: "archive_span", StartTime: time.Now().Add(-time.Hour * 72 * 5).Truncate(time.Microsecond), TraceID: tID, @@ -197,17 +196,25 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { References: []model.SpanRef{}, Process: model.NewProcess("archived_service", model.KeyValues{}), } + expectedTrace := &model.Trace{ + Spans: []*model.Span{ + expectedSpan, + }, + } + require.NoError(t, s.ArchiveTraceWriter.WriteTraces(context.Background(), v1adapter.V1TraceToOtelTrace(expectedTrace))) - require.NoError(t, s.ArchiveSpanWriter.WriteSpan(context.Background(), expected)) - - var actual *model.Trace + var actualTrace *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { var err error - actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: tID}) - return err == nil && len(actual.Spans) == 1 + iterTraces := s.ArchiveTraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: tID.ToOTELTraceID()}) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) + if len(traces) > 0 { + actualTrace = traces[0] + } + return err == nil && len(actualTrace.Spans) == 1 }) require.True(t, found) - CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) + CompareTraces(t, expectedTrace, actualTrace) } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { diff --git a/plugin/storage/integration/memstore_test.go b/plugin/storage/integration/memstore_test.go index daa38551cb0..ab675ecb277 100644 --- a/plugin/storage/integration/memstore_test.go +++ b/plugin/storage/integration/memstore_test.go @@ -27,8 +27,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) { s.SamplingStore = memory.NewSamplingStore(2) s.TraceReader = v1adapter.NewTraceReader(store) s.TraceWriter = v1adapter.NewTraceWriter(store) - s.ArchiveSpanReader = archiveStore - s.ArchiveSpanWriter = archiveStore + s.ArchiveTraceReader = v1adapter.NewTraceReader(archiveStore) + s.ArchiveTraceWriter = v1adapter.NewTraceWriter(archiveStore) // TODO DependencyWriter is not implemented in memory store