Skip to content

Commit

Permalink
[storage] Remove distinction between primary and archive storage in…
Browse files Browse the repository at this point in the history
…terfaces (#6567)

## Which problem is this PR solving?
- Towards #6065

## Description of the changes
- This PR completely removes the interface `storage.ArchiveFactory` by
refactoring all the storage implementations to remove the distinction
between a primary and archive interface. Note that the concept of
archive storage remains the same within Jaeger, we just now use the same
interface to handle both primary and archive storages.
- 🛑 Breaking change for users of GRPC storage 🛑
- The GRPC storage was changed to only dispatch to the primary storage
instead of being able to dispatch to a primary and archive storage.
- Mitigation for jaeger-v1 users: In order to restore your archive
storage, configure a new GRPC server on a different port and specify it
via `--grpc-storage-archive.server`. Archive storage will also need to
be enabled via `--grpc-storage-archive.enabled=true`
- Mitigation for jaeger-v2 users: In order to restore your archive
storage, configured a new GRPC server on a different port and specify it
via a new storage backend in `jaeger_storage.backends` (an example of
this can be viewed at
https://github.com/jaegertracing/jaeger/blob/main/cmd/jaeger/config-remote-storage.yaml)

## How was this change tested?
### gRPC Storage 
#### On main (establish ground truth)
Start the remote storage binary (uses memory storage by default which
implements the ArchiveFactory interface)
```
go run ./cmd/remote-storage
```

Start the all in one binary configured with grpc storage (jaeger-v1)
```
SPAN_STORAGE_TYPE=grpc go run ./cmd/all-in-one --grpc-storage.server=http://localhost:17271
```
Traces can be archived from the UI
<img width="1469" alt="Screenshot 2025-01-23 at 10 17 32 PM"
src="https://github.com/user-attachments/assets/76a7341b-0344-479b-ad78-380a32412ee4"
/>

For jaeger-v2, change the extension section of
`cmd/jaeger/internal/all-in-one.yaml` to be the following
```yaml
  jaeger_query:
    storage:
      traces: some_storage
      traces_archive: some_storage

  jaeger_storage:
    backends:
      some_storage:
        grpc:
          endpoint: localhost:17271
          tls:
            insecure: true
```
and then start the binary as follows:
```
go run ./cmd/jaeger/
```

#### For current PR 
Stop both binaries and checkout this PR 
```
gh pr checkout 6567
```
Start two remote storage binaries (in two separate terminals)
```
go run ./cmd/remote-storage --admin.http.host-port=:17270 --grpc.host-port=:17271 
go run ./cmd/remote-storage --admin.http.host-port=:17272 --grpc.host-port=:17273
```
Start the all-in-one binary with explicit archive configurations
```
SPAN_STORAGE_TYPE=grpc go run ./cmd/all-in-one --grpc-storage.server=http://localhost:17271 --grpc-storage-archive.enabled=true --grpc-storage-archive.server=http://localhost:17273
```
Traces can be once again archived from the UI
<img width="1469" alt="Screenshot 2025-01-23 at 10 25 29 PM"
src="https://github.com/user-attachments/assets/1b4ca69f-e94b-4b08-8854-04d3db2650fc"
/>

For jaeger-v2, the configuration was changed to the following:
```yaml
extensions:
  jaeger_query:
    storage:
      traces: some_storage
      traces_archive: another_storage

  jaeger_storage:
    backends:
      some_storage:
        grpc:
          endpoint: localhost:17271
          tls:
            insecure: true
      another_storage:
        grpc:
          endpoint: localhost:17273
          tls:
            insecure: true
```

Try running all-in-one without archive-storage enabled
```
SPAN_STORAGE_TYPE=grpc go run ./cmd/all-in-one --grpc-storage.server=http://localhost:17271
```
We cannot archive traces
<img width="1469" alt="Screenshot 2025-01-23 at 10 26 53 PM"
src="https://github.com/user-attachments/assets/0ae7cdb6-25a5-4a74-be63-fb421d77d611"
/>

### CLI Flags
#### ElasticSearch CLI Flags
```
git checkout main
SPAN_STORAGE_TYPE=elasticsearch go run ./cmd/collector help > es_main
git checkout v1-es-archive
SPAN_STORAGE_TYPE=elasticsearch go run ./cmd/collector help > es_current
```
The diff can be viewed [here](https://www.diffchecker.com/xXuFqnOc/).
There is no difference.

#### Cassandra CLI Flags
```
git checkout main
SPAN_STORAGE_TYPE=cassandra go run ./cmd/collector help > cassandra_main
git checkout v1-es-archive
SPAN_STORAGE_TYPE=cassandra go run ./cmd/collector help > cassandra_current
```
The diff can be viewed [here](https://www.diffchecker.com/x74PKvhA/).
There are a few here differences here in which `cassandra-archive.*`
gains some new configuration options that were previously only existed
for the primary storage. `cassandra-archive.*` also gains some defaults
that will be the same as the the primary storage.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Jan 25, 2025
1 parent aece007 commit d7ab0f8
Show file tree
Hide file tree
Showing 49 changed files with 1,126 additions and 1,674 deletions.
4 changes: 2 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ by default uses only in-memory database.`,
queryTelset := baseTelset // copy
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
qOpts.BuildQueryServiceOptionsV2(storageFactory, logger),
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory.InitArchiveStorage, logger),
qOpts.BuildQueryServiceOptionsV2(storageFactory.InitArchiveStorage, logger),
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)
Expand Down
6 changes: 6 additions & 0 deletions cmd/jaeger/config-remote-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extensions:
jaeger_query:
storage:
traces: some-storage
traces_archive: another-storage
ui:
config_file: ./cmd/jaeger/config-ui.json

Expand All @@ -34,6 +35,11 @@ extensions:
endpoint: localhost:17271
tls:
insecure: true
another-storage:
grpc:
endpoint: localhost:17272
tls:
insecure: true

receivers:
otlp:
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cfg *TraceBackend) Unmarshal(conf *confmap.Conf) error {
}
if conf.IsSet("cassandra") {
cfg.Cassandra = &cassandra.Options{
Primary: cassandra.NamespaceConfig{
NamespaceConfig: cassandra.NamespaceConfig{
Configuration: casCfg.DefaultConfiguration(),
Enabled: true,
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ backends:
`)
cfg := createDefaultConfig().(*Config)
require.NoError(t, conf.Unmarshal(cfg))
assert.NotEmpty(t, cfg.TraceBackends["some_storage"].Cassandra.Primary.Connection.Servers)
assert.NotEmpty(t, cfg.TraceBackends["some_storage"].Cassandra.Configuration.Connection.Servers)
}

func TestConfigDefaultElasticsearch(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
)

type GRPCStorageIntegration struct {
E2EStorageIntegration

remoteStorage *integration.RemoteMemoryStorage
remoteStorage *integration.RemoteMemoryStorage
archiveRemoteStorage *integration.RemoteMemoryStorage
}

func (s *GRPCStorageIntegration) initialize(t *testing.T) {
s.remoteStorage = integration.StartNewRemoteMemoryStorage(t)
s.remoteStorage = integration.StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC)
s.archiveRemoteStorage = integration.StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC+1)
}

func (s *GRPCStorageIntegration) cleanUp(t *testing.T) {
s.remoteStorage.Close(t)
s.archiveRemoteStorage.Close(t)
s.initialize(t)
}

Expand All @@ -37,6 +41,7 @@ func TestGRPCStorage(t *testing.T) {
s.e2eInitialize(t, "grpc")
t.Cleanup(func() {
s.remoteStorage.Close(t)
s.archiveRemoteStorage.Close(t)
})
s.RunSpanStoreTests(t)
}
23 changes: 16 additions & 7 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down Expand Up @@ -138,10 +138,19 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
return qOpts, nil
}

type InitArchiveStorageFn func(logger *zap.Logger) (spanstore.Reader, spanstore.Writer)

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptions(
initArchiveStorageFn InitArchiveStorageFn,
logger *zap.Logger,
) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
ar, aw := initArchiveStorageFn(logger)
if ar != nil && aw != nil {
opts.ArchiveSpanReader = ar
opts.ArchiveSpanWriter = aw
} else {
logger.Info("Archive storage not initialized")
}

Expand All @@ -150,13 +159,13 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseF
return opts
}

func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(storageFactory storage.BaseFactory, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(initArchiveStorageFn InitArchiveStorageFn, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
opts := &v2querysvc.QueryServiceOptions{}

ar, aw := v1adapter.InitializeArchiveStorage(storageFactory, logger)
ar, aw := initArchiveStorageFn(logger)
if ar != nil && aw != nil {
opts.ArchiveTraceReader = ar
opts.ArchiveTraceWriter = aw
opts.ArchiveTraceReader = v1adapter.NewTraceReader(ar)
opts.ArchiveTraceWriter = v1adapter.NewTraceWriter(aw)
} else {
logger.Info("Archive storage not initialized")
}
Expand Down
61 changes: 26 additions & 35 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestQueryBuilderFlags(t *testing.T) {
Expand Down Expand Up @@ -86,60 +86,51 @@ func TestStringSliceAsHeader(t *testing.T) {
require.NoError(t, err)
}

func initializedFn(*zap.Logger) (spanstore.Reader, spanstore.Writer) {
return &spanstoremocks.Reader{}, &spanstoremocks.Writer{}
}

func uninitializedFn(*zap.Logger) (spanstore.Reader, spanstore.Writer) {
return nil, nil
}

func TestBuildQueryServiceOptions(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.Nil(t, qSvcOpts.ArchiveSpanReader)
assert.Nil(t, qSvcOpts.ArchiveSpanWriter)

comboFactory := struct {
*mocks.Factory
*mocks.ArchiveFactory
}{
&mocks.Factory{},
&mocks.ArchiveFactory{},
}

comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil)
comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil)

qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop())
qSvcOpts := qOpts.BuildQueryServiceOptions(initializedFn, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.NotNil(t, qSvcOpts.ArchiveSpanReader)
assert.NotNil(t, qSvcOpts.ArchiveSpanWriter)
}

func TestBuildQueryServiceOptionsV2(t *testing.T) {
func TestBuildQueryServiceOptions_NoArchiveStorage(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptionsV2(&mocks.Factory{}, zap.NewNop())
logger, logBuf := testutils.NewLogger()
qSvcOpts := qOpts.BuildQueryServiceOptions(uninitializedFn, logger)
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.Nil(t, qSvcOpts.ArchiveTraceReader)
assert.Nil(t, qSvcOpts.ArchiveTraceWriter)
assert.Nil(t, qSvcOpts.ArchiveSpanReader)
assert.Nil(t, qSvcOpts.ArchiveSpanWriter)

comboFactory := struct {
*mocks.Factory
*mocks.ArchiveFactory
}{
&mocks.Factory{},
&mocks.ArchiveFactory{},
}
require.Contains(t, logBuf.String(), "Archive storage not initialized")
}

func TestBuildQueryServiceOptionsV2(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil)
comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil)
qSvcOpts := qOpts.BuildQueryServiceOptionsV2(initializedFn, zap.NewNop())

qSvcOpts = qOpts.BuildQueryServiceOptionsV2(comboFactory, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.NotNil(t, qSvcOpts.ArchiveTraceReader)
Expand All @@ -153,7 +144,7 @@ func TestBuildQueryServiceOptionsV2_NoArchiveStorage(t *testing.T) {
assert.NotNil(t, qOpts)

logger, logBuf := testutils.NewLogger()
qSvcOpts := qOpts.BuildQueryServiceOptionsV2(&mocks.Factory{}, logger)
qSvcOpts := qOpts.BuildQueryServiceOptionsV2(uninitializedFn, logger)
assert.Nil(t, qSvcOpts.ArchiveTraceReader)
assert.Nil(t, qSvcOpts.ArchiveTraceWriter)

Expand Down
33 changes: 0 additions & 33 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (
"errors"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
Expand Down Expand Up @@ -164,36 +161,6 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
return false
}
reader, err := archiveFactory.CreateArchiveSpanReader()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return false
}
writer, err := archiveFactory.CreateArchiveSpanWriter()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return false
}
opts.ArchiveSpanReader = reader
opts.ArchiveSpanWriter = writer
return true
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptions) hasArchiveStorage() bool {
return opts.ArchiveSpanReader != nil && opts.ArchiveSpanWriter != nil
Expand Down
52 changes: 1 addition & 51 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,64 +451,14 @@ func TestGetCapabilitiesWithSupportsArchive(t *testing.T) {

type fakeStorageFactory1 struct{}

type fakeStorageFactory2 struct {
fakeStorageFactory1
r spanstore.Reader
w spanstore.Writer
rErr error
wErr error
}

func (*fakeStorageFactory1) Initialize(metrics.Factory, *zap.Logger) error {
return nil
}
func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) { return nil, nil }
func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil }
func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil }

func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr }
func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr }

var (
_ storage.Factory = new(fakeStorageFactory1)
_ storage.ArchiveFactory = new(fakeStorageFactory2)
)

func TestInitArchiveStorageErrors(t *testing.T) {
opts := &QueryServiceOptions{}
logger := zap.NewNop()

assert.False(t, opts.InitArchiveStorage(new(fakeStorageFactory1), logger))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: errors.New("error")},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: errors.New("error")},
logger,
))
}

func TestInitArchiveStorage(t *testing.T) {
opts := &QueryServiceOptions{}
logger := zap.NewNop()
reader := &spanstoremocks.Reader{}
writer := &spanstoremocks.Writer{}
assert.True(t, opts.InitArchiveStorage(
&fakeStorageFactory2{r: reader, w: writer},
logger,
))
assert.Equal(t, reader, opts.ArchiveSpanReader)
assert.Equal(t, writer, opts.ArchiveSpanWriter)
}
var _ storage.Factory = new(fakeStorageFactory1)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func runQueryService(t *testing.T, esURL string) *Server {
}))
f.InitFromViper(v, flagsSvc.Logger)
// set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc
f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true
f.Options.Config.Authentication.BearerTokenAuthentication.AllowFromContext = true
require.NoError(t, f.Initialize(telset.Metrics, telset.Logger))
defer f.Close()

Expand Down
10 changes: 8 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,19 @@ func main() {
if err != nil {
logger.Fatal("Failed to create metrics query service", zap.Error(err))
}
queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger)
queryServiceOptions := queryOpts.BuildQueryServiceOptions(
storageFactory.InitArchiveStorage,
logger,
)
queryService := querysvc.NewQueryService(
traceReader,
dependencyReader,
*queryServiceOptions)

queryServiceOptionsV2 := queryOpts.BuildQueryServiceOptionsV2(storageFactory, logger)
queryServiceOptionsV2 := queryOpts.BuildQueryServiceOptionsV2(
storageFactory.InitArchiveStorage,
logger,
)
queryServiceV2 := querysvcv2.NewQueryService(
traceReader,
dependencyReader,
Expand Down
Loading

0 comments on commit d7ab0f8

Please sign in to comment.