Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: OBS-381 - implement DistributorService.Push RPC #9

Merged
merged 12 commits into from
Feb 2, 2024
Merged
13 changes: 8 additions & 5 deletions diode-server/cmd/distributor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"log"
"os"

"github.com/netboxlabs/diode-internal/diode-server/distributor"
"github.com/netboxlabs/diode-internal/diode-server/server"
Expand All @@ -12,19 +12,22 @@ func main() {
ctx := context.Background()
s := server.New(ctx, "diode-distributor")

distributorComponent, err := distributor.New(s.Logger())
distributorComponent, err := distributor.New(ctx, s.Logger())
if err != nil {
log.Fatalf("failed to instantiate distributor component: %v", err)
s.Logger().Error("failed to instantiate distributor component", "error", err)
os.Exit(1)
}

if err := s.RegisterComponent(distributorComponent); err != nil {
log.Fatalf("failed to register distributor component: %v", err)
s.Logger().Error("failed to register distributor component", "error", err)
os.Exit(1)
}

// instantiate a prom service for /metrics
// prometheusSvc, err := prometheus.New()

if err := s.Run(); err != nil {
log.Fatalf("server %s failure: %v", s.Name(), err)
s.Logger().Error("server failure", "serverName", s.Name(), "error", err)
os.Exit(1)
}
}
8 changes: 5 additions & 3 deletions diode-server/cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"log"
"os"

"github.com/netboxlabs/diode-internal/diode-server/ingester"
"github.com/netboxlabs/diode-internal/diode-server/server"
Expand All @@ -15,10 +15,12 @@ func main() {
ingesterComponent := ingester.New(s.Logger())

if err := s.RegisterComponent(ingesterComponent); err != nil {
log.Fatalf("failed to register ingerster component: %v", err)
s.Logger().Error("failed to register ingester component", "error", err)
os.Exit(1)
}

if err := s.Run(); err != nil {
log.Fatalf("server %s failure: %v", s.Name(), err)
s.Logger().Error("server failure", "serverName", s.Name(), "error", err)
os.Exit(1)
}
}
11 changes: 7 additions & 4 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"log"
"os"

"github.com/netboxlabs/diode-internal/diode-server/reconciler"
"github.com/netboxlabs/diode-internal/diode-server/server"
Expand All @@ -14,17 +14,20 @@ func main() {

reconcilerComponent, err := reconciler.New(s.Logger())
if err != nil {
log.Fatalf("failed to instantiate reconciler component: %v", err)
s.Logger().Error("failed to instantiate reconciler component", "error", err)
os.Exit(1)
}

if err := s.RegisterComponent(reconcilerComponent); err != nil {
log.Fatalf("failed to register reconciler component: %v", err)
s.Logger().Error("failed to register reconciler component", "error", err)
os.Exit(1)
}

// instantiate a prom service for /metrics
// prometheusSvc, err := prometheus.New()

if err := s.Run(); err != nil {
log.Fatalf("server %s failure: %v", s.Name(), err)
s.Logger().Error("server failure", "serverName", s.Name(), "error", err)
os.Exit(1)
}
}
101 changes: 96 additions & 5 deletions diode-server/distributor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,37 @@ import (
"fmt"
"log/slog"
"net"
"time"

"github.com/kelseyhightower/envconfig"
pb "github.com/netboxlabs/diode-internal/diode-sdk-go/diode/v1/diodepb"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
)

const (
// DefaultRequestStream is the default stream to use when none is provided
DefaultRequestStream = "latest"

streamID = "diode.v1.ingest"
)

// Component is a gRPC server that handles data ingestion requests
type Component struct {
pb.UnimplementedDistributorServiceServer

ctx context.Context
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
}

// New creates a new distributor component
func New(logger *slog.Logger) (*Component, error) {
func New(ctx context.Context, logger *slog.Logger) (*Component, error) {
var cfg Config
envconfig.MustProcess("", &cfg)

Expand All @@ -32,12 +44,23 @@ func New(logger *slog.Logger) (*Component, error) {
return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err)
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
Password: cfg.RedisPassword,
})

if _, err := redisClient.Ping(ctx).Result(); err != nil {
return nil, fmt.Errorf("failed connection to %s: %v", redisClient.String(), err)
}

grpcServer := grpc.NewServer()
component := &Component{
ctx: ctx,
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
redisClient: redisClient,
}
pb.RegisterDistributorServiceServer(grpcServer, component)
reflection.Register(grpcServer)
Expand All @@ -60,11 +83,79 @@ func (c *Component) Start(_ context.Context) error {
func (c *Component) Stop() error {
c.logger.Info("stopping component", "name", c.Name())
c.grpcServer.GracefulStop()
return nil
return c.redisClient.Close()
}

// Push handles a push request
func (c *Component) Push(_ context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
c.logger.Info("diode.v1.DistributorService/Push called", "stream", in.Stream)
return &pb.PushResponse{}, nil
func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
if err := validatePushRequest(in); err != nil {
return nil, err
}

reqStream := in.GetStream()
if reqStream == "" {
reqStream = DefaultRequestStream
}

errs := make([]string, 0)

for i, v := range in.GetData() {
if v.GetData() == nil {
errs = append(errs, fmt.Sprintf("data for index %d is nil", i))
continue
}

encodedEntity, err := proto.Marshal(v)
if err != nil {
c.logger.Error("failed to marshal", "error", err, "value", v)
continue
}
msg := map[string]interface{}{
"id": in.GetId(),
"stream": reqStream,
"producer_app_name": in.GetProducerAppName(),
"producer_app_version": in.GetProducerAppVersion(),
"sdk_name": in.GetSdkName(),
"sdk_version": in.GetSdkVersion(),
"data": encodedEntity,
"ts": v.GetTimestamp().String(),
"ingestion_ts": time.Now().UnixNano(),
}
if err := c.redisClient.XAdd(ctx, &redis.XAddArgs{
Stream: streamID,
Values: msg,
}).Err(); err != nil {
c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg)
}
}

return &pb.PushResponse{Errors: errs}, nil
}

func validatePushRequest(in *pb.PushRequest) error {
if in.GetId() == "" {
return fmt.Errorf("id is empty")
}

if in.GetProducerAppName() == "" {
return fmt.Errorf("producer app name is empty")
}

if in.GetProducerAppVersion() == "" {
return fmt.Errorf("producer app version is empty")
}

if in.GetSdkName() == "" {
return fmt.Errorf("sdk name is empty")
}

if in.GetSdkVersion() == "" {
return fmt.Errorf("sdk version is empty")
}

if len(in.GetData()) < 1 {
return fmt.Errorf("data is empty")
}

return nil
}
5 changes: 4 additions & 1 deletion diode-server/distributor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ package distributor

// Config is the configuration for the distributor service
type Config struct {
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"`
RedisPort string `envconfig:"REDIS_PORT" default:"6379"`
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"`
}
9 changes: 8 additions & 1 deletion diode-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0
github.com/oklog/run v1.1.0
github.com/redis/go-redis/v9 v9.4.0
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0 => ../diode-sdk-go
20 changes: 20 additions & 0 deletions diode-server/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
Expand All @@ -10,6 +20,12 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
Expand All @@ -25,3 +41,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8 changes: 4 additions & 4 deletions diode-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// A Server is a diode Server
type Server struct {
cxt context.Context
ctx context.Context
name string
logger *slog.Logger

Expand All @@ -37,7 +37,7 @@ func New(ctx context.Context, name string) *Server {
envconfig.MustProcess("", &cfg)

return &Server{
cxt: ctx,
ctx: ctx,
name: name,
logger: newLogger(cfg),
components: make(map[string]Component),
Expand Down Expand Up @@ -66,7 +66,7 @@ func (s *Server) RegisterComponent(c Component) error {

s.components[c.Name()] = c

ctx, cancel := context.WithCancel(s.cxt)
ctx, cancel := context.WithCancel(s.ctx)

s.componentGroup.Add(
func() error {
Expand All @@ -86,7 +86,7 @@ func (s *Server) RegisterComponent(c Component) error {
func (s *Server) Run() error {
s.logger.Info("starting server", "serverName", s.name)

s.componentGroup.Add(run.SignalHandler(s.cxt, os.Interrupt, os.Kill))
s.componentGroup.Add(run.SignalHandler(s.ctx, os.Interrupt, os.Kill))

return s.componentGroup.Run()
}
Expand Down
Loading
Loading