Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change receiver shared component part #12203

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 11 additions & 81 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/internal/sharedcomponent"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata"
"go.opentelemetry.io/collector/receiver/xreceiver"
Expand All @@ -23,6 +22,9 @@ const (
defaultMetricsURLPath = "/v1/metrics"
defaultLogsURLPath = "/v1/logs"
defaultProfilesURLPath = "/v1development/profiles"

transportHTTP = "http"
transportGRPC = "grpc"
)

// NewFactory creates a new OTLP receiver factory.
Expand Down Expand Up @@ -62,97 +64,25 @@ func createDefaultConfig() component.Config {
}

// createTraces creates a trace receiver based on provided config.
func createTraces(
_ context.Context,
set receiver.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
func createTraces(_ context.Context, set receiver.Settings, cfg component.Config, next consumer.Traces) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
},
)
if err != nil {
return nil, err
}

r.Unwrap().registerTraceConsumer(nextConsumer)
return r, nil
return newTraces(set, oCfg, next)
}

// createMetrics creates a metrics receiver based on provided config.
func createMetrics(
_ context.Context,
set receiver.Settings,
cfg component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
func createMetrics(_ context.Context, set receiver.Settings, cfg component.Config, next consumer.Metrics) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
},
)
if err != nil {
return nil, err
}

r.Unwrap().registerMetricsConsumer(consumer)
return r, nil
return newMetrics(set, oCfg, next)
}

// createLog creates a log receiver based on provided config.
func createLog(
_ context.Context,
set receiver.Settings,
cfg component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {
func createLog(_ context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
},
)
if err != nil {
return nil, err
}

r.Unwrap().registerLogsConsumer(consumer)
return r, nil
return newLogs(set, oCfg, next)
}

// createProfiles creates a trace receiver based on provided config.
func createProfiles(
_ context.Context,
set receiver.Settings,
cfg component.Config,
nextConsumer xconsumer.Profiles,
) (xreceiver.Profiles, error) {
func createProfiles(_ context.Context, set receiver.Settings, cfg component.Config, next xconsumer.Profiles) (xreceiver.Profiles, error) {
oCfg := cfg.(*Config)
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
},
)
if err != nil {
return nil, err
}

r.Unwrap().registerProfilesConsumer(nextConsumer)
return r, nil
return newProfiles(set, oCfg, next)
}

// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the receiver.Factory is asked trace and metric receivers separately
// when it gets CreateTraces() and CreateMetrics() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewMap[*Config, *otlpReceiver]()
28 changes: 0 additions & 28 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,6 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestCreateSameReceiver(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.HTTP.Endpoint = testutil.GetAvailableLocalAddress(t)

creationSet := receivertest.NewNopSettings()
tReceiver, err := factory.CreateTraces(context.Background(), creationSet, cfg, consumertest.NewNop())
assert.NotNil(t, tReceiver)
require.NoError(t, err)

mReceiver, err := factory.CreateMetrics(context.Background(), creationSet, cfg, consumertest.NewNop())
assert.NotNil(t, mReceiver)
require.NoError(t, err)

lReceiver, err := factory.CreateMetrics(context.Background(), creationSet, cfg, consumertest.NewNop())
assert.NotNil(t, lReceiver)
require.NoError(t, err)

pReceiver, err := factory.(xreceiver.Factory).CreateProfiles(context.Background(), creationSet, cfg, consumertest.NewNop())
assert.NotNil(t, pReceiver)
require.NoError(t, err)

assert.Same(t, tReceiver, mReceiver)
assert.Same(t, tReceiver, lReceiver)
assert.Same(t, tReceiver, pReceiver)
}

func TestCreateTraces(t *testing.T) {
factory := NewFactory()
defaultGRPCSettings := &configgrpc.ServerConfig{
Expand Down
24 changes: 9 additions & 15 deletions receiver/otlpreceiver/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
Expand All @@ -20,9 +22,7 @@ import (
func FuzzReceiverHandlers(f *testing.F) {
f.Fuzz(func(_ *testing.T, data []byte, pb bool, handler int) {
req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(data))
if err != nil {
return
}
require.NoError(f, err)
if pb {
req.Header.Add("Content-Type", pbContentType)
} else {
Expand All @@ -31,25 +31,19 @@ func FuzzReceiverHandlers(f *testing.F) {
set := receivertest.NewNopSettings()
set.TelemetrySettings = componenttest.NewNopTelemetrySettings()
set.ID = otlpReceiverID
cfg := createDefaultConfig().(*Config)
r, err := newOtlpReceiver(cfg, &set)
if err != nil {
panic(err)
}
r.nextTraces = consumertest.NewNop()
r.nextLogs = consumertest.NewNop()
r.nextMetrics = consumertest.NewNop()
r.nextProfiles = consumertest.NewNop()
resp := httptest.NewRecorder()
switch handler % 3 {
case 0:
httpTracesReceiver := trace.New(r.nextTraces, r.obsrepHTTP)
httpTracesReceiver, err := trace.New(consumertest.NewNop(), set, transportHTTP)
require.NoError(f, err)
handleTraces(resp, req, httpTracesReceiver)
case 1:
httpMetricsReceiver := metrics.New(r.nextMetrics, r.obsrepHTTP)
httpMetricsReceiver, err := metrics.New(consumertest.NewNop(), set, transportHTTP)
require.NoError(f, err)
handleMetrics(resp, req, httpMetricsReceiver)
case 2:
httpLogsReceiver := logs.New(r.nextLogs, r.obsrepHTTP)
httpLogsReceiver, err := logs.New(consumertest.NewNop(), set, transportHTTP)
require.NoError(f, err)
handleLogs(resp, req, httpLogsReceiver)
}
})
Expand Down
85 changes: 85 additions & 0 deletions receiver/otlpreceiver/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlpreceiver"

import (
"context"
"errors"
"net"
"sync"

"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/internal/sharedcomponent"
)

type registerServerFunc func(srv *grpc.Server)

type grpcComponent struct {
cfg *configgrpc.ServerConfig
serverGRPC *grpc.Server
tel component.TelemetrySettings
shutdownWG sync.WaitGroup
servers []registerServerFunc
}

func newSharedGRPC(cfg *configgrpc.ServerConfig, tel component.TelemetrySettings) (*sharedcomponent.Component[*grpcComponent], error) {
return grpcs.LoadOrStore(
cfg,
func() (*grpcComponent, error) {
return &grpcComponent{cfg: cfg, tel: tel}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you pass in tel here, I assume you want it to not have an otel.signal attribute on the logger, etc?

This still leaves us in a situation where it needs to be present sometimes and not others. What's the mechanism for managing this?

Copy link
Member

@djaglowski djaglowski Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, when the server encounters an error, the metrics handler will have to use the signal-agnostic logger anyways, so doesn't it then have to use zap.String(...) anyways to append the signal attribute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My hope is that, now that we know a component has to create different "types" of sub-component, the mechanism we design will allow that. Very important is that the "main" component still has all the "attributes".

The easiest way I can think of is via the TelemetrySettings we provide, we can have there a mechanism of retrieving the Logger via some functions instead of direct access to 1 variable.

},
)
}

func (r *grpcComponent) Start(_ context.Context, host component.Host) error {
var err error
if r.serverGRPC, err = r.cfg.ToServer(context.Background(), host, r.tel); err != nil {
return err
}

for _, registration := range r.servers {
registration(r.serverGRPC)
}

r.tel.Logger.Info("Starting GRPC server", zap.String("endpoint", r.cfg.NetAddr.Endpoint))
var gln net.Listener
if gln, err = r.cfg.NetAddr.Listen(context.Background()); err != nil {
return err
}

r.shutdownWG.Add(1)
go func() {
defer r.shutdownWG.Done()
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && !errors.Is(errGrpc, grpc.ErrServerStopped) {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errGrpc))
}
}()
return nil
}

// Shutdown is a method to turn off receiving.
func (r *grpcComponent) Shutdown(context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}
r.shutdownWG.Wait()
return nil
}

func (r *grpcComponent) registerServer(sr registerServerFunc) {
r.servers = append(r.servers, sr)
}

// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the receiver.Factory is asked trace and metric receivers separately
// when it gets CreateTraces() and CreateMetrics() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var grpcs = sharedcomponent.NewMap[*configgrpc.ServerConfig, *grpcComponent]()
90 changes: 90 additions & 0 deletions receiver/otlpreceiver/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlpreceiver"

import (
"context"
"errors"
"net"
"net/http"
"sync"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/internal/sharedcomponent"
)

type handler struct {
pattern string
handlerFunc http.HandlerFunc
}

type httpComponent struct {
cfg *confighttp.ServerConfig
serverHTTP *http.Server
tel component.TelemetrySettings
shutdownWG sync.WaitGroup
handlers []handler
}

func newSharedHTTP(cfg *confighttp.ServerConfig, tel component.TelemetrySettings) (*sharedcomponent.Component[*httpComponent], error) {
return https.LoadOrStore(
cfg,
func() (*httpComponent, error) {
return &httpComponent{cfg: cfg, tel: tel}, nil
},
)
}

func (r *httpComponent) Start(ctx context.Context, host component.Host) error {
httpMux := http.NewServeMux()
for _, h := range r.handlers {
httpMux.HandleFunc(h.pattern, h.handlerFunc)
}

var err error
if r.serverHTTP, err = r.cfg.ToServer(ctx, host, r.tel, httpMux, confighttp.WithErrorHandler(errorHandler)); err != nil {
return err
}

r.tel.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.Endpoint))
var hln net.Listener
if hln, err = r.cfg.ToListener(ctx); err != nil {
return err
}

r.shutdownWG.Add(1)
go func() {
defer r.shutdownWG.Done()
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP))
}
}()
return nil
}

// Shutdown is a method to turn off receiving.
func (r *httpComponent) Shutdown(ctx context.Context) error {
var err error
if r.serverHTTP != nil {
err = r.serverHTTP.Shutdown(ctx)
}
r.shutdownWG.Wait()
return err
}

func (r *httpComponent) registerHandler(pattern string, handlerFunc http.HandlerFunc) {
r.handlers = append(r.handlers, handler{pattern: pattern, handlerFunc: handlerFunc})
}

// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the receiver.Factory is asked trace and metric receivers separately
// when it gets CreateTraces() and CreateMetrics() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var https = sharedcomponent.NewMap[*confighttp.ServerConfig, *httpComponent]()
Loading
Loading