Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test improvements #37

Merged
merged 6 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/pr-created.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,18 @@ jobs:
GO_VERSION: "1.21"
CGO_ENABLED: 0
secrets: inherit

component-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go 1.21
uses: actions/setup-go@v4
with:
go-version: "1.21"
env: |
CGO_ENABLED=0
- name: Run component tests
run: |
echo "machine github.com login git password ${{ secrets.INGESTERS_READ_TOKEN }}" > ~/.netrc
go test --timeout=20m --tags=integration ./...
79 changes: 41 additions & 38 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,44 +82,8 @@ func (c *Client) Start(ctx context.Context) error {
}
// begin watch
eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL)
go func() {
if err := backoff.RetryNotify(func() error {
watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
if err != nil {
return fmt.Errorf("client resource: %w", err)
}
logger.L().Info("starting watch", helpers.String("resource", c.res.Resource))
for {
event, chanActive := <-watcher.ResultChan()
// set resource version to resume watch from
// inspired by https://github.com/kubernetes/client-go/blob/5a0a4247921dd9e72d158aaa6c1ee124aba1da80/tools/watch/retrywatcher.go#L157
if metaObject, ok := event.Object.(resourceVersionGetter); ok {
watchOpts.ResourceVersion = metaObject.GetResourceVersion()
}
if eventQueue.Closed() {
watcher.Stop()
return backoff.Permanent(errors.New("event queue closed"))
}
if !chanActive {
// channel closed, retry
return errWatchClosed
}
if event.Type == watch.Error {
return fmt.Errorf("watch error: %s", event.Object)
}
eventQueue.Enqueue(event)
}
}, utils.NewBackOff(), func(err error, d time.Duration) {
if !errors.Is(err, errWatchClosed) {
logger.L().Ctx(ctx).Warning("watch", helpers.Error(err),
helpers.String("resource", c.res.Resource),
helpers.String("retry in", d.String()))
}
}); err != nil {
logger.L().Ctx(ctx).Fatal("giving up watch", helpers.Error(err),
helpers.String("resource", c.res.Resource))
}
}()
go c.watchRetry(ctx, watchOpts, eventQueue)
// process events
for event := range eventQueue.ResultChan {
// skip non-objects
d, ok := event.Object.(*unstructured.Unstructured)
Expand Down Expand Up @@ -175,6 +139,45 @@ func (c *Client) Start(ctx context.Context) error {
return nil
}

func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) {
if err := backoff.RetryNotify(func() error {
watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
if err != nil {
return fmt.Errorf("client resource: %w", err)
}
logger.L().Info("starting watch", helpers.String("resource", c.res.Resource))
for {
event, chanActive := <-watcher.ResultChan()
// set resource version to resume watch from
// inspired by https://github.com/kubernetes/client-go/blob/5a0a4247921dd9e72d158aaa6c1ee124aba1da80/tools/watch/retrywatcher.go#L157
if metaObject, ok := event.Object.(resourceVersionGetter); ok {
watchOpts.ResourceVersion = metaObject.GetResourceVersion()
}
if eventQueue.Closed() {
watcher.Stop()
return backoff.Permanent(errors.New("event queue closed"))
}
if !chanActive {
// channel closed, retry
return errWatchClosed
}
if event.Type == watch.Error {
return fmt.Errorf("watch error: %s", event.Object)
}
eventQueue.Enqueue(event)
}
}, utils.NewBackOff(), func(err error, d time.Duration) {
if !errors.Is(err, errWatchClosed) {
logger.L().Ctx(ctx).Warning("watch", helpers.Error(err),
helpers.String("resource", c.res.Resource),
helpers.String("retry in", d.String()))
}
}); err != nil {
logger.L().Ctx(ctx).Fatal("giving up watch", helpers.Error(err),
helpers.String("resource", c.res.Resource))
}
}

// hasParent returns true if workload has a parent
// based on https://github.com/kubescape/k8s-interface/blob/2855cc94bd7666b227ad9e5db5ca25cb895e6cee/k8sinterface/k8sdynamic.go#L219
func hasParent(workload *unstructured.Unstructured) bool {
Expand Down
125 changes: 125 additions & 0 deletions adapters/incluster/v1/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package incluster

import (
"context"
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/k3s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"
"testing"
"time"
)

var (
deploy = &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "test"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "test"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
},
},
},
}
)

func TestClient_watchRetry(t *testing.T) {
type fields struct {
client dynamic.Interface
account string
cluster string
kind *domain.Kind
callbacks domain.Callbacks
res schema.GroupVersionResource
ShadowObjects map[string][]byte
Strategy domain.Strategy
}
type args struct {
ctx context.Context
watchOpts metav1.ListOptions
eventQueue *utils.CooldownQueue
}
// we need a real client to test this, as the fake client ignores opts
ctx := context.TODO()
k3sC, err := k3s.RunContainer(ctx,
testcontainers.WithImage("docker.io/rancher/k3s:v1.27.9-k3s1"),
)
defer func() {
_ = k3sC.Terminate(ctx)
}()
require.NoError(t, err)
kubeConfigYaml, err := k3sC.GetKubeConfig(ctx)
require.NoError(t, err)
clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeConfigYaml)
require.NoError(t, err)
dynamicClient := dynamic.NewForConfigOrDie(clusterConfig)
k8sclient := kubernetes.NewForConfigOrDie(clusterConfig)
tests := []struct {
name string
fields fields
args args
}{
{
name: "test reconnect after timeout",
fields: fields{
client: dynamicClient,
res: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
},
args: args{
eventQueue: utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL),
watchOpts: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(1))},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
client: tt.fields.client,
account: tt.fields.account,
cluster: tt.fields.cluster,
kind: tt.fields.kind,
callbacks: tt.fields.callbacks,
res: tt.fields.res,
ShadowObjects: tt.fields.ShadowObjects,
Strategy: tt.fields.Strategy,
}
go c.watchRetry(ctx, tt.args.watchOpts, tt.args.eventQueue)
time.Sleep(5 * time.Second)
_, err = k8sclient.AppsV1().Deployments("default").Create(context.TODO(), deploy, metav1.CreateOptions{})
require.NoError(t, err)
time.Sleep(1 * time.Second)
var found bool
for event := range tt.args.eventQueue.ResultChan {
if event.Type == watch.Added && event.Object.(*unstructured.Unstructured).GetName() == "test" {
found = true
break
}
}
assert.True(t, found)
})
}
}
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestLoadConfig(t *testing.T) {
Resources: []Resource{
{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
{Group: "spdx.softwarecomposition.kubescape.io", Version: "v1beta1", Resource: "applicationprofiles", Strategy: "patch"},
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions configuration/client/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"strategy": "patch"
},
{
"group": "",
"version": "v1",
"resource": "pods",
"group": "spdx.softwarecomposition.kubescape.io",
"version": "v1beta1",
"resource": "applicationprofiles",
"strategy": "patch"
}
]
Expand Down
Loading
Loading