Skip to content

Commit

Permalink
fix: detach race
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Aug 7, 2023
1 parent e69450b commit 52fd5fb
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
13 changes: 10 additions & 3 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if c.IsActive() {
c.Close()
} else {
c.closeCallback(false)
c.closeCallback(false, false)
}
}
}()
Expand All @@ -200,7 +200,8 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
}
// Handling callback if connection has been closed.
if !c.IsActive() {
c.closeCallback(false)
// connection if closed by user when processing, so it needs detach
c.closeCallback(false, true)
panicked = false
return
}
Expand All @@ -221,10 +222,16 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered.
// PollDetach only happen when user call conn.Close() or poller detect error
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
return nil
Expand Down
26 changes: 10 additions & 16 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (c *connection) onHup(p Poll) error {
if !c.closeBy(poller) {
return nil
}
// already PollDetach when call OnHup
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
Expand All @@ -37,35 +36,30 @@ func (c *connection) onHup(p Poll) error {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
// already PollDetach when call OnHup
c.closeCallback(true, false)
}
return nil
}

// onClose means close by user.
func (c *connection) onClose() error {
// user code close the connection
if c.closeBy(user) {
// If Close is called during OnPrepare, poll is not registered.
if c.operator.poll != nil {
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
c.triggerRead(Exception(ErrConnClosed, "self close"))
c.triggerWrite(Exception(ErrConnClosed, "self close"))
c.closeCallback(true)
// Detach from poller when processing finished, otherwise it will cause race
c.closeCallback(true, true)
return nil
}

closedByPoller := c.isCloseBy(poller)
// force change closed by user
// closed by poller
// still need to change closing status to `user` since OnProcess should not be processed again
c.force(closing, user)

// If OnRequest is nil, relies on the user to actively close the connection to recycle resources.
if closedByPoller {
c.closeCallback(true)
}
return nil
// user code should actively close the connection to recycle resources.
// poller already detached operator
return c.closeCallback(true, false)
}

// closeBuffer recycle input & output LinkBuffer.
Expand Down

0 comments on commit 52fd5fb

Please sign in to comment.