Keep driver and data in same container

next
Sayan Nandan 7 months ago
parent 0573e80992
commit 982898b819
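This commit removes the separate driver registries (the standalone gns_driver Mutex and the ModelDrivers map) and keeps each driver in the same container as the data it persists: GlobalNS now wraps a GNSData plus a FractalGNSDriver, and Model wraps a ModelData plus a FractalModelDriver. For orientation, below is a simplified standalone sketch of the new GlobalNS shape; GNSDriver and GNSData here are unit-struct stand-ins and std::sync::Mutex replaces parking_lot, so this illustrates the pattern rather than the real definitions.

use std::sync::Mutex;

struct GNSDriver;                 // stand-in for the storage-layer GNS transaction driver
struct GNSData;                   // stand-in for the in-memory namespace data (spaces, models, users)

struct FractalGNSDriver {
    txn_driver: Mutex<GNSDriver>, // the driver now lives behind a Mutex inside the container
}

impl FractalGNSDriver {
    fn new(txn_driver: GNSDriver) -> Self {
        Self { txn_driver: Mutex::new(txn_driver) }
    }
    // takes &self: the lock is acquired internally, so callers no longer call .lock() themselves
    fn driver_context<T>(&self, f: impl Fn(&mut GNSDriver) -> T) -> T {
        let mut drv = self.txn_driver.lock().unwrap();
        f(&mut *drv)
    }
}

// data and its persistence driver travel together
struct GlobalNS {
    data: GNSData,
    driver: FractalGNSDriver,
}

impl GlobalNS {
    fn new(data: GNSData, driver: FractalGNSDriver) -> Self {
        Self { data, driver }
    }
    fn namespace(&self) -> &GNSData {
        &self.data
    }
    fn gns_driver(&self) -> &FractalGNSDriver {
        &self.driver
    }
}

fn main() {
    let gns = GlobalNS::new(GNSData, FractalGNSDriver::new(GNSDriver));
    // before this commit: global.state().sys_db() and global.gns_driver().lock().driver_context(..)
    // after this commit:  global.state().namespace().sys_db() and global.state().gns_driver().driver_context(..)
    let _data: &GNSData = gns.namespace();
    gns.gns_driver().driver_context(|_drv| { /* commit a transaction event here */ });
}

Because driver_context now takes &self and locks internally, call sites throughout the diff drop the explicit .lock(), and data lookups gain one hop through namespace().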

@ -62,6 +62,7 @@ fn alter_user(
let (username, password) = get_user_data(user)?;
global
.state()
.namespace()
.sys_db()
.alter_user(global, &username, &password)
}
@ -70,6 +71,7 @@ fn create_user(global: &impl GlobalInstanceLike, user: UserDecl) -> QueryResult<
let (username, password) = get_user_data(user)?;
global
.state()
.namespace()
.sys_db()
.create_user(global, username.into_boxed_str(), &password)
}
@ -99,6 +101,7 @@ fn drop_user(
}
global
.state()
.namespace()
.sys_db()
.drop_user(global, user_del.username())
}

@ -39,7 +39,7 @@ pub fn inspect(
let ret = match stmt {
Inspect::Global => {
// collect spaces
let spaces = g.state().idx().read();
let spaces = g.state().namespace().idx().read();
let mut spaces_iter = spaces.iter().peekable();
let mut ret = format!("{{\"spaces\":[");
while let Some((space, _)) = spaces_iter.next() {
@ -56,7 +56,7 @@ pub fn inspect(
drop(spaces_iter);
drop(spaces);
// collect users
let users = g.state().sys_db().users().read();
let users = g.state().namespace().sys_db().users().read();
let mut users_iter = users.iter().peekable();
while let Some((user, _)) = users_iter.next() {
ret.push('"');
@ -70,15 +70,18 @@ pub fn inspect(
ret.push_str("],\"settings\":{}}");
ret
}
Inspect::Model(m) => match g.state().idx_models().read().get(&m) {
Some(m) => format!(
Inspect::Model(m) => match g.state().namespace().idx_models().read().get(&m) {
Some(m) => {
let m = m.data();
format!(
"{{\"decl\":\"{}\",\"rows\":{},\"properties\":{{}}}}",
m.describe(),
m.primary_index().count()
),
)
}
None => return Err(QueryError::QExecObjectNotFound),
},
Inspect::Space(s) => match g.state().idx().read().get(s.as_str()) {
Inspect::Space(s) => match g.state().namespace().idx().read().get(s.as_str()) {
Some(s) => {
let mut ret = format!("{{\"models\":[");
let mut models_iter = s.models().iter().peekable();

@ -29,7 +29,7 @@ use crate::engine::{
self,
dml::QueryExecMeta,
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DataDeltaKind, Model},
model::{delta::DataDeltaKind, ModelData},
},
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
@ -67,7 +67,7 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer
// TODO(@ohsayan): optimize null case
fn prepare_insert(
model: &Model,
model: &ModelData,
insert: InsertData,
) -> QueryResult<(PrimaryIndexKey, DcFieldIndex)> {
let fields = model.fields();

@ -31,7 +31,7 @@ mod upd;
use crate::{
engine::{
core::model::Model,
core::model::ModelData,
data::{lit::Lit, tag::DataTag},
error::{QueryError, QueryResult},
ql::dml::WhereClause,
@ -53,7 +53,7 @@ pub use {
upd::update_resp,
};
impl Model {
impl ModelData {
pub(self) fn resolve_where<'a>(
&self,
where_clause: &mut WhereClause<'a>,

@ -29,7 +29,7 @@ use crate::engine::{
index::{
DcFieldIndex, IndexLatchHandleExclusive, PrimaryIndexKey, Row, RowData, RowDataLck,
},
model::Model,
model::ModelData,
},
data::{
cell::{Datacell, VirtualDatacell},
@ -91,10 +91,10 @@ pub fn select_all<Fm, F, T>(
mut f: F,
) -> QueryResult<usize>
where
Fm: FnMut(&mut T, &Model, usize),
Fm: FnMut(&mut T, &ModelData, usize),
F: FnMut(&mut T, &Datacell, usize),
{
global.state().with_model(select.entity, |mdl| {
global.state().namespace().with_model(select.entity, |mdl| {
let g = sync::atm::cpin();
let mut i = 0;
if select.wildcard {
@ -181,7 +181,10 @@ pub fn select_custom<F>(
where
F: FnMut(&Datacell),
{
global.state().with_model(select.entity(), |mdl| {
global
.state()
.namespace()
.with_model(select.entity(), |mdl| {
let target_key = mdl.resolve_where(select.clauses_mut())?;
let pkdc = VirtualDatacell::new(target_key.clone(), mdl.p_tag().tag_unique());
let g = sync::atm::cpin();
@ -214,14 +217,14 @@ where
struct RowIteratorAll<'g> {
_g: &'g sync::atm::Guard,
mdl: &'g Model,
mdl: &'g ModelData,
iter: <IndexMTRaw<Row> as MTIndexExt<Row, PrimaryIndexKey, RowDataLck>>::IterEntry<'g, 'g, 'g>,
_latch: IndexLatchHandleExclusive<'g>,
limit: usize,
}
impl<'g> RowIteratorAll<'g> {
fn new(g: &'g sync::atm::Guard, mdl: &'g Model, limit: usize) -> Self {
fn new(g: &'g sync::atm::Guard, mdl: &'g ModelData, limit: usize) -> Self {
let idx = mdl.primary_index();
let latch = idx.acquire_exclusive();
Self {

@ -25,7 +25,7 @@
*/
use crate::engine::{
core::{ddl_misc, dml, model::Model, space::Space},
core::{ddl_misc, dml, model::ModelData, space::Space},
error::{QueryError, QueryResult},
fractal::{Global, GlobalInstanceLike},
net::protocol::{ClientLocalState, Response, ResponseType, SQuery},
@ -152,14 +152,25 @@ async fn run_blocking_stmt(
_callgs_map(
&g,
t,
Model::transactional_exec_create,
ModelData::transactional_exec_create,
translate_ddl_result,
)
},
|g, _, t| _callgs_map(&g, t, Space::transactional_exec_alter, |_| Response::Empty),
|g, _, t| _callgs_map(&g, t, Model::transactional_exec_alter, |_| Response::Empty),
|g, _, t| {
_callgs_map(&g, t, ModelData::transactional_exec_alter, |_| {
Response::Empty
})
},
|g, _, t| _callgs_map(&g, t, Space::transactional_exec_drop, translate_ddl_result),
|g, _, t| _callgs_map(&g, t, Model::transactional_exec_drop, translate_ddl_result),
|g, _, t| {
_callgs_map(
&g,
t,
ModelData::transactional_exec_drop,
translate_ddl_result,
)
},
];
let r = unsafe {
// UNSAFE(@ohsayan): the only await is within this block
@ -201,7 +212,11 @@ fn cstate_use(
NB: just like SQL, we don't really care about what this is set to as it's basically a shorthand.
so we do a simple sanity check
*/
if !global.state().contains_space(new_space.as_str()) {
if !global
.state()
.namespace()
.contains_space(new_space.as_str())
{
return Err(QueryError::QExecObjectNotFound);
}
cstate.set_cs(new_space.boxed_str());
@ -209,7 +224,7 @@ fn cstate_use(
Use::RefreshCurrent => match cstate.get_cs() {
None => return Ok(Response::Null),
Some(space) => {
if !global.state().contains_space(space) {
if !global.state().namespace().contains_space(space) {
cstate.unset_cs();
return Err(QueryError::QExecObjectNotFound);
}

@ -40,13 +40,17 @@ mod util;
pub(super) mod tests;
// re-exports
pub use self::util::{EntityID, EntityIDRef};
// imports
use {
self::{dml::QueryExecMeta, model::Model},
super::fractal::GlobalInstanceLike,
self::{
dml::QueryExecMeta,
model::{Model, ModelData},
},
crate::engine::{
core::space::Space,
error::{QueryError, QueryResult},
fractal::{FractalGNSDriver, GlobalInstanceLike},
idx::IndexST,
},
parking_lot::RwLock,
@ -57,14 +61,32 @@ use {
/// but something better is in the offing
type RWLIdx<K, V> = RwLock<IndexST<K, V>>;
#[cfg_attr(test, derive(Debug))]
#[derive(Debug)]
pub struct GlobalNS {
data: GNSData,
driver: FractalGNSDriver,
}
impl GlobalNS {
pub fn new(data: GNSData, driver: FractalGNSDriver) -> Self {
Self { data, driver }
}
pub fn namespace(&self) -> &GNSData {
&self.data
}
pub fn gns_driver(&self) -> &FractalGNSDriver {
&self.driver
}
}
#[derive(Debug)]
pub struct GNSData {
idx_mdl: RWLIdx<EntityID, Model>,
idx: RWLIdx<Box<str>, Space>,
sys_db: system_db::SystemDatabase,
}
impl GlobalNS {
impl GNSData {
pub fn empty() -> Self {
Self {
idx_mdl: RWLIdx::default(),
@ -104,7 +126,7 @@ impl GlobalNS {
f: F,
) -> QueryResult<T>
where
F: FnOnce(&Space, &mut Model) -> QueryResult<T>,
F: FnOnce(&Space, &mut ModelData) -> QueryResult<T>,
{
let mut mdl_idx = self.idx_mdl.write();
let Some(model) = mdl_idx.get_mut(&entity) else {
@ -112,17 +134,17 @@ impl GlobalNS {
};
let space_read = self.idx.read();
let space = space_read.get(entity.space()).unwrap();
f(space, model)
f(space, model.data_mut())
}
pub fn with_model<'a, T, F>(&self, entity: EntityIDRef<'a>, f: F) -> QueryResult<T>
where
F: FnOnce(&Model) -> QueryResult<T>,
F: FnOnce(&ModelData) -> QueryResult<T>,
{
let mdl_idx = self.idx_mdl.read();
let Some(model) = mdl_idx.get(&entity) else {
return Err(QueryError::QExecObjectNotFound);
};
f(model)
f(model.data())
}
pub fn idx_models(&self) -> &RWLIdx<EntityID, Model> {
&self.idx_mdl
@ -151,13 +173,19 @@ pub(self) fn with_model_for_data_update<'a, F>(
f: F,
) -> QueryResult<()>
where
F: FnOnce(&Model) -> QueryResult<QueryExecMeta>,
F: FnOnce(&ModelData) -> QueryResult<QueryExecMeta>,
{
let mdl_idx = global.state().idx_mdl.read();
let mdl_idx = global.state().namespace().idx_mdl.read();
let Some(model) = mdl_idx.get(&entity) else {
return Err(QueryError::QExecObjectNotFound);
};
let r = f(model)?;
model::DeltaState::guard_delta_overflow(global, entity.space(), entity.entity(), model, r);
let r = f(model.data())?;
model::DeltaState::guard_delta_overflow(
global,
entity.space(),
entity.entity(),
model.data(),
r,
);
Ok(())
}

@ -25,7 +25,7 @@
*/
use {
super::{Field, Layer, Model},
super::{Field, Layer, ModelData},
crate::{
engine::{
core::EntityIDRef,
@ -76,7 +76,7 @@ macro_rules! can_ignore {
}
#[inline(always)]
fn no_field(mr: &Model, new: &str) -> bool {
fn no_field(mr: &ModelData, new: &str) -> bool {
!mr.fields().st_contains(new)
}
@ -90,7 +90,7 @@ fn check_nullable(props: &mut HashMap<Box<str>, DictEntryGeneric>) -> QueryResul
impl<'a> AlterPlan<'a> {
pub fn fdeltas(
mdl: &Model,
mdl: &ModelData,
AlterModel { model, kind }: AlterModel<'a>,
) -> QueryResult<AlterPlan<'a>> {
let mut no_lock = true;
@ -245,7 +245,7 @@ impl<'a> AlterPlan<'a> {
}
}
impl Model {
impl ModelData {
pub fn transactional_exec_alter<G: GlobalInstanceLike>(
global: &G,
alter: AlterModel,
@ -253,6 +253,7 @@ impl Model {
let (space_name, model_name) = (alter.model.space(), alter.model.entity());
global
.state()
.namespace()
.with_model_space_mut_for_ddl(alter.model, |space, model| {
// prepare plan
let plan = AlterPlan::fdeltas(model, alter)?;
@ -274,8 +275,8 @@ impl Model {
);
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
let mut mutator = model.model_mutator();
new_fields
@ -293,8 +294,8 @@ impl Model {
);
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
@ -309,8 +310,8 @@ impl Model {
);
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {

@ -25,7 +25,7 @@
*/
use {
super::Model,
super::ModelData,
crate::engine::{
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
@ -74,7 +74,7 @@ impl DeltaState {
global: &impl GlobalInstanceLike,
space_name: &str,
model_name: &str,
model: &Model,
model: &ModelData,
hint: QueryExecMeta,
) {
global.request_batch_resolve_if_cache_full(space_name, model_name, model, hint)

@ -36,7 +36,7 @@ use {
uuid::Uuid,
},
error::{QueryError, QueryResult},
fractal::{GenericTask, GlobalInstanceLike, Task},
fractal::{FractalModelDriver, GenericTask, GlobalInstanceLike, Task},
idx::{self, IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
mem::{RawStr, VInline},
ql::ddl::{
@ -56,6 +56,30 @@ type Fields = IndexSTSeqCns<RawStr, Field>;
#[derive(Debug)]
pub struct Model {
data: ModelData,
driver: FractalModelDriver,
}
impl Model {
pub fn new(data: ModelData, driver: FractalModelDriver) -> Self {
Self { data, driver }
}
pub fn data(&self) -> &ModelData {
&self.data
}
pub fn data_mut(&mut self) -> &mut ModelData {
&mut self.data
}
pub fn driver(&self) -> &FractalModelDriver {
&self.driver
}
pub fn into_driver(self) -> FractalModelDriver {
self.driver
}
}
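The same containment is applied to models: Model is now a thin wrapper over ModelData (schema, indexes, delta state) plus its FractalModelDriver, with call sites going through data()/data_mut() and shutdown paths taking the driver back out via into_driver(). Below is a standalone sketch of how usage changes; the row_count field and primary_index_count method are invented stand-ins for the real primary index, only there to keep the example runnable.

struct ModelData {
    row_count: usize, // stand-in for the real primary index and delta state
}

impl ModelData {
    fn primary_index_count(&self) -> usize {
        self.row_count
    }
}

struct FractalModelDriver; // stand-in for the batch persistence driver

struct Model {
    data: ModelData,
    driver: FractalModelDriver,
}

impl Model {
    fn new(data: ModelData, driver: FractalModelDriver) -> Self {
        Self { data, driver }
    }
    fn data(&self) -> &ModelData {
        &self.data
    }
    fn data_mut(&mut self) -> &mut ModelData {
        &mut self.data
    }
    fn driver(&self) -> &FractalModelDriver {
        &self.driver
    }
    fn into_driver(self) -> FractalModelDriver {
        self.driver
    }
}

fn main() {
    let mut mdl = Model::new(ModelData { row_count: 0 }, FractalModelDriver);
    // previously call sites touched the model directly (e.g. mdl.primary_index().count());
    // the same state now sits one hop away behind data()/data_mut()
    mdl.data_mut().row_count = 1;
    assert_eq!(mdl.data().primary_index_count(), 1);
    {
        // borrow the driver, e.g. for a batch flush
        let _drv: &FractalModelDriver = mdl.driver();
    }
    // shutdown and drop paths take the driver back out of the container
    let _driver: FractalModelDriver = mdl.into_driver();
}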
#[derive(Debug)]
pub struct ModelData {
uuid: Uuid,
p_key: RawStr,
p_tag: FullTag,
@ -67,7 +91,7 @@ pub struct Model {
}
#[cfg(test)]
impl PartialEq for Model {
impl PartialEq for ModelData {
fn eq(&self, m: &Self) -> bool {
self.uuid == m.uuid
&& self.p_key == m.p_key
@ -76,7 +100,7 @@ impl PartialEq for Model {
}
}
impl Model {
impl ModelData {
pub fn get_uuid(&self) -> Uuid {
self.uuid
}
@ -153,7 +177,7 @@ impl Model {
}
}
impl Model {
impl ModelData {
fn new_with_private(
uuid: Uuid,
p_key: RawStr,
@ -260,7 +284,7 @@ impl Model {
}
}
impl Model {
impl ModelData {
pub fn transactional_exec_create<G: GlobalInstanceLike>(
global: &G,
stmt: CreateModel,
@ -268,7 +292,10 @@ impl Model {
let (space_name, model_name) = (stmt.model_name.space(), stmt.model_name.entity());
let if_nx = stmt.if_not_exists;
let model = Self::process_create(stmt)?;
global.state().ddl_with_space_mut(&space_name, |space| {
global
.state()
.namespace()
.ddl_with_space_mut(&space_name, |space| {
// TODO(@ohsayan): be extra cautious with post-transactional tasks (memck)
if space.models().contains(model_name) {
if if_nx {
@ -278,7 +305,6 @@ impl Model {
}
}
// since we've locked this down, no one else can concurrently create another model in the same space (or remove one)
let mut txn_driver = global.gns_driver().lock();
// prepare txn
let txn = gns::model::CreateModelTxn::new(
SpaceIDRef::new(&space_name, &space),
@ -286,31 +312,32 @@ impl Model {
&model,
);
// attempt to initialize driver
global.initialize_model_driver(
let mdl_driver = global.initialize_model_driver(
&space_name,
space.get_uuid(),
&model_name,
model.get_uuid(),
)?;
// commit txn
txn_driver.driver_context(
global.state().gns_driver().driver_context(
|drv| drv.commit_event(txn),
|| {
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_model_dir(
&space_name,
space.get_uuid(),
&model_name,
model.get_uuid(),
)))
),
))
},
)?;
// update global state
let _ = space.models_mut().insert(model_name.into());
let _ = global
.state()
.idx_models()
.write()
.insert(EntityID::new(&space_name, &model_name), model);
let _ = global.state().namespace().idx_models().write().insert(
EntityID::new(&space_name, &model_name),
Model::new(model, mdl_driver),
);
if if_nx {
Ok(Some(true))
} else {
@ -323,7 +350,10 @@ impl Model {
stmt: DropModel,
) -> QueryResult<Option<bool>> {
let (space_name, model_name) = (stmt.entity.space(), stmt.entity.entity());
global.state().ddl_with_space_mut(&space_name, |space| {
global
.state()
.namespace()
.ddl_with_space_mut(&space_name, |space| {
if !space.models().contains(model_name) {
if stmt.if_exists {
return Ok(Some(false));
@ -333,12 +363,12 @@ impl Model {
}
}
// get exclusive lock on models
let mut models_idx = global.state().idx_models().write();
let mut models_idx = global.state().namespace().idx_models().write();
let model = models_idx
.get(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
// the model must be empty for us to clean it up! (NB: consistent view + EX)
if (model.primary_index().count() != 0) & !(stmt.force) {
if (model.data.primary_index().count() != 0) & !(stmt.force) {
// nope, we can't drop this
return Err(QueryError::QExecDdlNotEmpty);
}
@ -347,21 +377,24 @@ impl Model {
let txn = gns::model::DropModelTxn::new(ModelIDRef::new(
SpaceIDRef::new(&space_name, &space),
&model_name,
model.get_uuid(),
model.delta_state().schema_current_version().value_u64(),
model.data.get_uuid(),
model
.data
.delta_state()
.schema_current_version()
.value_u64(),
));
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
// request cleanup
global.purge_model_driver(
space_name,
space.get_uuid(),
model_name,
model.get_uuid(),
false,
model.data().get_uuid(),
);
// update global state
let _ = models_idx.remove(&EntityIDRef::new(&space_name, &model_name));
@ -429,7 +462,7 @@ impl ModelPrivate {
}
pub struct ModelMutator<'a> {
model: &'a mut Model,
model: &'a mut ModelData,
}
impl<'a> ModelMutator<'a> {

@ -24,11 +24,8 @@
*
*/
use crate::engine::storage::safe_interfaces::{paths_v1, FileSystem};
use super::EntityIDRef;
use {
super::EntityIDRef,
crate::engine::{
data::{dict, uuid::Uuid, DictEntryGeneric, DictGeneric},
error::{QueryError, QueryResult},
@ -157,7 +154,7 @@ impl Space {
if_not_exists,
} = Self::process_create(space)?;
// lock the global namespace
global.state().ddl_with_spaces_write(|spaces| {
global.state().namespace().ddl_with_spaces_write(|spaces| {
if spaces.st_contains(&space_name) {
if if_not_exists {
return Ok(Some(false));
@ -169,9 +166,9 @@ impl Space {
// prepare txn
let txn = txn::gns::space::CreateSpaceTxn::new(space.props(), &space_name, &space);
// try to create space for...the space
FileSystem::create_dir_all(&paths_v1::space_dir(&space_name, space.get_uuid()))?;
global.initialize_space(&space_name, space.get_uuid())?;
// commit txn
global.gns_driver().lock().driver_context(
global.state().gns_driver().driver_context(
|drv| drv.commit_event(txn),
|| {
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
@ -197,7 +194,10 @@ impl Space {
updated_props,
}: AlterSpace,
) -> QueryResult<()> {
global.state().ddl_with_space_mut(&space_name, |space| {
global
.state()
.namespace()
.ddl_with_space_mut(&space_name, |space| {
match updated_props.get(Self::KEY_ENV) {
Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {}
Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {}
@ -210,13 +210,15 @@ impl Space {
None => return Err(QueryError::QExecDdlInvalidProperties),
};
// prepare txn
let txn =
txn::gns::space::AlterSpaceTxn::new(SpaceIDRef::new(&space_name, space), &patch);
let txn = txn::gns::space::AlterSpaceTxn::new(
SpaceIDRef::new(&space_name, space),
&patch,
);
// commit
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
// merge
dict::rmerge_data_with_patch(space.props_mut(), patch);
@ -237,7 +239,10 @@ impl Space {
}: DropSpace,
) -> QueryResult<Option<bool>> {
if force {
global.state().ddl_with_all_mut(|spaces, models| {
global
.state()
.namespace()
.ddl_with_all_mut(|spaces, models| {
let Some(space) = spaces.remove(space_name.as_str()) else {
if if_exists {
return Ok(Some(false));
@ -247,17 +252,17 @@ impl Space {
};
// commit drop
// prepare txn
let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
&space_name,
space.get_uuid(),
)));
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
));
let space_uuid = space.get_uuid();
for model in space.models.into_iter() {
let e: EntityIDRef<'static> = unsafe {
@ -269,8 +274,7 @@ impl Space {
&space_name,
space_uuid,
&model,
mdl.get_uuid(),
true,
mdl.data().get_uuid(),
);
}
let _ = spaces.st_delete(space_name.as_str());
@ -281,7 +285,7 @@ impl Space {
}
})
} else {
global.state().ddl_with_spaces_write(|spaces| {
global.state().namespace().ddl_with_spaces_write(|spaces| {
let Some(space) = spaces.get(space_name.as_str()) else {
if if_exists {
return Ok(Some(false));
@ -298,8 +302,8 @@ impl Space {
let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(txn), || {})?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(

@ -131,7 +131,7 @@ impl SystemDatabase {
return Err(QueryError::SysAuthError);
}
let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
global.gns_driver().lock().driver_context(
global.state().gns_driver().driver_context(
|drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)),
|| {},
)?;
@ -147,7 +147,7 @@ impl SystemDatabase {
match self.users.write().get_mut(username) {
Some(user) => {
let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
global.gns_driver().lock().driver_context(
global.state().gns_driver().driver_context(
|drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)),
|| {},
)?;
@ -163,8 +163,8 @@ impl SystemDatabase {
return Err(QueryError::SysAuthError);
}
global
.state()
.gns_driver()
.lock()
.driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?;
let _ = users.remove(username);
Ok(())

@ -26,7 +26,7 @@
use crate::engine::{
core::{
model::{alt::AlterPlan, Model},
model::{alt::AlterPlan, ModelData},
tests::ddl_model::{create, exec_create},
EntityIDRef,
},
@ -50,25 +50,26 @@ fn exec_plan(
new_space: bool,
model: &str,
plan: &str,
f: impl Fn(&Model),
f: impl Fn(&ModelData),
) -> QueryResult<()> {
let mdl_name = exec_create(global, model, new_space)?;
let prev_uuid = {
global
.state()
.namespace()
.idx_models()
.read()
.get(&EntityIDRef::new("myspace", &mdl_name))
.map(|mdl| mdl.get_uuid())
.map(|mdl| mdl.data().get_uuid())
.unwrap()
};
let tok = lex_insecure(plan.as_bytes()).unwrap();
let alter = parse_ast_node_full::<AlterModel>(&tok[2..]).unwrap();
Model::transactional_exec_alter(global, alter)?;
let models = global.state().idx_models().read();
ModelData::transactional_exec_alter(global, alter)?;
let models = global.state().namespace().idx_models().read();
let model = models.get(&EntityIDRef::new("myspace", &mdl_name)).unwrap();
assert_eq!(prev_uuid, model.get_uuid());
f(model);
assert_eq!(prev_uuid, model.data().get_uuid());
f(model.data());
Ok(())
}

@ -29,16 +29,16 @@ mod crt;
mod layer;
use crate::engine::{
core::{model::Model, EntityIDRef},
core::{model::ModelData, EntityIDRef},
error::QueryResult,
fractal::GlobalInstanceLike,
ql::{ast::parse_ast_node_full, ddl::crt::CreateModel, tests::lex_insecure},
};
fn create(s: &str) -> QueryResult<Model> {
fn create(s: &str) -> QueryResult<ModelData> {
let tok = lex_insecure(s.as_bytes()).unwrap();
let create_model = parse_ast_node_full(&tok[2..]).unwrap();
Model::process_create(create_model)
ModelData::process_create(create_model)
}
pub fn exec_create(
@ -52,9 +52,10 @@ pub fn exec_create(
if create_new_space {
global
.state()
.namespace()
.create_empty_test_space(create_model.model_name.space())
}
Model::transactional_exec_create(global, create_model).map(|_| name)
ModelData::transactional_exec_create(global, create_model).map(|_| name)
}
pub fn exec_create_new_space(
@ -68,9 +69,9 @@ fn with_model(
global: &impl GlobalInstanceLike,
space_id: &str,
model_name: &str,
f: impl Fn(&Model),
f: impl Fn(&ModelData),
) {
let models = global.state().idx_models().read();
let models = global.state().namespace().idx_models().read();
let model = models.get(&EntityIDRef::new(space_id, model_name)).unwrap();
f(model)
f(model.data())
}

@ -48,7 +48,7 @@ pub fn exec_create(
ast::parse_ast_node_full::<crate::engine::ql::ddl::crt::CreateSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::transactional_exec_create(gns, ast_node)?;
gns.state().ddl_with_space_mut(&name, |space| {
gns.state().namespace().ddl_with_space_mut(&name, |space| {
verify(space);
Ok(space.get_uuid())
})
@ -64,7 +64,7 @@ pub fn exec_alter(
ast::parse_ast_node_full::<crate::engine::ql::ddl::alt::AlterSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::transactional_exec_alter(gns, ast_node)?;
gns.state().ddl_with_space_mut(&name, |space| {
gns.state().namespace().ddl_with_space_mut(&name, |space| {
verify(space);
Ok(space.get_uuid())
})

@ -30,7 +30,7 @@ mod select;
mod update;
use crate::engine::{
core::{dml, index::Row, model::Model, space::Space, EntityIDRef},
core::{dml, index::Row, model::ModelData, space::Space, EntityIDRef},
data::{cell::Datacell, lit::Lit},
error::QueryResult,
fractal::GlobalInstanceLike,
@ -45,12 +45,13 @@ use crate::engine::{
fn _exec_only_create_space_model(global: &impl GlobalInstanceLike, model: &str) -> QueryResult<()> {
let _ = global
.state()
.namespace()
.idx()
.write()
.insert("myspace".into(), Space::new_auto_all().into());
let lex_create_model = lex_insecure(model.as_bytes()).unwrap();
let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap();
Model::transactional_exec_create(global, stmt_create_model).map(|_| ())
ModelData::transactional_exec_create(global, stmt_create_model).map(|_| ())
}
fn _exec_only_insert<T>(
@ -73,7 +74,7 @@ fn _exec_only_read_key_and_then<T>(
and_then: impl Fn(Row) -> T,
) -> QueryResult<T> {
let guard = sync::atm::cpin();
global.state().with_model(entity, |mdl| {
global.state().namespace().with_model(entity, |mdl| {
let row = mdl
.primary_index()
.select(Lit::from(key_name), &guard)
@ -90,7 +91,7 @@ fn _exec_delete_only(global: &impl GlobalInstanceLike, delete: &str, key: &str)
let entity = delete.entity();
dml::delete(global, delete)?;
assert_eq!(
global.state().with_model(entity, |model| {
global.state().namespace().with_model(entity, |model| {
let g = sync::atm::cpin();
Ok(model.primary_index().select(key.into(), &g).is_none())
}),

@ -46,7 +46,9 @@ fn simple_select_wildcard() {
#[test]
fn simple_select_specified_same_order() {
let global = TestGlobal::new_with_driver_id_instant_update("dml_select_simple_select_specified_same_order");
let global = TestGlobal::new_with_driver_id_instant_update(
"dml_select_simple_select_specified_same_order",
);
assert_eq!(
super::exec_select(
&global,
@ -61,8 +63,9 @@ fn simple_select_specified_same_order() {
#[test]
fn simple_select_specified_reversed_order() {
let global =
TestGlobal::new_with_driver_id_instant_update("dml_select_simple_select_specified_reversed_order");
let global = TestGlobal::new_with_driver_id_instant_update(
"dml_select_simple_select_specified_reversed_order",
);
assert_eq!(
super::exec_select(
&global,

@ -25,7 +25,7 @@
*/
use {
super::{util, ModelUniqueID},
super::util,
crate::{
engine::{
error::{QueryError, QueryResult, RuntimeResult},
@ -33,32 +33,33 @@ use {
},
util::compiler,
},
parking_lot::{Mutex, RwLock},
std::{collections::HashMap, sync::Arc},
parking_lot::Mutex,
};
/// GNS driver
#[derive(Debug)]
pub struct FractalGNSDriver {
status: util::Status,
pub(super) txn_driver: GNSDriver,
pub(super) txn_driver: Mutex<GNSDriver>,
}
impl FractalGNSDriver {
pub(super) fn new(txn_driver: GNSDriver) -> Self {
pub fn new(txn_driver: GNSDriver) -> Self {
Self {
status: util::Status::new_okay(),
txn_driver: txn_driver,
txn_driver: Mutex::new(txn_driver),
}
}
pub fn driver_context<T>(
&mut self,
&self,
f: impl Fn(&mut GNSDriver) -> RuntimeResult<T>,
on_failure: impl Fn(),
) -> QueryResult<T> {
if self.status.is_iffy() {
return Err(QueryError::SysServerError);
}
match f(&mut self.txn_driver) {
let mut txn_driver = self.txn_driver.lock();
match f(&mut txn_driver) {
Ok(v) => Ok(v),
Err(e) => compiler::cold_call(|| {
error!("GNS driver failed with: {e}");
@ -70,58 +71,43 @@ impl FractalGNSDriver {
}
}
pub struct ModelDrivers {
drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver>>,
/// Model driver
#[derive(Debug)]
#[must_use]
pub struct FractalModelDriver {
status: util::Status,
batch_driver: Mutex<Option<ModelDriver>>,
}
impl ModelDrivers {
pub fn empty() -> Self {
impl FractalModelDriver {
pub const fn uninitialized() -> Self {
Self {
drivers: RwLock::new(HashMap::new()),
}
}
pub fn drivers(&self) -> &RwLock<HashMap<ModelUniqueID, FractalModelDriver>> {
&self.drivers
}
pub fn count(&self) -> usize {
self.drivers.read().len()
}
pub fn add_driver(&self, id: ModelUniqueID, batch_driver: ModelDriver) {
assert!(self
.drivers
.write()
.insert(id, FractalModelDriver::init(batch_driver))
.is_none());
}
pub fn remove_driver(&self, id: ModelUniqueID) {
assert!(self.drivers.write().remove(&id).is_some())
status: util::Status::new_okay(),
batch_driver: Mutex::new(None),
}
pub fn into_inner(self) -> HashMap<ModelUniqueID, FractalModelDriver> {
self.drivers.into_inner()
}
pub fn initialize_model_driver(&self, driver: ModelDriver) {
let mut drv = self.batch_driver.lock();
if drv.is_none() {
*drv = Some(driver);
} else {
panic!("driver already initialized")
}
/// Model driver
pub struct FractalModelDriver {
status: Arc<util::Status>,
batch_driver: Mutex<ModelDriver>,
}
impl FractalModelDriver {
pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver) -> Self {
Self {
status: Arc::new(util::Status::new_okay()),
batch_driver: Mutex::new(batch_driver),
status: util::Status::new_okay(),
batch_driver: Mutex::new(Some(batch_driver)),
}
}
pub fn status(&self) -> &util::Status {
&self.status
}
/// Returns a reference to the batch persist driver
pub fn batch_driver(&self) -> &Mutex<ModelDriver> {
pub fn batch_driver(&self) -> &Mutex<Option<ModelDriver>> {
&self.batch_driver
}
pub fn close(self) -> RuntimeResult<()> {
ModelDriver::close_driver(&mut self.batch_driver.into_inner())
ModelDriver::close_driver(&mut self.batch_driver.into_inner().unwrap())
}
}
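FractalModelDriver itself becomes a lazily initialized slot: the batch driver sits behind a Mutex<Option<ModelDriver>>, so a model restored from the transaction log can be built with uninitialized() and get its on-disk driver attached later, while the create path still starts from init(driver). A standalone sketch of that pattern follows, using std::sync::Mutex instead of parking_lot and a stand-in ModelDriver type.

use std::sync::Mutex;

struct ModelDriver; // stand-in for the on-disk batch driver
impl ModelDriver {
    fn commit_batch(&mut self) { /* write a data batch */ }
    fn close(self) { /* flush and close the file */ }
}

struct FractalModelDriver {
    batch_driver: Mutex<Option<ModelDriver>>, // None until a driver is attached
}

impl FractalModelDriver {
    const fn uninitialized() -> Self {
        Self { batch_driver: Mutex::new(None) }
    }
    fn init(driver: ModelDriver) -> Self {
        Self { batch_driver: Mutex::new(Some(driver)) }
    }
    fn initialize_model_driver(&self, driver: ModelDriver) {
        let mut slot = self.batch_driver.lock().unwrap();
        assert!(slot.is_none(), "driver already initialized");
        *slot = Some(driver);
    }
    fn batch_driver(&self) -> &Mutex<Option<ModelDriver>> {
        &self.batch_driver
    }
    fn close(self) {
        // closing consumes the container; the slot must have been initialized by now
        self.batch_driver.into_inner().unwrap().unwrap().close()
    }
}

fn main() {
    // restore path: the model starts with an empty slot, the driver is attached on load
    let restored = FractalModelDriver::uninitialized();
    restored.initialize_model_driver(ModelDriver);
    restored.batch_driver().lock().unwrap().as_mut().unwrap().commit_batch();
    restored.close();

    // create path: the driver is available up front
    let created = FractalModelDriver::init(ModelDriver);
    created.close();
}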

@ -29,11 +29,12 @@ use {
crate::{
engine::{
core::{
model::{delta::DataDelta, Model},
model::{delta::DataDelta, ModelData},
EntityIDRef,
},
data::uuid::Uuid,
error::ErrorKind,
fractal::GlobalInstanceLike,
storage::{
safe_interfaces::{paths_v1, StdModelBatch},
BatchStats,
@ -289,20 +290,13 @@ impl FractalMgr {
match task {
CriticalTask::WriteBatch(model_id, observed_size) => {
info!("fhp: {model_id} has reached cache capacity. writing to disk");
let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read();
let Some(mdl_driver) = mdl_drivers.get(&model_id) else {
// because we maximize throughput, the model driver may have been already removed but this task
// was way behind in the queue
return;
};
let mdl_read = global._namespace().idx_models().read();
let mdl_read = global.state().namespace().idx_models().read();
let mdl = match mdl_read.get(&EntityIDRef::new(
model_id.space().into(),
model_id.model().into(),
)) {
Some(mdl) if mdl.get_uuid() != model_id.uuid() => {
// so the model driver was not removed, neither was the model *yet* but we happened to find the task
// just return
Some(mdl) if mdl.data().get_uuid() != model_id.uuid() => {
// this is a different model with the same entity path
return;
}
Some(mdl) => mdl,
@ -310,7 +304,7 @@ impl FractalMgr {
panic!("found deleted model")
}
};
match Self::try_write_model_data_batch(mdl, observed_size, mdl_driver) {
match Self::try_write_model_data_batch(mdl.data(), observed_size, mdl.driver()) {
Ok(()) => {
if observed_size != 0 {
info!("fhp: completed maintenance task for {model_id}, synced={observed_size}")
@ -391,40 +385,34 @@ impl FractalMgr {
}
}
fn general_executor(&'static self, global: super::Global) {
let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
let mdl_read = global._namespace().idx_models().read();
let mdl = match mdl_read.get(&EntityIDRef::new(
model_id.space().into(),
model_id.model().into(),
)) {
Some(mdl) if mdl.get_uuid() != model_id.uuid() => {
// so the model driver was not removed, neither was the model *yet* but we happened to find the task
// just return
return;
}
Some(mdl) => mdl,
None => {
panic!("found deleted model")
}
};
let observed_len = mdl
for (model_id, model) in global.state().namespace().idx_models().read().iter() {
let observed_len = model
.data()
.delta_state()
.__fractal_take_full_from_data_delta(super::FractalToken::new());
match Self::try_write_model_data_batch(mdl, observed_len, driver) {
match Self::try_write_model_data_batch(model.data(), observed_len, model.driver()) {
Ok(()) => {
if observed_len != 0 {
info!(
"flp: completed maintenance task for {model_id}, synced={observed_len}"
"flp: completed maintenance task for {}.{}, synced={observed_len}",
model_id.space(),
model_id.entity()
)
}
}
Err((e, stats)) => {
info!("flp: failed to sync data for {model_id} with {e}. promoting to higher priority");
info!(
"flp: failed to sync data for {}.{} with erro `{e}`. promoting to higher priority",
model_id.space(), model_id.entity(),
);
// this failure is *not* good, so we want to promote this to a critical task
self.hp_dispatcher
.send(Task::new(CriticalTask::WriteBatch(
model_id.clone(),
ModelUniqueID::new(
model_id.space(),
model_id.entity(),
model.data().get_uuid(),
),
observed_len - stats.get_actual(),
)))
.unwrap()
@ -440,11 +428,11 @@ impl FractalMgr {
///
/// The zero check is essential
fn try_write_model_data_batch(
model: &Model,
model: &ModelData,
observed_size: usize,
mdl_driver: &super::drivers::FractalModelDriver,
mdl_driver_: &super::drivers::FractalModelDriver,
) -> Result<(), (super::error::Error, BatchStats)> {
if mdl_driver.status().is_iffy() {
if mdl_driver_.status().is_iffy() {
// don't mess this up any further
return Err((
super::error::Error::from(ErrorKind::Other(
@ -459,14 +447,15 @@ impl FractalMgr {
}
// try flushing the batch
let batch_stats = BatchStats::new();
let mut batch_driver = mdl_driver.batch_driver().lock();
let mut mdl_driver = mdl_driver_.batch_driver().lock();
let batch_driver = mdl_driver.as_mut().unwrap();
batch_driver
.commit_with_ctx(
StdModelBatch::new(model, observed_size),
batch_stats.clone(),
)
.map_err(|e| {
mdl_driver.status().set_iffy();
mdl_driver_.status().set_iffy();
(e, BatchStats::into_inner(batch_stats))
})
}
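With drivers stored inside each Model, the flush loops above no longer consult a separate ModelDrivers map: the executor walks the model index once and takes both the delta state and the batch driver from the same entry. A minimal standalone sketch of that loop shape, using a plain HashMap and stand-in types in place of the real index and drivers.

use std::collections::HashMap;

struct ModelData {
    pending_deltas: usize, // stand-in for the delta state
}

struct ModelDriver;
impl ModelDriver {
    fn write_batch(&mut self, _observed_len: usize) { /* flush up to _observed_len deltas */ }
}

struct Model {
    data: ModelData,
    driver: ModelDriver,
}

fn general_executor(idx_models: &mut HashMap<(String, String), Model>) {
    for ((space, entity), model) in idx_models.iter_mut() {
        // take the observed delta count from the model's own data
        let observed_len = model.data.pending_deltas;
        model.data.pending_deltas = 0;
        // flush through the co-located driver
        model.driver.write_batch(observed_len);
        if observed_len != 0 {
            println!("flp: completed maintenance task for {space}.{entity}, synced={observed_len}");
        }
    }
}

fn main() {
    let mut idx = HashMap::new();
    idx.insert(
        ("myspace".to_string(), "mymodel".to_string()),
        Model {
            data: ModelData { pending_deltas: 3 },
            driver: ModelDriver,
        },
    );
    general_executor(&mut idx);
}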

@ -26,7 +26,7 @@
use {
super::{
core::{dml::QueryExecMeta, model::Model, GlobalNS},
core::{dml::QueryExecMeta, model::ModelData, GlobalNS},
data::uuid::Uuid,
storage::{
safe_interfaces::{paths_v1, FileSystem},
@ -34,7 +34,6 @@ use {
},
},
crate::engine::error::RuntimeResult,
parking_lot::Mutex,
std::{fmt, mem::MaybeUninit},
tokio::sync::mpsc::unbounded_channel,
};
@ -47,7 +46,7 @@ mod mgr;
pub mod test_utils;
mod util;
pub use {
drivers::ModelDrivers,
drivers::{FractalGNSDriver, FractalModelDriver},
mgr::{CriticalTask, GenericTask, Task, GENERAL_EXECUTOR_WINDOW},
util::FractalToken,
};
@ -68,19 +67,12 @@ pub struct GlobalStateStart {
/// ## Safety
///
/// Must be called iff this is the only thread calling it
pub unsafe fn load_and_enable_all(
gns: GlobalNS,
gns_driver: GNSDriver,
model_drivers: ModelDrivers,
) -> GlobalStateStart {
let model_cnt_on_boot = model_drivers.count();
let gns_driver = drivers::FractalGNSDriver::new(gns_driver);
pub unsafe fn load_and_enable_all(gns: GlobalNS) -> GlobalStateStart {
let model_cnt_on_boot = gns.namespace().idx_models().read().len();
let (hp_sender, hp_recv) = unbounded_channel();
let (lp_sender, lp_recv) = unbounded_channel();
let global_state = GlobalState::new(
gns,
gns_driver,
model_drivers,
mgr::FractalMgr::new(hp_sender, lp_sender, model_cnt_on_boot),
);
*Global::__gref_raw() = MaybeUninit::new(global_state);
@ -101,7 +93,11 @@ pub trait GlobalInstanceLike {
fn get_max_delta_size(&self) -> usize;
// global namespace
fn state(&self) -> &GlobalNS;
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver>;
fn initialize_space(&self, space_name: &str, space_uuid: Uuid) -> RuntimeResult<()> {
e!(FileSystem::create_dir_all(&paths_v1::space_dir(
space_name, space_uuid
)))
}
// model drivers
fn initialize_model_driver(
&self,
@ -109,14 +105,13 @@ pub trait GlobalInstanceLike {
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> RuntimeResult<()>;
) -> RuntimeResult<FractalModelDriver>;
fn purge_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
skip_delete: bool,
);
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
@ -126,7 +121,7 @@ pub trait GlobalInstanceLike {
&self,
space_name: &str,
model_name: &str,
model: &Model,
model: &ModelData,
hint: QueryExecMeta,
) {
// check if we need to sync
@ -150,9 +145,6 @@ impl GlobalInstanceLike for Global {
fn state(&self) -> &GlobalNS {
self._namespace()
}
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver> {
&self.get_state().gns_driver
}
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
self._post_high_priority_task(task)
@ -171,23 +163,18 @@ impl GlobalInstanceLike for Global {
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
skip_delete: bool,
) {
let id = ModelUniqueID::new(space_name, model_name, model_uuid);
self.get_state().mdl_driver.remove_driver(id);
if !skip_delete {
self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name, space_uuid, model_name, model_uuid,
)));
}
}
fn initialize_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> RuntimeResult<()> {
) -> RuntimeResult<FractalModelDriver> {
// create dir
FileSystem::create_dir(&paths_v1::model_dir(
space_name, space_uuid, model_name, model_uuid,
@ -196,11 +183,7 @@ impl GlobalInstanceLike for Global {
let driver = ModelDriver::create_model_driver(&paths_v1::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.get_state().mdl_driver.add_driver(
ModelUniqueID::new(space_name, model_name, model_uuid),
driver,
);
Ok(())
Ok(FractalModelDriver::init(driver))
}
}
@ -247,16 +230,17 @@ impl Global {
}
pub unsafe fn unload_all(self) {
// TODO(@ohsayan): handle errors
let GlobalState {
gns_driver,
mdl_driver,
..
} = Self::__gref_raw().assume_init_read();
let mut gns_driver = gns_driver.into_inner().txn_driver;
let mdl_drivers = mdl_driver.into_inner();
let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read();
let mut gns_driver = gns.gns_driver().txn_driver.lock();
GNSDriver::close_driver(&mut gns_driver).unwrap();
for (_, driver) in mdl_drivers {
driver.close().unwrap();
for mdl in gns
.namespace()
.idx_models()
.write()
.drain()
.map(|(_, mdl)| mdl)
{
mdl.into_driver().close().unwrap();
}
}
}
@ -268,27 +252,12 @@ impl Global {
/// The global state
struct GlobalState {
gns: GlobalNS,
gns_driver: Mutex<drivers::FractalGNSDriver>,
mdl_driver: ModelDrivers,
task_mgr: mgr::FractalMgr,
}
impl GlobalState {
fn new(
gns: GlobalNS,
gns_driver: drivers::FractalGNSDriver,
mdl_driver: ModelDrivers,
task_mgr: mgr::FractalMgr,
) -> Self {
Self {
gns,
gns_driver: Mutex::new(gns_driver),
mdl_driver,
task_mgr,
}
}
pub(self) fn get_mdl_drivers(&self) -> &ModelDrivers {
&self.mdl_driver
fn new(gns: GlobalNS, task_mgr: mgr::FractalMgr) -> Self {
Self { gns, task_mgr }
}
pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr {
&self.task_mgr

@ -26,22 +26,20 @@
use {
super::{
drivers::FractalGNSDriver, CriticalTask, GenericTask, GlobalInstanceLike, ModelUniqueID,
Task,
drivers::FractalGNSDriver, CriticalTask, FractalModelDriver, GenericTask,
GlobalInstanceLike, Task,
},
crate::engine::{
core::{EntityIDRef, GlobalNS},
core::{EntityIDRef, GNSData, GlobalNS},
data::uuid::Uuid,
error::ErrorKind,
fractal::drivers::FractalModelDriver,
storage::{
safe_interfaces::{paths_v1, FileSystem, StdModelBatch},
BatchStats, GNSDriver, ModelDriver,
},
RuntimeResult,
},
parking_lot::{Mutex, RwLock},
std::collections::HashMap,
parking_lot::RwLock,
};
/// A `test` mode global implementation
@ -50,19 +48,15 @@ pub struct TestGlobal {
lp_queue: RwLock<Vec<Task<GenericTask>>>,
#[allow(unused)]
max_delta_size: usize,
txn_driver: Mutex<FractalGNSDriver>,
model_drivers: RwLock<HashMap<ModelUniqueID, super::drivers::FractalModelDriver>>,
max_data_pressure: usize,
}
impl TestGlobal {
fn new(gns: GlobalNS, max_delta_size: usize, txn_driver: GNSDriver) -> Self {
fn new(gns: GlobalNS, max_delta_size: usize) -> Self {
Self {
gns,
lp_queue: RwLock::default(),
max_delta_size,
txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)),
model_drivers: RwLock::default(),
max_data_pressure: usize::MAX,
}
}
@ -72,25 +66,20 @@ impl TestGlobal {
/// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load
/// all model drivers
pub fn load_model_drivers(&self) -> RuntimeResult<()> {
let mut mdl_drivers = self.model_drivers.write();
let space_idx = self.gns.idx().read();
for (model_name, model) in self.gns.idx_models().read().iter() {
let space_idx = self.gns.namespace().idx().read();
for (model_name, model) in self.gns.namespace().idx_models().read().iter() {
let model_data = model.data();
let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid();
let driver = ModelDriver::open_model_driver(
model,
model_data,
&paths_v1::model_path(
model_name.space(),
space_uuid,
model_name.entity(),
model.get_uuid(),
model_data.get_uuid(),
),
)?;
assert!(mdl_drivers
.insert(
ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()),
FractalModelDriver::init(driver)
)
.is_none());
model.driver().initialize_model_driver(driver);
}
Ok(())
}
@ -103,13 +92,13 @@ impl TestGlobal {
me
}
pub fn new_with_driver_id(log_name: &str) -> Self {
let gns = GlobalNS::empty();
let data = GNSData::empty();
let driver = match GNSDriver::create_gns_with_name(log_name) {
Ok(drv) => Ok(drv),
Err(e) => match e.kind() {
ErrorKind::IoError(e_) => match e_.kind() {
std::io::ErrorKind::AlreadyExists => {
GNSDriver::open_gns_with_name(log_name, &gns)
GNSDriver::open_gns_with_name(log_name, &data)
}
_ => Err(e),
},
@ -117,7 +106,7 @@ impl TestGlobal {
},
}
.unwrap();
Self::new(gns, 0, driver)
Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver)), 0)
}
}
@ -125,24 +114,19 @@ impl GlobalInstanceLike for TestGlobal {
fn state(&self) -> &GlobalNS {
&self.gns
}
fn gns_driver(&self) -> &Mutex<FractalGNSDriver> {
&self.txn_driver
}
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
match task.into_task() {
CriticalTask::WriteBatch(mdl_id, count) => {
let models = self.gns.idx_models().read();
let models = self.gns.namespace().idx_models().read();
let mdl = models
.get(&EntityIDRef::new(mdl_id.space(), mdl_id.model()))
.unwrap();
self.model_drivers
.read()
.get(&mdl_id)
let mut mdl_driver = mdl.driver().batch_driver().lock();
mdl_driver
.as_mut()
.unwrap()
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap()
.batch_driver()
.lock()
.commit_with_ctx(StdModelBatch::new(mdl, count), BatchStats::new())
.unwrap();
}
}
}
@ -158,26 +142,18 @@ impl GlobalInstanceLike for TestGlobal {
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
skip_delete: bool,
) {
let id = ModelUniqueID::new(space_name, model_name, model_uuid);
self.model_drivers
.write()
.remove(&id)
.expect("tried to remove non-existent model");
if !skip_delete {
self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name, space_uuid, model_name, model_uuid,
)));
}
}
fn initialize_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> crate::engine::error::RuntimeResult<()> {
) -> crate::engine::error::RuntimeResult<FractalModelDriver> {
// create model dir
FileSystem::create_dir_all(&paths_v1::model_dir(
space_name, space_uuid, model_name, model_uuid,
@ -185,20 +161,16 @@ impl GlobalInstanceLike for TestGlobal {
let driver = ModelDriver::create_model_driver(&paths_v1::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.model_drivers.write().insert(
ModelUniqueID::new(space_name, model_name, model_uuid),
super::drivers::FractalModelDriver::init(driver),
);
Ok(())
Ok(super::drivers::FractalModelDriver::init(driver))
}
}
impl Drop for TestGlobal {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap();
for (_, model_driver) in self.model_drivers.write().drain() {
model_driver.close().unwrap();
let mut txn_driver = self.gns.gns_driver().txn_driver.lock();
GNSDriver::close_driver(&mut txn_driver).unwrap();
for (_, model_driver) in self.gns.namespace().idx_models().write().drain() {
model_driver.into_driver().close().unwrap();
}
}
}

@ -71,15 +71,11 @@ pub fn load_all(
}
info!("starting storage engine");
context::set_origin(Subsystem::Storage);
let SELoaded {
gns,
gns_driver,
model_drivers,
} = storage::load(&config)?;
let SELoaded { gns } = storage::load(&config)?;
info!("storage engine ready. initializing system");
let global = unsafe {
// UNSAFE(@ohsayan): the only call we ever make
fractal::load_and_enable_all(gns, gns_driver, model_drivers)
fractal::load_and_enable_all(gns)
};
Ok((config, global))
}
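Boot-time assembly changes to match: the storage layer now hands back a fully assembled GlobalNS inside SELoaded, and load_and_enable_all takes only that value, counting models through the namespace's own model index. Below is a standalone sketch of the assembly order, mirroring the TestGlobal constructor further down in this diff; the create_or_open helper and the path string are invented for illustration.

use std::sync::Mutex;

struct GNSData;
impl GNSData {
    fn empty() -> Self {
        GNSData
    }
}

struct GNSDriver;
impl GNSDriver {
    // hypothetical helper standing in for create_gns_with_name / open_gns_with_name
    fn create_or_open(_path: &str, _data: &GNSData) -> GNSDriver {
        GNSDriver
    }
}

struct FractalGNSDriver {
    txn_driver: Mutex<GNSDriver>,
}
impl FractalGNSDriver {
    fn new(driver: GNSDriver) -> Self {
        Self { txn_driver: Mutex::new(driver) }
    }
}

struct GlobalNS {
    data: GNSData,
    driver: FractalGNSDriver,
}
impl GlobalNS {
    fn new(data: GNSData, driver: FractalGNSDriver) -> Self {
        Self { data, driver }
    }
}

// stand-in for fractal::load_and_enable_all, which now receives the assembled container
fn load_and_enable_all(gns: GlobalNS) -> GlobalNS {
    gns
}

fn main() {
    let data = GNSData::empty();
    let driver = GNSDriver::create_or_open("gns.db-tlog", &data);
    let gns = GlobalNS::new(data, FractalGNSDriver::new(driver));
    let _global = load_and_enable_all(gns);
}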

@ -293,6 +293,7 @@ async fn do_handshake<S: Socket>(
Ok(uname) => {
match global
.state()
.namespace()
.sys_db()
.verify_user(uname, handshake.hs_auth().password())
{

@ -30,15 +30,13 @@
file system
*/
use std::io::BufWriter;
#[cfg(test)]
use super::vfs::{VFileDescriptor, VirtualFS};
use {
crate::IoResult,
std::{
fs as std_fs,
io::{BufReader, Error, ErrorKind, Read, Seek, SeekFrom, Write},
io::{BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write},
},
};
@ -380,11 +378,13 @@ impl<Lf: FileExt> FileExt for AnyFile<Lf> {
*/
#[cfg(test)]
#[derive(Debug)]
enum AnyFile<Lf = std_fs::File> {
Local(Lf),
Virtual(VFileDescriptor),
}
#[derive(Debug)]
pub struct File {
#[cfg(test)]
f: AnyFile,

@ -24,8 +24,6 @@
*
*/
#![allow(dead_code)]
use {
crate::{engine::sync::cell::Lazy, IoResult},
parking_lot::RwLock,
@ -83,6 +81,7 @@ pub enum FileOpen<CF, EF = CF> {
Existing(EF),
}
#[derive(Debug)]
pub struct VFileDescriptor(pub(super) Box<str>);
impl VFileDescriptor {

@ -40,6 +40,7 @@ use {
util::os::SysIOError,
IoResult,
},
core::fmt,
std::mem,
};
@ -287,13 +288,12 @@ impl<S: FileSpecV1> TrackedReader<S> {
/// interface. It tracks the cursor, automatically buffers writes and in case of buffer flush failure,
/// provides methods to robustly handle errors, down to byte-level cursor tracking in case of failure.
pub struct TrackedWriter<
F,
S: FileSpecV1,
const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true,
> {
f_d: F,
f_d: File,
f_md: S::Metadata,
t_cursor: u64,
t_checksum: SCrc64,
@ -302,12 +302,32 @@ pub struct TrackedWriter<
}
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
> fmt::Debug for TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
where
S::Metadata: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TrackedWriter")
.field("f_d", &self.f_d)
.field("f_md", &self.f_md)
.field("t_cursor", &self.t_cursor)
.field("t_checksum", &self.t_checksum)
.field("t_partial_checksum", &self.t_partial_checksum)
.field("buf", &self.buf)
.finish()
}
}
impl<
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn available_capacity(&self) -> usize {
self.buf.remaining_capacity()
@ -315,14 +335,13 @@ impl<
}
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
> TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn _new(f_d: F, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64) -> Self {
fn _new(f_d: File, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64) -> Self {
Self {
f_d,
f_md,
@ -349,29 +368,27 @@ impl<
}
impl<
F: FileExt,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
> TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Create a new tracked writer
///
/// NB: The cursor is fetched. If the cursor is already available, use [`Self::with_cursor`]
pub fn new(
mut f: SdssFile<S, F>,
) -> IoResult<TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>>
{
mut f: SdssFile<S>,
) -> IoResult<TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>> {
f.file_cursor().map(|v| TrackedWriter::with_cursor(f, v))
}
/// Create a new tracked writer with the provided cursor
pub fn with_cursor(f: SdssFile<S, F>, c: u64) -> Self {
pub fn with_cursor(f: SdssFile<S>, c: u64) -> Self {
Self::with_cursor_and_checksum(f, c, SCrc64::new())
}
/// Create a new tracked writer with the provided checksum and cursor
pub fn with_cursor_and_checksum(
SdssFile { file, meta }: SdssFile<S, F>,
SdssFile { file, meta }: SdssFile<S>,
c: u64,
ck: SCrc64,
) -> Self {
@ -383,12 +400,11 @@ impl<
}
impl<
F: FileWrite,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
> TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Same as [`Self::tracked_write_through_buffer`], but the partial state is updated
pub fn dtrack_write_through_buffer(&mut self, buf: &[u8]) -> IoResult<()> {
@ -471,10 +487,7 @@ impl<
Ok(())
}
/// Flush the buffer and then sync data and metadata
pub fn flush_sync(&mut self) -> IoResult<()>
where
F: FileWriteExt,
{
pub fn flush_sync(&mut self) -> IoResult<()> {
self.flush_buf().and_then(|_| self.fsync())
}
/// Flush the buffer
@ -502,21 +515,17 @@ impl<
}
}
}
pub fn fsync(&mut self) -> IoResult<()>
where
F: FileWriteExt,
{
pub fn fsync(&mut self) -> IoResult<()> {
self.f_d.fsync_all()
}
}
impl<
F,
S: FileSpecV1,
const SIZE: usize,
const PANIC_IF_UNFLUSHED: bool,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> Drop for TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
> Drop for TrackedWriter<S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
fn drop(&mut self) {
if PANIC_IF_UNFLUSHED && !self.buf.is_empty() {
@ -544,7 +553,7 @@ fn check_vfs_buffering() {
};
closure! {
// init writer
let mut twriter: TrackedWriter<File, SystemDatabaseV1> =
let mut twriter: TrackedWriter<SystemDatabaseV1> =
TrackedWriter::new(SdssFile::create("myfile")?)?;
assert_eq!(twriter.cursor_usize(), Header::SIZE);
{

@ -27,7 +27,7 @@
use {
crate::{
engine::{
core::GlobalNS,
core::GNSData,
data::uuid::Uuid,
error::{RuntimeResult, StorageError},
mem::BufferedScanner,
@ -68,7 +68,7 @@ where
fn encode_event(commit: Self, buf: &mut Vec<u8>) {
r1::enc::full_into_buffer::<Self>(buf, commit)
}
fn decode_apply(gns: &GlobalNS, data: Vec<u8>) -> RuntimeResult<()> {
fn decode_apply(gns: &GNSData, data: Vec<u8>) -> RuntimeResult<()> {
let mut scanner = BufferedScanner::new(&data);
Self::decode_and_update_global_state(&mut scanner, gns)?;
if scanner.eof() {
@ -79,7 +79,7 @@ where
}
fn decode_and_update_global_state(
scanner: &mut BufferedScanner,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
Self::update_global_state(Self::decode(scanner)?, gns)
}
@ -88,7 +88,7 @@ where
r1::dec::full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
/// Update the global state from the restored event
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>;
fn update_global_state(restore: Self::RestoreType, gns: &GNSData) -> RuntimeResult<()>;
}
#[derive(Debug, PartialEq)]

@ -29,12 +29,13 @@ use {
crate::{
engine::{
core::{
model::{Field, Model},
model::{Field, Model, ModelData},
space::Space,
EntityID, EntityIDRef, GlobalNS,
EntityID, EntityIDRef, GNSData,
},
data::uuid::Uuid,
error::{RuntimeResult, StorageError, TransactionError},
fractal::FractalModelDriver,
idx::{IndexSTSeqCns, STIndex, STIndexSeq},
mem::BufferedScanner,
storage::common_encoding::r1::{self, map, obj, PersistObject},
@ -126,7 +127,7 @@ impl<'a> PersistObject for ModelID<'a> {
}
fn with_space<T>(
gns: &GlobalNS,
gns: &GNSData,
space_id: &super::SpaceIDRes,
f: impl FnOnce(&Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
@ -141,7 +142,7 @@ fn with_space<T>(
}
fn with_space_mut<T>(
gns: &GlobalNS,
gns: &GNSData,
space_id: &super::SpaceIDRes,
mut f: impl FnMut(&mut Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
@ -156,10 +157,10 @@ fn with_space_mut<T>(
}
fn with_model_mut<T>(
gns: &GlobalNS,
gns: &GNSData,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
f: impl FnOnce(&mut Model) -> RuntimeResult<T>,
f: impl FnOnce(&mut ModelData) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
with_space(gns, space_id, |_| {
let mut models = gns.idx_models().write();
@ -167,11 +168,11 @@ fn with_model_mut<T>(
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if model.get_uuid() != model_id.model_uuid {
if model.data().get_uuid() != model_id.model_uuid {
// this should have been handled by an earlier transaction
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(model)
f(model.data_mut())
})
}
@ -184,7 +185,7 @@ fn with_model_mut<T>(
pub struct CreateModelTxnRestorePL {
pub(super) space_id: super::SpaceIDRes,
pub(super) model_name: Box<str>,
pub(super) model: Model,
pub(super) model: ModelData,
}
pub struct CreateModelTxnMD {
@ -258,9 +259,9 @@ impl<'a> GNSEvent for CreateModelTxn<'a> {
CreateModelTxnRestorePL {
space_id,
model_name,
model,
model: model_data,
}: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
/*
NOTE(@ohsayan):
@ -278,7 +279,10 @@ impl<'a> GNSEvent for CreateModelTxn<'a> {
return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into());
}
if models
.insert(EntityID::new(&space_id.name, &model_name), model)
.insert(
EntityID::new(&space_id.name, &model_name),
Model::new(model_data, FractalModelDriver::uninitialized()),
)
.is_some()
{
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
@ -355,7 +359,7 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> {
model_id,
new_fields,
}: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
@ -445,7 +449,7 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
model_id,
removed_fields,
}: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
@ -526,7 +530,7 @@ impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
model_id,
updated_fields,
}: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
@ -585,7 +589,7 @@ impl<'a> GNSEvent for DropModelTxn<'a> {
model_uuid,
model_version: _,
}: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
with_space_mut(gns, &space_id, |space| {
let mut models = gns.idx_models().write();
@ -596,7 +600,7 @@ impl<'a> GNSEvent for DropModelTxn<'a> {
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if removed_model.get_uuid() != model_uuid {
if removed_model.data().get_uuid() != model_uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
Ok(())

@ -28,7 +28,7 @@ use {
super::GNSEvent,
crate::{
engine::{
core::{space::Space, EntityIDRef, GlobalNS},
core::{space::Space, EntityIDRef, GNSData},
data::DictGeneric,
error::{RuntimeResult, TransactionError},
idx::STIndex,
@ -102,7 +102,7 @@ impl<'a> GNSEvent for CreateSpaceTxn<'a> {
type RestoreType = CreateSpaceTxnRestorePL;
fn update_global_state(
CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL,
gns: &crate::engine::core::GlobalNS,
gns: &crate::engine::core::GNSData,
) -> RuntimeResult<()> {
let mut spaces = gns.idx().write();
if spaces.st_insert(space_name, space.into()) {
@ -180,7 +180,7 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> {
space_id,
space_meta,
}: Self::RestoreType,
gns: &crate::engine::core::GlobalNS,
gns: &crate::engine::core::GNSData,
) -> RuntimeResult<()> {
let mut gns = gns.idx().write();
match gns.st_get_mut(&space_id.name) {
@ -229,7 +229,7 @@ impl<'a> GNSEvent for DropSpaceTxn<'a> {
type RestoreType = super::SpaceIDRes;
fn update_global_state(
super::SpaceIDRes { uuid, name }: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
let mut wgns = gns.idx().write();
let mut wmodel = gns.idx_models().write();

@ -26,7 +26,7 @@
use crate::engine::{
core::{
model::{Field, Layer, Model},
model::{Field, Layer, ModelData},
space::Space,
},
data::{cell::Datacell, tag::TagSelector, uuid::Uuid, DictEntryGeneric},
@ -58,6 +58,7 @@ fn init_space(global: &impl GlobalInstanceLike, space_name: &str, env: &str) ->
Space::transactional_exec_create(global, stmt).unwrap();
global
.state()
.namespace()
.idx()
.read()
.get(name.as_str())
@ -76,7 +77,7 @@ fn create_space() {
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
let spaces = global.state().idx().read();
let spaces = global.state().namespace().idx().read();
let space = spaces.get("myspace").unwrap();
assert_eq!(
&*space,
@ -106,7 +107,7 @@ fn alter_space() {
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
let spaces = global.state().idx().read();
let spaces = global.state().namespace().idx().read();
let space = spaces.get("myspace").unwrap();
assert_eq!(
&*space,
@ -133,7 +134,13 @@ fn drop_space() {
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
assert!(global.state().idx().read().get("myspace").is_none());
assert!(global
.state()
.namespace()
.idx()
.read()
.get("myspace")
.is_none());
})
})
}
@ -148,9 +155,10 @@ fn init_model(
let stmt = lex_insecure(query.as_bytes()).unwrap();
let stmt = parse_ast_node_full::<CreateModel>(&stmt[2..]).unwrap();
let model_name = stmt.model_name;
Model::transactional_exec_create(global, stmt).unwrap();
ModelData::transactional_exec_create(global, stmt).unwrap();
global
.state()
.namespace()
.with_model(model_name, |model| Ok(model.get_uuid()))
.unwrap()
}
@ -178,10 +186,11 @@ fn create_model() {
let global = TestGlobal::new_with_driver_id(log_name);
global
.state()
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model,
&Model::new_restore(
&ModelData::new_restore(
uuid_model,
"username".into(),
TagSelector::String.into_full(),
@ -210,12 +219,13 @@ fn alter_model_add() {
)
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, stmt).unwrap();
ModelData::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
global
.state()
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model.fields().st_get("profile_pic").unwrap(),
@ -245,12 +255,13 @@ fn alter_model_remove() {
)
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, stmt).unwrap();
ModelData::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
global
.state()
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert!(model.fields().st_get("has_secure_key").is_none());
assert!(model.fields().st_get("is_dumb").is_none());
@ -277,12 +288,13 @@ fn alter_model_update() {
lex_insecure(b"alter model myspace.mymodel update profile_pic { nullable: true }")
.unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_alter(&global, stmt).unwrap();
ModelData::transactional_exec_alter(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
global
.state()
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model.fields().st_get("profile_pic").unwrap(),
@ -304,13 +316,14 @@ fn drop_model() {
init_default_model(&global);
let stmt = lex_insecure(b"drop model myspace.mymodel").unwrap();
let stmt = parse_ast_node_full(&stmt[2..]).unwrap();
Model::transactional_exec_drop(&global, stmt).unwrap();
ModelData::transactional_exec_drop(&global, stmt).unwrap();
}
multirun(|| {
let global = TestGlobal::new_with_driver_id(log_name);
assert_eq!(
global
.state()
.namespace()
.with_model(("myspace", "mymodel").into(), |_| { Ok(()) })
.unwrap_err(),
QueryError::QExecObjectNotFound

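The tests above repeatedly reach models through `global.state().namespace().with_model(..)` rather than holding the model index directly. Below is a minimal, self-contained sketch of that closure-accessor pattern, assuming simplified stand-in types (`Namespace`, `ModelData`, a tuple key, and a plain `RwLock`-backed map); it is illustrative only, not Skytable's actual implementation.

```rust
// Hedged sketch: look up an entry under a read lock and hand a borrow to the
// caller's closure, so the guard never escapes the accessor.
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, PartialEq)]
enum QueryError {
    ObjectNotFound, // stand-in for QueryError::QExecObjectNotFound
}

struct ModelData {
    uuid: u128, // stand-in for the real Uuid
}

struct Namespace {
    idx_models: RwLock<HashMap<(String, String), ModelData>>,
}

impl Namespace {
    fn with_model<T>(
        &self,
        entity: (&str, &str),
        f: impl FnOnce(&ModelData) -> Result<T, QueryError>,
    ) -> Result<T, QueryError> {
        let models = self.idx_models.read().unwrap();
        match models.get(&(entity.0.to_owned(), entity.1.to_owned())) {
            Some(model) => f(model),
            None => Err(QueryError::ObjectNotFound),
        }
    }
}

fn main() {
    let ns = Namespace { idx_models: RwLock::new(HashMap::new()) };
    ns.idx_models
        .write()
        .unwrap()
        .insert(("myspace".into(), "mymodel".into()), ModelData { uuid: 42 });
    let uuid = ns.with_model(("myspace", "mymodel"), |m| Ok(m.uuid)).unwrap();
    assert_eq!(uuid, 42);
    assert_eq!(
        ns.with_model(("myspace", "missing"), |_| Ok(())).unwrap_err(),
        QueryError::ObjectNotFound
    );
}
```
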
@ -30,7 +30,7 @@ use {
space, SpaceIDRef, SpaceIDRes,
},
crate::engine::{
core::{model::Model, space::Space},
core::{model::ModelData, space::Space},
storage::common_encoding::r1::{dec, enc},
txn::ModelIDRef,
},
@ -95,7 +95,7 @@ mod model_tests {
AlterModelAddTxnRestorePL, AlterModelRemoveTxnRestorePL,
AlterModelUpdateTxnRestorePL, CreateModelTxnRestorePL,
},
Model, Space,
ModelData, Space,
},
crate::engine::{
core::model::{Field, Layer},
@ -106,9 +106,9 @@ mod model_tests {
},
},
};
fn default_space_model() -> (Space, Model) {
fn default_space_model() -> (Space, ModelData) {
let space = Space::new_auto_all();
let model = Model::new_restore(
let model = ModelData::new_restore(
Uuid::new(),
"username".into(),
TagSelector::String.into_full(),

@ -29,7 +29,7 @@ use {
crate::{
engine::{
core::{
model::{Field, Layer, Model},
model::{Field, Layer, ModelData},
space::Space,
},
data::{
@ -405,16 +405,16 @@ impl ModelLayoutMD {
}
#[derive(Clone, Copy)]
pub struct ModelLayoutRef<'a>(pub(super) &'a Model);
impl<'a> From<&'a Model> for ModelLayoutRef<'a> {
fn from(mdl: &'a Model) -> Self {
pub struct ModelLayoutRef<'a>(pub(super) &'a ModelData);
impl<'a> From<&'a ModelData> for ModelLayoutRef<'a> {
fn from(mdl: &'a ModelData) -> Self {
Self(mdl)
}
}
impl<'a> PersistObject for ModelLayoutRef<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64, 3);
type InputType = ModelLayoutRef<'a>;
type OutputType = Model;
type OutputType = ModelData;
type Metadata = ModelLayoutMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.p_key_len as usize)
@ -455,7 +455,7 @@ impl<'a> PersistObject for ModelLayoutRef<'a> {
} else {
TagSelector::from_raw(md.p_key_tag as u8)
};
Ok(Model::new_restore(
Ok(ModelData::new_restore(
md.model_uuid,
key.into_boxed_str(),
ptag.into_full(),

@ -28,7 +28,7 @@ use {
super::obj,
crate::engine::{
core::{
model::{Field, Layer, Model},
model::{Field, Layer, ModelData},
space::Space,
},
data::{
@ -98,7 +98,7 @@ fn fieldmap() {
#[test]
fn model() {
let uuid = Uuid::new();
let model = Model::new_restore(
let model = ModelData::new_restore(
uuid,
"username".into(),
TagSelector::String.into_full(),

@ -32,7 +32,7 @@ use {
super::r1::{dec, impls::gns::GNSEvent, PersistObject},
crate::{
engine::{
core::GlobalNS,
core::GNSData,
error::{StorageError, TransactionError},
mem::BufferedScanner,
txn::gns::sysctl::{AlterUserTxn, CreateUserTxn, DropUserTxn},
@ -51,7 +51,7 @@ impl<'a> GNSEvent for CreateUserTxn<'a> {
type RestoreType = FullUserDefinition;
fn update_global_state(
FullUserDefinition { username, password }: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
if gns.sys_db().__raw_create_user(username, password) {
Ok(())
@ -138,7 +138,7 @@ impl<'a> GNSEvent for AlterUserTxn<'a> {
type RestoreType = FullUserDefinition;
fn update_global_state(
FullUserDefinition { username, password }: Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
if gns.sys_db().__raw_alter_user(&username, password) {
Ok(())
@ -200,7 +200,7 @@ impl<'a> GNSEvent for DropUserTxn<'a> {
type RestoreType = DropUserPayload;
fn update_global_state(
DropUserPayload(username): Self::RestoreType,
gns: &GlobalNS,
gns: &GNSData,
) -> RuntimeResult<()> {
if gns.sys_db().__raw_delete_user(&username) {
Ok(())

@ -27,10 +27,7 @@
//! Implementations of the Skytable Disk Storage Subsystem (SDSS)
use {
super::{
config::Configuration, core::GlobalNS, fractal::context, fractal::ModelDrivers,
RuntimeResult,
},
super::{config::Configuration, core::GlobalNS, fractal::context, RuntimeResult},
std::path::Path,
};
@ -58,8 +55,6 @@ pub use v2::impls::{
pub struct SELoaded {
pub gns: GlobalNS,
pub gns_driver: v2::impls::gns_log::GNSDriver,
pub model_drivers: ModelDrivers,
}
pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> {

@ -26,7 +26,7 @@
use {
crate::engine::{
core::{EntityIDRef, GlobalNS},
core::{EntityIDRef, GNSData},
error::RuntimeResult,
fractal::{error::ErrorContext, ModelUniqueID},
storage::{
@ -41,8 +41,8 @@ use {
std::collections::HashMap,
};
pub fn load_gns() -> RuntimeResult<GlobalNS> {
let gns = GlobalNS::empty();
pub fn load_gns() -> RuntimeResult<GNSData> {
let gns = GNSData::empty();
let gns_txn_driver =
raw_journal::load_journal::<GNSAdapter, spec::GNSTransactionLogV1>(super::GNS_PATH, &gns)?;
let mut model_drivers = HashMap::new();
@ -55,17 +55,21 @@ pub fn load_gns() -> RuntimeResult<GlobalNS> {
let model = models
.get_mut(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
let path =
paths_v1::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(format!(
"failed to restore model data from journal in `{path}`"
))?;
let path = paths_v1::model_path(
space_name,
space_uuid,
model_name,
model.data().get_uuid(),
);
let persist_driver = batch_jrnl::reinit(&path, model.data()).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
model.model_mutator().vacuum_stashed();
model.data_mut().model_mutator().vacuum_stashed();
}
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
ModelUniqueID::new(space_name, model_name, model.data().get_uuid()),
persist_driver,
);
}

@ -35,7 +35,7 @@ use {
self::raw::sysdb::RestoredSystemDatabase,
super::common::interface::fs::FileSystem,
crate::{
engine::{core::GlobalNS, RuntimeResult},
engine::{core::GNSData, RuntimeResult},
util,
},
};
@ -44,7 +44,7 @@ pub const GNS_PATH: &str = "gns.db-tlog";
pub const SYSDB_PATH: &str = "sys.db";
pub const DATA_DIR: &str = "data";
pub fn load_gns_prepare_migration() -> RuntimeResult<GlobalNS> {
pub fn load_gns_prepare_migration() -> RuntimeResult<GNSData> {
// load gns
let gns = loader::load_gns()?;
// load sysdb

@ -42,11 +42,11 @@ pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
use {
super::{rw::SDSSFileIO, spec},
crate::engine::{core::model::Model, error::RuntimeResult},
crate::engine::{core::model::ModelData, error::RuntimeResult},
};
/// Re-initialize an existing batch journal and read all its data into model
pub fn reinit(name: &str, model: &Model) -> RuntimeResult<DataBatchPersistDriver> {
pub fn reinit(name: &str, model: &ModelData) -> RuntimeResult<DataBatchPersistDriver> {
let (f, _header) = SDSSFileIO::open::<spec::DataBatchJournalV1>(name)?;
// restore
let mut restore_driver = DataBatchRestoreDriver::new(f)?;

@ -32,7 +32,7 @@ use {
crate::engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DeltaVersion, Model},
model::{delta::DeltaVersion, ModelData},
},
data::{cell::Datacell, tag::TagUnique},
error::{RuntimeResult, StorageError},
@ -120,7 +120,7 @@ impl DataBatchRestoreDriver {
}
pub(in crate::engine::storage::v1) fn read_data_batch_into_model(
&mut self,
model: &Model,
model: &ModelData,
) -> RuntimeResult<()> {
self.read_all_batches_and_for_each(|batch| {
// apply the batch
@ -199,7 +199,7 @@ impl DataBatchRestoreDriver {
impl DataBatchRestoreDriver {
fn apply_batch(
m: &Model,
m: &ModelData,
NormalBatch {
events,
schema_version,

@ -27,7 +27,7 @@
use {
self::raw::JournalAdapter,
crate::engine::{
core::GlobalNS, error::TransactionError, mem::BufferedScanner,
core::GNSData, error::TransactionError, mem::BufferedScanner,
storage::common_encoding::r1::impls::gns::GNSEvent, txn::gns, RuntimeResult,
},
};
@ -47,7 +47,7 @@ pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type GlobalState = GNSData;
type Error = crate::engine::fractal::error::Error;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
@ -58,7 +58,7 @@ impl JournalAdapter for GNSAdapter {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!(
static DISPATCH: [fn(&mut BufferedScanner, &GNSData) -> RuntimeResult<()>; 9] = dispatch!(
gns::space::CreateSpaceTxn,
gns::space::AlterSpaceTxn,
gns::space::DropSpaceTxn,

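The v1 GNS adapter above decodes journal entries through a static `DISPATCH` array of function pointers indexed by transaction opcode, with a trailing fallback that rejects unknown opcodes. A hedged, self-contained sketch of that dispatch-table pattern follows; `State`, the `&[u8]` payload (standing in for `BufferedScanner`), and the error type are hypothetical simplifications.

```rust
// Hedged sketch of a static decode-dispatch table, in the spirit of the
// GNSAdapter DISPATCH array above.
use std::cell::Cell;

struct State {
    applied: Cell<usize>, // how many events were replayed into global state
}

type DecodeResult = Result<(), &'static str>;
type DecodeFn = fn(&[u8], &State) -> DecodeResult;

fn decode_create_space(_payload: &[u8], st: &State) -> DecodeResult {
    // a real decoder would parse the payload and mutate the namespace index
    st.applied.set(st.applied.get() + 1);
    Ok(())
}

fn decode_drop_space(_payload: &[u8], st: &State) -> DecodeResult {
    st.applied.set(st.applied.get() + 1);
    Ok(())
}

fn decode_unknown(_payload: &[u8], _st: &State) -> DecodeResult {
    // mirrors the `|_, _| Err(DecodeUnknownTxnOp)` fallback slot
    Err("unknown transaction opcode")
}

// one slot per opcode, with the last slot acting as the catch-all
static DISPATCH: [DecodeFn; 3] =
    [decode_create_space, decode_drop_space, decode_unknown];

fn replay(opcode: u8, payload: &[u8], state: &State) -> DecodeResult {
    let slot = (opcode as usize).min(DISPATCH.len() - 1);
    DISPATCH[slot](payload, state)
}

fn main() {
    let state = State { applied: Cell::new(0) };
    replay(0, b"", &state).unwrap();
    assert_eq!(state.applied.get(), 1);
    assert!(replay(200, b"", &state).is_err());
}
```
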
@ -31,7 +31,7 @@ use {
},
crate::{
engine::{
core::GlobalNS,
core::GNSData,
storage::{
common_encoding::r1::impls::gns::GNSEvent,
v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent},
@ -56,14 +56,15 @@ use {
*/
pub type GNSDriver = EventLogDriver<GNSEventLog>;
#[derive(Debug)]
pub struct GNSEventLog;
impl GNSDriver {
const FILE_PATH: &'static str = "gns.db-tlog";
pub fn open_gns_with_name(name: &str, gs: &GlobalNS) -> RuntimeResult<Self> {
pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult<Self> {
journal::open_journal(name, gs)
}
pub fn open_gns(gs: &GlobalNS) -> RuntimeResult<Self> {
pub fn open_gns(gs: &GNSData) -> RuntimeResult<Self> {
Self::open_gns_with_name(Self::FILE_PATH, gs)
}
pub fn create_gns_with_name(name: &str) -> RuntimeResult<Self> {
@ -83,10 +84,10 @@ macro_rules! make_dispatch {
impl EventLogSpec for GNSEventLog {
type Spec = SystemDatabaseV1;
type GlobalState = GlobalNS;
type GlobalState = GNSData;
type EventMeta = GNSTransactionCode;
type DecodeDispatch =
[fn(&GlobalNS, Vec<u8>) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT];
[fn(&GNSData, Vec<u8>) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT];
const DECODE_DISPATCH: Self::DecodeDispatch = make_dispatch![
CreateSpaceTxn,
AlterSpaceTxn,

@ -31,7 +31,7 @@ use {
index::{DcFieldIndex, PrimaryIndexKey, Row, RowData},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Model,
ModelData,
},
},
data::{
@ -41,10 +41,7 @@ use {
error::StorageError,
idx::{MTIndex, STIndex, STIndexSeq},
storage::{
common::{
interface::fs::File,
sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
},
common::sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
common_encoding::r1,
v2::raw::{
journal::{
@ -69,7 +66,7 @@ use {
pub type ModelDriver = BatchDriver<ModelDataAdapter>;
impl ModelDriver {
pub fn open_model_driver(mdl: &Model, model_data_file_path: &str) -> RuntimeResult<Self> {
pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult<Self> {
journal::open_journal(model_data_file_path, mdl)
}
/// Create a new event log
@ -79,6 +76,7 @@ impl ModelDriver {
}
/// The model data adapter (abstract journal adapter impl)
#[derive(Debug)]
pub struct ModelDataAdapter;
#[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)]
@ -110,7 +108,7 @@ pub enum EventType {
*/
struct RowWriter<'b> {
f: &'b mut TrackedWriter<File, <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
f: &'b mut TrackedWriter<<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
}
impl<'b> RowWriter<'b> {
@ -118,7 +116,7 @@ impl<'b> RowWriter<'b> {
/// - pk tag
/// - schema version
/// - column count
fn write_row_global_metadata(&mut self, model: &Model) -> RuntimeResult<()> {
fn write_row_global_metadata(&mut self, model: &ModelData) -> RuntimeResult<()> {
self.f
.dtrack_write(&[model.p_tag().tag_unique().value_u8()])?;
self.f.dtrack_write(
@ -191,7 +189,7 @@ impl<'b> RowWriter<'b> {
Ok(())
}
/// Encode row data
fn write_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> {
fn write_row_data(&mut self, model: &ModelData, row_data: &RowData) -> RuntimeResult<()> {
for field_name in model.fields().stseq_ord_key() {
match row_data.fields().get(field_name) {
Some(cell) => {
@ -206,7 +204,7 @@ impl<'b> RowWriter<'b> {
}
struct BatchWriter<'a, 'b> {
model: &'a Model,
model: &'a ModelData,
row_writer: RowWriter<'b>,
g: &'a Guard,
sync_count: usize,
@ -214,10 +212,10 @@ struct BatchWriter<'a, 'b> {
impl<'a, 'b> BatchWriter<'a, 'b> {
fn write_batch(
model: &'a Model,
model: &'a ModelData,
g: &'a Guard,
expected: usize,
f: &'b mut TrackedWriter<File, <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
f: &'b mut TrackedWriter<<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
batch_stat: &mut BatchStats,
) -> RuntimeResult<usize> {
/*
@ -248,9 +246,9 @@ impl<'a, 'b> BatchWriter<'a, 'b> {
Ok(me.sync_count)
}
fn new(
model: &'a Model,
model: &'a ModelData,
g: &'a Guard,
f: &'b mut TrackedWriter<File, <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
f: &'b mut TrackedWriter<<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
) -> RuntimeResult<Self> {
let mut row_writer = RowWriter { f };
row_writer.write_row_global_metadata(model)?;
@ -293,10 +291,10 @@ impl<'a, 'b> BatchWriter<'a, 'b> {
}
/// A standard model batch where at most the given number of keys are flushed
pub struct StdModelBatch<'a>(&'a Model, usize);
pub struct StdModelBatch<'a>(&'a ModelData, usize);
impl<'a> StdModelBatch<'a> {
pub fn new(model: &'a Model, observed_len: usize) -> Self {
pub fn new(model: &'a ModelData, observed_len: usize) -> Self {
Self(model, observed_len)
}
}
@ -307,10 +305,7 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'
}
fn write_direct(
self,
writer: &mut TrackedWriter<
File,
<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
>,
writer: &mut TrackedWriter<<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
ctx: Rc<RefCell<BatchStats>>,
) -> RuntimeResult<()> {
// [expected commit]
@ -326,10 +321,10 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'
}
}
pub struct FullModel<'a>(&'a Model);
pub struct FullModel<'a>(&'a ModelData);
impl<'a> FullModel<'a> {
pub fn new(model: &'a Model) -> Self {
pub fn new(model: &'a ModelData) -> Self {
Self(model)
}
}
@ -340,7 +335,7 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for FullModel<'a> {
}
fn write_direct(
self,
f: &mut TrackedWriter<File, <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
f: &mut TrackedWriter<<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
_: Rc<RefCell<BatchStats>>,
) -> RuntimeResult<()> {
let g = pin();
@ -432,7 +427,7 @@ impl BatchStats {
impl BatchAdapterSpec for ModelDataAdapter {
type Spec = ModelDataBatchAofV1;
type GlobalState = Model;
type GlobalState = ModelData;
type BatchType = BatchType;
type EventType = EventType;
type BatchMetadata = BatchMetadata;

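The batch journal writer above funnels every row byte through `dtrack_write` on a `TrackedWriter`. The sketch below illustrates the general tracked-writer idea (count bytes and fold them into a running checksum while writing) with a toy checksum and simplified names; it is not the real `TrackedWriter` or `SCrc64` implementation.

```rust
// Hedged sketch: a writer wrapper that tracks bytes written and a running
// checksum so the journal can be verified on replay.
use std::io::{self, Write};

struct TrackedWriter<W: Write> {
    inner: W,
    bytes: u64,
    checksum: u32, // toy stand-in for the real 64-bit CRC
}

impl<W: Write> TrackedWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, bytes: 0, checksum: 0 }
    }
    /// Write a buffer and fold it into the running checksum.
    fn dtrack_write(&mut self, buf: &[u8]) -> io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes += buf.len() as u64;
        for &b in buf {
            self.checksum = self.checksum.rotate_left(5) ^ b as u32;
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut w = TrackedWriter::new(Vec::new());
    // row "global metadata" in the spirit of write_row_global_metadata:
    // primary-key tag, schema version, column count
    w.dtrack_write(&[1u8])?;
    w.dtrack_write(&2u64.to_le_bytes())?;
    w.dtrack_write(&3u64.to_le_bytes())?;
    assert_eq!(w.bytes, 17);
    println!("checksum = {:#x}", w.checksum);
    Ok(())
}
```
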
@ -27,7 +27,7 @@
use {
crate::{
engine::{
core::{dml, index::RowData, model::Model, space::Space, EntityID, EntityIDRef},
core::{dml, index::RowData, model::ModelData, space::Space, EntityID, EntityIDRef},
data::lit::Lit,
error::QueryResult,
fractal::{test_utils::TestGlobal, GlobalInstanceLike},
@ -75,7 +75,7 @@ fn create_model_and_space(global: &TestGlobal, create_model: &str) -> QueryResul
let create_space_tokens = lex_insecure(create_space_str.as_bytes()).unwrap();
let create_space: CreateSpace = ast::parse_ast_node_full(&create_space_tokens[2..]).unwrap();
Space::transactional_exec_create(global, create_space)?;
Model::transactional_exec_create(global, create_model).map(|_| mdl_name)
ModelData::transactional_exec_create(global, create_model).map(|_| mdl_name)
}
fn run_insert(global: &TestGlobal, insert: &str) -> QueryResult<()> {
@ -144,6 +144,7 @@ fn run_sample_inserts<K, V>(
global.load_model_drivers().unwrap();
global
.state()
.namespace()
.with_model(
EntityIDRef::new(mdl_name.space(), mdl_name.entity()),
|model| {
@ -225,6 +226,7 @@ fn run_sample_updates<K, V>(
{
global
.state()
.namespace()
.with_model(
EntityIDRef::new(mdl_name.space(), mdl_name.entity()),
|model| {

@ -31,9 +31,9 @@ use {
config::Configuration,
core::{
system_db::{SystemDatabase, VerifyUser},
GlobalNS,
GNSData, GlobalNS,
},
fractal::{context, ModelDrivers, ModelUniqueID},
fractal::{context, FractalGNSDriver},
storage::common::paths_v1,
txn::{
gns::{
@ -54,8 +54,7 @@ pub(super) mod raw;
pub const GNS_PATH: &str = v1::GNS_PATH;
pub const DATA_DIR: &str = v1::DATA_DIR;
pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
let model_drivers = ModelDrivers::empty();
pub fn recreate(gns: GNSData) -> RuntimeResult<SELoaded> {
context::set_dmsg("creating gns");
let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?;
// create all spaces
@ -67,41 +66,37 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
// create all models
context::set_dmsg("creating all models");
for (model_id, model) in gns.idx_models().read().iter() {
let model_data = model.data();
let space_uuid = gns.idx().read().get(model_id.space()).unwrap().get_uuid();
FileSystem::create_dir_all(&paths_v1::model_dir(
model_id.space(),
space_uuid,
model_id.entity(),
model.get_uuid(),
model_data.get_uuid(),
))?;
let mut model_driver = ModelDriver::create_model_driver(&paths_v1::model_path(
model_id.space(),
space_uuid,
model_id.entity(),
model.get_uuid(),
model_data.get_uuid(),
))?;
gns_driver.commit_event(CreateModelTxn::new(
SpaceIDRef::with_uuid(model_id.space(), space_uuid),
model_id.entity(),
model,
model_data,
))?;
model_driver.commit_with_ctx(FullModel::new(model), BatchStats::new())?;
model_drivers.add_driver(
ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()),
model_driver,
);
model_driver.commit_with_ctx(FullModel::new(model_data), BatchStats::new())?;
model.driver().initialize_model_driver(model_driver);
}
Ok(SELoaded {
gns,
gns_driver,
model_drivers,
gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)),
})
}
pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
FileSystem::create_dir_all(DATA_DIR)?;
let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?;
let gns = GlobalNS::empty();
let gns = GNSData::empty();
let password_hash = rcrypt::hash(&config.auth.root_key, rcrypt::DEFAULT_COST).unwrap();
// now go ahead and initialize our root user
gns_driver.commit_event(CreateUserTxn::new(
@ -113,31 +108,26 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
password_hash.into_boxed_slice(),
));
Ok(SELoaded {
gns,
gns_driver,
model_drivers: ModelDrivers::empty(),
gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)),
})
}
pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
let gns = GlobalNS::empty();
let gns = GNSData::empty();
context::set_dmsg("loading gns");
let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?;
let model_drivers = ModelDrivers::empty();
for (id, model) in gns.idx_models().write().iter_mut() {
let model_data = model.data();
let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid();
let model_data_file_path =
paths_v1::model_path(id.space(), space_uuid, id.entity(), model.get_uuid());
paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid());
context::set_dmsg(format!("loading model driver in {model_data_file_path}"));
let model_driver =
impls::mdl_journal::ModelDriver::open_model_driver(model, &model_data_file_path)?;
model_drivers.add_driver(
ModelUniqueID::new(id.space(), id.entity(), model.get_uuid()),
model_driver,
);
impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?;
model.driver().initialize_model_driver(model_driver);
unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
model.model_mutator().vacuum_stashed();
model.data_mut().model_mutator().vacuum_stashed();
}
}
// check if password has changed
@ -155,8 +145,6 @@ pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
.__raw_alter_user(SystemDatabase::ROOT_ACCOUNT, phash.into_boxed_slice());
}
Ok(SELoaded {
gns,
gns_driver,
model_drivers,
gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)),
})
}

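Per the commit title, the storage code above stops carrying a separate `ModelDrivers` map and instead attaches each model's persistence driver to the same entry as its data (`model.data()` / `model.driver().initialize_model_driver(..)`), with `GlobalNS` now wrapping `GNSData` plus the GNS driver. The following is a rough, hypothetical sketch of that container layout with simplified stand-in types, not the actual Skytable definitions.

```rust
// Hedged sketch of the "keep driver and data in the same container" layout.
use std::sync::Mutex;

struct ModelData {
    uuid: u128, // stand-in for the real Uuid
}

struct ModelDriver {
    journal_path: String,
}

/// The model entry owns its data and its (lazily attached) persistence
/// driver, replacing the separate global `ModelDrivers` map.
struct Model {
    data: ModelData,
    driver: Mutex<Option<ModelDriver>>,
}

impl Model {
    fn data(&self) -> &ModelData {
        &self.data
    }
    /// In the spirit of `model.driver().initialize_model_driver(..)` above.
    fn initialize_model_driver(&self, driver: ModelDriver) {
        *self.driver.lock().unwrap() = Some(driver);
    }
}

fn main() {
    let model = Model {
        data: ModelData { uuid: 42 },
        driver: Mutex::new(None),
    };
    // open/attach the driver next to the data it persists
    let path = format!("data/myspace/mymodel-{}/journal.db", model.data().uuid);
    model.initialize_model_driver(ModelDriver { journal_path: path });
    let guard = model.driver.lock().unwrap();
    println!("driver ready at {}", guard.as_ref().unwrap().journal_path);
}
```
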
@ -24,8 +24,6 @@
*
*/
use crate::engine::storage::common::interface::fs::File;
use {
self::raw::{CommitPreference, RawJournalAdapterEvent, RawJournalWriter},
crate::{
@ -65,6 +63,7 @@ pub use raw::{
/// An event log driver
pub type EventLogDriver<EL> = RawJournalWriter<EventLogAdapter<EL>>;
/// The event log adapter
#[derive(Debug)]
pub struct EventLogAdapter<EL: EventLogSpec>(PhantomData<EL>);
type DispatchFn<G> = fn(&G, Vec<u8>) -> RuntimeResult<()>;
@ -103,7 +102,7 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
}
fn commit_direct<'a, E>(
&mut self,
w: &mut TrackedWriter<File, Self::Spec>,
w: &mut TrackedWriter<Self::Spec>,
ev: E,
ctx: (),
) -> RuntimeResult<()>
@ -160,6 +159,7 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
/// Batch journal driver
pub type BatchDriver<BA> = RawJournalWriter<BatchAdapter<BA>>;
/// Batch journal adapter
#[derive(Debug)]
pub struct BatchAdapter<BA: BatchAdapterSpec>(PhantomData<BA>);
#[cfg(test)]
@ -245,7 +245,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
}
fn commit_direct<'a, E>(
&mut self,
w: &mut TrackedWriter<File, Self::Spec>,
w: &mut TrackedWriter<Self::Spec>,
ev: E,
ctx: Self::CommitContext,
) -> RuntimeResult<()>

@ -33,7 +33,6 @@ use {
mem::unsafe_apis::memcpy,
storage::common::{
checksum::SCrc64,
interface::fs::File,
sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1,
@ -41,6 +40,7 @@ use {
},
RuntimeResult,
},
core::fmt,
std::ops::Range,
};
@ -212,7 +212,7 @@ pub trait RawJournalAdapterEvent<CA: RawJournalAdapter>: Sized {
fn md(&self) -> u64;
fn write_direct(
self,
_: &mut TrackedWriter<File, <CA as RawJournalAdapter>::Spec>,
_: &mut TrackedWriter<<CA as RawJournalAdapter>::Spec>,
_: <CA as RawJournalAdapter>::CommitContext,
) -> RuntimeResult<()> {
unimplemented!()
@ -251,7 +251,7 @@ pub trait RawJournalAdapter: Sized {
/// commit event (direct preference)
fn commit_direct<E>(
&mut self,
_: &mut TrackedWriter<File, Self::Spec>,
_: &mut TrackedWriter<Self::Spec>,
_: E,
_: Self::CommitContext,
) -> RuntimeResult<()>
@ -468,12 +468,27 @@ pub(super) enum DriverEventKind {
/// A low-level journal writer
pub struct RawJournalWriter<J: RawJournalAdapter> {
j: J,
log_file: TrackedWriter<File, <J as RawJournalAdapter>::Spec>,
log_file: TrackedWriter<<J as RawJournalAdapter>::Spec>,
txn_id: u64,
known_txn_id: u64,
known_txn_offset: u64, // if offset is 0, txn id is unset
}
impl<J: RawJournalAdapter + fmt::Debug> fmt::Debug for RawJournalWriter<J>
where
<J::Spec as FileSpecV1>::Metadata: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RawJournalWriter")
.field("j", &self.j)
.field("log_file", &self.log_file)
.field("txn_id", &self.txn_id)
.field("known_txn_id", &self.known_txn_id)
.field("known_txn_offset", &self.known_txn_offset)
.finish()
}
}
const SERVER_EV_MASK: u64 = 1 << (u64::BITS - 1);
impl<J: RawJournalAdapter> RawJournalWriter<J> {

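The raw journal diff above adds a hand-written `Debug` impl for `RawJournalWriter<J>` guarded by `<J::Spec as FileSpecV1>::Metadata: fmt::Debug`. That is the usual workaround when a field's `Debug` requirement lives on an associated type, which `#[derive(Debug)]` cannot express (it only adds bounds on the type parameter itself). A minimal sketch of the pattern with simplified stand-in traits and types:

```rust
// Hedged sketch: manual Debug impl whose bound sits on an associated type.
use std::fmt;

trait FileSpec {
    type Metadata;
}

struct TrackedWriter<S: FileSpec> {
    meta: S::Metadata,
    bytes_written: u64,
}

trait JournalAdapter {
    type Spec: FileSpec;
}

struct JournalWriter<J: JournalAdapter> {
    log_file: TrackedWriter<J::Spec>,
    txn_id: u64,
}

// derive(Debug) would only add `J: Debug`; the field actually needs
// `<J::Spec as FileSpec>::Metadata: Debug`, so we spell it out by hand.
impl<J: JournalAdapter> fmt::Debug for JournalWriter<J>
where
    <J::Spec as FileSpec>::Metadata: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JournalWriter")
            .field("log_file_meta", &self.log_file.meta)
            .field("bytes_written", &self.log_file.bytes_written)
            .field("txn_id", &self.txn_id)
            .finish()
    }
}

fn main() {
    struct SpecV1;
    impl FileSpec for SpecV1 {
        type Metadata = &'static str;
    }
    struct Adapter;
    impl JournalAdapter for Adapter {
        type Spec = SpecV1;
    }
    let w: JournalWriter<Adapter> = JournalWriter {
        log_file: TrackedWriter { meta: "header-v1", bytes_written: 0 },
        txn_id: 7,
    };
    println!("{:?}", w);
}
```
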
@ -38,10 +38,7 @@ use {
error::StorageError,
mem::unsafe_apis,
storage::{
common::{
interface::fs::File,
sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
},
common::sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
v2::raw::{
journal::raw::{create_journal, open_journal, RawJournalWriter},
spec::{ModelDataBatchAofV1, SystemDatabaseV1},
@ -285,7 +282,6 @@ impl<'a> RawJournalAdapterEvent<BatchAdapter<BatchDBAdapter>> for BatchDBFlush<'
fn write_direct(
self,
f: &mut TrackedWriter<
File,
<BatchAdapter<BatchDBAdapter> as super::raw::RawJournalAdapter>::Spec,
>,
ctx: Rc<RefCell<BatchContext>>,

@ -70,6 +70,7 @@ impl sdss::sdss_r1::HeaderV1Enumeration for FileSpecifier {
}
}
#[derive(Debug)]
pub struct HeaderImplV2;
impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV2 {
type FileClass = FileClass;

@ -25,7 +25,7 @@
*/
use crate::engine::{
core::model::{Field, Model},
core::model::{Field, ModelData},
idx::{IndexST, IndexSTSeqCns},
ql::lex::Ident,
txn::{ModelIDRef, SpaceIDRef},
@ -44,11 +44,11 @@ impl_gns_event!(
pub struct CreateModelTxn<'a> {
space_id: SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
model: &'a ModelData,
}
impl<'a> CreateModelTxn<'a> {
pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a Model) -> Self {
pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a ModelData) -> Self {
Self {
space_id,
model_name,
@ -61,7 +61,7 @@ impl<'a> CreateModelTxn<'a> {
pub fn model_name(&self) -> &str {
self.model_name
}
pub fn model(&self) -> &Model {
pub fn model(&self) -> &ModelData {
self.model
}
}

@ -25,7 +25,7 @@
*/
use crate::engine::{
core::{model::Model, space::Space},
core::{model::ModelData, space::Space},
data::uuid::Uuid,
};
@ -63,7 +63,7 @@ impl<'a> ModelIDRef<'a> {
space_name: &'a str,
space: &'a Space,
model_name: &'a str,
model: &'a Model,
model: &'a ModelData,
) -> ModelIDRef<'a> {
ModelIDRef::new(
SpaceIDRef::new(space_name, space),
