Skip to content

Commit

Permalink
Sync from server repo (ee6359dd08d)
Browse files Browse the repository at this point in the history
  • Loading branch information
releng committed Sep 25, 2024
1 parent 837a6c4 commit 8726cf9
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 44 deletions.
50 changes: 29 additions & 21 deletions commands/vcluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,30 +233,11 @@ func readVDBToDBConfig(vdb *vclusterops.VCoordinationDatabase) (DatabaseConfig,
if !ok {
return dbConfig, fmt.Errorf("cannot find host %s from HostNodeMap", host)
}
nodeConfig := NodeConfig{}
nodeConfig.Name = vnode.Name
nodeConfig.Address = vnode.Address
nodeConfig.Subcluster = vnode.Subcluster
nodeConfig.Sandbox = vnode.Sandbox

if vdb.CatalogPrefix == "" {
nodeConfig.CatalogPath = vnode.CatalogPath
} else {
nodeConfig.CatalogPath = vdb.GenCatalogPath(vnode.Name)
}
if vdb.DataPrefix == "" && len(vnode.StorageLocations) > 0 {
nodeConfig.DataPath = vnode.StorageLocations[0]
} else {
nodeConfig.DataPath = vdb.GenDataPath(vnode.Name)
}
if vdb.IsEon && vdb.DepotPrefix == "" {
nodeConfig.DepotPath = vnode.DepotPath
} else if vdb.DepotPrefix != "" {
nodeConfig.DepotPath = vdb.GenDepotPath(vnode.Name)
}

nodeConfig := buildNodeConfig(vnode, vdb)
dbConfig.Nodes = append(dbConfig.Nodes, &nodeConfig)
}

dbConfig.IsEon = vdb.IsEon
dbConfig.CommunalStorageLocation = vdb.CommunalStorageLocation
dbConfig.Ipv6 = vdb.Ipv6
Expand All @@ -266,6 +247,33 @@ func readVDBToDBConfig(vdb *vclusterops.VCoordinationDatabase) (DatabaseConfig,
return dbConfig, nil
}

func buildNodeConfig(vnode *vclusterops.VCoordinationNode,
vdb *vclusterops.VCoordinationDatabase) NodeConfig {
nodeConfig := NodeConfig{}
nodeConfig.Name = vnode.Name
nodeConfig.Address = vnode.Address
nodeConfig.Subcluster = vnode.Subcluster
nodeConfig.Sandbox = vnode.Sandbox

if vdb.CatalogPrefix == "" {
nodeConfig.CatalogPath = vnode.CatalogPath
} else {
nodeConfig.CatalogPath = vdb.GenCatalogPath(vnode.Name)
}
if vdb.DataPrefix == "" && len(vnode.StorageLocations) > 0 {
nodeConfig.DataPath = vnode.StorageLocations[0]
} else {
nodeConfig.DataPath = vdb.GenDataPath(vnode.Name)
}
if vdb.IsEon && vdb.DepotPrefix == "" {
nodeConfig.DepotPath = vnode.DepotPath
} else if vdb.DepotPrefix != "" {
nodeConfig.DepotPath = vdb.GenDepotPath(vnode.Name)
}

return nodeConfig
}

// read reads information from configFilePath to a DatabaseConfig object.
// It returns any read error encountered.
func readConfig() (dbConfig *DatabaseConfig, err error) {
Expand Down
8 changes: 8 additions & 0 deletions vclusterops/coordinator_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type VCoordinationDatabase struct {
CatalogPrefix string
DataPrefix string
HostNodeMap vHostNodeMap
UnboundNodes []*VCoordinationNode
// for convenience
HostList []string // expected to be resolved IP addresses

Expand Down Expand Up @@ -138,6 +139,13 @@ func (vdb *VCoordinationDatabase) setFromCreateDBOptions(options *VCreateDatabas
// addNode adds a given host to the VDB's HostList and HostNodeMap.
// Duplicate host will not be added.
func (vdb *VCoordinationDatabase) addNode(vnode *VCoordinationNode) error {
// unbound nodes have the same 0.0.0.0 address,
// so we add them into the UnboundedNodes list
if vnode.Address == util.UnboundedIPv4 || vnode.Address == util.UnboundedIPv6 {
vdb.UnboundNodes = append(vdb.UnboundNodes, vnode)
return nil
}

if _, exist := vdb.HostNodeMap[vnode.Address]; exist {
return fmt.Errorf("host %s has already been in the VDB's HostList", vnode.Address)
}
Expand Down
16 changes: 16 additions & 0 deletions vclusterops/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,19 @@ func (vcc *VClusterCommands) getUnreachableHosts(options *DatabaseOptions, hosts
type nmaGenericJSONResponse struct {
RespStr string
}

// extractCatalogPrefix extracts the catalog prefix from a node's catalog path.
// This function takes the full catalog path, database name, and node name as
// input parameters, and returns the catalog prefix along with a boolean indicating
// whether the extraction was successful.
func extractCatalogPrefix(catalogPath, dbName, nodeName string) (string, bool) {
catalogSuffix := "/" + dbName + "/" + nodeName + "_catalog/Catalog"
// if catalog suffix matches catalog path, it means we created the catalog in the root path
if catalogPath == catalogSuffix {
return "/", true
}
if !strings.HasSuffix(catalogPath, catalogSuffix) {
return "", false
}
return strings.TrimSuffix(catalogPath, catalogSuffix), true
}
35 changes: 35 additions & 0 deletions vclusterops/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,38 @@ func TestValidateHostMap(t *testing.T) {
err = validateHostMaps(threeHosts, oneMap, twoMap)
assert.Error(t, err)
}

func TestExtractCatalogPrefix(t *testing.T) {
// positive cases
catalogPath := "/verticadb/vertica01/vertica/v_vertica_node0001_catalog/Catalog"
dbName := "vertica"
nodeName := "v_vertica_node0001"

expected := "/verticadb/vertica01"
catalogPrefix, found := extractCatalogPrefix(catalogPath, dbName, nodeName)
assert.True(t, found)
assert.Equal(t, catalogPrefix, expected)

catalogPath = "/catalog/test/v_test_node0001_catalog/Catalog"
dbName = "test"
nodeName = "v_test_node0001"

expected = "/catalog"
catalogPrefix, found = extractCatalogPrefix(catalogPath, dbName, nodeName)
assert.True(t, found)
assert.Equal(t, catalogPrefix, expected)

catalogPath = "/test/v_test_node0001_catalog/Catalog"
expected = "/"
catalogPrefix, found = extractCatalogPrefix(catalogPath, dbName, nodeName)
assert.True(t, found)
assert.Equal(t, catalogPrefix, expected)

// negative case
catalogPath = "/catalog/test/v_test_node0001_catalog/Catalog/test"

expected = ""
catalogPrefix, found = extractCatalogPrefix(catalogPath, dbName, nodeName)
assert.False(t, found)
assert.Equal(t, catalogPrefix, expected)
}
47 changes: 26 additions & 21 deletions vclusterops/https_get_nodes_info_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package vclusterops
import (
"errors"
"fmt"
"strings"

"github.com/vertica/vcluster/rfc7807"
"github.com/vertica/vcluster/vclusterops/util"
Expand Down Expand Up @@ -140,41 +139,30 @@ func (op *httpsGetNodesInfoOp) processResult(_ *opEngineExecContext) error {
// save nodes info to vdb
op.vdb.HostNodeMap = makeVHostNodeMap()
op.vdb.HostList = []string{}
op.vdb.PrimaryUpNodes = []string{}
op.vdb.UnboundNodes = []*VCoordinationNode{}
for _, node := range nodesStates.NodeList {
if node.Database != op.dbName {
err = fmt.Errorf(`[%s] database %s is running on host %s, rather than database %s`, op.name, node.Database, host, op.dbName)
allErrs = errors.Join(allErrs, err)
return appendHTTPSFailureError(allErrs)
}
vNode := makeVCoordinationNode()
vNode.Name = node.Name
vNode.Address = node.Address
vNode.CatalogPath = node.CatalogPath
vNode.DepotPath = node.DepotPath
vNode.StorageLocations = node.StorageLocations
vNode.IsPrimary = node.IsPrimary
vNode.State = node.State
vNode.Subcluster = node.Subcluster
vNode.Sandbox = node.Sandbox
vNode.IsControlNode = node.IsControlNode
vNode.ControlNode = node.ControlNode
vnode := buildVnodeFromNodeStateInfo(node)
if node.IsPrimary && node.State == util.NodeUpState {
op.vdb.PrimaryUpNodes = append(op.vdb.PrimaryUpNodes, node.Address)
}
err := op.vdb.addNode(&vNode)
err := op.vdb.addNode(&vnode)
if err != nil {
allErrs = errors.Join(allErrs, err)
return appendHTTPSFailureError(allErrs)
}
// extract catalog prefix from node's catalog path
// catalog prefix is preceding db name
dbPath := "/" + node.Database
index := strings.Index(node.CatalogPath, dbPath)
if index == -1 {
op.logger.PrintWarning("[%s] failed to get catalog prefix because catalog path %s does not contain database name %s",
op.name, node.CatalogPath, node.Database)
catalogPrefix, found := extractCatalogPrefix(node.CatalogPath, node.Database, node.Name)
if !found {
op.logger.PrintError("[%s] failed to retrieve catalog prefix because catalog path %q is malformed",
op.name, node.CatalogPath)
}
op.vdb.CatalogPrefix = node.CatalogPath[:index]
op.vdb.CatalogPrefix = catalogPrefix
}

return nil
Expand All @@ -187,3 +175,20 @@ func (op *httpsGetNodesInfoOp) processResult(_ *opEngineExecContext) error {
func (op *httpsGetNodesInfoOp) finalize(_ *opEngineExecContext) error {
return nil
}

func buildVnodeFromNodeStateInfo(node *nodeStateInfo) VCoordinationNode {
vnode := makeVCoordinationNode()
vnode.Name = node.Name
vnode.Address = node.Address
vnode.CatalogPath = node.CatalogPath
vnode.DepotPath = node.DepotPath
vnode.StorageLocations = node.StorageLocations
vnode.IsPrimary = node.IsPrimary
vnode.State = node.State
vnode.Subcluster = node.Subcluster
vnode.Sandbox = node.Sandbox
vnode.IsControlNode = node.IsControlNode
vnode.ControlNode = node.ControlNode

return vnode
}
18 changes: 17 additions & 1 deletion vclusterops/nma_download_file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type ReviveDBNodeCountMismatchError struct {

func (e *ReviveDBNodeCountMismatchError) Error() string {
return fmt.Sprintf(`[%s] nodes mismatch found on host %s: the number of the new nodes in --hosts is %d,`+
` but the number of the old nodes in description file is %d`,
` but the number of primary nodes in the description file is %d`,
e.ReviveDBStep, e.FailureHost, e.NumOfNewNodes, e.NumOfOldNodes)
}

Expand Down Expand Up @@ -259,6 +259,12 @@ func (op *nmaDownloadFileOp) processResult(execContext *opEngineExecContext) err
return nil
}

// if users provide a subset of nodes for reviving,
// we assume users intend to revive to primary subclusters
if len(descFileContent.NodeList) > len(op.newNodes) {
filterPrimaryNodes(&descFileContent)
}

if len(descFileContent.NodeList) != len(op.newNodes) {
err := &ReviveDBNodeCountMismatchError{
ReviveDBStep: op.name,
Expand All @@ -282,6 +288,16 @@ func (op *nmaDownloadFileOp) processResult(execContext *opEngineExecContext) err
return appendHTTPSFailureError(allErrs)
}

func filterPrimaryNodes(descFileContent *fileContent) {
var updatedFileContent fileContent
for _, node := range descFileContent.NodeList {
if node.IsPrimary {
updatedFileContent.NodeList = append(updatedFileContent.NodeList, node)
}
}
descFileContent.NodeList = updatedFileContent.NodeList
}

// buildVDBFromClusterConfig can build a vdb using cluster_config.json
func (op *nmaDownloadFileOp) buildVDBFromClusterConfig(descFileContent fileContent) error {
op.vdb.HostNodeMap = makeVHostNodeMap()
Expand Down
6 changes: 6 additions & 0 deletions vclusterops/re_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (options *VReIPOptions) validateAnalyzeOptions(logger vlog.Printer) error {
nodeAddresses := make(map[string]struct{})
for _, info := range options.ReIPList {
// the addresses must be valid IPs
if info.NodeAddress != "" {
if info.NodeAddress == util.UnboundedIPv4 || info.NodeAddress == util.UnboundedIPv6 {
return errors.New("the re-ip list should not contain unbound addresses")
}
}

if err := util.AddressCheck(info.TargetAddress, ipv6); err != nil {
return err
}
Expand Down
18 changes: 17 additions & 1 deletion vclusterops/start_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ func (options *VStartNodesOptions) separateHostsBasedOnReIPNeed(
if oldIP != newIP {
startNodeInfo.ReIPList = append(startNodeInfo.ReIPList, newIP)
startNodeInfo.NodeNamesToStart = append(startNodeInfo.NodeNamesToStart, nodename)
logger.Info("the nodes need to be re-IP", "nodeNames", startNodeInfo.NodeNamesToStart, "IPs", startNodeInfo.ReIPList)
} else {
vnode, ok := vdb.HostNodeMap[newIP]
if ok && vnode.State == util.NodeDownState {
Expand All @@ -545,5 +544,22 @@ func (options *VStartNodesOptions) separateHostsBasedOnReIPNeed(
}
}

// handle unbound nodes
// some of the unbound nodes may need to re-ip
for _, vnode := range vdb.UnboundNodes {
if newIP, exists := options.Nodes[vnode.Name]; exists {
startNodeInfo.ReIPList = append(startNodeInfo.ReIPList, newIP)
startNodeInfo.NodeNamesToStart = append(startNodeInfo.NodeNamesToStart, vnode.Name)
logger.DisplayInfo("the unbound node (%s) needs to change its IP to %s", vnode.Name, newIP)

sortedHosts = append(sortedHosts, newIP)
}
}

// log nodes that need to re-ip
if len(startNodeInfo.NodeNamesToStart) > 0 {
logger.Info("the nodes need to be re-IP", "nodeNames", startNodeInfo.NodeNamesToStart, "IPs", startNodeInfo.ReIPList)
}

return sortedHosts, nil
}
19 changes: 19 additions & 0 deletions vclusterops/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package util

import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/fs"
Expand Down Expand Up @@ -87,6 +88,17 @@ const (
objectNameUnsupportedCharacters = `=<>'^\".@?#&/:;{}()[] \~!%+|,` + "`$"
)

const (
// Unbound nodes are the nodes in catalog but without IP assigned.
// These nodes can come from the following scenario:
// - a database has primary and secondary nodes
// - users run revive_db to the primary nodes only
// - the secondary nodes become "unbound nodes" after this revive
// - users need to run start_node with re-ip to bring the unbound nodes up
UnboundedIPv4 = "0.0.0.0"
UnboundedIPv6 = "0:0:0:0:0:0:0:0"
)

// NmaSecretLookup retrieves kubernetes secrets.
func NmaSecretLookup(f FetchAllEnvVars) {
k8port, _ := os.LookupEnv(kubernetesPort)
Expand Down Expand Up @@ -281,6 +293,10 @@ func AddressCheck(address string, ipv6 bool) error {
return fmt.Errorf("%s in the re-ip file is not a valid %s address", address, ipVersion)
}

if address == UnboundedIPv4 || address == UnboundedIPv6 {
return errors.New("the re-ip list should not contain unbound addresses")
}

return nil
}

Expand Down Expand Up @@ -350,6 +366,9 @@ func ResolveRawHostsToAddresses(rawHosts []string, ipv6 bool) ([]string, error)
if host == "" {
return hostAddresses, fmt.Errorf("invalid empty host found in the provided host list")
}
if host == UnboundedIPv4 || host == UnboundedIPv6 {
return hostAddresses, fmt.Errorf("ambiguous host address (%s) is used", host)
}
addr, err := ResolveToOneIP(host, ipv6)
if err != nil {
return hostAddresses, err
Expand Down

0 comments on commit 8726cf9

Please sign in to comment.