Merge pull request #161 from skytable/refactor-actions

Refactor actions
next
Sayan 3 years ago committed by GitHub
commit ff6e2175d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

8
Cargo.lock generated

@ -903,8 +903,8 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.2.3"
source = "git+https://github.com/skytable/client-rust?branch=next#f36d7567ccae39a512d581d7f0d205c88db82b63"
version = "0.3.0"
source = "git+https://github.com/skytable/client-rust?branch=next#acf41607ebb883e539e5e2663341c2fbcf04a97f"
dependencies = [
"bytes",
"tokio",
@ -976,9 +976,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.6.0"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37"
checksum = "0a38d31d7831c6ed7aad00aa4c12d9375fd225a6dd77da1d25b707346319a975"
dependencies = [
"autocfg",
"bytes",

@ -8,7 +8,7 @@ edition = "2018"
[dependencies]
libsky = { path = "../libsky" }
tokio = { version = "1.6.0", features = ["full"] }
tokio = { version = "1.6.1", features = ["full"] }
bytes = "1.0.1"
clap = { version = "2.33.3", features = ["yaml"] }
openssl = { version = "0.10.34", features = ["vendored"] }

@ -6,7 +6,7 @@ edition = "2018"
build = "build.rs"
[dependencies]
tokio = { version = "1.6.0", features = ["full"] }
tokio = { version = "1.6.1", features = ["full"] }
bytes = "1.0.1"
libsky = { path = "../libsky" }
bincode = "1.3.3"

@ -25,18 +25,19 @@
*/
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
/// Get the number of keys in the database
pub async fn dbsize<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, != 0);
crate::err_if_len_is!(act, con, not 0);
let len;
{
len = handle.get_ref().len();

@ -29,6 +29,7 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
/// Run a `DEL` query
///
@ -37,13 +38,13 @@ use crate::protocol::responses;
pub async fn del<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, == 0);
crate::err_if_len_is!(act, con, eq 0);
let done_howmany: Option<usize>;
{
if handle.is_poisoned() {
@ -51,7 +52,7 @@ where
} else {
let mut many = 0;
let cmap = handle.get_ref();
act.into_iter().skip(1).for_each(|key| {
act.for_each(|key| {
if cmap.true_if_removed(key.as_bytes()) {
many += 1
}

@ -28,22 +28,23 @@
//! This module provides functions to work with `EXISTS` queries
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
/// Run an `EXISTS` query
pub async fn exists<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, == 0);
crate::err_if_len_is!(act, con, eq 0);
let mut how_many_of_them_exist = 0usize;
{
let cmap = handle.get_ref();
act.into_iter().skip(1).for_each(|key| {
act.for_each(|key| {
if cmap.contains_key(key.as_bytes()) {
how_many_of_them_exist += 1;
}

@ -26,18 +26,19 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
/// Delete all the keys in the database
pub async fn flushdb<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, != 0);
crate::err_if_len_is!(act, con, not 0);
let failed;
{
if handle.is_poisoned() {

@ -29,27 +29,33 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
use bytes::Bytes;
use core::hint::unreachable_unchecked;
/// Run a `GET` query
pub async fn get<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, != 1);
crate::err_if_len_is!(act, con, not 1);
let res: Option<Bytes> = {
let reader = handle.get_ref();
unsafe {
// UNSAFE(@ohsayan): act.get_ref().get_unchecked() is safe because we've already if the action
// UNSAFE(@ohsayan): unreachable_unchecked is safe because we've already checked if the action
// group contains one argument (excluding the action itself)
reader
.get(act.get_unchecked(1).as_bytes())
.get(
act.next()
.unwrap_or_else(|| unreachable_unchecked())
.as_bytes(),
)
.map(|b| b.get_blob().clone())
}
};

@ -29,6 +29,7 @@
//! Functions for handling `JGET` queries
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
/// Run a `JGET` query
/// This returns a JSON key/value pair of keys and values
@ -42,13 +43,13 @@ use crate::dbnet::connection::prelude::*;
pub async fn jget<T, Strm>(
_handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, != 1);
crate::err_if_len_is!(act, con, not 1);
todo!()
}

@ -26,6 +26,8 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use core::hint::unreachable_unchecked;
/// Run a `KEYLEN` query
///
@ -33,20 +35,24 @@ use crate::protocol::responses;
pub async fn keylen<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, != 1);
crate::err_if_len_is!(act, con, not 1);
let res: Option<usize> = {
let reader = handle.get_ref();
unsafe {
// UNSAFE(@ohsayan): get_unchecked() is completely safe as we've already checked
// UNSAFE(@ohsayan): unreachable_unchecked() is completely safe as we've already checked
// the number of arguments is one
reader
.get(act.get_unchecked(1).as_bytes())
.get(
act.next()
.unwrap_or_else(|| unreachable_unchecked())
.as_bytes(),
)
.map(|b| b.get_blob().len())
}
};

@ -26,6 +26,7 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
use bytes::Bytes;
@ -33,14 +34,14 @@ use bytes::Bytes;
pub async fn lskeys<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, > 1);
let item_count = if let Some(cnt) = act.get(1) {
crate::err_if_len_is!(act, con, gt 1);
let item_count = if let Some(cnt) = act.next() {
if let Ok(cnt) = cnt.parse::<usize>() {
cnt
} else {

@ -25,6 +25,7 @@
*/
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
use bytes::Bytes;
use skytable::RespCode;
@ -34,19 +35,20 @@ use skytable::RespCode;
pub async fn mget<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, == 0);
con.write_array_length(act.len() - 1).await?;
let mut keys = act.into_iter().skip(1);
while let Some(key) = keys.next() {
crate::err_if_len_is!(act, con, eq 0);
con.write_array_length(act.len()).await?;
while let Some(key) = act.next() {
let res: Option<Bytes> = {
let reader = handle.get_ref();
reader.get(key.as_bytes()).map(|b| b.get_blob().clone())
handle
.get_ref()
.get(key.as_bytes())
.map(|b| b.get_blob().clone())
};
if let Some(value) = res {
// Good, we got the value, write it off to the stream

@ -24,9 +24,11 @@
*
*/
//! # The Key/Value Engine
//! This is Skytable's K/V engine. It contains utilities to interface with
//! Skytable's K/V store
//! # Actions
//!
//! Actions are like shell commands, you provide arguments -- they return output. This module contains a collection
//! of the actions supported by Skytable
//!
pub mod dbsize;
pub mod del;
@ -47,12 +49,13 @@ pub mod heya {
//! Respond to `HEYA` queries
use crate::dbnet::connection::prelude::*;
use crate::protocol;
use crate::queryengine::ActionIter;
use protocol::responses;
/// Returns a `HEY!` `Response`
pub async fn heya<T, Strm>(
_handle: &crate::coredb::CoreDB,
con: &mut T,
_act: Vec<String>,
_act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
@ -64,50 +67,50 @@ pub mod heya {
#[macro_export]
macro_rules! err_if_len_is {
($buf:ident, $con:ident, == $len:literal) => {
if $buf.len() - 1 == $len {
($buf:ident, $con:ident, eq $len:literal) => {
if $buf.len() == $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, != $len:literal) => {
if $buf.len() - 1 != $len {
($buf:ident, $con:ident, not $len:literal) => {
if $buf.len() != $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, > $len:literal) => {
if $buf.len() - 1 > $len {
($buf:ident, $con:ident, gt $len:literal) => {
if $buf.len() > $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, < $len:literal) => {
if $buf.len() - 1 < $len {
($buf:ident, $con:ident, lt $len:literal) => {
if $buf.len() < $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, >= $len:literal) => {
if $buf.len() - 1 >= $len {
($buf:ident, $con:ident, gt_or_eq $len:literal) => {
if $buf.len() >= $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, <= $len:literal) => {
if $buf.len() - 1 <= $len {
($buf:ident, $con:ident, lt_or_eq $len:literal) => {
if $buf.len() <= $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, & $len:literal) => {
if $buf.len() - 1 & $len {
if $buf.len() & $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;

@ -27,25 +27,25 @@
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
/// Run an `MSET` query
pub async fn mset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut kviter = act.into_iter().skip(1);
let done_howmany: Option<usize>;
{
if handle.is_poisoned() {
@ -53,7 +53,7 @@ where
} else {
let writer = handle.get_ref();
let mut didmany = 0;
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
while let (Some(key), Some(val)) = (act.next(), act.next()) {
if writer.true_if_insert(Data::from(key), Data::from(val)) {
didmany += 1;
}

@ -27,25 +27,25 @@
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
/// Run an `MUPDATE` query
pub async fn mupdate<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut kviter = act.into_iter().skip(1);
let done_howmany: Option<usize>;
{
if handle.is_poisoned() {
@ -53,7 +53,7 @@ where
} else {
let writer = handle.get_ref();
let mut didmany = 0;
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
while let (Some(key), Some(val)) = (act.next(), act.next()) {
if writer.true_if_update(Data::from(key), Data::from(val)) {
didmany += 1;
}

@ -30,6 +30,7 @@
use crate::coredb;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use coredb::Data;
use std::hint::unreachable_unchecked;
@ -37,30 +38,25 @@ use std::hint::unreachable_unchecked;
pub async fn set<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
if howmany != 2 {
// There should be exactly 2 arguments
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut it = act.into_iter().skip(1);
crate::err_if_len_is!(act, con, not 2);
let did_we = {
if handle.is_poisoned() {
None
} else {
let writer = handle.get_ref();
if writer.true_if_insert(
Data::from_string(it.next().unwrap_or_else(|| unsafe {
Data::from_string(act.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
unreachable_unchecked()
})),
Data::from(it.next().unwrap_or_else(|| unsafe {
Data::from(act.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
unreachable_unchecked()

@ -38,8 +38,8 @@
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use std::hint::unreachable_unchecked;
use crate::queryengine::ActionIter;
use core::hint::unreachable_unchecked;
/// Run an `SSET` query
///
@ -48,17 +48,17 @@ use std::hint::unreachable_unchecked;
pub async fn sset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut failed = Some(false);
let failed;
{
// We use this additional scope to tell the compiler that the write lock
// doesn't go beyond the scope of this function - and is never used across
@ -66,34 +66,16 @@ where
// This iterator gives us the keys and values, skipping the first argument which
// is the action name
let mut key_iter = act
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
unreachable_unchecked()
})
.iter();
let mut key_iter = act.as_ref().iter();
if handle.is_poisoned() {
failed = None;
} else {
let mut_table = handle.get_ref();
while let Some(key) = key_iter.next() {
if mut_table.contains_key(key.as_bytes()) {
// With one of the keys existing - this action can't clearly be done
// So we'll set `failed` to true and ensure that we check this while
// writing a response back to the client
failed = Some(true);
break;
}
}
if !failed.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): Completely safe because we've already set a value for `failed` earlier
unreachable_unchecked()
}) {
if key_iter.all(|key| !mut_table.contains_key(key.as_bytes())) {
failed = Some(false);
// Since the failed flag is false, none of the keys existed
// So we can safely set the keys
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
while let (Some(key), Some(value)) = (act.next(), act.next()) {
if !mut_table.true_if_insert(Data::from(key), Data::from_string(value)) {
// Tell the compiler that this will never be the case
unsafe {
@ -104,6 +86,8 @@ where
}
}
}
} else {
failed = Some(true);
}
}
}
@ -126,49 +110,31 @@ where
pub async fn sdel<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany == 0 {
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut failed = Some(false);
let failed;
{
// We use this additional scope to tell the compiler that the write lock
// doesn't go beyond the scope of this function - and is never used across
// an await: cause, the compiler ain't as smart as we are ;)
let mut key_iter = act
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
unreachable_unchecked()
})
.iter();
let mut key_iter = act.as_ref().iter();
if handle.is_poisoned() {
failed = None;
} else {
let mut_table = handle.get_ref();
while let Some(key) = key_iter.next() {
if !mut_table.contains_key(key.as_bytes()) {
// With one of the keys not existing - this action can't clearly be done
// So we'll set `failed` to true and ensure that we check this while
// writing a response back to the client
failed = Some(true);
break;
}
}
if !failed.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): Again, completely safe as we always assign a value to
// `failed`
unreachable_unchecked()
}) {
if key_iter.all(|key| mut_table.contains_key(key.as_bytes())) {
failed = Some(false);
// Since the failed flag is false, all of the keys exist
// So we can safely delete the keys
act.into_iter().skip(1).for_each(|key| {
act.into_iter().for_each(|key| {
// Since we've already checked that the keys don't exist
// We'll tell the compiler to optimize this
let _ = mut_table.remove(key.as_bytes()).unwrap_or_else(|| unsafe {
@ -177,6 +143,8 @@ where
unreachable_unchecked()
});
});
} else {
failed = Some(true);
}
}
}
@ -198,13 +166,13 @@ where
pub async fn supdate<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
@ -213,13 +181,7 @@ where
// We use this additional scope to tell the compiler that the write lock
// doesn't go beyond the scope of this function - and is never used across
// an await: cause, the compiler ain't as smart as we are ;)
let mut key_iter = act
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
unreachable_unchecked()
})
.iter();
let mut key_iter = act.as_ref().iter();
if handle.is_poisoned() {
failed = None;
} else {
@ -245,8 +207,7 @@ where
}) {
// Since the failed flag is false, none of the keys existed
// So we can safely update the keys
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
while let (Some(key), Some(value)) = (act.next(), act.next()) {
if !mut_table.true_if_update(Data::from(key), Data::from_string(value)) {
// Tell the compiler that this will never be the case
unsafe { unreachable_unchecked() }

@ -31,6 +31,7 @@
use crate::coredb::{self};
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use coredb::Data;
use std::hint::unreachable_unchecked;
@ -38,30 +39,25 @@ use std::hint::unreachable_unchecked;
pub async fn update<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
if howmany != 2 {
// There should be exactly 2 arguments
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut it = act.into_iter().skip(1);
crate::err_if_len_is!(act, con, not 2);
let did_we = {
if handle.is_poisoned() {
None
} else {
let writer = handle.get_ref();
if writer.true_if_update(
Data::from(it.next().unwrap_or_else(|| unsafe {
Data::from(act.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action contains exactly
// two arguments (excluding the action itself). So, this branch won't ever be reached
unreachable_unchecked()
})),
Data::from_string(it.next().unwrap_or_else(|| unsafe {
Data::from_string(act.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action contains exactly
// two arguments (excluding the action itself). So, this branch won't ever be reached
unreachable_unchecked()

@ -27,6 +27,7 @@
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
/// Run an `USET` query
///
@ -34,26 +35,25 @@ use crate::protocol::responses;
pub async fn uset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
let howmany = act.len();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut kviter = act.into_iter().skip(1);
let failed = {
if handle.is_poisoned() {
true
} else {
let writer = handle.get_ref();
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
while let (Some(key), Some(val)) = (act.next(), act.next()) {
let _ = writer.upsert(Data::from(key), Data::from(val));
}
drop(writer);

@ -29,6 +29,7 @@ use crate::diskstore;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::diskstore::snapshot::DIR_SNAPSHOT;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use std::hint::unreachable_unchecked;
use std::path::{Component, PathBuf};
@ -37,14 +38,13 @@ use std::path::{Component, PathBuf};
pub async fn mksnap<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: Vec<String>,
mut act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.len() - 1;
if howmany == 0 {
if act.len() == 0 {
if !handle.is_snapshot_enabled() {
// Since snapshotting is disabled, we can't create a snapshot!
// We'll just return an error returning the same
@ -100,9 +100,9 @@ where
.await;
}
} else {
if howmany == 1 {
if act.len() == 1 {
// This means that the user wants to create a 'named' snapshot
let snapname = act.get(1).unwrap_or_else(|| unsafe {
let snapname = act.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action
// contains a second argument, so this can't be reached
unreachable_unchecked()

@ -36,11 +36,11 @@ use crate::config::SnapshotConfig;
use std::io::{self, prelude::*};
mod config;
use std::env;
mod actions;
mod admin;
mod coredb;
mod dbnet;
mod diskstore;
mod kvengine;
mod protocol;
mod queryengine;
mod resp;

@ -24,8 +24,6 @@
*
*/
use std::borrow::Cow;
#[derive(Debug, PartialEq)]
#[non_exhaustive]
/// # Data Types
@ -41,27 +39,3 @@ pub enum Element {
/// A non-recursive String array; tsymbol: `_`
FlatArray(Vec<String>),
}
impl Element {
/// This will return a reference to the first element in the element
///
/// If this element is a compound type, it will return a reference to the first element in the compound
/// type
pub fn get_first(&self) -> Option<Cow<String>> {
match self {
Self::Array(elem) => match elem.first() {
Some(el) => match el {
Element::String(st) => Some(Cow::Borrowed(&st)),
_ => None,
},
None => None,
},
Self::FlatArray(elem) => match elem.first() {
Some(el) => Some(Cow::Borrowed(&el)),
None => None,
},
Self::String(ref st) => Some(Cow::Borrowed(&st)),
_ => None,
}
}
}

@ -28,50 +28,36 @@
use crate::coredb::CoreDB;
use crate::dbnet::connection::prelude::*;
use crate::gen_match;
use crate::protocol::responses;
use crate::protocol::Element;
use crate::{admin, kvengine};
use crate::{actions, admin};
mod tags {
use std::vec::IntoIter;
pub type ActionIter = IntoIter<String>;
macro_rules! gen_constants_and_matches {
($con:ident, $buf:ident, $db:ident, $($action:ident => $fns:expr),*) => {
mod tags {
//! This module is a collection of tags/strings used for evaluating queries
//! and responses
/// `GET` action tag
pub const TAG_GET: &'static str = "GET";
/// `SET` action tag
pub const TAG_SET: &'static str = "SET";
/// `UPDATE` action tag
pub const TAG_UPDATE: &'static str = "UPDATE";
/// `DEL` action tag
pub const TAG_DEL: &'static str = "DEL";
/// `HEYA` action tag
pub const TAG_HEYA: &'static str = "HEYA";
/// `EXISTS` action tag
pub const TAG_EXISTS: &'static str = "EXISTS";
/// `MSET` action tag
pub const TAG_MSET: &'static str = "MSET";
/// `MGET` action tag
pub const TAG_MGET: &'static str = "MGET";
/// `MUPDATE` action tag
pub const TAG_MUPDATE: &'static str = "MUPDATE";
/// `SSET` action tag
pub const TAG_SSET: &'static str = "SSET";
/// `SDEL` action tag
pub const TAG_SDEL: &'static str = "SDEL";
/// `SUPDATE` action tag
pub const TAG_SUPDATE: &'static str = "SUPDATE";
/// `DBSIZE` action tag
pub const TAG_DBSIZE: &'static str = "DBSIZE";
/// `FLUSHDB` action tag
pub const TAG_FLUSHDB: &'static str = "FLUSHDB";
/// `USET` action tag
pub const TAG_USET: &'static str = "USET";
/// `KEYLEN` action tag
pub const TAG_KEYLEN: &'static str = "KEYLEN";
/// `MKSNAP` action tag
pub const TAG_MKSNAP: &'static str = "MKSNAP";
/// `LSKEYS` action tag
pub const TAG_LSKEYS: &str = "LSKEYS";
$(
pub const $action: &'static str = stringify!($action);
)*
}
let mut first = match $buf.next() {
Some(first) => first,
None => return $con.write_response(&**responses::groups::PACKET_ERR).await,
};
first.make_ascii_uppercase();
match first.as_str() {
$(
tags::$action => $fns($db, $con, $buf).await?,
)*
_ => {
return $con.write_response(&**responses::groups::UNKNOWN_ACTION).await;
}
}
};
}
/// Execute a simple(*) query
@ -80,59 +66,34 @@ where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let first = match buf.get_first() {
Some(element) => element.to_ascii_uppercase(),
None => return con.write_response(&**responses::groups::PACKET_ERR).await,
};
gen_match!(
first,
db,
con,
buf,
tags::TAG_DEL => kvengine::del::del,
tags::TAG_GET => kvengine::get::get,
tags::TAG_HEYA => kvengine::heya::heya,
tags::TAG_EXISTS => kvengine::exists::exists,
tags::TAG_SET => kvengine::set::set,
tags::TAG_MGET => kvengine::mget::mget,
tags::TAG_MSET => kvengine::mset::mset,
tags::TAG_UPDATE => kvengine::update::update,
tags::TAG_MUPDATE => kvengine::mupdate::mupdate,
tags::TAG_SSET => kvengine::strong::sset,
tags::TAG_SDEL => kvengine::strong::sdel,
tags::TAG_SUPDATE => kvengine::strong::supdate,
tags::TAG_DBSIZE => kvengine::dbsize::dbsize,
tags::TAG_FLUSHDB => kvengine::flushdb::flushdb,
tags::TAG_USET => kvengine::uset::uset,
tags::TAG_KEYLEN => kvengine::keylen::keylen,
tags::TAG_MKSNAP => admin::mksnap::mksnap,
tags::TAG_LSKEYS => kvengine::lskeys::lskeys
);
Ok(())
}
#[macro_export]
/// A match generator macro built specifically for the `crate::queryengine::execute_simple` function
///
/// **NOTE:** This macro needs _paths_ for both sides of the $x => $y, to produce something sensible
macro_rules! gen_match {
($pre:ident, $db:ident, $con:ident, $buf:ident, $($x:pat => $y:expr),*) => {
let flat_array = if let crate::protocol::Element::FlatArray(array) = $buf {
array
let buf = if let Element::FlatArray(arr) = buf {
arr
} else {
return $con.write_response(&**responses::groups::WRONGTYPE_ERR).await;
};
match $pre.as_str() {
// First repeat over all the $x => $y patterns, passing in the variables
// and adding .await calls and adding the `?`
$(
$x => $y($db, $con, flat_array).await?,
)*
// Now add the final case where no action is matched
_ => {
return $con.write_response(&**responses::groups::UNKNOWN_ACTION)
return con
.write_response(&**responses::full_responses::R_WRONGTYPE_ERR)
.await;
},
}
};
let mut buf = buf.into_iter();
gen_constants_and_matches!(
con, buf, db,
GET => actions::get::get,
SET => actions::set::set,
UPDATE => actions::update::update,
DEL => actions::del::del,
HEYA => actions::heya::heya,
EXISTS => actions::exists::exists,
MSET => actions::mset::mset,
MGET => actions::mget::mget,
MUPDATE => actions::mupdate::mupdate,
SSET => actions::strong::sset,
SDEL => actions::strong::sdel,
SUPDATE => actions::strong::supdate,
DBSIZE => actions::dbsize::dbsize,
FLUSHDB => actions::flushdb::flushdb,
USET => actions::uset::uset,
KEYLEN => actions::keylen::keylen,
MKSNAP => admin::mksnap::mksnap,
LSKEYS => actions::lskeys::lskeys
);
Ok(())
}

Loading…
Cancel
Save