levelDB 源码解析之debug环境和数据写入

前言

leveldb 是google的大佬写的一个嵌入式的KV数据库。底层是使用的LSM-Tree 索引结构。本文主要涉及levelDB的数据写入,也就是Put 操作。我是个搞java的,C或者C++是不咋会,所以边看边学,可能比较啰嗦。

Debug环境搭建

首先拉代码:

git clone https://github.com/google/leveldb.git

因为源码中的third_party 包含了两个,一个是google的test,还有一个是benchmark。需要拉下来,放到third_party的目录,或者直接在目录里pull

git clone https://github.com/google/benchmark.git
git clone https://github.com/google/googletest.git

按照文档中,需要

mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake –build .

但是因为我的目的是在clion debug。所以暂时没有做这一步,而是在源码中新增了一个app目录,下面放我的demo代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <cassert>
#include <iostream>
#include "include/leveldb/db.h"
int main() {
leveldb::DB *db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/home/tuitaking/source/leveldb/file", &db);
assert(status.ok());
std::cout << "leveldb open success!" << std::endl;
std::string value;
std::string key1 = "testkey1";
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.IsNotFound()) {
std::cout << "can not found for key:" << key1 << std::endl;
db->Put(leveldb::WriteOptions(), key1, "testvalue1");
}
s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
std::cout << "found key:" << key1 << ",value:" << value << std::endl;
}
s = db->Delete(leveldb::WriteOptions(), key1);
if (s.ok()) {
std::cout << "delete key success which key:" << key1 << std::endl;
}
s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.IsNotFound()) {
std::cout << "can not found after delete for key:" << key1 << std::endl;
}
delete db;
return 0;
}

然后在CMakeLists.txt 中新增上这个模块

1
2
3
4
5
6
add_library(leveldb "" )
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"app/main.cpp" # 加这个位置
"db/builder.cc"

然后将test 也加入

1
2
leveldb_test("db/c_test.c")
leveldb_test("app/main.cpp") # 加的这行

然后就可以在clion中看到一个叫main的test,直接可以debug。至于为什么估计需要看cmake 相关的知识,暂时不去管他,总之这样就可以直接debug了。如果是windows,cmake的环境只要做好了,其他的都是类似。在clion中可以直接debug。

数据的写入

本文暂时不去处理DB的初始化,而是 看PUT操作。

1
db->Put(leveldb::WriteOptions(), key1, "testvalue1");

其中,writeOptions 标识此时是一个写入操作,写入的key就是key1,值就是testvalue1。写入的options 参数只有一个,就是是否sync。也就是写入pagecache 就返回还是刷到磁盘才返回,后者肯定速度慢一点。

1
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) 

put 的入参为WriteOptions,Slice ,可以看到我们传入的string在这里变成了slice,感觉是c++ 的一个优化还是啥,因为Slice 中有 Slice(const std::string& s) 这样的构造函数,回过来,Slice 可以看做一个变长的而且可以做比较的字符串,使用的是一个c const char* data_; size_t size_;来表示的。然后put操作会将当前传入的值变成一个WriteBatch

WriteBatch 包含了两个成员变量和六个函数:

  • friend class WriteBatchInternal;

    • friend 是一个C++的关键字,表示这个类可以访问私有成员变量
    • 这个对象没有成员变量,全部都是成员函数,用来操作WriteBatch 中的req
  • std::string rep_;

    • 写入的数据就放入到这个string中 在 write_batch.cc 中有标识:

      • WriteBatch::rep_ :=
        sequence: fixed64 // 表示当前写入的seq,每一个写入都有一个seq,用来做类似于mvvc的版本控制的(?)
        count: fixed32 // 当前bacth中写入的个数
        data: record[count] // 写入的值,按照keylen+keyValue+dataLen+dataValue

        [ sequence: fixed64 ][ count: fixed32 ][ record[0] ][ record[1] ]…[ record[count-1] ]

Put方法首先就是将key和value放入到writebatch的req中,也就是上文中的data中的值。数据组织完毕。此时的req 的值为

“\000\000\000\000\000\000\000\000\001\000\000\000\001\008testkey0\004aaaa”

在正式写入的方法中

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates)

包含了下面几个操作:

  1. 将Put操作封装为Writer,并且放到writers 队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Writer w(&mutex_); 
w.batch = updates;
w.sync = options.sync;
w.done = false;
// 加锁
MutexLock l(&mutex_);
// 放入队列中 std::deque<Writer*> writers_
writers_.push_back(&w);
// 如果当前的队列不是在头部,则等待。这里就可以控制写入是先来后到的顺序。因为push_back 是加锁的操作,而 且在push之前就已经加锁了。
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
// 等待
}
// 检查下是否已经完成写入,因为下面会有个合并当前写入的操作
if (w.done) {
return w.status;
}
  1. 上文的代码片段是将写入操作放入队列中,感觉就是和java的线程池中的Runnable 队列一样,将需要执行的work放在队列中。下面是写入前的一些操作:
1
2
3
4
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr); // 确保当前的资源可以使用,涉及到mem和log,暂时不说明,主要就是为本次写入做资源的准备
uint64_t last_sequence = versions_->LastSequence();// 从version中获取当前最后的seq
Writer* last_writer = &w;
  1. 创建完空间和获取到seq,就开始执行写入操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 这里就是上文提到的,将此时队列中的write 合并为一个,一起写入
// 将本次写入的seq 写入到batch中
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
{
// 注释A
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
mutex_.Unlock(); //释放掉了写入queue中的
// 写入到日志中
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) { // 如果需要立即刷盘,就将数据刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// 写入到内存中
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
// 虽然本次是一次写入,但是每次写的seq 都是新增在seq中的,所以seq 应该是一个递增的值
versions_->SetLastSequence(last_sequence);
}

考虑下当前的场景,如果开始有t1,t2,t3 同时写入,此时t1 在writes的front,所以他可以执行上文3 中的代码,也就是将数据写入到log和mem,但是此时writers的front仍然是t1,所以在将队列中所有数据(BuildBatchGroup 会将当前队列中的数据全部变成一个batch)准备好后,如果此时t4,t5,t6线程进行并发写入,他们也会在上文2的代码中被新增到队列尾部,但是不会进入到写操作中,但是由于此时我们的writes的add操作也是被加锁了,所以t4,t5,t6应该是可以加到队列中,但是因为他们都不是front,所以都会在等待,一直到front变成他们中的某一个。

等到当前队列中的数据都写入到日志和内存中后,这个时候会再次获取到锁,判断当前的写入是否出现异常,然后释放当前写入的资源,也就是tmp_batch (BuildBatchGroup)核心就是将写入封装为这个tmp_batch。这里的异常主要是判断是否磁盘有异常,如果有就直接将后续所有的写入全部唤醒为异常状态。清理动作做完后就将sequence进行修改。可以看到sequence的获取和修改都是在加锁的状态下完成的。

  1. 当前新写入成功后,就可以释放后续的写入线程了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;

这里的唤醒核心就是将writers的队列,这个last_writer 指向的就是上文中已经被写入的最后一个writer,也就是说会挨个唤醒和pop出队列,一直到当前写入批次的最后一个。

如果writers 不为空,则继续唤醒队头,执行上面的操作。

通过将数据的写入进行入队控制写入的顺序,然后将队列中的当前大小(通过锁确定合并写入队列不持续增加)进行合并写入,然后写入日志和内存,最后挨个唤醒。一个锁做到了控制队列,合并写入等功能。

写入日志

写入日志也就是上文的

1
log_->AddRecord(WriteBatchInternal::Contents(write_batch));