Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

feat: GetBulk method for new CouchDB storage implementation #49

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 112 additions & 9 deletions component/newstorage/couchdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import ( //nolint:gci // False positive, seemingly caused by the CouchDB driver
const (
couchDBUsersTable = "_users"
revIDFieldKey = "_rev"
deletedFieldKey = "_deleted"

designDocumentName = "AriesStorageDesignDocument"
payloadFieldKey = "payload"

// Hardcoded strings returned from Kivik/CouchDB that we check for.
docNotFoundErrMsgFromKivik = "Not Found: missing"
bulkGetDocNotFoundErrMsgFromKivik = "not_found: missing"
docDeletedErrMsgFromKivik = "Not Found: deleted"
databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist."
documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict."
Expand Down Expand Up @@ -61,6 +63,7 @@ type db interface {
Put(ctx context.Context, docID string, doc interface{}, options ...kivik.Options) (rev string, err error)
Find(ctx context.Context, query interface{}, options ...kivik.Options) (*kivik.Rows, error)
Delete(ctx context.Context, docID, rev string, options ...kivik.Options) (newRev string, err error)
BulkGet(ctx context.Context, docs []kivik.BulkGetReference, options ...kivik.Options) (*kivik.Rows, error)
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -339,7 +342,7 @@ func (s *Store) Get(k string) ([]byte, error) {
return nil, fmt.Errorf(failureWhileScanningRow, err)
}

storedValue, err := getValueFromRawDoc(rawDoc, payloadFieldKey)
storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf("failed to get payload from raw document: %w", err)
}
Expand Down Expand Up @@ -377,7 +380,21 @@ func (s *Store) GetTags(k string) ([]newstorage.Tag, error) {
// GetBulk fetches the values associated with the given keys.
// If a key doesn't exist, then a nil []byte is returned for that value. It is not considered an error.
func (s *Store) GetBulk(keys ...string) ([][]byte, error) {
return nil, errors.New("not implemented")
if keys == nil {
return nil, errors.New("keys string slice cannot be nil")
}

rawDocs, err := s.getRawDocs(keys)
if err != nil {
return nil, fmt.Errorf("failure while getting raw CouchDB documents: %w", err)
}

values, err := getPayloadsFromRawDocs(rawDocs)
if err != nil {
return nil, fmt.Errorf("failure while getting stored values from raw docs: %w", err)
}

return values, nil
}

// Query returns all data that satisfies the expression. Expression format: TagName:TagValue.
Expand Down Expand Up @@ -535,14 +552,39 @@ func (s *Store) getRevID(k string) (string, error) {
return "", err
}

revID, err := getValueFromRawDoc(rawDoc, revIDFieldKey)
revID, err := getStringValueFromRawDoc(rawDoc, revIDFieldKey)
if err != nil {
return "", fmt.Errorf("failed to get revision ID from the raw document: %w", err)
}

return revID, nil
}

// getRawDocs returns the raw documents from CouchDB using a bulk REST call.
// If a document is not found, then the raw document will be nil. It is not considered an error.
func (s *Store) getRawDocs(keys []string) ([]map[string]interface{}, error) {
bulkGetReferences := make([]kivik.BulkGetReference, len(keys))
for i, key := range keys {
bulkGetReferences[i].ID = key
}

rows, err := s.db.BulkGet(context.Background(), bulkGetReferences)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder that contexts need to be passed-in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a followup issue earlier for it: #48

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal is to get the storage implementations done first so that I can update the aries-framework-go functionality to use the new storage interface. Then I'll go and make those improvements that can be done easily afterwards.

if err != nil {
return nil, fmt.Errorf("failure while sending request to CouchDB bulk docs endpoint: %w", err)
}

rawDocs, err := getRawDocsFromRows(rows)
if err != nil {
return nil, fmt.Errorf("failed to get raw documents from rows: %w", err)
}

if len(rawDocs) != len(keys) {
return nil, fmt.Errorf("received %d raw documents, but %d were expected", len(rawDocs), len(keys))
}

return rawDocs, nil
}

type couchDBResultsIterator struct {
store *Store
resultRows rows
Expand Down Expand Up @@ -733,15 +775,12 @@ func createIndexes(db *kivik.DB, tagNamesNeedIndexCreation []string) error {

func getQueryOptions(options []newstorage.QueryOption) newstorage.QueryOptions {
var queryOptions newstorage.QueryOptions
queryOptions.PageSize = 25

for _, option := range options {
option(&queryOptions)
}

if queryOptions.PageSize == 0 {
queryOptions.PageSize = 25
}

return queryOptions
}

Expand All @@ -753,7 +792,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return "", fmt.Errorf(failWhileScanResultRows, err)
}

value, err := getValueFromRawDoc(rawDoc, rawDocKey)
value, err := getStringValueFromRawDoc(rawDoc, rawDocKey)
if err != nil {
return "", fmt.Errorf(`failure while getting the value associated with the "%s" key`+
`from the raw document`, rawDocKey)
Expand All @@ -762,7 +801,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) {
return value, nil
}

func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
func getStringValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) {
value, ok := rawDoc[rawDocKey]
if !ok {
return "", fmt.Errorf(`"%s" is missing from the raw document`, rawDocKey)
Expand All @@ -778,6 +817,70 @@ func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string
return valueString, nil
}

func getPayloadsFromRawDocs(rawDocs []map[string]interface{}) ([][]byte, error) {
storedValues := make([][]byte, len(rawDocs))

for i, rawDoc := range rawDocs {
// If the rawDoc is nil, this means that the value could not be found.
// It is not considered an error.
if rawDoc == nil {
storedValues[i] = nil

continue
}

// CouchDB still returns a raw document if the key has been deleted, so if this is a "deleted" raw document
// then we need to return nil to indicate that the value could not be found
isDeleted, containsIsDeleted := rawDoc[deletedFieldKey]
if containsIsDeleted {
isDeletedBool, ok := isDeleted.(bool)
if !ok {
return nil, errors.New("failed to assert the retrieved deleted field value as a bool")
}

if isDeletedBool {
storedValues[i] = nil

continue
}
}

storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey)
if err != nil {
return nil, fmt.Errorf(`failed to get the payload from the raw document: %w`, err)
}

storedValues[i] = []byte(storedValue)
}

return storedValues, nil
}

func getRawDocsFromRows(rows rows) ([]map[string]interface{}, error) {
moreDocumentsToRead := rows.Next()

var rawDocs []map[string]interface{}

for moreDocumentsToRead {
var rawDoc map[string]interface{}
err := rows.ScanDoc(&rawDoc)
// For the regular Get method, Kivik actually returns a different error message if a document was deleted.
// When doing a bulk get, however, Kivik doesn't return an error message, and we have to check the "_deleted"
// field in the raw doc later. This is done in the getPayloadsFromRawDocs method.
// If the document wasn't found, we allow the nil raw doc to be appended since we don't consider it to be
// an error.
if err != nil && !strings.Contains(err.Error(), bulkGetDocNotFoundErrMsgFromKivik) {
return nil, fmt.Errorf(failWhileScanResultRows, err)
}

rawDocs = append(rawDocs, rawDoc)

moreDocumentsToRead = rows.Next()
}

return rawDocs, nil
}

func getTagsFromRawDoc(rawDoc map[string]interface{}) ([]newstorage.Tag, error) {
var tags []newstorage.Tag

Expand Down
48 changes: 47 additions & 1 deletion component/newstorage/couchdb/store_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type mockDB struct {
errPut error
getRowBodyData string
errGetRow error
errBulkGet error
}

func (m *mockDB) Get(context.Context, string, ...kivik.Options) *kivik.Row {
Expand All @@ -42,17 +43,22 @@ func (m *mockDB) Delete(context.Context, string, string, ...kivik.Options) (stri
return "", errors.New("mockDB Delete always fails")
}

func (m *mockDB) BulkGet(context.Context, []kivik.BulkGetReference, ...kivik.Options) (*kivik.Rows, error) {
return &kivik.Rows{}, m.errBulkGet
}

func (m *mockDB) Close(context.Context) error {
return errors.New("mockDB Close always fails")
}

type mockRows struct {
err error
errClose error
next bool
}

func (m *mockRows) Next() bool {
return false
return m.next
}

func (m *mockRows) Err() error {
Expand Down Expand Up @@ -174,6 +180,17 @@ func TestStore_Get_Internal(t *testing.T) {
})
}

func TestStore_GetBulk_Internal(t *testing.T) {
t.Run("Failure while getting raw CouchDB documents", func(t *testing.T) {
store := &Store{db: &mockDB{errBulkGet: errors.New("mockDB BulkGet always fails")}}

values, err := store.GetBulk("key")
require.EqualError(t, err, "failure while getting raw CouchDB documents: "+
"failure while sending request to CouchDB bulk docs endpoint: mockDB BulkGet always fails")
require.Nil(t, values)
})
}

func TestStore_Query_Internal(t *testing.T) {
t.Run("Failure sending tag name only query to find endpoint", func(t *testing.T) {
store := &Store{db: &mockDB{}}
Expand Down Expand Up @@ -217,6 +234,35 @@ func TestStore_Delete_Internal(t *testing.T) {
})
}

func TestGetRawDocsFromRows(t *testing.T) {
t.Run("Failure while scanning result rows", func(t *testing.T) {
rawDocs, err := getRawDocsFromRows(&mockRows{next: true})
require.EqualError(t, err, "failure while scanning result rows: mockRows ScanDoc always fails")
require.Nil(t, rawDocs)
})
}

func TestPayloadsFromRawDocs(t *testing.T) {
t.Run("Failed to assert deleted field value as a bool", func(t *testing.T) {
rawDocs := make([]map[string]interface{}, 1)
rawDocs[0] = make(map[string]interface{})
rawDocs[0][deletedFieldKey] = "Not a bool"

payloads, err := getPayloadsFromRawDocs(rawDocs)
require.EqualError(t, err, "failed to assert the retrieved deleted field value as a bool")
require.Nil(t, payloads)
})
t.Run("Failed to get the payload from the raw document", func(t *testing.T) {
rawDocs := make([]map[string]interface{}, 1)
rawDocs[0] = make(map[string]interface{})

payloads, err := getPayloadsFromRawDocs(rawDocs)
require.EqualError(t, err,
`failed to get the payload from the raw document: "payload" is missing from the raw document`)
require.Nil(t, payloads)
})
}

func TestCouchDBResultsIterator_Next_Internal(t *testing.T) {
t.Run("Error returned from result rows", func(t *testing.T) {
iterator := &couchDBResultsIterator{
Expand Down
16 changes: 12 additions & 4 deletions component/newstorage/couchdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,18 @@ func TestStore_GetTags(t *testing.T) {
}

func TestStore_GetBulk(t *testing.T) {
t.Run("Failure - not implemented", func(t *testing.T) {
store := &Store{}
_, err := store.GetBulk()
require.EqualError(t, err, "not implemented")
t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) {
provider, err := NewProvider(couchDBURL, WithDBPrefix("prefix"))
require.NoError(t, err)
require.NotNil(t, provider)

store, err := provider.OpenStore(randomStoreName())
require.NoError(t, err)
require.NotNil(t, store)

values, err := store.GetBulk(nil...)
require.EqualError(t, err, "keys string slice cannot be nil")
require.Nil(t, values)
})
}

Expand Down
Loading