Skip to content

Commit

Permalink
refine code, add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy committed Nov 22, 2023
1 parent 624d538 commit 9b74653
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,18 @@ func TestCompareWriter(t *testing.T) {
afterWriterClose: afterClose,
}

// writePlainFile(suite)
// baseSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
// t.Logf("base speed for %d bytes: %.2f MB/s", source.outputSize(), baseSpeed)
writePlainFile(suite)
baseSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("base speed for %d bytes: %.2f MB/s", source.outputSize(), baseSpeed)

// suite.source = newAscendingKeySource(20, 100, 10000000)
// writeExternalFile(suite)
// writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
// t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
suite.source = newAscendingKeySource(20, 100, 10000000)
writeExternalFile(suite)
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)

suite.source = newAscendingKeySource(20, 100, 10000)
suite.source = newAscendingKeySource(20, 100, 10000000)
writeExternalOneFile(suite)
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
writerSpeed = float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("one file writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
}

Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,7 @@ func MergeOverlappingFilesV2(
return err
}
maxKey = tidbkv.Key(iter.Key()).Clone()
err = writer.Close(ctx)
if err != nil {
return err
}

var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{writer.dataFile, writer.statFile})
Expand All @@ -220,5 +217,10 @@ func MergeOverlappingFilesV2(
MultipleFilesStats: []MultipleFilesStat{stat},
})
}

err = writer.Close(ctx)
if err != nil {
return err
}
return nil
}
3 changes: 1 addition & 2 deletions br/pkg/lightning/backend/external/onefile_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
_, buf, _, allocated := w.kvBuffer.Alloc(length)
if !allocated {
w.kvBuffer.reset()
logutil.BgLogger().Info("ywq test here...")
_, buf, _, allocated = w.kvBuffer.Alloc(length)
// we now don't support KV larger than blockSize
if !allocated {
Expand All @@ -110,7 +109,7 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
binary.BigEndian.AppendUint64(buf[lengthBytes+keyLen:lengthBytes+keyLen], uint64(len(idxVal)))
copy(buf[lengthBytes*2+keyLen:], idxVal)
w.kvStore.addEncodedData(buf[:length])
w.totalSize = uint64(keyLen + len(idxVal))
w.totalSize += uint64(keyLen + len(idxVal))
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/external/onefile_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestOnefileWriterBasic(t *testing.T) {
// 1. write into one file.
// 2. read kv file and check result.
// 3. read stat file and check result.

writer := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
Expand Down Expand Up @@ -281,16 +280,20 @@ func TestOnefileWriterManyRows(t *testing.T) {
require.NoError(t, err)

kvCnt := 100000
expectedTotalSize := 0
kvs := make([]common.KvPair, kvCnt)
for i := 0; i < kvCnt; i++ {
randLen := rand.Intn(10) + 1
kvs[i].Key = make([]byte, randLen)
_, err := rand.Read(kvs[i].Key)
expectedTotalSize += randLen

require.NoError(t, err)
randLen = rand.Intn(10) + 1
kvs[i].Val = make([]byte, randLen)
_, err = rand.Read(kvs[i].Val)
require.NoError(t, err)
expectedTotalSize += randLen
}

slices.SortFunc(kvs, func(i, j common.KvPair) int {
Expand Down Expand Up @@ -352,4 +355,5 @@ func TestOnefileWriterManyRows(t *testing.T) {
require.EqualValues(t, expected.MaxKey, resSummary.Max)
require.Equal(t, expected.Filenames, resSummary.MultipleFilesStats[0].Filenames)
require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum)
require.EqualValues(t, expectedTotalSize, resSummary.TotalSize)
}
3 changes: 3 additions & 0 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
m.subtaskSortedKVMeta = &external.SortedKVMeta{}
onClose := func(summary *external.WriterSummary) {
m.mu.Lock()
logutil.BgLogger().Error("ywq test summary",
zap.String("category", "ddl"),
zap.Any("summary", summary))
m.subtaskSortedKVMeta.MergeSummary(summary)
m.mu.Unlock()
}
Expand Down

0 comments on commit 9b74653

Please sign in to comment.