Skip to content

Commit

Permalink
Adds on conflict update support for Mysql in CLI (#3127)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Jan 13, 2025
1 parent 08b16a0 commit c59da24
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
4 changes: 4 additions & 0 deletions cli/internal/cmds/neosync/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func isConfigValid(cmd *cmdConfig, logger *slog.Logger, sourceConnection *mgmtv1
return fmt.Errorf("truncate cascade is only supported in postgres")
}

if cmd.Destination.OnConflict.DoNothing && cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled {
return errors.New("on-conflict-do-nothing and on-conflict-do-update cannot be used together")
}

if sourceConnectionType == benthosbuilder_shared.ConnectionTypeMysql || sourceConnectionType == benthosbuilder_shared.ConnectionTypePostgres {
if cmd.Destination.Driver == "" {
return fmt.Errorf("must provide destination-driver")
Expand Down
27 changes: 20 additions & 7 deletions cli/internal/cmds/neosync/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ type connectionOpts struct {
}

type onConflictConfig struct {
DoNothing bool `yaml:"do-nothing"`
DoNothing bool `yaml:"do-nothing"`
DoUpdate *UpdateConflictConfig `yaml:"do-update,omitempty"`
}

type UpdateConflictConfig struct {
Enabled bool `yaml:"enabled"`
}

type dynamoDbDestinationConfig struct {
Expand Down Expand Up @@ -397,7 +402,7 @@ func (c *clisync) configureSync() ([][]*benthosbuilder.BenthosConfigResponse, er
}
configs, err := bm.GenerateBenthosConfigs(c.ctx)
if err != nil {
c.logger.Error("unable to build benthos configs")
c.logger.Error("unable to build benthos configs", "error", err)
return nil, err
}

Expand Down Expand Up @@ -649,18 +654,26 @@ func cmdConfigToDestinationConnectionOptions(cmd *cmdConfig, tables map[string]s
},
}
case mysqlDriver:
conflictConfig := &mgmtv1alpha1.MysqlOnConflictConfig{}
if cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled {
conflictConfig.Strategy = &mgmtv1alpha1.MysqlOnConflictConfig_Update{
Update: &mgmtv1alpha1.MysqlOnConflictConfig_MysqlOnConflictUpdate{},
}
} else if cmd.Destination.OnConflict.DoNothing {
conflictConfig.Strategy = &mgmtv1alpha1.MysqlOnConflictConfig_Nothing{
Nothing: &mgmtv1alpha1.MysqlOnConflictConfig_MysqlOnConflictDoNothing{},
}
}
return &mgmtv1alpha1.JobDestinationOptions{
Config: &mgmtv1alpha1.JobDestinationOptions_MysqlOptions{
MysqlOptions: &mgmtv1alpha1.MysqlDestinationConnectionOptions{
TruncateTable: &mgmtv1alpha1.MysqlTruncateTableConfig{
TruncateBeforeInsert: cmd.Destination.TruncateBeforeInsert,
},
InitTableSchema: cmd.Destination.InitSchema,
OnConflict: &mgmtv1alpha1.MysqlOnConflictConfig{
DoNothing: cmd.Destination.OnConflict.DoNothing,
},
MaxInFlight: cmd.Destination.MaxInFlight,
Batch: cmdConfigSqlDestinationToBatch(cmd.Destination),
OnConflict: conflictConfig,
MaxInFlight: cmd.Destination.MaxInFlight,
Batch: cmdConfigSqlDestinationToBatch(cmd.Destination),
},
},
}
Expand Down
45 changes: 42 additions & 3 deletions docs/docs/cli/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,48 @@ neosync sync

The following options can be passed using the `neosync sync` command:

### General Options

- `--api-key` - Neosync API Key. Takes precedence over `$NEOSYNC_API_KEY`
- `--config` - Path to yaml config. Defaults to `neosync.yaml` in current directory.
- `--connection-id` - Neosync connection id for sync data source. Takes precedence over config.
- `--job-id` - Neosync job id for sync data source. For [AWS S3, GCP Cloud Storage] jobs only. Takes precedence over config.
- `--job-run-id` - Neosync job run id for sync data source. For [AWS S3, GCP Cloud Storage] jobs only. Takes precedence over config.
- `--output` - Sets output type (auto, plain, tty). (default `auto`).
- `--debug` - Sets the log level to debug and prints much more information. Works best with `--output plain`.

### SQL Destination Options

- `--destination-connection-url` - Local destination connection url to sync data to. Takes precedence over config.
- `--destination-driver` - Destination connection driver (postgres, mysql). Takes precedence over config.
- `--truncate-before-insert` - Truncates the table before inserting data. This will not work with Foreign Keys.
- `--truncate-cascade` - Truncate cascades to all tables. Only supported for postgres.
- `--init-schema` - Creates the table schema and its constraints.
- `--on-conflict-do-nothing` - If there is a conflict when inserting data into SQL database do not insert.
- `--output` - Sets output type (auto, plain, tty). (default `auto`).
- `--debug` - Sets the log level to debug and prints much more information. Works best with `--output plain`.

### SQL Connection Pool Options

- `--destination-idle-duration` - Maximum amount of time a connection may be idle (e.g. '5m')
- `--destination-idle-limit` - Maximum number of idle connections
- `--destination-open-duration` - Maximum amount of time a connection may be open (e.g. '30s')
- `--destination-open-limit` - Maximum number of open connections

### Batch Processing Options

- `--destination-max-in-flight` - Maximum allowed batched rows to sync. If not provided, uses server default of 64
- `--destination-batch-count` - Batch size of rows that will be sent to the destination.
- `--destination-batch-period` - Duration of tim e that a batch of rows will be sent.
- `--destination-batch-period` - Duration of time that a batch of rows will be sent.

### AWS DynamoDB Destination Options

- `--aws-access-key-id` - AWS Access Key ID for DynamoDB
- `--aws-secret-access-key` - AWS Secret Access Key for DynamoDB
- `--aws-session-token` - AWS Session Token for DynamoDB
- `--aws-role-arn` - AWS Role ARN for DynamoDB
- `--aws-role-external-id` - AWS Role External ID for DynamoDB
- `--aws-profile` - AWS Profile for DynamoDB
- `--aws-endpoint` - Custom endpoint for DynamoDB
- `--aws-region` - AWS Region for DynamoDB

## Yaml Config File

Expand All @@ -61,15 +83,20 @@ neosync sync --config ./path/to/config.yaml
source:
connection-id: d9dc020d-746b-48c1-9319-a165a25ac32e
connection-opts:
# only used if source is AWS S3
job-run-id: 43a4aac5-c4a8-4e4f-8554-03b7c5fffc04-2024-06-14T17:43:24Z

destination:
# SQL destination configuration
connection-url: user:pass@tcp(127.0.0.1:3306)/database
driver: mysql
truncate-before-insert: false
truncate-cascade: false
init-schema: false
on-conflict:
do-nothing: false
do-update:
enabled: false
connection-opts:
open-limit: 25 # remove to unset and use system default (CLI falls back to default of 25 if not provided)
idle-limit: 2 # remove to unset and use system default
Expand All @@ -79,6 +106,18 @@ destination:
batch:
count: 100
period: 5s

# AWS DynamoDB destination configuration
aws-dynamodb-destination:
aws-cred-config:
region: us-west-2
access-key-id: your-access-key
secret-access-key: your-secret-key
session-token: your-session-token
role-arn: your-role-arn
role-external-id: your-external-id
endpoint: http://localhost:8000
profile: default
```
## Circular Dependencies
Expand Down

0 comments on commit c59da24

Please sign in to comment.