diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 88f70cb7..c11617dd 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -130,9 +130,9 @@ pub struct ConfigKeySnapshot { /// pub struct SnapshotPref { /// Capture a snapshot `every` seconds - every: u64, + pub every: u64, /// The maximum numeber of snapshots to be kept - atmost: usize, + pub atmost: usize, } impl SnapshotPref { diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index a4a1f692..7f72d0bf 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -23,6 +23,7 @@ use crate::config::BGSave; use crate::config::SnapshotConfig; +use crate::config::SnapshotPref; use crate::diskstore; use crate::protocol::Connection; use crate::protocol::Query; @@ -55,6 +56,11 @@ pub struct CoreDB { /// This should **not be changed** during runtime, and should only be initialized when `CoreDB` /// is first initialized background_tasks: usize, + /// The number of snapshots that are to be kept at the most + /// + /// If this is set to Some(0), then all the snapshots will be kept. Otherwise, if it is set to + /// Some(n), n ∈ Z+ — then _n_ snapshots will be kept at the maximum. If set to `None`, snapshotting is disabled. + pub snap_count: Option, } /// A shared _state_ @@ -150,6 +156,11 @@ impl CoreDB { } } + /// Check if snapshotting is enabled + pub fn is_snapshot_enabled(&self) -> bool { + self.snap_count.is_some() + } + /// Returns the expected `Arc::strong_count` for the `CoreDB` object pub const fn expected_strong_count(&self) -> usize { self.background_tasks + 1 @@ -172,8 +183,15 @@ impl CoreDB { /// If it is - it restores the data. Otherwise it creates a new in-memory table pub fn new(bgsave: BGSave, snapshot_cfg: SnapshotConfig) -> TResult { let coretable = diskstore::get_saved(Some(PERSIST_FILE.to_path_buf()))?; - let background_tasks: usize = - snapshot_cfg.is_enabled() as usize + !bgsave.is_disabled() as usize; + let mut background_tasks: usize = 0; + if !bgsave.is_disabled() { + background_tasks += 1; + } + let mut snap_count = None; + if let SnapshotConfig::Enabled(SnapshotPref { every: _, atmost }) = snapshot_cfg { + background_tasks += 1; + snap_count = Some(atmost); + } let db = if let Some(coretable) = coretable { CoreDB { shared: Arc::new(Shared { @@ -185,9 +203,10 @@ impl CoreDB { snapshot_service: Notify::new(), }), background_tasks, + snap_count, } } else { - CoreDB::new_empty(background_tasks) + CoreDB::new_empty(background_tasks, snap_count) }; // Spawn the background save task in a separate task tokio::spawn(diskstore::bgsave_scheduler(db.clone(), bgsave)); @@ -199,7 +218,7 @@ impl CoreDB { Ok(db) } /// Create an empty in-memory table - pub fn new_empty(background_tasks: usize) -> Self { + pub fn new_empty(background_tasks: usize, snap_count: Option) -> Self { CoreDB { shared: Arc::new(Shared { bgsave_task: Notify::new(), @@ -210,6 +229,7 @@ impl CoreDB { snapshot_service: Notify::new(), }), background_tasks, + snap_count, } } /// Acquire a write lock diff --git a/server/src/diskstore/snapshot.rs b/server/src/diskstore/snapshot.rs index 2955610b..3538d0d1 100644 --- a/server/src/diskstore/snapshot.rs +++ b/server/src/diskstore/snapshot.rs @@ -142,18 +142,21 @@ impl<'a> SnapshotEngine<'a> { Utc::now().format("%Y%m%d-%H%M%S.snapshot").to_string() } /// Create a snapshot - pub fn mksnap(&mut self) -> bool { + /// + /// This returns `Some(true)` if everything went well, otherwise it returns + /// `Some(false)`. If the database is about to terminate, it returns `None` + pub fn mksnap(&mut self) -> Option { let rlock = self.dbref.acquire_read(); if rlock.terminate { // The database is shutting down, don't create a snapshot - return false; + return None; } let mut snapname = PathBuf::new(); snapname.push(&self.snap_dir); snapname.push(self.get_snapname()); if let Err(e) = diskstore::flush_data(&snapname, &rlock.get_ref()) { log::error!("Snapshotting failed with error: '{}'", e); - return true; + return Some(false); } else { log::info!("Successfully created snapshot"); } @@ -166,11 +169,12 @@ impl<'a> SnapshotEngine<'a> { old_snapshot.to_string_lossy(), e ); + return Some(false); } else { log::info!("Successfully removed old snapshot"); } } - true + Some(true) } /// Delete all snapshots pub fn clearall(&mut self) -> TResult<()> { @@ -188,7 +192,7 @@ impl<'a> SnapshotEngine<'a> { #[test] fn test_snapshot() { let ourdir = "TEST_SS"; - let db = CoreDB::new_empty(3); + let db = CoreDB::new_empty(3, None); let mut write = db.acquire_write(); let _ = write.get_mut_ref().insert( String::from("ohhey"), @@ -210,16 +214,16 @@ fn test_snapshot() { #[test] fn test_pre_existing_snapshots() { let ourdir = "TEST_PX_SS"; - let db = CoreDB::new_empty(3); + let db = CoreDB::new_empty(3, None); let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap(); // Keep sleeping to ensure the time difference - assert!(snapengine.mksnap()); + assert!(snapengine.mksnap().unwrap().eq(&true)); std::thread::sleep(Duration::from_secs(2)); - assert!(snapengine.mksnap()); + assert!(snapengine.mksnap().unwrap().eq(&true)); std::thread::sleep(Duration::from_secs(2)); - assert!(snapengine.mksnap()); + assert!(snapengine.mksnap().unwrap().eq(&true)); std::thread::sleep(Duration::from_secs(2)); - assert!(snapengine.mksnap()); + assert!(snapengine.mksnap().unwrap().eq(&true)); // Now close everything down drop(snapengine); let mut snapengine = SnapshotEngine::new(4, &db, Some(ourdir)).unwrap(); @@ -261,7 +265,7 @@ pub async fn snapshot_service(handle: CoreDB, ss_config: SnapshotConfig) { } }; while !handle.shared.is_termsig() { - if sengine.mksnap() { + if sengine.mksnap().is_some() { tokio::select! { _ = time::delay_until(time::Instant::now() + duration) => {}, _ = handle.shared.bgsave_task.notified() => {} diff --git a/server/src/kvengine/mksnap.rs b/server/src/kvengine/mksnap.rs index 949c7dc1..1cd834b9 100644 --- a/server/src/kvengine/mksnap.rs +++ b/server/src/kvengine/mksnap.rs @@ -20,14 +20,71 @@ */ use crate::coredb::CoreDB; +use crate::diskstore::snapshot::SnapshotEngine; use crate::protocol::{responses, ActionGroup, Connection}; use crate::resp::GroupBegin; +use libtdb::terrapipe::RespCodes; use libtdb::TResult; +use std::hint::unreachable_unchecked; /// Create a snapshot /// -/// If there is a second argument: a separate snapshot is created. Otherwise -/// if the action is just `MKSNAP`, then the `maxtop` parameter is adhered to pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { - todo!() + if !handle.is_snapshot_enabled() { + // Since snapshotting is disabled, we can't create a snapshot! + // We'll just return an error returning the same + let error = "Snapshotting is disabled"; + con.write_response(GroupBegin(1)).await?; + let error = RespCodes::OtherError(Some(error.to_string())); + return con.write_response(error).await; + } + let howmany = act.howmany(); + if howmany == 0 { + // We will just follow the standard convention of creating snapshots + let mut was_engine_error = false; + let mut outerror = None; + { + let snapengine = SnapshotEngine::new( + handle + .snap_count + .unwrap_or_else(|| unsafe { unreachable_unchecked() }), + &handle, + None, + ); + if snapengine.is_err() { + was_engine_error = true; + } + let mut snapengine = snapengine.unwrap_or_else(|_| unsafe { unreachable_unchecked() }); + outerror = snapengine.mksnap(); + } + if was_engine_error { + return con + .write_response(responses::fresp::R_SERVER_ERR.to_owned()) + .await; + } + if let Some(val) = outerror { + if val { + // Snapshotting succeeded, return Okay + return con + .write_response(responses::fresp::R_OKAY.to_owned()) + .await; + } else { + // Nope, something happened while creating a snapshot + // return a server error + return con + .write_response(responses::fresp::R_SERVER_ERR.to_owned()) + .await; + } + } else { + // We shouldn't ever reach here if all our logic is correct + // but if we do, something is wrong with the runtime + con.write_response(GroupBegin(1)).await?; + let error = RespCodes::OtherError(Some("access-after-termsig".to_owned())); + return con.write_response(error).await; + } + } else { + return con + .write_response(responses::fresp::R_ACTION_ERR.to_owned()) + .await; + } } diff --git a/server/src/kvengine/mod.rs b/server/src/kvengine/mod.rs index 844695c2..e9f733bd 100644 --- a/server/src/kvengine/mod.rs +++ b/server/src/kvengine/mod.rs @@ -31,7 +31,7 @@ pub mod get; pub mod jget; pub mod keylen; pub mod mget; -mod mksnap; +pub mod mksnap; pub mod mset; pub mod mupdate; pub mod set; diff --git a/server/src/queryengine/mod.rs b/server/src/queryengine/mod.rs index 0a028fb1..29a52192 100644 --- a/server/src/queryengine/mod.rs +++ b/server/src/queryengine/mod.rs @@ -61,6 +61,8 @@ mod tags { pub const TAG_USET: &'static str = "USET"; /// `KEYLEN` action tag pub const TAG_KEYLEN: &'static str = "KEYLEN"; + /// `MKSNAP` action tag + pub const TAG_MKSNAP: &'static str = "MKSNAP"; } /// Execute a simple(*) query @@ -90,6 +92,7 @@ pub async fn execute_simple(db: &CoreDB, con: &mut Connection, buf: ActionGroup) tags::TAG_FLUSHDB => kvengine::flushdb::flushdb(db, con, buf).await?, tags::TAG_USET => kvengine::uset::uset(db, con, buf).await?, tags::TAG_KEYLEN => kvengine::keylen::keylen(db, con, buf).await?, + tags::TAG_MKSNAP => kvengine::mksnap::mksnap(db, con, buf).await?, _ => { con.write_response(responses::fresp::R_UNKNOWN_ACTION.to_owned()) .await?