Merge pull request #147 from skytable/skyhash

Implement, stabilize and migrate to Skyhash
next
Sayan Nandan 3 years ago committed by GitHub
commit 246b974bd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,4 +27,5 @@ steps:
commands:
- source $SRCENV/.cargo/env
- export RUSTUP_HOME=$SRCENV/.rustup
- cargo test --verbose
- cargo run -p skyd -- --nosave &
- cargo test -- --test-threads=1

@ -66,13 +66,20 @@ jobs:
if: env.BUILD == 'true'
- name: Run Tests
run: cargo test --verbose
run: |
cargo run -p skyd -- --nosave --noart &
cargo test --verbose -- --test-threads=1
sudo pkill skyd
env:
RUST_BACKTRACE: 1
if: env.IS_MD_FILE == 'false' && runner.os != 'Windows'
- name: Run Tests (Windows)
run: cargo test --verbose
run: |
cargo build -p skyd
START /B cargo run -p skyd -- --nosave --noart
cargo test --verbose -- --test-threads=1
taskkill /IM skyd.exe /F
env:
RUST_BACKTRACE: 1
shell: cmd

@ -95,13 +95,20 @@ jobs:
if: runner.os == 'Linux' && env.IS_ACTIONS_DOC == 'true'
- name: Run Tests
run: cargo test --verbose --target ${{ matrix.rust }}
run: |
cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
sudo pkill skyd
env:
RUST_BACKTRACE: 1
if: env.IS_MD_FILE == 'false' && runner.os != 'Windows'
- name: Run Tests (Windows)
run: cargo test --verbose --target ${{ matrix.rust }}
run: |
cargo build -p skyd --target ${{ matrix.rust }}
START /B cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart
cargo test --target ${{ matrix.rust }} --verbose -- --test-threads=1
taskkill /IM skyd.exe /F
env:
RUST_BACKTRACE: 1
shell: cmd
@ -192,13 +199,20 @@ jobs:
if: runner.os == 'Linux'
- name: Run Tests
run: cargo test --verbose --target ${{ matrix.rust }}
run: |
cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
sudo pkill skyd
env:
RUST_BACKTRACE: 1
if: env.IS_MD_FILE == 'false' && runner.os == 'Linux'
- name: Run Tests (Windows)
run: cargo test --verbose --target ${{ matrix.rust }}
run: |
cargo build -p skyd --target ${{ matrix.rust }}
START /B cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart
cargo test --target ${{ matrix.rust }} --verbose -- --test-threads=1
taskkill /IM skyd.exe /F
env:
RUST_BACKTRACE: 1
shell: cmd
@ -274,7 +288,7 @@ jobs:
restore-keys: |
${{ matrix.rust }}-target-
if: env.IS_MD_FILE == 'false'
- name: Install MUSL tools
run: sudo apt-get update && sudo apt-get install musl-tools -y
if: runner.os == 'Linux'
@ -286,7 +300,10 @@ jobs:
if: env.IS_MD_FILE == 'false'
- name: Run Tests
run: cargo test --verbose --target ${{ matrix.rust }}
run: |
cargo run -p skyd --target ${{ matrix.rust }} -- --nosave --noart &
cargo test --verbose --target ${{ matrix.rust }} -- --test-threads=1
sudo pkill skyd
env:
RUST_BACKTRACE: 1
if: env.IS_MD_FILE == 'false' && runner.os == 'Linux'

116
Cargo.lock generated

@ -201,9 +201,9 @@ checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253"
checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27"
dependencies = [
"futures-channel",
"futures-core",
@ -216,9 +216,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce79c6a52a299137a6013061e0cf0e688fce5d7f1bc60125f520912fdb29ec25"
checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2"
dependencies = [
"futures-core",
"futures-sink",
@ -226,15 +226,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815"
checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1"
[[package]]
name = "futures-executor"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d"
checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79"
dependencies = [
"futures-core",
"futures-task",
@ -243,16 +243,17 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04"
checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1"
[[package]]
name = "futures-macro"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b"
checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121"
dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
@ -261,22 +262,23 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23"
checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282"
[[package]]
name = "futures-task"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc"
checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae"
[[package]]
name = "futures-util"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025"
checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967"
dependencies = [
"autocfg",
"futures-channel",
"futures-core",
"futures-io",
@ -372,14 +374,15 @@ dependencies = [
"bytes",
"lazy_static",
"regex",
"skytable",
"termcolor",
]
[[package]]
name = "lock_api"
version = "0.4.2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb"
dependencies = [
"scopeguard",
]
@ -401,9 +404,9 @@ checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "mio"
version = "0.7.7"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7"
checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
dependencies = [
"libc",
"log",
@ -414,11 +417,10 @@ dependencies = [
[[package]]
name = "miow"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"socket2",
"winapi",
]
@ -483,9 +485,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.5.2"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "openssl"
@ -503,9 +505,9 @@ dependencies = [
[[package]]
name = "openssl-src"
version = "111.13.0+1.1.1i"
version = "111.15.0+1.1.1k"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045e4dc48af57aad93d665885789b43222ae26f4886494da12d1ed58d309dcb6"
checksum = "b1a5f6ae2ac04393b217ea9f700cd04fa9bf3d93fae2872069f3d15d908af70a"
dependencies = [
"cc",
]
@ -551,18 +553,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.5"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.5"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2",
"quote",
@ -571,9 +573,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.4"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905"
[[package]]
name = "pin-utils"
@ -675,9 +677,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.5"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9"
checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc"
dependencies = [
"bitflags",
]
@ -801,10 +803,13 @@ version = "0.5.2"
dependencies = [
"clap",
"devtimer",
"lazy_static",
"libsky",
"rand",
"regex",
"serde",
"serde_json",
"skytable",
]
[[package]]
@ -813,7 +818,6 @@ version = "0.5.2"
dependencies = [
"proc-macro2",
"quote",
"rand",
"syn",
]
@ -839,6 +843,7 @@ dependencies = [
"serde",
"serde_derive",
"sky_macros",
"skytable",
"tokio",
"tokio-openssl",
"toml",
@ -852,18 +857,30 @@ dependencies = [
"bytes",
"clap",
"crossterm",
"lazy_static",
"libsky",
"openssl",
"regex",
"rustyline",
"skytable",
"tokio",
"tokio-openssl",
]
[[package]]
name = "skytable"
version = "0.2.3"
source = "git+https://github.com/skytable/client-rust?branch=next#48eeaad07dc21d070fa3f3fca3ff2979841bbb65"
dependencies = [
"bytes",
"tokio",
]
[[package]]
name = "slab"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
[[package]]
name = "smallvec"
@ -871,17 +888,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "strsim"
version = "0.8.0"
@ -994,9 +1000,9 @@ checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "utf8parse"
@ -1006,9 +1012,9 @@ checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372"
[[package]]
name = "vcpkg"
version = "0.2.11"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
[[package]]
name = "vec_map"

@ -7,11 +7,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libsky = {path = "../libsky"}
tokio = {version = "1.5.0", features = ["full"]}
libsky = { path = "../libsky" }
tokio = { version = "1.5.0", features = ["full"] }
bytes = "1.0.1"
clap = {version = "2.33.3", features=["yaml"]}
clap = { version = "2.33.3", features = ["yaml"] }
openssl = { version = "0.10.34", features = ["vendored"] }
tokio-openssl = "0.6.1"
rustyline = "8.0.0"
crossterm = "0.19.0"
crossterm = "0.19.0"
skytable = { git = "https://github.com/skytable/client-rust", branch = "next", features = ["async"], default-features = false }
lazy_static = "1.4.0"
regex = "1.5.4"

@ -24,36 +24,20 @@
*
*/
use crate::protocol;
use crate::runner::Runner;
use clap::load_yaml;
use clap::App;
use std::io::stdout;
use libsky::terrapipe::ADDR;
use crossterm::{execute, cursor};
use crossterm::terminal::{Clear, ClearType};
use protocol::{Con, Connection, SslConnection};
use crossterm::{cursor, execute};
use readline::config::Configurer;
use readline::{error::ReadlineError, Editor};
use rustyline as readline;
use skytable::AsyncConnection;
use std::io::stdout;
use std::process;
use std::process::exit;
const MSG_WELCOME: &'static str = "Skytable v0.5.2";
#[macro_use]
macro_rules! close_con {
($con:expr) => {
if let Err(e) = $con.shutdown().await {
eprintln!(
"Failed to gracefully terminate connection with error '{}'",
e
);
std::process::exit(0x100);
}
};
($con:expr, $err:expr) => {
eprintln!("An error occurred while reading your input: '{}'", $err);
close_con!($con)
};
}
const ADDR: &str = "127.0.0.1";
/// This creates a REPL on the command line and also parses command-line arguments
///
@ -63,46 +47,30 @@ macro_rules! close_con {
pub async fn start_repl() {
let cfg_layout = load_yaml!("./cli.yml");
let matches = App::from_yaml(cfg_layout).get_matches();
let mut host = match matches.value_of("host") {
Some(h) => h.to_owned(),
None => ADDR.to_owned(),
};
host.push(':');
match matches.value_of("port") {
let host = matches.value_of("host").unwrap_or(ADDR);
let port = match matches.value_of("port") {
Some(p) => match p.parse::<u16>() {
Ok(p) => host.push_str(&p.to_string()),
Ok(p) => p,
Err(_) => {
eprintln!("ERROR: Invalid port");
process::exit(0x100);
}
},
None => host.push_str("2003"),
}
let ssl = matches.value_of("cert");
let mut con = if let Some(sslcert) = ssl {
let con = match SslConnection::new(&host, sslcert).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
Con::Secure(con)
} else {
let con = match Connection::new(&host).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
Con::Insecure(con)
None => 2003,
};
let con = match AsyncConnection::new(host, port).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
let mut runner = Runner::new(con);
if let Some(eval_expr) = matches.value_of("eval") {
if eval_expr.len() == 0 {
return;
}
con.execute_query(eval_expr.to_string()).await;
runner.run_query(&eval_expr).await;
return;
}
let mut editor = Editor::<()>::new();
@ -117,21 +85,22 @@ pub async fn start_repl() {
"clear" => {
let mut stdout = stdout();
execute!(stdout, Clear(ClearType::All)).expect("Failed to clear screen");
execute!(stdout, cursor::MoveTo(0, 0)).expect("Failed to move cursor to origin");
execute!(stdout, cursor::MoveTo(0, 0))
.expect("Failed to move cursor to origin");
drop(stdout); // aggressively drop stdout
continue;
}
_ => con.execute_query(line).await,
_ => runner.run_query(&line).await,
},
Err(ReadlineError::Interrupted) => break,
Err(err) => {
close_con!(con, err);
eprintln!("Failed to read line with error: {}", err);
exit(1);
}
}
}
if let Err(e) = editor.save_history(".sky_history") {
eprintln!("Failed to save history with error: '{}'", e);
}
close_con!(con);
println!("Goodbye!");
}

@ -25,8 +25,8 @@
*/
mod argparse;
mod protocol;
use tokio;
mod runner;
#[tokio::main]
async fn main() {

@ -1,341 +0,0 @@
/*
* Created on Tue Aug 04 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! This module provides methods to deserialize an incoming response packet
use libsky::terrapipe::RespCodes;
use libsky::util::terminal;
use std::fmt;
#[derive(Debug, PartialEq)]
/// A response datagroup
///
/// This contains all the elements returned by a certain action. So let's say you did
/// something like `MGET x y`, then the values of x and y will be in a single datagroup.
pub struct DataGroup(Vec<DataType>);
/// A data type as defined by the Terrapipe protocol
///
///
/// Every variant stays in an `Option` for convenience while parsing. It's like we first
/// create a `Variant(None)` variant. Then we read the data which corresponds to it, and then we
/// replace `None` with the appropriate object. When we first detect the type, we use this as a way of matching
/// avoiding duplication by writing another `DataType` enum
#[derive(Debug, PartialEq)]
#[non_exhaustive]
pub enum DataType {
/// A string value
Str(Option<String>),
/// A response code (it is kept as `String` for "other error" types)
RespCode(Option<String>),
/// An unsigned 64-bit integer, equivalent to an `u64`
UnsignedInt(Option<Result<u64, std::num::ParseIntError>>),
}
impl fmt::Display for DataGroup {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for element in self.0.iter() {
match element {
DataType::Str(Some(val)) => write!(f, "\"{}\" ", val)?,
DataType::Str(None) => (),
DataType::UnsignedInt(Some(Ok(int))) => write!(f, "{}", int)?,
DataType::UnsignedInt(Some(Err(_))) => terminal::write_error("[Parse Error]")?,
DataType::UnsignedInt(None) => (),
DataType::RespCode(Some(rc)) => {
if rc.len() == 1 {
if let Some(rcode) = RespCodes::from_str(&rc, None) {
match rcode {
RespCodes::Okay => terminal::write_info("(Okay) ")?,
RespCodes::NotFound => terminal::write_info("(Nil) ")?,
RespCodes::OverwriteError => {
terminal::write_error("(Overwrite Error) ")?
}
RespCodes::ActionError => terminal::write_error("(Action Error) ")?,
RespCodes::PacketError => terminal::write_error("(Packet Error) ")?,
RespCodes::ServerError => terminal::write_error("(Server Error) ")?,
RespCodes::OtherError(_) => {
terminal::write_error("(Other Error) ")?
}
}
}
} else {
terminal::write_error(format!("[ERROR: '{}'] ", rc))?;
}
}
_ => unimplemented!(),
}
}
Ok(())
}
}
/// Errors that may occur while parsing responses from the server
///
/// Every variant, except `Incomplete` has an `usize` field, which is used to advance the
/// buffer
#[derive(Debug, PartialEq)]
pub enum ClientResult {
/// The response was Invalid
InvalidResponse,
/// The response is a valid response and has been parsed into a vector of datagroups
Response(Vec<DataGroup>, usize),
/// The response was empty, which means that the remote end closed the connection
Empty(usize),
/// The response is incomplete
Incomplete,
}
impl fmt::Display for ClientResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ClientResult::*;
match self {
InvalidResponse => write!(f, "ERROR: The server sent an invalid response"),
Response(_, _) => unimplemented!(),
Empty(_) => write!(f, ""),
Incomplete => write!(f, "ERROR: The server sent an incomplete response"),
}
}
}
/// Parse a response packet
pub fn parse(buf: &[u8]) -> ClientResult {
if buf.len() < 6 {
// A packet that has less than 6 characters? Nonsense!
return ClientResult::Incomplete;
}
/*
We first get the metaframe, which looks something like:
```
#<numchars_in_next_line>\n
!<num_of_datagroups>\n
```
*/
let mut pos = 0;
if buf[pos] != b'#' {
return ClientResult::InvalidResponse;
} else {
pos += 1;
}
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ClientResult::Incomplete;
}
};
pos += 1; // Skip LF
// Find out the number of actions that we have to do
let mut action_size = 0usize;
if next_line[0] == b'*' {
let mut line_iter = next_line.into_iter().skip(1).peekable();
while let Some(dig) = line_iter.next() {
let curdig: usize = match dig.checked_sub(48) {
Some(dig) => {
if dig > 9 {
return ClientResult::InvalidResponse;
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse,
};
action_size = (action_size * 10) + curdig;
}
// This line gives us the number of actions
} else {
return ClientResult::InvalidResponse;
}
let mut items: Vec<DataGroup> = Vec::with_capacity(action_size);
while pos < buf.len() && items.len() <= action_size {
match buf[pos] {
b'#' => {
pos += 1; // Skip '#'
let next_line = match read_line_and_return_next_line(&mut pos, &buf) {
Some(line) => line,
None => {
// This is incomplete
return ClientResult::Incomplete;
}
}; // Now we have the current line
pos += 1; // Skip the newline
// Move the cursor ahead by the number of bytes that we just read
// Let us check the current char
match next_line[0] {
b'&' => {
// This is an array
// Now let us parse the array size
let mut current_array_size = 0usize;
let mut linepos = 1; // Skip the '&' character
while linepos < next_line.len() {
let curdg: usize = match next_line[linepos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ClientResult::InvalidResponse;
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse,
};
current_array_size = (current_array_size * 10) + curdg; // Increment the size
linepos += 1; // Move the position ahead, since we just read another char
}
// Now we know the array size, good!
let mut actiongroup: Vec<DataType> = Vec::with_capacity(current_array_size);
// Let's loop over to get the elements till the size of this array
while pos < buf.len() && actiongroup.len() < current_array_size {
let mut element_size = 0usize;
let datatype = match buf[pos] {
b'+' => DataType::Str(None),
b'!' => DataType::RespCode(None),
b':' => DataType::UnsignedInt(None),
x @ _ => unimplemented!("Type '{}' not implemented", char::from(x)),
};
pos += 1; // We've got the tsymbol above, so skip it
while pos < buf.len() && buf[pos] != b'\n' {
let curdig: usize = match buf[pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return ClientResult::InvalidResponse;
} else {
dig.into()
}
}
None => return ClientResult::InvalidResponse,
};
element_size = (element_size * 10) + curdig; // Increment the size
pos += 1; // Move the position ahead, since we just read another char
}
pos += 1;
// We now know the item size
let mut value = String::with_capacity(element_size);
let extracted = match buf.get(pos..pos + element_size) {
Some(s) => s,
None => return ClientResult::Incomplete,
};
pos += element_size; // Move the position ahead
value.push_str(&String::from_utf8_lossy(extracted));
pos += 1; // Skip the newline
actiongroup.push(match datatype {
DataType::Str(_) => DataType::Str(Some(value)),
DataType::RespCode(_) => DataType::RespCode(Some(value)),
DataType::UnsignedInt(_) => {
DataType::UnsignedInt(Some(value.parse()))
}
});
}
items.push(DataGroup(actiongroup));
}
_ => return ClientResult::InvalidResponse,
}
continue;
}
_ => {
// Since the variant '#' would does all the array
// parsing business, we should never reach here unless
// the packet is invalid
return ClientResult::InvalidResponse;
}
}
}
if buf.get(pos).is_none() {
// Either more data was sent or some data was missing
if items.len() == action_size {
if items.len() == 1 {
ClientResult::Response(items, pos)
} else {
// The CLI does not support batch queries
unimplemented!();
}
} else {
ClientResult::Incomplete
}
} else {
ClientResult::InvalidResponse
}
}
/// Read a size line and return the following line
///
/// This reads a line that begins with the number, i.e make sure that
/// the **`#` character is skipped**
///
fn read_line_and_return_next_line<'a>(pos: &mut usize, buf: &'a [u8]) -> Option<&'a [u8]> {
let mut next_line_size = 0usize;
while pos < &mut buf.len() && buf[*pos] != b'\n' {
// 48 is the UTF-8 code for '0'
let curdig: usize = match buf[*pos].checked_sub(48) {
Some(dig) => {
if dig > 9 {
// If `dig` is greater than 9, then the current
// UTF-8 char isn't a number
return None;
} else {
dig.into()
}
}
None => return None,
};
next_line_size = (next_line_size * 10) + curdig; // Increment the size
*pos += 1; // Move the position ahead, since we just read another char
}
*pos += 1; // Skip the newline
// We now know the size of the next line
let next_line = match buf.get(*pos..*pos + next_line_size) {
Some(line) => line,
None => {
// This is incomplete
return None;
}
}; // Now we have the current line
// Move the cursor ahead by the number of bytes that we just read
*pos += next_line_size;
Some(next_line)
}
#[cfg(test)]
#[test]
fn test_parser() {
let res = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
assert_eq!(
parse(&res),
ClientResult::Response(
vec![DataGroup(vec![DataType::Str(Some("HEY!".to_owned()))])],
res.len()
)
);
let res = "#2\n*1\n#2\n&1\n!1\n0\n".as_bytes().to_owned();
assert_eq!(
parse(&res),
ClientResult::Response(
vec![DataGroup(vec![DataType::RespCode(Some("0".to_owned()))])],
res.len()
)
);
}

@ -1,252 +0,0 @@
/*
* Created on Tue Aug 04 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod deserializer;
use bytes::{Buf, BytesMut};
use deserializer::ClientResult;
use libsky::terrapipe;
use libsky::TResult;
use libsky::BUF_CAP;
use openssl::ssl::Ssl;
use openssl::ssl::SslContext;
use openssl::ssl::SslMethod;
use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
pub enum Con {
Secure(SslConnection),
Insecure(Connection),
}
impl Con {
pub async fn execute_query(&mut self, query: String) {
match self {
Con::Insecure(con) => con.run_query(query).await,
Con::Secure(con) => con.run_query(query).await,
}
}
pub async fn shutdown(&mut self) -> TResult<()> {
match self {
Con::Insecure(con) => con.shutdown().await,
Con::Secure(con) => con.shutdown().await,
}
}
}
/// A `Connection` is a wrapper around a`TcpStream` and a read buffer
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
/// Create a new connection, creating a connection to `host`
pub async fn new(host: &str) -> TResult<Self> {
let stream = TcpStream::connect(host).await?;
println!("Connected to tp://{}", host);
Ok(Connection {
stream,
buffer: BytesMut::with_capacity(BUF_CAP),
})
}
/// This function will write a query to the stream and read the response from the
/// server. It will then determine if the returned response is complete or incomplete
/// or invalid.
///
/// - If it is complete, then the return is parsed into a `Display`able form
/// and written to the output stream. If any parsing errors occur, they're also handled
/// by this function (usually, "Invalid Response" is written to the terminal).
/// - If the packet is incomplete, it will wait to read the entire response from the stream
/// - If the packet is corrupted, it will output "Invalid Response"
pub async fn run_query(&mut self, query: String) {
let query = terrapipe::proc_query(query);
match self.stream.write_all(&query).await {
Ok(_) => (),
Err(_) => {
eprintln!("ERROR: Couldn't write data to socket");
return;
}
};
loop {
match self.stream.read_buf(&mut self.buffer).await {
Ok(_) => (),
Err(e) => {
eprintln!("ERROR: {}", e);
return;
}
}
match self.try_response().await {
ClientResult::Empty(f) => {
self.buffer.advance(f);
eprintln!("ERROR: The remote end reset the connection");
return;
}
ClientResult::Incomplete => {
continue;
}
ClientResult::Response(r, f) => {
self.buffer.advance(f);
if r.len() == 0 {
return;
}
for group in r {
println!("{}", group);
}
return;
}
ClientResult::InvalidResponse => {
self.buffer.clear();
eprintln!("{}", ClientResult::InvalidResponse);
return;
}
}
}
}
/// This function is a subroutine of `run_query` used to parse the response packet
async fn try_response(&mut self) -> ClientResult {
if self.buffer.is_empty() {
// The connection was possibly reset
return ClientResult::Empty(0);
}
deserializer::parse(&self.buffer)
}
pub async fn shutdown(&mut self) -> TResult<()> {
self.stream.shutdown().await.map_err(|e| e.into())
}
}
/// An `SslConnection` is a wrapper around a `SslStream<TcpStream>` provided by OpenSSL and a
/// read buffer
pub struct SslConnection {
stream: SslStream<TcpStream>,
buffer: BytesMut,
}
impl SslConnection {
/// Create a new connection, creating a connection to `host`
pub async fn new(host: &str, sslcert: &str) -> TResult<Self> {
let mut ctx = SslContext::builder(SslMethod::tls_client())?;
ctx.set_ca_file(sslcert)?;
let ssl = Ssl::new(&ctx.build())?;
let stream = TcpStream::connect(host).await?;
let mut stream = SslStream::new(ssl, stream)?;
Pin::new(&mut stream).connect().await.unwrap();
println!("Connected to tps://{}", host);
Ok(SslConnection {
stream,
buffer: BytesMut::with_capacity(BUF_CAP),
})
}
/// This function will write a query to the stream and read the response from the
/// server. It will then determine if the returned response is complete or incomplete
/// or invalid.
///
/// - If it is complete, then the return is parsed into a `Display`able form
/// and written to the output stream. If any parsing errors occur, they're also handled
/// by this function (usually, "Invalid Response" is written to the terminal).
/// - If the packet is incomplete, it will wait to read the entire response from the stream
/// - If the packet is corrupted, it will output "Invalid Response"
pub async fn run_query(&mut self, query: String) {
let query = terrapipe::proc_query(query);
match self.stream.write_all(&query).await {
Ok(_) => (),
Err(e) => {
eprintln!("ERROR: Couldn't write data to socket with '{}'", e);
return;
}
};
loop {
if let Err(e) = self.read_again().await {
eprintln!("ERROR: Reading from stream failed with: '{}'", e);
return;
}
match self.try_response().await {
ClientResult::Empty(f) => {
self.buffer.advance(f);
eprintln!("ERROR: The remote end reset the connection");
return;
}
ClientResult::Incomplete => {
continue;
}
ClientResult::Response(r, f) => {
self.buffer.advance(f);
if r.len() == 0 {
return;
}
for group in r {
println!("{}", group);
}
return;
}
ClientResult::InvalidResponse => {
self.buffer.clear();
eprintln!("{}", ClientResult::InvalidResponse);
return;
}
}
}
}
/// This function is a subroutine of `run_query` used to parse the response packet
async fn try_response(&mut self) -> ClientResult {
if self.buffer.is_empty() {
// The connection was possibly reset
return ClientResult::Empty(0);
}
deserializer::parse(&self.buffer)
}
async fn read_again(&mut self) -> Result<(), String> {
match self.stream.read_buf(&mut self.buffer).await {
Ok(0) => {
if self.buffer.is_empty() {
return Ok(());
} else {
return Err(format!(
"Connection reset while reading from {}",
if let Ok(p) = self.get_peer() {
p.to_string()
} else {
"peer".to_owned()
}
)
.into());
}
}
Ok(_) => Ok(()),
Err(e) => return Err(format!("{}", e)),
}
}
fn get_peer(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().peer_addr()
}
pub async fn shutdown(&mut self) -> TResult<()> {
self.stream.shutdown().await.map_err(|e| e.into())
}
}

@ -0,0 +1,167 @@
/*
* Created on Wed May 12 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crossterm::style::{Color, Print, ResetColor, SetForegroundColor};
use skytable::{AsyncConnection, Element, RespCode, Response};
pub struct Runner {
con: AsyncConnection,
}
macro_rules! write_string {
($st:ident) => {
println!("\"{}\"", $st);
};
($idx:ident, $st:ident) => {
println!("({}) \"{}\"", $idx, $st);
};
}
macro_rules! write_int {
($int:ident) => {
println!("{}", $int);
};
($idx:ident, $st:ident) => {
println!("({}) \"{}\"", $idx, $st);
};
}
macro_rules! write_err {
($idx:expr, $err:ident) => {
crossterm::execute!(
std::io::stdout(),
SetForegroundColor(Color::Red),
Print(if let Some(idx) = $idx {
format!("({}) ({})\n", idx, $err)
} else {
format!("({})\n", $err)
}),
ResetColor
)
.expect("Failed to write to stdout");
};
($idx:ident, $err:literal) => {
crossterm::execute!(
std::io::stdout(),
SetForegroundColor(Color::Red),
Print(
(if let Some(idx) = $idx {
format!("({}) ({})\n", idx, $err)
} else {
format!("({})\n", $err)
})
),
ResetColor
)
.expect("Failed to write to stdout");
};
}
macro_rules! write_okay {
() => {
crossterm::execute!(
std::io::stdout(),
SetForegroundColor(Color::Cyan),
Print("(Okay)\n".to_string()),
ResetColor
)
.expect("Failed to write to stdout");
};
($idx:ident) => {
crossterm::execute!(
std::io::stdout(),
SetForegroundColor(Color::Cyan),
Print(format!("({}) (Okay)\n", $idx)),
ResetColor
)
.expect("Failed to write to stdout");
};
}
impl Runner {
pub const fn new(con: AsyncConnection) -> Self {
Runner { con }
}
pub async fn run_query(&mut self, unescaped_items: &str) {
let query = libsky::turn_into_query(unescaped_items);
match self.con.run_simple_query(query).await {
Ok(resp) => match resp {
Response::InvalidResponse => {
println!("ERROR: The server sent an invalid response");
}
Response::Item(element) => match element {
Element::String(st) => write_string!(st),
Element::FlatArray(arr) => print_flat_array(arr),
Element::RespCode(r) => print_rcode(r, None),
Element::UnsignedInt(int) => write_int!(int),
Element::Array(a) => print_array(a),
_ => unimplemented!(),
},
Response::ParseError => {
println!("ERROR: The client failed to deserialize data sent by the server")
}
x @ _ => {
println!(
"The server possibly sent a newer data type that we can't parse: {:?}",
x
)
}
},
Err(e) => {
eprintln!("An I/O error occurred while querying: {}", e);
std::process::exit(1);
}
}
}
}
fn print_rcode(rcode: RespCode, idx: Option<usize>) {
match rcode {
RespCode::Okay => write_okay!(),
RespCode::ActionError => write_err!(idx, "Action Error"),
RespCode::ErrorString(st) => write_err!(idx, st),
RespCode::OtherError => write_err!(idx, "Other Error"),
RespCode::NotFound => write_err!(idx, "Not Found"),
RespCode::OverwriteError => write_err!(idx, "Overwrite Error"),
RespCode::PacketError => write_err!(idx, "Packet Error"),
RespCode::ServerError => write_err!(idx, "Server Error"),
}
}
fn print_flat_array(flat_array: Vec<String>) {
flat_array.into_iter().for_each(|item| write_string!(item))
}
fn print_array(array: Vec<Element>) {
for (idx, item) in array.into_iter().enumerate() {
let idx = idx + 1;
match item {
Element::String(st) => write_string!(idx, st),
Element::RespCode(rc) => print_rcode(rc, Some(idx)),
Element::UnsignedInt(int) => write_int!(idx, int),
Element::FlatArray(a) => print_flat_array(a),
_ => unimplemented!("Nested arrays cannot be printed just yet"),
}
}
}

@ -10,4 +10,5 @@ edition = "2018"
lazy_static = "1.4.0"
bytes = "1.0.1"
termcolor = "1.1.2"
regex = "1.5.4"
regex = "1.5.4"
skytable = { git = "https://github.com/skytable/client-rust", branch = "next", features = ["dbg"], default-features = false }

@ -28,10 +28,36 @@
//!
//! This contains modules which are shared by both the `cli` and the `server` modules
pub mod terrapipe;
pub mod util;
use skytable::Query;
use std::error::Error;
/// A generic result
pub type TResult<T> = Result<T, Box<dyn Error>>;
/// The size of the read buffer in bytes
pub const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection
use std::str::FromStr;
lazy_static::lazy_static! {
static ref RE: regex::Regex = regex::Regex::from_str(r#"("[^"]*"|'[^']*'|[\S]+)+"#).unwrap();
}
pub fn split_into_args(q: &str) -> Vec<String> {
let args: Vec<String> = RE
.find_iter(q)
.map(|val| val.as_str().replace("'", "").replace("\"", "").to_owned())
.collect();
args
}
pub fn turn_into_query(q: &str) -> Query {
let mut query = Query::new();
split_into_args(q).into_iter().for_each(|arg| {
query.arg(arg);
});
query
}
pub fn into_raw_query(q: &str) -> Vec<u8> {
turn_into_query(q).into_raw_query()
}

@ -1,177 +0,0 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # The Terrapipe protocol
//! This module implements primitives for the Terrapipe protocol
//!
pub const ADDR: &'static str = "127.0.0.1";
use std::str::FromStr;
lazy_static::lazy_static! {
static ref RE: regex::Regex = regex::Regex::from_str(r#"("[^"]*"|'[^']*'|[\S]+)+"#).unwrap();
}
/// Response codes returned by the server
#[derive(Debug, PartialEq)]
pub enum RespCodes {
/// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building
/// responses that contain data
Okay,
/// `1`: Not Found
NotFound,
/// `2`: Overwrite Error
OverwriteError,
/// `3`: Action Error
ActionError,
/// `4`: Packet Error
PacketError,
/// `5`: Server Error
ServerError,
/// `6`: Some other error - the wrapped `String` will be returned in the response body.
/// Just a note, this gets quite messy, especially when we're using it for deconding responses
OtherError(Option<String>),
}
impl From<RespCodes> for u8 {
fn from(rcode: RespCodes) -> u8 {
use RespCodes::*;
match rcode {
Okay => 0,
NotFound => 1,
OverwriteError => 2,
ActionError => 3,
PacketError => 4,
ServerError => 5,
OtherError(_) => 6,
}
}
}
impl From<RespCodes> for char {
fn from(rcode: RespCodes) -> char {
use RespCodes::*;
match rcode {
Okay => '0',
NotFound => '1',
OverwriteError => '2',
ActionError => '3',
PacketError => '4',
ServerError => '5',
OtherError(_) => '6',
}
}
}
impl RespCodes {
pub fn from_str(val: &str, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val.parse::<u8>() {
Ok(val) => match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => ActionError,
4 => PacketError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
},
Err(_) => return None,
};
Some(res)
}
pub fn from_u8(val: u8, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => ActionError,
4 => PacketError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
};
Some(res)
}
pub fn from_utf8(val: u8) -> Option<Self> {
let result = match val.checked_sub(48) {
Some(r) => r,
None => return None,
};
if result > 6 {
return None;
}
return RespCodes::from_u8(result, None);
}
}
/// Prepare a query packet from a string of whitespace separated values
pub fn proc_query<T>(querystr: T) -> Vec<u8>
where
T: AsRef<str>,
{
let mut bytes = Vec::with_capacity(querystr.as_ref().len());
let args: Vec<String> = RE
.find_iter(&querystr.as_ref())
.map(|val| val.as_str().replace("'", "").replace("\"", "").to_owned())
.collect();
bytes.extend(b"#2\n*1\n#");
let arg_len_bytes = args.len().to_string().into_bytes();
let arg_len_bytes_len = (arg_len_bytes.len() + 1).to_string().into_bytes();
bytes.extend(arg_len_bytes_len);
bytes.extend(b"\n&");
bytes.extend(arg_len_bytes);
bytes.push(b'\n');
args.into_iter().for_each(|arg| {
bytes.push(b'#');
let len_bytes = arg.len().to_string().into_bytes();
bytes.extend(len_bytes);
bytes.push(b'\n');
bytes.extend(arg.as_bytes());
bytes.push(b'\n');
});
bytes
}
#[test]
fn test_queryproc() {
let query = "GET x y".to_owned();
assert_eq!(
"#2\n*1\n#2\n&3\n#3\nGET\n#1\nx\n#1\ny\n"
.as_bytes()
.to_owned(),
proc_query(query)
);
let q_escaped = proc_query(r#"SET X 'sayan with spaces'"#);
assert_eq!(
"#2\n*1\n#2\n&3\n#3\nSET\n#1\nX\n#17\nsayan with spaces\n"
.as_bytes()
.to_owned(),
q_escaped
);
}

@ -8,33 +8,36 @@ build = "build.rs"
[dependencies]
tokio = { version = "1.5.0", features = ["full"] }
bytes = "1.0.1"
libsky = {path ="../libsky"}
libsky = { path = "../libsky" }
bincode = "1.3.3"
parking_lot = "0.11.1"
lazy_static = "1.4.0"
serde_derive = "1.0.125"
futures = "0.3.14"
serde = {version = "1.0.125", features= ["derive"]}
serde = { version = "1.0.125", features = ["derive"] }
toml = "0.5.8"
clap = {version = "2.33.3", features=["yaml"]}
clap = { version = "2.33.3", features = ["yaml"] }
env_logger = "0.8.3"
log = "0.4.14"
chrono = "0.4.19"
regex = "1.5.4"
sky_macros = {path="../sky-macros"}
sky_macros = { path = "../sky-macros" }
tokio-openssl = "0.6.1"
openssl = { version = "0.10.34", features = ["vendored"] }
# Use the respcodes from the client-driver
skytable = { git = "https://github.com/skytable/client-rust", branch = "next", default-features = false }
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.3.2"
[target.'cfg(target_os = "windows")'.dependencies]
winapi = {version="0.3.9", features=["fileapi"]}
winapi = { version = "0.3.9", features = ["fileapi"] }
[target.'cfg(unix)'.build-dependencies]
cc = "1.0.67"
[dev-dependencies]
tokio = { version = "1.5.0", features = ["test-util"] }
skytable = { git = "https://github.com/skytable/client-rust", features = ["async"], default-features = false, branch = "next" }
[target.'cfg(unix)'.dependencies]
libc = "0.2.94"
libc = "0.2.94"

@ -37,19 +37,19 @@ use std::path::{Component, PathBuf};
pub async fn mksnap<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany == 0 {
if !handle.is_snapshot_enabled() {
// Since snapshotting is disabled, we can't create a snapshot!
// We'll just return an error returning the same
return con
.write_response(&**responses::fresp::R_SNAPSHOT_DISABLED)
.write_response(&**responses::groups::SNAPSHOT_DISABLED)
.await;
}
// We will just follow the standard convention of creating snapshots
@ -82,38 +82,36 @@ where
}
if was_engine_error {
return con
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.write_response(responses::groups::SERVER_ERR.to_owned())
.await;
}
if engine_was_busy {
return con
.write_response(&**responses::fresp::R_SNAPSHOT_BUSY)
.write_response(&**responses::groups::SNAPSHOT_BUSY)
.await;
}
if let Some(succeeded) = snap_result {
if succeeded {
// Snapshotting succeeded, return Okay
return con
.write_response(responses::fresp::R_OKAY.to_owned())
.await;
return con.write_response(responses::groups::OKAY.to_owned()).await;
} else {
// Nope, something happened while creating a snapshot
// return a server error
return con
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.write_response(responses::groups::SERVER_ERR.to_owned())
.await;
}
} else {
// We shouldn't ever reach here if all our logic is correct
// but if we do, something is wrong with the runtime
return con
.write_response(&**responses::fresp::R_ERR_ACCESS_AFTER_TERMSIG)
.write_response(&**responses::groups::ERR_ACCESS_AFTER_TERMSIG)
.await;
}
} else {
if howmany == 1 {
// This means that the user wants to create a 'named' snapshot
let snapname = act.get_ref().get(1).unwrap_or_else(|| unsafe {
let snapname = act.get(1).unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action
// contains a second argument, so this can't be reached
unreachable_unchecked()
@ -134,7 +132,7 @@ where
!= 0;
if illegal_snapshot {
return con
.write_response(&**responses::fresp::R_SNAPSHOT_ILLEGAL_NAME)
.write_response(&**responses::groups::SNAPSHOT_ILLEGAL_NAME)
.await;
}
let failed;
@ -149,16 +147,14 @@ where
}
if failed {
return con
.write_response(responses::fresp::R_SERVER_ERR.to_owned())
.write_response(responses::groups::SERVER_ERR.to_owned())
.await;
} else {
return con
.write_response(responses::fresp::R_OKAY.to_owned())
.await;
return con.write_response(responses::groups::OKAY.to_owned()).await;
}
} else {
return con
.write_response(responses::fresp::R_ACTION_ERR.to_owned())
.write_response(responses::groups::ACTION_ERR.to_owned())
.await;
}
}

@ -29,6 +29,7 @@
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::config::SnapshotPref;
use crate::coredb::htable::HTable;
use crate::dbnet::connection::prelude::*;
use crate::diskstore;
use crate::protocol::Query;
@ -40,7 +41,6 @@ use libsky::TResult;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use crate::coredb::htable::HTable;
use std::sync::Arc;
use tokio;
pub mod htable;
@ -268,12 +268,13 @@ impl CoreDB {
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
match query {
Query::Simple(q) => {
Query::SimpleQuery(q) => {
con.write_simple_query_header().await?;
queryengine::execute_simple(&self, con, q).await?;
con.flush_stream().await?;
}
// TODO(@ohsayan): Pipeline commands haven't been implemented yet
Query::Pipelined(_) => unimplemented!(),
Query::PipelinedQuery(_) => unimplemented!(),
}
Ok(())
}
@ -386,15 +387,9 @@ impl CoreDB {
/// **⚠ Do note**: This is super inefficient since it performs an actual
/// clone of the `HTable` and doesn't do any `Arc`-business! This function
/// can be used by test functions and the server, but **use with caution!**
pub fn get_HTable_deep_clone(&self) -> HTable<String, Data> {
pub fn get_htable_deep_clone(&self) -> HTable<String, Data> {
(*self.acquire_read().get_ref()).clone()
}
#[cfg(test)]
/// **⚠⚠⚠ This deletes everything stored in the in-memory table**
pub fn finish_db(&self) {
self.acquire_write().unwrap().coremap.clear()
}
}
impl Drop for CoreDB {

@ -40,13 +40,13 @@ use crate::dbnet::tcp::BufferedSocketStream;
use crate::dbnet::Terminator;
use crate::protocol;
use crate::protocol::responses;
use crate::protocol::ParseError;
use crate::protocol::Query;
use crate::resp::Writable;
use crate::CoreDB;
use bytes::Buf;
use bytes::BytesMut;
use libsky::TResult;
use protocol::ParseResult;
use protocol::QueryResult;
use std::future::Future;
use std::io::Error as IoError;
use std::io::ErrorKind;
@ -60,6 +60,15 @@ use tokio::io::BufWriter;
use tokio::sync::mpsc;
use tokio::sync::Semaphore;
pub const SIMPLE_QUERY_HEADER: [u8; 3] = [b'*', b'1', b'\n'];
pub enum QueryResult {
Q(Query),
E(Vec<u8>),
Empty,
Wrongtype,
}
pub mod prelude {
//! A 'prelude' for callers that would like to use the `ProtocolConnection` and `ProtocolConnectionExt` traits
//!
@ -108,11 +117,11 @@ where
})
}
/// Try to parse a query from the buffered data
fn try_query(&self) -> Result<ParseResult, ()> {
fn try_query(&self) -> Result<(Query, usize), ParseError> {
if self.get_buffer().is_empty() {
return Err(());
return Err(ParseError::Empty);
}
Ok(protocol::parse(&self.get_buffer()))
protocol::Parser::new(&self.get_buffer()).parse()
}
/// Read a query from the remote end
///
@ -128,23 +137,25 @@ where
Box::pin(async move {
let mv_self = self;
let _: Result<QueryResult, IoError> = {
mv_self.read_again().await?;
loop {
mv_self.read_again().await?;
match mv_self.try_query() {
Ok(ParseResult::Query(query, forward)) => {
mv_self.advance_buffer(forward);
Ok((query, forward_by)) => {
mv_self.advance_buffer(forward_by);
return Ok(QueryResult::Q(query));
}
Ok(ParseResult::BadPacket) => {
mv_self.clear_buffer();
return Ok(QueryResult::E(responses::fresp::R_PACKET_ERR.to_owned()));
Err(ParseError::Empty) => return Ok(QueryResult::Empty),
Err(ParseError::NotEnough) => (),
Err(ParseError::DataTypeParseError) => return Ok(QueryResult::Wrongtype),
Err(ParseError::UnexpectedByte) | Err(ParseError::BadPacket) => {
return Ok(QueryResult::E(
responses::full_responses::R_PACKET_ERR.to_owned(),
));
}
Err(_) => {
return Ok(QueryResult::Empty);
Err(ParseError::UnknownDatatype) => {
unimplemented!()
}
_ => (),
}
mv_self.read_again().await?;
}
};
})
@ -168,6 +179,63 @@ where
ret
})
}
/// Write the simple query header `*1\n` to the stream
fn write_simple_query_header<'r, 's>(
&'r mut self,
) -> Pin<Box<dyn Future<Output = IoResult<()>> + Send + 's>>
where
'r: 's,
Self: Send + 's,
{
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&SIMPLE_QUERY_HEADER[..]).await?;
Ok(())
};
ret
})
}
/// Write the flat array length (`_<size>\n`)
fn write_flat_array_length<'r, 's>(
&'r mut self,
len: usize,
) -> Pin<Box<dyn Future<Output = IoResult<()>> + Send + 's>>
where
'r: 's,
Self: Send + 's,
{
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&[b'_'][..]).await?;
mv_self.write_response(len.to_string().into_bytes()).await?;
mv_self.write_response(&[b'\n'][..]).await?;
Ok(())
};
ret
})
}
/// Write the array length (`&<size>\n`)
fn write_array_length<'r, 's>(
&'r mut self,
len: usize,
) -> Pin<Box<dyn Future<Output = IoResult<()>> + Send + 's>>
where
'r: 's,
Self: Send + 's,
{
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&[b'&'][..]).await?;
mv_self.write_response(len.to_string().into_bytes()).await?;
mv_self.write_response(&[b'\n'][..]).await?;
Ok(())
};
ret
})
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
fn close_conn_with_error<'r, 's>(
@ -315,7 +383,7 @@ where
}
}
pub async fn run(&mut self) -> TResult<()> {
log::debug!("SslConnectionHanler initialized to handle a remote client");
log::debug!("ConnectionHandler initialized to handle a remote client");
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
tdf = self.con.read_query() => tdf,
@ -331,6 +399,11 @@ where
log::debug!("Failed to read query!");
self.con.close_conn_with_error(r).await?
}
Ok(QueryResult::Wrongtype) => {
self.con
.close_conn_with_error(responses::groups::WRONGTYPE_ERR.to_owned())
.await?
}
Ok(QueryResult::Empty) => return Ok(()),
#[cfg(windows)]
Err(e) => match e.kind() {

@ -394,34 +394,3 @@ pub async fn run(
}
(db, cloned_descriptor)
}
/// This is a **test only** function
/// This takes a `CoreDB` object so that keys can be modified externally by
/// the testing suite. This will **not save any data to disk**!
/// > **This is not for release builds in any way!**
#[cfg(test)]
pub async fn test_run(listener: TcpListener, db: CoreDB, sig: impl Future) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);
let mut server = Listener {
listener,
db,
climit: Arc::new(Semaphore::new(50000)),
signal,
terminate_tx,
terminate_rx,
};
tokio::select! {
_ = server.run() => {}
_ = sig => {}
}
let Listener {
mut terminate_rx,
terminate_tx,
signal,
..
} = server;
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}

@ -220,6 +220,7 @@ mod __sys {
#[cfg(unix)]
mod __sys {
#![allow(dead_code)] // TODO: Enable this lint or remove the offending methods
//! # Unix platform-specific file locking
//! This module contains methods used by the `FileLock` object in this module to lock and/or
//! unlock files.

@ -246,7 +246,7 @@ fn test_snapshot() {
let _ = snapengine.mksnap();
let current = snapengine.get_snapshots().next().unwrap();
let read_hmap = diskstore::test_deserialize(fs::read(PathBuf::from(current)).unwrap()).unwrap();
let dbhmap = db.get_HTable_deep_clone();
let dbhmap = db.get_htable_deep_clone();
assert_eq!(read_hmap, dbhmap);
snapengine.clearall().unwrap();
fs::remove_dir_all(ourdir).unwrap();

@ -24,6 +24,8 @@
*
*/
#![allow(dead_code)] // TODO: Enable this lint or remove offending methods
//! # Snapstore
//!
//! Snapstore is an extremely fundamental but powerful disk storage format which comprises of two parts:
@ -47,9 +49,9 @@
//! > In other words, the `snapstore.bin` file is completely useless without the `snapstore.partmap` file; so,
//! if you happen to lose it — have a good day!
use crate::coredb::htable::HTable;
use bincode;
use serde::{Deserialize, Serialize};
use crate::coredb::htable::HTable;
use std::error::Error;
use std::fs;
use std::io::prelude::*;

@ -25,28 +25,22 @@
*/
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
/// Get the number of keys in the database
pub async fn dbsize<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
crate::err_if_len_is!(act, con, != 0);
let len;
{
len = handle.acquire_read().get_ref().len();
}
con.write_response(GroupBegin(1)).await?;
con.write_response(len).await?;
Ok(())
}

@ -27,11 +27,8 @@
//! # `DEL` queries
//! This module provides functions to work with `DEL` queries
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
/// Run a `DEL` query
///
@ -40,24 +37,19 @@ use crate::resp::GroupBegin;
pub async fn del<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
con.write_response(GroupBegin(1)).await?;
crate::err_if_len_is!(act, con, == 0);
let done_howmany: Option<usize>;
{
if let Some(mut whandle) = handle.acquire_write() {
let mut many = 0;
let cmap = (*whandle).get_mut_ref();
act.into_iter().for_each(|key| {
act.into_iter().skip(1).for_each(|key| {
if cmap.remove(&key).is_some() {
many += 1
}
@ -72,6 +64,6 @@ where
if let Some(done_howmany) = done_howmany {
con.write_response(done_howmany).await
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
}
}

@ -27,33 +27,24 @@
//! # `EXISTS` queries
//! This module provides functions to work with `EXISTS` queries
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
/// Run an `EXISTS` query
pub async fn exists<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&1\n to the stream
con.write_response(GroupBegin(1)).await?;
crate::err_if_len_is!(act, con, == 0);
let mut how_many_of_them_exist = 0usize;
{
let rhandle = handle.acquire_read();
let cmap = rhandle.get_ref();
act.into_iter().for_each(|key| {
act.into_iter().skip(1).for_each(|key| {
if cmap.contains_key(&key) {
how_many_of_them_exist += 1;
}

@ -24,24 +24,20 @@
*
*/
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
/// Delete all the keys in the database
pub async fn flushdb<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
crate::err_if_len_is!(act, con, != 0);
let failed;
{
if let Some(mut table) = handle.acquire_write() {
@ -52,8 +48,8 @@ where
}
}
if failed {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
} else {
con.write_response(&**responses::fresp::R_OKAY).await
con.write_response(&**responses::groups::OKAY).await
}
}

@ -27,29 +27,22 @@
//! # `GET` queries
//! This module provides functions to work with `GET` queries
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::{BytesWrapper, GroupBegin};
use crate::resp::BytesWrapper;
use bytes::Bytes;
/// Run a `GET` query
pub async fn get<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&1\n to the stream
con.write_response(GroupBegin(1)).await?;
crate::err_if_len_is!(act, con, != 1);
let res: Option<Bytes> = {
let rhandle = handle.acquire_read();
let reader = rhandle.get_ref();
@ -57,7 +50,7 @@ where
// UNSAFE(@ohsayan): act.get_ref().get_unchecked() is safe because we've already if the action
// group contains one argument (excluding the action itself)
reader
.get(act.get_ref().get_unchecked(1))
.get(act.get_unchecked(1))
.map(|b| b.get_blob().clone())
}
};

@ -28,10 +28,7 @@
//! #`JGET` queries
//! Functions for handling `JGET` queries
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
/// Run a `JGET` query
/// This returns a JSON key/value pair of keys and values
@ -45,16 +42,13 @@ use crate::protocol::responses;
pub async fn jget<T, Strm>(
_handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
crate::err_if_len_is!(act, con, != 1);
todo!()
}

@ -26,8 +26,6 @@
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
/// Run a `KEYLEN` query
///
@ -35,18 +33,13 @@ use crate::resp::GroupBegin;
pub async fn keylen<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&1\n to the stream
con.write_response(GroupBegin(1)).await?;
crate::err_if_len_is!(act, con, != 1);
let res: Option<usize> = {
let rhandle = handle.acquire_read();
let reader = rhandle.get_ref();
@ -54,7 +47,7 @@ where
// UNSAFE(@ohsayan): get_unchecked() is completely safe as we've already checked
// the number of arguments is one
reader
.get(act.get_ref().get_unchecked(1))
.get(act.get_unchecked(1))
.map(|b| b.get_blob().len())
}
};

@ -24,32 +24,25 @@
*
*/
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::{BytesWrapper, GroupBegin};
use crate::resp::BytesWrapper;
use bytes::Bytes;
use libsky::terrapipe::RespCodes;
use skytable::RespCode;
/// Run an `MGET` query
///
pub async fn mget<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
con.write_response(GroupBegin(howmany)).await?;
let mut keys = act.into_iter();
crate::err_if_len_is!(act, con, == 0);
con.write_array_length(act.len() - 1).await?;
let mut keys = act.into_iter().skip(1);
while let Some(key) = keys.next() {
let res: Option<Bytes> = {
let rhandle = handle.acquire_read();
@ -61,7 +54,7 @@ where
con.write_response(BytesWrapper(value)).await?;
} else {
// Ah, couldn't find that key
con.write_response(RespCodes::NotFound).await?;
con.write_response(RespCode::NotFound).await?;
}
}
drop(handle);

@ -44,19 +44,72 @@ pub mod update;
pub mod uset;
pub mod heya {
//! Respond to `HEYA` queries
use crate::protocol;
use crate::dbnet::connection::prelude::*;
use crate::protocol;
use protocol::responses;
/// Returns a `HEY!` `Response`
pub async fn heya<T, Strm>(
_handle: &crate::coredb::CoreDB,
con: &mut T,
_act: crate::protocol::ActionGroup,
_act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
con.write_response(&**responses::fresp::R_HEYA).await
con.write_response(&**responses::groups::HEYA).await
}
}
#[macro_export]
macro_rules! err_if_len_is {
($buf:ident, $con:ident, == $len:literal) => {
if $buf.len() - 1 == $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, != $len:literal) => {
if $buf.len() - 1 != $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, > $len:literal) => {
if $buf.len() - 1 > $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, < $len:literal) => {
if $buf.len() - 1 < $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, >= $len:literal) => {
if $buf.len() - 1 >= $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, <= $len:literal) => {
if $buf.len() - 1 <= $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
($buf:ident, $con:ident, & $len:literal) => {
if $buf.len() - 1 & $len {
return $con
.write_response(&**crate::protocol::responses::groups::ACTION_ERR)
.await;
}
};
}

@ -24,33 +24,29 @@
*
*/
use crate::coredb::{self};
use crate::coredb;
use crate::coredb::htable::Entry;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
use crate::coredb::htable::Entry;
/// Run an `MSET` query
pub async fn mset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
// It is howmany/2 since we will be writing howmany/2 number of responses
con.write_response(GroupBegin(1)).await?;
let mut kviter = act.into_iter();
let mut kviter = act.into_iter().skip(1);
let done_howmany: Option<usize>;
{
if let Some(mut whandle) = handle.acquire_write() {
@ -72,6 +68,6 @@ where
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
} else {
return con.write_response(&**responses::fresp::R_SERVER_ERR).await;
return con.write_response(&**responses::groups::SERVER_ERR).await;
}
}

@ -24,33 +24,29 @@
*
*/
use crate::coredb::{self};
use crate::coredb;
use crate::coredb::htable::Entry;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
use crate::coredb::htable::Entry;
/// Run an `MUPDATE` query
pub async fn mupdate<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
// It is howmany/2 since we will be writing howmany/2 number of responses
con.write_response(GroupBegin(1)).await?;
let mut kviter = act.into_iter();
let mut kviter = act.into_iter().skip(1);
let done_howmany: Option<usize>;
{
if let Some(mut whandle) = handle.acquire_write() {
@ -72,6 +68,6 @@ where
if let Some(done_howmany) = done_howmany {
return con.write_response(done_howmany as usize).await;
} else {
return con.write_response(&**responses::fresp::R_SERVER_ERR).await;
return con.write_response(&**responses::groups::SERVER_ERR).await;
}
}

@ -27,29 +27,29 @@
//! # `SET` queries
//! This module provides functions to work with `SET` queries
use crate::coredb::htable::Entry;
use crate::coredb::{self};
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use coredb::Data;
use crate::coredb::htable::Entry;
use std::hint::unreachable_unchecked;
/// Run a `SET` query
pub async fn set<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany != 2 {
// There should be exactly 2 arguments
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut it = act.into_iter();
let mut it = act.into_iter().skip(1);
let did_we = {
if let Some(mut writer) = handle.acquire_write() {
let writer = writer.get_mut_ref();
@ -73,14 +73,13 @@ where
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(&**responses::fresp::R_OKAY).await?;
con.write_response(&**responses::groups::OKAY).await?;
} else {
con.write_response(&**responses::fresp::R_OVERWRITE_ERR)
con.write_response(&**responses::groups::OVERWRITE_ERR)
.await?;
}
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR)
.await?;
con.write_response(&**responses::groups::SERVER_ERR).await?;
}
Ok(())
}

@ -48,15 +48,15 @@ use std::hint::unreachable_unchecked;
pub async fn sset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut failed = Some(false);
{
@ -67,7 +67,6 @@ where
// This iterator gives us the keys and values, skipping the first argument which
// is the action name
let mut key_iter = act
.get_ref()
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
@ -91,7 +90,7 @@ where
}) {
// Since the failed flag is false, none of the keys existed
// So we can safely set the keys
let mut iter = act.into_iter();
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
if mut_table.insert(key, Data::from_string(value)).is_some() {
// Tell the compiler that this will never be the case
@ -110,13 +109,13 @@ where
}
if let Some(failed) = failed {
if failed {
con.write_response(&**responses::fresp::R_OVERWRITE_ERR)
con.write_response(&**responses::groups::OVERWRITE_ERR)
.await
} else {
con.write_response(&**responses::fresp::R_OKAY).await
con.write_response(&**responses::groups::OKAY).await
}
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
}
}
@ -127,15 +126,15 @@ where
pub async fn sdel<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut failed = Some(false);
{
@ -143,11 +142,9 @@ where
// doesn't go beyond the scope of this function - and is never used across
// an await: cause, the compiler ain't as smart as we are ;)
let mut key_iter = act
.get_ref()
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is safe as we've already checked that there are arguments
// in the action group other than the action
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
unreachable_unchecked()
})
.iter();
@ -169,7 +166,7 @@ where
}) {
// Since the failed flag is false, all of the keys exist
// So we can safely delete the keys
act.into_iter().for_each(|key| {
act.into_iter().skip(1).for_each(|key| {
// Since we've already checked that the keys don't exist
// We'll tell the compiler to optimize this
let _ = mut_table.remove(&key).unwrap_or_else(|| unsafe {
@ -185,12 +182,12 @@ where
}
if let Some(failed) = failed {
if failed {
con.write_response(&**responses::fresp::R_NIL).await
con.write_response(&**responses::groups::NIL).await
} else {
con.write_response(&**responses::fresp::R_OKAY).await
con.write_response(&**responses::groups::OKAY).await
}
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
}
}
@ -201,15 +198,15 @@ where
pub async fn supdate<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut failed = Some(false);
{
@ -217,11 +214,9 @@ where
// doesn't go beyond the scope of this function - and is never used across
// an await: cause, the compiler ain't as smart as we are ;)
let mut key_iter = act
.get_ref()
.get(1..)
.unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action group contains more
// than one argument. So, this is a safe optimization
// UNSAFE(@ohsayan): We've already checked if the action group contains more than one arugment
unreachable_unchecked()
})
.iter();
@ -248,7 +243,7 @@ where
}) {
// Since the failed flag is false, none of the keys existed
// So we can safely update the keys
let mut iter = act.into_iter();
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
if mut_table.insert(key, Data::from_string(value)).is_none() {
// Tell the compiler that this will never be the case
@ -262,11 +257,11 @@ where
}
if let Some(failed) = failed {
if failed {
con.write_response(&**responses::fresp::R_NIL).await
con.write_response(&**responses::groups::NIL).await
} else {
con.write_response(&**responses::fresp::R_OKAY).await
con.write_response(&**responses::groups::OKAY).await
}
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
}
}

@ -38,18 +38,18 @@ use std::hint::unreachable_unchecked;
pub async fn update<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany != 2 {
// There should be exactly 2 arguments
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
let mut it = act.into_iter();
let mut it = act.into_iter().skip(1);
let did_we = {
if let Some(mut whandle) = handle.acquire_write() {
let writer = whandle.get_mut_ref();
@ -73,13 +73,12 @@ where
};
if let Some(did_we) = did_we {
if did_we {
con.write_response(&**responses::fresp::R_OKAY).await?;
con.write_response(&**responses::groups::OKAY).await?;
} else {
con.write_response(&**responses::fresp::R_NIL).await?;
con.write_response(&**responses::groups::NIL).await?;
}
} else {
con.write_response(&**responses::fresp::R_SERVER_ERR)
.await?;
con.write_response(&**responses::groups::SERVER_ERR).await?;
}
Ok(())
}

@ -27,8 +27,6 @@
use crate::coredb::{self};
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::resp::GroupBegin;
/// Run an `USET` query
///
@ -36,23 +34,20 @@ use crate::resp::GroupBegin;
pub async fn uset<T, Strm>(
handle: &crate::coredb::CoreDB,
con: &mut T,
act: crate::protocol::ActionGroup,
act: Vec<String>,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let howmany = act.howmany();
let howmany = act.len() - 1;
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys
// is not the same as the number of values, we won't run this
// action at all
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
return con.write_response(&**responses::groups::ACTION_ERR).await;
}
// Write #<m>\n#<n>\n&<howmany>\n to the stream
// It is howmany/2 since we will be writing howmany/2 number of responses
con.write_response(GroupBegin(1)).await?;
let mut kviter = act.into_iter();
let mut kviter = act.into_iter().skip(1);
let failed = {
if let Some(mut whandle) = handle.acquire_write() {
let writer = whandle.get_mut_ref();
@ -67,7 +62,7 @@ where
}
};
if failed {
con.write_response(&**responses::fresp::R_SERVER_ERR).await
con.write_response(&**responses::groups::SERVER_ERR).await
} else {
con.write_response(howmany / 2).await
}

@ -62,9 +62,9 @@ use jemallocator::Jemalloc;
static GLOBAL: Jemalloc = Jemalloc;
/// The version text
static MSG: &'static str = "Skytable v0.5.2 | https://github.com/skytable/skytable\n";
static MSG: &'static str = "Skytable v0.5.2 | https://github.com/skytable/skytable";
/// The terminal art for `!noart` configurations
static TEXT: &'static str = "███████ ██  ██ ██  ██ ████████  █████  ██████  ██  ███████ \n████ ██   ██  ██     ██    ██   ██ ██   ██ ██  ██      \n████████████  ████   ██  ███████ ██████  ██  █████  \n██████  ██   ██  ██   ██ ██   ██ ██  ██     \n█████████ ██  ██  ██  ██  ██ ██████  ███████ ███████ \n \n ";
static TEXT: &'static str = "\n███████ ██  ██ ██  ██ ████████  █████  ██████  ██  ███████ \n████ ██   ██  ██     ██    ██   ██ ██   ██ ██  ██      \n████████████  ████   ██  ███████ ██████  ██  █████  \n██████  ██   ██  ██   ██ ██   ██ ██  ██     \n█████████ ██  ██  ██  ██  ██ ██████  ███████ ███████ \n ";
fn main() {
Builder::new()

@ -0,0 +1,67 @@
/*
* Created on Tue May 11 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use std::borrow::Cow;
#[derive(Debug, PartialEq)]
#[non_exhaustive]
/// # Data Types
///
/// This enum represents the data types supported by the Skyhash Protocol
pub enum Element {
/// Arrays can be nested! Their `<tsymbol>` is `&`
Array(Vec<Element>),
/// A String value; `<tsymbol>` is `+`
String(String),
/// An unsigned integer value; `<tsymbol>` is `:`
UnsignedInt(u64),
/// A non-recursive String array; tsymbol: `_`
FlatArray(Vec<String>),
}
impl Element {
/// This will return a reference to the first element in the element
///
/// If this element is a compound type, it will return a reference to the first element in the compound
/// type
pub fn get_first(&self) -> Option<Cow<String>> {
match self {
Self::Array(elem) => match elem.first() {
Some(el) => match el {
Element::String(st) => Some(Cow::Borrowed(&st)),
_ => None,
},
None => None,
},
Self::FlatArray(elem) => match elem.first() {
Some(el) => Some(Cow::Borrowed(&el)),
None => None,
},
Self::String(ref st) => Some(Cow::Borrowed(&st)),
_ => None,
}
}
}

File diff suppressed because it is too large Load Diff

@ -27,9 +27,8 @@
//! Primitives for generating Terrapipe compatible responses
pub mod groups {
//! # Pre-compiled response **groups**
//! These are pre-compiled response groups and **not** complete responses, that is, this is
//! to be sent after a `GroupBegin(n)` has been written to the stream. If complete
//! # Pre-compiled response **elements**
//! These are pre-compiled response groups and **not** complete responses. If complete
//! responses are required, user protocol::responses::fresp
use lazy_static::lazy_static;
lazy_static! {
@ -51,46 +50,56 @@ pub mod groups {
pub static ref HEYA: Vec<u8> = "+4\nHEY!\n".as_bytes().to_owned();
/// "Unknown action" error response
pub static ref UNKNOWN_ACTION: Vec<u8> = "!14\nUnknown action\n".as_bytes().to_owned();
pub static ref WRONGTYPE_ERR: Vec<u8> = "!1\n7\n".as_bytes().to_owned();
pub static ref SNAPSHOT_BUSY: Vec<u8> = "!17\nerr-snapshot-busy\n".as_bytes().to_owned();
/// Snapshot disabled (other error)
pub static ref SNAPSHOT_DISABLED: Vec<u8> = "!21\nerr-snapshot-disabled\n".as_bytes().to_owned();
/// Snapshot has illegal name (other error)
pub static ref SNAPSHOT_ILLEGAL_NAME: Vec<u8> = "!25\nerr-invalid-snapshot-name\n".as_bytes().to_owned();
/// Access after termination signal (other error)
pub static ref ERR_ACCESS_AFTER_TERMSIG: Vec<u8> = "!24\nerr-access-after-termsig\n".as_bytes().to_owned();
}
}
pub mod fresp {
pub mod full_responses {
//! # Pre-compiled **responses**
//! These are pre-compiled **complete** responses. This means that they should
//! be written off directly to the stream and should **not be preceded by a `GroupBegin(n)`**
//! be written off directly to the stream and should **not be preceded by any response metaframe**
use lazy_static::lazy_static;
lazy_static! {
/// Response code: 0 (Okay)
pub static ref R_OKAY: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n0\n".as_bytes().to_owned();
pub static ref R_OKAY: Vec<u8> = "*1\n!1\n0\n".as_bytes().to_owned();
/// Response code: 1 (Nil)
pub static ref R_NIL: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n1\n".as_bytes().to_owned();
pub static ref R_NIL: Vec<u8> = "*1\n!1\n1\n".as_bytes().to_owned();
/// Response code: 2 (Overwrite Error)
pub static ref R_OVERWRITE_ERR: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n2\n".as_bytes().to_owned();
pub static ref R_OVERWRITE_ERR: Vec<u8> = "*1\n!1\n2\n".as_bytes().to_owned();
/// Response code: 3 (Action Error)
pub static ref R_ACTION_ERR: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n3\n".as_bytes().to_owned();
pub static ref R_ACTION_ERR: Vec<u8> = "*1\n!1\n3\n".as_bytes().to_owned();
/// Response code: 4 (Packet Error)
pub static ref R_PACKET_ERR: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n4\n".as_bytes().to_owned();
pub static ref R_PACKET_ERR: Vec<u8> = "*1\n!1\n4\n".as_bytes().to_owned();
/// Response code: 5 (Server Error)
pub static ref R_SERVER_ERR: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n5\n".as_bytes().to_owned();
pub static ref R_SERVER_ERR: Vec<u8> = "*1\n!1\n5\n".as_bytes().to_owned();
/// Response code: 6 (Other Error _without description_)
pub static ref R_OTHER_ERR_EMPTY: Vec<u8> = "#2\n*1\n#2\n&1\n!1\n6\n".as_bytes().to_owned();
pub static ref R_OTHER_ERR_EMPTY: Vec<u8> = "*1\n!1\n6\n".as_bytes().to_owned();
/// A heya response
pub static ref R_HEYA: Vec<u8> = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
pub static ref R_HEYA: Vec<u8> = "*1\n+4\nHEY!\n".as_bytes().to_owned();
/// An other response with description: "Unknown action"
pub static ref R_UNKNOWN_ACTION: Vec<u8> = "#2\n*1\n#2\n&1\n!14\nUnknown action\n"
pub static ref R_UNKNOWN_ACTION: Vec<u8> = "*1\n!14\nUnknown action\n"
.as_bytes()
.to_owned();
/// A 0 uint64 reply
pub static ref R_ONE_INT_REPLY: Vec<u8> = "#2\n*1\n#2\n&1\n:1\n1\n".as_bytes().to_owned();
pub static ref R_ONE_INT_REPLY: Vec<u8> = "*1\n:1\n1\n".as_bytes().to_owned();
/// A 1 uint64 reply
pub static ref R_ZERO_INT_REPLY: Vec<u8> = "#2\n*1\n#2\n&1\n:1\n0\n".as_bytes().to_owned();
pub static ref R_ZERO_INT_REPLY: Vec<u8> = "*1\n:1\n0\n".as_bytes().to_owned();
/// Snapshot busy (other error)
pub static ref R_SNAPSHOT_BUSY: Vec<u8> = "#2\n*1\n#2\n&1\n!17\nerr-snapshot-busy\n".as_bytes().to_owned();
pub static ref R_SNAPSHOT_BUSY: Vec<u8> = "*1\n!17\nerr-snapshot-busy\n".as_bytes().to_owned();
/// Snapshot disabled (other error)
pub static ref R_SNAPSHOT_DISABLED: Vec<u8> = "#2\n*1\n#2\n&1\n!21\nerr-snapshot-disabled\n".as_bytes().to_owned();
pub static ref R_SNAPSHOT_DISABLED: Vec<u8> = "*1\n!21\nerr-snapshot-disabled\n".as_bytes().to_owned();
/// Snapshot has illegal name (other error)
pub static ref R_SNAPSHOT_ILLEGAL_NAME: Vec<u8> = "#2\n*1\n#2\n&1\n!25\nerr-invalid-snapshot-name\n".as_bytes().to_owned();
pub static ref R_SNAPSHOT_ILLEGAL_NAME: Vec<u8> = "*1\n!25\nerr-invalid-snapshot-name\n".as_bytes().to_owned();
/// Access after termination signal (other error)
pub static ref R_ERR_ACCESS_AFTER_TERMSIG: Vec<u8> = "#2\n*1\n#2\n&1\n!24\nerr-access-after-termsig\n".as_bytes().to_owned();
pub static ref R_ERR_ACCESS_AFTER_TERMSIG: Vec<u8> = "*1\n!24\nerr-access-after-termsig\n".as_bytes().to_owned();
/// Response code: 7; wrongtype
pub static ref R_WRONGTYPE_ERR: Vec<u8> = "*1\n!1\n7".as_bytes().to_owned();
}
}

@ -30,7 +30,7 @@ use crate::coredb::CoreDB;
use crate::dbnet::connection::prelude::*;
use crate::gen_match;
use crate::protocol::responses;
use crate::protocol::ActionGroup;
use crate::protocol::Element;
use crate::{admin, kvengine};
mod tags {
@ -73,23 +73,14 @@ mod tags {
}
/// Execute a simple(*) query
pub async fn execute_simple<T, Strm>(
db: &CoreDB,
con: &mut T,
buf: ActionGroup,
) -> std::io::Result<()>
pub async fn execute_simple<T, Strm>(db: &CoreDB, con: &mut T, buf: Element) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
let first = match buf.get_first() {
None => {
return con
.write_response(responses::fresp::R_PACKET_ERR.to_owned())
.await
.map_err(|e| e.into());
}
Some(f) => f.to_uppercase(),
Some(element) => element.to_uppercase(),
None => return con.write_response(&**responses::groups::PACKET_ERR).await,
};
gen_match!(
first,
@ -123,16 +114,21 @@ where
/// **NOTE:** This macro needs _paths_ for both sides of the $x => $y, to produce something sensible
macro_rules! gen_match {
($pre:ident, $db:ident, $con:ident, $buf:ident, $($x:path => $y:path),*) => {
let flat_array = if let crate::protocol::Element::FlatArray(array) = $buf {
array
} else {
return $con.write_response(&**responses::groups::WRONGTYPE_ERR).await;
};
match $pre.as_str() {
// First repeat over all the $x => $y patterns, passing in the variables
// and adding .await calls and adding the `?`
$(
$x => $y($db, $con, $buf).await?,
$x => $y($db, $con, flat_array).await?,
)*
// Now add the final case where no action is matched
_ => {
$con.write_response(responses::fresp::R_UNKNOWN_ACTION.to_owned())
.await?;
return $con.write_response(&**responses::groups::UNKNOWN_ACTION)
.await;
},
}
};

@ -27,7 +27,7 @@
//! Utilities for generating responses, which are only used by the `server`
//!
use bytes::Bytes;
use libsky::terrapipe::RespCodes;
use skytable::RespCode;
use std::future::Future;
use std::io::Error as IoError;
use std::pin::Pin;
@ -59,7 +59,7 @@ pub trait IsConnection: std::marker::Sync + std::marker::Send {
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>>;
) -> Pin<Box<dyn Future<Output = Result<(), IoError>> + Send + Sync + 's>>;
}
impl<T> IsConnection for T
@ -69,8 +69,8 @@ where
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>> {
Box::pin(self.write(bytes))
) -> Pin<Box<dyn Future<Output = Result<(), IoError>> + Send + Sync + 's>> {
Box::pin(self.write_all(bytes))
}
}
@ -82,16 +82,6 @@ where
#[derive(Debug, PartialEq)]
pub struct BytesWrapper(pub Bytes);
/// This indicates the beginning of a response group in a response.
///
/// It holds the number of items to be written and writes:
/// ```text
/// #<self.0.to_string().len().to_string().into_bytes()>\n
/// &<self.0.to_string()>\n
/// ```
#[derive(Debug, PartialEq)]
pub struct GroupBegin(pub usize);
impl BytesWrapper {
pub fn finish_into_bytes(self) -> Bytes {
self.0
@ -151,13 +141,13 @@ impl Writable for BytesWrapper {
}
}
impl Writable for RespCodes {
impl Writable for RespCode {
fn write<'s>(
self,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), IoError>> + Send + Sync + 's)>> {
async fn write_bytes(con: &mut impl IsConnection, code: RespCodes) -> Result<(), IoError> {
if let RespCodes::OtherError(Some(e)) = code {
async fn write_bytes(con: &mut impl IsConnection, code: RespCode) -> Result<(), IoError> {
if let RespCode::ErrorString(e) = code {
// Since this is an other error which contains a description
// we'll write !<no_of_bytes> followed by the string
con.write_lowlevel(&[b'!']).await?;
@ -194,32 +184,6 @@ impl Writable for RespCodes {
}
}
impl Writable for GroupBegin {
fn write<'s>(
self,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), IoError>> + Send + Sync + 's)>> {
async fn write_bytes(con: &mut impl IsConnection, size: usize) -> Result<(), IoError> {
con.write_lowlevel(b"#2\n*1\n").await?;
// First write a `#` which indicates that the next bytes give the
// prefix length
con.write_lowlevel(&[b'#']).await?;
let group_len_as_bytes = size.to_string().into_bytes();
let group_prefix_len_as_bytes = (group_len_as_bytes.len() + 1).to_string().into_bytes();
// Now write Self's len as bytes
con.write_lowlevel(&group_prefix_len_as_bytes).await?;
// Now write a LF and '&' which signifies the beginning of a datagroup
con.write_lowlevel(&[b'\n', b'&']).await?;
// Now write the number of items in the datagroup as bytes
con.write_lowlevel(&group_len_as_bytes).await?;
// Now write a '\n' character
con.write_lowlevel(&[b'\n']).await?;
Ok(())
}
Box::pin(write_bytes(con, self.0))
}
}
impl Writable for usize {
fn write<'s>(
self,

File diff suppressed because it is too large Load Diff

@ -26,34 +26,4 @@
//! This module contains automated tests for queries
use crate::coredb::CoreDB;
use crate::dbnet;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
mod kvengine;
/// The function macro returns the name of a function
#[macro_export]
macro_rules! __func__ {
() => {{
fn f() {}
fn typename<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}
let fn_name = typename(f);
&fn_name[..fn_name.len() - 3]
}};
}
async fn start_test_server(port: u16, db: Option<CoreDB>) -> SocketAddr {
let mut socket = String::from("127.0.0.1:");
socket.push_str(&port.to_string());
let db = db.unwrap_or(CoreDB::new_empty(0, Arc::new(None)));
let listener = TcpListener::bind(socket)
.await
.expect(&format!("Failed to bind to port {}", port));
let addr = listener.local_addr().unwrap();
tokio::spawn(async move { dbnet::test_run(listener, db, tokio::signal::ctrl_c()).await });
addr
}

@ -9,7 +9,10 @@ edition = "2018"
[dependencies]
rand = "0.8.3"
devtimer = "4.0.1"
libsky = {path="../libsky"}
clap = {version = "2.33.3", features=["yaml"]}
serde = {version="1.0.125", features=["derive"]}
skytable = { git = "https://github.com/skytable/client-rust", branch = "next" }
clap = { version = "2.33.3", features = ["yaml"] }
serde = { version = "1.0.125", features = ["derive"] }
serde_json = "1.0.64"
regex = "1.5.4"
lazy_static = "1.4.0"
libsky = {path = "../libsky"}

@ -31,7 +31,6 @@
mod benchtool {
use clap::{load_yaml, App};
use devtimer::DevTime;
use libsky::terrapipe;
use rand::distributions::Alphanumeric;
use rand::thread_rng;
use serde::Serialize;
@ -162,24 +161,26 @@ mod benchtool {
Some(h) => h.to_owned(),
None => "127.0.0.1".to_owned(),
};
host.push(':');
match matches.value_of("port") {
let port = match matches.value_of("port") {
Some(p) => match p.parse::<u16>() {
Ok(p) => host.push_str(&p.to_string()),
Ok(p) => p,
Err(_) => {
eprintln!("ERROR: Invalid port");
std::process::exit(0x100);
}
},
None => host.push_str("2003"),
}
None => 2003,
};
println!("Running a sanity test...");
// Run a ping test
if let Err(e) = sanity_test(&host) {
// Run a sanity test
if let Err(e) = sanity_test(&host, port) {
eprintln!("ERROR: Sanity test failed: {}\nBenchmark terminated", e);
return;
}
println!("Sanity test succeeded");
// now push in the port to the host string
host.push_str(":");
host.push_str(&port.to_string());
let mut rand = thread_rng();
if let Some(matches) = matches.subcommand_matches("testkey") {
let numkeys = matches.value_of("count").unwrap();
@ -195,7 +196,7 @@ mod benchtool {
.map(|_| ran_string(8, &mut rand))
.collect();
let set_packs: Vec<Vec<u8>> = (0..num)
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
.map(|idx| libsky::into_raw_query(&format!("SET {} {}", keys[idx], values[idx])))
.collect();
set_packs.into_iter().for_each(|packet| {
np.execute(packet);
@ -254,13 +255,13 @@ mod benchtool {
one of `set_packs`
*/
let set_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
.map(|idx| libsky::into_raw_query(&format!("SET {} {}", keys[idx], values[idx])))
.collect();
let get_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("GET {}", keys[idx])))
.map(|idx| libsky::into_raw_query(&format!("GET {}", keys[idx])))
.collect();
let del_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| terrapipe::proc_query(format!("DEL {}", keys[idx])))
.map(|idx| libsky::into_raw_query(&format!("DEL {}", keys[idx])))
.collect();
eprintln!("Per-packet size (GET): {} bytes", get_packs[0].len());
eprintln!("Per-packet size (SET): {} bytes", set_packs[0].len());
@ -314,50 +315,51 @@ mod benchtool {
/// ## Limitations
/// A 65535 character long key/value pair is created and fetched. This random string has extremely low
/// chances of colliding with any existing key
fn sanity_test(host: &String) -> Result<(), Box<dyn Error>> {
let mut sock = TcpStream::connect(host)
.map_err(|e| format!("connection to host failed with error '{}'", e))?;
let query = terrapipe::proc_query("HEYA");
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
fn sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
use skytable::{Connection, Element, Query, RespCode, Response};
let mut rng = thread_rng();
let mut connection = Connection::new(host, port)?;
// test heya
let mut query = Query::new();
query.arg("heya");
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::String("HEY!".to_owned())))
{
return Err("HEYA test failed".into());
}
let mut ran = thread_rng();
let key = ran_string(65535, &mut ran);
let value = ran_string(65535, &mut ran);
let query = terrapipe::proc_query(format!("SET {} {}", key, value));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n!1\n0\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let key = ran_string(65536, &mut rng);
let value = ran_string(65536, &mut rng);
let mut query = Query::new();
query.arg("set");
query.arg(&key);
query.arg(&value);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::RespCode(RespCode::Okay)))
{
return Err("SET test failed".into());
}
let query = terrapipe::proc_query(format!("GET {}", key));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = format!("#2\n*1\n#2\n&1\n+65535\n{}\n", value).into_bytes();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let mut query = Query::new();
query.arg("get");
query.arg(&key);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::String(value.to_owned())))
{
return Err("GET test failed".into());
}
let query = terrapipe::proc_query(format!("DEL {}", key));
sock.write_all(&query)
.map_err(|e| format!("couldn't write data to socket with error '{}'", e))?;
let res_should_be = "#2\n*1\n#2\n&1\n:1\n1\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
sock.read_exact(&mut response)
.map_err(|e| format!("couldn't read data from socket with error '{}'", e))?;
if response != res_should_be {
let mut query = Query::new();
query.arg("del");
query.arg(&key);
if !connection
.run_simple_query(query)
.unwrap()
.eq(&Response::Item(Element::UnsignedInt(1)))
{
return Err("DEL test failed".into());
}
Ok(())

@ -12,5 +12,4 @@ proc-macro = true
[dependencies]
syn = {version = "1.0.72", features = ["full"]}
quote = "1.0.9"
rand = "0.8.3"
proc-macro2 = "1.0.26"

@ -35,21 +35,20 @@
//!
//! ### Macros and ghost values
//! - `#[dbtest]`:
//! - `stream` - `tokio::net::TcpListener`
//! - `asyncdb` - `sdb::coredb::CoreDB`
//! - `con` - `skytable::AsyncConnection`
//! - `query` - `skytable::Query`
//!
use proc_macro::TokenStream;
use proc_macro2::Span;
use quote::quote;
use rand::*;
use std::collections::HashSet;
use syn::{self};
/// This parses a function within a `dbtest` module
///
/// This accepts an `async` function and returns a non-`async` version of it - by
/// making the body of the function use the `tokio` runtime
fn parse_dbtest(mut input: syn::ItemFn, rand: u16) -> Result<TokenStream, syn::Error> {
fn parse_dbtest(mut input: syn::ItemFn) -> Result<TokenStream, syn::Error> {
let sig = &mut input.sig;
let fname = sig.ident.to_string();
let body = &input.block;
@ -64,13 +63,16 @@ fn parse_dbtest(mut input: syn::ItemFn, rand: u16) -> Result<TokenStream, syn::E
}
sig.asyncness = None;
let body = quote! {
let asyncdb = crate::coredb::CoreDB::new_empty(0, std::sync::Arc::new(None));
let addr = crate::tests::start_test_server(#rand, Some(asyncdb.clone())).await;
let mut stream = tokio::net::TcpStream::connect(&addr).await.unwrap();
let mut con = skytable::AsyncConnection::new("127.0.0.1", 2003).await.unwrap();
let mut query = skytable::Query::new();
#body
stream.shutdown().await.unwrap();
asyncdb.finish_db();
drop(asyncdb);
{
let mut __flush__ = skytable::Query::new(); __flush__.arg("flushdb");
std::assert_eq!(
con.run_simple_query(__flush__).await.unwrap(),
skytable::Response::Item(skytable::Element::RespCode(skytable::RespCode::Okay))
);
}
};
let result = quote! {
#header
@ -90,7 +92,7 @@ fn parse_dbtest(mut input: syn::ItemFn, rand: u16) -> Result<TokenStream, syn::E
}
/// This function checks if the current function is eligible to be a test
fn parse_test_sig(input: syn::ItemFn, rand: u16) -> TokenStream {
fn parse_test_sig(input: syn::ItemFn) -> TokenStream {
for attr in &input.attrs {
if attr.path.is_ident("test") {
let msg = "second test attribute is supplied";
@ -106,7 +108,7 @@ fn parse_test_sig(input: syn::ItemFn, rand: u16) -> TokenStream {
.to_compile_error()
.into();
}
parse_dbtest(input, rand).unwrap_or_else(|e| e.to_compile_error().into())
parse_dbtest(input).unwrap_or_else(|e| e.to_compile_error().into())
}
/// This function accepts an entire module which comprises of `dbtest` functions.
@ -172,25 +174,8 @@ fn parse_test_module(args: TokenStream, item: TokenStream) -> TokenStream {
.to_compile_error()
.into();
}
let mut rng = thread_rng();
let mut in_set = HashSet::<u16>::new();
/*
* As per [this comment](https://github.com/actions/virtual-environments/issues/3275#issuecomment-828214572)
* from the GitHub Actions team, Windows reserves several ports. As our runners are currently hosted on GHA which use Hyper-V VMs
* these ports will be blocked too and thse blocks are the reasons behind spurious test failures on Windows.
* As a consequence to this, we will exclude these port ranges from the random port allocation set
* (by setting them to 'already used' or 'already in in_set').
*/
#[cfg(windows)]
add_reserved_ports(&mut in_set);
let mut result = quote! {};
for item in content {
// We set the port range to the 'dynamic port range' as per IANA's allocation guidelines
let mut rand: u16 = rng.gen_range(49152..=65535);
while in_set.contains(&rand) {
rand = rng.gen_range(49152..=65535);
}
in_set.insert(rand);
match item {
// We just care about functions, so parse functions and ignore everything
// else
@ -202,7 +187,7 @@ fn parse_test_module(args: TokenStream, item: TokenStream) -> TokenStream {
};
continue;
}
let inp = parse_test_sig(function, rand);
let inp = parse_test_sig(function);
let __tok: syn::ItemFn = syn::parse_macro_input!(inp as syn::ItemFn);
let tok = quote! {
#__tok
@ -236,20 +221,22 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result<String, syn::E
#[proc_macro_attribute]
/// The `dbtest` macro starts an async server in the background and is meant for
/// use within the `sdb` or `WORKSPACEROOT/server/` crate. If you use this compiler
/// use within the `skyd` or `WORKSPACEROOT/server/` crate. If you use this compiler
/// macro in any other crate, you'll simply get compilation errors
///
/// All tests will clean up all values once a single test is over. **These tests should not
/// be run in multi-threaded environments because they often use the same keys**
/// ## _Ghost_ values
/// This macro gives a `tokio::net::TcpStream` accessible by the `stream` variable and a `sdb::coredb::CoreDB`
/// accessible by the `asyncdb` variable.
/// This macro gives a `skytable::AsyncConnection` accessible by the `con` variable and a mutable
/// `skytable::Query` accessible by the `query` variable
///
/// ## Requirements
///
/// The `#[dbtest]` macro expects several things. The calling crate:
/// - should have the `tokio` crate as a dependency and should have the
/// `features` set to full
/// - should have a function to start an async test server, available with the following path:
/// `crate::tests::start_test_server` which accepts an `u16` as the port number
/// - should have the `skytable` crate as a dependency and should have the `features` set to `async` and version
/// upstreamed to `next` on skytable/client-rust
///
/// ## Conventions
/// Since `proc_macro` cannot accept _file-linked_ modules and only accepts inline modules, we have made a workaround, which
@ -260,78 +247,3 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result<String, syn::E
pub fn dbtest(args: TokenStream, item: TokenStream) -> TokenStream {
parse_test_module(args, item)
}
#[cfg(windows)]
/// We will parse the output from `netsh interface ipv4 show excludedportrange protocol=tcp` on Windows
/// We will then use this to add the port ranges to our `in_set` to not use them
///
/// This is what a typical output of the above command looks like:
/// ```text
///
/// Protocol tcp Port Exclusion Ranges
///
/// Start Port End Port
/// ---------- --------
/// 8501 8501
/// 47001 47001
///
/// * - Administered port exclusions.
///
/// ```
/// So, we first ignore all empty lines and then validate the headers (i.e "start port", "end port", "protocol tcp", etc)
/// and then once that's all good -- we parse the start and end ports and then turn it into a range, and run an iterator
/// over every element in this range, pushing elements into our `set` (or `in_set`)
fn add_reserved_ports(set: &mut HashSet<u16>) {
use std::process::Command;
let mut netsh = Command::new("netsh");
netsh
.arg("interface")
.arg("ipv4")
.arg("show")
.arg("excludedportrange")
.arg("protocol=tcp");
let output = netsh.output().unwrap();
if output.stderr.len() != 0 {
panic!("Errored while trying to get port exclusion ranges on Windows");
}
let stdout = String::from_utf8_lossy(&output.stdout);
let lines: Vec<&str> = stdout
.lines()
.filter(|line| line.len() != 0)
.map(|line| line.trim())
.collect();
let mut line_iter = lines.into_iter();
if let Some("Protocol tcp Port Exclusion Ranges") = line_iter.next() {
} else {
panic!("netsh returned bad output on Windows");
}
match (line_iter.next(), line_iter.next()) {
(Some(line2), Some(line3))
if (line2.contains("Start Port") && line2.contains("End Port"))
&& (line3.contains("---")) => {}
_ => panic!("netsh returned bad stdout for parsing port exclusion ranges on Windows"),
}
// Great, so now we the stdout is as we expected it to be
// Now we will trim each line, get the port range and parse it into u16s
for line in line_iter {
if line.starts_with("*") {
// The last line should look like `* - Administered port exclusions.`
break;
}
let port_high_low: Vec<u16> = line
.split_whitespace()
.map(|port_string| {
port_string
.parse::<u16>()
.expect("Returned port by netsh was not a valid u16")
})
.collect();
if port_high_low.len() != 2 {
panic!("netsh returned more than three columns instead of the expected two for parsing port exclusion ranges");
}
let (range_low, range_high) = (port_high_low[0], port_high_low[1]);
(range_low..=range_high).into_iter().for_each(|port| {
set.insert(port);
})
}
}

Loading…
Cancel
Save