associate iterators

main
Ziyang Hu 2 years ago
parent bfda892965
commit 06daecfe18

@ -3,7 +3,7 @@ use pest::iterators::{Pair, Pairs};
use cozorocks::SlicePtr;
use crate::db::engine::Session;
use crate::db::table::TableId;
use crate::relation::tuple::{OwnTuple, Tuple};
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
use crate::relation::typing::Typing;
use crate::parser::Rule;
use crate::error::{CozoError, Result};
@ -263,7 +263,7 @@ impl<'s> Session<'s> {
Ok(())
}
pub fn table_data(&self, id: i64, in_root: bool) -> Result<Option<Tuple<SlicePtr>>> {
pub fn table_data(&self, id: i64, in_root: bool) -> Result<Option<SliceTuple>> {
let mut key = Tuple::with_null_prefix();
key.push_bool(true);
key.push_int(id);
@ -276,7 +276,7 @@ impl<'s> Session<'s> {
}
}
pub fn resolve_related_tables(&self, name: &str) -> Result<Vec<(String, Tuple<SlicePtr>)>> {
pub fn resolve_related_tables(&self, name: &str) -> Result<Vec<(String, SliceTuple)>> {
let mut prefix = Tuple::with_prefix(0);
prefix.push_null();
prefix.push_str(name);

@ -10,7 +10,7 @@ use uuid::v1::{Context, Timestamp};
use rand::Rng;
use crate::error::{CozoError, Result};
use crate::error::CozoError::{Poisoned, SessionErr};
use crate::relation::tuple::Tuple;
use crate::relation::tuple::{PREFIX_LEN, Tuple};
pub struct EngineOptions {
cmp: RustComparatorPtr,
@ -49,7 +49,7 @@ impl Engine {
.set_create_if_missing(true)
.set_paranoid_checks(false)
.set_bloom_filter(10., true)
.set_fixed_prefix_extractor(4);
.set_fixed_prefix_extractor(PREFIX_LEN);
let mut rng = rand::thread_rng();
let uuid_ctx = Context::new(rng.gen());

@ -3,7 +3,7 @@ use crate::db::engine::Session;
use crate::relation::value::Value;
use crate::error::{CozoError, Result};
use crate::relation::data::DataKind;
use crate::relation::tuple::{OwnTuple, Tuple};
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
/// # layouts for sector 0
///
@ -173,7 +173,7 @@ impl<'s> Session<'s> {
Ok(())
}
pub fn resolve(&self, name: &str) -> Result<Option<Tuple<SlicePtr>>> {
pub fn resolve(&self, name: &str) -> Result<Option<SliceTuple>> {
let mut tuple = Tuple::with_null_prefix();
tuple.push_str(name);
let it = self.txn.iterator(false, &self.temp_cf);

@ -9,12 +9,12 @@ use crate::db::table::TableId;
use crate::relation::value::{Value};
use crate::error::{CozoError, Result};
use crate::error::CozoError::{InvalidArgument, LogicError};
use crate::relation::tuple::{Tuple};
use crate::relation::tuple::{SliceTuple, Tuple};
use crate::relation::value;
impl<'s> Session<'s> {
pub fn tuple_eval<'a>(&self, value: &'a Value<'a>, tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
pub fn tuple_eval<'a>(&self, value: &'a Value<'a>, tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let res: Value = match value {
v @ (Value::Null |
Value::Bool(_) |
@ -274,7 +274,7 @@ impl<'s> Session<'s> {
}
impl<'s> Session<'s> {
fn coalesce_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn coalesce_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
for v in args {
match self.tuple_eval(v, tuples)? {
Value::Null => {}
@ -318,7 +318,7 @@ impl<'s> Session<'s> {
}
}
fn str_cat_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn str_cat_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut ret = String::new();
for v in args {
let v = self.tuple_eval(v, tuples)?;
@ -348,7 +348,7 @@ impl<'s> Session<'s> {
})
}
fn add_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn add_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -387,7 +387,7 @@ impl<'s> Session<'s> {
})
}
fn sub_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn sub_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -426,7 +426,7 @@ impl<'s> Session<'s> {
})
}
fn minus_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn minus_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let left = self.tuple_eval(args.get(0).unwrap(), tuples)?;
Ok(match left {
Value::Int(l) => (-l).into(),
@ -452,7 +452,7 @@ impl<'s> Session<'s> {
})
}
fn negate_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn negate_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let left = self.tuple_eval(args.get(0).unwrap(), tuples)?;
Ok(match left {
Value::Bool(l) => (!l).into(),
@ -476,7 +476,7 @@ impl<'s> Session<'s> {
})
}
fn is_null_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn is_null_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let left = self.tuple_eval(args.get(0).unwrap(), tuples)?;
Ok((left == Value::Null).into())
}
@ -494,7 +494,7 @@ impl<'s> Session<'s> {
Ok((true, false.into()))
}
fn not_null_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn not_null_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let left = self.tuple_eval(args.get(0).unwrap(), tuples)?;
Ok((left != Value::Null).into())
}
@ -512,7 +512,7 @@ impl<'s> Session<'s> {
Ok((true, true.into()))
}
fn pow_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn pow_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -551,7 +551,7 @@ impl<'s> Session<'s> {
})
}
fn gt_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn gt_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -590,7 +590,7 @@ impl<'s> Session<'s> {
})
}
fn lt_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn lt_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -629,7 +629,7 @@ impl<'s> Session<'s> {
})
}
fn ge_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn ge_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -668,7 +668,7 @@ impl<'s> Session<'s> {
})
}
fn le_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn le_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -707,7 +707,7 @@ impl<'s> Session<'s> {
})
}
fn mod_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn mod_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -740,7 +740,7 @@ impl<'s> Session<'s> {
})
}
fn mul_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn mul_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -779,7 +779,7 @@ impl<'s> Session<'s> {
})
}
fn div_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn div_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -818,7 +818,7 @@ impl<'s> Session<'s> {
})
}
fn eq_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn eq_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -845,7 +845,7 @@ impl<'s> Session<'s> {
Ok((true, (left == right).into()))
}
fn ne_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn ne_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut args = args.into_iter();
let left = self.tuple_eval(args.next().unwrap(), tuples)?;
if left == Value::Null {
@ -872,7 +872,7 @@ impl<'s> Session<'s> {
Ok((true, (left != right).into()))
}
fn or_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn or_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut accum = -1;
for v in args.into_iter() {
let v = self.tuple_eval(v, tuples)?;
@ -957,7 +957,7 @@ impl<'s> Session<'s> {
}
}
fn concat_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn concat_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut coll = vec![];
for v in args.into_iter() {
let v = self.tuple_eval(v, tuples)?;
@ -1012,7 +1012,7 @@ impl<'s> Session<'s> {
}
}
fn merge_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn merge_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut coll = BTreeMap::new();
for v in args.into_iter() {
let v = self.tuple_eval(v, tuples)?;
@ -1067,7 +1067,7 @@ impl<'s> Session<'s> {
}
}
fn and_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(Tuple<SlicePtr>, Tuple<SlicePtr>)]) -> Result<Value<'a>> {
fn and_values<'a>(&self, args: &'a [Value<'a>], tuples: &'a [(SliceTuple, SliceTuple)]) -> Result<Value<'a>> {
let mut accum = 1;
for v in args.into_iter() {
let v = self.tuple_eval(v, tuples)?;

@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::{iter, mem};
use pest::iterators::Pair;
use cozorocks::{IteratorPtr, SlicePtr};
use crate::db::engine::Session;
@ -7,7 +8,7 @@ use crate::db::table::{ColId, TableId, TableInfo};
use crate::relation::value::{StaticValue, Value};
use crate::parser::Rule;
use crate::error::Result;
use crate::relation::tuple::{OwnTuple, Tuple};
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum QueryPlan {
@ -139,12 +140,7 @@ impl<'a> Session<'a> {
} else {
self.txn.iterator(false, &self.temp_cf)
};
let prefix = OwnTuple::with_prefix(tid.id as u32);
it.seek(prefix);
TableRowIterator {
it,
started: false,
}
TableRowIterator::new(it, tid.id as u32)
}
}
@ -153,8 +149,19 @@ pub struct TableRowIterator<'a> {
started: bool,
}
impl<'a> TableRowIterator<'a> {
pub fn new(it: IteratorPtr<'a>, prefix: u32) -> Self {
let prefix = OwnTuple::with_prefix(prefix);
it.seek(prefix);
Self {
it,
started: false,
}
}
}
impl<'a> Iterator for TableRowIterator<'a> {
type Item = (Tuple<SlicePtr>, Tuple<SlicePtr>);
type Item = (SliceTuple, SliceTuple);
fn next(&mut self) -> Option<Self::Item> {
if self.started {
@ -166,6 +173,61 @@ impl<'a> Iterator for TableRowIterator<'a> {
}
}
pub struct TableRowWithAssociatesIterator<'a> {
main: TableRowIterator<'a>,
associates: Vec<TableRowIterator<'a>>,
buffer: Vec<Option<(SliceTuple, SliceTuple)>>,
}
impl<'a> TableRowWithAssociatesIterator<'a> {
pub fn new(main: TableRowIterator<'a>, associates: Vec<TableRowIterator<'a>>) -> Self {
// ceremonial because type is not copy
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
Self { main, associates, buffer }
}
}
impl<'a> Iterator for TableRowWithAssociatesIterator<'a> {
type Item = (SliceTuple, SliceTuple, Vec<Option<SliceTuple>>);
fn next(&mut self) -> Option<Self::Item> {
match self.main.next() {
None => None,
Some((k, v)) => {
let mut assoc_vals: Vec<Option<SliceTuple>> = iter::repeat_with(|| None).take(self.associates.len()).collect();
let l = assoc_vals.len();
for i in 0..l {
let cached = self.buffer.get(i).unwrap();
let cached = if matches!(cached, None) {
self.buffer[i] = self.associates.get_mut(i).unwrap().next();
self.buffer.get(i).unwrap()
} else {
cached
};
if let Some((ck, _)) = cached {
if k.key_part_eq(ck) {
let (_, v) = mem::replace(&mut self.buffer[i], None).unwrap();
assoc_vals[i] = Some(v)
}
}
}
Some((k, v, assoc_vals))
}
}
}
}
pub struct CartesianProductIterator<'a> {
left: TableRowIterator<'a>,
right: TableRowIterator<'a>,
}
pub struct EquiJoinIterator<'a> {
left: TableRowIterator<'a>,
right: TableRowIterator<'a>,
outer_left: bool,
outer_right: bool,
}
pub struct NodeRowIterator {}
@ -179,6 +241,7 @@ mod tests {
use crate::db::engine::Engine;
use crate::parser::{Parser, Rule};
use pest::Parser as PestParser;
use crate::db::plan::TableRowWithAssociatesIterator;
use crate::db::query::FromEl;
use crate::db::table::TableId;
use crate::relation::value::Value;
@ -268,6 +331,18 @@ mod tests {
let duration = start.elapsed();
let duration2 = start2.elapsed();
println!("Time elapsed {:?} {:?}", duration, duration2);
let a = sess.iter_table(tbl);
let mut b = sess.iter_table(tbl);
let mut c = sess.iter_table(tbl);
for _ in 0..5 {
b.next();
c.next();
c.next();
}
let it = TableRowWithAssociatesIterator::new(a, vec![b, c]);
for el in it {
println!("{:?}", el);
}
}
drop(engine);
let _ = fs::remove_dir_all(db_path);

@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use uuid::Uuid;
use cozorocks::SlicePtr;
use crate::db::table::{ColId, TableId};
use crate::relation::data::DataKind;
use crate::relation::value::{Tag, Value};
@ -23,8 +24,9 @@ impl<T> AsRef<[u8]> for Tuple<T> where T: AsRef<[u8]> {
}
pub type OwnTuple = Tuple<Vec<u8>>;
pub type SliceTuple = Tuple<SlicePtr>;
const PREFIX_LEN: usize = 4;
pub const PREFIX_LEN: usize = 4;
impl<T: AsRef<[u8]>> Tuple<T> {
#[inline]
@ -32,6 +34,11 @@ impl<T: AsRef<[u8]>> Tuple<T> {
self.data.as_ref().starts_with(other.data.as_ref())
}
#[inline]
pub fn key_part_eq<T2: AsRef<[u8]>>(&self, other: &Tuple<T2>) -> bool {
self.data.as_ref()[PREFIX_LEN..] == other.data.as_ref()[PREFIX_LEN..]
}
#[inline]
pub fn new(data: T) -> Self {
Self {

Loading…
Cancel
Save