Aliasing is serious

main
Ziyang Hu 2 years ago
parent 3ef1259fa1
commit dd45ebb291

@ -5,6 +5,7 @@
#pragma once
#include <memory>
#include<iostream>
#include <shared_mutex>
#include "rust/cxx.h"
@ -83,9 +84,10 @@ public:
}
const char *Name() const {
return "RustComparator";
return name.c_str();
}
virtual bool CanKeysWithDifferentByteContentsBeEqual() const { return true; }
void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {}
@ -179,6 +181,7 @@ struct IteratorBridge {
}
// Returns the iterator's current key as a newly allocated Slice object
// constructed from `inner->key()`.
// NOTE(review): this presumably copies only the Slice handle, not the bytes it
// refers to, so the result may alias iterator-owned memory — confirm the
// lifetime against the RocksDB iterator documentation before holding on to it.
inline std::unique_ptr<Slice> key_raw() const {
// std::cout << "c++ get " << inner->key().size() << std::endl;
return std::make_unique<Slice>(inner->key());
}
@ -308,12 +311,15 @@ struct TransactionBridge {
rust::Slice<const uint8_t> val,
BridgeStatus &status
) const {
auto k = convert_slice(key);
auto v = convert_slice(val);
// std::cout << "c++ put " << key.size() << " " << k.size() << std::endl;
write_status(
raw_db->Put(
*raw_w_ops,
const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key),
convert_slice(val)),
k,
v),
status
);
}

@ -74,21 +74,22 @@ impl From<BridgeStatus> for Option<BridgeError> {
pub type Result<T> = std::result::Result<T, BridgeError>;
pub trait SlicePtr {
fn as_bytes(&self) -> &[u8];
pub enum SlicePtr {
Plain(UniquePtr<Slice>),
Pinnable(UniquePtr<PinnableSlice>),
}
impl SlicePtr for UniquePtr<Slice> {
impl AsRef<[u8]> for SlicePtr {
#[inline]
fn as_bytes(&self) -> &[u8] {
convert_slice_back(self)
}
}
impl SlicePtr for UniquePtr<PinnableSlice> {
#[inline]
fn as_bytes(&self) -> &[u8] {
convert_pinnable_slice_back(self)
fn as_ref(&self) -> &[u8] {
match self {
SlicePtr::Plain(s) => {
convert_slice_back(s)
}
SlicePtr::Pinnable(s) => {
convert_pinnable_slice_back(s)
}
}
}
}
@ -374,12 +375,29 @@ impl IteratorPtr {
IteratorBridge::do_seek_for_prev(self, key.as_ref())
}
#[inline]
pub fn key(&self) -> UniquePtr<Slice> {
IteratorBridge::key_raw(self)
pub fn key(&self) -> Option<SlicePtr> {
if self.is_valid() {
Some(SlicePtr::Plain(IteratorBridge::key_raw(self)))
} else {
None
}
}
#[inline]
pub fn val(&self) -> UniquePtr<Slice> {
IteratorBridge::value_raw(self)
pub fn val(&self) -> Option<SlicePtr> {
if self.is_valid() {
Some(SlicePtr::Plain(IteratorBridge::value_raw(self)))
} else {
None
}
}
/// Returns the key and value at the iterator's current position.
///
/// Yields `None` when `is_valid()` reports false. Otherwise both halves are
/// fetched through `IteratorBridge` (key first, then value) and wrapped as
/// `SlicePtr::Plain`.
#[inline]
pub fn pair(&self) -> Option<(SlicePtr, SlicePtr)> {
    if !self.is_valid() {
        return None;
    }
    let key = SlicePtr::Plain(IteratorBridge::key_raw(self));
    let val = SlicePtr::Plain(IteratorBridge::value_raw(self));
    Some((key, val))
}
#[inline]
pub fn status(&self) -> BridgeStatus {
@ -387,48 +405,46 @@ impl IteratorPtr {
}
#[inline]
pub fn iter(&self) -> KVIterator {
KVIterator { it: self }
KVIterator { it: self, should_next: false }
}
#[inline]
pub fn keys(&self) -> KeyIterator {
KeyIterator { it: self }
KeyIterator { it: self, should_next: false }
}
}
/// Adapter that walks an `IteratorPtr` and yields key/value pairs.
pub struct KVIterator<'a> {
// Borrowed handle to the underlying iterator being walked.
it: &'a IteratorPtr,
// Starts out false (set by `IteratorPtr::iter`). The `Iterator::next` impl
// advances `it` before reading only when this is true, and sets it to true
// after every call, so the first call reads the current position as-is.
should_next: bool,
}
impl Iterator for KVIterator<'_> {
type Item = (UniquePtr<Slice>, UniquePtr<Slice>);
type Item = (SlicePtr, SlicePtr);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.it.is_valid() {
let ret = (self.it.key(), self.it.val());
self.next();
Some(ret)
} else {
None
if self.should_next {
self.it.next();
}
self.should_next = true;
self.it.pair()
}
}
/// Adapter that walks an `IteratorPtr` and yields keys only.
pub struct KeyIterator<'a> {
// Borrowed handle to the underlying iterator being walked.
it: &'a IteratorPtr,
// Starts out false (set by `IteratorPtr::keys`). The `Iterator::next` impl
// advances `it` before reading only when this is true, and sets it to true
// after every call, so the first call reads the current position as-is.
should_next: bool,
}
impl Iterator for KeyIterator<'_> {
type Item = UniquePtr<Slice>;
type Item = SlicePtr;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.it.is_valid() {
let ret = self.it.key();
self.next();
Some(ret)
} else {
None
if self.should_next {
self.it.next();
}
self.should_next = true;
self.it.key()
}
}
@ -481,21 +497,21 @@ impl TransactionPtr {
status.check_err(())
}
#[inline]
pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<UniquePtr<PinnableSlice>> {
pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<SlicePtr> {
let mut status = BridgeStatus::default();
if transact {
let ret = self.get_txn(cf, key.as_ref(), &mut status);
status.check_err(ret)
status.check_err(SlicePtr::Pinnable(ret))
} else {
let ret = self.get_raw(cf, key.as_ref(), &mut status);
status.check_err(ret)
status.check_err(SlicePtr::Pinnable(ret))
}
}
#[inline]
pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<UniquePtr<PinnableSlice>> {
pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<SlicePtr> {
let mut status = BridgeStatus::default();
let ret = self.get_for_update_txn(cf, key.as_ref(), &mut status);
status.check_err(ret)
status.check_err(SlicePtr::Pinnable(ret))
}
#[inline]
pub fn del(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<()> {

@ -94,14 +94,14 @@ impl Engine {
Some(h) => h.clone()
};
return Session {
Session {
engine: self,
stack_depth: 0,
txn: TransactionPtr::null(),
perm_cf: SharedPtr::null(),
temp_cf: SharedPtr::null(),
handle,
};
}
}
}
@ -119,8 +119,9 @@ pub struct Session<'a> {
impl<'a> Session<'a> {
pub fn start(&mut self) {
self.perm_cf = self.engine.db.default_cf();
let name = self.handle.read().unwrap().cf_ident.to_string();
self.temp_cf = self.engine.db.get_cf(name).unwrap();
assert!(!self.perm_cf.is_null());
self.temp_cf = self.engine.db.get_cf(&self.handle.read().unwrap().cf_ident).unwrap();
assert!(!self.temp_cf.is_null());
let t_options = match self.engine.options_store.t_options {
TDBOptions::Pessimistic(_) => {
TransactOptions::Pessimistic(PTxnOptionsPtr::default())
@ -129,8 +130,10 @@ impl<'a> Session<'a> {
TransactOptions::Optimistic(OTxnOptionsPtr::new(&self.engine.options_store.cmp))
}
};
let r_opts = ReadOptionsPtr::default();
let rx_opts = ReadOptionsPtr::default();
let mut r_opts = ReadOptionsPtr::default();
r_opts.set_total_order_seek(true);
let mut rx_opts = ReadOptionsPtr::default();
rx_opts.set_total_order_seek(true);
let w_opts = WriteOptionsPtr::default();
let mut wx_opts = WriteOptionsPtr::default();
wx_opts.set_disable_wal(true);
@ -159,8 +162,35 @@ pub enum SessionStatus {
#[cfg(test)]
mod tests {
use std::{fs, thread};
use crate::db::eval::Environment;
use crate::relation::table::DataKind::Value;
use crate::relation::tuple::Tuple;
use super::*;
/// Round-trips a handful of tuples through the temp column family and then
/// dumps everything the iterator sees, both as raw bytes and as decoded tuples.
#[test]
fn push_get() {
    let db_path = "_push_get";
    {
        let engine = Engine::new(db_path.to_string(), false).unwrap();
        let mut session = engine.session();
        session.start();

        // Each tuple is stored under itself, i.e. key and value are identical.
        for n in (-80..-40).step_by(10) {
            let mut entry = Tuple::with_prefix(0);
            entry.push_int(n);
            entry.push_str("pqr");
            println!("in {:?} {:?}", entry, entry.data);
            session.txn.put(false, &session.temp_cf, &entry, &entry).unwrap();
            let fetched = session.txn.get(false, &session.temp_cf, &entry).unwrap();
            println!("out {:?}", fetched.as_ref());
        }

        let cursor = session.txn.iterator(false, &session.temp_cf);
        cursor.to_first();
        for (raw_key, raw_val) in cursor.iter() {
            println!("a: {:?} {:?}", raw_key.as_ref(), raw_val.as_ref());
            println!("v: {:?} {:?}", Tuple::new(raw_key), Tuple::new(raw_val));
        }
    }
    // Best-effort cleanup of the on-disk database; failure is ignored.
    let _ = fs::remove_dir_all(db_path);
}
#[test]
fn test_create() {
let p1 = "_test_db_create1";
@ -170,7 +200,7 @@ mod tests {
{
let engine = Engine::new(p1.to_string(), true);
assert!(engine.is_ok());
let engine = Engine::new(p2.to_string(), true);
let engine = Engine::new(p2.to_string(), false);
assert!(engine.is_ok());
let engine = Engine::new(p3.to_string(), true);
assert!(engine.is_ok());
@ -179,7 +209,7 @@ mod tests {
}
let engine2 = Engine::new(p2.to_string(), false);
assert!(engine2.is_ok());
let engine2 = Arc::new(Engine::new(p3.to_string(), false).unwrap());
let engine2 = Arc::new(Engine::new(p3.to_string(), true).unwrap());
{
for _i in 0..10 {
let mut _sess = engine2.session();
@ -196,12 +226,30 @@ mod tests {
let mut thread_handles = vec![];
println!("concurrent");
for i in 0..10 {
for i in 0..1 {
let engine = engine2.clone();
thread_handles.push(thread::spawn(move || {
println!("In thread {}", i);
let mut _sess = engine.session();
_sess.start();
let mut sess = engine.session();
sess.start();
for _ in 0..10000 {
sess.push_env();
sess.define_variable("abc", &"xyz".into(), true);
sess.define_variable("pqr", &"xyz".into(), false);
}
println!("pqr {:?}", sess.resolve("pqr"));
println!("uvw {:?}", sess.resolve("uvw"));
println!("aaa {:?}", sess.resolve("aaa"));
let it = sess.txn.iterator(false, &sess.temp_cf);
it.to_first();
// for (key, val) in it.iter() {
// println!("a: {:?} {:?}", key.as_ref(), val.as_ref());
// println!("v: {:?}", Tuple::new(key));
// }
for _ in 0..5000 {
sess.pop_env();
}
println!("pqr {:?}", sess.resolve("pqr"));
println!("In thread {} end", i);
}))
}

@ -1,31 +1,21 @@
use crate::db::engine::{Engine, Session};
use crate::relation::table::Table;
use crate::relation::tuple::Tuple;
use cozorocks::SlicePtr;
use crate::db::engine::{Session};
use crate::relation::table::{DataKind, Table};
use crate::relation::tuple::{Tuple};
use crate::relation::typing::Typing;
use crate::relation::value::Value;
pub trait Environment {
pub trait Environment<T: AsRef<[u8]>> {
fn push_env(&mut self);
fn pop_env(&mut self);
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool);
fn define_type_alias(&mut self, name: &str, typ: &Typing, in_root: bool);
fn define_table(&mut self, table: &Table, in_root: bool);
fn resolve(&mut self, name: &str);
fn resolve(&mut self, name: &str) -> Option<Tuple<T>>;
fn delete_defined(&mut self, name: &str, in_root: bool);
}
#[repr(u8)]
enum DefinableTag {
Value = 1,
Typing = 2,
Node = 3,
Edge = 4,
Associate = 5,
Index = 6,
}
impl<'a> Session<'a> {
fn encode_definable_key(&self, name: &str, in_root: bool) -> Tuple<Vec<u8>> {
let depth_code = if in_root { 0 } else { self.stack_depth as i64 };
@ -37,7 +27,7 @@ impl<'a> Session<'a> {
}
impl<'a> Environment for Session<'a> {
impl<'a> Environment<SlicePtr> for Session<'a> {
fn push_env(&mut self) {
self.stack_depth -= 1;
}
@ -47,17 +37,40 @@ impl<'a> Environment for Session<'a> {
return;
}
// Remove all stuff starting with the stack depth from the temp session
let mut prefix = Tuple::with_prefix(0);
prefix.push_int(self.stack_depth as i64);
let it = self.txn.iterator(false, &self.temp_cf);
it.seek(&prefix);
for val in it.keys() {
let cur = Tuple::new(val);
if cur.starts_with(&prefix) {
let name = cur.get(1).unwrap();
let mut ikey = Tuple::with_prefix(0);
ikey.push_value(&name);
ikey.push_int(self.stack_depth as i64);
self.txn.del(false, &self.temp_cf, cur).unwrap();
self.txn.del(false, &self.temp_cf, ikey).unwrap();
} else {
break;
}
}
self.stack_depth += 1;
}
fn define_variable(&mut self, name: &str, val: &Value, in_root: bool) {
let key = self.encode_definable_key(name, in_root);
let mut data = Tuple::with_prefix(DataKind::Value as u32);
data.push_value(val);
if in_root {
todo!()
self.txn.put(true, &self.perm_cf, key, data).unwrap();
} else {
let key = self.encode_definable_key(name, in_root);
let mut data = Tuple::with_prefix(0);
data.push_uint(DefinableTag::Value as u8 as u64);
data.push_value(val);
let mut ikey = Tuple::with_prefix(0);
ikey.push_int(self.stack_depth as i64);
ikey.push_str(name);
self.txn.put(false, &self.temp_cf, key, data).unwrap();
self.txn.put(false, &self.temp_cf, ikey, "").unwrap();
}
}
@ -69,8 +82,27 @@ impl<'a> Environment for Session<'a> {
todo!()
}
fn resolve(&mut self, name: &str) {
todo!()
fn resolve(&mut self, name: &str) -> Option<Tuple<SlicePtr>> {
let mut tuple = Tuple::with_prefix(0);
tuple.push_str(name);
let it = self.txn.iterator(false, &self.temp_cf);
it.seek(&tuple);
match it.pair() {
None => {
None
}
Some((tk, vk)) => {
let k = Tuple::new(tk);
if k.starts_with(&tuple) {
println!("Resolved to key {:?}", k);
let vt = Tuple::new(vk);
// let v = vt.iter().collect::<Vec<_>>();
Some(vt)
} else {
None
}
}
}
}
fn delete_defined(&mut self, name: &str, in_root: bool) {

@ -1,18 +1,30 @@
use std::cmp::Ordering;
use crate::relation::tuple::Tuple;
impl<T: AsRef<[u8]>, T2: AsRef<[u8]>> PartialOrd<Tuple<T2>> for Tuple<T> {
    /// Orders tuples by prefix first, then by their elements in iteration order.
    ///
    /// Always returns `Some`: the element comparison only runs when the two
    /// prefixes compare equal.
    fn partial_cmp(&self, other: &Tuple<T2>) -> Option<Ordering> {
        let by_prefix = self.get_prefix().cmp(&other.get_prefix());
        Some(by_prefix.then_with(|| self.iter().cmp(other.iter())))
    }
}
impl<T: AsRef<[u8]>> Ord for Tuple<T> {
    /// Total order over tuples backed by the same storage type.
    ///
    /// Delegates to the `PartialOrd` implementation for `Tuple`.
    fn cmp(&self, other: &Self) -> Ordering {
        let ordering: Option<Ordering> = PartialOrd::partial_cmp(self, other);
        // The `PartialOrd` impl for `Tuple` wraps every result in `Some`.
        ordering.unwrap()
    }
}
pub fn compare(a: &[u8], b: &[u8]) -> i8 {
let ta = Tuple::new(a);
let tb = Tuple::new(b);
match ta.get_prefix().cmp(&tb.get_prefix()) {
Ordering::Less => return -1,
Ordering::Greater => return 1,
Ordering::Equal => {}
}
match ta.iter().cmp(tb.iter()) {
match ta.cmp(&tb) {
Ordering::Less => -1,
Ordering::Equal => 0,
Ordering::Greater => 1
Ordering::Greater => 1,
Ordering::Equal => 0
}
}

@ -1,8 +1,9 @@
use crate::relation::typing::Typing;
#[repr(u8)]
#[repr(u32)]
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub enum DataKind {
DataTuple = 0,
Node = 1,
Edge = 2,
Associate = 3,

@ -1,11 +1,12 @@
use std::borrow::{Cow};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use uuid::Uuid;
use crate::relation::value::{Tag, Value};
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Tuple<T>
where T: AsRef<[u8]>
{
@ -22,6 +23,11 @@ impl<T> AsRef<[u8]> for Tuple<T> where T: AsRef<[u8]> {
const PREFIX_LEN: usize = 4;
impl<T: AsRef<[u8]>> Tuple<T> {
/// Reports whether this tuple's raw bytes begin with `other`'s raw bytes.
///
/// The check is a byte-wise prefix test on the underlying storage; no
/// values are decoded.
#[inline]
pub fn starts_with<T2: AsRef<[u8]>>(&self, other: &Tuple<T2>) -> bool {
    let haystack: &[u8] = self.data.as_ref();
    let needle: &[u8] = other.data.as_ref();
    haystack.starts_with(needle)
}
#[inline]
pub fn new(data: T) -> Self {
Self {
@ -66,7 +72,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
let data = self.data.as_ref();
let tag_start = *self.idx_cache.borrow().last().unwrap_or(&PREFIX_LEN);
let start = tag_start + 1;
let nxt = match Tag::from(data[tag_start]) {
let nxt = match Tag::try_from(data[tag_start]).unwrap() {
Tag::Null | Tag::BoolTrue | Tag::BoolFalse => start,
Tag::Int | Tag::UInt => start + self.parse_varint(start).1,
Tag::Float => start + 8,
@ -77,7 +83,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
start + slen + offset
}
Tag::List => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize,
Tag::Dict => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize
Tag::Dict => start + u32::from_be_bytes(data[start..start + 4].try_into().unwrap()) as usize,
};
self.idx_cache.borrow_mut().push(nxt);
}
@ -120,7 +126,11 @@ impl<T: AsRef<[u8]>> Tuple<T> {
fn parse_value_at(&self, pos: usize) -> (Value, usize) {
let data = self.data.as_ref();
let start = pos + 1;
let (nxt, val): (usize, Value) = match Tag::from(data[pos]) {
let tag = match Tag::try_from(data[pos]) {
Ok(t) => t,
Err(e) => panic!("Cannot parse tag {} for {:?}", e, data)
};
let (nxt, val): (usize, Value) = match tag {
Tag::Null => (start, ().into()),
Tag::BoolTrue => (start, true.into()),
Tag::BoolFalse => (start, false.into()),
@ -187,6 +197,12 @@ impl<T: AsRef<[u8]>> Tuple<T> {
}
}
impl<T: AsRef<[u8]>> Debug for Tuple<T> {
    /// Formats the tuple as a list of its decoded elements.
    ///
    /// Elements are streamed straight into the formatter, so no temporary
    /// `Vec` is allocated and the caller's formatting flags are honoured
    /// (for example `{:#?}` pretty-prints one element per line). Plain `{:?}`
    /// output has the same `[a, b, c]` shape as a `Vec`'s debug output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}
pub struct TupleIter<'a, T: AsRef<[u8]>> {
tuple: &'a Tuple<T>,
pos: usize,
@ -280,9 +296,7 @@ impl Tuple<Vec<u8>> {
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
for i in 0..4 {
self.data[start_pos + i] = length_bytes[i]
}
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
@ -299,9 +313,7 @@ impl Tuple<Vec<u8>> {
}
let length = (self.data.len() - start_pos) as u32;
let length_bytes = length.to_be_bytes();
for i in 0..4 {
self.data[start_pos + i] = length_bytes[i]
}
self.data[start_pos..(4 + start_pos)].clone_from_slice(&length_bytes[..4]);
let mut cache = self.idx_cache.borrow_mut();
cache.truncate(start_len);
cache.push(self.data.len());
@ -347,14 +359,14 @@ impl <'a> Extend<Value<'a>> for Tuple<Vec<u8>> {
}
}
impl<T: AsRef<[u8]>> PartialEq for Tuple<T> {
impl<T: AsRef<[u8]>, T2: AsRef<[u8]>> PartialEq<Tuple<T2>> for Tuple<T> {
#[inline]
fn eq(&self, other: &Self) -> bool {
fn eq(&self, other: &Tuple<T2>) -> bool {
self.data.as_ref() == other.data.as_ref()
}
}
impl <T: AsRef<[u8]>> Hash for Tuple<T> {
impl<T: AsRef<[u8]>> Hash for Tuple<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.data.as_ref().hash(state);
}
@ -479,4 +491,12 @@ mod tests {
println!("{:?}", v);
}
*/
/// Smoke test: builds a two-element tuple and prints both its decoded form
/// and its raw byte storage for manual inspection.
#[test]
fn particular() {
    let mut tuple = Tuple::with_prefix(0);
    tuple.push_str("pqr");
    tuple.push_int(-64);
    let raw = &tuple.data;
    println!("{:?} {:?}", tuple, raw);
}
}

@ -7,16 +7,16 @@ use uuid::Uuid;
#[repr(u8)]
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub enum Tag {
BoolFalse = 0,
BoolFalse = 1,
Null = 2,
BoolTrue = 4,
Int = 11,
Float = 13,
Text = 15,
Uuid = 17,
UInt = 21,
List = 101,
Dict = 103,
BoolTrue = 3,
Int = 4,
Float = 5,
Text = 6,
Uuid = 7,
UInt = 8,
List = 9,
Dict = 10,
// Timestamp = 23,
// Datetime = 25,
// Timezone = 27,
@ -44,23 +44,24 @@ pub enum Tag {
// C128Arr = 74,
}
impl From<u8> for Tag {
impl TryFrom<u8> for Tag {
type Error = u8;
#[inline]
fn from(u: u8) -> Self {
fn try_from(u: u8) -> std::result::Result<Tag, u8> {
use self::Tag::*;
match u {
0 => BoolFalse,
Ok(match u {
1 => BoolFalse,
2 => Null,
4 => BoolTrue,
11 => Int,
13 => Float,
15 => Text,
17 => Uuid,
21 => UInt,
101 => List,
103 => Dict,
_ => panic!("Unexpected value tag {}", u)
}
3 => BoolTrue,
4 => Int,
5 => Float,
6 => Text,
7 => Uuid,
8 => UInt,
9 => List,
10 => Dict,
v => return Err(v)
})
}
}
@ -93,7 +94,7 @@ impl<'a> Value<'a> {
Value::List(l) => l.into_iter().map(|v| v.to_static()).collect::<Vec<StaticValue>>().into(),
Value::Dict(d) => d.into_iter()
.map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static()))
.collect::<BTreeMap<Cow<'static, str>, StaticValue>>().into()
.collect::<BTreeMap<Cow<'static, str>, StaticValue>>().into(),
}
}
}

Loading…
Cancel
Save