From d33504b87b78c525883857c2fe9c349f06553b95 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Thu, 25 Apr 2024 13:48:42 -0700 Subject: [PATCH 1/4] replica: Remove unused HTTP server This replica HTTP service was used when the replica gRPC service had not been introduced. Now we neither rely on it for the engine-replica communication, nor use it for debugging. There is no need to retain these code. Signed-off-by: Shuo Wu --- pkg/replica/rest/model.go | 101 ------------------------------------ pkg/replica/rest/replica.go | 22 -------- pkg/replica/rest/router.go | 65 ----------------------- 3 files changed, 188 deletions(-) delete mode 100644 pkg/replica/rest/model.go delete mode 100644 pkg/replica/rest/replica.go delete mode 100644 pkg/replica/rest/router.go diff --git a/pkg/replica/rest/model.go b/pkg/replica/rest/model.go deleted file mode 100644 index 25dd370a8..000000000 --- a/pkg/replica/rest/model.go +++ /dev/null @@ -1,101 +0,0 @@ -package rest - -import ( - "strconv" - - "github.com/rancher/go-rancher/api" - "github.com/rancher/go-rancher/client" - - "github.com/longhorn/longhorn-engine/pkg/replica" - "github.com/longhorn/longhorn-engine/pkg/types" -) - -type Replica struct { - client.Resource - Dirty bool `json:"dirty"` - Rebuilding bool `json:"rebuilding"` - Head string `json:"head"` - Parent string `json:"parent"` - Size string `json:"size"` - SectorSize int64 `json:"sectorSize,string"` - BackingFile string `json:"backingFile"` - State string `json:"state"` - Chain []string `json:"chain"` - Disks map[string]replica.DiskInfo `json:"disks"` - RemainSnapshots int `json:"remainsnapshots"` - RevisionCounter int64 `json:"revisioncounter,string"` - RevisionCounterDisabled bool `json:"revisioncounterdisabled"` -} - -func NewReplica(context *api.ApiContext, state types.ReplicaState, info replica.Info, rep *replica.Replica) *Replica { - r := &Replica{ - Resource: client.Resource{ - Type: "replica", - Id: "1", - Actions: map[string]string{}, - }, - } - - r.State = string(state) - - actions := map[string]bool{} - - switch state { - case types.ReplicaStateInitial: - case types.ReplicaStateOpen: - case types.ReplicaStateClosed: - case types.ReplicaStateDirty: - case types.ReplicaStateRebuilding: - case types.ReplicaStateError: - } - - for action := range actions { - r.Actions[action] = context.UrlBuilder.ActionLink(r.Resource, action) - } - - r.Dirty = info.Dirty - r.Rebuilding = info.Rebuilding - r.Head = info.Head - r.Parent = info.Parent - r.SectorSize = info.SectorSize - r.Size = strconv.FormatInt(info.Size, 10) - r.BackingFile = info.BackingFilePath - - if rep != nil { - r.Chain, _ = rep.DisplayChain() - r.Disks = rep.ListDisks() - r.RemainSnapshots = rep.GetRemainSnapshotCounts() - if !rep.IsRevCounterDisabled() { - r.RevisionCounter = rep.GetRevisionCounter() - r.RevisionCounterDisabled = false - } else { - r.RevisionCounterDisabled = true - } - } - - return r -} - -func NewSchema() *client.Schemas { - schemas := &client.Schemas{} - - schemas.AddType("error", client.ServerApiError{}) - schemas.AddType("apiVersion", client.Resource{}) - schemas.AddType("schema", client.Schema{}) - replica := schemas.AddType("replica", Replica{}) - - replica.ResourceMethods = []string{"GET", "DELETE"} - replica.ResourceActions = map[string]client.Action{} - - return schemas -} - -type Server struct { - s *replica.Server -} - -func NewServer(s *replica.Server) *Server { - return &Server{ - s: s, - } -} diff --git a/pkg/replica/rest/replica.go b/pkg/replica/rest/replica.go deleted file mode 100644 index 98d6bc73f..000000000 --- a/pkg/replica/rest/replica.go +++ /dev/null @@ -1,22 +0,0 @@ -package rest - -import ( - "net/http" - - "github.com/rancher/go-rancher/api" - "github.com/rancher/go-rancher/client" -) - -func (s *Server) ListReplicas(rw http.ResponseWriter, req *http.Request) error { - apiContext := api.GetApiContext(req) - resp := client.GenericCollection{} - resp.Data = append(resp.Data, s.Replica(apiContext)) - - apiContext.Write(&resp) - return nil -} - -func (s *Server) Replica(apiContext *api.ApiContext) *Replica { - state, info := s.s.Status() - return NewReplica(apiContext, state, info, s.s.Replica()) -} diff --git a/pkg/replica/rest/router.go b/pkg/replica/rest/router.go deleted file mode 100644 index 335ddfcc0..000000000 --- a/pkg/replica/rest/router.go +++ /dev/null @@ -1,65 +0,0 @@ -package rest - -import ( - "net/http" - - "github.com/gorilla/mux" - "github.com/rancher/go-rancher/api" - "github.com/rancher/go-rancher/client" - "github.com/sirupsen/logrus" - - // add pprof endpoint - _ "net/http/pprof" -) - -func HandleError(s *client.Schemas, t func(http.ResponseWriter, *http.Request) error) http.Handler { - return api.ApiHandler(s, http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - if err := t(rw, req); err != nil { - apiContext := api.GetApiContext(req) - apiContext.WriteErr(err) - } - })) -} - -func checkAction(s *Server, t func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error { - return func(rw http.ResponseWriter, req *http.Request) error { - replica := s.Replica(api.GetApiContext(req)) - if replica.Actions[req.URL.Query().Get("action")] == "" { - rw.WriteHeader(http.StatusNotFound) - return nil - } - return t(rw, req) - } -} - -func NewRouter(s *Server) *mux.Router { - schemas := NewSchema() - router := mux.NewRouter().StrictSlash(true) - f := HandleError - - router.Methods("GET").Path("/ping").Handler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - if _, err := rw.Write([]byte("pong")); err != nil { - logrus.WithError(err).Warn("Failed to write response") - } - })) - - // API framework routes - router.Methods("GET").Path("/").Handler(api.VersionsHandler(schemas, "v1")) - router.Methods("GET").Path("/v1/schemas").Handler(api.SchemasHandler(schemas)) - router.Methods("GET").Path("/v1/schemas/{id}").Handler(api.SchemaHandler(schemas)) - router.Methods("GET").Path("/v1").Handler(api.VersionHandler(schemas, "v1")) - - // Replicas - router.Methods("GET").Path("/v1/replicas").Handler(f(schemas, s.ListReplicas)) - - // Actions - actions := map[string]func(http.ResponseWriter, *http.Request) error{} - - for name, action := range actions { - router.Methods("POST").Path("/v1/replicas/{id}").Queries("action", name).Handler(f(schemas, checkAction(s, action))) - } - - router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) - - return router -} From f8a8debae5bdf746349f0ce355cc9ceb66a3504b Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Thu, 25 Apr 2024 18:27:48 -0700 Subject: [PATCH 2/4] replica: Improve revision counter by introducing channels Longhorn 8436 Signed-off-by: Shuo Wu --- pkg/replica/replica.go | 23 +++++----- pkg/replica/revision_counter.go | 77 +++++++++++++-------------------- 2 files changed, 41 insertions(+), 59 deletions(-) diff --git a/pkg/replica/replica.go b/pkg/replica/replica.go index c43fb51de..589a08f51 100644 --- a/pkg/replica/replica.go +++ b/pkg/replica/replica.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "github.com/pkg/errors" @@ -53,11 +54,11 @@ type Replica struct { activeDiskData []*disk readOnly bool - revisionLock sync.Mutex - revisionCache int64 + revisionCache atomic.Int64 revisionFile *sparse.DirectFileIoProcessor - revisionRefreshed bool revisionCounterDisabled bool + revisionCounterReqChan chan bool + revisionCounterAckChan chan error unmapMarkDiskChainRemoved bool @@ -168,6 +169,8 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF diskChildrenMap: map[string]map[string]bool{}, readOnly: readonly, revisionCounterDisabled: disableRevCounter, + revisionCounterReqChan: make(chan bool, 1024), // Buffered channel to avoid blocking + revisionCounterAckChan: make(chan error), unmapMarkDiskChainRemoved: unmapMarkDiskChainRemoved, snapshotMaxCount: snapshotMaxCount, snapshotMaxSize: snapshotMaxSize, @@ -1284,12 +1287,11 @@ func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) { } if !r.revisionCounterDisabled { - if err := r.increaseRevisionCounter(); err != nil { - return c, err - } + r.revisionCounterReqChan <- true + err = <-r.revisionCounterAckChan } - return c, nil + return c, err } func (r *Replica) ReadAt(buf []byte, offset int64) (int, error) { @@ -1346,12 +1348,11 @@ func (r *Replica) UnmapAt(length uint32, offset int64) (n int, err error) { } if !r.revisionCounterDisabled { - if err := r.increaseRevisionCounter(); err != nil { - return 0, err - } + r.revisionCounterReqChan <- true + err = <-r.revisionCounterAckChan } - return unmappedSize, nil + return unmappedSize, err } func (r *Replica) ListDisks() map[string]DiskInfo { diff --git a/pkg/replica/revision_counter.go b/pkg/replica/revision_counter.go index e6b682e10..7611295dd 100644 --- a/pkg/replica/revision_counter.go +++ b/pkg/replica/revision_counter.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "github.com/longhorn/sparse-tools/sparse" ) @@ -68,9 +67,6 @@ func (r *Replica) initRevisionCounter() error { return nil } - r.revisionLock.Lock() - defer r.revisionLock.Unlock() - if _, err := os.Stat(r.diskPath(revisionCounterFile)); err != nil { if !os.IsNotExist(err) { return err @@ -90,11 +86,29 @@ func (r *Replica) initRevisionCounter() error { if err != nil { return err } - // Don't use r.revisionCache directly - // r.revisionCache is an internal cache, to avoid read from disk - // every time when counter needs to be updated. - // And it's protected by revisionLock - r.revisionCache = counter + // To avoid read from disk, apply atomic operations against the internal cache r.revisionCache every time counter needs to be updated + r.revisionCache.Swap(counter) + + go func() { + if r.revisionCounterDisabled { + return + } + var err error + for { + select { + case <-r.revisionCounterReqChan: + err = r.writeRevisionCounter(r.revisionCache.Load() + 1) + r.revisionCounterAckChan <- err + if err != nil { + close(r.revisionCounterAckChan) + return + } else { + r.revisionCache.Add(1) + } + } + } + }() + return nil } @@ -103,50 +117,17 @@ func (r *Replica) IsRevCounterDisabled() bool { } func (r *Replica) GetRevisionCounter() int64 { - r.revisionLock.Lock() - defer r.revisionLock.Unlock() - - counter, err := r.readRevisionCounter() - if err != nil { - logrus.WithError(err).Error("Failed to get revision counter") - // -1 will result in the replica to be discarded - return -1 - } - r.revisionCache = counter - return counter + return r.revisionCache.Load() } +// SetRevisionCounter can be invoked only when there is no pending IO. +// Typically, its caller, the engine process, will hold the lock before calling this function. +// And the engine lock holding means all writable replicas finished their IO. +// In other words, the engine lock holding means there is no pending IO. func (r *Replica) SetRevisionCounter(counter int64) error { - r.revisionLock.Lock() - defer r.revisionLock.Unlock() - if err := r.writeRevisionCounter(counter); err != nil { return err } - - r.revisionCache = counter - return nil -} - -func (r *Replica) increaseRevisionCounter() error { - r.revisionLock.Lock() - defer r.revisionLock.Unlock() - - if !r.revisionRefreshed { - counter, err := r.readRevisionCounter() - if err != nil { - return err - } - logrus.Infof("Reloading the revision counter before processing the first write, the current revision cache is %v, the latest revision counter in file is %v", - r.revisionCache, counter) - r.revisionCache = counter - r.revisionRefreshed = true - } - - if err := r.writeRevisionCounter(r.revisionCache + 1); err != nil { - return err - } - - r.revisionCache++ + r.revisionCache.Swap(counter) return nil } From f4a121f8e9edef20c060400736f900895c0b030e Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Thu, 25 Apr 2024 18:28:38 -0700 Subject: [PATCH 3/4] replica: Sending rc increment request with a separate goroutine Longhorn 8436 Signed-off-by: Shuo Wu --- pkg/replica/replica.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/replica/replica.go b/pkg/replica/replica.go index 589a08f51..533c0f443 100644 --- a/pkg/replica/replica.go +++ b/pkg/replica/replica.go @@ -1278,6 +1278,15 @@ func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) { return 0, fmt.Errorf("cannot write on read-only replica") } + // Increase the revision counter optimistically in a separate goroutine since most of the time write operations will succeed. + // Once the write operation fails, the revision counter will be wrongly increased by 1. It means that the revision counter is not accurate. + // Actually, the revision counter is not accurate even without this optimistic increment since we cannot make data write operation and the revision counter increment atomic. + go func() { + if !r.revisionCounterDisabled { + r.revisionCounterReqChan <- true + } + }() + r.RLock() r.info.Dirty = true c, err := r.volume.WriteAt(buf, offset) @@ -1287,7 +1296,6 @@ func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) { } if !r.revisionCounterDisabled { - r.revisionCounterReqChan <- true err = <-r.revisionCounterAckChan } @@ -1311,6 +1319,12 @@ func (r *Replica) UnmapAt(length uint32, offset int64) (n int, err error) { return 0, fmt.Errorf("can not unmap on read-only replica") } + go func() { + if !r.revisionCounterDisabled { + r.revisionCounterReqChan <- true + } + }() + unmappedSize, err := func() (int, error) { r.Lock() defer r.Unlock() @@ -1348,7 +1362,6 @@ func (r *Replica) UnmapAt(length uint32, offset int64) (n int, err error) { } if !r.revisionCounterDisabled { - r.revisionCounterReqChan <- true err = <-r.revisionCounterAckChan } From d52ab67ea7f495f5bc85b315ae236d4772386077 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Fri, 3 May 2024 23:30:56 -0700 Subject: [PATCH 4/4] replica: Add context for revision counter goroutine Longhorn 8436 Signed-off-by: Shuo Wu --- app/cmd/replica.go | 18 +++++++++++++-- pkg/replica/backup.go | 3 ++- pkg/replica/backup_test.go | 5 +++-- pkg/replica/replica.go | 18 ++++++++------- pkg/replica/replica_test.go | 39 +++++++++++++++++---------------- pkg/replica/revision_counter.go | 5 ++++- pkg/replica/server.go | 9 +++++--- 7 files changed, 61 insertions(+), 36 deletions(-) diff --git a/app/cmd/replica.go b/app/cmd/replica.go index 8eefc36a1..34b46e276 100644 --- a/app/cmd/replica.go +++ b/app/cmd/replica.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "errors" "fmt" "net" @@ -91,7 +92,7 @@ func ReplicaCmd() cli.Command { } } -func startReplica(c *cli.Context) error { +func startReplica(c *cli.Context) (err error) { if c.NArg() != 1 { return errors.New("directory name is required") } @@ -115,7 +116,14 @@ func startReplica(c *cli.Context) error { } } - s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } + }() + + s := replica.NewServer(ctx, dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize) address := c.String("listen") @@ -144,6 +152,8 @@ func startReplica(c *cli.Context) error { resp := make(chan error) go func() { + defer cancel() + listen, err := net.Listen("tcp", controlAddress) if err != nil { logrus.WithError(err).Warnf("Failed to listen %v", controlAddress) @@ -160,6 +170,8 @@ func startReplica(c *cli.Context) error { }() go func() { + defer cancel() + rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s) logrus.Infof("Listening on data server %s", dataAddress) err := rpcServer.ListenAndServe() @@ -179,6 +191,8 @@ func startReplica(c *cli.Context) error { } go func() { + defer cancel() + cmd := exec.Command(exe, "--volume-name", volumeName, "sync-agent", "--listen", syncAddress, "--replica", controlAddress, "--listen-port-range", diff --git a/pkg/replica/backup.go b/pkg/replica/backup.go index 88c03cc99..520d92681 100644 --- a/pkg/replica/backup.go +++ b/pkg/replica/backup.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "os" "sync" @@ -93,7 +94,7 @@ func (rb *BackupStatus) OpenSnapshot(snapID, volumeID string) error { if err != nil { return errors.Wrap(err, "cannot get working directory") } - r, err := NewReadOnly(dir, id, rb.backingFile) + r, err := NewReadOnly(context.Background(), dir, id, rb.backingFile) if err != nil { return err } diff --git a/pkg/replica/backup_test.go b/pkg/replica/backup_test.go index ad2e0a1be..a036ce318 100644 --- a/pkg/replica/backup_test.go +++ b/pkg/replica/backup_test.go @@ -1,6 +1,7 @@ package replica import ( + "context" "os" "path" @@ -21,7 +22,7 @@ func (s *TestSuite) TestBackup(c *C) { err = os.Chdir(dir) c.Assert(err, IsNil) - r, err := New(10*mb, bs, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 10*mb, bs, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -87,7 +88,7 @@ func (s *TestSuite) testBackupWithBackups(c *C, backingFile *backingfile.Backing c.Assert(err, IsNil) volume := "test" - r, err := New(10*mb, bs, dir, backingFile, false, false, 250, 0) + r, err := New(context.Background(), 10*mb, bs, dir, backingFile, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() diff --git a/pkg/replica/replica.go b/pkg/replica/replica.go index 533c0f443..815b9bbcd 100644 --- a/pkg/replica/replica.go +++ b/pkg/replica/replica.go @@ -44,6 +44,7 @@ var ( type Replica struct { sync.RWMutex + ctx context.Context volume diffDisk dir string info Info @@ -130,7 +131,7 @@ func OpenSnapshot(dir string, snapshotName string) (*Replica, error) { } } - r, err := NewReadOnly(dir, snapshotDiskName, backingFile) + r, err := NewReadOnly(context.Background(), dir, snapshotDiskName, backingFile) if err != nil { return nil, err } @@ -143,17 +144,17 @@ func ReadInfo(dir string) (Info, error) { return info, err } -func New(size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) { - return construct(false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize) +func New(ctx context.Context, size, sectorSize int64, dir string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, SnapshotMaxSize int64) (*Replica, error) { + return construct(ctx, false, size, sectorSize, dir, "", backingFile, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, SnapshotMaxSize) } -func NewReadOnly(dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) { +func NewReadOnly(ctx context.Context, dir, head string, backingFile *backingfile.BackingFile) (*Replica, error) { // size and sectorSize don't matter because they will be read from metadata // snapshotMaxCount and SnapshotMaxSize don't matter because readonly replica can't create a new disk - return construct(true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0) + return construct(ctx, true, 0, diskutil.ReplicaSectorSize, dir, head, backingFile, false, false, 250, 0) } -func construct(readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) { +func construct(ctx context.Context, readonly bool, size, sectorSize int64, dir, head string, backingFile *backingfile.BackingFile, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) (*Replica, error) { if size%sectorSize != 0 { return nil, fmt.Errorf("size %d not a multiple of sector size %d", size, sectorSize) } @@ -163,6 +164,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF } r := &Replica{ + ctx: ctx, dir: dir, activeDiskData: make([]*disk, 1), diskData: make(map[string]*disk), @@ -198,7 +200,7 @@ func construct(readonly bool, size, sectorSize int64, dir, head string, backingF } if !r.revisionCounterDisabled { - if err := r.initRevisionCounter(); err != nil { + if err := r.initRevisionCounter(ctx); err != nil { return nil, err } } @@ -292,7 +294,7 @@ func (r *Replica) SetRebuilding(rebuilding bool) error { } func (r *Replica) Reload() (*Replica, error) { - newReplica, err := New(r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize) + newReplica, err := New(r.ctx, r.info.Size, r.info.SectorSize, r.dir, r.info.BackingFile, r.revisionCounterDisabled, r.unmapMarkDiskChainRemoved, r.snapshotMaxCount, r.snapshotMaxSize) if err != nil { return nil, err } diff --git a/pkg/replica/replica_test.go b/pkg/replica/replica_test.go index 315d915c6..67e97fc51 100644 --- a/pkg/replica/replica_test.go +++ b/pkg/replica/replica_test.go @@ -1,6 +1,7 @@ package replica import ( + "context" "crypto/md5" "fmt" "io" @@ -57,7 +58,7 @@ func (s *TestSuite) TestCreate(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() } @@ -73,7 +74,7 @@ func (s *TestSuite) TestSnapshot(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -158,7 +159,7 @@ func (s *TestSuite) TestRevert(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -270,7 +271,7 @@ func (s *TestSuite) TestRemoveLeafNode(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -345,7 +346,7 @@ func (s *TestSuite) TestRemoveLast(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -391,7 +392,7 @@ func (s *TestSuite) TestRemoveMiddle(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -437,7 +438,7 @@ func (s *TestSuite) TestRemoveFirst(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -468,7 +469,7 @@ func (s *TestSuite) TestRemoveOutOfChain(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -544,7 +545,7 @@ func (s *TestSuite) TestPrepareRemove(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -684,7 +685,7 @@ func (s *TestSuite) TestRead(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -715,7 +716,7 @@ func (s *TestSuite) TestReadBackingFileSmallerThanVolume(c *C) { Disk: f, } - r, err := New(5*b, b, dir, backing, false, false, 250, 0) + r, err := New(context.Background(), 5*b, b, dir, backing, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -796,7 +797,7 @@ func (s *TestSuite) TestWrite(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -818,7 +819,7 @@ func (s *TestSuite) TestSnapshotReadWrite(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(3*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 3*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -880,7 +881,7 @@ func (s *TestSuite) TestBackingFile(c *C) { Disk: f, } - r, err := New(3*b, b, dir, backing, false, false, 250, 0) + r, err := New(context.Background(), 3*b, b, dir, backing, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -913,7 +914,7 @@ func (s *TestSuite) partialWriteRead(c *C, totalLength, writeLength, writeOffset buf := make([]byte, totalLength) fill(buf, 3) - r, err := New(totalLength, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -961,7 +962,7 @@ func (s *TestSuite) testPartialRead(c *C, totalLength int64, readBuf []byte, off buf := make([]byte, totalLength) fill(buf, 3) - r, err := New(totalLength, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), totalLength, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1032,7 +1033,7 @@ func (s *TestSuite) TestForceRemoveDiffDisk(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1076,7 +1077,7 @@ func (s *TestSuite) TestUnmapMarkDiskRemoved(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(9, 3, dir, nil, false, true, 250, 0) + r, err := New(context.Background(), 9, 3, dir, nil, false, true, 250, 0) c.Assert(err, IsNil) defer r.Close() @@ -1127,7 +1128,7 @@ func (s *TestSuite) TestUnmap(c *C) { c.Assert(err, IsNil) defer os.RemoveAll(dir) - r, err := New(8*b, b, dir, nil, false, false, 250, 0) + r, err := New(context.Background(), 8*b, b, dir, nil, false, false, 250, 0) c.Assert(err, IsNil) defer r.Close() diff --git a/pkg/replica/revision_counter.go b/pkg/replica/revision_counter.go index 7611295dd..dc9514eec 100644 --- a/pkg/replica/revision_counter.go +++ b/pkg/replica/revision_counter.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "io" "os" @@ -62,7 +63,7 @@ func (r *Replica) openRevisionFile(isCreate bool) error { return err } -func (r *Replica) initRevisionCounter() error { +func (r *Replica) initRevisionCounter(ctx context.Context) error { if r.readOnly { return nil } @@ -96,6 +97,8 @@ func (r *Replica) initRevisionCounter() error { var err error for { select { + case <-ctx.Done(): + return case <-r.revisionCounterReqChan: err = r.writeRevisionCounter(r.revisionCache.Load() + 1) r.revisionCounterAckChan <- err diff --git a/pkg/replica/server.go b/pkg/replica/server.go index a781385d6..c1ce6094d 100644 --- a/pkg/replica/server.go +++ b/pkg/replica/server.go @@ -1,6 +1,7 @@ package replica import ( + "context" "fmt" "os" "sync" @@ -13,6 +14,7 @@ import ( type Server struct { sync.RWMutex + ctx context.Context r *Replica dir string sectorSize int64 @@ -23,8 +25,9 @@ type Server struct { snapshotMaxSize int64 } -func NewServer(dir string, backing *backingfile.BackingFile, sectorSize int64, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) *Server { +func NewServer(ctx context.Context, dir string, backing *backingfile.BackingFile, sectorSize int64, disableRevCounter, unmapMarkDiskChainRemoved bool, snapshotMaxCount int, snapshotMaxSize int64) *Server { return &Server{ + ctx: ctx, dir: dir, backing: backing, sectorSize: sectorSize, @@ -54,7 +57,7 @@ func (s *Server) Create(size int64) error { sectorSize := s.getSectorSize() logrus.Infof("Creating replica %s, size %d/%d", s.dir, size, sectorSize) - r, err := New(size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) + r, err := New(s.ctx, size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) if err != nil { return err } @@ -74,7 +77,7 @@ func (s *Server) Open() error { sectorSize := s.getSectorSize() logrus.Infof("Opening replica: dir %s, size %d, sector size %d", s.dir, info.Size, sectorSize) - r, err := New(info.Size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) + r, err := New(s.ctx, info.Size, sectorSize, s.dir, s.backing, s.revisionCounterDisabled, s.unmapMarkDiskChainRemoved, s.snapshotMaxCount, s.snapshotMaxSize) if err != nil { return err }