Skip to content

Commit

Permalink
Sync from server repo (7bd601b221a)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Oct 13, 2023
1 parent fa4852c commit 491554f
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 60 deletions.
8 changes: 0 additions & 8 deletions vclusterops/https_check_db_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ import (
"github.com/vertica/vcluster/vclusterops/vlog"
)

// Timing constants expressed as untyped integer counts of seconds.
// Callers that need a time.Duration multiply them by time.Second.
const (
// OneSecond is the base unit (1 second).
OneSecond = 1
// OneMinute is 60 seconds.
OneMinute = 60 * OneSecond
// StopDBTimeout is 5 minutes, in seconds.
// NOTE(review): presumably the limit for stopping a database; its use is not visible here.
StopDBTimeout = 5 * OneMinute
// StartupPollingTimeout is 5 minutes, in seconds.
StartupPollingTimeout = 5 * OneMinute
// PollingInterval is 3 seconds.
PollingInterval = 3 * OneSecond
)

// OpType is an integer-backed type.
// NOTE(review): its values are presumably declared in the const block that
// follows; that block is not fully visible here, so confirm.
type OpType int

const (
Expand Down
46 changes: 13 additions & 33 deletions vclusterops/https_poll_node_state_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sort"
"strconv"
"time"

"github.com/vertica/vcluster/vclusterops/util"
"github.com/vertica/vcluster/vclusterops/vlog"
Expand Down Expand Up @@ -107,6 +106,10 @@ func makeHTTPSPollNodeStateOp(hosts []string,
return httpsPollNodeStateOp, nil
}

// getPollingTimeout returns the polling timeout stored on the op, in seconds.
func (op *HTTPSPollNodeStateOp) getPollingTimeout() int {
return op.timeout
}

func (op *HTTPSPollNodeStateOp) setupClusterHTTPRequest(hosts []string) error {
op.clusterHTTPRequest = ClusterHTTPRequest{}
op.clusterHTTPRequest.RequestCollection = make(map[string]HostHTTPRequest)
Expand Down Expand Up @@ -147,40 +150,17 @@ func (op *HTTPSPollNodeStateOp) finalize(_ *OpEngineExecContext) error {
}

func (op *HTTPSPollNodeStateOp) processResult(execContext *OpEngineExecContext) error {
startTime := time.Now()
duration := time.Duration(op.timeout) * time.Second
count := 0
for endTime := startTime.Add(duration); ; {
if time.Now().After(endTime) {
break
}

if count > 0 {
time.Sleep(PollingInterval * time.Second)
}

shouldStopPoll, err := op.shouldStopPolling()
if err != nil {
return err
}

if shouldStopPoll {
return nil
}

if err := op.runExecute(execContext); err != nil {
return err
}

count++
err := pollState(op, execContext)
if err != nil {
// show the hosts that are not UP
sort.Strings(op.notUpHosts)
msg := fmt.Sprintf("The following hosts are not up after %d seconds: %v, details: %s",
op.timeout, op.notUpHosts, err)
vlog.LogPrintError(msg)
return errors.New(msg)
}

// show the hosts that are not UP
sort.Strings(op.notUpHosts)
msg := fmt.Sprintf("The following hosts are not up after %d seconds: %v",
op.timeout, op.notUpHosts)
vlog.LogPrintError(msg)
return errors.New(msg)
return nil
}

// the following structs only hosts necessary information for this op
Expand Down
164 changes: 164 additions & 0 deletions vclusterops/https_poll_subscription_state_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
(c) Copyright [2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
"github.com/vertica/vcluster/vclusterops/vlog"
)

// httpsPollSubscriptionStateOp queries the "subscriptions" HTTPS endpoint on
// its hosts and polls until every reported subscription is ACTIVE.
type httpsPollSubscriptionStateOp struct {
OpBase
OpHTTPSBase
// timeout is the value handed back by getPollingTimeout; the constructor
// initializes it from StartupPollingTimeout.
timeout int
}

// makeHTTPSPollSubscriptionStateOp builds an httpsPollSubscriptionStateOp that
// targets the given hosts.
//
// The op's logger is derived from log via WithName, and its timeout is set to
// StartupPollingTimeout. The user name is validated with
// util.ValidateUsernameAndPassword; if validation fails, the op built so far
// (without credentials) is returned together with the error.
func makeHTTPSPollSubscriptionStateOp(log vlog.Printer, hosts []string,
	useHTTPPassword bool, userName string, httpsPassword *string) (httpsPollSubscriptionStateOp, error) {
	const opName = "HTTPSPollSubscriptionStateOp"

	var op httpsPollSubscriptionStateOp
	op.name = opName
	op.log = log.WithName(op.name)
	op.hosts = hosts
	op.useHTTPPassword = useHTTPPassword
	op.timeout = StartupPollingTimeout

	// credentials are stored on the op only after they pass validation
	if err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName); err != nil {
		return op, err
	}

	op.userName, op.httpsPassword = userName, httpsPassword
	return op, nil
}

// getPollingTimeout returns the polling timeout stored on the op; the
// constructor sets it to StartupPollingTimeout.
func (op *httpsPollSubscriptionStateOp) getPollingTimeout() int {
return op.timeout
}

// setupClusterHTTPRequest resets the op's cluster request and registers one
// GET request against the "subscriptions" HTTPS endpoint for every host.
// When password authentication is enabled, the stored user name and password
// are attached to each request. It always returns nil.
func (op *httpsPollSubscriptionStateOp) setupClusterHTTPRequest(hosts []string) error {
	// start from a clean request with an empty per-host collection
	op.clusterHTTPRequest = ClusterHTTPRequest{}
	op.clusterHTTPRequest.RequestCollection = make(map[string]HostHTTPRequest)
	op.setVersionToSemVar()

	for i := range hosts {
		var req HostHTTPRequest
		req.Method = GetMethod
		req.Timeout = httpRequestTimeoutSeconds
		req.BuildHTTPSEndpoint("subscriptions")

		if op.useHTTPPassword {
			req.Password = op.httpsPassword
			req.Username = op.userName
		}

		op.clusterHTTPRequest.RequestCollection[hosts[i]] = req
	}

	return nil
}

// prepare sets up the dispatcher for the op's hosts and then builds the
// per-host HTTP requests, returning whatever setupClusterHTTPRequest returns.
func (op *httpsPollSubscriptionStateOp) prepare(execContext *OpEngineExecContext) error {
	targetHosts := op.hosts
	execContext.dispatcher.Setup(targetHosts)
	return op.setupClusterHTTPRequest(targetHosts)
}

// execute sends the prepared requests and then hands the responses to
// processResult. An error from sending is returned as-is, without
// processing any result.
func (op *httpsPollSubscriptionStateOp) execute(execContext *OpEngineExecContext) error {
	runErr := op.runExecute(execContext)
	if runErr != nil {
		return runErr
	}
	return op.processResult(execContext)
}

// finalize does nothing for this op and always returns nil.
func (op *httpsPollSubscriptionStateOp) finalize(_ *OpEngineExecContext) error {
return nil
}

// SubscriptionList is the decoding target for the response of the
// "subscriptions" endpoint. The response content should look like
/* "subscription_list": [
{
"node_name": "v_practice_db_node0001",
"shard_name": "replica",
"subscription_state": "ACTIVE",
"is_primary": true
},
{
"node_name": "v_practice_db_node0001",
"shard_name": "segment0001",
"subscription_state": "ACTIVE",
"is_primary": true
},
...
]
*/
type SubscriptionList struct {
// SubscriptionList holds one entry per element of "subscription_list".
SubscriptionList []SubscriptionInfo `json:"subscription_list"`
}

// SubscriptionInfo is a single element of the "subscription_list" array.
type SubscriptionInfo struct {
// Nodename is decoded from "node_name".
Nodename string `json:"node_name"`
// ShardName is decoded from "shard_name".
ShardName string `json:"shard_name"`
// SubscriptionState is decoded from "subscription_state"; the poller
// compares it against "ACTIVE".
SubscriptionState string `json:"subscription_state"`
// IsPrimary is decoded from "is_primary".
IsPrimary bool `json:"is_primary"`
}

// processResult runs pollState for this op. Any error from polling is wrapped
// with a note that not all subscriptions are ACTIVE; otherwise nil is returned.
func (op *httpsPollSubscriptionStateOp) processResult(execContext *OpEngineExecContext) error {
	if pollErr := pollState(op, execContext); pollErr != nil {
		return fmt.Errorf("not all subscriptions are ACTIVE, %w", pollErr)
	}
	return nil
}

// shouldStopPolling inspects the most recent per-host results and reports
// whether polling should stop.
//
// Returns:
//   - (true, error) when a host reports a password/certificate error, or when
//     a passing response cannot be parsed;
//   - (true, nil) when a passing response lists only ACTIVE subscriptions;
//   - (false, nil) when a passing response contains a subscription that is
//     not ACTIVE, or when no host produced a passing result.
//
// The decision is made on the first passing result encountered. Map iteration
// order is unspecified, so with several hosts the deciding host is arbitrary.
func (op *httpsPollSubscriptionStateOp) shouldStopPolling() (bool, error) {
	const activeState = "ACTIVE"

	for host, result := range op.clusterHTTPRequest.ResultCollection {
		op.logResponse(host, result)

		// bad credentials will not fix themselves; stop polling right away
		if result.IsPasswordAndCertificateError() {
			return true, fmt.Errorf("[%s] wrong password/certificate for https service on host %s",
				op.name, host)
		}

		if !result.isPassing() {
			continue
		}

		var subscriptionList SubscriptionList
		err := op.parseAndCheckResponse(host, result.content, &subscriptionList)
		if err != nil {
			op.log.PrintError("[%s] fail to parse result on host %s, details: %s",
				op.name, host, err)
			return true, err
		}

		// keep polling while any subscription has not reached ACTIVE
		for _, s := range subscriptionList.SubscriptionList {
			if s.SubscriptionState != activeState {
				return false, nil
			}
		}

		op.log.PrintInfo("All subscriptions are ACTIVE")
		return true, nil
	}

	// No host produced a passing result. Distinguish "nothing came back at all"
	// from "responses came back but none passed" so the log is not misleading.
	if len(op.clusterHTTPRequest.ResultCollection) == 0 {
		op.log.PrintError("[%s] empty result received from the provided hosts %v", op.name, op.hosts)
	} else {
		op.log.PrintError("[%s] no passing result received from the provided hosts %v", op.name, op.hosts)
	}
	return false, nil
}
48 changes: 29 additions & 19 deletions vclusterops/remove_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (vcc *VClusterCommands) VRemoveNode(options *VRemoveNodeOptions) (VCoordina
return vdb, err
}

instructions, err := produceRemoveNodeInstructions(&vdb, options)
instructions, err := vcc.produceRemoveNodeInstructions(&vdb, options)
if err != nil {
vlog.LogPrintError("failed to produce remove node instructions, %s", err)
return vdb, err
Expand Down Expand Up @@ -235,12 +235,13 @@ func (o *VRemoveNodeOptions) completeVDBSetting(vdb *VCoordinationDatabase) erro
// - Update ksafety if needed
// - Mark nodes to remove as ephemeral
// - Rebalance cluster for Enterprise mode, rebalance shards for Eon mode
// - Poll subscription state, wait for all subscriptions ACTIVE for Eon mode
// - Remove secondary nodes from spread
// - Drop Nodes
// - Reload spread
// - Delete catalog and data directories
// - Sync catalog (eon only)
func produceRemoveNodeInstructions(vdb *VCoordinationDatabase, options *VRemoveNodeOptions) ([]ClusterOp, error) {
func (vcc *VClusterCommands) produceRemoveNodeInstructions(vdb *VCoordinationDatabase, options *VRemoveNodeOptions) ([]ClusterOp, error) {
var instructions []ClusterOp

var initiatorHost []string
Expand Down Expand Up @@ -269,12 +270,21 @@ func produceRemoveNodeInstructions(vdb *VCoordinationDatabase, options *VRemoveN
// contains the hosts to remove.
v := vdb.Copy(options.HostsToRemove)
if vdb.IsEon {
// We pass the set of subclusters of the nodes to remove.
// we pass the set of subclusters of the nodes to remove.
err = produceRebalanceSubclusterShardsOps(&instructions, initiatorHost, v.getSCNames(),
usePassword, username, password)
if err != nil {
return instructions, err
}

// for Eon DB, we check whether all subscriptions are ACTIVE
// after rebalance shards
httpsPollSubscriptionStateOp, e := makeHTTPSPollSubscriptionStateOp(vcc.Log, initiatorHost,
usePassword, username, password)
if e != nil {
return instructions, e
}
instructions = append(instructions, &httpsPollSubscriptionStateOp)
} else {
var httpsRebalanceClusterOp HTTPSRebalanceClusterOp
httpsRebalanceClusterOp, err = makeHTTPSRebalanceClusterOp(initiatorHost, usePassword, username,
Expand Down Expand Up @@ -322,22 +332,6 @@ func produceRemoveNodeInstructions(vdb *VCoordinationDatabase, options *VRemoveN
return instructions, nil
}

// produceRebalanceSubclusterShardsOps gets a slice of subclusters and for each of them
// produces an HTTPSRebalanceSubclusterShardsOp.
//
// Each op is built with makeHTTPSRebalanceSubclusterShardsOp and a pointer to it is
// appended to *instructions. The first construction error is returned immediately;
// ops appended before that error stay in *instructions.
func produceRebalanceSubclusterShardsOps(instructions *[]ClusterOp, initiatorHost, scNames []string,
useHTTPPassword bool, userName string, httpsPassword *string) error {
for _, scName := range scNames {
op, err := makeHTTPSRebalanceSubclusterShardsOp(
initiatorHost, useHTTPPassword, userName, httpsPassword, scName)
if err != nil {
return err
}
// op is declared inside the loop body, so each appended pointer refers to a distinct op
*instructions = append(*instructions, &op)
}

return nil
}

// produceMarkEphemeralNodeOps gets a slice of target hosts and for each of them
// produces an HTTPSMarkEphemeralNodeOp.
func produceMarkEphemeralNodeOps(instructions *[]ClusterOp, targetHosts, hosts []string,
Expand All @@ -354,6 +348,22 @@ func produceMarkEphemeralNodeOps(instructions *[]ClusterOp, targetHosts, hosts [
return nil
}

// produceRebalanceSubclusterShardsOps appends one HTTPSRebalanceSubclusterShardsOp
// per subcluster name in scNames to *instructions.
//
// Every op is created with makeHTTPSRebalanceSubclusterShardsOp using the same
// initiator host and credentials. The first construction error aborts the loop
// and is returned; ops appended before that point remain in *instructions.
func produceRebalanceSubclusterShardsOps(instructions *[]ClusterOp, initiatorHost, scNames []string,
	useHTTPPassword bool, userName string, httpsPassword *string) error {
	for i := range scNames {
		// declared per iteration so every appended pointer refers to its own op
		rebalanceOp, makeErr := makeHTTPSRebalanceSubclusterShardsOp(
			initiatorHost, useHTTPPassword, userName, httpsPassword, scNames[i])
		if makeErr != nil {
			return makeErr
		}

		*instructions = append(*instructions, &rebalanceOp)
	}

	return nil
}

// produceDropNodeOps produces an HTTPSDropNodeOp for each node to drop.
// This is because we must drop node one by one to avoid losing quorum.
func produceDropNodeOps(instructions *[]ClusterOp, targetHosts, hosts []string,
Expand Down
Loading

0 comments on commit 491554f

Please sign in to comment.