engin start

main
Ziyang Hu 2 years ago
parent 18205e9b09
commit 3160789922

@ -51,6 +51,10 @@ inline shared_ptr<PinnableSlice> make_shared_pinnable_slice(unique_ptr<PinnableS
return ret;
}
inline shared_ptr<Options> make_shared_options(unique_ptr<Options> o) {
shared_ptr<Options> ret = std::move(o);
return ret;
}
inline Slice convert_slice(rust::Slice<const uint8_t> d) {
return Slice(reinterpret_cast<const char *>(d.data()), d.size());
@ -146,8 +150,8 @@ inline void prepare_for_bulk_load(Options &inner) {
inner.PrepareForBulkLoad();
}
inline void increase_parallelism(Options &inner) {
inner.IncreaseParallelism();
inline void increase_parallelism(Options &inner, uint32_t size) {
inner.IncreaseParallelism(size);
}
inline void optimize_level_style_compaction(Options &inner) {
@ -351,20 +355,6 @@ struct TransactionBridge {
);
}
inline void get_raw(
const ReadOptions &r_ops,
rust::Slice<const uint8_t> key,
PinnableSlice &pinnable_val,
BridgeStatus &status
) const {
write_status(
raw_db->Get(r_ops,
raw_db->DefaultColumnFamily(),
convert_slice(key),
&pinnable_val),
status
);
}
inline void put_txn(
rust::Slice<const uint8_t> key,
@ -374,17 +364,6 @@ struct TransactionBridge {
write_status(inner->Put(convert_slice(key), convert_slice(val)), status);
}
inline void put_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val,
BridgeStatus &status
) const {
auto k = convert_slice(key);
auto v = convert_slice(val);
write_status(raw_db->Put(raw_w_ops, k, v), status);
}
inline void del_txn(
rust::Slice<const uint8_t> key,
BridgeStatus &status
@ -392,61 +371,20 @@ struct TransactionBridge {
write_status(inner->Delete(convert_slice(key)), status);
}
inline void del_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> key,
BridgeStatus &status
) const {
write_status(raw_db->Delete(raw_w_ops, convert_slice(key)), status);
}
inline void del_range_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> start_key,
rust::Slice<const uint8_t> end_key,
BridgeStatus &status
) const {
write_status(
raw_db->GetRootDB()->DeleteRange(
raw_w_ops,
raw_db->DefaultColumnFamily(),
convert_slice(start_key), convert_slice(end_key)),
status);
}
inline void flush_raw(const FlushOptions &options, BridgeStatus &status) const {
write_status(raw_db->Flush(options), status);
}
inline void compact_all_raw(BridgeStatus &status) const {
auto options = CompactRangeOptions();
options.change_level = true;
options.target_level = 0;
options.exclusive_manual_compaction = false;
write_status(raw_db->CompactRange(options,
raw_db->DefaultColumnFamily(),
nullptr, nullptr), status);
}
inline std::unique_ptr<IteratorBridge> iterator_txn(const ReadOptions &r_ops) const {
return std::make_unique<IteratorBridge>(
inner->GetIterator(r_ops));
}
inline std::unique_ptr<IteratorBridge> iterator_raw(const ReadOptions &raw_r_ops) const {
return std::make_unique<IteratorBridge>(
raw_db->NewIterator(raw_r_ops));
}
};
struct TDBBridge {
mutable unique_ptr<StackableDB> db;
mutable unique_ptr<DB> db;
mutable TransactionDB *tdb;
mutable OptimisticTransactionDB *odb;
bool is_odb;
TDBBridge(StackableDB *db_,
TDBBridge(DB *db_,
TransactionDB *tdb_,
OptimisticTransactionDB *odb_) :
db(db_), tdb(tdb_), odb(odb_) {
@ -494,14 +432,107 @@ struct TDBBridge {
ret->inner = unique_ptr<Transaction>(txn);
return ret;
}
inline void close_raw(BridgeStatus &status) const {
write_status(db->Close(), status);
}
inline void get_approximate_sizes_raw(
rust::Slice<const rust::Slice<const uint8_t>> ranges,
rust::Slice<uint64_t> sizes,
BridgeStatus &status) const {
uint64_t n = sizes.size();
vector<Range> cpp_ranges;
cpp_ranges.reserve(n);
for (uint64_t i = 0; i < n; ++i) {
auto x = ranges.at(2 * i);
auto start = convert_slice(x);
auto end = convert_slice(ranges.at(2 * i + 1));
auto rg = Range(start, end);
cpp_ranges.emplace_back(rg);
};
write_status(
db->GetApproximateSizes(db->DefaultColumnFamily(),
cpp_ranges.data(), (int) n, sizes.data()),
status
);
}
inline void del_range_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> start_key,
rust::Slice<const uint8_t> end_key,
BridgeStatus &status
) const {
write_status(
db->GetRootDB()->DeleteRange(
raw_w_ops,
db->DefaultColumnFamily(),
convert_slice(start_key), convert_slice(end_key)),
status);
}
inline void flush_raw(const FlushOptions &options, BridgeStatus &status) const {
write_status(db->Flush(options), status);
}
inline void compact_all_raw(BridgeStatus &status) const {
auto options = CompactRangeOptions();
options.change_level = true;
options.target_level = 0;
options.exclusive_manual_compaction = false;
write_status(db->CompactRange(options,
db->DefaultColumnFamily(),
nullptr, nullptr), status);
}
inline void get_raw(
const ReadOptions &r_ops,
rust::Slice<const uint8_t> key,
PinnableSlice &pinnable_val,
BridgeStatus &status
) const {
write_status(
db->Get(r_ops,
db->DefaultColumnFamily(),
convert_slice(key),
&pinnable_val),
status
);
}
inline void put_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val,
BridgeStatus &status
) const {
auto k = convert_slice(key);
auto v = convert_slice(val);
write_status(db->Put(raw_w_ops, k, v), status);
}
inline void del_raw(
const WriteOptions &raw_w_ops,
rust::Slice<const uint8_t> key,
BridgeStatus &status
) const {
write_status(db->Delete(raw_w_ops, convert_slice(key)), status);
}
inline std::unique_ptr<IteratorBridge> iterator_raw(const ReadOptions &raw_r_ops) const {
return std::make_unique<IteratorBridge>(
db->NewIterator(raw_r_ops));
}
};
inline unique_ptr<TransactionDBOptions> new_tdb_options() {
return make_unique<TransactionDBOptions>();
inline shared_ptr<TransactionDBOptions> new_tdb_options() {
return make_shared<TransactionDBOptions>();
}
inline unique_ptr<OptimisticTransactionDBOptions> new_odb_options() {
return make_unique<OptimisticTransactionDBOptions>();
inline shared_ptr<OptimisticTransactionDBOptions> new_odb_options() {
return make_shared<OptimisticTransactionDBOptions>();
}
inline unique_ptr<FlushOptions> new_flush_options() {
@ -542,4 +573,25 @@ open_odb_raw(const Options &options, const string &path, BridgeStatus &status) {
unordered_map<string, shared_ptr<ColumnFamilyHandle>> handle_map;
return make_shared<TDBBridge>(txn_db, nullptr, txn_db);
}
inline shared_ptr<TDBBridge>
open_db_raw(const Options &options, const string &path, BridgeStatus &status) {
DB *db = nullptr;
write_status(DB::Open(options,
path,
&db), status);
unordered_map<string, shared_ptr<ColumnFamilyHandle>> handle_map;
return make_shared<TDBBridge>(db, nullptr, nullptr);
}
inline void repair_db_raw(const Options &options, const string &path, BridgeStatus &status) {
write_status(RepairDB(path, options), status);
}
inline void destroy_db_raw(const Options &options, const string &path, BridgeStatus &status) {
write_status(DestroyDB(path, options), status);
}

@ -85,8 +85,9 @@ mod ffi {
type Options;
fn new_options() -> UniquePtr<Options>;
fn make_shared_options(o: UniquePtr<Options>) -> SharedPtr<Options>;
fn prepare_for_bulk_load(o: Pin<&mut Options>);
fn increase_parallelism(o: Pin<&mut Options>);
fn increase_parallelism(o: Pin<&mut Options>, n_threads: u32);
fn optimize_level_style_compaction(o: Pin<&mut Options>);
fn set_create_if_missing(o: Pin<&mut Options>, v: bool);
fn set_comparator(o: Pin<&mut Options>, cmp: &RustComparator);
@ -116,9 +117,9 @@ mod ffi {
cmp: &RustComparator,
) -> UniquePtr<OptimisticTransactionOptions>;
type TransactionDBOptions;
fn new_tdb_options() -> UniquePtr<TransactionDBOptions>;
fn new_tdb_options() -> SharedPtr<TransactionDBOptions>;
type OptimisticTransactionDBOptions;
fn new_odb_options() -> UniquePtr<OptimisticTransactionDBOptions>;
fn new_odb_options() -> SharedPtr<OptimisticTransactionDBOptions>;
type FlushOptions;
fn new_flush_options() -> UniquePtr<FlushOptions>;
@ -169,45 +170,12 @@ mod ffi {
slice: Pin<&mut PinnableSlice>,
status: &mut BridgeStatus,
);
fn get_raw(
self: &TransactionBridge,
options: &ReadOptions,
key: &[u8],
slice: Pin<&mut PinnableSlice>,
status: &mut BridgeStatus,
);
fn put_txn(self: &TransactionBridge, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn put_raw(
self: &TransactionBridge,
options: &WriteOptions,
key: &[u8],
val: &[u8],
status: &mut BridgeStatus,
);
fn del_txn(self: &TransactionBridge, key: &[u8], status: &mut BridgeStatus);
fn del_raw(
self: &TransactionBridge,
options: &WriteOptions,
key: &[u8],
status: &mut BridgeStatus,
);
fn del_range_raw(
self: &TransactionBridge,
options: &WriteOptions,
start_key: &[u8],
end_key: &[u8],
status: &mut BridgeStatus,
);
fn flush_raw(self: &TransactionBridge, options: &FlushOptions, status: &mut BridgeStatus);
fn compact_all_raw(self: &TransactionBridge, status: &mut BridgeStatus);
fn iterator_txn(
self: &TransactionBridge,
r_opts: &ReadOptions,
) -> UniquePtr<IteratorBridge>;
fn iterator_raw(
self: &TransactionBridge,
r_opts: &ReadOptions,
) -> UniquePtr<IteratorBridge>;
type TDBBridge;
fn begin_t_transaction(
@ -232,6 +200,58 @@ mod ffi {
path: &CxxString,
status: &mut BridgeStatus,
) -> SharedPtr<TDBBridge>;
fn get_raw(
self: &TDBBridge,
options: &ReadOptions,
key: &[u8],
slice: Pin<&mut PinnableSlice>,
status: &mut BridgeStatus,
);
fn put_raw(
self: &TDBBridge,
options: &WriteOptions,
key: &[u8],
val: &[u8],
status: &mut BridgeStatus,
);
fn del_raw(
self: &TDBBridge,
options: &WriteOptions,
key: &[u8],
status: &mut BridgeStatus,
);
fn iterator_raw(
self: &TDBBridge,
r_opts: &ReadOptions,
) -> UniquePtr<IteratorBridge>;
fn open_db_raw(
options: &Options,
// txn_options: &OptimisticTransactionDBOptions,
path: &CxxString,
status: &mut BridgeStatus,
) -> SharedPtr<TDBBridge>;
fn del_range_raw(
self: &TDBBridge,
options: &WriteOptions,
start_key: &[u8],
end_key: &[u8],
status: &mut BridgeStatus,
);
fn flush_raw(self: &TDBBridge, options: &FlushOptions, status: &mut BridgeStatus);
fn compact_all_raw(self: &TDBBridge, status: &mut BridgeStatus);
fn get_approximate_sizes_raw(
self: &TDBBridge,
ranges: &[&[u8]],
sizes: &mut [u64],
status: &mut BridgeStatus,
);
fn close_raw(self: &TDBBridge, status: &mut BridgeStatus);
// raw functions
fn destroy_db_raw(options: &Options, path: &CxxString, status: &mut BridgeStatus);
fn repair_db_raw(options: &Options, path: &CxxString, status: &mut BridgeStatus);
}
}

@ -12,6 +12,7 @@ pub use bridge::StatusBridgeCode;
pub use bridge::StatusCode;
pub use bridge::StatusSeverity;
pub use bridge::StatusSubCode;
pub use status::BridgeError;
use cxx::let_cxx_string;
pub use cxx::{SharedPtr, UniquePtr};
use status::Result;
@ -22,7 +23,7 @@ pub struct PinnableSlicePtr(UniquePtr<PinnableSlice>);
impl PinnableSlicePtr {
#[inline]
pub fn pinned_slice(&mut self) -> Pin<&mut PinnableSlice> {
pub fn pin_mut(&mut self) -> Pin<&mut PinnableSlice> {
self.0.pin_mut()
}
@ -42,7 +43,7 @@ impl Default for PinnableSlicePtr {
impl PinnableSlicePtr {
#[inline]
pub fn reset(&mut self) {
reset_pinnable_slice(self.pinned_slice());
reset_pinnable_slice(self.pin_mut());
}
}
@ -85,7 +86,7 @@ pub struct SlicePtr(UniquePtr<Slice>);
impl SlicePtr {
#[inline]
pub fn pinned_slice(&mut self) -> Pin<&mut Slice> {
pub fn pin_mut(&mut self) -> Pin<&mut Slice> {
self.0.pin_mut()
}
@ -217,8 +218,11 @@ impl Deref for TransactionPtr {
}
impl TransactionPtr {
/// # Safety
///
/// Only for testing use, as a placeholder
#[inline]
pub fn null() -> Self {
pub unsafe fn null() -> Self {
TransactionPtr(UniquePtr::null())
}
#[inline]
@ -257,16 +261,12 @@ impl TransactionPtr {
pub fn get(
&self,
options: &ReadOptions,
transact: bool,
key: impl AsRef<[u8]>,
slice: &mut PinnableSlicePtr,
) -> Result<bool> {
let mut status = BridgeStatus::default();
let res = if transact {
self.get_txn(options, key.as_ref(), slice.pinned_slice(), &mut status);
status.check_err(())
} else {
self.get_raw(options, key.as_ref(), slice.pinned_slice(), &mut status);
let res = {
self.get_txn(options, key.as_ref(), slice.pin_mut(), &mut status);
status.check_err(())
};
match res {
@ -283,7 +283,7 @@ impl TransactionPtr {
slice: &mut PinnableSlicePtr,
) -> Result<bool> {
let mut status = BridgeStatus::default();
self.get_for_update_txn(options, key.as_ref(), slice.pinned_slice(), &mut status);
self.get_for_update_txn(options, key.as_ref(), slice.pin_mut(), &mut status);
match status.check_err(()) {
Ok(_) => Ok(true),
Err(e) if e.status.code == StatusCode::kNotFound => Ok(false),
@ -291,70 +291,31 @@ impl TransactionPtr {
}
}
#[inline]
pub fn del(&self, options: &WriteOptions, transact: bool, key: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
if transact {
let ret = self.del_txn(key.as_ref(), &mut status);
status.check_err(ret)
} else {
let ret = self.del_raw(options, key.as_ref(), &mut status);
status.check_err(ret)
}
}
#[inline]
pub fn del_range(
&self,
options: &WriteOptions,
start_key: impl AsRef<[u8]>,
end_key: impl AsRef<[u8]>,
) -> Result<()> {
pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.del_range_raw(options, start_key.as_ref(), end_key.as_ref(), &mut status);
let ret = self.del_txn(key.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn flush(&self, options: FlushOptionsPtr) -> Result<()> {
let mut status = BridgeStatus::default();
self.flush_raw(&options, &mut status);
status.check_err(())
}
#[inline]
pub fn compact_all(&self) -> Result<()> {
let mut status = BridgeStatus::default();
self.compact_all_raw(&mut status);
status.check_err(())
}
#[inline]
pub fn put(
&self,
options: &WriteOptions,
transact: bool,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
let mut status = BridgeStatus::default();
if transact {
let ret = self.put_txn(key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
} else {
let ret = self.put_raw(options, key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
}
let ret = self.put_txn(key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn iterator(&self, options: &ReadOptions, transact: bool) -> IteratorPtr {
if transact {
IteratorPtr(self.iterator_txn(options))
} else {
IteratorPtr(self.iterator_raw(options))
}
pub fn iterator(&self, options: &ReadOptions) -> IteratorPtr {
IteratorPtr(self.iterator_txn(options))
}
}
#[derive(Clone)]
pub struct DBPtr(SharedPtr<TDBBridge>);
pub struct DbPtr(SharedPtr<TDBBridge>);
impl Deref for DBPtr {
impl Deref for DbPtr {
type Target = SharedPtr<TDBBridge>;
#[inline]
@ -363,25 +324,80 @@ impl Deref for DBPtr {
}
}
unsafe impl Send for DBPtr {}
unsafe impl Send for DbPtr {}
unsafe impl Sync for DbPtr {}
impl DbPtr {
/// # Safety
///
/// Only for testing use, as a placeholder
pub unsafe fn null() -> Self {
DbPtr(SharedPtr::null())
}
unsafe impl Sync for DBPtr {}
pub fn open_non_txn(options: &Options,
path: impl AsRef<str>, ) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = open_db_raw(options, &cname, &mut status);
status.check_err(Self(ret))
}
impl DBPtr {
pub fn open(
options: &OptionsPtr,
t_options: &TDBOptions,
options: &Options,
t_options: &TDbOptions,
path: impl AsRef<str>,
) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = match t_options {
TDBOptions::Pessimistic(o) => open_tdb_raw(options, o, &cname, &mut status),
TDBOptions::Optimistic(_o) => open_odb_raw(options, &cname, &mut status),
TDbOptions::Pessimistic(o) => open_tdb_raw(options, o, &cname, &mut status),
TDbOptions::Optimistic(_o) => open_odb_raw(options, &cname, &mut status),
};
status.check_err(Self(ret))
}
#[inline]
pub fn get(
&self,
options: &ReadOptions,
key: impl AsRef<[u8]>,
slice: &mut PinnableSlicePtr,
) -> Result<bool> {
let mut status = BridgeStatus::default();
let res = {
self.get_raw(options, key.as_ref(), slice.pin_mut(), &mut status);
status.check_err(())
};
match res {
Ok(_) => Ok(true),
Err(e) if e.status.code == StatusCode::kNotFound => Ok(false),
Err(e) => Err(e),
}
}
#[inline]
pub fn del(&self, options: &WriteOptions, key: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.del_raw(options, key.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn put(
&self,
options: &WriteOptions,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.put_raw(options, key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn iterator(&self, options: &ReadOptions) -> IteratorPtr {
IteratorPtr(self.iterator_raw(options))
}
#[inline]
pub fn make_transaction(
&self,
@ -393,4 +409,68 @@ impl DBPtr {
TransactOptions::Pessimistic(o) => self.begin_t_transaction(write_ops.0, o.0),
})
}
#[inline]
pub fn del_range(
&self,
options: &WriteOptions,
start_key: impl AsRef<[u8]>,
end_key: impl AsRef<[u8]>,
) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.del_range_raw(options, start_key.as_ref(), end_key.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn flush(&self, options: FlushOptionsPtr) -> Result<()> {
let mut status = BridgeStatus::default();
self.flush_raw(&options, &mut status);
status.check_err(())
}
#[inline]
pub fn compact_all(&self) -> Result<()> {
let mut status = BridgeStatus::default();
self.compact_all_raw(&mut status);
status.check_err(())
}
#[inline]
pub fn get_approximate_sizes<T: AsRef<[u8]>>(&self, ranges: &[(T, T)]) -> Result<Vec<u64>> {
let mut status = BridgeStatus::default();
let n = ranges.len();
let mut ret = vec![0u64; n];
let mut bridge_range = Vec::with_capacity(2 * n);
for (start, end) in ranges {
let start = start.as_ref();
let end = end.as_ref();
bridge_range.push(start);
bridge_range.push(end);
}
self.get_approximate_sizes_raw(&bridge_range, &mut ret, &mut status);
status.check_err(ret)
}
#[inline]
pub fn close(&self) -> Result<()> {
let mut status = BridgeStatus::default();
self.close_raw(&mut status);
status.check_err(())
}
}
pub fn destroy_db(options: &Options, path: impl AsRef<str>) -> Result<()> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
destroy_db_raw(options, &cname, &mut status);
status.check_err(())
}
pub fn repair_db(options: &Options, path: impl AsRef<str>) -> Result<()> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
repair_db_raw(options, &cname, &mut status);
status.check_err(())
}

@ -1,9 +1,13 @@
use crate::bridge::*;
use cxx::UniquePtr;
use std::ops::{Deref, DerefMut};
use cxx::{SharedPtr, UniquePtr};
use std::ops::Deref;
use std::pin::Pin;
pub struct RustComparatorPtr(UniquePtr<RustComparator>);
unsafe impl Send for RustComparatorPtr {}
unsafe impl Sync for RustComparatorPtr {}
impl RustComparatorPtr {
#[inline]
pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> Self {
@ -12,7 +16,7 @@ impl RustComparatorPtr {
}
impl Deref for RustComparatorPtr {
type Target = UniquePtr<RustComparator>;
type Target = RustComparator;
#[inline]
fn deref(&self) -> &Self::Target {
@ -23,7 +27,7 @@ impl Deref for RustComparatorPtr {
pub struct OptionsPtr(UniquePtr<Options>);
impl Deref for OptionsPtr {
type Target = UniquePtr<Options>;
type Target = Options;
#[inline]
fn deref(&self) -> &Self::Target {
@ -31,14 +35,26 @@ impl Deref for OptionsPtr {
}
}
impl DerefMut for OptionsPtr {
pub struct OptionsPtrShared(SharedPtr<Options>);
impl Deref for OptionsPtrShared {
type Target = Options;
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl OptionsPtr {
#[inline]
pub fn pin_mut(&mut self) -> Pin<&mut Options> {
self.0.pin_mut()
}
#[inline]
pub fn make_shared(self) -> OptionsPtrShared {
OptionsPtrShared(make_shared_options(self.0))
}
#[inline]
pub fn default() -> Self {
Self(new_options())
@ -49,8 +65,8 @@ impl OptionsPtr {
self
}
#[inline]
pub fn increase_parallelism(&mut self) -> &mut Self {
increase_parallelism(self.pin_mut());
pub fn increase_parallelism(&mut self, n_threads: u32) -> &mut Self {
increase_parallelism(self.pin_mut(), n_threads);
self
}
#[inline]
@ -121,14 +137,11 @@ impl Deref for ReadOptionsPtr {
}
}
impl DerefMut for ReadOptionsPtr {
impl ReadOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
pub fn pin_mut(&mut self) -> Pin<&mut ReadOptions> {
self.0.pin_mut()
}
}
impl ReadOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_read_options())
@ -170,14 +183,11 @@ impl Deref for WriteOptionsPtr {
}
}
impl DerefMut for WriteOptionsPtr {
impl WriteOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
pub fn pin_mut(&mut self) -> Pin<&mut WriteOptions> {
self.0.pin_mut()
}
}
impl WriteOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_write_options())
@ -227,14 +237,11 @@ impl Deref for PTxnOptionsPtr {
}
}
impl DerefMut for PTxnOptionsPtr {
impl PTxnOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
pub fn pin_mut(&mut self) -> Pin<&mut TransactionOptions> {
self.0.pin_mut()
}
}
impl PTxnOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_transaction_options())
@ -249,7 +256,7 @@ impl PTxnOptionsPtr {
pub struct OTxnOptionsPtr(pub(crate) UniquePtr<OptimisticTransactionOptions>);
impl Deref for OTxnOptionsPtr {
type Target = UniquePtr<OptimisticTransactionOptions>;
type Target = OptimisticTransactionOptions;
#[inline]
fn deref(&self) -> &Self::Target {
@ -257,13 +264,6 @@ impl Deref for OTxnOptionsPtr {
}
}
impl DerefMut for OTxnOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl OTxnOptionsPtr {
#[inline]
pub fn new(cmp: &RustComparatorPtr) -> Self {
@ -271,10 +271,11 @@ impl OTxnOptionsPtr {
}
}
pub struct PTxnDBOptionsPtr(UniquePtr<TransactionDBOptions>);
#[derive(Clone)]
pub struct PTxnDbOptionsPtr(SharedPtr<TransactionDBOptions>);
impl Deref for PTxnDBOptionsPtr {
type Target = UniquePtr<TransactionDBOptions>;
impl Deref for PTxnDbOptionsPtr {
type Target = TransactionDBOptions;
#[inline]
fn deref(&self) -> &Self::Target {
@ -282,24 +283,18 @@ impl Deref for PTxnDBOptionsPtr {
}
}
impl DerefMut for PTxnDBOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl PTxnDBOptionsPtr {
impl PTxnDbOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_tdb_options())
}
}
pub struct OTxnDBOptionsPtr(UniquePtr<OptimisticTransactionDBOptions>);
#[derive(Clone)]
pub struct OTxnDbOptionsPtr(SharedPtr<OptimisticTransactionDBOptions>);
impl Deref for OTxnDBOptionsPtr {
type Target = UniquePtr<OptimisticTransactionDBOptions>;
impl Deref for OTxnDbOptionsPtr {
type Target = OptimisticTransactionDBOptions;
#[inline]
fn deref(&self) -> &Self::Target {
@ -307,14 +302,7 @@ impl Deref for OTxnDBOptionsPtr {
}
}
impl DerefMut for OTxnDBOptionsPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl OTxnDBOptionsPtr {
impl OTxnDbOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_odb_options())
@ -326,7 +314,8 @@ pub enum TransactOptions {
Optimistic(OTxnOptionsPtr),
}
pub enum TDBOptions {
Pessimistic(PTxnDBOptionsPtr),
Optimistic(OTxnDBOptionsPtr),
#[derive(Clone)]
pub enum TDbOptions {
Pessimistic(PTxnDbOptionsPtr),
Optimistic(OTxnDbOptionsPtr),
}

@ -5,3 +5,4 @@ pub(crate) mod tuple;
pub(crate) mod tuple_set;
pub(crate) mod typing;
pub(crate) mod value;
pub(crate) mod key_order;

@ -0,0 +1,70 @@
use crate::data::tuple::Tuple;
use std::cmp::Ordering;
impl<T: AsRef<[u8]>, T2: AsRef<[u8]>> PartialOrd<Tuple<T2>> for Tuple<T> {
fn partial_cmp(&self, other: &Tuple<T2>) -> Option<Ordering> {
match self.get_prefix().cmp(&other.get_prefix()) {
x @ (Ordering::Less | Ordering::Greater) => return Some(x),
Ordering::Equal => {}
}
Some(self.key_part_cmp(other))
}
}
impl<T: AsRef<[u8]>> Ord for Tuple<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap()
}
}
pub fn compare(a: &[u8], b: &[u8]) -> i8 {
let ta = Tuple::new(a);
let tb = Tuple::new(b);
match ta.cmp(&tb) {
Ordering::Less => -1,
Ordering::Greater => 1,
Ordering::Equal => 0,
}
}
//
// #[cfg(test)]
// mod tests {
// use crate::relation::key_order::compare;
// use crate::relation::tuple::Tuple;
// use crate::relation::value::Value;
// use std::collections::BTreeMap;
//
// #[test]
// fn ordering() {
// let mut t = Tuple::with_prefix(0);
// let t2 = Tuple::with_prefix(123);
// assert_eq!(compare(t.as_ref(), t.as_ref()), 0);
// assert_eq!(compare(t.as_ref(), t2.as_ref()), -1);
// assert_eq!(compare(t2.as_ref(), t.as_ref()), 1);
// let mut t2 = Tuple::with_prefix(0);
// t.push_str("aaa");
// t2.push_str("aaac");
// assert_eq!(compare(t.as_ref(), t2.as_ref()), -1);
// let mut t2 = Tuple::with_prefix(0);
// t2.push_str("aaa");
// t2.push_null();
// assert_eq!(compare(t.as_ref(), t2.as_ref()), -1);
// t.push_null();
// assert_eq!(compare(t.as_ref(), t2.as_ref()), 0);
// t.push_int(-123);
// t2.push_int(123);
// assert_eq!(compare(t.as_ref(), t2.as_ref()), -1);
// assert_eq!(compare(t.as_ref(), t.as_ref()), 0);
// let vals: Value = vec![
// ().into(),
// BTreeMap::new().into(),
// 1e23.into(),
// false.into(),
// Value::from("xxyx"),
// ]
// .into();
// t.push_value(&vals);
// assert_eq!(compare(t.as_ref(), t.as_ref()), 0);
// }
// }

@ -110,16 +110,16 @@ impl<T: AsRef<[u8]>> Tuple<T> {
#[derive(Clone)]
pub(crate) struct Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) data: T,
idx_cache: RefCell<Vec<usize>>,
}
impl<T> Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) fn clear_cache(&self) {
self.idx_cache.borrow_mut().clear()
@ -127,13 +127,14 @@ where
}
impl<T> AsRef<[u8]> for Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
fn as_ref(&self) -> &[u8] {
self.data.as_ref()
}
}
pub(crate) type OwnTuple = Tuple<Vec<u8>>;
pub(crate) const PREFIX_LEN: usize = 4;
@ -160,8 +161,8 @@ impl<T: AsRef<[u8]>> Tuple<T> {
#[inline]
pub(crate) fn key_part_cmp<T2: AsRef<[u8]>>(&self, other: &Tuple<T2>) -> Ordering {
self.iter()
.filter_map(|v| v.ok())
.cmp(other.iter().filter_map(|v| v.ok()))
.map(|v| v.expect("Key comparison failed"))
.cmp(other.iter().map(|v| v.expect("Key comparison failed")))
}
#[inline]
@ -408,7 +409,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
let (val, offset) = self.parse_value_at(pos + 1)?;
(offset, Value::DescVal(Reverse(val.into())))
}
StorageTag::Max => return Err(UndefinedDataTag(StorageTag::Max as u8)),
StorageTag::Max => (start, Value::EndSentinel),
};
Ok((val, nxt))
}
@ -676,7 +677,7 @@ impl OwnTuple {
impl<'a> Extend<Value<'a>> for OwnTuple {
#[inline]
fn extend<T: IntoIterator<Item = Value<'a>>>(&mut self, iter: T) {
fn extend<T: IntoIterator<Item=Value<'a>>>(&mut self, iter: T) {
for v in iter {
self.push_value(&v)
}

@ -9,7 +9,3 @@
// pub mod query;
// pub mod table;
pub struct DBInstance {
pub(crate) main_db: (),
session_handles: (),
}

@ -2,7 +2,10 @@
// pub mod error;
// pub mod relation;
// pub(crate) mod eval;
// pub(crate) mod db;
pub(crate) mod data;
pub(crate) mod db;
pub(crate) mod logger;
pub(crate) mod parser;
pub(crate) mod runtime;
pub use runtime::instance::DbInstance;

@ -1,20 +1,23 @@
pub(crate) fn init_test_logger() {
let _ = env_logger::builder()
// Include all events in tests
.filter_level(log::LevelFilter::max())
// Ensure events are captured by `cargo test`
.is_test(true)
// Ignore errors initializing the logger if tests race to configure it
.try_init();
}
#[cfg(test)]
mod tests {
fn init_logger() {
let _ = env_logger::builder()
// Include all events in tests
.filter_level(log::LevelFilter::max())
// Ensure events are captured by `cargo test`
.is_test(true)
// Ignore errors initializing the logger if tests race to configure it
.try_init();
}
use super::*;
#[test]
fn test_logger() {
use log::{debug, error, info, log_enabled, Level};
init_logger();
init_test_logger();
debug!("this is a debug {}", "message");
error!("this is printed by default");

@ -0,0 +1,3 @@
pub(crate) mod instance;
pub(crate) mod interpreter;
pub(crate) mod options;

@ -0,0 +1,224 @@
use std::{mem, result};
use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, TDbOptions};
use std::sync::{Arc, LockResult, Mutex, PoisonError};
use log::error;
use crate::data::tuple::Tuple;
use crate::runtime::options::{default_options, default_txn_options, default_write_options};
#[derive(thiserror::Error, Debug)]
pub enum DbInstanceError {
#[error(transparent)]
DbBridgeError(#[from] BridgeError),
#[error("Cannot obtain session lock")]
SessionLockError,
}
type Result<T> = result::Result<T, DbInstanceError>;
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum SessionStatus {
Prepared,
Running,
Completed,
}
struct SessionHandle {
id: usize,
db: DbPtr,
status: SessionStatus,
}
pub struct DbInstance {
pub(crate) main: DbPtr,
options: OptionsPtrShared,
tdb_options: TDbOptions,
path: String,
session_handles: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
destroy_on_close: bool,
}
impl DbInstance {
pub fn new(path: &str, optimistic: bool) -> Result<Self> {
let options = default_options().make_shared();
let tdb_options = default_txn_options(optimistic);
let main = DbPtr::open(&options, &tdb_options, path)?;
Ok(Self {
options,
tdb_options,
main,
path: path.to_string(),
session_handles: vec![].into(),
destroy_on_close: false,
})
}
}
impl DbInstance {
pub fn session(&self) -> Result<Session> {
let mut handles = self.session_handles.lock()
.map_err(|_| DbInstanceError::SessionLockError)?;
let handle = handles.iter().find_map(|handle| {
match handle.try_lock() {
Ok(inner) => {
if inner.status == SessionStatus::Completed {
let db = inner.db.clone();
let idx = inner.id;
Some((db, idx, handle))
} else {
None
}
}
Err(_) => None
}
});
let (temp, handle) = match handle {
None => {
let idx = handles.len();
let temp_path = self.get_session_storage_path(idx);
let temp = DbPtr::open_non_txn(
&self.options,
&temp_path)?;
let handle = Arc::new(Mutex::new(SessionHandle {
status: SessionStatus::Prepared,
id: idx,
db: temp.clone(),
}));
handles.push(handle.clone());
(temp, handle)
}
Some((db, _, handle)) => (db, handle.clone())
};
drop(handles);
Ok(Session {
main: self.main.clone(),
temp,
session_handle: handle,
})
}
pub fn set_destroy_on_close(&mut self, v: bool) {
self.destroy_on_close = v;
}
fn get_session_storage_path(&self, idx: usize) -> String {
format!("{}_sess_{}", self.path, idx)
}
}
impl Drop for DbInstance {
fn drop(&mut self) {
if let Err(e) = self.main.close() {
error!("Encountered error on closing main DB {:?}", e);
}
let mut to_wipe = 0;
match self.session_handles.lock() {
Ok(mut handles) => {
to_wipe = handles.len();
while let Some(handle) = handles.pop() {
match handle.lock() {
Ok(handle) => {
if let Err(e) = handle.db.close() {
error!("Encountered error on closing temp DB {:?}", e);
}
}
Err(e) => {
error!("Cannot obtain handles for DbInstance on drop {:?}", e)
}
}
}
}
Err(e) => {
error!("Cannot obtain handles for DbInstance on drop {:?}", e)
}
}
for i in 0..to_wipe {
let path = self.get_session_storage_path(i);
if let Err(e) = destroy_db(&self.options, &path) {
error!("Encountered error on destroying temp DB {:?}", e);
}
}
if self.destroy_on_close {
let mut temp = unsafe { DbPtr::null() };
mem::swap(&mut temp, &mut self.main);
drop(temp);
if let Err(e) = destroy_db(&self.options, &self.path) {
error!("Encountered error on destroying temp DB {:?}", e);
}
}
}
}
pub struct Session {
pub(crate) main: DbPtr,
pub(crate) temp: DbPtr,
session_handle: Arc<Mutex<SessionHandle>>,
}
impl Session {
pub fn start(&mut self) -> Result<()> {
let mut handle = self.session_handle.lock()
.map_err(|_| DbInstanceError::SessionLockError)?;
handle.status = SessionStatus::Running;
Ok(())
}
fn clear_data(&self) -> Result<()> {
let w_opts = default_write_options();
self.temp
.del_range(&w_opts, Tuple::with_null_prefix(), Tuple::max_tuple())?;
// self.temp.compact_all()?;
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
self.clear_data()?;
let mut handle = self.session_handle.lock()
.map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLockError
})?;
handle.status = SessionStatus::Completed;
Ok(())
}
}
impl Drop for Session {
fn drop(&mut self) {
if let Err(e) = self.stop() {
error!("failed to drop session {:?}", e);
}
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use crate::logger::init_test_logger;
use super::*;
use crate::runtime::instance::DbInstance;
fn test_send_sync<T: Send + Sync>(_x: T) {}
#[test]
fn creation() -> Result<()> {
init_test_logger();
let start = Instant::now();
let mut db = DbInstance::new("_test", false)?;
db.set_destroy_on_close(true);
dbg!(start.elapsed());
let start = Instant::now();
let mut db2 = DbInstance::new("_test2", true)?;
db2.set_destroy_on_close(true);
for _ in 0..1000 {
let i1 = db2.session()?;
test_send_sync(i1);
}
dbg!(start.elapsed());
Ok(())
}
}

@ -0,0 +1,43 @@
use lazy_static::lazy_static;
use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, WriteOptionsPtr};
use crate::data::tuple::PREFIX_LEN;
const COMPARATOR_NAME: &str = "cozo_cmp_v1";
lazy_static! {
static ref DEFAULT_COMPARATOR: RustComparatorPtr = RustComparatorPtr::new(
COMPARATOR_NAME,
crate::data::key_order::compare,
false);
}
pub fn default_options() -> OptionsPtr {
let mut options = OptionsPtr::default();
options
.set_comparator(&DEFAULT_COMPARATOR)
.set_create_if_missing(true)
.set_bloom_filter(10., true)
.set_fixed_prefix_extractor(PREFIX_LEN);
options
}
pub fn default_read_options() -> ReadOptionsPtr {
ReadOptionsPtr::default()
}
pub fn default_write_options() -> WriteOptionsPtr {
WriteOptionsPtr::default()
}
pub fn default_flush_options() -> FlushOptionsPtr {
FlushOptionsPtr::default()
}
pub fn default_txn_options(optimistic: bool) -> TDbOptions {
if optimistic {
TDbOptions::Optimistic(OTxnDbOptionsPtr::default())
} else {
TDbOptions::Pessimistic(PTxnDbOptionsPtr::default())
}
}
Loading…
Cancel
Save