Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
[#111]: feature: max queue size option
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Feb 1, 2024
2 parents bf0ed57 + dbd5d1b commit 91b2b9c
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.18.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/errors v1.4.0
github.com/roadrunner-server/goridge/v3 v3.8.1
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand All @@ -29,8 +27,8 @@ github.com/prometheus/common v0.46.0 h1:doXzt5ybi1HBKpsZOL0sSkaNHJJqkyfEWZGGqqSc
github.com/prometheus/common v0.46.0/go.mod h1:Tp0qkxpb9Jsg54QMe+EAmqXkSV7Evdy1BTn+g2pa/hQ=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/errors v1.4.0 h1:Odjg3VZrj1q5Y8ILwoN+JgERyv0pkhrWPNOM4h68iQ8=
github.com/roadrunner-server/errors v1.4.0/go.mod h1:78PvraAFj+Sxy5nDmo0S+h6rEMLFIDszWZxA3B0sPAs=
github.com/roadrunner-server/goridge/v3 v3.8.1 h1:mdS5lDKQwPuVJ2jwW7l5cngJNJiie7xEGwpgw7a6CuQ=
github.com/roadrunner-server/goridge/v3 v3.8.1/go.mod h1:L5UkNzD8aKLz6TzpqmmiHOJ6EnsadsWEYNoqK/4qoK0=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
Expand All @@ -45,8 +43,6 @@ github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
3 changes: 2 additions & 1 deletion ipc/pipe/pipe_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func Test_Pipe_StartError(t *testing.T) {
t.Errorf("error running the command: error %v", err)
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
Expand Down
6 changes: 2 additions & 4 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type Config struct {
Debug bool
// Command used to override the server command with the custom one
Command []string `mapstructure:"command"`
// MaxQueueSize is maximum allowed queue size with the pending requests to the workers poll
MaxQueueSize uint64 `mapstructure:"max_queue_size"`
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers uint64 `mapstructure:"num_workers"`
Expand Down Expand Up @@ -63,16 +65,12 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
WatchTick time.Duration `mapstructure:"watch_tick"`

// TTL defines the maximum time for the worker is allowed to live.
TTL time.Duration `mapstructure:"ttl"`

// IdleTTL defines the maximum duration worker can spend in idle mode. Disabled when 0.
IdleTTL time.Duration `mapstructure:"idle_ttl"`

// ExecTTL defines maximum lifetime per job.
ExecTTL time.Duration `mapstructure:"exec_ttl"`

// MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}
Expand Down
6 changes: 6 additions & 0 deletions pool/static_pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ func WithLogger(z *zap.Logger) Options {
p.log = z
}
}

func WithQueueSize(l uint64) Options {
return func(p *Pool) {
p.maxQueueSize = l
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type Pool struct {
// allocate new worker
allocator func() (*worker.Process, error)
// exec queue size
queue uint64
queue uint64
maxQueueSize uint64
// used in the supervised mode
supervisedExec bool
stopCh chan struct{}
Expand All @@ -61,6 +62,7 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p
if cfg.Debug {
cfg.NumWorkers = 0
cfg.MaxJobs = 1
cfg.MaxQueueSize = 0
}

p := &Pool{
Expand Down Expand Up @@ -155,6 +157,11 @@ func (sp *Pool) Exec(ctx context.Context, p *payload.Payload, stopCh chan struct
return nil, errors.E(op, errors.Str("payload can not be empty"))
}

// check if we have space to put the request
if atomic.LoadUint64(&sp.maxQueueSize) != 0 && atomic.LoadUint64(&sp.queue) >= atomic.LoadUint64(&sp.maxQueueSize) {
return nil, errors.E(op, errors.QueueSize, errors.Str("max queue size reached"))
}

if sp.cfg.Debug {
switch sp.supervisedExec {
case true:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_NewPool(t *testing.T) {
}

func Test_NewPoolAddRemoveWorkers(t *testing.T) {
var testCfg2 = &pool.Config{
testCfg2 := &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
DestroyTimeout: time.Second * 500,
Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) {
func Test_StaticPool_RemoveWorker(t *testing.T) {
ctx := context.Background()

var testCfg2 = &pool.Config{
testCfg2 := &pool.Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
Expand Down Expand Up @@ -184,7 +184,7 @@ func Test_StaticPool_RemoveWorker(t *testing.T) {
}

func Test_Pool_Reallocate(t *testing.T) {
var testCfg2 = &pool.Config{
testCfg2 := &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
DestroyTimeout: time.Second * 500,
Expand Down Expand Up @@ -230,7 +230,7 @@ func Test_Pool_Reallocate(t *testing.T) {
func Test_NewPoolReset(t *testing.T) {
ctx := context.Background()

var testCfg2 = &pool.Config{
testCfg2 := &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
DestroyTimeout: time.Second * 500,
Expand Down Expand Up @@ -318,6 +318,59 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p.Destroy(context.Background())
}

func Test_StaticPool_QueueSizeLimit(t *testing.T) {
testCfg2 := &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
DestroyTimeout: time.Second * 500,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*500)
defer cancel()

p, err := NewPool(
ctx,
// sleep for 10 seconds
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
pipe.NewPipeFactory(log()),
testCfg2,
log(),
WithQueueSize(1),
)
require.NoError(t, err)

defer p.Destroy(ctx)

assert.NotNil(t, p)
wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
time.Sleep(time.Second * 2)
_, err1 := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{}))
require.Error(t, err1)
wg.Done()
}()
go func() {
time.Sleep(time.Second * 2)
_, err2 := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{}))
require.Error(t, err2)
wg.Done()
}()

re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{}))
res := <-re

assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body())
assert.Empty(t, res.Context())

assert.Equal(t, "hello world", res.Payload().String())
wg.Wait()

p.Destroy(ctx)
}

func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
Expand Down Expand Up @@ -452,7 +505,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()

var cfg2 = &pool.Config{
cfg2 := &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
Expand Down Expand Up @@ -1086,7 +1139,9 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op
func BenchmarkToStringUnsafe(b *testing.B) {
testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
testPayload := []byte(
"falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj",
)
b.ResetTimer()
b.ReportAllocs()

Expand All @@ -1099,7 +1154,9 @@ func BenchmarkToStringUnsafe(b *testing.B) {
// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op
// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
func BenchmarkToStringSafe(b *testing.B) {
testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
testPayload := []byte(
"falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj",
)

b.ResetTimer()
b.ReportAllocs()
Expand Down

0 comments on commit 91b2b9c

Please sign in to comment.