forked from FeatureBaseDB/featurebase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathholder_internal_test.go
98 lines (83 loc) · 2.22 KB
/
holder_internal_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Copyright 2022 Molecula Corp. (DBA FeatureBase).
// SPDX-License-Identifier: Apache-2.0
package pilosa
import (
"testing"
)
func setupTest(t *testing.T, h *Holder, rowCol []rowCols, indexName string) (*Index, *Field) {
idx, err := h.CreateIndexIfNotExists(indexName, "", IndexOptions{TrackExistence: true})
if err != nil {
t.Fatalf("failed to create index %v: %v", indexName, err)
}
f, err := idx.CreateFieldIfNotExists("f", "")
if err != nil {
t.Fatalf("failed to create field in index %v: %v", indexName, err)
}
existencefield := idx.existenceFld
qcx := h.Txf().NewWritableQcx()
defer qcx.Abort()
for _, r := range rowCol {
_, err = f.SetBit(qcx, r.row, r.col, nil)
if err != nil {
t.Fatalf("failed to set bit in index %v: %v", indexName, err)
}
_, err = existencefield.SetBit(qcx, r.row, r.col, nil)
if err != nil {
t.Fatalf("failed to set bit in index %v: %v", indexName, err)
}
}
if err = qcx.Finish(); err != nil {
t.Fatalf("failed to commit tx for index %v: %v", indexName, err)
}
shardsFound := idx.AvailableShards(includeRemote).Slice()
if len(shardsFound) != 3 {
t.Fatalf("expected 3 shards for index %v, got %v", indexName, len(shardsFound))
}
return idx, f
}
type rowCols struct {
row uint64
col uint64
}
func TestHolder_ProcessDeleteInflight(t *testing.T) {
h := newTestHolder(t)
rowCol := []rowCols{
{1, 1},
{1, 2},
{10, ShardWidth + 1},
{1, ShardWidth * 2},
}
idx1, f1 := setupTest(t, h, rowCol, "idxdelete1")
idx2, f2 := setupTest(t, h, rowCol, "idxdelete2")
err := h.processDeleteInflight()
if err != nil {
t.Fatalf("failed to delete: %v", err)
}
tests := []struct {
idx *Index
f *Field
}{
{idx1, f1},
{idx2, f2},
}
for _, test := range tests {
func() {
idx, f := test.idx, test.f
qcx := h.Txf().NewQcx()
defer qcx.Abort()
for _, r := range rowCol {
row, err := f.Row(qcx, r.row)
if err != nil {
t.Fatalf("failed to get row: %v", err)
}
existenceRow, err := idx.existenceFld.Row(qcx, r.row)
if err != nil {
t.Fatalf("failed to get row: %v", err)
}
if len(row.Columns()) != 0 || len(existenceRow.Columns()) != 0 {
t.Fatalf("expected columns for fields to be empty after delete")
}
}
}()
}
}