Skip to content

Commit

Permalink
add check before save manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
cnfatal committed Dec 21, 2023
1 parent 2dfe14d commit 08d140a
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 29 deletions.
6 changes: 4 additions & 2 deletions pkg/client/progress/bar.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Bar struct {

nameindex int // scroll name index
refreshcount int // refresh count for scroll name
mu sync.Mutex
mu sync.RWMutex
mp *MultiBar
}

Expand Down Expand Up @@ -52,11 +52,13 @@ func (b *Bar) SetDone() {

func (r *Bar) Notify() {
if r.mp != nil {
r.mp.print()
r.mp.haschange = true
}
}

func (b *Bar) Print(w io.Writer) {
b.mu.RLock()
defer b.mu.RUnlock()
processwidth := b.Width

buff := make([]byte, processwidth)
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/progress/mbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func (m *MultiBar) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
// print once
m.print()
return
case <-t.C:
if m.haschange {
Expand Down
10 changes: 9 additions & 1 deletion pkg/registry/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ type FsObjectMeta struct {
Name string
Size int64
LastModified time.Time
Metadata map[string]string
ContentType string
}

type FSProvider interface {
Put(ctx context.Context, path string, content BlobContent) error
Get(ctx context.Context, path string) (*BlobContent, error)
Stat(ctx context.Context, path string) (FsObjectMeta, error)
Remove(ctx context.Context, path string, recursive bool) error
Exists(ctx context.Context, path string) (bool, error)
List(ctx context.Context, path string, recursive bool) ([]FsObjectMeta, error)
Expand All @@ -37,3 +38,10 @@ func StringDeref(ptr *string, def string) string {
}
return def
}

func TimeDeref(ptr *time.Time, def time.Time) time.Time {
if ptr != nil {
return *ptr
}
return def
}
39 changes: 29 additions & 10 deletions pkg/registry/fs_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func NewLocalFSProvider(options *LocalFSOptions) (*LocalFSProvider, error) {
}

type localFileMeta struct {
ContentType string `json:"contentType,omitempty"`
ContentLength int64 `json:"contentLength,omitempty"`
ContentEncoding string `json:"contentEncoding,omitempty"`
ContentType string `json:"contentType,omitempty"`
ContentLength int64 `json:"contentLength,omitempty"`
}

func (f *LocalFSProvider) Put(ctx context.Context, path string, content BlobContent) error {
Expand All @@ -61,10 +60,9 @@ func (f *LocalFSProvider) Get(ctx context.Context, path string) (*BlobContent, e
return nil, err
}
return &BlobContent{
ContentType: meta.ContentType,
ContentLength: meta.ContentLength,
ContentEncoding: meta.ContentEncoding,
Content: stream,
ContentType: meta.ContentType,

Content: stream,
}, nil
}

Expand All @@ -86,6 +84,20 @@ func (f *LocalFSProvider) Exists(ctx context.Context, path string) (bool, error)
return false, err
}

func (f *LocalFSProvider) Stat(ctx context.Context, path string) (FsObjectMeta, error) {
fi, err := os.Stat(iopath.Join(f.basepath, path))
if err != nil {
return FsObjectMeta{}, err
}
meta, _ := f.readmeta(path)
return FsObjectMeta{
Name: path,
Size: fi.Size(),
LastModified: fi.ModTime(),
ContentType: meta.ContentType,
}, nil
}

func (f *LocalFSProvider) List(ctx context.Context, path string, recursive bool) ([]FsObjectMeta, error) {
out := []FsObjectMeta{}
if recursive {
Expand Down Expand Up @@ -142,9 +154,8 @@ func (f *LocalFSProvider) List(ctx context.Context, path string, recursive bool)

func (f *LocalFSProvider) writemeta(path string, content BlobContent) error {
meta := localFileMeta{
ContentType: content.ContentType,
ContentLength: content.ContentLength,
ContentEncoding: content.ContentEncoding,
ContentType: content.ContentType,
ContentLength: content.ContentLength,
}
jsonData, err := json.MarshalIndent(meta, "", " ")
if err != nil {
Expand Down Expand Up @@ -174,6 +185,13 @@ func (f *LocalFSProvider) getdata(path string) (io.ReadCloser, error) {
}

func (f *LocalFSProvider) readmeta(path string) (*localFileMeta, error) {
fi, err := os.Stat(iopath.Join(f.basepath, path))
if err != nil {
return nil, err
}
if fi.IsDir() {
return nil, os.ErrNotExist
}
metafile := iopath.Join(f.basepath, path+".meta")
raw, err := os.ReadFile(metafile)
if err != nil {
Expand All @@ -183,5 +201,6 @@ func (f *LocalFSProvider) readmeta(path string) (*localFileMeta, error) {
if err := json.Unmarshal(raw, &meta); err != nil {
return nil, err
}
meta.ContentLength = fi.Size()
return &meta, nil
}
27 changes: 23 additions & 4 deletions pkg/registry/fs_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registry
import (
"context"
"errors"
"os"
"path"
"strings"
"time"
Expand Down Expand Up @@ -141,10 +142,9 @@ func (m *S3StorageProvider) Get(ctx context.Context, path string) (*BlobContent,
return nil, err
}
return &BlobContent{
Content: getobjout.Body,
ContentType: StringDeref(getobjout.ContentType, ""),
ContentLength: getobjout.ContentLength,
ContentEncoding: StringDeref(getobjout.ContentEncoding, ""),
Content: getobjout.Body,
ContentType: StringDeref(getobjout.ContentType, ""),
ContentLength: getobjout.ContentLength,
}, nil
}

Expand All @@ -162,6 +162,25 @@ func (m *S3StorageProvider) Exists(ctx context.Context, path string) (bool, erro
return true, nil
}

func (m *S3StorageProvider) Stat(ctx context.Context, path string) (FsObjectMeta, error) {
headobjout, err := m.Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(m.Bucket),
Key: m.prefixedKey(path),
})
if err != nil {
if IsS3StorageNotFound(err) {
return FsObjectMeta{}, os.ErrNotExist
}
return FsObjectMeta{}, err
}
return FsObjectMeta{
Name: path,
Size: headobjout.ContentLength,
LastModified: TimeDeref(headobjout.LastModified, time.Time{}),
ContentType: StringDeref(headobjout.ContentType, ""),
}, nil
}

func (m *S3StorageProvider) List(ctx context.Context, path string, recursive bool) ([]FsObjectMeta, error) {
prefix := *m.prefixedKey(path)
if !strings.HasSuffix(prefix, "/") {
Expand Down
1 change: 0 additions & 1 deletion pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func (s *Registry) GetBlob(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Length", strconv.Itoa(int(result.ContentLength)))
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("Content-Encoding", result.ContentEncoding)
w.WriteHeader(http.StatusOK)
io.Copy(w, result.Content)
return
Expand Down
13 changes: 9 additions & 4 deletions pkg/registry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ var (
)

type BlobContent struct {
ContentType string
ContentLength int64
ContentEncoding string
Content io.ReadCloser
ContentType string
ContentLength int64
Content io.ReadCloser
}

type BlobMeta struct {
ContentType string
ContentLength int64
}

type RegistryStore interface {
Expand All @@ -43,6 +47,7 @@ type RegistryStore interface {
DeleteBlob(ctx context.Context, repository string, digest digest.Digest) error
PutBlob(ctx context.Context, repository string, digest digest.Digest, content BlobContent) error
ExistsBlob(ctx context.Context, repository string, digest digest.Digest) (bool, error)
GetBlobMeta(ctx context.Context, repository string, digest digest.Digest) (BlobMeta, error)

GetBlobLocation(ctx context.Context, repository string, digest digest.Digest,
purpose string, properties map[string]string) (*BlobLocation, error)
Expand Down
9 changes: 9 additions & 0 deletions pkg/registry/store_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,15 @@ func (m *FSRegistryStore) ExistsBlob(ctx context.Context, repository string, dig
}
}

func (m *FSRegistryStore) GetBlobMeta(ctx context.Context, repository string, digest digest.Digest) (BlobMeta, error) {
path := BlobDigestPath(repository, digest)
meta, err := m.FS.Stat(ctx, path)
if err != nil {
return BlobMeta{}, errors.NewInternalError(err)
}
return BlobMeta{ContentType: meta.ContentType, ContentLength: meta.Size}, nil
}

func (m *FSRegistryStore) GetBlob(ctx context.Context, repository string, digest digest.Digest) (*BlobContent, error) {
path := BlobDigestPath(repository, digest)
content, err := m.FS.Get(ctx, path)
Expand Down
33 changes: 26 additions & 7 deletions pkg/registry/store_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,23 @@ func (s *S3RegistryStore) PutManifest(ctx context.Context, repository string, re
// complete multipart upload
for _, blob := range manifest.Blobs {
path := BlobDigestPath(repository, blob.Digest)
if err := s.completeMultipartUpload(ctx, path, blob.Size); err != nil {
return err
if blob.Size > MultiPartUploadThreshold {
if err := s.completeMultipartUpload(ctx, path, blob.Size); err != nil {
return err
}
} else {
// check if uploadid exists and match size
meta, err := s.fs.GetBlobMeta(ctx, repository, blob.Digest)
if err != nil {
return err
}
if meta.ContentLength != blob.Size {
// remove this blob
if err := s.fs.DeleteBlob(ctx, repository, blob.Digest); err != nil {
return err
}
return fmt.Errorf("size mismatch: %d != %d", meta.ContentLength, blob.Size)
}
}
}
return s.fs.PutManifest(ctx, repository, reference, contentType, manifest)
Expand Down Expand Up @@ -100,6 +115,10 @@ func (s *S3RegistryStore) ExistsBlob(ctx context.Context, repository string, dig
return s.fs.ExistsBlob(ctx, repository, digest)
}

func (s *S3RegistryStore) GetBlobMeta(ctx context.Context, repository string, digest digest.Digest) (BlobMeta, error) {
return s.fs.GetBlobMeta(ctx, repository, digest)
}

func (s *S3RegistryStore) GetBlobLocation(ctx context.Context, repository string, digest digest.Digest,
purpose string, properties map[string]string,
) (*BlobLocation, error) {
Expand Down Expand Up @@ -215,11 +234,6 @@ type presignedPart struct {

func (s *S3RegistryStore) getUploadId(ctx context.Context, path string, withCreate bool) (*string, error) {
key := s.provider.prefixedKey(path)
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(s.provider.Bucket),
Key: key,
Expires: aws.Time(time.Now().Add(s.provider.Expire)),
}
existsupload, err := s.provider.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
Bucket: aws.String(s.provider.Bucket),
Delimiter: aws.String("/"),
Expand All @@ -235,6 +249,11 @@ func (s *S3RegistryStore) getUploadId(ctx context.Context, path string, withCrea
if !withCreate {
return nil, ErrUploadNotFound
}
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(s.provider.Bucket),
Key: key,
Expires: aws.Time(time.Now().Add(s.provider.Expire)),
}
createOutput, err := s.provider.Client.CreateMultipartUpload(ctx, input)
if err != nil {
return nil, err
Expand Down

0 comments on commit 08d140a

Please sign in to comment.