main
Ziyang Hu 2 years ago
parent ff14454d90
commit b0c77a7998

@ -39,8 +39,8 @@ cozorocks = { path = "cozorocks" }
#[profile.release]
#lto = true
[profile.release]
debug = true
#[profile.release]
#debug = true
[workspace]
members = ["cozorocks", "cozohttp", "cozopy", "cozoplay/src-tauri"]

@ -3,7 +3,6 @@ use std::fmt::{Display, Formatter};
use anyhow::{anyhow, bail, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use smallvec::SmallVec;
@ -16,7 +15,17 @@ use crate::data::value::{DataValue, Number};
use crate::parse::triple::TempIdCtx;
#[repr(u8)]
#[derive(Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(
Copy,
Clone,
PartialEq,
Ord,
PartialOrd,
Eq,
Debug,
serde_derive::Deserialize,
serde_derive::Serialize,
)]
pub(crate) enum AttributeCardinality {
One = 1,
Many = 2,
@ -52,7 +61,17 @@ impl TryFrom<&'_ str> for AttributeCardinality {
}
#[repr(u8)]
#[derive(Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(
Copy,
Clone,
PartialEq,
Ord,
PartialOrd,
Eq,
Debug,
serde_derive::Deserialize,
serde_derive::Serialize,
)]
pub(crate) enum AttributeTyping {
Ref = 1,
Component = 2,
@ -176,7 +195,9 @@ impl AttributeTyping {
}
#[repr(u8)]
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(
Clone, PartialEq, Ord, PartialOrd, Eq, Debug, serde_derive::Deserialize, serde_derive::Serialize,
)]
pub(crate) enum AttributeIndex {
None = 0,
Indexed = 1,
@ -218,7 +239,9 @@ impl TryFrom<&'_ str> for AttributeIndex {
}
}
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(
Clone, PartialEq, Ord, PartialOrd, Eq, Debug, serde_derive::Deserialize, serde_derive::Serialize,
)]
pub(crate) struct Attribute {
#[serde(rename = "i")]
pub(crate) id: AttrId,

@ -6,7 +6,6 @@ use anyhow::{bail, Result};
use regex::Regex;
use rmp_serde::Serializer;
use serde::{Deserialize, Deserializer, Serialize};
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use smartstring::{LazyCompact, SmartString};
use uuid::Uuid;
@ -56,7 +55,7 @@ impl PartialOrd for RegexWrapper {
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, serde_derive::Deserialize, serde_derive::Serialize)]
pub(crate) enum DataValue {
#[serde(rename = "n")]
Null,
@ -100,7 +99,7 @@ impl From<f64> for DataValue {
}
}
#[derive(Copy, Clone, Deserialize, Serialize)]
#[derive(Copy, Clone, serde_derive::Deserialize, serde_derive::Serialize)]
pub(crate) enum Number {
#[serde(rename = "i")]
Int(i64),

@ -1173,21 +1173,16 @@ impl TripleRelation {
it: impl Iterator<Item = Result<Tuple>> + 'a,
eliminate_indices: BTreeSet<usize>,
) -> TupleIter<'a> {
if self.filters.is_empty() {
if eliminate_indices.is_empty() {
Box::new(it)
} else {
match (self.filters.is_empty(), eliminate_indices.is_empty()) {
(true, true) => Box::new(it),
(true, false) => {
Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
}
} else {
if eliminate_indices.is_empty() {
Box::new(filter_iter(self.filters.clone(), it))
} else {
Box::new(
filter_iter(self.filters.clone(), it)
.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)),
)
}
(false, true) => Box::new(filter_iter(self.filters.clone(), it)),
(false, false) => Box::new(
filter_iter(self.filters.clone(), it)
.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)),
),
}
}
}
@ -1376,21 +1371,16 @@ impl StoredDerivedRelation {
})
.flatten_ok()
.map(flatten_err);
if self.filters.is_empty() {
if eliminate_indices.is_empty() {
Box::new(it)
} else {
match (self.filters.is_empty(), eliminate_indices.is_empty()) {
(true, true) => Box::new(it),
(true, false) => {
Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
}
} else {
if eliminate_indices.is_empty() {
Box::new(filter_iter(self.filters.clone(), it))
} else {
Box::new(
filter_iter(self.filters.clone(), it)
.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)),
)
}
(false, true) => Box::new(filter_iter(self.filters.clone(), it)),
(false, false) => Box::new(
filter_iter(self.filters.clone(), it)
.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)),
),
}
}
}

@ -1,4 +1,4 @@
use std::borrow::{BorrowMut};
use std::borrow::BorrowMut;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included};
@ -27,7 +27,6 @@ impl Debug for TempStoreId {
#[derive(Clone)]
pub(crate) struct TempStore {
// db: RawRocksDb,
mem_db: Arc<RwLock<Vec<Arc<RwLock<BTreeMap<Tuple, Tuple>>>>>>,
epoch_size: Arc<AtomicU32>,
pub(crate) id: TempStoreId,
@ -37,7 +36,7 @@ pub(crate) struct TempStore {
impl Debug for TempStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Throwaway<{}>", self.id.0)
write!(f, "TempStore<{}>", self.id.0)
}
}
@ -49,7 +48,6 @@ impl TempStore {
arity: usize,
) -> TempStore {
Self {
// db,
epoch_size: Default::default(),
mem_db: Default::default(),
id,
@ -94,12 +92,6 @@ impl TempStore {
})
.collect_vec(),
);
// let key_encoded = key.encode_as_key_for_epoch(self.id, 0);
// let prev_aggr = swap_result_option(
// self.db
// .get(&key_encoded)?
// .map(|slice| EncodedTuple(&slice).decode()),
// )?;
let prev_aggr = zero_target.get_mut(&key);
if let Some(prev_aggr) = prev_aggr {
@ -110,16 +102,9 @@ impl TempStore {
changed |= op(&mut prev_aggr.0[i], &tuple.0[i], aggr_args)?;
}
}
if changed {
// let tuple_data = prev_aggr.encode_as_key_for_epoch(self.id, 0);
// self.db.put(&key_encoded, &tuple_data)?;
if epoch != 0 {
let mut epoch_target =
db_target.get(epoch as usize).unwrap().try_write().unwrap();
epoch_target.insert(key, prev_aggr.clone());
// let key_encoded = key.encode_as_key_for_epoch(self.id, epoch);
// self.db.put(&key_encoded, &tuple_data)?;
}
if changed && epoch != 0 {
let mut epoch_target = db_target.get(epoch as usize).unwrap().try_write().unwrap();
epoch_target.insert(key, prev_aggr.clone());
}
Ok(changed)
} else {
@ -139,12 +124,8 @@ impl TempStore {
})
.try_collect()?,
);
// let tuple_data = tuple_to_store.encode_as_key_for_epoch(self.id, 0);
zero_target.insert(key.clone(), tuple_to_store.clone());
// self.db.put(&key_encoded, &tuple_data)?;
if epoch != 0 {
// let key_encoded = key.encode_as_key_for_epoch(self.id, epoch);
// self.db.put(&key_encoded, &tuple_data)?;
let mut zero = db_target.get(epoch as usize).unwrap().try_write().unwrap();
zero.insert(key, tuple_to_store);
}
@ -157,8 +138,6 @@ impl TempStore {
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, Tuple::default());
Ok(())
// let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
// self.db.put(&key_encoded, &[])
}
pub(crate) fn put_kv(&self, tuple: Tuple, val: Tuple, epoch: u32) -> Result<(), RocksDbStatus> {
self.ensure_mem_db_for_epoch(epoch);
@ -166,9 +145,6 @@ impl TempStore {
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, val);
Ok(())
// let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
// let val_encoded = val.encode_as_key_for_epoch(self.id, epoch);
// self.db.put(&key_encoded, &val_encoded)
}
pub(crate) fn normal_aggr_put(
&self,
@ -194,16 +170,12 @@ impl TempStore {
let mut target = target.get(0).unwrap().try_write().unwrap();
target.insert(Tuple(vals), Tuple::default());
Ok(())
// self.db
// .put(&Tuple(vals).encode_as_key_for_epoch(self.id, 0), &[])
}
pub(crate) fn exists(&self, tuple: &Tuple, epoch: u32) -> Result<bool, RocksDbStatus> {
self.ensure_mem_db_for_epoch(epoch);
let target = self.mem_db.try_read().unwrap();
let target = target.get(epoch as usize).unwrap().try_read().unwrap();
Ok(target.contains_key(tuple))
// let key_encoded = tuple.encode_as_key_for_epoch(self.id, epoch);
// self.db.exists(&key_encoded)
}
pub(crate) fn normal_aggr_scan_and_put(
@ -214,15 +186,6 @@ impl TempStore {
) -> Result<bool> {
let db_target = self.mem_db.try_read().unwrap();
let target = db_target.get(0).unwrap().try_read().unwrap();
// let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0);
// let mut it = self
// .db
// .iterator()
// .upper_bound(&upper)
// .prefix_same_as_start(true)
// .start();
// it.seek(&lower);
// let it = TempStoreIter { it, started: false };
let it = target.clone().into_iter().map(|(k, v)| {
if v.0.is_empty() {
k
@ -262,7 +225,6 @@ impl TempStore {
.map(|(a, _b)| a)
.collect_vec();
for (_key, group) in grouped.into_iter() {
// if key.is_some() {
let mut aggr_res = vec![DataValue::Guard; aggrs.len()];
let mut it = group.into_iter();
let first_tuple = it.next().unwrap();
@ -275,7 +237,6 @@ impl TempStore {
}
}
for tuple in it {
// let tuple = tuple?;
for (idx, aggr) in aggrs.iter().enumerate() {
let val = &tuple.0[invert_indices[idx]];
if let Some((aggr_op, aggr_args)) = aggr {
@ -299,9 +260,6 @@ impl TempStore {
} else {
store.put(res_tpl, 0)?;
}
// } else {
// return group.into_iter().next().unwrap().map(|_| true);
// }
}
Ok(false)
}
@ -336,16 +294,6 @@ impl TempStore {
Ok(Tuple(combined))
}
})
// let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, epoch);
// let mut it = self
// .db
// .iterator()
// .upper_bound(&upper)
// .prefix_same_as_start(true)
// .start();
// it.seek(&lower);
// TempStoreIter { it, started: false }
}
pub(crate) fn scan_all(&self) -> impl Iterator<Item = Result<Tuple>> {
self.scan_all_for_epoch(0)
@ -355,15 +303,6 @@ impl TempStore {
let target = self.mem_db.try_read().unwrap();
let target = target.get(0).unwrap().try_read().unwrap();
target.clone().into_iter().map(|(_k, v)| Ok(v))
// let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0);
// let mut it = self
// .db
// .iterator()
// .upper_bound(&upper)
// .prefix_same_as_start(true)
// .start();
// it.seek(&lower);
// SortedIter { it, started: false }
}
pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator<Item = Result<Tuple>> {
self.scan_prefix_for_epoch(prefix, 0)
@ -401,19 +340,6 @@ impl TempStore {
})
.collect_vec();
res.into_iter()
// let mut upper = prefix.0.clone();
// upper.push(DataValue::Bottom);
// let upper = Tuple(upper);
// let upper = upper.encode_as_key_for_epoch(self.id, epoch);
// let lower = prefix.encode_as_key_for_epoch(self.id, epoch);
// let mut it = self
// .db
// .iterator()
// .upper_bound(&upper)
// .prefix_same_as_start(true)
// .start();
// it.seek(&lower);
// TempStoreIter { it, started: false }
}
}
@ -440,54 +366,3 @@ impl Iterator for SortedIter {
}
}
}
struct TempStoreIter {
it: DbIter,
started: bool,
}
impl Iterator for TempStoreIter {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
if !self.started {
self.started = true;
} else {
self.it.next();
}
match self.it.pair() {
Err(e) => Some(Err(e.into())),
Ok(None) => None,
Ok(Some((k_slice, v_slice))) => match EncodedTuple(k_slice).decode() {
Err(e) => Some(Err(e)),
Ok(t) => {
if v_slice.len() == 0 {
Some(Ok(t))
} else {
match EncodedTuple(v_slice).decode() {
Err(e) => Some(Err(e)),
Ok(vt) => Some(Ok(Tuple(
t.0.into_iter()
.zip(vt.0)
.map(|(kv, vv)| match kv {
DataValue::Guard => vv,
kv => kv,
})
.collect_vec(),
))),
}
}
}
},
}
}
}
// impl Drop for TempStore {
// fn drop(&mut self) {
// let (lower, upper) = EncodedTuple::bounds_for_prefix(self.id);
// if let Err(e) = self.db.range_del(&lower, &upper) {
// error!("{}", e);
// }
// }
// }

@ -5,7 +5,6 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use cozorocks::{DbIter, RawRocksDb, Tx};
@ -37,7 +36,9 @@ pub struct SessionTx {
pub(crate) touched_eids: BTreeSet<EntityId>,
}
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(
Clone, PartialEq, Ord, PartialOrd, Eq, Debug, serde_derive::Deserialize, serde_derive::Serialize,
)]
pub(crate) struct TxLog {
#[serde(rename = "i")]
pub(crate) id: TxId,

Loading…
Cancel
Save