cleaning up session drop

main
Ziyang Hu 2 years ago
parent c5ae62cea7
commit 16f04399cd

@ -358,6 +358,24 @@ struct TransactionBridge {
);
}
inline void del_range_raw(
const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> start_key,
rust::Slice<const uint8_t> end_key,
BridgeStatus &status
) const {
write_status(
raw_db->GetRootDB()->DeleteRange(
*raw_w_ops, const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(start_key), convert_slice(end_key)),
status);
}
inline void flush_raw(const ColumnFamilyHandle &cf, const FlushOptions &options, BridgeStatus &status) const {
write_status(raw_db->Flush(options, const_cast<ColumnFamilyHandle *>(&cf)), status);
}
inline std::unique_ptr<IteratorBridge> iterator_txn(
const ColumnFamilyHandle &cf) const {
return std::make_unique<IteratorBridge>(
@ -514,6 +532,18 @@ inline unique_ptr<OptimisticTransactionDBOptions> new_odb_options() {
return make_unique<OptimisticTransactionDBOptions>();
}
inline unique_ptr<FlushOptions> new_flush_options() {
return make_unique<FlushOptions>();
}
void set_flush_wait(FlushOptions &options, bool v) {
options.wait = v;
}
void set_allow_write_stall(FlushOptions &options, bool v) {
options.allow_write_stall = v;
}
inline unique_ptr<TDBBridge>
open_tdb_raw(const Options &options,
const TransactionDBOptions &txn_db_options,

@ -105,6 +105,11 @@ mod ffi {
type OptimisticTransactionDBOptions;
fn new_odb_options() -> UniquePtr<OptimisticTransactionDBOptions>;
type FlushOptions;
fn new_flush_options() -> UniquePtr<FlushOptions>;
fn set_flush_wait(o: Pin<&mut FlushOptions>, v: bool);
fn set_allow_write_stall(o: Pin<&mut FlushOptions>, v: bool);
type RustComparator;
fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8, diff_bytes_can_equal: bool) -> UniquePtr<RustComparator>;
@ -140,6 +145,9 @@ mod ffi {
status: &mut BridgeStatus);
fn del_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus);
fn del_range_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle,
start_key: &[u8], end_key: &[u8], status: &mut BridgeStatus);
fn flush_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, options: &FlushOptions, status: &mut BridgeStatus);
fn iterator_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
fn iterator_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;

@ -233,6 +233,33 @@ impl WriteOptionsPtr {
}
}
pub struct FlushOptionsPtr(UniquePtr<FlushOptions>);
impl Deref for FlushOptionsPtr {
type Target = UniquePtr<FlushOptions>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FlushOptionsPtr {
#[inline]
pub fn default() -> Self {
Self(new_flush_options())
}
#[inline]
pub fn set_allow_write_stall(&mut self, v: bool) -> &mut Self {
set_allow_write_stall(self.0.pin_mut(), v);
self
}
#[inline]
pub fn set_flush_wait(&mut self, v: bool) -> &mut Self {
set_flush_wait(self.0.pin_mut(), v);
self
}
}
pub struct PTxnOptionsPtr(UniquePtr<TransactionOptions>);
impl Deref for PTxnOptionsPtr {
@ -525,6 +552,18 @@ impl TransactionPtr {
}
}
#[inline]
pub fn del_range(&self, cf: &ColumnFamilyHandle, start_key: impl AsRef<[u8]>, end_key: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.del_range_raw(cf, start_key.as_ref(), end_key.as_ref(), &mut status);
status.check_err(ret)
}
#[inline]
pub fn flush(&self, cf: &ColumnFamilyHandle, options: FlushOptionsPtr) -> Result<()> {
let mut status = BridgeStatus::default();
self.flush_raw(cf, &options, &mut status);
status.check_err(())
}
#[inline]
pub fn put(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
if transact {

@ -8,8 +8,10 @@ use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use uuid::v1::{Context, Timestamp};
use rand::Rng;
use crate::db::eval::Environment;
use crate::error::{CozoError, Result};
use crate::error::CozoError::{Poisoned, SessionErr};
use crate::relation::tuple::Tuple;
pub struct EngineOptions {
cmp: RustComparatorPtr,
@ -161,7 +163,10 @@ impl<'a> Session<'a> {
Ok(())
}
pub fn finish_work(&mut self) -> Result<()> {
self.handle.write().map_err(|_| Poisoned)?.status = SessionStatus::Completed;
self.txn.del_range(&self.temp_cf, Tuple::with_null_prefix(), Tuple::max_tuple())?;
let mut options = FlushOptionsPtr::default();
options.set_allow_write_stall(true).set_flush_wait(true);
self.txn.flush(&self.temp_cf, options)?;
Ok(())
}
}
@ -169,7 +174,12 @@ impl<'a> Session<'a> {
impl<'a> Drop for Session<'a> {
fn drop(&mut self) {
if let Err(e) = self.finish_work() {
eprintln!("{:?}", e);
eprintln!("Dropping session failed {:?}", e);
}
if let Ok(mut h) = self.handle.write().map_err(|_| Poisoned) {
h.status = SessionStatus::Completed;
} else {
eprintln!("Accessing lock of session handle failed");
}
}
}

@ -34,9 +34,6 @@ impl<'a> Environment<SlicePtr> for Session<'a> {
}
fn pop_env(&mut self) -> Result<()> {
if self.stack_depth == 0 {
return Ok(());
}
// Remove all stuff starting with the stack depth from the temp session
let mut prefix = Tuple::with_null_prefix();
prefix.push_int(self.stack_depth as i64);
@ -58,7 +55,9 @@ impl<'a> Environment<SlicePtr> for Session<'a> {
}
}
self.stack_depth += 1;
if self.stack_depth != 0 {
self.stack_depth += 1;
}
Ok(())
}

@ -85,6 +85,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
}
Tag::List => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize,
Tag::Dict => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize,
Tag::MaxTag => panic!()
};
self.idx_cache.borrow_mut().push(nxt);
}
@ -187,6 +188,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
}
(end_pos, collected.into())
}
Tag::MaxTag => (start, Value::End_Sentinel)
};
(val, nxt)
}
@ -240,6 +242,16 @@ impl Tuple<Vec<u8>> {
}
}
#[inline]
pub fn max_tuple() -> Self {
let mut ret = Tuple::with_prefix(u32::MAX);
ret.seal_with_sentinel();
ret
}
#[inline]
pub fn seal_with_sentinel(&mut self) {
self.push_tag(Tag::MaxTag);
}
#[inline]
fn push_tag(&mut self, tag: Tag) {
self.data.push(tag as u8);
}
@ -327,6 +339,7 @@ impl Tuple<Vec<u8>> {
cache.truncate(start_len);
cache.push(self.data.len());
}
Value::End_Sentinel => panic!("Cannot push sentinel value")
}
}
@ -359,7 +372,7 @@ impl Tuple<Vec<u8>> {
}
}
impl <'a> Extend<Value<'a>> for Tuple<Vec<u8>> {
impl<'a> Extend<Value<'a>> for Tuple<Vec<u8>> {
#[inline]
fn extend<T: IntoIterator<Item=Value<'a>>>(&mut self, iter: T) {
for v in iter {

@ -42,6 +42,7 @@ pub enum Tag {
// C32Arr = 72,
// C64Arr = 73,
// C128Arr = 74,
MaxTag = u8::MAX
}
impl TryFrom<u8> for Tag {
@ -60,6 +61,7 @@ impl TryFrom<u8> for Tag {
8 => UInt,
9 => List,
10 => Dict,
u8::MAX => MaxTag,
v => return Err(v)
})
}
@ -76,6 +78,7 @@ pub enum Value<'a> {
Text(Cow<'a, str>),
List(Vec<Value<'a>>),
Dict(BTreeMap<Cow<'a, str>, Value<'a>>),
End_Sentinel
}
pub type StaticValue = Value<'static>;
@ -95,6 +98,7 @@ impl<'a> Value<'a> {
Value::Dict(d) => d.into_iter()
.map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static()))
.collect::<BTreeMap<Cow<'static, str>, StaticValue>>().into(),
Value::End_Sentinel => panic!("Cannot process sentinel value")
}
}
}
@ -249,6 +253,9 @@ impl<'a> Display for Value<'a> {
}
f.write_char('}')?;
}
Value::End_Sentinel => {
write!(f, "Sentinel")?
}
}
Ok(())
}

Loading…
Cancel
Save