Skip to content

Commit

Permalink
feat: Implement WatcherPool cleaner + enabling flag
Browse files Browse the repository at this point in the history
  • Loading branch information
achetronic committed Apr 4, 2024
1 parent 9ad0f40 commit bf6bf0e
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 120 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ They are described in the following table:
| `--health-probe-bind-address` | The address the probe endpoint binds to | `-` | `--health-probe-bind-address ":8091"` |
| `--metrics-bind-address` | The address the metric endpoint binds to | `:8080` | `--metrics-bind-address ":8090"` |
| `--events-per-second` | Amount of events processed per second (best effort) | `20` | `--events-per-second 50` |
| `--enable-watcher-cleaner` | Enable a WatcherPool cleaning process for orphan watchers | `false` | `--enable-watcher-cleaner true` |

## Config

Expand Down
7 changes: 7 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func main() {
var enableHTTP2 bool
var configPath string
var eventsPerSecond int
var enableWatcherPoolCleaner bool

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -74,6 +75,8 @@ func main() {
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&configPath, "config", "notifik.yaml", "The path to configuration file.")
flag.IntVar(&eventsPerSecond, "events-per-second", 20, "Amount of events processed per second (best effort)")
flag.BoolVar(&enableWatcherPoolCleaner, "enable-watcher-cleaner", false,
"If set, WatcherPool cleaner will be enabled for orphan watchers")

opts := zap.Options{
Development: true,
Expand Down Expand Up @@ -148,6 +151,10 @@ func main() {
if err = (&controller.NotificationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),

Options: controller.NotificationControllerOptions{
EnableWatcherPoolCleaner: enableWatcherPoolCleaner,
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Notification")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion config/samples/config/notifik.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
integrations:
# (Optional) Configuration parameters to be able to connect with generic webhooks
webhook:
url: "https://${WEBHOOK_TEST_USERNAME}:${WEBHOOK_TEST_PASSWORD}@webhook.site/98f1c771-bfaf-4c4f-81f6-f11c76684fcf"
url: "https://${WEBHOOK_TEST_USERNAME}:${WEBHOOK_TEST_PASSWORD}@webhook.site/dffa7c58-a11f-4bc5-9ead-305fa2ac6237"
headers:
X-Scope-OrgID: your-company

Expand Down
6 changes: 0 additions & 6 deletions config/samples/configmap-3.yaml

This file was deleted.

6 changes: 0 additions & 6 deletions config/samples/configmap-4.yaml

This file was deleted.

6 changes: 3 additions & 3 deletions config/samples/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
## Append samples of your project ##
resources:
# Sample resources
- configmap.yaml
- configmap-2.yaml
- configmap-3.yaml
- configmap-4.yaml
- secret.yaml

#
- notification/alertmanager/notifik_v1alpha1_notification_yaml.yaml
- notification/alertmanager/notifik_v1alpha1_notification_json.yaml

#+kubebuilder:scaffold:manifestskustomizesamples
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
watch:
group: ""
version: v1
resource: configmaps
resource: secrets

# (Optional) It's possible to watch specific resources
# name: testing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apiVersion: v1
kind: ConfigMap
kind: Secret
metadata:
name: testing-2
data:
name: testing
stringData:
TEST_VAR: "placeholder"
9 changes: 9 additions & 0 deletions internal/controller/notification_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,19 @@ const (
notificationReconcileError = "Can not reconcile Notification: %s"
)

type NotificationControllerOptions struct {

// Enable WatcherPool cleaner process
EnableWatcherPoolCleaner bool
}

// NotificationReconciler reconciles a Notification object
type NotificationReconciler struct {
client.Client
Scheme *runtime.Scheme

//
Options NotificationControllerOptions
}

//+kubebuilder:rbac:groups=notifik.freepik.com,resources=notifications,verbs=get;list;watch;create;update;patch;delete
Expand Down
22 changes: 11 additions & 11 deletions internal/controller/notification_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ const (
func (r *NotificationReconciler) ReconcileNotification(ctx context.Context, eventType watch.EventType, notificationManifest *notifikv1alpha1.Notification) (err error) {
logger := log.FromContext(ctx)

// TODO check if is the last watcher of his resource type in global map

watchedTypeString := strings.Join([]string{
notificationManifest.Spec.Watch.Group,
notificationManifest.Spec.Watch.Version,
Expand All @@ -39,11 +37,11 @@ func (r *NotificationReconciler) ReconcileNotification(ctx context.Context, even
watchedType := globals.ResourceTypeName(watchedTypeString)

// Initialize the watcher into WatcherPool when not registered
if _, watcherFound := globals.Application.WatcherPool[watchedType]; !watcherFound {
if _, watcherFound := globals.Application.WatcherPool.Pool[watchedType]; !watcherFound {
globals.InitWatcher(watchedType)
}

notificationList := globals.Application.WatcherPool[watchedType].NotificationList
notificationList := globals.Application.WatcherPool.Pool[watchedType].NotificationList
//notificationIndex := globals.GetWatcherNotificationIndex(watchedType, notificationManifest)

notificationIndexes := globals.GetWatcherPoolNotificationIndexes(notificationManifest)
Expand Down Expand Up @@ -84,15 +82,17 @@ func (r *NotificationReconciler) ReconcileNotification(ctx context.Context, even
}

// Notification found, update it into the pool
// TODO: Decide if we want to log everything related to state
//logger.Info(watcherPoolUpdatedNotificationMessage,
// "watcher", watchedType)
(*notificationList)[notificationIndex] = notificationManifest

// TODO: Create a cleaner to delete empty watchers from WatcherPool
// TODO: Decide whether the cleaner should be executed by this controller or xyz.WorkloadController
// Delete empty watcher from the WatcherPool.
// This can be enabled setting a flag
if r.Options.EnableWatcherPoolCleaner {
globals.CleanWatcherPool()
}

// TODO: DELETE. DEBUG PURPOSES
// corelog.Printf("notification controller loop checkpoint. event type: %v", eventType)

// TODO: Decide if resourceType watcher restart is suitable on Notification update events
//*(globals.Application.WatcherPool[watchedType].StopSignal) <- true
return nil

}
44 changes: 4 additions & 40 deletions internal/globals/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,15 @@ package globals
import (
"context"
"sync"

"k8s.io/client-go/dynamic"

notifikv1alpha1 "freepik.com/notifik/api/v1alpha1"
)

var (
Application = applicationT{
Context: context.Background(),

WatcherPool: make(map[ResourceTypeName]ResourceTypeWatcherT),
WatcherPool: WatcherPoolT{
Mutex: &sync.Mutex{},
Pool: make(map[ResourceTypeName]ResourceTypeWatcherT),
},
}
)

// TODO
type ResourceTypeName string

// ApplicationT TODO
type applicationT struct {
// Context TODO
Context context.Context

// Configuration TODO
Configuration notifikv1alpha1.ConfigurationT

// KubeRawClient TODO
KubeRawClient *dynamic.DynamicClient

// WatcherPool TODO
WatcherPool map[ResourceTypeName]ResourceTypeWatcherT
}

// TODO
type ResourceTypeWatcherT struct {
// Enforce concurrency safety
Mutex *sync.Mutex

// Started represents a flag to know if the watcher is running
Started *bool
// Blocked represents a flag to prevent watcher from starting
Blocked *bool
// StopSignal represents a flag to kill the watcher.
// Watcher will be potentially re-launched by xyz.WorkloadController
StopSignal *chan bool

//
NotificationList *[]*notifikv1alpha1.Notification
}
51 changes: 51 additions & 0 deletions internal/globals/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package globals

import (
"context"
notifikv1alpha1 "freepik.com/notifik/api/v1alpha1"
"k8s.io/client-go/dynamic"
"sync"
)

// TODO
type ResourceTypeName string

// TODO
type ResourceTypeWatcherT struct {
// Enforce concurrency safety
Mutex *sync.Mutex

// Started represents a flag to know if the watcher is running
Started *bool
// Blocked represents a flag to prevent watcher from starting
Blocked *bool
// StopSignal represents a flag to kill the watcher.
// Watcher will be potentially re-launched by xyz.WorkloadController
StopSignal *chan bool

//
NotificationList *[]*notifikv1alpha1.Notification
}

type WatcherPoolT struct {
// Enforce concurrency safety
Mutex *sync.Mutex

Pool map[ResourceTypeName]ResourceTypeWatcherT
}

// ApplicationT TODO
type applicationT struct {
// Context TODO
Context context.Context

// Configuration TODO
Configuration notifikv1alpha1.ConfigurationT

// KubeRawClient TODO
KubeRawClient *dynamic.DynamicClient

// WatcherPool TODO
//WatcherPool map[ResourceTypeName]ResourceTypeWatcherT
WatcherPool WatcherPoolT
}
Loading

0 comments on commit bf6bf0e

Please sign in to comment.