From dd45ebb291a08f8f9421fbb4dae135a870a17abe Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Sun, 24 Apr 2022 00:30:24 +0800 Subject: [PATCH] Aliasing is serious --- cozorocks/bridge/cozorocks.h | 12 +++-- cozorocks/src/lib.rs | 90 +++++++++++++++++++++--------------- src/db/engine.rs | 70 +++++++++++++++++++++++----- src/db/eval.rs | 80 ++++++++++++++++++++++---------- src/relation/key_order.rs | 28 +++++++---- src/relation/table.rs | 3 +- src/relation/tuple.rs | 46 ++++++++++++------ src/relation/value.rs | 49 ++++++++++---------- 8 files changed, 257 insertions(+), 121 deletions(-) diff --git a/cozorocks/bridge/cozorocks.h b/cozorocks/bridge/cozorocks.h index c5fd0e93..984a0446 100644 --- a/cozorocks/bridge/cozorocks.h +++ b/cozorocks/bridge/cozorocks.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include "rust/cxx.h" @@ -83,9 +84,10 @@ public: } const char *Name() const { - return "RustComparator"; + return name.c_str(); } + virtual bool CanKeysWithDifferentByteContentsBeEqual() const { return true; } void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} @@ -179,6 +181,7 @@ struct IteratorBridge { } inline std::unique_ptr key_raw() const { +// std::cout << "c++ get " << inner->key().size() << std::endl; return std::make_unique(inner->key()); } @@ -308,12 +311,15 @@ struct TransactionBridge { rust::Slice val, BridgeStatus &status ) const { + auto k = convert_slice(key); + auto v = convert_slice(val); +// std::cout << "c++ put " << key.size() << " " << k.size() << std::endl; write_status( raw_db->Put( *raw_w_ops, const_cast(&cf), - convert_slice(key), - convert_slice(val)), + k, + v), status ); } diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index d7fca3f4..a7204c0a 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -74,21 +74,22 @@ impl From for Option { pub type Result = std::result::Result; -pub trait SlicePtr { - fn as_bytes(&self) -> &[u8]; +pub enum SlicePtr { + Plain(UniquePtr), + Pinnable(UniquePtr), } -impl SlicePtr for UniquePtr { +impl AsRef<[u8]> for SlicePtr { #[inline] - fn as_bytes(&self) -> &[u8] { - convert_slice_back(self) - } -} - -impl SlicePtr for UniquePtr { - #[inline] - fn as_bytes(&self) -> &[u8] { - convert_pinnable_slice_back(self) + fn as_ref(&self) -> &[u8] { + match self { + SlicePtr::Plain(s) => { + convert_slice_back(s) + } + SlicePtr::Pinnable(s) => { + convert_pinnable_slice_back(s) + } + } } } @@ -374,12 +375,29 @@ impl IteratorPtr { IteratorBridge::do_seek_for_prev(self, key.as_ref()) } #[inline] - pub fn key(&self) -> UniquePtr { - IteratorBridge::key_raw(self) + pub fn key(&self) -> Option { + if self.is_valid() { + Some(SlicePtr::Plain(IteratorBridge::key_raw(self))) + } else { + None + } } #[inline] - pub fn val(&self) -> UniquePtr { - IteratorBridge::value_raw(self) + pub fn val(&self) -> Option { + if self.is_valid() { + Some(SlicePtr::Plain(IteratorBridge::value_raw(self))) + } else { + None + } + } + #[inline] + pub fn pair(&self) -> Option<(SlicePtr, SlicePtr)> { + if self.is_valid() { + Some((SlicePtr::Plain(IteratorBridge::key_raw(self)), + SlicePtr::Plain(IteratorBridge::value_raw(self)))) + } else { + None + } } #[inline] pub fn status(&self) -> BridgeStatus { @@ -387,48 +405,46 @@ impl IteratorPtr { } #[inline] pub fn iter(&self) -> KVIterator { - KVIterator { it: self } + KVIterator { it: self, should_next: false } } #[inline] pub fn keys(&self) -> KeyIterator { - KeyIterator { it: self } + KeyIterator { it: self, should_next: false } } } pub struct KVIterator<'a> { it: &'a IteratorPtr, + should_next: bool, } impl Iterator for KVIterator<'_> { - type Item = (UniquePtr, UniquePtr); + type Item = (SlicePtr, SlicePtr); #[inline] fn next(&mut self) -> Option { - if self.it.is_valid() { - let ret = (self.it.key(), self.it.val()); - self.next(); - Some(ret) - } else { - None + if self.should_next { + self.it.next(); } + self.should_next = true; + self.it.pair() } } pub struct KeyIterator<'a> { it: &'a IteratorPtr, + should_next: bool, } impl Iterator for KeyIterator<'_> { - type Item = UniquePtr; + type Item = SlicePtr; #[inline] fn next(&mut self) -> Option { - if self.it.is_valid() { - let ret = self.it.key(); - self.next(); - Some(ret) - } else { - None + if self.should_next { + self.it.next(); } + self.should_next = true; + self.it.key() } } @@ -481,21 +497,21 @@ impl TransactionPtr { status.check_err(()) } #[inline] - pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result> { + pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result { let mut status = BridgeStatus::default(); if transact { let ret = self.get_txn(cf, key.as_ref(), &mut status); - status.check_err(ret) + status.check_err(SlicePtr::Pinnable(ret)) } else { let ret = self.get_raw(cf, key.as_ref(), &mut status); - status.check_err(ret) + status.check_err(SlicePtr::Pinnable(ret)) } } #[inline] - pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result> { + pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result { let mut status = BridgeStatus::default(); let ret = self.get_for_update_txn(cf, key.as_ref(), &mut status); - status.check_err(ret) + status.check_err(SlicePtr::Pinnable(ret)) } #[inline] pub fn del(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<()> { diff --git a/src/db/engine.rs b/src/db/engine.rs index eb128c0a..b4c82444 100644 --- a/src/db/engine.rs +++ b/src/db/engine.rs @@ -94,14 +94,14 @@ impl Engine { Some(h) => h.clone() }; - return Session { + Session { engine: self, stack_depth: 0, txn: TransactionPtr::null(), perm_cf: SharedPtr::null(), temp_cf: SharedPtr::null(), handle, - }; + } } } @@ -119,8 +119,9 @@ pub struct Session<'a> { impl<'a> Session<'a> { pub fn start(&mut self) { self.perm_cf = self.engine.db.default_cf(); - let name = self.handle.read().unwrap().cf_ident.to_string(); - self.temp_cf = self.engine.db.get_cf(name).unwrap(); + assert!(!self.perm_cf.is_null()); + self.temp_cf = self.engine.db.get_cf(&self.handle.read().unwrap().cf_ident).unwrap(); + assert!(!self.temp_cf.is_null()); let t_options = match self.engine.options_store.t_options { TDBOptions::Pessimistic(_) => { TransactOptions::Pessimistic(PTxnOptionsPtr::default()) @@ -129,8 +130,10 @@ impl<'a> Session<'a> { TransactOptions::Optimistic(OTxnOptionsPtr::new(&self.engine.options_store.cmp)) } }; - let r_opts = ReadOptionsPtr::default(); - let rx_opts = ReadOptionsPtr::default(); + let mut r_opts = ReadOptionsPtr::default(); + r_opts.set_total_order_seek(true); + let mut rx_opts = ReadOptionsPtr::default(); + rx_opts.set_total_order_seek(true); let w_opts = WriteOptionsPtr::default(); let mut wx_opts = WriteOptionsPtr::default(); wx_opts.set_disable_wal(true); @@ -159,8 +162,35 @@ pub enum SessionStatus { #[cfg(test)] mod tests { use std::{fs, thread}; + use crate::db::eval::Environment; + use crate::relation::table::DataKind::Value; + use crate::relation::tuple::Tuple; use super::*; + #[test] + fn push_get() { + { + let engine = Engine::new("_push_get".to_string(), false).unwrap(); + let mut sess = engine.session(); + sess.start(); + for i in (-80..-40).step_by(10) { + let mut ikey = Tuple::with_prefix(0); + ikey.push_int(i); + ikey.push_str("pqr"); + println!("in {:?} {:?}", ikey, ikey.data); + sess.txn.put(false, &sess.temp_cf, &ikey, &ikey).unwrap(); + println!("out {:?}", sess.txn.get(false, &sess.temp_cf, &ikey).unwrap().as_ref()); + } + let it = sess.txn.iterator(false, &sess.temp_cf); + it.to_first(); + for (key, val) in it.iter() { + println!("a: {:?} {:?}", key.as_ref(), val.as_ref()); + println!("v: {:?} {:?}", Tuple::new(key), Tuple::new(val)); + } + } + let _ = fs::remove_dir_all("_push_get"); + } + #[test] fn test_create() { let p1 = "_test_db_create1"; @@ -170,7 +200,7 @@ mod tests { { let engine = Engine::new(p1.to_string(), true); assert!(engine.is_ok()); - let engine = Engine::new(p2.to_string(), true); + let engine = Engine::new(p2.to_string(), false); assert!(engine.is_ok()); let engine = Engine::new(p3.to_string(), true); assert!(engine.is_ok()); @@ -179,7 +209,7 @@ mod tests { } let engine2 = Engine::new(p2.to_string(), false); assert!(engine2.is_ok()); - let engine2 = Arc::new(Engine::new(p3.to_string(), false).unwrap()); + let engine2 = Arc::new(Engine::new(p3.to_string(), true).unwrap()); { for _i in 0..10 { let mut _sess = engine2.session(); @@ -196,12 +226,30 @@ mod tests { let mut thread_handles = vec![]; println!("concurrent"); - for i in 0..10 { + for i in 0..1 { let engine = engine2.clone(); thread_handles.push(thread::spawn(move || { println!("In thread {}", i); - let mut _sess = engine.session(); - _sess.start(); + let mut sess = engine.session(); + sess.start(); + for _ in 0..10000 { + sess.push_env(); + sess.define_variable("abc", &"xyz".into(), true); + sess.define_variable("pqr", &"xyz".into(), false); + } + println!("pqr {:?}", sess.resolve("pqr")); + println!("uvw {:?}", sess.resolve("uvw")); + println!("aaa {:?}", sess.resolve("aaa")); + let it = sess.txn.iterator(false, &sess.temp_cf); + it.to_first(); + // for (key, val) in it.iter() { + // println!("a: {:?} {:?}", key.as_ref(), val.as_ref()); + // println!("v: {:?}", Tuple::new(key)); + // } + for _ in 0..5000 { + sess.pop_env(); + } + println!("pqr {:?}", sess.resolve("pqr")); println!("In thread {} end", i); })) } diff --git a/src/db/eval.rs b/src/db/eval.rs index 4c3f3761..093d2884 100644 --- a/src/db/eval.rs +++ b/src/db/eval.rs @@ -1,31 +1,21 @@ -use crate::db::engine::{Engine, Session}; -use crate::relation::table::Table; -use crate::relation::tuple::Tuple; +use cozorocks::SlicePtr; +use crate::db::engine::{Session}; +use crate::relation::table::{DataKind, Table}; +use crate::relation::tuple::{Tuple}; use crate::relation::typing::Typing; use crate::relation::value::Value; -pub trait Environment { +pub trait Environment> { fn push_env(&mut self); fn pop_env(&mut self); fn define_variable(&mut self, name: &str, val: &Value, in_root: bool); fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool); fn define_table(&mut self, table: &Table, in_root: bool); - fn resolve(&mut self, name: &str); + fn resolve(&mut self, name: &str) -> Option>; fn delete_defined(&mut self, name: &str, in_root: bool); } -#[repr(u8)] -enum DefinableTag { - Value = 1, - Typing = 2, - Node = 3, - Edge = 4, - Associate = 5, - Index = 6, -} - - impl<'a> Session<'a> { fn encode_definable_key(&self, name: &str, in_root: bool) -> Tuple> { let depth_code = if in_root { 0 } else { self.stack_depth as i64 }; @@ -37,7 +27,7 @@ impl<'a> Session<'a> { } -impl<'a> Environment for Session<'a> { +impl<'a> Environment for Session<'a> { fn push_env(&mut self) { self.stack_depth -= 1; } @@ -47,17 +37,40 @@ impl<'a> Environment for Session<'a> { return; } // Remove all stuff starting with the stack depth from the temp session + let mut prefix = Tuple::with_prefix(0); + prefix.push_int(self.stack_depth as i64); + let it = self.txn.iterator(false, &self.temp_cf); + it.seek(&prefix); + for val in it.keys() { + let cur = Tuple::new(val); + if cur.starts_with(&prefix) { + let name = cur.get(1).unwrap(); + let mut ikey = Tuple::with_prefix(0); + ikey.push_value(&name); + ikey.push_int(self.stack_depth as i64); + + self.txn.del(false, &self.temp_cf, cur).unwrap(); + self.txn.del(false, &self.temp_cf, ikey).unwrap(); + } else { + break; + } + } + self.stack_depth += 1; } fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) { + let key = self.encode_definable_key(name, in_root); + let mut data = Tuple::with_prefix(DataKind::Value as u32); + data.push_value(val); if in_root { - todo!() + self.txn.put(true, &self.perm_cf, key, data).unwrap(); } else { - let key = self.encode_definable_key(name, in_root); - let mut data = Tuple::with_prefix(0); - data.push_uint(DefinableTag::Value as u8 as u64); - data.push_value(val); + let mut ikey = Tuple::with_prefix(0); + ikey.push_int(self.stack_depth as i64); + ikey.push_str(name); + self.txn.put(false, &self.temp_cf, key, data).unwrap(); + self.txn.put(false, &self.temp_cf, ikey, "").unwrap(); } } @@ -69,8 +82,27 @@ impl<'a> Environment for Session<'a> { todo!() } - fn resolve(&mut self, name: &str) { - todo!() + fn resolve(&mut self, name: &str) -> Option> { + let mut tuple = Tuple::with_prefix(0); + tuple.push_str(name); + let it = self.txn.iterator(false, &self.temp_cf); + it.seek(&tuple); + match it.pair() { + None => { + None + } + Some((tk, vk)) => { + let k = Tuple::new(tk); + if k.starts_with(&tuple) { + println!("Resolved to key {:?}", k); + let vt = Tuple::new(vk); + // let v = vt.iter().collect::>(); + Some(vt) + } else { + None + } + } + } } fn delete_defined(&mut self, name: &str, in_root: bool) { diff --git a/src/relation/key_order.rs b/src/relation/key_order.rs index 91580e7d..c2aab9e8 100644 --- a/src/relation/key_order.rs +++ b/src/relation/key_order.rs @@ -1,18 +1,30 @@ use std::cmp::Ordering; use crate::relation::tuple::Tuple; +impl, T2: AsRef<[u8]>> PartialOrd> for Tuple { + fn partial_cmp(&self, other: &Tuple) -> Option { + match self.get_prefix().cmp(&other.get_prefix()) { + x @ (Ordering::Less | Ordering::Greater) => return Some(x), + Ordering::Equal => {} + } + Some(self.iter().cmp(other.iter())) + } +} + +impl> Ord for Tuple { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap() + } +} + pub fn compare(a: &[u8], b: &[u8]) -> i8 { let ta = Tuple::new(a); let tb = Tuple::new(b); - match ta.get_prefix().cmp(&tb.get_prefix()) { - Ordering::Less => return -1, - Ordering::Greater => return 1, - Ordering::Equal => {} - } - match ta.iter().cmp(tb.iter()) { + + match ta.cmp(&tb) { Ordering::Less => -1, - Ordering::Equal => 0, - Ordering::Greater => 1 + Ordering::Greater => 1, + Ordering::Equal => 0 } } diff --git a/src/relation/table.rs b/src/relation/table.rs index cfb39007..4b4bbdca 100644 --- a/src/relation/table.rs +++ b/src/relation/table.rs @@ -1,8 +1,9 @@ use crate::relation::typing::Typing; -#[repr(u8)] +#[repr(u32)] #[derive(Ord, PartialOrd, Eq, PartialEq)] pub enum DataKind { + DataTuple = 0, Node = 1, Edge = 2, Associate = 3, diff --git a/src/relation/tuple.rs b/src/relation/tuple.rs index 643c4d43..39d83519 100644 --- a/src/relation/tuple.rs +++ b/src/relation/tuple.rs @@ -1,11 +1,12 @@ use std::borrow::{Cow}; use std::cell::RefCell; use std::collections::BTreeMap; +use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use uuid::Uuid; use crate::relation::value::{Tag, Value}; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Tuple where T: AsRef<[u8]> { @@ -22,6 +23,11 @@ impl AsRef<[u8]> for Tuple where T: AsRef<[u8]> { const PREFIX_LEN: usize = 4; impl> Tuple { + #[inline] + pub fn starts_with>(&self, other: &Tuple) -> bool { + self.data.as_ref().starts_with(other.data.as_ref()) + } + #[inline] pub fn new(data: T) -> Self { Self { @@ -66,7 +72,7 @@ impl> Tuple { let data = self.data.as_ref(); let tag_start = *self.idx_cache.borrow().last().unwrap_or(&PREFIX_LEN); let start = tag_start + 1; - let nxt = match Tag::from(data[tag_start]) { + let nxt = match Tag::try_from(data[tag_start]).unwrap() { Tag::Null | Tag::BoolTrue | Tag::BoolFalse => start, Tag::Int | Tag::UInt => start + self.parse_varint(start).1, Tag::Float => start + 8, @@ -77,7 +83,7 @@ impl> Tuple { start + slen + offset } Tag::List => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize, - Tag::Dict => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize + Tag::Dict => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize, }; self.idx_cache.borrow_mut().push(nxt); } @@ -120,7 +126,11 @@ impl> Tuple { fn parse_value_at(&self, pos: usize) -> (Value, usize) { let data = self.data.as_ref(); let start = pos + 1; - let (nxt, val): (usize, Value) = match Tag::from(data[pos]) { + let tag = match Tag::try_from(data[pos]) { + Ok(t) => t, + Err(e) => panic!("Cannot parse tag {} for {:?}", e, data) + }; + let (nxt, val): (usize, Value) = match tag { Tag::Null => (start, ().into()), Tag::BoolTrue => (start, true.into()), Tag::BoolFalse => (start, false.into()), @@ -187,6 +197,12 @@ impl> Tuple { } } +impl> Debug for Tuple { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.iter().collect::>()) + } +} + pub struct TupleIter<'a, T: AsRef<[u8]>> { tuple: &'a Tuple, pos: usize, @@ -280,9 +296,7 @@ impl Tuple> { } let length = (self.data.len() - start_pos) as u32; let length_bytes = length.to_be_bytes(); - for i in 0..4 { - self.data[start_pos + i] = length_bytes[i] - } + self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]); let mut cache = self.idx_cache.borrow_mut(); cache.truncate(start_len); cache.push(self.data.len()); @@ -299,9 +313,7 @@ impl Tuple> { } let length = (self.data.len() - start_pos) as u32; let length_bytes = length.to_be_bytes(); - for i in 0..4 { - self.data[start_pos + i] = length_bytes[i] - } + self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]); let mut cache = self.idx_cache.borrow_mut(); cache.truncate(start_len); cache.push(self.data.len()); @@ -347,14 +359,14 @@ impl <'a> Extend> for Tuple> { } } -impl> PartialEq for Tuple { +impl, T2: AsRef<[u8]>> PartialEq> for Tuple { #[inline] - fn eq(&self, other: &Self) -> bool { + fn eq(&self, other: &Tuple) -> bool { self.data.as_ref() == other.data.as_ref() } } -impl > Hash for Tuple { +impl> Hash for Tuple { fn hash(&self, state: &mut H) { self.data.as_ref().hash(state); } @@ -479,4 +491,12 @@ mod tests { println!("{:?}", v); } */ + + #[test] + fn particular() { + let mut v = Tuple::with_prefix(0); + v.push_str("pqr"); + v.push_int(-64); + println!("{:?} {:?}", v, v.data); + } } \ No newline at end of file diff --git a/src/relation/value.rs b/src/relation/value.rs index d52f5010..f6473a15 100644 --- a/src/relation/value.rs +++ b/src/relation/value.rs @@ -7,16 +7,16 @@ use uuid::Uuid; #[repr(u8)] #[derive(Ord, PartialOrd, Eq, PartialEq)] pub enum Tag { - BoolFalse = 0, + BoolFalse = 1, Null = 2, - BoolTrue = 4, - Int = 11, - Float = 13, - Text = 15, - Uuid = 17, - UInt = 21, - List = 101, - Dict = 103, + BoolTrue = 3, + Int = 4, + Float = 5, + Text = 6, + Uuid = 7, + UInt = 8, + List = 9, + Dict = 10, // Timestamp = 23, // Datetime = 25, // Timezone = 27, @@ -44,23 +44,24 @@ pub enum Tag { // C128Arr = 74, } -impl From for Tag { +impl TryFrom for Tag { + type Error = u8; #[inline] - fn from(u: u8) -> Self { + fn try_from(u: u8) -> std::result::Result { use self::Tag::*; - match u { - 0 => BoolFalse, + Ok(match u { + 1 => BoolFalse, 2 => Null, - 4 => BoolTrue, - 11 => Int, - 13 => Float, - 15 => Text, - 17 => Uuid, - 21 => UInt, - 101 => List, - 103 => Dict, - _ => panic!("Unexpected value tag {}", u) - } + 3 => BoolTrue, + 4 => Int, + 5 => Float, + 6 => Text, + 7 => Uuid, + 8 => UInt, + 9 => List, + 10 => Dict, + v => return Err(v) + }) } } @@ -93,7 +94,7 @@ impl<'a> Value<'a> { Value::List(l) => l.into_iter().map(|v| v.to_static()).collect::>().into(), Value::Dict(d) => d.into_iter() .map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static())) - .collect::, StaticValue>>().into() + .collect::, StaticValue>>().into(), } } }