diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index c3d667b6d..8c21488ca 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -37,6 +37,7 @@ import (
 	"go.uber.org/cadence/internal/common/backoff"
 	"go.uber.org/cadence/internal/common/metrics"
 	"go.uber.org/cadence/internal/common/util"
+	"go.uber.org/cadence/internal/pahlimiter"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"golang.org/x/time/rate"
@@ -137,6 +138,8 @@ type (
 		pollerAutoScaler   *pollerAutoScaler
 		taskQueueCh        chan interface{}
 		sessionTokenBucket *sessionTokenBucket
+
+		pollAndHistoryLimiter pahlimiter.PollAndHistoryLimiter
 	}
 
 	polledTask struct {
@@ -288,8 +291,9 @@ func (bw *baseWorker) pollTask() {
 		}
 	}
 
-	bw.retrier.Throttle()
+	bw.retrier.Throttle() // sleeps if retry policy determines it should sleep after failures
 	if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil {
+		// TODO: block here on a shared semaphore with history-loading?
 		task, err = bw.options.taskWorker.PollTask()
 		if err != nil && enableVerboseLogging {
 			bw.logger.Debug("Failed to poll for task.", zap.Error(err))
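The new pollAndHistoryLimiter field is only declared above, and the TODO marks where it would be consulted. A minimal sketch of that wiring, not part of this diff (the helper name pollTaskWithSharedLimit is hypothetical, and the return types are assumed from the surrounding pollTask code):

	// sketch: gate PollTask on the shared limiter, releasing as soon as the poll returns
	func (bw *baseWorker) pollTaskWithSharedLimit() (task interface{}, err error) {
		if bw.pollAndHistoryLimiter != nil {
			ok, done := bw.pollAndHistoryLimiter.Poll(bw.limiterContext)
			if !ok {
				// context canceled or limiter closed; the surrounding poll loop decides whether to retry
				return nil, bw.limiterContext.Err()
			}
			// done() is idempotent, so deferring is safe even on early returns
			defer done()
		}
		return bw.options.taskWorker.PollTask()
	}
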
diff --git a/internal/pahlimiter/limiter.go b/internal/pahlimiter/limiter.go
new file mode 100644
index 000000000..35841c03d
--- /dev/null
+++ b/internal/pahlimiter/limiter.go
@@ -0,0 +1,305 @@
+// Package pahlimiter contains a PollAndHistoryLimiter, used to share resources between polls and history loading,
+// to prevent flooding the server with history requests that will not complete in a reasonable time.
+package pahlimiter
+
+import (
+	"context"
+	"errors"
+	"sync"
+)
+
+type (
+	// PollAndHistoryLimiter defines an interface used to share request resources between pollers and history iterator
+	// funcs, to prevent unsustainable growth of history-loading requests.
+	//
+	// this is intended to be used with other poller limiters and retry backoffs, not on its own.
+	//
+	// implementations include:
+	//  - NewUnlimited (historical behavior, a noop)
+	//  - NewHistoryLimited (limits history requests, does not limit polls)
+	//  - NewWeighted (history requests "consume" poll requests, and can reduce or stop polls)
+	PollAndHistoryLimiter interface {
+		// Poll will try to acquire a poll resource,
+		// blocking until it succeeds or the context is canceled.
+		//
+		// The done func will release the resource - it will always be returned and can be called multiple times,
+		// only the first will have an effect.
+		// TODO: see if this is necessary... but it's easy and safe.
+		Poll(context.Context) (ok bool, done func())
+		// GetHistory will try to acquire a history-downloading resource,
+		// blocking until it succeeds or the context is canceled.
+		//
+		// The done func will release the resource - it will always be returned and can be called multiple times,
+		// only the first will have an effect.
+		// TODO: see if this is necessary... but it's easy and safe.
+		GetHistory(context.Context) (ok bool, done func())
+
+		// Close will clean up any resources, call at worker shutdown.
+		// This blocks until they are cleaned up.
+		Close()
+	}
+	unlimited struct{}
+	history   struct {
+		tokens chan struct{} // sized at startup
+	}
+	weighted struct {
+		stopOnce sync.Once
+
+		// close to clean up resources
+		stop chan struct{}
+		// closed when cleaned up
+		stopped chan struct{}
+
+		// used to signal history requests starting and stopping
+		historyStart, historyDone chan struct{}
+		// used to signal poll requests starting and stopping
+		pollStart, pollDone chan struct{}
+	}
+)
+
+var _ PollAndHistoryLimiter = (*unlimited)(nil)
+var _ PollAndHistoryLimiter = (*history)(nil)
+var _ PollAndHistoryLimiter = (*weighted)(nil)
+
+// NewUnlimited creates a new "unlimited" poll-and-history limiter, which does not constrain either operation.
+// This is the default, historical behavior.
+func NewUnlimited() (PollAndHistoryLimiter, error) {
+	return (*unlimited)(nil), nil
+}
+
+func (*unlimited) Poll(_ context.Context) (ok bool, done func())       { return true, func() {} }
+func (*unlimited) GetHistory(_ context.Context) (ok bool, done func()) { return true, func() {} }
+func (*unlimited) Close()                                              {}
+
+// NewHistoryLimited creates a simple limiter, which allows a specified number of concurrent history requests,
+// and does not limit polls at all.
+//
+// This implementation is NOT expected to be used widely, but it exists as a trivially-safe fallback implementation
+// that will still behave better than the historical default.
+//
+// This is very simple and should be sufficient to stop request floods during rate-limiting with many pending decision
+// tasks, but seems likely to allow too many workflows to *attempt* to make progress on a host, starving progress
+// when the sticky cache is higher than this size and leading to interrupted or timed out decision tasks.
+func NewHistoryLimited(concurrentHistoryRequests int) (PollAndHistoryLimiter, error) {
+	l := &history{
+		tokens: make(chan struct{}, concurrentHistoryRequests),
+	}
+	// fill the token buffer
+	for i := 0; i < concurrentHistoryRequests; i++ {
+		l.tokens <- struct{}{}
+	}
+	return l, nil
+}
+
+func (p *history) Poll(_ context.Context) (ok bool, done func()) { return true, func() {} }
+func (p *history) Close()                                        {}
+func (p *history) GetHistory(ctx context.Context) (ok bool, done func()) {
+	select {
+	case <-p.tokens:
+		var once sync.Once
+		return true, func() {
+			once.Do(func() {
+				p.tokens <- struct{}{}
+			})
+		}
+	case <-ctx.Done():
+		return false, func() {} // canceled, nothing to release
+	}
+}
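As a usage sketch for the token-based limiter above (loadPage is a hypothetical stand-in for whatever performs the actual history request): acquire before the request, release as soon as it finishes, and treat ok == false as "the context ended, give up".

	// sketch: gating a single history fetch with NewHistoryLimited
	func fetchHistoryPage(ctx context.Context, l PollAndHistoryLimiter, loadPage func(context.Context) error) error {
		ok, done := l.GetHistory(ctx) // blocks until a token is free or ctx ends
		if !ok {
			return ctx.Err() // no token was acquired, so there is nothing to release
		}
		defer done() // returns the token; calling it again is a no-op
		return loadPage(ctx)
	}
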
+
+// NewWeighted creates a new "weighted" poll-and-history limiter, which shares resources between history requests
+// and polls.
+//
+// Each running poll or history request consumes its weight in total available (capped at max) resources, and either
+// request type is allowed to reduce resources for, or completely starve, the other.
+//
+// Since this runs "inside" other poller limiting, having equal or lesser poll-resources than the poller limiter
+// will allow history requests to block polls... and if history weights are lower, they can perpetually starve polls
+// by not releasing enough resources.
+//
+// **This is intended behavior**, as it can be used to cause a heavily-history-loading worker to stop pulling more
+// workflows that may also need their history loaded, until some resources free up.
+//
+// ---
+//
+// The reverse situation, where history resources cannot prevent polls, may lead to some undesirable behavior.
+// Continually adding workflows while not allowing them to pull history degrades to NewHistoryLimited behavior:
+// it is easily possible to have hundreds or thousands of workflows trying to load history, but few or none of them
+// are allowed through this limiter to actually perform that request.
+//
+// In this situation it will still limit the number of actual concurrent requests to load history, but with a very
+// large increase in complexity. If you want this, strongly consider just using NewHistoryLimited.
+//
+// ---
+//
+// All that said: this is NOT built to be a reliable blocker of polls, for at least two reasons:
+//  - History iterators do not hold their resources between loading (and executing) pages of history, causing a gap
+//    where a poller could claim resources despite the service being "too busy" loading history from a human's view.
+//  - History iterators race with polls. If enough resources are available and both possibilities can be satisfied,
+//    Go's select chooses between them pseudo-randomly.
+//
+// To reduce the chance of this happening, keep history weights relatively small compared to polls, so many concurrent
+// workflows loading history will be unlikely to free up enough resources for a poll to occur.
+func NewWeighted(pollRequestWeight, historyRequestWeight, maxResources int) (PollAndHistoryLimiter, error) {
+	if historyRequestWeight > maxResources || pollRequestWeight > maxResources {
+		return nil, errors.New("weights must not exceed max resources, or no requests can be sent")
+	}
+
+	l := &weighted{
+		stopOnce:     sync.Once{},
+		stop:         make(chan struct{}),
+		stopped:      make(chan struct{}),
+		historyStart: make(chan struct{}),
+		historyDone:  make(chan struct{}),
+		pollStart:    make(chan struct{}),
+		pollDone:     make(chan struct{}),
+	}
+	l.init(pollRequestWeight, historyRequestWeight, maxResources)
+	return l, nil
+}
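To make the weight arithmetic concrete: with NewWeighted(2, 1, 10) (weights chosen purely for illustration), the actor below starts with 10 available resources, so at most 5 polls (5×2), or 10 history loads (10×1), or a mix such as 3 polls plus 4 history loads (3×2 + 4×1 = 10) can hold resources at once; any further acquisition blocks until something releases. A small sketch that demonstrates the poll capacity (standard fmt/time/context imports assumed):

	// sketch: count how many weighted poll acquisitions fit before blocking (illustration only)
	func demoWeightedCapacity() {
		l, _ := NewWeighted(2, 1, 10) // poll weight 2, history weight 1, 10 total resources
		defer l.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		acquired := 0
		for {
			ok, _ := l.Poll(ctx) // releases are intentionally leaked; this only measures capacity
			if !ok {
				break // blocked until the timeout: capacity is exhausted
			}
			acquired++
		}
		fmt.Println("polls that fit:", acquired) // prints 5 with the weights above
	}
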
+
+func (p *weighted) init(pollRequestWeight, historyRequestWeight, maxResources int) {
+	// mutated only by the actor goroutine
+	available := maxResources
+
+	// start an actor-goroutine to simplify concurrency logic with many possibilities at any time.
+	// all logic is decided single-threaded, run by this goroutine, and every operation (except stop) is blocking.
+	//
+	// this actor only sends to history/poll channels.
+	// modifying functions only read from them.
+	// both read from "stop" and "stopped".
+	//
+	//  - by reading from a channel, the caller has successfully acquired or released resources, and it can immediately proceed.
+	//  - by sending on a channel, this actor has observed that resources are changed, and it must update its state.
+	//  - by closing `p.stop`, this limiter will stop reading from channels.
+	//  - ALL channel operations (except stop) will block forever.
+	//  - this means "xDone" resource-releasing must also read from `p.stop`.
+	//  - because `p.stop` races with other channel operations, stop DOES NOT guarantee no further polls will start,
+	//    even on the same goroutine, until `Close()` returns.
+	//  - this is one reason why `Close()` waits for the actor to exit. without it, you do not have sequential
+	//    logic guarantees.
+	//  - you can `Close()` any number of times from any goroutine; all calls will wait for the actor to stop.
+	//
+	// all operations are "fast", and it must remain this way.
+	// callers block while waiting on this actor, including when releasing resources.
+	go func() {
+		defer func() { close(p.stopped) }()
+		for {
+			// every branch must:
+			// 1. read from `p.stop`, so this limiter can be stopped.
+			// 2. write to "done" chans, so resources can be freed.
+			// 3. optionally write to "start" chans, so resources can be acquired
+			//
+			// doing otherwise for any reason risks deadlocks or invalid resource values.
+
+			if available >= pollRequestWeight && available >= historyRequestWeight {
+				// resources available for either == wait for either
+				select {
+				case <-p.stop:
+					return
+
+				case p.historyStart <- struct{}{}:
+					available -= historyRequestWeight
+				case p.pollStart <- struct{}{}:
+					available -= pollRequestWeight
+
+				case p.historyDone <- struct{}{}:
+					available += historyRequestWeight
+				case p.pollDone <- struct{}{}:
+					available += pollRequestWeight
+				}
+			} else if available >= pollRequestWeight && available < historyRequestWeight {
+				// only poll resources available
+				select {
+				case <-p.stop:
+					return
+
+				// case p.historyStart <- struct{}{}: // insufficient resources
+				case p.pollStart <- struct{}{}:
+					available -= pollRequestWeight
+
+				case p.historyDone <- struct{}{}:
+					available += historyRequestWeight
+				case p.pollDone <- struct{}{}:
+					available += pollRequestWeight
+				}
+			} else if available < pollRequestWeight && available >= historyRequestWeight {
+				// only history resources available
+				select {
+				case <-p.stop:
+					return
+
+				case p.historyStart <- struct{}{}:
+					available -= historyRequestWeight
+				// case p.pollStart <- struct{}{}: // insufficient resources
+
+				case p.historyDone <- struct{}{}:
+					available += historyRequestWeight
+				case p.pollDone <- struct{}{}:
+					available += pollRequestWeight
+				}
+			} else {
+				// no resources for either, wait for something to finish
+				select {
+				case <-p.stop:
+					return
+
+				// case p.historyStart <- struct{}{}: // insufficient resources
+				// case p.pollStart <- struct{}{}: // insufficient resources
+
+				case p.historyDone <- struct{}{}:
+					available += historyRequestWeight
+				case p.pollDone <- struct{}{}:
+					available += pollRequestWeight
+				}
+			}
+		}
+	}()
+}
+
+func (p *weighted) Close() {
+	p.stopOnce.Do(func() {
+		close(p.stop)
+	})
+	<-p.stopped
+}
+
+func (p *weighted) Poll(ctx context.Context) (ok bool, done func()) {
+	select {
+	case <-ctx.Done():
+		return false, func() {} // canceled
+	case <-p.stop:
+		return false, func() {} // shutting down
+	case <-p.pollStart:
+		// resource acquired
+		var once sync.Once
+		return true, func() {
+			once.Do(func() {
+				select {
+				case <-p.pollDone: // released
+				case <-p.stop: // shutting down
+				}
+			})
+		}
+	}
+}
+
+func (p *weighted) GetHistory(ctx context.Context) (ok bool, done func()) {
+	select {
+	case <-ctx.Done():
+		return false, func() {} // canceled
+	case <-p.stop:
+		return false, func() {} // shutting down
+	case <-p.historyStart:
+		// resource acquired
+		var once sync.Once
+		return true, func() {
+			once.Do(func() {
+				select {
+				case <-p.historyDone: // released
+				case <-p.stop: // shutting down
+				}
+			})
+		}
+	}
+}
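The NewWeighted docs above call out that history iterators only hold resources while actually fetching a page. A sketch of that per-page pattern (getPage and the token handling are hypothetical stand-ins for the iterator internals):

	// sketch: per-page acquisition inside a history-iterator-style loop
	func loadAllHistoryPages(ctx context.Context, l PollAndHistoryLimiter, getPage func(ctx context.Context, token []byte) (next []byte, err error)) error {
		var token []byte
		for {
			ok, done := l.GetHistory(ctx)
			if !ok {
				return ctx.Err() // canceled or limiter closed
			}
			next, err := getPage(ctx, token)
			done() // released before the page is decoded/executed, which is the "gap" described above
			if err != nil {
				return err
			}
			if len(next) == 0 {
				return nil // no more pages
			}
			token = next
		}
	}
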
diff --git a/internal/pahlimiter/limiter_test.go b/internal/pahlimiter/limiter_test.go
new file mode 100644
index 000000000..a942a0c57
--- /dev/null
+++ b/internal/pahlimiter/limiter_test.go
@@ -0,0 +1,329 @@
+package pahlimiter
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// something "quick" but long enough to not succumb to test noise.
+// non-limited tests should be many, many times faster than this.
+// all tests must be parallel so the total time for the suite is kept low
+var fastTest = 100 * time.Millisecond
+
+func countWhileLeaking(method func(context.Context) (bool, func()), timeout time.Duration) (started int) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	for i := 0; i < 100; i++ {
+		ok, _ := method(ctx) // intentionally leaking close-resources
+		if ok {
+			started += 1
+		}
+	}
+	return started
+}
+
+func TestUnlimited(t *testing.T) {
+	t.Parallel()
+
+	// sharing one across all tests to exercise it further
+	l, err := NewUnlimited()
+	require.NoError(t, err, "should be impossible")
+	defer l.Close()
+
+	t.Run("should allow many history requests", func(t *testing.T) {
+		t.Parallel()
+		started := countWhileLeaking(l.GetHistory, fastTest)
+		assert.Equal(t, 100, started, "all requests should have been allowed")
+	})
+	t.Run("should allow many poll requests", func(t *testing.T) {
+		t.Parallel()
+		started := countWhileLeaking(l.Poll, fastTest)
+		assert.Equal(t, 100, started, "all requests should have been allowed")
+	})
+}
+
+func TestHistoryLimited(t *testing.T) {
+	t.Parallel()
+
+	t.Run("should not limit poll requests", func(t *testing.T) {
+		t.Parallel()
+		l, err := NewHistoryLimited(5)
+		require.NoError(t, err, "should be impossible")
+		defer l.Close()
+
+		started := countWhileLeaking(l.Poll, fastTest)
+		assert.Equal(t, 100, started, "all requests should have been allowed")
+	})
+	t.Run("should limit history requests", func(t *testing.T) {
+		t.Parallel()
+		l, err := NewHistoryLimited(5)
+		require.NoError(t, err, "should be impossible")
+		defer l.Close()
+
+		started := countWhileLeaking(l.GetHistory, fastTest)
+		assert.Equal(t, 5, started, "history should have been limited at 5 concurrent requests")
+	})
+	t.Run("limited history should not limit polls", func(t *testing.T) {
+		t.Parallel()
+
+		l, err := NewHistoryLimited(5)
+		require.NoError(t, err, "should be impossible")
+		defer l.Close()
+
+		assert.Equal(t, 100, countWhileLeaking(l.Poll, fastTest), "pre-polls should have all succeeded")
+		assert.Equal(t, 5, countWhileLeaking(l.GetHistory, fastTest), "history should have been limited at 5 concurrent requests")
+		assert.Equal(t, 100, countWhileLeaking(l.Poll, fastTest), "post-polls should have all succeeded")
+	})
+	t.Run("concurrent thrashing", func(t *testing.T) {
+		t.Parallel()
+		const maxConcurrentHistory = 5
+		l, err := NewHistoryLimited(maxConcurrentHistory)
+		require.NoError(t, err, "should be impossible")
+		defer l.Close()
+
+		// do a bunch of requests in parallel, assert that successes are within bounds
+
+		var wg sync.WaitGroup
+		wg.Add(100) // polls
+		wg.Add(20)  // histories
+		var polls int64
+		var histories int64
+
+		// fire up a lot of polls, randomly spread within < fastTest
+		pollCtx, cancel := context.WithTimeout(context.Background(), fastTest)
+		defer cancel()
+		for i := 0; i < 100; i++ {
+			go func() {
+				// start at random times
+				time.Sleep(time.Duration(rand.Int63n(int64(fastTest))))
+				ok, _ := l.Poll(pollCtx) // intentionally leaking resources
+				if ok {
+					atomic.AddInt64(&polls, 1)
+				}
+				wg.Done()
+			}()
+		}
+
+		// fire up a handful of gethistories, hold resources for a bit, make sure some but not all succeed
+		historyCtx, cancel := context.WithTimeout(context.Background(), fastTest)
+		defer cancel()
+		for i := 0; i < 20; i++ {
+			go func() {
+				// everything competes at the start.
+				// 5 will succeed, the rest will be random
+				ok, release := l.GetHistory(historyCtx)
+				if ok {
+					atomic.AddInt64(&histories, 1)
+					// each one holds the resource for 50% to ~100% of the time limit
+					time.Sleep(fastTest / 2)                                    // 1/2
+					time.Sleep(time.Duration(rand.Int63n(int64(fastTest / 2)))) // plus <1/2
+					// then release
+					release()
+				}
+				wg.Done()
+			}()
+		}
+
+		wg.Wait()
+		finalPolls := atomic.LoadInt64(&polls)
+		finalHistories := atomic.LoadInt64(&histories)
+		// all polls should have succeeded
+		assert.Equal(t, int64(100), finalPolls)
+		// should have had more than max-concurrent successes == used some freed resources
+		assert.Greater(t, finalHistories, int64(maxConcurrentHistory))
+		// should have had less than N total successes == had contention for resources.
+		// in practice this is heavily biased towards 10 (never 11: no resource is held 3 times, as the third hold
+		// would have to start after the timeout), as resources are acquired at least once and *usually* twice.
+		assert.Less(t, finalHistories, int64((maxConcurrentHistory*2)+1))
+		t.Log("histories:", finalHistories) // to also show successful values with -v
+	})
+}
+
+func TestWeighted(t *testing.T) {
+	t.Parallel()
+
+	t.Run("sequential works", func(t *testing.T) {
+		t.Parallel()
+
+		l, err := NewWeighted(1, 1, 10)
+		require.NoError(t, err, "should be impossible")
+		defer l.Close()
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+
+		ok, release := l.Poll(ctx)
+		assert.True(t, ok)
+		release()
+
+		ok, release = l.GetHistory(ctx)
+		assert.True(t, ok)
+		release()
+	})
+	t.Run("fair thrashing should be roughly fair", func(t *testing.T) {
+		t.Parallel()
+		// this is the actual test being done: thrash on a balanced-weight limiter,
+		// and make sure the behavior is balanced.
+		//
+		// since "balanced" is hard to determine with random channel behavior,
+		// this is just run multiple times, and checked for excessive skew.
+		test := func() (int64, int64) {
+			l, err := NewWeighted(1, 1, 10)
+			require.NoError(t, err, "should be impossible")
+			defer l.Close()
+
+			return thrashForFairness(l)
+		}
+
+		// run N times in parallel to reduce noise.
+		// at least one of them must be fair, and none may have zeros.
+		var wg sync.WaitGroup
+		type result struct {
+			polls, histories int64
+		}
+		results := make([]result, 5)
+		wg.Add(len(results))
+		for i := 0; i < len(results); i++ {
+			i := i
+			go func() {
+				polls, histories := test()
+
+				// this should never happen
+				assert.NotZero(t, polls)
+				assert.NotZero(t, histories)
+
+				results[i] = result{
+					polls:     polls,
+					histories: histories,
+				}
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+
+		t.Log("results:", results)
+		for _, r := range results {
+			// neither value may fall below 2/3 of the other, found experimentally.
+			// out of 1000+ tries locally, only a couple have checked the third value.
+			if r.polls < (r.histories/3)*2 {
+				t.Log("low polls, checking next")
+				continue
+			}
+			if r.histories < (r.polls/3)*2 {
+				t.Log("low histories, checking next")
+				continue
+			}
+
+			t.Log("fair enough!")
+			break // no need to check the rest
+		}
+	})
+	t.Run("imbalanced thrashing should starve the higher weight", func(t *testing.T) {
+		t.Parallel()
+		// similar to fair-thrashing, but in this case we expect the lower-weight history
+		// requests to starve polls most of the time (as two need to release for a single poll to run).
+		test := func() (int64, int64) {
+			l, err := NewWeighted(2, 1, 10)
+			require.NoError(t, err, "should be impossible")
+			defer l.Close()
+
+			return thrashForFairness(l)
+		}
+
+		// run N times in parallel to reduce noise.
+		// none may have zeros, and it should be heavily skewed in favor of history requests
+		var wg sync.WaitGroup
+		type result struct {
+			polls, histories int64
+		}
+		results := make([]result, 5)
+		wg.Add(len(results))
+		for i := 0; i < len(results); i++ {
+			i := i
+			go func() {
+				polls, histories := test()
+
+				// this should never happen.
+				// polls, however, do sometimes go to zero.
+				assert.NotZero(t, histories)
+
+				results[i] = result{
+					polls:     polls,
+					histories: histories,
+				}
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+
+		t.Log("results:", results)
+		for _, r := range results {
+			// histories must be favored 5x or better, and polls must sometimes be used, found experimentally.
+			//
+			// this combination ensures we do not succeed when completely starving polls (== 0),
+			// as that would be unexpected, and likely a sign of a bad implementation.
+			//
+			// out of 1000+ tries locally, around 1% have checked the third value.
+			if r.polls == 0 || r.polls > (r.histories/5) {
+				t.Log("unsuitable result, checking next")
+				continue
+			}
+
+			t.Log("found expected imbalance")
+			break // no need to check the rest
+		}
+	})
+}
+
+func thrashForFairness(l PollAndHistoryLimiter) (polls, histories int64) {
+	var wg sync.WaitGroup
+	wg.Add(100) // polls
+	wg.Add(100) // histories
+
+	ctx, cancel := context.WithTimeout(context.Background(), fastTest)
+	defer cancel()
+
+	useResource := func() {
+		// use the resource for 10% to 20% of the test time.
+		// actual amount doesn't matter, just target middling-ish consumption to avoid noise.
+		// this value was found experimentally, just run with -v and check the result logs.
+		time.Sleep(time.Duration(int64(fastTest/10) + rand.Int63n(int64(fastTest/10))))
+	}
+
+	var p, h int64
+	for i := 0; i < 100; i++ {
+		go func() {
+			time.Sleep(time.Duration(rand.Int63n(int64(fastTest))))
+			ok, release := l.Poll(ctx)
+			if ok {
+				atomic.AddInt64(&p, 1)
+				useResource()
+				release()
+			}
+			wg.Done()
+		}()
+		go func() {
+			// randomly start through the test
+			time.Sleep(time.Duration(rand.Int63n(int64(fastTest))))
+			ok, release := l.GetHistory(ctx)
+			if ok {
+				atomic.AddInt64(&h, 1)
+				useResource()
+				release()
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+
+	return atomic.LoadInt64(&p), atomic.LoadInt64(&h)
+}
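
One behavior the tests above rely on but do not assert directly: Close unblocks callers waiting inside the weighted Poll and GetHistory (they return ok == false), because both select on the stop channel. A sketch of such a test, written against the helpers above:

	// sketch: Close should unblock a waiting Poll on the weighted limiter
	func TestWeightedCloseUnblocksWaiters(t *testing.T) {
		t.Parallel()
		l, err := NewWeighted(10, 10, 10) // one resource's worth: a second acquisition must wait
		require.NoError(t, err, "should be impossible")

		ctx, cancel := context.WithTimeout(context.Background(), fastTest)
		defer cancel()

		ok, _ := l.Poll(ctx) // take the only resource and intentionally never release it
		require.True(t, ok)

		unblocked := make(chan bool, 1)
		go func() {
			ok, _ := l.Poll(context.Background()) // would block forever without Close
			unblocked <- ok
		}()

		time.Sleep(fastTest / 10) // give the goroutine a moment to start waiting
		l.Close()

		select {
		case ok := <-unblocked:
			assert.False(t, ok, "a poll rejected by shutdown should report ok=false")
		case <-time.After(fastTest):
			t.Fatal("Poll did not return after Close")
		}
	}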