From 7238f0c0e840bbdb25094dfc8201a1c87d934424 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 18 Sep 2023 18:02:05 +0000 Subject: [PATCH] Impl Skyhash/2 data exchange --- server/src/engine/mem/buf.rs | 2 +- server/src/engine/net/mod.rs | 9 + .../src/engine/net/protocol/data_exchange.rs | 192 ++++++++++++++++++ server/src/engine/net/protocol/mod.rs | 76 +++++++ server/src/engine/net/protocol/tests.rs | 81 +++++++- 5 files changed, 352 insertions(+), 8 deletions(-) create mode 100644 server/src/engine/net/protocol/data_exchange.rs diff --git a/server/src/engine/mem/buf.rs b/server/src/engine/mem/buf.rs index cc4d34ab..4870772a 100644 --- a/server/src/engine/mem/buf.rs +++ b/server/src/engine/mem/buf.rs @@ -58,7 +58,7 @@ impl<'a> BufferedScanner<'a> { self.remaining() >= sizeof } pub fn matches_cursor_rounded(&self, f: impl Fn(u8) -> bool) -> bool { - f(self.d[self.d.len().min(self.__cursor)]) + f(self.d[(self.d.len() - 1).min(self.__cursor)]) } pub fn matches_cursor_rounded_and_not_eof(&self, f: impl Fn(u8) -> bool) -> bool { self.matches_cursor_rounded(f) & !self.eof() diff --git a/server/src/engine/net/mod.rs b/server/src/engine/net/mod.rs index d5c17871..9975ae49 100644 --- a/server/src/engine/net/mod.rs +++ b/server/src/engine/net/mod.rs @@ -24,4 +24,13 @@ * */ +use tokio::io::{AsyncRead, AsyncWrite}; mod protocol; + +pub trait Socket: AsyncWrite + AsyncRead + Unpin {} +pub type IoResult = Result; + +enum QLoopReturn { + Fin, + ConnectionRst, +} diff --git a/server/src/engine/net/protocol/data_exchange.rs b/server/src/engine/net/protocol/data_exchange.rs new file mode 100644 index 00000000..2ffd9177 --- /dev/null +++ b/server/src/engine/net/protocol/data_exchange.rs @@ -0,0 +1,192 @@ +/* + * Created on Mon Sep 18 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +/* + Skyhash/2 Data Exchange Packets + --- + 1. Client side packet: + a. SQ + S\n + b. PQ + P\n(\n)* + + TODO(@ohsayan): Restore pipeline impl +*/ + +use crate::{engine::mem::BufferedScanner, util::compiler}; +use std::slice; + +#[derive(Debug, PartialEq)] +pub struct CSQuery<'a> { + query: &'a [u8], +} + +impl<'a> CSQuery<'a> { + pub(super) const fn new(query: &'a [u8]) -> Self { + Self { query } + } + pub const fn query(&self) -> &'a [u8] { + self.query + } +} + +#[derive(Debug, PartialEq)] +pub enum CSQueryState { + Initial, + SizeSegmentPart(u64), + WaitingForFullBlock(usize), +} + +impl Default for CSQueryState { + fn default() -> Self { + Self::Initial + } +} + +#[derive(Debug, PartialEq)] +pub enum CSQueryExchangeResult<'a> { + Completed(CSQuery<'a>), + ChangeState(CSQueryState, usize), + PacketError, +} + +impl<'a> CSQuery<'a> { + pub fn resume_with( + scanner: &mut BufferedScanner<'a>, + state: CSQueryState, + ) -> CSQueryExchangeResult<'a> { + match state { + CSQueryState::Initial => Self::resume_initial(scanner), + CSQueryState::SizeSegmentPart(part) => Self::resume_at_meta_segment(scanner, part), + CSQueryState::WaitingForFullBlock(size) => Self::resume_at_data_segment(scanner, size), + } + } +} + +enum LFTIntParseResult { + Value(u64), + Partial(u64), + Error, +} + +fn parse_lf_separated( + scanner: &mut BufferedScanner, + previously_buffered: u64, +) -> LFTIntParseResult { + let mut ret = previously_buffered; + let mut okay = true; + while scanner.matches_cursor_rounded_and_not_eof(|b| b != b'\n') & okay { + let b = unsafe { scanner.next_byte() }; + okay &= b.is_ascii_digit(); + ret = match ret.checked_mul(10) { + Some(r) => r, + None => return LFTIntParseResult::Error, + }; + ret = match ret.checked_add((b & 0x0F) as u64) { + Some(r) => r, + None => return LFTIntParseResult::Error, + }; + } + let payload_ok = okay; + let lf_ok = scanner.matches_cursor_rounded_and_not_eof(|b| b == b'\n'); + unsafe { scanner.move_ahead_by(lf_ok as usize) } + if payload_ok & lf_ok { + LFTIntParseResult::Value(ret) + } else { + if payload_ok { + LFTIntParseResult::Partial(ret) + } else { + LFTIntParseResult::Error + } + } +} + +impl<'a> CSQuery<'a> { + pub const PREEMPTIVE_READ: usize = 4; + const FIRST_BYTE: u8 = b'S'; + fn resume_initial(scanner: &mut BufferedScanner<'a>) -> CSQueryExchangeResult<'a> { + if cfg!(debug_assertions) { + if scanner.remaining() < Self::PREEMPTIVE_READ { + return CSQueryExchangeResult::ChangeState( + CSQueryState::Initial, + Self::PREEMPTIVE_READ, + ); + } + } else { + assert!(scanner.remaining() >= Self::PREEMPTIVE_READ); + } + // get our block + let first_byte = unsafe { scanner.next_byte() }; + // be optimistic and check first byte later + let size_of_query = match parse_lf_separated(scanner, 0) { + LFTIntParseResult::Value(v) => v as usize, + LFTIntParseResult::Partial(v) => { + if compiler::unlikely(first_byte != Self::FIRST_BYTE) { + return CSQueryExchangeResult::PacketError; + } else { + // expect at least 1 LF and at least 1 query byte + return CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(v), 2); + } + } + LFTIntParseResult::Error => { + // that's pretty much over + return CSQueryExchangeResult::PacketError; + } + }; + if compiler::unlikely(first_byte != Self::FIRST_BYTE) { + return CSQueryExchangeResult::PacketError; + } + Self::resume_at_data_segment(scanner, size_of_query) + } + fn resume_at_meta_segment( + scanner: &mut BufferedScanner<'a>, + previous: u64, + ) -> CSQueryExchangeResult<'a> { + match parse_lf_separated(scanner, previous) { + LFTIntParseResult::Value(v) => Self::resume_at_data_segment(scanner, v as usize), + LFTIntParseResult::Partial(p) => { + CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(p), 2) + } + LFTIntParseResult::Error => CSQueryExchangeResult::PacketError, + } + } + fn resume_at_data_segment( + scanner: &mut BufferedScanner<'a>, + size: usize, + ) -> CSQueryExchangeResult<'a> { + if scanner.has_left(size) { + let slice; + unsafe { + // UNSAFE(@ohsayan): checked len at branch + slice = slice::from_raw_parts(scanner.current().as_ptr(), size); + scanner.move_ahead_by(size); + } + CSQueryExchangeResult::Completed(CSQuery::new(slice)) + } else { + CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(size), size) + } + } +} diff --git a/server/src/engine/net/protocol/mod.rs b/server/src/engine/net/protocol/mod.rs index ec08551e..ba794028 100644 --- a/server/src/engine/net/protocol/mod.rs +++ b/server/src/engine/net/protocol/mod.rs @@ -24,6 +24,82 @@ * */ +mod data_exchange; mod handshake; #[cfg(test)] mod tests; + +use { + self::handshake::{CHandshake, HandshakeResult, HandshakeState}, + super::{IoResult, QLoopReturn, Socket}, + crate::engine::mem::BufferedScanner, + bytes::{Buf, BytesMut}, + tokio::io::{AsyncReadExt, BufWriter}, +}; + +pub async fn query_loop( + con: &mut BufWriter, + buf: &mut BytesMut, +) -> IoResult { + // handshake + match do_handshake(con, buf).await? { + Some(ret) => return Ok(ret), + None => {} + } + // done handshaking + loop { + let read_many = con.read_buf(buf).await?; + if let Some(t) = see_if_connection_terminates(read_many, buf) { + return Ok(t); + } + todo!() + } +} + +fn see_if_connection_terminates(read_many: usize, buf: &[u8]) -> Option { + if read_many == 0 { + // that's a connection termination + if buf.is_empty() { + // nice termination + return Some(QLoopReturn::Fin); + } else { + return Some(QLoopReturn::ConnectionRst); + } + } + None +} + +async fn do_handshake( + con: &mut BufWriter, + buf: &mut BytesMut, +) -> IoResult> { + let mut expected = CHandshake::INITIAL_READ; + let mut state = HandshakeState::default(); + let mut cursor = 0; + let handshake; + loop { + let read_many = con.read_buf(buf).await?; + if let Some(t) = see_if_connection_terminates(read_many, buf) { + return Ok(Some(t)); + } + if buf.len() < expected { + continue; + } + let mut scanner = unsafe { BufferedScanner::new_with_cursor(buf, cursor) }; + match handshake::CHandshake::resume_with(&mut scanner, state) { + HandshakeResult::Completed(hs) => { + handshake = hs; + break; + } + HandshakeResult::ChangeState { new_state, expect } => { + expected = expect; + state = new_state; + cursor = scanner.cursor(); + } + HandshakeResult::Error(_) => todo!(), + } + } + dbg!(handshake); + buf.advance(cursor); + Ok(None) +} diff --git a/server/src/engine/net/protocol/tests.rs b/server/src/engine/net/protocol/tests.rs index 91185cdb..e2159deb 100644 --- a/server/src/engine/net/protocol/tests.rs +++ b/server/src/engine/net/protocol/tests.rs @@ -24,12 +24,21 @@ * */ -use crate::engine::mem::BufferedScanner; -use crate::engine::net::protocol::handshake::{ - AuthMode, CHandshake, CHandshakeAuth, CHandshakeStatic, DataExchangeMode, HandshakeResult, - HandshakeState, HandshakeVersion, ProtocolVersion, QueryMode, +use crate::engine::{ + mem::BufferedScanner, + net::protocol::{ + data_exchange::{CSQuery, CSQueryExchangeResult, CSQueryState}, + handshake::{ + AuthMode, CHandshake, CHandshakeAuth, CHandshakeStatic, DataExchangeMode, + HandshakeResult, HandshakeState, HandshakeVersion, ProtocolVersion, QueryMode, + }, + }, }; +/* + client handshake +*/ + const FULL_HANDSHAKE_NO_AUTH: [u8; 7] = [b'H', 0, 0, 0, 0, 0, 0]; const FULL_HANDSHAKE_WITH_AUTH: [u8; 23] = *b"H\0\0\0\0\x015\n8\nsayanpass1234"; @@ -150,7 +159,6 @@ fn parse_staged_with_auth() { fn run_state_changes_return_rounds(src: &[u8], expected_final_handshake: CHandshake) -> usize { let mut rounds = 0; - let hs; let mut state = HandshakeState::default(); let mut cursor = 0; let mut expect_many = CHandshake::INITIAL_READ; @@ -164,8 +172,7 @@ fn run_state_changes_return_rounds(src: &[u8], expected_final_handshake: CHandsh expect_many = expect; cursor = scanner.cursor(); } - HandshakeResult::Completed(c) => { - hs = c; + HandshakeResult::Completed(hs) => { assert_eq!(hs, expected_final_handshake); break; } @@ -195,3 +202,63 @@ fn parse_auth_with_state_updates() { ); assert_eq!(rounds, 3); // r1 = initial read, r2 = lengths, r3 = items } + +/* + QT-DEX/SQ +*/ + +const FULL_SQ: [u8; 116] = *b"S111\nSELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'"; +const SQ_FULL: CSQuery<'static> = CSQuery::new( + b"SELECT username, email, bio, profile_pic, following, followers, FROM mysocialapp.users WHERE username = 'sayan'" +); + +#[test] +fn staged_qt_dex_sq() { + for i in 0..FULL_SQ.len() { + let buf = &FULL_SQ[..i + 1]; + let mut scanner = BufferedScanner::new(buf); + let result = CSQuery::resume_with(&mut scanner, CSQueryState::default()); + match buf.len() { + 1..=3 => assert_eq!( + result, + CSQueryExchangeResult::ChangeState(CSQueryState::Initial, CSQuery::PREEMPTIVE_READ) + ), + 4 => assert_eq!( + result, + CSQueryExchangeResult::ChangeState(CSQueryState::SizeSegmentPart(111), 2) + ), + 5..=115 => assert_eq!( + result, + CSQueryExchangeResult::ChangeState(CSQueryState::WaitingForFullBlock(111), 111), + ), + 116 => assert_eq!(result, CSQueryExchangeResult::Completed(SQ_FULL)), + _ => unreachable!(), + } + } +} + +#[test] +fn staged_with_status_switch_qt_dex_sq() { + let mut cursor = 0; + let mut expect = CSQuery::PREEMPTIVE_READ; + let mut state = CSQueryState::default(); + let mut rounds = 0; + loop { + rounds += 1; + let buf = &FULL_SQ[..cursor + expect]; + let mut scanner = unsafe { BufferedScanner::new_with_cursor(buf, cursor) }; + match CSQuery::resume_with(&mut scanner, state) { + CSQueryExchangeResult::Completed(c) => { + assert_eq!(c, SQ_FULL); + break; + } + CSQueryExchangeResult::ChangeState(new_state, _expect) => { + state = new_state; + expect = _expect; + cursor = scanner.cursor(); + } + CSQueryExchangeResult::PacketError => panic!("packet error"), + } + } + assert_eq!(rounds, 3); +}