From 41e091cd0f6861cbaca2c6d73e023f698ec3f1a8 Mon Sep 17 00:00:00 2001
From: Sayan Nandan
Date: Sat, 2 Mar 2024 08:48:03 +0530
Subject: [PATCH] Avoid purge of model drivers on force drop space

The purge is unnecessary as the space itself is removed. Also, in fractal
mgr simply sleep for a fixed duration if the initial threshold is breached.
---
 server/src/engine/core/space.rs  | 11 ++---
 server/src/engine/fractal/mgr.rs | 82 ++++++++++++++++++++++----------
 server/src/engine/fractal/mod.rs | 23 +++++----
 3 files changed, 73 insertions(+), 43 deletions(-)

diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs
index e0b75be9..4f905379 100644
--- a/server/src/engine/core/space.rs
+++ b/server/src/engine/core/space.rs
@@ -266,19 +266,14 @@ impl Space {
             global.taskmgr_post_standard_priority(Task::new(
                 GenericTask::delete_space_dir(&space_name, space.get_uuid()),
             ));
-            let space_uuid = space.get_uuid();
             for model in space.models.into_iter() {
                 let e: EntityIDRef<'static> = unsafe {
                     // UNSAFE(@ohsayan): I want to try what the borrow checker has been trying
                     core::mem::transmute(EntityIDRef::new(space_name.as_str(), &model))
                 };
-                let mdl = models.st_delete_return(&e).unwrap();
-                global.purge_model_driver(
-                    &space_name,
-                    space_uuid,
-                    &model,
-                    mdl.data().get_uuid(),
-                );
+                models.st_delete(&e);
+                // no need to purge the model driver since the dir itself is deleted. our work here is
+                // to just remove this model from the linked models in the model ns
             }
             let _ = spaces.st_delete(space_name.as_str());
             if if_exists {
diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs
index 1ab8c608..c699d137 100644
--- a/server/src/engine/fractal/mgr.rs
+++ b/server/src/engine/fractal/mgr.rs
@@ -42,7 +42,7 @@ use {
         },
         util::os,
     },
-    std::path::PathBuf,
+    std::{path::PathBuf, time::Duration},
     tokio::{
         fs,
         sync::{
@@ -54,6 +54,8 @@ use {
 };
 
 pub const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
+const TASK_THRESHOLD: usize = 10;
+const TASK_FAILURE_SLEEP_DURATION: u64 = 30;
 
 /// A task for the [`FractalMgr`] to perform
 #[derive(Debug)]
@@ -63,10 +65,9 @@ pub struct Task<T> {
 }
 
 impl<T> Task<T> {
-    const THRESHOLD: usize = 10;
     /// Create a new task with the default threshold
     pub fn new(task: T) -> Self {
-        Self::with_threshold(task, Self::THRESHOLD)
+        Self::with_threshold(task, TASK_THRESHOLD)
     }
     /// Create a task with the given threshold
     fn with_threshold(task: T, threshold: usize) -> Self {
@@ -76,6 +77,11 @@ impl<T> Task<T> {
     pub fn into_task(self) -> T {
         self.task
     }
+    async fn sleep(&self) {
+        if self.threshold != TASK_THRESHOLD {
+            tokio::time::sleep(Duration::from_secs(TASK_FAILURE_SLEEP_DURATION)).await
+        }
+    }
 }
 
 /// A general task
@@ -248,6 +254,11 @@ impl FractalMgr {
 
 // services
 impl FractalMgr {
+    #[inline(always)]
+    fn adjust_threshold(th: usize) -> usize {
+        // FIXME(@ohsayan): adjust a correct threshold. right now we don't do anything here (and for good reason)
+        th.saturating_sub(1)
+    }
     /// The high priority executor service runs in the background to take care of high priority tasks and take any
     /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
     /// allowed to perform HP tasks
@@ -261,7 +272,10 @@ impl FractalMgr {
             let task = tokio::select! {
                task = receiver.recv() => {
                    match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("fhp: exiting executor service because all tasks closed");
                             break;
@@ -284,6 +298,22 @@ impl FractalMgr {
                 .unwrap()
         }
     }
+    #[cold]
+    #[inline(never)]
+    fn re_enqueue_model_sync(
+        &self,
+        model_id: ModelUniqueID,
+        observed_size: usize,
+        stats: BatchStats,
+        threshold: usize,
+    ) {
+        self.hp_dispatcher
+            .send(Task::with_threshold(
+                CriticalTask::WriteBatch(model_id, observed_size - stats.get_actual()),
+                threshold,
+            ))
+            .unwrap()
+    }
     fn hp_executor(
         &'static self,
         global: super::Global,
@@ -370,15 +400,12 @@ impl FractalMgr {
                         model_id.uuid()
                     );
                     // enqueue again for retrying
-                    self.hp_dispatcher
-                        .send(Task::with_threshold(
-                            CriticalTask::WriteBatch(
-                                model_id,
-                                observed_size - stats.get_actual(),
-                            ),
-                            threshold - 1,
-                        ))
-                        .unwrap();
+                    self.re_enqueue_model_sync(
+                        model_id,
+                        observed_size,
+                        stats,
+                        Self::adjust_threshold(threshold),
+                    )
                 }
             }
         }
@@ -411,7 +438,10 @@ impl FractalMgr {
                 }
                 task = lpq.recv() => {
                     let Task { threshold, task } = match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("flp: exiting executor service because all tasks closed");
                             break;
@@ -422,14 +452,14 @@ impl FractalMgr {
             GenericTask::DeleteFile(f) => {
                 if let Err(_) = fs::remove_file(&f).await {
                     self.general_dispatcher.send(
-                        Task::with_threshold(GenericTask::DeleteFile(f), threshold - 1)
+                        Task::with_threshold(GenericTask::DeleteFile(f), Self::adjust_threshold(threshold))
                     ).unwrap();
                 }
             }
             GenericTask::DeleteDirAll(dir) => {
                 if let Err(_) = fs::remove_dir_all(&dir).await {
                     self.general_dispatcher.send(
-                        Task::with_threshold(GenericTask::DeleteDirAll(dir), threshold - 1)
+                        Task::with_threshold(GenericTask::DeleteDirAll(dir), Self::adjust_threshold(threshold))
                     ).unwrap();
                 }
             }
@@ -465,16 +495,16 @@ impl FractalMgr {
                        model_id.space(),
                        model_id.entity(),
                    ); // this failure is *not* good, so we want to promote this to a critical task
-                    self.hp_dispatcher
-                        .send(Task::new(CriticalTask::WriteBatch(
-                            ModelUniqueID::new(
-                                model_id.space(),
-                                model_id.entity(),
-                                model.data().get_uuid(),
-                            ),
-                            observed_len - stats.get_actual(),
-                        )))
-                        .unwrap()
+                    self.re_enqueue_model_sync(
+                        ModelUniqueID::new(
+                            model_id.space(),
+                            model_id.entity(),
+                            model.data().get_uuid(),
+                        ),
+                        observed_len,
+                        stats,
+                        TASK_THRESHOLD,
+                    )
                 }
             }
         }
diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs
index 154db00c..eeb6b0cc 100644
--- a/server/src/engine/fractal/mod.rs
+++ b/server/src/engine/fractal/mod.rs
@@ -33,7 +33,7 @@ use {
             GNSDriver, ModelDriver,
         },
     },
-    crate::engine::error::RuntimeResult,
+    crate::{engine::error::RuntimeResult, util::compiler},
     std::{fmt, mem::MaybeUninit},
     tokio::sync::mpsc::unbounded_channel,
 };
@@ -117,6 +117,7 @@ pub trait GlobalInstanceLike {
     fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
     fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
     // default impls
+    #[inline(always)]
     fn request_batch_resolve_if_cache_full(
         &self,
         space_name: &str,
@@ -128,14 +129,18 @@ pub trait GlobalInstanceLike {
         let r_tolerated_change = hint.delta_hint() >= self.get_max_delta_size();
         let r_percent_change = (hint.delta_hint()
             >= ((model.primary_index().count() / 100) * 5)) & (r_tolerated_change);
-        if r_tolerated_change | r_percent_change {
-            let obtained_delta_size = model
-                .delta_state()
-                .__fractal_take_full_from_data_delta(FractalToken::new());
-            self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
-                ModelUniqueID::new(space_name, model_name, model.get_uuid()),
-                obtained_delta_size,
-            )));
+        if compiler::unlikely(r_tolerated_change | r_percent_change) {
+            // do not inline this path as we expect sufficient memory to be present and/or the background service
+            // to pick this up
+            compiler::cold_call(|| {
+                let obtained_delta_size = model
+                    .delta_state()
+                    .__fractal_take_full_from_data_delta(FractalToken::new());
+                self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
+                    ModelUniqueID::new(space_name, model_name, model.get_uuid()),
+                    obtained_delta_size,
+                )));
+            })
         }
     }
 }
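
Note: below is a minimal standalone sketch (not part of the patch above) of the retry gating this change introduces: a task re-enqueued after a failure carries a reduced threshold, and only such retried tasks pay a fixed sleep before the executor picks them up again. The names DEFAULT_THRESHOLD, RETRY_SLEEP_SECS and this Task type are illustrative stand-ins rather than the crate's actual items, and a blocking sleep is used instead of tokio so the example stays dependency-free.

    use std::{thread, time::Duration};

    const DEFAULT_THRESHOLD: usize = 10;
    const RETRY_SLEEP_SECS: u64 = 30;

    struct Task<T> {
        threshold: usize,
        task: T,
    }

    impl<T> Task<T> {
        // a fresh task gets the default threshold
        fn new(task: T) -> Self {
            Self { threshold: DEFAULT_THRESHOLD, task }
        }
        // a re-enqueued task carries whatever (reduced) threshold the executor chose
        fn with_threshold(task: T, threshold: usize) -> Self {
            Self { threshold, task }
        }
        fn into_task(self) -> T {
            self.task
        }
        // fresh tasks run immediately; a threshold different from the default means
        // this task has already failed at least once, so back off for a fixed duration
        fn sleep_if_retried(&self) {
            if self.threshold != DEFAULT_THRESHOLD {
                thread::sleep(Duration::from_secs(RETRY_SLEEP_SECS));
            }
        }
    }

    fn main() {
        let fresh = Task::new("write-batch");
        fresh.sleep_if_retried(); // no wait: default threshold means first attempt
        println!("running: {}", fresh.into_task());

        let retried = Task::with_threshold("write-batch", DEFAULT_THRESHOLD - 1);
        retried.sleep_if_retried(); // fixed 30s back-off before the retry is processed
        println!("retrying: {}", retried.into_task());
    }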