write and delete

main
Ziyang Hu 2 years ago
parent f8de57de91
commit 6dbbec65d3

@ -144,7 +144,8 @@ struct SliceBridge {
}
};
void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code);
void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity,
int bridge_code);
inline void write_status(Status &&rstatus, BridgeStatus &status) {
if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace ||
@ -153,12 +154,6 @@ inline void write_status(Status &&rstatus, BridgeStatus &status) {
}
}
//
//struct WriteBatchBridge {
// mutable WriteBatch inner;
// std::vector<ColumnFamilyHandle *> *handles;
//};
//
struct IteratorBridge {
mutable std::unique_ptr <Iterator> inner;
@ -201,6 +196,41 @@ struct IteratorBridge {
BridgeStatus status() const;
};
struct WriteBatchBridge {
mutable WriteBatch inner;
inline void batch_put_raw(
const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val,
BridgeStatus &status
) const {
write_status(
inner.Put(const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key),
convert_slice(val)),
status
);
}
inline void batch_delete_raw(
const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key,
BridgeStatus &status
) const {
write_status(
inner.Delete(const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key)),
status
);
}
};
inline unique_ptr <WriteBatchBridge> new_write_batch_raw() {
return make_unique<WriteBatchBridge>();
}
struct DBBridge {
mutable unique_ptr <DB> db;
mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles;
@ -217,13 +247,6 @@ struct DBBridge {
}
}
//
// inline std::unique_ptr <WriteBatchBridge> write_batch() const {
// auto wb = std::make_unique<WriteBatchBridge>();
// wb->handles = &handles;
// return wb;
// }
//
inline void put_raw(
const WriteOptionsBridge &options,
const ColumnFamilyHandle &cf,
@ -240,6 +263,28 @@ struct DBBridge {
);
}
inline void delete_raw(
const WriteOptionsBridge &options,
const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key,
BridgeStatus &status
) const {
write_status(
db->Delete(options.inner,
const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key)),
status
);
}
inline void write_raw(
const WriteOptionsBridge &options,
WriteBatchBridge &updates,
BridgeStatus &status
) const {
write_status(db->Write(options.inner, &updates.inner), status);
}
inline std::unique_ptr <PinnableSliceBridge> get_raw(
const ReadOptionsBridge &options,
const ColumnFamilyHandle &cf,

@ -101,17 +101,15 @@ mod ffi {
type DBBridge;
fn open_db_raw(options: &OptionsBridge, path: &CxxString, status: &mut BridgeStatus) -> UniquePtr<DBBridge>;
fn get_cf_handle_raw(self: &DBBridge, name: &CxxString) -> SharedPtr<ColumnFamilyHandle>;
fn write_raw(self: &DBBridge, options: &WriteOptionsBridge, updates: Pin<&mut WriteBatchBridge>, status: &mut BridgeStatus);
fn put_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn delete_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus);
fn get_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus) -> UniquePtr<PinnableSliceBridge>;
fn iterator_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
fn create_column_family_raw(self: &DBBridge, options: &OptionsBridge, name: &CxxString, status: &mut BridgeStatus);
fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>;
// fn write_batch(self: &DBBridge) -> UniquePtr<WriteBatchBridge>;
//
// type WriteBatchBridge;
//
type IteratorBridge;
fn seek_to_first(self: &IteratorBridge);
fn seek_to_last(self: &IteratorBridge);
@ -122,6 +120,11 @@ mod ffi {
fn key(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn value(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn status(self: &IteratorBridge) -> BridgeStatus;
pub type WriteBatchBridge;
fn new_write_batch_raw() -> UniquePtr<WriteBatchBridge>;
fn batch_put_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn batch_delete_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus);
}
}
@ -268,47 +271,6 @@ fn get_path_bytes(path: &std::path::Path) -> &[u8] {
{ path.to_string_lossy().to_string().as_bytes() }
}
//
// #[inline]
// pub fn write_batch(&self) -> UniquePtr<WriteBatchBridge> {
// self.bridge.write_batch()
// }
//
// // #[inline]
// // pub fn get_column_family_id(&self, name: impl AsRef<str>) -> Result<Option<usize>, Status> {
// // let handles = self.cf_map.read()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // Ok(handles.get(name.as_ref()).copied())
// // }
//
// // #[inline]
// // pub fn create_column_family(&self, name: impl AsRef<str>) -> Result<(), Status> {
// // let mut s = Status::default();
// // let mut cf_map = self.cf_map.write()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // let mut cfs = self.cfs.write()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // let v = self.bridge.create_column_family(&self.options, name.as_ref(), &mut s);
// // if v > 0 {
// // assert_eq!(v as usize, cfs.len());
// // cf_map.insert(name.as_ref().to_string(), v as usize);
// // cfs.push(name.as_ref().to_string());
// // Ok(())
// // } else {
// // Err(s)
// // }
// // }
//
// // #[inline]
// // pub fn drop_column_family(&self, _name: impl AsRef<str>) -> Result<(), Status> {
// // unimplemented!()
// // }
//
// pub fn destroy_data(self) -> Result<(), Status> {
// unimplemented!()
// }
// }
//
impl Default for BridgeStatus {
#[inline]
fn default() -> Self {
@ -321,28 +283,6 @@ impl Default for BridgeStatus {
}
}
impl BridgeStatus {
#[inline]
fn bridge(c: StatusBridgeCode) -> Self {
Self {
code: StatusCode::kMaxCode,
subcode: StatusSubCode::kMaxSubCode,
severity: StatusSeverity::kMaxSeverity,
bridge_code: c,
}
}
}
pub trait DBRead {
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>)
-> Result<Option<PinnableSlice>>;
}
pub trait DBWrite {
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
}
pub struct DB {
inner: UniquePtr<DBBridge>,
pub options: Options,
@ -350,35 +290,6 @@ pub struct DB {
pub default_write_options: WriteOptions,
}
impl DBRead for DB {
#[inline]
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> {
let mut status = BridgeStatus::default();
let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinnableSlice(slice))),
StatusCode::kNotFound => Ok(None),
_ => Err(status)
}
}
}
impl DBWrite for DB {
#[inline]
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(), val.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
}
pub trait DBImpl {
fn open(options: Options, path: &Path) -> Result<DB>;
fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle>;
@ -386,6 +297,14 @@ pub trait DBImpl {
fn create_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn get_column_family_names(&self) -> Vec<String>;
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>)
-> Result<Option<PinnableSlice>>;
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus>;
}
impl DBImpl for DB {
@ -418,7 +337,7 @@ impl DBImpl for DB {
code: StatusCode::kMaxCode,
subcode: StatusSubCode::kMaxSubCode,
severity: StatusSeverity::kSoftError,
bridge_code: StatusBridgeCode::NOT_FOUND_ERROR
bridge_code: StatusBridgeCode::NOT_FOUND_ERROR,
})
} else {
Ok(ret)
@ -455,4 +374,98 @@ impl DBImpl for DB {
fn get_column_family_names(&self) -> Vec<String> {
self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect()
}
#[inline]
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> {
let mut status = BridgeStatus::default();
let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinnableSlice(slice))),
StatusCode::kNotFound => Ok(None),
_ => Err(status)
}
}
#[inline]
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(), val.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
#[inline]
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
#[inline]
fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.inner.write_raw(options.unwrap_or(&self.default_write_options),
updates.pin_mut(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
}
pub type WriteBatch = UniquePtr<WriteBatchBridge>;
pub trait WriteBatchWrapperImp {
fn default() -> WriteBatch;
}
impl WriteBatchWrapperImp for WriteBatch {
fn default() -> WriteBatch {
new_write_batch_raw()
}
}
pub trait WriteBatchImpl {
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus>;
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus>;
}
impl WriteBatchImpl for WriteBatchBridge {
#[inline]
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.batch_put_raw(cf,
key.as_ref(), val.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
#[inline]
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default();
self.batch_delete_raw(cf,
key.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
}

@ -58,7 +58,6 @@ impl Storage {
#[cfg(test)]
mod tests {
use std::str::from_utf8;
use cozo_rocks::DBImpl;
use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value};
#[test]
@ -87,7 +86,9 @@ mod tests {
println!("before anything {}", val.is_none());
db.put(&key, "A motherfucking value!!! 👋👋👋", &cf, None).unwrap();
db.put(&key2, "Another motherfucking value!!! 👋👋👋", &cf, None).unwrap();
let batch = WriteBatch::default();
batch.put(&key2, "Another motherfucking value!!! 👋👋👋", &cf).unwrap();
db.write(batch, None).unwrap();
// db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap();
let val = db.get(&key, &cf, None).unwrap().unwrap();
println!("1 {}", from_utf8(val.as_ref()).unwrap());

Loading…
Cancel
Save