Skip to content

Commit

Permalink
Sync from server repo (d338d028bb4)
Browse files Browse the repository at this point in the history
  • Loading branch information
roypaulin committed May 29, 2024
1 parent 4d33ccd commit 4a93e3b
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 80 deletions.
1 change: 0 additions & 1 deletion vclusterops/cluster_op_engine_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type opEngineExecContext struct {
dbInfo string // store the db info that retrieved from communal storage
restorePoints []RestorePoint // store list existing restore points that queried from an archive
systemTableList systemTableListInfo // used for staging system tables

// hosts on which the wrong authentication occurred
hostsWithWrongAuth []string
}
Expand Down
24 changes: 24 additions & 0 deletions vclusterops/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,30 @@ func getInitiatorHostInCluster(name, sandbox, scname string, vdb *VCoordinationD
return initiatorHost, nil
}

// getInitiatorHostForReplication returns an initiator that is the first up source host in the main cluster
// or a sandbox
func getInitiatorHostForReplication(name, sandbox string, hosts []string, vdb *VCoordinationDatabase) ([]string, error) {
// source hosts will be :
// 1. up hosts from the main subcluster if the sandbox is empty
// 2. up hosts from the sandbox if the sandbox is specified
var sourceHosts []string
for _, node := range vdb.HostNodeMap {
if node.State != util.NodeDownState && node.Sandbox == sandbox {
sourceHosts = append(sourceHosts, node.Address)
}
}
sourceHosts = util.SliceCommon(hosts, sourceHosts)
if len(sourceHosts) == 0 {
if sandbox == "" {
return nil, fmt.Errorf("[%s] cannot find any up hosts from source database", name)
}
return nil, fmt.Errorf("[%s] cannot find any up hosts in the sandbox %s", name, sandbox)
}

initiatorHost := []string{getInitiator(sourceHosts)}
return initiatorHost, nil
}

// getVDBFromRunningDB will retrieve db configurations from a non-sandboxed host by calling https endpoints of a running db
func (vcc VClusterCommands) getVDBFromRunningDB(vdb *VCoordinationDatabase, options *DatabaseOptions) error {
return vcc.getVDBFromRunningDBImpl(vdb, options, false, util.MainClusterSandbox)
Expand Down
29 changes: 29 additions & 0 deletions vclusterops/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ func TestForgetInitiatorHost(t *testing.T) {
assert.Equal(t, initiatorHost, "")
}

func TestForGetSourceHostForReplication(t *testing.T) {
mockHostNodeMap := map[string]*VCoordinationNode{
"192.168.1.101": {Address: "192.168.1.101", State: "UP", Sandbox: "sand"},
"192.168.1.102": {Address: "192.168.1.102", State: "UP", Sandbox: "sand"},
"192.168.1.103": {Address: "192.168.1.103", State: "UP", Sandbox: ""},
"192.168.1.104": {Address: "192.168.1.104", State: "UP", Sandbox: ""},
}

// successfully find source hosts from sandbox sand
vdb := VCoordinationDatabase{HostNodeMap: mockHostNodeMap}
hosts := []string{"192.168.1.102"}
sourceHosts, err := getInitiatorHostForReplication("", "sand", hosts, &vdb)
assert.NoError(t, err)
assert.Equal(t, sourceHosts, hosts)

// successfully find source hosts from main cluster
vdb = VCoordinationDatabase{HostNodeMap: mockHostNodeMap}
hosts = []string{"192.168.1.103"}
sourceHosts, err = getInitiatorHostForReplication("", "", hosts, &vdb)
assert.NoError(t, err)
assert.Equal(t, sourceHosts, hosts)

// unable to find any up hosts from main cluster
vdb = VCoordinationDatabase{HostNodeMap: mockHostNodeMap}
hosts = []string{}
_, err = getInitiatorHostForReplication("", "", hosts, &vdb)
assert.ErrorContains(t, err, "cannot find any up hosts from source database")
}

func TestForgetCatalogPath(t *testing.T) {
nodeName := "v_vertdb_node0001"
fullPath := fmt.Sprintf("/data/vertdb/%s_catalog/Catalog", nodeName)
Expand Down
157 changes: 157 additions & 0 deletions vclusterops/https_disallow_multiple_namespaces_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
)

type httpsDisallowMultipleNamespacesOp struct {
opBase
opHTTPSBase
sandbox string
vdb *VCoordinationDatabase
}

func makeHTTPSDisallowMultipleNamespacesOp(hosts []string, useHTTPPassword bool,
userName string, httpsPassword *string, sandbox string, vdb *VCoordinationDatabase) (httpsDisallowMultipleNamespacesOp, error) {
op := httpsDisallowMultipleNamespacesOp{}
op.name = "HTTPSGetNamespaceOp"
op.description = "Get information about all namespaces"
op.hosts = hosts
op.useHTTPPassword = useHTTPPassword
op.sandbox = sandbox
op.vdb = vdb

if useHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName)
if err != nil {
return op, err
}

op.userName = userName
op.httpsPassword = httpsPassword
}

return op, nil
}

func (op *httpsDisallowMultipleNamespacesOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
httpRequest := hostHTTPRequest{}
httpRequest.Method = GetMethod
httpRequest.buildHTTPSEndpoint("namespaces")
if op.useHTTPPassword {
httpRequest.Password = op.httpsPassword
httpRequest.Username = op.userName
}
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}

return nil
}

func (op *httpsDisallowMultipleNamespacesOp) prepare(execContext *opEngineExecContext) error {
sourceHost, err := getInitiatorHostForReplication(op.name, op.sandbox, op.hosts, op.vdb)
if err != nil {
return err
}
// hosts to be used to make the request should be an up host from source database or sandbox
op.hosts = sourceHost
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *httpsDisallowMultipleNamespacesOp) execute(execContext *opEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

type namespace struct {
NamespaceID int `json:"namespace_id"`
NamespaceName string `json:"namespace_name"`
IsDefault bool `json:"is_default"`
DefaultShardCount int `json:"default_shard_count"`
}

type namespaceListResponse struct {
NamespaceList []namespace `json:"namespace_list"`
}

func (op *httpsDisallowMultipleNamespacesOp) processResult(_ *opEngineExecContext) error {
var allErrs error

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if result.isUnauthorizedRequest() {
// skip checking response from other nodes because we will get the same error there
return result.err
}
if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
// try processing other hosts' responses when the current host has some server errors
continue
}

// decode the json-format response
// The successful response object will be a dictionary:
/*
{
"namespace_list": [
{
"namespace_id": NAMESPACE_ID,
"namespace_name": "default_namespace",
"is_default": true,
"default_shard_count": 4
},
{
"namespace_id": NAMESPACE_ID,
"namespace_name": "test_ns",
"is_default": false,
"default_shard_count": 6
}
]
}
*/
namespaceResponse := namespaceListResponse{}
err := op.parseAndCheckResponse(host, result.content, &namespaceResponse)
if err != nil {
allErrs = errors.Join(allErrs, err)
continue
}
// check if the response contains multiple namespaces
if len(namespaceResponse.NamespaceList) > 1 {
err := fmt.Errorf("[%s] replication is not supported in multiple namespace database", op.name)
allErrs = errors.Join(allErrs, err)
return allErrs
}
return nil
}

return allErrs
}

func (op *httpsDisallowMultipleNamespacesOp) finalize(_ *opEngineExecContext) error {
return nil
}
31 changes: 9 additions & 22 deletions vclusterops/https_start_replication_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ type httpsStartReplicationOp struct {
targetUserName string
targetPassword *string
tlsConfig string
vdb *VCoordinationDatabase
}

func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string,
sourceUseHTTPPassword bool, sourceUserName string,
sourceHTTPPassword *string, targetUseHTTPPassword bool, targetDB, targetUserName, targetHosts string,
targetHTTPSPassword *string, tlsConfig, sandbox string) (httpsStartReplicationOp, error) {
targetHTTPSPassword *string, tlsConfig, sandbox string, vdb *VCoordinationDatabase) (httpsStartReplicationOp, error) {
op := httpsStartReplicationOp{}
op.name = "HTTPSStartReplicationOp"
op.description = "Start database replication"
Expand All @@ -50,6 +51,7 @@ func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string,
op.targetHosts = targetHosts
op.tlsConfig = tlsConfig
op.sandbox = sandbox
op.vdb = vdb

if sourceUseHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, sourceUseHTTPPassword, sourceUserName)
Expand Down Expand Up @@ -117,29 +119,14 @@ func (op *httpsStartReplicationOp) setupClusterHTTPRequest(hosts []string) error
}

func (op *httpsStartReplicationOp) prepare(execContext *opEngineExecContext) error {
if len(execContext.nodesInfo) == 0 {
return fmt.Errorf(`[%s] cannot find any hosts in OpEngineExecContext`, op.name)
}
// source hosts will be :
// 1. up hosts from the main subcluster if the sandbox is empty
// 2. up hosts from the sandbox if the sandbox is specified
var sourceHosts []string
for _, node := range execContext.nodesInfo {
if node.State != util.NodeDownState && node.Sandbox == op.sandbox {
sourceHosts = append(sourceHosts, node.Address)
}
}
sourceHosts = util.SliceCommon(op.hosts, sourceHosts)
if len(sourceHosts) == 0 {
if op.sandbox == "" {
return fmt.Errorf("[%s] cannot find any up hosts from source database %s", op.name, op.sourceDB)
}
return fmt.Errorf("[%s] cannot find any up hosts in the sandbox %s", op.name, op.sandbox)
sourceHost, err := getInitiatorHostForReplication(op.name, op.sandbox, op.hosts, op.vdb)
if err != nil {
return err
}
// use first up host to execute https post request
op.hosts = sourceHost

op.hosts = []string{sourceHosts[0]}

err := op.setupRequestBody(op.hosts)
err = op.setupRequestBody(op.hosts)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4a93e3b

Please sign in to comment.