From c74f0a4810a24b2526348ffdf6db7e45a3b56006 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 13 May 2022 17:10:23 +0800 Subject: [PATCH] limiter --- src/db/iterator.rs | 34 ++++++++++++++++++++++++++++++++-- src/db/plan.rs | 26 +++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/db/iterator.rs b/src/db/iterator.rs index 86db454d..f050c068 100644 --- a/src/db/iterator.rs +++ b/src/db/iterator.rs @@ -22,6 +22,36 @@ pub enum NodeEdgeChainKind { Bidi, } +pub struct LimiterIterator<'a> { + pub(crate) source: Box> + 'a>, + pub(crate) limit: usize, + pub(crate) offset: usize, + pub(crate) current: usize, +} + +impl<'a> Iterator for LimiterIterator<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + match self.source.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(val)) => { + self.current += 1; + if self.current <= self.offset { + continue; + } else if self.current > self.limit + self.offset { + return None; + } else { + return Some(Ok(val)); + } + } + } + } + } +} + pub struct NodeToEdgeChainJoinIterator<'a> { // TODO associates, right_outer pub(crate) left: Box> + 'a>, @@ -998,7 +1028,7 @@ mod tests { val_typing: vec![], src_key_typing: vec![], dst_key_typing: vec![], - associates: vec![] + associates: vec![], }; let it = ExecPlan::KeySortedWithAssocItPlan { main: Box::new(sess.iter_node(tbl)), @@ -1007,7 +1037,7 @@ mod tests { (dummy_tinfo.clone(), sess.raw_iterator(true).into()), (dummy_tinfo.clone(), sess.raw_iterator(true).into()), ], - binding: None + binding: None, }; { for el in it.iter()? { diff --git a/src/db/plan.rs b/src/db/plan.rs index e78aaa6f..98a4cc60 100644 --- a/src/db/plan.rs +++ b/src/db/plan.rs @@ -13,7 +13,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::iter; -use crate::db::iterator::{BagsUnionIterator, CartesianProdIterator, EdgeIterator, EdgeKeyOnlyBwdIterator, EdgeToNodeChainJoinIterator, EvalIterator, FilterIterator, KeyedDifferenceIterator, KeyedUnionIterator, KeySortedWithAssocIterator, MergeJoinIterator, NodeEdgeChainKind, NodeIterator, NodeToEdgeChainJoinIterator, OuterMergeJoinIterator, OutputIterator}; +use crate::db::iterator::{BagsUnionIterator, CartesianProdIterator, EdgeIterator, EdgeKeyOnlyBwdIterator, EdgeToNodeChainJoinIterator, EvalIterator, FilterIterator, KeyedDifferenceIterator, KeyedUnionIterator, KeySortedWithAssocIterator, LimiterIterator, MergeJoinIterator, NodeEdgeChainKind, NodeIterator, NodeToEdgeChainJoinIterator, OuterMergeJoinIterator, OutputIterator}; use crate::relation::table::MegaTuple; @@ -195,6 +195,11 @@ pub enum ExecPlan<'a> { BagsUnionIt { bags: Vec>, }, + LimiterIt { + source: Box>, + offset: usize, + limit: usize, + }, } impl<'a> ExecPlan<'a> { @@ -240,6 +245,9 @@ impl<'a> ExecPlan<'a> { let (l1, l2) = left.tuple_widths(); (l1 + 1, l2 + 1 + right_associates.len()) } + ExecPlan::LimiterIt { source, .. } => { + source.tuple_widths() + } } } } @@ -450,6 +458,14 @@ impl<'a> ExecPlan<'a> { }), }), }, + ExecPlan::LimiterIt { source, limit, offset } => { + Ok(Box::new(LimiterIterator { + source: source.iter()?, + limit: *limit, + offset: *offset, + current: 0, + })) + } } } } @@ -700,6 +716,14 @@ impl<'a> Session<'a> { }; (plan, l_map) } + ExecPlan::LimiterIt { source, limit, offset } => { + let (source, amap) = self.do_reify_intermediate_plan(*source)?; + (ExecPlan::LimiterIt { + source: source.into(), + limit, + offset, + }, amap) + } }; Ok(res) }