Merge branch 'model/tsymbol' into next

next
Sayan Nandan 3 years ago
commit 3bd34ae5c9

71
Cargo.lock generated

@ -139,9 +139,9 @@ dependencies = [
[[package]]
name = "crossbeam-deque"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if",
"crossbeam-epoch",
@ -291,9 +291,9 @@ checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27"
checksum = "1adc00f486adfc9ce99f77d717836f0c5aa84965eb0b4f051f4e83f7cab53f8b"
dependencies = [
"futures-channel",
"futures-core",
@ -306,9 +306,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2"
checksum = "74ed2411805f6e4e3d9bc904c95d5d423b89b3b25dc0250aa74729de20629ff9"
dependencies = [
"futures-core",
"futures-sink",
@ -316,15 +316,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1"
checksum = "af51b1b4a7fdff033703db39de8802c673eb91855f2e0d47dcf3bf2c0ef01f99"
[[package]]
name = "futures-executor"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79"
checksum = "4d0d535a57b87e1ae31437b892713aee90cd2d7b0ee48727cd11fc72ef54761c"
dependencies = [
"futures-core",
"futures-task",
@ -333,15 +333,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1"
checksum = "0b0e06c393068f3a6ef246c75cdca793d6a46347e75286933e5e75fd2fd11582"
[[package]]
name = "futures-macro"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121"
checksum = "c54913bae956fb8df7f4dc6fc90362aa72e69148e3f39041fbe8742d21e0ac57"
dependencies = [
"autocfg",
"proc-macro-hack",
@ -352,21 +352,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282"
checksum = "c0f30aaa67363d119812743aa5f33c201a7a66329f97d1a887022971feea4b53"
[[package]]
name = "futures-task"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae"
checksum = "bbe54a98670017f3be909561f6ad13e810d9a51f3f061b902062ca3da80799f2"
[[package]]
name = "futures-util"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967"
checksum = "67eb846bfd58e44a8481a00049e82c43e0ccb5d61f8dc071057cb19249dd4d78"
dependencies = [
"autocfg",
"futures-channel",
@ -420,9 +420,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "instant"
version = "0.1.9"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d"
dependencies = [
"cfg-if",
]
@ -553,14 +553,15 @@ dependencies = [
[[package]]
name = "nix"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa9b4819da1bc61c0ea48b63b7bc8604064dd43013e7cc325df098d49cd7c18a"
checksum = "df8e5e343312e7fbeb2a52139114e9e702991ef9c2aea6817ff2440b35647d56"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
@ -800,9 +801,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
dependencies = [
"bitflags",
]
@ -1001,7 +1002,7 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.4.0"
source = "git+https://github.com/skytable/client-rust?branch=next#69677c1cd5a11b4b2c0f0728c1eb504e1c128cac"
source = "git+https://github.com/skytable/client-rust?branch=next#9c44e56dc1a4bcaa755b5a07066f800d94f67b5f"
dependencies = [
"bytes",
"openssl",
@ -1011,9 +1012,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590"
[[package]]
name = "smallvec"
@ -1105,9 +1106,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c"
checksum = "01cf844b23c6131f624accf65ce0e4e9956a8bb329400ea5bcc26ae3a5c20b0b"
dependencies = [
"autocfg",
"bytes",
@ -1125,9 +1126,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37"
checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
dependencies = [
"proc-macro2",
"quote",
@ -1157,9 +1158,9 @@ dependencies = [
[[package]]
name = "unicode-segmentation"
version = "1.7.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"

@ -26,6 +26,7 @@
use core::future::Future;
use core::pin::Pin;
use crossterm::style::{Color, Print, ResetColor, SetForegroundColor};
use skytable::types::FlatElement;
use skytable::Query;
use skytable::{aio, Element, RespCode, Response};
use std::io::Error as IoError;
@ -108,6 +109,12 @@ macro_rules! write_err {
};
}
macro_rules! str {
($in:expr) => {
String::from_utf8_lossy(&$in)
};
}
macro_rules! write_okay {
() => {
crossterm::execute!(
@ -141,10 +148,16 @@ impl<T: AsyncSocket> Runner<T> {
println!("ERROR: The server sent an invalid response");
}
Response::Item(element) => match element {
Element::String(st) => write_string!(st),
Element::FlatArray(arr) => print_flat_array(arr),
Element::Str(st) => write_string!(st),
Element::Binstr(st) => {
let st = String::from_utf8_lossy(&st);
write_string!(st)
}
Element::BinArray(brr) => print_bin_array(brr),
Element::StrArray(srr) => print_str_array(srr),
Element::RespCode(r) => print_rcode(r, None),
Element::UnsignedInt(int) => write_int!(int),
Element::FlatArray(frr) => write_flat_array(frr),
Element::Array(a) => print_array(a),
_ => unimplemented!(),
},
@ -180,21 +193,56 @@ fn print_rcode(rcode: RespCode, idx: Option<usize>) {
}
}
fn print_flat_array(flat_array: Vec<String>) {
flat_array.into_iter().enumerate().for_each(|(idx, item)| {
fn print_bin_array(bin_array: Vec<Option<Vec<u8>>>) {
bin_array.into_iter().enumerate().for_each(|(idx, elem)| {
let idx = idx + 1;
write_string!(idx, item)
match elem {
Some(ele) => {
let st = String::from_utf8_lossy(&ele);
println!("({}) {}", idx, st)
}
None => print_rcode(RespCode::NotFound, Some(idx)),
}
})
}
fn print_str_array(str_array: Vec<Option<String>>) {
str_array.into_iter().enumerate().for_each(|(idx, elem)| {
let idx = idx + 1;
match elem {
Some(ele) => {
println!("({}) {}", idx, ele)
}
None => print_rcode(RespCode::NotFound, Some(idx)),
}
})
}
fn write_flat_array(flat_array: Vec<FlatElement>) {
for (idx, item) in flat_array.into_iter().enumerate() {
let idx = idx + 1;
match item {
FlatElement::String(st) => write_string!(idx, st),
FlatElement::Binstr(st) => {
let st = str!(st);
write_string!(idx, st)
}
FlatElement::RespCode(rc) => print_rcode(rc, Some(idx)),
FlatElement::UnsignedInt(int) => write_int!(int, idx),
}
}
}
fn print_array(array: Vec<Element>) {
for (idx, item) in array.into_iter().enumerate() {
let idx = idx + 1;
match item {
Element::String(st) => write_string!(idx, st),
Element::Str(st) => write_string!(idx, st),
Element::RespCode(rc) => print_rcode(rc, Some(idx)),
Element::UnsignedInt(int) => write_int!(idx, int),
Element::FlatArray(a) => print_flat_array(a),
_ => unimplemented!("Nested arrays cannot be printed just yet"),
Element::BinArray(brr) => print_bin_array(brr),
Element::StrArray(srr) => print_str_array(srr),
_ => eprintln!("Nested arrays cannot be printed just yet"),
}
}
}

@ -28,6 +28,7 @@
//! This module provides functions to work with `DEL` queries
use crate::dbnet::connection::prelude::*;
use crate::util::compiler;
action!(
/// Run a `DEL` query
@ -36,25 +37,35 @@ action!(
/// It will write an entire datagroup, for this `del` action
fn del(handle: &Corestore, con: &mut T, act: ActionIter) {
err_if_len_is!(act, con, eq 0);
let done_howmany: Option<usize>;
{
if registry::state_okay() {
let mut many = 0;
let cmap = kve!(con, handle);
act.for_each(|key| {
if not_enc_err!(cmap.remove(&key)) {
many += 1
}
});
done_howmany = Some(many);
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_key_encoding() {
true
} else {
let encoder = kve.get_key_encoder();
act.as_ref().iter().all(|k| encoder.is_ok(k))
};
if compiler::likely(encoding_is_okay) {
let done_howmany: Option<usize>;
{
if registry::state_okay() {
let mut many = 0;
act.for_each(|key| {
if kve.remove_unchecked(&key) {
many += 1
}
});
done_howmany = Some(many);
} else {
done_howmany = None;
}
}
if let Some(done_howmany) = done_howmany {
con.write_response(done_howmany).await
} else {
done_howmany = None;
con.write_response(responses::groups::SERVER_ERR).await
}
}
if let Some(done_howmany) = done_howmany {
con.write_response(done_howmany).await
} else {
con.write_response(responses::groups::SERVER_ERR).await
conwrite!(con, groups::ENCODING_ERROR)
}
}
);

@ -35,15 +35,25 @@ action!(
fn exists(handle: &Corestore, con: &mut T, act: ActionIter) {
err_if_len_is!(act, con, eq 0);
let mut how_many_of_them_exist = 0usize;
{
let cmap = kve!(con, handle);
act.for_each(|key| {
if not_enc_err!(cmap.exists(&key)) {
how_many_of_them_exist += 1;
}
});
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_key_encoding() {
true
} else {
let encoder = kve.get_key_encoder();
act.as_ref().iter().all(|k| encoder.is_ok(k))
};
if encoding_is_okay {
{
act.for_each(|key| {
if kve.exists_unchecked(&key) {
how_many_of_them_exist += 1;
}
});
}
con.write_response(how_many_of_them_exist).await?;
} else {
conwrite!(con, groups::ENCODING_ERROR)?;
}
con.write_response(how_many_of_them_exist).await?;
Ok(())
}
);

@ -28,30 +28,18 @@
//! This module provides functions to work with `GET` queries
use crate::dbnet::connection::prelude::*;
use crate::resp::BytesWrapper;
use bytes::Bytes;
use crate::resp::writer;
action!(
/// Run a `GET` query
fn get(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter) {
err_if_len_is!(act, con, not 1);
let res: Option<Bytes> = {
let reader = kve!(con, handle);
unsafe {
// UNSAFE(@ohsayan): this is safe because we've already checked if the action
// group contains one argument (excluding the action itself)
match reader.get(&act.next().unsafe_unwrap()) {
Ok(v) => v.map(|b| b.get_blob().clone()),
Err(_) => None,
}
let kve = kve!(con, handle);
unsafe {
match kve.get_cloned_with_tsymbol(&act.next().unsafe_unwrap()) {
Ok((Some(val), tsymbol)) => writer::write_raw_mono(con, tsymbol, &val).await?,
Err(_) => conwrite!(con, groups::ENCODING_ERROR)?,
Ok(_) => conwrite!(con, groups::NIL)?,
}
};
if let Some(value) = res {
// Good, we got the value, write it off to the stream
con.write_response(BytesWrapper(value)).await?;
} else {
// Ah, couldn't find that key
con.write_response(responses::groups::NIL).await?;
}
Ok(())
}

@ -26,7 +26,7 @@
use crate::corestore::memstore::DdlError;
use crate::dbnet::connection::prelude::*;
use crate::resp::BytesWrapper;
use crate::resp::writer::TypedArrayWriter;
use bytes::Bytes;
const DEFAULT_COUNT: usize = 10;
@ -71,9 +71,14 @@ action!(
Err(_) => unsafe { impossible!() },
};
let items: Vec<Bytes> = kve.__get_inner_ref().get_keys(count);
con.write_flat_array_length(items.len()).await?;
for item in items {
con.write_response(BytesWrapper(item)).await?;
let tsymbol = kve.get_kt();
let mut writer = unsafe {
// SAFETY: We have checked kty ourselves
TypedArrayWriter::new(con, tsymbol, items.len())
}
.await?;
for key in items {
writer.write_element(key).await?;
}
Ok(())
}

@ -26,28 +26,35 @@
use crate::dbnet::connection::prelude::*;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
use bytes::Bytes;
use skytable::RespCode;
use crate::resp::writer::TypedArrayWriter;
use crate::util::compiler;
action!(
/// Run an `MGET` query
///
fn mget(handle: &crate::corestore::Corestore, con: &mut T, act: ActionIter) {
crate::err_if_len_is!(act, con, eq 0);
con.write_array_length(act.len()).await?;
for key in act {
let res: Option<Bytes> = match kve!(con, handle).get(&key) {
Ok(v) => v.map(|b| b.get_blob().clone()),
Err(_) => None,
};
if let Some(value) = res {
// Good, we got the value, write it off to the stream
con.write_response(BytesWrapper(value)).await?;
} else {
// Ah, couldn't find that key
con.write_response(RespCode::NotFound).await?;
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_key_encoding() {
true
} else {
let encoder = kve.get_key_encoder();
act.as_ref().iter().all(|k| encoder.is_ok(k))
};
if compiler::likely(encoding_is_okay) {
let mut writer = unsafe {
// SAFETY: We are getting the value type ourselves
TypedArrayWriter::new(con, kve.get_vt(), act.len())
}
.await?;
for key in act {
match kve.get_cloned_unchecked(&key) {
Some(v) => writer.write_element(&v).await?,
None => writer.write_null().await?,
}
}
} else {
conwrite!(con, groups::ENCODING_ERROR)?;
}
Ok(())
}

@ -28,31 +28,41 @@ use crate::corestore;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
use crate::resp::writer::FlatArrayWriter;
use crate::util::compiler;
action!(
/// Run an MPOP action
fn mpop(handle: &corestore::Corestore, con: &mut T, act: ActionIter) {
err_if_len_is!(act, con, eq 0);
if registry::state_okay() {
con.write_array_length(act.len()).await?;
for key in act {
if !registry::state_okay() {
// we keep this check just in case the server fails in-between running a
// pop operation
con.write_response(responses::groups::SERVER_ERR).await?;
} else {
match kve!(con, handle).pop(&key) {
Ok(Some((_key, val))) => {
con.write_response(BytesWrapper(val.into_inner())).await?
}
Ok(None) => con.write_response(responses::groups::NIL).await?,
Err(_) => {
con.write_response(responses::groups::ENCODING_ERROR)
.await?
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_key_encoding() {
true
} else {
let encoder = kve.get_key_encoder();
act.as_ref().iter().all(|k| encoder.is_ok(k))
};
if compiler::likely(encoding_is_okay) {
let mut writer = unsafe {
// SAFETY: We have verified the tsymbol ourselves
FlatArrayWriter::new(con, kve.get_vt(), act.len())
}
.await?;
for key in act {
if registry::state_okay() {
match kve.pop_unchecked(&key) {
Some((_key, val)) => writer.write_element(val).await?,
None => writer.write_nil().await?,
}
} else {
// we keep this check just in case the server fails in-between running a
// pop operation
writer.write_server_error().await?;
}
}
} else {
conwrite!(con, groups::ENCODING_ERROR)?;
}
} else {
// don't begin the operation at all if the database is poisoned

@ -26,6 +26,7 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::util::compiler;
action!(
/// Run an `MSET` query
@ -37,13 +38,22 @@ action!(
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let done_howmany: Option<usize>;
{
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_no_encoding() {
true
} else {
let encoder = kve.get_encoder();
act.as_ref().chunks_exact(2).all(|kv| unsafe {
let (k, v) = (kv.get_unchecked(0), kv.get_unchecked(1));
encoder.is_ok(k, v)
})
};
if compiler::likely(encoding_is_okay) {
let done_howmany: Option<usize>;
if registry::state_okay() {
let writer = kve!(con, handle);
let mut didmany = 0;
while let (Some(key), Some(val)) = (act.next(), act.next()) {
if not_enc_err!(writer.set(Data::from(key), Data::from(val))) {
if kve.set_unchecked(Data::from(key), Data::from(val)) {
didmany += 1;
}
}
@ -51,11 +61,13 @@ action!(
} else {
done_howmany = None;
}
}
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
}
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
conwrite!(con, groups::ENCODING_ERROR)
}
}
);

@ -26,6 +26,7 @@
use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::util::compiler;
action!(
/// Run an `MUPDATE` query
@ -37,13 +38,22 @@ action!(
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_no_encoding() {
true
} else {
let encoder = kve.get_encoder();
act.as_ref().chunks_exact(2).all(|kv| unsafe {
let (k, v) = (kv.get_unchecked(0), kv.get_unchecked(1));
encoder.is_ok(k, v)
})
};
let done_howmany: Option<usize>;
{
if compiler::likely(encoding_is_okay) {
if registry::state_okay() {
let writer = kve!(con, handle);
let mut didmany = 0;
while let (Some(key), Some(val)) = (act.next(), act.next()) {
if not_enc_err!(writer.update(Data::from(key), Data::from(val))) {
if kve.update_unchecked(Data::from(key), Data::from(val)) {
didmany += 1;
}
}
@ -51,11 +61,13 @@ action!(
} else {
done_howmany = None;
}
}
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
}
} else {
return con.write_response(responses::groups::SERVER_ERR).await;
conwrite!(con, groups::ENCODING_ERROR)
}
}
);

@ -25,7 +25,7 @@
*/
use crate::dbnet::connection::prelude::*;
use crate::resp::BytesWrapper;
use crate::resp::writer;
action! {
fn pop(handle: &Corestore, con: &mut T, mut act: ActionIter) {
@ -35,8 +35,13 @@ action! {
act.next().unsafe_unwrap()
};
if registry::state_okay() {
match kve!(con, handle).pop(&key) {
Ok(Some((_key, val))) => conwrite!(con, BytesWrapper(val.into_inner()))?,
let kve = kve!(con, handle);
let tsymbol = kve.get_vt();
match kve.pop(&key) {
Ok(Some((_key, val))) => unsafe {
// SAFETY: We have verified the tsymbol ourselves
writer::write_raw_mono(con, tsymbol, &val).await?
},
Ok(None) => conwrite!(con, groups::NIL)?,
Err(()) => conwrite!(con, groups::ENCODING_ERROR)?,
}

@ -37,35 +37,34 @@ action!(
/// Run a `SET` query
fn set(handle: &crate::corestore::Corestore, con: &mut T, mut act: ActionIter) {
err_if_len_is!(act, con, not 2);
let did_we = {
if registry::state_okay() {
if registry::state_okay() {
let did_we = {
let writer = kve!(con, handle);
// clippy thinks we're doing something complex when we aren't, at all!
#[allow(clippy::blocks_in_if_conditions)]
if unsafe {
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
not_enc_err!(writer.set(
writer.set(
Data::from(act.next().unsafe_unwrap()),
Data::from(act.next().unsafe_unwrap()),
))
)
} {
Some(true)
Ok(true) => Some(true),
Ok(false) => Some(false),
Err(()) => None,
}
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(responses::groups::OKAY).await?;
} else {
Some(false)
con.write_response(responses::groups::OVERWRITE_ERR).await?;
}
} else {
None
}
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(responses::groups::OKAY).await?;
} else {
con.write_response(responses::groups::OVERWRITE_ERR).await?;
con.write_response(responses::groups::ENCODING_ERROR)
.await?;
}
} else {
con.write_response(responses::groups::SERVER_ERR).await?;
conwrite!(con, groups::SERVER_ERR)?;
}
Ok(())
}

@ -35,35 +35,34 @@ action!(
/// Run an `UPDATE` query
fn update(handle: &Corestore, con: &mut T, mut act: ActionIter) {
err_if_len_is!(act, con, not 2);
let did_we = {
if registry::state_okay() {
if registry::state_okay() {
let did_we = {
let writer = kve!(con, handle);
// clippy thinks we're doing something complex when we aren't, at all!
#[allow(clippy::blocks_in_if_conditions)]
if unsafe {
match unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
not_enc_err!(writer.update(
writer.update(
Data::from(act.next().unsafe_unwrap()),
Data::from(act.next().unsafe_unwrap()),
))
)
} {
Some(true)
Ok(true) => Some(true),
Ok(false) => Some(false),
Err(()) => None,
}
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(responses::groups::OKAY).await?;
} else {
Some(false)
con.write_response(responses::groups::NIL).await?;
}
} else {
None
}
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(responses::groups::OKAY).await?;
} else {
con.write_response(responses::groups::NIL).await?;
con.write_response(responses::groups::ENCODING_ERROR)
.await?;
}
} else {
con.write_response(responses::groups::SERVER_ERR).await?;
conwrite!(con, groups::SERVER_ERR)?;
}
Ok(())
}

@ -28,6 +28,7 @@ use crate::corestore::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::util::compiler;
action!(
/// Run an `USET` query
@ -41,21 +42,27 @@ action!(
// action at all
return con.write_response(responses::groups::ACTION_ERR).await;
}
let failed = {
let kve = kve!(con, handle);
let encoding_is_okay = if kve.needs_no_encoding() {
true
} else {
let encoder = kve.get_encoder();
act.as_ref().chunks_exact(2).all(|kv| unsafe {
let (k, v) = (kv.get_unchecked(0), kv.get_unchecked(1));
encoder.is_ok(k, v)
})
};
if compiler::likely(encoding_is_okay) {
if registry::state_okay() {
let writer = kve!(con, handle);
while let (Some(key), Some(val)) = (act.next(), act.next()) {
let _ = writer.upsert(Data::from(key), Data::from(val));
kve.upsert_unchecked(Data::from(key), Data::from(val));
}
false
conwrite!(con, howmany / 2)
} else {
true
conwrite!(con, groups::SERVER_ERR)
}
};
if failed {
con.write_response(responses::groups::SERVER_ERR).await
} else {
con.write_response(howmany / 2).await
conwrite!(con, groups::ENCODING_ERROR)
}
}
);

@ -168,6 +168,16 @@ where
}
}
impl<K: Eq + Hash, V: Clone> Coremap<K, V> {
pub fn get_cloned<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner.get_cloned(key)
}
}
impl Coremap<Data, Data> {
/// Returns atleast `count` number of keys from the hashtable
pub fn get_keys(&self, count: usize) -> Vec<Bytes> {

@ -373,7 +373,7 @@ impl<'a, K: 'a + Hash + Eq, V: 'a, S: BuildHasher + Clone> Skymap<K, V, S> {
}
// cloned impls
impl<'a, K: Clone, V: Clone, S: BuildHasher> Skymap<K, V, S> {
impl<'a, K, V: Clone, S: BuildHasher> Skymap<K, V, S> {
pub fn get_cloned<Q>(&'a self, k: &Q) -> Option<V>
where
K: Borrow<Q>,

@ -83,7 +83,6 @@ pub mod prelude {
pub use crate::handle_entity;
pub use crate::is_lowbit_set;
pub use crate::kve;
pub use crate::not_enc_err;
pub use crate::protocol::responses;
pub use crate::protocol::responses::groups;
pub use crate::queryengine::ActionIter;
@ -105,15 +104,6 @@ pub mod prelude {
};
}
#[macro_export]
macro_rules! not_enc_err {
($val:expr) => {
match $val {
Ok(v) => v,
Err(_) => false,
}
};
}
#[macro_export]
macro_rules! default_keyspace {
($store:expr, $con:expr) => {
match $store.get_keyspace() {
@ -367,6 +357,9 @@ where
ret
})
}
unsafe fn raw_stream(&mut self) -> &mut BufWriter<Strm> {
self.get_mut_stream()
}
}
/// # The `ProtocolConnection` trait

@ -27,16 +27,16 @@
use crate::corestore::htable::Coremap;
use crate::corestore::htable::Data;
use crate::corestore::map::bref::Ref;
use crate::resp::TSYMBOL_BINARY;
use crate::resp::TSYMBOL_UNICODE;
use core::borrow::Borrow;
use core::hash::Hash;
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering;
pub mod encoding;
const ORD_RELAXED: Ordering = Ordering::Relaxed;
/// An arbitrary unicode/binary _double encoder_ for two byte slice inputs
pub struct DoubleEncoder {
fn_ptr: fn(&[u8], &[u8]) -> bool,
v_t: u8,
}
impl DoubleEncoder {
@ -44,11 +44,15 @@ impl DoubleEncoder {
pub fn is_ok(&self, a: &[u8], b: &[u8]) -> bool {
(self.fn_ptr)(a, b)
}
pub const fn get_tsymbol(&self) -> u8 {
self.v_t
}
}
/// A _single encoder_ for a single byte slice input
pub struct SingleEncoder {
fn_ptr: fn(&[u8]) -> bool,
v_t: u8,
}
impl SingleEncoder {
@ -56,6 +60,27 @@ impl SingleEncoder {
pub fn is_ok(&self, a: &[u8]) -> bool {
(self.fn_ptr)(a)
}
pub const fn get_tsymbol(&self) -> u8 {
self.v_t
}
}
macro_rules! d_encoder {
($fn:expr, $t:expr) => {
DoubleEncoder {
fn_ptr: $fn,
v_t: $t,
}
};
}
macro_rules! s_encoder {
($fn:expr, $t:expr) => {
SingleEncoder {
fn_ptr: $fn,
v_t: $t,
}
};
}
// DROP impl isn't required as ShardLock's field types need-drop (std::mem)
@ -67,9 +92,9 @@ pub struct KVEngine {
/// the atomic table
table: Coremap<Data, Data>,
/// the encoding switch for the key
encoded_k: AtomicBool,
encoded_k: bool,
/// the encoding switch for the value
encoded_v: AtomicBool,
encoded_v: bool,
}
impl Default for KVEngine {
@ -88,83 +113,73 @@ impl KVEngine {
pub fn init_with_data(encoded_k: bool, encoded_v: bool, table: Coremap<Data, Data>) -> Self {
Self {
table,
encoded_k: AtomicBool::new(encoded_k),
encoded_v: AtomicBool::new(encoded_v),
encoded_k,
encoded_v,
}
}
pub fn get_encoding(&self) -> (bool, bool) {
(
self.encoded_k.load(ORD_RELAXED),
self.encoded_v.load(ORD_RELAXED),
)
(self.encoded_k, self.encoded_v)
}
/// Returns an encoder for the key and the value
pub fn get_encoder(&self) -> DoubleEncoder {
let (encoded_k, encoded_v) = (
self.encoded_k.load(ORD_RELAXED),
self.encoded_v.load(ORD_RELAXED),
);
let ret = match (encoded_k, encoded_v) {
match self.get_encoding() {
(true, true) => {
// both k & v
fn is_okay(key: &[u8], value: &[u8]) -> bool {
encoding::is_utf8(key) && encoding::is_utf8(value)
}
is_okay
d_encoder!(is_okay, TSYMBOL_UNICODE)
}
(true, false) => {
// only k
fn is_okay(key: &[u8], _value: &[u8]) -> bool {
encoding::is_utf8(key)
}
is_okay
d_encoder!(is_okay, TSYMBOL_BINARY)
}
(false, false) => {
// none
fn is_okay(_k: &[u8], _v: &[u8]) -> bool {
true
}
is_okay
d_encoder!(is_okay, TSYMBOL_BINARY)
}
(false, true) => {
// only v
fn is_okay(_k: &[u8], v: &[u8]) -> bool {
encoding::is_utf8(v)
}
is_okay
d_encoder!(is_okay, TSYMBOL_UNICODE)
}
};
DoubleEncoder { fn_ptr: ret }
}
}
/// Returns an encoder for the key
pub fn get_key_encoder(&self) -> SingleEncoder {
let ret = if self.encoded_k.load(ORD_RELAXED) {
if self.encoded_k {
fn e(inp: &[u8]) -> bool {
encoding::is_utf8(inp)
}
e
s_encoder!(e, TSYMBOL_UNICODE)
} else {
fn e(_inp: &[u8]) -> bool {
true
}
e
};
SingleEncoder { fn_ptr: ret }
s_encoder!(e, TSYMBOL_BINARY)
}
}
/// Returns an encoder for the value
pub fn get_value_encoder(&self) -> SingleEncoder {
let ret = if self.encoded_v.load(ORD_RELAXED) {
if self.encoded_v {
fn e(inp: &[u8]) -> bool {
encoding::is_utf8(inp)
}
e
s_encoder!(e, TSYMBOL_UNICODE)
} else {
fn e(_inp: &[u8]) -> bool {
true
}
e
};
SingleEncoder { fn_ptr: ret }
s_encoder!(e, TSYMBOL_BINARY)
}
}
pub fn len(&self) -> usize {
self.table.len()
@ -185,6 +200,38 @@ impl KVEngine {
pub fn truncate_table(&self) {
self.table.clear()
}
pub const fn needs_value_encoding(&self) -> bool {
self.encoded_v
}
pub const fn needs_key_encoding(&self) -> bool {
self.encoded_k
}
pub const fn needs_no_encoding(&self) -> bool {
!(self.encoded_k && self.encoded_v)
}
pub const fn get_vt(&self) -> u8 {
if self.encoded_v {
TSYMBOL_UNICODE
} else {
TSYMBOL_BINARY
}
}
pub const fn get_kt(&self) -> u8 {
if self.encoded_k {
TSYMBOL_UNICODE
} else {
TSYMBOL_BINARY
}
}
/// Get the value for a given key if it exists
pub fn get_with_tsymbol<Q>(&self, key: &Q) -> Result<(Option<Ref<Data, Data>>, u8), ()>
where
Data: Borrow<Q>,
Q: AsRef<[u8]> + Hash + Eq + ?Sized,
{
self._encode_key(key)?;
Ok((self.table.get(key), self.get_vt()))
}
/// Get the value for a given key if it exists
pub fn get<Q>(&self, key: &Q) -> Result<Option<Ref<Data, Data>>, ()>
where
@ -194,6 +241,31 @@ impl KVEngine {
self._encode_key(key)?;
Ok(self.table.get(key))
}
/// Get the value for a given key if it exists, returning a cloned reference
pub fn get_cloned<Q>(&self, key: &Q) -> Result<Option<Data>, ()>
where
Data: Borrow<Q>,
Q: AsRef<[u8]> + Hash + Eq + ?Sized,
{
self._encode_key(key)?;
Ok(self.table.get_cloned(key))
}
pub fn get_cloned_unchecked<Q>(&self, key: &Q) -> Option<Data>
where
Data: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.table.get_cloned(key)
}
/// Get the value for a given key if it exists, returning a cloned reference
pub fn get_cloned_with_tsymbol<Q>(&self, key: &Q) -> Result<(Option<Data>, u8), ()>
where
Data: Borrow<Q>,
Q: AsRef<[u8]> + Hash + Eq + ?Sized,
{
self._encode_key(key)?;
Ok((self.table.get_cloned(key), self.get_vt()))
}
pub fn exists<Q>(&self, key: &Q) -> Result<bool, ()>
where
Data: Borrow<Q>,
@ -202,6 +274,13 @@ impl KVEngine {
self._encode_key(key)?;
Ok(self.table.contains_key(key))
}
pub fn exists_unchecked<Q>(&self, key: &Q) -> bool
where
Data: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.table.contains_key(key)
}
/// Check the unicode encoding of a given byte array
fn _encode<T: AsRef<[u8]>>(data: T) -> Result<(), ()> {
if encoding::is_utf8(data.as_ref()) {
@ -212,15 +291,16 @@ impl KVEngine {
}
/// Check the unicode encoding of the given key, if the encoded_k flag is set
fn _encode_key<T: AsRef<[u8]>>(&self, key: T) -> Result<(), ()> {
if self.encoded_k.load(ORD_RELAXED) {
Self::_encode(key.as_ref())
if self.encoded_k {
Self::_encode(key.as_ref())?;
Ok(())
} else {
Ok(())
}
}
/// Check the unicode encoding of the given value, if the encoded_v flag is set
fn _encode_value<T: AsRef<[u8]>>(&self, value: T) -> Result<(), ()> {
if self.encoded_v.load(ORD_RELAXED) {
if self.encoded_v {
Self::_encode(value)
} else {
Ok(())
@ -232,12 +312,20 @@ impl KVEngine {
self._encode_value(&value)?;
Ok(self.table.true_if_insert(key, value))
}
/// Set the value of a non-existent key
pub fn set_unchecked(&self, key: Data, value: Data) -> bool {
self.table.true_if_insert(key, value)
}
/// Update the value of an existing key
pub fn update(&self, key: Data, value: Data) -> Result<bool, ()> {
self._encode_key(&key)?;
self._encode_value(&value)?;
Ok(self.table.true_if_update(key, value))
}
/// Update the value of an existing key
pub fn update_unchecked(&self, key: Data, value: Data) -> bool {
self.table.true_if_update(key, value)
}
/// Update or insert the value of a key
pub fn upsert(&self, key: Data, value: Data) -> Result<(), ()> {
self._encode_key(&key)?;
@ -245,6 +333,10 @@ impl KVEngine {
self.table.upsert(key, value);
Ok(())
}
/// Update or insert the value of a key
pub fn upsert_unchecked(&self, key: Data, value: Data) {
self.table.upsert(key, value);
}
/// Remove an existing key
pub fn remove<Q>(&self, key: &Q) -> Result<bool, ()>
where
@ -254,6 +346,14 @@ impl KVEngine {
self._encode_key(key)?;
Ok(self.table.true_if_removed(key))
}
/// Remove an existing key
pub fn remove_unchecked<Q>(&self, key: &Q) -> bool
where
Data: Borrow<Q>,
Q: AsRef<[u8]> + Hash + Eq + ?Sized,
{
self.table.true_if_removed(key)
}
pub fn pop<Q>(&self, key: &Q) -> Result<Option<(Data, Data)>, ()>
where
Data: Borrow<Q>,
@ -262,6 +362,13 @@ impl KVEngine {
self._encode_key(key)?;
Ok(self.table.remove(key))
}
pub fn pop_unchecked<Q>(&self, key: &Q) -> Option<(Data, Data)>
where
Data: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.table.remove(key)
}
}
#[test]

@ -40,6 +40,8 @@ pub enum Element {
UnsignedInt(u64),
/// A non-recursive String array; tsymbol: `_`
FlatArray(Vec<Bytes>),
/// A type-less non-recursive array
AnyArray(Vec<Bytes>),
/// Swap the KS (ASCII `1A` (SUB HEADER))
SwapKSHeader(Bytes),
}

@ -47,6 +47,7 @@ const ASCII_UNDERSCORE: u8 = b'_';
const ASCII_AMPERSAND: u8 = b'&';
const ASCII_COLON: u8 = b':';
const ASCII_PLUS_SIGN: u8 = b'+';
const ASCII_TILDE_SIGN: u8 = b'~';
#[derive(Debug)]
/// # Skyhash Deserializer (Parser)
@ -329,6 +330,7 @@ impl<'a> Parser<'a> {
ASCII_PLUS_SIGN => Element::String(self.parse_next_string()?),
ASCII_COLON => Element::UnsignedInt(self.parse_next_u64()?),
ASCII_AMPERSAND => Element::Array(self.parse_next_array()?),
ASCII_TILDE_SIGN => Element::AnyArray(self.parse_next_any_array()?),
ASCII_UNDERSCORE => Element::FlatArray(self.parse_next_flat_array()?),
// switch keyspace with SUB
ASCII_CONTROL_SUB_HEADER => Element::SwapKSHeader(self.parse_next_byte()?),
@ -340,6 +342,32 @@ impl<'a> Parser<'a> {
Err(ParseError::NotEnough)
}
}
fn parse_next_blob(&mut self) -> ParseResult<Bytes> {
let our_string_chunk = self.__get_next_element()?;
let our_string = Bytes::copy_from_slice(our_string_chunk);
if self.will_cursor_give_linefeed()? {
// there is a lf after the end of the string; great!
// let's skip that now
self.incr_cursor();
// let's return our string
Ok(our_string)
} else {
Err(ParseError::UnexpectedByte)
}
}
fn parse_next_any_array(&mut self) -> ParseResult<Vec<Bytes>> {
let (start, stop) = self.read_line();
if let Some(our_size_chunk) = self.buffer.get(start..stop) {
let array_size = Self::parse_into_usize(our_size_chunk)?;
let mut array = Vec::with_capacity(array_size);
for _ in 0..array_size {
array.push(self.parse_next_blob()?);
}
Ok(array)
} else {
Err(ParseError::NotEnough)
}
}
/// The cursor should have passed the tsymbol
fn parse_next_flat_array(&mut self) -> ParseResult<Vec<Bytes>> {
let (start, stop) = self.read_line();
@ -800,3 +828,18 @@ fn test_ks_sub() {
)
);
}
#[test]
fn test_parse_any_array() {
let anyarray = "*1\n~3\n3\nthe\n3\ncat\n6\nmeowed\n".as_bytes();
let (query, forward_by) = Parser::new(anyarray).parse().unwrap();
assert_eq!(forward_by, anyarray.len());
assert_eq!(
query,
Query::SimpleQuery(Element::AnyArray(vec![
"the".into(),
"cat".into(),
"meowed".into()
]))
)
}

@ -27,6 +27,7 @@
use super::ddl::{KEYSPACE, TABLE};
use crate::corestore::memstore::ObjectID;
use crate::dbnet::connection::prelude::*;
use crate::resp::writer::TypedArrayWriter;
const KEYSPACES: &[u8] = "KEYSPACES".as_bytes();
action! {
@ -46,9 +47,11 @@ action! {
.iter()
.map(|kv| kv.key().clone())
.collect();
con.write_flat_array_length(ks_list.len()).await?;
let mut writer = unsafe {
TypedArrayWriter::new(con, b'+', ks_list.len())
}.await?;
for tbl in ks_list {
con.write_response(tbl).await?;
writer.write_element(tbl).await?;
}
}
_ => conwrite!(con, responses::groups::UNKNOWN_INSPECT_QUERY)?,
@ -74,9 +77,11 @@ action! {
None => return conwrite!(con, responses::groups::CONTAINER_NOT_FOUND),
};
let tbl_list: Vec<ObjectID> = ks.tables.iter().map(|kv| kv.key().clone()).collect();
con.write_flat_array_length(tbl_list.len()).await?;
let mut writer = unsafe {
TypedArrayWriter::new(con, b'+', tbl_list.len())
}.await?;
for tbl in tbl_list {
con.write_response(tbl).await?;
writer.write_element(tbl).await?;
}
},
None => aerr!(con, aerr),

@ -101,7 +101,7 @@ where
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let buf = match buf {
Element::FlatArray(a) => a,
Element::AnyArray(a) => a,
Element::SwapKSHeader(swapks) => {
swap_entity!(con, db, swapks);
return Ok(());

@ -35,6 +35,10 @@ use std::io::Error as IoError;
use std::pin::Pin;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
pub mod writer;
pub const TSYMBOL_BINARY: u8 = b'?';
pub const TSYMBOL_UNICODE: u8 = b'+';
/// # The `Writable` trait
/// All trait implementors are given access to an asynchronous stream to which

@ -0,0 +1,179 @@
/*
* Created on Thu Aug 12 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::corestore::buffers::Integer64;
use crate::corestore::Data;
use crate::dbnet::connection::ProtocolConnectionExt;
use crate::protocol::responses::groups;
use crate::IoResult;
use core::marker::PhantomData;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
/// Write a raw mono group with a custom tsymbol
pub async unsafe fn write_raw_mono<T, Strm>(
con: &mut T,
tsymbol: u8,
payload: &Data,
) -> IoResult<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let raw_stream = unsafe { con.raw_stream() };
raw_stream.write_all(&[tsymbol; 1]).await?; // first write tsymbol
let bytes = Integer64::from(payload.len());
raw_stream.write_all(&bytes).await?; // then len
raw_stream.write_all(&[b'\n']).await?; // LF
raw_stream.write_all(payload).await?; // payload
raw_stream.write_all(&[b'\n']).await?; // final LF
Ok(())
}
#[derive(Debug)]
/// A writer for a flat array, which is a multi-typed non-recursive array
pub struct FlatArrayWriter<'a, T, Strm> {
tsymbol: u8,
con: &'a mut T,
_owned: PhantomData<Strm>,
}
impl<'a, T, Strm> FlatArrayWriter<'a, T, Strm>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
/// Intialize a new flat array writer. This will write out the tsymbol
/// and length for the flat array
pub async unsafe fn new(
con: &'a mut T,
tsymbol: u8,
len: usize,
) -> IoResult<FlatArrayWriter<'a, T, Strm>> {
{
let stream = unsafe { con.raw_stream() };
// first write _
stream.write_all(&[b'_']).await?;
let bytes = Integer64::from(len);
// now write len
stream.write_all(&bytes).await?;
// first LF
stream.write_all(&[b'\n']).await?;
}
Ok(Self {
con,
tsymbol,
_owned: PhantomData,
})
}
/// Write an element
pub async fn write_element(&mut self, bytes: impl AsRef<[u8]>) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
let bytes = bytes.as_ref();
// first write <tsymbol>
stream.write_all(&[self.tsymbol]).await?;
// now len
let len = Integer64::from(bytes.len());
stream.write_all(&len).await?;
// now LF
stream.write_all(&[b'\n']).await?;
// now element
stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(())
}
/// Write the NIL response code
pub async fn write_nil(&mut self) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
stream.write_all(groups::NIL).await?;
Ok(())
}
/// Write the SERVER_ERR (5) response code
pub async fn write_server_error(&mut self) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
stream.write_all(groups::NIL).await?;
Ok(())
}
}
#[derive(Debug)]
/// A writer for a typed array, which is a singly-typed array which either
/// has a typed element or a `NULL`
pub struct TypedArrayWriter<'a, T, Strm> {
con: &'a mut T,
_owned: PhantomData<Strm>,
}
impl<'a, T, Strm> TypedArrayWriter<'a, T, Strm>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
/// Create a new `typedarraywriter`. This will write the tsymbol and
/// the array length
pub async unsafe fn new(
con: &'a mut T,
tsymbol: u8,
len: usize,
) -> IoResult<TypedArrayWriter<'a, T, Strm>> {
{
let stream = unsafe { con.raw_stream() };
// first write @<tsymbol>
stream.write_all(&[b'@', tsymbol]).await?;
let bytes = Integer64::from(len);
// now write len
stream.write_all(&bytes).await?;
// first LF
stream.write_all(&[b'\n']).await?;
}
Ok(Self {
con,
_owned: PhantomData,
})
}
/// Write an element
pub async fn write_element(&mut self, bytes: impl AsRef<[u8]>) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
let bytes = bytes.as_ref();
// write len
let len = Integer64::from(bytes.len());
stream.write_all(&len).await?;
// now LF
stream.write_all(&[b'\n']).await?;
// now element
stream.write_all(bytes).await?;
// now final LF
stream.write_all(&[b'\n']).await?;
Ok(())
}
/// Write a null
pub async fn write_null(&mut self) -> IoResult<()> {
let stream = unsafe { self.con.raw_stream() };
stream.write_all(&[b'\0', b'\n']).await?;
Ok(())
}
}

@ -32,7 +32,7 @@ mod __private {
query.push("KEYSPACES");
assert!(matches!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::FlatArray(_))
Response::Item(Element::StrArray(_))
))
}
async fn test_inspect_keyspace() {
@ -42,7 +42,7 @@ mod __private {
query.push(my_keyspace);
assert!(matches!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::FlatArray(_))
Response::Item(Element::StrArray(_))
))
}
async fn test_inspect_table() {
@ -51,11 +51,8 @@ mod __private {
query.push("TABLE");
query.push(my_table);
match con.run_simple_query(&query).await.unwrap() {
Response::Item(Element::String(st)) => {
assert_eq!(
st,
"Keymap { data:(binstr,binstr), volatile:true }".to_owned()
)
Response::Item(Element::Str(st)) => {
assert_eq!(st, "Keymap { data:(str,str), volatile:true }".to_owned())
}
_ => panic!("Bad response for inspect table"),
}
@ -65,11 +62,8 @@ mod __private {
query.push("TABLE");
query.push(__MYENTITY__);
match con.run_simple_query(&query).await.unwrap() {
Response::Item(Element::String(st)) => {
assert_eq!(
st,
"Keymap { data:(binstr,binstr), volatile:true }".to_owned()
)
Response::Item(Element::Str(st)) => {
assert_eq!(st, "Keymap { data:(str,str), volatile:true }".to_owned())
}
_ => panic!("Bad response for inspect table"),
}

@ -51,12 +51,12 @@ mod __private {
};
}
#[cfg(test)]
use skytable::{Element, Query, RespCode, Response};
use skytable::{types::FlatElement, Element, Query, RespCode, Response};
/// Test a HEYA query: The server should return HEY!
async fn test_heya() {
query.push("heya");
let resp = con.run_simple_query(&query).await.unwrap();
assert_eq!(resp, Response::Item(Element::String("HEY!".to_owned())));
assert_eq!(resp, Response::Item(Element::Str("HEY!".to_owned())));
}
/// Test a GET query: for a non-existing key
@ -78,7 +78,7 @@ mod __private {
query.push("get");
query.push("x");
let resp = con.run_simple_query(&query).await.unwrap();
assert_eq!(resp, Response::Item(Element::String("100".to_owned())));
assert_eq!(resp, Response::Item(Element::Str("100".to_owned())));
}
/// Test a GET query with an incorrect number of arguments
@ -323,10 +323,10 @@ mod __private {
query.push("z");
assert_eq!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::Array(vec![
Element::String("100".to_owned()),
Element::String("200".to_owned()),
Element::String("300".to_owned())
Response::Item(Element::StrArray(vec![
Some("100".to_owned()),
Some("200".to_owned()),
Some("300".to_owned())
]))
);
}
@ -354,12 +354,12 @@ mod __private {
query.push("b");
assert_eq!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::Array(vec![
Element::String("100".to_owned()),
Element::String("200".to_owned()),
Element::RespCode(RespCode::NotFound),
Element::String("300".to_owned()),
Element::RespCode(RespCode::NotFound)
Response::Item(Element::StrArray(vec![
Some("100".to_owned()),
Some("200".to_owned()),
None,
Some("300".to_owned()),
None
]))
);
}
@ -1046,7 +1046,8 @@ mod __private {
.into_iter()
.map(|element| element.to_owned())
.collect();
if let Response::Item(Element::FlatArray(arr)) = ret {
if let Response::Item(Element::StrArray(arr)) = ret {
let arr: Vec<String> = arr.into_iter().map(|v| v.unwrap()).collect();
assert_eq!(ret_should_have.len(), arr.len());
assert!(ret_should_have.into_iter().all(|key| arr.contains(&key)));
} else {
@ -1080,7 +1081,8 @@ mod __private {
.into_iter()
.map(|element| element.to_owned())
.collect();
if let Response::Item(Element::FlatArray(arr)) = ret {
if let Response::Item(Element::StrArray(arr)) = ret {
let arr: Vec<String> = arr.into_iter().map(|v| v.unwrap()).collect();
assert_eq!(ret_should_have.len(), arr.len());
assert!(ret_should_have.into_iter().all(|key| arr.contains(&key)));
} else {
@ -1101,7 +1103,8 @@ mod __private {
.into_iter()
.map(|element| element.to_owned())
.collect();
if let Response::Item(Element::FlatArray(arr)) = ret {
if let Response::Item(Element::StrArray(arr)) = ret {
let arr: Vec<String> = arr.into_iter().map(|v| v.unwrap()).collect();
assert_eq!(ret_should_have.len(), arr.len());
assert!(ret_should_have.into_iter().all(|key| arr.contains(&key)));
} else {
@ -1123,7 +1126,8 @@ mod __private {
.into_iter()
.map(|element| element.to_owned())
.collect();
if let Response::Item(Element::FlatArray(arr)) = ret {
if let Response::Item(Element::StrArray(arr)) = ret {
let arr: Vec<String> = arr.into_iter().map(|v| v.unwrap()).collect();
assert_eq!(ret_should_have.len(), arr.len());
assert!(ret_should_have.into_iter().all(|key| arr.contains(&key)));
} else {
@ -1159,10 +1163,10 @@ mod __private {
query.push(vec!["mpop", "x", "y", "z"]);
assert_eq!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::Array(vec![
Element::String("100".to_owned()),
Element::String("200".to_owned()),
Element::String("300".to_owned())
Response::Item(Element::FlatArray(vec![
FlatElement::String("100".to_owned()),
FlatElement::String("200".to_owned()),
FlatElement::String("300".to_owned())
]))
)
}
@ -1176,13 +1180,13 @@ mod __private {
query.push(vec!["mpop", "apple", "arnold", "x", "madonna", "y", "z"]);
assert_eq!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::Array(vec![
Element::RespCode(RespCode::NotFound),
Element::RespCode(RespCode::NotFound),
Element::String("100".to_owned()),
Element::RespCode(RespCode::NotFound),
Element::String("200".to_owned()),
Element::String("300".to_owned())
Response::Item(Element::FlatArray(vec![
FlatElement::RespCode(RespCode::NotFound),
FlatElement::RespCode(RespCode::NotFound),
FlatElement::String("100".to_owned()),
FlatElement::RespCode(RespCode::NotFound),
FlatElement::String("200".to_owned()),
FlatElement::String("300".to_owned())
]))
);
}
@ -1202,7 +1206,7 @@ mod __private {
query.push("x");
assert_eq!(
con.run_simple_query(&query).await.unwrap(),
Response::Item(Element::String("100".to_owned()))
Response::Item(Element::Str("100".to_owned()))
);
}
async fn test_pop_nil() {

@ -42,7 +42,7 @@ mod ssl {
let q = Query::from("heya");
assert_eq!(
con.run_simple_query(&q).await.unwrap(),
Response::Item(Element::String("HEY!".to_owned()))
Response::Item(Element::Str("HEY!".to_owned()))
);
}
}

@ -24,9 +24,7 @@
*
*/
use crate::hoststr;
use crate::report;
use crate::sanity_test;
use devtimer::DevTime;
use libstress::utils::generate_random_string_vector;
use libstress::PoolConfig;
@ -163,7 +161,7 @@ pub fn runner(
socket.shutdown(std::net::Shutdown::Both).unwrap();
},
true,
Some(max_queries)
Some(max_queries),
);
// create table

@ -88,7 +88,7 @@ pub fn run_sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
if !connection
.run_simple_query(&query)
.unwrap()
.eq(&Response::Item(Element::String("HEY!".to_owned())))
.eq(&Response::Item(Element::Str("HEY!".to_owned())))
{
return Err("HEYA test failed".into());
}
@ -111,7 +111,9 @@ pub fn run_sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
if !connection
.run_simple_query(&query)
.unwrap()
.eq(&Response::Item(Element::String(value)))
.eq(&Response::Item(Element::Binstr(
value.as_bytes().to_owned(),
)))
{
return Err("GET test failed".into());
}

@ -88,7 +88,7 @@ fn parse_dbtest(
).await.unwrap();
let __create_tbl =
con.run_simple_query(
&skytable::query!("create", "table", #rand_string, "keymap(binstr,binstr)", "volatile")
&skytable::query!("create", "table", #rand_string, "keymap(str,str)", "volatile")
).await.unwrap();
let mut __concat_entity = std::string::String::new();
__concat_entity.push_str("testsuite:");

@ -228,7 +228,7 @@ pub fn stress_linearity_concurrent_clients_get(
let rets: Vec<String> = rx
.into_iter()
.map(|v| {
if let Response::Item(Element::String(val)) = v {
if let Response::Item(Element::Str(val)) = v {
val
} else {
panic!("Unexpected response from server");

Loading…
Cancel
Save