Simplify gns txn driver handling

next
Sayan Nandan 1 year ago
parent cf055f418d
commit 1abe4a7217
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -249,9 +249,8 @@ impl<'a> AlterPlan<'a> {
}
impl Model {
pub fn transactional_exec_alter<GI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
pub fn transactional_exec_alter<G: GlobalInstanceLike>(
global: &G,
alter: AlterModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = EntityLocator::parse_entity(alter.model)?;
@ -273,14 +272,14 @@ impl Model {
AlterAction::Add(new_fields) => {
let mut guard = model.delta_state().schema_delta_write();
// TODO(@ohsayan): this impacts lockdown duration; fix it
if GI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelAddTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&new_fields,
);
// commit txn
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
new_fields
.stseq_ord_kv()
@ -294,14 +293,14 @@ impl Model {
}
AlterAction::Remove(removed) => {
let mut guard = model.delta_state().schema_delta_write();
if GI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelRemoveTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&removed,
);
// commit txn
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
removed.iter().for_each(|field_id| {
model.delta_state().schema_append_unresolved_wl_field_rem(
@ -312,14 +311,14 @@ impl Model {
});
}
AlterAction::Update(updated) => {
if GI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelUpdateTxn::new(
gnstxn::ModelIDRef::new_ref(space_name, space, model_name, model),
&updated,
);
// commit txn
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
updated.into_iter().for_each(|(field_id, field)| {
iwm.fields_mut().st_update(&field_id, field);
@ -330,9 +329,4 @@ impl Model {
})
})
}
pub fn exec_alter(global: &impl GlobalInstanceLike, stmt: AlterModel) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| {
Self::transactional_exec_alter(global, driver, stmt)
})
}
}

@ -204,9 +204,8 @@ impl Model {
}
impl Model {
pub fn transactional_exec_create<GI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
pub fn transactional_exec_create<G: GlobalInstanceLike>(
global: &G,
stmt: CreateModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = stmt.model_name.parse_entity()?;
@ -216,9 +215,10 @@ impl Model {
if w_space.st_contains(model_name) {
return Err(DatabaseError::DdlModelAlreadyExists);
}
if GI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let irm = model.intent_read_model();
let mut txn_driver = global.namespace_txn_driver().lock();
let txn = gnstxn::CreateModelTxn::new(
gnstxn::SpaceIDRef::new(space_name, space),
model_name,
@ -233,18 +233,8 @@ impl Model {
Ok(())
})
}
#[cfg(test)]
pub fn nontransactional_exec_create(
global: &impl GlobalInstanceLike,
stmt: CreateModel,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| {
Self::transactional_exec_create(global, driver, stmt)
})
}
pub fn transactional_exec_drop<GI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<GI>,
pub fn transactional_exec_drop<G: GlobalInstanceLike>(
global: &G,
stmt: DropModel,
) -> DatabaseResult<()> {
let (space_name, model_name) = stmt.entity.parse_entity()?;
@ -253,7 +243,7 @@ impl Model {
let Some(model) = w_space.get(model_name) else {
return Err(DatabaseError::DdlModelNotFound);
};
if GI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::DropModelTxn::new(gnstxn::ModelIDRef::new(
gnstxn::SpaceIDRef::new(space_name, space),
@ -262,22 +252,13 @@ impl Model {
model.delta_state().schema_current_version().value_u64(),
));
// commit txn
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
// update global state
let _ = w_space.st_delete(model_name);
Ok(())
})
}
#[cfg(test)]
pub fn nontransactional_exec_drop(
global: &impl GlobalInstanceLike,
stmt: DropModel,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(global.namespace(), |driver| {
Self::transactional_exec_drop(global, driver, stmt)
})
}
}
/*

@ -189,9 +189,8 @@ impl Space {
}
impl Space {
pub fn transactional_exec_create<TI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<TI>,
pub fn transactional_exec_create<G: GlobalInstanceLike>(
global: &G,
space: CreateSpace,
) -> DatabaseResult<()> {
// process create
@ -202,29 +201,20 @@ impl Space {
return Err(DatabaseError::DdlSpaceAlreadyExists);
}
// commit txn
if TI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare and commit txn
let s_read = space.metadata().dict().read();
txn_driver.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?;
global
.namespace_txn_driver()
.lock()
.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?;
}
// update global state
let _ = wl.st_insert(space_name, space);
Ok(())
}
/// Execute a `create` stmt
#[cfg(test)]
pub fn nontransactional_exec_create(
global: &impl GlobalInstanceLike,
space: CreateSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(
global.namespace(),
move |driver| Self::transactional_exec_create(global, driver, space),
)
}
pub fn transactional_exec_alter<TI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<TI>,
pub fn transactional_exec_alter<G: GlobalInstanceLike>(
global: &G,
AlterSpace {
space_name,
updated_props,
@ -243,12 +233,12 @@ impl Space {
Some(patch) => patch,
None => return Err(DatabaseError::DdlSpaceBadProperty),
};
if TI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn =
gnstxn::AlterSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space), &patch);
// commit
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
// merge
dict::rmerge_data_with_patch(&mut space_props, patch);
@ -261,20 +251,8 @@ impl Space {
Ok(())
})
}
#[cfg(test)]
/// Execute a `alter` stmt
pub fn nontransactional_exec_alter(
global: &impl GlobalInstanceLike,
alter: AlterSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(
global.namespace(),
move |driver| Self::transactional_exec_alter(global, driver, alter),
)
}
pub fn transactional_exec_drop<TI: gnstxn::GNSTransactionDriverLLInterface>(
global: &impl GlobalInstanceLike,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<TI>,
pub fn transactional_exec_drop<G: GlobalInstanceLike>(
global: &G,
DropSpace { space, force: _ }: DropSpace,
) -> DatabaseResult<()> {
// TODO(@ohsayan): force remove option
@ -290,25 +268,15 @@ impl Space {
return Err(DatabaseError::DdlSpaceRemoveNonEmpty);
}
// we can remove this
if TI::NONNULL {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space));
txn_driver.try_commit(txn)?;
global.namespace_txn_driver().lock().try_commit(txn)?;
}
drop(space_w);
let _ = wgns.st_delete(space_name.as_str());
Ok(())
}
#[cfg(test)]
pub fn nontransactional_exec_drop(
global: &impl GlobalInstanceLike,
drop_space: DropSpace,
) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(
global.namespace(),
move |driver| Self::transactional_exec_drop(global, driver, drop_space),
)
}
}
#[cfg(test)]

@ -63,7 +63,7 @@ fn exec_plan(
let tok = lex_insecure(plan.as_bytes()).unwrap();
let alter = parse_ast_node_full::<AlterModel>(&tok[2..]).unwrap();
let (_space, model_name) = alter.model.into_full().unwrap();
Model::exec_alter(global, alter)?;
Model::transactional_exec_alter(global, alter)?;
let gns_read = global.namespace().spaces().read();
let space = gns_read.st_get("myspace").unwrap();
let model = space.models().read();
@ -359,7 +359,7 @@ mod exec {
};
#[test]
fn simple_add() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_plan(
&global,
true,
@ -390,7 +390,7 @@ mod exec {
}
#[test]
fn simple_remove() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_plan(
&global,
true,
@ -416,7 +416,7 @@ mod exec {
}
#[test]
fn simple_update() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_plan(
&global,
true,
@ -435,7 +435,7 @@ mod exec {
}
#[test]
fn failing_alter_nullable_switch_need_lock() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_plan(
&global,

@ -145,7 +145,7 @@ mod exec {
#[test]
fn simple() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
exec_create_new_space(
&global,
"create model myspace.mymodel(username: string, password: binary)",

@ -61,7 +61,7 @@ pub fn exec_create(
.namespace()
.test_new_empty_space(&create_model.model_name.into_full().unwrap().0);
}
Model::nontransactional_exec_create(global, create_model).map(|_| name)
Model::transactional_exec_create(global, create_model).map(|_| name)
}
pub fn exec_create_new_space(

@ -33,7 +33,7 @@ use crate::engine::{
#[test]
fn alter_add_prop_env_var() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_create_alter(
&global,
"create space myspace",
@ -54,7 +54,7 @@ fn alter_add_prop_env_var() {
#[test]
fn alter_update_prop_env_var() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
let uuid = super::exec_create(
&global,
"create space myspace with { env: { MY_NEW_PROP: 100 } }",
@ -86,7 +86,7 @@ fn alter_update_prop_env_var() {
#[test]
fn alter_remove_prop_env_var() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
let uuid = super::exec_create(
&global,
"create space myspace with { env: { MY_NEW_PROP: 100 } }",
@ -114,7 +114,7 @@ fn alter_remove_prop_env_var() {
#[test]
fn alter_nx() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_alter(
&global,
@ -128,7 +128,7 @@ fn alter_nx() {
#[test]
fn alter_remove_all_env() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
let uuid = super::exec_create(
&global,
"create space myspace with { env: { MY_NEW_PROP: 100 } }",

@ -33,7 +33,7 @@ use crate::engine::{
#[test]
fn exec_create_space_simple() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_create(&global, "create space myspace", |spc| {
assert!(spc.models().read().is_empty())
})
@ -42,7 +42,7 @@ fn exec_create_space_simple() {
#[test]
fn exec_create_space_with_env() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_create(
&global,
r#"
@ -70,7 +70,7 @@ fn exec_create_space_with_env() {
#[test]
fn exec_create_space_with_bad_env_type() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_create(&global, "create space myspace with { env: 100 }", |_| {}).unwrap_err(),
DatabaseError::DdlSpaceBadProperty
@ -79,7 +79,7 @@ fn exec_create_space_with_bad_env_type() {
#[test]
fn exec_create_space_with_random_property() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_create(
&global,

@ -47,7 +47,7 @@ fn exec_create(
let ast_node =
ast::parse_ast_node_full::<crate::engine::ql::ddl::crt::CreateSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::nontransactional_exec_create(gns, ast_node)?;
Space::transactional_exec_create(gns, ast_node)?;
gns.namespace().with_space(&name, |space| {
verify(space);
Ok(space.get_uuid())
@ -63,7 +63,7 @@ fn exec_alter(
let ast_node =
ast::parse_ast_node_full::<crate::engine::ql::ddl::alt::AlterSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::nontransactional_exec_alter(gns, ast_node)?;
Space::transactional_exec_alter(gns, ast_node)?;
gns.namespace().with_space(&name, |space| {
verify(space);
Ok(space.get_uuid())

@ -28,7 +28,7 @@ use crate::engine::{error::DatabaseError, fractal::test_utils::TestGlobal};
#[test]
fn simple_delete() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_delete(
&global,
"create model myspace.mymodel(username: string, password: string)",
@ -41,7 +41,7 @@ fn simple_delete() {
#[test]
fn delete_nonexisting() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_delete(
&global,

@ -31,7 +31,7 @@ struct Tuple(Vec<(Box<str>, Datacell)>);
#[test]
fn insert_simple() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_insert(
&global,
"create model myspace.mymodel(username: string, password: string)",
@ -46,7 +46,7 @@ fn insert_simple() {
#[test]
fn insert_with_null() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_insert(
&global,
"create model myspace.mymodel(username: string, null useless_password: string, null useless_email: string, null useless_random_column: uint64)",
@ -69,7 +69,7 @@ fn insert_with_null() {
#[test]
fn insert_duplicate() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
super::exec_insert(
&global,
"create model myspace.mymodel(username: string, password: string)",

@ -51,7 +51,7 @@ fn _exec_only_create_space_model(
}
let lex_create_model = lex_insecure(model.as_bytes()).unwrap();
let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap();
Model::nontransactional_exec_create(global, stmt_create_model)
Model::transactional_exec_create(global, stmt_create_model)
}
fn _exec_only_insert<T>(

@ -28,7 +28,7 @@ use crate::engine::{data::cell::Datacell, error::DatabaseError, fractal::test_ut
#[test]
fn simple_select_wildcard() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_select(
&global,
@ -43,7 +43,7 @@ fn simple_select_wildcard() {
#[test]
fn simple_select_specified_same_order() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_select(
&global,
@ -58,7 +58,7 @@ fn simple_select_specified_same_order() {
#[test]
fn simple_select_specified_reversed_order() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_select(
&global,
@ -73,7 +73,7 @@ fn simple_select_specified_reversed_order() {
#[test]
fn select_null() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_select(
&global,
@ -88,7 +88,7 @@ fn select_null() {
#[test]
fn select_nonexisting() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_select(
&global,

@ -30,7 +30,7 @@ use crate::engine::{
#[test]
fn simple() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,
@ -49,7 +49,7 @@ fn simple() {
#[test]
fn with_null() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,
@ -66,7 +66,7 @@ fn with_null() {
#[test]
fn with_list() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,
@ -86,7 +86,7 @@ fn with_list() {
#[test]
fn fail_operation_on_null() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,
@ -106,7 +106,7 @@ fn fail_operation_on_null() {
#[test]
fn fail_unknown_fields() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,
@ -132,7 +132,7 @@ fn fail_unknown_fields() {
#[test]
fn fail_typedef_violation() {
let global = TestGlobal::empty();
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_update(
&global,

@ -47,6 +47,9 @@ impl FractalGNSDriver {
txn_driver: Mutex::new(txn_driver),
}
}
pub fn txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<LocalFS>> {
&self.txn_driver
}
}
/// Model driver

@ -26,9 +26,12 @@
use {
super::{
core::GlobalNS, data::uuid::Uuid, storage::v1::LocalFS, txn::gns::GNSTransactionDriverAnyFS,
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{LocalFS, RawFSInterface},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::RwLock,
parking_lot::{Mutex, RwLock},
std::{collections::HashMap, mem::MaybeUninit},
tokio::sync::mpsc::unbounded_channel,
};
@ -98,10 +101,16 @@ pub unsafe fn enable_and_start_all(
/// Something that represents the global state
pub trait GlobalInstanceLike {
fn namespace(&self) -> &GlobalNS;
fn post_high_priority_task(&self, task: Task<CriticalTask>);
fn post_standard_priority_task(&self, task: Task<GenericTask>);
type FileSystem: RawFSInterface;
const FS_IS_NON_NULL: bool = Self::FileSystem::NOT_NULL;
// stat
fn get_max_delta_size(&self) -> usize;
// global namespace
fn namespace(&self) -> &GlobalNS;
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>>;
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
// default impls
fn request_batch_resolve(
&self,
@ -110,7 +119,7 @@ pub trait GlobalInstanceLike {
model_uuid: Uuid,
observed_len: usize,
) {
self.post_high_priority_task(Task::new(CriticalTask::WriteBatch(
self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
ModelUniqueID::new(space_name, model_name, model_uuid),
observed_len,
)))
@ -118,15 +127,22 @@ pub trait GlobalInstanceLike {
}
impl GlobalInstanceLike for Global {
type FileSystem = LocalFS;
// ns
fn namespace(&self) -> &GlobalNS {
self._namespace()
}
fn post_high_priority_task(&self, task: Task<CriticalTask>) {
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>> {
self.get_state().gns_driver.txn_driver()
}
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
self._post_high_priority_task(task)
}
fn post_standard_priority_task(&self, task: Task<GenericTask>) {
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>) {
self._post_standard_priority_task(task)
}
// stat
fn get_max_delta_size(&self) -> usize {
self._get_max_delta_size()
}

@ -26,43 +26,98 @@
use {
super::{CriticalTask, GenericTask, GlobalInstanceLike, Task},
crate::engine::core::GlobalNS,
parking_lot::RwLock,
crate::engine::{
core::GlobalNS,
storage::v1::{
header_meta::HostRunMode,
memfs::{NullFS, VirtualFS},
RawFSInterface,
},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::{Mutex, RwLock},
};
/// A `test` mode global implementation
pub struct TestGlobal {
pub struct TestGlobal<Fs: RawFSInterface = VirtualFS> {
gns: GlobalNS,
hp_queue: RwLock<Vec<Task<CriticalTask>>>,
lp_queue: RwLock<Vec<Task<GenericTask>>>,
max_delta_size: usize,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
}
impl TestGlobal {
pub fn empty() -> Self {
Self::with_max_delta_size(0)
}
pub fn with_max_delta_size(max_delta_size: usize) -> Self {
impl<Fs: RawFSInterface> TestGlobal<Fs> {
fn new(
gns: GlobalNS,
max_delta_size: usize,
txn_driver: GNSTransactionDriverAnyFS<Fs>,
) -> Self {
Self {
gns: GlobalNS::empty(),
gns,
hp_queue: RwLock::default(),
lp_queue: RwLock::default(),
max_delta_size,
txn_driver: Mutex::new(txn_driver),
}
}
}
impl GlobalInstanceLike for TestGlobal {
impl<Fs: RawFSInterface> TestGlobal<Fs> {
pub fn new_with_driver_id(log_name: &str) -> Self {
let gns = GlobalNS::empty();
let driver = GNSTransactionDriverAnyFS::open_or_reinit_with_name(
&gns,
log_name,
0,
HostRunMode::Prod,
0,
)
.unwrap();
Self::new(gns, 0, driver)
}
}
impl TestGlobal<VirtualFS> {
pub fn new_with_vfs_driver(log_name: &str) -> Self {
Self::new_with_driver_id(log_name)
}
}
impl TestGlobal<NullFS> {
pub fn new_with_nullfs_driver(log_name: &str) -> Self {
Self::new_with_driver_id(log_name)
}
pub fn new_with_tmp_nullfs_driver() -> Self {
Self::new_with_nullfs_driver("")
}
}
impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
type FileSystem = Fs;
fn namespace(&self) -> &GlobalNS {
&self.gns
}
fn post_high_priority_task(&self, task: Task<CriticalTask>) {
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>> {
&self.txn_driver
}
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
self.hp_queue.write().push(task)
}
fn post_standard_priority_task(&self, task: Task<GenericTask>) {
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>) {
self.lp_queue.write().push(task)
}
fn get_max_delta_size(&self) -> usize {
100
}
}
impl<Fs: RawFSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
txn_driver
.__journal_mut()
.__append_journal_close_and_close()
.unwrap();
}
}

@ -91,7 +91,7 @@ impl<E: TreeElement, C: Config> MTIndex<E, E::Key, E::Value> for Raw<E, C> {
self.len()
}
fn mt_clear(&self, g: &Guard) {
self.nontransactional_clear(g)
self.transactional_clear(g)
}
fn mt_insert(&self, e: E, g: &Guard) -> bool

@ -236,7 +236,7 @@ impl<T: TreeElement, C: Config> RawTree<T, C> {
}
impl<T: TreeElement, C: Config> RawTree<T, C> {
fn nontransactional_clear(&self, g: &Guard) {
fn transactional_clear(&self, g: &Guard) {
self.iter_key(g).for_each(|k| {
let _ = self.remove(k, g);
});

@ -470,7 +470,7 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
&JournalEntryMetadata::new(id, EventSourceMarker::DRIVER_REOPENED, 0, 0).encoded(),
)
}
pub fn append_journal_close_and_close(mut self) -> SDSSResult<()> {
pub fn __append_journal_close_and_close(&mut self) -> SDSSResult<()> {
self.closed = true;
let id = self._incr_id() as u128;
self.log_file.fsynced_write(
@ -478,6 +478,9 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
)?;
Ok(())
}
pub fn append_journal_close_and_close(mut self) -> SDSSResult<()> {
self.__append_journal_close_and_close()
}
}
impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {

@ -66,44 +66,14 @@ pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS<VirtualFS>;
const CURRENT_LOG_VERSION: u32 = 0;
pub trait GNSTransactionDriverLLInterface: RawFSInterface {
/// If true, this is an actual txn driver with a non-null (not `/dev/null` like) journal
const NONNULL: bool = <Self as RawFSInterface>::NOT_NULL;
}
impl<T: RawFSInterface> GNSTransactionDriverLLInterface for T {}
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<F: RawFSInterface = LocalFS> {
journal: JournalWriter<F, GNSAdapter>,
}
impl GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullFS> {
pub fn nullzero(gns: &GlobalNS) -> Self {
let journal = v1::open_journal(
"gns.db-tlog",
header_meta::FileSpecifier::GNSTxnLog,
header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION),
0,
header_meta::HostRunMode::Dev,
0,
gns,
)
.unwrap();
Self { journal }
}
pub fn nullzero_create_exec<T>(gns: &GlobalNS, f: impl FnOnce(&mut Self) -> T) -> T {
let mut j = Self::nullzero(gns);
let r = f(&mut j);
j.close().unwrap();
r
}
}
impl<F: GNSTransactionDriverLLInterface> GNSTransactionDriverAnyFS<F> {
pub fn close(self) -> TransactionResult<()> {
self.journal
.append_journal_close_and_close()
.map_err(|e| e.into())
impl<Fs: RawFSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
pub fn open_or_reinit_with_name(
gns: &GlobalNS,
@ -141,7 +111,7 @@ impl<F: GNSTransactionDriverLLInterface> GNSTransactionDriverAnyFS<F> {
/// the journal adapter for DDL queries on the GNS
#[derive(Debug)]
struct GNSAdapter;
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;

@ -38,8 +38,6 @@ use crate::engine::{
ddl::crt::{CreateModel, CreateSpace},
tests::lex_insecure,
},
storage::v1::header_meta::HostRunMode,
txn::gns::GNSTransactionDriverVFS,
};
fn multirun(f: impl FnOnce() + Copy) {
@ -52,28 +50,12 @@ fn with_variable<T>(var: T, f: impl FnOnce(T)) {
f(var);
}
fn init_txn_driver(global: &impl GlobalInstanceLike, log_name: &str) -> GNSTransactionDriverVFS {
GNSTransactionDriverVFS::open_or_reinit_with_name(
global.namespace(),
log_name,
0,
HostRunMode::Prod,
0,
)
.unwrap()
}
fn init_space(
global: &impl GlobalInstanceLike,
driver: &mut GNSTransactionDriverVFS,
space_name: &str,
env: &str,
) -> Uuid {
fn init_space(global: &impl GlobalInstanceLike, space_name: &str, env: &str) -> Uuid {
let query = format!("create space {space_name} with {{ env: {env} }}");
let stmt = lex_insecure(query.as_bytes()).unwrap();
let stmt = parse_ast_node_full::<CreateSpace>(&stmt[2..]).unwrap();
let name = stmt.space_name;
Space::transactional_exec_create(global, driver, stmt).unwrap();
Space::transactional_exec_create(global, stmt).unwrap();
global
.namespace()
.spaces()
@ -89,14 +71,11 @@ fn create_space() {
let uuid;
// start 1
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
uuid = init_space(&global, &mut driver, "myspace", "{ SAYAN_MAX: 65536 }"); // good lord that doesn't sound like a good variable
driver.close().unwrap();
let global = TestGlobal::new_with_vfs_driver(log_name);
uuid = init_space(&global, "myspace", "{ SAYAN_MAX: 65536 }"); // good lord that doesn't sound like a good variable
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
assert_eq!(
global.namespace().spaces().read().get("myspace").unwrap(),
&Space::new_restore_empty(
@ -106,7 +85,6 @@ fn create_space() {
uuid
)
);
driver.close().unwrap();
})
})
}
@ -116,19 +94,16 @@ fn alter_space() {
with_variable("alter_space_test.global.db-tlog", |log_name| {
let uuid;
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
uuid = init_space(&global, &mut driver, "myspace", "{}");
let global = TestGlobal::new_with_vfs_driver(log_name);
uuid = init_space(&global, "myspace", "{}");
let stmt =
lex_insecure("alter space myspace with { env: { SAYAN_MAX: 65536 } }".as_bytes())
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Space::transactional_exec_alter(&global, &mut driver, stmt).unwrap();
driver.close().unwrap();
Space::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
assert_eq!(
global.namespace().spaces().read().get("myspace").unwrap(),
&Space::new_restore_empty(
@ -138,7 +113,6 @@ fn alter_space() {
uuid
)
);
driver.close().unwrap();
})
})
}
@ -147,26 +121,21 @@ fn alter_space() {
fn drop_space() {
with_variable("drop_space_test.global.db-tlog", |log_name| {
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
let _ = init_space(&global, &mut driver, "myspace", "{}");
let global = TestGlobal::new_with_vfs_driver(log_name);
let _ = init_space(&global, "myspace", "{}");
let stmt = lex_insecure("drop space myspace".as_bytes()).unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Space::transactional_exec_drop(&global, &mut driver, stmt).unwrap();
driver.close().unwrap();
Space::transactional_exec_drop(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
assert_eq!(global.namespace().spaces().read().get("myspace"), None);
driver.close().unwrap();
})
})
}
fn init_model(
global: &impl GlobalInstanceLike,
txn_driver: &mut GNSTransactionDriverVFS,
space_name: &str,
model_name: &str,
decl: &str,
@ -175,20 +144,16 @@ fn init_model(
let stmt = lex_insecure(query.as_bytes()).unwrap();
let stmt = parse_ast_node_full::<CreateModel>(&stmt[2..]).unwrap();
let model_name = stmt.model_name;
Model::transactional_exec_create(global, txn_driver, stmt).unwrap();
Model::transactional_exec_create(global, stmt).unwrap();
global
.namespace()
.with_model(model_name, |model| Ok(model.get_uuid()))
.unwrap()
}
fn init_default_model(
global: &impl GlobalInstanceLike,
driver: &mut GNSTransactionDriverVFS,
) -> Uuid {
fn init_default_model(global: &impl GlobalInstanceLike) -> Uuid {
init_model(
global,
driver,
"myspace",
"mymodel",
"username: string, password: binary",
@ -201,15 +166,12 @@ fn create_model() {
let _uuid_space;
let uuid_model;
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
_uuid_space = init_space(&global, &mut driver, "myspace", "{}");
uuid_model = init_default_model(&global, &mut driver);
driver.close().unwrap();
let global = TestGlobal::new_with_vfs_driver(log_name);
_uuid_space = init_space(&global, "myspace", "{}");
uuid_model = init_default_model(&global);
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.with_model(("myspace", "mymodel"), |model| {
@ -228,7 +190,6 @@ fn create_model() {
Ok(())
})
.unwrap();
driver.close().unwrap();
})
})
}
@ -237,21 +198,18 @@ fn create_model() {
fn alter_model_add() {
with_variable("alter_model_add_test.global.db-tlog", |log_name| {
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
init_space(&global, &mut driver, "myspace", "{}");
init_default_model(&global, &mut driver);
let global = TestGlobal::new_with_vfs_driver(log_name);
init_space(&global, "myspace", "{}");
init_default_model(&global);
let stmt = lex_insecure(
b"alter model myspace.mymodel add profile_pic { type: binary, nullable: true }",
)
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap();
driver.close().unwrap();
Model::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.with_model(("myspace", "mymodel"), |model| {
@ -266,7 +224,6 @@ fn alter_model_add() {
Ok(())
})
.unwrap();
driver.close().unwrap();
})
})
}
@ -275,12 +232,10 @@ fn alter_model_add() {
fn alter_model_remove() {
with_variable("alter_model_remove_test.global.db-tlog", |log_name| {
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
init_space(&global, &mut driver, "myspace", "{}");
let global = TestGlobal::new_with_vfs_driver(log_name);
init_space(&global, "myspace", "{}");
init_model(
&global,
&mut driver,
"myspace",
"mymodel",
"username: string, password: binary, null profile_pic: binary, null has_2fa: bool, null has_secure_key: bool, is_dumb: bool",
@ -290,12 +245,10 @@ fn alter_model_remove() {
)
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap();
driver.close().unwrap()
Model::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.with_model(("myspace", "mymodel"), |model| {
@ -305,7 +258,6 @@ fn alter_model_remove() {
Ok(())
})
.unwrap();
driver.close().unwrap()
})
})
}
@ -314,12 +266,10 @@ fn alter_model_remove() {
fn alter_model_update() {
with_variable("alter_model_update_test.global.db-tlog", |log_name| {
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
init_space(&global, &mut driver, "myspace", "{}");
let global = TestGlobal::new_with_vfs_driver(log_name);
init_space(&global, "myspace", "{}");
init_model(
&global,
&mut driver,
"myspace",
"mymodel",
"username: string, password: binary, profile_pic: binary",
@ -328,12 +278,10 @@ fn alter_model_update() {
lex_insecure(b"alter model myspace.mymodel update profile_pic { nullable: true }")
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, &mut driver, stmt).unwrap();
driver.close().unwrap();
Model::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.with_model(("myspace", "mymodel"), |model| {
@ -348,7 +296,6 @@ fn alter_model_update() {
Ok(())
})
.unwrap();
driver.close().unwrap();
})
})
}
@ -357,18 +304,15 @@ fn alter_model_update() {
fn drop_model() {
with_variable("drop_model_test.global.db-tlog", |log_name| {
{
let global = TestGlobal::empty();
let mut driver = init_txn_driver(&global, log_name);
init_space(&global, &mut driver, "myspace", "{}");
init_default_model(&global, &mut driver);
let global = TestGlobal::new_with_vfs_driver(log_name);
init_space(&global, "myspace", "{}");
init_default_model(&global);
let stmt = lex_insecure(b"drop model myspace.mymodel").unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_drop(&global, &mut driver, stmt).unwrap();
driver.close().unwrap();
Model::transactional_exec_drop(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::empty();
let driver = init_txn_driver(&global, log_name);
let global = TestGlobal::new_with_vfs_driver(log_name);
assert_eq!(
global
.namespace()
@ -376,7 +320,6 @@ fn drop_model() {
.unwrap_err(),
DatabaseError::DdlModelNotFound
);
driver.close().unwrap();
})
})
}

Loading…
Cancel
Save