Merge pull request #37 from kubescape/tests
Test improvements
matthyx authored Jan 18, 2024
2 parents cf3d5e3 + 8f96091 commit 6887cb6
Showing 8 changed files with 1,030 additions and 205 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/pr-created.yaml
@@ -20,3 +20,18 @@ jobs:
      GO_VERSION: "1.21"
      CGO_ENABLED: 0
    secrets: inherit

  component-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Go 1.21
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"
        env: |
          CGO_ENABLED=0
      - name: Run component tests
        run: |
          echo "machine github.com login git password ${{ secrets.INGESTERS_READ_TOKEN }}" > ~/.netrc
          go test --timeout=20m --tags=integration ./...
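
The new job writes a ~/.netrc entry so that git (and therefore `go`) can authenticate against private GitHub modules, then runs the suite with a 20-minute timeout and the `integration` build tag. For context, `--tags=integration` only pulls in test files that declare a matching build constraint; the client_test.go added below carries no such constraint and always compiles, so the flag mainly enables integration-gated tests elsewhere in the module. A minimal sketch of such a gated file (illustrative only, not part of this commit):

//go:build integration

package example_test

import "testing"

// TestIntegrationOnly compiles and runs only when `go test` is invoked with
// --tags=integration, as in the workflow step above.
func TestIntegrationOnly(t *testing.T) {
	t.Log("integration build tag is active")
}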
79 changes: 41 additions & 38 deletions adapters/incluster/v1/client.go
@@ -82,44 +82,8 @@ func (c *Client) Start(ctx context.Context) error {
	}
	// begin watch
	eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL)
	go func() {
		if err := backoff.RetryNotify(func() error {
			watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
			if err != nil {
				return fmt.Errorf("client resource: %w", err)
			}
			logger.L().Info("starting watch", helpers.String("resource", c.res.Resource))
			for {
				event, chanActive := <-watcher.ResultChan()
				// set resource version to resume watch from
				// inspired by https://github.com/kubernetes/client-go/blob/5a0a4247921dd9e72d158aaa6c1ee124aba1da80/tools/watch/retrywatcher.go#L157
				if metaObject, ok := event.Object.(resourceVersionGetter); ok {
					watchOpts.ResourceVersion = metaObject.GetResourceVersion()
				}
				if eventQueue.Closed() {
					watcher.Stop()
					return backoff.Permanent(errors.New("event queue closed"))
				}
				if !chanActive {
					// channel closed, retry
					return errWatchClosed
				}
				if event.Type == watch.Error {
					return fmt.Errorf("watch error: %s", event.Object)
				}
				eventQueue.Enqueue(event)
			}
		}, utils.NewBackOff(), func(err error, d time.Duration) {
			if !errors.Is(err, errWatchClosed) {
				logger.L().Ctx(ctx).Warning("watch", helpers.Error(err),
					helpers.String("resource", c.res.Resource),
					helpers.String("retry in", d.String()))
			}
		}); err != nil {
			logger.L().Ctx(ctx).Fatal("giving up watch", helpers.Error(err),
				helpers.String("resource", c.res.Resource))
		}
	}()
	go c.watchRetry(ctx, watchOpts, eventQueue)
	// process events
	for event := range eventQueue.ResultChan {
		// skip non-objects
		d, ok := event.Object.(*unstructured.Unstructured)
@@ -175,6 +139,45 @@ func (c *Client) Start(ctx context.Context) error {
	return nil
}

func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) {
	if err := backoff.RetryNotify(func() error {
		watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
		if err != nil {
			return fmt.Errorf("client resource: %w", err)
		}
		logger.L().Info("starting watch", helpers.String("resource", c.res.Resource))
		for {
			event, chanActive := <-watcher.ResultChan()
			// set resource version to resume watch from
			// inspired by https://github.com/kubernetes/client-go/blob/5a0a4247921dd9e72d158aaa6c1ee124aba1da80/tools/watch/retrywatcher.go#L157
			if metaObject, ok := event.Object.(resourceVersionGetter); ok {
				watchOpts.ResourceVersion = metaObject.GetResourceVersion()
			}
			if eventQueue.Closed() {
				watcher.Stop()
				return backoff.Permanent(errors.New("event queue closed"))
			}
			if !chanActive {
				// channel closed, retry
				return errWatchClosed
			}
			if event.Type == watch.Error {
				return fmt.Errorf("watch error: %s", event.Object)
			}
			eventQueue.Enqueue(event)
		}
	}, utils.NewBackOff(), func(err error, d time.Duration) {
		if !errors.Is(err, errWatchClosed) {
			logger.L().Ctx(ctx).Warning("watch", helpers.Error(err),
				helpers.String("resource", c.res.Resource),
				helpers.String("retry in", d.String()))
		}
	}); err != nil {
		logger.L().Ctx(ctx).Fatal("giving up watch", helpers.Error(err),
			helpers.String("resource", c.res.Resource))
	}
}

// hasParent returns true if workload has a parent
// based on https://github.com/kubescape/k8s-interface/blob/2855cc94bd7666b227ad9e5db5ca25cb895e6cee/k8sinterface/k8sdynamic.go#L219
func hasParent(workload *unstructured.Unstructured) bool {
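The refactor above moves the retry loop out of Start into a named watchRetry method without changing its logic, which is what makes the new unit test below possible. Two helpers it relies on are not part of this diff; a rough sketch of what they presumably look like, assuming the backoff package is github.com/cenkalti/backoff/v4 (which matches the RetryNotify and Permanent calls), is shown here; the real declarations live in client.go and in the utils package and may differ:

package sketch // illustrative only; in the repository these live in separate packages

import (
	"errors"

	"github.com/cenkalti/backoff/v4"
)

// errWatchClosed is the sentinel returned when the watch channel closes and a
// retry is wanted (assumption: the actual message may differ).
var errWatchClosed = errors.New("watch channel closed")

// resourceVersionGetter presumably mirrors client-go's RetryWatcher interface:
// any object that can report the resourceVersion it was last seen at.
type resourceVersionGetter interface {
	GetResourceVersion() string
}

// NewBackOff presumably returns an exponential backoff with no overall
// deadline, so the watch keeps reconnecting until backoff.Permanent is
// returned (assumption: the actual tuning in utils may differ).
func NewBackOff() backoff.BackOff {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // 0 disables the elapsed-time cutoff
	return b
}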
125 changes: 125 additions & 0 deletions adapters/incluster/v1/client_test.go
@@ -0,0 +1,125 @@
package incluster

import (
	"context"
	"github.com/kubescape/synchronizer/domain"
	"github.com/kubescape/synchronizer/utils"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/k3s"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/ptr"
	"testing"
	"time"
)

var (
	deploy = &appsv1.Deployment{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps/v1",
			Kind:       "Deployment",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
		},
		Spec: appsv1.DeploymentSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "test"},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"app": "test"},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
				},
			},
		},
	}
)

func TestClient_watchRetry(t *testing.T) {
	type fields struct {
		client        dynamic.Interface
		account       string
		cluster       string
		kind          *domain.Kind
		callbacks     domain.Callbacks
		res           schema.GroupVersionResource
		ShadowObjects map[string][]byte
		Strategy      domain.Strategy
	}
	type args struct {
		ctx        context.Context
		watchOpts  metav1.ListOptions
		eventQueue *utils.CooldownQueue
	}
	// we need a real client to test this, as the fake client ignores opts
	ctx := context.TODO()
	k3sC, err := k3s.RunContainer(ctx,
		testcontainers.WithImage("docker.io/rancher/k3s:v1.27.9-k3s1"),
	)
	defer func() {
		_ = k3sC.Terminate(ctx)
	}()
	require.NoError(t, err)
	kubeConfigYaml, err := k3sC.GetKubeConfig(ctx)
	require.NoError(t, err)
	clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeConfigYaml)
	require.NoError(t, err)
	dynamicClient := dynamic.NewForConfigOrDie(clusterConfig)
	k8sclient := kubernetes.NewForConfigOrDie(clusterConfig)
	tests := []struct {
		name   string
		fields fields
		args   args
	}{
		{
			name: "test reconnect after timeout",
			fields: fields{
				client: dynamicClient,
				res:    schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
			},
			args: args{
				eventQueue: utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL),
				watchOpts:  metav1.ListOptions{TimeoutSeconds: ptr.To(int64(1))},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c := &Client{
				client:        tt.fields.client,
				account:       tt.fields.account,
				cluster:       tt.fields.cluster,
				kind:          tt.fields.kind,
				callbacks:     tt.fields.callbacks,
				res:           tt.fields.res,
				ShadowObjects: tt.fields.ShadowObjects,
				Strategy:      tt.fields.Strategy,
			}
			go c.watchRetry(ctx, tt.args.watchOpts, tt.args.eventQueue)
			time.Sleep(5 * time.Second)
			_, err = k8sclient.AppsV1().Deployments("default").Create(context.TODO(), deploy, metav1.CreateOptions{})
			require.NoError(t, err)
			time.Sleep(1 * time.Second)
			var found bool
			for event := range tt.args.eventQueue.ResultChan {
				if event.Type == watch.Added && event.Object.(*unstructured.Unstructured).GetName() == "test" {
					found = true
					break
				}
			}
			assert.True(t, found)
		})
	}
}
2 changes: 1 addition & 1 deletion config/config_test.go
@@ -72,7 +72,7 @@ func TestLoadConfig(t *testing.T) {
				Resources: []Resource{
					{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
					{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
					{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
					{Group: "spdx.softwarecomposition.kubescape.io", Version: "v1beta1", Resource: "applicationprofiles", Strategy: "patch"},
				},
			},
		},
6 changes: 3 additions & 3 deletions configuration/client/config.json
@@ -18,9 +18,9 @@
      "strategy": "patch"
    },
    {
      "group": "",
      "version": "v1",
      "resource": "pods",
      "group": "spdx.softwarecomposition.kubescape.io",
      "version": "v1beta1",
      "resource": "applicationprofiles",
      "strategy": "patch"
    }
  ]