From fa5e6c89159004ffa3337742d35d4e3f0b9efac7 Mon Sep 17 00:00:00 2001 From: Derek Trider Date: Thu, 14 Jan 2021 15:47:45 -0500 Subject: [PATCH] feat: GetBulk method for new CouchDB storage implementation Signed-off-by: Derek Trider --- component/newstorage/couchdb/store.go | 114 +++++++++++++++++- .../newstorage/couchdb/store_internal_test.go | 15 +++ component/newstorage/couchdb/store_test.go | 16 ++- test/component/newstorage/newstore.go | 77 ++++++++++++ 4 files changed, 213 insertions(+), 9 deletions(-) diff --git a/component/newstorage/couchdb/store.go b/component/newstorage/couchdb/store.go index 6f7f4d91..ed7a62da 100644 --- a/component/newstorage/couchdb/store.go +++ b/component/newstorage/couchdb/store.go @@ -31,6 +31,7 @@ const ( // Hardcoded strings returned from Kivik/CouchDB that we check for. docNotFoundErrMsgFromKivik = "Not Found: missing" + bulkGetDocNotFoundErrMsgFromKivik = "not_found: missing" docDeletedErrMsgFromKivik = "Not Found: deleted" databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist." documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict." @@ -61,6 +62,7 @@ type db interface { Put(ctx context.Context, docID string, doc interface{}, options ...kivik.Options) (rev string, err error) Find(ctx context.Context, query interface{}, options ...kivik.Options) (*kivik.Rows, error) Delete(ctx context.Context, docID, rev string, options ...kivik.Options) (newRev string, err error) + BulkGet(ctx context.Context, docs []kivik.BulkGetReference, options ...kivik.Options) (*kivik.Rows, error) Close(ctx context.Context) error } @@ -339,7 +341,7 @@ func (s *Store) Get(k string) ([]byte, error) { return nil, fmt.Errorf(failureWhileScanningRow, err) } - storedValue, err := getValueFromRawDoc(rawDoc, payloadFieldKey) + storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey) if err != nil { return nil, fmt.Errorf("failed to get payload from raw document: %w", err) } @@ -377,7 +379,21 @@ func (s *Store) GetTags(k string) ([]newstorage.Tag, error) { // GetBulk fetches the values associated with the given keys. // If a key doesn't exist, then a nil []byte is returned for that value. It is not considered an error. func (s *Store) GetBulk(keys ...string) ([][]byte, error) { - return nil, errors.New("not implemented") + if keys == nil { + return nil, errors.New("keys string slice cannot be nil") + } + + rawDocs, err := s.getRawDocs(keys) + if err != nil { + return nil, fmt.Errorf("failure while getting raw CouchDB documents: %w", err) + } + + values, err := s.getStoredValuesFromRawDocs(rawDocs, keys) + if err != nil { + return nil, fmt.Errorf("failure while getting stored values from raw docs: %w", err) + } + + return values, nil } // Query returns all data that satisfies the expression. Expression format: TagName:TagValue. @@ -535,7 +551,7 @@ func (s *Store) getRevID(k string) (string, error) { return "", err } - revID, err := getValueFromRawDoc(rawDoc, revIDFieldKey) + revID, err := getStringValueFromRawDoc(rawDoc, revIDFieldKey) if err != nil { return "", fmt.Errorf("failed to get revision ID from the raw document: %w", err) } @@ -543,6 +559,94 @@ func (s *Store) getRevID(k string) (string, error) { return revID, nil } +// getRawDocs returns the raw documents from CouchDB using a bulk REST call. +// If a document is not found, then the raw document will be nil. It is not considered an error. +func (s *Store) getRawDocs(keys []string) ([]map[string]interface{}, error) { + rawDocs := make([]map[string]interface{}, len(keys)) + + bulkGetReferences := make([]kivik.BulkGetReference, len(keys)) + + for i, key := range keys { + bulkGetReferences[i].ID = key + } + + rows, err := s.db.BulkGet(context.Background(), bulkGetReferences) + if err != nil { + return nil, fmt.Errorf("failure while sending request to CouchDB bulk docs endpoint: %w", err) + } + + ok := rows.Next() + + if !ok { + return nil, errors.New("bulk get from CouchDB was unexpectedly empty") + } + + for i := 0; i < len(rawDocs); i++ { + err := rows.ScanDoc(&rawDocs[i]) + // In the getRawDoc method, Kivik actually returns a different error message if a document was deleted. + // When doing a bulk get, instead Kivik doesn't return an error message, and we have to check the "_deleted" + // field in the raw doc. This is done in the getRevIDs method. + // If the document wasn't found, we return a nil raw doc since we don't consider it to be an error. + if err != nil && !strings.Contains(err.Error(), bulkGetDocNotFoundErrMsgFromKivik) { + return nil, fmt.Errorf(failWhileScanResultRows, err) + } + + ok := rows.Next() + + // ok is expected to be false on the last doc. + if i < len(rawDocs)-1 { + if !ok { + return nil, errors.New("got fewer docs from CouchDB than expected") + } + } else { + if ok { + return nil, errors.New("got more docs from CouchDB than expected") + } + } + } + + return rawDocs, nil +} + +func (s *Store) getStoredValuesFromRawDocs(rawDocs []map[string]interface{}, keys []string) ([][]byte, error) { + storedValues := make([][]byte, len(keys)) + + for i, rawDoc := range rawDocs { + // If the rawDoc is nil, this means that the value could not be found. + // It is not considered an error. + if rawDoc == nil { + storedValues[i] = nil + + continue + } + + // CouchDB still returns a raw document if the key has been deleted, so if this is a "deleted" raw document + // then we need to return nil to indicate that the value could not be found + isDeleted, containsIsDeleted := rawDoc["_deleted"] + if containsIsDeleted { + isDeletedBool, ok := isDeleted.(bool) + if !ok { + return nil, errors.New("failed to assert the retrieved deleted field as a bool") + } + + if isDeletedBool { + storedValues[i] = nil + + continue + } + } + + storedValue, err := getStringValueFromRawDoc(rawDoc, payloadFieldKey) + if err != nil { + return nil, fmt.Errorf(`failed to get the payload from the raw document: %w`, err) + } + + storedValues[i] = []byte(storedValue) + } + + return storedValues, nil +} + type couchDBResultsIterator struct { store *Store resultRows rows @@ -753,7 +857,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) { return "", fmt.Errorf(failWhileScanResultRows, err) } - value, err := getValueFromRawDoc(rawDoc, rawDocKey) + value, err := getStringValueFromRawDoc(rawDoc, rawDocKey) if err != nil { return "", fmt.Errorf(`failure while getting the value associated with the "%s" key`+ `from the raw document`, rawDocKey) @@ -762,7 +866,7 @@ func getValueFromRows(rows rows, rawDocKey string) (string, error) { return value, nil } -func getValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) { +func getStringValueFromRawDoc(rawDoc map[string]interface{}, rawDocKey string) (string, error) { value, ok := rawDoc[rawDocKey] if !ok { return "", fmt.Errorf(`"%s" is missing from the raw document`, rawDocKey) diff --git a/component/newstorage/couchdb/store_internal_test.go b/component/newstorage/couchdb/store_internal_test.go index 0af8fbca..da7938f8 100644 --- a/component/newstorage/couchdb/store_internal_test.go +++ b/component/newstorage/couchdb/store_internal_test.go @@ -42,6 +42,10 @@ func (m *mockDB) Delete(context.Context, string, string, ...kivik.Options) (stri return "", errors.New("mockDB Delete always fails") } +func (m *mockDB) BulkGet(context.Context, []kivik.BulkGetReference, ...kivik.Options) (*kivik.Rows, error) { + return nil, errors.New("mockDB BulkGet always fails") +} + func (m *mockDB) Close(context.Context) error { return errors.New("mockDB Close always fails") } @@ -174,6 +178,17 @@ func TestStore_Get_Internal(t *testing.T) { }) } +func TestStore_GetBulk_Internal(t *testing.T) { + t.Run("Failure while getting raw CouchDB documents", func(t *testing.T) { + store := &Store{db: &mockDB{}} + + values, err := store.GetBulk("key") + require.EqualError(t, err, "failure while getting raw CouchDB documents: "+ + "failure while sending request to CouchDB bulk docs endpoint: mockDB BulkGet always fails") + require.Nil(t, values) + }) +} + func TestStore_Query_Internal(t *testing.T) { t.Run("Failure sending tag name only query to find endpoint", func(t *testing.T) { store := &Store{db: &mockDB{}} diff --git a/component/newstorage/couchdb/store_test.go b/component/newstorage/couchdb/store_test.go index 2fd5eec4..f827bac3 100644 --- a/component/newstorage/couchdb/store_test.go +++ b/component/newstorage/couchdb/store_test.go @@ -375,10 +375,18 @@ func TestStore_GetTags(t *testing.T) { } func TestStore_GetBulk(t *testing.T) { - t.Run("Failure - not implemented", func(t *testing.T) { - store := &Store{} - _, err := store.GetBulk() - require.EqualError(t, err, "not implemented") + t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) { + provider, err := NewProvider(couchDBURL, WithDBPrefix("prefix")) + require.NoError(t, err) + require.NotNil(t, provider) + + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + values, err := store.GetBulk(nil...) + require.EqualError(t, err, "keys string slice cannot be nil") + require.Nil(t, values) }) } diff --git a/test/component/newstorage/newstore.go b/test/component/newstorage/newstore.go index 78a6aa5a..e48bc491 100644 --- a/test/component/newstorage/newstore.go +++ b/test/component/newstorage/newstore.go @@ -23,6 +23,9 @@ func TestAll(t *testing.T, provider newstorage.Provider) { t.Run("Store Put and Get", func(t *testing.T) { TestPutGet(t, provider) }) + t.Run("Store GetBulk and Get", func(t *testing.T) { + TestGetBulk(t, provider) + }) t.Run("Delete", func(t *testing.T) { TestDelete(t, provider) }) @@ -86,6 +89,80 @@ func TestPutGet(t *testing.T, provider newstorage.Provider) { tryNilOrBlankValues(t, store1, data, commonKey) } +// TestGetBulk tests common GetBulk functionality. +func TestGetBulk(t *testing.T, provider newstorage.Provider) { //nolint: funlen // Test file + t.Run("Success: all values found", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + err = store.Put("key2", []byte("value2"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key1", "key2") + require.NoError(t, err) + require.Len(t, values, 2) + require.Equal(t, "value1", string(values[0])) + require.Equal(t, "value2", string(values[1])) + }) + t.Run("Success: one value found, one not", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key1", "key2") + require.NoError(t, err) + require.Len(t, values, 2) + require.Equal(t, "value1", string(values[0])) + require.Nil(t, values[1]) + }) + t.Run("Success: no values found", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + err = store.Put("key1", []byte("value1"), + []newstorage.Tag{ + {Name: "tagName1", Value: "tagValue1"}, + {Name: "tagName2", Value: "tagValue2"}, + }...) + require.NoError(t, err) + + values, err := store.GetBulk("key3", "key4") + require.NoError(t, err) + require.Len(t, values, 2) + require.Nil(t, values[0]) + require.Nil(t, values[1]) + }) + t.Run("Failure: keys string slice cannot be nil", func(t *testing.T) { + store, err := provider.OpenStore(randomStoreName()) + require.NoError(t, err) + require.NotNil(t, store) + + values, err := store.GetBulk(nil...) + require.EqualError(t, err, "keys string slice cannot be nil") + require.Nil(t, values) + }) +} + // TestDelete tests common Delete functionality. func TestDelete(t *testing.T, provider newstorage.Provider) { t.Helper()