Skip to content

Commit

Permalink
fix(controller): fix concurrent updates of monitoring resource status
Browse files Browse the repository at this point in the history
This fixes:
    Operation cannot be fulfilled on dash0monitorings.operator.dash0.com "$name":
    the object has been modified; please apply your changes to the latest version and try again
after updating a third party resource (Perses dashboard, Prometheus
rule) and writing the outcome to the Dash0 monitoring resource status.

Fixed by queuing all sync tasks via a queue that is shared across
resource types.
  • Loading branch information
basti1302 committed Dec 17, 2024
1 parent ab08ebd commit d03892a
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 24 deletions.
17 changes: 17 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -103,6 +104,8 @@ var (

metricNamePrefix = fmt.Sprintf("%s.", meterName)
meter otelmetric.Meter

thirdPartyResourceSynchronizationQueue *workqueue.Typed[controller.ThirdPartyResourceSyncJob]
)

func init() {
Expand Down Expand Up @@ -420,6 +423,12 @@ func startOperatorManager(
return fmt.Errorf("unable to set up the ready check: %w", err)
}

defer func() {
if thirdPartyResourceSynchronizationQueue != nil {
controller.StopProcessingThirdPartySynchronizationQueue(thirdPartyResourceSynchronizationQueue, &setupLog)
}
}()

setupLog.Info("starting manager")
if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil {
return fmt.Errorf("unable to set up the signal handler: %w", err)
Expand Down Expand Up @@ -613,8 +622,14 @@ func startDash0Controllers(
return fmt.Errorf("unable to set up the backend connection reconciler: %w", err)
}

thirdPartyResourceSynchronizationQueue =
workqueue.NewTypedWithConfig(
workqueue.TypedQueueConfig[controller.ThirdPartyResourceSyncJob]{
Name: "dash0-third-party-resource-reconcile-queue",
})
persesDashboardCrdReconciler := &controller.PersesDashboardCrdReconciler{
Client: k8sClient,
Queue: thirdPartyResourceSynchronizationQueue,
AuthToken: envVars.selfMonitoringAndApiAuthToken,
}
if err := persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, startupTasksK8sClient, &setupLog); err != nil {
Expand All @@ -627,6 +642,7 @@ func startDash0Controllers(
)
prometheusRuleCrdReconciler := &controller.PrometheusRuleCrdReconciler{
Client: k8sClient,
Queue: thirdPartyResourceSynchronizationQueue,
AuthToken: envVars.selfMonitoringAndApiAuthToken,
}
if err := prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, startupTasksK8sClient, &setupLog); err != nil {
Expand All @@ -637,6 +653,7 @@ func startDash0Controllers(
metricNamePrefix,
&setupLog,
)
controller.StartProcessingThirdPartySynchronizationQueue(thirdPartyResourceSynchronizationQueue, &setupLog)

operatorConfigurationReconciler := &controller.OperatorConfigurationReconciler{
Client: k8sClient,
Expand Down
13 changes: 10 additions & 3 deletions internal/controller/perses_dashboards_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

type PersesDashboardCrdReconciler struct {
Client client.Client
Queue *workqueue.Typed[ThirdPartyResourceSyncJob]
AuthToken string
mgr ctrl.Manager
skipNameValidation bool
Expand All @@ -44,6 +45,7 @@ type PersesDashboardCrdReconciler struct {
type PersesDashboardReconciler struct {
client.Client
pseudoClusterUid types.UID
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
httpClient *http.Client
apiConfig atomic.Pointer[ApiConfig]
authToken string
Expand Down Expand Up @@ -110,6 +112,7 @@ func (r *PersesDashboardCrdReconciler) CreateResourceReconciler(
) {
r.persesDashboardReconciler = &PersesDashboardReconciler{
Client: r.Client,
queue: r.Queue,
pseudoClusterUid: pseudoClusterUid,
authToken: authToken,
httpClient: httpClient,
Expand Down Expand Up @@ -292,6 +295,10 @@ func (r *PersesDashboardReconciler) K8sClient() client.Client {
return r.Client
}

func (r *PersesDashboardReconciler) Queue() *workqueue.Typed[ThirdPartyResourceSyncJob] {
return r.queue
}

func (r *PersesDashboardReconciler) HttpClient() *http.Client {
return r.httpClient
}
Expand Down Expand Up @@ -333,7 +340,7 @@ func (r *PersesDashboardReconciler) Create(
e.Object.GetName(),
)

upsertViaApi(ctx, r, e.Object, &logger)
upsertViaApi(r, e.Object)
}

func (r *PersesDashboardReconciler) Update(
Expand All @@ -354,7 +361,7 @@ func (r *PersesDashboardReconciler) Update(
e.ObjectNew.GetName(),
)

upsertViaApi(ctx, r, e.ObjectNew, &logger)
upsertViaApi(r, e.ObjectNew)
}

func (r *PersesDashboardReconciler) Delete(
Expand All @@ -375,7 +382,7 @@ func (r *PersesDashboardReconciler) Delete(
e.Object.GetName(),
)

deleteViaApi(ctx, r, e.Object, &logger)
deleteViaApi(r, e.Object)
}

func (r *PersesDashboardReconciler) Generic(
Expand Down
9 changes: 9 additions & 0 deletions internal/controller/perses_dashboards_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
Expand All @@ -33,6 +34,9 @@ import (
var (
persesDashboardCrdReconciler *PersesDashboardCrdReconciler
persesDashboardCrd *apiextensionsv1.CustomResourceDefinition
testQueuePersesDashboards = workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[ThirdPartyResourceSyncJob]{
Name: "dash0-third-party-resource-reconcile-queue",
})

dashboardApiBasePath = "/api/dashboards/"

Expand Down Expand Up @@ -216,6 +220,8 @@ var _ = Describe("The Perses dashboard controller", Ordered, func() {
ensurePersesDashboardCrdExists(ctx)

Expect(persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, &logger)).To(Succeed())

StartProcessingThirdPartySynchronizationQueue(testQueuePersesDashboards, &logger)
})

BeforeEach(func() {
Expand All @@ -235,6 +241,7 @@ var _ = Describe("The Perses dashboard controller", Ordered, func() {

AfterAll(func() {
deletePersesDashboardCrdIfItExists(ctx)
StopProcessingThirdPartySynchronizationQueue(testQueuePersesDashboards, &logger)
})

It("it ignores Perses dashboard resource changes if no Dash0 monitoring resource exists in the namespace", func() {
Expand Down Expand Up @@ -426,6 +433,7 @@ var _ = Describe("The Perses dashboard controller", Ordered, func() {
func createPersesDashboardCrdReconcilerWithoutAuthToken() {
persesDashboardCrdReconciler = &PersesDashboardCrdReconciler{
Client: k8sClient,
Queue: testQueuePersesDashboards,

// We create the controller multiple times in tests, this option is required, otherwise the controller
// runtime will complain.
Expand All @@ -436,6 +444,7 @@ func createPersesDashboardCrdReconcilerWithoutAuthToken() {
func createPersesDashboardCrdReconcilerWithAuthToken() {
persesDashboardCrdReconciler = &PersesDashboardCrdReconciler{
Client: k8sClient,
Queue: testQueuePersesDashboards,
AuthToken: AuthorizationTokenTest,

// We create the controller multiple times in tests, this option is required, otherwise the controller
Expand Down
13 changes: 10 additions & 3 deletions internal/controller/prometheus_rules_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

type PrometheusRuleCrdReconciler struct {
Client client.Client
Queue *workqueue.Typed[ThirdPartyResourceSyncJob]
AuthToken string
mgr ctrl.Manager
skipNameValidation bool
Expand All @@ -46,6 +47,7 @@ type PrometheusRuleCrdReconciler struct {
type PrometheusRuleReconciler struct {
client.Client
pseudoClusterUid types.UID
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
httpClient *http.Client
apiConfig atomic.Pointer[ApiConfig]
authToken string
Expand Down Expand Up @@ -130,6 +132,7 @@ func (r *PrometheusRuleCrdReconciler) CreateResourceReconciler(
) {
r.prometheusRuleReconciler = &PrometheusRuleReconciler{
Client: r.Client,
queue: r.Queue,
pseudoClusterUid: pseudoClusterUid,
authToken: authToken,
httpClient: httpClient,
Expand Down Expand Up @@ -312,6 +315,10 @@ func (r *PrometheusRuleReconciler) K8sClient() client.Client {
return r.Client
}

func (r *PrometheusRuleReconciler) Queue() *workqueue.Typed[ThirdPartyResourceSyncJob] {
return r.queue
}

func (r *PrometheusRuleReconciler) HttpClient() *http.Client {
return r.httpClient
}
Expand Down Expand Up @@ -353,7 +360,7 @@ func (r *PrometheusRuleReconciler) Create(
e.Object.GetName(),
)

upsertViaApi(ctx, r, e.Object, &logger)
upsertViaApi(r, e.Object)
}

func (r *PrometheusRuleReconciler) Update(
Expand All @@ -374,7 +381,7 @@ func (r *PrometheusRuleReconciler) Update(
e.ObjectNew.GetName(),
)

upsertViaApi(ctx, r, e.ObjectNew, &logger)
upsertViaApi(r, e.ObjectNew)
}

func (r *PrometheusRuleReconciler) Delete(
Expand All @@ -395,7 +402,7 @@ func (r *PrometheusRuleReconciler) Delete(
e.Object.GetName(),
)

deleteViaApi(ctx, r, e.Object, &logger)
deleteViaApi(r, e.Object)
}

func (r *PrometheusRuleReconciler) Generic(
Expand Down
9 changes: 9 additions & 0 deletions internal/controller/prometheus_rules_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
Expand All @@ -34,6 +35,9 @@ import (
var (
prometheusRuleCrdReconciler *PrometheusRuleCrdReconciler
prometheusRuleCrd *apiextensionsv1.CustomResourceDefinition
testQueuePrometheusRules = workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[ThirdPartyResourceSyncJob]{
Name: "dash0-third-party-resource-reconcile-queue",
})

checkRuleApiBasePath = "/api/alerting/check-rules/"

Expand Down Expand Up @@ -229,6 +233,8 @@ var _ = Describe("The Prometheus rule controller", Ordered, func() {
ensurePrometheusRuleCrdExists(ctx)

Expect(prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, &logger)).To(Succeed())

StartProcessingThirdPartySynchronizationQueue(testQueuePrometheusRules, &logger)
})

BeforeEach(func() {
Expand All @@ -248,6 +254,7 @@ var _ = Describe("The Prometheus rule controller", Ordered, func() {

AfterAll(func() {
deletePrometheusRuleCrdIfItExists(ctx)
StopProcessingThirdPartySynchronizationQueue(testQueuePrometheusRules, &logger)
})

It("it ignores Prometheus rule resource changes if no Dash0 monitoring resource exists in the namespace", func() {
Expand Down Expand Up @@ -943,6 +950,7 @@ var _ = Describe("The Prometheus rule controller", Ordered, func() {
func createPrometheusRuleCrdReconcilerWithoutAuthToken() {
prometheusRuleCrdReconciler = &PrometheusRuleCrdReconciler{
Client: k8sClient,
Queue: testQueuePrometheusRules,

// We create the controller multiple times in tests, this option is required, otherwise the controller
// runtime will complain.
Expand All @@ -953,6 +961,7 @@ func createPrometheusRuleCrdReconcilerWithoutAuthToken() {
func createPrometheusRuleCrdReconcilerWithAuthToken() {
prometheusRuleCrdReconciler = &PrometheusRuleCrdReconciler{
Client: k8sClient,
Queue: testQueuePrometheusRules,
AuthToken: AuthorizationTokenTest,

// We create the controller multiple times in tests, this option is required, otherwise the controller
Expand Down
Loading

0 comments on commit d03892a

Please sign in to comment.