diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index 41149915..1d50ba47 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -348,6 +348,19 @@ impl CoreDB { snapcfg, } } + pub fn block_on_process_exit(&mut self) { + while Arc::strong_count(&self.shared) == self.expected_strong_count() { + // Acquire a lock to prevent anyone from writing something + let mut coretable = self.shared.table.write(); + coretable.terminate = true; + // Drop the write lock first to avoid BGSAVE ending up in failing + // to get a read lock + drop(coretable); + // Notify the background tasks to quit + self.shared.bgsave_task.notify_one(); + self.shared.snapshot_service.notify_one(); + } + } /// Check if the database object is poisoned, that is, data couldn't be written /// to disk once, and hence, we have disabled write operations pub fn is_poisoned(&self) -> bool { diff --git a/server/src/dbnet/mod.rs b/server/src/dbnet/mod.rs index 68da7107..05220b39 100644 --- a/server/src/dbnet/mod.rs +++ b/server/src/dbnet/mod.rs @@ -61,7 +61,6 @@ use tokio::sync::Semaphore; use tokio::sync::{broadcast, mpsc}; pub mod connection; mod tls; -use crate::flush_db; /// Responsible for gracefully shutting down the server instead of dying randomly // Sounds very sci-fi ;) @@ -95,8 +94,6 @@ impl Terminator { } } -use std::io::{self, prelude::*}; - /// Multiple Listener Interface /// /// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate @@ -318,7 +315,7 @@ pub async fn run( snapshot_cfg: SnapshotConfig, sig: impl Future, restore_filepath: Option, -) { +) -> CoreDB { let (signal, _) = broadcast::channel(1); let (terminate_tx, terminate_rx) = mpsc::channel(1); let (db, lock) = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) { @@ -394,23 +391,7 @@ pub async fn run( log::error!("Failed to release lock on data file with '{}'", e); process::exit(0x100); } - if let Err(e) = flush_db!(db) { - log::error!("Failed to flush data to disk with '{}'", e); - loop { - // Keep looping until we successfully write the in-memory table to disk - log::warn!("Press enter to try again..."); - io::stdout().flush().unwrap(); - io::stdin().read(&mut [0]).unwrap(); - if let Ok(_) = flush_db!(db) { - log::info!("Successfully saved data to disk"); - break; - } else { - continue; - } - } - } else { - log::info!("Successfully saved data to disk"); - } + db } /// This is a **test only** function diff --git a/server/src/diskstore/flock.rs b/server/src/diskstore/flock.rs index e5dbc7e0..7a90ad23 100644 --- a/server/src/diskstore/flock.rs +++ b/server/src/diskstore/flock.rs @@ -92,6 +92,9 @@ impl FileLock { } /// Write something to this file pub fn write(&mut self, bytes: &[u8]) -> Result<()> { + // Truncate the file + self.file.set_len(0)?; + // Now write to the file self.file.write_all(bytes) } } @@ -136,7 +139,7 @@ mod tests { #[cfg(windows)] #[test] fn test_windows_lock_and_then_unlock() { - let mut file = FileLock::lock("data4.bin").unwrap(); + let mut file = FileLock::lock("data4.bin").unwrap(); file.unlock().unwrap(); drop(file); let mut file2 = FileLock::lock("data4.bin").unwrap(); @@ -154,7 +157,7 @@ mod __sys { use std::io::{Error, Result}; use std::mem; use std::os::windows::io::AsRawHandle; - use winapi::shared::minwindef::{DWORD}; + use winapi::shared::minwindef::DWORD; use winapi::um::fileapi::{LockFileEx, UnlockFile}; use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY}; /// Obtain an exclusive lock and **block** until we acquire it @@ -182,9 +185,7 @@ mod __sys { } /// Attempt to unlock a file pub fn unlock_file(file: &File) -> Result<()> { - let ret = unsafe { - UnlockFile(file.as_raw_handle(), 0, 0, !0, !0) - }; + let ret = unsafe { UnlockFile(file.as_raw_handle(), 0, 0, !0, !0) }; if ret == 0 { Err(Error::last_os_error()) } else { diff --git a/server/src/main.rs b/server/src/main.rs index d553935a..1d78e0ae 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,6 +33,7 @@ use crate::config::BGSave; use crate::config::PortConfig; use crate::config::SnapshotConfig; +use std::io::{self, prelude::*}; mod config; use std::env; mod admin; @@ -70,23 +71,44 @@ fn main() { .init(); // Start the server which asynchronously waits for a CTRL+C signal // which will safely shut down the server - tokio::runtime::Builder::new_multi_thread() + let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("server") .enable_all() .build() - .unwrap() - .block_on(async { - let (tcplistener, bgsave_config, snapshot_config, restore_filepath) = - check_args_and_get_cfg().await; - run( - tcplistener, - bgsave_config, - snapshot_config, - signal::ctrl_c(), - restore_filepath, - ) - .await - }); + .unwrap(); + let db = runtime.block_on(async { + let (tcplistener, bgsave_config, snapshot_config, restore_filepath) = + check_args_and_get_cfg().await; + let mut db = run( + tcplistener, + bgsave_config, + snapshot_config, + signal::ctrl_c(), + restore_filepath, + ) + .await; + db.block_on_process_exit(); + db + }); + // Make sure all background workers terminate + drop(runtime); + if let Err(e) = flush_db!(db) { + log::error!("Failed to flush data to disk with '{}'", e); + loop { + // Keep looping until we successfully write the in-memory table to disk + log::warn!("Press enter to try again..."); + io::stdout().flush().unwrap(); + io::stdin().read(&mut [0]).unwrap(); + if let Ok(_) = flush_db!(db) { + log::info!("Successfully saved data to disk"); + break; + } else { + continue; + } + } + } else { + log::info!("Successfully saved data to disk"); + } terminal::write_info("Goodbye :)\n").unwrap(); }