diff --git a/app/cmd/controller.go b/app/cmd/controller.go index 4bf0784a7..f88f6f146 100644 --- a/app/cmd/controller.go +++ b/app/cmd/controller.go @@ -92,6 +92,12 @@ func ControllerCmd() cli.Command { Name: "snapshot-max-size", Usage: "Maximum total snapshot size in bytes or human readable 42kb, 42mb, 42gb", }, + cli.IntFlag{ + Name: "frontend-queues", + Required: false, + Value: 1, + Usage: "Number of frontend queues , only available in ublk frontend", + }, }, Action: func(c *cli.Context) { if err := startController(c); err != nil { @@ -123,6 +129,7 @@ func startController(c *cli.Context) error { dataServerProtocol := c.String("data-server-protocol") fileSyncHTTPClientTimeout := c.Int("file-sync-http-client-timeout") engineInstanceName := c.GlobalString("engine-instance-name") + frontendQueues := c.Int("frontend-queues") size := c.String("size") if size == "" { @@ -175,7 +182,7 @@ func startController(c *cli.Context) error { var frontend types.Frontend if frontendName != "" { - f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout) + f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout, frontendQueues) if err != nil { return errors.Wrapf(err, "failed to find frontend: %s", frontendName) } @@ -195,7 +202,7 @@ func startController(c *cli.Context) error { control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort, engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout, - snapshotMaxCount, snapshotMaxSize) + snapshotMaxCount, snapshotMaxSize, frontendQueues) // need to wait for Shutdown() completion control.ShutdownWG.Add(1) diff --git a/package/Dockerfile b/package/Dockerfile index 0b59cab8d..6e60a1195 100644 --- a/package/Dockerfile +++ b/package/Dockerfile @@ -9,6 +9,8 @@ RUN zypper -n addrepo --refresh https://download.opensuse.org/repositories/syste zypper -n addrepo --refresh https://download.opensuse.org/repositories/network:/utilities/SLE_15_SP5/network:utilities.repo && \ zypper --gpg-auto-import-keys ref +RUN zypper -n install autoconf automake libtool gcc-c++ + RUN zypper -n install cmake curl git gcc wget xsltproc docbook-xsl-stylesheets && \ rm -rf /var/cache/zypp/* @@ -30,6 +32,22 @@ RUN cd /usr/src && \ make; \ make install +# Build ubdsrv +ENV UBD_COMMIT_ID 19d3b2133baf1af8ae3a5fe300c962567fb7b0ce +RUN git clone --depth 1 --branch liburing-2.5 https://github.com/axboe/liburing.git /usr/src/liburing && \ + cd /usr/src/liburing && \ + ./configure --cc=gcc --cxx=g++ && \ + make -j$(nproc) && \ + make installgit clone https://github.com/Kampadais/ubdsrv.git && \ + cd ubdsrv && \ + git checkout ${UBD_COMMIT_ID} && \ + export LIBURING_CFLAGS="-I/usr/include/liburing" && \ + export LIBURING_LIBS="-Lusr/lib/pkgconfig -luring"cd /usr/src && \ + ls;autoreconf -i && \ + ./configure && \ + make; \ + make install + # Install grpc_health_probe RUN wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.28/grpc_health_probe-linux-${ARCH} -O /usr/local/bin/grpc_health_probe && \ chmod +x /usr/local/bin/grpc_health_probe diff --git a/pkg/controller/control.go b/pkg/controller/control.go index 3ea9da0b0..ab8fcc20c 100644 --- a/pkg/controller/control.go +++ b/pkg/controller/control.go @@ -66,6 +66,8 @@ type Controller struct { lastExpansionError string fileSyncHTTPClientTimeout int + + frontendQueues int } const ( @@ -75,7 +77,7 @@ const ( func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort, engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout, - snapshotMaxCount int, snapshotMaxSize int64) *Controller { + snapshotMaxCount int, snapshotMaxSize int64, frontendQueues int) *Controller { c := &Controller{ factory: factory, VolumeName: name, @@ -95,6 +97,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro DataServerProtocol: dataServerProtocol, fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout, + frontendQueues: frontendQueues, } c.reset() c.metricsStart() @@ -578,7 +581,7 @@ func (c *Controller) StartFrontend(frontend string) error { } } - f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout) + f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout, c.frontendQueues) if err != nil { return errors.Wrapf(err, "failed to find frontend: %s", frontend) } diff --git a/pkg/controller/init_frontend.go b/pkg/controller/init_frontend.go index f17ad25e5..b254c1913 100644 --- a/pkg/controller/init_frontend.go +++ b/pkg/controller/init_frontend.go @@ -8,6 +8,7 @@ import ( "github.com/longhorn/longhorn-engine/pkg/frontend/rest" "github.com/longhorn/longhorn-engine/pkg/frontend/socket" "github.com/longhorn/longhorn-engine/pkg/frontend/tgt" + "github.com/longhorn/longhorn-engine/pkg/frontend/ublk" "github.com/longhorn/longhorn-engine/pkg/types" "github.com/sirupsen/logrus" ) @@ -21,7 +22,7 @@ const ( maxEngineReplicaTimeout = 30 * time.Second ) -func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (types.Frontend, error) { +func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration, frontendQueues int) (types.Frontend, error) { switch frontendType { case "rest": return rest.New(), nil @@ -31,6 +32,8 @@ func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) ( return tgt.New(devtypes.FrontendTGTBlockDev, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil case devtypes.FrontendTGTISCSI: return tgt.New(devtypes.FrontendTGTISCSI, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil + case "ublk": + return ublk.New(frontendQueues), nil default: return nil, fmt.Errorf("unsupported frontend type: %v", frontendType) } diff --git a/pkg/frontend/ublk/frontend.go b/pkg/frontend/ublk/frontend.go new file mode 100644 index 000000000..0c0887404 --- /dev/null +++ b/pkg/frontend/ublk/frontend.go @@ -0,0 +1,250 @@ +package ublk + +import ( + "encoding/json" + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "net" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + + "github.com/longhorn/longhorn-engine/pkg/dataconn" + "github.com/longhorn/longhorn-engine/pkg/types" +) + +const ( + frontendName = "ublk" + + SocketDirectory = "/var/run" + DevPath = "/dev/longhorn/" +) + +func New(frontendQueues int) *Ublk { + return &Ublk{Queues: frontendQueues} +} + +type Ublk struct { + Volume string + Size int64 + UblkID int + Queues int + QueueDepth int + BlockSize int + DaemonPId int + + isUp bool + socketPath string + socketServer *dataconn.Server +} + +func (u *Ublk) FrontendName() string { + return frontendName +} + +func (u *Ublk) Init(name string, size, sectorSize int64) error { + u.Volume = name + u.Size = size + + return nil +} + +func (u *Ublk) StartUblk() error { + + command := "add" + args := []string{"-t", "longhorn", "-f", u.socketPath, "-s", strconv.FormatInt(u.Size, 10), "-d", "32", "-q", strconv.Itoa(u.Queues)} + + cmd := exec.Command("ublk", append([]string{command}, args...)...) + + output, err := cmd.CombinedOutput() + if err != nil { + logrus.Error("Error starting ublk:", err) + return nil + } + + logrus.Info("ublk started successfully") + + var jsonOutput map[string]interface{} + err = json.Unmarshal(output, &jsonOutput) + + if err != nil { + return err + } + + u.UblkID = int(jsonOutput["dev_id"].(float64)) + u.DaemonPId = int(jsonOutput["daemon_pid"].(float64)) + u.Queues = int(jsonOutput["nr_hw_queues"].(float64)) + u.QueueDepth = int(jsonOutput["queue_depth"].(float64)) + u.BlockSize = int(jsonOutput["block_size"].(float64)) + + u.isUp = true + return nil +} + +func (u *Ublk) Startup(rwu types.ReaderWriterUnmapperAt) error { + if err := u.startSocketServer(rwu); err != nil { + return err + } + go func() { + err := u.StartUblk() + if err != nil { + logrus.Errorf("Failed to start ublk: %v", err) + } + }() + + return nil +} +func (u *Ublk) ShutdownUblk() { + comm := "ublk" + args := []string{"del", strconv.Itoa(u.UblkID)} + + cmd := exec.Command(comm, args...) + logrus.Infof("Running command: %v", cmd.Args) + output, err := cmd.CombinedOutput() + if err != nil { + logrus.Errorf("Error stopping ublk: %v", err) + return + } + logrus.Infof("ublk stopped successfully: %v", string(output)) +} + +func (u *Ublk) Shutdown() error { + _, file, no, ok := runtime.Caller(1) + if ok { + logrus.Infof("\ncalled from %s#%d\n\n", file, no) + } + if u.Volume != "" { + if u.socketServer != nil { + logrus.Infof("Shutting down TGT socket server for %v", u.Volume) + u.socketServer.Stop() + u.socketServer = nil + } + } + u.isUp = false + + go func() { + u.ShutdownUblk() + }() + + return nil +} + +func (u *Ublk) State() types.State { + if u.isUp { + return types.StateUp + } + return types.StateDown +} + +func (u *Ublk) Endpoint() string { + if u.isUp { + return u.GetSocketPath() + } + return "" +} + +func (u *Ublk) GetSocketPath() string { + if u.Volume == "" { + panic("Invalid volume name") + } + return filepath.Join(SocketDirectory, "longhorn-"+u.Volume+".sock") +} + +func (u *Ublk) startSocketServer(rwu types.ReaderWriterUnmapperAt) error { + socketPath := u.GetSocketPath() + if err := os.MkdirAll(filepath.Dir(socketPath), 0700); err != nil { + return errors.Wrapf(err, "cannot create directory %v", filepath.Dir(socketPath)) + } + + if st, err := os.Stat(socketPath); err == nil && !st.IsDir() { + if err := os.Remove(socketPath); err != nil { + return err + } + } + + u.socketPath = socketPath + go func() { + err := u.startSocketServerListen(rwu) + if err != nil { + logrus.Errorf("Failed to start socket server: %v", err) + } + }() + return nil +} + +func (u *Ublk) startSocketServerListen(rwu types.ReaderWriterUnmapperAt) error { + ln, err := net.Listen("unix", u.socketPath) + if err != nil { + return err + } + defer func(ln net.Listener) { + err := ln.Close() + if err != nil { + logrus.WithError(err).Error("Failed to close socket listener") + } + }(ln) + + for { + conn, err := ln.Accept() + if err != nil { + logrus.WithError(err).Error("Failed to accept socket connection") + continue + } + go u.handleServerConnection(conn, rwu) + } +} + +func (u *Ublk) handleServerConnection(c net.Conn, rwu types.ReaderWriterUnmapperAt) { + defer func(c net.Conn) { + err := c.Close() + if err != nil { + logrus.WithError(err).Error("Failed to close socket server connection") + } + }(c) + + server := dataconn.NewServer(c, NewDataProcessorWrapper(rwu)) + logrus.Info("New data socket connection established") + if err := server.Handle(); err != nil && err != io.EOF { + logrus.WithError(err).Errorf("Failed to handle socket server connection") + } else if err == io.EOF { + logrus.Warn("Socket server connection closed") + } +} + +type DataProcessorWrapper struct { + rwu types.ReaderWriterUnmapperAt +} + +func NewDataProcessorWrapper(rwu types.ReaderWriterUnmapperAt) DataProcessorWrapper { + return DataProcessorWrapper{ + rwu: rwu, + } +} + +func (d DataProcessorWrapper) ReadAt(p []byte, off int64) (n int, err error) { + return d.rwu.ReadAt(p, off) +} + +func (d DataProcessorWrapper) WriteAt(p []byte, off int64) (n int, err error) { + return d.rwu.WriteAt(p, off) +} + +func (d DataProcessorWrapper) UnmapAt(length uint32, off int64) (n int, err error) { + return d.rwu.UnmapAt(length, off) +} + +func (d DataProcessorWrapper) PingResponse() error { + return nil +} + +func (u *Ublk) Upgrade(name string, size, sectorSize int64, rwu types.ReaderWriterUnmapperAt) error { + return fmt.Errorf("upgrade is not supported") +} + +func (u *Ublk) Expand(size int64) error { + return fmt.Errorf("expand is not supported") +}