diff --git a/ChangeLog b/ChangeLog index d7f7b3ff..0f3b9150 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +Version 11.9.9 +-------------- + * Read from redis only blob size is below limit (and could be found in + Redis). + Version 11.9.8 -------------- * Allow specifying redis timeout through flags/environment. diff --git a/VERSION b/VERSION index 76d3a6cf..0c2b2bec 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.9.8 +11.9.9 diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index 7c2eab33..0df4bf1b 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -65,17 +65,19 @@ func (r *elanRedisWrapper) Healthcheck() error { } func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) { - if r.limiter.Tokens() >= 1.0 { - cmd := r.readRedis.Get(context.Background(), dg.Hash) - if err := cmd.Err(); err == nil { - blob, _ := cmd.Bytes() - return blob, nil - } else if err != redis.Nil { - log.Warningf("Failed to read blob from Redis: %s", err) - r.limiter.Reserve() + if dg.SizeBytes < r.maxSize { + if r.limiter.Tokens() >= 1.0 { + cmd := r.readRedis.Get(context.Background(), dg.Hash) + if err := cmd.Err(); err == nil { + blob, _ := cmd.Bytes() + return blob, nil + } else if err != redis.Nil { + log.Warningf("Failed to read blob from Redis: %s", err) + r.limiter.Reserve() + } + } else { + log.Warningf("limiter error rate exceeded, skipping Redis lookup") } - } else { - log.Warningf("limiter error rate exceeded, skipping Redis lookup") } // If we get here, the blob didn't exist. Download it then write back to Redis. blob, err := r.elan.ReadBlob(dg) @@ -107,11 +109,17 @@ func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compress // This is approximate only and assumes that Redis always has a strict subset of the total keys. missing := make([]*uploadinfo.Entry, 0, len(entries)) uploads := make([]interface{}, 0, 2*len(entries)) - keys := make([]string, len(entries)) + keys := make([]string, 0, len(entries)) + entriesByHash := make(map[string]*uploadinfo.Entry, len(entries)) // Unfortunately there is no MEXISTS, so we reuse MGET but chuck away the values. // We could also do lots of EXISTS in parallel but this seems simpler... - for i, entry := range entries { - keys[i] = entry.Digest.Hash + for _, entry := range entries { + if entry.Digest.Size < r.maxSize { + keys = append(keys, entry.Digest.Hash) + } else { + missing = append(missing, entry) + } + entriesByHash[entry.Digest.Hash] = entry } blobs := r.readBlobs(keys, false) if blobs == nil { @@ -119,18 +127,18 @@ func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compress } for i, blob := range blobs { if blob == nil { - e := entries[i] + e := entriesByHash[keys[i]] missing = append(missing, e) if dg := e.Digest; dg.Size < r.maxSize && dg.Hash != emptyHash { if e.Contents != nil { - uploads = append(uploads, keys[i], e.Contents) + uploads = append(uploads, e.Digest.Hash, e.Contents) } else { b, err := os.ReadFile(e.Path) if err != nil { log.Warning("Failed to read file %s: %s", e.Path, err) continue } - uploads = append(uploads, keys[i], b) + uploads = append(uploads, e.Digest.Hash, b) } } } @@ -154,15 +162,19 @@ func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compress func (r *elanRedisWrapper) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) { log.Debug("Checking Redis for batch of %d files...", len(dgs)) - keys := make([]string, len(dgs)) - for i, dg := range dgs { - keys[i] = dg.Hash + keys := make([]string, 0, len(dgs)) + missingDigests := make([]digest.Digest, 0, len(dgs)) + for _, dg := range dgs { + if dg.Size < r.maxSize { + keys = append(keys, dg.Hash) + } else { + missingDigests = append(missingDigests, dg) + } } blobs := r.readBlobs(keys, true) if blobs == nil { return r.elan.BatchDownload(dgs) } - missingDigests := make([]digest.Digest, 0, len(dgs)) ret := make(map[digest.Digest][]byte, len(dgs)) for i, blob := range blobs { if blob == nil { @@ -245,16 +257,20 @@ func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) { } func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error { - blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes() - if err != nil { - if err != redis.Nil { // Not found. - log.Warning("Error reading blob from Redis: %s", err) + if dg.Size < r.maxSize { + blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes() + if err != nil { + if err != redis.Nil { // Not found. + log.Warning("Error reading blob from Redis: %s", err) + } else { + redisMisses.Inc() + } + } else { + redisHits.Inc() + return os.WriteFile(filename, blob, 0644) } - redisMisses.Inc() - return r.elan.ReadToFile(dg, filename, compressed) } - redisHits.Inc() - return os.WriteFile(filename, blob, 0644) + return r.elan.ReadToFile(dg, filename, compressed) } func (r *elanRedisWrapper) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) {