diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 5c8e81b9c3..45a13fd41f 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -6,6 +6,8 @@ on: - main types: - opened + - synchronize + - reopened - labeled - unlabeled jobs: @@ -13,7 +15,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Ensure PR has at least one label - if: ${{ github.event.pull_request.labels[0] == null }} + if: ${{ github.event.pull_request.labels[0] == null }} run: | echo "No labels found: please add at least one label to the PR" exit 1 diff --git a/cli/internal/benthos/inputs/neosync-connection-data.go b/cli/internal/benthos/inputs/neosync-connection-data.go index 13dcbc7780..dda40b13c8 100644 --- a/cli/internal/benthos/inputs/neosync-connection-data.go +++ b/cli/internal/benthos/inputs/neosync-connection-data.go @@ -5,13 +5,13 @@ import ( "sync" "connectrpc.com/connect" - "github.com/benthosdev/benthos/v4/public/service" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" "github.com/nucleuscloud/neosync/cli/internal/auth" auth_interceptor "github.com/nucleuscloud/neosync/cli/internal/connect/interceptors/auth" "github.com/nucleuscloud/neosync/cli/internal/version" http_client "github.com/nucleuscloud/neosync/worker/pkg/http/client" + "github.com/warpstreamlabs/bento/public/service" ) var neosyncConnectionDataConfigSpec = service.NewConfigSpec(). diff --git a/cli/internal/cmds/neosync/sync/sync.go b/cli/internal/cmds/neosync/sync/sync.go index 0f16044ae4..7589db59b4 100644 --- a/cli/internal/cmds/neosync/sync/sync.go +++ b/cli/internal/cmds/neosync/sync/sync.go @@ -35,17 +35,17 @@ import ( "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" - _ "github.com/benthosdev/benthos/v4/public/components/aws" - _ "github.com/benthosdev/benthos/v4/public/components/io" - _ "github.com/benthosdev/benthos/v4/public/components/pure" - _ "github.com/benthosdev/benthos/v4/public/components/pure/extended" - _ "github.com/benthosdev/benthos/v4/public/components/sql" _ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + _ "github.com/warpstreamlabs/bento/public/components/aws" + _ "github.com/warpstreamlabs/bento/public/components/io" + _ "github.com/warpstreamlabs/bento/public/components/pure" + _ "github.com/warpstreamlabs/bento/public/components/pure/extended" + _ "github.com/warpstreamlabs/bento/public/components/sql" http_client "github.com/nucleuscloud/neosync/worker/pkg/http/client" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" tea "github.com/charmbracelet/bubbletea" ) diff --git a/cli/internal/cmds/neosync/sync/ui.go b/cli/internal/cmds/neosync/sync/ui.go index 421ee672c9..def5546c5c 100644 --- a/cli/internal/cmds/neosync/sync/ui.go +++ b/cli/internal/cmds/neosync/sync/ui.go @@ -9,13 +9,13 @@ import ( "golang.org/x/sync/errgroup" - _ "github.com/benthosdev/benthos/v4/public/components/aws" - _ "github.com/benthosdev/benthos/v4/public/components/io" - _ "github.com/benthosdev/benthos/v4/public/components/pure" - _ "github.com/benthosdev/benthos/v4/public/components/pure/extended" - _ "github.com/benthosdev/benthos/v4/public/components/sql" _ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + _ "github.com/warpstreamlabs/bento/public/components/aws" + _ "github.com/warpstreamlabs/bento/public/components/io" + _ "github.com/warpstreamlabs/bento/public/components/pure" + _ "github.com/warpstreamlabs/bento/public/components/pure/extended" + _ "github.com/warpstreamlabs/bento/public/components/sql" "github.com/charmbracelet/bubbles/spinner" tea "github.com/charmbracelet/bubbletea" diff --git a/go.mod b/go.mod index 6cb5ada101..0442163882 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 github.com/aws/smithy-go v1.20.3 - github.com/benthosdev/benthos/v4 v4.27.0 github.com/charmbracelet/bubbles v0.18.0 github.com/charmbracelet/bubbletea v0.26.6 github.com/charmbracelet/lipgloss v0.12.1 @@ -50,6 +49,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/postgres v0.32.0 github.com/testcontainers/testcontainers-go/modules/redis v0.32.0 github.com/toqueteos/webbrowser v1.2.0 + github.com/warpstreamlabs/bento v1.1.0 github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 github.com/zeebo/assert v1.3.1 @@ -223,6 +223,7 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -362,7 +363,7 @@ require ( github.com/shirou/gopsutil/v3 v3.23.12 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/shopspring/decimal v1.3.1 // indirect - github.com/sijms/go-ora/v2 v2.8.7 // indirect + github.com/sijms/go-ora/v2 v2.8.19 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/snowflakedb/gosnowflake v1.7.2 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 82e8c3e59a..586ca483c3 100644 --- a/go.sum +++ b/go.sum @@ -883,8 +883,6 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benhoyt/goawk v1.25.0 h1:DW4DCn2IrVp6FUar2W404G1YyQDXseWAVDwb11PUL+I= github.com/benhoyt/goawk v1.25.0/go.mod h1:FjIAicXvrv3wbqAhSTo5bn4mIM5y1iy3lcnIynlJvoI= -github.com/benthosdev/benthos/v4 v4.27.0 h1:Z5FiVvJL/NAu9QveMI7c+DsWDJWxjLjuEzkXjMhJOWw= -github.com/benthosdev/benthos/v4 v4.27.0/go.mod h1:RVwc2qyKEzGU+knon8KMlKGXPLFiJeEA7OIhnesE9wk= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= @@ -1845,8 +1843,8 @@ github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhr github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/sijms/go-ora/v2 v2.8.7 h1:lkbCuXqd5/wn8niyJs/qvfTcSAfi8wBbzc5LYz41g5g= -github.com/sijms/go-ora/v2 v2.8.7/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= +github.com/sijms/go-ora/v2 v2.8.19 h1:7LoKZatDYGi18mkpQTR/gQvG9yOdtc7hPAex96Bqisc= +github.com/sijms/go-ora/v2 v2.8.19/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -1937,6 +1935,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/wI2L/jsondiff v0.4.0 h1:iP56F9tK83eiLttg3YdmEENtZnwlYd3ezEpNNnfZVyM= github.com/wI2L/jsondiff v0.4.0/go.mod h1:nR/vyy1efuDeAtMwc3AF6nZf/2LD1ID8GTyyJ+K8YB0= +github.com/warpstreamlabs/bento v1.1.0 h1:7kc4sdtWjpv9glKG9WI1zM7ywP1LI+LIkL4rTk6PGJQ= +github.com/warpstreamlabs/bento v1.1.0/go.mod h1:rWbBdeYUJ3aKawP6J99fGZAUrTndseKt39AwMFoh/vg= github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7 h1:sqqLVb63En4uTKFKBWSJ7c1aIFonhM1yn35/+KchOf4= github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7/go.mod h1:ZAUjWnxivykc22k0TKFZylOV0WlVQ9nWMExfGFIBuF4= github.com/xanzy/go-gitlab v0.15.0 h1:rWtwKTgEnXyNUGrOArN7yyc3THRkpYcKXIXia9abywQ= diff --git a/worker/pkg/benthos/environment/environment.go b/worker/pkg/benthos/environment/environment.go index 875174a1d1..0f30cc3948 100644 --- a/worker/pkg/benthos/environment/environment.go +++ b/worker/pkg/benthos/environment/environment.go @@ -4,12 +4,12 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/service" neosync_benthos_error "github.com/nucleuscloud/neosync/worker/pkg/benthos/error" benthos_metrics "github.com/nucleuscloud/neosync/worker/pkg/benthos/metrics" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" openaigenerate "github.com/nucleuscloud/neosync/worker/pkg/benthos/openai_generate" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + "github.com/warpstreamlabs/bento/public/service" "go.opentelemetry.io/otel/metric" ) diff --git a/worker/pkg/benthos/error/output_error.go b/worker/pkg/benthos/error/output_error.go index 9fd2cb2ceb..2aa5ea58e5 100644 --- a/worker/pkg/benthos/error/output_error.go +++ b/worker/pkg/benthos/error/output_error.go @@ -5,8 +5,8 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/service" neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos" + "github.com/warpstreamlabs/bento/public/service" ) func errorOutputSpec() *service.ConfigSpec { diff --git a/worker/pkg/benthos/error/output_error_test.go b/worker/pkg/benthos/error/output_error_test.go index 12376ae9f3..11b5ad9de7 100644 --- a/worker/pkg/benthos/error/output_error_test.go +++ b/worker/pkg/benthos/error/output_error_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" ) func Test_ErrorOutputEmptyShutdown(t *testing.T) { diff --git a/worker/pkg/benthos/error/processor_error.go b/worker/pkg/benthos/error/processor_error.go index 9e19231658..da3b5bf392 100644 --- a/worker/pkg/benthos/error/processor_error.go +++ b/worker/pkg/benthos/error/processor_error.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) func errorProcessorSpec() *service.ConfigSpec { diff --git a/worker/pkg/benthos/error/processor_error_test.go b/worker/pkg/benthos/error/processor_error_test.go index 72f84bfc33..3cc20715a4 100644 --- a/worker/pkg/benthos/error/processor_error_test.go +++ b/worker/pkg/benthos/error/processor_error_test.go @@ -5,8 +5,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" ) func Test_ErrorProcessorEmptyShutdown(t *testing.T) { diff --git a/worker/pkg/benthos/javascript/functions.go b/worker/pkg/benthos/javascript/functions.go index 18c840346a..7f70709aaf 100644 --- a/worker/pkg/benthos/javascript/functions.go +++ b/worker/pkg/benthos/javascript/functions.go @@ -9,8 +9,8 @@ import ( "github.com/dop251/goja" - "github.com/benthosdev/benthos/v4/public/service" "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" + "github.com/warpstreamlabs/bento/public/service" ) type jsFunction func(call goja.FunctionCall, rt *goja.Runtime, l *service.Logger) (any, error) diff --git a/worker/pkg/benthos/javascript/logger.go b/worker/pkg/benthos/javascript/logger.go index 9b43264bd9..ca138ff825 100644 --- a/worker/pkg/benthos/javascript/logger.go +++ b/worker/pkg/benthos/javascript/logger.go @@ -1,6 +1,6 @@ package javascript -import "github.com/benthosdev/benthos/v4/public/service" +import "github.com/warpstreamlabs/bento/public/service" // Logger wraps the service.Logger so that we can define the below methods. type Logger struct { diff --git a/worker/pkg/benthos/javascript/processor.go b/worker/pkg/benthos/javascript/processor.go index 2d2a1e4f6b..64083ec632 100644 --- a/worker/pkg/benthos/javascript/processor.go +++ b/worker/pkg/benthos/javascript/processor.go @@ -17,8 +17,8 @@ import ( "github.com/dop251/goja_nodejs/console" "github.com/dop251/goja_nodejs/require" - "github.com/benthosdev/benthos/v4/public/service" "github.com/nucleuscloud/neosync/worker/pkg/benthos/javascript/ifs" + "github.com/warpstreamlabs/bento/public/service" ) const ( diff --git a/worker/pkg/benthos/javascript/processor_test.go b/worker/pkg/benthos/javascript/processor_test.go index 0de40190ca..4b4a1e3306 100644 --- a/worker/pkg/benthos/javascript/processor_test.go +++ b/worker/pkg/benthos/javascript/processor_test.go @@ -12,7 +12,7 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/worker/pkg/benthos/javascript/vm.go b/worker/pkg/benthos/javascript/vm.go index c53d9b43a8..9457012b4b 100644 --- a/worker/pkg/benthos/javascript/vm.go +++ b/worker/pkg/benthos/javascript/vm.go @@ -7,7 +7,7 @@ import ( "github.com/dop251/goja" "github.com/dop251/goja_nodejs/console" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) type vmRunner struct { diff --git a/worker/pkg/benthos/metrics/otel_metrics.go b/worker/pkg/benthos/metrics/otel_metrics.go index 5a9be5be9b..936496d327 100644 --- a/worker/pkg/benthos/metrics/otel_metrics.go +++ b/worker/pkg/benthos/metrics/otel_metrics.go @@ -3,7 +3,7 @@ package benthos_metrics import ( "context" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) diff --git a/worker/pkg/benthos/metrics/otel_metrics_test.go b/worker/pkg/benthos/metrics/otel_metrics_test.go index 6ef7684c0e..8c13e53e41 100644 --- a/worker/pkg/benthos/metrics/otel_metrics_test.go +++ b/worker/pkg/benthos/metrics/otel_metrics_test.go @@ -3,8 +3,8 @@ package benthos_metrics import ( "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/service" "go.opentelemetry.io/otel/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric" ) diff --git a/worker/pkg/benthos/mongodb/common.go b/worker/pkg/benthos/mongodb/common.go index 939560c6b7..9bb00631b5 100644 --- a/worker/pkg/benthos/mongodb/common.go +++ b/worker/pkg/benthos/mongodb/common.go @@ -6,8 +6,8 @@ import ( "strconv" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/writeconcern" @@ -332,7 +332,8 @@ func (w writeMaps) extractFromMessage(operation Operation, i int, batch service. } func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) { - msg, err := b.BloblangQuery(i, m) + executor := b.BloblangExecutor(m) + msg, err := executor.Query(i) if err != nil { return nil, err } diff --git a/worker/pkg/benthos/mongodb/input.go b/worker/pkg/benthos/mongodb/input.go index bca16f92a4..7d421f590f 100644 --- a/worker/pkg/benthos/mongodb/input.go +++ b/worker/pkg/benthos/mongodb/input.go @@ -10,7 +10,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) // mongodb input component allowed operations. diff --git a/worker/pkg/benthos/mongodb/output.go b/worker/pkg/benthos/mongodb/output.go index 3eb5114979..b0d6b1dda1 100644 --- a/worker/pkg/benthos/mongodb/output.go +++ b/worker/pkg/benthos/mongodb/output.go @@ -9,7 +9,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) const ( diff --git a/worker/pkg/benthos/openai_generate/openai_generate.go b/worker/pkg/benthos/openai_generate/openai_generate.go index e8a43b2536..7b1bffe5c2 100644 --- a/worker/pkg/benthos/openai_generate/openai_generate.go +++ b/worker/pkg/benthos/openai_generate/openai_generate.go @@ -13,7 +13,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai" "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) const ( diff --git a/worker/pkg/benthos/redis/client.go b/worker/pkg/benthos/redis/client.go index b95115e640..0408909f75 100644 --- a/worker/pkg/benthos/redis/client.go +++ b/worker/pkg/benthos/redis/client.go @@ -7,7 +7,7 @@ import ( "github.com/redis/go-redis/v9" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) func clientFields() []*service.ConfigField { diff --git a/worker/pkg/benthos/redis/output_hash.go b/worker/pkg/benthos/redis/output_hash.go index a645d8ff8a..261d52b6c9 100644 --- a/worker/pkg/benthos/redis/output_hash.go +++ b/worker/pkg/benthos/redis/output_hash.go @@ -8,8 +8,8 @@ import ( "github.com/redis/go-redis/v9" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" ) const ( diff --git a/worker/pkg/benthos/sql/input_sql_raw.go b/worker/pkg/benthos/sql/input_sql_raw.go index a60b9040a5..1dbf657898 100644 --- a/worker/pkg/benthos/sql/input_sql_raw.go +++ b/worker/pkg/benthos/sql/input_sql_raw.go @@ -7,10 +7,10 @@ import ( "sync" "github.com/Jeffail/shutdown" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" ) func sqlRawInputSpec() *service.ConfigSpec { diff --git a/worker/pkg/benthos/sql/input_sql_raw_test.go b/worker/pkg/benthos/sql/input_sql_raw_test.go index 39d6f6c987..b0d58eace8 100644 --- a/worker/pkg/benthos/sql/input_sql_raw_test.go +++ b/worker/pkg/benthos/sql/input_sql_raw_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" ) func Test_SqlRawInputEmptyShutdown(t *testing.T) { diff --git a/worker/pkg/benthos/sql/output_sql_insert.go b/worker/pkg/benthos/sql/output_sql_insert.go index b046f25e07..2d637a2b9f 100644 --- a/worker/pkg/benthos/sql/output_sql_insert.go +++ b/worker/pkg/benthos/sql/output_sql_insert.go @@ -6,13 +6,13 @@ import ( "sync" "github.com/Jeffail/shutdown" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/mysql" _ "github.com/doug-martin/goqu/v9/dialect/postgres" mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" ) func sqlInsertOutputSpec() *service.ConfigSpec { @@ -205,12 +205,17 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa return nil } + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + rows := [][]interface{}{} //nolint:gofmt for i := range batch { if s.argsMapping == nil { continue } - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { return err } diff --git a/worker/pkg/benthos/sql/output_sql_insert_test.go b/worker/pkg/benthos/sql/output_sql_insert_test.go index cbd37a25cb..9719e86bef 100644 --- a/worker/pkg/benthos/sql/output_sql_insert_test.go +++ b/worker/pkg/benthos/sql/output_sql_insert_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" ) func Test_SqlInsertOutputEmptyShutdown(t *testing.T) { diff --git a/worker/pkg/benthos/sql/output_sql_update.go b/worker/pkg/benthos/sql/output_sql_update.go index 87b36eaac9..a732c20312 100644 --- a/worker/pkg/benthos/sql/output_sql_update.go +++ b/worker/pkg/benthos/sql/output_sql_update.go @@ -7,12 +7,12 @@ import ( "sync" "github.com/Jeffail/shutdown" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" _ "github.com/doug-martin/goqu/v9/dialect/mysql" _ "github.com/doug-martin/goqu/v9/dialect/postgres" mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" ) type SqlDbtx interface { @@ -222,11 +222,16 @@ func (s *pooledUpdateOutput) WriteBatch(ctx context.Context, batch service.Messa return nil } + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + for i := range batch { if s.argsMapping == nil { continue } - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { return err } diff --git a/worker/pkg/benthos/sql/output_sql_update_test.go b/worker/pkg/benthos/sql/output_sql_update_test.go index edc1a25b2f..b50031bbbf 100644 --- a/worker/pkg/benthos/sql/output_sql_update_test.go +++ b/worker/pkg/benthos/sql/output_sql_update_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - "github.com/benthosdev/benthos/v4/public/service" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" ) func Test_SqlUpdateOutputEmptyShutdown(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_bool.go b/worker/pkg/benthos/transformers/generate_bool.go index 24601b7cf5..a9cff58c0f 100644 --- a/worker/pkg/benthos/transformers/generate_bool.go +++ b/worker/pkg/benthos/transformers/generate_bool.go @@ -5,8 +5,8 @@ import ( "math/rand" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateBool diff --git a/worker/pkg/benthos/transformers/generate_bool_test.go b/worker/pkg/benthos/transformers/generate_bool_test.go index 0caeacfd66..1008fbea6b 100644 --- a/worker/pkg/benthos/transformers/generate_bool_test.go +++ b/worker/pkg/benthos/transformers/generate_bool_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomBool(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_card_number.go b/worker/pkg/benthos/transformers/generate_card_number.go index 0c75b4618c..8c72b7913c 100644 --- a/worker/pkg/benthos/transformers/generate_card_number.go +++ b/worker/pkg/benthos/transformers/generate_card_number.go @@ -5,8 +5,8 @@ import ( "fmt" "strconv" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateCardNumber diff --git a/worker/pkg/benthos/transformers/generate_card_number_test.go b/worker/pkg/benthos/transformers/generate_card_number_test.go index c31526798a..14ad5f7d8b 100644 --- a/worker/pkg/benthos/transformers/generate_card_number_test.go +++ b/worker/pkg/benthos/transformers/generate_card_number_test.go @@ -4,8 +4,8 @@ import ( "strconv" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateValidLuhnCardNumber(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_categorical.go b/worker/pkg/benthos/transformers/generate_categorical.go index 13979ae93f..3668e294b7 100644 --- a/worker/pkg/benthos/transformers/generate_categorical.go +++ b/worker/pkg/benthos/transformers/generate_categorical.go @@ -5,7 +5,7 @@ import ( "math/rand" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateCategorical diff --git a/worker/pkg/benthos/transformers/generate_categorical_test.go b/worker/pkg/benthos/transformers/generate_categorical_test.go index 40c7dd448b..aef582c343 100644 --- a/worker/pkg/benthos/transformers/generate_categorical_test.go +++ b/worker/pkg/benthos/transformers/generate_categorical_test.go @@ -5,8 +5,8 @@ import ( "strings" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateCategorical(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_city.go b/worker/pkg/benthos/transformers/generate_city.go index 2bf9e9ed9d..70432ace86 100644 --- a/worker/pkg/benthos/transformers/generate_city.go +++ b/worker/pkg/benthos/transformers/generate_city.go @@ -5,9 +5,9 @@ import ( "fmt" "math/rand" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateCity diff --git a/worker/pkg/benthos/transformers/generate_city_test.go b/worker/pkg/benthos/transformers/generate_city_test.go index 9f0407f0a5..ae7add83b1 100644 --- a/worker/pkg/benthos/transformers/generate_city_test.go +++ b/worker/pkg/benthos/transformers/generate_city_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var maxLength = int64(20) diff --git a/worker/pkg/benthos/transformers/generate_email.go b/worker/pkg/benthos/transformers/generate_email.go index bf14ef4af7..cb6214725f 100644 --- a/worker/pkg/benthos/transformers/generate_email.go +++ b/worker/pkg/benthos/transformers/generate_email.go @@ -5,11 +5,11 @@ import ( "fmt" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/google/uuid" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateEmail diff --git a/worker/pkg/benthos/transformers/generate_email_test.go b/worker/pkg/benthos/transformers/generate_email_test.go index 91cdd8d77d..22910d29c5 100644 --- a/worker/pkg/benthos/transformers/generate_email_test.go +++ b/worker/pkg/benthos/transformers/generate_email_test.go @@ -5,9 +5,9 @@ import ( "math/rand" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomEmailShort(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_first_name.go b/worker/pkg/benthos/transformers/generate_first_name.go index 3c49971e63..9e933c3c66 100644 --- a/worker/pkg/benthos/transformers/generate_first_name.go +++ b/worker/pkg/benthos/transformers/generate_first_name.go @@ -5,10 +5,10 @@ import ( "fmt" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateFirstName diff --git a/worker/pkg/benthos/transformers/generate_first_name_test.go b/worker/pkg/benthos/transformers/generate_first_name_test.go index 82bc0e473f..d5d771eb21 100644 --- a/worker/pkg/benthos/transformers/generate_first_name_test.go +++ b/worker/pkg/benthos/transformers/generate_first_name_test.go @@ -6,10 +6,10 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomFirstName(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_float.go b/worker/pkg/benthos/transformers/generate_float.go index f40c19f8c3..9ce39fd0c0 100644 --- a/worker/pkg/benthos/transformers/generate_float.go +++ b/worker/pkg/benthos/transformers/generate_float.go @@ -8,9 +8,9 @@ import ( "strings" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateFloat64 diff --git a/worker/pkg/benthos/transformers/generate_float_test.go b/worker/pkg/benthos/transformers/generate_float_test.go index 9f9a39626c..69bdec4f94 100644 --- a/worker/pkg/benthos/transformers/generate_float_test.go +++ b/worker/pkg/benthos/transformers/generate_float_test.go @@ -5,11 +5,11 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomFloat(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_full_address.go b/worker/pkg/benthos/transformers/generate_full_address.go index e29a1e2c0f..66a7541f23 100644 --- a/worker/pkg/benthos/transformers/generate_full_address.go +++ b/worker/pkg/benthos/transformers/generate_full_address.go @@ -5,9 +5,9 @@ import ( "fmt" "math/rand" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateFullAddress diff --git a/worker/pkg/benthos/transformers/generate_full_address_test.go b/worker/pkg/benthos/transformers/generate_full_address_test.go index bf1f0e3d81..6f71e349b5 100644 --- a/worker/pkg/benthos/transformers/generate_full_address_test.go +++ b/worker/pkg/benthos/transformers/generate_full_address_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var faMaxLength = int64(40) diff --git a/worker/pkg/benthos/transformers/generate_full_name.go b/worker/pkg/benthos/transformers/generate_full_name.go index 6d019181f3..19978ce469 100644 --- a/worker/pkg/benthos/transformers/generate_full_name.go +++ b/worker/pkg/benthos/transformers/generate_full_name.go @@ -5,10 +5,10 @@ import ( "fmt" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateFullName diff --git a/worker/pkg/benthos/transformers/generate_full_name_test.go b/worker/pkg/benthos/transformers/generate_full_name_test.go index 98a8f643e7..5c63a3b860 100644 --- a/worker/pkg/benthos/transformers/generate_full_name_test.go +++ b/worker/pkg/benthos/transformers/generate_full_name_test.go @@ -5,9 +5,9 @@ import ( "math/rand" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomFullName(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_gender.go b/worker/pkg/benthos/transformers/generate_gender.go index 26acb9d64c..2e4be4830e 100644 --- a/worker/pkg/benthos/transformers/generate_gender.go +++ b/worker/pkg/benthos/transformers/generate_gender.go @@ -4,9 +4,9 @@ import ( "errors" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateGender diff --git a/worker/pkg/benthos/transformers/generate_gender_test.go b/worker/pkg/benthos/transformers/generate_gender_test.go index f816cff9b7..74c7735b61 100644 --- a/worker/pkg/benthos/transformers/generate_gender_test.go +++ b/worker/pkg/benthos/transformers/generate_gender_test.go @@ -4,9 +4,9 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var maxGenderCharLimit = int64(6) diff --git a/worker/pkg/benthos/transformers/generate_int64.go b/worker/pkg/benthos/transformers/generate_int64.go index 8654405399..42f71c8a75 100644 --- a/worker/pkg/benthos/transformers/generate_int64.go +++ b/worker/pkg/benthos/transformers/generate_int64.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateInt64 diff --git a/worker/pkg/benthos/transformers/generate_int64_phone_number.go b/worker/pkg/benthos/transformers/generate_int64_phone_number.go index bf1fe49039..008ec337f7 100644 --- a/worker/pkg/benthos/transformers/generate_int64_phone_number.go +++ b/worker/pkg/benthos/transformers/generate_int64_phone_number.go @@ -3,9 +3,9 @@ package transformers import ( "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateInt64PhoneNumber diff --git a/worker/pkg/benthos/transformers/generate_int64_phone_number_test.go b/worker/pkg/benthos/transformers/generate_int64_phone_number_test.go index c9ee4ad8dd..c74f779400 100644 --- a/worker/pkg/benthos/transformers/generate_int64_phone_number_test.go +++ b/worker/pkg/benthos/transformers/generate_int64_phone_number_test.go @@ -4,8 +4,8 @@ import ( "strconv" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomIntPhoneNumber(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_int64_test.go b/worker/pkg/benthos/transformers/generate_int64_test.go index eefbfa2aff..109451c2cb 100644 --- a/worker/pkg/benthos/transformers/generate_int64_test.go +++ b/worker/pkg/benthos/transformers/generate_int64_test.go @@ -4,9 +4,9 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomInt(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_international_phone_number.go b/worker/pkg/benthos/transformers/generate_international_phone_number.go index b7eb2aef4a..babad4579d 100644 --- a/worker/pkg/benthos/transformers/generate_international_phone_number.go +++ b/worker/pkg/benthos/transformers/generate_international_phone_number.go @@ -5,8 +5,8 @@ import ( "fmt" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateInternationalPhoneNumber diff --git a/worker/pkg/benthos/transformers/generate_international_phone_number_test.go b/worker/pkg/benthos/transformers/generate_international_phone_number_test.go index 4b6379add7..77657847c6 100644 --- a/worker/pkg/benthos/transformers/generate_international_phone_number_test.go +++ b/worker/pkg/benthos/transformers/generate_international_phone_number_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateInternationalPhoneNumber(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_last_name.go b/worker/pkg/benthos/transformers/generate_last_name.go index 97ad0334c2..7ba0c5ccc2 100644 --- a/worker/pkg/benthos/transformers/generate_last_name.go +++ b/worker/pkg/benthos/transformers/generate_last_name.go @@ -5,10 +5,10 @@ import ( "fmt" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateLastName diff --git a/worker/pkg/benthos/transformers/generate_last_name_test.go b/worker/pkg/benthos/transformers/generate_last_name_test.go index 0093c5cd7a..25421cbb1c 100644 --- a/worker/pkg/benthos/transformers/generate_last_name_test.go +++ b/worker/pkg/benthos/transformers/generate_last_name_test.go @@ -6,10 +6,10 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomLastName(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_random_string.go b/worker/pkg/benthos/transformers/generate_random_string.go index cfd97f694a..491fbd2ed0 100644 --- a/worker/pkg/benthos/transformers/generate_random_string.go +++ b/worker/pkg/benthos/transformers/generate_random_string.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateRandomString diff --git a/worker/pkg/benthos/transformers/generate_random_string_test.go b/worker/pkg/benthos/transformers/generate_random_string_test.go index 057671e960..c1957519df 100644 --- a/worker/pkg/benthos/transformers/generate_random_string_test.go +++ b/worker/pkg/benthos/transformers/generate_random_string_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateRandomStringTransformerWithValue(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_sha256hash.go b/worker/pkg/benthos/transformers/generate_sha256hash.go index d113c266c7..a2515869a1 100644 --- a/worker/pkg/benthos/transformers/generate_sha256hash.go +++ b/worker/pkg/benthos/transformers/generate_sha256hash.go @@ -5,8 +5,8 @@ import ( "encoding/hex" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/google/uuid" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateSHA256Hash diff --git a/worker/pkg/benthos/transformers/generate_sha256hash_test.go b/worker/pkg/benthos/transformers/generate_sha256hash_test.go index d11cc153e2..a210afab50 100644 --- a/worker/pkg/benthos/transformers/generate_sha256hash_test.go +++ b/worker/pkg/benthos/transformers/generate_sha256hash_test.go @@ -3,8 +3,8 @@ package transformers import ( "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateSHA256Hash(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_ssn.go b/worker/pkg/benthos/transformers/generate_ssn.go index cb65c83fac..e6b231913c 100644 --- a/worker/pkg/benthos/transformers/generate_ssn.go +++ b/worker/pkg/benthos/transformers/generate_ssn.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateSSN diff --git a/worker/pkg/benthos/transformers/generate_ssn_test.go b/worker/pkg/benthos/transformers/generate_ssn_test.go index fcef0c1289..c4fc1ffb3a 100644 --- a/worker/pkg/benthos/transformers/generate_ssn_test.go +++ b/worker/pkg/benthos/transformers/generate_ssn_test.go @@ -4,9 +4,9 @@ import ( "regexp" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateSSN(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_state.go b/worker/pkg/benthos/transformers/generate_state.go index 14989d4bd2..35380471e0 100644 --- a/worker/pkg/benthos/transformers/generate_state.go +++ b/worker/pkg/benthos/transformers/generate_state.go @@ -3,8 +3,8 @@ package transformers import ( "math/rand" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateState diff --git a/worker/pkg/benthos/transformers/generate_state_test.go b/worker/pkg/benthos/transformers/generate_state_test.go index ec5e7eada3..1c5211187c 100644 --- a/worker/pkg/benthos/transformers/generate_state_test.go +++ b/worker/pkg/benthos/transformers/generate_state_test.go @@ -3,9 +3,9 @@ package transformers import ( "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateState(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_street_address.go b/worker/pkg/benthos/transformers/generate_street_address.go index cbd6ed512b..4d6aa97257 100644 --- a/worker/pkg/benthos/transformers/generate_street_address.go +++ b/worker/pkg/benthos/transformers/generate_street_address.go @@ -5,9 +5,9 @@ import ( "fmt" "math/rand" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateStreetAddress diff --git a/worker/pkg/benthos/transformers/generate_street_address_test.go b/worker/pkg/benthos/transformers/generate_street_address_test.go index 80753527cd..569a639249 100644 --- a/worker/pkg/benthos/transformers/generate_street_address_test.go +++ b/worker/pkg/benthos/transformers/generate_street_address_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateStreetAddress(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_string_phone_number.go b/worker/pkg/benthos/transformers/generate_string_phone_number.go index ff5ba6d402..ed0e60178c 100644 --- a/worker/pkg/benthos/transformers/generate_string_phone_number.go +++ b/worker/pkg/benthos/transformers/generate_string_phone_number.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateStringPhoneNumber diff --git a/worker/pkg/benthos/transformers/generate_string_phone_number_test.go b/worker/pkg/benthos/transformers/generate_string_phone_number_test.go index 3d6f7d9a95..63d5301c2c 100644 --- a/worker/pkg/benthos/transformers/generate_string_phone_number_test.go +++ b/worker/pkg/benthos/transformers/generate_string_phone_number_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateStringPhoneNumber(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_unix_timestamp.go b/worker/pkg/benthos/transformers/generate_unix_timestamp.go index 59e813a575..3b6d53a9d1 100644 --- a/worker/pkg/benthos/transformers/generate_unix_timestamp.go +++ b/worker/pkg/benthos/transformers/generate_unix_timestamp.go @@ -6,7 +6,7 @@ import ( "math/big" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateUnixTimestamp diff --git a/worker/pkg/benthos/transformers/generate_unix_timestamp_test.go b/worker/pkg/benthos/transformers/generate_unix_timestamp_test.go index 92155f1545..0537a5a7fd 100644 --- a/worker/pkg/benthos/transformers/generate_unix_timestamp_test.go +++ b/worker/pkg/benthos/transformers/generate_unix_timestamp_test.go @@ -3,8 +3,8 @@ package transformers import ( "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateUnixTimestamp(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_username.go b/worker/pkg/benthos/transformers/generate_username.go index b09e24cd88..12dd14a89c 100644 --- a/worker/pkg/benthos/transformers/generate_username.go +++ b/worker/pkg/benthos/transformers/generate_username.go @@ -6,9 +6,9 @@ import ( "strings" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateUsername diff --git a/worker/pkg/benthos/transformers/generate_username_test.go b/worker/pkg/benthos/transformers/generate_username_test.go index fb617259b4..fc86c487b5 100644 --- a/worker/pkg/benthos/transformers/generate_username_test.go +++ b/worker/pkg/benthos/transformers/generate_username_test.go @@ -6,8 +6,8 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateUsername(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_utc_timestamp.go b/worker/pkg/benthos/transformers/generate_utc_timestamp.go index b2d6f2f5eb..30d6f877e3 100644 --- a/worker/pkg/benthos/transformers/generate_utc_timestamp.go +++ b/worker/pkg/benthos/transformers/generate_utc_timestamp.go @@ -6,7 +6,7 @@ import ( "math/big" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateUTCTimestamp diff --git a/worker/pkg/benthos/transformers/generate_utc_timestamp_test.go b/worker/pkg/benthos/transformers/generate_utc_timestamp_test.go index 41fdc5678d..e23cb37c59 100644 --- a/worker/pkg/benthos/transformers/generate_utc_timestamp_test.go +++ b/worker/pkg/benthos/transformers/generate_utc_timestamp_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_ProcessUTCTimestamp(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_uuid.go b/worker/pkg/benthos/transformers/generate_uuid.go index fe0600aa32..35c17c0f7c 100644 --- a/worker/pkg/benthos/transformers/generate_uuid.go +++ b/worker/pkg/benthos/transformers/generate_uuid.go @@ -6,7 +6,7 @@ import ( "github.com/google/uuid" - "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateUUID diff --git a/worker/pkg/benthos/transformers/generate_uuid_test.go b/worker/pkg/benthos/transformers/generate_uuid_test.go index 592eceaa49..e4daa720ad 100644 --- a/worker/pkg/benthos/transformers/generate_uuid_test.go +++ b/worker/pkg/benthos/transformers/generate_uuid_test.go @@ -4,9 +4,9 @@ import ( "strings" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateUuidPreserveHyphhensTrue(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/generate_zipcode.go b/worker/pkg/benthos/transformers/generate_zipcode.go index ba100875d4..95cf851ec5 100644 --- a/worker/pkg/benthos/transformers/generate_zipcode.go +++ b/worker/pkg/benthos/transformers/generate_zipcode.go @@ -3,8 +3,8 @@ package transformers import ( "math/rand" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:generate:generateZipcode diff --git a/worker/pkg/benthos/transformers/generate_zipcode_test.go b/worker/pkg/benthos/transformers/generate_zipcode_test.go index 057c7ca18f..a1cb707d6f 100644 --- a/worker/pkg/benthos/transformers/generate_zipcode_test.go +++ b/worker/pkg/benthos/transformers/generate_zipcode_test.go @@ -3,9 +3,9 @@ package transformers import ( "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_GenerateZipcode(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/transform_character_scramble.go b/worker/pkg/benthos/transformers/transform_character_scramble.go index 770bf0dba9..3ca541e35f 100644 --- a/worker/pkg/benthos/transformers/transform_character_scramble.go +++ b/worker/pkg/benthos/transformers/transform_character_scramble.go @@ -7,8 +7,8 @@ import ( "strings" "unicode" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformCharacterScramble diff --git a/worker/pkg/benthos/transformers/transform_character_scramble_test.go b/worker/pkg/benthos/transformers/transform_character_scramble_test.go index 2304462f51..bafe9dd616 100644 --- a/worker/pkg/benthos/transformers/transform_character_scramble_test.go +++ b/worker/pkg/benthos/transformers/transform_character_scramble_test.go @@ -6,9 +6,9 @@ import ( "testing" "unicode" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var helloWorldRegex = "ell" diff --git a/worker/pkg/benthos/transformers/transform_e164_phone_number.go b/worker/pkg/benthos/transformers/transform_e164_phone_number.go index a59d4b0838..c75d07a48d 100644 --- a/worker/pkg/benthos/transformers/transform_e164_phone_number.go +++ b/worker/pkg/benthos/transformers/transform_e164_phone_number.go @@ -5,8 +5,8 @@ import ( "fmt" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformE164PhoneNumber diff --git a/worker/pkg/benthos/transformers/transform_e164_phone_number_test.go b/worker/pkg/benthos/transformers/transform_e164_phone_number_test.go index eec47028ba..062f5d8f2b 100644 --- a/worker/pkg/benthos/transformers/transform_e164_phone_number_test.go +++ b/worker/pkg/benthos/transformers/transform_e164_phone_number_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var testE164Phone = "+13782983927" diff --git a/worker/pkg/benthos/transformers/transform_email.go b/worker/pkg/benthos/transformers/transform_email.go index 71ddb7f2ff..ee179fc3fb 100644 --- a/worker/pkg/benthos/transformers/transform_email.go +++ b/worker/pkg/benthos/transformers/transform_email.go @@ -7,10 +7,10 @@ import ( "net/mail" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/google/uuid" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformEmail diff --git a/worker/pkg/benthos/transformers/transform_email_test.go b/worker/pkg/benthos/transformers/transform_email_test.go index c96c79e15a..cfa0a0280d 100644 --- a/worker/pkg/benthos/transformers/transform_email_test.go +++ b/worker/pkg/benthos/transformers/transform_email_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) var email = "evis@gmail.com" diff --git a/worker/pkg/benthos/transformers/transform_first_name.go b/worker/pkg/benthos/transformers/transform_first_name.go index 18df98fb74..41987db5a2 100644 --- a/worker/pkg/benthos/transformers/transform_first_name.go +++ b/worker/pkg/benthos/transformers/transform_first_name.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformFirstName diff --git a/worker/pkg/benthos/transformers/transform_first_name_test.go b/worker/pkg/benthos/transformers/transform_first_name_test.go index 03945e01a0..6442e272d3 100644 --- a/worker/pkg/benthos/transformers/transform_first_name_test.go +++ b/worker/pkg/benthos/transformers/transform_first_name_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var name = "evis" diff --git a/worker/pkg/benthos/transformers/transform_float.go b/worker/pkg/benthos/transformers/transform_float.go index b308d5912b..9cee1bbf00 100644 --- a/worker/pkg/benthos/transformers/transform_float.go +++ b/worker/pkg/benthos/transformers/transform_float.go @@ -7,9 +7,9 @@ import ( "strconv" "sync" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformFloat64 diff --git a/worker/pkg/benthos/transformers/transform_float_test.go b/worker/pkg/benthos/transformers/transform_float_test.go index b972b3db50..91d5006d7a 100644 --- a/worker/pkg/benthos/transformers/transform_float_test.go +++ b/worker/pkg/benthos/transformers/transform_float_test.go @@ -5,10 +5,10 @@ import ( "testing" "time" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/nucleuscloud/neosync/worker/pkg/rng" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_TransformFloat64InRange(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/transform_full_name.go b/worker/pkg/benthos/transformers/transform_full_name.go index ff9ff39a2a..2eb0d32b39 100644 --- a/worker/pkg/benthos/transformers/transform_full_name.go +++ b/worker/pkg/benthos/transformers/transform_full_name.go @@ -5,9 +5,9 @@ import ( "fmt" "strings" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformFullName diff --git a/worker/pkg/benthos/transformers/transform_full_name_test.go b/worker/pkg/benthos/transformers/transform_full_name_test.go index 0078ef5133..15dc4642d7 100644 --- a/worker/pkg/benthos/transformers/transform_full_name_test.go +++ b/worker/pkg/benthos/transformers/transform_full_name_test.go @@ -5,8 +5,8 @@ import ( "math/rand" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var fullName = "john smith" diff --git a/worker/pkg/benthos/transformers/transform_int64.go b/worker/pkg/benthos/transformers/transform_int64.go index 090839de64..97a485e56d 100644 --- a/worker/pkg/benthos/transformers/transform_int64.go +++ b/worker/pkg/benthos/transformers/transform_int64.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformInt64 diff --git a/worker/pkg/benthos/transformers/transform_int64_phone_number.go b/worker/pkg/benthos/transformers/transform_int64_phone_number.go index 4403aeebcd..d8ed831beb 100644 --- a/worker/pkg/benthos/transformers/transform_int64_phone_number.go +++ b/worker/pkg/benthos/transformers/transform_int64_phone_number.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformers_dataset "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/data-sets" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformInt64PhoneNumber diff --git a/worker/pkg/benthos/transformers/transform_int64_phone_number_test.go b/worker/pkg/benthos/transformers/transform_int64_phone_number_test.go index 2250cbd060..564c13c293 100644 --- a/worker/pkg/benthos/transformers/transform_int64_phone_number_test.go +++ b/worker/pkg/benthos/transformers/transform_int64_phone_number_test.go @@ -6,9 +6,9 @@ import ( "strings" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var testValue = int64(8384928322) diff --git a/worker/pkg/benthos/transformers/transform_int64_test.go b/worker/pkg/benthos/transformers/transform_int64_test.go index 8f9d709c79..e89b423738 100644 --- a/worker/pkg/benthos/transformers/transform_int64_test.go +++ b/worker/pkg/benthos/transformers/transform_int64_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_TransformIntInRange(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/transform_lastname.go b/worker/pkg/benthos/transformers/transform_lastname.go index b8f90ada27..b2a1f3c4fb 100644 --- a/worker/pkg/benthos/transformers/transform_lastname.go +++ b/worker/pkg/benthos/transformers/transform_lastname.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" "github.com/nucleuscloud/neosync/worker/pkg/rng" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformLastName diff --git a/worker/pkg/benthos/transformers/transform_lastname_test.go b/worker/pkg/benthos/transformers/transform_lastname_test.go index 348b9d9549..4297a60dc4 100644 --- a/worker/pkg/benthos/transformers/transform_lastname_test.go +++ b/worker/pkg/benthos/transformers/transform_lastname_test.go @@ -6,8 +6,8 @@ import ( "math/rand" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) func Test_TranformLastNameEmptyName(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/transform_string.go b/worker/pkg/benthos/transformers/transform_string.go index 23f4ddf989..2d698562ee 100644 --- a/worker/pkg/benthos/transformers/transform_string.go +++ b/worker/pkg/benthos/transformers/transform_string.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformString diff --git a/worker/pkg/benthos/transformers/transform_string_phone_number.go b/worker/pkg/benthos/transformers/transform_string_phone_number.go index 7b4c4eb0c8..efa262b328 100644 --- a/worker/pkg/benthos/transformers/transform_string_phone_number.go +++ b/worker/pkg/benthos/transformers/transform_string_phone_number.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" - "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/warpstreamlabs/bento/public/bloblang" ) // +neosyncTransformerBuilder:transform:transformStringPhoneNumber diff --git a/worker/pkg/benthos/transformers/transform_string_phone_number_test.go b/worker/pkg/benthos/transformers/transform_string_phone_number_test.go index 561de00046..1fe1a5430f 100644 --- a/worker/pkg/benthos/transformers/transform_string_phone_number_test.go +++ b/worker/pkg/benthos/transformers/transform_string_phone_number_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var testPhone = "1234567890" diff --git a/worker/pkg/benthos/transformers/transform_string_test.go b/worker/pkg/benthos/transformers/transform_string_test.go index 2ce107ac3e..b7a4b4287f 100644 --- a/worker/pkg/benthos/transformers/transform_string_test.go +++ b/worker/pkg/benthos/transformers/transform_string_test.go @@ -4,8 +4,8 @@ import ( "fmt" "testing" - "github.com/benthosdev/benthos/v4/public/bloblang" "github.com/stretchr/testify/assert" + "github.com/warpstreamlabs/bento/public/bloblang" ) var testStringValue = "hello" diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go index 45db6c9e4b..2e59e4958f 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go @@ -8,8 +8,6 @@ import ( "testing" "connectrpc.com/connect" - "github.com/benthosdev/benthos/v4/public/bloblang" - "github.com/benthosdev/benthos/v4/public/service" db_queries "github.com/nucleuscloud/neosync/backend/gen/go/db" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" @@ -20,21 +18,23 @@ import ( "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" "gopkg.in/yaml.v3" - _ "github.com/benthosdev/benthos/v4/public/components/aws" - _ "github.com/benthosdev/benthos/v4/public/components/io" + _ "github.com/warpstreamlabs/bento/public/components/aws" + _ "github.com/warpstreamlabs/bento/public/components/io" - _ "github.com/benthosdev/benthos/v4/public/components/pure" - _ "github.com/benthosdev/benthos/v4/public/components/pure/extended" - _ "github.com/benthosdev/benthos/v4/public/components/redis" - _ "github.com/benthosdev/benthos/v4/public/components/sql" neosync_benthos_error "github.com/nucleuscloud/neosync/worker/pkg/benthos/error" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/javascript" benthos_metrics "github.com/nucleuscloud/neosync/worker/pkg/benthos/metrics" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/redis" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" + _ "github.com/warpstreamlabs/bento/public/components/pure" + _ "github.com/warpstreamlabs/bento/public/components/pure/extended" + _ "github.com/warpstreamlabs/bento/public/components/redis" + _ "github.com/warpstreamlabs/bento/public/components/sql" neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos" ) diff --git a/worker/pkg/workflows/datasync/activities/sync/activity.go b/worker/pkg/workflows/datasync/activities/sync/activity.go index a72cfcaf4a..1387116b62 100644 --- a/worker/pkg/workflows/datasync/activities/sync/activity.go +++ b/worker/pkg/workflows/datasync/activities/sync/activity.go @@ -7,16 +7,16 @@ import ( "sync" "time" - _ "github.com/benthosdev/benthos/v4/public/components/aws" - _ "github.com/benthosdev/benthos/v4/public/components/gcp" - _ "github.com/benthosdev/benthos/v4/public/components/io" - - _ "github.com/benthosdev/benthos/v4/public/components/mongodb" - _ "github.com/benthosdev/benthos/v4/public/components/pure" - _ "github.com/benthosdev/benthos/v4/public/components/pure/extended" - _ "github.com/benthosdev/benthos/v4/public/components/redis" - _ "github.com/benthosdev/benthos/v4/public/components/sql" + _ "github.com/warpstreamlabs/bento/public/components/aws" + _ "github.com/warpstreamlabs/bento/public/components/gcp" + _ "github.com/warpstreamlabs/bento/public/components/io" + _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/javascript" + _ "github.com/warpstreamlabs/bento/public/components/mongodb" + _ "github.com/warpstreamlabs/bento/public/components/pure" + _ "github.com/warpstreamlabs/bento/public/components/pure/extended" + _ "github.com/warpstreamlabs/bento/public/components/redis" + _ "github.com/warpstreamlabs/bento/public/components/sql" neosynclogger "github.com/nucleuscloud/neosync/backend/pkg/logger" connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" @@ -105,6 +105,11 @@ func (a *Activity) getTunnelManagerByRunId(wfId, runId string) (connectiontunnel return manager, nil } +var ( + // Hack that locks the instanced bento stream builder build step that causes data races if done in parallel + streamBuilderMu sync.Mutex +) + // Temporal activity that runs benthos and syncs a source connection to one or more destination connections func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMetadata) (*SyncResponse, error) { session := uuid.NewString() @@ -238,6 +243,7 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet envKeyMap["TEMPORAL_WORKFLOW_ID"] = info.WorkflowExecution.ID envKeyMap["TEMPORAL_RUN_ID"] = info.WorkflowExecution.RunID + streamBuilderMu.Lock() streambldr := benthosenv.NewStreamBuilder() // would ideally use the activity logger here but can't convert it into a slog. streambldr.SetLogger(slogger.With( @@ -249,10 +255,12 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet err = streambldr.SetYAML(req.BenthosConfig) if err != nil { + streamBuilderMu.Unlock() return nil, fmt.Errorf("unable to convert benthos config to yaml for stream builder: %w", err) } stream, err := a.benthosStreamManager.NewBenthosStreamFromBuilder(streambldr) + streamBuilderMu.Unlock() if err != nil { return nil, fmt.Errorf("unable to build benthos config: %w", err) } diff --git a/worker/pkg/workflows/datasync/activities/sync/benthos-stream.go b/worker/pkg/workflows/datasync/activities/sync/benthos-stream.go index cb8ec3cf26..8a995cc01d 100644 --- a/worker/pkg/workflows/datasync/activities/sync/benthos-stream.go +++ b/worker/pkg/workflows/datasync/activities/sync/benthos-stream.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/benthosdev/benthos/v4/public/service" + "github.com/warpstreamlabs/bento/public/service" ) type BenthosStreamClient interface { diff --git a/worker/pkg/workflows/datasync/activities/sync/mock_BenthosStreamManagerClient.go b/worker/pkg/workflows/datasync/activities/sync/mock_BenthosStreamManagerClient.go index 2cbe5ba78e..29963c58d5 100644 --- a/worker/pkg/workflows/datasync/activities/sync/mock_BenthosStreamManagerClient.go +++ b/worker/pkg/workflows/datasync/activities/sync/mock_BenthosStreamManagerClient.go @@ -3,7 +3,7 @@ package sync_activity import ( - service "github.com/benthosdev/benthos/v4/public/service" + service "github.com/warpstreamlabs/bento/public/service" mock "github.com/stretchr/testify/mock" )