Sync system stores when config changes

next
Sayan Nandan 10 months ago
parent d6eeb5cdf6
commit 0fed52fe8a
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -0,0 +1,61 @@
/*
* Created on Fri Nov 10 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::{
data::{tag::TagClass, DictEntryGeneric},
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
net::protocol::ClientLocalState,
ql::dcl::{UserAdd, UserDel},
};
const KEY_PASSWORD: &str = "password";
pub fn create_user(global: &impl GlobalInstanceLike, mut user_add: UserAdd<'_>) -> QueryResult<()> {
let username = user_add.username().to_owned();
let password = match user_add.options_mut().remove(KEY_PASSWORD) {
Some(DictEntryGeneric::Data(d))
if d.kind() == TagClass::Str && user_add.options().is_empty() =>
unsafe { d.into_str().unwrap_unchecked() },
None | Some(_) => {
// invalid properties
return Err(QueryError::QExecDdlInvalidProperties);
}
};
global.sys_store().create_new_user(username, password)
}
pub fn drop_user(
global: &impl GlobalInstanceLike,
cstate: &ClientLocalState,
user_del: UserDel<'_>,
) -> QueryResult<()> {
if cstate.username() == user_del.username() {
// you can't delete yourself!
return Err(QueryError::SysAuthError);
}
global.sys_store().drop_user(user_del.username())
}

@ -29,7 +29,7 @@ use {
core::{dml, model::Model, space::Space},
error::{QueryError, QueryResult},
fractal::Global,
net::protocol::{Response, SQuery},
net::protocol::{ClientLocalState, Response, SQuery},
ql::{
ast::{traits::ASTNode, InplaceData, State},
lex::{Keyword, KeywordStmt, Token},
@ -38,8 +38,9 @@ use {
core::ops::Deref,
};
pub async fn dispatch_to_executor<'a, 'b>(
global: &'b Global,
pub async fn dispatch_to_executor<'a>(
global: &Global,
cstate: &ClientLocalState,
query: SQuery<'a>,
) -> QueryResult<Response> {
let tokens =
@ -52,9 +53,9 @@ pub async fn dispatch_to_executor<'a, 'b>(
};
state.cursor_ahead();
if stmt.is_blocking() {
run_blocking_stmt(state, stmt, global).await
run_blocking_stmt(global, cstate, state, stmt).await
} else {
run_nb(global, state, stmt)
run_nb(global, cstate, state, stmt)
}
}
@ -114,10 +115,15 @@ fn _call<A: ASTNode<'static> + core::fmt::Debug, T>(
}
async fn run_blocking_stmt(
global: &Global,
cstate: &ClientLocalState,
mut state: State<'_, InplaceData>,
stmt: KeywordStmt,
global: &Global,
) -> Result<Response, QueryError> {
if !cstate.is_root() {
// all the actions here need root permission
return Err(QueryError::SysPermissionDenied);
}
let (a, b) = (&state.current()[0], &state.current()[1]);
let sysctl = stmt == KeywordStmt::Sysctl;
let create = stmt == KeywordStmt::Create;
@ -132,24 +138,26 @@ async fn run_blocking_stmt(
let d_m = (drop & Token![model].eq(a) & last_id) as u8 * 7;
let fc = sysctl as u8 | c_s | c_m | a_s | a_m | d_s | d_m;
state.cursor_ahead();
static BLK_EXEC: [fn(Global, RawSlice<Token<'static>>) -> QueryResult<()>; 8] = [
|_, _| Err(QueryError::QLUnknownStatement), // unknown
blocking_exec_sysctl, // sysctl
|g, t| call(g, t, Space::transactional_exec_create),
|g, t| call(g, t, Model::transactional_exec_create),
|g, t| call(g, t, Space::transactional_exec_alter),
|g, t| call(g, t, Model::transactional_exec_alter),
|g, t| call(g, t, Space::transactional_exec_drop),
|g, t| call(g, t, Model::transactional_exec_drop),
static BLK_EXEC: [fn(Global, &ClientLocalState, RawSlice<Token<'static>>) -> QueryResult<()>;
8] = [
|_, _, _| Err(QueryError::QLUnknownStatement), // unknown
blocking_exec_sysctl, // sysctl
|g, _, t| call(g, t, Space::transactional_exec_create),
|g, _, t| call(g, t, Model::transactional_exec_create),
|g, _, t| call(g, t, Space::transactional_exec_alter),
|g, _, t| call(g, t, Model::transactional_exec_alter),
|g, _, t| call(g, t, Space::transactional_exec_drop),
|g, _, t| call(g, t, Model::transactional_exec_drop),
];
let r = unsafe {
// UNSAFE(@ohsayan): the only await is within this block
let c_glob = global.clone();
let ptr = state.current().as_ptr() as usize;
let len = state.current().len();
let cstate: &'static ClientLocalState = core::mem::transmute(cstate);
tokio::task::spawn_blocking(move || {
let tokens = RawSlice::new(ptr as *const Token, len);
BLK_EXEC[fc as usize](c_glob, tokens)?;
BLK_EXEC[fc as usize](c_glob, cstate, tokens)?;
Ok(Response::Empty)
})
.await
@ -157,8 +165,30 @@ async fn run_blocking_stmt(
r.unwrap()
}
fn blocking_exec_sysctl(_: Global, _: RawSlice<Token<'static>>) -> QueryResult<()> {
todo!()
fn blocking_exec_sysctl(
g: Global,
cstate: &ClientLocalState,
tokens: RawSlice<Token<'static>>,
) -> QueryResult<()> {
let mut state = State::new_inplace(&tokens);
/*
currently supported: sysctl create user, sysctl drop user
*/
if state.remaining() != 2 {
return Err(QueryError::QLInvalidSyntax);
}
let (a, b) = (state.fw_read(), state.fw_read());
match (a, b) {
(Token![create], Token::Ident(id)) if id.eq_ignore_ascii_case("user") => {
let useradd = ASTNode::from_state(&mut state)?;
super::dcl::create_user(&g, useradd)
}
(Token![drop], Token::Ident(id)) if id.eq_ignore_ascii_case("user") => {
let userdel = ASTNode::from_state(&mut state)?;
super::dcl::drop_user(&g, cstate, userdel)
}
_ => Err(QueryError::QLUnknownStatement),
}
}
/*
@ -167,6 +197,7 @@ fn blocking_exec_sysctl(_: Global, _: RawSlice<Token<'static>>) -> QueryResult<(
fn run_nb(
global: &Global,
_cstate: &ClientLocalState,
state: State<'_, InplaceData>,
stmt: KeywordStmt,
) -> QueryResult<Response> {

@ -110,7 +110,7 @@ impl Row {
data,
schema_version,
txn_revised_data,
DeltaVersion::__new(0),
DeltaVersion::genesis(),
)
}
pub fn new_restored(

@ -24,22 +24,24 @@
*
*/
use self::dml::QueryExecMeta;
pub use self::util::{EntityID, EntityIDRef};
use super::{fractal::GlobalInstanceLike, ql::ast::Entity};
pub(in crate::engine) mod dcl;
pub(in crate::engine) mod dml;
pub mod exec;
pub(in crate::engine) mod exec;
pub(in crate::engine) mod index;
pub(in crate::engine) mod model;
pub(in crate::engine) mod query_meta;
pub mod space;
pub(in crate::engine) mod space;
// util
mod util;
// test
#[cfg(test)]
pub(super) mod tests;
// re-exports
pub use self::util::{EntityID, EntityIDRef};
// imports
use {
self::model::Model,
self::{dml::QueryExecMeta, model::Model},
super::{fractal::GlobalInstanceLike, ql::ast::Entity},
crate::engine::{
core::space::Space,
error::{QueryError, QueryResult},

@ -24,18 +24,13 @@
*
*/
#![allow(unused)]
use {
super::{Fields, Model},
crate::{
engine::{
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
sync::atm::Guard,
sync::queue::Queue,
},
util::compiler,
crate::engine::{
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
sync::atm::Guard,
sync::queue::Queue,
},
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
@ -70,8 +65,8 @@ impl PartialEq for ISyncMatrix {
#[derive(Debug)]
/// Read model, write new data
pub struct IRModelSMData<'a> {
rmodel: RwLockReadGuard<'a, ()>,
mdata: RwLockReadGuard<'a, ()>,
_rmodel: RwLockReadGuard<'a, ()>,
_mdata: RwLockReadGuard<'a, ()>,
fields: &'a Fields,
}
@ -80,8 +75,8 @@ impl<'a> IRModelSMData<'a> {
let rmodel = m.sync_matrix().v_priv_model_alter.read();
let mdata = m.sync_matrix().v_priv_data_new_or_revise.read();
Self {
rmodel,
mdata,
_rmodel: rmodel,
_mdata: mdata,
fields: unsafe {
// UNSAFE(@ohsayan): we already have acquired this resource
m._read_fields()
@ -96,14 +91,14 @@ impl<'a> IRModelSMData<'a> {
#[derive(Debug)]
/// Read model
pub struct IRModel<'a> {
rmodel: RwLockReadGuard<'a, ()>,
_rmodel: RwLockReadGuard<'a, ()>,
fields: &'a Fields,
}
impl<'a> IRModel<'a> {
pub fn new(m: &'a Model) -> Self {
Self {
rmodel: m.sync_matrix().v_priv_model_alter.read(),
_rmodel: m.sync_matrix().v_priv_model_alter.read(),
fields: unsafe {
// UNSAFE(@ohsayan): we already have acquired this resource
m._read_fields()
@ -118,14 +113,14 @@ impl<'a> IRModel<'a> {
#[derive(Debug)]
/// Write model
pub struct IWModel<'a> {
wmodel: RwLockWriteGuard<'a, ()>,
_wmodel: RwLockWriteGuard<'a, ()>,
fields: &'a mut Fields,
}
impl<'a> IWModel<'a> {
pub fn new(m: &'a Model) -> Self {
Self {
wmodel: m.sync_matrix().v_priv_model_alter.write(),
_wmodel: m.sync_matrix().v_priv_model_alter.write(),
fields: unsafe {
// UNSAFE(@ohsayan): we have exclusive access to this resource
m._read_fields_mut()
@ -211,9 +206,6 @@ impl DeltaState {
pub fn create_new_data_delta_version(&self) -> DeltaVersion {
DeltaVersion(self.__data_delta_step())
}
pub fn get_data_delta_size(&self) -> usize {
self.data_deltas_size.load(Ordering::Acquire)
}
}
impl DeltaState {
@ -356,7 +348,7 @@ impl<'a> SchemaDeltaIndexRGuard<'a> {
#[derive(Debug, Clone)]
pub struct DataDelta {
schema_version: DeltaVersion,
_schema_version: DeltaVersion,
data_version: DeltaVersion,
row: Row,
change: DataDeltaKind,
@ -370,15 +362,12 @@ impl DataDelta {
change: DataDeltaKind,
) -> Self {
Self {
schema_version,
_schema_version: schema_version,
data_version,
row,
change,
}
}
pub fn schema_version(&self) -> DeltaVersion {
self.schema_version
}
pub fn data_version(&self) -> DeltaVersion {
self.data_version
}

@ -220,6 +220,21 @@ impl Datacell {
pub fn str(&self) -> &str {
self.try_str().unwrap()
}
pub fn into_str(self) -> Option<String> {
if self.kind() != TagClass::Str {
return None;
}
unsafe {
// UNSAFE(@ohsayan): no double free + tagck
let md = ManuallyDrop::new(self);
let (a, b) = md.data.word.dwordqn_load_qw_nw();
Some(String::from_raw_parts(
b as *const u8 as *mut u8,
a as usize,
a as usize,
))
}
}
// list
pub fn new_list(l: Vec<Self>) -> Self {
unsafe {

@ -45,6 +45,8 @@ pub enum QueryError {
SysAuthError = 3,
/// transactional error
SysTransactionalError = 4,
/// insufficient permissions error
SysPermissionDenied = 5,
// exchange
NetworkSubsystemCorruptedPacket = 24,
// QL

@ -25,6 +25,7 @@
*/
use {
self::sys_store::SystemStore,
super::{
core::{dml::QueryExecMeta, model::Model, GlobalNS},
data::uuid::Uuid,
@ -40,11 +41,11 @@ use {
tokio::sync::mpsc::unbounded_channel,
};
pub mod config;
pub mod context;
mod drivers;
pub mod error;
mod mgr;
pub mod sys_store;
#[cfg(test)]
pub mod test_utils;
mod util;
@ -74,7 +75,7 @@ pub struct GlobalStateStart {
/// Must be called iff this is the only thread calling it
pub unsafe fn load_and_enable_all(
gns: GlobalNS,
config: config::SysConfig,
config: SystemStore<LocalFS>,
gns_driver: GNSTransactionDriverAnyFS<LocalFS>,
model_drivers: ModelDrivers<LocalFS>,
) -> GlobalStateStart {
@ -145,7 +146,7 @@ pub trait GlobalInstanceLike {
}
}
// config handle
fn sys_cfg(&self) -> &config::SysConfig;
fn sys_store(&self) -> &SystemStore<Self::FileSystem>;
}
impl GlobalInstanceLike for Global {
@ -169,7 +170,7 @@ impl GlobalInstanceLike for Global {
self._get_max_delta_size()
}
// sys
fn sys_cfg(&self) -> &config::SysConfig {
fn sys_store(&self) -> &SystemStore<Self::FileSystem> {
&self.get_state().config
}
// model
@ -264,7 +265,7 @@ struct GlobalState {
gns_driver: drivers::FractalGNSDriver<LocalFS>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
task_mgr: mgr::FractalMgr,
config: config::SysConfig,
config: SystemStore<LocalFS>,
}
impl GlobalState {
@ -273,7 +274,7 @@ impl GlobalState {
gns_driver: drivers::FractalGNSDriver<LocalFS>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
task_mgr: mgr::FractalMgr,
config: config::SysConfig,
config: SystemStore<LocalFS>,
) -> Self {
Self {
gns,

@ -24,17 +24,31 @@
*
*/
use crate::engine::config::ConfigAuth;
use {
crate::engine::{
config::ConfigMode,
config::{ConfigAuth, ConfigMode},
error::{QueryError, QueryResult},
storage::v1::RawFSInterface,
},
parking_lot::RwLock,
std::collections::{hash_map::Entry, HashMap},
std::{
collections::{hash_map::Entry, HashMap},
marker::PhantomData,
},
};
#[derive(Debug)]
pub struct SystemStore<Fs> {
syscfg: SysConfig,
_fs: PhantomData<Fs>,
}
impl<Fs> SystemStore<Fs> {
pub fn system_store(&self) -> &SysConfig {
&self.syscfg
}
}
#[derive(Debug)]
/// The global system configuration
pub struct SysConfig {
@ -63,10 +77,10 @@ impl SysConfig {
pub fn new_full(new_auth: ConfigAuth, host_data: SysHostData, run_mode: ConfigMode) -> Self {
Self::new(
RwLock::new(SysAuth::new(
rcrypt::hash(new_auth.root_key, rcrypt::DEFAULT_COST)
into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new(
rcrypt::hash(new_auth.root_key.as_str(), rcrypt::DEFAULT_COST)
.unwrap()
.into_boxed_slice(),
Default::default(),
.into_boxed_slice())),
)),
host_data,
run_mode,
@ -80,10 +94,10 @@ impl SysConfig {
pub(super) fn test_default() -> Self {
Self {
auth_data: RwLock::new(SysAuth::new(
into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new(
rcrypt::hash("password12345678", rcrypt::DEFAULT_COST)
.unwrap()
.into_boxed_slice(),
Default::default(),
.into_boxed_slice())),
)),
host_data: SysHostData::new(0, 0),
run_mode: ConfigMode::Dev,
@ -133,6 +147,51 @@ impl SysHostData {
}
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
pub fn _new(syscfg: SysConfig) -> Self {
Self {
syscfg,
_fs: PhantomData,
}
}
/// Create a new user with the given details
pub fn create_new_user(&self, username: String, password: String) -> QueryResult<()> {
// TODO(@ohsayan): we want to be very careful with this
let _username = username.clone();
let mut auth = self.system_store().auth_data().write();
match auth.users.entry(username.into()) {
Entry::Vacant(ve) => {
ve.insert(SysAuthUser::new(
rcrypt::hash(password, rcrypt::DEFAULT_COST)
.unwrap()
.into_boxed_slice(),
));
self.sync_db_or_rollback(|| {
auth.users.remove(_username.as_str());
})?;
Ok(())
}
Entry::Occupied(_) => Err(QueryError::SysAuthError),
}
}
pub fn drop_user(&self, username: &str) -> QueryResult<()> {
let mut auth = self.system_store().auth_data().write();
if username == SysAuthUser::USER_ROOT {
// you can't remove root!
return Err(QueryError::SysAuthError);
}
match auth.users.remove_entry(username) {
Some((username, user)) => {
self.sync_db_or_rollback(|| {
let _ = auth.users.insert(username, user);
})?;
Ok(())
}
None => Err(QueryError::SysAuthError),
}
}
}
/*
auth
*/
@ -140,28 +199,24 @@ impl SysHostData {
#[derive(Debug, PartialEq)]
/// The auth data section (system.auth)
pub struct SysAuth {
root_key: Box<[u8]>,
users: HashMap<Box<str>, SysAuthUser>,
}
impl SysAuth {
/// New [`SysAuth`] with the given settings
pub fn new(root_key: Box<[u8]>, users: HashMap<Box<str>, SysAuthUser>) -> Self {
Self { root_key, users }
pub fn new(users: HashMap<Box<str>, SysAuthUser>) -> Self {
Self { users }
}
/// Create a new user with the given details
#[allow(unused)]
pub fn create_new_user(&mut self, username: &str, password: &str) -> QueryResult<()> {
match self.users.entry(username.into()) {
Entry::Vacant(ve) => {
ve.insert(SysAuthUser::new(
rcrypt::hash(password, rcrypt::DEFAULT_COST)
.unwrap()
.into_boxed_slice(),
));
Ok(())
pub fn verify_user_is_root<T: AsRef<[u8]> + ?Sized>(
&self,
username: &str,
password: &T,
) -> QueryResult<bool> {
match self.users.get(username) {
Some(user) if rcrypt::verify(password, user.key()).unwrap() => {
Ok(username == SysAuthUser::USER_ROOT)
}
Entry::Occupied(_) => Err(QueryError::SysAuthError),
Some(_) | None => Err(QueryError::SysAuthError),
}
}
/// Verify the user with the given details
@ -170,20 +225,7 @@ impl SysAuth {
username: &str,
password: &T,
) -> QueryResult<()> {
if username == "root" {
if rcrypt::verify(password, self.root_key()).unwrap() {
return Ok(());
} else {
return Err(QueryError::SysAuthError);
}
}
match self.users.get(username) {
Some(user) if rcrypt::verify(password, user.key()).unwrap() => Ok(()),
Some(_) | None => Err(QueryError::SysAuthError),
}
}
pub fn root_key(&self) -> &[u8] {
&self.root_key
self.verify_user_is_root(username, password).map(|_| ())
}
pub fn users(&self) -> &HashMap<Box<str>, SysAuthUser> {
&self.users
@ -197,6 +239,7 @@ pub struct SysAuthUser {
}
impl SysAuthUser {
pub const USER_ROOT: &'static str = "root";
/// Create a new [`SysAuthUser`]
pub fn new(key: Box<[u8]>) -> Self {
Self { key }

@ -26,8 +26,8 @@
use {
super::{
config::SysConfig, CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike,
ModelUniqueID, Task,
sys_store::{SysConfig, SystemStore},
CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID, Task,
},
crate::engine::{
core::GlobalNS,
@ -54,7 +54,7 @@ pub struct TestGlobal<Fs: RawFSInterface = VirtualFS> {
max_delta_size: usize,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
model_drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>>,
sys_cfg: SysConfig,
sys_cfg: SystemStore<Fs>,
}
impl<Fs: RawFSInterface> TestGlobal<Fs> {
@ -70,7 +70,7 @@ impl<Fs: RawFSInterface> TestGlobal<Fs> {
max_delta_size,
txn_driver: Mutex::new(txn_driver),
model_drivers: RwLock::default(),
sys_cfg: SysConfig::test_default(),
sys_cfg: SystemStore::_new(SysConfig::test_default()),
}
}
}
@ -117,7 +117,7 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
fn get_max_delta_size(&self) -> usize {
100
}
fn sys_cfg(&self) -> &super::config::SysConfig {
fn sys_store(&self) -> &SystemStore<Fs> {
&self.sys_cfg
}
fn initialize_model_driver(

@ -47,10 +47,12 @@ pub use error::RuntimeResult;
use {
self::{
config::{ConfigEndpoint, ConfigEndpointTls, ConfigMode, ConfigReturn, Configuration},
fractal::context::{self, Subsystem},
fractal::{
context::{self, Subsystem},
sys_store::SystemStore,
},
storage::v1::{
loader::{self, SEInitState},
sysdb::{self, SystemStoreInit},
LocalFS,
},
},
@ -83,8 +85,7 @@ pub fn load_all() -> RuntimeResult<(Configuration, fractal::GlobalStateStart)> {
// restore system database
info!("loading system database ...");
context::set_dmsg("loading system database");
let SystemStoreInit { store, state } =
sysdb::open_system_database::<LocalFS>(config.auth.clone(), config.mode)?;
let (store, state) = SystemStore::<LocalFS>::open_or_restore(config.auth.clone(), config.mode)?;
let sysdb_is_new = state.is_created();
if state.is_existing_updated_root() {
warn!("the root account was updated");

@ -51,6 +51,25 @@ use {
tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter},
};
#[derive(Debug, PartialEq)]
pub struct ClientLocalState {
username: Box<str>,
root: bool,
hs: handshake::CHandshakeStatic,
}
impl ClientLocalState {
pub fn new(username: Box<str>, root: bool, hs: handshake::CHandshakeStatic) -> Self {
Self { username, root, hs }
}
pub fn is_root(&self) -> bool {
self.root
}
pub fn username(&self) -> &str {
&self.username
}
}
#[derive(Debug, PartialEq)]
pub enum Response {
Empty,
@ -63,7 +82,7 @@ pub(super) async fn query_loop<S: Socket>(
global: &Global,
) -> IoResult<QueryLoopResult> {
// handshake
let _ = match do_handshake(con, buf, global).await? {
let client_state = match do_handshake(con, buf, global).await? {
PostHandshake::Okay(hs) => hs,
PostHandshake::ConnectionClosedFin => return Ok(QueryLoopResult::Fin),
PostHandshake::ConnectionClosedRst => return Ok(QueryLoopResult::Rst),
@ -116,7 +135,7 @@ pub(super) async fn query_loop<S: Socket>(
}
};
// now execute query
match engine::core::exec::dispatch_to_executor(global, sq).await {
match engine::core::exec::dispatch_to_executor(global, &client_state, sq).await {
Ok(Response::Empty) => {
con.write_all(&[0x12]).await?;
}
@ -137,7 +156,7 @@ pub(super) async fn query_loop<S: Socket>(
#[derive(Debug, PartialEq)]
enum PostHandshake {
Okay(handshake::CHandshakeStatic),
Okay(ClientLocalState),
Error(ProtocolError),
ConnectionClosedFin,
ConnectionClosedRst,
@ -197,14 +216,20 @@ async fn do_handshake<S: Socket>(
}
match core::str::from_utf8(handshake.hs_auth().username()) {
Ok(uname) => {
let auth = global.sys_cfg().auth_data().read();
if auth
.verify_user(uname, handshake.hs_auth().password())
.is_ok()
{
let hs_static = handshake.hs_static();
buf.advance(cursor);
return Ok(PostHandshake::Okay(hs_static));
let auth = global.sys_store().system_store().auth_data().read();
let r = auth.verify_user_is_root(uname, handshake.hs_auth().password());
match r {
Ok(is_root) => {
let hs = handshake.hs_static();
let ret = Ok(PostHandshake::Okay(ClientLocalState::new(
uname.into(),
is_root,
hs,
)));
buf.advance(cursor);
return ret;
}
Err(_) => {}
}
}
Err(_) => {}

@ -95,6 +95,15 @@ impl<'a> UserAdd<'a> {
pub fn parse<Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResult<Self> {
parse(state).map(|UserMeta { username, options }: UserMeta| Self::new(username, options))
}
pub fn username(&self) -> &str {
self.username
}
pub fn options_mut(&mut self) -> &mut DictGeneric {
&mut self.options
}
pub fn options(&self) -> &DictGeneric {
&self.options
}
}
impl<'a> traits::ASTNode<'a> for UserAdd<'a> {
@ -131,6 +140,9 @@ impl<'a> UserDel<'a> {
}
Err(QueryError::QLInvalidSyntax)
}
pub fn username(&self) -> &str {
self.username
}
}
impl<'a> traits::ASTNode<'a> for UserDel<'a> {

@ -30,22 +30,13 @@ use {
config::{ConfigAuth, ConfigMode},
data::{cell::Datacell, DictEntryGeneric, DictGeneric},
error::{RuntimeResult, StorageError},
fractal::config::{SysAuth, SysAuthUser, SysConfig, SysHostData},
fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore},
storage::v1::{inf, spec, RawFSInterface, SDSSFileIO},
},
parking_lot::RwLock,
std::collections::HashMap,
};
const SYSDB_PATH: &str = "sys.db";
const SYSDB_COW_PATH: &str = "sys.db.cow";
const SYS_KEY_AUTH: &str = "auth";
const SYS_KEY_AUTH_ROOT: &str = "root";
const SYS_KEY_AUTH_USERS: &str = "users";
const SYS_KEY_SYS: &str = "sys";
const SYS_KEY_SYS_STARTUP_COUNTER: &str = "sc";
const SYS_KEY_SYS_SETTINGS_VERSION: &str = "sv";
#[derive(Debug, PartialEq)]
/// The system store init state
pub enum SystemStoreInitState {
@ -66,123 +57,6 @@ impl SystemStoreInitState {
}
}
#[derive(Debug, PartialEq)]
/// Result of initializing the system store (sysdb)
pub struct SystemStoreInit {
pub store: SysConfig,
pub state: SystemStoreInitState,
}
impl SystemStoreInit {
pub fn new(store: SysConfig, state: SystemStoreInitState) -> Self {
Self { store, state }
}
}
/// Open the system database
///
/// - If it doesn't exist, create it
/// - If it exists, look for config changes and sync them
pub fn open_system_database<Fs: RawFSInterface>(
auth: ConfigAuth,
mode: ConfigMode,
) -> RuntimeResult<SystemStoreInit> {
open_or_reinit_system_database::<Fs>(auth, mode, SYSDB_PATH, SYSDB_COW_PATH)
}
/// Open or re-initialize the system database
pub fn open_or_reinit_system_database<Fs: RawFSInterface>(
auth: ConfigAuth,
run_mode: ConfigMode,
sysdb_path: &str,
sysdb_path_cow: &str,
) -> RuntimeResult<SystemStoreInit> {
let sysdb_file = match SDSSFileIO::<Fs>::open_or_create_perm_rw::<spec::SysDBV1>(sysdb_path)? {
FileOpen::Created(new) => {
// init new syscfg
let new_syscfg = SysConfig::new_auth(auth, run_mode);
sync_system_database_to(&new_syscfg, new)?;
return Ok(SystemStoreInit::new(
new_syscfg,
SystemStoreInitState::Created,
));
}
FileOpen::Existing((ex, _)) => ex,
};
let prev_sysdb = decode_system_database(sysdb_file, run_mode)?;
let state;
// see if settings have changed
if prev_sysdb
.auth_data()
.read()
.verify_user("root", &auth.root_key)
.is_ok()
{
state = SystemStoreInitState::Unchanged;
} else {
state = SystemStoreInitState::UpdatedRoot;
}
// create new config
let new_syscfg = SysConfig::new_full(
auth,
SysHostData::new(
prev_sysdb.host_data().startup_counter() + 1,
prev_sysdb.host_data().settings_version()
+ !matches!(state, SystemStoreInitState::Unchanged) as u32,
),
run_mode,
);
// sync
sync_system_database_to(
&new_syscfg,
SDSSFileIO::<Fs>::create::<spec::SysDBV1>(sysdb_path_cow)?,
)?;
Fs::fs_rename_file(sysdb_path_cow, sysdb_path)?;
Ok(SystemStoreInit::new(new_syscfg, state))
}
/// Sync the system database to the given file
pub fn sync_system_database_to<Fs: RawFSInterface>(
cfg: &SysConfig,
mut f: SDSSFileIO<Fs>,
) -> RuntimeResult<()> {
// prepare our flat file
let mut map: DictGeneric = into_dict!(
SYS_KEY_SYS => DictEntryGeneric::Map(into_dict!(
SYS_KEY_SYS_SETTINGS_VERSION => Datacell::new_uint_default(cfg.host_data().settings_version() as _),
SYS_KEY_SYS_STARTUP_COUNTER => Datacell::new_uint_default(cfg.host_data().startup_counter() as _),
)),
SYS_KEY_AUTH => DictGeneric::new(),
);
let auth_key = map.get_mut(SYS_KEY_AUTH).unwrap();
let auth = cfg.auth_data().read();
let auth_key = auth_key.as_dict_mut().unwrap();
auth_key.insert(
SYS_KEY_AUTH_ROOT.into(),
DictEntryGeneric::Data(Datacell::new_bin(auth.root_key().into())),
);
auth_key.insert(
SYS_KEY_AUTH_USERS.into(),
DictEntryGeneric::Map(
// username -> [..settings]
auth.users()
.iter()
.map(|(username, user)| {
(
username.to_owned(),
DictEntryGeneric::Data(Datacell::new_list(vec![Datacell::new_bin(
user.key().into(),
)])),
)
})
.collect(),
),
);
// write
let buf = super::inf::enc::enc_dict_full::<super::inf::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
fn rkey<T>(
d: &mut DictGeneric,
key: &str,
@ -194,55 +68,181 @@ fn rkey<T>(
}
}
/// Decode the system database
pub fn decode_system_database<Fs: RawFSInterface>(
mut f: SDSSFileIO<Fs>,
run_mode: ConfigMode,
) -> RuntimeResult<SysConfig> {
let mut sysdb_data =
inf::dec::dec_dict_full::<inf::map::GenericDictSpec>(&f.load_remaining_into_buffer()?)?;
// get our auth and sys stores
let mut auth_store = rkey(&mut sysdb_data, SYS_KEY_AUTH, DictEntryGeneric::into_dict)?;
let mut sys_store = rkey(&mut sysdb_data, SYS_KEY_SYS, DictEntryGeneric::into_dict)?;
// load auth store
let root_key = rkey(&mut auth_store, SYS_KEY_AUTH_ROOT, |d| {
d.into_data()?.into_bin()
})?;
let users = rkey(
&mut auth_store,
SYS_KEY_AUTH_USERS,
DictEntryGeneric::into_dict,
)?;
// load users
let mut loaded_users = HashMap::new();
for (username, userdata) in users {
let mut userdata = userdata
.into_data()
.and_then(Datacell::into_list)
.ok_or(StorageError::SysDBCorrupted)?;
if userdata.len() != 1 {
return Err(StorageError::SysDBCorrupted.into());
impl<Fs: RawFSInterface> SystemStore<Fs> {
const SYSDB_PATH: &'static str = "sys.db";
const SYSDB_COW_PATH: &'static str = "sys.db.cow";
const SYS_KEY_AUTH: &'static str = "auth";
const SYS_KEY_AUTH_USERS: &'static str = "users";
const SYS_KEY_SYS: &'static str = "sys";
const SYS_KEY_SYS_STARTUP_COUNTER: &'static str = "sc";
const SYS_KEY_SYS_SETTINGS_VERSION: &'static str = "sv";
pub fn open_or_restore(
auth: ConfigAuth,
run_mode: ConfigMode,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
Self::open_with_name(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth, run_mode)
}
pub fn sync_db_or_rollback(&self, rb: impl FnOnce()) -> RuntimeResult<()> {
match self.sync_db() {
Ok(()) => Ok(()),
Err(e) => {
rb();
Err(e)
}
}
}
pub fn sync_db(&self) -> RuntimeResult<()> {
self._sync_with(Self::SYSDB_PATH, Self::SYSDB_COW_PATH)
}
pub fn open_with_name(
sysdb_name: &str,
sysdb_cow_path: &str,
auth: ConfigAuth,
run_mode: ConfigMode,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
match SDSSFileIO::open_or_create_perm_rw::<spec::SysDBV1>(sysdb_name)? {
FileOpen::Created(new) => {
let me = Self::_new(SysConfig::new_auth(auth, run_mode));
me._sync(new)?;
Ok((me, SystemStoreInitState::Created))
}
FileOpen::Existing((ex, _)) => {
Self::restore_and_sync(ex, auth, run_mode, sysdb_name, sysdb_cow_path)
}
}
}
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
fn _sync(&self, mut f: SDSSFileIO<Fs>) -> RuntimeResult<()> {
let cfg = self.system_store();
// prepare our flat file
let mut map: DictGeneric = into_dict!(
Self::SYS_KEY_SYS => DictEntryGeneric::Map(into_dict!(
Self::SYS_KEY_SYS_SETTINGS_VERSION => Datacell::new_uint_default(cfg.host_data().settings_version() as _),
Self::SYS_KEY_SYS_STARTUP_COUNTER => Datacell::new_uint_default(cfg.host_data().startup_counter() as _),
)),
Self::SYS_KEY_AUTH => DictGeneric::new(),
);
let auth_key = map.get_mut(Self::SYS_KEY_AUTH).unwrap();
let auth = cfg.auth_data().read();
let auth_key = auth_key.as_dict_mut().unwrap();
auth_key.insert(
Self::SYS_KEY_AUTH_USERS.into(),
DictEntryGeneric::Map(
// username -> [..settings]
auth.users()
.iter()
.map(|(username, user)| {
(
username.to_owned(),
DictEntryGeneric::Data(Datacell::new_list(vec![Datacell::new_bin(
user.key().into(),
)])),
)
})
.collect(),
),
);
// write
let buf = super::inf::enc::enc_dict_full::<super::inf::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
fn _sync_with(&self, target: &str, cow: &str) -> RuntimeResult<()> {
let f = SDSSFileIO::create::<spec::SysDBV1>(cow)?;
self._sync(f)?;
Fs::fs_rename_file(cow, target)
}
fn restore_and_sync(
f: SDSSFileIO<Fs>,
auth: ConfigAuth,
run_mode: ConfigMode,
fname: &str,
fcow_name: &str,
) -> RuntimeResult<(Self, SystemStoreInitState)> {
let prev_sysdb = Self::_restore(f, run_mode)?;
let state;
// see if settings have changed
if prev_sysdb
.auth_data()
.read()
.verify_user(SysAuthUser::USER_ROOT, &auth.root_key)
.is_ok()
{
state = SystemStoreInitState::Unchanged;
} else {
state = SystemStoreInitState::UpdatedRoot;
}
let user_password = userdata
.remove(0)
.into_bin()
.ok_or(StorageError::SysDBCorrupted)?;
loaded_users.insert(username, SysAuthUser::new(user_password.into_boxed_slice()));
// create new config
let new_syscfg = SysConfig::new_full(
auth,
SysHostData::new(
prev_sysdb.host_data().startup_counter() + 1,
prev_sysdb.host_data().settings_version()
+ !matches!(state, SystemStoreInitState::Unchanged) as u32,
),
run_mode,
);
let slf = Self::_new(new_syscfg);
// now sync
slf._sync_with(fname, fcow_name)?;
Ok((slf, state))
}
let sys_auth = SysAuth::new(root_key.into_boxed_slice(), loaded_users);
// load sys data
let sc = rkey(&mut sys_store, SYS_KEY_SYS_STARTUP_COUNTER, |d| {
d.into_data()?.into_uint()
})?;
let sv = rkey(&mut sys_store, SYS_KEY_SYS_SETTINGS_VERSION, |d| {
d.into_data()?.into_uint()
})?;
if !(sysdb_data.is_empty() & auth_store.is_empty() & sys_store.is_empty()) {
return Err(StorageError::SysDBCorrupted.into());
fn _restore(mut f: SDSSFileIO<Fs>, run_mode: ConfigMode) -> RuntimeResult<SysConfig> {
let mut sysdb_data =
inf::dec::dec_dict_full::<inf::map::GenericDictSpec>(&f.load_remaining_into_buffer()?)?;
// get our auth and sys stores
let mut auth_store = rkey(
&mut sysdb_data,
Self::SYS_KEY_AUTH,
DictEntryGeneric::into_dict,
)?;
let mut sys_store = rkey(
&mut sysdb_data,
Self::SYS_KEY_SYS,
DictEntryGeneric::into_dict,
)?;
// load auth store
let users = rkey(
&mut auth_store,
Self::SYS_KEY_AUTH_USERS,
DictEntryGeneric::into_dict,
)?;
// load users
let mut loaded_users = HashMap::new();
for (username, userdata) in users {
let mut userdata = userdata
.into_data()
.and_then(Datacell::into_list)
.ok_or(StorageError::SysDBCorrupted)?;
if userdata.len() != 1 {
return Err(StorageError::SysDBCorrupted.into());
}
let user_password = userdata
.remove(0)
.into_bin()
.ok_or(StorageError::SysDBCorrupted)?;
loaded_users.insert(username, SysAuthUser::new(user_password.into_boxed_slice()));
}
let sys_auth = SysAuth::new(loaded_users);
// load sys data
let sc = rkey(&mut sys_store, Self::SYS_KEY_SYS_STARTUP_COUNTER, |d| {
d.into_data()?.into_uint()
})?;
let sv = rkey(&mut sys_store, Self::SYS_KEY_SYS_SETTINGS_VERSION, |d| {
d.into_data()?.into_uint()
})?;
if !(sysdb_data.is_empty()
& auth_store.is_empty()
& sys_store.is_empty()
& sys_auth.users().contains_key(SysAuthUser::USER_ROOT))
{
return Err(StorageError::SysDBCorrupted.into());
}
Ok(SysConfig::new(
RwLock::new(sys_auth),
SysHostData::new(sc, sv as u32),
run_mode,
))
}
Ok(SysConfig::new(
RwLock::new(sys_auth),
SysHostData::new(sc, sv as u32),
run_mode,
))
}

@ -32,24 +32,19 @@ mod tx;
mod sysdb {
use {
super::{
super::sysdb::{self, SystemStoreInitState},
VirtualFS as VFS,
super::{super::sysdb::SystemStoreInitState, VirtualFS as VFS},
crate::engine::{
config::{AuthDriver, ConfigAuth, ConfigMode},
fractal::sys_store::SystemStore,
},
crate::engine::config::{AuthDriver, ConfigAuth, ConfigMode},
};
fn open_sysdb(
auth_config: ConfigAuth,
sysdb_path: &str,
sysdb_cow_path: &str,
) -> sysdb::SystemStoreInit {
sysdb::open_or_reinit_system_database::<VFS>(
auth_config,
ConfigMode::Dev,
sysdb_path,
sysdb_cow_path,
)
.unwrap()
) -> (SystemStore<VFS>, SystemStoreInitState) {
SystemStore::<VFS>::open_with_name(sysdb_path, sysdb_cow_path, auth_config, ConfigMode::Dev)
.unwrap()
}
#[test]
fn open_close() {
@ -62,28 +57,28 @@ mod sysdb {
};
let auth_config = ConfigAuth::new(AuthDriver::Pwd, "password12345678".into());
{
let config = open(auth_config.clone());
assert_eq!(config.state, SystemStoreInitState::Created);
let (config, state) = open(auth_config.clone());
assert_eq!(state, SystemStoreInitState::Created);
assert!(config
.store
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.store.host_data().settings_version(), 0);
assert_eq!(config.store.host_data().startup_counter(), 0);
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 0);
}
// reboot
let config = open(auth_config);
assert_eq!(config.state, SystemStoreInitState::Unchanged);
let (config, state) = open(auth_config);
assert_eq!(state, SystemStoreInitState::Unchanged);
assert!(config
.store
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.store.host_data().settings_version(), 0);
assert_eq!(config.store.host_data().startup_counter(), 1);
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 1);
}
#[test]
fn open_change_root_password() {
@ -95,26 +90,26 @@ mod sysdb {
)
};
{
let config = open(ConfigAuth::new(AuthDriver::Pwd, "password12345678".into()));
assert_eq!(config.state, SystemStoreInitState::Created);
let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password12345678".into()));
assert_eq!(state, SystemStoreInitState::Created);
assert!(config
.store
.system_store()
.auth_data()
.read()
.verify_user("root", "password12345678")
.is_ok());
assert_eq!(config.store.host_data().settings_version(), 0);
assert_eq!(config.store.host_data().startup_counter(), 0);
assert_eq!(config.system_store().host_data().settings_version(), 0);
assert_eq!(config.system_store().host_data().startup_counter(), 0);
}
let config = open(ConfigAuth::new(AuthDriver::Pwd, "password23456789".into()));
assert_eq!(config.state, SystemStoreInitState::UpdatedRoot);
let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password23456789".into()));
assert_eq!(state, SystemStoreInitState::UpdatedRoot);
assert!(config
.store
.system_store()
.auth_data()
.read()
.verify_user("root", "password23456789")
.is_ok());
assert_eq!(config.store.host_data().settings_version(), 1);
assert_eq!(config.store.host_data().startup_counter(), 1);
assert_eq!(config.system_store().host_data().settings_version(), 1);
assert_eq!(config.system_store().host_data().startup_counter(), 1);
}
}

Loading…
Cancel
Save