From 88221cd95f8ffdfd041cc8c4a50321f15edf12f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 2 Dec 2024 22:35:37 +0100 Subject: [PATCH 1/4] chore(go.mod): use rclone with list dir fix (#4132) --- go.mod | 2 +- go.sum | 4 +- .../testdata/auth_token_overwrite.golden.yaml | 1 + pkg/config/agent/testdata/basic.golden.yaml | 1 + .../testdata/debug_overwrite.golden.yaml | 1 + .../testdata/https_overwrite.golden.yaml | 1 + .../agent/testdata/multiple_cpus.golden.yaml | 1 + .../testdata/prometheus_overwrite.golden.yaml | 1 + .../testdata/scylla_overwrite.golden.yaml | 1 + .../agent/testdata/structs_empty.golden.yaml | 1 + .../rclone/backend/azureblob/azureblob.go | 140 ++++++++++-------- .../googlecloudstorage/googlecloudstorage.go | 111 ++++++++------ .../github.com/rclone/rclone/backend/s3/s3.go | 116 ++++++++------- vendor/github.com/rclone/rclone/fs/config.go | 1 + vendor/github.com/rclone/rclone/fs/fs.go | 19 ++- .../github.com/rclone/rclone/fs/list/list.go | 31 ++++ .../github.com/rclone/rclone/fs/walk/walk.go | 72 ++++++--- vendor/modules.txt | 4 +- 18 files changed, 322 insertions(+), 186 deletions(-) diff --git a/go.mod b/go.mod index c9f452f32e..d03870678a 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,6 @@ require ( replace ( github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 - github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e + github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched ) diff --git a/go.sum b/go.sum index 4eec7b07b4..8e5cba721e 100644 --- a/go.sum +++ b/go.sum @@ -1051,8 +1051,8 @@ github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUq github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig= github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzVwgESabOB1eTwb4woE6oUziY= github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= +github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 h1:3beIciiaygPjiUPLN0+wVlnfe4JnB1Wb/uV1MChzRbM= +github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b h1:JRDV1d1FIiH0TIyHVmTAILAjQ2f8O4t7ZtZ/S+fT2sY= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b/go.mod h1:Tss7a99vrgds+B70w8ZFG3Skxfr9Br3kAzrKP2b3CmQ= github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241104134613-aba35605c28b h1:7CHNmPrQqSdApaEh5nkRL+D52KFHaOHVBBVDvytHEOY= diff --git a/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml b/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml index 1098be0202..5db6a11c88 100644 --- a/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/basic.golden.yaml b/pkg/config/agent/testdata/basic.golden.yaml index 89d7faf302..4a4b500b08 100644 --- a/pkg/config/agent/testdata/basic.golden.yaml +++ b/pkg/config/agent/testdata/basic.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/debug_overwrite.golden.yaml b/pkg/config/agent/testdata/debug_overwrite.golden.yaml index cb08018b62..00121efa15 100644 --- a/pkg/config/agent/testdata/debug_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/debug_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/https_overwrite.golden.yaml b/pkg/config/agent/testdata/https_overwrite.golden.yaml index a9bf9c00f6..2220fb6597 100644 --- a/pkg/config/agent/testdata/https_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/https_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/multiple_cpus.golden.yaml b/pkg/config/agent/testdata/multiple_cpus.golden.yaml index f39d31ffd1..045607c5e8 100644 --- a/pkg/config/agent/testdata/multiple_cpus.golden.yaml +++ b/pkg/config/agent/testdata/multiple_cpus.golden.yaml @@ -69,6 +69,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml b/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml index cc7225ec8a..98f6c706e8 100644 --- a/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/scylla_overwrite.golden.yaml b/pkg/config/agent/testdata/scylla_overwrite.golden.yaml index d0b242140f..8befdcc56e 100644 --- a/pkg/config/agent/testdata/scylla_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/scylla_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/structs_empty.golden.yaml b/pkg/config/agent/testdata/structs_empty.golden.yaml index 89d7faf302..4a4b500b08 100644 --- a/pkg/config/agent/testdata/structs_empty.golden.yaml +++ b/pkg/config/agent/testdata/structs_empty.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go b/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go index e0478c2074..9398f559ff 100644 --- a/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go +++ b/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go @@ -1,5 +1,6 @@ // Package azureblob provides an interface to the Microsoft Azure blob object storage system +//go:build !plan9 && !solaris && !js && go1.14 // +build !plan9,!solaris,!js,go1.14 package azureblob @@ -861,6 +862,30 @@ func (f *Fs) list(ctx context.Context, container, directory, prefix string, addC return nil } +// listCB calls list with cb wrapped in walk.NewListRHelper. +func (f *Fs) listCB(ctx context.Context, container, directory, prefix string, addContainer, recourse bool, cb fs.ListRCallback) error { + if !f.containerOK(container) { + return fs.ErrorDirNotFound + } + list := walk.NewListRHelper(cb) + err := f.list(ctx, container, directory, prefix, addContainer, recourse, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { + entry, err := f.itemToDirEntry(remote, object, isDirectory) + if err != nil { + return err + } + if entry != nil { + return list.Add(entry) + } + return nil + }) + if err != nil { + return err + } + // container must be present if listing succeeded + f.cache.MarkOK(container) + return list.Flush() +} + // Convert a list item into a DirEntry func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItemInternal, isDirectory bool) (fs.DirEntry, error) { if isDirectory { @@ -889,29 +914,6 @@ func (f *Fs) containerOK(container string) bool { return false } -// listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) { - if !f.containerOK(container) { - return nil, fs.ErrorDirNotFound - } - err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) - if err != nil { - return err - } - if entry != nil { - entries = append(entries, entry) - } - return nil - }) - if err != nil { - return nil, err - } - // container must be present if listing succeeded - f.cache.MarkOK(container) - return entries, nil -} - // listContainers returns all the containers to out func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) { if f.isLimited { @@ -944,7 +946,7 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { container, directory := f.split(dir) if container == "" { if directory != "" { @@ -952,7 +954,37 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listContainers(ctx) } - return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + container, directory := f.split(dir) + if container == "" { + entries, err := f.listContainers(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -971,48 +1003,28 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. -func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { container, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(container, directory, prefix string, addContainer bool) error { - return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if container == "" { entries, err := f.listContainers(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on container right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } container := entry.Remote() - err = listR(container, "", f.rootDirectory, true) + err = f.listCB(ctx, container, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // container must be present if listing succeeded - f.cache.MarkOK(container) } - } else { - if !f.containerOK(container) { - return fs.ErrorDirNotFound - } - err = listR(container, directory, f.rootDirectory, f.rootContainer == "") - if err != nil { - return err - } - // container must be present if listing succeeded - f.cache.MarkOK(container) + return nil } - return list.Flush() + return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, cb) } // listContainerFn is called from listContainersToFn to handle a container @@ -1049,7 +1061,7 @@ func (f *Fs) listContainersToFn(fn listContainerFn) error { // Put the object into the container // -// Copy the reader in to the new object which is returned +// # Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { @@ -1181,9 +1193,9 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { // Copy src to this remote using server-side copy operations. // -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // @@ -1303,11 +1315,12 @@ func (o *Object) setMetadata(metadata azblob.Metadata) { // decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in // // Sets -// o.id -// o.modTime -// o.size -// o.md5 -// o.meta +// +// o.id +// o.modTime +// o.size +// o.md5 +// o.meta func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) { metadata := info.NewMetadata() size := info.ContentLength() @@ -1357,10 +1370,11 @@ func (o *Object) clearMetaData() { // readMetaData gets the metadata if it hasn't already been fetched // // Sets -// o.id -// o.modTime -// o.size -// o.md5 +// +// o.id +// o.modTime +// o.size +// o.md5 func (o *Object) readMetaData() (err error) { if !o.modTime.IsZero() { return nil diff --git a/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go b/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go index e96e4f5d9a..56a597adcb 100644 --- a/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go +++ b/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go @@ -556,7 +556,7 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error // // dir is the starting directory, "" for root // -// Set recurse to read sub directories +// # Set recurse to read sub directories // // The remote has prefix removed from it and if addBucket is set // then it adds the bucket to the start. @@ -634,38 +634,38 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck return nil } -// Convert a list item into a DirEntry -func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) { - if isDirectory { - d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size)) - return d, nil - } - o, err := f.newObjectWithInfo(ctx, remote, object) - if err != nil { - return nil, err - } - return o, nil -} - -// listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error { +// listCB calls list with cb wrapped in walk.NewListRHelper. +func (f *Fs) listCB(ctx context.Context, bucket, directory, prefix string, addBucket, recourse bool, cb fs.ListRCallback) error { + list := walk.NewListRHelper(cb) + err := f.list(ctx, bucket, directory, prefix, addBucket, recourse, func(remote string, object *storage.Object, isDirectory bool) error { entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) if err != nil { return err } if entry != nil { - entries = append(entries, entry) + return list.Add(entry) } return nil }) if err != nil { - return nil, err + return err } // bucket must be present if listing succeeded f.cache.MarkOK(bucket) - return entries, err + return list.Flush() +} + +// Convert a list item into a DirEntry +func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) { + if isDirectory { + d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size)) + return d, nil + } + o, err := f.newObjectWithInfo(ctx, remote, object) + if err != nil { + return nil, err + } + return o, nil } // listBuckets lists the buckets @@ -704,7 +704,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -712,7 +712,37 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -731,50 +761,33 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. -func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { bucket, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if bucket == "" { entries, err := f.listBuckets(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on bucket right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } bucket := entry.Remote() - err = listR(bucket, "", f.rootDirectory, true) + err = f.listCB(ctx, bucket, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) } - } else { - err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") - if err != nil { - return err - } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) + return nil } - return list.Flush() + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, cb) } // Put the object into the bucket // -// Copy the reader in to the new object which is returned +// # Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { @@ -871,9 +884,9 @@ func (f *Fs) Precision() time.Duration { // Copy src to this remote using server-side copy operations. // -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // diff --git a/vendor/github.com/rclone/rclone/backend/s3/s3.go b/vendor/github.com/rclone/rclone/backend/s3/s3.go index d51c6ee7e6..c643c4e047 100644 --- a/vendor/github.com/rclone/rclone/backend/s3/s3.go +++ b/vendor/github.com/rclone/rclone/backend/s3/s3.go @@ -1396,7 +1396,7 @@ var retryErrorCodes = []int{ 503, // Service Unavailable/Slow Down - "Reduce your request rate" } -//S3 is pretty resilient, and the built in retry handling is probably sufficient +// S3 is pretty resilient, and the built in retry handling is probably sufficient // as it should notice closed connections and timeouts which are the most likely // sort of failure modes func (f *Fs) shouldRetry(err error) (bool, error) { @@ -1713,7 +1713,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Return an Object from a path // -//If it can't be found it returns the error ErrorObjectNotFound. +// If it can't be found it returns the error ErrorObjectNotFound. func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object) (fs.Object, error) { o := &Object{ fs: f, @@ -1962,6 +1962,27 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck return nil } +// listCB calls list with cb wrapped in walk.NewListRHelper. +func (f *Fs) listCB(ctx context.Context, bucket, directory, prefix string, addBucket, recurse bool, cb fs.ListRCallback) error { + list := walk.NewListRHelper(cb) + err := f.list(ctx, bucket, directory, prefix, addBucket, recurse, func(remote string, object *s3.Object, isDirectory bool) error { + entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) + if err != nil { + return err + } + if entry != nil { + return list.Add(entry) + } + return nil + }) + if err != nil { + return err + } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) + return list.Flush() +} + // Convert a list item into a DirEntry func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) { if isDirectory { @@ -1979,27 +2000,6 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec return o, nil } -// listDir lists files and directories to out -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects and directories - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - if entry != nil { - entries = append(entries, entry) - } - return nil - }) - if err != nil { - return nil, err - } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) - return entries, nil -} - // listBuckets lists the buckets to out func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { req := s3.ListBucketsInput{} @@ -2029,7 +2029,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -2037,7 +2037,40 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + if directory != "" { + return fs.ErrorListBucketRequired + } + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -2056,45 +2089,28 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively than doing a directory traversal. -func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { bucket, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if bucket == "" { entries, err := f.listBuckets(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on bucket right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } bucket := entry.Remote() - err = listR(bucket, "", f.rootDirectory, true) + err = f.listCB(ctx, bucket, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) - } - } else { - err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") - if err != nil { - return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) + return nil } - return list.Flush() + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, cb) } // Put the Object into the bucket @@ -2358,9 +2374,9 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst // Copy src to this remote using server-side copy operations. // -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // diff --git a/vendor/github.com/rclone/rclone/fs/config.go b/vendor/github.com/rclone/rclone/fs/config.go index 22e3682f99..9d8793d05b 100644 --- a/vendor/github.com/rclone/rclone/fs/config.go +++ b/vendor/github.com/rclone/rclone/fs/config.go @@ -82,6 +82,7 @@ type ConfigInfo struct { Suffix string `yaml:"suffix"` SuffixKeepExtension bool `yaml:"suffix_keep_extension"` UseListR bool `yaml:"use_list_r"` + UseListCB bool `yaml:"use_list_cb"` BufferSize SizeSuffix `yaml:"buffer_size"` BwLimit BwTimetable `yaml:"bw_limit"` BwLimitFile BwTimetable `yaml:"bw_limit_file"` diff --git a/vendor/github.com/rclone/rclone/fs/fs.go b/vendor/github.com/rclone/rclone/fs/fs.go index f91192261c..c668343de5 100644 --- a/vendor/github.com/rclone/rclone/fs/fs.go +++ b/vendor/github.com/rclone/rclone/fs/fs.go @@ -1045,6 +1045,23 @@ type ListRer interface { ListR(ctx context.Context, dir string, callback ListRCallback) error } +// ListCBer extends Fs with ListCB. +type ListCBer interface { + Fs + // ListCB calls callback on directory entries as they are being listed. + // + // dir should be "" to start from the root, and should not + // have trailing slashes. + // + // This should return ErrDirNotFound if the directory isn't found. + // + // It should call callback for each tranche of entries read. + // These need not be returned in any particular order. If + // callback returns an error then the listing will stop + // immediately. + ListCB(ctx context.Context, dir string, callback ListRCallback) error +} + // RangeSeeker is the interface that wraps the RangeSeek method. // // Some of the returns from Object.Open() may optionally implement @@ -1187,7 +1204,7 @@ func Find(name string) (*RegInfo, error) { // MustFind looks for an Info object for the type name passed in // -// Services are looked up in the config file +// # Services are looked up in the config file // // Exits with a fatal error if not found func MustFind(name string) *RegInfo { diff --git a/vendor/github.com/rclone/rclone/fs/list/list.go b/vendor/github.com/rclone/rclone/fs/list/list.go index dfa8b688d4..38aca0cd92 100644 --- a/vendor/github.com/rclone/rclone/fs/list/list.go +++ b/vendor/github.com/rclone/rclone/fs/list/list.go @@ -36,6 +36,37 @@ func DirSorted(ctx context.Context, f fs.Fs, includeAll bool, dir string) (entri return filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) } +// Func is copied from imports github.com/rclone/rclone/fs/walk +// in order to avoid import cycle. +type Func func(path string, entries fs.DirEntries, err error) error + +// DirCBFunc is the type of DirCB function. +type DirCBFunc func(ctx context.Context, fs fs.ListCBer, includeAll bool, dir string, cb Func) error + +// DirCB works like DirSorted but uses ListCB instead of List for file listing. +func DirCB(ctx context.Context, f fs.ListCBer, includeAll bool, dir string, cb Func) error { + fi := filter.GetConfig(ctx) + // Get unfiltered entries from the fs + return f.ListCB(ctx, dir, func(entries fs.DirEntries) error { + // This should happen only if exclude files lives in the + // starting directory, otherwise ListDirSorted should not be + // called. + if !includeAll && fi.ListContainsExcludeFile(entries) { + fs.Debugf(dir, "Excluded") + return nil + } + var err error + entries, err = filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) + if err != nil { + return err + } + if len(entries) > 0 { + return cb(dir, entries, nil) + } + return nil + }) +} + // filter (if required) and check the entries, then sort them func filterAndSortDir(ctx context.Context, entries fs.DirEntries, includeAll bool, dir string, IncludeObject func(ctx context.Context, o fs.Object) bool, diff --git a/vendor/github.com/rclone/rclone/fs/walk/walk.go b/vendor/github.com/rclone/rclone/fs/walk/walk.go index b731fcf5d7..6be245e5bc 100644 --- a/vendor/github.com/rclone/rclone/fs/walk/walk.go +++ b/vendor/github.com/rclone/rclone/fs/walk/walk.go @@ -49,7 +49,7 @@ type Func func(path string, entries fs.DirEntries, err error) error // Note that fn will not be called concurrently whereas the directory // listing will proceed concurrently. // -// Parent directories are always listed before their children +// # Parent directories are always listed before their children // // This is implemented by WalkR if Config.UseListR is true // and f supports it and level > 1, or WalkN otherwise. @@ -62,12 +62,19 @@ func Walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i ci := fs.GetConfig(ctx) fi := filter.GetConfig(ctx) if ci.NoTraverse && fi.HaveFilesFrom() { + fs.Logf(nil, "Walk: used walkR for listing") return walkR(ctx, f, path, includeAll, maxLevel, fn, fi.MakeListR(ctx, f.NewObject)) } // FIXME should this just be maxLevel < 0 - why the maxLevel > 1 if (maxLevel < 0 || maxLevel > 1) && ci.UseListR && f.Features().ListR != nil { + fs.Logf(nil, "Walk: used walkListR for listing") return walkListR(ctx, f, path, includeAll, maxLevel, fn) } + if fcb, ok := f.(fs.ListCBer); ci.UseListCB && ok { + fs.Logf(nil, "Walk: used walkCB for listing") + return walkCB(ctx, fcb, path, includeAll, maxLevel, fn, list.DirCB) + } + fs.Logf(nil, "Walk: used walkListDirSorted for listing") return walkListDirSorted(ctx, f, path, includeAll, maxLevel, fn) } @@ -353,9 +360,54 @@ func walkListR(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLe return walkR(ctx, f, path, includeAll, maxLevel, fn, listR) } +// listJob describe a directory listing that needs to be done. +type listJob struct { + remote string + depth int +} + type listDirFunc func(ctx context.Context, fs fs.Fs, includeAll bool, dir string) (entries fs.DirEntries, err error) func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel int, fn Func, listDir listDirFunc) error { + return walkCore(ctx, path, maxLevel, func(ctx context.Context, job listJob) ([]listJob, error) { + entries, err := listDir(ctx, f, includeAll, job.remote) + var jobs []listJob + if err == nil && job.depth != 0 { + entries.ForDir(func(dir fs.Directory) { + // Recurse for the directory + jobs = append(jobs, listJob{ + remote: dir.Remote(), + depth: job.depth - 1, + }) + }) + } + err = fn(job.remote, entries, err) + return jobs, err + }) +} + +func walkCB(ctx context.Context, f fs.ListCBer, path string, includeAll bool, maxLevel int, fn Func, listDir list.DirCBFunc) error { + return walkCore(ctx, path, maxLevel, func(ctx context.Context, job listJob) ([]listJob, error) { + var jobs []listJob + err := listDir(ctx, f, includeAll, job.remote, func(path string, entries fs.DirEntries, err error) error { + if err == nil && job.depth != 0 { + entries.ForDir(func(dir fs.Directory) { + // Recurse for the directory + jobs = append(jobs, listJob{ + remote: dir.Remote(), + depth: job.depth - 1, + }) + }) + } + return fn(path, entries, err) + }) + return jobs, err + }) +} + +type walkCoreFunc func(ctx context.Context, job listJob) ([]listJob, error) + +func walkCore(ctx context.Context, path string, maxLevel int, core walkCoreFunc) error { var ( wg sync.WaitGroup // sync closing of go routines traversing sync.WaitGroup // running directory traversals @@ -363,11 +415,6 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i mu sync.Mutex // stop fn being called concurrently ci = fs.GetConfig(ctx) // current config ) - // listJob describe a directory listing that needs to be done - type listJob struct { - remote string - depth int - } in := make(chan listJob, ci.Checkers) errs := make(chan error, 1) @@ -392,19 +439,8 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i if !ok { return } - entries, err := listDir(ctx, f, includeAll, job.remote) - var jobs []listJob - if err == nil && job.depth != 0 { - entries.ForDir(func(dir fs.Directory) { - // Recurse for the directory - jobs = append(jobs, listJob{ - remote: dir.Remote(), - depth: job.depth - 1, - }) - }) - } mu.Lock() - err = fn(job.remote, entries, err) + jobs, err := core(ctx, job) mu.Unlock() // NB once we have passed entries to fn we mustn't touch it again if err != nil && err != ErrorSkipDir { diff --git a/vendor/modules.txt b/vendor/modules.txt index 992bd5d1df..922e3d7aec 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -313,7 +313,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 ## explicit; go 1.21 github.com/rclone/rclone/backend/azureblob github.com/rclone/rclone/backend/crypt @@ -758,4 +758,4 @@ gopkg.in/yaml.v2 ## explicit gopkg.in/yaml.v3 # github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 -# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 From 00383face989ce3e8f8d4e0946842fb7651b3415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 2 Dec 2024 22:41:54 +0100 Subject: [PATCH 2/4] fix(rcserver): allow to stream files as the dir is listed Fixes #4132 --- pkg/rclone/rcserver/rc.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 50f63a298c..c6bd3dd28f 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -486,6 +486,11 @@ func rcChunkedList(ctx context.Context, in rc.Params) (out rc.Params, err error) } } + ctx, ci := fs.AddConfig(ctx) + // Allow for calling callback as the dir is listed (#4132) + // Note that ListCB is implemented only for a non-recursive listings. + ci.UseListCB = true + w, err := in.GetHTTPResponseWriter() if err != nil { return nil, err From d3e2a6639eb45dd0f377388dd2b03ea9e9ccb7cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 3 Dec 2024 17:07:16 +0100 Subject: [PATCH 3/4] fix(backup): don't use recursive listing in deduplicate stage It does not make sense, as sstable dirs are flat. Moreover, using recursion makes it impossible to use internal rclone ListCB which improves performance and memory allocation. --- pkg/service/backup/worker_deduplicate.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go index 82cdf59ec7..04ccf961cd 100644 --- a/pkg/service/backup/worker_deduplicate.go +++ b/pkg/service/backup/worker_deduplicate.go @@ -68,7 +68,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error { remoteSSTableBundles := newSSTableBundlesByID() listOpts := &scyllaclient.RcloneListDirOpts{ FilesOnly: true, - Recurse: true, } if err := w.Client.RcloneListDirIter(ctx, h.IP, dataDst, listOpts, func(f *scyllaclient.RcloneListDirItem) { if err := remoteSSTableBundles.add(f.Name, f.Size); err != nil { From e3fe0c9464f8c151920761b73eb8b23716f444bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 4 Dec 2024 09:17:36 +0100 Subject: [PATCH 4/4] feat(scyllaclient_test): benchmark RcloneListDirIter The main focus of this benchmark is to test the impact of the issues: - #4134 - #4133 - #4132 --- .../client_rclone_agent_integration_test.go | 76 +++++++++++++++++++ pkg/testutils/s3.go | 8 ++ 2 files changed, 84 insertions(+) diff --git a/pkg/scyllaclient/client_rclone_agent_integration_test.go b/pkg/scyllaclient/client_rclone_agent_integration_test.go index b2db38af66..e2bd67f345 100644 --- a/pkg/scyllaclient/client_rclone_agent_integration_test.go +++ b/pkg/scyllaclient/client_rclone_agent_integration_test.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "fmt" + "os" "path" "strings" "testing" @@ -20,6 +21,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/testutils" . "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" "go.uber.org/zap/zapcore" ) @@ -605,6 +607,80 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { } } +func BenchmarkRcloneListDirIterIntegration(b *testing.B) { + bucket := S3BucketPath(testBucket) + if err := os.RemoveAll(bucket); err != nil { + b.Fatal(err) + } + if err := os.Mkdir(bucket, 0o700); err != nil { + b.Fatal(err) + } + + client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopmentWithLevel(zapcore.ErrorLevel)) + if err != nil { + b.Fatal(err) + } + + const fileCnt = 5555 + Printf("Given: dir with %d files", fileCnt) + for i := 0; i < fileCnt; i++ { + f, err := os.Create(path.Join(bucket, fmt.Sprint(i))) + if err != nil { + b.Fatal(err) + } + _ = f.Close() + } + + Print("When: check list iter latency") + // ListCB is implemented only for the non-recursive listings (take a look at rcChunkedList) + opts := &scyllaclient.RcloneListDirOpts{ + Recurse: false, + } + // 1000 is the default chunk size for s3 + const rcloneChunkSize = 1000 + const sampleSize = 1000 + var avgTotal, avgFirst, avgCross, avgWithin time.Duration + for sample := range sampleSize { + var firstDiff, maxCrossChunkDiff, maxWithinChunkDiff time.Duration + idx := 0 + lastCB := timeutc.Now() + start := timeutc.Now() + err = client.RcloneListDirIter(context.Background(), ManagedClusterHost(), remotePath(""), opts, func(item *scyllaclient.RcloneListDirItem) { + idx++ + now := timeutc.Now() + if idx == 1 { + firstDiff = now.Sub(lastCB) + lastCB = now + return + } + diff := now.Sub(lastCB) + lastCB = now + if idx%rcloneChunkSize == 1 { + maxCrossChunkDiff = max(maxCrossChunkDiff, diff) + } else { + maxWithinChunkDiff = max(maxWithinChunkDiff, diff) + } + }) + if err != nil { + b.Fatal(err) + } + total := timeutc.Now().Sub(start) + + b.Log("sample: ", sample, "total: ", total, "first: ", firstDiff, "maxCross: ", maxCrossChunkDiff, "maxWithin: ", maxWithinChunkDiff) + avgTotal += total + avgFirst += firstDiff + avgCross += maxCrossChunkDiff + avgWithin += maxWithinChunkDiff + } + + avgTotal /= sampleSize + avgFirst /= sampleSize + avgCross /= sampleSize + avgWithin /= sampleSize + + b.Log("Avg total time: ", avgTotal, "Avg first latency: ", avgFirst, "Avg max cross chunk latency: ", avgCross, "Avg max within chunk latency: ", avgWithin) +} + func validateDirContents(ctx context.Context, client *scyllaclient.Client, host, remotePath string, files map[string]string) error { // Check if all specified files with given contents are present for f, expected := range files { diff --git a/pkg/testutils/s3.go b/pkg/testutils/s3.go index 67e22e89bf..3bb7ab3326 100644 --- a/pkg/testutils/s3.go +++ b/pkg/testutils/s3.go @@ -45,3 +45,11 @@ func S3Credentials() (provider, endpoint, accessKeyID, secretAccessKey string) { } return *flagS3Provider, *flagS3Endpoint, *flagS3AccessKeyID, *flagS3SecretAccessKey } + +// S3BucketPath returns the os path to the bucket. +func S3BucketPath(bucket string) string { + if !flag.Parsed() { + flag.Parse() + } + return filepath.Join(*flagS3DataDir, bucket) +}