Improve error reporting infrastructure

This commit greatly improves the error reporting infrastructure
all across the `skyd` crate. Now, every error has far more
context making it easier to debug and test while also providing
helpful feedback to users. This feedback can be extremely helpful
to find out offending files and fix them; for example, by being
specific about which file is corrupted or has bad metadata.
next
Sayan Nandan 3 years ago
parent 624c7e5aaa
commit d57b0d98cf
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -9,7 +9,11 @@ All changes in this project will be noted in this file.
- `INSPECT KEYSPACE` without arguments to inspect the current keyspace - `INSPECT KEYSPACE` without arguments to inspect the current keyspace
- `INSPECT TABLE` without arguments to inspect the current table - `INSPECT TABLE` without arguments to inspect the current table
- `AUTH WHOAMI` returns the AuthID of the currently logged in user - `AUTH WHOAMI` returns the AuthID of the currently logged in user
### Improvements
- Enable multiples values to be pushed into a list at once with `lmod push` - Enable multiples values to be pushed into a list at once with `lmod push`
- (skyd) Improved error reporting infrastructure with more details and context
### Fixes ### Fixes

@ -24,18 +24,20 @@
* *
*/ */
use crate::auth::AuthProvider; use crate::{
use crate::config::ConfigurationSet; auth::AuthProvider,
use crate::config::SnapshotConfig; config::{ConfigurationSet, SnapshotConfig, SnapshotPref},
use crate::config::SnapshotPref; corestore::Corestore,
use crate::corestore::Corestore; dbnet::{self, Terminator},
use crate::dbnet::{self, Terminator}; diskstore::flock::FileLock,
use crate::diskstore::flock::FileLock; services,
use crate::services; storage::v1::sengine::SnapshotEngine,
use crate::storage::v1::sengine::SnapshotEngine; util::{
use crate::util::os::TerminationSignal; error::{Error, SkyResult},
use std::sync::Arc; os::TerminationSignal,
use std::thread::sleep; },
};
use std::{sync::Arc, thread::sleep};
use tokio::{ use tokio::{
sync::{ sync::{
broadcast, broadcast,
@ -58,7 +60,7 @@ pub async fn run(
.. ..
}: ConfigurationSet, }: ConfigurationSet,
restore_filepath: Option<String>, restore_filepath: Option<String>,
) -> Result<Corestore, String> { ) -> SkyResult<Corestore> {
// Intialize the broadcast channel // Intialize the broadcast channel
let (signal, _) = broadcast::channel(1); let (signal, _) = broadcast::channel(1);
let engine = match &snapshot { let engine = match &snapshot {
@ -68,14 +70,11 @@ pub async fn run(
let engine = Arc::new(engine); let engine = Arc::new(engine);
// restore data // restore data
services::restore_data(restore_filepath) services::restore_data(restore_filepath)
.map_err(|e| format!("Failed to restore data from backup with error: {}", e))?; .map_err(|e| Error::ioerror_extra(e, "restoring data from backup"))?;
// init the store // init the store
let db = Corestore::init_with_snapcfg(engine.clone()) let db = Corestore::init_with_snapcfg(engine.clone())?;
.map_err(|e| format!("Error while initializing database: {}", e))?;
// refresh the snapshotengine state // refresh the snapshotengine state
engine engine.parse_dir()?;
.parse_dir()
.map_err(|e| format!("Failed to init snapshot engine: {}", e))?;
let auth_provider = match auth.origin_key { let auth_provider = match auth.origin_key {
Some(key) => { Some(key) => {
let authref = db.get_store().setup_auth(); let authref = db.get_store().setup_auth();
@ -101,7 +100,8 @@ pub async fn run(
let mut server = let mut server =
dbnet::connect(ports, maxcon, db.clone(), auth_provider, signal.clone()).await?; dbnet::connect(ports, maxcon, db.clone(), auth_provider, signal.clone()).await?;
let termsig = TerminationSignal::init().map_err(|e| e.to_string())?; let termsig =
TerminationSignal::init().map_err(|e| Error::ioerror_extra(e, "binding to signals"))?;
tokio::select! { tokio::select! {
_ = server.run_server() => {}, _ = server.run_server() => {},
_ = termsig => {} _ = termsig => {}

@ -202,6 +202,21 @@ impl PortConfig {
pub const fn secure_only(&self) -> bool { pub const fn secure_only(&self) -> bool {
matches!(self, Self::SecureOnly { .. }) matches!(self, Self::SecureOnly { .. })
} }
pub fn get_description(&self) -> String {
match self {
Self::Multi { host, port, ssl } => {
format!(
"skyhash://{host}:{port} and skyhash-secure://{host}:{tlsport}",
tlsport = ssl.get_port()
)
}
Self::SecureOnly {
host,
ssl: SslOpts { port, .. },
} => format!("skyhash-secure://{host}:{port}"),
Self::InsecureOnly { host, port } => format!("skyhash://{host}:{port}"),
}
}
} }
#[derive(Deserialize, Debug, PartialEq)] #[derive(Deserialize, Debug, PartialEq)]
@ -221,6 +236,9 @@ impl SslOpts {
passfile, passfile,
} }
} }
pub const fn get_port(&self) -> u16 {
self.port
}
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]

@ -32,9 +32,8 @@ use crate::corestore::{
use crate::queryengine::parser::{Entity, OwnedEntity}; use crate::queryengine::parser::{Entity, OwnedEntity};
use crate::registry; use crate::registry;
use crate::storage; use crate::storage;
use crate::storage::v1::sengine::SnapshotEngine; use crate::storage::v1::{error::StorageEngineResult, sengine::SnapshotEngine};
use crate::util::Unwrappable; use crate::util::Unwrappable;
use crate::IoResult;
use core::borrow::Borrow; use core::borrow::Borrow;
use core::hash::Hash; use core::hash::Hash;
pub use htable::Data; pub use htable::Data;
@ -103,7 +102,7 @@ pub struct Corestore {
impl Corestore { impl Corestore {
/// This is the only function you'll ever need to either create a new database instance /// This is the only function you'll ever need to either create a new database instance
/// or restore from an earlier instance /// or restore from an earlier instance
pub fn init_with_snapcfg(sengine: Arc<SnapshotEngine>) -> IoResult<Self> { pub fn init_with_snapcfg(sengine: Arc<SnapshotEngine>) -> StorageEngineResult<Self> {
let store = storage::unflush::read_full()?; let store = storage::unflush::read_full()?;
Ok(Self::default_with_store(store, sengine)) Ok(Self::default_with_store(store, sengine))
} }

@ -35,36 +35,32 @@
//! enables this connection object/type to use methods like read_query enabling it to read and interact with queries and write //! enables this connection object/type to use methods like read_query enabling it to read and interact with queries and write
//! respones in compliance with the Skyhash protocol. //! respones in compliance with the Skyhash protocol.
use super::tcp::Connection; use crate::{
use crate::actions::ActionError; actions::{ActionError, ActionResult},
use crate::actions::ActionResult; auth::{self, AuthProvider},
use crate::auth::{self, AuthProvider}; corestore::{buffers::Integer64, Corestore},
use crate::corestore::buffers::Integer64; dbnet::{
use crate::corestore::Corestore; connection::prelude::FutureResult,
use crate::dbnet::connection::prelude::FutureResult; tcp::{BufferedSocketStream, Connection},
use crate::dbnet::tcp::BufferedSocketStream; Terminator,
use crate::dbnet::Terminator; },
use crate::protocol; protocol::{self, responses, ParseError, Query},
use crate::protocol::responses; queryengine,
use crate::protocol::ParseError; resp::Writable,
use crate::protocol::Query; IoResult,
use crate::queryengine; };
use crate::resp::Writable; use bytes::{Buf, BytesMut};
use crate::IoResult; use std::{
use bytes::Buf; future::Future,
use bytes::BytesMut; io::{Error as IoError, ErrorKind},
use libsky::TResult; marker::PhantomData,
use std::future::Future; pin::Pin,
use std::io::Error as IoError; sync::Arc,
use std::io::ErrorKind; };
use std::marker::PhantomData; use tokio::{
use std::pin::Pin; io::{AsyncReadExt, AsyncWriteExt, BufWriter},
use std::sync::Arc; sync::{mpsc, Semaphore},
use tokio::io::AsyncReadExt; };
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::sync::mpsc;
use tokio::sync::Semaphore;
pub const SIMPLE_QUERY_HEADER: [u8; 3] = [b'*', b'1', b'\n']; pub const SIMPLE_QUERY_HEADER: [u8; 3] = [b'*', b'1', b'\n'];
type QueryWithAdvance = (Query, usize); type QueryWithAdvance = (Query, usize);
@ -451,7 +447,7 @@ where
_marker: PhantomData, _marker: PhantomData,
} }
} }
pub async fn run(&mut self) -> TResult<()> { pub async fn run(&mut self) -> IoResult<()> {
while !self.terminator.is_termination_signal() { while !self.terminator.is_termination_signal() {
let try_df = tokio::select! { let try_df = tokio::select! {
tdf = self.con.read_query() => tdf, tdf = self.con.read_query() => tdf,
@ -469,7 +465,7 @@ where
self.con.close_conn_with_error(e).await?; self.con.close_conn_with_error(e).await?;
} }
Err(ActionError::IoError(e)) => { Err(ActionError::IoError(e)) => {
return Err(e.into()); return Err(e);
} }
} }
// this is only when we clear the buffer. since execute_query is not called // this is only when we clear the buffer. since execute_query is not called
@ -486,10 +482,10 @@ where
#[cfg(windows)] #[cfg(windows)]
Err(e) => match e.kind() { Err(e) => match e.kind() {
ErrorKind::ConnectionReset => return Ok(()), ErrorKind::ConnectionReset => return Ok(()),
_ => return Err(e.into()), _ => return Err(e),
}, },
#[cfg(not(windows))] #[cfg(not(windows))]
Err(e) => return Err(e.into()), Err(e) => return Err(e),
} }
} }
Ok(()) Ok(())

@ -40,17 +40,19 @@
//! //!
use self::tcp::Listener; use self::tcp::Listener;
use crate::auth::AuthProvider; use crate::{
use crate::config::PortConfig; auth::AuthProvider,
use crate::config::SslOpts; config::{PortConfig, SslOpts},
use crate::corestore::Corestore; corestore::Corestore,
use libsky::TResult; util::error::{Error, SkyResult},
use std::net::IpAddr; IoResult,
use std::sync::Arc; };
use std::{net::IpAddr, sync::Arc};
use tls::SslListener; use tls::SslListener;
use tokio::net::TcpListener; use tokio::{
use tokio::sync::Semaphore; net::TcpListener,
use tokio::sync::{broadcast, mpsc}; sync::{broadcast, mpsc, Semaphore},
};
pub mod connection; pub mod connection;
#[macro_use] #[macro_use]
mod macros; mod macros;
@ -117,11 +119,11 @@ impl BaseListener {
port: u16, port: u16,
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
signal: broadcast::Sender<()>, signal: broadcast::Sender<()>,
) -> Result<Self, String> { ) -> SkyResult<Self> {
let (terminate_tx, terminate_rx) = mpsc::channel(1); let (terminate_tx, terminate_rx) = mpsc::channel(1);
let listener = TcpListener::bind((host, port)) let listener = TcpListener::bind((host, port))
.await .await
.map_err(|e| format!("Failed to bind to port {port} with error {e}"))?; .map_err(|e| Error::ioerror_extra(e, format!("binding to port {port}")))?;
Ok(Self { Ok(Self {
db: db.clone(), db: db.clone(),
auth, auth,
@ -145,18 +147,6 @@ impl BaseListener {
} }
} }
/// This macro returns the bind address of a listener
///
/// We were just very lazy, so we just used a macro instead of a member function
macro_rules! bindaddr {
($base:ident) => {
$base
.listener
.local_addr()
.map_err(|e| format!("Failed to get bind address: {}", e))?
};
}
/// Multiple Listener Interface /// Multiple Listener Interface
/// ///
/// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate /// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate
@ -176,48 +166,35 @@ pub enum MultiListener {
impl MultiListener { impl MultiListener {
/// Create a new `InsecureOnly` listener /// Create a new `InsecureOnly` listener
pub fn new_insecure_only(base: BaseListener) -> Result<Self, String> { pub fn new_insecure_only(base: BaseListener) -> Self {
log::info!("Server started on: skyhash://{}", bindaddr!(base)); MultiListener::InsecureOnly(Listener::new(base))
Ok(MultiListener::InsecureOnly(Listener::new(base)))
} }
/// Create a new `SecureOnly` listener /// Create a new `SecureOnly` listener
pub fn new_secure_only(base: BaseListener, ssl: SslOpts) -> Result<Self, String> { pub fn new_secure_only(base: BaseListener, ssl: SslOpts) -> SkyResult<Self> {
let bindaddr = bindaddr!(base); let listener =
let slf = MultiListener::SecureOnly( SslListener::new_pem_based_ssl_connection(ssl.key, ssl.chain, base, ssl.passfile)?;
SslListener::new_pem_based_ssl_connection(ssl.key, ssl.chain, base, ssl.passfile) Ok(MultiListener::SecureOnly(listener))
.map_err(|e| format!("Couldn't bind to secure port: {}", e))?,
);
log::info!("Server started on: skyhash-secure://{}", bindaddr);
Ok(slf)
} }
/// Create a new `Multi` listener that has both a secure and an insecure listener /// Create a new `Multi` listener that has both a secure and an insecure listener
pub async fn new_multi( pub async fn new_multi(
ssl_base_listener: BaseListener, ssl_base_listener: BaseListener,
tcp_base_listener: BaseListener, tcp_base_listener: BaseListener,
ssl: SslOpts, ssl: SslOpts,
) -> Result<Self, String> { ) -> SkyResult<Self> {
let sec_bindaddr = bindaddr!(ssl_base_listener);
let insec_binaddr = bindaddr!(tcp_base_listener);
let secure_listener = SslListener::new_pem_based_ssl_connection( let secure_listener = SslListener::new_pem_based_ssl_connection(
ssl.key, ssl.key,
ssl.chain, ssl.chain,
ssl_base_listener, ssl_base_listener,
ssl.passfile, ssl.passfile,
) )?;
.map_err(|e| format!("Couldn't bind to secure port: {}", e))?;
let insecure_listener = Listener::new(tcp_base_listener); let insecure_listener = Listener::new(tcp_base_listener);
log::info!(
"Server started on: skyhash://{} and skyhash-secure://{}",
insec_binaddr,
sec_bindaddr
);
Ok(MultiListener::Multi(insecure_listener, secure_listener)) Ok(MultiListener::Multi(insecure_listener, secure_listener))
} }
/// Start the server /// Start the server
/// ///
/// The running of single and/or parallel listeners is handled by this function by /// The running of single and/or parallel listeners is handled by this function by
/// exploiting the working of async functions /// exploiting the working of async functions
pub async fn run_server(&mut self) -> TResult<()> { pub async fn run_server(&mut self) -> IoResult<()> {
match self { match self {
MultiListener::SecureOnly(secure_listener) => secure_listener.run().await, MultiListener::SecureOnly(secure_listener) => secure_listener.run().await,
MultiListener::InsecureOnly(insecure_listener) => insecure_listener.run().await, MultiListener::InsecureOnly(insecure_listener) => insecure_listener.run().await,
@ -258,7 +235,7 @@ pub async fn connect(
db: Corestore, db: Corestore,
auth: AuthProvider, auth: AuthProvider,
signal: broadcast::Sender<()>, signal: broadcast::Sender<()>,
) -> Result<MultiListener, String> { ) -> SkyResult<MultiListener> {
let climit = Arc::new(Semaphore::new(maxcon)); let climit = Arc::new(Semaphore::new(maxcon));
let base_listener_init = |host, port| { let base_listener_init = |host, port| {
BaseListener::init( BaseListener::init(
@ -270,9 +247,10 @@ pub async fn connect(
signal.clone(), signal.clone(),
) )
}; };
let description = ports.get_description();
let server = match ports { let server = match ports {
PortConfig::InsecureOnly { host, port } => { PortConfig::InsecureOnly { host, port } => {
MultiListener::new_insecure_only(base_listener_init(host, port).await?)? MultiListener::new_insecure_only(base_listener_init(host, port).await?)
} }
PortConfig::SecureOnly { host, ssl } => { PortConfig::SecureOnly { host, ssl } => {
MultiListener::new_secure_only(base_listener_init(host, ssl.port).await?, ssl)? MultiListener::new_secure_only(base_listener_init(host, ssl.port).await?, ssl)?
@ -283,5 +261,6 @@ pub async fn connect(
MultiListener::new_multi(secure_listener, insecure_listener, ssl).await? MultiListener::new_multi(secure_listener, insecure_listener, ssl).await?
} }
}; };
log::info!("Server started on {}", description);
Ok(server) Ok(server)
} }

@ -24,22 +24,22 @@
* *
*/ */
use crate::dbnet::connection::ConnectionHandler; use crate::{
use crate::dbnet::connection::ExecutorFn; dbnet::{
use crate::dbnet::BaseListener; connection::{ConnectionHandler, ExecutorFn},
use crate::dbnet::Terminator; BaseListener, Terminator,
use crate::protocol; },
protocol, IoResult,
};
use bytes::BytesMut; use bytes::BytesMut;
use core::cell::Cell;
use libsky::TResult;
use libsky::BUF_CAP; use libsky::BUF_CAP;
pub use protocol::ParseResult; pub use protocol::{ParseResult, Query};
pub use protocol::Query; use std::{cell::Cell, time::Duration};
use std::time::Duration; use tokio::{
use tokio::io::AsyncWrite; io::{AsyncWrite, BufWriter},
use tokio::io::BufWriter; net::TcpStream,
use tokio::net::TcpStream; time,
use tokio::time; };
pub trait BufferedSocketStream: AsyncWrite {} pub trait BufferedSocketStream: AsyncWrite {}
@ -112,7 +112,7 @@ impl Listener {
} }
} }
/// Accept an incoming connection /// Accept an incoming connection
async fn accept(&mut self) -> TResult<TcpStream> { async fn accept(&mut self) -> IoResult<TcpStream> {
let backoff = TcpBackoff::new(); let backoff = TcpBackoff::new();
loop { loop {
match self.base.listener.accept().await { match self.base.listener.accept().await {
@ -121,7 +121,7 @@ impl Listener {
Err(e) => { Err(e) => {
if backoff.should_disconnect() { if backoff.should_disconnect() {
// Too many retries, goodbye user // Too many retries, goodbye user
return Err(e.into()); return Err(e);
} }
} }
} }
@ -130,7 +130,7 @@ impl Listener {
} }
} }
/// Run the server /// Run the server
pub async fn run(&mut self) -> TResult<()> { pub async fn run(&mut self) -> IoResult<()> {
loop { loop {
// Take the permit first, but we won't use it right now // Take the permit first, but we won't use it right now
// that's why we will forget it // that's why we will forget it

@ -24,20 +24,21 @@
* *
*/ */
use super::connection::ConnectionHandler; use crate::{
use super::tcp::TcpBackoff; dbnet::{
use crate::dbnet::connection::ExecutorFn; connection::{ConnectionHandler, ExecutorFn},
use crate::dbnet::tcp::BufferedSocketStream; tcp::{BufferedSocketStream, Connection, TcpBackoff},
use crate::dbnet::tcp::Connection; BaseListener, Terminator,
use crate::dbnet::BaseListener; },
use crate::dbnet::Terminator; util::error::{Error, SkyResult},
use libsky::TResult; IoResult,
use openssl::pkey::PKey; };
use openssl::rsa::Rsa; use openssl::{
use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod}; pkey::PKey,
use std::fs; rsa::Rsa,
use std::io::Error as IoError; ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod},
use std::pin::Pin; };
use std::{fs, pin::Pin};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_openssl::SslStream; use tokio_openssl::SslStream;
@ -56,22 +57,17 @@ impl SslListener {
chain_file: String, chain_file: String,
base: BaseListener, base: BaseListener,
tls_passfile: Option<String>, tls_passfile: Option<String>,
) -> TResult<Self> { ) -> SkyResult<Self> {
let mut acceptor_builder = SslAcceptor::mozilla_intermediate(SslMethod::tls())?; let mut acceptor_builder = SslAcceptor::mozilla_intermediate(SslMethod::tls())?;
// cert is the same for both // cert is the same for both
acceptor_builder.set_certificate_chain_file(chain_file)?; acceptor_builder.set_certificate_chain_file(chain_file)?;
if let Some(tls_passfile) = tls_passfile { if let Some(tls_passfile) = tls_passfile {
// first read in the private key // first read in the private key
let tls_private_key = fs::read(key_file).map_err(|e: IoError| { let tls_private_key = fs::read(key_file)
format!("Failed to read TLS private key file with error: {}", e) .map_err(|e| Error::ioerror_extra(e, "reading TLS private key"))?;
})?;
// read the passphrase because the passphrase file stream was provided // read the passphrase because the passphrase file stream was provided
let tls_keyfile_stream = fs::read(tls_passfile).map_err(|e: IoError| { let tls_keyfile_stream = fs::read(tls_passfile)
format!( .map_err(|e| Error::ioerror_extra(e, "reading TLS password file"))?;
"Failed to read TLS private key passphrase file with error: {}",
e
)
})?;
// decrypt the private key // decrypt the private key
let pkey = Rsa::private_key_from_pem_passphrase(&tls_private_key, &tls_keyfile_stream)?; let pkey = Rsa::private_key_from_pem_passphrase(&tls_private_key, &tls_keyfile_stream)?;
let pkey = PKey::from_rsa(pkey)?; let pkey = PKey::from_rsa(pkey)?;
@ -91,7 +87,7 @@ impl SslListener {
base, base,
}) })
} }
async fn accept(&mut self) -> TResult<SslStream<TcpStream>> { async fn accept(&mut self) -> SkyResult<SslStream<TcpStream>> {
let backoff = TcpBackoff::new(); let backoff = TcpBackoff::new();
loop { loop {
match self.base.listener.accept().await { match self.base.listener.accept().await {
@ -115,7 +111,7 @@ impl SslListener {
backoff.spin().await; backoff.spin().await;
} }
} }
pub async fn run(&mut self) -> TResult<()> { pub async fn run(&mut self) -> IoResult<()> {
loop { loop {
// Take the permit first, but we won't use it right now // Take the permit first, but we won't use it right now
// that's why we will forget it // that's why we will forget it
@ -139,7 +135,6 @@ impl SslListener {
self.base.terminate_tx.clone(), self.base.terminate_tx.clone(),
); );
tokio::spawn(async move { tokio::spawn(async move {
log::debug!("Spawned listener task");
if let Err(e) = sslhandle.run().await { if let Err(e) = sslhandle.run().await {
log::error!("Error: {}", e); log::error!("Error: {}", e);
} }

@ -104,8 +104,7 @@ fn main() {
// important: create the pid_file just here and nowhere else because check_args can also // important: create the pid_file just here and nowhere else because check_args can also
// involve passing --help or wrong arguments which can falsely create a PID file // involve passing --help or wrong arguments which can falsely create a PID file
let pid_file = run_pre_startup_tasks(); let pid_file = run_pre_startup_tasks();
let db: Result<corestore::Corestore, String> = let db = runtime.block_on(async move { arbiter::run(cfg, restore_file).await });
runtime.block_on(async move { arbiter::run(cfg, restore_file).await });
// Make sure all background workers terminate // Make sure all background workers terminate
drop(runtime); drop(runtime);
let db = match db { let db = match db {

@ -24,12 +24,14 @@
* *
*/ */
use crate::config::BGSave; use crate::{
use crate::corestore::Corestore; config::BGSave,
use crate::dbnet::Terminator; corestore::Corestore,
use crate::registry; dbnet::Terminator,
use crate::storage::{self, v1::flush::Autoflush}; registry,
use libsky::TResult; storage::{self, v1::flush::Autoflush},
IoResult,
};
use tokio::time::{self, Duration}; use tokio::time::{self, Duration};
/// The bgsave_scheduler calls the bgsave task in `Corestore` after `every` seconds /// The bgsave_scheduler calls the bgsave task in `Corestore` after `every` seconds
@ -73,8 +75,8 @@ pub async fn bgsave_scheduler(handle: Corestore, bgsave_cfg: BGSave, mut termina
/// Run bgsave /// Run bgsave
/// ///
/// This function just hides away the BGSAVE blocking section from the _public API_ /// This function just hides away the BGSAVE blocking section from the _public API_
pub fn run_bgsave(handle: &Corestore) -> TResult<()> { pub fn run_bgsave(handle: &Corestore) -> IoResult<()> {
storage::v1::flush::flush_full(Autoflush, handle.get_store()).map_err(|e| e.into()) storage::v1::flush::flush_full(Autoflush, handle.get_store())
} }
/// This just wraps around [`_bgsave_blocking_section`] and prints nice log messages depending on the outcome /// This just wraps around [`_bgsave_blocking_section`] and prints nice log messages depending on the outcome

@ -68,9 +68,8 @@ in practice).
pub mod v1; pub mod v1;
pub mod unflush { pub mod unflush {
use crate::corestore::memstore::Memstore; use crate::{corestore::memstore::Memstore, storage::v1::error::StorageEngineResult};
use crate::IoResult; pub fn read_full() -> StorageEngineResult<Memstore> {
pub fn read_full() -> IoResult<Memstore> {
super::v1::unflush::read_full() super::v1::unflush::read_full()
} }
} }

@ -0,0 +1,92 @@
/*
* Created on Sat Mar 26 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::corestore::memstore::ObjectID;
use core::fmt;
use std::io::Error as IoError;
pub type StorageEngineResult<T> = Result<T, StorageEngineError>;
pub trait ErrorContext<T> {
/// Provide some context to an error
fn map_err_context(self, extra: impl ToString) -> StorageEngineResult<T>;
}
impl<T> ErrorContext<T> for Result<T, IoError> {
fn map_err_context(self, extra: impl ToString) -> StorageEngineResult<T> {
self.map_err(|e| StorageEngineError::ioerror_extra(e, extra.to_string()))
}
}
#[derive(Debug)]
pub enum StorageEngineError {
/// An I/O Error
IoError(IoError),
/// An I/O Error with extra context
IoErrorExtra(IoError, String),
/// A corrupted file
CorruptedFile(String),
/// The file contains bad metadata
BadMetadata(String),
}
impl StorageEngineError {
pub fn corrupted_partmap(ksid: &ObjectID) -> Self {
Self::CorruptedFile(format!("{ksid}/PARTMAP", ksid = unsafe { ksid.as_str() }))
}
pub fn bad_metadata_in_table(ksid: &ObjectID, table: &ObjectID) -> Self {
unsafe {
Self::CorruptedFile(format!(
"{ksid}/{table}",
ksid = ksid.as_str(),
table = table.as_str()
))
}
}
pub fn corrupted_preload() -> Self {
Self::CorruptedFile("PRELOAD".into())
}
pub fn ioerror_extra(ioe: IoError, extra: impl ToString) -> Self {
Self::IoErrorExtra(ioe, extra.to_string())
}
}
impl From<IoError> for StorageEngineError {
fn from(ioe: IoError) -> Self {
Self::IoError(ioe)
}
}
impl fmt::Display for StorageEngineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::IoError(ioe) => write!(f, "I/O error: {}", ioe),
Self::IoErrorExtra(ioe, extra) => write!(f, "I/O error while {extra}: {ioe}"),
Self::CorruptedFile(cfile) => write!(f, "file `{cfile}` is corrupted"),
Self::BadMetadata(file) => write!(f, "bad metadata in file `{file}`"),
}
}
}

@ -99,13 +99,6 @@ macro_rules! concat_str {
}}}; }}};
} }
#[macro_export]
macro_rules! bad_data {
() => {
std::io::Error::from(std::io::ErrorKind::InvalidData)
};
}
macro_rules! read_dir_to_col { macro_rules! read_dir_to_col {
($root:expr) => { ($root:expr) => {
std::fs::read_dir($root)? std::fs::read_dir($root)?

@ -64,6 +64,7 @@ use std::io::Write;
mod macros; mod macros;
// endof do not mess // endof do not mess
pub mod bytemarks; pub mod bytemarks;
pub mod error;
pub mod flush; pub mod flush;
pub mod interface; pub mod interface;
pub mod iter; pub mod iter;

@ -34,12 +34,11 @@
use crate::corestore::memstore::Memstore; use crate::corestore::memstore::Memstore;
use crate::corestore::memstore::ObjectID; use crate::corestore::memstore::ObjectID;
use crate::storage::v1::error::{StorageEngineError, StorageEngineResult};
use crate::IoResult; use crate::IoResult;
use core::ptr; use core::ptr;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Write; use std::io::Write;
pub type LoadedPartfile = HashMap<ObjectID, (u8, u8)>; pub type LoadedPartfile = HashMap<ObjectID, (u8, u8)>;
@ -70,10 +69,10 @@ pub(super) fn raw_generate_preload<W: Write>(w: &mut W, store: &Memstore) -> IoR
} }
/// Reads the preload file and returns a set /// Reads the preload file and returns a set
pub(super) fn read_preload_raw(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>> { pub(super) fn read_preload_raw(preload: Vec<u8>) -> StorageEngineResult<HashSet<ObjectID>> {
if preload.len() < 16 { if preload.len() < 16 {
// nah, this is a bad disk file // nah, this is a bad disk file
return Err(IoError::from(ErrorKind::UnexpectedEof)); return Err(StorageEngineError::corrupted_preload());
} }
// first read in the meta segment // first read in the meta segment
unsafe { unsafe {
@ -85,21 +84,10 @@ pub(super) fn read_preload_raw(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>>
META_SEGMENT_LE => { META_SEGMENT_LE => {
super::iter::endian_set_little(); super::iter::endian_set_little();
} }
_ => return Err(IoError::from(ErrorKind::Unsupported)), _ => return Err(StorageEngineError::BadMetadata("preload".into())),
} }
} }
// all checks complete; time to decode // all checks complete; time to decode
let ret = super::de::deserialize_set_ctype(&preload[1..]); super::de::deserialize_set_ctype(&preload[1..])
match ret { .ok_or_else(StorageEngineError::corrupted_preload)
Some(ret) => Ok(ret),
_ => Err(IoError::from(ErrorKind::InvalidData)),
}
}
/// Reads the partfile and returns a set
pub fn read_partfile_raw(partfile: Vec<u8>) -> IoResult<LoadedPartfile> {
match super::de::deserialize_set_ctype_bytemark(&partfile) {
Some(s) => Ok(s),
None => Err(IoError::from(ErrorKind::InvalidData)),
}
} }

@ -51,6 +51,7 @@ pub static SNAP_MATCH: Lazy<Regex, fn() -> Regex> = Lazy::new(|| {
Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)$").unwrap() Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)$").unwrap()
}); });
#[derive(Debug)]
pub enum SnapshotEngineError { pub enum SnapshotEngineError {
Io(IoError), Io(IoError),
Engine(&'static str), Engine(&'static str),

@ -29,26 +29,23 @@
//! Routines for unflushing data //! Routines for unflushing data
use super::bytemarks; use super::bytemarks;
use crate::corestore::memstore::Keyspace; use crate::{
use crate::corestore::memstore::Memstore; corestore::{
use crate::corestore::memstore::ObjectID; memstore::{Keyspace, Memstore, ObjectID, SystemKeyspace, SYSTEM},
use crate::corestore::memstore::SystemKeyspace; table::{SystemTable, Table},
use crate::corestore::memstore::SYSTEM; },
use crate::corestore::table::SystemTable; storage::v1::{
use crate::corestore::table::Table; de::DeserializeInto,
use crate::storage::v1::de::DeserializeInto; error::{ErrorContext, StorageEngineError, StorageEngineResult},
use crate::storage::v1::flush::Autoflush; flush::Autoflush,
use crate::storage::v1::interface::DIR_KSROOT; interface::DIR_KSROOT,
use crate::storage::v1::preload::LoadedPartfile; preload::LoadedPartfile,
use crate::storage::v1::Coremap; Coremap,
use crate::util::Wrapper; },
use crate::IoResult; util::Wrapper,
};
use core::mem::transmute; use core::mem::transmute;
use std::fs; use std::{fs, path::Path, sync::Arc};
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;
use std::sync::Arc;
type PreloadSet = std::collections::HashSet<ObjectID>; type PreloadSet = std::collections::HashSet<ObjectID>;
const PRELOAD_PATH: &str = "data/ks/PRELOAD"; const PRELOAD_PATH: &str = "data/ks/PRELOAD";
@ -56,15 +53,15 @@ const PRELOAD_PATH: &str = "data/ks/PRELOAD";
/// A keyspace that can be restored from disk storage /// A keyspace that can be restored from disk storage
pub trait UnflushableKeyspace: Sized { pub trait UnflushableKeyspace: Sized {
/// Unflush routine for a keyspace /// Unflush routine for a keyspace
fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> IoResult<Self>; fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> StorageEngineResult<Self>;
} }
impl UnflushableKeyspace for Keyspace { impl UnflushableKeyspace for Keyspace {
fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> IoResult<Self> { fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> StorageEngineResult<Self> {
let ks: Coremap<ObjectID, Arc<Table>> = Coremap::with_capacity(partmap.len()); let ks: Coremap<ObjectID, Arc<Table>> = Coremap::with_capacity(partmap.len());
for (tableid, (table_storage_type, model_code)) in partmap.into_iter() { for (tableid, (table_storage_type, model_code)) in partmap.into_iter() {
if table_storage_type > 1 { if table_storage_type > 1 {
return Err(bad_data!()); return Err(StorageEngineError::bad_metadata_in_table(ksid, &tableid));
} }
let is_volatile = table_storage_type == bytemarks::BYTEMARK_STORAGE_VOLATILE; let is_volatile = table_storage_type == bytemarks::BYTEMARK_STORAGE_VOLATILE;
let tbl = self::read_table::<Table>(ksid, &tableid, is_volatile, model_code)?; let tbl = self::read_table::<Table>(ksid, &tableid, is_volatile, model_code)?;
@ -75,11 +72,11 @@ impl UnflushableKeyspace for Keyspace {
} }
impl UnflushableKeyspace for SystemKeyspace { impl UnflushableKeyspace for SystemKeyspace {
fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> IoResult<Self> { fn unflush_keyspace(partmap: LoadedPartfile, ksid: &ObjectID) -> StorageEngineResult<Self> {
let ks: Coremap<ObjectID, Wrapper<SystemTable>> = Coremap::with_capacity(partmap.len()); let ks: Coremap<ObjectID, Wrapper<SystemTable>> = Coremap::with_capacity(partmap.len());
for (tableid, (table_storage_type, model_code)) in partmap.into_iter() { for (tableid, (table_storage_type, model_code)) in partmap.into_iter() {
if table_storage_type > 1 { if table_storage_type > 1 {
return Err(bad_data!()); return Err(StorageEngineError::bad_metadata_in_table(ksid, &tableid));
} }
let is_volatile = table_storage_type == bytemarks::BYTEMARK_STORAGE_VOLATILE; let is_volatile = table_storage_type == bytemarks::BYTEMARK_STORAGE_VOLATILE;
let tbl = self::read_table::<SystemTable>(ksid, &tableid, is_volatile, model_code)?; let tbl = self::read_table::<SystemTable>(ksid, &tableid, is_volatile, model_code)?;
@ -92,12 +89,20 @@ impl UnflushableKeyspace for SystemKeyspace {
/// Tables that can be restored from disk storage /// Tables that can be restored from disk storage
pub trait UnflushableTable: Sized { pub trait UnflushableTable: Sized {
/// Procedure to restore (deserialize) table from disk storage /// Procedure to restore (deserialize) table from disk storage
fn unflush_table(filepath: impl AsRef<Path>, model_code: u8, volatile: bool) -> IoResult<Self>; fn unflush_table(
filepath: impl AsRef<Path>,
model_code: u8,
volatile: bool,
) -> StorageEngineResult<Self>;
} }
#[allow(clippy::transmute_int_to_bool)] #[allow(clippy::transmute_int_to_bool)]
impl UnflushableTable for Table { impl UnflushableTable for Table {
fn unflush_table(filepath: impl AsRef<Path>, model_code: u8, volatile: bool) -> IoResult<Self> { fn unflush_table(
filepath: impl AsRef<Path>,
model_code: u8,
volatile: bool,
) -> StorageEngineResult<Self> {
let ret = match model_code { let ret = match model_code {
// pure KVEBlob: [0, 3] // pure KVEBlob: [0, 3]
x if x < 4 => { x if x < 4 => {
@ -122,32 +127,50 @@ impl UnflushableTable for Table {
}; };
Table::new_kve_listmap_with_data(data, volatile, k_enc, v_enc) Table::new_kve_listmap_with_data(data, volatile, k_enc, v_enc)
} }
_ => return Err(IoError::from(ErrorKind::Unsupported)), _ => {
return Err(StorageEngineError::BadMetadata(
filepath.as_ref().to_string_lossy().to_string(),
))
}
}; };
Ok(ret) Ok(ret)
} }
} }
impl UnflushableTable for SystemTable { impl UnflushableTable for SystemTable {
fn unflush_table(filepath: impl AsRef<Path>, model_code: u8, volatile: bool) -> IoResult<Self> { fn unflush_table(
filepath: impl AsRef<Path>,
model_code: u8,
volatile: bool,
) -> StorageEngineResult<Self> {
match model_code { match model_code {
0 => { 0 => {
// this is the authmap // this is the authmap
let authmap = decode(filepath, volatile)?; let authmap = decode(filepath, volatile)?;
Ok(SystemTable::new_auth(Arc::new(authmap))) Ok(SystemTable::new_auth(Arc::new(authmap)))
} }
_ => Err(IoError::from(ErrorKind::Unsupported)), _ => Err(StorageEngineError::BadMetadata(
filepath.as_ref().to_string_lossy().to_string(),
)),
} }
} }
} }
#[inline(always)] #[inline(always)]
fn decode<T: DeserializeInto>(filepath: impl AsRef<Path>, volatile: bool) -> IoResult<T> { fn decode<T: DeserializeInto>(
filepath: impl AsRef<Path>,
volatile: bool,
) -> StorageEngineResult<T> {
if volatile { if volatile {
Ok(T::new_empty()) Ok(T::new_empty())
} else { } else {
let data = fs::read(filepath)?; let data = fs::read(filepath.as_ref()).map_err_context(format!(
super::de::deserialize_into(&data).ok_or_else(|| bad_data!()) "reading file {}",
filepath.as_ref().to_string_lossy()
))?;
super::de::deserialize_into(&data).ok_or_else(|| {
StorageEngineError::CorruptedFile(filepath.as_ref().to_string_lossy().to_string())
})
} }
} }
@ -160,27 +183,31 @@ pub fn read_table<T: UnflushableTable>(
tblid: &ObjectID, tblid: &ObjectID,
volatile: bool, volatile: bool,
model_code: u8, model_code: u8,
) -> IoResult<T> { ) -> StorageEngineResult<T> {
let filepath = unsafe { concat_path!(DIR_KSROOT, ksid.as_str(), tblid.as_str()) }; let filepath = unsafe { concat_path!(DIR_KSROOT, ksid.as_str(), tblid.as_str()) };
let tbl = T::unflush_table(filepath, model_code, volatile)?; let tbl = T::unflush_table(filepath, model_code, volatile)?;
Ok(tbl) Ok(tbl)
} }
/// Read an entire keyspace into a Coremap. You'll need to initialize the rest /// Read an entire keyspace into a Coremap. You'll need to initialize the rest
pub fn read_keyspace<K: UnflushableKeyspace>(ksid: &ObjectID) -> IoResult<K> { pub fn read_keyspace<K: UnflushableKeyspace>(ksid: &ObjectID) -> StorageEngineResult<K> {
let partmap = self::read_partmap(ksid)?; let partmap = self::read_partmap(ksid)?;
K::unflush_keyspace(partmap, ksid) K::unflush_keyspace(partmap, ksid)
} }
/// Read the `PARTMAP` for a given keyspace /// Read the `PARTMAP` for a given keyspace
pub fn read_partmap(ksid: &ObjectID) -> IoResult<LoadedPartfile> { pub fn read_partmap(ksid: &ObjectID) -> StorageEngineResult<LoadedPartfile> {
let filepath = unsafe { concat_path!(DIR_KSROOT, ksid.as_str(), "PARTMAP") }; let ksid_str = unsafe { ksid.as_str() };
super::preload::read_partfile_raw(fs::read(filepath)?) let filepath = concat_path!(DIR_KSROOT, ksid_str, "PARTMAP");
let partmap_raw = fs::read(&filepath)
.map_err_context(format!("while reading {}", filepath.to_string_lossy()))?;
super::de::deserialize_set_ctype_bytemark(&partmap_raw)
.ok_or_else(|| StorageEngineError::corrupted_partmap(ksid))
} }
/// Read the `PRELOAD` /// Read the `PRELOAD`
pub fn read_preload() -> IoResult<PreloadSet> { pub fn read_preload() -> StorageEngineResult<PreloadSet> {
let read = fs::read(PRELOAD_PATH)?; let read = fs::read(PRELOAD_PATH).map_err_context("reading PRELOAD")?;
super::preload::read_preload_raw(read) super::preload::read_preload_raw(read)
} }
@ -189,7 +216,7 @@ pub fn read_preload() -> IoResult<PreloadSet> {
/// If this is a new instance an empty store is returned while the directory tree /// If this is a new instance an empty store is returned while the directory tree
/// is also created. If this is an already initialized instance then the store /// is also created. If this is an already initialized instance then the store
/// is read and returned (and any possible errors that are encountered are returned) /// is read and returned (and any possible errors that are encountered are returned)
pub fn read_full() -> IoResult<Memstore> { pub fn read_full() -> StorageEngineResult<Memstore> {
if is_new_instance() { if is_new_instance() {
log::trace!("Detected new instance. Creating data directory"); log::trace!("Detected new instance. Creating data directory");
/* /*

@ -0,0 +1,90 @@
/*
* Created on Sat Mar 26 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::storage::v1::{error::StorageEngineError, sengine::SnapshotEngineError};
use openssl::{error::ErrorStack as SslErrorStack, ssl::Error as SslError};
use std::{fmt, io::Error as IoError};
pub type SkyResult<T> = Result<T, Error>;
#[derive(Debug)]
pub enum Error {
Storage(StorageEngineError),
IoError(IoError),
IoErrorExtra(IoError, String),
OtherError(String),
TlsError(SslError),
SnapshotEngineError(SnapshotEngineError),
}
impl Error {
pub fn ioerror_extra(ioe: IoError, extra: impl ToString) -> Self {
Self::IoErrorExtra(ioe, extra.to_string())
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Storage(serr) => write!(f, "Storage engine error: {}", serr),
Self::IoError(nerr) => write!(f, "I/O error: {}", nerr),
Self::IoErrorExtra(ioe, extra) => write!(f, "I/O error while {extra}: {ioe}"),
Self::OtherError(oerr) => write!(f, "Error: {}", oerr),
Self::TlsError(terr) => write!(f, "TLS error: {}", terr),
Self::SnapshotEngineError(snaperr) => write!(f, "Snapshot engine error: {snaperr}"),
}
}
}
impl From<IoError> for Error {
fn from(ioe: IoError) -> Self {
Self::IoError(ioe)
}
}
impl From<StorageEngineError> for Error {
fn from(see: StorageEngineError) -> Self {
Self::Storage(see)
}
}
impl From<SslError> for Error {
fn from(sslerr: SslError) -> Self {
Self::TlsError(sslerr)
}
}
impl From<SslErrorStack> for Error {
fn from(estack: SslErrorStack) -> Self {
Self::TlsError(estack.into())
}
}
impl From<SnapshotEngineError> for Error {
fn from(snaperr: SnapshotEngineError) -> Self {
Self::SnapshotEngineError(snaperr)
}
}

@ -28,6 +28,7 @@
mod macros; mod macros;
pub mod compiler; pub mod compiler;
pub mod os; pub mod os;
pub mod error;
use crate::actions::{ActionError, ActionResult}; use crate::actions::{ActionError, ActionResult};
use crate::protocol::responses::groups; use crate::protocol::responses::groups;
use core::fmt::Debug; use core::fmt::Debug;

Loading…
Cancel
Save