Skip to content

Commit

Permalink
Feat/allow replica of 1 for Jetstream (#2822)
Browse files Browse the repository at this point in the history
Signed-off-by: jmillage <[email protected]>
Co-authored-by: jmillage <[email protected]>
  • Loading branch information
joelcomp1 and jmillage authored Oct 5, 2023
1 parent 3d2cf74 commit 4f01b34
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 37 deletions.
47 changes: 47 additions & 0 deletions controllers/eventbus/installer/assets/jetstream/nats-cluster.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
max_payload: {{.MaxPayloadSize}}
port: {{.ClientPort}}
pid_file: "/var/run/nats/nats.pid"
###############
# #
# Monitoring #
# #
###############
http: {{.MonitorPort}}
server_name: $POD_NAME
###################################
# #
# NATS JetStream #
# #
###################################
jetstream {
key: $JS_KEY
store_dir: "/data/jetstream/store"
{{.Settings}}
}

###################################
# #
# NATS Cluster #
# #
###################################
cluster {
port: {{.ClusterPort}}
name: {{.ClusterName}}
routes: [{{.Routes}}]
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 120

tls {
cert_file: "/etc/nats-config/cluster-server-cert.pem"
key_file: "/etc/nats-config/cluster-server-key.pem"
ca_file: "/etc/nats-config/cluster-ca-cert.pem"
}
}

lame_duck_duration: 120s
##################
# #
# Authorization #
# #
##################
include ./auth.conf
19 changes: 0 additions & 19 deletions controllers/eventbus/installer/assets/jetstream/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,6 @@ jetstream {
store_dir: "/data/jetstream/store"
{{.Settings}}
}

###################################
# #
# NATS Cluster #
# #
###################################
cluster {
port: {{.ClusterPort}}
name: {{.ClusterName}}
routes: [{{.Routes}}]
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 120

tls {
cert_file: "/etc/nats-config/cluster-server-cert.pem"
key_file: "/etc/nats-config/cluster-server-key.pem"
ca_file: "/etc/nats-config/cluster-ca-cert.pem"
}
}
lame_duck_duration: 120s
##################
# #
Expand Down
11 changes: 6 additions & 5 deletions controllers/eventbus/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
svcName := generateJetStreamServiceName(r.eventBus)
ssName := generateJetStreamStatefulSetName(r.eventBus)
replicas := r.eventBus.Spec.JetStream.GetReplicas()
if replicas < 3 {
replicas = 3
}
routes := []string{}
for j := 0; j < replicas; j++ {
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.eventBus.Namespace, strconv.Itoa(int(jsClusterPort))))
Expand All @@ -649,8 +646,12 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
if r.eventBus.Spec.JetStream.MaxPayload != nil {
maxPayload = *r.eventBus.Spec.JetStream.MaxPayload
}

confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf"))
var confTpl *template.Template
if replicas > 2 {
confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats-cluster.conf"))
} else {
confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf"))
}
var confTplOutput bytes.Buffer
if err := confTpl.Execute(&confTplOutput, struct {
MaxPayloadSize string
Expand Down
3 changes: 0 additions & 3 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,4 @@ func Test_JSBufferGetReplicas(t *testing.T) {
five := int32(5)
s.Replicas = &five
assert.Equal(t, 5, s.GetReplicas())
two := int32(2)
s.Replicas = &two
assert.Equal(t, 3, s.GetReplicas())
}
4 changes: 2 additions & 2 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
if x.Version == "" {
return fmt.Errorf("invalid spec: a version for jetstream needs to be specified")
}
if x.Replicas != nil && *x.Replicas < 3 {
return fmt.Errorf("invalid spec: a jetstream eventbus requires at least 3 replicas")
if x.Replicas != nil && (*x.Replicas == 2 || *x.Replicas <= 0) {
return fmt.Errorf("invalid spec: a jetstream eventbus requires 1 replica or >= 3 replicas")
}
}
if x := eb.Spec.Kafka; x != nil {
Expand Down
6 changes: 1 addition & 5 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,8 @@ func TestValidate(t *testing.T) {

t.Run("test js eventbus replica", func(t *testing.T) {
eb := testJetStreamEventBus.DeepCopy()
eb.Spec.JetStream.Replicas = pointer.Int32(2)
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid spec: a jetstream eventbus requires at least 3 replicas")
eb.Spec.JetStream.Replicas = pointer.Int32(3)
err = ValidateEventBus(eb)
err := ValidateEventBus(eb)
assert.NoError(t, err)
eb.Spec.JetStream.Replicas = nil
err = ValidateEventBus(eb)
Expand Down
3 changes: 0 additions & 3 deletions pkg/apis/eventbus/v1alpha1/jetstream_eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ func (j JetStreamBus) GetReplicas() int {
if j.Replicas == nil {
return 3
}
if *j.Replicas < 3 {
return 3
}
return int(*j.Replicas)
}

Expand Down

0 comments on commit 4f01b34

Please sign in to comment.