diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index a4273e3f..659c126a 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -249,9 +249,8 @@ impl<'a> AlterPlan<'a> { } impl Model { - pub fn transactional_exec_alter( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_alter( + global: &G, alter: AlterModel, ) -> DatabaseResult<()> { let (space_name, model_name) = EntityLocator::parse_entity(alter.model)?; @@ -273,14 +272,14 @@ impl Model { AlterAction::Add(new_fields) => { let mut guard = model.delta_state().schema_delta_write(); // TODO(@ohsayan): this impacts lockdown duration; fix it - if GI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::AlterModelAddTxn::new( gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model), &new_fields, ); // commit txn - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } new_fields .stseq_ord_kv() @@ -294,14 +293,14 @@ impl Model { } AlterAction::Remove(removed) => { let mut guard = model.delta_state().schema_delta_write(); - if GI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::AlterModelRemoveTxn::new( gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model), &removed, ); // commit txn - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } removed.iter().for_each(|field_id| { model.delta_state().schema_append_unresolved_wl_field_rem( @@ -312,14 +311,14 @@ impl Model { }); } AlterAction::Update(updated) => { - if GI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::AlterModelUpdateTxn::new( gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model), &updated, ); // commit txn - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } updated.into_iter().for_each(|(field_id, field)| { iwm.fields_mut().st_update(&field_id, field); @@ -330,9 +329,4 @@ impl Model { }) }) } - pub fn exec_alter(global: &impl GlobalInstanceLike, stmt: AlterModel) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| { - Self::transactional_exec_alter(global, driver, stmt) - }) - } } diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index fb037092..3926f8fe 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -204,9 +204,8 @@ impl Model { } impl Model { - pub fn transactional_exec_create( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_create( + global: &G, stmt: CreateModel, ) -> DatabaseResult<()> { let (space_name, model_name) = stmt.model_name.parse_entity()?; @@ -216,9 +215,10 @@ impl Model { if w_space.st_contains(model_name) { return Err(DatabaseError::DdlModelAlreadyExists); } - if GI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let irm = model.intent_read_model(); + let mut txn_driver = global.namespace_txn_driver().lock(); let txn = gnstxn::CreateModelTxn::new( gnstxn::SpaceIDRef::new(space_name, space), model_name, @@ -233,18 +233,8 @@ impl Model { Ok(()) }) } - #[cfg(test)] - pub fn nontransactional_exec_create( - global: &impl GlobalInstanceLike, - stmt: CreateModel, - ) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| { - Self::transactional_exec_create(global, driver, stmt) - }) - } - pub fn transactional_exec_drop( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_drop( + global: &G, stmt: DropModel, ) -> DatabaseResult<()> { let (space_name, model_name) = stmt.entity.parse_entity()?; @@ -253,7 +243,7 @@ impl Model { let Some(model) = w_space.get(model_name) else { return Err(DatabaseError::DdlModelNotFound); }; - if GI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::DropModelTxn::new(gnstxn::ModelIDRef::new( gnstxn::SpaceIDRef::new(space_name, space), @@ -262,22 +252,13 @@ impl Model { model.delta_state().schema_current_version().value_u64(), )); // commit txn - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } // update global state let _ = w_space.st_delete(model_name); Ok(()) }) } - #[cfg(test)] - pub fn nontransactional_exec_drop( - global: &impl GlobalInstanceLike, - stmt: DropModel, - ) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| { - Self::transactional_exec_drop(global, driver, stmt) - }) - } } /* diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index 6a948dfc..74d2d068 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -189,9 +189,8 @@ impl Space { } impl Space { - pub fn transactional_exec_create( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_create( + global: &G, space: CreateSpace, ) -> DatabaseResult<()> { // process create @@ -202,29 +201,20 @@ impl Space { return Err(DatabaseError::DdlSpaceAlreadyExists); } // commit txn - if TI::NONNULL { + if G::FS_IS_NON_NULL { // prepare and commit txn let s_read = space.metadata().dict().read(); - txn_driver.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?; + global + .namespace_txn_driver() + .lock() + .try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?; } // update global state let _ = wl.st_insert(space_name, space); Ok(()) } - /// Execute a `create` stmt - #[cfg(test)] - pub fn nontransactional_exec_create( - global: &impl GlobalInstanceLike, - space: CreateSpace, - ) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec( - global.namespace(), - move |driver| Self::transactional_exec_create(global, driver, space), - ) - } - pub fn transactional_exec_alter( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_alter( + global: &G, AlterSpace { space_name, updated_props, @@ -243,12 +233,12 @@ impl Space { Some(patch) => patch, None => return Err(DatabaseError::DdlSpaceBadProperty), }; - if TI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::AlterSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space), &patch); // commit - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } // merge dict::rmerge_data_with_patch(&mut space_props, patch); @@ -261,20 +251,8 @@ impl Space { Ok(()) }) } - #[cfg(test)] - /// Execute a `alter` stmt - pub fn nontransactional_exec_alter( - global: &impl GlobalInstanceLike, - alter: AlterSpace, - ) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec( - global.namespace(), - move |driver| Self::transactional_exec_alter(global, driver, alter), - ) - } - pub fn transactional_exec_drop( - global: &impl GlobalInstanceLike, - txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + pub fn transactional_exec_drop( + global: &G, DropSpace { space, force: _ }: DropSpace, ) -> DatabaseResult<()> { // TODO(@ohsayan): force remove option @@ -290,25 +268,15 @@ impl Space { return Err(DatabaseError::DdlSpaceRemoveNonEmpty); } // we can remove this - if TI::NONNULL { + if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space)); - txn_driver.try_commit(txn)?; + global.namespace_txn_driver().lock().try_commit(txn)?; } drop(space_w); let _ = wgns.st_delete(space_name.as_str()); Ok(()) } - #[cfg(test)] - pub fn nontransactional_exec_drop( - global: &impl GlobalInstanceLike, - drop_space: DropSpace, - ) -> DatabaseResult<()> { - gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec( - global.namespace(), - move |driver| Self::transactional_exec_drop(global, driver, drop_space), - ) - } } #[cfg(test)] diff --git a/server/src/engine/core/tests/ddl_model/alt.rs b/server/src/engine/core/tests/ddl_model/alt.rs index ca017e03..c86cf586 100644 --- a/server/src/engine/core/tests/ddl_model/alt.rs +++ b/server/src/engine/core/tests/ddl_model/alt.rs @@ -63,7 +63,7 @@ fn exec_plan( let tok = lex_insecure(plan.as_bytes()).unwrap(); let alter = parse_ast_node_full::(&tok[2..]).unwrap(); let (_space, model_name) = alter.model.into_full().unwrap(); - Model::exec_alter(global, alter)?; + Model::transactional_exec_alter(global, alter)?; let gns_read = global.namespace().spaces().read(); let space = gns_read.st_get("myspace").unwrap(); let model = space.models().read(); @@ -359,7 +359,7 @@ mod exec { }; #[test] fn simple_add() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_plan( &global, true, @@ -390,7 +390,7 @@ mod exec { } #[test] fn simple_remove() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_plan( &global, true, @@ -416,7 +416,7 @@ mod exec { } #[test] fn simple_update() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_plan( &global, true, @@ -435,7 +435,7 @@ mod exec { } #[test] fn failing_alter_nullable_switch_need_lock() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_plan( &global, diff --git a/server/src/engine/core/tests/ddl_model/crt.rs b/server/src/engine/core/tests/ddl_model/crt.rs index 4b247002..1011d8a2 100644 --- a/server/src/engine/core/tests/ddl_model/crt.rs +++ b/server/src/engine/core/tests/ddl_model/crt.rs @@ -145,7 +145,7 @@ mod exec { #[test] fn simple() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); exec_create_new_space( &global, "create model myspace.mymodel(username: string, password: binary)", diff --git a/server/src/engine/core/tests/ddl_model/mod.rs b/server/src/engine/core/tests/ddl_model/mod.rs index 61af1fc4..c2a52baf 100644 --- a/server/src/engine/core/tests/ddl_model/mod.rs +++ b/server/src/engine/core/tests/ddl_model/mod.rs @@ -61,7 +61,7 @@ pub fn exec_create( .namespace() .test_new_empty_space(&create_model.model_name.into_full().unwrap().0); } - Model::nontransactional_exec_create(global, create_model).map(|_| name) + Model::transactional_exec_create(global, create_model).map(|_| name) } pub fn exec_create_new_space( diff --git a/server/src/engine/core/tests/ddl_space/alter.rs b/server/src/engine/core/tests/ddl_space/alter.rs index ecfd7314..aa772b25 100644 --- a/server/src/engine/core/tests/ddl_space/alter.rs +++ b/server/src/engine/core/tests/ddl_space/alter.rs @@ -33,7 +33,7 @@ use crate::engine::{ #[test] fn alter_add_prop_env_var() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_create_alter( &global, "create space myspace", @@ -54,7 +54,7 @@ fn alter_add_prop_env_var() { #[test] fn alter_update_prop_env_var() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); let uuid = super::exec_create( &global, "create space myspace with { env: { MY_NEW_PROP: 100 } }", @@ -86,7 +86,7 @@ fn alter_update_prop_env_var() { #[test] fn alter_remove_prop_env_var() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); let uuid = super::exec_create( &global, "create space myspace with { env: { MY_NEW_PROP: 100 } }", @@ -114,7 +114,7 @@ fn alter_remove_prop_env_var() { #[test] fn alter_nx() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_alter( &global, @@ -128,7 +128,7 @@ fn alter_nx() { #[test] fn alter_remove_all_env() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); let uuid = super::exec_create( &global, "create space myspace with { env: { MY_NEW_PROP: 100 } }", diff --git a/server/src/engine/core/tests/ddl_space/create.rs b/server/src/engine/core/tests/ddl_space/create.rs index bcf9cf2b..574121fa 100644 --- a/server/src/engine/core/tests/ddl_space/create.rs +++ b/server/src/engine/core/tests/ddl_space/create.rs @@ -33,7 +33,7 @@ use crate::engine::{ #[test] fn exec_create_space_simple() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_create(&global, "create space myspace", |spc| { assert!(spc.models().read().is_empty()) }) @@ -42,7 +42,7 @@ fn exec_create_space_simple() { #[test] fn exec_create_space_with_env() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_create( &global, r#" @@ -70,7 +70,7 @@ fn exec_create_space_with_env() { #[test] fn exec_create_space_with_bad_env_type() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_create(&global, "create space myspace with { env: 100 }", |_| {}).unwrap_err(), DatabaseError::DdlSpaceBadProperty @@ -79,7 +79,7 @@ fn exec_create_space_with_bad_env_type() { #[test] fn exec_create_space_with_random_property() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_create( &global, diff --git a/server/src/engine/core/tests/ddl_space/mod.rs b/server/src/engine/core/tests/ddl_space/mod.rs index 7e05f0a8..c3942564 100644 --- a/server/src/engine/core/tests/ddl_space/mod.rs +++ b/server/src/engine/core/tests/ddl_space/mod.rs @@ -47,7 +47,7 @@ fn exec_create( let ast_node = ast::parse_ast_node_full::(&tok[2..]).unwrap(); let name = ast_node.space_name; - Space::nontransactional_exec_create(gns, ast_node)?; + Space::transactional_exec_create(gns, ast_node)?; gns.namespace().with_space(&name, |space| { verify(space); Ok(space.get_uuid()) @@ -63,7 +63,7 @@ fn exec_alter( let ast_node = ast::parse_ast_node_full::(&tok[2..]).unwrap(); let name = ast_node.space_name; - Space::nontransactional_exec_alter(gns, ast_node)?; + Space::transactional_exec_alter(gns, ast_node)?; gns.namespace().with_space(&name, |space| { verify(space); Ok(space.get_uuid()) diff --git a/server/src/engine/core/tests/dml/delete.rs b/server/src/engine/core/tests/dml/delete.rs index 92183ab2..e0b0ee39 100644 --- a/server/src/engine/core/tests/dml/delete.rs +++ b/server/src/engine/core/tests/dml/delete.rs @@ -28,7 +28,7 @@ use crate::engine::{error::DatabaseError, fractal::test_utils::TestGlobal}; #[test] fn simple_delete() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_delete( &global, "create model myspace.mymodel(username: string, password: string)", @@ -41,7 +41,7 @@ fn simple_delete() { #[test] fn delete_nonexisting() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_delete( &global, diff --git a/server/src/engine/core/tests/dml/insert.rs b/server/src/engine/core/tests/dml/insert.rs index de36b22a..fb3a07a5 100644 --- a/server/src/engine/core/tests/dml/insert.rs +++ b/server/src/engine/core/tests/dml/insert.rs @@ -31,7 +31,7 @@ struct Tuple(Vec<(Box, Datacell)>); #[test] fn insert_simple() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_insert( &global, "create model myspace.mymodel(username: string, password: string)", @@ -46,7 +46,7 @@ fn insert_simple() { #[test] fn insert_with_null() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_insert( &global, "create model myspace.mymodel(username: string, null useless_password: string, null useless_email: string, null useless_random_column: uint64)", @@ -69,7 +69,7 @@ fn insert_with_null() { #[test] fn insert_duplicate() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); super::exec_insert( &global, "create model myspace.mymodel(username: string, password: string)", diff --git a/server/src/engine/core/tests/dml/mod.rs b/server/src/engine/core/tests/dml/mod.rs index 8f14363a..c1d489a9 100644 --- a/server/src/engine/core/tests/dml/mod.rs +++ b/server/src/engine/core/tests/dml/mod.rs @@ -51,7 +51,7 @@ fn _exec_only_create_space_model( } let lex_create_model = lex_insecure(model.as_bytes()).unwrap(); let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap(); - Model::nontransactional_exec_create(global, stmt_create_model) + Model::transactional_exec_create(global, stmt_create_model) } fn _exec_only_insert( diff --git a/server/src/engine/core/tests/dml/select.rs b/server/src/engine/core/tests/dml/select.rs index 8c5f079c..600c4dd8 100644 --- a/server/src/engine/core/tests/dml/select.rs +++ b/server/src/engine/core/tests/dml/select.rs @@ -28,7 +28,7 @@ use crate::engine::{data::cell::Datacell, error::DatabaseError, fractal::test_ut #[test] fn simple_select_wildcard() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_select( &global, @@ -43,7 +43,7 @@ fn simple_select_wildcard() { #[test] fn simple_select_specified_same_order() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_select( &global, @@ -58,7 +58,7 @@ fn simple_select_specified_same_order() { #[test] fn simple_select_specified_reversed_order() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_select( &global, @@ -73,7 +73,7 @@ fn simple_select_specified_reversed_order() { #[test] fn select_null() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_select( &global, @@ -88,7 +88,7 @@ fn select_null() { #[test] fn select_nonexisting() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_select( &global, diff --git a/server/src/engine/core/tests/dml/update.rs b/server/src/engine/core/tests/dml/update.rs index f49ed310..b1338b17 100644 --- a/server/src/engine/core/tests/dml/update.rs +++ b/server/src/engine/core/tests/dml/update.rs @@ -30,7 +30,7 @@ use crate::engine::{ #[test] fn simple() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, @@ -49,7 +49,7 @@ fn simple() { #[test] fn with_null() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, @@ -66,7 +66,7 @@ fn with_null() { #[test] fn with_list() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, @@ -86,7 +86,7 @@ fn with_list() { #[test] fn fail_operation_on_null() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, @@ -106,7 +106,7 @@ fn fail_operation_on_null() { #[test] fn fail_unknown_fields() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, @@ -132,7 +132,7 @@ fn fail_unknown_fields() { #[test] fn fail_typedef_violation() { - let global = TestGlobal::empty(); + let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_update( &global, diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index 8bd42c9d..c003c8c2 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -47,6 +47,9 @@ impl FractalGNSDriver { txn_driver: Mutex::new(txn_driver), } } + pub fn txn_driver(&self) -> &Mutex> { + &self.txn_driver + } } /// Model driver diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 4358eea5..26f5d1db 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -26,9 +26,12 @@ use { super::{ - core::GlobalNS, data::uuid::Uuid, storage::v1::LocalFS, txn::gns::GNSTransactionDriverAnyFS, + core::GlobalNS, + data::uuid::Uuid, + storage::v1::{LocalFS, RawFSInterface}, + txn::gns::GNSTransactionDriverAnyFS, }, - parking_lot::RwLock, + parking_lot::{Mutex, RwLock}, std::{collections::HashMap, mem::MaybeUninit}, tokio::sync::mpsc::unbounded_channel, }; @@ -98,10 +101,16 @@ pub unsafe fn enable_and_start_all( /// Something that represents the global state pub trait GlobalInstanceLike { - fn namespace(&self) -> &GlobalNS; - fn post_high_priority_task(&self, task: Task); - fn post_standard_priority_task(&self, task: Task); + type FileSystem: RawFSInterface; + const FS_IS_NON_NULL: bool = Self::FileSystem::NOT_NULL; + // stat fn get_max_delta_size(&self) -> usize; + // global namespace + fn namespace(&self) -> &GlobalNS; + fn namespace_txn_driver(&self) -> &Mutex>; + // taskmgr + fn taskmgr_post_high_priority(&self, task: Task); + fn taskmgr_post_standard_priority(&self, task: Task); // default impls fn request_batch_resolve( &self, @@ -110,7 +119,7 @@ pub trait GlobalInstanceLike { model_uuid: Uuid, observed_len: usize, ) { - self.post_high_priority_task(Task::new(CriticalTask::WriteBatch( + self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch( ModelUniqueID::new(space_name, model_name, model_uuid), observed_len, ))) @@ -118,15 +127,22 @@ pub trait GlobalInstanceLike { } impl GlobalInstanceLike for Global { + type FileSystem = LocalFS; + // ns fn namespace(&self) -> &GlobalNS { self._namespace() } - fn post_high_priority_task(&self, task: Task) { + fn namespace_txn_driver(&self) -> &Mutex> { + self.get_state().gns_driver.txn_driver() + } + // taskmgr + fn taskmgr_post_high_priority(&self, task: Task) { self._post_high_priority_task(task) } - fn post_standard_priority_task(&self, task: Task) { + fn taskmgr_post_standard_priority(&self, task: Task) { self._post_standard_priority_task(task) } + // stat fn get_max_delta_size(&self) -> usize { self._get_max_delta_size() } diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 39e2e99d..9789754a 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -26,43 +26,98 @@ use { super::{CriticalTask, GenericTask, GlobalInstanceLike, Task}, - crate::engine::core::GlobalNS, - parking_lot::RwLock, + crate::engine::{ + core::GlobalNS, + storage::v1::{ + header_meta::HostRunMode, + memfs::{NullFS, VirtualFS}, + RawFSInterface, + }, + txn::gns::GNSTransactionDriverAnyFS, + }, + parking_lot::{Mutex, RwLock}, }; /// A `test` mode global implementation -pub struct TestGlobal { +pub struct TestGlobal { gns: GlobalNS, hp_queue: RwLock>>, lp_queue: RwLock>>, max_delta_size: usize, + txn_driver: Mutex>, } -impl TestGlobal { - pub fn empty() -> Self { - Self::with_max_delta_size(0) - } - pub fn with_max_delta_size(max_delta_size: usize) -> Self { +impl TestGlobal { + fn new( + gns: GlobalNS, + max_delta_size: usize, + txn_driver: GNSTransactionDriverAnyFS, + ) -> Self { Self { - gns: GlobalNS::empty(), + gns, hp_queue: RwLock::default(), lp_queue: RwLock::default(), max_delta_size, + txn_driver: Mutex::new(txn_driver), } } } -impl GlobalInstanceLike for TestGlobal { +impl TestGlobal { + pub fn new_with_driver_id(log_name: &str) -> Self { + let gns = GlobalNS::empty(); + let driver = GNSTransactionDriverAnyFS::open_or_reinit_with_name( + &gns, + log_name, + 0, + HostRunMode::Prod, + 0, + ) + .unwrap(); + Self::new(gns, 0, driver) + } +} + +impl TestGlobal { + pub fn new_with_vfs_driver(log_name: &str) -> Self { + Self::new_with_driver_id(log_name) + } +} + +impl TestGlobal { + pub fn new_with_nullfs_driver(log_name: &str) -> Self { + Self::new_with_driver_id(log_name) + } + pub fn new_with_tmp_nullfs_driver() -> Self { + Self::new_with_nullfs_driver("") + } +} + +impl GlobalInstanceLike for TestGlobal { + type FileSystem = Fs; fn namespace(&self) -> &GlobalNS { &self.gns } - fn post_high_priority_task(&self, task: Task) { + fn namespace_txn_driver(&self) -> &Mutex> { + &self.txn_driver + } + fn taskmgr_post_high_priority(&self, task: Task) { self.hp_queue.write().push(task) } - fn post_standard_priority_task(&self, task: Task) { + fn taskmgr_post_standard_priority(&self, task: Task) { self.lp_queue.write().push(task) } fn get_max_delta_size(&self) -> usize { 100 } } + +impl Drop for TestGlobal { + fn drop(&mut self) { + let mut txn_driver = self.txn_driver.lock(); + txn_driver + .__journal_mut() + .__append_journal_close_and_close() + .unwrap(); + } +} diff --git a/server/src/engine/idx/mtchm/imp.rs b/server/src/engine/idx/mtchm/imp.rs index 94d1b2d1..7972d2dd 100644 --- a/server/src/engine/idx/mtchm/imp.rs +++ b/server/src/engine/idx/mtchm/imp.rs @@ -91,7 +91,7 @@ impl MTIndex for Raw { self.len() } fn mt_clear(&self, g: &Guard) { - self.nontransactional_clear(g) + self.transactional_clear(g) } fn mt_insert(&self, e: E, g: &Guard) -> bool diff --git a/server/src/engine/idx/mtchm/mod.rs b/server/src/engine/idx/mtchm/mod.rs index 0c49b6c3..0a75cbe1 100644 --- a/server/src/engine/idx/mtchm/mod.rs +++ b/server/src/engine/idx/mtchm/mod.rs @@ -236,7 +236,7 @@ impl RawTree { } impl RawTree { - fn nontransactional_clear(&self, g: &Guard) { + fn transactional_clear(&self, g: &Guard) { self.iter_key(g).for_each(|k| { let _ = self.remove(k, g); }); diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index 0a39daff..7336b932 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -470,7 +470,7 @@ impl JournalWriter { &JournalEntryMetadata::new(id, EventSourceMarker::DRIVER_REOPENED, 0, 0).encoded(), ) } - pub fn append_journal_close_and_close(mut self) -> SDSSResult<()> { + pub fn __append_journal_close_and_close(&mut self) -> SDSSResult<()> { self.closed = true; let id = self._incr_id() as u128; self.log_file.fsynced_write( @@ -478,6 +478,9 @@ impl JournalWriter { )?; Ok(()) } + pub fn append_journal_close_and_close(mut self) -> SDSSResult<()> { + self.__append_journal_close_and_close() + } } impl JournalWriter { diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index 73938417..e7b7e5ad 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -66,44 +66,14 @@ pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS; const CURRENT_LOG_VERSION: u32 = 0; -pub trait GNSTransactionDriverLLInterface: RawFSInterface { - /// If true, this is an actual txn driver with a non-null (not `/dev/null` like) journal - const NONNULL: bool = ::NOT_NULL; -} -impl GNSTransactionDriverLLInterface for T {} - /// The GNS transaction driver is used to handle DDL transactions pub struct GNSTransactionDriverAnyFS { journal: JournalWriter, } -impl GNSTransactionDriverAnyFS { - pub fn nullzero(gns: &GlobalNS) -> Self { - let journal = v1::open_journal( - "gns.db-tlog", - header_meta::FileSpecifier::GNSTxnLog, - header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION), - 0, - header_meta::HostRunMode::Dev, - 0, - gns, - ) - .unwrap(); - Self { journal } - } - pub fn nullzero_create_exec(gns: &GlobalNS, f: impl FnOnce(&mut Self) -> T) -> T { - let mut j = Self::nullzero(gns); - let r = f(&mut j); - j.close().unwrap(); - r - } -} - -impl GNSTransactionDriverAnyFS { - pub fn close(self) -> TransactionResult<()> { - self.journal - .append_journal_close_and_close() - .map_err(|e| e.into()) +impl GNSTransactionDriverAnyFS { + pub fn __journal_mut(&mut self) -> &mut JournalWriter { + &mut self.journal } pub fn open_or_reinit_with_name( gns: &GlobalNS, @@ -141,7 +111,7 @@ impl GNSTransactionDriverAnyFS { /// the journal adapter for DDL queries on the GNS #[derive(Debug)] -struct GNSAdapter; +pub struct GNSAdapter; impl JournalAdapter for GNSAdapter { const RECOVERY_PLUGIN: bool = true; diff --git a/server/src/engine/txn/gns/tests/full_chain.rs b/server/src/engine/txn/gns/tests/full_chain.rs index 71d12ead..1199d4d4 100644 --- a/server/src/engine/txn/gns/tests/full_chain.rs +++ b/server/src/engine/txn/gns/tests/full_chain.rs @@ -38,8 +38,6 @@ use crate::engine::{ ddl::crt::{CreateModel, CreateSpace}, tests::lex_insecure, }, - storage::v1::header_meta::HostRunMode, - txn::gns::GNSTransactionDriverVFS, }; fn multirun(f: impl FnOnce() + Copy) { @@ -52,28 +50,12 @@ fn with_variable(var: T, f: impl FnOnce(T)) { f(var); } -fn init_txn_driver(global: &impl GlobalInstanceLike, log_name: &str) -> GNSTransactionDriverVFS { - GNSTransactionDriverVFS::open_or_reinit_with_name( - global.namespace(), - log_name, - 0, - HostRunMode::Prod, - 0, - ) - .unwrap() -} - -fn init_space( - global: &impl GlobalInstanceLike, - driver: &mut GNSTransactionDriverVFS, - space_name: &str, - env: &str, -) -> Uuid { +fn init_space(global: &impl GlobalInstanceLike, space_name: &str, env: &str) -> Uuid { let query = format!("create space {space_name} with {{ env: {env} }}"); let stmt = lex_insecure(query.as_bytes()).unwrap(); let stmt = parse_ast_node_full::(&stmt[2..]).unwrap(); let name = stmt.space_name; - Space::transactional_exec_create(global, driver, stmt).unwrap(); + Space::transactional_exec_create(global, stmt).unwrap(); global .namespace() .spaces() @@ -89,14 +71,11 @@ fn create_space() { let uuid; // start 1 { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - uuid = init_space(&global, &mut driver, "myspace", "{ SAYAN_MAX: 65536 }"); // good lord that doesn't sound like a good variable - driver.close().unwrap(); + let global = TestGlobal::new_with_vfs_driver(log_name); + uuid = init_space(&global, "myspace", "{ SAYAN_MAX: 65536 }"); // good lord that doesn't sound like a good variable } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); assert_eq!( global.namespace().spaces().read().get("myspace").unwrap(), &Space::new_restore_empty( @@ -106,7 +85,6 @@ fn create_space() { uuid ) ); - driver.close().unwrap(); }) }) } @@ -116,19 +94,16 @@ fn alter_space() { with_variable("alter_space_test.global.db-tlog", |log_name| { let uuid; { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - uuid = init_space(&global, &mut driver, "myspace", "{}"); + let global = TestGlobal::new_with_vfs_driver(log_name); + uuid = init_space(&global, "myspace", "{}"); let stmt = lex_insecure("alter space myspace with { env: { SAYAN_MAX: 65536 } }".as_bytes()) .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Space::transactional_exec_alter(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap(); + Space::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); assert_eq!( global.namespace().spaces().read().get("myspace").unwrap(), &Space::new_restore_empty( @@ -138,7 +113,6 @@ fn alter_space() { uuid ) ); - driver.close().unwrap(); }) }) } @@ -147,26 +121,21 @@ fn alter_space() { fn drop_space() { with_variable("drop_space_test.global.db-tlog", |log_name| { { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - let _ = init_space(&global, &mut driver, "myspace", "{}"); + let global = TestGlobal::new_with_vfs_driver(log_name); + let _ = init_space(&global, "myspace", "{}"); let stmt = lex_insecure("drop space myspace".as_bytes()).unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Space::transactional_exec_drop(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap(); + Space::transactional_exec_drop(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); assert_eq!(global.namespace().spaces().read().get("myspace"), None); - driver.close().unwrap(); }) }) } fn init_model( global: &impl GlobalInstanceLike, - txn_driver: &mut GNSTransactionDriverVFS, space_name: &str, model_name: &str, decl: &str, @@ -175,20 +144,16 @@ fn init_model( let stmt = lex_insecure(query.as_bytes()).unwrap(); let stmt = parse_ast_node_full::(&stmt[2..]).unwrap(); let model_name = stmt.model_name; - Model::transactional_exec_create(global, txn_driver, stmt).unwrap(); + Model::transactional_exec_create(global, stmt).unwrap(); global .namespace() .with_model(model_name, |model| Ok(model.get_uuid())) .unwrap() } -fn init_default_model( - global: &impl GlobalInstanceLike, - driver: &mut GNSTransactionDriverVFS, -) -> Uuid { +fn init_default_model(global: &impl GlobalInstanceLike) -> Uuid { init_model( global, - driver, "myspace", "mymodel", "username: string, password: binary", @@ -201,15 +166,12 @@ fn create_model() { let _uuid_space; let uuid_model; { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - _uuid_space = init_space(&global, &mut driver, "myspace", "{}"); - uuid_model = init_default_model(&global, &mut driver); - driver.close().unwrap(); + let global = TestGlobal::new_with_vfs_driver(log_name); + _uuid_space = init_space(&global, "myspace", "{}"); + uuid_model = init_default_model(&global); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); global .namespace() .with_model(("myspace", "mymodel"), |model| { @@ -228,7 +190,6 @@ fn create_model() { Ok(()) }) .unwrap(); - driver.close().unwrap(); }) }) } @@ -237,21 +198,18 @@ fn create_model() { fn alter_model_add() { with_variable("alter_model_add_test.global.db-tlog", |log_name| { { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - init_space(&global, &mut driver, "myspace", "{}"); - init_default_model(&global, &mut driver); + let global = TestGlobal::new_with_vfs_driver(log_name); + init_space(&global, "myspace", "{}"); + init_default_model(&global); let stmt = lex_insecure( b"alter model myspace.mymodel add profile_pic { type: binary, nullable: true }", ) .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap(); + Model::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); global .namespace() .with_model(("myspace", "mymodel"), |model| { @@ -266,7 +224,6 @@ fn alter_model_add() { Ok(()) }) .unwrap(); - driver.close().unwrap(); }) }) } @@ -275,12 +232,10 @@ fn alter_model_add() { fn alter_model_remove() { with_variable("alter_model_remove_test.global.db-tlog", |log_name| { { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - init_space(&global, &mut driver, "myspace", "{}"); + let global = TestGlobal::new_with_vfs_driver(log_name); + init_space(&global, "myspace", "{}"); init_model( &global, - &mut driver, "myspace", "mymodel", "username: string, password: binary, null profile_pic: binary, null has_2fa: bool, null has_secure_key: bool, is_dumb: bool", @@ -290,12 +245,10 @@ fn alter_model_remove() { ) .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap() + Model::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); global .namespace() .with_model(("myspace", "mymodel"), |model| { @@ -305,7 +258,6 @@ fn alter_model_remove() { Ok(()) }) .unwrap(); - driver.close().unwrap() }) }) } @@ -314,12 +266,10 @@ fn alter_model_remove() { fn alter_model_update() { with_variable("alter_model_update_test.global.db-tlog", |log_name| { { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - init_space(&global, &mut driver, "myspace", "{}"); + let global = TestGlobal::new_with_vfs_driver(log_name); + init_space(&global, "myspace", "{}"); init_model( &global, - &mut driver, "myspace", "mymodel", "username: string, password: binary, profile_pic: binary", @@ -328,12 +278,10 @@ fn alter_model_update() { lex_insecure(b"alter model myspace.mymodel update profile_pic { nullable: true }") .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap(); + Model::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); global .namespace() .with_model(("myspace", "mymodel"), |model| { @@ -348,7 +296,6 @@ fn alter_model_update() { Ok(()) }) .unwrap(); - driver.close().unwrap(); }) }) } @@ -357,18 +304,15 @@ fn alter_model_update() { fn drop_model() { with_variable("drop_model_test.global.db-tlog", |log_name| { { - let global = TestGlobal::empty(); - let mut driver = init_txn_driver(&global, log_name); - init_space(&global, &mut driver, "myspace", "{}"); - init_default_model(&global, &mut driver); + let global = TestGlobal::new_with_vfs_driver(log_name); + init_space(&global, "myspace", "{}"); + init_default_model(&global); let stmt = lex_insecure(b"drop model myspace.mymodel").unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_drop(&global, &mut driver, stmt).unwrap(); - driver.close().unwrap(); + Model::transactional_exec_drop(&global, stmt).unwrap(); } multirun(|| { - let global = TestGlobal::empty(); - let driver = init_txn_driver(&global, log_name); + let global = TestGlobal::new_with_vfs_driver(log_name); assert_eq!( global .namespace() @@ -376,7 +320,6 @@ fn drop_model() { .unwrap_err(), DatabaseError::DdlModelNotFound ); - driver.close().unwrap(); }) }) }