Upgrade to the new storage engine (SE)

next
Sayan Nandan 7 months ago
parent 102a4b4b40
commit d6f9b54868
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

131
Cargo.lock generated

@ -37,6 +37,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.11"
@ -173,6 +188,12 @@ dependencies = [
"cipher",
]
[[package]]
name = "bumpalo"
version = "3.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130"
[[package]]
name = "byteorder"
version = "1.5.0"
@ -222,6 +243,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.0",
]
[[package]]
name = "cipher"
version = "0.4.4"
@ -584,6 +619,29 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "indexmap"
version = "2.2.2"
@ -638,6 +696,15 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -747,6 +814,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
@ -1244,6 +1320,7 @@ name = "skyd"
version = "0.8.0-beta.4"
dependencies = [
"bytes",
"chrono",
"crc",
"crossbeam-epoch",
"env_logger",
@ -1503,6 +1580,60 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.48",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838"
[[package]]
name = "winapi"
version = "0.3.9"

@ -24,6 +24,7 @@ tokio-openssl = "0.6.4"
uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
crc = "3.0.1"
serde_yaml = "0.9.31"
chrono = "0.4.34"
[target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies]
# external deps

@ -60,16 +60,21 @@ fn alter_user(
return Err(QueryError::SysAuthError);
}
let (username, password) = get_user_data(user)?;
global.sys_store().alter_user(username, password)
global
.namespace()
.sys_db()
.alter_user(global, &username, &password)
}
fn create_user(global: &impl GlobalInstanceLike, user: UserDecl) -> QueryResult<()> {
let (username, password) = get_user_data(user)?;
global.sys_store().create_new_user(username, password)
global
.namespace()
.sys_db()
.create_user(global, username.into_boxed_str(), &password)
}
fn get_user_data(mut user: UserDecl) -> Result<(String, String), QueryError> {
let username = user.username().to_owned();
fn get_user_data<'a>(mut user: UserDecl<'a>) -> Result<(String, String), QueryError> {
let password = match user.options_mut().remove(KEY_PASSWORD) {
Some(DictEntryGeneric::Data(d))
if d.kind() == TagClass::Str && user.options().is_empty() =>
@ -79,6 +84,7 @@ fn get_user_data(mut user: UserDecl) -> Result<(String, String), QueryError> {
return Err(QueryError::QExecDdlInvalidProperties);
}
};
let username = user.username().to_owned();
Ok((username, password))
}
@ -91,5 +97,8 @@ fn drop_user(
// you can't delete yourself!
return Err(QueryError::SysAuthError);
}
global.sys_store().drop_user(user_del.username())
global
.namespace()
.sys_db()
.drop_user(global, user_del.username())
}

@ -56,8 +56,8 @@ pub fn inspect(
drop(spaces_iter);
drop(spaces);
// collect users
let users = g.sys_store().system_store().auth_data().read();
let mut users_iter = users.users().iter().peekable();
let users = g.namespace().sys_db().users().read();
let mut users_iter = users.iter().peekable();
while let Some((user, _)) = users_iter.next() {
ret.push('"');
ret.push_str(&user);

@ -25,13 +25,14 @@
*/
pub(in crate::engine) mod dcl;
pub(super) mod ddl_misc;
mod ddl_misc;
pub(in crate::engine) mod dml;
pub(in crate::engine) mod exec;
pub(in crate::engine) mod index;
pub(in crate::engine) mod model;
pub(in crate::engine) mod query_meta;
pub(in crate::engine) mod space;
pub(in crate::engine) mod system_db;
// util
mod util;
// test
@ -60,6 +61,7 @@ type RWLIdx<K, V> = RwLock<IndexST<K, V>>;
pub struct GlobalNS {
idx_mdl: RWLIdx<EntityID, Model>,
idx: RWLIdx<Box<str>, Space>,
sys_db: system_db::SystemDatabase,
}
impl GlobalNS {
@ -67,6 +69,7 @@ impl GlobalNS {
Self {
idx_mdl: RWLIdx::default(),
idx: RWLIdx::default(),
sys_db: system_db::SystemDatabase::empty(),
}
}
pub fn ddl_with_all_mut<T>(
@ -137,6 +140,9 @@ impl GlobalNS {
pub fn contains_space(&self, name: &str) -> bool {
self.idx.read().contains_key(name)
}
pub fn sys_db(&self) -> &system_db::SystemDatabase {
&self.sys_db
}
}
pub(self) fn with_model_for_data_update<'a, F>(

@ -273,7 +273,7 @@ impl Model {
&new_fields,
);
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
new_fields
@ -291,7 +291,7 @@ impl Model {
&removed,
);
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
@ -306,7 +306,7 @@ impl Model {
&updated,
);
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
}
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {

@ -294,7 +294,7 @@ impl Model {
model.get_uuid(),
)?;
// commit txn
match txn_driver.gns_driver().try_commit(txn) {
match txn_driver.gns_driver().commit_event(txn) {
Ok(()) => {}
Err(e) => {
// failed to commit, request cleanup
@ -358,7 +358,7 @@ impl Model {
model.delta_state().schema_current_version().value_u64(),
));
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
// request cleanup
global.purge_model_driver(
space_name,

@ -24,6 +24,8 @@
*
*/
use crate::engine::storage::safe_interfaces::paths_v1;
use super::EntityIDRef;
use {
@ -33,7 +35,7 @@ use {
fractal::{GenericTask, GlobalInstanceLike, Task},
idx::STIndex,
ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace},
storage::{safe_interfaces::FSInterface, v1::loader::SEInitState},
storage::safe_interfaces::FSInterface,
txn::{self, SpaceIDRef},
},
std::collections::HashSet,
@ -169,12 +171,12 @@ impl Space {
// prepare txn
let txn = txn::gns::space::CreateSpaceTxn::new(space.props(), &space_name, &space);
// try to create space for...the space
G::FileSystem::fs_create_dir_all(&SEInitState::space_dir(
G::FileSystem::fs_create_dir_all(&paths_v1::space_dir(
&space_name,
space.get_uuid(),
))?;
// commit txn
match global.gns_driver().lock().gns_driver().try_commit(txn) {
match global.gns_driver().lock().gns_driver().commit_event(txn) {
Ok(()) => {}
Err(e) => {
// tell fractal to clean it up sometime
@ -221,7 +223,7 @@ impl Space {
&patch,
);
// commit
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
}
// merge
dict::rmerge_data_with_patch(space.props_mut(), patch);
@ -256,7 +258,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
@ -303,7 +305,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.gns_driver().lock().gns_driver().try_commit(txn)?;
global.gns_driver().lock().gns_driver().commit_event(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),

@ -0,0 +1,194 @@
/*
* Created on Wed Feb 21 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::RWLIdx,
crate::engine::{
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
txn::gns::sysctl::{AlterUserTxn, CreateUserTxn, DropUserTxn},
},
std::collections::hash_map::Entry,
};
/// The in-memory system database: every user account known to this instance,
/// keyed by username.
#[derive(Debug)]
pub struct SystemDatabase {
    // username -> user record (password hash); guarded by the RWLIdx read-write lock
    users: RWLIdx<Box<str>, User>,
}
#[derive(Debug, PartialEq)]
/// A single user account: stores only the hashed password
pub struct User {
    // password hash as produced by `rcrypt::hash` (never the plaintext)
    password: Box<[u8]>,
}
impl User {
    /// Wrap an already-hashed password into a user record
    pub fn new(password: Box<[u8]>) -> Self {
        Self { password }
    }
}
/// Outcome of checking a username/password pair against the system database
#[derive(Debug, PartialEq)]
pub enum VerifyUser {
    /// no such user exists
    NotFound,
    /// the user exists, but the supplied password did not match
    IncorrectPassword,
    /// credentials verified for a standard account
    Okay,
    /// credentials verified for the root account
    OkayRoot,
}
impl VerifyUser {
    /// Returns `true` only for a successful verification of the root account
    pub fn is_root(&self) -> bool {
        match self {
            Self::OkayRoot => true,
            Self::NotFound | Self::IncorrectPassword | Self::Okay => false,
        }
    }
}
impl SystemDatabase {
    /// Name of the built-in administrative account
    pub const ROOT_ACCOUNT: &'static str = "root";
    /// Create a system database with no user accounts
    pub fn empty() -> Self {
        Self {
            users: RWLIdx::default(),
        }
    }
    /// Direct handle to the underlying locked user index
    pub fn users(&self) -> &RWLIdx<Box<str>, User> {
        &self.users
    }
    /// Check the given credentials against the stored password hash
    pub fn __verify_user(&self, username: &str, password: &[u8]) -> VerifyUser {
        let users = self.users.read();
        match users.get(username) {
            None => VerifyUser::NotFound,
            Some(account) => {
                // NOTE(review): assumes rcrypt::verify only errors on a
                // malformed stored hash, hence the unwrap — confirm
                if !rcrypt::verify(password, &account.password).unwrap() {
                    VerifyUser::IncorrectPassword
                } else if username == Self::ROOT_ACCOUNT {
                    VerifyUser::OkayRoot
                } else {
                    VerifyUser::Okay
                }
            }
        }
    }
    /// Insert a new user record; returns `false` if the username is taken
    pub fn __insert_user(&self, username: Box<str>, password: Box<[u8]>) -> bool {
        if let Entry::Vacant(slot) = self.users.write().entry(username) {
            slot.insert(User::new(password));
            true
        } else {
            false
        }
    }
    /// Remove a user record; returns `false` if no such user existed
    pub fn __delete_user(&self, username: &str) -> bool {
        self.users.write().remove(username).is_some()
    }
    /// Replace a user's password hash; returns `false` if no such user exists
    pub fn __change_user_password(&self, username: &str, new_password: Box<[u8]>) -> bool {
        if let Some(account) = self.users.write().get_mut(username) {
            account.password = new_password;
            true
        } else {
            false
        }
    }
}
impl SystemDatabase {
    /// Create a new user account.
    ///
    /// The password is hashed with `rcrypt` before storage. The creation is
    /// first committed to the GNS transaction log and only applied in memory
    /// once the commit succeeds. Errors with `SysAuthError` if the username is
    /// already taken, or `SysTransactionalError` if the commit fails.
    pub fn create_user(
        &self,
        global: &impl GlobalInstanceLike,
        username: Box<str>,
        password: &str,
    ) -> QueryResult<()> {
        // hold the write lock across the txn commit so no concurrent call can
        // create or observe the same username in between
        let mut users = self.users.write();
        if users.contains_key(&username) {
            return Err(QueryError::SysAuthError);
        }
        let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
        match global
            .gns_driver()
            .lock()
            .gns_driver()
            .commit_event(CreateUserTxn::new(&username, &password_hash))
        {
            Ok(()) => {
                users.insert(username, User::new(password_hash.into_boxed_slice()));
                Ok(())
            }
            Err(e) => {
                error!("failed to create user: {e}");
                Err(QueryError::SysTransactionalError)
            }
        }
    }
    /// Change the password of an existing user.
    ///
    /// Commit-then-apply like [`Self::create_user`]: the in-memory hash is
    /// only replaced after the `AlterUserTxn` is durably committed.
    pub fn alter_user(
        &self,
        global: &impl GlobalInstanceLike,
        username: &str,
        password: &str,
    ) -> QueryResult<()> {
        match self.users.write().get_mut(username) {
            Some(user) => {
                let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
                match global
                    .gns_driver()
                    .lock()
                    .gns_driver()
                    .commit_event(AlterUserTxn::new(username, &password_hash))
                {
                    Ok(()) => {
                        user.password = password_hash.into_boxed_slice();
                        Ok(())
                    }
                    Err(e) => {
                        error!("failed to alter user: {e}");
                        Err(QueryError::SysTransactionalError)
                    }
                }
            }
            None => Err(QueryError::SysAuthError),
        }
    }
    /// Remove a user account.
    ///
    /// The root account can never be dropped; this restores, at this layer,
    /// the invariant the previous `SystemStore::drop_user` enforced, instead
    /// of relying on every caller to check. Errors with `SysAuthError` for
    /// root or an unknown user, `SysTransactionalError` on commit failure.
    pub fn drop_user(&self, global: &impl GlobalInstanceLike, username: &str) -> QueryResult<()> {
        if username == Self::ROOT_ACCOUNT {
            // you can't remove root!
            return Err(QueryError::SysAuthError);
        }
        let mut users = self.users.write();
        if !users.contains_key(username) {
            return Err(QueryError::SysAuthError);
        }
        match global
            .gns_driver()
            .lock()
            .gns_driver()
            .commit_event(DropUserTxn::new(username))
        {
            Ok(()) => {
                let _ = users.remove(username);
                Ok(())
            }
            Err(e) => {
                error!("failed to remove user: {e}");
                Err(QueryError::SysTransactionalError)
            }
        }
    }
}

@ -46,12 +46,6 @@ pub enum DictEntryGeneric {
}
impl DictEntryGeneric {
pub fn as_dict_mut(&mut self) -> Option<&mut DictGeneric> {
match self {
Self::Map(m) => Some(m),
_ => None,
}
}
pub fn into_dict(self) -> Option<DictGeneric> {
match self {
Self::Map(m) => Some(m),

@ -188,9 +188,6 @@ enumerate_err! {
/// The entire header is corrupted
HeaderDecodeCorruptedHeader = "header-corrupted",
// journal
/// While attempting to handle a basic failure (such as adding a journal entry), the recovery engine ran into an exceptional
/// situation where it failed to make a necessary repair the log
JournalWRecoveryStageOneFailCritical = "journal-recovery-failure",
/// An entry in the journal is corrupted
JournalLogEntryCorrupted = "journal-entry-corrupted",
/// The structure of the journal is corrupted
@ -202,9 +199,6 @@ enumerate_err! {
InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload",
/// the data for an internal structure was decoded but is logically invalid
InternalDecodeStructureIllegalData = "structure-decode-illegal-data",
/// when attempting to flush a data batch, the batch journal crashed and a recovery event was triggered. But even then,
/// the data batch journal could not be fixed
DataBatchRecoveryFailStageOne = "batch-recovery-failure",
/// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable
DataBatchRestoreCorruptedBatch = "batch-corrupted-batch",
/// when attempting to restore a data batch from disk, the driver encountered a corrupted entry

@ -25,58 +25,85 @@
*/
use {
super::util,
super::{util, ModelUniqueID},
crate::engine::{
error::RuntimeResult,
storage::{
safe_interfaces::FSInterface,
v1::{DataBatchPersistDriver, GNSTransactionDriverAnyFS},
},
storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver},
},
parking_lot::Mutex,
std::sync::Arc,
parking_lot::{Mutex, RwLock},
std::{collections::HashMap, sync::Arc},
};
/// GNS driver
pub struct FractalGNSDriver<Fs: FSInterface> {
#[allow(unused)]
status: util::Status,
pub(super) txn_driver: GNSTransactionDriverAnyFS<Fs>,
pub(super) txn_driver: GNSDriver<Fs>,
}
impl<Fs: FSInterface> FractalGNSDriver<Fs> {
pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<Fs>) -> Self {
pub(super) fn new(txn_driver: GNSDriver<Fs>) -> Self {
Self {
status: util::Status::new_okay(),
txn_driver: txn_driver,
}
}
pub fn gns_driver(&mut self) -> &mut GNSTransactionDriverAnyFS<Fs> {
pub fn gns_driver(&mut self) -> &mut GNSDriver<Fs> {
&mut self.txn_driver
}
}
/// Registry of per-model storage drivers, keyed by the model's unique ID
pub struct ModelDrivers<Fs: FSInterface> {
    drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>>,
}
impl<Fs: FSInterface> ModelDrivers<Fs> {
    /// An empty registry with no drivers
    pub fn empty() -> Self {
        Self {
            drivers: RwLock::new(HashMap::new()),
        }
    }
    /// Raw access to the underlying locked map
    pub fn drivers(&self) -> &RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>> {
        &self.drivers
    }
    /// Number of registered drivers
    pub fn count(&self) -> usize {
        self.drivers.read().len()
    }
    /// Register a driver for `id`; panics if a driver is already registered
    /// for that ID (that would indicate a bookkeeping bug)
    pub fn add_driver(&self, id: ModelUniqueID, batch_driver: ModelDriver<Fs>) {
        assert!(self
            .drivers
            .write()
            .insert(id, FractalModelDriver::init(batch_driver))
            .is_none());
    }
    /// Unregister the driver for `id`; panics if no such driver exists
    pub fn remove_driver(&self, id: ModelUniqueID) {
        assert!(self.drivers.write().remove(&id).is_some())
    }
    /// Consume the registry, yielding the raw driver map
    pub fn into_inner(self) -> HashMap<ModelUniqueID, FractalModelDriver<Fs>> {
        self.drivers.into_inner()
    }
}
/// Model driver
pub struct FractalModelDriver<Fs: FSInterface> {
#[allow(unused)]
hooks: Arc<FractalModelHooks>,
batch_driver: Mutex<DataBatchPersistDriver<Fs>>,
batch_driver: Mutex<ModelDriver<Fs>>,
}
impl<Fs: FSInterface> FractalModelDriver<Fs> {
/// Initialize a model driver with default settings
pub fn init(batch_driver: DataBatchPersistDriver<Fs>) -> Self {
pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver<Fs>) -> Self {
Self {
hooks: Arc::new(FractalModelHooks::new()),
batch_driver: Mutex::new(batch_driver),
}
}
/// Returns a reference to the batch persist driver
pub fn batch_driver(&self) -> &Mutex<DataBatchPersistDriver<Fs>> {
pub fn batch_driver(&self) -> &Mutex<ModelDriver<Fs>> {
&self.batch_driver
}
pub fn close(self) -> RuntimeResult<()> {
self.batch_driver.into_inner().close()
ModelDriver::close_driver(&mut self.batch_driver.into_inner())
}
}

@ -33,7 +33,7 @@ use {
EntityIDRef,
},
data::uuid::Uuid,
storage::safe_interfaces::LocalFS,
storage::safe_interfaces::{paths_v1, LocalFS, StdModelBatch},
},
util::os,
},
@ -85,17 +85,11 @@ impl GenericTask {
model_uuid: Uuid,
) -> Self {
Self::DeleteDirAll(
crate::engine::storage::v1::loader::SEInitState::model_dir(
space_name, space_uuid, model_name, model_uuid,
)
.into(),
paths_v1::model_dir(space_name, space_uuid, model_name, model_uuid).into(),
)
}
pub fn delete_space_dir(space_name: &str, space_uuid: Uuid) -> Self {
Self::DeleteDirAll(
crate::engine::storage::v1::loader::SEInitState::space_dir(space_name, space_uuid)
.into(),
)
Self::DeleteDirAll(paths_v1::space_dir(space_name, space_uuid).into())
}
}
@ -285,7 +279,7 @@ impl FractalMgr {
match task {
CriticalTask::WriteBatch(model_id, observed_size) => {
info!("fhp: {model_id} has reached cache capacity. writing to disk");
let mdl_drivers = global.get_state().get_mdl_drivers().read();
let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read();
let Some(mdl_driver) = mdl_drivers.get(&model_id) else {
// because we maximize throughput, the model driver may have been already removed but this task
// was way behind in the queue
@ -380,7 +374,7 @@ impl FractalMgr {
}
}
fn general_executor(&'static self, global: super::Global) {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
let mut observed_len = 0;
let res = global._namespace().with_model(
@ -428,7 +422,7 @@ impl FractalMgr {
fn try_write_model_data_batch(
model: &Model,
observed_size: usize,
mdl_driver: &super::FractalModelDriver<LocalFS>,
mdl_driver: &super::drivers::FractalModelDriver<LocalFS>,
) -> crate::engine::error::QueryResult<()> {
if observed_size == 0 {
// no changes, all good
@ -436,7 +430,7 @@ impl FractalMgr {
}
// try flushing the batch
let mut batch_driver = mdl_driver.batch_driver().lock();
batch_driver.write_new_batch(model, observed_size)?;
batch_driver.commit_event(StdModelBatch::new(model, observed_size))?;
Ok(())
}
}

@ -25,19 +25,17 @@
*/
use {
self::sys_store::SystemStore,
super::{
core::{dml::QueryExecMeta, model::Model, GlobalNS},
data::uuid::Uuid,
storage::{
self,
safe_interfaces::{FSInterface, LocalFS},
v1::GNSTransactionDriverAnyFS,
safe_interfaces::{paths_v1, FSInterface, LocalFS},
GNSDriver, ModelDriver,
},
},
crate::engine::error::RuntimeResult,
parking_lot::{Mutex, RwLock},
std::{collections::HashMap, fmt, mem::MaybeUninit},
parking_lot::Mutex,
std::{fmt, mem::MaybeUninit},
tokio::sync::mpsc::unbounded_channel,
};
@ -45,18 +43,15 @@ pub mod context;
mod drivers;
pub mod error;
mod mgr;
pub mod sys_store;
#[cfg(test)]
pub mod test_utils;
mod util;
pub use {
drivers::FractalModelDriver,
drivers::ModelDrivers,
mgr::{CriticalTask, GenericTask, Task, GENERAL_EXECUTOR_WINDOW},
util::FractalToken,
};
pub type ModelDrivers<Fs> = HashMap<ModelUniqueID, drivers::FractalModelDriver<Fs>>;
/*
global state init
*/
@ -75,21 +70,18 @@ pub struct GlobalStateStart {
/// Must be called iff this is the only thread calling it
pub unsafe fn load_and_enable_all(
gns: GlobalNS,
config: SystemStore<LocalFS>,
gns_driver: GNSTransactionDriverAnyFS<LocalFS>,
gns_driver: GNSDriver<LocalFS>,
model_drivers: ModelDrivers<LocalFS>,
) -> GlobalStateStart {
let model_cnt_on_boot = model_drivers.len();
let model_cnt_on_boot = model_drivers.count();
let gns_driver = drivers::FractalGNSDriver::new(gns_driver);
let mdl_driver = RwLock::new(model_drivers);
let (hp_sender, hp_recv) = unbounded_channel();
let (lp_sender, lp_recv) = unbounded_channel();
let global_state = GlobalState::new(
gns,
gns_driver,
mdl_driver,
model_drivers,
mgr::FractalMgr::new(hp_sender, lp_sender, model_cnt_on_boot),
config,
);
*Global::__gref_raw() = MaybeUninit::new(global_state);
let token = Global::new();
@ -153,8 +145,6 @@ pub trait GlobalInstanceLike {
)));
}
}
// config handle
fn sys_store(&self) -> &SystemStore<Self::FileSystem>;
}
impl GlobalInstanceLike for Global {
@ -177,10 +167,6 @@ impl GlobalInstanceLike for Global {
fn get_max_delta_size(&self) -> usize {
self._get_max_delta_size()
}
// sys
fn sys_store(&self) -> &SystemStore<Self::FileSystem> {
&self.get_state().config
}
// model
fn purge_model_driver(
&self,
@ -191,11 +177,7 @@ impl GlobalInstanceLike for Global {
skip_delete: bool,
) {
let id = ModelUniqueID::new(space_name, model_name, model_uuid);
self.get_state()
.mdl_driver
.write()
.remove(&id)
.expect("tried to remove non existent driver");
self.get_state().mdl_driver.remove_driver(id);
if !skip_delete {
self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name, space_uuid, model_name, model_uuid,
@ -210,17 +192,16 @@ impl GlobalInstanceLike for Global {
model_uuid: Uuid,
) -> RuntimeResult<()> {
// create dir
LocalFS::fs_create_dir(&storage::v1::loader::SEInitState::model_dir(
LocalFS::fs_create_dir(&paths_v1::model_dir(
space_name, space_uuid, model_name, model_uuid,
))?;
// init driver
let driver =
storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.get_state().mdl_driver.write().insert(
let driver = ModelDriver::create_model_driver(&paths_v1::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.get_state().mdl_driver.add_driver(
ModelUniqueID::new(space_name, model_name, model_uuid),
drivers::FractalModelDriver::init(driver),
driver,
);
Ok(())
}
@ -274,9 +255,9 @@ impl Global {
mdl_driver,
..
} = Self::__gref_raw().assume_init_read();
let gns_driver = gns_driver.into_inner().txn_driver.into_inner();
let mut gns_driver = gns_driver.into_inner().txn_driver;
let mdl_drivers = mdl_driver.into_inner();
gns_driver.close().unwrap();
GNSDriver::close_driver(&mut gns_driver).unwrap();
for (_, driver) in mdl_drivers {
driver.close().unwrap();
}
@ -291,28 +272,25 @@ impl Global {
struct GlobalState {
gns: GlobalNS,
gns_driver: Mutex<drivers::FractalGNSDriver<LocalFS>>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
mdl_driver: ModelDrivers<LocalFS>,
task_mgr: mgr::FractalMgr,
config: SystemStore<LocalFS>,
}
impl GlobalState {
fn new(
gns: GlobalNS,
gns_driver: drivers::FractalGNSDriver<LocalFS>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
mdl_driver: ModelDrivers<LocalFS>,
task_mgr: mgr::FractalMgr,
config: SystemStore<LocalFS>,
) -> Self {
Self {
gns,
gns_driver: Mutex::new(gns_driver),
mdl_driver,
task_mgr,
config,
}
}
pub(self) fn get_mdl_drivers(&self) -> &RwLock<ModelDrivers<LocalFS>> {
pub(self) fn get_mdl_drivers(&self) -> &ModelDrivers<LocalFS> {
&self.mdl_driver
}
pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr {

@ -1,274 +0,0 @@
/*
* Created on Sun Sep 10 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::engine::{
config::{ConfigAuth, ConfigMode},
error::{QueryError, QueryResult},
storage::safe_interfaces::FSInterface,
},
parking_lot::RwLock,
std::{
collections::{hash_map::Entry, HashMap},
marker::PhantomData,
},
};
/// Typed wrapper around the system configuration; the `Fs` parameter ties the
/// store to a filesystem implementation used when persisting changes
#[derive(Debug)]
pub struct SystemStore<Fs> {
    // the actual configuration data
    syscfg: SysConfig,
    // zero-sized marker: no `Fs` value is stored, only the type is carried
    _fs: PhantomData<Fs>,
}
impl<Fs> SystemStore<Fs> {
    /// Access the underlying system configuration
    pub fn system_store(&self) -> &SysConfig {
        &self.syscfg
    }
}
#[derive(Debug)]
/// The global system configuration
pub struct SysConfig {
    // authentication section (system.auth): all users and their password hashes
    auth_data: RwLock<SysAuth>,
    // host section (system.host): startup counter and settings version
    host_data: SysHostData,
    // the mode the server runs in
    run_mode: ConfigMode,
}
// manual impl because `RwLock` itself is not `PartialEq`: compare the data
// behind the lock instead
impl PartialEq for SysConfig {
    fn eq(&self, other: &Self) -> bool {
        self.run_mode == other.run_mode
            && self.host_data == other.host_data
            && self.auth_data.read().eq(&other.auth_data.read())
    }
}
impl SysConfig {
    /// Initialize a new system config
    pub fn new(auth_data: RwLock<SysAuth>, host_data: SysHostData, run_mode: ConfigMode) -> Self {
        Self {
            auth_data,
            host_data,
            run_mode,
        }
    }
    /// Build a full config from auth settings: hashes the configured root key
    /// with `rcrypt` and seeds the user table with only the root account
    pub fn new_full(new_auth: ConfigAuth, host_data: SysHostData, run_mode: ConfigMode) -> Self {
        Self::new(
            RwLock::new(SysAuth::new(
                into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new(
                    rcrypt::hash(new_auth.root_key.as_str(), rcrypt::DEFAULT_COST)
                        .unwrap()
                        .into_boxed_slice())),
            )),
            host_data,
            run_mode,
        )
    }
    /// Like `new_full`, but with fresh host data (counters at zero)
    pub fn new_auth(new_auth: ConfigAuth, run_mode: ConfigMode) -> Self {
        Self::new_full(new_auth, SysHostData::new(0, 0), run_mode)
    }
    #[cfg(test)]
    /// A test-mode default setting with the root password set to `password12345678`
    pub(super) fn test_default() -> Self {
        Self {
            auth_data: RwLock::new(SysAuth::new(
                into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new(
                    rcrypt::hash("password12345678", rcrypt::DEFAULT_COST)
                        .unwrap()
                        .into_boxed_slice())),
            )),
            host_data: SysHostData::new(0, 0),
            run_mode: ConfigMode::Dev,
        }
    }
    /// Returns a handle to the authentication data
    pub fn auth_data(&self) -> &RwLock<SysAuth> {
        &self.auth_data
    }
    /// Returns a reference to host data
    pub fn host_data(&self) -> &SysHostData {
        &self.host_data
    }
}
/// The host data section (system.host)
#[derive(Debug, PartialEq)]
pub struct SysHostData {
    startup_counter: u64,
    settings_version: u32,
}
impl SysHostData {
    /// Construct host data from a boot count and a settings version
    pub fn new(startup_counter: u64, settings_version: u32) -> Self {
        Self {
            startup_counter,
            settings_version,
        }
    }
    /// The number of times this instance has previously started up:
    /// `0` means this is the first boot, `1` the second, and so on
    pub fn startup_counter(&self) -> u64 {
        self.startup_counter
    }
    /// The settings revision: `0` is the initial setting from the first
    /// boot, and it stays at `0` if the settings were never changed
    pub fn settings_version(&self) -> u32 {
        self.settings_version
    }
}
impl<Fs: FSInterface> SystemStore<Fs> {
    /// Wrap a prepared [`SysConfig`] into a store
    pub fn _new(syscfg: SysConfig) -> Self {
        Self {
            syscfg,
            _fs: PhantomData,
        }
    }
    /// Attempt to persist the auth data; on failure, run the rollback closure
    /// `rb` to undo the in-memory mutation so memory and disk stay consistent.
    // NOTE(review): `sync_db` is defined elsewhere in this module — its exact
    // persistence semantics are not visible here
    fn _try_sync_or(&self, auth: &mut SysAuth, rb: impl FnOnce(&mut SysAuth)) -> QueryResult<()> {
        match self.sync_db(auth) {
            Ok(()) => Ok(()),
            Err(e) => {
                error!("failed to sync system store: {e}");
                rb(auth);
                Err(e.into())
            }
        }
    }
    /// Create a new user with the given details
    pub fn create_new_user(&self, username: String, password: String) -> QueryResult<()> {
        // TODO(@ohsayan): we want to be very careful with this
        // keep a copy of the name so the rollback closure can remove the entry
        let _username = username.clone();
        let mut auth = self.system_store().auth_data().write();
        match auth.users.entry(username.into()) {
            Entry::Vacant(ve) => {
                ve.insert(SysAuthUser::new(
                    rcrypt::hash(password, rcrypt::DEFAULT_COST)
                        .unwrap()
                        .into_boxed_slice(),
                ));
                // persist; on failure, remove the user we just inserted
                self._try_sync_or(&mut auth, |auth| {
                    auth.users.remove(_username.as_str());
                })
            }
            // duplicate username
            Entry::Occupied(_) => Err(QueryError::SysAuthError),
        }
    }
    /// Change an existing user's password
    pub fn alter_user(&self, username: String, password: String) -> QueryResult<()> {
        let mut auth = self.system_store().auth_data().write();
        match auth.users.get_mut(username.as_str()) {
            Some(user) => {
                // swap in the new hash, keeping the old one for rollback
                let last_pass_hash = core::mem::replace(
                    &mut user.key,
                    rcrypt::hash(password, rcrypt::DEFAULT_COST)
                        .unwrap()
                        .into_boxed_slice(),
                );
                // persist; on failure, restore the previous hash
                self._try_sync_or(&mut auth, |auth| {
                    auth.users.get_mut(username.as_str()).unwrap().key = last_pass_hash;
                })
            }
            None => Err(QueryError::SysAuthError),
        }
    }
    /// Remove a user (the root account can never be removed)
    pub fn drop_user(&self, username: &str) -> QueryResult<()> {
        let mut auth = self.system_store().auth_data().write();
        if username == SysAuthUser::USER_ROOT {
            // you can't remove root!
            return Err(QueryError::SysAuthError);
        }
        match auth.users.remove_entry(username) {
            // persist; on failure, re-insert the removed user
            Some((username, user)) => self._try_sync_or(&mut auth, |auth| {
                let _ = auth.users.insert(username, user);
            }),
            None => Err(QueryError::SysAuthError),
        }
    }
}
/*
auth
*/
#[derive(Debug, PartialEq)]
/// The auth data section (system.auth)
pub struct SysAuth {
    // map of username -> user record (stored password hash)
    users: HashMap<Box<str>, SysAuthUser>,
}
impl SysAuth {
    /// Build the auth section from a prebuilt user map
    pub fn new(users: HashMap<Box<str>, SysAuthUser>) -> Self {
        Self { users }
    }
    /// Authenticate `username`/`password` and report whether the
    /// authenticated account is the root account.
    ///
    /// Returns `SysAuthError` for an unknown user or a wrong password.
    pub fn verify_user_check_root<T: AsRef<[u8]> + ?Sized>(
        &self,
        username: &str,
        password: &T,
    ) -> QueryResult<bool> {
        let Some(user) = self.users.get(username) else {
            return Err(QueryError::SysAuthError);
        };
        if rcrypt::verify(password, user.key()).unwrap() {
            Ok(username == SysAuthUser::USER_ROOT)
        } else {
            Err(QueryError::SysAuthError)
        }
    }
    /// Authenticate `username`/`password`, discarding the root flag
    pub fn verify_user<T: AsRef<[u8]> + ?Sized>(
        &self,
        username: &str,
        password: &T,
    ) -> QueryResult<()> {
        self.verify_user_check_root(username, password).map(|_| ())
    }
    /// Read-only view of the user map
    pub fn users(&self) -> &HashMap<Box<str>, SysAuthUser> {
        &self.users
    }
}
#[derive(Debug, PartialEq)]
/// The auth user
pub struct SysAuthUser {
    // the stored (hashed) password for this user
    key: Box<[u8]>,
}
impl SysAuthUser {
    /// Name of the root account
    pub const USER_ROOT: &'static str = "root";
    /// Wrap a password hash into a [`SysAuthUser`]
    pub fn new(key: Box<[u8]>) -> Self {
        Self { key }
    }
    /// The stored password hash
    pub fn key(&self) -> &[u8] {
        &self.key
    }
}

@ -26,17 +26,15 @@
use {
super::{
drivers::FractalGNSDriver,
sys_store::{SysConfig, SystemStore},
CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID, Task,
drivers::FractalGNSDriver, CriticalTask, GenericTask, GlobalInstanceLike, ModelUniqueID,
Task,
},
crate::engine::{
core::GlobalNS,
data::uuid::Uuid,
storage::{
self,
safe_interfaces::{FSInterface, NullFS, VirtualFS},
v1::GNSTransactionDriverAnyFS,
safe_interfaces::{paths_v1, FSInterface, NullFS, VirtualFS},
GNSDriver, ModelDriver,
},
},
parking_lot::{Mutex, RwLock},
@ -51,16 +49,11 @@ pub struct TestGlobal<Fs: FSInterface = VirtualFS> {
#[allow(unused)]
max_delta_size: usize,
txn_driver: Mutex<FractalGNSDriver<Fs>>,
model_drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>>,
sys_cfg: SystemStore<Fs>,
model_drivers: RwLock<HashMap<ModelUniqueID, super::drivers::FractalModelDriver<Fs>>>,
}
impl<Fs: FSInterface> TestGlobal<Fs> {
fn new(
gns: GlobalNS,
max_delta_size: usize,
txn_driver: GNSTransactionDriverAnyFS<Fs>,
) -> Self {
fn new(gns: GlobalNS, max_delta_size: usize, txn_driver: GNSDriver<Fs>) -> Self {
Self {
gns,
hp_queue: RwLock::default(),
@ -68,7 +61,6 @@ impl<Fs: FSInterface> TestGlobal<Fs> {
max_delta_size,
txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)),
model_drivers: RwLock::default(),
sys_cfg: SystemStore::_new(SysConfig::test_default()),
}
}
}
@ -76,10 +68,8 @@ impl<Fs: FSInterface> TestGlobal<Fs> {
impl<Fs: FSInterface> TestGlobal<Fs> {
pub fn new_with_driver_id(log_name: &str) -> Self {
let gns = GlobalNS::empty();
let driver = storage::v1::loader::open_gns_driver(log_name, &gns)
.unwrap()
.into_inner();
Self::new(gns, 0, GNSTransactionDriverAnyFS::new(driver))
let driver = GNSDriver::open_gns_with_name(log_name, &gns).unwrap();
Self::new(gns, 0, driver)
}
}
@ -115,9 +105,6 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
fn get_max_delta_size(&self) -> usize {
100
}
fn sys_store(&self) -> &SystemStore<Fs> {
&self.sys_cfg
}
fn purge_model_driver(
&self,
space_name: &str,
@ -145,16 +132,15 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
model_uuid: Uuid,
) -> crate::engine::error::RuntimeResult<()> {
// create model dir
Fs::fs_create_dir(&storage::v1::loader::SEInitState::model_dir(
Fs::fs_create_dir(&paths_v1::model_dir(
space_name, space_uuid, model_name, model_uuid,
))?;
let driver = ModelDriver::create_model_driver(&paths_v1::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
let driver =
storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.model_drivers.write().insert(
ModelUniqueID::new(space_name, model_name, model_uuid),
FractalModelDriver::init(driver),
super::drivers::FractalModelDriver::init(driver),
);
Ok(())
}
@ -163,10 +149,6 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
impl<Fs: FSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
txn_driver
.gns_driver()
.__journal_mut()
.__close_mut()
.unwrap();
GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap()
}
}

@ -44,17 +44,12 @@ mod tests;
// re-export
pub use error::RuntimeResult;
use crate::engine::storage::SELoaded;
use {
self::{
config::{ConfigEndpoint, ConfigEndpointTls, ConfigMode, ConfigReturn, Configuration},
fractal::{
context::{self, Subsystem},
sys_store::SystemStore,
},
storage::{
safe_interfaces::LocalFS,
v1::loader::{self, SEInitState},
},
fractal::context::{self, Subsystem},
},
crate::util::os::TerminationSignal,
std::process::exit,
@ -82,29 +77,17 @@ pub fn load_all() -> RuntimeResult<(Configuration, fractal::GlobalStateStart)> {
if config.mode == ConfigMode::Dev {
warn!("running in dev mode");
}
// restore system database
info!("loading system database ...");
context::set_dmsg("loading system database");
let (store, state) = SystemStore::<LocalFS>::open_or_restore(config.auth.clone(), config.mode)?;
let sysdb_is_new = state.is_created();
if state.is_existing_updated_root() {
warn!("the root account was updated");
}
// now load all data
if sysdb_is_new {
info!("initializing storage engine ...");
} else {
info!("reinitializing storage engine...");
}
context::set_dmsg("restoring data");
let SEInitState {
txn_driver,
model_drivers,
info!("starting storage engine");
context::set_origin(Subsystem::Storage);
let SELoaded {
gns,
} = loader::SEInitState::try_init(sysdb_is_new)?;
gns_driver,
model_drivers,
} = storage::load(&config)?;
info!("storage engine ready. initializing system");
let global = unsafe {
// UNSAFE(@ohsayan): this is the only entrypoint
fractal::load_and_enable_all(gns, store, txn_driver, model_drivers)
// UNSAFE(@ohsayan): the only call we ever make
fractal::load_and_enable_all(gns, gns_driver, model_drivers)
};
Ok((config, global))
}

@ -47,6 +47,8 @@ mod tests;
// re-export
pub use exchange::SQuery;
use crate::engine::core::system_db::VerifyUser;
use {
self::{
exchange::{QExchangeResult, QExchangeState},
@ -289,20 +291,22 @@ async fn do_handshake<S: Socket>(
}
match core::str::from_utf8(handshake.hs_auth().username()) {
Ok(uname) => {
let auth = global.sys_store().system_store().auth_data().read();
let r = auth.verify_user_check_root(uname, handshake.hs_auth().password());
match r {
Ok(is_root) => {
match global
.namespace()
.sys_db()
.__verify_user(uname, handshake.hs_auth().password())
{
okay @ (VerifyUser::Okay | VerifyUser::OkayRoot) => {
let hs = handshake.hs_static();
let ret = Ok(PostHandshake::Okay(ClientLocalState::new(
uname.into(),
is_root,
okay.is_root(),
hs,
)));
buf.advance(cursor);
return ret;
}
Err(_) => {}
VerifyUser::IncorrectPassword | VerifyUser::NotFound => {}
}
}
Err(_) => {}

@ -40,31 +40,6 @@ pub enum FileOpen<CF, EF = CF> {
Existing(EF),
}
#[cfg(test)]
impl<CF, EF> FileOpen<CF, EF> {
pub fn into_existing(self) -> Option<EF> {
match self {
Self::Existing(e) => Some(e),
Self::Created(_) => None,
}
}
pub fn into_created(self) -> Option<CF> {
match self {
Self::Existing(_) => None,
Self::Created(c) => Some(c),
}
}
}
#[cfg(test)]
impl<CF> FileOpen<CF> {
pub fn into_inner(self) -> CF {
match self {
Self::Created(f) | Self::Existing(f) => f,
}
}
}
pub trait FSInterface {
// settings
/// set to false if the file system is a special device like `/dev/null`

@ -29,3 +29,29 @@ pub mod interface;
pub mod sdss;
pub mod static_meta;
pub mod versions;
pub mod paths_v1 {
    //! Canonical on-disk path layout helpers for the data directory
    use crate::engine::data::uuid::Uuid;
    /// Directory holding all data for a space: `data/<space>-<uuid>`
    pub fn space_dir(space_name: &str, space_uuid: Uuid) -> String {
        format!("data/{space_name}-{space_uuid}")
    }
    /// Directory holding a model's data:
    /// `data/<space>-<uuid>/mdl_<model>-<uuid>`
    pub fn model_dir(
        space_name: &str,
        space_uuid: Uuid,
        model_name: &str,
        model_uuid: Uuid,
    ) -> String {
        format!("data/{space_name}-{space_uuid}/mdl_{model_name}-{model_uuid}")
    }
    /// Path to a model's batch journal file: `<model dir>/data.db-btlog`
    pub fn model_path(
        space_name: &str,
        space_uuid: Uuid,
        model_name: &str,
        model_uuid: Uuid,
    ) -> String {
        let dir = model_dir(space_name, space_uuid, model_name, model_uuid);
        format!("{dir}/data.db-btlog")
    }
}

@ -231,6 +231,9 @@ impl<'a, F: FileInterfaceRead, S: FileSpecV1> TrackedReaderContext<'a, F, S> {
let Self { tr, p_checksum } = self;
(p_checksum.finish(), tr)
}
/// Number of bytes left to read; delegates to the underlying tracked reader
pub fn remaining(&self) -> u64 {
    self.tr.remaining()
}
}
impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {

@ -25,3 +25,4 @@
*/
pub mod r1;
pub mod r2;

@ -213,7 +213,9 @@ pub trait MapStorageSpec {
// enc
pub mod enc {
use super::{map, MapStorageSpec, PersistObject, VecU8};
#[cfg(test)]
use super::{map, MapStorageSpec};
use super::{PersistObject, VecU8};
// obj
#[cfg(test)]
pub fn full<Obj: PersistObject>(obj: Obj::InputType) -> Vec<u8> {
@ -229,11 +231,13 @@ pub mod enc {
full::<Obj>(obj)
}
// dict
#[cfg(test)]
pub fn full_dict<PM: MapStorageSpec>(dict: &PM::InMemoryMap) -> Vec<u8> {
let mut v = vec![];
full_dict_into_buffer::<PM>(&mut v, dict);
v
}
#[cfg(test)]
pub fn full_dict_into_buffer<PM: MapStorageSpec>(buf: &mut VecU8, dict: &PM::InMemoryMap) {
<map::PersistMapImpl<PM> as PersistObject>::default_full_enc(buf, dict)
}

@ -0,0 +1,237 @@
/*
* Created on Wed Feb 21 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
gns txn impls
*/
use {
super::r1::{dec, impls::gns::GNSEvent, PersistObject},
crate::{
engine::{
core::GlobalNS,
error::{StorageError, TransactionError},
mem::BufferedScanner,
txn::gns::sysctl::{AlterUserTxn, CreateUserTxn, DropUserTxn},
RuntimeResult,
},
util::EndianQW,
},
};
/*
create user txn
*/
impl<'a> GNSEvent for CreateUserTxn<'a> {
    type CommitType = Self;
    type RestoreType = FullUserDefinition;
    /// Replay a decoded create-user event against the in-memory global state.
    /// Fails if the user already exists (a restore-time data conflict).
    fn update_global_state(
        FullUserDefinition { username, password }: Self::RestoreType,
        gns: &GlobalNS,
    ) -> RuntimeResult<()> {
        if gns.sys_db().__insert_user(username, password) {
            Ok(())
        } else {
            Err(TransactionError::OnRestoreDataConflictAlreadyExists.into())
        }
    }
}
/// A fully decoded user definition, restored from a create/alter-user txn
pub struct FullUserDefinition {
    // the username
    username: Box<str>,
    // the password hash (encoded via `password_hash()`, never plaintext)
    password: Box<[u8]>,
}
impl FullUserDefinition {
fn new(username: Box<str>, password: Box<[u8]>) -> Self {
Self { username, password }
}
}
/// Decoded metadata section of a create/alter-user txn: the byte lengths of
/// the three payload segments
pub struct CreateUserMetadata {
    // username length, in bytes
    uname_l: u64,
    // password hash length, in bytes
    pwd_l: u64,
    // properties segment length (always encoded as 0 for now)
    props_l: u64,
}
impl CreateUserMetadata {
pub fn new(uname_l: u64, pwd_l: u64, props_l: u64) -> Self {
Self {
uname_l,
pwd_l,
props_l,
}
}
}
impl<'a> PersistObject for CreateUserTxn<'a> {
    // metadata is three u64 lengths: username, password, properties
    const METADATA_SIZE: usize = sizeof!(u64, 3);
    type InputType = Self;
    type OutputType = FullUserDefinition;
    type Metadata = CreateUserMetadata;
    /// Check that the scanner still holds enough bytes for the object payload.
    ///
    /// Uses a saturating add: the length fields come straight from disk, so a
    /// corrupted record could carry two huge values whose unchecked sum wraps
    /// around `u64` (release builds wrap silently), letting the record pass
    /// this pretest and later trigger an out-of-bounds read in `obj_dec`.
    fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
        scanner.has_left(md.uname_l.saturating_add(md.pwd_l) as usize)
    }
    fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username length: 8B][password length: 8B][properties length: 8B]
        buf.extend(data.username().len().u64_bytes_le());
        buf.extend(data.password_hash().len().u64_bytes_le());
        // properties are not supported yet, so their length is always zero
        buf.extend(0u64.u64_bytes_le());
    }
    unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
        // read the three lengths in encoding order
        let uname_l = scanner.next_u64_le();
        let pwd_l = scanner.next_u64_le();
        let props_l = scanner.next_u64_le();
        Ok(CreateUserMetadata::new(uname_l, pwd_l, props_l))
    }
    fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username bytes][password hash bytes]
        buf.extend(data.username().as_bytes());
        buf.extend(data.password_hash());
    }
    unsafe fn obj_dec(
        s: &mut BufferedScanner,
        md: Self::Metadata,
    ) -> RuntimeResult<Self::OutputType> {
        let username = dec::utils::decode_string(s, md.uname_l as _)?;
        let password = s.next_chunk_variable(md.pwd_l as _);
        // a non-zero properties length has no defined layout (yet) and is
        // treated as structural corruption
        if md.props_l == 0 {
            Ok(FullUserDefinition::new(
                username.into_boxed_str(),
                password.to_vec().into_boxed_slice(),
            ))
        } else {
            Err(StorageError::InternalDecodeStructureIllegalData.into())
        }
    }
}
/*
alter user txn
*/
impl<'a> GNSEvent for AlterUserTxn<'a> {
    type CommitType = Self;
    type RestoreType = FullUserDefinition;
    /// Replay a decoded alter-user event: swap in the new password hash.
    /// Fails if the user does not exist (a restore-time state mismatch).
    fn update_global_state(
        FullUserDefinition { username, password }: Self::RestoreType,
        gns: &GlobalNS,
    ) -> RuntimeResult<()> {
        if gns.sys_db().__change_user_password(&username, password) {
            Ok(())
        } else {
            Err(TransactionError::OnRestoreDataConflictMismatch.into())
        }
    }
}
impl<'a> PersistObject for AlterUserTxn<'a> {
    // metadata is three u64 lengths: username, password, properties
    // (same wire layout as the create-user txn)
    const METADATA_SIZE: usize = sizeof!(u64, 3);
    type InputType = Self;
    type OutputType = FullUserDefinition;
    type Metadata = CreateUserMetadata;
    /// Check that the scanner still holds enough bytes for the object payload.
    ///
    /// Uses a saturating add: the length fields come straight from disk, so a
    /// corrupted record could carry two huge values whose unchecked sum wraps
    /// around `u64` (release builds wrap silently), letting the record pass
    /// this pretest and later trigger an out-of-bounds read in `obj_dec`.
    fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
        scanner.has_left(md.uname_l.saturating_add(md.pwd_l) as usize)
    }
    fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username length: 8B][password length: 8B][properties length: 8B]
        buf.extend(data.username().len().u64_bytes_le());
        buf.extend(data.password_hash().len().u64_bytes_le());
        // properties are not supported yet, so their length is always zero
        buf.extend(0u64.u64_bytes_le());
    }
    unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
        // read the three lengths in encoding order
        let uname_l = scanner.next_u64_le();
        let pwd_l = scanner.next_u64_le();
        let props_l = scanner.next_u64_le();
        Ok(CreateUserMetadata::new(uname_l, pwd_l, props_l))
    }
    fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username bytes][password hash bytes]
        buf.extend(data.username().as_bytes());
        buf.extend(data.password_hash());
    }
    unsafe fn obj_dec(
        s: &mut BufferedScanner,
        md: Self::Metadata,
    ) -> RuntimeResult<Self::OutputType> {
        let username = dec::utils::decode_string(s, md.uname_l as _)?;
        let password = s.next_chunk_variable(md.pwd_l as _);
        // a non-zero properties length has no defined layout (yet) and is
        // treated as structural corruption
        if md.props_l == 0 {
            Ok(FullUserDefinition::new(
                username.into_boxed_str(),
                password.to_vec().into_boxed_slice(),
            ))
        } else {
            Err(StorageError::InternalDecodeStructureIllegalData.into())
        }
    }
}
/*
drop user txn
*/
/// Restore payload of a drop-user txn: just the username
pub struct DropUserPayload(Box<str>);
impl<'a> GNSEvent for DropUserTxn<'a> {
    type CommitType = Self;
    type RestoreType = DropUserPayload;
    /// Replay a decoded drop-user event against the in-memory global state.
    /// Fails if the user does not exist (a restore-time state mismatch).
    fn update_global_state(
        DropUserPayload(username): Self::RestoreType,
        gns: &GlobalNS,
    ) -> RuntimeResult<()> {
        if gns.sys_db().__delete_user(&username) {
            Ok(())
        } else {
            Err(TransactionError::OnRestoreDataConflictMismatch.into())
        }
    }
}
impl<'a> PersistObject for DropUserTxn<'a> {
    // metadata is a single u64: the username length in bytes
    const METADATA_SIZE: usize = sizeof!(u64);
    type InputType = Self;
    type OutputType = DropUserPayload;
    type Metadata = u64;
    /// Ensure the scanner still holds at least `md` (username length) bytes
    fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
        scanner.has_left(*md as usize)
    }
    fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username length: 8B]
        buf.extend(data.username().len().u64_bytes_le())
    }
    unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
        Ok(scanner.next_u64_le())
    }
    fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
        // [username bytes]
        buf.extend(data.username().as_bytes());
    }
    unsafe fn obj_dec(
        s: &mut BufferedScanner,
        md: Self::Metadata,
    ) -> RuntimeResult<Self::OutputType> {
        let username = dec::utils::decode_string(s, md as usize)?;
        Ok(DropUserPayload(username.into_boxed_str()))
    }
}

@ -26,6 +26,15 @@
//! Implementations of the Skytable Disk Storage Subsystem (SDSS)
use {
self::safe_interfaces::LocalFS,
super::{
config::Configuration, core::GlobalNS, fractal::context, fractal::ModelDrivers,
RuntimeResult,
},
std::path::Path,
};
mod common;
mod common_encoding;
// driver versions
@ -35,5 +44,48 @@ pub mod v2;
pub mod safe_interfaces {
#[cfg(test)]
pub use super::common::interface::fs_test::{NullFS, VirtualFS};
pub use super::common::interface::{fs_imp::LocalFS, fs_traits::FSInterface};
pub use super::{
common::{
interface::{fs_imp::LocalFS, fs_traits::FSInterface},
paths_v1,
},
v2::impls::mdl_journal::StdModelBatch,
};
}
/*
loader impl
*/
pub use v2::impls::{gns_log::GNSDriver, mdl_journal::ModelDriver};
/// Everything the storage engine hands back after a successful load: the
/// in-memory namespace plus the open drivers that persist it
pub struct SELoaded {
    pub gns: GlobalNS,
    pub gns_driver: v2::impls::gns_log::GNSDriver<LocalFS>,
    pub model_drivers: ModelDrivers<LocalFS>,
}
/// Load (or initialize) the storage engine, handling the three possible
/// on-disk states:
/// 1. a v1 (old format) install — load it and migrate to the v2 format
/// 2. a fresh install — create new v2 databases
/// 3. an existing v2 install — restore it
pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> {
    info!("loading databases");
    // first determine if this is a new install, an existing install or if it uses the old driver
    // (the presence of the v1 system database file marks an old install)
    if Path::new(v1::SYSDB_PATH).is_file() {
        warn!("older storage format detected");
        // this is an old install
        info!("loading data");
        context::set_dmsg("loading storage-v1 in compatibility mode");
        let gns = v1::load_gns_prepare_migration()?;
        info!("loaded data. now upgrading to new storage format");
        context::set_dmsg("upgrading storage-v1 to storage-v2 format");
        return v2::recreate(gns);
    }
    // no v1 files: decide between fresh init and restore by the v2 GNS file
    if !Path::new(v2::GNS_PATH).is_file() {
        info!("initializing databases");
        context::set_dmsg("creating databases");
        // this is a new install
        v2::initialize_new(cfg)
    } else {
        info!("reinitializing databases");
        context::set_dmsg("loading databases");
        v2::restore()
    }
}

@ -24,132 +24,66 @@
*
*/
#[cfg(test)]
use crate::engine::storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::raw::journal::raw::JournalWriter,
};
use std::collections::HashMap;
use crate::engine::{
core::{EntityIDRef, GlobalNS},
data::uuid::Uuid,
error::RuntimeResult,
fractal::{error::ErrorContext, FractalModelDriver, ModelDrivers, ModelUniqueID},
fractal::{error::ErrorContext, ModelUniqueID},
storage::{
common::interface::fs_imp::LocalFS,
common::{interface::fs_imp::LocalFS, paths_v1},
v1::raw::{
batch_jrnl,
journal::{raw as raw_journal, GNSAdapter, GNSTransactionDriverAnyFS},
journal::{raw as raw_journal, GNSAdapter},
spec,
},
},
};
const GNS_FILE_PATH: &str = "gns.db-tlog";
const DATA_DIR: &str = "data";
pub struct SEInitState {
pub txn_driver: GNSTransactionDriverAnyFS<LocalFS>,
pub model_drivers: ModelDrivers<LocalFS>,
pub gns: GlobalNS,
}
impl SEInitState {
pub fn new(
txn_driver: GNSTransactionDriverAnyFS<LocalFS>,
model_drivers: ModelDrivers<LocalFS>,
gns: GlobalNS,
) -> Self {
Self {
txn_driver,
model_drivers,
gns,
}
}
pub fn try_init(is_new: bool) -> RuntimeResult<Self> {
let gns = GlobalNS::empty();
let gns_txn_driver = if is_new {
raw_journal::create_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
GNS_FILE_PATH,
)
} else {
raw_journal::load_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
GNS_FILE_PATH,
&gns,
)
}?;
let mut model_drivers = ModelDrivers::new();
let mut driver_guard = || {
if is_new {
std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?;
}
if !is_new {
let mut models = gns.idx_models().write();
// this is an existing instance, so read in all data
for (space_name, space) in gns.idx().read().iter() {
let space_uuid = space.get_uuid();
for model_name in space.models().iter() {
let model = models
.get_mut(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
let path =
Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
model.model_mutator().vacuum_stashed();
}
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),
);
}
pub fn load_gns() -> RuntimeResult<GlobalNS> {
let gns = GlobalNS::empty();
let gns_txn_driver = raw_journal::load_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
super::GNS_PATH,
&gns,
)?;
let mut model_drivers = HashMap::new();
let mut driver_guard = || {
let mut models = gns.idx_models().write();
// this is an existing instance, so read in all data
for (space_name, space) in gns.idx().read().iter() {
let space_uuid = space.get_uuid();
for model_name in space.models().iter() {
let model = models
.get_mut(&EntityIDRef::new(&space_name, &model_name))
.unwrap();
let path =
paths_v1::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit::<LocalFS>(&path, model).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
model.model_mutator().vacuum_stashed();
}
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
persist_driver,
);
}
RuntimeResult::Ok(())
};
if let Err(e) = driver_guard() {
gns_txn_driver.close().unwrap();
for (_, driver) in model_drivers {
driver.close().unwrap();
}
return Err(e);
}
Ok(SEInitState::new(
GNSTransactionDriverAnyFS::new(gns_txn_driver),
model_drivers,
gns,
))
}
pub fn model_path(
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> String {
format!(
"{}/data.db-btlog",
Self::model_dir(space_name, space_uuid, model_name, model_uuid)
)
}
pub fn model_dir(
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> String {
format!("data/{space_name}-{space_uuid}/mdl_{model_name}-{model_uuid}")
RuntimeResult::Ok(())
};
if let Err(e) = driver_guard() {
gns_txn_driver.close().unwrap();
for (_, driver) in model_drivers {
driver.close().unwrap();
}
return Err(e);
}
pub fn space_dir(space_name: &str, space_uuid: Uuid) -> String {
format!("data/{space_name}-{space_uuid}")
// close all drivers
gns_txn_driver.close().unwrap();
for (_, driver) in model_drivers {
driver.close().unwrap();
}
}
#[cfg(test)]
pub fn open_gns_driver<Fs: FSInterface>(
path: &str,
gns: &GlobalNS,
) -> RuntimeResult<FileOpen<JournalWriter<Fs, GNSAdapter>>> {
raw_journal::open_or_create_journal::<GNSAdapter, Fs, spec::GNSTransactionLogV1>(path, gns)
Ok(gns)
}

@ -28,10 +28,42 @@
//!
//! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3`
pub mod loader;
mod loader;
pub mod raw;
pub use self::{
raw::batch_jrnl::create as create_batch_journal, raw::batch_jrnl::DataBatchPersistDriver,
raw::journal::GNSTransactionDriverAnyFS,
use {
self::raw::sysdb::RestoredSystemDatabase,
super::common::interface::{fs_imp::LocalFS, fs_traits::FSInterface},
crate::{
engine::{core::GlobalNS, RuntimeResult},
util,
},
};
pub const GNS_PATH: &str = "gns.db-tlog";
pub const SYSDB_PATH: &str = "sys.db";
pub const DATA_DIR: &str = "data";
/// Load a complete v1 (old format) instance in preparation for migration to
/// the v2 format:
/// 1. replay the GNS transaction log and the system database into memory
/// 2. move every v1 file (data dir, GNS log, sysdb) into a timestamped
///    `backups/` directory so the v2 files can be written fresh without
///    clobbering the originals
pub fn load_gns_prepare_migration() -> RuntimeResult<GlobalNS> {
    // load gns
    let gns = loader::load_gns()?;
    // load sysdb
    let RestoredSystemDatabase { users, .. } =
        raw::sysdb::RestoredSystemDatabase::restore::<LocalFS>(SYSDB_PATH)?;
    // carry the v1 user accounts over into the in-memory system db
    for (user, phash) in users {
        gns.sys_db().__insert_user(user, phash);
    }
    // now move all our files into a backup directory
    let backup_dir_path = format!(
        "backups/{}",
        util::time_now_with_postfix("before_upgrade_to_v2")
    );
    // move data folder
    LocalFS::fs_create_dir_all(&backup_dir_path)?;
    util::os::move_files_recursively("data", &format!("{backup_dir_path}/data"))?;
    // move GNS
    LocalFS::fs_rename(GNS_PATH, &format!("{backup_dir_path}/{GNS_PATH}"))?;
    // move sysdb
    LocalFS::fs_rename(SYSDB_PATH, &format!("{backup_dir_path}/{SYSDB_PATH}"))?;
    Ok(gns)
}

@ -38,8 +38,6 @@ const MARKER_ACTUAL_BATCH_EVENT: u8 = 0xFE;
/// recovery batch event marker
const MARKER_RECOVERY_EVENT: u8 = 0xFF;
#[cfg(test)]
pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
use {
@ -61,9 +59,3 @@ pub fn reinit<Fs: FSInterface>(
restore_driver.read_data_batch_into_model(model)?;
DataBatchPersistDriver::new(restore_driver.into_file()?, false)
}
/// Create a new batch journal
pub fn create<Fs: FSInterface>(path: &str) -> RuntimeResult<DataBatchPersistDriver<Fs>> {
let f = SDSSFileIO::<Fs>::create::<spec::DataBatchJournalV1>(path)?;
DataBatchPersistDriver::new(f, true)
}

@ -25,34 +25,14 @@
*/
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
MARKER_RECOVERY_EVENT,
},
crate::{
engine::{
core::{
index::{PrimaryIndexKey, RowData},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Model,
},
},
data::{
cell::Datacell,
tag::{DataTag, TagUnique},
},
error::{RuntimeResult, StorageError},
idx::STIndexSeq,
storage::{
common::interface::fs_traits::FSInterface,
common_encoding::r1,
v1::raw::rw::{SDSSFileIO, TrackedWriter},
},
super::{MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN},
crate::engine::{
error::{RuntimeResult, StorageError},
storage::{
common::interface::fs_traits::FSInterface,
v1::raw::rw::{SDSSFileIO, TrackedWriter},
},
util::EndianQW,
},
crossbeam_epoch::pin,
};
pub struct DataBatchPersistDriver<Fs: FSInterface> {
@ -76,176 +56,4 @@ impl<Fs: FSInterface> DataBatchPersistDriver<Fs> {
return Err(StorageError::DataBatchCloseError.into());
}
}
pub fn write_new_batch(&mut self, model: &Model, observed_len: usize) -> RuntimeResult<()> {
// pin model
let schema_version = model.delta_state().schema_current_version();
let g = pin();
// init restore list
let mut restore_list = Vec::new();
// prepare computations
let mut i = 0;
let mut inconsistent_reads = 0;
let mut exec = || -> RuntimeResult<()> {
// write batch start
self.write_batch_start(
observed_len,
schema_version,
model.p_tag().tag_unique(),
model.fields().len() - 1,
)?;
while i < observed_len {
let delta = model.delta_state().__data_delta_dequeue(&g).unwrap();
restore_list.push(delta.clone()); // TODO(@ohsayan): avoid this
match delta.change() {
DataDeltaKind::Delete => {
self.write_batch_item_common_row_data(&delta)?;
self.encode_pk_only(delta.row().d_key())?;
}
DataDeltaKind::Insert | DataDeltaKind::Update => {
// resolve deltas (this is yet another opportunity for us to reclaim memory from deleted items)
let row_data = delta
.row()
.resolve_schema_deltas_and_freeze_if(&model.delta_state(), |row| {
row.get_txn_revised() <= delta.data_version()
});
if row_data.get_txn_revised() > delta.data_version() {
// we made an inconsistent (stale) read; someone updated the state after our snapshot
inconsistent_reads += 1;
i += 1;
continue;
}
self.write_batch_item_common_row_data(&delta)?;
// encode data
self.encode_pk_only(delta.row().d_key())?;
self.encode_row_data(model, &row_data)?;
}
}
i += 1;
}
return self.append_batch_summary_and_sync(observed_len, inconsistent_reads);
};
match exec() {
Ok(()) => Ok(()),
Err(e) => {
// republish changes since we failed to commit
restore_list.into_iter().for_each(|delta| {
model.delta_state().append_new_data_delta(delta, &g);
});
// now attempt to fix the file
self.attempt_fix_data_batchfile()?;
// IMPORTANT: return an error because even though we recovered the journal we still didn't succeed in
// writing the batch
return Err(e);
}
}
}
/// Write the batch start block:
/// - Batch start magic
/// - Primary key type
/// - Expected commit
/// - Schema version
/// - Column count
fn write_batch_start(
&mut self,
observed_len: usize,
schema_version: DeltaVersion,
pk_tag: TagUnique,
col_cnt: usize,
) -> RuntimeResult<()> {
self.f
.tracked_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
let observed_len_bytes = observed_len.u64_bytes_le();
self.f.tracked_write(&observed_len_bytes)?;
self.f
.tracked_write(&schema_version.value_u64().to_le_bytes())?;
self.f.tracked_write(&col_cnt.u64_bytes_le())?;
Ok(())
}
/// Append a summary of this batch and most importantly, **sync everything to disk**
fn append_batch_summary_and_sync(
&mut self,
observed_len: usize,
inconsistent_reads: usize,
) -> RuntimeResult<()> {
// [0xFD][actual_commit][checksum]
self.f.tracked_write(&[MARKER_END_OF_BATCH])?;
let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
self.f.tracked_write(&actual_commit)?;
let cs = self.f.reset_and_finish_checksum().to_le_bytes();
self.f.untracked_write(&cs)?;
// IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache
self.f.sync_writes()?;
Ok(())
}
/// Attempt to fix the batch journal
// TODO(@ohsayan): declare an "international system disaster" when this happens
fn attempt_fix_data_batchfile(&mut self) -> RuntimeResult<()> {
/*
attempt to append 0xFF to the part of the file where a corruption likely occurred, marking
it recoverable
*/
if self.f.untracked_write(&[MARKER_RECOVERY_EVENT]).is_ok() {
return Ok(());
}
Err(StorageError::DataBatchRecoveryFailStageOne.into())
}
}
impl<Fs: FSInterface> DataBatchPersistDriver<Fs> {
/// encode the primary key only. this means NO TAG is encoded.
fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> RuntimeResult<()> {
let buf = &mut self.f;
match pk.tag() {
TagUnique::UnsignedInt | TagUnique::SignedInt => {
let data = unsafe {
// UNSAFE(@ohsayan): +tagck
pk.read_uint()
}
.to_le_bytes();
buf.tracked_write(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
let slice = unsafe {
// UNSAFE(@ohsayan): +tagck
pk.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.tracked_write(&slice_l)?;
buf.tracked_write(slice)?;
}
TagUnique::Illegal => unsafe {
// UNSAFE(@ohsayan): a pk can't be constructed with illegal
impossible!()
},
}
Ok(())
}
/// Encode a single cell
fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> {
let mut buf = vec![];
r1::obj::cell::encode(&mut buf, value);
self.f.tracked_write(&buf)?;
Ok(())
}
/// Encode row data
fn encode_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> {
for field_name in model.fields().stseq_ord_key() {
match row_data.fields().get(field_name) {
Some(cell) => {
self.encode_cell(cell)?;
}
None if field_name.as_str() == model.p_key() => {}
None => self.f.tracked_write(&[0])?,
}
}
Ok(())
}
/// Write the change type and txnid
fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
let change_type = [delta.change().value_u8()];
self.f.tracked_write(&change_type)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
self.f.tracked_write(&txn_id)?;
Ok(())
}
}

@ -127,17 +127,6 @@ impl<F: FSInterface> DataBatchRestoreDriver<F> {
Self::apply_batch(model, batch)
})
}
#[cfg(test)]
pub(in crate::engine::storage::v1) fn read_all_batches(
&mut self,
) -> RuntimeResult<Vec<NormalBatch>> {
let mut all_batches = vec![];
self.read_all_batches_and_for_each(|batch| {
all_batches.push(batch);
Ok(())
})?;
Ok(all_batches)
}
}
impl<F: FSInterface> DataBatchRestoreDriver<F> {

@ -25,49 +25,15 @@
*/
use {
self::raw::{JournalAdapter, JournalWriter},
self::raw::JournalAdapter,
crate::engine::{
core::GlobalNS,
error::TransactionError,
mem::BufferedScanner,
storage::{
common_encoding::r1::impls::gns::GNSEvent,
safe_interfaces::{FSInterface, LocalFS},
},
txn::gns,
RuntimeResult,
core::GlobalNS, error::TransactionError, mem::BufferedScanner,
storage::common_encoding::r1::impls::gns::GNSEvent, txn::gns, RuntimeResult,
},
};
pub mod raw;
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend((GE::CODE as u16).to_le_bytes());
GE::encode_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
}
}
/*
journal implementor
*/

@ -41,9 +41,6 @@
- FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well
*/
#[cfg(test)]
use crate::engine::storage::common::interface::fs_traits::FileOpen;
use {
super::super::{rw::SDSSFileIO, spec::Header},
crate::{
@ -58,35 +55,6 @@ use {
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
#[cfg(test)]
/// Test helper: opens the journal at `log_file_name`, creating it if absent.
///
/// A freshly created file starts an empty journal (txn id 0); an existing
/// file is scrolled (replayed against `gs`) to recover the last txn id
/// before the writer resumes appending.
pub fn open_or_create_journal<
    TA: JournalAdapter,
    Fs: FSInterface,
    F: sdss::sdss_r1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
>(
    log_file_name: &str,
    gs: &TA::GlobalState,
) -> RuntimeResult<FileOpen<JournalWriter<Fs, TA>>> {
    match SDSSFileIO::<Fs>::open_or_create_perm_rw::<F>(log_file_name)? {
        FileOpen::Created(new_file) => {
            Ok(FileOpen::Created(JournalWriter::new(new_file, 0, true)?))
        }
        FileOpen::Existing((existing, _header)) => {
            let (file, last_txn) = JournalReader::<TA, Fs>::scroll(existing, gs)?;
            Ok(FileOpen::Existing(JournalWriter::new(file, last_txn, false)?))
        }
    }
}
/// Creates a brand-new journal file at `log_file_name` and returns a writer
/// positioned at transaction id 0.
pub fn create_journal<
    TA: JournalAdapter,
    Fs: FSInterface,
    F: sdss::sdss_r1::FileSpecV1<EncodeArgs = ()>,
>(
    log_file_name: &str,
) -> RuntimeResult<JournalWriter<Fs, TA>> {
    let file = SDSSFileIO::create::<F>(log_file_name)?;
    JournalWriter::new(file, 0, true)
}
pub fn load_journal<
TA: JournalAdapter,
Fs: FSInterface,
@ -395,46 +363,9 @@ impl<Fs: FSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
}
Ok(slf)
}
/// Appends one event to the journal: entry metadata first, then the encoded
/// payload, followed by an fsync so the record is durable before returning.
pub fn append_event(&mut self, event: TA::JournalEvent) -> RuntimeResult<()> {
    let payload = TA::encode(event);
    // header: monotonically increasing id, standard-server source marker,
    // payload CRC and payload length
    let header = JournalEntryMetadata::new(
        self._incr_id() as u128,
        EventSourceMarker::SERVER_STD,
        CRC.checksum(&payload),
        payload.len() as u64,
    )
    .encoded();
    self.log_file.write_buffer(&header)?;
    self.log_file.write_buffer(&payload)?;
    // durability barrier
    self.log_file.fsync_all()?;
    Ok(())
}
/// Appends an event, and on failure writes a recovery "reverse" entry so a
/// later reader skips the possibly-partial record. The original append error
/// is surfaced either way.
pub fn append_event_with_recovery_plugin(
    &mut self,
    event: TA::JournalEvent,
) -> RuntimeResult<()> {
    debug_assert!(TA::RECOVERY_PLUGIN);
    if let Err(e) = self.append_event(event) {
        // cold path: the append failed
        compiler::cold_call(move || {
            // IMPORTANT: we still need to return an error so that the caller can retry if deemed appropriate
            self.appendrec_journal_reverse_entry()?;
            Err(e)
        })
    } else {
        Ok(())
    }
}
}
impl<Fs: FSInterface, TA> JournalWriter<Fs, TA> {
/// Writes a recovery entry that reverses the last journal record. If even
/// this write fails, recovery stage one has failed critically.
pub fn appendrec_journal_reverse_entry(&mut self) -> RuntimeResult<()> {
    let mut reverse_entry =
        JournalEntryMetadata::new(0, EventSourceMarker::RECOVERY_REVERSE_LAST_JOURNAL, 0, 0);
    reverse_entry.event_id = self._incr_id() as u128;
    match self.log_file.fsynced_write(&reverse_entry.encoded()) {
        Ok(()) => Ok(()),
        // NOTE(review): the underlying I/O error is discarded here, as in the
        // original — only the critical-failure error is reported
        Err(_) => Err(StorageError::JournalWRecoveryStageOneFailCritical.into()),
    }
}
pub fn append_journal_reopen(&mut self) -> RuntimeResult<()> {
let id = self._incr_id() as u128;
self.log_file.fsynced_write(

@ -30,5 +30,3 @@ pub(super) mod journal;
pub(super) mod rw;
pub mod spec;
pub mod sysdb;
#[cfg(test)]
mod tests;

@ -31,8 +31,8 @@ use {
storage::common::{
checksum::SCrc64,
interface::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
FSInterface, FileInterface, FileInterfaceExt, FileInterfaceRead,
FileInterfaceWrite, FileInterfaceWriteExt,
},
sdss,
},
@ -44,32 +44,16 @@ use {
pub struct TrackedWriter<Fs: FSInterface> {
file: SDSSFileIO<Fs, <Fs::File as FileInterface>::BufWriter>,
cs: SCrc64,
_cs: SCrc64,
}
impl<Fs: FSInterface> TrackedWriter<Fs> {
pub fn new(f: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
Ok(Self {
file: f.into_buffered_writer()?,
cs: SCrc64::new(),
_cs: SCrc64::new(),
})
}
/// Writes `block` and, only on success, folds it into the running checksum.
pub fn tracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
    self.untracked_write(block)?;
    self.cs.update(block);
    Ok(())
}
/// Writes `block` to the buffered file WITHOUT updating the running checksum.
pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
    // the original `match r { Ok(()) => Ok(()), e => e }` was an identity
    // transform; forward the result directly
    self.file.write_buffer(block)
}
/// Syncs the buffered writer's cache (delegates to `sync_write_cache`).
/// NOTE(review): presumably flushes buffered data to the OS; confirm fsync
/// semantics in the `FileInterfaceBufWrite` implementation.
pub fn sync_writes(&mut self) -> RuntimeResult<()> {
self.file.f.sync_write_cache()
}
/// Finalizes the running checksum, returning its value, and starts a fresh
/// checksum for subsequent tracked writes.
pub fn reset_and_finish_checksum(&mut self) -> u64 {
    core::mem::replace(&mut self.cs, SCrc64::new()).finish()
}
/// Consumes the tracked writer, downgrading the buffered handle back to a
/// plain `SDSSFileIO`. NOTE(review): `downgrade_writer` presumably flushes
/// pending buffered data — confirm in its implementation.
pub fn sync_into_inner(self) -> RuntimeResult<SDSSFileIO<Fs>> {
self.file.downgrade_writer()
}
@ -161,31 +145,6 @@ impl<Fs: FSInterface> SDSSFileIO<Fs> {
let v = F::read_metadata(&mut f.f, ())?;
Ok((f, v))
}
/// Creates a new file at `fpath` and stamps it with the spec's metadata
/// header before handing the handle back.
pub fn create<F: sdss::sdss_r1::FileSpecV1<EncodeArgs = ()>>(
    fpath: &str,
) -> RuntimeResult<Self> {
    let raw = Fs::fs_fcreate_rw(fpath)?;
    let mut file = Self::_new(raw);
    F::write_metadata(&mut file.f, ())?;
    Ok(file)
}
/// Opens `fpath` read-write, creating it if missing.
///
/// Created files get fresh metadata written; existing files have their
/// metadata decoded and returned alongside the handle.
pub fn open_or_create_perm_rw<
    F: sdss::sdss_r1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
>(
    fpath: &str,
) -> RuntimeResult<FileOpen<Self, (Self, F::Metadata)>> {
    match Fs::fs_fopen_or_create_rw(fpath)? {
        FileOpen::Created(raw) => {
            let mut file = Self::_new(raw);
            F::write_metadata(&mut file.f, ())?;
            Ok(FileOpen::Created(file))
        }
        FileOpen::Existing(raw) => {
            let mut file = Self::_new(raw);
            let meta = F::read_metadata(&mut file.f, ())?;
            Ok(FileOpen::Existing((file, meta)))
        }
    }
}
pub fn into_buffered_reader(
self,
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as FileInterface>::BufReader>> {
@ -248,17 +207,7 @@ impl<Fs: FSInterface, F: FileInterfaceRead + FileInterfaceExt> SDSSFileIO<Fs, F>
}
}
impl<Fs: FSInterface, F: FileInterfaceWrite> SDSSFileIO<Fs, F> {
/// Writes the whole of `data` to the underlying file (no sync is performed).
pub fn write_buffer(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)
}
}
impl<Fs: FSInterface, F: FileInterfaceWrite + FileInterfaceWriteExt> SDSSFileIO<Fs, F> {
/// Syncs all pending writes to disk (delegates to `fwext_sync_all`).
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
    // forward directly; the `?; Ok(())` wrapper was redundant
    self.f.fwext_sync_all()
}
pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)?;
self.f.fwext_sync_all()

@ -25,41 +25,18 @@
*/
use {
super::spec::SysDBV1,
crate::engine::{
config::{ConfigAuth, ConfigMode},
core::system_db::SystemDatabase,
data::{cell::Datacell, DictEntryGeneric, DictGeneric},
error::{RuntimeResult, StorageError},
fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore},
storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
common_encoding::r1,
v1::raw::{rw::SDSSFileIO, spec},
common::interface::fs_traits::FSInterface, common_encoding::r1, v1::raw::rw::SDSSFileIO,
},
},
parking_lot::RwLock,
std::collections::HashMap,
};
#[derive(Debug, PartialEq)]
/// The system store init state, reported when the system store is opened
/// or restored at boot.
pub enum SystemStoreInitState {
/// No system store was present. it was created
Created,
/// The system store was present, but no new changes were applied
Unchanged,
/// The system store was present, root settings were updated
UpdatedRoot,
}
impl SystemStoreInitState {
/// Returns true if the store did not exist and was created on this boot
pub const fn is_created(&self) -> bool {
matches!(self, Self::Created)
}
/// Returns true if an existing store had its root settings updated
pub const fn is_existing_updated_root(&self) -> bool {
matches!(self, Self::UpdatedRoot)
}
}
fn rkey<T>(
d: &mut DictGeneric,
key: &str,
@ -71,117 +48,31 @@ fn rkey<T>(
}
}
impl<Fs: FSInterface> SystemStore<Fs> {
const SYSDB_PATH: &'static str = "sys.db";
const SYSDB_COW_PATH: &'static str = "sys.db.cow";
pub struct RestoredSystemDatabase {
pub users: HashMap<Box<str>, Box<[u8]>>,
pub startup_counter: u64,
pub settings_version: u64,
}
impl RestoredSystemDatabase {
const SYS_KEY_AUTH: &'static str = "auth";
const SYS_KEY_AUTH_USERS: &'static str = "users";
const SYS_KEY_SYS: &'static str = "sys";
const SYS_KEY_SYS_STARTUP_COUNTER: &'static str = "sc";
const SYS_KEY_SYS_SETTINGS_VERSION: &'static str = "sv";
pub fn open_or_restore(
auth: ConfigAuth,
run_mode: ConfigMode,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
Self::open_with_name(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth, run_mode)
}
pub fn sync_db(&self, auth: &SysAuth) -> RuntimeResult<()> {
self._sync_with(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth)
}
pub fn open_with_name(
sysdb_name: &str,
sysdb_cow_path: &str,
auth: ConfigAuth,
run_mode: ConfigMode,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
match SDSSFileIO::open_or_create_perm_rw::<spec::SysDBV1>(sysdb_name)? {
FileOpen::Created(new) => {
let me = Self::_new(SysConfig::new_auth(auth, run_mode));
me._sync(new, &me.system_store().auth_data().read())?;
Ok((me, SystemStoreInitState::Created))
}
FileOpen::Existing((ex, _)) => {
Self::restore_and_sync(ex, auth, run_mode, sysdb_name, sysdb_cow_path)
}
}
}
}
impl<Fs: FSInterface> SystemStore<Fs> {
/// Serializes the current system configuration (host data + auth users) into
/// the flat on-disk dictionary format and writes it to `f` with an fsync.
fn _sync(&self, mut f: SDSSFileIO<Fs>, auth: &SysAuth) -> RuntimeResult<()> {
let cfg = self.system_store();
// prepare our flat file: "sys" holds settings version and startup counter;
// "auth" holds the user table
let mut map: DictGeneric = into_dict!(
Self::SYS_KEY_SYS => DictEntryGeneric::Map(into_dict!(
Self::SYS_KEY_SYS_SETTINGS_VERSION => Datacell::new_uint_default(cfg.host_data().settings_version() as _),
Self::SYS_KEY_SYS_STARTUP_COUNTER => Datacell::new_uint_default(cfg.host_data().startup_counter() as _),
)),
Self::SYS_KEY_AUTH => DictGeneric::new(),
);
// fill in the user map under auth.users
let auth_key = map.get_mut(Self::SYS_KEY_AUTH).unwrap();
let auth_key = auth_key.as_dict_mut().unwrap();
auth_key.insert(
Self::SYS_KEY_AUTH_USERS.into(),
DictEntryGeneric::Map(
// username -> [..settings]
auth.users()
.iter()
.map(|(username, user)| {
(
username.to_owned(),
DictEntryGeneric::Data(Datacell::new_list(vec![Datacell::new_bin(
user.key().into(),
)])),
)
})
.collect(),
),
);
// write: encode the full dict and persist it durably
let buf = r1::enc::full_dict::<r1::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
/// Durably replaces `target`: the config is first written to the `cow`
/// (copy-on-write) file and then renamed over the target path.
fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> {
    let cow_file = SDSSFileIO::create::<spec::SysDBV1>(cow)?;
    self._sync(cow_file, auth)?;
    Fs::fs_rename(cow, target)
}
fn restore_and_sync(
f: SDSSFileIO<Fs>,
auth: ConfigAuth,
run_mode: ConfigMode,
fname: &str,
fcow_name: &str,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
let prev_sysdb = Self::_restore(f, run_mode)?;
let state;
// see if settings have changed
if prev_sysdb
.auth_data()
.read()
.verify_user(SysAuthUser::USER_ROOT, &auth.root_key)
.is_ok()
{
state = SystemStoreInitState::Unchanged;
} else {
state = SystemStoreInitState::UpdatedRoot;
pub fn new(
users: HashMap<Box<str>, Box<[u8]>>,
startup_counter: u64,
settings_version: u64,
) -> Self {
Self {
users,
startup_counter,
settings_version,
}
// create new config
let new_syscfg = SysConfig::new_full(
auth,
SysHostData::new(
prev_sysdb.host_data().startup_counter() + 1,
prev_sysdb.host_data().settings_version()
+ !matches!(state, SystemStoreInitState::Unchanged) as u32,
),
run_mode,
);
let slf = Self::_new(new_syscfg);
// now sync
slf._sync_with(fname, fcow_name, &slf.system_store().auth_data().read())?;
Ok((slf, state))
}
fn _restore(mut f: SDSSFileIO<Fs>, run_mode: ConfigMode) -> RuntimeResult<SysConfig> {
pub fn restore<Fs: FSInterface>(name: &str) -> RuntimeResult<Self> {
let (mut f, _) = SDSSFileIO::<Fs>::open::<SysDBV1>(name)?;
let mut sysdb_data = r1::dec::dict_full::<r1::map::GenericDictSpec>(&f.read_full()?)?;
// get our auth and sys stores
let mut auth_store = rkey(
@ -214,9 +105,8 @@ impl<Fs: FSInterface> SystemStore<Fs> {
.remove(0)
.into_bin()
.ok_or(StorageError::SysDBCorrupted)?;
loaded_users.insert(username, SysAuthUser::new(user_password.into_boxed_slice()));
loaded_users.insert(username, user_password.into_boxed_slice());
}
let sys_auth = SysAuth::new(loaded_users);
// load sys data
let sc = rkey(&mut sys_store, Self::SYS_KEY_SYS_STARTUP_COUNTER, |d| {
d.into_data()?.into_uint()
@ -227,14 +117,10 @@ impl<Fs: FSInterface> SystemStore<Fs> {
if !(sysdb_data.is_empty()
& auth_store.is_empty()
& sys_store.is_empty()
& sys_auth.users().contains_key(SysAuthUser::USER_ROOT))
& loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT))
{
return Err(StorageError::SysDBCorrupted.into());
}
Ok(SysConfig::new(
RwLock::new(sys_auth),
SysHostData::new(sc, sv as u32),
run_mode,
))
Ok(Self::new(loaded_users, sc, sv))
}
}

@ -1,115 +0,0 @@
/*
* Created on Sat Jul 29 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
type VirtualFS = crate::engine::storage::common::interface::fs_test::VirtualFS;
mod batch;
mod rw;
mod tx;
// Tests for the system database (sys.db) open/sync lifecycle over the
// in-memory virtual filesystem.
mod sysdb {
use {
super::{super::sysdb::SystemStoreInitState, VirtualFS as VFS},
crate::engine::{
config::{AuthDriver, ConfigAuth, ConfigMode},
fractal::sys_store::SystemStore,
},
};
// Opens (or creates) a system store at the given paths in Dev mode.
fn open_sysdb(
auth_config: ConfigAuth,
sysdb_path: &str,
sysdb_cow_path: &str,
) -> (SystemStore<VFS>, SystemStoreInitState) {
SystemStore::<VFS>::open_with_name(sysdb_path, sysdb_cow_path, auth_config, ConfigMode::Dev)
.unwrap()
}
#[test]
fn open_close() {
let open = |auth_config| {
open_sysdb(
auth_config,
"open_close_test.sys.db",
"open_close_test.sys.cow.db",
)
};
let auth_config = ConfigAuth::new(AuthDriver::Pwd, "password12345678".into());
// first boot: the store is created with fresh host data
{
let (config, state) = open(auth_config.clone());
assert_eq!(state, SystemStoreInitState::Created);
assert!(config
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 0);
}
// reboot: same credentials => state unchanged, startup counter bumped
let (config, state) = open(auth_config);
assert_eq!(state, SystemStoreInitState::Unchanged);
assert!(config
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 1);
}
#[test]
fn open_change_root_password() {
let open = |auth_config| {
open_sysdb(
auth_config,
"open_change_root_password.sys.db",
"open_change_root_password.sys.cow.db",
)
};
// first boot with the original root password
{
let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password12345678".into()));
assert_eq!(state, SystemStoreInitState::Created);
assert!(config
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 0);
}
// reboot with a different root password: root is updated and the
// settings version is bumped along with the startup counter
let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password23456789".into()));
assert_eq!(state, SystemStoreInitState::UpdatedRoot);
assert!(config
.system_store()
.auth_data()
.read()
.verify_user("root", "password23456789")
.is_ok());
assert_eq!(config.system_store().host_data().settings_version(), 1);
assert_eq!(config.system_store().host_data().startup_counter(), 1);
}
}

@ -1,384 +0,0 @@
/*
* Created on Wed Sep 06 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Field, Layer, Model,
},
},
data::{cell::Datacell, tag::TagSelector, uuid::Uuid},
idx::MTIndex,
storage::{
common::interface::{fs_test::VirtualFS, fs_traits::FileOpen},
v1::raw::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
},
rw::SDSSFileIO,
spec::{self, Header},
},
},
},
util::test_utils,
},
crossbeam_epoch::pin,
};
/// Builds a primary index key from anything convertible into a `Datacell`.
/// Panics if the datacell cannot back a primary key.
fn pkey(v: impl Into<Datacell>) -> PrimaryIndexKey {
PrimaryIndexKey::try_from_dc(v.into()).unwrap()
}
/// Opens (or creates) `fpath` as a v1 data-batch journal file over the
/// in-memory virtual filesystem; panics on I/O failure.
fn open_file(fpath: &str) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, Header)> {
SDSSFileIO::open_or_create_perm_rw::<spec::DataBatchJournalV1>(fpath).unwrap()
}
/// Opens a persist driver over `fpath`. A fresh file starts a brand-new
/// driver; an existing file first has its batches replayed into `mdl` and
/// the driver then continues appending to it.
fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver<VirtualFS> {
    let driver = match open_file(fpath) {
        FileOpen::Created(new_file) => DataBatchPersistDriver::new(new_file, true),
        FileOpen::Existing((existing, _header)) => {
            let mut restore = DataBatchRestoreDriver::new(existing).unwrap();
            restore.read_data_batch_into_model(mdl).unwrap();
            DataBatchPersistDriver::new(restore.into_file().unwrap(), false)
        }
    };
    driver.unwrap()
}
/// Builds a data delta for a brand-new row keyed by `pk`, carrying `data`,
/// at schema version `schema` and transaction `txnid`.
fn new_delta(
    schema: u64,
    txnid: u64,
    pk: impl Into<Datacell>,
    data: DcFieldIndex,
    change: DataDeltaKind,
) -> DataDelta {
    // construct the row first, then wrap it into a delta
    let row = Row::new(
        pkey(pk),
        data,
        DeltaVersion::__new(schema),
        DeltaVersion::__new(txnid),
    );
    new_delta_with_row(txnid, row, change)
}
/// Wraps an existing `row` into a data delta tagged with `txnid` and `change`.
fn new_delta_with_row(txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta {
DataDelta::new(DeltaVersion::__new(txnid), row, change)
}
/// Flushes `dt` as one batch into `fname`, then reopens the file and decodes
/// every batch it contains.
fn flush_deltas_and_re_read<const N: usize>(
    mdl: &Model,
    dt: [DataDelta; N],
    fname: &str,
) -> Vec<NormalBatch> {
    // return directly; the intermediate `let batch = …; batch` was redundant
    flush_batches_and_return_restore_driver(dt, mdl, fname)
        .read_all_batches()
        .unwrap()
}
/// Queues `dt` on the model's delta state, persists them as a single batch
/// into a freshly created `fname`, and returns a restore driver over the
/// reopened file.
fn flush_batches_and_return_restore_driver<const N: usize>(
    dt: [DataDelta; N],
    mdl: &Model,
    fname: &str,
) -> DataBatchRestoreDriver<VirtualFS> {
    // stage every delta in the model's delta queue
    let guard = pin();
    for d in dt {
        mdl.delta_state().append_new_data_delta(d, &guard);
    }
    // persist them as one batch into a new file
    let created = open_file(fname).into_created().unwrap();
    {
        let mut writer = DataBatchPersistDriver::new(created, true).unwrap();
        writer.write_new_batch(mdl, N).unwrap();
        writer.close().unwrap();
    }
    // reopen the same file for restoration
    let (existing, _header) = open_file(fname).into_existing().unwrap();
    DataBatchRestoreDriver::new(existing).unwrap()
}
#[test]
fn empty_multi_open_reopen() {
    // A model with no data: repeatedly opening and cleanly closing the batch
    // log must succeed every time, leaving the file reopenable.
    let uuid = Uuid::new();
    let mdl = Model::new_restore(
        uuid,
        "username".into(),
        TagSelector::String.into_full(),
        into_dict!(
            "username" => Field::new([Layer::str()].into(), false),
            "password" => Field::new([Layer::bin()].into(), false)
        ),
    );
    for _round in 0..100 {
        open_batch_data("empty_multi_open_reopen.db-btlog", &mdl)
            .close()
            .unwrap();
    }
}
#[test]
/// Inserts three rows and deletes one, all at distinct txn ids, then verifies
/// the batch decoded from disk reproduces the same four events in order.
fn unskewed_delta() {
let uuid = Uuid::new();
let mdl = Model::new_restore(
uuid,
"username".into(),
TagSelector::String.into_full(),
into_dict!(
"username" => Field::new([Layer::str()].into(), false),
"password" => Field::new([Layer::bin()].into(), false)
),
);
// three inserts (txn 0..=2) followed by one delete (txn 3)
let deltas = [
new_delta(
0,
0,
"sayan",
into_dict!("password" => Datacell::new_bin("37ae4b773a9fc7a20164eb16".as_bytes().into())),
DataDeltaKind::Insert,
),
new_delta(
0,
1,
"badguy",
into_dict!("password" => Datacell::new_bin("5fe3cbdc470b667cb1ba288a".as_bytes().into())),
DataDeltaKind::Insert,
),
new_delta(
0,
2,
"doggo",
into_dict!("password" => Datacell::new_bin("c80403f9d0ae4d5d0e829dd0".as_bytes().into())),
DataDeltaKind::Insert,
),
new_delta(0, 3, "badguy", into_dict!(), DataDeltaKind::Delete),
];
let batches = flush_deltas_and_re_read(&mdl, deltas, "unskewed_delta.db-btlog");
// all four events come back as a single batch, in txn order
assert_eq!(
batches,
vec![NormalBatch::new(
vec![
DecodedBatchEvent::new(
0,
pkey("sayan"),
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
b"37ae4b773a9fc7a20164eb16".to_vec().into_boxed_slice()
)])
),
DecodedBatchEvent::new(
1,
pkey("badguy"),
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
b"5fe3cbdc470b667cb1ba288a".to_vec().into_boxed_slice()
)])
),
DecodedBatchEvent::new(
2,
pkey("doggo"),
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
b"c80403f9d0ae4d5d0e829dd0".to_vec().into_boxed_slice()
)])
),
DecodedBatchEvent::new(3, pkey("badguy"), DecodedBatchEventKind::Delete)
],
0
)]
)
}
#[test]
/// A row that is both inserted and later updated within the same flush: only
/// the latest state per row should be persisted (the stale insert for
/// "Schrödinger's cat" is skewed out by its update at txn 3).
fn skewed_delta() {
// prepare model definition
let uuid = Uuid::new();
let mdl = Model::new_restore(
uuid,
"catname".into(),
TagSelector::String.into_full(),
into_dict!(
"catname" => Field::new([Layer::str()].into(), false),
"is_good" => Field::new([Layer::bool()].into(), false),
"magical" => Field::new([Layer::bool()].into(), false),
),
);
let row = Row::new(
pkey("Schrödinger's cat"),
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
DeltaVersion::__new(0),
DeltaVersion::__new(2),
);
{
// update the row: bump its txn to 3 and flip `magical` to true
let mut wl = row.d_data().write();
wl.set_txn_revised(DeltaVersion::__new(3));
*wl.fields_mut().get_mut("magical").unwrap() = Datacell::new_bool(true);
}
// prepare deltas
let deltas = [
// insert catname: Schrödinger's cat, is_good: true
new_delta_with_row(0, row.clone(), DataDeltaKind::Insert),
// insert catname: good cat, is_good: true, magical: false
new_delta(
0,
1,
"good cat",
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
DataDeltaKind::Insert,
),
// insert catname: bad cat, is_good: false, magical: false
new_delta(
0,
2,
"bad cat",
into_dict!("is_good" => Datacell::new_bool(false), "magical" => Datacell::new_bool(false)),
DataDeltaKind::Insert,
),
// update catname: Schrödinger's cat, is_good: true, magical: true
new_delta_with_row(3, row.clone(), DataDeltaKind::Update),
];
let batch = flush_deltas_and_re_read(&mdl, deltas, "skewed_delta.db-btlog");
// only three events survive: the two other inserts plus the latest
// (updated) state of Schrödinger's cat
assert_eq!(
batch,
vec![NormalBatch::new(
vec![
DecodedBatchEvent::new(
1,
pkey("good cat"),
DecodedBatchEventKind::Insert(vec![
Datacell::new_bool(true),
Datacell::new_bool(false)
])
),
DecodedBatchEvent::new(
2,
pkey("bad cat"),
DecodedBatchEventKind::Insert(vec![
Datacell::new_bool(false),
Datacell::new_bool(false)
])
),
DecodedBatchEvent::new(
3,
pkey("Schrödinger's cat"),
DecodedBatchEventKind::Update(vec![
Datacell::new_bool(true),
Datacell::new_bool(true)
])
)
],
0
)]
)
}
#[test]
/// Persist/restore must be order-independent: the same delta set is shuffled
/// with a fresh seed on every round, flushed, and replayed into the model;
/// afterwards the four surviving rows must hold their inserted passwords
/// (the mongobongo/rds rows are inserted then deleted at higher txn ids).
fn skewed_shuffled_persist_restore() {
let uuid = Uuid::new();
let model = Model::new_restore(
uuid,
"username".into(),
TagSelector::String.into_full(),
into_dict!("username" => Field::new([Layer::str()].into(), false), "password" => Field::new([Layer::str()].into(), false)),
);
let mongobongo = Row::new(
pkey("mongobongo"),
into_dict!("password" => "dumbo"),
DeltaVersion::__new(0),
DeltaVersion::__new(4),
);
let rds = Row::new(
pkey("rds"),
into_dict!("password" => "snail"),
DeltaVersion::__new(0),
DeltaVersion::__new(5),
);
let deltas = [
new_delta(
0,
0,
"sayan",
into_dict!("password" => "pwd123456"),
DataDeltaKind::Insert,
),
new_delta(
0,
1,
"joseph",
into_dict!("password" => "pwd234567"),
DataDeltaKind::Insert,
),
new_delta(
0,
2,
"haley",
into_dict!("password" => "pwd345678"),
DataDeltaKind::Insert,
),
new_delta(
0,
3,
"charlotte",
into_dict!("password" => "pwd456789"),
DataDeltaKind::Insert,
),
// these two rows are inserted and then deleted again (txns 4..=7)
new_delta_with_row(4, mongobongo.clone(), DataDeltaKind::Insert),
new_delta_with_row(5, rds.clone(), DataDeltaKind::Insert),
new_delta_with_row(6, mongobongo.clone(), DataDeltaKind::Delete),
new_delta_with_row(7, rds.clone(), DataDeltaKind::Delete),
];
for i in 0..deltas.len() {
// prepare pretest: shuffle a copy of the deltas into a new file per round
let fname = format!("skewed_shuffled_persist_restore_round{i}.db-btlog");
let mut deltas = deltas.clone();
let mut randomizer = test_utils::randomizer();
test_utils::shuffle_slice(&mut deltas, &mut randomizer);
// restore
let mut restore_driver = flush_batches_and_return_restore_driver(deltas, &model, &fname);
restore_driver.read_data_batch_into_model(&model).unwrap();
}
// the first four (never-deleted) rows must survive with their passwords
let g = pin();
for delta in &deltas[..4] {
let row = model
.primary_index()
.__raw_index()
.mt_get(delta.row().d_key(), &g)
.unwrap();
let row_data = row.read();
assert_eq!(row_data.fields().len(), 1);
assert_eq!(
row_data.fields().get("password").unwrap(),
delta
.row()
.d_data()
.read()
.fields()
.get("password")
.unwrap()
);
}
}

@ -1,52 +0,0 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::storage::{
common::interface::fs_traits::FileOpen,
v1::raw::{rw::SDSSFileIO, spec},
};
#[test]
fn create_delete() {
    {
        // the first open must create the file
        let first = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<spec::TestFile>(
            "hello_world.db-tlog",
        )
        .unwrap();
        assert!(matches!(first, FileOpen::Created(_)));
    }
    // the second open must find the file created above
    let second = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<spec::TestFile>(
        "hello_world.db-tlog",
    )
    .unwrap();
    assert!(matches!(second, FileOpen::Existing(_)));
}

@ -1,201 +0,0 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
engine::{
error::{RuntimeResult, StorageError},
storage::v1::raw::{
journal::raw::{self, JournalAdapter, JournalWriter},
spec,
},
},
util,
},
std::cell::RefCell,
};
/// A toy global state for the journal tests: ten byte cells behind a
/// `RefCell` so tests can mutate through a shared reference.
pub struct Database {
data: RefCell<[u8; 10]>,
}
impl Database {
    /// A database with all ten cells zeroed.
    fn new() -> Self {
        Self {
            data: RefCell::new([0; 10]),
        }
    }
    /// Snapshot of the current contents.
    fn copy_data(&self) -> [u8; 10] {
        *self.data.borrow()
    }
    /// Zeroes every cell.
    fn reset(&self) {
        self.data.borrow_mut().fill(0);
    }
    /// Sets cell `pos` to `val` (in memory only).
    fn set(&self, pos: usize, val: u8) {
        self.data.borrow_mut()[pos] = val;
    }
    /// Sets cell `pos` to `val` and records the change in the journal.
    fn txn_set(
        &self,
        pos: usize,
        val: u8,
        txn_writer: &mut JournalWriter<super::VirtualFS, DatabaseTxnAdapter>,
    ) -> RuntimeResult<()> {
        // apply in memory first, then journal the event
        self.set(pos, val);
        txn_writer.append_event(TxEvent::Set(pos, val))
    }
}
/// Events recorded in the test journal.
pub enum TxEvent {
/// zero out the whole database
#[allow(unused)]
Reset,
/// set cell at the given position to the given value
Set(usize, u8),
}
#[derive(Debug)]
/// Error type for the test journal adapter.
pub enum TxError {
/// an underlying storage (SDSS) error
SDSS(StorageError),
}
// generate `From<StorageError> for TxError` (wrapping into the SDSS variant)
direct_from! {
TxError => {
StorageError as SDSS
}
}
#[derive(Debug)]
/// Journal adapter wiring `TxEvent`s to the test `Database` global state.
pub struct DatabaseTxnAdapter;
impl JournalAdapter for DatabaseTxnAdapter {
    const RECOVERY_PLUGIN: bool = false;
    type Error = TxError;
    type JournalEvent = TxEvent;
    type GlobalState = Database;
    /// Encodes an event as `[1B: opcode][8B: LE index][1B: new value]`.
    /// Opcode 0 = reset (operands zero), opcode 1 = set.
    fn encode(event: Self::JournalEvent) -> Box<[u8]> {
        // one exhaustive match instead of the original's three consecutive
        // matches over the same event value
        let (opcode, index, new_value) = match event {
            TxEvent::Reset => (0u8, 0u64, 0u8),
            TxEvent::Set(pos, val) => (1u8, pos as u64, val),
        };
        let mut ret = Vec::with_capacity(10);
        ret.push(opcode);
        ret.extend(index.to_le_bytes());
        ret.push(new_value);
        ret.into_boxed_slice()
    }
    /// Decodes a (>= 10 byte) payload and applies it to the global state.
    ///
    /// Returns `JournalLogEntryCorrupted` for unknown opcodes or operands
    /// that are out of range for the opcode.
    fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> {
        assert!(payload.len() >= 10, "corrupt file");
        let opcode = payload[0];
        let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
        let new_value = payload[9];
        match opcode {
            // a reset entry must carry zeroed operands
            0 if index == 0 && new_value == 0 => gs.reset(),
            // a set entry must address one of the ten cells
            1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value),
            _ => return Err(TxError::SDSS(StorageError::JournalLogEntryCorrupted.into())),
        }
        Ok(())
    }
}
/// Opens (or creates) the test journal at `log_name`, replaying any existing
/// events into `db`, and returns just the writer half.
fn open_log(
    log_name: &str,
    db: &Database,
) -> RuntimeResult<JournalWriter<super::VirtualFS, DatabaseTxnAdapter>> {
    let opened = raw::open_or_create_journal::<DatabaseTxnAdapter, super::VirtualFS, spec::TestFile>(
        log_name, db,
    )?;
    Ok(opened.into_inner())
}
#[test]
/// Boot once and write two cells through the journal; a second boot against
/// an empty database must replay the log and reproduce identical contents.
fn first_boot_second_readonly() {
// create log
let db1 = Database::new();
let x = || -> RuntimeResult<()> {
let mut log = open_log("testtxn.log", &db1)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.close()
};
x().unwrap();
// backup original data
let original_data = db1.copy_data();
// restore log: opening replays events into the fresh database
let empty_db2 = Database::new();
open_log("testtxn.log", &empty_db2)
.unwrap()
.close()
.unwrap();
assert_eq!(original_data, empty_db2.copy_data());
}
#[test]
/// Three boots of the same journal: the first writes all ones, the second
/// replays that state and adds `i` to each cell, the third replays everything
/// and must observe the cells holding 1..=10.
fn oneboot_mod_twoboot_mod_thirdboot_read() {
// first boot: set all to 1
let db1 = Database::new();
let x = || -> RuntimeResult<()> {
let mut log = open_log("duatxn.db-tlog", &db1)?;
for i in 0..10 {
db1.txn_set(i, 1, &mut log)?;
}
log.close()
};
x().unwrap();
let bkp_db1 = db1.copy_data();
drop(db1);
// second boot: replay must reproduce boot one, then bump cell i by i
let db2 = Database::new();
let x = || -> RuntimeResult<()> {
let mut log = open_log("duatxn.db-tlog", &db2)?;
assert_eq!(bkp_db1, db2.copy_data());
for i in 0..10 {
let current_val = db2.data.borrow()[i];
db2.txn_set(i, current_val + i as u8, &mut log)?;
}
log.close()
};
x().unwrap();
let bkp_db2 = db2.copy_data();
drop(db2);
// third boot: read-only replay must equal boot two's final state
let db3 = Database::new();
let log = open_log("duatxn.db-tlog", &db3).unwrap();
log.close().unwrap();
assert_eq!(bkp_db2, db3.copy_data());
// cell i ended at 1 + i, i.e. the sequence 1..=10
assert_eq!(
db3.copy_data(),
(1..=10)
.into_iter()
.map(u8::from)
.collect::<Box<[u8]>>()
.as_ref()
);
}

@ -33,7 +33,9 @@ use {
engine::{
core::GlobalNS,
storage::{
common_encoding::r1::impls::gns::GNSEvent, v2::raw::journal::JournalAdapterEvent,
common::interface::fs_traits::FSInterface,
common_encoding::r1::impls::gns::GNSEvent,
v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent},
},
txn::gns::{
model::{
@ -41,6 +43,7 @@ use {
DropModelTxn,
},
space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
sysctl::{AlterUserTxn, CreateUserTxn, DropUserTxn},
GNSTransaction, GNSTransactionCode,
},
RuntimeResult,
@ -53,23 +56,47 @@ use {
GNS event log impl
*/
pub type GNSDriver<Fs> = EventLogDriver<GNSEventLog, Fs>;
pub struct GNSEventLog;
impl<Fs: FSInterface> GNSDriver<Fs> {
const FILE_PATH: &'static str = "gns.db-tlog";
pub fn open_gns_with_name(name: &str, gs: &GlobalNS) -> RuntimeResult<Self> {
journal::open_journal::<_, Fs>(name, gs)
}
pub fn open_gns(gs: &GlobalNS) -> RuntimeResult<Self> {
Self::open_gns_with_name(Self::FILE_PATH, gs)
}
/// Create a new event log
pub fn create_gns() -> RuntimeResult<Self> {
journal::create_journal::<_, Fs>(Self::FILE_PATH)
}
}
macro_rules! make_dispatch {
($($obj:ty),* $(,)?) => {
[$(<$obj as crate::engine::storage::common_encoding::r1::impls::gns::GNSEvent>::decode_apply),*]
}
}
impl EventLogSpec for GNSEventLog {
type Spec = SystemDatabaseV1;
type GlobalState = GlobalNS;
type EventMeta = GNSTransactionCode;
type DecodeDispatch =
[fn(&GlobalNS, Vec<u8>) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT];
const DECODE_DISPATCH: Self::DecodeDispatch = [
<CreateSpaceTxn as GNSEvent>::decode_apply,
<AlterSpaceTxn as GNSEvent>::decode_apply,
<DropSpaceTxn as GNSEvent>::decode_apply,
<CreateModelTxn as GNSEvent>::decode_apply,
<AlterModelAddTxn as GNSEvent>::decode_apply,
<AlterModelRemoveTxn as GNSEvent>::decode_apply,
<AlterModelUpdateTxn as GNSEvent>::decode_apply,
<DropModelTxn as GNSEvent>::decode_apply,
const DECODE_DISPATCH: Self::DecodeDispatch = make_dispatch![
CreateSpaceTxn,
AlterSpaceTxn,
DropSpaceTxn,
CreateModelTxn,
AlterModelAddTxn,
AlterModelRemoveTxn,
AlterModelUpdateTxn,
DropModelTxn,
CreateUserTxn,
AlterUserTxn,
DropUserTxn,
];
}

@ -23,3 +23,626 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row, RowData},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Model,
},
},
data::{
cell::Datacell,
tag::{DataTag, TagUnique},
},
error::StorageError,
idx::{MTIndex, STIndex, STIndexSeq},
storage::{
common::{
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
},
common_encoding::r1,
v2::raw::{
journal::{
self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent,
RawJournalAdapter,
},
spec::ModelDataBatchAofV1,
},
},
RuntimeResult,
},
util::{compiler::TaggedEnum, EndianQW},
},
crossbeam_epoch::{pin, Guard},
sky_macros::TaggedEnum,
std::collections::{hash_map::Entry as HMEntry, HashMap},
};
pub type ModelDriver<Fs> = BatchDriver<ModelDataAdapter, Fs>;
impl<Fs: FSInterface> ModelDriver<Fs> {
    /// Open the model data journal at `model_data_file_path` for `mdl`,
    /// driving recovery via `journal::open_journal`
    pub fn open_model_driver(mdl: &Model, model_data_file_path: &str) -> RuntimeResult<Self> {
        journal::open_journal::<_, Fs>(model_data_file_path, mdl)
    }
    /// Create a new model data journal (batch driver) at the given path
    pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult<Self> {
        journal::create_journal::<_, Fs>(model_data_file_path)
    }
}
/// The model data adapter (abstract journal adapter impl). A marker type: all
/// behavior lives in its [`BatchAdapterSpec`] implementation
pub struct ModelDataAdapter;
#[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)]
#[repr(u8)]
/// The kind of batch (the `u8` discriminant is the on-disk batch tag)
pub enum BatchType {
    /// a standard batch (with n <= m events; n = Δdata, m = cardinality)
    Standard = 0,
}
#[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)]
#[repr(u8)]
/// The type of event *inside* a batch (the `u8` discriminant is the on-disk
/// event tag)
#[allow(unused)] // TODO(@ohsayan): somehow merge this into delta kind?
pub enum EventType {
    Delete = 0,
    Insert = 1,
    Update = 2,
    /// owing to inconsistent reads, we exited early
    EarlyExit = 3,
}
/*
persist implementation
---
this section implements persistence for a model data batch. now, there are several special
cases to handle, for example inconsistent views of the database and such so this might look
a little messy.
*/
/// Writes individual rows into the batch journal through the model data
/// adapter's tracked writer
struct RowWriter<'b, Fs: FSInterface> {
    f: &'b mut TrackedWriter<Fs::File, <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec>,
}
impl<'b, Fs: FSInterface> RowWriter<'b, Fs> {
    /// Write the per-event header: `[change type: 1B][txn id: 8B LE]`
    fn write_row_metadata(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
        // debug-build sanity check: the on-disk event tag must agree with the
        // in-memory delta kind. FIX: `cfg!(debug)` refers to a custom cfg flag
        // that no default build sets, so the check was dead code;
        // `debug_assertions` is the cfg that actually tracks debug builds
        if cfg!(debug_assertions) {
            let event_kind = EventType::try_from_raw(delta.change().value_u8()).unwrap();
            match (event_kind, delta.change()) {
                (EventType::Delete, DataDeltaKind::Delete)
                | (EventType::Insert, DataDeltaKind::Insert)
                | (EventType::Update, DataDeltaKind::Update) => {}
                (EventType::EarlyExit, _) => unreachable!(),
                _ => panic!(),
            }
        }
        // write [change type][txn id]
        let change_type = [delta.change().value_u8()];
        self.f.dtrack_write(&change_type)?;
        let txn_id = delta.data_version().value_u64().to_le_bytes();
        self.f.dtrack_write(&txn_id)?;
        Ok(())
    }
    /// encode the primary key only. this means NO TAG is encoded.
    /// layout: integer keys as 8B LE; str/bin keys as `[len: 8B LE][payload]`
    fn write_row_pk(&mut self, pk: &PrimaryIndexKey) -> RuntimeResult<()> {
        match pk.tag() {
            TagUnique::UnsignedInt | TagUnique::SignedInt => {
                let data = unsafe {
                    // UNSAFE(@ohsayan): +tagck
                    pk.read_uint()
                }
                .to_le_bytes();
                self.f.dtrack_write(&data)?;
            }
            TagUnique::Str | TagUnique::Bin => {
                let slice = unsafe {
                    // UNSAFE(@ohsayan): +tagck
                    pk.read_bin()
                };
                let slice_l = slice.len().u64_bytes_le();
                self.f.dtrack_write(&slice_l)?;
                self.f.dtrack_write(slice)?;
            }
            TagUnique::Illegal => unsafe {
                // UNSAFE(@ohsayan): a pk can't be constructed with illegal
                impossible!()
            },
        }
        Ok(())
    }
    /// Encode a single cell (type tag + payload, via the r1 cell encoder)
    fn write_cell(&mut self, value: &Datacell) -> RuntimeResult<()> {
        let mut buf = vec![];
        r1::obj::cell::encode(&mut buf, value);
        self.f.dtrack_write(&buf)?;
        Ok(())
    }
    /// Encode row data in the model's stable field order. The primary key
    /// field is skipped (it is written separately by `write_row_pk`); any
    /// other missing field is recorded as a single 0 byte
    fn write_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> {
        for field_name in model.fields().stseq_ord_key() {
            match row_data.fields().get(field_name) {
                Some(cell) => {
                    self.write_cell(cell)?;
                }
                None if field_name.as_str() == model.p_key() => {}
                None => self.f.dtrack_write(&[0])?,
            }
        }
        Ok(())
    }
}
/// Drives persistence of one batch: dequeues deltas from the model's delta
/// state and writes them out via `row_writer`
struct BatchWriter<'a, 'b, Fs: FSInterface> {
    model: &'a Model,
    row_writer: RowWriter<'b, Fs>,
    g: &'a Guard,
    // events actually written (deltas skipped due to inconsistent reads are
    // not counted)
    sync_count: usize,
}
impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
    /// Dequeue up to `count` deltas from the model's delta queue and persist
    /// them. Returns the number of events actually written (deltas hit by
    /// inconsistent reads are consumed but not written, so this may be less
    /// than `count`). On error, every dequeued delta is pushed back.
    fn write_batch(
        model: &'a Model,
        g: &'a Guard,
        count: usize,
        f: &'b mut TrackedWriter<
            Fs::File,
            <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
        >,
    ) -> RuntimeResult<usize> {
        /*
            go over each delta, check if inconsistent and apply if not. we currently keep a track
            of applied deltas in a vec which is a TERRIBLY INEFFICIENT WAY to do so. Instead we should
            be able to "iterate" on the concurrent queue. Since that demands a proof of correctness,
            once I do finish implementing it I'll swap it in here. This is the primary source of huge
            memory blowup during a batch sync.
            -- @ohsayan
        */
        // writes the batch header as a side effect
        let mut me = Self::new(model, g, f)?;
        let mut applied_deltas = vec![];
        let mut i = 0;
        while i < count {
            // the caller observed `count` queued deltas, so this dequeue must succeed
            let delta = me.model.delta_state().__data_delta_dequeue(me.g).unwrap();
            match me.step(&delta) {
                Ok(()) => {
                    applied_deltas.push(delta);
                    i += 1;
                }
                Err(e) => {
                    // errored, so push everything back in
                    // NOTE(review): re-appending places these deltas after any
                    // deltas enqueued concurrently — confirm that the replay
                    // path tolerates this reordering
                    me.model.delta_state().append_new_data_delta(delta, me.g);
                    for applied_delta in applied_deltas {
                        me.model
                            .delta_state()
                            .append_new_data_delta(applied_delta, g);
                    }
                    return Err(e);
                }
            }
        }
        Ok(me.sync_count)
    }
    /// Initialize the writer state, writing the batch header as a side effect.
    /// Header layout: `[pk tag: 1B][schema version: 8B LE][column count: 8B LE]`
    fn new(
        model: &'a Model,
        g: &'a Guard,
        f: &'b mut TrackedWriter<
            Fs::File,
            <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
        >,
    ) -> RuntimeResult<Self> {
        // write batch start information: [pk tag:1B][schema version][column count]
        f.dtrack_write(&[model.p_tag().tag_unique().value_u8()])?;
        f.dtrack_write(
            &model
                .delta_state()
                .schema_current_version()
                .value_u64()
                .to_le_bytes(),
        )?;
        f.dtrack_write(&(model.fields().st_len() as u64).to_le_bytes())?;
        Ok(Self {
            model,
            row_writer: RowWriter { f },
            g,
            sync_count: 0,
        })
    }
    /// Persist one delta. Increments `sync_count` only when an event was
    /// actually written; an inconsistent read is silently skipped
    fn step(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
        match delta.change() {
            DataDeltaKind::Delete => {
                // deletes only need the header and the primary key
                self.row_writer.write_row_metadata(&delta)?;
                self.row_writer.write_row_pk(delta.row().d_key())?;
            }
            DataDeltaKind::Insert | DataDeltaKind::Update => {
                // resolve deltas (this is yet another opportunity for us to reclaim memory from deleted items)
                let row_data = delta
                    .row()
                    .resolve_schema_deltas_and_freeze_if(self.model.delta_state(), |row| {
                        row.get_txn_revised() <= delta.data_version()
                    });
                if row_data.get_txn_revised() > delta.data_version() {
                    // inconsistent read. there should already be another revised delta somewhere
                    return Ok(());
                }
                self.row_writer.write_row_metadata(&delta)?;
                // encode data
                self.row_writer.write_row_pk(delta.row().d_key())?;
                self.row_writer.write_row_data(self.model, &row_data)?;
            }
        }
        self.sync_count += 1;
        Ok(())
    }
}
/// A standard model batch where at most the given number of keys are flushed
/// (the model to flush from, plus the delta count observed at creation time)
pub struct StdModelBatch<'a>(&'a Model, usize);

impl<'a> StdModelBatch<'a> {
    /// Wrap a model together with the number of queued deltas observed by the
    /// caller
    pub fn new(model: &'a Model, observed_len: usize) -> Self {
        StdModelBatch(model, observed_len)
    }
}
impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'a> {
    /// batch type tag persisted with the event
    fn md(&self) -> u64 {
        BatchType::Standard.dscr_u64()
    }
    /// On-disk layout: `[expected commit][batch header + events][optional
    /// early-exit marker][actual commit]`
    fn write_direct<Fs: FSInterface>(
        self,
        writer: &mut TrackedWriter<
            Fs::File,
            <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
        >,
    ) -> RuntimeResult<()> {
        // [expected commit]
        writer.dtrack_write(&(self.1 as u64).to_le_bytes())?;
        let g = pin();
        let actual_commit = BatchWriter::<Fs>::write_batch(self.0, &g, self.1, writer)?;
        if actual_commit != self.1 {
            // fewer events were written than promised (inconsistent reads were
            // skipped): record an early-exit marker so recovery knows to stop
            writer.dtrack_write(&[EventType::EarlyExit.dscr()])?;
        }
        // [actual commit]: recovery cross-checks this against what it decoded
        writer.dtrack_write(&(actual_commit as u64).to_le_bytes())
    }
}
/*
restore implementation
---
the section below implements data restore from a single batch. like the persist impl,
this is also a fairly complex implementation because some changes, for example deletes
may need to be applied later due to out-of-order persistence; it is important to postpone
operations that we're unsure about since a change can appear out of order and we want to
restore the database to its exact state
*/
/// Per-batch metadata (decoded from the batch header)
pub struct BatchMetadata {
    // primary key tag shared by every row in the batch
    pk_tag: TagUnique,
    // schema version the batch was written under
    schema_version: u64,
    // number of columns written per row
    column_count: u64,
}
/// The decoded payload of a single batch event
enum DecodedBatchEventKind {
    /// remove the row with the event's primary key
    Delete,
    /// cells in the model's stable field order (primary key excluded)
    Insert(Vec<Datacell>),
    /// cells in the model's stable field order (primary key excluded)
    Update(Vec<Datacell>),
}
/// State handling for any pending queries; collects decoded events until the
/// whole batch is verified, after which they are applied in `finish`
pub struct BatchRestoreState {
    events: Vec<DecodedBatchEvent>,
}
/// One decoded event from a batch, pending application to the global state
struct DecodedBatchEvent {
    // version of the transaction that produced this change
    txn_id: DeltaVersion,
    pk: PrimaryIndexKey,
    kind: DecodedBatchEventKind,
}
impl DecodedBatchEvent {
    fn new(txn_id: u64, pk: PrimaryIndexKey, kind: DecodedBatchEventKind) -> Self {
        Self {
            txn_id: DeltaVersion::__new(txn_id),
            pk,
            kind,
        }
    }
}
impl BatchAdapterSpec for ModelDataAdapter {
    type Spec = ModelDataBatchAofV1;
    type GlobalState = Model;
    type BatchType = BatchType;
    type EventType = EventType;
    type BatchMetadata = BatchMetadata;
    type BatchState = BatchRestoreState;
    /// the early-exit marker is encoded as a plain event tag on disk
    fn is_early_exit(event_type: &Self::EventType) -> bool {
        EventType::EarlyExit.eq(event_type)
    }
    fn initialize_batch_state(_: &Self::GlobalState) -> Self::BatchState {
        BatchRestoreState { events: Vec::new() }
    }
    /// Decode the batch header written by `BatchWriter::new`:
    /// `[pk tag: 1B][schema version: 8B LE][column count: 8B LE]`
    fn decode_batch_metadata<Fs: FSInterface>(
        _: &Self::GlobalState,
        f: &mut TrackedReaderContext<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            Self::Spec,
        >,
        batch_type: Self::BatchType,
    ) -> RuntimeResult<Self::BatchMetadata> {
        // [pk tag][schema version][column cnt]
        match batch_type {
            // exhaustive: adding a batch type forces an update here
            BatchType::Standard => {}
        }
        let pk_tag = f.read_block().and_then(|[b]| {
            TagUnique::try_from_raw(b).ok_or(StorageError::RawJournalCorrupted.into())
        })?;
        let schema_version = u64::from_le_bytes(f.read_block()?);
        let column_count = u64::from_le_bytes(f.read_block()?);
        Ok(BatchMetadata {
            pk_tag,
            schema_version,
            column_count,
        })
    }
    /// Decode one event and stash it in the batch state; nothing is applied to
    /// the model until `finish` runs for the whole batch
    fn update_state_for_new_event<Fs: FSInterface>(
        _: &Self::GlobalState,
        bs: &mut Self::BatchState,
        f: &mut TrackedReaderContext<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            Self::Spec,
        >,
        batch_info: &Self::BatchMetadata,
        event_type: Self::EventType,
    ) -> RuntimeResult<()> {
        // get txn id
        let txn_id = u64::from_le_bytes(f.read_block()?);
        // get pk
        let pk = restore_impls::decode_primary_key::<Fs, Self::Spec>(f, batch_info.pk_tag)?;
        match event_type {
            EventType::Delete => {
                bs.events.push(DecodedBatchEvent::new(
                    txn_id,
                    pk,
                    DecodedBatchEventKind::Delete,
                ));
            }
            EventType::Insert | EventType::Update => {
                // insert or update
                // prepare row
                let row = restore_impls::decode_row_data::<Fs>(batch_info, f)?;
                if event_type == EventType::Insert {
                    bs.events.push(DecodedBatchEvent::new(
                        txn_id,
                        pk,
                        DecodedBatchEventKind::Insert(row),
                    ));
                } else {
                    bs.events.push(DecodedBatchEvent::new(
                        txn_id,
                        pk,
                        DecodedBatchEventKind::Update(row),
                    ));
                }
            }
            // the journal driver filters early exits before dispatching here
            EventType::EarlyExit => unreachable!(),
        }
        Ok(())
    }
    fn finish(
        batch_state: Self::BatchState,
        batch_md: Self::BatchMetadata,
        gs: &Self::GlobalState,
    ) -> RuntimeResult<()> {
        /*
            go over each change in this batch, resolve conflicts and then apply to global state
        */
        // NOTE(review): an unprotected guard is only sound without concurrent
        // access; this presumably runs single-threaded during restore — confirm
        let g = unsafe { crossbeam_epoch::unprotected() };
        let mut pending_delete = HashMap::new();
        let p_index = gs.primary_index().__raw_index();
        let m = gs;
        for DecodedBatchEvent { txn_id, pk, kind } in batch_state.events {
            match kind {
                DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
                    // this is more like a "newrow"
                    match p_index.mt_get_element(&pk, &g) {
                        Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
                            // skewed: the indexed row was restored from a later
                            // txn, so this event is stale
                            // resolve deltas if any
                            let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
                            continue;
                        }
                        Some(_) | None => {
                            // new row (logically)
                            let _ = p_index.mt_delete(&pk, &g);
                            let mut data = DcFieldIndex::default();
                            // zip decoded cells with the model's fields in stable
                            // order, skipping the primary key (never stored as a cell)
                            for (field_name, new_data) in m
                                .fields()
                                .stseq_ord_key()
                                .filter(|key| key.as_str() != m.p_key())
                                .zip(new_row)
                            {
                                data.st_insert(
                                    unsafe {
                                        // UNSAFE(@ohsayan): model in scope, we're good
                                        field_name.clone()
                                    },
                                    new_data,
                                );
                            }
                            let row = Row::new_restored(
                                pk,
                                data,
                                DeltaVersion::__new(batch_md.schema_version),
                                DeltaVersion::__new(0),
                                txn_id,
                            );
                            // resolve any deltas
                            let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
                            // put it back in (lol); blame @ohsayan for this joke
                            p_index.mt_insert(row, &g);
                        }
                    }
                }
                DecodedBatchEventKind::Delete => {
                    // deletes are deferred to the end of the batch so that
                    // out-of-order events can't drop a row restored from a
                    // newer txn
                    match pending_delete.entry(pk) {
                        HMEntry::Occupied(mut existing_delete) => {
                            if *existing_delete.get() > txn_id {
                                // the existing delete "happened after" our delete, so it takes precedence
                                continue;
                            }
                            // the existing delete happened before our delete, so our delete takes precedence
                            // we have a newer delete for the same key
                            *existing_delete.get_mut() = txn_id;
                        }
                        HMEntry::Vacant(new) => {
                            // we never deleted this
                            new.insert(txn_id);
                        }
                    }
                }
            }
        }
        // apply pending deletes; as all our conflicts would have been resolved by now
        for (pk, txn_id) in pending_delete {
            match p_index.mt_get(&pk, &g) {
                Some(row) => {
                    if row.read().get_restored_txn_revised() > txn_id {
                        // our delete "happened before" this row was inserted
                        continue;
                    }
                    // yup, go ahead and chuck it
                    let _ = p_index.mt_delete(&pk, &g);
                }
                None => {
                    // since we never delete rows until here, this is impossible
                    unreachable!()
                }
            }
        }
        Ok(())
    }
}
/// Decode helpers for batch restore (primary keys, row cells) plus the error
/// coercion shims needed by the legacy r1 cell decoder
mod restore_impls {
    use {
        super::BatchMetadata,
        crate::engine::{
            core::index::PrimaryIndexKey,
            data::{cell::Datacell, tag::TagUnique},
            error::StorageError,
            storage::{
                common::{
                    interface::fs_traits::{FSInterface, FileInterface, FileInterfaceRead},
                    sdss::sdss_r1::{rw::TrackedReaderContext, FileSpecV1},
                },
                common_encoding::r1::{
                    obj::cell::{self, StorageCellTypeID},
                    DataSource,
                },
                v2::raw::spec::ModelDataBatchAofV1,
            },
            RuntimeResult,
        },
        std::mem::ManuallyDrop,
    };
    /// Primary key decode impl
    ///
    /// NB: We really need to make this generic, but for now we can settle for this
    pub fn decode_primary_key<Fs: FSInterface, S: FileSpecV1>(
        f: &mut TrackedReaderContext<<<Fs as FSInterface>::File as FileInterface>::BufReader, S>,
        pk_type: TagUnique,
    ) -> RuntimeResult<PrimaryIndexKey> {
        Ok(match pk_type {
            TagUnique::SignedInt | TagUnique::UnsignedInt => {
                // integer keys are a single 8B LE word
                let qw = u64::from_le_bytes(f.read_block()?);
                unsafe {
                    // UNSAFE(@ohsayan): +tagck
                    PrimaryIndexKey::new_from_qw(pk_type, qw)
                }
            }
            TagUnique::Str | TagUnique::Bin => {
                // variable-length keys: [len: 8B LE][payload]
                // NOTE(review): `len` comes straight off disk; a corrupted file
                // can request a huge allocation here — consider sanity-capping
                let len = u64::from_le_bytes(f.read_block()?);
                let mut data = vec![0; len as usize];
                f.read(&mut data)?;
                if pk_type == TagUnique::Str {
                    // a Str key must be valid UTF-8; reject the entry otherwise
                    if core::str::from_utf8(&data).is_err() {
                        return Err(StorageError::DataBatchRestoreCorruptedEntry.into());
                    }
                }
                unsafe {
                    // UNSAFE(@ohsayan): +tagck +verityck
                    // NOTE(review): `ManuallyDrop` hands the buffer's ownership
                    // over to the new key (avoiding a double free); presumes
                    // `new_from_dual` takes ownership of the allocation — confirm
                    let mut md = ManuallyDrop::new(data);
                    PrimaryIndexKey::new_from_dual(pk_type, len, md.as_mut_ptr() as usize)
                }
            }
            _ => unsafe {
                // UNSAFE(@ohsayan): TagUnique::try_from_raw rejects a construction with Invalid as the dscr
                impossible!()
            },
        })
    }
    /// Decode `column_count` cells for one row; each cell is
    /// `[type dscr: 1B][payload]`
    pub fn decode_row_data<Fs: FSInterface>(
        batch_info: &BatchMetadata,
        f: &mut TrackedReaderContext<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            ModelDataBatchAofV1,
        >,
    ) -> Result<Vec<Datacell>, crate::engine::fractal::error::Error> {
        let mut row = vec![];
        let mut this_col_cnt = batch_info.column_count;
        while this_col_cnt != 0 {
            let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else {
                return Err(StorageError::DataBatchRestoreCorruptedEntry.into());
            };
            let cell = unsafe { cell::decode_element::<Datacell, _>(f, dscr) }.map_err(|e| e.0)?;
            row.push(cell);
            this_col_cnt -= 1;
        }
        Ok(row)
    }
    /*
        this is some silly ridiculous hackery because of some of our legacy code. basically an attempt is made to directly coerce error types.
        we'll make this super generic so that no more of this madness is needed
    */
    /// Wrapper that lets `?` coerce both `()` and fractal errors into one type
    pub struct ErrorHack(crate::engine::fractal::error::Error);
    impl From<crate::engine::fractal::error::Error> for ErrorHack {
        fn from(value: crate::engine::fractal::error::Error) -> Self {
            Self(value)
        }
    }
    impl From<()> for ErrorHack {
        // the legacy decode path signals failure with `()`; map it to a
        // corrupted-entry error
        fn from(_: ()) -> Self {
            Self(StorageError::DataBatchRestoreCorruptedEntry.into())
        }
    }
    impl<'a, F: FileInterfaceRead> DataSource for TrackedReaderContext<'a, F, ModelDataBatchAofV1> {
        // disk data may be short or corrupted, so callers must bounds-check
        const RELIABLE_SOURCE: bool = false;
        type Error = ErrorHack;
        fn has_remaining(&self, cnt: usize) -> bool {
            self.remaining() >= cnt as u64
        }
        unsafe fn read_next_byte(&mut self) -> Result<u8, Self::Error> {
            Ok(self.read_next_block::<1>()?[0])
        }
        unsafe fn read_next_block<const N: usize>(&mut self) -> Result<[u8; N], Self::Error> {
            Ok(self.read_block()?)
        }
        unsafe fn read_next_u64_le(&mut self) -> Result<u64, Self::Error> {
            self.read_next_block().map(u64::from_le_bytes)
        }
        unsafe fn read_next_variable_block(&mut self, size: usize) -> Result<Vec<u8>, Self::Error> {
            let mut buf = vec![0; size];
            self.read(&mut buf)?;
            Ok(buf)
        }
    }
}

@ -24,5 +24,5 @@
*
*/
mod gns_log;
mod mdl_journal;
pub mod gns_log;
pub mod mdl_journal;

@ -24,5 +24,120 @@
*
*/
mod impls;
pub(in crate::engine::storage) mod raw;
use {
super::{
common::interface::{fs_imp::LocalFS, fs_traits::FSInterface},
v1, SELoaded,
},
crate::engine::{
config::Configuration,
core::{system_db::SystemDatabase, GlobalNS},
fractal::{context, ModelDrivers, ModelUniqueID},
storage::common::paths_v1,
txn::{
gns::{model::CreateModelTxn, space::CreateSpaceTxn, sysctl::CreateUserTxn},
SpaceIDRef,
},
RuntimeResult,
},
impls::mdl_journal::ModelDriver,
};
pub(super) mod impls;
pub(super) mod raw;
pub const GNS_PATH: &str = v1::GNS_PATH;
pub const DATA_DIR: &str = v1::DATA_DIR;
/// Recreate the v2 on-disk layout (GNS journal, space/model directories and
/// per-model data journals) from an already-loaded in-memory `gns`.
/// NOTE: model *data* is not flushed here yet (see FIXME below)
pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
    let model_drivers = ModelDrivers::empty();
    context::set_dmsg("creating gns");
    let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?;
    // create all spaces
    context::set_dmsg("creating all spaces");
    for (space_name, space) in gns.idx().read().iter() {
        // the space directory must exist before its events are committed
        LocalFS::fs_create_dir_all(&paths_v1::space_dir(space_name, space.get_uuid()))?;
        gns_driver.commit_event(CreateSpaceTxn::new(space.props(), &space_name, space))?;
    }
    // create all models
    context::set_dmsg("creating all models");
    for (model_id, model) in gns.idx_models().read().iter() {
        let space_uuid = gns.idx().read().get(model_id.space()).unwrap().get_uuid();
        LocalFS::fs_create_dir_all(&paths_v1::model_dir(
            model_id.space(),
            space_uuid,
            model_id.entity(),
            model.get_uuid(),
        ))?;
        let model_driver = ModelDriver::create_model_driver(&paths_v1::model_path(
            model_id.space(),
            space_uuid,
            model_id.entity(),
            model.get_uuid(),
        ))?;
        gns_driver.commit_event(CreateModelTxn::new(
            SpaceIDRef::with_uuid(model_id.space(), space_uuid),
            model_id.entity(),
            model,
        ))?;
        model_drivers.add_driver(
            ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()),
            model_driver,
        );
    }
    // FIXME(@ohsayan): write all model data
    Ok(SELoaded {
        gns,
        gns_driver,
        model_drivers,
    })
}
/// Bootstrap a brand-new v2 instance: create the data directory, a fresh GNS
/// journal, and the root account (its key is stored hashed via `rcrypt`)
pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
    LocalFS::fs_create_dir_all(DATA_DIR)?;
    let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?;
    let gns = GlobalNS::empty();
    // NOTE(review): the `unwrap` assumes hashing can't fail with DEFAULT_COST
    let password_hash = rcrypt::hash(&config.auth.root_key, rcrypt::DEFAULT_COST).unwrap();
    // now go ahead and initialize our root user
    gns_driver.commit_event(CreateUserTxn::new(
        SystemDatabase::ROOT_ACCOUNT,
        &password_hash,
    ))?;
    // mirror the committed event in the in-memory system database; insertion
    // can't collide since the namespace is empty
    assert!(gns.sys_db().__insert_user(
        SystemDatabase::ROOT_ACCOUNT.to_owned().into_boxed_str(),
        password_hash.into_boxed_slice(),
    ));
    Ok(SELoaded {
        gns,
        gns_driver,
        model_drivers: ModelDrivers::empty(),
    })
}
/// Restore a v2 instance from disk: open the GNS journal against a fresh
/// `GlobalNS`, then open every model's data journal
pub fn restore() -> RuntimeResult<SELoaded> {
    let gns = GlobalNS::empty();
    context::set_dmsg("loading gns");
    let gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?;
    let model_drivers = ModelDrivers::empty();
    for (id, model) in gns.idx_models().write().iter_mut() {
        let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid();
        let model_data_file_path =
            paths_v1::model_path(id.space(), space_uuid, id.entity(), model.get_uuid());
        // dmsg gives the operator context if a specific model fails to load
        context::set_dmsg(format!("loading model driver in {model_data_file_path}"));
        let model_driver =
            impls::mdl_journal::ModelDriver::open_model_driver(model, &model_data_file_path)?;
        model_drivers.add_driver(
            ModelUniqueID::new(id.space(), id.entity(), model.get_uuid()),
            model_driver,
        );
        unsafe {
            // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
            model.model_mutator().vacuum_stashed();
        }
    }
    Ok(SELoaded {
        gns,
        gns_driver,
        model_drivers,
    })
}

@ -27,7 +27,7 @@
#![allow(dead_code)]
use {
self::raw::{CommitPreference, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter},
self::raw::{CommitPreference, RawJournalAdapterEvent, RawJournalWriter},
crate::{
engine::{
error::StorageError,
@ -49,7 +49,9 @@ use {
mod raw;
#[cfg(test)]
mod tests;
pub use raw::RawJournalAdapterEvent as JournalAdapterEvent;
pub use raw::{
create_journal, open_journal, RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent,
};
/*
implementation of a blanket event log
@ -65,31 +67,6 @@ pub use raw::RawJournalAdapterEvent as JournalAdapterEvent;
pub type EventLogDriver<EL, Fs> = RawJournalWriter<EventLogAdapter<EL>, Fs>;
/// The event log adapter
pub struct EventLogAdapter<EL: EventLogSpec>(PhantomData<EL>);
impl<EL: EventLogSpec> EventLogAdapter<EL> {
/// Open a new event log
pub fn open<Fs: FSInterface>(
name: &str,
gs: &EL::GlobalState,
) -> RuntimeResult<EventLogDriver<EL, Fs>>
where
EL::Spec: FileSpecV1<DecodeArgs = ()>,
{
raw::open_journal::<EventLogAdapter<EL>, Fs>(name, gs)
}
/// Create a new event log
pub fn create<Fs: FSInterface>(name: &str) -> RuntimeResult<EventLogDriver<EL, Fs>>
where
EL::Spec: FileSpecV1<EncodeArgs = ()>,
{
raw::create_journal::<EventLogAdapter<EL>, Fs>(name)
}
/// Close an event log
pub fn close<Fs: FSInterface>(me: &mut EventLogDriver<EL, Fs>) -> RuntimeResult<()> {
RawJournalWriter::close_driver(me)
}
}
type DispatchFn<G> = fn(&G, Vec<u8>) -> RuntimeResult<()>;
/// Specification for an event log
@ -219,14 +196,23 @@ impl<BA: BatchAdapterSpec> BatchAdapter<BA> {
/// NB: This trait's impl is fairly complex and is going to require careful handling to get it right. Also, the event has to have
/// a specific on-disk layout: `[EXPECTED COMMIT][ANY ADDITIONAL METADATA][BATCH BODY][ACTUAL COMMIT]`
pub trait BatchAdapterSpec {
/// the SDSS spec for this journal
type Spec: FileSpecV1;
/// global state used for syncing events
type GlobalState;
/// batch type tag
type BatchType: TaggedEnum<Dscr = u8>;
/// event type tag (event in batch)
type EventType: TaggedEnum<Dscr = u8> + PartialEq;
/// custom batch metadata
type BatchMetadata;
/// batch state
type BatchState;
/// return true if the given event tag indicates an early exit
fn is_early_exit(event_type: &Self::EventType) -> bool;
/// initialize the batch state
fn initialize_batch_state(gs: &Self::GlobalState) -> Self::BatchState;
/// decode batch start metadata
fn decode_batch_metadata<Fs: FSInterface>(
gs: &Self::GlobalState,
f: &mut TrackedReaderContext<
@ -235,6 +221,7 @@ pub trait BatchAdapterSpec {
>,
meta: Self::BatchType,
) -> RuntimeResult<Self::BatchMetadata>;
/// decode new event and update state. if called, it is guaranteed that the event is not an early exit
fn update_state_for_new_event<Fs: FSInterface>(
gs: &Self::GlobalState,
bs: &mut Self::BatchState,
@ -245,7 +232,12 @@ pub trait BatchAdapterSpec {
batch_info: &Self::BatchMetadata,
event_type: Self::EventType,
) -> RuntimeResult<()>;
fn finish(bs: Self::BatchState, gs: &Self::GlobalState) -> RuntimeResult<()>;
/// finish applying all changes to the global state
fn finish(
batch_state: Self::BatchState,
batch_meta: Self::BatchMetadata,
gs: &Self::GlobalState,
) -> RuntimeResult<()>;
}
impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
@ -317,7 +309,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
let _stored_actual_commit_size = u64::from_le_bytes(f.read_block()?);
if _stored_actual_commit_size == real_commit_size {
// finish applying batch
BA::finish(batch_state, gs)?;
BA::finish(batch_state, batch_md, gs)?;
} else {
return Err(StorageError::RawJournalCorrupted.into());
}

@ -46,7 +46,7 @@ use {
sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
},
v2::raw::{
journal::raw::{create_journal, open_journal},
journal::raw::{create_journal, open_journal, RawJournalWriter},
spec::{ModelDataBatchAofV1, SystemDatabaseV1},
},
},
@ -197,30 +197,30 @@ fn test_this_data() {
for key in DATA1 {
db.push(&mut log, key).unwrap();
}
EventLogAdapter::close(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA1);
db.push(&mut log, DATA2[3]).unwrap();
EventLogAdapter::close(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA2);
db.pop(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA3);
db.push(&mut log, DATA4[3]).unwrap();
EventLogAdapter::close(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA4);
EventLogAdapter::close(&mut log).unwrap();
RawJournalWriter::close_driver(&mut log).unwrap();
}
}
@ -366,7 +366,11 @@ impl BatchAdapterSpec for BatchDBAdapter {
bs.pending_inserts.push(String::from_utf8(key).unwrap());
Ok(())
}
fn finish(bs: Self::BatchState, gs: &Self::GlobalState) -> RuntimeResult<()> {
fn finish(
bs: Self::BatchState,
_: Self::BatchMetadata,
gs: &Self::GlobalState,
) -> RuntimeResult<()> {
for event in bs.pending_inserts {
gs._mut().data.push(event);
gs._mut().last_flushed_at += 1;

@ -32,6 +32,7 @@ macro_rules! impl_gns_event {
pub mod model;
pub mod space;
pub mod sysctl;
#[derive(Debug, PartialEq, Clone, Copy, sky_macros::TaggedEnum)]
#[repr(u8)]
@ -44,6 +45,9 @@ pub enum GNSTransactionCode {
AlterModelRemove = 5,
AlterModelUpdate = 6,
DropModel = 7,
CreateUser = 8,
AlterUser = 9,
DropUser = 10,
}
pub trait GNSTransaction {

@ -0,0 +1,83 @@
/*
* Created on Wed Feb 21 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/// GNS transaction payload: create a new user account
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct CreateUserTxn<'a> {
    username: &'a str,
    password_hash: &'a [u8],
}

impl<'a> CreateUserTxn<'a> {
    /// Build the event from an account name and its already-hashed password
    pub fn new(username: &'a str, password_hash: &'a [u8]) -> Self {
        CreateUserTxn {
            password_hash,
            username,
        }
    }
    /// Name of the account being created
    pub fn username(&self) -> &str {
        self.username
    }
    /// Hashed password bytes for the new account
    pub fn password_hash(&self) -> &[u8] {
        self.password_hash
    }
}
/// GNS transaction payload: change an existing user's password hash
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct AlterUserTxn<'a> {
    username: &'a str,
    password_hash: &'a [u8],
}

impl<'a> AlterUserTxn<'a> {
    /// Build the event from an account name and its new (already-hashed) password
    pub fn new(username: &'a str, password_hash: &'a [u8]) -> Self {
        AlterUserTxn {
            password_hash,
            username,
        }
    }
    /// Name of the account being altered
    pub fn username(&self) -> &str {
        self.username
    }
    /// New hashed password bytes for the account
    pub fn password_hash(&self) -> &[u8] {
        self.password_hash
    }
}
/// GNS transaction payload: remove a user account
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct DropUserTxn<'a> {
    username: &'a str,
}

impl<'a> DropUserTxn<'a> {
    /// Build the event from the name of the account to drop
    pub fn new(username: &'a str) -> Self {
        DropUserTxn { username }
    }
    /// Name of the account being dropped
    pub fn username(&self) -> &str {
        self.username
    }
}
impl_gns_event!(CreateUserTxn<'_> = CreateUser, AlterUserTxn<'_> = AlterUser, DropUserTxn<'_> = DropUser);

@ -36,11 +36,11 @@ pub struct SpaceIDRef<'a> {
}
impl<'a> SpaceIDRef<'a> {
pub fn with_uuid(name: &'a str, uuid: Uuid) -> Self {
Self { uuid, name }
}
pub fn new(name: &'a str, space: &Space) -> Self {
Self {
uuid: space.get_uuid(),
name,
}
Self::with_uuid(name, space.get_uuid())
}
pub fn name(&self) -> &str {
self.name

@ -424,3 +424,11 @@ macro_rules! impl_endian {
}
impl_endian!(u8, i8, u16, i16, u32, i32, u64, i64, usize, isize);
/// Returns the current local time formatted as `YYYYMMDD_HHMMSS`, followed by
/// `-` and the given postfix (e.g. `20240221_153000-backup`)
pub fn time_now_with_postfix(post_fix: &str) -> String {
    // `chrono`'s delayed-format value implements `Display`, so it can be
    // rendered straight into the final string without an intermediate `String`
    format!("{}-{}", chrono::Local::now().format("%Y%m%d_%H%M%S"), post_fix)
}

@ -477,6 +477,33 @@ mod hostname_impl {
}
}
/// Move every file under `src` into `dst`, mirroring the directory tree.
/// Destination directories are created as needed; source directories are left
/// behind (only files are renamed away).
/// NOTE(review): `fs::rename` fails across filesystem boundaries — confirm
/// callers only move within one volume
pub fn move_files_recursively(src: &str, dst: &str) -> std::io::Result<()> {
    rmove(Path::new(src), Path::new(dst))
}
/// Recursive worker: ensure `dst` exists, rename regular files into it, and
/// recurse into subdirectories
fn rmove(src: &Path, dst: &Path) -> std::io::Result<()> {
    if !dst.exists() {
        fs::create_dir_all(dst)?;
    }
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let from = entry.path();
        let to = dst.join(entry.file_name());
        if from.is_dir() {
            rmove(&from, &to)?;
        } else if from.is_file() {
            fs::rename(&from, &to)?;
        }
        // anything else (e.g. a dangling symlink) is silently skipped, matching
        // the original behavior
    }
    Ok(())
}
#[test]
fn rcopy_okay() {
let dir_paths = [

Loading…
Cancel
Save