Skip to content

Commit

Permalink
Added multiple connections from controller to replicas using --replic…
Browse files Browse the repository at this point in the history
…a-streams flag
  • Loading branch information
Kampadais committed Mar 26, 2024
1 parent 4b73f1c commit 5397b06
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 29 deletions.
17 changes: 14 additions & 3 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func ControllerCmd() cli.Command {
Value: int64(controller.DefaultEngineReplicaTimeout.Seconds()),
Usage: "In seconds. Timeout between engine and replica(s)",
},
cli.IntFlag{
Name: "replica-streams",
Required: false,
Value: 1,
Usage: "Number of concurrent streams to each replica",
},
cli.StringFlag{
Name: "data-server-protocol",
Value: "tcp",
Expand Down Expand Up @@ -145,6 +151,11 @@ func startController(c *cli.Context) error {
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)

replicaStreams := c.Int("replica-streams")
if replicaStreams < 1 {
return errors.New("at least one stream per replica is required")
}

factories := map[string]types.BackendFactory{}
for _, backend := range backends {
switch backend {
Expand All @@ -166,10 +177,10 @@ func startController(c *cli.Context) error {
frontend = f
}

logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v, engine to replica(s) timeout %v, streams per replica %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout, replicaStreams)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, replicaStreams, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout, frontendQueues)

// need to wait for Shutdown() completion
Expand Down
5 changes: 2 additions & 3 deletions pkg/backend/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory {
}
}

func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
parts := strings.SplitN(address, "://", 2)

if len(parts) == 2 {
if factory, ok := d.factories[parts[0]]; ok {
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout)
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout, replicaStreams)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (f *Wrapper) ResetRebuild() error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,8 @@ func (r *Remote) info() (*types.ReplicaInfo, error) {
return replicaClient.GetReplicaInfo(resp.Replica), nil
}

func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol)

controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol)
if err != nil {
return nil, err
Expand All @@ -356,19 +355,29 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
return nil, fmt.Errorf("replica must be closed, cannot add in state: %s", replica.State)
}

conn, err := connect(dataServerProtocol, dataAddress)
if err != nil {
return nil, err
}
var clients []*dataconn.Client
for i := 0; i < replicaStreams; i++ {
conn, err := connect(dataServerProtocol, dataAddress)
if err != nil {
return nil, err
}

dataConnClient := dataconn.NewClient(conn, engineToReplicaTimeout)
r.ReaderWriterUnmapperAt = dataConnClient
dataConnClient := dataconn.NewClient(conn, engineToReplicaTimeout)
clients = append(clients, dataConnClient)
}
if replicaStreams == 1 {
r.ReaderWriterUnmapperAt = clients[0]
} else {
r.ReaderWriterUnmapperAt = dataconn.NewMultiClient(clients)
}

if err := r.open(); err != nil {
return nil, err
}

go r.monitorPing(dataConnClient)
for i := 0; i < replicaStreams; i++ {
go r.monitorPing(clients[i])
}

return r, nil
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Controller struct {
isUpgrade bool
iscsiTargetRequestTimeout time.Duration
engineReplicaTimeout time.Duration
replicaStreams int
DataServerProtocol types.DataServerProtocol

isExpanding bool
Expand Down Expand Up @@ -69,7 +70,7 @@ const (
)

func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int, frontendQueues int) *Controller {
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, replicaStreams int, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int, frontendQueues int) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -84,6 +85,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro

iscsiTargetRequestTimeout: iscsiTargetRequestTimeout,
engineReplicaTimeout: engineReplicaTimeout,
replicaStreams: replicaStreams,
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
Expand Down Expand Up @@ -167,7 +169,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.replicaStreams)
if err != nil {
return err
}
Expand Down Expand Up @@ -731,7 +733,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.replicaStreams)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand All @@ -740,6 +742,20 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
continue
}

// If the instance manager crashes during the execution of [this code block](https://github.com/longhorn/longhorn-engine/blob/v1.5.1/pkg/sync/sync.go#L435-L446)
// the volume.meta file will be left with `Rebuilding` set to true. If Longhorn subsequently updates the replica
// as healthy, then the old replica will be removed. In scenarios involving multiple replicas, Longhorn will
// remove the replica with illegal values, thereby allowing rebuilding from other healthy replicas. However, in
// the case of single replicas, we cannot employ the same strategy.
// As a result, we will make a best-effort attempt to reset the `Rebuilding` flag for single replica cases.
// Ref: https://github.com/longhorn/longhorn/issues/6626
if len(addresses) == 1 {
err = newBackend.ResetRebuild()
if err != nil {
logrus.WithError(err).Warnf("Failed to reset invalid rebuild for backend with address %v", address)
}
}

newSize, err := newBackend.Size()
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
Expand Down
38 changes: 38 additions & 0 deletions pkg/dataconn/multi_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dataconn

import (
	"errors"
	"sync"
)

// MultiClient spreads data-path requests over several Client connections,
// selecting one client per request in round-robin order.
type MultiClient struct {
	// clients is the pool that requests are distributed across.
	clients []*Client

	// lock guards next.
	lock sync.Mutex
	// next is the index of the most recently selected client; it is zero
	// until the first selection has been made.
	next int
}

// NewMultiClient returns a MultiClient that distributes requests over the
// given clients. The slice is stored as passed in, not copied.
func NewMultiClient(clients []*Client) *MultiClient {
	return &MultiClient{clients: clients}
}

// getNextClient advances the round-robin cursor by one position (wrapping at
// the end of the pool) while holding the lock, and returns the client at the
// new position. The slice itself is indexed after the lock is released.
func (mc *MultiClient) getNextClient() *Client {
	poolSize := len(mc.clients)

	mc.lock.Lock()
	cursor := (mc.next + 1) % poolSize
	mc.next = cursor
	mc.lock.Unlock()

	return mc.clients[cursor]
}

// ReadAt forwards the read to the next client in the round-robin rotation and
// returns that client's result unchanged.
//
// It returns (0, error) when the MultiClient holds no clients.
func (mc *MultiClient) ReadAt(buf []byte, offset int64) (int, error) {
	// getNextClient takes an index modulo len(mc.clients); on an empty pool
	// that is an integer division by zero, which panics while the lock is
	// still held. Fail the request with an error instead.
	if len(mc.clients) == 0 {
		return 0, errors.New("no data connection clients available")
	}
	return mc.getNextClient().ReadAt(buf, offset)
}

// WriteAt forwards the write to the next client in the round-robin rotation
// and returns that client's result unchanged.
//
// It returns (0, error) when the MultiClient holds no clients.
func (mc *MultiClient) WriteAt(buf []byte, offset int64) (int, error) {
	// getNextClient takes an index modulo len(mc.clients); on an empty pool
	// that is an integer division by zero, which panics while the lock is
	// still held. Fail the request with an error instead.
	if len(mc.clients) == 0 {
		return 0, errors.New("no data connection clients available")
	}
	return mc.getNextClient().WriteAt(buf, offset)
}

// UnmapAt forwards the unmap request to the next client in the round-robin
// rotation and returns that client's result unchanged.
//
// It returns (0, error) when the MultiClient holds no clients.
func (mc *MultiClient) UnmapAt(length uint32, offset int64) (int, error) {
	// getNextClient takes an index modulo len(mc.clients); on an empty pool
	// that is an integer division by zero, which panics while the lock is
	// still held. Fail the request with an error instead.
	if len(mc.clients) == 0 {
		return 0, errors.New("no data connection clients available")
	}
	return mc.getNextClient().UnmapAt(length, offset)
}
9 changes: 0 additions & 9 deletions pkg/frontend/ublk/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,23 +226,14 @@ func NewDataProcessorWrapper(rwu types.ReaderWriterUnmapperAt) DataProcessorWrap
}

// ReadAt passes the read of p at offset off straight through to d.rwu and
// returns its result unchanged.
func (d DataProcessorWrapper) ReadAt(p []byte, off int64) (n int, err error) {
return d.rwu.ReadAt(p, off)
}

// WriteAt passes the write of p at offset off straight through to d.rwu and
// returns its result unchanged.
func (d DataProcessorWrapper) WriteAt(p []byte, off int64) (n int, err error) {
return d.rwu.WriteAt(p, off)
}

// UnmapAt passes the unmap request for length bytes at offset off straight
// through to d.rwu and returns its result unchanged.
func (d DataProcessorWrapper) UnmapAt(length uint32, off int64) (n int, err error) {
return d.rwu.UnmapAt(length, off)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Backend interface {
}

type BackendFactory interface {
Create(volumeName, address string, dataServerProtocol DataServerProtocol, engineReplicaTimeout time.Duration) (Backend, error)
Create(volumeName, address string, dataServerProtocol DataServerProtocol, engineReplicaTimeout time.Duration, replicaStreams int) (Backend, error)
}

type Controller interface {
Expand Down

0 comments on commit 5397b06

Please sign in to comment.