From 6dbbec65d31342342b2a6cc2349b28dddf1610dd Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 15 Apr 2022 22:59:39 +0800 Subject: [PATCH] write and delete --- cozo-rocks/include/cozorocks.h | 79 ++++++++++--- cozo-rocks/src/lib.rs | 207 ++++++++++++++++++--------------- src/storage.rs | 5 +- 3 files changed, 175 insertions(+), 116 deletions(-) diff --git a/cozo-rocks/include/cozorocks.h b/cozo-rocks/include/cozorocks.h index 6487f091..a61bd735 100644 --- a/cozo-rocks/include/cozorocks.h +++ b/cozo-rocks/include/cozorocks.h @@ -144,7 +144,8 @@ struct SliceBridge { } }; -void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code); +void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, + int bridge_code); inline void write_status(Status &&rstatus, BridgeStatus &status) { if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || @@ -153,12 +154,6 @@ inline void write_status(Status &&rstatus, BridgeStatus &status) { } } -// -//struct WriteBatchBridge { -// mutable WriteBatch inner; -// std::vector *handles; -//}; -// struct IteratorBridge { mutable std::unique_ptr inner; @@ -201,6 +196,41 @@ struct IteratorBridge { BridgeStatus status() const; }; + +struct WriteBatchBridge { + mutable WriteBatch inner; + + inline void batch_put_raw( + const ColumnFamilyHandle &cf, + rust::Slice key, + rust::Slice val, + BridgeStatus &status + ) const { + write_status( + inner.Put(const_cast(&cf), + convert_slice(key), + convert_slice(val)), + status + ); + } + + inline void batch_delete_raw( + const ColumnFamilyHandle &cf, + rust::Slice key, + BridgeStatus &status + ) const { + write_status( + inner.Delete(const_cast(&cf), + convert_slice(key)), + status + ); + } +}; + +inline unique_ptr new_write_batch_raw() { + return make_unique(); +} + struct DBBridge { mutable unique_ptr db; mutable unordered_map > handles; @@ -217,13 +247,6 @@ struct DBBridge { } } -// -// inline std::unique_ptr write_batch() const { -// auto wb = std::make_unique(); -// wb->handles = &handles; -// return wb; -// } -// inline void put_raw( const WriteOptionsBridge &options, const ColumnFamilyHandle &cf, @@ -240,6 +263,28 @@ struct DBBridge { ); } + inline void delete_raw( + const WriteOptionsBridge &options, + const ColumnFamilyHandle &cf, + rust::Slice key, + BridgeStatus &status + ) const { + write_status( + db->Delete(options.inner, + const_cast(&cf), + convert_slice(key)), + status + ); + } + + inline void write_raw( + const WriteOptionsBridge &options, + WriteBatchBridge &updates, + BridgeStatus &status + ) const { + write_status(db->Write(options.inner, &updates.inner), status); + } + inline std::unique_ptr get_raw( const ReadOptionsBridge &options, const ColumnFamilyHandle &cf, @@ -286,9 +331,9 @@ struct DBBridge { // When should we call DestroyColumnFamilyHandle? } - inline unique_ptr> get_column_family_names_raw() const { - auto ret = make_unique>(); - for (auto entry : handles) { + inline unique_ptr > get_column_family_names_raw() const { + auto ret = make_unique < vector < string >> (); + for (auto entry: handles) { ret->push_back(entry.first); } return ret; diff --git a/cozo-rocks/src/lib.rs b/cozo-rocks/src/lib.rs index 3e28d3dc..5b4538d5 100644 --- a/cozo-rocks/src/lib.rs +++ b/cozo-rocks/src/lib.rs @@ -101,17 +101,15 @@ mod ffi { type DBBridge; fn open_db_raw(options: &OptionsBridge, path: &CxxString, status: &mut BridgeStatus) -> UniquePtr; fn get_cf_handle_raw(self: &DBBridge, name: &CxxString) -> SharedPtr; + fn write_raw(self: &DBBridge, options: &WriteOptionsBridge, updates: Pin<&mut WriteBatchBridge>, status: &mut BridgeStatus); fn put_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus); + fn delete_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus); fn get_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus) -> UniquePtr; fn iterator_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle) -> UniquePtr; fn create_column_family_raw(self: &DBBridge, options: &OptionsBridge, name: &CxxString, status: &mut BridgeStatus); fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus); fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr>; - // fn write_batch(self: &DBBridge) -> UniquePtr; -// -// type WriteBatchBridge; -// type IteratorBridge; fn seek_to_first(self: &IteratorBridge); fn seek_to_last(self: &IteratorBridge); @@ -122,6 +120,11 @@ mod ffi { fn key(self: &IteratorBridge) -> UniquePtr; fn value(self: &IteratorBridge) -> UniquePtr; fn status(self: &IteratorBridge) -> BridgeStatus; + + pub type WriteBatchBridge; + fn new_write_batch_raw() -> UniquePtr; + fn batch_put_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus); + fn batch_delete_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus); } } @@ -268,47 +271,6 @@ fn get_path_bytes(path: &std::path::Path) -> &[u8] { { path.to_string_lossy().to_string().as_bytes() } } -// -// #[inline] -// pub fn write_batch(&self) -> UniquePtr { -// self.bridge.write_batch() -// } -// -// // #[inline] -// // pub fn get_column_family_id(&self, name: impl AsRef) -> Result, Status> { -// // let handles = self.cf_map.read() -// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?; -// // Ok(handles.get(name.as_ref()).copied()) -// // } -// -// // #[inline] -// // pub fn create_column_family(&self, name: impl AsRef) -> Result<(), Status> { -// // let mut s = Status::default(); -// // let mut cf_map = self.cf_map.write() -// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?; -// // let mut cfs = self.cfs.write() -// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?; -// // let v = self.bridge.create_column_family(&self.options, name.as_ref(), &mut s); -// // if v > 0 { -// // assert_eq!(v as usize, cfs.len()); -// // cf_map.insert(name.as_ref().to_string(), v as usize); -// // cfs.push(name.as_ref().to_string()); -// // Ok(()) -// // } else { -// // Err(s) -// // } -// // } -// -// // #[inline] -// // pub fn drop_column_family(&self, _name: impl AsRef) -> Result<(), Status> { -// // unimplemented!() -// // } -// -// pub fn destroy_data(self) -> Result<(), Status> { -// unimplemented!() -// } -// } -// impl Default for BridgeStatus { #[inline] fn default() -> Self { @@ -321,28 +283,6 @@ impl Default for BridgeStatus { } } -impl BridgeStatus { - #[inline] - fn bridge(c: StatusBridgeCode) -> Self { - Self { - code: StatusCode::kMaxCode, - subcode: StatusSubCode::kMaxSubCode, - severity: StatusSeverity::kMaxSeverity, - bridge_code: c, - } - } -} - -pub trait DBRead { - fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) - -> Result>; -} - -pub trait DBWrite { - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) - -> Result; -} - pub struct DB { inner: UniquePtr, pub options: Options, @@ -350,35 +290,6 @@ pub struct DB { pub default_write_options: WriteOptions, } -impl DBRead for DB { - #[inline] - fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result> { - let mut status = BridgeStatus::default(); - let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); - match status.code { - StatusCode::kOk => Ok(Some(PinnableSlice(slice))), - StatusCode::kNotFound => Ok(None), - _ => Err(status) - } - } -} - -impl DBWrite for DB { - #[inline] - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { - let mut status = BridgeStatus::default(); - self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, - key.as_ref(), val.as_ref(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) - } else { - Err(status) - } - } -} - - pub trait DBImpl { fn open(options: Options, path: &Path) -> Result; fn get_cf_handle(&self, name: impl AsRef) -> Result; @@ -386,6 +297,14 @@ pub trait DBImpl { fn create_column_family(&self, name: impl AsRef) -> Result<()>; fn drop_column_family(&self, name: impl AsRef) -> Result<()>; fn get_column_family_names(&self) -> Vec; + fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) + -> Result>; + fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) + -> Result; + fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) + -> Result; + fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result; + } impl DBImpl for DB { @@ -418,7 +337,7 @@ impl DBImpl for DB { code: StatusCode::kMaxCode, subcode: StatusSubCode::kMaxSubCode, severity: StatusSeverity::kSoftError, - bridge_code: StatusBridgeCode::NOT_FOUND_ERROR + bridge_code: StatusBridgeCode::NOT_FOUND_ERROR, }) } else { Ok(ret) @@ -455,4 +374,98 @@ impl DBImpl for DB { fn get_column_family_names(&self) -> Vec { self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() } + + #[inline] + fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result> { + let mut status = BridgeStatus::default(); + let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); + match status.code { + StatusCode::kOk => Ok(Some(PinnableSlice(slice))), + StatusCode::kNotFound => Ok(None), + _ => Err(status) + } + } + + #[inline] + fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + let mut status = BridgeStatus::default(); + self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, + key.as_ref(), val.as_ref(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } + + #[inline] + fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + let mut status = BridgeStatus::default(); + self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf, + key.as_ref(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } + + #[inline] + fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result { + let mut status = BridgeStatus::default(); + self.inner.write_raw(options.unwrap_or(&self.default_write_options), + updates.pin_mut(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } +} + +pub type WriteBatch = UniquePtr; + +pub trait WriteBatchWrapperImp { + fn default() -> WriteBatch; +} + +impl WriteBatchWrapperImp for WriteBatch { + fn default() -> WriteBatch { + new_write_batch_raw() + } +} + +pub trait WriteBatchImpl { + fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result; + fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result; +} + +impl WriteBatchImpl for WriteBatchBridge { + #[inline] + fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result { + let mut status = BridgeStatus::default(); + self.batch_put_raw(cf, + key.as_ref(), val.as_ref(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } + #[inline] + fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result { + let mut status = BridgeStatus::default(); + self.batch_delete_raw(cf, + key.as_ref(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } } \ No newline at end of file diff --git a/src/storage.rs b/src/storage.rs index ab94bca8..3f3cc568 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -58,7 +58,6 @@ impl Storage { #[cfg(test)] mod tests { use std::str::from_utf8; - use cozo_rocks::DBImpl; use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value}; #[test] @@ -87,7 +86,9 @@ mod tests { println!("before anything {}", val.is_none()); db.put(&key, "A motherfucking value!!! 👋👋👋", &cf, None).unwrap(); - db.put(&key2, "Another motherfucking value!!! 👋👋👋", &cf, None).unwrap(); + let batch = WriteBatch::default(); + batch.put(&key2, "Another motherfucking value!!! 👋👋👋", &cf).unwrap(); + db.write(batch, None).unwrap(); // db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap(); let val = db.get(&key, &cf, None).unwrap().unwrap(); println!("1 {}", from_utf8(val.as_ref()).unwrap());