Skip to content

Commit

Permalink
Merge pull request #84 from guregu/tx
Browse files Browse the repository at this point in the history
Transactions
  • Loading branch information
guregu authored Jan 8, 2019
2 parents bb702e3 + 09352f5 commit d8bb6a0
Show file tree
Hide file tree
Showing 13 changed files with 654 additions and 7 deletions.
6 changes: 2 additions & 4 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package dynamo
import (
"testing"
"time"

"github.com/guregu/toki"
)

var (
Expand All @@ -16,7 +14,7 @@ var (
Page: 1,
SkipThis: "i should disappear",
Bonus: &arbitraryNumber,
TestText: toki.MustParseTime("1:2:3"),
TestText: time.Now(),
StringSlice: []string{"A", "B", "C", "QQQ"},
embedMe: embedMe{
Extra: true,
Expand Down Expand Up @@ -117,7 +115,7 @@ type fancyObject struct {
SkipThis string `dynamo:"-"`
Bonus *int `dynamo:",omitempty"`

TestText toki.Time
TestText time.Time
SkipMePlz time.Time `dynamo:",omitempty"`

StringSlice []string
Expand Down
95 changes: 95 additions & 0 deletions conditioncheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package dynamo

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

// ConditionCheck represents a condition for a write transaction to succeed.
// It is used along with WriteTx.Check.
type ConditionCheck struct {
table Table
hashKey string
hashValue *dynamodb.AttributeValue
rangeKey string
rangeValue *dynamodb.AttributeValue

condition string
subber

err error
}

// Check creates a new ConditionCheck, which represents a condition for a write transaction to succeed.
// hashKey specifies the name of the table's hash key and value specifies the value of the hash key.
// You must use Range to specify a range key for tables with hash and range keys.
func (table Table) Check(hashKey string, value interface{}) *ConditionCheck {
check := &ConditionCheck{
table: table,
hashKey: hashKey,
}
check.hashValue, check.err = marshal(value, "")
return check
}

// Range specifies the name and value of the range key for this item.
func (check *ConditionCheck) Range(rangeKey string, value interface{}) *ConditionCheck {
check.rangeKey = rangeKey
var err error
check.rangeValue, err = marshal(value, "")
check.setError(err)
return check
}

// If specifies a conditional expression for this coniditon check to succeed.
// Use single quotes to specificy reserved names inline (like 'Count').
// Use the placeholder ? within the expression to substitute values, and use $ for names.
// You need to use quoted or placeholder names when the name is a reserved word in DynamoDB.
func (check *ConditionCheck) If(expr string, args ...interface{}) *ConditionCheck {
cond, err := check.subExpr(expr, args...)
check.setError(err)
check.condition = cond
return check
}

// IfExists sets this check to succeed if the item exists.
func (check *ConditionCheck) IfExists() *ConditionCheck {
return check.If("attribute_exists($)", check.hashKey)
}

// IfNotExists sets this check to succeed if the item does not exist.
func (check *ConditionCheck) IfNotExists() *ConditionCheck {
return check.If("attribute_not_exists($)", check.hashKey)
}

func (check *ConditionCheck) writeTxItem() (*dynamodb.TransactWriteItem, error) {
if check.err != nil {
return nil, check.err
}
item := &dynamodb.ConditionCheck{
TableName: aws.String(check.table.name),
Key: check.keys(),
ExpressionAttributeNames: check.nameExpr,
ExpressionAttributeValues: check.valueExpr,
}
if check.condition != "" {
item.ConditionExpression = aws.String(check.condition)
}
return &dynamodb.TransactWriteItem{
ConditionCheck: item,
}, nil
}

func (check *ConditionCheck) keys() map[string]*dynamodb.AttributeValue {
keys := map[string]*dynamodb.AttributeValue{check.hashKey: check.hashValue}
if check.rangeKey != "" {
keys[check.rangeKey] = check.rangeValue
}
return keys
}

func (check *ConditionCheck) setError(err error) {
if check.err == nil {
check.err = err
}
}
17 changes: 17 additions & 0 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func (d *Delete) deleteInput() *dynamodb.DeleteItemInput {
return input
}

func (d *Delete) writeTxItem() (*dynamodb.TransactWriteItem, error) {
if d.err != nil {
return nil, d.err
}
input := d.deleteInput()
item := &dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
TableName: input.TableName,
Key: input.Key,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ExpressionAttributeValues: input.ExpressionAttributeValues,
ConditionExpression: input.ConditionExpression,
},
}
return item, nil
}

func (d *Delete) key() map[string]*dynamodb.AttributeValue {
key := map[string]*dynamodb.AttributeValue{
d.hashKey: d.hashValue,
Expand Down
25 changes: 25 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynamo

import (
"bytes"
"encoding"
"fmt"
"reflect"
Expand Down Expand Up @@ -443,3 +444,27 @@ func isZero(rv reflect.Value) bool {
z := reflect.Zero(rv.Type())
return rv.Interface() == z.Interface()
}

// only works for primary key types
func isAVEqual(a, b *dynamodb.AttributeValue) bool {
if a.S != nil {
if b.S == nil {
return false
}
return *a.S == *b.S
}
if a.N != nil {
if b.N == nil {
return false
}
// TODO: parse numbers?
return *a.N == *b.N
}
if a.B != nil {
if b.B == nil {
return false
}
return bytes.Equal(a.B, b.B)
}
return false
}
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/guregu/dynamo

require (
github.com/aws/aws-sdk-go v1.16.15
github.com/cenkalti/backoff v2.1.1+incompatible
github.com/gofrs/uuid v3.1.0+incompatible
github.com/stretchr/testify v1.3.0 // indirect
golang.org/x/net v0.0.0-20190108155000-395948e2f546
golang.org/x/text v0.3.0 // indirect
)
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
github.com/aws/aws-sdk-go v1.16.15 h1:kQyxfRyjAwIYjf0225sn/pn+WAlncKyI8dmT3+ItMFE=
github.com/aws/aws-sdk-go v1.16.15/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v3.1.0+incompatible h1:q2rtkjaKT4YEr6E1kamy0Ha4RtepWlQBedyHx0uzKwA=
github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/net v0.0.0-20190108155000-395948e2f546 h1:tkMg6+6TF2qZ/3I8fw+DiNgPSsABxdVIqWE90w8Vxzk=
golang.org/x/net v0.0.0-20190108155000-395948e2f546/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
19 changes: 19 additions & 0 deletions put.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ func (p *Put) input() *dynamodb.PutItemInput {
return input
}

func (p *Put) writeTxItem() (*dynamodb.TransactWriteItem, error) {
if p.err != nil {
return nil, p.err
}
input := p.input()
item := &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: input.TableName,
Item: input.Item,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ExpressionAttributeValues: input.ExpressionAttributeValues,
ConditionExpression: input.ConditionExpression,
// TODO: add support when aws-sdk-go updates
// ReturnValuesOnConditionCheckFailure: aws.String(dynamodb.ReturnValuesOnConditionCheckFailureAllOld),
},
}
return item, nil
}

func (p *Put) setError(err error) {
if p.err != nil {
p.err = err
Expand Down
18 changes: 16 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
// "github.com/davecgh/go-spew/spew"
)

// Query is a request to get one or more items in a table.
Expand Down Expand Up @@ -39,7 +38,7 @@ type Query struct {
}

var (
// ErrNotFound is returned when no items could be found in Get or OldValue a and similar operations.
// ErrNotFound is returned when no items could be found in Get or OldValue and similar operations.
ErrNotFound = errors.New("dynamo: no item found")
// ErrTooMany is returned when one item was requested, but the query returned multiple items.
ErrTooMany = errors.New("dynamo: too many items")
Expand Down Expand Up @@ -525,6 +524,21 @@ func (q *Query) getItemInput() *dynamodb.GetItemInput {
return req
}

func (q *Query) getTxItem() (*dynamodb.TransactGetItem, error) {
if !q.canGetItem() {
return nil, errors.New("dynamo: transaction Query is too complex; no indexes or filters are allowed")
}
input := q.getItemInput()
return &dynamodb.TransactGetItem{
Get: &dynamodb.Get{
TableName: input.TableName,
Key: input.Key,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ProjectionExpression: input.ProjectionExpression,
},
}, nil
}

func (q *Query) keys() map[string]*dynamodb.AttributeValue {
keys := map[string]*dynamodb.AttributeValue{
q.hashKey: q.hashValue,
Expand Down
10 changes: 10 additions & 0 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (dt *DeleteTable) input() *dynamodb.DeleteTableInput {
type ConsumedCapacity struct {
// Total is the total number of capacity units consumed during this operation.
Total float64
// Read is the total number of read capacity units consumed during this operation.
Read float64
// Write is the total number of write capacity units consumed during this operation.
Write float64
// GSI is a map of Global Secondary Index names to consumed capacity units.
GSI map[string]float64
// GSI is a map of Local Secondary Index names to consumed capacity units.
Expand All @@ -93,6 +97,12 @@ func addConsumedCapacity(cc *ConsumedCapacity, raw *dynamodb.ConsumedCapacity) {
if raw.CapacityUnits != nil {
cc.Total += *raw.CapacityUnits
}
if raw.ReadCapacityUnits != nil {
cc.Read += *raw.ReadCapacityUnits
}
if raw.WriteCapacityUnits != nil {
cc.Write += *raw.WriteCapacityUnits
}
if len(raw.GlobalSecondaryIndexes) > 0 {
if cc.GSI == nil {
cc.GSI = make(map[string]float64, len(raw.GlobalSecondaryIndexes))
Expand Down
Loading

0 comments on commit d8bb6a0

Please sign in to comment.