Skip to content

Commit

Permalink
feat: OBS-390 - [distributor] handle ingestion data sources
Browse files Browse the repository at this point in the history
- retrieve ingestion data sources utilising reconciler svc client
- add auth unary interceptor
- tidy up

Signed-off-by: Michal Fiedorowicz <[email protected]>
  • Loading branch information
mfiedorowicz committed Feb 21, 2024
1 parent 15290cf commit d4b2223
Showing 1 changed file with 80 additions and 21 deletions.
101 changes: 80 additions & 21 deletions diode-server/distributor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package distributor

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"time"

"github.com/kelseyhightower/envconfig"
pb "github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/reconciler"
"github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
)
Expand All @@ -19,16 +23,25 @@ const (
streamID = "diode.v1.ingest-stream"
)

var (
errMetadataNotFound = errors.New("no request metadata found")

// ErrUnauthorized is an error for unauthorized requests
ErrUnauthorized = errors.New("missing or invalid authorization header")
)

// Component is a gRPC server that handles data ingestion requests
type Component struct {
pb.UnimplementedDistributorServiceServer

ctx context.Context
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
diodepb.UnimplementedDistributorServiceServer

ctx context.Context
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
reconcilerClient reconciler.Client
ingestionDataSources []*reconcilerpb.IngestionDataSource
}

// New creates a new distributor component
Expand All @@ -51,21 +64,54 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) {
return nil, fmt.Errorf("failed connection to %s: %v", redisClient.String(), err)
}

grpcServer := grpc.NewServer()
reconcilerClient, err := reconciler.NewClient(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to create reconciler client: %v", err)
}

dataSources, err := reconcilerClient.RetrieveIngestionDataSources(ctx, &reconcilerpb.RetrieveIngestionDataSourcesRequest{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve ingestion data sources: %v", err)
}

ingestionDataSources := dataSources.GetIngestionDataSources()
//auth := grpc.UnaryServerInterceptor(authUnaryInterceptor)

auth := newAuthUnaryInterceptor(ingestionDataSources)

grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(auth))
component := &Component{
ctx: ctx,
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
redisClient: redisClient,
}
pb.RegisterDistributorServiceServer(grpcServer, component)
ctx: ctx,
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
redisClient: redisClient,
reconcilerClient: reconcilerClient,
ingestionDataSources: ingestionDataSources,
}
diodepb.RegisterDistributorServiceServer(grpcServer, component)
reflection.Register(grpcServer)

logger.Info("ingestion data sources", "sources", dataSources.GetIngestionDataSources())

return component, nil
}

func newAuthUnaryInterceptor(dataSources []*reconcilerpb.IngestionDataSource) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMetadataNotFound
}
if !authorized(dataSources, md["diode-api-key"]) {
return nil, ErrUnauthorized
}
return handler(ctx, req)
}

}

// Name returns the name of the component
func (c *Component) Name() string {
return "distributor"
Expand All @@ -85,7 +131,7 @@ func (c *Component) Stop() error {
}

// Push handles a push request
func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
func (c *Component) Push(ctx context.Context, in *diodepb.PushRequest) (*diodepb.PushResponse, error) {
if err := validatePushRequest(in); err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,10 +162,10 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo
c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg)
}

return &pb.PushResponse{Errors: errs}, nil
return &diodepb.PushResponse{Errors: errs}, nil
}

func validatePushRequest(in *pb.PushRequest) error {
func validatePushRequest(in *diodepb.PushRequest) error {
if in.GetId() == "" {
return fmt.Errorf("id is empty")
}
Expand All @@ -146,3 +192,16 @@ func validatePushRequest(in *pb.PushRequest) error {

return nil
}

func authorized(dataSources []*reconcilerpb.IngestionDataSource, authorization []string) bool {
if len(dataSources) < 1 || len(authorization) != 1 {
return false
}

for _, v := range dataSources {
if v.GetApiKey() == authorization[0] {
return true
}
}
return false
}

0 comments on commit d4b2223

Please sign in to comment.