From 57c957d4e7e9d41e08ab8f70fc69e49ffe45ecc7 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 14 Jun 2021 19:33:46 +0530 Subject: [PATCH] Add `pop` action --- server/src/actions/mod.rs | 1 + server/src/actions/pop.rs | 61 ++++++++++++++++++++++++++++++++++ server/src/coredb/htable.rs | 3 ++ server/src/dbnet/connection.rs | 10 +++--- server/src/queryengine/mod.rs | 3 +- server/src/resp/mod.rs | 9 +++++ 6 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 server/src/actions/pop.rs diff --git a/server/src/actions/mod.rs b/server/src/actions/mod.rs index 40689f7c..6abf1243 100644 --- a/server/src/actions/mod.rs +++ b/server/src/actions/mod.rs @@ -41,6 +41,7 @@ pub mod lskeys; pub mod mget; pub mod mset; pub mod mupdate; +pub mod pop; pub mod set; pub mod strong; pub mod update; diff --git a/server/src/actions/pop.rs b/server/src/actions/pop.rs new file mode 100644 index 00000000..9beba1cc --- /dev/null +++ b/server/src/actions/pop.rs @@ -0,0 +1,61 @@ +/* + * Created on Mon Jun 14 2021 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2021, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use crate::coredb; +use crate::dbnet::connection::prelude::*; +use crate::protocol::responses; +use crate::queryengine::ActionIter; +use crate::resp::BytesWrapper; + +/// Run a POP action +pub async fn pop( + handle: &coredb::CoreDB, + con: &mut T, + act: ActionIter, +) -> std::io::Result<()> +where + T: ProtocolConnectionExt, + Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync, +{ + crate::err_if_len_is!(act, con, eq 0); + if handle.is_poisoned() { + // don't begin the operation at all if the database is poisoned + return con.write_response(&**responses::groups::SERVER_ERR).await; + } + con.write_array_length(act.len()).await?; + for key in act { + if handle.is_poisoned() { + // we keep this check just in case the server fails in-between running a + // pop operation + con.write_response(&**responses::groups::SERVER_ERR).await?; + } else if let Some((_key, val)) = handle.get_ref().remove(key.as_bytes()) { + con.write_response(BytesWrapper(val.into_inner())).await?; + } else { + con.write_response(&**responses::groups::NIL).await?; + } + } + Ok(()) +} diff --git a/server/src/coredb/htable.rs b/server/src/coredb/htable.rs index fed49796..7b4b86fc 100644 --- a/server/src/coredb/htable.rs +++ b/server/src/coredb/htable.rs @@ -436,6 +436,9 @@ impl Data { pub const fn get_blob(&self) -> &Bytes { &self.blob } + pub fn into_inner(self) -> Bytes { + self.blob + } } impl Eq for Data {} diff --git a/server/src/dbnet/connection.rs b/server/src/dbnet/connection.rs index e51ae748..b5fa6222 100644 --- a/server/src/dbnet/connection.rs +++ b/server/src/dbnet/connection.rs @@ -190,7 +190,7 @@ where Box::pin(async move { let mv_self = self; let ret: IoResult<()> = { - mv_self.write_response(&SIMPLE_QUERY_HEADER[..]).await?; + mv_self.write_response(SIMPLE_QUERY_HEADER).await?; Ok(()) }; ret @@ -208,9 +208,9 @@ where Box::pin(async move { let mv_self = self; let ret: IoResult<()> = { - mv_self.write_response(&[b'_'][..]).await?; + mv_self.write_response([b'_']).await?; mv_self.write_response(len.to_string().into_bytes()).await?; - mv_self.write_response(&[b'\n'][..]).await?; + mv_self.write_response([b'\n']).await?; Ok(()) }; ret @@ -228,9 +228,9 @@ where Box::pin(async move { let mv_self = self; let ret: IoResult<()> = { - mv_self.write_response(&[b'&'][..]).await?; + mv_self.write_response([b'&']).await?; mv_self.write_response(len.to_string().into_bytes()).await?; - mv_self.write_response(&[b'\n'][..]).await?; + mv_self.write_response([b'\n']).await?; Ok(()) }; ret diff --git a/server/src/queryengine/mod.rs b/server/src/queryengine/mod.rs index 69307aef..eecc929f 100644 --- a/server/src/queryengine/mod.rs +++ b/server/src/queryengine/mod.rs @@ -93,7 +93,8 @@ where USET => actions::uset::uset, KEYLEN => actions::keylen::keylen, MKSNAP => admin::mksnap::mksnap, - LSKEYS => actions::lskeys::lskeys + LSKEYS => actions::lskeys::lskeys, + POP => actions::pop::pop ); Ok(()) } diff --git a/server/src/resp/mod.rs b/server/src/resp/mod.rs index 267bd591..154986ee 100644 --- a/server/src/resp/mod.rs +++ b/server/src/resp/mod.rs @@ -101,6 +101,15 @@ impl Writable for Vec { } } +impl Writable for [u8; N] { + fn write<'s>( + self, + con: &'s mut impl IsConnection, + ) -> Pin> + Send + Sync + 's)>> { + Box::pin(async move { con.write_lowlevel(&self).await }) + } +} + impl Writable for &'static [u8] { fn write<'s>( self,