diff --git a/go.mod b/go.mod index c9f452f32e..d03870678a 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,6 @@ require ( replace ( github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 - github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e + github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched ) diff --git a/go.sum b/go.sum index 4eec7b07b4..8e5cba721e 100644 --- a/go.sum +++ b/go.sum @@ -1051,8 +1051,8 @@ github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUq github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig= github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzVwgESabOB1eTwb4woE6oUziY= github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8= -github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= +github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 h1:3beIciiaygPjiUPLN0+wVlnfe4JnB1Wb/uV1MChzRbM= +github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b h1:JRDV1d1FIiH0TIyHVmTAILAjQ2f8O4t7ZtZ/S+fT2sY= github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b/go.mod h1:Tss7a99vrgds+B70w8ZFG3Skxfr9Br3kAzrKP2b3CmQ= github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241104134613-aba35605c28b h1:7CHNmPrQqSdApaEh5nkRL+D52KFHaOHVBBVDvytHEOY= diff --git a/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml b/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml 
index 1098be0202..5db6a11c88 100644 --- a/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/auth_token_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/basic.golden.yaml b/pkg/config/agent/testdata/basic.golden.yaml index 89d7faf302..4a4b500b08 100644 --- a/pkg/config/agent/testdata/basic.golden.yaml +++ b/pkg/config/agent/testdata/basic.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/debug_overwrite.golden.yaml b/pkg/config/agent/testdata/debug_overwrite.golden.yaml index cb08018b62..00121efa15 100644 --- a/pkg/config/agent/testdata/debug_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/debug_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/https_overwrite.golden.yaml b/pkg/config/agent/testdata/https_overwrite.golden.yaml index a9bf9c00f6..2220fb6597 100644 --- a/pkg/config/agent/testdata/https_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/https_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/multiple_cpus.golden.yaml b/pkg/config/agent/testdata/multiple_cpus.golden.yaml index f39d31ffd1..045607c5e8 100644 --- a/pkg/config/agent/testdata/multiple_cpus.golden.yaml +++ b/pkg/config/agent/testdata/multiple_cpus.golden.yaml @@ -69,6 +69,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 
bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml b/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml index cc7225ec8a..98f6c706e8 100644 --- a/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/prometheus_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/scylla_overwrite.golden.yaml b/pkg/config/agent/testdata/scylla_overwrite.golden.yaml index d0b242140f..8befdcc56e 100644 --- a/pkg/config/agent/testdata/scylla_overwrite.golden.yaml +++ b/pkg/config/agent/testdata/scylla_overwrite.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/config/agent/testdata/structs_empty.golden.yaml b/pkg/config/agent/testdata/structs_empty.golden.yaml index 89d7faf302..4a4b500b08 100644 --- a/pkg/config/agent/testdata/structs_empty.golden.yaml +++ b/pkg/config/agent/testdata/structs_empty.golden.yaml @@ -66,6 +66,7 @@ rclone: suffix: "" suffix_keep_extension: false use_list_r: false + use_list_cb: false buffer_size: 16777216 bw_limit: [] bw_limit_file: [] diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 50f63a298c..c6bd3dd28f 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -486,6 +486,11 @@ func rcChunkedList(ctx context.Context, in rc.Params) (out rc.Params, err error) } } + ctx, ci := fs.AddConfig(ctx) + // Allow for calling callback as the dir is listed (#4132) + // Note that ListCB is implemented only for a non-recursive listings. 
+ ci.UseListCB = true + w, err := in.GetHTTPResponseWriter() if err != nil { return nil, err diff --git a/pkg/scyllaclient/client_rclone_agent_integration_test.go b/pkg/scyllaclient/client_rclone_agent_integration_test.go index b2db38af66..e2bd67f345 100644 --- a/pkg/scyllaclient/client_rclone_agent_integration_test.go +++ b/pkg/scyllaclient/client_rclone_agent_integration_test.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "fmt" + "os" "path" "strings" "testing" @@ -20,6 +21,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/testutils" . "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" "go.uber.org/zap/zapcore" ) @@ -605,6 +607,80 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { } } +func BenchmarkRcloneListDirIterIntegration(b *testing.B) { + bucket := S3BucketPath(testBucket) + if err := os.RemoveAll(bucket); err != nil { + b.Fatal(err) + } + if err := os.Mkdir(bucket, 0o700); err != nil { + b.Fatal(err) + } + + client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopmentWithLevel(zapcore.ErrorLevel)) + if err != nil { + b.Fatal(err) + } + + const fileCnt = 5555 + Printf("Given: dir with %d files", fileCnt) + for i := 0; i < fileCnt; i++ { + f, err := os.Create(path.Join(bucket, fmt.Sprint(i))) + if err != nil { + b.Fatal(err) + } + _ = f.Close() + } + + Print("When: check list iter latency") + // ListCB is implemented only for the non-recursive listings (take a look at rcChunkedList) + opts := &scyllaclient.RcloneListDirOpts{ + Recurse: false, + } + // 1000 is the default chunk size for s3 + const rcloneChunkSize = 1000 + const sampleSize = 1000 + var avgTotal, avgFirst, avgCross, avgWithin time.Duration + for sample := range sampleSize { + var firstDiff, maxCrossChunkDiff, maxWithinChunkDiff time.Duration + idx := 0 + lastCB := timeutc.Now() + start := 
timeutc.Now() + err = client.RcloneListDirIter(context.Background(), ManagedClusterHost(), remotePath(""), opts, func(item *scyllaclient.RcloneListDirItem) { + idx++ + now := timeutc.Now() + if idx == 1 { + firstDiff = now.Sub(lastCB) + lastCB = now + return + } + diff := now.Sub(lastCB) + lastCB = now + if idx%rcloneChunkSize == 1 { + maxCrossChunkDiff = max(maxCrossChunkDiff, diff) + } else { + maxWithinChunkDiff = max(maxWithinChunkDiff, diff) + } + }) + if err != nil { + b.Fatal(err) + } + total := timeutc.Now().Sub(start) + + b.Log("sample: ", sample, "total: ", total, "first: ", firstDiff, "maxCross: ", maxCrossChunkDiff, "maxWithin: ", maxWithinChunkDiff) + avgTotal += total + avgFirst += firstDiff + avgCross += maxCrossChunkDiff + avgWithin += maxWithinChunkDiff + } + + avgTotal /= sampleSize + avgFirst /= sampleSize + avgCross /= sampleSize + avgWithin /= sampleSize + + b.Log("Avg total time: ", avgTotal, "Avg first latency: ", avgFirst, "Avg max cross chunk latency: ", avgCross, "Avg max within chunk latency: ", avgWithin) +} + func validateDirContents(ctx context.Context, client *scyllaclient.Client, host, remotePath string, files map[string]string) error { // Check if all specified files with given contents are present for f, expected := range files { diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go index 82cdf59ec7..04ccf961cd 100644 --- a/pkg/service/backup/worker_deduplicate.go +++ b/pkg/service/backup/worker_deduplicate.go @@ -68,7 +68,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error { remoteSSTableBundles := newSSTableBundlesByID() listOpts := &scyllaclient.RcloneListDirOpts{ FilesOnly: true, - Recurse: true, } if err := w.Client.RcloneListDirIter(ctx, h.IP, dataDst, listOpts, func(f *scyllaclient.RcloneListDirItem) { if err := remoteSSTableBundles.add(f.Name, f.Size); err != nil { diff --git a/pkg/testutils/s3.go b/pkg/testutils/s3.go index 67e22e89bf..3bb7ab3326 100644 
--- a/pkg/testutils/s3.go +++ b/pkg/testutils/s3.go @@ -45,3 +45,11 @@ func S3Credentials() (provider, endpoint, accessKeyID, secretAccessKey string) { } return *flagS3Provider, *flagS3Endpoint, *flagS3AccessKeyID, *flagS3SecretAccessKey } + +// S3BucketPath returns the os path to the bucket. +func S3BucketPath(bucket string) string { + if !flag.Parsed() { + flag.Parse() + } + return filepath.Join(*flagS3DataDir, bucket) +} diff --git a/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go b/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go index e0478c2074..9398f559ff 100644 --- a/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go +++ b/vendor/github.com/rclone/rclone/backend/azureblob/azureblob.go @@ -1,5 +1,6 @@ // Package azureblob provides an interface to the Microsoft Azure blob object storage system +//go:build !plan9 && !solaris && !js && go1.14 // +build !plan9,!solaris,!js,go1.14 package azureblob @@ -861,6 +862,30 @@ func (f *Fs) list(ctx context.Context, container, directory, prefix string, addC return nil } +// listCB calls list with cb wrapped in walk.NewListRHelper. 
+func (f *Fs) listCB(ctx context.Context, container, directory, prefix string, addContainer, recourse bool, cb fs.ListRCallback) error { + if !f.containerOK(container) { + return fs.ErrorDirNotFound + } + list := walk.NewListRHelper(cb) + err := f.list(ctx, container, directory, prefix, addContainer, recourse, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { + entry, err := f.itemToDirEntry(remote, object, isDirectory) + if err != nil { + return err + } + if entry != nil { + return list.Add(entry) + } + return nil + }) + if err != nil { + return err + } + // container must be present if listing succeeded + f.cache.MarkOK(container) + return list.Flush() +} + // Convert a list item into a DirEntry func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItemInternal, isDirectory bool) (fs.DirEntry, error) { if isDirectory { @@ -889,29 +914,6 @@ func (f *Fs) containerOK(container string) bool { return false } -// listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) { - if !f.containerOK(container) { - return nil, fs.ErrorDirNotFound - } - err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) - if err != nil { - return err - } - if entry != nil { - entries = append(entries, entry) - } - return nil - }) - if err != nil { - return nil, err - } - // container must be present if listing succeeded - f.cache.MarkOK(container) - return entries, nil -} - // listContainers returns all the containers to out func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) { if f.isLimited { @@ -944,7 +946,7 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err // // This should return 
ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { container, directory := f.split(dir) if container == "" { if directory != "" { @@ -952,7 +954,37 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listContainers(ctx) } - return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + container, directory := f.split(dir) + if container == "" { + entries, err := f.listContainers(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -971,48 +1003,28 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. 
-func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { container, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(container, directory, prefix string, addContainer bool) error { - return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if container == "" { entries, err := f.listContainers(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on container right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } container := entry.Remote() - err = listR(container, "", f.rootDirectory, true) + err = f.listCB(ctx, container, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // container must be present if listing succeeded - f.cache.MarkOK(container) } - } else { - if !f.containerOK(container) { - return fs.ErrorDirNotFound - } - err = listR(container, directory, f.rootDirectory, f.rootContainer == "") - if err != nil { - return err - } - // container must be present if listing succeeded - f.cache.MarkOK(container) + return nil } - return list.Flush() + return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, cb) } // listContainerFn is called from listContainersToFn to handle a container @@ -1049,7 +1061,7 @@ func (f *Fs) listContainersToFn(fn listContainerFn) error { // Put the object into the container // -// Copy the reader in to the new object which is returned +// # Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *Fs) Put(ctx context.Context, in 
io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { @@ -1181,9 +1193,9 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { // Copy src to this remote using server-side copy operations. // -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // @@ -1303,11 +1315,12 @@ func (o *Object) setMetadata(metadata azblob.Metadata) { // decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in // // Sets -// o.id -// o.modTime -// o.size -// o.md5 -// o.meta +// +// o.id +// o.modTime +// o.size +// o.md5 +// o.meta func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) { metadata := info.NewMetadata() size := info.ContentLength() @@ -1357,10 +1370,11 @@ func (o *Object) clearMetaData() { // readMetaData gets the metadata if it hasn't already been fetched // // Sets -// o.id -// o.modTime -// o.size -// o.md5 +// +// o.id +// o.modTime +// o.size +// o.md5 func (o *Object) readMetaData() (err error) { if !o.modTime.IsZero() { return nil diff --git a/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go b/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go index e96e4f5d9a..56a597adcb 100644 --- a/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go +++ b/vendor/github.com/rclone/rclone/backend/googlecloudstorage/googlecloudstorage.go @@ -556,7 +556,7 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error // // dir is the starting directory, "" for root // -// Set recurse to read sub directories +// # Set recurse to read sub directories // // The remote has prefix removed from it and if addBucket is set // then it adds the bucket to the start. 
@@ -634,38 +634,38 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck return nil } -// Convert a list item into a DirEntry -func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) { - if isDirectory { - d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size)) - return d, nil - } - o, err := f.newObjectWithInfo(ctx, remote, object) - if err != nil { - return nil, err - } - return o, nil -} - -// listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error { +// listCB calls list with cb wrapped in walk.NewListRHelper. +func (f *Fs) listCB(ctx context.Context, bucket, directory, prefix string, addBucket, recourse bool, cb fs.ListRCallback) error { + list := walk.NewListRHelper(cb) + err := f.list(ctx, bucket, directory, prefix, addBucket, recourse, func(remote string, object *storage.Object, isDirectory bool) error { entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) if err != nil { return err } if entry != nil { - entries = append(entries, entry) + return list.Add(entry) } return nil }) if err != nil { - return nil, err + return err } // bucket must be present if listing succeeded f.cache.MarkOK(bucket) - return entries, err + return list.Flush() +} + +// Convert a list item into a DirEntry +func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) { + if isDirectory { + d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size)) + return d, nil + } + o, err := f.newObjectWithInfo(ctx, remote, object) + if err != nil { + return nil, err + } + return o, nil } // listBuckets lists the buckets @@ -704,7 +704,7 @@ func (f 
*Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -712,7 +712,37 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -731,50 +761,33 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. 
-func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { bucket, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if bucket == "" { entries, err := f.listBuckets(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on bucket right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } bucket := entry.Remote() - err = listR(bucket, "", f.rootDirectory, true) + err = f.listCB(ctx, bucket, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) } - } else { - err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") - if err != nil { - return err - } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) + return nil } - return list.Flush() + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, cb) } // Put the object into the bucket // -// Copy the reader in to the new object which is returned +// # Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { @@ -871,9 +884,9 @@ func (f *Fs) Precision() time.Duration { // Copy src to this remote using server-side copy operations. 
// -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // diff --git a/vendor/github.com/rclone/rclone/backend/s3/s3.go b/vendor/github.com/rclone/rclone/backend/s3/s3.go index d51c6ee7e6..c643c4e047 100644 --- a/vendor/github.com/rclone/rclone/backend/s3/s3.go +++ b/vendor/github.com/rclone/rclone/backend/s3/s3.go @@ -1396,7 +1396,7 @@ var retryErrorCodes = []int{ 503, // Service Unavailable/Slow Down - "Reduce your request rate" } -//S3 is pretty resilient, and the built in retry handling is probably sufficient +// S3 is pretty resilient, and the built in retry handling is probably sufficient // as it should notice closed connections and timeouts which are the most likely // sort of failure modes func (f *Fs) shouldRetry(err error) (bool, error) { @@ -1713,7 +1713,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Return an Object from a path // -//If it can't be found it returns the error ErrorObjectNotFound. +// If it can't be found it returns the error ErrorObjectNotFound. func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object) (fs.Object, error) { o := &Object{ fs: f, @@ -1962,6 +1962,27 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck return nil } +// listCB calls list with cb wrapped in walk.NewListRHelper. 
+func (f *Fs) listCB(ctx context.Context, bucket, directory, prefix string, addBucket, recurse bool, cb fs.ListRCallback) error { + list := walk.NewListRHelper(cb) + err := f.list(ctx, bucket, directory, prefix, addBucket, recurse, func(remote string, object *s3.Object, isDirectory bool) error { + entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) + if err != nil { + return err + } + if entry != nil { + return list.Add(entry) + } + return nil + }) + if err != nil { + return err + } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) + return list.Flush() +} + // Convert a list item into a DirEntry func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) { if isDirectory { @@ -1979,27 +2000,6 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec return o, nil } -// listDir lists files and directories to out -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects and directories - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - if entry != nil { - entries = append(entries, entry) - } - return nil - }) - if err != nil { - return nil, err - } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) - return entries, nil -} - // listBuckets lists the buckets to out func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { req := s3.ListBucketsInput{} @@ -2029,7 +2029,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. 
-func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -2037,7 +2037,40 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + if directory != "" { + return fs.ErrorListBucketRequired + } + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", false, cb) } // ListR lists the objects and directories of the Fs starting @@ -2056,45 +2089,28 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // // Don't implement this unless you have a more efficient way // of listing recursively than doing a directory traversal. 
-func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { +func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error { bucket, directory := f.split(dir) - list := walk.NewListRHelper(callback) - listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) - if err != nil { - return err - } - return list.Add(entry) - }) - } if bucket == "" { entries, err := f.listBuckets(ctx) if err != nil { return err } for _, entry := range entries { - err = list.Add(entry) + // Call callback on bucket right before listing its files + err = cb(fs.DirEntries{entry}) if err != nil { return err } bucket := entry.Remote() - err = listR(bucket, "", f.rootDirectory, true) + err = f.listCB(ctx, bucket, "", f.rootDirectory, true, true, cb) if err != nil { return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) - } - } else { - err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") - if err != nil { - return err } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) + return nil } - return list.Flush() + return f.listCB(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, cb) } // Put the Object into the bucket @@ -2358,9 +2374,9 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst // Copy src to this remote using server-side copy operations. 
// -// This is stored with the remote path given +// # This is stored with the remote path given // -// It returns the destination Object and a possible error +// # It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // diff --git a/vendor/github.com/rclone/rclone/fs/config.go b/vendor/github.com/rclone/rclone/fs/config.go index 22e3682f99..9d8793d05b 100644 --- a/vendor/github.com/rclone/rclone/fs/config.go +++ b/vendor/github.com/rclone/rclone/fs/config.go @@ -82,6 +82,7 @@ type ConfigInfo struct { Suffix string `yaml:"suffix"` SuffixKeepExtension bool `yaml:"suffix_keep_extension"` UseListR bool `yaml:"use_list_r"` + UseListCB bool `yaml:"use_list_cb"` BufferSize SizeSuffix `yaml:"buffer_size"` BwLimit BwTimetable `yaml:"bw_limit"` BwLimitFile BwTimetable `yaml:"bw_limit_file"` diff --git a/vendor/github.com/rclone/rclone/fs/fs.go b/vendor/github.com/rclone/rclone/fs/fs.go index f91192261c..c668343de5 100644 --- a/vendor/github.com/rclone/rclone/fs/fs.go +++ b/vendor/github.com/rclone/rclone/fs/fs.go @@ -1045,6 +1045,23 @@ type ListRer interface { ListR(ctx context.Context, dir string, callback ListRCallback) error } +// ListCBer extends Fs with ListCB. +type ListCBer interface { + Fs + // ListCB calls callback on directory entries as they are being listed. + // + // dir should be "" to start from the root, and should not + // have trailing slashes. + // + // This should return ErrDirNotFound if the directory isn't found. + // + // It should call callback for each tranche of entries read. + // These need not be returned in any particular order. If + // callback returns an error then the listing will stop + // immediately. + ListCB(ctx context.Context, dir string, callback ListRCallback) error +} + // RangeSeeker is the interface that wraps the RangeSeek method. 
// // Some of the returns from Object.Open() may optionally implement @@ -1187,7 +1204,7 @@ func Find(name string) (*RegInfo, error) { // MustFind looks for an Info object for the type name passed in // -// Services are looked up in the config file +// # Services are looked up in the config file // // Exits with a fatal error if not found func MustFind(name string) *RegInfo { diff --git a/vendor/github.com/rclone/rclone/fs/list/list.go b/vendor/github.com/rclone/rclone/fs/list/list.go index dfa8b688d4..38aca0cd92 100644 --- a/vendor/github.com/rclone/rclone/fs/list/list.go +++ b/vendor/github.com/rclone/rclone/fs/list/list.go @@ -36,6 +36,37 @@ func DirSorted(ctx context.Context, f fs.Fs, includeAll bool, dir string) (entri return filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) } +// Func is copied from github.com/rclone/rclone/fs/walk +// in order to avoid an import cycle. +type Func func(path string, entries fs.DirEntries, err error) error + +// DirCBFunc is the type of DirCB function. +type DirCBFunc func(ctx context.Context, fs fs.ListCBer, includeAll bool, dir string, cb Func) error + +// DirCB works like DirSorted but uses ListCB instead of List for file listing. +func DirCB(ctx context.Context, f fs.ListCBer, includeAll bool, dir string, cb Func) error { + fi := filter.GetConfig(ctx) + // Get unfiltered entries from the fs + return f.ListCB(ctx, dir, func(entries fs.DirEntries) error { + // This should happen only if exclude files live in the + // starting directory, otherwise ListDirSorted should not be + // called. 
+ if !includeAll && fi.ListContainsExcludeFile(entries) { + fs.Debugf(dir, "Excluded") + return nil + } + var err error + entries, err = filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f)) + if err != nil { + return err + } + if len(entries) > 0 { + return cb(dir, entries, nil) + } + return nil + }) +} + // filter (if required) and check the entries, then sort them func filterAndSortDir(ctx context.Context, entries fs.DirEntries, includeAll bool, dir string, IncludeObject func(ctx context.Context, o fs.Object) bool, diff --git a/vendor/github.com/rclone/rclone/fs/walk/walk.go b/vendor/github.com/rclone/rclone/fs/walk/walk.go index b731fcf5d7..6be245e5bc 100644 --- a/vendor/github.com/rclone/rclone/fs/walk/walk.go +++ b/vendor/github.com/rclone/rclone/fs/walk/walk.go @@ -49,7 +49,7 @@ type Func func(path string, entries fs.DirEntries, err error) error // Note that fn will not be called concurrently whereas the directory // listing will proceed concurrently. // -// Parent directories are always listed before their children +// # Parent directories are always listed before their children // // This is implemented by WalkR if Config.UseListR is true // and f supports it and level > 1, or WalkN otherwise. 
@@ -62,12 +62,19 @@ func Walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i ci := fs.GetConfig(ctx) fi := filter.GetConfig(ctx) if ci.NoTraverse && fi.HaveFilesFrom() { + fs.Logf(nil, "Walk: used walkR for listing") return walkR(ctx, f, path, includeAll, maxLevel, fn, fi.MakeListR(ctx, f.NewObject)) } // FIXME should this just be maxLevel < 0 - why the maxLevel > 1 if (maxLevel < 0 || maxLevel > 1) && ci.UseListR && f.Features().ListR != nil { + fs.Logf(nil, "Walk: used walkListR for listing") return walkListR(ctx, f, path, includeAll, maxLevel, fn) } + if fcb, ok := f.(fs.ListCBer); ci.UseListCB && ok { + fs.Logf(nil, "Walk: used walkCB for listing") + return walkCB(ctx, fcb, path, includeAll, maxLevel, fn, list.DirCB) + } + fs.Logf(nil, "Walk: used walkListDirSorted for listing") return walkListDirSorted(ctx, f, path, includeAll, maxLevel, fn) } @@ -353,9 +360,54 @@ func walkListR(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLe return walkR(ctx, f, path, includeAll, maxLevel, fn, listR) } +// listJob describes a directory listing that needs to be done. 
+type listJob struct { + remote string + depth int +} + type listDirFunc func(ctx context.Context, fs fs.Fs, includeAll bool, dir string) (entries fs.DirEntries, err error) func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel int, fn Func, listDir listDirFunc) error { + return walkCore(ctx, path, maxLevel, func(ctx context.Context, job listJob) ([]listJob, error) { + entries, err := listDir(ctx, f, includeAll, job.remote) + var jobs []listJob + if err == nil && job.depth != 0 { + entries.ForDir(func(dir fs.Directory) { + // Recurse for the directory + jobs = append(jobs, listJob{ + remote: dir.Remote(), + depth: job.depth - 1, + }) + }) + } + err = fn(job.remote, entries, err) + return jobs, err + }) +} + +func walkCB(ctx context.Context, f fs.ListCBer, path string, includeAll bool, maxLevel int, fn Func, listDir list.DirCBFunc) error { + return walkCore(ctx, path, maxLevel, func(ctx context.Context, job listJob) ([]listJob, error) { + var jobs []listJob + err := listDir(ctx, f, includeAll, job.remote, func(path string, entries fs.DirEntries, err error) error { + if err == nil && job.depth != 0 { + entries.ForDir(func(dir fs.Directory) { + // Recurse for the directory + jobs = append(jobs, listJob{ + remote: dir.Remote(), + depth: job.depth - 1, + }) + }) + } + return fn(path, entries, err) + }) + return jobs, err + }) +} + +type walkCoreFunc func(ctx context.Context, job listJob) ([]listJob, error) + +func walkCore(ctx context.Context, path string, maxLevel int, core walkCoreFunc) error { var ( wg sync.WaitGroup // sync closing of go routines traversing sync.WaitGroup // running directory traversals @@ -363,11 +415,6 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i mu sync.Mutex // stop fn being called concurrently ci = fs.GetConfig(ctx) // current config ) - // listJob describe a directory listing that needs to be done - type listJob struct { - remote string - depth int - } in := make(chan listJob, 
ci.Checkers) errs := make(chan error, 1) @@ -392,19 +439,8 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i if !ok { return } - entries, err := listDir(ctx, f, includeAll, job.remote) - var jobs []listJob - if err == nil && job.depth != 0 { - entries.ForDir(func(dir fs.Directory) { - // Recurse for the directory - jobs = append(jobs, listJob{ - remote: dir.Remote(), - depth: job.depth - 1, - }) - }) - } mu.Lock() - err = fn(job.remote, entries, err) + jobs, err := core(ctx, job) mu.Unlock() // NB once we have passed entries to fn we mustn't touch it again if err != nil && err != ErrorSkipDir { diff --git a/vendor/modules.txt b/vendor/modules.txt index 992bd5d1df..922e3d7aec 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -313,7 +313,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone v1.51.0 => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 ## explicit; go 1.21 github.com/rclone/rclone/backend/azureblob github.com/rclone/rclone/backend/crypt @@ -758,4 +758,4 @@ gopkg.in/yaml.v2 ## explicit gopkg.in/yaml.v3 # github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0 -# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e +# github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668