Skip to content

Commit c353ccc

Browse files
adriandietersamuelattwood
authored andcommitted
feat(controller-runtime): Add consumer controller (#212)
* implement consumerSpecToConfig * implement consumer resource initialization * implement consumer update/creation * implement preventUpdate, readonly and namespace restrictions Checks for the PreventUpdate or readonly mode during creation/update. Skips reconciliation when resource is in namespace not matching restriction. * test consumer creation on alternative server * implement consumer deletion * handle deletion when the underlying stream was deleted * add missing GenerationChanged event filter to consumerReconciler * update logging Set streamName and consumerName fields once. Reword log messages.
1 parent e9dd49c commit c353ccc

File tree

3 files changed

+888
-33
lines changed

3 files changed

+888
-33
lines changed

internal/controller/consumer_controller.go

+262-3
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,18 @@ package controller
1818

1919
import (
2020
"context"
21+
"errors"
22+
"fmt"
23+
"github.com/go-logr/logr"
24+
"github.com/nats-io/nats.go/jetstream"
25+
v1 "k8s.io/api/core/v1"
26+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2127
"k8s.io/klog/v2"
28+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
29+
"sigs.k8s.io/controller-runtime/pkg/predicate"
30+
"time"
2231

23-
jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
32+
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
2433
ctrl "sigs.k8s.io/controller-runtime"
2534
)
2635

@@ -36,14 +45,264 @@ type ConsumerReconciler struct {
3645
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
3746
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
3847
log := klog.FromContext(ctx)
39-
log.Info("reconcile", "namespace", req.Namespace, "name", req.Name)
4048

49+
if ok := r.ValidNamespace(req.Namespace); !ok {
50+
log.Info("Controller restricted to namespace, skipping reconciliation.")
51+
return ctrl.Result{}, nil
52+
}
53+
54+
// Fetch consumer resource
55+
consumer := &api.Consumer{}
56+
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
57+
if apierrors.IsNotFound(err) {
58+
log.Info("Consumer resource not found. Ignoring since object must be deleted.")
59+
return ctrl.Result{}, nil
60+
}
61+
return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err)
62+
}
63+
64+
log = log.WithValues(
65+
"streamName", consumer.Spec.StreamName,
66+
"consumerName", consumer.Spec.DurableName,
67+
)
68+
69+
// Update ready status to unknown when no status is set
70+
if consumer.Status.Conditions == nil || len(consumer.Status.Conditions) == 0 {
71+
log.Info("Setting initial ready condition to unknown.")
72+
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
73+
err := r.Status().Update(ctx, consumer)
74+
if err != nil {
75+
return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err)
76+
}
77+
return ctrl.Result{Requeue: true}, nil
78+
}
79+
80+
// Add finalizer
81+
if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
82+
log.Info("Adding consumer finalizer.")
83+
if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok {
84+
return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource")
85+
}
86+
87+
if err := r.Update(ctx, consumer); err != nil {
88+
return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err)
89+
}
90+
return ctrl.Result{}, nil
91+
}
92+
93+
// Check Deletion
94+
markedForDeletion := consumer.GetDeletionTimestamp() != nil
95+
if markedForDeletion {
96+
if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
97+
err := r.deleteConsumer(ctx, log, consumer)
98+
if err != nil {
99+
return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err)
100+
}
101+
} else {
102+
log.Info("Consumer marked for deletion and already finalized. Ignoring.")
103+
}
104+
105+
return ctrl.Result{}, nil
106+
}
107+
108+
// Create or update stream
109+
if err := r.createOrUpdate(ctx, log, consumer); err != nil {
110+
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
111+
}
41112
return ctrl.Result{}, nil
42113
}
43114

115+
func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {
116+
117+
// Set status to not false
118+
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
119+
if err := r.Status().Update(ctx, consumer); err != nil {
120+
return fmt.Errorf("update ready condition: %w", err)
121+
}
122+
123+
if !consumer.Spec.PreventDelete && !r.ReadOnly() {
124+
err := r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
125+
return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
126+
})
127+
switch {
128+
case errors.Is(err, jetstream.ErrConsumerNotFound):
129+
log.Info("Consumer does not exist. Unable to delete.")
130+
case errors.Is(err, jetstream.ErrStreamNotFound):
131+
log.Info("Stream of consumer does not exist. Unable to delete.")
132+
case err != nil:
133+
return fmt.Errorf("delete jetstream consumer: %w", err)
134+
default:
135+
log.Info("Consumer deleted.")
136+
}
137+
} else {
138+
log.Info("Skipping consumer deletion.",
139+
"consumerName", consumer.Spec.DurableName,
140+
"preventDelete", consumer.Spec.PreventDelete,
141+
"read-only", r.ReadOnly(),
142+
)
143+
}
144+
145+
log.Info("Removing consumer finalizer.")
146+
if ok := controllerutil.RemoveFinalizer(consumer, consumerFinalizer); !ok {
147+
return errors.New("failed to remove consumer finalizer")
148+
}
149+
if err := r.Update(ctx, consumer); err != nil {
150+
return fmt.Errorf("remove finalizer: %w", err)
151+
}
152+
153+
return nil
154+
}
155+
156+
func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {
157+
158+
// Create or Update the stream based on the spec
159+
if consumer.Spec.PreventUpdate || r.ReadOnly() {
160+
log.Info("Skipping consumer creation or update.",
161+
"preventDelete", consumer.Spec.PreventDelete,
162+
"read-only", r.ReadOnly(),
163+
)
164+
return nil
165+
}
166+
167+
// Map spec to consumer target config
168+
targetConfig, err := consumerSpecToConfig(&consumer.Spec)
169+
if err != nil {
170+
return fmt.Errorf("map consumer spec to target config: %w", err)
171+
}
172+
173+
err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
174+
log.Info("Consumer created or updated.")
175+
_, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
176+
return err
177+
})
178+
if err != nil {
179+
err = fmt.Errorf("create or update consumer: %w", err)
180+
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Errored", err.Error())
181+
if err := r.Status().Update(ctx, consumer); err != nil {
182+
log.Error(err, "Failed to update ready condition to Errored.")
183+
}
184+
return err
185+
}
186+
187+
// update the observed generation and ready status
188+
consumer.Status.ObservedGeneration = consumer.Generation
189+
consumer.Status.Conditions = updateReadyCondition(
190+
consumer.Status.Conditions,
191+
v1.ConditionTrue,
192+
"Reconciling",
193+
"Consumer successfully created or updated.",
194+
)
195+
err = r.Status().Update(ctx, consumer)
196+
if err != nil {
197+
return fmt.Errorf("update ready condition: %w", err)
198+
}
199+
200+
return nil
201+
}
202+
203+
func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions {
204+
return &connectionOptions{
205+
Account: spec.Account,
206+
Creds: spec.Creds,
207+
Nkey: spec.Nkey,
208+
Servers: spec.Servers,
209+
TLS: spec.TLS,
210+
}
211+
}
212+
213+
func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {
214+
215+
config := &jetstream.ConsumerConfig{
216+
Durable: spec.DurableName,
217+
Description: spec.Description,
218+
OptStartSeq: uint64(spec.OptStartSeq),
219+
MaxDeliver: spec.MaxDeliver,
220+
FilterSubject: spec.FilterSubject,
221+
RateLimit: uint64(spec.RateLimitBps),
222+
SampleFrequency: spec.SampleFreq,
223+
MaxWaiting: spec.MaxWaiting,
224+
MaxAckPending: spec.MaxAckPending,
225+
HeadersOnly: spec.HeadersOnly,
226+
MaxRequestBatch: spec.MaxRequestBatch,
227+
MaxRequestMaxBytes: spec.MaxRequestMaxBytes,
228+
Replicas: spec.Replicas,
229+
MemoryStorage: spec.MemStorage,
230+
FilterSubjects: spec.FilterSubjects,
231+
Metadata: spec.Metadata,
232+
233+
// Explicitly set not (yet) mapped fields
234+
Name: "",
235+
InactiveThreshold: 0,
236+
}
237+
238+
// DeliverPolicy
239+
if spec.DeliverPolicy != "" {
240+
err := config.DeliverPolicy.UnmarshalJSON(asJsonString(spec.DeliverPolicy))
241+
if err != nil {
242+
return nil, fmt.Errorf("invalid delivery policy: %w", err)
243+
}
244+
}
245+
246+
// OptStartTime RFC3339
247+
if spec.OptStartTime != "" {
248+
t, err := time.Parse(time.RFC3339, spec.OptStartTime)
249+
if err != nil {
250+
return nil, fmt.Errorf("invalid opt start time: %w", err)
251+
}
252+
config.OptStartTime = &t
253+
}
254+
255+
// AckPolicy
256+
if spec.AckPolicy != "" {
257+
err := config.AckPolicy.UnmarshalJSON(asJsonString(spec.AckPolicy))
258+
if err != nil {
259+
return nil, fmt.Errorf("invalid ack policy: %w", err)
260+
}
261+
}
262+
263+
// AckWait
264+
if spec.AckWait != "" {
265+
d, err := time.ParseDuration(spec.AckWait)
266+
if err != nil {
267+
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
268+
}
269+
config.AckWait = d
270+
}
271+
272+
//BackOff
273+
for _, bo := range spec.BackOff {
274+
d, err := time.ParseDuration(bo)
275+
if err != nil {
276+
return nil, fmt.Errorf("invalid backoff: %w", err)
277+
}
278+
279+
config.BackOff = append(config.BackOff, d)
280+
}
281+
282+
// ReplayPolicy
283+
if spec.ReplayPolicy != "" {
284+
err := config.ReplayPolicy.UnmarshalJSON(asJsonString(spec.ReplayPolicy))
285+
if err != nil {
286+
return nil, fmt.Errorf("invalid replay policy: %w", err)
287+
}
288+
}
289+
290+
// MaxRequestExpires
291+
if spec.MaxRequestExpires != "" {
292+
d, err := time.ParseDuration(spec.MaxRequestExpires)
293+
if err != nil {
294+
return nil, fmt.Errorf("invalid opt start time: %w", err)
295+
}
296+
config.MaxRequestExpires = d
297+
}
298+
299+
return config, nil
300+
}
301+
44302
// SetupWithManager sets up the controller with the Manager.
45303
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
46304
return ctrl.NewControllerManagedBy(mgr).
47-
For(&jetstreamnatsiov1beta2.Consumer{}).
305+
For(&api.Consumer{}).
306+
WithEventFilter(predicate.GenerationChangedPredicate{}).
48307
Complete(r)
49308
}

0 commit comments

Comments
 (0)