From 9b19350d26ac18b0d22122fb1573af8091c443b8 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Thu, 1 Feb 2024 19:14:21 +0000 Subject: [PATCH 01/12] feat: OBS-381 - use same logger for consistency Signed-off-by: Michal Fiedorowicz --- diode-server/cmd/distributor/main.go | 11 +++++++---- diode-server/cmd/ingester/main.go | 8 +++++--- diode-server/cmd/reconciler/main.go | 11 +++++++---- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/diode-server/cmd/distributor/main.go b/diode-server/cmd/distributor/main.go index 2c8eb890..ab179e56 100644 --- a/diode-server/cmd/distributor/main.go +++ b/diode-server/cmd/distributor/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "log" + "os" "github.com/netboxlabs/diode-internal/diode-server/distributor" "github.com/netboxlabs/diode-internal/diode-server/server" @@ -14,17 +14,20 @@ func main() { distributorComponent, err := distributor.New(s.Logger()) if err != nil { - log.Fatalf("failed to instantiate distributor component: %v", err) + s.Logger().Error("failed to instantiate distributor component", "error", err) + os.Exit(1) } if err := s.RegisterComponent(distributorComponent); err != nil { - log.Fatalf("failed to register distributor component: %v", err) + s.Logger().Error("failed to register distributor component: %v", err) + os.Exit(1) } // instantiate a prom service for /metrics // prometheusSvc, err := prometheus.New() if err := s.Run(); err != nil { - log.Fatalf("server %s failure: %v", s.Name(), err) + s.Logger().Error("server %s failure: %v", s.Name(), err) + os.Exit(1) } } diff --git a/diode-server/cmd/ingester/main.go b/diode-server/cmd/ingester/main.go index 9642f37b..9d1bc945 100644 --- a/diode-server/cmd/ingester/main.go +++ b/diode-server/cmd/ingester/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "log" + "os" "github.com/netboxlabs/diode-internal/diode-server/ingester" "github.com/netboxlabs/diode-internal/diode-server/server" @@ -15,10 +15,12 @@ func main() { ingesterComponent := ingester.New(s.Logger()) if err := s.RegisterComponent(ingesterComponent); err != nil { - log.Fatalf("failed to register ingerster component: %v", err) + s.Logger().Error("failed to register ingerster component: %v", err) + os.Exit(1) } if err := s.Run(); err != nil { - log.Fatalf("server %s failure: %v", s.Name(), err) + s.Logger().Error("server %s failure: %v", s.Name(), err) + os.Exit(1) } } diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index 34ca9e4b..50e25184 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "log" + "os" "github.com/netboxlabs/diode-internal/diode-server/reconciler" "github.com/netboxlabs/diode-internal/diode-server/server" @@ -14,17 +14,20 @@ func main() { reconcilerComponent, err := reconciler.New(s.Logger()) if err != nil { - log.Fatalf("failed to instantiate reconciler component: %v", err) + s.Logger().Error("failed to instantiate reconciler component: %v", err) + os.Exit(1) } if err := s.RegisterComponent(reconcilerComponent); err != nil { - log.Fatalf("failed to register reconciler component: %v", err) + s.Logger().Error("failed to register reconciler component: %v", err) + os.Exit(1) } // instantiate a prom service for /metrics // prometheusSvc, err := prometheus.New() if err := s.Run(); err != nil { - log.Fatalf("server %s failure: %v", s.Name(), err) + s.Logger().Error("server %s failure: %v", s.Name(), err) + os.Exit(1) } } From b95b194d64d51fdcb9e93118ebf700e5f2be3958 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Thu, 1 Feb 2024 19:15:20 +0000 Subject: [PATCH 02/12] feat: OBS-381 - add redis to distributor config Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/config.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/diode-server/distributor/config.go b/diode-server/distributor/config.go index 012aa2c8..a5904a2b 100644 --- a/diode-server/distributor/config.go +++ b/diode-server/distributor/config.go @@ -2,5 +2,8 @@ package distributor // Config is the configuration for the distributor service type Config struct { - GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` + GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` + RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` + RedisPort string `envconfig:"REDIS_PORT" default:"6378"` + RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` } From 66b0850948f62cf7cf14f7fd49c28535a5c4ab94 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Thu, 1 Feb 2024 19:16:03 +0000 Subject: [PATCH 03/12] feat: OBS-381 - add redis client to distributor component Signed-off-by: Michal Fiedorowicz --- diode-server/cmd/distributor/main.go | 2 +- diode-server/distributor/component.go | 19 ++++++++++++++++++- diode-server/go.mod | 3 +++ diode-server/go.sum | 10 ++++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/diode-server/cmd/distributor/main.go b/diode-server/cmd/distributor/main.go index ab179e56..eb10801f 100644 --- a/diode-server/cmd/distributor/main.go +++ b/diode-server/cmd/distributor/main.go @@ -12,7 +12,7 @@ func main() { ctx := context.Background() s := server.New(ctx, "diode-distributor") - distributorComponent, err := distributor.New(s.Logger()) + distributorComponent, err := distributor.New(ctx, s.Logger()) if err != nil { s.Logger().Error("failed to instantiate distributor component", "error", err) os.Exit(1) diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index 66359ed1..40f0ef52 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -8,6 +8,7 @@ import ( "github.com/kelseyhightower/envconfig" pb "github.com/netboxlabs/diode-internal/diode-sdk-go/diode/v1/diodepb" + "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -16,14 +17,16 @@ import ( type Component struct { pb.UnimplementedDistributorServiceServer + ctx context.Context config Config logger *slog.Logger grpcListener net.Listener grpcServer *grpc.Server + redisClient *redis.Client } // New creates a new distributor component -func New(logger *slog.Logger) (*Component, error) { +func New(ctx context.Context, logger *slog.Logger) (*Component, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -32,12 +35,23 @@ func New(logger *slog.Logger) (*Component, error) { return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err) } + redisClient := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort), + Password: cfg.RedisPassword, + }) + + if _, err := redisClient.Ping(ctx).Result(); err != nil { + return nil, fmt.Errorf("failed connection to %s: %v", redisClient.String(), err) + } + grpcServer := grpc.NewServer() component := &Component{ + ctx: ctx, config: cfg, logger: logger, grpcListener: grpcListener, grpcServer: grpcServer, + redisClient: redisClient, } pb.RegisterDistributorServiceServer(grpcServer, component) reflection.Register(grpcServer) @@ -60,6 +74,9 @@ func (c *Component) Start(_ context.Context) error { func (c *Component) Stop() error { c.logger.Info("stopping component", "name", c.Name()) c.grpcServer.GracefulStop() + if err := c.redisClient.Close(); err != nil { + return err + } return nil } diff --git a/diode-server/go.mod b/diode-server/go.mod index c653d46d..b79521c7 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -6,10 +6,13 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0 github.com/oklog/run v1.1.0 + github.com/redis/go-redis/v9 v9.4.0 google.golang.org/grpc v1.61.0 ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/golang/protobuf v1.5.3 // indirect golang.org/x/net v0.20.0 // indirect diff --git a/diode-server/go.sum b/diode-server/go.sum index eba5435d..15054795 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -1,3 +1,11 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -10,6 +18,8 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= +github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= +github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= From 1c3651b74269784615b247a7d212f6e4fcdd7e63 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Thu, 1 Feb 2024 19:32:04 +0000 Subject: [PATCH 04/12] feat: OBS-381 - set correct default redis port Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diode-server/distributor/config.go b/diode-server/distributor/config.go index a5904a2b..75af3a31 100644 --- a/diode-server/distributor/config.go +++ b/diode-server/distributor/config.go @@ -4,6 +4,6 @@ package distributor type Config struct { GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` - RedisPort string `envconfig:"REDIS_PORT" default:"6378"` + RedisPort string `envconfig:"REDIS_PORT" default:"6379"` RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` } From 0a94db3f5fedf7cb9cc2cd44b6f1d5bf86f16312 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Thu, 1 Feb 2024 21:43:06 +0000 Subject: [PATCH 05/12] feat: OBS-381 - fix server error messages formatting Signed-off-by: Michal Fiedorowicz --- diode-server/cmd/distributor/main.go | 4 ++-- diode-server/cmd/ingester/main.go | 4 ++-- diode-server/cmd/reconciler/main.go | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/diode-server/cmd/distributor/main.go b/diode-server/cmd/distributor/main.go index eb10801f..3572e5d5 100644 --- a/diode-server/cmd/distributor/main.go +++ b/diode-server/cmd/distributor/main.go @@ -19,7 +19,7 @@ func main() { } if err := s.RegisterComponent(distributorComponent); err != nil { - s.Logger().Error("failed to register distributor component: %v", err) + s.Logger().Error("failed to register distributor component", "error", err) os.Exit(1) } @@ -27,7 +27,7 @@ func main() { // prometheusSvc, err := prometheus.New() if err := s.Run(); err != nil { - s.Logger().Error("server %s failure: %v", s.Name(), err) + s.Logger().Error("server failure", "serverName", s.Name(), "error", err) os.Exit(1) } } diff --git a/diode-server/cmd/ingester/main.go b/diode-server/cmd/ingester/main.go index 9d1bc945..ab411d1c 100644 --- a/diode-server/cmd/ingester/main.go +++ b/diode-server/cmd/ingester/main.go @@ -15,12 +15,12 @@ func main() { ingesterComponent := ingester.New(s.Logger()) if err := s.RegisterComponent(ingesterComponent); err != nil { - s.Logger().Error("failed to register ingerster component: %v", err) + s.Logger().Error("failed to register ingester component", "error", err) os.Exit(1) } if err := s.Run(); err != nil { - s.Logger().Error("server %s failure: %v", s.Name(), err) + s.Logger().Error("server failure", "serverName", s.Name(), "error", err) os.Exit(1) } } diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index 50e25184..d9c6fc49 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -14,12 +14,12 @@ func main() { reconcilerComponent, err := reconciler.New(s.Logger()) if err != nil { - s.Logger().Error("failed to instantiate reconciler component: %v", err) + s.Logger().Error("failed to instantiate reconciler component", "error", err) os.Exit(1) } if err := s.RegisterComponent(reconcilerComponent); err != nil { - s.Logger().Error("failed to register reconciler component: %v", err) + s.Logger().Error("failed to register reconciler component", "error", err) os.Exit(1) } @@ -27,7 +27,7 @@ func main() { // prometheusSvc, err := prometheus.New() if err := s.Run(); err != nil { - s.Logger().Error("server %s failure: %v", s.Name(), err) + s.Logger().Error("server failure", "serverName", s.Name(), "error", err) os.Exit(1) } } From b452aa0e51334f15102336be055b70e43d949f51 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 16:16:07 +0000 Subject: [PATCH 06/12] feat: OBS-381 - append ingestion data into redis stream Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/component.go | 34 +++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index 40f0ef52..3f730ba6 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -5,12 +5,18 @@ import ( "fmt" "log/slog" "net" + "time" "github.com/kelseyhightower/envconfig" pb "github.com/netboxlabs/diode-internal/diode-sdk-go/diode/v1/diodepb" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + "google.golang.org/protobuf/proto" +) + +const ( + streamID = "diode.v1.ingest" ) // Component is a gRPC server that handles data ingestion requests @@ -81,7 +87,31 @@ func (c *Component) Stop() error { } // Push handles a push request -func (c *Component) Push(_ context.Context, in *pb.PushRequest) (*pb.PushResponse, error) { - c.logger.Info("diode.v1.DistributorService/Push called", "stream", in.Stream) +func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) { + for _, v := range in.GetData() { + encodedEntity, err := proto.Marshal(v) + if err != nil { + c.logger.Error("failed to marshal", "error", err, "value", v) + continue + } + msg := map[string]interface{}{ + "id": in.GetId(), + "stream": in.GetStream(), + "producer_app_name": in.GetProducerAppName(), + "producer_app_version": in.GetProducerAppVersion(), + "sdk_name": in.GetSdkName(), + "sdk_version": in.GetSdkVersion(), + "data": encodedEntity, + "ts": v.GetTimestamp().String(), + "ingestion_ts": time.Now().UnixNano(), + } + if err := c.redisClient.XAdd(ctx, &redis.XAddArgs{ + Stream: streamID, + Values: msg, + }).Err(); err != nil { + c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg) + } + } + return &pb.PushResponse{}, nil } From cae3de2d31a01dec9a8ba3ae70cd285d79615209 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 18:16:06 +0000 Subject: [PATCH 07/12] feat: OBS-381 - fix typo Signed-off-by: Michal Fiedorowicz --- diode-server/server/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/diode-server/server/server.go b/diode-server/server/server.go index 4b7fdfa1..e4eef1d5 100644 --- a/diode-server/server/server.go +++ b/diode-server/server/server.go @@ -14,7 +14,7 @@ import ( // A Server is a diode Server type Server struct { - cxt context.Context + ctx context.Context name string logger *slog.Logger @@ -37,7 +37,7 @@ func New(ctx context.Context, name string) *Server { envconfig.MustProcess("", &cfg) return &Server{ - cxt: ctx, + ctx: ctx, name: name, logger: newLogger(cfg), components: make(map[string]Component), @@ -66,7 +66,7 @@ func (s *Server) RegisterComponent(c Component) error { s.components[c.Name()] = c - ctx, cancel := context.WithCancel(s.cxt) + ctx, cancel := context.WithCancel(s.ctx) s.componentGroup.Add( func() error { @@ -86,7 +86,7 @@ func (s *Server) RegisterComponent(c Component) error { func (s *Server) Run() error { s.logger.Info("starting server", "serverName", s.name) - s.componentGroup.Add(run.SignalHandler(s.cxt, os.Interrupt, os.Kill)) + s.componentGroup.Add(run.SignalHandler(s.ctx, os.Interrupt, os.Kill)) return s.componentGroup.Run() } From 150fd61439602378daf424779cffe04998ec2cbf Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 19:54:18 +0000 Subject: [PATCH 08/12] feat: OBS-381 - add unit tests for server package Signed-off-by: Michal Fiedorowicz --- diode-server/go.mod | 6 +- diode-server/go.sum | 10 ++ diode-server/server/server_test.go | 200 +++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 diode-server/server/server_test.go diff --git a/diode-server/go.mod b/diode-server/go.mod index b79521c7..b71be2cd 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -7,19 +7,23 @@ require ( github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0 github.com/oklog/run v1.1.0 github.com/redis/go-redis/v9 v9.4.0 + github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.61.0 + google.golang.org/protobuf v1.32.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0 => ../diode-sdk-go diff --git a/diode-server/go.sum b/diode-server/go.sum index 15054795..4e4903cb 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -4,6 +4,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= @@ -18,8 +20,12 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= @@ -35,3 +41,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/diode-server/server/server_test.go b/diode-server/server/server_test.go new file mode 100644 index 00000000..4618bf70 --- /dev/null +++ b/diode-server/server/server_test.go @@ -0,0 +1,200 @@ +package server_test + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "testing" + + "github.com/netboxlabs/diode-internal/diode-server/server" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestNewServer tests the server.New function +func TestNewServer(t *testing.T) { + tests := []struct { + desc string + serverName string + loggingLevel string + loggingFormat string + }{ + { + desc: "diode-test-server with debug level and json format", + serverName: "diode-test-server", + loggingLevel: "debug", + loggingFormat: "json", + }, + { + desc: "diode-test-server2 with debug level and text format", + serverName: "diode-test-server2", + loggingLevel: "debug", + loggingFormat: "text", + }, + { + desc: "diode-test-server with info level and json format", + serverName: "diode-test-server", + loggingLevel: "info", + loggingFormat: "json", + }, + { + desc: "diode-test-server with info level and text format", + serverName: "diode-test-server", + loggingLevel: "warn", + loggingFormat: "json", + }, + { + desc: "diode-test-server with error level and text format", + serverName: "diode-test-server", + loggingLevel: "error", + loggingFormat: "text", + }, + { + desc: "diode-test-server with error level and empty format", + serverName: "diode-test-server", + loggingLevel: "error", + loggingFormat: "", + }, + { + desc: "diode-test-server with empty level and text format", + serverName: "diode-test-server", + loggingLevel: "", + loggingFormat: "text", + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + ctx := context.Background() + err := os.Setenv("LOGGING_LEVEL", tt.loggingLevel) + require.NoError(t, err) + err = os.Setenv("LOGGING_FORMAT", tt.loggingFormat) + require.NoError(t, err) + + s := server.New(ctx, tt.serverName) + + assert.Equal(t, tt.serverName, s.Name()) + require.NotNil(t, s.Logger()) + //assert.True(t, s.Logger().Enabled(ctx, slog.LevelDebug)) + + handlerOK := false + if tt.loggingFormat == "text" { + _, handlerOK = s.Logger().Handler().(*slog.TextHandler) + } else { + _, handlerOK = s.Logger().Handler().(*slog.JSONHandler) + } + assert.True(t, handlerOK) + }) + } +} + +// TestRegisterComponent tests the server.RegisterComponent function +func TestRegisterComponent(t *testing.T) { + tests := []struct { + desc string + registrationsNum int + err error + }{ + { + desc: "registering a component", + registrationsNum: 1, + err: nil, + }, + { + desc: "registering a component twice", + registrationsNum: 2, + err: fmt.Errorf("Server.RegisterComponent found duplicate component registration for noop"), + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + ctx := context.Background() + s := server.New(ctx, "diode-test-server") + + var err error + for i := 0; i < tt.registrationsNum; i++ { + err = s.RegisterComponent(&NoopComponent{}) + } + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestRun tests the server.Run function +func TestRun(t *testing.T) { + tests := []struct { + desc string + component server.Component + err error + }{ + { + desc: "running a server with the NoopComponent", + component: &NoopComponent{}, + err: nil, + }, + { + desc: "running a server with the FailingComponent", + component: &FailingComponent{}, + err: fmt.Errorf("start failed"), + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + ctx := context.Background() + s := server.New(ctx, "diode-test-server") + + require.NoError(t, s.RegisterComponent(tt.component)) + err := s.Run() + + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + +// NoopComponent is a component that does nothing +type NoopComponent struct{} + +// Name returns the name of the component +func (c *NoopComponent) Name() string { + return "noop" +} + +// Start starts the component +func (c *NoopComponent) Start(_ context.Context) error { + return nil +} + +// Stop stops the component +func (c *NoopComponent) Stop() error { + return nil +} + +// FailingComponent is a component that fails to start and stop +type FailingComponent struct{} + +// Name returns the name of the component +func (c *FailingComponent) Name() string { + return "failing" +} + +// Start starts the component +func (c *FailingComponent) Start(_ context.Context) error { + return errors.New("start failed") +} + +// Stop stops the component +func (c *FailingComponent) Stop() error { + return errors.New("stop failed") +} From 79cbafa2db79b833381385e0bffdd449514312eb Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 20:14:41 +0000 Subject: [PATCH 09/12] feat: OBS-381 - tidy up server tests Signed-off-by: Michal Fiedorowicz --- diode-server/server/server_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/diode-server/server/server_test.go b/diode-server/server/server_test.go index 4618bf70..c0f14c1b 100644 --- a/diode-server/server/server_test.go +++ b/diode-server/server/server_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" ) -// TestNewServer tests the server.New function func TestNewServer(t *testing.T) { tests := []struct { desc string @@ -90,7 +89,6 @@ func TestNewServer(t *testing.T) { } } -// TestRegisterComponent tests the server.RegisterComponent function func TestRegisterComponent(t *testing.T) { tests := []struct { desc string @@ -127,7 +125,6 @@ func TestRegisterComponent(t *testing.T) { } } -// TestRun tests the server.Run function func TestRun(t *testing.T) { tests := []struct { desc string @@ -163,38 +160,30 @@ func TestRun(t *testing.T) { } } -// NoopComponent is a component that does nothing type NoopComponent struct{} -// Name returns the name of the component func (c *NoopComponent) Name() string { return "noop" } -// Start starts the component func (c *NoopComponent) Start(_ context.Context) error { return nil } -// Stop stops the component func (c *NoopComponent) Stop() error { return nil } -// FailingComponent is a component that fails to start and stop type FailingComponent struct{} -// Name returns the name of the component func (c *FailingComponent) Name() string { return "failing" } -// Start starts the component func (c *FailingComponent) Start(_ context.Context) error { return errors.New("start failed") } -// Stop stops the component func (c *FailingComponent) Stop() error { return errors.New("stop failed") } From 5d057d46c8244070a2c4ae20c0bc858171ecbaec Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 20:43:23 +0000 Subject: [PATCH 10/12] feat: OBS-381 - basic push request validation Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/component.go | 60 +++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index 3f730ba6..866fc5ce 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -16,6 +16,9 @@ import ( ) const ( + // DefaultRequestStream is the default stream to use when none is provided + DefaultRequestStream = "latest" + streamID = "diode.v1.ingest" ) @@ -88,19 +91,60 @@ func (c *Component) Stop() error { // Push handles a push request func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) { - for _, v := range in.GetData() { + reqID := in.GetId() + if reqID == "" { + return nil, fmt.Errorf("id is empty") + } + + reqStream := in.GetStream() + if reqStream == "" { + reqStream = DefaultRequestStream + } + + producerAppName := in.GetProducerAppName() + if producerAppName == "" { + return nil, fmt.Errorf("producer app name is empty") + } + + producerAppVersion := in.GetProducerAppVersion() + if producerAppVersion == "" { + return nil, fmt.Errorf("producer app version is empty") + } + + sdkName := in.GetSdkName() + if sdkName == "" { + return nil, fmt.Errorf("sdk name is empty") + } + + sdkVersion := in.GetSdkVersion() + if sdkVersion == "" { + return nil, fmt.Errorf("sdk version is empty") + } + + if len(in.GetData()) < 1 { + return nil, fmt.Errorf("data is empty") + } + + errs := make([]string, 0) + + for i, v := range in.GetData() { + if v.GetData() == nil { + errs = append(errs, fmt.Sprintf("data for index %d is nil", i)) + continue + } + encodedEntity, err := proto.Marshal(v) if err != nil { c.logger.Error("failed to marshal", "error", err, "value", v) continue } msg := map[string]interface{}{ - "id": in.GetId(), - "stream": in.GetStream(), - "producer_app_name": in.GetProducerAppName(), - "producer_app_version": in.GetProducerAppVersion(), - "sdk_name": in.GetSdkName(), - "sdk_version": in.GetSdkVersion(), + "id": reqID, + "stream": reqStream, + "producer_app_name": producerAppName, + "producer_app_version": producerAppVersion, + "sdk_name": sdkName, + "sdk_version": sdkVersion, "data": encodedEntity, "ts": v.GetTimestamp().String(), "ingestion_ts": time.Now().UnixNano(), @@ -113,5 +157,5 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo } } - return &pb.PushResponse{}, nil + return &pb.PushResponse{Errors: errs}, nil } From a065f49db3fa48e0dbd09de1e52d672d0bb4b687 Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 20:50:29 +0000 Subject: [PATCH 11/12] feat: OBS-381 - extract push request validation into func Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/component.go | 67 ++++++++++++++------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index 866fc5ce..b222673e 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -91,9 +91,8 @@ func (c *Component) Stop() error { // Push handles a push request func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) { - reqID := in.GetId() - if reqID == "" { - return nil, fmt.Errorf("id is empty") + if err := validatePushRequest(in); err != nil { + return nil, err } reqStream := in.GetStream() @@ -101,30 +100,6 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo reqStream = DefaultRequestStream } - producerAppName := in.GetProducerAppName() - if producerAppName == "" { - return nil, fmt.Errorf("producer app name is empty") - } - - producerAppVersion := in.GetProducerAppVersion() - if producerAppVersion == "" { - return nil, fmt.Errorf("producer app version is empty") - } - - sdkName := in.GetSdkName() - if sdkName == "" { - return nil, fmt.Errorf("sdk name is empty") - } - - sdkVersion := in.GetSdkVersion() - if sdkVersion == "" { - return nil, fmt.Errorf("sdk version is empty") - } - - if len(in.GetData()) < 1 { - return nil, fmt.Errorf("data is empty") - } - errs := make([]string, 0) for i, v := range in.GetData() { @@ -139,12 +114,12 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo continue } msg := map[string]interface{}{ - "id": reqID, + "id": in.GetId(), "stream": reqStream, - "producer_app_name": producerAppName, - "producer_app_version": producerAppVersion, - "sdk_name": sdkName, - "sdk_version": sdkVersion, + "producer_app_name": in.GetProducerAppName(), + "producer_app_version": in.GetProducerAppVersion(), + "sdk_name": in.GetSdkName(), + "sdk_version": in.GetSdkVersion(), "data": encodedEntity, "ts": v.GetTimestamp().String(), "ingestion_ts": time.Now().UnixNano(), @@ -159,3 +134,31 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo return &pb.PushResponse{Errors: errs}, nil } + +func validatePushRequest(in *pb.PushRequest) error { + if in.GetId() == "" { + return fmt.Errorf("id is empty") + } + + if in.GetProducerAppName() == "" { + return fmt.Errorf("producer app name is empty") + } + + if in.GetProducerAppVersion() == "" { + return fmt.Errorf("producer app version is empty") + } + + if in.GetSdkName() == "" { + return fmt.Errorf("sdk name is empty") + } + + if in.GetSdkVersion() == "" { + return fmt.Errorf("sdk version is empty") + } + + if len(in.GetData()) < 1 { + return fmt.Errorf("data is empty") + } + + return nil +} From 2aac012fae14f10cccac7c7a60561d9f4f00d1ed Mon Sep 17 00:00:00 2001 From: Michal Fiedorowicz Date: Fri, 2 Feb 2024 21:00:21 +0000 Subject: [PATCH 12/12] feat: OBS-381 - revive if-return Signed-off-by: Michal Fiedorowicz --- diode-server/distributor/component.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index b222673e..0f377a7b 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -83,10 +83,7 @@ func (c *Component) Start(_ context.Context) error { func (c *Component) Stop() error { c.logger.Info("stopping component", "name", c.Name()) c.grpcServer.GracefulStop() - if err := c.redisClient.Close(); err != nil { - return err - } - return nil + return c.redisClient.Close() } // Push handles a push request