Skip to content

Commit

Permalink
Fixes Benthos Data Race for Parallel Activities, Replaces Benthos wit…
Browse files Browse the repository at this point in the history
…h Bento (#2322)
  • Loading branch information
nickzelei authored Jul 19, 2024
1 parent ace6b2d commit 204e371
Show file tree
Hide file tree
Showing 106 changed files with 164 additions and 142 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ on:
- main
types:
- opened
- synchronize
- reopened
- labeled
- unlabeled
jobs:
check_labels:
runs-on: ubuntu-latest
steps:
- name: Ensure PR has at least one label
if: ${{ github.event.pull_request.labels[0] == null }}
if: ${{ github.event.pull_request.labels[0] == null }}
run: |
echo "No labels found: please add at least one label to the PR"
exit 1
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/benthos/inputs/neosync-connection-data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"sync"

"connectrpc.com/connect"
"github.com/benthosdev/benthos/v4/public/service"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
"github.com/nucleuscloud/neosync/cli/internal/auth"
auth_interceptor "github.com/nucleuscloud/neosync/cli/internal/connect/interceptors/auth"
"github.com/nucleuscloud/neosync/cli/internal/version"
http_client "github.com/nucleuscloud/neosync/worker/pkg/http/client"
"github.com/warpstreamlabs/bento/public/service"
)

var neosyncConnectionDataConfigSpec = service.NewConfigSpec().
Expand Down
12 changes: 6 additions & 6 deletions cli/internal/cmds/neosync/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ import (
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

_ "github.com/benthosdev/benthos/v4/public/components/aws"
_ "github.com/benthosdev/benthos/v4/public/components/io"
_ "github.com/benthosdev/benthos/v4/public/components/pure"
_ "github.com/benthosdev/benthos/v4/public/components/pure/extended"
_ "github.com/benthosdev/benthos/v4/public/components/sql"
_ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs"
_ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql"
_ "github.com/warpstreamlabs/bento/public/components/aws"
_ "github.com/warpstreamlabs/bento/public/components/io"
_ "github.com/warpstreamlabs/bento/public/components/pure"
_ "github.com/warpstreamlabs/bento/public/components/pure/extended"
_ "github.com/warpstreamlabs/bento/public/components/sql"

http_client "github.com/nucleuscloud/neosync/worker/pkg/http/client"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"

tea "github.com/charmbracelet/bubbletea"
)
Expand Down
10 changes: 5 additions & 5 deletions cli/internal/cmds/neosync/sync/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

"golang.org/x/sync/errgroup"

_ "github.com/benthosdev/benthos/v4/public/components/aws"
_ "github.com/benthosdev/benthos/v4/public/components/io"
_ "github.com/benthosdev/benthos/v4/public/components/pure"
_ "github.com/benthosdev/benthos/v4/public/components/pure/extended"
_ "github.com/benthosdev/benthos/v4/public/components/sql"
_ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs"
_ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql"
_ "github.com/warpstreamlabs/bento/public/components/aws"
_ "github.com/warpstreamlabs/bento/public/components/io"
_ "github.com/warpstreamlabs/bento/public/components/pure"
_ "github.com/warpstreamlabs/bento/public/components/pure/extended"
_ "github.com/warpstreamlabs/bento/public/components/sql"

"github.com/charmbracelet/bubbles/spinner"
tea "github.com/charmbracelet/bubbletea"
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3
github.com/aws/smithy-go v1.20.3
github.com/benthosdev/benthos/v4 v4.27.0
github.com/charmbracelet/bubbles v0.18.0
github.com/charmbracelet/bubbletea v0.26.6
github.com/charmbracelet/lipgloss v0.12.1
Expand Down Expand Up @@ -50,6 +49,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/postgres v0.32.0
github.com/testcontainers/testcontainers-go/modules/redis v0.32.0
github.com/toqueteos/webbrowser v1.2.0
github.com/warpstreamlabs/bento v1.1.0
github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
github.com/zeebo/assert v1.3.1
Expand Down Expand Up @@ -223,6 +223,7 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down Expand Up @@ -362,7 +363,7 @@ require (
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sijms/go-ora/v2 v2.8.7 // indirect
github.com/sijms/go-ora/v2 v2.8.19 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/snowflakedb/gosnowflake v1.7.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,6 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benhoyt/goawk v1.25.0 h1:DW4DCn2IrVp6FUar2W404G1YyQDXseWAVDwb11PUL+I=
github.com/benhoyt/goawk v1.25.0/go.mod h1:FjIAicXvrv3wbqAhSTo5bn4mIM5y1iy3lcnIynlJvoI=
github.com/benthosdev/benthos/v4 v4.27.0 h1:Z5FiVvJL/NAu9QveMI7c+DsWDJWxjLjuEzkXjMhJOWw=
github.com/benthosdev/benthos/v4 v4.27.0/go.mod h1:RVwc2qyKEzGU+knon8KMlKGXPLFiJeEA7OIhnesE9wk=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
Expand Down Expand Up @@ -1845,8 +1843,8 @@ github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhr
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sijms/go-ora/v2 v2.8.7 h1:lkbCuXqd5/wn8niyJs/qvfTcSAfi8wBbzc5LYz41g5g=
github.com/sijms/go-ora/v2 v2.8.7/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
github.com/sijms/go-ora/v2 v2.8.19 h1:7LoKZatDYGi18mkpQTR/gQvG9yOdtc7hPAex96Bqisc=
github.com/sijms/go-ora/v2 v2.8.19/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down Expand Up @@ -1937,6 +1935,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wI2L/jsondiff v0.4.0 h1:iP56F9tK83eiLttg3YdmEENtZnwlYd3ezEpNNnfZVyM=
github.com/wI2L/jsondiff v0.4.0/go.mod h1:nR/vyy1efuDeAtMwc3AF6nZf/2LD1ID8GTyyJ+K8YB0=
github.com/warpstreamlabs/bento v1.1.0 h1:7kc4sdtWjpv9glKG9WI1zM7ywP1LI+LIkL4rTk6PGJQ=
github.com/warpstreamlabs/bento v1.1.0/go.mod h1:rWbBdeYUJ3aKawP6J99fGZAUrTndseKt39AwMFoh/vg=
github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7 h1:sqqLVb63En4uTKFKBWSJ7c1aIFonhM1yn35/+KchOf4=
github.com/wasilibs/go-pgquery v0.0.0-20240319230125-b9b2e95c69a7/go.mod h1:ZAUjWnxivykc22k0TKFZylOV0WlVQ9nWMExfGFIBuF4=
github.com/xanzy/go-gitlab v0.15.0 h1:rWtwKTgEnXyNUGrOArN7yyc3THRkpYcKXIXia9abywQ=
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"errors"
"fmt"

"github.com/benthosdev/benthos/v4/public/service"
neosync_benthos_error "github.com/nucleuscloud/neosync/worker/pkg/benthos/error"
benthos_metrics "github.com/nucleuscloud/neosync/worker/pkg/benthos/metrics"
neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb"
openaigenerate "github.com/nucleuscloud/neosync/worker/pkg/benthos/openai_generate"
neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql"
"github.com/warpstreamlabs/bento/public/service"
"go.opentelemetry.io/otel/metric"
)

Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/error/output_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"fmt"

"github.com/benthosdev/benthos/v4/public/service"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
"github.com/warpstreamlabs/bento/public/service"
)

func errorOutputSpec() *service.ConfigSpec {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/error/output_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"testing"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/stretchr/testify/require"
"github.com/warpstreamlabs/bento/public/service"
)

func Test_ErrorOutputEmptyShutdown(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/error/processor_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

func errorProcessorSpec() *service.ConfigSpec {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/error/processor_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"testing"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/stretchr/testify/require"
"github.com/warpstreamlabs/bento/public/service"
)

func Test_ErrorProcessorEmptyShutdown(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/javascript/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/dop251/goja"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers"
"github.com/warpstreamlabs/bento/public/service"
)

type jsFunction func(call goja.FunctionCall, rt *goja.Runtime, l *service.Logger) (any, error)
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/javascript/logger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package javascript

import "github.com/benthosdev/benthos/v4/public/service"
import "github.com/warpstreamlabs/bento/public/service"

// Logger wraps the service.Logger so that we can define the below methods.
type Logger struct {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/javascript/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/dop251/goja_nodejs/console"
"github.com/dop251/goja_nodejs/require"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/nucleuscloud/neosync/worker/pkg/benthos/javascript/ifs"
"github.com/warpstreamlabs/bento/public/service"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/javascript/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"testing"
"time"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/javascript/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dop251/goja"
"github.com/dop251/goja_nodejs/console"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

type vmRunner struct {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/metrics/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package benthos_metrics
import (
"context"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/metrics/otel_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package benthos_metrics
import (
"testing"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/stretchr/testify/assert"
"github.com/warpstreamlabs/bento/public/service"
"go.opentelemetry.io/otel/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
)
Expand Down
7 changes: 4 additions & 3 deletions worker/pkg/benthos/mongodb/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strconv"
"time"

"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
Expand Down Expand Up @@ -332,7 +332,8 @@ func (w writeMaps) extractFromMessage(operation Operation, i int, batch service.
}

func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) {
msg, err := b.BloblangQuery(i, m)
executor := b.BloblangExecutor(m)
msg, err := executor.Query(i)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/mongodb/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

// mongodb input component allowed operations.
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/mongodb/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/openai_generate/openai_generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/redis/go-redis/v9"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/service"
)

func clientFields() []*service.ConfigField {
Expand Down
4 changes: 2 additions & 2 deletions worker/pkg/benthos/redis/output_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/redis/go-redis/v9"

"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions worker/pkg/benthos/sql/input_sql_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"sync"

"github.com/Jeffail/shutdown"
"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"
)

func sqlRawInputSpec() *service.ConfigSpec {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/sql/input_sql_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"testing"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/stretchr/testify/require"
"github.com/warpstreamlabs/bento/public/service"
)

func Test_SqlRawInputEmptyShutdown(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions worker/pkg/benthos/sql/output_sql_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"sync"

"github.com/Jeffail/shutdown"
"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/mysql"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder"
"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"
)

func sqlInsertOutputSpec() *service.ConfigSpec {
Expand Down Expand Up @@ -205,12 +205,17 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa
return nil
}

var executor *service.MessageBatchBloblangExecutor
if s.argsMapping != nil {
executor = batch.BloblangExecutor(s.argsMapping)
}

rows := [][]interface{}{} //nolint:gofmt
for i := range batch {
if s.argsMapping == nil {
continue
}
resMsg, err := batch.BloblangQuery(i, s.argsMapping)
resMsg, err := executor.Query(i)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/sql/output_sql_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"testing"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/stretchr/testify/require"
"github.com/warpstreamlabs/bento/public/service"
)

func Test_SqlInsertOutputEmptyShutdown(t *testing.T) {
Expand Down
Loading

0 comments on commit 204e371

Please sign in to comment.