diff --git a/cmd/subcommands/queue/ls.go b/cmd/subcommands/queue/ls.go index f0e043544..945bd98f3 100644 --- a/cmd/subcommands/queue/ls.go +++ b/cmd/subcommands/queue/ls.go @@ -64,6 +64,7 @@ func Ls() *cobra.Command { for { select { + case <-ctx.Done(): case err := <-errors: return err case file := <-files: diff --git a/cmd/subcommands/queue/upload.go b/cmd/subcommands/queue/upload.go index a2c2269bd..decd00db0 100644 --- a/cmd/subcommands/queue/upload.go +++ b/cmd/subcommands/queue/upload.go @@ -38,7 +38,7 @@ func Upload() *cobra.Command { } run := q.RunContext{RunName: runOpts.Run} - return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{Path: args[0], Bucket: args[1]}}, *opts.Log) + return queue.UploadFiles(ctx, backend, run, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, *opts.Log) } return cmd diff --git a/pkg/boot/pipeline.go b/pkg/boot/pipeline.go index a22e4eafb..98763d359 100644 --- a/pkg/boot/pipeline.go +++ b/pkg/boot/pipeline.go @@ -29,7 +29,6 @@ func handlePipelineStdin() (llir.Context, error) { } if stdinContext.Queue.Endpoint != "" { - // context.Run.RunName = fmt.Sprintf("%s-%d", context.Run.RunName, context.Run.Step) context = stdinContext } } diff --git a/pkg/boot/up.go b/pkg/boot/up.go index fa05bf09e..d7db8ec83 100644 --- a/pkg/boot/up.go +++ b/pkg/boot/up.go @@ -184,7 +184,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption //inject executable into s3 if opts.Executable != "" { - if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{Path: q.Executable, Bucket: ir.Context.Run.Bucket}}, *opts.BuildOptions.Log); err != nil { + if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil { fmt.Fprintln(os.Stderr, err) } } diff --git a/pkg/ir/queue/paths.go b/pkg/ir/queue/paths.go index 2ef3308a0..0f2e16d0f 100644 --- a/pkg/ir/queue/paths.go +++ b/pkg/ir/queue/paths.go @@ -17,5 +17,5 @@ const ( DispatcherDoneMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/dispatcherdone" WorkerAliveMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/alive/pool/{{.PoolName}}/worker/{{.WorkerName}}" WorkerDeadMarker = "lunchpail/run/{{.RunName}}/queue/step/{{.Step}}/marker/dead/pool/{{.PoolName}}/worker/{{.WorkerName}}" - Executable = "lunchpail/run/{{.RunName}}/data/{{.Executable}}" + Blobs = "lunchpail/run/{{.RunName}}/blobs" ) diff --git a/pkg/runtime/queue/ls.go b/pkg/runtime/queue/ls.go index d15cedb71..09e5f38fd 100644 --- a/pkg/runtime/queue/ls.go +++ b/pkg/runtime/queue/ls.go @@ -32,6 +32,8 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri prefix = wildcard.AsFile(queue.FinishedWithSucceeded) case "failed": prefix = wildcard.AsFile(queue.FinishedWithFailed) + case "blobs": + prefix = wildcard.AsFile(queue.Blobs) default: prefix = wildcard.ListenPrefix() } @@ -46,7 +48,10 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri if o.Err != nil { errors <- o.Err } else { - files <- strings.Replace(o.Key, prefix+"/", "", 1) + f := strings.Replace(o.Key, prefix+"/", "", 1) + if f != "" { + files <- f + } } } }() diff --git a/pkg/runtime/queue/upload.go b/pkg/runtime/queue/upload.go index 189a90c98..f3f8d5fa8 100644 --- a/pkg/runtime/queue/upload.go +++ b/pkg/runtime/queue/upload.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io/fs" - "log" "os" "path/filepath" "strings" @@ -25,53 +24,75 @@ func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, run.Bucket = s3.RunContext.Bucket // TODO for _, spec := range specs { - fmt.Fprintf(os.Stderr, "Preparing upload with mkdirp on s3 bucket=%s\n", spec.Bucket) - if err := s3.Mkdirp(spec.Bucket); err != nil { + bucket := spec.Bucket + if bucket == "" { + bucket = run.Bucket + } + + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Preparing upload with mkdirp on s3 bucket=%s\n", bucket) + } + if err := s3.Mkdirp(bucket); err != nil { return err } - fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s\n", spec.Path, spec.Bucket) - info, err := os.Stat(spec.Path) + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s targetDir='%s'\n", spec.LocalPath, bucket, spec.TargetDir) + } + info, err := os.Stat(spec.LocalPath) if err != nil { return err } switch mode := info.Mode(); { case mode.IsDir(): - if err := s3.copyInDir(spec); err != nil { + if err := s3.copyInDir(bucket, spec, opts); err != nil { return err } case mode.IsRegular(): - if err := s3.copyInFile(spec.Path, spec); err != nil { + if err := s3.copyInFile(bucket, spec.LocalPath, spec, opts); err != nil { return err } default: - log.Printf("Skipping upload of filepath %s\n", spec.Path) + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Skipping upload of filepath %s\n", spec.LocalPath) + } } } return nil } -func (s3 S3Client) copyInDir(spec upload.Upload) error { - return filepath.WalkDir(spec.Path, func(path string, dir fs.DirEntry, err error) error { +func (s3 S3Client) copyInDir(bucket string, spec upload.Upload, opts build.LogOptions) error { + return filepath.WalkDir(spec.LocalPath, func(path string, dir fs.DirEntry, err error) error { if err != nil { return err } else if !dir.IsDir() { - return s3.copyInFile(path, spec) + return s3.copyInFile(bucket, path, spec, opts) } return nil }) } -func (s3 S3Client) copyInFile(path string, spec upload.Upload) error { +func (s3 S3Client) copyInFile(bucket, localPath string, spec upload.Upload, opts build.LogOptions) error { for i := range 10 { - dst := strings.Replace(path, spec.Path+"/", "", 1) - fmt.Fprintf(os.Stderr, "Uploading %s to s3 %s\n", path, dst) - if err := s3.Upload(spec.Bucket, path, dst); err == nil { + var dst string + switch spec.TargetDir { + case "": + dst = strings.Replace(localPath, spec.LocalPath+"/", "", 1) + default: + dst = filepath.Join(spec.TargetDir, filepath.Base(localPath)) + } + + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Uploading %s to s3 %s\n", localPath, dst) + } + if err := s3.Upload(bucket, localPath, dst); err == nil { break } else { - fmt.Fprintf(os.Stderr, "Retrying upload iter=%d path=%s\n%v\n", i, path, err) + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Retrying upload iter=%d path=%s\n%v\n", i, localPath, err) + } time.Sleep(1 * time.Second) } } diff --git a/pkg/runtime/queue/upload/upload.go b/pkg/runtime/queue/upload/upload.go index 1d1ef48aa..15c4898aa 100644 --- a/pkg/runtime/queue/upload/upload.go +++ b/pkg/runtime/queue/upload/upload.go @@ -1,6 +1,7 @@ package upload type Upload struct { - Path string - Bucket string + LocalPath string + TargetDir string + Bucket string }