Skip to content

Commit

Permalink
feat: Add log ingest client (#153)
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Lock <[email protected]>
  • Loading branch information
Sambigeara committed Mar 7, 2024
1 parent b127b43 commit 13de0bb
Show file tree
Hide file tree
Showing 13 changed files with 712 additions and 168 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ generate-proto-code: proto-gen-deps
generate-mocks: $(MOCKERY)
@ $(MOCKERY) $(MOCK_QUIET) --srcpkg=./genpb/cerbos/cloud/apikey/v1/apikeyv1connect --name=ApiKeyServiceHandler
@ $(MOCKERY) $(MOCK_QUIET) --srcpkg=./genpb/cerbos/cloud/bundle/v1/bundlev1connect --name=CerbosBundleServiceHandler
@ $(MOCKERY) $(MOCK_QUIET) --srcpkg=./genpb/cerbos/cloud/logs/v1/logsv1connect --name=CerbosLogsServiceHandler

.PHONY: compile
compile:
Expand Down
2 changes: 1 addition & 1 deletion bundle/auth.go → base/auth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2021-2024 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0
package bundle
package base

import (
"context"
Expand Down
89 changes: 89 additions & 0 deletions base/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021-2024 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0
package base

import (
"encoding/json"
"fmt"
"net/http"

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/go-logr/logr"
"github.com/hashicorp/go-retryablehttp"
"golang.org/x/net/http2"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

type Client struct {
HTTPClient *http.Client
conf ClientConf
}

func NewClient(conf ClientConf) (c Client, opts []connect.ClientOption, _ error) {
otelConnect, err := otelconnect.NewInterceptor()
if err != nil {
return c, opts, fmt.Errorf("failed to create otel interceptor: %w", err)
}

opts = []connect.ClientOption{
connect.WithCompressMinBytes(1024),
connect.WithInterceptors(
otelConnect,
newUserAgentInterceptor(),
),
}

retryableHTTPClient := mkRetryableHTTPClient(conf)
authClient := newAuthClient(conf, retryableHTTPClient, opts...)

opts = append(opts, connect.WithInterceptors(newAuthInterceptor(authClient)))

return Client{
conf: conf,
HTTPClient: retryableHTTPClient,
}, opts, nil
}

func MkHTTPClient(conf ClientConf) *http.Client {
return &http.Client{
Transport: &http2.Transport{
TLSClientConfig: conf.TLS.Clone(),
},
}
}

func mkRetryableHTTPClient(conf ClientConf) *http.Client {
httpClient := retryablehttp.NewClient()
httpClient.HTTPClient = MkHTTPClient(conf)
httpClient.RetryMax = conf.RetryMaxAttempts
httpClient.RetryWaitMin = conf.RetryWaitMin
httpClient.RetryWaitMax = conf.RetryWaitMax
httpClient.Logger = logWrapper{Logger: conf.Logger.WithName("transport")}

return httpClient.StandardClient()
}

func LogResponsePayload(log logr.Logger, payload proto.Message) {
if lg := log.V(3); lg.Enabled() {
lg.Info("RPC response", "payload", ProtoWrapper{p: payload})
}
}

type ProtoWrapper struct {
p proto.Message
}

func NewProtoWrapper(msg proto.Message) ProtoWrapper {
return ProtoWrapper{p: msg}
}

func (pw ProtoWrapper) MarshalLog() any {
bytes, err := protojson.Marshal(pw.p)
if err != nil {
return fmt.Sprintf("error marshaling response: %v", err)
}

return json.RawMessage(bytes)
}
100 changes: 100 additions & 0 deletions base/client_conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2021-2024 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package base

import (
"crypto/tls"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
"go.uber.org/multierr"

"github.com/cerbos/cloud-api/credentials"
pdpv1 "github.com/cerbos/cloud-api/genpb/cerbos/cloud/pdp/v1"
)

var (
errEmptyAPIEndpoint = errors.New("api endpoint must be defined")
errEmptyBootstrapEndpoint = errors.New("bootstrap endpoint must be defined")
errHeartbeatIntervalTooShort = errors.New("heartbeat interval is too short")
errMissingCredentials = errors.New("missing credentials")
errMissingIdentifier = errors.New("missing PDP identifier")
)

const (
defaultHeartbeatInterval = 2 * time.Minute
defaultRetryWaitMin = 1 * time.Second //nolint:revive
defaultRetryWaitMax = 5 * time.Minute
defaultRetryMaxAttempts = 10
minHeartbeatInterval = 30 * time.Second
)

type ClientConf struct {
PDPIdentifier *pdpv1.Identifier
TLS *tls.Config
Logger logr.Logger
Credentials *credentials.Credentials
APIEndpoint string
BootstrapEndpoint string
RetryWaitMin time.Duration
RetryWaitMax time.Duration
RetryMaxAttempts int
HeartbeatInterval time.Duration
}

func (cc ClientConf) Validate() (outErr error) {
if cc.Credentials == nil {
outErr = multierr.Append(outErr, errMissingCredentials)
}

if cc.APIEndpoint == "" {
outErr = multierr.Append(outErr, errEmptyAPIEndpoint)
}

if cc.BootstrapEndpoint == "" {
outErr = multierr.Append(outErr, errEmptyBootstrapEndpoint)
}

if cc.PDPIdentifier == nil {
outErr = multierr.Append(outErr, errMissingIdentifier)
} else if err := Validate(cc.PDPIdentifier); err != nil {
outErr = multierr.Append(outErr, fmt.Errorf("invalid PDP identifier: %w", err))
}

if cc.HeartbeatInterval > 0 && cc.HeartbeatInterval < minHeartbeatInterval {
outErr = multierr.Append(outErr, errHeartbeatIntervalTooShort)
}

return outErr
}

func (cc *ClientConf) SetDefaults() {
if cc.RetryMaxAttempts == 0 {
cc.RetryMaxAttempts = defaultRetryMaxAttempts
}

if cc.RetryWaitMin == 0 {
cc.RetryWaitMin = defaultRetryWaitMin
}

if cc.RetryWaitMax == 0 {
cc.RetryWaitMax = defaultRetryWaitMax
}

if cc.HeartbeatInterval == 0 {
cc.HeartbeatInterval = defaultHeartbeatInterval
}
}

type logWrapper struct {
logr.Logger
}

func (lw logWrapper) Printf(msg string, args ...any) {
if log := lw.V(1); log.Enabled() {
log.Info(fmt.Sprintf(msg, args...))
}
}
2 changes: 1 addition & 1 deletion bundle/interceptors.go → base/interceptors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2021-2024 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package bundle
package base

import (
"context"
Expand Down
40 changes: 40 additions & 0 deletions base/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-2024 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package base

import (
"fmt"
"sync"

"github.com/bufbuild/protovalidate-go"
"google.golang.org/protobuf/proto"

bootstrapv1 "github.com/cerbos/cloud-api/genpb/cerbos/cloud/bootstrap/v1"
)

var (
validateFn func(proto.Message) error
validatorOnce sync.Once
)

func Validate[T proto.Message](obj T) error {
validatorOnce.Do(func() {
validator, err := protovalidate.New(
protovalidate.WithMessages(
&bootstrapv1.PDPConfig{},
),
)
if err != nil {
validateFn = func(_ proto.Message) error {
return fmt.Errorf("failed to initialize validator: %w", err)
}
} else {
validateFn = func(m proto.Message) error {
return validator.Validate(m)
}
}
})

return validateFn(obj)
}
Loading

0 comments on commit 13de0bb

Please sign in to comment.