locking in imperative script sysops

main
Ziyang Hu 1 year ago
parent e3e9bc90c4
commit c1c111c8c0

442
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -82,7 +82,7 @@ storage-tikv = ["dep:tikv-client", "dep:tokio"]
#! The other storage options are just for experimentation. We do not recommend using them.
[dependencies]
casey = "0.3.3"
casey = "0.4.0"
either = "1.7.0"
rand = "0.8.5"
miette = { version = "5.5.0", features = ["fancy"] }
@ -137,4 +137,4 @@ jieba-rs = "0.6.7"
aho-corasick = "1.0.1"
rust-stemmers = "1.2.0"
fast2s = "0.3.1"
swapvec = "0.2.0"
swapvec = "0.3.0"

@ -148,7 +148,41 @@ impl ImperativeStmt {
| ImperativeStmt::Break { .. }
| ImperativeStmt::Continue { .. }
| ImperativeStmt::TempSwap { .. } => {}
ImperativeStmt::SysOp { .. } => {}
ImperativeStmt::SysOp { sysop } => {
match &sysop.sysop {
SysOp::RemoveRelation(rels) => {
for rel in rels {
collector.insert(rel.name.clone());
}
}
SysOp::RenameRelation(renames) => {
for (old, new) in renames {
collector.insert(old.name.clone());
collector.insert(new.name.clone());
}
}
SysOp::CreateIndex(symb, subs, _) => {
collector.insert(symb.name.clone());
collector.insert(SmartString::from(format!("{}:{}", symb.name, subs.name)));
}
SysOp::CreateVectorIndex(m) => {
collector.insert(m.base_relation.clone());
collector.insert(SmartString::from(format!("{}:{}", m.base_relation, m.index_name)));
}
SysOp::CreateFtsIndex(m) => {
collector.insert(m.base_relation.clone());
collector.insert(SmartString::from(format!("{}:{}", m.base_relation, m.index_name)));
}
SysOp::CreateMinHashLshIndex(m) => {
collector.insert(m.base_relation.clone());
collector.insert(SmartString::from(format!("{}:{}", m.base_relation, m.index_name)));
}
SysOp::RemoveIndex(rel, idx) => {
collector.insert(SmartString::from(format!("{}:{}", rel.name, idx.name)));
}
_ => {}
}
}
}
}
}

@ -1169,6 +1169,7 @@ impl<'s, S: Storage<'s>> Db<S> {
tx: &mut SessionTx<'_>,
op: &SysOp,
read_only: bool,
skip_locking: bool,
) -> Result<NamedRows> {
match op {
SysOp::Explain(prog) => {
@ -1204,7 +1205,11 @@ impl<'s, S: Storage<'s>> Db<S> {
bail!("Cannot remove relations in read-only mode");
}
let rel_name_strs = rel_names.iter().map(|n| &n.name);
let locks = self.obtain_relation_locks(rel_name_strs);
let locks = if skip_locking {
vec![]
} else {
self.obtain_relation_locks(rel_name_strs)
};
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
let mut bounds = vec![];
for rs in rel_names {
@ -1232,12 +1237,16 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot create index in read-only mode");
}
let lock = self
.obtain_relation_locks(iter::once(&rel_name.name))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_index(&rel_name, &idx_name, cols)?;
if skip_locking {
tx.create_index(&rel_name, &idx_name, cols)?;
} else {
let lock = self
.obtain_relation_locks(iter::once(&rel_name.name))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_index(&rel_name, &idx_name, cols)?;
}
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1247,12 +1256,16 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot create vector index in read-only mode");
}
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_hnsw_index(config)?;
if skip_locking {
tx.create_hnsw_index(config)?;
} else {
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_hnsw_index(config)?;
}
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1262,12 +1275,16 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot create fts index in read-only mode");
}
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_fts_index(config)?;
if skip_locking {
tx.create_fts_index(config)?;
} else {
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_fts_index(config)?;
}
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1277,12 +1294,17 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot create minhash lsh index in read-only mode");
}
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_minhash_lsh_index(config)?;
if skip_locking {
tx.create_minhash_lsh_index(config)?;
} else {
let lock = self
.obtain_relation_locks(iter::once(&config.base_relation))
.pop()
.unwrap();
let _guard = lock.write().unwrap();
tx.create_minhash_lsh_index(config)?;
}
Ok(NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
@ -1292,12 +1314,17 @@ impl<'s, S: Storage<'s>> Db<S> {
if read_only {
bail!("Cannot remove index in read-only mode");
}
let lock = self
.obtain_relation_locks(iter::once(&rel_name.name))
.pop()
.unwrap();
let _guard = lock.read().unwrap();
let bounds = tx.remove_index(&rel_name, &idx_name)?;
let bounds = if skip_locking {
tx.remove_index(&rel_name, &idx_name)?
} else {
let lock = self
.obtain_relation_locks(iter::once(&rel_name.name))
.pop()
.unwrap();
let _guard = lock.read().unwrap();
tx.remove_index(&rel_name, &idx_name)?
};
for (lower, upper) in bounds {
tx.store_tx.del_range_from_persisted(&lower, &upper)?;
}
@ -1313,7 +1340,11 @@ impl<'s, S: Storage<'s>> Db<S> {
bail!("Cannot rename relations in read-only mode");
}
let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]);
let locks = self.obtain_relation_locks(rel_names);
let locks = if skip_locking {
vec![]
} else {
self.obtain_relation_locks(rel_names)
};
let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
for (old, new) in rename_pairs {
tx.rename_relation(old, new)?;
@ -1391,7 +1422,7 @@ impl<'s, S: Storage<'s>> Db<S> {
} else {
self.transact_write()?
};
let res = self.run_sys_op_with_tx(&mut tx, &op, read_only)?;
let res = self.run_sys_op_with_tx(&mut tx, &op, read_only, false)?;
tx.commit_tx()?;
Ok(res)
}

@ -120,7 +120,7 @@ impl<'s, S: Storage<'s>> Db<S> {
ret = NamedRows::default();
}
ImperativeStmt::SysOp { sysop, .. } => {
ret = self.run_sys_op_with_tx(tx, &sysop.sysop, readonly)?;
ret = self.run_sys_op_with_tx(tx, &sysop.sysop, readonly, true)?;
if let Some(store_as) = &sysop.store_as {
tx.script_store_as_relation(self, store_as, &ret, cur_vld)?;
}

@ -1225,4 +1225,67 @@ fn update_shall_work() {
db.run_default(r"?[x, y] <- [[1, 4]] :update z {x, y}").unwrap();
let r = db.run_default(r"?[x, y, z] := *z {x, y, z}").unwrap();
assert_eq!(r.into_json()["rows"], json!([[1, 4, 3]]));
}
#[test]
fn sysop_in_imperatives() {
let script = r#"
{
:create cm_src {
aid: String =>
title: String,
author: String?,
kind: String,
url: String,
domain: String?,
pub_time: Float?,
dt: Float default now(),
weight: Float default 1,
}
}
{
:create cm_txt {
tid: String =>
aid: String,
tag: String,
follows_tid: String?,
dup_for: String?,
text: String,
info_amount: Int,
}
}
{
:create cm_seg {
sid: String =>
tid: String,
tag: String,
part: Int,
text: String,
vec: <F32; 1536>,
}
}
{
::hnsw create cm_seg:vec {
dim: 1536,
m: 50,
dtype: F32,
fields: vec,
distance: Cosine,
ef: 100,
}
}
{
::lsh create cm_txt:lsh {
extractor: text,
extract_filter: is_null(dup_for),
tokenizer: NGram,
n_perm: 200,
target_threshold: 0.5,
n_gram: 7,
}
}
{::relations}
"#;
let db = DbInstance::default();
db.run_default(script).unwrap();
}
Loading…
Cancel
Save