-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
supervisor: fix large opaque data handle #613
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ package supervisor | |
|
||
import ( | ||
"fmt" | ||
"io" | ||
"net" | ||
"os" | ||
"sync" | ||
|
@@ -21,49 +22,44 @@ import ( | |
"github.com/pkg/errors" | ||
|
||
"golang.org/x/net/context" | ||
"golang.org/x/sync/errgroup" | ||
"golang.org/x/sync/semaphore" | ||
"golang.org/x/sys/unix" | ||
) | ||
|
||
const MaxOpaqueLen = 1024 * 32 // Bytes | ||
|
||
// oobSpace is the size of the oob slice required to store multiple FDs. Note | ||
// that unix.UnixRights appears to make the assumption that fd is always int32, | ||
// so sizeof(fd) = 4. | ||
// It can accommodate at most 64 fds. | ||
var oobSpace = unix.CmsgSpace(4) * 64 | ||
|
||
type StatesStorage interface { | ||
// Appended write states to the storage space. | ||
Write([]byte) | ||
// Read out the previously written states to fill `buf` which should be large enough. | ||
Read(buf []byte) (uint, error) | ||
// Mark all data as stale, the previously written data is cleaned | ||
// Save state to storage space. | ||
Save([]byte) | ||
// Load state from storage space. | ||
Load() ([]byte, error) | ||
// Clean the previously saved state. | ||
Clean() | ||
} | ||
|
||
// Store daemon states in memory | ||
type MemStatesStorage struct { | ||
data []byte | ||
head int | ||
} | ||
|
||
func newMemStatesStorage() *MemStatesStorage { | ||
return &MemStatesStorage{data: make([]byte, MaxOpaqueLen)} | ||
return &MemStatesStorage{ | ||
data: []byte{}, | ||
} | ||
} | ||
|
||
func (mss *MemStatesStorage) Write(data []byte) { | ||
l := copy(mss.data[mss.head:], data) | ||
mss.head += l | ||
func (mss *MemStatesStorage) Save(data []byte) { | ||
mss.data = make([]byte, len(data)) | ||
copy(mss.data, data) | ||
} | ||
|
||
func (mss *MemStatesStorage) Read(data []byte) (uint, error) { | ||
l := copy(data, mss.data[:mss.head]) | ||
return uint(l), nil | ||
func (mss *MemStatesStorage) Load() ([]byte, error) { | ||
data := make([]byte, len(mss.data)) | ||
copy(data, mss.data) | ||
return data, nil | ||
} | ||
|
||
func (mss *MemStatesStorage) Clean() { | ||
mss.head = 0 | ||
mss.data = []byte{} | ||
} | ||
|
||
// Use daemon ID as the supervisor ID | ||
|
@@ -88,29 +84,97 @@ func (su *Supervisor) save(data []byte, fd int) { | |
if fd > 0 { | ||
su.fd = fd | ||
} | ||
su.dataStorage.Write(data) | ||
su.dataStorage.Save(data) | ||
} | ||
|
||
// Load resources kept by this supervisor | ||
// 1. daemon runtime states | ||
// 2. file descriptor | ||
// | ||
// Note: the resources should not be consumed. | ||
func (su *Supervisor) load(data []byte, oob []byte) (nData uint, nOob int, err error) { | ||
func (su *Supervisor) load() ([]byte, int, error) { | ||
su.mu.Lock() | ||
defer su.mu.Unlock() | ||
|
||
if su.fd > 0 { | ||
b := syscall.UnixRights(su.fd) | ||
nOob = copy(oob, b) | ||
data, err := su.dataStorage.Load() | ||
if err != nil { | ||
return nil, 0, err | ||
} | ||
|
||
return data, su.fd, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we check if fd is positive? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should be OK, but maybe we don't have to limit it. |
||
} | ||
|
||
func recv(uc *net.UnixConn) ([]byte, int, error) { | ||
data := make([]byte, 0) | ||
oob := make([]byte, 0) | ||
|
||
var dataBufLen = 1024 * 256 // Bytes | ||
|
||
// oobSpace is the size of the oob slice required to store multiple FDs. Note | ||
// that unix.UnixRights appears to make the assumption that fd is always int32, | ||
// so sizeof(fd) = 4. | ||
// It can accommodate at most 64 fds. | ||
var oobSpace = unix.CmsgSpace(4) * 64 | ||
|
||
for { | ||
dataBuf := make([]byte, dataBufLen) | ||
oobBuf := make([]byte, oobSpace) | ||
|
||
n, oobn, _, _, err := uc.ReadMsgUnix(dataBuf, oobBuf) | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} | ||
return nil, 0, errors.Wrap(err, "receive message") | ||
} | ||
if n == 0 { | ||
break // EOF | ||
} | ||
|
||
data = append(data, dataBuf[:n]...) | ||
oob = append(oob, oobBuf[:oobn]...) | ||
} | ||
|
||
scms, err := unix.ParseSocketControlMessage(oob) | ||
if err != nil { | ||
return nil, 0, errors.Wrap(err, "parse control message") | ||
} | ||
|
||
var fds []int | ||
if len(scms) == 0 { | ||
return nil, 0, fmt.Errorf("received no control file descriptor") | ||
} | ||
|
||
nData, err = su.dataStorage.Read(data) | ||
scm := scms[0] | ||
fds, err = unix.ParseUnixRights(&scm) | ||
if err != nil { | ||
return 0, 0, err | ||
return nil, 0, errors.Wrap(err, "extract file descriptors") | ||
} | ||
|
||
return nData, nOob, nil | ||
var fd int | ||
if len(fds) > 0 { | ||
fd = fds[0] | ||
} else { | ||
fd = -1 | ||
} | ||
|
||
return data, fd, nil | ||
} | ||
|
||
func send(uc *net.UnixConn, data []byte, fd int) error { | ||
oob := syscall.UnixRights(fd) | ||
|
||
for len(data) > 0 || len(oob) > 0 { | ||
n, oobn, err := uc.WriteMsgUnix(data, oob, nil) | ||
if err != nil { | ||
return errors.Wrapf(err, "send message, datan %d oobn %d", n, oobn) | ||
} | ||
|
||
data = data[n:] | ||
oob = oob[oobn:] | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// There are several stages from different goroutines to trigger sending daemon states | ||
|
@@ -139,52 +203,16 @@ func (su *Supervisor) waitStatesTimeout(to time.Duration) (func() error, error) | |
if err != nil { | ||
return errors.Wrapf(err, "Listener is closed") | ||
} | ||
|
||
defer conn.Close() | ||
|
||
unixConn := conn.(*net.UnixConn) | ||
uf, err := unixConn.File() | ||
data, fd, err := recv(conn.(*net.UnixConn)) | ||
if err != nil { | ||
return err | ||
} | ||
log.L.Infof("Supervisor %s receives states. data %d", su.id, len(data)) | ||
|
||
defer uf.Close() | ||
|
||
data := make([]byte, MaxOpaqueLen) | ||
oob := make([]byte, oobSpace) // Out-of-band data | ||
|
||
// TODO: Handle EAGAIN EOF and EINTR | ||
n, oobn, _, _, err := unix.Recvmsg(int(uf.Fd()), data, oob, 0) | ||
if err != nil { | ||
return errors.Wrap(err, "receive message") | ||
} | ||
|
||
log.L.Infof("Supervisor %s receives states. data %d oob %d", su.id, n, oobn) | ||
|
||
scms, err := unix.ParseSocketControlMessage(oob[:oobn]) | ||
if err != nil { | ||
return errors.Wrap(err, "parse control message") | ||
} | ||
|
||
var fds []int | ||
if len(scms) > 0 { | ||
scm := scms[0] | ||
fds, err = unix.ParseUnixRights(&scm) | ||
if err != nil { | ||
return errors.Wrap(err, "extract file descriptors") | ||
} | ||
} else { | ||
log.L.Warn("received no control file descriptor") | ||
} | ||
su.save(data, fd) | ||
|
||
var fd int | ||
if len(fds) > 0 { | ||
fd = fds[0] | ||
} else { | ||
fd = -1 | ||
} | ||
|
||
su.save(data[:n], fd) | ||
return nil | ||
} | ||
|
||
|
@@ -240,32 +268,18 @@ func (su *Supervisor) SendStatesTimeout(to time.Duration) error { | |
if err != nil { | ||
return errors.Wrapf(err, "Listener is closed") | ||
} | ||
|
||
defer conn.Close() | ||
|
||
unixConn := conn.(*net.UnixConn) | ||
uf, err := unixConn.File() | ||
if err != nil { | ||
return err | ||
} | ||
defer uf.Close() | ||
|
||
data := make([]byte, MaxOpaqueLen) | ||
oob := make([]byte, oobSpace) | ||
|
||
// FIXME: It's possible that sending states happens before storing state to the storage. | ||
|
||
datan, oobn, err := su.load(data, oob) | ||
data, fd, err := su.load() | ||
if err != nil { | ||
return errors.Wrapf(err, "load resources for %s", su.id) | ||
} | ||
// TODO: validate returned length | ||
_, _, err = unixConn.WriteMsgUnix(data[:datan], oob[:oobn], nil) | ||
if err != nil { | ||
return errors.Wrapf(err, "send message, datan %d oobn %d", datan, oobn) | ||
if err := send(conn.(*net.UnixConn), data, fd); err != nil { | ||
return err | ||
} | ||
|
||
log.L.Infof("Supervisor %s sends states. data %d oob %d", su.id, datan, oobn) | ||
log.L.Infof("Supervisor %s sends states. data %d", su.id, len(data)) | ||
|
||
return nil | ||
} | ||
|
@@ -311,13 +325,19 @@ func (su *Supervisor) FetchDaemonStates(trigger func() error) error { | |
return errors.Wrapf(err, "wait states on %s", su.Sock()) | ||
} | ||
|
||
err = trigger() | ||
if err != nil { | ||
eg := errgroup.Group{} | ||
eg.Go(func() error { | ||
err := trigger() | ||
return errors.Wrapf(err, "trigger on %s", su.Sock()) | ||
} | ||
}) | ||
|
||
eg.Go(func() error { | ||
err := receiver() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why |
||
return errors.Wrapf(err, "receiver on %s", su.Sock()) | ||
}) | ||
|
||
// FIXME: With Timeout context! | ||
return receiver() | ||
return eg.Wait() | ||
} | ||
|
||
// The unix domain socket on which nydus daemon is connected to | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without `head`, the states storage can't support append-write.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, we haven't needed append yet. :)