Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rclone: stream files as they are being listed #4146

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ require (

replace (
github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0
github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e
github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668
google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1051,8 +1051,8 @@ github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUq
github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig=
github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzVwgESabOB1eTwb4woE6oUziY=
github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668 h1:3beIciiaygPjiUPLN0+wVlnfe4JnB1Wb/uV1MChzRbM=
github.com/scylladb/rclone v1.54.1-0.20241203155938-9f3550e7e668/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b h1:JRDV1d1FIiH0TIyHVmTAILAjQ2f8O4t7ZtZ/S+fT2sY=
github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241104134613-aba35605c28b/go.mod h1:Tss7a99vrgds+B70w8ZFG3Skxfr9Br3kAzrKP2b3CmQ=
github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241104134613-aba35605c28b h1:7CHNmPrQqSdApaEh5nkRL+D52KFHaOHVBBVDvytHEOY=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/basic.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/debug_overwrite.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/https_overwrite.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/multiple_cpus.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/scylla_overwrite.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/testdata/structs_empty.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ rclone:
suffix: ""
suffix_keep_extension: false
use_list_r: false
use_list_cb: false
buffer_size: 16777216
bw_limit: []
bw_limit_file: []
Expand Down
5 changes: 5 additions & 0 deletions pkg/rclone/rcserver/rc.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ func rcChunkedList(ctx context.Context, in rc.Params) (out rc.Params, err error)
}
}

ctx, ci := fs.AddConfig(ctx)
// Allow for calling callback as the dir is listed (#4132)
// Note that ListCB is implemented only for non-recursive listings.
ci.UseListCB = true

w, err := in.GetHTTPResponseWriter()
if err != nil {
return nil, err
Expand Down
76 changes: 76 additions & 0 deletions pkg/scyllaclient/client_rclone_agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"path"
"strings"
"testing"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"go.uber.org/zap/zapcore"
)

Expand Down Expand Up @@ -605,6 +607,80 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {
}
}

// BenchmarkRcloneListDirIterIntegration measures how promptly
// RcloneListDirIter hands listed items over to its callback when listing
// a directory containing fileCnt files non-recursively.
//
// For each of sampleSize listings it records:
//   - the total time of the listing,
//   - the latency of the first callback invocation,
//   - the largest gap between consecutive callbacks at an assumed chunk
//     boundary (every rcloneChunkSize items),
//   - the largest gap between consecutive callbacks inside a chunk.
//
// Per-sample values and their averages over all samples are reported with b.Log.
func BenchmarkRcloneListDirIterIntegration(b *testing.B) {
	// Start from an empty bucket directory so the listing contains exactly fileCnt files.
	bucket := S3BucketPath(testBucket)
	if err := os.RemoveAll(bucket); err != nil {
		b.Fatal(err)
	}
	if err := os.Mkdir(bucket, 0o700); err != nil {
		b.Fatal(err)
	}

	client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopmentWithLevel(zapcore.ErrorLevel))
	if err != nil {
		b.Fatal(err)
	}

	const fileCnt = 5555
	Printf("Given: dir with %d files", fileCnt)
	for i := 0; i < fileCnt; i++ {
		f, err := os.Create(path.Join(bucket, fmt.Sprint(i)))
		if err != nil {
			b.Fatal(err)
		}
		// A fixture file that failed to close properly makes the measurement unreliable.
		if err := f.Close(); err != nil {
			b.Fatal(err)
		}
	}

	// Keep the fixture setup above out of the time reported by the benchmark framework.
	b.ResetTimer()

	Print("When: check list iter latency")
	// ListCB is implemented only for non-recursive listings (see rcChunkedList)
	opts := &scyllaclient.RcloneListDirOpts{
		Recurse: false,
	}
	// 1000 is the default chunk size for s3
	const rcloneChunkSize = 1000
	const sampleSize = 1000
	// Accumulators for the per-sample values; divided by sampleSize at the end.
	var avgTotal, avgFirst, avgCross, avgWithin time.Duration
	for sample := range sampleSize {
		var firstDiff, maxCrossChunkDiff, maxWithinChunkDiff time.Duration
		idx := 0
		lastCB := timeutc.Now()
		start := timeutc.Now()
		err = client.RcloneListDirIter(context.Background(), ManagedClusterHost(), remotePath(""), opts, func(item *scyllaclient.RcloneListDirItem) {
			idx++
			now := timeutc.Now()
			// The first callback is tracked separately: it measures the time to the first listed item.
			if idx == 1 {
				firstDiff = now.Sub(lastCB)
				lastCB = now
				return
			}
			diff := now.Sub(lastCB)
			lastCB = now
			// Items 1001, 2001, ... are the first items of a new chunk,
			// so the gap before them spans a chunk boundary.
			if idx%rcloneChunkSize == 1 {
				maxCrossChunkDiff = max(maxCrossChunkDiff, diff)
			} else {
				maxWithinChunkDiff = max(maxWithinChunkDiff, diff)
			}
		})
		if err != nil {
			b.Fatal(err)
		}
		total := timeutc.Now().Sub(start)

		b.Log("sample: ", sample, "total: ", total, "first: ", firstDiff, "maxCross: ", maxCrossChunkDiff, "maxWithin: ", maxWithinChunkDiff)
		avgTotal += total
		avgFirst += firstDiff
		avgCross += maxCrossChunkDiff
		avgWithin += maxWithinChunkDiff
	}

	avgTotal /= sampleSize
	avgFirst /= sampleSize
	avgCross /= sampleSize
	avgWithin /= sampleSize

	b.Log("Avg total time: ", avgTotal, "Avg first latency: ", avgFirst, "Avg max cross chunk latency: ", avgCross, "Avg max within chunk latency: ", avgWithin)
}

func validateDirContents(ctx context.Context, client *scyllaclient.Client, host, remotePath string, files map[string]string) error {
// Check if all specified files with given contents are present
for f, expected := range files {
Expand Down
1 change: 0 additions & 1 deletion pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
remoteSSTableBundles := newSSTableBundlesByID()
listOpts := &scyllaclient.RcloneListDirOpts{
FilesOnly: true,
Recurse: true,
}
if err := w.Client.RcloneListDirIter(ctx, h.IP, dataDst, listOpts, func(f *scyllaclient.RcloneListDirItem) {
if err := remoteSSTableBundles.add(f.Name, f.Size); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/testutils/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ func S3Credentials() (provider, endpoint, accessKeyID, secretAccessKey string) {
}
return *flagS3Provider, *flagS3Endpoint, *flagS3AccessKeyID, *flagS3SecretAccessKey
}

// S3BucketPath returns the filesystem path of the given bucket, i.e. the
// bucket name joined onto the directory held by the flagS3DataDir flag.
func S3BucketPath(bucket string) string {
	// The flag value is only meaningful after parsing, so parse on demand.
	if parsed := flag.Parsed(); !parsed {
		flag.Parse()
	}
	dataDir := *flagS3DataDir
	return filepath.Join(dataDir, bucket)
}
Loading
Loading