temp store

main
Ziyang Hu 2 years ago
parent 4ef4404d87
commit 0f84dfca8d

@ -26,13 +26,23 @@ use crate::data::value::DataValue;
type Store = BTreeMap<Tuple, Tuple>; type Store = BTreeMap<Tuple, Tuple>;
#[derive(Default)] #[derive(Default)]
struct NormalStore { pub(crate) struct NormalTempStore {
inner: BTreeMap<Tuple, bool>, inner: BTreeMap<Tuple, bool>,
} }
const EMPTY_TUPLE_REF: &Tuple = &vec![]; const EMPTY_TUPLE_REF: &Tuple = &vec![];
impl NormalStore { impl NormalTempStore {
pub(crate) fn wrap(self) -> TempStore {
TempStore::Normal(self)
}
pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool {
// if let Some(old_store) = old {
// if old_store.
// }
todo!()
}
fn range_iter( fn range_iter(
&self, &self,
lower: &Tuple, lower: &Tuple,
@ -50,11 +60,11 @@ impl NormalStore {
.map(|(t, skip)| TupleInIter(t, EMPTY_TUPLE_REF, *skip)) .map(|(t, skip)| TupleInIter(t, EMPTY_TUPLE_REF, *skip))
} }
// must check prev_store for existence before putting here! // must check prev_store for existence before putting here!
fn put(&mut self, tuple: Tuple) { pub(crate) fn put(&mut self, tuple: Tuple) {
self.inner.insert(tuple, false); self.inner.insert(tuple, false);
} }
// must check prev_store for existence before putting here! // must check prev_store for existence before putting here!
fn put_with_skip(&mut self, tuple: Tuple) { pub(crate) fn put_with_skip(&mut self, tuple: Tuple) {
self.inner.insert(tuple, true); self.inner.insert(tuple, true);
} }
fn merge(&mut self, mut other: Self) { fn merge(&mut self, mut other: Self) {
@ -68,17 +78,28 @@ impl NormalStore {
} }
} }
struct MeetAggrStore { pub(crate) struct MeetAggrStore {
inner: BTreeMap<Tuple, Tuple>, inner: BTreeMap<Tuple, Tuple>,
aggregations: Vec<(Aggregation, Vec<DataValue>)>, aggregations: Vec<(Aggregation, Vec<DataValue>)>,
grouping_len: usize, grouping_len: usize,
name: Symbol,
} }
// optimization: MeetAggrStore can be used to simulate functional dependency // optimization: MeetAggrStore can be used to simulate functional dependency
impl MeetAggrStore { impl MeetAggrStore {
fn new(name: Symbol, aggrs: Vec<Option<(Aggregation, Vec<DataValue>)>>) -> Result<Self> { pub(crate) fn wrap(self) -> TempStore {
TempStore::MeetAggr(self)
}
pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool {
// if let Some(old_store) = old {
// if old_store.
// }
todo!()
}
pub(crate) fn is_empty(&self, old: Option<&EpochStore>) -> bool {
todo!()
}
pub(crate) fn new(aggrs: Vec<Option<(Aggregation, Vec<DataValue>)>>) -> Result<Self> {
let total_key_len = aggrs.len(); let total_key_len = aggrs.len();
let mut aggregations = aggrs.into_iter().flatten().collect_vec(); let mut aggregations = aggrs.into_iter().flatten().collect_vec();
for (aggr, args) in aggregations.iter_mut() { for (aggr, args) in aggregations.iter_mut() {
@ -89,7 +110,6 @@ impl MeetAggrStore {
inner: Default::default(), inner: Default::default(),
aggregations, aggregations,
grouping_len, grouping_len,
name,
}) })
} }
fn merge(&mut self, mut other: Self) -> Result<()> { fn merge(&mut self, mut other: Self) -> Result<()> {
@ -117,7 +137,7 @@ impl MeetAggrStore {
} }
// also need to check if value exists beforehand! use the idempotency! // also need to check if value exists beforehand! use the idempotency!
// need to think this through more carefully. // need to think this through more carefully.
fn put(&mut self, tuple: Tuple) -> Result<bool> { pub(crate) fn meet_put(&mut self, tuple: Tuple) -> Result<bool> {
let (key_part, val_part) = tuple.split_at(self.grouping_len); let (key_part, val_part) = tuple.split_at(self.grouping_len);
match self.inner.get_mut(key_part) { match self.inner.get_mut(key_part) {
Some(prev_aggr) => { Some(prev_aggr) => {
@ -175,12 +195,16 @@ impl MeetAggrStore {
} }
} }
enum TempStore { pub(crate) enum TempStore {
Normal(NormalStore), Normal(NormalTempStore),
MeetAggr(MeetAggrStore), MeetAggr(MeetAggrStore),
} }
impl TempStore { impl TempStore {
// TODO
// pub(crate) fn new() -> Self {
// Self::Normal(NormalTempStore::default())
// }
fn merge(&mut self, other: Self) -> Result<()> { fn merge(&mut self, other: Self) -> Result<()> {
match (self, other) { match (self, other) {
(TempStore::Normal(s), TempStore::Normal(o)) => { (TempStore::Normal(s), TempStore::Normal(o)) => {
@ -210,17 +234,17 @@ impl TempStore {
} }
} }
struct EpochStore { pub(crate) struct EpochStore {
prev: TempStore, prev: TempStore,
delta: TempStore, delta: TempStore,
} }
impl EpochStore { impl EpochStore {
fn merge(&mut self, mut new: TempStore) -> Result<()> { pub(crate) fn merge(&mut self, mut new: TempStore) -> Result<()> {
mem::swap(&mut new, &mut self.delta); mem::swap(&mut new, &mut self.delta);
self.prev.merge(new) self.prev.merge(new)
} }
fn range_iter( pub(crate) fn range_iter(
&self, &self,
lower: &Tuple, lower: &Tuple,
upper: &Tuple, upper: &Tuple,
@ -230,7 +254,7 @@ impl EpochStore {
.range_iter(lower, upper, upper_inclusive) .range_iter(lower, upper, upper_inclusive)
.merge(self.prev.range_iter(lower, upper, upper_inclusive)) .merge(self.prev.range_iter(lower, upper, upper_inclusive))
} }
fn delta_range_iter( pub(crate) fn delta_range_iter(
&self, &self,
lower: &Tuple, lower: &Tuple,
upper: &Tuple, upper: &Tuple,
@ -238,36 +262,39 @@ impl EpochStore {
) -> impl Iterator<Item = TupleInIter<'_>> { ) -> impl Iterator<Item = TupleInIter<'_>> {
self.delta.range_iter(lower, upper, upper_inclusive) self.delta.range_iter(lower, upper, upper_inclusive)
} }
fn prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> { pub(crate) fn prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> {
let mut upper = prefix.to_vec(); let mut upper = prefix.to_vec();
upper.push(DataValue::Bot); upper.push(DataValue::Bot);
self.range_iter(prefix, &upper, true) self.range_iter(prefix, &upper, true)
} }
fn delta_prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> { pub(crate) fn delta_prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> {
let mut upper = prefix.to_vec(); let mut upper = prefix.to_vec();
upper.push(DataValue::Bot); upper.push(DataValue::Bot);
self.delta_range_iter(prefix, &upper, true) self.delta_range_iter(prefix, &upper, true)
} }
fn all_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> { pub(crate) fn all_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> {
self.prefix_iter(&vec![]) self.prefix_iter(&vec![])
} }
fn delta_all_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> { pub(crate) fn delta_all_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> {
self.delta_prefix_iter(&vec![]) self.delta_prefix_iter(&vec![])
} }
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
struct TupleInIter<'a>(&'a Tuple, &'a Tuple, bool); pub(crate) struct TupleInIter<'a>(&'a Tuple, &'a Tuple, bool);
impl<'a> TupleInIter<'a> { impl<'a> TupleInIter<'a> {
fn get(self, idx: usize) -> &'a DataValue { pub(crate) fn get(self, idx: usize) -> &'a DataValue {
self.0 self.0
.get(idx) .get(idx)
.unwrap_or_else(|| self.1.get(idx - self.0.len()).unwrap()) .unwrap_or_else(|| self.1.get(idx - self.0.len()).unwrap())
} }
fn should_skip(&self) -> bool { pub(crate) fn should_skip(&self) -> bool {
self.2 self.2
} }
pub(crate) fn into_tuple(self) -> Tuple {
self.into_iter().cloned().collect_vec()
}
} }
impl<'a> IntoIterator for TupleInIter<'a> { impl<'a> IntoIterator for TupleInIter<'a> {

Loading…
Cancel
Save