tagged insertion
parent
86c262a237
commit
dded6b887a
@ -0,0 +1,116 @@
|
||||
use crate::algebra::parser::AlgebraParseError;
|
||||
use crate::context::TempDbContext;
|
||||
use crate::data::eval::PartialEvalContext;
|
||||
use crate::data::tuple_set::{BindingMap, TableId, TupleSet};
|
||||
use crate::ddl::parser::ColExtractor;
|
||||
use crate::ddl::reify::{AssocInfo, DdlContext, EdgeInfo, IndexInfo, NodeInfo, TableInfo};
|
||||
use crate::runtime::session::Definable;
|
||||
use anyhow::Result;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::BTreeMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
mod from_values;
|
||||
mod insert;
|
||||
mod insert_tagged;
|
||||
|
||||
pub(crate) use from_values::*;
|
||||
pub(crate) use insert::*;
|
||||
pub(crate) use insert_tagged::*;
|
||||
|
||||
pub(crate) trait InterpretContext: PartialEvalContext {
|
||||
fn resolve_definable(&self, name: &str) -> Option<Definable>;
|
||||
fn resolve_table(&self, name: &str) -> Option<TableId>;
|
||||
fn get_table_info(&self, table_id: TableId) -> Result<TableInfo>;
|
||||
fn get_node_info(&self, table_id: TableId) -> Result<NodeInfo>;
|
||||
fn get_edge_info(&self, table_id: TableId) -> Result<EdgeInfo>;
|
||||
fn get_table_assocs(&self, table_id: TableId) -> Result<Vec<AssocInfo>>;
|
||||
fn get_node_edges(&self, table_id: TableId) -> Result<(Vec<EdgeInfo>, Vec<EdgeInfo>)>;
|
||||
fn get_table_indices(&self, table_id: TableId) -> Result<Vec<IndexInfo>>;
|
||||
}
|
||||
|
||||
impl<'a> InterpretContext for TempDbContext<'a> {
|
||||
fn resolve_definable(&self, _name: &str) -> Option<Definable> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn resolve_table(&self, name: &str) -> Option<TableId> {
|
||||
self.table_id_by_name(name).ok()
|
||||
}
|
||||
|
||||
fn get_table_info(&self, table_id: TableId) -> Result<TableInfo> {
|
||||
self.table_by_id(table_id)
|
||||
.map_err(|_| AlgebraParseError::TableIdNotFound(table_id).into())
|
||||
}
|
||||
|
||||
fn get_node_info(&self, table_id: TableId) -> Result<NodeInfo> {
|
||||
match self.get_table_info(table_id)? {
|
||||
TableInfo::Node(n) => Ok(n),
|
||||
_ => Err(AlgebraParseError::WrongTableKind(table_id).into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_edge_info(&self, table_id: TableId) -> Result<EdgeInfo> {
|
||||
match self.get_table_info(table_id)? {
|
||||
TableInfo::Edge(n) => Ok(n),
|
||||
_ => Err(AlgebraParseError::WrongTableKind(table_id).into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_table_assocs(&self, table_id: TableId) -> Result<Vec<AssocInfo>> {
|
||||
let res = self.assocs_by_main_id(table_id)?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn get_node_edges(&self, table_id: TableId) -> Result<(Vec<EdgeInfo>, Vec<EdgeInfo>)> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_table_indices(&self, table_id: TableId) -> Result<Vec<IndexInfo>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait RelationalAlgebra {
|
||||
fn name(&self) -> &str;
|
||||
fn binding_map(&self) -> Result<BindingMap>;
|
||||
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>>;
|
||||
fn identity(&self) -> Option<TableInfo>;
|
||||
}
|
||||
|
||||
type KeyBuilderSet = (
|
||||
Vec<ColExtractor>,
|
||||
Vec<ColExtractor>,
|
||||
Option<Vec<ColExtractor>>,
|
||||
);
|
||||
|
||||
struct TableInfoByNameCache<'a> {
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
cache: BTreeMap<String, Rc<TableInfo>>,
|
||||
}
|
||||
|
||||
impl<'a> TableInfoByNameCache<'a> {
|
||||
fn get_info(&mut self, name: &str) -> Result<Rc<TableInfo>> {
|
||||
if !self.cache.contains_key(name) {
|
||||
let tid = self.ctx.table_id_by_name(name)?;
|
||||
let info = self.ctx.get_table_info(tid)?;
|
||||
self.cache.insert(name.to_string(), info.into());
|
||||
}
|
||||
Ok(self.cache.get(name).unwrap().clone())
|
||||
}
|
||||
}
|
||||
|
||||
struct TableInfoByIdCache<'a> {
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
cache: BTreeMap<TableId, Rc<TableInfo>>,
|
||||
}
|
||||
|
||||
impl<'a> TableInfoByIdCache<'a> {
|
||||
fn get_info(&mut self, tid: TableId) -> Result<Rc<TableInfo>> {
|
||||
if let Entry::Vacant(e) = self.cache.entry(tid) {
|
||||
let info = self.ctx.get_table_info(tid)?;
|
||||
e.insert(info.into());
|
||||
}
|
||||
Ok(self.cache.get(&tid).unwrap().clone())
|
||||
}
|
||||
}
|
@ -0,0 +1,115 @@
|
||||
use crate::algebra::op::RelationalAlgebra;
|
||||
use crate::algebra::parser::{assert_rule, AlgebraParseError};
|
||||
use crate::context::TempDbContext;
|
||||
use crate::data::expr::Expr;
|
||||
use crate::data::tuple::{DataKind, OwnTuple};
|
||||
use crate::data::tuple_set::{BindingMap, TupleSet, TupleSetIdx};
|
||||
use crate::data::value::{StaticValue, Value};
|
||||
use crate::ddl::reify::TableInfo;
|
||||
use crate::parser::{Pairs, Rule};
|
||||
use anyhow::Result;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) const NAME_RELATION_FROM_VALUES: &str = "Values";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct RelationFromValues {
|
||||
binding_map: BindingMap,
|
||||
values: Vec<Vec<StaticValue>>,
|
||||
}
|
||||
|
||||
impl RelationFromValues {
|
||||
pub(crate) fn build<'a>(
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
prev: Option<Arc<dyn RelationalAlgebra + 'a>>,
|
||||
mut args: Pairs,
|
||||
) -> Result<Self> {
|
||||
if !matches!(prev, None) {
|
||||
return Err(
|
||||
AlgebraParseError::Unchainable(NAME_RELATION_FROM_VALUES.to_string()).into(),
|
||||
);
|
||||
}
|
||||
let not_enough_args =
|
||||
|| AlgebraParseError::NotEnoughArguments(NAME_RELATION_FROM_VALUES.to_string());
|
||||
let schema = args
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?
|
||||
.into_inner()
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?;
|
||||
assert_rule(&schema, Rule::scoped_list, NAME_RELATION_FROM_VALUES, 0)?;
|
||||
let mut schema_pairs = schema.into_inner();
|
||||
let binding = schema_pairs.next().ok_or_else(not_enough_args)?.as_str();
|
||||
let binding_map = schema_pairs
|
||||
.enumerate()
|
||||
.map(|(i, v)| {
|
||||
(
|
||||
v.as_str().to_string(),
|
||||
TupleSetIdx {
|
||||
is_key: false,
|
||||
t_set: 0,
|
||||
col_idx: i,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
let n_fields = binding_map.len();
|
||||
let binding_map = BTreeMap::from([(binding.to_string(), binding_map)]);
|
||||
let data = args
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?
|
||||
.into_inner()
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?;
|
||||
assert_rule(&data, Rule::expr, NAME_RELATION_FROM_VALUES, 1)?;
|
||||
let data = Expr::try_from(data)?.interpret_eval(ctx)?.to_static();
|
||||
let data = data.into_vec().map_err(AlgebraParseError::ValueError)?;
|
||||
let values = data
|
||||
.into_iter()
|
||||
.map(|v| -> Result<Vec<Value>> {
|
||||
match v.into_vec() {
|
||||
Ok(v) => {
|
||||
if v.len() == n_fields {
|
||||
Ok(v)
|
||||
} else {
|
||||
Err(AlgebraParseError::ValueError(Value::List(v)).into())
|
||||
}
|
||||
}
|
||||
Err(v) => Err(AlgebraParseError::ValueError(v).into()),
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
Ok(Self {
|
||||
binding_map: binding_map,
|
||||
values,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl RelationalAlgebra for RelationFromValues {
|
||||
fn name(&self) -> &str {
|
||||
NAME_RELATION_FROM_VALUES
|
||||
}
|
||||
|
||||
fn binding_map(&self) -> Result<BindingMap> {
|
||||
Ok(self.binding_map.clone())
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>> {
|
||||
let it = self.values.iter().map(|vs| {
|
||||
let mut tuple = OwnTuple::with_data_prefix(DataKind::Data);
|
||||
for v in vs {
|
||||
tuple.push_value(v);
|
||||
}
|
||||
let mut tset = TupleSet::default();
|
||||
tset.push_val(tuple.into());
|
||||
Ok(tset)
|
||||
});
|
||||
Ok(Box::new(it))
|
||||
}
|
||||
|
||||
fn identity(&self) -> Option<TableInfo> {
|
||||
None
|
||||
}
|
||||
}
|
@ -0,0 +1,336 @@
|
||||
use crate::algebra::op::{InterpretContext, KeyBuilderSet, RelationalAlgebra};
|
||||
use crate::algebra::parser::{assert_rule, build_relational_expr, AlgebraParseError};
|
||||
use crate::context::TempDbContext;
|
||||
use crate::data::expr::{Expr, StaticExpr};
|
||||
use crate::data::parser::parse_scoped_dict;
|
||||
use crate::data::tuple::{DataKind, OwnTuple};
|
||||
use crate::data::tuple_set::{
|
||||
BindingMap, BindingMapEvalContext, TupleSet, TupleSetEvalContext, TupleSetIdx,
|
||||
};
|
||||
use crate::data::typing::Typing;
|
||||
use crate::data::value::Value;
|
||||
use crate::ddl::reify::{AssocInfo, TableInfo};
|
||||
use crate::parser::text_identifier::parse_table_with_assocs;
|
||||
use crate::parser::{Pairs, Rule};
|
||||
use crate::runtime::options::{default_read_options, default_write_options};
|
||||
use anyhow::Result;
|
||||
use cozorocks::PinnableSlicePtr;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) const NAME_INSERTION: &str = "Insert";
|
||||
pub(crate) const NAME_UPSERT: &str = "Upsert";
|
||||
|
||||
pub(crate) struct Insertion<'a> {
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
source: Arc<dyn RelationalAlgebra + 'a>,
|
||||
binding: String,
|
||||
target_info: TableInfo,
|
||||
assoc_infos: Vec<AssocInfo>,
|
||||
extract_map: StaticExpr,
|
||||
upsert: bool,
|
||||
}
|
||||
|
||||
// problem: binding map must survive optimization. now it doesn't
|
||||
impl<'a> Insertion<'a> {
|
||||
pub(crate) fn build(
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
prev: Option<Arc<dyn RelationalAlgebra + 'a>>,
|
||||
mut args: Pairs,
|
||||
upsert: bool,
|
||||
) -> Result<Self> {
|
||||
let not_enough_args = || {
|
||||
AlgebraParseError::NotEnoughArguments(
|
||||
(if upsert { NAME_UPSERT } else { NAME_INSERTION }).to_string(),
|
||||
)
|
||||
};
|
||||
let source = match prev {
|
||||
Some(v) => v,
|
||||
None => build_relational_expr(ctx, args.next().ok_or_else(not_enough_args)?)?,
|
||||
};
|
||||
let table_name = args.next().ok_or_else(not_enough_args)?;
|
||||
let (table_name, assoc_names) = parse_table_with_assocs(table_name.as_str())?;
|
||||
let pair = args
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?
|
||||
.into_inner()
|
||||
.next()
|
||||
.unwrap();
|
||||
assert_rule(&pair, Rule::scoped_dict, NAME_INSERTION, 2)?;
|
||||
let (binding, keys, extract_map) = parse_scoped_dict(pair)?;
|
||||
if !keys.is_empty() {
|
||||
return Err(
|
||||
AlgebraParseError::Parse("Cannot have keyed map in Insert".to_string()).into(),
|
||||
);
|
||||
}
|
||||
let extract_map = extract_map.to_static();
|
||||
|
||||
let target_id = ctx
|
||||
.resolve_table(&table_name)
|
||||
.ok_or_else(|| AlgebraParseError::TableNotFound(table_name.to_string()))?;
|
||||
let target_info = ctx.get_table_info(target_id)?;
|
||||
let assoc_infos = ctx
|
||||
.get_table_assocs(target_id)?
|
||||
.into_iter()
|
||||
.filter(|v| assoc_names.contains(&v.name))
|
||||
.collect::<Vec<_>>();
|
||||
Ok(Self {
|
||||
ctx,
|
||||
binding,
|
||||
source,
|
||||
target_info,
|
||||
assoc_infos,
|
||||
extract_map,
|
||||
upsert,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_binding_map_inner(&self) -> Result<BTreeMap<String, TupleSetIdx>> {
|
||||
let mut binding_map_inner = BTreeMap::new();
|
||||
match &self.target_info {
|
||||
TableInfo::Node(n) => {
|
||||
for (i, k) in n.keys.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: true,
|
||||
t_set: 0,
|
||||
col_idx: i,
|
||||
},
|
||||
);
|
||||
}
|
||||
for (i, k) in n.vals.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: false,
|
||||
t_set: 0,
|
||||
col_idx: i,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
TableInfo::Edge(e) => {
|
||||
let src = self.ctx.get_node_info(e.src_id)?;
|
||||
let dst = self.ctx.get_node_info(e.dst_id)?;
|
||||
for (i, k) in src.keys.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: true,
|
||||
t_set: 0,
|
||||
col_idx: i + 1,
|
||||
},
|
||||
);
|
||||
}
|
||||
for (i, k) in dst.keys.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: true,
|
||||
t_set: 0,
|
||||
col_idx: i + 2 + src.keys.len(),
|
||||
},
|
||||
);
|
||||
}
|
||||
for (i, k) in e.keys.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: true,
|
||||
t_set: 0,
|
||||
col_idx: i + 2 + src.keys.len() + dst.keys.len(),
|
||||
},
|
||||
);
|
||||
}
|
||||
for (i, k) in e.vals.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: false,
|
||||
t_set: 0,
|
||||
col_idx: i,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
for (iset, info) in self.assoc_infos.iter().enumerate() {
|
||||
for (i, k) in info.vals.iter().enumerate() {
|
||||
binding_map_inner.insert(
|
||||
k.name.clone(),
|
||||
TupleSetIdx {
|
||||
is_key: false,
|
||||
t_set: iset + 1,
|
||||
col_idx: i,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(binding_map_inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> RelationalAlgebra for Insertion<'a> {
|
||||
fn name(&self) -> &str {
|
||||
if self.upsert {
|
||||
NAME_UPSERT
|
||||
} else {
|
||||
NAME_INSERTION
|
||||
}
|
||||
}
|
||||
|
||||
fn binding_map(&self) -> Result<BindingMap> {
|
||||
let inner = self.build_binding_map_inner()?;
|
||||
Ok(BTreeMap::from([(self.binding.clone(), inner)]))
|
||||
}
|
||||
|
||||
fn iter<'b>(&'b self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'b>> {
|
||||
let source_map = self.source.binding_map()?;
|
||||
let binding_ctx = BindingMapEvalContext {
|
||||
map: &source_map,
|
||||
parent: self.ctx,
|
||||
};
|
||||
let extract_map = match self.extract_map.clone().partial_eval(&binding_ctx)? {
|
||||
Expr::Dict(d) => d,
|
||||
v => return Err(AlgebraParseError::Parse(format!("{:?}", v)).into()),
|
||||
};
|
||||
|
||||
let (key_builder, val_builder, inv_key_builder) = self.make_key_builders(&extract_map)?;
|
||||
let assoc_val_builders = self
|
||||
.assoc_infos
|
||||
.iter()
|
||||
.map(|info| {
|
||||
(
|
||||
info.tid,
|
||||
info.vals
|
||||
.iter()
|
||||
.map(|v| v.make_extractor(&extract_map))
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let target_key = self.target_info.table_id();
|
||||
|
||||
let mut eval_ctx = TupleSetEvalContext {
|
||||
tuple_set: Default::default(),
|
||||
txn: self.ctx.txn.clone(),
|
||||
temp_db: self.ctx.sess.temp.clone(),
|
||||
write_options: default_write_options(),
|
||||
};
|
||||
|
||||
let r_opts = default_read_options();
|
||||
let mut temp_slice = PinnableSlicePtr::default();
|
||||
|
||||
Ok(Box::new(self.source.iter()?.map(
|
||||
move |tset| -> Result<TupleSet> {
|
||||
eval_ctx.set_tuple_set(tset?);
|
||||
let mut key = eval_ctx.eval_to_tuple(target_key.id, &key_builder)?;
|
||||
let val = eval_ctx.eval_to_tuple(DataKind::Data as u32, &val_builder)?;
|
||||
if !self.upsert {
|
||||
let existing = if target_key.in_root {
|
||||
eval_ctx.txn.get(&r_opts, &key, &mut temp_slice)?
|
||||
} else {
|
||||
eval_ctx.temp_db.get(&r_opts, &key, &mut temp_slice)?
|
||||
};
|
||||
if existing {
|
||||
return Err(AlgebraParseError::KeyConflict(key.to_owned()).into());
|
||||
}
|
||||
}
|
||||
if target_key.in_root {
|
||||
eval_ctx.txn.put(&key, &val)?;
|
||||
} else {
|
||||
eval_ctx.temp_db.put(&eval_ctx.write_options, &key, &val)?;
|
||||
}
|
||||
if let Some(builder) = &inv_key_builder {
|
||||
let inv_key = eval_ctx.eval_to_tuple(target_key.id, builder)?;
|
||||
if target_key.in_root {
|
||||
eval_ctx.txn.put(&inv_key, &key)?;
|
||||
} else {
|
||||
eval_ctx
|
||||
.temp_db
|
||||
.put(&eval_ctx.write_options, &inv_key, &key)?;
|
||||
}
|
||||
}
|
||||
let assoc_vals = assoc_val_builders
|
||||
.iter()
|
||||
.map(|(tid, builder)| -> Result<OwnTuple> {
|
||||
let ret = eval_ctx.eval_to_tuple(DataKind::Data as u32, builder)?;
|
||||
key.overwrite_prefix(tid.id);
|
||||
if tid.in_root {
|
||||
eval_ctx.txn.put(&key, &ret)?;
|
||||
} else {
|
||||
eval_ctx.temp_db.put(&eval_ctx.write_options, &key, &ret)?;
|
||||
}
|
||||
Ok(ret)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
key.overwrite_prefix(target_key.id);
|
||||
|
||||
let mut ret = TupleSet::default();
|
||||
ret.push_key(key.into());
|
||||
ret.push_val(val.into());
|
||||
for av in assoc_vals {
|
||||
ret.push_val(av.into())
|
||||
}
|
||||
Ok(ret)
|
||||
},
|
||||
)))
|
||||
}
|
||||
|
||||
fn identity(&self) -> Option<TableInfo> {
|
||||
Some(self.target_info.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Insertion<'a> {
|
||||
fn make_key_builders(&self, extract_map: &BTreeMap<String, Expr>) -> Result<KeyBuilderSet> {
|
||||
let ret = match &self.target_info {
|
||||
TableInfo::Node(n) => {
|
||||
let key_builder = n
|
||||
.keys
|
||||
.iter()
|
||||
.map(|v| v.make_extractor(&extract_map))
|
||||
.collect::<Vec<_>>();
|
||||
let val_builder = n
|
||||
.vals
|
||||
.iter()
|
||||
.map(|v| v.make_extractor(&extract_map))
|
||||
.collect::<Vec<_>>();
|
||||
(key_builder, val_builder, None)
|
||||
}
|
||||
TableInfo::Edge(e) => {
|
||||
let src = self.ctx.get_node_info(e.src_id)?;
|
||||
let dst = self.ctx.get_node_info(e.dst_id)?;
|
||||
let src_key_part = [(Expr::Const(Value::Int(e.src_id.id as i64)), Typing::Any)];
|
||||
let dst_key_part = [(Expr::Const(Value::Int(e.dst_id.id as i64)), Typing::Any)];
|
||||
let fwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
|
||||
let bwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
|
||||
let key_builder = src_key_part
|
||||
.into_iter()
|
||||
.chain(src.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.chain(fwd_edge_part.into_iter())
|
||||
.chain(dst.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.chain(e.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.collect::<Vec<_>>();
|
||||
let inv_key_builder = dst_key_part
|
||||
.into_iter()
|
||||
.chain(dst.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.chain(bwd_edge_part.into_iter())
|
||||
.chain(src.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.chain(e.keys.iter().map(|v| v.make_extractor(&extract_map)))
|
||||
.collect::<Vec<_>>();
|
||||
let val_builder = e
|
||||
.vals
|
||||
.iter()
|
||||
.map(|v| v.make_extractor(&extract_map))
|
||||
.collect::<Vec<_>>();
|
||||
(key_builder, val_builder, Some(inv_key_builder))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
@ -0,0 +1,311 @@
|
||||
use crate::algebra::op::{RelationalAlgebra, TableInfoByIdCache, TableInfoByNameCache};
|
||||
use crate::algebra::parser::{assert_rule, AlgebraParseError};
|
||||
use crate::context::TempDbContext;
|
||||
use crate::data::expr::Expr;
|
||||
use crate::data::tuple::{DataKind, OwnTuple};
|
||||
use crate::data::tuple_set::{BindingMap, TableId, TupleSet, TupleSetIdx};
|
||||
use crate::data::value::Value;
|
||||
use crate::ddl::reify::TableInfo;
|
||||
use crate::parser::text_identifier::{build_name_in_def, parse_table_with_assocs};
|
||||
use crate::parser::{CozoParser, Pairs, Rule};
|
||||
use crate::runtime::options::{default_read_options, default_write_options};
|
||||
use anyhow::Result;
|
||||
use cozorocks::PinnableSlicePtr;
|
||||
use pest::Parser;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) const NAME_TAGGED_INSERTION: &str = "InsertTagged";
|
||||
pub(crate) const NAME_TAGGED_UPSERT: &str = "UpsertTagged";
|
||||
|
||||
type TaggedInsertionSet = (
|
||||
OwnTuple,
|
||||
OwnTuple,
|
||||
Option<OwnTuple>,
|
||||
Vec<(TableId, OwnTuple)>,
|
||||
);
|
||||
|
||||
pub(crate) struct TaggedInsertion<'a> {
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
// source: Arc<Vec<StaticValue>>,
|
||||
values: BTreeMap<TableId, Vec<TaggedInsertionSet>>,
|
||||
tally: BTreeMap<String, usize>,
|
||||
binding: String,
|
||||
upsert: bool,
|
||||
}
|
||||
|
||||
impl<'a> TaggedInsertion<'a> {
|
||||
pub(crate) fn build(
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
prev: Option<Arc<dyn RelationalAlgebra + 'a>>,
|
||||
mut args: Pairs,
|
||||
upsert: bool,
|
||||
) -> Result<Self> {
|
||||
let a_name = if upsert {
|
||||
NAME_TAGGED_UPSERT
|
||||
} else {
|
||||
NAME_TAGGED_INSERTION
|
||||
};
|
||||
if !matches!(prev, None) {
|
||||
return Err(AlgebraParseError::Unchainable(a_name.to_string()).into());
|
||||
}
|
||||
let not_enough_args = || AlgebraParseError::NotEnoughArguments(a_name.to_string());
|
||||
let values = args
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?
|
||||
.into_inner()
|
||||
.next()
|
||||
.ok_or_else(not_enough_args)?;
|
||||
assert_rule(&values, Rule::expr, a_name, 0)?;
|
||||
let values = Expr::try_from(values)?
|
||||
.interpret_eval(ctx)?
|
||||
.to_static()
|
||||
.into_vec()
|
||||
.map_err(|e| {
|
||||
AlgebraParseError::WrongArgumentType(a_name.to_string(), 0, format!("{:?}", e))
|
||||
})?;
|
||||
let (values, tally) = Self::make_values(values, ctx)?;
|
||||
|
||||
let binding = match args.next() {
|
||||
None => "_".to_string(),
|
||||
Some(pair) => {
|
||||
let pair = CozoParser::parse(Rule::name_in_def_all, pair.as_str())?
|
||||
.next()
|
||||
.unwrap();
|
||||
build_name_in_def(pair, true)?
|
||||
}
|
||||
};
|
||||
Ok(Self {
|
||||
ctx,
|
||||
binding,
|
||||
upsert,
|
||||
values,
|
||||
tally,
|
||||
})
|
||||
}
|
||||
|
||||
fn make_values(
|
||||
source: Vec<Value>,
|
||||
ctx: &'a TempDbContext<'a>,
|
||||
) -> Result<(
|
||||
BTreeMap<TableId, Vec<TaggedInsertionSet>>,
|
||||
BTreeMap<String, usize>,
|
||||
)> {
|
||||
let mut collected = BTreeMap::new();
|
||||
let mut tally = BTreeMap::new();
|
||||
|
||||
let mut cache = TableInfoByNameCache {
|
||||
ctx,
|
||||
cache: Default::default(),
|
||||
};
|
||||
|
||||
let mut id_cache = TableInfoByIdCache {
|
||||
ctx,
|
||||
cache: Default::default(),
|
||||
};
|
||||
|
||||
for value in source.iter() {
|
||||
let gen_err = || AlgebraParseError::ValueError(value.clone().to_static());
|
||||
let d_map = value.get_map().ok_or_else(gen_err)?;
|
||||
let targets = d_map
|
||||
.get("_type")
|
||||
.ok_or_else(gen_err)?
|
||||
.get_str()
|
||||
.ok_or_else(gen_err)?;
|
||||
let (main, assocs) = parse_table_with_assocs(targets)?;
|
||||
let main_info = cache.get_info(&main)?;
|
||||
let (key_tuple, val_tuple, inv_key_tuple) = match main_info.as_ref() {
|
||||
TableInfo::Node(n) => {
|
||||
*tally.entry(n.name.to_string()).or_default() += 1;
|
||||
let mut key_tuple = OwnTuple::with_prefix(n.tid.id);
|
||||
for col in &n.keys {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
key_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
let mut val_tuple = OwnTuple::with_data_prefix(DataKind::Data);
|
||||
for col in &n.vals {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
val_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
(key_tuple, val_tuple, None)
|
||||
}
|
||||
TableInfo::Edge(e) => {
|
||||
*tally.entry(e.name.to_string()).or_default() += 1;
|
||||
let src = id_cache.get_info(e.src_id)?;
|
||||
let dst = id_cache.get_info(e.dst_id)?;
|
||||
let mut key_tuple = OwnTuple::with_prefix(e.tid.id);
|
||||
key_tuple.push_int(e.src_id.id as i64);
|
||||
let mut inv_key_tuple = OwnTuple::with_prefix(e.tid.id);
|
||||
inv_key_tuple.push_int(e.dst_id.id as i64);
|
||||
let mut val_tuple = OwnTuple::with_data_prefix(DataKind::Data);
|
||||
|
||||
for col in &src.as_node()?.keys {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
key_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
key_tuple.push_bool(true);
|
||||
|
||||
for col in &dst.as_node()?.keys {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
key_tuple.push_value(&val);
|
||||
inv_key_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
inv_key_tuple.push_bool(false);
|
||||
|
||||
for col in &src.as_node()?.keys {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
inv_key_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
for col in &e.keys {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
key_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
for col in &e.vals {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
val_tuple.push_value(&val);
|
||||
}
|
||||
|
||||
(key_tuple, val_tuple, Some(inv_key_tuple))
|
||||
}
|
||||
_ => return Err(AlgebraParseError::WrongTableKind(main_info.table_id()).into()),
|
||||
};
|
||||
let mut assoc_vecs = vec![];
|
||||
for assoc_name in assocs.into_iter() {
|
||||
let assoc_info = cache.get_info(&assoc_name)?;
|
||||
let assoc_info = assoc_info.as_assoc()?;
|
||||
*tally.entry(assoc_info.name.to_string()).or_default() += 1;
|
||||
if assoc_info.src_id != main_info.table_id() {
|
||||
return Err(AlgebraParseError::NoAssociation(main, assoc_name).into());
|
||||
}
|
||||
let mut assoc_tuple = OwnTuple::with_data_prefix(DataKind::Data);
|
||||
for col in &assoc_info.vals {
|
||||
let k = &col.name as &str;
|
||||
let val = d_map.get(k).unwrap_or(&Value::Null);
|
||||
let val = col.typing.coerce_ref(val)?;
|
||||
assoc_tuple.push_value(&val);
|
||||
}
|
||||
assoc_vecs.push((assoc_info.tid, assoc_tuple));
|
||||
}
|
||||
|
||||
let cur_table = collected
|
||||
.entry(main_info.table_id())
|
||||
.or_insert_with(Vec::new);
|
||||
cur_table.push((key_tuple, val_tuple, inv_key_tuple, assoc_vecs));
|
||||
}
|
||||
|
||||
dbg!(&collected);
|
||||
|
||||
Ok((collected, tally))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'b> RelationalAlgebra for TaggedInsertion<'b> {
|
||||
fn name(&self) -> &str {
|
||||
if self.upsert {
|
||||
NAME_TAGGED_UPSERT
|
||||
} else {
|
||||
NAME_TAGGED_INSERTION
|
||||
}
|
||||
}
|
||||
|
||||
fn binding_map(&self) -> Result<BindingMap> {
|
||||
Ok(BTreeMap::from([(
|
||||
self.binding.clone(),
|
||||
BTreeMap::from([
|
||||
(
|
||||
"table".to_string(),
|
||||
TupleSetIdx {
|
||||
is_key: true,
|
||||
t_set: 0,
|
||||
col_idx: 0,
|
||||
},
|
||||
),
|
||||
(
|
||||
"n".to_string(),
|
||||
TupleSetIdx {
|
||||
is_key: false,
|
||||
t_set: 0,
|
||||
col_idx: 0,
|
||||
},
|
||||
),
|
||||
]),
|
||||
)]))
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>> {
|
||||
let mut temp_slice = PinnableSlicePtr::default();
|
||||
let r_opts = default_read_options();
|
||||
let w_opts = default_write_options();
|
||||
for (tid, rows) in &self.values {
|
||||
for (key, val, inv_key, assocs) in rows {
|
||||
if !self.upsert {
|
||||
let exists = if tid.in_root {
|
||||
self.ctx.txn.get(&r_opts, key, &mut temp_slice)?
|
||||
} else {
|
||||
self.ctx.sess.temp.get(&r_opts, key, &mut temp_slice)?
|
||||
};
|
||||
if exists {
|
||||
return Err(AlgebraParseError::KeyConflict(key.to_owned()).into());
|
||||
}
|
||||
}
|
||||
if tid.in_root {
|
||||
self.ctx.txn.put(key, val)?;
|
||||
} else {
|
||||
self.ctx.sess.temp.put(&w_opts, key, val)?;
|
||||
}
|
||||
if let Some(ik) = inv_key {
|
||||
if tid.in_root {
|
||||
self.ctx.txn.put(ik, key)?;
|
||||
} else {
|
||||
self.ctx.sess.temp.put(&w_opts, ik, key)?;
|
||||
}
|
||||
}
|
||||
if !assocs.is_empty() {
|
||||
let mut k = key.clone();
|
||||
for (aid, v) in assocs {
|
||||
k.overwrite_prefix(aid.id);
|
||||
if aid.in_root {
|
||||
self.ctx.txn.put(&k, v)?;
|
||||
} else {
|
||||
self.ctx.sess.temp.put(&w_opts, &k, v)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Box::new(self.tally.iter().map(|(name, n)| {
|
||||
let mut key_tuple = OwnTuple::with_prefix(0);
|
||||
let mut val_tuple = OwnTuple::with_prefix(0);
|
||||
key_tuple.push_str(name);
|
||||
val_tuple.push_int(*n as i64);
|
||||
Ok(TupleSet {
|
||||
keys: vec![key_tuple.into()],
|
||||
vals: vec![val_tuple.into()],
|
||||
})
|
||||
})))
|
||||
}
|
||||
|
||||
fn identity(&self) -> Option<TableInfo> {
|
||||
None
|
||||
}
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
use crate::data::value::StaticValue;
|
||||
|
||||
pub(crate) struct ValueSet {
|
||||
values: Vec<StaticValue>,
|
||||
default_table: Option<String>,
|
||||
}
|
Loading…
Reference in New Issue