Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
floreks committed Sep 13, 2024
1 parent 3eece4d commit 6136521
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 51 deletions.
36 changes: 13 additions & 23 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"sync"
"time"

"github.com/pluralsh/deployment-operator/pkg/websocket"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/pluralsh/deployment-operator/pkg/websocket"

logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -51,7 +52,7 @@ type Controller struct {

// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
Queue workqueue.TypedRateLimitingInterface[string]

// mu is used to synchronize Controller setup
mu sync.Mutex
Expand Down Expand Up @@ -129,7 +130,7 @@ func (c *Controller) Start(ctx context.Context) {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
id, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
Expand All @@ -141,35 +142,24 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
c.reconcileHandler(ctx, obj)
defer c.Queue.Done(id)
c.reconcileHandler(ctx, id)
return true
}

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
func (c *Controller) reconcileHandler(ctx context.Context, id string) {
log := log.FromContext(ctx)
// Make sure that the object is a valid request.
req, ok := obj.(string)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
// Return true, don't take a break
return
}

reconcileID := uuid.NewUUID()
ctx = addReconcileID(ctx, reconcileID)

// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced.
log.V(5).Info("Reconciling")
result, err := c.Reconcile(ctx, req)
result, err := c.Reconcile(ctx, id)
switch {
case err != nil:

c.Queue.AddRateLimited(req)
c.Queue.AddRateLimited(id)

if !result.IsZero() {
log.V(1).Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
Expand All @@ -181,16 +171,16 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
c.Queue.Forget(id)
c.Queue.AddAfter(id, result.RequeueAfter)
case result.Requeue:
log.V(5).Info("Reconcile done, requeueing")
c.Queue.AddRateLimited(req)
c.Queue.AddRateLimited(id)
default:
log.V(5).Info("Reconcile successful")
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.Queue.Forget(obj)
c.Queue.Forget(id)
}
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/controller/namespaces/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"time"

console "github.com/pluralsh/console/go/client"
clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"github.com/pluralsh/polly/algorithms"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -21,20 +16,26 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/websocket"
)

type NamespaceReconciler struct {
ConsoleClient client.Client
K8sClient ctrlclient.Client
NamespaceQueue workqueue.RateLimitingInterface
NamespaceQueue workqueue.TypedRateLimitingInterface[string]
NamespaceCache *client.Cache[console.ManagedNamespaceFragment]
}

func NewNamespaceReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh time.Duration) *NamespaceReconciler {
return &NamespaceReconciler{
ConsoleClient: consoleClient,
K8sClient: k8sClient,
NamespaceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
NamespaceQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
NamespaceCache: client.NewCache[console.ManagedNamespaceFragment](refresh, func(id string) (*console.ManagedNamespaceFragment, error) {
return consoleClient.GetNamespace(id)
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/namespaces/socket_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type socketPublisher struct {
restoreQueue workqueue.RateLimitingInterface
restoreQueue workqueue.TypedRateLimitingInterface[string]
restoreCache *client.Cache[console.ManagedNamespaceFragment]
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/pipelinegates/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type GateReconciler struct {
Config *rest.Config
Clientset *kubernetes.Clientset
GateCache *client.Cache[console.PipelineGateFragment]
GateQueue workqueue.RateLimitingInterface
GateQueue workqueue.TypedRateLimitingInterface[string]
UtilFactory util.Factory
discoveryClient *discovery.DiscoveryClient
pinger *ping.Pinger
Expand All @@ -51,7 +51,7 @@ func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client,
return consoleClient.GetClusterGate(id)
})

gateQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
gateQueue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

f := utils.NewFactory(config)

Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/pipelinegates/socket_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package pipelinegates

import (
console "github.com/pluralsh/console/go/client"

"github.com/pluralsh/deployment-operator/pkg/client"

"k8s.io/client-go/util/workqueue"
)

type socketPublisher struct {
gateQueue workqueue.RateLimitingInterface
gateQueue workqueue.TypedRateLimitingInterface[string]
gateCache *client.Cache[console.PipelineGateFragment]
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/restore/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"time"

console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/pkg/client"
plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors"
"github.com/pluralsh/deployment-operator/pkg/websocket"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/pluralsh/deployment-operator/pkg/client"
plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors"
"github.com/pluralsh/deployment-operator/pkg/websocket"
)

var (
Expand Down Expand Up @@ -46,7 +47,7 @@ var (
type RestoreReconciler struct {
ConsoleClient client.Client
K8sClient ctrlclient.Client
RestoreQueue workqueue.RateLimitingInterface
RestoreQueue workqueue.TypedRateLimitingInterface[string]
RestoreCache *client.Cache[console.ClusterRestoreFragment]
Namespace string
}
Expand All @@ -55,7 +56,7 @@ func NewRestoreReconciler(consoleClient client.Client, k8sClient ctrlclient.Clie
return &RestoreReconciler{
ConsoleClient: consoleClient,
K8sClient: k8sClient,
RestoreQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
RestoreQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
RestoreCache: client.NewCache[console.ClusterRestoreFragment](refresh, func(id string) (*console.ClusterRestoreFragment, error) {
return consoleClient.GetClusterRestore(id)
}),
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/restore/socket_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package restore

import (
console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/pkg/client"
"k8s.io/client-go/util/workqueue"

"github.com/pluralsh/deployment-operator/pkg/client"
)

type socketPublisher struct {
restoreQueue workqueue.RateLimitingInterface
restoreQueue workqueue.TypedRateLimitingInterface[string]
restoreCache *client.Cache[console.ClusterRestoreFragment]
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/service/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ServiceReconciler struct {
Clientset *kubernetes.Clientset
Applier *applier.Applier
Destroyer *apply.Destroyer
SvcQueue workqueue.RateLimitingInterface
SvcQueue workqueue.TypedRateLimitingInterface[string]
SvcCache *client.Cache[console.GetServiceDeploymentForAgent_ServiceDeployment]
ManifestCache *manifests.ManifestCache
UtilFactory util.Factory
Expand All @@ -84,7 +84,7 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf
return consoleClient.GetService(id)
})

svcQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
svcQueue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

manifestCache := manifests.NewCache(manifestTTL, deployToken, consoleURL)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/service/socket_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type socketPublisher struct {
svcQueue workqueue.RateLimitingInterface
svcQueue workqueue.TypedRateLimitingInterface[string]
svcCache *client.Cache[console.GetServiceDeploymentForAgent_ServiceDeployment]
manCache *manifests.ManifestCache
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/controller/stacks/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ import (
"time"

console "github.com/pluralsh/console/go/client"
clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"github.com/pluralsh/polly/algorithms"
"k8s.io/client-go/util/workqueue"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/websocket"
)

type StackReconciler struct {
ConsoleClient client.Client
K8sClient ctrlclient.Client
StackQueue workqueue.RateLimitingInterface
StackQueue workqueue.TypedRateLimitingInterface[string]
StackCache *client.Cache[console.StackRunFragment]
Namespace string
ConsoleURL string
Expand All @@ -32,7 +33,7 @@ func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client
return &StackReconciler{
ConsoleClient: consoleClient,
K8sClient: k8sClient,
StackQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
StackQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
StackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) {
return consoleClient.GetStackRun(id)
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/stacks/socket_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type socketPublisher struct {
stackRunQueue workqueue.RateLimitingInterface
stackRunQueue workqueue.TypedRateLimitingInterface[string]
stackRunCache *client.Cache[console.StackRunFragment]
}

Expand Down

0 comments on commit 6136521

Please sign in to comment.