diff --git a/commands/cmd_start_replication.go b/commands/cmd_start_replication.go index ba84f90..421dea5 100644 --- a/commands/cmd_start_replication.go +++ b/commands/cmd_start_replication.go @@ -136,7 +136,8 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "[Required] The absolute path to the connection file created with the create_connection command, "+ "containing the database name, hosts, and password (if any) for the target database. "+ "Alternatively, you can provide this information manually with --target-db-name, "+ - "--target-hosts, and --target-password-file") + "--target-hosts, and --target-password-file", + ) cmd.Flags().BoolVar( &c.startRepOptions.Async, asyncFlag, @@ -145,7 +146,7 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "Default value is false.", ) cmd.Flags().StringVar( - &c.startRepOptions.ObjectName, + &c.startRepOptions.TableOrSchemaName, tableOrSchemaNameFlag, "", "(only async replication)The object name we want to copy from the source side. The available"+ @@ -274,12 +275,18 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error { options := c.startRepOptions - err := vcc.VReplicateDatabase(options) + transactionID, err := vcc.VReplicateDatabase(options) if err != nil { vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB) return err } - vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB) + + if options.Async { + vcc.DisplayInfo("Successfully started replication to database %s. Transaction ID: %d", options.TargetDB, transactionID) + } else { + vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB) + } + return nil } diff --git a/vclusterops/cluster_op.go b/vclusterops/cluster_op.go index 2ecf82a..84e08f0 100644 --- a/vclusterops/cluster_op.go +++ b/vclusterops/cluster_op.go @@ -563,7 +563,7 @@ type ClusterCommands interface { VRemoveNode(options *VRemoveNodeOptions) (VCoordinationDatabase, error) VRemoveSubcluster(removeScOpt *VRemoveScOptions) (VCoordinationDatabase, error) VRenameSubcluster(options *VRenameSubclusterOptions) error - VReplicateDatabase(options *VReplicationDatabaseOptions) error + VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) VReviveDatabase(options *VReviveDatabaseOptions) (dbInfo string, vdbPtr *VCoordinationDatabase, err error) VSandbox(options *VSandboxOptions) error VScrutinize(options *VScrutinizeOptions) error diff --git a/vclusterops/https_check_db_running_op.go b/vclusterops/https_check_db_running_op.go index 4dea376..afa9a58 100644 --- a/vclusterops/https_check_db_running_op.go +++ b/vclusterops/https_check_db_running_op.go @@ -24,6 +24,7 @@ import ( "github.com/vertica/vcluster/rfc7807" "github.com/vertica/vcluster/vclusterops/util" + "golang.org/x/exp/slices" ) type opType int @@ -62,6 +63,8 @@ func (op opType) String() string { return "unknown operation" } +var maskEOFOp = []opType{DropDB} + // DBIsRunningError is an error to indicate we found the database still running. // This is emitted from this op. Callers can do type checking to perform an // action based on the error. @@ -291,6 +294,11 @@ func (op *httpsCheckRunningDBOp) processResult(_ *opEngineExecContext) error { for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) + // EOF is expected in node shutdown: we expect the node's HTTPS service to go down quickly + // and the Server HTTPS service does not guarantee that the response being sent back to the client before it closes + if result.isEOF() && slices.Contains(maskEOFOp, op.opType) { + continue + } if !result.isPassing() { allErrs = errors.Join(allErrs, result.err) } diff --git a/vclusterops/nma_poll_replication_status.go b/vclusterops/nma_poll_replication_status.go new file mode 100644 index 0000000..0029987 --- /dev/null +++ b/vclusterops/nma_poll_replication_status.go @@ -0,0 +1,182 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + "slices" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaPollReplicationStatusOp struct { + opBase + TargetDatabaseOptions + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase + existingTransactionIDs *[]int64 + newTransactionID *int64 +} + +func makeNMAPollReplicationStatusOp(targetDBOpt *TargetDatabaseOptions, targetUsePassword bool, + sandbox string, vdb *VCoordinationDatabase, existingTransactionIDs *[]int64, newTransactionID *int64) (nmaPollReplicationStatusOp, error) { + op := nmaPollReplicationStatusOp{} + op.name = "NMAPollReplicationStatusOp" + op.description = "Retrieve asynchronous replication transaction ID" + op.TargetDB = targetDBOpt.TargetDB + op.TargetHosts = targetDBOpt.TargetHosts + op.sandbox = sandbox + op.vdb = vdb + op.existingTransactionIDs = existingTransactionIDs + op.newTransactionID = newTransactionID + + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.TargetUserName) + if err != nil { + return op, err + } + op.TargetUserName = targetDBOpt.TargetUserName + op.TargetPassword = targetDBOpt.TargetPassword + } + + return op, nil +} + +func (op *nmaPollReplicationStatusOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + requestData := nmaReplicationStatusRequestData{} + requestData.DBName = op.TargetDB + requestData.ExcludedTransactionIDs = *op.existingTransactionIDs + requestData.GetTransactionIDsOnly = true + requestData.TransactionID = 0 + requestData.UserName = op.TargetUserName + requestData.Password = op.TargetPassword + + dataBytes, err := json.Marshal(requestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/status") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) prepare(execContext *opEngineExecContext) error { + err := op.updateRequestBody(op.TargetHosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.TargetHosts) + + return op.setupClusterHTTPRequest(op.TargetHosts) +} + +func (op *nmaPollReplicationStatusOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaPollReplicationStatusOp) finalize(_ *opEngineExecContext) error { + return nil +} + +func (op *nmaPollReplicationStatusOp) processResult(execContext *opEngineExecContext) error { + err := pollState(op, execContext) + if err != nil { + return fmt.Errorf("error polling replication status, %w", err) + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) getPollingTimeout() int { + return OneMinute +} + +func (op *nmaPollReplicationStatusOp) shouldStopPolling() (bool, error) { + var allErrs error + + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return true, fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + continue + } + + responseObj := []replicationStatusResponse{} + err := op.parseAndCheckResponse(host, result.content, &responseObj) + if err != nil { + return true, errors.Join(allErrs, err) + } + + // We should only receive 1 new transaction ID. + // More than 1 means multiple replication jobs were started at the same time. + // If this happens, we can't determine which transaction ID belongs to which job. + if len(responseObj) > 1 { + return true, errors.Join(allErrs, fmt.Errorf("[%s] expects one transaction ID but retrieved %d: %+v", + op.name, len(responseObj), responseObj)) + } + + // Stop polling if NMA responds with a single new transaction ID + if len(responseObj) == 1 { + newTransactionID := responseObj[0].TransactionID + + // The transaction ID should be new, i.e. not in the list of existing transaction IDs + if slices.Contains(*op.existingTransactionIDs, newTransactionID) { + return true, errors.Join(allErrs, fmt.Errorf("[%s] transaction ID already exists %d", + op.name, newTransactionID)) + } + + *op.newTransactionID = newTransactionID + return true, nil + } + + // If we're here, we've successfully received a status from one of the target hosts and there are no new transaction IDs + // Keep polling in this case + return false, nil + } + + return true, allErrs +} diff --git a/vclusterops/nma_replication_start.go b/vclusterops/nma_replication_start.go new file mode 100644 index 0000000..a050794 --- /dev/null +++ b/vclusterops/nma_replication_start.go @@ -0,0 +1,154 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaReplicationStartOp struct { + opBase + nmaStartReplicationRequestData + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase +} + +func makeNMAReplicationStartOp(sourceHosts []string, + sourceUsePassword bool, targetUsePassword bool, + replicationRequestData *nmaStartReplicationRequestData, + vdb *VCoordinationDatabase) (nmaReplicationStartOp, error) { + op := nmaReplicationStartOp{} + op.name = "NMAReplicationStartOp" + op.description = "Start asynchronous database replication" + op.hosts = sourceHosts + op.nmaStartReplicationRequestData = *replicationRequestData + op.vdb = vdb + + if sourceUsePassword { + err := util.ValidateUsernameAndPassword(op.name, sourceUsePassword, replicationRequestData.Username) + if err != nil { + return op, err + } + op.Username = replicationRequestData.Username + op.Password = replicationRequestData.Password + } + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, replicationRequestData.TargetUserName) + if err != nil { + return op, err + } + op.TargetUserName = replicationRequestData.TargetUserName + op.TargetPassword = replicationRequestData.TargetPassword + } + + return op, nil +} + +type nmaStartReplicationRequestData struct { + DBName string `json:"dbname"` + ExcludePattern string `json:"exclude_pattern,omitempty"` + IncludePattern string `json:"include_pattern,omitempty"` + TableOrSchemaName string `json:"table_or_schema_name,omitempty"` + Username string `json:"username"` + Password *string `json:"password"` + TargetDBName string `json:"target_dbname"` + TargetHost string `json:"target_hostname"` + TargetNamespace string `json:"target_namespace,omitempty"` + TargetUserName string `json:"target_username,omitempty"` + TargetPassword *string `json:"target_password,omitempty"` + TLSConfig string `json:"tls_config,omitempty"` +} + +func (op *nmaReplicationStartOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + dataBytes, err := json.Marshal(op.nmaStartReplicationRequestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op *nmaReplicationStartOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/start") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaReplicationStartOp) prepare(execContext *opEngineExecContext) error { + sourceHost, err := getInitiatorHostForReplication(op.name, op.sandbox, op.hosts, op.vdb) + if err != nil { + return err + } + // use first up host to execute https post request + op.hosts = sourceHost + + err = op.updateRequestBody(op.hosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.hosts) + + return op.setupClusterHTTPRequest(op.hosts) +} + +func (op *nmaReplicationStartOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaReplicationStartOp) finalize(_ *opEngineExecContext) error { + return nil +} + +func (op *nmaReplicationStartOp) processResult(_ *opEngineExecContext) error { + var allErrs error + + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + } + } + + return allErrs +} diff --git a/vclusterops/nma_replication_status.go b/vclusterops/nma_replication_status.go new file mode 100644 index 0000000..daf6656 --- /dev/null +++ b/vclusterops/nma_replication_status.go @@ -0,0 +1,167 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaReplicationStatusOp struct { + opBase + nmaReplicationStatusRequestData + TargetHosts []string + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase + transactionIDs *[]int64 +} + +func makeNMAReplicationStatusOp(targetHosts []string, targetUsePassword bool, + replicationStatusData *nmaReplicationStatusRequestData, sandbox string, vdb *VCoordinationDatabase, + transactionIDs *[]int64) (nmaReplicationStatusOp, error) { + op := nmaReplicationStatusOp{} + op.name = "NMAReplicationStatusOp" + op.description = "Get asynchronous replication status" + op.TargetHosts = targetHosts + op.nmaReplicationStatusRequestData = *replicationStatusData + op.sandbox = sandbox + op.vdb = vdb + op.transactionIDs = transactionIDs + + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, replicationStatusData.UserName) + if err != nil { + return op, err + } + op.UserName = replicationStatusData.UserName + op.Password = replicationStatusData.Password + } + + return op, nil +} + +type nmaReplicationStatusRequestData struct { + DBName string `json:"dbname"` + ExcludedTransactionIDs []int64 `json:"excluded_txn_ids,omitempty"` + GetTransactionIDsOnly bool `json:"get_txn_ids_only,omitempty"` + TransactionID int64 `json:"txn_id,omitempty"` + UserName string `json:"username"` + Password *string `json:"password"` +} + +func (op *nmaReplicationStatusOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + dataBytes, err := json.Marshal(op.nmaReplicationStatusRequestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op *nmaReplicationStatusOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/status") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaReplicationStatusOp) prepare(execContext *opEngineExecContext) error { + err := op.updateRequestBody(op.TargetHosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.TargetHosts) + + return op.setupClusterHTTPRequest(op.TargetHosts) +} + +func (op *nmaReplicationStatusOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaReplicationStatusOp) finalize(_ *opEngineExecContext) error { + return nil +} + +type replicationStatusResponse struct { + EndTime string `json:"end_time"` + NodeName string `json:"node_name"` + OpName string `json:"op_name"` + SentBytes int64 `json:"sent_bytes"` + StartTime string `json:"start_time"` + Status string `json:"status"` + TotalBytes int64 `json:"total_bytes"` + TransactionID int64 `json:"txn_id"` +} + +func (op *nmaReplicationStatusOp) processResult(_ *opEngineExecContext) error { + var allErrs error + + transactionIDs := mapset.NewSet[int64]() + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + continue + } + + responseObj := []replicationStatusResponse{} + err := op.parseAndCheckResponse(host, result.content, &responseObj) + if err != nil { + allErrs = errors.Join(allErrs, err) + continue + } + + // Get a list of transaction IDs. This can be used to retrieve a transaction ID when replication is started + for _, replicationStatus := range responseObj { + transactionIDs.Add(replicationStatus.TransactionID) + } + + // If we're here, we've successfully received a status from one of the target hosts. + // We don't need to check responses from other hosts as they should be the same + *op.transactionIDs = transactionIDs.ToSlice() + return nil + } + + return allErrs +} diff --git a/vclusterops/replication.go b/vclusterops/replication.go index f0dbced..8c949dc 100644 --- a/vclusterops/replication.go +++ b/vclusterops/replication.go @@ -30,6 +30,13 @@ type TargetDatabaseOptions struct { TargetPassword *string } +type ReplicationOptions struct { + TableOrSchemaName string + IncludePattern string + ExcludePattern string + TargetNamespace string +} + type VReplicationDatabaseOptions struct { /* part 1: basic db info */ DatabaseOptions @@ -39,10 +46,7 @@ type VReplicationDatabaseOptions struct { SourceTLSConfig string SandboxName string Async bool - ObjectName string - IncludePattern string - ExcludePattern string - TargetNamespace string + ReplicationOptions } func VReplicationDatabaseFactory() VReplicationDatabaseOptions { @@ -99,8 +103,8 @@ func (options *VReplicationDatabaseOptions) validateExtraOptions() error { } func (options *VReplicationDatabaseOptions) validateFineGrainedReplicationOptions() error { - if options.ObjectName != "" { - err := util.ValidateQualifiedObjectNamePattern(options.ObjectName, false) + if options.TableOrSchemaName != "" { + err := util.ValidateQualifiedObjectNamePattern(options.TableOrSchemaName, false) if err != nil { return err } @@ -191,7 +195,7 @@ func (options *VReplicationDatabaseOptions) validateAnalyzeOptions(logger vlog.P } // VReplicateDatabase can copy all table data and metadata from this cluster to another -func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) error { +func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) { /* * - Produce Instructions * - Create a VClusterOpEngine @@ -201,20 +205,22 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti // validate and analyze options err := options.validateAnalyzeOptions(vcc.Log) if err != nil { - return err + return 0, err } // retrieve information from the database to accurately determine the state of each node in both the main cluster and a given sandbox vdb := makeVCoordinationDatabase() err = vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, options.SandboxName) if err != nil { - return err + return 0, err } + asyncReplicationTransactionID := new(int64) + // produce database replication instructions - instructions, err := vcc.produceDBReplicationInstructions(options, &vdb) + instructions, err := vcc.produceDBReplicationInstructions(options, &vdb, asyncReplicationTransactionID) if err != nil { - return fmt.Errorf("fail to produce instructions, %w", err) + return 0, fmt.Errorf("fail to produce instructions, %w", err) } // create a VClusterOpEngine, and add certs to the engine @@ -229,9 +235,9 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti "2. set EnableConnectCredentialForwarding to True in source database using vsql " + "3. configure a Trust Authentication in target database using vsql") } - return fmt.Errorf("fail to replicate database: %w", runError) + return 0, fmt.Errorf("fail to replicate database: %w", runError) } - return nil + return *asyncReplicationTransactionID, nil } // The generated instructions will later perform the following operations necessary @@ -240,7 +246,7 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti // - Check Vertica versions // - Replicate database func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicationDatabaseOptions, - vdb *VCoordinationDatabase) ([]clusterOp, error) { + vdb *VCoordinationDatabase, asyncReplicationTransactionID *int64) ([]clusterOp, error) { var instructions []clusterOp // need username for https operations in source database @@ -263,30 +269,77 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati vcc.Log.Info("Current target username", "username", options.TargetUserName) } - httpsDisallowMultipleNamespacesOp, err := makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, - options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) - if err != nil { - return instructions, err - } + initiatorTargetHost := getInitiator(options.TargetHosts) + if options.Async { + nmaHealthOp := makeNMAHealthOp(append(options.Hosts, options.TargetHosts...)) - nmaHealthOp := makeNMAHealthOp(options.Hosts) + transactionIDs := &[]int64{} - // require to have the same vertica version - nmaVerticaVersionOp := makeNMACheckVerticaVersionOp(options.Hosts, true, true /*IsEon*/) + nmaReplicationStatusData := nmaReplicationStatusRequestData{} + nmaReplicationStatusData.DBName = options.TargetDB + nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} + nmaReplicationStatusData.GetTransactionIDsOnly = true + nmaReplicationStatusData.TransactionID = 0 + nmaReplicationStatusData.UserName = options.TargetUserName + nmaReplicationStatusData.Password = options.TargetPassword - initiatorTargetHost := getInitiator(options.TargetHosts) - httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, - options.UserName, options.Password, targetUsePassword, &options.TargetDatabaseOptions, initiatorTargetHost, - options.SourceTLSConfig, options.SandboxName, vdb) - if err != nil { - return instructions, err + nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetHosts, targetUsePassword, + &nmaReplicationStatusData, options.SandboxName, vdb, transactionIDs) + if err != nil { + return instructions, err + } + + nmaReplicationData := nmaStartReplicationRequestData{} + nmaReplicationData.DBName = options.DBName + nmaReplicationData.ExcludePattern = options.ExcludePattern + nmaReplicationData.IncludePattern = options.IncludePattern + nmaReplicationData.TableOrSchemaName = options.TableOrSchemaName + nmaReplicationData.Username = options.UserName + nmaReplicationData.Password = options.Password + nmaReplicationData.TargetDBName = options.TargetDB + nmaReplicationData.TargetHost = initiatorTargetHost + nmaReplicationData.TargetNamespace = options.TargetNamespace + nmaReplicationData.TargetUserName = options.TargetUserName + nmaReplicationData.TargetPassword = options.TargetPassword + nmaReplicationData.TLSConfig = options.SourceTLSConfig + + nmaStartReplicationOp, err := makeNMAReplicationStartOp(options.Hosts, options.usePassword, targetUsePassword, + &nmaReplicationData, vdb) + if err != nil { + return instructions, err + } + + nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDatabaseOptions, targetUsePassword, + options.SandboxName, vdb, transactionIDs, asyncReplicationTransactionID) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaHealthOp, + &nmaReplicationStatusOp, + &nmaStartReplicationOp, + &nmaPollReplicationStatusOp, + ) + } else { + httpsDisallowMultipleNamespacesOp, err := makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, + options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) + if err != nil { + return instructions, err + } + + httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, + options.UserName, options.Password, targetUsePassword, &options.TargetDatabaseOptions, initiatorTargetHost, + options.SourceTLSConfig, options.SandboxName, vdb) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &httpsDisallowMultipleNamespacesOp, + &httpsStartReplicationOp, + ) } - instructions = append(instructions, - &nmaHealthOp, - &nmaVerticaVersionOp, - &httpsDisallowMultipleNamespacesOp, - &httpsStartReplicationOp, - ) return instructions, nil }