Skip to content

Commit

Permalink
Mysql and Mssql processors (#3023)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Dec 11, 2024
1 parent f3d58a8 commit 9115bb7
Show file tree
Hide file tree
Showing 31 changed files with 1,951 additions and 1,097 deletions.
16 changes: 3 additions & 13 deletions internal/benthos/benthos-builder/benthos-builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (b *BuilderProvider) registerStandardBuilders(
connectionclient mgmtv1alpha1connect.ConnectionServiceClient,
redisConfig *shared.RedisConfig,
selectQueryBuilder bb_shared.SelectQueryMapBuilder,
rawSqlInsertMode bool,
) error {
sourceConnectionType := bb_internal.GetConnectionType(sourceConnection)
jobType := bb_internal.GetJobType(job)
Expand All @@ -111,22 +110,17 @@ func (b *BuilderProvider) registerStandardBuilders(
connectionTypes = append(connectionTypes, bb_internal.GetConnectionType(dest))
}

sqlSyncOptions := []bb_conns.SqlSyncOption{}
if rawSqlInsertMode {
sqlSyncOptions = append(sqlSyncOptions, bb_conns.WithRawInsertMode())
}

if jobType == bb_internal.JobTypeSync {
for _, connectionType := range connectionTypes {
switch connectionType {
case bb_internal.ConnectionTypePostgres:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.PostgresDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.PostgresDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeMysql:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MysqlDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MysqlDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeMssql:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MssqlDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MssqlDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeAwsS3:
b.Register(bb_internal.JobTypeSync, bb_internal.ConnectionTypeAwsS3, bb_conns.NewAwsS3SyncBuilder())
Expand Down Expand Up @@ -217,7 +211,6 @@ type WorkerBenthosConfig struct {
func NewWorkerBenthosConfigManager(
config *WorkerBenthosConfig,
) (*BenthosConfigManager, error) {
rawInsertMode := false
provider := NewBuilderProvider(config.Logger)
err := provider.registerStandardBuilders(
config.Job,
Expand All @@ -228,7 +221,6 @@ func NewWorkerBenthosConfigManager(
config.Connectionclient,
config.RedisConfig,
config.SelectQueryBuilder,
rawInsertMode,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -269,7 +261,6 @@ type CliBenthosConfig struct {
func NewCliBenthosConfigManager(
config *CliBenthosConfig,
) (*BenthosConfigManager, error) {
rawInsertMode := true
destinationProvider := NewBuilderProvider(config.Logger)
err := destinationProvider.registerStandardBuilders(
config.Job,
Expand All @@ -280,7 +271,6 @@ func NewCliBenthosConfigManager(
nil,
config.RedisConfig,
nil,
rawInsertMode,
)
if err != nil {
return nil, err
Expand Down
27 changes: 7 additions & 20 deletions internal/benthos/benthos-builder/builders/aws-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,6 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
storageClass = convertToS3StorageClass(destinationOpts.GetStorageClass()).String()
}

processors := []*neosync_benthos.BatchProcessor{}
if isPooledSqlRawConfigured(benthosConfig.Config) {
processors = append(processors, &neosync_benthos.BatchProcessor{SqlToJson: &neosync_benthos.SqlToJsonConfig{}})
}

standardProcessors := []*neosync_benthos.BatchProcessor{
{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
}
processors = append(processors, standardProcessors...)

config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
Expand All @@ -97,9 +86,13 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
Path: strings.Join(s3pathpieces, "/"),
ContentType: "application/gzip",
Batching: &neosync_benthos.Batching{
Count: batchingConfig.BatchCount,
Period: batchingConfig.BatchPeriod,
Processors: processors,
Count: batchingConfig.BatchCount,
Period: batchingConfig.BatchPeriod,
Processors: []*neosync_benthos.BatchProcessor{
{NeosyncToJson: &neosync_benthos.NeosyncToJsonConfig{}},
{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
},
},
Credentials: buildBenthosS3Credentials(connAwsS3Config.Credentials),
Region: connAwsS3Config.GetRegion(),
Expand All @@ -120,12 +113,6 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
return config, nil
}

func isPooledSqlRawConfigured(cfg *neosync_benthos.BenthosConfig) bool {
return cfg != nil &&
cfg.StreamConfig.Input != nil &&
cfg.StreamConfig.Input.Inputs.PooledSqlRaw != nil
}

type S3StorageClass int

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,6 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) {
require.Equal(t, resp, expected)
}

func Test_buildPlainInsertArgs(t *testing.T) {
require.Empty(t, buildPlainInsertArgs(nil))
require.Empty(t, buildPlainInsertArgs([]string{}))
require.Equal(t, buildPlainInsertArgs([]string{"foo", "bar", "baz"}), `root = [this."foo", this."bar", this."baz"]`)
}

func Test_buildPlainColumns(t *testing.T) {
require.Empty(t, buildPlainColumns(nil))
require.Empty(t, buildPlainColumns([]*mgmtv1alpha1.JobMapping{}))
Expand Down
33 changes: 6 additions & 27 deletions internal/benthos/benthos-builder/builders/generate-ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
)

type generateAIBuilder struct {
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
aiGroupedTableCols map[string][]string
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
}

func NewGenerateAIBuilder(
Expand All @@ -32,10 +31,9 @@ func NewGenerateAIBuilder(
driver string,
) bb_internal.BenthosBuilder {
return &generateAIBuilder{
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
connectionclient: connectionclient,
aiGroupedTableCols: map[string][]string{},
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
connectionclient: connectionclient,
}
}

Expand Down Expand Up @@ -123,16 +121,6 @@ func (b *generateAIBuilder) BuildSourceConfigs(ctx context.Context, params *bb_i
userBatchSize,
)

// builds a map of table key to columns for AI Generated schemas as they are calculated lazily instead of via job mappings
aiGroupedTableCols := map[string][]string{}
for _, agm := range mappings {
key := neosync_benthos.BuildBenthosTable(agm.Schema, agm.Table)
for _, col := range agm.Columns {
aiGroupedTableCols[key] = append(aiGroupedTableCols[key], col.Column)
}
}
b.aiGroupedTableCols = aiGroupedTableCols

return sourceResponses, nil
}

Expand Down Expand Up @@ -217,12 +205,6 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *
if err != nil {
return nil, fmt.Errorf("unable to parse destination options: %w", err)
}
tableKey := neosync_benthos.BuildBenthosTable(benthosConfig.TableSchema, benthosConfig.TableName)

cols, ok := b.aiGroupedTableCols[tableKey]
if !ok {
return nil, fmt.Errorf("unable to find table columns for key (%s) when building destination connection", tableKey)
}

processorConfigs := []neosync_benthos.ProcessorConfig{}
for _, pc := range benthosConfig.Processors {
Expand All @@ -244,12 +226,9 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *
ConnectionId: params.DestConnection.GetId(),
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: cols,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

ArgsMapping: buildPlainInsertArgs(cols),

Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Expand Down
24 changes: 14 additions & 10 deletions internal/benthos/benthos-builder/builders/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type generateBuilder struct {
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
driver string
}

func NewGenerateBuilder(
Expand Down Expand Up @@ -54,6 +55,7 @@ func (b *generateBuilder) BuildSourceConfigs(ctx context.Context, params *bb_int
return nil, fmt.Errorf("unable to create new sql db: %w", err)
}
defer db.Db().Close()
b.driver = db.Driver()

groupedMappings := groupMappingsByTable(job.Mappings)
groupedTableMapping := getTableMappingsMap(groupedMappings)
Expand Down Expand Up @@ -179,6 +181,11 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb
processorConfigs = append(processorConfigs, *pc)
}

sqlProcessor, err := getSqlBatchProcessors(b.driver, benthosConfig.Columns, map[string]string{}, benthosConfig.ColumnDefaultProperties)
if err != nil {
return nil, err
}

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
Expand All @@ -193,18 +200,15 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb
PooledSqlInsert: &neosync_benthos.PooledSqlInsert{
ConnectionId: params.DestConnection.GetId(),

Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: benthosConfig.Columns,
ColumnDefaultProperties: benthosConfig.ColumnDefaultProperties,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns),
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Processors: []*neosync_benthos.BatchProcessor{sqlProcessor},
},
MaxInFlight: int(destOpts.MaxInFlight),
},
Expand Down
34 changes: 23 additions & 11 deletions internal/benthos/benthos-builder/builders/sql-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,6 @@ func getMapValuesCount[K comparable, V any](m map[K][]V) int {
return count
}

func buildPlainInsertArgs(cols []string) string {
if len(cols) == 0 {
return ""
}
pieces := make([]string, len(cols))
for idx := range cols {
pieces[idx] = fmt.Sprintf("this.%q", cols[idx])
}
return fmt.Sprintf("root = [%s]", strings.Join(pieces, ", "))
}

func buildPlainColumns(mappings []*mgmtv1alpha1.JobMapping) []string {
columns := make([]string, len(mappings))
for idx := range mappings {
Expand Down Expand Up @@ -439,6 +428,7 @@ func getColumnDefaultProperties(
if !ok {
return nil, fmt.Errorf("transformer missing for column: %s", cName)
}

var hasDefaultTransformer bool
if jmTransformer != nil && isDefaultJobMappingTransformer(jmTransformer) {
hasDefaultTransformer = true
Expand Down Expand Up @@ -906,3 +896,25 @@ func cleanPostgresType(dataType string) string {
}
return strings.TrimSpace(dataType[:parenIndex])
}

func shouldOverrideColumnDefault(columnDefaults map[string]*neosync_benthos.ColumnDefaultProperties) bool {
for _, cd := range columnDefaults {
if cd != nil && !cd.HasDefaultTransformer && cd.NeedsOverride {
return true
}
}
return false
}

func getSqlBatchProcessors(driver string, columns []string, columnDataTypes map[string]string, columnDefaultProperties map[string]*neosync_benthos.ColumnDefaultProperties) (*neosync_benthos.BatchProcessor, error) {
switch driver {
case sqlmanager_shared.PostgresDriver:
return &neosync_benthos.BatchProcessor{NeosyncToPgx: &neosync_benthos.NeosyncToPgxConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
case sqlmanager_shared.MysqlDriver:
return &neosync_benthos.BatchProcessor{NeosyncToMysql: &neosync_benthos.NeosyncToMysqlConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
case sqlmanager_shared.MssqlDriver:
return &neosync_benthos.BatchProcessor{NeosyncToMssql: &neosync_benthos.NeosyncToMssqlConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
default:
return nil, fmt.Errorf("unsupported driver %q when attempting to get sql batch processors", driver)
}
}
Loading

0 comments on commit 9115bb7

Please sign in to comment.