Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ublk #1067

Closed
wants to merge 1 commit into from
Closed

Ublk #1067

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func ControllerCmd() cli.Command {
Value: int64(controller.DefaultEngineReplicaTimeout.Seconds()),
Usage: "In seconds. Timeout between engine and replica(s)",
},
cli.IntFlag{
Name: "replica-streams",
Required: false,
Value: 1,
Usage: "Number of concurrent streams to each replica",
},
cli.StringFlag{
Name: "data-server-protocol",
Value: "tcp",
Expand All @@ -83,6 +89,12 @@ func ControllerCmd() cli.Command {
Value: 5,
Usage: "HTTP client timeout for replica file sync server",
},
cli.IntFlag{
Name: "frontend-queues",
Required: false,
Value: 1,
Usage: "Number of frontend queues , only available in ublk frontend",
},
},
Action: func(c *cli.Context) {
if err := startController(c); err != nil {
Expand Down Expand Up @@ -114,6 +126,7 @@ func startController(c *cli.Context) error {
dataServerProtocol := c.String("data-server-protocol")
fileSyncHTTPClientTimeout := c.Int("file-sync-http-client-timeout")
engineInstanceName := c.GlobalString("engine-instance-name")
frontendQueues := c.Int("frontend-queues")

size := c.String("size")
if size == "" {
Expand All @@ -138,6 +151,11 @@ func startController(c *cli.Context) error {
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)

replicaStreams := c.Int("replica-streams")
if replicaStreams < 1 {
return errors.New("at least one stream per replica is required")
}

factories := map[string]types.BackendFactory{}
for _, backend := range backends {
switch backend {
Expand All @@ -152,18 +170,18 @@ func startController(c *cli.Context) error {

var frontend types.Frontend
if frontendName != "" {
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout)
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout, frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontendName)
}
frontend = f
}

logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v, engine to replica(s) timeout %v, streams per replica %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout, replicaStreams)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout)
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, replicaStreams, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout, frontendQueues)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
18 changes: 18 additions & 0 deletions package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
zypper -n addrepo --refresh https://download.opensuse.org/repositories/network:/utilities/SLE_15_SP5/network:utilities.repo && \
zypper --gpg-auto-import-keys ref

RUN zypper -n install autoconf automake libtool gcc-c++

Check warning on line 9 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L9

Specify version with `zypper install -y <package>=<version>`. (DL3037)

Check warning on line 9 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L9

`zypper clean` missing after zypper use. (DL3036)

RUN zypper -n install cmake curl git gcc wget xsltproc docbook-xsl-stylesheets && \
rm -rf /var/cache/zypp/*

Expand All @@ -27,6 +29,22 @@
make; \
make install

# Build ubdsrv
ENV UBD_COMMIT_ID 19d3b2133baf1af8ae3a5fe300c962567fb7b0ce
RUN git clone --depth 1 --branch liburing-2.5 https://github.com/axboe/liburing.git /usr/src/liburing && \
cd /usr/src/liburing && \
./configure --cc=gcc --cxx=g++ && \
make -j$(nproc) && \
make installgit clone https://github.com/Kampadais/ubdsrv.git && \
cd ubdsrv && \
git checkout ${UBD_COMMIT_ID} && \
export LIBURING_CFLAGS="-I/usr/include/liburing" && \
export LIBURING_LIBS="-Lusr/lib/pkgconfig -luring"cd /usr/src && \
ls;autoreconf -i && \
./configure && \
make; \
make install

# Install grpc_health_probe
RUN wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.21/grpc_health_probe-linux-${ARCH} -O /usr/local/bin/grpc_health_probe && \
chmod +x /usr/local/bin/grpc_health_probe
Expand Down
5 changes: 2 additions & 3 deletions pkg/backend/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory {
}
}

func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
parts := strings.SplitN(address, "://", 2)

if len(parts) == 2 {
if factory, ok := d.factories[parts[0]]; ok {
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout)
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout, replicaStreams)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (f *Wrapper) ResetRebuild() error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,8 @@ func (r *Remote) info() (*types.ReplicaInfo, error) {
return replicaClient.GetReplicaInfo(resp.Replica), nil
}

func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, replicaStreams int) (types.Backend, error) {
logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol)

controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol)
if err != nil {
return nil, err
Expand All @@ -356,19 +355,29 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
return nil, fmt.Errorf("replica must be closed, cannot add in state: %s", replica.State)
}

conn, err := connect(dataServerProtocol, dataAddress)
if err != nil {
return nil, err
}
var clients []*dataconn.Client
for i := 0; i < replicaStreams; i++ {
conn, err := connect(dataServerProtocol, dataAddress)
if err != nil {
return nil, err
}

dataConnClient := dataconn.NewClient(conn, engineToReplicaTimeout)
r.ReaderWriterUnmapperAt = dataConnClient
dataConnClient := dataconn.NewClient(conn, engineToReplicaTimeout)
clients = append(clients, dataConnClient)
}
if replicaStreams == 1 {
r.ReaderWriterUnmapperAt = clients[0]
} else {
r.ReaderWriterUnmapperAt = dataconn.NewMultiClient(clients)
}

if err := r.open(); err != nil {
return nil, err
}

go r.monitorPing(dataConnClient)
for i := 0; i < replicaStreams; i++ {
go r.monitorPing(clients[i])
}

return r, nil
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Controller struct {
isUpgrade bool
iscsiTargetRequestTimeout time.Duration
engineReplicaTimeout time.Duration
replicaStreams int
DataServerProtocol types.DataServerProtocol

isExpanding bool
Expand All @@ -58,6 +59,8 @@ type Controller struct {
lastExpansionError string

fileSyncHTTPClientTimeout int

frontendQueues int
}

const (
Expand All @@ -67,7 +70,7 @@ const (
)

func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int) *Controller {
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, replicaStreams int, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int, frontendQueues int) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -82,9 +85,11 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro

iscsiTargetRequestTimeout: iscsiTargetRequestTimeout,
engineReplicaTimeout: engineReplicaTimeout,
replicaStreams: replicaStreams,
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
frontendQueues: frontendQueues,
}
c.reset()
c.metricsStart()
Expand Down Expand Up @@ -164,7 +169,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.replicaStreams)
if err != nil {
return err
}
Expand Down Expand Up @@ -484,7 +489,7 @@ func (c *Controller) StartFrontend(frontend string) error {
}
}

f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout)
f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout, c.frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontend)
}
Expand Down Expand Up @@ -728,7 +733,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.replicaStreams)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/init_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/longhorn/longhorn-engine/pkg/frontend/rest"
"github.com/longhorn/longhorn-engine/pkg/frontend/socket"
"github.com/longhorn/longhorn-engine/pkg/frontend/tgt"
"github.com/longhorn/longhorn-engine/pkg/frontend/ublk"
"github.com/longhorn/longhorn-engine/pkg/types"
"github.com/sirupsen/logrus"
)
Expand All @@ -21,7 +22,7 @@ const (
maxEngineReplicaTimeout = 30 * time.Second
)

func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (types.Frontend, error) {
func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration, frontendQueues int) (types.Frontend, error) {
switch frontendType {
case "rest":
return rest.New(), nil
Expand All @@ -31,6 +32,8 @@ func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (
return tgt.New(devtypes.FrontendTGTBlockDev, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case devtypes.FrontendTGTISCSI:
return tgt.New(devtypes.FrontendTGTISCSI, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case "ublk":
return ublk.New(frontendQueues), nil
default:
return nil, fmt.Errorf("unsupported frontend type: %v", frontendType)
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/dataconn/multi_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dataconn

import (
"sync"
)

type MultiClient struct {
lock sync.Mutex
clients []*Client
next int
}

func NewMultiClient(clients []*Client) *MultiClient {
mc := &MultiClient{
clients: clients,
}
return mc
}

func (mc *MultiClient) getNextClient() *Client {
mc.lock.Lock()
mc.next = (mc.next + 1) % len(mc.clients)
index := mc.next
mc.lock.Unlock()
return mc.clients[index]
}

func (mc *MultiClient) ReadAt(buf []byte, offset int64) (int, error) {
return mc.getNextClient().ReadAt(buf, offset)
}

func (mc *MultiClient) WriteAt(buf []byte, offset int64) (int, error) {
return mc.getNextClient().WriteAt(buf, offset)
}

func (mc *MultiClient) UnmapAt(length uint32, offset int64) (int, error) {
return mc.getNextClient().UnmapAt(length, offset)
}
Loading