Wire up queries to SE

next
Sayan Nandan 1 year ago
parent 9102834623
commit fac2802849
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -40,7 +40,7 @@ use {
uuid::Uuid,
},
error::{Error, QueryResult},
fractal::GlobalInstanceLike,
fractal::{GenericTask, GlobalInstanceLike, Task},
idx::{IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
mem::VInline,
ql::ddl::{
@ -216,17 +216,38 @@ impl Model {
return Err(Error::QPDdlObjectAlreadyExists);
}
if G::FS_IS_NON_NULL {
// prepare txn
let irm = model.intent_read_model();
let mut txn_driver = global.namespace_txn_driver().lock();
// prepare txn
let txn = gnstxn::CreateModelTxn::new(
gnstxn::SpaceIDRef::new(space_name, space),
model_name,
&model,
&irm,
);
// attempt to initialize driver
global.initialize_model_driver(
space_name,
space.get_uuid(),
model_name,
model.get_uuid(),
)?;
// commit txn
txn_driver.try_commit(txn)?;
match txn_driver.try_commit(txn) {
Ok(()) => {}
Err(e) => {
// failed to commit, delete this
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_model_dir(
space_name,
space.get_uuid(),
model_name,
model.get_uuid(),
),
));
return Err(e.into());
}
}
}
// update global state
let _ = w_space.st_insert(model_name.into(), model);
@ -253,6 +274,13 @@ impl Model {
));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
// ask for cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name,
space.get_uuid(),
model_name,
model.get_uuid(),
)));
}
// update global state
let _ = w_space.st_delete(model_name);

@ -29,9 +29,10 @@ use {
core::{model::Model, RWLIdx},
data::{dict, uuid::Uuid, DictEntryGeneric, DictGeneric},
error::{Error, QueryResult},
fractal::GlobalInstanceLike,
fractal::{GenericTask, GlobalInstanceLike, Task},
idx::{IndexST, STIndex},
ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace},
storage::v1::{loader::SEInitState, RawFSInterface},
txn::gns as gnstxn,
},
parking_lot::RwLock,
@ -203,12 +204,25 @@ impl Space {
}
// commit txn
if G::FS_IS_NON_NULL {
// prepare and commit txn
// prepare txn
let s_read = space.metadata().dict().read();
global
.namespace_txn_driver()
.lock()
.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?;
let txn = gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space);
// try to create space for...the space
G::FileSystem::fs_create_dir_all(&SEInitState::space_dir(
&space_name,
space.get_uuid(),
))?;
// commit txn
match global.namespace_txn_driver().lock().try_commit(txn) {
Ok(()) => {}
Err(e) => {
// tell fractal to clean it up sometime
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
));
return Err(e.into());
}
}
}
// update global state
let _ = wl.st_insert(space_name, space);
@ -272,7 +286,13 @@ impl Space {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
// ask for cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
&space_name,
space.get_uuid(),
)));
}
drop(space_w);
let _ = wgns.st_delete(space_name.as_str());

@ -27,7 +27,10 @@
use {
super::ModelUniqueID,
crate::{
engine::core::model::{delta::DataDelta, Model},
engine::{
core::model::{delta::DataDelta, Model},
data::uuid::Uuid,
},
util::os,
},
std::path::PathBuf,
@ -64,6 +67,28 @@ pub enum GenericTask {
DeleteDirAll(PathBuf),
}
impl GenericTask {
pub fn delete_model_dir(
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> Self {
Self::DeleteDirAll(
crate::engine::storage::v1::loader::SEInitState::model_dir(
space_name, space_uuid, model_name, model_uuid,
)
.into(),
)
}
pub fn delete_space_dir(space_name: &str, space_uuid: Uuid) -> Self {
Self::DeleteDirAll(
crate::engine::storage::v1::loader::SEInitState::space_dir(space_name, space_uuid)
.into(),
)
}
}
/// A critical task
pub enum CriticalTask {
/// Write a new data batch

@ -28,7 +28,7 @@ use {
super::{
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{LocalFS, RawFSInterface},
storage::v1::{LocalFS, RawFSInterface, SDSSResult},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::{Mutex, RwLock},
@ -108,6 +108,14 @@ pub trait GlobalInstanceLike {
// global namespace
fn namespace(&self) -> &GlobalNS;
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>>;
// model drivers
fn initialize_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> SDSSResult<()>;
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
@ -152,6 +160,16 @@ impl GlobalInstanceLike for Global {
fn sys_cfg(&self) -> &config::SysConfig {
&self.get_state().config
}
// model
fn initialize_model_driver(
&self,
_space_name: &str,
_space_uuid: Uuid,
_model_name: &str,
_model_uuid: Uuid,
) -> SDSSResult<()> {
todo!()
}
}
#[derive(Debug)]

@ -115,6 +115,15 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
fn sys_cfg(&self) -> &super::config::SysConfig {
&self.sys_cfg
}
fn initialize_model_driver(
&self,
_space_name: &str,
_space_uuid: crate::engine::data::uuid::Uuid,
_model_name: &str,
_model_uuid: crate::engine::data::uuid::Uuid,
) -> crate::engine::storage::v1::SDSSResult<()> {
todo!()
}
}
impl<Fs: RawFSInterface> Drop for TestGlobal<Fs> {

@ -92,12 +92,26 @@ impl SEInitState {
}
Ok(SEInitState::new(gns_txn_driver, model_drivers, gns))
}
fn model_path(
pub fn model_path(
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> String {
format!("data/{space_name}-{space_uuid}/{model_name}-{model_uuid}/data.db-btlog")
format!(
"{}/data.db-btlog",
Self::model_dir(space_name, space_uuid, model_name, model_uuid)
)
}
pub fn model_dir(
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> String {
format!("data/{space_name}-{space_uuid}/mdl_{model_name}-{model_uuid}")
}
pub fn space_dir(space_name: &str, space_uuid: Uuid) -> String {
format!("data/{space_name}-{space_uuid}")
}
}

@ -29,7 +29,7 @@ mod header_impl;
// impls
mod batch_jrnl;
mod journal;
mod loader;
pub(in crate::engine) mod loader;
mod rw;
mod sysdb;
// hl

Loading…
Cancel
Save