diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 161bcb1159e..3678946f321 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -25,12 +25,6 @@ updates: interval: daily labels: [ "changelog:dependencies" ] - - package-ecosystem: "github-actions" - directory: "/" - schedule: - interval: "weekly" - labels: [ "changelog:dependencies" ] - - package-ecosystem: docker directories: - /cmd/agent diff --git a/.github/workflows/ci-all-in-one-build.yml b/.github/workflows/ci-docker-all-in-one.yml similarity index 100% rename from .github/workflows/ci-all-in-one-build.yml rename to .github/workflows/ci-docker-all-in-one.yml diff --git a/.github/workflows/ci-hotrod.yml b/.github/workflows/ci-docker-hotrod.yml similarity index 75% rename from .github/workflows/ci-hotrod.yml rename to .github/workflows/ci-docker-hotrod.yml index 16f39344404..a083953d59a 100644 --- a/.github/workflows/ci-hotrod.yml +++ b/.github/workflows/ci-docker-hotrod.yml @@ -39,17 +39,25 @@ jobs: - name: Export BRANCH variable uses: ./.github/actions/setup-branch - - name: Install tools - run: make install-ci - - uses: docker/setup-qemu-action@5927c834f5b4fdf503fca6f4c7eccda82949e1ee # v3.1.0 + - name: Define BUILD_FLAGS var if running on a Pull Request + run: | + case ${GITHUB_EVENT_NAME} in + pull_request) + echo "BUILD_FLAGS=-l -p linux/amd64" >> ${GITHUB_ENV} + ;; + *) + echo "BUILD_FLAGS=" >> ${GITHUB_ENV} + ;; + esac + - name: Build, test, and publish hotrod image - run: bash scripts/hotrod-integration-test.sh + run: bash scripts/hotrod-integration-test.sh ${{ env.BUILD_FLAGS }} env: DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} QUAY_TOKEN: ${{ secrets.QUAY_TOKEN }} - name: Print logs from hotrod - run: docker logs example-hotrod + run: docker compose -f ./examples/hotrod/docker-compose.yml logs if: failure() diff --git a/.github/workflows/ci-badger.yaml b/.github/workflows/ci-e2e-badger.yaml similarity index 100% rename from .github/workflows/ci-badger.yaml rename to 
.github/workflows/ci-e2e-badger.yaml diff --git a/.github/workflows/ci-cassandra.yml b/.github/workflows/ci-e2e-cassandra.yml similarity index 100% rename from .github/workflows/ci-cassandra.yml rename to .github/workflows/ci-e2e-cassandra.yml diff --git a/.github/workflows/ci-elasticsearch.yml b/.github/workflows/ci-e2e-elasticsearch.yml similarity index 100% rename from .github/workflows/ci-elasticsearch.yml rename to .github/workflows/ci-e2e-elasticsearch.yml diff --git a/.github/workflows/ci-grpc.yml b/.github/workflows/ci-e2e-grpc.yml similarity index 100% rename from .github/workflows/ci-grpc.yml rename to .github/workflows/ci-e2e-grpc.yml diff --git a/.github/workflows/ci-kafka.yml b/.github/workflows/ci-e2e-kafka.yml similarity index 100% rename from .github/workflows/ci-kafka.yml rename to .github/workflows/ci-e2e-kafka.yml diff --git a/.github/workflows/ci-e2e-memory.yaml b/.github/workflows/ci-e2e-memory.yaml new file mode 100644 index 00000000000..3a8dce7d370 --- /dev/null +++ b/.github/workflows/ci-e2e-memory.yaml @@ -0,0 +1,41 @@ +name: CIT Memory + +on: + push: + branches: [main] + + pull_request: + branches: [main] + +concurrency: + group: ${{ github.workflow }}-${{ (github.event.pull_request && github.event.pull_request.number) || github.ref || github.run_id }} + cancel-in-progress: true + +# See https://github.com/ossf/scorecard/blob/main/docs/checks.md#token-permissions +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + memory-v2: + runs-on: ubuntu-latest + steps: + - name: Harden Runner + uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 # v2.8.1 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + + - uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2 + with: + go-version: 1.22.x + + - name: Run Memory storage integration 
tests + run: | + STORAGE=memory_v2 make jaeger-v2-storage-integration-test + + - name: Upload coverage to codecov + uses: ./.github/actions/upload-codecov + with: + files: cover.out + flags: memory_v2 diff --git a/.github/workflows/ci-opensearch.yml b/.github/workflows/ci-e2e-opensearch.yml similarity index 100% rename from .github/workflows/ci-opensearch.yml rename to .github/workflows/ci-e2e-opensearch.yml diff --git a/.github/workflows/validate-dependabot-config.yml b/.github/workflows/ci-lint-dependabot-config.yml similarity index 100% rename from .github/workflows/validate-dependabot-config.yml rename to .github/workflows/ci-lint-dependabot-config.yml diff --git a/.github/workflows/ci-protogen-tests.yml b/.github/workflows/ci-lint-protogen.yml similarity index 100% rename from .github/workflows/ci-protogen-tests.yml rename to .github/workflows/ci-lint-protogen.yml diff --git a/.github/workflows/ci-release-testing.yml b/.github/workflows/ci-release-testing.yml deleted file mode 100644 index 9d2295f2971..00000000000 --- a/.github/workflows/ci-release-testing.yml +++ /dev/null @@ -1,95 +0,0 @@ -name: Publish release - -on: - # allow running release workflow manually - workflow_dispatch: - -# See https://github.com/jaegertracing/jaeger/issues/4017 -permissions: - contents: read - -jobs: - publish-release: - permissions: - contents: write - - runs-on: ubuntu-latest - - steps: - - - name: Disk size - run: df -g / - - - name: Harden Runner - uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 # v2.8.1 - with: - egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs - - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - with: - submodules: true - - - name: Fetch git tags - run: | - git fetch --prune --unshallow --tags - - - uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2 - with: - go-version: 1.22.x - - - name: Setup Node.js version - uses: 
./.github/actions/setup-node.js - - - name: Export BRANCH variable and validate it is a semver - # Many scripts depend on BRANCH variable. We do not want to - # use ./.github/actions/setup-branch here because it may set - # BRANCH=main when the workflow is triggered manually. - run: | - BRANCH=$(make echo-version) - echo "BRANCH=${BRANCH}" >> ${GITHUB_ENV} - echo Validate that the latest tag ${BRANCH} is in semver format - echo ${BRANCH} | grep -E '^v[0-9]+.[0-9]+.[0-9]+$' - - - name: Install tools - run: make install-ci - - - name: Configure GPG Key - id: import_gpg - uses: crazy-max/ghaction-import-gpg@01dd5d3ca463c7f10f7f4f7b4f177225ac661ee4 # v6.1.0 - with: - gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.GPG_PASSPHRASE }} - - - name: Build binaries - run: make build-all-platforms - - - name: Package binaries - id: package-binaries - run: bash scripts/package-deploy.sh - - - uses: docker/setup-qemu-action@5927c834f5b4fdf503fca6f4c7eccda82949e1ee # v3.1.0 - - - name: Build all container images (do not push) - run: bash scripts/build-upload-docker-images.sh -l - env: - DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} - QUAY_TOKEN: ${{ secrets.QUAY_TOKEN }} - - - name: Build and test all-in-one image (do not push) - run: bash scripts/build-all-in-one-image.sh -l - env: - DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} - QUAY_TOKEN: ${{ secrets.QUAY_TOKEN }} - - - name: Build, test, and publish hotrod image - run: bash scripts/hotrod-integration-test.sh - env: - DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} - QUAY_TOKEN: ${{ secrets.QUAY_TOKEN }} - - - name: Generate SBOM - uses: anchore/sbom-action@95b086ac308035dc0850b3853be5b7ab108236a8 # v0.16.1 - with: - output-file: jaeger-SBOM.spdx.json - upload-release-assets: false - upload-artifact: false diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index 580c6bc6606..448637c6146 100644 --- a/.github/workflows/ci-release.yml +++ 
b/.github/workflows/ci-release.yml @@ -112,7 +112,7 @@ jobs: QUAY_TOKEN: ${{ secrets.QUAY_TOKEN }} - name: Generate SBOM - uses: anchore/sbom-action@95b086ac308035dc0850b3853be5b7ab108236a8 # v0.16.1 + uses: anchore/sbom-action@d94f46e13c6c62f59525ac9a1e147a99dc0b9bf5 # v0.17.0 with: output-file: jaeger-SBOM.spdx.json upload-release-assets: false diff --git a/.github/workflows/ci-label-check.yml b/.github/workflows/label-check.yml similarity index 100% rename from .github/workflows/ci-label-check.yml rename to .github/workflows/label-check.yml diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 3be3a37af5d..dba80824d2d 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -1,9 +1,9 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp, jaeger, zipkin] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: @@ -14,16 +14,28 @@ extensions: # endpoint: 0.0.0.0:55679 jaeger_query: - trace_storage: memstore - trace_storage_archive: memstore_archive + trace_storage: some_store + trace_storage_archive: another_store ui_config: ./cmd/jaeger/config-ui.json jaeger_storage: - memory: - memstore: - max_traces: 100000 - memstore_archive: - max_traces: 100000 + backends: + some_store: + memory: + max_traces: 100000 + another_store: + memory: + max_traces: 100000 + + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + sampling_store: some_store + initial_sampling_probability: 0.1 + http: + grpc: receivers: otlp: @@ -42,7 +54,10 @@ receivers: processors: batch: + # Adaptive Sampling Processor is required to support adaptive sampling. + # It expects remote_sampling extension with `adaptive:` config to be enabled. 
+ adaptive_sampling: exporters: jaeger_storage_exporter: - trace_storage: memstore + trace_storage: some_store diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index f187e731c07..7a615785b4d 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -30,7 +30,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling" ) type builders struct { @@ -63,7 +65,7 @@ func (b builders) build() (otelcol.Factories, error) { jaegerquery.NewFactory(), jaegerstorage.NewFactory(), storagecleaner.NewFactory(), - // TODO add adaptive sampling + remotesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err @@ -101,7 +103,7 @@ func (b builders) build() (otelcol.Factories, error) { batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), // add-ons - // TODO add adaptive sampling + adaptivesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go index bdd1ce81a7c..e25acce59a5 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go @@ -132,8 +132,7 @@ func TestExporter(t *testing.T) { host := makeStorageExtension(t, memstoreName) - err = tracesExporter.Start(ctx, host) - require.NoError(t, err) + require.NoError(t, tracesExporter.Start(ctx, host)) defer func() { require.NoError(t, tracesExporter.Shutdown(ctx)) }() diff --git 
a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index c5107c4339d..34a7757906a 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -108,27 +109,6 @@ func (fakeStorageExt) Shutdown(context.Context) error { return nil } -type storageHost struct { - extension component.Component -} - -func (storageHost) ReportFatalError(error) { -} - -func (host storageHost) GetExtensions() map[component.ID]component.Component { - return map[component.ID]component.Component{ - jaegerstorage.ID: host.extension, - } -} - -func (storageHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { - return nil -} - -func (storageHost) GetExporters() map[component.DataType]map[component.ID]component.Component { - return nil -} - func TestServerDependencies(t *testing.T) { expectedDependencies := []component.ID{jaegerstorage.ID} telemetrySettings := component.TelemetrySettings{ @@ -142,9 +122,7 @@ func TestServerDependencies(t *testing.T) { } func TestServerStart(t *testing.T) { - host := storageHost{ - extension: fakeStorageExt{}, - } + host := storagetest.NewStorageHost().WithExtension(jaegerstorage.ID, fakeStorageExt{}) tests := []struct { name string config *Config @@ -279,7 +257,7 @@ func TestServerAddArchiveStorage(t *testing.T) { } server := newServer(tt.config, telemetrySettings) if tt.extension != nil { - host = storageHost{extension: tt.extension} + host = storagetest.NewStorageHost().WithExtension(jaegerstorage.ID, tt.extension) } err := server.addArchiveStorage(tt.qSvcOpts, host) if tt.expectedErr == "" { diff --git 
a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a93f8eb178d..8342e8e2070 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -57,7 +57,7 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error f, ok := comp.(Extension).Factory(name) if !ok { return nil, fmt.Errorf( - "cannot find storage '%s' declared by '%s' extension", + "cannot find definition of storage '%s' in the configuration for extension '%s'", name, componentType, ) } diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 4d42ff4289f..b05ef36474a 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -31,26 +32,6 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) -type storageHost struct { - t *testing.T - ext component.Component -} - -func (host storageHost) GetExtensions() map[component.ID]component.Component { - return map[component.ID]component.Component{ - ID: host.ext, - } -} - -func (host storageHost) ReportFatalError(err error) { - host.t.Fatal(err) -} - -func (storageHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } -func (storageHost) GetExporters() map[component.DataType]map[component.ID]component.Component { - return nil -} - type errorFactory struct { closeErr error } @@ -81,9 +62,9 @@ func TestStorageFactoryBadHostError(t *testing.T) { } func TestStorageFactoryBadNameError(t *testing.T) { - host := 
storageHost{t: t, ext: startStorageExtension(t, "foo")} + host := storagetest.NewStorageHost().WithExtension(ID, startStorageExtension(t, "foo")) _, err := GetStorageFactory("bar", host) - require.ErrorContains(t, err, "cannot find storage 'bar'") + require.ErrorContains(t, err, "cannot find definition of storage 'bar'") } func TestStorageFactoryBadShutdownError(t *testing.T) { @@ -105,7 +86,7 @@ func TestGetFactoryV2Error(t *testing.T) { func TestGetFactory(t *testing.T) { const name = "foo" - host := storageHost{t: t, ext: startStorageExtension(t, name)} + host := storagetest.NewStorageHost().WithExtension(ID, startStorageExtension(t, name)) f, err := GetStorageFactory(name, host) require.NoError(t, err) require.NotNil(t, f) diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go new file mode 100644 index 00000000000..edff4707e26 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -0,0 +1,91 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "errors" + + "github.com/asaskevich/govalidator" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap" + + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" +) + +var ( + errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'") + errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") +) + +var ( + _ component.Config = (*Config)(nil) + _ component.ConfigValidator = (*Config)(nil) + _ confmap.Unmarshaler = (*Config)(nil) +) + +type Config struct { + File *FileConfig `mapstructure:"file"` + Adaptive *AdaptiveConfig `mapstructure:"adaptive"` + HTTP *confighttp.ServerConfig `mapstructure:"http"` + GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` +} + +type FileConfig struct { + // Path specifies a local file as the source of sampling strategies. + Path string `valid:"required" mapstructure:"path"` +} + +type AdaptiveConfig struct { + // SamplingStore is the name of the storage defined in the jaegerstorage extension. + SamplingStore string `valid:"required" mapstructure:"sampling_store"` + + adaptive.Options `mapstructure:",squash"` +} + +// Unmarshal is a custom unmarshaler that allows the factory to provide default values +// for nested configs (like GRPC endpoint) yet still reset the pointers to nil if the +// config did not contain the corresponding sections. +// This is a workaround for the lack of optional fields support in OTEL confmap.
+// Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/10266 +func (cfg *Config) Unmarshal(conf *confmap.Conf) error { + // first load the config normally + err := conf.Unmarshal(cfg) + if err != nil { + return err + } + + // use string names of fields to see if they are set in the confmap + if !conf.IsSet("file") { + cfg.File = nil + } + + if !conf.IsSet("adaptive") { + cfg.Adaptive = nil + } + + if !conf.IsSet("grpc") { + cfg.GRPC = nil + } + + if !conf.IsSet("http") { + cfg.HTTP = nil + } + + return nil +} + +func (cfg *Config) Validate() error { + if cfg.File == nil && cfg.Adaptive == nil { + return errNoProvider + } + + if cfg.File != nil && cfg.Adaptive != nil { + return errMultipleProviders + } + + _, err := govalidator.ValidateStruct(cfg) + return err +} diff --git a/cmd/jaeger/internal/extension/remotesampling/config_test.go b/cmd/jaeger/internal/extension/remotesampling/config_test.go new file mode 100644 index 00000000000..c5071423c19 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config_test.go @@ -0,0 +1,134 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func Test_Validate(t *testing.T) { + tests := []struct { + name string + config *Config + expectedErr string + }{ + { + name: "No provider specified", + config: &Config{}, + expectedErr: "no sampling strategy provider specified, expecting 'adaptive' or 'file'", + }, + { + name: "Both providers specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "only one sampling strategy provider can be specified, 'adaptive' or 'file'", + }, + { + name: "Only File provider specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Only Adaptive provider specified", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Invalid File provider", + config: &Config{ + File: &FileConfig{Path: ""}, + }, + expectedErr: "File.Path: non zero value required", + }, + { + name: "Invalid Adaptive provider", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: ""}, + }, + expectedErr: "Adaptive.SamplingStore: non zero value required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + assert.Equal(t, tt.expectedErr, err.Error()) + } + }) + } +} + +func Test_Unmarshal(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedCfg *Config + expectedErr string + }{ + { + name: "Valid config with File", + input: map[string]any{ + "file": map[string]any{ + "path": "test-path", + }, + }, + expectedCfg: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Valid config with Adaptive", 
+ input: map[string]any{ + "adaptive": map[string]any{ + "sampling_store": "test-store", + }, + }, + expectedCfg: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Empty config", + input: map[string]any{}, + expectedCfg: &Config{}, + expectedErr: "", + }, + { + name: "Invalid config", + input: map[string]any{ + "foo": "bar", + }, + expectedErr: "invalid keys: foo", // sensitive to lib implementation + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.input) + var cfg Config + err := cfg.Unmarshal(conf) + if tt.expectedErr == "" { + require.NoError(t, err) + assert.Equal(t, tt.expectedCfg, &cfg) + } else { + assert.ErrorContains(t, err, tt.expectedErr) + } + }) + } +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go new file mode 100644 index 00000000000..58552d0eaab --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -0,0 +1,296 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +var _ extension.Extension = (*rsExtension)(nil) + +// type Extension interface { +// extension.Extension +// // rs *rsExtension +// } + +const defaultResourceName = "sampling_store_leader" + +type rsExtension struct { + cfg *Config + telemetry component.TelemetrySettings + httpServer *http.Server + grpcServer *grpc.Server + strategyProvider samplingstrategy.Provider // TODO we should rename this to Provider, not "store" + adaptiveStore samplingstore.Store + distLock *leaderelection.DistributedElectionParticipant + shutdownWG sync.WaitGroup +} + +func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension { + return &rsExtension{ + cfg: cfg, + telemetry: telemetry, + } +} + +// AdaptiveSamplingComponents is a struct that holds the components needed for adaptive sampling. 
+type AdaptiveSamplingComponents struct { + SamplingStore samplingstore.Store + DistLock *leaderelection.DistributedElectionParticipant + Options *adaptive.Options +} + +// GetAdaptiveSamplingComponents locates the `remotesampling` extension in Host +// and returns the sampling store and a leader/follower implementation, provided +// that the extension is configured with adaptive sampling (vs. file-based config). +func GetAdaptiveSamplingComponents(host component.Host) (*AdaptiveSamplingComponents, error) { + var comp component.Component + var compID component.ID + for id, ext := range host.GetExtensions() { + if id.Type() == ComponentType { + comp = ext + compID = id + break + } + } + if comp == nil { + return nil, fmt.Errorf( + "cannot find extension '%s' (make sure it's defined earlier in the config)", + ComponentType, + ) + } + ext, ok := comp.(*rsExtension) + if !ok { + return nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, ComponentType) + } + if ext.adaptiveStore == nil || ext.distLock == nil { + return nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID) + } + return &AdaptiveSamplingComponents{ + SamplingStore: ext.adaptiveStore, + DistLock: ext.distLock, + Options: &ext.cfg.Adaptive.Options, + }, nil +} + +func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { + if ext.cfg.File != nil { + ext.telemetry.Logger.Info( + "Starting file-based sampling strategy provider", + zap.String("path", ext.cfg.File.Path), + ) + if err := ext.startFileBasedStrategyProvider(ctx); err != nil { + return err + } + } + + if ext.cfg.Adaptive != nil { + ext.telemetry.Logger.Info( + "Starting adaptive sampling strategy provider", + zap.String("sampling_store", ext.cfg.Adaptive.SamplingStore), + ) + if err := ext.startAdaptiveStrategyProvider(host); err != nil { + return err + } + } + + if ext.cfg.HTTP != nil { + if err := ext.startHTTPServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling
http server: %w", err) + } + } + + if ext.cfg.GRPC != nil { + if err := ext.startGRPCServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling gRPC server: %w", err) + } + } + + return nil +} + +func (ext *rsExtension) Shutdown(ctx context.Context) error { + var errs []error + + if ext.httpServer != nil { + if err := ext.httpServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the sampling HTTP server: %w", err)) + } + } + + if ext.grpcServer != nil { + ext.grpcServer.GracefulStop() + } + + if ext.distLock != nil { + if err := ext.distLock.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the distributed lock: %w", err)) + } + } + + if ext.strategyProvider != nil { + if err := ext.strategyProvider.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop strategy provider: %w", err)) + } + } + return errors.Join(errs...) +} + +func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error { + opts := static.Options{ + StrategiesFile: ext.cfg.File.Path, + } + + // contextcheck linter complains about next line that context is not passed. 
+ //nolint + provider, err := static.NewProvider(opts, ext.telemetry.Logger) + if err != nil { + return fmt.Errorf("failed to create the local file strategy store: %w", err) + } + + ext.strategyProvider = provider + return nil +} + +func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { + storageName := ext.cfg.Adaptive.SamplingStore + f, err := jaegerstorage.GetStorageFactory(storageName, host) + if err != nil { + return fmt.Errorf("cannot find storage factory: %w", err) + } + + storeFactory, ok := f.(storage.SamplingStoreFactory) + if !ok { + return fmt.Errorf("storage '%s' does not support sampling store", storageName) + } + + store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets) + if err != nil { + return fmt.Errorf("failed to create the sampling store: %w", err) + } + ext.adaptiveStore = store + + { + lock, err := storeFactory.CreateLock() + if err != nil { + return fmt.Errorf("failed to create the distributed lock: %w", err) + } + + ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, + leaderelection.ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, + Logger: ext.telemetry.Logger, + }) + if err := ep.Start(); err != nil { + return fmt.Errorf("failed to start the leader election participant: %w", err) + } + ext.distLock = ep + } + + provider := adaptive.NewProvider(ext.cfg.Adaptive.Options, ext.telemetry.Logger, ext.distLock, store) + if err := provider.Start(); err != nil { + return fmt.Errorf("failed to start the adaptive strategy store: %w", err) + } + ext.strategyProvider = provider + return nil +} + +func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error { + handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ + ConfigManager: &clientcfghttp.ConfigManager{ + SamplingProvider: ext.strategyProvider, 
+ }, + MetricsFactory: metrics.NullFactory, + BasePath: "/api", // TODO is /api correct? + }) + httpMux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(httpMux) + + var err error + if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { + return err + } + + ext.telemetry.Logger.Info( + "Starting remote sampling HTTP server", + zap.String("endpoint", ext.cfg.HTTP.Endpoint), + ) + var hln net.Listener + if hln, err = ext.cfg.HTTP.ToListener(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + err := ext.httpServer.Serve(hln) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { + var err error + if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { + return err + } + + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider)) + + healthServer := health.NewServer() // support health checks on the gRPC server + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) + + ext.telemetry.Logger.Info( + "Starting remote sampling GRPC server", + zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint), + ) + var gln net.Listener + if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + if err := ext.grpcServer.Serve(gln); err != nil && !errors.Is(err, grpc.ErrServerStopped) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (*rsExtension) Dependencies() []component.ID { + return []component.ID{jaegerstorage.ID} +} diff --git 
a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go new file mode 100644 index 00000000000..de8cb300e24 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -0,0 +1,252 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "io" + "net/http" + "path/filepath" + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +func makeStorageExtension(t *testing.T, memstoreName string) component.Host { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + TracerProvider: nooptrace.NewTracerProvider(), + MeterProvider: noopmetric.NewMeterProvider(), + } + extensionFactory := jaegerstorage.NewFactory() + storageExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: telemetrySettings, + }, + &jaegerstorage.Config{Backends: map[string]jaegerstorage.Backend{ + memstoreName: 
{Memory: &memory.Configuration{MaxTraces: 10000}}, + }}, + ) + require.NoError(t, err) + + host := storagetest.NewStorageHost() + host.WithExtension(jaegerstorage.ID, storageExtension) + + err = storageExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) }) + return host +} + +func makeRemoteSamplingExtension(t *testing.T, cfg component.Config) component.Host { + extensionFactory := NewFactory() + samplingExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + cfg, + ) + require.NoError(t, err) + host := storagetest.NewStorageHost().WithExtension(ID, samplingExtension) + storageHost := makeStorageExtension(t, "foobar") + + require.NoError(t, samplingExtension.Start(context.Background(), storageHost)) + t.Cleanup(func() { require.NoError(t, samplingExtension.Shutdown(context.Background())) }) + return host +} + +func TestStartFileBasedProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartHTTP(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = &confighttp.ServerConfig{ + 
Endpoint: "0.0.0.0:12345", + } + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + + resp, err := http.Get("http://0.0.0.0:12345/api/sampling?service=foo") + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedResponse := `{ + "probabilisticSampling": { + "samplingRate": 0.8 + }, + "strategyType": 0 + }` + require.JSONEq(t, expectedResponse, string(body)) + + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartGRPC(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = nil + cfg.GRPC = &configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: "0.0.0.0:12346", + Transport: "tcp", + }, + } + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + + conn, err := grpc.NewClient("0.0.0.0:12346", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + c := api_v2.NewSamplingManagerClient(conn) + response, err := c.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: "foo"}) + require.NoError(t, err) + require.Equal(t, 0.8, response.ProbabilisticSampling.SamplingRate) + + require.NoError(t, ext.Shutdown(context.Background())) +} + +func 
TestStartAdaptiveProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File = nil + cfg.Adaptive.SamplingStore = "foobar" + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartAdaptiveStrategyProviderErrors(t *testing.T) { + host := storagetest.NewStorageHost() + ext := &rsExtension{ + cfg: &Config{ + Adaptive: &AdaptiveConfig{ + SamplingStore: "foobar", + }, + }, + } + err := ext.startAdaptiveStrategyProvider(host) + require.ErrorContains(t, err, "cannot find storage factory") +} + +func TestGetAdaptiveSamplingComponents(t *testing.T) { + // Success case + host := makeRemoteSamplingExtension(t, &Config{ + Adaptive: &AdaptiveConfig{ + SamplingStore: "foobar", + Options: adaptive.Options{ + FollowerLeaseRefreshInterval: 1, + LeaderLeaseRefreshInterval: 1, + AggregationBuckets: 1, + }, + }, + }) + + comps, err := GetAdaptiveSamplingComponents(host) + require.NoError(t, err) + assert.NotNil(t, comps.DistLock) + assert.NotNil(t, comps.SamplingStore) + assert.Equal(t, time.Duration(1), comps.Options.FollowerLeaseRefreshInterval) + assert.Equal(t, time.Duration(1), comps.Options.LeaderLeaseRefreshInterval) + assert.Equal(t, 1, comps.Options.AggregationBuckets) +} + +type wrongExtension struct{} + +func (*wrongExtension) Start(context.Context, component.Host) error { return nil } +func (*wrongExtension) Shutdown(context.Context) error { return nil } + +func TestGetAdaptiveSamplingComponentsErrors(t *testing.T) { + host := makeRemoteSamplingExtension(t, &Config{}) + _, err := GetAdaptiveSamplingComponents(host) + require.ErrorContains(t, err, 
"extension 'remote_sampling' is not configured for adaptive sampling") + + h1 := storagetest.NewStorageHost() + _, err = GetAdaptiveSamplingComponents(h1) + require.ErrorContains(t, err, "cannot find extension") + + h2 := h1.WithExtension(ID, &wrongExtension{}) + _, err = GetAdaptiveSamplingComponents(h2) + require.ErrorContains(t, err, "is not of type") +} + +func TestDependencies(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + assert.Equal(t, []component.ID{jaegerstorage.ID}, ext.(*rsExtension).Dependencies()) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go new file mode 100644 index 00000000000..e3a1c9a5756 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -0,0 +1,57 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/ports" +) + +// ComponentType is the name of this extension in configuration. +var ComponentType = component.MustNewType("remote_sampling") + +var ID = component.NewID(ComponentType) + +// NewFactory creates a factory for the jaeger remote sampling extension. 
+func NewFactory() extension.Factory { + return extension.NewFactory( + ComponentType, + createDefaultConfig, + createExtension, + component.StabilityLevelBeta, + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + HTTP: &confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP + 100), + }, + GRPC: &configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorGRPC + 100), + Transport: confignet.TransportTypeTCP, + }, + }, + File: &FileConfig{ + Path: "", // path needs to be specified + }, + Adaptive: &AdaptiveConfig{ + SamplingStore: "", // storage name needs to be specified + Options: adaptive.DefaultOptions(), + }, + } +} + +func createExtension(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { + return newExtension(cfg.(*Config), set.TelemetrySettings), nil +} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go b/cmd/jaeger/internal/extension/remotesampling/factory_test.go similarity index 71% rename from cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go rename to cmd/jaeger/internal/extension/remotesampling/factory_test.go index 8b94fa2304b..a6de996e02e 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. 
// SPDX-License-Identifier: Apache-2.0 -package storagereceiver +package remotesampling import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/extension/extensiontest" ) func TestCreateDefaultConfig(t *testing.T) { @@ -19,10 +19,10 @@ func TestCreateDefaultConfig(t *testing.T) { require.NoError(t, componenttest.CheckConfigStruct(cfg)) } -func TestCreateTracesReceiver(t *testing.T) { +func TestCreateExtension(t *testing.T) { cfg := createDefaultConfig().(*Config) f := NewFactory() - r, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil) + r, err := f.CreateExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) assert.NotNil(t, r) } diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go b/cmd/jaeger/internal/extension/remotesampling/package_test.go similarity index 90% rename from cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go rename to cmd/jaeger/internal/extension/remotesampling/package_test.go index 4dbecd011d3..5bd9ea71735 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/package_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. 
// SPDX-License-Identifier: Apache-2.0 -package storagereceiver +package remotesampling import ( "testing" diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 6ddcb538cff..670b7b576d4 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -177,6 +177,7 @@ func createStorageCleanerConfig(t *testing.T, configFile string, storage string) func purge(t *testing.T) { addr := fmt.Sprintf("http://0.0.0.0:%s%s", storagecleaner.Port, storagecleaner.URL) + t.Logf("Purging storage via %s", addr) r, err := http.NewRequestWithContext(context.Background(), http.MethodPost, addr, nil) require.NoError(t, err) @@ -185,6 +186,8 @@ func purge(t *testing.T) { resp, err := client.Do(r) require.NoError(t, err) defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, http.StatusOK, resp.StatusCode, "body: %s", string(body)) } diff --git a/cmd/jaeger/internal/integration/es_test.go b/cmd/jaeger/internal/integration/elasticsearch_test.go similarity index 93% rename from cmd/jaeger/internal/integration/es_test.go rename to cmd/jaeger/internal/integration/elasticsearch_test.go index 92b9f0c79e0..2c78bfdb7af 100644 --- a/cmd/jaeger/internal/integration/es_test.go +++ b/cmd/jaeger/internal/integration/elasticsearch_test.go @@ -9,7 +9,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/integration" ) -func TestESStorage(t *testing.T) { +func TestElasticsearchStorage(t *testing.T) { integration.SkipUnlessEnv(t, "elasticsearch") s := &E2EStorageIntegration{ diff --git a/cmd/jaeger/internal/integration/memory_test.go b/cmd/jaeger/internal/integration/memory_test.go new file mode 100644 index 00000000000..a2592b04a38 --- /dev/null +++ b/cmd/jaeger/internal/integration/memory_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "testing" + + "github.com/jaegertracing/jaeger/plugin/storage/integration" +) + +func TestMemoryStorage(t *testing.T) { + integration.SkipUnlessEnv(t, "memory_v2") + + s := &E2EStorageIntegration{ + ConfigFile: "../../config.yaml", + StorageIntegration: integration.StorageIntegration{ + SkipArchiveTest: true, + CleanUp: purge, + }, + } + s.e2eInitialize(t, "memory") + t.Cleanup(func() { + s.e2eCleanUp(t) + }) + s.RunAll(t) +} diff --git a/cmd/jaeger/internal/integration/os_test.go b/cmd/jaeger/internal/integration/opensearch_test.go similarity index 93% rename from cmd/jaeger/internal/integration/os_test.go rename to cmd/jaeger/internal/integration/opensearch_test.go index a5d3e945214..909c0be6510 100644 --- a/cmd/jaeger/internal/integration/os_test.go +++ b/cmd/jaeger/internal/integration/opensearch_test.go @@ -9,7 +9,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/integration" ) -func TestOSStorage(t *testing.T) { +func TestOpenSearchStorage(t *testing.T) { integration.SkipUnlessEnv(t, "opensearch") s := &E2EStorageIntegration{ ConfigFile: "../../config-opensearch.yaml", diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md b/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md deleted file mode 100644 index 30931adaf0f..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Storage Receiver - -`storagereceiver` is a fake receiver that creates an artificial stream of traces by: - -- repeatedly querying one of Jaeger storage backends for all traces (by service). -- tracking new traces / spans and passing them to the next component in the pipeline. 
- -# Getting Started - -The following settings are required: - -- `trace_storage` (no default): name of a storage backend defined in `jaegerstorage` extension - -The following settings can be optionally configured: - -- `pull_interval` (default = 0s): The delay between each iteration of pulling traces. - -```yaml -receivers: - jaeger_storage_receiver: - trace_storage: external-storage - pull_interval: 0s -``` diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go deleted file mode 100644 index e9319b8991d..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "time" - - "github.com/asaskevich/govalidator" -) - -type Config struct { - TraceStorage string `valid:"required" mapstructure:"trace_storage"` - PullInterval time.Duration `mapstructure:"pull_interval"` -} - -func (cfg *Config) Validate() error { - _, err := govalidator.ValidateStruct(cfg) - return err -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go deleted file mode 100644 index 98435b3e2cf..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. 
-// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "errors" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap/confmaptest" -) - -func TestLoadConfig(t *testing.T) { - t.Parallel() - - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) - - tests := []struct { - id component.ID - expected component.Config - expectedErr error - }{ - { - id: component.NewIDWithName(componentType, ""), - expectedErr: errors.New("non zero value required"), - }, - { - id: component.NewIDWithName(componentType, "defaults"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 0, - }, - }, - { - id: component.NewIDWithName(componentType, "filled"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 2 * time.Second, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(tt.id.String()) - require.NoError(t, err) - require.NoError(t, sub.Unmarshal(cfg)) - - if tt.expectedErr != nil { - require.ErrorContains(t, component.ValidateConfig(cfg), tt.expectedErr.Error()) - } else { - require.NoError(t, component.ValidateConfig(cfg)) - assert.Equal(t, tt.expected, cfg) - } - }) - } -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go deleted file mode 100644 index 09abc498e72..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. 
-// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" -) - -// componentType is the name of this extension in configuration. -var componentType = component.MustNewType("jaeger_storage_receiver") - -// ID is the identifier of this extension. -var ID = component.NewID(componentType) - -func NewFactory() receiver.Factory { - return receiver.NewFactory( - componentType, - createDefaultConfig, - receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), - ) -} - -func createDefaultConfig() component.Config { - return &Config{} -} - -func createTracesReceiver(_ context.Context, set receiver.Settings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { - cfg := config.(*Config) - - return newTracesReceiver(cfg, set, nextConsumer) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go deleted file mode 100644 index d6f93ef5ccd..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. 
-// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "fmt" - "time" - - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/storage/spanstore" -) - -type storageReceiver struct { - cancelConsumeLoop context.CancelFunc - config *Config - settings receiver.Settings - consumedTraces map[model.TraceID]*consumedTrace - nextConsumer consumer.Traces - spanReader spanstore.Reader -} - -type consumedTrace struct { - spanIDs map[model.SpanID]struct{} -} - -func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (*storageReceiver, error) { - return &storageReceiver{ - config: config, - settings: set, - consumedTraces: make(map[model.TraceID]*consumedTrace), - nextConsumer: nextConsumer, - }, nil -} - -func (r *storageReceiver) Start(ctx context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactory(r.config.TraceStorage, host) - if err != nil { - return fmt.Errorf("cannot find storage factory: %w", err) - } - - if r.spanReader, err = f.CreateSpanReader(); err != nil { - return fmt.Errorf("cannot create span reader: %w", err) - } - - ctx, cancel := context.WithCancel(ctx) - r.cancelConsumeLoop = cancel - - go func() { - if err := r.consumeLoop(ctx); err != nil { - r.settings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() - - return nil -} - -func (r *storageReceiver) consumeLoop(ctx context.Context) error { - for { - services, err := r.spanReader.GetServices(ctx) - if err != nil { - r.settings.Logger.Error("Failed to get services from consumer", zap.Error(err)) - return err - } - - for _, svc := range services { - if err 
:= r.consumeTraces(ctx, svc); err != nil { - r.settings.Logger.Error("Failed to consume traces from consumer", zap.Error(err)) - } - } - - select { - case <-ctx.Done(): - r.settings.Logger.Info("Consumer stopped") - return nil - default: - time.Sleep(r.config.PullInterval) - } - } -} - -func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { - endTime := time.Now() - traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ - ServiceName: serviceName, - StartTimeMin: endTime.Add(-1 * time.Hour), - StartTimeMax: endTime, - }) - if err != nil { - return err - } - - for _, trace := range traces { - traceID := trace.Spans[0].TraceID - if _, ok := r.consumedTraces[traceID]; !ok { - r.consumedTraces[traceID] = &consumedTrace{ - spanIDs: make(map[model.SpanID]struct{}), - } - } - r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans) - } - - return nil -} - -func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error { - // Spans are consumed one at a time because we don't know whether all spans - // in a trace have been completely exported - for _, span := range spans { - if _, ok := tc.spanIDs[span.SpanID]; !ok { - tc.spanIDs[span.SpanID] = struct{}{} - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ - { - Spans: []*model.Span{span}, - Process: span.Process, - }, - }) - if err != nil { - return err - } - r.nextConsumer.ConsumeTraces(ctx, td) - } - } - - return nil -} - -func (r *storageReceiver) Shutdown(_ context.Context) error { - if r.cancelConsumeLoop != nil { - r.cancelConsumeLoop() - } - return nil -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml b/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml deleted file mode 100644 index e590e8f1694..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml +++ /dev/null @@ -1,6 +0,0 @@ 
-jaeger_storage_receiver: -jaeger_storage_receiver/defaults: - trace_storage: storage -jaeger_storage_receiver/filled: - trace_storage: storage - pull_interval: 2s diff --git a/cmd/jaeger/internal/integration/storagecleaner/extension.go b/cmd/jaeger/internal/integration/storagecleaner/extension.go index 2a8b37016e6..c49d9b7bff0 100644 --- a/cmd/jaeger/internal/integration/storagecleaner/extension.go +++ b/cmd/jaeger/internal/integration/storagecleaner/extension.go @@ -13,6 +13,7 @@ import ( "github.com/gorilla/mux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/storage" @@ -29,15 +30,15 @@ const ( ) type storageCleaner struct { - config *Config - server *http.Server - settings component.TelemetrySettings + config *Config + server *http.Server + telset component.TelemetrySettings } -func newStorageCleaner(config *Config, telemetrySettings component.TelemetrySettings) *storageCleaner { +func newStorageCleaner(config *Config, telset component.TelemetrySettings) *storageCleaner { return &storageCleaner{ - config: config, - settings: telemetrySettings, + config: config, + telset: telset, } } @@ -74,10 +75,11 @@ func (c *storageCleaner) Start(_ context.Context, host component.Host) error { Handler: r, ReadHeaderTimeout: 3 * time.Second, } + c.telset.Logger.Info("Starting storage cleaner server", zap.String("addr", c.server.Addr)) go func() { if err := c.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { err = fmt.Errorf("error starting cleaner server: %w", err) - c.settings.ReportStatus(component.NewFatalErrorEvent(err)) + c.telset.ReportStatus(component.NewFatalErrorEvent(err)) } }() diff --git a/cmd/jaeger/internal/integration/storagecleaner/extension_test.go b/cmd/jaeger/internal/integration/storagecleaner/extension_test.go index 23d05c28e5c..b95b21b69f5 100644 --- 
a/cmd/jaeger/internal/integration/storagecleaner/extension_test.go +++ b/cmd/jaeger/internal/integration/storagecleaner/extension_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/storage" @@ -92,7 +93,9 @@ func TestStorageCleanerExtension(t *testing.T) { TraceStorage: "storage", Port: Port, } - s := newStorageCleaner(config, component.TelemetrySettings{}) + s := newStorageCleaner(config, component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }) require.NotEmpty(t, s.Dependencies()) host := storagetest.NewStorageHost() host.WithExtension(jaegerstorage.ID, &mockStorageExt{ @@ -118,7 +121,9 @@ func TestStorageCleanerExtension(t *testing.T) { func TestGetStorageFactoryError(t *testing.T) { config := &Config{} - s := newStorageCleaner(config, component.TelemetrySettings{}) + s := newStorageCleaner(config, component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }) host := storagetest.NewStorageHost() host.WithExtension(jaegerstorage.ID, &mockStorageExt{ name: "storage", @@ -136,6 +141,7 @@ func TestStorageExtensionStartError(t *testing.T) { } var startStatus atomic.Pointer[component.StatusEvent] s := newStorageCleaner(config, component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), ReportStatus: func(status *component.StatusEvent) { startStatus.Store(status) }, diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go new file mode 100644 index 00000000000..86f3a30c8d9 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -0,0 +1,18 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "go.opentelemetry.io/collector/component" +) + +var _ component.ConfigValidator = (*Config)(nil) + +type Config struct { + // all configuration for the processor is in the remotesampling extension +} + +func (*Config) Validate() error { + return nil +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go new file mode 100644 index 00000000000..601dcd60317 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -0,0 +1,48 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +// componentType is the name of this processor in configuration. +var componentType = component.MustNewType("adaptive_sampling") + +// NewFactory creates a factory for the adaptive sampling processor. 
+func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelBeta), + ) +} + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createTracesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + sp := newTraceProcessor(*oCfg, set.TelemetrySettings) + return processorhelper.NewTracesProcessor( + ctx, + set, + cfg, + nextConsumer, + sp.processTraces, + processorhelper.WithStart(sp.start), + processorhelper.WithShutdown(sp.close), + ) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go new file mode 100644 index 00000000000..85e95b19cd0 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go @@ -0,0 +1,39 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) + require.NoError(t, cfg.Validate()) +} + +func TestCreateTracesProcessor(t *testing.T) { + ctx := context.Background() + cfg := createDefaultConfig().(*Config) + + nextConsumer := consumertest.NewNop() + set := processortest.NewNopSettings() + + tracesProcessor, err := createTracesProcessor(ctx, set, cfg, nextConsumer) + require.NoError(t, err) + assert.NotNil(t, tracesProcessor) +} + +func TestFactoryType(t *testing.T) { + factory := NewFactory() + assert.Equal(t, componentType, factory.Type()) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/package_test.go b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go new file mode 100644 index 00000000000..10d464704eb --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go new file mode 100644 index 00000000000..9573b2257a3 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -0,0 +1,82 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "fmt" + + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" +) + +type traceProcessor struct { + config *Config + aggregator samplingstrategy.Aggregator + telset component.TelemetrySettings +} + +func newTraceProcessor(cfg Config, telset component.TelemetrySettings) *traceProcessor { + return &traceProcessor{ + config: &cfg, + telset: telset, + } +} + +func (tp *traceProcessor) start(_ context.Context, host component.Host) error { + parts, err := remotesampling.GetAdaptiveSamplingComponents(host) + if err != nil { + return fmt.Errorf( + "cannot load adaptive sampling components from `%s` extension: %w", + remotesampling.ComponentType, err) + } + + agg, err := adaptive.NewAggregator( + *parts.Options, + tp.telset.Logger, + otelmetrics.NewFactory(tp.telset.MeterProvider), + parts.DistLock, + parts.SamplingStore, + ) + if err != nil { + return fmt.Errorf("failed to create the adaptive sampling aggregator: %w", err) + } + + agg.Start() + tp.aggregator = agg + + return nil +} + +func (tp *traceProcessor) close(context.Context) error { + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the adaptive sampling aggregator : %w", err) + } + } + return nil +} + +func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches, err := otlp2jaeger.ProtoFromTraces(td) + if err != nil { + return td, fmt.Errorf("cannot transform OTLP traces to Jaeger 
format: %w", err) + } + + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + adaptive.RecordThroughput(tp.aggregator, span, tp.telset.Logger) + } + } + return td, nil +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go new file mode 100644 index 00000000000..69e7d83fe8f --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "errors" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pdata/ptrace" + noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +func makeStorageExtension(t *testing.T, memstoreName string) component.Host { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + TracerProvider: nooptrace.NewTracerProvider(), + MeterProvider: noopmetric.NewMeterProvider(), + } + extensionFactory := jaegerstorage.NewFactory() + storageExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: telemetrySettings, + }, + &jaegerstorage.Config{Backends: 
map[string]jaegerstorage.Backend{ + memstoreName: {Memory: &memory.Configuration{MaxTraces: 10000}}, + }}, + ) + require.NoError(t, err) + + host := storagetest.NewStorageHost() + host.WithExtension(jaegerstorage.ID, storageExtension) + + err = storageExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) }) + return host +} + +var _ component.Config = (*Config)(nil) + +func makeRemoteSamplingExtension(t *testing.T, cfg component.Config) component.Host { + extensionFactory := remotesampling.NewFactory() + samplingExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + cfg, + ) + require.NoError(t, err) + host := storagetest.NewStorageHost().WithExtension(remotesampling.ID, samplingExtension) + storageHost := makeStorageExtension(t, "foobar") + + err = samplingExtension.Start(context.Background(), storageHost) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, samplingExtension.Shutdown(context.Background())) }) + return host +} + +func TestNewTraceProcessor(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + } + config, ok := createDefaultConfig().(*Config) + require.True(t, ok) + newTraceProcessor := newTraceProcessor(*config, telemetrySettings) + require.NotNil(t, newTraceProcessor) +} + +func TestTraceProcessor(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + MeterProvider: noopmetric.NewMeterProvider(), + } + config := createDefaultConfig().(*Config) + traceProcessor := newTraceProcessor(*config, telemetrySettings) + + rsCfg := &remotesampling.Config{ + Adaptive: &remotesampling.AdaptiveConfig{ + 
SamplingStore: "foobar", + Options: adaptive.DefaultOptions(), + }, + } + host := makeRemoteSamplingExtension(t, rsCfg) + + rsCfg.Adaptive.Options.AggregationBuckets = 0 + err := traceProcessor.start(context.Background(), host) + require.ErrorContains(t, err, "AggregationBuckets must be greater than 0") + + rsCfg.Adaptive.Options = adaptive.DefaultOptions() + require.NoError(t, traceProcessor.start(context.Background(), host)) + + twww := makeTracesOneSpan() + trace, err := traceProcessor.processTraces(context.Background(), twww) + require.NoError(t, err) + require.NotNil(t, trace) + + err = traceProcessor.close(context.Background()) + require.NoError(t, err) +} + +func makeTracesOneSpan() ptrace.Traces { + traces := ptrace.NewTraces() + rSpans := traces.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + span := sSpans.Spans().AppendEmpty() + span.SetName("test") + return traces +} + +func TestGetAdaptiveSamplingComponentsError(t *testing.T) { + processor := &traceProcessor{} + err := processor.start(context.Background(), storagetest.NewStorageHost()) + require.ErrorContains(t, err, "cannot load adaptive sampling components") +} + +// aggregator that returns error from Close() +type notClosingAgg struct{} + +func (*notClosingAgg) Close() error { return errors.New("not closing") } + +func (*notClosingAgg) HandleRootSpan(*model.Span, *zap.Logger) {} +func (*notClosingAgg) RecordThroughput(string, string, model.SamplerType, float64) {} +func (*notClosingAgg) Start() {} + +func TestTraceProcessorCloseError(t *testing.T) { + processor := &traceProcessor{ + aggregator: ¬ClosingAgg{}, + } + require.ErrorContains(t, processor.close(context.Background()), "not closing") +} diff --git a/cmd/jaeger/sampling-strategies.json b/cmd/jaeger/sampling-strategies.json new file mode 100644 index 00000000000..6928e6d0436 --- /dev/null +++ b/cmd/jaeger/sampling-strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 
0.1 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 1 + } + ] +} diff --git a/examples/hotrod/cmd/root.go b/examples/hotrod/cmd/root.go index 7db0b0bda83..1a6a0d22211 100644 --- a/examples/hotrod/cmd/root.go +++ b/examples/hotrod/cmd/root.go @@ -56,8 +56,6 @@ func init() { // onInitialize is called before the command is executed. func onInitialize() { - jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger) - zapOptions := []zap.Option{ zap.AddStacktrace(zapcore.FatalLevel), zap.AddCallerSkip(1), @@ -68,6 +66,9 @@ func onInitialize() { ) } logger, _ = zap.NewDevelopment(zapOptions...) + + jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger) + metricsFactory = prometheus.New().Namespace(metrics.NSOptions{Name: "hotrod", Tags: nil}) if config.MySQLGetDelay != fixDBConnDelay { diff --git a/examples/hotrod/docker-compose.yml b/examples/hotrod/docker-compose.yml index 250f78f82b8..048de7de59e 100644 --- a/examples/hotrod/docker-compose.yml +++ b/examples/hotrod/docker-compose.yml @@ -1,11 +1,10 @@ version: '3.7' - # To run a specific version of Jaeger, use environment variable, e.g.: # JAEGER_VERSION=1.52 docker compose up services: jaeger: - image: jaegertracing/all-in-one:${JAEGER_VERSION:-latest} + image: ${REGISTRY:-}jaegertracing/all-in-one:${JAEGER_VERSION:-latest} ports: - "16686:16686" - "4317:4317" @@ -14,8 +13,9 @@ services: - LOG_LEVEL=debug networks: - jaeger-example + hotrod: - image: jaegertracing/example-hotrod:${JAEGER_VERSION:-latest} + image: ${REGISTRY:-}jaegertracing/example-hotrod:${JAEGER_VERSION:-latest} # To run the latest trunk build, find the tag at Docker Hub and use the line below # https://hub.docker.com/r/jaegertracing/example-hotrod-snapshot/tags #image: jaegertracing/example-hotrod-snapshot:0ab8f2fcb12ff0d10830c1ee3bb52b745522db6c diff --git a/go.mod b/go.mod index a7d408564d4..62ec869cbe0 100644 --- a/go.mod +++ b/go.mod @@ 
-97,6 +97,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.105.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.105.0 // indirect go.opentelemetry.io/collector/internal/globalgates v0.105.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.105.0 // indirect + go.opentelemetry.io/collector/pdata/testdata v0.105.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.4.0 // indirect go.opentelemetry.io/otel/log v0.4.0 // indirect go.opentelemetry.io/otel/sdk/log v0.4.0 // indirect diff --git a/pkg/clientcfg/clientcfghttp/handler.go b/pkg/clientcfg/clientcfghttp/handler.go index ab2cef08c5d..7bc22be9a4e 100644 --- a/pkg/clientcfg/clientcfghttp/handler.go +++ b/pkg/clientcfg/clientcfghttp/handler.go @@ -109,6 +109,17 @@ func (h *HTTPHandler) RegisterRoutes(router *mux.Router) { }).Methods(http.MethodGet) } +// RegisterRoutesWithHTTP registers the legacy sampling handler with a standard-library http.ServeMux.
+func (h *HTTPHandler) RegisterRoutesWithHTTP(router *http.ServeMux) { + prefix := h.params.BasePath + router.HandleFunc( + prefix+"/", + func(w http.ResponseWriter, r *http.Request) { + h.serveSamplingHTTP(w, r, h.encodeThriftLegacy) + }, + ) +} + func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) { services := r.URL.Query()["service"] if len(services) != 1 { diff --git a/pkg/clientcfg/clientcfghttp/handler_test.go b/pkg/clientcfg/clientcfghttp/handler_test.go index 6afc30d61b2..723047c1ed3 100644 --- a/pkg/clientcfg/clientcfghttp/handler_test.go +++ b/pkg/clientcfg/clientcfghttp/handler_test.go @@ -47,6 +47,7 @@ func withServer( basePath string, mockSamplingResponse *api_v2.SamplingStrategyResponse, mockBaggageResponse []*baggage.BaggageRestriction, + withGorilla bool, testFn func(server *testServer), ) { metricsFactory := metricstest.NewFactory(0) @@ -62,9 +63,18 @@ func withServer( BasePath: basePath, LegacySamplingEndpoint: true, }) - r := mux.NewRouter() - handler.RegisterRoutes(r) - server := httptest.NewServer(r) + + var server *httptest.Server + if withGorilla { + r := mux.NewRouter() + handler.RegisterRoutes(r) + server = httptest.NewServer(r) + } else { + mux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(mux) + server = httptest.NewServer(mux) + } + defer server.Close() testFn(&testServer{ metricsFactory: metricsFactory, @@ -76,15 +86,17 @@ func withServer( } func TestHTTPHandler(t *testing.T) { + testGorillaHTTPHandler(t, "") testHTTPHandler(t, "") } func TestHTTPHandlerWithBasePath(t *testing.T) { + testGorillaHTTPHandler(t, "/foo") testHTTPHandler(t, "/foo") } -func testHTTPHandler(t *testing.T, basePath string) { - withServer(basePath, rateLimiting(42), restrictions("luggage", 10), func(ts *testServer) { +func testGorillaHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, rateLimiting(42), restrictions("luggage", 10), true, func(ts *testServer) { tests := []struct { endpoint 
string expOutput string @@ -146,6 +158,49 @@ func testHTTPHandler(t *testing.T, basePath string) { }) } +func testHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, rateLimiting(42), restrictions("luggage", 10), false, func(ts *testServer) { + tests := []struct { + endpoint string + expOutput string + }{ + { + endpoint: "/", + expOutput: `{"strategyType":1,"rateLimitingSampling":{"maxTracesPerSecond":42}}`, + }, + } + for _, test := range tests { + t.Run("endpoint="+test.endpoint, func(t *testing.T) { + resp, err := http.Get(ts.server.URL + basePath + test.endpoint + "?service=Y") + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + err = resp.Body.Close() + require.NoError(t, err) + assert.Equal(t, test.expOutput, string(body)) + if test.endpoint == "/" { + objResp := &tSampling092.SamplingStrategyResponse{} + require.NoError(t, json.Unmarshal(body, objResp)) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetStrategyType(), + objResp.GetStrategyType()) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetRateLimitingSampling().GetMaxTracesPerSecond(), + objResp.GetRateLimitingSampling().GetMaxTracesPerSecond()) + } else { + objResp, err := p2json.SamplingStrategyResponseFromJSON(body) + require.NoError(t, err) + assert.EqualValues(t, ts.samplingProvider.samplingResponse, objResp) + } + }) + } + + // handler must emit metrics + ts.metricsFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{Name: "http-server.requests", Tags: map[string]string{"type": "sampling-legacy"}, Value: 1}) + }) +} + func TestHTTPHandlerErrors(t *testing.T) { testCases := []struct { description string @@ -215,61 +270,67 @@ func TestHTTPHandlerErrors(t *testing.T) { for _, tc := range testCases { testCase := tc // capture loop var t.Run(testCase.description, func(t *testing.T) { - withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, 
func(ts *testServer) { - resp, err := http.Get(ts.server.URL + testCase.url) - require.NoError(t, err) - assert.Equal(t, testCase.statusCode, resp.StatusCode) - if testCase.body != "" { - body, err := io.ReadAll(resp.Body) + for _, withGorilla := range []bool{true, false} { + withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, withGorilla, func(ts *testServer) { + resp, err := http.Get(ts.server.URL + testCase.url) require.NoError(t, err) - assert.Equal(t, testCase.body, string(body)) - } + assert.Equal(t, testCase.statusCode, resp.StatusCode) + if testCase.body != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, testCase.body, string(body)) + } - if len(testCase.metrics) > 0 { - ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) - } - }) + if len(testCase.metrics) > 0 { + ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) + } + }) + } }) } t.Run("failure to write a response", func(t *testing.T) { - withServer("", probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) { - handler := ts.handler + for _, withGorilla := range []bool{true, false} { + withServer("", probabilistic(0.001), restrictions("luggage", 10), withGorilla, func(ts *testServer) { + handler := ts.handler - req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) - w := &mockWriter{header: make(http.Header)} - handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) + req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) + w := &mockWriter{header: make(http.Header)} + handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1}) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, 
Value: 1}) - req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) - handler.serveBaggageHTTP(w, req) + req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) + handler.serveBaggageHTTP(w, req) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) - }) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) + }) + } }) } func TestEncodeErrors(t *testing.T) { - withServer("", nil, nil, func(server *testServer) { - _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ - StrategyType: -1, - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, - }...) + for _, withGorilla := range []bool{true, false} { + withServer("", nil, nil, withGorilla, func(server *testServer) { + _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ + StrategyType: -1, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, + }...) - _, err = server.handler.encodeProto(nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, - }...) 
- }) + _, err = server.handler.encodeProto(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, + }...) + }) + } } func rateLimiting(rate int32) *api_v2.SamplingStrategyResponse { diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator.go b/plugin/sampling/strategyprovider/adaptive/aggregator.go index 97b8e69399c..5fe4fcd59dd 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator.go @@ -129,6 +129,23 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } } +func RecordThroughput(agg samplingstrategy.Aggregator, span *span_model.Span, logger *zap.Logger) { + // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. 
+ if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} + func (a *aggregator) Start() { a.postAggregator.Start() diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go index bd34c3cb016..9e4247635d4 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go @@ -155,3 +155,44 @@ func TestRecordThroughput(t *testing.T) { a.HandleRootSpan(span, logger) assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) } + +func TestRecordThroughputFunc(t *testing.T) { + metricsFactory := metricstest.NewFactory(0) + mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + logger := zap.NewNop() + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) + + // Testing non-root span + span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} + RecordThroughput(a, span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name but no operation + span.References = []model.SpanRef{} + span.Process = &model.Process{ + ServiceName: "A", + } + RecordThroughput(a, span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name and operation but no probabilistic sampling tags + span.OperationName = "GET" + RecordThroughput(a, span, logger) + require.Empty(t, 
a.(*aggregator).currentThroughput) + + // Testing span with service name, operation, and probabilistic sampling tags + span.Tags = model.KeyValues{ + model.String("sampler.type", "probabilistic"), + model.String("sampler.param", "0.001"), + } + RecordThroughput(a, span, logger) + assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) +} diff --git a/plugin/sampling/strategyprovider/adaptive/options.go b/plugin/sampling/strategyprovider/adaptive/options.go index 64288632dbf..4eb13361443 100644 --- a/plugin/sampling/strategyprovider/adaptive/options.go +++ b/plugin/sampling/strategyprovider/adaptive/options.go @@ -53,7 +53,7 @@ const ( type Options struct { // TargetSamplesPerSecond is the global target rate of samples per operation. // TODO implement manual overrides per service/operation. - TargetSamplesPerSecond float64 + TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) @@ -62,23 +62,23 @@ type Options struct { // in the PID Controller terminology) to the sampler in the application. // // Increase this to reduce the amount of fluctuation in the calculated probabilities. - DeltaTolerance float64 + DeltaTolerance float64 `mapstructure:"delta_tolerance"` // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth // of aggregated throughput data. - CalculationInterval time.Duration + CalculationInterval time.Duration `mapstructure:"calculation_interval"` // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. 
if // the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for // all operations. // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. - AggregationBuckets int + AggregationBuckets int `mapstructure:"aggregation_buckets"` // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. - BucketsForCalculation int + BucketsForCalculation int `mapstructure:"calculation_buckets"` // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time @@ -88,29 +88,45 @@ type Options struct { // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. - Delay time.Duration + Delay time.Duration `mapstructure:"calculation_delay"` // InitialSamplingProbability is the initial sampling probability for all new operations. - InitialSamplingProbability float64 + InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling // probability will be in the range [MinSamplingProbability, 1.0]. - MinSamplingProbability float64 + MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` // MinSamplesPerSecond determines the min number of traces that are sampled per second. 
// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations // that may never be sampled by the probabilistic sampler. - MinSamplesPerSecond float64 + MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval // to reduce lock thrashing. - LeaderLeaseRefreshInterval time.Duration + LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower // (ie. failed to gain the leader lock). - FollowerLeaseRefreshInterval time.Duration + FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` +} + +func DefaultOptions() Options { + return Options{ + TargetSamplesPerSecond: defaultTargetSamplesPerSecond, + DeltaTolerance: defaultDeltaTolerance, + BucketsForCalculation: defaultBucketsForCalculation, + CalculationInterval: defaultCalculationInterval, + AggregationBuckets: defaultAggregationBuckets, + Delay: defaultDelay, + InitialSamplingProbability: defaultInitialSamplingProbability, + MinSamplingProbability: defaultMinSamplingProbability, + MinSamplesPerSecond: defaultMinSamplesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + } } // AddFlags adds flags for Options diff --git a/plugin/sampling/strategyprovider/adaptive/options_test.go b/plugin/sampling/strategyprovider/adaptive/options_test.go index 1ab64c0589c..e441ad1472b 100644 --- a/plugin/sampling/strategyprovider/adaptive/options_test.go +++ b/plugin/sampling/strategyprovider/adaptive/options_test.go @@ -43,3 +43,18 
@@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, time.Duration(5000000000), opts.LeaderLeaseRefreshInterval) assert.Equal(t, time.Duration(60000000000), opts.FollowerLeaseRefreshInterval) } + +func TestDefaultOptions(t *testing.T) { + options := DefaultOptions() + assert.Equal(t, float64(defaultTargetSamplesPerSecond), options.TargetSamplesPerSecond) + assert.Equal(t, defaultDeltaTolerance, options.DeltaTolerance) + assert.Equal(t, defaultBucketsForCalculation, options.BucketsForCalculation) + assert.Equal(t, defaultCalculationInterval, options.CalculationInterval) + assert.Equal(t, defaultAggregationBuckets, options.AggregationBuckets) + assert.Equal(t, defaultDelay, options.Delay) + assert.Equal(t, defaultInitialSamplingProbability, options.InitialSamplingProbability) + assert.Equal(t, defaultMinSamplingProbability, options.MinSamplingProbability) + assert.Equal(t, defaultMinSamplesPerSecond, options.MinSamplesPerSecond) + assert.Equal(t, defaultLeaderLeaseRefreshInterval, options.LeaderLeaseRefreshInterval) + assert.Equal(t, defaultFollowerLeaseRefreshInterval, options.FollowerLeaseRefreshInterval) +} diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index 6c677b03d95..d9bfd6c113a 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -16,6 +16,7 @@ package memory import ( + "context" "flag" "github.com/spf13/viper" @@ -36,6 +37,7 @@ var ( // interface comformance checks _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) + _ storage.Purger = (*Factory)(nil) ) // Factory implements storage.Factory and creates storage components backed by memory store. 
@@ -126,3 +128,10 @@ func (*Factory) CreateLock() (distributedlock.Lock, error) { func (f *Factory) publishOpts() { safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Configuration.MaxTraces)) } + +// Purge removes all data from the Factory's underlying Memory store. +// This function is intended for testing purposes only and should not be used in production environments. +func (f *Factory) Purge(ctx context.Context) error { + f.store.purge(ctx) + return nil +} diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index f92ae9949e9..9bac7d2b2bd 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -337,3 +337,10 @@ func flattenTags(span *model.Span) model.KeyValues { } return retMe } + +// purge supports Purger interface. +func (st *Store) purge(context.Context) { + st.Lock() + st.perTenant = make(map[string]*Tenant) + st.Unlock() +} diff --git a/renovate.json b/renovate.json index 2f52d689897..ef7a5b20f83 100644 --- a/renovate.json +++ b/renovate.json @@ -32,11 +32,12 @@ }, { "matchManagers": ["github-actions"], - "groupName": "github-actions deps" + "groupName": "github-actions deps", + "schedule": ["on the first day of the month"] }, { "matchManagers": ["github-actions"], - "matchUpdateTypes": ["major", "patch", "digest"], + "matchUpdateTypes": ["patch", "digest"], "enabled": false }, { diff --git a/scripts/hotrod-integration-test.sh b/scripts/hotrod-integration-test.sh index 40149bef8ff..7f56eb7a85c 100755 --- a/scripts/hotrod-integration-test.sh +++ b/scripts/hotrod-integration-test.sh @@ -2,32 +2,116 @@ set -euxf -o pipefail -make build-examples GOOS=linux GOARCH=amd64 -make build-examples GOOS=linux GOARCH=s390x -make build-examples GOOS=linux GOARCH=ppc64le -make build-examples GOOS=linux GOARCH=arm64 +print_help() { + echo "Usage: $0 [-l] [-D] [-p platforms] [-h]" + echo "-h: Print help" + echo "-l: Enable local-only mode that only pushes images to local registry" + echo "-p: 
Comma-separated list of platforms to build for (default: all supported)" + exit 1 +} -REPO=jaegertracing/example-hotrod +docker_compose_file="./examples/hotrod/docker-compose.yml" platforms="linux/amd64,linux/s390x,linux/ppc64le,linux/arm64" +current_platform="$(go env GOOS)/$(go env GOARCH)" +LOCAL_FLAG='' + +while getopts "lp:h" opt; do + case "${opt}" in + l) + # in the local-only mode the images will only be pushed to local registry + LOCAL_FLAG='-l' + ;; + p) + platforms=${OPTARG} + ;; + *) + print_help + ;; + esac +done + +teardown() { + echo "Tearing down..." + docker compose -f "$docker_compose_file" down +} +trap teardown EXIT + make prepare-docker-buildx +make create-baseimg + +# Build hotrod binary for each target platform (separated by commas) +for platform in $(echo "$platforms" | tr ',' ' '); do + # Extract the operating system from the platform string + os=${platform%%/*} # Remove everything from the first slash onward + # Extract the architecture from the platform string + arch=${platform##*/} # Remove everything before the last slash + make build-examples GOOS="${os}" GOARCH="${arch}" +done + +# Build hotrod image locally (-l) for integration test. +# Note: hotrod's Dockerfile is different from main binaries, +# so we do not pass flags like -b and -t.
+bash scripts/build-upload-a-docker-image.sh -l -c example-hotrod -d examples/hotrod -p "${current_platform}" -# build image locally (-l) for integration test -bash scripts/build-upload-a-docker-image.sh -l -c example-hotrod -d examples/hotrod -p "${platforms}" +# Build all-in-one image locally (-l) for integration test +make build-all-in-one +bash scripts/build-upload-a-docker-image.sh -l -b -c all-in-one -d cmd/all-in-one -p "${current_platform}" -t release -# pass --name example-hotrod so that we can do `docker logs example-hotrod` later -export CID -CID=$(docker run -d --name example-hotrod -p 8080:8080 "localhost:5000/${REPO}:${GITHUB_SHA}") +JAEGER_VERSION=$GITHUB_SHA REGISTRY="localhost:5000/" docker compose -f "$docker_compose_file" up -d i=0 -while [[ "$(curl -s -o /dev/null -w '%{http_code}' localhost:8080)" != "200" && ${i} -lt 30 ]]; do +while [[ "$(curl -s -o /dev/null -w '%{http_code}' localhost:8080)" != "200" && $i -lt 30 ]]; do sleep 1 i=$((i+1)) done + body=$(curl localhost:8080) if [[ $body != *"Rides On Demand"* ]]; then echo "String \"Rides On Demand\" is not present on the index page" exit 1 fi -docker rm -f "$CID" -bash scripts/build-upload-a-docker-image.sh -c example-hotrod -d examples/hotrod -p "${platforms}" +response=$(curl -i -X POST "http://localhost:8080/dispatch?customer=123") +TRACE_ID=$(echo "$response" | grep -Fi "Traceresponse:" | awk '{print $2}' | cut -d '-' -f 2) + +if [ -n "$TRACE_ID" ]; then + echo "TRACE_ID is not empty: $TRACE_ID" +else + echo "TRACE_ID is empty" + exit 1 +fi + +JAEGER_QUERY_URL="http://localhost:16686" +EXPECTED_SPANS=35 +MAX_RETRIES=30 +SLEEP_INTERVAL=3 + +poll_jaeger() { + local trace_id=$1 + local url="${JAEGER_QUERY_URL}/api/traces/${trace_id}" + + curl -s "${url}" | jq '.data[0].spans | length' || echo "0" +} + +# Poll Jaeger until trace with desired number of spans is loaded or we timeout. 
+span_count=0 +for ((i=1; i<=MAX_RETRIES; i++)); do + span_count=$(poll_jaeger "${TRACE_ID}") + + if [[ "$span_count" -ge "$EXPECTED_SPANS" ]]; then + echo "Trace found with $span_count spans." + break + fi + + echo "Retry $i/$MAX_RETRIES: Trace not found or insufficient spans ($span_count/$EXPECTED_SPANS). Retrying in $SLEEP_INTERVAL seconds..." + sleep $SLEEP_INTERVAL +done + +if [[ "$span_count" -lt "$EXPECTED_SPANS" ]]; then + echo "Failed to find the trace with the expected number of spans within the timeout period." + exit 1 +fi + +# Ensure the image is published after successful test (maybe with -l flag if on a pull request). +# This is where all those multi-platform binaries we built earlier are utilized. +bash scripts/build-upload-a-docker-image.sh ${LOCAL_FLAG} -c example-hotrod -d examples/hotrod -p "${platforms}" diff --git a/scripts/spm-integration-test.sh b/scripts/spm-integration-test.sh index b87b7a6a32f..45c7e273904 100755 --- a/scripts/spm-integration-test.sh +++ b/scripts/spm-integration-test.sh @@ -85,17 +85,24 @@ validate_service_metrics() { # Store the values in an array mapfile -t metric_points < <(echo "$response" | jq -r '.metrics[0].metricPoints[].gaugeValue.doubleValue') echo "Metric datapoints found for service '$service': " "${metric_points[@]}" - # Check that all values are non-zero + # Check that zero values stay within the allowed threshold and that at least the expected number of values are non-zero local non_zero_count=0 + local expected_non_zero_count=3 + local zero_count=0 + local expected_max_zero_count=3 for value in "${metric_points[@]}"; do if [[ $(echo "$value > 0.0" | bc) == "1" ]]; then non_zero_count=$((non_zero_count + 1)) else - echo "❌ ERROR: Zero values not expected" + zero_count=$((zero_count + 1)) + fi + + if [[ $zero_count -gt $expected_max_zero_count ]]; then + echo "❌ ERROR: Zero values crossing threshold limit not expected (Threshold limit - '$expected_max_zero_count')" return 1 fi done - if [ $non_zero_count -lt 3 ]; then + if [ $non_zero_count -lt
$expected_non_zero_count ]; then echo "⏳ Expecting at least 3 non-zero data points" return 1 fi