Skip to content

Commit d426bcd

Browse files
authored
br/lightning: add kv writer for external backend (pingcap#46042)
ref pingcap#45719
1 parent 5a30540 commit d426bcd

File tree

7 files changed

+533
-55
lines changed

7 files changed

+533
-55
lines changed

br/pkg/lightning/backend/external/BUILD.bazel

+13-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,22 @@ go_library(
88
"file.go",
99
"iter.go",
1010
"kv_reader.go",
11-
"sharedisk.go",
1211
"stat_reader.go",
12+
"writer.go",
1313
],
1414
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
1515
visibility = ["//visibility:public"],
1616
deps = [
17+
"//br/pkg/lightning/backend",
18+
"//br/pkg/lightning/backend/encode",
19+
"//br/pkg/lightning/backend/kv",
20+
"//br/pkg/lightning/common",
21+
"//br/pkg/membuf",
1722
"//br/pkg/storage",
23+
"//kv",
1824
"//util/logutil",
1925
"//util/mathutil",
26+
"//util/size",
2027
"@com_github_pingcap_errors//:errors",
2128
"@org_golang_x_sync//errgroup",
2229
"@org_uber_go_zap//:zap",
@@ -31,15 +38,19 @@ go_test(
3138
"codec_test.go",
3239
"file_test.go",
3340
"iter_test.go",
41+
"writer_test.go",
3442
],
3543
embed = [":external"],
3644
flaky = True,
37-
shard_count = 13,
45+
shard_count = 15,
3846
deps = [
47+
"//br/pkg/lightning/backend/kv",
48+
"//br/pkg/lightning/common",
3949
"//br/pkg/storage",
4050
"@com_github_pingcap_errors//:errors",
4151
"@com_github_stretchr_testify//require",
4252
"@org_golang_x_exp//rand",
53+
"@org_golang_x_exp//slices",
4354
"@org_uber_go_atomic//:atomic",
4455
],
4556
)

br/pkg/lightning/backend/external/file.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
9595
s.rc.currProp.size += uint64(len(key) + len(value))
9696
s.rc.currProp.keys++
9797

98-
if s.rc.currProp.size >= s.rc.propSizeIdxDistance ||
99-
s.rc.currProp.keys >= s.rc.propKeysIdxDistance {
98+
if s.rc.currProp.size >= s.rc.propSizeDist ||
99+
s.rc.currProp.keys >= s.rc.propKeysDist {
100100
newProp := *s.rc.currProp
101101
s.rc.props = append(s.rc.props, &newProp)
102102

@@ -109,6 +109,14 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
109109
return nil
110110
}
111111

112+
// Close closes the KeyValueStore and append the last range property.
113+
func (s *KeyValueStore) Close() {
114+
if s.rc.currProp.keys > 0 {
115+
newProp := *s.rc.currProp
116+
s.rc.props = append(s.rc.props, &newProp)
117+
}
118+
}
119+
112120
var statSuffix = filepath.Join("_stat", "0")
113121

114122
// GetAllFileNames returns a FilePathHandle that contains all data file paths

br/pkg/lightning/backend/external/file_test.go

+19-7
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
3131
writer, err := memStore.Create(ctx, "/test", nil)
3232
require.NoError(t, err)
3333
rc := &rangePropertiesCollector{
34-
propSizeIdxDistance: 100,
35-
propKeysIdxDistance: 2,
34+
propSizeDist: 100,
35+
propKeysDist: 2,
3636
}
3737
rc.reset()
3838
initRC := *rc
@@ -49,7 +49,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
4949
// when not accumulated enough data, no range property will be added.
5050
require.Equal(t, &initRC, rc)
5151

52-
// propKeysIdxDistance = 2, so after adding 2 keys, a new range property will be added.
52+
// propKeysDist = 2, so after adding 2 keys, a new range property will be added.
5353
k2, v2 := []byte("key2"), []byte("value2")
5454
err = kvStore.AddKeyValue(k2, v2)
5555
require.NoError(t, err)
@@ -72,12 +72,21 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
7272

7373
err = writer.Close(ctx)
7474
require.NoError(t, err)
75+
kvStore.Close()
76+
expected = &rangeProperty{
77+
key: k3,
78+
offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16),
79+
size: uint64(len(k3) + len(v3)),
80+
keys: 1,
81+
}
82+
require.Len(t, rc.props, 2)
83+
require.Equal(t, expected, rc.props[1])
7584

7685
writer, err = memStore.Create(ctx, "/test2", nil)
7786
require.NoError(t, err)
7887
rc = &rangePropertiesCollector{
79-
propSizeIdxDistance: 1,
80-
propKeysIdxDistance: 100,
88+
propSizeDist: 1,
89+
propKeysDist: 100,
8190
}
8291
rc.reset()
8392
kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2)
@@ -103,6 +112,9 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
103112
keys: 1,
104113
}
105114
require.Equal(t, expected, rc.props[1])
115+
kvStore.Close()
116+
// Length of properties should not change after close.
117+
require.Len(t, rc.props, 2)
106118
err = writer.Close(ctx)
107119
require.NoError(t, err)
108120
}
@@ -116,8 +128,8 @@ func TestKVReadWrite(t *testing.T) {
116128
writer, err := memStore.Create(ctx, "/test", nil)
117129
require.NoError(t, err)
118130
rc := &rangePropertiesCollector{
119-
propSizeIdxDistance: 100,
120-
propKeysIdxDistance: 2,
131+
propSizeDist: 100,
132+
propKeysDist: 2,
121133
}
122134
rc.reset()
123135
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)

br/pkg/lightning/backend/external/iter_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ func TestMergeKVIter(t *testing.T) {
6666
writer, err := memStore.Create(ctx, filename, nil)
6767
require.NoError(t, err)
6868
rc := &rangePropertiesCollector{
69-
propSizeIdxDistance: 100,
70-
propKeysIdxDistance: 2,
69+
propSizeDist: 100,
70+
propKeysDist: 2,
7171
}
7272
rc.reset()
7373
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
@@ -118,8 +118,8 @@ func TestOneUpstream(t *testing.T) {
118118
writer, err := memStore.Create(ctx, filename, nil)
119119
require.NoError(t, err)
120120
rc := &rangePropertiesCollector{
121-
propSizeIdxDistance: 100,
122-
propKeysIdxDistance: 2,
121+
propSizeDist: 100,
122+
propKeysDist: 2,
123123
}
124124
rc.reset()
125125
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
@@ -196,8 +196,8 @@ func TestCorruptContent(t *testing.T) {
196196
writer, err := memStore.Create(ctx, filename, nil)
197197
require.NoError(t, err)
198198
rc := &rangePropertiesCollector{
199-
propSizeIdxDistance: 100,
200-
propKeysIdxDistance: 2,
199+
propSizeDist: 100,
200+
propKeysDist: 2,
201201
}
202202
rc.reset()
203203
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
@@ -249,8 +249,8 @@ func generateMockFileReader() *kvReader {
249249
panic(err)
250250
}
251251
rc := &rangePropertiesCollector{
252-
propSizeIdxDistance: 100,
253-
propKeysIdxDistance: 2,
252+
propSizeDist: 100,
253+
propKeysDist: 2,
254254
}
255255
rc.reset()
256256
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)

br/pkg/lightning/backend/external/sharedisk.go

-36
This file was deleted.

0 commit comments

Comments
 (0)