Rocksdb 的 MergeOperator 简单使用记录

news/2024/7/3 17:09:54

本篇仅仅是一个记录 MergeOperator 的使用方式。
Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。
而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划算,所以Merge 操作横空出世,用户只需要实现自己的 Merge 操作,通过option 传入,接下来有update 的场景时只需要调用一个Merge 就可以完成了,后续针对当前key的 real update 都会在后台Compaction 以及 用户调研 Get 或者 迭代器操作 时会进行合并。当然,Merge本身也存在问题,就是如果kMergeType得不到及时得compaction 调度,那可能读得负载就重了,因为读需要将之前的未Merge 都进行Merge 才能返回。

因为MergeOperator虚基类 的函数太多了,会区分 full merge 和 partial merge,但是对于很多用户来说就是一个计数累加或者 string-append 操作,并没有过于复杂的操作,所以rocksdb 提供了 更为通用的虚基类AssociativeMergeOperator来屏蔽复杂的Full merge 和 partial merge,继承这个类则只需要主体实现一个MergeName函数即可。

如下代码使用了Rocksdb 已经封装好的两个 Merge操作,一个是StringAppendOperator ,另一个是UInt64AddOperatorMerge本身就是写一个key/value,只不过key的type是kMergeType,value 也是实际存在的。

StringAppendOperator

StringAppend的简单测试代码如下,我们使用同一个key进行两次Merge操作,相当于写入了两条kMergeType的key到db中,然后调用一次Flush,会生成一个sst文件(进行Merge),再Get会发现这个key的value 按照StringAppend中的行为完成了Merge。

#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));// 默认会用 逗号分隔 不同的mergeoption.merge_operator = MergeOperators::CreateStringAppendOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled :  " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;EncodeFixed64(buf, 2);s = db->Merge(rocksdb::WriteOptions(),key, "2");s = db->Merge(rocksdb::WriteOptions(),key, "3");db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << value << endl;
}int main() {OpenDB();DoWrite();return 0;
}

输出如下:

Finish open !
Finish merge !
Get merge value len: 3 data: 2,3

可以看到Get到的value 已经进行合并了。

UInt64AddOperator

这个是一个自增计数的Merge 案例。
需要主要的是如果自己实现 MergeOperator底层有编解码,那上层用户侧请求的写入也需要 编码方式写入 以及 按照底层的解码方式读取。
Rocksdb实现的案例代码在拿到用户传入的value的时候会进行编解码:

// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator {public:bool Merge(const Slice& /*key*/, const Slice* existing_value,const Slice& value, std::string* new_value,Logger* logger) const override {uint64_t orig_value = 0;if (existing_value){// 解码以存在的value,则我们上层调用Merge 写入的时候需要按照Fixed64进行编码orig_value = DecodeInteger(*existing_value, logger);}uint64_t operand = DecodeInteger(value, logger);assert(new_value);new_value->clear();PutFixed64(new_value, orig_value + operand);return true;  // Return true always since corruption will be treated as 0}const char* Name() const override { return "UInt64AddOperator"; }private:// Takes the string and decodes it into a uint64_t// On error, prints a message and returns 0uint64_t DecodeInteger(const Slice& value, Logger* logger) const {uint64_t result = 0;if (value.size() == sizeof(uint64_t)) {result = DecodeFixed64(value.data());} else if (logger != nullptr) {// If value is corrupted, treat it as 0ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt" > %" ROCKSDB_PRIszt,value.size(), sizeof(uint64_t));}return result;}};

案例代码:

#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;static bool LittleEndian() {int i = 1;return *((char*)(&i));
}inline uint32_t DecodeFixed32(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint32_t result;memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain loadreturn result;} else {return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));}
}inline uint64_t DecodeFixed64(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint64_t result;memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain loadreturn result;} else {uint64_t lo = DecodeFixed32(ptr);uint64_t hi = DecodeFixed32(ptr + 4);return (hi << 32) | lo;}
}inline void EncodeFixed64(char* buf, uint64_t value) {if (LittleEndian()) {memcpy(buf, &value, sizeof(value));} else {buf[0] = value & 0xff;buf[1] = (value >> 8) & 0xff;buf[2] = (value >> 16) & 0xff;buf[3] = (value >> 24) & 0xff;buf[4] = (value >> 32) & 0xff;buf[5] = (value >> 40) & 0xff;buf[6] = (value >> 48) & 0xff;buf[7] = (value >> 56) & 0xff;}
}void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));option.merge_operator = MergeOperators::CreateUInt64AddOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled :  " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;// 因为底层实现的Uint64AddOperator 会进行编码 以及 解码EncodeFixed64(buf, 2);// 对同一个key ,merge 两个2,则最后Get的时候会变成4s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}cout << "Finish merge !" << endl;s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << DecodeFixed64(value.data()) << endl;
}int main() {OpenDB();DoWrite();return 0;
}

输出如下:

Finish open !
Finish merge !
Get merge value 8 4

http://lihuaxi.xjx100.cn/news/219017.html

相关文章

【408篇】C语言笔记-第九章(数据结构概述)

文章目录第一节&#xff1a;逻辑结构与存储结构1. 逻辑结构2. 存储结构1. 顺序存储2. 链式存储3. 顺序存储与链式存储分析第二节&#xff1a;算法的评价&#xff08;时间复杂度与空间复杂度&#xff09;1. 算法定义2. 时间复杂度3. 空间复杂度第一节&#xff1a;逻辑结构与存储…

PPP和PDP激活是什么区别

From: http://www.mscbsc.com/askpro/question.php?qid16261 ppp相当于链路层协议 socket套接字&#xff0c;对tcp/ip协议的封装、应用 gprs上网首先要设置pdp&#xff0c;接着建立ppp连接&#xff0c;ppp连接建立后&#xff0c;就可以进行tcp/ip传输了&#xff0c; 要进行t…

关于std::string 在 并发场景下 __grow_by_and_replace free was not allocated 的异常问题

使用string时发现了一些坑。 我们知道stl 容器并不是线程安全的&#xff0c;所以在使用它们的过程中往往需要一些同步机制来保证并发场景下的同步更新。 应该踩的坑还是一个不拉的踩了进去&#xff0c;所以还是记录一下吧。 string作为一个容器&#xff0c;随着我们的append 或…

在Blender中创建真实的汽车CGI视觉动画效果

Blender VFX Tutorial Rig & Animate a Realistic Car in Real 大小&#xff1a;1.18G 时长1h 包含项目文件 1280X720 MP4 语言&#xff1a;英语中英文字幕&#xff08;根据原英文字幕机译更准确&#xff09; Blender VFX教程绑定&动画真实的汽车 云桥网络 平台获取教程…

子网掩码

子网掩码 from: http://baike.baidu.com/view/878.htm 子网掩码(subnet mask) 又叫网络掩码、地址掩码、子网络遮罩&#xff0c;它是一种用来指明一个IP地址 的哪些位标识的是主机所在的子网以及哪些位标识的是主机的位掩码。子网掩码不能单独存在&#xff0c;它必须结合…

关于 智能指针 的线程安全问题

先说结论&#xff0c;智能指针都是非线程安全的。 多线程调度智能指针 这里案例使用的是shared_ptr&#xff0c;其他的unique_ptr或者weak_ptr的结果都是类似的&#xff0c;如下多线程调度代码&#xff1a; #include <memory> #include <thread> #include <v…

C++中 public,protected, private 访问标号小结

第一&#xff1a;private, public, protected 访问标号的访问范围。 private&#xff1a; 只能由1.该类中的函数、2.其友元函数访问。 不能被任何其他访问&#xff0c;该类的对象也不能访问。 protected&#xff1a; 可以被1.该类中的函数、2.子类的函数、以及3.其友元函数…

做片子留着备用 超级游戏影视配乐音效库36套合集

做片子留着备用 超级游戏影视配乐音效库36套合集 Epic Stock Media 创造数字媒体产品,改变你听到和看到的音频方式在游戏、电影、电视、演出和世界上任何地方的人们消费媒体。 影视配乐音效素材大全 百度一下 云桥网络 平台huo取 教程&#xff01; 所有采样均为&#xff1a;…