diff --git a/server/src/engine/core/dcl.rs b/server/src/engine/core/dcl.rs new file mode 100644 index 00000000..127328d0 --- /dev/null +++ b/server/src/engine/core/dcl.rs @@ -0,0 +1,61 @@ +/* + * Created on Fri Nov 10 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use crate::engine::{ + data::{tag::TagClass, DictEntryGeneric}, + error::{QueryError, QueryResult}, + fractal::GlobalInstanceLike, + net::protocol::ClientLocalState, + ql::dcl::{UserAdd, UserDel}, +}; + +const KEY_PASSWORD: &str = "password"; + +pub fn create_user(global: &impl GlobalInstanceLike, mut user_add: UserAdd<'_>) -> QueryResult<()> { + let username = user_add.username().to_owned(); + let password = match user_add.options_mut().remove(KEY_PASSWORD) { + Some(DictEntryGeneric::Data(d)) + if d.kind() == TagClass::Str && user_add.options().is_empty() => + unsafe { d.into_str().unwrap_unchecked() }, + None | Some(_) => { + // invalid properties + return Err(QueryError::QExecDdlInvalidProperties); + } + }; + global.sys_store().create_new_user(username, password) +} + +pub fn drop_user( + global: &impl GlobalInstanceLike, + cstate: &ClientLocalState, + user_del: UserDel<'_>, +) -> QueryResult<()> { + if cstate.username() == user_del.username() { + // you can't delete yourself! + return Err(QueryError::SysAuthError); + } + global.sys_store().drop_user(user_del.username()) +} diff --git a/server/src/engine/core/exec.rs b/server/src/engine/core/exec.rs index 2ae93770..c5c1104b 100644 --- a/server/src/engine/core/exec.rs +++ b/server/src/engine/core/exec.rs @@ -29,7 +29,7 @@ use { core::{dml, model::Model, space::Space}, error::{QueryError, QueryResult}, fractal::Global, - net::protocol::{Response, SQuery}, + net::protocol::{ClientLocalState, Response, SQuery}, ql::{ ast::{traits::ASTNode, InplaceData, State}, lex::{Keyword, KeywordStmt, Token}, @@ -38,8 +38,9 @@ use { core::ops::Deref, }; -pub async fn dispatch_to_executor<'a, 'b>( - global: &'b Global, +pub async fn dispatch_to_executor<'a>( + global: &Global, + cstate: &ClientLocalState, query: SQuery<'a>, ) -> QueryResult { let tokens = @@ -52,9 +53,9 @@ pub async fn dispatch_to_executor<'a, 'b>( }; state.cursor_ahead(); if stmt.is_blocking() { - run_blocking_stmt(state, stmt, global).await + run_blocking_stmt(global, cstate, state, stmt).await } else { - run_nb(global, state, stmt) + run_nb(global, cstate, state, stmt) } } @@ -114,10 +115,15 @@ fn _call + core::fmt::Debug, T>( } async fn run_blocking_stmt( + global: &Global, + cstate: &ClientLocalState, mut state: State<'_, InplaceData>, stmt: KeywordStmt, - global: &Global, ) -> Result { + if !cstate.is_root() { + // all the actions here need root permission + return Err(QueryError::SysPermissionDenied); + } let (a, b) = (&state.current()[0], &state.current()[1]); let sysctl = stmt == KeywordStmt::Sysctl; let create = stmt == KeywordStmt::Create; @@ -132,24 +138,26 @@ async fn run_blocking_stmt( let d_m = (drop & Token![model].eq(a) & last_id) as u8 * 7; let fc = sysctl as u8 | c_s | c_m | a_s | a_m | d_s | d_m; state.cursor_ahead(); - static BLK_EXEC: [fn(Global, RawSlice>) -> QueryResult<()>; 8] = [ - |_, _| Err(QueryError::QLUnknownStatement), // unknown - blocking_exec_sysctl, // sysctl - |g, t| call(g, t, Space::transactional_exec_create), - |g, t| call(g, t, Model::transactional_exec_create), - |g, t| call(g, t, Space::transactional_exec_alter), - |g, t| call(g, t, Model::transactional_exec_alter), - |g, t| call(g, t, Space::transactional_exec_drop), - |g, t| call(g, t, Model::transactional_exec_drop), + static BLK_EXEC: [fn(Global, &ClientLocalState, RawSlice>) -> QueryResult<()>; + 8] = [ + |_, _, _| Err(QueryError::QLUnknownStatement), // unknown + blocking_exec_sysctl, // sysctl + |g, _, t| call(g, t, Space::transactional_exec_create), + |g, _, t| call(g, t, Model::transactional_exec_create), + |g, _, t| call(g, t, Space::transactional_exec_alter), + |g, _, t| call(g, t, Model::transactional_exec_alter), + |g, _, t| call(g, t, Space::transactional_exec_drop), + |g, _, t| call(g, t, Model::transactional_exec_drop), ]; let r = unsafe { // UNSAFE(@ohsayan): the only await is within this block let c_glob = global.clone(); let ptr = state.current().as_ptr() as usize; let len = state.current().len(); + let cstate: &'static ClientLocalState = core::mem::transmute(cstate); tokio::task::spawn_blocking(move || { let tokens = RawSlice::new(ptr as *const Token, len); - BLK_EXEC[fc as usize](c_glob, tokens)?; + BLK_EXEC[fc as usize](c_glob, cstate, tokens)?; Ok(Response::Empty) }) .await @@ -157,8 +165,30 @@ async fn run_blocking_stmt( r.unwrap() } -fn blocking_exec_sysctl(_: Global, _: RawSlice>) -> QueryResult<()> { - todo!() +fn blocking_exec_sysctl( + g: Global, + cstate: &ClientLocalState, + tokens: RawSlice>, +) -> QueryResult<()> { + let mut state = State::new_inplace(&tokens); + /* + currently supported: sysctl create user, sysctl drop user + */ + if state.remaining() != 2 { + return Err(QueryError::QLInvalidSyntax); + } + let (a, b) = (state.fw_read(), state.fw_read()); + match (a, b) { + (Token![create], Token::Ident(id)) if id.eq_ignore_ascii_case("user") => { + let useradd = ASTNode::from_state(&mut state)?; + super::dcl::create_user(&g, useradd) + } + (Token![drop], Token::Ident(id)) if id.eq_ignore_ascii_case("user") => { + let userdel = ASTNode::from_state(&mut state)?; + super::dcl::drop_user(&g, cstate, userdel) + } + _ => Err(QueryError::QLUnknownStatement), + } } /* @@ -167,6 +197,7 @@ fn blocking_exec_sysctl(_: Global, _: RawSlice>) -> QueryResult<( fn run_nb( global: &Global, + _cstate: &ClientLocalState, state: State<'_, InplaceData>, stmt: KeywordStmt, ) -> QueryResult { diff --git a/server/src/engine/core/index/row.rs b/server/src/engine/core/index/row.rs index c2e409bf..c44d8a4f 100644 --- a/server/src/engine/core/index/row.rs +++ b/server/src/engine/core/index/row.rs @@ -110,7 +110,7 @@ impl Row { data, schema_version, txn_revised_data, - DeltaVersion::__new(0), + DeltaVersion::genesis(), ) } pub fn new_restored( diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index a3cff237..da929a4f 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -24,22 +24,24 @@ * */ -use self::dml::QueryExecMeta; -pub use self::util::{EntityID, EntityIDRef}; -use super::{fractal::GlobalInstanceLike, ql::ast::Entity}; +pub(in crate::engine) mod dcl; pub(in crate::engine) mod dml; -pub mod exec; +pub(in crate::engine) mod exec; pub(in crate::engine) mod index; pub(in crate::engine) mod model; pub(in crate::engine) mod query_meta; -pub mod space; +pub(in crate::engine) mod space; +// util mod util; // test #[cfg(test)] pub(super) mod tests; +// re-exports +pub use self::util::{EntityID, EntityIDRef}; // imports use { - self::model::Model, + self::{dml::QueryExecMeta, model::Model}, + super::{fractal::GlobalInstanceLike, ql::ast::Entity}, crate::engine::{ core::space::Space, error::{QueryError, QueryResult}, diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index 91b1434f..a3760b24 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -24,18 +24,13 @@ * */ -#![allow(unused)] - use { super::{Fields, Model}, - crate::{ - engine::{ - core::{dml::QueryExecMeta, index::Row}, - fractal::{FractalToken, GlobalInstanceLike}, - sync::atm::Guard, - sync::queue::Queue, - }, - util::compiler, + crate::engine::{ + core::{dml::QueryExecMeta, index::Row}, + fractal::{FractalToken, GlobalInstanceLike}, + sync::atm::Guard, + sync::queue::Queue, }, parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}, std::{ @@ -70,8 +65,8 @@ impl PartialEq for ISyncMatrix { #[derive(Debug)] /// Read model, write new data pub struct IRModelSMData<'a> { - rmodel: RwLockReadGuard<'a, ()>, - mdata: RwLockReadGuard<'a, ()>, + _rmodel: RwLockReadGuard<'a, ()>, + _mdata: RwLockReadGuard<'a, ()>, fields: &'a Fields, } @@ -80,8 +75,8 @@ impl<'a> IRModelSMData<'a> { let rmodel = m.sync_matrix().v_priv_model_alter.read(); let mdata = m.sync_matrix().v_priv_data_new_or_revise.read(); Self { - rmodel, - mdata, + _rmodel: rmodel, + _mdata: mdata, fields: unsafe { // UNSAFE(@ohsayan): we already have acquired this resource m._read_fields() @@ -96,14 +91,14 @@ impl<'a> IRModelSMData<'a> { #[derive(Debug)] /// Read model pub struct IRModel<'a> { - rmodel: RwLockReadGuard<'a, ()>, + _rmodel: RwLockReadGuard<'a, ()>, fields: &'a Fields, } impl<'a> IRModel<'a> { pub fn new(m: &'a Model) -> Self { Self { - rmodel: m.sync_matrix().v_priv_model_alter.read(), + _rmodel: m.sync_matrix().v_priv_model_alter.read(), fields: unsafe { // UNSAFE(@ohsayan): we already have acquired this resource m._read_fields() @@ -118,14 +113,14 @@ impl<'a> IRModel<'a> { #[derive(Debug)] /// Write model pub struct IWModel<'a> { - wmodel: RwLockWriteGuard<'a, ()>, + _wmodel: RwLockWriteGuard<'a, ()>, fields: &'a mut Fields, } impl<'a> IWModel<'a> { pub fn new(m: &'a Model) -> Self { Self { - wmodel: m.sync_matrix().v_priv_model_alter.write(), + _wmodel: m.sync_matrix().v_priv_model_alter.write(), fields: unsafe { // UNSAFE(@ohsayan): we have exclusive access to this resource m._read_fields_mut() @@ -211,9 +206,6 @@ impl DeltaState { pub fn create_new_data_delta_version(&self) -> DeltaVersion { DeltaVersion(self.__data_delta_step()) } - pub fn get_data_delta_size(&self) -> usize { - self.data_deltas_size.load(Ordering::Acquire) - } } impl DeltaState { @@ -356,7 +348,7 @@ impl<'a> SchemaDeltaIndexRGuard<'a> { #[derive(Debug, Clone)] pub struct DataDelta { - schema_version: DeltaVersion, + _schema_version: DeltaVersion, data_version: DeltaVersion, row: Row, change: DataDeltaKind, @@ -370,15 +362,12 @@ impl DataDelta { change: DataDeltaKind, ) -> Self { Self { - schema_version, + _schema_version: schema_version, data_version, row, change, } } - pub fn schema_version(&self) -> DeltaVersion { - self.schema_version - } pub fn data_version(&self) -> DeltaVersion { self.data_version } diff --git a/server/src/engine/data/cell.rs b/server/src/engine/data/cell.rs index c23ba0a4..f8a3dfd3 100644 --- a/server/src/engine/data/cell.rs +++ b/server/src/engine/data/cell.rs @@ -220,6 +220,21 @@ impl Datacell { pub fn str(&self) -> &str { self.try_str().unwrap() } + pub fn into_str(self) -> Option { + if self.kind() != TagClass::Str { + return None; + } + unsafe { + // UNSAFE(@ohsayan): no double free + tagck + let md = ManuallyDrop::new(self); + let (a, b) = md.data.word.dwordqn_load_qw_nw(); + Some(String::from_raw_parts( + b as *const u8 as *mut u8, + a as usize, + a as usize, + )) + } + } // list pub fn new_list(l: Vec) -> Self { unsafe { diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index a9aeae83..6e41fc2d 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -45,6 +45,8 @@ pub enum QueryError { SysAuthError = 3, /// transactional error SysTransactionalError = 4, + /// insufficient permissions error + SysPermissionDenied = 5, // exchange NetworkSubsystemCorruptedPacket = 24, // QL diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 6cc37f0f..74975e37 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -25,6 +25,7 @@ */ use { + self::sys_store::SystemStore, super::{ core::{dml::QueryExecMeta, model::Model, GlobalNS}, data::uuid::Uuid, @@ -40,11 +41,11 @@ use { tokio::sync::mpsc::unbounded_channel, }; -pub mod config; pub mod context; mod drivers; pub mod error; mod mgr; +pub mod sys_store; #[cfg(test)] pub mod test_utils; mod util; @@ -74,7 +75,7 @@ pub struct GlobalStateStart { /// Must be called iff this is the only thread calling it pub unsafe fn load_and_enable_all( gns: GlobalNS, - config: config::SysConfig, + config: SystemStore, gns_driver: GNSTransactionDriverAnyFS, model_drivers: ModelDrivers, ) -> GlobalStateStart { @@ -145,7 +146,7 @@ pub trait GlobalInstanceLike { } } // config handle - fn sys_cfg(&self) -> &config::SysConfig; + fn sys_store(&self) -> &SystemStore; } impl GlobalInstanceLike for Global { @@ -169,7 +170,7 @@ impl GlobalInstanceLike for Global { self._get_max_delta_size() } // sys - fn sys_cfg(&self) -> &config::SysConfig { + fn sys_store(&self) -> &SystemStore { &self.get_state().config } // model @@ -264,7 +265,7 @@ struct GlobalState { gns_driver: drivers::FractalGNSDriver, mdl_driver: RwLock>, task_mgr: mgr::FractalMgr, - config: config::SysConfig, + config: SystemStore, } impl GlobalState { @@ -273,7 +274,7 @@ impl GlobalState { gns_driver: drivers::FractalGNSDriver, mdl_driver: RwLock>, task_mgr: mgr::FractalMgr, - config: config::SysConfig, + config: SystemStore, ) -> Self { Self { gns, diff --git a/server/src/engine/fractal/config.rs b/server/src/engine/fractal/sys_store.rs similarity index 69% rename from server/src/engine/fractal/config.rs rename to server/src/engine/fractal/sys_store.rs index 699b401a..625285d0 100644 --- a/server/src/engine/fractal/config.rs +++ b/server/src/engine/fractal/sys_store.rs @@ -24,17 +24,31 @@ * */ -use crate::engine::config::ConfigAuth; - use { crate::engine::{ - config::ConfigMode, + config::{ConfigAuth, ConfigMode}, error::{QueryError, QueryResult}, + storage::v1::RawFSInterface, }, parking_lot::RwLock, - std::collections::{hash_map::Entry, HashMap}, + std::{ + collections::{hash_map::Entry, HashMap}, + marker::PhantomData, + }, }; +#[derive(Debug)] +pub struct SystemStore { + syscfg: SysConfig, + _fs: PhantomData, +} + +impl SystemStore { + pub fn system_store(&self) -> &SysConfig { + &self.syscfg + } +} + #[derive(Debug)] /// The global system configuration pub struct SysConfig { @@ -63,10 +77,10 @@ impl SysConfig { pub fn new_full(new_auth: ConfigAuth, host_data: SysHostData, run_mode: ConfigMode) -> Self { Self::new( RwLock::new(SysAuth::new( - rcrypt::hash(new_auth.root_key, rcrypt::DEFAULT_COST) + into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new( + rcrypt::hash(new_auth.root_key.as_str(), rcrypt::DEFAULT_COST) .unwrap() - .into_boxed_slice(), - Default::default(), + .into_boxed_slice())), )), host_data, run_mode, @@ -80,10 +94,10 @@ impl SysConfig { pub(super) fn test_default() -> Self { Self { auth_data: RwLock::new(SysAuth::new( + into_dict!(SysAuthUser::USER_ROOT => SysAuthUser::new( rcrypt::hash("password12345678", rcrypt::DEFAULT_COST) .unwrap() - .into_boxed_slice(), - Default::default(), + .into_boxed_slice())), )), host_data: SysHostData::new(0, 0), run_mode: ConfigMode::Dev, @@ -133,6 +147,51 @@ impl SysHostData { } } +impl SystemStore { + pub fn _new(syscfg: SysConfig) -> Self { + Self { + syscfg, + _fs: PhantomData, + } + } + /// Create a new user with the given details + pub fn create_new_user(&self, username: String, password: String) -> QueryResult<()> { + // TODO(@ohsayan): we want to be very careful with this + let _username = username.clone(); + let mut auth = self.system_store().auth_data().write(); + match auth.users.entry(username.into()) { + Entry::Vacant(ve) => { + ve.insert(SysAuthUser::new( + rcrypt::hash(password, rcrypt::DEFAULT_COST) + .unwrap() + .into_boxed_slice(), + )); + self.sync_db_or_rollback(|| { + auth.users.remove(_username.as_str()); + })?; + Ok(()) + } + Entry::Occupied(_) => Err(QueryError::SysAuthError), + } + } + pub fn drop_user(&self, username: &str) -> QueryResult<()> { + let mut auth = self.system_store().auth_data().write(); + if username == SysAuthUser::USER_ROOT { + // you can't remove root! + return Err(QueryError::SysAuthError); + } + match auth.users.remove_entry(username) { + Some((username, user)) => { + self.sync_db_or_rollback(|| { + let _ = auth.users.insert(username, user); + })?; + Ok(()) + } + None => Err(QueryError::SysAuthError), + } + } +} + /* auth */ @@ -140,28 +199,24 @@ impl SysHostData { #[derive(Debug, PartialEq)] /// The auth data section (system.auth) pub struct SysAuth { - root_key: Box<[u8]>, users: HashMap, SysAuthUser>, } impl SysAuth { /// New [`SysAuth`] with the given settings - pub fn new(root_key: Box<[u8]>, users: HashMap, SysAuthUser>) -> Self { - Self { root_key, users } + pub fn new(users: HashMap, SysAuthUser>) -> Self { + Self { users } } - /// Create a new user with the given details - #[allow(unused)] - pub fn create_new_user(&mut self, username: &str, password: &str) -> QueryResult<()> { - match self.users.entry(username.into()) { - Entry::Vacant(ve) => { - ve.insert(SysAuthUser::new( - rcrypt::hash(password, rcrypt::DEFAULT_COST) - .unwrap() - .into_boxed_slice(), - )); - Ok(()) + pub fn verify_user_is_root + ?Sized>( + &self, + username: &str, + password: &T, + ) -> QueryResult { + match self.users.get(username) { + Some(user) if rcrypt::verify(password, user.key()).unwrap() => { + Ok(username == SysAuthUser::USER_ROOT) } - Entry::Occupied(_) => Err(QueryError::SysAuthError), + Some(_) | None => Err(QueryError::SysAuthError), } } /// Verify the user with the given details @@ -170,20 +225,7 @@ impl SysAuth { username: &str, password: &T, ) -> QueryResult<()> { - if username == "root" { - if rcrypt::verify(password, self.root_key()).unwrap() { - return Ok(()); - } else { - return Err(QueryError::SysAuthError); - } - } - match self.users.get(username) { - Some(user) if rcrypt::verify(password, user.key()).unwrap() => Ok(()), - Some(_) | None => Err(QueryError::SysAuthError), - } - } - pub fn root_key(&self) -> &[u8] { - &self.root_key + self.verify_user_is_root(username, password).map(|_| ()) } pub fn users(&self) -> &HashMap, SysAuthUser> { &self.users @@ -197,6 +239,7 @@ pub struct SysAuthUser { } impl SysAuthUser { + pub const USER_ROOT: &'static str = "root"; /// Create a new [`SysAuthUser`] pub fn new(key: Box<[u8]>) -> Self { Self { key } diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index d3211ee9..504a5773 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -26,8 +26,8 @@ use { super::{ - config::SysConfig, CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, - ModelUniqueID, Task, + sys_store::{SysConfig, SystemStore}, + CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID, Task, }, crate::engine::{ core::GlobalNS, @@ -54,7 +54,7 @@ pub struct TestGlobal { max_delta_size: usize, txn_driver: Mutex>, model_drivers: RwLock>>, - sys_cfg: SysConfig, + sys_cfg: SystemStore, } impl TestGlobal { @@ -70,7 +70,7 @@ impl TestGlobal { max_delta_size, txn_driver: Mutex::new(txn_driver), model_drivers: RwLock::default(), - sys_cfg: SysConfig::test_default(), + sys_cfg: SystemStore::_new(SysConfig::test_default()), } } } @@ -117,7 +117,7 @@ impl GlobalInstanceLike for TestGlobal { fn get_max_delta_size(&self) -> usize { 100 } - fn sys_cfg(&self) -> &super::config::SysConfig { + fn sys_store(&self) -> &SystemStore { &self.sys_cfg } fn initialize_model_driver( diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 57fd0fb5..bb8f7788 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -47,10 +47,12 @@ pub use error::RuntimeResult; use { self::{ config::{ConfigEndpoint, ConfigEndpointTls, ConfigMode, ConfigReturn, Configuration}, - fractal::context::{self, Subsystem}, + fractal::{ + context::{self, Subsystem}, + sys_store::SystemStore, + }, storage::v1::{ loader::{self, SEInitState}, - sysdb::{self, SystemStoreInit}, LocalFS, }, }, @@ -83,8 +85,7 @@ pub fn load_all() -> RuntimeResult<(Configuration, fractal::GlobalStateStart)> { // restore system database info!("loading system database ..."); context::set_dmsg("loading system database"); - let SystemStoreInit { store, state } = - sysdb::open_system_database::(config.auth.clone(), config.mode)?; + let (store, state) = SystemStore::::open_or_restore(config.auth.clone(), config.mode)?; let sysdb_is_new = state.is_created(); if state.is_existing_updated_root() { warn!("the root account was updated"); diff --git a/server/src/engine/net/protocol/mod.rs b/server/src/engine/net/protocol/mod.rs index cb3cdcaf..a553d5db 100644 --- a/server/src/engine/net/protocol/mod.rs +++ b/server/src/engine/net/protocol/mod.rs @@ -51,6 +51,25 @@ use { tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}, }; +#[derive(Debug, PartialEq)] +pub struct ClientLocalState { + username: Box, + root: bool, + hs: handshake::CHandshakeStatic, +} + +impl ClientLocalState { + pub fn new(username: Box, root: bool, hs: handshake::CHandshakeStatic) -> Self { + Self { username, root, hs } + } + pub fn is_root(&self) -> bool { + self.root + } + pub fn username(&self) -> &str { + &self.username + } +} + #[derive(Debug, PartialEq)] pub enum Response { Empty, @@ -63,7 +82,7 @@ pub(super) async fn query_loop( global: &Global, ) -> IoResult { // handshake - let _ = match do_handshake(con, buf, global).await? { + let client_state = match do_handshake(con, buf, global).await? { PostHandshake::Okay(hs) => hs, PostHandshake::ConnectionClosedFin => return Ok(QueryLoopResult::Fin), PostHandshake::ConnectionClosedRst => return Ok(QueryLoopResult::Rst), @@ -116,7 +135,7 @@ pub(super) async fn query_loop( } }; // now execute query - match engine::core::exec::dispatch_to_executor(global, sq).await { + match engine::core::exec::dispatch_to_executor(global, &client_state, sq).await { Ok(Response::Empty) => { con.write_all(&[0x12]).await?; } @@ -137,7 +156,7 @@ pub(super) async fn query_loop( #[derive(Debug, PartialEq)] enum PostHandshake { - Okay(handshake::CHandshakeStatic), + Okay(ClientLocalState), Error(ProtocolError), ConnectionClosedFin, ConnectionClosedRst, @@ -197,14 +216,20 @@ async fn do_handshake( } match core::str::from_utf8(handshake.hs_auth().username()) { Ok(uname) => { - let auth = global.sys_cfg().auth_data().read(); - if auth - .verify_user(uname, handshake.hs_auth().password()) - .is_ok() - { - let hs_static = handshake.hs_static(); - buf.advance(cursor); - return Ok(PostHandshake::Okay(hs_static)); + let auth = global.sys_store().system_store().auth_data().read(); + let r = auth.verify_user_is_root(uname, handshake.hs_auth().password()); + match r { + Ok(is_root) => { + let hs = handshake.hs_static(); + let ret = Ok(PostHandshake::Okay(ClientLocalState::new( + uname.into(), + is_root, + hs, + ))); + buf.advance(cursor); + return ret; + } + Err(_) => {} } } Err(_) => {} diff --git a/server/src/engine/ql/dcl.rs b/server/src/engine/ql/dcl.rs index 630e2f42..f624bab6 100644 --- a/server/src/engine/ql/dcl.rs +++ b/server/src/engine/ql/dcl.rs @@ -95,6 +95,15 @@ impl<'a> UserAdd<'a> { pub fn parse>(state: &mut State<'a, Qd>) -> QueryResult { parse(state).map(|UserMeta { username, options }: UserMeta| Self::new(username, options)) } + pub fn username(&self) -> &str { + self.username + } + pub fn options_mut(&mut self) -> &mut DictGeneric { + &mut self.options + } + pub fn options(&self) -> &DictGeneric { + &self.options + } } impl<'a> traits::ASTNode<'a> for UserAdd<'a> { @@ -131,6 +140,9 @@ impl<'a> UserDel<'a> { } Err(QueryError::QLInvalidSyntax) } + pub fn username(&self) -> &str { + self.username + } } impl<'a> traits::ASTNode<'a> for UserDel<'a> { diff --git a/server/src/engine/storage/v1/sysdb.rs b/server/src/engine/storage/v1/sysdb.rs index 71703c3d..12a8c63e 100644 --- a/server/src/engine/storage/v1/sysdb.rs +++ b/server/src/engine/storage/v1/sysdb.rs @@ -30,22 +30,13 @@ use { config::{ConfigAuth, ConfigMode}, data::{cell::Datacell, DictEntryGeneric, DictGeneric}, error::{RuntimeResult, StorageError}, - fractal::config::{SysAuth, SysAuthUser, SysConfig, SysHostData}, + fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore}, storage::v1::{inf, spec, RawFSInterface, SDSSFileIO}, }, parking_lot::RwLock, std::collections::HashMap, }; -const SYSDB_PATH: &str = "sys.db"; -const SYSDB_COW_PATH: &str = "sys.db.cow"; -const SYS_KEY_AUTH: &str = "auth"; -const SYS_KEY_AUTH_ROOT: &str = "root"; -const SYS_KEY_AUTH_USERS: &str = "users"; -const SYS_KEY_SYS: &str = "sys"; -const SYS_KEY_SYS_STARTUP_COUNTER: &str = "sc"; -const SYS_KEY_SYS_SETTINGS_VERSION: &str = "sv"; - #[derive(Debug, PartialEq)] /// The system store init state pub enum SystemStoreInitState { @@ -66,123 +57,6 @@ impl SystemStoreInitState { } } -#[derive(Debug, PartialEq)] -/// Result of initializing the system store (sysdb) -pub struct SystemStoreInit { - pub store: SysConfig, - pub state: SystemStoreInitState, -} - -impl SystemStoreInit { - pub fn new(store: SysConfig, state: SystemStoreInitState) -> Self { - Self { store, state } - } -} - -/// Open the system database -/// -/// - If it doesn't exist, create it -/// - If it exists, look for config changes and sync them -pub fn open_system_database( - auth: ConfigAuth, - mode: ConfigMode, -) -> RuntimeResult { - open_or_reinit_system_database::(auth, mode, SYSDB_PATH, SYSDB_COW_PATH) -} - -/// Open or re-initialize the system database -pub fn open_or_reinit_system_database( - auth: ConfigAuth, - run_mode: ConfigMode, - sysdb_path: &str, - sysdb_path_cow: &str, -) -> RuntimeResult { - let sysdb_file = match SDSSFileIO::::open_or_create_perm_rw::(sysdb_path)? { - FileOpen::Created(new) => { - // init new syscfg - let new_syscfg = SysConfig::new_auth(auth, run_mode); - sync_system_database_to(&new_syscfg, new)?; - return Ok(SystemStoreInit::new( - new_syscfg, - SystemStoreInitState::Created, - )); - } - FileOpen::Existing((ex, _)) => ex, - }; - let prev_sysdb = decode_system_database(sysdb_file, run_mode)?; - let state; - // see if settings have changed - if prev_sysdb - .auth_data() - .read() - .verify_user("root", &auth.root_key) - .is_ok() - { - state = SystemStoreInitState::Unchanged; - } else { - state = SystemStoreInitState::UpdatedRoot; - } - // create new config - let new_syscfg = SysConfig::new_full( - auth, - SysHostData::new( - prev_sysdb.host_data().startup_counter() + 1, - prev_sysdb.host_data().settings_version() - + !matches!(state, SystemStoreInitState::Unchanged) as u32, - ), - run_mode, - ); - // sync - sync_system_database_to( - &new_syscfg, - SDSSFileIO::::create::(sysdb_path_cow)?, - )?; - Fs::fs_rename_file(sysdb_path_cow, sysdb_path)?; - Ok(SystemStoreInit::new(new_syscfg, state)) -} - -/// Sync the system database to the given file -pub fn sync_system_database_to( - cfg: &SysConfig, - mut f: SDSSFileIO, -) -> RuntimeResult<()> { - // prepare our flat file - let mut map: DictGeneric = into_dict!( - SYS_KEY_SYS => DictEntryGeneric::Map(into_dict!( - SYS_KEY_SYS_SETTINGS_VERSION => Datacell::new_uint_default(cfg.host_data().settings_version() as _), - SYS_KEY_SYS_STARTUP_COUNTER => Datacell::new_uint_default(cfg.host_data().startup_counter() as _), - )), - SYS_KEY_AUTH => DictGeneric::new(), - ); - let auth_key = map.get_mut(SYS_KEY_AUTH).unwrap(); - let auth = cfg.auth_data().read(); - let auth_key = auth_key.as_dict_mut().unwrap(); - auth_key.insert( - SYS_KEY_AUTH_ROOT.into(), - DictEntryGeneric::Data(Datacell::new_bin(auth.root_key().into())), - ); - auth_key.insert( - SYS_KEY_AUTH_USERS.into(), - DictEntryGeneric::Map( - // username -> [..settings] - auth.users() - .iter() - .map(|(username, user)| { - ( - username.to_owned(), - DictEntryGeneric::Data(Datacell::new_list(vec![Datacell::new_bin( - user.key().into(), - )])), - ) - }) - .collect(), - ), - ); - // write - let buf = super::inf::enc::enc_dict_full::(&map); - f.fsynced_write(&buf) -} - fn rkey( d: &mut DictGeneric, key: &str, @@ -194,55 +68,181 @@ fn rkey( } } -/// Decode the system database -pub fn decode_system_database( - mut f: SDSSFileIO, - run_mode: ConfigMode, -) -> RuntimeResult { - let mut sysdb_data = - inf::dec::dec_dict_full::(&f.load_remaining_into_buffer()?)?; - // get our auth and sys stores - let mut auth_store = rkey(&mut sysdb_data, SYS_KEY_AUTH, DictEntryGeneric::into_dict)?; - let mut sys_store = rkey(&mut sysdb_data, SYS_KEY_SYS, DictEntryGeneric::into_dict)?; - // load auth store - let root_key = rkey(&mut auth_store, SYS_KEY_AUTH_ROOT, |d| { - d.into_data()?.into_bin() - })?; - let users = rkey( - &mut auth_store, - SYS_KEY_AUTH_USERS, - DictEntryGeneric::into_dict, - )?; - // load users - let mut loaded_users = HashMap::new(); - for (username, userdata) in users { - let mut userdata = userdata - .into_data() - .and_then(Datacell::into_list) - .ok_or(StorageError::SysDBCorrupted)?; - if userdata.len() != 1 { - return Err(StorageError::SysDBCorrupted.into()); +impl SystemStore { + const SYSDB_PATH: &'static str = "sys.db"; + const SYSDB_COW_PATH: &'static str = "sys.db.cow"; + const SYS_KEY_AUTH: &'static str = "auth"; + const SYS_KEY_AUTH_USERS: &'static str = "users"; + const SYS_KEY_SYS: &'static str = "sys"; + const SYS_KEY_SYS_STARTUP_COUNTER: &'static str = "sc"; + const SYS_KEY_SYS_SETTINGS_VERSION: &'static str = "sv"; + pub fn open_or_restore( + auth: ConfigAuth, + run_mode: ConfigMode, + ) -> RuntimeResult<(Self, SystemStoreInitState)> { + Self::open_with_name(Self::SYSDB_PATH, Self::SYSDB_COW_PATH, auth, run_mode) + } + pub fn sync_db_or_rollback(&self, rb: impl FnOnce()) -> RuntimeResult<()> { + match self.sync_db() { + Ok(()) => Ok(()), + Err(e) => { + rb(); + Err(e) + } + } + } + pub fn sync_db(&self) -> RuntimeResult<()> { + self._sync_with(Self::SYSDB_PATH, Self::SYSDB_COW_PATH) + } + pub fn open_with_name( + sysdb_name: &str, + sysdb_cow_path: &str, + auth: ConfigAuth, + run_mode: ConfigMode, + ) -> RuntimeResult<(Self, SystemStoreInitState)> { + match SDSSFileIO::open_or_create_perm_rw::(sysdb_name)? { + FileOpen::Created(new) => { + let me = Self::_new(SysConfig::new_auth(auth, run_mode)); + me._sync(new)?; + Ok((me, SystemStoreInitState::Created)) + } + FileOpen::Existing((ex, _)) => { + Self::restore_and_sync(ex, auth, run_mode, sysdb_name, sysdb_cow_path) + } + } + } +} + +impl SystemStore { + fn _sync(&self, mut f: SDSSFileIO) -> RuntimeResult<()> { + let cfg = self.system_store(); + // prepare our flat file + let mut map: DictGeneric = into_dict!( + Self::SYS_KEY_SYS => DictEntryGeneric::Map(into_dict!( + Self::SYS_KEY_SYS_SETTINGS_VERSION => Datacell::new_uint_default(cfg.host_data().settings_version() as _), + Self::SYS_KEY_SYS_STARTUP_COUNTER => Datacell::new_uint_default(cfg.host_data().startup_counter() as _), + )), + Self::SYS_KEY_AUTH => DictGeneric::new(), + ); + let auth_key = map.get_mut(Self::SYS_KEY_AUTH).unwrap(); + let auth = cfg.auth_data().read(); + let auth_key = auth_key.as_dict_mut().unwrap(); + auth_key.insert( + Self::SYS_KEY_AUTH_USERS.into(), + DictEntryGeneric::Map( + // username -> [..settings] + auth.users() + .iter() + .map(|(username, user)| { + ( + username.to_owned(), + DictEntryGeneric::Data(Datacell::new_list(vec![Datacell::new_bin( + user.key().into(), + )])), + ) + }) + .collect(), + ), + ); + // write + let buf = super::inf::enc::enc_dict_full::(&map); + f.fsynced_write(&buf) + } + fn _sync_with(&self, target: &str, cow: &str) -> RuntimeResult<()> { + let f = SDSSFileIO::create::(cow)?; + self._sync(f)?; + Fs::fs_rename_file(cow, target) + } + fn restore_and_sync( + f: SDSSFileIO, + auth: ConfigAuth, + run_mode: ConfigMode, + fname: &str, + fcow_name: &str, + ) -> RuntimeResult<(Self, SystemStoreInitState)> { + let prev_sysdb = Self::_restore(f, run_mode)?; + let state; + // see if settings have changed + if prev_sysdb + .auth_data() + .read() + .verify_user(SysAuthUser::USER_ROOT, &auth.root_key) + .is_ok() + { + state = SystemStoreInitState::Unchanged; + } else { + state = SystemStoreInitState::UpdatedRoot; } - let user_password = userdata - .remove(0) - .into_bin() - .ok_or(StorageError::SysDBCorrupted)?; - loaded_users.insert(username, SysAuthUser::new(user_password.into_boxed_slice())); + // create new config + let new_syscfg = SysConfig::new_full( + auth, + SysHostData::new( + prev_sysdb.host_data().startup_counter() + 1, + prev_sysdb.host_data().settings_version() + + !matches!(state, SystemStoreInitState::Unchanged) as u32, + ), + run_mode, + ); + let slf = Self::_new(new_syscfg); + // now sync + slf._sync_with(fname, fcow_name)?; + Ok((slf, state)) } - let sys_auth = SysAuth::new(root_key.into_boxed_slice(), loaded_users); - // load sys data - let sc = rkey(&mut sys_store, SYS_KEY_SYS_STARTUP_COUNTER, |d| { - d.into_data()?.into_uint() - })?; - let sv = rkey(&mut sys_store, SYS_KEY_SYS_SETTINGS_VERSION, |d| { - d.into_data()?.into_uint() - })?; - if !(sysdb_data.is_empty() & auth_store.is_empty() & sys_store.is_empty()) { - return Err(StorageError::SysDBCorrupted.into()); + fn _restore(mut f: SDSSFileIO, run_mode: ConfigMode) -> RuntimeResult { + let mut sysdb_data = + inf::dec::dec_dict_full::(&f.load_remaining_into_buffer()?)?; + // get our auth and sys stores + let mut auth_store = rkey( + &mut sysdb_data, + Self::SYS_KEY_AUTH, + DictEntryGeneric::into_dict, + )?; + let mut sys_store = rkey( + &mut sysdb_data, + Self::SYS_KEY_SYS, + DictEntryGeneric::into_dict, + )?; + // load auth store + let users = rkey( + &mut auth_store, + Self::SYS_KEY_AUTH_USERS, + DictEntryGeneric::into_dict, + )?; + // load users + let mut loaded_users = HashMap::new(); + for (username, userdata) in users { + let mut userdata = userdata + .into_data() + .and_then(Datacell::into_list) + .ok_or(StorageError::SysDBCorrupted)?; + if userdata.len() != 1 { + return Err(StorageError::SysDBCorrupted.into()); + } + let user_password = userdata + .remove(0) + .into_bin() + .ok_or(StorageError::SysDBCorrupted)?; + loaded_users.insert(username, SysAuthUser::new(user_password.into_boxed_slice())); + } + let sys_auth = SysAuth::new(loaded_users); + // load sys data + let sc = rkey(&mut sys_store, Self::SYS_KEY_SYS_STARTUP_COUNTER, |d| { + d.into_data()?.into_uint() + })?; + let sv = rkey(&mut sys_store, Self::SYS_KEY_SYS_SETTINGS_VERSION, |d| { + d.into_data()?.into_uint() + })?; + if !(sysdb_data.is_empty() + & auth_store.is_empty() + & sys_store.is_empty() + & sys_auth.users().contains_key(SysAuthUser::USER_ROOT)) + { + return Err(StorageError::SysDBCorrupted.into()); + } + Ok(SysConfig::new( + RwLock::new(sys_auth), + SysHostData::new(sc, sv as u32), + run_mode, + )) } - Ok(SysConfig::new( - RwLock::new(sys_auth), - SysHostData::new(sc, sv as u32), - run_mode, - )) } diff --git a/server/src/engine/storage/v1/tests.rs b/server/src/engine/storage/v1/tests.rs index c0d5fe7e..49700f1b 100644 --- a/server/src/engine/storage/v1/tests.rs +++ b/server/src/engine/storage/v1/tests.rs @@ -32,24 +32,19 @@ mod tx; mod sysdb { use { - super::{ - super::sysdb::{self, SystemStoreInitState}, - VirtualFS as VFS, + super::{super::sysdb::SystemStoreInitState, VirtualFS as VFS}, + crate::engine::{ + config::{AuthDriver, ConfigAuth, ConfigMode}, + fractal::sys_store::SystemStore, }, - crate::engine::config::{AuthDriver, ConfigAuth, ConfigMode}, }; fn open_sysdb( auth_config: ConfigAuth, sysdb_path: &str, sysdb_cow_path: &str, - ) -> sysdb::SystemStoreInit { - sysdb::open_or_reinit_system_database::( - auth_config, - ConfigMode::Dev, - sysdb_path, - sysdb_cow_path, - ) - .unwrap() + ) -> (SystemStore, SystemStoreInitState) { + SystemStore::::open_with_name(sysdb_path, sysdb_cow_path, auth_config, ConfigMode::Dev) + .unwrap() } #[test] fn open_close() { @@ -62,28 +57,28 @@ mod sysdb { }; let auth_config = ConfigAuth::new(AuthDriver::Pwd, "password12345678".into()); { - let config = open(auth_config.clone()); - assert_eq!(config.state, SystemStoreInitState::Created); + let (config, state) = open(auth_config.clone()); + assert_eq!(state, SystemStoreInitState::Created); assert!(config - .store + .system_store() .auth_data() .read() .verify_user("root", "password12345678") .is_ok()); - assert_eq!(config.store.host_data().settings_version(), 0); - assert_eq!(config.store.host_data().startup_counter(), 0); + assert_eq!(config.system_store().host_data().settings_version(), 0); + assert_eq!(config.system_store().host_data().startup_counter(), 0); } // reboot - let config = open(auth_config); - assert_eq!(config.state, SystemStoreInitState::Unchanged); + let (config, state) = open(auth_config); + assert_eq!(state, SystemStoreInitState::Unchanged); assert!(config - .store + .system_store() .auth_data() .read() .verify_user("root", "password12345678") .is_ok()); - assert_eq!(config.store.host_data().settings_version(), 0); - assert_eq!(config.store.host_data().startup_counter(), 1); + assert_eq!(config.system_store().host_data().settings_version(), 0); + assert_eq!(config.system_store().host_data().startup_counter(), 1); } #[test] fn open_change_root_password() { @@ -95,26 +90,26 @@ mod sysdb { ) }; { - let config = open(ConfigAuth::new(AuthDriver::Pwd, "password12345678".into())); - assert_eq!(config.state, SystemStoreInitState::Created); + let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password12345678".into())); + assert_eq!(state, SystemStoreInitState::Created); assert!(config - .store + .system_store() .auth_data() .read() .verify_user("root", "password12345678") .is_ok()); - assert_eq!(config.store.host_data().settings_version(), 0); - assert_eq!(config.store.host_data().startup_counter(), 0); + assert_eq!(config.system_store().host_data().settings_version(), 0); + assert_eq!(config.system_store().host_data().startup_counter(), 0); } - let config = open(ConfigAuth::new(AuthDriver::Pwd, "password23456789".into())); - assert_eq!(config.state, SystemStoreInitState::UpdatedRoot); + let (config, state) = open(ConfigAuth::new(AuthDriver::Pwd, "password23456789".into())); + assert_eq!(state, SystemStoreInitState::UpdatedRoot); assert!(config - .store + .system_store() .auth_data() .read() .verify_user("root", "password23456789") .is_ok()); - assert_eq!(config.store.host_data().settings_version(), 1); - assert_eq!(config.store.host_data().startup_counter(), 1); + assert_eq!(config.system_store().host_data().settings_version(), 1); + assert_eq!(config.system_store().host_data().startup_counter(), 1); } }