Skip to content

Commit

Permalink
chore(sendconfig) test DB mode errors
Browse files Browse the repository at this point in the history
Add minimal envtest scaffolding to test DB mode error event generation.

Fix a bug where the DB mode resource error array was accessed by value
instead of by reference (via its class instance's pointer).
  • Loading branch information
rainest committed Apr 17, 2024
1 parent 3f63fdb commit 68e0e3a
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 5 deletions.
6 changes: 3 additions & 3 deletions internal/dataplane/sendconfig/dbmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent ContentW

_, errs, _ := syncer.Solve(ctx, s.concurrency, false, false)
cancel()
s.resourceErrorLock.Lock()
defer s.resourceErrorLock.Unlock()
if errs != nil {
return deckutils.ErrArray{Errors: errs}, s.resourceErrors, nil, nil
}
Expand All @@ -110,9 +112,8 @@ func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent ContentW

// HandleEvents handles logging and error reporting for individual entity change events generated during a sync by
// looping over an event channel. It terminates when its context dies.
func (s UpdateStrategyDBMode) HandleEvents(ctx context.Context, events chan diff.EntityAction) {
func (s *UpdateStrategyDBMode) HandleEvents(ctx context.Context, events chan diff.EntityAction) {
s.resourceErrorLock.Lock()
count := 0
for {
select {
case event := <-events:
Expand All @@ -129,7 +130,6 @@ func (s UpdateStrategyDBMode) HandleEvents(ctx context.Context, events chan diff
s.logger.Error(err, "could not parse entity update error")
} else {
s.resourceErrors = append(s.resourceErrors, rerror)
count++
}
}
}
Expand Down
115 changes: 114 additions & 1 deletion test/envtest/configerrorevent_envtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ package envtest
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"testing"
"text/template"
"time"

"github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/generators"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v3/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane"
kongv1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1"
"github.com/kong/kubernetes-ingress-controller/v3/test"
"github.com/kong/kubernetes-ingress-controller/v3/test/mocks"
)

func TestConfigErrorEventGeneration(t *testing.T) {
func TestConfigErrorEventGenerationInMemoryMode(t *testing.T) {
// Can't be run in parallel because we're using t.Setenv() below which doesn't allow it.

const (
Expand Down Expand Up @@ -242,3 +249,109 @@ func formatErrBody(t *testing.T, namespace string, ingress *netv1.Ingress, servi

return b.Bytes()
}

func TestConfigErrorEventGenerationDBMode(t *testing.T) {
// Can't be run in parallel because we're using t.Setenv() below which doesn't allow it.

const (
waitTime = time.Minute
tickTime = 100 * time.Millisecond
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

scheme := Scheme(t, WithKong)
restConfig := Setup(t, scheme)
ctrlClientGlobal := NewControllerClient(t, scheme, restConfig)
ns := CreateNamespace(ctx, t, ctrlClientGlobal)
ctrlClient := client.NewNamespacedClient(ctrlClientGlobal, ns.Name)

ingressClassName := "kongenvtest"
deployIngressClass(ctx, t, ingressClassName, ctrlClient)

const podName = "kong-ingress-controller-tyjh1"
t.Setenv("POD_NAMESPACE", ns.Name)
t.Setenv("POD_NAME", podName)

t.Logf("creating a static consumer in %s namespace which will be used to test global validation", ns.Name)
consumer := &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "donenbai",
Annotations: map[string]string{
annotations.IngressClassKey: ingressClassName,
},
},
Username: "donenbai",
}
require.NoError(t, ctrlClient.Create(ctx, consumer))
t.Cleanup(func() {
if err := ctrlClient.Delete(ctx, consumer); err != nil && !apierrors.IsNotFound(err) && !errors.Is(err, context.Canceled) {
assert.NoError(t, err)
}
})

RunManager(ctx, t, restConfig,
AdminAPIOptFns(
// TODO IDK where we're getting the version from normally but it shouldn't really matter for this.
mocks.WithRoot(formatDBRootResponse("999.999.999")),
),
WithPublishService(ns.Name),
WithIngressClass(ingressClassName),
WithProxySyncSeconds(0.1),
)

t.Log("checking kongconsumer event creation")
require.Eventually(t, func() bool {
var events corev1.EventList
if err := ctrlClient.List(ctx, &events, &client.ListOptions{Namespace: ns.Name}); err != nil {
t.Logf("error listing events: %v", err)
return false
}
t.Logf("got %d events", len(events.Items))

matches := make([]bool, 1)
matches[0] = lo.ContainsBy(events.Items, func(e corev1.Event) bool {
return e.Reason == dataplane.KongConfigurationApplyFailedEventReason &&
e.InvolvedObject.Kind == "KongConsumer" &&
e.InvolvedObject.Name == consumer.Name &&
e.Message == "invalid : HTTP status 400 (message: \"2 schema violations (at least one of these fields must be non-empty: 'custom_id', 'username'; fake: unknown field)\")"
})
if lo.Count(matches, true) != 1 {
t.Logf("not all events matched: %+v", matches)
return false
}
return true
}, waitTime, tickTime)

t.Log("push failure events recorded successfully")
}

func formatDBRootResponse(version string) []byte {
const defaultDBLessRootResponse = `{
"version": "%s",
"configuration": {
"database": "postgres",
"router_flavor": "traditional",
"role": "traditional",
"proxy_listeners": [
{
"ipv6only=on": false,
"ipv6only=off": false,
"ssl": false,
"so_keepalive=off": false,
"listener": "0.0.0.0:8000",
"bind": false,
"port": 8000,
"deferred": false,
"so_keepalive=on": false,
"http2": false,
"proxy_protocol": false,
"ip": "0.0.0.0",
"reuseport": false
}
]
}
}`
return []byte(fmt.Sprintf(defaultDBLessRootResponse, version))
}
74 changes: 73 additions & 1 deletion test/mocks/admin_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (
"github.com/kong/kubernetes-ingress-controller/v3/internal/versions"
)

const mockConsumerError = `{
"code": 2,
"fields": {
"@entity": [
"at least one of these fields must be non-empty: 'custom_id', 'username'"
],
"fake": "unknown field"
},
"message": "2 schema violations (at least one of these fields must be non-empty: 'custom_id', 'username'; fake: unknown field)",
"name": "schema violation"
}`

// AdminAPIHandler is a mock implementation of the Admin API. It only implements the endpoints that are
// required for the tests.
type AdminAPIHandler struct {
Expand Down Expand Up @@ -40,6 +52,9 @@ type AdminAPIHandler struct {
// configPostErrorBody contains the error body which will be returned when
// responding to a `POST /config` request.
configPostErrorBody []byte

// rootResponse is the response body served by the admin API root "GET /" endpoint.
rootResponse []byte
}

type AdminAPIHandlerOpt func(h *AdminAPIHandler)
Expand Down Expand Up @@ -79,6 +94,12 @@ func WithConfigPostError(errorbody []byte) AdminAPIHandlerOpt {
}
}

func WithRoot(response []byte) AdminAPIHandlerOpt {
return func(h *AdminAPIHandler) {
h.rootResponse = response
}
}

func NewAdminAPIHandler(t *testing.T, opts ...AdminAPIHandlerOpt) *AdminAPIHandler {
h := &AdminAPIHandler{
version: versions.KICv3VersionCutoff.String(),
Expand All @@ -94,7 +115,11 @@ func NewAdminAPIHandler(t *testing.T, opts ...AdminAPIHandlerOpt) *AdminAPIHandl

mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
_, _ = w.Write(formatDefaultDBLessRootResponse(h.version))
if h.rootResponse != nil {
_, _ = w.Write(h.rootResponse)
} else {
_, _ = w.Write(formatDefaultDBLessRootResponse(h.version))
}
return
}

Expand Down Expand Up @@ -165,6 +190,53 @@ func NewAdminAPIHandler(t *testing.T, opts ...AdminAPIHandlerOpt) *AdminAPIHandl
t.Errorf("unexpected request: %s %s", r.Method, r.URL)
}
})
// These endpoints are only used for the DB mode test. They always return empty responses for GETs and a fake error
// for POSTs and PUTs, to test error handling.
mux.HandleFunc("/consumers/{$}", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{ "data": [], "next": null }`))

case http.MethodPost:
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(mockConsumerError))

case http.MethodPut:
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(mockConsumerError))

default:
t.Errorf("unexpected request: %s %s", r.Method, r.URL)
}
})
mux.HandleFunc("/consumers/", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
// This should technically be the below 404. The mux termination for handling "/consumers/" and
// "/consumers/<something>" isn't behaving as expected and always returns this less specific endpoint.
// Returning the expected 404 here breaks GDR because it can't find the list of existing consumers.
// Returning the 200 list response even for more specific queries is apparently fine for the purposes of the
// test, so we do so as a hack to get to the endpoint we care about, "POST/PUT /consumers/<maybe something>".
//
//w.WriteHeader(http.StatusNotFound)
//_, _ = w.Write([]byte(`{ "message": "Not found" }`))
//
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{ "data": [], "next": null }`))

case http.MethodPost:
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(mockConsumerError))

case http.MethodPut:
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(mockConsumerError))

default:
t.Errorf("unexpected request: %s %s", r.Method, r.URL)
}
})
h.mux = mux
return h
}
Expand Down

0 comments on commit 68e0e3a

Please sign in to comment.