Impl Skyhash/2 data exchange

next
Sayan Nandan 1 year ago
parent 91514d4219
commit 7238f0c0e8
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -58,7 +58,7 @@ impl<'a> BufferedScanner<'a> {
self.remaining() >= sizeof
}
pub fn matches_cursor_rounded(&self, f: impl Fn(u8) -> bool) -> bool {
f(self.d[self.d.len().min(self.__cursor)])
f(self.d[(self.d.len() - 1).min(self.__cursor)])
}
pub fn matches_cursor_rounded_and_not_eof(&self, f: impl Fn(u8) -> bool) -> bool {
self.matches_cursor_rounded(f) & !self.eof()

@ -24,4 +24,13 @@
*
*/
use tokio::io::{AsyncRead, AsyncWrite};
mod protocol;
pub trait Socket: AsyncWrite + AsyncRead + Unpin {}
pub type IoResult<T> = Result<T, std::io::Error>;
enum QLoopReturn {
Fin,
ConnectionRst,
}

@ -0,0 +1,192 @@
/*
* Created on Mon Sep 18 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
Skyhash/2 Data Exchange Packets
---
1. Client side packet:
a. SQ
S<len>\n<payload>
b. PQ
P<count>\n(<pld len>\n<pld>)*
TODO(@ohsayan): Restore pipeline impl
*/
use crate::{engine::mem::BufferedScanner, util::compiler};
use std::slice;
#[derive(Debug, PartialEq)]
pub struct CSQuery<'a> {
query: &'a [u8],
}
impl<'a> CSQuery<'a> {
pub(super) const fn new(query: &'a [u8]) -> Self {
Self { query }
}
pub const fn query(&self) -> &'a [u8] {
self.query
}
}
#[derive(Debug, PartialEq)]
pub enum CSQueryState {
Initial,
SizeSegmentPart(u64),
WaitingForFullBlock(usize),
}
impl Default for CSQueryState {
fn default() -> Self {
Self::Initial
}
}
#[derive(Debug, PartialEq)]
pub enum CSQueryExchangeResult<'a> {
Completed(CSQuery<'a>),
ChangeState(CSQueryState, usize),
PacketError,
}
impl<'a> CSQuery<'a> {
pub fn resume_with(
scanner: &mut BufferedScanner<'a>,
state: CSQueryState,
) -> CSQueryExchangeResult<'a> {
match state {
CSQueryState::Initial => Self::resume_initial(scanner),
CSQueryState::SizeSegmentPart(part) => Self::resume_at_meta_segment(scanner, part),
CSQueryState::WaitingForFullBlock(size) => Self::resume_at_data_segment(scanner, size),
}
}
}
enum LFTIntParseResult {
Value(u64),
Partial(u64),
Error,
}
fn parse_lf_separated(
scanner: &mut BufferedScanner,
previously_buffered: u64,
) -> LFTIntParseResult {
let mut ret = previously_buffered;
let mut okay = true;
while scanner.matches_cursor_rounded_and_not_eof(|b| b != b'\n') & okay {
let b = unsafe { scanner.next_byte() };
okay &= b.is_ascii_digit();
ret = match ret.checked_mul(10) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
ret = match ret.checked_add((b & 0x0F) as u64) {
Some(r) => r,
None => return LFTIntParseResult::Error,
};
}
let payload_ok = okay;
let lf_ok = scanner.matches_cursor_rounded_and_not_eof(|b| b == b'\n');
unsafe { scanner.move_ahead_by(lf_ok as usize) }
if payload_ok & lf_ok {
LFTIntParseResult::Value(ret)
} else {
if payload_ok {
LFTIntParseResult::Partial(ret)
} else {
LFTIntParseResult::Error
}
}
}
impl<'a> CSQuery<'a> {
pub const PREEMPTIVE_READ: usize = 4;
const FIRST_BYTE: u8 = b'S';
fn resume_initial(scanner: &mut BufferedScanner<'a>) -> CSQueryExchangeResult<'a> {
if cfg!(debug_assertions) {
if scanner.remaining() < Self::PREEMPTIVE_READ {
return CSQueryExchangeResult::ChangeState(
CSQueryState::Initial,
Self::PREEMPTIVE_READ,
);
}
} else {
assert!(scanner.remaining() >= Self::PREEMPTIVE_READ);
}
// get our block
let first_byte = unsafe { scanner.next_byte() };
// be optimistic and check first byte later
let size_of_query = match parse_lf_separated(scanner, 0) {
LFTIntParseResult::Value(v) => v as usize,
LFTIntParseResult::Partial(v) => {
if compiler::unlikely(first_byte != Self::FIRST_BYTE) {
return CSQueryExchangeResult::PacketError;
} else {
// expect at least 1 LF and at least 1 query byte
return CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(v), 2);
}
}
LFTIntParseResult::Error => {
// that's pretty much over
return CSQueryExchangeResult::PacketError;
}
};
if compiler::unlikely(first_byte != Self::FIRST_BYTE) {
return CSQueryExchangeResult::PacketError;
}
Self::resume_at_data_segment(scanner, size_of_query)
}
fn resume_at_meta_segment(
scanner: &mut BufferedScanner<'a>,
previous: u64,
) -> CSQueryExchangeResult<'a> {
match parse_lf_separated(scanner, previous) {
LFTIntParseResult::Value(v) => Self::resume_at_data_segment(scanner, v as usize),
LFTIntParseResult::Partial(p) => {
CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(p), 2)
}
LFTIntParseResult::Error => CSQueryExchangeResult::PacketError,
}
}
fn resume_at_data_segment(
scanner: &mut BufferedScanner<'a>,
size: usize,
) -> CSQueryExchangeResult<'a> {
if scanner.has_left(size) {
let slice;
unsafe {
// UNSAFE(@ohsayan): checked len at branch
slice = slice::from_raw_parts(scanner.current().as_ptr(), size);
scanner.move_ahead_by(size);
}
CSQueryExchangeResult::Completed(CSQuery::new(slice))
} else {
CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(size), size)
}
}
}

@ -24,6 +24,82 @@
*
*/
mod data_exchange;
mod handshake;
#[cfg(test)]
mod tests;
use {
self::handshake::{CHandshake, HandshakeResult, HandshakeState},
super::{IoResult, QLoopReturn, Socket},
crate::engine::mem::BufferedScanner,
bytes::{Buf, BytesMut},
tokio::io::{AsyncReadExt, BufWriter},
};
pub async fn query_loop<S: Socket>(
con: &mut BufWriter<S>,
buf: &mut BytesMut,
) -> IoResult<QLoopReturn> {
// handshake
match do_handshake(con, buf).await? {
Some(ret) => return Ok(ret),
None => {}
}
// done handshaking
loop {
let read_many = con.read_buf(buf).await?;
if let Some(t) = see_if_connection_terminates(read_many, buf) {
return Ok(t);
}
todo!()
}
}
fn see_if_connection_terminates(read_many: usize, buf: &[u8]) -> Option<QLoopReturn> {
if read_many == 0 {
// that's a connection termination
if buf.is_empty() {
// nice termination
return Some(QLoopReturn::Fin);
} else {
return Some(QLoopReturn::ConnectionRst);
}
}
None
}
async fn do_handshake<S: Socket>(
con: &mut BufWriter<S>,
buf: &mut BytesMut,
) -> IoResult<Option<QLoopReturn>> {
let mut expected = CHandshake::INITIAL_READ;
let mut state = HandshakeState::default();
let mut cursor = 0;
let handshake;
loop {
let read_many = con.read_buf(buf).await?;
if let Some(t) = see_if_connection_terminates(read_many, buf) {
return Ok(Some(t));
}
if buf.len() < expected {
continue;
}
let mut scanner = unsafe { BufferedScanner::new_with_cursor(buf, cursor) };
match handshake::CHandshake::resume_with(&mut scanner, state) {
HandshakeResult::Completed(hs) => {
handshake = hs;
break;
}
HandshakeResult::ChangeState { new_state, expect } => {
expected = expect;
state = new_state;
cursor = scanner.cursor();
}
HandshakeResult::Error(_) => todo!(),
}
}
dbg!(handshake);
buf.advance(cursor);
Ok(None)
}

@ -24,12 +24,21 @@
*
*/
use crate::engine::mem::BufferedScanner;
use crate::engine::net::protocol::handshake::{
AuthMode, CHandshake, CHandshakeAuth, CHandshakeStatic, DataExchangeMode, HandshakeResult,
HandshakeState, HandshakeVersion, ProtocolVersion, QueryMode,
use crate::engine::{
mem::BufferedScanner,
net::protocol::{
data_exchange::{CSQuery, CSQueryExchangeResult, CSQueryState},
handshake::{
AuthMode, CHandshake, CHandshakeAuth, CHandshakeStatic, DataExchangeMode,
HandshakeResult, HandshakeState, HandshakeVersion, ProtocolVersion, QueryMode,
},
},
};
/*
client handshake
*/
const FULL_HANDSHAKE_NO_AUTH: [u8; 7] = [b'H', 0, 0, 0, 0, 0, 0];
const FULL_HANDSHAKE_WITH_AUTH: [u8; 23] = *b"H\0\0\0\0\x015\n8\nsayanpass1234";
@ -150,7 +159,6 @@ fn parse_staged_with_auth() {
fn run_state_changes_return_rounds(src: &[u8], expected_final_handshake: CHandshake) -> usize {
let mut rounds = 0;
let hs;
let mut state = HandshakeState::default();
let mut cursor = 0;
let mut expect_many = CHandshake::INITIAL_READ;
@ -164,8 +172,7 @@ fn run_state_changes_return_rounds(src: &[u8], expected_final_handshake: CHandsh
expect_many = expect;
cursor = scanner.cursor();
}
HandshakeResult::Completed(c) => {
hs = c;
HandshakeResult::Completed(hs) => {
assert_eq!(hs, expected_final_handshake);
break;
}
@ -195,3 +202,63 @@ fn parse_auth_with_state_updates() {
);
assert_eq!(rounds, 3); // r1 = initial read, r2 = lengths, r3 = items
}
/*
QT-DEX/SQ
*/
const FULL_SQ: [u8; 116] = *b"S111\nSELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'";
const SQ_FULL: CSQuery<'static> = CSQuery::new(
b"SELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'"
);
#[test]
fn staged_qt_dex_sq() {
for i in 0..FULL_SQ.len() {
let buf = &FULL_SQ[..i + 1];
let mut scanner = BufferedScanner::new(buf);
let result = CSQuery::resume_with(&mut scanner, CSQueryState::default());
match buf.len() {
1..=3 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::Initial, CSQuery::PREEMPTIVE_READ)
),
4 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(111), 2)
),
5..=115 => assert_eq!(
result,
CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(111), 111),
),
116 => assert_eq!(result, CSQueryExchangeResult::Completed(SQ_FULL)),
_ => unreachable!(),
}
}
}
#[test]
fn staged_with_status_switch_qt_dex_sq() {
let mut cursor = 0;
let mut expect = CSQuery::PREEMPTIVE_READ;
let mut state = CSQueryState::default();
let mut rounds = 0;
loop {
rounds += 1;
let buf = &FULL_SQ[..cursor + expect];
let mut scanner = unsafe { BufferedScanner::new_with_cursor(buf, cursor) };
match CSQuery::resume_with(&mut scanner, state) {
CSQueryExchangeResult::Completed(c) => {
assert_eq!(c, SQ_FULL);
break;
}
CSQueryExchangeResult::ChangeState(new_state, _expect) => {
state = new_state;
expect = _expect;
cursor = scanner.cursor();
}
CSQueryExchangeResult::PacketError => panic!("packet error"),
}
}
assert_eq!(rounds, 3);
}

Loading…
Cancel
Save