diff --git a/component/newstorage/couchdb/store.go b/component/newstorage/couchdb/store.go index 6f7f4d91..099a1420 100644 --- a/component/newstorage/couchdb/store.go +++ b/component/newstorage/couchdb/store.go @@ -25,12 +25,14 @@ import ( //nolint:gci // False positive, seemingly caused by the CouchDB driver const ( couchDBUsersTable = "_users" revIDFieldKey = "_rev" + deletedFieldKey = "_deleted" designDocumentName = "AriesStorageDesignDocument" payloadFieldKey = "payload" // Hardcoded strings returned from Kivik/CouchDB that we check for. docNotFoundErrMsgFromKivik = "Not Found: missing" + bulkGetDocNotFoundErrMsgFromKivik = "not_found: missing" docDeletedErrMsgFromKivik = "Not Found: deleted" databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist." documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict." @@ -61,6 +63,7 @@ type db interface { Put(ctx context.Context, docID string, doc interface{}, options ...kivik.Options) (rev string, err error) Find(ctx context.Context, query interface{}, options ...kivik.Options) (*kivik.Rows, error) Delete(ctx context.Context, docID, rev string, options ...kivik.Options) (newRev string, err error) + BulkGet(ctx context.Context, docs []kivik.BulkGetReference, options ...kivik.Options) (*kivik.Rows, error) Close(ctx context.Context) error } @@ -339,7 +342,7 @@ func (s *Store) Get(k string) ([]byte, error) { return nil, fmt.Errorf(failureWhileScanningRow, err) } - storedValue, err := getValueFromRawDoc(rawDoc, payloadFieldKey) + storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey) if err != nil { return nil, fmt.Errorf("failed to get payload from raw document: %w", err) } @@ -377,7 +380,21 @@ func (s *Store) GetTags(k string) ([]newstorage.Tag, error) { // GetBulk fetches the values associated with the given keys. // If a key doesn't exist, then a nil []byte is returned for that value. It is not considered an error. func (s *Store) GetBulk(keys ...string) ([][]byte, error) { - return nil, errors.New("not implemented") + if keys == nil { + return nil, errors.New("keys string slice cannot be nil") + } + + rawDocs, err := s.getRawDocs(keys) + if err != nil { + return nil, fmt.Errorf("failure while getting raw CouchDB documents: %w", err) + } + + values, err := getPayloadsFromRawDocs(rawDocs) + if err != nil { + return nil, fmt.Errorf("failure while getting stored values from raw docs: %w", err) + } + + return values, nil } // Query returns all data that satisfies the expression. Expression format: TagName:TagValue. @@ -535,7 +552,7 @@ func (s *Store) getRevID(k string) (string, error) { return "", err } - revID, err := getValueFromRawDoc(rawDoc, revIDFieldKey) + revID, err := getStringValueFromRawDoc(rawDoc, revIDFieldKey) if err != nil { return "", fmt.Errorf("failed to get revision ID from the raw document: %w", err) } @@ -543,6 +560,31 @@ func (s *Store) getRevID(k string) (string, error) { return revID, nil } +// getRawDocs returns the raw documents from CouchDB using a bulk REST call. +// If a document is not found, then the raw document will be nil. It is not considered an error. +func (s *Store) getRawDocs(keys []string) ([]map[string]interface{}, error) { + bulkGetReferences := make([]kivik.BulkGetReference, len(keys)) + for i, key := range keys { + bulkGetReferences[i].ID = key + } + + rows, err := s.db.BulkGet(context.Background(), bulkGetReferences) + if err != nil { + return nil, fmt.Errorf("failure while sending request to CouchDB bulk docs endpoint: %w", err) + } + + rawDocs, err := getRawDocsFromRows(rows) + if err != nil { + return nil, fmt.Errorf("failed to get raw documents from rows: %w", err) + } + + if len(rawDocs) != len(keys) { + return nil, fmt.Errorf("received %d raw documents, but %d were expected", len(rawDocs), len(keys)) + } + + return rawDocs, nil +} + type couchDBResultsIterator struct { store *Store resultRows rows @@ -733,15 +775,12 @@ func createIndexes(db *kivik.DB, tagNamesNeedIndexCreation []string) error { func getQueryOptions(options []newstorage.QueryOption) newstorage.QueryOptions { var queryOptions newstorage.QueryOptions + queryOptions.PageSize = 25 for _, option := range options { option(&queryOptions) } - if queryOptions.PageSize == 0 { - queryOptions.PageSize = 25 - } - return queryOptions } @@ -753,7 +792,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) { return "", fmt.Errorf(failWhileScanResultRows, err) } - value, err := getValueFromRawDoc(rawDoc, rawDocKey) + value, err := getStringValueFromRawDoc(rawDoc, rawDocKey) if err != nil { return "", fmt.Errorf(`failure while getting the value associated with the "%s" key`+ `from the raw document`, rawDocKey) @@ -762,7 +801,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) { return value, nil } -func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) { +func getStringValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) { value, ok := rawDoc[rawDocKey] if !ok { return "", fmt.Errorf(`"%s" is missing from the raw document`, rawDocKey) @@ -778,6 +817,70 @@ func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string return valueString, nil } +func getPayloadsFromRawDocs(rawDocs []map[string]interface{}) ([][]byte, error) { + storedValues := make([][]byte, len(rawDocs)) + + for i, rawDoc := range rawDocs { + // If the rawDoc is nil, this means that the value could not be found. + // It is not considered an error. + if rawDoc == nil { + storedValues[i] = nil + + continue + } + + // CouchDB still returns a raw document if the key has been deleted, so if this is a "deleted" raw document + // then we need to return nil to indicate that the value could not be found + isDeleted, containsIsDeleted := rawDoc[deletedFieldKey] + if containsIsDeleted { + isDeletedBool, ok := isDeleted.(bool) + if !ok { + return nil, errors.New("failed to assert the retrieved deleted field value as a bool") + } + + if isDeletedBool { + storedValues[i] = nil + + continue + } + } + + storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey) + if err != nil { + return nil, fmt.Errorf(`failed to get the payload from the raw document: %w`, err) + } + + storedValues[i] = []byte(storedValue) + } + + return storedValues, nil +} + +func getRawDocsFromRows(rows rows) ([]map[string]interface{}, error) { + moreDocumentsToRead := rows.Next() + + var rawDocs []map[string]interface{} + + for moreDocumentsToRead { + var rawDoc map[string]interface{} + err := rows.ScanDoc(&rawDoc) + // For the regular Get method, Kivik actually returns a different error message if a document was deleted. + // When doing a bulk get, however, Kivik doesn't return an error message, and we have to check the "_deleted" + // field in the raw doc later. This is done in the getPayloadsFromRawDocs method. + // If the document wasn't found, we allow the nil raw doc to be appended since we don't consider it to be + // an error. + if err != nil && !strings.Contains(err.Error(), bulkGetDocNotFoundErrMsgFromKivik) { + return nil, fmt.Errorf(failWhileScanResultRows, err) + } + + rawDocs = append(rawDocs, rawDoc) + + moreDocumentsToRead = rows.Next() + } + + return rawDocs, nil +} + func getTagsFromRawDoc(rawDoc map[string]interface{}) ([]newstorage.Tag, error) { var tags []newstorage.Tag diff --git a/component/newstorage/couchdb/store_internal_test.go b/component/newstorage/couchdb/store_internal_test.go index 0af8fbca..2592be70 100644 --- a/component/newstorage/couchdb/store_internal_test.go +++ b/component/newstorage/couchdb/store_internal_test.go @@ -21,6 +21,7 @@ type mockDB struct { errPut error getRowBodyData string errGetRow error + errBulkGet error } func (m *mockDB) Get(context.Context, string, ...kivik.Options) *kivik.Row { @@ -42,6 +43,10 @@ func (m *mockDB) Delete(context.Context, string, string, ...kivik.Options) (stri return "", errors.New("mockDB Delete always fails") } +func (m *mockDB) BulkGet(context.Context, []kivik.BulkGetReference, ...kivik.Options) (*kivik.Rows, error) { + return &kivik.Rows{}, m.errBulkGet +} + func (m *mockDB) Close(context.Context) error { return errors.New("mockDB Close always fails") } @@ -49,10 +54,11 @@ func (m *mockDB) Close(context.Context) error { type mockRows struct { err error errClose error + next bool } func (m *mockRows) Next() bool { - return false + return m.next } func (m *mockRows) Err() error { @@ -174,6 +180,17 @@ func TestStore_Get_Internal(t *testing.T) { }) } +func TestStore_GetBulk_Internal(t *testing.T) { + t.Run("Failure while getting raw CouchDB documents", func(t *testing.T) { + store := &Store{db: &mockDB{errBulkGet: errors.New("mockDB BulkGet always fails")}} + + values, err := store.GetBulk("key") + require.EqualError(t, err, "failure while getting raw CouchDB documents: "+ + "failure while sending request to CouchDB bulk docs endpoint: mockDB BulkGet always fails") + require.Nil(t, values) + }) +} + func TestStore_Query_Internal(t *testing.T) { t.Run("Failure sending tag name only query to find endpoint", func(t *testing.T) { store := &Store{db: &mockDB{}} @@ -217,6 +234,35 @@ func TestStore_Delete_Internal(t *testing.T) { }) } +func TestGetRawDocsFromRows(t *testing.T) { + t.Run("Failure while scanning result rows", func(t *testing.T) { + rawDocs, err := getRawDocsFromRows(&mockRows{next: true}) + require.EqualError(t, err, "failure while scanning result rows: mockRows ScanDoc always fails") + require.Nil(t, rawDocs) + }) +} + +func TestPayloadsFromRawDocs(t *testing.T) { + t.Run("Failed to assert deleted field value as a bool", func(t *testing.T) { + rawDocs := make([]map[string]interface{}, 1) + rawDocs[0] = make(map[string]interface{}) + rawDocs[0][deletedFieldKey] = "Not a bool" + + payloads, err := getPayloadsFromRawDocs(rawDocs) + require.EqualError(t, err, "failed to assert the retrieved deleted field value as a bool") + require.Nil(t, payloads) + }) + t.Run("Failed to get the payload from the raw document", func(t *testing.T) { + rawDocs := make([]map[string]interface{}, 1) + rawDocs[0] = make(map[string]interface{}) + + payloads, err := getPayloadsFromRawDocs(rawDocs) + require.EqualError(t, err, + `failed to get the payload from the raw document: "payload" is missing from the raw document`) + require.Nil(t, payloads) + }) +} + func TestCouchDBResultsIterator_Next_Internal(t *testing.T) { t.Run("Error returned from result rows", func(t *testing.T) { iterator := &couchDBResultsIterator{ diff --git a/component/newstorage/couchdb/store_test.go b/component/newstorage/couchdb/store_test.go index 2fd5eec4..f827bac3 100644 --- a/component/newstorage/couchdb/store_test.go +++ b/component/newstorage/couchdb/store_test.go @@ -375,10 +375,18 @@ func TestStore_GetTags(t *testing.T) { } func TestStore_GetBulk(t *testing.T) { - t.Run("Failure - not implemented", func(t *testing.T) { - store := &Store{} - _, err := store.GetBulk() - require.EqualError(t, err, "not implemented") + t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) { + provider, err := NewProvider(couchDBURL, WithDBPrefix("prefix")) + require.NoError(t, err) + require.NotNil(t, provider) + + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + values, err := store.GetBulk(nil...) + require.EqualError(t, err, "keys string slice cannot be nil") + require.Nil(t, values) }) } diff --git a/test/component/newstorage/newstore.go b/test/component/newstorage/newstore.go index 78a6aa5a..46fb5065 100644 --- a/test/component/newstorage/newstore.go +++ b/test/component/newstorage/newstore.go @@ -23,6 +23,9 @@ func TestAll(t *testing.T, provider newstorage.Provider) { t.Run("Store Put and Get", func(t *testing.T) { TestPutGet(t, provider) }) + t.Run("Store GetBulk and Get", func(t *testing.T) { + TestGetBulk(t, provider) + }) t.Run("Delete", func(t *testing.T) { TestDelete(t, provider) }) @@ -86,6 +89,108 @@ func TestPutGet(t *testing.T, provider newstorage.Provider) { tryNilOrBlankValues(t, store1, data, commonKey) } +// TestGetBulk tests common GetBulk functionality. +func TestGetBulk(t *testing.T, provider newstorage.Provider) { //nolint: funlen // Test file + t.Run("Success: all values found", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + err = store.Put("key2", []byte("value2"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key1", "key2") + require.NoError(t, err) + require.Len(t, values, 2) + require.Equal(t, "value1", string(values[0])) + require.Equal(t, "value2", string(values[1])) + }) + t.Run("Success: one value found, one not", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key1", "key2") + require.NoError(t, err) + require.Len(t, values, 2) + require.Equal(t, "value1", string(values[0])) + require.Nil(t, values[1]) + }) + t.Run("Success: one value found, one not because it was deleted", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + err = store.Put("key2", []byte("value2"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + err = store.Delete("key2") + require.NoError(t, err) + + values, err := store.GetBulk("key1", "key2") + require.NoError(t, err) + require.Len(t, values, 2) + require.Equal(t, "value1", string(values[0])) + require.Nil(t, values[1]) + }) + t.Run("Success: no values found", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key3", "key4") + require.NoError(t, err) + require.Len(t, values, 2) + require.Nil(t, values[0]) + require.Nil(t, values[1]) + }) + t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + values, err := store.GetBulk(nil...) + require.EqualError(t, err, "keys string slice cannot be nil") + require.Nil(t, values) + }) +} + // TestDelete tests common Delete functionality. func TestDelete(t *testing.T, provider newstorage.Provider) { t.Helper()