Param by default in protocol

next
Sayan Nandan 1 year ago
parent a390120231
commit 597a49a91b
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -1,192 +0,0 @@
/*
* Created on Mon Sep 18 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
Skyhash/2 Data Exchange Packets
---
1. Client side packet:
a. SQ
S<len>\n<payload>
b. PQ
P<count>\n(<pld len>\n<pld>)*
TODO(@ohsayan): Restore pipeline impl
*/
use crate::{engine::mem::BufferedScanner, util::compiler};
use std::slice;
#[derive(Debug, PartialEq)]
pub struct CSQuery<'a> {
query: &'a [u8],
}
impl<'a> CSQuery<'a> {
pub(super) const fn new(query: &'a [u8]) -> Self {
Self { query }
}
pub const fn query(&self) -> &'a [u8] {
self.query
}
}
#[derive(Debug, PartialEq)]
pub enum CSQueryState {
Initial,
SizeSegmentPart(u64),
WaitingForFullBlock(usize),
}
impl Default for CSQueryState {
fn default() -> Self {
Self::Initial
}
}
#[derive(Debug, PartialEq)]
pub enum CSQueryExchangeResult<'a> {
Completed(CSQuery<'a>),
ChangeState(CSQueryState, usize),
PacketError,
}
impl<'a> CSQuery<'a> {
pub fn resume_with(
scanner: &mut BufferedScanner<'a>,
state: CSQueryState,
) -> CSQueryExchangeResult<'a> {
match state {
CSQueryState::Initial => Self::resume_initial(scanner),
CSQueryState::SizeSegmentPart(part) => Self::resume_at_meta_segment(scanner, part),
CSQueryState::WaitingForFullBlock(size) => Self::resume_at_data_segment(scanner, size),
}
}
}
enum LFTIntParseResult {
Value(u64),
Partial(u64),
Error,
}
fn parse_lf_separated(
scanner: &mut BufferedScanner,
previously_buffered: u64,
) -> LFTIntParseResult {
let mut ret = previously_buffered;
let mut okay = true;
while scanner.rounded_cursor_not_eof_matches(|b| *b != b'\n') & okay {
let b = unsafe { scanner.next_byte() };
okay &= b.is_ascii_digit();
ret = match ret.checked_mul(10) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
ret = match ret.checked_add((b & 0x0F) as u64) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
}
let payload_ok = okay;
let lf_ok = scanner.rounded_cursor_not_eof_matches(|b| *b == b'\n');
unsafe { scanner.incr_cursor_by(lf_ok as usize) }
if payload_ok & lf_ok {
LFTIntParseResult::Value(ret)
} else {
if payload_ok {
LFTIntParseResult::Partial(ret)
} else {
LFTIntParseResult::Error
}
}
}
impl<'a> CSQuery<'a> {
pub const PREEMPTIVE_READ: usize = 4;
const FIRST_BYTE: u8 = b'S';
fn resume_initial(scanner: &mut BufferedScanner<'a>) -> CSQueryExchangeResult<'a> {
if cfg!(debug_assertions) {
if scanner.remaining() < Self::PREEMPTIVE_READ {
return CSQueryExchangeResult::ChangeState(
CSQueryState::Initial,
Self::PREEMPTIVE_READ,
);
}
} else {
assert!(scanner.remaining() >= Self::PREEMPTIVE_READ);
}
// get our block
let first_byte = unsafe { scanner.next_byte() };
// be optimistic and check first byte later
let size_of_query = match parse_lf_separated(scanner, 0) {
LFTIntParseResult::Value(v) => v as usize,
LFTIntParseResult::Partial(v) => {
if compiler::unlikely(first_byte != Self::FIRST_BYTE) {
return CSQueryExchangeResult::PacketError;
} else {
// expect at least 1 LF and at least 1 query byte
return CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(v), 2);
}
}
LFTIntParseResult::Error => {
// that's pretty much over
return CSQueryExchangeResult::PacketError;
}
};
if compiler::unlikely(first_byte != Self::FIRST_BYTE) {
return CSQueryExchangeResult::PacketError;
}
Self::resume_at_data_segment(scanner, size_of_query)
}
fn resume_at_meta_segment(
scanner: &mut BufferedScanner<'a>,
previous: u64,
) -> CSQueryExchangeResult<'a> {
match parse_lf_separated(scanner, previous) {
LFTIntParseResult::Value(v) => Self::resume_at_data_segment(scanner, v as usize),
LFTIntParseResult::Partial(p) => {
CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(p), 2)
}
LFTIntParseResult::Error => CSQueryExchangeResult::PacketError,
}
}
fn resume_at_data_segment(
scanner: &mut BufferedScanner<'a>,
size: usize,
) -> CSQueryExchangeResult<'a> {
if scanner.has_left(size) {
let slice;
unsafe {
// UNSAFE(@ohsayan): checked len at branch
slice = slice::from_raw_parts(scanner.current_buffer().as_ptr(), size);
scanner.incr_cursor_by(size);
}
CSQueryExchangeResult::Completed(CSQuery::new(slice))
} else {
CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(size), size)
}
}
}

@ -0,0 +1,318 @@
/*
* Created on Wed Sep 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::mem::BufferedScanner;
pub const EXCHANGE_MIN_SIZE: usize = b"S1\nh".len();
pub(super) const STATE_READ_INITIAL: QueryTimeExchangeResult<'static> =
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::Initial,
expect_more: EXCHANGE_MIN_SIZE,
};
pub(super) const STATE_ERROR: QueryTimeExchangeResult<'static> = QueryTimeExchangeResult::Error;
#[derive(Debug, PartialEq)]
/// State of a query time exchange
pub enum QueryTimeExchangeState {
/// beginning of exchange
Initial,
/// SQ (part of packet size)
SQ1Meta1Partial { packet_size_part: u64 },
/// SQ (part of Q window)
SQ2Meta2Partial {
size_of_static_frame: usize,
packet_size: usize,
q_window_part: u64,
},
/// SQ waiting for block
SQ3FinalizeWaitingForBlock {
dataframe_size: usize,
q_window: usize,
},
}
impl Default for QueryTimeExchangeState {
fn default() -> Self {
Self::Initial
}
}
#[derive(Debug, PartialEq)]
/// Result after attempting to complete (or terminate) a query time exchange
pub enum QueryTimeExchangeResult<'a> {
/// We completed the exchange and yielded a [`SQuery`]
SQCompleted(SQuery<'a>),
/// We're changing states
ChangeState {
new_state: QueryTimeExchangeState,
expect_more: usize,
},
/// We hit an error and need to terminate this exchange
Error,
}
/// Resume a query time exchange
pub fn resume<'a>(
scanner: &mut BufferedScanner<'a>,
state: QueryTimeExchangeState,
) -> QueryTimeExchangeResult<'a> {
if cfg!(debug_assertions) {
if !scanner.has_left(EXCHANGE_MIN_SIZE) {
return STATE_READ_INITIAL;
}
} else {
assert!(scanner.has_left(EXCHANGE_MIN_SIZE));
}
match state {
QueryTimeExchangeState::Initial => {
// attempt to read atleast one byte
if cfg!(debug_assertions) {
match scanner.try_next_byte() {
Some(b'S') => SQuery::resume_initial(scanner),
Some(_) => return STATE_ERROR,
None => return STATE_READ_INITIAL,
}
} else {
match unsafe { scanner.next_byte() } {
b'S' => SQuery::resume_initial(scanner),
_ => return STATE_ERROR,
}
}
}
QueryTimeExchangeState::SQ1Meta1Partial { packet_size_part } => {
SQuery::resume_at_sq1_meta1_partial(scanner, packet_size_part)
}
QueryTimeExchangeState::SQ2Meta2Partial {
packet_size,
q_window_part,
size_of_static_frame,
} => SQuery::resume_at_sq2_meta2_partial(
scanner,
size_of_static_frame,
packet_size,
q_window_part,
),
QueryTimeExchangeState::SQ3FinalizeWaitingForBlock {
dataframe_size,
q_window,
} => SQuery::resume_at_final(scanner, q_window, dataframe_size),
}
}
/*
SQ
*/
enum LFTIntParseResult {
Value(u64),
Partial(u64),
Error,
}
fn parse_lf_separated(
scanner: &mut BufferedScanner,
previously_buffered: u64,
) -> LFTIntParseResult {
let mut ret = previously_buffered;
let mut okay = true;
while scanner.rounded_cursor_not_eof_matches(|b| *b != b'\n') & okay {
let b = unsafe { scanner.next_byte() };
okay &= b.is_ascii_digit();
ret = match ret.checked_mul(10) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
ret = match ret.checked_add((b & 0x0F) as u64) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
}
let payload_ok = okay;
let lf_ok = scanner.rounded_cursor_not_eof_matches(|b| *b == b'\n');
unsafe { scanner.incr_cursor_if(lf_ok) }
if payload_ok & lf_ok {
LFTIntParseResult::Value(ret)
} else {
if payload_ok {
LFTIntParseResult::Partial(ret)
} else {
LFTIntParseResult::Error
}
}
}
#[derive(Debug, PartialEq)]
pub struct SQuery<'a> {
q: &'a [u8],
q_window: usize,
}
impl<'a> SQuery<'a> {
pub(super) fn new(q: &'a [u8], q_window: usize) -> Self {
Self { q, q_window }
}
pub fn q(&self) -> &'a [u8] {
self.q
}
pub fn q_window(&self) -> usize {
self.q_window
}
pub fn query(&self) -> &'a [u8] {
&self.q[..self.q_window]
}
pub fn query_str(&self) -> Option<&'a str> {
core::str::from_utf8(self.query()).ok()
}
pub fn params(&self) -> &'a [u8] {
&self.q[self.q_window..]
}
pub fn params_str(&self) -> Option<&'a str> {
core::str::from_utf8(self.params()).ok()
}
}
impl<'a> SQuery<'a> {
/// We're touching this packet for the first time
fn resume_initial(scanner: &mut BufferedScanner<'a>) -> QueryTimeExchangeResult<'a> {
Self::resume_at_sq1_meta1_partial(scanner, 0)
}
/// We found some part of meta1, and need to resume
fn resume_at_sq1_meta1_partial(
scanner: &mut BufferedScanner<'a>,
prev: u64,
) -> QueryTimeExchangeResult<'a> {
match parse_lf_separated(scanner, prev) {
LFTIntParseResult::Value(packet_size) => {
// we got the packet size; can we get the q window?
Self::resume_at_sq2_meta2_partial(
scanner,
scanner.cursor(),
packet_size as usize,
0,
)
}
LFTIntParseResult::Partial(partial_packet_size) => {
// we couldn't get the packet size
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ1Meta1Partial {
packet_size_part: partial_packet_size,
},
expect_more: 3, // 1LF + 1ASCII + 1LF
}
}
LFTIntParseResult::Error => STATE_ERROR,
}
}
/// We found some part of meta2, and need to resume
fn resume_at_sq2_meta2_partial(
scanner: &mut BufferedScanner<'a>,
static_size: usize,
packet_size: usize,
prev_qw_buffered: u64,
) -> QueryTimeExchangeResult<'a> {
let start = scanner.cursor();
match parse_lf_separated(scanner, prev_qw_buffered) {
LFTIntParseResult::Value(q_window) => {
// we got the q window; can we complete the exchange?
Self::resume_at_final(
scanner,
q_window as usize,
Self::compute_df_size(scanner, static_size, packet_size),
)
}
LFTIntParseResult::Partial(q_window_partial) => {
// not enough bytes for getting Q window
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ2Meta2Partial {
packet_size,
q_window_part: q_window_partial,
size_of_static_frame: static_size,
},
// we passed cursor - start bytes out of the packet, so expect this more
expect_more: packet_size - (scanner.cursor() - start),
}
}
LFTIntParseResult::Error => STATE_ERROR,
}
}
/// We got all our meta and need the dataframe
fn resume_at_final(
scanner: &mut BufferedScanner<'a>,
q_window: usize,
dataframe_size: usize,
) -> QueryTimeExchangeResult<'a> {
if scanner.has_left(dataframe_size) {
// we have sufficient bytes for the dataframe
unsafe {
// UNSAFE(@ohsayan): +lenck
QueryTimeExchangeResult::SQCompleted(SQuery::new(
scanner.next_chunk_variable(dataframe_size),
q_window,
))
}
} else {
// not enough bytes for the dataframe
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ3FinalizeWaitingForBlock {
dataframe_size,
q_window,
},
expect_more: Self::compute_df_remaining(scanner, dataframe_size), // dataframe
}
}
}
}
impl<'a> SQuery<'a> {
fn compute_df_size(scanner: &BufferedScanner, static_size: usize, packet_size: usize) -> usize {
packet_size - scanner.cursor() + static_size
}
fn compute_df_remaining(scanner: &BufferedScanner<'_>, df_size: usize) -> usize {
(scanner.cursor() + df_size) - scanner.buffer_len()
}
}
pub(super) fn create_simple_query<const N: usize>(query: &str, params: [&str; N]) -> Vec<u8> {
let mut buf = vec![];
let query_size_as_string = query.len().to_string();
let size_of_variable_section = query.len()
+ params.iter().map(|l| l.len()).sum::<usize>()
+ query_size_as_string.len()
+ 1;
// segment 1
buf.push(b'S');
buf.extend(size_of_variable_section.to_string().as_bytes());
buf.push(b'\n');
// segment
buf.extend(query_size_as_string.as_bytes());
buf.push(b'\n');
// dataframe
buf.extend(query.as_bytes());
params
.into_iter()
.for_each(|param| buf.extend(param.as_bytes()));
buf
}

@ -24,12 +24,9 @@
*
*/
use {
crate::{
engine::mem::scanner::{BufferedScanner, ScannerDecodeResult},
util::compiler,
},
std::slice,
use crate::{
engine::mem::scanner::{BufferedScanner, ScannerDecodeResult},
util::compiler,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, sky_macros::EnumMethods)]
@ -320,10 +317,8 @@ impl<'a> CHandshake<'a> {
// we're done here
return unsafe {
// UNSAFE(@ohsayan): we just checked buffered size
let uname = slice::from_raw_parts(scanner.current_buffer().as_ptr(), uname_l);
let pwd =
slice::from_raw_parts(scanner.current_buffer().as_ptr().add(uname_l), pwd_l);
scanner.incr_cursor_by(uname_l + pwd_l);
let uname = scanner.next_chunk_variable(uname_l);
let pwd = scanner.next_chunk_variable(pwd_l);
HandshakeResult::Completed(Self::new(
static_hs,
Some(CHandshakeAuth::new(uname, pwd)),

@ -24,7 +24,7 @@
*
*/
mod data_exchange;
mod exchange;
mod handshake;
#[cfg(test)]
mod tests;

@ -27,7 +27,7 @@
use crate::engine::{
mem::BufferedScanner,
net::protocol::{
data_exchange::{CSQuery, CSQueryExchangeResult, CSQueryState},
exchange::{self, create_simple_query, QueryTimeExchangeResult, QueryTimeExchangeState},
handshake::{
AuthMode, CHandshake, CHandshakeAuth, CHandshakeStatic, DataExchangeMode,
HandshakeResult, HandshakeState, HandshakeVersion, ProtocolVersion, QueryMode,
@ -207,58 +207,112 @@ fn parse_auth_with_state_updates() {
QT-DEX/SQ
*/
const FULL_SQ: [u8; 116] = *b"S111\nSELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'";
const SQ_FULL: CSQuery<'static> = CSQuery::new(
b"SELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'"
);
const SQ: &str = "select * from myspace.mymodel where username = ?";
#[test]
fn staged_qt_dex_sq() {
for i in 0..FULL_SQ.len() {
let buf = &FULL_SQ[..i + 1];
let mut scanner = BufferedScanner::new(buf);
let result = CSQuery::resume_with(&mut scanner, CSQueryState::default());
match buf.len() {
1..=3 => assert_eq!(
fn qtdex_simple_query() {
let query = create_simple_query(SQ, ["sayan"]);
let mut fin = 52;
for i in 0..query.len() {
let mut scanner = BufferedScanner::new(&query[..i + 1]);
let result = exchange::resume(&mut scanner, Default::default());
match scanner.buffer_len() {
1..=3 => assert_eq!(result, exchange::STATE_READ_INITIAL),
4 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::Initial, CSQuery::PREEMPTIVE_READ)
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ2Meta2Partial {
size_of_static_frame: 4,
packet_size: 56,
q_window_part: 0,
},
expect_more: 56,
}
),
4 => assert_eq!(
5 => assert_eq!(
result,
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ2Meta2Partial {
size_of_static_frame: 4,
packet_size: 56,
q_window_part: 4,
},
expect_more: 55,
}
),
6 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(111), 2)
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ2Meta2Partial {
size_of_static_frame: 4,
packet_size: 56,
q_window_part: 48,
},
expect_more: 54,
}
),
5..=115 => assert_eq!(
7 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(111), 111),
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ3FinalizeWaitingForBlock {
dataframe_size: 53,
q_window: 48,
},
expect_more: 53,
}
),
116 => assert_eq!(result, CSQueryExchangeResult::Completed(SQ_FULL)),
8..=59 => {
assert_eq!(
result,
QueryTimeExchangeResult::ChangeState {
new_state: QueryTimeExchangeState::SQ3FinalizeWaitingForBlock {
dataframe_size: 53,
q_window: 48
},
expect_more: fin,
}
);
fin -= 1;
}
60 => match result {
QueryTimeExchangeResult::SQCompleted(sq) => {
assert_eq!(sq.query_str().unwrap(), SQ);
assert_eq!(sq.params_str().unwrap(), "sayan");
}
_ => unreachable!(),
},
_ => unreachable!(),
}
}
}
#[test]
fn staged_with_status_switch_qt_dex_sq() {
fn qtdex_simple_query_update_state() {
let query = create_simple_query(SQ, ["sayan"]);
let mut state = QueryTimeExchangeState::default();
let mut cursor = 0;
let mut expect = CSQuery::PREEMPTIVE_READ;
let mut state = CSQueryState::default();
let mut expected = 0;
let mut rounds = 0;
loop {
rounds += 1;
let buf = &FULL_SQ[..cursor + expect];
let buf = &query[..expected + cursor];
let mut scanner = unsafe { BufferedScanner::new_with_cursor(buf, cursor) };
match CSQuery::resume_with(&mut scanner, state) {
CSQueryExchangeResult::Completed(c) => {
assert_eq!(c, SQ_FULL);
match exchange::resume(&mut scanner, state) {
QueryTimeExchangeResult::SQCompleted(sq) => {
assert_eq!(sq.query_str().unwrap(), SQ);
assert_eq!(sq.params_str().unwrap(), "sayan");
break;
}
CSQueryExchangeResult::ChangeState(new_state, _expect) => {
QueryTimeExchangeResult::ChangeState {
new_state,
expect_more,
} => {
expected = expect_more;
state = new_state;
expect = _expect;
cursor = scanner.cursor();
}
CSQueryExchangeResult::PacketError => panic!("packet error"),
QueryTimeExchangeResult::Error => panic!("hit error!"),
}
cursor = scanner.cursor();
}
assert_eq!(rounds, 3);
}

@ -45,10 +45,7 @@ use {
mod lexer {
use {
super::*,
crate::engine::{
data::{lit::Lit, spec::Dataspec1D},
ql::lex::Token,
},
crate::engine::{data::lit::Lit, ql::lex::Token},
};
#[bench]
fn lex_number(b: &mut Bencher) {

Loading…
Cancel
Save