diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..73f69e09 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/Cargo.lock b/Cargo.lock index 246510eb..2028720f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,9 +2,9 @@ # It is not intended for manual editing. [[package]] name = "aho-corasick" -version = "0.7.13" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" dependencies = [ "memchr", ] @@ -18,12 +18,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "arc-swap" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" - [[package]] name = "atty" version = "0.2.14" @@ -59,9 +53,9 @@ checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "byteorder" -version = "1.3.4" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +checksum = "ae44d1a3d5a19df61dd0c8beb138458ac2a53a7ac09eba97d55592540004306b" [[package]] name = "bytes" @@ -71,9 +65,9 @@ checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" [[package]] name = "cc" -version = "1.0.59" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381" +checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" [[package]] name = "cfg-if" @@ -81,6 +75,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.19" @@ -110,15 +110,6 @@ dependencies = [ "yaml-rust", ] -[[package]] -name = "cloudabi" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" -dependencies = [ - "bitflags", -] - [[package]] name = "devtimer" version = "4.0.1" @@ -138,49 +129,162 @@ dependencies = [ "termcolor", ] +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "fs_extra" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" +[[package]] +name = "futures" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" + +[[package]] +name = "futures-executor" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" + +[[package]] +name = "futures-macro" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6" + +[[package]] +name = "futures-task" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "getrandom" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4" +checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] name = "hermit-abi" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9" +checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" dependencies = [ "libc", ] [[package]] name = "humantime" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "instant" -version = "0.1.6" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if 1.0.0", +] [[package]] name = "itoa" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" [[package]] name = "jemalloc-sys" @@ -211,9 +315,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.72" +version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701" +checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" [[package]] name = "libtdb" @@ -226,9 +330,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" dependencies = [ "scopeguard", ] @@ -239,20 +343,20 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] name = "memchr" -version = "2.3.3" +version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" [[package]] name = "mio" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b" +checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7" dependencies = [ "libc", "log", @@ -282,9 +386,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ "autocfg", "num-traits", @@ -292,9 +396,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.12" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ "autocfg", ] @@ -315,6 +419,43 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" +[[package]] +name = "openssl" +version = "0.10.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types", + "lazy_static", + "libc", + "openssl-sys", +] + +[[package]] +name = "openssl-src" +version = "111.13.0+1.1.1i" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045e4dc48af57aad93d665885789b43222ae26f4886494da12d1ed58d309dcb6" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6" +dependencies = [ + "autocfg", + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -328,12 +469,11 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" +checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" dependencies = [ - "cfg-if", - "cloudabi", + "cfg-if 1.0.0", "instant", "libc", "redox_syscall", @@ -341,17 +481,61 @@ dependencies = [ "winapi", ] +[[package]] +name = "pin-project" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" -version = "0.2.0" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" [[package]] name = "ppv-lite86" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" + +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" [[package]] name = "proc-macro2" @@ -395,9 +579,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8b34ba8cfb21243bd8df91854c830ff0d785fff2e82ebd4434c2644cb9ada18" +checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" dependencies = [ "getrandom", ] @@ -480,29 +664,33 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" dependencies = [ - "arc-swap", "libc", ] +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "smallvec" -version = "1.4.1" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" [[package]] name = "socket2" -version = "0.3.16" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd8b795c389288baa5f355489c65e71fd48a02104600d15c4cfbc561e9e429d" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", - "redox_syscall", "winapi", ] @@ -532,16 +720,20 @@ dependencies = [ "chrono", "clap", "env_logger", + "futures", "jemallocator", "lazy_static", "libtdb", "log", + "openssl", + "openssl-sys", "parking_lot", "regex", "serde", "serde_derive", "tdb_macros", "tokio", + "tokio-openssl", "toml", ] @@ -559,7 +751,7 @@ dependencies = [ [[package]] name = "tdb_macros" -version = "0.2.2" +version = "0.5.0" dependencies = [ "proc-macro2", "quote", @@ -587,9 +779,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +checksum = "bb9bc092d0d51e76b2b19d9d85534ffc9ec2db959a2523cdae0697e2972cd447" dependencies = [ "lazy_static", ] @@ -601,7 +793,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", - "wasi 0.10.0+wasi-snapshot-preview1", + "wasi", "winapi", ] @@ -636,6 +828,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-openssl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac1bec5c0a4aa71e3459802c7a12e8912c2091ce2151004f9ce95cc5d1c6124e" +dependencies = [ + "futures", + "openssl", + "pin-project", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -653,8 +857,10 @@ dependencies = [ "clap", "lazy_static", "libtdb", + "openssl", "regex", "tokio", + "tokio-openssl", ] [[package]] @@ -670,16 +876,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" [[package]] -name = "vec_map" -version = "0.8.2" +name = "vcpkg" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb" [[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" +name = "vec_map" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "wasi" diff --git a/README.md b/README.md index e1a780ea..71b2977c 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ TerrabaseDB (or TDB for short) is an effort to provide the best of key/value sto ## Getting started 🚀 -1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabase/releases) +1. Download a bundle for your platform from [here ⬇️ ](https://github.com/terrabasedb/terrabasedb/releases) 2. Unzip the bundle 3. Make the files executable (run `chmod +x tdb tsh` on *nix systems) 4. First run `tdb` to start the database server and then run `tsh` to start the interactive shell diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 395a2bd4..f29a1bf7 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -12,4 +12,6 @@ tokio = {version = "1.0.2", features = ["full"]} bytes = "1.0.1" regex = "1.4.3" lazy_static = "1.4.0" -clap = {version = "2.33.3", features=["yaml"]} \ No newline at end of file +clap = {version = "2.33.3", features=["yaml"]} +openssl = { version = "0.10.32", features = ["vendored"] } +tokio-openssl = "0.6.1" \ No newline at end of file diff --git a/cli/src/argparse.rs b/cli/src/argparse.rs index ca547e19..c5513a1b 100644 --- a/cli/src/argparse.rs +++ b/cli/src/argparse.rs @@ -23,9 +23,10 @@ use crate::protocol; use clap::load_yaml; use clap::App; use libtdb::terrapipe::ADDR; +use protocol::{Con, Connection, SslConnection}; use std::io::{self, prelude::*}; use std::process; -const MSG_WELCOME: &'static str = "TerrabaseDB v0.5.0"; +const MSG_WELCOME: &'static str = "TerrabaseDB v0.5.1"; /// This creates a REPL on the command line and also parses command-line arguments /// @@ -50,23 +51,33 @@ pub async fn start_repl() { }, None => host.push_str("2003"), } + let ssl = matches.value_of("cert"); + let mut con = if let Some(sslcert) = ssl { + let con = match SslConnection::new(&host, sslcert).await { + Ok(c) => c, + Err(e) => { + eprintln!("ERROR: {}", e); + process::exit(0x100); + } + }; + Con::Secure(con) + } else { + let con = match Connection::new(&host).await { + Ok(c) => c, + Err(e) => { + eprintln!("ERROR: {}", e); + process::exit(0x100); + } + }; + Con::Insecure(con) + }; if let Some(eval_expr) = matches.value_of("eval") { if eval_expr.len() == 0 { return; } - if let Err(e) = protocol::Connection::oneshot(&host, eval_expr.to_string()).await { - eprintln!("ERROR: {}", e); - process::exit(0x100); - } + con.execute_query(eval_expr.to_string()).await; return; } - let mut con = match protocol::Connection::new(&host).await { - Ok(c) => c, - Err(e) => { - eprintln!("ERROR: {}", e); - process::exit(0x100); - } - }; println!("{}", MSG_WELCOME); loop { print!("tsh>"); @@ -85,6 +96,6 @@ pub async fn start_repl() { // The query was empty, so let it be continue; } - con.run_query(rl).await; + con.execute_query(rl).await; } } diff --git a/cli/src/cli.yml b/cli/src/cli.yml index 0745b293..a666c600 100644 --- a/cli/src/cli.yml +++ b/cli/src/cli.yml @@ -20,28 +20,35 @@ # name: TerrabaseDB Shell -version: 0.5.0 +version: 0.5.1 author: Sayan N. about: The TerrabaseDB Shell (tsh) args: - - host: - short: h - required: false - long: host - value_name: host - help: Sets the remote host to connect to - takes_value: true - - port: - short: p - required: false - long: port - value_name: port - help: Sets the remote port to connect to - takes_value: true - - eval: - short: e - required: false - long: eval - value_name: expression - help: Run an expression without REPL - takes_value: true + - host: + short: h + required: false + long: host + value_name: host + help: Sets the remote host to connect to + takes_value: true + - port: + short: p + required: false + long: port + value_name: port + help: Sets the remote port to connect to + takes_value: true + - eval: + short: e + required: false + long: eval + value_name: expression + help: Run an expression without REPL + takes_value: true + - cert: + short: C + required: false + long: sslcert + value_name: cert + help: Sets the PEM certificate to use for SSL connections + takes_value: true diff --git a/cli/src/protocol/mod.rs b/cli/src/protocol/mod.rs index dde8edb8..5675515e 100644 --- a/cli/src/protocol/mod.rs +++ b/cli/src/protocol/mod.rs @@ -26,14 +26,35 @@ use lazy_static::lazy_static; use libtdb::terrapipe; use libtdb::TResult; use libtdb::BUF_CAP; +use openssl::ssl::Ssl; +use openssl::ssl::SslContext; +use openssl::ssl::SslMethod; use regex::Regex; +use std::io::Result as IoResult; +use std::net::SocketAddr; +use std::pin::Pin; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio_openssl::SslStream; lazy_static! { static ref RE: Regex = Regex::new("[^\\s\"']+|\"[^\"]*\"|'[^']*'").unwrap(); } +pub enum Con { + Secure(SslConnection), + Insecure(Connection), +} + +impl Con { + pub async fn execute_query(&mut self, query: String) { + match self { + Con::Insecure(con) => con.run_query(query).await, + Con::Secure(con) => con.run_query(query).await, + } + } +} + /// A `Connection` is a wrapper around a`TcpStream` and a read buffer pub struct Connection { stream: TcpStream, @@ -44,21 +65,12 @@ impl Connection { /// Create a new connection, creating a connection to `host` pub async fn new(host: &str) -> TResult { let stream = TcpStream::connect(host).await?; - println!("Connected to {}", host); + println!("Connected to tp://{}", host); Ok(Connection { stream, buffer: BytesMut::with_capacity(BUF_CAP), }) } - pub async fn oneshot(host: &str, query: String) -> TResult<()> { - let mut con = Connection { - stream: TcpStream::connect(host).await?, - buffer: BytesMut::with_capacity(BUF_CAP), - }; - con.run_query(query).await; - drop(con); - Ok(()) - } /// This function will write a query to the stream and read the response from the /// server. It will then determine if the returned response is complete or incomplete /// or invalid. @@ -121,3 +133,109 @@ impl Connection { deserializer::parse(&self.buffer) } } + +/// An `SslConnection` is a wrapper around a `SslStream` provided by OpenSSL and a +/// read buffer +pub struct SslConnection { + stream: SslStream, + buffer: BytesMut, +} + +impl SslConnection { + /// Create a new connection, creating a connection to `host` + pub async fn new(host: &str, sslcert: &str) -> TResult { + let mut ctx = SslContext::builder(SslMethod::tls_client())?; + ctx.set_ca_file(sslcert)?; + let ssl = Ssl::new(&ctx.build())?; + let stream = TcpStream::connect(host).await?; + let mut stream = SslStream::new(ssl, stream)?; + Pin::new(&mut stream).connect().await.unwrap(); + println!("Connected to tps://{}", host); + Ok(SslConnection { + stream, + buffer: BytesMut::with_capacity(BUF_CAP), + }) + } + /// This function will write a query to the stream and read the response from the + /// server. It will then determine if the returned response is complete or incomplete + /// or invalid. + /// + /// - If it is complete, then the return is parsed into a `Display`able form + /// and written to the output stream. If any parsing errors occur, they're also handled + /// by this function (usually, "Invalid Response" is written to the terminal). + /// - If the packet is incomplete, it will wait to read the entire response from the stream + /// - If the packet is corrupted, it will output "Invalid Response" + pub async fn run_query(&mut self, query: String) { + let query = terrapipe::proc_query(query); + match self.stream.write_all(&query).await { + Ok(_) => (), + Err(e) => { + eprintln!("ERROR: Couldn't write data to socket with '{}'", e); + return; + } + }; + loop { + if let Err(e) = self.read_again().await { + eprintln!("ERROR: Reading from stream failed with: '{}'", e); + return; + } + match self.try_response().await { + ClientResult::Empty(f) => { + self.buffer.advance(f); + eprintln!("ERROR: The remote end reset the connection"); + return; + } + ClientResult::Incomplete => { + continue; + } + ClientResult::Response(r, f) => { + self.buffer.advance(f); + if r.len() == 0 { + return; + } + for group in r { + println!("{}", group); + } + return; + } + ClientResult::InvalidResponse(r) => { + self.buffer.advance(r); + eprintln!("{}", ClientResult::InvalidResponse(0)); + return; + } + } + } + } + /// This function is a subroutine of `run_query` used to parse the response packet + async fn try_response(&mut self) -> ClientResult { + if self.buffer.is_empty() { + // The connection was possibly reset + return ClientResult::Empty(0); + } + deserializer::parse(&self.buffer) + } + async fn read_again(&mut self) -> Result<(), String> { + match self.stream.read_buf(&mut self.buffer).await { + Ok(0) => { + if self.buffer.is_empty() { + return Ok(()); + } else { + return Err(format!( + "Connection reset while reading from {}", + if let Ok(p) = self.get_peer() { + p.to_string() + } else { + "peer".to_owned() + } + ) + .into()); + } + } + Ok(_) => Ok(()), + Err(e) => return Err(format!("{}", e)), + } + } + fn get_peer(&self) -> IoResult { + self.stream.get_ref().peer_addr() + } +} diff --git a/examples/config-files/ssl.toml b/examples/config-files/ssl.toml new file mode 100644 index 00000000..c4e951e3 --- /dev/null +++ b/examples/config-files/ssl.toml @@ -0,0 +1,17 @@ +[server] +host = "127.0.0.1" +port = 2003 +noart = false + +[ssl] +key = "/path/to/keyfile.pem" +chain = "/path/to/chain.pem" +port = 2004 + +[bgsave] +enabled = true +every = 120 + +[snapshot] +every = 3600 +atmost = 4 \ No newline at end of file diff --git a/examples/config-files/template.toml b/examples/config-files/template.toml index 697aa9db..bee655f1 100644 --- a/examples/config-files/template.toml +++ b/examples/config-files/template.toml @@ -12,7 +12,7 @@ port = 2003 # The port to which you want TDB to bind to # Set `noart` to true if you want to disable terminal artwork noart = false -# This key is *OPTIONAL*, but will be required post 0.5.0 +# This key is *OPTIONAL*, but will be required post 0.5.1 [bgsave] # Run `BGSAVE` `every` seconds. For example, setting this to 60 will cause BGSAVE to run # after every 2 minutes diff --git a/server/Cargo.toml b/server/Cargo.toml index 8924cf09..95e41ff4 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,7 +13,8 @@ libtdb = {path ="../libtdb"} bincode = "1.3.1" parking_lot = "0.11.1" lazy_static = "1.4.0" -serde_derive = "1.0.118" +serde_derive = "1.0.119" +futures = "0.3.12" serde = {version = "1.0.119", features= ["derive"]} toml = "0.5.8" clap = {version = "2.33.3", features=["yaml"]} @@ -22,7 +23,9 @@ log = "0.4.13" chrono = "0.4.19" regex = "1.4.3" tdb_macros = {path="../tdb-macros"} - +tokio-openssl = "0.6.1" +openssl = { version = "0.10", features = ["vendored"] } +openssl-sys = "0.9.60" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.3.2" diff --git a/server/src/admin/mksnap.rs b/server/src/admin/mksnap.rs index 07a8b55d..f5dce926 100644 --- a/server/src/admin/mksnap.rs +++ b/server/src/admin/mksnap.rs @@ -20,10 +20,11 @@ */ use crate::coredb::CoreDB; +use crate::dbnet::Con; use crate::diskstore; use crate::diskstore::snapshot::SnapshotEngine; use crate::diskstore::snapshot::DIR_SNAPSHOT; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::terrapipe::RespCodes; use libtdb::TResult; @@ -32,7 +33,7 @@ use std::path::PathBuf; /// Create a snapshot /// -pub async fn mksnap(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn mksnap(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany == 0 { if !handle.is_snapshot_enabled() { diff --git a/server/src/cli.yml b/server/src/cli.yml index 04ba4cdf..01940e98 100644 --- a/server/src/cli.yml +++ b/server/src/cli.yml @@ -1,83 +1,83 @@ -# -# Created on Tue Sep 01 2020 -# -# This file is a part of TerrabaseDB -# Copyright (c) 2020, Sayan Nandan -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -# - name: TerrabaseDB Server -version: 0.5.0 +version: 0.5.1 author: Sayan N. about: The TerrabaseDB Database server args: - - config: - short: c - required: false - long: withconfig - value_name: cfgfile - help: Sets a configuration file to start tdb - takes_value: true - - restore: - short: r - required: false - long: restore - value_name: snapshotfile - help: Restores data from a previous snapshot - takes_value: true - - host: - short: h - required: false - long: host - value_name: host - help: Sets the host to which the server will bind - takes_value: true - - port: - short: p - required: false - long: port - value_name: port - help: Sets the port to which the server will bind - takes_value: true - - noart: - required: false - long: noart - help: Disables terminal artwork - takes_value: false - - nosave: - required: false - long: nosave - help: Disables automated background saving - takes_value: false - - saveduration: - required: false - long: saveduration - value_name: duration - short: S - takes_value: true - help: Set the BGSAVE duration - - snapevery: - required: false - long: snapevery - value_name: duration - help: Set the periodic snapshot duration - takes_value: true - - snapkeep: - required: false - long: snapkeep - value_name: count - help: Sets the number of most recent snapshots to keep - takes_value: true + - config: + short: c + required: false + long: withconfig + value_name: cfgfile + help: Sets a configuration file to start tdb + takes_value: true + - restore: + short: r + required: false + long: restore + value_name: snapshotfile + help: Restores data from a previous snapshot + takes_value: true + - host: + short: h + required: false + long: host + value_name: host + help: Sets the host to which the server will bind + takes_value: true + - port: + short: p + required: false + long: port + value_name: port + help: Sets the port to which the server will bind + takes_value: true + - noart: + required: false + long: noart + help: Disables terminal artwork + takes_value: false + - nosave: + required: false + long: nosave + help: Disables automated background saving + takes_value: false + - saveduration: + required: false + long: saveduration + value_name: duration + short: S + takes_value: true + help: Set the BGSAVE duration + - snapevery: + required: false + long: snapevery + value_name: duration + help: Set the periodic snapshot duration + takes_value: true + - snapkeep: + required: false + long: snapkeep + value_name: count + help: Sets the number of most recent snapshots to keep + takes_value: true + - sslkey: + required: false + long: sslkey + short: k + value_name: key + help: Sets the PEM key file to use for SSL/TLS + takes_value: true + - sslchain: + required: false + long: sslchain + short: z + value_name: chain + help: Sets the PEM chain file to use for SSL/TLS + takes_value: true + - sslonly: + required: false + long: sslonly + takes_value: false + help: >- + Tells the server to only accept SSL connections and disables the non-SSL + port diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index aa1a5809..09bb5e1a 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -34,13 +34,17 @@ use std::path::PathBuf; use tokio::net::ToSocketAddrs; use toml; +const DEFAULT_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +const DEFAULT_PORT: u16 = 2003; +const DEFAULT_SSL_PORT: u16 = 2004; + /// This struct is an _object representation_ used for parsing the TOML file #[derive(Deserialize, Debug, PartialEq)] pub struct Config { /// The `server` key server: ConfigKeyServer, /// The `bgsave` key - /* TODO(@ohsayan): As of now, we will keep this optional, but post 0.5.0, + /* TODO(@ohsayan): As of now, we will keep this optional, but post 0.5.1, * we will make it compulsory (so that we don't break semver) * See the link below for more details: * https://github.com/terrabasedb/terrabasedb/issues/21#issuecomment-693217709 @@ -48,6 +52,8 @@ pub struct Config { bgsave: Option, /// The snapshot key snapshot: Option, + /// SSL configuration + ssl: Option, } /// The BGSAVE section in the config file @@ -124,6 +130,75 @@ pub struct ConfigKeySnapshot { atmost: usize, } +/// Port configuration +/// +/// This enumeration determines whether the ports are: +/// - `Multi`: This means that the database server will be listening to both +/// SSL **and** non-SSL requests +/// - `SecureOnly` : This means that the database server will only accept SSL requests +/// and will not even activate the non-SSL socket +/// - `InsecureOnly` : This indicates that the server would only accept non-SSL connections +/// and will not even activate the SSL socket +#[derive(Debug, PartialEq)] +pub enum PortConfig { + SecureOnly { + host: IpAddr, + ssl: SslOpts, + }, + Multi { + host: IpAddr, + port: u16, + ssl: SslOpts, + }, + InsecureOnly { + host: IpAddr, + port: u16, + }, +} + +impl PortConfig { + #[cfg(test)] + pub const fn default() -> PortConfig { + PortConfig::InsecureOnly { + host: DEFAULT_IPV4, + port: DEFAULT_PORT, + } + } +} + +impl PortConfig { + pub const fn new_secure_only(host: IpAddr, ssl: SslOpts) -> Self { + PortConfig::SecureOnly { host, ssl } + } + pub const fn new_insecure_only(host: IpAddr, port: u16) -> Self { + PortConfig::InsecureOnly { host, port } + } + pub const fn new_multi(host: IpAddr, port: u16, ssl: SslOpts) -> Self { + PortConfig::Multi { host, port, ssl } + } +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct KeySslOpts { + key: String, + chain: String, + port: u16, + only: Option, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct SslOpts { + pub key: String, + pub chain: String, + pub port: u16, +} + +impl SslOpts { + pub const fn new(key: String, chain: String, port: u16) -> Self { + SslOpts { key, chain, port } + } +} + #[derive(Debug, PartialEq)] /// The snapshot configuration /// @@ -180,16 +255,14 @@ impl SnapshotConfig { /// configuration #[derive(Debug, PartialEq)] pub struct ParsedConfig { - /// A valid IPv4/IPv6 address - host: IpAddr, - /// A valid port - port: u16, /// If `noart` is set to true, no terminal artwork should be displayed noart: bool, /// The BGSAVE configuration pub bgsave: BGSave, /// The snapshot configuration pub snapshot: SnapshotConfig, + /// Port configuration + pub ports: PortConfig, } impl ParsedConfig { @@ -206,16 +279,14 @@ impl ParsedConfig { } /// Create a `ParsedConfig` instance from a `Config` object, which is a parsed /// TOML file (represented as an object) - const fn from_config(cfg: Config) -> Self { + fn from_config(cfg_info: Config) -> Self { ParsedConfig { - host: cfg.server.host, - port: cfg.server.port, - noart: if let Some(noart) = cfg.server.noart { + noart: if let Some(noart) = cfg_info.server.noart { noart } else { false }, - bgsave: if let Some(bgsave) = cfg.bgsave { + bgsave: if let Some(bgsave) = cfg_info.bgsave { match (bgsave.enabled, bgsave.every) { // TODO: Show a warning that there are unused keys (Some(enabled), Some(every)) => BGSave::new(enabled, every), @@ -226,11 +297,39 @@ impl ParsedConfig { } else { BGSave::default() }, - snapshot: if let Some(snapshot) = cfg.snapshot { + snapshot: if let Some(snapshot) = cfg_info.snapshot { SnapshotConfig::Enabled(SnapshotPref::new(snapshot.every, snapshot.atmost)) } else { SnapshotConfig::default() }, + ports: if let Some(sslopts) = cfg_info.ssl { + if sslopts.only.is_some() { + PortConfig::SecureOnly { + ssl: SslOpts { + key: sslopts.key, + chain: sslopts.chain, + port: sslopts.port, + }, + + host: cfg_info.server.host, + } + } else { + PortConfig::Multi { + ssl: SslOpts { + key: sslopts.key, + chain: sslopts.chain, + port: sslopts.port, + }, + host: cfg_info.server.host, + port: cfg_info.server.port, + } + } + } else { + PortConfig::InsecureOnly { + host: cfg_info.server.host, + port: cfg_info.server.port, + } + }, } } #[cfg(test)] @@ -242,38 +341,38 @@ impl ParsedConfig { /// and a supplied `port` pub const fn default_with_port(port: u16) -> Self { ParsedConfig { - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port, noart: false, bgsave: BGSave::default(), snapshot: SnapshotConfig::default(), + ports: PortConfig::new_insecure_only(DEFAULT_IPV4, port), } } + #[cfg(test)] + pub const fn default_ports() -> PortConfig { + PortConfig::default() + } /// Create a new `ParsedConfig` with the default `port` and `noart` settngs /// and a supplied `host` pub const fn default_with_host(host: IpAddr) -> Self { ParsedConfig::new( - host, - 2003, false, BGSave::default(), SnapshotConfig::default(), + PortConfig::new_insecure_only(host, 2003), ) } /// Create a new `ParsedConfig` with all the fields pub const fn new( - host: IpAddr, - port: u16, noart: bool, bgsave: BGSave, snapshot: SnapshotConfig, + ports: PortConfig, ) -> Self { ParsedConfig { - host, - port, noart, bgsave, snapshot, + ports, } } /// Create a default `ParsedConfig` with the following setup defaults: @@ -282,19 +381,15 @@ impl ParsedConfig { /// - `noart` : false /// - `bgsave_enabled` : true /// - `bgsave_duration` : 120 + /// - `ssl` : disabled pub const fn default() -> Self { ParsedConfig { - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 2003, noart: false, bgsave: BGSave::default(), snapshot: SnapshotConfig::default(), + ports: PortConfig::new_insecure_only(DEFAULT_IPV4, 2003), } } - /// Return a (host, port) tuple which can be bound to with `TcpListener` - pub fn get_host_port_tuple(&self) -> impl ToSocketAddrs { - ((self.host), self.port) - } /// Returns `false` if `noart` is enabled. Otherwise it returns `true` pub const fn is_artful(&self) -> bool { !self.noart @@ -401,21 +496,29 @@ pub fn get_config_file_or_return_cfg() -> Result Result SnapshotConfig::Disabled, }; - let cfg = ParsedConfig::new(host, port, noart, bgsave, snapcfg); + let portcfg = match ( + sslkey.map(|val| val.to_owned()), + sslchain.map(|val| val.to_owned()), + ) { + (None, None) => { + if sslonly { + return Err(ConfigError::CliArgErr( + "You mast pass values for both --sslkey and --sslchain to use the --sslonly flag" + )); + } else { + PortConfig::new_insecure_only(host, port) + } + } + (Some(key), Some(chain)) => { + if sslonly { + PortConfig::new_secure_only(host, SslOpts::new(key, chain, DEFAULT_SSL_PORT)) + } else { + PortConfig::new_multi(host, port, SslOpts::new(key, chain, DEFAULT_SSL_PORT)) + } + } + _ => { + return Err(ConfigError::CliArgErr( + "To use SSL, pass values for both --sslkey and --sslchain", + )); + } + }; + let cfg = ParsedConfig::new(noart, bgsave, snapcfg, portcfg); return Ok(ConfigType::Custom(cfg, restorefile)); } if let Some(filename) = filename { @@ -558,11 +687,10 @@ fn test_config_file_noart() { assert_eq!( cfg, ParsedConfig { - port: 2003, - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), noart: true, bgsave: BGSave::default(), snapshot: SnapshotConfig::default(), + ports: PortConfig::default() } ); } @@ -575,11 +703,13 @@ fn test_config_file_ipv6() { assert_eq!( cfg, ParsedConfig { - port: 2003, - host: IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)), noart: false, bgsave: BGSave::default(), snapshot: SnapshotConfig::default(), + ports: PortConfig::new_insecure_only( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1)), + DEFAULT_PORT + ) } ); } @@ -592,11 +722,10 @@ fn test_config_file_template() { assert_eq!( cfg, ParsedConfig::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 2003, false, BGSave::default(), - SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)) + SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)), + PortConfig::default() // TODO: Update the template ) ); } @@ -617,11 +746,10 @@ fn test_config_file_custom_bgsave() { assert_eq!( cfg, ParsedConfig { - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 2003, noart: false, bgsave: BGSave::new(true, 600), - snapshot: SnapshotConfig::default() + snapshot: SnapshotConfig::default(), + ports: PortConfig::default() } ); } @@ -637,11 +765,10 @@ fn test_config_file_bgsave_enabled_only() { assert_eq!( cfg, ParsedConfig { - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 2003, noart: false, bgsave: BGSave::default(), snapshot: SnapshotConfig::default(), + ports: PortConfig::default() } ) } @@ -657,11 +784,10 @@ fn test_config_file_bgsave_every_only() { assert_eq!( cfg, ParsedConfig { - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 2003, noart: false, bgsave: BGSave::new(true, 600), - snapshot: SnapshotConfig::default() + snapshot: SnapshotConfig::default(), + ports: PortConfig::default() } ) } @@ -675,9 +801,8 @@ fn test_config_file_snapshot() { ParsedConfig { snapshot: SnapshotConfig::Enabled(SnapshotPref::new(3600, 4)), bgsave: BGSave::default(), - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 2003, noart: false, + ports: PortConfig::default() } ); } diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index 42b50e95..3d4d5068 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -24,8 +24,8 @@ use crate::config::BGSave; use crate::config::SnapshotConfig; use crate::config::SnapshotPref; +use crate::dbnet::Con; use crate::diskstore; -use crate::protocol::Connection; use crate::protocol::Query; use crate::queryengine; use bytes::Bytes; @@ -245,14 +245,16 @@ impl CoreDB { } /// Execute a query that has already been validated by `Connection::read_query` - pub async fn execute_query(&self, query: Query, con: &mut Connection) -> TResult<()> { + pub async fn execute_query(&self, query: Query, mut con: &mut Con<'_>) -> TResult<()> { match query { - Query::Simple(q) => queryengine::execute_simple(&self, con, q).await?, + Query::Simple(q) => { + queryengine::execute_simple(&self, &mut con, q).await?; + // Once we're done executing, flush the stream + con.flush_stream().await + } // TODO(@ohsayan): Pipeline commands haven't been implemented yet Query::Pipelined(_) => unimplemented!(), } - // Once we're done executing, flush the stream - con.flush_stream().await } /// Create a new `CoreDB` instance diff --git a/server/src/dbnet.rs b/server/src/dbnet.rs index 507a808f..6829c5b5 100644 --- a/server/src/dbnet.rs +++ b/server/src/dbnet.rs @@ -35,15 +35,21 @@ //! use crate::config::BGSave; +use crate::config::PortConfig; use crate::config::SnapshotConfig; +use crate::config::SslOpts; use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT; +use crate::protocol::tls::SslConnection; +use crate::protocol::tls::SslListener; use crate::protocol::{Connection, QueryResult::*}; +use crate::resp::Writable; use crate::CoreDB; use libtdb::util::terminal; use libtdb::TResult; use std::fs; use std::future::Future; use std::io::ErrorKind; +use std::net::IpAddr; use std::path::PathBuf; use std::process; use std::sync::Arc; @@ -52,7 +58,6 @@ use tokio::net::TcpStream; use tokio::sync::Semaphore; use tokio::sync::{broadcast, mpsc}; use tokio::time::{self, Duration}; - /// Responsible for gracefully shutting down the server instead of dying randomly // Sounds very sci-fi ;) pub struct Terminator { @@ -104,7 +109,7 @@ pub struct Listener { } /// A per-connection handler -struct CHandler { +pub struct CHandler { db: CoreDB, con: Connection, climit: Arc, @@ -150,16 +155,59 @@ impl Listener { }; tokio::spawn(async move { if let Err(e) = chandle.run().await { - eprintln!("Error: {}", e); + log::error!("Error: {}", e); } }); } } } +/// # Connection Wrapper +/// +/// A `Con` object holds a mutable reference to a standard TCP stream or to +/// an encrypted connection (over the `SslListener` object). It provides a few +/// methods which are provided by the underlying interface. +pub enum Con<'a> { + /// A secure TLS connection + Secure(&'a mut SslConnection), + /// An insecure ('standard') TCP connection + Insecure(&'a mut Connection), +} + +impl<'a> Con<'a> { + /// Create a new **unencrypted** connection instance + pub fn init<'b>(con: &'b mut Connection) -> Self + where + 'b: 'a, + { + Con::Insecure(con) + } + /// Create a new **encrypted** connection instance + pub fn init_secure<'b>(con: &'b mut SslConnection) -> Self + where + 'b: 'a, + { + Con::Secure(con) + } + /// Flush the stream that is held by the underlying connection + pub async fn flush_stream(&mut self) -> TResult<()> { + match self { + Con::Secure(con) => con.flush_stream().await, + Con::Insecure(con) => con.flush_stream().await, + } + } + /// Write bytes to the underlying stream that implement the `Writable` trait + pub async fn write_response(&mut self, resp: impl Writable) -> TResult<()> { + match self { + Con::Insecure(con) => con.write_response(resp).await, + Con::Secure(con) => con.write_response(resp).await, + } + } +} + impl CHandler { /// Process the incoming connection - async fn run(&mut self) -> TResult<()> { + pub async fn run(&mut self) -> TResult<()> { while !self.terminator.is_termination_signal() { let try_df = tokio::select! { tdf = self.con.read_query() => tdf, @@ -168,7 +216,11 @@ impl CHandler { } }; match try_df { - Ok(Q(s)) => self.db.execute_query(s, &mut self.con).await?, + Ok(Q(s)) => { + self.db + .execute_query(s, &mut Con::init(&mut self.con)) + .await? + } Ok(E(r)) => self.con.close_conn_with_error(r).await?, Ok(Empty) => return Ok(()), Err(e) => return Err(e.into()), @@ -187,9 +239,217 @@ impl Drop for CHandler { } use std::io::{self, prelude::*}; +/// Multiple Listener Interface +/// +/// A `MultiListener` is an abstraction over an `SslListener` or a `Listener` to facilitate +/// easier asynchronous listening on multiple ports. +/// +/// - The `SecureOnly` variant holds an `SslListener` +/// - The `InsecureOnly` variant holds a `Listener` +/// - The `Multi` variant holds both an `SslListener` and a `Listener` +/// This variant enables listening to both secure and insecure sockets at the same time +/// asynchronously +enum MultiListener { + SecureOnly(SslListener), + InsecureOnly(Listener), + Multi(Listener, SslListener), +} + +impl MultiListener { + /// Create a new `InsecureOnly` listener + pub async fn new_insecure_only( + host: IpAddr, + port: u16, + climit: Arc, + db: CoreDB, + signal: broadcast::Sender<()>, + terminate_tx: mpsc::Sender<()>, + terminate_rx: mpsc::Receiver<()>, + ) -> Self { + let listener = TcpListener::bind((host, port)) + .await + .expect("Failed to bind to port"); + MultiListener::InsecureOnly(Listener { + listener, + db, + climit, + signal, + terminate_tx, + terminate_rx, + }) + } + /// Create a new `SecureOnly` listener + pub async fn new_secure_only( + host: IpAddr, + climit: Arc, + db: CoreDB, + signal: broadcast::Sender<()>, + terminate_tx: mpsc::Sender<()>, + terminate_rx: mpsc::Receiver<()>, + ssl: SslOpts, + ) -> Self { + let listener = TcpListener::bind((host, ssl.port)) + .await + .expect("Failed to bind to port"); + MultiListener::SecureOnly( + SslListener::new_pem_based_ssl_connection( + ssl.key, + ssl.chain, + db, + listener, + climit, + signal, + terminate_tx, + terminate_rx, + ) + .expect("Couldn't bind to secure port"), + ) + } + /// Create a new `Multi` listener that has both a secure and an insecure listener + pub async fn new_multi( + host: IpAddr, + port: u16, + climit: Arc, + db: CoreDB, + signal: broadcast::Sender<()>, + terminate_tx: mpsc::Sender<()>, + terminate_rx: mpsc::Receiver<()>, + ssl_terminate_tx: mpsc::Sender<()>, + ssl_terminate_rx: mpsc::Receiver<()>, + ssl: SslOpts, + ) -> Self { + let listener = TcpListener::bind((host, ssl.port)) + .await + .expect("Failed to bind to port"); + let secure_listener = SslListener::new_pem_based_ssl_connection( + ssl.key, + ssl.chain, + db.clone(), + listener, + climit.clone(), + signal.clone(), + ssl_terminate_tx, + ssl_terminate_rx, + ) + .expect("Couldn't bind to secure port"); + let listener = TcpListener::bind((host, port)) + .await + .expect("Failed to bind to port"); + let insecure_listener = Listener { + listener, + db, + climit, + signal, + terminate_tx, + terminate_rx, + }; + MultiListener::Multi(insecure_listener, secure_listener) + } + /// Start the server + /// + /// The running of single and/or parallel listeners is handled by this function by + /// exploiting the working of async functions + pub async fn run_server(&mut self) -> TResult<()> { + match self { + MultiListener::SecureOnly(secure_listener) => secure_listener.run().await, + MultiListener::InsecureOnly(insecure_listener) => insecure_listener.run().await, + MultiListener::Multi(insecure_listener, secure_listener) => { + let insec = insecure_listener.run(); + let sec = secure_listener.run(); + let (e1, e2) = futures::join!(insec, sec); + if let Err(e) = e1 { + log::error!("Insecure listener failed with: {}", e); + } + if let Err(e) = e2 { + log::error!("Secure listener failed with: {}", e); + } + Ok(()) + } + } + } + /// Print the port binding status + pub fn print_binding(&self) { + match self { + MultiListener::SecureOnly(secure_listener) => { + log::info!( + "Server started on tps://{}", + secure_listener.listener.local_addr().expect("Failed to g") + ) + } + MultiListener::InsecureOnly(insecure_listener) => { + log::info!( + "Server started on tp://{}", + insecure_listener + .listener + .local_addr() + .expect("Failed to g") + ) + } + MultiListener::Multi(insecure_listener, secure_listener) => { + log::info!( + "Listening to tp://{} and tps://{}", + insecure_listener + .listener + .local_addr() + .expect("Failed to g"), + secure_listener.listener.local_addr().expect("Failed to g") + ) + } + } + } + /// Signal the ports to shut down and only return after they have shut down + /// + /// **Do note:** This function doesn't flush the `CoreDB` object! The **caller has to + /// make sure that the data is saved!** + pub async fn finish_with_termsig(self) { + match self { + MultiListener::InsecureOnly(server) => { + let Listener { + mut terminate_rx, + terminate_tx, + signal, + .. + } = server; + drop(signal); + drop(terminate_tx); + let _ = terminate_rx.recv().await; + } + MultiListener::SecureOnly(server) => { + let SslListener { + mut terminate_rx, + terminate_tx, + signal, + .. + } = server; + drop(signal); + drop(terminate_tx); + let _ = terminate_rx.recv().await; + } + MultiListener::Multi(insecure, secure) => { + let Listener { + mut terminate_rx, + terminate_tx, + signal, + .. + } = insecure; + drop((signal, terminate_tx)); + let _ = terminate_rx.recv().await; + let SslListener { + mut terminate_rx, + terminate_tx, + signal, + .. + } = secure; + drop((signal, terminate_tx)); + let _ = terminate_rx.recv().await; + } + } + } +} + /// Start the server waiting for incoming connections or a CTRL+C signal pub async fn run( - listener: TcpListener, + ports: PortConfig, bgsave_cfg: BGSave, snapshot_cfg: SnapshotConfig, sig: impl Future, @@ -200,7 +460,7 @@ pub async fn run( let db = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) { Ok(d) => d, Err(e) => { - eprintln!("ERROR: {}", e); + log::error!("ERROR: {}", e); process::exit(0x100); } }; @@ -214,33 +474,58 @@ pub async fn run( } }, } - log::info!( - "Started server on terrapipe://{}", - listener - .local_addr() - .expect("The local address couldn't be fetched. Please file a bug report") - ); - let mut server = Listener { - listener, - db, - climit: Arc::new(Semaphore::new(50000)), - signal, - terminate_tx, - terminate_rx, + let climit = Arc::new(Semaphore::new(50000)); + let mut server = match ports { + PortConfig::InsecureOnly { host, port } => { + MultiListener::new_insecure_only( + host, + port, + climit.clone(), + db.clone(), + signal, + terminate_tx, + terminate_rx, + ) + .await + } + PortConfig::SecureOnly { host, ssl } => { + MultiListener::new_secure_only( + host, + climit.clone(), + db.clone(), + signal, + terminate_tx, + terminate_rx, + ssl, + ) + .await + } + PortConfig::Multi { host, port, ssl } => { + let (ssl_terminate_tx, ssl_terminate_rx) = mpsc::channel::<()>(1); + let server = MultiListener::new_multi( + host, + port, + climit, + db.clone(), + signal, + terminate_tx, + terminate_rx, + ssl_terminate_tx, + ssl_terminate_rx, + ssl, + ) + .await; + server + } }; + server.print_binding(); tokio::select! { - _ = server.run() => {} + _ = server.run_server() => {} _ = sig => { log::info!("Signalling all workers to shut down"); } } - let Listener { - mut terminate_rx, - terminate_tx, - signal, - db, - .. - } = server; + server.finish_with_termsig().await; if let Ok(_) = db.flush_db() { log::info!("Successfully saved data to disk"); () @@ -259,9 +544,6 @@ pub async fn run( } } } - drop(signal); - drop(terminate_tx); - let _ = terminate_rx.recv().await; terminal::write_info("Goodbye :)\n").unwrap(); } diff --git a/server/src/kvengine/dbsize.rs b/server/src/kvengine/dbsize.rs index 1144582d..a77f69f8 100644 --- a/server/src/kvengine/dbsize.rs +++ b/server/src/kvengine/dbsize.rs @@ -19,12 +19,13 @@ * */ use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; /// Get the number of keys in the database -pub async fn dbsize(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn dbsize(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { if act.howmany() != 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; } diff --git a/server/src/kvengine/del.rs b/server/src/kvengine/del.rs index 7c924634..ead67647 100644 --- a/server/src/kvengine/del.rs +++ b/server/src/kvengine/del.rs @@ -23,7 +23,8 @@ //! This module provides functions to work with `DEL` queries use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; @@ -31,7 +32,7 @@ use libtdb::TResult; /// /// Do note that this function is blocking since it acquires a write lock. /// It will write an entire datagroup, for this `del` action -pub async fn del(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn del(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/exists.rs b/server/src/kvengine/exists.rs index ee724b26..c176d6b2 100644 --- a/server/src/kvengine/exists.rs +++ b/server/src/kvengine/exists.rs @@ -23,12 +23,13 @@ //! This module provides functions to work with `EXISTS` queries use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; /// Run an `EXISTS` query -pub async fn exists(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn exists(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/flushdb.rs b/server/src/kvengine/flushdb.rs index b3e7a97b..5a3fbdd8 100644 --- a/server/src/kvengine/flushdb.rs +++ b/server/src/kvengine/flushdb.rs @@ -20,11 +20,12 @@ */ use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use libtdb::TResult; /// Delete all the keys in the database -pub async fn flushdb(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn flushdb(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { if act.howmany() != 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; } diff --git a/server/src/kvengine/get.rs b/server/src/kvengine/get.rs index 86b8c4c4..723f6d41 100644 --- a/server/src/kvengine/get.rs +++ b/server/src/kvengine/get.rs @@ -23,13 +23,14 @@ //! This module provides functions to work with `GET` queries use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::{BytesWrapper, GroupBegin}; use bytes::Bytes; use libtdb::TResult; /// Run a `GET` query -pub async fn get(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn get(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany != 1 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/jget.rs b/server/src/kvengine/jget.rs index 99cbe486..709bcb7b 100644 --- a/server/src/kvengine/jget.rs +++ b/server/src/kvengine/jget.rs @@ -24,7 +24,8 @@ //! Functions for handling `JGET` queries use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use libtdb::TResult; /// Run a `JGET` query @@ -36,7 +37,7 @@ use libtdb::TResult; /// {"key":"value"}\n /// ``` /// -pub async fn jget(_handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn jget(_handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany != 1 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/keylen.rs b/server/src/kvengine/keylen.rs index d4caf0fa..bc9c816c 100644 --- a/server/src/kvengine/keylen.rs +++ b/server/src/kvengine/keylen.rs @@ -19,14 +19,15 @@ * */ use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; /// Run a `KEYLEN` query /// /// At this moment, `keylen` only supports a single key -pub async fn keylen(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn keylen(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany != 1 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/mget.rs b/server/src/kvengine/mget.rs index ac357e04..65fda7f6 100644 --- a/server/src/kvengine/mget.rs +++ b/server/src/kvengine/mget.rs @@ -20,7 +20,8 @@ */ use crate::coredb::CoreDB; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::{BytesWrapper, GroupBegin}; use bytes::Bytes; use libtdb::terrapipe::RespCodes; @@ -28,7 +29,7 @@ use libtdb::TResult; /// Run an `MGET` query /// -pub async fn mget(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn mget(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/mod.rs b/server/src/kvengine/mod.rs index 68762077..a1f94db0 100644 --- a/server/src/kvengine/mod.rs +++ b/server/src/kvengine/mod.rs @@ -39,13 +39,14 @@ pub mod update; pub mod uset; pub mod heya { //! Respond to `HEYA` queries + use crate::dbnet::Con; use crate::protocol; use crate::protocol::ActionGroup; use crate::CoreDB; use libtdb::TResult; - use protocol::{responses, Connection}; + use protocol::responses; /// Returns a `HEY!` `Response` - pub async fn heya(_db: &CoreDB, con: &mut Connection, _buf: ActionGroup) -> TResult<()> { + pub async fn heya(_db: &CoreDB, con: &mut Con<'_>, _buf: ActionGroup) -> TResult<()> { con.write_response(&**responses::fresp::R_HEYA).await } } diff --git a/server/src/kvengine/mset.rs b/server/src/kvengine/mset.rs index fb0e692d..7496ccf2 100644 --- a/server/src/kvengine/mset.rs +++ b/server/src/kvengine/mset.rs @@ -20,13 +20,14 @@ */ use crate::coredb::{self, CoreDB}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; use std::collections::hash_map::Entry; /// Run an `MSET` query -pub async fn mset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn mset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany & 1 == 1 || howmany == 0 { // An odd number of arguments means that the number of keys diff --git a/server/src/kvengine/mupdate.rs b/server/src/kvengine/mupdate.rs index b09bf36e..04d0c42e 100644 --- a/server/src/kvengine/mupdate.rs +++ b/server/src/kvengine/mupdate.rs @@ -20,13 +20,14 @@ */ use crate::coredb::{self, CoreDB}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; use std::collections::hash_map::Entry; /// Run an `MUPDATE` query -pub async fn mupdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn mupdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany & 1 == 1 || howmany == 0 { // An odd number of arguments means that the number of keys diff --git a/server/src/kvengine/set.rs b/server/src/kvengine/set.rs index 42f5e0e0..a0ad73bb 100644 --- a/server/src/kvengine/set.rs +++ b/server/src/kvengine/set.rs @@ -23,14 +23,15 @@ //! This module provides functions to work with `SET` queries use crate::coredb::{self, CoreDB}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use coredb::Data; use libtdb::TResult; use std::collections::hash_map::Entry; use std::hint::unreachable_unchecked; /// Run a `SET` query -pub async fn set(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn set(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany != 2 { // There should be exactly 2 arguments diff --git a/server/src/kvengine/strong.rs b/server/src/kvengine/strong.rs index e189f133..99bbfe61 100644 --- a/server/src/kvengine/strong.rs +++ b/server/src/kvengine/strong.rs @@ -31,7 +31,8 @@ //! Do note that this isn't the same as the gurantees provided by ACID transactions use crate::coredb::{CoreDB, Data}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use libtdb::TResult; use std::hint::unreachable_unchecked; @@ -39,7 +40,7 @@ use std::hint::unreachable_unchecked; /// /// This either returns `Okay` if all the keys were set, or it returns an /// `Overwrite Error` or code `2` -pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn sset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany & 1 == 1 || howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; @@ -101,7 +102,7 @@ pub async fn sset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR /// /// This either returns `Okay` if all the keys were `del`eted, or it returns a /// `Nil`, which is code `1` -pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn sdel(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; @@ -157,7 +158,7 @@ pub async fn sdel(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TR /// /// This either returns `Okay` if all the keys were updated, or it returns `Nil` /// or code `1` -pub async fn supdate(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn supdate(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany & 1 == 1 || howmany == 0 { return con.write_response(&**responses::fresp::R_ACTION_ERR).await; diff --git a/server/src/kvengine/update.rs b/server/src/kvengine/update.rs index 4619eca4..1eeed75b 100644 --- a/server/src/kvengine/update.rs +++ b/server/src/kvengine/update.rs @@ -23,14 +23,15 @@ //! This module provides functions to work with `UPDATE` queries //! use crate::coredb::{self, CoreDB}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use coredb::Data; use libtdb::TResult; use std::collections::hash_map::Entry; use std::hint::unreachable_unchecked; /// Run an `UPDATE` query -pub async fn update(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn update(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany != 2 { // There should be exactly 2 arguments diff --git a/server/src/kvengine/uset.rs b/server/src/kvengine/uset.rs index cf02d806..37235a1a 100644 --- a/server/src/kvengine/uset.rs +++ b/server/src/kvengine/uset.rs @@ -20,14 +20,15 @@ */ use crate::coredb::{self, CoreDB}; -use crate::protocol::{responses, ActionGroup, Connection}; +use crate::dbnet::Con; +use crate::protocol::{responses, ActionGroup}; use crate::resp::GroupBegin; use libtdb::TResult; /// Run an `USET` query /// /// This is like "INSERT or UPDATE" -pub async fn uset(handle: &CoreDB, con: &mut Connection, act: ActionGroup) -> TResult<()> { +pub async fn uset(handle: &CoreDB, con: &mut Con<'_>, act: ActionGroup) -> TResult<()> { let howmany = act.howmany(); if howmany & 1 == 1 || howmany == 0 { // An odd number of arguments means that the number of keys diff --git a/server/src/main.rs b/server/src/main.rs index 9389552a..4ccbfd0a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -26,8 +26,8 @@ //! the modules for their respective documentation. use crate::config::BGSave; +use crate::config::PortConfig; use crate::config::SnapshotConfig; -use tokio::net::TcpListener; mod config; use std::env; mod admin; @@ -54,7 +54,7 @@ use jemallocator::Jemalloc; static GLOBAL: Jemalloc = Jemalloc; /// The version text -static MSG: &'static str = "TerrabaseDB v0.5.0 | https://github.com/terrabasedb/terrabase"; +static MSG: &'static str = "TerrabaseDB v0.5.1 | https://github.com/terrabasedb/terrabasedb"; /// The terminal art for `!noart` configurations static TEXT: &'static str = " _______ _ _____ ____ @@ -76,7 +76,7 @@ async fn main() { // Start the server which asynchronously waits for a CTRL+C signal // which will safely shut down the server let (tcplistener, bgsave_config, snapshot_config, restore_filepath) = - check_args_or_connect().await; + check_args_and_get_cfg().await; run( tcplistener, bgsave_config, @@ -87,10 +87,10 @@ async fn main() { .await; } -/// This function checks the command line arguments and binds to an appropriate -/// port and host, as per the supplied configuration options -async fn check_args_or_connect() -> ( - TcpListener, +/// This function checks the command line arguments and either returns a config object +/// or prints an error to `stderr` and terminates the server +async fn check_args_and_get_cfg() -> ( + PortConfig, BGSave, SnapshotConfig, Option, @@ -104,35 +104,17 @@ async fn check_args_or_connect() -> ( println!("{}", MSG); } log::info!("Using settings from supplied configuration"); - ( - TcpListener::bind(cfg.get_host_port_tuple()).await, - cfg.bgsave, - cfg.snapshot, - file, - ) + (cfg.ports, cfg.bgsave, cfg.snapshot, file) } Ok(config::ConfigType::Def(cfg, file)) => { println!("{}\n{}", TEXT, MSG); log::warn!("No configuration file supplied. Using default settings"); - ( - TcpListener::bind(cfg.get_host_port_tuple()).await, - cfg.bgsave, - cfg.snapshot, - file, - ) + (cfg.ports, cfg.bgsave, cfg.snapshot, file) } Err(e) => { log::error!("{}", e); std::process::exit(0x100); } }; - match binding_and_cfg { - (Ok(b), bgsave_cfg, snapshot_cfg, restore_file) => { - (b, bgsave_cfg, snapshot_cfg, restore_file) - } - (Err(e), _, _, _) => { - log::error!("Failed to bind to socket with error: '{}'", e); - std::process::exit(0x100); - } - } + binding_and_cfg } diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index abfba009..b740c7c8 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -38,6 +38,7 @@ use std::io::Result as IoResult; use std::net::SocketAddr; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; +pub mod tls; /// A TCP connection wrapper pub struct Connection { diff --git a/server/src/protocol/tls.rs b/server/src/protocol/tls.rs new file mode 100644 index 00000000..d9aa3242 --- /dev/null +++ b/server/src/protocol/tls.rs @@ -0,0 +1,267 @@ +/* + * Created on Fri Dec 18 2020 + * + * This file is a part of TerrabaseDB + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use super::deserializer; +use super::responses; +use super::IoResult; +use super::ParseResult; +use super::QueryResult; +use crate::dbnet::Con; +use crate::dbnet::Terminator; +use crate::resp::Writable; +use crate::CoreDB; +use bytes::Buf; +use bytes::BytesMut; +use libtdb::TResult; +use libtdb::BUF_CAP; +use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod}; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use tokio::io::BufWriter; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Semaphore; +use tokio::sync::{broadcast, mpsc}; +use tokio::time::{self, Duration}; +use tokio_openssl::SslStream; + +pub struct SslListener { + /// An atomic reference to the coretable + pub db: CoreDB, + /// The incoming connection listener (binding) + pub listener: TcpListener, + /// The maximum number of connections + climit: Arc, + /// The shutdown broadcaster + pub signal: broadcast::Sender<()>, + // When all `Sender`s are dropped - the `Receiver` gets a `None` value + // We send a clone of `terminate_tx` to each `CHandler` + pub terminate_tx: mpsc::Sender<()>, + pub terminate_rx: mpsc::Receiver<()>, + acceptor: SslAcceptor, +} + +impl SslListener { + pub fn new_pem_based_ssl_connection( + key_file: String, + chain_file: String, + db: CoreDB, + listener: TcpListener, + climit: Arc, + signal: broadcast::Sender<()>, + terminate_tx: mpsc::Sender<()>, + terminate_rx: mpsc::Receiver<()>, + ) -> TResult { + log::debug!("New SSL/TLS connection registered"); + let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls())?; + acceptor.set_private_key_file(key_file, SslFiletype::PEM)?; + acceptor.set_certificate_chain_file(chain_file)?; + let acceptor = acceptor.build(); + Ok(SslListener { + db, + listener, + climit, + signal, + terminate_tx, + terminate_rx, + acceptor, + }) + } + async fn accept(&mut self) -> TResult> { + log::debug!("Trying to accept a SSL connection"); + let mut backoff = 1; + loop { + match self.listener.accept().await { + // We don't need the bindaddr + // We get the encrypted stream which we need to decrypt + // by using the acceptor + Ok((stream, _)) => { + log::debug!("Accepted an SSL/TLS connection"); + let ssl = Ssl::new(self.acceptor.context())?; + let mut stream = SslStream::new(ssl, stream)?; + Pin::new(&mut stream).accept().await?; + log::debug!("Connected to secure socket over TCP"); + return Ok(stream); + } + Err(e) => { + log::debug!("Failed to establish a secure connection"); + if backoff > 64 { + // Too many retries, goodbye user + return Err(e.into()); + } + } + } + // Wait for the `backoff` duration + time::sleep(Duration::from_secs(backoff)).await; + // We're using exponential backoff + backoff *= 2; + } + } + pub async fn run(&mut self) -> TResult<()> { + log::debug!("Started secure server"); + loop { + // Take the permit first, but we won't use it right now + // that's why we will forget it + self.climit.acquire().await.unwrap().forget(); + let stream = self.accept().await?; + let mut sslhandle = SslConnectionHandler { + db: self.db.clone(), + con: SslConnection::new(stream), + climit: self.climit.clone(), + terminator: Terminator::new(self.signal.subscribe()), + _term_sig_tx: self.terminate_tx.clone(), + }; + tokio::spawn(async move { + log::debug!("Spawned listener task"); + if let Err(e) = sslhandle.run().await { + log::error!("Error: {}", e); + } + }); + } + } +} + +pub struct SslConnectionHandler { + db: CoreDB, + con: SslConnection, + climit: Arc, + terminator: Terminator, + _term_sig_tx: mpsc::Sender<()>, +} + +impl SslConnectionHandler { + pub async fn run(&mut self) -> TResult<()> { + log::debug!("SslConnectionHanler initialized to handle a remote client"); + while !self.terminator.is_termination_signal() { + let try_df = tokio::select! { + tdf = self.con.read_query() => tdf, + _ = self.terminator.receive_signal() => { + return Ok(()); + } + }; + match try_df { + Ok(QueryResult::Q(s)) => { + self.db + .execute_query(s, &mut Con::init_secure(&mut self.con)) + .await? + } + Ok(QueryResult::E(r)) => { + log::debug!("Failed to read query!"); + self.con.close_conn_with_error(r).await? + } + Ok(QueryResult::Empty) => return Ok(()), + Err(e) => return Err(e.into()), + } + } + Ok(()) + } +} +impl Drop for SslConnectionHandler { + fn drop(&mut self) { + // Make sure that the permit is returned to the semaphore + // in the case that there is a panic inside + self.climit.add_permits(1); + } +} + +pub struct SslConnection { + stream: BufWriter>, + buffer: BytesMut, +} + +impl SslConnection { + pub fn new(stream: SslStream) -> Self { + SslConnection { + stream: BufWriter::new(stream), + buffer: BytesMut::with_capacity(BUF_CAP), + } + } + async fn read_again(&mut self) -> Result<(), String> { + match self.stream.read_buf(&mut self.buffer).await { + Ok(0) => { + // If 0 bytes were received, then the remote end closed + // the connection + if self.buffer.is_empty() { + return Ok(()); + } else { + return Err(format!( + "Connection reset while reading from {}", + if let Ok(p) = self.get_peer() { + p.to_string() + } else { + "peer".to_owned() + } + ) + .into()); + } + } + Ok(_) => Ok(()), + Err(e) => return Err(format!("{}", e)), + } + } + fn get_peer(&self) -> IoResult { + self.stream.get_ref().get_ref().peer_addr() + } + /// Try to parse a query from the buffered data + fn try_query(&mut self) -> Result { + if self.buffer.is_empty() { + return Err(()); + } + Ok(deserializer::parse(&self.buffer)) + } + pub async fn read_query(&mut self) -> Result { + self.read_again().await?; + loop { + match self.try_query() { + Ok(ParseResult::Query(query, forward)) => { + self.buffer.advance(forward); + return Ok(QueryResult::Q(query)); + } + Ok(ParseResult::BadPacket(bp)) => { + self.buffer.advance(bp); + return Ok(QueryResult::E(responses::fresp::R_PACKET_ERR.to_owned())); + } + Err(_) => { + return Ok(QueryResult::Empty); + } + _ => (), + } + self.read_again().await?; + } + } + /// Write a response to the stream + pub async fn write_response(&mut self, streamer: impl Writable) -> TResult<()> { + streamer.write(&mut self.stream).await?; + Ok(()) + } + pub async fn flush_stream(&mut self) -> TResult<()> { + self.stream.flush().await?; + Ok(()) + } + /// Wraps around the `write_response` used to differentiate between a + /// success response and an error response + pub async fn close_conn_with_error(&mut self, resp: Vec) -> TResult<()> { + self.write_response(resp).await?; + self.stream.flush().await?; + Ok(()) + } +} diff --git a/server/src/queryengine/mod.rs b/server/src/queryengine/mod.rs index c732ed92..0299ddf2 100644 --- a/server/src/queryengine/mod.rs +++ b/server/src/queryengine/mod.rs @@ -22,8 +22,9 @@ //! # The Query Engine use crate::coredb::CoreDB; +use crate::dbnet::Con; use crate::protocol::ActionGroup; -use crate::protocol::{responses, Connection}; +use crate::protocol::{responses}; use crate::{admin, kvengine}; use libtdb::TResult; mod tags { @@ -66,7 +67,7 @@ mod tags { } /// Execute a simple(*) query -pub async fn execute_simple(db: &CoreDB, con: &mut Connection, buf: ActionGroup) -> TResult<()> { +pub async fn execute_simple(db: &CoreDB, con: &mut Con<'_>, buf: ActionGroup) -> TResult<()> { let first = match buf.get_first() { None => { return con diff --git a/server/src/resp/mod.rs b/server/src/resp/mod.rs index 799704c1..a4c652ef 100644 --- a/server/src/resp/mod.rs +++ b/server/src/resp/mod.rs @@ -21,15 +21,16 @@ //! Utilities for generating responses, which are only used by the `server` //! - use bytes::Bytes; use libtdb::terrapipe::RespCodes; use std::error::Error; use std::future::Future; +use std::io::Error as IoError; use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::io::BufWriter; use tokio::net::TcpStream; +use tokio_openssl::SslStream; /// # The `Writable` trait /// All trait implementors are given access to an asynchronous stream to which @@ -42,10 +43,44 @@ pub trait Writable { */ fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's>>; } +pub trait IsConnection: std::marker::Sync + std::marker::Send { + fn write_lowlevel<'s>( + &'s mut self, + bytes: &'s [u8], + ) -> Pin> + Send + Sync + 's>>; +} + +impl IsConnection for BufWriter { + fn write_lowlevel<'s>( + &'s mut self, + bytes: &'s [u8], + ) -> Pin> + Send + Sync + 's>> { + Box::pin(self.write(bytes)) + } +} + +impl IsConnection for SslStream { + fn write_lowlevel<'s>( + &'s mut self, + bytes: &'s [u8], + ) -> Pin> + Send + Sync + 's>> { + Box::pin(self.write(bytes)) + } +} + +impl IsConnection for BufWriter> { + fn write_lowlevel<'s>( + &'s mut self, + bytes: &'s [u8], + ) -> Pin> + Send + Sync + 's>> { + Box::pin(self.write(bytes)) + } +} + /// A `BytesWrapper` object wraps around a `Bytes` object that might have been pulled /// from `CoreDB`. /// @@ -73,14 +108,14 @@ impl BytesWrapper { impl Writable for Vec { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, resp: Vec, ) -> Result<(), Box> { - con.write(&resp).await?; + con.write_lowlevel(&resp).await?; Ok(()) } Box::pin(write_bytes(con, self)) @@ -90,14 +125,14 @@ impl Writable for Vec { impl Writable for &'static [u8] { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, resp: &[u8], ) -> Result<(), Box> { - con.write(&resp).await?; + con.write_lowlevel(&resp).await?; Ok(()) } Box::pin(write_bytes(con, &self)) @@ -107,28 +142,28 @@ impl Writable for &'static [u8] { impl Writable for BytesWrapper { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, bytes: Bytes, ) -> Result<(), Box> { // First write a `+` character to the stream since this is a // string (we represent `String`s as `Byte` objects internally) // and since `Bytes` are effectively `String`s we will append the // type operator `+` to the stream - con.write(&[b'+']).await?; + con.write_lowlevel(&[b'+']).await?; // Now get the size of the Bytes object as bytes let size = bytes.len().to_string().into_bytes(); // Write this to the stream - con.write(&size).await?; + con.write_lowlevel(&size).await?; // Now write a LF character - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; // Now write the REAL bytes (of the object) - con.write(&bytes).await?; + con.write_lowlevel(&bytes).await?; // Now write another LF - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; Ok(()) } Box::pin(write_bytes(con, self.finish_into_bytes())) @@ -138,44 +173,44 @@ impl Writable for BytesWrapper { impl Writable for RespCodes { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, code: RespCodes, ) -> Result<(), Box> { if let RespCodes::OtherError(Some(e)) = code { // Since this is an other error which contains a description // we'll write ! followed by the string - con.write(&[b'!']).await?; + con.write_lowlevel(&[b'!']).await?; // Convert the string into a vector of bytes let e = e.to_string().into_bytes(); // Now get the length of the byte vector and turn it into // a string and then into a byte vector let len_as_bytes = e.len().to_string().into_bytes(); // Write the length - con.write(&len_as_bytes).await?; + con.write_lowlevel(&len_as_bytes).await?; // Then an LF - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; // Then the error string - con.write(&e).await?; + con.write_lowlevel(&e).await?; // Then another LF - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; // And now we're done return Ok(()); } // Self's tsymbol is ! // The length of the response code is 1 // And we need a newline - con.write(&[b'!', b'1', b'\n']).await?; + con.write_lowlevel(&[b'!', b'1', b'\n']).await?; // We need to get the u8 version of the response code let code: u8 = code.into(); // We need the UTF8 equivalent of the response code let code_bytes = code.to_string().into_bytes(); - con.write(&code_bytes).await?; + con.write_lowlevel(&code_bytes).await?; // Now append a newline - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; Ok(()) } Box::pin(write_bytes(con, self)) @@ -185,27 +220,27 @@ impl Writable for RespCodes { impl Writable for GroupBegin { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, size: usize, ) -> Result<(), Box> { - con.write(b"#2\n*1\n").await?; + con.write_lowlevel(b"#2\n*1\n").await?; // First write a `#` which indicates that the next bytes give the // prefix length - con.write(&[b'#']).await?; + con.write_lowlevel(&[b'#']).await?; let group_len_as_bytes = size.to_string().into_bytes(); let group_prefix_len_as_bytes = (group_len_as_bytes.len() + 1).to_string().into_bytes(); // Now write Self's len as bytes - con.write(&group_prefix_len_as_bytes).await?; + con.write_lowlevel(&group_prefix_len_as_bytes).await?; // Now write a LF and '&' which signifies the beginning of a datagroup - con.write(&[b'\n', b'&']).await?; + con.write_lowlevel(&[b'\n', b'&']).await?; // Now write the number of items in the datagroup as bytes - con.write(&group_len_as_bytes).await?; + con.write_lowlevel(&group_len_as_bytes).await?; // Now write a '\n' character - con.write(&[b'\n']).await?; + con.write_lowlevel(&[b'\n']).await?; Ok(()) } Box::pin(write_bytes(con, self.0)) @@ -215,20 +250,20 @@ impl Writable for GroupBegin { impl Writable for usize { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { async fn write_bytes( - con: &mut BufWriter, + con: &mut impl IsConnection, val: usize, ) -> Result<(), Box> { - con.write(b":").await?; + con.write_lowlevel(b":").await?; let usize_bytes = val.to_string().into_bytes(); let usize_bytes_len = usize_bytes.len().to_string().into_bytes(); - con.write(&usize_bytes_len).await?; - con.write(b"\n").await?; - con.write(&usize_bytes).await?; - con.write(b"\n").await?; + con.write_lowlevel(&usize_bytes_len).await?; + con.write_lowlevel(b"\n").await?; + con.write_lowlevel(&usize_bytes).await?; + con.write_lowlevel(b"\n").await?; Ok(()) } Box::pin(write_bytes(con, self)) @@ -238,20 +273,17 @@ impl Writable for usize { impl Writable for u64 { fn write<'s>( self, - con: &'s mut BufWriter, + con: &'s mut impl IsConnection, ) -> Pin>> + Send + Sync + 's)>> { - async fn write_bytes( - con: &mut BufWriter, - val: u64, - ) -> Result<(), Box> { - con.write(b":").await?; + async fn write_bytes(con: &mut impl IsConnection, val: u64) -> Result<(), Box> { + con.write_lowlevel(b":").await?; let usize_bytes = val.to_string().into_bytes(); let usize_bytes_len = usize_bytes.len().to_string().into_bytes(); - con.write(&usize_bytes_len).await?; - con.write(b"\n").await?; - con.write(&usize_bytes).await?; - con.write(b"\n").await?; + con.write_lowlevel(&usize_bytes_len).await?; + con.write_lowlevel(b"\n").await?; + con.write_lowlevel(&usize_bytes).await?; + con.write_lowlevel(b"\n").await?; Ok(()) } Box::pin(write_bytes(con, self)) diff --git a/server/src/tests/kvengine.rs b/server/src/tests/kvengine.rs index 44c6863d..f5be1b09 100644 --- a/server/src/tests/kvengine.rs +++ b/server/src/tests/kvengine.rs @@ -81,6 +81,7 @@ mod __private { ) where T: AsRef, { + use tokio::io::AsyncWriteExt; let mut query = String::from("MSET "); query.push_str(values_split_with_whitespace.as_ref()); let count_bytes_len = homwany.to_string().as_bytes().len(); diff --git a/tdb-bench/src/cli.yml b/tdb-bench/src/cli.yml index ae09dc82..2080f7da 100644 --- a/tdb-bench/src/cli.yml +++ b/tdb-bench/src/cli.yml @@ -20,7 +20,7 @@ # name: TerrabaseDB Benchmark Tool -version: 0.5.0 +version: 0.5.1 author: Sayan N. about: | The TerrabaseDB benchmark tool can be used to benchmark TerrabaseDB installations. diff --git a/tdb-bench/src/main.rs b/tdb-bench/src/main.rs index 3cc1be6b..eef8c61b 100644 --- a/tdb-bench/src/main.rs +++ b/tdb-bench/src/main.rs @@ -28,7 +28,7 @@ mod benchtool { use devtimer::DevTime; use libtdb::terrapipe; use rand::distributions::Alphanumeric; - use rand::{thread_rng, Rng}; + use rand::thread_rng; use serde::Serialize; use std::io::prelude::*; use std::net::{self, TcpStream}; @@ -167,7 +167,7 @@ mod benchtool { }, None => host.push_str("2003"), } - let rand = thread_rng(); + let mut rand = thread_rng(); if let Some(matches) = matches.subcommand_matches("testkey") { let numkeys = matches.value_of("count").unwrap(); if let Ok(num) = numkeys.parse::() { @@ -175,27 +175,11 @@ mod benchtool { println!("Generating keys ..."); let keys: Vec = (0..num) .into_iter() - .map(|_| { - let rand_string: String = unsafe { - String::from_utf8_unchecked( - rand.clone().sample_iter(&Alphanumeric).take(8).collect(), - ) - .to_string() - }; - rand_string - }) + .map(|_| ran_string(8, &mut rand)) .collect(); let values: Vec = (0..num) .into_iter() - .map(|_| { - let rand_string: String = unsafe { - String::from_utf8_unchecked( - rand.clone().sample_iter(&Alphanumeric).take(8).collect(), - ) - .to_string() - }; - rand_string - }) + .map(|_| ran_string(8, &mut rand)) .collect(); let set_packs: Vec> = (0..num) .map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx]))) @@ -235,40 +219,18 @@ mod benchtool { max_queries, (packet_size * 2), // key size + value size ); - let rand = thread_rng(); + let mut rand = thread_rng(); let mut dt = DevTime::new_complex(); // Create separate connection pools for get and set operations let mut setpool = Netpool::new(max_connections, &host); let mut getpool = Netpool::new(max_connections, &host); let keys: Vec = (0..max_queries) .into_iter() - .map(|_| { - let rand_string: String = unsafe { - String::from_utf8_unchecked( - rand.clone() - .sample_iter(&Alphanumeric) - .take(packet_size) - .collect(), - ) - .to_string() - }; - rand_string - }) + .map(|_| ran_string(packet_size, &mut rand)) .collect(); let values: Vec = (0..max_queries) .into_iter() - .map(|_| { - let rand_string: String = unsafe { - String::from_utf8_unchecked( - rand.clone() - .sample_iter(&Alphanumeric) - .take(packet_size) - .collect(), - ) - .to_string() - }; - rand_string - }) + .map(|_| ran_string(packet_size, &mut rand)) .collect(); /* We create three vectors of vectors: `set_packs`, `get_packs` and `del_packs` @@ -333,6 +295,15 @@ mod benchtool { fn calc(reqs: usize, time: u128) -> f64 { reqs as f64 / (time as f64 / 1_000_000_000 as f64) } + + fn ran_string(len: usize, rand: impl rand::Rng) -> String { + let rand_string: String = rand + .sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect(); + rand_string + } } fn main() { diff --git a/tdb-macros/Cargo.toml b/tdb-macros/Cargo.toml index 3682b75d..7f467499 100644 --- a/tdb-macros/Cargo.toml +++ b/tdb-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tdb_macros" -version = "0.2.2" +version = "0.5.0" authors = ["Sayan Nandan "] edition = "2018"