From 231985829961f8604a01b5997e60a24533343244 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Fri, 3 May 2024 23:30:56 -0700 Subject: [PATCH] replica: Add context for revision counter goroutine Longhorn 8436 Signed-off-by: Shuo Wu --- app/cmd/replica.go | 18 +++++++++++++-- pkg/replica/backup.go | 3 ++- pkg/replica/backup_test.go | 5 +++-- pkg/replica/replica.go | 18 ++++++++------- pkg/replica/replica_test.go | 39 +++++++++++++++++---------------- pkg/replica/revision_counter.go | 5 ++++- pkg/replica/server.go | 9 +++++--- 7 files changed, 61 insertions(+), 36 deletions(-) diff --git a/app/cmd/replica.go b/app/cmd/replica.go index 8eefc36a1..34b46e276 100644 --- a/app/cmd/replica.go +++ b/app/cmd/replica.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "errors" "fmt" "net" @@ -91,7 +92,7 @@ func ReplicaCmd() cli.Command { } } -func startReplica(c *cli.Context) error { +func startReplica(c *cli.Context) (err error) { if c.NArg() != 1 { return errors.New("directory name is required") } @@ -115,7 +116,14 @@ func startReplica(c *cli.Context) error { } } - s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } + }() + + s := replica.NewServer(ctx, dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize) address := c.String("listen") @@ -144,6 +152,8 @@ func startReplica(c *cli.Context) error { resp := make(chan error) go func() { + defer cancel() + listen, err := net.Listen("tcp", controlAddress) if err != nil { logrus.WithError(err).Warnf("Failed to listen %v", controlAddress) @@ -160,6 +170,8 @@ func startReplica(c *cli.Context) error { }() go func() { + defer cancel() + rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s) logrus.Infof("Listening on data server %s", dataAddress) err := rpcServer.ListenAndServe() @@ -179,6 +191,8 @@ func startReplica(c *cli.Context) error { } go func() { + defer cancel() + cmd := exec.Command(exe, "--volume-name", volumeName, "sync-agent", "--listen", syncAddress, "--replica", controlAddress, "--listen-port-range", diff --git a/pkg/replica/backup.go b/pkg/replica/backup.go index 88c03cc99..520d92681 100644 --- a/pkg/replica/backup.go +++ b/pkg/replica/backup.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "os" "sync" @@ -93,7 +94,7 @@ func (rb *BackupStatus) OpenSnapshot(snapID, volumeID string) error { if err != nil { return errors.Wrap(err, "cannot get working directory") } - r, err := NewReadOnly(dir, id, rb.backingFile) + r, err := NewReadOnly(context.Background(), dir, id, rb.backingFile) if err != nil { return err } diff --git a/pkg/replica/backup_test.go b/pkg/replica/backup_test.go index ad2e0a1be..a036ce318 100644 --- a/pkg/replica/backup_test.go +++ b/pkg/replica/backup_test.go @@ -1,6 +1,7 @@ package replica import ( + "context" "os" "path" @@ -21,7 +22,7 @@ func (s *TestSuite) TestBackup(c *C) { err = os.Chdir(dir) c.Assert(err, IsNil) - r, err := New(10*mb, bs, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 10*mb, bs, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -87,7 +88,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing c.Assert(err, IsNil) volume := "test" - r, err := New(10*mb, bs, dir, backingFile, false, false, 250, 0) + r, err := New(context.Background(), 10*mb, bs, dir, backingFile, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() diff --git a/pkg/replica/replica.go b/pkg/replica/replica.go index 533c0f443..815b9bbcd 100644 --- a/pkg/replica/replica.go +++ b/pkg/replica/replica.go @@ -44,6 +44,7 @@ var ( type Replica struct { sync.RWMutex + ctx context.Context volume diffDisk dir string info Info @@ -130,7 +131,7 @@ func OpenSnapshot(dir string, snapshotName string) (*Replica, error) { } } - r, err := NewReadOnly(dir, snapshotDiskName, backingFile) + r, err := NewReadOnly(context.Background(), dir, snapshotDiskName, backingFile) if err != nil { return nil, err } @@ -143,17 +144,17 @@ func ReadInfo(dir string) (Info, error) { return info, err } -func New(size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) { - return construct(false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize) +func New(ctx context.Context, size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) { + return construct(ctx, false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize) } -func NewReadOnly(dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) { +func NewReadOnly(ctx context.Context, dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) { // size and sectorSize don't matter because they will be read from metadata // snapshotMaxCount and SnapshotMaxSize don't matter because readonly replica can't create a new disk - return construct(true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0) + return construct(ctx, true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0) } -func construct(readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) { +func construct(ctx context.Context, readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) { if size%sectorSize != 0 { return nil, fmt.Errorf("size %d not a multiple of sector size %d", size, sectorSize) } @@ -163,6 +164,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF } r := &Replica{ + ctx: ctx, dir: dir, activeDiskData: make([]*disk, 1), diskData: make(map[string]*disk), @@ -198,7 +200,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF } if !r.revisionCounterDisabled { - if err := r.initRevisionCounter(); err != nil { + if err := r.initRevisionCounter(ctx); err != nil { return nil, err } } @@ -292,7 +294,7 @@ func (r *Replica) SetRebuilding(rebuilding bool) error { } func (r *Replica) Reload() (*Replica, error) { - newReplica, err := New(r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize) + newReplica, err := New(r.ctx, r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize) if err != nil { return nil, err } diff --git a/pkg/replica/replica_test.go b/pkg/replica/replica_test.go index 315d915c6..67e97fc51 100644 --- a/pkg/replica/replica_test.go +++ b/pkg/replica/replica_test.go @@ -1,6 +1,7 @@ package replica import ( + "context" "crypto/md5" "fmt" "io" @@ -57,7 +58,7 @@ func (s *TestSuite) TestCreate(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() } @@ -73,7 +74,7 @@ func (s *TestSuite) TestSnapshot(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -158,7 +159,7 @@ func (s *TestSuite) TestRevert(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -270,7 +271,7 @@ func (s *TestSuite) TestRemoveLeafNode(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -345,7 +346,7 @@ func (s *TestSuite) TestRemoveLast(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -391,7 +392,7 @@ func (s *TestSuite) TestRemoveMiddle(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -437,7 +438,7 @@ func (s *TestSuite) TestRemoveFirst(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -468,7 +469,7 @@ func (s *TestSuite) TestRemoveOutOfChain(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -544,7 +545,7 @@ func (s *TestSuite) TestPrepareRemove(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -684,7 +685,7 @@ func (s *TestSuite) TestRead(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -715,7 +716,7 @@ func (s *TestSuite) TestReadBackingFileSmallerThanVolume(c *C) { Disk: f, } - r, err := New(5*b, b, dir, backing, false, false, 250, 0) + r, err := New(context.Background(), 5*b, b, dir, backing, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -796,7 +797,7 @@ func (s *TestSuite) TestWrite(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -818,7 +819,7 @@ func (s *TestSuite) TestSnapshotReadWrite(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(3*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 3*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -880,7 +881,7 @@ func (s *TestSuite) TestBackingFile(c *C) { Disk: f, } - r, err := New(3*b, b, dir, backing, false, false, 250, 0) + r, err := New(context.Background(), 3*b, b, dir, backing, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -913,7 +914,7 @@ func (s *TestSuite) partialWriteRead(c *C, totalLength, writeLength, writeOffset buf := make([]byte, totalLength) fill(buf, 3) - r, err := New(totalLength, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -961,7 +962,7 @@ func (s *TestSuite) testPartialRead(c *C, totalLength int64, readBuf []byte, off buf := make([]byte, totalLength) fill(buf, 3) - r, err := New(totalLength, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1032,7 +1033,7 @@ func (s *TestSuite) TestForceRemoveDiffDisk(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1076,7 +1077,7 @@ func (s *TestSuite) TestUnmapMarkDiskRemoved(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, true, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, true, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1127,7 +1128,7 @@ func (s *TestSuite) TestUnmap(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(8*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 8*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() diff --git a/pkg/replica/revision_counter.go b/pkg/replica/revision_counter.go index 7611295dd..dc9514eec 100644 --- a/pkg/replica/revision_counter.go +++ b/pkg/replica/revision_counter.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "io" "os" @@ -62,7 +63,7 @@ func (r *Replica) openRevisionFile(isCreate bool) error { return err } -func (r *Replica) initRevisionCounter() error { +func (r *Replica) initRevisionCounter(ctx context.Context) error { if r.readOnly { return nil } @@ -96,6 +97,8 @@ func (r *Replica) initRevisionCounter() error { var err error for { select { + case <-ctx.Done(): + return case <-r.revisionCounterReqChan: err = r.writeRevisionCounter(r.revisionCache.Load() + 1) r.revisionCounterAckChan <- err diff --git a/pkg/replica/server.go b/pkg/replica/server.go index a781385d6..c1ce6094d 100644 --- a/pkg/replica/server.go +++ b/pkg/replica/server.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "os" "sync" @@ -13,6 +14,7 @@ import ( type Server struct { sync.RWMutex + ctx context.Context r *Replica dir string sectorSize int64 @@ -23,8 +25,9 @@ type Server struct { snapshotMaxSize int64 } -func NewServer(dir string, backing *backingfile.BackingFile, sectorSize int64, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) *Server { +func NewServer(ctx context.Context, dir string, backing *backingfile.BackingFile, sectorSize int64, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) *Server { return &Server{ + ctx: ctx, dir: dir, backing: backing, sectorSize: sectorSize, @@ -54,7 +57,7 @@ func (s *Server) Create(size int64) error { sectorSize := s.getSectorSize() logrus.Infof("Creating replica %s, size %d/%d", s.dir, size, sectorSize) - r, err := New(size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) + r, err := New(s.ctx, size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) if err != nil { return err } @@ -74,7 +77,7 @@ func (s *Server) Open() error { sectorSize := s.getSectorSize() logrus.Infof("Opening replica: dir %s, size %d, sector size %d", s.dir, info.Size, sectorSize) - r, err := New(info.Size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) + r, err := New(s.ctx, info.Size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) if err != nil { return err }