Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

Commit

Permalink
feat: GetBulk method for new CouchDB storage implementation
Browse files Browse the repository at this point in the history
- Also includes a minor refactor for the getQueryOptions function that was added in my previous commit.

Signed-off-by: Derek Trider <[email protected]>
  • Loading branch information
Derek Trider committed Jan 14, 2021
1 parent ab74010 commit 91dd4c2
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 14 deletions.
121 changes: 112 additions & 9 deletions component/newstorage/couchdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import ( //nolint:gci // False positive, seemingly caused by the CouchDB driver
const (
couchDBUsersTable = "_users"
revIDFieldKey = "_rev"
deletedFieldKey = "_deleted"

designDocumentName = "AriesStorageDesignDocument"
payloadFieldKey = "payload"

// Hardcoded strings returned from Kivik/CouchDB that we check for.
docNotFoundErrMsgFromKivik = "Not Found: missing"
bulkGetDocNotFoundErrMsgFromKivik = "not_found: missing"
docDeletedErrMsgFromKivik = "Not Found: deleted"
databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist."
documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict."
Expand Down Expand Up @@ -61,6 +63,7 @@ type db interface {
Put(ctx context.Context, docID string, doc interface{}, options ...kivik.Options) (rev string, err error)
Find(ctx context.Context, query interface{}, options ...kivik.Options) (*kivik.Rows, error)
Delete(ctx context.Context, docID, rev string, options ...kivik.Options) (newRev string, err error)
BulkGet(ctx context.Context, docs []kivik.BulkGetReference, options ...kivik.Options) (*kivik.Rows, error)
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -339,7 +342,7 @@ func (s *Store) Get(k string) ([]byte, error) {
return nil, fmt.Errorf(failureWhileScanningRow, err)
}

storedValue, err := getValueFromRawDoc(rawDoc, payloadFieldKey)
storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf("failed to get payload from raw document: %w", err)
}
Expand Down Expand Up @@ -377,7 +380,21 @@ func (s *Store) GetTags(k string) ([]newstorage.Tag, error) {
// GetBulk fetches the values associated with the given keys.
// If a key doesn't exist, then a nil []byte is returned for that value. It is not considered an error.
func (s *Store) GetBulk(keys ...string) ([][]byte, error) {
return nil, errors.New("not implemented")
if keys == nil {
return nil, errors.New("keys string slice cannot be nil")
}

rawDocs, err := s.getRawDocs(keys)
if err != nil {
return nil, fmt.Errorf("failure while getting raw CouchDB documents: %w", err)
}

values, err := getPayloadsFromRawDocs(rawDocs)
if err != nil {
return nil, fmt.Errorf("failure while getting stored values from raw docs: %w", err)
}

return values, nil
}

// Query returns all data that satisfies the expression. Expression format: TagName:TagValue.
Expand Down Expand Up @@ -535,14 +552,39 @@ func (s *Store) getRevID(k string) (string, error) {
return "", err
}

revID, err := getValueFromRawDoc(rawDoc, revIDFieldKey)
revID, err := getStringValueFromRawDoc(rawDoc, revIDFieldKey)
if err != nil {
return "", fmt.Errorf("failed to get revision ID from the raw document: %w", err)
}

return revID, nil
}

// getRawDocs returns the raw documents from CouchDB using a bulk REST call.
// If a document is not found, then the raw document will be nil. It is not considered an error.
func (s *Store) getRawDocs(keys []string) ([]map[string]interface{}, error) {
bulkGetReferences := make([]kivik.BulkGetReference, len(keys))
for i, key := range keys {
bulkGetReferences[i].ID = key
}

rows, err := s.db.BulkGet(context.Background(), bulkGetReferences)
if err != nil {
return nil, fmt.Errorf("failure while sending request to CouchDB bulk docs endpoint: %w", err)
}

rawDocs, err := getRawDocsFromRows(rows)
if err != nil {
return nil, fmt.Errorf("failed to get raw documents from rows: %w", err)
}

if len(rawDocs) != len(keys) {
return nil, fmt.Errorf("received %d raw documents, but %d were expected", len(rawDocs), len(keys))
}

return rawDocs, nil
}

type couchDBResultsIterator struct {
store *Store
resultRows rows
Expand Down Expand Up @@ -733,15 +775,12 @@ func createIndexes(db *kivik.DB, tagNamesNeedIndexCreation []string) error {

func getQueryOptions(options []newstorage.QueryOption) newstorage.QueryOptions {
var queryOptions newstorage.QueryOptions
queryOptions.PageSize = 25

for _, option := range options {
option(&queryOptions)
}

if queryOptions.PageSize == 0 {
queryOptions.PageSize = 25
}

return queryOptions
}

Expand All @@ -753,7 +792,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return "", fmt.Errorf(failWhileScanResultRows, err)
}

value, err := getValueFromRawDoc(rawDoc, rawDocKey)
value, err := getStringValueFromRawDoc(rawDoc, rawDocKey)
if err != nil {
return "", fmt.Errorf(`failure while getting the value associated with the "%s" key`+
`from the raw document`, rawDocKey)
Expand All @@ -762,7 +801,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return value, nil
}

func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
func getStringValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
value, ok := rawDoc[rawDocKey]
if !ok {
return "", fmt.Errorf(`"%s" is missing from the raw document`, rawDocKey)
Expand All @@ -778,6 +817,70 @@ func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string
return valueString, nil
}

func getPayloadsFromRawDocs(rawDocs []map[string]interface{}) ([][]byte, error) {
storedValues := make([][]byte, len(rawDocs))

for i, rawDoc := range rawDocs {
// If the rawDoc is nil, this means that the value could not be found.
// It is not considered an error.
if rawDoc == nil {
storedValues[i] = nil

continue
}

// CouchDB still returns a raw document if the key has been deleted, so if this is a "deleted" raw document
// then we need to return nil to indicate that the value could not be found
isDeleted, containsIsDeleted := rawDoc[deletedFieldKey]
if containsIsDeleted {
isDeletedBool, ok := isDeleted.(bool)
if !ok {
return nil, errors.New("failed to assert the retrieved deleted field value as a bool")
}

if isDeletedBool {
storedValues[i] = nil

continue
}
}

storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf(`failed to get the payload from the raw document: %w`, err)
}

storedValues[i] = []byte(storedValue)
}

return storedValues, nil
}

func getRawDocsFromRows(rows rows) ([]map[string]interface{}, error) {
moreDocumentsToRead := rows.Next()

var rawDocs []map[string]interface{}

for moreDocumentsToRead {
var rawDoc map[string]interface{}
err := rows.ScanDoc(&rawDoc)
// For the regular Get method, Kivik actually returns a different error message if a document was deleted.
// When doing a bulk get, however, Kivik doesn't return an error message, and we have to check the "_deleted"
// field in the raw doc later. This is done in the getPayloadsFromRawDocs method.
// If the document wasn't found, we allow the nil raw doc to be appended since we don't consider it to be
// an error.
if err != nil && !strings.Contains(err.Error(), bulkGetDocNotFoundErrMsgFromKivik) {
return nil, fmt.Errorf(failWhileScanResultRows, err)
}

rawDocs = append(rawDocs, rawDoc)

moreDocumentsToRead = rows.Next()
}

return rawDocs, nil
}

func getTagsFromRawDoc(rawDoc map[string]interface{}) ([]newstorage.Tag, error) {
var tags []newstorage.Tag

Expand Down
48 changes: 47 additions & 1 deletion component/newstorage/couchdb/store_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type mockDB struct {
errPut error
getRowBodyData string
errGetRow error
errBulkGet error
}

func (m *mockDB) Get(context.Context, string, ...kivik.Options) *kivik.Row {
Expand All @@ -42,17 +43,22 @@ func (m *mockDB) Delete(context.Context, string, string, ...kivik.Options) (stri
return "", errors.New("mockDB Delete always fails")
}

func (m *mockDB) BulkGet(context.Context, []kivik.BulkGetReference, ...kivik.Options) (*kivik.Rows, error) {
return &kivik.Rows{}, m.errBulkGet
}

func (m *mockDB) Close(context.Context) error {
return errors.New("mockDB Close always fails")
}

type mockRows struct {
err error
errClose error
next bool
}

func (m *mockRows) Next() bool {
return false
return m.next
}

func (m *mockRows) Err() error {
Expand Down Expand Up @@ -174,6 +180,17 @@ func TestStore_Get_Internal(t *testing.T) {
})
}

func TestStore_GetBulk_Internal(t *testing.T) {
t.Run("Failure while getting raw CouchDB documents", func(t *testing.T) {
store := &Store{db: &mockDB{errBulkGet: errors.New("mockDB BulkGet always fails")}}

values, err := store.GetBulk("key")
require.EqualError(t, err, "failure while getting raw CouchDB documents: "+
"failure while sending request to CouchDB bulk docs endpoint: mockDB BulkGet always fails")
require.Nil(t, values)
})
}

func TestStore_Query_Internal(t *testing.T) {
t.Run("Failure sending tag name only query to find endpoint", func(t *testing.T) {
store := &Store{db: &mockDB{}}
Expand Down Expand Up @@ -217,6 +234,35 @@ func TestStore_Delete_Internal(t *testing.T) {
})
}

func TestGetRawDocsFromRows(t *testing.T) {
t.Run("Failure while scanning result rows", func(t *testing.T) {
rawDocs, err := getRawDocsFromRows(&mockRows{next: true})
require.EqualError(t, err, "failure while scanning result rows: mockRows ScanDoc always fails")
require.Nil(t, rawDocs)
})
}

func TestPayloadsFromRawDocs(t *testing.T) {
t.Run("Failed to assert deleted field value as a bool", func(t *testing.T) {
rawDocs := make([]map[string]interface{}, 1)
rawDocs[0] = make(map[string]interface{})
rawDocs[0][deletedFieldKey] = "Not a bool"

payloads, err := getPayloadsFromRawDocs(rawDocs)
require.EqualError(t, err, "failed to assert the retrieved deleted field value as a bool")
require.Nil(t, payloads)
})
t.Run("Failed to get the payload from the raw document", func(t *testing.T) {
rawDocs := make([]map[string]interface{}, 1)
rawDocs[0] = make(map[string]interface{})

payloads, err := getPayloadsFromRawDocs(rawDocs)
require.EqualError(t, err,
`failed to get the payload from the raw document: "payload" is missing from the raw document`)
require.Nil(t, payloads)
})
}

func TestCouchDBResultsIterator_Next_Internal(t *testing.T) {
t.Run("Error returned from result rows", func(t *testing.T) {
iterator := &couchDBResultsIterator{
Expand Down
16 changes: 12 additions & 4 deletions component/newstorage/couchdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,18 @@ func TestStore_GetTags(t *testing.T) {
}

func TestStore_GetBulk(t *testing.T) {
t.Run("Failure - not implemented", func(t *testing.T) {
store := &Store{}
_, err := store.GetBulk()
require.EqualError(t, err, "not implemented")
t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) {
provider, err := NewProvider(couchDBURL, WithDBPrefix("prefix"))
require.NoError(t, err)
require.NotNil(t, provider)

store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

values, err := store.GetBulk(nil...)
require.EqualError(t, err, "keys string slice cannot be nil")
require.Nil(t, values)
})
}

Expand Down
Loading

0 comments on commit 91dd4c2

Please sign in to comment.