Skip to content

Commit

Permalink
fix: executable uploader does not upload executable
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Nov 8, 2024
1 parent adb7298 commit f04a3a0
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/subcommands/queue/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func Ls() *cobra.Command {

for {
select {
case <-ctx.Done():
case err := <-errors:
return err
case file := <-files:
Expand Down
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Upload() *cobra.Command {
}

run := q.RunContext{RunName: runOpts.Run}
return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{Path: args[0], Bucket: args[1]}}, *opts.Log)
return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, *opts.Log)
}

return cmd
Expand Down
1 change: 0 additions & 1 deletion pkg/boot/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func handlePipelineStdin() (llir.Context, error) {
}

if stdinContext.Queue.Endpoint != "" {
// context.Run.RunName = fmt.Sprintf("%s-%d", context.Run.RunName, context.Run.Step)
context = stdinContext
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/boot/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption

//inject executable into s3
if opts.Executable != "" {
if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{Path: q.Executable, Bucket: ir.Context.Run.Bucket}}, *opts.BuildOptions.Log); err != nil {
if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ir/queue/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ const (
DispatcherDoneMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/dispatcherdone"
WorkerAliveMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/alive/pool/{{.PoolName}}/worker/{{.WorkerName}}"
WorkerDeadMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/dead/pool/{{.PoolName}}/worker/{{.WorkerName}}"
Executable = "lunchpail/run/{{.RunName}}/data/{{.Executable}}"
Blobs = "lunchpail/run/{{.RunName}}/blobs"
)
7 changes: 6 additions & 1 deletion pkg/runtime/queue/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri
prefix = wildcard.AsFile(queue.FinishedWithSucceeded)
case "failed":
prefix = wildcard.AsFile(queue.FinishedWithFailed)
case "blobs":
prefix = wildcard.AsFile(queue.Blobs)
default:
prefix = wildcard.ListenPrefix()
}
Expand All @@ -46,7 +48,10 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri
if o.Err != nil {
errors <- o.Err
} else {
files <- strings.Replace(o.Key, prefix+"/", "", 1)
f := strings.Replace(o.Key, prefix+"/", "", 1)
if f != "" {
files <- f
}
}
}
}()
Expand Down
53 changes: 37 additions & 16 deletions pkg/runtime/queue/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"strings"
Expand All @@ -25,53 +24,75 @@ func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext,
run.Bucket = s3.RunContext.Bucket // TODO

for _, spec := range specs {
fmt.Fprintf(os.Stderr, "Preparing upload with mkdirp on s3 bucket=%s\n", spec.Bucket)
if err := s3.Mkdirp(spec.Bucket); err != nil {
bucket := spec.Bucket
if bucket == "" {
bucket = run.Bucket
}

if opts.Verbose {
fmt.Fprintf(os.Stderr, "Preparing upload with mkdirp on s3 bucket=%s\n", bucket)
}
if err := s3.Mkdirp(bucket); err != nil {
return err
}

fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s\n", spec.Path, spec.Bucket)
info, err := os.Stat(spec.Path)
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s targetDir='%s'\n", spec.LocalPath, bucket, spec.TargetDir)
}
info, err := os.Stat(spec.LocalPath)
if err != nil {
return err
}

switch mode := info.Mode(); {
case mode.IsDir():
if err := s3.copyInDir(spec); err != nil {
if err := s3.copyInDir(bucket, spec, opts); err != nil {
return err
}
case mode.IsRegular():
if err := s3.copyInFile(spec.Path, spec); err != nil {
if err := s3.copyInFile(bucket, spec.LocalPath, spec, opts); err != nil {
return err
}
default:
log.Printf("Skipping upload of filepath %s\n", spec.Path)
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Skipping upload of filepath %s\n", spec.LocalPath)
}
}
}

return nil
}

func (s3 S3Client) copyInDir(spec upload.Upload) error {
return filepath.WalkDir(spec.Path, func(path string, dir fs.DirEntry, err error) error {
func (s3 S3Client) copyInDir(bucket string, spec upload.Upload, opts build.LogOptions) error {
return filepath.WalkDir(spec.LocalPath, func(path string, dir fs.DirEntry, err error) error {
if err != nil {
return err
} else if !dir.IsDir() {
return s3.copyInFile(path, spec)
return s3.copyInFile(bucket, path, spec, opts)
}
return nil
})
}

func (s3 S3Client) copyInFile(path string, spec upload.Upload) error {
func (s3 S3Client) copyInFile(bucket, localPath string, spec upload.Upload, opts build.LogOptions) error {
for i := range 10 {
dst := strings.Replace(path, spec.Path+"/", "", 1)
fmt.Fprintf(os.Stderr, "Uploading %s to s3 %s\n", path, dst)
if err := s3.Upload(spec.Bucket, path, dst); err == nil {
var dst string
switch spec.TargetDir {
case "":
dst = strings.Replace(localPath, spec.LocalPath+"/", "", 1)
default:
dst = filepath.Join(spec.TargetDir, filepath.Base(localPath))
}

if opts.Verbose {
fmt.Fprintf(os.Stderr, "Uploading %s to s3 %s\n", localPath, dst)
}
if err := s3.Upload(bucket, localPath, dst); err == nil {
break
} else {
fmt.Fprintf(os.Stderr, "Retrying upload iter=%d path=%s\n%v\n", i, path, err)
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Retrying upload iter=%d path=%s\n%v\n", i, localPath, err)
}
time.Sleep(1 * time.Second)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/runtime/queue/upload/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package upload

type Upload struct {
Path string
Bucket string
LocalPath string
TargetDir string
Bucket string
}

0 comments on commit f04a3a0

Please sign in to comment.