Recreate compacted model data file on upgrade

next
Sayan Nandan 7 months ago
parent d6f9b54868
commit e8e62cf924
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -61,7 +61,7 @@ fn alter_user(
}
let (username, password) = get_user_data(user)?;
global
.namespace()
.state()
.sys_db()
.alter_user(global, &username, &password)
}
@ -69,7 +69,7 @@ fn alter_user(
fn create_user(global: &impl GlobalInstanceLike, user: UserDecl) -> QueryResult<()> {
let (username, password) = get_user_data(user)?;
global
.namespace()
.state()
.sys_db()
.create_user(global, username.into_boxed_str(), &password)
}
@ -98,7 +98,7 @@ fn drop_user(
return Err(QueryError::SysAuthError);
}
global
.namespace()
.state()
.sys_db()
.drop_user(global, user_del.username())
}

@ -39,7 +39,7 @@ pub fn inspect(
let ret = match stmt {
Inspect::Global => {
// collect spaces
let spaces = g.namespace().idx().read();
let spaces = g.state().idx().read();
let mut spaces_iter = spaces.iter().peekable();
let mut ret = format!("{{\"spaces\":[");
while let Some((space, _)) = spaces_iter.next() {
@ -56,7 +56,7 @@ pub fn inspect(
drop(spaces_iter);
drop(spaces);
// collect users
let users = g.namespace().sys_db().users().read();
let users = g.state().sys_db().users().read();
let mut users_iter = users.iter().peekable();
while let Some((user, _)) = users_iter.next() {
ret.push('"');
@ -70,7 +70,7 @@ pub fn inspect(
ret.push_str("],\"settings\":{}}");
ret
}
Inspect::Model(m) => match g.namespace().idx_models().read().get(&m) {
Inspect::Model(m) => match g.state().idx_models().read().get(&m) {
Some(m) => format!(
"{{\"decl\":\"{}\",\"rows\":{},\"properties\":{{}}}}",
m.describe(),
@ -78,7 +78,7 @@ pub fn inspect(
),
None => return Err(QueryError::QExecObjectNotFound),
},
Inspect::Space(s) => match g.namespace().idx().read().get(s.as_str()) {
Inspect::Space(s) => match g.state().idx().read().get(s.as_str()) {
Some(s) => {
let mut ret = format!("{{\"models\":[");
let mut models_iter = s.models().iter().peekable();

@ -94,7 +94,7 @@ where
Fm: FnMut(&mut T, &Model, usize),
F: FnMut(&mut T, &Datacell, usize),
{
global.namespace().with_model(select.entity, |mdl| {
global.state().with_model(select.entity, |mdl| {
let g = sync::atm::cpin();
let mut i = 0;
if select.wildcard {
@ -181,7 +181,7 @@ pub fn select_custom<F>(
where
F: FnMut(&Datacell),
{
global.namespace().with_model(select.entity(), |mdl| {
global.state().with_model(select.entity(), |mdl| {
let target_key = mdl.resolve_where(select.clauses_mut())?;
let pkdc = VirtualDatacell::new(target_key.clone(), mdl.p_tag().tag_unique());
let g = sync::atm::cpin();

@ -201,7 +201,7 @@ fn cstate_use(
NB: just like SQL, we don't really care about what this is set to as it's basically a shorthand.
so we do a simple vanity check
*/
if !global.namespace().contains_space(new_space.as_str()) {
if !global.state().contains_space(new_space.as_str()) {
return Err(QueryError::QExecObjectNotFound);
}
cstate.set_cs(new_space.boxed_str());
@ -209,7 +209,7 @@ fn cstate_use(
Use::RefreshCurrent => match cstate.get_cs() {
None => return Ok(Response::Null),
Some(space) => {
if !global.namespace().contains_space(space) {
if !global.state().contains_space(space) {
cstate.unset_cs();
return Err(QueryError::QExecObjectNotFound);
}

@ -153,7 +153,7 @@ pub(self) fn with_model_for_data_update<'a, F>(
where
F: FnOnce(&Model) -> QueryResult<QueryExecMeta>,
{
let mdl_idx = global.namespace().idx_mdl.read();
let mdl_idx = global.state().idx_mdl.read();
let Some(model) = mdl_idx.get(&entity) else {
return Err(QueryError::QExecObjectNotFound);
};

@ -252,7 +252,7 @@ impl Model {
) -> QueryResult<()> {
let (space_name, model_name) = (alter.model.space(), alter.model.entity());
global
.namespace()
.state()
.with_model_space_mut_for_ddl(alter.model, |space, model| {
// prepare plan
let plan = AlterPlan::fdeltas(model, alter)?;
@ -273,7 +273,7 @@ impl Model {
&new_fields,
);
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
new_fields
@ -291,7 +291,7 @@ impl Model {
&removed,
);
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
@ -306,7 +306,7 @@ impl Model {
&updated,
);
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {

@ -268,7 +268,7 @@ impl Model {
let (space_name, model_name) = (stmt.model_name.space(), stmt.model_name.entity());
let if_nx = stmt.if_not_exists;
let model = Self::process_create(stmt)?;
global.namespace().ddl_with_space_mut(&space_name, |space| {
global.state().ddl_with_space_mut(&space_name, |space| {
// TODO(@ohsayan): be extra cautious with post-transactional tasks (memck)
if space.models().contains(model_name) {
if if_nx {
@ -294,7 +294,7 @@ impl Model {
model.get_uuid(),
)?;
// commit txn
match txn_driver.gns_driver().commit_event(txn) {
match txn_driver.driver().commit_event(txn) {
Ok(()) => {}
Err(e) => {
// failed to commit, request cleanup
@ -313,7 +313,7 @@ impl Model {
// update global state
let _ = space.models_mut().insert(model_name.into());
let _ = global
.namespace()
.state()
.idx_models()
.write()
.insert(EntityID::new(&space_name, &model_name), model);
@ -329,7 +329,7 @@ impl Model {
stmt: DropModel,
) -> QueryResult<Option<bool>> {
let (space_name, model_name) = (stmt.entity.space(), stmt.entity.entity());
global.namespace().ddl_with_space_mut(&space_name, |space| {
global.state().ddl_with_space_mut(&space_name, |space| {
if !space.models().contains(model_name) {
if stmt.if_exists {
return Ok(Some(false));
@ -339,7 +339,7 @@ impl Model {
}
}
// get exclusive lock on models
let mut models_idx = global.namespace().idx_models().write();
let mut models_idx = global.state().idx_models().write();
let model = models_idx
.get(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
@ -358,7 +358,7 @@ impl Model {
model.delta_state().schema_current_version().value_u64(),
));
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
// request cleanup
global.purge_model_driver(
space_name,

@ -158,7 +158,7 @@ impl Space {
if_not_exists,
} = Self::process_create(space)?;
// lock the global namespace
global.namespace().ddl_with_spaces_write(|spaces| {
global.state().ddl_with_spaces_write(|spaces| {
if spaces.st_contains(&space_name) {
if if_not_exists {
return Ok(Some(false));
@ -176,7 +176,7 @@ impl Space {
space.get_uuid(),
))?;
// commit txn
match global.gns_driver().lock().gns_driver().commit_event(txn) {
match global.gns_driver().lock().driver().commit_event(txn) {
Ok(()) => {}
Err(e) => {
// tell fractal to clean it up sometime
@ -204,7 +204,7 @@ impl Space {
updated_props,
}: AlterSpace,
) -> QueryResult<()> {
global.namespace().ddl_with_space_mut(&space_name, |space| {
global.state().ddl_with_space_mut(&space_name, |space| {
match updated_props.get(Self::KEY_ENV) {
Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {}
Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {}
@ -223,7 +223,7 @@ impl Space {
&patch,
);
// commit
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
}
// merge
dict::rmerge_data_with_patch(space.props_mut(), patch);
@ -244,7 +244,7 @@ impl Space {
}: DropSpace,
) -> QueryResult<Option<bool>> {
if force {
global.namespace().ddl_with_all_mut(|spaces, models| {
global.state().ddl_with_all_mut(|spaces, models| {
let Some(space) = spaces.remove(space_name.as_str()) else {
if if_exists {
return Ok(Some(false));
@ -258,7 +258,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
@ -287,7 +287,7 @@ impl Space {
}
})
} else {
global.namespace().ddl_with_spaces_write(|spaces| {
global.state().ddl_with_spaces_write(|spaces| {
let Some(space) = spaces.get(space_name.as_str()) else {
if if_exists {
return Ok(Some(false));
@ -305,7 +305,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.gns_driver().lock().gns_driver().commit_event(txn)?;
global.gns_driver().lock().driver().commit_event(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),

@ -41,16 +41,18 @@ pub struct SystemDatabase {
#[derive(Debug, PartialEq)]
pub struct User {
password: Box<[u8]>,
phash: Box<[u8]>,
}
impl User {
pub fn new(password: Box<[u8]>) -> Self {
Self { password }
pub fn new(password_hash: Box<[u8]>) -> Self {
Self {
phash: password_hash,
}
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum VerifyUser {
NotFound,
IncorrectPassword,
@ -74,12 +76,12 @@ impl SystemDatabase {
pub fn users(&self) -> &RWLIdx<Box<str>, User> {
&self.users
}
pub fn __verify_user(&self, username: &str, password: &[u8]) -> VerifyUser {
pub fn verify_user(&self, username: &str, password: &[u8]) -> VerifyUser {
self.users
.read()
.get(username)
.map(|user| {
if rcrypt::verify(password, &user.password).unwrap() {
if rcrypt::verify(password, &user.phash).unwrap() {
if username == Self::ROOT_ACCOUNT {
VerifyUser::OkayRoot
} else {
@ -91,22 +93,25 @@ impl SystemDatabase {
})
.unwrap_or(VerifyUser::NotFound)
}
pub fn __insert_user(&self, username: Box<str>, password: Box<[u8]>) -> bool {
}
impl SystemDatabase {
pub fn __raw_create_user(&self, username: Box<str>, password_hash: Box<[u8]>) -> bool {
match self.users.write().entry(username) {
Entry::Vacant(ve) => {
ve.insert(User::new(password));
ve.insert(User::new(password_hash));
true
}
Entry::Occupied(_) => false,
}
}
pub fn __delete_user(&self, username: &str) -> bool {
pub fn __raw_delete_user(&self, username: &str) -> bool {
self.users.write().remove(username).is_some()
}
pub fn __change_user_password(&self, username: &str, new_password: Box<[u8]>) -> bool {
pub fn __raw_alter_user(&self, username: &str, new_password_hash: Box<[u8]>) -> bool {
match self.users.write().get_mut(username) {
Some(user) => {
user.password = new_password;
user.phash = new_password_hash;
true
}
None => false,
@ -129,7 +134,7 @@ impl SystemDatabase {
match global
.gns_driver()
.lock()
.gns_driver()
.driver()
.commit_event(CreateUserTxn::new(&username, &password_hash))
{
Ok(()) => {
@ -154,11 +159,11 @@ impl SystemDatabase {
match global
.gns_driver()
.lock()
.gns_driver()
.driver()
.commit_event(AlterUserTxn::new(username, &password_hash))
{
Ok(()) => {
user.password = password_hash.into_boxed_slice();
user.phash = password_hash.into_boxed_slice();
Ok(())
}
Err(e) => {
@ -178,7 +183,7 @@ impl SystemDatabase {
match global
.gns_driver()
.lock()
.gns_driver()
.driver()
.commit_event(DropUserTxn::new(username))
{
Ok(()) => {

@ -55,7 +55,7 @@ fn exec_plan(
let mdl_name = exec_create(global, model, new_space)?;
let prev_uuid = {
global
.namespace()
.state()
.idx_models()
.read()
.get(&EntityIDRef::new("myspace", &mdl_name))
@ -65,7 +65,7 @@ fn exec_plan(
let tok = lex_insecure(plan.as_bytes()).unwrap();
let alter = parse_ast_node_full::<AlterModel>(&tok[2..]).unwrap();
Model::transactional_exec_alter(global, alter)?;
let models = global.namespace().idx_models().read();
let models = global.state().idx_models().read();
let model = models.get(&EntityIDRef::new("myspace", &mdl_name)).unwrap();
assert_eq!(prev_uuid, model.get_uuid());
f(model);

@ -51,7 +51,7 @@ pub fn exec_create(
let name = create_model.model_name.entity().to_owned();
if create_new_space {
global
.namespace()
.state()
.create_empty_test_space(create_model.model_name.space())
}
Model::transactional_exec_create(global, create_model).map(|_| name)
@ -70,7 +70,7 @@ fn with_model(
model_name: &str,
f: impl Fn(&Model),
) {
let models = global.namespace().idx_models().read();
let models = global.state().idx_models().read();
let model = models.get(&EntityIDRef::new(space_id, model_name)).unwrap();
f(model)
}

@ -48,7 +48,7 @@ fn exec_create(
ast::parse_ast_node_full::<crate::engine::ql::ddl::crt::CreateSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::transactional_exec_create(gns, ast_node)?;
gns.namespace().ddl_with_space_mut(&name, |space| {
gns.state().ddl_with_space_mut(&name, |space| {
verify(space);
Ok(space.get_uuid())
})
@ -64,7 +64,7 @@ fn exec_alter(
ast::parse_ast_node_full::<crate::engine::ql::ddl::alt::AlterSpace>(&tok[2..]).unwrap();
let name = ast_node.space_name;
Space::transactional_exec_alter(gns, ast_node)?;
gns.namespace().ddl_with_space_mut(&name, |space| {
gns.state().ddl_with_space_mut(&name, |space| {
verify(space);
Ok(space.get_uuid())
})

@ -44,7 +44,7 @@ use crate::engine::{
fn _exec_only_create_space_model(global: &impl GlobalInstanceLike, model: &str) -> QueryResult<()> {
let _ = global
.namespace()
.state()
.idx()
.write()
.insert("myspace".into(), Space::new_auto_all().into());
@ -73,7 +73,7 @@ fn _exec_only_read_key_and_then<T>(
and_then: impl Fn(Row) -> T,
) -> QueryResult<T> {
let guard = sync::atm::cpin();
global.namespace().with_model(entity, |mdl| {
global.state().with_model(entity, |mdl| {
let row = mdl
.primary_index()
.select(Lit::from(key_name), &guard)
@ -90,7 +90,7 @@ fn _exec_delete_only(global: &impl GlobalInstanceLike, delete: &str, key: &str)
let entity = delete.entity();
dml::delete(global, delete)?;
assert_eq!(
global.namespace().with_model(entity, |model| {
global.state().with_model(entity, |model| {
let g = sync::atm::cpin();
Ok(model.primary_index().select(key.into(), &g).is_none())
}),

@ -48,7 +48,7 @@ impl<Fs: FSInterface> FractalGNSDriver<Fs> {
txn_driver: txn_driver,
}
}
pub fn gns_driver(&mut self) -> &mut GNSDriver<Fs> {
pub fn driver(&mut self) -> &mut GNSDriver<Fs> {
&mut self.txn_driver
}
}

@ -102,7 +102,7 @@ pub trait GlobalInstanceLike {
// stat
fn get_max_delta_size(&self) -> usize;
// global namespace
fn namespace(&self) -> &GlobalNS;
fn state(&self) -> &GlobalNS;
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver<Self::FileSystem>>;
// model drivers
fn initialize_model_driver(
@ -150,7 +150,7 @@ pub trait GlobalInstanceLike {
impl GlobalInstanceLike for Global {
type FileSystem = LocalFS;
// ns
fn namespace(&self) -> &GlobalNS {
fn state(&self) -> &GlobalNS {
self._namespace()
}
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver<Self::FileSystem>> {

@ -90,7 +90,7 @@ impl TestGlobal<NullFS> {
impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
type FileSystem = Fs;
fn namespace(&self) -> &GlobalNS {
fn state(&self) -> &GlobalNS {
&self.gns
}
fn gns_driver(&self) -> &Mutex<FractalGNSDriver<Self::FileSystem>> {

@ -292,9 +292,9 @@ async fn do_handshake<S: Socket>(
match core::str::from_utf8(handshake.hs_auth().username()) {
Ok(uname) => {
match global
.namespace()
.state()
.sys_db()
.__verify_user(uname, handshake.hs_auth().password())
.verify_user(uname, handshake.hs_auth().password())
{
okay @ (VerifyUser::Okay | VerifyUser::OkayRoot) => {
let hs = handshake.hs_static();

@ -57,7 +57,7 @@ fn init_space(global: &impl GlobalInstanceLike, space_name: &str, env: &str) ->
let name = stmt.space_name;
Space::transactional_exec_create(global, stmt).unwrap();
global
.namespace()
.state()
.idx()
.read()
.get(name.as_str())
@ -76,7 +76,7 @@ fn create_space() {
}
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
let spaces = global.namespace().idx().read();
let spaces = global.state().idx().read();
let space = spaces.get("myspace").unwrap();
assert_eq!(
&*space,
@ -106,7 +106,7 @@ fn alter_space() {
}
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
let spaces = global.namespace().idx().read();
let spaces = global.state().idx().read();
let space = spaces.get("myspace").unwrap();
assert_eq!(
&*space,
@ -133,7 +133,7 @@ fn drop_space() {
}
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
assert!(global.namespace().idx().read().get("myspace").is_none());
assert!(global.state().idx().read().get("myspace").is_none());
})
})
}
@ -150,7 +150,7 @@ fn init_model(
let model_name = stmt.model_name;
Model::transactional_exec_create(global, stmt).unwrap();
global
.namespace()
.state()
.with_model(model_name, |model| Ok(model.get_uuid()))
.unwrap()
}
@ -177,7 +177,7 @@ fn create_model() {
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.state()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model,
@ -215,7 +215,7 @@ fn alter_model_add() {
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.state()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model.fields().st_get("profile_pic").unwrap(),
@ -250,7 +250,7 @@ fn alter_model_remove() {
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.state()
.with_model(("myspace", "mymodel").into(), |model| {
assert!(model.fields().st_get("has_secure_key").is_none());
assert!(model.fields().st_get("is_dumb").is_none());
@ -282,7 +282,7 @@ fn alter_model_update() {
multirun(|| {
let global = TestGlobal::new_with_vfs_driver(log_name);
global
.namespace()
.state()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model.fields().st_get("profile_pic").unwrap(),
@ -310,7 +310,7 @@ fn drop_model() {
let global = TestGlobal::new_with_vfs_driver(log_name);
assert_eq!(
global
.namespace()
.state()
.with_model(("myspace", "mymodel").into(), |_| { Ok(()) })
.unwrap_err(),
QueryError::QExecObjectNotFound

@ -53,7 +53,7 @@ impl<'a> GNSEvent for CreateUserTxn<'a> {
FullUserDefinition { username, password }: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
if gns.sys_db().__insert_user(username, password) {
if gns.sys_db().__raw_create_user(username, password) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictAlreadyExists.into())
@ -140,7 +140,7 @@ impl<'a> GNSEvent for AlterUserTxn<'a> {
FullUserDefinition { username, password }: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
if gns.sys_db().__change_user_password(&username, password) {
if gns.sys_db().__raw_alter_user(&username, password) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictMismatch.into())
@ -202,7 +202,7 @@ impl<'a> GNSEvent for DropUserTxn<'a> {
DropUserPayload(username): Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
if gns.sys_db().__delete_user(&username) {
if gns.sys_db().__raw_delete_user(&username) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictMismatch.into())

@ -66,7 +66,6 @@ pub struct SELoaded {
}
pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> {
info!("loading databases");
// first determine if this is a new install, an existing install or if it uses the old driver
if Path::new(v1::SYSDB_PATH).is_file() {
warn!("older storage format detected");
@ -86,6 +85,6 @@ pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> {
} else {
info!("reinitializing databases");
context::set_dmsg("loading databases");
v2::restore()
v2::restore(cfg)
}
}

@ -51,7 +51,7 @@ pub fn load_gns_prepare_migration() -> RuntimeResult<GlobalNS> {
let RestoredSystemDatabase { users, .. } =
raw::sysdb::RestoredSystemDatabase::restore::<LocalFS>(SYSDB_PATH)?;
for (user, phash) in users {
gns.sys_db().__insert_user(user, phash);
gns.sys_db().__raw_create_user(user, phash);
}
// now move all our files into a backup directory
let backup_dir_path = format!(

@ -110,10 +110,28 @@ struct RowWriter<'b, Fs: FSInterface> {
}
impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
fn write_row_metadata(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
fn write_static_metadata(&mut self, model: &Model) -> RuntimeResult<()> {
// write batch start information: [pk tag:1B][schema version][column count]
self.f
.dtrack_write(&[model.p_tag().tag_unique().value_u8()])?;
self.f.dtrack_write(
&model
.delta_state()
.schema_current_version()
.value_u64()
.to_le_bytes(),
)?;
self.f
.dtrack_write(&(model.fields().st_len() as u64).to_le_bytes())
}
fn write_row_metadata(
&mut self,
change: DataDeltaKind,
txn_id: DeltaVersion,
) -> RuntimeResult<()> {
if cfg!(debug) {
let event_kind = EventType::try_from_raw(delta.change().value_u8()).unwrap();
match (event_kind, delta.change()) {
let event_kind = EventType::try_from_raw(change.value_u8()).unwrap();
match (event_kind, change) {
(EventType::Delete, DataDeltaKind::Delete)
| (EventType::Insert, DataDeltaKind::Insert)
| (EventType::Update, DataDeltaKind::Update) => {}
@ -122,9 +140,9 @@ impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
}
}
// write [change type][txn id]
let change_type = [delta.change().value_u8()];
let change_type = [change.value_u8()];
self.f.dtrack_write(&change_type)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
let txn_id = txn_id.value_u64().to_le_bytes();
self.f.dtrack_write(&txn_id)?;
Ok(())
}
@ -235,19 +253,11 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
>,
) -> RuntimeResult<Self> {
// write batch start information: [pk tag:1B][schema version][column count]
f.dtrack_write(&[model.p_tag().tag_unique().value_u8()])?;
f.dtrack_write(
&model
.delta_state()
.schema_current_version()
.value_u64()
.to_le_bytes(),
)?;
f.dtrack_write(&(model.fields().st_len() as u64).to_le_bytes())?;
let mut row_writer = RowWriter { f };
row_writer.write_static_metadata(model)?;
Ok(Self {
model,
row_writer: RowWriter { f },
row_writer,
g,
sync_count: 0,
})
@ -255,7 +265,8 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
fn step(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
match delta.change() {
DataDeltaKind::Delete => {
self.row_writer.write_row_metadata(&delta)?;
self.row_writer
.write_row_metadata(delta.change(), delta.data_version())?;
self.row_writer.write_row_pk(delta.row().d_key())?;
}
DataDeltaKind::Insert | DataDeltaKind::Update => {
@ -269,7 +280,8 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
// inconsistent read. there should already be another revised delta somewhere
return Ok(());
}
self.row_writer.write_row_metadata(&delta)?;
self.row_writer
.write_row_metadata(delta.change(), delta.data_version())?;
// encode data
self.row_writer.write_row_pk(delta.row().d_key())?;
self.row_writer.write_row_data(self.model, &row_data)?;
@ -312,6 +324,49 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'
}
}
pub struct FullModel<'a>(&'a Model);
impl<'a> FullModel<'a> {
pub fn new(model: &'a Model) -> Self {
Self(model)
}
}
impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for FullModel<'a> {
fn md(&self) -> u64 {
BatchType::Standard.dscr_u64()
}
fn write_direct<Fs: FSInterface>(
self,
f: &mut TrackedWriter<
Fs::File,
<BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
>,
) -> RuntimeResult<()> {
let g = pin();
let mut row_writer: RowWriter<'_, Fs> = RowWriter { f };
let index = self.0.primary_index().__raw_index();
let current_row_count = index.mt_len();
// expect commit == current row count
row_writer
.f
.dtrack_write(&current_row_count.u64_bytes_le())?;
// [pk tag][schema version][column cnt]
row_writer.write_static_metadata(self.0)?;
for (key, row_data) in index.mt_iter_kv(&g) {
let row_data = row_data.read();
row_writer.write_row_metadata(DataDeltaKind::Insert, row_data.get_txn_revised())?;
row_writer.write_row_pk(key)?;
row_writer.write_row_data(self.0, &row_data)?;
}
// actual commit == current row count
row_writer
.f
.dtrack_write(&current_row_count.u64_bytes_le())?;
Ok(())
}
}
/*
restore implementation
---

@ -25,17 +25,25 @@
*/
use {
self::impls::mdl_journal::FullModel,
super::{
common::interface::{fs_imp::LocalFS, fs_traits::FSInterface},
v1, SELoaded,
},
crate::engine::{
config::Configuration,
core::{system_db::SystemDatabase, GlobalNS},
core::{
system_db::{SystemDatabase, VerifyUser},
GlobalNS,
},
fractal::{context, ModelDrivers, ModelUniqueID},
storage::common::paths_v1,
txn::{
gns::{model::CreateModelTxn, space::CreateSpaceTxn, sysctl::CreateUserTxn},
gns::{
model::CreateModelTxn,
space::CreateSpaceTxn,
sysctl::{AlterUserTxn, CreateUserTxn},
},
SpaceIDRef,
},
RuntimeResult,
@ -69,7 +77,7 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
model_id.entity(),
model.get_uuid(),
))?;
let model_driver = ModelDriver::create_model_driver(&paths_v1::model_path(
let mut model_driver = ModelDriver::create_model_driver(&paths_v1::model_path(
model_id.space(),
space_uuid,
model_id.entity(),
@ -80,12 +88,12 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
model_id.entity(),
model,
))?;
model_driver.commit_event(FullModel::new(model))?;
model_drivers.add_driver(
ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()),
model_driver,
);
}
// FIXME(@ohsayan): write all model data
Ok(SELoaded {
gns,
gns_driver,
@ -103,7 +111,7 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
SystemDatabase::ROOT_ACCOUNT,
&password_hash,
))?;
assert!(gns.sys_db().__insert_user(
assert!(gns.sys_db().__raw_create_user(
SystemDatabase::ROOT_ACCOUNT.to_owned().into_boxed_str(),
password_hash.into_boxed_slice(),
));
@ -114,10 +122,10 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
})
}
pub fn restore() -> RuntimeResult<SELoaded> {
pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
let gns = GlobalNS::empty();
context::set_dmsg("loading gns");
let gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?;
let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?;
let model_drivers = ModelDrivers::empty();
for (id, model) in gns.idx_models().write().iter_mut() {
let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid();
@ -135,6 +143,20 @@ pub fn restore() -> RuntimeResult<SELoaded> {
model.model_mutator().vacuum_stashed();
}
}
// check if password has changed
if gns
.sys_db()
.verify_user(SystemDatabase::ROOT_ACCOUNT, cfg.auth.root_key.as_bytes())
== VerifyUser::IncorrectPassword
{
// the password was changed
warn!("root password changed via configuration");
context::set_dmsg("updating password to system database from configuration");
let phash = rcrypt::hash(&cfg.auth.root_key, rcrypt::DEFAULT_COST).unwrap();
gns_driver.commit_event(AlterUserTxn::new(SystemDatabase::ROOT_ACCOUNT, &phash))?;
gns.sys_db()
.__raw_alter_user(SystemDatabase::ROOT_ACCOUNT, phash.into_boxed_slice());
}
Ok(SELoaded {
gns,
gns_driver,

@ -24,8 +24,6 @@
*
*/
#![allow(dead_code)]
use {
self::raw::{CommitPreference, RawJournalAdapterEvent, RawJournalWriter},
crate::{
@ -167,6 +165,7 @@ pub type BatchDriver<BA, Fs> = RawJournalWriter<BatchAdapter<BA>, Fs>;
/// Batch journal adapter
pub struct BatchAdapter<BA: BatchAdapterSpec>(PhantomData<BA>);
#[cfg(test)]
impl<BA: BatchAdapterSpec> BatchAdapter<BA> {
/// Open a new batch journal
pub fn open<Fs: FSInterface>(

@ -122,11 +122,13 @@ pub fn obtain_trace() -> Vec<JournalTraceEvent> {
}
#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum JournalTraceEvent {
Writer(JournalWriterTraceEvent),
Reader(JournalReaderTraceEvent),
}
#[cfg(test)]
direct_from! {
JournalTraceEvent => {
JournalWriterTraceEvent as Writer,
@ -135,6 +137,7 @@ direct_from! {
}
#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum JournalReaderTraceEvent {
Initialized,
Completed,
@ -159,6 +162,7 @@ pub enum JournalReaderTraceEvent {
}
#[derive(Debug, PartialEq)]
#[cfg(test)]
pub(super) enum JournalWriterTraceEvent {
Initialized,
ReinitializeAttempt,
@ -174,7 +178,6 @@ pub(super) enum JournalWriterTraceEvent {
event_id: u64,
prev_id: u64,
},
DriverEventPresyncCompleted,
DriverEventCompleted,
DriverClosed,
}
@ -277,6 +280,7 @@ pub trait RawJournalAdapter: Sized {
#[derive(Debug, PartialEq)]
pub enum CommitPreference {
#[allow(unused)]
Buffered,
Direct,
}
@ -318,6 +322,7 @@ impl DriverEvent {
const OFFSET_6_LAST_TXN_ID: Range<usize> =
Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64);
/// Create a new driver event (checksum auto-computed)
#[cfg(test)]
fn new(
txn_id: u128,
driver_event: DriverEventKind,
@ -359,6 +364,7 @@ impl DriverEvent {
}
}
/// Encode the current driver event
#[cfg(test)]
fn encode_self(&self) -> [u8; 64] {
Self::encode(
self.txn_id,
@ -493,9 +499,6 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
}
Ok(me)
}
pub fn context(&mut self) -> J::Context<'_> {
J::enter_context(self)
}
/// Commit a new event to the journal
///
/// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,

@ -116,7 +116,7 @@ macro_rules! impl_db_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),*) => {
$(impl SimpleDBEvent for $ty {
const OPC: u8 = $code;
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(do_it(self, buf, $expr))? }
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(_do_it(self, buf, $expr))? }
})*
}
}
@ -139,11 +139,6 @@ impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
}
}
pub enum DbEventRestored {
NewKey(String),
Pop,
Clear,
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum EventMeta {
NewKey,

@ -77,7 +77,7 @@ macro_rules! impl_test_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),* $(,)?) => {
$(impl IsTestEvent for $ty {
const EVCODE: TestEvent = $code;
fn encode(self, buf: &mut Vec<u8>) { let _ = buf; fn do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(do_it(self, buf, $expr))? }
fn encode(self, buf: &mut Vec<u8>) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(_do_it(self, buf, $expr))? }
})*
}
}
@ -220,6 +220,7 @@ fn test_this_data() {
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA4);
db.clear(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
}

Loading…
Cancel
Save