Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RequestLimit #234

Merged
merged 2 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ var typeCache sync.Map // unmarshalKey → *typedef
type typedef struct {
decoders map[unmarshalKey]decodeFunc
fields []structField
root reflect.Type
info *structInfo
}

func newTypedef(rt reflect.Type) (*typedef, error) {
def := &typedef{
decoders: make(map[unmarshalKey]decodeFunc),
// encoders: make(map[encodeKey]encodeFunc),
root: rt,
}
err := def.init(rt)
return def, err
Expand Down
31 changes: 21 additions & 10 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ func (q *Query) SearchLimit(limit int64) *Query {
}

// RequestLimit specifies the maximum amount of requests to make against DynamoDB's API.
// func (q *Query) RequestLimit(limit int) *Query {
// q.reqLimit = limit
// return q
// }
// A limit of zero or less means unlimited requests.
func (q *Query) RequestLimit(limit int) *Query {
q.reqLimit = limit
return q
}

// Order specifies the desired result order.
// Requires a range key (a.k.a. partition key) to be specified.
Expand Down Expand Up @@ -286,22 +287,29 @@ func (q *Query) CountWithContext(ctx context.Context) (int64, error) {
return 0, q.err
}

var count int64
var count, scanned int64
var reqs int
var res *dynamodb.QueryOutput
for {
req := q.queryInput()
req.Select = selectCount
input := q.queryInput()
input.Select = selectCount

err := q.table.db.retry(ctx, func() error {
var err error
res, err = q.table.db.client.QueryWithContext(ctx, req)
res, err = q.table.db.client.QueryWithContext(ctx, input)
if err != nil {
return err
}
reqs++

if res.Count == nil {
return errors.New("nil count")
return errors.New("malformed DynamoDB response: count is nil")
}
count += *res.Count
if res.ScannedCount != nil {
scanned += *res.ScannedCount
}

return nil
})
if err != nil {
Expand All @@ -312,7 +320,10 @@ func (q *Query) CountWithContext(ctx context.Context) (int64, error) {
}

q.startKey = res.LastEvaluatedKey
if res.LastEvaluatedKey == nil || q.searchLimit > 0 {
if res.LastEvaluatedKey == nil ||
(q.limit > 0 && count >= q.limit) ||
(q.searchLimit > 0 && scanned >= q.searchLimit) ||
(q.reqLimit > 0 && reqs >= q.reqLimit) {
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ type encodeKey struct {

type structInfo struct {
root reflect.Type
parent *structInfo
fields map[string]*structField // by name
refs map[encodeKey][]*structField
types map[encodeKey]encodeFunc
zeros map[reflect.Type]func(reflect.Value) bool
parent *structInfo

seen map[encodeKey]struct{}
queue []encodeKey
Expand Down
32 changes: 19 additions & 13 deletions scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamo

import (
"context"
"errors"
"strings"
"sync"

Expand Down Expand Up @@ -131,10 +132,11 @@ func (s *Scan) SearchLimit(limit int64) *Scan {
}

// RequestLimit specifies the maximum amount of requests to make against DynamoDB's API.
// func (s *Scan) RequestLimit(limit int) *Scan {
// s.reqLimit = limit
// return s
// }
// A limit of zero or less means unlimited requests.
func (s *Scan) RequestLimit(limit int) *Scan {
s.reqLimit = limit
return s
}

// ConsumedCapacity will measure the throughput capacity consumed by this operation and add it to cc.
func (s *Scan) ConsumedCapacity(cc *ConsumedCapacity) *Scan {
Expand Down Expand Up @@ -260,6 +262,7 @@ func (s *Scan) CountWithContext(ctx context.Context) (int64, error) {
var count, scanned int64
input := s.scanInput()
input.Select = aws.String(dynamodb.SelectCount)
var reqs int
for {
var out *dynamodb.ScanOutput
err := s.table.db.retry(ctx, func() error {
Expand All @@ -268,23 +271,26 @@ func (s *Scan) CountWithContext(ctx context.Context) (int64, error) {
return err
})
if err != nil {
return count, err
return 0, err
}
reqs++

if out.Count == nil {
return count, errors.New("malformed DynamoDB outponse: count is nil")
}
count += *out.Count
scanned += *out.ScannedCount
if out.ScannedCount != nil {
scanned += *out.ScannedCount
}

if s.cc != nil {
addConsumedCapacity(s.cc, out.ConsumedCapacity)
}

if s.limit > 0 && count >= s.limit {
break
}
if s.searchLimit > 0 && scanned >= s.searchLimit {
break
}
if out.LastEvaluatedKey == nil {
if out.LastEvaluatedKey == nil ||
(s.limit > 0 && count >= s.limit) ||
(s.searchLimit > 0 && scanned >= s.searchLimit) ||
(s.reqLimit > 0 && reqs >= s.reqLimit) {
break
}

Expand Down