diff --git a/CHANGELOG.md b/CHANGELOG.md index d0b48bfb..f9b2b130 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ All changes in this project will be noted in this file. +## Version 0.8.0 + +### Additions + +- New protocol: Skyhash 2.0 + - Reduced bandwidth usage (as much as 50%) + - Even simpler client implementations +- Backward compatibility with Skyhash 1.0: + - Simply set the protocol version you want to use in the config file, env vars or pass it as a CLI + argument + - Even faster implementation, even for Skyhash 1.0 + ## Version 0.7.5 ### Additions diff --git a/server/src/actions/dbsize.rs b/server/src/actions/dbsize.rs index 8bbbb843..98ba2cf1 100644 --- a/server/src/actions/dbsize.rs +++ b/server/src/actions/dbsize.rs @@ -29,14 +29,15 @@ use crate::dbnet::connection::prelude::*; action!( /// Returns the number of keys in the database fn dbsize(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) { - ensure_length(act.len(), |len| len < 2)?; + ensure_length::
(act.len(), |len| len < 2)?; if act.is_empty() { let len = get_tbl_ref!(handle, con).count(); - con.write_response(len).await?; + con.write_usize(len).await?; } else { let raw_entity = unsafe { act.next().unsafe_unwrap() }; let entity = handle_entity!(con, raw_entity); - conwrite!(con, get_tbl!(entity, handle, con).count())?; + con.write_usize(get_tbl!(entity, handle, con).count()) + .await?; } Ok(()) } diff --git a/server/src/actions/del.rs b/server/src/actions/del.rs index 0e4c548d..e6edc277 100644 --- a/server/src/actions/del.rs +++ b/server/src/actions/del.rs @@ -38,7 +38,7 @@ action!( /// Do note that this function is blocking since it acquires a write lock. /// It will write an entire datagroup, for this `del` action fn del(handle: &Corestore, con: &'a mut T, act: ActionIter<'a>) { - ensure_length(act.len(), |size| size != 0)?; + ensure_length::
(act.len(), |size| size != 0)?; let table = get_tbl_ref!(handle, con); macro_rules! remove { ($engine:expr) => {{ @@ -57,12 +57,12 @@ action!( } } if let Some(done_howmany) = done_howmany { - con.write_response(done_howmany).await?; + con.write_usize(done_howmany).await?; } else { - con.write_response(responses::groups::SERVER_ERR).await?; + con._write_raw(P::RCODE_SERVER_ERR).await?; } } else { - compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?; + return util::err(P::RCODE_ENCODING_ERROR); } }}; } @@ -74,7 +74,7 @@ action!( remove!(kvlmap) } #[allow(unreachable_patterns)] - _ => conwrite!(con, groups::WRONG_MODEL)?, + _ => return util::err(P::RSTRING_WRONG_MODEL), } Ok(()) } diff --git a/server/src/actions/exists.rs b/server/src/actions/exists.rs index 4040278e..04e9a654 100644 --- a/server/src/actions/exists.rs +++ b/server/src/actions/exists.rs @@ -36,7 +36,7 @@ use crate::util::compiler; action!( /// Run an `EXISTS` query fn exists(handle: &Corestore, con: &'a mut T, act: ActionIter<'a>) { - ensure_length(act.len(), |len| len != 0)?; + ensure_length::
(act.len(), |len| len != 0)?; let mut how_many_of_them_exist = 0usize; macro_rules! exists { ($engine:expr) => {{ @@ -45,9 +45,9 @@ action!( act.for_each(|key| { how_many_of_them_exist += $engine.exists_unchecked(key) as usize; }); - conwrite!(con, how_many_of_them_exist)?; + con.write_usize(how_many_of_them_exist).await?; } else { - compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?; + return util::err(P::RCODE_ENCODING_ERROR); } }}; } @@ -56,7 +56,7 @@ action!( DataModel::KV(kve) => exists!(kve), DataModel::KVExtListmap(kve) => exists!(kve), #[allow(unreachable_patterns)] - _ => conwrite!(con, groups::WRONG_MODEL)?, + _ => return util::err(P::RSTRING_WRONG_MODEL), } Ok(()) } diff --git a/server/src/actions/flushdb.rs b/server/src/actions/flushdb.rs index 6cbe97ce..ef717334 100644 --- a/server/src/actions/flushdb.rs +++ b/server/src/actions/flushdb.rs @@ -30,7 +30,7 @@ use crate::queryengine::ActionIter; action!( /// Delete all the keys in the database fn flushdb(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) { - ensure_length(act.len(), |len| len < 2)?; + ensure_length::
(act.len(), |len| len < 2)?;
if registry::state_okay() {
if act.is_empty() {
// flush the current table
@@ -41,9 +41,9 @@ action!(
let entity = handle_entity!(con, raw_entity);
get_tbl!(entity, handle, con).truncate_table();
}
- conwrite!(con, responses::groups::OKAY)?;
+ con._write_raw(P::RCODE_OKAY).await?;
} else {
- conwrite!(con, responses::groups::SERVER_ERR)?;
+ con._write_raw(P::RCODE_SERVER_ERR).await?;
}
Ok(())
}
diff --git a/server/src/actions/get.rs b/server/src/actions/get.rs
index 1d512563..4a599e2c 100644
--- a/server/src/actions/get.rs
+++ b/server/src/actions/get.rs
@@ -28,19 +28,21 @@
//! This module provides functions to work with `GET` queries
use crate::dbnet::connection::prelude::*;
-use crate::resp::writer;
use crate::util::compiler;
action!(
/// Run a `GET` query
fn get(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len == 1)?;
- let kve = handle.get_table_with:: (act.len(), |len| len == 1)?;
+ let kve = handle.get_table_with:: ()?;
unsafe {
match kve.get_cloned(act.next_unchecked()) {
- Ok(Some(val)) => writer::write_raw_mono(con, kve.get_value_tsymbol(), &val).await?,
- Err(_) => compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?,
- Ok(_) => conwrite!(con, groups::NIL)?,
+ Ok(Some(val)) => {
+ con.write_mono_length_prefixed_with_tsymbol(&val, kve.get_value_tsymbol())
+ .await?
+ }
+ Err(_) => compiler::cold_err(con._write_raw(P::RCODE_ENCODING_ERROR)).await?,
+ Ok(_) => con._write_raw(P::RCODE_NIL).await?,
}
}
Ok(())
diff --git a/server/src/actions/keylen.rs b/server/src/actions/keylen.rs
index 5b875c35..41c3bd8e 100644
--- a/server/src/actions/keylen.rs
+++ b/server/src/actions/keylen.rs
@@ -31,9 +31,9 @@ action!(
///
/// At this moment, `keylen` only supports a single key
fn keylen(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len == 1)?;
+ ensure_length:: (act.len(), |len| len == 1)?;
let res: Option ()?;
unsafe {
// UNSAFE(@ohsayan): this is completely safe as we've already checked
// the number of arguments is one
@@ -45,10 +45,10 @@ action!(
};
if let Some(value) = res {
// Good, we got the key's length, write it off to the stream
- con.write_response(value).await?;
+ con.write_usize(value).await?;
} else {
// Ah, couldn't find that key
- con.write_response(responses::groups::NIL).await?;
+ con._write_raw(P::RCODE_NIL).await?;
}
Ok(())
}
diff --git a/server/src/actions/lists/lget.rs b/server/src/actions/lists/lget.rs
index ecfd3149..b98dde40 100644
--- a/server/src/actions/lists/lget.rs
+++ b/server/src/actions/lists/lget.rs
@@ -26,9 +26,6 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
-use crate::resp::writer;
-use crate::resp::writer::TypedArrayWriter;
-
const LEN: &[u8] = "LEN".as_bytes();
const LIMIT: &[u8] = "LIMIT".as_bytes();
const VALUEAT: &[u8] = "VALUEAT".as_bytes();
@@ -66,8 +63,8 @@ action! {
/// - `LGET (act.len(), |len| len != 0)?;
+ let listmap = handle.get_table_with:: ()?;
// get the list name
let listname = unsafe { act.next_unchecked() };
// now let us see what we need to do
@@ -75,7 +72,7 @@ action! {
() => {
match unsafe { String::from_utf8_lossy(act.next_unchecked()) }.parse:: (act.len(), |len| len == 0)?;
match listmap.list_len(listname) {
- Ok(Some(len)) => conwrite!(con, len)?,
- Ok(None) => conwrite!(con, groups::NIL)?,
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Ok(Some(len)) => con.write_usize(len).await?,
+ Ok(None) => return Err(P::RCODE_NIL.into()),
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
LIMIT => {
- ensure_length(act.len(), |len| len == 1)?;
+ ensure_length:: (act.len(), |len| len == 1)?;
let count = get_numeric_count!();
match listmap.list_cloned(listname, count) {
Ok(Some(items)) => writelist!(con, listmap, items),
- Ok(None) => conwrite!(con, groups::NIL)?,
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Ok(None) => return Err(P::RCODE_NIL.into()),
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
VALUEAT => {
- ensure_length(act.len(), |len| len == 1)?;
+ ensure_length:: (act.len(), |len| len == 1)?;
let idx = get_numeric_count!();
let maybe_value = listmap.get(listname).map(|list| {
list.map(|lst| lst.read().get(idx).cloned())
@@ -117,58 +114,56 @@ action! {
match maybe_value {
Ok(v) => match v {
Some(Some(value)) => {
- unsafe {
- // tsymbol is verified
- writer::write_raw_mono(con, listmap.get_value_tsymbol(), &value)
- .await?;
- }
+ con.write_mono_length_prefixed_with_tsymbol(
+ &value, listmap.get_value_tsymbol()
+ ).await?;
}
Some(None) => {
// bad index
- conwrite!(con, groups::LISTMAP_BAD_INDEX)?;
+ return Err(P::RSTRING_LISTMAP_BAD_INDEX.into());
}
None => {
// not found
- conwrite!(con, groups::NIL)?;
+ return Err(P::RCODE_NIL.into());
}
}
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
LAST => {
- ensure_length(act.len(), |len| len == 0)?;
+ ensure_length:: (act.len(), |len| len == 0)?;
let maybe_value = listmap.get(listname).map(|list| {
list.map(|lst| lst.read().last().cloned())
});
match maybe_value {
Ok(v) => match v {
Some(Some(value)) => {
- unsafe {
- writer::write_raw_mono(con, listmap.get_value_tsymbol(), &value).await?;
- }
+ con.write_mono_length_prefixed_with_tsymbol(
+ &value, listmap.get_value_tsymbol()
+ ).await?;
},
- Some(None) => conwrite!(con, groups::LISTMAP_LIST_IS_EMPTY)?,
- None => conwrite!(con, groups::NIL)?,
+ Some(None) => return Err(P::RSTRING_LISTMAP_LIST_IS_EMPTY.into()),
+ None => return Err(P::RCODE_NIL.into()),
}
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
FIRST => {
- ensure_length(act.len(), |len| len == 0)?;
+ ensure_length:: (act.len(), |len| len == 0)?;
let maybe_value = listmap.get(listname).map(|list| {
list.map(|lst| lst.read().first().cloned())
});
match maybe_value {
Ok(v) => match v {
Some(Some(value)) => {
- unsafe {
- writer::write_raw_mono(con, listmap.get_value_tsymbol(), &value).await?;
- }
+ con.write_mono_length_prefixed_with_tsymbol(
+ &value, listmap.get_value_tsymbol()
+ ).await?;
},
- Some(None) => conwrite!(con, groups::LISTMAP_LIST_IS_EMPTY)?,
- None => conwrite!(con, groups::NIL)?,
+ Some(None) => return Err(P::RSTRING_LISTMAP_LIST_IS_EMPTY.into()),
+ None => return Err(P::RCODE_NIL.into()),
}
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
RANGE => {
@@ -176,13 +171,13 @@ action! {
Some(start) => {
let start: usize = match start.parse() {
Ok(v) => v,
- Err(_) => return util::err(groups::WRONGTYPE_ERR),
+ Err(_) => return util::err(P::RCODE_WRONGTYPE_ERR),
};
let mut range = Range::new(start);
if let Some(stop) = act.next_string_owned() {
let stop: usize = match stop.parse() {
Ok(v) => v,
- Err(_) => return util::err(groups::WRONGTYPE_ERR),
+ Err(_) => return util::err(P::RCODE_WRONGTYPE_ERR),
};
range.set_stop(stop);
};
@@ -193,17 +188,17 @@ action! {
Some(ret) => {
writelist!(con, listmap, ret);
},
- None => conwrite!(con, groups::LISTMAP_BAD_INDEX)?,
+ None => return Err(P::RSTRING_LISTMAP_BAD_INDEX.into()),
}
}
- Ok(None) => conwrite!(con, groups::NIL)?,
- Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
+ Ok(None) => return Err(P::RCODE_NIL.into()),
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
}
}
- None => aerr!(con),
+ None => return Err(P::RCODE_ACTION_ERR.into()),
}
}
- _ => conwrite!(con, groups::UNKNOWN_ACTION)?,
+ _ => return Err(P::RCODE_UNKNOWN_ACTION.into()),
}
}
}
diff --git a/server/src/actions/lists/lmod.rs b/server/src/actions/lists/lmod.rs
index cfd024bf..20901bb6 100644
--- a/server/src/actions/lists/lmod.rs
+++ b/server/src/actions/lists/lmod.rs
@@ -24,7 +24,6 @@
*
*/
-use super::{writer, OKAY_BADIDX_NIL_NLUT};
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::util::compiler;
@@ -44,55 +43,55 @@ action! {
/// - `LMOD (act.len(), |len| len > 1)?;
+ let listmap = handle.get_table_with:: ()?;
// get the list name
let listname = unsafe { act.next_unchecked() };
macro_rules! get_numeric_count {
() => {
match unsafe { String::from_utf8_lossy(act.next_unchecked()) }.parse:: (act.len(), |len| len == 0)?;
let list = match listmap.get_inner_ref().get(listname) {
Some(l) => l,
- _ => return conwrite!(con, groups::NIL),
+ _ => return Err(P::RCODE_NIL.into()),
};
let okay = if registry::state_okay() {
list.write().clear();
- groups::OKAY
+ P::RCODE_OKAY
} else {
- groups::SERVER_ERR
+ P::RCODE_SERVER_ERR
};
- conwrite!(con, okay)?;
+ con._write_raw(okay).await?
}
PUSH => {
- ensure_boolean_or_aerr(!act.is_empty())?;
+ ensure_boolean_or_aerr:: (!act.is_empty())?;
let list = match listmap.get_inner_ref().get(listname) {
Some(l) => l,
- _ => return conwrite!(con, groups::NIL),
+ _ => return Err(P::RCODE_NIL.into()),
};
let venc_ok = listmap.get_val_encoder();
let ret = if compiler::likely(act.as_ref().all(venc_ok)) {
if registry::state_okay() {
list.write().extend(act.map(Data::copy_from_slice));
- groups::OKAY
+ P::RCODE_OKAY
} else {
- groups::SERVER_ERR
+ P::RCODE_SERVER_ERR
}
} else {
- groups::ENCODING_ERROR
+ P::RCODE_ENCODING_ERROR
};
- conwrite!(con, ret)?;
+ con._write_raw(ret).await?
}
REMOVE => {
- ensure_length(act.len(), |len| len == 1)?;
+ ensure_length:: (act.len(), |len| len == 1)?;
let idx_to_remove = get_numeric_count!();
if registry::state_okay() {
let maybe_value = listmap.get_inner_ref().get(listname).map(|list| {
@@ -104,13 +103,13 @@ action! {
false
}
});
- conwrite!(con, OKAY_BADIDX_NIL_NLUT[maybe_value])?;
+ con._write_raw(P::OKAY_BADIDX_NIL_NLUT[maybe_value]).await?
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return Err(P::RCODE_SERVER_ERR.into());
}
}
INSERT => {
- ensure_length(act.len(), |len| len == 2)?;
+ ensure_length:: (act.len(), |len| len == 2)?;
let idx_to_insert_at = get_numeric_count!();
let bts = unsafe { act.next_unchecked() };
let ret = if compiler::likely(listmap.is_val_ok(bts)) {
@@ -128,21 +127,21 @@ action! {
false
}
}),
- Err(()) => return conwrite!(con, groups::ENCODING_ERROR),
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
};
- OKAY_BADIDX_NIL_NLUT[maybe_insert]
+ P::OKAY_BADIDX_NIL_NLUT[maybe_insert]
} else {
// flush broken; server err
- groups::SERVER_ERR
+ P::RCODE_SERVER_ERR
}
} else {
// encoding failed, uh
- groups::ENCODING_ERROR
+ P::RCODE_ENCODING_ERROR
};
- conwrite!(con, ret)?;
+ con._write_raw(ret).await?
}
POP => {
- ensure_length(act.len(), |len| len < 2)?;
+ ensure_length:: (act.len(), |len| len < 2)?;
let idx = if act.len() == 1 {
// we have an idx
Some(get_numeric_count!())
@@ -165,24 +164,24 @@ action! {
wlock.pop()
}
}),
- Err(()) => return conwrite!(con, groups::ENCODING_ERROR),
+ Err(()) => return Err(P::RCODE_ENCODING_ERROR.into()),
};
match maybe_pop {
Some(Some(val)) => {
- unsafe {
- writer::write_raw_mono(con, listmap.get_value_tsymbol(), &val).await?;
- }
+ con.write_mono_length_prefixed_with_tsymbol(
+ &val, listmap.get_value_tsymbol()
+ ).await?;
}
Some(None) => {
- conwrite!(con, groups::LISTMAP_BAD_INDEX)?;
+ con._write_raw(P::RSTRING_LISTMAP_BAD_INDEX).await?;
}
- None => conwrite!(con, groups::NIL)?,
+ None => con._write_raw(P::RCODE_NIL).await?,
}
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ con._write_raw(P::RCODE_SERVER_ERR).await?
}
}
- _ => conwrite!(con, groups::UNKNOWN_ACTION)?,
+ _ => con._write_raw(P::RCODE_UNKNOWN_ACTION).await?,
}
Ok(())
}
diff --git a/server/src/actions/lists/macros.rs b/server/src/actions/lists/macros.rs
index c2291d3b..fa428330 100644
--- a/server/src/actions/lists/macros.rs
+++ b/server/src/actions/lists/macros.rs
@@ -26,11 +26,10 @@
macro_rules! writelist {
($con:expr, $listmap:expr, $items:expr) => {{
- let mut typed_array_writer =
- unsafe { TypedArrayWriter::new($con, $listmap.get_value_tsymbol(), $items.len()) }
- .await?;
+ $con.write_typed_non_null_array_header($items.len(), $listmap.get_value_tsymbol())
+ .await?;
for item in $items {
- typed_array_writer.write_element(item).await?;
+ $con.write_typed_non_null_array_element(&item).await?;
}
}};
}
diff --git a/server/src/actions/lists/mod.rs b/server/src/actions/lists/mod.rs
index 9f838d05..3f522304 100644
--- a/server/src/actions/lists/mod.rs
+++ b/server/src/actions/lists/mod.rs
@@ -30,23 +30,16 @@ mod macros;
pub mod lget;
pub mod lmod;
-use crate::corestore::booltable::BytesBoolTable;
-use crate::corestore::booltable::BytesNicheLUT;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::LockedVec;
-use crate::resp::writer;
-
-const OKAY_OVW_BLUT: BytesBoolTable = BytesBoolTable::new(groups::OKAY, groups::OVERWRITE_ERR);
-const OKAY_BADIDX_NIL_NLUT: BytesNicheLUT =
- BytesNicheLUT::new(groups::NIL, groups::OKAY, groups::LISTMAP_BAD_INDEX);
action! {
/// Handle an `LSET` query for the list model
/// Syntax: `LSET (act.len(), |len| len > 0)?;
+ let listmap = handle.get_table_with:: ()?;
let listname = unsafe { act.next_unchecked_bytes() };
let list = listmap.get_inner_ref();
if registry::state_okay() {
@@ -57,9 +50,9 @@ action! {
} else {
false
};
- conwrite!(con, OKAY_OVW_BLUT[did])?;
+ con._write_raw(P::OKAY_OVW_BLUT[did]).await?
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ con._write_raw(P::RCODE_SERVER_ERR).await?
}
Ok(())
}
diff --git a/server/src/actions/lskeys.rs b/server/src/actions/lskeys.rs
index 19aa032f..772265d5 100644
--- a/server/src/actions/lskeys.rs
+++ b/server/src/actions/lskeys.rs
@@ -27,14 +27,13 @@
use crate::corestore::table::DataModel;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
-use crate::resp::writer::TypedArrayWriter;
const DEFAULT_COUNT: usize = 10;
action!(
/// Run an `LSKEYS` query
fn lskeys(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
- ensure_length(act.len(), |size| size < 4)?;
+ ensure_length:: (act.len(), |size| size < 4)?;
let (table, count) = if act.is_empty() {
(get_tbl!(handle, con), DEFAULT_COUNT)
} else if act.len() == 1 {
@@ -45,7 +44,7 @@ action!(
let count = if let Ok(cnt) = String::from_utf8_lossy(nextret).parse:: >(
+ $store.get_table($entity),
+ )?
}};
($store:expr, $con:expr) => {{
match $store.get_ctable() {
Some(tbl) => tbl,
- None => return $crate::util::err($crate::protocol::responses::groups::DEFAULT_UNSET),
+ None => return $crate::util::err(P::RSTRING_DEFAULT_UNSET),
}
}};
}
@@ -80,7 +66,7 @@ macro_rules! get_tbl_ref {
($store:expr, $con:expr) => {{
match $store.get_ctable_ref() {
Some(tbl) => tbl,
- None => return $crate::util::err($crate::protocol::responses::groups::DEFAULT_UNSET),
+ None => return $crate::util::err(P::RSTRING_DEFAULT_UNSET),
}
}};
}
@@ -88,9 +74,9 @@ macro_rules! get_tbl_ref {
#[macro_export]
macro_rules! handle_entity {
($con:expr, $ident:expr) => {{
- match $crate::queryengine::parser::Entity::from_slice(&$ident) {
+ match $crate::queryengine::parser::Entity::from_slice:: (&$ident) {
Ok(e) => e,
- Err(e) => return conwrite!($con, e),
+ Err(e) => return Err(e.into()),
}
}};
}
diff --git a/server/src/actions/mget.rs b/server/src/actions/mget.rs
index 60c1ae17..2daba2d1 100644
--- a/server/src/actions/mget.rs
+++ b/server/src/actions/mget.rs
@@ -27,30 +27,26 @@
use crate::dbnet::connection::prelude::*;
use crate::kvengine::encoding::ENCODING_LUT_ITER;
use crate::queryengine::ActionIter;
-use crate::resp::writer::TypedArrayWriter;
use crate::util::compiler;
action!(
/// Run an `MGET` query
///
fn mget(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
- ensure_length(act.len(), |size| size != 0)?;
- let kve = handle.get_table_with:: (act.len(), |size| size != 0)?;
+ let kve = handle.get_table_with:: ()?;
let encoding_is_okay = ENCODING_LUT_ITER[kve.is_key_encoded()](act.as_ref());
if compiler::likely(encoding_is_okay) {
- let mut writer = unsafe {
- // SAFETY: We are getting the value type ourselves
- TypedArrayWriter::new(con, kve.get_value_tsymbol(), act.len())
- }
- .await?;
+ con.write_typed_array_header(act.len(), kve.get_value_tsymbol())
+ .await?;
for key in act {
match kve.get_cloned_unchecked(key) {
- Some(v) => writer.write_element(&v).await?,
- None => writer.write_null().await?,
+ Some(v) => con.write_typed_array_element(&v).await?,
+ None => con.write_typed_array_element_null().await?,
}
}
} else {
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
+ return util::err(P::RCODE_ENCODING_ERROR);
}
Ok(())
}
diff --git a/server/src/actions/mod.rs b/server/src/actions/mod.rs
index 8fd8687f..eb1daf31 100644
--- a/server/src/actions/mod.rs
+++ b/server/src/actions/mod.rs
@@ -51,7 +51,7 @@ pub mod update;
pub mod uset;
pub mod whereami;
use crate::corestore::memstore::DdlError;
-use crate::protocol::responses::groups;
+use crate::protocol::interface::ProtocolSpec;
use crate::util;
use std::io::Error as IoError;
@@ -65,6 +65,16 @@ pub enum ActionError {
IoError(std::io::Error),
}
+impl PartialEq for ActionError {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::ActionError(a1), Self::ActionError(a2)) => a1 == a2,
+ (Self::IoError(ioe1), Self::IoError(ioe2)) => ioe1.to_string() == ioe2.to_string(),
+ _ => false,
+ }
+ }
+}
+
impl From<&'static [u8]> for ActionError {
fn from(e: &'static [u8]) -> Self {
Self::ActionError(e)
@@ -77,36 +87,44 @@ impl From (e)),
}
}
-pub fn ensure_length(len: usize, is_valid: fn(usize) -> bool) -> ActionResult<()> {
+pub fn ensure_length (act.len(), |len| len == 0 || len == 1)?;
if act.len() == 1 {
- let raw_byte = unsafe { act.next_unchecked_bytes() };
- con.write_response(BytesWrapper(raw_byte)).await?;
+ let raw_byte = unsafe { act.next_unchecked() };
+ con.write_mono_length_prefixed_with_tsymbol(raw_byte, b'+')
+ .await?;
} else {
- con.write_response(responses::groups::HEYA).await?;
+ con._write_raw(P::ELEMRESP_HEYA).await?;
}
Ok(())
}
diff --git a/server/src/actions/mpop.rs b/server/src/actions/mpop.rs
index 535e45c2..0a443bdc 100644
--- a/server/src/actions/mpop.rs
+++ b/server/src/actions/mpop.rs
@@ -27,36 +27,31 @@
use crate::corestore;
use crate::dbnet::connection::prelude::*;
use crate::kvengine::encoding::ENCODING_LUT_ITER;
-use crate::protocol::responses;
use crate::queryengine::ActionIter;
-use crate::resp::writer::TypedArrayWriter;
use crate::util::compiler;
action!(
/// Run an MPOP action
fn mpop(handle: &corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len != 0)?;
+ ensure_length:: (act.len(), |len| len != 0)?;
if registry::state_okay() {
- let kve = handle.get_table_with:: ()?;
let encoding_is_okay = ENCODING_LUT_ITER[kve.is_key_encoded()](act.as_ref());
if compiler::likely(encoding_is_okay) {
- let mut writer = unsafe {
- // SAFETY: We have verified the tsymbol ourselves
- TypedArrayWriter::new(con, kve.get_value_tsymbol(), act.len())
- }
- .await?;
+ con.write_typed_array_header(act.len(), kve.get_value_tsymbol())
+ .await?;
for key in act {
match kve.pop_unchecked(key) {
- Some(val) => writer.write_element(val).await?,
- None => writer.write_null().await?,
+ Some(val) => con.write_typed_array_element(&val).await?,
+ None => con.write_typed_array_element_null().await?,
}
}
} else {
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
+ return util::err(P::RCODE_ENCODING_ERROR);
}
} else {
// don't begin the operation at all if the database is poisoned
- con.write_response(responses::groups::SERVER_ERR).await?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/mset.rs b/server/src/actions/mset.rs
index 33857073..d5bc3052 100644
--- a/server/src/actions/mset.rs
+++ b/server/src/actions/mset.rs
@@ -33,8 +33,8 @@ action!(
/// Run an `MSET` query
fn mset(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
let howmany = act.len();
- ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
- let kve = handle.get_table_with:: (howmany, |size| size & 1 == 0 && size != 0)?;
+ let kve = handle.get_table_with:: ()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.get_encoding_tuple()](&act);
if compiler::likely(encoding_is_okay) {
let done_howmany: Option (howmany, |size| size & 1 == 0 && size != 0)?;
+ let kve = handle.get_table_with:: ()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.get_encoding_tuple()](&act);
let done_howmany: Option (act.len(), |len| len == 1)?;
let key = unsafe {
// SAFETY: We have checked for there to be one arg
act.next_unchecked()
};
if registry::state_okay() {
- let kve = handle.get_table_with:: ()?;
match kve.pop(key) {
- Ok(Some(val)) => unsafe {
- // SAFETY: We have verified the tsymbol ourselves
- writer::write_raw_mono(con, tsymbol, &val).await?
- },
- Ok(None) => conwrite!(con, groups::NIL)?,
- Err(()) => compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?,
+ Ok(Some(val)) => con.write_mono_length_prefixed_with_tsymbol(
+ &val, kve.get_value_tsymbol()
+ ).await?,
+ Ok(None) => return util::err(P::RCODE_NIL),
+ Err(()) => return util::err(P::RCODE_ENCODING_ERROR),
}
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/set.rs b/server/src/actions/set.rs
index ca2fb45a..af047876 100644
--- a/server/src/actions/set.rs
+++ b/server/src/actions/set.rs
@@ -28,21 +28,17 @@
//! This module provides functions to work with `SET` queries
use crate::corestore;
-use crate::corestore::booltable::BytesNicheLUT;
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
use corestore::Data;
-const SET_NLUT: BytesNicheLUT =
- BytesNicheLUT::new(groups::ENCODING_ERROR, groups::OKAY, groups::OVERWRITE_ERR);
-
action!(
/// Run a `SET` query
fn set(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len == 2)?;
+ ensure_length:: (act.len(), |len| len == 2)?;
if registry::state_okay() {
let did_we = {
- let writer = handle.get_table_with:: ()?;
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
@@ -56,9 +52,9 @@ action!(
Err(()) => None,
}
};
- conwrite!(con, SET_NLUT[did_we])?;
+ con._write_raw(P::SET_NLUT[did_we]).await?;
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ con._write_raw(P::RCODE_SERVER_ERR).await?;
}
Ok(())
}
diff --git a/server/src/actions/strong/sdel.rs b/server/src/actions/strong/sdel.rs
index ce710c53..f2183b62 100644
--- a/server/src/actions/strong/sdel.rs
+++ b/server/src/actions/strong/sdel.rs
@@ -37,8 +37,8 @@ action! {
/// This either returns `Okay` if all the keys were `del`eted, or it returns a
/// `Nil`, which is code `1`
fn sdel(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len != 0)?;
- let kve = handle.get_table_with:: (act.len(), |len| len != 0)?;
+ let kve = handle.get_table_with:: ()?;
if registry::state_okay() {
// guarantee one check: consistency
let key_encoder = kve.get_key_encoder();
@@ -48,15 +48,15 @@ action! {
self::snapshot_and_del(kve, key_encoder, act.into_inner())
};
match outcome {
- StrongActionResult::Okay => conwrite!(con, groups::OKAY)?,
+ StrongActionResult::Okay => con._write_raw(P::RCODE_OKAY).await?,
StrongActionResult::Nil => {
// good, it failed because some key didn't exist
- conwrite!(con, groups::NIL)?;
+ return util::err(P::RCODE_NIL);
},
- StrongActionResult::ServerError => conwrite!(con, groups::SERVER_ERR)?,
+ StrongActionResult::ServerError => return util::err(P::RCODE_SERVER_ERR),
StrongActionResult::EncodingError => {
// error we love to hate: encoding error, ugh
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?
+ return util::err(P::RCODE_ENCODING_ERROR);
},
StrongActionResult::OverwriteError => unsafe {
// SAFETY check: never the case
@@ -64,7 +64,7 @@ action! {
}
}
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/strong/sset.rs b/server/src/actions/strong/sset.rs
index bdf632a5..ddc57dcd 100644
--- a/server/src/actions/strong/sset.rs
+++ b/server/src/actions/strong/sset.rs
@@ -40,8 +40,8 @@ action! {
/// `Overwrite Error` or code `2`
fn sset(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
let howmany = act.len();
- ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
- let kve = handle.get_table_with:: (howmany, |size| size & 1 == 0 && size != 0)?;
+ let kve = handle.get_table_with:: ()?;
if registry::state_okay() {
let encoder = kve.get_double_encoder();
let outcome = unsafe {
@@ -50,12 +50,12 @@ action! {
self::snapshot_and_insert(kve, encoder, act.into_inner())
};
match outcome {
- StrongActionResult::Okay => conwrite!(con, groups::OKAY)?,
- StrongActionResult::OverwriteError => conwrite!(con, groups::OVERWRITE_ERR)?,
- StrongActionResult::ServerError => conwrite!(con, groups::SERVER_ERR)?,
+ StrongActionResult::Okay => con._write_raw(P::RCODE_OKAY).await?,
+ StrongActionResult::OverwriteError => return util::err(P::RCODE_OVERWRITE_ERR),
+ StrongActionResult::ServerError => return util::err(P::RCODE_SERVER_ERR),
StrongActionResult::EncodingError => {
// error we love to hate: encoding error, ugh
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?
+ return util::err(P::RCODE_ENCODING_ERROR);
},
StrongActionResult::Nil => unsafe {
// SAFETY check: never the case
@@ -63,7 +63,7 @@ action! {
}
}
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/strong/supdate.rs b/server/src/actions/strong/supdate.rs
index e217ebe8..8d1d3fb6 100644
--- a/server/src/actions/strong/supdate.rs
+++ b/server/src/actions/strong/supdate.rs
@@ -40,8 +40,8 @@ action! {
/// or code `1`
fn supdate(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter<'a>) {
let howmany = act.len();
- ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
- let kve = handle.get_table_with:: (howmany, |size| size & 1 == 0 && size != 0)?;
+ let kve = handle.get_table_with:: ()?;
if registry::state_okay() {
let encoder = kve.get_double_encoder();
let outcome = unsafe {
@@ -49,15 +49,15 @@ action! {
self::snapshot_and_update(kve, encoder, act.into_inner())
};
match outcome {
- StrongActionResult::Okay => conwrite!(con, groups::OKAY)?,
+ StrongActionResult::Okay => con._write_raw(P::RCODE_OKAY).await?,
StrongActionResult::Nil => {
// good, it failed because some key didn't exist
- conwrite!(con, groups::NIL)?;
+ return util::err(P::RCODE_NIL);
},
- StrongActionResult::ServerError => conwrite!(con, groups::SERVER_ERR)?,
+ StrongActionResult::ServerError => return util::err(P::RCODE_SERVER_ERR),
StrongActionResult::EncodingError => {
// error we love to hate: encoding error, ugh
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?
+ return util::err(P::RCODE_ENCODING_ERROR);
},
StrongActionResult::OverwriteError => unsafe {
// SAFETY check: never the case
@@ -65,7 +65,7 @@ action! {
}
}
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/update.rs b/server/src/actions/update.rs
index 6280e93d..29511ec7 100644
--- a/server/src/actions/update.rs
+++ b/server/src/actions/update.rs
@@ -28,20 +28,16 @@
//! This module provides functions to work with `UPDATE` queries
//!
-use crate::corestore::booltable::BytesNicheLUT;
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
-const UPDATE_NLUT: BytesNicheLUT =
- BytesNicheLUT::new(groups::ENCODING_ERROR, groups::OKAY, groups::NIL);
-
action!(
/// Run an `UPDATE` query
fn update(handle: &Corestore, con: &'a mut T, mut act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len == 2)?;
+ ensure_length:: (act.len(), |len| len == 2)?;
if registry::state_okay() {
let did_we = {
- let writer = handle.get_table_with:: ()?;
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
@@ -55,9 +51,9 @@ action!(
Err(()) => None,
}
};
- conwrite!(con, UPDATE_NLUT[did_we])?;
+ con._write_raw(P::UPDATE_NLUT[did_we]).await?;
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
Ok(())
}
diff --git a/server/src/actions/uset.rs b/server/src/actions/uset.rs
index ad3d58e9..b8e0101b 100644
--- a/server/src/actions/uset.rs
+++ b/server/src/actions/uset.rs
@@ -36,20 +36,20 @@ action!(
/// This is like "INSERT or UPDATE"
fn uset(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter<'a>) {
let howmany = act.len();
- ensure_length(howmany, |size| size & 1 == 0 && size != 0)?;
- let kve = handle.get_table_with:: (howmany, |size| size & 1 == 0 && size != 0)?;
+ let kve = handle.get_table_with:: ()?;
let encoding_is_okay = ENCODING_LUT_ITER_PAIR[kve.get_encoding_tuple()](&act);
if compiler::likely(encoding_is_okay) {
if registry::state_okay() {
while let (Some(key), Some(val)) = (act.next(), act.next()) {
kve.upsert_unchecked(Data::copy_from_slice(key), Data::copy_from_slice(val));
}
- conwrite!(con, howmany / 2)?;
+ con.write_usize(howmany / 2).await?;
} else {
- conwrite!(con, groups::SERVER_ERR)?;
+ return util::err(P::RCODE_SERVER_ERR);
}
} else {
- compiler::cold_err(conwrite!(con, groups::ENCODING_ERROR))?;
+ return util::err(P::RCODE_ENCODING_ERROR);
}
Ok(())
}
diff --git a/server/src/actions/whereami.rs b/server/src/actions/whereami.rs
index 70c94eaa..bc958d02 100644
--- a/server/src/actions/whereami.rs
+++ b/server/src/actions/whereami.rs
@@ -25,20 +25,19 @@
*/
use crate::dbnet::connection::prelude::*;
-use crate::resp::writer::NonNullArrayWriter;
action! {
fn whereami(store: &Corestore, con: &mut T, act: ActionIter<'a>) {
- ensure_length(act.len(), |len| len == 0)?;
+ ensure_length:: (act.len(), |len| len == 0)?;
match store.get_ids() {
(Some(ks), Some(tbl)) => {
- let mut writer = unsafe { NonNullArrayWriter::new(con, b'+', 2).await? };
- writer.write_element(ks).await?;
- writer.write_element(tbl).await?;
+ con.write_typed_non_null_array_header(2, b'+').await?;
+ con.write_typed_non_null_array_element(ks).await?;
+ con.write_typed_non_null_array_element(tbl).await?;
},
(Some(ks), None) => {
- let mut writer = unsafe { NonNullArrayWriter::new(con, b'+', 1).await? };
- writer.write_element(ks).await?;
+ con.write_typed_non_null_array_header(1, b'+').await?;
+ con.write_typed_non_null_array_element(ks).await?;
},
_ => unsafe { impossible!() }
}
diff --git a/server/src/admin/mksnap.rs b/server/src/admin/mksnap.rs
index 9e48be41..cce96361 100644
--- a/server/src/admin/mksnap.rs
+++ b/server/src/admin/mksnap.rs
@@ -38,10 +38,10 @@ action!(
if act.is_empty() {
// traditional mksnap
match engine.mksnap(handle.clone_store()).await {
- SnapshotActionResult::Ok => conwrite!(con, groups::OKAY)?,
- SnapshotActionResult::Failure => conwrite!(con, groups::SERVER_ERR)?,
- SnapshotActionResult::Disabled => conwrite!(con, groups::SNAPSHOT_DISABLED)?,
- SnapshotActionResult::Busy => conwrite!(con, groups::SNAPSHOT_BUSY)?,
+ SnapshotActionResult::Ok => con._write_raw(P::RCODE_OKAY).await?,
+ SnapshotActionResult::Failure => return util::err(P::RCODE_SERVER_ERR),
+ SnapshotActionResult::Disabled => return util::err(P::RSTRING_SNAPSHOT_DISABLED),
+ SnapshotActionResult::Busy => return util::err(P::RSTRING_SNAPSHOT_BUSY),
_ => unsafe { impossible!() },
}
} else if act.len() == 1 {
@@ -51,7 +51,7 @@ action!(
act.next_unchecked_bytes()
};
if !encoding::is_utf8(&name) {
- return conwrite!(con, groups::ENCODING_ERROR);
+ return util::err(P::RCODE_ENCODING_ERROR);
}
// SECURITY: Check for directory traversal syntax
@@ -72,19 +72,21 @@ action!(
.count()
!= 0;
if illegal_snapshot {
- return conwrite!(con, groups::SNAPSHOT_ILLEGAL_NAME);
+ return util::err(P::RSTRING_SNAPSHOT_ILLEGAL_NAME);
}
// now make the snapshot
match engine.mkrsnap(name, handle.clone_store()).await {
- SnapshotActionResult::Ok => conwrite!(con, groups::OKAY)?,
- SnapshotActionResult::Failure => conwrite!(con, groups::SERVER_ERR)?,
- SnapshotActionResult::Busy => conwrite!(con, groups::SNAPSHOT_BUSY)?,
- SnapshotActionResult::AlreadyExists => conwrite!(con, groups::SNAPSHOT_DUPLICATE)?,
+ SnapshotActionResult::Ok => con._write_raw(P::RCODE_OKAY).await?,
+ SnapshotActionResult::Failure => return util::err(P::RCODE_SERVER_ERR),
+ SnapshotActionResult::Busy => return util::err(P::RSTRING_SNAPSHOT_BUSY),
+ SnapshotActionResult::AlreadyExists => {
+ return util::err(P::RSTRING_SNAPSHOT_DUPLICATE)
+ }
_ => unsafe { impossible!() },
}
} else {
- conwrite!(con, groups::ACTION_ERR)?;
+ return util::err(P::RCODE_ACTION_ERR);
}
Ok(())
}
diff --git a/server/src/admin/sys.rs b/server/src/admin/sys.rs
index d432c1c7..2bb280f2 100644
--- a/server/src/admin/sys.rs
+++ b/server/src/admin/sys.rs
@@ -25,9 +25,7 @@
*/
use crate::{
- corestore::booltable::BoolTable,
- dbnet::connection::prelude::*,
- protocol::{PROTOCOL_VERSION, PROTOCOL_VERSIONSTRING},
+ corestore::booltable::BoolTable, dbnet::connection::prelude::*,
storage::v1::interface::DIR_ROOT,
};
use ::libsky::VERSION;
@@ -47,18 +45,18 @@ const HEALTH_TABLE: BoolTable<&str> = BoolTable::new("good", "critical");
action! {
fn sys(_handle: &Corestore, con: &mut T, iter: ActionIter<'_>) {
let mut iter = iter;
- ensure_boolean_or_aerr(iter.len() == 2)?;
+ ensure_boolean_or_aerr:: (iter.len() == 2)?;
match unsafe { iter.next_lowercase_unchecked() }.as_ref() {
INFO => sys_info(con, &mut iter).await,
METRIC => sys_metric(con, &mut iter).await,
- _ => util::err(groups::UNKNOWN_ACTION),
+ _ => util::err(P::RCODE_UNKNOWN_ACTION),
}
}
fn sys_info(con: &mut T, iter: &mut ActionIter<'_>) {
match unsafe { iter.next_lowercase_unchecked() }.as_ref() {
- INFO_PROTOCOL => con.write_response(PROTOCOL_VERSIONSTRING).await?,
- INFO_PROTOVER => con.write_response(PROTOCOL_VERSION).await?,
- INFO_VERSION => con.write_response(VERSION).await?,
+ INFO_PROTOCOL => con.write_string(P::PROTOCOL_VERSIONSTRING).await?,
+ INFO_PROTOVER => con.write_float(P::PROTOCOL_VERSION).await?,
+ INFO_VERSION => con.write_string(VERSION).await?,
_ => return util::err(ERR_UNKNOWN_PROPERTY),
}
Ok(())
@@ -66,14 +64,14 @@ action! {
fn sys_metric(con: &mut T, iter: &mut ActionIter<'_>) {
match unsafe { iter.next_lowercase_unchecked() }.as_ref() {
METRIC_HEALTH => {
- con.write_response(HEALTH_TABLE[registry::state_okay()]).await?
+ con.write_string(HEALTH_TABLE[registry::state_okay()]).await?
}
METRIC_STORAGE_USAGE => {
match util::os::dirsize(DIR_ROOT) {
- Ok(size) => con.write_response(size).await?,
+ Ok(size) => con.write_int64(size).await?,
Err(e) => {
log::error!("Failed to get storage usage with: {e}");
- con.write_response(groups::SERVER_ERR).await?
+ return util::err(P::RCODE_SERVER_ERR);
},
}
}
diff --git a/server/src/arbiter.rs b/server/src/arbiter.rs
index 1b8e0a3e..663d4399 100644
--- a/server/src/arbiter.rs
+++ b/server/src/arbiter.rs
@@ -57,6 +57,7 @@ pub async fn run(
snapshot,
maxcon,
auth,
+ protocol,
..
}: ConfigurationSet,
restore_filepath: Option ()?.as_ref() {
AUTH_LOGIN => self::_auth_login(con, auth, &mut iter).await,
AUTH_CLAIM => self::_auth_claim(con, auth, &mut iter).await,
AUTH_ADDUSER => {
- ensure_boolean_or_aerr(iter.len() == 1)?; // just the username
+ ensure_boolean_or_aerr:: (iter.len() == 1)?; // just the username
let username = unsafe { iter.next_unchecked() };
- let key = auth.provider_mut().claim_user(username)?;
- con.write_response(StringWrapper(key)).await?;
+ let key = auth.provider_mut().claim_user:: (username)?;
+ con.write_string(&key).await?;
Ok(())
}
AUTH_LOGOUT => {
- ensure_boolean_or_aerr(iter.is_empty())?; // nothing else
- auth.provider_mut().logout()?;
+ ensure_boolean_or_aerr:: (iter.is_empty())?; // nothing else
+ auth.provider_mut().logout:: ()?;
auth.swap_executor_to_anonymous();
- con.write_response(groups::OKAY).await?;
+ con._write_raw(P::RCODE_OKAY).await?;
Ok(())
}
AUTH_DELUSER => {
- ensure_boolean_or_aerr(iter.len() == 1)?; // just the username
- auth.provider_mut().delete_user(unsafe { iter.next_unchecked() })?;
- con.write_response(groups::OKAY).await?;
+ ensure_boolean_or_aerr:: (iter.len() == 1)?; // just the username
+ auth.provider_mut().delete_user:: (unsafe { iter.next_unchecked() })?;
+ con._write_raw(P::RCODE_OKAY).await?;
Ok(())
}
AUTH_RESTORE => self::auth_restore(con, auth, &mut iter).await,
AUTH_LISTUSER => self::auth_listuser(con, auth, &mut iter).await,
AUTH_WHOAMI => self::auth_whoami(con, auth, &mut iter).await,
- _ => util::err(groups::UNKNOWN_ACTION),
+ _ => util::err(P::RCODE_UNKNOWN_ACTION),
}
}
- fn auth_whoami(con: &mut T, auth: &mut AuthProviderHandle<'_, T, Strm>, iter: &mut ActionIter<'_>) {
- ensure_boolean_or_aerr(ActionIter::is_empty(iter))?;
- con.write_response(StringWrapper(auth.provider().whoami()?)).await?;
+ fn auth_whoami(con: &mut T, auth: &mut AuthProviderHandle<'_, P, T, Strm>, iter: &mut ActionIter<'_>) {
+ ensure_boolean_or_aerr:: (ActionIter::is_empty(iter))?;
+ con.write_string(&auth.provider().whoami:: ()?).await?;
Ok(())
}
- fn auth_listuser(con: &mut T, auth: &mut AuthProviderHandle<'_, T, Strm>, iter: &mut ActionIter<'_>) {
- ensure_boolean_or_aerr(ActionIter::is_empty(iter))?;
- let usernames = auth.provider().collect_usernames()?;
- let mut array_writer = unsafe {
- // The symbol is definitely correct, obvious from this context
- NonNullArrayWriter::new(con, TSYMBOL_UNICODE_STRING, usernames.len())
- }.await?;
+ fn auth_listuser(con: &mut T, auth: &mut AuthProviderHandle<'_, P, T, Strm>, iter: &mut ActionIter<'_>) {
+ ensure_boolean_or_aerr:: (ActionIter::is_empty(iter))?;
+ let usernames = auth.provider().collect_usernames:: ()?;
+ con.write_typed_non_null_array_header(usernames.len(), b'+').await?;
for username in usernames {
- array_writer.write_element(username).await?;
+ con.write_typed_non_null_array_element(username.as_bytes()).await?;
}
Ok(())
}
- fn auth_restore(con: &mut T, auth: &mut AuthProviderHandle<'_, T, Strm>, iter: &mut ActionIter<'_>) {
+ fn auth_restore(con: &mut T, auth: &mut AuthProviderHandle<'_, P, T, Strm>, iter: &mut ActionIter<'_>) {
let newkey = match iter.len() {
1 => {
// so this fella thinks they're root
- auth.provider().regenerate(unsafe {iter.next_unchecked()})?
+ auth.provider().regenerate:: (
+ unsafe { iter.next_unchecked() }
+ )?
}
2 => {
// so this fella is giving us the origin key
let origin = unsafe { iter.next_unchecked() };
let id = unsafe { iter.next_unchecked() };
- auth.provider().regenerate_using_origin(origin, id)?
+ auth.provider().regenerate_using_origin:: (origin, id)?
}
- _ => return util::err(groups::ACTION_ERR),
+ _ => return util::err(P::RCODE_ACTION_ERR),
};
- con.write_response(StringWrapper(newkey)).await?;
+ con.write_string(&newkey).await?;
Ok(())
}
- fn _auth_claim(con: &mut T, auth: &mut AuthProviderHandle<'_, T, Strm>, iter: &mut ActionIter<'_>) {
- ensure_boolean_or_aerr(iter.len() == 1)?; // just the origin key
+ fn _auth_claim(con: &mut T, auth: &mut AuthProviderHandle<'_, P, T, Strm>, iter: &mut ActionIter<'_>) {
+ ensure_boolean_or_aerr:: (iter.len() == 1)?; // just the origin key
let origin_key = unsafe { iter.next_unchecked() };
- let key = auth.provider_mut().claim_root(origin_key)?;
+ let key = auth.provider_mut().claim_root:: (origin_key)?;
auth.swap_executor_to_authenticated();
- con.write_response(StringWrapper(key)).await?;
+ con.write_string(&key).await?;
Ok(())
}
/// Handle a login operation only. The **`login` token is expected to be present**
fn auth_login_only(
con: &mut T,
- auth: &mut AuthProviderHandle<'_, T, Strm>,
+ auth: &mut AuthProviderHandle<'_, P, T, Strm>,
iter: ActionIter<'_>
) {
let mut iter = iter;
- match iter.next_lowercase().unwrap_or_aerr()?.as_ref() {
+ match iter.next_lowercase().unwrap_or_aerr:: ()?.as_ref() {
AUTH_LOGIN => self::_auth_login(con, auth, &mut iter).await,
AUTH_CLAIM => self::_auth_claim(con, auth, &mut iter).await,
AUTH_RESTORE => self::auth_restore(con, auth, &mut iter).await,
AUTH_WHOAMI => self::auth_whoami(con, auth, &mut iter).await,
- _ => util::err(errors::AUTH_CODE_PERMS),
+ _ => util::err(P::AUTH_CODE_PERMS),
}
}
- fn _auth_login(con: &mut T, auth: &mut AuthProviderHandle<'_, T, Strm>, iter: &mut ActionIter<'_>) {
+ fn _auth_login(con: &mut T, auth: &mut AuthProviderHandle<'_, P, T, Strm>, iter: &mut ActionIter<'_>) {
// sweet, where's our username and password
- ensure_boolean_or_aerr(iter.len() == 2)?; // just the uname and pass
+ ensure_boolean_or_aerr:: (iter.len() == 2)?; // just the uname and pass
let (username, password) = unsafe { (iter.next_unchecked(), iter.next_unchecked()) };
- auth.provider_mut().login(username, password)?;
+ auth.provider_mut().login:: (username, password)?;
auth.swap_executor_to_authenticated();
- con.write_response(groups::OKAY).await?;
+ con._write_raw(P::RCODE_OKAY).await?;
Ok(())
}
}
diff --git a/server/src/auth/provider.rs b/server/src/auth/provider.rs
index 6223e294..35ac27c3 100644
--- a/server/src/auth/provider.rs
+++ b/server/src/auth/provider.rs
@@ -24,9 +24,12 @@
*
*/
-use super::{errors, keys, AuthError};
+use super::keys;
+use crate::actions::{ActionError, ActionResult};
use crate::corestore::array::Array;
use crate::corestore::htable::Coremap;
+use crate::protocol::interface::ProtocolSpec;
+use crate::util::err;
use std::sync::Arc;
// constants
@@ -35,14 +38,12 @@ pub const AUTHKEY_SIZE: usize = 40;
/// Size of an authn ID in bytes
pub const AUTHID_SIZE: usize = 40;
-#[cfg(debug_assertions)]
pub mod testsuite_data {
+ #![allow(unused)]
//! Temporary users created by the testsuite in debug mode
pub const TESTSUITE_ROOT_USER: &str = "root";
pub const TESTSUITE_TEST_USER: &str = "testuser";
- #[cfg(test)]
pub const TESTSUITE_ROOT_TOKEN: &str = "XUOdVKhEONnnGwNwT7WeLqbspDgVtKex0/nwFwBSW7XJxioHwpg6H.";
- #[cfg(all(not(feature = "persist-suite"), test))]
pub const TESTSUITE_TEST_TOKEN: &str = "mpobAB7EY8vnBs70d/..h1VvfinKIeEJgt1rg4wUkwF6aWCvGGR9le";
}
@@ -56,8 +57,6 @@ const USER_ROOT: AuthID = unsafe { AuthID::from_const(USER_ROOT_ARRAY, 4) };
type AuthID = Array (origin_key)?;
// the origin key was good, let's try claiming root
let (key, store) = keys::generate_full();
if self.authmap.true_if_insert(USER_ROOT, store) {
@@ -130,33 +129,33 @@ impl AuthProvider {
self.whoami = Some(USER_ROOT);
Ok(key)
} else {
- Err(AuthError::AlreadyClaimed)
+ err(P::AUTH_ERROR_ALREADYCLAIMED)
}
}
- fn are_you_root(&self) -> AuthResult ()?;
match self.whoami.as_ref().map(|v| v.eq(&USER_ROOT)) {
Some(v) => Ok(v),
- None => Err(AuthError::Anonymous),
+ None => err(P::AUTH_CODE_PERMS),
}
}
- pub fn claim_user(&self, claimant: &[u8]) -> AuthResult ()?;
+ self._claim_user:: (claimant)
}
- pub fn _claim_user(&self, claimant: &[u8]) -> AuthResult (claimant)?, store)
{
Ok(key)
} else {
- Err(AuthError::AlreadyClaimed)
+ err(P::AUTH_ERROR_ALREADYCLAIMED)
}
}
- pub fn login(&mut self, account: &[u8], token: &[u8]) -> AuthResult<()> {
- self.ensure_enabled()?;
+ pub fn login ()?;
match self
.authmap
.get(account)
@@ -164,84 +163,94 @@ impl AuthProvider {
{
Some(Some(true)) => {
// great, authenticated
- self.whoami = Some(Self::try_auth_id(account)?);
+ self.whoami = Some(Self::try_auth_id:: (account)?);
Ok(())
}
_ => {
// either the password was wrong, or the username was wrong
- Err(AuthError::BadCredentials)
+ err(P::AUTH_CODE_BAD_CREDENTIALS)
}
}
}
- pub fn regenerate_using_origin(&self, origin: &[u8], account: &[u8]) -> AuthResult (origin)?;
+ self._regenerate:: (account)
}
- pub fn regenerate(&self, account: &[u8]) -> AuthResult ()?;
+ self._regenerate:: (account)
}
/// Regenerate the token for the given user. This returns a new token
- fn _regenerate(&self, account: &[u8]) -> AuthResult (account)?;
let (key, store) = keys::generate_full();
if self.authmap.true_if_update(id, store) {
Ok(key)
} else {
- Err(AuthError::BadCredentials)
+ err(P::AUTH_CODE_BAD_CREDENTIALS)
}
}
- fn try_auth_id(authid: &[u8]) -> AuthResult ()?;
+ self.whoami
+ .take()
+ .map(|_| ())
+ .ok_or(ActionError::ActionError(P::AUTH_CODE_PERMS))
}
- fn ensure_enabled(&self) -> AuthResult<()> {
- self.origin.as_ref().map(|_| ()).ok_or(AuthError::Disabled)
+ fn ensure_enabled ()?.eq(origin) {
Ok(())
} else {
- Err(AuthError::BadCredentials)
+ err(P::AUTH_CODE_BAD_CREDENTIALS)
}
}
- fn get_origin(&self) -> AuthResult<&Authkey> {
+ fn get_origin ()? {
Ok(())
} else {
- Err(AuthError::PermissionDenied)
+ err(P::AUTH_CODE_PERMS)
}
}
- pub fn delete_user(&self, user: &[u8]) -> AuthResult<()> {
- self.ensure_root()?;
+ pub fn delete_user ()?;
if user.eq(&USER_ROOT) {
// can't delete root!
- Err(AuthError::Other(errors::AUTH_ERROR_FAILED_TO_DELETE_USER))
+ err(P::AUTH_ERROR_FAILED_TO_DELETE_USER)
} else if self.authmap.true_if_removed(user) {
Ok(())
} else {
- Err(AuthError::BadCredentials)
+ err(P::AUTH_CODE_BAD_CREDENTIALS)
}
}
/// List all the users
- pub fn collect_usernames(&self) -> AuthResult ()?;
Ok(self
.authmap
.iter()
@@ -249,12 +258,12 @@ impl AuthProvider {
.collect())
}
/// Return the AuthID of the current user
- pub fn whoami(&self) -> AuthResult ()?;
self.whoami
.as_ref()
.map(|v| String::from_utf8_lossy(v).to_string())
- .ok_or(AuthError::Anonymous)
+ .ok_or(ActionError::ActionError(P::AUTH_CODE_PERMS))
}
}
diff --git a/server/src/auth/tests.rs b/server/src/auth/tests.rs
index c083130a..a067eefe 100644
--- a/server/src/auth/tests.rs
+++ b/server/src/auth/tests.rs
@@ -35,77 +35,88 @@ mod keys {
}
mod authn {
- use crate::auth::{AuthError, AuthProvider};
+ use crate::actions::ActionError;
+ use crate::auth::AuthProvider;
+ use crate::protocol::{interface::ProtocolSpec, Skyhash2};
const ORIG: &[u8; 40] = b"c4299d190fb9a00626797fcc138c56eae9971664";
#[test]
fn claim_root_okay() {
let mut provider = AuthProvider::new_blank(Some(*ORIG));
- let _ = provider.claim_root(ORIG).unwrap();
+ let _ = provider.claim_root:: (self)
}
/// Create a table: in-memory; **no transactional guarantees**. Two tables can be created
/// simultaneously, but are never flushed unless we are very lucky. If the global flush
diff --git a/server/src/corestore/table.rs b/server/src/corestore/table.rs
index fb908093..c9c8959c 100644
--- a/server/src/corestore/table.rs
+++ b/server/src/corestore/table.rs
@@ -32,22 +32,22 @@ use crate::corestore::Data;
use crate::corestore::{memstore::DdlError, KeyspaceResult};
use crate::dbnet::connection::prelude::Corestore;
use crate::kvengine::{KVEListmap, KVEStandard, LockedVec};
-use crate::protocol::responses::groups;
+use crate::protocol::interface::ProtocolSpec;
use crate::util;
pub trait DescribeTable {
type Table;
fn try_get(table: &Table) -> Option<&Self::Table>;
- fn get(store: &Corestore) -> ActionResult<&Self::Table> {
+ fn get ,
_phantom: PhantomData<(T, Strm)>,
}
-impl<'a, T, Strm> AuthProviderHandle<'a, T, Strm>
+impl<'a, P, T, Strm> AuthProviderHandle<'a, P, T, Strm>
where
- T: ClientConnection ,
Strm: Stream,
+ P: ProtocolSpec,
{
- pub fn new(provider: &'a mut AuthProvider, executor: &'a mut ExecutorFn ) -> Self {
Self {
provider,
executor,
@@ -105,217 +102,33 @@ where
}
pub mod prelude {
- //! A 'prelude' for callers that would like to use the `ProtocolConnection` and `ProtocolConnectionExt` traits
+ //! A 'prelude' for callers that would like to use the `RawConnection` and `ProtocolRead` traits
//!
//! This module is hollow itself, it only re-exports from `dbnet::con` and `tokio::io`
- pub use super::{AuthProviderHandle, ClientConnection, ProtocolConnectionExt, Stream};
- pub use crate::actions::{ensure_boolean_or_aerr, ensure_cond_or_err, ensure_length};
- pub use crate::corestore::{
- table::{KVEBlob, KVEList},
- Corestore,
+ pub use super::{AuthProviderHandle, ClientConnection, Stream};
+ pub use crate::{
+ actions::{ensure_boolean_or_aerr, ensure_cond_or_err, ensure_length, translate_ddl_error},
+ corestore::{
+ table::{KVEBlob, KVEList},
+ Corestore,
+ },
+ get_tbl, handle_entity, is_lowbit_set,
+ protocol::interface::ProtocolSpec,
+ queryengine::ActionIter,
+ registry,
+ util::{self, FutureResult, UnwrapActionError, Unwrappable},
};
- pub use crate::protocol::responses::{self, groups};
- pub use crate::queryengine::ActionIter;
- pub use crate::resp::StringWrapper;
- pub use crate::util::{self, FutureResult, UnwrapActionError, Unwrappable};
- pub use crate::{aerr, conwrite, get_tbl, handle_entity, is_lowbit_set, registry};
pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
}
-/// # The `ProtocolConnectionExt` trait
+/// # The `RawConnection` trait
///
-/// The `ProtocolConnectionExt` trait has default implementations and doesn't ever require explicit definitions, unless
-/// there's some black magic that you want to do. All [`ProtocolConnection`] objects will get a free implementation for this trait.
-/// Hence implementing [`ProtocolConnection`] alone is enough for you to get high-level methods to interface with the protocol.
+/// The `RawConnection` trait has low-level methods that can be used to interface with raw sockets. Any type
+/// that successfully implements this trait will get an implementation for `ProtocolRead` and `ProtocolWrite`
+/// provided that it uses a protocol that implements the `ProtocolSpec` trait.
///
-/// ## DO NOT
-/// The fact that this is a trait enables great flexibility in terms of visibility, but **DO NOT EVER CALL any function other than
-/// `read_query`, `close_conn_with_error` or `write_response`**. If you mess with functions like `read_again`, you're likely to pull yourself into some
-/// good trouble.
-pub trait ProtocolConnectionExt for Connection =
+ for<'s> fn(&'s mut ConnectionHandler , Query) -> FutureResult<'s, ActionResult<()>>;
/// # A generic connection handler
///
-/// A [`ConnectionHandler`] object is a generic connection handler for any object that implements the [`ProtocolConnection`] trait (or
-/// the [`ProtocolConnectionExt`] trait). This function will accept such a type `T`, possibly a listener object and then use it to read
+/// A [`ConnectionHandler`] object is a generic connection handler for any object that implements the [`RawConnection`] trait (or
+/// the [`ProtocolRead`] trait). This function will accept such a type `T`, possibly a listener object and then use it to read
/// a query, parse it and return an appropriate response through [`corestore::Corestore::execute_query`]
-pub struct ConnectionHandler {
db: Corestore,
con: T,
climit: Arc ,
terminator: Terminator,
_term_sig_tx: mpsc::Sender<()>,
_marker: PhantomData ConnectionHandler
where
- T: ProtocolConnectionExt + ProtocolWrite + Send + Sync,
+ Strm: Stream,
+ P: ProtocolSpec,
{
pub fn new(
db: Corestore,
con: T,
auth: AuthProvider,
- executor: ExecutorFn ,
climit: Arc Drop for ConnectionHandler {
fn drop(&mut self) {
// Make sure that the permit is returned to the semaphore
// in the case that there is a panic inside
@@ -528,10 +360,14 @@ pub trait Stream: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync {}
impl + ProtocolRead + Send + Sync
+{
+}
+impl ClientConnection for T
where
- T: ProtocolConnectionExt + ProtocolRead + Send + Sync,
Strm: Stream,
+ P: ProtocolSpec,
{
}
diff --git a/server/src/dbnet/mod.rs b/server/src/dbnet/mod.rs
index 109b5570..3eb71665 100644
--- a/server/src/dbnet/mod.rs
+++ b/server/src/dbnet/mod.rs
@@ -39,19 +39,24 @@
//! 5. Now errors are handled if they occur. Otherwise, the query is executed by `Corestore::execute_query()`
//!
-use self::tcp::Listener;
-use crate::{
- auth::AuthProvider,
- config::{PortConfig, SslOpts},
- corestore::Corestore,
- util::error::{Error, SkyResult},
- IoResult,
-};
-use std::{net::IpAddr, sync::Arc};
-use tls::SslListener;
-use tokio::{
- net::TcpListener,
- sync::{broadcast, mpsc, Semaphore},
+use {
+ self::{
+ tcp::{Listener, ListenerV1},
+ tls::{SslListener, SslListenerV1},
+ },
+ crate::{
+ auth::AuthProvider,
+ config::{PortConfig, ProtocolVersion, SslOpts},
+ corestore::Corestore,
+ util::error::{Error, SkyResult},
+ IoResult,
+ },
+ core::future::Future,
+ std::{net::IpAddr, sync::Arc},
+ tokio::{
+ net::TcpListener,
+ sync::{broadcast, mpsc, Semaphore},
+ },
};
pub mod connection;
#[macro_use]
@@ -160,35 +165,93 @@ impl BaseListener {
#[allow(clippy::large_enum_variant)]
pub enum MultiListener {
SecureOnly(SslListener),
+ SecureOnlyV1(SslListenerV1),
InsecureOnly(Listener),
+ InsecureOnlyV1(ListenerV1),
Multi(Listener, SslListener),
+ MultiV1(ListenerV1, SslListenerV1),
+}
+
+async fn wait_on_port_futures(
+ a: impl Future