Skip to content

Commit

Permalink
add table waiters (#193)
Browse files Browse the repository at this point in the history
* add table waiters
  • Loading branch information
guregu authored Mar 21, 2022
1 parent 5734816 commit 757bab6
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 2 deletions.
18 changes: 17 additions & 1 deletion createtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,14 @@ func (ct *CreateTable) SSEEncryption(enabled bool, keyID string, sseType SSEType
return ct
}

// Run creates this table or returns and error.
// Run creates this table or returns an error.
func (ct *CreateTable) Run() error {
ctx, cancel := defaultContext()
defer cancel()
return ct.RunWithContext(ctx)
}

// RunWithContext creates this table or returns an error.
func (ct *CreateTable) RunWithContext(ctx aws.Context) error {
if ct.err != nil {
return ct.err
Expand All @@ -248,6 +249,21 @@ func (ct *CreateTable) RunWithContext(ctx aws.Context) error {
})
}

// Wait creates this table and blocks until it exists and is ready to use.
func (ct *CreateTable) Wait() error {
ctx, cancel := defaultContext()
defer cancel()
return ct.WaitWithContext(ctx)
}

// WaitWithContext creates this table and blocks until it exists and is ready to use.
func (ct *CreateTable) WaitWithContext(ctx aws.Context) error {
if err := ct.RunWithContext(ctx); err != nil {
return err
}
return ct.db.Table(ct.tableName).WaitWithContext(ctx)
}

func (ct *CreateTable) from(rv reflect.Value) error {
switch rv.Kind() {
case reflect.Struct: // ok
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ require (
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
)

go 1.12
go 1.13
8 changes: 8 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynamo

import (
"errors"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -46,7 +47,14 @@ func retry(ctx aws.Context, f func() error) error {
}
}

// errRetry is a sentinel error to retry, should never be returned to user
var errRetry = errors.New("dynamo: retry")

func canRetry(err error) bool {
if errors.Is(err, errRetry) {
return true
}

if ae, ok := err.(awserr.RequestFailure); ok {
switch ae.StatusCode() {
case 500, 503:
Expand Down
69 changes: 69 additions & 0 deletions table.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package dynamo

import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

// Status is an enumeration of table and index statuses.
type Status string

// Table and index statuses.
const (
// The table or index is ready for use.
ActiveStatus Status = "ACTIVE"
Expand All @@ -20,6 +24,10 @@ const (
UpdatingStatus Status = "UPDATING"
// The table or index is being deleted.
DeletingStatus Status = "DELETING"

// NotExistsStatus is a special status you can pass to table.Wait() to wait until a table doesn't exist.
// DescribeTable will return a ResourceNotFound AWS error instead of this.
NotExistsStatus Status = "_gone"
)

// Table is a DynamoDB table.
Expand All @@ -44,6 +52,52 @@ func (table Table) Name() string {
return table.name
}

// Wait blocks until this table's status matches any status provided by want.
// If no statuses are specified, the active status is used.
func (table Table) Wait(want ...Status) error {
ctx, cancel := defaultContext()
defer cancel()
return table.WaitWithContext(ctx, want...)
}

// Wait blocks until this table's status matches any status provided by want.
// If no statuses are specified, the active status is used.
func (table Table) WaitWithContext(ctx aws.Context, want ...Status) error {
if len(want) == 0 {
want = []Status{ActiveStatus}
}
wantGone := false
for _, status := range want {
if status == NotExistsStatus {
wantGone = true
}
}

err := retry(ctx, func() error {
desc, err := table.Describe().RunWithContext(ctx)
var aerr awserr.RequestFailure
if errors.As(err, &aerr) {
if aerr.Code() == "ResourceNotFoundException" {
if wantGone {
return nil
}
return errRetry
}
}
if err != nil {
return err
}

for _, status := range want {
if status == desc.Status {
return nil
}
}
return errRetry
})
return err
}

// primaryKeys attempts to determine this table's primary keys.
// It will try:
// - output LastEvaluatedKey
Expand Down Expand Up @@ -142,6 +196,21 @@ func (dt *DeleteTable) RunWithContext(ctx aws.Context) error {
})
}

// Wait executes this request and blocks until the table is finished deleting.
func (dt *DeleteTable) Wait() error {
ctx, cancel := defaultContext()
defer cancel()
return dt.WaitWithContext(ctx)
}

// WaitWithContext executes this request and blocks until the table is finished deleting.
func (dt *DeleteTable) WaitWithContext(ctx context.Context) error {
if err := dt.RunWithContext(ctx); err != nil {
return err
}
return dt.table.WaitWithContext(ctx, NotExistsStatus)
}

func (dt *DeleteTable) input() *dynamodb.DeleteTableInput {
name := dt.table.Name()
return &dynamodb.DeleteTableInput{
Expand Down
35 changes: 35 additions & 0 deletions table_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,48 @@
package dynamo

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

func TestTableLifecycle(t *testing.T) {
if testDB == nil {
t.Skip(offlineSkipMsg)
}

now := time.Now().UTC()
name := fmt.Sprintf("TestDB-%d", now.UnixNano())

// example from the docs
type UserAction struct {
UserID string `dynamo:"ID,hash" index:"Seq-ID-index,range"`
Time time.Time `dynamo:",range"`
Seq int64 `localIndex:"ID-Seq-index,range" index:"Seq-ID-index,hash"`
UUID string `index:"UUID-index,hash"`
}

// create & wait
if err := testDB.CreateTable(name, UserAction{}).Wait(); err != nil {
t.Fatal(err)
}

// make sure it really works
table := testDB.Table(name)
if err := table.Put(UserAction{UserID: "test", Time: now, Seq: 1, UUID: "42"}).Run(); err != nil {
t.Fatal(err)
}

// delete & wait
if err := testDB.Table(name).DeleteTable().Wait(); err != nil {
t.Fatal(err)
}
}

func TestAddConsumedCapacity(t *testing.T) {
raw := &dynamodb.ConsumedCapacity{
TableName: aws.String("TestTable"),
Expand Down

0 comments on commit 757bab6

Please sign in to comment.