Wait for all workers to drop and release flock(s)

This commit ensures that the workers exit before attempting a flush_db
operation. Only after block_on_process_exit finishes we return `db`.
Now we run a simple flush_db operation knowing that the lock has been
released.
To block on process termination, we introduce a new function
block_on_process_exit that does the same thing as CoreDB's Drop
implementation.
next
Sayan Nandan 3 years ago
parent 7349f5261d
commit 8e46e62d3f

@ -348,6 +348,19 @@ impl CoreDB {
snapcfg, snapcfg,
} }
} }
pub fn block_on_process_exit(&mut self) {
while Arc::strong_count(&self.shared) == self.expected_strong_count() {
// Acquire a lock to prevent anyone from writing something
let mut coretable = self.shared.table.write();
coretable.terminate = true;
// Drop the write lock first to avoid BGSAVE ending up in failing
// to get a read lock
drop(coretable);
// Notify the background tasks to quit
self.shared.bgsave_task.notify_one();
self.shared.snapshot_service.notify_one();
}
}
/// Check if the database object is poisoned, that is, data couldn't be written /// Check if the database object is poisoned, that is, data couldn't be written
/// to disk once, and hence, we have disabled write operations /// to disk once, and hence, we have disabled write operations
pub fn is_poisoned(&self) -> bool { pub fn is_poisoned(&self) -> bool {

@ -61,7 +61,6 @@ use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
pub mod connection; pub mod connection;
mod tls; mod tls;
use crate::flush_db;
/// Responsible for gracefully shutting down the server instead of dying randomly /// Responsible for gracefully shutting down the server instead of dying randomly
// Sounds very sci-fi ;) // Sounds very sci-fi ;)
@ -95,8 +94,6 @@ impl Terminator {
} }
} }
use std::io::{self, prelude::*};
/// Multiple Listener Interface /// Multiple Listener Interface
/// ///
/// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate /// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate
@ -318,7 +315,7 @@ pub async fn run(
snapshot_cfg: SnapshotConfig, snapshot_cfg: SnapshotConfig,
sig: impl Future, sig: impl Future,
restore_filepath: Option<PathBuf>, restore_filepath: Option<PathBuf>,
) { ) -> CoreDB {
let (signal, _) = broadcast::channel(1); let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1); let (terminate_tx, terminate_rx) = mpsc::channel(1);
let (db, lock) = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) { let (db, lock) = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) {
@ -394,23 +391,7 @@ pub async fn run(
log::error!("Failed to release lock on data file with '{}'", e); log::error!("Failed to release lock on data file with '{}'", e);
process::exit(0x100); process::exit(0x100);
} }
if let Err(e) = flush_db!(db) { db
log::error!("Failed to flush data to disk with '{}'", e);
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = flush_db!(db) {
log::info!("Successfully saved data to disk");
break;
} else {
continue;
}
}
} else {
log::info!("Successfully saved data to disk");
}
} }
/// This is a **test only** function /// This is a **test only** function

@ -92,6 +92,9 @@ impl FileLock {
} }
/// Write something to this file /// Write something to this file
pub fn write(&mut self, bytes: &[u8]) -> Result<()> { pub fn write(&mut self, bytes: &[u8]) -> Result<()> {
// Truncate the file
self.file.set_len(0)?;
// Now write to the file
self.file.write_all(bytes) self.file.write_all(bytes)
} }
} }
@ -136,7 +139,7 @@ mod tests {
#[cfg(windows)] #[cfg(windows)]
#[test] #[test]
fn test_windows_lock_and_then_unlock() { fn test_windows_lock_and_then_unlock() {
let mut file = FileLock::lock("data4.bin").unwrap(); let mut file = FileLock::lock("data4.bin").unwrap();
file.unlock().unwrap(); file.unlock().unwrap();
drop(file); drop(file);
let mut file2 = FileLock::lock("data4.bin").unwrap(); let mut file2 = FileLock::lock("data4.bin").unwrap();
@ -154,7 +157,7 @@ mod __sys {
use std::io::{Error, Result}; use std::io::{Error, Result};
use std::mem; use std::mem;
use std::os::windows::io::AsRawHandle; use std::os::windows::io::AsRawHandle;
use winapi::shared::minwindef::{DWORD}; use winapi::shared::minwindef::DWORD;
use winapi::um::fileapi::{LockFileEx, UnlockFile}; use winapi::um::fileapi::{LockFileEx, UnlockFile};
use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY}; use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY};
/// Obtain an exclusive lock and **block** until we acquire it /// Obtain an exclusive lock and **block** until we acquire it
@ -182,9 +185,7 @@ mod __sys {
} }
/// Attempt to unlock a file /// Attempt to unlock a file
pub fn unlock_file(file: &File) -> Result<()> { pub fn unlock_file(file: &File) -> Result<()> {
let ret = unsafe { let ret = unsafe { UnlockFile(file.as_raw_handle(), 0, 0, !0, !0) };
UnlockFile(file.as_raw_handle(), 0, 0, !0, !0)
};
if ret == 0 { if ret == 0 {
Err(Error::last_os_error()) Err(Error::last_os_error())
} else { } else {

@ -33,6 +33,7 @@
use crate::config::BGSave; use crate::config::BGSave;
use crate::config::PortConfig; use crate::config::PortConfig;
use crate::config::SnapshotConfig; use crate::config::SnapshotConfig;
use std::io::{self, prelude::*};
mod config; mod config;
use std::env; use std::env;
mod admin; mod admin;
@ -70,23 +71,44 @@ fn main() {
.init(); .init();
// Start the server which asynchronously waits for a CTRL+C signal // Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server // which will safely shut down the server
tokio::runtime::Builder::new_multi_thread() let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server") .thread_name("server")
.enable_all() .enable_all()
.build() .build()
.unwrap() .unwrap();
.block_on(async { let db = runtime.block_on(async {
let (tcplistener, bgsave_config, snapshot_config, restore_filepath) = let (tcplistener, bgsave_config, snapshot_config, restore_filepath) =
check_args_and_get_cfg().await; check_args_and_get_cfg().await;
run( let mut db = run(
tcplistener, tcplistener,
bgsave_config, bgsave_config,
snapshot_config, snapshot_config,
signal::ctrl_c(), signal::ctrl_c(),
restore_filepath, restore_filepath,
) )
.await .await;
}); db.block_on_process_exit();
db
});
// Make sure all background workers terminate
drop(runtime);
if let Err(e) = flush_db!(db) {
log::error!("Failed to flush data to disk with '{}'", e);
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = flush_db!(db) {
log::info!("Successfully saved data to disk");
break;
} else {
continue;
}
}
} else {
log::info!("Successfully saved data to disk");
}
terminal::write_info("Goodbye :)\n").unwrap(); terminal::write_info("Goodbye :)\n").unwrap();
} }

Loading…
Cancel
Save