various fixes

main
Ziyang Hu 2 years ago
parent fbf03b8fea
commit e9e47531ec

@ -299,17 +299,16 @@ impl<'a> WalkOp<'a> {
fn build_selection_iter( fn build_selection_iter(
&self, &self,
it: Box<dyn Iterator<Item = Result<TupleSet>>>, it: Box<dyn Iterator<Item = Result<TupleSet>>>,
truncate_kv_sizes: (usize, usize),
) -> Result<impl Iterator<Item = Result<TupleSet>>> { ) -> Result<impl Iterator<Item = Result<TupleSet>>> {
let extraction_vec = self.extraction_map.values().cloned().collect::<Vec<_>>(); let extraction_vec = self.extraction_map.values().cloned().collect::<Vec<_>>();
extraction_vec.iter().for_each(|ex| ex.aggr_reset()); extraction_vec.iter().for_each(|ex| ex.aggr_reset());
let mut val_collectors = vec![]; let mut val_collectors = vec![];
for ex in &extraction_vec { for ex in &extraction_vec {
// TODO the check here is more complicated than in the group case: if !ex.is_truncate_aggr_compatible(truncate_kv_sizes.0, truncate_kv_sizes.1) {
// check that nothing non-aggr exceeds the allowed kv range return Err(AlgebraParseError::ScalarFnNotAllowed.into());
// if !ex.is_aggr_compatible() { }
// return Err(AlgebraParseError::ScalarFnNotAllowed.into());
// }
if let Ok(heads) = ex.clone().extract_aggr_heads() { if let Ok(heads) = ex.clone().extract_aggr_heads() {
val_collectors.extend(heads) val_collectors.extend(heads)
} }
@ -344,49 +343,50 @@ impl<'a> WalkOp<'a> {
} }
let mut out = TupleSet::default(); let mut out = TupleSet::default();
out.vals.push(tuple.into()); out.vals.push(tuple.into());
last_tset = TupleSet::default();
Some(Ok(out)) Some(Ok(out))
} else { } else {
let eval_ctx = TupleSetEvalContext { if !last_tset.keys.is_empty() {
tuple_set: &tset, let eval_ctx = TupleSetEvalContext {
txn: &txn, tuple_set: &last_tset,
temp_db: &temp_db, txn: &txn,
write_options: &w_opts, temp_db: &temp_db,
}; write_options: &w_opts,
for (op, args) in &val_collectors { };
match args.len() { for (op, args) in &val_collectors {
0 => match op.put(&[]) { match args.len() {
Ok(_) => {} 0 => match op.put(&[]) {
Err(e) => return Some(Err(e)),
},
1 => {
let arg = args.iter().next().unwrap();
let arg = match arg.row_eval(&eval_ctx) {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
match op.put(&[arg]) {
Ok(_) => {} Ok(_) => {}
Err(e) => return Some(Err(e)), Err(e) => return Some(Err(e)),
}; },
} 1 => {
_ => { let arg = args.iter().next().unwrap();
let mut args_vals = Vec::with_capacity(args.len());
for arg in args {
let arg = match arg.row_eval(&eval_ctx) { let arg = match arg.row_eval(&eval_ctx) {
Ok(v) => v, Ok(v) => v,
Err(e) => return Some(Err(e)), Err(e) => return Some(Err(e)),
}; };
args_vals.push(arg); match op.put(&[arg]) {
Ok(_) => {}
Err(e) => return Some(Err(e)),
};
}
_ => {
let mut args_vals = Vec::with_capacity(args.len());
for arg in args {
let arg = match arg.row_eval(&eval_ctx) {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
args_vals.push(arg);
}
match op.put(&args_vals) {
Ok(_) => {}
Err(e) => return Some(Err(e)),
};
} }
match op.put(&args_vals) {
Ok(_) => {}
Err(e) => return Some(Err(e)),
};
} }
} }
} }
last_tset = tset.into_owned(); last_tset = tset.into_owned();
None None
} }
@ -504,7 +504,7 @@ impl<'b> RelationalAlgebra for WalkOp<'b> {
key_len: final_truncate_kv_size.0, key_len: final_truncate_kv_size.0,
}); });
let iter = self.build_selection_iter(it)?; let iter = self.build_selection_iter(it, final_truncate_kv_size)?;
Ok(Box::new(iter)) Ok(Box::new(iter))
} }

@ -3,23 +3,15 @@ use cozo::DbInstance;
use std::sync::Arc; use std::sync::Arc;
struct AppStateWithDb { struct AppStateWithDb {
db: DbInstance, db: DbInstance
} }
#[get("/")] #[post("/")]
async fn hello(data: web::Data<AppStateWithDb>) -> impl Responder { async fn query(body: web::Bytes, data: web::Data<AppStateWithDb>) -> impl Responder {
// let sess = data.db.session().unwrap().start().unwrap(); let text = String::from_utf8_lossy(body.as_ref());
// let res = sess.get_next_main_table_id(); let mut sess = data.db.session().unwrap().start().unwrap();
HttpResponse::Ok().body(format!("Hello world! {:?}", None)) let res = sess.run_script(text, true);
} HttpResponse::Ok().body(format!("{:?}", res))
#[post("/echo")]
async fn echo(req_body: String) -> impl Responder {
HttpResponse::Ok().body(req_body)
}
async fn manual_hello() -> impl Responder {
HttpResponse::Ok().body("Hey there!")
} }
#[actix_web::main] #[actix_web::main]
@ -34,9 +26,7 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(db.clone()) .app_data(db.clone())
.service(hello) .service(query)
.service(echo)
.route("/hey", web::get().to(manual_hello))
}) })
.bind(("127.0.0.1", 8080))? .bind(("127.0.0.1", 8080))?
.run() .run()

@ -224,6 +224,38 @@ impl Expr {
} }
} }
pub(crate) fn is_truncate_aggr_compatible(&self, key_size: usize, val_size: usize) -> bool {
match self {
Expr::Const(_) => true,
Expr::List(l) => l.iter().all(|el| el.is_aggr_compatible()),
Expr::Dict(d) => d.values().all(|el| el.is_aggr_compatible()),
Expr::Variable(_) => false,
Expr::TupleSetIdx(TupleSetIdx { is_key, t_set, .. }) => {
if *is_key {
*t_set < key_size
} else {
*t_set < val_size
}
}
Expr::ApplyAgg(_, _, _) => true,
Expr::FieldAcc(_, arg) => arg.is_aggr_compatible(),
Expr::IdxAcc(_, arg) => arg.is_aggr_compatible(),
Expr::IfExpr(args) => {
let (a, b, c) = args.as_ref();
a.is_aggr_compatible() && b.is_aggr_compatible() && c.is_aggr_compatible()
}
Expr::SwitchExpr(args) => args
.iter()
.all(|(cond, expr)| cond.is_aggr_compatible() && expr.is_aggr_compatible()),
Expr::OpAnd(args) => args.iter().all(|el| el.is_aggr_compatible()),
Expr::OpOr(args) => args.iter().all(|el| el.is_aggr_compatible()),
Expr::OpCoalesce(args) => args.iter().all(|el| el.is_aggr_compatible()),
Expr::OpMerge(args) => args.iter().all(|el| el.is_aggr_compatible()),
Expr::OpConcat(args) => args.iter().all(|el| el.is_aggr_compatible()),
Expr::BuiltinFn(_, args) => args.iter().all(|el| el.is_aggr_compatible()),
}
}
pub(crate) fn is_aggr_compatible(&self) -> bool { pub(crate) fn is_aggr_compatible(&self) -> bool {
match self { match self {
Expr::Const(_) => true, Expr::Const(_) => true,

@ -129,7 +129,7 @@ impl<'a> TryFrom<Pair<'a>> for DdlSchema {
Rule::assoc_def => DdlSchema::Assoc(pair.try_into()?), Rule::assoc_def => DdlSchema::Assoc(pair.try_into()?),
Rule::seq_def => DdlSchema::Sequence(pair.try_into()?), Rule::seq_def => DdlSchema::Sequence(pair.try_into()?),
Rule::index_def => DdlSchema::Index(pair.try_into()?), Rule::index_def => DdlSchema::Index(pair.try_into()?),
_ => todo!(), _ => todo!("{:?}", pair.as_rule()),
}) })
} }
} }

@ -1120,6 +1120,7 @@ impl<'a> DdlContext for TempDbContext<'a> {
} }
fn commit(&mut self) -> Result<()> { fn commit(&mut self) -> Result<()> {
self.txn.commit()?;
Ok(()) Ok(())
} }
} }

@ -186,7 +186,7 @@ seq_def = { "sequence" ~ name_in_def ~ ";" }
definition = _{ node_def | assoc_def | edge_def | index_def | seq_def } definition = _{ node_def | assoc_def | edge_def | index_def | seq_def }
definition_all = _{SOI ~ definition ~ EOI} definition_all = _{SOI ~ definition ~ EOI}
statement = _{ scope | definition } statement = _{ scope | definition | ra_expr_stmt }
scope = {"{" ~ statement* ~ "}"} scope = {"{" ~ statement* ~ "}"}
persist_block = {"persist!" ~ "{" ~ definition* ~ "}" } persist_block = {"persist!" ~ "{" ~ definition* ~ "}" }
@ -211,6 +211,7 @@ ra_arg = { ra_expr | chain | scoped_list | scoped_dict | keyed_dict | walk_cond
ra_call_expr = { query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")" } ra_call_expr = { query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")" }
ra_call = {"." ~ query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")"} ra_call = {"." ~ query_ident ~ "(" ~ (ra_arg ~ ",")* ~ ra_arg? ~ ")"}
ra_expr = { ra_call_expr ~ ra_call* } ra_expr = { ra_call_expr ~ ra_call* }
ra_expr_stmt = _{ra_expr ~ ";"?}
ra_expr_all = _{SOI ~ ra_expr ~ EOI} ra_expr_all = _{SOI ~ ra_expr ~ EOI}
walk_cond = { ident ~ "=>" ~ ra_expr } walk_cond = { ident ~ "=>" ~ ra_expr }

@ -1,3 +1,5 @@
use crate::algebra::op::RelationalAlgebra;
use crate::algebra::parser::build_relational_expr;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple}; use crate::data::tuple::{DataKind, OwnTuple, Tuple};
use crate::data::tuple_set::{TableId, MIN_TABLE_ID_BOUND}; use crate::data::tuple_set::{TableId, MIN_TABLE_ID_BOUND};
@ -179,12 +181,26 @@ impl Session {
} }
fn execute_query(&mut self, pair: Pair, writable_main: bool) -> Result<Value> { fn execute_query(&mut self, pair: Pair, writable_main: bool) -> Result<Value> {
let mut ctx = self.temp_ctx(writable_main); let mut ctx = self.temp_ctx(writable_main);
let mut ret = vec![];
for pair in pair.into_inner() { for pair in pair.into_inner() {
let schema = DdlSchema::try_from(pair)?; match pair.as_rule() {
ctx.build_table(schema)?; Rule::node_def
| Rule::assoc_def
| Rule::edge_def
| Rule::index_def
| Rule::seq_def => {
let schema = DdlSchema::try_from(pair)?;
ctx.build_table(schema)?;
}
Rule::ra_expr => {
let ra = build_relational_expr(&ctx, pair)?;
ret.extend(ra.get_values()?);
}
_ => todo!("{:?}", pair.as_rule()),
}
} }
ctx.commit()?; ctx.commit()?;
Ok(Value::Null) Ok(Value::List(ret))
} }
fn execute_persist_block(&mut self, pair: Pair) -> Result<Value> { fn execute_persist_block(&mut self, pair: Pair) -> Result<Value> {
let mut ctx = self.main_ctx(); let mut ctx = self.main_ctx();

Loading…
Cancel
Save