diff --git a/elan/rpc/BUILD b/elan/rpc/BUILD index d244d6fe..c90f67f7 100644 --- a/elan/rpc/BUILD +++ b/elan/rpc/BUILD @@ -36,6 +36,7 @@ go_library( "//proto/purity", "//rexclient", "//third_party/go:grpc", + "///third_party/go/google.golang.org_grpc//metadata", ], ) diff --git a/elan/rpc/rpc.go b/elan/rpc/rpc.go index fdd46f98..5dd67f7f 100644 --- a/elan/rpc/rpc.go +++ b/elan/rpc/rpc.go @@ -10,6 +10,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "google.golang.org/grpc/metadata" "io" "io/ioutil" "net" @@ -240,6 +241,16 @@ func (s *server) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesReq } func (s *server) GetActionResult(ctx context.Context, req *pb.GetActionResultRequest) (*pb.ActionResult, error) { + md, _ := metadata.FromIncomingContext(ctx) + callers := md.Get("mettle-caller-name") + + caller := "" + if len(callers) > 0 && callers[0] != "" { + caller = callers[0] + } else { + // Be paranoid and assume that a client may not have updated, and set the caller to the instance name + caller = req.InstanceName + } ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() ar := &pb.ActionResult{} @@ -256,7 +267,8 @@ func (s *server) GetActionResult(ctx context.Context, req *pb.GetActionResultReq ar.StdoutRaw = b } } - if s.isFileStorage && req.InstanceName != "purity-gc" { + + if s.isFileStorage && caller != "purity-gc" { now := time.Now() if err := os.Chtimes(path.Join(s.storageRoot, s.key("ac", req.ActionDigest)), now, now); err != nil { log.Warning("Failed to change times on file: %s", err) diff --git a/proto/purity/purity.proto b/proto/purity/purity.proto index 14e37543..6f701bf0 100644 --- a/proto/purity/purity.proto +++ b/proto/purity/purity.proto @@ -16,6 +16,8 @@ service GC { message ListRequest { // The prefix of blobs to list. Should be exactly two hex characters. string prefix = 1; + // The instance name for the CAS server. + string instance_name = 2; } message ListResponse { @@ -53,6 +55,8 @@ message DeleteRequest { // False gives the server an option to 'soft' delete them (however it may // interpret that). bool hard = 4; + // The instance name for the CAS server. + string instance_name = 5; } message DeleteResponse {} diff --git a/purity/gc/BUILD b/purity/gc/BUILD index 15487abe..48dae5ef 100644 --- a/purity/gc/BUILD +++ b/purity/gc/BUILD @@ -18,5 +18,6 @@ go_library( "//proto/purity", "//rexclient", "//third_party/go:grpc", + "///third_party/go/google.golang.org_grpc//metadata", ], ) diff --git a/purity/gc/gc.go b/purity/gc/gc.go index bb0ed424..57dc8f10 100644 --- a/purity/gc/gc.go +++ b/purity/gc/gc.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "fmt" + "google.golang.org/grpc/metadata" "path" "sort" "sync" @@ -124,6 +125,8 @@ type Action struct { InputSize, OutputSize int } +const name = "purity-gc" + type collector struct { client *client.Client gcclient ppb.GCClient @@ -187,6 +190,7 @@ func (c *collector) LoadAllBlobs() error { for j := 0; j < 16; j++ { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) defer cancel() + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name)) resp, err := c.gcclient.List(ctx, &ppb.ListRequest{ Prefix: hex.EncodeToString([]byte{byte(i*16 + j)}), }) @@ -475,6 +479,7 @@ func (c *collector) ReplicateBlobs(rf int) error { if err := c.replicateBlobs("blobs", blobs, func(dg *pb.Digest) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name)) blob, _, err := c.client.ReadBlob(ctx, digest.NewFromProtoUnvalidated(dg)) if err != nil { return err @@ -487,6 +492,7 @@ func (c *collector) ReplicateBlobs(rf int) error { return c.replicateBlobs("action results", ars, func(dg *pb.Digest) error { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name)) ar, err := c.client.GetActionResult(ctx, &pb.GetActionResultRequest{ InstanceName: c.client.InstanceName, ActionDigest: dg, @@ -616,6 +622,7 @@ func (c *collector) BlobUsage() ([]Blob, error) { func (c *collector) inputDirs(ar *ppb.ActionResult) (*pb.Action, []*pb.Directory, error) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name)) action := &pb.Action{} blob, present := c.allBlobs[ar.Hash] if !present { diff --git a/purity/main.go b/purity/main.go index 9e8b589e..9660f188 100644 --- a/purity/main.go +++ b/purity/main.go @@ -20,7 +20,7 @@ var opts = struct { Logging cli.LoggingOpts `group:"Options controlling logging output"` GC struct { URL string `short:"u" long:"url" required:"true" description:"URL for the storage server"` - InstanceName string `short:"i" long:"instance_name" default:"purity-gc" description:"Name of this execution instance"` + InstanceName string `short:"i" long:"instance_name" default:"mettle" description:"Name of this execution instance"` TokenFile string `long:"token_file" description:"File containing token to authenticate gRPC requests with"` TLS bool `long:"tls" description:"Use TLS for communicating with the storage server"` } `group:"Options controlling GC settings"`