Skip to content

Commit

Permalink
add RequestLimit to Query and Scan (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
guregu committed May 4, 2024
1 parent aa3c35c commit d5fb452
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
17 changes: 17 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Query struct {
consistent bool
limit int64
searchLimit int64
reqLimit int
order *Order

subber
Expand Down Expand Up @@ -177,11 +178,18 @@ func (q *Query) Limit(limit int64) *Query {
// SearchLimit specifies the maximum amount of results to examine.
// If a filter is not specified, the number of results will be limited.
// If a filter is specified, the number of results to consider for filtering will be limited.
// SearchLimit > 0 implies RequestLimit(1).
func (q *Query) SearchLimit(limit int64) *Query {
q.searchLimit = limit
return q
}

// RequestLimit specifies the maximum amount of requests to make against DynamoDB's API.
func (q *Query) RequestLimit(limit int) *Query {
q.reqLimit = limit
return q
}

// Order specifies the desired result order.
// Requires a range key (a.k.a. partition key) to be specified.
func (q *Query) Order(order Order) *Query {
Expand Down Expand Up @@ -320,6 +328,7 @@ type queryIter struct {
err error
idx int
n int64
reqs int

// last item evaluated
last map[string]*dynamodb.AttributeValue
Expand Down Expand Up @@ -379,6 +388,10 @@ func (itr *queryIter) NextWithContext(ctx context.Context, out interface{}) bool
if itr.output.LastEvaluatedKey == nil || itr.query.searchLimit > 0 {
return false
}
// have we hit the request limit?
if itr.query.reqLimit > 0 && itr.reqs == itr.query.reqLimit {
return false
}

// no, prepare next request and reset index
itr.input.ExclusiveStartKey = itr.output.LastEvaluatedKey
Expand All @@ -400,8 +413,12 @@ func (itr *queryIter) NextWithContext(ctx context.Context, out interface{}) bool
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
itr.exLEK = itr.output.LastEvaluatedKey
}
itr.reqs++

if len(itr.output.Items) == 0 {
if itr.query.reqLimit > 0 && itr.reqs == itr.query.reqLimit {
return false
}
if itr.output.LastEvaluatedKey != nil {
// we need to retry until we get some data
return itr.NextWithContext(ctx, out)
Expand Down
19 changes: 18 additions & 1 deletion scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Scan struct {
consistent bool
limit int64
searchLimit int64
reqLimit int

segment int64
totalSegments int64
Expand Down Expand Up @@ -120,14 +121,21 @@ func (s *Scan) Limit(limit int64) *Scan {
return s
}

// SearchLimit specifies a maximum amount of results to evaluate.
// SearchLimit specifies the maximum amount of results to evaluate.
// Use this along with StartFrom and Iter's LastEvaluatedKey to split up results.
// Note that DynamoDB limits result sets to 1MB.
// SearchLimit > 0 implies RequestLimit(1).
func (s *Scan) SearchLimit(limit int64) *Scan {
s.searchLimit = limit
return s
}

// RequestLimit specifies the maximum amount of requests to make against DynamoDB's API.

This comment has been minimized.

Copy link
@extemporalgenome

extemporalgenome May 5, 2024

Perhaps we should define in the doc comment the behavior when the given limit argument is <= 0 ?

This comment has been minimized.

Copy link
@guregu

guregu May 8, 2024

Author Owner

Good idea

func (s *Scan) RequestLimit(limit int) *Scan {
s.reqLimit = limit
return s
}

// ConsumedCapacity will measure the throughput capacity consumed by this operation and add it to cc.
func (s *Scan) ConsumedCapacity(cc *ConsumedCapacity) *Scan {
s.cc = cc
Expand Down Expand Up @@ -335,6 +343,7 @@ type scanIter struct {
err error
idx int
n int64
reqs int

// last item evaluated
last map[string]*dynamodb.AttributeValue
Expand Down Expand Up @@ -395,6 +404,10 @@ redo:
if itr.output.LastEvaluatedKey == nil || itr.scan.searchLimit > 0 {
return false
}
// have we hit the request limit?
if itr.scan.reqLimit > 0 && itr.reqs == itr.scan.reqLimit {
return false
}

// no, prepare next request and reset index
itr.input.ExclusiveStartKey = itr.output.LastEvaluatedKey
Expand All @@ -416,8 +429,12 @@ redo:
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
itr.exLEK = itr.output.LastEvaluatedKey
}
itr.reqs++

if len(itr.output.Items) == 0 {
if itr.scan.reqLimit > 0 && itr.reqs == itr.scan.reqLimit {
return false
}
if itr.output.LastEvaluatedKey != nil {
goto redo
}
Expand Down

0 comments on commit d5fb452

Please sign in to comment.