//
// Created by Ziyang Hu on 2022/7/3.
//
#include <iostream>
#include <memory>
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
// Builds the baseline RocksDB `Options` that the open_* functions in this file
// start from before applying caller-supplied `DbOpts`.
//
// Returns a freshly allocated `Options` with compression, compaction, sync and
// block-based-table settings filled in; ownership passes to the caller.
unique_ptr<Options> default_db_options() {
    auto options = make_unique<Options>();

    // Compression: ZSTD for the bottommost level, LZ4 for the others.
    options->bottommost_compression = kZSTD;
    options->compression = kLZ4Compression;

    // Compaction / flush behaviour.
    options->level_compaction_dynamic_level_bytes = true;
    options->max_background_compactions = 4;
    options->max_background_flushes = 2;
    options->bytes_per_sync = 1048576;  // 1 MiB
    options->compaction_pri = kMinOverlappingRatio;

    // Block-based table layout.
    BlockBasedTableOptions table_options;
    table_options.block_size = 16 * 1024;  // 16 KiB
    table_options.cache_index_and_filter_blocks = true;
    table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    table_options.format_version = 5;

    // `table_factory` takes ownership of the factory returned by RocksDB.
    options->table_factory.reset(NewBlockBasedTableFactory(table_options));
    return options;
}
// Opens a plain (non-transactional) RocksDB database and wraps it in a
// `RawRocksDbBridge`.
//
// Parameters:
//   opts     - caller-supplied settings layered on top of default_db_options().
//   status   - out-parameter; receives the result of `DB::Open` via write_status().
//   use_cmp  - when true, a `RustComparator` built from `opts` and `cmp_impl` is
//              installed as the database comparator.
//   cmp_impl - comparison callback handed to `RustComparator`; only used when
//              `use_cmp` is true.
//   no_wal   - when true, the bridge's write options are created with
//              `disableWAL = true`.
//
// Returns the bridge, which owns the options, the comparator (if any) and the
// database handle. The bridge is returned regardless of what `DB::Open`
// reported, so callers must inspect `status` before using it.
shared_ptr<RawRocksDbBridge>
open_raw_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl, bool no_wal) {
    auto options = default_db_options();

    if (opts.prepare_for_bulk_load) {
        options->PrepareForBulkLoad();
    }
    if (opts.increase_parallelism > 0) {
        options->IncreaseParallelism(opts.increase_parallelism);
    }
    if (opts.optimize_level_style_compaction) {
        options->OptimizeLevelStyleCompaction();
    }
    options->create_if_missing = opts.create_if_missing;
    options->paranoid_checks = opts.paranoid_checks;

    if (opts.enable_blob_files) {
        options->enable_blob_files = true;
        options->min_blob_size = opts.min_blob_size;
        options->blob_file_size = opts.blob_file_size;
        options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
    }

    if (opts.use_bloom_filter) {
        // NOTE(review): this installs a table factory built from a fresh
        // BlockBasedTableOptions, replacing the one set up by
        // default_db_options(); the block-size / cache / format_version values
        // chosen there are therefore not carried over. Confirm this is intended.
        BlockBasedTableOptions table_options;
        table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
        table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
        options->table_factory.reset(NewBlockBasedTableFactory(table_options));
    }

    // If both extractor flags are set, the fixed-prefix extractor wins because
    // it is assigned last.
    if (opts.use_capped_prefix_extractor) {
        options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
    }
    if (opts.use_fixed_prefix_extractor) {
        options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
    }

    // The comparator is owned from the moment it is created so that it cannot
    // leak if a later allocation throws. `options` only borrows the pointer.
    unique_ptr<RustComparator> comparator;
    if (use_cmp) {
        comparator = std::make_unique<RustComparator>(
                string(opts.comparator_name),
                opts.comparator_different_bytes_can_be_equal,
                cmp_impl);
        options->comparator = comparator.get();
    }

    auto db = std::make_shared<RawRocksDbBridge>();
    db->options = std::move(options);
    db->db_path = string(opts.db_path);
    // Hand the comparator over to the bridge so it lives as long as the options
    // that point at it.
    db->comparator.reset(comparator.release());

    DB *db_ptr = nullptr;
    write_status(DB::Open(*db->options, db->db_path, &db_ptr), status);
    db->db.reset(db_ptr);
    db->destroy_on_exit = opts.destroy_on_exit;

    db->r_opts = std::make_unique<ReadOptions>();
    db->w_opts = std::make_unique<WriteOptions>();
    if (no_wal) {
        db->w_opts->disableWAL = true;
    }
    return db;
}
2 years ago
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) {
auto options = default_db_options();
2 years ago
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
}
if (opts.increase_parallelism > 0) {
options->IncreaseParallelism(opts.increase_parallelism);
}
if (opts.optimize_level_style_compaction) {
options->OptimizeLevelStyleCompaction();
}
options->create_if_missing = opts.create_if_missing;
options->paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options->enable_blob_files = true;
options->min_blob_size = opts.min_blob_size;
options->blob_file_size = opts.blob_file_size;
options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.use_capped_prefix_extractor) {
options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.use_fixed_prefix_extractor) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
2 years ago
if (use_cmp) {
2 years ago
cmp = new RustComparator(
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
2 years ago
cmp_impl);
2 years ago
options->comparator = cmp;
}
2 years ago
shared_ptr<RocksDbBridge> db_wrapper = shared_ptr<RocksDbBridge>(nullptr);
2 years ago
if (opts.optimistic) {
auto db = new OptimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->comparator.reset(cmp);
OptimisticTransactionDB *txn_db = nullptr;
write_status(OptimisticTransactionDB::Open(*db->options, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
} else {
auto db = new PessimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->tdb_opts = make_unique<TransactionDBOptions>();
db->comparator.reset(cmp);
TransactionDB *txn_db = nullptr;
write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
}
return db_wrapper;
}
// When `destroy_on_exit` is set and a database handle is present, closes the
// database, releases the handle, and then removes the database at `db_path`
// with `DestroyDB`. Failures from either step are reported on stderr only.
PessimisticRocksDb::~PessimisticRocksDb() {
    if (destroy_on_exit && (db != nullptr)) {
        cerr << "destroying database on exit: " << db_path << endl;
        auto status = db->Close();
        if (!status.ok()) {
            cerr << status.ToString() << endl;
        }
        // Release the handle before DestroyDB is called on the same path.
        db.reset();
        auto status2 = DestroyDB(db_path, *options);
        if (!status2.ok()) {
            cerr << status2.ToString() << endl;
        }
    }
}
// When `destroy_on_exit` is set and a database handle is present, closes the
// database, releases the handle, and then removes the database at `db_path`
// with `DestroyDB`. Failures from either step are reported on stderr only.
//
// The null check matters because open_db() stores whatever pointer the `Open`
// call produced (initialised to nullptr), so `db` may hold no handle here.
OptimisticRocksDb::~OptimisticRocksDb() {
    if (destroy_on_exit && (db != nullptr)) {
        cerr << "destroying database on exit: " << db_path << endl;
        auto status = db->Close();
        if (!status.ok()) {
            cerr << status.ToString() << endl;
        }
        // Release the handle before DestroyDB is called on the same path.
        db.reset();
        auto status2 = DestroyDB(db_path, *options);
        if (!status2.ok()) {
            // Report the DestroyDB failure itself, not the earlier Close() status.
            cerr << status2.ToString() << endl;
        }
    }
}
// Range deletion is rejected on the optimistic bridge: both range arguments are
// ignored and `status` is set to `kInvalidArgument` with an explanatory message.
void OptimisticRocksDb::del_range(RustBytes, RustBytes, RocksDbStatus &status) const {
    status.code = StatusCode::kInvalidArgument;
    status.message = rust::String("cannot call 'del_range' on optimistic db");
}