sst file ingestion

main
Ziyang Hu 2 years ago
parent cd7d8ddbdc
commit 94a5972647

@ -99,6 +99,22 @@ struct RawRocksDbBridge {
}
};
struct SstFileWriterBridge {
SstFileWriter inner;
SstFileWriterBridge(EnvOptions eopts, Options opts) : inner(eopts, opts) {
}
inline void finish(RocksDbStatus &status) {
write_status(inner.Finish(), status);
}
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
write_status(inner.Put(convert_slice(key), convert_slice(val)), status);
}
};
struct RocksDbBridge {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
@ -108,8 +124,29 @@ struct RocksDbBridge {
[[nodiscard]] virtual unique_ptr<TxBridge> transact() const = 0;
virtual void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0;
virtual void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const = 0;
[[nodiscard]] virtual DB *get_base_db() const = 0;
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, RocksDbStatus &status) const {
DB *db_ = get_base_db();
Options options_ = db_->GetOptions();
auto sst_file_writer = std::make_unique<SstFileWriterBridge>(EnvOptions(), options_);
string path_(path);
write_status(sst_file_writer->inner.Open(path_), status);
return sst_file_writer;
}
inline void ingest_sst(rust::Str path, RocksDbStatus &status) const {
IngestExternalFileOptions ifo;
DB *db_ = get_base_db();
string path_(path);
write_status(db_->IngestExternalFile({std::move(path_)}, ifo), status);
}
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
@ -125,6 +162,7 @@ struct OptimisticRocksDb : public RocksDbBridge {
}
void del_range(RustBytes, RustBytes, RocksDbStatus &status) const override;
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const override {
CompactRangeOptions options;
auto start_s = convert_slice(start);
@ -133,6 +171,10 @@ struct OptimisticRocksDb : public RocksDbBridge {
write_status(s, status);
}
DB *get_base_db() const override {
return db->GetBaseDB();
}
virtual ~OptimisticRocksDb();
};
@ -168,6 +210,10 @@ struct PessimisticRocksDb : public RocksDbBridge {
write_status(s, status);
}
DB *get_base_db() const override {
return db->GetBaseDB();
}
virtual ~PessimisticRocksDb();
};

@ -276,6 +276,41 @@ impl RocksDb {
Err(status)
}
}
pub fn get_sst_writer(&self, path: &str) -> Result<SstWriter, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get_sst_writer(path, &mut status);
if status.is_ok() {
Ok(SstWriter { inner: ret })
} else {
Err(status)
}
}
pub fn ingest_sst_file(&self, path: &str) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.ingest_sst(path, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
}
pub struct SstWriter {
inner: UniquePtr<SstFileWriterBridge>,
}
impl SstWriter {
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
}
unsafe impl Send for RocksDb {}

@ -157,6 +157,11 @@ pub(crate) mod ffi {
upper: &[u8],
status: &mut RocksDbStatus,
);
fn get_sst_writer(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus) -> UniquePtr<SstFileWriterBridge>;
fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus);
type SstFileWriterBridge;
fn put(self: Pin<&mut SstFileWriterBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
type TxBridge;
// fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>;

Loading…
Cancel
Save