|
|
|
@ -60,7 +60,7 @@ impl InMemRelation {
|
|
|
|
|
arity,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn ensure_mem_db_for_epoch(&self, epoch: u32) {
|
|
|
|
|
pub(crate) fn ensure_mem_db_for_epoch(&self, epoch: u32) {
|
|
|
|
|
if self.epoch_size.load(Ordering::Relaxed) > epoch {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -84,10 +84,9 @@ impl InMemRelation {
|
|
|
|
|
aggrs: &mut [Option<(Aggregation, Vec<DataValue>)>],
|
|
|
|
|
epoch: u32,
|
|
|
|
|
) -> Result<bool> {
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let zero_map = mem_db.borrow().get(0).unwrap().clone();
|
|
|
|
|
let zero_maps = mem_db.borrow();
|
|
|
|
|
let zero_map = zero_maps.get(0).unwrap();
|
|
|
|
|
|
|
|
|
|
let zero_target: &RefCell<BTreeMap<_, _>> = zero_map.borrow();
|
|
|
|
|
let mut zero_target = zero_target.borrow_mut();
|
|
|
|
@ -116,7 +115,8 @@ impl InMemRelation {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if changed && epoch != 0 {
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let mut epoch_map = epoch_map.borrow_mut();
|
|
|
|
|
epoch_map.insert(key, prev_aggr.clone());
|
|
|
|
@ -138,7 +138,8 @@ impl InMemRelation {
|
|
|
|
|
);
|
|
|
|
|
zero_target.insert(key.clone(), tuple_to_store.clone());
|
|
|
|
|
if epoch != 0 {
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let mut epoch_map = epoch_map.borrow_mut();
|
|
|
|
|
epoch_map.insert(key, tuple_to_store);
|
|
|
|
@ -147,17 +148,17 @@ impl InMemRelation {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pub(crate) fn put(&self, tuple: Tuple, epoch: u32) {
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let mut epoch_map = epoch_map.borrow_mut();
|
|
|
|
|
epoch_map.insert(tuple, Tuple::default());
|
|
|
|
|
}
|
|
|
|
|
pub(crate) fn put_with_skip(&self, tuple: Tuple, should_skip: bool) {
|
|
|
|
|
self.ensure_mem_db_for_epoch(0);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(0).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(0).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let mut epoch_map = epoch_map.borrow_mut();
|
|
|
|
|
|
|
|
|
@ -168,9 +169,9 @@ impl InMemRelation {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> bool {
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let epoch_map = epoch_map.borrow();
|
|
|
|
|
|
|
|
|
@ -181,12 +182,13 @@ impl InMemRelation {
|
|
|
|
|
&'a self,
|
|
|
|
|
epoch: u32,
|
|
|
|
|
) -> impl Iterator<Item = Result<Tuple>> + 'a {
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let epoch_map = epoch_map.borrow();
|
|
|
|
|
|
|
|
|
|
// FIXME
|
|
|
|
|
epoch_map.clone().into_iter().map(|(k, v)| {
|
|
|
|
|
if v.0.is_empty() {
|
|
|
|
|
Ok(k)
|
|
|
|
@ -210,12 +212,13 @@ impl InMemRelation {
|
|
|
|
|
self.scan_all_for_epoch(0)
|
|
|
|
|
}
|
|
|
|
|
pub(crate) fn scan_early_returned<'a>(&'a self) -> impl Iterator<Item = Result<Tuple>> + 'a {
|
|
|
|
|
self.ensure_mem_db_for_epoch(0);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(0).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(0).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let epoch_map = epoch_map.borrow();
|
|
|
|
|
|
|
|
|
|
// FIXME
|
|
|
|
|
epoch_map.clone().into_iter().filter_map(|(k, v)| {
|
|
|
|
|
if v.0.is_empty() {
|
|
|
|
|
Some(Ok(k))
|
|
|
|
@ -248,9 +251,9 @@ impl InMemRelation {
|
|
|
|
|
let mut upper = prefix.0.clone();
|
|
|
|
|
upper.push(DataValue::Bot);
|
|
|
|
|
let upper = Tuple(upper);
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let epoch_map = epoch_map.borrow();
|
|
|
|
|
|
|
|
|
@ -284,14 +287,14 @@ impl InMemRelation {
|
|
|
|
|
upper: &[DataValue],
|
|
|
|
|
epoch: u32,
|
|
|
|
|
) -> impl Iterator<Item = Result<Tuple>> {
|
|
|
|
|
self.ensure_mem_db_for_epoch(epoch);
|
|
|
|
|
let mut prefix_bound = prefix.clone();
|
|
|
|
|
prefix_bound.0.extend_from_slice(lower);
|
|
|
|
|
let mut upper_bound = prefix.clone();
|
|
|
|
|
upper_bound.0.extend_from_slice(upper);
|
|
|
|
|
|
|
|
|
|
let mem_db: &RefCell<_> = self.mem_db.borrow();
|
|
|
|
|
let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone();
|
|
|
|
|
let epoch_maps = mem_db.borrow();
|
|
|
|
|
let epoch_map = epoch_maps.get(epoch as usize).unwrap();
|
|
|
|
|
let epoch_map: &RefCell<BTreeMap<_, _>> = epoch_map.borrow();
|
|
|
|
|
let epoch_map = epoch_map.borrow();
|
|
|
|
|
|
|
|
|
|