chore: release v0.6.4 #358

Merged · 4 commits · Sep 5, 2024
26 changes: 22 additions & 4 deletions connection_impl.go
@@ -24,8 +24,14 @@ import (
"time"
)

type connState = int32

const (
defaultZeroCopyTimeoutSec = 60

connStateNone = 0
connStateConnected = 1
connStateDisconnected = 2
)

// connection is the implementation of Connection
@@ -45,9 +51,9 @@ type connection struct {
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially.
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state connState // Connection state should be changed sequentially.
}

var (
@@ -333,7 +339,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0
c.state = connStateNone

c.initNetFD(conn) // conn must be *netFD{}
c.initFDOperator()
@@ -536,3 +542,15 @@ func (c *connection) waitFlush() (err error) {
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}

func (c *connection) getState() connState {
return atomic.LoadInt32(&c.state)
}

func (c *connection) setState(newState connState) {
atomic.StoreInt32(&c.state, newState)
}

func (c *connection) changeState(from, to connState) bool {
return atomic.CompareAndSwapInt32(&c.state, from, to)
}
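The three helpers above are thin wrappers around `sync/atomic`; in particular the CAS-based `changeState` is what lets the event hooks fire at most once even when goroutines race. A minimal standalone sketch of that pattern (the `conn` type and the transitions in `main` are illustrative stand-ins, not netpoll's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Illustrative copies of the PR's state values; connState is an int32 alias
// so it can be passed directly to the sync/atomic functions.
type connState = int32

const (
	connStateNone         connState = 0
	connStateConnected    connState = 1
	connStateDisconnected connState = 2
)

type conn struct{ state connState }

func (c *conn) changeState(from, to connState) bool {
	return atomic.CompareAndSwapInt32(&c.state, from, to)
}

func main() {
	c := &conn{}
	// Only the first transition succeeds, so an onConnect-style callback
	// guarded by it would run exactly once even if two goroutines race.
	fmt.Println(c.changeState(connStateNone, connStateConnected))         // true
	fmt.Println(c.changeState(connStateNone, connStateConnected))         // false
	// An onDisconnect-style callback only runs if Connected was reached.
	fmt.Println(c.changeState(connStateConnected, connStateDisconnected)) // true
}
```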
113 changes: 45 additions & 68 deletions connection_onevent.go
@@ -134,43 +134,15 @@ func (c *connection) onPrepare(opts *options) (err error) {
func (c *connection) onConnect() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
atomic.StoreInt32(&c.state, 1)
c.changeState(connStateNone, connStateConnected)
return
}
if !c.lock(connecting) {
// it never happens because onDisconnect will not lock connecting if c.connected == 0
return
}
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
// if onConnect not called
if atomic.LoadInt32(&c.state) == 0 {
return true
}
// check for onRequest
return onRequest != nil && c.Reader().Len() > 0
},
func(c *connection) {
if atomic.CompareAndSwapInt32(&c.state, 0, 1) {
c.ctx = onConnect(c.ctx, c)

if !c.IsActive() && atomic.CompareAndSwapInt32(&c.state, 1, 2) {
// since we hold connecting lock, so we should help to call onDisconnect here
var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect != nil {
onDisconnect(c.ctx, c)
}
}
c.unlock(connecting)
return
}
if onRequest != nil {
_ = onRequest(c.ctx, c)
}
},
)
c.onProcess(onConnect, onRequest)
}

// when onDisconnect called, c.IsActive() must return false
@@ -182,15 +154,16 @@ func (c *connection) onDisconnect() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
// no need lock if onConnect is nil
atomic.StoreInt32(&c.state, 2)
// it's ok to force set state to disconnected since onConnect is nil
c.setState(connStateDisconnected)
onDisconnect(c.ctx, c)
return
}
// check if OnConnect finished when onConnect != nil && onDisconnect != nil
if atomic.LoadInt32(&c.state) > 0 && c.lock(connecting) { // means OnConnect already finished
if c.getState() != connStateNone && c.lock(connecting) { // means OnConnect already finished
// protect onDisconnect run once
// if CAS return false, means OnConnect already helps to run onDisconnect
if atomic.CompareAndSwapInt32(&c.state, 1, 2) {
if c.changeState(connStateConnected, connStateDisconnected) {
onDisconnect(c.ctx, c)
}
c.unlock(connecting)
@@ -207,63 +180,66 @@ func (c *connection) onRequest() (needTrigger bool) {
return true
}
// wait onConnect finished first
if atomic.LoadInt32(&c.state) == 0 && c.onConnectCallback.Load() != nil {
if c.getState() == connStateNone && c.onConnectCallback.Load() != nil {
// let onConnect to call onRequest
return
}
processed := c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
return c.Reader().Len() > 0
},
func(c *connection) {
_ = onRequest(c.ctx, c)
},
)
processed := c.onProcess(nil, onRequest)
// if not processed, should trigger read
return !processed
}

// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
if process == nil {
return false
}
// onProcess is responsible for executing the onConnect/onRequest function serially,
// and make sure the connection has been closed correctly if user call c.Close() in onConnect/onRequest function.
func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (processed bool) {
// task already exists
if !c.lock(processing) {
return false
}
// add new task
var task = func() {

task := func() {
panicked := true
defer func() {
if !panicked {
return
}
// cannot use recover() here, since we don't want to break the panic stack
if panicked {
c.unlock(processing)
if c.IsActive() {
c.Close()
} else {
c.closeCallback(false, false)
}
c.unlock(processing)
if c.IsActive() {
c.Close()
} else {
c.closeCallback(false, false)
}
}()
// trigger onConnect first
if onConnect != nil && c.changeState(connStateNone, connStateConnected) {
c.ctx = onConnect(c.ctx, c)
if !c.IsActive() && c.changeState(connStateConnected, connStateDisconnected) {
// since we hold connecting lock, so we should help to call onDisconnect here
onDisconnect, _ := c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect != nil {
onDisconnect(c.ctx, c)
}
}
c.unlock(connecting)
}
START:
// `process` must be executed at least once if `isProcessable` in order to cover the `send & close by peer` case.
// Then the loop processing must ensure that the connection `IsActive`.
if isProcessable(c) {
process(c)
// The `onRequest` must be executed at least once if the conn has any readable data,
// in order to cover the `send & close by peer` case.
if onRequest != nil && c.Reader().Len() > 0 {
_ = onRequest(c.ctx, c)
}
// `process` must either eventually read all the input data or actively Close the connection,
// The processing loop must ensure that the connection meets `IsActive`.
// `onRequest` must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
// close by user or not processable
if closedBy == user || onRequest == nil || c.Reader().Len() == 0 {
break
}
process(c)
_ = onRequest(c.ctx, c)
}
// handling callback if connection has been closed.
if closedBy != none {
@@ -288,14 +264,15 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
panicked = false
return
}
// double check isProcessable
if isProcessable(c) && c.lock(processing) {
// double check is processable
if onRequest != nil && c.Reader().Len() > 0 && c.lock(processing) {
goto START
}
// task exits
panicked = false
return
}
// add new task
runTask(c.ctx, task)
return true
}
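One detail worth calling out in the rewritten task above is the `panicked` flag: cleanup must run even if `onConnect`/`onRequest` panics, but `recover()` is deliberately avoided so the original panic stack is preserved. A small self-contained sketch of that idiom (the function names and printed messages are illustrative only):

```go
package main

import "fmt"

// doWork mirrors the cleanup shape used in onProcess's task closure: a
// deferred function performs cleanup on the panic path without calling
// recover(), so the panic keeps propagating with its full stack trace.
func doWork(fail bool) {
	panicked := true
	defer func() {
		if !panicked {
			return
		}
		// Only reached while doWork is unwinding due to a panic.
		fmt.Println("cleanup after panic")
	}()

	if fail {
		panic("boom")
	}

	// Normal exit: clear the flag so the deferred cleanup is skipped.
	panicked = false
}

func main() {
	doWork(false) // nothing printed
	doWork(true)  // prints "cleanup after panic", then the panic aborts the program
}
```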
24 changes: 24 additions & 0 deletions connection_test.go
@@ -32,6 +32,30 @@ import (
"time"
)

func BenchmarkConnectionIO(b *testing.B) {
var dataSize = 1024 * 16
var writeBuffer = make([]byte, dataSize)
var rfd, wfd = GetSysFdPairs()
var rconn, wconn = new(connection), new(connection)
rconn.init(&netFD{fd: rfd}, &options{onRequest: func(ctx context.Context, connection Connection) error {
read, _ := connection.Reader().Next(dataSize)
_ = wconn.Reader().Release()
_, _ = connection.Writer().WriteBinary(read)
_ = connection.Writer().Flush()
return nil
}})
wconn.init(&netFD{fd: wfd}, new(options))

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = wconn.WriteBinary(writeBuffer)
_ = wconn.Flush()
_, _ = wconn.Reader().Next(dataSize)
_ = wconn.Reader().Release()
}
}
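The benchmark above pairs two connections over a socketpair and echoes 16 KB payloads through the `onRequest` handler. Assuming a standard Go toolchain, it should be runnable from the repository root with something like `go test -run='^$' -bench=BenchmarkConnectionIO -benchmem` (standard `go test` flags, nothing specific to this PR).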

func TestConnectionWrite(t *testing.T) {
var cycle, caps = 10000, 256
var msg, buf = make([]byte, caps), make([]byte, caps)
2 changes: 1 addition & 1 deletion go.mod
@@ -3,6 +3,6 @@ module github.com/cloudwego/netpoll
go 1.15

require (
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3
github.com/bytedance/gopkg v0.1.0
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
)
4 changes: 2 additions & 2 deletions go.sum
@@ -1,5 +1,5 @@
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/bytedance/gopkg v0.1.0 h1:aAxB7mm1qms4Wz4sp8e1AtKDOeFLtdqvGiUe7aonRJs=
github.com/bytedance/gopkg v0.1.0/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
6 changes: 2 additions & 4 deletions netpoll_config.go
@@ -17,14 +17,12 @@ package netpoll
import (
"context"
"io"
"time"
)

// global config
var (
defaultLinkBufferSize = pagesize
defaultGracefulShutdownCheckInterval = time.Second
featureAlwaysNoCopyRead = false
defaultLinkBufferSize = pagesize
featureAlwaysNoCopyRead = false
)

// Config expose some tuning parameters to control the internal behaviors of netpoll.
20 changes: 13 additions & 7 deletions netpoll_server.go
@@ -63,27 +63,33 @@ func (s *server) Close(ctx context.Context) error {
s.operator.Control(PollDetach)
s.ln.Close()

var ticker = time.NewTicker(defaultGracefulShutdownCheckInterval)
defer ticker.Stop()
var hasConn bool
for {
hasConn = false
activeConn := 0
s.connections.Range(func(key, value interface{}) bool {
var conn, ok = value.(gracefulExit)
if !ok || conn.isIdle() {
value.(Connection).Close()
} else {
activeConn++
}
hasConn = true
return true
})
if !hasConn { // all connections have been closed
if activeConn == 0 { // all connections have been closed
return nil
}

// smartly control the graceful shutdown check interval
// we should wait for more time if there are more active connections
waitTime := time.Millisecond * time.Duration(activeConn)
if waitTime > time.Second { // max wait time is 1000 ms
waitTime = time.Millisecond * 1000
} else if waitTime < time.Millisecond*50 { // min wait time is 50 ms
waitTime = time.Millisecond * 50
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-time.After(waitTime):
continue
}
}
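The fixed one-second ticker is replaced here by a per-iteration wait that scales with the number of still-active connections: roughly one millisecond per connection, clamped to the range [50 ms, 1 s]. A standalone sketch of that clamp (the function name is illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"time"
)

// shutdownWait mirrors the clamp used in server.Close: ~1ms per active
// connection, never below 50ms and never above 1s.
func shutdownWait(activeConn int) time.Duration {
	waitTime := time.Millisecond * time.Duration(activeConn)
	if waitTime > time.Second {
		return time.Second
	}
	if waitTime < 50*time.Millisecond {
		return 50 * time.Millisecond
	}
	return waitTime
}

func main() {
	for _, n := range []int{0, 10, 200, 5000} {
		fmt.Printf("%4d active connections -> wait %v\n", n, shutdownWait(n))
	}
	// 0 -> 50ms, 10 -> 50ms, 200 -> 200ms, 5000 -> 1s
}
```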
9 changes: 0 additions & 9 deletions netpoll_unix_test.go
@@ -64,15 +64,6 @@ func Assert(t *testing.T, cond bool, val ...interface{}) {
}
}

func TestMain(m *testing.M) {
// defaultGracefulShutdownCheckInterval will affect shutdown function running time,
// so for speed up tests, we change it to 10ms here
oldGracefulShutdownCheckInterval := defaultGracefulShutdownCheckInterval
defaultGracefulShutdownCheckInterval = time.Millisecond * 10
m.Run()
defaultGracefulShutdownCheckInterval = oldGracefulShutdownCheckInterval
}

var testPort int32 = 10000

// getTestAddress returns a unique port for every test, so tests will not share the same listener
2 changes: 1 addition & 1 deletion poll_manager_test.go
@@ -61,7 +61,7 @@ func TestPollManagerSetNumLoops(t *testing.T) {
poll := pm.Pick()
newGs := runtime.NumGoroutine()
Assert(t, poll != nil)
Assert(t, newGs-startGs == 1, newGs, startGs)
Assert(t, newGs-startGs >= 1, newGs, startGs)
t.Logf("old=%d, new=%d", startGs, newGs)

// change pollers
2 changes: 1 addition & 1 deletion poll_test.go
@@ -90,7 +90,7 @@ func TestPollMod(t *testing.T) {
runtime.Gosched()
}
r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn)
Assert(t, r == 0 && w == 1 && h == 0, r, w, h)
Assert(t, r == 0 && w >= 1 && h == 0, r, w, h)

err = p.Control(rop, PollR2RW) // trigger write
MustNil(t, err)