Skip to content

Commit

Permalink
Add quotas to Permits (#3333)
Browse files Browse the repository at this point in the history
Allows a Permit impl to decide how much quota a caller is allowed to use
for each Permit acquired. Having a quota > 1 is a nice performance
optimization to avoid having to acquire a permit everytime a resource is
consumed.

Additionally the new Permit structure allows tracking if a Permit is
acquired/released more than once by a caller using a refCount. Allows
catching bugs in tests by panicing.
  • Loading branch information
ryanhall07 authored Mar 9, 2021
1 parent 3eec0ce commit ffdce8e
Show file tree
Hide file tree
Showing 20 changed files with 316 additions and 347 deletions.
3 changes: 0 additions & 3 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ type DBConfiguration struct {
// of applying back-pressure or protecting the db nodes.
Limits LimitsConfiguration `yaml:"limits"`

// FetchTagged contains configuration related to the FetchTagged API endpoint.
FetchTagged FetchTaggedConfiguration `yaml:"fetchTagged"`

// WideConfig contains some limits for wide operations. These operations
// differ from regular paths by optimizing for query completeness across
// arbitary query ranges rather than speed.
Expand Down
4 changes: 0 additions & 4 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ db:
writeNewSeriesBackoffDuration: 2ms
tracing:
backend: jaeger
fetchTagged:
seriesBlocksPerBatch: 100
`

func TestConfiguration(t *testing.T) {
Expand Down Expand Up @@ -741,8 +739,6 @@ func TestConfiguration(t *testing.T) {
maxOutstandingRepairedBytes: 0
maxEncodersPerBlock: 0
writeNewSeriesPerSecond: 0
fetchTagged:
seriesBlocksPerBatch: 100
wide: null
tchannel: null
debug:
Expand Down
41 changes: 0 additions & 41 deletions src/cmd/services/m3dbnode/config/fetch_tagged.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
)

func TestFetchResultIterTest(t *testing.T) {
Expand All @@ -46,7 +47,7 @@ func TestFetchResultIterTest(t *testing.T) {

scope, ctx, nsID, resMap, start, end, db := setup(mocks)

blockPermits := &fakePermits{available: 5}
blockPermits := &fakePermits{available: 5, quotaPerPermit: 5}
iter := newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{
queryResult: index.QueryResult{
Results: resMap,
Expand All @@ -59,7 +60,6 @@ func TestFetchResultIterTest(t *testing.T) {
db: db,
nsID: nsID,
blockPermits: blockPermits,
blocksPerBatch: 5,
nowFn: time.Now,
dataReadMetrics: index.NewQueryMetrics("", scope),
totalMetrics: index.NewQueryMetrics("", scope),
Expand All @@ -76,77 +76,22 @@ func TestFetchResultIterTest(t *testing.T) {
iter.Close(nil)

require.Equal(t, 10, total)
require.Equal(t, 5, blockPermits.acquired)
require.Equal(t, 5, blockPermits.released)
// 20 permits are not acquired because the accounting is not 100% accurate. permits are not acquired until
// after the block is processed, so a block might be eagerly processed and then permit acquisition fails.
require.Equal(t, 19, blockPermits.acquired)
require.Equal(t, 19, blockPermits.released)
requireSeriesBlockMetric(t, scope)
}

func TestFetchResultIterTestUnsetBlocksPerBatch(t *testing.T) {
mocks := gomock.NewController(t)
defer mocks.Finish()

scope, ctx, nsID, resMap, start, end, db := setup(mocks)

blockPermits := &fakePermits{available: 10}
iter := newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{
queryResult: index.QueryResult{
Results: resMap,
},
queryOpts: index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
},
fetchData: true,
db: db,
nsID: nsID,
blockPermits: blockPermits,
nowFn: time.Now,
dataReadMetrics: index.NewQueryMetrics("", scope),
totalMetrics: index.NewQueryMetrics("", scope),
seriesBlocks: scope.Histogram("series-blocks", tally.MustMakeExponentialValueBuckets(10, 2, 5)),
instrumentClose: func(err error) {},
})
total := 0
for iter.Next(ctx) {
total++
require.NotNil(t, iter.Current())
require.Len(t, iter.Current().(*idResult).blockReaders, 10)
}
require.NoError(t, iter.Err())
iter.Close(nil)

require.Equal(t, 10, total)
require.Equal(t, 10, blockPermits.acquired)
require.Equal(t, 10, blockPermits.released)
requireSeriesBlockMetric(t, scope)
}

func TestFetchResultIterTestForceBlocksPerBatch(t *testing.T) {
blockPermits := &permits.LookbackLimitPermit{}
resMap := index.NewQueryResults(ident.StringID("testNs"), index.QueryResultsOptions{}, testIndexOptions)
iter := newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{
queryResult: index.QueryResult{
Results: resMap,
},
blockPermits: blockPermits,
blocksPerBatch: 1000,
nowFn: time.Now,
})
downcast, ok := iter.(*fetchTaggedResultsIter)
require.True(t, ok)
require.Equal(t, 1, downcast.blocksPerBatch)
}

func TestFetchResultIterTestNoReleaseWithoutAcquire(t *testing.T) {
blockPermits := &fakePermits{available: 10}
blockPermits := &fakePermits{available: 10, quotaPerPermit: 1000}
emptyMap := index.NewQueryResults(ident.StringID("testNs"), index.QueryResultsOptions{}, testIndexOptions)
scope := tally.NewTestScope("", map[string]string{})
iter := newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{
queryResult: index.QueryResult{
Results: emptyMap,
},
blockPermits: blockPermits,
blocksPerBatch: 1000,
nowFn: time.Now,
instrumentClose: func(err error) {},
dataReadMetrics: index.NewQueryMetrics("", scope),
Expand Down Expand Up @@ -204,30 +149,31 @@ func setup(mocks *gomock.Controller) (
}

type fakePermits struct {
acquired int
released int
available int
acquired int
released int
available int
quotaPerPermit int64
}

func (p *fakePermits) Acquire(_ context.Context) error {
func (p *fakePermits) Acquire(_ context.Context) (permits.Permit, error) {
if p.available == 0 {
return errors.New("available should never be 0")
return nil, errors.New("available should never be 0")
}
p.available--
p.acquired++
return nil
return permits.NewPermit(p.quotaPerPermit, instrument.NewOptions()), nil
}

func (p *fakePermits) TryAcquire(_ context.Context) (bool, error) {
func (p *fakePermits) TryAcquire(_ context.Context) (permits.Permit, error) {
if p.available == 0 {
return false, nil
return nil, nil
}
p.available--
p.acquired++
return true, nil
return permits.NewPermit(p.quotaPerPermit, instrument.NewOptions()), nil
}

func (p *fakePermits) Release(_ int64) {
func (p *fakePermits) Release(_ permits.Permit) {
p.released++
p.available++
}
64 changes: 29 additions & 35 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,6 @@ func (s *service) fetchTaggedIter(
fetchStart: startTime,
dataReadMetrics: s.metrics.queryTimingDataRead,
totalMetrics: s.metrics.queryTimingFetchTagged,
blocksPerBatch: s.opts.FetchTaggedSeriesBlocksPerBatch(),
seriesBlocks: s.metrics.fetchTaggedSeriesBlocks,
}), nil
}
Expand Down Expand Up @@ -912,8 +911,8 @@ type fetchTaggedResultsIter struct {
blockReadIdx int
cur IDResult
err error
batchesAcquired int
blocksAvailable int
permits []permits.Permit
unreleasedQuota int64
dataReadStart time.Time
totalSeriesBlocks int
}
Expand All @@ -929,7 +928,6 @@ type fetchTaggedResultsIterOpts struct {
iOpts instrument.Options
instrumentClose func(error)
blockPermits permits.Permits
blocksPerBatch int
nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
Expand All @@ -939,17 +937,11 @@ type fetchTaggedResultsIterOpts struct {
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
_, limitBased := opts.blockPermits.(*permits.LookbackLimitPermit)
if opts.blocksPerBatch == 0 || limitBased {
// NB(nate): if blocksPerBatch is unset, set blocksPerBatch to 1 (i.e. acquire a permit
// for each block as opposed to acquiring in bulk). Additionally, limit-based permits
// are required to use a blocksPerBatch size of 1 so as to not throw off limit accounting.
opts.blocksPerBatch = 1
}
return &fetchTaggedResultsIter{
fetchTaggedResultsIterOpts: opts,
idResults: make([]idResult, 0, opts.queryResult.Results.Map().Len()),
dataReadStart: opts.nowFn(),
permits: make([]permits.Permit, 0),
}
}

Expand Down Expand Up @@ -993,7 +985,7 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
}
} else {
// release the permits and memory from the previous block readers.
i.releaseAll(i.idx - 1)
i.releaseQuotaUsed(i.idx - 1)
i.idResults[i.idx-1].blockReaders = nil
}

Expand Down Expand Up @@ -1038,37 +1030,45 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {

// acquire a block permit for a series ID. returns true if a permit is available.
func (i *fetchTaggedResultsIter) acquire(ctx context.Context, idx int) (bool, error) {
if i.blocksAvailable > 0 {
i.blocksAvailable--
} else {
var curPermit permits.Permit
if len(i.permits) > 0 {
curPermit = i.permits[len(i.permits)-1]
}
if curPermit == nil || curPermit.QuotaRemaining() <= 0 {
if i.idx == idx {
// block acquiring if we need the block readers to fulfill the current fetch.
if err := i.blockPermits.Acquire(ctx); err != nil {
permit, err := i.blockPermits.Acquire(ctx)
if err != nil {
return false, err
}
i.permits = append(i.permits, permit)
curPermit = permit
} else {
// don't block if we are prefetching for a future seriesID.
acquired, err := i.blockPermits.TryAcquire(ctx)
permit, err := i.blockPermits.TryAcquire(ctx)
if err != nil {
return false, err
}
if !acquired {
if permit == nil {
return false, nil
}
i.permits = append(i.permits, permit)
curPermit = permit
}
i.batchesAcquired++
i.blocksAvailable = i.blocksPerBatch - 1
}
i.idResults[idx].blocksAcquired++
curPermit.Use(1)
i.idResults[idx].quotaUsed++
return true, nil
}

// release all the block permits acquired by a series ID that has been processed.
func (i *fetchTaggedResultsIter) releaseAll(idx int) {
// Note: the actual batch permits are not released until the query completely finishes and the iterator
// closes.
for n := 0; n < i.idResults[idx].blocksAcquired; n++ {
i.blocksAvailable++
func (i *fetchTaggedResultsIter) releaseQuotaUsed(idx int) {
i.unreleasedQuota += i.idResults[idx].quotaUsed
for i.unreleasedQuota > 0 && i.unreleasedQuota >= i.permits[0].AllowedQuota() {
p := i.permits[0]
i.blockPermits.Release(p)
i.unreleasedQuota -= p.AllowedQuota()
i.permits = i.permits[1:]
}
}

Expand All @@ -1092,15 +1092,9 @@ func (i *fetchTaggedResultsIter) Close(err error) {

i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks))

// No need to release resources if we acquired no batches.
if i.batchesAcquired == 0 {
return
}

for n := 0; n < i.batchesAcquired-1; n++ {
i.blockPermits.Release(int64(i.blocksPerBatch))
for _, p := range i.permits {
i.blockPermits.Release(p)
}
i.blockPermits.Release(int64(i.blocksPerBatch - i.blocksAvailable))
}

// IDResult is the FetchTagged result for a series ID.
Expand All @@ -1124,7 +1118,7 @@ type idResult struct {
tagEncoder serialize.TagEncoder
blockReadersIter series.BlockReaderIter
blockReaders [][]xio.BlockReader
blocksAcquired int
quotaUsed int64
iOpts instrument.Options
}

Expand Down
4 changes: 0 additions & 4 deletions src/dbnode/network/server/tchannelthrift/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ type Options interface {
// SetPermitsOptions sets the permits options.
SetPermitsOptions(value permits.Options) Options

// FetchTaggedSeriesBlocksPerBatch gets the series blocks allowed to be read
// per permit acquired.
FetchTaggedSeriesBlocksPerBatch() int

// SetFetchTaggedSeriesBlocksPerBatch sets the series blocks allowed to be read
// per permit acquired.
SetFetchTaggedSeriesBlocksPerBatch(value int) Options
Expand Down
Loading

0 comments on commit ffdce8e

Please sign in to comment.