Skip to content

Commit

Permalink
Draft: improve draft
Browse files Browse the repository at this point in the history
  • Loading branch information
selfboot committed Jan 23, 2025
1 parent 91afcfd commit da8fb67
Showing 1 changed file with 144 additions and 5 deletions.
149 changes: 144 additions & 5 deletions source/_drafts/leveldb-source-writedb.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ leveldb::Status status = leveldb::DB::Open(options, "./db", &db);
status = db->Put(leveldb::WriteOptions(), key, value);
```
LevelDB 最大的优点就是写入速度非常快,支持的并发特别高。官方给过一个[写入压力测试结果](https://github.com/google/leveldb/tree/main?tab=readme-ov-file#write-performance):
LevelDB 最大的优点就是**写入速度也非常快,可以支持很高的并发随机写**。官方给过一个[写入压力测试结果](https://github.com/google/leveldb/tree/main?tab=readme-ov-file#write-performance):
```shell
fillseq : 1.765 micros/op; 62.7 MB/s
Expand All @@ -29,15 +29,154 @@ overwrite : 2.380 micros/op; 46.5 MB/s

<!-- more -->

## LevelDB 写入流程
## LevelDB 写入 key 的 2 种方式

LevelDB 支持一次写入一个键值对,也支持一次写入多个键值对。不论是单个写入,还是批量写内部都是通过 [WriteBatch](https://selfboot.cn/2025/01/13/leveldb_source_write_batch/) 来处理。

如果一次只写入一个键值对,LevelDB 内部也是通过 WriteBatch 来处理。如果 在高并发情况下,可能会在内部合并多个写操作,然后将这批键值对写入 WAL 并更新到 memtable。
```cpp
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
```
我们可以选择在调用 LevelDB 接口的应用层聚合写入操作,从而实现批量写入,提高写入吞吐。例如,在应用层可以设计一个缓冲机制,收集一定时间内的写入请求,然后将它们打包在一个 WriteBatch 中提交。这种方式可以减少磁盘的写入次数和上下文切换,从而提高性能。
当然也可以每次都写入单个键值,这时候 LevelDB 内部会通过 WriteBatch 来处理。如果 在高并发情况下,可能会在内部合并多个写操作,然后将这批键值对写入 WAL 并更新到 memtable。
这里整体写入还是比较复杂的,本篇文章只先关注写入到 WAL 和 memtable 的过程。
## LevelDB 写入步骤
完整的写入部分代码在 [leveldb/db/db_impl.cc 的 DBImpl::Write](https://github.com/google/leveldb/blob/main/db/db_impl.cc#L1205) 方法中,咱们一点点拆开看吧。
```cpp
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// ...
}
```

开始部分把 WriteBatch 和 sync 参数赋值给 Writer 结构体,然后通过一个 writers_ 队列来管理多个 Writer 结构体。这两个结构体和队列在整个写入过程中还是挺重要的,先来看看。

### Writer 结构和处理队列

这里 [writers_](https://github.com/google/leveldb/blob/main/db/db_impl.h#L186) 是一个 `std::deque<Writer*>` 类型的队列,用于管理多个 Writer 结构体。

```cpp
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
```
这里队列用 `GUARDED_BY(mutex_)` 装饰,表示队列的访问需要通过 `mutex_` 互斥锁来保护。这个用到了 Clang 的静态线程安全分析功能,可以参考我之前的文章 [LevelDB 源码阅读:利用 Clang 的静态线程安全分析](https://selfboot.cn/2025/01/02/leveldb_source_thread_anno/)
这里 Writer 结构体定义如下:
```cpp
struct DBImpl::Writer {
explicit Writer(port::Mutex* mu)
: batch(nullptr), sync(false), done(false), cv(mu) {}
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};
```

这里 Writer 结构体封装了不少参数,其中最重要是一个 WriteBatch 指针,记录了每个 WriteBatch 写请求的数据。然后用一个 status 用来记录每个 WriteBatch 写请求的错误状态。

此外,用一个 sync **来标记每个 WriteBatch 写请求是否需要立马刷到磁盘中**。默认是 false,不强制刷磁盘,如果系统崩溃,可能会丢掉部分还没来得及写进磁盘的数据。如果打开了 sync 选项,每次写入都会立马刷到磁盘,整体写入耗时会上涨,但是可以保证只要写入成功,数据就不会丢失。关于刷磁盘文件的更多细节,可以参考我之前的文章[LevelDB 源码阅读:Posix 文件操作接口实现细节](https://selfboot.cn/2024/08/02/leveldb_source_env_posixfile/)

如果想利用批量写入的性能优势,则需要在**应用层聚合这些写入操作**。例如,我们可以设计一个缓冲机制,收集一定时间内的写入请求,然后将它们打包在一个 WriteBatch 中提交。这种方式可以减少对磁盘的写入次数和上下文切换,从而提高性能
还有一个 **done 则用来标记每个 WriteBatch 的写请求是否完成。**这里因为内部可能会合并写入多个 WriteBatch,当本次写入请求被合并到其他批次写入后,本次请求标记完成,就不需要再处理了。从而避免重复执行,提高并发的写入效率

为了**实现等待和通知,这里还有一个条件变量 cv,用于支持多个写请求的批量处理,并实现多个写请求的同步**。写入的时候,多个线程可以同时提交写入请求,每个写请求都会先被放入写入队列。**实际写入过程,则是串行化写入,同一时刻只有一批写入过程在执行**。每次会从队列中取出队首的写请求,如果此时队列中还有其他等待的写任务,则会被合并为一个批次一起处理。在当前批次的写入请求处理过程中,后续来的请求进入队列后都需要等待。当前批次的请求处理完成后,会通知后面进入队列在等待中的写请求。

结合这里的介绍,应该能看懂前面 Write 方法开始部分代码的含义了。对于每个写入请求,都会先创建一个 Writer 结构体,然后将其放入 writers_ 队列中。接下来在 while 循环中,判断当前写入请求是否完成,如果完成就会直接返回当前写入的状态结果。如果当前写入请求没在队首,则需要等待在 cv 条件变量上。

如果当前写入请求在队首,那么就需要执行实际的写入操作了,这里具体写入流程是什么样呢?

### 写入过程详细分析

## 其他实现细节问题

### 混合 sync 和非 sync 写入

如果有一批写入请求,其中既有 sync 又有非 sync 的写入,那么 LevelDB 内部会怎么处理呢?

前面分析可以看到每次取出队首的写入任务后,会尝试合并队列中后续的写入任务。因为每个写入任务可以强制 sync 刷磁盘,也可以不刷,合并的时候,怎么处理这种混合不同 sync 配置的写入任务呢?

这里配置 **sync=true 的时候写入会强制刷磁盘,对于合并后的批次写入,取得是队首的 sync**[核心代码](https://github.com/google/leveldb/blob/main/db/db_impl.cc#L1237)如下:

```cpp
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
//...
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// ...
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// ...
}
}
}
```
所以,如果队首是的任务是不需要刷磁盘,那么合并的时候,就不能合并 sync=true 的写入任务。[核心实现代码](https://github.com/google/leveldb/blob/main/db/db_impl.cc#L1302)如下:
```cpp
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
// ...
}
```

不过如果队首是 sync=true 的写入任务,那么合并的时候,就不需要考虑被合并的写入任务的 sync 设置。因为整个合并后的批次,都会被强制刷磁盘。

### 优化大批量小 key 写入延迟

上面实现可以看到,如果大批量并发写入的时候,写入请求会先被放入队列中,然后串行化写入。如果写入的 key 都比较小,那么从队首取出一个写入任务,然后和当前队列中的其他写入合并为一个批次。合并的时候,需要设置一个 max_size 来限制合并的 key 数量,那么这里 max_size 要设置多少合理呢?

这里 LevelDB 给了一个经验值,默认是 1 << 20 个字节。但是考虑一个场景,如果写入的 key 都比较小,合并的时候,可能会合并很多 key,从而导致写入耗时变长。**由于是小 key 的写入,写入耗时长的话,体验上来并不好**

所以这里加了个小优化,如果当前队首写入任务的整体 size 小于 128 << 10 个字节,那么这里 max_size 就会小很多。当然,这个值应该也只是经验值,我也没找到官方具体的说明。相关代码在
[BuildBatchGroup](https://github.com/google/leveldb/blob/main/db/db_impl.cc#L1289) 中:

```cpp
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) {
max_size = size + (128 << 10);
}
```
##
## Compaction 机制
当存在未完成的压缩任务或 Level 0 文件过多时,写操作会通过调用 background_work_finished_signal_.Wait() 进入等待状态。这里的等待是为了防止新的写入操作进一步加剧存储压力
Expand Down

0 comments on commit da8fb67

Please sign in to comment.