Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rclone: don't rely on cached bandwidth limit in configguard #4120

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions pkg/rclone/rcserver/rcconfigguard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package rcserver

import (
"context"
"slices"
"sync"

"github.com/pkg/errors"
Expand All @@ -26,16 +27,12 @@ type configGuard struct {
initialized atomic.Bool

defaultTransfers int
transfers int
bandwidthLimit string
}

func (cg *configGuard) init() {
if cg.initialized.CompareAndSwap(false, true) {
defaultTransfers := fs.GetConfig(context.Background()).Transfers
cg.defaultTransfers = defaultTransfers
cg.transfers = defaultTransfers
cg.bandwidthLimit = ""
}
}

Expand All @@ -55,15 +52,11 @@ func SetTransfers(transfers int) error {
}
// Returns global config
ci := fs.GetConfig(context.Background())
if transfers == globalConfigGuard.transfers {
// Safety check in case configguard is not in sync with global config
if transfers == ci.Transfers {
// Transfers are already set to specified value
return nil
}
if transfers == ci.Transfers {
// Transfers are already set to specified value
return nil
}

globalConfigGuard.transfers = transfers
ci.Transfers = transfers
// The amount of transfers impacts fs.Fs initialization (e.g. pool.Pool and fs.Pacer),
// so fs.Fs cache should be cleared on transfers count change.
Expand All @@ -77,16 +70,76 @@ func SetBandwidthLimit(limit string) error {
defer globalConfigGuard.mu.Unlock()
globalConfigGuard.init()

if limit == globalConfigGuard.bandwidthLimit {
currLimit, err := getBandwidthLimit()
if err != nil {
return err
}
eq, err := equalBandwidths(limit, currLimit)
if err != nil {
return err
}
if eq {
// Bandwidth limit is already set to specified value
return nil
}

in := rc.Params{
"rate": limit,
}
_, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), in)
// Uses *tokenBucket.rcBwlimit method
_, err = rcCalls.Get("core/bwlimit").Fn(context.Background(), in)
if err != nil {
return errors.Wrapf(err, "set bandwidth to %s", limit)
}
return nil
}

func getBandwidthLimit() (string, error) {
// Uses *tokenBucket.rcBwlimit method
out, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), make(rc.Params))
if err != nil {
return "", errors.Wrap(err, "get bandwidth")
}
limit, err := out.GetString("rate")
if err != nil {
return "", errors.Wrap(err, "parse current bandwidth")
}
return limit, err
}

func equalBandwidths(limit1, limit2 string) (bool, error) {
bws1, err := parseBandwidth(limit1)
if err != nil {
return false, err
}
bws2, err := parseBandwidth(limit2)
if err != nil {
return false, err
}
return slices.EqualFunc(bws1, bws2, func(s1 fs.BwTimeSlot, s2 fs.BwTimeSlot) bool {
if s1.HHMM != s2.HHMM || s1.DayOfTheWeek != s2.DayOfTheWeek {
return false
}
// Unlimited bandwidth can be described by any number <= 0
if s1.Bandwidth.Tx > 0 || s2.Bandwidth.Tx > 0 {
if s1.Bandwidth.Tx != s2.Bandwidth.Tx {
return false
}
}
if s1.Bandwidth.Rx > 0 || s2.Bandwidth.Rx > 0 {
if s1.Bandwidth.Rx != s2.Bandwidth.Rx {
return false
}
}
return true
}), nil
}

func parseBandwidth(limit string) (fs.BwTimetable, error) {
var bws fs.BwTimetable
err := bws.Set(limit)
if err != nil {
return nil, errors.Wrapf(err, "parse limit: %s", limit)
}
return bws, nil
}
5 changes: 5 additions & 0 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
// transfers performed under current client session.
// Limit is expressed in MiB per second.
// To turn off limitation set it to 0.
//
// Note that it's safer to set bandwidth limit as a parameter of given Rclone call
// (e.g. RcloneMoveDir or RcloneCopyPaths) as it's resistant to agent restarts
// or some other process modifying Rclone bandwidth limit in the meantime.
// Because of that, this method should be used in tests only.
func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit int) error {
p := operations.CoreBwlimitParams{
Context: forceHost(ctx, host),
Expand Down
4 changes: 0 additions & 4 deletions pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
}(w.hostSnapshotDirs(h))
}

if err := w.setRateLimit(ctx, h); err != nil {
return errors.Wrap(err, "set rate limit")
}

dirs := w.hostSnapshotDirs(h)
f := func(i int) (err error) {
d := &dirs[i]
Expand Down
9 changes: 0 additions & 9 deletions pkg/service/backup/worker_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func (w *worker) Upload(ctx context.Context, hosts []hostInfo, limits []DCLimit)
}

func (w *worker) uploadHost(ctx context.Context, h hostInfo) error {
if err := w.setRateLimit(ctx, h); err != nil {
return errors.Wrap(err, "set rate limit")
}

dirs := w.hostSnapshotDirs(h)

f := func(i int) (err error) {
Expand Down Expand Up @@ -151,11 +147,6 @@ func (w *worker) snapshotJobID(ctx context.Context, d snapshotDir) int64 {
return 0
}

func (w *worker) setRateLimit(ctx context.Context, h hostInfo) error {
w.Logger.Info(ctx, "Setting rate limit", "host", h.IP, "limit", h.RateLimit.Limit)
return w.Client.RcloneSetBandwidthLimit(ctx, h.IP, h.RateLimit.Limit)
}

func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDir) error {
w.Logger.Info(ctx, "Uploading table snapshot",
"host", h.IP,
Expand Down
Loading