diff --git a/cozo-core/benches/pokec.rs b/cozo-core/benches/pokec.rs index 48922f4a..0ba14bcc 100644 --- a/cozo-core/benches/pokec.rs +++ b/cozo-core/benches/pokec.rs @@ -195,12 +195,8 @@ const READ_QUERIES: [QueryFn; 1] = [single_vertex]; const WRITE_QUERIES: [QueryFn; 2] = [single_edge_write, single_vertex_write]; const UPDATE_QUERIES: [QueryFn; 1] = [single_vertex_update]; #[allow(dead_code)] -const AGGREGATE_QUERIES: [QueryFn; 4] = [ - aggregation, - aggregation_filter, - aggregation_count, - min_max, -]; +const AGGREGATE_QUERIES: [QueryFn; 4] = + [aggregation, aggregation_filter, aggregation_count, min_max]; const ANALYTICAL_QUERIES: [QueryFn; 15] = [ expansion_1, expansion_2, @@ -294,7 +290,10 @@ fn aggregation() { fn aggregation_count() { TEST_DB - .run_script("?[count(uid), count(age)] := *user{uid, age}", Default::default()) + .run_script( + "?[count(uid), count(age)] := *user{uid, age}", + Default::default(), + ) .unwrap(); } @@ -1017,4 +1016,10 @@ fn mixed(_: &mut Bencher) { wrap(mixed_pct, single_vertex); }); dbg!((count as f64) / single_vertex_time.elapsed().as_secs_f64()); + + let single_vertex_update_time = Instant::now(); + (0..count).into_par_iter().for_each(|_| { + wrap(mixed_pct, single_vertex_update); + }); + dbg!((count as f64) / single_vertex_update_time.elapsed().as_secs_f64()); } diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index 73dc3987..0527c34d 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -118,6 +118,10 @@ impl<'a> SessionTx<'a> { } } } else { + for store in stores.values() { + store.ensure_mem_db_for_epoch(epoch); + } + mem::swap(&mut changed, &mut prev_changed); for (_k, v) in changed.iter_mut() { *v = false; diff --git a/cozo-core/src/runtime/in_mem.rs b/cozo-core/src/runtime/in_mem.rs index 4326fae4..5405953c 100644 --- a/cozo-core/src/runtime/in_mem.rs +++ b/cozo-core/src/runtime/in_mem.rs @@ -60,7 +60,7 @@ impl InMemRelation { arity, } } - fn ensure_mem_db_for_epoch(&self, epoch: u32) { + pub(crate) fn ensure_mem_db_for_epoch(&self, epoch: u32) { if self.epoch_size.load(Ordering::Relaxed) > epoch { return; } @@ -84,10 +84,9 @@ impl InMemRelation { aggrs: &mut [Option<(Aggregation, Vec)>], epoch: u32, ) -> Result { - self.ensure_mem_db_for_epoch(epoch); - let mem_db: &RefCell<_> = self.mem_db.borrow(); - let zero_map = mem_db.borrow().get(0).unwrap().clone(); + let zero_maps = mem_db.borrow(); + let zero_map = zero_maps.get(0).unwrap(); let zero_target: &RefCell> = zero_map.borrow(); let mut zero_target = zero_target.borrow_mut(); @@ -116,7 +115,8 @@ impl InMemRelation { } } if changed && epoch != 0 { - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let mut epoch_map = epoch_map.borrow_mut(); epoch_map.insert(key, prev_aggr.clone()); @@ -138,7 +138,8 @@ impl InMemRelation { ); zero_target.insert(key.clone(), tuple_to_store.clone()); if epoch != 0 { - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let mut epoch_map = epoch_map.borrow_mut(); epoch_map.insert(key, tuple_to_store); @@ -147,17 +148,17 @@ impl InMemRelation { } } pub(crate) fn put(&self, tuple: Tuple, epoch: u32) { - self.ensure_mem_db_for_epoch(epoch); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let mut epoch_map = epoch_map.borrow_mut(); epoch_map.insert(tuple, Tuple::default()); } pub(crate) fn put_with_skip(&self, tuple: Tuple, should_skip: bool) { - self.ensure_mem_db_for_epoch(0); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(0).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(0).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let mut epoch_map = epoch_map.borrow_mut(); @@ -168,9 +169,9 @@ impl InMemRelation { } } pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> bool { - self.ensure_mem_db_for_epoch(epoch); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let epoch_map = epoch_map.borrow(); @@ -181,12 +182,13 @@ impl InMemRelation { &'a self, epoch: u32, ) -> impl Iterator> + 'a { - self.ensure_mem_db_for_epoch(epoch); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let epoch_map = epoch_map.borrow(); + // FIXME epoch_map.clone().into_iter().map(|(k, v)| { if v.0.is_empty() { Ok(k) @@ -210,12 +212,13 @@ impl InMemRelation { self.scan_all_for_epoch(0) } pub(crate) fn scan_early_returned<'a>(&'a self) -> impl Iterator> + 'a { - self.ensure_mem_db_for_epoch(0); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(0).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(0).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let epoch_map = epoch_map.borrow(); + // FIXME epoch_map.clone().into_iter().filter_map(|(k, v)| { if v.0.is_empty() { Some(Ok(k)) @@ -248,9 +251,9 @@ impl InMemRelation { let mut upper = prefix.0.clone(); upper.push(DataValue::Bot); let upper = Tuple(upper); - self.ensure_mem_db_for_epoch(epoch); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let epoch_map = epoch_map.borrow(); @@ -284,14 +287,14 @@ impl InMemRelation { upper: &[DataValue], epoch: u32, ) -> impl Iterator> { - self.ensure_mem_db_for_epoch(epoch); let mut prefix_bound = prefix.clone(); prefix_bound.0.extend_from_slice(lower); let mut upper_bound = prefix.clone(); upper_bound.0.extend_from_slice(upper); let mem_db: &RefCell<_> = self.mem_db.borrow(); - let epoch_map = mem_db.borrow().get(epoch as usize).unwrap().clone(); + let epoch_maps = mem_db.borrow(); + let epoch_map = epoch_maps.get(epoch as usize).unwrap(); let epoch_map: &RefCell> = epoch_map.borrow(); let epoch_map = epoch_map.borrow(); diff --git a/cozo-core/src/runtime/transact.rs b/cozo-core/src/runtime/transact.rs index 798904f6..73525ff4 100644 --- a/cozo-core/src/runtime/transact.rs +++ b/cozo-core/src/runtime/transact.rs @@ -30,19 +30,23 @@ impl<'a> SessionTx<'a> { pub(crate) fn new_rule_store(&self, rule_name: MagicSymbol, arity: usize) -> InMemRelation { let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel); let old_count = old_count & 0x00ff_ffffu32; - InMemRelation::new(StoredRelationId(old_count), rule_name, arity) + let ret = InMemRelation::new(StoredRelationId(old_count), rule_name, arity); + ret.ensure_mem_db_for_epoch(0); + ret } pub(crate) fn new_temp_store(&self, span: SourceSpan) -> InMemRelation { let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel); let old_count = old_count & 0x00ff_ffffu32; - InMemRelation::new( + let ret = InMemRelation::new( StoredRelationId(old_count), MagicSymbol::Muggle { inner: Symbol::new("", span), }, 0, - ) + ); + ret.ensure_mem_db_for_epoch(0); + ret } pub(crate) fn load_last_relation_store_id(&self) -> Result {