Skip to content

Commit

Permalink
replica: Add context for revision counter goroutine
Browse files Browse the repository at this point in the history
Longhorn 8436

Signed-off-by: Shuo Wu <[email protected]>
  • Loading branch information
shuo-wu authored and mergify[bot] committed May 31, 2024
1 parent 866e803 commit 2319858
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 36 deletions.
18 changes: 16 additions & 2 deletions app/cmd/replica.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -91,7 +92,7 @@ func ReplicaCmd() cli.Command {
}
}

func startReplica(c *cli.Context) error {
func startReplica(c *cli.Context) (err error) {
if c.NArg() != 1 {
return errors.New("directory name is required")
}
Expand All @@ -115,7 +116,14 @@ func startReplica(c *cli.Context) error {
}
}

s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
cancel()
}
}()

s := replica.NewServer(ctx, dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize)

address := c.String("listen")

Expand Down Expand Up @@ -144,6 +152,8 @@ func startReplica(c *cli.Context) error {
resp := make(chan error)

go func() {
defer cancel()

listen, err := net.Listen("tcp", controlAddress)
if err != nil {
logrus.WithError(err).Warnf("Failed to listen %v", controlAddress)
Expand All @@ -160,6 +170,8 @@ func startReplica(c *cli.Context) error {
}()

go func() {
defer cancel()

rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s)
logrus.Infof("Listening on data server %s", dataAddress)
err := rpcServer.ListenAndServe()
Expand All @@ -179,6 +191,8 @@ func startReplica(c *cli.Context) error {
}

go func() {
defer cancel()

cmd := exec.Command(exe, "--volume-name", volumeName, "sync-agent", "--listen", syncAddress,
"--replica", controlAddress,
"--listen-port-range",
Expand Down
3 changes: 2 additions & 1 deletion pkg/replica/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (rb *BackupStatus) OpenSnapshot(snapID, volumeID string) error {
if err != nil {
return errors.Wrap(err, "cannot get working directory")
}
r, err := NewReadOnly(dir, id, rb.backingFile)
r, err := NewReadOnly(context.Background(), dir, id, rb.backingFile)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/replica/backup_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"os"
"path"

Expand All @@ -21,7 +22,7 @@ func (s *TestSuite) TestBackup(c *C) {
err = os.Chdir(dir)
c.Assert(err, IsNil)

r, err := New(10*mb, bs, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 10*mb, bs, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -87,7 +88,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing
c.Assert(err, IsNil)
volume := "test"

r, err := New(10*mb, bs, dir, backingFile, false, false, 250, 0)
r, err := New(context.Background(), 10*mb, bs, dir, backingFile, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down
18 changes: 10 additions & 8 deletions pkg/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (

type Replica struct {
sync.RWMutex
ctx context.Context
volume diffDisk
dir string
info Info
Expand Down Expand Up @@ -130,7 +131,7 @@ func OpenSnapshot(dir string, snapshotName string) (*Replica, error) {
}
}

r, err := NewReadOnly(dir, snapshotDiskName, backingFile)
r, err := NewReadOnly(context.Background(), dir, snapshotDiskName, backingFile)
if err != nil {
return nil, err
}
Expand All @@ -143,17 +144,17 @@ func ReadInfo(dir string) (Info, error) {
return info, err
}

func New(size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) {
return construct(false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize)
func New(ctx context.Context, size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) {
return construct(ctx, false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize)
}

func NewReadOnly(dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) {
func NewReadOnly(ctx context.Context, dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) {
// size and sectorSize don't matter because they will be read from metadata
// snapshotMaxCount and SnapshotMaxSize don't matter because readonly replica can't create a new disk
return construct(true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0)
return construct(ctx, true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0)
}

func construct(readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) {
func construct(ctx context.Context, readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) {
if size%sectorSize != 0 {
return nil, fmt.Errorf("size %d not a multiple of sector size %d", size, sectorSize)
}
Expand All @@ -163,6 +164,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF
}

r := &Replica{
ctx: ctx,
dir: dir,
activeDiskData: make([]*disk, 1),
diskData: make(map[string]*disk),
Expand Down Expand Up @@ -198,7 +200,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF
}

if !r.revisionCounterDisabled {
if err := r.initRevisionCounter(); err != nil {
if err := r.initRevisionCounter(ctx); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -292,7 +294,7 @@ func (r *Replica) SetRebuilding(rebuilding bool) error {
}

func (r *Replica) Reload() (*Replica, error) {
newReplica, err := New(r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize)
newReplica, err := New(r.ctx, r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize)
if err != nil {
return nil, err
}
Expand Down
39 changes: 20 additions & 19 deletions pkg/replica/replica_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"crypto/md5"
"fmt"
"io"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (s *TestSuite) TestCreate(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()
}
Expand All @@ -73,7 +74,7 @@ func (s *TestSuite) TestSnapshot(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -158,7 +159,7 @@ func (s *TestSuite) TestRevert(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -270,7 +271,7 @@ func (s *TestSuite) TestRemoveLeafNode(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -345,7 +346,7 @@ func (s *TestSuite) TestRemoveLast(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -391,7 +392,7 @@ func (s *TestSuite) TestRemoveMiddle(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -437,7 +438,7 @@ func (s *TestSuite) TestRemoveFirst(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -468,7 +469,7 @@ func (s *TestSuite) TestRemoveOutOfChain(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -544,7 +545,7 @@ func (s *TestSuite) TestPrepareRemove(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -684,7 +685,7 @@ func (s *TestSuite) TestRead(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9*b, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -715,7 +716,7 @@ func (s *TestSuite) TestReadBackingFileSmallerThanVolume(c *C) {
Disk: f,
}

r, err := New(5*b, b, dir, backing, false, false, 250, 0)
r, err := New(context.Background(), 5*b, b, dir, backing, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -796,7 +797,7 @@ func (s *TestSuite) TestWrite(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9*b, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand All @@ -818,7 +819,7 @@ func (s *TestSuite) TestSnapshotReadWrite(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(3*b, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 3*b, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -880,7 +881,7 @@ func (s *TestSuite) TestBackingFile(c *C) {
Disk: f,
}

r, err := New(3*b, b, dir, backing, false, false, 250, 0)
r, err := New(context.Background(), 3*b, b, dir, backing, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -913,7 +914,7 @@ func (s *TestSuite) partialWriteRead(c *C, totalLength, writeLength, writeOffset
buf := make([]byte, totalLength)
fill(buf, 3)

r, err := New(totalLength, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -961,7 +962,7 @@ func (s *TestSuite) testPartialRead(c *C, totalLength int64, readBuf []byte, off
buf := make([]byte, totalLength)
fill(buf, 3)

r, err := New(totalLength, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -1032,7 +1033,7 @@ func (s *TestSuite) TestForceRemoveDiffDisk(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -1076,7 +1077,7 @@ func (s *TestSuite) TestUnmapMarkDiskRemoved(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(9, 3, dir, nil, false, true, 250, 0)
r, err := New(context.Background(), 9, 3, dir, nil, false, true, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down Expand Up @@ -1127,7 +1128,7 @@ func (s *TestSuite) TestUnmap(c *C) {
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

r, err := New(8*b, b, dir, nil, false, false, 250, 0)
r, err := New(context.Background(), 8*b, b, dir, nil, false, false, 250, 0)
c.Assert(err, IsNil)
defer r.Close()

Expand Down
5 changes: 4 additions & 1 deletion pkg/replica/revision_counter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replica

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (r *Replica) openRevisionFile(isCreate bool) error {
return err
}

func (r *Replica) initRevisionCounter() error {
func (r *Replica) initRevisionCounter(ctx context.Context) error {
if r.readOnly {
return nil
}
Expand Down Expand Up @@ -96,6 +97,8 @@ func (r *Replica) initRevisionCounter() error {
var err error
for {
select {
case <-ctx.Done():
return
case <-r.revisionCounterReqChan:
err = r.writeRevisionCounter(r.revisionCache.Load() + 1)
r.revisionCounterAckChan <- err
Expand Down
Loading

0 comments on commit 2319858

Please sign in to comment.