Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: protect operator detach twice #283

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Push and Pull Request Check

on: [ push ]
on: [ push, pull_request ]

jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.15, "1.20" ]
go: [ 1.15, "1.21" ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
23 changes: 18 additions & 5 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,26 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if isProcessable(c) {
process(c)
}
for !c.isCloseBy(user) && isProcessable(c) {
// `process` must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
break
}
process(c)
}
// Handling callback if connection has been closed.
if !c.IsActive() {
// connection if closed by user when processing, so it needs detach
c.closeCallback(false, true)
if closedBy != none {
// if closed by user when processing, it "may" needs detach
needDetach := closedBy == user
// Here is a conor case that operator will be detached twice:
// If server closed the connection(client OnHup will detach op first and closeBy=poller),
// and then client's OnRequest function also closed the connection(closeBy=user).
// But operator already prevent that detach twice will not cause any problem
c.closeCallback(false, needDetach)
panicked = false
return
}
Expand Down Expand Up @@ -229,7 +242,7 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered.
// PollDetach only happen when user call conn.Close() or poller detect error
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
logger.Printf("NETPOLL: closeCallback[%v,%v] detach operator failed: %v", needLock, needDetach, err)
}
}
var latest = c.closeCallbacks.Load()
Expand Down
9 changes: 5 additions & 4 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func (c *connection) onHup(p Poll) error {
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
var onConnect = c.onConnectCallback.Load()
var onRequest = c.onRequestCallback.Load()
var needCloseByUser = onConnect == nil && onRequest == nil
if !needCloseByUser {
// already PollDetach when call OnHup
c.closeCallback(true, false)
}
Expand Down
98 changes: 98 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,101 @@ func TestParallelShortConnection(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
}

func TestConnectionServerClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

/*
Client Server
- Client --- connect --> Server
- Client <-- [ping] --- Server
- Client --- [pong] --> Server
- Client <-- close --- Server
- Client --- close --> Server
*/
const PING, PONG = "ping", "pong"
var wg sync.WaitGroup
el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PONG)) // pong
Equal(t, string(buf), PONG)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
err = connection.Close()
MustNil(t, err)
return err
},
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
t.Logf("server.OnConnect: addr=%s", connection.RemoteAddr())
defer wg.Done()
// check OnPrepare
v := ctx.Value("prepare").(string)
Equal(t, v, "true")

_, err := connection.Writer().WriteBinary([]byte(PING))
MustNil(t, err)
err = connection.Writer().Flush()
MustNil(t, err)
connection.AddCloseCallback(func(connection Connection) error {
t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
wg.Done()
return nil
})
return ctx
}),
WithOnPrepare(func(connection Connection) context.Context {
t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
defer wg.Done()
return context.WithValue(context.Background(), "prepare", "true")
}),
)
defer el.Shutdown(context.Background())
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
}
}()

var clientOnRequest OnRequest = func(ctx context.Context, connection Connection) error {
t.Logf("client.OnRequest: addr=%s", connection.LocalAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PING))
MustNil(t, err)
Equal(t, string(buf), PING)

_, err = connection.Writer().WriteBinary([]byte(PONG))
MustNil(t, err)
err = connection.Writer().Flush()
MustNil(t, err)

_, err = connection.Reader().Next(1) // server will not send any data, just wait for server close
MustTrue(t, errors.Is(err, ErrEOF)) // should get EOF when server close

return connection.Close()
}
conns := 100
// server: OnPrepare, OnConnect, OnRequest, CloseCallback
// client: OnRequest, CloseCallback
wg.Add(conns * 6)
for i := 0; i < conns; i++ {
go func() {
conn, err := DialConnection("tcp", ":12345", time.Second)
MustNil(t, err)
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
conn.AddCloseCallback(func(connection Connection) error {
t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
defer wg.Done()
return nil
})
}()
}
//time.Sleep(time.Second)
wg.Wait()
}
7 changes: 7 additions & 0 deletions fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ type FDOperator struct {
// poll is the registered location of the file descriptor.
poll Poll

// protect only detach once
detached int32
joway marked this conversation as resolved.
Show resolved Hide resolved

// private, used by operatorCache
next *FDOperator
state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
index int32 // index in operatorCache
}

func (op *FDOperator) Control(event PollEvent) error {
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
return nil
}
return op.poll.Control(op, event)
}

Expand Down Expand Up @@ -92,4 +98,5 @@ func (op *FDOperator) reset() {
op.Inputs, op.InputAck = nil, nil
op.Outputs, op.OutputAck = nil, nil
op.poll = nil
op.detached = 0
}
8 changes: 4 additions & 4 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
}
// here will copy
b.growth(n)
malloc := b.write.malloc
b.write.malloc += n
return copy(b.write.buf[malloc:b.write.malloc], p), nil
buf := b.write.Malloc(n)
return copy(buf, p), nil
}

// WriteDirect cannot be mixed with WriteString or WriteBinary functions.
Expand Down Expand Up @@ -578,7 +577,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
//
// bookSize: The size of data that can be read at once.
// maxSize: The maximum size of data between two Release(). In some cases, this can
// guarantee all data allocated in one node to reduce copy.
//
// guarantee all data allocated in one node to reduce copy.
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
l := cap(b.write.buf) - b.write.malloc
// grow linkBuffer
Expand Down
8 changes: 4 additions & 4 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
}
// here will copy
b.growth(n)
malloc := b.write.malloc
b.write.malloc += n
return copy(b.write.buf[malloc:b.write.malloc], p), nil
buf := b.write.Malloc(n)
joway marked this conversation as resolved.
Show resolved Hide resolved
return copy(buf, p), nil
}

// WriteDirect cannot be mixed with WriteString or WriteBinary functions.
Expand Down Expand Up @@ -622,7 +621,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
//
// bookSize: The size of data that can be read at once.
// maxSize: The maximum size of data between two Release(). In some cases, this can
// guarantee all data allocated in one node to reduce copy.
//
// guarantee all data allocated in one node to reduce copy.
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
b.Lock()
defer b.Unlock()
Expand Down
Loading