Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

Commit

Permalink
feat: GetBulk method for new CouchDB storage implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Trider <[email protected]>
  • Loading branch information
Derek Trider committed Jan 14, 2021
1 parent ab74010 commit fa5e6c8
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 9 deletions.
114 changes: 109 additions & 5 deletions component/newstorage/couchdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

// Hardcoded strings returned from Kivik/CouchDB that we check for.
docNotFoundErrMsgFromKivik = "Not Found: missing"
bulkGetDocNotFoundErrMsgFromKivik = "not_found: missing"
docDeletedErrMsgFromKivik = "Not Found: deleted"
databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist."
documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict."
Expand Down Expand Up @@ -61,6 +62,7 @@ type db interface {
Put(ctx context.Context, docID string, doc interface{}, options ...kivik.Options) (rev string, err error)
Find(ctx context.Context, query interface{}, options ...kivik.Options) (*kivik.Rows, error)
Delete(ctx context.Context, docID, rev string, options ...kivik.Options) (newRev string, err error)
BulkGet(ctx context.Context, docs []kivik.BulkGetReference, options ...kivik.Options) (*kivik.Rows, error)
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -339,7 +341,7 @@ func (s *Store) Get(k string) ([]byte, error) {
return nil, fmt.Errorf(failureWhileScanningRow, err)
}

storedValue, err := getValueFromRawDoc(rawDoc, payloadFieldKey)
storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf("failed to get payload from raw document: %w", err)
}
Expand Down Expand Up @@ -377,7 +379,21 @@ func (s *Store) GetTags(k string) ([]newstorage.Tag, error) {
// GetBulk fetches the values associated with the given keys.
// If a key doesn't exist, then a nil []byte is returned for that value. It is not considered an error.
func (s *Store) GetBulk(keys ...string) ([][]byte, error) {
return nil, errors.New("not implemented")
if keys == nil {
return nil, errors.New("keys string slice cannot be nil")
}

rawDocs, err := s.getRawDocs(keys)
if err != nil {
return nil, fmt.Errorf("failure while getting raw CouchDB documents: %w", err)
}

values, err := s.getStoredValuesFromRawDocs(rawDocs, keys)
if err != nil {
return nil, fmt.Errorf("failure while getting stored values from raw docs: %w", err)
}

return values, nil
}

// Query returns all data that satisfies the expression. Expression format: TagName:TagValue.
Expand Down Expand Up @@ -535,14 +551,102 @@ func (s *Store) getRevID(k string) (string, error) {
return "", err
}

revID, err := getValueFromRawDoc(rawDoc, revIDFieldKey)
revID, err := getStringValueFromRawDoc(rawDoc, revIDFieldKey)
if err != nil {
return "", fmt.Errorf("failed to get revision ID from the raw document: %w", err)
}

return revID, nil
}

// getRawDocs returns the raw documents from CouchDB using a bulk REST call.
// If a document is not found, then the raw document will be nil. It is not considered an error.
func (s *Store) getRawDocs(keys []string) ([]map[string]interface{}, error) {
rawDocs := make([]map[string]interface{}, len(keys))

bulkGetReferences := make([]kivik.BulkGetReference, len(keys))

for i, key := range keys {
bulkGetReferences[i].ID = key
}

rows, err := s.db.BulkGet(context.Background(), bulkGetReferences)
if err != nil {
return nil, fmt.Errorf("failure while sending request to CouchDB bulk docs endpoint: %w", err)
}

ok := rows.Next()

if !ok {
return nil, errors.New("bulk get from CouchDB was unexpectedly empty")
}

for i := 0; i < len(rawDocs); i++ {
err := rows.ScanDoc(&rawDocs[i])
// In the getRawDoc method, Kivik actually returns a different error message if a document was deleted.
// When doing a bulk get, instead Kivik doesn't return an error message, and we have to check the "_deleted"
// field in the raw doc. This is done in the getRevIDs method.
// If the document wasn't found, we return a nil raw doc since we don't consider it to be an error.
if err != nil && !strings.Contains(err.Error(), bulkGetDocNotFoundErrMsgFromKivik) {
return nil, fmt.Errorf(failWhileScanResultRows, err)
}

ok := rows.Next()

// ok is expected to be false on the last doc.
if i < len(rawDocs)-1 {
if !ok {
return nil, errors.New("got fewer docs from CouchDB than expected")
}
} else {
if ok {
return nil, errors.New("got more docs from CouchDB than expected")
}
}
}

return rawDocs, nil
}

func (s *Store) getStoredValuesFromRawDocs(rawDocs []map[string]interface{}, keys []string) ([][]byte, error) {
storedValues := make([][]byte, len(keys))

for i, rawDoc := range rawDocs {
// If the rawDoc is nil, this means that the value could not be found.
// It is not considered an error.
if rawDoc == nil {
storedValues[i] = nil

continue
}

// CouchDB still returns a raw document if the key has been deleted, so if this is a "deleted" raw document
// then we need to return nil to indicate that the value could not be found
isDeleted, containsIsDeleted := rawDoc["_deleted"]
if containsIsDeleted {
isDeletedBool, ok := isDeleted.(bool)
if !ok {
return nil, errors.New("failed to assert the retrieved deleted field as a bool")
}

if isDeletedBool {
storedValues[i] = nil

continue
}
}

storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf(`failed to get the payload from the raw document: %w`, err)
}

storedValues[i] = []byte(storedValue)
}

return storedValues, nil
}

type couchDBResultsIterator struct {
store *Store
resultRows rows
Expand Down Expand Up @@ -753,7 +857,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return "", fmt.Errorf(failWhileScanResultRows, err)
}

value, err := getValueFromRawDoc(rawDoc, rawDocKey)
value, err := getStringValueFromRawDoc(rawDoc, rawDocKey)
if err != nil {
return "", fmt.Errorf(`failure while getting the value associated with the "%s" key`+
`from the raw document`, rawDocKey)
Expand All @@ -762,7 +866,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return value, nil
}

func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
func getStringValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
value, ok := rawDoc[rawDocKey]
if !ok {
return "", fmt.Errorf(`"%s" is missing from the raw document`, rawDocKey)
Expand Down
15 changes: 15 additions & 0 deletions component/newstorage/couchdb/store_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (m *mockDB) Delete(context.Context, string, string, ...kivik.Options) (stri
return "", errors.New("mockDB Delete always fails")
}

func (m *mockDB) BulkGet(context.Context, []kivik.BulkGetReference, ...kivik.Options) (*kivik.Rows, error) {
return nil, errors.New("mockDB BulkGet always fails")
}

func (m *mockDB) Close(context.Context) error {
return errors.New("mockDB Close always fails")
}
Expand Down Expand Up @@ -174,6 +178,17 @@ func TestStore_Get_Internal(t *testing.T) {
})
}

func TestStore_GetBulk_Internal(t *testing.T) {
t.Run("Failure while getting raw CouchDB documents", func(t *testing.T) {
store := &Store{db: &mockDB{}}

values, err := store.GetBulk("key")
require.EqualError(t, err, "failure while getting raw CouchDB documents: "+
"failure while sending request to CouchDB bulk docs endpoint: mockDB BulkGet always fails")
require.Nil(t, values)
})
}

func TestStore_Query_Internal(t *testing.T) {
t.Run("Failure sending tag name only query to find endpoint", func(t *testing.T) {
store := &Store{db: &mockDB{}}
Expand Down
16 changes: 12 additions & 4 deletions component/newstorage/couchdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,18 @@ func TestStore_GetTags(t *testing.T) {
}

func TestStore_GetBulk(t *testing.T) {
t.Run("Failure - not implemented", func(t *testing.T) {
store := &Store{}
_, err := store.GetBulk()
require.EqualError(t, err, "not implemented")
t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) {
provider, err := NewProvider(couchDBURL, WithDBPrefix("prefix"))
require.NoError(t, err)
require.NotNil(t, provider)

store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

values, err := store.GetBulk(nil...)
require.EqualError(t, err, "keys string slice cannot be nil")
require.Nil(t, values)
})
}

Expand Down
77 changes: 77 additions & 0 deletions test/component/newstorage/newstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func TestAll(t *testing.T, provider newstorage.Provider) {
t.Run("Store Put and Get", func(t *testing.T) {
TestPutGet(t, provider)
})
t.Run("Store GetBulk and Get", func(t *testing.T) {
TestGetBulk(t, provider)
})
t.Run("Delete", func(t *testing.T) {
TestDelete(t, provider)
})
Expand Down Expand Up @@ -86,6 +89,80 @@ func TestPutGet(t *testing.T, provider newstorage.Provider) {
tryNilOrBlankValues(t, store1, data, commonKey)
}

// TestGetBulk tests common GetBulk functionality.
func TestGetBulk(t *testing.T, provider newstorage.Provider) { //nolint: funlen // Test file
t.Run("Success: all values found", func(t *testing.T) {
store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

err = store.Put("key1", []byte("value1"),
[]newstorage.Tag{
{Name: "tagName1", Value: "tagValue1"},
{Name: "tagName2", Value: "tagValue2"},
}...)
require.NoError(t, err)

err = store.Put("key2", []byte("value2"),
[]newstorage.Tag{
{Name: "tagName1", Value: "tagValue1"},
{Name: "tagName2", Value: "tagValue2"},
}...)
require.NoError(t, err)

values, err := store.GetBulk("key1", "key2")
require.NoError(t, err)
require.Len(t, values, 2)
require.Equal(t, "value1", string(values[0]))
require.Equal(t, "value2", string(values[1]))
})
t.Run("Success: one value found, one not", func(t *testing.T) {
store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

err = store.Put("key1", []byte("value1"),
[]newstorage.Tag{
{Name: "tagName1", Value: "tagValue1"},
{Name: "tagName2", Value: "tagValue2"},
}...)
require.NoError(t, err)

values, err := store.GetBulk("key1", "key2")
require.NoError(t, err)
require.Len(t, values, 2)
require.Equal(t, "value1", string(values[0]))
require.Nil(t, values[1])
})
t.Run("Success: no values found", func(t *testing.T) {
store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

err = store.Put("key1", []byte("value1"),
[]newstorage.Tag{
{Name: "tagName1", Value: "tagValue1"},
{Name: "tagName2", Value: "tagValue2"},
}...)
require.NoError(t, err)

values, err := store.GetBulk("key3", "key4")
require.NoError(t, err)
require.Len(t, values, 2)
require.Nil(t, values[0])
require.Nil(t, values[1])
})
t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) {
store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

values, err := store.GetBulk(nil...)
require.EqualError(t, err, "keys string slice cannot be nil")
require.Nil(t, values)
})
}

// TestDelete tests common Delete functionality.
func TestDelete(t *testing.T, provider newstorage.Provider) {
t.Helper()
Expand Down

0 comments on commit fa5e6c8

Please sign in to comment.