Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: OnRequest should wait all readable data consumed when sender close connection #278

Merged
merged 3 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: "1.20"

- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -66,7 +66,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -80,4 +80,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
26 changes: 13 additions & 13 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Push and Pull Request Check

on: [ push, pull_request ]
on: [ push ]

jobs:
compatibility-test:
Expand All @@ -15,12 +15,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
Expand All @@ -33,12 +33,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: "1.20"
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Build Test
run: go vet -v ./...
style-test:
Expand Down
66 changes: 37 additions & 29 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type connection struct {
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan struct{}
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
Expand Down Expand Up @@ -319,7 +319,7 @@ var barrierPool = sync.Pool{
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan struct{}, 1)
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
Expand Down Expand Up @@ -357,19 +357,12 @@ func (c *connection) initNetFD(conn Conn) {
}

func (c *connection) initFDOperator() {
var op *FDOperator
if c.pd != nil && c.pd.operator != nil {
// reuse operator created at connect step
op = c.pd.operator
} else {
poll := pollmanager.Pick()
op = poll.Alloc()
}
poll := pollmanager.Pick()
op := poll.Alloc()
op.FD = c.fd
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck

c.operator = op
}

Expand All @@ -385,9 +378,9 @@ func (c *connection) initFinalizer() {
})
}

func (c *connection) triggerRead() {
func (c *connection) triggerRead(err error) {
select {
case c.readTrigger <- struct{}{}:
case c.readTrigger <- err:
default:
}
}
Expand All @@ -411,10 +404,17 @@ func (c *connection) waitRead(n int) (err error) {
}
// wait full n
for c.inputBuffer.Len() < n {
if !c.IsActive() {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
<-c.readTrigger
}
return nil
}
Expand All @@ -429,24 +429,32 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
}

for c.inputBuffer.Len() < n {
if !c.IsActive() {
// cannot return directly, stop timer before !
switch c.status(closing) {
case poller:
// cannot return directly, stop timer first!
err = Exception(ErrEOF, "wait read")
goto RET
case user:
// cannot return directly, stop timer first!
err = Exception(ErrConnClosed, "wait read")
break
}

select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
goto RET
default:
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
}
continue
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case <-c.readTrigger:
continue
}
}

RET:
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
Expand Down
10 changes: 9 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"sync/atomic"
)

type who int32
type who = int32

const (
none who = iota
Expand Down Expand Up @@ -65,6 +65,14 @@ func (l *locker) isCloseBy(w who) (yes bool) {
return atomic.LoadInt32(&l.keychain[closing]) == int32(w)
}

func (l *locker) status(k key) int32 {
return atomic.LoadInt32(&l.keychain[k])
}

func (l *locker) force(k key, v int32) {
atomic.StoreInt32(&l.keychain[k], v)
}

func (l *locker) lock(k key) (success bool) {
return atomic.CompareAndSwapInt32(&l.keychain[k], 0, 1)
}
Expand Down
17 changes: 2 additions & 15 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if isProcessable(c) {
process(c)
}
for c.IsActive() && isProcessable(c) {
for !c.isCloseBy(user) && isProcessable(c) {
process(c)
}
// Handling callback if connection has been closed.
Expand Down Expand Up @@ -225,12 +225,6 @@ func (c *connection) closeCallback(needLock bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.isCloseBy(user) && c.operator.poll != nil {
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
return nil
Expand All @@ -243,14 +237,7 @@ func (c *connection) closeCallback(needLock bool) (err error) {

// register only use for connection register into poll.
func (c *connection) register() (err error) {
if c.operator.isUnused() {
// operator is not registered
err = c.operator.Control(PollReadable)
} else {
// operator is already registered
// change event to wait read new data
err = c.operator.Control(PollModReadable)
}
err = c.operator.Control(PollReadable)
if err != nil {
logger.Printf("NETPOLL: connection register failed: %v", err)
c.Close()
Expand Down
46 changes: 29 additions & 17 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,44 @@ import (

// onHup means close by poller.
func (c *connection) onHup(p Poll) error {
if c.closeBy(poller) {
c.triggerRead()
c.triggerWrite(ErrConnClosed)
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
}
if !c.closeBy(poller) {
return nil
}
// already PollDetach when call OnHup
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
}
return nil
}

// onClose means close by user.
func (c *connection) onClose() error {
if c.closeBy(user) {
c.triggerRead()
c.triggerWrite(ErrConnClosed)
// If Close is called during OnPrepare, poll is not registered.
if c.operator.poll != nil {
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
c.triggerRead(Exception(ErrConnClosed, "self close"))
c.triggerWrite(Exception(ErrConnClosed, "self close"))
c.closeCallback(true)
return nil
}
if c.isCloseBy(poller) {
// Connection with OnRequest of nil
// relies on the user to actively close the connection to recycle resources.

closedByPoller := c.isCloseBy(poller)
// force change closed by user
c.force(closing, user)
jayantxie marked this conversation as resolved.
Show resolved Hide resolved

// If OnRequest is nil, relies on the user to actively close the connection to recycle resources.
if closedByPoller {
c.closeCallback(true)
}
return nil
Expand Down Expand Up @@ -103,7 +115,7 @@ func (c *connection) inputAck(n int) (err error) {
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead()
c.triggerRead(nil)
}
return nil
}
Expand Down
Loading
Loading