diff --git a/server/src/engine/core/dml/mod.rs b/server/src/engine/core/dml/mod.rs index 402c6b45..702ed855 100644 --- a/server/src/engine/core/dml/mod.rs +++ b/server/src/engine/core/dml/mod.rs @@ -44,7 +44,7 @@ pub use upd::collect_trace_path as update_flow_trace; pub use { del::{delete, delete_resp}, ins::{insert, insert_resp}, - sel::select_custom, + sel::{select_custom, select_resp}, upd::{update, update_resp}, }; @@ -65,6 +65,7 @@ impl Model { } } +#[derive(Debug)] pub struct QueryExecMeta { delta_hint: usize, } diff --git a/server/src/engine/core/dml/sel.rs b/server/src/engine/core/dml/sel.rs index d281a27b..8c2d0f1f 100644 --- a/server/src/engine/core/dml/sel.rs +++ b/server/src/engine/core/dml/sel.rs @@ -26,7 +26,10 @@ use crate::engine::{ core::index::DcFieldIndex, - data::cell::{Datacell, VirtualDatacell}, + data::{ + cell::{Datacell, VirtualDatacell}, + tag::{DataTag, TagClass}, + }, error::{QueryError, QueryResult}, fractal::GlobalInstanceLike, idx::{STIndex, STIndexSeq}, @@ -39,10 +42,57 @@ pub fn select_resp( global: &impl GlobalInstanceLike, select: SelectStatement, ) -> QueryResult { - todo!() + let mut resp_b = vec![]; + let mut resp_a = vec![]; + let mut i = 0u64; + self::select_custom(global, select, |item| { + encode_cell(&mut resp_b, item); + i += 1; + })?; + resp_a.push(0x11); + resp_a.extend(i.to_string().as_bytes()); + resp_a.push(b'\n'); + Ok(Response::EncodedAB( + resp_a.into_boxed_slice(), + resp_b.into_boxed_slice(), + )) +} + +fn encode_cell(resp: &mut Vec, item: &Datacell) { + resp.push((item.tag().tag_selector().value_u8() + 1) * (item.is_init() as u8)); + if item.is_null() { + return; + } + unsafe { + // UNSAFE(@ohsayan): +tagck + // NOTE(@ohsayan): optimize out unwanted alloc + match item.tag().tag_class() { + TagClass::Bool => resp.push(item.read_bool() as _), + TagClass::UnsignedInt => resp.extend(item.read_uint().to_string().as_bytes()), + TagClass::SignedInt => resp.extend(item.read_sint().to_string().as_bytes()), + TagClass::Float => resp.extend(item.read_float().to_string().as_bytes()), + TagClass::Bin | TagClass::Str => { + let slc = item.read_bin(); + resp.extend(slc.len().to_string().as_bytes()); + resp.push(b'\n'); + resp.extend(slc); + return; + } + TagClass::List => { + let list = item.read_list(); + let ls = list.read(); + resp.extend(ls.len().to_string().as_bytes()); + resp.push(b'\n'); + for item in ls.iter() { + encode_cell(resp, item); + } + return; + } + } + } + resp.push(b'\n'); } -#[allow(unused)] pub fn select_custom( global: &impl GlobalInstanceLike, mut select: SelectStatement, diff --git a/server/src/engine/core/exec.rs b/server/src/engine/core/exec.rs index 8006e2d9..2ae93770 100644 --- a/server/src/engine/core/exec.rs +++ b/server/src/engine/core/exec.rs @@ -172,14 +172,14 @@ fn run_nb( ) -> QueryResult { let stmt = stmt.value_u8() - KeywordStmt::Use.value_u8(); static F: [fn(&Global, &mut State<'static, InplaceData>) -> QueryResult; 8] = [ - |_, _| panic!("use not implemented"), - |_, _| panic!("inspect not implemented"), - |_, _| panic!("describe not implemented"), + |_, _| Err(QueryError::QLUnknownStatement), // use + |_, _| Err(QueryError::QLUnknownStatement), // inspect + |_, _| Err(QueryError::QLUnknownStatement), // describe |g, s| _call(g, s, dml::insert_resp), - |_, _| panic!("select not implemented"), + |g, s| _call(g, s, dml::select_resp), |g, s| _call(g, s, dml::update_resp), |g, s| _call(g, s, dml::delete_resp), - |_, _| panic!("exists not implemented"), + |_, _| Err(QueryError::QLUnknownStatement), // exists ]; { let mut state = unsafe { diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index af1c8d4d..91b1434f 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -221,13 +221,7 @@ impl DeltaState { self.data_current_version.fetch_add(1, Ordering::AcqRel) } pub fn __data_delta_dequeue(&self, g: &Guard) -> Option { - match self.data_deltas.blocking_try_dequeue(g) { - Some(d) => { - self.data_deltas_size.fetch_sub(1, Ordering::Release); - Some(d) - } - None => None, - } + self.data_deltas.blocking_try_dequeue(g) } } @@ -284,9 +278,6 @@ impl DeltaState { // fractal impl DeltaState { - pub fn __fractal_take_from_data_delta(&self, cnt: usize, _token: FractalToken) { - let _ = self.data_deltas_size.fetch_sub(cnt, Ordering::Release); - } pub fn __fractal_take_full_from_data_delta(&self, _token: FractalToken) -> usize { self.data_deltas_size.swap(0, Ordering::AcqRel) } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index e83e53da..99375c2c 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -228,7 +228,7 @@ impl FractalMgr { // services impl FractalMgr { - const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60; + const GENERAL_EXECUTOR_WINDOW: u64 = 5; /// The high priority executor service runs in the background to take care of high priority tasks and take any /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is /// allowed to perform HP tasks diff --git a/server/src/engine/net/protocol/mod.rs b/server/src/engine/net/protocol/mod.rs index 1044e2a1..cb3cdcaf 100644 --- a/server/src/engine/net/protocol/mod.rs +++ b/server/src/engine/net/protocol/mod.rs @@ -54,6 +54,7 @@ use { #[derive(Debug, PartialEq)] pub enum Response { Empty, + EncodedAB(Box<[u8]>, Box<[u8]>), } pub(super) async fn query_loop( @@ -119,6 +120,10 @@ pub(super) async fn query_loop( Ok(Response::Empty) => { con.write_all(&[0x12]).await?; } + Ok(Response::EncodedAB(a, b)) => { + con.write_all(&a).await?; + con.write_all(&b).await?; + } Err(e) => { let [a, b] = (e.value_u8() as u16).to_le_bytes(); con.write_all(&[0x10, a, b]).await?;