LevelDB之SkipList

前言

在数据结构中,树一直被很多系统钟爱,如 mysql 的 innodb 使用的是 B+ 数,在 java 中 Map 的 hash 碰撞后,如果链表超过 8 会切换为红黑树。树结构的好处个人认为是在写入的时候对数据进行一个预处理,而且这个出力和子节点的数量相关,在写入的时候按照规则插入,能够在查询的时候有效的查询对应的子树,从而达到查询时间为 LogN 。如红黑树,红黑树的结构是异构化的 23 树,能够确保左右子树的高度差维持在 1,有效保证数据平衡,但是红黑树的实现比较复杂,插入过程涉及到树的重平衡。后来出现了一种链表的结构,称之为跳表即 SkipList,他是由 William Pugh 在Skip Lists: A Probabilistic Alternative to Balanced Trees提出的一种平衡性很好的数据结构,这种数据结构是使用空间换时间,实现较为简单,查询复杂度和树结构接近。在 redis 和 leveldb 中都有使用。本文会介绍下 leveldb 中的 SkipList 的实现。

SkipList 原理

在某种意义上,链表可以算得上是一种特殊的树,只有一个分支的树。一般情况下分为单向链表和双向链表。单向链表里面的节点会有个指针指向下一个节点,双向链表包含两个指针,一个指向指向他的节点(prev),一个指向下一个节点。SkipList 是单向链表,但是不止一个链表,而是使用 n 个链表分层,每一层中的 node 指向的下一个节点各不相同。后续的说明主要是针对上文提到的论文中的一个小摘抄:

论文中提到(第一节),二叉树在随机大小写入的情况下效率很高,但是如果是顺序大小写入则表现的比较一般,这个是因为二叉树尤其是平衡二叉树需要对数结构进行一个平衡引起的。

SkipList 是一种平衡树的概率性的替代品,这里概率性个人觉得主要是上面提到的随机大小数据写入过程平衡树效率比顺序写高,就是通过另外的方式实现了这种顺序写变为随机。论文中提到,250 个元素的 skiplist 需要查询查过 3 次的概率不到百万分之一。

论文中还提到,skiplist 在实现上比较容易,而且使用的是概率性的计算来保持数据的平衡,在空间上平均每个节点可能只需要 1+1/3 个指针。

SkipList 还有一个好处就对锁范围的控制上,在平衡二叉树或者红黑树,在并发操作的情况下,特定的时候如需要修改 Root 节点的属性,可能需要对整棵树都加锁。但是 SkipList 里面可以针对局部节点进行加锁,在并发情况下实现比较好。

image-20230831163355393

上图选自论文中的 figure1。

n 标识当前链表的长度,且链表是顺序存储的。

  1. 在初始阶段,当前的单向链表查询可能需要遍历 n
  2. 如果将可以记录每间隔个节点创建连接,则可以将查找的时间缩小为 n/2+1。按照最上层比较,可以找到当前查询值的前一个节点,所以是 n/2+1
  3. 还可以在 2 的基础上,每间隔一个继续创建一个连接,则时间缩小为 n/4+2。首先最上层找,此时是 n/4,然后在第二层,因为当前中间有 b 层,而且 b 层此时只有 1 个节点,当前值要么大于这个值要么小于,所以需要 2 次就可以找到当前值。
  4. 现在将具有 k 个指向其他指针的节点称之为 level K 节点,则说明第 2^i 节点后面有 2^i 个节点,则节点分布在一个比较均匀的数据中。level0 0 节点有 100%个节点,有 50% 的节点在 level1,有 25% 在 level2,以此类推。
  5. 但是如果我们随机选择 level1 层的节点,可能会出现上图 e 中情况,也就是高层分配不均匀,如 6 虽然在第四层,但是他却处于第二个位置。

可以看到,SkipList 的核心其实是这个层数的确定,如果是按照十分恰当的分配节点中的每一个节点的 level,他的时间复杂度就是 logN。

这个上升当前节点的 level 的时机是影响链表查询的关键。

SkipList 的操作

主要设计到增删查:

  1. Search 操作返回目标关键字所关联的 value,如果没有该关键字则返回 failure

  2. Insert 操作对目标关键字更新 value,如果目标关键字不存在,则将关键字插入

  3. Delete 操作删除目标关键字。

查询操作

查询操作是删除和插入的基础,因为每一层都是单向链表,所以必须要找到插入的父节点才行,如果当前节点的 level 较高,可能还需要将查询过程中的每一层的父节点都记录下来。

伪代码如下:(论文 figure2)

img

伪代码是搜索一个 searchKey 是否在 skiplist 中。具体步骤如下:

  1. 将 x 初始话为 skiplist 的头节点
  2. 进入第一次循环,循环是从当前最大的 level 到 level1
  3. 然后进入第二层循环,在当前层的 forword 对象中找到 key 值不小于当前 searchkey 的值
  4. 跳出 3 的循环后,此时已经找到本层不小于 searchkey 的值,继续往下一层找。如果找到了等于当前 searchkey 的值,则返回值存储的 value。
  5. 如果从 maxlevel 到 1 都没有这个值,返回一个 failture。

查询就是从最上层往下查询的过程。

插入和删除

在上文中,我们查询不到这个值就返回一个 failture,但是其实也可以返回当前找到的 level1 层的值,因为这个值是当前 skiplist 中不小于 searchKey 的值。如果说是插入,那么也应该是插入到这个值的后面。但是因为涉及到 level 的变化,所以查询过程中需要将当前查到的每一层不小于当前的值都查出来,然后随机一个当前值的 level,然后将存储的每一层 level 都和当前 level 进行一个绑定。删除也类似。

插入和删除图形化为:

img

后文根据论文中的伪代码介绍下操作

插入

插入的伪代码如下:论文 figure4

image-20230901144301435

  1. 定义一个变量 update,用来存储上文提到的每一层中不小于当前值的 Node。

  2. 查找当前的 key 和将每一层比较的结果赋值到 update 中。

  3. 如果查到当前值了,那么 put 就变成了 update,直接将 skiplist 中的对应 key 的 value 修改为当前的 newvalue。

  4. 如果没有找到,也就是上文提到的,首先随机产生一个当前的 level。如果当前的 level 超过 skiplist 当前最大的 level,那么需要将当前的 level 设置为新生成的 level,然后将 head 放到每一层的头节点。

  5. 将插入的数据变成 Node,Node 包含了当前 key,value 和 level 信息,当然内部还有个 next 信息

  6. 从第 1 层往上,将当前的 x 插入到每一层。

这里有个问题就是对 level 的管理,因为不可能随便随机一个数出来,最好的是希望设置一个 maxlevel,然后作为 seed,在 maxLevel 下面随机产生层数。

随机算法

在论文中提到过,skiplist 本身是个概率性的算法,个人觉得概率性其实就是指的层数的选择。他想要达到的效果是在上升中,某一层出现的概率是大致一定的。也就是说如果第 i 层的元素能够按照某个固定的概率 p(上文使用的是 1/2)出现在在 i+1 层,这里面涉及到概率论里面期望运算,就不再赘述了(我已经还给老师了)。一句话说就是如过选择的是 p=1/2,那么我们就希望如果 n 等于 16,那么第 0 层是 16,第二次是 8,第三层是 4。说白了就是返回的值出现的概率是一样的,比如 返回 1 的概率就是 1/2,返回 2 的概率就是 1/4。

1
2
3
4
5
6
7
8
9
private static final double PROBABILITY = 0.5;
private int randomLevel() {
int level = 1;
Random random = new Random();
while (random.nextDouble() < PROBABILITY && level < MAX_LEVEL) {
level++;
}
return level;
}

使用这种方式可以生成,主要是因为 random.nextDouble() 出现的数是比较均匀的分布,就是出现的概率是相对一定的,然后只要小于 0.5 就多一层,也就是上面的 1/2 的概率,继续增加 level,则就是 1/2 *1/2 的概率了。

删除

删除也是在查询基础上做的,伪代码如下:

img

首先仍然是记录下查询过程的每一层的上一个节点,如果找到这个值对应的 Node,就将 update 中存储的值都进行一个链表删除操作,然后释放内存。最后,还需要检查当前是否删除了最高的 level 中的最后一个 node,如果是还需要将当前的 level 减 1.

自己写个 SkipList

使用 java 自己写了个 skiplist,其中数据使用 DataNode,索引使用 IndexNode,避免值的无效覆盖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import java.util.LinkedList;
import java.util.Random;

public class SkipList<K extends Comparable<K>, V> {
// 当前最大的层数
private int maxLevel = 12;
// 头结点,down 使用down指向下一层
private SkipListIndexNode<K, V> head;
// 当前的level
private int level = 1;
private static Random random;
//跳到下一层的概率
private double probability = 0.5;

public SkipList(int maxLevel, double probability) {
random = new Random();
this.maxLevel = maxLevel;
this.probability = probability;
SkipListDataNode data = new SkipListDataNode(Integer.MIN_VALUE, null);
head = new SkipListIndexNode(data, 1);
}


public V get(K key) {
SkipListDataNode<K, V> old = searchNode(key);
if (old != null) {
return old.value;
} else {
return null;
}
}

public K getMax() {
SkipListIndexNode<K, V> cur = head;
while (cur.down != null) {
while (cur.right != null) {
cur = cur.right;
}
if (cur.down == null && cur.right == null) {
break;
}
cur = cur.down;
}
return cur.getKey();
}

public K getMin() {
SkipListIndexNode<K, V> cur = head;
while (cur.down != null) {
cur = cur.down;
}
return cur.right == null ? null : cur.right.getKey();
}

public void put(K key, V value) {
SkipListDataNode old = searchNode(key);
if (old != null) {
old.value = value;
return;
}
int newLevel = randomLevel();
if(newLevel>maxLevel){
newLevel=maxLevel;
}
if (newLevel > level ) {
for (int i = level; i < newLevel; i++) {
genNewHead();
}
}
SkipListDataNode<K, V> data = new SkipListDataNode<K, V>(key, value);
SkipListIndexNode<K, V> indexNode = new SkipListIndexNode<K, V>(data, newLevel);
LinkedList<SkipListIndexNode> update = new LinkedList<SkipListIndexNode>();
SkipListIndexNode<K, V> cur = head;
while (cur != null && cur.level > newLevel) {
cur = cur.down;
}
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
update.add(cur);
cur = cur.down;
}
SkipListIndexNode<K, V> bottom = null;
while (!update.isEmpty()) {
SkipListIndexNode prevNode = update.pollLast();
SkipListIndexNode curLevelIndex = indexNode.genIndexNodeByLevel(prevNode.level);
curLevelIndex.right = prevNode.right;
prevNode.right = curLevelIndex;
curLevelIndex.down = bottom;
bottom = curLevelIndex;
}
}


public void delete(K key) {
LinkedList<SkipListIndexNode> update = new LinkedList<SkipListIndexNode>();
SkipListIndexNode<K, V> cur = head;
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
update.add(cur);
cur = cur.down;
}

while (!update.isEmpty()) {
SkipListIndexNode skipListIndexNode = update.pollLast();
if (skipListIndexNode.right != null && skipListIndexNode.right.getKey().compareTo(key) == 0) {
skipListIndexNode.right = skipListIndexNode.right.right;
}
}
while (head.right == null) {
level--;
head = head.down;
}
}


public void printList() {
SkipListIndexNode bottom = head;
LinkedList<SkipListIndexNode> stack = new LinkedList<SkipListIndexNode>();
while (bottom.down != null) {
bottom = bottom.down;
}
SkipListIndexNode printLevel = head;
while (printLevel != null) {
SkipListIndexNode printLeveltail = printLevel;
System.out.printf("%-5s", "head->");
SkipListIndexNode bottomTail = bottom;
while (printLeveltail != null && bottomTail != null) {
if (printLeveltail.right != null && printLeveltail.right.getKey().compareTo(bottomTail.right.getKey()) == 0) {
System.out.printf("%-5s", printLeveltail.right.getKey() + "->");
printLeveltail = printLeveltail.right;
} else {
System.out.printf("%-5s", "");
}
bottomTail = bottomTail.right;

}
printLevel = printLevel.down;
System.out.println();
}

}

private SkipListDataNode searchNode(K key) {
if (key == null) {
return null;
}
SkipListIndexNode<K, V> cur = head;
while (cur != null) {
while (cur.right != null && cur.right.getKey().compareTo(key) < 0) {
cur = cur.right;
}
if (cur.right != null && cur.right.getKey().compareTo(key) == 0) {
return cur.right.dataNode;
}
cur = cur.down;
}
return null;
}

private void genNewHead() {
SkipListIndexNode<K, V> skipListIndexNode = new SkipListIndexNode<K, V>(null, ++level);
skipListIndexNode.down = head;
head = skipListIndexNode;

}


private int randomLevel() {
int level = 1;
while (random.nextDouble() < probability && level < maxLevel) {
level++;
}
return level;
}


public static void main(String[] args) {
SkipList<Integer, Integer> skipList = new SkipList<Integer, Integer>(12, 0.5);
for (int i = 1; i <= 200; i++) {
skipList.put(new Random().nextInt(200), 100 + i);
}
skipList.printList();
;
System.out.println(skipList.get(10));
System.out.println(skipList.get(11));
System.out.println(skipList.get(12));
System.out.println(skipList.getMin());
System.out.println(skipList.getMax());
for (int i = 0; i <= 20; i += 2) {
skipList.delete(i);
}
System.out.println(skipList.get(10));
System.out.println(skipList.get(11));
System.out.println(skipList.get(12));
System.out.println(skipList.getMin());
System.out.println(skipList.getMax());

}

class SkipListDataNode<K extends Comparable<K>, V> {
private K key;
private V value;

SkipListDataNode(K key, V value) {
this.key = key;
this.value = value;
}

}

class SkipListIndexNode<K extends Comparable<K>, V> {
SkipListDataNode<K, V> dataNode;
int level = 0;
SkipListIndexNode<K, V> right;
SkipListIndexNode<K, V> down;

public SkipListIndexNode(SkipListDataNode<K, V> dataNode, int level) {
this.dataNode = dataNode;
this.level = level;
}

public SkipListIndexNode genIndexNodeByLevel(int level) {
return new SkipListIndexNode(this.dataNode, level);
}

public V getValue() {
return dataNode.value;
}

public K getKey() {
return dataNode.key;
}
}
}

LevelDB 的 SkipList 实现

在上面已经较为详细的介绍了 SkipList,也使用 java 做了一个简单的实现。下面来看下 LevelDB 中的 SkipList 的实现。

在几乎所有的 SkipList 的方法上面都有一个 template 。这个的作用有点类似于 java 的泛型,只不过 java 是运行时而 C++ 是编译阶段。

template <typename Key, class Comparator> 值得是一个 key,这个 key 和 Comparator 是一个占位符,用来表示具体传入的数据类型,后面的 Comparator,则说明需要传入一个实现了 Comparator 的接口或者类的类型。

random 方法

首先来看下 random 的方法使用的位置:

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1; // 初始level是1
while (height < kMaxHeight && rnd_.OneIn(kBranching)) {// 根据概率,默认1/4
height++;
}
assert(height > 0);
assert(height <= kMaxHeight); // 当前的level是不是小于kMaxHeight(默认12)
return height;
}

其中最要的是这个rnd_.OneIn 方法,这个方法就是生成均匀分布的随机数。

现在的随机算法都是伪随机,算法为线性同余生成器,算法表示为:

1
Xn+1 = (a * Xn + c) % m

其中,Xn 是当前生成的伪随机数,Xn+1 是下一个生成的伪随机数,a 是乘法因子,c 是增量,m 是模数。通常,a16807c0m2147483647,这是因为这组参数在计算中具有良好的性质,可以生成均匀分布的伪随机数。

在 leveldb 中就使用到了上面提到的两个魔法数,具体是现在在util/random.h,因为是一个整体,所以全部复制出来,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Random {
private:
uint32_t seed_;

public:
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { //0x7fffffff u表示无符号 2^31-1 二进制 111 1111 1111 1111 1111 1111 1111 1111 将第一位取0 ,因为无符号转化为有符号的时候头部位1 表示为负数
// 所以此处的主要目的是确保传入的s为正数,默认传入的seed初始值为0xdeadbeef 二进制为1101 1110 1010 1101 1011 1110 1110 1111
// Avoid bad seeds.
if (seed_ == 0 || seed_ == 2147483647L) {
seed_ = 1;
}
}
uint32_t Next() {
static const uint32_t M = 2147483647L; // 2^31-1
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0 //无符号64位二进制0100 0001 1010 0111
// We are computing
// seed_ = (seed_ * A) % M, where M = 2^31-1
//
// seed_ must not be zero or M, or else all subsequent computed values
// will be zero or M respectively. For all other values, seed_ will end
// up cycling through every number in [1,M-1]
uint64_t product = seed_ * A; //26696993619177 二进制 High[000000 00000 0000 0000 1100 0010 0011 1] low [111 00000 1101 0010 0011 1100 1110 1001]
//这里是计算Mod算法的一个优化,一个64为的数,可以分为高33 位和低31 位的数
// Compute (product % M) using the fact that ((x << 31) % M) == x. product=high<<31+low 又因为product=seed_*A,所以此时product=(high*M+high+low)%M 其中 M = 2^31-1
// 因为seed_ 和A 中,seed_的值在初始化的时候就让他小于2^31-1,A 是固定的16807,所以这两个值都不会大于M的值,所以取余最后的结果就等(high+low)%M=high+low,所以下面的这个计算是获取high和low的值相加,也就得到了seed_
// 但是有一种情况就是product的low为刚好但是 2^31-1,这个时候product=(high*M+high+1*M)%M=high ,但是使用下面的结果会是high+M,因为M&M=M。所以,需要判断是否seed_ 比M大,然后前去M,直接使用high,也确保了seed的值一直小于M
// 最后强制转换为32位的值
seed_ = static_cast<uint32_t>((product >> 31) + (product & M)); // High+low [000000 00000 0000 0000 1100 0010 0011 1]+[111 00000 1101 0010 0011 1100 1110 1001] & [111 1111 1111 1111 1111 1111 1111 1111]
// The first reduction may overflow by 1 bit, so we may need to
// repeat. mod == M is not possible; using > allows the faster
// sign-bit-based test.
if (seed_ > M) {
seed_ -= M;
}
return seed_;
}
// 在[0,n-1]之间返回随机数
// Returns a uniformly distributed value in the range [0..n-1]
// REQUIRES: n > 0
uint32_t Uniform(int n) { return Next() % n; }
// n分之一的概率返回true
// Randomly returns true ~"1/n" of the time, and false otherwise.
// REQUIRES: n > 0
bool OneIn(int n) { return (Next() % n) == 0; }
// //首先求得[0,max_log]的一个随机数,然后求得[0,2^maxlog-1]的一个随机数
// Skewed: pick "base" uniformly from range [0,max_log] and then
// return "base" random bits. The effect is to pick a number in the
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }
};
}

上面的方法就随机数的 seed 计算比较上头。因为第一个种子确定后,后面的种子都相对确定了,而且这个出现的概率比较均匀,所以也就是 OneIn 能够保证出现概率接近的原因。

数据结构

leveldb 中的 SkipList 具体实现在db/skiplist.h中。

私有域:

  1. struct Node 用来存储数据,和上面 Java 实现的不同,Node 中 key 的值既包含了需要查询的 key,也包含具体的 Value,这个和数据存储格式相关系
  2. enum { kMaxHeight = 12 }; 允许最大的层数是 12 层
  3. compare_ 用来做 key 值比较的比较器
  4. arena_ 为每个 Node 分配内存
  5. head_ 头结点指针
  6. std::atomic maxheight; 原子递增的最大层数
  7. 一个上文提到的随机数生成的对象

explicit 关键字一般用来修饰类的构造函数,作用是告诉编译器按照实际的类型来构造函数,不允许做隐式转换。

公有域:

  1. 一个不允许隐式转换的构造器

  2. void Insert(const Key& key); // 插入方法

  3. bool Contains(const Key& key) const; 判断是否存在

  4. Iterator 一个迭代器实现,迭代器中包含了多种查询方式,包括查找当前的 key 等方法(后文详细介绍)。其中也包含了私有域,指向当前的 SkipList 和一个用来遍历的 Node。

Node 结构体包含了一个 Key,这个就是前面提到的template <typename Key, class Comparator>中的这个 Key,还包含了一个数组

std::atomic<Node*> next_[1];

这个是一种柔性的数组。也就是大小是可变的,next 数组是一个指针数组,也就是上文中 java 代码中的 IndexNode,本身不存储 key Value,而是作为索引存储。这个 next 表示当前层数中指向的下一个节点。

一个 SkipList 初始化过程中需要传入的参数就是比较器 comparator* 和内存分配器 arena*的指针。

查询

和上面的实现不同的是,leveldb 的查询是放在 Iterator 的 seek 方法中的

1
2
3
4
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}

最后调用的是 SkipList 本身的 FindGreaterOrEqual 方法,也就是要么找到当前值,要么就找到比当前值大的那个值。该方法包含了两个参数,第一个是需要查询的值,第二个是用来存储当前查询 level 的前序节点,也就是 update 数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_; // x为后续查询使用的临时数据,此时x在最上层的头结点
int level = GetMaxHeight() - 1; //获取当前最大的层数,从0开始,所以减去1
while (true) {
Node* next = x->Next(level); // next数组就是每一层链表的数据,从head开始
if (KeyIsAfterNode(key, next)) { //找到当前的查询的值是否在x后面,如果是,则说明当前的值的后一个值小于查询的值
// Keep searching in this list
x = next; //往右查询。
} else { // 如果x的后面的值大于查询的key,则判断是是否需要记录层数,get中prev是nullptr,所以不需要存储,在insert的时候就需要记录下来
if (prev != nullptr) prev[level] = x;
if (level == 0) { // 如果当前已经查询到最后一层,则返回这个next,此时next因为前面的KeyIsAfterNode 返回的是false,所以要么当前的值的右边为nullptr要么这个值就比key大,而且是紧挨的着的那个大于这个key的值。仅仅在第0层是这个结果
return next;
} else {
// Switch to next list
level--;
}
}
}
}

本身实现上和上文的没有区别,首先从最高层往下便利,其中 Next 方法就是获取到下一层的节点。这个方法还有一个实现就是NoBarrier_Next,两者的区别就是是否允许重排,在 java 中,指令重排是使用 valite 关键字关闭,c++中用下面的方式:(直接使用 leveldb 的代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}

// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

其中的 memory_order 就是决定当前的指令是否重排:

  1. std::memory_order_acquire:这是一种内存顺序语义,用于 load 操作。它确保在加载 next_[n] 指针之前,所有之前的内存操作(写入和读取)都不会被重排序到加载之后。在多线程环境中,这可以确保读取到 next_[n] 的值是最新的。
  2. std::memory_order_release:这是一种内存顺序语义,用于 store 操作。它确保在存储 xnext_[n] 指针之后,所有之后的内存操作(写入和读取)都不会被重排序到存储之前。在多线程环境中,这可以确保 x 被完全初始化之后再被其他线程访问。
  3. std::memory_order_relaxed:这是一种较弱的内存顺序语义,通常用于 loadstore 操作。它允许更大的重排序自由度,不保证像 acquirerelease 那样的强制顺序。通常在不需要严格的顺序控制时使用,以提高性能。

所以 setNext 值得是当前的获取的 n 是否被后续访问的线程可见,避免了出现多次创建甚至覆盖的问题,但是后面的两个则没有这个限制。

来看个例子:

当前如果有:

1
2
3
4
5
head2--->3->

head1--->3-> 5-> 9->

head0--->3->4->5->7->9->

如果有上文中的 3 层,现在需要查询 8

  1. 首先 x 被复制为 head2 进入循环,然后 x 获取 head 的链表也就是 next 中的数据,此时为 3
  2. 当前查询的 8 大于 3 ,所以将 3 位置 Node 赋值给 x,进入到下次循环
  3. 继续获取 3 后面的值,但是此时 3 后面的值为空,所以 KeyIsAfterNode 返回为 false,进入到 else,判断 prev 是否存储,判断是否已经到了第 0 层,此时否为 false,所以将 level–,找下一层
  4. x 此时获取的是 next 中的第 2 层,也就是 head1 后面的 3,因为 3 小于 8,所以找到了 5,所以将 5 赋值给 x,继续循环
  5. 此时 x 为 5,里面的 next 指向的是 9,也就是 next 此时为 9,9 大于 8。所以不在赋值给 x。而是进入下一层
  6. 进入到 head0,此时 x 为 5,next 为 7,然后 7 小于 8,进入下次循环
  7. 此时 x 为 7,但是 next 为 9,因为是 0 层了,所以直接返回 next

也就是说这个方法返回的值和他的名字一样,和当前值相等或者比当前值大的第一个数。在 Iterator 中 seek 的值会存储在本身的 node_ 属性中,需要进一步进行判断。如果这个值为 nullptr,则 Iterator 中的 Valid 为 false。

插入

查询也使用到了上面的 FindGreaterOrEqual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}

插入过程中,首先是找到已经存在的 key,或者这个 key 的后缀节点,需要注意的是这个 key 再 leveldb 中时不允许在一个一模一样的 key 已经存在然后继续插入的。leveldb 中的每次插入都包含了一个序列,所以这个 key 一般不会一样,我能想到的完全一样的情况感觉就是 sequence 的值获取是出现问题,导致一个 key 拥有了相同的 sequence,这个后面再讨论。

然后就是获取到一个随机的高度,如果当前的高度超过了现在最大的高度(random 不会返回超过 12 的高度),就需要将在获取 key 的时候存储的 prev 节点添加上新的 head 节点。然后更新高度。将当前的 key 和 height 封装为一个新的 Node,主要是开辟内存空间。然后就是填充各个层的数据

删除

leveldb 本身的删除就是一个添加墓碑标记的删除,后续文件合并才会真正的删除,所以 Skiplist 中也没有删除。

迭代方式

SkipList 的迭代器中的方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool Valid() const; // 查询的结果是否是nullptr
// Advances to the next position.
// REQUIRES: Valid()
void Next(); // 找到查询值的后续节点

// Advances to the previous position.
// REQUIRES: Valid()
void Prev();// 查询值的前序节点

// Advance to the first entry with a key >= target
void Seek(const Key& target); // 获取到key>=target的值

// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst(); // 最小值

// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast(); // 最大值

其中 Valid 、 Next 和 Seek 已经大概说过了。主要就是一个 Prev,SeekToFirst 和 SeekToLast。

SeekToLast 的方法比较简单,就是从 level 为 0 一直遍历到最最后面。SeekToFirst 实现就是找到第一个,也就是获取 level 0 的第一个值。所以这个两个值可以算得上是获取最大最小值。

Prev 则是找到当前查询值小的最后的一个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}

核心实现就是上面的 FindLessThan,可以看到和前面的 FindGreaterOrEqual 类似,FindGreaterOrEqual 已经很详细的说过,这里就不赘述了。

总结

leveldb 的 skiplist 的实现是比较经典的实现。只是里面的获取为什么需要通过 Iterator,而不是直接提供一个 get 方法我不是很理解。