next
Sayan Nandan 11 months ago
parent cac33bf7c2
commit d6eeb5cdf6
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -40,13 +40,13 @@ use crate::{
};
#[cfg(test)]
pub use upd::collect_trace_path as update_flow_trace;
pub use {
del::{delete, delete_resp},
ins::{insert, insert_resp},
sel::{select_custom, select_resp},
upd::{update, update_resp},
del::delete,
ins::insert,
sel::select_custom,
upd::{collect_trace_path as update_flow_trace, update},
};
pub use {del::delete_resp, ins::insert_resp, sel::select_resp, upd::update_resp};
impl Model {
pub(self) fn resolve_where<'a>(

@ -32,7 +32,7 @@ use {
std::{collections::hash_map::RandomState, sync::Arc},
};
const LNODE_STACK: usize = 2;
const LNODE_STACK: usize = 1;
pub type DefConfig = Config2B<RandomState>;
pub type LNode<T> = VInline<LNODE_STACK, T>;

@ -39,10 +39,9 @@ pub use {
astr::AStr,
ll::CachePadded,
scanner::BufferedScanner,
stackop::ByteStack,
uarray::UArray,
vinline::VInline,
word::{DwordNN, DwordQN, QwordNNNN, TwordNNN, WordIO, ZERO_BLOCK},
word::{DwordNN, DwordQN, WordIO, ZERO_BLOCK},
};
// imports
use std::alloc::{self, Layout};

@ -41,11 +41,12 @@ mod txn;
// test
#[cfg(test)]
mod tests;
// re-export
pub use error::RuntimeResult;
use {
self::{
config::{ConfigEndpoint, ConfigEndpointTls, ConfigMode, ConfigReturn, Configuration},
error::RuntimeResult,
fractal::context::{self, Subsystem},
storage::v1::{
loader::{self, SEInitState},
@ -58,6 +59,10 @@ use {
tokio::sync::broadcast,
};
pub(super) fn set_context_init(msg: &'static str) {
context::set(Subsystem::Init, msg)
}
/// Initialize all drivers, load all data
///
/// WARN: Must be in [`tokio::runtime::Runtime`] context!
@ -137,12 +142,10 @@ impl EndpointListeners {
}
pub async fn start(
termsig: TerminationSignal,
Configuration { endpoints, .. }: Configuration,
fractal::GlobalStateStart { global, boot }: fractal::GlobalStateStart,
) -> RuntimeResult<()> {
// bind termination signal
context::set(Subsystem::Init, "binding system signals");
let termsig = TerminationSignal::init()?;
// create our system-wide channel
let (signal, _) = broadcast::channel::<()>(1);
// start our services
@ -212,7 +215,6 @@ pub async fn start(
(_, Err(e)) => error!("error while terminating flp-executor: {e}"),
_ => {}
}
info!("all services have exited");
Ok(())
}

@ -101,6 +101,7 @@ pub fn parse_drop<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResu
}
}
#[cfg(test)]
pub use impls::DropStatementAST;
mod impls {
use {

@ -70,6 +70,7 @@ pub fn parse_inspect<'a, Qd: QueryData<'a>>(
}
}
#[cfg(test)]
pub use impls::InspectStatementAST;
mod impls {
use crate::engine::{

@ -25,10 +25,9 @@
*/
mod raw;
pub use {
insecure_impl::InsecureLexer,
raw::{Ident, Keyword, KeywordMisc, KeywordStmt, Symbol, Token},
};
#[cfg(test)]
pub use insecure_impl::InsecureLexer;
pub use raw::{Ident, Keyword, KeywordMisc, KeywordStmt, Symbol, Token};
use {
crate::engine::{

@ -44,7 +44,6 @@ use {
idx::{MTIndex, STIndex, STIndexSeq},
storage::v1::rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedReader},
},
crossbeam_epoch::pin,
std::{
collections::{hash_map::Entry as HMEntry, HashMap},
mem::ManuallyDrop,
@ -217,7 +216,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
// NOTE(@ohsayan): current complexity is O(n) which is good enough (in the future I might revise this to a fancier impl)
// pin model
let irm = m.intent_read_model();
let g = pin();
let g = unsafe { crossbeam_epoch::unprotected() };
let mut pending_delete = HashMap::new();
let p_index = m.primary_index().__raw_index();
// scan rows

@ -41,9 +41,8 @@ mod tests;
// re-exports
pub use {
journal::{JournalAdapter, JournalWriter},
memfs::NullFS,
rw::{LocalFS, RawFSInterface, SDSSFileIO},
};
pub mod data_batch {
pub use super::batch_jrnl::{create, reinit, DataBatchPersistDriver, DataBatchRestoreDriver};
pub use super::batch_jrnl::{create, DataBatchPersistDriver};
}

@ -67,23 +67,28 @@ fn main() {
.parse_filters(&env::var("SKY_LOG").unwrap_or_else(|_| "info".to_owned()))
.init();
println!("{TEXT}\nSkytable v{VERSION} | {URL}\n");
let (config, global) = match engine::load_all() {
Ok(x) => x,
Err(e) => {
error!("{e}");
exit_error()
}
let run = || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server")
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
engine::set_context_init("binding system signals");
let signal = util::os::TerminationSignal::init()?;
let (config, global) = tokio::task::spawn_blocking(|| engine::load_all())
.await
.unwrap()?;
let g = global.global.clone();
engine::start(signal, config, global).await?;
engine::RuntimeResult::Ok(g)
})
};
let g = global.global.clone();
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server")
.enable_all()
.build()
.unwrap();
match runtime.block_on(async move { engine::start(config, global).await }) {
Ok(()) => {
match run() {
Ok(g) => {
info!("completing cleanup before exit");
engine::finish(g);
info!("finished all pending tasks. Goodbye!");
println!("Goodbye!");
}
Err(e) => {
error!("{e}");

Loading…
Cancel
Save