Add txn impls and tests for model DDL

next
Sayan Nandan 1 year ago
parent 29d4137a2c
commit 8d2097e526
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -28,7 +28,7 @@ use {
super::{Field, IWModel, Layer, Model}, super::{Field, IWModel, Layer, Model},
crate::{ crate::{
engine::{ engine::{
core::GlobalNS, core::{util::EntityLocator, GlobalNS},
data::{ data::{
tag::{DataTag, TagClass}, tag::{DataTag, TagClass},
DictEntryGeneric, DictEntryGeneric,
@ -43,6 +43,7 @@ use {
}, },
lex::Ident, lex::Ident,
}, },
txn::gns as gnstxn,
}, },
util, util,
}, },
@ -247,8 +248,14 @@ impl<'a> AlterPlan<'a> {
} }
impl Model { impl Model {
pub fn exec_alter(gns: &GlobalNS, alter: AlterModel) -> DatabaseResult<()> { pub fn transactional_exec_alter<GI: gnstxn::GNSTransactionDriverLLInterface>(
gns.with_model(alter.model, |model| { gns: &GlobalNS,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
alter: AlterModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = EntityLocator::parse_entity(alter.model)?;
gns.with_space(space_name, |space| {
space.with_model(model_name, |model| {
// make intent // make intent
let iwm = model.intent_write_model(); let iwm = model.intent_write_model();
// prepare plan // prepare plan
@ -265,6 +272,15 @@ impl Model {
AlterAction::Add(new_fields) => { AlterAction::Add(new_fields) => {
let mut guard = model.delta_state().wguard(); let mut guard = model.delta_state().wguard();
// TODO(@ohsayan): this impacts lockdown duration; fix it // TODO(@ohsayan): this impacts lockdown duration; fix it
if GI::NONNULL {
// prepare txn
let txn = gnstxn::AlterModelAddTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&new_fields,
);
// commit txn
txn_driver.try_commit(txn)?;
}
new_fields new_fields
.stseq_ord_kv() .stseq_ord_kv()
.map(|(x, y)| (x.clone(), y.clone())) .map(|(x, y)| (x.clone(), y.clone()))
@ -275,22 +291,46 @@ impl Model {
iwm.fields_mut().st_insert(field_id, field); iwm.fields_mut().st_insert(field_id, field);
}); });
} }
AlterAction::Remove(remove) => { AlterAction::Remove(removed) => {
let mut guard = model.delta_state().wguard(); let mut guard = model.delta_state().wguard();
remove.iter().for_each(|field_id| { if GI::NONNULL {
// prepare txn
let txn = gnstxn::AlterModelRemoveTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&removed,
);
// commit txn
txn_driver.try_commit(txn)?;
}
removed.iter().for_each(|field_id| {
model model
.delta_state() .delta_state()
.append_unresolved_wl_field_rem(&mut guard, field_id.as_str()); .append_unresolved_wl_field_rem(&mut guard, field_id.as_str());
iwm.fields_mut().st_delete(field_id.as_str()); iwm.fields_mut().st_delete(field_id.as_str());
}); });
} }
AlterAction::Update(u) => { AlterAction::Update(updated) => {
u.into_iter().for_each(|(field_id, field)| { if GI::NONNULL {
// prepare txn
let txn = gnstxn::AlterModelUpdateTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&updated,
);
// commit txn
txn_driver.try_commit(txn)?;
}
updated.into_iter().for_each(|(field_id, field)| {
iwm.fields_mut().st_update(&field_id, field); iwm.fields_mut().st_update(&field_id, field);
}); });
} }
} }
Ok(()) Ok(())
}) })
})
}
pub fn exec_alter(gns: &GlobalNS, stmt: AlterModel) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, |driver| {
Self::transactional_exec_alter(gns, driver, stmt)
})
} }
} }

@ -47,6 +47,7 @@ use {
drop::DropModel, drop::DropModel,
syn::{FieldSpec, LayerSpec}, syn::{FieldSpec, LayerSpec},
}, },
txn::gns as gnstxn,
}, },
std::cell::UnsafeCell, std::cell::UnsafeCell,
}; };
@ -202,20 +203,78 @@ impl Model {
} }
impl Model { impl Model {
pub fn exec_create(gns: &super::GlobalNS, stmt: CreateModel) -> DatabaseResult<()> { pub fn transactional_exec_create<GI: gnstxn::GNSTransactionDriverLLInterface>(
gns: &super::GlobalNS,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
stmt: CreateModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = stmt.model_name.parse_entity()?; let (space_name, model_name) = stmt.model_name.parse_entity()?;
let model = Self::process_create(stmt)?; let model = Self::process_create(stmt)?;
gns.with_space(space_name, |space| space._create_model(model_name, model)) gns.with_space(space_name, |space| {
let mut w_space = space.models().write();
if w_space.st_contains(model_name) {
return Err(DatabaseError::DdlModelAlreadyExists);
}
if GI::NONNULL {
// prepare txn
let irm = model.intent_read_model();
let txn = gnstxn::CreateModelTxn::new(
gnstxn::SpaceIDRef::new(space_name, space),
model_name,
&model,
&irm,
);
// commit txn
txn_driver.try_commit(txn)?;
}
// update global state
let _ = w_space.st_insert(model_name.into(), model);
Ok(())
})
}
#[cfg(test)]
pub fn nontransactional_exec_create(
gns: &super::GlobalNS,
stmt: CreateModel,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, |driver| {
Self::transactional_exec_create(gns, driver, stmt)
})
} }
pub fn exec_drop(gns: &super::GlobalNS, stmt: DropModel) -> DatabaseResult<()> { pub fn transactional_exec_drop<GI: gnstxn::GNSTransactionDriverLLInterface>(
let (space, model) = stmt.entity.parse_entity()?; gns: &super::GlobalNS,
gns.with_space(space, |space| { txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
stmt: DropModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = stmt.entity.parse_entity()?;
gns.with_space(space_name, |space| {
let mut w_space = space.models().write(); let mut w_space = space.models().write();
match w_space.st_delete_if(model, |mdl| !mdl.is_empty_atomic()) { let Some(model) = w_space.get(model_name) else {
Some(true) => Ok(()), return Err(DatabaseError::DdlModelNotFound);
Some(false) => Err(DatabaseError::DdlModelViewNotEmpty), };
None => Err(DatabaseError::DdlModelNotFound), if GI::NONNULL {
// prepare txn
let txn = gnstxn::DropModelTxn::new(gnstxn::ModelIDRef::new(
gnstxn::SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().current_version().value_u64(),
));
// commit txn
txn_driver.try_commit(txn)?;
}
// update global state
let _ = w_space.st_delete(model_name);
Ok(())
})
} }
#[cfg(test)]
pub fn nontransactional_exec_drop(
gns: &super::GlobalNS,
stmt: DropModel,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, |driver| {
Self::transactional_exec_drop(gns, driver, stmt)
}) })
} }
} }

@ -212,7 +212,10 @@ impl Space {
} }
/// Execute a `create` stmt /// Execute a `create` stmt
#[cfg(test)] #[cfg(test)]
pub fn exec_create(gns: &super::GlobalNS, space: CreateSpace) -> DatabaseResult<()> { pub fn nontransactional_exec_create(
gns: &super::GlobalNS,
space: CreateSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| { gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| {
Self::transactional_exec_create(gns, driver, space) Self::transactional_exec_create(gns, driver, space)
}) })
@ -258,7 +261,10 @@ impl Space {
} }
#[cfg(test)] #[cfg(test)]
/// Execute a `alter` stmt /// Execute a `alter` stmt
pub fn exec_alter(gns: &super::GlobalNS, alter: AlterSpace) -> DatabaseResult<()> { pub fn nontransactional_exec_alter(
gns: &super::GlobalNS,
alter: AlterSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| { gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| {
Self::transactional_exec_alter(gns, driver, alter) Self::transactional_exec_alter(gns, driver, alter)
}) })
@ -291,7 +297,10 @@ impl Space {
Ok(()) Ok(())
} }
#[cfg(test)] #[cfg(test)]
pub fn exec_drop(gns: &super::GlobalNS, drop_space: DropSpace) -> DatabaseResult<()> { pub fn nontransactional_exec_drop(
gns: &super::GlobalNS,
drop_space: DropSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| { gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| {
Self::transactional_exec_drop(gns, driver, drop_space) Self::transactional_exec_drop(gns, driver, drop_space)
}) })

@ -58,7 +58,7 @@ pub fn exec_create(
if create_new_space { if create_new_space {
gns.test_new_empty_space(&create_model.model_name.into_full().unwrap().0); gns.test_new_empty_space(&create_model.model_name.into_full().unwrap().0);
} }
Model::exec_create(gns, create_model).map(|_| name) Model::nontransactional_exec_create(gns, create_model).map(|_| name)
} }
pub fn exec_create_new_space(gns: &GlobalNS, create_stmt: &str) -> DatabaseResult<()> { pub fn exec_create_new_space(gns: &GlobalNS, create_stmt: &str) -> DatabaseResult<()> {

@ -42,7 +42,7 @@ fn exec_create(gns: &GlobalNS, create: &str, verify: impl Fn(&Space)) -> Databas
let ast_node = let ast_node =
ast::parse_ast_node_full::<crate::engine::ql::ddl::crt::CreateSpace>(&tok[2..]).unwrap(); ast::parse_ast_node_full::<crate::engine::ql::ddl::crt::CreateSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name; let name = ast_node.space_name;
Space::exec_create(gns, ast_node)?; Space::nontransactional_exec_create(gns, ast_node)?;
gns.with_space(&name, |space| { gns.with_space(&name, |space| {
verify(space); verify(space);
Ok(space.get_uuid()) Ok(space.get_uuid())
@ -54,7 +54,7 @@ fn exec_alter(gns: &GlobalNS, alter: &str, verify: impl Fn(&Space)) -> DatabaseR
let ast_node = let ast_node =
ast::parse_ast_node_full::<crate::engine::ql::ddl::alt::AlterSpace>(&tok[2..]).unwrap(); ast::parse_ast_node_full::<crate::engine::ql::ddl::alt::AlterSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name; let name = ast_node.space_name;
Space::exec_alter(gns, ast_node)?; Space::nontransactional_exec_alter(gns, ast_node)?;
gns.with_space(&name, |space| { gns.with_space(&name, |space| {
verify(space); verify(space);
Ok(space.get_uuid()) Ok(space.get_uuid())

@ -47,7 +47,7 @@ fn _exec_only_create_space_model(gns: &GlobalNS, model: &str) -> DatabaseResult<
} }
let lex_create_model = lex_insecure(model.as_bytes()).unwrap(); let lex_create_model = lex_insecure(model.as_bytes()).unwrap();
let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap(); let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap();
Model::exec_create(gns, stmt_create_model) Model::nontransactional_exec_create(gns, stmt_create_model)
} }
fn _exec_only_insert<T>( fn _exec_only_insert<T>(

@ -53,6 +53,7 @@ mod tests;
pub use { pub use {
model::{ model::{
AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, DropModelTxn, AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, DropModelTxn,
ModelIDRef,
}, },
space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn}, space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
}; };

@ -57,6 +57,19 @@ pub struct ModelIDRef<'a> {
} }
impl<'a> ModelIDRef<'a> { impl<'a> ModelIDRef<'a> {
pub fn new_ref(
space_name: &'a str,
space: &'a Space,
model_name: &'a str,
model: &'a Model,
) -> ModelIDRef<'a> {
ModelIDRef::new(
super::SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().current_version().value_u64(),
)
}
pub fn new( pub fn new(
space_id: super::SpaceIDRef<'a>, space_id: super::SpaceIDRef<'a>,
model_name: &'a str, model_name: &'a str,
@ -177,7 +190,7 @@ fn with_model<T>(
create model create model
*/ */
#[derive(Clone, Copy)] #[derive(Debug, Clone, Copy)]
/// The commit payload for a `create model ... (...) with {...}` txn /// The commit payload for a `create model ... (...) with {...}` txn
pub struct CreateModelTxn<'a> { pub struct CreateModelTxn<'a> {
space_id: super::SpaceIDRef<'a>, space_id: super::SpaceIDRef<'a>,

@ -26,11 +26,17 @@
use crate::engine::{ use crate::engine::{
core::{ core::{
model::{Field, Layer, Model},
space::{Space, SpaceMeta}, space::{Space, SpaceMeta},
GlobalNS, GlobalNS,
}, },
data::{cell::Datacell, uuid::Uuid, DictEntryGeneric}, data::{cell::Datacell, tag::TagSelector, uuid::Uuid, DictEntryGeneric},
ql::{ast::parse_ast_node_full, ddl::crt::CreateSpace, tests::lex_insecure}, idx::STIndex,
ql::{
ast::parse_ast_node_full,
ddl::crt::{CreateModel, CreateSpace},
tests::lex_insecure,
},
storage::v1::header_meta::HostRunMode, storage::v1::header_meta::HostRunMode,
txn::gns::GNSTransactionDriverVFS, txn::gns::GNSTransactionDriverVFS,
}; };
@ -143,3 +149,103 @@ fn drop_space() {
}) })
}) })
} }
fn init_model(
gns: &GlobalNS,
txn_driver: &mut GNSTransactionDriverVFS,
space_name: &str,
model_name: &str,
decl: &str,
) -> Uuid {
let query = format!("create model {space_name}.{model_name} ({decl})");
let stmt = lex_insecure(query.as_bytes()).unwrap();
let stmt = parse_ast_node_full::<CreateModel>(&stmt[2..]).unwrap();
let model_name = stmt.model_name;
Model::transactional_exec_create(&gns, txn_driver, stmt).unwrap();
gns.with_model(model_name, |model| Ok(model.get_uuid()))
.unwrap()
}
fn init_default_model(gns: &GlobalNS, driver: &mut GNSTransactionDriverVFS) -> Uuid {
init_model(
gns,
driver,
"myspace",
"mymodel",
"username: string, password: binary",
)
}
#[test]
fn create_model() {
with_variable("create_model_test.gns.db-tlog", |log_name| {
let _uuid_space;
let uuid_model;
{
let gns = GlobalNS::empty();
let mut driver = init_txn_driver(&gns, log_name);
_uuid_space = init_space(&gns, &mut driver, "myspace", "{}");
uuid_model = init_default_model(&gns, &mut driver);
driver.close().unwrap();
}
double_run(|| {
let gns = GlobalNS::empty();
let driver = init_txn_driver(&gns, log_name);
gns.with_model(("myspace", "mymodel"), |model| {
assert_eq!(
model,
&Model::new_restore(
uuid_model,
"username".into(),
TagSelector::Str.into_full(),
into_dict! {
"username" => Field::new([Layer::str()].into(), false),
"password" => Field::new([Layer::bin()].into(), false),
}
)
);
Ok(())
})
.unwrap();
driver.close().unwrap();
})
})
}
#[test]
fn alter_model_add() {
with_variable("alter_model_add_test.gns.db-tlog", |log_name| {
{
let gns = GlobalNS::empty();
let mut driver = init_txn_driver(&gns, log_name);
init_space(&gns, &mut driver, "myspace", "{}");
init_default_model(&gns, &mut driver);
let stmt = lex_insecure(
b"alter model myspace.mymodel add profile_pic { type: binary, nullable: true }",
)
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&gns, &mut driver, stmt).unwrap();
driver.close().unwrap();
}
{
double_run(|| {
let gns = GlobalNS::empty();
let driver = init_txn_driver(&gns, log_name);
gns.with_model(("myspace", "mymodel"), |model| {
assert_eq!(
model
.intent_read_model()
.fields()
.st_get("profile_pic")
.unwrap(),
&Field::new([Layer::bin()].into(), true)
);
Ok(())
})
.unwrap();
driver.close().unwrap();
})
}
})
}

Loading…
Cancel
Save