Merge pull request #249 from skytable/protocol/skyhash-2

Implement and stabilize Skyhash 2.0
next
Glydr 2 years ago committed by GitHub
commit 3b1f9e2f06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

60
Cargo.lock generated

@ -91,14 +91,14 @@ checksum = "8a32fd6af2b5827bce66c29053ba0e7c42b9dcab01835835058558c10851a46b"
[[package]]
name = "bb8"
version = "0.7.1"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9f4fa9768efd269499d8fba693260cfc670891cf6de3adc935588447a77cc8"
checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot 0.11.2",
"parking_lot 0.12.0",
"tokio",
]
@ -460,9 +460,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.0.22"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af"
dependencies = [
"cfg-if",
"crc32fast",
@ -556,7 +556,7 @@ dependencies = [
"log",
"openssl",
"powershell_script",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust.git)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust.git)",
"zip",
]
@ -661,9 +661,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.122"
version = "0.2.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259"
checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50"
[[package]]
name = "libsky"
@ -722,12 +722,11 @@ dependencies = [
[[package]]
name = "miniz_oxide"
version = "0.4.4"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082"
dependencies = [
"adler",
"autocfg",
]
[[package]]
@ -1043,9 +1042,9 @@ dependencies = [
[[package]]
name = "rayon"
version = "1.5.1"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221"
dependencies = [
"autocfg",
"crossbeam-deque",
@ -1055,14 +1054,13 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.9.1"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
@ -1116,9 +1114,9 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "rustix"
version = "0.34.2"
version = "0.34.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96619609a54d638872db136f56941d34e2a00bb0acf3fa783a90d6b96a093ba2"
checksum = "3f5d1c6ed6d1c6915aa64749b809fc1bafff49d160f5d927463658093d7d62ab"
dependencies = [
"bitflags",
"errno",
@ -1266,7 +1264,7 @@ dependencies = [
"rand",
"serde",
"serde_json",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust?branch=next)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust?branch=next)",
]
[[package]]
@ -1277,7 +1275,7 @@ dependencies = [
"clap",
"env_logger",
"log",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust.git)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust.git)",
]
[[package]]
@ -1316,7 +1314,7 @@ dependencies = [
"regex",
"serde",
"sky_macros",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust?branch=next)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust?branch=next)",
"tokio",
"tokio-openssl",
"toml",
@ -1331,14 +1329,14 @@ dependencies = [
"crossterm",
"libsky",
"rustyline",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust?branch=next)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust?branch=next)",
"tokio",
]
[[package]]
name = "skytable"
version = "0.7.0-alpha.4"
source = "git+https://github.com/skytable/client-rust?branch=next#feb1dda0c41c866e6e52dac6589fadc20b41af68"
version = "0.7.0"
source = "git+https://github.com/skytable/client-rust?branch=next#8d4ead1bc0e58421fada2c1fd7a9f75ccdc3f30e"
dependencies = [
"async-trait",
"bb8",
@ -1351,8 +1349,8 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.7.0-alpha.4"
source = "git+https://github.com/skytable/client-rust.git#feb1dda0c41c866e6e52dac6589fadc20b41af68"
version = "0.7.0"
source = "git+https://github.com/skytable/client-rust.git#8d4ead1bc0e58421fada2c1fd7a9f75ccdc3f30e"
dependencies = [
"r2d2",
]
@ -1396,7 +1394,7 @@ dependencies = [
"log",
"num_cpus",
"rand",
"skytable 0.7.0-alpha.4 (git+https://github.com/skytable/client-rust?branch=next)",
"skytable 0.7.0 (git+https://github.com/skytable/client-rust?branch=next)",
"sysinfo",
]
@ -1425,9 +1423,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.23.9"
version = "0.23.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3fb8adaa82317f1e8a040281807f411803c9111303cfe129b4abb4a14b2c223"
checksum = "4eea2ed6847da2e0c7289f72cb4f285f0bd704694ca067d32be811b2a45ea858"
dependencies = [
"cfg-if",
"core-foundation-sys",
@ -1550,9 +1548,9 @@ dependencies = [
[[package]]
name = "toml"
version = "0.5.8"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7"
dependencies = [
"serde",
]

@ -274,5 +274,4 @@ fn clear_screen() {
let mut stdout = stdout();
execute!(stdout, Clear(ClearType::All)).expect("Failed to clear screen");
execute!(stdout, cursor::MoveTo(0, 0)).expect("Failed to move cursor to origin");
drop(stdout); // aggressively drop stdout
}

@ -82,7 +82,7 @@ pub fn build(mode: BuildMode) -> HarnessResult<PathBuf> {
let target_folder = util::get_target_folder(mode);
if let Some(t) = util::get_var(util::VAR_TARGET) {
build_args.push("--target".to_owned());
build_args.push(t.to_string());
build_args.push(t);
};
// assemble build args

@ -28,11 +28,11 @@ use crate::{
util::{self},
HarnessError, HarnessResult, ROOT_DIR,
};
use skytable::Connection;
use skytable::{error::Error, Connection, SkyResult};
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use std::{
io::{Error as IoError, ErrorKind},
io::ErrorKind,
path::Path,
process::{Child, Command},
};
@ -92,10 +92,10 @@ pub(super) fn wait_for_server_exit() -> HarnessResult<()> {
Ok(())
}
fn connection_refused<T>(input: Result<T, IoError>) -> HarnessResult<bool> {
fn connection_refused<T>(input: SkyResult<T>) -> HarnessResult<bool> {
match input {
Ok(_) => Ok(false),
Err(e)
Err(Error::IoError(e))
if matches!(
e.kind(),
ErrorKind::ConnectionRefused | ErrorKind::ConnectionReset

@ -51,14 +51,14 @@ macro_rules! conwrite {
($con:expr, $what:expr) => {
$con.write_response($what)
.await
.map_err(|e| crate::actions::ActionError::IoError(e))
.map_err(|e| $crate::actions::ActionError::IoError(e))
};
}
#[macro_export]
macro_rules! aerr {
($con:expr) => {
return conwrite!($con, crate::protocol::responses::groups::ACTION_ERR)
return conwrite!($con, $crate::protocol::responses::groups::ACTION_ERR)
};
}
@ -70,7 +70,7 @@ macro_rules! get_tbl {
($store:expr, $con:expr) => {{
match $store.get_ctable() {
Some(tbl) => tbl,
None => return crate::util::err(crate::protocol::responses::groups::DEFAULT_UNSET),
None => return $crate::util::err($crate::protocol::responses::groups::DEFAULT_UNSET),
}
}};
}
@ -80,7 +80,7 @@ macro_rules! get_tbl_ref {
($store:expr, $con:expr) => {{
match $store.get_ctable_ref() {
Some(tbl) => tbl,
None => return crate::util::err(crate::protocol::responses::groups::DEFAULT_UNSET),
None => return $crate::util::err($crate::protocol::responses::groups::DEFAULT_UNSET),
}
}};
}
@ -88,7 +88,7 @@ macro_rules! get_tbl_ref {
#[macro_export]
macro_rules! handle_entity {
($con:expr, $ident:expr) => {{
match crate::queryengine::parser::Entity::from_slice(&$ident) {
match $crate::queryengine::parser::Entity::from_slice(&$ident) {
Ok(e) => e,
Err(e) => return conwrite!($con, e),
}

@ -27,17 +27,17 @@
use crate::actions::ActionError;
/// Skyhash respstring: already claimed (user was already claimed)
pub const AUTH_ERROR_ALREADYCLAIMED: &[u8] = b"!24\nerr-auth-already-claimed\n";
pub const AUTH_ERROR_ALREADYCLAIMED: &[u8] = b"!err-auth-already-claimed\n";
/// Skyhash respcode(10): bad credentials (either bad creds or invalid user)
pub const AUTH_CODE_BAD_CREDENTIALS: &[u8] = b"!2\n10\n";
pub const AUTH_CODE_BAD_CREDENTIALS: &[u8] = b"!10\n";
/// Skyhash respstring: auth is disabled
pub const AUTH_ERROR_DISABLED: &[u8] = b"!17\nerr-auth-disabled\n";
pub const AUTH_ERROR_DISABLED: &[u8] = b"!err-auth-disabled\n";
/// Skyhash respcode(11): Insufficient permissions (same for anonymous user)
pub const AUTH_CODE_PERMS: &[u8] = b"!2\n11\n";
pub const AUTH_CODE_PERMS: &[u8] = b"!11\n";
/// Skyhash respstring: ID is too long
pub const AUTH_ERROR_ILLEGAL_USERNAME: &[u8] = b"!25\nerr-auth-illegal-username\n";
pub const AUTH_ERROR_ILLEGAL_USERNAME: &[u8] = b"!err-auth-illegal-username\n";
/// Skyhash respstring: ID is protected/in use
pub const AUTH_ERROR_FAILED_TO_DELETE_USER: &[u8] = b"!21\nerr-auth-deluser-fail\n";
pub const AUTH_ERROR_FAILED_TO_DELETE_USER: &[u8] = b"!err-auth-deluser-fail\n";
/// Auth errors
#[derive(PartialEq, Debug)]

@ -24,68 +24,98 @@
*
*/
#![allow(dead_code)] // TODO(@ohsayan): Remove this once we're done
use core::alloc::Layout;
use core::fmt;
use core::mem::ManuallyDrop;
use core::ops::Deref;
use core::ptr;
use core::slice;
use core::{alloc::Layout, fmt, marker::PhantomData, mem::ManuallyDrop, ops::Deref, ptr, slice};
use std::alloc::dealloc;
/// A heap-allocated array
pub struct HeapArray {
ptr: *const u8,
pub struct HeapArray<T> {
ptr: *const T,
len: usize,
_marker: PhantomData<T>,
}
/// An incremental writer used to build a [`HeapArray<T>`].
///
/// The writer pre-allocates capacity for `cap` elements, lets the caller
/// initialize slots one at a time with [`Self::write_to_index`], and finally
/// hands the initialized buffer off as a [`HeapArray`] via [`Self::finish`].
pub struct HeapArrayWriter<T> {
    // backing storage; its `len` tracks how many slots were initialized
    base: Vec<T>,
}
impl<T> HeapArrayWriter<T> {
    /// Create a writer with room for exactly `cap` elements
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            base: Vec::with_capacity(cap),
        }
    }
    /// Write `element` into slot `idx` and advance the initialized length by one.
    ///
    /// NOTE(review): the length is bumped unconditionally, so callers are
    /// expected to fill indices sequentially (0, 1, 2, ...) — TODO confirm.
    ///
    /// ## Safety
    /// Caller must ensure that `idx < cap`. `idx == cap` is already one slot
    /// past the allocation; writing there corrupts memory
    pub unsafe fn write_to_index(&mut self, idx: usize, element: T) {
        // `idx == capacity()` would be an out-of-bounds write, so the bound
        // must be strict, not `<=`
        debug_assert!(idx < self.base.capacity());
        ptr::write(self.base.as_mut_ptr().add(idx), element);
        self.base.set_len(self.base.len() + 1);
    }
    /// ## Safety
    /// This function can lead to memory unsafety in two ways:
    /// - Excess capacity: the resulting [`HeapArray`] only records `len`, so the
    ///   allocation's true size is lost (at best a leak; NOTE(review): the dealloc
    ///   layout may also disagree with the allocation — verify against `Drop`)
    /// - Uninitialized elements: in that case, it will segfault while attempting
    ///   to call `T`'s dtor
    pub unsafe fn finish(self) -> HeapArray<T> {
        // stop the Vec's dtor from freeing the buffer we are handing off
        let base = ManuallyDrop::new(self.base);
        HeapArray::new(base.as_ptr(), base.len())
    }
}
impl HeapArray {
pub fn new(mut v: Vec<u8>) -> Self {
impl<T> HeapArray<T> {
#[cfg(test)]
pub fn new_from_vec(mut v: Vec<T>) -> Self {
v.shrink_to_fit();
let v = ManuallyDrop::new(v);
unsafe { Self::new(v.as_ptr(), v.len()) }
}
pub unsafe fn new(ptr: *const T, len: usize) -> Self {
Self {
ptr: v.as_ptr(),
len: v.len(),
ptr,
len,
_marker: PhantomData,
}
}
pub fn as_slice(&self) -> &[u8] {
pub fn new_writer(cap: usize) -> HeapArrayWriter<T> {
HeapArrayWriter::with_capacity(cap)
}
#[cfg(test)]
pub fn as_slice(&self) -> &[T] {
self
}
}
impl Drop for HeapArray {
impl<T> Drop for HeapArray<T> {
fn drop(&mut self) {
unsafe {
// run dtor
ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.ptr as *mut u8, self.len));
ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.ptr as *mut T, self.len));
// deallocate
if self.len != 0 {
let layout = Layout::array::<u8>(self.len).unwrap();
dealloc(self.ptr as *mut u8, layout);
}
let layout = Layout::array::<T>(self.len).unwrap();
dealloc(self.ptr as *mut T as *mut u8, layout);
}
}
}
// totally fine because `u8`s can be safely shared across threads
unsafe impl Send for HeapArray {}
unsafe impl Sync for HeapArray {}
unsafe impl<T: Send> Send for HeapArray<T> {}
unsafe impl<T: Sync> Sync for HeapArray<T> {}
impl Deref for HeapArray {
type Target = [u8];
impl<T> Deref for HeapArray<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
unsafe { slice::from_raw_parts(self.ptr, self.len) }
}
}
impl fmt::Debug for HeapArray {
impl<T: fmt::Debug> fmt::Debug for HeapArray<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}
impl PartialEq for HeapArray {
impl<T: PartialEq> PartialEq for HeapArray<T> {
fn eq(&self, other: &Self) -> bool {
self == other
}
@ -95,6 +125,6 @@ impl PartialEq for HeapArray {
fn heaparray_impl() {
// basically, this shouldn't segfault
let heap_array = b"notasuperuser".to_vec();
let heap_array = HeapArray::new(heap_array);
let heap_array = HeapArray::new_from_vec(heap_array);
assert_eq!(heap_array.as_slice(), b"notasuperuser");
}

@ -66,8 +66,8 @@ struct ConnectionEntityState {
impl ConnectionEntityState {
fn default(ks: Arc<Keyspace>, tbl: Arc<Table>) -> Self {
Self {
table: Some((DEFAULT.clone(), tbl)),
ks: Some((DEFAULT.clone(), ks)),
table: Some((DEFAULT, tbl)),
ks: Some((DEFAULT, ks)),
}
}
fn set_ks(&mut self, ks: Arc<Keyspace>, ksid: ObjectID) {

@ -62,14 +62,14 @@ use tokio::{
sync::{mpsc, Semaphore},
};
pub const SIMPLE_QUERY_HEADER: [u8; 3] = [b'*', b'1', b'\n'];
pub const SIMPLE_QUERY_HEADER: [u8; 1] = [b'*'];
type QueryWithAdvance = (Query, usize);
pub enum QueryResult {
Q(QueryWithAdvance),
E(&'static [u8]),
Empty,
Wrongtype,
Disconnected,
}
pub struct AuthProviderHandle<'a, T, Strm> {
@ -136,37 +136,9 @@ pub trait ProtocolConnectionExt<Strm>: ProtocolConnection<Strm> + Send
where
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
/// Try to fill the buffer again
fn read_again<'r, 's>(&'r mut self) -> FutureResult<'s, IoResult<()>>
where
'r: 's,
Self: Send + Sync + 's,
{
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
let (buffer, stream) = mv_self.get_mut_both();
match stream.read_buf(buffer).await {
Ok(0) => {
if buffer.is_empty() {
return Ok(());
} else {
return Err(IoError::from(ErrorKind::ConnectionReset));
}
}
Ok(_) => Ok(()),
Err(e) => return Err(e),
}
};
ret
})
}
/// Try to parse a query from the buffered data
fn try_query(&self) -> Result<QueryWithAdvance, ParseError> {
if self.get_buffer().is_empty() {
return Err(ParseError::Empty);
}
protocol::Parser::new(self.get_buffer()).parse()
protocol::Parser::parse(self.get_buffer())
}
/// Read a query from the remote end
///
@ -181,27 +153,30 @@ where
{
Box::pin(async move {
let mv_self = self;
let _: Result<QueryResult, IoError> = {
loop {
mv_self.read_again().await?;
match mv_self.try_query() {
Ok(query_with_advance) => {
return Ok(QueryResult::Q(query_with_advance));
}
Err(ParseError::Empty) => return Ok(QueryResult::Empty),
Err(ParseError::NotEnough) => (),
Err(ParseError::DatatypeParseFailure) => return Ok(QueryResult::Wrongtype),
Err(ParseError::UnexpectedByte) | Err(ParseError::BadPacket) => {
return Ok(QueryResult::E(responses::full_responses::R_PACKET_ERR));
}
Err(ParseError::UnknownDatatype) => {
return Ok(QueryResult::E(
responses::full_responses::R_UNKNOWN_DATA_TYPE,
));
loop {
let (buffer, stream) = mv_self.get_mut_both();
match stream.read_buf(buffer).await {
Ok(0) => {
if buffer.is_empty() {
return Ok(QueryResult::Disconnected);
} else {
return Err(IoError::from(ErrorKind::ConnectionReset));
}
}
Ok(_) => {}
Err(e) => return Err(e),
}
};
match mv_self.try_query() {
Ok(query_with_advance) => {
return Ok(QueryResult::Q(query_with_advance));
}
Err(ParseError::NotEnough) => (),
Err(ParseError::DatatypeParseFailure) => return Ok(QueryResult::Wrongtype),
Err(ParseError::UnexpectedByte) | Err(ParseError::BadPacket) => {
return Ok(QueryResult::E(responses::full_responses::R_PACKET_ERR));
}
}
}
})
}
/// Write a response to the stream
@ -252,7 +227,7 @@ where
{
Box::pin(async move {
let slf = self;
slf.write_response([b'*']).await?;
slf.write_response([b'$']).await?;
slf.get_mut_stream()
.write_all(&Integer64::init(len as u64))
.await?;
@ -478,7 +453,7 @@ where
.close_conn_with_error(responses::groups::WRONGTYPE_ERR.to_owned())
.await?
}
Ok(QueryResult::Empty) => return Ok(()),
Ok(QueryResult::Disconnected) => return Ok(()),
#[cfg(windows)]
Err(e) => match e.kind() {
ErrorKind::ConnectionReset => return Ok(()),
@ -498,11 +473,11 @@ where
let db = &mut self.db;
let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor);
match query {
Query::SimpleQuery(sq) => {
Query::Simple(sq) => {
con.write_simple_query_header().await?;
queryengine::execute_simple_noauth(db, con, &mut auth_provider, sq).await?;
}
Query::PipelineQuery(_) => {
Query::Pipelined(_) => {
con.write_simple_query_header().await?;
con.write_response(auth::errors::AUTH_CODE_BAD_CREDENTIALS)
.await?;
@ -519,11 +494,11 @@ where
let db = &mut self.db;
let mut auth_provider = AuthProviderHandle::new(&mut self.auth, &mut self.executor);
match query {
Query::SimpleQuery(q) => {
Query::Simple(q) => {
con.write_simple_query_header().await?;
queryengine::execute_simple(db, con, &mut auth_provider, q).await?;
}
Query::PipelineQuery(pipeline) => {
Query::Pipelined(pipeline) => {
con.write_pipeline_query_header(pipeline.len()).await?;
queryengine::execute_pipeline(db, con, &mut auth_provider, pipeline).await?;
}

@ -269,7 +269,7 @@ impl KVEListmap {
Ok(self
.data
.get(listname)
.map(|list| list.read().iter().cloned().take(count).collect()))
.map(|list| list.read().iter().take(count).cloned().collect()))
}
pub fn list_cloned_full(&self, listname: &[u8]) -> EncodingResult<Option<Vec<Data>>> {
self.check_key_encoding(listname)?;

@ -1,128 +0,0 @@
/*
* Created on Tue May 11 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::UnsafeSlice;
#[cfg(test)]
use bytes::Bytes;
#[non_exhaustive]
#[derive(Debug, PartialEq)]
/// # Unsafe elements
/// This enum represents the data types as **unsafe** elements, supported by the Skyhash Protocol
///
/// ## Safety
///
/// The instantiator must ensure that the [`UnsafeSlice`]s are valid. See its own safety contracts
/// for more information
pub enum UnsafeElement {
/// Arrays can be nested! Their `<tsymbol>` is `&`
Array(Box<[UnsafeElement]>),
/// A String value; `<tsymbol>` is `+`
String(UnsafeSlice),
/// An unsigned integer value; `<tsymbol>` is `:`
UnsignedInt(u64),
/// A non-recursive String array; tsymbol: `_`
FlatArray(Box<[UnsafeFlatElement]>),
/// A type-less non-recursive array
AnyArray(Box<[UnsafeSlice]>),
}
#[derive(Debug, PartialEq)]
/// An **unsafe** flat element, present in a flat array
pub enum UnsafeFlatElement {
String(UnsafeSlice),
}
impl UnsafeElement {
pub const fn is_any_array(&self) -> bool {
matches!(self, Self::AnyArray(_))
}
}
// test impls are for our tests
#[cfg(test)]
impl UnsafeElement {
pub unsafe fn to_owned_flat_array(inner: &[UnsafeFlatElement]) -> Vec<FlatElement> {
inner
.iter()
.map(|v| match v {
UnsafeFlatElement::String(st) => {
FlatElement::String(Bytes::copy_from_slice(st.as_slice()))
}
})
.collect()
}
pub unsafe fn to_owned_any_array(inner: &[UnsafeSlice]) -> Vec<Bytes> {
inner
.iter()
.map(|v| Bytes::copy_from_slice(v.as_slice()))
.collect()
}
pub unsafe fn to_owned_array(inner: &[Self]) -> Vec<OwnedElement> {
inner
.iter()
.map(|v| match &*v {
UnsafeElement::String(st) => {
OwnedElement::String(Bytes::copy_from_slice(st.as_slice()))
}
UnsafeElement::UnsignedInt(int) => OwnedElement::UnsignedInt(*int),
UnsafeElement::AnyArray(arr) => {
OwnedElement::AnyArray(Self::to_owned_any_array(arr))
}
UnsafeElement::Array(arr) => OwnedElement::Array(Self::to_owned_array(arr)),
UnsafeElement::FlatArray(frr) => {
OwnedElement::FlatArray(Self::to_owned_flat_array(frr))
}
})
.collect()
}
pub unsafe fn as_owned_element(&self) -> OwnedElement {
match self {
Self::AnyArray(arr) => OwnedElement::AnyArray(Self::to_owned_any_array(arr)),
Self::FlatArray(frr) => OwnedElement::FlatArray(Self::to_owned_flat_array(frr)),
Self::Array(arr) => OwnedElement::Array(Self::to_owned_array(arr)),
Self::String(st) => OwnedElement::String(Bytes::copy_from_slice(st.as_slice())),
Self::UnsignedInt(int) => OwnedElement::UnsignedInt(*int),
}
}
}
// owned variants to simplify equality in tests
#[derive(Debug, PartialEq)]
#[cfg(test)]
pub enum OwnedElement {
Array(Vec<OwnedElement>),
String(Bytes),
UnsignedInt(u64),
FlatArray(Vec<FlatElement>),
AnyArray(Vec<Bytes>),
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub enum FlatElement {
String(Bytes),
}

@ -24,14 +24,9 @@
*
*/
#[cfg(test)]
use super::UnsafeElement;
use super::UnsafeSlice;
use bytes::Bytes;
use core::hint::unreachable_unchecked;
use core::iter::FusedIterator;
use core::ops::Deref;
use core::slice::Iter;
use core::{hint::unreachable_unchecked, iter::FusedIterator, ops::Deref, slice::Iter};
/// An iterator over an [`AnyArray`] (an [`UnsafeSlice`]). The validity of the iterator is
/// left to the caller who has to guarantee:
@ -183,20 +178,12 @@ impl<'a> FusedIterator for BorrowedAnyArrayIter<'a> {}
#[test]
fn test_iter() {
use super::{Parser, Query};
let (q, _fwby) = Parser::new(b"*1\n~3\n3\nset\n1\nx\n3\n100\n")
.parse()
.unwrap();
let (q, _fwby) = Parser::parse(b"*3\n3\nset1\nx3\n100").unwrap();
let r = match q {
Query::SimpleQuery(q) => q,
Query::Simple(q) => q,
_ => panic!("Wrong query"),
};
let arr = unsafe {
match r.into_inner() {
UnsafeElement::AnyArray(arr) => arr,
_ => panic!("Wrong type"),
}
};
let it = arr.iter();
let it = r.as_slice().iter();
let mut iter = unsafe { AnyArrayIter::new(it) };
assert_eq!(iter.next_uppercase().unwrap().as_ref(), "SET".as_bytes());
assert_eq!(iter.next().unwrap(), "x".as_bytes());

@ -1,5 +1,5 @@
/*
* Created on Mon May 10 2021
* Created on Tue Apr 12 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,82 +24,21 @@
*
*/
//! # The Skyhash Protocol
//!
//! ## Introduction
//! The Skyhash Protocol is a serialization protocol that is used by Skytable for client/server communication.
//! It works in a query/response action similar to HTTP's request/response action. Skyhash supersedes the Terrapipe
//! protocol as a more simple, reliable, robust and scalable protocol.
//!
//! This module contains the [`Parser`] for the Skyhash protocol and it's enough to just pass a query packet as
//! a slice of unsigned 8-bit integers and the parser will do everything else. The Skyhash protocol was designed
//! and implemented by the Author (Sayan Nandan)
//!
// modules
pub mod element;
pub mod iter;
pub mod responses;
bench! {
mod benches;
}
use crate::corestore::heap_array::HeapArray;
use core::{fmt, marker::PhantomData, mem::transmute, slice};
#[cfg(feature = "nightly")]
mod benches;
#[cfg(test)]
mod tests;
// endof modules
// test imports
#[cfg(test)]
use self::element::OwnedElement;
// endof test imports
use self::element::{UnsafeElement, UnsafeFlatElement};
use crate::util::Unwrappable;
use core::fmt;
use core::hint::unreachable_unchecked;
use core::ops;
use core::slice;
// pub mods
pub mod iter;
pub mod responses;
// endof pub mods
/// The Skyhash protocol version
pub const PROTOCOL_VERSION: f32 = 1.2;
pub const PROTOCOL_VERSION: f32 = 2.0;
/// The Skyhash protocol version string (Skyhash-x.y)
pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-1.1";
const ASCII_UNDERSCORE: u8 = b'_';
const ASCII_AMPERSAND: u8 = b'&';
const ASCII_COLON: u8 = b':';
const ASCII_PLUS_SIGN: u8 = b'+';
const ASCII_TILDE_SIGN: u8 = b'~';
#[derive(Debug)]
/// # Skyhash Deserializer (Parser)
///
/// The [`Parser`] object can be used to deserialize a packet serialized by Skyhash, turning
/// it into data structures native to the Rust Language (and some Compound Types built on top of them).
///
/// ## Safety
///
/// The results returned by the parser are not bound by any lifetime and instead return raw
/// pointers to parts of the source buffer. This means that the caller must ensure that the
/// source buffer remains valid for as long as the result is used.
///
/// ## Evaluation
///
/// The parser is pessimistic in most cases and will readily throw out any errors. On non-recursive types
/// there is no recursion, but the parser will use implicit recursion for nested arrays. The parser will
/// happily not report any errors if some part of the next query was passed. This is very much a possibility
/// and so has been accounted for
///
/// ## Important note
///
/// All developers willing to modify the deserializer must keep this in mind: the cursor is always Ahead-Of-Position
/// that is the cursor should always point at the next character that can be read.
///
pub struct Parser<'a> {
/// the cursor ptr
cursor: *const u8,
/// the data end ptr
data_end_ptr: *const u8,
/// the buffer
buffer: &'a [u8],
}
pub const PROTOCOL_VERSIONSTRING: &str = "Skyhash-2.0";
#[derive(PartialEq)]
/// As its name says, an [`UnsafeSlice`] is a terribly unsafe slice. Its guarantees are
@ -136,545 +75,347 @@ impl UnsafeSlice {
pub unsafe fn as_slice(&self) -> &[u8] {
slice::from_raw_parts(self.start_ptr, self.len)
}
/// Destruct self, and return a slice, lifetime bound by whoever calls it
pub unsafe fn into_slice<'a>(self) -> &'a [u8] {
slice::from_raw_parts(self.start_ptr, self.len)
}
#[cfg(test)]
pub unsafe fn to_owned(&self) -> Vec<u8> {
self.as_slice().to_owned()
}
/// Check if a certain idx has a certain byte. This can be _thought of as something
/// like_:
/// ```notest
/// *(slice.get_unchecked(pos)).eq(pos)
/// ```
pub unsafe fn unsafe_eq(&self, byte: u8, pos: usize) -> bool {
*self.start_ptr.add(pos) == byte
}
/// Turns self into a slice, lifetime bound by caller's lifetime with the provided `chop`.
/// This is roughly equivalent to:
/// ```notest
/// &slice[..slice.len() - chop]
/// ```
pub unsafe fn into_slice_with_start_and_end<'a>(self, len: usize, chop: usize) -> &'a [u8] {
debug_assert!(len <= self.len);
slice::from_raw_parts(self.start_ptr.add(len), self.len - chop)
}
}
#[derive(Debug, PartialEq)]
#[repr(u8)]
/// # Parser Errors
///
/// Several errors can arise during parsing and this enum accounts for them
pub enum ParseError {
/// Didn't get the number of expected bytes
NotEnough,
/// The query contains an unexpected byte
UnexpectedByte,
NotEnough = 0u8,
/// The packet simply contains invalid data
///
/// This is rarely returned and only in the special cases where a bad client sends `0` as
/// the query count
BadPacket,
#[allow(dead_code)] // HACK(@ohsayan): rustc can't "guess" the transmutation
BadPacket = 1u8,
/// The query contains an unexpected byte
UnexpectedByte = 2u8,
/// A data type was given but the parser failed to serialize it into this type
///
/// This can happen not just for elements but can also happen for their sizes ([`Self::parse_into_u64`])
DatatypeParseFailure,
/// A data type that the server doesn't know was passed into the query
///
/// This is a frequent problem that can arise between different server editions as more data types
/// can be added with changing server versions
UnknownDatatype,
/// The query is empty
///
/// The **parser will never return this**, but instead it is provided for convenience with [`dbnet`]
Empty,
DatatypeParseFailure = 3u8,
}
#[derive(Debug, PartialEq)]
/// A simple query object. This object is **not bound to any lifetime!** That's
/// why, merely _having_ it is not unsafe, but all methods on it are unsafe and
/// the caller has to uphold the guarantee of keeping the source buffer's pointers
/// valid
///
/// ## Safety Contracts
///
/// - The provided `UnsafeElement` is valid and generated _legally_
pub struct SimpleQuery {
/// the inner unsafe element
inner: UnsafeElement,
}
/// A generic result to indicate parsing errors through the [`ParseError`] enum
pub type ParseResult<T> = Result<T, ParseError>;
impl SimpleQuery {
/// Create a new `SimpleQuery`
///
/// ## Safety
///
/// This is unsafe because the caller must guarantee the sanctity of
/// the provided element
const unsafe fn new(inner: UnsafeElement) -> SimpleQuery {
Self { inner }
}
/// Decomposes self into an [`UnsafeElement`]
///
/// ## Safety
///
/// Caller must ensure that the UnsafeElement's pointers are still valid
pub unsafe fn into_inner(self) -> UnsafeElement {
self.inner
}
pub const fn is_any_array(&self) -> bool {
matches!(self.inner, UnsafeElement::AnyArray(_))
}
#[derive(Debug)]
/// A parsed Skyhash query: either a single simple query or a pipeline
/// (batch) of several queries sent in one packet
pub enum Query {
/// a single query
Simple(SimpleQuery),
/// a batch of queries
Pipelined(PipelinedQuery),
}
#[derive(Debug, PartialEq)]
/// A pipelined query object. This is bound to an _anonymous lifetime_ which is to be bound by
/// the instantiator
///
/// ## Safety Contracts
///
/// - The provided `UnsafeElement` is valid and generated _legally_
/// - The source pointers for the `UnsafeElement` is valid
pub struct PipelineQuery {
inner: Box<[UnsafeElement]>,
#[derive(Debug)]
/// A simple (non-pipelined) query: a flat array of elements. The elements
/// are [`UnsafeSlice`]s, i.e. raw pointers into the source buffer, so the
/// source buffer must outlive this object (see [`UnsafeSlice`]'s contract)
pub struct SimpleQuery {
/// the query's elements
data: HeapArray<UnsafeSlice>,
}
impl PipelineQuery {
/// Create a new `PipelineQuery`
///
/// ## Safety
///
/// The caller has the responsibility to uphold the guarantee of keeping the source
/// pointers valid
const unsafe fn new(inner: Box<[UnsafeElement]>) -> PipelineQuery {
Self { inner }
impl SimpleQuery {
#[cfg(test)]
fn into_owned(self) -> OwnedSimpleQuery {
OwnedSimpleQuery {
data: self
.data
.iter()
.map(|v| unsafe { v.as_slice().to_owned() })
.collect(),
}
}
pub const fn len(&self) -> usize {
self.inner.len()
pub fn as_slice(&self) -> &[UnsafeSlice] {
&self.data
}
}
impl ops::Deref for PipelineQuery {
type Target = [UnsafeElement];
fn deref(&self) -> &Self::Target {
&self.inner
}
#[cfg(test)]
/// Test-only owned counterpart of [`SimpleQuery`]: every element is copied
/// into its own `Vec<u8>`, so comparisons don't depend on raw pointers
struct OwnedSimpleQuery {
/// the copied query elements
data: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq)]
/// # Queries
///
/// This enum has two variants: Simple and Pipelined. Both hold two **terribly `unsafe`**
/// objects:
/// - A [`SimpleQuery`] or
/// - A [`PipelineQuery`]
///
/// Again, **both objects hold raw pointers and guarantee nothing about the source's
/// validity**.
/// ## Safety
///
/// This object holds zero ownership on the actual data, but only holds a collection
/// of pointers. This means that you have to **ensure that the source outlives the Query**
/// object. Using it otherwise is very **`unsafe`**! The caller who creates the instance or the
/// one who uses it is responsible for ensuring any validity guarantee
///
pub enum Query {
/// A simple query will just hold one element
SimpleQuery(SimpleQuery),
/// A pipelined/batch query will hold multiple elements
PipelineQuery(PipelineQuery),
#[derive(Debug)]
pub struct PipelinedQuery {
data: HeapArray<HeapArray<UnsafeSlice>>,
}
impl Query {
impl PipelinedQuery {
pub fn len(&self) -> usize {
self.data.len()
}
pub fn into_inner(self) -> HeapArray<HeapArray<UnsafeSlice>> {
self.data
}
#[cfg(test)]
/// Turns self into an onwed query (see [`OwnedQuery`])
///
/// ## Safety contracts
///
/// - The query itself is valid
/// - The query is correctly bound to the lifetime
unsafe fn into_owned_query(self) -> OwnedQuery {
match self {
Self::SimpleQuery(SimpleQuery { inner, .. }) => {
OwnedQuery::SimpleQuery(inner.as_owned_element())
}
Self::PipelineQuery(PipelineQuery { inner, .. }) => {
OwnedQuery::PipelineQuery(inner.iter().map(|v| v.as_owned_element()).collect())
}
fn into_owned(self) -> OwnedPipelinedQuery {
OwnedPipelinedQuery {
data: self
.data
.iter()
.map(|v| {
v.iter()
.map(|v| unsafe { v.as_slice().to_owned() })
.collect()
})
.collect(),
}
}
}
#[derive(Debug, PartialEq)]
#[cfg(test)]
/// An owned query with direct ownership of the data rather than through
/// pointers
enum OwnedQuery {
SimpleQuery(OwnedElement),
PipelineQuery(Vec<OwnedElement>),
struct OwnedPipelinedQuery {
data: Vec<Vec<Vec<u8>>>,
}
/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum
pub type ParseResult<T> = Result<T, ParseError>;
/// A parser for Skyhash 2.0
pub struct Parser<'a> {
end: *const u8,
cursor: *const u8,
_lt: PhantomData<&'a ()>,
}
impl<'a> Parser<'a> {
/// Create a new parser instance, bound to the lifetime of the source buffer
pub fn new(buffer: &'a [u8]) -> Self {
/// Initialize a new parser
pub fn new(slice: &[u8]) -> Self {
unsafe {
let cursor = buffer.as_ptr();
let data_end_ptr = cursor.add(buffer.len());
Self {
cursor,
data_end_ptr,
buffer,
end: slice.as_ptr().add(slice.len()),
cursor: slice.as_ptr(),
_lt: PhantomData,
}
}
}
/// Returns what we have consumed
/// ```text
/// [*****************************]
/// ^ ^ ^
/// start cursor data end ptr
/// ```
fn consumed(&self) -> usize {
unsafe { self.cursor.offset_from(self.buffer.as_ptr()) as usize }
}
/// Returns what we have left
/// ```text
/// [*****************************]
/// ^ ^
/// cursor data end ptr
/// ```
fn remaining(&self) -> usize {
unsafe { self.data_end_ptr().offset_from(self.cursor) as usize }
}
fn exhausted(&self) -> bool {
self.cursor >= self.data_end_ptr()
}
/// Returns a ptr to one byte past the last non-null ptr
}
// basic methods
impl<'a> Parser<'a> {
/// Returns a ptr one byte past the allocation of the buffer
const fn data_end_ptr(&self) -> *const u8 {
self.data_end_ptr
self.end
}
/// Move the cursor ptr ahead by the provided amount
fn incr_cursor_by(&mut self, by: usize) {
unsafe { self.cursor = self.cursor.add(by) }
/// Returns the position of the cursor
/// WARNING: Deref might led to a segfault
const fn cursor_ptr(&self) -> *const u8 {
self.cursor
}
/// Increment the cursor by 1
fn incr_cursor(&mut self) {
self.incr_cursor_by(1)
/// Check how many bytes we have left
fn remaining(&self) -> usize {
self.data_end_ptr() as usize - self.cursor_ptr() as usize
}
/// Read `until` bytes from source
fn read_until(&mut self, until: usize) -> ParseResult<UnsafeSlice> {
if self.remaining() < until {
Err(ParseError::NotEnough)
} else {
let start_ptr = self.cursor;
self.incr_cursor_by(until);
unsafe { Ok(UnsafeSlice::new(start_ptr, until)) }
}
/// Check if we have `size` bytes remaining
fn has_remaining(&self, size: usize) -> bool {
self.remaining() >= size
}
/// Read a line. This will place the cursor at a point just ahead of
/// the LF
fn read_line(&mut self) -> ParseResult<UnsafeSlice> {
if self.exhausted() {
Err(ParseError::NotEnough)
} else {
let start_ptr = self.cursor;
let end_ptr = self.data_end_ptr();
let mut len = 0usize;
unsafe {
while end_ptr > self.cursor && *self.cursor != b'\n' {
len += 1;
self.incr_cursor();
}
if self.will_cursor_give_linefeed()? {
let ret = Ok(UnsafeSlice::new(start_ptr, len));
self.incr_cursor();
ret
} else {
Err(ParseError::BadPacket)
}
}
}
#[cfg(test)]
/// Check if we have exhausted the buffer
fn exhausted(&self) -> bool {
self.cursor_ptr() >= self.data_end_ptr()
}
}
impl<'a> Parser<'a> {
/// Returns true if the cursor will give a char, but if `this_if_nothing_ahead` is set
/// to true, then if no byte is ahead, it will still return true
fn will_cursor_give_char(&self, ch: u8, this_if_nothing_ahead: bool) -> ParseResult<bool> {
if self.exhausted() {
// nothing left
if this_if_nothing_ahead {
Ok(true)
} else {
Err(ParseError::NotEnough)
}
} else if unsafe { (*self.cursor).eq(&ch) } {
Ok(true)
} else {
Ok(false)
}
/// Check if the buffer is not exhausted
fn not_exhausted(&self) -> bool {
self.cursor_ptr() < self.data_end_ptr()
}
/// Check if the current cursor will give an LF
fn will_cursor_give_linefeed(&self) -> ParseResult<bool> {
self.will_cursor_give_char(b'\n', false)
/// Attempts to return the byte pointed at by the cursor.
/// WARNING: The same segfault warning
const unsafe fn get_byte_at_cursor(&self) -> u8 {
*self.cursor_ptr()
}
}
// mutable refs
impl<'a> Parser<'a> {
/// Parse a stream of bytes into [`usize`]
fn parse_into_usize(bytes: &[u8]) -> ParseResult<usize> {
if bytes.is_empty() {
return Err(ParseError::NotEnough);
}
let byte_iter = bytes.iter();
let mut item_usize = 0usize;
for dig in byte_iter {
if !dig.is_ascii_digit() {
// dig has to be an ASCII digit
return Err(ParseError::DatatypeParseFailure);
}
// 48 is the ASCII code for 0, and 57 is the ascii code for 9
// so if 0 is given, the subtraction should give 0; similarly
// if 9 is given, the subtraction should give us 9!
let curdig: usize = unsafe {
// UNSAFE(@ohsayan): We already know that dig is an ASCII digit
dig.checked_sub(48).unsafe_unwrap()
}
.into();
// The usize can overflow; check that case
let product = match item_usize.checked_mul(10) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
let sum = match product.checked_add(curdig) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
item_usize = sum;
}
Ok(item_usize)
/// Increment the cursor by `by` positions
unsafe fn incr_cursor_by(&mut self, by: usize) {
self.cursor = self.cursor.add(by);
}
/// Pasre a stream of bytes into an [`u64`]
fn parse_into_u64(bytes: &[u8]) -> ParseResult<u64> {
if bytes.is_empty() {
return Err(ParseError::NotEnough);
}
let byte_iter = bytes.iter();
let mut item_u64 = 0u64;
for dig in byte_iter {
if !dig.is_ascii_digit() {
// dig has to be an ASCII digit
return Err(ParseError::DatatypeParseFailure);
}
// 48 is the ASCII code for 0, and 57 is the ascii code for 9
// so if 0 is given, the subtraction should give 0; similarly
// if 9 is given, the subtraction should give us 9!
let curdig: u64 = unsafe {
// UNSAFE(@ohsayan): We already know that dig is an ASCII digit
dig.checked_sub(48).unsafe_unwrap()
}
.into();
// Now the entire u64 can overflow, so let's attempt to check it
let product = match item_u64.checked_mul(10) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
let sum = match product.checked_add(curdig) {
Some(not_overflowed) => not_overflowed,
None => return Err(ParseError::DatatypeParseFailure),
};
item_u64 = sum;
}
Ok(item_u64)
/// Increment the position of the cursor by one position
unsafe fn incr_cursor(&mut self) {
self.incr_cursor_by(1);
}
}
// higher level abstractions
impl<'a> Parser<'a> {
/// Parse the metaframe to get the number of queries, i.e the datagroup
/// count
fn parse_metaframe_get_datagroup_count(&mut self) -> ParseResult<usize> {
if self.buffer.len() < 3 {
// the smallest query we can have is: *1\n or 3 chars
Err(ParseError::NotEnough)
} else {
/// Attempt to read `len` bytes
fn read_until(&mut self, len: usize) -> ParseResult<UnsafeSlice> {
if self.has_remaining(len) {
unsafe {
let our_chunk = self.read_line()?;
if our_chunk.unsafe_eq(b'*', 0) {
Ok(Self::parse_into_usize(
our_chunk.into_slice_with_start_and_end(1, 1),
)?)
} else {
Err(ParseError::UnexpectedByte)
}
// UNSAFE(@ohsayan): Already verified lengths
let slice = UnsafeSlice::new(self.cursor_ptr(), len);
self.incr_cursor_by(len);
Ok(slice)
}
} else {
Err(ParseError::NotEnough)
}
}
}
impl<'a> Parser<'a> {
/// Gets the _next element. **The cursor should be at the tsymbol (passed)**
fn _next(&mut self) -> ParseResult<UnsafeSlice> {
let sizeline = self.read_line()?;
#[cfg(test)]
/// Attempt to read a byte slice terminated by an LF
fn read_line(&mut self) -> ParseResult<UnsafeSlice> {
let start_ptr = self.cursor_ptr();
unsafe {
let element_size = Self::parse_into_usize(sizeline.into_slice())?;
self.read_until(element_size)
}
}
/// Gets the next string (`+`). **The cursor should be at the tsymbol (passed)**
fn parse_next_string(&mut self) -> ParseResult<UnsafeSlice> {
{
let chunk = self._next()?;
let haslf = self.will_cursor_give_linefeed()?;
if haslf {
while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
self.incr_cursor();
Ok(chunk)
}
if self.not_exhausted() && self.get_byte_at_cursor() == b'\n' {
let len = self.cursor_ptr() as usize - start_ptr as usize;
self.incr_cursor(); // skip LF
Ok(UnsafeSlice::new(start_ptr, len))
} else {
Err(ParseError::NotEnough)
}
}
}
/// Gets the next `u64`. **The cursor should be at the tsymbol (passed)**
fn parse_next_u64(&mut self) -> ParseResult<u64> {
let chunk = self._next()?;
/// Attempt to read a line, **rejecting an empty payload**
fn read_line_pedantic(&mut self) -> ParseResult<UnsafeSlice> {
let start_ptr = self.cursor_ptr();
unsafe {
let ret = Self::parse_into_u64(chunk.into_slice())?;
if self.will_cursor_give_linefeed()? {
while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
self.incr_cursor();
Ok(ret)
}
let len = self.cursor_ptr() as usize - start_ptr as usize;
let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n';
if has_lf && len != 0 {
self.incr_cursor(); // skip LF
Ok(UnsafeSlice::new(start_ptr, len))
} else {
Err(ParseError::NotEnough)
// just some silly hackery
Err(transmute(has_lf))
}
}
}
/// Gets the next element. **The cursor should be at the tsymbol (_not_ passed)**
fn parse_next_element(&mut self) -> ParseResult<UnsafeElement> {
if self.exhausted() {
Err(ParseError::NotEnough)
} else {
unsafe {
let tsymbol = *self.cursor;
// got tsymbol, now incr
self.incr_cursor();
let ret = match tsymbol {
ASCII_PLUS_SIGN => UnsafeElement::String(self.parse_next_string()?),
ASCII_COLON => UnsafeElement::UnsignedInt(self.parse_next_u64()?),
ASCII_AMPERSAND => UnsafeElement::Array(self.parse_next_array()?),
ASCII_TILDE_SIGN => UnsafeElement::AnyArray(self.parse_next_any_array()?),
ASCII_UNDERSCORE => UnsafeElement::FlatArray(self.parse_next_flat_array()?),
_ => {
return Err(ParseError::UnknownDatatype);
}
/// Attempt to read an `usize` from the buffer
fn read_usize(&mut self) -> ParseResult<usize> {
let line = self.read_line_pedantic()?;
let bytes = unsafe {
// UNSAFE(@ohsayan): We just extracted the slice
line.as_slice()
};
let mut ret = 0usize;
for byte in bytes {
if byte.is_ascii_digit() {
ret = match ret.checked_mul(10) {
Some(r) => r,
None => return Err(ParseError::DatatypeParseFailure),
};
ret = match ret.checked_add((byte & 0x0F) as _) {
Some(r) => r,
None => return Err(ParseError::DatatypeParseFailure),
};
Ok(ret)
}
}
}
/// Parse the next blob. **The cursor should be at the tsymbol (passed)**
fn parse_next_blob(&mut self) -> ParseResult<UnsafeSlice> {
{
let chunk = self._next()?;
if self.will_cursor_give_linefeed()? {
self.incr_cursor();
Ok(chunk)
} else {
Err(ParseError::UnexpectedByte)
return Err(ParseError::DatatypeParseFailure);
}
}
Ok(ret)
}
/// Parse the next `AnyArray`. **The cursor should be at the tsymbol (passed)**
fn parse_next_any_array(&mut self) -> ParseResult<Box<[UnsafeSlice]>> {
}
// query impls
impl<'a> Parser<'a> {
/// Parse the next simple query. This should have passed the `*` tsymbol
///
/// Simple query structure (tokenized line-by-line):
/// ```text
/// * -> Simple Query Header
/// <n>\n -> Count of elements in the simple query
/// <l0>\n -> Length of element 1
/// <e0> -> element 1 itself
/// <l1>\n -> Length of element 2
/// <e1> -> element 2 itself
/// ...
/// ```
fn _next_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
let element_count = self.read_usize()?;
unsafe {
let size_line = self.read_line()?;
let size = Self::parse_into_usize(size_line.into_slice())?;
let mut array = Vec::with_capacity(size);
for _ in 0..size {
array.push(self.parse_next_blob()?);
let mut data = HeapArray::new_writer(element_count);
for i in 0..element_count {
let element_size = self.read_usize()?;
let element = self.read_until(element_size)?;
data.write_to_index(i, element);
}
Ok(array.into_boxed_slice())
Ok(data.finish())
}
}
/// The cursor should have passed the tsymbol
fn parse_next_flat_array(&mut self) -> ParseResult<Box<[UnsafeFlatElement]>> {
unsafe {
let flat_array_sizeline = self.read_line()?;
let array_size = Self::parse_into_usize(flat_array_sizeline.into_slice())?;
let mut array = Vec::with_capacity(array_size);
for _ in 0..array_size {
if self.exhausted() {
return Err(ParseError::NotEnough);
} else {
let tsymb = *self.cursor;
// good, there is a tsymbol; move the cursor ahead
self.incr_cursor();
let ret = match tsymb {
b'+' => self.parse_next_string().map(UnsafeFlatElement::String)?,
_ => return Err(ParseError::UnknownDatatype),
};
array.push(ret);
}
}
Ok(array.into_boxed_slice())
}
/// Parse a simple query
fn next_simple_query(&mut self) -> ParseResult<SimpleQuery> {
Ok(SimpleQuery {
data: self._next_simple_query()?,
})
}
/// Parse the next array. **The cursor should be at the tsymbol (passed)**
fn parse_next_array(&mut self) -> ParseResult<Box<[UnsafeElement]>> {
/// Parse a pipelined query. This should have passed the `$` tsymbol
///
/// Pipelined query structure (tokenized line-by-line):
/// ```text
/// $ -> Pipeline
/// <n>\n -> Pipeline has n queries
/// <lq0>\n -> Query 1 has 3 elements
/// <lq0e0>\n -> Q1E1 has 3 bytes
/// <q0e0> -> Q1E1 itself
/// <lq0e1>\n -> Q1E2 has 1 byte
/// <q0e1> -> Q1E2 itself
/// <lq0e2>\n -> Q1E3 has 3 bytes
/// <q0e2> -> Q1E3 itself
/// <lq1>\n -> Query 2 has 2 elements
/// <lq1e0>\n -> Q2E1 has 3 bytes
/// <q1e0> -> Q2E1 itself
/// <lq1e1>\n -> Q2E2 has 1 byte
/// <q1e1> -> Q2E2 itself
/// ...
/// ```
///
/// Example:
/// ```text
/// $ -> Pipeline
/// 2\n -> Pipeline has 2 queries
/// 3\n -> Query 1 has 3 elements
/// 3\n -> Q1E1 has 3 bytes
/// SET -> Q1E1 itself
/// 1\n -> Q1E2 has 1 byte
/// x -> Q1E2 itself
/// 3\n -> Q1E3 has 3 bytes
/// 100 -> Q1E3 itself
/// 2\n -> Query 2 has 2 elements
/// 3\n -> Q2E1 has 3 bytes
/// GET -> Q2E1 itself
/// 1\n -> Q2E2 has 1 byte
/// x -> Q2E2 itself
/// ```
fn next_pipeline(&mut self) -> ParseResult<PipelinedQuery> {
let query_count = self.read_usize()?;
unsafe {
let size_of_array_chunk = self.read_line()?;
let size_of_array = Self::parse_into_usize(size_of_array_chunk.into_slice())?;
let mut array = Vec::with_capacity(size_of_array);
for _ in 0..size_of_array {
array.push(self.parse_next_element()?);
let mut queries = HeapArray::new_writer(query_count);
for i in 0..query_count {
let sq = self._next_simple_query()?;
queries.write_to_index(i, sq);
}
Ok(array.into_boxed_slice())
Ok(PipelinedQuery {
data: queries.finish(),
})
}
}
}
impl<'a> Parser<'a> {
/// Try to parse the provided buffer (lt: 'a) into a Query (lt: 'a). The
/// parser itself can (or must) go out of scope, but the buffer can't!
pub fn parse(&mut self) -> Result<(Query, usize), ParseError> {
let number_of_queries = self.parse_metaframe_get_datagroup_count()?;
if number_of_queries == 0 {
return Err(ParseError::BadPacket);
}
if number_of_queries == 1 {
let query = self.parse_next_element()?;
if unsafe {
self.will_cursor_give_char(b'*', true)
.unwrap_or_else(|_| unreachable_unchecked())
} {
unsafe {
// SAFETY: Contract upheld
Ok((Query::SimpleQuery(SimpleQuery::new(query)), self.consumed()))
}
} else {
Err(ParseError::UnexpectedByte)
fn _parse(&mut self) -> ParseResult<Query> {
if self.not_exhausted() {
unsafe {
let first_byte = self.get_byte_at_cursor();
self.incr_cursor();
let data = match first_byte {
b'*' => {
// a simple query
Query::Simple(self.next_simple_query()?)
}
b'$' => {
// a pipelined query
Query::Pipelined(self.next_pipeline()?)
}
_ => return Err(ParseError::UnexpectedByte),
};
Ok(data)
}
} else {
// pipelined query
let mut queries = Vec::with_capacity(number_of_queries);
for _ in 0..number_of_queries {
queries.push(self.parse_next_element()?);
}
if unsafe {
self.will_cursor_give_char(b'*', true)
.unwrap_or_else(|_| unreachable_unchecked())
} {
unsafe {
// SAFETY: Contract upheld
Ok((
Query::PipelineQuery(PipelineQuery::new(queries.into_boxed_slice())),
self.consumed(),
))
}
} else {
Err(ParseError::UnexpectedByte)
}
Err(ParseError::NotEnough)
}
}
pub fn parse(buf: &[u8]) -> ParseResult<(Query, usize)> {
let mut slf = Self::new(buf);
let body = slf._parse()?;
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
Ok((body, consumed))
}
}

@ -47,7 +47,7 @@ pub mod groups {
/// Response code 6 as a array element
pub const OTHER_ERR_EMPTY: &[u8] = eresp!("6");
/// Response group element with string "HEYA"
pub const HEYA: &[u8] = "+4\nHEY!\n".as_bytes();
pub const HEYA: &[u8] = "+4\nHEY!".as_bytes();
/// "Unknown action" error response
pub const UNKNOWN_ACTION: &[u8] = eresp!("Unknown action");
/// Response code 7
@ -117,37 +117,37 @@ pub mod full_responses {
//! be written off directly to the stream and should **not be preceded by any response metaframe**
/// Response code: 0 (Okay)
pub const R_OKAY: &[u8] = "*1\n!1\n0\n".as_bytes();
pub const R_OKAY: &[u8] = "*!1\n0\n".as_bytes();
/// Response code: 1 (Nil)
pub const R_NIL: &[u8] = "*1\n!1\n1\n".as_bytes();
pub const R_NIL: &[u8] = "*!1\n1\n".as_bytes();
/// Response code: 2 (Overwrite Error)
pub const R_OVERWRITE_ERR: &[u8] = "*1\n!1\n2\n".as_bytes();
pub const R_OVERWRITE_ERR: &[u8] = "*!1\n2\n".as_bytes();
/// Response code: 3 (Action Error)
pub const R_ACTION_ERR: &[u8] = "*1\n!1\n3\n".as_bytes();
pub const R_ACTION_ERR: &[u8] = "*!1\n3\n".as_bytes();
/// Response code: 4 (Packet Error)
pub const R_PACKET_ERR: &[u8] = "*1\n!1\n4\n".as_bytes();
pub const R_PACKET_ERR: &[u8] = "*!1\n4\n".as_bytes();
/// Response code: 5 (Server Error)
pub const R_SERVER_ERR: &[u8] = "*1\n!1\n5\n".as_bytes();
pub const R_SERVER_ERR: &[u8] = "*!1\n5\n".as_bytes();
/// Response code: 6 (Other Error _without description_)
pub const R_OTHER_ERR_EMPTY: &[u8] = "*1\n!1\n6\n".as_bytes();
pub const R_OTHER_ERR_EMPTY: &[u8] = "*!1\n6\n".as_bytes();
/// Response code: 7; wrongtype
pub const R_WRONGTYPE_ERR: &[u8] = "*1\n!1\n7".as_bytes();
pub const R_WRONGTYPE_ERR: &[u8] = "*!1\n7".as_bytes();
/// Response code: 8; unknown data type
pub const R_UNKNOWN_DATA_TYPE: &[u8] = "*1\n!1\n8\n".as_bytes();
pub const R_UNKNOWN_DATA_TYPE: &[u8] = "*!1\n8\n".as_bytes();
/// A heya response
pub const R_HEYA: &[u8] = "*1\n+4\nHEY!\n".as_bytes();
pub const R_HEYA: &[u8] = "*+4\nHEY!\n".as_bytes();
/// An other response with description: "Unknown action"
pub const R_UNKNOWN_ACTION: &[u8] = "*1\n!14\nUnknown action\n".as_bytes();
pub const R_UNKNOWN_ACTION: &[u8] = "*!14\nUnknown action\n".as_bytes();
/// A 0 uint64 reply
pub const R_ONE_INT_REPLY: &[u8] = "*1\n:1\n1\n".as_bytes();
pub const R_ONE_INT_REPLY: &[u8] = "*:1\n1\n".as_bytes();
/// A 1 uint64 reply
pub const R_ZERO_INT_REPLY: &[u8] = "*1\n:1\n0\n".as_bytes();
pub const R_ZERO_INT_REPLY: &[u8] = "*:1\n0\n".as_bytes();
/// Snapshot busy (other error)
pub const R_SNAPSHOT_BUSY: &[u8] = "*1\n!17\nerr-snapshot-busy\n".as_bytes();
pub const R_SNAPSHOT_BUSY: &[u8] = "*!17\nerr-snapshot-busy\n".as_bytes();
/// Snapshot disabled (other error)
pub const R_SNAPSHOT_DISABLED: &[u8] = "*1\n!21\nerr-snapshot-disabled\n".as_bytes();
pub const R_SNAPSHOT_DISABLED: &[u8] = "*!21\nerr-snapshot-disabled\n".as_bytes();
/// Snapshot has illegal name (other error)
pub const R_SNAPSHOT_ILLEGAL_NAME: &[u8] = "*1\n!25\nerr-invalid-snapshot-name\n".as_bytes();
pub const R_SNAPSHOT_ILLEGAL_NAME: &[u8] = "*!25\nerr-invalid-snapshot-name\n".as_bytes();
/// Access after termination signal (other error)
pub const R_ERR_ACCESS_AFTER_TERMSIG: &[u8] = "*1\n!24\nerr-access-after-termsig\n".as_bytes();
pub const R_ERR_ACCESS_AFTER_TERMSIG: &[u8] = "*!24\nerr-access-after-termsig\n".as_bytes();
}

File diff suppressed because it is too large Load Diff

@ -30,12 +30,9 @@ use crate::actions::{ActionError, ActionResult};
use crate::auth;
use crate::corestore::Corestore;
use crate::dbnet::connection::prelude::*;
use crate::protocol::{
element::UnsafeElement, iter::AnyArrayIter, responses, PipelineQuery, SimpleQuery,
};
use crate::protocol::{iter::AnyArrayIter, responses, PipelinedQuery, SimpleQuery, UnsafeSlice};
use crate::queryengine::parser::Entity;
use crate::{actions, admin};
use core::hint::unreachable_unchecked;
mod ddl;
mod inspect;
pub mod parser;
@ -84,15 +81,15 @@ action! {
auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: SimpleQuery
) {
if buf.is_any_array() {
let bufref = unsafe { buf.into_inner() };
let mut iter = unsafe { get_iter(&bufref) };
match iter.next_lowercase().unwrap_or_custom_aerr(groups::PACKET_ERR)?.as_ref() {
ACTION_AUTH => auth::auth_login_only(con, auth, iter).await,
_ => util::err(auth::errors::AUTH_CODE_BAD_CREDENTIALS),
}
} else {
util::err(groups::WRONGTYPE_ERR)
let bufref = buf.as_slice();
let mut iter = unsafe {
// UNSAFE(@ohsayan): The presence of the connection guarantees that this
// won't suddenly become invalid
AnyArrayIter::new(bufref.iter())
};
match iter.next_lowercase().unwrap_or_custom_aerr(groups::PACKET_ERR)?.as_ref() {
ACTION_AUTH => auth::auth_login_only(con, auth, iter).await,
_ => util::err(auth::errors::AUTH_CODE_BAD_CREDENTIALS),
}
}
//// Execute a simple query
@ -102,46 +99,20 @@ action! {
auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: SimpleQuery
) {
if buf.is_any_array() {
unsafe {
self::execute_stage(db, con, auth, &buf.into_inner()).await
}
} else {
util::err(groups::WRONGTYPE_ERR)
}
self::execute_stage(db, con, auth, buf.as_slice()).await
}
}
#[allow(clippy::needless_lifetimes)]
unsafe fn get_iter<'a>(buf: &'a UnsafeElement) -> AnyArrayIter<'a> {
// this is the boxed slice
let bufref = {
// SAFETY: execute_simple is called by execute_query which in turn is called
// by ConnnectionHandler::run(). In all cases, the `Con` remains valid
// ensuring that the source buffer exists as long as the connection does
// so this is safe.
match buf {
UnsafeElement::AnyArray(arr) => arr,
_ => unreachable_unchecked(),
}
};
// this is our final iter
let iter = {
// SAFETY: Again, this is guaranteed to be valid because the `con` is valid
AnyArrayIter::new(bufref.iter())
};
iter
}
async fn execute_stage<'a, T: 'a + ClientConnection<Strm>, Strm: Stream>(
db: &mut Corestore,
con: &'a mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>,
buf: &UnsafeElement,
buf: &[UnsafeSlice],
) -> ActionResult<()> {
let mut iter = unsafe {
// UNSAFE(@ohsayan): Assumed to be guaranteed by the caller
get_iter(buf)
// UNSAFE(@ohsayan): The presence of the connection guarantees that this
// won't suddenly become invalid
AnyArrayIter::new(buf.iter())
};
{
gen_constants_and_matches!(
@ -204,10 +175,9 @@ async fn execute_stage_pedantic<'a, T: ClientConnection<Strm> + 'a, Strm: Stream
handle: &mut Corestore,
con: &mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>,
stage: &UnsafeElement,
stage: &[UnsafeSlice],
) -> crate::IoResult<()> {
let ret = async {
ensure_cond_or_err(stage.is_any_array(), groups::WRONGTYPE_ERR)?;
self::execute_stage(handle, con, auth, stage).await?;
Ok(())
};
@ -224,9 +194,9 @@ action! {
handle: &mut Corestore,
con: &mut T,
auth: &mut AuthProviderHandle<'_, T, Strm>,
pipeline: PipelineQuery
pipeline: PipelinedQuery
) {
for stage in pipeline.iter() {
for stage in pipeline.into_inner().iter() {
self::execute_stage_pedantic(handle, con, auth, stage).await?;
}
Ok(())

@ -102,8 +102,6 @@ impl Writable for StringWrapper {
con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object)
con.write_lowlevel(self.0.as_bytes()).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(())
})
}
@ -143,8 +141,6 @@ impl Writable for &'static str {
con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object)
con.write_lowlevel(self.as_bytes()).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(())
})
}
@ -167,8 +163,6 @@ impl Writable for BytesWrapper {
con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object)
con.write_lowlevel(&bytes).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(())
})
}
@ -179,9 +173,6 @@ impl Writable for usize {
Box::pin(async move {
con.write_lowlevel(b":").await?;
let usize_bytes = Integer64::from(self);
let usize_bytes_len = Integer64::from(usize_bytes.len());
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?;
Ok(())
@ -194,9 +185,6 @@ impl Writable for u64 {
Box::pin(async move {
con.write_lowlevel(b":").await?;
let usize_bytes = Integer64::from(self);
let usize_bytes_len = Integer64::from(usize_bytes.len());
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?;
Ok(())
@ -220,8 +208,6 @@ impl Writable for ObjectID {
con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object)
con.write_lowlevel(&self).await?;
// Now write another LF
con.write_lowlevel(&[b'\n']).await?;
Ok(())
})
}
@ -231,10 +217,7 @@ impl Writable for f32 {
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
Box::pin(async move {
let payload = self.to_string();
let payload_len = Integer64::from(payload.len());
con.write_lowlevel(&[TSYMBOL_FLOAT]).await?;
con.write_lowlevel(&payload_len).await?;
con.write_lowlevel(&[b'\n']).await?;
con.write_lowlevel(payload.as_bytes()).await?;
con.write_lowlevel(&[b'\n']).await?;
Ok(())

@ -49,7 +49,6 @@ where
raw_stream.write_all(&bytes).await?; // then len
raw_stream.write_all(&[b'\n']).await?; // LF
raw_stream.write_all(payload).await?; // payload
raw_stream.write_all(&[b'\n']).await?; // final LF
Ok(())
}
@ -103,8 +102,6 @@ where
stream.write_all(&[b'\n']).await?;
// now element
stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(())
}
/// Write the NIL response code
@ -167,14 +164,12 @@ where
stream.write_all(&[b'\n']).await?;
// now element
stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(())
}
/// Write a null
pub async fn write_null(&mut self) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
stream.write_all(&[b'\0', b'\n']).await?;
stream.write_all(&[b'\0']).await?;
Ok(())
}
}
@ -225,8 +220,6 @@ where
stream.write_all(&[b'\n']).await?;
// now element
stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(())
}
}

@ -122,7 +122,7 @@ macro_rules! action {
$($(#[$attr])*
pub async fn $fname<'a, T: 'a + ClientConnection<Strm>, Strm:Stream>(
$($argname: $argty,)*
) -> crate::actions::ActionResult<()>
) -> $crate::actions::ActionResult<()>
$block)*
};
(
@ -137,7 +137,7 @@ macro_rules! action {
$argone: $argonety,
$argtwo: $argtwoty,
mut $argthree: $argthreety
) -> crate::actions::ActionResult<()>
) -> $crate::actions::ActionResult<()>
$block)*
};
}

@ -24,19 +24,17 @@
*
*/
use crate::report::AggregatedReport;
use crate::util;
use crate::{report::AggregatedReport, util};
use devtimer::DevTime;
use libstress::utils::generate_random_byte_vector;
use libstress::PoolConfig;
use libstress::{utils::generate_random_byte_vector, PoolConfig};
use rand::thread_rng;
use skytable::types::RawString;
use skytable::Query;
use std::io::{Read, Write};
use std::net::TcpStream;
mod validation;
use self::validation::SIMPLE_QUERY_SIZE;
use skytable::{types::RawString, Query};
use std::{
io::{Read, Write},
net::TcpStream,
};
pub mod validation;
use self::validation::SQ_RESPCODE_SIZE;
const NOTICE_INIT_BENCH: &str = "Finished sanity test. Initializing benchmark ...";
const NOTICE_INIT_COMPLETE: &str = "Initialization complete! Benchmark started";
@ -79,23 +77,19 @@ pub fn runner(
.arg(format!("default:{}", &temp_table))
.into_raw_query();
// an okay response code size: `*1\n!1\n0\n`:
let response_okay_size =
validation::calculate_monoelement_dataframe_size(1) + SIMPLE_QUERY_SIZE;
let pool_config = PoolConfig::new(
max_connections,
move || {
let mut stream = TcpStream::connect(&host).unwrap();
stream.write_all(&switch_table.clone()).unwrap();
let mut v = vec![0; response_okay_size];
let mut v = vec![0; SQ_RESPCODE_SIZE];
let _ = stream.read_exact(&mut v).unwrap();
stream
},
move |sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
// all `okay`s are returned (for both update and set)
let mut v = vec![0; response_okay_size];
let mut v = vec![0; SQ_RESPCODE_SIZE];
let _ = sock.read_exact(&mut v).unwrap();
},
|socket| {
@ -166,7 +160,8 @@ pub fn runner(
dt.stop_timer("SET").unwrap();
let get_response_packet_size =
validation::calculate_monoelement_dataframe_size(per_kv_size) + SIMPLE_QUERY_SIZE;
validation::calculate_monoelement_dataframe_size(per_kv_size)
+ validation::calculate_metaframe_size(1);
let getpool =
pool_config.with_loop_closure(move |sock: &mut TcpStream, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
@ -215,7 +210,7 @@ fn init_temp_table(rand: &mut impl rand::Rng, host: &str) -> String {
let mut create_table_connection = TcpStream::connect(host).unwrap();
// create table
create_table_connection.write_all(&create_table).unwrap();
let mut v = [0u8; 8];
let mut v = [0u8; SQ_RESPCODE_SIZE];
let _ = create_table_connection.read_exact(&mut v).unwrap();
temp_table
}

@ -24,8 +24,7 @@
*
*/
/// Just a sweet `*1\n`
pub(super) const SIMPLE_QUERY_SIZE: usize = 3;
pub const SQ_RESPCODE_SIZE: usize = b"*!1\n".len();
/// For a dataframe, this returns the dataframe size for array responses.
///
@ -77,7 +76,7 @@ pub fn calculate_typed_array_dataframe_size(
/// For a monoelement dataframe, this returns the size:
/// ```text
/// <tsymbol><size>\n
/// <element>\n
/// <element>
/// ```
///
/// For an `okay` respcode, it will look like this:
@ -91,7 +90,6 @@ pub fn calculate_monoelement_dataframe_size(per_element_size: usize) -> usize {
s += per_element_size.to_string().len(); // the bytes in size string
s += 1; // the LF
s += per_element_size; // the element itself
s += 1; // the final LF
s
}
@ -102,10 +100,11 @@ pub fn calculate_monoelement_dataframe_size(per_element_size: usize) -> usize {
#[allow(dead_code)] // TODO(@ohsayan): Remove this lint
pub fn calculate_metaframe_size(queries: usize) -> usize {
if queries == 1 {
SIMPLE_QUERY_SIZE
// just `*`
1
} else {
let mut s = 0;
s += 1; // `*`
s += 1; // `$`
s += queries.to_string().len(); // the bytes in size string
s += 1; // `\n`
s
@ -118,11 +117,11 @@ mod tests {
#[test]
fn test_monoelement_calculation() {
assert_eq!(calculate_monoelement_dataframe_size(1), 5);
assert_eq!(calculate_monoelement_dataframe_size(1), 4);
}
#[test]
fn test_simple_query_metaframe_size() {
assert_eq!(calculate_metaframe_size(1), SIMPLE_QUERY_SIZE);
assert_eq!(calculate_metaframe_size(1), 1);
}
#[test]
fn test_typed_array_dataframe_size() {

@ -24,8 +24,7 @@
*
*/
use crate::hoststr;
use crate::sanity_test;
use crate::{benchtool::validation::SQ_RESPCODE_SIZE, hoststr, sanity_test};
use libstress::Workpool;
use rand::thread_rng;
use skytable::Query;
@ -44,7 +43,7 @@ pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, si
move || TcpStream::connect(host.clone()).unwrap(),
|sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
let mut buf = [0u8; 8];
let mut buf = [0u8; SQ_RESPCODE_SIZE];
let _ = sock.read_exact(&mut buf).unwrap();
},
|socket| {

@ -47,7 +47,7 @@ macro_rules! hoststr {
macro_rules! sanity_test {
($host:expr, $port:expr) => {{
// Run a sanity test
if let Err(e) = crate::util::run_sanity_test(&$host, $port) {
if let Err(e) = $crate::util::run_sanity_test(&$host, $port) {
Err(e)
} else {
Ok(())

@ -115,31 +115,21 @@ fn _get_eresp_array(tokens: TokenStream) -> TokenStream {
_ => panic!("Expected a string literal"),
};
let payload_bytes = payload_str.as_bytes();
let payload_len = payload_bytes.len();
let payload_len_str = payload_len.to_string();
let payload_len_bytes = payload_len_str.as_bytes();
let mut processed = quote! {
b'!',
};
for byte in payload_len_bytes {
for byte in payload_bytes {
processed = quote! {
#processed
#byte,
};
}
}
processed = quote! {
#processed
b'\n',
};
for byte in payload_bytes {
processed = quote! {
#processed
#byte,
}
}
processed = quote! {
[#processed
b'\n',]
[#processed]
};
processed.into()
}

Loading…
Cancel
Save