diff --git a/cache/cache_blob.go b/cache/cache_blob.go index 5eebb48..8d805b4 100644 --- a/cache/cache_blob.go +++ b/cache/cache_blob.go @@ -51,6 +51,13 @@ func cleanDigest(blob string) string { return strings.TrimPrefix(blob, "sha256:") } +func ensureDigestPrefix(blob string) string { + if !strings.HasPrefix(blob, "sha256:") { + return "sha256:" + blob + } + return blob +} + func blobCachePath(blob string) string { blob = cleanDigest(blob) return path.Join("/docker/registry/v2/blobs/sha256", blob[:2], blob, "data") diff --git a/cache/cache_manifest.go b/cache/cache_manifest.go index cc52591..785ddb7 100644 --- a/cache/cache_manifest.go +++ b/cache/cache_manifest.go @@ -12,7 +12,7 @@ import ( ) func (c *Cache) RelinkManifest(ctx context.Context, host, image, tag string, blob string) error { - blob = cleanDigest(blob) + blob = ensureDigestPrefix(blob) _, err := c.StatBlob(ctx, blob) if err != nil { @@ -20,13 +20,13 @@ func (c *Cache) RelinkManifest(ctx context.Context, host, image, tag string, blo } manifestLinkPath := manifestTagCachePath(host, image, tag) - err = c.PutContent(ctx, manifestLinkPath, []byte("sha256:"+blob)) + err = c.PutContent(ctx, manifestLinkPath, []byte(blob)) if err != nil { return fmt.Errorf("put manifest link path %s error: %w", manifestLinkPath, err) } manifestBlobLinkPath := manifestRevisionsCachePath(host, image, blob) - err = c.PutContent(ctx, manifestBlobLinkPath, []byte("sha256:"+blob)) + err = c.PutContent(ctx, manifestBlobLinkPath, []byte(blob)) if err != nil { return fmt.Errorf("put manifest revisions path %s error: %w", manifestLinkPath, err) } @@ -144,6 +144,41 @@ func (c *Cache) StatManifest(ctx context.Context, host, image, tagOrBlob string) return stat.Size() != 0, nil } +func (c *Cache) StatOrRelinkManifest(ctx context.Context, host, image, tag string, blob string) (bool, error) { + manifestLinkPath := manifestTagCachePath(host, image, tag) + + digestContent, err := c.GetContent(ctx, manifestLinkPath) + if err != nil { + return false, fmt.Errorf("get manifest link path %s error: %w", manifestLinkPath, err) + } + digest := string(digestContent) + stat, err := c.StatBlob(ctx, digest) + if err != nil { + return false, err + } + + if stat.Size() == 0 { + return false, nil + } + + blob = ensureDigestPrefix(blob) + if digest == blob { + return true, nil + } + + err = c.PutContent(ctx, manifestLinkPath, []byte(blob)) + if err != nil { + return false, fmt.Errorf("put manifest link path %s error: %w", manifestLinkPath, err) + } + + manifestBlobLinkPath := manifestRevisionsCachePath(host, image, blob) + err = c.PutContent(ctx, manifestBlobLinkPath, []byte(blob)) + if err != nil { + return false, fmt.Errorf("put manifest revisions path %s error: %w", manifestLinkPath, err) + } + return true, nil +} + func manifestRevisionsCachePath(host, image, blob string) string { blob = cleanDigest(blob) return path.Join("/docker/registry/v2/repositories", host, image, "_manifests/revisions/sha256", blob, "link") diff --git a/sync/sync.go b/sync/sync.go index 821fbcf..2decfd6 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/daocloud/crproxy/cache" + "github.com/daocloud/crproxy/internal/utils" "github.com/distribution/reference" "github.com/docker/distribution" "github.com/docker/distribution/manifest/manifestlist" @@ -43,6 +44,7 @@ type SyncManager struct { logger *slog.Logger deep bool + uniq map[digest.Digest]struct{} excludeTags []*regexp.Regexp filterPlatform func(pf manifestlist.PlatformSpec) bool } @@ -89,6 +91,7 @@ func NewSyncManager(opts ...Option) (*SyncManager, error) { c := &SyncManager{ logger: slog.Default(), transport: http.DefaultTransport, + uniq: map[digest.Digest]struct{}{}, } for _, opt := range opts { opt(c) @@ -134,6 +137,7 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { path := reference.Path(named) + host, path = utils.CorrectImage(host, path) name := newNameWithoutDomain(named, path) repo, err := client.NewRepository(name, "https://"+host, c.transport) @@ -148,14 +152,13 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { bs := repo.Blobs(ctx) - uniq := map[digest.Digest]struct{}{} blobCallback := func(caches []*cache.Cache, dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec, name string) error { - _, ok := uniq[dgst] + _, ok := c.uniq[dgst] if ok { c.logger.Info("skip blob by unique", "image", image, "digest", dgst) return nil } - uniq[dgst] = struct{}{} + c.uniq[dgst] = struct{}{} blob := dgst.String() var subCaches []*cache.Cache @@ -186,12 +189,14 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { } defer f.Close() + c.logger.Info("start sync blob", "image", image, "digest", dgst, "platform", pf, "name", name) + if len(subCaches) == 1 { n, err := subCaches[0].PutBlob(ctx, blob, f) if err != nil { return fmt.Errorf("put blob failed: %w", err) } - c.logger.Info("sync blob", "image", image, "digest", dgst, "size", n, "platform", pf, "name", name) + c.logger.Info("finish sync blob", "image", image, "digest", dgst, "size", n, "platform", pf, "name", name) return nil } @@ -224,7 +229,7 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { wg.Wait() - c.logger.Info("sync blob", "image", image, "digest", dgst, "platform", pf, "name", name, "size", n) + c.logger.Info("finish sync blob", "image", image, "digest", dgst, "size", n, "platform", pf, "name", name) return nil } @@ -248,7 +253,7 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { switch ref.(type) { case reference.Digested, reference.Tagged: - err = c.syncLayerFromManifestList(ctx, image, ms, ts, ref, blobCallback, manifestCallback, host+"/"+ref.String()) + err = c.syncLayerFromManifestList(ctx, host, path, image, ms, ts, ref, blobCallback, manifestCallback, host+"/"+ref.String()) if err != nil { return fmt.Errorf("sync layer from manifest list failed: %w", err) } @@ -277,7 +282,7 @@ func (c *SyncManager) Image(ctx context.Context, image string) error { if err != nil { return fmt.Errorf("with tag failed: %w", err) } - err = c.syncLayerFromManifestList(ctx, image, ms, ts, t, blobCallback, manifestCallback, host+"/"+t.String()) + err = c.syncLayerFromManifestList(ctx, host, path, image, ms, ts, t, blobCallback, manifestCallback, host+"/"+t.String()) if err != nil { return fmt.Errorf("sync layer from manifest list failed: %w", err) } @@ -302,7 +307,7 @@ type manifestPushPending struct { playload []byte } -func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, image string, ms distribution.ManifestService, ts distribution.TagService, ref reference.Reference, +func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, host, path, image string, ms distribution.ManifestService, ts distribution.TagService, ref reference.Reference, digestCallback func(caches []*cache.Cache, dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec, name string) error, manifestCallback func(caches []*cache.Cache, tagOrHash string, m distribution.Manifest) error, name string) error { @@ -323,8 +328,8 @@ func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, image strin hash = r.Digest() if !c.deep { for _, cache := range c.caches { - stat, err := cache.StatBlob(ctx, hash.String()) - if err != nil || stat.Size() == 0 { + b, _ := cache.StatManifest(ctx, host, path, hash.String()) + if !b { caches = append(caches, cache) } } @@ -350,8 +355,8 @@ func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, image strin hash = desc.Digest if !c.deep { for _, cache := range c.caches { - stat, err := cache.StatBlob(ctx, hash.String()) - if err != nil || stat.Size() == 0 { + b, _ := cache.StatOrRelinkManifest(ctx, host, path, tag, hash.String()) + if !b { caches = append(caches, cache) } }