Skip to content

Commit

Permalink
refactor etcd restoration process
Browse files Browse the repository at this point in the history
  • Loading branch information
galal-hussein authored and Alena Prokharchyk committed Nov 30, 2018
1 parent d5a5fd5 commit ff4c93e
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 63 deletions.
49 changes: 19 additions & 30 deletions cluster/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package cluster
import (
"context"
"fmt"
"path"

"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
)

func (c *Cluster) SnapshotEtcd(ctx context.Context, snapshotName string) error {
Expand All @@ -21,11 +18,8 @@ func (c *Cluster) SnapshotEtcd(ctx context.Context, snapshotName string) error {
}

func (c *Cluster) RestoreEtcdSnapshot(ctx context.Context, snapshotPath string) error {
// Stopping all etcd containers
for _, host := range c.EtcdHosts {
if err := tearDownOldEtcd(ctx, host, c.SystemImages.Alpine, c.PrivateRegistriesMap); err != nil {
return err
}
if isEqual := c.etcdSnapshotChecksum(ctx, snapshotPath); !isEqual {
return fmt.Errorf("etcd snapshots are not consistent")
}
// Start restore process on all etcd hosts
initCluster := services.GetEtcdInitialCluster(c.EtcdHosts)
Expand All @@ -34,30 +28,25 @@ func (c *Cluster) RestoreEtcdSnapshot(ctx context.Context, snapshotPath string)
return fmt.Errorf("[etcd] Failed to restore etcd snapshot: %v", err)
}
}
// Deploy Etcd Plane
etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build etcd node plan map
for _, etcdHost := range c.EtcdHosts {
etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo)
}
etcdRollingSnapshots := services.EtcdSnapshot{
Snapshot: c.Services.Etcd.Snapshot,
Creation: c.Services.Etcd.Creation,
Retention: c.Services.Etcd.Retention,
}
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, etcdRollingSnapshots); err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
return nil
}

func tearDownOldEtcd(ctx context.Context, host *hosts.Host, cleanupImage string, prsMap map[string]v3.PrivateRegistry) error {
if err := docker.DoRemoveContainer(ctx, host.DClient, services.EtcdContainerName, host.Address); err != nil {
return fmt.Errorf("[etcd] Failed to stop old etcd containers: %v", err)
func (c *Cluster) etcdSnapshotChecksum(ctx context.Context, snapshotPath string) bool {
log.Infof(ctx, "[etcd] Checking if all snapshots are identical")
etcdChecksums := []string{}
for _, etcdHost := range c.EtcdHosts {
checksum, err := services.GetEtcdSnapshotChecksum(ctx, etcdHost, c.PrivateRegistriesMap, c.SystemImages.Alpine, snapshotPath)
if err != nil {
return false
}
etcdChecksums = append(etcdChecksums, checksum)
log.Infof(ctx, "[etcd] Checksum of etcd snapshot on host [%s] is [%s]", etcdHost.Address, checksum)
}
// cleanup etcd data directory
toCleanPaths := []string{
path.Join(host.PrefixPath, hosts.ToCleanEtcdDir),
hostChecksum := etcdChecksums[0]
for _, checksum := range etcdChecksums {
if checksum != hostChecksum {
return false
}
}
return host.CleanUp(ctx, toCleanPaths, cleanupImage, prsMap)
return true
}
2 changes: 1 addition & 1 deletion cluster/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList
return err
}

containerLog, logsErr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, PortCheckContainer, "all", true)
containerLog, _, logsErr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, PortCheckContainer, "all", true)
if logsErr != nil {
log.Warnf(ctx, "[network] Failed to get network port check logs: %v", logsErr)
}
Expand Down
74 changes: 53 additions & 21 deletions cluster/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/rke/util"
Expand All @@ -12,6 +14,36 @@ import (
)

// ClusterRemove tears the cluster down: it first cleans up the nodes via
// CleanupNodes and, only if that succeeds, removes the local files via
// CleanupFiles. The first error encountered is returned.
func (c *Cluster) ClusterRemove(ctx context.Context) error {
	if err := c.CleanupNodes(ctx); err != nil {
		return err
	}
	// Propagate the result instead of discarding it, so a failure reported by
	// CleanupFiles is not silently lost.
	return c.CleanupFiles(ctx)
}

// cleanUpHosts calls CleanUpAll on each host returned by
// hosts.GetUniqueHostList for the control plane, worker and etcd host lists.
//
// The hosts are placed on a shared queue that is drained by WorkerThreads
// goroutines. Each goroutine keeps going after a failure and reports its
// collected errors through util.ErrList; the function returns what the error
// group's Wait yields.
func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string, prsMap map[string]v3.PrivateRegistry, externalEtcd bool) error {
	queue := util.GetObjectQueue(hosts.GetUniqueHostList(cpHosts, workerHosts, etcdHosts))

	// drain is the body of a single worker: it pulls hosts off the shared
	// queue until none are left.
	drain := func() error {
		var failures []error
		for item := range queue {
			target := item.(*hosts.Host)
			err := target.CleanUpAll(ctx, cleanerImage, prsMap, externalEtcd)
			if err != nil {
				failures = append(failures, err)
			}
		}
		return util.ErrList(failures)
	}

	var workers errgroup.Group
	for i := 0; i < WorkerThreads; i++ {
		workers.Go(drain)
	}
	return workers.Wait()
}

func (c *Cluster) CleanupNodes(ctx context.Context) error {
externalEtcd := false
if len(c.Services.Etcd.ExternalURLs) > 0 {
externalEtcd = true
Expand All @@ -33,33 +65,33 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
}

// Clean up all hosts
if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, externalEtcd); err != nil {
return err
}
return cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, externalEtcd)
}

// CleanupFiles removes the files kept locally for this cluster by calling
// pki.RemoveAdminConfig with c.LocalKubeConfigPath and then removeStateFile
// with c.StateFilePath.
//
// It always returns nil; neither call reports a result to this function.
func (c *Cluster) CleanupFiles(ctx context.Context) error {
	pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath)
	removeStateFile(ctx, c.StateFilePath)
	return nil
}

func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string, prsMap map[string]v3.PrivateRegistry, externalEtcd bool) error {

uniqueHosts := hosts.GetUniqueHostList(cpHosts, workerHosts, etcdHosts)

var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(uniqueHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
if err := runHost.CleanUpAll(ctx, cleanerImage, prsMap, externalEtcd); err != nil {
errList = append(errList, err)
}
func (c *Cluster) RemoveOldNodes(ctx context.Context) error {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return err
}
nodeList, err := k8s.GetNodeList(kubeClient)
if err != nil {
return err
}
uniqueHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
for _, node := range nodeList.Items {
host := &hosts.Host{}
host.HostnameOverride = node.Name
if !hosts.IsNodeInList(host, uniqueHosts) {
if err := k8s.DeleteNode(kubeClient, node.Name, c.CloudProvider.Name); err != nil {
log.Warnf(ctx, "Failed to delete old node [%s] from kubernetes")
}
return util.ErrList(errList)
})
}
}

return errgrp.Wait()
return nil
}
29 changes: 27 additions & 2 deletions cmd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,48 @@ func RestoreEtcdSnapshot(
dialersOptions hosts.DialersOptions,
flags cluster.ExternalFlags, snapshotName string) error {

log.Infof(ctx, "Starting restoring snapshot on etcd hosts")
log.Infof(ctx, "Restoring etcd snapshot %s", snapshotName)
stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
rkeFullState, err := cluster.ReadStateFile(ctx, stateFilePath)
if err != nil {
return err
}

rkeFullState.CurrentState = cluster.State{}
if err := rkeFullState.WriteStateFile(ctx, stateFilePath); err != nil {
return err
}
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
if err != nil {
return err
}
if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
return err
}

if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
return err
}

log.Infof(ctx, "Cleaning old kubernetes cluster")
if err := kubeCluster.CleanupNodes(ctx); err != nil {
return err
}
if err := kubeCluster.RestoreEtcdSnapshot(ctx, snapshotName); err != nil {
return err
}

if err := ClusterInit(ctx, rkeConfig, dialersOptions, flags); err != nil {
return err
}
if _, _, _, _, _, err := ClusterUp(ctx, dialersOptions, flags); err != nil {
return err
}
if err := cluster.RestartClusterPods(ctx, kubeCluster); err != nil {
return nil
}
if err := kubeCluster.RemoveOldNodes(ctx); err != nil {
return err
}
log.Infof(ctx, "Finished restoring snapshot [%s] on all etcd hosts", snapshotName)
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,19 +388,20 @@ func ReadContainerLogs(ctx context.Context, dClient *client.Client, containerNam
return dClient.ContainerLogs(ctx, containerName, types.ContainerLogsOptions{Follow: follow, ShowStdout: true, ShowStderr: true, Timestamps: false, Tail: tail})
}

func GetContainerLogsStdoutStderr(ctx context.Context, dClient *client.Client, containerName, tail string, follow bool) (string, error) {
func GetContainerLogsStdoutStderr(ctx context.Context, dClient *client.Client, containerName, tail string, follow bool) (string, string, error) {
var containerStderr bytes.Buffer
var containerStdout bytes.Buffer
var containerLog string
var containerErrLog, containerStdLog string
clogs, logserr := ReadContainerLogs(ctx, dClient, containerName, follow, tail)
if logserr != nil {
logrus.Debugf("logserr: %v", logserr)
return containerLog, fmt.Errorf("Failed to get gather logs from container [%s]: %v", containerName, logserr)
return containerErrLog, containerStdLog, fmt.Errorf("Failed to get gather logs from container [%s]: %v", containerName, logserr)
}
defer clogs.Close()
stdcopy.StdCopy(&containerStdout, &containerStderr, clogs)
containerLog = containerStderr.String()
return containerLog, nil
containerErrLog = containerStderr.String()
containerStdLog = containerStdout.String()
return containerErrLog, containerStdLog, nil
}

func tryRegistryAuth(pr v3.PrivateRegistry) types.RequestPrivilegeFunc {
Expand Down
4 changes: 2 additions & 2 deletions pki/pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ func ExtractBackupBundleOnHost(ctx context.Context, host *hosts.Host, alpineSyst
return err
}
if status != 0 {
containerLog, err := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, BundleCertContainer, "5", false)
containerErrLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, BundleCertContainer, "5", false)
if err != nil {
return err
}
// removing the container in case of an error too
if err := docker.RemoveContainer(ctx, host.DClient, host.Address, BundleCertContainer); err != nil {
return err
}
return fmt.Errorf("Failed to run certificate bundle extract, exit status is: %d, container logs: %s", status, containerLog)
return fmt.Errorf("Failed to run certificate bundle extract, exit status is: %d, container logs: %s", status, containerErrLog)
}
log.Infof(ctx, "[certificates] successfully extracted certificate bundle on host [%s] to backup path [%s]", host.Address, TempCertPath)
return docker.RemoveContainer(ctx, host.DClient, host.Address, BundleCertContainer)
Expand Down
36 changes: 35 additions & 1 deletion services/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[s
return err
}
if status != 0 {
containerLog, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
if err != nil {
return err
}
Expand All @@ -373,3 +373,37 @@ func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[s
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
}

// GetEtcdSnapshotChecksum returns the md5sum of the snapshot snapshotName as
// seen on etcdHost.
//
// It runs a short-lived container from alpineImage with /opt/rke bind-mounted,
// executes md5sum on EtcdSnapshotPath+snapshotName, and reads the digest from
// the container's stdout. The helper container is removed before returning,
// including when the logs cannot be read.
//
// An error is returned when the container cannot be run or waited for, when
// its logs cannot be read, when it cannot be removed, when it exits with a
// non-zero status, or when it prints no checksum. On error the returned string
// is always empty.
func GetEtcdSnapshotChecksum(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, alpineImage, snapshotName string) (string, error) {
	snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)
	imageCfg := &container.Config{
		Cmd: []string{
			"sh", "-c", strings.Join([]string{
				"md5sum", snapshotPath,
				"|", "cut", "-f1", "-d' '", "|", "tr", "-d", "'\n'"}, " "),
		},
		Image: alpineImage,
	}
	hostCfg := &container.HostConfig{
		Binds: []string{
			"/opt/rke/:/opt/rke/:z",
		}}

	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdChecksumContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return "", err
	}
	status, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName)
	if err != nil {
		return "", err
	}
	containerErrLog, containerStdLog, logsErr := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdChecksumContainerName, "1", false)
	// Remove the helper container before looking at logsErr so that a log
	// failure does not leave it behind on the host.
	removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName)
	if logsErr != nil {
		return "", logsErr
	}
	if removeErr != nil {
		return "", removeErr
	}
	if status != 0 {
		return "", fmt.Errorf("[etcd] Failed to get checksum of snapshot [%s] on host [%s], exit status is: %d, container logs: %s", snapshotName, etcdHost.Address, status, containerErrLog)
	}
	checksum := strings.TrimSpace(containerStdLog)
	if checksum == "" {
		// The exit status of a shell pipeline is that of its last command, so
		// a failing md5sum (e.g. missing snapshot file) can still exit 0 with
		// no output. Reject that here rather than returning an empty checksum
		// that would compare equal across hosts.
		return "", fmt.Errorf("[etcd] Failed to get checksum of snapshot [%s] on host [%s]: no checksum returned, container logs: %s", snapshotName, etcdHost.Address, containerErrLog)
	}
	return checksum, nil
}
2 changes: 1 addition & 1 deletion services/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func runHealthcheck(ctx context.Context, host *hosts.Host, serviceName string, l
return nil
}
logrus.Debug("Checking container logs")
containerLog, logserr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, serviceName, "1", false)
containerLog, _, logserr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, serviceName, "1", false)
containerLog = strings.TrimSuffix(containerLog, "\n")
if logserr != nil {
return fmt.Errorf("Failed to verify healthcheck for service [%s]: %v", serviceName, logserr)
Expand Down
1 change: 1 addition & 0 deletions services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
EtcdSnapshotContainerName = "etcd-rolling-snapshots"
EtcdSnapshotOnceContainerName = "etcd-snapshot-once"
EtcdRestoreContainerName = "etcd-restore"
EtcdChecksumContainerName = "etcd-checksum-checker"
NginxProxyContainerName = "nginx-proxy"
SidekickContainerName = "service-sidekick"
LogLinkContainerName = "rke-log-linker"
Expand Down

0 comments on commit ff4c93e

Please sign in to comment.