Skip to content

Commit

Permalink
fix: executable uploader may result in unexpected end of JSON input
Browse files Browse the repository at this point in the history
… error (local backend)

And also a 1-second startup delay... THis is because boot/up.go uploads the executable synchronously *prior* to calling up. But for the local backend it is local.Up() that writes out the queue context.

Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Nov 8, 2024
1 parent f04a3a0 commit 94c0938
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
11 changes: 6 additions & 5 deletions pkg/be/local/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ func restoreContext(run queue.RunContext) (llir.Context, error) {
}

var b []byte
for {
for len(b) == 0 {
if b, err = os.ReadFile(f); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return spec, err
}
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
}
break
}

if err := json.Unmarshal(b, &spec); err != nil {
return spec, err
if len(b) > 0 {
if err := json.Unmarshal(b, &spec); err != nil {
return spec, err
}
}

return spec, nil
Expand Down
32 changes: 20 additions & 12 deletions pkg/boot/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,20 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
// We need to chain the isRunning channel to our 0-2 consumers
// below. This is because golang channels are not multicast.
isRunning := make(chan llir.Context) // is the job ready for business?
isRunning4 := make(chan llir.Context)
isRunning5 := make(chan llir.Context)
needsCatAndRedirect := len(opts.Inputs) > 0 || ir.Context.Run.Step > 0
go func() {
ctx := <-isRunning
isRunning4 <- ctx
isRunning4 <- ctx
isRunning5 <- ctx
isRunning5 <- ctx
if opts.Executable != "" {
isRunning5 <- ctx
}
if needsCatAndRedirect {
isRunning4 <- ctx
isRunning5 <- ctx
}
if opts.Watch {
isRunning4 <- ctx
isRunning5 <- ctx
}
}()

Expand All @@ -147,7 +150,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
// Behave like `cat inputs | ... > outputs`
go func() {
// wait for the run to be ready for us to enqueue
<-isRunning4
<-isRunning5

defer func() { redirectDone <- struct{}{} }()
if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, *opts.BuildOptions.Log); err != nil {
Expand All @@ -161,21 +164,21 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
if opts.Watch {
verbose := opts.BuildOptions.Log.Verbose
go func() {
<-isRunning4
<-isRunning5
go watchLogs(cancellable, backend, ir, logsDone, WatchOptions{Verbose: verbose})
go watchUtilization(cancellable, backend, ir, WatchOptions{Verbose: verbose})
}()
}

go func() {
if err := handlePipelineStdout(<-isRunning4); err != nil {
if err := handlePipelineStdout(<-isRunning5); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()

var errorFromTask error
go func() {
<-isRunning4
<-isRunning5
if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil {
errorFromTask = err
// fail fast? cancel()
Expand All @@ -184,9 +187,14 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption

//inject executable into s3
if opts.Executable != "" {
if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
fmt.Fprintln(os.Stderr, err)
}
go func() {
// wait for the run to be ready for us to enqueue
<-isRunning5

if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()
}

defer cancel()
Expand Down

0 comments on commit 94c0938

Please sign in to comment.