Add `pop` action

next
Sayan Nandan 3 years ago
parent 8bed8c9c2a
commit 57c957d4e7

@ -41,6 +41,7 @@ pub mod lskeys;
pub mod mget;
pub mod mset;
pub mod mupdate;
pub mod pop;
pub mod set;
pub mod strong;
pub mod update;

@ -0,0 +1,61 @@
/*
* Created on Mon Jun 14 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::coredb;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
use crate::queryengine::ActionIter;
use crate::resp::BytesWrapper;
/// Run a POP action
pub async fn pop<T, Strm>(
handle: &coredb::CoreDB,
con: &mut T,
act: ActionIter,
) -> std::io::Result<()>
where
T: ProtocolConnectionExt<Strm>,
Strm: AsyncReadExt + AsyncWriteExt + Unpin + Send + Sync,
{
crate::err_if_len_is!(act, con, eq 0);
if handle.is_poisoned() {
// don't begin the operation at all if the database is poisoned
return con.write_response(&**responses::groups::SERVER_ERR).await;
}
con.write_array_length(act.len()).await?;
for key in act {
if handle.is_poisoned() {
// we keep this check just in case the server fails in-between running a
// pop operation
con.write_response(&**responses::groups::SERVER_ERR).await?;
} else if let Some((_key, val)) = handle.get_ref().remove(key.as_bytes()) {
con.write_response(BytesWrapper(val.into_inner())).await?;
} else {
con.write_response(&**responses::groups::NIL).await?;
}
}
Ok(())
}

@ -436,6 +436,9 @@ impl Data {
pub const fn get_blob(&self) -> &Bytes {
&self.blob
}
pub fn into_inner(self) -> Bytes {
self.blob
}
}
impl Eq for Data {}

@ -190,7 +190,7 @@ where
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&SIMPLE_QUERY_HEADER[..]).await?;
mv_self.write_response(SIMPLE_QUERY_HEADER).await?;
Ok(())
};
ret
@ -208,9 +208,9 @@ where
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&[b'_'][..]).await?;
mv_self.write_response([b'_']).await?;
mv_self.write_response(len.to_string().into_bytes()).await?;
mv_self.write_response(&[b'\n'][..]).await?;
mv_self.write_response([b'\n']).await?;
Ok(())
};
ret
@ -228,9 +228,9 @@ where
Box::pin(async move {
let mv_self = self;
let ret: IoResult<()> = {
mv_self.write_response(&[b'&'][..]).await?;
mv_self.write_response([b'&']).await?;
mv_self.write_response(len.to_string().into_bytes()).await?;
mv_self.write_response(&[b'\n'][..]).await?;
mv_self.write_response([b'\n']).await?;
Ok(())
};
ret

@ -93,7 +93,8 @@ where
USET => actions::uset::uset,
KEYLEN => actions::keylen::keylen,
MKSNAP => admin::mksnap::mksnap,
LSKEYS => actions::lskeys::lskeys
LSKEYS => actions::lskeys::lskeys,
POP => actions::pop::pop
);
Ok(())
}

@ -101,6 +101,15 @@ impl Writable for Vec<u8> {
}
}
impl<const N: usize> Writable for [u8; N] {
fn write<'s>(
self,
con: &'s mut impl IsConnection,
) -> Pin<Box<(dyn Future<Output = Result<(), IoError>> + Send + Sync + 's)>> {
Box::pin(async move { con.write_lowlevel(&self).await })
}
}
impl Writable for &'static [u8] {
fn write<'s>(
self,

Loading…
Cancel
Save