Enable select exec [skip ci]

next
Sayan Nandan 11 months ago
parent e412cea7eb
commit b7fd815e9e
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -44,7 +44,7 @@ pub use upd::collect_trace_path as update_flow_trace;
pub use {
del::{delete, delete_resp},
ins::{insert, insert_resp},
sel::select_custom,
sel::{select_custom, select_resp},
upd::{update, update_resp},
};
@ -65,6 +65,7 @@ impl Model {
}
}
#[derive(Debug)]
pub struct QueryExecMeta {
delta_hint: usize,
}

@ -26,7 +26,10 @@
use crate::engine::{
core::index::DcFieldIndex,
data::cell::{Datacell, VirtualDatacell},
data::{
cell::{Datacell, VirtualDatacell},
tag::{DataTag, TagClass},
},
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
idx::{STIndex, STIndexSeq},
@ -39,10 +42,57 @@ pub fn select_resp(
global: &impl GlobalInstanceLike,
select: SelectStatement,
) -> QueryResult<Response> {
todo!()
let mut resp_b = vec![];
let mut resp_a = vec![];
let mut i = 0u64;
self::select_custom(global, select, |item| {
encode_cell(&mut resp_b, item);
i += 1;
})?;
resp_a.push(0x11);
resp_a.extend(i.to_string().as_bytes());
resp_a.push(b'\n');
Ok(Response::EncodedAB(
resp_a.into_boxed_slice(),
resp_b.into_boxed_slice(),
))
}
fn encode_cell(resp: &mut Vec<u8>, item: &Datacell) {
resp.push((item.tag().tag_selector().value_u8() + 1) * (item.is_init() as u8));
if item.is_null() {
return;
}
unsafe {
// UNSAFE(@ohsayan): +tagck
// NOTE(@ohsayan): optimize out unwanted alloc
match item.tag().tag_class() {
TagClass::Bool => resp.push(item.read_bool() as _),
TagClass::UnsignedInt => resp.extend(item.read_uint().to_string().as_bytes()),
TagClass::SignedInt => resp.extend(item.read_sint().to_string().as_bytes()),
TagClass::Float => resp.extend(item.read_float().to_string().as_bytes()),
TagClass::Bin | TagClass::Str => {
let slc = item.read_bin();
resp.extend(slc.len().to_string().as_bytes());
resp.push(b'\n');
resp.extend(slc);
return;
}
TagClass::List => {
let list = item.read_list();
let ls = list.read();
resp.extend(ls.len().to_string().as_bytes());
resp.push(b'\n');
for item in ls.iter() {
encode_cell(resp, item);
}
return;
}
}
}
resp.push(b'\n');
}
#[allow(unused)]
pub fn select_custom<F>(
global: &impl GlobalInstanceLike,
mut select: SelectStatement,

@ -172,14 +172,14 @@ fn run_nb(
) -> QueryResult<Response> {
let stmt = stmt.value_u8() - KeywordStmt::Use.value_u8();
static F: [fn(&Global, &mut State<'static, InplaceData>) -> QueryResult<Response>; 8] = [
|_, _| panic!("use not implemented"),
|_, _| panic!("inspect not implemented"),
|_, _| panic!("describe not implemented"),
|_, _| Err(QueryError::QLUnknownStatement), // use
|_, _| Err(QueryError::QLUnknownStatement), // inspect
|_, _| Err(QueryError::QLUnknownStatement), // describe
|g, s| _call(g, s, dml::insert_resp),
|_, _| panic!("select not implemented"),
|g, s| _call(g, s, dml::select_resp),
|g, s| _call(g, s, dml::update_resp),
|g, s| _call(g, s, dml::delete_resp),
|_, _| panic!("exists not implemented"),
|_, _| Err(QueryError::QLUnknownStatement), // exists
];
{
let mut state = unsafe {

@ -221,13 +221,7 @@ impl DeltaState {
self.data_current_version.fetch_add(1, Ordering::AcqRel)
}
pub fn __data_delta_dequeue(&self, g: &Guard) -> Option<DataDelta> {
match self.data_deltas.blocking_try_dequeue(g) {
Some(d) => {
self.data_deltas_size.fetch_sub(1, Ordering::Release);
Some(d)
}
None => None,
}
self.data_deltas.blocking_try_dequeue(g)
}
}
@ -284,9 +278,6 @@ impl DeltaState {
// fractal
impl DeltaState {
pub fn __fractal_take_from_data_delta(&self, cnt: usize, _token: FractalToken) {
let _ = self.data_deltas_size.fetch_sub(cnt, Ordering::Release);
}
pub fn __fractal_take_full_from_data_delta(&self, _token: FractalToken) -> usize {
self.data_deltas_size.swap(0, Ordering::AcqRel)
}

@ -228,7 +228,7 @@ impl FractalMgr {
// services
impl FractalMgr {
const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
const GENERAL_EXECUTOR_WINDOW: u64 = 5;
/// The high priority executor service runs in the background to take care of high priority tasks and take any
/// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
/// allowed to perform HP tasks

@ -54,6 +54,7 @@ use {
#[derive(Debug, PartialEq)]
pub enum Response {
Empty,
EncodedAB(Box<[u8]>, Box<[u8]>),
}
pub(super) async fn query_loop<S: Socket>(
@ -119,6 +120,10 @@ pub(super) async fn query_loop<S: Socket>(
Ok(Response::Empty) => {
con.write_all(&[0x12]).await?;
}
Ok(Response::EncodedAB(a, b)) => {
con.write_all(&a).await?;
con.write_all(&b).await?;
}
Err(e) => {
let [a, b] = (e.value_u8() as u16).to_le_bytes();
con.write_all(&[0x10, a, b]).await?;

Loading…
Cancel
Save