insertion of values

main
Ziyang Hu 2 years ago
parent 0adf90e6c9
commit 743529d7f4

@ -391,7 +391,7 @@ struct TDBBridge {
is_odb = (tdb_ == nullptr);
}
inline unique_ptr<TransactionBridge> begin_t_transaction(
inline shared_ptr<TransactionBridge> begin_t_transaction(
unique_ptr<WriteOptions> w_ops,
// unique_ptr<WriteOptions> raw_w_ops,
// unique_ptr<ReadOptions> r_ops,
@ -400,7 +400,7 @@ struct TDBBridge {
if (tdb == nullptr) {
return unique_ptr<TransactionBridge>(nullptr);
}
auto ret = make_unique<TransactionBridge>();
auto ret = make_shared<TransactionBridge>();
ret->raw_db = tdb;
// ret->r_ops = std::move(r_ops);
ret->w_ops = std::move(w_ops);
@ -412,7 +412,7 @@ struct TDBBridge {
return ret;
}
inline unique_ptr<TransactionBridge> begin_o_transaction(
inline shared_ptr<TransactionBridge> begin_o_transaction(
unique_ptr<WriteOptions> w_ops,
// unique_ptr<WriteOptions> raw_w_ops,
// unique_ptr<ReadOptions> r_ops,
@ -421,7 +421,7 @@ struct TDBBridge {
if (odb == nullptr) {
return unique_ptr<TransactionBridge>(nullptr);
}
auto ret = make_unique<TransactionBridge>();
auto ret = make_shared<TransactionBridge>();
ret->raw_db = odb;
// ret->r_ops = std::move(r_ops);
ret->w_ops = std::move(w_ops);

@ -182,12 +182,12 @@ mod ffi {
self: &TDBBridge,
w_ops: UniquePtr<WriteOptions>,
txn_options: UniquePtr<TransactionOptions>,
) -> UniquePtr<TransactionBridge>;
) -> SharedPtr<TransactionBridge>;
fn begin_o_transaction(
self: &TDBBridge,
w_ops: UniquePtr<WriteOptions>,
txn_options: UniquePtr<OptimisticTransactionOptions>,
) -> UniquePtr<TransactionBridge>;
) -> SharedPtr<TransactionBridge>;
fn open_tdb_raw(
options: &Options,
txn_options: &TransactionDBOptions,

@ -207,10 +207,11 @@ impl IteratorPtr {
}
}
pub struct TransactionPtr(UniquePtr<TransactionBridge>);
#[derive(Clone)]
pub struct TransactionPtr(SharedPtr<TransactionBridge>);
impl Deref for TransactionPtr {
type Target = UniquePtr<TransactionBridge>;
type Target = TransactionBridge;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
@ -223,7 +224,7 @@ impl TransactionPtr {
/// Only for testing use, as a placeholder
#[inline]
pub unsafe fn null() -> Self {
TransactionPtr(UniquePtr::null())
TransactionPtr(SharedPtr::null())
}
#[inline]
pub fn set_snapshot(&self) {

@ -3,7 +3,7 @@ use crate::data::eval::{EvalError, PartialEvalContext};
use crate::data::expr::{Expr, StaticExpr};
use crate::data::parser::{parse_scoped_dict, ExprParseError};
use crate::data::tuple::{DataKind, OwnTuple};
use crate::data::tuple_set::{BindingMap, BindingMapEvalContext, TableId, TupleSet, TupleSetIdx};
use crate::data::tuple_set::{BindingMap, BindingMapEvalContext, TableId, TupleSet, TupleSetError, TupleSetIdx};
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::ddl::reify::{
@ -17,6 +17,9 @@ use pest::Parser;
use std::collections::{BTreeMap, BTreeSet};
use std::result;
use std::sync::Arc;
use cozorocks::BridgeError;
use crate::ddl::parser::ColSchema;
use crate::runtime::options::default_write_options;
#[derive(thiserror::Error, Debug)]
pub(crate) enum AlgebraParseError {
@ -58,6 +61,12 @@ pub(crate) enum AlgebraParseError {
#[error(transparent)]
Reify(#[from] DdlReifyError),
#[error(transparent)]
TupleSet(#[from] TupleSetError),
#[error(transparent)]
Bridge(#[from] BridgeError),
}
impl From<pest::error::Error<Rule>> for AlgebraParseError {
@ -123,15 +132,15 @@ impl<'a> InterpretContext for TempDbContext<'a> {
pub(crate) trait RelationalAlgebra {
fn name(&self) -> &str;
fn binding_map(&self) -> BindingMap;
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleSet> + 'a>;
fn binding_map(&self) -> Result<BindingMap>;
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'a>>;
}
const NAME_RA_FROM_VALUES: &str = "Values";
#[derive(Clone, Debug)]
struct RaFromValues {
binding: BindingMap,
binding_map: BindingMap,
values: Vec<Vec<StaticValue>>,
}
@ -149,7 +158,7 @@ fn assert_rule(pair: &Pair, rule: Rule, name: &str, u: usize) -> Result<()> {
impl RaFromValues {
fn build<'a>(
ctx: &'a impl InterpretContext,
ctx: &'a TempDbContext<'a>,
prev: Option<Arc<dyn RelationalAlgebra + 'a>>,
mut args: Pairs,
) -> Result<Self> {
@ -206,10 +215,8 @@ impl RaFromValues {
Err(v) => Err(AlgebraParseError::ValueError(v)),
})
.collect::<Result<Vec<_>>>()?;
dbg!(&binding_map);
dbg!(&values);
Ok(Self {
binding: binding_map,
binding_map: binding_map,
values,
})
}
@ -220,11 +227,11 @@ impl RelationalAlgebra for RaFromValues {
NAME_RA_FROM_VALUES
}
fn binding_map(&self) -> BindingMap {
self.binding.clone()
fn binding_map(&self) -> Result<BindingMap> {
Ok(self.binding_map.clone())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleSet> + 'a> {
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'a>> {
let it = self.values.iter().map(|vs| {
let mut tuple = OwnTuple::with_data_prefix(DataKind::Data);
for v in vs {
@ -232,31 +239,27 @@ impl RelationalAlgebra for RaFromValues {
}
let mut tset = TupleSet::default();
tset.push_val(tuple.into());
tset
Ok(tset)
});
Box::new(it)
Ok(Box::new(it))
}
}
const NAME_RA_INSERT: &str = "Insert";
struct RaInsert<'a, C: InterpretContext + 'a> {
ctx: &'a C,
struct RaInsert<'a> {
ctx: &'a TempDbContext<'a>,
source: Arc<dyn RelationalAlgebra + 'a>,
binding: String,
target_info: TableInfo,
assoc_infos: Vec<AssocInfo>,
extract_map: StaticExpr,
// key_builder: Vec<(StaticExpr, Typing)>,
// inv_key_builder: Option<Vec<(StaticExpr, Typing)>>,
// val_builder: Vec<(StaticExpr, Typing)>,
// assoc_val_builders: BTreeMap<TableId, Vec<(StaticExpr, Typing)>>,
binding_map: BindingMap,
}
// problem: binding map must survive optimization. now it doesn't
impl<'a, C: InterpretContext + 'a> RaInsert<'a, C> {
impl<'a> RaInsert<'a> {
fn build(
ctx: &'a C,
ctx: &'a TempDbContext<'a>,
prev: Option<Arc<dyn RelationalAlgebra + 'a>>,
mut args: Pairs,
) -> Result<Self> {
@ -286,96 +289,26 @@ impl<'a, C: InterpretContext + 'a> RaInsert<'a, C> {
.resolve_table(&table_name)
.ok_or_else(|| AlgebraParseError::TableNotFound(table_name.to_string()))?;
let target_info = ctx.get_table_info(target_id)?;
// let source_map = source.binding_map();
// let binding_ctx = BindingMapEvalContext {
// map: source_map,
// parent: ctx,
// };
// let extract_map = match vals.partial_eval(&binding_ctx)? {
// Expr::Dict(d) => d,
// v => return Err(AlgebraParseError::Parse(format!("{:?}", v))),
// };
//
// let keys = keys
// .into_iter()
// .map(|(k, v)| -> Result<(String, Expr)> {
// let v = v.partial_eval(&binding_ctx)?;
// Ok((k, v))
// })
// .collect::<Result<BTreeMap<_, _>>>()?;
// let (key_builder, val_builder, inv_key_builder) = match target_info {
// TableInfo::Node(n) => {
// let key_builder = n
// .keys
// .iter()
// .map(|col| {
// let extractor = extract_map
// .get(&col.name)
// .cloned()
// .unwrap_or(Expr::Const(Value::Null))
// .to_static();
// let typing = col.typing.clone();
// (extractor, typing)
// })
// .collect::<Vec<_>>();
// let val_builder = n
// .vals
// .iter()
// .map(|col| {
// let extractor = extract_map
// .get(&col.name)
// .cloned()
// .unwrap_or(Expr::Const(Value::Null))
// .to_static();
// let typing = col.typing.clone();
// (extractor, typing)
// })
// .collect::<Vec<_>>();
// (key_builder, val_builder, None)
// }
// TableInfo::Edge(e) => {
// todo!()
// }
// _ => return Err(AlgebraParseError::WrongTableKind(table_name.to_string())),
// };
// let assoc_infos = assoc_names
// .iter()
// .map(|name| -> Result<TableInfo> {
// let table_id = ctx
// .resolve_table(&table_name)
// .ok_or_else(|| AlgebraParseError::TableNotFound(table_name.to_string()))?;
// ctx.get_table_info(table_id)
// })
// .collect::<Result<Vec<_>>>()?;
let assoc_infos = ctx
.get_table_assocs(target_id)?
.into_iter()
.filter(|v| assoc_names.contains(&v.name))
.collect::<Vec<_>>();
let binding_map_inner = Self::build_binding_map_inner(ctx, &target_info, &assoc_infos)?;
let binding_map = BTreeMap::from([(binding, binding_map_inner)]);
dbg!(&target_info);
dbg!(&assoc_infos);
dbg!(&extract_map);
dbg!(&binding_map);
Ok(Self {
ctx,
binding,
source,
target_info,
assoc_infos,
extract_map,
binding_map,
})
}
fn build_binding_map_inner(
ctx: &impl InterpretContext,
target_info: &TableInfo,
assoc_infos: &Vec<AssocInfo>,
&self,
) -> Result<BTreeMap<String, TupleSetIdx>> {
let mut binding_map_inner = BTreeMap::new();
match &target_info {
match &self.target_info {
TableInfo::Node(n) => {
for (i, k) in n.keys.iter().enumerate() {
binding_map_inner.insert(
@ -399,8 +332,8 @@ impl<'a, C: InterpretContext + 'a> RaInsert<'a, C> {
}
}
TableInfo::Edge(e) => {
let src = ctx.get_node_info(e.src_id)?;
let dst = ctx.get_node_info(e.dst_id)?;
let src = self.ctx.get_node_info(e.src_id)?;
let dst = self.ctx.get_node_info(e.dst_id)?;
for (i, k) in src.keys.iter().enumerate() {
binding_map_inner.insert(
k.name.clone(),
@ -444,7 +377,7 @@ impl<'a, C: InterpretContext + 'a> RaInsert<'a, C> {
}
_ => unreachable!(),
}
for (iset, info) in assoc_infos.iter().enumerate() {
for (iset, info) in self.assoc_infos.iter().enumerate() {
for (i, k) in info.vals.iter().enumerate() {
binding_map_inner.insert(
k.name.clone(),
@ -460,22 +393,131 @@ impl<'a, C: InterpretContext + 'a> RaInsert<'a, C> {
}
}
impl<'a, C: InterpretContext + 'a> RelationalAlgebra for RaInsert<'a, C> {
impl<'a> RelationalAlgebra for RaInsert<'a> {
fn name(&self) -> &str {
NAME_RA_INSERT
}
fn binding_map(&self) -> BindingMap {
self.binding_map.clone()
fn binding_map(&self) -> Result<BindingMap> {
let inner = self.build_binding_map_inner()?;
Ok(BTreeMap::from([(self.binding.clone(), inner)]))
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = TupleSet> + 'b> {
self.source.iter()
fn iter<'b>(&'b self) -> Result<Box<dyn Iterator<Item=Result<TupleSet>> + 'b>> {
let source_map = self.source.binding_map()?;
let binding_ctx = BindingMapEvalContext {
map: &source_map,
parent: self.ctx,
};
let extract_map = match self.extract_map.clone().partial_eval(&binding_ctx)? {
Expr::Dict(d) => d,
v => return Err(AlgebraParseError::Parse(format!("{:?}", v))),
};
let build_extractor = |col: &ColSchema| {
let extractor = extract_map
.get(&col.name)
.cloned()
.unwrap_or(Expr::Const(Value::Null))
.to_static();
let typing = col.typing.clone();
(extractor, typing)
};
let (key_builder, val_builder, inv_key_builder) = match &self.target_info {
TableInfo::Node(n) => {
let key_builder = n
.keys
.iter()
.map(build_extractor)
.collect::<Vec<_>>();
let val_builder = n
.vals
.iter()
.map(build_extractor)
.collect::<Vec<_>>();
(key_builder, val_builder, None)
}
TableInfo::Edge(e) => {
let src = self.ctx.get_node_info(e.src_id)?;
let dst = self.ctx.get_node_info(e.dst_id)?;
let src_key_part = [(Expr::Const(Value::Int(e.src_id.id as i64)), Typing::Any)];
let dst_key_part = [(Expr::Const(Value::Int(e.dst_id.id as i64)), Typing::Any)];
let fwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
let bwd_edge_part = [(Expr::Const(Value::Bool(true)), Typing::Any)];
let key_builder = src_key_part.into_iter()
.chain(src.keys.iter().map(build_extractor))
.chain(fwd_edge_part.into_iter())
.chain(dst.keys.iter().map(build_extractor))
.chain(e.keys.iter().map(build_extractor))
.collect::<Vec<_>>();
let inv_key_builder = dst_key_part.into_iter()
.chain(dst.keys.iter().map(build_extractor))
.chain(bwd_edge_part.into_iter())
.chain(src.keys.iter().map(build_extractor))
.chain(e.keys.iter().map(build_extractor))
.collect::<Vec<_>>();
let val_builder = e
.vals
.iter()
.map(build_extractor)
.collect::<Vec<_>>();
(key_builder, val_builder, Some(inv_key_builder))
}
_ => unreachable!(),
};
let assoc_val_builders = self.assoc_infos.iter().map(|info| {
(info.tid, info.vals.iter().map(build_extractor).collect::<Vec<_>>())
}).collect::<Vec<_>>();
let target_key = self.target_info.table_id();
let txn = self.ctx.txn.clone();
let temp_db = self.ctx.sess.temp.clone();
let write_opts = default_write_options();
Ok(Box::new(self.source.iter()?.map(move |tset| -> Result<TupleSet> {
let tset = tset?;
let mut key = tset.eval_to_tuple(target_key.id, &key_builder)?;
let val = tset.eval_to_tuple(DataKind::Data as u32, &val_builder)?;
if target_key.in_root {
txn.put(&key, &val)?;
} else {
temp_db.put(&write_opts, &key, &val)?;
}
if let Some(builder) = &inv_key_builder {
let inv_key = tset.eval_to_tuple(target_key.id, builder)?;
if target_key.in_root {
txn.put(&inv_key, &key)?;
} else {
temp_db.put(&write_opts, &inv_key, &key)?;
}
}
let assoc_vals = assoc_val_builders.iter().map(|(tid, builder)| -> Result<OwnTuple> {
let ret = tset.eval_to_tuple(DataKind::Data as u32, builder)?;
key.overwrite_prefix(tid.id);
if tid.in_root {
txn.put(&key, &ret)?;
} else {
temp_db.put(&write_opts, &key, &ret)?;
}
Ok(ret)
}).collect::<Result<Vec<_>>>()?;
key.overwrite_prefix(target_key.id);
let mut ret = TupleSet::default();
ret.push_key(key.into());
ret.push_val(val.into());
for av in assoc_vals {
ret.push_val(av.into())
}
Ok(ret)
})))
}
}
pub(crate) fn build_ra_expr<'a>(
ctx: &'a impl InterpretContext,
ctx: &'a TempDbContext,
pair: Pair,
) -> Result<Arc<dyn RelationalAlgebra + 'a>> {
let mut built: Option<Arc<dyn RelationalAlgebra>> = None;
@ -510,33 +552,44 @@ mod tests {
use crate::parser::{CozoParser, Rule};
use crate::runtime::session::tests::create_test_db;
use pest::Parser;
use crate::data::tuple::Tuple;
use crate::runtime::options::default_read_options;
#[test]
fn parse_ra() -> Result<()> {
let (db, mut sess) = create_test_db("_test_parser.db");
let ctx = sess.temp_ctx(true);
let s = r#"
Values(v: [id, name], [[100, 'confidential'], [101, 'top secret']])
.Insert(Department, d: {...v})
"#;
let ra = build_ra_expr(
&ctx,
CozoParser::parse(Rule::ra_expr_all, s)
.unwrap()
.into_iter()
.next()
.unwrap(),
)?;
for t in ra.iter() {
dbg!(t);
{
let ctx = sess.temp_ctx(true);
let s = r#"
Values(v: [id, name], [[100, 'confidential'], [101, 'top secret']])
.Insert(Department, d: {...v})
"#;
let ra = build_ra_expr(
&ctx,
CozoParser::parse(Rule::ra_expr_all, s)
.unwrap()
.into_iter()
.next()
.unwrap(),
)?;
for t in ra.iter().unwrap() {
t.unwrap();
}
ctx.txn.commit().unwrap();
}
let mut r_opts = default_read_options();
r_opts.set_total_order_seek(true);
let it = sess.main.iterator(&r_opts);
it.to_first();
while it.is_valid() {
let (k, v) = it.pair().unwrap();
let k = Tuple::new(k);
let v = Tuple::new(v);
if k.get_prefix() != 0 {
dbg!((k, v));
}
it.next();
}
// let s = r#"
// From(f:Person-[:HasJob]->j:Job,
// f.id == 101, j.id > 10)
// .Select(f: {*id: f.id})
// "#;
// build_ra_expr(CozoParser::parse(Rule::ra_expr_all, s).unwrap().into_iter().next().unwrap());
Ok(())
}
}

@ -3,7 +3,7 @@ use crate::data::op::{
OpMod, OpMul, OpNe, OpNot, OpNotNull, OpOr, OpPow, OpStrCat, OpSub, UnresolvedOp,
};
use crate::data::parser::ExprParseError;
use crate::data::tuple_set::{ColId, TableId, TupleSetIdx};
use crate::data::tuple_set::{TupleSetIdx};
use crate::data::value::{StaticValue, Value};
use crate::parser::{CozoParser, Rule};
use pest::Parser;

@ -1,11 +1,12 @@
use crate::data::eval::{EvalError, PartialEvalContext, RowEvalContext};
use crate::data::expr::Expr;
use crate::data::tuple::{ReifiedTuple, TupleError};
use crate::data::expr::{Expr};
use crate::data::tuple::{OwnTuple, ReifiedTuple, TupleError};
use crate::data::value::{StaticValue, Value};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::result;
use crate::data::typing::{Typing, TypingError};
#[derive(thiserror::Error, Debug)]
pub enum TupleSetError {
@ -17,6 +18,10 @@ pub enum TupleSetError {
Tuple(#[from] TupleError),
#[error("Failed to deserialize {0}")]
Deser(StaticValue),
#[error(transparent)]
Eval(#[from] EvalError),
#[error(transparent)]
Typing(#[from] TypingError),
}
type Result<T> = result::Result<T, TupleSetError>;
@ -135,16 +140,16 @@ impl TupleSet {
self.vals.extend(o.vals);
}
pub(crate) fn extend_keys<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
{
self.keys.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn extend_vals<I, T>(&mut self, keys: I)
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
where
I: IntoIterator<Item=T>,
ReifiedTuple: From<T>,
{
self.vals.extend(keys.into_iter().map(ReifiedTuple::from));
}
@ -191,11 +196,11 @@ impl TupleSet {
}
impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
where
I1: IntoIterator<Item = T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item = T2>,
ReifiedTuple: From<T2>,
where
I1: IntoIterator<Item=T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item=T2>,
ReifiedTuple: From<T2>,
{
fn from((keys, vals): (I1, I2)) -> Self {
TupleSet {
@ -212,6 +217,20 @@ impl RowEvalContext for TupleSet {
}
}
pub(crate) type TupleBuilder<'a> = Vec<(Expr<'a>, Typing)>;
impl TupleSet {
pub(crate) fn eval_to_tuple(&self, prefix: u32, builder: &TupleBuilder) -> Result<OwnTuple> {
let mut target = OwnTuple::with_prefix(prefix);
for (expr, typing) in builder {
let value = expr.row_eval(self)?;
let value = typing.coerce(value)?;
target.push_value(&value);
}
Ok(target)
}
}
pub(crate) type BindingMap = BTreeMap<String, BTreeMap<String, TupleSetIdx>>;
pub(crate) struct BindingMapEvalContext<'a, T: PartialEvalContext + 'a> {

@ -1,7 +1,6 @@
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple};
use crate::data::tuple_set::{TableId, MIN_TABLE_ID_BOUND};
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::ddl::parser::DdlSchema;
use crate::ddl::reify::{DdlContext, DdlReifyError, TableInfo};
@ -226,7 +225,7 @@ pub(crate) mod tests {
}
pub(crate) fn create_test_db(name: &str) -> (DbInstance, Session) {
let mut db = DbInstance::new("_test_session", false).unwrap();
let mut db = DbInstance::new(name, false).unwrap();
db.set_destroy_on_close(true);
let mut sess = db.session().unwrap().start().unwrap();
sess.run_script(persist_hr_test(), true).unwrap();

Loading…
Cancel
Save