diff --git a/netpoll.go b/netpoll.go deleted file mode 100644 index 7f53f2a2..00000000 --- a/netpoll.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2022 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux -// +build darwin netbsd freebsd openbsd dragonfly linux - -package netpoll - -import ( - "context" - "net" - "runtime" - "sync" -) - -// NewEventLoop . -func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { - opts := &options{ - onRequest: onRequest, - } - for _, do := range ops { - do.f(opts) - } - return &eventLoop{ - opts: opts, - stop: make(chan error, 1), - }, nil -} - -type eventLoop struct { - sync.Mutex - opts *options - svr *server - stop chan error -} - -// Serve implements EventLoop. -func (evl *eventLoop) Serve(ln net.Listener) error { - npln, err := ConvertListener(ln) - if err != nil { - return err - } - evl.Lock() - evl.svr = newServer(npln, evl.opts, evl.quit) - evl.svr.Run() - evl.Unlock() - - err = evl.waitQuit() - // ensure evl will not be finalized until Serve returns - runtime.SetFinalizer(evl, nil) - return err -} - -// Shutdown signals a shutdown a begins server closing. -func (evl *eventLoop) Shutdown(ctx context.Context) error { - evl.Lock() - var svr = evl.svr - evl.svr = nil - evl.Unlock() - - if svr == nil { - return nil - } - evl.quit(nil) - return svr.Close(ctx) -} - -// waitQuit waits for a quit signal -func (evl *eventLoop) waitQuit() error { - return <-evl.stop -} - -func (evl *eventLoop) quit(err error) { - select { - case evl.stop <- err: - default: - } -} diff --git a/netpoll_config.go b/netpoll_config.go new file mode 100644 index 00000000..85c05925 --- /dev/null +++ b/netpoll_config.go @@ -0,0 +1,45 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "context" + "io" +) + +// global config +var ( + defaultLinkBufferSize = pagesize + featureAlwaysNoCopyRead = false +) + +// Config expose some tuning parameters to control the internal behaviors of netpoll. +// Every parameter with the default zero value should keep the default behavior of netpoll. +type Config struct { + PollerNum int // number of pollers + BufferSize int // default size of a new connection's LinkBuffer + Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. + LoggerOutput io.Writer // logger output + LoadBalance LoadBalance // load balance for poller picker + Feature // define all features that not enable by default +} + +// Feature expose some new features maybe promoted as a default behavior but not yet. +type Feature struct { + // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString + // will use NoCopy read and will not reuse the underlying buffer. + // It gains more performance benefits when need read much big string/bytes in codec. + AlwaysNoCopyRead bool +} diff --git a/netpoll_options.go b/netpoll_options.go index 7b225256..b72bba49 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,123 +14,21 @@ package netpoll -import ( - "context" - "io" - "log" - "os" - "runtime" - "time" -) +import "time" -var ( - pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers - logger = log.New(os.Stderr, "", log.LstdFlags) - - // global config - defaultLinkBufferSize = pagesize - featureAlwaysNoCopyRead = false -) - -// Config expose some tuning parameters to control the internal behaviors of netpoll. -// Every parameter with the default zero value should keep the default behavior of netpoll. -type Config struct { - PollerNum int // number of pollers - BufferSize int // default size of a new connection's LinkBuffer - Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. - LoggerOutput io.Writer // logger output - LoadBalance LoadBalance // load balance for poller picker - Feature // define all features that not enable by default -} - -// Feature expose some new features maybe promoted as a default behavior but not yet. -type Feature struct { - // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString - // will use NoCopy read and will not reuse the underlying buffer. - // It gains more performance benefits when need read much big string/bytes in codec. - AlwaysNoCopyRead bool -} - -// Configure the internal behaviors of netpoll. -// Configure must called in init() function, because the poller will read some global variable after init() finished -func Configure(config Config) (err error) { - if config.PollerNum > 0 { - if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { - return err - } - } - if config.BufferSize > 0 { - defaultLinkBufferSize = config.BufferSize - } - - if config.Runner != nil { - setRunner(config.Runner) - } - if config.LoggerOutput != nil { - logger = log.New(config.LoggerOutput, "", log.LstdFlags) - } - if config.LoadBalance >= 0 { - if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { - return err - } - } - - featureAlwaysNoCopyRead = config.AlwaysNoCopyRead - return nil -} - -// Initialize the pollers actively. By default, it's lazy initialized. -// It's safe to call it multi times. -func Initialize() { - // The first call of Pick() will init pollers - _ = pollmanager.Pick() -} - -// SetNumLoops is used to set the number of pollers, generally do not need to actively set. -// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. -// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. -// Otherwise you may need to adjust the number of pollers to achieve the best results. -// Experience recommends assigning a poller every 20c. -// -// You can only use SetNumLoops before any connection is created. An example usage: -// -// func init() { -// netpoll.SetNumLoops(...) -// } -// -// Deprecated: use Configure instead. -func SetNumLoops(numLoops int) error { - return pollmanager.SetNumLoops(numLoops) -} - -// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt -// to distribute the incoming connections between multiple polls. -// This option only works when numLoops is set. -// Deprecated: use Configure instead. -func SetLoadBalance(lb LoadBalance) error { - return pollmanager.SetLoadBalance(lb) -} - -// SetLoggerOutput sets the logger output target. -// Deprecated: use Configure instead. -func SetLoggerOutput(w io.Writer) { - logger = log.New(w, "", log.LstdFlags) -} - -// SetRunner set the runner function for every OnRequest/OnConnect callback -// Deprecated: use Configure instead. -func SetRunner(f func(ctx context.Context, f func())) { - setRunner(f) +// Option . +type Option struct { + f func(*options) } -// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), -// which means that OnRequest will be run via `go OnRequest(...)`. -// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. -// But if you can confirm that the OnRequest will not cause stack expansion, -// it is recommended to use DisableGopool to reduce redundancy and improve performance. -// Deprecated: use Configure instead. -func DisableGopool() error { - return disableGopool() +type options struct { + onPrepare OnPrepare + onConnect OnConnect + onDisconnect OnDisconnect + onRequest OnRequest + readTimeout time.Duration + writeTimeout time.Duration + idleTimeout time.Duration } // WithOnPrepare registers the OnPrepare method to EventLoop. @@ -174,18 +72,3 @@ func WithIdleTimeout(timeout time.Duration) Option { op.idleTimeout = timeout }} } - -// Option . -type Option struct { - f func(*options) -} - -type options struct { - onPrepare OnPrepare - onConnect OnConnect - onDisconnect OnDisconnect - onRequest OnRequest - readTimeout time.Duration - writeTimeout time.Duration - idleTimeout time.Duration -} diff --git a/netpoll_unix.go b/netpoll_unix.go new file mode 100644 index 00000000..4eb25a05 --- /dev/null +++ b/netpoll_unix.go @@ -0,0 +1,179 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux +// +build darwin netbsd freebsd openbsd dragonfly linux + +package netpoll + +import ( + "context" + "io" + "log" + "net" + "os" + "runtime" + "sync" +) + +var ( + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. +func Initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + +// Configure the internal behaviors of netpoll. +// Configure must called in init() function, because the poller will read some global variable after init() finished +func Configure(config Config) (err error) { + if config.PollerNum > 0 { + if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { + return err + } + } + if config.BufferSize > 0 { + defaultLinkBufferSize = config.BufferSize + } + + if config.Runner != nil { + setRunner(config.Runner) + } + if config.LoggerOutput != nil { + logger = log.New(config.LoggerOutput, "", log.LstdFlags) + } + if config.LoadBalance >= 0 { + if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { + return err + } + } + + featureAlwaysNoCopyRead = config.AlwaysNoCopyRead + return nil +} + +// SetNumLoops is used to set the number of pollers, generally do not need to actively set. +// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. +// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. +// Otherwise, you may need to adjust the number of pollers to achieve the best results. +// Experience recommends assigning a poller every 20c. +// +// You can only use SetNumLoops before any connection is created. An example usage: +// +// func init() { +// netpoll.SetNumLoops(...) +// } +// +// Deprecated: use Configure instead. +func SetNumLoops(numLoops int) error { + return pollmanager.SetNumLoops(numLoops) +} + +// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt +// to distribute the incoming connections between multiple polls. +// This option only works when numLoops is set. +// Deprecated: use Configure instead. +func SetLoadBalance(lb LoadBalance) error { + return pollmanager.SetLoadBalance(lb) +} + +// SetLoggerOutput sets the logger output target. +// Deprecated: use Configure instead. +func SetLoggerOutput(w io.Writer) { + logger = log.New(w, "", log.LstdFlags) +} + +// SetRunner set the runner function for every OnRequest/OnConnect callback +// Deprecated: use Configure instead. +func SetRunner(f func(ctx context.Context, f func())) { + setRunner(f) +} + +// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), +// which means that OnRequest will be run via `go OnRequest(...)`. +// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. +// But if you can confirm that the OnRequest will not cause stack expansion, +// it is recommended to use DisableGopool to reduce redundancy and improve performance. +// Deprecated: use Configure instead. +func DisableGopool() error { + return disableGopool() +} + +// NewEventLoop . +func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { + opts := &options{ + onRequest: onRequest, + } + for _, do := range ops { + do.f(opts) + } + return &eventLoop{ + opts: opts, + stop: make(chan error, 1), + }, nil +} + +type eventLoop struct { + sync.Mutex + opts *options + svr *server + stop chan error +} + +// Serve implements EventLoop. +func (evl *eventLoop) Serve(ln net.Listener) error { + npln, err := ConvertListener(ln) + if err != nil { + return err + } + evl.Lock() + evl.svr = newServer(npln, evl.opts, evl.quit) + evl.svr.Run() + evl.Unlock() + + err = evl.waitQuit() + // ensure evl will not be finalized until Serve returns + runtime.SetFinalizer(evl, nil) + return err +} + +// Shutdown signals a shutdown a begins server closing. +func (evl *eventLoop) Shutdown(ctx context.Context) error { + evl.Lock() + var svr = evl.svr + evl.svr = nil + evl.Unlock() + + if svr == nil { + return nil + } + evl.quit(nil) + return svr.Close(ctx) +} + +// waitQuit waits for a quit signal +func (evl *eventLoop) waitQuit() error { + return <-evl.stop +} + +func (evl *eventLoop) quit(err error) { + select { + case evl.stop <- err: + default: + } +} diff --git a/netpoll_test.go b/netpoll_unix_test.go similarity index 100% rename from netpoll_test.go rename to netpoll_unix_test.go diff --git a/netpoll_windows.go b/netpoll_windows.go index 634d1ef9..86434e79 100644 --- a/netpoll_windows.go +++ b/netpoll_windows.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,34 +20,11 @@ package netpoll import ( "net" - "time" ) -// Option . -type Option struct { - f func(*options) -} - -type options struct{} - -// WithOnPrepare registers the OnPrepare method to EventLoop. -func WithOnPrepare(onPrepare OnPrepare) Option { - return Option{} -} - -// WithOnConnect registers the OnConnect method to EventLoop. -func WithOnConnect(onConnect OnConnect) Option { - return Option{} -} - -// WithReadTimeout sets the read timeout of connections. -func WithReadTimeout(timeout time.Duration) Option { - return Option{} -} - -// WithIdleTimeout sets the idle timeout of connections. -func WithIdleTimeout(timeout time.Duration) Option { - return Option{} +// Configure the internal behaviors of netpoll. +func Configure(config Config) (err error) { + return nil } // NewDialer only support TCP and unix socket now.