class MemTable {
 public:
  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  explicit MemTable(const InternalKeyComparator& comparator);
  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }
  // Returns an estimate of the number of bytes of data in use by this
  // data structure. It is safe to call when MemTable is being modified.
  size_t ApproximateMemoryUsage();
  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/format.{h,cc} module.
  Iterator* NewIterator();
  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value);
  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *status and return true.
  // Else, return false.
  bool Get(const LookupKey& key, std::string* value, Status* s);
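Putting the interface together, here is a minimal usage sketch of the reference-counting contract. This is not from the LevelDB sources; the function name and the lookup sequence number 2 are just for illustration:

#include <cassert>
#include <string>
#include "db/dbformat.h"
#include "db/memtable.h"

void MemTableSketch(const leveldb::InternalKeyComparator& cmp) {
  using namespace leveldb;
  MemTable* mem = new MemTable(cmp);
  mem->Ref();  // the refcount starts at zero; the creator must Ref() once

  // Insert "k" -> "v" at sequence number 1.
  mem->Add(1, kTypeValue, Slice("k"), Slice("v"));

  std::string value;
  Status s;
  LookupKey lkey("k", 2);  // read "k" as of sequence 2
  if (mem->Get(lkey, &value, &s)) {
    // Found a live value; for a deletion, s would hold NotFound instead.
    assert(s.ok() && value == "v");
  }
  mem->Unref();  // drops the last reference, which deletes the memtable
}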
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);  // make room in the log file / memtable before writing
  uint64_t last_sequence = versions_->LastSequence();    // last sequence number used, taken from the current version set
  Writer* last_writer = &w;  // starts as this writer; note that although &w is the queue head it is
                             // assigned to last_writer, because BuildBatchGroup below advances
                             // last_writer to the last writer merged into this batch
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);  // merge the queued writers' data into a single write
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);  // stamp the batch; sequence numbers increase with
                                                                     // every write, preserving write order and enabling MVCC reads
    last_sequence += WriteBatchInternal::Count(write_batch);  // advance last_sequence by the number of entries in the batch

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();  // release the queue lock; new writers may enqueue, but since this writer
                        // has not been popped from the head, they still block in w.cv.Wait()
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));  // append to the log file
      bool sync_error = false;
      if (status.ok() && options.sync) {  // the write succeeded; sync to disk if requested
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);  // apply the batch to the memtable
      }
      mutex_.Lock();  // re-acquire the lock, pausing new writers; note it is finally released when this
                      // thread leaves the method and MutexLock's destructor runs
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();  // reset the shared scratch batch

    versions_->SetLastSequence(last_sequence);  // record the new last sequence in the version set
  }
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // Wake every writer folded into this batch, up to and including last_writer.
  // Each woken thread resumes inside the wait loop shown earlier: if its work
  // is already done it returns immediately; otherwise, as the new queue head,
  // it runs the write path above itself.
  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();  // wake the leader of the next batch
  }
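For context, this is the enqueue-and-wait prologue that runs before the excerpt above, lightly abridged from db/db_impl.cc: each thread pushes its Writer onto the queue and sleeps until its batch has either been completed by another thread or it has reached the head and must do the work itself.

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();  // woken either as "done" or as the new queue head
  }
  if (w.done) {
    return w.status;  // another writer folded us into its batch
  }
  // ... the code shown above runs here, with &w at the head of writers_ ...
}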
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();        // the caller must hold the mutex
  assert(!writers_.empty());  // there is at least one pending writer
  bool allow_delay = !force;  // normal writes may be delayed once, by 1ms
  Status s;                   // the status to return
  while (true) {
    if (!bg_error_.ok()) {
      // A background compaction/flush failed earlier; surface that error.
      // Yield previous error
      s = bg_error_;
      break;
      // Case 1: delaying is allowed (normal writes; force == (updates == nullptr))
      // and level-0 has reached the slowdown threshold (>= 8 files).
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();  // drop the lock while sleeping; there is no reason to block later writers from enqueueing
      env_->SleepForMicroseconds(1000);  // sleep for 1ms
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();        // re-acquire the lock and get back to work
      // Case 2: the memtable still has room, so nothing needs to happen.
      // write_buffer_size defaults to 4MB; roughly speaking, once a memtable
      // grows past 4MB it has to be switched out.
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
      // Case 3: the previous memtable is still being flushed; wait for that to finish.
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
      // Case 4: too many level-0 files (default threshold 12); wait for compaction.
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Case 5: fewer than 8 level-0 files, the memtable is full, and no flush
      // is in progress, so a new memtable (and log file) must be created.
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();  // file names (numbers) also come from the version set
      WritableFile* lfile = nullptr;
      // Create the writable file; on failure the current number can be reused.
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;            // destroy the old log writer
      s = logfile_->Close();  // close the old log file
      if (!s.ok()) {
        // We may have lost some data written to the previous log file.
        // Switch to the new log file anyway, but record as a background
        // error so we do not attempt any more writes.
        //
        // We could perhaps attempt to save the memtable corresponding
        // to log file and suppress the error if that works, but that
        // would add more complexity in a critical code path.
        RecordBackgroundError(s);
      }
      delete logfile_;  // release the old file object
      logfile_ = lfile;  // the file created above becomes the new log file
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);  // hand the new file to the log writer
      imm_ = mem_;  // the full memtable becomes imm_, ready to be flushed to level 0
      has_imm_.store(true, std::memory_order_release);  // memory_order_release, as discussed earlier, prevents reordering
      mem_ = new MemTable(internal_comparator_);  // fresh memtable with the same comparator
      mem_->Ref();  // take the initial reference
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();  // try to kick off a background compaction
    }
  }
  return s;
}
inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  // Fast path: the request fits in the current block, so just bump the pointer.
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}
If the unused space left in the previously allocated block is large enough for this request, it is used directly; otherwise AllocateFallback creates new space:
char* Arena::AllocateFallback(size_t bytes) {
  // Requests larger than a quarter block (1KB for the default 4KB block)
  // get a dedicated block of exactly the requested size.
  if (bytes > kBlockSize / 4) {
    // Object is more than a quarter of our block size.  Allocate it separately
    // to avoid wasting too much space in leftover bytes.
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // Otherwise start a fresh 4KB block and serve the request from it.
  // We waste the remaining space in the current block.
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}
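A quick sketch of how the two paths behave; the numbers below are only there to illustrate the thresholds (kBlockSize is 4096, so kBlockSize / 4 is 1024):

#include <cassert>
#include "util/arena.h"

void ArenaSketch() {
  leveldb::Arena arena;
  char* a = arena.Allocate(100);   // fallback: opens a fresh 4KB block
  char* b = arena.Allocate(200);   // fast path: bump-allocated from the same block
  assert(b == a + 100);            // allocations within a block are contiguous
  char* c = arena.Allocate(2000);  // 2000 > 1024: gets its own dedicated block,
                                   // leaving the current block's remainder intact
  (void)c;                         // everything is freed when `arena` is destroyed
}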
class Writer {
 public:
  // Create a writer that will append data to "*dest".
  // "*dest" must be initially empty.
  // "*dest" must remain live while this Writer is in use.
  explicit Writer(WritableFile* dest);
  // Create a writer that will append data to "*dest".
  // "*dest" must have initial length "dest_length".
  // "*dest" must remain live while this Writer is in use.
  Writer(WritableFile* dest, uint64_t dest_length);
 private:
  Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
  WritableFile* dest_;
  int block_offset_;  // Current offset in block
  // crc32c values for all supported record types.  These are
  // pre-computed to reduce the overhead of computing the crc of the
  // record type stored in the header.
  uint32_t type_crc_[kMaxRecordType + 1];
};
Public interface:

AddRecord: the method external callers use to append a Slice.

Private members:

EmitPhysicalRecord: writes a single physical record (fragment) to disk.

dest_: a writable-file wrapper provided by the Env; think of it as an already opened file ready for appends.

block_offset_: the writer's current offset within the current block.

type_crc_: an array of pre-computed crc values, one per record type; the types are constants, so there is no need to recompute their crc on every write.
AddRecord
The implementation of AddRecord mainly splits the incoming Record into Fragments as needed. The code is as follows:
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();
  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    // Space remaining in the current block.
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    // Fewer than 7 bytes left: not even a header fits, so zero-fill the rest.
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }
    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // left is how much of this record still needs writing; avail is how much
    // payload fits in the current block, i.e. the size of the next fragment.
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      // The whole remaining record fits in the block: one complete write.
      type = kFullType;
    } else if (begin) {
      // The record is larger than the available space on the first pass, so
      // it must be split; mark this fragment as the first piece.
      type = kFirstType;
    } else if (end) {
      // In a later iteration the remainder fits entirely, so mark it as the
      // last fragment.
      type = kLastType;
    } else {
      // After the first fragment the data still does not fit, so it keeps
      // being split; neither first nor last means a middle fragment.
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}
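As a worked example with the real constants (kBlockSize = 32768, kHeaderSize = 7): appending a 50,000-byte record at block offset 0 first emits a kFirstType fragment with a 32,761-byte payload (7 + 32,761 fills the block exactly), leaving 17,239 bytes; the next block then receives those bytes as a kLastType fragment (7 + 17,239 = 17,246 bytes). A record that fits within the current block's remaining space is emitted as a single kFullType fragment instead.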
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);
  // Format the header: the length goes into bytes 4-5 (little-endian) and
  // the type into byte 6; the crc fills bytes 0-3 below.
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);
  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}
The first step is assembling the header. It is not filled in front to back: the length and type are written first, and the crc is encoded into the leading bytes afterwards. The exact layout matches the fragment header shown in the figure above:
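For reference, this is the 7-byte layout EmitPhysicalRecord ends up producing for each fragment:

// kHeaderSize == 7 bytes per physical record:
// +------------------+----------------------+----------+
// | crc32c (4 bytes) | length (2 bytes, LE) | type (1) |
// +------------------+----------------------+----------+
// ...followed by `length` bytes of payload.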
As mentioned earlier, every write appends a Record to the on-disk WAL. The WAL is read in exactly one place: the recovery step that runs when the database is reopened. Recovery involves much more than reading Records (version handling among other things), none of which belongs in this read-path discussion; versions are covered in detail later, so this article deals only with reading Records.
class Reader {
 public:
  // Interface for reporting errors.
  class Reporter {
   public:
    virtual ~Reporter();

    // Some corruption was detected.  "bytes" is the approximate number
    // of bytes dropped due to the corruption.
    virtual void Corruption(size_t bytes, const Status& status) = 0;
  };

  // The reader takes a SequentialFile, i.e. an object that reads the
  // file front to back.
  Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

  Reader(const Reader&) = delete;
  Reader& operator=(const Reader&) = delete;

  ~Reader();

  // Read the next record into *record.  Returns true if read
  // successfully, false if we hit end of the input.  May use
  // "*scratch" as temporary storage.  The contents filled in *record
  // will only be valid until the next mutating operation on this
  // reader or the next mutation to *scratch.
  bool ReadRecord(Slice* record, std::string* scratch);
  // Returns the physical offset of the last record returned by ReadRecord.
  //
  // Undefined before the first call to ReadRecord.
  uint64_t LastRecordOffset();

 private:
  // (abridged)
  SequentialFile* const file_;
  Reporter* const reporter_;
  bool const checksum_;
  char* const backing_store_;
  Slice buffer_;
  bool eof_;  // Last Read() indicated EOF by returning < kBlockSize
  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;
  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;
};
The code above is not complete; I have not reproduced all of the private methods and members. Reader itself only pulls Records back out of the log; other files, such as the MANIFEST, are also stored as Records. Either way, the overall pattern is the same: recover the Record entries from the file, then apply them by type to the MemTable or the VersionSet.
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);
    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                    WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }
    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }
The loop above reads records back out of the file. As described in the method comments earlier, the two parameters of reader.ReadRecord(&record, &scratch) work as follows: a kFullType fragment is returned directly in record, while kFirstType and kMiddleType fragments are accumulated in scratch; once the kLastType fragment arrives, the reassembled record is returned in record.
Log switching happens in MakeRoomForWrite in db/db_impl.cc: when resources run short, chiefly when the memtable is full, a new log file is created for subsequent writes, the old file is closed, and the current mem_ becomes imm_. Log files are named with increasing numbers, so sorting by name gives their creation order and there is no risk of deleting the wrong file; how the files are organized is deferred to the later discussion of Version.
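As a concrete illustration of the naming scheme: db/filename.cc zero-pads the file number to six digits, so lexicographic order matches numeric (creation) order.

// LogFileName(dbname, number) produces "<dbname>/<number, 6 digits>.log":
//   LogFileName("/data/db", 5)  -> "/data/db/000005.log"
//   LogFileName("/data/db", 42) -> "/data/db/000042.log"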
 public:
  explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
    // 0x7fffffffu is unsigned 2^31-1, binary 111 1111 1111 1111 1111 1111
    // 1111 1111. Masking clears the top bit, which would read as a negative
    // sign if the value were reinterpreted as signed, so the stored seed is
    // always positive. The customary initial seed 0xdeadbeef is binary
    // 1101 1110 1010 1101 1011 1110 1110 1111.
    // Avoid bad seeds.
    if (seed_ == 0 || seed_ == 2147483647L) {
      seed_ = 1;
    }
  }
  uint32_t Next() {
    static const uint32_t M = 2147483647L;  // 2^31-1
    static const uint64_t A = 16807;  // bits 14, 8, 7, 5, 2, 1, 0; binary 0100 0001 1010 0111
    // We are computing
    //       seed_ = (seed_ * A) % M,    where M = 2^31-1
    //
    // seed_ must not be zero or M, or else all subsequent computed values
    // will be zero or M respectively.  For all other values, seed_ will end
    // up cycling through every number in [1,M-1].
    uint64_t product = seed_ * A;

    // Optimized modulo: split the 64-bit product into its high 33 bits and
    // low 31 bits, product = (high << 31) + low. Since (high << 31) =
    // high*(M+1) = high*M + high, we have product % M == (high + low) % M,
    // i.e. ((x << 31) % M) == x. Both seed_ (kept below M at construction)
    // and A (= 16807) are less than M, so simply adding the two halves
    // almost always yields the new seed directly.
    // Compute (product % M) using the fact that ((x << 31) % M) == x.
    seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
    // Edge case: if the low half is exactly M, (product & M) yields M rather
    // than 0 (since M & M == M), so the sum overshoots by M; the check below
    // subtracts it, which also keeps seed_ strictly below M. The result is
    // then narrowed to 32 bits.
    // The first reduction may overflow by 1 bit, so we may need to
    // repeat.  mod == M is not possible; using > allows the faster
    // sign-bit-based test.
    if (seed_ > M) {
      seed_ -= M;
    }
    return seed_;
  }

  // Returns a uniformly distributed value in the range [0..n-1]
  // REQUIRES: n > 0
  uint32_t Uniform(int n) { return Next() % n; }

  // Randomly returns true ~"1/n" of the time, and false otherwise.
  // REQUIRES: n > 0
  bool OneIn(int n) { return (Next() % n) == 0; }

  // Skewed: pick "base" uniformly from range [0,max_log] and then
  // return "base" random bits.  The effect is to pick a number in the
  // range [0,2^max_log-1] with exponential bias towards smaller numbers.
  uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }
};
}
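The skiplist is where this generator gets used: RandomHeight in db/skiplist.h draws a node height with OneIn, so each additional level is taken with probability 1/4 (shown here slightly abridged):

template <typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight() {
  // Increase height with probability 1 in kBranching
  static const unsigned int kBranching = 4;
  int height = 1;
  while (height < kMaxHeight && rnd_.OneIn(kBranching)) {
    height++;
  }
  assert(height > 0);
  assert(height <= kMaxHeight);
  return height;
}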
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return next_[n].load(std::memory_order_acquire);
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].store(x, std::memory_order_release);
  }
  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return next_[n].load(std::memory_order_relaxed);
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].store(x, std::memory_order_relaxed);
  }
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);
  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));
  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
    const Key& key, int height) {
  char* const node_memory = arena_->AllocateAligned(
      sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
  return new (node_memory) Node(key);
}
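NewNode over-allocates on purpose: Node declares a one-element next_ array, and the extra (height - 1) atomic pointers live in the arena bytes laid down immediately past the struct, which the placement new then adopts. Stripped of its accessors, the declaration in db/skiplist.h is essentially:

template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) {}
  Key const key;
 private:
  // Declared with length 1 but indexed up to height-1; the extra entries
  // occupy the memory AllocateAligned reserved past the end of the struct.
  std::atomic<Node*> next_[1];
};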
// Multiple threads can invoke const methods on a Slice without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Slice must use
// external synchronization.
Returning a Slice is a cheaper alternative to returning a std::string since we do not need to copy potentially large keys and values. In addition, leveldb methods do not return null-terminated C-style strings since leveldb keys and values are allowed to contain '\0' bytes.
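A minimal illustration of both points (the lifetime caveat and the embedded '\0' support); the names here are just for the example:

#include <string>
#include "leveldb/slice.h"

void SliceSketch() {
  // The backing string contains an embedded '\0' and is 13 bytes long.
  std::string backing("key\0with-zero", 13);
  leveldb::Slice s(backing.data(), backing.size());  // no copy: s points into backing
  // s.size() == 13, whereas strlen() would have stopped at the '\0'.
  // `backing` must outlive every use of `s`, and any thread calling a
  // non-const method on the Slice requires external synchronization.
}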
// Pointer-based variants of GetVarint...  These either store a value
// in *v and return a pointer just past the parsed value, or return
// nullptr on error.  These routines only look at bytes in the range
// [p..limit-1]
//
// p is the start of the bytes to decode, limit marks their end, and the
// decoded value is written through v.  As the signatures suggest, these are
// typically used to read values sequentially out of a single char* buffer,
// with p and limit pointing at different positions within it; the return
// value points one past the end of the parsed value, or is nullptr on error.
const char* GetVarint32Ptr(const char* p, const char* limit, uint32_t* v);
const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* v);
The implementation splits into two cases:
If the value is less than 128, the high bit of the first byte at p is 0, which, per the encoding described above, means no further bytes belong to this value.
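This fast path is handled inline in util/coding.h, roughly as below; everything else falls through to a multi-byte decoder:

inline const char* GetVarint32Ptr(const char* p, const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const uint8_t*>(p));
    if ((result & 128) == 0) {
      // High bit clear: the whole varint is this single byte.
      *value = result;
      return p + 1;
    }
  }
  // Multi-byte (or truncated) encodings take the slow path.
  return GetVarint32PtrFallback(p, limit, value);
}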
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    // As described above, merge the writers currently queued into one batch
    // and write them together.
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // Stamp the batch with the sequence number for this write.
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);
    {  // Note A
      // Add to log and apply to memtable.  We can release the lock
      // during this phase since &w is currently responsible for logging
      // and protects against concurrent loggers and concurrent writes
      // into mem_.
      mutex_.Unlock();  // release the lock guarding the writers queue
      // Append to the log file.
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        // If an immediate sync was requested, flush the data to disk.
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      // Apply to the memtable.
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    // Even though this was a single write, every entry in it consumed a
    // sequence number, so the recorded last sequence keeps increasing.
    versions_->SetLastSequence(last_sequence);
  }