Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve replica revision counter #1097

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions app/cmd/replica.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -91,7 +92,7 @@ func ReplicaCmd() cli.Command {
}
}

func startReplica(c *cli.Context) error {
func startReplica(c *cli.Context) (err error) {
if c.NArg() != 1 {
return errors.New("directory name is required")
}
Expand All @@ -115,7 +116,14 @@ func startReplica(c *cli.Context) error {
}
}

s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
cancel()
}
}()

s := replica.NewServer(ctx, dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize)

address := c.String("listen")

Expand Down Expand Up @@ -144,6 +152,8 @@ func startReplica(c *cli.Context) error {
resp := make(chan error)

go func() {
defer cancel()

listen, err := net.Listen("tcp", controlAddress)
if err != nil {
logrus.WithError(err).Warnf("Failed to listen %v", controlAddress)
Expand All @@ -160,6 +170,8 @@ func startReplica(c *cli.Context) error {
}()

go func() {
defer cancel()

rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s)
logrus.Infof("Listening on data server %s", dataAddress)
err := rpcServer.ListenAndServe()
Expand All @@ -179,6 +191,8 @@ func startReplica(c *cli.Context) error {
}

go func() {
defer cancel()

cmd := exec.Command(exe, "--volume-name", volumeName, "sync-agent", "--listen", syncAddress,
"--replica", controlAddress,
"--listen-port-range",
Expand Down
3 changes: 2 additions & 1 deletion pkg/replica/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (rb *BackupStatus) OpenSnapshot(snapID, volumeID string) error {
if err != nil {
return errors.Wrap(err, "cannot get working directory")
}
r, err := NewReadOnly(dir, id, rb.backingFile)
r, err := NewReadOnly(context.Background(), dir, id, rb.backingFile)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/replica/backup_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"os"
"path"

Expand All @@ -21,7 +22,7 @@ func (s *TestSuite) TestBackup(c *C) {
err = os.Chdir(dir)
c.Assert(err, IsNil)

r, err := New(10*mb, bs, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 10*mb, bs, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -87,7 +88,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing
c.Assert(err, IsNil)
volume := "test"

r, err := New(10*mb, bs, dir, backingFile, false, false, 250, 0)
r, err := New(context.Background(), 10*mb, bs, dir, backingFile, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down
54 changes: 35 additions & 19 deletions pkg/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"

"github.com/pkg/errors"
Expand Down Expand Up @@ -43,6 +44,7 @@ var (

type Replica struct {
sync.RWMutex
ctx context.Context
volume diffDisk
dir string
info Info
Expand All @@ -53,11 +55,11 @@ type Replica struct {
activeDiskData []*disk
readOnly bool

revisionLock sync.Mutex
revisionCache int64
revisionCache atomic.Int64
revisionFile *sparse.DirectFileIoProcessor
revisionRefreshed bool
revisionCounterDisabled bool
revisionCounterReqChan chan bool
revisionCounterAckChan chan error

unmapMarkDiskChainRemoved bool

Expand Down Expand Up @@ -129,7 +131,7 @@ func OpenSnapshot(dir string, snapshotName string) (*Replica, error) {
}
}

r, err := NewReadOnly(dir, snapshotDiskName, backingFile)
r, err := NewReadOnly(context.Background(), dir, snapshotDiskName, backingFile)
if err != nil {
return nil, err
}
Expand All @@ -142,17 +144,17 @@ func ReadInfo(dir string) (Info, error) {
return info, err
}

func New(size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) {
return construct(false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize)
func New(ctx context.Context, size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) {
return construct(ctx, false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize)
}

func NewReadOnly(dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) {
func NewReadOnly(ctx context.Context, dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) {
// size and sectorSize don't matter because they will be read from metadata
// snapshotMaxCount and SnapshotMaxSize don't matter because a read-only replica can't create a new disk
return construct(true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0)
return construct(ctx, true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0)
}

func construct(readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) {
func construct(ctx context.Context, readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) {
if size%sectorSize != 0 {
return nil, fmt.Errorf("size %d not a multiple of sector size %d", size, sectorSize)
}
Expand All @@ -162,12 +164,15 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF
}

r := &Replica{
ctx: ctx,
dir: dir,
activeDiskData: make([]*disk, 1),
diskData: make(map[string]*disk),
diskChildrenMap: map[string]map[string]bool{},
readOnly: readonly,
revisionCounterDisabled: disableRevCounter,
revisionCounterReqChan: make(chan bool, 1024), // Buffered channel to avoid blocking
revisionCounterAckChan: make(chan error),
unmapMarkDiskChainRemoved: unmapMarkDiskChainRemoved,
snapshotMaxCount: snapshotMaxCount,
snapshotMaxSize: snapshotMaxSize,
Expand Down Expand Up @@ -195,7 +200,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF
}

if !r.revisionCounterDisabled {
if err := r.initRevisionCounter(); err != nil {
if err := r.initRevisionCounter(ctx); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -289,7 +294,7 @@ func (r *Replica) SetRebuilding(rebuilding bool) error {
}

func (r *Replica) Reload() (*Replica, error) {
newReplica, err := New(r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize)
newReplica, err := New(r.ctx, r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1275,6 +1280,15 @@ func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) {
return 0, fmt.Errorf("cannot write on read-only replica")
}

// Increase the revision counter optimistically in a separate goroutine, since most write operations succeed.
// If the write operation fails, the revision counter will still have been increased by 1, which means the revision counter is not accurate.
// In fact, the revision counter is not strictly accurate even without this optimistic increment, because the data write operation and the revision counter increment cannot be made atomic.
go func() {
if !r.revisionCounterDisabled {
r.revisionCounterReqChan <- true
}
}()

r.RLock()
r.info.Dirty = true
c, err := r.volume.WriteAt(buf, offset)
Expand All @@ -1284,12 +1298,10 @@ func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) {
}

if !r.revisionCounterDisabled {
if err := r.increaseRevisionCounter(); err != nil {
return c, err
}
err = <-r.revisionCounterAckChan
}

return c, nil
return c, err
}

func (r *Replica) ReadAt(buf []byte, offset int64) (int, error) {
Expand All @@ -1309,6 +1321,12 @@ func (r *Replica) UnmapAt(length uint32, offset int64) (n int, err error) {
return 0, fmt.Errorf("can not unmap on read-only replica")
}

go func() {
if !r.revisionCounterDisabled {
r.revisionCounterReqChan <- true
}
}()

unmappedSize, err := func() (int, error) {
r.Lock()
defer r.Unlock()
Expand Down Expand Up @@ -1346,12 +1364,10 @@ func (r *Replica) UnmapAt(length uint32, offset int64) (n int, err error) {
}

if !r.revisionCounterDisabled {
if err := r.increaseRevisionCounter(); err != nil {
return 0, err
}
err = <-r.revisionCounterAckChan
}

return unmappedSize, nil
return unmappedSize, err
}

func (r *Replica) ListDisks() map[string]DiskInfo {
Expand Down
Loading
Loading