Merge branch 'ssl' into next

Signed-off-by: Sayan Nandan <nandansayan@outlook.com>
next
Sayan Nandan 4 years ago
commit 3a6b9b282f
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

356
Cargo.lock generated

@ -2,9 +2,9 @@
# It is not intended for manual editing.
[[package]]
name = "aho-corasick"
version = "0.7.13"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5"
dependencies = [
"memchr",
]
@ -18,12 +18,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "atty"
version = "0.2.14"
@ -59,9 +53,9 @@ checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "byteorder"
version = "1.3.4"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
checksum = "ae44d1a3d5a19df61dd0c8beb138458ac2a53a7ac09eba97d55592540004306b"
[[package]]
name = "bytes"
@ -71,9 +65,9 @@ checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
version = "1.0.59"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381"
checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48"
[[package]]
name = "cfg-if"
@ -81,6 +75,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
@ -110,15 +110,6 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "cloudabi"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467"
dependencies = [
"bitflags",
]
[[package]]
name = "devtimer"
version = "4.0.1"
@ -138,49 +129,162 @@ dependencies = [
"termcolor",
]
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "fs_extra"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65"
[[package]]
name = "futures-executor"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
[[package]]
name = "futures-macro"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6"
[[package]]
name = "futures-task"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86"
dependencies = [
"once_cell",
]
[[package]]
name = "futures-util"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4"
checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.1.15"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9"
checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
dependencies = [
"libc",
]
[[package]]
name = "humantime"
version = "2.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "instant"
version = "0.1.6"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "itoa"
version = "0.4.6"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "jemalloc-sys"
@ -211,9 +315,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.72"
version = "0.2.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929"
[[package]]
name = "libtdb"
@ -226,9 +330,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
@ -239,20 +343,20 @@ version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
name = "memchr"
version = "2.3.3"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "mio"
version = "0.7.6"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b"
checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7"
dependencies = [
"libc",
"log",
@ -282,9 +386,9 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.43"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
@ -292,9 +396,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.12"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
@ -315,6 +419,43 @@ version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]]
name = "openssl"
version = "0.10.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types",
"lazy_static",
"libc",
"openssl-sys",
]
[[package]]
name = "openssl-src"
version = "111.13.0+1.1.1i"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045e4dc48af57aad93d665885789b43222ae26f4886494da12d1ed58d309dcb6"
dependencies = [
"cc",
]
[[package]]
name = "openssl-sys"
version = "0.9.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6"
dependencies = [
"autocfg",
"cc",
"libc",
"openssl-src",
"pkg-config",
"vcpkg",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
@ -328,12 +469,11 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.8.0"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272"
dependencies = [
"cfg-if",
"cloudabi",
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
@ -341,17 +481,61 @@ dependencies = [
"winapi",
]
[[package]]
name = "pin-project"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.0"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "ppv-lite86"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
@ -395,9 +579,9 @@ dependencies = [
[[package]]
name = "rand_core"
version = "0.6.0"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8b34ba8cfb21243bd8df91854c830ff0d785fff2e82ebd4434c2644cb9ada18"
checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5"
dependencies = [
"getrandom",
]
@ -480,29 +664,33 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
dependencies = [
"arc-swap",
"libc",
]
[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.4.1"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.16"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fd8b795c389288baa5f355489c65e71fd48a02104600d15c4cfbc561e9e429d"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"winapi",
]
@ -532,16 +720,20 @@ dependencies = [
"chrono",
"clap",
"env_logger",
"futures",
"jemallocator",
"lazy_static",
"libtdb",
"log",
"openssl",
"openssl-sys",
"parking_lot",
"regex",
"serde",
"serde_derive",
"tdb_macros",
"tokio",
"tokio-openssl",
"toml",
]
@ -559,7 +751,7 @@ dependencies = [
[[package]]
name = "tdb_macros"
version = "0.2.2"
version = "0.5.0"
dependencies = [
"proc-macro2",
"quote",
@ -587,9 +779,9 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.0.1"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
checksum = "bb9bc092d0d51e76b2b19d9d85534ffc9ec2db959a2523cdae0697e2972cd447"
dependencies = [
"lazy_static",
]
@ -601,7 +793,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"wasi",
"winapi",
]
@ -636,6 +828,18 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-openssl"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac1bec5c0a4aa71e3459802c7a12e8912c2091ce2151004f9ce95cc5d1c6124e"
dependencies = [
"futures",
"openssl",
"pin-project",
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
@ -653,8 +857,10 @@ dependencies = [
"clap",
"lazy_static",
"libtdb",
"openssl",
"regex",
"tokio",
"tokio-openssl",
]
[[package]]
@ -670,16 +876,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "vec_map"
version = "0.8.2"
name = "vcpkg"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "wasi"

@ -16,7 +16,7 @@ TerrabaseDB (or TDB for short) is an effort to provide the best of key/value sto
## Getting started 🚀
1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabase/releases)
1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabasedb/releases)
2. Unzip the bundle
3. Make the files executable (run `chmod +x tdb tsh` on *nix systems)
4. First run `tdb` to start the database server and then run `tsh` to start the interactive shell

@ -12,4 +12,6 @@ tokio = {version = "1.0.2", features = ["full"]}
bytes = "1.0.1"
regex = "1.4.3"
lazy_static = "1.4.0"
clap = {version = "2.33.3", features=["yaml"]}
clap = {version = "2.33.3", features=["yaml"]}
openssl = { version = "0.10.32", features = ["vendored"] }
tokio-openssl = "0.6.1"

@ -23,9 +23,10 @@ use crate::protocol;
use clap::load_yaml;
use clap::App;
use libtdb::terrapipe::ADDR;
use protocol::{Con, Connection, SslConnection};
use std::io::{self, prelude::*};
use std::process;
const MSG_WELCOME: &'static str = "TerrabaseDB v0.5.0";
const MSG_WELCOME: &'static str = "TerrabaseDB v0.5.1";
/// This creates a REPL on the command line and also parses command-line arguments
///
@ -50,23 +51,33 @@ pub async fn start_repl() {
},
None => host.push_str("2003"),
}
let ssl = matches.value_of("cert");
let mut con = if let Some(sslcert) = ssl {
let con = match SslConnection::new(&host, sslcert).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
Con::Secure(con)
} else {
let con = match Connection::new(&host).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
Con::Insecure(con)
};
if let Some(eval_expr) = matches.value_of("eval") {
if eval_expr.len() == 0 {
return;
}
if let Err(e) = protocol::Connection::oneshot(&host, eval_expr.to_string()).await {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
con.execute_query(eval_expr.to_string()).await;
return;
}
let mut con = match protocol::Connection::new(&host).await {
Ok(c) => c,
Err(e) => {
eprintln!("ERROR: {}", e);
process::exit(0x100);
}
};
println!("{}", MSG_WELCOME);
loop {
print!("tsh>");
@ -85,6 +96,6 @@ pub async fn start_repl() {
// The query was empty, so let it be
continue;
}
con.run_query(rl).await;
con.execute_query(rl).await;
}
}

@ -20,28 +20,35 @@
#
name: TerrabaseDB Shell
version: 0.5.0
version: 0.5.1
author: Sayan N. <ohsayan@outlook.com>
about: The TerrabaseDB Shell (tsh)
args:
- host:
short: h
required: false
long: host
value_name: host
help: Sets the remote host to connect to
takes_value: true
- port:
short: p
required: false
long: port
value_name: port
help: Sets the remote port to connect to
takes_value: true
- eval:
short: e
required: false
long: eval
value_name: expression
help: Run an expression without REPL
takes_value: true
- host:
short: h
required: false
long: host
value_name: host
help: Sets the remote host to connect to
takes_value: true
- port:
short: p
required: false
long: port
value_name: port
help: Sets the remote port to connect to
takes_value: true
- eval:
short: e
required: false
long: eval
value_name: expression
help: Run an expression without REPL
takes_value: true
- cert:
short: C
required: false
long: sslcert
value_name: cert
help: Sets the PEM certificate to use for SSL connections
takes_value: true

@ -26,14 +26,35 @@ use lazy_static::lazy_static;
use libtdb::terrapipe;
use libtdb::TResult;
use libtdb::BUF_CAP;
use openssl::ssl::Ssl;
use openssl::ssl::SslContext;
use openssl::ssl::SslMethod;
use regex::Regex;
use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
lazy_static! {
static ref RE: Regex = Regex::new("[^\\s\"']+|\"[^\"]*\"|'[^']*'").unwrap();
}
pub enum Con {
Secure(SslConnection),
Insecure(Connection),
}
impl Con {
pub async fn execute_query(&mut self, query: String) {
match self {
Con::Insecure(con) => con.run_query(query).await,
Con::Secure(con) => con.run_query(query).await,
}
}
}
/// A `Connection` is a wrapper around a`TcpStream` and a read buffer
pub struct Connection {
stream: TcpStream,
@ -44,21 +65,12 @@ impl Connection {
/// Create a new connection, creating a connection to `host`
pub async fn new(host: &str) -> TResult<Self> {
let stream = TcpStream::connect(host).await?;
println!("Connected to {}", host);
println!("Connected to tp://{}", host);
Ok(Connection {
stream,
buffer: BytesMut::with_capacity(BUF_CAP),
})
}
pub async fn oneshot(host: &str, query: String) -> TResult<()> {
let mut con = Connection {
stream: TcpStream::connect(host).await?,
buffer: BytesMut::with_capacity(BUF_CAP),
};
con.run_query(query).await;
drop(con);
Ok(())
}
/// This function will write a query to the stream and read the response from the
/// server. It will then determine if the returned response is complete or incomplete
/// or invalid.
@ -121,3 +133,109 @@ impl Connection {
deserializer::parse(&self.buffer)
}
}
/// An `SslConnection` is a wrapper around a `SslStream<TcpStream>` provided by OpenSSL and a
/// read buffer
pub struct SslConnection {
stream: SslStream<TcpStream>,
buffer: BytesMut,
}
impl SslConnection {
/// Create a new connection, creating a connection to `host`
pub async fn new(host: &str, sslcert: &str) -> TResult<Self> {
let mut ctx = SslContext::builder(SslMethod::tls_client())?;
ctx.set_ca_file(sslcert)?;
let ssl = Ssl::new(&ctx.build())?;
let stream = TcpStream::connect(host).await?;
let mut stream = SslStream::new(ssl, stream)?;
Pin::new(&mut stream).connect().await.unwrap();
println!("Connected to tps://{}", host);
Ok(SslConnection {
stream,
buffer: BytesMut::with_capacity(BUF_CAP),
})
}
/// This function will write a query to the stream and read the response from the
/// server. It will then determine if the returned response is complete or incomplete
/// or invalid.
///
/// - If it is complete, then the return is parsed into a `Display`able form
/// and written to the output stream. If any parsing errors occur, they're also handled
/// by this function (usually, "Invalid Response" is written to the terminal).
/// - If the packet is incomplete, it will wait to read the entire response from the stream
/// - If the packet is corrupted, it will output "Invalid Response"
pub async fn run_query(&mut self, query: String) {
let query = terrapipe::proc_query(query);
match self.stream.write_all(&query).await {
Ok(_) => (),
Err(e) => {
eprintln!("ERROR: Couldn't write data to socket with '{}'", e);
return;
}
};
loop {
if let Err(e) = self.read_again().await {
eprintln!("ERROR: Reading from stream failed with: '{}'", e);
return;
}
match self.try_response().await {
ClientResult::Empty(f) => {
self.buffer.advance(f);
eprintln!("ERROR: The remote end reset the connection");
return;
}
ClientResult::Incomplete => {
continue;
}
ClientResult::Response(r, f) => {
self.buffer.advance(f);
if r.len() == 0 {
return;
}
for group in r {
println!("{}", group);
}
return;
}
ClientResult::InvalidResponse(r) => {
self.buffer.advance(r);
eprintln!("{}", ClientResult::InvalidResponse(0));
return;
}
}
}
}
/// This function is a subroutine of `run_query` used to parse the response packet
async fn try_response(&mut self) -> ClientResult {
if self.buffer.is_empty() {
// The connection was possibly reset
return ClientResult::Empty(0);
}
deserializer::parse(&self.buffer)
}
async fn read_again(&mut self) -> Result<(), String> {
match self.stream.read_buf(&mut self.buffer).await {
Ok(0) => {
if self.buffer.is_empty() {
return Ok(());
} else {
return Err(format!(
"Connection reset while reading from {}",
if let Ok(p) = self.get_peer() {
p.to_string()
} else {
"peer".to_owned()
}
)
.into());
}
}
Ok(_) => Ok(()),
Err(e) => return Err(format!("{}", e)),
}
}
fn get_peer(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().peer_addr()
}
}

@ -0,0 +1,17 @@
[server]
host = "127.0.0.1"
port = 2003
noart = false
[ssl]
key = "/path/to/keyfile.pem"
chain = "/path/to/chain.pem"
port = 2004
[bgsave]
enabled = true
every = 120
[snapshot]
every = 3600
atmost = 4

@ -12,7 +12,7 @@ port = 2003 # The port to which you want TDB to bind to
# Set `noart` to true if you want to disable terminal artwork
noart = false
# This key is *OPTIONAL*, but will be required post 0.5.0
# This key is *OPTIONAL*, but will be required post 0.5.1
[bgsave]
# Run `BGSAVE` `every` seconds. For example, setting this to 60 will cause BGSAVE to run
# after every 2 minutes

@ -13,7 +13,8 @@ libtdb = {path ="../libtdb"}
bincode = "1.3.1"
parking_lot = "0.11.1"
lazy_static = "1.4.0"
serde_derive = "1.0.118"
serde_derive = "1.0.119"
futures = "0.3.12"
serde = {version = "1.0.119", features= ["derive"]}
toml = "0.5.8"
clap = {version = "2.33.3", features=["yaml"]}
@ -22,7 +23,9 @@ log = "0.4.13"
chrono = "0.4.19"
regex = "1.4.3"
tdb_macros = {path="../tdb-macros"}
tokio-openssl = "0.6.1"
openssl = { version = "0.10", features = ["vendored"] }
openssl-sys = "0.9.60"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.3.2"

@ -20,10 +20,11 @@
*/
use crate::coredb::CoreDB;
use crate::dbnet::Con;
use crate::diskstore;
use crate::diskstore::snapshot::SnapshotEngine;
use crate::diskstore::snapshot::DIR_SNAPSHOT;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::terrapipe::RespCodes;
use libtdb::TResult;
@ -32,7 +33,7 @@ use std::path::PathBuf;
/// Create a snapshot
///
pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mksnap(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
if !handle.is_snapshot_enabled() {

@ -1,83 +1,83 @@
#
# Created on Tue Sep 01 2020
#
# This file is a part of TerrabaseDB
# Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
name: TerrabaseDB Server
version: 0.5.0
version: 0.5.1
author: Sayan N. <ohsayan@outlook.com>
about: The TerrabaseDB Database server
args:
- config:
short: c
required: false
long: withconfig
value_name: cfgfile
help: Sets a configuration file to start tdb
takes_value: true
- restore:
short: r
required: false
long: restore
value_name: snapshotfile
help: Restores data from a previous snapshot
takes_value: true
- host:
short: h
required: false
long: host
value_name: host
help: Sets the host to which the server will bind
takes_value: true
- port:
short: p
required: false
long: port
value_name: port
help: Sets the port to which the server will bind
takes_value: true
- noart:
required: false
long: noart
help: Disables terminal artwork
takes_value: false
- nosave:
required: false
long: nosave
help: Disables automated background saving
takes_value: false
- saveduration:
required: false
long: saveduration
value_name: duration
short: S
takes_value: true
help: Set the BGSAVE duration
- snapevery:
required: false
long: snapevery
value_name: duration
help: Set the periodic snapshot duration
takes_value: true
- snapkeep:
required: false
long: snapkeep
value_name: count
help: Sets the number of most recent snapshots to keep
takes_value: true
- config:
short: c
required: false
long: withconfig
value_name: cfgfile
help: Sets a configuration file to start tdb
takes_value: true
- restore:
short: r
required: false
long: restore
value_name: snapshotfile
help: Restores data from a previous snapshot
takes_value: true
- host:
short: h
required: false
long: host
value_name: host
help: Sets the host to which the server will bind
takes_value: true
- port:
short: p
required: false
long: port
value_name: port
help: Sets the port to which the server will bind
takes_value: true
- noart:
required: false
long: noart
help: Disables terminal artwork
takes_value: false
- nosave:
required: false
long: nosave
help: Disables automated background saving
takes_value: false
- saveduration:
required: false
long: saveduration
value_name: duration
short: S
takes_value: true
help: Set the BGSAVE duration
- snapevery:
required: false
long: snapevery
value_name: duration
help: Set the periodic snapshot duration
takes_value: true
- snapkeep:
required: false
long: snapkeep
value_name: count
help: Sets the number of most recent snapshots to keep
takes_value: true
- sslkey:
required: false
long: sslkey
short: k
value_name: key
help: Sets the PEM key file to use for SSL/TLS
takes_value: true
- sslchain:
required: false
long: sslchain
short: z
value_name: chain
help: Sets the PEM chain file to use for SSL/TLS
takes_value: true
- sslonly:
required: false
long: sslonly
takes_value: false
help: >-
Tells the server to only accept SSL connections and disables the non-SSL
port

@ -34,13 +34,17 @@ use std::path::PathBuf;
use tokio::net::ToSocketAddrs;
use toml;
const DEFAULT_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const DEFAULT_PORT: u16 = 2003;
const DEFAULT_SSL_PORT: u16 = 2004;
/// This struct is an _object representation_ used for parsing the TOML file
#[derive(Deserialize, Debug, PartialEq)]
pub struct Config {
/// The `server` key
server: ConfigKeyServer,
/// The `bgsave` key
/* TODO(@ohsayan): As of now, we will keep this optional, but post 0.5.0,
/* TODO(@ohsayan): As of now, we will keep this optional, but post 0.5.1,
* we will make it compulsory (so that we don't break semver)
* See the link below for more details:
* https://github.com/terrabasedb/terrabasedb/issues/21#issuecomment-693217709
@ -48,6 +52,8 @@ pub struct Config {
bgsave: Option<ConfigKeyBGSAVE>,
/// The snapshot key
snapshot: Option<ConfigKeySnapshot>,
/// SSL configuration
ssl: Option<KeySslOpts>,
}
/// The BGSAVE section in the config file
@ -124,6 +130,75 @@ pub struct ConfigKeySnapshot {
atmost: usize,
}
/// Port configuration
///
/// This enumeration determines whether the ports are:
/// - `Multi`: This means that the database server will be listening to both
/// SSL **and** non-SSL requests
/// - `SecureOnly` : This means that the database server will only accept SSL requests
/// and will not even activate the non-SSL socket
/// - `InsecureOnly` : This indicates that the server would only accept non-SSL connections
/// and will not even activate the SSL socket
#[derive(Debug, PartialEq)]
pub enum PortConfig {
SecureOnly {
host: IpAddr,
ssl: SslOpts,
},
Multi {
host: IpAddr,
port: u16,
ssl: SslOpts,
},
InsecureOnly {
host: IpAddr,
port: u16,
},
}
impl PortConfig {
#[cfg(test)]
pub const fn default() -> PortConfig {
PortConfig::InsecureOnly {
host: DEFAULT_IPV4,
port: DEFAULT_PORT,
}
}
}
impl PortConfig {
pub const fn new_secure_only(host: IpAddr, ssl: SslOpts) -> Self {
PortConfig::SecureOnly { host, ssl }
}
pub const fn new_insecure_only(host: IpAddr, port: u16) -> Self {
PortConfig::InsecureOnly { host, port }
}
pub const fn new_multi(host: IpAddr, port: u16, ssl: SslOpts) -> Self {
PortConfig::Multi { host, port, ssl }
}
}
#[derive(Deserialize, Debug, PartialEq)]
pub struct KeySslOpts {
key: String,
chain: String,
port: u16,
only: Option<bool>,
}
#[derive(Deserialize, Debug, PartialEq)]
pub struct SslOpts {
pub key: String,
pub chain: String,
pub port: u16,
}
impl SslOpts {
pub const fn new(key: String, chain: String, port: u16) -> Self {
SslOpts { key, chain, port }
}
}
#[derive(Debug, PartialEq)]
/// The snapshot configuration
///
@ -180,16 +255,14 @@ impl SnapshotConfig {
/// configuration
#[derive(Debug, PartialEq)]
pub struct ParsedConfig {
/// A valid IPv4/IPv6 address
host: IpAddr,
/// A valid port
port: u16,
/// If `noart` is set to true, no terminal artwork should be displayed
noart: bool,
/// The BGSAVE configuration
pub bgsave: BGSave,
/// The snapshot configuration
pub snapshot: SnapshotConfig,
/// Port configuration
pub ports: PortConfig,
}
impl ParsedConfig {
@ -206,16 +279,14 @@ impl ParsedConfig {
}
/// Create a `ParsedConfig` instance from a `Config` object, which is a parsed
/// TOML file (represented as an object)
const fn from_config(cfg: Config) -> Self {
fn from_config(cfg_info: Config) -> Self {
ParsedConfig {
host: cfg.server.host,
port: cfg.server.port,
noart: if let Some(noart) = cfg.server.noart {
noart: if let Some(noart) = cfg_info.server.noart {
noart
} else {
false
},
bgsave: if let Some(bgsave) = cfg.bgsave {
bgsave: if let Some(bgsave) = cfg_info.bgsave {
match (bgsave.enabled, bgsave.every) {
// TODO: Show a warning that there are unused keys
(Some(enabled), Some(every)) => BGSave::new(enabled, every),
@ -226,11 +297,39 @@ impl ParsedConfig {
} else {
BGSave::default()
},
snapshot: if let Some(snapshot) = cfg.snapshot {
snapshot: if let Some(snapshot) = cfg_info.snapshot {
SnapshotConfig::Enabled(SnapshotPref::new(snapshot.every, snapshot.atmost))
} else {
SnapshotConfig::default()
},
ports: if let Some(sslopts) = cfg_info.ssl {
if sslopts.only.is_some() {
PortConfig::SecureOnly {
ssl: SslOpts {
key: sslopts.key,
chain: sslopts.chain,
port: sslopts.port,
},
host: cfg_info.server.host,
}
} else {
PortConfig::Multi {
ssl: SslOpts {
key: sslopts.key,
chain: sslopts.chain,
port: sslopts.port,
},
host: cfg_info.server.host,
port: cfg_info.server.port,
}
}
} else {
PortConfig::InsecureOnly {
host: cfg_info.server.host,
port: cfg_info.server.port,
}
},
}
}
#[cfg(test)]
@ -242,38 +341,38 @@ impl ParsedConfig {
/// and a supplied `port`
pub const fn default_with_port(port: u16) -> Self {
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::new_insecure_only(DEFAULT_IPV4, port),
}
}
#[cfg(test)]
pub const fn default_ports() -> PortConfig {
PortConfig::default()
}
/// Create a new `ParsedConfig` with the default `port` and `noart` settngs
/// and a supplied `host`
pub const fn default_with_host(host: IpAddr) -> Self {
ParsedConfig::new(
host,
2003,
false,
BGSave::default(),
SnapshotConfig::default(),
PortConfig::new_insecure_only(host, 2003),
)
}
/// Create a new `ParsedConfig` with all the fields
pub const fn new(
host: IpAddr,
port: u16,
noart: bool,
bgsave: BGSave,
snapshot: SnapshotConfig,
ports: PortConfig,
) -> Self {
ParsedConfig {
host,
port,
noart,
bgsave,
snapshot,
ports,
}
}
/// Create a default `ParsedConfig` with the following setup defaults:
@ -282,19 +381,15 @@ impl ParsedConfig {
/// - `noart` : false
/// - `bgsave_enabled` : true
/// - `bgsave_duration` : 120
/// - `ssl` : disabled
pub const fn default() -> Self {
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::new_insecure_only(DEFAULT_IPV4, 2003),
}
}
/// Return a (host, port) tuple which can be bound to with `TcpListener`
pub fn get_host_port_tuple(&self) -> impl ToSocketAddrs {
((self.host), self.port)
}
/// Returns `false` if `noart` is enabled. Otherwise it returns `true`
pub const fn is_artful(&self) -> bool {
!self.noart
@ -401,21 +496,29 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, PathBu
path.push(val);
path
});
// Check flags
let sslonly = matches.is_present("sslonly");
let noart = matches.is_present("noart");
let nosave = matches.is_present("nosave");
// Check options
let filename = matches.value_of("config");
let host = matches.value_of("host");
let port = matches.value_of("port");
let noart = matches.is_present("noart");
let nosave = matches.is_present("nosave");
let snapevery = matches.value_of("snapevery");
let snapkeep = matches.value_of("snapkeep");
let saveduration = matches.value_of("saveduration");
let sslkey = matches.value_of("sslkey");
let sslchain = matches.value_of("sslchain");
let cli_has_overrideable_args = host.is_some()
|| port.is_some()
|| noart
|| nosave
|| snapevery.is_some()
|| snapkeep.is_some()
|| saveduration.is_some();
|| saveduration.is_some()
|| sslchain.is_some()
|| sslkey.is_some()
|| sslonly;
if filename.is_some() && cli_has_overrideable_args {
return Err(ConfigError::CfgError(
"Either use command line arguments or use a configuration file",
@ -506,7 +609,33 @@ pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, PathBu
}
(None, None) => SnapshotConfig::Disabled,
};
let cfg = ParsedConfig::new(host, port, noart, bgsave, snapcfg);
let portcfg = match (
sslkey.map(|val| val.to_owned()),
sslchain.map(|val| val.to_owned()),
) {
(None, None) => {
if sslonly {
return Err(ConfigError::CliArgErr(
"You mast pass values for both --sslkey and --sslchain to use the --sslonly flag"
));
} else {
PortConfig::new_insecure_only(host, port)
}
}
(Some(key), Some(chain)) => {
if sslonly {
PortConfig::new_secure_only(host, SslOpts::new(key, chain, DEFAULT_SSL_PORT))
} else {
PortConfig::new_multi(host, port, SslOpts::new(key, chain, DEFAULT_SSL_PORT))
}
}
_ => {
return Err(ConfigError::CliArgErr(
"To use SSL, pass values for both --sslkey and --sslchain",
));
}
};
let cfg = ParsedConfig::new(noart, bgsave, snapcfg, portcfg);
return Ok(ConfigType::Custom(cfg, restorefile));
}
if let Some(filename) = filename {
@ -558,11 +687,10 @@ fn test_config_file_noart() {
assert_eq!(
cfg,
ParsedConfig {
port: 2003,
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
noart: true,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
}
);
}
@ -575,11 +703,13 @@ fn test_config_file_ipv6() {
assert_eq!(
cfg,
ParsedConfig {
port: 2003,
host: IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)),
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::new_insecure_only(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)),
DEFAULT_PORT
)
}
);
}
@ -592,11 +722,10 @@ fn test_config_file_template() {
assert_eq!(
cfg,
ParsedConfig::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
2003,
false,
BGSave::default(),
SnapshotConfig::Enabled(SnapshotPref::new(3600, 4))
SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)),
PortConfig::default() // TODO: Update the template
)
);
}
@ -617,11 +746,10 @@ fn test_config_file_custom_bgsave() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default()
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
}
);
}
@ -637,11 +765,10 @@ fn test_config_file_bgsave_enabled_only() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::default(),
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
}
)
}
@ -657,11 +784,10 @@ fn test_config_file_bgsave_every_only() {
assert_eq!(
cfg,
ParsedConfig {
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
bgsave: BGSave::new(true, 600),
snapshot: SnapshotConfig::default()
snapshot: SnapshotConfig::default(),
ports: PortConfig::default()
}
)
}
@ -675,9 +801,8 @@ fn test_config_file_snapshot() {
ParsedConfig {
snapshot: SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)),
bgsave: BGSave::default(),
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 2003,
noart: false,
ports: PortConfig::default()
}
);
}

@ -24,8 +24,8 @@
use crate::config::BGSave;
use crate::config::SnapshotConfig;
use crate::config::SnapshotPref;
use crate::dbnet::Con;
use crate::diskstore;
use crate::protocol::Connection;
use crate::protocol::Query;
use crate::queryengine;
use bytes::Bytes;
@ -245,14 +245,16 @@ impl CoreDB {
}
/// Execute a query that has already been validated by `Connection::read_query`
pub async fn execute_query(&self, query: Query, con: &mut Connection) -> TResult<()> {
pub async fn execute_query(&self, query: Query, mut con: &mut Con<'_>) -> TResult<()> {
match query {
Query::Simple(q) => queryengine::execute_simple(&self, con, q).await?,
Query::Simple(q) => {
queryengine::execute_simple(&self, &mut con, q).await?;
// Once we're done executing, flush the stream
con.flush_stream().await
}
// TODO(@ohsayan): Pipeline commands haven't been implemented yet
Query::Pipelined(_) => unimplemented!(),
}
// Once we're done executing, flush the stream
con.flush_stream().await
}
/// Create a new `CoreDB` instance

@ -35,15 +35,21 @@
//!
use crate::config::BGSave;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use crate::config::SslOpts;
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::protocol::tls::SslConnection;
use crate::protocol::tls::SslListener;
use crate::protocol::{Connection, QueryResult::*};
use crate::resp::Writable;
use crate::CoreDB;
use libtdb::util::terminal;
use libtdb::TResult;
use std::fs;
use std::future::Future;
use std::io::ErrorKind;
use std::net::IpAddr;
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
@ -52,7 +58,6 @@ use tokio::net::TcpStream;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
use tokio::time::{self, Duration};
/// Responsible for gracefully shutting down the server instead of dying randomly
// Sounds very sci-fi ;)
pub struct Terminator {
@ -104,7 +109,7 @@ pub struct Listener {
}
/// A per-connection handler
struct CHandler {
pub struct CHandler {
db: CoreDB,
con: Connection,
climit: Arc<Semaphore>,
@ -150,16 +155,59 @@ impl Listener {
};
tokio::spawn(async move {
if let Err(e) = chandle.run().await {
eprintln!("Error: {}", e);
log::error!("Error: {}", e);
}
});
}
}
}
/// # Connection Wrapper
///
/// A `Con` object holds a mutable reference to a standard TCP stream or to
/// an encrypted connection (over the `SslListener` object). It provides a few
/// methods which are provided by the underlying interface.
pub enum Con<'a> {
/// A secure TLS connection
Secure(&'a mut SslConnection),
/// An insecure ('standard') TCP connection
Insecure(&'a mut Connection),
}
impl<'a> Con<'a> {
/// Create a new **unencrypted** connection instance
pub fn init<'b>(con: &'b mut Connection) -> Self
where
'b: 'a,
{
Con::Insecure(con)
}
/// Create a new **encrypted** connection instance
pub fn init_secure<'b>(con: &'b mut SslConnection) -> Self
where
'b: 'a,
{
Con::Secure(con)
}
/// Flush the stream that is held by the underlying connection
pub async fn flush_stream(&mut self) -> TResult<()> {
match self {
Con::Secure(con) => con.flush_stream().await,
Con::Insecure(con) => con.flush_stream().await,
}
}
/// Write bytes to the underlying stream that implement the `Writable` trait
pub async fn write_response(&mut self, resp: impl Writable) -> TResult<()> {
match self {
Con::Insecure(con) => con.write_response(resp).await,
Con::Secure(con) => con.write_response(resp).await,
}
}
}
impl CHandler {
/// Process the incoming connection
async fn run(&mut self) -> TResult<()> {
pub async fn run(&mut self) -> TResult<()> {
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
tdf = self.con.read_query() => tdf,
@ -168,7 +216,11 @@ impl CHandler {
}
};
match try_df {
Ok(Q(s)) => self.db.execute_query(s, &mut self.con).await?,
Ok(Q(s)) => {
self.db
.execute_query(s, &mut Con::init(&mut self.con))
.await?
}
Ok(E(r)) => self.con.close_conn_with_error(r).await?,
Ok(Empty) => return Ok(()),
Err(e) => return Err(e.into()),
@ -187,9 +239,217 @@ impl Drop for CHandler {
}
use std::io::{self, prelude::*};
/// Multiple Listener Interface
///
/// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate
/// easier asynchronous listening on multiple ports.
///
/// - The `SecureOnly` variant holds an `SslListener`
/// - The `InsecureOnly` variant holds a `Listener`
/// - The `Multi` variant holds both an `SslListener` and a `Listener`
/// This variant enables listening to both secure and insecure sockets at the same time
/// asynchronously
enum MultiListener {
SecureOnly(SslListener),
InsecureOnly(Listener),
Multi(Listener, SslListener),
}
impl MultiListener {
/// Create a new `InsecureOnly` listener
pub async fn new_insecure_only(
host: IpAddr,
port: u16,
climit: Arc<Semaphore>,
db: CoreDB,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
) -> Self {
let listener = TcpListener::bind((host, port))
.await
.expect("Failed to bind to port");
MultiListener::InsecureOnly(Listener {
listener,
db,
climit,
signal,
terminate_tx,
terminate_rx,
})
}
/// Create a new `SecureOnly` listener
pub async fn new_secure_only(
host: IpAddr,
climit: Arc<Semaphore>,
db: CoreDB,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
ssl: SslOpts,
) -> Self {
let listener = TcpListener::bind((host, ssl.port))
.await
.expect("Failed to bind to port");
MultiListener::SecureOnly(
SslListener::new_pem_based_ssl_connection(
ssl.key,
ssl.chain,
db,
listener,
climit,
signal,
terminate_tx,
terminate_rx,
)
.expect("Couldn't bind to secure port"),
)
}
/// Create a new `Multi` listener that has both a secure and an insecure listener
pub async fn new_multi(
host: IpAddr,
port: u16,
climit: Arc<Semaphore>,
db: CoreDB,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
ssl_terminate_tx: mpsc::Sender<()>,
ssl_terminate_rx: mpsc::Receiver<()>,
ssl: SslOpts,
) -> Self {
let listener = TcpListener::bind((host, ssl.port))
.await
.expect("Failed to bind to port");
let secure_listener = SslListener::new_pem_based_ssl_connection(
ssl.key,
ssl.chain,
db.clone(),
listener,
climit.clone(),
signal.clone(),
ssl_terminate_tx,
ssl_terminate_rx,
)
.expect("Couldn't bind to secure port");
let listener = TcpListener::bind((host, port))
.await
.expect("Failed to bind to port");
let insecure_listener = Listener {
listener,
db,
climit,
signal,
terminate_tx,
terminate_rx,
};
MultiListener::Multi(insecure_listener, secure_listener)
}
/// Start the server
///
/// The running of single and/or parallel listeners is handled by this function by
/// exploiting the working of async functions
pub async fn run_server(&mut self) -> TResult<()> {
match self {
MultiListener::SecureOnly(secure_listener) => secure_listener.run().await,
MultiListener::InsecureOnly(insecure_listener) => insecure_listener.run().await,
MultiListener::Multi(insecure_listener, secure_listener) => {
let insec = insecure_listener.run();
let sec = secure_listener.run();
let (e1, e2) = futures::join!(insec, sec);
if let Err(e) = e1 {
log::error!("Insecure listener failed with: {}", e);
}
if let Err(e) = e2 {
log::error!("Secure listener failed with: {}", e);
}
Ok(())
}
}
}
/// Print the port binding status
pub fn print_binding(&self) {
match self {
MultiListener::SecureOnly(secure_listener) => {
log::info!(
"Server started on tps://{}",
secure_listener.listener.local_addr().expect("Failed to g")
)
}
MultiListener::InsecureOnly(insecure_listener) => {
log::info!(
"Server started on tp://{}",
insecure_listener
.listener
.local_addr()
.expect("Failed to g")
)
}
MultiListener::Multi(insecure_listener, secure_listener) => {
log::info!(
"Listening to tp://{} and tps://{}",
insecure_listener
.listener
.local_addr()
.expect("Failed to g"),
secure_listener.listener.local_addr().expect("Failed to g")
)
}
}
}
/// Signal the ports to shut down and only return after they have shut down
///
/// **Do note:** This function doesn't flush the `CoreDB` object! The **caller has to
/// make sure that the data is saved!**
pub async fn finish_with_termsig(self) {
match self {
MultiListener::InsecureOnly(server) => {
let Listener {
mut terminate_rx,
terminate_tx,
signal,
..
} = server;
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}
MultiListener::SecureOnly(server) => {
let SslListener {
mut terminate_rx,
terminate_tx,
signal,
..
} = server;
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}
MultiListener::Multi(insecure, secure) => {
let Listener {
mut terminate_rx,
terminate_tx,
signal,
..
} = insecure;
drop((signal, terminate_tx));
let _ = terminate_rx.recv().await;
let SslListener {
mut terminate_rx,
terminate_tx,
signal,
..
} = secure;
drop((signal, terminate_tx));
let _ = terminate_rx.recv().await;
}
}
}
}
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(
listener: TcpListener,
ports: PortConfig,
bgsave_cfg: BGSave,
snapshot_cfg: SnapshotConfig,
sig: impl Future,
@ -200,7 +460,7 @@ pub async fn run(
let db = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) {
Ok(d) => d,
Err(e) => {
eprintln!("ERROR: {}", e);
log::error!("ERROR: {}", e);
process::exit(0x100);
}
};
@ -214,33 +474,58 @@ pub async fn run(
}
},
}
log::info!(
"Started server on terrapipe://{}",
listener
.local_addr()
.expect("The local address couldn't be fetched. Please file a bug report")
);
let mut server = Listener {
listener,
db,
climit: Arc::new(Semaphore::new(50000)),
signal,
terminate_tx,
terminate_rx,
let climit = Arc::new(Semaphore::new(50000));
let mut server = match ports {
PortConfig::InsecureOnly { host, port } => {
MultiListener::new_insecure_only(
host,
port,
climit.clone(),
db.clone(),
signal,
terminate_tx,
terminate_rx,
)
.await
}
PortConfig::SecureOnly { host, ssl } => {
MultiListener::new_secure_only(
host,
climit.clone(),
db.clone(),
signal,
terminate_tx,
terminate_rx,
ssl,
)
.await
}
PortConfig::Multi { host, port, ssl } => {
let (ssl_terminate_tx, ssl_terminate_rx) = mpsc::channel::<()>(1);
let server = MultiListener::new_multi(
host,
port,
climit,
db.clone(),
signal,
terminate_tx,
terminate_rx,
ssl_terminate_tx,
ssl_terminate_rx,
ssl,
)
.await;
server
}
};
server.print_binding();
tokio::select! {
_ = server.run() => {}
_ = server.run_server() => {}
_ = sig => {
log::info!("Signalling all workers to shut down");
}
}
let Listener {
mut terminate_rx,
terminate_tx,
signal,
db,
..
} = server;
server.finish_with_termsig().await;
if let Ok(_) = db.flush_db() {
log::info!("Successfully saved data to disk");
()
@ -259,9 +544,6 @@ pub async fn run(
}
}
}
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
terminal::write_info("Goodbye :)\n").unwrap();
}

@ -19,12 +19,13 @@
*
*/
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Get the number of keys in the database
pub async fn dbsize(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn dbsize(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}

@ -23,7 +23,8 @@
//! This module provides functions to work with `DEL` queries
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
@ -31,7 +32,7 @@ use libtdb::TResult;
///
/// Do note that this function is blocking since it acquires a write lock.
/// It will write an entire datagroup, for this `del` action
pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn del(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -23,12 +23,13 @@
//! This module provides functions to work with `EXISTS` queries
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Run an `EXISTS` query
pub async fn exists(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn exists(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -20,11 +20,12 @@
*/
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
/// Delete all the keys in the database
pub async fn flushdb(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn flushdb(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
if act.howmany() != 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
}

@ -23,13 +23,14 @@
//! This module provides functions to work with `GET` queries
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;
use libtdb::TResult;
/// Run a `GET` query
pub async fn get(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn get(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -24,7 +24,8 @@
//! Functions for handling `JGET` queries
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
/// Run a `JGET` query
@ -36,7 +37,7 @@ use libtdb::TResult;
/// {"key":"value"}\n
/// ```
///
pub async fn jget(_handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn jget(_handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -19,14 +19,15 @@
*
*/
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Run a `KEYLEN` query
///
/// At this moment, `keylen` only supports a single key
pub async fn keylen(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn keylen(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 1 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -20,7 +20,8 @@
*/
use crate::coredb::CoreDB;
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::{BytesWrapper, GroupBegin};
use bytes::Bytes;
use libtdb::terrapipe::RespCodes;
@ -28,7 +29,7 @@ use libtdb::TResult;
/// Run an `MGET` query
///
pub async fn mget(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mget(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -39,13 +39,14 @@ pub mod update;
pub mod uset;
pub mod heya {
//! Respond to `HEYA` queries
use crate::dbnet::Con;
use crate::protocol;
use crate::protocol::ActionGroup;
use crate::CoreDB;
use libtdb::TResult;
use protocol::{responses, Connection};
use protocol::responses;
/// Returns a `HEY!` `Response`
pub async fn heya(_db: &CoreDB, con: &mut Connection, _buf: ActionGroup) -> TResult<()> {
pub async fn heya(_db: &CoreDB, con: &mut Con<'_>, _buf: ActionGroup) -> TResult<()> {
con.write_response(&**responses::fresp::R_HEYA).await
}
}

@ -20,13 +20,14 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
use std::collections::hash_map::Entry;
/// Run an `MSET` query
pub async fn mset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -20,13 +20,14 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
use std::collections::hash_map::Entry;
/// Run an `MUPDATE` query
pub async fn mupdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn mupdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -23,14 +23,15 @@
//! This module provides functions to work with `SET` queries
use crate::coredb::{self, CoreDB};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use coredb::Data;
use libtdb::TResult;
use std::collections::hash_map::Entry;
use std::hint::unreachable_unchecked;
/// Run a `SET` query
pub async fn set(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn set(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 2 {
// There should be exactly 2 arguments

@ -31,7 +31,8 @@
//! Do note that this isn't the same as the gurantees provided by ACID transactions
use crate::coredb::{CoreDB, Data};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use libtdb::TResult;
use std::hint::unreachable_unchecked;
@ -39,7 +40,7 @@ use std::hint::unreachable_unchecked;
///
/// This either returns `Okay` if all the keys were set, or it returns an
/// `Overwrite Error` or code `2`
pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn sset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
@ -101,7 +102,7 @@ pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
///
/// This either returns `Okay` if all the keys were `del`eted, or it returns a
/// `Nil`, which is code `1`
pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn sdel(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;
@ -157,7 +158,7 @@ pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR
///
/// This either returns `Okay` if all the keys were updated, or it returns `Nil`
/// or code `1`
pub async fn supdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn supdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
return con.write_response(&**responses::fresp::R_ACTION_ERR).await;

@ -23,14 +23,15 @@
//! This module provides functions to work with `UPDATE` queries
//!
use crate::coredb::{self, CoreDB};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use coredb::Data;
use libtdb::TResult;
use std::collections::hash_map::Entry;
use std::hint::unreachable_unchecked;
/// Run an `UPDATE` query
pub async fn update(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn update(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany != 2 {
// There should be exactly 2 arguments

@ -20,14 +20,15 @@
*/
use crate::coredb::{self, CoreDB};
use crate::protocol::{responses, ActionGroup, Connection};
use crate::dbnet::Con;
use crate::protocol::{responses, ActionGroup};
use crate::resp::GroupBegin;
use libtdb::TResult;
/// Run an `USET` query
///
/// This is like "INSERT or UPDATE"
pub async fn uset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> {
pub async fn uset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> {
let howmany = act.howmany();
if howmany & 1 == 1 || howmany == 0 {
// An odd number of arguments means that the number of keys

@ -26,8 +26,8 @@
//! the modules for their respective documentation.
use crate::config::BGSave;
use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use tokio::net::TcpListener;
mod config;
use std::env;
mod admin;
@ -54,7 +54,7 @@ use jemallocator::Jemalloc;
static GLOBAL: Jemalloc = Jemalloc;
/// The version text
static MSG: &'static str = "TerrabaseDB v0.5.0 | https://github.com/terrabasedb/terrabase";
static MSG: &'static str = "TerrabaseDB v0.5.1 | https://github.com/terrabasedb/terrabasedb";
/// The terminal art for `!noart` configurations
static TEXT: &'static str = "
_______ _ _____ ____
@ -76,7 +76,7 @@ async fn main() {
// Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server
let (tcplistener, bgsave_config, snapshot_config, restore_filepath) =
check_args_or_connect().await;
check_args_and_get_cfg().await;
run(
tcplistener,
bgsave_config,
@ -87,10 +87,10 @@ async fn main() {
.await;
}
/// This function checks the command line arguments and binds to an appropriate
/// port and host, as per the supplied configuration options
async fn check_args_or_connect() -> (
TcpListener,
/// This function checks the command line arguments and either returns a config object
/// or prints an error to `stderr` and terminates the server
async fn check_args_and_get_cfg() -> (
PortConfig,
BGSave,
SnapshotConfig,
Option<std::path::PathBuf>,
@ -104,35 +104,17 @@ async fn check_args_or_connect() -> (
println!("{}", MSG);
}
log::info!("Using settings from supplied configuration");
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
cfg.snapshot,
file,
)
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
}
Ok(config::ConfigType::Def(cfg, file)) => {
println!("{}\n{}", TEXT, MSG);
log::warn!("No configuration file supplied. Using default settings");
(
TcpListener::bind(cfg.get_host_port_tuple()).await,
cfg.bgsave,
cfg.snapshot,
file,
)
(cfg.ports, cfg.bgsave, cfg.snapshot, file)
}
Err(e) => {
log::error!("{}", e);
std::process::exit(0x100);
}
};
match binding_and_cfg {
(Ok(b), bgsave_cfg, snapshot_cfg, restore_file) => {
(b, bgsave_cfg, snapshot_cfg, restore_file)
}
(Err(e), _, _, _) => {
log::error!("Failed to bind to socket with error: '{}'", e);
std::process::exit(0x100);
}
}
binding_and_cfg
}

@ -38,6 +38,7 @@ use std::io::Result as IoResult;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
pub mod tls;
/// A TCP connection wrapper
pub struct Connection {

@ -0,0 +1,267 @@
/*
* Created on Fri Dec 18 2020
*
* This file is a part of TerrabaseDB
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::deserializer;
use super::responses;
use super::IoResult;
use super::ParseResult;
use super::QueryResult;
use crate::dbnet::Con;
use crate::dbnet::Terminator;
use crate::resp::Writable;
use crate::CoreDB;
use bytes::Buf;
use bytes::BytesMut;
use libtdb::TResult;
use libtdb::BUF_CAP;
use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::BufWriter;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
use tokio::time::{self, Duration};
use tokio_openssl::SslStream;
pub struct SslListener {
/// An atomic reference to the coretable
pub db: CoreDB,
/// The incoming connection listener (binding)
pub listener: TcpListener,
/// The maximum number of connections
climit: Arc<Semaphore>,
/// The shutdown broadcaster
pub signal: broadcast::Sender<()>,
// When all `Sender`s are dropped - the `Receiver` gets a `None` value
// We send a clone of `terminate_tx` to each `CHandler`
pub terminate_tx: mpsc::Sender<()>,
pub terminate_rx: mpsc::Receiver<()>,
acceptor: SslAcceptor,
}
impl SslListener {
pub fn new_pem_based_ssl_connection(
key_file: String,
chain_file: String,
db: CoreDB,
listener: TcpListener,
climit: Arc<Semaphore>,
signal: broadcast::Sender<()>,
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
) -> TResult<Self> {
log::debug!("New SSL/TLS connection registered");
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls())?;
acceptor.set_private_key_file(key_file, SslFiletype::PEM)?;
acceptor.set_certificate_chain_file(chain_file)?;
let acceptor = acceptor.build();
Ok(SslListener {
db,
listener,
climit,
signal,
terminate_tx,
terminate_rx,
acceptor,
})
}
async fn accept(&mut self) -> TResult<SslStream<TcpStream>> {
log::debug!("Trying to accept a SSL connection");
let mut backoff = 1;
loop {
match self.listener.accept().await {
// We don't need the bindaddr
// We get the encrypted stream which we need to decrypt
// by using the acceptor
Ok((stream, _)) => {
log::debug!("Accepted an SSL/TLS connection");
let ssl = Ssl::new(self.acceptor.context())?;
let mut stream = SslStream::new(ssl, stream)?;
Pin::new(&mut stream).accept().await?;
log::debug!("Connected to secure socket over TCP");
return Ok(stream);
}
Err(e) => {
log::debug!("Failed to establish a secure connection");
if backoff > 64 {
// Too many retries, goodbye user
return Err(e.into());
}
}
}
// Wait for the `backoff` duration
time::sleep(Duration::from_secs(backoff)).await;
// We're using exponential backoff
backoff *= 2;
}
}
pub async fn run(&mut self) -> TResult<()> {
log::debug!("Started secure server");
loop {
// Take the permit first, but we won't use it right now
// that's why we will forget it
self.climit.acquire().await.unwrap().forget();
let stream = self.accept().await?;
let mut sslhandle = SslConnectionHandler {
db: self.db.clone(),
con: SslConnection::new(stream),
climit: self.climit.clone(),
terminator: Terminator::new(self.signal.subscribe()),
_term_sig_tx: self.terminate_tx.clone(),
};
tokio::spawn(async move {
log::debug!("Spawned listener task");
if let Err(e) = sslhandle.run().await {
log::error!("Error: {}", e);
}
});
}
}
}
pub struct SslConnectionHandler {
db: CoreDB,
con: SslConnection,
climit: Arc<Semaphore>,
terminator: Terminator,
_term_sig_tx: mpsc::Sender<()>,
}
impl SslConnectionHandler {
pub async fn run(&mut self) -> TResult<()> {
log::debug!("SslConnectionHanler initialized to handle a remote client");
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
tdf = self.con.read_query() => tdf,
_ = self.terminator.receive_signal() => {
return Ok(());
}
};
match try_df {
Ok(QueryResult::Q(s)) => {
self.db
.execute_query(s, &mut Con::init_secure(&mut self.con))
.await?
}
Ok(QueryResult::E(r)) => {
log::debug!("Failed to read query!");
self.con.close_conn_with_error(r).await?
}
Ok(QueryResult::Empty) => return Ok(()),
Err(e) => return Err(e.into()),
}
}
Ok(())
}
}
impl Drop for SslConnectionHandler {
fn drop(&mut self) {
// Make sure that the permit is returned to the semaphore
// in the case that there is a panic inside
self.climit.add_permits(1);
}
}
pub struct SslConnection {
stream: BufWriter<SslStream<TcpStream>>,
buffer: BytesMut,
}
impl SslConnection {
pub fn new(stream: SslStream<TcpStream>) -> Self {
SslConnection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(BUF_CAP),
}
}
async fn read_again(&mut self) -> Result<(), String> {
match self.stream.read_buf(&mut self.buffer).await {
Ok(0) => {
// If 0 bytes were received, then the remote end closed
// the connection
if self.buffer.is_empty() {
return Ok(());
} else {
return Err(format!(
"Connection reset while reading from {}",
if let Ok(p) = self.get_peer() {
p.to_string()
} else {
"peer".to_owned()
}
)
.into());
}
}
Ok(_) => Ok(()),
Err(e) => return Err(format!("{}", e)),
}
}
fn get_peer(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().get_ref().peer_addr()
}
/// Try to parse a query from the buffered data
fn try_query(&mut self) -> Result<ParseResult, ()> {
if self.buffer.is_empty() {
return Err(());
}
Ok(deserializer::parse(&self.buffer))
}
pub async fn read_query(&mut self) -> Result<QueryResult, String> {
self.read_again().await?;
loop {
match self.try_query() {
Ok(ParseResult::Query(query, forward)) => {
self.buffer.advance(forward);
return Ok(QueryResult::Q(query));
}
Ok(ParseResult::BadPacket(bp)) => {
self.buffer.advance(bp);
return Ok(QueryResult::E(responses::fresp::R_PACKET_ERR.to_owned()));
}
Err(_) => {
return Ok(QueryResult::Empty);
}
_ => (),
}
self.read_again().await?;
}
}
/// Write a response to the stream
pub async fn write_response(&mut self, streamer: impl Writable) -> TResult<()> {
streamer.write(&mut self.stream).await?;
Ok(())
}
pub async fn flush_stream(&mut self) -> TResult<()> {
self.stream.flush().await?;
Ok(())
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, resp: Vec<u8>) -> TResult<()> {
self.write_response(resp).await?;
self.stream.flush().await?;
Ok(())
}
}

@ -22,8 +22,9 @@
//! # The Query Engine
use crate::coredb::CoreDB;
use crate::dbnet::Con;
use crate::protocol::ActionGroup;
use crate::protocol::{responses, Connection};
use crate::protocol::{responses};
use crate::{admin, kvengine};
use libtdb::TResult;
mod tags {
@ -66,7 +67,7 @@ mod tags {
}
/// Execute a simple(*) query
pub async fn execute_simple(db: &CoreDB, con: &mut Connection, buf: ActionGroup) -> TResult<()> {
pub async fn execute_simple(db: &CoreDB, con: &mut Con<'_>, buf: ActionGroup) -> TResult<()> {
let first = match buf.get_first() {
None => {
return con

@ -21,15 +21,16 @@
//! Utilities for generating responses, which are only used by the `server`
//!
use bytes::Bytes;
use libtdb::terrapipe::RespCodes;
use std::error::Error;
use std::future::Future;
use std::io::Error as IoError;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
/// # The `Writable` trait
/// All trait implementors are given access to an asynchronous stream to which
@ -42,10 +43,44 @@ pub trait Writable {
*/
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error>>> + Send + Sync + 's>>;
}
pub trait IsConnection: std::marker::Sync + std::marker::Send {
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>>;
}
impl IsConnection for BufWriter<TcpStream> {
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>> {
Box::pin(self.write(bytes))
}
}
impl IsConnection for SslStream<TcpStream> {
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>> {
Box::pin(self.write(bytes))
}
}
impl IsConnection for BufWriter<SslStream<TcpStream>> {
fn write_lowlevel<'s>(
&'s mut self,
bytes: &'s [u8],
) -> Pin<Box<dyn Future<Output = Result<usize, IoError>> + Send + Sync + 's>> {
Box::pin(self.write(bytes))
}
}
/// A `BytesWrapper` object wraps around a `Bytes` object that might have been pulled
/// from `CoreDB`.
///
@ -73,14 +108,14 @@ impl BytesWrapper {
impl Writable for Vec<u8> {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
resp: Vec<u8>,
) -> Result<(), Box<dyn Error>> {
con.write(&resp).await?;
con.write_lowlevel(&resp).await?;
Ok(())
}
Box::pin(write_bytes(con, self))
@ -90,14 +125,14 @@ impl Writable for Vec<u8> {
impl Writable for &'static [u8] {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
resp: &[u8],
) -> Result<(), Box<dyn Error>> {
con.write(&resp).await?;
con.write_lowlevel(&resp).await?;
Ok(())
}
Box::pin(write_bytes(con, &self))
@ -107,28 +142,28 @@ impl Writable for &'static [u8] {
impl Writable for BytesWrapper {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
bytes: Bytes,
) -> Result<(), Box<dyn Error>> {
// First write a `+` character to the stream since this is a
// string (we represent `String`s as `Byte` objects internally)
// and since `Bytes` are effectively `String`s we will append the
// type operator `+` to the stream
con.write(&[b'+']).await?;
con.write_lowlevel(&[b'+']).await?;
// Now get the size of the Bytes object as bytes
let size = bytes.len().to_string().into_bytes();
// Write this to the stream
con.write(&size).await?;
con.write_lowlevel(&size).await?;
// Now write a LF character
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
// Now write the REAL bytes (of the object)
con.write(&bytes).await?;
con.write_lowlevel(&bytes).await?;
// Now write another LF
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
Ok(())
}
Box::pin(write_bytes(con, self.finish_into_bytes()))
@ -138,44 +173,44 @@ impl Writable for BytesWrapper {
impl Writable for RespCodes {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
code: RespCodes,
) -> Result<(), Box<dyn Error>> {
if let RespCodes::OtherError(Some(e)) = code {
// Since this is an other error which contains a description
// we'll write !<no_of_bytes> followed by the string
con.write(&[b'!']).await?;
con.write_lowlevel(&[b'!']).await?;
// Convert the string into a vector of bytes
let e = e.to_string().into_bytes();
// Now get the length of the byte vector and turn it into
// a string and then into a byte vector
let len_as_bytes = e.len().to_string().into_bytes();
// Write the length
con.write(&len_as_bytes).await?;
con.write_lowlevel(&len_as_bytes).await?;
// Then an LF
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
// Then the error string
con.write(&e).await?;
con.write_lowlevel(&e).await?;
// Then another LF
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
// And now we're done
return Ok(());
}
// Self's tsymbol is !
// The length of the response code is 1
// And we need a newline
con.write(&[b'!', b'1', b'\n']).await?;
con.write_lowlevel(&[b'!', b'1', b'\n']).await?;
// We need to get the u8 version of the response code
let code: u8 = code.into();
// We need the UTF8 equivalent of the response code
let code_bytes = code.to_string().into_bytes();
con.write(&code_bytes).await?;
con.write_lowlevel(&code_bytes).await?;
// Now append a newline
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
Ok(())
}
Box::pin(write_bytes(con, self))
@ -185,27 +220,27 @@ impl Writable for RespCodes {
impl Writable for GroupBegin {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
size: usize,
) -> Result<(), Box<dyn Error>> {
con.write(b"#2\n*1\n").await?;
con.write_lowlevel(b"#2\n*1\n").await?;
// First write a `#` which indicates that the next bytes give the
// prefix length
con.write(&[b'#']).await?;
con.write_lowlevel(&[b'#']).await?;
let group_len_as_bytes = size.to_string().into_bytes();
let group_prefix_len_as_bytes = (group_len_as_bytes.len() + 1).to_string().into_bytes();
// Now write Self's len as bytes
con.write(&group_prefix_len_as_bytes).await?;
con.write_lowlevel(&group_prefix_len_as_bytes).await?;
// Now write a LF and '&' which signifies the beginning of a datagroup
con.write(&[b'\n', b'&']).await?;
con.write_lowlevel(&[b'\n', b'&']).await?;
// Now write the number of items in the datagroup as bytes
con.write(&group_len_as_bytes).await?;
con.write_lowlevel(&group_len_as_bytes).await?;
// Now write a '\n' character
con.write(&[b'\n']).await?;
con.write_lowlevel(&[b'\n']).await?;
Ok(())
}
Box::pin(write_bytes(con, self.0))
@ -215,20 +250,20 @@ impl Writable for GroupBegin {
impl Writable for usize {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
con: &mut impl IsConnection,
val: usize,
) -> Result<(), Box<dyn Error>> {
con.write(b":").await?;
con.write_lowlevel(b":").await?;
let usize_bytes = val.to_string().into_bytes();
let usize_bytes_len = usize_bytes.len().to_string().into_bytes();
con.write(&usize_bytes_len).await?;
con.write(b"\n").await?;
con.write(&usize_bytes).await?;
con.write(b"\n").await?;
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?;
Ok(())
}
Box::pin(write_bytes(con, self))
@ -238,20 +273,17 @@ impl Writable for usize {
impl Writable for u64 {
fn write<'s>(
self,
con: &'s mut BufWriter<TcpStream>,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), Box<(dyn Error + 'static)>>> + Send + Sync + 's)>>
{
async fn write_bytes(
con: &mut BufWriter<TcpStream>,
val: u64,
) -> Result<(), Box<dyn Error>> {
con.write(b":").await?;
async fn write_bytes(con: &mut impl IsConnection, val: u64) -> Result<(), Box<dyn Error>> {
con.write_lowlevel(b":").await?;
let usize_bytes = val.to_string().into_bytes();
let usize_bytes_len = usize_bytes.len().to_string().into_bytes();
con.write(&usize_bytes_len).await?;
con.write(b"\n").await?;
con.write(&usize_bytes).await?;
con.write(b"\n").await?;
con.write_lowlevel(&usize_bytes_len).await?;
con.write_lowlevel(b"\n").await?;
con.write_lowlevel(&usize_bytes).await?;
con.write_lowlevel(b"\n").await?;
Ok(())
}
Box::pin(write_bytes(con, self))

@ -81,6 +81,7 @@ mod __private {
) where
T: AsRef<str>,
{
use tokio::io::AsyncWriteExt;
let mut query = String::from("MSET ");
query.push_str(values_split_with_whitespace.as_ref());
let count_bytes_len = homwany.to_string().as_bytes().len();

@ -20,7 +20,7 @@
#
name: TerrabaseDB Benchmark Tool
version: 0.5.0
version: 0.5.1
author: Sayan N. <ohsayan@outlook.com>
about: |
The TerrabaseDB benchmark tool can be used to benchmark TerrabaseDB installations.

@ -28,7 +28,7 @@ mod benchtool {
use devtimer::DevTime;
use libtdb::terrapipe;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use rand::thread_rng;
use serde::Serialize;
use std::io::prelude::*;
use std::net::{self, TcpStream};
@ -167,7 +167,7 @@ mod benchtool {
},
None => host.push_str("2003"),
}
let rand = thread_rng();
let mut rand = thread_rng();
if let Some(matches) = matches.subcommand_matches("testkey") {
let numkeys = matches.value_of("count").unwrap();
if let Ok(num) = numkeys.parse::<usize>() {
@ -175,27 +175,11 @@ mod benchtool {
println!("Generating keys ...");
let keys: Vec<String> = (0..num)
.into_iter()
.map(|_| {
let rand_string: String = unsafe {
String::from_utf8_unchecked(
rand.clone().sample_iter(&Alphanumeric).take(8).collect(),
)
.to_string()
};
rand_string
})
.map(|_| ran_string(8, &mut rand))
.collect();
let values: Vec<String> = (0..num)
.into_iter()
.map(|_| {
let rand_string: String = unsafe {
String::from_utf8_unchecked(
rand.clone().sample_iter(&Alphanumeric).take(8).collect(),
)
.to_string()
};
rand_string
})
.map(|_| ran_string(8, &mut rand))
.collect();
let set_packs: Vec<Vec<u8>> = (0..num)
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
@ -235,40 +219,18 @@ mod benchtool {
max_queries,
(packet_size * 2), // key size + value size
);
let rand = thread_rng();
let mut rand = thread_rng();
let mut dt = DevTime::new_complex();
// Create separate connection pools for get and set operations
let mut setpool = Netpool::new(max_connections, &host);
let mut getpool = Netpool::new(max_connections, &host);
let keys: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| {
let rand_string: String = unsafe {
String::from_utf8_unchecked(
rand.clone()
.sample_iter(&Alphanumeric)
.take(packet_size)
.collect(),
)
.to_string()
};
rand_string
})
.map(|_| ran_string(packet_size, &mut rand))
.collect();
let values: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| {
let rand_string: String = unsafe {
String::from_utf8_unchecked(
rand.clone()
.sample_iter(&Alphanumeric)
.take(packet_size)
.collect(),
)
.to_string()
};
rand_string
})
.map(|_| ran_string(packet_size, &mut rand))
.collect();
/*
We create three vectors of vectors: `set_packs`, `get_packs` and `del_packs`
@ -333,6 +295,15 @@ mod benchtool {
fn calc(reqs: usize, time: u128) -> f64 {
reqs as f64 / (time as f64 / 1_000_000_000 as f64)
}
fn ran_string(len: usize, rand: impl rand::Rng) -> String {
let rand_string: String = rand
.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect();
rand_string
}
}
fn main() {

@ -1,6 +1,6 @@
[package]
name = "tdb_macros"
version = "0.2.2"
version = "0.5.0"
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"

Loading…
Cancel
Save