Skip to content

Commit

Permalink
Add ublk frontend target
Browse files Browse the repository at this point in the history
Signed-off-by: kampadais <[email protected]>
  • Loading branch information
Kampadais committed Oct 9, 2024
1 parent eedf985 commit 4eb4d57
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 5 deletions.
11 changes: 9 additions & 2 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func ControllerCmd() cli.Command {
Name: "snapshot-max-size",
Usage: "Maximum total snapshot size in bytes or human readable 42kb, 42mb, 42gb",
},
cli.IntFlag{
Name: "frontend-queues",
Required: false,
Value: 1,
Usage: "Number of frontend queues , only available in ublk frontend",
},
},
Action: func(c *cli.Context) {
if err := startController(c); err != nil {
Expand Down Expand Up @@ -123,6 +129,7 @@ func startController(c *cli.Context) error {
dataServerProtocol := c.String("data-server-protocol")
fileSyncHTTPClientTimeout := c.Int("file-sync-http-client-timeout")
engineInstanceName := c.GlobalString("engine-instance-name")
frontendQueues := c.Int("frontend-queues")

size := c.String("size")
if size == "" {
Expand Down Expand Up @@ -175,7 +182,7 @@ func startController(c *cli.Context) error {

var frontend types.Frontend
if frontendName != "" {
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout)
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout, frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontendName)
}
Expand All @@ -195,7 +202,7 @@ func startController(c *cli.Context) error {
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout,
snapshotMaxCount, snapshotMaxSize)
snapshotMaxCount, snapshotMaxSize, frontendQueues)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
18 changes: 18 additions & 0 deletions package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ RUN zypper -n addrepo --refresh https://download.opensuse.org/repositories/syste
zypper -n addrepo --refresh https://download.opensuse.org/repositories/network:/utilities/SLE_15_SP5/network:utilities.repo && \
zypper --gpg-auto-import-keys ref

RUN zypper -n install autoconf automake libtool gcc-c++

Check warning on line 12 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L12

Specify version with `zypper install -y <package>=<version>`. (DL3037)

Check warning on line 12 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L12

`zypper clean` missing after zypper use. (DL3036)

RUN zypper -n install cmake curl git gcc wget xsltproc docbook-xsl-stylesheets && \
rm -rf /var/cache/zypp/*

Expand All @@ -30,6 +32,22 @@ RUN cd /usr/src && \
make; \
make install

# Build ubdsrv
ENV UBD_COMMIT_ID 19d3b2133baf1af8ae3a5fe300c962567fb7b0ce
RUN git clone --depth 1 --branch liburing-2.5 https://github.com/axboe/liburing.git /usr/src/liburing && \
cd /usr/src/liburing && \
./configure --cc=gcc --cxx=g++ && \
make -j$(nproc) && \
make installgit clone https://github.com/Kampadais/ubdsrv.git && \
cd ubdsrv && \
git checkout ${UBD_COMMIT_ID} && \
export LIBURING_CFLAGS="-I/usr/include/liburing" && \
export LIBURING_LIBS="-Lusr/lib/pkgconfig -luring"cd /usr/src && \
ls;autoreconf -i && \
./configure && \
make; \
make install

# Install grpc_health_probe
RUN wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.28/grpc_health_probe-linux-${ARCH} -O /usr/local/bin/grpc_health_probe && \
chmod +x /usr/local/bin/grpc_health_probe
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Controller struct {
lastExpansionError string

fileSyncHTTPClientTimeout int

frontendQueues int
}

const (
Expand All @@ -75,7 +77,7 @@ const (
func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout,
snapshotMaxCount int, snapshotMaxSize int64) *Controller {
snapshotMaxCount int, snapshotMaxSize int64, frontendQueues int) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -95,6 +97,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
frontendQueues: frontendQueues,
}
c.reset()
c.metricsStart()
Expand Down Expand Up @@ -578,7 +581,7 @@ func (c *Controller) StartFrontend(frontend string) error {
}
}

f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout)
f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout, c.frontendQueues)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontend)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/init_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/longhorn/longhorn-engine/pkg/frontend/rest"
"github.com/longhorn/longhorn-engine/pkg/frontend/socket"
"github.com/longhorn/longhorn-engine/pkg/frontend/tgt"
"github.com/longhorn/longhorn-engine/pkg/frontend/ublk"
"github.com/longhorn/longhorn-engine/pkg/types"
"github.com/sirupsen/logrus"
)
Expand All @@ -21,7 +22,7 @@ const (
maxEngineReplicaTimeout = 30 * time.Second
)

func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (types.Frontend, error) {
func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration, frontendQueues int) (types.Frontend, error) {
switch frontendType {
case "rest":
return rest.New(), nil
Expand All @@ -31,6 +32,8 @@ func NewFrontend(frontendType string, iscsiTargetRequestTimeout time.Duration) (
return tgt.New(devtypes.FrontendTGTBlockDev, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case devtypes.FrontendTGTISCSI:
return tgt.New(devtypes.FrontendTGTISCSI, defaultScsiTimeout, defaultIscsiAbortTimeout, iscsiTargetRequestTimeout), nil
case "ublk":
return ublk.New(frontendQueues), nil
default:
return nil, fmt.Errorf("unsupported frontend type: %v", frontendType)
}
Expand Down
250 changes: 250 additions & 0 deletions pkg/frontend/ublk/frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package ublk

import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"

"github.com/longhorn/longhorn-engine/pkg/dataconn"
"github.com/longhorn/longhorn-engine/pkg/types"
)

const (
frontendName = "ublk"

SocketDirectory = "/var/run"
DevPath = "/dev/longhorn/"
)

func New(frontendQueues int) *Ublk {
return &Ublk{Queues: frontendQueues}
}

type Ublk struct {
Volume string
Size int64
UblkID int
Queues int
QueueDepth int
BlockSize int
DaemonPId int

isUp bool
socketPath string
socketServer *dataconn.Server
}

func (u *Ublk) FrontendName() string {
return frontendName
}

func (u *Ublk) Init(name string, size, sectorSize int64) error {
u.Volume = name
u.Size = size

return nil
}

func (u *Ublk) StartUblk() error {

command := "add"
args := []string{"-t", "longhorn", "-f", u.socketPath, "-s", strconv.FormatInt(u.Size, 10), "-d", "32", "-q", strconv.Itoa(u.Queues)}

cmd := exec.Command("ublk", append([]string{command}, args...)...)

output, err := cmd.CombinedOutput()
if err != nil {
logrus.Error("Error starting ublk:", err)
return nil
}

logrus.Info("ublk started successfully")

var jsonOutput map[string]interface{}
err = json.Unmarshal(output, &jsonOutput)

if err != nil {
return err
}

u.UblkID = int(jsonOutput["dev_id"].(float64))
u.DaemonPId = int(jsonOutput["daemon_pid"].(float64))
u.Queues = int(jsonOutput["nr_hw_queues"].(float64))
u.QueueDepth = int(jsonOutput["queue_depth"].(float64))
u.BlockSize = int(jsonOutput["block_size"].(float64))

u.isUp = true
return nil
}

func (u *Ublk) Startup(rwu types.ReaderWriterUnmapperAt) error {
if err := u.startSocketServer(rwu); err != nil {
return err
}
go func() {
err := u.StartUblk()
if err != nil {
logrus.Errorf("Failed to start ublk: %v", err)
}
}()

return nil
}
func (u *Ublk) ShutdownUblk() {
comm := "ublk"
args := []string{"del", strconv.Itoa(u.UblkID)}

cmd := exec.Command(comm, args...)
logrus.Infof("Running command: %v", cmd.Args)
output, err := cmd.CombinedOutput()
if err != nil {
logrus.Errorf("Error stopping ublk: %v", err)
return
}
logrus.Infof("ublk stopped successfully: %v", string(output))
}

func (u *Ublk) Shutdown() error {
_, file, no, ok := runtime.Caller(1)
if ok {
logrus.Infof("\ncalled from %s#%d\n\n", file, no)
}
if u.Volume != "" {
if u.socketServer != nil {
logrus.Infof("Shutting down TGT socket server for %v", u.Volume)
u.socketServer.Stop()
u.socketServer = nil
}
}
u.isUp = false

go func() {
u.ShutdownUblk()
}()

return nil
}

func (u *Ublk) State() types.State {
if u.isUp {
return types.StateUp
}
return types.StateDown
}

func (u *Ublk) Endpoint() string {
if u.isUp {
return u.GetSocketPath()
}
return ""
}

func (u *Ublk) GetSocketPath() string {
if u.Volume == "" {
panic("Invalid volume name")
}
return filepath.Join(SocketDirectory, "longhorn-"+u.Volume+".sock")
}

func (u *Ublk) startSocketServer(rwu types.ReaderWriterUnmapperAt) error {
socketPath := u.GetSocketPath()
if err := os.MkdirAll(filepath.Dir(socketPath), 0700); err != nil {
return errors.Wrapf(err, "cannot create directory %v", filepath.Dir(socketPath))
}

if st, err := os.Stat(socketPath); err == nil && !st.IsDir() {
if err := os.Remove(socketPath); err != nil {
return err
}
}

u.socketPath = socketPath
go func() {
err := u.startSocketServerListen(rwu)
if err != nil {
logrus.Errorf("Failed to start socket server: %v", err)
}
}()
return nil
}

func (u *Ublk) startSocketServerListen(rwu types.ReaderWriterUnmapperAt) error {
ln, err := net.Listen("unix", u.socketPath)
if err != nil {
return err
}
defer func(ln net.Listener) {
err := ln.Close()
if err != nil {
logrus.WithError(err).Error("Failed to close socket listener")
}
}(ln)

for {
conn, err := ln.Accept()
if err != nil {
logrus.WithError(err).Error("Failed to accept socket connection")
continue
}
go u.handleServerConnection(conn, rwu)
}
}

func (u *Ublk) handleServerConnection(c net.Conn, rwu types.ReaderWriterUnmapperAt) {
defer func(c net.Conn) {
err := c.Close()
if err != nil {
logrus.WithError(err).Error("Failed to close socket server connection")
}
}(c)

server := dataconn.NewServer(c, NewDataProcessorWrapper(rwu))
logrus.Info("New data socket connection established")
if err := server.Handle(); err != nil && err != io.EOF {
logrus.WithError(err).Errorf("Failed to handle socket server connection")
} else if err == io.EOF {
logrus.Warn("Socket server connection closed")
}
}

type DataProcessorWrapper struct {
rwu types.ReaderWriterUnmapperAt
}

func NewDataProcessorWrapper(rwu types.ReaderWriterUnmapperAt) DataProcessorWrapper {
return DataProcessorWrapper{
rwu: rwu,
}
}

func (d DataProcessorWrapper) ReadAt(p []byte, off int64) (n int, err error) {
return d.rwu.ReadAt(p, off)
}

func (d DataProcessorWrapper) WriteAt(p []byte, off int64) (n int, err error) {
return d.rwu.WriteAt(p, off)
}

func (d DataProcessorWrapper) UnmapAt(length uint32, off int64) (n int, err error) {
return d.rwu.UnmapAt(length, off)
}

func (d DataProcessorWrapper) PingResponse() error {
return nil
}

func (u *Ublk) Upgrade(name string, size, sectorSize int64, rwu types.ReaderWriterUnmapperAt) error {
return fmt.Errorf("upgrade is not supported")
}

func (u *Ublk) Expand(size int64) error {
return fmt.Errorf("expand is not supported")
}

0 comments on commit 4eb4d57

Please sign in to comment.