diff --git a/Cargo.lock b/Cargo.lock index 7f1f8d1c..aaedab6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,6 +69,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" +[[package]] +name = "bytes" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1f8e949d755f9d79112b5bb46938e0ef9d3804a0b16dfab13aafcaa5f0fa72" + [[package]] name = "cc" version = "1.0.59" @@ -165,11 +171,100 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" +[[package]] +name = "futures" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c70be434c505aee38639abccb918163b63158a4b4bb791b45b7023044bdc3c9c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f01c61843314e95f96cc9245702248733a3a3d744e43e2e755e3c7af8348a0a9" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "db8d3b0917ff63a2a96173133c02818fac4a746b0a57569d3baca9ec0e945e08" + +[[package]] +name = "futures-executor" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ee9ca2f7eb4475772cf39dd1cd06208dce2670ad38f4d9c7262b3e15f127068" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e37c1a51b037b80922864b8eed90692c5cd8abd4c71ce49b77146caa47f3253b" + +[[package]] +name = "futures-macro" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f8719ca0e1f3c5e34f3efe4570ef2c0610ca6da85ae7990d472e9cbfba13664" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6adabac1290109cfa089f79192fb6244ad2c3f1cc2281f3e1dd987592b71feb" + +[[package]] +name = "futures-task" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92a0843a2ff66823a8f7c77bffe9a09be2b64e533562c412d63075643ec0038" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "036a2107cdeb57f6d7322f1b6c363dad67cd63ca3b7d1b925bdf75bd5d96cda9" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] [[package]] name = "getrandom" @@ -246,7 +341,7 @@ checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701" name = "libtdb" version = "0.5.0" dependencies = [ - "bytes", + "bytes 0.6.0", "lazy_static", "termcolor", ] @@ -344,9 +439,9 @@ checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" [[package]] name = "openssl" -version = "0.10.31" +version = "0.10.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187" +checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -367,9 +462,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.59" +version = "0.9.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe" +checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6" dependencies = [ "autocfg", "cc", @@ -405,12 +500,38 @@ dependencies = [ "winapi", ] +[[package]] +name = "pin-project" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.19" @@ -423,6 +544,18 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -490,9 +623,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "regex" -version = "1.4.2" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" +checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ "aho-corasick", "memchr", @@ -502,9 +635,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.21" +version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" +checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" [[package]] name = "ryu" @@ -605,7 +738,7 @@ name = "tdb" version = "0.5.0" dependencies = [ "bincode", - "bytes", + "bytes 0.6.0", "chrono", "clap", "env_logger", @@ -620,8 +753,8 @@ dependencies = [ "serde", "serde_derive", "tdb_macros", - "tokio", - "tokio-openssl", + "tokio 0.3.6", + "tokio-openssl 0.5.0", "toml", ] @@ -692,7 +825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "720ba21c25078711bf456d607987d95bce90f7c3bea5abe1db587862e7a1e87c" dependencies = [ "autocfg", - "bytes", + "bytes 0.6.0", "futures-core", "libc", "memchr", @@ -703,7 +836,27 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "tokio-macros", + "tokio-macros 0.3.1", + "winapi", +] + +[[package]] +name = "tokio" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d258221f566b6c803c7b4714abadc080172b272090cdc5e244a6d4dd13c3a6bd" +dependencies = [ + "autocfg", + "bytes 1.0.0", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros 1.0.0", "winapi", ] @@ -718,6 +871,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-openssl" version = "0.5.0" @@ -726,7 +890,19 @@ checksum = "d01e5cc2d3a154fa310982d0affaec8140d6476805422265b2f648eb813f937f" dependencies = [ "openssl", "openssl-sys", - "tokio", + "tokio 0.3.6", +] + +[[package]] +name = "tokio-openssl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac1bec5c0a4aa71e3459802c7a12e8912c2091ce2151004f9ce95cc5d1c6124e" +dependencies = [ + "futures", + "openssl", + "pin-project", + "tokio 1.0.1", ] [[package]] @@ -742,12 +918,14 @@ dependencies = [ name = "tsh" version = "0.5.0" dependencies = [ - "bytes", + "bytes 1.0.0", "clap", "lazy_static", "libtdb", + "openssl", "regex", - "tokio", + "tokio 1.0.1", + "tokio-openssl 0.6.1", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 2c436129..3a15caec 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -8,8 +8,10 @@ edition = "2018" [dependencies] libtdb = {path = "../libtdb"} -tokio = {version = "0.3.6", features = ["full"]} -bytes = "0.6.0" -regex = "1.4.2" +tokio = {version = "1.0.1", features = ["full"]} +bytes = "1.0.0" +regex = "1.4.3" lazy_static = "1.4.0" -clap = {version = "2.33.3", features=["yaml"]} \ No newline at end of file +clap = {version = "2.33.3", features=["yaml"]} +openssl = { version = "0.10.32", features = ["vendored"] } +tokio-openssl = "0.6.1" \ No newline at end of file diff --git a/cli/src/argparse.rs b/cli/src/argparse.rs index ca547e19..568a3511 100644 --- a/cli/src/argparse.rs +++ b/cli/src/argparse.rs @@ -23,6 +23,7 @@ use crate::protocol; use clap::load_yaml; use clap::App; use libtdb::terrapipe::ADDR; +use protocol::{Con, Connection, SslConnection}; use std::io::{self, prelude::*}; use std::process; const MSG_WELCOME: &'static str = "TerrabaseDB v0.5.0"; @@ -39,6 +40,7 @@ pub async fn start_repl() { Some(h) => h.to_owned(), None => ADDR.to_owned(), }; + let domain = host.clone(); host.push(':'); match matches.value_of("port") { Some(p) => match p.parse::() { @@ -50,23 +52,32 @@ pub async fn start_repl() { }, None => host.push_str("2003"), } + let mut con = if let Some(sslcert) = matches.value_of("cert") { + let con = match SslConnection::new(&domain, &host, sslcert).await { + Ok(c) => c, + Err(e) => { + eprintln!("ERROR: {}", e); + process::exit(0x100); + } + }; + Con::Secure(con) + } else { + let con = match Connection::new(&host).await { + Ok(c) => c, + Err(e) => { + eprintln!("ERROR: {}", e); + process::exit(0x100); + } + }; + Con::Insecure(con) + }; if let Some(eval_expr) = matches.value_of("eval") { if eval_expr.len() == 0 { return; } - if let Err(e) = protocol::Connection::oneshot(&host, eval_expr.to_string()).await { - eprintln!("ERROR: {}", e); - process::exit(0x100); - } + con.execute_query(eval_expr.to_string()).await; return; } - let mut con = match protocol::Connection::new(&host).await { - Ok(c) => c, - Err(e) => { - eprintln!("ERROR: {}", e); - process::exit(0x100); - } - }; println!("{}", MSG_WELCOME); loop { print!("tsh>"); @@ -85,6 +96,6 @@ pub async fn start_repl() { // The query was empty, so let it be continue; } - con.run_query(rl).await; + con.execute_query(rl).await; } } diff --git a/cli/src/cli.yml b/cli/src/cli.yml index 0745b293..bd6ae5fb 100644 --- a/cli/src/cli.yml +++ b/cli/src/cli.yml @@ -24,24 +24,31 @@ version: 0.5.0 author: Sayan N. about: The TerrabaseDB Shell (tsh) args: - - host: - short: h - required: false - long: host - value_name: host - help: Sets the remote host to connect to - takes_value: true - - port: - short: p - required: false - long: port - value_name: port - help: Sets the remote port to connect to - takes_value: true - - eval: - short: e - required: false - long: eval - value_name: expression - help: Run an expression without REPL - takes_value: true + - host: + short: h + required: false + long: host + value_name: host + help: Sets the remote host to connect to + takes_value: true + - port: + short: p + required: false + long: port + value_name: port + help: Sets the remote port to connect to + takes_value: true + - eval: + short: e + required: false + long: eval + value_name: expression + help: Run an expression without REPL + takes_value: true + - cert: + short: C + required: false + long: sslcert + value_name: cert + help: Sets the PEM certificate to use for SSL connections + takes_value: true diff --git a/cli/src/protocol/mod.rs b/cli/src/protocol/mod.rs index dde8edb8..280f0423 100644 --- a/cli/src/protocol/mod.rs +++ b/cli/src/protocol/mod.rs @@ -26,14 +26,32 @@ use lazy_static::lazy_static; use libtdb::terrapipe; use libtdb::TResult; use libtdb::BUF_CAP; +use openssl::ssl::SslConnector; +use openssl::ssl::SslMethod; use regex::Regex; +use std::pin::Pin; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio_openssl::SslStream; lazy_static! { static ref RE: Regex = Regex::new("[^\\s\"']+|\"[^\"]*\"|'[^']*'").unwrap(); } +pub enum Con { + Secure(SslConnection), + Insecure(Connection), +} + +impl Con { + pub async fn execute_query(&mut self, query: String) { + match self { + Con::Insecure(con) => con.run_query(query).await, + Con::Secure(con) => con.run_query(query).await, + } + } +} + /// A `Connection` is a wrapper around a`TcpStream` and a read buffer pub struct Connection { stream: TcpStream, @@ -44,21 +62,12 @@ impl Connection { /// Create a new connection, creating a connection to `host` pub async fn new(host: &str) -> TResult { let stream = TcpStream::connect(host).await?; - println!("Connected to {}", host); + println!("Connected to tp://{}", host); Ok(Connection { stream, buffer: BytesMut::with_capacity(BUF_CAP), }) } - pub async fn oneshot(host: &str, query: String) -> TResult<()> { - let mut con = Connection { - stream: TcpStream::connect(host).await?, - buffer: BytesMut::with_capacity(BUF_CAP), - }; - con.run_query(query).await; - drop(con); - Ok(()) - } /// This function will write a query to the stream and read the response from the /// server. It will then determine if the returned response is complete or incomplete /// or invalid. @@ -85,6 +94,100 @@ impl Connection { return; } } + println!( + "The server returned: {}", + String::from_utf8_lossy(&self.buffer) + ); + match self.try_response().await { + ClientResult::Empty(f) => { + self.buffer.advance(f); + eprintln!("ERROR: The remote end reset the connection"); + return; + } + ClientResult::Incomplete => { + continue; + } + ClientResult::Response(r, f) => { + self.buffer.advance(f); + if r.len() == 0 { + return; + } + for group in r { + println!("{}", group); + } + return; + } + ClientResult::InvalidResponse(_) => { + self.buffer.clear(); + eprintln!("{}", ClientResult::InvalidResponse(0)); + return; + } + } + } + } + /// This function is a subroutine of `run_query` used to parse the response packet + async fn try_response(&mut self) -> ClientResult { + if self.buffer.is_empty() { + // The connection was possibly reset + return ClientResult::Empty(0); + } + deserializer::parse(&self.buffer) + } +} + +/// An `SslConnection` is a wrapper around a `SslStream` provided by OpenSSL and a +/// read buffer +pub struct SslConnection { + stream: SslStream, + buffer: BytesMut, +} + +impl SslConnection { + /// Create a new connection, creating a connection to `host` + pub async fn new(domain: &str, host: &str, sslcert: &str) -> TResult { + let mut connector = SslConnector::builder(SslMethod::tls())?; + connector.set_ca_file(sslcert)?; + let ssl = connector.build().configure()?.into_ssl(&domain)?; + let stream = TcpStream::connect(host).await?; + let mut stream = SslStream::new(ssl, stream)?; + Pin::new(&mut stream).connect().await.unwrap(); + println!("Connected to tps://{}", host); + Ok(SslConnection { + stream, + buffer: BytesMut::with_capacity(BUF_CAP), + }) + } + /// This function will write a query to the stream and read the response from the + /// server. It will then determine if the returned response is complete or incomplete + /// or invalid. + /// + /// - If it is complete, then the return is parsed into a `Display`able form + /// and written to the output stream. If any parsing errors occur, they're also handled + /// by this function (usually, "Invalid Response" is written to the terminal). + /// - If the packet is incomplete, it will wait to read the entire response from the stream + /// - If the packet is corrupted, it will output "Invalid Response" + pub async fn run_query(&mut self, query: String) { + let query = terrapipe::proc_query(query); + println!("Processed query and now connected!"); + match self.stream.write_all(&query).await { + Ok(_) => (), + Err(e) => { + eprintln!("ERROR: Couldn't write data to socket with '{}'", e); + return; + } + }; + loop { + match self.stream.read_buf(&mut self.buffer).await { + Ok(_) => (), + Err(e) => { + eprintln!("ERROR: {}", e); + return; + } + } + println!( + "The server returned: {}", + String::from_utf8_lossy(&self.buffer) + ); match self.try_response().await { ClientResult::Empty(f) => { self.buffer.advance(f); diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 5e9c0f4a..a68fcfc7 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -197,9 +197,6 @@ impl SslOpts { pub const fn new(key: String, chain: String, port: u16) -> Self { SslOpts { key, chain, port } } - pub fn get_host_port_tuple(&self, host: String) -> impl ToSocketAddrs { - ((host), self.port) - } } #[derive(Debug, PartialEq)] diff --git a/server/src/protocol/tls.rs b/server/src/protocol/tls.rs index e3e2bf57..f75736a8 100644 --- a/server/src/protocol/tls.rs +++ b/server/src/protocol/tls.rs @@ -86,6 +86,7 @@ impl SslListener { }) } async fn accept(&mut self) -> TResult> { + println!("Received connection"); let mut backoff = 1; loop { match self.listener.accept().await { @@ -151,6 +152,7 @@ impl SslConnectionHandler { }; match try_df { Ok(QueryResult::Q(s)) => { + println!("Query: {:?}", s); self.db .execute_query(s, &mut Con::init_secure(&mut self.con)) .await? diff --git a/server/src/resp/mod.rs b/server/src/resp/mod.rs index ff6b043b..005d398b 100644 --- a/server/src/resp/mod.rs +++ b/server/src/resp/mod.rs @@ -21,8 +21,6 @@ //! Utilities for generating responses, which are only used by the `server` //! -use crate::protocol::tls::SslConnection; -use crate::protocol::Connection; use bytes::Bytes; use libtdb::terrapipe::RespCodes; use std::error::Error;