Skip to content

Commit

Permalink
Merge pull request #94 from kubescape/checksum
Browse files Browse the repository at this point in the history
try to use checksum annotation before calculating it
  • Loading branch information
matthyx authored Nov 20, 2024
2 parents 928e518 + a800c18 commit 32f88f0
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 609 deletions.
3 changes: 2 additions & 1 deletion adapters/backend/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
storageutils "github.com/kubescape/storage/pkg/utils"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/messaging"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (c *Client) sendServerConnectedMessage(ctx context.Context) error {

func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, object []byte) error {
// calculate checksum
checksum, err := utils.CanonicalHash(object)
checksum, err := storageutils.CanonicalHash(object)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand Down
138 changes: 42 additions & 96 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/kubescape/go-logger/helpers"
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
spdxv1beta1 "github.com/kubescape/storage/pkg/generated/clientset/versioned/typed/softwarecomposition/v1beta1"
storageutils "github.com/kubescape/storage/pkg/utils"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/domain"
Expand All @@ -33,7 +34,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)
Expand Down Expand Up @@ -67,7 +67,6 @@ type Client struct {
includeNamespaces []string
operatorNamespace string // the namespace where the kubescape operator is running
kind *domain.Kind
multiplier int
callbacks domain.Callbacks
res schema.GroupVersionResource
ShadowObjects map[string][]byte
Expand All @@ -81,6 +80,9 @@ func NewClient(dynamicClient dynamic.Interface, storageClient spdxv1beta1.SpdxV1
res := schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Resource}
// get event multiplier from env, defaults to 0
multiplier, _ := strconv.Atoi(os.Getenv(envMultiplier))
if multiplier > 0 {
logger.L().Warning("event multiplier config detected, but it is deprecated", helpers.String("resource", res.String()), helpers.Int("multiplier", multiplier))
}
return &Client{
account: cfg.Account,
dynamicClient: dynamicClient,
Expand All @@ -94,7 +96,6 @@ func NewClient(dynamicClient dynamic.Interface, storageClient spdxv1beta1.SpdxV1
Version: res.Version,
Resource: res.Resource,
},
multiplier: multiplier,
res: res,
ShadowObjects: map[string][]byte{},
Strategy: r.Strategy,
Expand Down Expand Up @@ -151,12 +152,12 @@ func (c *Client) Start(ctx context.Context) error {
switch {
case event.Type == watch.Added:
logger.L().Debug("added resource", helpers.String("id", id.String()))
newObject, err := c.getObjectFromMeta(d)
checksum, err := c.getChecksum(d)
if err != nil {
logger.L().Ctx(ctx).Error("cannot get object", helpers.Error(err), helpers.String("id", id.String()))
logger.L().Ctx(ctx).Error("cannot get checksum", helpers.Error(err), helpers.String("id", id.String()))
continue
}
err = c.callVerifyObject(ctx, id, newObject)
err = c.callbacks.VerifyObject(ctx, id, checksum)
if err != nil {
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
}
Expand Down Expand Up @@ -224,11 +225,7 @@ func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, e
if event.Type == watch.Error {
return fmt.Errorf("watch error: %s", event.Object)
}
if c.multiplier > 0 {
multiplyEvent(event, eventQueue, c.multiplier)
} else {
eventQueue.Enqueue(event)
}
eventQueue.Enqueue(event)
}
}, utils.NewBackOff(), func(err error, d time.Duration) {
if !errors.Is(err, errWatchClosed) {
Expand All @@ -245,36 +242,6 @@ func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, e
}
}

// multiplyEvent enqueues `multiplier` deep copies of event, each made distinct
// by appending "-<index>" to the object's name, its UID, and — when present —
// its workload-name label and wlid annotation.
//
// The event object must implement metav1.Object; the type assertion panics
// otherwise.
func multiplyEvent(event watch.Event, queue *utils.CooldownQueue, multiplier int) {
	source := event.Object.(metav1.Object)
	baseName, baseUID := source.GetName(), source.GetUID()

	for idx := 0; idx < multiplier; idx++ {
		// suffixed appends the index of the current copy to a value
		suffixed := func(value string) string {
			return fmt.Sprintf("%s-%d", value, idx)
		}

		clone := event.DeepCopy()
		meta := clone.Object.(metav1.Object)
		meta.SetName(suffixed(baseName))

		// change the workload-name label too - if applicable
		// (looking up a key in a nil map is safe and simply reports "not found")
		lbls := meta.GetLabels()
		if workload, found := lbls[helpersv1.NameMetadataKey]; found {
			lbls[helpersv1.NameMetadataKey] = suffixed(workload)
			meta.SetLabels(lbls)
		}

		// same treatment for the wlid annotation
		annots := meta.GetAnnotations()
		if wlid, found := annots[helpersv1.WlidMetadataKey]; found {
			annots[helpersv1.WlidMetadataKey] = suffixed(wlid)
			meta.SetAnnotations(annots)
		}

		// the UID must differ as well, otherwise the cooldown queue would
		// deduplicate the copies
		meta.SetUID(types.UID(suffixed(string(baseUID))))
		queue.Enqueue(*clone)
	}
}

// isFiltered returns true if workload should be filtered out.
// filters out workloads that have a parent, unless they are in the kubescape-operator namespace
func (c *Client) isFiltered(workload metav1.Object) bool {
Expand Down Expand Up @@ -344,7 +311,7 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
return fmt.Errorf("verifying patch: %w", err)
}
// calculate checksum
checksum, err := utils.CanonicalHash(mergeResult)
checksum, err := storageutils.CanonicalHash(mergeResult)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand All @@ -371,7 +338,7 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj

func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, object []byte) error {
// calculate checksum
checksum, err := utils.CanonicalHash(object)
checksum, err := storageutils.CanonicalHash(object)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand All @@ -382,6 +349,23 @@ func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, objec
return nil
}

// getChecksum returns the checksum to advertise for the object described by d.
//
// Fast path: when d carries a non-empty helpersv1.SyncChecksumMetadataKey
// annotation, that value is returned as-is. Otherwise the object is obtained
// through getObjectFromMeta and hashed with storageutils.CanonicalHash.
//
// It returns an error when the object cannot be obtained or hashed.
func (c *Client) getChecksum(d metav1.Object) (string, error) {
	// fast path, we have the checksum already
	// An empty annotation value is treated like a missing one: returning ""
	// as a checksum could never match a computed hash. Indexing a nil
	// annotations map is safe and yields "".
	if checksum := d.GetAnnotations()[helpersv1.SyncChecksumMetadataKey]; checksum != "" {
		return checksum, nil
	}
	// slow path, get the object and calculate the checksum
	object, err := c.getObjectFromMeta(d)
	if err != nil {
		return "", fmt.Errorf("get object: %w", err)
	}
	checksum, err := storageutils.CanonicalHash(object)
	if err != nil {
		return "", fmt.Errorf("calculate checksum: %w", err)
	}
	return checksum, nil
}

func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error {
if c.Strategy == domain.PatchStrategy {
// remove from known resources
Expand All @@ -392,7 +376,7 @@ func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error {
}

func (c *Client) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
obj, err := c.getResource(id.Namespace, id.Name, c.multiplier > 0)
obj, err := c.getResource(id.Namespace, id.Name)
if err != nil {
return fmt.Errorf("get resource: %w", err)
}
Expand All @@ -416,7 +400,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
if c.Strategy != domain.PatchStrategy {
return nil, fmt.Errorf("patch strategy not enabled for resource %s", id.Kind.String())
}
obj, err := c.getResource(id.Namespace, id.Name, c.multiplier > 0)
obj, err := c.getResource(id.Namespace, id.Name)
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
Expand All @@ -430,7 +414,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
return object, fmt.Errorf("apply patch: %w", err)
}
// verify checksum
newChecksum, err := utils.CanonicalHash(modified)
newChecksum, err := storageutils.CanonicalHash(modified)
if err != nil {
return object, fmt.Errorf("calculate checksum: %w", err)
}
Expand Down Expand Up @@ -507,15 +491,15 @@ func (c *Client) Batch(ctx context.Context, _ domain.Kind, batchType domain.Batc
}

func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, error) {
obj, err := c.getResource(id.Namespace, id.Name, c.multiplier > 0)
obj, err := c.getResource(id.Namespace, id.Name)
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
object, err := c.filterAndMarshal(obj)
if err != nil {
return nil, fmt.Errorf("marshal resource: %w", err)
}
checksum, err := utils.CanonicalHash(object)
checksum, err := storageutils.CanonicalHash(object)
if err != nil {
return object, fmt.Errorf("calculate checksum: %w", err)
}
Expand All @@ -540,24 +524,15 @@ func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error)
Namespace: d.GetNamespace(),
ResourceVersion: domain.ToResourceVersion(d.GetResourceVersion()),
}
obj, err := c.getResource(d.GetNamespace(), d.GetName(), false)
// get checksum
checksum, err := c.getChecksum(d)
if err != nil {
logger.L().Ctx(ctx).Error("cannot get object", helpers.Error(err), helpers.String("id", id.String()))
logger.L().Ctx(ctx).Error("cannot get checksums", helpers.Error(err), helpers.String("id", id.String()))
return nil
}
if c.multiplier > 0 {
c.multiplyVerifyObject(ctx, id, obj)
} else {
newObject, err := c.filterAndMarshal(obj)
if err != nil {
logger.L().Ctx(ctx).Error("cannot marshal object", helpers.Error(err), helpers.String("id", id.String()))
return nil
}
err = c.callVerifyObject(ctx, id, newObject)
if err != nil {
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
return nil
}
err = c.callbacks.VerifyObject(ctx, id, checksum)
if err != nil {
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
}
return nil
}); err != nil {
Expand All @@ -567,40 +542,14 @@ func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error)
return resourceVersion, nil
}

// multiplyVerifyObject calls callVerifyObject c.multiplier times for obj, each
// time under a distinct identity: the object name and, when present, the
// workload-name label are suffixed with "-<index>".
//
// obj is mutated in place. Failures to marshal or verify a copy are logged
// and do not stop the remaining copies from being processed.
func (c *Client) multiplyVerifyObject(ctx context.Context, id domain.KindName, obj metav1.Object) {
	objectName := id.Name
	// Capture the original workload name once, before the loop: obj is
	// modified in place, so re-reading the label on every iteration would
	// stack the suffixes ("name-0-1-2") instead of yielding "name-<index>".
	workloadName, hasWorkloadName := obj.GetLabels()[helpersv1.NameMetadataKey]
	for i := 0; i < c.multiplier; i++ {
		id.Name = fmt.Sprintf("%s-%d", objectName, i)
		obj.SetName(id.Name)
		// change the workload-name label too - if applicable
		if hasWorkloadName {
			labels := obj.GetLabels()
			// only rewrite the label while it is still present on the object
			if _, ok := labels[helpersv1.NameMetadataKey]; ok {
				labels[helpersv1.NameMetadataKey] = fmt.Sprintf("%s-%d", workloadName, i)
				obj.SetLabels(labels)
			}
		}
		newObject, err := c.filterAndMarshal(obj)
		if err != nil {
			logger.L().Ctx(ctx).Error("cannot marshal object", helpers.Error(err), helpers.String("id", id.String()))
			continue
		}
		if err := c.callVerifyObject(ctx, id, newObject); err != nil {
			logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
		}
	}
}

func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) {
utils.RemoveManagedFields(d)
storageutils.RemoveManagedFields(d)
if un, ok := d.(*unstructured.Unstructured); ok {
fields, ok := fieldsToRemove[c.kind.String()]
if !ok {
fields = fieldsToRemove["default"]
}
if err := utils.RemoveSpecificFields(un, fields); err != nil {
if err := storageutils.RemoveSpecificFields(un, fields); err != nil {
return nil, fmt.Errorf("remove specific fields: %w", err)
}
} else {
Expand All @@ -616,7 +565,7 @@ func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) {

func (c *Client) getObjectFromMeta(d metav1.Object) ([]byte, error) {
if c.res.Group == kubescapeCustomResourceGroup {
obj, err := c.getResource(d.GetNamespace(), d.GetName(), c.multiplier > 0)
obj, err := c.getResource(d.GetNamespace(), d.GetName())
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
Expand Down Expand Up @@ -841,10 +790,7 @@ func (c *Client) chooseWatcher(opts metav1.ListOptions) (watch.Interface, error)
return c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), opts)
}

func (c *Client) getResource(namespace string, name string, strip bool) (metav1.Object, error) {
if strip {
name = stripSuffix(name)
}
func (c *Client) getResource(namespace string, name string) (metav1.Object, error) {
if c.storageClient != nil {
switch c.res.Resource {
case "applicationprofiles":
Expand Down
1 change: 0 additions & 1 deletion adapters/incluster/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func TestClient_filterAndMarshal(t *testing.T) {
account: tt.fields.account,
cluster: tt.fields.cluster,
kind: tt.fields.kind,
multiplier: tt.fields.multiplier,
callbacks: tt.fields.callbacks,
res: tt.fields.res,
ShadowObjects: tt.fields.ShadowObjects,
Expand Down
11 changes: 6 additions & 5 deletions adapters/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
storageutils "github.com/kubescape/storage/pkg/utils"
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/utils"
"go.uber.org/multierr"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (m *MockAdapter) GetObject(ctx context.Context, id domain.KindName, baseObj
return fmt.Errorf("verifying patch: %w", err)
}
// calculate checksum
checksum, err := utils.CanonicalHash(mergeResult)
checksum, err := storageutils.CanonicalHash(mergeResult)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand Down Expand Up @@ -152,7 +153,7 @@ func (m *MockAdapter) patchObject(id domain.KindName, checksum string, patch []b
if err != nil {
return object, fmt.Errorf("apply patch: %w", err)
}
newChecksum, err := utils.CanonicalHash(modified)
newChecksum, err := storageutils.CanonicalHash(modified)
if err != nil {
return object, fmt.Errorf("calculate checksum: %w", err)
}
Expand Down Expand Up @@ -214,7 +215,7 @@ func (m *MockAdapter) verifyObject(id domain.KindName, newChecksum string) ([]by
if !ok {
return nil, fmt.Errorf("object not found")
}
checksum, err := utils.CanonicalHash(object)
checksum, err := storageutils.CanonicalHash(object)
if err != nil {
return nil, fmt.Errorf("calculate checksum: %w", err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func (m *MockAdapter) TestCallPutOrPatch(ctx context.Context, id domain.KindName
return fmt.Errorf("verifying patch: %w", err)
}
// calculate checksum
checksum, err := utils.CanonicalHash(mergeResult)
checksum, err := storageutils.CanonicalHash(mergeResult)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand Down Expand Up @@ -295,7 +296,7 @@ func (m *MockAdapter) TestCallVerifyObject(ctx context.Context, id domain.KindNa
// store object locally - this is only for testing purposes
m.Resources[id.String()] = object
// calculate checksum
checksum, err := utils.CanonicalHash(object)
checksum, err := storageutils.CanonicalHash(object)
if err != nil {
return fmt.Errorf("calculate checksum: %w", err)
}
Expand Down
Loading

0 comments on commit 32f88f0

Please sign in to comment.