From b5e0f68c884b3f9f9b0ae935bc92a1b0a549a1d7 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 2 May 2022 10:25:10 -0700 Subject: [PATCH] Add support for Skyhash 1.0 --- server/src/admin/sys.rs | 8 +- server/src/protocol/interface.rs | 11 + server/src/protocol/mod.rs | 29 +- server/src/protocol/v1/benches.rs | 80 ++++++ server/src/protocol/v1/interface_impls.rs | 294 ++++++++++++++++++++ server/src/protocol/v1/mod.rs | 323 ++++++++++++++++++++++ server/src/protocol/v1/tests.rs | 93 +++++++ server/src/protocol/v2/interface_impls.rs | 17 +- server/src/protocol/v2/mod.rs | 7 +- server/src/tests/mod.rs | 6 +- sky-macros/src/lib.rs | 40 ++- 11 files changed, 876 insertions(+), 32 deletions(-) create mode 100644 server/src/protocol/v1/benches.rs create mode 100644 server/src/protocol/v1/interface_impls.rs create mode 100644 server/src/protocol/v1/mod.rs create mode 100644 server/src/protocol/v1/tests.rs diff --git a/server/src/admin/sys.rs b/server/src/admin/sys.rs index 923e5434..2bb280f2 100644 --- a/server/src/admin/sys.rs +++ b/server/src/admin/sys.rs @@ -25,9 +25,7 @@ */ use crate::{ - corestore::booltable::BoolTable, - dbnet::connection::prelude::*, - protocol::{PROTOCOL_VERSION, PROTOCOL_VERSIONSTRING}, + corestore::booltable::BoolTable, dbnet::connection::prelude::*, storage::v1::interface::DIR_ROOT, }; use ::libsky::VERSION; @@ -56,8 +54,8 @@ action! { } fn sys_info(con: &mut T, iter: &mut ActionIter<'_>) { match unsafe { iter.next_lowercase_unchecked() }.as_ref() { - INFO_PROTOCOL => con.write_string(PROTOCOL_VERSIONSTRING).await?, - INFO_PROTOVER => con.write_float(PROTOCOL_VERSION).await?, + INFO_PROTOCOL => con.write_string(P::PROTOCOL_VERSIONSTRING).await?, + INFO_PROTOVER => con.write_float(P::PROTOCOL_VERSION).await?, INFO_VERSION => con.write_string(VERSION).await?, _ => return util::err(ERR_UNKNOWN_PROPERTY), } diff --git a/server/src/protocol/interface.rs b/server/src/protocol/interface.rs index 9c28afba..48252ce9 100644 --- a/server/src/protocol/interface.rs +++ b/server/src/protocol/interface.rs @@ -38,6 +38,13 @@ use std::io::{Error as IoError, ErrorKind}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; pub trait ProtocolSpec { + // spec information + + /// The Skyhash protocol version + const PROTOCOL_VERSION: f32; + /// The Skyhash protocol version string (Skyhash-x.y) + const PROTOCOL_VERSIONSTRING: &'static str; + // type symbols const TSYMBOL_STRING: u8; const TSYMBOL_BINARY: u8; @@ -100,6 +107,7 @@ pub trait ProtocolSpec { // full responses const FULLRESP_RCODE_PACKET_ERR: &'static [u8]; + const FULLRESP_RCODE_WRONG_TYPE: &'static [u8]; const FULLRESP_HEYA: &'static [u8]; // LUTs @@ -169,6 +177,9 @@ where Err(ParseError::UnexpectedByte | ParseError::BadPacket) => { return Ok(QueryResult::E(P::FULLRESP_RCODE_PACKET_ERR)); } + Err(ParseError::WrongType) => { + return Ok(QueryResult::E(P::FULLRESP_RCODE_WRONG_TYPE)); + } } } }) diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index f42ecaca..100241f4 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -24,20 +24,28 @@ * */ -use crate::corestore::heap_array::HeapArray; -use core::{fmt, slice}; +#[cfg(test)] +use self::interface::ProtocolSpec; +use { + crate::corestore::heap_array::HeapArray, + core::{fmt, slice}, +}; // pub mods pub mod interface; pub mod iter; // versions +mod v1; mod v2; // endof pub mods -/// The Skyhash protocol version -pub const PROTOCOL_VERSION: f32 = 2.0; -/// The Skyhash protocol version string (Skyhash-x.y) -pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-2.0"; pub type Skyhash2 = v2::Parser; +pub type Skyhash1 = v1::Parser; +#[cfg(test)] +/// The latest protocol version supported by this version +pub const LATEST_PROTOCOL_VERSION: f32 = Skyhash2::PROTOCOL_VERSION; +#[cfg(test)] +/// The latest protocol version supported by this version (`Skyhash-x.y`) +pub const LATEST_PROTOCOL_VERSIONSTRING: &str = Skyhash2::PROTOCOL_VERSIONSTRING; #[derive(PartialEq)] /// As its name says, an [`UnsafeSlice`] is a terribly unsafe slice. It's guarantess are @@ -90,7 +98,6 @@ pub enum ParseError { /// Didn't get the number of expected bytes NotEnough = 0u8, /// The packet simply contains invalid data - #[allow(dead_code)] // HACK(@ohsayan): rustc can't "guess" the transmutation BadPacket = 1u8, /// The query contains an unexpected byte UnexpectedByte = 2u8, @@ -98,6 +105,8 @@ pub enum ParseError { /// /// This can happen not just for elements but can also happen for their sizes ([`Self::parse_into_u64`]) DatatypeParseFailure = 3u8, + /// The client supplied the wrong query data type for the given query + WrongType = 4u8, } /// A generic result to indicate parsing errors thorugh the [`ParseError`] enum @@ -121,6 +130,9 @@ impl SimpleQuery { data: self.data.iter().map(|v| v.as_slice().to_owned()).collect(), } } + pub const fn new(data: HeapArray) -> Self { + Self { data } + } pub fn as_slice(&self) -> &[UnsafeSlice] { &self.data } @@ -137,6 +149,9 @@ pub struct PipelinedQuery { } impl PipelinedQuery { + pub const fn new(data: HeapArray>) -> Self { + Self { data } + } pub fn len(&self) -> usize { self.data.len() } diff --git a/server/src/protocol/v1/benches.rs b/server/src/protocol/v1/benches.rs new file mode 100644 index 00000000..a47bb970 --- /dev/null +++ b/server/src/protocol/v1/benches.rs @@ -0,0 +1,80 @@ +/* + * Created on Mon May 02 2022 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2022, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +extern crate test; +use super::{super::Query, Parser}; +use test::Bencher; + +#[bench] +fn simple_query(b: &mut Bencher) { + const PAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n"; + let expected = vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()]; + b.iter(|| { + let (query, forward) = Parser::parse(PAYLOAD).unwrap(); + assert_eq!(forward, PAYLOAD.len()); + let query = if let Query::Simple(sq) = query { + sq + } else { + panic!("Got pipeline instead of simple query"); + }; + let ret: Vec = query + .as_slice() + .iter() + .map(|s| String::from_utf8_lossy(s.as_slice()).to_string()) + .collect(); + assert_eq!(ret, expected) + }); +} + +#[bench] +fn pipelined_query(b: &mut Bencher) { + const PAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n"; + let expected = vec![ + vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()], + vec!["GET".to_owned(), "x".to_owned()], + ]; + b.iter(|| { + let (query, forward) = Parser::parse(PAYLOAD).unwrap(); + assert_eq!(forward, PAYLOAD.len()); + let query = if let Query::Pipelined(sq) = query { + sq + } else { + panic!("Got simple instead of pipeline query"); + }; + let ret: Vec> = query + .into_inner() + .iter() + .map(|query| { + query + .as_slice() + .iter() + .map(|v| String::from_utf8_lossy(v.as_slice()).to_string()) + .collect() + }) + .collect(); + assert_eq!(ret, expected) + }); +} diff --git a/server/src/protocol/v1/interface_impls.rs b/server/src/protocol/v1/interface_impls.rs new file mode 100644 index 00000000..f11a8a21 --- /dev/null +++ b/server/src/protocol/v1/interface_impls.rs @@ -0,0 +1,294 @@ +/* + * Created on Mon May 02 2022 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2022, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crate::{ + corestore::buffers::Integer64, + dbnet::connection::{QueryWithAdvance, RawConnection, Stream}, + protocol::{ + interface::{ProtocolRead, ProtocolSpec, ProtocolWrite}, + ParseError, Skyhash1, + }, + util::FutureResult, + IoResult, + }, + ::sky_macros::compiled_eresp_bytes_v1 as eresp, + tokio::io::AsyncWriteExt, +}; + +impl ProtocolSpec for Skyhash1 { + // spec information + const PROTOCOL_VERSION: f32 = 1.0; + const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-1.0"; + + // type symbols + const TSYMBOL_STRING: u8 = b'+'; + const TSYMBOL_BINARY: u8 = b'?'; + const TSYMBOL_FLOAT: u8 = b'%'; + const TSYMBOL_INT64: u8 = b':'; + const TSYMBOL_TYPED_ARRAY: u8 = b'@'; + const TSYMBOL_TYPED_NON_NULL_ARRAY: u8 = b'^'; + const TSYMBOL_ARRAY: u8 = b'&'; + const TSYMBOL_FLAT_ARRAY: u8 = b'_'; + + // typed array + const TYPE_TYPED_ARRAY_ELEMENT_NULL: &'static [u8] = b"\0\n"; + + // metaframe + const SIMPLE_QUERY_HEADER: &'static [u8] = b"*1\n"; + const PIPELINED_QUERY_FIRST_BYTE: u8 = b'*'; + + // respcodes + /// Response code 0 as a array element + const RCODE_OKAY: &'static [u8] = eresp!("0"); + /// Response code 1 as a array element + const RCODE_NIL: &'static [u8] = eresp!("1"); + /// Response code 2 as a array element + const RCODE_OVERWRITE_ERR: &'static [u8] = eresp!("2"); + /// Response code 3 as a array element + const RCODE_ACTION_ERR: &'static [u8] = eresp!("3"); + /// Response code 4 as a array element + const RCODE_PACKET_ERR: &'static [u8] = eresp!("4"); + /// Response code 5 as a array element + const RCODE_SERVER_ERR: &'static [u8] = eresp!("5"); + /// Response code 6 as a array element + const RCODE_OTHER_ERR_EMPTY: &'static [u8] = eresp!("6"); + /// "Unknown action" error response + const RCODE_UNKNOWN_ACTION: &'static [u8] = eresp!("Unknown action"); + /// Response code 7 + const RCODE_WRONGTYPE_ERR: &'static [u8] = eresp!("7"); + /// Response code 8 + const RCODE_UNKNOWN_DATA_TYPE: &'static [u8] = eresp!("8"); + /// Response code 9 as an array element + const RCODE_ENCODING_ERROR: &'static [u8] = eresp!("9"); + + // respstrings + + /// Snapshot busy error + const RSTRING_SNAPSHOT_BUSY: &'static [u8] = eresp!("err-snapshot-busy"); + /// Snapshot disabled (other error) + const RSTRING_SNAPSHOT_DISABLED: &'static [u8] = eresp!("err-snapshot-disabled"); + /// Duplicate snapshot + const RSTRING_SNAPSHOT_DUPLICATE: &'static [u8] = eresp!("duplicate-snapshot"); + /// Snapshot has illegal name (other error) + const RSTRING_SNAPSHOT_ILLEGAL_NAME: &'static [u8] = eresp!("err-invalid-snapshot-name"); + /// Access after termination signal (other error) + const RSTRING_ERR_ACCESS_AFTER_TERMSIG: &'static [u8] = eresp!("err-access-after-termsig"); + + // keyspace related resps + /// The default container was not set + const RSTRING_DEFAULT_UNSET: &'static [u8] = eresp!("default-container-unset"); + /// The container was not found + const RSTRING_CONTAINER_NOT_FOUND: &'static [u8] = eresp!("container-not-found"); + /// The container is still in use and so cannot be removed + const RSTRING_STILL_IN_USE: &'static [u8] = eresp!("still-in-use"); + /// This is a protected object and hence cannot be accessed + const RSTRING_PROTECTED_OBJECT: &'static [u8] = eresp!("err-protected-object"); + /// The action was applied against the wrong model + const RSTRING_WRONG_MODEL: &'static [u8] = eresp!("wrong-model"); + /// The container already exists + const RSTRING_ALREADY_EXISTS: &'static [u8] = eresp!("err-already-exists"); + /// The container is not ready + const RSTRING_NOT_READY: &'static [u8] = eresp!("not-ready"); + /// A transactional failure occurred + const RSTRING_DDL_TRANSACTIONAL_FAILURE: &'static [u8] = eresp!("transactional-failure"); + /// An unknown DDL query was run + const RSTRING_UNKNOWN_DDL_QUERY: &'static [u8] = eresp!("unknown-ddl-query"); + /// The expression for a DDL query was malformed + const RSTRING_BAD_EXPRESSION: &'static [u8] = eresp!("malformed-expression"); + /// An unknown model was passed in a DDL query + const RSTRING_UNKNOWN_MODEL: &'static [u8] = eresp!("unknown-model"); + /// Too many arguments were passed to model constructor + const RSTRING_TOO_MANY_ARGUMENTS: &'static [u8] = eresp!("too-many-args"); + /// The container name is too long + const RSTRING_CONTAINER_NAME_TOO_LONG: &'static [u8] = eresp!("container-name-too-long"); + /// The container name contains invalid characters + const RSTRING_BAD_CONTAINER_NAME: &'static [u8] = eresp!("bad-container-name"); + /// An unknown inspect query + const RSTRING_UNKNOWN_INSPECT_QUERY: &'static [u8] = eresp!("unknown-inspect-query"); + /// An unknown table property was passed + const RSTRING_UNKNOWN_PROPERTY: &'static [u8] = eresp!("unknown-property"); + /// The keyspace is not empty and hence cannot be removed + const RSTRING_KEYSPACE_NOT_EMPTY: &'static [u8] = eresp!("keyspace-not-empty"); + /// Bad type supplied in a DDL query for the key + const RSTRING_BAD_TYPE_FOR_KEY: &'static [u8] = eresp!("bad-type-for-key"); + /// The index for the provided list was non-existent + const RSTRING_LISTMAP_BAD_INDEX: &'static [u8] = eresp!("bad-list-index"); + /// The list is empty + const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty"); + + // full responses + const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*1\n!1\n4\n"; + const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*1\n!1\n7\n"; + const FULLRESP_HEYA: &'static [u8] = b"*1\n+4\nHEY!\n"; +} + +impl ProtocolRead for T +where + T: RawConnection + Send + Sync, + Strm: Stream, +{ + fn try_query(&self) -> Result { + Skyhash1::parse(self.get_buffer()) + } +} + +impl ProtocolWrite for T +where + T: RawConnection + Send + Sync, + Strm: Stream, +{ + fn write_string<'life0, 'life1, 'ret_life>( + &'life0 mut self, + string: &'life1 str, + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + 'life1: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { + let stream = self.get_mut_stream(); + // tsymbol + stream.write_all(&[Skyhash1::TSYMBOL_STRING]).await?; + // length + let len_bytes = Integer64::from(string.len()); + stream.write_all(&len_bytes).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await?; + // payload + stream.write_all(string.as_bytes()).await?; + // final LF + stream.write_all(&[Skyhash1::LF]).await + }) + } + fn write_binary<'life0, 'life1, 'ret_life>( + &'life0 mut self, + binary: &'life1 [u8], + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + 'life1: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { + let stream = self.get_mut_stream(); + // tsymbol + stream.write_all(&[Skyhash1::TSYMBOL_BINARY]).await?; + // length + let len_bytes = Integer64::from(binary.len()); + stream.write_all(&len_bytes).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await?; + // payload + stream.write_all(binary).await?; + // final LF + stream.write_all(&[Skyhash1::LF]).await + }) + } + fn write_usize<'life0, 'ret_life>( + &'life0 mut self, + size: usize, + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { self.write_int64(size as _).await }) + } + fn write_int64<'life0, 'ret_life>( + &'life0 mut self, + int: u64, + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { + let stream = self.get_mut_stream(); + // tsymbol + stream.write_all(&[Skyhash1::TSYMBOL_INT64]).await?; + // get body and sizeline + let body = Integer64::from(int); + let body_len = Integer64::from(body.len()); + // len of body + stream.write_all(&body_len).await?; + // sizeline LF + stream.write_all(&[Skyhash1::LF]).await?; + // body + stream.write_all(&body).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await + }) + } + fn write_float<'life0, 'ret_life>( + &'life0 mut self, + float: f32, + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { + let stream = self.get_mut_stream(); + // tsymbol + stream.write_all(&[Skyhash1::TSYMBOL_FLOAT]).await?; + // get body and sizeline + let body = float.to_string(); + let body = body.as_bytes(); + let sizeline = Integer64::from(body.len()); + // sizeline + stream.write_all(&sizeline).await?; + // sizeline LF + stream.write_all(&[Skyhash1::LF]).await?; + // body + stream.write_all(body).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await + }) + } + fn write_typed_array_element<'life0, 'life1, 'ret_life>( + &'life0 mut self, + element: &'life1 [u8], + ) -> FutureResult<'ret_life, IoResult<()>> + where + 'life0: 'ret_life, + 'life1: 'ret_life, + Self: 'ret_life, + { + Box::pin(async move { + let stream = self.get_mut_stream(); + // len + stream.write_all(&Integer64::from(element.len())).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await?; + // body + stream.write_all(element).await?; + // LF + stream.write_all(&[Skyhash1::LF]).await + }) + } +} diff --git a/server/src/protocol/v1/mod.rs b/server/src/protocol/v1/mod.rs new file mode 100644 index 00000000..23545309 --- /dev/null +++ b/server/src/protocol/v1/mod.rs @@ -0,0 +1,323 @@ +/* + * Created on Sat Apr 30 2022 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2022, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use super::{ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice}; +use crate::{ + corestore::heap_array::{HeapArray, HeapArrayWriter}, + dbnet::connection::QueryWithAdvance, +}; +use core::mem::transmute; + +mod interface_impls; +// test and bench modules +#[cfg(feature = "nightly")] +mod benches; +#[cfg(test)] +mod tests; + +/// A parser for Skyhash 1.0 +/// +/// Packet structure example (simple query): +/// ```text +/// *1\n +/// ~3\n +/// 3\n +/// SET\n +/// 1\n +/// x\n +/// 3\n +/// 100\n +/// ``` +pub struct Parser { + end: *const u8, + cursor: *const u8, +} + +unsafe impl Send for Parser {} +unsafe impl Sync for Parser {} + +impl Parser { + /// Initialize a new parser + fn new(slice: &[u8]) -> Self { + unsafe { + Self { + end: slice.as_ptr().add(slice.len()), + cursor: slice.as_ptr(), + } + } + } +} + +// basic methods +impl Parser { + /// Returns a ptr one byte past the allocation of the buffer + const fn data_end_ptr(&self) -> *const u8 { + self.end + } + /// Returns the position of the cursor + /// WARNING: Deref might led to a segfault + const fn cursor_ptr(&self) -> *const u8 { + self.cursor + } + /// Check how many bytes we have left + fn remaining(&self) -> usize { + self.data_end_ptr() as usize - self.cursor_ptr() as usize + } + /// Check if we have `size` bytes remaining + fn has_remaining(&self, size: usize) -> bool { + self.remaining() >= size + } + /// Check if we have exhausted the buffer + fn exhausted(&self) -> bool { + self.cursor_ptr() >= self.data_end_ptr() + } + /// Check if the buffer is not exhausted + fn not_exhausted(&self) -> bool { + self.cursor_ptr() < self.data_end_ptr() + } + /// Attempts to return the byte pointed at by the cursor. + /// WARNING: The same segfault warning + const unsafe fn get_byte_at_cursor(&self) -> u8 { + *self.cursor_ptr() + } +} + +// mutable refs +impl Parser { + /// Increment the cursor by `by` positions + unsafe fn incr_cursor_by(&mut self, by: usize) { + self.cursor = self.cursor.add(by); + } + /// Increment the position of the cursor by one position + unsafe fn incr_cursor(&mut self) { + self.incr_cursor_by(1); + } +} + +// utility methods +impl Parser { + /// Returns true if the cursor will give a char, but if `this_if_nothing_ahead` is set + /// to true, then if no byte is ahead, it will still return true + fn will_cursor_give_char(&self, ch: u8, true_if_nothing_ahead: bool) -> ParseResult { + if self.exhausted() { + // nothing left + if true_if_nothing_ahead { + Ok(true) + } else { + Err(ParseError::NotEnough) + } + } else if unsafe { self.get_byte_at_cursor().eq(&ch) } { + Ok(true) + } else { + Ok(false) + } + } + /// Check if the current cursor will give an LF + fn will_cursor_give_linefeed(&self) -> ParseResult { + self.will_cursor_give_char(b'\n', false) + } + /// Gets the _next element. **The cursor should be at the tsymbol (passed)** + fn _next(&mut self) -> ParseResult { + let element_size = self.read_usize()?; + self.read_until(element_size) + } +} + +// higher level abstractions +impl Parser { + /// Attempt to read `len` bytes + fn read_until(&mut self, len: usize) -> ParseResult { + if self.has_remaining(len) { + unsafe { + // UNSAFE(@ohsayan): Already verified lengths + let slice = UnsafeSlice::new(self.cursor_ptr(), len); + self.incr_cursor_by(len); + Ok(slice) + } + } else { + Err(ParseError::NotEnough) + } + } + /// Attempt to read a line, **rejecting an empty payload** + fn read_line_pedantic(&mut self) -> ParseResult { + let start_ptr = self.cursor_ptr(); + unsafe { + while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' { + self.incr_cursor(); + } + let len = self.cursor_ptr() as usize - start_ptr as usize; + let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n'; + if has_lf && len != 0 { + self.incr_cursor(); // skip LF + Ok(UnsafeSlice::new(start_ptr, len)) + } else { + // just some silly hackery + Err(transmute(has_lf)) + } + } + } + /// Attempt to read an `usize` from the buffer + fn read_usize(&mut self) -> ParseResult { + let line = self.read_line_pedantic()?; + let bytes = line.as_slice(); + let mut ret = 0usize; + for byte in bytes { + if byte.is_ascii_digit() { + ret = match ret.checked_mul(10) { + Some(r) => r, + None => return Err(ParseError::DatatypeParseFailure), + }; + ret = match ret.checked_add((byte & 0x0F) as _) { + Some(r) => r, + None => return Err(ParseError::DatatypeParseFailure), + }; + } else { + return Err(ParseError::DatatypeParseFailure); + } + } + Ok(ret) + } + /// Parse the next blob. **The cursor should be at the tsymbol (passed)** + fn parse_next_blob(&mut self) -> ParseResult { + { + let chunk = self._next()?; + if self.will_cursor_give_linefeed()? { + unsafe { + // UNSAFE(@ohsayan): We know that the buffer is not exhausted + // due to the above condition + self.incr_cursor(); + } + Ok(chunk) + } else { + Err(ParseError::UnexpectedByte) + } + } + } +} + +// query abstractions +impl Parser { + /// The buffer should resemble the below structure: + /// ``` + /// ~\n + /// \n + /// \n + /// \n + /// \n + /// ... + /// ``` + fn _parse_simple_query(&mut self) -> ParseResult> { + if self.not_exhausted() { + if unsafe { self.get_byte_at_cursor() } != b'~' { + // we need an any array + return Err(ParseError::WrongType); + } + unsafe { + // UNSAFE(@ohsayan): Just checked length + self.incr_cursor(); + } + let query_count = self.read_usize()?; + let mut writer = HeapArrayWriter::with_capacity(query_count); + for i in 0..query_count { + unsafe { + // UNSAFE(@ohsayan): The index of the for loop ensures that + // we never attempt to write to a bad memory location + writer.write_to_index(i, self.parse_next_blob()?); + } + } + Ok(unsafe { + // UNSAFE(@ohsayan): If we've reached here, then we have initialized + // all the queries + writer.finish() + }) + } else { + Err(ParseError::NotEnough) + } + } + fn parse_simple_query(&mut self) -> ParseResult { + Ok(SimpleQuery::new(self._parse_simple_query()?)) + } + /// The buffer should resemble the following structure: + /// ```text + /// # query 1 + /// ~\n + /// \n + /// \n + /// \n + /// \n + /// # query 2 + /// ~\n + /// \n + /// \n + /// \n + /// \n + /// ... + /// ``` + fn parse_pipelined_query(&mut self, length: usize) -> ParseResult { + let mut writer = HeapArrayWriter::with_capacity(length); + for i in 0..length { + unsafe { + // UNSAFE(@ohsayan): The above condition guarantees that the index + // never causes an overflow + writer.write_to_index(i, self._parse_simple_query()?); + } + } + unsafe { + // UNSAFE(@ohsayan): if we reached here, then we have inited everything + Ok(PipelinedQuery::new(writer.finish())) + } + } + fn _parse(&mut self) -> ParseResult { + if self.not_exhausted() { + let first_byte = unsafe { + // UNSAFE(@ohsayan): Just checked if buffer is exhausted or not + self.get_byte_at_cursor() + }; + if first_byte != b'*' { + // unknown query scheme, so it's a bad packet + return Err(ParseError::BadPacket); + } + unsafe { + // UNSAFE(@ohsayan): Checked buffer len and incremented, so we're good + self.incr_cursor() + }; + let query_count = self.read_usize()?; // get the length + if query_count == 1 { + Ok(Query::Simple(self.parse_simple_query()?)) + } else { + Ok(Query::Pipelined(self.parse_pipelined_query(query_count)?)) + } + } else { + Err(ParseError::NotEnough) + } + } + pub fn parse(buf: &[u8]) -> ParseResult { + let mut slf = Self::new(buf); + let body = slf._parse()?; + let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize; + Ok((body, consumed)) + } +} diff --git a/server/src/protocol/v1/tests.rs b/server/src/protocol/v1/tests.rs new file mode 100644 index 00000000..fb2205c5 --- /dev/null +++ b/server/src/protocol/v1/tests.rs @@ -0,0 +1,93 @@ +/* + * Created on Mon May 02 2022 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2022, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::Parser, + crate::protocol::{ParseError, Query}, +}; + +#[cfg(test)] +const SQPAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n"; +#[cfg(test)] +const PQPAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n"; + +#[test] +fn parse_simple_query() { + let payload = SQPAYLOAD.to_vec(); + let (q, f) = Parser::parse(&payload).unwrap(); + let q: Vec = if let Query::Simple(q) = q { + q.as_slice() + .iter() + .map(|v| String::from_utf8_lossy(v.as_slice()).to_string()) + .collect() + } else { + panic!("Expected simple query") + }; + assert_eq!(f, payload.len()); + assert_eq!(q, vec!["SET".to_owned(), "x".into(), "100".into()]); +} + +#[test] +fn parse_simple_query_incomplete() { + for i in 0..SQPAYLOAD.len() - 1 { + let slice = &SQPAYLOAD[..i]; + assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough); + } +} + +#[test] +fn parse_pipelined_query() { + let payload = PQPAYLOAD.to_vec(); + let (q, f) = Parser::parse(&payload).unwrap(); + let q: Vec> = if let Query::Pipelined(q) = q { + q.into_inner() + .iter() + .map(|sq| { + sq.iter() + .map(|v| String::from_utf8_lossy(v.as_slice()).to_string()) + .collect() + }) + .collect() + } else { + panic!("Expected pipelined query query") + }; + assert_eq!(f, payload.len()); + assert_eq!( + q, + vec![ + vec!["SET".to_owned(), "x".into(), "100".into()], + vec!["GET".into(), "x".into()] + ] + ); +} + +#[test] +fn parse_pipelined_query_incomplete() { + for i in 0..PQPAYLOAD.len() - 1 { + let slice = &PQPAYLOAD[..i]; + assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough); + } +} diff --git a/server/src/protocol/v2/interface_impls.rs b/server/src/protocol/v2/interface_impls.rs index fc60c04a..962543a5 100644 --- a/server/src/protocol/v2/interface_impls.rs +++ b/server/src/protocol/v2/interface_impls.rs @@ -38,6 +38,10 @@ use ::sky_macros::compiled_eresp_bytes as eresp; use tokio::io::AsyncWriteExt; impl ProtocolSpec for Skyhash2 { + // spec information + const PROTOCOL_VERSION: f32 = 2.0; + const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-2.0"; + // type symbols const TSYMBOL_STRING: u8 = b'+'; const TSYMBOL_BINARY: u8 = b'?'; @@ -135,7 +139,8 @@ impl ProtocolSpec for Skyhash2 { const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty"); // full responses - const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*!1\n4\n"; + const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*!4\n"; + const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*!7\n"; const FULLRESP_HEYA: &'static [u8] = b"+4\nHEY!"; } @@ -206,15 +211,7 @@ where 'life0: 'ret_life, Self: 'ret_life, { - Box::pin(async move { - let stream = self.get_mut_stream(); - // tsymbol - stream.write_all(&[Skyhash2::TSYMBOL_INT64]).await?; - // body - stream.write_all(&Integer64::from(size)).await?; - // LF - stream.write_all(&[Skyhash2::LF]).await - }) + Box::pin(async move { self.write_int64(size as _).await }) } fn write_int64<'life0, 'ret_life>( &'life0 mut self, diff --git a/server/src/protocol/v2/mod.rs b/server/src/protocol/v2/mod.rs index 8d533e88..c57e47e7 100644 --- a/server/src/protocol/v2/mod.rs +++ b/server/src/protocol/v2/mod.rs @@ -28,6 +28,7 @@ mod interface_impls; use crate::{ corestore::heap_array::HeapArray, + dbnet::connection::QueryWithAdvance, protocol::{ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice}, }; use core::mem::transmute; @@ -205,9 +206,7 @@ impl Parser { } /// Parse a simple query fn next_simple_query(&mut self) -> ParseResult { - Ok(SimpleQuery { - data: self._next_simple_query()?, - }) + Ok(SimpleQuery::new(self._next_simple_query()?)) } /// Parse a pipelined query. This should have passed the `$` tsymbol /// @@ -284,7 +283,7 @@ impl Parser { } // only expose this. don't expose Self::new since that'll be _relatively easier_ to // invalidate invariants for - pub fn parse(buf: &[u8]) -> ParseResult<(Query, usize)> { + pub fn parse(buf: &[u8]) -> ParseResult { let mut slf = Self::new(buf); let body = slf._parse()?; let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize; diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs index ac4c243a..3f2a5274 100644 --- a/server/src/tests/mod.rs +++ b/server/src/tests/mod.rs @@ -52,7 +52,7 @@ mod tls { } mod sys { - use crate::protocol::{PROTOCOL_VERSION, PROTOCOL_VERSIONSTRING}; + use crate::protocol::{LATEST_PROTOCOL_VERSION, LATEST_PROTOCOL_VERSIONSTRING}; use libsky::VERSION; use sky_macros::dbtest_func as dbtest; use skytable::{query, Element, RespCode}; @@ -79,7 +79,7 @@ mod sys { runeq!( con, query!("sys", "info", "protocol"), - Element::String(PROTOCOL_VERSIONSTRING.to_owned()) + Element::String(LATEST_PROTOCOL_VERSIONSTRING.to_owned()) ) } #[dbtest] @@ -87,7 +87,7 @@ mod sys { runeq!( con, query!("sys", "info", "protover"), - Element::Float(PROTOCOL_VERSION) + Element::Float(LATEST_PROTOCOL_VERSION) ) } #[dbtest] diff --git a/sky-macros/src/lib.rs b/sky-macros/src/lib.rs index 349aa48d..2d66332f 100644 --- a/sky-macros/src/lib.rs +++ b/sky-macros/src/lib.rs @@ -106,18 +106,40 @@ pub fn dbtest_func(args: TokenStream, item: TokenStream) -> TokenStream { /// Get a compile time respcode/respstring array. For example, if you pass: "Unknown action", /// it will return: `!14\nUnknown Action\n` pub fn compiled_eresp_array(tokens: TokenStream) -> TokenStream { - _get_eresp_array(tokens) + _get_eresp_array(tokens, false) } -fn _get_eresp_array(tokens: TokenStream) -> TokenStream { +#[proc_macro] +/// Get a compile time respcode/respstring array. For example, if you pass: "Unknown action", +/// it will return: `!14\n14\nUnknown Action\n` +pub fn compiled_eresp_array_v1(tokens: TokenStream) -> TokenStream { + _get_eresp_array(tokens, true) +} + +fn _get_eresp_array(tokens: TokenStream, sizeline: bool) -> TokenStream { let payload_str = match syn::parse_macro_input!(tokens as Lit) { Lit::Str(st) => st.value(), _ => panic!("Expected a string literal"), }; - let payload_bytes = payload_str.as_bytes(); let mut processed = quote! { b'!', }; + if sizeline { + let payload_len = payload_str.as_bytes().len(); + let payload_len_str = payload_len.to_string(); + let payload_len_bytes = payload_len_str.as_bytes(); + for byte in payload_len_bytes { + processed = quote! { + #processed + #byte, + }; + } + processed = quote! { + #processed + b'\n', + }; + } + let payload_bytes = payload_str.as_bytes(); for byte in payload_bytes { processed = quote! { #processed @@ -145,3 +167,15 @@ pub fn compiled_eresp_bytes(tokens: TokenStream) -> TokenStream { } .into() } + +#[proc_macro] +/// Get a compile time respcode/respstring slice. For example, if you pass: "Unknown action", +/// it will return: `!14\nUnknown Action\n` +pub fn compiled_eresp_bytes_v1(tokens: TokenStream) -> TokenStream { + let ret = compiled_eresp_array_v1(tokens); + let ret = syn::parse_macro_input!(ret as syn::Expr); + quote! { + &#ret + } + .into() +}