support sysops in imperative scripts

main
Ziyang Hu 1 year ago
parent 45f81b5e84
commit 51b70b424b

@ -14,6 +14,9 @@ imperative_script = {SOI ~ imperative_stmt+ ~ EOI}
sys_script = {SOI ~ "::" ~ (list_relations_op | list_columns_op | list_indices_op | remove_relations_op | trigger_relation_op |
trigger_relation_show_op | rename_relations_op | running_op | kill_op | explain_op |
access_level_op | index_op | vec_idx_op | fts_idx_op | lsh_idx_op | compact_op | list_fixed_rules) ~ EOI}
sys_script_inner = {"{" ~ "::" ~ (list_relations_op | list_columns_op | list_indices_op | remove_relations_op | trigger_relation_op |
trigger_relation_show_op | rename_relations_op | running_op | kill_op | explain_op |
access_level_op | index_op | vec_idx_op | fts_idx_op | lsh_idx_op | compact_op | list_fixed_rules) ~ "}"}
index_op = {"index" ~ (index_create | index_drop)}
vec_idx_op = {"hnsw" ~ (index_create_adv | index_drop)}
fts_idx_op = {"fts" ~ (index_create_adv | index_drop)}
@ -233,9 +236,10 @@ vec_type = {"<" ~ vec_el_type ~ ";" ~ pos_int ~ ">"}
vec_el_type = {"F32" | "F64" | "Float" | "Double" }
imperative_stmt = _{
break_stmt | continue_stmt | return_stmt | debug_stmt |
break_stmt | continue_stmt | return_stmt | debug_stmt | imperative_sysop |
imperative_clause | ignore_error_script | if_chain | if_not_chain | loop_block | temp_swap
}
imperative_sysop = {sys_script_inner ~ ("as" ~ definitely_underscore_ident)?}
imperative_clause = {query_script_inner ~ ("as" ~ definitely_underscore_ident)?}
imperative_condition = _{underscore_ident | imperative_clause}
if_chain = {"%if" ~ imperative_condition

@ -17,8 +17,10 @@ use smartstring::SmartString;
use thiserror::Error;
use crate::parse::query::parse_query;
use crate::parse::sys::parse_sys;
use crate::parse::{
ExtractSpan, ImperativeProgram, ImperativeStmt, ImperativeStmtClause, Pair, Rule, SourceSpan,
ExtractSpan, ImperativeProgram, ImperativeStmt, ImperativeStmtClause, ImperativeSysop, Pair,
Rule, SourceSpan,
};
use crate::{DataValue, FixedRule, ValidityTs};
@ -175,6 +177,19 @@ fn parse_imperative_stmt(
temp: SmartString::from(name),
}
}
Rule::imperative_sysop => {
let mut src = pair.into_inner();
let sysop = parse_sys(
src.next().unwrap().into_inner(),
param_pool,
fixed_rules,
cur_vld,
)?;
let store_as = src.next().map(|p| SmartString::from(p.as_str().trim()));
ImperativeStmt::SysOp {
sysop: ImperativeSysop { sysop, store_as },
}
}
Rule::imperative_clause => {
let mut src = pair.into_inner();
let prog = parse_query(

@ -26,6 +26,7 @@ use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan};
use crate::runtime::relation::AccessLevel;
use crate::{Expr, FixedRule};
#[derive(Debug)]
pub(crate) enum SysOp {
Compact,
ListColumns(Symbol),

@ -56,7 +56,7 @@ use crate::runtime::relation::{
};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
use crate::storage::{Storage, StoreTx};
use crate::storage::Storage;
use crate::{decode_tuple_from_kv, FixedRule, Symbol};
pub(crate) struct RunningQueryHandle {
@ -1164,15 +1164,18 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(NamedRows::new(headers, rows))
}
fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result<NamedRows> {
pub(crate) fn run_sys_op_with_tx(
&'s self,
tx: &mut SessionTx<'_>,
op: &SysOp,
read_only: bool,
) -> Result<NamedRows> {
match op {
SysOp::Explain(prog) => {
let mut tx = self.transact()?;
let (normalized_program, _) = prog.into_normalized_program(&tx)?;
let (normalized_program, _) = prog.clone().into_normalized_program(&tx)?;
let (stratified_program, _) = normalized_program.into_stratified_program()?;
let program = stratified_program.magic_sets_rewrite(&tx)?;
let compiled = tx.stratified_magic_compile(program)?;
tx.commit_tx()?;
self.explain_compiled(&compiled)
}
SysOp::Compact => {
@ -1185,7 +1188,7 @@ impl<'s, S: Storage<'s>> Db<S> {
vec![vec![DataValue::from(OK_STR)]],
))
}
SysOp::ListRelations => self.list_relations(),
SysOp::ListRelations => self.list_relations(tx),
SysOp::ListFixedRules => {
let rules = self.fixed_rules.read().unwrap();
Ok(NamedRows::new(
@ -1204,7 +1207,6 @@ impl<'s, S: Storage<'s>> Db<S> {
let locks = self.obtain_relation_locks(rel_name_strs);
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
let mut bounds = vec![];
let mut tx = self.transact_write()?;
for rs in rel_names {
let bound = tx.destroy_relation(&rs)?;
if !rs.is_temp_store_name() {
@ -1214,14 +1216,12 @@ impl<'s, S: Storage<'s>> Db<S> {
for (lower, upper) in bounds {
tx.store_tx.del_range_from_persisted(&lower, &upper)?;
}
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
))
}
SysOp::DescribeRelation(rel_name, description) => {
let mut tx = self.transact_write()?;
tx.describe_relation(&rel_name, description)?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
@ -1237,9 +1237,7 @@ impl<'s, S: Storage<'s>> Db<S> {
.pop()
.unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_index(&rel_name, &idx_name, cols)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1254,9 +1252,7 @@ impl<'s, S: Storage<'s>> Db<S> {
.pop()
.unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_hnsw_index(config)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1271,9 +1267,7 @@ impl<'s, S: Storage<'s>> Db<S> {
.pop()
.unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_fts_index(config)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1288,9 +1282,7 @@ impl<'s, S: Storage<'s>> Db<S> {
.pop()
.unwrap();
let _guard = lock.write().unwrap();
let mut tx = self.transact_write()?;
tx.create_minhash_lsh_index(config)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1305,19 +1297,17 @@ impl<'s, S: Storage<'s>> Db<S> {
.pop()
.unwrap();
let _guard = lock.read().unwrap();
let mut tx = self.transact_write()?;
let bounds = tx.remove_index(&rel_name, &idx_name)?;
for (lower, upper) in bounds {
tx.store_tx.del_range_from_persisted(&lower, &upper)?;
}
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
))
}
SysOp::ListColumns(rs) => self.list_columns(&rs),
SysOp::ListIndices(rs) => self.list_indices(&rs),
SysOp::ListColumns(rs) => self.list_columns(tx, &rs),
SysOp::ListIndices(rs) => self.list_indices(tx, &rs),
SysOp::RenameRelation(rename_pairs) => {
if read_only {
bail!("Cannot rename relations in read-only mode");
@ -1325,11 +1315,9 @@ impl<'s, S: Storage<'s>> Db<S> {
let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]);
let locks = self.obtain_relation_locks(rel_names);
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
let mut tx = self.transact_write()?;
for (old, new) in rename_pairs {
tx.rename_relation(old, new)?;
}
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1353,7 +1341,6 @@ impl<'s, S: Storage<'s>> Db<S> {
})
}
SysOp::ShowTrigger(name) => {
let mut tx = self.transact()?;
let rel = tx.get_relation(&name, false)?;
let mut rows: Vec<Vec<JsonValue>> = vec![];
for (i, trigger) in rel.put_triggers.iter().enumerate() {
@ -1369,7 +1356,6 @@ impl<'s, S: Storage<'s>> Db<S> {
.into_iter()
.map(|row| row.into_iter().map(DataValue::from).collect_vec())
.collect_vec();
tx.commit_tx()?;
Ok(NamedRows::new(
vec!["type".to_string(), "idx".to_string(), "trigger".to_string()],
rows,
@ -1379,9 +1365,7 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot set triggers in read-only mode");
}
let mut tx = self.transact_write()?;
tx.set_relation_triggers(name, puts, rms, replaces)?;
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1391,11 +1375,9 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot set access level in read-only mode");
}
let mut tx = self.transact_write()?;
for name in names {
tx.set_access_level(name, level)?;
tx.set_access_level(name, *level)?;
}
tx.commit_tx()?;
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1403,6 +1385,16 @@ impl<'s, S: Storage<'s>> Db<S> {
}
}
}
fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result<NamedRows> {
let mut tx = if read_only {
self.transact()?
} else {
self.transact_write()?
};
let res = self.run_sys_op_with_tx(&mut tx, &op, read_only)?;
tx.commit_tx()?;
Ok(res)
}
/// This is the entry to query evaluation
pub(crate) fn run_query(
&self,
@ -1653,8 +1645,7 @@ impl<'s, S: Storage<'s>> Db<S> {
rows,
))
}
fn list_indices(&'s self, name: &str) -> Result<NamedRows> {
let mut tx = self.transact()?;
fn list_indices(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
let handle = tx.get_relation(name, false)?;
let mut rows = vec![];
for (name, (rel, cols)) in &handle.indices {
@ -1714,7 +1705,6 @@ impl<'s, S: Storage<'s>> Db<S> {
}),
]);
}
tx.commit_tx()?;
let rows = rows
.into_iter()
.map(|row| row.into_iter().map(DataValue::from).collect_vec())
@ -1729,8 +1719,7 @@ impl<'s, S: Storage<'s>> Db<S> {
rows,
))
}
fn list_columns(&'s self, name: &str) -> Result<NamedRows> {
let mut tx = self.transact()?;
fn list_columns(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
let handle = tx.get_relation(name, false)?;
let mut rows = vec![];
let mut idx = 0;
@ -1754,7 +1743,6 @@ impl<'s, S: Storage<'s>> Db<S> {
]);
idx += 1;
}
tx.commit_tx()?;
let rows = rows
.into_iter()
.map(|row| row.into_iter().map(DataValue::from).collect_vec())
@ -1770,13 +1758,12 @@ impl<'s, S: Storage<'s>> Db<S> {
rows,
))
}
fn list_relations(&'s self) -> Result<NamedRows> {
fn list_relations(&'s self, tx: &SessionTx<'_>) -> Result<NamedRows> {
let lower = vec![DataValue::from("")].encode_as_key(RelationId::SYSTEM);
let upper =
vec![DataValue::from(String::from(LARGEST_UTF_CHAR))].encode_as_key(RelationId::SYSTEM);
let tx = self.db.transact(false)?;
let mut rows: Vec<Vec<JsonValue>> = vec![];
for kv_res in tx.range_scan(&lower, &upper) {
for kv_res in tx.store_tx.range_scan(&lower, &upper) {
let (k_slice, v_slice) = kv_res?;
if upper <= k_slice {
break;

@ -72,6 +72,7 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
poison: &Poison,
readonly: bool,
) -> Result<Either<NamedRows, ControlCode>> {
let mut ret = NamedRows::default();
for p in ps {
@ -118,6 +119,12 @@ impl<'s, S: Storage<'s>> Db<S> {
println!("{}: {:?}", temp, relation.as_named_rows(tx)?);
ret = NamedRows::default();
}
ImperativeStmt::SysOp { sysop, .. } => {
ret = self.run_sys_op_with_tx(tx, &sysop.sysop, readonly)?;
if let Some(store_as) = &sysop.store_as {
tx.script_store_as_relation(self, store_as, &ret, cur_vld)?;
}
}
ImperativeStmt::Program { prog, .. } => {
ret = self.execute_single_program(
prog.prog.clone(),
@ -179,6 +186,7 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_targets,
callback_collector,
poison,
readonly,
)? {
Left(rows) => {
ret = rows;
@ -199,6 +207,7 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_targets,
callback_collector,
poison,
readonly,
)? {
Left(_) => {}
Right(ctrl) => match ctrl {
@ -247,11 +256,11 @@ impl<'s, S: Storage<'s>> Db<S> {
&'s self,
cur_vld: ValidityTs,
ps: &ImperativeProgram,
read_only: bool,
readonly: bool,
) -> Result<NamedRows, Report> {
let mut callback_collector = BTreeMap::new();
let mut write_lock_names = BTreeSet::new();
if read_only && !write_lock_names.is_empty() {
if readonly && !write_lock_names.is_empty() {
bail!("Read-only imperative program attempted to acquire write locks");
}
for p in ps {
@ -297,6 +306,7 @@ impl<'s, S: Storage<'s>> Db<S> {
&callback_targets,
&mut callback_collector,
&poison,
readonly,
)? {
Left(res) => ret = res,
Right(ctrl) => match ctrl {

@ -552,10 +552,10 @@ impl<'a> SessionTx<'a> {
}
pub(crate) fn set_relation_triggers(
&mut self,
name: Symbol,
puts: Vec<String>,
rms: Vec<String>,
replaces: Vec<String>,
name: &Symbol,
puts: &[String],
rms: &[String],
replaces: &[String],
) -> Result<()> {
if name.name.starts_with('_') {
bail!("Cannot set triggers for temp store")
@ -568,9 +568,9 @@ impl<'a> SessionTx<'a> {
original.access_level
))
}
original.put_triggers = puts;
original.rm_triggers = rms;
original.replace_triggers = replaces;
original.put_triggers = puts.to_vec();
original.rm_triggers = rms.to_vec();
original.replace_triggers = replaces.to_vec();
let name_key =
vec![DataValue::Str(original.name.clone())].encode_as_key(RelationId::SYSTEM);
@ -662,14 +662,10 @@ impl<'a> SessionTx<'a> {
let metadata = RelationHandle::decode(&found)?;
Ok(metadata)
}
pub(crate) fn describe_relation(
&mut self,
name: &str,
description: SmartString<LazyCompact>,
) -> Result<()> {
pub(crate) fn describe_relation(&mut self, name: &str, description: &str) -> Result<()> {
let mut meta = self.get_relation(name, true)?;
meta.description = description;
meta.description = SmartString::from(description);
let name_key = vec![DataValue::Str(meta.name.clone())].encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val).with_struct_map())
@ -726,7 +722,7 @@ impl<'a> SessionTx<'a> {
to_clean.push((lower_bound, upper_bound));
Ok(to_clean)
}
pub(crate) fn set_access_level(&mut self, rel: Symbol, level: AccessLevel) -> Result<()> {
pub(crate) fn set_access_level(&mut self, rel: &Symbol, level: AccessLevel) -> Result<()> {
let mut meta = self.get_relation(&rel, true)?;
meta.access_level = level;
@ -740,7 +736,7 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn create_minhash_lsh_index(&mut self, config: MinHashLshConfig) -> Result<()> {
pub(crate) fn create_minhash_lsh_index(&mut self, config: &MinHashLshConfig) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(&config.base_relation, true)?;
@ -805,12 +801,12 @@ impl<'a> SessionTx<'a> {
let num_perm = params.b * params.r;
let perms = HashPermutations::new(num_perm);
let manifest = MinHashLshIndexManifest {
base_relation: config.base_relation,
index_name: config.index_name,
extractor: config.extractor,
base_relation: config.base_relation.clone(),
index_name: config.index_name.clone(),
extractor: config.extractor.clone(),
n_gram: config.n_gram,
tokenizer: config.tokenizer,
filters: config.filters,
tokenizer: config.tokenizer.clone(),
filters: config.filters.clone(),
num_perm,
n_bands: params.b,
n_rows_in_band: params.r,
@ -870,7 +866,7 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn create_fts_index(&mut self, config: FtsIndexConfig) -> Result<()> {
pub(crate) fn create_fts_index(&mut self, config: &FtsIndexConfig) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(&config.base_relation, true)?;
@ -946,11 +942,11 @@ impl<'a> SessionTx<'a> {
// add index to relation
let manifest = FtsIndexManifest {
base_relation: config.base_relation,
index_name: config.index_name,
extractor: config.extractor,
tokenizer: config.tokenizer,
filters: config.filters,
base_relation: config.base_relation.clone(),
index_name: config.index_name.clone(),
extractor: config.extractor.clone(),
tokenizer: config.tokenizer.clone(),
filters: config.filters.clone(),
};
// populate index
@ -1011,7 +1007,7 @@ impl<'a> SessionTx<'a> {
Ok(())
}
pub(crate) fn create_hnsw_index(&mut self, config: HnswIndexConfig) -> Result<()> {
pub(crate) fn create_hnsw_index(&mut self, config: &HnswIndexConfig) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(&config.base_relation, true)?;
@ -1149,7 +1145,7 @@ impl<'a> SessionTx<'a> {
m_max: config.m_neighbours,
m_max0: config.m_neighbours * 2,
level_multiplier: 1. / (config.m_neighbours as f64).ln(),
index_filter: config.index_filter,
index_filter: config.index_filter.clone(),
extend_candidates: config.extend_candidates,
keep_pruned_connections: config.keep_pruned_connections,
};
@ -1237,7 +1233,7 @@ impl<'a> SessionTx<'a> {
&mut self,
rel_name: &Symbol,
idx_name: &Symbol,
cols: Vec<Symbol>,
cols: &[Symbol],
) -> Result<()> {
// Get relation handle
let mut rel_handle = self.get_relation(rel_name, true)?;
@ -1412,7 +1408,7 @@ impl<'a> SessionTx<'a> {
Ok(to_clean)
}
pub(crate) fn rename_relation(&mut self, old: Symbol, new: Symbol) -> Result<()> {
pub(crate) fn rename_relation(&mut self, old: &Symbol, new: &Symbol) -> Result<()> {
if old.name.starts_with('_') || new.name.starts_with('_') {
bail!("Bad name given");
}
@ -1434,7 +1430,7 @@ impl<'a> SessionTx<'a> {
rel.access_level
));
}
rel.name = new.name;
rel.name = new.name.clone();
let mut meta_val = vec![];
rel.serialize(&mut Serializer::new(&mut meta_val)).unwrap();

Loading…
Cancel
Save