Add support for Skyhash 1.0

next
Sayan Nandan 2 years ago
parent 7ec599edcb
commit b5e0f68c88
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -25,9 +25,7 @@
*/
use crate::{
corestore::booltable::BoolTable,
dbnet::connection::prelude::*,
protocol::{PROTOCOL_VERSION, PROTOCOL_VERSIONSTRING},
corestore::booltable::BoolTable, dbnet::connection::prelude::*,
storage::v1::interface::DIR_ROOT,
};
use ::libsky::VERSION;
@ -56,8 +54,8 @@ action! {
}
fn sys_info(con: &mut T, iter: &mut ActionIter<'_>) {
match unsafe { iter.next_lowercase_unchecked() }.as_ref() {
INFO_PROTOCOL => con.write_string(PROTOCOL_VERSIONSTRING).await?,
INFO_PROTOVER => con.write_float(PROTOCOL_VERSION).await?,
INFO_PROTOCOL => con.write_string(P::PROTOCOL_VERSIONSTRING).await?,
INFO_PROTOVER => con.write_float(P::PROTOCOL_VERSION).await?,
INFO_VERSION => con.write_string(VERSION).await?,
_ => return util::err(ERR_UNKNOWN_PROPERTY),
}

@ -38,6 +38,13 @@ use std::io::{Error as IoError, ErrorKind};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
pub trait ProtocolSpec {
// spec information
/// The Skyhash protocol version
const PROTOCOL_VERSION: f32;
/// The Skyhash protocol version string (Skyhash-x.y)
const PROTOCOL_VERSIONSTRING: &'static str;
// type symbols
const TSYMBOL_STRING: u8;
const TSYMBOL_BINARY: u8;
@ -100,6 +107,7 @@ pub trait ProtocolSpec {
// full responses
const FULLRESP_RCODE_PACKET_ERR: &'static [u8];
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8];
const FULLRESP_HEYA: &'static [u8];
// LUTs
@ -169,6 +177,9 @@ where
Err(ParseError::UnexpectedByte | ParseError::BadPacket) => {
return Ok(QueryResult::E(P::FULLRESP_RCODE_PACKET_ERR));
}
Err(ParseError::WrongType) => {
return Ok(QueryResult::E(P::FULLRESP_RCODE_WRONG_TYPE));
}
}
}
})

@ -24,20 +24,28 @@
*
*/
use crate::corestore::heap_array::HeapArray;
use core::{fmt, slice};
#[cfg(test)]
use self::interface::ProtocolSpec;
use {
crate::corestore::heap_array::HeapArray,
core::{fmt, slice},
};
// pub mods
pub mod interface;
pub mod iter;
// versions
mod v1;
mod v2;
// endof pub mods
/// The Skyhash protocol version
pub const PROTOCOL_VERSION: f32 = 2.0;
/// The Skyhash protocol version string (Skyhash-x.y)
pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-2.0";
pub type Skyhash2 = v2::Parser;
pub type Skyhash1 = v1::Parser;
#[cfg(test)]
/// The latest protocol version supported by this version
pub const LATEST_PROTOCOL_VERSION: f32 = Skyhash2::PROTOCOL_VERSION;
#[cfg(test)]
/// The latest protocol version supported by this version (`Skyhash-x.y`)
pub const LATEST_PROTOCOL_VERSIONSTRING: &str = Skyhash2::PROTOCOL_VERSIONSTRING;
#[derive(PartialEq)]
/// As its name says, an [`UnsafeSlice`] is a terribly unsafe slice. It's guarantess are
@ -90,7 +98,6 @@ pub enum ParseError {
/// Didn't get the number of expected bytes
NotEnough = 0u8,
/// The packet simply contains invalid data
#[allow(dead_code)] // HACK(@ohsayan): rustc can't "guess" the transmutation
BadPacket = 1u8,
/// The query contains an unexpected byte
UnexpectedByte = 2u8,
@ -98,6 +105,8 @@ pub enum ParseError {
///
/// This can happen not just for elements but can also happen for their sizes ([`Self::parse_into_u64`])
DatatypeParseFailure = 3u8,
/// The client supplied the wrong query data type for the given query
WrongType = 4u8,
}
/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum
@ -121,6 +130,9 @@ impl SimpleQuery {
data: self.data.iter().map(|v| v.as_slice().to_owned()).collect(),
}
}
pub const fn new(data: HeapArray<UnsafeSlice>) -> Self {
Self { data }
}
pub fn as_slice(&self) -> &[UnsafeSlice] {
&self.data
}
@ -137,6 +149,9 @@ pub struct PipelinedQuery {
}
impl PipelinedQuery {
pub const fn new(data: HeapArray<HeapArray<UnsafeSlice>>) -> Self {
Self { data }
}
pub fn len(&self) -> usize {
self.data.len()
}

@ -0,0 +1,80 @@
/*
* Created on Mon May 02 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
extern crate test;
use super::{super::Query, Parser};
use test::Bencher;
#[bench]
fn simple_query(b: &mut Bencher) {
const PAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n";
let expected = vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()];
b.iter(|| {
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
assert_eq!(forward, PAYLOAD.len());
let query = if let Query::Simple(sq) = query {
sq
} else {
panic!("Got pipeline instead of simple query");
};
let ret: Vec<String> = query
.as_slice()
.iter()
.map(|s| String::from_utf8_lossy(s.as_slice()).to_string())
.collect();
assert_eq!(ret, expected)
});
}
#[bench]
fn pipelined_query(b: &mut Bencher) {
const PAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n";
let expected = vec![
vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()],
vec!["GET".to_owned(), "x".to_owned()],
];
b.iter(|| {
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
assert_eq!(forward, PAYLOAD.len());
let query = if let Query::Pipelined(sq) = query {
sq
} else {
panic!("Got simple instead of pipeline query");
};
let ret: Vec<Vec<String>> = query
.into_inner()
.iter()
.map(|query| {
query
.as_slice()
.iter()
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
.collect()
})
.collect();
assert_eq!(ret, expected)
});
}

@ -0,0 +1,294 @@
/*
* Created on Mon May 02 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
corestore::buffers::Integer64,
dbnet::connection::{QueryWithAdvance, RawConnection, Stream},
protocol::{
interface::{ProtocolRead, ProtocolSpec, ProtocolWrite},
ParseError, Skyhash1,
},
util::FutureResult,
IoResult,
},
::sky_macros::compiled_eresp_bytes_v1 as eresp,
tokio::io::AsyncWriteExt,
};
impl ProtocolSpec for Skyhash1 {
// spec information
const PROTOCOL_VERSION: f32 = 1.0;
const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-1.0";
// type symbols
const TSYMBOL_STRING: u8 = b'+';
const TSYMBOL_BINARY: u8 = b'?';
const TSYMBOL_FLOAT: u8 = b'%';
const TSYMBOL_INT64: u8 = b':';
const TSYMBOL_TYPED_ARRAY: u8 = b'@';
const TSYMBOL_TYPED_NON_NULL_ARRAY: u8 = b'^';
const TSYMBOL_ARRAY: u8 = b'&';
const TSYMBOL_FLAT_ARRAY: u8 = b'_';
// typed array
const TYPE_TYPED_ARRAY_ELEMENT_NULL: &'static [u8] = b"\0\n";
// metaframe
const SIMPLE_QUERY_HEADER: &'static [u8] = b"*1\n";
const PIPELINED_QUERY_FIRST_BYTE: u8 = b'*';
// respcodes
/// Response code 0 as a array element
const RCODE_OKAY: &'static [u8] = eresp!("0");
/// Response code 1 as a array element
const RCODE_NIL: &'static [u8] = eresp!("1");
/// Response code 2 as a array element
const RCODE_OVERWRITE_ERR: &'static [u8] = eresp!("2");
/// Response code 3 as a array element
const RCODE_ACTION_ERR: &'static [u8] = eresp!("3");
/// Response code 4 as a array element
const RCODE_PACKET_ERR: &'static [u8] = eresp!("4");
/// Response code 5 as a array element
const RCODE_SERVER_ERR: &'static [u8] = eresp!("5");
/// Response code 6 as a array element
const RCODE_OTHER_ERR_EMPTY: &'static [u8] = eresp!("6");
/// "Unknown action" error response
const RCODE_UNKNOWN_ACTION: &'static [u8] = eresp!("Unknown action");
/// Response code 7
const RCODE_WRONGTYPE_ERR: &'static [u8] = eresp!("7");
/// Response code 8
const RCODE_UNKNOWN_DATA_TYPE: &'static [u8] = eresp!("8");
/// Response code 9 as an array element
const RCODE_ENCODING_ERROR: &'static [u8] = eresp!("9");
// respstrings
/// Snapshot busy error
const RSTRING_SNAPSHOT_BUSY: &'static [u8] = eresp!("err-snapshot-busy");
/// Snapshot disabled (other error)
const RSTRING_SNAPSHOT_DISABLED: &'static [u8] = eresp!("err-snapshot-disabled");
/// Duplicate snapshot
const RSTRING_SNAPSHOT_DUPLICATE: &'static [u8] = eresp!("duplicate-snapshot");
/// Snapshot has illegal name (other error)
const RSTRING_SNAPSHOT_ILLEGAL_NAME: &'static [u8] = eresp!("err-invalid-snapshot-name");
/// Access after termination signal (other error)
const RSTRING_ERR_ACCESS_AFTER_TERMSIG: &'static [u8] = eresp!("err-access-after-termsig");
// keyspace related resps
/// The default container was not set
const RSTRING_DEFAULT_UNSET: &'static [u8] = eresp!("default-container-unset");
/// The container was not found
const RSTRING_CONTAINER_NOT_FOUND: &'static [u8] = eresp!("container-not-found");
/// The container is still in use and so cannot be removed
const RSTRING_STILL_IN_USE: &'static [u8] = eresp!("still-in-use");
/// This is a protected object and hence cannot be accessed
const RSTRING_PROTECTED_OBJECT: &'static [u8] = eresp!("err-protected-object");
/// The action was applied against the wrong model
const RSTRING_WRONG_MODEL: &'static [u8] = eresp!("wrong-model");
/// The container already exists
const RSTRING_ALREADY_EXISTS: &'static [u8] = eresp!("err-already-exists");
/// The container is not ready
const RSTRING_NOT_READY: &'static [u8] = eresp!("not-ready");
/// A transactional failure occurred
const RSTRING_DDL_TRANSACTIONAL_FAILURE: &'static [u8] = eresp!("transactional-failure");
/// An unknown DDL query was run
const RSTRING_UNKNOWN_DDL_QUERY: &'static [u8] = eresp!("unknown-ddl-query");
/// The expression for a DDL query was malformed
const RSTRING_BAD_EXPRESSION: &'static [u8] = eresp!("malformed-expression");
/// An unknown model was passed in a DDL query
const RSTRING_UNKNOWN_MODEL: &'static [u8] = eresp!("unknown-model");
/// Too many arguments were passed to model constructor
const RSTRING_TOO_MANY_ARGUMENTS: &'static [u8] = eresp!("too-many-args");
/// The container name is too long
const RSTRING_CONTAINER_NAME_TOO_LONG: &'static [u8] = eresp!("container-name-too-long");
/// The container name contains invalid characters
const RSTRING_BAD_CONTAINER_NAME: &'static [u8] = eresp!("bad-container-name");
/// An unknown inspect query
const RSTRING_UNKNOWN_INSPECT_QUERY: &'static [u8] = eresp!("unknown-inspect-query");
/// An unknown table property was passed
const RSTRING_UNKNOWN_PROPERTY: &'static [u8] = eresp!("unknown-property");
/// The keyspace is not empty and hence cannot be removed
const RSTRING_KEYSPACE_NOT_EMPTY: &'static [u8] = eresp!("keyspace-not-empty");
/// Bad type supplied in a DDL query for the key
const RSTRING_BAD_TYPE_FOR_KEY: &'static [u8] = eresp!("bad-type-for-key");
/// The index for the provided list was non-existent
const RSTRING_LISTMAP_BAD_INDEX: &'static [u8] = eresp!("bad-list-index");
/// The list is empty
const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty");
// full responses
const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*1\n!1\n4\n";
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*1\n!1\n7\n";
const FULLRESP_HEYA: &'static [u8] = b"*1\n+4\nHEY!\n";
}
impl<Strm, T> ProtocolRead<Skyhash1, Strm> for T
where
T: RawConnection<Skyhash1, Strm> + Send + Sync,
Strm: Stream,
{
fn try_query(&self) -> Result<QueryWithAdvance, ParseError> {
Skyhash1::parse(self.get_buffer())
}
}
impl<Strm, T> ProtocolWrite<Skyhash1, Strm> for T
where
T: RawConnection<Skyhash1, Strm> + Send + Sync,
Strm: Stream,
{
fn write_string<'life0, 'life1, 'ret_life>(
&'life0 mut self,
string: &'life1 str,
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
'life1: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// tsymbol
stream.write_all(&[Skyhash1::TSYMBOL_STRING]).await?;
// length
let len_bytes = Integer64::from(string.len());
stream.write_all(&len_bytes).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await?;
// payload
stream.write_all(string.as_bytes()).await?;
// final LF
stream.write_all(&[Skyhash1::LF]).await
})
}
fn write_binary<'life0, 'life1, 'ret_life>(
&'life0 mut self,
binary: &'life1 [u8],
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
'life1: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// tsymbol
stream.write_all(&[Skyhash1::TSYMBOL_BINARY]).await?;
// length
let len_bytes = Integer64::from(binary.len());
stream.write_all(&len_bytes).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await?;
// payload
stream.write_all(binary).await?;
// final LF
stream.write_all(&[Skyhash1::LF]).await
})
}
fn write_usize<'life0, 'ret_life>(
&'life0 mut self,
size: usize,
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move { self.write_int64(size as _).await })
}
fn write_int64<'life0, 'ret_life>(
&'life0 mut self,
int: u64,
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// tsymbol
stream.write_all(&[Skyhash1::TSYMBOL_INT64]).await?;
// get body and sizeline
let body = Integer64::from(int);
let body_len = Integer64::from(body.len());
// len of body
stream.write_all(&body_len).await?;
// sizeline LF
stream.write_all(&[Skyhash1::LF]).await?;
// body
stream.write_all(&body).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await
})
}
fn write_float<'life0, 'ret_life>(
&'life0 mut self,
float: f32,
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// tsymbol
stream.write_all(&[Skyhash1::TSYMBOL_FLOAT]).await?;
// get body and sizeline
let body = float.to_string();
let body = body.as_bytes();
let sizeline = Integer64::from(body.len());
// sizeline
stream.write_all(&sizeline).await?;
// sizeline LF
stream.write_all(&[Skyhash1::LF]).await?;
// body
stream.write_all(body).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await
})
}
fn write_typed_array_element<'life0, 'life1, 'ret_life>(
&'life0 mut self,
element: &'life1 [u8],
) -> FutureResult<'ret_life, IoResult<()>>
where
'life0: 'ret_life,
'life1: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// len
stream.write_all(&Integer64::from(element.len())).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await?;
// body
stream.write_all(element).await?;
// LF
stream.write_all(&[Skyhash1::LF]).await
})
}
}

@ -0,0 +1,323 @@
/*
* Created on Sat Apr 30 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::{ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice};
use crate::{
corestore::heap_array::{HeapArray, HeapArrayWriter},
dbnet::connection::QueryWithAdvance,
};
use core::mem::transmute;
mod interface_impls;
// test and bench modules
#[cfg(feature = "nightly")]
mod benches;
#[cfg(test)]
mod tests;
/// A parser for Skyhash 1.0
///
/// Packet structure example (simple query):
/// ```text
/// *1\n
/// ~3\n
/// 3\n
/// SET\n
/// 1\n
/// x\n
/// 3\n
/// 100\n
/// ```
pub struct Parser {
end: *const u8,
cursor: *const u8,
}
unsafe impl Send for Parser {}
unsafe impl Sync for Parser {}
impl Parser {
/// Initialize a new parser
fn new(slice: &[u8]) -> Self {
unsafe {
Self {
end: slice.as_ptr().add(slice.len()),
cursor: slice.as_ptr(),
}
}
}
}
// basic methods
impl Parser {
/// Returns a ptr one byte past the allocation of the buffer
const fn data_end_ptr(&self) -> *const u8 {
self.end
}
/// Returns the position of the cursor
/// WARNING: Deref might led to a segfault
const fn cursor_ptr(&self) -> *const u8 {
self.cursor
}
/// Check how many bytes we have left
fn remaining(&self) -> usize {
self.data_end_ptr() as usize - self.cursor_ptr() as usize
}
/// Check if we have `size` bytes remaining
fn has_remaining(&self, size: usize) -> bool {
self.remaining() >= size
}
/// Check if we have exhausted the buffer
fn exhausted(&self) -> bool {
self.cursor_ptr() >= self.data_end_ptr()
}
/// Check if the buffer is not exhausted
fn not_exhausted(&self) -> bool {
self.cursor_ptr() < self.data_end_ptr()
}
/// Attempts to return the byte pointed at by the cursor.
/// WARNING: The same segfault warning
const unsafe fn get_byte_at_cursor(&self) -> u8 {
*self.cursor_ptr()
}
}
// mutable refs
impl Parser {
/// Increment the cursor by `by` positions
unsafe fn incr_cursor_by(&mut self, by: usize) {
self.cursor = self.cursor.add(by);
}
/// Increment the position of the cursor by one position
unsafe fn incr_cursor(&mut self) {
self.incr_cursor_by(1);
}
}
// utility methods
impl Parser {
/// Returns true if the cursor will give a char, but if `this_if_nothing_ahead` is set
/// to true, then if no byte is ahead, it will still return true
fn will_cursor_give_char(&self, ch: u8, true_if_nothing_ahead: bool) -> ParseResult<bool> {
if self.exhausted() {
// nothing left
if true_if_nothing_ahead {
Ok(true)
} else {
Err(ParseError::NotEnough)
}
} else if unsafe { self.get_byte_at_cursor().eq(&ch) } {
Ok(true)
} else {
Ok(false)
}
}
/// Check if the current cursor will give an LF
fn will_cursor_give_linefeed(&self) -> ParseResult<bool> {
self.will_cursor_give_char(b'\n', false)
}
/// Gets the _next element. **The cursor should be at the tsymbol (passed)**
fn _next(&mut self) -> ParseResult<UnsafeSlice> {
let element_size = self.read_usize()?;
self.read_until(element_size)
}
}
// higher level abstractions
impl Parser {
/// Attempt to read `len` bytes
fn read_until(&mut self, len: usize) -> ParseResult<UnsafeSlice> {
if self.has_remaining(len) {
unsafe {
// UNSAFE(@ohsayan): Already verified lengths
let slice = UnsafeSlice::new(self.cursor_ptr(), len);
self.incr_cursor_by(len);
Ok(slice)
}
} else {
Err(ParseError::NotEnough)
}
}
/// Attempt to read a line, **rejecting an empty payload**
fn read_line_pedantic(&mut self) -> ParseResult<UnsafeSlice> {
let start_ptr = self.cursor_ptr();
unsafe {
while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
self.incr_cursor();
}
let len = self.cursor_ptr() as usize - start_ptr as usize;
let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n';
if has_lf && len != 0 {
self.incr_cursor(); // skip LF
Ok(UnsafeSlice::new(start_ptr, len))
} else {
// just some silly hackery
Err(transmute(has_lf))
}
}
}
/// Attempt to read an `usize` from the buffer
fn read_usize(&mut self) -> ParseResult<usize> {
let line = self.read_line_pedantic()?;
let bytes = line.as_slice();
let mut ret = 0usize;
for byte in bytes {
if byte.is_ascii_digit() {
ret = match ret.checked_mul(10) {
Some(r) => r,
None => return Err(ParseError::DatatypeParseFailure),
};
ret = match ret.checked_add((byte & 0x0F) as _) {
Some(r) => r,
None => return Err(ParseError::DatatypeParseFailure),
};
} else {
return Err(ParseError::DatatypeParseFailure);
}
}
Ok(ret)
}
/// Parse the next blob. **The cursor should be at the tsymbol (passed)**
fn parse_next_blob(&mut self) -> ParseResult<UnsafeSlice> {
{
let chunk = self._next()?;
if self.will_cursor_give_linefeed()? {
unsafe {
// UNSAFE(@ohsayan): We know that the buffer is not exhausted
// due to the above condition
self.incr_cursor();
}
Ok(chunk)
} else {
Err(ParseError::UnexpectedByte)
}
}
}
}
// query abstractions
impl Parser {
/// The buffer should resemble the below structure:
/// ```
/// ~<count>\n
/// <e0l0>\n
/// <e0>\n
/// <e1l1>\n
/// <e1>\n
/// ...
/// ```
fn _parse_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
if self.not_exhausted() {
if unsafe { self.get_byte_at_cursor() } != b'~' {
// we need an any array
return Err(ParseError::WrongType);
}
unsafe {
// UNSAFE(@ohsayan): Just checked length
self.incr_cursor();
}
let query_count = self.read_usize()?;
let mut writer = HeapArrayWriter::with_capacity(query_count);
for i in 0..query_count {
unsafe {
// UNSAFE(@ohsayan): The index of the for loop ensures that
// we never attempt to write to a bad memory location
writer.write_to_index(i, self.parse_next_blob()?);
}
}
Ok(unsafe {
// UNSAFE(@ohsayan): If we've reached here, then we have initialized
// all the queries
writer.finish()
})
} else {
Err(ParseError::NotEnough)
}
}
fn parse_simple_query(&mut self) -> ParseResult<SimpleQuery> {
Ok(SimpleQuery::new(self._parse_simple_query()?))
}
/// The buffer should resemble the following structure:
/// ```text
/// # query 1
/// ~<count>\n
/// <e0l0>\n
/// <e0>\n
/// <e1l1>\n
/// <e1>\n
/// # query 2
/// ~<count>\n
/// <e0l0>\n
/// <e0>\n
/// <e1l1>\n
/// <e1>\n
/// ...
/// ```
fn parse_pipelined_query(&mut self, length: usize) -> ParseResult<PipelinedQuery> {
let mut writer = HeapArrayWriter::with_capacity(length);
for i in 0..length {
unsafe {
// UNSAFE(@ohsayan): The above condition guarantees that the index
// never causes an overflow
writer.write_to_index(i, self._parse_simple_query()?);
}
}
unsafe {
// UNSAFE(@ohsayan): if we reached here, then we have inited everything
Ok(PipelinedQuery::new(writer.finish()))
}
}
fn _parse(&mut self) -> ParseResult<Query> {
if self.not_exhausted() {
let first_byte = unsafe {
// UNSAFE(@ohsayan): Just checked if buffer is exhausted or not
self.get_byte_at_cursor()
};
if first_byte != b'*' {
// unknown query scheme, so it's a bad packet
return Err(ParseError::BadPacket);
}
unsafe {
// UNSAFE(@ohsayan): Checked buffer len and incremented, so we're good
self.incr_cursor()
};
let query_count = self.read_usize()?; // get the length
if query_count == 1 {
Ok(Query::Simple(self.parse_simple_query()?))
} else {
Ok(Query::Pipelined(self.parse_pipelined_query(query_count)?))
}
} else {
Err(ParseError::NotEnough)
}
}
pub fn parse(buf: &[u8]) -> ParseResult<QueryWithAdvance> {
let mut slf = Self::new(buf);
let body = slf._parse()?;
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
Ok((body, consumed))
}
}

@ -0,0 +1,93 @@
/*
* Created on Mon May 02 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::Parser,
crate::protocol::{ParseError, Query},
};
#[cfg(test)]
const SQPAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n";
#[cfg(test)]
const PQPAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n";
#[test]
fn parse_simple_query() {
let payload = SQPAYLOAD.to_vec();
let (q, f) = Parser::parse(&payload).unwrap();
let q: Vec<String> = if let Query::Simple(q) = q {
q.as_slice()
.iter()
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
.collect()
} else {
panic!("Expected simple query")
};
assert_eq!(f, payload.len());
assert_eq!(q, vec!["SET".to_owned(), "x".into(), "100".into()]);
}
#[test]
fn parse_simple_query_incomplete() {
for i in 0..SQPAYLOAD.len() - 1 {
let slice = &SQPAYLOAD[..i];
assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough);
}
}
#[test]
fn parse_pipelined_query() {
let payload = PQPAYLOAD.to_vec();
let (q, f) = Parser::parse(&payload).unwrap();
let q: Vec<Vec<String>> = if let Query::Pipelined(q) = q {
q.into_inner()
.iter()
.map(|sq| {
sq.iter()
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
.collect()
})
.collect()
} else {
panic!("Expected pipelined query query")
};
assert_eq!(f, payload.len());
assert_eq!(
q,
vec![
vec!["SET".to_owned(), "x".into(), "100".into()],
vec!["GET".into(), "x".into()]
]
);
}
#[test]
fn parse_pipelined_query_incomplete() {
for i in 0..PQPAYLOAD.len() - 1 {
let slice = &PQPAYLOAD[..i];
assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough);
}
}

@ -38,6 +38,10 @@ use ::sky_macros::compiled_eresp_bytes as eresp;
use tokio::io::AsyncWriteExt;
impl ProtocolSpec for Skyhash2 {
// spec information
const PROTOCOL_VERSION: f32 = 2.0;
const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-2.0";
// type symbols
const TSYMBOL_STRING: u8 = b'+';
const TSYMBOL_BINARY: u8 = b'?';
@ -135,7 +139,8 @@ impl ProtocolSpec for Skyhash2 {
const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty");
// full responses
const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*!1\n4\n";
const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*!4\n";
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*!7\n";
const FULLRESP_HEYA: &'static [u8] = b"+4\nHEY!";
}
@ -206,15 +211,7 @@ where
'life0: 'ret_life,
Self: 'ret_life,
{
Box::pin(async move {
let stream = self.get_mut_stream();
// tsymbol
stream.write_all(&[Skyhash2::TSYMBOL_INT64]).await?;
// body
stream.write_all(&Integer64::from(size)).await?;
// LF
stream.write_all(&[Skyhash2::LF]).await
})
Box::pin(async move { self.write_int64(size as _).await })
}
fn write_int64<'life0, 'ret_life>(
&'life0 mut self,

@ -28,6 +28,7 @@ mod interface_impls;
use crate::{
corestore::heap_array::HeapArray,
dbnet::connection::QueryWithAdvance,
protocol::{ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice},
};
use core::mem::transmute;
@ -205,9 +206,7 @@ impl Parser {
}
/// Parse a simple query
fn next_simple_query(&mut self) -> ParseResult<SimpleQuery> {
Ok(SimpleQuery {
data: self._next_simple_query()?,
})
Ok(SimpleQuery::new(self._next_simple_query()?))
}
/// Parse a pipelined query. This should have passed the `$` tsymbol
///
@ -284,7 +283,7 @@ impl Parser {
}
// only expose this. don't expose Self::new since that'll be _relatively easier_ to
// invalidate invariants for
pub fn parse(buf: &[u8]) -> ParseResult<(Query, usize)> {
pub fn parse(buf: &[u8]) -> ParseResult<QueryWithAdvance> {
let mut slf = Self::new(buf);
let body = slf._parse()?;
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;

@ -52,7 +52,7 @@ mod tls {
}
mod sys {
use crate::protocol::{PROTOCOL_VERSION, PROTOCOL_VERSIONSTRING};
use crate::protocol::{LATEST_PROTOCOL_VERSION, LATEST_PROTOCOL_VERSIONSTRING};
use libsky::VERSION;
use sky_macros::dbtest_func as dbtest;
use skytable::{query, Element, RespCode};
@ -79,7 +79,7 @@ mod sys {
runeq!(
con,
query!("sys", "info", "protocol"),
Element::String(PROTOCOL_VERSIONSTRING.to_owned())
Element::String(LATEST_PROTOCOL_VERSIONSTRING.to_owned())
)
}
#[dbtest]
@ -87,7 +87,7 @@ mod sys {
runeq!(
con,
query!("sys", "info", "protover"),
Element::Float(PROTOCOL_VERSION)
Element::Float(LATEST_PROTOCOL_VERSION)
)
}
#[dbtest]

@ -106,18 +106,40 @@ pub fn dbtest_func(args: TokenStream, item: TokenStream) -> TokenStream {
/// Get a compile time respcode/respstring array. For example, if you pass: "Unknown action",
/// it will return: `!14\nUnknown Action\n`
pub fn compiled_eresp_array(tokens: TokenStream) -> TokenStream {
_get_eresp_array(tokens)
_get_eresp_array(tokens, false)
}
fn _get_eresp_array(tokens: TokenStream) -> TokenStream {
#[proc_macro]
/// Get a compile time respcode/respstring array. For example, if you pass: "Unknown action",
/// it will return: `!14\n14\nUnknown Action\n`
pub fn compiled_eresp_array_v1(tokens: TokenStream) -> TokenStream {
_get_eresp_array(tokens, true)
}
fn _get_eresp_array(tokens: TokenStream, sizeline: bool) -> TokenStream {
let payload_str = match syn::parse_macro_input!(tokens as Lit) {
Lit::Str(st) => st.value(),
_ => panic!("Expected a string literal"),
};
let payload_bytes = payload_str.as_bytes();
let mut processed = quote! {
b'!',
};
if sizeline {
let payload_len = payload_str.as_bytes().len();
let payload_len_str = payload_len.to_string();
let payload_len_bytes = payload_len_str.as_bytes();
for byte in payload_len_bytes {
processed = quote! {
#processed
#byte,
};
}
processed = quote! {
#processed
b'\n',
};
}
let payload_bytes = payload_str.as_bytes();
for byte in payload_bytes {
processed = quote! {
#processed
@ -145,3 +167,15 @@ pub fn compiled_eresp_bytes(tokens: TokenStream) -> TokenStream {
}
.into()
}
#[proc_macro]
/// Get a compile time respcode/respstring slice. For example, if you pass: "Unknown action",
/// it will return: `!14\nUnknown Action\n`
pub fn compiled_eresp_bytes_v1(tokens: TokenStream) -> TokenStream {
let ret = compiled_eresp_array_v1(tokens);
let ret = syn::parse_macro_input!(ret as syn::Expr);
quote! {
&#ret
}
.into()
}

Loading…
Cancel
Save