From 94a59726478ba8c767e8dab390d9af205c484a96 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 26 Aug 2022 12:30:47 +0800 Subject: [PATCH] sst file ingestion --- cozorocks/bridge/db.h | 46 +++++++++++++++++++++++++++++++++++++ cozorocks/src/bridge/db.rs | 35 ++++++++++++++++++++++++++++ cozorocks/src/bridge/mod.rs | 5 ++++ 3 files changed, 86 insertions(+) diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index 7a0cd60f..dd12dbba 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -99,6 +99,22 @@ struct RawRocksDbBridge { } }; +struct SstFileWriterBridge { + SstFileWriter inner; + + SstFileWriterBridge(EnvOptions eopts, Options opts) : inner(eopts, opts) { + } + + inline void finish(RocksDbStatus &status) { + write_status(inner.Finish(), status); + } + + inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) { + write_status(inner.Put(convert_slice(key), convert_slice(val)), status); + } + +}; + struct RocksDbBridge { unique_ptr comparator; unique_ptr options; @@ -108,8 +124,29 @@ struct RocksDbBridge { [[nodiscard]] virtual unique_ptr transact() const = 0; virtual void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0; + virtual void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0; + [[nodiscard]] virtual DB *get_base_db() const = 0; + + + inline unique_ptr get_sst_writer(rust::Str path, RocksDbStatus &status) const { + DB *db_ = get_base_db(); + Options options_ = db_->GetOptions(); + auto sst_file_writer = std::make_unique(EnvOptions(), options_); + string path_(path); + + write_status(sst_file_writer->inner.Open(path_), status); + return sst_file_writer; + } + + inline void ingest_sst(rust::Str path, RocksDbStatus &status) const { + IngestExternalFileOptions ifo; + DB *db_ = get_base_db(); + string path_(path); + write_status(db_->IngestExternalFile({std::move(path_)}, ifo), status); + } + [[nodiscard]] inline const string &get_db_path() const { return db_path; } @@ -125,6 +162,7 @@ struct OptimisticRocksDb : public RocksDbBridge { } void del_range(RustBytes, RustBytes, RocksDbStatus &status) const override; + void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override { CompactRangeOptions options; auto start_s = convert_slice(start); @@ -133,6 +171,10 @@ struct OptimisticRocksDb : public RocksDbBridge { write_status(s, status); } + DB *get_base_db() const override { + return db->GetBaseDB(); + } + virtual ~OptimisticRocksDb(); }; @@ -168,6 +210,10 @@ struct PessimisticRocksDb : public RocksDbBridge { write_status(s, status); } + DB *get_base_db() const override { + return db->GetBaseDB(); + } + virtual ~PessimisticRocksDb(); }; diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index bd6417bd..4ef9f8ac 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -276,6 +276,41 @@ impl RocksDb { Err(status) } } + pub fn get_sst_writer(&self, path: &str) -> Result { + let mut status = RocksDbStatus::default(); + let ret = self.inner.get_sst_writer(path, &mut status); + if status.is_ok() { + Ok(SstWriter { inner: ret }) + } else { + Err(status) + } + } + pub fn ingest_sst_file(&self, path: &str) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.ingest_sst(path, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } +} + +pub struct SstWriter { + inner: UniquePtr, +} + +impl SstWriter { + #[inline] + pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { + let mut status = RocksDbStatus::default(); + self.inner.pin_mut().put(key, val, &mut status); + if status.is_ok() { + Ok(()) + } else { + Err(status) + } + } } unsafe impl Send for RocksDb {} diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index e2eb32b8..71231789 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -157,6 +157,11 @@ pub(crate) mod ffi { upper: &[u8], status: &mut RocksDbStatus, ); + fn get_sst_writer(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus) -> UniquePtr; + fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus); + + type SstFileWriterBridge; + fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus); type TxBridge; // fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>;