A write is first wrapped into a WriteBatch and then applied to the MemTable. To keep heavy write traffic from degrading read performance, leveldb throttles writes according to a set of policies. This article walks through the call path and the implementation details.
A DB write, i.e. DBImpl::Put, forwards to the default convenience implementation provided by the DB base class, which wraps the key/value pair in a WriteBatch:
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
WriteBatch is designed for batched writes: the default Put interface wraps even a single insert in a batch and hands it to DBImpl::Write, which performs the actual batched operation. A leveldb user can therefore call Write directly to build a custom batch of updates.
Looked at from another angle, WriteBatch also gives leveldb writes their transactional character: several related updates can be bound into one WriteBatch and applied as a single atomic update.
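As a quick illustration (the database path and key names below are invented for this example), the following sketch moves the value stored under one key to another key atomically, by bundling a Delete and a Put into one WriteBatch handed to Write:

#include <cassert>
#include <string>
#include "leveldb/db.h"
#include "leveldb/write_batch.h"

int main() {
  leveldb::DB* db = NULL;
  leveldb::Options options;
  options.create_if_missing = true;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());

  std::string value;
  s = db->Get(leveldb::ReadOptions(), "key1", &value);
  if (s.ok()) {
    leveldb::WriteBatch batch;
    batch.Delete("key1");       // both operations are bound to one batch...
    batch.Put("key2", value);
    s = db->Write(leveldb::WriteOptions(), &batch);  // ...and applied atomically
  }

  delete db;
  return 0;
}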
WriteBatch holds a string member rep_ and serializes the records into rep_ in a fixed format, shown below. Note that when the ValueType is kTypeDeletion, the record contains only the key.
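The layout, as described by the comment at the top of write_batch.cc in the upstream source, is:

WriteBatch::rep_ :=
   sequence: fixed64
   count: fixed32
   data: record[count]
record :=
   kTypeValue varstring key varstring value
   kTypeDeletion varstring key
varstring :=
   len: varint32
   data: uint8[len]

That is, an 8-byte sequence number and a 4-byte record count are followed by the serialized records, each record starting with its ValueType tag.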
Put then calls Write, which performs the actual write:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
Look first at the block that acquires mutex_ and enqueues the writer. This code handles concurrent writes from multiple threads: writes must be serialized, so after acquiring the lock the thread appends its Writer to the write queue and then wraps the blocking condition and the Wait() call in a while loop, the idiomatic pattern for pairing a mutex with a condition variable. When the thread at the front of the queue finishes a group commit it wakes the waiting threads; an awakened thread checks its own done flag, and if its write has already been completed on its behalf it simply returns the stored status.
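For context, the Writer that each thread pushes onto writers_ is a small bookkeeping struct defined in db_impl.cc; in the code base discussed here it looks roughly like this (member layout may differ slightly across releases):

// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;      // result handed back to the waiting thread
  WriteBatch* batch;  // the batch this thread wants written
  bool sync;          // whether the caller set WriteOptions::sync
  bool done;          // set by the group leader once this batch is applied
  port::CondVar cv;   // signalled when done, or when this writer reaches the front

  explicit Writer(port::Mutex* mu) : cv(mu) {}
};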
Now look at MakeRoomForWrite, an important function that implements the write-throttling policy:
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
The function first asserts that mutex_ is held and that the writer queue is not empty.
Under normal circumstances throttling the write is allowed (allow_delay starts out true); a NULL WriteBatch means force == true, i.e. the caller wants to force a MemTable switch and compaction check. The loop then examines the current DB state case by case (the relevant threshold constants are quoted after this list):
1> If allow_delay is true and the number of level-0 files has reached kL0_SlowdownWritesTrigger, slow the write down: unlock, sleep for 1 millisecond, then set allow_delay to false so that a single write is delayed at most once.
2> If the MemTable still has room, break out of the loop and write normally.
3> If the MemTable is full but the previous immutable memtable imm_ is still being compacted, wait for that compaction to finish.
4> If imm_ has already been compacted away (imm_ == NULL) but the number of level-0 files has reached kL0_StopWritesTrigger, writes must stop; wait for compaction to bring the file count down.
5> Otherwise the MemTable is full and imm_ is empty: create a new log file, switch the current MemTable to imm_, construct a fresh MemTable, set force to false (no need to force another switch), and call MaybeScheduleCompaction to trigger a compaction check.
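For reference, the two level-0 thresholds used above live in the config namespace of db/dbformat.h; the values below are taken from the upstream source and may differ in other versions:

// db/dbformat.h (excerpt)
namespace config {
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

// Soft limit on number of level-0 files.  We start slowing down writes
// at this point (the 1ms sleep in MakeRoomForWrite).
static const int kL0_SlowdownWritesTrigger = 8;

// Maximum number of level-0 files.  We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
}  // namespace config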
Back in the Write function, the actual write now proceeds:
1> BuildBatchGroup merges the batches of the queued writers into one group, with a size cap so that a single group commit does not grow too large (a simplified sketch of BuildBatchGroup is given at the end of this article).
2> The group's starting SequenceNumber is set to last_sequence + 1, and last_sequence is advanced by the number of records in the group.
3> The mutex is released so new write requests can be enqueued; the group is appended to the log via log_->AddRecord (followed by logfile_->Sync() if options.sync is set) and then applied to the MemTable through WriteBatchInternal::InsertInto.
4> The mutex is re-acquired and the version set's LastSequence is updated.
5> Writers are popped off the queue one by one: every Writer other than the current one gets its status and done flag set and is signalled, up to and including the last Writer absorbed by BuildBatchGroup; finally, if the queue is not empty, the new head of the queue is woken up.
With that, the current write thread's work is complete.
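Finally, here is a simplified sketch of BuildBatchGroup, condensed from db_impl.cc (the exact size limits and other details may vary across versions). The front writer's batch is merged with as many queued batches as fit under a size cap, and *last_writer is advanced to the last writer whose batch was absorbed into the group:

// Simplified sketch of DBImpl::BuildBatchGroup; see db_impl.cc for the real code.
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Cap the group at roughly 1MB, but let a small leading write grow only a
  // little so that small writes stay fast.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // skip "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      break;  // do not fold a sync write into a group led by a non-sync write
    }
    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        break;  // do not make the group too big
      }
      if (result == first->batch) {
        // Switch to tmp_batch_ so the caller's own batch is never modified.
        result = tmp_batch_;
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

This also explains the line `if (updates == tmp_batch_) tmp_batch_->Clear();` in Write: whenever more than one batch was merged, the group lives in the reusable tmp_batch_, which must be cleared before the next group commit.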