Skip to content

Commit

Permalink
conn: rework of monitor mode
Browse files Browse the repository at this point in the history
* read uevent one by one: because syscalls.MSG_PEEK is blocking, the
  monitor stop signal was never trapped without reading a last uevent.

* Allow to set a limit to the number of uevent evaluated successfully,
  after this limit the monitor mode stop watching new uevent.
  • Loading branch information
pilebones committed Jan 26, 2021
1 parent 24add24 commit a3c2a7a
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions netlink/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type NetlinkConn struct {

type UEventConn struct {
NetlinkConn

// Options
MatchedUEventLimit int // allow to stop monitor mode after X event(s) matched by the matcher
}

// Connect allow to connect to system socket AF_NETLINK with family NETLINK_KOBJECT_UEVENT to
Expand Down Expand Up @@ -55,15 +58,15 @@ func (c *UEventConn) Close() error {
return syscall.Close(c.Fd)
}

// ReadMsg allow to read an entire uevent msg
func (c *UEventConn) ReadMsg() (msg []byte, err error) {
func (c *UEventConn) msgPeek() (int, *[]byte, error) {
var n int

var err error
buf := make([]byte, os.Getpagesize())
for {
// Just read how many bytes are available in the socket
// Warning: syscall.MSG_PEEK is a blocking call
if n, _, err = syscall.Recvfrom(c.Fd, buf, syscall.MSG_PEEK); err != nil {
return
return n, &buf, err
}

// If all message could be store inside the buffer : break
Expand All @@ -74,17 +77,37 @@ func (c *UEventConn) ReadMsg() (msg []byte, err error) {
// Increase size of buffer if not enough
buf = make([]byte, len(buf)+os.Getpagesize())
}
return n, &buf, err
}

// Now read complete data
n, _, err = syscall.Recvfrom(c.Fd, buf, 0)
func (c *UEventConn) msgRead(buf *[]byte) error {
if buf == nil {
return errors.New("empty buffer")
}

n, _, err := syscall.Recvfrom(c.Fd, *buf, 0)
if err != nil {
return
return err
}

// Extract only real data from buffer and return that
msg = buf[:n]
*buf = (*buf)[:n]

return
return nil
}

// ReadMsg allow to read an entire uevent msg
func (c *UEventConn) ReadMsg() (msg []byte, err error) {
// Just read how many bytes are available in the socket
_, buf, err := c.msgPeek()
if err != nil {
return nil, err
}

// Now read complete data
err = c.msgRead(buf)

return *buf, err
}

// ReadMsg allow to read an entire uevent msg
Expand All @@ -100,38 +123,55 @@ func (c *UEventConn) ReadUEvent() (*UEvent, error) {
// Monitor run in background a worker to read netlink msg in loop and notify
// when msg receive inside a queue using channel.
// To be notified with only relevant message, use Matcher.
func (c *UEventConn) Monitor(queue chan UEvent, errors chan error, matcher Matcher) chan struct{} {
func (c *UEventConn) Monitor(queue chan UEvent, errs chan error, matcher Matcher) chan struct{} {
quit := make(chan struct{}, 1)

if matcher != nil {
if err := matcher.Compile(); err != nil {
errors <- fmt.Errorf("Wrong matcher, err: %v", err)
errs <- fmt.Errorf("Wrong matcher, err: %w", err)
quit <- struct{}{}
close(queue)
return quit
}
}

go func() {
loop := true
for loop {
bufToRead := make(chan *[]byte, 1)
count := 0
loop:
for {
select {
case <-quit:
loop = false
break
default:
uevent, err := c.ReadUEvent()
break loop // stop iteration in case of stop signal received
case buf := <-bufToRead: // Read one by one
err := c.msgRead(buf)
if err != nil {
errors <- fmt.Errorf("Unable to parse uevent, err: %v", err)
continue
errs <- fmt.Errorf("Unable to read uevent, err: %w", err)
break loop // stop iteration in case of error
}

uevent, err := ParseUEvent(*buf)
if err != nil {
errs <- fmt.Errorf("Unable to parse uevent, err: %w", err)
continue loop // Drop uevent if not known
}

if matcher != nil {
if !matcher.Evaluate(*uevent) {
continue // Drop uevent if not match
continue loop // Drop uevent if not match
}
}

queue <- *uevent
count++
if c.MatchedUEventLimit > 0 && count >= c.MatchedUEventLimit {
break loop // stop iteration when reach limit of uevent
}
default:
_, buf, err := c.msgPeek()
if err != nil {
errs <- fmt.Errorf("Unable to check available uevent, err: %w", err)
break loop // stop iteration in case of error
}
bufToRead <- buf
}
}
}()
Expand Down

0 comments on commit a3c2a7a

Please sign in to comment.