From 4f4dbcd7ab1ef6e25560539963084998f6d49d2e Mon Sep 17 00:00:00 2001 From: Yan Song Date: Fri, 16 Aug 2024 03:35:16 +0000 Subject: [PATCH] supervisor: fix large opaque data handle When the supervisor needs to process a large amount of opaque data (for example, a nydusd instance has a large number of sub mounts, and the opaque is usually larger than 32kb), a recvmsg / sendmsg system call cannot process all data. The commit fixes the problem with multiple recvmsg/sendmsg calls and updates the test cases. Signed-off-by: Yan Song --- pkg/supervisor/supervisor.go | 202 ++++++++++++++++-------------- pkg/supervisor/supervisor_test.go | 62 +++++---- 2 files changed, 147 insertions(+), 117 deletions(-) diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 2705c9ee5f..e2b6dfe8f6 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -8,6 +8,7 @@ package supervisor import ( "fmt" + "io" "net" "os" "sync" @@ -21,49 +22,44 @@ import ( "github.com/pkg/errors" "golang.org/x/net/context" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "golang.org/x/sys/unix" ) -const MaxOpaqueLen = 1024 * 32 // Bytes - -// oobSpace is the size of the oob slice required to store for multiple FDs. Note -// that unix.UnixRights appears to make the assumption that fd is always int32, -// so sizeof(fd) = 4. -// At most can accommodate 64 fds -var oobSpace = unix.CmsgSpace(4) * 64 - type StatesStorage interface { - // Appended write states to the storage space. - Write([]byte) - // Read out the previously written states to fill `buf` which should be large enough. - Read(buf []byte) (uint, error) - // Mark all data as stale, the previously written data is cleaned + // Save state to storage space. + Save([]byte) + // Load state from storage space. + Load() ([]byte, error) + // Clean the previously saved state. Clean() } // Store daemon states in memory type MemStatesStorage struct { data []byte - head int } func newMemStatesStorage() *MemStatesStorage { - return &MemStatesStorage{data: make([]byte, MaxOpaqueLen)} + return &MemStatesStorage{ + data: []byte{}, + } } -func (mss *MemStatesStorage) Write(data []byte) { - l := copy(mss.data[mss.head:], data) - mss.head += l +func (mss *MemStatesStorage) Save(data []byte) { + mss.data = make([]byte, len(data)) + copy(mss.data, data) } -func (mss *MemStatesStorage) Read(data []byte) (uint, error) { - l := copy(data, mss.data[:mss.head]) - return uint(l), nil +func (mss *MemStatesStorage) Load() ([]byte, error) { + data := make([]byte, len(mss.data)) + copy(data, mss.data) + return data, nil } func (mss *MemStatesStorage) Clean() { - mss.head = 0 + mss.data = []byte{} } // Use daemon ID as the supervisor ID @@ -88,7 +84,7 @@ func (su *Supervisor) save(data []byte, fd int) { if fd > 0 { su.fd = fd } - su.dataStorage.Write(data) + su.dataStorage.Save(data) } // Load resources kept by this supervisor @@ -96,21 +92,89 @@ func (su *Supervisor) save(data []byte, fd int) { // 2. file descriptor // // Note: the resources should be not be consumed. -func (su *Supervisor) load(data []byte, oob []byte) (nData uint, nOob int, err error) { +func (su *Supervisor) load() ([]byte, int, error) { su.mu.Lock() defer su.mu.Unlock() - if su.fd > 0 { - b := syscall.UnixRights(su.fd) - nOob = copy(oob, b) + data, err := su.dataStorage.Load() + if err != nil { + return nil, 0, err + } + + return data, su.fd, nil +} + +func recv(uc *net.UnixConn) ([]byte, int, error) { + data := make([]byte, 0) + oob := make([]byte, 0) + + var dataBufLen = 1024 * 256 // Bytes + + // oobSpace is the size of the oob slice required to store for multiple FDs. Note + // that unix.UnixRights appears to make the assumption that fd is always int32, + // so sizeof(fd) = 4. + // At most can accommodate 64 fds + var oobSpace = unix.CmsgSpace(4) * 64 + + for { + dataBuf := make([]byte, dataBufLen) + oobBuf := make([]byte, oobSpace) + + n, oobn, _, _, err := uc.ReadMsgUnix(dataBuf, oobBuf) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, 0, errors.Wrap(err, "receive message") + } + if n == 0 { + break // EOF + } + + data = append(data, dataBuf[:n]...) + oob = append(oob, oobBuf[:oobn]...) + } + + scms, err := unix.ParseSocketControlMessage(oob) + if err != nil { + return nil, 0, errors.Wrap(err, "parse control message") + } + + var fds []int + if len(scms) == 0 { + return nil, 0, fmt.Errorf("received no control file descriptor") } - nData, err = su.dataStorage.Read(data) + scm := scms[0] + fds, err = unix.ParseUnixRights(&scm) if err != nil { - return 0, 0, err + return nil, 0, errors.Wrap(err, "extract file descriptors") } - return nData, nOob, nil + var fd int + if len(fds) > 0 { + fd = fds[0] + } else { + fd = -1 + } + + return data, fd, nil +} + +func send(uc *net.UnixConn, data []byte, fd int) error { + oob := syscall.UnixRights(fd) + + for len(data) > 0 || len(oob) > 0 { + n, oobn, err := uc.WriteMsgUnix(data, oob, nil) + if err != nil { + return errors.Wrapf(err, "send message, datan %d oobn %d", n, oobn) + } + + data = data[n:] + oob = oob[oobn:] + } + + return nil } // There are several stages from different goroutines to trigger sending daemon states @@ -139,52 +203,16 @@ func (su *Supervisor) waitStatesTimeout(to time.Duration) (func() error, error) if err != nil { return errors.Wrapf(err, "Listener is closed") } - defer conn.Close() - unixConn := conn.(*net.UnixConn) - uf, err := unixConn.File() + data, fd, err := recv(conn.(*net.UnixConn)) if err != nil { return err } + log.L.Infof("Supervisor %s receives states. data %d", su.id, len(data)) - defer uf.Close() - - data := make([]byte, MaxOpaqueLen) - oob := make([]byte, oobSpace) // Out-of-band data - - // TODO: Handle EAGAIN EOF and EINTR - n, oobn, _, _, err := unix.Recvmsg(int(uf.Fd()), data, oob, 0) - if err != nil { - return errors.Wrap(err, "receive message") - } - - log.L.Infof("Supervisor %s receives states. data %d oob %d", su.id, n, oobn) - - scms, err := unix.ParseSocketControlMessage(oob[:oobn]) - if err != nil { - return errors.Wrap(err, "parse control message") - } - - var fds []int - if len(scms) > 0 { - scm := scms[0] - fds, err = unix.ParseUnixRights(&scm) - if err != nil { - return errors.Wrap(err, "extract file descriptors") - } - } else { - log.L.Warn("received no control file descriptor") - } + su.save(data, fd) - var fd int - if len(fds) > 0 { - fd = fds[0] - } else { - fd = -1 - } - - su.save(data[:n], fd) return nil } @@ -240,32 +268,18 @@ func (su *Supervisor) SendStatesTimeout(to time.Duration) error { if err != nil { return errors.Wrapf(err, "Listener is closed") } - defer conn.Close() - unixConn := conn.(*net.UnixConn) - uf, err := unixConn.File() - if err != nil { - return err - } - defer uf.Close() - - data := make([]byte, MaxOpaqueLen) - oob := make([]byte, oobSpace) - // FIXME: It's possible that sending states happens before storing state to the storage. - - datan, oobn, err := su.load(data, oob) + data, fd, err := su.load() if err != nil { return errors.Wrapf(err, "load resources for %s", su.id) } - // TODO: validate returned length - _, _, err = unixConn.WriteMsgUnix(data[:datan], oob[:oobn], nil) - if err != nil { - return errors.Wrapf(err, "send message, datan %d oobn %d", datan, oobn) + if err := send(conn.(*net.UnixConn), data, fd); err != nil { + return err } - log.L.Infof("Supervisor %s sends states. data %d oob %d", su.id, datan, oobn) + log.L.Infof("Supervisor %s sends states. data %d", su.id, len(data)) return nil } @@ -311,13 +325,19 @@ func (su *Supervisor) FetchDaemonStates(trigger func() error) error { return errors.Wrapf(err, "wait states on %s", su.Sock()) } - err = trigger() - if err != nil { + eg := errgroup.Group{} + eg.Go(func() error { + err := trigger() return errors.Wrapf(err, "trigger on %s", su.Sock()) - } + }) + + eg.Go(func() error { + err := receiver() + return errors.Wrapf(err, "receiver on %s", su.Sock()) + }) // FIXME: With Timeout context! - return receiver() + return eg.Wait() } // The unix domain socket on which nydus daemon is connected to diff --git a/pkg/supervisor/supervisor_test.go b/pkg/supervisor/supervisor_test.go index ca35474da1..9993feac4b 100644 --- a/pkg/supervisor/supervisor_test.go +++ b/pkg/supervisor/supervisor_test.go @@ -7,6 +7,7 @@ package supervisor import ( + "crypto/rand" "net" "os" "reflect" @@ -14,7 +15,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "golang.org/x/sys/unix" ) func TestSupervisor(t *testing.T) { @@ -26,49 +26,59 @@ func TestSupervisor(t *testing.T) { }) supervisorSet, err := NewSupervisorSet(rootDir) - assert.Nil(t, err, "%v", err) + assert.Nil(t, err) su1 := supervisorSet.NewSupervisor("su1") assert.NotNil(t, su1) + defer func() { + err = supervisorSet.DestroySupervisor("su1") + assert.NotNil(t, su1) + }() - _, err = su1.waitStatesTimeout(2 * time.Second) - assert.Nil(t, err, "%v", err) sock := su1.Sock() - addr, err := net.ResolveUnixAddr("unix", sock) assert.Nil(t, err) - conn, err := net.DialUnix("unix", nil, addr) - assert.Nil(t, err, "%v", err) - - sentData := []byte("abcde") + // Build a large data to test the multiple recvmsg / sendmsg + // syscalls can handle all the data. + sentData := make([]byte, 1024*1024*2) + _, err = rand.Read(sentData) + assert.Nil(t, err) - sentLen, err := conn.Write(sentData) + tmpFile, err := os.CreateTemp("", "nydus-supervisor-test") assert.Nil(t, err) + defer tmpFile.Close() + defer os.Remove(tmpFile.Name()) - conn.Close() + nydusdSendFd := func() error { + conn, err := net.DialUnix("unix", nil, addr) + assert.Nil(t, err) + defer conn.Close() - // FIXME: Delay for some time until states are stored - time.Sleep(500 * time.Millisecond) + err = send(conn, sentData, int(tmpFile.Fd())) + assert.Nil(t, err) - // Must set length not only capacity - receivedData := make([]byte, 16, 32) - oob := make([]byte, 16, 32) - err = su1.SendStatesTimeout(0) - assert.Nil(t, err, "%v", err) + return nil + } - conn1, err := net.DialUnix("unix", nil, addr) - assert.Nil(t, err, "%v", err) + err = su1.FetchDaemonStates(nydusdSendFd) + assert.NoError(t, err) - f, _ := conn1.File() + nydusdTakeover := func() { + err = su1.SendStatesTimeout(0) + assert.Nil(t, err) - //nolint:dogsled - receivedLen, _, _, _, err := unix.Recvmsg(int(f.Fd()), receivedData, oob, 0) - assert.Nil(t, err) + conn, err := net.DialUnix("unix", nil, addr) + assert.Nil(t, err) + + recvData, _, err := recv(conn) + assert.Nil(t, err) - assert.Equal(t, sentLen, receivedLen) - assert.True(t, reflect.DeepEqual(receivedData[:receivedLen], sentData), "%v", receivedData) + assert.Equal(t, len(sentData), len(recvData)) + assert.True(t, reflect.DeepEqual(recvData, sentData)) + } + nydusdTakeover() } func TestSupervisorTimeout(t *testing.T) {