前言

​ 在 LevelDB 中,写完 WAL 日志以后,就可以将数据写入到 MemTable 了。MemTable 是 LSM-Tree必不可缺的一个组件,主要作用如下:

  1. 写入的时候作为随机写转换为顺序写的buffer也是对数据进行排序的处理器
  2. 读取的时候作为热点数据(刚写入的数据)的cache,加快读取速度
  3. SSTable 的数据来源,初始的SSTable都是一个MemTable的持久化

在具体的使用中,MemTable 需要在内存中开辟堆空间,所以需要内存管理。客户端一般写入MemTable后就可以返回成功。

本文将针对MemTable 在 LevelDB 中的实现做一个简单的介绍,一起将客户端的写入过程也做了个介绍,会涉及到LevelDB如何控制并发等。

MemTable的实现在db/memtable.hdb/memtable.cc。内存管理的是实现是在util/arena.h

数据结构

MemTable 数据结构就是前面提到过的SkipList。LevelDB 是将key和value组合在一起成为一个SkipList中的Node,所以MemTable中还包含了一个比较器。这里对SkipList不再做具体的介绍,如果想深入了解,可以看下我前面写的SkipList原理和Java实现。

db/memtable.h中的具体实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);

MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;

// Increase reference count.
void Ref() { ++refs_; }

// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}

// Returns an estimate of the number of bytes of data in use by this
// data structure. It is safe to call when MemTable is being modified.
size_t ApproximateMemoryUsage();

// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator* NewIterator();

// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value);

// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);

private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;

struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* a, const char* b) const;
};

typedef SkipList<const char*, KeyComparator> Table;

~MemTable(); // Private since only Unref() should be used to delete it

KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
void PrintBuffer(char* buf, size_t i);
};

上面的实现中可以看到,MemTable 拒绝复制当前内部的数据,或者说拒绝使用拷贝构造函数和赋值来拷贝当前的数据。

1
2
3
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
这两个方法的代码指的是禁止了MemTable的复制,也就是说一个MemTable不能通过拷贝构造函数构创建新对象的方式复制当前MemTable对象。也不能使用赋值运算符来创建一个新的对象并且指向当前的对象,赋值运算符也是会拷贝数据的。

能够创建一个MemTable的方式只有使用传入一个InternalKeyComparator。也不能够复制当前MemTable中的数据。

MemTable中还有一个Ref,这个和gc中的引用是一个意思,如果这个refs中的数据没有到0,说明有人还在使用这个MemTable,那么就不能进行内存释放。

ApproximateMemoryUsage当前已经使用的内存,这个命名是说这个是一个大概的值,我觉得挺神奇的,后面看下为什么是大概的值,是否可能会有并发访问结果不一样的情况。

NewIterator创建一个迭代器

提供的读写接口就两个,一个是Get 一个是Add:

  • Add 返回值是void 入参分别为
    • SequenceNumber seq 当前写入的sequence
    • ValueType type // 类型,该类型只有两种,一个是删除,一个是写入
    • const Slice& key // 写入的key
    • const Slice& value)//写入的Value
  • Get 返回值是true 入参分别为:
    • const LookupKey& key // 查询的Key值
    • std::string* value 如果有数据就会写在value里面,也就是如果返回为true,那么值就在value里面
    • Status* s 如果查询的值已经被删除,那么status会有一个error但是函数会返回true

私有域中主要包含了两个迭代器,Key的比较器,内存分配的Arena 和Table,这个Table就是SkipList。在析构函数里说的很清楚的就是只有在Unref里面可以调用,也就是当refs_的值小于等于0 的时候就可以释放当前的内存。

Add

add 是将当前的值写入到MemTable中,为了更好理解里面的并发控制和sequence,直接看db的Write操作,是在db/db_impl.cc中的DBImpl::Write中,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_); // 每次写入都会封装为一个Writer,这个在前面的Log中有了解,注意的是,Writer中传入了互斥锁
w.batch = updates; // 写入操作在Put的是已经封装为了WriteBatch
w.sync = options.sync; // 判断当前是否同步刷盘,也是Log中的操作
w.done = false; // 当前写入状态初始为false
// 锁a
MutexLock l(&mutex_); // 首先根据信号量初始化锁
writers_.push_back(&w); // 将写入操作放入一个writers队列中,注意因为有锁,所以同一时间只会有一个线程进入到writers中
while (!w.done && &w != writers_.front()) {
w.cv.Wait(); // 需要注意的是,在Wait阶段是会释放锁的
} // 使用队头的writer作为本次写入的writer
if (w.done) { // 如果本次的写入已经被完成,则直接返回写入的状态
return w.status;
}

// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr); // 创建空间用于写入磁盘或者memTable
uint64_t last_sequence = versions_->LastSequence(); // 从当前的version中获取到最后使用的sequence
Writer* last_writer = &w; //本次写入的writer赋值给last_writer,注意为什么这里明明&w是队头却是lastwriter,是因为在后面的BuildBatchGroup方法中,会将本次批量写入的最后一个writer赋值给他
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 将当前队列中所有的writer里面的数据合并为一次写入
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);// 设置本次批量写入的sequence,sequence每次写入都是递增的,保证了写入的顺序,也能够进行读取的MVVC
last_sequence += WriteBatchInternal::Count(write_batch); // 更新当前的lastSequence,write_batch 中包含了当前数据的大大小
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock(); // 释放队列锁,此时可以继续写入writers队列了。但是由于当前的writer 没有从队头移除,所以此时仍然等待在 w.cv.Wait();中
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); // 写入Log文件
bool sync_error = false;
if (status.ok() && options.sync) { // 写入成功,是否同步刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_); // 此处写入mem
}
mutex_.Lock(); // 再次获取到锁,暂停线程写入writers,注意的是,这个锁的释放是等到本次线程退出方法,调用MutexLock的析构函数达到释放锁的目的
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear(); // 清理tmp_batch
versions_->SetLastSequence(last_sequence);// 设置sequence 到version中
}

while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
} // 依次唤醒本次写入的后续writer,此时会从上面的while中继续调用,如果是头节点,而且已经被写入则直接返回,否则就继续执行上面的代码,该循环一直到本次写入的最后一个writer位置

// Notify new head of write queue
if (!writers_.empty()) { // 唤醒下一次调用
writers_.front()->cv.Signal();
}

return status;
}

算得上逐行解释了上面的代码。接下来看下如何使用一个互斥锁实现兼顾多线程顺序写入和效率的。

Add中的锁

先来看最开始的while循环中的cv.Wait()方法,在port/port_stdcxx.h中,实现为:

1
2
3
4
5
6
void Wait() {
std::unique_lock<std::mutex> lock(mu_->mu_, std::adopt_lock);//ustd::unique_lock能实现自动加锁与解锁功能,第一个参数是指的传入的参数进行上锁,如果有第二个参数即 std::adopt_lock:表示这个互斥量已经被lock()过了,无需在本次构造函数中加锁,否则会报错。

cv_.wait(lock); // wait 方法会对互斥锁解锁,然后阻塞等待,一直到被notify唤醒。唤醒之后会再次获取锁,一直到获取锁成功后才继续往下执行。
lock.release(); // 检测当前锁是否没有释放,如果是则释放掉
}

也就是说每次写入在Wait过程中,会释放当前获取的锁,允许后面的writer 写入到writers中。

下面举例说明:

如果当前有t1,t2,t3,并发写入。假设初始阶段writers为空:

  1. t1,t2,t3 写入的时候,首先假设t1 首先执行MutexLock l(&mutex_); 即获取到了mutex_的锁,t1 被写入到writers中,而且是作为队头,所以不需要进入Wait状态,直接开始写,此时t2,t3 在等待获取锁。
  2. t1 执行到了mutex_.Unlock();此时t1的writer 还在队头,所以t2,t3 被写入到writers中等待唤醒。
  3. 此时writers中包含了t2,t3,但是t1只会写自己的数据,因为他是在合并后才释放锁让t2,t3进入的.t1先写入Log,然后写入MemTable,此时不存在并发写的情况,因为其他的并发都会被放到writers中。
  4. 等到t1将自己的数据写入到Log 和MemTable后,t1 再次获取到锁,此时阻塞writers的中继续新增writer。
  5. 因为t1 的没有进行操作合并,所以他不会唤醒其他的写,只会唤醒下一次的队头,但是此时并没有释放锁。只是唤醒了t2,t2在等待t1释放锁。
  6. t1 本次写入返回,方法栈中的MutexLock 被释放,然后t2获取到锁,此时writers中的数据有t3t2
  7. t2 的操作相比t1 多了一个合并数据和唤醒的动作。这里就不赘述了。

上文中一个就涉及到1个锁,却能够有效的将数据的顺序和并发全部完成。使用一个队列,将多线程写入转换为单线程写入,保证了顺序,也有效地保证了效率。

校验空间容量

在写入之前,还需要看下当前的内存空间,和level0的文件数,实现就在db/db_impl.cc的MakeRoomForWrite方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld(); // 确认当前的线程获取到了锁
assert(!writers_.empty()); // 有writer 操作
bool allow_delay = !force; // 是否运行缓冲,默认是1ms
Status s; // 返回的状态
while (true) {
if (!bg_error_.ok()) { // 这个bg_error是后台合并level0 的时候的一个操作
// Yield previous error
s = bg_error_;
break;
//1 如果允许等待(正常写入可以等待。force==updates==nullptr),并且当前的0层
// 文件触发了需要等待的条件(0 层文件大于等于8)
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();// 首先会释放锁,因为此时会等待操作进行完成,没必要不让后续的写入进入
env_->SleepForMicroseconds(1000);// 等到1ms
allow_delay = false; // Do not delay a single write more than once,每次写入最多运行等待一次
mutex_.Lock(); // 加锁,说明要开始干活了
//2 如果当前的内存足够,而且level0 的文件数量没有超过最大,说明有足够的内存和文件,直接返回stats
// write_buffer_size 大小为4MB,也就是说一个内存文件大小一般在大于4MB的时候就需要切换了
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
//3 如果正在执行内存文件的合并,则等待内存文件合并完成
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
//4 如果有太多的level0层文件(默认12)。则等待文件合并完成
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
} else {
//5 如果当前的文件数量小于8,内存资源不够,而且没有进行合并,则说明需要创建一个新的内存文件
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
// 文件的名称也就是num 也是version提供的
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
// 创建可写文件,创建失败则说明岗前的num可以重复使用
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
// 释放当前的Log文件
delete log_;
// 关闭当前的Log文件
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
// 释放内存
delete logfile_;
// 将上面创建的文件复制写Log,成为新的Log日志文件
logfile_ = lfile;
// 设置num
logfile_number_ = new_log_number;
// 将创建的文件赋值给Log中的writer
log_ = new log::Writer(lfile);
// 将当前mem_ 的指针复制给imm_,说明当前的mem已经准备刷到level0 了。
imm_ = mem_;
// 设置是has_imm_ 为true,这里的 memory_order_release 前面说过,就是不允许指令重排
has_imm_.store(true, std::memory_order_release);
// 创建新的MemTable,传入当前的比较器
mem_ = new MemTable(internal_comparator_);
// 给当前的mem 添加引用
mem_->Ref();
force = false; // Do not force another compaction if have room
//尝试调用后台合并
MaybeScheduleCompaction();
}
}
return s;
}

这里分配的空间核心还是内存是否超过4MB,以及当前level0 的数据是否超过配置的阈值。如果说调大上面的值肯定可以提高一定的吞吐量。但是后期合并的数据量也对应会增加,个人觉得如果key,value 都比较小,则4MB就足够了,但是如果每次都是超大的key和value,就可以考虑调大方法中的参数,避免频繁合并。

合并操作

合并操作个人觉得没有什么好了解的,里面有个小tips就是直接对writers的迭代iter++ 这样可以有效的避免其实只有一个还需要走下面的合并操作。

开始写入MemTable

在写入完Log 后就开始执行写入MemTable了。

1
2
3
4
5
6
7
 status = WriteBatchInternal::InsertInto(write_batch, mem_);
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}

首先是创建了一个inserter的对象,赋值mem和sequence,然后调用WriteBatch的迭代器写入。这么封装有什么好处呢?为什么不在MemTable里做一个循环直接往里面写呢?个人觉得是为了解耦,MemTable插入的数据就是简单的Slice 对象,而不用去考虑里面的batch,通过迭代器解析然后插入,能够将WriteBatch的职责和MemTable的职责做一个很好的区分。

在正式进入迭代方法之前,先来看下此时一个的WriteBatch 数据结构。这里就不贴全部的源码了,只是说下WriteBatch里面主要是一个Slice的key和Slice 的value。这些值最后都会被挡在一个string类型名为rep_参数中。

每次写入都会将当前操作类型即ValueType放入到req中,写入之后的seq为:

1
2
如果是kTypeValue则数据是 [seq预留8字节,全部为0][count][kTypeValue(1字节)][key_length][key_value][value_length][value_value]
如果是kTypeDeletion 则数据为[seq预留8字节,全部为0][count][kTypeDeletion(1字节)][key_length][key_value]

然后在BuildBatchGroup 中也只是对这个值进行一个追加,最后是一个大的WriteBatch,包含了n个写入,这个在WriteBatch 中是一个append,每次都会将需要append的数据截取12字节后面的数据,然后将count重新设置到被append的count中。执行完BuildBatchGroup 后,就会在前面的4个字节中写入sequence。

结合Log来看,这里的key,value的值都是使用的Varint32位,这也是为什么需要在Log中写入Fragement的原因了。

了解到了WriteBatch 的数据结构就不在去看WriteBatch::Iterate里面的源码里,其实就是根据数据结果解析为key和value,然后分为delete方法或者Put方法。但是需要注意的是,WriteBatch::Iterate的里面的Put方法最后走到了class MemTableInserter : public WriteBatch::Handler 这个类里面的Put方法里。这个方法会对核心就是对当前批次写入的sequence进行解析和插入到MemTable中,说明虽然我们合并了数据的写入,但是在写入Mem中的时候sequence 还是按照写入顺序+1 的。

1
2
3
4
5
6
7
8
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}

LevelDB里面的迭代器目前已经了解了两个了,一个是MemTable中的Iterator,用来查询数据,还有一个就是当前的写入了,其实迭代器在LevelDB中使用比较多,后面全部过一遍再回头来看迭代器的使用。

Add 方法

看到这里,终于开始往MemTable 写数据了。具体实现比较简单,主要就是Key值的创建也就是SkipList中Node 的创建。具体代码实现在db/mem_table.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type) // 将type 放在seq后面
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len); //创建当前encode 后的数据大小的内存,ie:指向0x8060f8
char* p = EncodeVarint32(buf, internal_key_size);// 将internal_key_size的值放入到buf中,返回指向 //0x8060f9
std::memcpy(p, key.data(), key_size);// 将key的值放入到p指针
p += key_size; //因为key的值有6个
EncodeFixed64(p, (s << 8) | type);
p += 8; //0x8060ff 仍然指向
p = EncodeVarint32(p, val_size);//0x806107
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
// for (size_t i = 0; i < encoded_len; ++i) {
// unsigned char c = static_cast<unsigned char>(buf[i]);
// std::bitset<8> binary(c); // 将字符转换为 8 位二进制
// std::cout << "Character: " << buf[i] << " Binary: " << binary << std::endl;
// }
table_.Insert(buf);
}

ValueType的值就是上面提到的MemTableInserter 中put或者delete来传入的。

然后就是写入MemTable的Key的的构建了,最后构建的结果如下:

1
[internal_key_size_length Varint64][key_length][key_value][tag[sequence<<8|type] 8字节][value_length][value_value]

memTable_innerkey1

然后就是开辟内存空间,传入的参数为上面的所有值长度之和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
// 如果当前的内存小于剩下的内存,则直接在剩下的内存中进行分配
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}

如果上次开辟的空间未使用大于本次使用的空间,直接使用,否则就新创建空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
char* Arena::AllocateFallback(size_t bytes) {
// 当前的额分配的是否大于1k,如果大于1k直接开通当前的数量
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}
// 如果超过1k,创建4k的内存
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;

char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}

在Log中提到,每次读取都是4kb,所以这里每一个Block也是4kb。仅仅分为两种情况,超过1kb的直接分配需要的内存数,否则直接分配4kb,具体的内存分配在AllocateNewBlock:

1
2
3
4
5
6
7
8
9
10
// 将创建的内存方在blocks_的链表中,然后给memory_usage_ 的值添加一下
//memory_usage_ 记录的就是当前memtable 使用的内存大小,如果超过配置的最大缓存值,会将内存数据写入到磁盘上
// 内存分配就是很简单的创建一个char数组,然后push到队列中。
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_.push_back(result);
memory_usage_.fetch_add(block_bytes + sizeof(char*),
std::memory_order_relaxed);
return result;
}

可以看到内存分配非常简单,就是将分配一个char数组,然后放入到blocks的vector里面。为什么可以这么简单的管理呢?这个和Area的生命周期相关,因为一个MemTable包含了一个Area独占的对象,当前MemTable如果释放了空间,说明当前的MemTable没有被其他人引用,而且已经写入到level0的sstable里了,所以area里面的空间可以直接释放。因为不会有人正在访问该空间了。

分配完成以后,就直接写入到SkiptList里面,MemTable的传入的Comparator 也就是MemTable使用的Comparator了。

Get

首先还是看db/db_impl.cc中的Get方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_); // 加锁,这里加锁的主要原因是需要获取当前的全局变量versions_,然后获取到sequence的值
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_; // 当前正在写的Memtable
MemTable* imm = imm_;// 准备收入level0的MemTable,只读MemTable
Version* current = versions_->current(); //当前的version对象
mem->Ref(); // 给mem 新增一个引用,避免读取过程中该mem被回收
if (imm != nullptr) imm->Ref();// 给imm 新增一个引用,
current->Ref(); // version也增加引用

bool have_stat_update = false;
Version::GetStats stats;

// Unlock while reading from files and memtables
{
mutex_.Unlock(); // 释放锁,因为当前影响读取的version 或者sequence已经获取到了,所以不在会访问这些全局变量,而且mem和mem_也不会被回收。这里留下个问题,如果说当前读取操作的时候,mem和imm 都满了,等着写,那么岂不是也要等待数据读完?个人觉得是不会的,因为Imm已经在此处变成了本地变量,所以如果合并的时候已经被清理,那么是否回收这个imm的对象就交给了当前查询过程中的这个方法,即变成了栈空间对象的回收
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);// 创建查看的key,
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

这个LookupKey 核心实现就是组装当前查的值

1
[key_length][user_key][tag]

memP_lookUpkey

总的来说还是通过internal_key 查找,所以查找的比较器就比较重要,这里来看下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}

按照key的值和sequence的值进行比较,都是使用uint64_t比较的,因为sequence取的是后24位,所以大小比较key有效过滤key值相等不同seq的值。

总结

本文介绍了MemTable的实现,还包含了具体的写入过程中的锁。LevelDB对锁的处理,内存的处理是十分指的学习的。但是每次都将Value的值也一起放入到内存,写入到SkipList里,个人觉得对大对象的存储和查询造成了很多内存上的浪费和使用。是否可以考虑存储计算分离,将Value的值和Key值分开,然后不断修改Key中的Value的offset就行了。有个初步的设想,首先Key记录的是Value的offset,磁盘或者内存,在合并的时候去查询,查到数据执行更新,比自己旧的数据可以进行删除和回收。

前言

在上一篇的文章中,将 LevelDB 的架构做了一个简单的介绍。分析了需要的各个模块,后文将针对各个模块做一个更加详细的介绍。在介绍的过程中,希望能够了解到为什么这么做。

LOG

作用

Log 本身就是一个 WAL 日志,将每次写入的改变数据的操作首先持久化到文件,因为数据是顺序写入的所以写入性能高。又因为是每次首先都记录在 WAL 日志中然后进行具体的操作,所以根据 WAL 日志能够在系统意外崩溃的情况下恢复到崩溃前的状态,不会出现客户端已经返回成功,但是数据丢失的情况。

实现

具体实现位置

Log 涉及到的主要就是读写.
写操作就是 WAL 日志,顺序写入操作到磁盘,主要实现在:

  • db/log_writer.cc
  • db/log_writer.h

读操作就是在系统重启后从 WAL 日志恢复,也是顺序读取。和 Writer 感觉就是队列一个,一个队头读,一个队尾写。主要实现在:

  • db/log_reader.cc
  • db/log_reader.h

在写的过程中还涉及到日志的序列化和反序列化。这部分的实现是在:

  • db/log_format.h

当然不同的操作系统对底层调用如文件读写是不一样的,LevelDB 使用了一个 env 来统一上层操作,然后不同环境在编译器自动实现。具体位置在:

  • util/env.cc

在 env 中,包含了多个类。其中和文件相关的类有:

  1. SequentialFile 从一个文件顺序读
  2. RandomAccessFile 随机读取某个文件
  3. WritableFile 顺序写的文件,注意的是,在 env 中说明了,这个类需要提供一个 buffer,可以让小的 fragments 能够合并一起刷入磁盘

上面提到的读写和序列化的三个都属于 log 的 namespce。namespace 是一种用于组织和管理命名空间的机制。命名空间是用来避免名称冲突(名称重复)的一种方式,尤其在大型项目中非常有用,以确保不同部分的代码可以使用相同的名称而不产生冲突。而且 namespace 是可以嵌套的。

个人将它类比为 java 的 package

数据封装

为了减少磁盘 IO,LevelDB 每次读取文件都会读取 4kb 的数据,具体实现后文会说。为了让一次性读取的数据读取到当前刚好能处理的数据,所以写入的过程中也针对 4kb 做了操作。这个 4Kb 大小的数据在 LevelDB 中称之为 Block,写入的数据或者读取的数据被称为 Record,但是并不是每次写入的数据都刚好等于 4Kb,所以针对这种情况,又将存储在 Block 中的数据切割成 Fagement。组织如下图所示:

log_file_format

上图为 logfile 的里面数据的组织结果,一次写入称之为 Record,一个 Record 会被切分成一个或多个 fragement 中分布在一个或者多个 block 中,每次读取是一个 block,但是每次写入只是写入一个 fragement。

写操作

Writer

LevelDB 会为每次写入封装一个 Writer 对象,这个对象定义在db/log_writer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Writer {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(WritableFile* dest);

// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
Writer(WritableFile* dest, uint64_t dest_length);

Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;

~Writer();

Status AddRecord(const Slice& slice);

private:
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);

WritableFile* dest_;
int block_offset_; // Current offset in block

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
};

公有域:

  • AddRecord 方法,用于外部写入 Slice

私有域:

  • EmitPhysicalRecord 用于写入磁盘
  • dest_ env 中提供的一个写文件的封装,可以理解位一个已经打开的可以写入的文件
  • blockoffset 当前 writer 写入的 block 位置
  • typecrc ,这个是一个数组,里面存储的是当前的 type 对应的 crc,因为 type 是一个常量,不需要每次都计算。
AddRecord

AddRecord 本身的实现主要就是对当前写入 Record 做切割成 Fragement,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
//当前的block中的剩下的值
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
// 如果小于7 ,则全部填满为0
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
// 一个block中的每一个fragment的大小,left为本次写入过程中待写入的数量,avail为可以写入的长度
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length); // 如果当前待写入的数据量小于block可以使用的,则说明本次可以作为一个完整的写入
if (begin && end) {
type = kFullType;
} else if (begin) { // 如果第一次待写入的数据库大于可以使用的,则需要进行切段,并且标记
type = kFirstType;
} else if (end) { // 再循环中,如果前面的写入都完成了,那么最后可能是写入一个完整的数据,并且将它标记为最后的fragment
type = kLastType;
} else { // 在循环中,写入第一个后,后面仍然不足够,则需要进行切分为多个,既然不是开始也不是最后,则是处于中间的数据量
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}

上面代码中的 kBlockSize 初始值在util/log_format.h中的 kBlockSize,大小是 32768 字节也就是 32kb。blockoffset则是 writer 对象中写入成功后会更新的值。

实现的流程核心分为以下判断:

  1. 当前 block 中剩下的值是否不能写入一个 header 即 7 个字节,如果小于则直接填充 0,所以给实际数据写入的值为 avail,即等于 整个 blog 剩下的值减去 header 的 7 个字节
  2. 如果当前的 block 的 avail 大小大于需要写入的数据,则当前 fragement 的长度就等于需要写入的长度,也就是一个 fullfragement,否则只能写入剩下可以写的大小
  3. 判断当前的 fragement 的类型是通过 2 个参数确定的。
    1. begin 在第一次进入方法时候为 true
    2. end 如果当前剩下的可写长度比 fragement 的长度长,则 end 为 true,否则为 false
    3. 如果 end 和 begin 都为 true,则是一个 fragement,如果两者中只有一个为 true,则要么是最后一个,要么是第一个,两个都为 false,则说明是 kMiddleType。

确认好 type 后,也就确认了当前 fragement,也就是可以进行数据的持久化了,即调用了 EmitPhysicalRecord 方法:

EmitPhysicalRecord

EmitPhysicalRecord 方法具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);

// Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(length & 0xff); // 将长度的低8位放到第四个位置
buf[5] = static_cast<char>(length >> 8); // char是一个字节,所以这里也是一个字节。可以看到是小端数组,低八位放在前面,然后最多两个字节表示带下,最多一次性写入16k的数据
buf[6] = static_cast<char>(t); // 当前fragment的类型放在6 这个位置

// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);

// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + length;
return s;
}

首先是拼接头节点,这里不是从头到尾来做的,而是首先将长度和 type 放入,具体的数据结构可以看上面的图中的 fragement 里面的头节点类容:

1
buff[6] = char[6]{crc_low,crc_mid0,crc_mid2,crc_high,length_low,length_high,type}

和 Varint 类似,甚至前面 4 个 crc 就是使用的 EncodeFixed32,固定长度的 char 表示 32 位整型。都是小端存储。

封装好 header 后,首先将 header 的数据 append 到 dest_ 中,成功后 append 数据,append 成功后会调用 flush,将本次的 record 刷入到磁盘上。

整个写流程就完成了,此时 Log 中已经包含了本次写入的 Record。

读操作

前文提到,每次写入都会将 Record 写入到磁盘上作为 WAL 日志,WAL 日志的读取只有一个地方会做,就是数据库重启后的恢复动作。但是数据库的恢复动作除了读取 Record 还涉及到很多其他的如版本等的操作。读操作的篇幅里都不会涉及,在后面了 version 的时候会详细说下,所以本文仅仅涉及到读 Record 的操作。

Reader

和 Writer 类型,LevelDB 会为每次读取都提供一个 Reader 的对象,实现位置在db/log_reader.h中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Reader {
public:
// Interface for reporting errors.
class Reporter {
public:
virtual ~Reporter();
// 某些字节可能已经损坏,损坏的
virtual void Corruption(size_t bytes, const Status& status) = 0;
};
// reader 传入的参数是一个SequentialFile,也就是一个顺序读取的对象
Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset);
Reader(const Reader&) = delete;
Reader& operator=(const Reader&) = delete;
~Reader();
// 将当前Record的数据读取到record里面,读取成功则返回true,如果已经读取到本次输入的尾部,则返回false,并且将数据临时存储在scratch 中
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch);

// Returns the physical offset of the last record returned by ReadRecord.
//
// Undefined before the first call to ReadRecord.
uint64_t LastRecordOffset();
private:
// 有删减
SequentialFile* const file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_;

// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
};

上文没有贴完整的代码,私有域中的方法和对象我没有完全贴。因为 Reader 方法本身只是将 Record 从 Log 中读取出来,当然其他如 MANIFEST 的文件其实也是按照 Record 来存储的。但是整体上来说,都是从文件中将 Record 的日志恢复,然后按照类型插入到 Memtable 或者 VersionSet 中。

Log 日志恢复主要是在 RecoverLogFile 方法中位于db/db_impl.cc中。这个方法比较长,下文挑一些核心的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}

if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
}
}

上面是从文件中读取 record 的实现,是一个循环读取的过程,上面的方法介绍里说过,reader.ReadRecord(&record, &scratch) 中的两个传入的参数分别为,如果是 fullFragement,则将值放在 record 中,如果是 first,mid 则放在 scratch 中,一直遇到 end 后放到 record 中。

问题 a:是否存在比如当前有两个日志文件(000001.log,000002.log),然后 000001.log 中的末尾刚好是 000002.log 第一个 fragement 的 header 呢?

在持续读取过程中,会将 Record 的数据写入到 memtable 中,如果发现 Memtable 的值超过了 4MB,则刷入 level0 层。

循环执行完后,当前的 log 日志已经全部弄到内存中了。如果当前的Options中指定了使用原来的 log 文件,则无需将内存中的数据刷入磁盘,因为 log 文件在后续的写入中继续使用,则将当前恢复 memTable 复制给 mem 对象作为后续写入的 memtable,log 的回收可以走写入的流程过程中的日志文件回收策略,否则的话,仍然需要将当前的数据刷入 level0。因为可能在新写入的操作中,该日志文件被删除,到时候没有刷盘则丢失数据了。

Log 回收和截断

Log 日志如果不做截断,数据量会持续堆积,越来越大。截断的时机是个比较重要的事件。看上面的 Recover 可以看出一些端倪。如果当前的 memTable 中的数据被刷入到了磁盘,成了 level0,那么就说明可以回收当前对应的 log 文件了。

db/db_impl.cc中有一个 CompactMemTable,该方法就是将 imm 的数据写入到 level0,然后在数据写入到 level0 的时候就将当前的 log 删除。那么是否可能存在误删除当前写入的数据呢?答案是不会的,因为合并的时候正在写的 log 文件已经变成了新的文件。还记得上面的问题 a 吗?这里得到了答案,就是不可能存在 header 的数据在一个文件,然后 data 的数据在另外一个文件的情况,只有可能出现文件刚好写完 header 系统就挂掉的情况。这种情况在 leveldb 中是作为异常处理的。

日志的切换是在db/db_impl.cc中的 MakeRoomForWrite,当当前的资源不住,主要 i 是 mem 的资源不足的时候,就新建一个 log 文件作为本次写入的文件,然后将原来的文件 close,然后将当前 mem 修改为_mem。而且这个文件是递增命名的,所以根据名字就可以进行先后顺序排序,所以也不存在导致删除错误的情况,至于文件的组织后文在探讨 Version 的时候在讨论。

总结

本文将前面的整体架构中的 Log 模块做了介绍,还涉及到了部分 Recover 的情况。在 LevelDB 中,数据基本上都是按照类似的方式不断的 append 的,所以基本上都是 Record 的方式加解密。这部分会在后面的 SSTable,Version 等中在遇到。

个人觉得设计比较有美感的就是 Log 的截取和 fragement 的方式写入,阅读起来很顺畅。

前言

前面介绍了 LevelDB 中的 Slice,编码和 SkipList 较为基础的组件和实现。本文将介绍下 LevelDB 的整体架构。

leveldb_architure

上图是一个比较简陋的架构图。只涉及到数据的读写,没有涉及到如版本控制,文件合并等。下文会按照每个模块稍微讲解下。

首先看下一个 demo 写入数据产生的文件:

1
2
3
4
5
6
7
8
9
10
000005.ldb
000006.ldb
000007.log
000008.log
000009.ldb
CURRENT
LOCK
LOG
LOG.old
MANIFEST-000004

其中 LOG 就是输出的系统日志文件

  • ldb 文件即 sstable 文件
  • log Log 日志,下文的 WAL 日志
  • CURRENT 存储的是当前的 MANIFEST 文件名称
  • LOCK 加锁,一个 leveldb 的数据库只能由一个进程打开
  • MANIFEST 主要保存的是版本数据
    • 这个文件算的上是一种特殊的 Log 日志文件,里面存储的是 Version 的数据,也就是每次 Version 发生变化都会写入到这个文件中,也是一个 WAL 日志。Version 暂时将他认为就是某一时刻,每一个 Level 中包含的文件等信息

Log

前面提到过 LevelDB 是一个 LSM-Tree,这种数据结构分为两种数据,一个是已经持久化到磁盘上的 SSTable,还有一个就是当前内存中的 SSTable,SSTable 就是 Sorted String Table,按照字符排序的数据。每次插入都是先插入内存,然后在时间或者空间的维度上设置阈值,超过阈值将内存中的刷入到磁盘。查询则是每次先查内存,然后去磁盘上查询。

因为服务端返回一般是写入到内存中就可以直接返回了,所以可能存在当前异常挂掉的时候内存数据丢失。也就是说需要一个系统崩溃后的恢复机制。只要涉及到类似的,先写内存然后异步刷磁盘的场景基本上都是需要这个机制的,这个机制就是 WAL(write-ahead log),实现方式就是将可能对当前系统内部的数据变更的操作(增删改)操作先顺序写入到磁盘上,在出现系统异常推出后,可以通过读取这个日志将将数据恢复到开始阶段。

不过存在下面的情况,WAL 成功,没有执行操作的时候宕机,只能恢复到上一次完成操作的 WAL 日志位置,这也就意味这实际使用过程中,必须要有一个 ID 来标识当前的操作序列,便于恢复的时候不会返回原本已经报错写入失败的数据被恢复的情况。

LevelDB 中的数据没有删除的概念,删除的时候只是在当前的 Key 中设置一个墓碑标识,表示当前的数据已经被删除了,然后在后续文件合并的时候才会真正删除数据。所以基本上只涉及到曾和改,但是在 KV 数据库中,改也是一种增加,也就是直接覆盖原始数据。

Mem_Table 和 Immutable_Table

从命名可以看出。immutable_table 是一个不可变的 table。LevelDB 的操作是先将数据写入到 MemTable 中,MemTable 的实现就是一个 SkipList。等到 MemTable 中的数据到达容量阈值,一般是 4MB,就将他转换成不可变的 immutable_table 即只读不写的 SkipList。后面直接将 immutable_table 数据刷入到磁盘上,就是 level0 的数据了。

SSTable

SSTable 就是一个按找字符排好序的文件,但是仅仅存储值也是不够的。比如最开始写入了 Key_0,然后系统一直运行,而且后续在也没有插入新的 Key 为 Key_0 的值了,那么内存中肯定是没有数据的,必然会触发到文件中去找的需求。这里就遇到元数据的组织问题,即如何知道当前文件是处于那一层的,当前文件中包含了哪些 Key 值,在前面的 MANIFEST 文件内容中提到过,每一时刻当前的数据库都有一个对应的 Version,记录了当前数据库中每一层有哪些文件。

SSTable 文件写入后是不可变的,所以在合并的时候也是新建一个文件,然后将需要合并的数据合并成一个新的文件,必要的时候可能会将他的层级往下。

因为涉及到合并,Version 管理,不一一详细介绍,会在后面的 Version 和 合并等操作中解析。

总结下就是 SSTable 的文件组织是放在 Version 中,也就是某一时刻,当前的数据库有哪些文件,删除哪些文件等等信息会放在 Version,每次操作都会写入 MANIFEST 文件中。

但是每层的数据大小是不一定的,比如 Level0 层的数据是直接写入的 Mem_table,也即是 4MB,但是 Level1 层的大小是 10MB,而且并不是说一个完整的 10MB 的文件,而是分成 2MB 的文件。除了 Level0 以外的数据都是组织成 2MB 的文件,而且每一层都是上一层的 10 倍大小。

由于文件都需要合并,比如说当前 Level0 层和 Level1 层都是满的而且里面的 Key 各不相同,在合并的过程中可能就是需要读取 10+16MB,然后在写入 10+16 MB 的数据。

Others

其实在读取的过程中还会涉及到一个 LRUCache,但是个人觉得和总的架构来说只是一个缓存的左右,用于加快查询,所以没有在图上展示出来。

总结

本文非常粗糙的介绍了下 LevelDB 的整体结构。

前言

在数据结构中,树一直被很多系统钟爱,如 mysql 的 innodb 使用的是 B+ 数,在 java 中 Map 的 hash 碰撞后,如果链表超过 8 会切换为红黑树。树结构的好处个人认为是在写入的时候对数据进行一个预处理,而且这个出力和子节点的数量相关,在写入的时候按照规则插入,能够在查询的时候有效的查询对应的子树,从而达到查询时间为 LogN 。如红黑树,红黑树的结构是异构化的 23 树,能够确保左右子树的高度差维持在 1,有效保证数据平衡,但是红黑树的实现比较复杂,插入过程涉及到树的重平衡。后来出现了一种链表的结构,称之为跳表即 SkipList,他是由 William Pugh 在Skip Lists: A Probabilistic Alternative to Balanced Trees提出的一种平衡性很好的数据结构,这种数据结构是使用空间换时间,实现较为简单,查询复杂度和树结构接近。在 redis 和 leveldb 中都有使用。本文会介绍下 leveldb 中的 SkipList 的实现。

SkipList 原理

在某种意义上,链表可以算得上是一种特殊的树,只有一个分支的树。一般情况下分为单向链表和双向链表。单向链表里面的节点会有个指针指向下一个节点,双向链表包含两个指针,一个指向指向他的节点(prev),一个指向下一个节点。SkipList 是单向链表,但是不止一个链表,而是使用 n 个链表分层,每一层中的 node 指向的下一个节点各不相同。后续的说明主要是针对上文提到的论文中的一个小摘抄:

论文中提到(第一节),二叉树在随机大小写入的情况下效率很高,但是如果是顺序大小写入则表现的比较一般,这个是因为二叉树尤其是平衡二叉树需要对数结构进行一个平衡引起的。

SkipList 是一种平衡树的概率性的替代品,这里概率性个人觉得主要是上面提到的随机大小数据写入过程平衡树效率比顺序写高,就是通过另外的方式实现了这种顺序写变为随机。论文中提到,250 个元素的 skiplist 需要查询查过 3 次的概率不到百万分之一。

论文中还提到,skiplist 在实现上比较容易,而且使用的是概率性的计算来保持数据的平衡,在空间上平均每个节点可能只需要 1+1/3 个指针。

SkipList 还有一个好处就对锁范围的控制上,在平衡二叉树或者红黑树,在并发操作的情况下,特定的时候如需要修改 Root 节点的属性,可能需要对整棵树都加锁。但是 SkipList 里面可以针对局部节点进行加锁,在并发情况下实现比较好。

image-20230831163355393

上图选自论文中的 figure1。

n 标识当前链表的长度,且链表是顺序存储的。

  1. 在初始阶段,当前的单向链表查询可能需要遍历 n
  2. 如果将可以记录每间隔个节点创建连接,则可以将查找的时间缩小为 n/2+1。按照最上层比较,可以找到当前查询值的前一个节点,所以是 n/2+1
  3. 还可以在 2 的基础上,每间隔一个继续创建一个连接,则时间缩小为 n/4+2。首先最上层找,此时是 n/4,然后在第二层,因为当前中间有 b 层,而且 b 层此时只有 1 个节点,当前值要么大于这个值要么小于,所以需要 2 次就可以找到当前值。
  4. 现在将具有 k 个指向其他指针的节点称之为 level K 节点,则说明第 2^i 节点后面有 2^i 个节点,则节点分布在一个比较均匀的数据中。level0 0 节点有 100%个节点,有 50% 的节点在 level1,有 25% 在 level2,以此类推。
  5. 但是如果我们随机选择 level1 层的节点,可能会出现上图 e 中情况,也就是高层分配不均匀,如 6 虽然在第四层,但是他却处于第二个位置。

可以看到,SkipList 的核心其实是这个层数的确定,如果是按照十分恰当的分配节点中的每一个节点的 level,他的时间复杂度就是 logN。

这个上升当前节点的 level 的时机是影响链表查询的关键。

SkipList 的操作

主要设计到增删查:

  1. Search 操作返回目标关键字所关联的 value,如果没有该关键字则返回 failure

  2. Insert 操作对目标关键字更新 value,如果目标关键字不存在,则将关键字插入

  3. Delete 操作删除目标关键字。

查询操作

查询操作是删除和插入的基础,因为每一层都是单向链表,所以必须要找到插入的父节点才行,如果当前节点的 level 较高,可能还需要将查询过程中的每一层的父节点都记录下来。

伪代码如下:(论文 figure2)

img

伪代码是搜索一个 searchKey 是否在 skiplist 中。具体步骤如下:

  1. 将 x 初始话为 skiplist 的头节点
  2. 进入第一次循环,循环是从当前最大的 level 到 level1
  3. 然后进入第二层循环,在当前层的 forword 对象中找到 key 值不小于当前 searchkey 的值
  4. 跳出 3 的循环后,此时已经找到本层不小于 searchkey 的值,继续往下一层找。如果找到了等于当前 searchkey 的值,则返回值存储的 value。
  5. 如果从 maxlevel 到 1 都没有这个值,返回一个 failture。

查询就是从最上层往下查询的过程。

插入和删除

在上文中,我们查询不到这个值就返回一个 failture,但是其实也可以返回当前找到的 level1 层的值,因为这个值是当前 skiplist 中不小于 searchKey 的值。如果说是插入,那么也应该是插入到这个值的后面。但是因为涉及到 level 的变化,所以查询过程中需要将当前查到的每一层不小于当前的值都查出来,然后随机一个当前值的 level,然后将存储的每一层 level 都和当前 level 进行一个绑定。删除也类似。

插入和删除图形化为:

img

后文根据论文中的伪代码介绍下操作

插入

插入的伪代码如下:论文 figure4

image-20230901144301435

  1. 定义一个变量 update,用来存储上文提到的每一层中不小于当前值的 Node。

  2. 查找当前的 key 和将每一层比较的结果赋值到 update 中。

  3. 如果查到当前值了,那么 put 就变成了 update,直接将 skiplist 中的对应 key 的 value 修改为当前的 newvalue。

  4. 如果没有找到,也就是上文提到的,首先随机产生一个当前的 level。如果当前的 level 超过 skiplist 当前最大的 level,那么需要将当前的 level 设置为新生成的 level,然后将 head 放到每一层的头节点。

  5. 将插入的数据变成 Node,Node 包含了当前 key,value 和 level 信息,当然内部还有个 next 信息

  6. 从第 1 层往上,将当前的 x 插入到每一层。

这里有个问题就是对 level 的管理,因为不可能随便随机一个数出来,最好的是希望设置一个 maxlevel,然后作为 seed,在 maxLevel 下面随机产生层数。

随机算法

在论文中提到过,skiplist 本身是个概率性的算法,个人觉得概率性其实就是指的层数的选择。他想要达到的效果是在上升中,某一层出现的概率是大致一定的。也就是说如果第 i 层的元素能够按照某个固定的概率 p(上文使用的是 1/2)出现在在 i+1 层,这里面涉及到概率论里面期望运算,就不再赘述了(我已经还给老师了)。一句话说就是如过选择的是 p=1/2,那么我们就希望如果 n 等于 16,那么第 0 层是 16,第二次是 8,第三层是 4。说白了就是返回的值出现的概率是一样的,比如 返回 1 的概率就是 1/2,返回 2 的概率就是 1/4。

1
2
3
4
5
6
7
8
9
private static final double PROBABILITY = 0.5;
private int randomLevel() {
int level = 1;
Random random = new Random();
while (random.nextDouble() < PROBABILITY && level < MAX_LEVEL) {
level++;
}
return level;
}

使用这种方式可以生成,主要是因为 random.nextDouble() 出现的数是比较均匀的分布,就是出现的概率是相对一定的,然后只要小于 0.5 就多一层,也就是上面的 1/2 的概率,继续增加 level,则就是 1/2 *1/2 的概率了。

删除

删除也是在查询基础上做的,伪代码如下:

img

首先仍然是记录下查询过程的每一层的上一个节点,如果找到这个值对应的 Node,就将 update 中存储的值都进行一个链表删除操作,然后释放内存。最后,还需要检查当前是否删除了最高的 level 中的最后一个 node,如果是还需要将当前的 level 减 1.

自己写个 SkipList

使用 java 自己写了个 skiplist,其中数据使用 DataNode,索引使用 IndexNode,避免值的无效覆盖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import java.util.LinkedList;
import java.util.Random;

public class SkipList<K extends Comparable<K>, V> {
// 当前最大的层数
private int maxLevel = 12;
// 头结点,down 使用down指向下一层
private SkipListIndexNode<K, V> head;
// 当前的level
private int level = 1;
private static Random random;
//跳到下一层的概率
private double probability = 0.5;

public SkipList(int maxLevel, double probability) {
random = new Random();
this.maxLevel = maxLevel;
this.probability = probability;
SkipListDataNode data = new SkipListDataNode(Integer.MIN_VALUE, null);
head = new SkipListIndexNode(data, 1);
}


public V get(K key) {
SkipListDataNode<K, V> old = searchNode(key);
if (old != null) {
return old.value;
} else {
return null;
}
}

public K getMax() {
SkipListIndexNode<K, V> cur = head;
while (cur.down != null) {
while (cur.right != null) {
cur = cur.right;
}
if (cur.down == null && cur.right == null) {
break;
}
cur = cur.down;
}
return cur.getKey();
}

public K getMin() {
SkipListIndexNode<K, V> cur = head;
while (cur.down != null) {
cur = cur.down;
}
return cur.right == null ? null : cur.right.getKey();
}

public void put(K key, V value) {
SkipListDataNode old = searchNode(key);
if (old != null) {
old.value = value;
return;
}
int newLevel = randomLevel();
if(newLevel>maxLevel){
newLevel=maxLevel;
}
if (newLevel > level ) {
for (int i = level; i < newLevel; i++) {
genNewHead();
}
}
SkipListDataNode<K, V> data = new SkipListDataNode<K, V>(key, value);
SkipListIndexNode<K, V> indexNode = new SkipListIndexNode<K, V>(data, newLevel);
LinkedList<SkipListIndexNode> update = new LinkedList<SkipListIndexNode>();
SkipListIndexNode<K, V> cur = head;
while (cur != null && cur.level > newLevel) {
cur = cur.down;
}
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
update.add(cur);
cur = cur.down;
}
SkipListIndexNode<K, V> bottom = null;
while (!update.isEmpty()) {
SkipListIndexNode prevNode = update.pollLast();
SkipListIndexNode curLevelIndex = indexNode.genIndexNodeByLevel(prevNode.level);
curLevelIndex.right = prevNode.right;
prevNode.right = curLevelIndex;
curLevelIndex.down = bottom;
bottom = curLevelIndex;
}
}


public void delete(K key) {
LinkedList<SkipListIndexNode> update = new LinkedList<SkipListIndexNode>();
SkipListIndexNode<K, V> cur = head;
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
update.add(cur);
cur = cur.down;
}

while (!update.isEmpty()) {
SkipListIndexNode skipListIndexNode = update.pollLast();
if (skipListIndexNode.right != null && skipListIndexNode.right.getKey().compareTo(key) == 0) {
skipListIndexNode.right = skipListIndexNode.right.right;
}
}
while (head.right == null) {
level--;
head = head.down;
}
}


public void printList() {
SkipListIndexNode bottom = head;
LinkedList<SkipListIndexNode> stack = new LinkedList<SkipListIndexNode>();
while (bottom.down != null) {
bottom = bottom.down;
}
SkipListIndexNode printLevel = head;
while (printLevel != null) {
SkipListIndexNode printLeveltail = printLevel;
System.out.printf("%-5s", "head->");
SkipListIndexNode bottomTail = bottom;
while (printLeveltail != null && bottomTail != null) {
if (printLeveltail.right != null && printLeveltail.right.getKey().compareTo(bottomTail.right.getKey()) == 0) {
System.out.printf("%-5s", printLeveltail.right.getKey() + "->");
printLeveltail = printLeveltail.right;
} else {
System.out.printf("%-5s", "");
}
bottomTail = bottomTail.right;

}
printLevel = printLevel.down;
System.out.println();
}

}

private SkipListDataNode searchNode(K key) {
if (key == null) {
return null;
}
SkipListIndexNode<K, V> cur = head;
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
if (cur.right != null && cur.right.getKey().compareTo(key) == 0) {
return cur.right.dataNode;
}
cur = cur.down;
}
return null;
}

private void genNewHead() {
SkipListIndexNode<K, V> skipListIndexNode = new SkipListIndexNode<K, V>(null, ++level);
skipListIndexNode.down = head;
head = skipListIndexNode;

}


private int randomLevel() {
int level = 1;
while (random.nextDouble() < probability && level < maxLevel) {
level++;
}
return level;
}


public static void main(String[] args) {
SkipList<Integer, Integer> skipList = new SkipList<Integer, Integer>(12, 0.5);
for (int i = 1; i <= 200; i++) {
skipList.put(new Random().nextInt(200), 100 + i);
}
skipList.printList();
;
System.out.println(skipList.get(10));
System.out.println(skipList.get(11));
System.out.println(skipList.get(12));
System.out.println(skipList.getMin());
System.out.println(skipList.getMax());
for (int i = 0; i <= 20; i += 2) {
skipList.delete(i);
}
System.out.println(skipList.get(10));
System.out.println(skipList.get(11));
System.out.println(skipList.get(12));
System.out.println(skipList.getMin());
System.out.println(skipList.getMax());

}

class SkipListDataNode<K extends Comparable<K>, V> {
private K key;
private V value;

SkipListDataNode(K key, V value) {
this.key = key;
this.value = value;
}

}

class SkipListIndexNode<K extends Comparable<K>, V> {
SkipListDataNode<K, V> dataNode;
int level = 0;
SkipListIndexNode<K, V> right;
SkipListIndexNode<K, V> down;

public SkipListIndexNode(SkipListDataNode<K, V> dataNode, int level) {
this.dataNode = dataNode;
this.level = level;
}

public SkipListIndexNode genIndexNodeByLevel(int level) {
return new SkipListIndexNode(this.dataNode, level);
}

public V getValue() {
return dataNode.value;
}

public K getKey() {
return dataNode.key;
}
}
}

LevelDB 的 SkipList 实现

在上面已经较为详细的介绍了 SkipList,也使用 java 做了一个简单的实现。下面来看下 LevelDB 中的 SkipList 的实现。

在几乎所有的 SkipList 的方法上面都有一个 template 。这个的作用有点类似于 java 的泛型,只不过 java 是运行时而 C++ 是编译阶段。

template <typename Key, class Comparator> 值得是一个 key,这个 key 和 Comparator 是一个占位符,用来表示具体传入的数据类型,后面的 Comparator,则说明需要传入一个实现了 Comparator 的接口或者类的类型。

random 方法

首先来看下 random 的方法使用的位置:

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1; // 初始level是1
while (height < kMaxHeight && rnd_.OneIn(kBranching)) {// 根据概率,默认1/4
height++;
}
assert(height > 0);
assert(height <= kMaxHeight); // 当前的level是不是小于kMaxHeight(默认12)
return height;
}

其中最要的是这个rnd_.OneIn 方法,这个方法就是生成均匀分布的随机数。

现在的随机算法都是伪随机,算法为线性同余生成器,算法表示为:

1
Xn+1 = (a * Xn + c) % m

其中,Xn 是当前生成的伪随机数,Xn+1 是下一个生成的伪随机数,a 是乘法因子,c 是增量,m 是模数。通常,a16807c0m2147483647,这是因为这组参数在计算中具有良好的性质,可以生成均匀分布的伪随机数。

在 leveldb 中就使用到了上面提到的两个魔法数,具体是现在在util/random.h,因为是一个整体,所以全部复制出来,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Random {
private:
uint32_t seed_;

public:
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { //0x7fffffff u表示无符号 2^31-1 二进制 111 1111 1111 1111 1111 1111 1111 1111 将第一位取0 ,因为无符号转化为有符号的时候头部位1 表示为负数
// 所以此处的主要目的是确保传入的s为正数,默认传入的seed初始值为0xdeadbeef 二进制为1101 1110 1010 1101 1011 1110 1110 1111
// Avoid bad seeds.
if (seed_ == 0 || seed_ == 2147483647L) {
seed_ = 1;
}
}
uint32_t Next() {
static const uint32_t M = 2147483647L; // 2^31-1
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0 //无符号64位二进制0100 0001 1010 0111
// We are computing
// seed_ = (seed_ * A) % M, where M = 2^31-1
//
// seed_ must not be zero or M, or else all subsequent computed values
// will be zero or M respectively. For all other values, seed_ will end
// up cycling through every number in [1,M-1]
uint64_t product = seed_ * A; //26696993619177 二进制 High[000000 00000 0000 0000 1100 0010 0011 1] low [111 00000 1101 0010 0011 1100 1110 1001]
//这里是计算Mod算法的一个优化,一个64为的数,可以分为高33 位和低31 位的数
// Compute (product % M) using the fact that ((x << 31) % M) == x. product=high<<31+low 又因为product=seed_*A,所以此时product=(high*M+high+low)%M 其中 M = 2^31-1
// 因为seed_ 和A 中,seed_的值在初始化的时候就让他小于2^31-1,A 是固定的16807,所以这两个值都不会大于M的值,所以取余最后的结果就等(high+low)%M=high+low,所以下面的这个计算是获取high和low的值相加,也就得到了seed_
// 但是有一种情况就是product的low为刚好但是 2^31-1,这个时候product=(high*M+high+1*M)%M=high ,但是使用下面的结果会是high+M,因为M&M=M。所以,需要判断是否seed_ 比M大,然后前去M,直接使用high,也确保了seed的值一直小于M
// 最后强制转换为32位的值
seed_ = static_cast<uint32_t>((product >> 31) + (product & M)); // High+low [000000 00000 0000 0000 1100 0010 0011 1]+[111 00000 1101 0010 0011 1100 1110 1001] & [111 1111 1111 1111 1111 1111 1111 1111]
// The first reduction may overflow by 1 bit, so we may need to
// repeat. mod == M is not possible; using > allows the faster
// sign-bit-based test.
if (seed_ > M) {
seed_ -= M;
}
return seed_;
}
// 在[0,n-1]之间返回随机数
// Returns a uniformly distributed value in the range [0..n-1]
// REQUIRES: n > 0
uint32_t Uniform(int n) { return Next() % n; }
// n分之一的概率返回true
// Randomly returns true ~"1/n" of the time, and false otherwise.
// REQUIRES: n > 0
bool OneIn(int n) { return (Next() % n) == 0; }
// //首先求得[0,max_log]的一个随机数,然后求得[0,2^maxlog-1]的一个随机数
// Skewed: pick "base" uniformly from range [0,max_log] and then
// return "base" random bits. The effect is to pick a number in the
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }
};
}

上面的方法就随机数的 seed 计算比较上头。因为第一个种子确定后,后面的种子都相对确定了,而且这个出现的概率比较均匀,所以也就是 OneIn 能够保证出现概率接近的原因。

数据结构

leveldb 中的 SkipList 具体实现在db/skiplist.h中。

私有域:

  1. struct Node 用来存储数据,和上面 Java 实现的不同,Node 中 key 的值既包含了需要查询的 key,也包含具体的 Value,这个和数据存储格式相关系
  2. enum { kMaxHeight = 12 }; 允许最大的层数是 12 层
  3. compare_ 用来做 key 值比较的比较器
  4. arena_ 为每个 Node 分配内存
  5. head_ 头结点指针
  6. std::atomic maxheight; 原子递增的最大层数
  7. 一个上文提到的随机数生成的对象

explicit 关键字一般用来修饰类的构造函数,作用是告诉编译器按照实际的类型来构造函数,不允许做隐式转换。

公有域:

  1. 一个不允许隐式转换的构造器

  2. void Insert(const Key& key); // 插入方法

  3. bool Contains(const Key& key) const; 判断是否存在

  4. Iterator 一个迭代器实现,迭代器中包含了多种查询方式,包括查找当前的 key 等方法(后文详细介绍)。其中也包含了私有域,指向当前的 SkipList 和一个用来遍历的 Node。

Node 结构体包含了一个 Key,这个就是前面提到的template <typename Key, class Comparator>中的这个 Key,还包含了一个数组

std::atomic<Node*> next_[1];

这个是一种柔性的数组。也就是大小是可变的,next 数组是一个指针数组,也就是上文中 java 代码中的 IndexNode,本身不存储 key Value,而是作为索引存储。这个 next 表示当前层数中指向的下一个节点。

一个 SkipList 初始化过程中需要传入的参数就是比较器 comparator* 和内存分配器 arena*的指针。

查询

和上面的实现不同的是,leveldb 的查询是放在 Iterator 的 seek 方法中的

1
2
3
4
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}

最后调用的是 SkipList 本身的 FindGreaterOrEqual 方法,也就是要么找到当前值,要么就找到比当前值大的那个值。该方法包含了两个参数,第一个是需要查询的值,第二个是用来存储当前查询 level 的前序节点,也就是 update 数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_; // x为后续查询使用的临时数据,此时x在最上层的头结点
int level = GetMaxHeight() - 1; //获取当前最大的层数,从0开始,所以减去1
while (true) {
Node* next = x->Next(level); // next数组就是每一层链表的数据,从head开始
if (KeyIsAfterNode(key, next)) { //找到当前的查询的值是否在x后面,如果是,则说明当前的值的后一个值小于查询的值
// Keep searching in this list
x = next; //往右查询。
} else { // 如果x的后面的值大于查询的key,则判断是是否需要记录层数,get中prev是nullptr,所以不需要存储,在insert的时候就需要记录下来
if (prev != nullptr) prev[level] = x;
if (level == 0) { // 如果当前已经查询到最后一层,则返回这个next,此时next因为前面的KeyIsAfterNode 返回的是false,所以要么当前的值的右边为nullptr要么这个值就比key大,而且是紧挨的着的那个大于这个key的值。仅仅在第0层是这个结果
return next;
} else {
// Switch to next list
level--;
}
}
}
}

本身实现上和上文的没有区别,首先从最高层往下便利,其中 Next 方法就是获取到下一层的节点。这个方法还有一个实现就是NoBarrier_Next,两者的区别就是是否允许重排,在 java 中,指令重排是使用 valite 关键字关闭,c++中用下面的方式:(直接使用 leveldb 的代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}

// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

其中的 memory_order 就是决定当前的指令是否重排:

  1. std::memory_order_acquire:这是一种内存顺序语义,用于 load 操作。它确保在加载 next_[n] 指针之前,所有之前的内存操作(写入和读取)都不会被重排序到加载之后。在多线程环境中,这可以确保读取到 next_[n] 的值是最新的。
  2. std::memory_order_release:这是一种内存顺序语义,用于 store 操作。它确保在存储 xnext_[n] 指针之后,所有之后的内存操作(写入和读取)都不会被重排序到存储之前。在多线程环境中,这可以确保 x 被完全初始化之后再被其他线程访问。
  3. std::memory_order_relaxed:这是一种较弱的内存顺序语义,通常用于 loadstore 操作。它允许更大的重排序自由度,不保证像 acquirerelease 那样的强制顺序。通常在不需要严格的顺序控制时使用,以提高性能。

所以 setNext 值得是当前的获取的 n 是否被后续访问的线程可见,避免了出现多次创建甚至覆盖的问题,但是后面的两个则没有这个限制。

来看个例子:

当前如果有:

1
2
3
4
5
head2--->3->

head1--->3-> 5-> 9->

head0--->3->4->5->7->9->

如果有上文中的 3 层,现在需要查询 8

  1. 首先 x 被复制为 head2 进入循环,然后 x 获取 head 的链表也就是 next 中的数据,此时为 3
  2. 当前查询的 8 大于 3 ,所以将 3 位置 Node 赋值给 x,进入到下次循环
  3. 继续获取 3 后面的值,但是此时 3 后面的值为空,所以 KeyIsAfterNode 返回为 false,进入到 else,判断 prev 是否存储,判断是否已经到了第 0 层,此时否为 false,所以将 level–,找下一层
  4. x 此时获取的是 next 中的第 2 层,也就是 head1 后面的 3,因为 3 小于 8,所以找到了 5,所以将 5 赋值给 x,继续循环
  5. 此时 x 为 5,里面的 next 指向的是 9,也就是 next 此时为 9,9 大于 8。所以不在赋值给 x。而是进入下一层
  6. 进入到 head0,此时 x 为 5,next 为 7,然后 7 小于 8,进入下次循环
  7. 此时 x 为 7,但是 next 为 9,因为是 0 层了,所以直接返回 next

也就是说这个方法返回的值和他的名字一样,和当前值相等或者比当前值大的第一个数。在 Iterator 中 seek 的值会存储在本身的 node_ 属性中,需要进一步进行判断。如果这个值为 nullptr,则 Iterator 中的 Valid 为 false。

插入

查询也使用到了上面的 FindGreaterOrEqual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}

插入过程中,首先是找到已经存在的 key,或者这个 key 的后缀节点,需要注意的是这个 key 再 leveldb 中时不允许在一个一模一样的 key 已经存在然后继续插入的。leveldb 中的每次插入都包含了一个序列,所以这个 key 一般不会一样,我能想到的完全一样的情况感觉就是 sequence 的值获取是出现问题,导致一个 key 拥有了相同的 sequence,这个后面再讨论。

然后就是获取到一个随机的高度,如果当前的高度超过了现在最大的高度(random 不会返回超过 12 的高度),就需要将在获取 key 的时候存储的 prev 节点添加上新的 head 节点。然后更新高度。将当前的 key 和 height 封装为一个新的 Node,主要是开辟内存空间。然后就是填充各个层的数据

删除

leveldb 本身的删除就是一个添加墓碑标记的删除,后续文件合并才会真正的删除,所以 Skiplist 中也没有删除。

迭代方式

SkipList 的迭代器中的方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool Valid() const; // 查询的结果是否是nullptr
// Advances to the next position.
// REQUIRES: Valid()
void Next(); // 找到查询值的后续节点

// Advances to the previous position.
// REQUIRES: Valid()
void Prev();// 查询值的前序节点

// Advance to the first entry with a key >= target
void Seek(const Key& target); // 获取到key>=target的值

// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst(); // 最小值

// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast(); // 最大值

其中 Valid 、 Next 和 Seek 已经大概说过了。主要就是一个 Prev,SeekToFirst 和 SeekToLast。

SeekToLast 的方法比较简单,就是从 level 为 0 一直遍历到最最后面。SeekToFirst 实现就是找到第一个,也就是获取 level 0 的第一个值。所以这个两个值可以算得上是获取最大最小值。

Prev 则是找到当前查询值小的最后的一个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}

核心实现就是上面的 FindLessThan,可以看到和前面的 FindGreaterOrEqual 类似,FindGreaterOrEqual 已经很详细的说过,这里就不赘述了。

总结

leveldb 的 skiplist 的实现是比较经典的实现。只是里面的获取为什么需要通过 Iterator,而不是直接提供一个 get 方法我不是很理解。

前言

LevelDB中出现最多的对象就是Slice了,他算的上是一个轻量级的string 对象,里面只有两个成员变量:

1
2
const char* data_;
size_t size_;
  1. 在Slice 中看到一个c++的operator的关键字,这个关键字的作用就是重载运算。比如重载两个数据中的+
1
2
3
4
5
6
7
8
9
10
class MyClass {
public:
int value;
MyClass(int val) : value(val) {}
// Overloading + operator
MyClass operator+(const MyClass& other) {
MyClass result(value + other.value);
return result;
}
};

如果有MyClass + MyClass的操作,就会执行重载里面的方法。如果是下面那种=的重载,则相当于重载了赋值= 的含义,后面使用default,则表示使用默认的复制函数

  1. const 在后面还有const修饰符,成员函数如果被声明为 const,表示这个成员函数是一个不会修改对象状态的函数。也就是说,它不会修改调用该函数的对象的任何成员变量。感觉是不可变量的一样。

他的构造函数为:

1
2
3
4
5
6
7
8
9
10
 Slice() : data_(""), size_(0) {} //空slice,
// Create a slice that refers to d[0,n-1].
Slice(const char* d, size_t n) : data_(d), size_(n) {} // 在d指向的char数组中0到n-1的char的值
// Create a slice that refers to the contents of "s"
Slice(const std::string& s) : data_(s.data()), size_(s.size()) {} // 传入一个string的地址
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) {} //自传入char的指针
// Intentionally copyable.
Slice(const Slice&) = default;
Slice& operator=(const Slice&) = default; // 这种写法就是重载了= 的运算符

可以看到,他接受的最后基本上都是一个char的指针,以及当前在char指针中存储的数据length,需要获取数据,就直接去对应的char数组里面获取。本身没有创建内存或者释放内存,也就是说只是做为一个char数组的标识一样,本身不会产生内存和数据。

他的成员函数包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Return a pointer to the beginning of the referenced data
const char* data() const { return data_; } // 返回当前data的指针
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; } // 当前slice表示的数组长度
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; } // 判断是否为空
// Return the ith byte in the referenced data.
// REQUIRES: n < size()
char operator[](size_t n) const { // 重载[] 运算符,相当于可以让当前的值变成数组?
assert(n < size());
return data_[n];
}
// Change this slice to refer to an empty array
void clear() { // 清理当前的数据
data_ = "";
size_ = 0;
}
// Drop the first "n" bytes from this slice.
void remove_prefix(size_t n) { // char* 后面的数组内存是连续的。在C++中,数组是一块连续的内存区域,而 char* 是一个指向字符(char)的指针,它可以指向数组的起始位置.所以可以直接将指针往后移,将初始位往后移动
assert(n <= size());
data_ += n;
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); } // 转为string
// Three-way comparison. Returns value:
// < 0 iff "*this" < "b",
// == 0 iff "*this" == "b",
// > 0 iff "*this" > "b"
int compare(const Slice& b) const; // 和b进行比较
// Return true iff "x" is a prefix of "*this"
bool starts_with(const Slice& x) const { // 前缀判断
return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
}

slice的方法除了remove_prefix和clear以外都是非静态的即对Slice本身不做改变,所以在Slice.h中作者写道

1
2
3
4
// Multiple threads can invoke const methods on a Slice without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Slice must use
// external synchronization.

也就是说只要没有调用非const方法,那么Slice 就是线程安全的。

Slice还有3个内联函数

  1. 内联方法是会将这个方法在编译阶段直接展开的方法,而不是需要方法调用

  2. memcmp 是 C/C++ 标准库中的一个函数,用于比较两块内存区域的内容是否相等。它的声明如下:

    int memcmp(const void* ptr1, const void* ptr2, size_t num);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
inline bool operator==(const Slice& x, const Slice& y) { // 重载== 方法
return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0));
}

inline bool operator!=(const Slice& x, const Slice& y) { return !(x == y); } // 重载 !=

inline int Slice::compare(const Slice& b) const // 和b进行比较,比较两者长度相同位置的char 是否相等
const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
int r = memcmp(data_, b.data_, min_len);
if (r == 0) {
if (size_ < b.size_)
r = -1;
else if (size_ > b.size_)
r = +1;
}
return r;
}

why

为什么不使用string对象,而是使用Slice 作为char数组呢?在代码中的doc/index.md 中说明了原因:

Returning a Slice is a cheaper alternative to returning a std::string since we do not need to copy potentially large keys and values. In addition, leveldb methods do not return null-terminated C-style strings since leveldb keys and values are allowed to contain '\0' bytes.

  1. Slice 本身是不包含内存管理和分配的,而是直接应用外部的char数组指针,这样在操作的时候就避免了数据的复制和拷贝。主要是因为存储格式是按照length value的形势紧凑的存储的,如果key或者value的值较大,可能就是会从原始数据中截断,然后复制到string中,添加了额外的数据复制。

  2. 使用Slice 还有一个好处就是上文一直提到的,它本身不管理内存,只是对引用的数据,优化了内存的使用。

  3. 数据更加灵活,因为string本身是按照‘\0’作为结束符,但是在LevelDB中可能存在空的字节,这个是因为sequence等高位为空的情况,所以不太适合于string。PS:在debug的过程中,尤其是debug到skiplist插入的时候,就可能存在只能在clion中看到key,但是无法看到value的情况。就是因为string已经被截断了。

  4. 和string的切换十分方便,因为构造函数和tostring其实都是将size和data中的数据进行一个转换,能够灵活转换,而且更加灵活记录数据。

后记

Slice 其实就是一个仅记录char 数组头部指针和长度的数据类型,本身可以说不存储数据,个人感觉就是个start,end的结构来记录char 数组中的数据,然后就是提供了需要使用的比较和赋值等数据的方法。因为没有内存,但是可以灵活操作数组中的数据,这里的操作大部分都是线程安全的如比较或者获取的操作。写的操作较少。

前言

Leveldb 作为一个 kv 数据库,使用的 lsm-tree,一句话解释 lsm-tree 就是将数据按照字符排序。首先将数据写入内存,然后将数据刷入磁盘,定时合并文件。在每个文件头部记录下当前最小最大的 key,然后使用 bloom 记录是否存在 key,查询的时候根据二分查找等方式进行数据查找。
数据是紧凑的写在内存或磁盘上的,所以每条数据都会记录当前值的长度和具体的值形如:[key_length][key][value_length][value],如 key 为 key_123,value 为 value_123,记录方式就是[7][key_123][9][value_123]。一个 32 位整型的空间一般是 4 个字节,Leveldb 就是觉得这个 4 个字节实在是太多了,如上文,当前的 key 是 7,其实只需要一个字节就可以表示,所以在这个上面 Leveldb 使用了 Varint。具体在leveldb/util/coding.ccleveldb/util/coding.h中实现,本文就是对其中的实现做一个简单的介绍和分析。

具体实现

Leveldb 中上文提到的两个源码文件中,包含了两种,一种是 Varint 和 Fixed,包含了 32 位和 64 位,也就是 32 位整型和 64 位整型 的值。核心都是使用 char 数组来表示 32 位整型和 64 位整型。

编码具体实现

下面是编码方法列表

1
2
3
4
5
char* EncodeVarint32(char* dst, uint32_t value);
char* EncodeVarint64(char* dst, uint64_t value);
// 在coding.h 文件中直接实现的内联函数
inline void EncodeFixed32(char* dst, uint32_t value);
inline void EncodeFixed64(char* dst, uint64_t value);

为了将 32 位整型或者 64 位整型 使用 char 数据记录,肯定需要对 int 的 4 位字节分别存储,也就是每次移动 8 位,为了标识当前字节处于整个字节的位置,还需要预留一个标识符,所以每一个 char 最多有 7 位标识一个 int,2 的 7 次方,也就是一个 char 最多标识 128 的数据。也就是说如果超过了 2 的 28 次方,可能需要额外的字节来标识。也就是说 int 值最多需要 5 个字节来表示。对应的 64 位整型类型则需要使用 64/7 +1 也就是 10 个字节来标识。

32 位整型

下面的代码核心其实就是分为 5 个字节如何存储,每一个 if 就是一个多出一个字节。

*(ptr++) = v;的写法结果:

  1. ptr 指向的地址获取一个指向的值。
  2. 将变量 v 的值赋给该地址处。
  3. ptr 的值增加 1,使其指向下一个地址。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
uint8_t* ptr = reinterpret_cast<uint8_t*>(dst);// 将当前指针变成uint8_t类型的指针
static const int B = 128; // 2的 七次方,确1000 0000
if (v < (1 << 7)) {
*(ptr++) = v; // 如果当前的值小于2^7,则直接将值写入指针,最大标识0111 1111 即127
} else if (v < (1 << 14)) {
*(ptr++) = v | B; // 如果值大于2^7,小于2^14,则首先小后七位写入第一个字节,也就是说是小端存储,此时该字节首位是1,因为B 1000 0000 使用或的关系只是将后七位和原值相等,首位赋值1
*(ptr++) = v >> 7; // 将剩下的值写入,此时首位为0
} else if (v < (1 << 21)) { // 大于2^14小于2^21,和上文类似
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = v >> 14;
} else if (v < (1 << 28)) {
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = v >> 21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = (v >> 21) | B;
*(ptr++) = v >> 28;
}
return reinterpret_cast<char*>(ptr); // 将值转换会原来的位置
}

举例说明下面的各个字节标识:

原始值 32 位二进制 转换后
1 00000000 00000000 00000000 00000001 00000001
129 00000000 00000000 00000000 10000001 10000001 00000001
65537 00000000 00000001 00000000 00000001 10000001 10000000 00000100

上面分别为 1,2^7+1 ,2^16+1,也就是分别需要 1 字节,2 字节和 3 字节的数据存储。所以 varint32 的值有以下特征:

  • 小端存储
  • 如果当前 char 的最高位是 1 ,则说明当前的值没有结束,一直到最高位为 0 的 char
64 位整形

64 位和 32 位基本上没有区别:

1
2
3
4
5
6
7
8
9
10
char* EncodeVarint64(char* dst, uint64_t v) {
static const int B = 128;
uint8_t* ptr = reinterpret_cast<uint8_t*>(dst);
while (v >= B) {
*(ptr++) = v | B;
v >>= 7;
}
*(ptr++) = static_cast<uint8_t>(v);
return reinterpret_cast<char*>(ptr);
}

实现上只是使用循环的方式,其他的都是一样的。这里就不赘述了。

解码具体实现

相对编码,解码里面提供了较为多方法,包含了和类外一个 LevelDB 比较重要的对象 Slice 的交互,这里先不看这个 Slice 的交互,只看解码的部分。

下面是我觉得两个比较核心的方法:

1
2
3
4
5
6
7
// Pointer-based variants of GetVarint...  These either store a value
// in *v and return a pointer just past the parsed value, or return
// nullptr on error. These routines only look at bytes in the range
// [p..limit-1]
// 传入的待解码char开始指针p,待解码char结束指针,最后值写入的指针v。看方法参数可以看到,一般情况下都是在某个char* 类型的数据上进行顺序读取来获取数据,也就是p 和limit 应该是属于一个char* 的不同位置的指针,他的返回值是指向当前值结尾指针的下一个指针。如果出错,则返回nullptr
const char* GetVarint32Ptr(const char* p, const char* limit, uint32_t* v);
const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* v);

在实现上分为两种情况:

  1. 当前的值小于 128,即传入的 p 的第一个字节为 0,也就是上文编码中说的,如果首位为 0 ,则说明后续都没有数据了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    inline const char* GetVarint32Ptr(const char* p, const char* limit,
    uint32_t* value) {
    if (p < limit) { //确保p是在limit 前面的
    uint32_t result = *(reinterpret_cast<const uint8_t*>(p));
    if ((result & 128) == 0) { // p的第一个字节为0,说明当前值7位就可以标识,字节返回。
    *value = result;
    return p + 1; // 返回当前解码后值的后一个字节
    }
    }
    return GetVarint32PtrFallback(p, limit, value);
    }

如果超过 7 位,则进入到 GetVarint32PtrFallback 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const char* GetVarint32PtrFallback(const char* p, const char* limit,
uint32_t* value) {
uint32_t result = 0;
// 最多执行5次
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
uint32_t byte = *(reinterpret_cast<const uint8_t*>(p));
p++; // 移动p的指针
if (byte & 128) { // 如果当前char的首位是1,说明后续还有值
// More bytes are present
// 取当前值的后7位,并且移动shift
result |= ((byte & 127) << shift);
} else {
// 当前已经到尾部字节,将值写入即可
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
// 出现如limit>p这种情况,字节返回nullptr
return nullptr;
}

64 位的解码基本上和 32 位一样,只是移动次数变成了 10 次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) {
uint64_t result = 0;
for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) {
uint64_t byte = *(reinterpret_cast<const uint8_t*>(p));
p++;
if (byte & 128) {
// More bytes are present
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return nullptr;
}

总结

leveldb 针对 32 位和 64 位的整形进行了优化,能够节约空间。个人觉得这么做的主要目的是因为我们的 key 或者 value 的值长度一般不会很大,很少人会使用 2^32 个字节来作为 key 存储。所以这么做积少成多,确实能够节约不少的空间。使用 0,1 判断是否到数据尾部,确实很秀。

前言

leveldb 是google的大佬写的一个嵌入式的KV数据库。底层是使用的LSM-Tree 索引结构。本文主要涉及levelDB的数据写入,也就是Put 操作。我是个搞java的,C或者C++是不咋会,所以边看边学,可能比较啰嗦。

Debug环境搭建

首先拉代码:

git clone https://github.com/google/leveldb.git

因为源码中的third_party 包含了两个,一个是google的test,还有一个是benchmark。需要拉下来,放到third_party的目录,或者直接在目录里pull

git clone https://github.com/google/benchmark.git
git clone https://github.com/google/googletest.git

按照文档中,需要

mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake –build .

但是因为我的目的是在clion debug。所以暂时没有做这一步,而是在源码中新增了一个app目录,下面放我的demo代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <cassert>
#include <iostream>
#include "include/leveldb/db.h"
int main() {
leveldb::DB *db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/home/tuitaking/source/leveldb/file", &db);
assert(status.ok());
std::cout << "leveldb open success!" << std::endl;
std::string value;
std::string key1 = "testkey1";
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.IsNotFound()) {
std::cout << "can not found for key:" << key1 << std::endl;
db->Put(leveldb::WriteOptions(), key1, "testvalue1");
}
s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
std::cout << "found key:" << key1 << ",value:" << value << std::endl;
}
s = db->Delete(leveldb::WriteOptions(), key1);
if (s.ok()) {
std::cout << "delete key success which key:" << key1 << std::endl;
}
s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.IsNotFound()) {
std::cout << "can not found after delete for key:" << key1 << std::endl;
}
delete db;
return 0;
}

然后在CMakeLists.txt 中新增上这个模块

1
2
3
4
5
6
add_library(leveldb "" )
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"app/main.cpp" # 加这个位置
"db/builder.cc"

然后将test 也加入

1
2
leveldb_test("db/c_test.c")
leveldb_test("app/main.cpp") # 加的这行

然后就可以在clion中看到一个叫main的test,直接可以debug。至于为什么估计需要看cmake 相关的知识,暂时不去管他,总之这样就可以直接debug了。如果是windows,cmake的环境只要做好了,其他的都是类似。在clion中可以直接debug。

数据的写入

本文暂时不去处理DB的初始化,而是 看PUT操作。

1
db->Put(leveldb::WriteOptions(), key1, "testvalue1");

其中,writeOptions 标识此时是一个写入操作,写入的key就是key1,值就是testvalue1。写入的options 参数只有一个,就是是否sync。也就是写入pagecache 就返回还是刷到磁盘才返回,后者肯定速度慢一点。

1
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) 

put 的入参为WriteOptions,Slice ,可以看到我们传入的string在这里变成了slice,感觉是c++ 的一个优化还是啥,因为Slice 中有 Slice(const std::string& s) 这样的构造函数,回过来,Slice 可以看做一个变长的而且可以做比较的字符串,使用的是一个c const char* data_; size_t size_;来表示的。然后put操作会将当前传入的值变成一个WriteBatch

WriteBatch 包含了两个成员变量和六个函数:

  • friend class WriteBatchInternal;

    • friend 是一个C++的关键字,表示这个类可以访问私有成员变量
    • 这个对象没有成员变量,全部都是成员函数,用来操作WriteBatch 中的req
  • std::string rep_;

    • 写入的数据就放入到这个string中 在 write_batch.cc 中有标识:

      • WriteBatch::rep_ :=
        sequence: fixed64 // 表示当前写入的seq,每一个写入都有一个seq,用来做类似于mvvc的版本控制的(?)
        count: fixed32 // 当前bacth中写入的个数
        data: record[count] // 写入的值,按照keylen+keyValue+dataLen+dataValue

        [ sequence: fixed64 ][ count: fixed32 ][ record[0] ][ record[1] ]…[ record[count-1] ]

Put方法首先就是将key和value放入到writebatch的req中,也就是上文中的data中的值。数据组织完毕。此时的req 的值为

“\000\000\000\000\000\000\000\000\001\000\000\000\001\008testkey0\004aaaa”

在正式写入的方法中

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates)

包含了下面几个操作:

  1. 将Put操作封装为Writer,并且放到writers 队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Writer w(&mutex_); 
w.batch = updates;
w.sync = options.sync;
w.done = false;
// 加锁
MutexLock l(&mutex_);
// 放入队列中 std::deque<Writer*> writers_
writers_.push_back(&w);
// 如果当前的队列不是在头部,则等待。这里就可以控制写入是先来后到的顺序。因为push_back 是加锁的操作,而 且在push之前就已经加锁了。
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
// 等待
}
// 检查下是否已经完成写入,因为下面会有个合并当前写入的操作
if (w.done) {
return w.status;
}
  1. 上文的代码片段是将写入操作放入队列中,感觉就是和java的线程池中的Runnable 队列一样,将需要执行的work放在队列中。下面是写入前的一些操作:
1
2
3
4
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr); // 确保当前的资源可以使用,涉及到mem和log,暂时不说明,主要就是为本次写入做资源的准备
uint64_t last_sequence = versions_->LastSequence();// 从version中获取当前最后的seq
Writer* last_writer = &w;
  1. 创建完空间和获取到seq,就开始执行写入操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 这里就是上文提到的,将此时队列中的write 合并为一个,一起写入
// 将本次写入的seq 写入到batch中
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
{
// 注释A
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
mutex_.Unlock(); //释放掉了写入queue中的
// 写入到日志中
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) { // 如果需要立即刷盘,就将数据刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// 写入到内存中
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
// 虽然本次是一次写入,但是每次写的seq 都是新增在seq中的,所以seq 应该是一个递增的值
versions_->SetLastSequence(last_sequence);
}

考虑下当前的场景,如果开始有t1,t2,t3 同时写入,此时t1 在writes的front,所以他可以执行上文3 中的代码,也就是将数据写入到log和mem,但是此时writers的front仍然是t1,所以在将队列中所有数据(BuildBatchGroup 会将当前队列中的数据全部变成一个batch)准备好后,如果此时t4,t5,t6线程进行并发写入,他们也会在上文2的代码中被新增到队列尾部,但是不会进入到写操作中,但是由于此时我们的writes的add操作也是被加锁了,所以t4,t5,t6应该是可以加到队列中,但是因为他们都不是front,所以都会在等待,一直到front变成他们中的某一个。

等到当前队列中的数据都写入到日志和内存中后,这个时候会再次获取到锁,判断当前的写入是否出现异常,然后释放当前写入的资源,也就是tmp_batch (BuildBatchGroup)核心就是将写入封装为这个tmp_batch。这里的异常主要是判断是否磁盘有异常,如果有就直接将后续所有的写入全部唤醒为异常状态。清理动作做完后就将sequence进行修改。可以看到sequence的获取和修改都是在加锁的状态下完成的。

  1. 当前新写入成功后,就可以释放后续的写入线程了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;

这里的唤醒核心就是将writers的队列,这个last_writer 指向的就是上文中已经被写入的最后一个writer,也就是说会挨个唤醒和pop出队列,一直到当前写入批次的最后一个。

如果writers 不为空,则继续唤醒队头,执行上面的操作。

通过将数据的写入进行入队控制写入的顺序,然后将队列中的当前大小(通过锁确定合并写入队列不持续增加)进行合并写入,然后写入日志和内存,最后挨个唤醒。一个锁做到了控制队列,合并写入等功能。

写入日志

写入日志也就是上文的

1
log_->AddRecord(WriteBatchInternal::Contents(write_batch));

前言

前面大概说了下kafka的网络模型,还是想将kafka的解析做一个系列。我不是一个很会写故事的人,有时候感觉是意识流。所以这次主要是按照kafka的文档中的design,中文翻译地址。按照这个设计需要考虑的进行分析。我写这个的时候刚好是kafka实现了自己的raft,基于的版本是3.3.1

阅读全文 »

前言

Kafka 是一个高性能的消息队列。在有 Netty 这么成熟的网络扩建之后,kafka 的客户端和服务端都没有使用 netty 作为网络框架,而是自己完全实现了一个网络层的通信。我能找到的的原因上面说 1 是为了更好的性能 2 是因为 kafka 本身引用了各种各样的 jar 包,而这些 jar 包很容易引起冲突,而且网络库是比较底层的库,所以才自己实现了一套。本文就 broker 端的 kafka 网络架构做一个大概的解析。希望能够了解到和 netty 实现的区别以及这么带来的好处。

阅读全文 »
0%