fix: improve log and portforward output for be/kubernetes
This touches many files because be.Backend.AccessQueue() now requires a build.LogOptions argument; a caller-side sketch of the updated call follows the changed-files summary below.
It also removes be.Backend.Queue() from the public API.

Signed-off-by: Nick Mitchell <[email protected]>
starpit committed Nov 8, 2024
1 parent 7e41ddf commit adb7298
Showing 23 changed files with 155 additions and 69 deletions.
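For downstream callers, here is a minimal, hypothetical sketch of the updated calling convention; it is not code from this commit. The helper name is invented, and the Verbose field is assumed from the opts.Verbose usages visible in the diff below, while the AccessQueue signature itself matches the new pkg/be/backend.go declaration.

package example

import (
	"context"
	"fmt"

	"lunchpail.io/pkg/be"
	"lunchpail.io/pkg/build"
	"lunchpail.io/pkg/ir/queue"
)

// accessQueueExample calls the updated API: AccessQueue now takes a
// build.LogOptions, and the old Queue() accessor is no longer public.
func accessQueueExample(ctx context.Context, backend be.Backend, run queue.RunContext) error {
	opts := build.LogOptions{Verbose: true} // assumed field, per opts.Verbose in this diff

	endpoint, accessKeyID, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, run, opts)
	if err != nil {
		return err
	}
	defer stop() // tears down any port forward AccessQueue opened for us

	fmt.Printf("queue endpoint=%s bucket=%s\n", endpoint, bucket)
	_, _ = accessKeyID, secretAccessKey // hand these to an S3 client
	return nil
}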
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/cat.go
@@ -41,7 +41,7 @@ func Cat() *cobra.Command {
return err
}

- return queue.Qcat(ctx, backend, run, args[0])
+ return queue.Qcat(ctx, backend, run, args[0], *opts.Log)
}

return cmd
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/drain.go
@@ -40,7 +40,7 @@ func Drain() *cobra.Command {
return err
}

- return queue.Drain(ctx, backend, run)
+ return queue.Drain(ctx, backend, run, *opts.Log)
}

return cmd
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/ls.go
@@ -57,7 +57,7 @@ func Ls() *cobra.Command {
return err
}

- files, errors, err := queue.Ls(ctx, backend, runContext, path)
+ files, errors, err := queue.Ls(ctx, backend, runContext, path, *opts.Log)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/upload.go
@@ -38,7 +38,7 @@ func Upload() *cobra.Command {
}

run := q.RunContext{RunName: runOpts.Run}
- return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{Path: args[0], Bucket: args[1]}})
+ return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{Path: args[0], Bucket: args[1]}}, *opts.Log)
}

return cmd
5 changes: 4 additions & 1 deletion hack/dpk.sh
@@ -7,7 +7,10 @@ fi
mkdir -p dpk

for i in tests/tests/python*
- do ./lunchpail build -o dpk/$(basename "$i") "$i"/pail --target local --create-namespace &
+ do
+   if [[ -z "$1" ]] || [[ "$1" = $(basename "$i") ]]
+   then ./lunchpail build -o dpk/$(basename "$i") "$i"/pail --target local --create-namespace &
+   fi
done

wait
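This lets a developer build a single test in isolation by passing its directory basename as the first argument (hypothetically, ./hack/dpk.sh python-something); with no argument, the loop still builds every tests/tests/python* pail in parallel as before.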
6 changes: 2 additions & 4 deletions pkg/be/backend.go
@@ -5,6 +5,7 @@ import (

"lunchpail.io/pkg/be/runs"
"lunchpail.io/pkg/be/streamer"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/llir"
"lunchpail.io/pkg/ir/queue"
"lunchpail.io/pkg/lunchpail"
@@ -32,11 +33,8 @@ type Backend interface {
// Number of instances of the given component for the given run
InstanceCount(ctx context.Context, c lunchpail.Component, run queue.RunContext) (int, error)

- // Queue properties for a given run
- Queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error)
-
// Queue properties for a given run, plus ensure access to the endpoint from this client
- AccessQueue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)
+ AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)

// Return a streamer
Streamer(ctx context.Context, run queue.RunContext) streamer.Streamer
8 changes: 2 additions & 6 deletions pkg/be/ibmcloud/queue.go
@@ -4,16 +4,12 @@ import (
"context"
"fmt"

"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/queue"
)

// Queue properties for a given run, plus ensure access to the endpoint from this client
- func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
err = fmt.Errorf("Unsupported operation: 'AccessQueue'")
return
}

- func (backend Backend) Queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) {
- err = fmt.Errorf("Unsupported operation: 'Queue'")
- return
- }
48 changes: 36 additions & 12 deletions pkg/be/kubernetes/logs.go
@@ -35,23 +35,48 @@ func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts strea
}

selector := "app.kubernetes.io/component=" + string(component) + runSelector
cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(opts.Tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99 | grep -v 'workerpool worker'"

if opts.Verbose {
fmt.Fprintf(os.Stderr, "Tracking logs of component=%s\n", component)
fmt.Fprintf(os.Stderr, "Tracking logs via cmdline=%s\n", cmdline)
}
cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(opts.Tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99"

for {
+ if opts.Verbose {
+ fmt.Fprintf(os.Stderr, "Starting log tracking via cmdline=%s\n", cmdline)
+ }

cmd := exec.Command("/bin/sh", "-c", cmdline)
cmd.Stdout = os.Stdout
if opts.Writer != nil {
cmd.Stdout = opts.Writer
}

- cmd.Stderr = os.Stderr
- err := cmd.Run()
- if err == nil {
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return err
+ }

+ if err := cmd.Start(); err != nil {
+ if !strings.Contains(err.Error(), "signal:") {
+ fmt.Fprintf(os.Stderr, "Error tracking component logs %v: %v\n", component, err)
+ return err
+ } else {
+ // swallow signal: interrupt/killed
+ return nil
+ }
+ }

+ if opts.Verbose {
+ fmt.Fprintf(os.Stderr, "Now tracking component logs for %v\n", component)
+ }

+ // Filter out not found error messages. We will retry.
+ scanner := bufio.NewScanner(stderr)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.Contains(line, "No resources found") && !strings.Contains(line, "waiting to start") {
+ fmt.Fprintln(os.Stderr, line)
+ }
+ }

+ if err := cmd.Wait(); err == nil {
break
} else {
if opts.Verbose {
@@ -61,11 +86,10 @@ func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts strea
case <-streamer.Context.Done():
return nil
default:
- time.Sleep(1 * time.Second)
+ time.Sleep(500 * time.Millisecond)
continue
}
}

- break
}

return nil
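The rewritten loop above replaces cmd.Run() with Start/Wait plus a StderrPipe, so kubectl's transient "No resources found" chatter can be filtered while the retry loop waits for pods to appear. Below is a generic, self-contained sketch of that filter pattern; the command line and predicate are stand-ins, not code from this repository.

package main

import (
	"bufio"
	"fmt"
	"os"
	"os/exec"
)

// runFiltered runs cmdline under /bin/sh, forwarding the child's stderr
// to ours except for lines the noisy predicate flags as transient chatter.
func runFiltered(cmdline string, noisy func(string) bool) error {
	cmd := exec.Command("/bin/sh", "-c", cmdline)
	cmd.Stdout = os.Stdout

	stderr, err := cmd.StderrPipe() // must be wired up before Start
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return err
	}

	scanner := bufio.NewScanner(stderr)
	for scanner.Scan() {
		if line := scanner.Text(); !noisy(line) {
			fmt.Fprintln(os.Stderr, line)
		}
	}

	return cmd.Wait() // reap the child and surface its exit status
}

func main() {
	// Stand-in invocation; the commit itself filters kubectl's "No resources found".
	err := runFiltered("echo ok; echo 'No resources found' 1>&2", func(line string) bool {
		return line == "No resources found"
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}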
76 changes: 62 additions & 14 deletions pkg/be/kubernetes/portforward.go
@@ -7,23 +7,34 @@ import (
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"

"lunchpail.io/pkg/build"
)

- func retryOnError(err error) bool {
+ func retryOnError(ctx context.Context, err error) bool {
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ }

if strings.Contains(err.Error(), "connection refused") {
- time.Sleep(3 * time.Second)
+ time.Sleep(2 * time.Second)
return true
}

return false
}

- func (backend Backend) portForward(ctx context.Context, podName string, localPort, podPort int) (func(), error) {
+ func (backend Backend) portForward(ctx context.Context, podName string, localPort, podPort int, opts build.LogOptions) (func(), error) {
c, restConfig, err := Client()
if err != nil {
return func() {}, err
Expand All @@ -39,58 +50,95 @@ func (backend Backend) portForward(ctx context.Context, podName string, localPor
// readyCh communicate when the port forward is ready to get traffic
readyCh := make(chan struct{})

+ // we will set this below when a successfully launched
+ // portforwarder exits normally
+ done := false

// managing termination signal from the terminal. As you can see the stopCh
// gets closed to gracefully handle its termination.
- /*sigs := make(chan os.Signal, 1)
+ sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- group.Go(func() error {
+ go func() error {
<-sigs
- close(stopCh)
+ if !done {
+ if opts.Debug {
+ fmt.Fprintln(os.Stderr, "SIGINT/TERM has initiated close of portforward closed", os.Args)
+ }
+ done = true
+ close(stopCh)
+ }
return nil
- })*/
+ }()

go func() error {
- for {
+ // hmmm... the client-go portforward.go logs an UnhandledError when things are all done and good...
+ // portforward.go:413] "Unhandled Error" err="an error occurred forwarding
+ runtime.ErrorHandlers = []runtime.ErrorHandler{}

+ for !done {
select {
case <-ctx.Done():
return nil
default:
}

path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
backend.namespace, podName)
hostIP := strings.TrimLeft(restConfig.Host, "htps:/")

transport, upgrader, err := spdy.RoundTripperFor(restConfig)
if err != nil {
- if !retryOnError(err) {
+ if !retryOnError(ctx, err) {
return err
}
continue
}

- stdout := ioutil.Discard // TODO verbose?
- stderr := os.Stderr
+ stdout := ioutil.Discard
+ stderr := ioutil.Discard
+ if opts.Verbose {
+ stdout = os.Stderr
+ stderr = os.Stderr
+ }

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, &url.URL{Scheme: "https", Path: path, Host: hostIP})

fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopCh, readyCh, stdout, stderr)
if err != nil {
- if !retryOnError(err) {
+ if !retryOnError(ctx, err) {
return err
}
continue
}

if err := fw.ForwardPorts(); err != nil {
- if !retryOnError(err) {
+ if !retryOnError(ctx, err) {
return err
}
continue
}

+ if opts.Verbose {
+ fmt.Fprintln(os.Stderr, "Portforward closed", os.Args)
+ }
+ done = true
}

return nil
}()

// wait for it to be ready
<-readyCh

stop := func() {
- // hmm... for kubernetes backends, this can result in a panic: close on closed channel
- // close(stopCh)
+ if !done {
+ if opts.Debug {
+ fmt.Fprintln(os.Stderr, "Client has requested close of portforward closed", os.Args)
+ }
+ done = true
+ close(stopCh)
+ }
}

return stop, nil
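The done flag introduced above exists so that the signal handler, the forwarding loop, and the caller's stop() can each request shutdown without tripping the "close on closed channel" panic the old comment warned about. Since done is a plain bool shared across goroutines, a sync.Once is the race-free idiom for the same guarantee; a minimal sketch, not from this commit:

package main

import "sync"

// stopOnce wraps a stop channel so that any number of callers
// (signal handler, forwarder loop, client stop()) can safely close it.
type stopOnce struct {
	ch   chan struct{}
	once sync.Once
}

func newStopOnce() *stopOnce {
	return &stopOnce{ch: make(chan struct{})}
}

// Close is idempotent: only the first call actually closes the channel.
func (s *stopOnce) Close() {
	s.once.Do(func() { close(s.ch) })
}

func main() {
	s := newStopOnce()
	s.Close()
	s.Close() // no panic: close on closed channel
	<-s.ch    // already closed; receive returns immediately
}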
19 changes: 13 additions & 6 deletions pkg/be/kubernetes/queue.go
@@ -12,12 +12,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"lunchpail.io/pkg/be/kubernetes/names"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/queue"
)

// Queue properties for a given run, plus ensure access to the endpoint from this client
- func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
- endpoint, accessKeyID, secretAccessKey, bucket, err = backend.Queue(ctx, run)
+ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+ endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run)
if err != nil {
return
}
@@ -36,7 +37,10 @@ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (e
err = perr
return
}
fmt.Fprintf(os.Stderr, "Opening port forward to pod=%s\n", podName)

if opts.Verbose {
fmt.Fprintf(os.Stderr, "Opening port forward to pod=%s args=%v\n", podName, os.Args)
}

var localPort int
for {
@@ -45,7 +49,7 @@ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (e
continue
}

- if s, perr := backend.portForward(ctx, podName, localPort, podPort); perr != nil {
+ if s, perr := backend.portForward(ctx, podName, localPort, podPort, opts); perr != nil {
if strings.Contains(perr.Error(), "already in use") {
// Oops, someone else grabbed the port. Try again.
continue
@@ -60,7 +64,10 @@ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (e

oendpoint := endpoint
endpoint = fmt.Sprintf("http://localhost:%d", localPort)
fmt.Fprintf(os.Stderr, "Port forwarding with endpoint=%s -> %s\n", oendpoint, endpoint)

if opts.Verbose {
fmt.Fprintf(os.Stderr, "Port forwarding with endpoint=%s -> %s\n", oendpoint, endpoint)
}
}

return
@@ -80,7 +87,7 @@ func portFromEndpoint(endpoint string) (int, error) {
return port, nil
}

- func (backend Backend) Queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) {
+ func (backend Backend) queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) {
endpoint = os.Getenv("lunchpail_queue_endpoint")
accessKeyID = os.Getenv("lunchpail_queue_accessKeyID")
secretAccessKey = os.Getenv("lunchpail_queue_secretAccessKey")
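The retry loop partially visible above draws random local ports and tries again whenever portForward reports "already in use". An alternative is to let the kernel pick a free port by binding port 0, sketched below under hypothetical names; there is still a small window between releasing the port and reusing it, so a retry loop like the commit's remains a sensible backstop.

package main

import (
	"fmt"
	"net"
)

// freeLocalPort asks the OS for an unused TCP port by binding port 0,
// then releasing it. Another process could grab the port before we
// reuse it, hence the belt-and-braces retry in the code above.
func freeLocalPort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := freeLocalPort()
	if err != nil {
		panic(err)
	}
	fmt.Println("local port:", port)
}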
7 changes: 4 additions & 3 deletions pkg/be/local/queue.go
@@ -8,18 +8,19 @@ import (
"time"

"lunchpail.io/pkg/be/local/files"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/llir"
"lunchpail.io/pkg/ir/queue"
)

// Queue properties for a given run, plus ensure access to the endpoint from this client
- func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
- endpoint, accessKeyID, secretAccessKey, bucket, err = backend.Queue(ctx, run)
+ func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+ endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run)
stop = func() {}
return
}

- func (backend Backend) Queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) {
+ func (backend Backend) queue(ctx context.Context, run queue.RunContext) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) {
spec, rerr := restoreContext(run)
if rerr != nil {
err = rerr
2 changes: 1 addition & 1 deletion pkg/boot/alldone.go
@@ -11,7 +11,7 @@ import (
)

func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
- client, err := s3.NewS3ClientForRun(ctx, backend, run.RunName)
+ client, err := s3.NewS3ClientForRun(ctx, backend, run.RunName, opts)
if err != nil {
if strings.Contains(err.Error(), "Connection closed") {
// already gone