make db impl simpler

main
Ziyang Hu 2 years ago
parent f619e8f860
commit 07d25e4c7e

@ -70,35 +70,24 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, boo
}
shared_ptr<RocksDbBridge> db_wrapper = shared_ptr<RocksDbBridge>(nullptr);
if (opts.optimistic) {
auto db = new OptimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->comparator.reset(cmp);
OptimisticTransactionDB *txn_db = nullptr;
write_status(OptimisticTransactionDB::Open(*db->options, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
} else {
auto db = new PessimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->tdb_opts = make_unique<TransactionDBOptions>();
db->comparator.reset(cmp);
auto db = new RocksDbBridge();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->tdb_opts = make_unique<TransactionDBOptions>();
db->comparator.reset(cmp);
TransactionDB *txn_db = nullptr;
write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
TransactionDB *txn_db = nullptr;
write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
}
return db_wrapper;
}
PessimisticRocksDb::~PessimisticRocksDb() {
RocksDbBridge::~RocksDbBridge() {
if (destroy_on_exit && (db != nullptr)) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
@ -112,23 +101,3 @@ PessimisticRocksDb::~PessimisticRocksDb() {
}
}
}
OptimisticRocksDb::~OptimisticRocksDb() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
if (!status2.ok()) {
cerr << status.ToString() << endl;
}
}
}
void OptimisticRocksDb::del_range(RustBytes, RustBytes, RocksDbStatus &status) const {
status.code = StatusCode::kInvalidArgument;
status.message = rust::String("cannot call 'del_range' on optimistic db");
}

@ -43,18 +43,12 @@ struct SstFileWriterBridge {
struct RocksDbBridge {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
unique_ptr<TransactionDBOptions> tdb_opts;
unique_ptr<TransactionDB> db;
bool destroy_on_exit;
string db_path;
[[nodiscard]] virtual unique_ptr<TxBridge> transact() const = 0;
virtual void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0;
virtual void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0;
[[nodiscard]] virtual DB *get_base_db() const = 0;
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, RocksDbStatus &status) const {
DB *db_ = get_base_db();
Options options_ = db_->GetOptions();
@ -75,44 +69,14 @@ struct RocksDbBridge {
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
};
struct OptimisticRocksDb : public RocksDbBridge {
unique_ptr<OptimisticTransactionDB> db;
[[nodiscard]] inline unique_ptr<TxBridge> transact() const override {
auto ret = make_unique<TxBridge>(&*this->db);
ret->o_tx_opts->cmp = &*comparator;
return ret;
}
void del_range(RustBytes, RustBytes, RocksDbStatus &status) const override;
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override {
CompactRangeOptions options;
auto start_s = convert_slice(start);
auto end_s = convert_slice(end);
auto s = db->CompactRange(options, &start_s, &end_s);
write_status(s, status);
}
DB *get_base_db() const override {
return db->GetBaseDB();
}
virtual ~OptimisticRocksDb();
};
struct PessimisticRocksDb : public RocksDbBridge {
unique_ptr<TransactionDBOptions> tdb_opts;
unique_ptr<TransactionDB> db;
[[nodiscard]] inline unique_ptr<TxBridge> transact() const override {
[[nodiscard]] inline unique_ptr<TxBridge> transact() const {
auto ret = make_unique<TxBridge>(&*this->db);
return ret;
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override {
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
WriteBatch batch;
auto s = batch.DeleteRange(db->DefaultColumnFamily(), convert_slice(start), convert_slice(end));
if (!s.ok()) {
@ -127,7 +91,7 @@ struct PessimisticRocksDb : public RocksDbBridge {
write_status(s2, status);
}
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override {
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
CompactRangeOptions options;
auto start_s = convert_slice(start);
auto end_s = convert_slice(end);
@ -135,11 +99,11 @@ struct PessimisticRocksDb : public RocksDbBridge {
write_status(s, status);
}
DB *get_base_db() const override {
DB *get_base_db() const {
return db->GetBaseDB();
}
virtual ~PessimisticRocksDb();
~RocksDbBridge();
};
//typedef int8_t (*CmpFn)(RustBytes a, RustBytes b);

Loading…
Cancel
Save