Skip to content

Commit

Permalink
feat: add redis cache support
Browse files Browse the repository at this point in the history
Currently, we have dynamoDB as the remote shared cache but ideal only
for the cloud use case.
For on-prem use case, add support for redis.

Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha authored and eusebiu-constantin-petu-dbk committed Apr 30, 2024
1 parent a702384 commit cfca8b9
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 136 deletions.
7 changes: 0 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ require (
golang.org/x/oauth2 v0.19.0
modernc.org/sqlite v1.29.8
oras.land/oras-go/v2 v2.5.0
zotregistry.io/zot v1.4.3
)

require (
Expand Down Expand Up @@ -95,7 +94,6 @@ require (
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 // indirect
github.com/apex/log v1.9.0 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/aquasecurity/table v1.8.0 // indirect
github.com/aquasecurity/tml v0.6.1 // indirect
Expand Down Expand Up @@ -186,11 +184,8 @@ require (
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 // indirect
github.com/oleiade/reflections v1.0.1 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/opencontainers/umoci v0.4.8-0.20210922062158-e60a0cc726e6 // indirect
github.com/openvex/go-vex v0.2.5 // indirect
github.com/oras-project/artifacts-spec v1.0.0-rc.2 // indirect
github.com/owenrumney/go-sarif/v2 v2.3.0 // indirect
github.com/package-url/packageurl-go v0.1.2 // indirect
github.com/pborman/uuid v1.2.1 // indirect
Expand All @@ -215,9 +210,7 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spdx/tools-golang v0.5.4-0.20231108154018-0c0f394b5e1a // indirect
github.com/tetratelabs/wazero v1.7.0 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/urfave/cli/v2 v2.27.1 // indirect
github.com/vbatts/go-mtree v0.5.0 // indirect
github.com/vbauerster/mpb/v8 v8.7.2 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
Expand Down
86 changes: 0 additions & 86 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/storage/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Create(dbtype string, parameters interface{}, log zlog.Logger) (cache.Cache
}
case "redis":
{
return cache.NewRedisCache(parameters, log), nil
return cache.NewRedisCache(parameters, log)
}
default:
{
Expand Down
113 changes: 80 additions & 33 deletions pkg/storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
godigest "github.com/opencontainers/go-digest"
"github.com/redis/go-redis/v9"

"zotregistry.io/zot/errors"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage/constants"
zerr "zotregistry.dev/zot/errors"
zlog "zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage/constants"
)

type RedisDriver struct {
Expand All @@ -23,33 +23,39 @@ type RedisDriver struct {

type RedisDriverParameters struct {
RootDir string
Url string // https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
URL string // https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
UseRelPaths bool
}

func NewRedisCache(parameters interface{}, log zlog.Logger) Cache {
func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error) {
properParameters, ok := parameters.(RedisDriverParameters)
if !ok {
panic("Failed type assertion")
log.Error().Err(zerr.ErrTypeAssertionFailed).Msgf("failed to cast type, expected type '%T' but got '%T'",
BoltDBDriverParameters{}, parameters)

return nil, zerr.ErrTypeAssertionFailed
}

connOpts, err := redis.ParseURL(properParameters.Url)
connOpts, err := redis.ParseURL(properParameters.URL)
if err != nil {
log.Error().Err(err).Str("directory", properParameters.Url).Msg("unable to connect to redis")
log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to connect to redis")
}
cacheDB := redis.NewClient(connOpts)

if _, err := cacheDB.Ping(context.Background()).Result(); err != nil {
log.Error().Err(err).Msg("unable to ping redis cache")
return nil
log.Error().Err(err).Msg("failed to ping redis cache")

return nil, err
}

return &RedisDriver{
driver := &RedisDriver{
db: cacheDB,
log: log,
rootDir: properParameters.RootDir,
useRelPaths: properParameters.UseRelPaths,
}

return driver, nil
}

func join(xs ...string) string {
Expand All @@ -66,42 +72,57 @@ func (d *RedisDriver) Name() string {

func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()

if path == "" {
d.log.Error().Err(errors.ErrEmptyValue).Str("digest", digest.String()).Msg("empty path provided")
return errors.ErrEmptyValue
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("failed to provide non-empty path")

return zerr.ErrEmptyValue
}

// use only relative (to rootDir) paths on blobs
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("unable to get relative path")
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}

if len(path) == 0 {
return errors.ErrEmptyValue
return zerr.ErrEmptyValue
}

// see if the blob digest exists.
exists, err := d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err
}
if _, err := d.db.TxPipelined(ctx, func(tx redis.Pipeliner) error {

if _, err := d.db.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
if !exists {
// add the key value pair [digest, path] to blobs:origin if not exist already. the path becomes the canonical blob
// we do this in a transaction to make sure that if something is in the set, then it is guaranteed to always have a path
// note that there is a race, but the worst case is that a different origin path that is still valid is used.
if err := tx.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", path).Msg("unable to put record")
// add the key value pair [digest, path] to blobs:origin if not
// exist already. the path becomes the canonical blob we do this in
// a transaction to make sure that if something is in the set, then
// it is guaranteed to always have a path note that there is a
// race, but the worst case is that a different origin path that is
// still valid is used.
if err := txrp.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).
Str("value", path).Msg("unable to put record")

return err
}
}
// add path to the set of paths which the digest represents
if err := d.db.SAdd(ctx, join(constants.BlobsCache, constants.DuplicatesBucket, digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).Str("value", path).Msg("unable to put record")
if err := d.db.SAdd(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("value", path).Msg("unable to put record")

return err
}

return nil
}); err != nil {
return err
Expand All @@ -112,37 +133,52 @@ func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {

func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
ctx := context.TODO()

path, err := d.db.HGet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
return "", errors.ErrCacheMiss
return "", zerr.ErrCacheMiss
}
d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)).Str("digest", digest.String()).Msg("unable to get record")

d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

return "", err
}

return path, nil
}

func (d *RedisDriver) HasBlob(digest godigest.Digest, blob string) bool {
ctx := context.TODO()
// see if we are in the set
exists, err := d.db.SIsMember(ctx, join(constants.BlobsCache, constants.DuplicatesBucket, digest.String()), blob).Result()
exists, err := d.db.SIsMember(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), blob).Result()
if err != nil {
d.log.Error().Err(err).Str("sismember", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).Str("digest", digest.String()).Msg("unable to get record")
d.log.Error().Err(err).Str("sismember", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("digest", digest.String()).Msg("unable to get record")

return false
}

if !exists {
return false
}

// see if the path entry exists. is this actually needed? i guess it doesn't really hurt (it is fast)
exists, err = d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
d.log.Error().Err(err).Str("hexists", join(constants.BlobsCache, constants.OriginalBucket)).Str("digest", digest.String()).Msg("unable to get record")

d.log.Error().Err(err).Str("hexists", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

if err != nil {
return false
}

if !exists {
return false
}

return true
}

Expand All @@ -154,7 +190,7 @@ func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("unable to get relative path")
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}

Expand All @@ -163,17 +199,21 @@ func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
// delete path from the set of paths which the digest represents
_, err = d.db.SRem(ctx, pathSet, path).Result()
if err != nil {
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("unable to delete record")
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("failed to delete record")

return err
}

currentPath, err := d.GetBlob(digest)
if err != nil {
return err
}

if currentPath != path {
// nothing we need to do, return nil yay
return nil
}

// we need to set a new path
newPath, err := d.db.SRandMember(ctx, pathSet).Result()
if err != nil {
Expand All @@ -182,13 +222,20 @@ func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
if err != nil {
return err
}

return nil
}
d.log.Error().Err(err).Str("srandmember", pathSet).Msg("unable to get new path")

d.log.Error().Err(err).Str("srandmember", pathSet).Msg("failed to get new path")

return err
}
if _, err := d.db.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).Msg("unable to put record")

if _, err := d.db.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).
Msg("unable to put record")

return err
}

Expand Down
21 changes: 12 additions & 9 deletions pkg/storage/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ import (
"github.com/alicebob/miniredis/v2"
. "github.com/smartystreets/goconvey/convey"

"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
"zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage"
"zotregistry.dev/zot/pkg/storage/cache"
)

func TestRedisCache(t *testing.T) {
mr := miniredis.RunT(t)
Convey("Make a new cache", t, func() {
miniRedis := miniredis.RunT(t)

Convey("Make a new cache", t, func() {
dir := t.TempDir()

log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)

So(func() { _, _ = storage.Create("redis", "failTypeAssertion", log) }, ShouldPanic)
_, err := storage.Create("redis", "failTypeAssertion", log)
So(err, ShouldNotBeNil)

cacheDriver, _ := storage.Create("redis", cache.RedisDriverParameters{dir, "redis://" + mr.Addr(), true}, log)
cacheDriver, _ := storage.Create("redis",
cache.RedisDriverParameters{dir, "redis://" + miniRedis.Addr(), true}, log)
So(cacheDriver, ShouldNotBeNil)

name := cacheDriver.Name()
Expand Down Expand Up @@ -61,7 +63,8 @@ func TestRedisCache(t *testing.T) {
So(err, ShouldNotBeNil)
So(err, ShouldEqual, errors.ErrEmptyValue)

cacheDriver, _ = storage.Create("redis", cache.RedisDriverParameters{t.TempDir(), "redis://" + mr.Addr() + "/5", false}, log)
cacheDriver, _ = storage.Create("redis",
cache.RedisDriverParameters{t.TempDir(), "redis://" + miniRedis.Addr() + "/5", false}, log)
So(cacheDriver, ShouldNotBeNil)

err = cacheDriver.PutBlob("key1", "originalBlobPath")
Expand Down
15 changes: 15 additions & 0 deletions test/blackbox/helpers_redis.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
ROOT_DIR=$(git rev-parse --show-toplevel)
OS=$(go env GOOS)
ARCH=$(go env GOARCH)
ZOT_MINIMAL_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}-minimal
ZB_PATH=${ROOT_DIR}/bin/zb-${OS}-${ARCH}
TEST_DATA_DIR=${BATS_FILE_TMPDIR}/test/data

function redis_start() {
docker run -d --name redis_server -p 6379:6379 redis
}

function redis_stop() {
docker stop redis_server
docker rm -f redis_server
}
Loading

0 comments on commit cfca8b9

Please sign in to comment.