Skip to content

Commit

Permalink
Merge pull request #43 from netboxlabs/feat-OBS-390-retrieve-ingestio…
Browse files Browse the repository at this point in the history
…n-data-sources

feat: OBS-390 - [reconciler] implement ReconcilerService.RetrieveIngestionDataSources
  • Loading branch information
mfiedorowicz authored Feb 21, 2024
2 parents 6518b90 + c37aafa commit 7e8117e
Show file tree
Hide file tree
Showing 19 changed files with 1,313 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Command(BaseCommand):

diode_to_netbox_username = "DIODE_TO_NETBOX"
netbox_to_diode_username = "NETBOX_TO_DIODE"
datasource_to_diode_username = "DATASOURCE_TO_DIODE"
ingestion_username = "INGESTION"

def add_arguments(self, parser):
"""Add command arguments."""
Expand All @@ -43,10 +43,10 @@ def add_arguments(self, parser):
help="NetBox Diode plugin to Diode API key"
)
parser.add_argument(
"--datasource-to-diode-api-key",
dest="datasource_to_diode_api_key",
"--ingestion-api-key",
dest="ingestion_api_key",
required=True,
help="Datasource to Diode API key"
help="Ingestion API key"
)

def handle(self, *args, **options):
Expand All @@ -57,6 +57,6 @@ def handle(self, *args, **options):

_create_user_with_token(self.diode_to_netbox_username, options['diode_to_netbox_api_key'], group)
_create_user_with_token(self.netbox_to_diode_username, options['netbox_to_diode_api_key'], group, True)
_create_user_with_token(self.datasource_to_diode_username, options['datasource_to_diode_api_key'], group)
_create_user_with_token(self.ingestion_username, options['ingestion_api_key'], group)

self.stdout.write("Finished.")
102 changes: 78 additions & 24 deletions diode-server/distributor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,46 @@ package distributor

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"time"

"github.com/kelseyhightower/envconfig"
pb "github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-sdk-go/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/reconciler"
"github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
)

const (
// DefaultRequestStream is the default stream to use when none is provided
DefaultRequestStream = "latest"

streamID = "diode.v1.ingest-stream"
)

var (
errMetadataNotFound = errors.New("no request metadata found")

// ErrUnauthorized is an error for unauthorized requests
ErrUnauthorized = errors.New("missing or invalid authorization header")
)

// Component is a gRPC server that handles data ingestion requests
type Component struct {
pb.UnimplementedDistributorServiceServer

ctx context.Context
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
diodepb.UnimplementedDistributorServiceServer

ctx context.Context
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
reconcilerClient reconciler.Client
ingestionDataSources []*reconcilerpb.IngestionDataSource
}

// New creates a new distributor component
Expand All @@ -54,21 +64,52 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) {
return nil, fmt.Errorf("failed connection to %s: %v", redisClient.String(), err)
}

grpcServer := grpc.NewServer()
reconcilerClient, err := reconciler.NewClient(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to create reconciler client: %v", err)
}

dataSources, err := reconcilerClient.RetrieveIngestionDataSources(ctx, &reconcilerpb.RetrieveIngestionDataSourcesRequest{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve ingestion data sources: %v", err)
}

ingestionDataSources := dataSources.GetIngestionDataSources()
//auth := grpc.UnaryServerInterceptor(authUnaryInterceptor)

auth := newAuthUnaryInterceptor(ingestionDataSources)

grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(auth))
component := &Component{
ctx: ctx,
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
redisClient: redisClient,
}
pb.RegisterDistributorServiceServer(grpcServer, component)
ctx: ctx,
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
redisClient: redisClient,
reconcilerClient: reconcilerClient,
ingestionDataSources: ingestionDataSources,
}
diodepb.RegisterDistributorServiceServer(grpcServer, component)
reflection.Register(grpcServer)

return component, nil
}

func newAuthUnaryInterceptor(dataSources []*reconcilerpb.IngestionDataSource) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMetadataNotFound
}
if !authorized(dataSources, md["diode-api-key"]) {
return nil, ErrUnauthorized
}
return handler(ctx, req)
}

}

// Name returns the name of the component
func (c *Component) Name() string {
return "distributor"
Expand All @@ -88,7 +129,7 @@ func (c *Component) Stop() error {
}

// Push handles a push request
func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
func (c *Component) Push(ctx context.Context, in *diodepb.PushRequest) (*diodepb.PushResponse, error) {
if err := validatePushRequest(in); err != nil {
return nil, err
}
Expand Down Expand Up @@ -119,10 +160,10 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo
c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg)
}

return &pb.PushResponse{Errors: errs}, nil
return &diodepb.PushResponse{Errors: errs}, nil
}

func validatePushRequest(in *pb.PushRequest) error {
func validatePushRequest(in *diodepb.PushRequest) error {
if in.GetId() == "" {
return fmt.Errorf("id is empty")
}
Expand All @@ -149,3 +190,16 @@ func validatePushRequest(in *pb.PushRequest) error {

return nil
}

func authorized(dataSources []*reconcilerpb.IngestionDataSource, authorization []string) bool {
if len(dataSources) < 1 || len(authorization) != 1 {
return false
}

for _, v := range dataSources {
if v.GetApiKey() == authorization[0] {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions diode-server/docker/diode/env/distributor.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ API_KEY=CHANGE_.ME
REDIS_PASSWORD=@FmnLoA*VnebyVnZoL.!-.6z
REDIS_HOST=diode-redis
REDIS_PORT=6379
RECONCILER_GRPC_HOST=diode-reconciler
RECONCILER_GRPC_PORT=8081
4 changes: 2 additions & 2 deletions diode-server/docker/diode/env/reconciler.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ REDIS_PASSWORD=@FmnLoA*VnebyVnZoL.!-.6z
REDIS_HOST=diode-redis
REDIS_PORT=6379
DIODE_TO_NETBOX_API_KEY=1368dbad13e418d5a443d93cf255edde03a2a754
NETBOX_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42
DATASOURCE_TO_DIODE_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433
NETBOX_TO_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42
INGESTION_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433
NETBOX_API_URL=http://netbox:8000/api
2 changes: 1 addition & 1 deletion diode-server/docker/docker-compose.netbox.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:
- configurediodeplugin
- --diode-to-netbox-api-key=${DIODE_TO_NETBOX_API_KEY}
- --netbox-to-diode-api-key=${NETBOX_TO_DIODE_API_KEY}
- --datasource-to-diode-api-key=${DATASOURCE_TO_DIODE_API_KEY}
- --ingestion-api-key=${INGESTION_API_KEY}
# postgres
netbox-postgres:
image: docker.io/postgres:16-alpine
Expand Down
1 change: 1 addition & 0 deletions diode-server/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- "8081:8081"
depends_on:
- diode-redis
- diode-reconciler
diode-ingester:
image: netboxlabs/diode-ingester:${DIODE_VERSION}-${COMMIT_SHA}
env_file: diode/env/ingester.env
Expand Down
4 changes: 2 additions & 2 deletions diode-server/docker/netbox/env/netbox.env
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ SKIP_SUPERUSER=true
WEBHOOKS_ENABLED=true
RELOAD_NETBOX_ON_DIODE_PLUGIN_CHANGE=true
DIODE_TO_NETBOX_API_KEY=1368dbad13e418d5a443d93cf255edde03a2a754
NETBOX_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42
DATASOURCE_TO_DIODE_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433
NETBOX_TO_DIODE_API_KEY=1e99338b8cab5fc637bc55f390bda1446f619c42
INGESTION_API_KEY=5a52c45ee8231156cb620d193b0291912dd15433
11 changes: 11 additions & 0 deletions diode-server/proto/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: v1
plugins:
- plugin: go
out: ../
opt: module=github.com/netboxlabs/diode/diode-server
- plugin: buf.build/grpc/go:v1.3.0
out: ../
opt: module=github.com/netboxlabs/diode/diode-server
- plugin: buf.build/bufbuild/validate-go:v1.0.4
out: ../
opt: module=github.com/netboxlabs/diode/diode-server
13 changes: 13 additions & 0 deletions diode-server/proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: envoyproxy
repository: protoc-gen-validate
commit: 71881f09a0c5420a9545a07987a86728
digest: shake256:d320bbf06653b1b2b45a1f95bfa82bf7b998221a777a042708c50d6f86a30d1a85b50c5704c597142d9b308280efe1295d39d76d1abea5f7046d3df4c8cc3cef
- remote: buf.build
owner: googleapis
repository: googleapis
commit: 7e6f6e774e29406da95bd61cdcdbc8bc
digest: shake256:fe43dd2265ea0c07d76bd925eeba612667cf4c948d2ce53d6e367e1b4b3cb5fa69a51e6acb1a6a50d32f894f054a35e6c0406f6808a483f2752e10c866ffbf73
13 changes: 13 additions & 0 deletions diode-server/proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: v1
deps:
- buf.build/googleapis/googleapis
- buf.build/envoyproxy/protoc-gen-validate
breaking:
use:
- FILE
lint:
use:
- DEFAULT
except:
- RPC_REQUEST_STANDARD_NAME
- RPC_RESPONSE_STANDARD_NAME
44 changes: 44 additions & 0 deletions diode-server/proto/reconciler/v1/reconciler.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
syntax = "proto3";

package reconciler.v1;

import "google/protobuf/timestamp.proto";
import "validate/validate.proto";

option go_package = "github.com/netboxlabs/diode/diode-server/reconciler/v1/reconcilerpb";

// An ingestion data source
message IngestionDataSource {
string name = 1 [(validate.rules).string = {
min_len: 1,
max_len: 255
}];
string api_key = 2 [(validate.rules).string = {
min_len: 40,
max_len: 40
}];
}

// The request to retrieve ingestion data sources
message RetrieveIngestionDataSourcesRequest {
string name = 1 [(validate.rules).string = {
min_len: 1,
max_len: 255
}];
string sdk_name = 2 [(validate.rules).string = {
min_len: 1,
max_len: 255
}];
string sdk_version = 3 [(validate.rules).string = {pattern: "^(\\d)+\\.(\\d)+\\.(\\d)+$"}];
}

// The response from the retrieve ingestion data sources request
message RetrieveIngestionDataSourcesResponse {
repeated IngestionDataSource ingestion_data_sources = 1;
}

// Reconciler service API
service ReconcilerService {
// Retrieves ingestion data sources
rpc RetrieveIngestionDataSources(RetrieveIngestionDataSourcesRequest) returns (RetrieveIngestionDataSourcesResponse) {}
}
6 changes: 3 additions & 3 deletions diode-server/reconciler/api_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func (ak APIKeys) MarshalBinary() ([]byte, error) {

func loadAPIKeys(ctx context.Context, cfg Config, rc *redis.Client) (APIKeys, error) {
apiKeys := map[string]string{
"DIODE_TO_NETBOX": cfg.DiodeToNetBoxAPIKey,
"NETBOX_TO_DIODE": cfg.NetBoxToDiodeAPIKey,
"DATASOURCE_TO_DIODE": cfg.DatasourceToDiodeAPIKey,
"DIODE_TO_NETBOX": cfg.DiodeToNetBoxAPIKey,
"NETBOX_TO_DIODE": cfg.NetBoxToDiodeAPIKey,
"INGESTION": cfg.IngestionAPIKey,
}

if err := rc.HSet(ctx, "diode.api_keys", apiKeys).Err(); err != nil {
Expand Down
Loading

0 comments on commit 7e8117e

Please sign in to comment.