Merge pull request #258 from skytable/protocol/compat
Add backwards compatibility for Skyhash 1.0next
commit
4e90d97ee3
@ -1,70 +0,0 @@
|
||||
/*
|
||||
* Created on Sun Mar 06 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use crate::actions::ActionError;
|
||||
|
||||
/// Skyhash respstring: already claimed (user was already claimed)
|
||||
pub const AUTH_ERROR_ALREADYCLAIMED: &[u8] = b"!err-auth-already-claimed\n";
|
||||
/// Skyhash respcode(10): bad credentials (either bad creds or invalid user)
|
||||
pub const AUTH_CODE_BAD_CREDENTIALS: &[u8] = b"!10\n";
|
||||
/// Skyhash respstring: auth is disabled
|
||||
pub const AUTH_ERROR_DISABLED: &[u8] = b"!err-auth-disabled\n";
|
||||
/// Skyhash respcode(11): Insufficient permissions (same for anonymous user)
|
||||
pub const AUTH_CODE_PERMS: &[u8] = b"!11\n";
|
||||
/// Skyhash respstring: ID is too long
|
||||
pub const AUTH_ERROR_ILLEGAL_USERNAME: &[u8] = b"!err-auth-illegal-username\n";
|
||||
/// Skyhash respstring: ID is protected/in use
|
||||
pub const AUTH_ERROR_FAILED_TO_DELETE_USER: &[u8] = b"!err-auth-deluser-fail\n";
|
||||
|
||||
/// Auth erros
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub enum AuthError {
|
||||
/// The auth slot was already claimed
|
||||
AlreadyClaimed,
|
||||
/// Bad userid/tokens/keys
|
||||
BadCredentials,
|
||||
/// Auth is disabled
|
||||
Disabled,
|
||||
/// The action is not available to the current account
|
||||
PermissionDenied,
|
||||
/// The user is anonymous and doesn't have the right to execute this
|
||||
Anonymous,
|
||||
/// Some other error
|
||||
Other(&'static [u8]),
|
||||
}
|
||||
|
||||
impl From<AuthError> for ActionError {
|
||||
fn from(e: AuthError) -> Self {
|
||||
let r = match e {
|
||||
AuthError::AlreadyClaimed => AUTH_ERROR_ALREADYCLAIMED,
|
||||
AuthError::Anonymous | AuthError::PermissionDenied => AUTH_CODE_PERMS,
|
||||
AuthError::BadCredentials => AUTH_CODE_BAD_CREDENTIALS,
|
||||
AuthError::Disabled => AUTH_ERROR_DISABLED,
|
||||
AuthError::Other(e) => e,
|
||||
};
|
||||
ActionError::ActionError(r)
|
||||
}
|
||||
}
|
@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Created on Tue Nov 02 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
Do note that the result of the benches might actually be slower, than faster! The reason it is so, is simply because of
|
||||
the fact that we generate owned queries, by copying bytes which adds an overhead, but offers simplicity in writing tests
|
||||
and/or benches
|
||||
*/
|
||||
|
||||
extern crate test;
|
||||
use super::{element::OwnedElement, OwnedQuery, Parser};
|
||||
use bytes::Bytes;
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn bench_simple_query_string(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*1\n+5\nsayan\n";
|
||||
unsafe {
|
||||
b.iter(|| {
|
||||
assert_eq!(
|
||||
Parser::new(PAYLOAD).parse().unwrap().0.into_owned_query(),
|
||||
OwnedQuery::SimpleQuery(OwnedElement::String(Bytes::from("sayan")))
|
||||
);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_simple_query_uint(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*1\n:5\n12345\n";
|
||||
unsafe {
|
||||
b.iter(|| {
|
||||
assert_eq!(
|
||||
Parser::new(PAYLOAD).parse().unwrap().0.into_owned_query(),
|
||||
OwnedQuery::SimpleQuery(OwnedElement::UnsignedInt(12345))
|
||||
);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_simple_query_any_array(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*1\n~3\n3\nthe\n3\ncat\n6\nmeowed\n";
|
||||
unsafe {
|
||||
b.iter(|| {
|
||||
assert_eq!(
|
||||
Parser::new(PAYLOAD).parse().unwrap().0.into_owned_query(),
|
||||
OwnedQuery::SimpleQuery(OwnedElement::AnyArray(vec![
|
||||
"the".into(),
|
||||
"cat".into(),
|
||||
"meowed".into()
|
||||
]))
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
@ -0,0 +1,492 @@
|
||||
/*
|
||||
* Created on Tue Apr 26 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use super::ParseError;
|
||||
use crate::{
|
||||
corestore::{
|
||||
booltable::{BytesBoolTable, BytesNicheLUT},
|
||||
buffers::Integer64,
|
||||
},
|
||||
dbnet::connection::{QueryResult, QueryWithAdvance, RawConnection, Stream},
|
||||
util::FutureResult,
|
||||
IoResult,
|
||||
};
|
||||
use std::io::{Error as IoError, ErrorKind};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
|
||||
|
||||
/*
|
||||
NOTE TO SELF (@ohsayan): Why do we split everything into separate traits? To avoid mistakes
|
||||
in the future. We don't want any action to randomly call `read_query`, which was possible
|
||||
with the earlier `ProtcolConnectionExt` trait, since it was imported by every action from
|
||||
the prelude.
|
||||
- `ProtocolSpec`: this is like a charset definition of the protocol along with some other
|
||||
good stuff
|
||||
- `ProtocolRead`: should only read from the stream and never write
|
||||
- `ProtocolWrite`: should only write data and never read
|
||||
|
||||
These distinctions reduce the likelihood of making mistakes while implementing the traits
|
||||
|
||||
-- Sayan (May, 2022)
|
||||
*/
|
||||
|
||||
/// The `ProtocolSpec` trait is used to define the character set and pre-generated elements
|
||||
/// and responses for a protocol version. To make any actual use of it, you need to implement
|
||||
/// both the `ProtocolRead` and `ProtocolWrite` for the protocol
|
||||
pub trait ProtocolSpec: Send + Sync {
|
||||
// spec information
|
||||
|
||||
/// The Skyhash protocol version
|
||||
const PROTOCOL_VERSION: f32;
|
||||
/// The Skyhash protocol version string (Skyhash-x.y)
|
||||
const PROTOCOL_VERSIONSTRING: &'static str;
|
||||
|
||||
// type symbols
|
||||
/// Type symbol for unicode strings
|
||||
const TSYMBOL_STRING: u8;
|
||||
/// Type symbol for blobs
|
||||
const TSYMBOL_BINARY: u8;
|
||||
/// Type symbol for float
|
||||
const TSYMBOL_FLOAT: u8;
|
||||
/// Type symbok for int64
|
||||
const TSYMBOL_INT64: u8;
|
||||
/// Type symbol for typed array
|
||||
const TSYMBOL_TYPED_ARRAY: u8;
|
||||
/// Type symbol for typed non-null array
|
||||
const TSYMBOL_TYPED_NON_NULL_ARRAY: u8;
|
||||
/// Type symbol for an array
|
||||
const TSYMBOL_ARRAY: u8;
|
||||
/// Type symbol for a flat array
|
||||
const TSYMBOL_FLAT_ARRAY: u8;
|
||||
|
||||
// charset
|
||||
/// The line-feed character or separator
|
||||
const LF: u8 = b'\n';
|
||||
|
||||
// metaframe
|
||||
/// The header for simple queries
|
||||
const SIMPLE_QUERY_HEADER: &'static [u8];
|
||||
/// The header for pipelined queries (excluding length, obviously)
|
||||
const PIPELINED_QUERY_FIRST_BYTE: u8;
|
||||
|
||||
// typed array
|
||||
/// Null element represenation for a typed array
|
||||
const TYPE_TYPED_ARRAY_ELEMENT_NULL: &'static [u8];
|
||||
|
||||
// respcodes
|
||||
/// Respcode 0: Okay
|
||||
const RCODE_OKAY: &'static [u8];
|
||||
/// Respcode 1: Nil
|
||||
const RCODE_NIL: &'static [u8];
|
||||
/// Respcode 2: Overwrite error
|
||||
const RCODE_OVERWRITE_ERR: &'static [u8];
|
||||
/// Respcode 3: Action error
|
||||
const RCODE_ACTION_ERR: &'static [u8];
|
||||
/// Respcode 4: Packet error
|
||||
const RCODE_PACKET_ERR: &'static [u8];
|
||||
/// Respcode 5: Server error
|
||||
const RCODE_SERVER_ERR: &'static [u8];
|
||||
/// Respcode 6: Other error
|
||||
const RCODE_OTHER_ERR_EMPTY: &'static [u8];
|
||||
/// Respcode 7: Unknown action
|
||||
const RCODE_UNKNOWN_ACTION: &'static [u8];
|
||||
/// Respcode 8: Wrongtype error
|
||||
const RCODE_WRONGTYPE_ERR: &'static [u8];
|
||||
/// Respcode 9: Unknown data type error
|
||||
const RCODE_UNKNOWN_DATA_TYPE: &'static [u8];
|
||||
/// Respcode 10: Encoding error
|
||||
const RCODE_ENCODING_ERROR: &'static [u8];
|
||||
|
||||
// respstrings
|
||||
/// Respstring when snapshot engine is busy
|
||||
const RSTRING_SNAPSHOT_BUSY: &'static [u8];
|
||||
/// Respstring when snapshots are disabled
|
||||
const RSTRING_SNAPSHOT_DISABLED: &'static [u8];
|
||||
/// Respstring when duplicate snapshot creation is attempted
|
||||
const RSTRING_SNAPSHOT_DUPLICATE: &'static [u8];
|
||||
/// Respstring when snapshot has illegal chars
|
||||
const RSTRING_SNAPSHOT_ILLEGAL_NAME: &'static [u8];
|
||||
/// Respstring when a **very bad error** happens (use after termsig)
|
||||
const RSTRING_ERR_ACCESS_AFTER_TERMSIG: &'static [u8];
|
||||
/// Respstring when the default container is unset
|
||||
const RSTRING_DEFAULT_UNSET: &'static [u8];
|
||||
/// Respstring when the container is not found
|
||||
const RSTRING_CONTAINER_NOT_FOUND: &'static [u8];
|
||||
/// Respstring when the container is still in use, but a _free_ op is attempted
|
||||
const RSTRING_STILL_IN_USE: &'static [u8];
|
||||
/// Respstring when a protected container is attempted to be accessed/modified
|
||||
const RSTRING_PROTECTED_OBJECT: &'static [u8];
|
||||
/// Respstring when an action is not suitable for the current table model
|
||||
const RSTRING_WRONG_MODEL: &'static [u8];
|
||||
/// Respstring when the container already exists
|
||||
const RSTRING_ALREADY_EXISTS: &'static [u8];
|
||||
/// Respstring when the container is not ready
|
||||
const RSTRING_NOT_READY: &'static [u8];
|
||||
/// Respstring when a DDL transaction fails
|
||||
const RSTRING_DDL_TRANSACTIONAL_FAILURE: &'static [u8];
|
||||
/// Respstring when an unknow DDL query is run (`CREATE BLAH`, for example)
|
||||
const RSTRING_UNKNOWN_DDL_QUERY: &'static [u8];
|
||||
/// Respstring when a bad DDL expression is run
|
||||
const RSTRING_BAD_EXPRESSION: &'static [u8];
|
||||
/// Respstring when an unsupported model is attempted to be used during table creation
|
||||
const RSTRING_UNKNOWN_MODEL: &'static [u8];
|
||||
/// Respstring when too many arguments are passed to a DDL query
|
||||
const RSTRING_TOO_MANY_ARGUMENTS: &'static [u8];
|
||||
/// Respstring when the container name is too long
|
||||
const RSTRING_CONTAINER_NAME_TOO_LONG: &'static [u8];
|
||||
/// Respstring when the container name
|
||||
const RSTRING_BAD_CONTAINER_NAME: &'static [u8];
|
||||
/// Respstring when an unknown inspect query is run (`INSPECT blah`, for example)
|
||||
const RSTRING_UNKNOWN_INSPECT_QUERY: &'static [u8];
|
||||
/// Respstring when an unknown table property is passed during table creation
|
||||
const RSTRING_UNKNOWN_PROPERTY: &'static [u8];
|
||||
/// Respstring when a non-empty keyspace is attempted to be dropped
|
||||
const RSTRING_KEYSPACE_NOT_EMPTY: &'static [u8];
|
||||
/// Respstring when a bad type is provided for a key in the K/V engine (like using a `list`
|
||||
/// for the key)
|
||||
const RSTRING_BAD_TYPE_FOR_KEY: &'static [u8];
|
||||
/// Respstring when a non-existent index is attempted to be accessed in a list
|
||||
const RSTRING_LISTMAP_BAD_INDEX: &'static [u8];
|
||||
/// Respstring when a list is empty and we attempt to access/modify it
|
||||
const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8];
|
||||
|
||||
// element responses
|
||||
/// A string element containing the text "HEY!"
|
||||
const ELEMRESP_HEYA: &'static [u8];
|
||||
|
||||
// full responses
|
||||
/// A **full response** for a packet error
|
||||
const FULLRESP_RCODE_PACKET_ERR: &'static [u8];
|
||||
/// A **full response** for a wrongtype error
|
||||
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8];
|
||||
|
||||
// LUTs
|
||||
/// A LUT for SET operations
|
||||
const SET_NLUT: BytesNicheLUT = BytesNicheLUT::new(
|
||||
Self::RCODE_ENCODING_ERROR,
|
||||
Self::RCODE_OKAY,
|
||||
Self::RCODE_OVERWRITE_ERR,
|
||||
);
|
||||
/// A LUT for lists
|
||||
const OKAY_BADIDX_NIL_NLUT: BytesNicheLUT = BytesNicheLUT::new(
|
||||
Self::RCODE_NIL,
|
||||
Self::RCODE_OKAY,
|
||||
Self::RSTRING_LISTMAP_BAD_INDEX,
|
||||
);
|
||||
/// A LUT for SET operations
|
||||
const OKAY_OVW_BLUT: BytesBoolTable =
|
||||
BytesBoolTable::new(Self::RCODE_OKAY, Self::RCODE_OVERWRITE_ERR);
|
||||
/// A LUT for UPDATE operations
|
||||
const UPDATE_NLUT: BytesNicheLUT = BytesNicheLUT::new(
|
||||
Self::RCODE_ENCODING_ERROR,
|
||||
Self::RCODE_OKAY,
|
||||
Self::RCODE_NIL,
|
||||
);
|
||||
|
||||
// auth error respstrings
|
||||
/// respstring: already claimed (user was already claimed)
|
||||
const AUTH_ERROR_ALREADYCLAIMED: &'static [u8];
|
||||
/// respcode(10): bad credentials (either bad creds or invalid user)
|
||||
const AUTH_CODE_BAD_CREDENTIALS: &'static [u8];
|
||||
/// respstring: auth is disabled
|
||||
const AUTH_ERROR_DISABLED: &'static [u8];
|
||||
/// respcode(11): Insufficient permissions (same for anonymous user)
|
||||
const AUTH_CODE_PERMS: &'static [u8];
|
||||
/// respstring: ID is too long
|
||||
const AUTH_ERROR_ILLEGAL_USERNAME: &'static [u8];
|
||||
/// respstring: ID is protected/in use
|
||||
const AUTH_ERROR_FAILED_TO_DELETE_USER: &'static [u8];
|
||||
}
|
||||
|
||||
/// # The `ProtocolRead` trait
|
||||
///
|
||||
/// The `ProtocolRead` trait enables read operations using the protocol for a given stream `Strm` and protocol
|
||||
/// `P`. Both the stream and protocol must implement the appropriate traits for you to be able to use these
|
||||
/// traits
|
||||
///
|
||||
/// ## DO NOT
|
||||
/// The fact that this is a trait enables great flexibility in terms of visibility, but **DO NOT EVER CALL any
|
||||
/// function other than `read_query`, `close_conn_with_error` or `write_response`**. If you mess with functions
|
||||
/// like `read_again`, you're likely to pull yourself into some good trouble.
|
||||
pub trait ProtocolRead<P, Strm>: RawConnection<P, Strm>
|
||||
where
|
||||
Strm: Stream,
|
||||
P: ProtocolSpec,
|
||||
{
|
||||
/// Try to parse a query from the buffered data
|
||||
fn try_query(&self) -> Result<QueryWithAdvance, ParseError>;
|
||||
/// Read a query from the remote end
|
||||
///
|
||||
/// This function asynchronously waits until all the data required
|
||||
/// for parsing the query is available
|
||||
fn read_query<'s, 'r: 's>(&'r mut self) -> FutureResult<'s, Result<QueryResult, IoError>> {
|
||||
Box::pin(async move {
|
||||
let mv_self = self;
|
||||
loop {
|
||||
let (buffer, stream) = mv_self.get_mut_both();
|
||||
match stream.read_buf(buffer).await {
|
||||
Ok(0) => {
|
||||
if buffer.is_empty() {
|
||||
return Ok(QueryResult::Disconnected);
|
||||
} else {
|
||||
return Err(IoError::from(ErrorKind::ConnectionReset));
|
||||
}
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
match mv_self.try_query() {
|
||||
Ok(query_with_advance) => {
|
||||
return Ok(QueryResult::Q(query_with_advance));
|
||||
}
|
||||
Err(ParseError::NotEnough) => (),
|
||||
Err(ParseError::DatatypeParseFailure) => return Ok(QueryResult::Wrongtype),
|
||||
Err(ParseError::UnexpectedByte | ParseError::BadPacket) => {
|
||||
return Ok(QueryResult::E(P::FULLRESP_RCODE_PACKET_ERR));
|
||||
}
|
||||
Err(ParseError::WrongType) => {
|
||||
return Ok(QueryResult::E(P::FULLRESP_RCODE_WRONG_TYPE));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ProtocolWrite<P, Strm>: RawConnection<P, Strm>
|
||||
where
|
||||
Strm: Stream,
|
||||
P: ProtocolSpec,
|
||||
{
|
||||
// utility
|
||||
fn _get_raw_stream(&mut self) -> &mut BufWriter<Strm> {
|
||||
self.get_mut_stream()
|
||||
}
|
||||
fn _flush_stream<'life0, 'ret_life>(&'life0 mut self) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self.get_mut_stream().flush().await })
|
||||
}
|
||||
fn _write_raw<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
data: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self.get_mut_stream().write_all(data).await })
|
||||
}
|
||||
fn _write_raw_flushed<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
data: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self._write_raw(data).await?;
|
||||
self._flush_stream().await
|
||||
})
|
||||
}
|
||||
fn close_conn_with_error<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
resp: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self._write_raw_flushed(resp).await })
|
||||
}
|
||||
|
||||
// metaframe
|
||||
fn write_simple_query_header<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.get_mut_stream()
|
||||
.write_all(P::SIMPLE_QUERY_HEADER)
|
||||
.await
|
||||
})
|
||||
}
|
||||
fn write_pipelined_query_header<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
qcount: usize,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.get_mut_stream()
|
||||
.write_all(&[P::PIPELINED_QUERY_FIRST_BYTE])
|
||||
.await?;
|
||||
self.get_mut_stream()
|
||||
.write_all(&Integer64::from(qcount))
|
||||
.await?;
|
||||
self.get_mut_stream().write_all(&[P::LF]).await
|
||||
})
|
||||
}
|
||||
|
||||
// monoelement
|
||||
fn write_mono_length_prefixed_with_tsymbol<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
data: &'life1 [u8],
|
||||
tsymbol: u8,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life;
|
||||
/// serialize and write an `&str` to the stream
|
||||
fn write_string<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
string: &'life1 str,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
/// serialize and write an `&[u8]` to the stream
|
||||
fn write_binary<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
binary: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
/// serialize and write an `usize` to the stream
|
||||
fn write_usize<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
size: usize,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
/// serialize and write an `u64` to the stream
|
||||
fn write_int64<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
int: u64,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
/// serialize and write an `f32` to the stream
|
||||
fn write_float<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
float: f32,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
|
||||
// typed array
|
||||
fn write_typed_array_header<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
len: usize,
|
||||
tsymbol: u8,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.get_mut_stream()
|
||||
.write_all(&[P::TSYMBOL_TYPED_ARRAY, tsymbol])
|
||||
.await?;
|
||||
self.get_mut_stream()
|
||||
.write_all(&Integer64::from(len))
|
||||
.await?;
|
||||
self.get_mut_stream().write_all(&[P::LF]).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
fn write_typed_array_element_null<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.get_mut_stream()
|
||||
.write_all(P::TYPE_TYPED_ARRAY_ELEMENT_NULL)
|
||||
.await
|
||||
})
|
||||
}
|
||||
fn write_typed_array_element<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
element: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life;
|
||||
|
||||
// typed non-null array
|
||||
fn write_typed_non_null_array_header<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
len: usize,
|
||||
tsymbol: u8,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.get_mut_stream()
|
||||
.write_all(&[P::TSYMBOL_TYPED_NON_NULL_ARRAY, tsymbol])
|
||||
.await?;
|
||||
self.get_mut_stream()
|
||||
.write_all(&Integer64::from(len))
|
||||
.await?;
|
||||
self.get_mut_stream().write_all(&[P::LF]).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
fn write_typed_non_null_array_element<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
element: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self.write_typed_array_element(element).await })
|
||||
}
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
/*
|
||||
* Created on Tue May 03 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
super::{ParseError, ParseResult, UnsafeSlice},
|
||||
core::mem::transmute,
|
||||
};
|
||||
|
||||
/*
|
||||
NOTE TO SELF (@ohsayan): The reason we split this into three traits is because:
|
||||
- `RawParser` is the only one that is to be implemented. Just provide information about the cursor
|
||||
- `RawParserMeta` provides information about the buffer based on cursor and end ptr information
|
||||
- `RawParserExt` provides high-level abstractions over `RawParserMeta`. It is like the "super trait"
|
||||
|
||||
These distinctions reduce the likelihood of "accidentally incorrect impls" (we could've easily included
|
||||
`RawParserMeta` inside `RawParser`).
|
||||
|
||||
-- Sayan (May, 2022)
|
||||
*/
|
||||
|
||||
/// The `RawParser` trait has three methods that implementors must define:
|
||||
///
|
||||
/// - `cursor_ptr` -> Should point to the current position in the buffer for the parser
|
||||
/// - `cursor_ptr_mut` -> a mutable reference to the cursor
|
||||
/// - `data_end_ptr` -> a ptr to one byte past the allocated area of the buffer
|
||||
///
|
||||
/// All implementors of `RawParser` get a free implementation for `RawParserMeta` and `RawParserExt`
|
||||
///
|
||||
/// # Safety
|
||||
/// - `cursor_ptr` must point to a valid location in memory
|
||||
/// - `data_end_ptr` must point to a valid location in memory, in the **same allocated area**
|
||||
pub(super) unsafe trait RawParser {
|
||||
fn cursor_ptr(&self) -> *const u8;
|
||||
fn cursor_ptr_mut(&mut self) -> &mut *const u8;
|
||||
fn data_end_ptr(&self) -> *const u8;
|
||||
}
|
||||
|
||||
/// The `RawParserMeta` trait builds on top of the `RawParser` trait to provide low-level interactions
|
||||
/// and information with the parser's buffer. It is implemented for any type that implements the `RawParser`
|
||||
/// trait. Manual implementation is discouraged
|
||||
pub(super) trait RawParserMeta: RawParser {
|
||||
/// Check how many bytes we have left
|
||||
fn remaining(&self) -> usize {
|
||||
self.data_end_ptr() as usize - self.cursor_ptr() as usize
|
||||
}
|
||||
/// Check if we have `size` bytes remaining
|
||||
fn has_remaining(&self, size: usize) -> bool {
|
||||
self.remaining() >= size
|
||||
}
|
||||
/// Check if we have exhausted the buffer
|
||||
fn exhausted(&self) -> bool {
|
||||
self.cursor_ptr() >= self.data_end_ptr()
|
||||
}
|
||||
/// Check if the buffer is not exhausted
|
||||
fn not_exhausted(&self) -> bool {
|
||||
self.cursor_ptr() < self.data_end_ptr()
|
||||
}
|
||||
/// Attempts to return the byte pointed at by the cursor.
|
||||
/// WARNING: The same segfault warning
|
||||
unsafe fn get_byte_at_cursor(&self) -> u8 {
|
||||
*self.cursor_ptr()
|
||||
}
|
||||
/// Increment the cursor by `by` positions
|
||||
unsafe fn incr_cursor_by(&mut self, by: usize) {
|
||||
let current = *self.cursor_ptr_mut();
|
||||
*self.cursor_ptr_mut() = current.add(by);
|
||||
}
|
||||
/// Increment the position of the cursor by one position
|
||||
unsafe fn incr_cursor(&mut self) {
|
||||
self.incr_cursor_by(1);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RawParserMeta for T where T: RawParser {}
|
||||
|
||||
/// `RawParserExt` builds on the `RawParser` and `RawParserMeta` traits to provide high level abstractions
|
||||
/// like reading lines, or a slice of a given length. It is implemented for any type that
|
||||
/// implements the `RawParser` trait. Manual implementation is discouraged
|
||||
pub(super) trait RawParserExt: RawParser + RawParserMeta {
|
||||
/// Attempt to read `len` bytes
|
||||
fn read_until(&mut self, len: usize) -> ParseResult<UnsafeSlice> {
|
||||
if self.has_remaining(len) {
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): Already verified lengths
|
||||
let slice = UnsafeSlice::new(self.cursor_ptr(), len);
|
||||
self.incr_cursor_by(len);
|
||||
Ok(slice)
|
||||
}
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
/// Attempt to read a byte slice terminated by an LF
|
||||
fn read_line(&mut self) -> ParseResult<UnsafeSlice> {
|
||||
let start_ptr = self.cursor_ptr();
|
||||
unsafe {
|
||||
while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
|
||||
self.incr_cursor();
|
||||
}
|
||||
if self.not_exhausted() && self.get_byte_at_cursor() == b'\n' {
|
||||
let len = self.cursor_ptr() as usize - start_ptr as usize;
|
||||
self.incr_cursor(); // skip LF
|
||||
Ok(UnsafeSlice::new(start_ptr, len))
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Attempt to read a line, **rejecting an empty payload**
|
||||
fn read_line_pedantic(&mut self) -> ParseResult<UnsafeSlice> {
|
||||
let start_ptr = self.cursor_ptr();
|
||||
unsafe {
|
||||
while self.not_exhausted() && self.get_byte_at_cursor() != b'\n' {
|
||||
self.incr_cursor();
|
||||
}
|
||||
let len = self.cursor_ptr() as usize - start_ptr as usize;
|
||||
let has_lf = self.not_exhausted() && self.get_byte_at_cursor() == b'\n';
|
||||
if has_lf && len != 0 {
|
||||
self.incr_cursor(); // skip LF
|
||||
Ok(UnsafeSlice::new(start_ptr, len))
|
||||
} else {
|
||||
// just some silly hackery
|
||||
Err(transmute(has_lf))
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Attempt to read an `usize` from the buffer
|
||||
fn read_usize(&mut self) -> ParseResult<usize> {
|
||||
let line = self.read_line_pedantic()?;
|
||||
let bytes = line.as_slice();
|
||||
let mut ret = 0usize;
|
||||
for byte in bytes {
|
||||
if byte.is_ascii_digit() {
|
||||
ret = match ret.checked_mul(10) {
|
||||
Some(r) => r,
|
||||
None => return Err(ParseError::DatatypeParseFailure),
|
||||
};
|
||||
ret = match ret.checked_add((byte & 0x0F) as _) {
|
||||
Some(r) => r,
|
||||
None => return Err(ParseError::DatatypeParseFailure),
|
||||
};
|
||||
} else {
|
||||
return Err(ParseError::DatatypeParseFailure);
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RawParserExt for T where T: RawParser + RawParserMeta {}
|
@ -1,153 +0,0 @@
|
||||
/*
|
||||
* Created on Sat Aug 22 2020
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
//! Primitives for generating Skyhash compatible responses
|
||||
|
||||
pub mod groups {
|
||||
#![allow(unused)]
|
||||
//! # Pre-compiled response **elements**
|
||||
//! These are pre-compiled response groups and **not** complete responses. If complete
|
||||
//! responses are required, user protocol::responses::fresp
|
||||
use ::sky_macros::compiled_eresp_bytes as eresp;
|
||||
/// Response code 0 as a array element
|
||||
pub const OKAY: &[u8] = eresp!("0");
|
||||
/// Response code 1 as a array element
|
||||
pub const NIL: &[u8] = eresp!("1");
|
||||
/// Response code 2 as a array element
|
||||
pub const OVERWRITE_ERR: &[u8] = eresp!("2");
|
||||
/// Response code 3 as a array element
|
||||
pub const ACTION_ERR: &[u8] = eresp!("3");
|
||||
/// Response code 4 as a array element
|
||||
pub const PACKET_ERR: &[u8] = eresp!("4");
|
||||
/// Response code 5 as a array element
|
||||
pub const SERVER_ERR: &[u8] = eresp!("5");
|
||||
/// Response code 6 as a array element
|
||||
pub const OTHER_ERR_EMPTY: &[u8] = eresp!("6");
|
||||
/// Response group element with string "HEYA"
|
||||
pub const HEYA: &[u8] = "+4\nHEY!".as_bytes();
|
||||
/// "Unknown action" error response
|
||||
pub const UNKNOWN_ACTION: &[u8] = eresp!("Unknown action");
|
||||
/// Response code 7
|
||||
pub const WRONGTYPE_ERR: &[u8] = eresp!("7");
|
||||
/// Response code 8
|
||||
pub const UNKNOWN_DATA_TYPE: &[u8] = eresp!("8");
|
||||
/// Response code 9 as an array element
|
||||
pub const ENCODING_ERROR: &[u8] = eresp!("9");
|
||||
/// Snapshot busy error
|
||||
pub const SNAPSHOT_BUSY: &[u8] = eresp!("err-snapshot-busy");
|
||||
/// Snapshot disabled (other error)
|
||||
pub const SNAPSHOT_DISABLED: &[u8] = eresp!("err-snapshot-disabled");
|
||||
/// Duplicate snapshot
|
||||
pub const SNAPSHOT_DUPLICATE: &[u8] = eresp!("duplicate-snapshot");
|
||||
/// Snapshot has illegal name (other error)
|
||||
pub const SNAPSHOT_ILLEGAL_NAME: &[u8] = eresp!("err-invalid-snapshot-name");
|
||||
/// Access after termination signal (other error)
|
||||
pub const ERR_ACCESS_AFTER_TERMSIG: &[u8] = eresp!("err-access-after-termsig");
|
||||
|
||||
// keyspace related resps
|
||||
/// The default container was not set
|
||||
pub const DEFAULT_UNSET: &[u8] = eresp!("default-container-unset");
|
||||
/// The container was not found
|
||||
pub const CONTAINER_NOT_FOUND: &[u8] = eresp!("container-not-found");
|
||||
/// The container is still in use and so cannot be removed
|
||||
pub const STILL_IN_USE: &[u8] = eresp!("still-in-use");
|
||||
/// This is a protected object and hence cannot be accessed
|
||||
pub const PROTECTED_OBJECT: &[u8] = eresp!("err-protected-object");
|
||||
/// The action was applied against the wrong model
|
||||
pub const WRONG_MODEL: &[u8] = eresp!("wrong-model");
|
||||
/// The container already exists
|
||||
pub const ALREADY_EXISTS: &[u8] = eresp!("err-already-exists");
|
||||
/// The container is not ready
|
||||
pub const NOT_READY: &[u8] = eresp!("not-ready");
|
||||
/// A transactional failure occurred
|
||||
pub const DDL_TRANSACTIONAL_FAILURE: &[u8] = eresp!("transactional-failure");
|
||||
/// An unknown DDL query was run
|
||||
pub const UNKNOWN_DDL_QUERY: &[u8] = eresp!("unknown-ddl-query");
|
||||
/// The expression for a DDL query was malformed
|
||||
pub const BAD_EXPRESSION: &[u8] = eresp!("malformed-expression");
|
||||
/// An unknown model was passed in a DDL query
|
||||
pub const UNKNOWN_MODEL: &[u8] = eresp!("unknown-model");
|
||||
/// Too many arguments were passed to model constructor
|
||||
pub const TOO_MANY_ARGUMENTS: &[u8] = eresp!("too-many-args");
|
||||
/// The container name is too long
|
||||
pub const CONTAINER_NAME_TOO_LONG: &[u8] = eresp!("container-name-too-long");
|
||||
/// The container name contains invalid characters
|
||||
pub const BAD_CONTAINER_NAME: &[u8] = eresp!("bad-container-name");
|
||||
/// An unknown inspect query
|
||||
pub const UNKNOWN_INSPECT_QUERY: &[u8] = eresp!("unknown-inspect-query");
|
||||
/// An unknown table property was passed
|
||||
pub const UNKNOWN_PROPERTY: &[u8] = eresp!("unknown-property");
|
||||
/// The keyspace is not empty and hence cannot be removed
|
||||
pub const KEYSPACE_NOT_EMPTY: &[u8] = eresp!("keyspace-not-empty");
|
||||
/// Bad type supplied in a DDL query for the key
|
||||
pub const BAD_TYPE_FOR_KEY: &[u8] = eresp!("bad-type-for-key");
|
||||
/// The index for the provided list was non-existent
|
||||
pub const LISTMAP_BAD_INDEX: &[u8] = eresp!("bad-list-index");
|
||||
/// The list is empty
|
||||
pub const LISTMAP_LIST_IS_EMPTY: &[u8] = eresp!("list-is-empty");
|
||||
}
|
||||
|
||||
pub mod full_responses {
|
||||
#![allow(unused)]
|
||||
//! # Pre-compiled **responses**
|
||||
//! These are pre-compiled **complete** responses. This means that they should
|
||||
//! be written off directly to the stream and should **not be preceded by any response metaframe**
|
||||
|
||||
/// Response code: 0 (Okay)
|
||||
pub const R_OKAY: &[u8] = "*!1\n0\n".as_bytes();
|
||||
/// Response code: 1 (Nil)
|
||||
pub const R_NIL: &[u8] = "*!1\n1\n".as_bytes();
|
||||
/// Response code: 2 (Overwrite Error)
|
||||
pub const R_OVERWRITE_ERR: &[u8] = "*!1\n2\n".as_bytes();
|
||||
/// Response code: 3 (Action Error)
|
||||
pub const R_ACTION_ERR: &[u8] = "*!1\n3\n".as_bytes();
|
||||
/// Response code: 4 (Packet Error)
|
||||
pub const R_PACKET_ERR: &[u8] = "*!1\n4\n".as_bytes();
|
||||
/// Response code: 5 (Server Error)
|
||||
pub const R_SERVER_ERR: &[u8] = "*!1\n5\n".as_bytes();
|
||||
/// Response code: 6 (Other Error _without description_)
|
||||
pub const R_OTHER_ERR_EMPTY: &[u8] = "*!1\n6\n".as_bytes();
|
||||
/// Response code: 7; wrongtype
|
||||
pub const R_WRONGTYPE_ERR: &[u8] = "*!1\n7".as_bytes();
|
||||
/// Response code: 8; unknown data type
|
||||
pub const R_UNKNOWN_DATA_TYPE: &[u8] = "*!1\n8\n".as_bytes();
|
||||
/// A heya response
|
||||
pub const R_HEYA: &[u8] = "*+4\nHEY!\n".as_bytes();
|
||||
/// An other response with description: "Unknown action"
|
||||
pub const R_UNKNOWN_ACTION: &[u8] = "*!14\nUnknown action\n".as_bytes();
|
||||
/// A 0 uint64 reply
|
||||
pub const R_ONE_INT_REPLY: &[u8] = "*:1\n1\n".as_bytes();
|
||||
/// A 1 uint64 reply
|
||||
pub const R_ZERO_INT_REPLY: &[u8] = "*:1\n0\n".as_bytes();
|
||||
/// Snapshot busy (other error)
|
||||
pub const R_SNAPSHOT_BUSY: &[u8] = "*!17\nerr-snapshot-busy\n".as_bytes();
|
||||
/// Snapshot disabled (other error)
|
||||
pub const R_SNAPSHOT_DISABLED: &[u8] = "*!21\nerr-snapshot-disabled\n".as_bytes();
|
||||
/// Snapshot has illegal name (other error)
|
||||
pub const R_SNAPSHOT_ILLEGAL_NAME: &[u8] = "*!25\nerr-invalid-snapshot-name\n".as_bytes();
|
||||
/// Access after termination signal (other error)
|
||||
pub const R_ERR_ACCESS_AFTER_TERMSIG: &[u8] = "*!24\nerr-access-after-termsig\n".as_bytes();
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Created on Mon May 02 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
extern crate test;
|
||||
use super::{super::Query, Parser};
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn simple_query(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n";
|
||||
let expected = vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()];
|
||||
b.iter(|| {
|
||||
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
|
||||
assert_eq!(forward, PAYLOAD.len());
|
||||
let query = if let Query::Simple(sq) = query {
|
||||
sq
|
||||
} else {
|
||||
panic!("Got pipeline instead of simple query");
|
||||
};
|
||||
let ret: Vec<String> = query
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|s| String::from_utf8_lossy(s.as_slice()).to_string())
|
||||
.collect();
|
||||
assert_eq!(ret, expected)
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn pipelined_query(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n";
|
||||
let expected = vec![
|
||||
vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()],
|
||||
vec!["GET".to_owned(), "x".to_owned()],
|
||||
];
|
||||
b.iter(|| {
|
||||
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
|
||||
assert_eq!(forward, PAYLOAD.len());
|
||||
let query = if let Query::Pipelined(sq) = query {
|
||||
sq
|
||||
} else {
|
||||
panic!("Got simple instead of pipeline query");
|
||||
};
|
||||
let ret: Vec<Vec<String>> = query
|
||||
.into_inner()
|
||||
.iter()
|
||||
.map(|query| {
|
||||
query
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
|
||||
.collect()
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(ret, expected)
|
||||
});
|
||||
}
|
@ -0,0 +1,288 @@
|
||||
/*
|
||||
* Created on Mon May 02 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
crate::{
|
||||
corestore::buffers::Integer64,
|
||||
dbnet::connection::{QueryWithAdvance, RawConnection, Stream},
|
||||
protocol::{
|
||||
interface::{ProtocolRead, ProtocolSpec, ProtocolWrite},
|
||||
ParseError, Skyhash1,
|
||||
},
|
||||
util::FutureResult,
|
||||
IoResult,
|
||||
},
|
||||
::sky_macros::compiled_eresp_bytes_v1 as eresp,
|
||||
tokio::io::AsyncWriteExt,
|
||||
};
|
||||
|
||||
impl ProtocolSpec for Skyhash1 {
|
||||
// spec information
|
||||
const PROTOCOL_VERSION: f32 = 1.0;
|
||||
const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-1.0";
|
||||
|
||||
// type symbols
|
||||
const TSYMBOL_STRING: u8 = b'+';
|
||||
const TSYMBOL_BINARY: u8 = b'?';
|
||||
const TSYMBOL_FLOAT: u8 = b'%';
|
||||
const TSYMBOL_INT64: u8 = b':';
|
||||
const TSYMBOL_TYPED_ARRAY: u8 = b'@';
|
||||
const TSYMBOL_TYPED_NON_NULL_ARRAY: u8 = b'^';
|
||||
const TSYMBOL_ARRAY: u8 = b'&';
|
||||
const TSYMBOL_FLAT_ARRAY: u8 = b'_';
|
||||
|
||||
// typed array
|
||||
const TYPE_TYPED_ARRAY_ELEMENT_NULL: &'static [u8] = b"\0";
|
||||
|
||||
// metaframe
|
||||
const SIMPLE_QUERY_HEADER: &'static [u8] = b"*";
|
||||
const PIPELINED_QUERY_FIRST_BYTE: u8 = b'$';
|
||||
|
||||
// respcodes
|
||||
const RCODE_OKAY: &'static [u8] = eresp!("0");
|
||||
const RCODE_NIL: &'static [u8] = eresp!("1");
|
||||
const RCODE_OVERWRITE_ERR: &'static [u8] = eresp!("2");
|
||||
const RCODE_ACTION_ERR: &'static [u8] = eresp!("3");
|
||||
const RCODE_PACKET_ERR: &'static [u8] = eresp!("4");
|
||||
const RCODE_SERVER_ERR: &'static [u8] = eresp!("5");
|
||||
const RCODE_OTHER_ERR_EMPTY: &'static [u8] = eresp!("6");
|
||||
const RCODE_UNKNOWN_ACTION: &'static [u8] = eresp!("Unknown action");
|
||||
const RCODE_WRONGTYPE_ERR: &'static [u8] = eresp!("7");
|
||||
const RCODE_UNKNOWN_DATA_TYPE: &'static [u8] = eresp!("8");
|
||||
const RCODE_ENCODING_ERROR: &'static [u8] = eresp!("9");
|
||||
|
||||
// respstrings
|
||||
const RSTRING_SNAPSHOT_BUSY: &'static [u8] = eresp!("err-snapshot-busy");
|
||||
const RSTRING_SNAPSHOT_DISABLED: &'static [u8] = eresp!("err-snapshot-disabled");
|
||||
const RSTRING_SNAPSHOT_DUPLICATE: &'static [u8] = eresp!("duplicate-snapshot");
|
||||
const RSTRING_SNAPSHOT_ILLEGAL_NAME: &'static [u8] = eresp!("err-invalid-snapshot-name");
|
||||
const RSTRING_ERR_ACCESS_AFTER_TERMSIG: &'static [u8] = eresp!("err-access-after-termsig");
|
||||
|
||||
// keyspace related resps
|
||||
const RSTRING_DEFAULT_UNSET: &'static [u8] = eresp!("default-container-unset");
|
||||
const RSTRING_CONTAINER_NOT_FOUND: &'static [u8] = eresp!("container-not-found");
|
||||
const RSTRING_STILL_IN_USE: &'static [u8] = eresp!("still-in-use");
|
||||
const RSTRING_PROTECTED_OBJECT: &'static [u8] = eresp!("err-protected-object");
|
||||
const RSTRING_WRONG_MODEL: &'static [u8] = eresp!("wrong-model");
|
||||
const RSTRING_ALREADY_EXISTS: &'static [u8] = eresp!("err-already-exists");
|
||||
const RSTRING_NOT_READY: &'static [u8] = eresp!("not-ready");
|
||||
const RSTRING_DDL_TRANSACTIONAL_FAILURE: &'static [u8] = eresp!("transactional-failure");
|
||||
const RSTRING_UNKNOWN_DDL_QUERY: &'static [u8] = eresp!("unknown-ddl-query");
|
||||
const RSTRING_BAD_EXPRESSION: &'static [u8] = eresp!("malformed-expression");
|
||||
const RSTRING_UNKNOWN_MODEL: &'static [u8] = eresp!("unknown-model");
|
||||
const RSTRING_TOO_MANY_ARGUMENTS: &'static [u8] = eresp!("too-many-args");
|
||||
const RSTRING_CONTAINER_NAME_TOO_LONG: &'static [u8] = eresp!("container-name-too-long");
|
||||
const RSTRING_BAD_CONTAINER_NAME: &'static [u8] = eresp!("bad-container-name");
|
||||
const RSTRING_UNKNOWN_INSPECT_QUERY: &'static [u8] = eresp!("unknown-inspect-query");
|
||||
const RSTRING_UNKNOWN_PROPERTY: &'static [u8] = eresp!("unknown-property");
|
||||
const RSTRING_KEYSPACE_NOT_EMPTY: &'static [u8] = eresp!("keyspace-not-empty");
|
||||
const RSTRING_BAD_TYPE_FOR_KEY: &'static [u8] = eresp!("bad-type-for-key");
|
||||
const RSTRING_LISTMAP_BAD_INDEX: &'static [u8] = eresp!("bad-list-index");
|
||||
const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty");
|
||||
|
||||
// elements
|
||||
const ELEMRESP_HEYA: &'static [u8] = b"+4\nHEY!\n";
|
||||
|
||||
// full responses
|
||||
const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*1\n!1\n4\n";
|
||||
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*1\n!1\n7\n";
|
||||
|
||||
// auth rcodes/strings
|
||||
const AUTH_ERROR_ALREADYCLAIMED: &'static [u8] = eresp!("err-auth-already-claimed");
|
||||
const AUTH_CODE_BAD_CREDENTIALS: &'static [u8] = eresp!("10");
|
||||
const AUTH_ERROR_DISABLED: &'static [u8] = eresp!("err-auth-disabled");
|
||||
const AUTH_CODE_PERMS: &'static [u8] = eresp!("11");
|
||||
const AUTH_ERROR_ILLEGAL_USERNAME: &'static [u8] = eresp!("err-auth-illegal-username");
|
||||
const AUTH_ERROR_FAILED_TO_DELETE_USER: &'static [u8] = eresp!("err-auth-deluser-fail");
|
||||
}
|
||||
|
||||
impl<Strm, T> ProtocolRead<Skyhash1, Strm> for T
|
||||
where
|
||||
T: RawConnection<Skyhash1, Strm> + Send + Sync,
|
||||
Strm: Stream,
|
||||
{
|
||||
fn try_query(&self) -> Result<QueryWithAdvance, ParseError> {
|
||||
Skyhash1::parse(self.get_buffer())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Strm, T> ProtocolWrite<Skyhash1, Strm> for T
|
||||
where
|
||||
T: RawConnection<Skyhash1, Strm> + Send + Sync,
|
||||
Strm: Stream,
|
||||
{
|
||||
fn write_mono_length_prefixed_with_tsymbol<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
data: &'life1 [u8],
|
||||
tsymbol: u8,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// <tsymbol><length><lf>
|
||||
stream.write_all(&[tsymbol]).await?;
|
||||
stream.write_all(&Integer64::from(data.len())).await?;
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// <data><lf>
|
||||
stream.write_all(data).await?;
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_string<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
string: &'life1 str,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash1::TSYMBOL_STRING]).await?;
|
||||
// length
|
||||
let len_bytes = Integer64::from(string.len());
|
||||
stream.write_all(&len_bytes).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// payload
|
||||
stream.write_all(string.as_bytes()).await?;
|
||||
// final LF
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_binary<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
binary: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash1::TSYMBOL_BINARY]).await?;
|
||||
// length
|
||||
let len_bytes = Integer64::from(binary.len());
|
||||
stream.write_all(&len_bytes).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// payload
|
||||
stream.write_all(binary).await?;
|
||||
// final LF
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_usize<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
size: usize,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self.write_int64(size as _).await })
|
||||
}
|
||||
fn write_int64<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
int: u64,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash1::TSYMBOL_INT64]).await?;
|
||||
// get body and sizeline
|
||||
let body = Integer64::from(int);
|
||||
let body_len = Integer64::from(body.len());
|
||||
// len of body
|
||||
stream.write_all(&body_len).await?;
|
||||
// sizeline LF
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// body
|
||||
stream.write_all(&body).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_float<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
float: f32,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash1::TSYMBOL_FLOAT]).await?;
|
||||
// get body and sizeline
|
||||
let body = float.to_string();
|
||||
let body = body.as_bytes();
|
||||
let sizeline = Integer64::from(body.len());
|
||||
// sizeline
|
||||
stream.write_all(&sizeline).await?;
|
||||
// sizeline LF
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// body
|
||||
stream.write_all(body).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_typed_array_element<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
element: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// len
|
||||
stream.write_all(&Integer64::from(element.len())).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await?;
|
||||
// body
|
||||
stream.write_all(element).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash1::LF]).await
|
||||
})
|
||||
}
|
||||
}
|
@ -0,0 +1,241 @@
|
||||
/*
|
||||
* Created on Sat Apr 30 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
super::{
|
||||
raw_parser::{RawParser, RawParserExt, RawParserMeta},
|
||||
ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice,
|
||||
},
|
||||
crate::{
|
||||
corestore::heap_array::{HeapArray, HeapArrayWriter},
|
||||
dbnet::connection::QueryWithAdvance,
|
||||
},
|
||||
};
|
||||
|
||||
mod interface_impls;
|
||||
// test and bench modules
|
||||
#[cfg(feature = "nightly")]
|
||||
mod benches;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// A parser for Skyhash 1.0
|
||||
///
|
||||
/// Packet structure example (simple query):
|
||||
/// ```text
|
||||
/// *1\n
|
||||
/// ~3\n
|
||||
/// 3\n
|
||||
/// SET\n
|
||||
/// 1\n
|
||||
/// x\n
|
||||
/// 3\n
|
||||
/// 100\n
|
||||
/// ```
|
||||
pub struct Parser {
|
||||
end: *const u8,
|
||||
cursor: *const u8,
|
||||
}
|
||||
|
||||
unsafe impl RawParser for Parser {
|
||||
fn cursor_ptr(&self) -> *const u8 {
|
||||
self.cursor
|
||||
}
|
||||
fn cursor_ptr_mut(&mut self) -> &mut *const u8 {
|
||||
&mut self.cursor
|
||||
}
|
||||
fn data_end_ptr(&self) -> *const u8 {
|
||||
self.end
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for Parser {}
|
||||
unsafe impl Sync for Parser {}
|
||||
|
||||
impl Parser {
|
||||
/// Initialize a new parser
|
||||
fn new(slice: &[u8]) -> Self {
|
||||
unsafe {
|
||||
Self {
|
||||
end: slice.as_ptr().add(slice.len()),
|
||||
cursor: slice.as_ptr(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// utility methods
|
||||
impl Parser {
|
||||
/// Returns true if the cursor will give a char, but if `this_if_nothing_ahead` is set
|
||||
/// to true, then if no byte is ahead, it will still return true
|
||||
fn will_cursor_give_char(&self, ch: u8, true_if_nothing_ahead: bool) -> ParseResult<bool> {
|
||||
if self.exhausted() {
|
||||
// nothing left
|
||||
if true_if_nothing_ahead {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
} else if unsafe { self.get_byte_at_cursor().eq(&ch) } {
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
/// Check if the current cursor will give an LF
|
||||
fn will_cursor_give_linefeed(&self) -> ParseResult<bool> {
|
||||
self.will_cursor_give_char(b'\n', false)
|
||||
}
|
||||
/// Gets the _next element. **The cursor should be at the tsymbol (passed)**
|
||||
fn _next(&mut self) -> ParseResult<UnsafeSlice> {
|
||||
let element_size = self.read_usize()?;
|
||||
self.read_until(element_size)
|
||||
}
|
||||
}
|
||||
|
||||
// higher level abstractions
|
||||
impl Parser {
|
||||
/// Parse the next blob. **The cursor should be at the tsymbol (passed)**
|
||||
fn parse_next_blob(&mut self) -> ParseResult<UnsafeSlice> {
|
||||
{
|
||||
let chunk = self._next()?;
|
||||
if self.will_cursor_give_linefeed()? {
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): We know that the buffer is not exhausted
|
||||
// due to the above condition
|
||||
self.incr_cursor();
|
||||
}
|
||||
Ok(chunk)
|
||||
} else {
|
||||
Err(ParseError::UnexpectedByte)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// query abstractions
|
||||
impl Parser {
|
||||
/// The buffer should resemble the below structure:
|
||||
/// ```
|
||||
/// ~<count>\n
|
||||
/// <e0l0>\n
|
||||
/// <e0>\n
|
||||
/// <e1l1>\n
|
||||
/// <e1>\n
|
||||
/// ...
|
||||
/// ```
|
||||
fn _parse_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
|
||||
if self.not_exhausted() {
|
||||
if unsafe { self.get_byte_at_cursor() } != b'~' {
|
||||
// we need an any array
|
||||
return Err(ParseError::WrongType);
|
||||
}
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): Just checked length
|
||||
self.incr_cursor();
|
||||
}
|
||||
let query_count = self.read_usize()?;
|
||||
let mut writer = HeapArrayWriter::with_capacity(query_count);
|
||||
for i in 0..query_count {
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): The index of the for loop ensures that
|
||||
// we never attempt to write to a bad memory location
|
||||
writer.write_to_index(i, self.parse_next_blob()?);
|
||||
}
|
||||
}
|
||||
Ok(unsafe {
|
||||
// UNSAFE(@ohsayan): If we've reached here, then we have initialized
|
||||
// all the queries
|
||||
writer.finish()
|
||||
})
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
}
|
||||
fn parse_simple_query(&mut self) -> ParseResult<SimpleQuery> {
|
||||
Ok(SimpleQuery::new(self._parse_simple_query()?))
|
||||
}
|
||||
/// The buffer should resemble the following structure:
|
||||
/// ```text
|
||||
/// # query 1
|
||||
/// ~<count>\n
|
||||
/// <e0l0>\n
|
||||
/// <e0>\n
|
||||
/// <e1l1>\n
|
||||
/// <e1>\n
|
||||
/// # query 2
|
||||
/// ~<count>\n
|
||||
/// <e0l0>\n
|
||||
/// <e0>\n
|
||||
/// <e1l1>\n
|
||||
/// <e1>\n
|
||||
/// ...
|
||||
/// ```
|
||||
fn parse_pipelined_query(&mut self, length: usize) -> ParseResult<PipelinedQuery> {
|
||||
let mut writer = HeapArrayWriter::with_capacity(length);
|
||||
for i in 0..length {
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): The above condition guarantees that the index
|
||||
// never causes an overflow
|
||||
writer.write_to_index(i, self._parse_simple_query()?);
|
||||
}
|
||||
}
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): if we reached here, then we have inited everything
|
||||
Ok(PipelinedQuery::new(writer.finish()))
|
||||
}
|
||||
}
|
||||
fn _parse(&mut self) -> ParseResult<Query> {
|
||||
if self.not_exhausted() {
|
||||
let first_byte = unsafe {
|
||||
// UNSAFE(@ohsayan): Just checked if buffer is exhausted or not
|
||||
self.get_byte_at_cursor()
|
||||
};
|
||||
if first_byte != b'*' {
|
||||
// unknown query scheme, so it's a bad packet
|
||||
return Err(ParseError::BadPacket);
|
||||
}
|
||||
unsafe {
|
||||
// UNSAFE(@ohsayan): Checked buffer len and incremented, so we're good
|
||||
self.incr_cursor()
|
||||
};
|
||||
let query_count = self.read_usize()?; // get the length
|
||||
if query_count == 1 {
|
||||
Ok(Query::Simple(self.parse_simple_query()?))
|
||||
} else {
|
||||
Ok(Query::Pipelined(self.parse_pipelined_query(query_count)?))
|
||||
}
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
}
|
||||
pub fn parse(buf: &[u8]) -> ParseResult<QueryWithAdvance> {
|
||||
let mut slf = Self::new(buf);
|
||||
let body = slf._parse()?;
|
||||
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
|
||||
Ok((body, consumed))
|
||||
}
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Created on Mon May 02 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
super::Parser,
|
||||
crate::protocol::{ParseError, Query},
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
const SQPAYLOAD: &[u8] = b"*1\n~3\n3\nSET\n1\nx\n3\n100\n";
|
||||
#[cfg(test)]
|
||||
const PQPAYLOAD: &[u8] = b"*2\n~3\n3\nSET\n1\nx\n3\n100\n~2\n3\nGET\n1\nx\n";
|
||||
|
||||
#[test]
|
||||
fn parse_simple_query() {
|
||||
let payload = SQPAYLOAD.to_vec();
|
||||
let (q, f) = Parser::parse(&payload).unwrap();
|
||||
let q: Vec<String> = if let Query::Simple(q) = q {
|
||||
q.as_slice()
|
||||
.iter()
|
||||
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
|
||||
.collect()
|
||||
} else {
|
||||
panic!("Expected simple query")
|
||||
};
|
||||
assert_eq!(f, payload.len());
|
||||
assert_eq!(q, vec!["SET".to_owned(), "x".into(), "100".into()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_simple_query_incomplete() {
|
||||
for i in 0..SQPAYLOAD.len() - 1 {
|
||||
let slice = &SQPAYLOAD[..i];
|
||||
assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_pipelined_query() {
|
||||
let payload = PQPAYLOAD.to_vec();
|
||||
let (q, f) = Parser::parse(&payload).unwrap();
|
||||
let q: Vec<Vec<String>> = if let Query::Pipelined(q) = q {
|
||||
q.into_inner()
|
||||
.iter()
|
||||
.map(|sq| {
|
||||
sq.iter()
|
||||
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
|
||||
.collect()
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
panic!("Expected pipelined query query")
|
||||
};
|
||||
assert_eq!(f, payload.len());
|
||||
assert_eq!(
|
||||
q,
|
||||
vec![
|
||||
vec!["SET".to_owned(), "x".into(), "100".into()],
|
||||
vec!["GET".into(), "x".into()]
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_pipelined_query_incomplete() {
|
||||
for i in 0..PQPAYLOAD.len() - 1 {
|
||||
let slice = &PQPAYLOAD[..i];
|
||||
assert_eq!(Parser::parse(slice).unwrap_err(), ParseError::NotEnough);
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Created on Sat Apr 30 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
extern crate test;
|
||||
use super::{super::Query, Parser};
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn simple_query(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"*3\n3\nSET1\nx3\n100";
|
||||
let expected = vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()];
|
||||
b.iter(|| {
|
||||
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
|
||||
assert_eq!(forward, PAYLOAD.len());
|
||||
let query = if let Query::Simple(sq) = query {
|
||||
sq
|
||||
} else {
|
||||
panic!("Got pipeline instead of simple query");
|
||||
};
|
||||
let ret: Vec<String> = query
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|s| String::from_utf8_lossy(s.as_slice()).to_string())
|
||||
.collect();
|
||||
assert_eq!(ret, expected)
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn pipelined_query(b: &mut Bencher) {
|
||||
const PAYLOAD: &[u8] = b"$2\n3\n3\nSET1\nx3\n1002\n3\nGET1\nx";
|
||||
let expected = vec![
|
||||
vec!["SET".to_owned(), "x".to_owned(), "100".to_owned()],
|
||||
vec!["GET".to_owned(), "x".to_owned()],
|
||||
];
|
||||
b.iter(|| {
|
||||
let (query, forward) = Parser::parse(PAYLOAD).unwrap();
|
||||
assert_eq!(forward, PAYLOAD.len());
|
||||
let query = if let Query::Pipelined(sq) = query {
|
||||
sq
|
||||
} else {
|
||||
panic!("Got simple instead of pipeline query");
|
||||
};
|
||||
let ret: Vec<Vec<String>> = query
|
||||
.into_inner()
|
||||
.iter()
|
||||
.map(|query| {
|
||||
query
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|v| String::from_utf8_lossy(v.as_slice()).to_string())
|
||||
.collect()
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(ret, expected)
|
||||
});
|
||||
}
|
@ -0,0 +1,263 @@
|
||||
/*
|
||||
* Created on Sat Apr 30 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use crate::{
|
||||
corestore::buffers::Integer64,
|
||||
dbnet::connection::{QueryWithAdvance, RawConnection, Stream},
|
||||
protocol::{
|
||||
interface::{ProtocolRead, ProtocolSpec, ProtocolWrite},
|
||||
ParseError, Skyhash2,
|
||||
},
|
||||
util::FutureResult,
|
||||
IoResult,
|
||||
};
|
||||
use ::sky_macros::compiled_eresp_bytes as eresp;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
impl ProtocolSpec for Skyhash2 {
|
||||
// spec information
|
||||
const PROTOCOL_VERSION: f32 = 2.0;
|
||||
const PROTOCOL_VERSIONSTRING: &'static str = "Skyhash-2.0";
|
||||
|
||||
// type symbols
|
||||
const TSYMBOL_STRING: u8 = b'+';
|
||||
const TSYMBOL_BINARY: u8 = b'?';
|
||||
const TSYMBOL_FLOAT: u8 = b'%';
|
||||
const TSYMBOL_INT64: u8 = b':';
|
||||
const TSYMBOL_TYPED_ARRAY: u8 = b'@';
|
||||
const TSYMBOL_TYPED_NON_NULL_ARRAY: u8 = b'^';
|
||||
const TSYMBOL_ARRAY: u8 = b'&';
|
||||
const TSYMBOL_FLAT_ARRAY: u8 = b'_';
|
||||
|
||||
// typed array
|
||||
const TYPE_TYPED_ARRAY_ELEMENT_NULL: &'static [u8] = b"\0";
|
||||
|
||||
// metaframe
|
||||
const SIMPLE_QUERY_HEADER: &'static [u8] = b"*";
|
||||
const PIPELINED_QUERY_FIRST_BYTE: u8 = b'$';
|
||||
|
||||
// respcodes
|
||||
const RCODE_OKAY: &'static [u8] = eresp!("0");
|
||||
const RCODE_NIL: &'static [u8] = eresp!("1");
|
||||
const RCODE_OVERWRITE_ERR: &'static [u8] = eresp!("2");
|
||||
const RCODE_ACTION_ERR: &'static [u8] = eresp!("3");
|
||||
const RCODE_PACKET_ERR: &'static [u8] = eresp!("4");
|
||||
const RCODE_SERVER_ERR: &'static [u8] = eresp!("5");
|
||||
const RCODE_OTHER_ERR_EMPTY: &'static [u8] = eresp!("6");
|
||||
const RCODE_UNKNOWN_ACTION: &'static [u8] = eresp!("Unknown action");
|
||||
const RCODE_WRONGTYPE_ERR: &'static [u8] = eresp!("7");
|
||||
const RCODE_UNKNOWN_DATA_TYPE: &'static [u8] = eresp!("8");
|
||||
const RCODE_ENCODING_ERROR: &'static [u8] = eresp!("9");
|
||||
|
||||
// respstrings
|
||||
const RSTRING_SNAPSHOT_BUSY: &'static [u8] = eresp!("err-snapshot-busy");
|
||||
const RSTRING_SNAPSHOT_DISABLED: &'static [u8] = eresp!("err-snapshot-disabled");
|
||||
const RSTRING_SNAPSHOT_DUPLICATE: &'static [u8] = eresp!("duplicate-snapshot");
|
||||
const RSTRING_SNAPSHOT_ILLEGAL_NAME: &'static [u8] = eresp!("err-invalid-snapshot-name");
|
||||
const RSTRING_ERR_ACCESS_AFTER_TERMSIG: &'static [u8] = eresp!("err-access-after-termsig");
|
||||
|
||||
// keyspace related resps
|
||||
const RSTRING_DEFAULT_UNSET: &'static [u8] = eresp!("default-container-unset");
|
||||
const RSTRING_CONTAINER_NOT_FOUND: &'static [u8] = eresp!("container-not-found");
|
||||
const RSTRING_STILL_IN_USE: &'static [u8] = eresp!("still-in-use");
|
||||
const RSTRING_PROTECTED_OBJECT: &'static [u8] = eresp!("err-protected-object");
|
||||
const RSTRING_WRONG_MODEL: &'static [u8] = eresp!("wrong-model");
|
||||
const RSTRING_ALREADY_EXISTS: &'static [u8] = eresp!("err-already-exists");
|
||||
const RSTRING_NOT_READY: &'static [u8] = eresp!("not-ready");
|
||||
const RSTRING_DDL_TRANSACTIONAL_FAILURE: &'static [u8] = eresp!("transactional-failure");
|
||||
const RSTRING_UNKNOWN_DDL_QUERY: &'static [u8] = eresp!("unknown-ddl-query");
|
||||
const RSTRING_BAD_EXPRESSION: &'static [u8] = eresp!("malformed-expression");
|
||||
const RSTRING_UNKNOWN_MODEL: &'static [u8] = eresp!("unknown-model");
|
||||
const RSTRING_TOO_MANY_ARGUMENTS: &'static [u8] = eresp!("too-many-args");
|
||||
const RSTRING_CONTAINER_NAME_TOO_LONG: &'static [u8] = eresp!("container-name-too-long");
|
||||
const RSTRING_BAD_CONTAINER_NAME: &'static [u8] = eresp!("bad-container-name");
|
||||
const RSTRING_UNKNOWN_INSPECT_QUERY: &'static [u8] = eresp!("unknown-inspect-query");
|
||||
const RSTRING_UNKNOWN_PROPERTY: &'static [u8] = eresp!("unknown-property");
|
||||
const RSTRING_KEYSPACE_NOT_EMPTY: &'static [u8] = eresp!("keyspace-not-empty");
|
||||
const RSTRING_BAD_TYPE_FOR_KEY: &'static [u8] = eresp!("bad-type-for-key");
|
||||
const RSTRING_LISTMAP_BAD_INDEX: &'static [u8] = eresp!("bad-list-index");
|
||||
const RSTRING_LISTMAP_LIST_IS_EMPTY: &'static [u8] = eresp!("list-is-empty");
|
||||
|
||||
// elements
|
||||
const ELEMRESP_HEYA: &'static [u8] = b"+4\nHEY!";
|
||||
|
||||
// full responses
|
||||
const FULLRESP_RCODE_PACKET_ERR: &'static [u8] = b"*!4\n";
|
||||
const FULLRESP_RCODE_WRONG_TYPE: &'static [u8] = b"*!7\n";
|
||||
|
||||
// auth respcodes/strings
|
||||
const AUTH_ERROR_ALREADYCLAIMED: &'static [u8] = eresp!("err-auth-already-claimed");
|
||||
const AUTH_CODE_BAD_CREDENTIALS: &'static [u8] = eresp!("10");
|
||||
const AUTH_ERROR_DISABLED: &'static [u8] = eresp!("err-auth-disabled");
|
||||
const AUTH_CODE_PERMS: &'static [u8] = eresp!("11");
|
||||
const AUTH_ERROR_ILLEGAL_USERNAME: &'static [u8] = eresp!("err-auth-illegal-username");
|
||||
const AUTH_ERROR_FAILED_TO_DELETE_USER: &'static [u8] = eresp!("err-auth-deluser-fail");
|
||||
}
|
||||
|
||||
impl<Strm, T> ProtocolRead<Skyhash2, Strm> for T
|
||||
where
|
||||
T: RawConnection<Skyhash2, Strm> + Send + Sync,
|
||||
Strm: Stream,
|
||||
{
|
||||
fn try_query(&self) -> Result<QueryWithAdvance, ParseError> {
|
||||
Skyhash2::parse(self.get_buffer())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Strm, T> ProtocolWrite<Skyhash2, Strm> for T
|
||||
where
|
||||
T: RawConnection<Skyhash2, Strm> + Send + Sync,
|
||||
Strm: Stream,
|
||||
{
|
||||
fn write_mono_length_prefixed_with_tsymbol<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
data: &'life1 [u8],
|
||||
tsymbol: u8,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: Send + 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// <tsymbol><length><lf>
|
||||
stream.write_all(&[tsymbol]).await?;
|
||||
stream.write_all(&Integer64::from(data.len())).await?;
|
||||
stream.write_all(&[Skyhash2::LF]).await?;
|
||||
stream.write_all(data).await
|
||||
})
|
||||
}
|
||||
fn write_string<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
string: &'life1 str,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash2::TSYMBOL_STRING]).await?;
|
||||
// length
|
||||
let len_bytes = Integer64::from(string.len());
|
||||
stream.write_all(&len_bytes).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash2::LF]).await?;
|
||||
// payload
|
||||
stream.write_all(string.as_bytes()).await
|
||||
})
|
||||
}
|
||||
fn write_binary<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
binary: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash2::TSYMBOL_BINARY]).await?;
|
||||
// length
|
||||
let len_bytes = Integer64::from(binary.len());
|
||||
stream.write_all(&len_bytes).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash2::LF]).await?;
|
||||
// payload
|
||||
stream.write_all(binary).await
|
||||
})
|
||||
}
|
||||
fn write_usize<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
size: usize,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move { self.write_int64(size as _).await })
|
||||
}
|
||||
fn write_int64<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
int: u64,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash2::TSYMBOL_INT64]).await?;
|
||||
// body
|
||||
stream.write_all(&Integer64::from(int)).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash2::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_float<'life0, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
float: f32,
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// tsymbol
|
||||
stream.write_all(&[Skyhash2::TSYMBOL_FLOAT]).await?;
|
||||
// body
|
||||
stream.write_all(float.to_string().as_bytes()).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash2::LF]).await
|
||||
})
|
||||
}
|
||||
fn write_typed_array_element<'life0, 'life1, 'ret_life>(
|
||||
&'life0 mut self,
|
||||
element: &'life1 [u8],
|
||||
) -> FutureResult<'ret_life, IoResult<()>>
|
||||
where
|
||||
'life0: 'ret_life,
|
||||
'life1: 'ret_life,
|
||||
Self: 'ret_life,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let stream = self.get_mut_stream();
|
||||
// len
|
||||
stream.write_all(&Integer64::from(element.len())).await?;
|
||||
// LF
|
||||
stream.write_all(&[Skyhash2::LF]).await?;
|
||||
// body
|
||||
stream.write_all(element).await
|
||||
})
|
||||
}
|
||||
}
|
@ -0,0 +1,186 @@
|
||||
/*
|
||||
* Created on Fri Apr 29 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
mod interface_impls;
|
||||
|
||||
use {
|
||||
super::{
|
||||
raw_parser::{RawParser, RawParserExt, RawParserMeta},
|
||||
ParseError, ParseResult, PipelinedQuery, Query, SimpleQuery, UnsafeSlice,
|
||||
},
|
||||
crate::{corestore::heap_array::HeapArray, dbnet::connection::QueryWithAdvance},
|
||||
};
|
||||
|
||||
#[cfg(feature = "nightly")]
|
||||
mod benches;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// A parser for Skyhash 2.0
|
||||
pub struct Parser {
|
||||
end: *const u8,
|
||||
cursor: *const u8,
|
||||
}
|
||||
|
||||
unsafe impl RawParser for Parser {
|
||||
fn cursor_ptr(&self) -> *const u8 {
|
||||
self.cursor
|
||||
}
|
||||
fn cursor_ptr_mut(&mut self) -> &mut *const u8 {
|
||||
&mut self.cursor
|
||||
}
|
||||
fn data_end_ptr(&self) -> *const u8 {
|
||||
self.end
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Sync for Parser {}
|
||||
unsafe impl Send for Parser {}
|
||||
|
||||
impl Parser {
|
||||
/// Initialize a new parser
|
||||
fn new(slice: &[u8]) -> Self {
|
||||
unsafe {
|
||||
Self {
|
||||
end: slice.as_ptr().add(slice.len()),
|
||||
cursor: slice.as_ptr(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// query impls
|
||||
impl Parser {
|
||||
/// Parse the next simple query. This should have passed the `*` tsymbol
|
||||
///
|
||||
/// Simple query structure (tokenized line-by-line):
|
||||
/// ```text
|
||||
/// * -> Simple Query Header
|
||||
/// <n>\n -> Count of elements in the simple query
|
||||
/// <l0>\n -> Length of element 1
|
||||
/// <e0> -> element 1 itself
|
||||
/// <l1>\n -> Length of element 2
|
||||
/// <e1> -> element 2 itself
|
||||
/// ...
|
||||
/// ```
|
||||
fn _next_simple_query(&mut self) -> ParseResult<HeapArray<UnsafeSlice>> {
|
||||
let element_count = self.read_usize()?;
|
||||
unsafe {
|
||||
let mut data = HeapArray::new_writer(element_count);
|
||||
for i in 0..element_count {
|
||||
let element_size = self.read_usize()?;
|
||||
let element = self.read_until(element_size)?;
|
||||
data.write_to_index(i, element);
|
||||
}
|
||||
Ok(data.finish())
|
||||
}
|
||||
}
|
||||
/// Parse a simple query
|
||||
fn next_simple_query(&mut self) -> ParseResult<SimpleQuery> {
|
||||
Ok(SimpleQuery::new(self._next_simple_query()?))
|
||||
}
|
||||
/// Parse a pipelined query. This should have passed the `$` tsymbol
|
||||
///
|
||||
/// Pipelined query structure (tokenized line-by-line):
|
||||
/// ```text
|
||||
/// $ -> Pipeline
|
||||
/// <n>\n -> Pipeline has n queries
|
||||
/// <lq0>\n -> Query 1 has 3 elements
|
||||
/// <lq0e0>\n -> Q1E1 has 3 bytes
|
||||
/// <q0e0> -> Q1E1 itself
|
||||
/// <lq0e1>\n -> Q1E2 has 1 byte
|
||||
/// <q0e1> -> Q1E2 itself
|
||||
/// <lq0e2>\n -> Q1E3 has 3 bytes
|
||||
/// <q0e2> -> Q1E3 itself
|
||||
/// <lq1>\n -> Query 2 has 2 elements
|
||||
/// <lq1e0>\n -> Q2E1 has 3 bytes
|
||||
/// <q1e0> -> Q2E1 itself
|
||||
/// <lq1e1>\n -> Q2E2 has 1 byte
|
||||
/// <q1e1> -> Q2E2 itself
|
||||
/// ...
|
||||
/// ```
|
||||
///
|
||||
/// Example:
|
||||
/// ```text
|
||||
/// $ -> Pipeline
|
||||
/// 2\n -> Pipeline has 2 queries
|
||||
/// 3\n -> Query 1 has 3 elements
|
||||
/// 3\n -> Q1E1 has 3 bytes
|
||||
/// SET -> Q1E1 itself
|
||||
/// 1\n -> Q1E2 has 1 byte
|
||||
/// x -> Q1E2 itself
|
||||
/// 3\n -> Q1E3 has 3 bytes
|
||||
/// 100 -> Q1E3 itself
|
||||
/// 2\n -> Query 2 has 2 elements
|
||||
/// 3\n -> Q2E1 has 3 bytes
|
||||
/// GET -> Q2E1 itself
|
||||
/// 1\n -> Q2E2 has 1 byte
|
||||
/// x -> Q2E2 itself
|
||||
/// ```
|
||||
fn next_pipeline(&mut self) -> ParseResult<PipelinedQuery> {
|
||||
let query_count = self.read_usize()?;
|
||||
unsafe {
|
||||
let mut queries = HeapArray::new_writer(query_count);
|
||||
for i in 0..query_count {
|
||||
let sq = self._next_simple_query()?;
|
||||
queries.write_to_index(i, sq);
|
||||
}
|
||||
Ok(PipelinedQuery {
|
||||
data: queries.finish(),
|
||||
})
|
||||
}
|
||||
}
|
||||
fn _parse(&mut self) -> ParseResult<Query> {
|
||||
if self.not_exhausted() {
|
||||
unsafe {
|
||||
let first_byte = self.get_byte_at_cursor();
|
||||
self.incr_cursor();
|
||||
let data = match first_byte {
|
||||
b'*' => {
|
||||
// a simple query
|
||||
Query::Simple(self.next_simple_query()?)
|
||||
}
|
||||
b'$' => {
|
||||
// a pipelined query
|
||||
Query::Pipelined(self.next_pipeline()?)
|
||||
}
|
||||
_ => return Err(ParseError::UnexpectedByte),
|
||||
};
|
||||
Ok(data)
|
||||
}
|
||||
} else {
|
||||
Err(ParseError::NotEnough)
|
||||
}
|
||||
}
|
||||
// only expose this. don't expose Self::new since that'll be _relatively easier_ to
|
||||
// invalidate invariants for
|
||||
pub fn parse(buf: &[u8]) -> ParseResult<QueryWithAdvance> {
|
||||
let mut slf = Self::new(buf);
|
||||
let body = slf._parse()?;
|
||||
let consumed = slf.cursor_ptr() as usize - buf.as_ptr() as usize;
|
||||
Ok((body, consumed))
|
||||
}
|
||||
}
|
@ -1,226 +0,0 @@
|
||||
/*
|
||||
* Created on Mon Aug 17 2020
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#![allow(clippy::needless_lifetimes)]
|
||||
|
||||
//! Utilities for generating responses, which are only used by the `server`
|
||||
//!
|
||||
use crate::corestore::buffers::Integer64;
|
||||
use crate::corestore::memstore::ObjectID;
|
||||
use crate::util::FutureResult;
|
||||
use bytes::Bytes;
|
||||
use std::io::Error as IoError;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
pub mod writer;
|
||||
|
||||
pub const TSYMBOL_UNICODE_STRING: u8 = b'+';
|
||||
pub const TSYMBOL_FLOAT: u8 = b'%';
|
||||
|
||||
type FutureIoResult<'s> = FutureResult<'s, Result<(), IoError>>;
|
||||
|
||||
/// # The `Writable` trait
|
||||
/// All trait implementors are given access to an asynchronous stream to which
|
||||
/// they must write a response.
|
||||
///
|
||||
/// Every `write()` call makes a call to the [`IsConnection`](./IsConnection)'s
|
||||
/// `write_lowlevel` function, which in turn writes something to the underlying stream.
|
||||
///
|
||||
/// Do note that this write **doesn't gurantee immediate completion** as the underlying
|
||||
/// stream might use buffering. So, the best idea would be to use to use the `flush()`
|
||||
/// call on the stream.
|
||||
pub trait Writable {
|
||||
/*
|
||||
HACK(@ohsayan): Since `async` is not supported in traits just yet, we will have to
|
||||
use explicit declarations for asynchoronous functions
|
||||
*/
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s>;
|
||||
}
|
||||
|
||||
pub trait IsConnection: std::marker::Sync + std::marker::Send {
|
||||
fn write_lowlevel<'s>(&'s mut self, bytes: &'s [u8]) -> FutureIoResult<'s>;
|
||||
}
|
||||
|
||||
impl<T> IsConnection for T
|
||||
where
|
||||
T: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
|
||||
{
|
||||
fn write_lowlevel<'s>(&'s mut self, bytes: &'s [u8]) -> FutureIoResult<'s> {
|
||||
Box::pin(self.write_all(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
/// A `BytesWrapper` object wraps around a `Bytes` object that might have been pulled
|
||||
/// from `Corestore`.
|
||||
///
|
||||
/// This wrapper exists to prevent trait implementation conflicts when
|
||||
/// an impl for `fmt::Display` may be implemented upstream
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct BytesWrapper(pub Bytes);
|
||||
|
||||
impl BytesWrapper {
|
||||
pub fn finish_into_bytes(self) -> Bytes {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct StringWrapper(pub String);
|
||||
|
||||
impl Writable for StringWrapper {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
con.write_lowlevel(&[TSYMBOL_UNICODE_STRING]).await?;
|
||||
// Now get the size of the Bytes object as bytes
|
||||
let size = Integer64::from(self.0.len());
|
||||
// Write this to the stream
|
||||
con.write_lowlevel(&size).await?;
|
||||
// Now write a LF character
|
||||
con.write_lowlevel(&[b'\n']).await?;
|
||||
// Now write the REAL bytes (of the object)
|
||||
con.write_lowlevel(self.0.as_bytes()).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for Vec<u8> {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move { con.write_lowlevel(&self).await })
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> Writable for [u8; N] {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move { con.write_lowlevel(&self).await })
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for &'static [u8] {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move { con.write_lowlevel(self).await })
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for &'static str {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
// First write a `+` character to the stream since this is a
|
||||
// string (we represent `String`s as `Byte` objects internally)
|
||||
// and since `Bytes` are effectively `String`s we will append the
|
||||
// type operator `+` to the stream
|
||||
con.write_lowlevel(&[TSYMBOL_UNICODE_STRING]).await?;
|
||||
// Now get the size of the Bytes object as bytes
|
||||
let size = Integer64::from(self.len());
|
||||
// Write this to the stream
|
||||
con.write_lowlevel(&size).await?;
|
||||
// Now write a LF character
|
||||
con.write_lowlevel(&[b'\n']).await?;
|
||||
// Now write the REAL bytes (of the object)
|
||||
con.write_lowlevel(self.as_bytes()).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for BytesWrapper {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
// First write a `+` character to the stream since this is a
|
||||
// string (we represent `String`s as `Byte` objects internally)
|
||||
// and since `Bytes` are effectively `String`s we will append the
|
||||
// type operator `+` to the stream
|
||||
let bytes = self.finish_into_bytes();
|
||||
con.write_lowlevel(&[TSYMBOL_UNICODE_STRING]).await?;
|
||||
// Now get the size of the Bytes object as bytes
|
||||
let size = Integer64::from(bytes.len());
|
||||
// Write this to the stream
|
||||
con.write_lowlevel(&size).await?;
|
||||
// Now write a LF character
|
||||
con.write_lowlevel(&[b'\n']).await?;
|
||||
// Now write the REAL bytes (of the object)
|
||||
con.write_lowlevel(&bytes).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for usize {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
con.write_lowlevel(b":").await?;
|
||||
let usize_bytes = Integer64::from(self);
|
||||
con.write_lowlevel(&usize_bytes).await?;
|
||||
con.write_lowlevel(b"\n").await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for u64 {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
con.write_lowlevel(b":").await?;
|
||||
let usize_bytes = Integer64::from(self);
|
||||
con.write_lowlevel(&usize_bytes).await?;
|
||||
con.write_lowlevel(b"\n").await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for ObjectID {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
// First write a `+` character to the stream since this is a
|
||||
// string (we represent `String`s as `Byte` objects internally)
|
||||
// and since `Bytes` are effectively `String`s we will append the
|
||||
// type operator `+` to the stream
|
||||
con.write_lowlevel(&[TSYMBOL_UNICODE_STRING]).await?;
|
||||
// Now get the size of the Bytes object as bytes
|
||||
let size = Integer64::from(self.len());
|
||||
// Write this to the stream
|
||||
con.write_lowlevel(&size).await?;
|
||||
// Now write a LF character
|
||||
con.write_lowlevel(&[b'\n']).await?;
|
||||
// Now write the REAL bytes (of the object)
|
||||
con.write_lowlevel(&self).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Writable for f32 {
|
||||
fn write<'s>(self, con: &'s mut impl IsConnection) -> FutureIoResult<'s> {
|
||||
Box::pin(async move {
|
||||
let payload = self.to_string();
|
||||
con.write_lowlevel(&[TSYMBOL_FLOAT]).await?;
|
||||
con.write_lowlevel(payload.as_bytes()).await?;
|
||||
con.write_lowlevel(&[b'\n']).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
@ -1,225 +0,0 @@
|
||||
/*
|
||||
* Created on Thu Aug 12 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use crate::corestore::buffers::Integer64;
|
||||
use crate::corestore::Data;
|
||||
use crate::dbnet::connection::ProtocolConnectionExt;
|
||||
use crate::protocol::responses::groups;
|
||||
use crate::IoResult;
|
||||
use core::marker::PhantomData;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
/// Write a raw mono group with a custom tsymbol
|
||||
pub async unsafe fn write_raw_mono<T, Strm>(
|
||||
con: &mut T,
|
||||
tsymbol: u8,
|
||||
payload: &Data,
|
||||
) -> IoResult<()>
|
||||
where
|
||||
T: ProtocolConnectionExt<Strm>,
|
||||
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
|
||||
{
|
||||
let raw_stream = con.raw_stream();
|
||||
raw_stream.write_all(&[tsymbol; 1]).await?; // first write tsymbol
|
||||
let bytes = Integer64::from(payload.len());
|
||||
raw_stream.write_all(&bytes).await?; // then len
|
||||
raw_stream.write_all(&[b'\n']).await?; // LF
|
||||
raw_stream.write_all(payload).await?; // payload
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A writer for a flat array, which is a multi-typed non-recursive array
|
||||
pub struct FlatArrayWriter<'a, T, Strm> {
|
||||
tsymbol: u8,
|
||||
con: &'a mut T,
|
||||
_owned: PhantomData<Strm>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // TODO(@ohsayan): Remove this once we start using the flat array writer
|
||||
impl<'a, T, Strm> FlatArrayWriter<'a, T, Strm>
|
||||
where
|
||||
T: ProtocolConnectionExt<Strm>,
|
||||
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
|
||||
{
|
||||
/// Intialize a new flat array writer. This will write out the tsymbol
|
||||
/// and length for the flat array
|
||||
pub async unsafe fn new(
|
||||
con: &'a mut T,
|
||||
tsymbol: u8,
|
||||
len: usize,
|
||||
) -> IoResult<FlatArrayWriter<'a, T, Strm>> {
|
||||
{
|
||||
let stream = con.raw_stream();
|
||||
// first write _
|
||||
stream.write_all(&[b'_']).await?;
|
||||
let bytes = Integer64::from(len);
|
||||
// now write len
|
||||
stream.write_all(&bytes).await?;
|
||||
// first LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
}
|
||||
Ok(Self {
|
||||
con,
|
||||
tsymbol,
|
||||
_owned: PhantomData,
|
||||
})
|
||||
}
|
||||
/// Write an element
|
||||
pub async fn write_element(&mut self, bytes: impl AsRef<[u8]>) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
let bytes = bytes.as_ref();
|
||||
// first write <tsymbol>
|
||||
stream.write_all(&[self.tsymbol]).await?;
|
||||
// now len
|
||||
let len = Integer64::from(bytes.len());
|
||||
stream.write_all(&len).await?;
|
||||
// now LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
// now element
|
||||
stream.write_all(bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
/// Write the NIL response code
|
||||
pub async fn write_nil(&mut self) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
stream.write_all(groups::NIL).await?;
|
||||
Ok(())
|
||||
}
|
||||
/// Write the SERVER_ERR (5) response code
|
||||
pub async fn write_server_error(&mut self) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
stream.write_all(groups::NIL).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A writer for a typed array, which is a singly-typed array which either
|
||||
/// has a typed element or a `NULL`
|
||||
pub struct TypedArrayWriter<'a, T, Strm> {
|
||||
con: &'a mut T,
|
||||
_owned: PhantomData<Strm>,
|
||||
}
|
||||
|
||||
impl<'a, T, Strm> TypedArrayWriter<'a, T, Strm>
|
||||
where
|
||||
T: ProtocolConnectionExt<Strm>,
|
||||
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
|
||||
{
|
||||
/// Create a new `typedarraywriter`. This will write the tsymbol and
|
||||
/// the array length
|
||||
pub async unsafe fn new(
|
||||
con: &'a mut T,
|
||||
tsymbol: u8,
|
||||
len: usize,
|
||||
) -> IoResult<TypedArrayWriter<'a, T, Strm>> {
|
||||
{
|
||||
let stream = con.raw_stream();
|
||||
// first write @<tsymbol>
|
||||
stream.write_all(&[b'@', tsymbol]).await?;
|
||||
let bytes = Integer64::from(len);
|
||||
// now write len
|
||||
stream.write_all(&bytes).await?;
|
||||
// first LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
}
|
||||
Ok(Self {
|
||||
con,
|
||||
_owned: PhantomData,
|
||||
})
|
||||
}
|
||||
/// Write an element
|
||||
pub async fn write_element(&mut self, bytes: impl AsRef<[u8]>) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
let bytes = bytes.as_ref();
|
||||
// write len
|
||||
let len = Integer64::from(bytes.len());
|
||||
stream.write_all(&len).await?;
|
||||
// now LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
// now element
|
||||
stream.write_all(bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
/// Write a null
|
||||
pub async fn write_null(&mut self) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
stream.write_all(&[b'\0']).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A writer for a non-null typed array, which is a singly-typed array which either
|
||||
/// has a typed element or a `NULL`
|
||||
pub struct NonNullArrayWriter<'a, T, Strm> {
|
||||
con: &'a mut T,
|
||||
_owned: PhantomData<Strm>,
|
||||
}
|
||||
|
||||
impl<'a, T, Strm> NonNullArrayWriter<'a, T, Strm>
|
||||
where
|
||||
T: ProtocolConnectionExt<Strm>,
|
||||
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
|
||||
{
|
||||
/// Create a new `typedarraywriter`. This will write the tsymbol and
|
||||
/// the array length
|
||||
pub async unsafe fn new(
|
||||
con: &'a mut T,
|
||||
tsymbol: u8,
|
||||
len: usize,
|
||||
) -> IoResult<NonNullArrayWriter<'a, T, Strm>> {
|
||||
{
|
||||
let stream = con.raw_stream();
|
||||
// first write @<tsymbol>
|
||||
stream.write_all(&[b'^', tsymbol]).await?;
|
||||
let bytes = Integer64::from(len);
|
||||
// now write len
|
||||
stream.write_all(&bytes).await?;
|
||||
// first LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
}
|
||||
Ok(Self {
|
||||
con,
|
||||
_owned: PhantomData,
|
||||
})
|
||||
}
|
||||
/// Write an element
|
||||
pub async fn write_element(&mut self, bytes: impl AsRef<[u8]>) -> IoResult<()> {
|
||||
let stream = unsafe { self.con.raw_stream() };
|
||||
let bytes = bytes.as_ref();
|
||||
// write len
|
||||
let len = Integer64::from(bytes.len());
|
||||
stream.write_all(&len).await?;
|
||||
// now LF
|
||||
stream.write_all(&[b'\n']).await?;
|
||||
// now element
|
||||
stream.write_all(bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue