Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding concurrent batch writes #96

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ err := db.Table("Books").Get("ID", 555).One(dynamo.AWSEncoding(&someBook))

By default, tests are run in offline mode. Create a table called `TestDB`, with a Number Partition Key called `UserID` and a String Sort Key called `Time`. Change the table name with the environment variable `DYNAMO_TEST_TABLE`. You must specify `DYNAMO_TEST_REGION`, setting it to the AWS region where your test table is.

```bash
```bash
DYNAMO_TEST_REGION=us-west-2 go test github.com/guregu/dynamo/... -cover
```

Or simply run the following command to test it locally:

```bash
./run_tests.sh
```

### License
Expand Down
18 changes: 16 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

const batchSize = 101

func TestBatchGetWrite(t *testing.T) {
func testBatchGetWrite(t *testing.T, isSequential bool) {
if testDB == nil {
t.Skip(offlineSkipMsg)
}
Expand All @@ -29,7 +29,13 @@ func TestBatchGetWrite(t *testing.T) {
}

var wcc ConsumedCapacity
wrote, err := table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
var wrote int
var err error
if isSequential {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
} else {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).RunConcurrently()
}
if wrote != batchSize {
t.Error("unexpected wrote:", wrote, "≠", batchSize)
}
Expand Down Expand Up @@ -90,3 +96,11 @@ func TestBatchGetWrite(t *testing.T) {
t.Error("expected 0 results, got", len(results))
}
}

// TestSequentialBatchGetWrite runs the shared batch get/write scenario with
// the sequential write path (BatchWrite.Run).
func TestSequentialBatchGetWrite(t *testing.T) {
	const sequential = true
	testBatchGetWrite(t, sequential)
}

// TestConcurrentBatchGetWrite runs the shared batch get/write scenario with
// the concurrent write path (BatchWrite.RunConcurrently).
func TestConcurrentBatchGetWrite(t *testing.T) {
	const sequential = false
	testBatchGetWrite(t, sequential)
}
177 changes: 177 additions & 0 deletions batchwrite.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dynamo

import (
"errors"
"math"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/cenkalti/backoff"
multierror "github.com/hashicorp/go-multierror"
)

// DynamoDB API limit, 25 operations per request
Expand Down Expand Up @@ -61,13 +63,188 @@ func (bw *BatchWrite) ConsumedCapacity(cc *ConsumedCapacity) *BatchWrite {
return bw
}

// batchRequest is one unit of work handed to a writeBatchWorker: a single
// batch of write operations together with the context to send it with.
type batchRequest struct {
// ctx is forwarded to writeBatch for this batch.
ctx aws.Context
// ops holds the write requests that make up this batch.
ops []*dynamodb.WriteRequest
}

// batchResponse is the outcome of one writeBatch call, sent back by a worker
// to RunConcurrentlyWithContext.
type batchResponse struct {
// Result is the output of the most recent BatchWriteItem call, if any.
Result *dynamodb.BatchWriteItemOutput
// Error is non-nil when the batch could not be written completely.
Error error
// Wrote is the number of operations writeBatch reports as written.
Wrote int
}

// batchWriteConfig holds the settings for a RunConcurrently call. It is
// initialised by defaults and then adjusted by BatchWriteOption values.
type batchWriteConfig struct {
// poolSize is the size of the worker pool used to process batches.
poolSize int
}

// BatchWriteOption customizes a RunConcurrently or RunConcurrentlyWithContext
// call by modifying its configuration; see WithPoolSize.
type BatchWriteOption func(*batchWriteConfig)

// defaults fills cfg with the settings used when no option overrides them.
func defaults(cfg *batchWriteConfig) {
	const defaultPoolSize = 10
	cfg.poolSize = defaultPoolSize
}

// WithPoolSize returns an option that sets the size of the worker pool used
// by RunConcurrently to poolSize.
func WithPoolSize(poolSize int) BatchWriteOption {
	var apply BatchWriteOption = func(cfg *batchWriteConfig) {
		cfg.poolSize = poolSize
	}
	return apply
}

// consumedCapacityLock serializes updates to a shared ConsumedCapacity made
// by writeBatch calls running on different goroutines. It is a one-slot
// channel used as a mutex: sending acquires the lock, receiving releases it.
var consumedCapacityLock = make(chan struct{}, 1)

// writeBatch sends one batch of write operations and keeps re-sending
// whatever DynamoDB reports as unprocessed, backing off exponentially between
// attempts.
//
// The returned batchResponse carries the output of the last request, the
// number of operations written across all attempts, and the error that
// stopped the loop (nil when every operation was written).
func (bw *BatchWrite) writeBatch(ctx aws.Context, ops []*dynamodb.WriteRequest) batchResponse {
	boff := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	wrote := 0

	for {
		var res *dynamodb.BatchWriteItemOutput
		req := bw.input(ops)
		err := retry(ctx, func() error {
			var err error
			res, err = bw.batch.table.db.client.BatchWriteItemWithContext(ctx, req)
			return err
		})
		if err != nil {
			// Operations written by earlier attempts stay written, so the
			// count accumulated so far is reported alongside the error.
			return batchResponse{
				Result: res,
				Error:  err,
				Wrote:  wrote,
			}
		}
		if bw.cc != nil {
			// Every worker shares bw.cc, so guard the read-modify-write.
			consumedCapacityLock <- struct{}{}
			for _, cc := range res.ConsumedCapacity {
				addConsumedCapacity(bw.cc, cc)
			}
			<-consumedCapacityLock
		}

		unprocessed := res.UnprocessedItems[bw.batch.table.Name()]
		// Accumulate: ops shrinks to the unprocessed remainder on every
		// retry, so each round only accounts for the items it just wrote.
		wrote += len(ops) - len(unprocessed)
		if len(unprocessed) == 0 {
			return batchResponse{
				Result: res,
				Wrote:  wrote,
			}
		}
		ops = unprocessed

		// need to sleep when re-requesting, per spec
		if err := aws.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
			return batchResponse{
				Result: nil,
				Error:  err,
				Wrote:  wrote,
			}
		}
	}
}

// writeBatchWorker is the body of one pool goroutine. It takes batches from
// requests until that channel is closed and drained, writes each one with
// writeBatch, and sends the outcome on response. The worker index is
// accepted but not used.
func (bw *BatchWrite) writeBatchWorker(worker int, requests <-chan batchRequest, response chan<- batchResponse) {
	for {
		job, open := <-requests
		if !open {
			return
		}
		response <- bw.writeBatch(job.ctx, job.ops)
	}
}

// splitBatches partitions requests, in order, into consecutive chunks of at
// most maxWriteOps entries. The chunks are sub-slices of requests, not
// copies. An empty input yields an empty, non-nil result.
func splitBatches(requests []*dynamodb.WriteRequest) (batches [][]*dynamodb.WriteRequest) {
	batches = make([][]*dynamodb.WriteRequest, 0)
	for rest := requests; len(rest) > 0; {
		n := maxWriteOps
		if len(rest) < n {
			n = len(rest)
		}
		batches = append(batches, rest[:n])
		rest = rest[n:]
	}
	return batches
}

// min returns the smaller of a and b.
func min(a int, b int) int {
	if b <= a {
		return b
	}
	return a
}

// RunConcurrently executes this batch using a pool of concurrent workers and
// the default context. The pool size defaults to 10 and can be changed with
// WithPoolSize. It returns the number of operations written and any error.
func (bw *BatchWrite) RunConcurrently(opts ...BatchWriteOption) (wrote int, err error) {
	ctx, cancel := defaultContext()
	defer cancel()

	wrote, err = bw.RunConcurrentlyWithContext(ctx, opts...)
	return wrote, err
}

func (bw *BatchWrite) RunConcurrentlyWithContext(ctx aws.Context, opts ...BatchWriteOption) (wrote int, err error) {

if bw.err != nil {
return 0, bw.err
}

cfg := new(batchWriteConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're not really using the notion of your threads really. I think it's overly complicating your operation as it's currently implemented. All you are really doing here is spinning up a go routine for each batch you're running, and you'll make too many operation if your threads number is larger than your total batches.

If you were truly concerned about how many go routines were concurrently running, you'd be making a worker pool, like https://gobyexample.com/worker-pools and send your batches to be processed by the worker pool. I'd also not call these things "Threads" as a common Go convention would be to refer to it as "Pool Size".

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will try this. Thanks!

defaults(cfg)
for _, fn := range opts {
fn(cfg)
}

// TODO : Can split the batches and run them concurrently ?
batches := splitBatches(bw.ops)
totalBatches := len(batches)

requests := make(chan batchRequest, totalBatches)
response := make(chan batchResponse, totalBatches)
defer close(response)

// Create the workers
for i := 0; i < cfg.poolSize; i++ {
go bw.writeBatchWorker(i, requests, response)
}

// Push the write requests
for i := 0; i < totalBatches; i++ {
requests <- batchRequest{
ctx: ctx,
ops: batches[i],
}
}
close(requests)

// Capture the response
wrote = 0
batchCounter := 0
for {
select {
case batchResponse, ok := <-response:
if !ok {
err = multierror.Append(err, errors.New("channel unexpectedly closed"))
return wrote, err
}
if batchResponse.Error != nil {
err = multierror.Append(err, batchResponse.Error)
}
wrote += batchResponse.Wrote
batchCounter++
if batchCounter == totalBatches {
return wrote, err
}
case <-ctx.Done():
err = multierror.Append(err, ctx.Err())
return wrote, err
}
}
}

// Run executes this batch with the default context.
// When a batch holds more than 25 operations, a returned error may mean that
// only part of the records were written. Check the wrote return value to
// work out which operations went through.
func (bw *BatchWrite) Run() (wrote int, err error) {
	ctx, cancel := defaultContext()
	defer cancel()

	// TODO : Perhaps use RunConcurrentlyWithContext(dynamo.WithPoolSize(1)) instead ?
	wrote, err = bw.RunWithContext(ctx)
	return wrote, err
}

Expand Down
9 changes: 7 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ var (
const offlineSkipMsg = "DYNAMO_TEST_REGION not set"

func init() {
if region := os.Getenv("DYNAMO_TEST_REGION"); region != "" {
testDB = New(session.New(), &aws.Config{Region: aws.String(region)})
region := os.Getenv("DYNAMO_TEST_REGION")
endpoint := os.Getenv("DYNAMO_ENDPOINT")
if region != "" && endpoint != "" {
testDB = New(session.New(), &aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
})
}
if table := os.Getenv("DYNAMO_TEST_TABLE"); table != "" {
testTable = table
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ require (
github.com/cenkalti/backoff v2.1.1+incompatible
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/hashicorp/go-multierror v1.0.0
github.com/stretchr/testify v1.3.0 // indirect
golang.org/x/net v0.0.0-20190318221613-d196dffd7c2b
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
27 changes: 27 additions & 0 deletions run_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash
#
# Runs the test suite against a local DynamoDB:
#   1. starts a fresh amazon/dynamodb-local container named "dynamodb",
#   2. waits for it to accept requests,
#   3. (re)creates the test table,
#   4. runs `go test`.
# Requires docker, the aws CLI and go on PATH. The container is left running
# afterwards; the next run replaces it.

export DYNAMO_ENDPOINT="http://localhost:8000"
export DYNAMO_TEST_REGION="us-west-2"
export DYNAMO_TEST_TABLE="TestDB"

# Remove a container left over from a previous run; it is fine if none exists.
docker rm -f dynamodb > /dev/null 2>&1

# Start detached instead of backgrounding the docker client with `&`.
if ! docker run -d --name dynamodb -p 8000:8000 amazon/dynamodb-local > /dev/null; then
    echo "could not start the dynamodb-local container" >&2
    exit 1
fi

# The server needs a moment to start listening; poll until it answers so the
# table commands below do not race it.
ready=0
for _ in $(seq 1 30); do
    if aws dynamodb list-tables \
        --region "$DYNAMO_TEST_REGION" \
        --endpoint-url "$DYNAMO_ENDPOINT" > /dev/null 2>&1; then
        ready=1
        break
    fi
    sleep 1
done
if [ "$ready" -ne 1 ]; then
    echo "DynamoDB Local did not become ready at $DYNAMO_ENDPOINT" >&2
    exit 1
fi

# Drop any existing table; a failure here just means there was none.
aws dynamodb delete-table \
    --table-name "$DYNAMO_TEST_TABLE" \
    --region "$DYNAMO_TEST_REGION" \
    --endpoint-url "$DYNAMO_ENDPOINT" > /dev/null 2>&1

if ! aws dynamodb create-table \
    --table-name "$DYNAMO_TEST_TABLE" \
    --attribute-definitions \
        AttributeName=UserID,AttributeType=N \
        AttributeName=Time,AttributeType=S \
    --key-schema \
        AttributeName=UserID,KeyType=HASH \
        AttributeName=Time,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=1000,WriteCapacityUnits=1000 \
    --region "$DYNAMO_TEST_REGION" \
    --endpoint-url "$DYNAMO_ENDPOINT" > /dev/null; then
    echo "could not create table $DYNAMO_TEST_TABLE" >&2
    exit 1
fi

go test . -cover