LevelDB源码分析之九:env

网友投稿 862 2022-12-01

LevelDB源码分析之九:env

LevelDB源码分析之九:env

虚基类env,在env_posix.cc中,派生类PosixEnv继承自env类,是LevelDB的默认实现。虚基类WritableFile、SequentialFile、RandomAccessFile,分别是文件的写抽象类,顺序读抽象类和随机读抽象类类Logger,log文件的写入接口,log文件是防止系统异常终止造成数据丢失,是memtable在磁盘的备份类FileLock,为文件上锁WriteStringToFile、ReadFileToString、Log三个全局函数,封装了上述接口

下面来看看env_posix.cc中为我们写好的默认实现

顺序读:

class PosixSequentialFile: public SequentialFile { private: std::string filename_; FILE* file_; public: PosixSequentialFile(const std::string& fname, FILE* f) : filename_(fname), file_(f) { } virtual ~PosixSequentialFile() { fclose(file_); } // 从文件中读取n个字节存放到 "scratch[0..n-1]", 然后将"scratch[0..n-1]"转化为Slice类型并存放到*result中 // 如果正确读取,则返回OK status,否则返回non-OK status virtual Status Read(size_t n, Slice* result, char* scratch) { Status s;#ifdef BSD // fread_unlocked doesn't exist on FreeBSD size_t r = fread(scratch, 1, n, file_);#else // size_t fread_unlocked(void *ptr, size_t size, size_t n,FILE *stream); // ptr:用于接收数据的内存地址 // size:要读的每个数据项的字节数,单位是字节 // n:要读n个数据项,每个数据项size个字节 // stream:输入流 // 返回值:返回实际读取的数据大小 // 因为函数名带了"_unlocked"后缀,所以它不是线程安全的 size_t r = fread_unlocked(scratch, 1, n, file_);#endif // Slice的第二个参数要用实际读到的数据大小,因为读到文件尾部,剩下的字节数可能小于n *result = Slice(scratch, r); if (r < n) { if (feof(file_)) { // We leave status as ok if we hit the end of the file // 如果r

这就是LevelDB从磁盘顺序读取文件的接口了,用的是C的流文件操作和FILE结构体。需要注意的是Read接口读取文件时不会锁住文件流,因此外部的并发访问需要自行提供并发控制。

随机读:

class PosixRandomAccessFile: public RandomAccessFile { private: std::string filename_; int fd_; mutable boost::mutex mu_; public: PosixRandomAccessFile(const std::string& fname, int fd) : filename_(fname), fd_(fd) { } virtual ~PosixRandomAccessFile() { close(fd_); } // 这里与顺序读的同名函数相比,多了一个参数offset,offset用来指定 // 读取位置距离文件起始位置的偏移量,这样就可以实现随机读了。 virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { Status s;#ifdef WIN32 // no pread on Windows so we emulate it with a mutex boost::unique_lock lock(mu_); if (::_lseeki64(fd_, offset, SEEK_SET) == -1L) { return Status::IOError(filename_, strerror(errno)); } // int _read(int _FileHandle, void * _DstBuf, unsigned int _MaxCharCount) // _FileHandle:文件描述符 // _DstBuf:保存读取数据的缓冲区 // _MaxCharCount:读取的字节数 // 返回值:成功返回读取的字节数,出错返回-1并设置errno。 int r = ::_read(fd_, scratch, n); *result = Slice(scratch, (r < 0) ? 0 : r); lock.unlock();#else // 在非windows系统上使用pread进行随机读,为何此时不用锁呢?详见下文分析 ssize_t r = pread(fd_, scratch, n, static_cast(offset)); *result = Slice(scratch, (r < 0) ? 0 : r);#endif if (r < 0) { // An error: return a non-ok status s = Status::IOError(filename_, strerror(errno)); } return s; }};

可以看到的是,PosixRandomAccessFile 在非windows系统上使用了 pread 来实现原子的定位加访问功能。常规的随机访问文件的过程可以分为两步,fseek (seek) 定位到访问点,调用 fread (read) 来从特定位置开始访问 FILE* (fd)。然而,这两个操作组合在一起并不是原子的,即 fseek 和 fread 之间可能会插入其他线程的文件操作。相比之下 pread 由系统来保证实现原子的定位和读取组合功能。需要注意的是,pread 操作不会更新文件指针。

需要注意的是,在随机读和顺序读中,分别用fd和FILE *来表示一个文件。文件描述符(file descriptor)是系统层的概念, fd 对应于系统打开文件表里面的一个文件;FILE* 是应用层的概念,其包含了应用层操作文件的数据结构。

顺序写:

class BoostFile : public WritableFile {public: explicit BoostFile(std::string path) : path_(path), written_(0) { Open(); } virtual ~BoostFile() { Close(); }private: void Open() { // we truncate the file as implemented in env_posix // trunc:先将文件中原有的内容清空 // out:为输出(写)而打开文件 // binary:以二进制方式打开文件 file_.open(path_.generic_string().c_str(), std::ios_base::trunc | std::ios_base::out | std::ios_base::binary); written_ = 0; }public: virtual Status Append(const Slice& data) { Status result; file_.write(data.data(), data.size()); if (!file_.good()) { result = Status::IOError( path_.generic_string() + " Append", "cannot write"); } return result; } virtual Status Close() { Status result; try { if (file_.is_open()) { Sync(); // 关闭流时,缓冲区中的数据会自动写入到文件 // 上面调用Sync()强制刷新,是为了确保数据写入,防止数据丢失 file_.close(); } } catch (const std::exception & e) { result = Status::IOError(path_.generic_string() + " close", e.what()); } return result; } virtual Status Flush() { file_.flush(); return Status::OK(); } // 手动刷新(清空输出缓冲区,并把缓冲区内容同步到文件) virtual Status Sync() { Status result; try { Flush(); } catch (const std::exception & e) { result = Status::IOError(path_.string() + " sync", e.what()); } return result; }private: boost::filesystem::path path_; boost::uint64_t written_; std::ofstream file_;};

关于ofstream::flush和ofstream::Close的区别,详见:​​C++之ofstream::flush与ofstream::close​​

文件锁:

class BoostFileLock : public FileLock { public: boost::interprocess::file_lock fl_;};

virtual Status LockFile(const std::string& fname, FileLock** lock) { *lock = NULL; Status result; try { if (!boost::filesystem::exists(fname)) { std::ofstream of(fname, std::ios_base::trunc | std::ios_base::out); } assert(boost::filesystem::exists(fname)); boost::interprocess::file_lock fl(fname.c_str()); BoostFileLock * my_lock = new BoostFileLock(); my_lock->fl_ = std::move(fl); if (my_lock->fl_.try_lock()) *lock = my_lock; else result = Status::IOError("acquiring lock " + fname + " failed"); } catch (const std::exception & e) { result = Status::IOError("lock " + fname, e.what()); } return result; }

virtual Status UnlockFile(FileLock* lock) { Status result; try { BoostFileLock * my_lock = static_cast(lock); my_lock->fl_.unlock(); delete my_lock; } catch (const std::exception & e) { result = Status::IOError("unlock", e.what()); } return result; }

文件的锁操作是调用Boost的锁实现的。加锁是为了防止多进程的并发冲突,如果加锁失败,*lock=NULL,且返回non-OK;如果加锁成功,*lock存放的的是锁的指针,并返回OK。如果进程退出,锁会自动释放,否则用户需要调用UnlockFile显式的释放锁。

std::move是C++11标准库在中提供的一个有用的函数,这个函数的名字具有迷惑性,因为实际上std::move并不能移动任何东西,它唯一的功能是将一个左值强制转化为右值引用,继而我们可以通过右值引用使用该值,以用于移动语义。从实现上讲,std::move基本等同于一个类型转换:static_cast(lvalue);值得一提的是,被转化的左值,其生命期并没有随着左右值的转化而改变。如果读者期望std::move转化的左值变量lvalue能立即被析构,那么肯定会失望了。左值与右值这两概念是从c中传承而来的,在c中,左值指的是既能够出现在等号左边也能出现在等号右边的变量(或表达式),右值指的则是只能出现在等号右边的变量(或表达式)。

计划任务:

compaction线程。compaction就是压缩合并的意思,在​​LevelDB源码分析之六:skiplist(2)​​中也有提到。对于LevelDB来说,写入记录操作很简单,删除记录仅仅写入一个删除标记就算完事,但是读取记录比较复杂,需要在内存以及各个层级文件中依照新鲜程度依次查找,代价很高。为了加快读取速度,LevelDB采取了compaction的方式来对已有的记录进行整理压缩,通过这种方式,来删除掉一些不再有效的KV数据,减小数据规模,减少文件数量等。

PosixEnv中定义了一个任务队列:

struct BGItem { void* arg; void (*function)(void*); }; //用的是deque双端队列作为底层的数据结构 typedef std::deque BGQueue; BGQueue queue_;

主线程一旦判定需要进行compaction操作,就把compaction任务压进队列queue_中,BGItem是存有任务函数和db对象指针的结构。而后台线程从一开始就不断根据队列中的函数指针执行compaction任务。BGThread()函数就是不停的在queue_中取出函数指针,执行。

后台进程一直执行queue_中的任务,由于queue_是动态的,自然需要考虑queue_空了怎么办,LevelDB采用的是条件变量boost::condition_variable bgsignal_,队列空了就进入等待,直至有新的任务加入进来。而条件变量一般是要和boost::mutex mu_搭配使用,防止某些逻辑错误。

// BGThread函数的包装,里面调用的就是BGThread函数 static void* BGThreadWrapper(void* arg) { reinterpret_cast(arg)->BGThread(); return NULL; }

void PosixEnv::Schedule(void (*function)(void*), void* arg) { boost::unique_lock lock(mu_); // Start background thread if necessary if (!bgthread_) { bgthread_.reset( new boost::thread(boost::bind(&PosixEnv::BGThreadWrapper, this))); } // Add to priority queue // 将任务压进队列中 queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; lock.unlock(); bgsignal_.notify_one();}

void PosixEnv::BGThread() { while (true) { // 加锁,防止并发冲突 boost::unique_lock lock(mu_); // 如果队列为空,等待,直到收到通知(notification) while (queue_.empty()) { bgsignal_.wait(lock); } // 从队列头取出任务的函数及其参数 void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); lock.unlock(); // 调用函数 (*function)(arg); }}

此外PosixEnv中还有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函数,他们见名知义,都是调用Boot的相应函数实现的。

EnvWrapper:

在levelDB中还实现了一个EnvWrapper类,该类继承自Env,且只有一个成员函数Env* target_,该类的所有变量都调用Env类相应的成员变量,我们知道,Env是一个抽象类,是不能定义Env 类型的对象的。我们传给EnvWrapper 的构造函数的类型是PosixEnv,所以,最后调用的都是PosixEnv类的成员变量,你可能已经猜到了,这就是设计模式中的代理模式,EnvWrapper只是进行了简单的封装,它的代理了Env的子类PosixEnv。 EnvWrapper和Env与PosixEnv的关系如下:

由于篇幅限制,Env中的Logger类就放在后面分析了,参考:​​LevelDB源码分析之十:LOG文件​​,从env给我的收获就是:

利用虚基类的特性提供了默认的实现,也开放了用户自定义操作的权限面向对象编程范式的学习,把一切操作定义成类文件的加锁解锁,线程的同步C的文件流操作,对文件名的字符提取操作,创建、删除文件和路径,这些都可以直接用到将来自己的项目中

参考链接:​​http://360doc.com/content/14/0325/16/15064667_363619343.shtml​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MQTT 服务器(broker)
下一篇:LevelDB源码分析之十二:block
相关文章

 发表评论

暂时没有评论,来抢沙发吧~