From e9e47531ecd9e97787bbd4366e677c4c8b01ffac Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Sat, 4 Jun 2022 21:01:39 +0800 Subject: [PATCH] various fixes --- src/algebra/op/walk.rs | 74 +++++++++++++++++++-------------------- src/bin/cozo_rest/main.rs | 26 +++++--------- src/data/eval.rs | 32 +++++++++++++++++ src/ddl/parser.rs | 2 +- src/ddl/reify.rs | 1 + src/grammar.pest | 3 +- src/runtime/session.rs | 22 ++++++++++-- 7 files changed, 100 insertions(+), 60 deletions(-) diff --git a/src/algebra/op/walk.rs b/src/algebra/op/walk.rs index 3853ae3b..f7109b76 100644 --- a/src/algebra/op/walk.rs +++ b/src/algebra/op/walk.rs @@ -299,17 +299,16 @@ impl<'a> WalkOp<'a> { fn build_selection_iter( &self, it: Box>>, + truncate_kv_sizes: (usize, usize), ) -> Result>> { let extraction_vec = self.extraction_map.values().cloned().collect::>(); extraction_vec.iter().for_each(|ex| ex.aggr_reset()); let mut val_collectors = vec![]; for ex in &extraction_vec { - // TODO the check here is more complicated than in the group case: - // check that nothing non-aggr exceeds the allowed kv range - // if !ex.is_aggr_compatible() { - // return Err(AlgebraParseError::ScalarFnNotAllowed.into()); - // } + if !ex.is_truncate_aggr_compatible(truncate_kv_sizes.0, truncate_kv_sizes.1) { + return Err(AlgebraParseError::ScalarFnNotAllowed.into()); + } if let Ok(heads) = ex.clone().extract_aggr_heads() { val_collectors.extend(heads) } @@ -344,49 +343,50 @@ impl<'a> WalkOp<'a> { } let mut out = TupleSet::default(); out.vals.push(tuple.into()); - + last_tset = TupleSet::default(); Some(Ok(out)) } else { - let eval_ctx = TupleSetEvalContext { - tuple_set: &tset, - txn: &txn, - temp_db: &temp_db, - write_options: &w_opts, - }; - for (op, args) in &val_collectors { - match args.len() { - 0 => match op.put(&[]) { - Ok(_) => {} - Err(e) => return Some(Err(e)), - }, - 1 => { - let arg = args.iter().next().unwrap(); - let arg = match arg.row_eval(&eval_ctx) { - Ok(v) => v, - Err(e) => return Some(Err(e)), - }; - match op.put(&[arg]) { + if !last_tset.keys.is_empty() { + let eval_ctx = TupleSetEvalContext { + tuple_set: &last_tset, + txn: &txn, + temp_db: &temp_db, + write_options: &w_opts, + }; + for (op, args) in &val_collectors { + match args.len() { + 0 => match op.put(&[]) { Ok(_) => {} Err(e) => return Some(Err(e)), - }; - } - _ => { - let mut args_vals = Vec::with_capacity(args.len()); - for arg in args { + }, + 1 => { + let arg = args.iter().next().unwrap(); let arg = match arg.row_eval(&eval_ctx) { Ok(v) => v, Err(e) => return Some(Err(e)), }; - args_vals.push(arg); + match op.put(&[arg]) { + Ok(_) => {} + Err(e) => return Some(Err(e)), + }; + } + _ => { + let mut args_vals = Vec::with_capacity(args.len()); + for arg in args { + let arg = match arg.row_eval(&eval_ctx) { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + args_vals.push(arg); + } + match op.put(&args_vals) { + Ok(_) => {} + Err(e) => return Some(Err(e)), + }; } - match op.put(&args_vals) { - Ok(_) => {} - Err(e) => return Some(Err(e)), - }; } } } - last_tset = tset.into_owned(); None } @@ -504,7 +504,7 @@ impl<'b> RelationalAlgebra for WalkOp<'b> { key_len: final_truncate_kv_size.0, }); - let iter = self.build_selection_iter(it)?; + let iter = self.build_selection_iter(it, final_truncate_kv_size)?; Ok(Box::new(iter)) } diff --git a/src/bin/cozo_rest/main.rs b/src/bin/cozo_rest/main.rs index 7f717ece..0663dac6 100644 --- a/src/bin/cozo_rest/main.rs +++ b/src/bin/cozo_rest/main.rs @@ -3,23 +3,15 @@ use cozo::DbInstance; use std::sync::Arc; struct AppStateWithDb { - db: DbInstance, + db: DbInstance } -#[get("/")] -async fn hello(data: web::Data) -> impl Responder { - // let sess = data.db.session().unwrap().start().unwrap(); - // let res = sess.get_next_main_table_id(); - HttpResponse::Ok().body(format!("Hello world! {:?}", None)) -} - -#[post("/echo")] -async fn echo(req_body: String) -> impl Responder { - HttpResponse::Ok().body(req_body) -} - -async fn manual_hello() -> impl Responder { - HttpResponse::Ok().body("Hey there!") +#[post("/")] +async fn query(body: web::Bytes, data: web::Data) -> impl Responder { + let text = String::from_utf8_lossy(body.as_ref()); + let mut sess = data.db.session().unwrap().start().unwrap(); + let res = sess.run_script(text, true); + HttpResponse::Ok().body(format!("{:?}", res)) } #[actix_web::main] @@ -34,9 +26,7 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() .app_data(db.clone()) - .service(hello) - .service(echo) - .route("/hey", web::get().to(manual_hello)) + .service(query) }) .bind(("127.0.0.1", 8080))? .run() diff --git a/src/data/eval.rs b/src/data/eval.rs index 15553b98..76a48e8d 100644 --- a/src/data/eval.rs +++ b/src/data/eval.rs @@ -224,6 +224,38 @@ impl Expr { } } + pub(crate) fn is_truncate_aggr_compatible(&self, key_size: usize, val_size: usize) -> bool { + match self { + Expr::Const(_) => true, + Expr::List(l) => l.iter().all(|el| el.is_aggr_compatible()), + Expr::Dict(d) => d.values().all(|el| el.is_aggr_compatible()), + Expr::Variable(_) => false, + Expr::TupleSetIdx(TupleSetIdx { is_key, t_set, .. }) => { + if *is_key { + *t_set < key_size + } else { + *t_set < val_size + } + } + Expr::ApplyAgg(_, _, _) => true, + Expr::FieldAcc(_, arg) => arg.is_aggr_compatible(), + Expr::IdxAcc(_, arg) => arg.is_aggr_compatible(), + Expr::IfExpr(args) => { + let (a, b, c) = args.as_ref(); + a.is_aggr_compatible() && b.is_aggr_compatible() && c.is_aggr_compatible() + } + Expr::SwitchExpr(args) => args + .iter() + .all(|(cond, expr)| cond.is_aggr_compatible() && expr.is_aggr_compatible()), + Expr::OpAnd(args) => args.iter().all(|el| el.is_aggr_compatible()), + Expr::OpOr(args) => args.iter().all(|el| el.is_aggr_compatible()), + Expr::OpCoalesce(args) => args.iter().all(|el| el.is_aggr_compatible()), + Expr::OpMerge(args) => args.iter().all(|el| el.is_aggr_compatible()), + Expr::OpConcat(args) => args.iter().all(|el| el.is_aggr_compatible()), + Expr::BuiltinFn(_, args) => args.iter().all(|el| el.is_aggr_compatible()), + } + } + pub(crate) fn is_aggr_compatible(&self) -> bool { match self { Expr::Const(_) => true, diff --git a/src/ddl/parser.rs b/src/ddl/parser.rs index 5637910c..4c9b583d 100644 --- a/src/ddl/parser.rs +++ b/src/ddl/parser.rs @@ -129,7 +129,7 @@ impl<'a> TryFrom> for DdlSchema { Rule::assoc_def => DdlSchema::Assoc(pair.try_into()?), Rule::seq_def => DdlSchema::Sequence(pair.try_into()?), Rule::index_def => DdlSchema::Index(pair.try_into()?), - _ => todo!(), + _ => todo!("{:?}", pair.as_rule()), }) } } diff --git a/src/ddl/reify.rs b/src/ddl/reify.rs index 188485ef..fd278723 100644 --- a/src/ddl/reify.rs +++ b/src/ddl/reify.rs @@ -1120,6 +1120,7 @@ impl<'a> DdlContext for TempDbContext<'a> { } fn commit(&mut self) -> Result<()> { + self.txn.commit()?; Ok(()) } } diff --git a/src/grammar.pest b/src/grammar.pest index 8ffd3e69..c876ac29 100644 --- a/src/grammar.pest +++ b/src/grammar.pest @@ -186,7 +186,7 @@ seq_def = { "sequence" ~ name_in_def ~ ";" } definition = _{ node_def | assoc_def | edge_def | index_def | seq_def } definition_all = _{SOI ~ definition ~ EOI} -statement = _{ scope | definition } +statement = _{ scope | definition | ra_expr_stmt } scope = {"{" ~ statement* ~ "}"} persist_block = {"persist!" ~ "{" ~ definition* ~ "}" } @@ -211,6 +211,7 @@ ra_arg = { ra_expr | chain | scoped_list | scoped_dict | keyed_dict | walk_cond ra_call_expr = { query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")" } ra_call = {"." ~ query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")"} ra_expr = { ra_call_expr ~ ra_call* } +ra_expr_stmt = _{ra_expr ~ ";"?} ra_expr_all = _{SOI ~ ra_expr ~ EOI} walk_cond = { ident ~ "=>" ~ ra_expr } diff --git a/src/runtime/session.rs b/src/runtime/session.rs index f7674f22..4058daf6 100644 --- a/src/runtime/session.rs +++ b/src/runtime/session.rs @@ -1,3 +1,5 @@ +use crate::algebra::op::RelationalAlgebra; +use crate::algebra::parser::build_relational_expr; use crate::data::expr::Expr; use crate::data::tuple::{DataKind, OwnTuple, Tuple}; use crate::data::tuple_set::{TableId, MIN_TABLE_ID_BOUND}; @@ -179,12 +181,26 @@ impl Session { } fn execute_query(&mut self, pair: Pair, writable_main: bool) -> Result { let mut ctx = self.temp_ctx(writable_main); + let mut ret = vec![]; for pair in pair.into_inner() { - let schema = DdlSchema::try_from(pair)?; - ctx.build_table(schema)?; + match pair.as_rule() { + Rule::node_def + | Rule::assoc_def + | Rule::edge_def + | Rule::index_def + | Rule::seq_def => { + let schema = DdlSchema::try_from(pair)?; + ctx.build_table(schema)?; + } + Rule::ra_expr => { + let ra = build_relational_expr(&ctx, pair)?; + ret.extend(ra.get_values()?); + } + _ => todo!("{:?}", pair.as_rule()), + } } ctx.commit()?; - Ok(Value::Null) + Ok(Value::List(ret)) } fn execute_persist_block(&mut self, pair: Pair) -> Result { let mut ctx = self.main_ctx();