class MemTable {
 public:
  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  explicit MemTable(const InternalKeyComparator& comparator);

  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

  // Returns an estimate of the number of bytes of data in use by this
  // data structure. It is safe to call when MemTable is being modified.
  size_t ApproximateMemoryUsage();

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/format.{h,cc} module.
  Iterator* NewIterator();

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value);

  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *status and return true.
  // Else, return false.
  bool Get(const LookupKey& key, std::string* value, Status* s);
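Put together, the interface above is used roughly like the following sketch. This is not code from the LevelDB source: internal_comparator stands in for an InternalKeyComparator the caller already owns, and the sequence number is chosen arbitrarily.

  MemTable* mem = new MemTable(internal_comparator);
  mem->Ref();                        // take the first reference; refs_ starts at zero

  // Insert "key1" -> "value1" at sequence number 1.
  mem->Add(/*seq=*/1, kTypeValue, Slice("key1"), Slice("value1"));

  std::string value;
  Status s;
  LookupKey lkey(Slice("key1"), /*sequence=*/1);
  if (mem->Get(lkey, &value, &s)) {
    // Found: value == "value1".  For a deletion entry, Get() also returns
    // true but stores a NotFound() status in s.
  }

  mem->Unref();                      // last reference dropped; the MemTable deletes itself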
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);  // make room for the write, on disk or in the memtable
  uint64_t last_sequence = versions_->LastSequence();    // fetch the last sequence number used from the current version set
  Writer* last_writer = &w;  // start by pointing last_writer at this writer; although &w is the head of the queue it is called
                             // "last" because BuildBatchGroup() below repoints it at the last writer merged into this batch
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);  // merge the data of every writer currently queued into one write
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);  // set the sequence for this batch; the sequence increases
                                                                      // with every write, which preserves write order and enables MVCC reads
    last_sequence += WriteBatchInternal::Count(write_batch);  // advance last_sequence by the number of entries in write_batch

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();  // release the queue lock so new writers can join writers_; since the current writer
                        // has not been popped from the head, they keep blocking in w.cv.Wait()
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));  // append the batch to the log file
      bool sync_error = false;
      if (status.ok() && options.sync) {  // the write succeeded; sync to disk if requested
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);  // apply the batch to the memtable
      }
      mutex_.Lock();  // reacquire the lock, pausing threads that are appending to writers_; note that this lock is
                      // finally released only when the method returns and the MutexLock destructor runs
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();  // clear tmp_batch_

    versions_->SetLastSequence(last_sequence);  // store the new last sequence back into the version set
  }
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // Wake the writers that were folded into this batch one by one.  Each of them resumes inside the wait
  // loop at the top of Write(): a writer that finds itself already done simply returns, otherwise it becomes
  // the new head and runs the code above.  This loop stops once the last writer of this batch has been popped.
  // Notify new head of write queue
  if (!writers_.empty()) {  // wake the next queued writer so it can start the next batch
    writers_.front()->cv.Signal();
  }
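The queue handling above is the classic group-commit pattern: the writer at the head of the queue acts as the leader, merges the payloads of everyone queued behind it, performs the slow I/O with the lock released, then marks the merged writers as done and wakes the next leader. Below is a minimal, self-contained sketch of the same pattern built on std::mutex and std::condition_variable instead of LevelDB's port layer; the names Writer, writers_ and CommitBatch only mirror the code above, none of it is taken from the LevelDB source, and it skips BuildBatchGroup's size limits.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

struct Writer {
  std::string payload;
  bool done = false;
  std::condition_variable cv;
};

class WriteQueue {
 public:
  void Write(std::string payload) {
    Writer w;
    w.payload = std::move(payload);

    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    // Followers wait until either their work was done by a leader or they reach the head.
    w.cv.wait(lock, [&] { return w.done || writers_.front() == &w; });
    if (w.done) return;  // a previous leader already committed this payload

    // This writer is the leader: merge every queued payload into one batch.
    std::vector<std::string> batch;
    Writer* last = nullptr;
    for (Writer* p : writers_) { batch.push_back(p->payload); last = p; }

    lock.unlock();
    CommitBatch(batch);  // the slow part (log + memtable in LevelDB) runs without the lock
    lock.lock();

    // Pop everything that was folded into the batch and wake the followers.
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) { ready->done = true; ready->cv.notify_one(); }
      if (ready == last) break;
    }
    if (!writers_.empty()) writers_.front()->cv.notify_one();  // hand off to the next leader
  }

 private:
  void CommitBatch(const std::vector<std::string>& batch) { /* write to log, apply to memtable, ... */ }

  std::mutex mu_;
  std::deque<Writer*> writers_;
};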
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();                 // the calling thread must already hold the lock
  assert(!writers_.empty());           // there is at least one pending writer
  bool allow_delay = !force;           // whether a short throttling delay (1ms) is allowed
  Status s;                            // status returned to the caller
  while (true) {
    if (!bg_error_.ok()) {             // bg_error_ is set by the background compaction of level-0 files
      // Yield previous error
      s = bg_error_;
      break;
      // 1. Delay is allowed (a normal write may wait; force == (updates == nullptr)) and level 0 has
      //    reached the slowdown threshold (8 or more files).
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();                  // release the lock first; there is no reason to block later writers while we sleep
      env_->SleepForMicroseconds(1000); // sleep for 1ms
      allow_delay = false;              // Do not delay a single write more than once
      mutex_.Lock();                    // reacquire the lock and get back to work
      // 2. The memtable still has room and level 0 is not over the hard limit, so return directly.
      //    write_buffer_size defaults to 4MB: once a memtable grows past roughly 4MB it has to be switched out.
    } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
      // 3. An immutable memtable is still being compacted; wait for that compaction to finish.
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
      // 4. There are too many level-0 files (default 12); wait for the background compaction to finish.
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // 5. Level 0 has fewer than 8 files, the memtable is full, and no compaction is running:
      //    switch to a new memtable and let the old one be compacted.
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();  // the file name (its number) also comes from the version set
      WritableFile* lfile = nullptr;
      // Create the writable log file; on failure the number can be reused.
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      // Release the current log writer.
      delete log_;
      // Close the current log file.
      s = logfile_->Close();
      if (!s.ok()) {
        // We may have lost some data written to the previous log file.
        // Switch to the new log file anyway, but record as a background
        // error so we do not attempt any more writes.
        //
        // We could perhaps attempt to save the memtable corresponding
        // to log file and suppress the error if that works, but that
        // would add more complexity in a critical code path.
        RecordBackgroundError(s);
      }
      // Release the old log file object.
      delete logfile_;
      // The file created above becomes the new log file.
      logfile_ = lfile;
      // Record its number.
      logfile_number_ = new_log_number;
      // Wrap the new file in a log writer.
      log_ = new log::Writer(lfile);
      // Move the current mem_ pointer to imm_: this memtable is now ready to be flushed to level 0.
      imm_ = mem_;
      // Mark has_imm_ as true; memory_order_release (discussed earlier) keeps this store from being reordered.
      has_imm_.store(true, std::memory_order_release);
      // Create a new MemTable with the current comparator.
      mem_ = new MemTable(internal_comparator_);
      // Take a reference on the new memtable.
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      // Try to schedule a background compaction.
      MaybeScheduleCompaction();
    }
  }
  return s;
}
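For reference, the thresholds compared against in branches 1, 2 and 4 come from the config constants in db/dbformat.h and from Options::write_buffer_size; the stock defaults match the numbers quoted in the comments above:

// Defaults as shipped in the stock LevelDB source (quoted here for reference, not redefined by this post).
namespace config {
static const int kL0_SlowdownWritesTrigger = 8;   // branch 1: start injecting 1ms delays
static const int kL0_StopWritesTrigger = 12;      // branch 4: block writes until compaction catches up
}  // namespace config
// Options::write_buffer_size defaults to 4 * 1024 * 1024 (4MB) -- branch 2's memtable size limit.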
inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  // If the request fits in the space remaining in the current block, carve it out directly.
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}
If the unused space left over from the previously allocated block is large enough for the current request, it is used directly; otherwise a new block is allocated:
char* Arena::AllocateFallback(size_t bytes) {
  // If the request is larger than a quarter of a block (kBlockSize is 4KB, so larger than 1KB),
  // give it a dedicated block of exactly the requested size.
  if (bytes > kBlockSize / 4) {
    // Object is more than a quarter of our block size.  Allocate it separately
    // to avoid wasting too much space in leftover bytes.
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // Otherwise (1KB or less) allocate a fresh 4KB block and serve the request from it.
  // We waste the remaining space in the current block.
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}
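A quick usage sketch (not taken from the LevelDB source) showing which path each request takes, assuming the default kBlockSize of 4096:

  Arena arena;
  char* small = arena.Allocate(100);    // empty arena: fallback allocates a fresh 4KB block, ~3996 bytes remain
  char* more  = arena.Allocate(200);    // fits in the leftover space of the same block, no new allocation
  char* big   = arena.Allocate(2048);   // > 1KB: gets its own 2048-byte block; the current block's leftover is kept
  size_t used = arena.MemoryUsage();    // total bytes reserved by the arena so far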