diff --git a/common/expr/eval.go b/common/expr/eval.go index a29c78bed1..9e3da5e785 100644 --- a/common/expr/eval.go +++ b/common/expr/eval.go @@ -8,8 +8,8 @@ import ( "encoding/json" - "github.com/antonmedv/expr" "github.com/doublerebel/bellows" + "github.com/expr-lang/expr" sprig "github.com/Masterminds/sprig/v3" exprpkg "github.com/argoproj/pkg/expr" diff --git a/docs/eventsources/filtering.md b/docs/eventsources/filtering.md index 770760d3c1..efbd1dd11e 100644 --- a/docs/eventsources/filtering.md +++ b/docs/eventsources/filtering.md @@ -29,7 +29,7 @@ spec: jitter: 0.2 ``` -The `expression` string is evaluated with the [expr](https://github.com/antonmedv/expr) package which offers a wide set of basic operators and comparators. +The `expression` string is evaluated with the [expr](https://github.com/expr-lang/expr) package which offers a wide set of basic operators and comparators. # Example diff --git a/docs/sensors/filters/expr.md b/docs/sensors/filters/expr.md index b164dbb799..1909f0b60f 100644 --- a/docs/sensors/filters/expr.md +++ b/docs/sensors/filters/expr.md @@ -90,18 +90,18 @@ The `expr` field defines the expression to be evaluated. The `fields` stanza def `name` is arbitrary and used in the `expr`, `path` defines how to find the value in the data payload then to be assigned to a parameter. -The expr filter evaluates the expression contained in `expr` using [govaluate](https://github.com/Knetic/govaluate). This library leverages an incredible flexibility and power. +The expr filter evaluates the expression contained in `expr` using [expr](https://github.com/expr-lang/expr). This library leverages an incredible flexibility and power. -With govaluate we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`-`), inversion (`!`), bitwise not (`~`), logical (`&&`, `||`), ternary conditional (`?`, `:`) operators, +With expr we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`not` or `!`), inversion (`!`), logical (`&&` or `and`, `||` or `or`), ternary conditional (`?`, `:`) operators, together with comparators (`>`, `<`, `>=`, `<=`), comma-separated arrays and custom functions. Here some examples: -- `action =~ "start"` +- `action matches "start"` - `action == "end" && started == true` -- `action =~ "start" || (started == true && instances == 2)` +- `action matches "start" || (started == true && instances == 2)` -To discover all options offered by govaluate, take a look at its [manual](https://github.com/Knetic/govaluate/blob/master/MANUAL.md). +To discover all options offered by expr, take a look at its [manual](https://expr.medv.io/docs/Language-Definition). ## Practical example diff --git a/eventbus/jetstream/sensor/trigger_conn.go b/eventbus/jetstream/sensor/trigger_conn.go index bc97e3404f..946b50eb3d 100644 --- a/eventbus/jetstream/sensor/trigger_conn.go +++ b/eventbus/jetstream/sensor/trigger_conn.go @@ -9,8 +9,9 @@ import ( "sync" "time" - "github.com/Knetic/govaluate" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" nats "github.com/nats-io/nats.go" eventbuscommon "github.com/argoproj/argo-events/eventbus/common" @@ -24,7 +25,7 @@ type JetstreamTriggerConn struct { keyValueStore nats.KeyValue dependencyExpression string requiresANDLogic bool - evaluableExpression *govaluate.EvaluableExpression + evaluableExpression *vm.Program deps []eventbuscommon.Dependency sourceDepMap map[string][]string // maps EventSource and EventName to dependency name recentMsgsByID map[string]*msg // prevent re-processing the same message as before (map of msg ID to time) @@ -44,30 +45,35 @@ func NewJetstreamTriggerConn(conn *jetstreambase.JetstreamConnection, var err error sourceDepMap := make(map[string][]string) - for _, d := range deps { + sanitizedDepExpr := dependencyExpression + + for i, d := range deps { + sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_") key := d.EventSourceName + "__" + d.EventName _, found := sourceDepMap[key] if !found { sourceDepMap[key] = make([]string, 0) } - sourceDepMap[key] = append(sourceDepMap[key], d.Name) + sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName) + deps[i].Name = sanitizedDepName + sourceDepMap[key] = append(sourceDepMap[key], sanitizedDepName) } connection := &JetstreamTriggerConn{ JetstreamConnection: conn, sensorName: sensorName, triggerName: triggerName, - dependencyExpression: dependencyExpression, - requiresANDLogic: strings.Contains(dependencyExpression, "&"), + dependencyExpression: sanitizedDepExpr, + requiresANDLogic: strings.Contains(sanitizedDepExpr, "&"), deps: deps, sourceDepMap: sourceDepMap, recentMsgsByID: make(map[string]*msg), recentMsgsByTime: make([]*msg, 0)} connection.Logger = connection.Logger.With("triggerName", connection.triggerName, "sensorName", connection.sensorName) - connection.evaluableExpression, err = govaluate.NewEvaluableExpression(strings.ReplaceAll(dependencyExpression, "-", "\\-")) + connection.evaluableExpression, err = expr.Compile(sanitizedDepExpr) if err != nil { - errStr := fmt.Sprintf("failed to evaluate expression %s: %v", dependencyExpression, err) + errStr := fmt.Sprintf("failed to evaluate expression %s: %v", sanitizedDepExpr, err) connection.Logger.Error(errStr) return nil, fmt.Errorf(errStr) } @@ -161,7 +167,7 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context, subscriptionIndex++ } - // create a single goroutine which which handle receiving messages to ensure that all of the processing is occurring on that + // create a single goroutine which handles receiving messages to ensure that all of the processing is occurring on that // one goroutine and we don't need to worry about race conditions go conn.processMsgs(ch, processMsgsCloseCh, resetConditionsCh, transform, filter, action, &wg) wg.Add(1) @@ -379,7 +385,7 @@ func (conn *JetstreamTriggerConn) processDependency( log.Infof("Current state of dependencies: %v", parameters) // evaluate the filter expression - result, err := conn.evaluableExpression.Evaluate(parameters) + result, err := expr.Run(conn.evaluableExpression, parameters) if err != nil { errStr := fmt.Sprintf("failed to evaluate dependency expression: %v", err) log.Error(errStr) diff --git a/eventbus/kafka/sensor/kafka_sensor.go b/eventbus/kafka/sensor/kafka_sensor.go index 1f39403701..e02cb813f0 100644 --- a/eventbus/kafka/sensor/kafka_sensor.go +++ b/eventbus/kafka/sensor/kafka_sensor.go @@ -9,12 +9,12 @@ import ( "time" "github.com/IBM/sarama" - "github.com/Knetic/govaluate" eventbuscommon "github.com/argoproj/argo-events/eventbus/common" "github.com/argoproj/argo-events/eventbus/kafka/base" eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr" "go.uber.org/zap" ) @@ -172,7 +172,15 @@ func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpres } if _, ok := s.triggers[triggerName]; !ok { - expr, err := govaluate.NewEvaluableExpression(strings.ReplaceAll(depExpression, "-", "\\-")) + sanitizedDepExpr := depExpression + + for i, d := range dependencies { + sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_") + sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName) + dependencies[i].Name = sanitizedDepName + } + + expr, err := expr.Compile(sanitizedDepExpr) if err != nil { return nil, err } @@ -285,15 +293,8 @@ func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMess // can skip ahead to the action topic, otherwise produce to // the trigger topic - var data any - var topic string - if trigger.OneAndDone() { - data = []*cloudevents.Event{event} - topic = s.topics.action - } else { - data = event - topic = s.topics.trigger - } + data := event + topic := s.topics.trigger value, err := json.Marshal(data) if err != nil { diff --git a/eventbus/kafka/sensor/trigger_conn.go b/eventbus/kafka/sensor/trigger_conn.go index cacf973f69..bae76eaa6b 100644 --- a/eventbus/kafka/sensor/trigger_conn.go +++ b/eventbus/kafka/sensor/trigger_conn.go @@ -5,10 +5,10 @@ import ( "fmt" "time" - "github.com/Knetic/govaluate" "github.com/argoproj/argo-events/eventbus/common" "github.com/argoproj/argo-events/eventbus/kafka/base" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr/vm" ) type KafkaTriggerConnection struct { @@ -17,7 +17,7 @@ type KafkaTriggerConnection struct { sensorName string triggerName string - depExpression *govaluate.EvaluableExpression + depExpression *vm.Program dependencies map[string]common.Dependency atLeastOnce bool diff --git a/eventbus/kafka/sensor/trigger_handler.go b/eventbus/kafka/sensor/trigger_handler.go index c89f104daf..d91ea117e6 100644 --- a/eventbus/kafka/sensor/trigger_handler.go +++ b/eventbus/kafka/sensor/trigger_handler.go @@ -3,10 +3,10 @@ package kafka import ( "time" - "github.com/Knetic/govaluate" "github.com/argoproj/argo-events/eventbus/common" "github.com/argoproj/argo-events/eventbus/kafka/base" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr" "go.uber.org/zap" ) @@ -15,7 +15,6 @@ type KafkaTriggerHandler interface { Name() string Ready() bool Reset() - OneAndDone() bool DependsOn(*cloudevents.Event) (string, bool) Transform(string, *cloudevents.Event) (*cloudevents.Event, error) Filter(string, *cloudevents.Event) bool @@ -42,16 +41,6 @@ func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bo return "", false } -func (c *KafkaTriggerConnection) OneAndDone() bool { - for _, token := range c.depExpression.Tokens() { - if token.Kind == govaluate.LOGICALOP && token.Value == "&&" { - return false - } - } - - return true -} - func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error) { return c.transform(depName, *event) } @@ -140,9 +129,9 @@ func (c *KafkaTriggerConnection) satisfied() (interface{}, error) { } } - c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.String()), zap.Any("parameters", parameters)) + c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.Source().Content()), zap.Any("parameters", parameters)) - return c.depExpression.Eval(parameters) + return expr.Run(c.depExpression, parameters) } func (c *KafkaTriggerConnection) Reset() { diff --git a/eventbus/stan/sensor/trigger_conn.go b/eventbus/stan/sensor/trigger_conn.go index 466e0bc511..8f5611df72 100644 --- a/eventbus/stan/sensor/trigger_conn.go +++ b/eventbus/stan/sensor/trigger_conn.go @@ -8,8 +8,9 @@ import ( "sync" "time" - "github.com/Knetic/govaluate" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" "github.com/gobwas/glob" "github.com/nats-io/stan.go" "github.com/nats-io/stan.go/pb" @@ -255,7 +256,7 @@ func (n *STANTriggerConn) processEventSourceMsg(m *stan.Msg, msgHolder *eventSou } } - result, err := msgHolder.expr.Evaluate(msgHolder.parameters) + result, err := expr.Run(msgHolder.expr, msgHolder.parameters) if err != nil { log.Errorf("failed to evaluate dependency expression: %v", err) // TODO: how to handle this situation? @@ -315,7 +316,7 @@ type eventSourceMessageHolder struct { lastResetTime time.Time // if we reach this time, we reset everything (occurs 60 seconds after lastResetTime) resetTimeout int64 - expr *govaluate.EvaluableExpression + expr *vm.Program depNames []string // Mapping of [eventSourceName + eventName]dependencyName sourceDepMap map[string]string @@ -330,14 +331,26 @@ type eventSourceMessageHolder struct { } func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr string, dependencies []eventbuscommon.Dependency, lastResetTime time.Time) (*eventSourceMessageHolder, error) { - dependencyExpr = strings.ReplaceAll(dependencyExpr, "-", "\\-") - expression, err := govaluate.NewEvaluableExpression(dependencyExpr) + sanitizedDepExpr := dependencyExpr + + for i, d := range dependencies { + sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_") + sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName) + dependencies[i].Name = sanitizedDepName + } + + expression, err := expr.Compile(sanitizedDepExpr) if err != nil { return nil, err } - deps := unique(expression.Vars()) - if len(dependencyExpr) == 0 { - return nil, fmt.Errorf("no dependencies found: %s", dependencyExpr) + deps := []string{} + for _, c := range expression.Constants { + if v, ok := c.(string); ok { + deps = append(deps, v) + } + } + if len(sanitizedDepExpr) == 0 { + return nil, fmt.Errorf("no dependencies found: %s", sanitizedDepExpr) } srcDepMap := make(map[string]string) @@ -447,18 +460,3 @@ func (mh *eventSourceMessageHolder) isCleanedUp() bool { } return len(mh.msgs) == 0 } - -func unique(stringSlice []string) []string { - if len(stringSlice) == 0 { - return stringSlice - } - keys := make(map[string]bool) - list := []string{} - for _, entry := range stringSlice { - if _, value := keys[entry]; !value { - keys[entry] = true - list = append(list, entry) - } - } - return list -} diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 149e5e8a4b..28475c9a97 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -640,7 +640,7 @@ func filterEvent(data []byte, filter *v1alpha1.EventSourceFilter) (bool, error) params := make(map[string]interface{}) for key, value := range dataMap { - params[strings.ReplaceAll(key, "-", "_")] = value + params[key] = value } env := expr.GetFuncMap(params) return expr.EvalBool(filter.Expression, env) diff --git a/eventsources/sources/hdfs/client.go b/eventsources/sources/hdfs/client.go index c1bce92fb0..bd671ea458 100644 --- a/eventsources/sources/hdfs/client.go +++ b/eventsources/sources/hdfs/client.go @@ -3,11 +3,11 @@ package hdfs import ( "fmt" - "github.com/colinmarc/hdfs" - krb "gopkg.in/jcmturner/gokrb5.v5/client" - "gopkg.in/jcmturner/gokrb5.v5/config" - "gopkg.in/jcmturner/gokrb5.v5/credentials" - "gopkg.in/jcmturner/gokrb5.v5/keytab" + "github.com/colinmarc/hdfs/v2" + krb "github.com/jcmturner/gokrb5/v8/client" + "github.com/jcmturner/gokrb5/v8/config" + "github.com/jcmturner/gokrb5/v8/credentials" + "github.com/jcmturner/gokrb5/v8/keytab" corev1 "k8s.io/api/core/v1" "github.com/argoproj/argo-events/common" @@ -31,12 +31,12 @@ type KrbOptions struct { // CCacheOptions is options for ccache type CCacheOptions struct { - CCache credentials.CCache + CCache *credentials.CCache } // KeytabOptions is options for keytab type KeytabOptions struct { - Keytab keytab.Keytab + Keytab *keytab.Keytab Username string Realm string } @@ -74,7 +74,8 @@ func createHDFSConfig(hdfsEventSource *v1alpha1.HDFSEventSource) (*HDFSConfig, e if err != nil { return nil, err } - ccache, err := credentials.ParseCCache(bytes) + ccache := new(credentials.CCache) + err = ccache.Unmarshal(bytes) if err != nil { return nil, err } @@ -91,7 +92,8 @@ func createHDFSConfig(hdfsEventSource *v1alpha1.HDFSEventSource) (*HDFSConfig, e if err != nil { return nil, err } - ktb, err := keytab.Parse(bytes) + ktb := new(keytab.Keytab) + err = ktb.Unmarshal(bytes) if err != nil { return nil, err } @@ -134,25 +136,20 @@ func createHDFSClient(addresses []string, user string, krbOptions *KrbOptions) ( } func createKrbClient(krbOptions *KrbOptions) (*krb.Client, error) { - krbConfig, err := config.NewConfigFromString(krbOptions.Config) + krbConfig, err := config.NewFromString(krbOptions.Config) if err != nil { return nil, err } if krbOptions.CCacheOptions != nil { - client, err := krb.NewClientFromCCache(krbOptions.CCacheOptions.CCache) - if err != nil { - return nil, err - } - return client.WithConfig(krbConfig), nil + return krb.NewFromCCache(krbOptions.CCacheOptions.CCache, krbConfig) } else if krbOptions.KeytabOptions != nil { - client := krb.NewClientWithKeytab(krbOptions.KeytabOptions.Username, krbOptions.KeytabOptions.Realm, krbOptions.KeytabOptions.Keytab) - client = *client.WithConfig(krbConfig) + client := krb.NewWithKeytab(krbOptions.KeytabOptions.Username, krbOptions.KeytabOptions.Realm, krbOptions.KeytabOptions.Keytab, krbConfig) err = client.Login() if err != nil { return nil, err } - return &client, nil + return client, nil } return nil, fmt.Errorf("Failed to get a Kerberos client") diff --git a/eventsources/sources/hdfs/start.go b/eventsources/sources/hdfs/start.go index 0774f7c766..f81005c514 100644 --- a/eventsources/sources/hdfs/start.go +++ b/eventsources/sources/hdfs/start.go @@ -10,7 +10,7 @@ import ( "strings" "time" - "github.com/colinmarc/hdfs" + "github.com/colinmarc/hdfs/v2" "go.uber.org/zap" "github.com/argoproj/argo-events/common/logging" diff --git a/go.mod b/go.mod index 5c073bb01b..e09fa42e9b 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,9 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.6.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 github.com/IBM/sarama v1.42.2 - github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/Masterminds/sprig/v3 v3.2.3 github.com/ahmetb/gen-crd-api-reference-docs v0.3.0 github.com/andygrunwald/go-gerrit v0.0.0-20230325081502-da63a5c62d80 - github.com/antonmedv/expr v1.15.5 github.com/apache/openwhisk-client-go v0.0.0-20190915054138-716c6f973eb2 github.com/apache/pulsar-client-go v0.12.0 github.com/argoproj/notifications-engine v0.4.0 @@ -28,9 +26,10 @@ require ( github.com/blushft/go-diagrams v0.0.0-20201006005127-c78c821223d9 github.com/bradleyfalzon/ghinstallation/v2 v2.9.0 github.com/cloudevents/sdk-go/v2 v2.15.0 - github.com/colinmarc/hdfs v1.1.4-0.20180802165501-48eb8d6c34a9 + github.com/colinmarc/hdfs/v2 v2.4.0 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/emitter-io/go/v2 v2.0.9 + github.com/expr-lang/expr v1.15.7 github.com/fsnotify/fsnotify v1.7.0 github.com/gavv/httpexpect/v2 v2.16.0 github.com/gfleury/go-bitbucket-v1 v0.0.0-20210707202713-7d616f7c18ac @@ -81,7 +80,6 @@ require ( golang.org/x/crypto v0.19.0 google.golang.org/api v0.163.0 google.golang.org/grpc v1.61.0 - gopkg.in/jcmturner/gokrb5.v5 v5.3.0 k8s.io/api v0.24.3 k8s.io/apimachinery v0.24.3 k8s.io/client-go v0.24.3 @@ -102,6 +100,8 @@ require ( github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect + github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/DataDog/zstd v1.5.0 // indirect github.com/PagerDuty/go-pagerduty v1.6.0 // indirect @@ -112,6 +112,7 @@ require ( github.com/cloudflare/circl v1.3.7 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/danieljoos/wincred v1.1.2 // indirect + github.com/devigned/tab v0.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/emicklei/go-restful/v3 v3.8.0 // indirect @@ -131,6 +132,8 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/jcmturner/goidentity/v6 v6.0.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect @@ -180,8 +183,6 @@ require ( github.com/Azure/go-autorest/autorest v0.11.28 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.21 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect - github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect @@ -197,7 +198,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/devigned/tab v0.1.1 // indirect github.com/doublerebel/bellows v0.0.0-20160303004610-f177d92a03d3 github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.5.0 // indirect @@ -243,12 +243,11 @@ require ( github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jessevdk/go-flags v1.5.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect @@ -323,10 +322,6 @@ require ( google.golang.org/protobuf v1.32.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect - gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect - gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect - gopkg.in/jcmturner/goidentity.v2 v2.0.0 // indirect - gopkg.in/jcmturner/rpc.v0 v0.0.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 6291c1df12..11fe6dfbf9 100644 --- a/go.sum +++ b/go.sum @@ -681,8 +681,6 @@ github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSL github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= -github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= -github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g= @@ -730,8 +728,6 @@ github.com/andygrunwald/go-gerrit v0.0.0-20230325081502-da63a5c62d80/go.mod h1:S github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= -github.com/antonmedv/expr v1.15.5 h1:y0Iz3cEwmpRz5/r3w4qQR0MfIqJGdGM1zbhD/v0G5Vg= -github.com/antonmedv/expr v1.15.5/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/openwhisk-client-go v0.0.0-20190915054138-716c6f973eb2 h1:mOsBfI/27csXzqNYu7XAf14RPGsRrcXJ8fjaYIhkuVU= @@ -828,8 +824,8 @@ github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoC github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27/go.mod h1:VQx0hjo2oUeQkQUET7wRwradO6f+fN5jzXgB/zROxxE= -github.com/colinmarc/hdfs v1.1.4-0.20180802165501-48eb8d6c34a9 h1:N98Et5DzDoJ1IO1cd8cZkXXT81W5+CR5S8rDU2I0HnM= -github.com/colinmarc/hdfs v1.1.4-0.20180802165501-48eb8d6c34a9/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463TymfZG02I= +github.com/colinmarc/hdfs/v2 v2.4.0 h1:v6R8oBx/Wu9fHpdPoJJjpGSUxo8NhHIwrwsfhFvU9W0= +github.com/colinmarc/hdfs/v2 v2.4.0/go.mod h1:0NAO+/3knbMx6+5pCv+Hcbaz4xn/Zzbn9+WIib2rKVI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= @@ -914,6 +910,8 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/expr-lang/expr v1.15.7 h1:BK0JcWUkoW6nrbLBo6xCKhz4BvH5DSOOu1Gx5lucyZo= +github.com/expr-lang/expr v1.15.7/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= @@ -1244,7 +1242,9 @@ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -2747,16 +2747,6 @@ gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= -gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= -gopkg.in/jcmturner/goidentity.v2 v2.0.0 h1:6Bmcdaxb0dD3HyHbo/MtJ2Q1wXLDuZJFwXZmuZvM+zw= -gopkg.in/jcmturner/goidentity.v2 v2.0.0/go.mod h1:vCwK9HeXksMeUmQ4SxDd1tRz4LejrKh3KRVjQWhjvZI= -gopkg.in/jcmturner/gokrb5.v5 v5.3.0 h1:RS1MYApX27Hx1Xw7NECs7XxGxxrm69/4OmaRuX9kwec= -gopkg.in/jcmturner/gokrb5.v5 v5.3.0/go.mod h1:oQz8Wc5GsctOTgCVyKad1Vw4TCWz5G6gfIQr88RPv4k= -gopkg.in/jcmturner/rpc.v0 v0.0.2 h1:wBTgrbL1qmLBUPsYVCqdJiI5aJgQhexmK+JkTHPUNJI= -gopkg.in/jcmturner/rpc.v0 v0.0.2/go.mod h1:NzMq6cRzR9lipgw7WxRBHNx5N8SifBuaCQsOT1kWY/E= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= diff --git a/sensors/dependencies/filter.go b/sensors/dependencies/filter.go index d96341cdd3..213b42d263 100644 --- a/sensors/dependencies/filter.go +++ b/sensors/dependencies/filter.go @@ -27,10 +27,10 @@ import ( "text/template" "time" - "github.com/Knetic/govaluate" "github.com/Masterminds/sprig/v3" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/expr-lang/expr" "github.com/tidwall/gjson" lua "github.com/yuin/gopher-lua" ) @@ -109,7 +109,7 @@ func filterEvent(filter *v1alpha1.EventDependencyFilter, operator v1alpha1.Logic } // filterExpr applies expression based filters against event data -// expression evaluation is based on https://github.com/Knetic/govaluate +// expression evaluation is based on https://github.com/expr-lang/expr // in case "operator input" is equal to v1alpha1.OrLogicalOperator, filters are evaluated as mutual exclusive func filterExpr(filters []v1alpha1.ExprFilter, operator v1alpha1.LogicalOperator, event *v1alpha1.Event) (bool, error) { if filters == nil { @@ -151,7 +151,7 @@ filterExpr: continue } - expr, exprErr := govaluate.NewEvaluableExpression(filter.Expr) + exp, exprErr := expr.Compile(filter.Expr) if exprErr != nil { if operator == v1alpha1.OrLogicalOperator { errMessages = append(errMessages, exprErr.Error()) @@ -161,7 +161,7 @@ filterExpr: } } - result, resErr := expr.Evaluate(parameters) + result, resErr := expr.Run(exp, parameters) if resErr != nil { if operator == v1alpha1.OrLogicalOperator { errMessages = append(errMessages, resErr.Error()) diff --git a/sensors/dependencies/filter_expr_test.go b/sensors/dependencies/filter_expr_test.go index 87a99cd585..5e574c69e1 100644 --- a/sensors/dependencies/filter_expr_test.go +++ b/sensors/dependencies/filter_expr_test.go @@ -205,7 +205,7 @@ func TestFilterExpr(t *testing.T) { }, filters: []v1alpha1.ExprFilter{ { - Expr: `b =~ "start"`, + Expr: `b contains "start"`, Fields: []v1alpha1.PayloadField{ { Path: "a.b", @@ -225,7 +225,7 @@ func TestFilterExpr(t *testing.T) { }, filters: []v1alpha1.ExprFilter{ { - Expr: `b !~ "start"`, + Expr: `b not contains "start"`, Fields: []v1alpha1.PayloadField{ { Path: "a.b", diff --git a/sensors/listener.go b/sensors/listener.go index a8f9f96582..039b369e20 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -24,8 +24,6 @@ import ( "sync/atomic" "time" - "github.com/Knetic/govaluate" - "github.com/antonmedv/expr" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/leaderelection" "github.com/argoproj/argo-events/common/logging" @@ -36,6 +34,7 @@ import ( sensordependencies "github.com/argoproj/argo-events/sensors/dependencies" sensortriggers "github.com/argoproj/argo-events/sensors/triggers" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/expr-lang/expr" cronlib "github.com/robfig/cron/v3" "go.uber.org/ratelimit" "go.uber.org/zap" @@ -106,8 +105,13 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { sensor := sensorCtx.sensor depMapping := make(map[string]v1alpha1.EventDependency) - for _, d := range sensor.Spec.Dependencies { - depMapping[d.Name] = d + sanitizedDepMap := map[string]string{} + + for i, d := range sensor.Spec.Dependencies { + sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_") + depMapping[sanitizedDepName] = sensor.Spec.Dependencies[i] + sanitizedDepMap[sanitizedDepName] = d.Name + sensor.Spec.Dependencies[i].Name = sanitizedDepName } ctx, cancel := context.WithCancel(ctx) @@ -131,38 +135,39 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { triggerLogger := logger.With(logging.LabelTriggerName, trigger.Template.Name) defer wg.Done() - depExpression, err := sensorCtx.getDependencyExpression(ctx, trigger) - if err != nil { - triggerLogger.Errorw("failed to get dependency expression", zap.Error(err)) - return + sanitizedDepExpr := sensorCtx.getDependencyExpression(ctx, trigger) + + for _, d := range sensor.Spec.Dependencies { + sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, sanitizedDepMap[d.Name], d.Name) } + // Calculate dependencies of each of the triggers. - de := strings.ReplaceAll(depExpression, "-", "\\-") - expr, err := govaluate.NewEvaluableExpression(de) + expr, err := expr.Compile(sanitizedDepExpr) if err != nil { triggerLogger.Errorw("failed to get new evaluable expression", zap.Error(err)) return } - depNames := unique(expr.Vars()) deps := []eventbuscommon.Dependency{} - for _, depName := range depNames { - dep, ok := depMapping[depName] - if !ok { - triggerLogger.Errorf("Dependency expression and dependency list do not match, %s is not found", depName) - return - } - d := eventbuscommon.Dependency{ - Name: dep.Name, - EventSourceName: dep.EventSourceName, - EventName: dep.EventName, + for _, constant := range expr.Constants { + if depName, ok := constant.(string); ok { + dep, ok := depMapping[depName] + if !ok { + triggerLogger.Errorf("Dependency expression and dependency list do not match, %s is not found", depName) + return + } + d := eventbuscommon.Dependency{ + Name: dep.Name, + EventSourceName: dep.EventSourceName, + EventName: dep.EventName, + } + deps = append(deps, d) } - deps = append(deps, d) } var conn eventbuscommon.TriggerConnection err = common.DoWithRetry(&common.DefaultBackoff, func() error { var err error - conn, err = ebDriver.Connect(ctx, trigger.Template.Name, depExpression, deps, trigger.AtLeastOnce) + conn, err = ebDriver.Connect(ctx, trigger.Template.Name, sanitizedDepExpr, deps, trigger.AtLeastOnce) triggerLogger.Debugf("just created connection %v, %+v", &conn, conn) return err }) @@ -309,7 +314,7 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { case <-ticker.C: if conn == nil || conn.IsClosed() { triggerLogger.Info("EventBus connection lost, reconnecting...") - conn, err = ebDriver.Connect(ctx, trigger.Template.Name, depExpression, deps, trigger.AtLeastOnce) + conn, err = ebDriver.Connect(ctx, trigger.Template.Name, sanitizedDepExpr, deps, trigger.AtLeastOnce) if err != nil { triggerLogger.Errorw("failed to reconnect to eventbus", zap.Any("connection", conn), zap.Error(err)) continue @@ -435,50 +440,14 @@ func (sensorCtx *SensorContext) triggerOne(ctx context.Context, sensor *v1alpha1 return nil } -func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, trigger v1alpha1.Trigger) (string, error) { +func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, trigger v1alpha1.Trigger) string { logger := logging.FromContext(ctx) - // Translate original expression which might contain group names - // to an expression only contains dependency names - translate := func(originalExpr string, parameters map[string]string) (string, error) { - originalExpr = strings.ReplaceAll(originalExpr, "&&", " + \"&&\" + ") - originalExpr = strings.ReplaceAll(originalExpr, "||", " + \"||\" + ") - originalExpr = strings.ReplaceAll(originalExpr, "-", "_") - originalExpr = strings.ReplaceAll(originalExpr, "(", "\"(\"+") - originalExpr = strings.ReplaceAll(originalExpr, ")", "+\")\"") - - program, err := expr.Compile(originalExpr, expr.Env(parameters)) - if err != nil { - logger.Errorw("Failed to compile original dependency expression", zap.Error(err)) - return "", err - } - result, err := expr.Run(program, parameters) - if err != nil { - logger.Errorw("Failed to parse original dependency expression", zap.Error(err)) - return "", err - } - newExpr := fmt.Sprintf("%v", result) - newExpr = strings.ReplaceAll(newExpr, "\"(\"", "(") - newExpr = strings.ReplaceAll(newExpr, "\")\"", ")") - return newExpr, nil - } - sensor := sensorCtx.sensor var depExpression string - var err error switch { case trigger.Template.Conditions != "": - conditions := trigger.Template.Conditions - // Add all the dependency and dependency group to the parameter mappings - depGroupMapping := make(map[string]string) - for _, dep := range sensor.Spec.Dependencies { - key := strings.ReplaceAll(dep.Name, "-", "_") - depGroupMapping[key] = dep.Name - } - depExpression, err = translate(conditions, depGroupMapping) - if err != nil { - return "", err - } + depExpression = trigger.Template.Conditions default: deps := []string{} for _, dep := range sensor.Spec.Dependencies { @@ -487,7 +456,7 @@ func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, tri depExpression = strings.Join(deps, "&&") } logger.Infof("Dependency expression for trigger %s: %s", trigger.Template.Name, depExpression) - return depExpression, nil + return depExpression } func eventToString(event *v1alpha1.Event) string { @@ -509,18 +478,3 @@ func convertEvent(event cloudevents.Event) *v1alpha1.Event { Data: event.Data(), } } - -func unique(stringSlice []string) []string { - if len(stringSlice) == 0 { - return stringSlice - } - keys := make(map[string]bool) - list := []string{} - for _, entry := range stringSlice { - if _, value := keys[entry]; !value { - keys[entry] = true - list = append(list, entry) - } - } - return list -} diff --git a/sensors/listener_test.go b/sensors/listener_test.go index 2390c3ce2c..ece12bb249 100644 --- a/sensors/listener_test.go +++ b/sensors/listener_test.go @@ -60,8 +60,7 @@ func TestGetDependencyExpression(t *testing.T) { sensorCtx := &SensorContext{ sensor: obj, } - expr, err := sensorCtx.getDependencyExpression(context.Background(), *fakeTrigger) - assert.NoError(t, err) + expr := sensorCtx.getDependencyExpression(context.Background(), *fakeTrigger) assert.Equal(t, "dep1", expr) }) @@ -82,8 +81,8 @@ func TestGetDependencyExpression(t *testing.T) { sensorCtx := &SensorContext{ sensor: obj, } - _, err := sensorCtx.getDependencyExpression(context.Background(), *fakeTrigger) - assert.NoError(t, err) + expr := sensorCtx.getDependencyExpression(context.Background(), *fakeTrigger) + assert.Equal(t, "dep1&&dep2", expr) }) t.Run("get complex expression", func(t *testing.T) { @@ -109,8 +108,8 @@ func TestGetDependencyExpression(t *testing.T) { sensor: obj, } trig := fakeTrigger.DeepCopy() - _, err := sensorCtx.getDependencyExpression(context.Background(), *trig) - assert.NoError(t, err) + expr := sensorCtx.getDependencyExpression(context.Background(), *trig) + assert.Equal(t, "dep1&&dep1a&&dep2", expr) }) t.Run("get conditions expression", func(t *testing.T) { @@ -142,7 +141,7 @@ func TestGetDependencyExpression(t *testing.T) { } trig := fakeTrigger.DeepCopy() trig.Template.Conditions = "dep-1 || dep-1a || dep-3" - _, err := sensorCtx.getDependencyExpression(context.Background(), *trig) - assert.NoError(t, err) + expr := sensorCtx.getDependencyExpression(context.Background(), *trig) + assert.Equal(t, trig.Template.Conditions, expr) }) }