main
Ziyang Hu 2 years ago
parent 743529d7f4
commit 9c1a368508

@ -3,7 +3,9 @@ use crate::data::eval::{EvalError, PartialEvalContext};
use crate::data::expr::{Expr, StaticExpr};
use crate::data::parser::{parse_scoped_dict, ExprParseError};
use crate::data::tuple::{DataKind, OwnTuple};
use crate::data::tuple_set::{BindingMap, BindingMapEvalContext, TableId, TupleSet, TupleSetError, TupleSetIdx};
use crate::data::tuple_set::{
BindingMap, BindingMapEvalContext, TableId, TupleSet, TupleSetError, TupleSetIdx,
};
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::ddl::reify::{
@ -11,15 +13,14 @@ use crate::ddl::reify::{
};
use crate::parser::text_identifier::{build_name_in_def, TextParseError};
use crate::parser::{CozoParser, Pair, Pairs, Rule};
use crate::runtime::options::default_write_options;
use crate::runtime::session::Definable;
use cozorocks::BridgeError;
use pest::error::Error;
use pest::Parser;
use std::collections::{BTreeMap, BTreeSet};
use std::result;
use std::sync::Arc;
use cozorocks::BridgeError;
use crate::ddl::parser::ColSchema;
use crate::runtime::options::default_write_options;
#[derive(thiserror::Error, Debug)]
pub(crate) enum AlgebraParseError {
@ -133,7 +134,7 @@ impl<'a> InterpretContext for TempDbContext<'a> {
pub(crate) trait RelationalAlgebra {
fn name(&self) -> &str;
fn binding_map(&self) -> Result<BindingMap>;
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'a>>;
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>>;
}
const NAME_RA_FROM_VALUES: &str = "Values";
@ -231,7 +232,7 @@ impl RelationalAlgebra for RaFromValues {
Ok(self.binding_map.clone())
}
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'a>> {
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>> {
let it = self.values.iter().map(|vs| {
let mut tuple = OwnTuple::with_data_prefix(DataKind::Data);
for v in vs {
@ -304,9 +305,7 @@ impl<'a> RaInsert<'a> {
})
}
fn build_binding_map_inner(
&self,
) -> Result<BTreeMap<String, TupleSetIdx>> {
fn build_binding_map_inner(&self) -> Result<BTreeMap<String, TupleSetIdx>> {
let mut binding_map_inner = BTreeMap::new();
match &self.target_info {
TableInfo::Node(n) => {
@ -403,7 +402,7 @@ impl<'a> RelationalAlgebra for RaInsert<'a> {
Ok(BTreeMap::from([(self.binding.clone(), inner)]))
}
fn iter<'b>(&'b self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'b>> {
fn iter<'b>(&'b self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'b>> {
let source_map = self.source.binding_map()?;
let binding_ctx = BindingMapEvalContext {
map: &source_map,
@ -414,27 +413,27 @@ impl<'a> RelationalAlgebra for RaInsert<'a> {
v => return Err(AlgebraParseError::Parse(format!("{:?}", v))),
};
let build_extractor = |col: &ColSchema| {
let extractor = extract_map
.get(&col.name)
.cloned()
.unwrap_or(Expr::Const(Value::Null))
.to_static();
let typing = col.typing.clone();
(extractor, typing)
};
// let build_extractor = |col: &ColSchema| {
// let extractor = extract_map
// .get(&col.name)
// .cloned()
// .unwrap_or(Expr::Const(Value::Null))
// .to_static();
// let typing = col.typing.clone();
// (extractor, typing)
// };
let (key_builder, val_builder, inv_key_builder) = match &self.target_info {
TableInfo::Node(n) => {
let key_builder = n
.keys
.iter()
.map(build_extractor)
.map(|v| v.make_extractor(&extract_map))
.collect::<Vec<_>>();
let val_builder = n
.vals
.iter()
.map(build_extractor)
.map(|v| v.make_extractor(&extract_map))
.collect::<Vec<_>>();
(key_builder, val_builder, None)
}
@ -445,74 +444,91 @@ impl<'a> RelationalAlgebra for RaInsert<'a> {
let dst_key_part = [(Expr::Const(Value::Int(e.dst_id.id as i64)), Typing::Any)];
let fwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
let bwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
let key_builder = src_key_part.into_iter()
.chain(src.keys.iter().map(build_extractor))
let key_builder = src_key_part
.into_iter()
.chain(src.keys.iter().map(|v| v.make_extractor(&extract_map)))
.chain(fwd_edge_part.into_iter())
.chain(dst.keys.iter().map(build_extractor))
.chain(e.keys.iter().map(build_extractor))
.chain(dst.keys.iter().map(|v| v.make_extractor(&extract_map)))
.chain(e.keys.iter().map(|v| v.make_extractor(&extract_map)))
.collect::<Vec<_>>();
let inv_key_builder = dst_key_part.into_iter()
.chain(dst.keys.iter().map(build_extractor))
let inv_key_builder = dst_key_part
.into_iter()
.chain(dst.keys.iter().map(|v| v.make_extractor(&extract_map)))
.chain(bwd_edge_part.into_iter())
.chain(src.keys.iter().map(build_extractor))
.chain(e.keys.iter().map(build_extractor))
.chain(src.keys.iter().map(|v| v.make_extractor(&extract_map)))
.chain(e.keys.iter().map(|v| v.make_extractor(&extract_map)))
.collect::<Vec<_>>();
let val_builder = e
.vals
.iter()
.map(build_extractor)
.map(|v| v.make_extractor(&extract_map))
.collect::<Vec<_>>();
(key_builder, val_builder, Some(inv_key_builder))
}
_ => unreachable!(),
};
let assoc_val_builders = self.assoc_infos.iter().map(|info| {
(info.tid, info.vals.iter().map(build_extractor).collect::<Vec<_>>())
}).collect::<Vec<_>>();
let assoc_val_builders = self
.assoc_infos
.iter()
.map(|info| {
(
info.tid,
info.vals
.iter()
.map(|v| v.make_extractor(&extract_map))
.collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>();
let target_key = self.target_info.table_id();
let txn = self.ctx.txn.clone();
let temp_db = self.ctx.sess.temp.clone();
let write_opts = default_write_options();
Ok(Box::new(self.source.iter()?.map(move |tset| -> Result<TupleSet> {
let tset = tset?;
let mut key = tset.eval_to_tuple(target_key.id, &key_builder)?;
let val = tset.eval_to_tuple(DataKind::Data as u32, &val_builder)?;
if target_key.in_root {
txn.put(&key, &val)?;
} else {
temp_db.put(&write_opts, &key, &val)?;
}
if let Some(builder) = &inv_key_builder {
let inv_key = tset.eval_to_tuple(target_key.id, builder)?;
Ok(Box::new(self.source.iter()?.map(
move |tset| -> Result<TupleSet> {
let tset = tset?;
let mut key = tset.eval_to_tuple(target_key.id, &key_builder)?;
let val = tset.eval_to_tuple(DataKind::Data as u32, &val_builder)?;
if target_key.in_root {
txn.put(&inv_key, &key)?;
txn.put(&key, &val)?;
} else {
temp_db.put(&write_opts, &inv_key, &key)?;
temp_db.put(&write_opts, &key, &val)?;
}
}
let assoc_vals = assoc_val_builders.iter().map(|(tid, builder)| -> Result<OwnTuple> {
let ret = tset.eval_to_tuple(DataKind::Data as u32, builder)?;
key.overwrite_prefix(tid.id);
if tid.in_root {
txn.put(&key, &ret)?;
} else {
temp_db.put(&write_opts, &key, &ret)?;
if let Some(builder) = &inv_key_builder {
let inv_key = tset.eval_to_tuple(target_key.id, builder)?;
if target_key.in_root {
txn.put(&inv_key, &key)?;
} else {
temp_db.put(&write_opts, &inv_key, &key)?;
}
}
let assoc_vals = assoc_val_builders
.iter()
.map(|(tid, builder)| -> Result<OwnTuple> {
let ret = tset.eval_to_tuple(DataKind::Data as u32, builder)?;
key.overwrite_prefix(tid.id);
if tid.in_root {
txn.put(&key, &ret)?;
} else {
temp_db.put(&write_opts, &key, &ret)?;
}
Ok(ret)
})
.collect::<Result<Vec<_>>>()?;
key.overwrite_prefix(target_key.id);
let mut ret = TupleSet::default();
ret.push_key(key.into());
ret.push_val(val.into());
for av in assoc_vals {
ret.push_val(av.into())
}
Ok(ret)
}).collect::<Result<Vec<_>>>()?;
key.overwrite_prefix(target_key.id);
let mut ret = TupleSet::default();
ret.push_key(key.into());
ret.push_val(val.into());
for av in assoc_vals {
ret.push_val(av.into())
}
Ok(ret)
})))
},
)))
}
}
@ -549,11 +565,11 @@ fn parse_table_with_assocs(pair: Pair) -> Result<(String, BTreeSet<String>)> {
#[cfg(test)]
mod tests {
use super::*;
use crate::data::tuple::Tuple;
use crate::parser::{CozoParser, Rule};
use crate::runtime::options::default_read_options;
use crate::runtime::session::tests::create_test_db;
use pest::Parser;
use crate::data::tuple::Tuple;
use crate::runtime::options::default_read_options;
#[test]
fn parse_ra() -> Result<()> {

@ -3,7 +3,7 @@ use crate::data::op::{
OpMod, OpMul, OpNe, OpNot, OpNotNull, OpOr, OpPow, OpStrCat, OpSub, UnresolvedOp,
};
use crate::data::parser::ExprParseError;
use crate::data::tuple_set::{TupleSetIdx};
use crate::data::tuple_set::TupleSetIdx;
use crate::data::value::{StaticValue, Value};
use crate::parser::{CozoParser, Rule};
use pest::Parser;

@ -1,12 +1,12 @@
use crate::data::eval::{EvalError, PartialEvalContext, RowEvalContext};
use crate::data::expr::{Expr};
use crate::data::expr::Expr;
use crate::data::tuple::{OwnTuple, ReifiedTuple, TupleError};
use crate::data::typing::{Typing, TypingError};
use crate::data::value::{StaticValue, Value};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::result;
use crate::data::typing::{Typing, TypingError};
#[derive(thiserror::Error, Debug)]
pub enum TupleSetError {
@ -140,16 +140,16 @@ impl TupleSet {
self.vals.extend(o.vals);
}
pub(crate) fn extend_keys<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
{
self.keys.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn extend_vals<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
{
self.vals.extend(keys.into_iter().map(ReifiedTuple::from));
}
@ -196,11 +196,11 @@ impl TupleSet {
}
impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
where
I1: IntoIterator<Item=T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item=T2>,
ReifiedTuple: From<T2>,
where
I1: IntoIterator<Item = T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item = T2>,
ReifiedTuple: From<T2>,
{
fn from((keys, vals): (I1, I2)) -> Self {
TupleSet {

@ -4,6 +4,7 @@ use crate::data::typing::{Typing, TypingError};
use crate::data::value::{StaticValue, Value};
use crate::parser::text_identifier::{build_name_in_def, TextParseError};
use crate::parser::{Pair, Rule};
use std::collections::BTreeMap;
use std::result;
#[derive(thiserror::Error, Debug)]
@ -36,6 +37,21 @@ pub(crate) struct ColSchema {
pub(crate) default: StaticExpr,
}
impl ColSchema {
pub(crate) fn make_extractor(
&self,
extractor_map: &BTreeMap<String, Expr>,
) -> (StaticExpr, Typing) {
let extractor = extractor_map
.get(&self.name)
.cloned()
.unwrap_or(Expr::Const(Value::Null))
.to_static();
let typing = self.typing.clone();
(extractor, typing)
}
}
impl From<ColSchema> for StaticValue {
fn from(s: ColSchema) -> Self {
Value::from(vec![

Loading…
Cancel
Save