Simplify action error propagation

next
Sayan Nandan 3 years ago
parent b59243eec1
commit 4c46ff3c2b
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -29,7 +29,7 @@ use crate::dbnet::connection::prelude::*;
action!(
/// Returns the number of keys in the database
fn dbsize(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, gt 1);
ensure_length(act.len(), |len| len < 2)?;
if act.is_empty() {
let len = get_tbl!(handle, con).count();
con.write_response(len).await?;

@ -38,7 +38,7 @@ action!(
/// Do note that this function is blocking since it acquires a write lock.
/// It will write an entire datagroup, for this `del` action
fn del(handle: &Corestore, con: &'a mut T, act: ActionIter<'a>) {
err_if_len_is!(act, con, eq 0);
ensure_length(act.len(), |size| size != 0)?;
let table = get_tbl!(handle, con);
macro_rules! remove {
($engine:expr) => {{
@ -57,12 +57,12 @@ action!(
}
}
if let Some(done_howmany) = done_howmany {
con.write_response(done_howmany).await
con.write_response(done_howmany).await?;
} else {
con.write_response(responses::groups::SERVER_ERR).await
con.write_response(responses::groups::SERVER_ERR).await?;
}
} else {
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
}
}};
}
@ -74,7 +74,8 @@ action!(
remove!(kvlmap)
}
#[allow(unreachable_patterns)]
_ => conwrite!(con, groups::WRONG_MODEL),
_ => conwrite!(con, groups::WRONG_MODEL)?,
}
Ok(())
}
);

@ -36,7 +36,7 @@ use crate::util::compiler;
action!(
/// Run an `EXISTS` query
fn exists(handle: &Corestore, con: &'a mut T, act: ActionIter<'a>) {
err_if_len_is!(act, con, eq 0);
ensure_length(act.len(), |len| len != 0)?;
let mut how_many_of_them_exist = 0usize;
macro_rules! exists {
($engine:expr) => {{

@ -30,7 +30,7 @@ use crate::queryengine::ActionIter;
action!(
/// Delete all the keys in the database
fn flushdb(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, gt 1);
ensure_length(act.len(), |len| len < 2)?;
if registry::state_okay() {
if act.is_empty() {
// flush the current table

@ -34,8 +34,8 @@ use crate::util::compiler;
action!(
/// Run a `GET` query
fn get(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
let kve = kve!(con, handle);
ensure_length(act.len(), |len| len == 0)?;
let kve = handle.get_table_with::<KVE>()?;
unsafe {
match kve.get_cloned_with_tsymbol(act.next_unchecked()) {
Ok((Some(val), tsymbol)) => writer::write_raw_mono(con, tsymbol, &val).await?,

@ -1,91 +0,0 @@
/*
* Created on Mon Aug 31 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(dead_code)]
//! #`JGET` queries
//! Functions for handling `JGET` queries
use crate::dbnet::connection::prelude::*;
action! {
/// Run a `JGET` query
/// This returns a JSON key/value pair of keys and values
/// We need to write something like
/// ```json
/// &1\n
/// $15\n
/// {"key":"value"}\n
/// ```
///
fn jget(_handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
todo!()
}
}
mod json {
use crate::util::Unwrappable;
use bytes::Bytes;
pub struct BuiltJSON(Vec<u8>);
pub struct JSONBlob(Vec<u8>);
impl JSONBlob {
pub fn new(size: usize) -> Self {
let mut jblob = Vec::with_capacity(1 + size);
jblob.push(b'{');
JSONBlob(jblob)
}
pub fn insert(&mut self, key: &str, value: Option<&Bytes>) {
self.0.push(b'"');
self.0.extend(key.as_bytes());
self.0.extend(b"\":");
if let Some(value) = value {
self.0.push(b'"');
self.0.extend(value);
self.0.push(b'"');
} else {
self.0.extend(b"null");
}
self.0.push(b',');
}
pub fn finish(mut self) -> BuiltJSON {
*unsafe {
// UNSAFE(@ohsayan): There will always be a value corresponding to last_mut
self.0.last_mut().unsafe_unwrap()
} = b'}';
BuiltJSON(self.0)
}
}
#[test]
fn test_buildjson() {
let mut jblob = JSONBlob::new(128);
jblob.insert(&"key".to_owned(), Some(&Bytes::from("value".as_bytes())));
jblob.insert(&"key2".to_owned(), None);
assert_eq!(
"{\"key\":\"value\",\"key2\":null}",
String::from_utf8_lossy(&jblob.finish().0)
);
}
}

@ -31,9 +31,9 @@ action!(
///
/// At this moment, `keylen` only supports a single key
fn keylen(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let res: Option<usize> = {
let reader = kve!(con, handle);
let reader = handle.get_table_with::<KVE>()?;
unsafe {
// UNSAFE(@ohsayan): this is completely safe as we've already checked
// the number of arguments is one

@ -24,7 +24,6 @@
*
*/
use crate::corestore::table::DataModel;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::resp::writer;
@ -68,9 +67,8 @@ action! {
/// - `LGET <mylist> LAST` will return the last item
/// if it exists
fn lget(handle: &Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, lt 1);
let table = get_tbl!(handle, con);
let listmap = listmap!(table, con);
ensure_length(act.len(), |len| len > 1)?;
let listmap = handle.get_table_with::<KVEList>()?;
// get the list name
let listname = unsafe { act.next_unchecked() };
// now let us see what we need to do
@ -78,7 +76,7 @@ action! {
() => {
match unsafe { String::from_utf8_lossy(act.next_unchecked()) }.parse::<usize>() {
Ok(int) => int,
Err(_) => return conwrite!(con, groups::WRONGTYPE_ERR),
Err(_) => return util::err(groups::WRONGTYPE_ERR),
}
};
}
@ -95,7 +93,7 @@ action! {
Some(subaction) => {
match subaction.as_ref() {
LEN => {
err_if_len_is!(act, con, not 0);
ensure_length(act.len(), |len| len == 0)?;
if let Some(len) = listmap.len_of(listname) {
conwrite!(con, len)?;
} else {
@ -103,7 +101,7 @@ action! {
}
}
LIMIT => {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let count = get_numeric_count!();
let items = if let Some(keys) = listmap.get_cloned(listname, count) {
keys
@ -113,7 +111,7 @@ action! {
writelist!(con, listmap, items);
}
VALUEAT => {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let idx = get_numeric_count!();
let maybe_value = listmap.get(listname).map(|list| {
let readlist = list.read();
@ -139,7 +137,7 @@ action! {
}
}
LAST => {
err_if_len_is!(act, con, not 0);
ensure_length(act.len(), |len| len == 0)?;
let maybe_value = listmap.get(listname).map(|list| {
list.read().last().cloned()
});
@ -154,7 +152,7 @@ action! {
}
}
FIRST => {
err_if_len_is!(act, con, not 0);
ensure_length(act.len(), |len| len == 0)?;
let maybe_value = listmap.get(listname).map(|list| {
list.read().first().cloned()
});
@ -173,13 +171,13 @@ action! {
Some(start) => {
let start: usize = match start.parse() {
Ok(v) => v,
Err(_) => return conwrite!(con, groups::WRONGTYPE_ERR),
Err(_) => return util::err(groups::WRONGTYPE_ERR),
};
let mut range = Range::new(start);
if let Some(stop) = act.next_string_owned() {
let stop: usize = match stop.parse() {
Ok(v) => v,
Err(_) => return conwrite!(con, groups::WRONGTYPE_ERR),
Err(_) => return util::err(groups::WRONGTYPE_ERR),
};
range.set_stop(stop);
};

@ -25,7 +25,6 @@
*/
use super::{writer, OKAY_BADIDX_NIL_NLUT};
use crate::corestore::table::DataModel;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::encoding::ENCODING_LUT;
@ -47,9 +46,8 @@ action! {
/// - `LMOD <mylist> remove <index>`
/// - `LMOD <mylist> clear`
fn lmod(handle: &Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, lt 2);
let table = get_tbl!(handle, con);
let listmap = listmap!(table, con);
ensure_length(act.len(), |len| len > 2)?;
let listmap = handle.get_table_with::<KVEList>()?;
// get the list name
let listname = unsafe { act.next_unchecked() };
macro_rules! get_numeric_count {
@ -63,7 +61,7 @@ action! {
// now let us see what we need to do
match unsafe { act.next_uppercase_unchecked() }.as_ref() {
CLEAR => {
err_if_len_is!(act, con, not 0);
ensure_length(act.len(), |len| len == 0)?;
let list = match listmap.kve_inner_ref().get(listname) {
Some(l) => l,
_ => return conwrite!(con, groups::NIL),
@ -77,7 +75,7 @@ action! {
conwrite!(con, okay)?;
}
PUSH => {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let list = match listmap.kve_inner_ref().get(listname) {
Some(l) => l,
_ => return conwrite!(con, groups::NIL),
@ -99,7 +97,7 @@ action! {
conwrite!(con, ret)?;
}
REMOVE => {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let idx_to_remove = get_numeric_count!();
if registry::state_okay() {
let maybe_value = listmap.kve_inner_ref().get(listname).map(|list| {
@ -117,7 +115,7 @@ action! {
}
}
INSERT => {
err_if_len_is!(act, con, not 2);
ensure_length(act.len(), |len| len == 2)?;
let idx_to_insert_at = get_numeric_count!();
let bts = unsafe { act.next_unchecked() };
let ret = if compiler::likely(ENCODING_LUT[listmap.kve_payload_encoded()](bts)) {
@ -146,7 +144,7 @@ action! {
conwrite!(con, ret)?;
}
POP => {
err_if_len_is!(act, con, gt 1);
ensure_length(act.len(), |len| len < 2)?;
let idx = if act.len() == 1 {
// we have an idx
Some(get_numeric_count!())

@ -24,15 +24,6 @@
*
*/
macro_rules! listmap {
($tbl:expr, $con:expr) => {
match $tbl.get_model_ref() {
DataModel::KVExtListmap(lm) => lm,
_ => return conwrite!($con, groups::WRONG_MODEL),
}
};
}
macro_rules! writelist {
($con:expr, $listmap:expr, $items:expr) => {
let mut typed_array_writer =

@ -32,7 +32,6 @@ pub mod lmod;
use crate::corestore::booltable::BytesBoolTable;
use crate::corestore::booltable::BytesNicheLUT;
use crate::corestore::table::DataModel;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::listmap::LockedVec;
@ -47,9 +46,8 @@ action! {
/// Handle an `LSET` query for the list model
/// Syntax: `LSET <listname> <values ...>`
fn lset(handle: &Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, lt 1);
let table = get_tbl!(handle, con);
let listmap = listmap!(table, con);
ensure_length(act.len(), |len| len > 1)?;
let listmap = handle.get_table_with::<KVEList>()?;
let listname = unsafe { act.next_unchecked_bytes() };
let list = listmap.kve_inner_ref();
if registry::state_okay() {

@ -35,7 +35,7 @@ const DEFAULT_COUNT: usize = 10;
action!(
/// Run an `LSKEYS` query
fn lskeys(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, gt 3);
ensure_length(act.len(), |size| size < 4)?;
let (table, count) = if act.is_empty() {
(get_tbl!(handle, con), DEFAULT_COUNT)
} else if act.len() == 1 {
@ -46,7 +46,7 @@ action!(
let count = if let Ok(cnt) = String::from_utf8_lossy(nextret).parse::<usize>() {
cnt
} else {
return con.write_response(responses::groups::WRONGTYPE_ERR).await;
return util::err(groups::WRONGTYPE_ERR);
};
(get_tbl!(handle, con), count)
} else {
@ -62,7 +62,7 @@ action!(
let count = if let Ok(cnt) = String::from_utf8_lossy(count_ret).parse::<usize>() {
cnt
} else {
return con.write_response(responses::groups::WRONGTYPE_ERR).await;
return util::err(groups::WRONGTYPE_ERR);
};
(get_tbl!(entity, handle, con), count)
};

@ -46,92 +46,12 @@ macro_rules! is_lowbit_unset {
};
}
#[macro_export]
macro_rules! err_if_len_is {
($buf:ident, $con:ident, eq $len:literal) => {
if $buf.len() == $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, not $len:literal) => {
if $buf.len() != $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, gt $len:literal) => {
if $buf.len() > $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, lt $len:literal) => {
if $buf.len() < $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, gt_or_eq $len:literal) => {
if $buf.len() >= $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, lt_or_eq $len:literal) => {
if $buf.len() <= $len {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($con:ident, $expr:expr) => {
if $expr {
return $con
.write_response(crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
}
#[macro_export]
macro_rules! kve {
($con:expr, $store:expr) => {
match $store.get_kvstore() {
Ok(store) => store,
_ => {
// wrong model
return $con
.write_response(crate::protocol::responses::groups::WRONG_MODEL)
.await;
}
}
};
}
#[macro_export]
macro_rules! default_keyspace {
($store:expr, $con:expr) => {
match $store.get_keyspace() {
Ok(ks) => ks,
Err(_) => {
return $con
.write_response(crate::protocol::responses::groups::DEFAULT_UNSET)
.await;
}
}
};
}
#[macro_export]
macro_rules! conwrite {
($con:expr, $what:expr) => {
$con.write_response($what).await
$con.write_response($what)
.await
.map_err(|e| crate::actions::ActionError::IoError(e))
};
}

@ -34,8 +34,8 @@ action!(
/// Run an `MGET` query
///
fn mget(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
crate::err_if_len_is!(act, con, eq 0);
let kve = kve!(con, handle);
ensure_length(act.len(), |size| size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
let encoding_is_okay = ENCODING_LUT_ITER[kve.kve_key_encoded()](act.as_ref());
if compiler::likely(encoding_is_okay) {
let mut writer = unsafe {

@ -37,7 +37,6 @@ pub mod del;
pub mod exists;
pub mod flushdb;
pub mod get;
pub mod jget;
pub mod keylen;
pub mod lists;
pub mod lskeys;
@ -51,6 +50,48 @@ pub mod strong;
pub mod update;
pub mod uset;
pub mod whereami;
use crate::protocol::responses::groups;
use crate::util;
use std::io::Error as IoError;
/// A generic result for actions
pub type ActionResult<T> = Result<T, ActionError>;
/// Errors that can occur while running actions
#[derive(Debug)]
pub enum ActionError {
ActionError(&'static [u8]),
IoError(std::io::Error),
}
impl From<&'static [u8]> for ActionError {
fn from(e: &'static [u8]) -> Self {
Self::ActionError(e)
}
}
impl From<IoError> for ActionError {
fn from(e: IoError) -> Self {
Self::IoError(e)
}
}
pub fn ensure_length(len: usize, is_valid: fn(usize) -> bool) -> ActionResult<()> {
if util::compiler::likely(is_valid(len)) {
Ok(())
} else {
util::err(groups::ACTION_ERR)
}
}
pub fn ensure_cond_or_err(cond: bool, err: &'static [u8]) -> ActionResult<()> {
if util::compiler::likely(cond) {
Ok(())
} else {
util::err(err)
}
}
pub mod heya {
//! Respond to `HEYA` queries
use crate::dbnet::connection::prelude::*;
@ -58,13 +99,14 @@ pub mod heya {
action!(
/// Returns a `HEY!` `Response`
fn heya(_handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, gt 1);
ensure_length(act.len(), |len| len == 0 || len == 1)?;
if act.len() == 1 {
let raw_byte = unsafe { act.next_unchecked_bytes() };
con.write_response(BytesWrapper(raw_byte)).await
con.write_response(BytesWrapper(raw_byte)).await?;
} else {
con.write_response(responses::groups::HEYA).await
con.write_response(responses::groups::HEYA).await?;
}
Ok(())
}
);
}

@ -35,9 +35,9 @@ use crate::util::compiler;
action!(
/// Run an MPOP action
fn mpop(handle: &corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
err_if_len_is!(act, con, eq 0);
ensure_length(act.len(), |len| len != 0)?;
if registry::state_okay() {
let kve = kve!(con, handle);
let kve = handle.get_table_with::<KVE>()?;
let encoding_is_okay = ENCODING_LUT_ITER[kve.needs_key_encoding()](act.as_ref());
if compiler::likely(encoding_is_okay) {
let mut writer = unsafe {
@ -56,7 +56,7 @@ action!(
}
} else {
// don't begin the operation at all if the database is poisoned
return con.write_response(responses::groups::SERVER_ERR).await;
con.write_response(responses::groups::SERVER_ERR).await?;
}
Ok(())
}

@ -33,13 +33,8 @@ action!(
/// Run an `MSET` query
fn mset(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
let howmany = act.len();
if is_lowbit_set!(howmany) || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.kve_tuple_encoding()](&act);
if compiler::likely(encoding_is_okay) {
let done_howmany: Option<usize>;
@ -55,12 +50,13 @@ action!(
done_howmany = None;
}
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
con.write_response(done_howmany as usize).await?;
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
con.write_response(responses::groups::SERVER_ERR).await?;
}
} else {
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
}
Ok(())
}
);

@ -33,13 +33,8 @@ action!(
/// Run an `MUPDATE` query
fn mupdate(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
let howmany = act.len();
if is_lowbit_set!(howmany) || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.kve_tuple_encoding()](&act);
let done_howmany: Option<usize>;
if compiler::likely(encoding_is_okay) {
@ -55,12 +50,13 @@ action!(
done_howmany = None;
}
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
con.write_response(done_howmany as usize).await?;
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
con.write_response(responses::groups::SERVER_ERR).await?;
}
} else {
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
}
Ok(())
}
);

@ -30,13 +30,13 @@ use crate::util::compiler;
action! {
fn pop(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let key = unsafe {
// SAFETY: We have checked for there to be one arg
act.next_unchecked()
};
if registry::state_okay() {
let kve = kve!(con, handle);
let kve = handle.get_table_with::<KVE>()?;
let tsymbol = kve.get_vt();
match kve.pop(key) {
Ok(Some((_key, val))) => unsafe {

@ -39,10 +39,10 @@ const SET_NLUT: BytesNicheLUT =
action!(
/// Run a `SET` query
fn set(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 2);
ensure_length(act.len(), |len| len == 2)?;
if registry::state_okay() {
let did_we = {
let writer = kve!(con, handle);
let writer = handle.get_table_with::<KVE>()?;
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments

@ -36,8 +36,8 @@ action! {
/// This either returns `Okay` if all the keys were `del`eted, or it returns a
/// `Nil`, which is code `1`
fn sdel(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
err_if_len_is!(act, con, eq 0);
let kve = kve!(con, handle);
ensure_length(act.len(), |len| len != 0)?;
let kve = handle.get_table_with::<KVE>()?;
if registry::state_okay() {
// guarantee one check: consistency
let key_encoder = kve.get_key_encoder();

@ -38,10 +38,8 @@ action! {
/// `Overwrite Error` or code `2`
fn sset(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
let howmany = act.len();
if is_lowbit_set!(howmany) || howmany == 0 {
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
if registry::state_okay() {
let encoder = kve.get_encoder();
let outcome = {

@ -38,10 +38,8 @@ action! {
/// or code `1`
fn supdate(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
let howmany = act.len();
if is_lowbit_set!(howmany) || howmany == 0 {
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
if registry::state_okay() {
let encoder = kve.get_encoder();
let outcome = {

@ -38,10 +38,10 @@ const UPDATE_NLUT: BytesNicheLUT =
action!(
/// Run an `UPDATE` query
fn update(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 2);
ensure_length(act.len(), |len| len == 2)?;
if registry::state_okay() {
let did_we = {
let writer = kve!(con, handle);
let writer = handle.get_table_with::<KVE>()?;
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments

@ -27,7 +27,6 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::{encoding::ENCODING_LUT_ITER_PAIR, KVTable};
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::util::compiler;
@ -37,25 +36,21 @@ action!(
/// This is like "INSERT or UPDATE"
fn uset(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
let howmany = act.len();
if is_lowbit_set!(howmany) || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
let kve = handle.get_table_with::<KVE>()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.kve_tuple_encoding()](&act);
if compiler::likely(encoding_is_okay) {
if registry::state_okay() {
while let (Some(key), Some(val)) = (act.next(), act.next()) {
kve.upsert_unchecked(Data::copy_from_slice(key), Data::copy_from_slice(val));
}
conwrite!(con, howmany / 2)
conwrite!(con, howmany / 2)?;
} else {
conwrite!(con, groups::SERVER_ERR)
conwrite!(con, groups::SERVER_ERR)?;
}
} else {
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))
compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
}
Ok(())
}
);

@ -28,8 +28,8 @@ use crate::dbnet::connection::prelude::*;
use crate::resp::writer::NonNullArrayWriter;
action! {
fn whereami(store: &Corestore, con: &mut T, iter: ActionIter<'a>) {
err_if_len_is!(iter, con, not 0);
fn whereami(store: &Corestore, con: &mut T, act: ActionIter<'a>) {
ensure_length(act.len(), |len| len == 0)?;
match store.get_ids() {
(Some(ks), Some(tbl)) => {
let mut writer = unsafe { NonNullArrayWriter::new(con, b'+', 2).await? };

@ -24,14 +24,15 @@
*
*/
use crate::actions::ActionResult;
use crate::corestore::memstore::DdlError;
use crate::corestore::memstore::Keyspace;
use crate::corestore::memstore::Memstore;
use crate::corestore::memstore::ObjectID;
use crate::corestore::memstore::DEFAULT;
use crate::corestore::table::DescribeTable;
use crate::corestore::table::Table;
use crate::dbnet::connection::ProtocolConnectionExt;
use crate::kvengine::KVEngine;
use crate::protocol::Query;
use crate::queryengine;
use crate::registry;
@ -42,7 +43,6 @@ use crate::IoResult;
use core::borrow::Borrow;
use core::hash::Hash;
pub use htable::Data;
use libsky::TResult;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub mod array;
@ -240,20 +240,10 @@ impl Corestore {
pub fn get_ctable(&self) -> Option<Arc<Table>> {
self.estate.table.as_ref().map(|(_, tbl)| tbl.clone())
}
/// Get the key/value store
///
/// `Err`s are propagated if the target table has an incorrect table or if
/// the default table is unset
pub fn get_kvstore(&self) -> KeyspaceResult<&KVEngine> {
match &self.estate.table {
Some((_, tbl)) => match tbl.get_kvstore() {
Ok(kvs) => Ok(kvs),
_ => Err(DdlError::WrongModel),
},
None => Err(DdlError::DefaultNotFound),
}
/// Returns a table with the provided specification
pub fn get_table_with<T: DescribeTable>(&self) -> ActionResult<&T::Table> {
T::get(self)
}
/// Create a table: in-memory; **no transactional guarantees**. Two tables can be created
/// simultaneously, but are never flushed unless we are very lucky. If the global flush
/// system is close to a flush cycle -- then we are in luck: we pause the flush cycle
@ -375,7 +365,7 @@ impl Corestore {
&mut self,
query: Query,
con: &mut T,
) -> TResult<()>
) -> ActionResult<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,

@ -24,13 +24,60 @@
*
*/
use crate::actions::ActionResult;
use crate::corestore::htable::Coremap;
use crate::corestore::memstore::DdlError;
use crate::corestore::Data;
use crate::corestore::KeyspaceResult;
#[cfg(test)]
use crate::corestore::{memstore::DdlError, KeyspaceResult};
use crate::dbnet::connection::prelude::Corestore;
use crate::kvengine::listmap::LockedVec;
use crate::kvengine::KVTable;
use crate::kvengine::{listmap::KVEListMap, KVEngine};
use crate::protocol::responses::groups;
use crate::util;
pub trait DescribeTable {
type Table;
fn try_get(table: &Table) -> Option<&Self::Table>;
fn get(store: &Corestore) -> ActionResult<&Self::Table> {
match store.estate.table {
Some((_, ref table)) => {
// so we do have a table
match Self::try_get(&table) {
Some(tbl) => Ok(tbl),
None => util::err(groups::WRONG_MODEL),
}
}
_ => util::err(groups::DEFAULT_UNSET),
}
}
}
pub struct KVE;
impl DescribeTable for KVE {
type Table = KVEngine;
fn try_get(table: &Table) -> Option<&Self::Table> {
if let DataModel::KV(ref kve) = table.model_store {
Some(kve)
} else {
None
}
}
}
pub struct KVEList;
impl DescribeTable for KVEList {
type Table = KVEListMap;
fn try_get(table: &Table) -> Option<&Self::Table> {
if let DataModel::KVExtListmap(ref kvl) = table.model_store {
Some(kvl)
} else {
None
}
}
}
#[derive(Debug)]
pub enum DataModel {
@ -65,6 +112,7 @@ impl Table {
}
}
/// Get the key/value store if the table is a key/value store
#[cfg(test)]
pub const fn get_kvstore(&self) -> KeyspaceResult<&KVEngine> {
#[allow(irrefutable_let_patterns)]
if let DataModel::KV(kvs) = &self.model_store {

@ -36,6 +36,7 @@
//! respones in compliance with the Skyhash protocol.
use super::tcp::Connection;
use crate::actions::ActionError;
use crate::corestore::buffers::Integer64;
use crate::corestore::Corestore;
use crate::dbnet::tcp::BufferedSocketStream;
@ -76,20 +77,19 @@ pub mod prelude {
//!
//! This module is hollow itself, it only re-exports from `dbnet::con` and `tokio::io`
pub use super::ProtocolConnectionExt;
pub use crate::actions::{ensure_cond_or_err, ensure_length};
pub use crate::aerr;
pub use crate::conwrite;
pub use crate::corestore::table::{KVEList, KVE};
pub use crate::corestore::Corestore;
pub use crate::default_keyspace;
pub use crate::err_if_len_is;
pub use crate::get_tbl;
pub use crate::handle_entity;
pub use crate::is_lowbit_set;
pub use crate::kve;
pub use crate::protocol::responses;
pub use crate::protocol::responses::groups;
pub use crate::queryengine::ActionIter;
pub use crate::registry;
pub use crate::util::Unwrappable;
pub use crate::util::{self, Unwrappable};
pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
}
@ -430,7 +430,15 @@ where
};
match try_df {
Ok(QueryResult::Q((query, advance_by))) => {
self.db.execute_query(query, &mut self.con).await?;
match self.db.execute_query(query, &mut self.con).await {
Ok(()) => {}
Err(ActionError::ActionError(e)) => {
self.con.close_conn_with_error(e).await?;
}
Err(ActionError::IoError(e)) => {
return Err(e.into());
}
}
self.con.advance_buffer(advance_by);
}
Ok(QueryResult::E(r)) => self.con.close_conn_with_error(r).await?,

@ -42,7 +42,7 @@ use libsky::VERSION;
use std::env;
use std::process;
#[macro_use]
mod util;
pub mod util;
mod actions;
mod admin;
mod arbiter;

@ -43,7 +43,7 @@ action! {
/// like queries
fn create(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
// minlength is 2 (create has already been checked)
err_if_len_is!(act, con, lt 2);
ensure_length(act.len(), |size| size > 2)?;
let mut create_what = unsafe { act.next().unsafe_unwrap() }.to_vec();
create_what.make_ascii_uppercase();
match create_what.as_ref() {
@ -61,7 +61,7 @@ action! {
/// like queries
fn ddl_drop(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
// minlength is 2 (create has already been checked)
err_if_len_is!(act, con, lt 2);
ensure_length(act.len(), |size| size > 2)?;
let mut create_what = unsafe { act.next().unsafe_unwrap() }.to_vec();
create_what.make_ascii_uppercase();
match create_what.as_ref() {
@ -77,18 +77,12 @@ action! {
/// We should have `<tableid> <model>(args)`
fn create_table(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(con, act.len() > 3 || act.len() < 2);
let (table_entity, model_code) = match parser::parse_table_args(&mut act) {
Ok(v) => v,
Err(e) => return con.write_response(e).await,
};
ensure_length(act.len(), |size| size > 2 && size < 3)?;
let (table_entity, model_code) = parser::parse_table_args(&mut act)?;
let is_volatile = match act.next() {
Some(maybe_volatile) => {
if maybe_volatile.eq(VOLATILE) {
true
} else {
return conwrite!(con, responses::groups::UNKNOWN_PROPERTY);
}
ensure_cond_or_err(maybe_volatile.eq(VOLATILE), responses::groups::UNKNOWN_PROPERTY)?;
true
}
None => false,
};
@ -119,27 +113,19 @@ action! {
/// We should have `<ksid>`
fn create_keyspace(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
match act.next() {
Some(ksid) => {
if !encoding::is_utf8(&ksid) {
return con.write_response(responses::groups::ENCODING_ERROR).await;
}
ensure_cond_or_err(encoding::is_utf8(&ksid), responses::groups::ENCODING_ERROR)?;
let ksid_str = unsafe { str::from_utf8_unchecked(ksid) };
if !VALID_CONTAINER_NAME.is_match(ksid_str) {
return con.write_response(responses::groups::BAD_EXPRESSION).await;
}
if ksid.len() > 64 {
return con
.write_response(responses::groups::CONTAINER_NAME_TOO_LONG)
.await;
}
ensure_cond_or_err(VALID_CONTAINER_NAME.is_match(ksid_str), responses::groups::BAD_EXPRESSION)?;
ensure_cond_or_err(ksid.len() < 64, responses::groups::CONTAINER_NAME_TOO_LONG)?;
let ksid = unsafe { ObjectID::from_slice(ksid_str) };
if registry::state_okay() {
match handle.create_keyspace(ksid) {
Ok(()) => return con.write_response(responses::groups::OKAY).await,
Ok(()) => con.write_response(responses::groups::OKAY).await?,
Err(DdlError::AlreadyExists) => {
return con.write_response(responses::groups::ALREADY_EXISTS).await
con.write_response(responses::groups::ALREADY_EXISTS).await?;
}
Err(_) => unsafe {
// we already know that Corestore::create_keyspace doesn't return anything else
@ -147,22 +133,20 @@ action! {
},
}
} else {
return conwrite!(con, responses::groups::SERVER_ERR);
conwrite!(con, responses::groups::SERVER_ERR)?;
}
}
None => return con.write_response(responses::groups::ACTION_ERR).await,
None => con.write_response(responses::groups::ACTION_ERR).await?,
}
Ok(())
}
/// Drop a table (`<tblid>` only)
fn drop_table(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |size| size == 1)?;
match act.next() {
Some(eg) => {
let entity_group = match parser::get_query_entity(eg) {
Ok(egroup) => egroup,
Err(e) => return con.write_response(e).await,
};
let entity_group = parser::get_query_entity(eg)?;
if registry::state_okay() {
let ret = match handle.drop_table(entity_group) {
Ok(()) => responses::groups::OKAY,
@ -187,16 +171,16 @@ action! {
/// Drop a keyspace (`<ksid>` only)
fn drop_keyspace(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |size| size == 1)?;
match act.next() {
Some(ksid) => {
if ksid.len() > 64 {
return con.write_response(responses::groups::CONTAINER_NAME_TOO_LONG).await;
}
ensure_cond_or_err(ksid.len() < 64, responses::groups::CONTAINER_NAME_TOO_LONG)?;
let force_remove = match act.next() {
Some(bts) if bts.eq(FORCE_REMOVE) => true,
None => false,
_ => return conwrite!(con, responses::groups::UNKNOWN_ACTION)
_ => {
return util::err(responses::groups::UNKNOWN_ACTION);
}
};
if registry::state_okay() {
let objid = unsafe {ObjectID::from_slice(ksid)};

@ -44,7 +44,7 @@ action! {
KEYSPACE => inspect_keyspace(handle, con, act).await?,
TABLE => inspect_table(handle, con, act).await?,
KEYSPACES => {
err_if_len_is!(act, con, not 0);
ensure_length(act.len(), |len| len == 0)?;
// let's return what all keyspaces exist
let ks_list: Vec<ObjectID> = handle
.get_store()
@ -69,7 +69,7 @@ action! {
/// INSPECT a keyspace. This should only have the keyspace ID
fn inspect_keyspace(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
match act.next() {
Some(keyspace_name) => {
let ksid = if keyspace_name.len() > 64 {
@ -96,7 +96,7 @@ action! {
/// INSPECT a table. This should only have the table ID
fn inspect_table(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
match act.next() {
Some(entity) => {
let entity = handle_entity!(con, entity);

@ -26,6 +26,7 @@
//! # The Query Engine
use crate::actions::ActionResult;
use crate::corestore::memstore::DdlError;
use crate::corestore::Corestore;
use crate::dbnet::connection::prelude::*;
@ -55,7 +56,7 @@ macro_rules! gen_constants_and_matches {
}
let first = match $buf.next_uppercase() {
Some(frst) => frst,
None => return $con.write_response(responses::groups::PACKET_ERR).await,
None => return util::err(groups::PACKET_ERR),
};
match first.as_ref() {
$(
@ -99,7 +100,7 @@ action! {
self::execute_stage(db, con, &buf.into_inner()).await
}
} else {
con.write_response(responses::groups::WRONGTYPE_ERR).await
util::err(groups::WRONGTYPE_ERR)
}
}
}
@ -108,7 +109,7 @@ async fn execute_stage<'a, T: 'a, Strm>(
db: &mut Corestore,
con: &'a mut T,
buf: &UnsafeElement,
) -> std::io::Result<()>
) -> ActionResult<()>
where
T: ProtocolConnectionExt<Strm> + Send + Sync,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
@ -174,7 +175,7 @@ where
action! {
/// Handle `use <entity>` like queries
fn entity_swap(handle: &mut Corestore, con: &mut T, mut act: ActionIter<'a>) {
err_if_len_is!(act, con, not 1);
ensure_length(act.len(), |len| len == 1)?;
let entity = unsafe {
// SAFETY: Already checked len
act.next_unchecked()

@ -120,7 +120,7 @@ macro_rules! action {
$block:block)*
) => {
$($(#[$attr])*
pub async fn $fname<'a, T: 'a + Send + Sync, Strm>($($argname: $argty,)*) -> std::io::Result<()>
pub async fn $fname<'a, T: 'a + Send + Sync, Strm>($($argname: $argty,)*) -> crate::actions::ActionResult<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
@ -132,7 +132,7 @@ macro_rules! action {
$block:block)*
) => {
$($(#[$attr])*
pub async fn $fname<'a, T: 'a + Send + Sync, Strm>($argone: $argonety, $argtwo: $argtwoty, mut $argthree: $argthreety) -> std::io::Result<()>
pub async fn $fname<'a, T: 'a + Send + Sync, Strm>($argone: $argonety, $argtwo: $argtwoty, mut $argthree: $argthreety) -> crate::actions::ActionResult<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,

@ -69,3 +69,8 @@ unsafe impl<T> Unwrappable<T> for Option<T> {
pub fn exit_error() -> ! {
process::exit(EXITCODE_ONE)
}
/// Returns a Result with the provided error
pub fn err<T, E>(e: impl Into<E>) -> Result<T, E> {
Err(e.into())
}

Loading…
Cancel
Save