Upgrade all interfaces to use the new protocol

next
Sayan Nandan 2 years ago
parent a018b76f40
commit 879e20f6ca
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

12
Cargo.lock generated

@ -661,9 +661,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.123" version = "0.2.124"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50"
[[package]] [[package]]
name = "libsky" name = "libsky"
@ -1114,9 +1114,9 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.34.3" version = "0.34.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb617eb09c4ef1536405e357e3b63f39e3ab4cc2159db05395278ad5c352bb16" checksum = "3f5d1c6ed6d1c6915aa64749b809fc1bafff49d160f5d927463658093d7d62ab"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno", "errno",
@ -1336,7 +1336,7 @@ dependencies = [
[[package]] [[package]]
name = "skytable" name = "skytable"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/skytable/client-rust?branch=next#297bd18fb864dc89f0a5c1a42b95a43150127597" source = "git+https://github.com/skytable/client-rust?branch=next#8d4ead1bc0e58421fada2c1fd7a9f75ccdc3f30e"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bb8", "bb8",
@ -1350,7 +1350,7 @@ dependencies = [
[[package]] [[package]]
name = "skytable" name = "skytable"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/skytable/client-rust.git#297bd18fb864dc89f0a5c1a42b95a43150127597" source = "git+https://github.com/skytable/client-rust.git#8d4ead1bc0e58421fada2c1fd7a9f75ccdc3f30e"
dependencies = [ dependencies = [
"r2d2", "r2d2",
] ]

@ -27,17 +27,17 @@
use crate::actions::ActionError; use crate::actions::ActionError;
/// Skyhash respstring: already claimed (user was already claimed) /// Skyhash respstring: already claimed (user was already claimed)
pub const AUTH_ERROR_ALREADYCLAIMED: &[u8] = b"!24\nerr-auth-already-claimed\n"; pub const AUTH_ERROR_ALREADYCLAIMED: &[u8] = b"!err-auth-already-claimed\n";
/// Skyhash respcode(10): bad credentials (either bad creds or invalid user) /// Skyhash respcode(10): bad credentials (either bad creds or invalid user)
pub const AUTH_CODE_BAD_CREDENTIALS: &[u8] = b"!2\n10\n"; pub const AUTH_CODE_BAD_CREDENTIALS: &[u8] = b"!10\n";
/// Skyhash respstring: auth is disabled /// Skyhash respstring: auth is disabled
pub const AUTH_ERROR_DISABLED: &[u8] = b"!17\nerr-auth-disabled\n"; pub const AUTH_ERROR_DISABLED: &[u8] = b"!err-auth-disabled\n";
/// Skyhash respcode(11): Insufficient permissions (same for anonymous user) /// Skyhash respcode(11): Insufficient permissions (same for anonymous user)
pub const AUTH_CODE_PERMS: &[u8] = b"!2\n11\n"; pub const AUTH_CODE_PERMS: &[u8] = b"!11\n";
/// Skyhash respstring: ID is too long /// Skyhash respstring: ID is too long
pub const AUTH_ERROR_ILLEGAL_USERNAME: &[u8] = b"!25\nerr-auth-illegal-username\n"; pub const AUTH_ERROR_ILLEGAL_USERNAME: &[u8] = b"!err-auth-illegal-username\n";
/// Skyhash respstring: ID is protected/in use /// Skyhash respstring: ID is protected/in use
pub const AUTH_ERROR_FAILED_TO_DELETE_USER: &[u8] = b"!21\nerr-auth-deluser-fail\n"; pub const AUTH_ERROR_FAILED_TO_DELETE_USER: &[u8] = b"!err-auth-deluser-fail\n";
/// Auth erros /// Auth erros
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]

@ -62,7 +62,7 @@ use tokio::{
sync::{mpsc, Semaphore}, sync::{mpsc, Semaphore},
}; };
pub const SIMPLE_QUERY_HEADER: [u8; 3] = [b'*', b'1', b'\n']; pub const SIMPLE_QUERY_HEADER: [u8; 1] = [b'*'];
type QueryWithAdvance = (Query, usize); type QueryWithAdvance = (Query, usize);
pub enum QueryResult { pub enum QueryResult {
@ -138,7 +138,7 @@ where
{ {
/// Try to parse a query from the buffered data /// Try to parse a query from the buffered data
fn try_query(&self) -> Result<QueryWithAdvance, ParseError> { fn try_query(&self) -> Result<QueryWithAdvance, ParseError> {
protocol::Parser::new(self.get_buffer()).parse() protocol::Parser::parse(self.get_buffer())
} }
/// Read a query from the remote end /// Read a query from the remote end
/// ///
@ -232,7 +232,7 @@ where
{ {
Box::pin(async move { Box::pin(async move {
let slf = self; let slf = self;
slf.write_response([b'*']).await?; slf.write_response([b'$']).await?;
slf.get_mut_stream() slf.get_mut_stream()
.write_all(&Integer64::init(len as u64)) .write_all(&Integer64::init(len as u64))
.await?; .await?;
@ -478,11 +478,11 @@ where
let db = &mut self.db; let db = &mut self.db;
let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor); let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor);
match query { match query {
Query::SimpleQuery(sq) => { Query::Simple(sq) => {
con.write_simple_query_header().await?; con.write_simple_query_header().await?;
queryengine::execute_simple_noauth(db, con, &mut auth_provider, sq).await?; queryengine::execute_simple_noauth(db, con, &mut auth_provider, sq).await?;
} }
Query::PipelineQuery(_) => { Query::Pipelined(_) => {
con.write_simple_query_header().await?; con.write_simple_query_header().await?;
con.write_response(auth::errors::AUTH_CODE_BAD_CREDENTIALS) con.write_response(auth::errors::AUTH_CODE_BAD_CREDENTIALS)
.await?; .await?;
@ -499,11 +499,11 @@ where
let db = &mut self.db; let db = &mut self.db;
let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor); let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor);
match query { match query {
Query::SimpleQuery(q) => { Query::Simple(q) => {
con.write_simple_query_header().await?; con.write_simple_query_header().await?;
queryengine::execute_simple(db, con, &mut auth_provider, q).await?; queryengine::execute_simple(db, con, &mut auth_provider, q).await?;
} }
Query::PipelineQuery(pipeline) => { Query::Pipelined(pipeline) => {
con.write_pipeline_query_header(pipeline.len()).await?; con.write_pipeline_query_header(pipeline.len()).await?;
queryengine::execute_pipeline(db, con, &mut auth_provider, pipeline).await?; queryengine::execute_pipeline(db, con, &mut auth_provider, pipeline).await?;
} }

@ -1,128 +0,0 @@
/*
* Created on Tue May 11 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::UnsafeSlice;
#[cfg(test)]
use bytes::Bytes;
#[non_exhaustive]
#[derive(Debug, PartialEq)]
/// # Unsafe elements
/// This enum represents the data types as **unsafe** elements, supported by the Skyhash Protocol
///
/// ## Safety
///
/// The instantiator must ensure that the [`UnsafeSlice`]s are valid. See its own safety contracts
/// for more information
pub enum UnsafeElement {
/// Arrays can be nested! Their `<tsymbol>` is `&`
Array(Box<[UnsafeElement]>),
/// A String value; `<tsymbol>` is `+`
String(UnsafeSlice),
/// An unsigned integer value; `<tsymbol>` is `:`
UnsignedInt(u64),
/// A non-recursive String array; tsymbol: `_`
FlatArray(Box<[UnsafeFlatElement]>),
/// A type-less non-recursive array
AnyArray(Box<[UnsafeSlice]>),
}
#[derive(Debug, PartialEq)]
/// An **unsafe** flat element, present in a flat array
pub enum UnsafeFlatElement {
String(UnsafeSlice),
}
impl UnsafeElement {
pub const fn is_any_array(&self) -> bool {
matches!(self, Self::AnyArray(_))
}
}
// test impls are for our tests
#[cfg(test)]
impl UnsafeElement {
pub unsafe fn to_owned_flat_array(inner: &[UnsafeFlatElement]) -> Vec<FlatElement> {
inner
.iter()
.map(|v| match v {
UnsafeFlatElement::String(st) => {
FlatElement::String(Bytes::copy_from_slice(st.as_slice()))
}
})
.collect()
}
pub unsafe fn to_owned_any_array(inner: &[UnsafeSlice]) -> Vec<Bytes> {
inner
.iter()
.map(|v| Bytes::copy_from_slice(v.as_slice()))
.collect()
}
pub unsafe fn to_owned_array(inner: &[Self]) -> Vec<OwnedElement> {
inner
.iter()
.map(|v| match &*v {
UnsafeElement::String(st) => {
OwnedElement::String(Bytes::copy_from_slice(st.as_slice()))
}
UnsafeElement::UnsignedInt(int) => OwnedElement::UnsignedInt(*int),
UnsafeElement::AnyArray(arr) => {
OwnedElement::AnyArray(Self::to_owned_any_array(arr))
}
UnsafeElement::Array(arr) => OwnedElement::Array(Self::to_owned_array(arr)),
UnsafeElement::FlatArray(frr) => {
OwnedElement::FlatArray(Self::to_owned_flat_array(frr))
}
})
.collect()
}
pub unsafe fn as_owned_element(&self) -> OwnedElement {
match self {
Self::AnyArray(arr) => OwnedElement::AnyArray(Self::to_owned_any_array(arr)),
Self::FlatArray(frr) => OwnedElement::FlatArray(Self::to_owned_flat_array(frr)),
Self::Array(arr) => OwnedElement::Array(Self::to_owned_array(arr)),
Self::String(st) => OwnedElement::String(Bytes::copy_from_slice(st.as_slice())),
Self::UnsignedInt(int) => OwnedElement::UnsignedInt(*int),
}
}
}
// owned variants to simplify equality in tests
#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum OwnedElement {
Array(Vec<OwnedElement>),
String(Bytes),
UnsignedInt(u64),
FlatArray(Vec<FlatElement>),
AnyArray(Vec<Bytes>),
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub enum FlatElement {
String(Bytes),
}

@ -24,14 +24,9 @@
* *
*/ */
#[cfg(test)] use super::{Query, UnsafeSlice};
use super::UnsafeElement;
use super::UnsafeSlice;
use bytes::Bytes; use bytes::Bytes;
use core::hint::unreachable_unchecked; use core::{hint::unreachable_unchecked, iter::FusedIterator, ops::Deref, slice::Iter};
use core::iter::FusedIterator;
use core::ops::Deref;
use core::slice::Iter;
/// An iterator over an [`AnyArray`] (an [`UnsafeSlice`]). The validity of the iterator is /// An iterator over an [`AnyArray`] (an [`UnsafeSlice`]). The validity of the iterator is
/// left to the caller who has to guarantee: /// left to the caller who has to guarantee:
@ -183,20 +178,12 @@ impl<'a> FusedIterator for BorrowedAnyArrayIter<'a> {}
#[test] #[test]
fn test_iter() { fn test_iter() {
use super::{Parser, Query}; use super::{Parser, Query};
let (q, _fwby) = Parser::new(b"*1\n~3\n3\nset\n1\nx\n3\n100\n") let (q, _fwby) = Parser::parse(b"*3\n3\nset1\nx3\n100").unwrap();
.parse()
.unwrap();
let r = match q { let r = match q {
Query::SimpleQuery(q) => q, Query::Simple(q) => q,
_ => panic!("Wrong query"), _ => panic!("Wrong query"),
}; };
let arr = unsafe { let it = r.as_slice().iter();
match r.into_inner() {
UnsafeElement::AnyArray(arr) => arr,
_ => panic!("Wrong type"),
}
};
let it = arr.iter();
let mut iter = unsafe { AnyArrayIter::new(it) }; let mut iter = unsafe { AnyArrayIter::new(it) };
assert_eq!(iter.next_uppercase().unwrap().as_ref(), "SET".as_bytes()); assert_eq!(iter.next_uppercase().unwrap().as_ref(), "SET".as_bytes());
assert_eq!(iter.next().unwrap(), "x".as_bytes()); assert_eq!(iter.next().unwrap(), "x".as_bytes());

@ -1,5 +1,5 @@
/* /*
* Created on Mon May 10 2021 * Created on Tue Apr 12 2022
* *
* This file is a part of Skytable * This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising * vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability. * on performance, queryability or scalability.
* *
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com> * Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by * it under the terms of the GNU Affero General Public License as published by
@ -24,83 +24,23 @@
* *
*/ */
//! # The Skyhash Protocol #![allow(unused)] // TODO(@ohsayan): Remove this once we're done
//!
//! ## Introduction
//! The Skyhash Protocol is a serialization protocol that is used by Skytable for client/server communication.
//! It works in a query/response action similar to HTTP's request/response action. Skyhash supersedes the Terrapipe
//! protocol as a more simple, reliable, robust and scalable protocol.
//!
//! This module contains the [`Parser`] for the Skyhash protocol and it's enough to just pass a query packet as
//! a slice of unsigned 8-bit integers and the parser will do everything else. The Skyhash protocol was designed
//! and implemented by the Author (Sayan Nandan)
//!
// modules use crate::corestore::heap_array::HeapArray;
pub mod element; use core::{fmt, marker::PhantomData, mem::transmute, slice};
pub mod iter; #[cfg(feature = "nightly")]
pub mod responses; mod benches;
pub mod v2;
bench! {
mod benches;
}
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
// endof modules // pub mods
// test imports pub mod iter;
#[cfg(test)] pub mod responses;
use self::element::OwnedElement; // endof pub mods
// endof test imports
use self::element::{UnsafeElement, UnsafeFlatElement};
use crate::util::Unwrappable;
use core::fmt;
use core::hint::unreachable_unchecked;
use core::ops;
use core::slice;
/// The Skyhash protocol version /// The Skyhash protocol version
pub const PROTOCOL_VERSION: f32 = 1.2; pub const PROTOCOL_VERSION: f32 = 2.0;
/// The Skyhash protocol version string (Skyhash-x.y) /// The Skyhash protocol version string (Skyhash-x.y)
pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-1.1"; pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-2.0";
const ASCII_UNDERSCORE: u8 = b'_';
const ASCII_AMPERSAND: u8 = b'&';
const ASCII_COLON: u8 = b':';
const ASCII_PLUS_SIGN: u8 = b'+';
const ASCII_TILDE_SIGN: u8 = b'~';
#[derive(Debug)]
/// # Skyhash Deserializer (Parser)
///
/// The [`Parser`] object can be used to deserialized a packet serialized by Skyhash which in turn serializes
/// it into data structures native to the Rust Language (and some Compound Types built on top of them).
///
/// ## Safety
///
/// The results returned by the parser are not bound by any lifetime and instead return raw
/// pointers to parts of the source buffer. This means that the caller must ensure that the
/// source buffer remains valid for as long as the result is used.
///
/// ## Evaluation
///
/// The parser is pessimistic in most cases and will readily throw out any errors. On non-recusrive types
/// there is no recursion, but the parser will use implicit recursion for nested arrays. The parser will
/// happily not report any errors if some part of the next query was passed. This is very much a possibility
/// and so has been accounted for
///
/// ## Important note
///
/// All developers willing to modify the deserializer must keep this in mind: the cursor is always Ahead-Of-Position
/// that is the cursor should always point at the next character that can be read.
///
pub struct Parser<'a> {
/// the cursor ptr
cursor: *const u8,
/// the data end ptr
data_end_ptr: *const u8,
/// the buffer
buffer: &'a [u8],
}
#[derive(PartialEq)] #[derive(PartialEq)]
/// As its name says, an [`UnsafeSlice`] is a terribly unsafe slice. It's guarantess are /// As its name says, an [`UnsafeSlice`] is a terribly unsafe slice. It's guarantess are
@ -187,489 +127,324 @@ pub enum ParseError {
UnknownDatatype = 4u8, UnknownDatatype = 4u8,
} }
#[derive(Debug, PartialEq)] /// A generic result to indicate parsing errors thorugh the [`ParseError`] enum
/// A simple query object. This object is **not bound to any lifetime!** That's pub type ParseResult<T> = Result<T, ParseError>;
/// why, merely _having_ it is not unsafe, but all methods on it are unsafe and
/// the caller has to uphold the guarantee of keeping the source buffer's pointers
/// valid
///
/// ## Safety Contracts
///
/// - The provided `UnsafeElement` is valid and generated _legally_
pub struct SimpleQuery {
/// the inner unsafe element
inner: UnsafeElement,
}
impl SimpleQuery { #[derive(Debug)]
/// Create a new `SimpleQuery` pub enum Query {
/// Simple(SimpleQuery),
/// ## Safety Pipelined(PipelinedQuery),
///
/// This is unsafe because the caller must guarantee the sanctity of
/// the provided element
const unsafe fn new(inner: UnsafeElement) -> SimpleQuery {
Self { inner }
}
/// Decomposes self into an [`UnsafeElement`]
///
/// ## Safety
///
/// Caller must ensure that the UnsafeElement's pointers are still valid
pub unsafe fn into_inner(self) -> UnsafeElement {
self.inner
}
pub const fn is_any_array(&self) -> bool {
matches!(self.inner, UnsafeElement::AnyArray(_))
}
} }
#[derive(Debug, PartialEq)] #[derive(Debug)]
/// A pipelined query object. This is bound to an _anonymous lifetime_ which is to be bound by pub struct SimpleQuery {
/// the instantiator data: HeapArray<UnsafeSlice>,
///
/// ## Safety Contracts
///
/// - The provided `UnsafeElement` is valid and generated _legally_
/// - The source pointers for the `UnsafeElement` is valid
pub struct PipelineQuery {
inner: Box<[UnsafeElement]>,
} }
impl PipelineQuery { impl SimpleQuery {
/// Create a new `PipelineQuery` #[cfg(test)]
/// fn into_owned(self) -> OwnedSimpleQuery {
/// ## Safety OwnedSimpleQuery {
/// data: self
/// The caller has the responsibility to uphold the guarantee of keeping the source .data
/// pointers valid .iter()
const unsafe fn new(inner: Box<[UnsafeElement]>) -> PipelineQuery { .map(|v| unsafe { v.as_slice().to_owned() })
Self { inner } .collect(),
}
} }
pub const fn len(&self) -> usize { pub fn as_slice(&self) -> &[UnsafeSlice] {
self.inner.len() &self.data
} }
} }
impl ops::Deref for PipelineQuery { #[cfg(test)]
type Target = [UnsafeElement]; struct OwnedSimpleQuery {
fn deref(&self) -> &Self::Target { data: Vec<Vec<u8>>,
&self.inner
}
} }
#[derive(Debug, PartialEq)] #[derive(Debug)]
/// # Queries pub struct PipelinedQuery {
/// data: HeapArray<HeapArray<UnsafeSlice>>,
/// This enum has two variants: Simple and Pipelined. Both hold two **terribly `unsafe`**
/// objects:
/// - A [`SimpleQuery`] or
/// - A [`PipelineQuery`]
///
/// Again, **both objects hold raw pointers and guarantee nothing about the source's
/// validity**.
/// ## Safety
///
/// This object holds zero ownership on the actual data, but only holds a collection
/// of pointers. This means that you have to **ensure that the source outlives the Query**
/// object. Using it otherwise is very **`unsafe`**! The caller who creates the instance or the
/// one who uses it is responsible for ensuring any validity guarantee
///
pub enum Query {
/// A simple query will just hold one element
SimpleQuery(SimpleQuery),
/// A pipelined/batch query will hold multiple elements
PipelineQuery(PipelineQuery),
} }
impl Query { impl PipelinedQuery {
pub fn len(&self) -> usize {
self.data.len()
}
pub fn into_inner(self) -> HeapArray<HeapArray<UnsafeSlice>> {
self.data
}
#[cfg(test)] #[cfg(test)]
/// Turns self into an onwed query (see [`OwnedQuery`]) fn into_owned(self) -> OwnedPipelinedQuery {
/// OwnedPipelinedQuery {
/// ## Safety contracts data: self
/// .data
/// - The query itself is valid .iter()
/// - The query is correctly bound to the lifetime .map(|v| {
unsafe fn into_owned_query(self) -> OwnedQuery { v.iter()
match self { .map(|v| unsafe { v.as_slice().to_owned() })
Self::SimpleQuery(SimpleQuery { inner, .. }) => { .collect()
OwnedQuery::SimpleQuery(inner.as_owned_element()) })
} .collect(),
Self::PipelineQuery(PipelineQuery { inner, .. }) => {
OwnedQuery::PipelineQuery(inner.iter().map(|v| v.as_owned_element()).collect())
}
} }
} }
} }
#[derive(Debug, PartialEq)]
#[cfg(test)] #[cfg(test)]
/// An owned query with direct ownership of the data rather than through struct OwnedPipelinedQuery {
/// pointers data: Vec<Vec<Vec<u8>>>,
enum OwnedQuery {
SimpleQuery(OwnedElement),
PipelineQuery(Vec<OwnedElement>),
} }
/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum /// A parser for Skyhash 2.0
pub type ParseResult<T> = Result<T, ParseError>; pub struct Parser<'a> {
end: *const u8,
cursor: *const u8,
_lt: PhantomData<&'a ()>,
}
impl<'a> Parser<'a> { impl<'a> Parser<'a> {
/// Create a new parser instance, bound to the lifetime of the source buffer /// Initialize a new parser
pub fn new(buffer: &'a [u8]) -> Self { pub fn new(slice: &[u8]) -> Self {
unsafe { unsafe {
let cursor = buffer.as_ptr();
let data_end_ptr = cursor.add(buffer.len());
Self { Self {
cursor, end: slice.as_ptr().add(slice.len()),
data_end_ptr, cursor: slice.as_ptr(),
buffer, _lt: PhantomData,
} }
} }
} }
/// Returns what we have consumed }
/// ```text
/// [*****************************] // basic methods
/// ^ ^ ^ impl<'a> Parser<'a> {
/// start cursor data end ptr /// Returns a ptr one byte past the allocation of the buffer
/// ```
fn consumed(&self) -> usize {
unsafe { self.cursor.offset_from(self.buffer.as_ptr()) as usize }
}
/// Returns what we have left
/// ```text
/// [*****************************]
/// ^ ^
/// cursor data end ptr
/// ```
fn remaining(&self) -> usize {
unsafe { self.data_end_ptr().offset_from(self.cursor) as usize }
}
fn exhausted(&self) -> bool {
self.cursor >= self.data_end_ptr()
}
/// Returns a ptr to one byte past the last non-null ptr
const fn data_end_ptr(&self) -> *const u8 { const fn data_end_ptr(&self) -> *const u8 {
self.data_end_ptr self.end
} }
/// Move the cursor ptr ahead by the provided amount /// Returns the position of the cursor
fn incr_cursor_by(&mut self, by: usize) { /// WARNING: Deref might led to a segfault
unsafe { self.cursor = self.cursor.add(by) } const fn cursor_ptr(&self) -> *const u8 {
self.cursor
} }
/// Increment the cursor by 1 /// Check how many bytes we have left
fn incr_cursor(&mut self) { fn remaining(&self) -> usize {
self.incr_cursor_by(1) self.data_end_ptr() as usize - self.cursor_ptr() as usize
} }
/// Read `until` bytes from source /// Check if we have `size` bytes remaining
fn read_until(&mut self, until: usize) -> ParseResult<UnsafeSlice> { fn has_remaining(&self, size: usize) -> bool {
if self.remaining() < until { self.remaining() >= size
Err(ParseError::NotEnough)
} else {
let start_ptr = self.cursor;
self.incr_cursor_by(until);
unsafe { Ok(UnsafeSlice::new(start_ptr, until)) }
}
} }
/// Read a line. This will place the cursor at a point just ahead of /// Check if we have exhausted the buffer
/// the LF fn exhausted(&self) -> bool {
fn read_line(&mut self) -> ParseResult<UnsafeSlice> { self.cursor_ptr() >= self.data_end_ptr()
if self.exhausted() {
Err(ParseError::NotEnough)
} else {
let start_ptr = self.cursor;
let end_ptr = self.data_end_ptr();
let mut len = 0usize;
unsafe {
while end_ptr > self.cursor && *self.cursor != b'\n' {
len += 1;
self.incr_cursor();
}
if self.will_cursor_give_linefeed()? {
let ret = Ok(UnsafeSlice::new(start_ptr, len));
self.incr_cursor();
ret
} else {
Err(ParseError::BadPacket)
}
}
}
} }
} /// Check if the buffer is not exhausted
fn not_exhausted(&self) -> bool {
impl<'a> Parser<'a> { self.cursor_ptr() < self.data_end_ptr()
/// Returns true if the cursor will give a char, but if `this_if_nothing_ahead` is set
/// to true, then if no byte is ahead, it will still return true
fn will_cursor_give_char(&self, ch: u8, this_if_nothing_ahead: bool) -> ParseResult<bool> {
if self.exhausted() {
// nothing left
if this_if_nothing_ahead {
Ok(true)
} else {
Err(ParseError::NotEnough)
}
} else if unsafe { (*self.cursor).eq(&ch) } {
Ok(true)
} else {
Ok(false)
}
} }
/// Check if the current cursor will give an LF /// Attempts to return the byte pointed at by the cursor.
fn will_cursor_give_linefeed(&self) -> ParseResult<bool> { /// WARNING: The same segfault warning
self.will_cursor_give_char(b'\n', false) const unsafe fn get_byte_at_cursor(&self) -> u8 {
*self.cursor_ptr()
} }
} }
// mutable refs
impl<'a> Parser<'a> { impl<'a> Parser<'a> {
/// Parse a stream of bytes into [`usize`] /// Increment the cursor by `by` positions
fn parse_into_usize(bytes: &[u8]) -> ParseResult<usize> { unsafe fn incr_cursor_by(&mut self, by: usize) {
if bytes.is_empty() { self.cursor = self.cursor.add(by);
return Err(ParseError::NotEnough);
}
let byte_iter = bytes.iter();
let mut item_usize = 0usize;
for dig in byte_iter {
if !dig.is_ascii_digit() {
// dig has to be an ASCII digit
return Err(ParseError::DatatypeParseFailure);
}
// 48 is the ASCII code for 0, and 57 is the ascii code for 9
// so if 0 is given, the subtraction should give 0; similarly
// if 9 is given, the subtraction should give us 9!
let curdig: usize = unsafe {
// UNSAFE(@ohsayan): We already know that dig is an ASCII digit
dig.checked_sub(48).unsafe_unwrap()
}
.into();
// The usize can overflow; check that case
let product = match item_usize.checked_mul(10) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
let sum = match product.checked_add(curdig) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
item_usize = sum;
}
Ok(item_usize)
} }
/// Pasre a stream of bytes into an [`u64`] /// Increment the position of the cursor by one position
fn parse_into_u64(bytes: &[u8]) -> ParseResult<u64> { unsafe fn incr_cursor(&mut self) {
if bytes.is_empty() { self.incr_cursor_by(1);
return Err(ParseError::NotEnough);
}
let byte_iter = bytes.iter();
let mut item_u64 = 0u64;
for dig in byte_iter {
if !dig.is_ascii_digit() {
// dig has to be an ASCII digit
return Err(ParseError::DatatypeParseFailure);
}
// 48 is the ASCII code for 0, and 57 is the ascii code for 9
// so if 0 is given, the subtraction should give 0; similarly
// if 9 is given, the subtraction should give us 9!
let curdig: u64 = unsafe {
// UNSAFE(@ohsayan): We already know that dig is an ASCII digit
dig.checked_sub(48).unsafe_unwrap()
}
.into();
// Now the entire u64 can overflow, so let's attempt to check it
let product = match item_u64.checked_mul(10) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
let sum = match product.checked_add(curdig) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
item_u64 = sum;
}
Ok(item_u64)
} }
} }
// higher level abstractions
impl<'a> Parser<'a> { impl<'a> Parser<'a> {
/// Parse the metaframe to get the number of queries, i.e the datagroup /// Attempt to read `len` bytes
/// count fn read_until(&mut self, len: usize) -> ParseResult<UnsafeSlice> {
fn parse_metaframe_get_datagroup_count(&mut self) -> ParseResult<usize> { if self.has_remaining(len) {
if self.buffer.len() < 3 {
// the smallest query we can have is: *1\n or 3 chars
Err(ParseError::NotEnough)
} else {
unsafe { unsafe {
let our_chunk = self.read_line()?; // UNSAFE(@ohsayan): Already verified lengths
if our_chunk.unsafe_eq(b'*', 0) { let slice = UnsafeSlice::new(self.cursor_ptr(), len);
Ok(Self::parse_into_usize( self.incr_cursor_by(len);
our_chunk.into_slice_with_start_and_end(1, 1), Ok(slice)
)?)
} else {
Err(ParseError::UnexpectedByte)
}
} }
} else {
Err(ParseError::NotEnough)
} }
} }
} /// Attempt to read a byte slice terminated by an LF
fn read_line(&mut self) -> ParseResult<UnsafeSlice> {
impl<'a> Parser<'a> { let start_ptr = self.cursor_ptr();
/// Gets the _next element. **The cursor should be at the tsymbol (passed)**
fn _next(&mut self) -> ParseResult<UnsafeSlice> {
let sizeline = self.read_line()?;
unsafe { unsafe {
let element_size = Self::parse_into_usize(sizeline.into_slice())?; while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
self.read_until(element_size)
}
}
/// Gets the next string (`+`). **The cursor should be at the tsymbol (passed)**
fn parse_next_string(&mut self) -> ParseResult<UnsafeSlice> {
{
let chunk = self._next()?;
let haslf = self.will_cursor_give_linefeed()?;
if haslf {
self.incr_cursor(); self.incr_cursor();
Ok(chunk) }
if self.not_exhausted() && self.get_byte_at_cursor() == b'\n' {
let len = self.cursor_ptr() as usize - start_ptr as usize;
self.incr_cursor(); // skip LF
Ok(UnsafeSlice::new(start_ptr, len))
} else { } else {
Err(ParseError::NotEnough) Err(ParseError::NotEnough)
} }
} }
} }
/// Gets the next `u64`. **The cursor should be at the tsymbol (passed)** /// Attempt to read a line, **rejecting an empty payload**
fn parse_next_u64(&mut self) -> ParseResult<u64> { fn read_line_pedantic(&mut self) -> ParseResult<UnsafeSlice> {
let chunk = self._next()?; let start_ptr = self.cursor_ptr();
unsafe { unsafe {
let ret = Self::parse_into_u64(chunk.into_slice())?; while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
if self.will_cursor_give_linefeed()? {
self.incr_cursor(); self.incr_cursor();
Ok(ret) }
let len = self.cursor_ptr() as usize - start_ptr as usize;
let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n';
if has_lf && len != 0 {
self.incr_cursor(); // skip LF
Ok(UnsafeSlice::new(start_ptr, len))
} else { } else {
Err(ParseError::NotEnough) // just some silly hackery
Err(transmute(has_lf))
} }
} }
} }
/// Gets the next element. **The cursor should be at the tsymbol (_not_ passed)** /// Attempt to read an `usize` from the buffer
fn parse_next_element(&mut self) -> ParseResult<UnsafeElement> { fn read_usize(&mut self) -> ParseResult<usize> {
if self.exhausted() { let line = self.read_line_pedantic()?;
Err(ParseError::NotEnough) let bytes = unsafe {
} else { // UNSAFE(@ohsayan): We just extracted the slice
unsafe { line.as_slice()
let tsymbol = *self.cursor; };
// got tsymbol, now incr let mut ret = 0usize;
self.incr_cursor(); for byte in bytes {
let ret = match tsymbol { if byte.is_ascii_digit() {
ASCII_PLUS_SIGN => UnsafeElement::String(self.parse_next_string()?), ret = match ret.checked_mul(10) {
ASCII_COLON => UnsafeElement::UnsignedInt(self.parse_next_u64()?), Some(r) => r,
ASCII_AMPERSAND => UnsafeElement::Array(self.parse_next_array()?), None => return Err(ParseError::DatatypeParseFailure),
ASCII_TILDE_SIGN => UnsafeElement::AnyArray(self.parse_next_any_array()?), };
ASCII_UNDERSCORE => UnsafeElement::FlatArray(self.parse_next_flat_array()?), ret = match ret.checked_add((byte & 0x0F) as _) {
_ => { Some(r) => r,
return Err(ParseError::UnknownDatatype); None => return Err(ParseError::DatatypeParseFailure),
}
}; };
Ok(ret)
}
}
}
/// Parse the next blob. **The cursor should be at the tsymbol (passed)**
fn parse_next_blob(&mut self) -> ParseResult<UnsafeSlice> {
{
let chunk = self._next()?;
if self.will_cursor_give_linefeed()? {
self.incr_cursor();
Ok(chunk)
} else { } else {
Err(ParseError::UnexpectedByte) return Err(ParseError::DatatypeParseFailure);
} }
} }
Ok(ret)
} }
/// Parse the next `AnyArray`. **The cursor should be at the tsymbol (passed)** }
fn parse_next_any_array(&mut self) -> ParseResult<Box<[UnsafeSlice]>> {
// query impls
impl<'a> Parser<'a> {
/// Parse the next simple query. This should have passed the `*` tsymbol
///
/// Simple query structure (tokenized line-by-line):
/// ```text
/// * -> Simple Query Header
/// <n>\n -> Count of elements in the simple query
/// <l0>\n -> Length of element 1
/// <e0> -> element 1 itself
/// <l1>\n -> Length of element 2
/// <e1> -> element 2 itself
/// ...
/// ```
fn _next_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
let element_count = self.read_usize()?;
unsafe { unsafe {
let size_line = self.read_line()?; let mut data = HeapArray::new_writer(element_count);
let size = Self::parse_into_usize(size_line.into_slice())?; for i in 0..element_count {
let mut array = Vec::with_capacity(size); let element_size = self.read_usize()?;
for _ in 0..size { let element = self.read_until(element_size)?;
array.push(self.parse_next_blob()?); data.write_to_index(i, element);
} }
Ok(array.into_boxed_slice()) Ok(data.finish())
} }
} }
/// The cursor should have passed the tsymbol /// Parse a simple query
fn parse_next_flat_array(&mut self) -> ParseResult<Box<[UnsafeFlatElement]>> { fn next_simple_query(&mut self) -> ParseResult<SimpleQuery> {
unsafe { Ok(SimpleQuery {
let flat_array_sizeline = self.read_line()?; data: self._next_simple_query()?,
let array_size = Self::parse_into_usize(flat_array_sizeline.into_slice())?; })
let mut array = Vec::with_capacity(array_size);
for _ in 0..array_size {
if self.exhausted() {
return Err(ParseError::NotEnough);
} else {
let tsymb = *self.cursor;
// good, there is a tsymbol; move the cursor ahead
self.incr_cursor();
let ret = match tsymb {
b'+' => self.parse_next_string().map(UnsafeFlatElement::String)?,
_ => return Err(ParseError::UnknownDatatype),
};
array.push(ret);
}
}
Ok(array.into_boxed_slice())
}
} }
/// Parse the next array. **The cursor should be at the tsymbol (passed)** /// Parse a pipelined query. This should have passed the `$` tsymbol
fn parse_next_array(&mut self) -> ParseResult<Box<[UnsafeElement]>> { ///
/// Pipelined query structure (tokenized line-by-line):
/// ```text
/// $ -> Pipeline
/// <n>\n -> Pipeline has n queries
/// <lq0>\n -> Query 1 has 3 elements
/// <lq0e0>\n -> Q1E1 has 3 bytes
/// <q0e0> -> Q1E1 itself
/// <lq0e1>\n -> Q1E2 has 1 byte
/// <q0e1> -> Q1E2 itself
/// <lq0e2>\n -> Q1E3 has 3 bytes
/// <q0e2> -> Q1E3 itself
/// <lq1>\n -> Query 2 has 2 elements
/// <lq1e0>\n -> Q2E1 has 3 bytes
/// <q1e0> -> Q2E1 itself
/// <lq1e1>\n -> Q2E2 has 1 byte
/// <q1e1> -> Q2E2 itself
/// ...
/// ```
///
/// Example:
/// ```text
/// $ -> Pipeline
/// 2\n -> Pipeline has 2 queries
/// 3\n -> Query 1 has 3 elements
/// 3\n -> Q1E1 has 3 bytes
/// SET -> Q1E1 itself
/// 1\n -> Q1E2 has 1 byte
/// x -> Q1E2 itself
/// 3\n -> Q1E3 has 3 bytes
/// 100 -> Q1E3 itself
/// 2\n -> Query 2 has 2 elements
/// 3\n -> Q2E1 has 3 bytes
/// GET -> Q2E1 itself
/// 1\n -> Q2E2 has 1 byte
/// x -> Q2E2 itself
/// ```
fn next_pipeline(&mut self) -> ParseResult<PipelinedQuery> {
let query_count = self.read_usize()?;
unsafe { unsafe {
let size_of_array_chunk = self.read_line()?; let mut queries = HeapArray::new_writer(query_count);
let size_of_array = Self::parse_into_usize(size_of_array_chunk.into_slice())?; for i in 0..query_count {
let mut array = Vec::with_capacity(size_of_array); let sq = self._next_simple_query()?;
for _ in 0..size_of_array { queries.write_to_index(i, sq);
array.push(self.parse_next_element()?);
} }
Ok(array.into_boxed_slice()) Ok(PipelinedQuery {
data: queries.finish(),
})
} }
} }
} fn _parse(&mut self) -> ParseResult<Query> {
if self.not_exhausted() {
impl<'a> Parser<'a> { unsafe {
/// Try to parse the provided buffer (lt: 'a) into a Query (lt: 'a). The let first_byte = self.get_byte_at_cursor();
/// parser itself can (or must) go out of scope, but the buffer can't! self.incr_cursor();
pub fn parse(&mut self) -> Result<(Query, usize), ParseError> { let data = match first_byte {
let number_of_queries = self.parse_metaframe_get_datagroup_count()?; b'*' => {
if number_of_queries == 0 { // a simple query
return Err(ParseError::BadPacket); Query::Simple(self.next_simple_query()?)
} }
if number_of_queries == 1 { b'$' => {
let query = self.parse_next_element()?; // a pipelined query
if unsafe { Query::Pipelined(self.next_pipeline()?)
self.will_cursor_give_char(b'*', true) }
.unwrap_or_else(|_| unreachable_unchecked()) _ => return Err(ParseError::UnexpectedByte),
} { };
unsafe { Ok(data)
// SAFETY: Contract upheld
Ok((Query::SimpleQuery(SimpleQuery::new(query)), self.consumed()))
}
} else {
Err(ParseError::UnexpectedByte)
} }
} else { } else {
// pipelined query Err(ParseError::NotEnough)
let mut queries = Vec::with_capacity(number_of_queries);
for _ in 0..number_of_queries {
queries.push(self.parse_next_element()?);
}
if unsafe {
self.will_cursor_give_char(b'*', true)
.unwrap_or_else(|_| unreachable_unchecked())
} {
unsafe {
// SAFETY: Contract upheld
Ok((
Query::PipelineQuery(PipelineQuery::new(queries.into_boxed_slice())),
self.consumed(),
))
}
} else {
Err(ParseError::UnexpectedByte)
}
} }
} }
pub fn parse(buf: &[u8]) -> ParseResult<(Query, usize)> {
let mut slf = Self::new(buf);
let body = slf._parse()?;
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
Ok((body, consumed))
}
} }

@ -47,7 +47,7 @@ pub mod groups {
/// Response code 6 as a array element /// Response code 6 as a array element
pub const OTHER_ERR_EMPTY: &[u8] = eresp!("6"); pub const OTHER_ERR_EMPTY: &[u8] = eresp!("6");
/// Response group element with string "HEYA" /// Response group element with string "HEYA"
pub const HEYA: &[u8] = "+4\nHEY!\n".as_bytes(); pub const HEYA: &[u8] = "+4\nHEY!".as_bytes();
/// "Unknown action" error response /// "Unknown action" error response
pub const UNKNOWN_ACTION: &[u8] = eresp!("Unknown action"); pub const UNKNOWN_ACTION: &[u8] = eresp!("Unknown action");
/// Response code 7 /// Response code 7
@ -117,37 +117,37 @@ pub mod full_responses {
//! be written off directly to the stream and should **not be preceded by any response metaframe** //! be written off directly to the stream and should **not be preceded by any response metaframe**
/// Response code: 0 (Okay) /// Response code: 0 (Okay)
pub const R_OKAY: &[u8] = "*1\n!1\n0\n".as_bytes(); pub const R_OKAY: &[u8] = "*!1\n0\n".as_bytes();
/// Response code: 1 (Nil) /// Response code: 1 (Nil)
pub const R_NIL: &[u8] = "*1\n!1\n1\n".as_bytes(); pub const R_NIL: &[u8] = "*!1\n1\n".as_bytes();
/// Response code: 2 (Overwrite Error) /// Response code: 2 (Overwrite Error)
pub const R_OVERWRITE_ERR: &[u8] = "*1\n!1\n2\n".as_bytes(); pub const R_OVERWRITE_ERR: &[u8] = "*!1\n2\n".as_bytes();
/// Response code: 3 (Action Error) /// Response code: 3 (Action Error)
pub const R_ACTION_ERR: &[u8] = "*1\n!1\n3\n".as_bytes(); pub const R_ACTION_ERR: &[u8] = "*!1\n3\n".as_bytes();
/// Response code: 4 (Packet Error) /// Response code: 4 (Packet Error)
pub const R_PACKET_ERR: &[u8] = "*1\n!1\n4\n".as_bytes(); pub const R_PACKET_ERR: &[u8] = "*!1\n4\n".as_bytes();
/// Response code: 5 (Server Error) /// Response code: 5 (Server Error)
pub const R_SERVER_ERR: &[u8] = "*1\n!1\n5\n".as_bytes(); pub const R_SERVER_ERR: &[u8] = "*!1\n5\n".as_bytes();
/// Response code: 6 (Other Error _without description_) /// Response code: 6 (Other Error _without description_)
pub const R_OTHER_ERR_EMPTY: &[u8] = "*1\n!1\n6\n".as_bytes(); pub const R_OTHER_ERR_EMPTY: &[u8] = "*!1\n6\n".as_bytes();
/// Response code: 7; wrongtype /// Response code: 7; wrongtype
pub const R_WRONGTYPE_ERR: &[u8] = "*1\n!1\n7".as_bytes(); pub const R_WRONGTYPE_ERR: &[u8] = "*!1\n7".as_bytes();
/// Response code: 8; unknown data type /// Response code: 8; unknown data type
pub const R_UNKNOWN_DATA_TYPE: &[u8] = "*1\n!1\n8\n".as_bytes(); pub const R_UNKNOWN_DATA_TYPE: &[u8] = "*!1\n8\n".as_bytes();
/// A heya response /// A heya response
pub const R_HEYA: &[u8] = "*1\n+4\nHEY!\n".as_bytes(); pub const R_HEYA: &[u8] = "*+4\nHEY!\n".as_bytes();
/// An other response with description: "Unknown action" /// An other response with description: "Unknown action"
pub const R_UNKNOWN_ACTION: &[u8] = "*1\n!14\nUnknown action\n".as_bytes(); pub const R_UNKNOWN_ACTION: &[u8] = "*!14\nUnknown action\n".as_bytes();
/// A 0 uint64 reply /// A 0 uint64 reply
pub const R_ONE_INT_REPLY: &[u8] = "*1\n:1\n1\n".as_bytes(); pub const R_ONE_INT_REPLY: &[u8] = "*:1\n1\n".as_bytes();
/// A 1 uint64 reply /// A 1 uint64 reply
pub const R_ZERO_INT_REPLY: &[u8] = "*1\n:1\n0\n".as_bytes(); pub const R_ZERO_INT_REPLY: &[u8] = "*:1\n0\n".as_bytes();
/// Snapshot busy (other error) /// Snapshot busy (other error)
pub const R_SNAPSHOT_BUSY: &[u8] = "*1\n!17\nerr-snapshot-busy\n".as_bytes(); pub const R_SNAPSHOT_BUSY: &[u8] = "*!17\nerr-snapshot-busy\n".as_bytes();
/// Snapshot disabled (other error) /// Snapshot disabled (other error)
pub const R_SNAPSHOT_DISABLED: &[u8] = "*1\n!21\nerr-snapshot-disabled\n".as_bytes(); pub const R_SNAPSHOT_DISABLED: &[u8] = "*!21\nerr-snapshot-disabled\n".as_bytes();
/// Snapshot has illegal name (other error) /// Snapshot has illegal name (other error)
pub const R_SNAPSHOT_ILLEGAL_NAME: &[u8] = "*1\n!25\nerr-invalid-snapshot-name\n".as_bytes(); pub const R_SNAPSHOT_ILLEGAL_NAME: &[u8] = "*!25\nerr-invalid-snapshot-name\n".as_bytes();
/// Access after termination signal (other error) /// Access after termination signal (other error)
pub const R_ERR_ACCESS_AFTER_TERMSIG: &[u8] = "*1\n!24\nerr-access-after-termsig\n".as_bytes(); pub const R_ERR_ACCESS_AFTER_TERMSIG: &[u8] = "*!24\nerr-access-after-termsig\n".as_bytes();
} }

File diff suppressed because it is too large Load Diff

@ -1,355 +0,0 @@
/*
* Created on Tue Apr 12 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(unused)] // TODO(@ohsayan): Remove this once we're done
use crate::corestore::heap_array::HeapArray;
use crate::protocol::{ParseError, ParseResult, UnsafeSlice};
use core::{marker::PhantomData, mem::transmute};
#[cfg(test)]
mod tests;
/// A fully parsed query, along with how many bytes of the input buffer it
/// consumed (so the caller knows how far to advance its own buffer)
#[derive(Debug)]
pub struct Query {
    // number of input bytes consumed while parsing this query
    forward: usize,
    // the parsed payload: either a simple query or a pipeline
    data: QueryType,
}
impl Query {
    /// Construct a [`Query`], recording the number of consumed bytes (`forward`)
    const fn new(forward: usize, data: QueryType) -> Self {
        Self { forward, data }
    }
}
/// The two query shapes supported by the protocol: a single simple query
/// (`*` header) or a pipeline of simple queries (`$` header)
#[derive(Debug)]
pub enum QueryType {
    Simple(SimpleQuery),
    Pipelined(PipelinedQuery),
}
/// A simple query: a flat list of elements, each an [`UnsafeSlice`] borrowing
/// from the original input buffer (the buffer must outlive this value)
#[derive(Debug)]
pub struct SimpleQuery {
    data: HeapArray<UnsafeSlice>,
}
impl SimpleQuery {
    /// Copy every borrowed element into an owned `Vec<u8>` so tests can
    /// compare against literal data without lifetime gymnastics
    #[cfg(test)]
    fn into_owned(self) -> OwnedSimpleQuery {
        let mut data = Vec::new();
        for element in self.data.iter() {
            // UNSAFE(@ohsayan): the source buffer is still alive in the test
            data.push(unsafe { element.as_slice() }.to_owned());
        }
        OwnedSimpleQuery { data }
    }
}
/// Test-only owned mirror of [`SimpleQuery`]: the borrowed slices copied into
/// plain `Vec<u8>`s for easy equality assertions
#[cfg(test)]
struct OwnedSimpleQuery {
    data: Vec<Vec<u8>>,
}
/// A pipelined query: a list of simple-query element lists, all borrowing
/// from the original input buffer
#[derive(Debug)]
pub struct PipelinedQuery {
    data: HeapArray<HeapArray<UnsafeSlice>>,
}
impl PipelinedQuery {
    /// Copy every borrowed element of every query into owned `Vec<u8>`s so
    /// tests can compare against literal data
    #[cfg(test)]
    fn into_owned(self) -> OwnedPipelinedQuery {
        let mut data = Vec::new();
        for query in self.data.iter() {
            let mut owned_query = Vec::new();
            for element in query.iter() {
                // UNSAFE(@ohsayan): the source buffer is still alive in the test
                owned_query.push(unsafe { element.as_slice() }.to_owned());
            }
            data.push(owned_query);
        }
        OwnedPipelinedQuery { data }
    }
}
/// Test-only owned mirror of [`PipelinedQuery`]
#[cfg(test)]
struct OwnedPipelinedQuery {
    data: Vec<Vec<Vec<u8>>>,
}
/// A parser for Skyhash 2.0
///
/// Holds raw start/end cursors into a caller-owned buffer. The
/// `PhantomData<&'a ()>` is meant to tie the parser to the buffer's lifetime
/// so the pointers cannot dangle.
pub struct Parser<'a> {
    // one byte past the end of the buffer; never dereferenced
    end: *const u8,
    // current read position; valid to deref only while not exhausted
    cursor: *const u8,
    _lt: PhantomData<&'a ()>,
}
impl<'a> Parser<'a> {
    /// Initialize a new parser over `slice`.
    ///
    /// The buffer must stay alive (and unmoved) for as long as the parser or
    /// any slice it hands out is in use.
    // NOTE(review): `slice`'s elided lifetime is not tied to `'a`, so the
    // compiler does not actually enforce that relationship — confirm callers
    // uphold it manually
    pub fn new(slice: &[u8]) -> Self {
        let start = slice.as_ptr();
        Self {
            cursor: start,
            // SAFETY: computing a one-past-the-end pointer for a slice is
            // always valid
            end: unsafe { start.add(slice.len()) },
            _lt: PhantomData,
        }
    }
}
// basic methods
impl<'a> Parser<'a> {
    /// Pointer to one byte past the end of the buffer (never dereference)
    const fn data_end_ptr(&self) -> *const u8 {
        self.end
    }
    /// Current cursor position
    /// WARNING: dereferencing this once the buffer is exhausted may segfault
    const fn cursor_ptr(&self) -> *const u8 {
        self.cursor
    }
    /// Number of bytes that have not been consumed yet
    fn remaining(&self) -> usize {
        self.end as usize - self.cursor as usize
    }
    /// Returns true if at least `size` bytes are still available
    fn has_remaining(&self, size: usize) -> bool {
        size <= self.remaining()
    }
    /// Returns true if the cursor is still within the buffer
    fn not_exhausted(&self) -> bool {
        self.cursor < self.end
    }
    /// Returns true if every byte of the buffer has been consumed
    fn exhausted(&self) -> bool {
        !self.not_exhausted()
    }
    /// Read the byte under the cursor.
    /// WARNING: same segfault warning — caller must ensure `not_exhausted()`
    const unsafe fn get_byte_at_cursor(&self) -> u8 {
        *self.cursor
    }
}
// mutable refs
impl<'a> Parser<'a> {
    /// Move the cursor a single byte ahead
    unsafe fn incr_cursor(&mut self) {
        self.incr_cursor_by(1)
    }
    /// Move the cursor `by` bytes ahead; caller must guarantee the result
    /// stays within (or one past) the buffer
    unsafe fn incr_cursor_by(&mut self, by: usize) {
        self.cursor = self.cursor.add(by)
    }
}
// higher level abstractions
impl<'a> Parser<'a> {
    /// Attempt to read `len` bytes starting at the cursor.
    ///
    /// On success the cursor is advanced past the read region; on failure
    /// ([`ParseError::NotEnough`]) nothing is consumed.
    fn read_until(&mut self, len: usize) -> ParseResult<UnsafeSlice> {
        if self.has_remaining(len) {
            unsafe {
                // UNSAFE(@ohsayan): Already verified lengths
                let slice = UnsafeSlice::new(self.cursor_ptr(), len);
                self.incr_cursor_by(len);
                Ok(slice)
            }
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// Attempt to read a byte slice terminated by an LF. The LF is consumed
    /// but excluded from the returned slice; an empty line (`"\n"`) is fine.
    fn read_line(&mut self) -> ParseResult<UnsafeSlice> {
        let start_ptr = self.cursor_ptr();
        unsafe {
            while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
                self.incr_cursor();
            }
            if self.not_exhausted() && self.get_byte_at_cursor() == b'\n' {
                let len = self.cursor_ptr() as usize - start_ptr as usize;
                self.incr_cursor(); // skip LF
                Ok(UnsafeSlice::new(start_ptr, len))
            } else {
                Err(ParseError::NotEnough)
            }
        }
    }
    /// Attempt to read a line, **rejecting an empty payload**.
    ///
    /// - No LF found: [`ParseError::NotEnough`] (more data may still arrive)
    /// - LF found but the line is empty: [`ParseError::BadPacket`]
    fn read_line_pedantic(&mut self) -> ParseResult<UnsafeSlice> {
        let start_ptr = self.cursor_ptr();
        unsafe {
            while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
                self.incr_cursor();
            }
            let len = self.cursor_ptr() as usize - start_ptr as usize;
            let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n';
            if has_lf && len != 0 {
                self.incr_cursor(); // skip LF
                Ok(UnsafeSlice::new(start_ptr, len))
            } else if has_lf {
                // the line is complete but empty: the peer sent a bad packet.
                // (previously `Err(transmute(has_lf))`, which silently relied
                // on `ParseError`'s discriminant layout — now explicit)
                Err(ParseError::BadPacket)
            } else {
                // no LF yet; the rest of the line may still be in flight
                Err(ParseError::NotEnough)
            }
        }
    }
    /// Attempt to read an `usize` from the buffer: decimal ASCII digits
    /// terminated by an LF. Rejects empty lines, non-digit bytes and values
    /// that overflow `usize`.
    fn read_usize(&mut self) -> ParseResult<usize> {
        let line = self.read_line_pedantic()?;
        let bytes = unsafe {
            // UNSAFE(@ohsayan): We just extracted the slice
            line.as_slice()
        };
        let mut ret = 0usize;
        for byte in bytes {
            if !byte.is_ascii_digit() {
                return Err(ParseError::DatatypeParseFailure);
            }
            // ret = ret * 10 + digit, with overflow checks; `byte & 0x0F`
            // maps b'0'..=b'9' to 0..=9
            ret = ret
                .checked_mul(10)
                .and_then(|r| r.checked_add((byte & 0x0F) as usize))
                .ok_or(ParseError::DatatypeParseFailure)?;
        }
        Ok(ret)
    }
}
// query impls
impl<'a> Parser<'a> {
    /// Parse the next simple query. This should have passed the `*` tsymbol
    ///
    /// Simple query structure (tokenized line-by-line):
    /// ```text
    /// * -> Simple Query Header
    /// <n>\n -> Count of elements in the simple query
    /// <l0>\n -> Length of element 1
    /// <e0> -> element 1 itself
    /// <l1>\n -> Length of element 2
    /// <e1> -> element 2 itself
    /// ...
    /// ```
    fn _next_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
        let element_count = self.read_usize()?;
        unsafe {
            // NOTE(review): if a read below fails mid-loop, the partially
            // written `HeapArray` writer is dropped early — assumes the
            // writer's Drop impl copes with partial initialization; confirm
            // in `corestore::heap_array`
            let mut data = HeapArray::new_writer(element_count);
            for i in 0..element_count {
                let element_size = self.read_usize()?;
                let element = self.read_until(element_size)?;
                data.write_to_index(i, element);
            }
            Ok(data.finish())
        }
    }
    /// Parse a simple query
    fn next_simple_query(&mut self) -> ParseResult<SimpleQuery> {
        Ok(SimpleQuery {
            data: self._next_simple_query()?,
        })
    }
    /// Parse a pipelined query. This should have passed the `$` tsymbol
    ///
    /// Pipelined query structure (tokenized line-by-line):
    /// ```text
    /// $ -> Pipeline
    /// <n>\n -> Pipeline has n queries
    /// <lq0>\n -> Query 1 has 3 elements
    /// <lq0e0>\n -> Q1E1 has 3 bytes
    /// <q0e0> -> Q1E1 itself
    /// <lq0e1>\n -> Q1E2 has 1 byte
    /// <q0e1> -> Q1E2 itself
    /// <lq0e2>\n -> Q1E3 has 3 bytes
    /// <q0e2> -> Q1E3 itself
    /// <lq1>\n -> Query 2 has 2 elements
    /// <lq1e0>\n -> Q2E1 has 3 bytes
    /// <q1e0> -> Q2E1 itself
    /// <lq1e1>\n -> Q2E2 has 1 byte
    /// <q1e1> -> Q2E2 itself
    /// ...
    /// ```
    ///
    /// Example:
    /// ```text
    /// $ -> Pipeline
    /// 2\n -> Pipeline has 2 queries
    /// 3\n -> Query 1 has 3 elements
    /// 3\n -> Q1E1 has 3 bytes
    /// SET -> Q1E1 itself
    /// 1\n -> Q1E2 has 1 byte
    /// x -> Q1E2 itself
    /// 3\n -> Q1E3 has 3 bytes
    /// 100 -> Q1E3 itself
    /// 2\n -> Query 2 has 2 elements
    /// 3\n -> Q2E1 has 3 bytes
    /// GET -> Q2E1 itself
    /// 1\n -> Q2E2 has 1 byte
    /// x -> Q2E2 itself
    /// ```
    fn next_pipeline(&mut self) -> ParseResult<PipelinedQuery> {
        let query_count = self.read_usize()?;
        unsafe {
            // NOTE(review): same partial-initialization question as
            // `_next_simple_query` applies to this writer
            let mut queries = HeapArray::new_writer(query_count);
            for i in 0..query_count {
                let sq = self._next_simple_query()?;
                queries.write_to_index(i, sq);
            }
            Ok(PipelinedQuery {
                data: queries.finish(),
            })
        }
    }
    /// Dispatch on the first byte of the buffer: `*` starts a simple query,
    /// `$` a pipeline; anything else is an [`ParseError::UnexpectedByte`]
    fn _parse(&mut self) -> ParseResult<QueryType> {
        if self.not_exhausted() {
            unsafe {
                let first_byte = self.get_byte_at_cursor();
                self.incr_cursor();
                let data = match first_byte {
                    b'*' => {
                        // a simple query
                        QueryType::Simple(self.next_simple_query()?)
                    }
                    b'$' => {
                        // a pipelined query
                        QueryType::Pipelined(self.next_pipeline()?)
                    }
                    _ => return Err(ParseError::UnexpectedByte),
                };
                Ok(data)
            }
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// Parse `buf`, returning the query together with the number of bytes
    /// consumed (recorded as `Query::forward`)
    pub fn parse(buf: &[u8]) -> ParseResult<Query> {
        let mut slf = Self::new(buf);
        let body = slf._parse()?;
        let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
        Ok(Query::new(consumed, body))
    }
}

@ -1,643 +0,0 @@
/*
* Created on Tue Apr 12 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::{Parser, PipelinedQuery, Query, QueryType, SimpleQuery};
use crate::protocol::ParseError;
use std::iter::Map;
use std::vec::IntoIter as VecIntoIter;
// iterator pairing each packet with its length (see `get_slices_with_len`)
type IterPacketWithLen = Map<VecIntoIter<Vec<u8>>, fn(Vec<u8>) -> (usize, Vec<u8>)>;
// a collection of raw packet buffers used as parser input fixtures
type Packets = Vec<Vec<u8>>;
/// Shorthand for building owned byte payloads in tests:
/// - `v!()` -> an empty `Vec`
/// - `v!(b"...")` -> the byte-string literal as a `Vec<u8>`
/// - `v!("a", "b", ...)` -> a `Vec<Vec<u8>>` of each string's bytes
macro_rules! v {
    () => {
        vec![]
    };
    ($literal:literal) => {
        $literal.to_vec()
    };
    ($($lit:literal),*) => {
        vec![$(
            $lit.as_bytes().to_owned()
        ),*]
    }
}
fn ensure_exhausted(p: &Parser) {
assert!(!p.not_exhausted());
assert!(p.exhausted());
}
fn ensure_remaining(p: &Parser, r: usize) {
assert_eq!(p.remaining(), r);
assert!(p.has_remaining(r));
}
fn ensure_not_exhausted(p: &Parser) {
assert!(p.not_exhausted());
assert!(!p.exhausted());
}
fn get_slices(slices: &[&[u8]]) -> Packets {
slices.iter().map(|slc| slc.to_vec()).collect()
}
fn ensure_zero_reads(parser: &mut Parser) {
let r = parser.read_until(0).unwrap();
unsafe {
let slice = r.as_slice();
assert_eq!(slice, b"");
assert!(slice.is_empty());
}
}
// We do this intentionally for "heap simulation"
fn slices() -> Packets {
const SLICE_COLLECTION: &[&[u8]] = &[
b"",
b"a",
b"ab",
b"abc",
b"abcd",
b"abcde",
b"abcdef",
b"abcdefg",
b"abcdefgh",
b"abcdefghi",
b"abcdefghij",
b"abcdefghijk",
b"abcdefghijkl",
b"abcdefghijklm",
];
get_slices(SLICE_COLLECTION)
}
fn get_slices_with_len(slices: Packets) -> IterPacketWithLen {
slices.into_iter().map(|slc| (slc.len(), slc))
}
fn slices_with_len() -> IterPacketWithLen {
get_slices_with_len(slices())
}
fn slices_lf() -> Packets {
    // every non-empty prefix of "abcdefghijklm" terminated by an LF, plus a
    // leading empty packet — identical to the old hand-written fixture list
    const FULL: &[u8] = b"abcdefghijklm";
    let mut packets: Packets = vec![vec![]];
    for len in 1..=FULL.len() {
        let mut packet = FULL[..len].to_vec();
        packet.push(b'\n');
        packets.push(packet);
    }
    packets
}
fn slices_lf_with_len() -> IterPacketWithLen {
    get_slices_with_len(slices_lf())
}
fn simple_query(query: Query) -> SimpleQuery {
if let QueryType::Simple(sq) = query.data {
sq
} else {
panic!("Got pipeline instead of simple!");
}
}
fn pipelined_query(query: Query) -> PipelinedQuery {
if let QueryType::Pipelined(pq) = query.data {
pq
} else {
panic!("Got simple instead of pipeline!");
}
}
// "actual" tests
// data_end_ptr
#[test]
fn data_end_ptr() {
    // the end pointer must be exactly one-past-the-end of the source buffer
    for (len, src) in slices_with_len() {
        let p = Parser::new(&src);
        unsafe {
            assert_eq!(p.data_end_ptr(), src.as_ptr().add(len));
        }
    }
}
// cursor_ptr
#[test]
fn cursor_ptr() {
    // a fresh parser's cursor sits at the start of the buffer
    for src in slices() {
        assert_eq!(Parser::new(&src).cursor_ptr(), src.as_ptr());
    }
}
#[test]
fn cursor_ptr_with_incr() {
    // advancing by the full length must land the cursor at the end
    for src in slices() {
        let mut p = Parser::new(&src);
        unsafe {
            p.incr_cursor_by(src.len());
            assert_eq!(p.cursor_ptr(), src.as_ptr().add(src.len()));
        }
    }
}
// remaining
#[test]
fn remaining() {
    // a fresh parser reports the full buffer length as remaining
    for (len, src) in slices_with_len() {
        let parser = Parser::new(&src);
        assert_eq!(parser.remaining(), len);
    }
}
#[test]
fn remaining_with_incr() {
    // `remaining` must track the cursor exactly as it advances
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        unsafe {
            // no change
            parser.incr_cursor_by(0);
            assert_eq!(parser.remaining(), len);
            if len != 0 {
                // move one byte ahead. should reach EOA or len - 1
                parser.incr_cursor();
                assert_eq!(parser.remaining(), len - 1);
                // move the cursor to the end; should reach EOA
                parser.incr_cursor_by(len - 1);
                assert_eq!(parser.remaining(), 0);
            }
        }
    }
}
// has_remaining
#[test]
fn has_remaining() {
    for (len, src) in slices_with_len() {
        let parser = Parser::new(&src);
        assert!(parser.has_remaining(len), "should have {len} remaining")
    }
}
#[test]
fn has_remaining_with_incr() {
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        unsafe {
            // no change
            parser.incr_cursor_by(0);
            assert!(parser.has_remaining(len));
            if len != 0 {
                // move one byte ahead. should reach EOA or len - 1
                parser.incr_cursor();
                assert!(parser.has_remaining(len - 1));
                // move the cursor to the end; should reach EOA
                parser.incr_cursor_by(len - 1);
                assert!(!parser.has_remaining(1));
                // should always be true
                assert!(parser.has_remaining(0));
            }
        }
    }
}
// exhausted
#[test]
fn exhausted() {
    // only an empty buffer is exhausted from the start
    for src in slices() {
        let parser = Parser::new(&src);
        if src.is_empty() {
            assert!(parser.exhausted());
        } else {
            assert!(!parser.exhausted())
        }
    }
}
#[test]
fn exhausted_with_incr() {
    // the parser becomes exhausted exactly when the cursor reaches the end
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        if len == 0 {
            assert!(parser.exhausted());
        } else {
            assert!(!parser.exhausted());
            unsafe {
                parser.incr_cursor();
                if len == 1 {
                    assert!(parser.exhausted());
                } else {
                    assert!(!parser.exhausted());
                    parser.incr_cursor_by(len - 1);
                    assert!(parser.exhausted());
                }
            }
        }
    }
}
// not_exhausted
#[test]
fn not_exhausted() {
    // not_exhausted must be the exact complement of exhausted
    for src in slices() {
        let parser = Parser::new(&src);
        if src.is_empty() {
            assert!(!parser.not_exhausted());
        } else {
            assert!(parser.not_exhausted())
        }
    }
}
#[test]
fn not_exhausted_with_incr() {
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        if len == 0 {
            assert!(!parser.not_exhausted());
        } else {
            assert!(parser.not_exhausted());
            unsafe {
                parser.incr_cursor();
                if len == 1 {
                    assert!(!parser.not_exhausted());
                } else {
                    assert!(parser.not_exhausted());
                    parser.incr_cursor_by(len - 1);
                    assert!(!parser.not_exhausted());
                }
            }
        }
    }
}
// read_until
#[test]
fn read_until_empty() {
    let b = v!(b"");
    let mut parser = Parser::new(&b);
    // a zero-length read works even on an empty buffer...
    ensure_zero_reads(&mut parser);
    // ...but a single byte is already too much
    assert_eq!(parser.read_until(1).unwrap_err(), ParseError::NotEnough);
}
#[test]
fn read_until_nonempty() {
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        // should always work
        ensure_zero_reads(&mut parser);
        // reading exactly the buffered amount must succeed
        let read = parser.read_until(len).unwrap();
        unsafe {
            let slice = read.as_slice();
            assert_eq!(slice.len(), len);
            assert_eq!(slice, src.as_slice());
        }
        // zero-length reads keep working even after exhaustion
        ensure_zero_reads(&mut parser);
    }
}
#[test]
fn read_until_not_enough() {
    for (len, src) in slices_with_len() {
        let mut parser = Parser::new(&src);
        ensure_zero_reads(&mut parser);
        // one byte more than buffered must fail
        assert_eq!(
            parser.read_until(len + 1).unwrap_err(),
            ParseError::NotEnough
        );
        // and the failure must not have consumed anything
        ensure_zero_reads(&mut parser);
    }
}
#[test]
fn read_until_more_bytes() {
    let sample1 = v!(b"abcd1");
    let prefix_len = sample1.len() - 1;
    let mut p1 = Parser::new(&sample1);
    unsafe {
        assert_eq!(
            p1.read_until(prefix_len).unwrap().as_slice(),
            &sample1[..prefix_len]
        );
    }
    // ensure we have not exhausted
    ensure_not_exhausted(&p1);
    ensure_remaining(&p1, 1);
    let sample2 = v!(b"abcd1234567890!@#$");
    let mut p2 = Parser::new(&sample2);
    unsafe {
        assert_eq!(p2.read_until(4).unwrap().as_slice(), &sample2[..4]);
    }
    // ensure we have not exhausted
    ensure_not_exhausted(&p2);
    ensure_remaining(&p2, sample2.len() - 4);
}
// read_line
#[test]
fn read_line_special_case_only_lf() {
    // a lone LF is a valid (empty) line for the non-pedantic reader
    let b = v!(b"\n");
    let mut parser = Parser::new(&b);
    unsafe {
        let r = parser.read_line().unwrap();
        let slice = r.as_slice();
        assert_eq!(slice, b"");
        assert!(slice.is_empty());
    };
    // ensure it is exhausted
    ensure_exhausted(&parser);
}
#[test]
fn read_line() {
    for (len, src) in slices_lf_with_len() {
        let mut parser = Parser::new(&src);
        if len == 0 {
            // should be empty, so NotEnough
            assert_eq!(parser.read_line().unwrap_err(), ParseError::NotEnough);
        } else {
            // should work; the returned line excludes the trailing LF
            unsafe {
                assert_eq!(
                    parser.read_line().unwrap().as_slice(),
                    &src.as_slice()[..len - 1]
                );
            }
            // now, we attempt to read which should work
            ensure_zero_reads(&mut parser);
        }
        // ensure it is exhausted
        ensure_exhausted(&parser);
        // now, we attempt to read another line which should fail
        assert_eq!(parser.read_line().unwrap_err(), ParseError::NotEnough);
        // ensure that cursor is at end
        unsafe {
            assert_eq!(parser.cursor_ptr(), src.as_ptr().add(len));
        }
    }
}
#[test]
fn read_line_more_bytes() {
    // the read must stop at the LF and leave trailing bytes untouched
    let sample1 = v!(b"abcd\n1");
    let mut p1 = Parser::new(&sample1);
    let line = p1.read_line().unwrap();
    unsafe {
        assert_eq!(line.as_slice(), b"abcd");
    }
    // we should still have one remaining
    ensure_not_exhausted(&p1);
    ensure_remaining(&p1, 1);
}
#[test]
fn read_line_subsequent_lf() {
    let sample1 = v!(b"abcd\n1\n");
    let mut p1 = Parser::new(&sample1);
    let line = p1.read_line().unwrap();
    unsafe {
        assert_eq!(line.as_slice(), b"abcd");
    }
    // we should still have two octets remaining
    ensure_not_exhausted(&p1);
    ensure_remaining(&p1, 2);
    // and we should be able to read in another line
    let line = p1.read_line().unwrap();
    unsafe {
        assert_eq!(line.as_slice(), b"1");
    }
    ensure_exhausted(&p1);
}
#[test]
fn read_line_pedantic_okay() {
    for (len, src) in slices_lf_with_len() {
        let mut parser = Parser::new(&src);
        if len == 0 {
            // should be empty, so NotEnough
            assert_eq!(
                parser.read_line_pedantic().unwrap_err(),
                ParseError::NotEnough
            );
        } else {
            // should work
            unsafe {
                assert_eq!(
                    parser.read_line_pedantic().unwrap().as_slice(),
                    &src.as_slice()[..len - 1]
                );
            }
            // now, we attempt to read which should work
            ensure_zero_reads(&mut parser);
        }
        // ensure it is exhausted
        ensure_exhausted(&parser);
        // now, we attempt to read another line which should fail
        assert_eq!(
            parser.read_line_pedantic().unwrap_err(),
            ParseError::NotEnough
        );
        // ensure that cursor is at end
        unsafe {
            assert_eq!(parser.cursor_ptr(), src.as_ptr().add(len));
        }
    }
}
#[test]
fn read_line_pedantic_fail_empty() {
    // nothing buffered at all: the line may still be in transit
    let payload = v!(b"");
    assert_eq!(
        Parser::new(&payload).read_line_pedantic().unwrap_err(),
        ParseError::NotEnough
    );
}
#[test]
fn read_line_pedantic_fail_only_lf() {
    // a complete but empty line is a protocol violation
    let payload = v!(b"\n");
    assert_eq!(
        Parser::new(&payload).read_line_pedantic().unwrap_err(),
        ParseError::BadPacket
    );
}
#[test]
fn read_line_pedantic_fail_only_lf_extra_data() {
    // trailing data after the empty line doesn't change the verdict
    let payload = v!(b"\n1");
    assert_eq!(
        Parser::new(&payload).read_line_pedantic().unwrap_err(),
        ParseError::BadPacket
    );
}
#[test]
fn read_usize_fail_empty() {
    // no data at all -> the line may still be in transit
    let payload = v!(b"");
    assert_eq!(
        Parser::new(&payload).read_usize().unwrap_err(),
        ParseError::NotEnough
    );
    // a complete but empty line -> protocol violation
    let payload = v!(b"\n");
    assert_eq!(
        Parser::new(&payload).read_usize().unwrap_err(),
        ParseError::BadPacket
    );
}
#[test]
fn read_usize_fail_no_lf() {
    // digits without a terminating LF are an incomplete read
    let payload = v!(b"1");
    assert_eq!(
        Parser::new(&payload).read_usize().unwrap_err(),
        ParseError::NotEnough
    );
}
#[test]
fn read_usize_okay() {
    // well-formed, LF-terminated integers parse cleanly
    for (payload, expected) in vec![(v!(b"1\n"), 1), (v!(b"1234\n"), 1234)] {
        assert_eq!(Parser::new(&payload).read_usize().unwrap(), expected);
    }
}
#[test]
fn read_usize_fail() {
    // any non-digit byte anywhere in the line must be rejected
    let bad_payloads = vec![
        v!(b"a\n"),
        v!(b"1a\n"),
        v!(b"a1\n"),
        v!(b"aa\n"),
        v!(b"12345abcde\n"),
    ];
    for payload in bad_payloads {
        assert_eq!(
            Parser::new(&payload).read_usize().unwrap_err(),
            ParseError::DatatypeParseFailure
        );
    }
}
#[test]
fn parse_fail_because_unknown_query_scheme() {
    // '?' is not a valid query header (only '*' and '$' are)
    let body = v!(b"?3\n3\nSET1\nx3\n100");
    let err = Parser::parse(&body).unwrap_err();
    assert_eq!(err, ParseError::UnexpectedByte)
}
#[test]
fn simple_query_okay() {
    let body = v!(b"*3\n3\nSET1\nx3\n100");
    let parsed = Parser::parse(&body).unwrap();
    // the entire buffer must have been consumed
    assert_eq!(parsed.forward, body.len());
    let query = simple_query(parsed);
    assert_eq!(query.into_owned().data, v!["SET", "x", "100"]);
}
#[test]
fn simple_query_okay_empty_elements() {
    // zero-length elements are legal
    let body = v!(b"*3\n3\nSET0\n0\n");
    let parsed = Parser::parse(&body).unwrap();
    assert_eq!(parsed.forward, body.len());
    let query = simple_query(parsed);
    assert_eq!(query.into_owned().data, v!["SET", "", ""]);
}
#[test]
fn parse_fail_because_not_enough() {
    // Every proper prefix of a valid packet must fail with `NotEnough`.
    // The range is `0..len` (the original `0..len - 1` with `take(i)` never
    // exercised the longest proper prefix, i.e. all but the last byte).
    let full_payload = b"*3\n3\nSET1\nx3\n100";
    let samples: Vec<Vec<u8>> = (0..full_payload.len())
        .map(|i| full_payload.iter().take(i).cloned().collect())
        .collect();
    for body in samples {
        assert_eq!(
            Parser::parse(&body).unwrap_err(),
            ParseError::NotEnough,
            "Failed with body len: {}",
            body.len()
        )
    }
}
#[test]
fn pipelined_query_okay() {
    let body = v!(b"$2\n3\n3\nSET1\nx3\n1002\n3\nGET1\nx");
    let parsed = Parser::parse(&body).unwrap();
    // the entire buffer must have been consumed
    assert_eq!(parsed.forward, body.len());
    let pipeline = pipelined_query(parsed);
    assert_eq!(
        pipeline.into_owned().data,
        vec![v!["SET", "x", "100"], v!["GET", "x"]]
    )
}
#[test]
fn pipelined_query_okay_empty_elements() {
    // zero-length elements are legal inside a pipeline too
    let body = v!(b"$2\n3\n3\nSET0\n3\n1002\n3\nGET0\n");
    let parsed = Parser::parse(&body).unwrap();
    assert_eq!(parsed.forward, body.len());
    let pipeline = pipelined_query(parsed);
    assert_eq!(
        pipeline.into_owned().data,
        vec![v!["SET", "", "100"], v!["GET", ""]]
    )
}
#[test]
fn pipelined_query_fail_because_not_enough() {
    // Every proper prefix of a valid pipeline must fail with `NotEnough`.
    // The range is `0..len` (the original `0..len - 1` with `take(i)` never
    // exercised the longest proper prefix, i.e. all but the last byte).
    let full_payload = v!(b"$2\n3\n3\nSET1\nx3\n1002\n3\nGET1\nx");
    let samples: Vec<Vec<u8>> = (0..full_payload.len())
        .map(|i| full_payload.iter().cloned().take(i).collect())
        .collect();
    for body in samples {
        let ret = Parser::parse(&body).unwrap_err();
        assert_eq!(ret, ParseError::NotEnough)
    }
}

@ -30,12 +30,9 @@ use crate::actions::{ActionError, ActionResult};
use crate::auth; use crate::auth;
use crate::corestore::Corestore; use crate::corestore::Corestore;
use crate::dbnet::connection::prelude::*; use crate::dbnet::connection::prelude::*;
use crate::protocol::{ use crate::protocol::{iter::AnyArrayIter, responses, PipelinedQuery, SimpleQuery, UnsafeSlice};
element::UnsafeElement, iter::AnyArrayIter, responses, PipelineQuery, SimpleQuery,
};
use crate::queryengine::parser::Entity; use crate::queryengine::parser::Entity;
use crate::{actions, admin}; use crate::{actions, admin};
use core::hint::unreachable_unchecked;
mod ddl; mod ddl;
mod inspect; mod inspect;
pub mod parser; pub mod parser;
@ -84,15 +81,15 @@ action! {
auth: &mut AuthProviderHandle<'_, T, Strm>, auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: SimpleQuery buf: SimpleQuery
) { ) {
if buf.is_any_array() { let bufref = buf.as_slice();
let bufref = unsafe { buf.into_inner() }; let mut iter = unsafe {
let mut iter = unsafe { get_iter(&bufref) }; // UNSAFE(@ohsayan): The presence of the connection guarantees that this
match iter.next_lowercase().unwrap_or_custom_aerr(groups::PACKET_ERR)?.as_ref() { // won't suddenly become invalid
ACTION_AUTH => auth::auth_login_only(con, auth, iter).await, AnyArrayIter::new(bufref.iter())
_ => util::err(auth::errors::AUTH_CODE_BAD_CREDENTIALS), };
} match iter.next_lowercase().unwrap_or_custom_aerr(groups::PACKET_ERR)?.as_ref() {
} else { ACTION_AUTH => auth::auth_login_only(con, auth, iter).await,
util::err(groups::WRONGTYPE_ERR) _ => util::err(auth::errors::AUTH_CODE_BAD_CREDENTIALS),
} }
} }
//// Execute a simple query //// Execute a simple query
@ -102,46 +99,20 @@ action! {
auth: &mut AuthProviderHandle<'_, T, Strm>, auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: SimpleQuery buf: SimpleQuery
) { ) {
if buf.is_any_array() { self::execute_stage(db, con, auth, buf.as_slice()).await
unsafe {
self::execute_stage(db, con, auth, &buf.into_inner()).await
}
} else {
util::err(groups::WRONGTYPE_ERR)
}
} }
} }
#[allow(clippy::needless_lifetimes)]
unsafe fn get_iter<'a>(buf: &'a UnsafeElement) -> AnyArrayIter<'a> {
// this is the boxed slice
let bufref = {
// SAFETY: execute_simple is called by execute_query which in turn is called
// by ConnnectionHandler::run(). In all cases, the `Con` remains valid
// ensuring that the source buffer exists as long as the connection does
// so this is safe.
match buf {
UnsafeElement::AnyArray(arr) => arr,
_ => unreachable_unchecked(),
}
};
// this is our final iter
let iter = {
// SAFETY: Again, this is guaranteed to be valid because the `con` is valid
AnyArrayIter::new(bufref.iter())
};
iter
}
async fn execute_stage<'a, T: 'a + ClientConnection<Strm>, Strm: Stream>( async fn execute_stage<'a, T: 'a + ClientConnection<Strm>, Strm: Stream>(
db: &mut Corestore, db: &mut Corestore,
con: &'a mut T, con: &'a mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>, auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: &UnsafeElement, buf: &[UnsafeSlice],
) -> ActionResult<()> { ) -> ActionResult<()> {
let mut iter = unsafe { let mut iter = unsafe {
// UNSAFE(@ohsayan): Assumed to be guaranteed by the caller // UNSAFE(@ohsayan): The presence of the connection guarantees that this
get_iter(buf) // won't suddenly become invalid
AnyArrayIter::new(buf.iter())
}; };
{ {
gen_constants_and_matches!( gen_constants_and_matches!(
@ -204,10 +175,9 @@ async fn execute_stage_pedantic<'a, T: ClientConnection<Strm> + 'a, Strm: Stream
handle: &mut Corestore, handle: &mut Corestore,
con: &mut T, con: &mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>, auth: &mut AuthProviderHandle<'_, T, Strm>,
stage: &UnsafeElement, stage: &[UnsafeSlice],
) -> crate::IoResult<()> { ) -> crate::IoResult<()> {
let ret = async { let ret = async {
ensure_cond_or_err(stage.is_any_array(), groups::WRONGTYPE_ERR)?;
self::execute_stage(handle, con, auth, stage).await?; self::execute_stage(handle, con, auth, stage).await?;
Ok(()) Ok(())
}; };
@ -224,9 +194,9 @@ action! {
handle: &mut Corestore, handle: &mut Corestore,
con: &mut T, con: &mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>, auth: &mut AuthProviderHandle<'_, T, Strm>,
pipeline: PipelineQuery pipeline: PipelinedQuery
) { ) {
for stage in pipeline.iter() { for stage in pipeline.into_inner().iter() {
self::execute_stage_pedantic(handle, con, auth, stage).await?; self::execute_stage_pedantic(handle, con, auth, stage).await?;
} }
Ok(()) Ok(())

@ -102,8 +102,6 @@ impl Writable for StringWrapper {
con.write_lowlevel(&[b'\n']).await?; con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object) // Now write the REAL bytes (of the object)
con.write_lowlevel(self.0.as_bytes()).await?; con.write_lowlevel(self.0.as_bytes()).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(()) Ok(())
}) })
} }
@ -143,8 +141,6 @@ impl Writable for &'static str {
con.write_lowlevel(&[b'\n']).await?; con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object) // Now write the REAL bytes (of the object)
con.write_lowlevel(self.as_bytes()).await?; con.write_lowlevel(self.as_bytes()).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(()) Ok(())
}) })
} }
@ -167,8 +163,6 @@ impl Writable for BytesWrapper {
con.write_lowlevel(&[b'\n']).await?; con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object) // Now write the REAL bytes (of the object)
con.write_lowlevel(&bytes).await?; con.write_lowlevel(&bytes).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(()) Ok(())
}) })
} }
@ -179,9 +173,6 @@ impl Writable for usize {
Box::pin(async move { Box::pin(async move {
con.write_lowlevel(b":").await?; con.write_lowlevel(b":").await?;
let usize_bytes = Integer64::from(self); let usize_bytes = Integer64::from(self);
let usize_bytes_len = Integer64::from(usize_bytes.len());
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?; con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?; con.write_lowlevel(b"\n").await?;
Ok(()) Ok(())
@ -194,9 +185,6 @@ impl Writable for u64 {
Box::pin(async move { Box::pin(async move {
con.write_lowlevel(b":").await?; con.write_lowlevel(b":").await?;
let usize_bytes = Integer64::from(self); let usize_bytes = Integer64::from(self);
let usize_bytes_len = Integer64::from(usize_bytes.len());
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?; con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?; con.write_lowlevel(b"\n").await?;
Ok(()) Ok(())
@ -220,8 +208,6 @@ impl Writable for ObjectID {
con.write_lowlevel(&[b'\n']).await?; con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object) // Now write the REAL bytes (of the object)
con.write_lowlevel(&self).await?; con.write_lowlevel(&self).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(()) Ok(())
}) })
} }
@ -231,10 +217,7 @@ impl Writable for f32 {
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> { fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
Box::pin(async move { Box::pin(async move {
let payload = self.to_string(); let payload = self.to_string();
let payload_len = Integer64::from(payload.len());
con.write_lowlevel(&[TSYMBOL_FLOAT]).await?; con.write_lowlevel(&[TSYMBOL_FLOAT]).await?;
con.write_lowlevel(&payload_len).await?;
con.write_lowlevel(&[b'\n']).await?;
con.write_lowlevel(payload.as_bytes()).await?; con.write_lowlevel(payload.as_bytes()).await?;
con.write_lowlevel(&[b'\n']).await?; con.write_lowlevel(&[b'\n']).await?;
Ok(()) Ok(())

@ -49,7 +49,6 @@ where
raw_stream.write_all(&bytes).await?; // then len raw_stream.write_all(&bytes).await?; // then len
raw_stream.write_all(&[b'\n']).await?; // LF raw_stream.write_all(&[b'\n']).await?; // LF
raw_stream.write_all(payload).await?; // payload raw_stream.write_all(payload).await?; // payload
raw_stream.write_all(&[b'\n']).await?; // final LF
Ok(()) Ok(())
} }
@ -103,8 +102,6 @@ where
stream.write_all(&[b'\n']).await?; stream.write_all(&[b'\n']).await?;
// now element // now element
stream.write_all(bytes).await?; stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(()) Ok(())
} }
/// Write the NIL response code /// Write the NIL response code
@ -167,14 +164,12 @@ where
stream.write_all(&[b'\n']).await?; stream.write_all(&[b'\n']).await?;
// now element // now element
stream.write_all(bytes).await?; stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(()) Ok(())
} }
/// Write a null /// Write a null
pub async fn write_null(&mut self) -> IoResult<()> { pub async fn write_null(&mut self) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() }; let stream = unsafe { self.con.raw_stream() };
stream.write_all(&[b'\0', b'\n']).await?; stream.write_all(&[b'\0']).await?;
Ok(()) Ok(())
} }
} }
@ -225,8 +220,6 @@ where
stream.write_all(&[b'\n']).await?; stream.write_all(&[b'\n']).await?;
// now element // now element
stream.write_all(bytes).await?; stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(()) Ok(())
} }
} }

@ -115,31 +115,21 @@ fn _get_eresp_array(tokens: TokenStream) -> TokenStream {
_ => panic!("Expected a string literal"), _ => panic!("Expected a string literal"),
}; };
let payload_bytes = payload_str.as_bytes(); let payload_bytes = payload_str.as_bytes();
let payload_len = payload_bytes.len();
let payload_len_str = payload_len.to_string();
let payload_len_bytes = payload_len_str.as_bytes();
let mut processed = quote! { let mut processed = quote! {
b'!', b'!',
}; };
for byte in payload_len_bytes { for byte in payload_bytes {
processed = quote! { processed = quote! {
#processed #processed
#byte, #byte,
}; }
} }
processed = quote! { processed = quote! {
#processed #processed
b'\n', b'\n',
}; };
for byte in payload_bytes {
processed = quote! {
#processed
#byte,
}
}
processed = quote! { processed = quote! {
[#processed [#processed]
b'\n',]
}; };
processed.into() processed.into()
} }

Loading…
Cancel
Save