From 51b70b424b6ea043078f533ccc9aeef64ce71f71 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 8 Jun 2023 21:02:53 +0800 Subject: [PATCH] support sysops in imperative scripts --- cozo-core/src/cozoscript.pest | 6 ++- cozo-core/src/parse/imperative.rs | 17 +++++++- cozo-core/src/parse/sys.rs | 1 + cozo-core/src/runtime/db.rs | 65 ++++++++++++----------------- cozo-core/src/runtime/imperative.rs | 14 ++++++- cozo-core/src/runtime/relation.rs | 58 ++++++++++++------------- 6 files changed, 87 insertions(+), 74 deletions(-) diff --git a/cozo-core/src/cozoscript.pest b/cozo-core/src/cozoscript.pest index 38b02887..55bcac73 100644 --- a/cozo-core/src/cozoscript.pest +++ b/cozo-core/src/cozoscript.pest @@ -14,6 +14,9 @@ imperative_script = {SOI ~ imperative_stmt+ ~ EOI} sys_script = {SOI ~ "::" ~ (list_relations_op | list_columns_op | list_indices_op | remove_relations_op | trigger_relation_op | trigger_relation_show_op | rename_relations_op | running_op | kill_op | explain_op | access_level_op | index_op | vec_idx_op | fts_idx_op | lsh_idx_op | compact_op | list_fixed_rules) ~ EOI} +sys_script_inner = {"{" ~ "::" ~ (list_relations_op | list_columns_op | list_indices_op | remove_relations_op | trigger_relation_op | + trigger_relation_show_op | rename_relations_op | running_op | kill_op | explain_op | + access_level_op | index_op | vec_idx_op | fts_idx_op | lsh_idx_op | compact_op | list_fixed_rules) ~ "}"} index_op = {"index" ~ (index_create | index_drop)} vec_idx_op = {"hnsw" ~ (index_create_adv | index_drop)} fts_idx_op = {"fts" ~ (index_create_adv | index_drop)} @@ -233,9 +236,10 @@ vec_type = {"<" ~ vec_el_type ~ ";" ~ pos_int ~ ">"} vec_el_type = {"F32" | "F64" | "Float" | "Double" } imperative_stmt = _{ - break_stmt | continue_stmt | return_stmt | debug_stmt | + break_stmt | continue_stmt | return_stmt | debug_stmt | imperative_sysop | imperative_clause | ignore_error_script | if_chain | if_not_chain | loop_block | temp_swap } +imperative_sysop = {sys_script_inner ~ ("as" ~ definitely_underscore_ident)?} imperative_clause = {query_script_inner ~ ("as" ~ definitely_underscore_ident)?} imperative_condition = _{underscore_ident | imperative_clause} if_chain = {"%if" ~ imperative_condition diff --git a/cozo-core/src/parse/imperative.rs b/cozo-core/src/parse/imperative.rs index c4373826..a0140698 100644 --- a/cozo-core/src/parse/imperative.rs +++ b/cozo-core/src/parse/imperative.rs @@ -17,8 +17,10 @@ use smartstring::SmartString; use thiserror::Error; use crate::parse::query::parse_query; +use crate::parse::sys::parse_sys; use crate::parse::{ - ExtractSpan, ImperativeProgram, ImperativeStmt, ImperativeStmtClause, Pair, Rule, SourceSpan, + ExtractSpan, ImperativeProgram, ImperativeStmt, ImperativeStmtClause, ImperativeSysop, Pair, + Rule, SourceSpan, }; use crate::{DataValue, FixedRule, ValidityTs}; @@ -175,6 +177,19 @@ fn parse_imperative_stmt( temp: SmartString::from(name), } } + Rule::imperative_sysop => { + let mut src = pair.into_inner(); + let sysop = parse_sys( + src.next().unwrap().into_inner(), + param_pool, + fixed_rules, + cur_vld, + )?; + let store_as = src.next().map(|p| SmartString::from(p.as_str().trim())); + ImperativeStmt::SysOp { + sysop: ImperativeSysop { sysop, store_as }, + } + } Rule::imperative_clause => { let mut src = pair.into_inner(); let prog = parse_query( diff --git a/cozo-core/src/parse/sys.rs b/cozo-core/src/parse/sys.rs index 70282fb3..dec5e1b8 100644 --- a/cozo-core/src/parse/sys.rs +++ b/cozo-core/src/parse/sys.rs @@ -26,6 +26,7 @@ use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan}; use crate::runtime::relation::AccessLevel; use crate::{Expr, FixedRule}; +#[derive(Debug)] pub(crate) enum SysOp { Compact, ListColumns(Symbol), diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 0a44a282..7235646c 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -56,7 +56,7 @@ use crate::runtime::relation::{ }; use crate::runtime::transact::SessionTx; use crate::storage::temp::TempStorage; -use crate::storage::{Storage, StoreTx}; +use crate::storage::Storage; use crate::{decode_tuple_from_kv, FixedRule, Symbol}; pub(crate) struct RunningQueryHandle { @@ -1164,15 +1164,18 @@ impl<'s, S: Storage<'s>> Db { Ok(NamedRows::new(headers, rows)) } - fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result { + pub(crate) fn run_sys_op_with_tx( + &'s self, + tx: &mut SessionTx<'_>, + op: &SysOp, + read_only: bool, + ) -> Result { match op { SysOp::Explain(prog) => { - let mut tx = self.transact()?; - let (normalized_program, _) = prog.into_normalized_program(&tx)?; + let (normalized_program, _) = prog.clone().into_normalized_program(&tx)?; let (stratified_program, _) = normalized_program.into_stratified_program()?; let program = stratified_program.magic_sets_rewrite(&tx)?; let compiled = tx.stratified_magic_compile(program)?; - tx.commit_tx()?; self.explain_compiled(&compiled) } SysOp::Compact => { @@ -1185,7 +1188,7 @@ impl<'s, S: Storage<'s>> Db { vec![vec![DataValue::from(OK_STR)]], )) } - SysOp::ListRelations => self.list_relations(), + SysOp::ListRelations => self.list_relations(tx), SysOp::ListFixedRules => { let rules = self.fixed_rules.read().unwrap(); Ok(NamedRows::new( @@ -1204,7 +1207,6 @@ impl<'s, S: Storage<'s>> Db { let locks = self.obtain_relation_locks(rel_name_strs); let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); let mut bounds = vec![]; - let mut tx = self.transact_write()?; for rs in rel_names { let bound = tx.destroy_relation(&rs)?; if !rs.is_temp_store_name() { @@ -1214,14 +1216,12 @@ impl<'s, S: Storage<'s>> Db { for (lower, upper) in bounds { tx.store_tx.del_range_from_persisted(&lower, &upper)?; } - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], )) } SysOp::DescribeRelation(rel_name, description) => { - let mut tx = self.transact_write()?; tx.describe_relation(&rel_name, description)?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], @@ -1237,9 +1237,7 @@ impl<'s, S: Storage<'s>> Db { .pop() .unwrap(); let _guard = lock.write().unwrap(); - let mut tx = self.transact_write()?; tx.create_index(&rel_name, &idx_name, cols)?; - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1254,9 +1252,7 @@ impl<'s, S: Storage<'s>> Db { .pop() .unwrap(); let _guard = lock.write().unwrap(); - let mut tx = self.transact_write()?; tx.create_hnsw_index(config)?; - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1271,9 +1267,7 @@ impl<'s, S: Storage<'s>> Db { .pop() .unwrap(); let _guard = lock.write().unwrap(); - let mut tx = self.transact_write()?; tx.create_fts_index(config)?; - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1288,9 +1282,7 @@ impl<'s, S: Storage<'s>> Db { .pop() .unwrap(); let _guard = lock.write().unwrap(); - let mut tx = self.transact_write()?; tx.create_minhash_lsh_index(config)?; - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1305,19 +1297,17 @@ impl<'s, S: Storage<'s>> Db { .pop() .unwrap(); let _guard = lock.read().unwrap(); - let mut tx = self.transact_write()?; let bounds = tx.remove_index(&rel_name, &idx_name)?; for (lower, upper) in bounds { tx.store_tx.del_range_from_persisted(&lower, &upper)?; } - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], )) } - SysOp::ListColumns(rs) => self.list_columns(&rs), - SysOp::ListIndices(rs) => self.list_indices(&rs), + SysOp::ListColumns(rs) => self.list_columns(tx, &rs), + SysOp::ListIndices(rs) => self.list_indices(tx, &rs), SysOp::RenameRelation(rename_pairs) => { if read_only { bail!("Cannot rename relations in read-only mode"); @@ -1325,11 +1315,9 @@ impl<'s, S: Storage<'s>> Db { let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]); let locks = self.obtain_relation_locks(rel_names); let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec(); - let mut tx = self.transact_write()?; for (old, new) in rename_pairs { tx.rename_relation(old, new)?; } - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1353,7 +1341,6 @@ impl<'s, S: Storage<'s>> Db { }) } SysOp::ShowTrigger(name) => { - let mut tx = self.transact()?; let rel = tx.get_relation(&name, false)?; let mut rows: Vec> = vec![]; for (i, trigger) in rel.put_triggers.iter().enumerate() { @@ -1369,7 +1356,6 @@ impl<'s, S: Storage<'s>> Db { .into_iter() .map(|row| row.into_iter().map(DataValue::from).collect_vec()) .collect_vec(); - tx.commit_tx()?; Ok(NamedRows::new( vec!["type".to_string(), "idx".to_string(), "trigger".to_string()], rows, @@ -1379,9 +1365,7 @@ impl<'s, S: Storage<'s>> Db { if read_only { bail!("Cannot set triggers in read-only mode"); } - let mut tx = self.transact_write()?; tx.set_relation_triggers(name, puts, rms, replaces)?; - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1391,11 +1375,9 @@ impl<'s, S: Storage<'s>> Db { if read_only { bail!("Cannot set access level in read-only mode"); } - let mut tx = self.transact_write()?; for name in names { - tx.set_access_level(name, level)?; + tx.set_access_level(name, *level)?; } - tx.commit_tx()?; Ok(NamedRows::new( vec![STATUS_STR.to_string()], vec![vec![DataValue::from(OK_STR)]], @@ -1403,6 +1385,16 @@ impl<'s, S: Storage<'s>> Db { } } } + fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result { + let mut tx = if read_only { + self.transact()? + } else { + self.transact_write()? + }; + let res = self.run_sys_op_with_tx(&mut tx, &op, read_only)?; + tx.commit_tx()?; + Ok(res) + } /// This is the entry to query evaluation pub(crate) fn run_query( &self, @@ -1653,8 +1645,7 @@ impl<'s, S: Storage<'s>> Db { rows, )) } - fn list_indices(&'s self, name: &str) -> Result { - let mut tx = self.transact()?; + fn list_indices(&'s self, tx: &SessionTx<'_>, name: &str) -> Result { let handle = tx.get_relation(name, false)?; let mut rows = vec![]; for (name, (rel, cols)) in &handle.indices { @@ -1714,7 +1705,6 @@ impl<'s, S: Storage<'s>> Db { }), ]); } - tx.commit_tx()?; let rows = rows .into_iter() .map(|row| row.into_iter().map(DataValue::from).collect_vec()) @@ -1729,8 +1719,7 @@ impl<'s, S: Storage<'s>> Db { rows, )) } - fn list_columns(&'s self, name: &str) -> Result { - let mut tx = self.transact()?; + fn list_columns(&'s self, tx: &SessionTx<'_>, name: &str) -> Result { let handle = tx.get_relation(name, false)?; let mut rows = vec![]; let mut idx = 0; @@ -1754,7 +1743,6 @@ impl<'s, S: Storage<'s>> Db { ]); idx += 1; } - tx.commit_tx()?; let rows = rows .into_iter() .map(|row| row.into_iter().map(DataValue::from).collect_vec()) @@ -1770,13 +1758,12 @@ impl<'s, S: Storage<'s>> Db { rows, )) } - fn list_relations(&'s self) -> Result { + fn list_relations(&'s self, tx: &SessionTx<'_>) -> Result { let lower = vec![DataValue::from("")].encode_as_key(RelationId::SYSTEM); let upper = vec![DataValue::from(String::from(LARGEST_UTF_CHAR))].encode_as_key(RelationId::SYSTEM); - let tx = self.db.transact(false)?; let mut rows: Vec> = vec![]; - for kv_res in tx.range_scan(&lower, &upper) { + for kv_res in tx.store_tx.range_scan(&lower, &upper) { let (k_slice, v_slice) = kv_res?; if upper <= k_slice { break; diff --git a/cozo-core/src/runtime/imperative.rs b/cozo-core/src/runtime/imperative.rs index ed3df0bd..9f0e003b 100644 --- a/cozo-core/src/runtime/imperative.rs +++ b/cozo-core/src/runtime/imperative.rs @@ -72,6 +72,7 @@ impl<'s, S: Storage<'s>> Db { callback_targets: &BTreeSet>, callback_collector: &mut CallbackCollector, poison: &Poison, + readonly: bool, ) -> Result> { let mut ret = NamedRows::default(); for p in ps { @@ -118,6 +119,12 @@ impl<'s, S: Storage<'s>> Db { println!("{}: {:?}", temp, relation.as_named_rows(tx)?); ret = NamedRows::default(); } + ImperativeStmt::SysOp { sysop, .. } => { + ret = self.run_sys_op_with_tx(tx, &sysop.sysop, readonly)?; + if let Some(store_as) = &sysop.store_as { + tx.script_store_as_relation(self, store_as, &ret, cur_vld)?; + } + } ImperativeStmt::Program { prog, .. } => { ret = self.execute_single_program( prog.prog.clone(), @@ -179,6 +186,7 @@ impl<'s, S: Storage<'s>> Db { callback_targets, callback_collector, poison, + readonly, )? { Left(rows) => { ret = rows; @@ -199,6 +207,7 @@ impl<'s, S: Storage<'s>> Db { callback_targets, callback_collector, poison, + readonly, )? { Left(_) => {} Right(ctrl) => match ctrl { @@ -247,11 +256,11 @@ impl<'s, S: Storage<'s>> Db { &'s self, cur_vld: ValidityTs, ps: &ImperativeProgram, - read_only: bool, + readonly: bool, ) -> Result { let mut callback_collector = BTreeMap::new(); let mut write_lock_names = BTreeSet::new(); - if read_only && !write_lock_names.is_empty() { + if readonly && !write_lock_names.is_empty() { bail!("Read-only imperative program attempted to acquire write locks"); } for p in ps { @@ -297,6 +306,7 @@ impl<'s, S: Storage<'s>> Db { &callback_targets, &mut callback_collector, &poison, + readonly, )? { Left(res) => ret = res, Right(ctrl) => match ctrl { diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index 187c014f..f40f95d3 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -552,10 +552,10 @@ impl<'a> SessionTx<'a> { } pub(crate) fn set_relation_triggers( &mut self, - name: Symbol, - puts: Vec, - rms: Vec, - replaces: Vec, + name: &Symbol, + puts: &[String], + rms: &[String], + replaces: &[String], ) -> Result<()> { if name.name.starts_with('_') { bail!("Cannot set triggers for temp store") @@ -568,9 +568,9 @@ impl<'a> SessionTx<'a> { original.access_level )) } - original.put_triggers = puts; - original.rm_triggers = rms; - original.replace_triggers = replaces; + original.put_triggers = puts.to_vec(); + original.rm_triggers = rms.to_vec(); + original.replace_triggers = replaces.to_vec(); let name_key = vec![DataValue::Str(original.name.clone())].encode_as_key(RelationId::SYSTEM); @@ -662,14 +662,10 @@ impl<'a> SessionTx<'a> { let metadata = RelationHandle::decode(&found)?; Ok(metadata) } - pub(crate) fn describe_relation( - &mut self, - name: &str, - description: SmartString, - ) -> Result<()> { + pub(crate) fn describe_relation(&mut self, name: &str, description: &str) -> Result<()> { let mut meta = self.get_relation(name, true)?; - meta.description = description; + meta.description = SmartString::from(description); let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM); let mut meta_val = vec![]; meta.serialize(&mut Serializer::new(&mut meta_val).with_struct_map()) @@ -726,7 +722,7 @@ impl<'a> SessionTx<'a> { to_clean.push((lower_bound, upper_bound)); Ok(to_clean) } - pub(crate) fn set_access_level(&mut self, rel: Symbol, level: AccessLevel) -> Result<()> { + pub(crate) fn set_access_level(&mut self, rel: &Symbol, level: AccessLevel) -> Result<()> { let mut meta = self.get_relation(&rel, true)?; meta.access_level = level; @@ -740,7 +736,7 @@ impl<'a> SessionTx<'a> { Ok(()) } - pub(crate) fn create_minhash_lsh_index(&mut self, config: MinHashLshConfig) -> Result<()> { + pub(crate) fn create_minhash_lsh_index(&mut self, config: &MinHashLshConfig) -> Result<()> { // Get relation handle let mut rel_handle = self.get_relation(&config.base_relation, true)?; @@ -805,12 +801,12 @@ impl<'a> SessionTx<'a> { let num_perm = params.b * params.r; let perms = HashPermutations::new(num_perm); let manifest = MinHashLshIndexManifest { - base_relation: config.base_relation, - index_name: config.index_name, - extractor: config.extractor, + base_relation: config.base_relation.clone(), + index_name: config.index_name.clone(), + extractor: config.extractor.clone(), n_gram: config.n_gram, - tokenizer: config.tokenizer, - filters: config.filters, + tokenizer: config.tokenizer.clone(), + filters: config.filters.clone(), num_perm, n_bands: params.b, n_rows_in_band: params.r, @@ -870,7 +866,7 @@ impl<'a> SessionTx<'a> { Ok(()) } - pub(crate) fn create_fts_index(&mut self, config: FtsIndexConfig) -> Result<()> { + pub(crate) fn create_fts_index(&mut self, config: &FtsIndexConfig) -> Result<()> { // Get relation handle let mut rel_handle = self.get_relation(&config.base_relation, true)?; @@ -946,11 +942,11 @@ impl<'a> SessionTx<'a> { // add index to relation let manifest = FtsIndexManifest { - base_relation: config.base_relation, - index_name: config.index_name, - extractor: config.extractor, - tokenizer: config.tokenizer, - filters: config.filters, + base_relation: config.base_relation.clone(), + index_name: config.index_name.clone(), + extractor: config.extractor.clone(), + tokenizer: config.tokenizer.clone(), + filters: config.filters.clone(), }; // populate index @@ -1011,7 +1007,7 @@ impl<'a> SessionTx<'a> { Ok(()) } - pub(crate) fn create_hnsw_index(&mut self, config: HnswIndexConfig) -> Result<()> { + pub(crate) fn create_hnsw_index(&mut self, config: &HnswIndexConfig) -> Result<()> { // Get relation handle let mut rel_handle = self.get_relation(&config.base_relation, true)?; @@ -1149,7 +1145,7 @@ impl<'a> SessionTx<'a> { m_max: config.m_neighbours, m_max0: config.m_neighbours * 2, level_multiplier: 1. / (config.m_neighbours as f64).ln(), - index_filter: config.index_filter, + index_filter: config.index_filter.clone(), extend_candidates: config.extend_candidates, keep_pruned_connections: config.keep_pruned_connections, }; @@ -1237,7 +1233,7 @@ impl<'a> SessionTx<'a> { &mut self, rel_name: &Symbol, idx_name: &Symbol, - cols: Vec, + cols: &[Symbol], ) -> Result<()> { // Get relation handle let mut rel_handle = self.get_relation(rel_name, true)?; @@ -1412,7 +1408,7 @@ impl<'a> SessionTx<'a> { Ok(to_clean) } - pub(crate) fn rename_relation(&mut self, old: Symbol, new: Symbol) -> Result<()> { + pub(crate) fn rename_relation(&mut self, old: &Symbol, new: &Symbol) -> Result<()> { if old.name.starts_with('_') || new.name.starts_with('_') { bail!("Bad name given"); } @@ -1434,7 +1430,7 @@ impl<'a> SessionTx<'a> { rel.access_level )); } - rel.name = new.name; + rel.name = new.name.clone(); let mut meta_val = vec![]; rel.serialize(&mut Serializer::new(&mut meta_val)).unwrap();