cartesian product iterator

main
Ziyang Hu 2 years ago
parent 06daecfe18
commit 967c27f56a

@ -11,6 +11,8 @@ use crate::parser::text_identifier::build_name_in_def;
use crate::relation::data::DataKind;
use crate::relation::value::Value;
const STORAGE_ID_START: i64 = 10000;
impl<'s> Session<'s> {
pub fn encode_definable_key(&self, name: &str, in_root: bool) -> OwnTuple {
let depth_code = if in_root { 0 } else { self.get_stack_depth() as i64 };
@ -246,7 +248,7 @@ impl<'s> Session<'s> {
} else {
panic!("Unexpected value in storage id");
}
} else { 0 };
} else { STORAGE_ID_START };
let mut new_data = Tuple::with_null_prefix();
new_data.push_int(u + 1);
if in_root {

@ -2,7 +2,7 @@ use cozorocks::SlicePtr;
use crate::db::engine::Session;
use crate::relation::value::Value;
use crate::error::{CozoError, Result};
use crate::relation::data::DataKind;
use crate::relation::data::{DataKind, EMPTY_DATA};
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
/// # layouts for sector 0
@ -31,7 +31,7 @@ impl<'s> Session<'s> {
ikey.push_int(self.stack_depth as i64);
ikey.push_str(name);
self.txn.put(false, &self.temp_cf, key, data)?;
self.txn.put(false, &self.temp_cf, ikey, "")?;
self.txn.put(false, &self.temp_cf, ikey, EMPTY_DATA)?;
}
Ok(())
}
@ -50,7 +50,7 @@ impl<'s> Session<'s> {
if in_root {
match value {
None => {
self.txn.put(true, &self.perm_cf, key, "")?;
self.txn.put(true, &self.perm_cf, key, EMPTY_DATA)?;
}
Some(v) => {
self.txn.put(true, &self.perm_cf, key, &v)?;
@ -59,7 +59,7 @@ impl<'s> Session<'s> {
} else {
match value {
None => {
self.txn.put(false, &self.temp_cf, key, "")?;
self.txn.put(false, &self.temp_cf, key, EMPTY_DATA)?;
}
Some(v) => {
self.txn.put(false, &self.temp_cf, key, &v)?;

@ -1,5 +1,7 @@
use std::collections::BTreeMap;
use std::{iter, mem};
use std::vec::IntoIter;
use chrono::format::Item;
use pest::iterators::Pair;
use cozorocks::{IteratorPtr, SlicePtr};
use crate::db::engine::Session;
@ -8,7 +10,9 @@ use crate::db::table::{ColId, TableId, TableInfo};
use crate::relation::value::{StaticValue, Value};
use crate::parser::Rule;
use crate::error::Result;
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
use crate::relation::data::EMPTY_DATA;
use crate::relation::table::MegaTuple;
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, SliceTuple, Tuple};
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum QueryPlan {
@ -134,32 +138,47 @@ impl<'a> Session<'a> {
Ok(QueryPlan::Projection { arg: Box::new(plan), projection: select_data })
}
/// Opens a row source over the table identified by `tid`.
///
/// The storage iterator is requested from `perm_cf` (with flag `true`) when
/// `tid.in_root` is set, and from `temp_cf` (with flag `false`) otherwise.
/// `tid.id`, cast to `u32`, is passed on as the key prefix of the table.
// NOTE(review): `as u32` truncates silently if `tid.id` can exceed `u32::MAX`
// — confirm the id range against `TableId`.
pub fn iter_table(&self, tid: TableId) -> TableRowIterator {
pub fn iter_table(&self, tid: TableId) -> TableRowIterable {
let it = if tid.in_root {
self.txn.iterator(true, &self.perm_cf)
} else {
self.txn.iterator(false, &self.temp_cf)
};
TableRowIterator::new(it, tid.id as u32)
TableRowIterable::new(it, tid.id as u32)
}
}
/// Row source for a single table: a storage iterator bundled with the numeric
/// key prefix it is seeked to whenever iteration starts.
pub struct TableRowIterator<'a> {
pub struct TableRowIterable<'a> {
// Storage iterator handed in by the constructor.
it: IteratorPtr<'a>,
started: bool,
// Numeric prefix from which the seek key is built.
prefix: u32,
}
// Construction of the table row source.
impl<'a> TableRowIterator<'a> {
impl<'a> TableRowIterable<'a> {
/// Builds the row source from a raw storage iterator `it` and the numeric
/// table `prefix`.
pub fn new(it: IteratorPtr<'a>, prefix: u32) -> Self {
let prefix = OwnTuple::with_prefix(prefix);
it.seek(prefix);
Self {
it,
started: false,
prefix,
}
}
}
impl<'a> IntoIterator for &'a TableRowIterable<'a> {
type Item = (SliceTuple, SliceTuple);
type IntoIter = TableRowIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
let prefix_tuple = OwnTuple::with_prefix(self.prefix);
self.it.seek(prefix_tuple);
Self::IntoIter { it: &self.it, started: false }
}
}
/// Cursor over the rows of one table, yielding `(SliceTuple, SliceTuple)`
/// pairs. It borrows the storage iterator rather than owning it.
pub struct TableRowIterator<'a> {
// Borrowed storage iterator; the owner outlives this cursor.
it: &'a IteratorPtr<'a>,
// Created as `false`; presumably flipped by `next` on its first call — the
// `next` body is not visible here, TODO confirm.
started: bool,
}
impl<'a> Iterator for TableRowIterator<'a> {
type Item = (SliceTuple, SliceTuple);
@ -173,28 +192,45 @@ impl<'a> Iterator for TableRowIterator<'a> {
}
}
/// Row source combining one main table with any number of associate tables.
pub struct TableRowWithAssociatesIterable<'a> {
// Table that drives the iteration.
main: TableRowIterable<'a>,
// Associate tables; one cursor is opened per entry when iteration starts.
associates: Vec<TableRowIterable<'a>>,
}
impl<'a> TableRowWithAssociatesIterable<'a> {
pub fn new(main: TableRowIterable<'a>, associates: Vec<TableRowIterable<'a>>) -> Self {
Self { main, associates }
}
}
/// Cursor that walks a main table together with cursors over its associates.
pub struct TableRowWithAssociatesIterator<'a> {
// Cursor over the main table; its rows drive `next`.
main: TableRowIterator<'a>,
// One cursor per associate table.
associates: Vec<TableRowIterator<'a>>,
// One slot per associate, holding a (key, value) pair read from that
// associate and not yet consumed; `None` when the slot is empty.
buffer: Vec<Option<(SliceTuple, SliceTuple)>>,
}
impl<'a> TableRowWithAssociatesIterator<'a> {
pub fn new(main: TableRowIterator<'a>, associates: Vec<TableRowIterator<'a>>) -> Self {
// ceremonial because type is not copy
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
Self { main, associates, buffer }
impl<'a> IntoIterator for &'a TableRowWithAssociatesIterable<'a> {
type Item = MegaTuple;
type IntoIter = TableRowWithAssociatesIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
Self::IntoIter {
main: (&self.main).into_iter(),
associates: self.associates.iter().map(|v| v.into_iter()).collect(),
buffer: iter::repeat_with(|| None).take(self.associates.len()).collect(),
}
}
}
impl<'a> Iterator for TableRowWithAssociatesIterator<'a> {
type Item = (SliceTuple, SliceTuple, Vec<Option<SliceTuple>>);
type Item = MegaTuple;
fn next(&mut self) -> Option<Self::Item> {
match self.main.next() {
None => None,
Some((k, v)) => {
let mut assoc_vals: Vec<Option<SliceTuple>> = iter::repeat_with(|| None).take(self.associates.len()).collect();
let l = self.associates.len();
let mut assoc_vals: Vec<Option<CowTuple>> = iter::repeat_with(|| None).take(l).collect();
let l = assoc_vals.len();
for i in 0..l {
let cached = self.buffer.get(i).unwrap();
@ -207,31 +243,93 @@ impl<'a> Iterator for TableRowWithAssociatesIterator<'a> {
if let Some((ck, _)) = cached {
if k.key_part_eq(ck) {
let (_, v) = mem::replace(&mut self.buffer[i], None).unwrap();
assoc_vals[i] = Some(v)
assoc_vals[i] = Some(v.into())
}
}
}
Some((k, v, assoc_vals))
let mut vals: Vec<CowTuple> = Vec::with_capacity(assoc_vals.len());
vals.push(v.into());
vals.extend(assoc_vals.into_iter().map(|v|
match v {
None => {
CowTuple::new(CowSlice::Own(EMPTY_DATA.into()))
}
Some(v) => v
}));
Some(MegaTuple {
keys: vec![k.into()],
vals,
})
}
}
}
}
pub struct CartesianProductIterator<'a> {
left: TableRowIterator<'a>,
right: TableRowIterator<'a>,
/// Row source producing every combination of a row from `left` with a row
/// from `right`. Both sides are arbitrary sources whose shared references can
/// be iterated to yield `MegaTuple`s.
pub struct CartesianProductIterable<A, B> {
// Outer side: advanced once each time the right side has been exhausted.
left: A,
// Inner side: re-iterated from the start for every left row.
right: B,
}
pub struct EquiJoinIterator<'a> {
left: TableRowIterator<'a>,
right: TableRowIterator<'a>,
outer_left: bool,
outer_right: bool,
impl<'a, A, B, AI, BI> IntoIterator for &'a CartesianProductIterable<A, B>
where &'a A: IntoIterator<Item=MegaTuple, IntoIter=AI>,
&'a B: IntoIterator<Item=MegaTuple, IntoIter=BI>,
AI: Iterator<Item=MegaTuple>,
BI: Iterator<Item=MegaTuple> {
type Item = MegaTuple;
type IntoIter = CartesianProductIterator<'a, A, B, AI, BI>;
fn into_iter(self) -> Self::IntoIter {
let mut left = (&self.left).into_iter();
let left_cache = left.next();
Self::IntoIter {
left_source: &self.left,
right_source: &self.right,
left,
right: (&self.right).into_iter(),
left_cache,
}
}
}
/// Cursor over a cartesian product of two `MegaTuple` sources.
///
/// `A`/`B` are the source types and `AI`/`BI` the iterator types obtained by
/// iterating `&A`/`&B`.
pub struct CartesianProductIterator<'a, A, B, AI, BI>
where &'a A: IntoIterator<Item=MegaTuple, IntoIter=AI>, &'a B: IntoIterator<Item=MegaTuple, IntoIter=BI> {
// Borrowed left source; stored at construction.
left_source: &'a A,
// Borrowed right source; used to restart the right side for each left row.
right_source: &'a B,
// Live cursor over the left side.
left: AI,
// Live cursor over the right side for the current left row.
right: BI,
// Left row currently being combined; `None` once the left side is exhausted.
left_cache: Option<MegaTuple>,
}
pub struct NodeRowIterator {}
/// Empty placeholder type; it has no fields and no behaviour in this view.
pub struct EdgeRowIterator {}

/// Yields one combined row for every (left row, right row) pair, in
/// left-major order.
impl<'a, A, B, AI, BI> Iterator for CartesianProductIterator<'a, A, B, AI, BI>
where
    &'a A: IntoIterator<Item = MegaTuple, IntoIter = AI>,
    &'a B: IntoIterator<Item = MegaTuple, IntoIter = BI>,
    AI: Iterator<Item = MegaTuple>,
    BI: Iterator<Item = MegaTuple>,
{
    type Item = MegaTuple;

    /// Returns the next combined row, or `None` once the left side is exhausted.
    ///
    /// The result's `keys` and `vals` are the cached left row's entries
    /// followed by the right row's entries.
    fn next(&mut self) -> Option<Self::Item> {
        // A loop rather than self-recursion: with an empty right side the
        // cursor steps through every left row before returning, and recursing
        // once per left row could exhaust the stack on large inputs.
        loop {
            // No cached left row means the product is finished.
            let left_row = self.left_cache.as_ref()?;
            if let Some(right_row) = self.right.next() {
                let mut keys = left_row.keys.clone();
                keys.extend(right_row.keys);
                let mut vals = left_row.vals.clone();
                vals.extend(right_row.vals);
                return Some(MegaTuple { keys, vals });
            }
            // Right side exhausted for this left row: advance left and restart
            // the right side from its source.
            self.left_cache = self.left.next();
            self.right = self.right_source.into_iter();
        }
    }
}
#[cfg(test)]
mod tests {
@ -241,7 +339,7 @@ mod tests {
use crate::db::engine::Engine;
use crate::parser::{Parser, Rule};
use pest::Parser as PestParser;
use crate::db::plan::TableRowWithAssociatesIterator;
use crate::db::plan::{CartesianProductIterable, TableRowWithAssociatesIterable, TableRowWithAssociatesIterator};
use crate::db::query::FromEl;
use crate::db::table::TableId;
use crate::relation::value::Value;
@ -314,7 +412,7 @@ mod tests {
println!("{:?}", rel_tbls);
let tbl = rel_tbls.pop().unwrap();
for (k, v) in sess.iter_table(tbl) {
for (k, v) in &sess.iter_table(tbl) {
let tpair = [(k, v)];
match sess.tuple_eval(&where_vals, &tpair).unwrap() {
Value::Bool(true) => {
@ -332,17 +430,40 @@ mod tests {
let duration2 = start2.elapsed();
println!("Time elapsed {:?} {:?}", duration, duration2);
let a = sess.iter_table(tbl);
let mut b = sess.iter_table(tbl);
let mut c = sess.iter_table(tbl);
for _ in 0..5 {
b.next();
c.next();
c.next();
let b = sess.iter_table(tbl);
let c = sess.iter_table(tbl);
let it = TableRowWithAssociatesIterable::new(a, vec![b, c]);
{
for el in &it {
println!("{:?}", el);
}
}
let it = TableRowWithAssociatesIterator::new(a, vec![b, c]);
for el in it {
println!("{:?}", el);
println!("XXXXX");
{
for el in &it {
println!("{:?}", el);
}
}
let a = sess.iter_table(tbl);
let a = TableRowWithAssociatesIterable::new(a, vec![]);
let b = sess.iter_table(tbl);
let b = TableRowWithAssociatesIterable::new(b, vec![]);
let c_it = CartesianProductIterable {left: a, right: b};
let c = sess.iter_table(tbl);
let c = TableRowWithAssociatesIterable::new(c, vec![]);
let c_it = CartesianProductIterable {left: c, right: c_it};
let start = Instant::now();
println!("Now cartesian product");
for (i, el) in (&c_it).into_iter().enumerate() {
if i % 1024 == 0 {
println!("{}: {:?}", i, el)
}
}
let duration = start.elapsed();
println!("Time elapsed {:?}", duration);
}
drop(engine);
let _ = fs::remove_dir_all(db_path);

@ -13,10 +13,12 @@ pub enum DataKind {
Index = 4,
Val = 5,
Type = 6,
Empty = u32::MAX,
}
// In storage, key layout is `[0, name, stack_depth]` where stack_depth is a non-positive number as zigzag
// Also has inverted index `[0, stack_depth, name]` for easy popping of stacks
/// Big-endian bytes of `u32::MAX`, the discriminant of `DataKind::Empty`.
pub const EMPTY_DATA: [u8; 4] = u32::MAX.to_be_bytes();
impl<T: AsRef<[u8]>> Tuple<T> {
pub fn data_kind(&self) -> Result<DataKind> {
@ -29,6 +31,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
4 => Index,
5 => Val,
6 => Type,
u32::MAX => Empty,
v => return Err(CozoError::UndefinedDataKind(v))
})
}

@ -1,3 +1,4 @@
use crate::relation::tuple::{CowSlice, CowTuple};
use crate::relation::typing::Typing;
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
@ -38,3 +39,9 @@ pub enum Table {
stored: StoredRelation,
},
}
/// A row made of several tuples: a list of key tuples and a list of value
/// tuples.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MegaTuple {
// Key tuples of the row.
pub keys: Vec<CowTuple>,
// Value tuples of the row.
pub vals: Vec<CowTuple>
}

@ -23,6 +23,63 @@ impl<T> AsRef<[u8]> for Tuple<T> where T: AsRef<[u8]> {
}
}
/// Byte buffer that is either a `SlicePtr` or an owned `Vec<u8>`.
pub enum CowSlice {
// Bytes held through a `SlicePtr`.
Ptr(SlicePtr),
// Bytes held in an owned vector.
Own(Vec<u8>),
}
impl From<SlicePtr> for CowSlice {
fn from(p: SlicePtr) -> Self {
CowSlice::Ptr(p)
}
}
impl From<Vec<u8>> for CowSlice {
fn from(v: Vec<u8>) -> Self {
CowSlice::Own(v)
}
}
impl Clone for CowSlice {
fn clone(&self) -> Self {
match self {
CowSlice::Ptr(p) => { CowSlice::Own(p.as_ref().to_vec()) }
CowSlice::Own(o) => { CowSlice::Own(o.clone()) }
}
}
}
impl AsRef<[u8]> for CowSlice {
fn as_ref(&self) -> &[u8] {
match self {
CowSlice::Ptr(s) => s.as_ref(),
CowSlice::Own(o) => o.as_ref()
}
}
}
impl From<SliceTuple> for CowTuple {
    /// Re-wraps the tuple's slice pointer as `CowSlice::Ptr`; the tuple is
    /// rebuilt through `new` around that data.
    fn from(s: SliceTuple) -> Self {
        let ptr = s.data;
        Self::new(CowSlice::Ptr(ptr))
    }
}
impl From<OwnTuple> for CowTuple {
    /// Re-wraps the tuple's owned bytes as `CowSlice::Own`; the tuple is
    /// rebuilt through `new` around that data.
    fn from(o: OwnTuple) -> Self {
        let bytes = o.data;
        Self::new(CowSlice::Own(bytes))
    }
}
impl CowTuple {
    /// Consumes the tuple and returns one backed by an owned byte vector.
    ///
    /// An `Own` buffer is moved across as-is; a `Ptr` buffer has its bytes
    /// copied into a new vector.
    pub fn to_owned(self) -> OwnTuple {
        let bytes = match self.data {
            CowSlice::Own(buf) => buf,
            CowSlice::Ptr(ptr) => ptr.as_ref().to_vec(),
        };
        OwnTuple::new(bytes)
    }
}
/// Tuple whose bytes are a `CowSlice` (either a `SlicePtr` or an owned vector).
pub type CowTuple = Tuple<CowSlice>;
/// Tuple whose bytes live in an owned `Vec<u8>`.
pub type OwnTuple = Tuple<Vec<u8>>;
/// Tuple whose bytes are held through a `SlicePtr`.
pub type SliceTuple = Tuple<SlicePtr>;
@ -359,12 +416,9 @@ impl<T: AsRef<[u8]>> Tuple<T> {
impl<T: AsRef<[u8]>> Debug for Tuple<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.data.as_ref().is_empty() {
return write!(f, "Empty");
}
match self.data_kind() {
Ok(data_kind) => {
write!(f, "Tuple<{}:{:?}>{{", self.get_prefix(), data_kind)?;
write!(f, "Tuple<{:?}>{{", data_kind)?;
}
Err(_) => {
write!(f, "Tuple<{}>{{", self.get_prefix())?;

Loading…
Cancel
Save