Avoid purge of model drivers on force drop space

The purge is unnecessary as the space itself is removed.

Also, in the fractal mgr, simply sleep for a fixed duration before
retrying a task once its initial threshold has been breached.
Sayan Nandan 7 months ago
parent 1ed4f41565
commit 41e091cd0f
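For context, the retry back-off this commit introduces can be sketched in isolation. The standalone example below is only an illustration: the payload strings are made up and it assumes a tokio dependency with the rt, macros and time features, but the constants and the threshold-based sleep rule mirror the diff that follows. A task built with the default threshold proceeds immediately, while a re-enqueued task, which carries an adjusted threshold, waits a fixed 30 seconds before it is picked up again.

use std::time::Duration;

const TASK_THRESHOLD: usize = 10;
const TASK_FAILURE_SLEEP_DURATION: u64 = 30;

struct Task<T> {
    threshold: usize,
    task: T,
}

impl<T> Task<T> {
    fn new(task: T) -> Self {
        Self::with_threshold(task, TASK_THRESHOLD)
    }
    fn with_threshold(task: T, threshold: usize) -> Self {
        Self { threshold, task }
    }
    // only retried tasks (threshold already adjusted) pay the fixed sleep
    async fn sleep(&self) {
        if self.threshold != TASK_THRESHOLD {
            tokio::time::sleep(Duration::from_secs(TASK_FAILURE_SLEEP_DURATION)).await
        }
    }
    fn into_task(self) -> T {
        self.task
    }
}

#[tokio::main]
async fn main() {
    let fresh = Task::new("write batch");
    fresh.sleep().await; // no delay: default threshold
    println!("ran: {}", fresh.into_task());

    let retried = Task::with_threshold("write batch", TASK_THRESHOLD.saturating_sub(1));
    retried.sleep().await; // waits 30s before the retry runs
    println!("retried: {}", retried.into_task());
}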

@@ -266,19 +266,14 @@ impl Space {
             global.taskmgr_post_standard_priority(Task::new(
                 GenericTask::delete_space_dir(&space_name, space.get_uuid()),
             ));
-            let space_uuid = space.get_uuid();
             for model in space.models.into_iter() {
                 let e: EntityIDRef<'static> = unsafe {
                     // UNSAFE(@ohsayan): I want to try what the borrow checker has been trying
                     core::mem::transmute(EntityIDRef::new(space_name.as_str(), &model))
                 };
-                let mdl = models.st_delete_return(&e).unwrap();
-                global.purge_model_driver(
-                    &space_name,
-                    space_uuid,
-                    &model,
-                    mdl.data().get_uuid(),
-                );
+                models.st_delete(&e);
+                // no need to purge the model driver since the dir itself is deleted. our work here is to just
+                // remove this model from the linked models in the model ns
             }
             let _ = spaces.st_delete(space_name.as_str());
             if if_exists {

@@ -42,7 +42,7 @@ use {
         },
         util::os,
     },
-    std::path::PathBuf,
+    std::{path::PathBuf, time::Duration},
    tokio::{
        fs,
        sync::{
@@ -54,6 +54,8 @@ use {
 };

 pub const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
+const TASK_THRESHOLD: usize = 10;
+const TASK_FAILURE_SLEEP_DURATION: u64 = 30;

 /// A task for the [`FractalMgr`] to perform
 #[derive(Debug)]
@@ -63,10 +65,9 @@ pub struct Task<T> {
 }

 impl<T> Task<T> {
-    const THRESHOLD: usize = 10;
     /// Create a new task with the default threshold
     pub fn new(task: T) -> Self {
-        Self::with_threshold(task, Self::THRESHOLD)
+        Self::with_threshold(task, TASK_THRESHOLD)
     }
     /// Create a task with the given threshold
     fn with_threshold(task: T, threshold: usize) -> Self {
@@ -76,6 +77,11 @@ impl<T> Task<T> {
     pub fn into_task(self) -> T {
         self.task
     }
+    async fn sleep(&self) {
+        if self.threshold != TASK_THRESHOLD {
+            tokio::time::sleep(Duration::from_secs(TASK_FAILURE_SLEEP_DURATION)).await
+        }
+    }
 }

 /// A general task
@@ -248,6 +254,11 @@ impl FractalMgr {

 // services
 impl FractalMgr {
+    #[inline(always)]
+    fn adjust_threshold(th: usize) -> usize {
+        // FIXME(@ohsayan): adjust a correct threshold. right now we don't do anything here (and for good reason)
+        th.saturating_sub(1)
+    }
     /// The high priority executor service runs in the background to take care of high priority tasks and take any
     /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
     /// allowed to perform HP tasks
@@ -261,7 +272,10 @@ impl FractalMgr {
             let task = tokio::select! {
                 task = receiver.recv() => {
                     match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("fhp: exiting executor service because all tasks closed");
                             break;
@@ -284,6 +298,22 @@ impl FractalMgr {
             .unwrap()
         }
     }
+    #[cold]
+    #[inline(never)]
+    fn re_enqueue_model_sync(
+        &self,
+        model_id: ModelUniqueID,
+        observed_size: usize,
+        stats: BatchStats,
+        threshold: usize,
+    ) {
+        self.hp_dispatcher
+            .send(Task::with_threshold(
+                CriticalTask::WriteBatch(model_id, observed_size - stats.get_actual()),
+                threshold,
+            ))
+            .unwrap()
+    }
     fn hp_executor(
         &'static self,
         global: super::Global,
@@ -370,15 +400,12 @@ impl FractalMgr {
                             model_id.uuid()
                         );
                         // enqueue again for retrying
-                        self.hp_dispatcher
-                            .send(Task::with_threshold(
-                                CriticalTask::WriteBatch(
-                                    model_id,
-                                    observed_size - stats.get_actual(),
-                                ),
-                                threshold - 1,
-                            ))
-                            .unwrap();
+                        self.re_enqueue_model_sync(
+                            model_id,
+                            observed_size,
+                            stats,
+                            Self::adjust_threshold(threshold),
+                        )
                     }
                 }
             }
@@ -411,7 +438,10 @@ impl FractalMgr {
                 }
                 task = lpq.recv() => {
                     let Task { threshold, task } = match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("flp: exiting executor service because all tasks closed");
                             break;
@@ -422,14 +452,14 @@ impl FractalMgr {
                 GenericTask::DeleteFile(f) => {
                     if let Err(_) = fs::remove_file(&f).await {
                         self.general_dispatcher.send(
-                            Task::with_threshold(GenericTask::DeleteFile(f), threshold - 1)
+                            Task::with_threshold(GenericTask::DeleteFile(f), Self::adjust_threshold(threshold))
                         ).unwrap();
                     }
                 }
                 GenericTask::DeleteDirAll(dir) => {
                     if let Err(_) = fs::remove_dir_all(&dir).await {
                         self.general_dispatcher.send(
-                            Task::with_threshold(GenericTask::DeleteDirAll(dir), threshold - 1)
+                            Task::with_threshold(GenericTask::DeleteDirAll(dir), Self::adjust_threshold(threshold))
                         ).unwrap();
                     }
                 }
@@ -465,16 +495,16 @@ impl FractalMgr {
                        model_id.space(), model_id.entity(),
                    );
                    // this failure is *not* good, so we want to promote this to a critical task
-                   self.hp_dispatcher
-                       .send(Task::new(CriticalTask::WriteBatch(
-                           ModelUniqueID::new(
-                               model_id.space(),
-                               model_id.entity(),
-                               model.data().get_uuid(),
-                           ),
-                           observed_len - stats.get_actual(),
-                       )))
-                       .unwrap()
+                   self.re_enqueue_model_sync(
+                       ModelUniqueID::new(
+                           model_id.space(),
+                           model_id.entity(),
+                           model.data().get_uuid(),
+                       ),
+                       observed_len,
+                       stats,
+                       TASK_THRESHOLD,
+                   )
                }
            }
        }

@@ -33,7 +33,7 @@ use {
             GNSDriver, ModelDriver,
         },
     },
-    crate::engine::error::RuntimeResult,
+    crate::{engine::error::RuntimeResult, util::compiler},
    std::{fmt, mem::MaybeUninit},
    tokio::sync::mpsc::unbounded_channel,
 };
@@ -117,6 +117,7 @@ pub trait GlobalInstanceLike {
     fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
     fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
     // default impls
+    #[inline(always)]
     fn request_batch_resolve_if_cache_full(
         &self,
         space_name: &str,
@@ -128,14 +129,18 @@ pub trait GlobalInstanceLike {
         let r_tolerated_change = hint.delta_hint() >= self.get_max_delta_size();
         let r_percent_change = (hint.delta_hint() >= ((model.primary_index().count() / 100) * 5))
             & (r_tolerated_change);
-        if r_tolerated_change | r_percent_change {
-            let obtained_delta_size = model
-                .delta_state()
-                .__fractal_take_full_from_data_delta(FractalToken::new());
-            self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
-                ModelUniqueID::new(space_name, model_name, model.get_uuid()),
-                obtained_delta_size,
-            )));
+        if compiler::unlikely(r_tolerated_change | r_percent_change) {
+            // do not inline this path as we expect sufficient memory to be present and/or the background service
+            // to pick this up
+            compiler::cold_call(|| {
+                let obtained_delta_size = model
+                    .delta_state()
+                    .__fractal_take_full_from_data_delta(FractalToken::new());
+                self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
+                    ModelUniqueID::new(space_name, model_name, model.get_uuid()),
+                    obtained_delta_size,
+                )));
+            })
         }
     }
 }
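The last hunk routes the flush-trigger branch through skytable's internal util::compiler hints, compiler::unlikely and compiler::cold_call, which appear in the hunk above but whose bodies are not part of this commit. The sketch below is only an illustration of the usual shape of such branch-hint helpers on stable Rust, built from #[cold] and #[inline(never)]; it is an assumption, not the project's actual implementation.

// Illustrative only: one common way to write branch-hint helpers on stable Rust.
#[cold]
#[inline(never)]
fn cold() {}

/// Hints that `b` is rarely true by routing the true case through a cold function.
#[inline(always)]
pub fn unlikely(b: bool) -> bool {
    if b {
        cold();
    }
    b
}

/// Runs `f` behind a never-inlined, cold entry point so the caller's hot path stays small.
#[cold]
#[inline(never)]
pub fn cold_call<T>(f: impl FnOnce() -> T) -> T {
    f()
}

fn main() {
    let cache_full = false;
    if unlikely(cache_full) {
        cold_call(|| println!("dispatching a high-priority write batch"));
    }
}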
