|
|
@ -29,10 +29,23 @@ use crate::config::SnapshotConfig;
|
|
|
|
use crate::config::SnapshotPref;
|
|
|
|
use crate::config::SnapshotPref;
|
|
|
|
use crate::corestore::Corestore;
|
|
|
|
use crate::corestore::Corestore;
|
|
|
|
use crate::dbnet::{self, Terminator};
|
|
|
|
use crate::dbnet::{self, Terminator};
|
|
|
|
|
|
|
|
use crate::diskstore::flock::FileLock;
|
|
|
|
use crate::services;
|
|
|
|
use crate::services;
|
|
|
|
use crate::storage::sengine::SnapshotEngine;
|
|
|
|
use crate::storage::sengine::SnapshotEngine;
|
|
|
|
|
|
|
|
use libsky::util::terminal;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
use std::thread::sleep;
|
|
|
|
|
|
|
|
use tokio::{
|
|
|
|
|
|
|
|
signal::ctrl_c,
|
|
|
|
|
|
|
|
sync::{
|
|
|
|
|
|
|
|
broadcast,
|
|
|
|
|
|
|
|
mpsc::{self, Sender},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
task::{self, JoinHandle},
|
|
|
|
|
|
|
|
time::Duration,
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const TERMSIG_THRESHOLD: usize = 3;
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(unix)]
|
|
|
|
#[cfg(unix)]
|
|
|
|
use core::{future::Future, pin::Pin, task::Context, task::Poll};
|
|
|
|
use core::{future::Future, pin::Pin, task::Context, task::Poll};
|
|
|
@ -145,3 +158,69 @@ pub async fn run(
|
|
|
|
let _ = bgsave_handle.await;
|
|
|
|
let _ = bgsave_handle.await;
|
|
|
|
Ok(db)
|
|
|
|
Ok(db)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn spawn_task(tx: Sender<bool>, db: Corestore, do_sleep: bool) -> JoinHandle<()> {
|
|
|
|
|
|
|
|
task::spawn_blocking(move || {
|
|
|
|
|
|
|
|
if do_sleep {
|
|
|
|
|
|
|
|
log::info!("Waiting for 10 seconds before retrying ...");
|
|
|
|
|
|
|
|
sleep(Duration::from_secs(10));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let ret = match crate::services::bgsave::run_bgsave(&db) {
|
|
|
|
|
|
|
|
Ok(()) => true,
|
|
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
|
|
log::error!("Failed to run save on termination: {e}");
|
|
|
|
|
|
|
|
false
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
tx.blocking_send(ret).expect("Receiver dropped");
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn finalize_shutdown(corestore: Corestore, pid_file: FileLock) {
|
|
|
|
|
|
|
|
let rt = tokio::runtime::Builder::new_multi_thread()
|
|
|
|
|
|
|
|
.thread_name("server-final")
|
|
|
|
|
|
|
|
.enable_all()
|
|
|
|
|
|
|
|
.build()
|
|
|
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let dbc = corestore.clone();
|
|
|
|
|
|
|
|
let mut okay: bool = rt.block_on(async move {
|
|
|
|
|
|
|
|
let db = dbc;
|
|
|
|
|
|
|
|
let (tx, mut rx) = mpsc::channel::<bool>(1);
|
|
|
|
|
|
|
|
spawn_task(tx.clone(), db.clone(), false);
|
|
|
|
|
|
|
|
let mut threshold = TERMSIG_THRESHOLD;
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
|
|
|
|
ret = rx.recv() => {
|
|
|
|
|
|
|
|
if ret.unwrap() {
|
|
|
|
|
|
|
|
// that's good to go
|
|
|
|
|
|
|
|
log::info!("Save before termination successful");
|
|
|
|
|
|
|
|
break true;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
let txc = tx.clone();
|
|
|
|
|
|
|
|
let dbc = db.clone();
|
|
|
|
|
|
|
|
// we failed, so we better sleep
|
|
|
|
|
|
|
|
// now spawn it again to see the state
|
|
|
|
|
|
|
|
spawn_task(txc, dbc, true);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
_ = ctrl_c() => {
|
|
|
|
|
|
|
|
if threshold == 0 {
|
|
|
|
|
|
|
|
log::error!("SIGTERM received but failed to flush data. Quitting because threshold exceeded");
|
|
|
|
|
|
|
|
break false;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
log::error!("SIGTERM received but failed to flush data. Threshold is at {threshold}");
|
|
|
|
|
|
|
|
threshold -= 1;
|
|
|
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
okay &= services::pre_shutdown_cleanup(pid_file, Some(corestore.get_store()));
|
|
|
|
|
|
|
|
if okay {
|
|
|
|
|
|
|
|
terminal::write_success("Goodbye :)").unwrap()
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
log::error!("Didn't terminate successfully");
|
|
|
|
|
|
|
|
crate::exit_error();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|