Skip to content

Commit

Permalink
K8SPXC-1503: Fix exec in pitr and proxysql pods (percona#1838)
Browse files Browse the repository at this point in the history
* Fix exec in pitr and proxysql pods

* Modify to use RetryOnConflict

* Retry logic working

* Conforming retry logic for PodLogs and IsPodRunning

---------

Co-authored-by: Viacheslav Sarzhan <[email protected]>
  • Loading branch information
dcaputo-harmoni and hors authored Nov 12, 2024
1 parent afe047d commit b4d1191
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 42 deletions.
93 changes: 62 additions & 31 deletions clientcmd/clientcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/retry"
)

type Client struct {
Expand Down Expand Up @@ -49,36 +50,57 @@ func NewClient() (*Client, error) {
}

// PodLogs fetches the log stream of pod podName in namespace (as selected by
// opts) and returns it split into lines, one slice element per scanned line.
//
// NOTE(review): this listing is a commit diff rendered without +/- markers, so
// the body interleaves two versions of the function. The un-retried statements
// at the top and the first of the two trailing returns appear to be the removed
// implementation; the retry.OnError block and the final `return logArr, nil`
// appear to be its replacement. Confirm against the repository before treating
// this text as compilable Go.
func (c *Client) PodLogs(namespace, podName string, opts *corev1.PodLogOptions) ([]string, error) {
// Presumably the pre-change version: the stream is opened exactly once and any
// error is returned to the caller immediately.
logs, err := c.client.Pods(namespace).GetLogs(podName, opts).Stream(context.TODO())
if err != nil {
return nil, errors.Wrap(err, "get pod logs stream")
}
defer logs.Close()

logArr := make([]string, 0)
sc := bufio.NewScanner(logs)
for sc.Scan() {
logArr = append(logArr, sc.Text())
// Presumably the post-change version starts here: the same open/scan sequence,
// wrapped in retry.OnError with the retry.DefaultRetry backoff.
var logArr []string
// The retriable predicate accepts every error, so a failure to open the stream
// and a failure while scanning it are both retried.
retryErr := retry.OnError(retry.DefaultRetry, func(err error) bool {
return true // Retry on all errors
}, func() error {
logs, err := c.client.Pods(namespace).GetLogs(podName, opts).Stream(context.TODO())
if err != nil {
return errors.Wrap(err, "get pod logs stream")
}
// Deferred inside the closure, so the stream is closed at the end of each
// attempt rather than when PodLogs returns.
defer logs.Close()
// Reset on every attempt so lines collected by a failed attempt are dropped.
logArr = make([]string, 0)
sc := bufio.NewScanner(logs)
for sc.Scan() {
logArr = append(logArr, sc.Text())
}
// Scan returning false can mean end of stream or a read error; Err tells
// the two apart.
if sc.Err() != nil {
return errors.Wrap(sc.Err(), "reading logs stream")
}
return nil
})
// Once the retries are exhausted no partial log lines are handed back.
if retryErr != nil {
return nil, errors.Wrap(retryErr, "failed to get pod logs")
}
return logArr, errors.Wrap(sc.Err(), "reading logs stream")
return logArr, nil
}

// IsPodRunning reports whether pod podName in namespace is in phase Running
// and carries a ContainersReady condition whose status is True.
//
// NOTE(review): this listing is a commit diff rendered without +/- markers, so
// the body interleaves two versions of the function. The direct Get/return
// statements at the top and `return false, nil` near the end appear to be the
// removed implementation; the retry.OnError block and `return isRunning, nil`
// appear to be its replacement. Confirm against the repository before treating
// this text as compilable Go.
func (c *Client) IsPodRunning(namespace, podName string) (bool, error) {
// Presumably the pre-change version: a single Get whose error is returned
// unwrapped.
pod, err := c.client.Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}

if pod.Status.Phase != corev1.PodRunning {
return false, nil
}

for _, v := range pod.Status.Conditions {
if v.Type == corev1.ContainersReady && v.Status == corev1.ConditionTrue {
return true, nil
// Presumably the post-change version starts here: the closure records its
// answer in isRunning and returns an error only when the Get call fails.
var isRunning bool
// NOTE(review): the predicate returns true for every error, so a Get that fails
// for a non-transient reason is retried as well — confirm this is intended.
retryErr := retry.OnError(retry.DefaultRetry, func(err error) bool {
return true // Retry on all errors
}, func() error {
pod, err := c.client.Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return err
}
// A pod that is not in the Running phase is a valid "not running" answer, not
// an error, so the closure returns nil and no retry happens.
if pod.Status.Phase != corev1.PodRunning {
isRunning = false
return nil
}
for _, v := range pod.Status.Conditions {
if v.Type == corev1.ContainersReady && v.Status == corev1.ConditionTrue {
isRunning = true
return nil
}
}
// Phase is Running but no ContainersReady=True condition was found.
isRunning = false
return nil
})
// NOTE(review): unlike the direct return above, this path wraps the error;
// callers that inspect the error type should be checked.
if retryErr != nil {
return false, errors.Wrap(retryErr, "failed to check pod status")
}
return false, nil
return isRunning, nil
}

func (c *Client) Exec(pod *corev1.Pod, containerName string, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
Expand All @@ -101,16 +123,25 @@ func (c *Client) Exec(pod *corev1.Pod, containerName string, command []string, s

exec, err := remotecommand.NewSPDYExecutor(c.restconfig, "POST", req.URL())
if err != nil {
return err
return errors.Wrap(err, "failed to create executor")
}

// Connect this process' std{in,out,err} to the remote shell process.
return exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
retryErr := retry.OnError(retry.DefaultRetry, func(err error) bool {
return true // Retry on all errors
}, func() error {
return exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
})
})

if retryErr != nil {
return errors.Wrap(retryErr, "failed to execute command in pod")
}

return nil
}

func (c *Client) REST() restclient.Interface {
Expand Down
15 changes: 4 additions & 11 deletions pkg/pxc/backup/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,12 @@ func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Cli
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}

err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat /tmp/gap-detected"}, nil, stdoutBuf, stderrBuf, false)
err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat /tmp/gap-detected || true"}, nil, stdoutBuf, stderrBuf, false)
if err != nil {
if strings.Contains(stderrBuf.String(), "No such file or directory") {
return nil
}
return errors.Wrapf(err, "check binlog gaps in pod %s", collectorPod.Name)
return errors.Wrapf(err, "exec binlog collector pod %s", collectorPod.Name)
}

if stdoutBuf.Len() == 0 {
log.Info("Gap detected but GTID set is empty", "collector", collectorPod.Name)
return nil
}

Expand Down Expand Up @@ -127,12 +123,9 @@ func UpdatePITRTimeline(ctx context.Context, cl client.Client, clcmd *clientcmd.

stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat /tmp/pitr-timeline"}, nil, stdoutBuf, stderrBuf, false)
err = clcmd.Exec(collectorPod, "pitr", []string{"/bin/bash", "-c", "cat /tmp/pitr-timeline || true"}, nil, stdoutBuf, stderrBuf, false)
if err != nil {
if strings.Contains(stderrBuf.String(), "No such file or directory") {
return nil
}
return errors.Wrapf(err, "check binlog gaps in pod %s", collectorPod.Name)
return errors.Wrapf(err, "exec binlog collector pod %s", collectorPod.Name)
}

timelines := strings.Split(stdoutBuf.String(), "\n")
Expand Down

0 comments on commit b4d1191

Please sign in to comment.