From d3fb9c5d88284bf54cb64c9738a46e56884ca6fd Mon Sep 17 00:00:00 2001 From: Peter Ebden Date: Sat, 15 Jun 2024 15:45:09 -0400 Subject: [PATCH] Send back execution metadata with messages --- elan/BUILD | 2 +- mettle/api/api.go | 6 +++--- mettle/common/common.go | 11 ++++++----- mettle/worker/worker.go | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/elan/BUILD b/elan/BUILD index da2973f8..61903b01 100644 --- a/elan/BUILD +++ b/elan/BUILD @@ -14,5 +14,5 @@ go_binary( sh_cmd( name = "run_local", srcs = [":elan"], - cmd = f"mkdir -p plz-out/elan && exec $(out_location :elan) --host {CONFIG.LOCAL_HOST} --port 7777 -s file://\\\\$PWD/plz-out/elan --log_file plz-out/log/elan.log --admin_host 127.0.0.1 --admin_port 9992 --token_file grpcutil/token.txt --known_blob_cache_size 20M", + cmd = f"mkdir -p plz-out/elan && TMPDIR='plz-out/elan' exec $(out_location :elan) --host {CONFIG.LOCAL_HOST} --port 7777 -s file://\\\\$PWD/plz-out/elan --log_file plz-out/log/elan.log --admin_host 127.0.0.1 --admin_port 9992 --token_file grpcutil/token.txt --known_blob_cache_size 20M", ) diff --git a/mettle/api/api.go b/mettle/api/api.go index 16012374..43a5513d 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -310,7 +310,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ // If we're allowed to check the cache, see if this one has already been done. // A well-behaved client will likely have done this itself but we should make sure again. if !req.SkipCacheLookup { - if err := stream.Send(common.BuildOperation(pb.ExecutionStage_CACHE_CHECK, req.ActionDigest, nil)); err != nil { + if err := stream.Send(common.BuildOperation(pb.ExecutionStage_CACHE_CHECK, req.ActionDigest, nil, nil)); err != nil { log.Warningf("Failed to forward to stream: %s", err) } if ar, err := s.client.CheckActionCache(context.Background(), req.ActionDigest); err != nil { @@ -320,7 +320,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ Result: ar, CachedResult: true, Status: &rpcstatus.Status{Code: int32(codes.OK)}, - })) + }, nil)) } } @@ -331,7 +331,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ } // Dispatch a pre-emptive response message to let our colleagues know we've queued it. // We will also receive & forward this message. - b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil) + b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil, nil) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() preResponseStartTime := time.Now() diff --git a/mettle/common/common.go b/mettle/common/common.go index 039d055a..4ec847cf 100644 --- a/mettle/common/common.go +++ b/mettle/common/common.go @@ -173,10 +173,11 @@ func CheckPath(path string) error { } // BuildOperation constructs a longrunning.Operation proto for a task. response may be nil. -func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse) *longrunning.Operation { +func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse, metadata *pb.ExecutedActionMetadata) *longrunning.Operation { any, _ := ptypes.MarshalAny(&pb.ExecuteOperationMetadata{ - Stage: stage, - ActionDigest: actionDigest, + Stage: stage, + ActionDigest: actionDigest, + PartialExecutionMetadata: metadata, }) op := &longrunning.Operation{ Name: actionDigest.Hash, @@ -191,7 +192,7 @@ func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, resp } // MarshalOperation is like BuildOperation but gives you back the serialised proto. -func MarshalOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse) []byte { - b, _ := proto.Marshal(BuildOperation(stage, actionDigest, response)) +func MarshalOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse, metadata *pb.ExecutedActionMetadata) []byte { + b, _ := proto.Marshal(BuildOperation(stage, actionDigest, response, metadata)) return b } diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 13616315..0d8b509b 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -1065,7 +1065,7 @@ func (w *worker) collectOutputs(ar *pb.ActionResult, cmd *pb.Command) error { // update sends an update on the response channel func (w *worker) update(stage pb.ExecutionStage_Value, response *pb.ExecuteResponse) error { w.Report(true, stage == pb.ExecutionStage_EXECUTING, true, stage.String()) - body := common.MarshalOperation(stage, w.actionDigest, response) + body := common.MarshalOperation(stage, w.actionDigest, response, w.metadata) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() return common.PublishWithOrderingKey(ctx, w.responses, body, w.actionDigest.Hash, w.name)