Simplify response handling

next
Sayan Nandan 10 months ago
parent 341a896c37
commit 2b4d9efb1b
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -33,6 +33,7 @@ use crate::engine::{
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
idx::{STIndex, STIndexSeq},
mem::IntegerRepr,
net::protocol::Response,
ql::dml::sel::SelectStatement,
sync,
@ -42,20 +43,13 @@ pub fn select_resp(
global: &impl GlobalInstanceLike,
select: SelectStatement,
) -> QueryResult<Response> {
let mut resp_b = vec![];
let mut resp_a = vec![];
let mut i = 0u64;
let mut data = vec![];
let mut i = 0usize;
self::select_custom(global, select, |item| {
encode_cell(&mut resp_b, item);
encode_cell(&mut data, item);
i += 1;
})?;
resp_a.push(0x11);
resp_a.extend(i.to_string().as_bytes());
resp_a.push(b'\n');
Ok(Response::EncodedAB(
resp_a.into_boxed_slice(),
resp_b.into_boxed_slice(),
))
Ok(Response::Row { size: i, data })
}
fn encode_cell(resp: &mut Vec<u8>, item: &Datacell) {
@ -65,15 +59,14 @@ fn encode_cell(resp: &mut Vec<u8>, item: &Datacell) {
}
unsafe {
// UNSAFE(@ohsayan): +tagck
// NOTE(@ohsayan): optimize out unwanted alloc
match item.tag().tag_class() {
TagClass::Bool => resp.push(item.read_bool() as _),
TagClass::UnsignedInt => resp.extend(item.read_uint().to_string().as_bytes()),
TagClass::SignedInt => resp.extend(item.read_sint().to_string().as_bytes()),
TagClass::UnsignedInt => IntegerRepr::scoped(item.read_uint(), |b| resp.extend(b)),
TagClass::SignedInt => IntegerRepr::scoped(item.read_sint(), |b| resp.extend(b)),
TagClass::Float => resp.extend(item.read_float().to_string().as_bytes()),
TagClass::Bin | TagClass::Str => {
let slc = item.read_bin();
resp.extend(slc.len().to_string().as_bytes());
IntegerRepr::scoped(slc.len() as u64, |b| resp.extend(b));
resp.push(b'\n');
resp.extend(slc);
return;
@ -81,7 +74,7 @@ fn encode_cell(resp: &mut Vec<u8>, item: &Datacell) {
TagClass::List => {
let list = item.read_list();
let ls = list.read();
resp.extend(ls.len().to_string().as_bytes());
IntegerRepr::scoped(ls.len() as u64, |b| resp.extend(b));
resp.push(b'\n');
for item in ls.iter() {
encode_cell(resp, item);

@ -54,7 +54,7 @@ use {
pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind};
use super::util::{EntityID, EntityIDRef};
pub(in crate::engine) type Fields = IndexSTSeqCns<Box<str>, Field>;
type Fields = IndexSTSeqCns<Box<str>, Field>;
#[derive(Debug)]
pub struct Model {

@ -26,6 +26,7 @@
mod astr;
mod ll;
mod numbuf;
pub mod scanner;
mod stackop;
mod uarray;
@ -42,6 +43,7 @@ pub use {
uarray::UArray,
vinline::VInline,
word::{DwordNN, DwordQN, WordIO, ZERO_BLOCK},
numbuf::IntegerRepr,
};
// imports
use std::alloc::{self, Layout};

@ -0,0 +1,174 @@
/*
* Created on Thu Nov 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
derived from the implementation in libcore
*/
use core::{mem, ptr, slice};
pub trait Int {
type Buffer: Default;
fn init_buf() -> Self::Buffer {
Self::Buffer::default()
}
fn init(self, buf: &mut Self::Buffer) -> &[u8];
}
pub struct IntegerRepr<I: Int> {
b: I::Buffer,
}
impl<I: Int> IntegerRepr<I> {
pub fn new() -> Self {
Self { b: I::init_buf() }
}
pub fn as_bytes(&mut self, i: I) -> &[u8] {
i.init(&mut self.b)
}
pub fn scoped<T>(i: I, mut f: impl FnMut(&[u8]) -> T) -> T {
let mut slf = Self::new();
f(slf.as_bytes(i))
}
#[cfg(test)]
pub fn as_str(&mut self, i: I) -> &str {
unsafe { core::mem::transmute(self.as_bytes(i)) }
}
}
const DEC_DIGITS_LUT: &[u8] = b"\
0001020304050607080910111213141516171819\
2021222324252627282930313233343536373839\
4041424344454647484950515253545556575859\
6061626364656667686970717273747576777879\
8081828384858687888990919293949596979899";
macro_rules! impl_int {
($($($int:ty => $max:literal),* as $cast:ty),*) => {
$($(impl Int for $int {
type Buffer = [u8; $max];
fn init(self, buf: &mut Self::Buffer) -> &[u8] {
#[allow(unused_comparisons)]
let negative = self < 0;
let mut n = if negative {
// two's complement (invert, add 1)
((!(self as $cast)).wrapping_add(1))
} else {
self as $cast
};
let mut curr_idx = buf.len() as isize;
let buf_ptr = buf.as_mut_ptr();
let lut_ptr = DEC_DIGITS_LUT.as_ptr();
unsafe {
if mem::size_of::<Self>() >= 2 {
while n >= 10_000 {
let rem = (n % 10_000) as isize;
n /= 10_000;
let d1 = (rem / 100) << 1;
let d2 = (rem % 100) << 1;
curr_idx -= 4;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr_idx), 2);
ptr::copy_nonoverlapping(lut_ptr.offset(d2), buf_ptr.offset(curr_idx + 2), 2);
}
}
// 4 chars left
let mut n = n as isize;
// 2 chars
if n >= 100 {
let d1 = (n % 100) << 1;
n /= 100;
curr_idx -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr_idx), 2);
}
// 1 or 2 left
if n < 10 {
curr_idx -= 1;
*buf_ptr.offset(curr_idx) = (n as u8) + b'0';
} else {
let d1 = n << 1;
curr_idx -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr_idx), 2);
}
if negative {
curr_idx -= 1;
*buf_ptr.offset(curr_idx) = b'-';
}
slice::from_raw_parts(buf_ptr.offset(curr_idx), buf.len() - curr_idx as usize)
}
}
})*)*
};
}
impl_int!(u8 => 3, i8 => 4, u16 => 5, i16 => 6, u32 => 10, i32 => 11 as u32, u64 => 20, i64 => 20 as u64);
#[cfg(test)]
mod tests {
fn ibufeq<I: super::Int + ToString + Copy>(v: I) {
let mut buf = super::IntegerRepr::new();
assert_eq!(buf.as_str(v), v.to_string());
}
#[test]
fn u8() {
ibufeq(u8::MIN);
ibufeq(u8::MAX);
}
#[test]
fn i8() {
ibufeq(i8::MIN);
ibufeq(i8::MAX);
}
#[test]
fn u16() {
ibufeq(u16::MIN);
ibufeq(u16::MAX);
}
#[test]
fn i16() {
ibufeq(i16::MIN);
ibufeq(i16::MAX);
}
#[test]
fn u32() {
ibufeq(u32::MIN);
ibufeq(u32::MAX);
}
#[test]
fn i32() {
ibufeq(i32::MIN);
ibufeq(i32::MAX);
}
#[test]
fn u64() {
ibufeq(u64::MIN);
ibufeq(u64::MAX);
}
#[test]
fn i64() {
ibufeq(i64::MIN);
ibufeq(i64::MAX);
}
}

@ -45,7 +45,7 @@ use {
self,
error::QueryError,
fractal::{Global, GlobalInstanceLike},
mem::BufferedScanner,
mem::{BufferedScanner, IntegerRepr},
},
bytes::{Buf, BytesMut},
tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter},
@ -73,7 +73,7 @@ impl ClientLocalState {
#[derive(Debug, PartialEq)]
pub enum Response {
Empty,
EncodedAB(Box<[u8]>, Box<[u8]>),
Row { size: usize, data: Vec<u8> },
}
pub(super) async fn query_loop<S: Socket>(
@ -139,9 +139,12 @@ pub(super) async fn query_loop<S: Socket>(
Ok(Response::Empty) => {
con.write_all(&[0x12]).await?;
}
Ok(Response::EncodedAB(a, b)) => {
con.write_all(&a).await?;
con.write_all(&b).await?;
Ok(Response::Row { size, data }) => {
con.write_u8(0x11).await?;
let mut irep = IntegerRepr::new();
con.write_all(irep.as_bytes(size as u64)).await?;
con.write_u8(b'\n').await?;
con.write_all(&data).await?;
}
Err(e) => {
let [a, b] = (e.value_u8() as u16).to_le_bytes();

Loading…
Cancel
Save