diff --git a/diode-netbox-plugin/netbox_diode_plugin/management/commands/configurediodeplugin.py b/diode-netbox-plugin/netbox_diode_plugin/management/commands/configurediodeplugin.py index c209e6fb..4e8afab7 100644 --- a/diode-netbox-plugin/netbox_diode_plugin/management/commands/configurediodeplugin.py +++ b/diode-netbox-plugin/netbox_diode_plugin/management/commands/configurediodeplugin.py @@ -26,7 +26,7 @@ class Command(BaseCommand): diode_to_netbox_username = "DIODE_TO_NETBOX" netbox_to_diode_username = "NETBOX_TO_DIODE" - datasource_to_diode_username = "DATASOURCE_TO_DIODE" + ingestion_username = "INGESTION" def add_arguments(self, parser): """Add command arguments.""" @@ -43,10 +43,10 @@ def add_arguments(self, parser): help="NetBox Diode plugin to Diode API key" ) parser.add_argument( - "--datasource-to-diode-api-key", - dest="datasource_to_diode_api_key", + "--ingestion-api-key", + dest="ingestion_api_key", required=True, - help="Datasource to Diode API key" + help="Ingestion API key" ) def handle(self, *args, **options): @@ -57,6 +57,6 @@ def handle(self, *args, **options): _create_user_with_token(self.diode_to_netbox_username, options['diode_to_netbox_api_key'], group) _create_user_with_token(self.netbox_to_diode_username, options['netbox_to_diode_api_key'], group, True) - _create_user_with_token(self.datasource_to_diode_username, options['datasource_to_diode_api_key'], group) + _create_user_with_token(self.ingestion_username, options['ingestion_api_key'], group) self.stdout.write("Finished.") diff --git a/diode-server/distributor/component.go b/diode-server/distributor/component.go index e293b82b..1526be12 100644 --- a/diode-server/distributor/component.go +++ b/diode-server/distributor/component.go @@ -2,36 +2,46 @@ package distributor import ( "context" + "errors" "fmt" "log/slog" "net" "time" "github.com/kelseyhightower/envconfig" - pb "github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb" "github.com/redis/go-redis/v9" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/proto" ) const ( - // DefaultRequestStream is the default stream to use when none is provided - DefaultRequestStream = "latest" - streamID = "diode.v1.ingest-stream" ) +var ( + errMetadataNotFound = errors.New("no request metadata found") + + // ErrUnauthorized is an error for unauthorized requests + ErrUnauthorized = errors.New("missing or invalid authorization header") +) + // Component is a gRPC server that handles data ingestion requests type Component struct { - pb.UnimplementedDistributorServiceServer - - ctx context.Context - config Config - logger *slog.Logger - grpcListener net.Listener - grpcServer *grpc.Server - redisClient *redis.Client + diodepb.UnimplementedDistributorServiceServer + + ctx context.Context + config Config + logger *slog.Logger + grpcListener net.Listener + grpcServer *grpc.Server + redisClient *redis.Client + reconcilerClient reconciler.Client + ingestionDataSources []*reconcilerpb.IngestionDataSource } // New creates a new distributor component @@ -54,21 +64,52 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) { return nil, fmt.Errorf("failed connection to %s: %v", redisClient.String(), err) } - grpcServer := grpc.NewServer() + reconcilerClient, err := reconciler.NewClient(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to create reconciler client: %v", err) + } + + dataSources, err := reconcilerClient.RetrieveIngestionDataSources(ctx, &reconcilerpb.RetrieveIngestionDataSourcesRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to retrieve ingestion data sources: %v", err) + } + + ingestionDataSources := dataSources.GetIngestionDataSources() + //auth := grpc.UnaryServerInterceptor(authUnaryInterceptor) + + auth := newAuthUnaryInterceptor(ingestionDataSources) + + grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(auth)) component := &Component{ - ctx: ctx, - config: cfg, - logger: logger, - grpcListener: grpcListener, - grpcServer: grpcServer, - redisClient: redisClient, - } - pb.RegisterDistributorServiceServer(grpcServer, component) + ctx: ctx, + config: cfg, + logger: logger, + grpcListener: grpcListener, + grpcServer: grpcServer, + redisClient: redisClient, + reconcilerClient: reconcilerClient, + ingestionDataSources: ingestionDataSources, + } + diodepb.RegisterDistributorServiceServer(grpcServer, component) reflection.Register(grpcServer) return component, nil } +func newAuthUnaryInterceptor(dataSources []*reconcilerpb.IngestionDataSource) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errMetadataNotFound + } + if !authorized(dataSources, md["diode-api-key"]) { + return nil, ErrUnauthorized + } + return handler(ctx, req) + } + +} + // Name returns the name of the component func (c *Component) Name() string { return "distributor" @@ -88,7 +129,7 @@ func (c *Component) Stop() error { } // Push handles a push request -func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) { +func (c *Component) Push(ctx context.Context, in *diodepb.PushRequest) (*diodepb.PushResponse, error) { if err := validatePushRequest(in); err != nil { return nil, err } @@ -119,10 +160,10 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg) } - return &pb.PushResponse{Errors: errs}, nil + return &diodepb.PushResponse{Errors: errs}, nil } -func validatePushRequest(in *pb.PushRequest) error { +func validatePushRequest(in *diodepb.PushRequest) error { if in.GetId() == "" { return fmt.Errorf("id is empty") } @@ -149,3 +190,16 @@ func validatePushRequest(in *pb.PushRequest) error { return nil } + +func authorized(dataSources []*reconcilerpb.IngestionDataSource, authorization []string) bool { + if len(dataSources) < 1 || len(authorization) != 1 { + return false + } + + for _, v := range dataSources { + if v.GetApiKey() == authorization[0] { + return true + } + } + return false +} diff --git a/diode-server/docker/diode/env/distributor.env b/diode-server/docker/diode/env/distributor.env index a89183c6..fd5c768a 100644 --- a/diode-server/docker/diode/env/distributor.env +++ b/diode-server/docker/diode/env/distributor.env @@ -2,3 +2,5 @@ API_KEY=CHANGE_.ME REDIS_PASSWORD=@FmnLoA*VnebyVnZoL.!-.6z REDIS_HOST=diode-redis REDIS_PORT=6379 +RECONCILER_GRPC_HOST=diode-reconciler +RECONCILER_GRPC_PORT=8081 diff --git a/diode-server/docker/diode/env/reconciler.env b/diode-server/docker/diode/env/reconciler.env index bb214a6a..80d2a82c 100644 --- a/diode-server/docker/diode/env/reconciler.env +++ b/diode-server/docker/diode/env/reconciler.env @@ -2,6 +2,6 @@ REDIS_PASSWORD=@FmnLoA*VnebyVnZoL.!-.6z REDIS_HOST=diode-redis REDIS_PORT=6379 DIODE_TO_NETBOX_API_KEY=1368dbad13e418d5a443d93cf255edde03a2a754 -NETBOX_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42 -DATASOURCE_TO_DIODE_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433 +NETBOX_TO_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42 +INGESTION_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433 NETBOX_API_URL=http://netbox:8000/api diff --git a/diode-server/docker/docker-compose.netbox.yaml b/diode-server/docker/docker-compose.netbox.yaml index 329f54d0..60f6f0d8 100644 --- a/diode-server/docker/docker-compose.netbox.yaml +++ b/diode-server/docker/docker-compose.netbox.yaml @@ -50,7 +50,7 @@ services: - configurediodeplugin - --diode-to-netbox-api-key=${DIODE_TO_NETBOX_API_KEY} - --netbox-to-diode-api-key=${NETBOX_TO_DIODE_API_KEY} - - --datasource-to-diode-api-key=${DATASOURCE_TO_DIODE_API_KEY} + - --ingestion-api-key=${INGESTION_API_KEY} # postgres netbox-postgres: image: docker.io/postgres:16-alpine diff --git a/diode-server/docker/docker-compose.yaml b/diode-server/docker/docker-compose.yaml index bb29ae0e..712d30fe 100644 --- a/diode-server/docker/docker-compose.yaml +++ b/diode-server/docker/docker-compose.yaml @@ -8,6 +8,7 @@ services: - "8081:8081" depends_on: - diode-redis + - diode-reconciler diode-ingester: image: netboxlabs/diode-ingester:${DIODE_VERSION}-${COMMIT_SHA} env_file: diode/env/ingester.env diff --git a/diode-server/docker/netbox/env/netbox.env b/diode-server/docker/netbox/env/netbox.env index cb78065d..285efe14 100644 --- a/diode-server/docker/netbox/env/netbox.env +++ b/diode-server/docker/netbox/env/netbox.env @@ -34,5 +34,5 @@ SKIP_SUPERUSER=true WEBHOOKS_ENABLED=true RELOAD_NETBOX_ON_DIODE_PLUGIN_CHANGE=true DIODE_TO_NETBOX_API_KEY=1368dbad13e418d5a443d93cf255edde03a2a754 -NETBOX_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42 -DATASOURCE_TO_DIODE_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433 +NETBOX_TO_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42 +INGESTION_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433 diff --git a/diode-server/proto/buf.gen.yaml b/diode-server/proto/buf.gen.yaml new file mode 100644 index 00000000..e59915f0 --- /dev/null +++ b/diode-server/proto/buf.gen.yaml @@ -0,0 +1,11 @@ +version: v1 +plugins: + - plugin: go + out: ../ + opt: module=github.com/netboxlabs/diode/diode-server + - plugin: buf.build/grpc/go:v1.3.0 + out: ../ + opt: module=github.com/netboxlabs/diode/diode-server + - plugin: buf.build/bufbuild/validate-go:v1.0.4 + out: ../ + opt: module=github.com/netboxlabs/diode/diode-server diff --git a/diode-server/proto/buf.lock b/diode-server/proto/buf.lock new file mode 100644 index 00000000..6675af88 --- /dev/null +++ b/diode-server/proto/buf.lock @@ -0,0 +1,13 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: envoyproxy + repository: protoc-gen-validate + commit: 71881f09a0c5420a9545a07987a86728 + digest: shake256:d320bbf06653b1b2b45a1f95bfa82bf7b998221a777a042708c50d6f86a30d1a85b50c5704c597142d9b308280efe1295d39d76d1abea5f7046d3df4c8cc3cef + - remote: buf.build + owner: googleapis + repository: googleapis + commit: 7e6f6e774e29406da95bd61cdcdbc8bc + digest: shake256:fe43dd2265ea0c07d76bd925eeba612667cf4c948d2ce53d6e367e1b4b3cb5fa69a51e6acb1a6a50d32f894f054a35e6c0406f6808a483f2752e10c866ffbf73 diff --git a/diode-server/proto/buf.yaml b/diode-server/proto/buf.yaml new file mode 100644 index 00000000..2056cfad --- /dev/null +++ b/diode-server/proto/buf.yaml @@ -0,0 +1,13 @@ +version: v1 +deps: + - buf.build/googleapis/googleapis + - buf.build/envoyproxy/protoc-gen-validate +breaking: + use: + - FILE +lint: + use: + - DEFAULT + except: + - RPC_REQUEST_STANDARD_NAME + - RPC_RESPONSE_STANDARD_NAME diff --git a/diode-server/proto/reconciler/v1/reconciler.proto b/diode-server/proto/reconciler/v1/reconciler.proto new file mode 100644 index 00000000..eb6c66ed --- /dev/null +++ b/diode-server/proto/reconciler/v1/reconciler.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +package reconciler.v1; + +import "google/protobuf/timestamp.proto"; +import "validate/validate.proto"; + +option go_package = "github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb"; + +// An ingestion data source +message IngestionDataSource { + string name = 1 [(validate.rules).string = { + min_len: 1, + max_len: 255 + }]; + string api_key = 2 [(validate.rules).string = { + min_len: 40, + max_len: 40 + }]; +} + +// The request to retrieve ingestion data sources +message RetrieveIngestionDataSourcesRequest { + string name = 1 [(validate.rules).string = { + min_len: 1, + max_len: 255 + }]; + string sdk_name = 2 [(validate.rules).string = { + min_len: 1, + max_len: 255 + }]; + string sdk_version = 3 [(validate.rules).string = {pattern: "^(\\d)+\\.(\\d)+\\.(\\d)+$"}]; +} + +// The response from the retrieve ingestion data sources request +message RetrieveIngestionDataSourcesResponse { + repeated IngestionDataSource ingestion_data_sources = 1; +} + +// Reconciler service API +service ReconcilerService { + // Retrieves ingestion data sources + rpc RetrieveIngestionDataSources(RetrieveIngestionDataSourcesRequest) returns (RetrieveIngestionDataSourcesResponse) {} +} diff --git a/diode-server/reconciler/api_keys.go b/diode-server/reconciler/api_keys.go index 41137c14..a40bd2f0 100644 --- a/diode-server/reconciler/api_keys.go +++ b/diode-server/reconciler/api_keys.go @@ -16,9 +16,9 @@ func (ak APIKeys) MarshalBinary() ([]byte, error) { func loadAPIKeys(ctx context.Context, cfg Config, rc *redis.Client) (APIKeys, error) { apiKeys := map[string]string{ - "DIODE_TO_NETBOX": cfg.DiodeToNetBoxAPIKey, - "NETBOX_TO_DIODE": cfg.NetBoxToDiodeAPIKey, - "DATASOURCE_TO_DIODE": cfg.DatasourceToDiodeAPIKey, + "DIODE_TO_NETBOX": cfg.DiodeToNetBoxAPIKey, + "NETBOX_TO_DIODE": cfg.NetBoxToDiodeAPIKey, + "INGESTION": cfg.IngestionAPIKey, } if err := rc.HSet(ctx, "diode.api_keys", apiKeys).Err(); err != nil { diff --git a/diode-server/reconciler/client.go b/diode-server/reconciler/client.go new file mode 100644 index 00000000..0b8fc143 --- /dev/null +++ b/diode-server/reconciler/client.go @@ -0,0 +1,136 @@ +package reconciler + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "time" + + pb "github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + // SDKName is the name of the Diode SDK + SDKName = "reconciler-sdk-go" + + // SDKVersion is the version of the Diode SDK + SDKVersion = "0.1.0" + + // GRPCHostEnvVarName is the environment variable name for the reconciler gRPC host + GRPCHostEnvVarName = "RECONCILER_GRPC_HOST" + + // GRPCPortEnvVarName is the environment variable name for the reconciler gRPC port + GRPCPortEnvVarName = "RECONCILER_GRPC_PORT" + + // GRPCTimeoutSecondsEnvVarName is the environment variable name for the reconciler gRPC timeout in seconds + GRPCTimeoutSecondsEnvVarName = "RECONCILER_GRPC_TIMEOUT_SECONDS" + + defaultGRPCHost = "127.0.0.1" + + defaultGRPCPort = "8081" + + defaultGRPCTimeoutSeconds = 5 +) + +var ( + // ErrInvalidTimeout is an error for invalid timeout value + ErrInvalidTimeout = errors.New("invalid timeout value") +) + +// Client is an interface that defines the methods available from reconciler API +type Client interface { + // Close closes the connection to the API service + Close() error + + // RetrieveIngestionDataSources retrieves ingestion data sources + RetrieveIngestionDataSources(context.Context, *pb.RetrieveIngestionDataSourcesRequest, ...grpc.CallOption) (*pb.RetrieveIngestionDataSourcesResponse, error) +} + +// GRPCClient is a gRPC implementation of the distributor service +type GRPCClient struct { + // gRPC virtual connection + conn *grpc.ClientConn + + // The gRPC API client + client pb.ReconcilerServiceClient +} + +// Close closes the connection to the API service +func (g *GRPCClient) Close() error { + if g.conn != nil { + return g.conn.Close() + } + return nil +} + +// RetrieveIngestionDataSources retrieves ingestion data sources +func (g *GRPCClient) RetrieveIngestionDataSources(ctx context.Context, req *pb.RetrieveIngestionDataSourcesRequest, opt ...grpc.CallOption) (*pb.RetrieveIngestionDataSourcesResponse, error) { + req.SdkName = SDKName + req.SdkVersion = SDKVersion + return g.client.RetrieveIngestionDataSources(ctx, req, opt...) +} + +// NewClient creates a new reconciler client based on gRPC +func NewClient(ctx context.Context) (Client, error) { + dialOpts := []grpc.DialOption{ + grpc.WithUserAgent(userAgent()), + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + timeout, err := grpcTimeout() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + target := grpcTarget() + + conn, err := grpc.DialContext(ctx, target, dialOpts...) + if err != nil { + return nil, err + } + + c := &GRPCClient{ + conn: conn, + client: pb.NewReconcilerServiceClient(conn), + } + + return c, nil +} + +func userAgent() string { + return fmt.Sprintf("%s/%s", SDKName, SDKVersion) +} + +func grpcTarget() string { + host, ok := os.LookupEnv(GRPCHostEnvVarName) + if !ok { + host = defaultGRPCHost + } + + port, ok := os.LookupEnv(GRPCPortEnvVarName) + if !ok { + port = defaultGRPCPort + } + + return fmt.Sprintf("%s:%s", host, port) +} + +func grpcTimeout() (time.Duration, error) { + timeoutSecondsStr, ok := os.LookupEnv(GRPCTimeoutSecondsEnvVarName) + if !ok || len(timeoutSecondsStr) == 0 { + return defaultGRPCTimeoutSeconds * time.Second, nil + } + + timeout, err := strconv.Atoi(timeoutSecondsStr) + if err != nil || timeout <= 0 { + return 0, ErrInvalidTimeout + } + return time.Duration(timeout) * time.Second, nil +} diff --git a/diode-server/reconciler/client_test.go b/diode-server/reconciler/client_test.go new file mode 100644 index 00000000..043d18a8 --- /dev/null +++ b/diode-server/reconciler/client_test.go @@ -0,0 +1,67 @@ +package reconciler_test + +import ( + "context" + "github.com/netboxlabs/diode/diode-server/reconciler" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewClient(t *testing.T) { + cleanUpEnvVars() + + client, err := reconciler.NewClient(context.Background()) + require.NoError(t, err) + require.NotNil(t, client) + require.NoError(t, client.Close()) +} + +func TestNewClientWithTimeout(t *testing.T) { + tests := []struct { + desc string + timeoutStr string + err error + }{ + { + desc: "timeout with valid value", + timeoutStr: "10", + err: nil, + }, + { + desc: "timeout with negative value", + timeoutStr: "-1", + err: reconciler.ErrInvalidTimeout, + }, + { + desc: "timeout with non-parseable value", + timeoutStr: "10a", + err: reconciler.ErrInvalidTimeout, + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + cleanUpEnvVars() + + _ = os.Setenv(reconciler.GRPCTimeoutSecondsEnvVarName, tt.timeoutStr) + + client, err := reconciler.NewClient(context.Background()) + if tt.err == nil { + require.NoError(t, err) + require.NotNil(t, client) + require.NoError(t, client.Close()) + } else { + require.Nil(t, client) + require.EqualError(t, err, tt.err.Error()) + } + }) + } +} + +func cleanUpEnvVars() { + _ = os.Unsetenv(reconciler.GRPCHostEnvVarName) + _ = os.Unsetenv(reconciler.GRPCPortEnvVarName) + _ = os.Unsetenv(reconciler.GRPCTimeoutSecondsEnvVarName) +} diff --git a/diode-server/reconciler/component.go b/diode-server/reconciler/component.go index 254a8bb2..3676b94b 100644 --- a/diode-server/reconciler/component.go +++ b/diode-server/reconciler/component.go @@ -5,8 +5,10 @@ import ( "fmt" "log/slog" "net" + "strings" "github.com/kelseyhightower/envconfig" + pb "github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -14,6 +16,8 @@ import ( // Component reconciles ingested data type Component struct { + pb.UnimplementedReconcilerServiceServer + config Config logger *slog.Logger grpcListener net.Listener @@ -56,6 +60,7 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) { redisClient: redisClient, apiKeys: apiKeys, } + pb.RegisterReconcilerServiceServer(grpcServer, component) reflection.Register(grpcServer) return component, nil @@ -76,5 +81,40 @@ func (c *Component) Start(_ context.Context) error { func (c *Component) Stop() error { c.logger.Info("stopping component", "name", c.Name()) c.grpcServer.GracefulStop() + return c.redisClient.Close() +} + +// RetrieveIngestionDataSources retrieves ingestion data sources +func (c *Component) RetrieveIngestionDataSources(_ context.Context, in *pb.RetrieveIngestionDataSourcesRequest) (*pb.RetrieveIngestionDataSourcesResponse, error) { + if err := validateRetrieveIngestionDataSourcesRequest(in); err != nil { + return nil, err + } + + dataSources := make([]*pb.IngestionDataSource, 0) + filterByName := in.Name != "" + + if filterByName { + if _, ok := c.apiKeys[in.Name]; !ok || !strings.HasPrefix(in.Name, "INGESTION") { + return nil, fmt.Errorf("data source %s not found", in.Name) + } + dataSources = append(dataSources, &pb.IngestionDataSource{Name: in.Name, ApiKey: c.apiKeys[in.Name]}) + return &pb.RetrieveIngestionDataSourcesResponse{IngestionDataSources: dataSources}, nil + } + + for name, key := range c.apiKeys { + if strings.HasPrefix(name, "INGESTION") { + dataSources = append(dataSources, &pb.IngestionDataSource{Name: name, ApiKey: key}) + } + } + return &pb.RetrieveIngestionDataSourcesResponse{IngestionDataSources: dataSources}, nil +} + +func validateRetrieveIngestionDataSourcesRequest(in *pb.RetrieveIngestionDataSourcesRequest) error { + if in.GetSdkName() == "" { + return fmt.Errorf("sdk name is empty") + } + if in.GetSdkVersion() == "" { + return fmt.Errorf("sdk version is empty") + } return nil } diff --git a/diode-server/reconciler/config.go b/diode-server/reconciler/config.go index 20a85b24..8b06be11 100644 --- a/diode-server/reconciler/config.go +++ b/diode-server/reconciler/config.go @@ -2,13 +2,13 @@ package reconciler // Config is the configuration for the reconciler service type Config struct { - GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` - RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` - RedisPort string `envconfig:"REDIS_PORT" default:"6379"` - RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` - RedisDB int `envconfig:"REDIS_DB" default:"0"` - NetBoxAPIURL string `envconfig:"NETBOX_API_URL" required:"true"` - DiodeToNetBoxAPIKey string `envconfig:"DIODE_TO_NETBOX_API_KEY" required:"true"` - NetBoxToDiodeAPIKey string `envconfig:"NETBOX_DIODE_API_KEY" required:"true"` - DatasourceToDiodeAPIKey string `envconfig:"DATASOURCE_TO_DIODE_API_KEY" required:"true"` + GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` + RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` + RedisPort string `envconfig:"REDIS_PORT" default:"6379"` + RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` + RedisDB int `envconfig:"REDIS_DB" default:"0"` + NetBoxAPIURL string `envconfig:"NETBOX_API_URL" required:"true"` + DiodeToNetBoxAPIKey string `envconfig:"DIODE_TO_NETBOX_API_KEY" required:"true"` + NetBoxToDiodeAPIKey string `envconfig:"NETBOX_TO_DIODE_API_KEY" required:"true"` + IngestionAPIKey string `envconfig:"INGESTION_API_KEY" required:"true"` } diff --git a/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.go b/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.go new file mode 100644 index 00000000..8feedf2d --- /dev/null +++ b/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.go @@ -0,0 +1,335 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc (unknown) +// source: reconciler/v1/reconciler.proto + +package reconcilerpb + +import ( + _ "github.com/envoyproxy/protoc-gen-validate/validate" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + _ "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// An ingestion data source +type IngestionDataSource struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + ApiKey string `protobuf:"bytes,2,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"` +} + +func (x *IngestionDataSource) Reset() { + *x = IngestionDataSource{} + if protoimpl.UnsafeEnabled { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IngestionDataSource) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IngestionDataSource) ProtoMessage() {} + +func (x *IngestionDataSource) ProtoReflect() protoreflect.Message { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IngestionDataSource.ProtoReflect.Descriptor instead. +func (*IngestionDataSource) Descriptor() ([]byte, []int) { + return file_reconciler_v1_reconciler_proto_rawDescGZIP(), []int{0} +} + +func (x *IngestionDataSource) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *IngestionDataSource) GetApiKey() string { + if x != nil { + return x.ApiKey + } + return "" +} + +// The request to retrieve ingestion data sources +type RetrieveIngestionDataSourcesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + SdkName string `protobuf:"bytes,2,opt,name=sdk_name,json=sdkName,proto3" json:"sdk_name,omitempty"` + SdkVersion string `protobuf:"bytes,3,opt,name=sdk_version,json=sdkVersion,proto3" json:"sdk_version,omitempty"` +} + +func (x *RetrieveIngestionDataSourcesRequest) Reset() { + *x = RetrieveIngestionDataSourcesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RetrieveIngestionDataSourcesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RetrieveIngestionDataSourcesRequest) ProtoMessage() {} + +func (x *RetrieveIngestionDataSourcesRequest) ProtoReflect() protoreflect.Message { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RetrieveIngestionDataSourcesRequest.ProtoReflect.Descriptor instead. +func (*RetrieveIngestionDataSourcesRequest) Descriptor() ([]byte, []int) { + return file_reconciler_v1_reconciler_proto_rawDescGZIP(), []int{1} +} + +func (x *RetrieveIngestionDataSourcesRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *RetrieveIngestionDataSourcesRequest) GetSdkName() string { + if x != nil { + return x.SdkName + } + return "" +} + +func (x *RetrieveIngestionDataSourcesRequest) GetSdkVersion() string { + if x != nil { + return x.SdkVersion + } + return "" +} + +// The response from the retrieve ingestion data sources request +type RetrieveIngestionDataSourcesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IngestionDataSources []*IngestionDataSource `protobuf:"bytes,1,rep,name=ingestion_data_sources,json=ingestionDataSources,proto3" json:"ingestion_data_sources,omitempty"` +} + +func (x *RetrieveIngestionDataSourcesResponse) Reset() { + *x = RetrieveIngestionDataSourcesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RetrieveIngestionDataSourcesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RetrieveIngestionDataSourcesResponse) ProtoMessage() {} + +func (x *RetrieveIngestionDataSourcesResponse) ProtoReflect() protoreflect.Message { + mi := &file_reconciler_v1_reconciler_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RetrieveIngestionDataSourcesResponse.ProtoReflect.Descriptor instead. +func (*RetrieveIngestionDataSourcesResponse) Descriptor() ([]byte, []int) { + return file_reconciler_v1_reconciler_proto_rawDescGZIP(), []int{2} +} + +func (x *RetrieveIngestionDataSourcesResponse) GetIngestionDataSources() []*IngestionDataSource { + if x != nil { + return x.IngestionDataSources + } + return nil +} + +var File_reconciler_v1_reconciler_proto protoreflect.FileDescriptor + +var file_reconciler_v1_reconciler_proto_rawDesc = []byte{ + 0x0a, 0x1e, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x2f, + 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x1a, + 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x1a, 0x17, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, + 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x13, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x12, 0x1e, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x0a, + 0xfa, 0x42, 0x07, 0x72, 0x05, 0x10, 0x01, 0x18, 0xff, 0x01, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x22, 0x0a, 0x07, 0x61, 0x70, 0x69, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x42, 0x09, 0xfa, 0x42, 0x06, 0x72, 0x04, 0x10, 0x28, 0x18, 0x28, 0x52, 0x06, 0x61, 0x70, + 0x69, 0x4b, 0x65, 0x79, 0x22, 0xab, 0x01, 0x0a, 0x23, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, + 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x0a, 0xfa, 0x42, 0x07, 0x72, + 0x05, 0x10, 0x01, 0x18, 0xff, 0x01, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x25, 0x0a, 0x08, + 0x73, 0x64, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x0a, + 0xfa, 0x42, 0x07, 0x72, 0x05, 0x10, 0x01, 0x18, 0xff, 0x01, 0x52, 0x07, 0x73, 0x64, 0x6b, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x3d, 0x0a, 0x0b, 0x73, 0x64, 0x6b, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x1c, 0xfa, 0x42, 0x19, 0x72, 0x17, 0x32, + 0x15, 0x5e, 0x28, 0x5c, 0x64, 0x29, 0x2b, 0x5c, 0x2e, 0x28, 0x5c, 0x64, 0x29, 0x2b, 0x5c, 0x2e, + 0x28, 0x5c, 0x64, 0x29, 0x2b, 0x24, 0x52, 0x0a, 0x73, 0x64, 0x6b, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x22, 0x80, 0x01, 0x0a, 0x24, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x58, 0x0a, 0x16, 0x69, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x72, 0x65, + 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, + 0x14, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, + 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x1c, + 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, + 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x32, 0x2e, 0x72, + 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, + 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x33, 0x2e, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, + 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x2f, 0x76, + 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_reconciler_v1_reconciler_proto_rawDescOnce sync.Once + file_reconciler_v1_reconciler_proto_rawDescData = file_reconciler_v1_reconciler_proto_rawDesc +) + +func file_reconciler_v1_reconciler_proto_rawDescGZIP() []byte { + file_reconciler_v1_reconciler_proto_rawDescOnce.Do(func() { + file_reconciler_v1_reconciler_proto_rawDescData = protoimpl.X.CompressGZIP(file_reconciler_v1_reconciler_proto_rawDescData) + }) + return file_reconciler_v1_reconciler_proto_rawDescData +} + +var file_reconciler_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_reconciler_v1_reconciler_proto_goTypes = []interface{}{ + (*IngestionDataSource)(nil), // 0: reconciler.v1.IngestionDataSource + (*RetrieveIngestionDataSourcesRequest)(nil), // 1: reconciler.v1.RetrieveIngestionDataSourcesRequest + (*RetrieveIngestionDataSourcesResponse)(nil), // 2: reconciler.v1.RetrieveIngestionDataSourcesResponse +} +var file_reconciler_v1_reconciler_proto_depIdxs = []int32{ + 0, // 0: reconciler.v1.RetrieveIngestionDataSourcesResponse.ingestion_data_sources:type_name -> reconciler.v1.IngestionDataSource + 1, // 1: reconciler.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> reconciler.v1.RetrieveIngestionDataSourcesRequest + 2, // 2: reconciler.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> reconciler.v1.RetrieveIngestionDataSourcesResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_reconciler_v1_reconciler_proto_init() } +func file_reconciler_v1_reconciler_proto_init() { + if File_reconciler_v1_reconciler_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_reconciler_v1_reconciler_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IngestionDataSource); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_reconciler_v1_reconciler_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RetrieveIngestionDataSourcesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_reconciler_v1_reconciler_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RetrieveIngestionDataSourcesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_reconciler_v1_reconciler_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_reconciler_v1_reconciler_proto_goTypes, + DependencyIndexes: file_reconciler_v1_reconciler_proto_depIdxs, + MessageInfos: file_reconciler_v1_reconciler_proto_msgTypes, + }.Build() + File_reconciler_v1_reconciler_proto = out.File + file_reconciler_v1_reconciler_proto_rawDesc = nil + file_reconciler_v1_reconciler_proto_goTypes = nil + file_reconciler_v1_reconciler_proto_depIdxs = nil +} diff --git a/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.validate.go b/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.validate.go new file mode 100644 index 00000000..f1d1add5 --- /dev/null +++ b/diode-server/reconciler/v1/reconcilerpb/reconciler.pb.validate.go @@ -0,0 +1,440 @@ +// Code generated by protoc-gen-validate. DO NOT EDIT. +// source: reconciler/v1/reconciler.proto + +package reconcilerpb + +import ( + "bytes" + "errors" + "fmt" + "net" + "net/mail" + "net/url" + "regexp" + "sort" + "strings" + "time" + "unicode/utf8" + + "google.golang.org/protobuf/types/known/anypb" +) + +// ensure the imports are used +var ( + _ = bytes.MinRead + _ = errors.New("") + _ = fmt.Print + _ = utf8.UTFMax + _ = (*regexp.Regexp)(nil) + _ = (*strings.Reader)(nil) + _ = net.IPv4len + _ = time.Duration(0) + _ = (*url.URL)(nil) + _ = (*mail.Address)(nil) + _ = anypb.Any{} + _ = sort.Sort +) + +// Validate checks the field values on IngestionDataSource with the rules +// defined in the proto definition for this message. If any rules are +// violated, the first error encountered is returned, or nil if there are no violations. +func (m *IngestionDataSource) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on IngestionDataSource with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// IngestionDataSourceMultiError, or nil if none found. +func (m *IngestionDataSource) ValidateAll() error { + return m.validate(true) +} + +func (m *IngestionDataSource) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + if l := utf8.RuneCountInString(m.GetName()); l < 1 || l > 255 { + err := IngestionDataSourceValidationError{ + field: "Name", + reason: "value length must be between 1 and 255 runes, inclusive", + } + if !all { + return err + } + errors = append(errors, err) + } + + if utf8.RuneCountInString(m.GetApiKey()) != 40 { + err := IngestionDataSourceValidationError{ + field: "ApiKey", + reason: "value length must be 40 runes", + } + if !all { + return err + } + errors = append(errors, err) + + } + + if len(errors) > 0 { + return IngestionDataSourceMultiError(errors) + } + + return nil +} + +// IngestionDataSourceMultiError is an error wrapping multiple validation +// errors returned by IngestionDataSource.ValidateAll() if the designated +// constraints aren't met. +type IngestionDataSourceMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m IngestionDataSourceMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m IngestionDataSourceMultiError) AllErrors() []error { return m } + +// IngestionDataSourceValidationError is the validation error returned by +// IngestionDataSource.Validate if the designated constraints aren't met. +type IngestionDataSourceValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e IngestionDataSourceValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e IngestionDataSourceValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e IngestionDataSourceValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e IngestionDataSourceValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e IngestionDataSourceValidationError) ErrorName() string { + return "IngestionDataSourceValidationError" +} + +// Error satisfies the builtin error interface +func (e IngestionDataSourceValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sIngestionDataSource.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = IngestionDataSourceValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = IngestionDataSourceValidationError{} + +// Validate checks the field values on RetrieveIngestionDataSourcesRequest with +// the rules defined in the proto definition for this message. If any rules +// are violated, the first error encountered is returned, or nil if there are +// no violations. +func (m *RetrieveIngestionDataSourcesRequest) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on RetrieveIngestionDataSourcesRequest +// with the rules defined in the proto definition for this message. If any +// rules are violated, the result is a list of violation errors wrapped in +// RetrieveIngestionDataSourcesRequestMultiError, or nil if none found. +func (m *RetrieveIngestionDataSourcesRequest) ValidateAll() error { + return m.validate(true) +} + +func (m *RetrieveIngestionDataSourcesRequest) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + if l := utf8.RuneCountInString(m.GetName()); l < 1 || l > 255 { + err := RetrieveIngestionDataSourcesRequestValidationError{ + field: "Name", + reason: "value length must be between 1 and 255 runes, inclusive", + } + if !all { + return err + } + errors = append(errors, err) + } + + if l := utf8.RuneCountInString(m.GetSdkName()); l < 1 || l > 255 { + err := RetrieveIngestionDataSourcesRequestValidationError{ + field: "SdkName", + reason: "value length must be between 1 and 255 runes, inclusive", + } + if !all { + return err + } + errors = append(errors, err) + } + + if !_RetrieveIngestionDataSourcesRequest_SdkVersion_Pattern.MatchString(m.GetSdkVersion()) { + err := RetrieveIngestionDataSourcesRequestValidationError{ + field: "SdkVersion", + reason: "value does not match regex pattern \"^(\\\\d)+\\\\.(\\\\d)+\\\\.(\\\\d)+$\"", + } + if !all { + return err + } + errors = append(errors, err) + } + + if len(errors) > 0 { + return RetrieveIngestionDataSourcesRequestMultiError(errors) + } + + return nil +} + +// RetrieveIngestionDataSourcesRequestMultiError is an error wrapping multiple +// validation errors returned by +// RetrieveIngestionDataSourcesRequest.ValidateAll() if the designated +// constraints aren't met. +type RetrieveIngestionDataSourcesRequestMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m RetrieveIngestionDataSourcesRequestMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m RetrieveIngestionDataSourcesRequestMultiError) AllErrors() []error { return m } + +// RetrieveIngestionDataSourcesRequestValidationError is the validation error +// returned by RetrieveIngestionDataSourcesRequest.Validate if the designated +// constraints aren't met. +type RetrieveIngestionDataSourcesRequestValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e RetrieveIngestionDataSourcesRequestValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e RetrieveIngestionDataSourcesRequestValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e RetrieveIngestionDataSourcesRequestValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e RetrieveIngestionDataSourcesRequestValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e RetrieveIngestionDataSourcesRequestValidationError) ErrorName() string { + return "RetrieveIngestionDataSourcesRequestValidationError" +} + +// Error satisfies the builtin error interface +func (e RetrieveIngestionDataSourcesRequestValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sRetrieveIngestionDataSourcesRequest.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = RetrieveIngestionDataSourcesRequestValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = RetrieveIngestionDataSourcesRequestValidationError{} + +var _RetrieveIngestionDataSourcesRequest_SdkVersion_Pattern = regexp.MustCompile("^(\\d)+\\.(\\d)+\\.(\\d)+$") + +// Validate checks the field values on RetrieveIngestionDataSourcesResponse +// with the rules defined in the proto definition for this message. If any +// rules are violated, the first error encountered is returned, or nil if +// there are no violations. +func (m *RetrieveIngestionDataSourcesResponse) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on RetrieveIngestionDataSourcesResponse +// with the rules defined in the proto definition for this message. If any +// rules are violated, the result is a list of violation errors wrapped in +// RetrieveIngestionDataSourcesResponseMultiError, or nil if none found. +func (m *RetrieveIngestionDataSourcesResponse) ValidateAll() error { + return m.validate(true) +} + +func (m *RetrieveIngestionDataSourcesResponse) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + for idx, item := range m.GetIngestionDataSources() { + _, _ = idx, item + + if all { + switch v := interface{}(item).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, RetrieveIngestionDataSourcesResponseValidationError{ + field: fmt.Sprintf("IngestionDataSources[%v]", idx), + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, RetrieveIngestionDataSourcesResponseValidationError{ + field: fmt.Sprintf("IngestionDataSources[%v]", idx), + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return RetrieveIngestionDataSourcesResponseValidationError{ + field: fmt.Sprintf("IngestionDataSources[%v]", idx), + reason: "embedded message failed validation", + cause: err, + } + } + } + + } + + if len(errors) > 0 { + return RetrieveIngestionDataSourcesResponseMultiError(errors) + } + + return nil +} + +// RetrieveIngestionDataSourcesResponseMultiError is an error wrapping multiple +// validation errors returned by +// RetrieveIngestionDataSourcesResponse.ValidateAll() if the designated +// constraints aren't met. +type RetrieveIngestionDataSourcesResponseMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m RetrieveIngestionDataSourcesResponseMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m RetrieveIngestionDataSourcesResponseMultiError) AllErrors() []error { return m } + +// RetrieveIngestionDataSourcesResponseValidationError is the validation error +// returned by RetrieveIngestionDataSourcesResponse.Validate if the designated +// constraints aren't met. +type RetrieveIngestionDataSourcesResponseValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e RetrieveIngestionDataSourcesResponseValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e RetrieveIngestionDataSourcesResponseValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e RetrieveIngestionDataSourcesResponseValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e RetrieveIngestionDataSourcesResponseValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e RetrieveIngestionDataSourcesResponseValidationError) ErrorName() string { + return "RetrieveIngestionDataSourcesResponseValidationError" +} + +// Error satisfies the builtin error interface +func (e RetrieveIngestionDataSourcesResponseValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sRetrieveIngestionDataSourcesResponse.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = RetrieveIngestionDataSourcesResponseValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = RetrieveIngestionDataSourcesResponseValidationError{} diff --git a/diode-server/reconciler/v1/reconcilerpb/reconciler_grpc.pb.go b/diode-server/reconciler/v1/reconcilerpb/reconciler_grpc.pb.go new file mode 100644 index 00000000..aa50e76a --- /dev/null +++ b/diode-server/reconciler/v1/reconcilerpb/reconciler_grpc.pb.go @@ -0,0 +1,111 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc (unknown) +// source: reconciler/v1/reconciler.proto + +package reconcilerpb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + ReconcilerService_RetrieveIngestionDataSources_FullMethodName = "/reconciler.v1.ReconcilerService/RetrieveIngestionDataSources" +) + +// ReconcilerServiceClient is the client API for ReconcilerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ReconcilerServiceClient interface { + // Retrieves ingestion data sources + RetrieveIngestionDataSources(ctx context.Context, in *RetrieveIngestionDataSourcesRequest, opts ...grpc.CallOption) (*RetrieveIngestionDataSourcesResponse, error) +} + +type reconcilerServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewReconcilerServiceClient(cc grpc.ClientConnInterface) ReconcilerServiceClient { + return &reconcilerServiceClient{cc} +} + +func (c *reconcilerServiceClient) RetrieveIngestionDataSources(ctx context.Context, in *RetrieveIngestionDataSourcesRequest, opts ...grpc.CallOption) (*RetrieveIngestionDataSourcesResponse, error) { + out := new(RetrieveIngestionDataSourcesResponse) + err := c.cc.Invoke(ctx, ReconcilerService_RetrieveIngestionDataSources_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ReconcilerServiceServer is the server API for ReconcilerService service. +// All implementations must embed UnimplementedReconcilerServiceServer +// for forward compatibility +type ReconcilerServiceServer interface { + // Retrieves ingestion data sources + RetrieveIngestionDataSources(context.Context, *RetrieveIngestionDataSourcesRequest) (*RetrieveIngestionDataSourcesResponse, error) + mustEmbedUnimplementedReconcilerServiceServer() +} + +// UnimplementedReconcilerServiceServer must be embedded to have forward compatible implementations. +type UnimplementedReconcilerServiceServer struct { +} + +func (UnimplementedReconcilerServiceServer) RetrieveIngestionDataSources(context.Context, *RetrieveIngestionDataSourcesRequest) (*RetrieveIngestionDataSourcesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RetrieveIngestionDataSources not implemented") +} +func (UnimplementedReconcilerServiceServer) mustEmbedUnimplementedReconcilerServiceServer() {} + +// UnsafeReconcilerServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ReconcilerServiceServer will +// result in compilation errors. +type UnsafeReconcilerServiceServer interface { + mustEmbedUnimplementedReconcilerServiceServer() +} + +func RegisterReconcilerServiceServer(s grpc.ServiceRegistrar, srv ReconcilerServiceServer) { + s.RegisterService(&ReconcilerService_ServiceDesc, srv) +} + +func _ReconcilerService_RetrieveIngestionDataSources_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RetrieveIngestionDataSourcesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReconcilerServiceServer).RetrieveIngestionDataSources(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ReconcilerService_RetrieveIngestionDataSources_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReconcilerServiceServer).RetrieveIngestionDataSources(ctx, req.(*RetrieveIngestionDataSourcesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// ReconcilerService_ServiceDesc is the grpc.ServiceDesc for ReconcilerService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ReconcilerService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "reconciler.v1.ReconcilerService", + HandlerType: (*ReconcilerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RetrieveIngestionDataSources", + Handler: _ReconcilerService_RetrieveIngestionDataSources_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "reconciler/v1/reconciler.proto", +}