Skip to content

Commit

Permalink
Merge branch 'exotic-jetstream' of https://github.com/gokulav137/argo…
Browse files Browse the repository at this point in the history
…-events into exotic-jetstream
  • Loading branch information
gokulav137 committed Oct 22, 2023
2 parents be8a425 + 266fedf commit b9e69b3
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 11 deletions.
2 changes: 2 additions & 0 deletions common/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)

var (
configs = []eventbusv1alpha1.BusConfig{
{NATS: &eventbusv1alpha1.NATSConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{AccessSecret: &v1.SecretKeySelector{}}},
}
)

Expand Down
50 changes: 50 additions & 0 deletions controllers/eventbus/installer/exotic_jetstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

var (
testJSExoticURL = "nats://nats:4222"

testJSExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: testJSExoticURL,
},
},
}
)

func TestInstallationJSExotic(t *testing.T) {
t.Run("installation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.JetStream)
assert.Equal(t, conf.JetStream.URL, testJSExoticURL)
})
}

func TestUninstallationJSExotic(t *testing.T) {
t.Run("uninstallation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
6 changes: 6 additions & 0 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func TestGetInstaller(t *testing.T) {
assert.NotNil(t, installer)
_, ok := installer.(*jetStreamInstaller)
assert.True(t, ok)

installer, err = getInstaller(testJetStreamExoticBus, nil, nil, fakeConfig, zaptest.NewLogger(t).Sugar())
assert.NoError(t, err)
assert.NotNil(t, installer)
_, ok = installer.(*exoticJetStreamInstaller)
assert.True(t, ok)
})
}

Expand Down
16 changes: 16 additions & 0 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ var (
},
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}
)

func TestJetStreamBadInstallation(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: common.DefaultEventBusName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}

testKafkaEventBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Expand Down Expand Up @@ -68,6 +80,11 @@ func TestValidate(t *testing.T) {
assert.NoError(t, err)
})

t.Run("test good js exotic eventbus", func(t *testing.T) {
err := ValidateEventBus(testJetStreamExoticBus)
assert.NoError(t, err)
})

t.Run("test bad eventbus", func(t *testing.T) {
eb := testNatsEventBus.DeepCopy()
eb.Spec.NATS = nil
Expand Down Expand Up @@ -130,4 +147,12 @@ func TestValidate(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.url\" is missing")
})

t.Run("test exotic js eventbus empty URL", func(t *testing.T) {
eb := testJetStreamExoticBus.DeepCopy()
eb.Spec.JetStreamExotic.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "\"spec.jetstreamExotic.url\" is missing"))
})
}
18 changes: 18 additions & 0 deletions docs/eventbus/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ For Jetstream, TLS is turned on for all client-server communication as well as b
## How it works under the hood

Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: “Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are.” For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default.<eventsourcename>.<eventname>`. Sensors subscribe to the subjects they need using durable consumers.

### Exotic

To use an existing JetStream service, follow the example below.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
jetstreamExotic:
url: nats://xxxxx:xxx
accessSecret:
name: my-secret-name
key: secret-key
streamConfig: ""
```
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/graft v0.0.0-20220215174245-93d18541496f
github.com/nats-io/nats.go v1.30.2
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/stan.go v0.10.4
github.com/nsqio/go-nsq v1.1.0
github.com/pkg/sftp v1.13.6
Expand Down Expand Up @@ -262,7 +262,7 @@ require (
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats-server/v2 v2.8.1 // indirect
github.com/nats-io/nats-server/v2 v2.9.23 // indirect
github.com/nats-io/nats-streaming-server v0.24.3 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down Expand Up @@ -332,5 +332,3 @@ require (
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace github.com/antonmedv/expr => github.com/expr-lang/expr v0.0.0-20230912141041-709c5dd55aa7
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,8 @@ github.com/andygrunwald/go-gerrit v0.0.0-20230325081502-da63a5c62d80/go.mod h1:S
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/antonmedv/expr v1.15.2 h1:afFXpDWIC2n3bF+kTZE1JvFo+c34uaM3sTqh8z0xfdU=
github.com/antonmedv/expr v1.15.2/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE=
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI=
github.com/apache/openwhisk-client-go v0.0.0-20190915054138-716c6f973eb2 h1:mOsBfI/27csXzqNYu7XAf14RPGsRrcXJ8fjaYIhkuVU=
Expand Down Expand Up @@ -913,8 +915,6 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/expr-lang/expr v0.0.0-20230912141041-709c5dd55aa7 h1:Sg2XxaymeyqqaLG34aB2mvlX+nii916/Gv1ovWc4jMc=
github.com/expr-lang/expr v0.0.0-20230912141041-709c5dd55aa7/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE=
github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64=
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg=
github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
Expand Down Expand Up @@ -1535,19 +1535,19 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+
github.com/nats-io/graft v0.0.0-20220215174245-93d18541496f h1:UE9EK14XcoK/PmGqPtVWlrdMoPzBwJyzTCWEJ+cW7DI=
github.com/nats-io/graft v0.0.0-20220215174245-93d18541496f/go.mod h1:FDlTkeZBkKG5O+8RL3R0Q3gyhhHwG5sxcXcV7Lnx9x4=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats-server/v2 v2.8.1 h1:WZ9m/d8rklkWo6opo3X927vXnuaE00VEEl5zXcpL6qw=
github.com/nats-io/nats-server/v2 v2.8.1/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU=
github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0=
github.com/nats-io/nats-streaming-server v0.24.3 h1:uZez8jBkXscua++jaDsK7DhpSAkizdetar6yWbPMRco=
github.com/nats-io/nats-streaming-server v0.24.3/go.mod h1:rqWfyCbxlhKj//fAp8POdQzeADwqkVhZcoWlbhkuU5w=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
Expand Down
12 changes: 12 additions & 0 deletions webhook/validator/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,16 @@ func TestValidateEventBusUpdate(t *testing.T) {
r := v.ValidateUpdate(contextWithLogger(t))
assert.False(t, r.Allowed)
})

t.Run("test update native nats to exotic js", func(t *testing.T) {
newEb := eb.DeepCopy()
newEb.Generation++
newEb.Spec.NATS = nil
newEb.Spec.JetStreamExotic = &eventbusv1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
}
v := NewEventBusValidator(fakeK8sClient, fakeEventBusClient, fakeEventSourceClient, fakeSensorClient, eb, newEb)
r := v.ValidateUpdate(contextWithLogger(t))
assert.False(t, r.Allowed)
})
}

0 comments on commit b9e69b3

Please sign in to comment.