From c2a20d4476dd5b1f5de173a36018f383cb009897 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 16 May 2021 20:31:05 +0530 Subject: [PATCH 1/3] Implement serialize/deserialize for `HTable` --- server/src/coredb/htable.rs | 98 ++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/server/src/coredb/htable.rs b/server/src/coredb/htable.rs index f71af8dc..1034b967 100644 --- a/server/src/coredb/htable.rs +++ b/server/src/coredb/htable.rs @@ -24,16 +24,18 @@ * */ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::borrow::Borrow; pub use std::collections::hash_map::Entry; use std::collections::hash_map::Keys; use std::collections::hash_map::Values; use std::collections::HashMap; +use std::fmt; use std::hash::Hash; use std::iter::FromIterator; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct HTable where K: Eq + Hash, @@ -112,3 +114,97 @@ where } } } + +/// A wrapper for `Bytes` +#[derive(Debug, PartialEq, Clone)] +pub struct Data { + /// The blob of data + blob: Bytes, +} + +impl Data { + /// Create a new blob from a string + pub fn from_string(val: String) -> Self { + Data { + blob: Bytes::from(val.into_bytes()), + } + } + /// Create a new blob from an existing `Bytes` instance + pub const fn from_blob(blob: Bytes) -> Self { + Data { blob } + } + /// Get the inner blob (raw `Bytes`) + pub const fn get_blob(&self) -> &Bytes { + &self.blob + } + /// Get the inner blob as an `u8` slice (coerced) + pub fn get_inner_ref(&self) -> &[u8] { + &self.blob + } +} + +impl Eq for Data {} +impl Hash for Data { + fn hash(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { + self.blob.hash(hasher) + } +} + +use serde::ser::{SerializeSeq, Serializer}; + +impl Serialize for Data { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.blob.len()))?; + for e in self.blob.iter() { + seq.serialize_element(e)?; + } + seq.end() + } +} + +use serde::de::{Deserializer, SeqAccess, Visitor}; + +struct DataVisitor; +impl<'de> Visitor<'de> for DataVisitor { + type Value = Data; + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting a coredb::htable::Data object") + } + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut bytes = Vec::new(); + while let Some(unsigned_8bit_int) = seq.next_element()? 
{ + bytes.push(unsigned_8bit_int); + } + Ok(Data::from_blob(Bytes::from(bytes))) + } +} + +impl<'de> Deserialize<'de> for Data { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_seq(DataVisitor) + } +} + +#[test] +fn test_de() { + let mut x: HTable = HTable::new(); + x.insert( + String::from("Sayan"), + Data::from_string("is writing open-source code".to_owned()), + ); + let ser = bincode::serialize(&x).unwrap(); + let de: HTable = bincode::deserialize(&ser).unwrap(); + assert_eq!(de, x); +} From 975e953426593e0c97e80e0a158cae61e6950a96 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 16 May 2021 21:13:56 +0530 Subject: [PATCH 2/3] Add compat module for upgrading old files --- server/src/cli.yml | 3 + server/src/compat/mod.rs | 130 +++++++++++++++++++++++++++++++ server/src/config/mod.rs | 11 +++ server/src/coredb/htable.rs | 11 +++ server/src/coredb/mod.rs | 30 +------ server/src/diskstore/mod.rs | 84 ++------------------ server/src/diskstore/snapshot.rs | 3 +- server/src/main.rs | 1 + 8 files changed, 163 insertions(+), 110 deletions(-) create mode 100644 server/src/compat/mod.rs diff --git a/server/src/cli.yml b/server/src/cli.yml index 107c652c..2b2b981d 100644 --- a/server/src/cli.yml +++ b/server/src/cli.yml @@ -79,3 +79,6 @@ args: long: sslonly takes_value: false help: Tells the server to only accept SSL connections and disables the non-SSL port +subcommands: + - upgrade: + about: Upgrades old datsets to the latest format supported by this server edition \ No newline at end of file diff --git a/server/src/compat/mod.rs b/server/src/compat/mod.rs new file mode 100644 index 00000000..ef1e6bf1 --- /dev/null +++ b/server/src/compat/mod.rs @@ -0,0 +1,130 @@ +/* + * Created on Sun May 16 2021 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2021, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! Compatibility suite for Skytable +//! +//! This module will enable users from an earlier version of Skytable to migrate their data to match +//! 
the latest format + +use crate::coredb::{htable::HTable, Data}; +use crate::diskstore::snapshot::SNAP_MATCH; +use bytes::Bytes; +use libsky::TResult; +use std::collections::HashMap; +use std::fs; +use std::io::Write; +use std::iter::FromIterator; +use std::path::PathBuf; + +/// The disk storage type since 0.3.1 +type DiskStoreType = (Vec, Vec>); + +const SKY_UPGRADE_FOLDER: &str = "newdata"; +const SKY_COMPLETE_UPGRADE_FOLDER: &str = "newdata/snapshots/remote"; + +pub fn concat_path(other: impl Into) -> PathBuf { + let mut path = PathBuf::from(SKY_UPGRADE_FOLDER); + path.push(other.into()); + path +} + +pub fn upgrade() -> TResult<()> { + fs::create_dir_all(SKY_COMPLETE_UPGRADE_FOLDER)?; + // first attempt to upgrade the data file + log::info!("Upgrading data file"); + upgrade_file("data/data.bin", concat_path("data.bin")) + .map_err(|e| format!("Failed to upgrade data.bin file with error: {}", e))?; + log::info!("Finished upgrading data file"); + // now let's check what files are there in the snapshots directory + log::info!("Upgrading snapshots"); + let snapshot_dir = fs::read_dir("data/snapshots")?; + for path in snapshot_dir { + let path = path?.path(); + if path.is_dir() && path != PathBuf::from("data/snapshots/remote") { + return Err("The snapshot directory contains unrecognized files".into()); + } + if path.is_file() { + let fname = path + .file_name() + .ok_or("Failed to get path name in snapshot directory")? + .to_string_lossy(); + if !SNAP_MATCH.is_match(&fname) { + return Err("The snapshot directory contains unexpected files".into()); + } + upgrade_file(path.clone(), concat_path(format!("snapshots/{}", fname)))?; + } + } + log::info!("Finished upgrading snapshots"); + log::info!("Upgrading remote snapshots"); + let remote_snapshot_dir = fs::read_dir("data/snapshots/remote")?; + for path in remote_snapshot_dir { + let path = path?.path(); + if path.is_file() { + let fname = path + .file_name() + .ok_or("Failed to get filename in remote snapshot directory")? + .to_string_lossy(); + upgrade_file( + path.clone(), + concat_path(format!("snapshots/remote/{}", fname)), + )?; + } else { + return Err("Unexpected files in the remote snapshot directory".into()); + } + } + log::info!("Finished upgrading remote snapshots"); + log::info!("All files were upgraded. 
Updating directories"); + fs::rename("data", "olddata")?; + log::info!("Moved old data into folder 'olddata'"); + fs::rename(SKY_UPGRADE_FOLDER, "data")?; + log::info!("Successfully finished upgrade"); + Ok(()) +} + +fn upgrade_file(src: impl Into, destination: impl Into) -> TResult<()> { + let file = src.into(); + log::info!("Upgrading file: {}", file.to_string_lossy()); + let old_data_file = fs::read(&file)?; + let data_from_old_file: DiskStoreType = bincode::deserialize(&old_data_file)?; + let data_from_old_file: HashMap = HashMap::from_iter( + data_from_old_file + .0 + .into_iter() + .zip(data_from_old_file.1.into_iter()) + .map(|(key, value)| (key, Data::from_blob(Bytes::from(value)))), + ); + let data_in_new_format: HTable = HTable::from_iter( + data_from_old_file + .into_iter() + .map(|(key, value)| (Data::from_string(key), value)), + ); + let data_in_new_format = bincode::serialize(&data_in_new_format)?; + let destination = destination.into(); + let mut file = fs::File::create(&destination)?; + log::info!("Writing upgraded file to {}", destination.to_string_lossy()); + file.write_all(&data_in_new_format)?; + Ok(()) +} diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index f7207167..090f454c 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -26,6 +26,7 @@ //! This module provides tools to handle configuration files and settings +use crate::compat; #[cfg(test)] use libsky::TResult; use serde::Deserialize; @@ -35,6 +36,7 @@ use std::fs; #[cfg(test)] use std::net::Ipv6Addr; use std::net::{IpAddr, Ipv4Addr}; +use std::process; use toml; const DEFAULT_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); @@ -408,6 +410,15 @@ impl fmt::Display for ConfigError { pub fn get_config_file_or_return_cfg() -> Result, ConfigError> { let cfg_layout = load_yaml!("../cli.yml"); let matches = App::from_yaml(cfg_layout).get_matches(); + // check upgrades + if matches.subcommand_matches("upgrade").is_some() { + if let Err(e) = compat::upgrade() { + log::error!("Dataset upgrade failed with error: {}", e); + process::exit(0x100); + } else { + process::exit(0x000); + } + } let restorefile = matches.value_of("restore").map(|v| v.to_string()); // Check flags let sslonly = matches.is_present("sslonly"); diff --git a/server/src/coredb/htable.rs b/server/src/coredb/htable.rs index 1034b967..ad67d332 100644 --- a/server/src/coredb/htable.rs +++ b/server/src/coredb/htable.rs @@ -153,6 +153,17 @@ impl Hash for Data { } } +impl From for Data +where + T: Into, +{ + fn from(dat: T) -> Self { + Self { + blob: Bytes::from(dat.into()), + } + } +} + use serde::ser::{SerializeSeq, Serializer}; impl Serialize for Data { diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index 65a52361..e5f8e7df 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -33,7 +33,7 @@ use crate::dbnet::connection::prelude::*; use crate::diskstore; use crate::protocol::Query; use crate::queryengine; -use bytes::Bytes; +pub use htable::Data; use libsky::TResult; use parking_lot::RwLock; use parking_lot::RwLockReadGuard; @@ -126,34 +126,6 @@ impl Coretable { } } -/// A wrapper for `Bytes` -#[derive(Debug, PartialEq, Clone)] -pub struct Data { - /// The blob of data - blob: Bytes, -} - -impl Data { - /// Create a new blob from a string - pub fn from_string(val: String) -> Self { - Data { - blob: Bytes::from(val.into_bytes()), - } - } - /// Create a new blob from an existing `Bytes` instance - pub const fn from_blob(blob: Bytes) -> Self { - Data { blob } - } - /// Get the inner 
blob (raw `Bytes`) - pub const fn get_blob(&self) -> &Bytes { - &self.blob - } - /// Get the inner blob as an `u8` slice (coerced) - pub fn get_inner_ref(&self) -> &[u8] { - &self.blob - } -} - impl CoreDB { #[cfg(debug_assertions)] #[allow(dead_code)] // This has been kept for debugging purposes, so we'll suppress this lint diff --git a/server/src/diskstore/mod.rs b/server/src/diskstore/mod.rs index a8c98a5e..8c3d7d49 100644 --- a/server/src/diskstore/mod.rs +++ b/server/src/diskstore/mod.rs @@ -28,26 +28,18 @@ use crate::coredb::htable::HTable; use crate::coredb::Data; -use crate::diskstore::snapshot::{DIR_OLD_SNAPSHOT, DIR_SNAPSHOT}; +use crate::diskstore::snapshot::DIR_SNAPSHOT; use bincode; -use bytes::Bytes; use libsky::TResult; use std::fs; use std::io::{ErrorKind, Write}; -use std::iter::FromIterator; use std::path::PathBuf; pub mod flock; pub mod snapshot; mod snapstore; -/// This type alias is to be used when deserializing binary data from disk -type DiskStoreFromDisk = (Vec, Vec>); -/// This type alias is to be used when serializing data from the in-memory table -/// onto disk -type DiskStoreFromMemory<'a> = (Vec<&'a String>, Vec<&'a [u8]>); lazy_static::lazy_static! { pub static ref PERSIST_FILE: PathBuf = PathBuf::from("./data/data.bin"); - pub static ref OLD_PATH: PathBuf = PathBuf::from("./data.bin"); } fn get_snapshot(path: String) -> TResult>> { @@ -56,33 +48,7 @@ fn get_snapshot(path: String) -> TResult>> { snap_location.push(&path); let file = match fs::read(snap_location) { Ok(f) => f, - Err(e) => match e.kind() { - ErrorKind::NotFound => { - // Probably the old snapshot directory? - let mut old_snaploc = PathBuf::from(DIR_OLD_SNAPSHOT); - old_snaploc.push(path); - match fs::read(old_snaploc) { - Ok(f) => { - log::warn!("The new snapshot directory is under the data directory"); - if let Err(e) = fs::rename(DIR_OLD_SNAPSHOT, DIR_SNAPSHOT) { - log::error!( - "Failed to migrate snapshot directory into new structure: {}", - e - ); - return Err(e.into()); - } else { - log::info!( - "Migrated old snapshot directory structure to newer structure" - ); - log::warn!("This backwards compat will be removed in the future"); - } - f - } - _ => return Err(e.into()), - } - } - _ => return Err(e.into()), - }, + Err(e) => return Err(e.into()), }; let parsed = deserialize(file)?; Ok(Some(parsed)) @@ -98,33 +64,7 @@ pub fn get_saved(path: Option) -> TResult>> Ok(f) => f, Err(e) => match e.kind() { ErrorKind::NotFound => { - // TODO(@ohsayan): Drop support for this in the future - // This might be an old installation still not using the data/data.bin path - match fs::read(OLD_PATH.to_path_buf()) { - Ok(f) => { - log::warn!("Your data file was found to be in the current directory and not in data/data.bin"); - if let Err(e) = fs::rename("data.bin", "data/data.bin") { - log::error!("Failed to move data.bin into data/data.bin directory. 
Consider moving it manually"); - return Err(format!( - "Failed to move data.bin into data/data.bin: {}", - e - ) - .into()); - } else { - log::info!("The data file has been moved into the new directory"); - log::warn!("This backwards compat directory support will be removed in the future"); - } - f - } - Err(e) => match e.kind() { - ErrorKind::NotFound => return Ok(None), - _ => { - return Err( - format!("Coudln't read flushed data from disk: {}", e).into() - ) - } - }, - } + return Ok(None); } _ => return Err(format!("Couldn't read flushed data from disk: {}", e).into()), }, @@ -139,17 +79,7 @@ pub fn test_deserialize(file: Vec) -> TResult> { deserialize(file) } fn deserialize(file: Vec) -> TResult> { - let parsed: DiskStoreFromDisk = bincode::deserialize(&file)?; - let parsed: HTable = HTable::from_iter( - parsed - .0 - .into_iter() - .zip(parsed.1.into_iter()) - .map(|(key, value)| { - let data = Data::from_blob(Bytes::from(value)); - (key, data) - }), - ); + let parsed = bincode::deserialize(&file)?; Ok(parsed) } @@ -171,10 +101,6 @@ pub fn write_to_disk(file: &PathBuf, data: &HTable) -> TResult<()> } fn serialize(data: &HTable) -> TResult> { - let ds: DiskStoreFromMemory = ( - data.keys().into_iter().collect(), - data.values().map(|val| val.get_inner_ref()).collect(), - ); - let encoded = bincode::serialize(&ds)?; + let encoded = bincode::serialize(&data)?; Ok(encoded) } diff --git a/server/src/diskstore/snapshot.rs b/server/src/diskstore/snapshot.rs index 9c02fc0b..750fa1c4 100644 --- a/server/src/diskstore/snapshot.rs +++ b/server/src/diskstore/snapshot.rs @@ -44,7 +44,7 @@ lazy_static::lazy_static! { /// ```text /// YYYYMMDD-HHMMSS.snapshot /// ``` - static ref SNAP_MATCH: Regex = Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)(.snapshot)$").unwrap(); + pub static ref SNAP_MATCH: Regex = Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)(.snapshot)$").unwrap(); /// The directory for remote snapshots pub static ref DIR_REMOTE_SNAPSHOT: PathBuf = PathBuf::from("./data/snapshots/remote"); } @@ -53,7 +53,6 @@ lazy_static::lazy_static! 
{ /// /// This is currently a `snapshot` directory under the current directory pub const DIR_SNAPSHOT: &'static str = "data/snapshots"; -pub const DIR_OLD_SNAPSHOT: &'static str = "snapshots"; /// The default snapshot count is 12, assuming that the user would take a snapshot /// every 2 hours (or 7200 seconds) const DEF_SNAPSHOT_COUNT: usize = 12; diff --git a/server/src/main.rs b/server/src/main.rs index a33c8291..5fb8c9cd 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -46,6 +46,7 @@ mod queryengine; mod resp; use coredb::CoreDB; use dbnet::run; +mod compat; use env_logger::*; use libsky::util::terminal; use std::sync::Arc; From 449da56308f239e33c044e03bd852a1fd469049b Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 17 May 2021 08:34:15 +0530 Subject: [PATCH 3/3] Upgrade all interfaces to use new in-memory table --- server/src/coredb/htable.rs | 24 +++++++++++++++++++++++- server/src/coredb/mod.rs | 10 +++++----- server/src/diskstore/mod.rs | 14 +++++++------- server/src/diskstore/snapshot.rs | 2 +- server/src/kvengine/del.rs | 2 +- server/src/kvengine/exists.rs | 2 +- server/src/kvengine/get.rs | 2 +- server/src/kvengine/keylen.rs | 2 +- server/src/kvengine/mget.rs | 2 +- server/src/kvengine/mset.rs | 3 ++- server/src/kvengine/mupdate.rs | 3 ++- server/src/kvengine/set.rs | 12 +++++++----- server/src/kvengine/strong.rs | 18 ++++++++++++------ server/src/kvengine/update.rs | 12 +++++++----- server/src/kvengine/uset.rs | 4 ++-- server/src/tests/mod.rs | 18 +++++++++++------- 16 files changed, 84 insertions(+), 46 deletions(-) diff --git a/server/src/coredb/htable.rs b/server/src/coredb/htable.rs index ad67d332..d2e0f7ca 100644 --- a/server/src/coredb/htable.rs +++ b/server/src/coredb/htable.rs @@ -34,6 +34,7 @@ use std::collections::HashMap; use std::fmt; use std::hash::Hash; use std::iter::FromIterator; +use std::ops::Deref; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct HTable @@ -92,7 +93,6 @@ where self.inner.values() } } - impl IntoIterator for HTable { type Item = (K, V); type IntoIter = std::collections::hash_map::IntoIter; @@ -101,6 +101,25 @@ impl IntoIterator for HTable { } } +impl Deref for Data { + type Target = [u8]; + fn deref(&self) -> &::Target { + &self.blob + } +} + +impl Borrow<[u8]> for Data { + fn borrow(&self) -> &[u8] { + &self.blob.borrow() + } +} + +impl AsRef<[u8]> for Data { + fn as_ref(&self) -> &[u8] { + &self.blob + } +} + impl FromIterator<(K, V)> for HTable where K: Eq + Hash, @@ -218,4 +237,7 @@ fn test_de() { let ser = bincode::serialize(&x).unwrap(); let de: HTable = bincode::deserialize(&ser).unwrap(); assert_eq!(de, x); + let mut hmap: HTable = HTable::new(); + hmap.insert(Data::from("sayan"), Data::from("writes code")); + assert!(hmap.get("sayan".as_bytes()).is_some()); } diff --git a/server/src/coredb/mod.rs b/server/src/coredb/mod.rs index e5f8e7df..7c309839 100644 --- a/server/src/coredb/mod.rs +++ b/server/src/coredb/mod.rs @@ -107,7 +107,7 @@ pub struct Shared { #[derive(Debug)] pub struct Coretable { /// The core table contain key-value pairs - coremap: HTable, + coremap: HTable, /// Whether the database is poisoned or not /// /// If the database is poisoned -> the database can no longer accept writes @@ -117,11 +117,11 @@ pub struct Coretable { impl Coretable { /// Get a reference to the inner `HTable` - pub const fn get_ref<'a>(&'a self) -> &'a HTable { + pub const fn get_ref<'a>(&'a self) -> &'a HTable { &self.coremap } /// Get a **mutable** reference to the inner `HTable` - pub fn get_mut_ref<'a>(&'a 
mut self) -> &'a mut HTable { + pub fn get_mut_ref<'a>(&'a mut self) -> &'a mut HTable { &mut self.coremap } } @@ -217,7 +217,7 @@ impl CoreDB { CoreDB { shared: Arc::new(Shared { table: RwLock::new(Coretable { - coremap: HTable::::new(), + coremap: HTable::::new(), poisoned: false, }), }), @@ -248,7 +248,7 @@ impl CoreDB { /// **⚠ Do note**: This is super inefficient since it performs an actual /// clone of the `HTable` and doesn't do any `Arc`-business! This function /// can be used by test functions and the server, but **use with caution!** - pub fn get_htable_deep_clone(&self) -> HTable { + pub fn get_htable_deep_clone(&self) -> HTable { (*self.acquire_read().get_ref()).clone() } } diff --git a/server/src/diskstore/mod.rs b/server/src/diskstore/mod.rs index 8c3d7d49..a698ac8b 100644 --- a/server/src/diskstore/mod.rs +++ b/server/src/diskstore/mod.rs @@ -42,7 +42,7 @@ lazy_static::lazy_static! { pub static ref PERSIST_FILE: PathBuf = PathBuf::from("./data/data.bin"); } -fn get_snapshot(path: String) -> TResult>> { +fn get_snapshot(path: String) -> TResult>> { // the path just has the snapshot name, let's improve that let mut snap_location = PathBuf::from(DIR_SNAPSHOT); snap_location.push(&path); @@ -56,7 +56,7 @@ fn get_snapshot(path: String) -> TResult>> { /// Try to get the saved data from disk. This returns `None`, if the `data/data.bin` wasn't found /// otherwise the `data/data.bin` file is deserialized and parsed into a `HTable` -pub fn get_saved(path: Option) -> TResult>> { +pub fn get_saved(path: Option) -> TResult>> { if let Some(path) = path { get_snapshot(path) } else { @@ -75,10 +75,10 @@ pub fn get_saved(path: Option) -> TResult>> } #[cfg(test)] -pub fn test_deserialize(file: Vec) -> TResult> { +pub fn test_deserialize(file: Vec) -> TResult> { deserialize(file) } -fn deserialize(file: Vec) -> TResult> { +fn deserialize(file: Vec) -> TResult> { let parsed = bincode::deserialize(&file)?; Ok(parsed) } @@ -87,20 +87,20 @@ fn deserialize(file: Vec) -> TResult> { /// /// This functions takes the entire in-memory table and writes it to the disk, /// more specifically, the `data/data.bin` file -pub fn flush_data(file: &mut flock::FileLock, data: &HTable) -> TResult<()> { +pub fn flush_data(file: &mut flock::FileLock, data: &HTable) -> TResult<()> { let encoded = serialize(&data)?; file.write(&encoded)?; Ok(()) } -pub fn write_to_disk(file: &PathBuf, data: &HTable) -> TResult<()> { +pub fn write_to_disk(file: &PathBuf, data: &HTable) -> TResult<()> { let mut file = fs::File::create(&file)?; let encoded = serialize(&data)?; file.write_all(&encoded)?; Ok(()) } -fn serialize(data: &HTable) -> TResult> { +fn serialize(data: &HTable) -> TResult> { let encoded = bincode::serialize(&data)?; Ok(encoded) } diff --git a/server/src/diskstore/snapshot.rs b/server/src/diskstore/snapshot.rs index 750fa1c4..b89ca929 100644 --- a/server/src/diskstore/snapshot.rs +++ b/server/src/diskstore/snapshot.rs @@ -336,7 +336,7 @@ fn test_snapshot() { let db = CoreDB::new_empty(std::sync::Arc::new(Some(SnapshotStatus::new(4)))); let mut write = db.acquire_write().unwrap(); let _ = write.get_mut_ref().insert( - String::from("ohhey"), + crate::coredb::Data::from(String::from("ohhey")), crate::coredb::Data::from_string(String::from("heya!")), ); drop(write); diff --git a/server/src/kvengine/del.rs b/server/src/kvengine/del.rs index a2675360..325078bd 100644 --- a/server/src/kvengine/del.rs +++ b/server/src/kvengine/del.rs @@ -50,7 +50,7 @@ where let mut many = 0; let cmap = (*whandle).get_mut_ref(); 
act.into_iter().skip(1).for_each(|key| { - if cmap.remove(&key).is_some() { + if cmap.remove(key.as_bytes()).is_some() { many += 1 } }); diff --git a/server/src/kvengine/exists.rs b/server/src/kvengine/exists.rs index 18ef591e..cfd1e37b 100644 --- a/server/src/kvengine/exists.rs +++ b/server/src/kvengine/exists.rs @@ -45,7 +45,7 @@ where let rhandle = handle.acquire_read(); let cmap = rhandle.get_ref(); act.into_iter().skip(1).for_each(|key| { - if cmap.contains_key(&key) { + if cmap.contains_key(key.as_bytes()) { how_many_of_them_exist += 1; } }); diff --git a/server/src/kvengine/get.rs b/server/src/kvengine/get.rs index c1667671..3d2769b5 100644 --- a/server/src/kvengine/get.rs +++ b/server/src/kvengine/get.rs @@ -50,7 +50,7 @@ where // UNSAFE(@ohsayan): act.get_ref().get_unchecked() is safe because we've already if the action // group contains one argument (excluding the action itself) reader - .get(act.get_unchecked(1)) + .get(act.get_unchecked(1).as_bytes()) .map(|b| b.get_blob().clone()) } }; diff --git a/server/src/kvengine/keylen.rs b/server/src/kvengine/keylen.rs index 5b627bce..e16b4aca 100644 --- a/server/src/kvengine/keylen.rs +++ b/server/src/kvengine/keylen.rs @@ -47,7 +47,7 @@ where // UNSAFE(@ohsayan): get_unchecked() is completely safe as we've already checked // the number of arguments is one reader - .get(act.get_unchecked(1)) + .get(act.get_unchecked(1).as_bytes()) .map(|b| b.get_blob().len()) } }; diff --git a/server/src/kvengine/mget.rs b/server/src/kvengine/mget.rs index e563f68e..3855222f 100644 --- a/server/src/kvengine/mget.rs +++ b/server/src/kvengine/mget.rs @@ -47,7 +47,7 @@ where let res: Option = { let rhandle = handle.acquire_read(); let reader = rhandle.get_ref(); - reader.get(&key).map(|b| b.get_blob().clone()) + reader.get(key.as_bytes()).map(|b| b.get_blob().clone()) }; if let Some(value) = res { // Good, we got the value, write it off to the stream diff --git a/server/src/kvengine/mset.rs b/server/src/kvengine/mset.rs index aa0af001..508cfc36 100644 --- a/server/src/kvengine/mset.rs +++ b/server/src/kvengine/mset.rs @@ -26,6 +26,7 @@ use crate::coredb; use crate::coredb::htable::Entry; +use crate::coredb::Data; use crate::dbnet::connection::prelude::*; use crate::protocol::responses; @@ -53,7 +54,7 @@ where let writer = whandle.get_mut_ref(); let mut didmany = 0; while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) { - if let Entry::Vacant(v) = writer.entry(key) { + if let Entry::Vacant(v) = writer.entry(Data::from(key)) { let _ = v.insert(coredb::Data::from_string(val)); didmany += 1; } diff --git a/server/src/kvengine/mupdate.rs b/server/src/kvengine/mupdate.rs index d2018db9..d7f3a999 100644 --- a/server/src/kvengine/mupdate.rs +++ b/server/src/kvengine/mupdate.rs @@ -26,6 +26,7 @@ use crate::coredb; use crate::coredb::htable::Entry; +use crate::coredb::Data; use crate::dbnet::connection::prelude::*; use crate::protocol::responses; @@ -53,7 +54,7 @@ where let writer = whandle.get_mut_ref(); let mut didmany = 0; while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) { - if let Entry::Occupied(mut v) = writer.entry(key) { + if let Entry::Occupied(mut v) = writer.entry(Data::from(key)) { let _ = v.insert(coredb::Data::from_string(val)); didmany += 1; } diff --git a/server/src/kvengine/set.rs b/server/src/kvengine/set.rs index 20efc8f8..ee9485db 100644 --- a/server/src/kvengine/set.rs +++ b/server/src/kvengine/set.rs @@ -53,11 +53,13 @@ where let did_we = { if let Some(mut writer) = handle.acquire_write() { let writer = 
writer.get_mut_ref(); - if let Entry::Vacant(e) = writer.entry(it.next().unwrap_or_else(|| unsafe { - // UNSAFE(@ohsayan): This is completely safe as we've already checked - // that there are exactly 2 arguments - unreachable_unchecked() - })) { + if let Entry::Vacant(e) = + writer.entry(Data::from(it.next().unwrap_or_else(|| unsafe { + // UNSAFE(@ohsayan): This is completely safe as we've already checked + // that there are exactly 2 arguments + unreachable_unchecked() + }))) + { e.insert(Data::from_string(it.next().unwrap_or_else(|| unsafe { // UNSAFE(@ohsayan): This is completely safe as we've already checked // that there are exactly 2 arguments diff --git a/server/src/kvengine/strong.rs b/server/src/kvengine/strong.rs index f9a058fb..080e1591 100644 --- a/server/src/kvengine/strong.rs +++ b/server/src/kvengine/strong.rs @@ -76,7 +76,7 @@ where if let Some(mut whandle) = handle.acquire_write() { let mut_table = whandle.get_mut_ref(); while let Some(key) = key_iter.next() { - if mut_table.contains_key(key.as_str()) { + if mut_table.contains_key(key.as_bytes()) { // With one of the keys existing - this action can't clearly be done // So we'll set `failed` to true and ensure that we check this while // writing a response back to the client @@ -92,7 +92,10 @@ where // So we can safely set the keys let mut iter = act.into_iter().skip(1); while let (Some(key), Some(value)) = (iter.next(), iter.next()) { - if mut_table.insert(key, Data::from_string(value)).is_some() { + if mut_table + .insert(Data::from(key), Data::from_string(value)) + .is_some() + { // Tell the compiler that this will never be the case unsafe { // UNSAFE(@ohsayan): As none of the keys exist in the table, no @@ -151,7 +154,7 @@ where if let Some(mut whandle) = handle.acquire_write() { let mut_table = whandle.get_mut_ref(); while let Some(key) = key_iter.next() { - if !mut_table.contains_key(key.as_str()) { + if !mut_table.contains_key(key.as_bytes()) { // With one of the keys not existing - this action can't clearly be done // So we'll set `failed` to true and ensure that we check this while // writing a response back to the client @@ -169,7 +172,7 @@ where act.into_iter().skip(1).for_each(|key| { // Since we've already checked that the keys don't exist // We'll tell the compiler to optimize this - let _ = mut_table.remove(&key).unwrap_or_else(|| unsafe { + let _ = mut_table.remove(key.as_bytes()).unwrap_or_else(|| unsafe { // UNSAFE(@ohsayan): Since all the values exist, all of them will return // some value. Hence, this branch won't ever be reached. Hence, this is safe. 
unreachable_unchecked() @@ -223,7 +226,7 @@ where if let Some(mut whandle) = handle.acquire_write() { let mut_table = whandle.get_mut_ref(); while let Some(key) = key_iter.next() { - if !mut_table.contains_key(key.as_str()) { + if !mut_table.contains_key(key.as_bytes()) { // With one of the keys failing to exist - this action can't clearly be done // So we'll set `failed` to true and ensure that we check this while // writing a response back to the client @@ -245,7 +248,10 @@ where // So we can safely update the keys let mut iter = act.into_iter().skip(1); while let (Some(key), Some(value)) = (iter.next(), iter.next()) { - if mut_table.insert(key, Data::from_string(value)).is_none() { + if mut_table + .insert(Data::from(key), Data::from_string(value)) + .is_none() + { // Tell the compiler that this will never be the case unsafe { unreachable_unchecked() } } diff --git a/server/src/kvengine/update.rs b/server/src/kvengine/update.rs index 3149e4c5..02e46a5e 100644 --- a/server/src/kvengine/update.rs +++ b/server/src/kvengine/update.rs @@ -53,11 +53,13 @@ where let did_we = { if let Some(mut whandle) = handle.acquire_write() { let writer = whandle.get_mut_ref(); - if let Entry::Occupied(mut e) = writer.entry(it.next().unwrap_or_else(|| unsafe { - // UNSAFE(@ohsayan): We've already checked that the action contains exactly - // two arguments (excluding the action itself). So, this branch won't ever be reached - unreachable_unchecked() - })) { + if let Entry::Occupied(mut e) = + writer.entry(Data::from(it.next().unwrap_or_else(|| unsafe { + // UNSAFE(@ohsayan): We've already checked that the action contains exactly + // two arguments (excluding the action itself). So, this branch won't ever be reached + unreachable_unchecked() + }))) + { e.insert(Data::from_string(it.next().unwrap_or_else(|| unsafe { // UNSAFE(@ohsayan): We've already checked that the action contains exactly // two arguments (excluding the action itself). 
So, this branch won't ever be reached diff --git a/server/src/kvengine/uset.rs b/server/src/kvengine/uset.rs index 75fe1c90..6f6f737a 100644 --- a/server/src/kvengine/uset.rs +++ b/server/src/kvengine/uset.rs @@ -24,7 +24,7 @@ * */ -use crate::coredb::{self}; +use crate::coredb::Data; use crate::dbnet::connection::prelude::*; use crate::protocol::responses; @@ -52,7 +52,7 @@ where if let Some(mut whandle) = handle.acquire_write() { let writer = whandle.get_mut_ref(); while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) { - let _ = writer.insert(key, coredb::Data::from_string(val)); + let _ = writer.insert(Data::from(key), Data::from(val)); } drop(writer); drop(whandle); diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs index a91dc1a5..efb10f0e 100644 --- a/server/src/tests/mod.rs +++ b/server/src/tests/mod.rs @@ -44,7 +44,7 @@ mod bgsave { // pre-initialize our maps for comparison let mut map_should_be_with_one = HTable::new(); map_should_be_with_one.insert( - String::from("sayan"), + Data::from(String::from("sayan")), Data::from_string("is testing bgsave".to_owned()), ); #[allow(non_snake_case)] @@ -60,19 +60,21 @@ mod bgsave { // sleep for 10 seconds with epsilon 1.5s time::sleep(DUR_WITH_EPSILON).await; // we should get an empty map - let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); + let saved = + diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); assert!(saved.len() == 0); // now let's quickly write some data { datahandle.acquire_write().unwrap().get_mut_ref().insert( - String::from("sayan"), - Data::from_string("is testing bgsave".to_owned()), + Data::from(String::from("sayan")), + Data::from("is testing bgsave".to_owned()), ); } // sleep for 10 seconds with epsilon 1.5s time::sleep(DUR_WITH_EPSILON).await; // we should get a map with the one key - let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); + let saved = + diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); assert_eq!(saved, map_should_be_with_one); // now let's remove all the data { @@ -80,13 +82,15 @@ mod bgsave { } // sleep for 10 seconds with epsilon 1.5s time::sleep(DUR_WITH_EPSILON).await; - let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); + let saved = + diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); assert!(saved.len() == 0); // drop the signal; all waiting tasks can now terminate drop(signal); handle.await.unwrap(); // check the file again after unlocking - let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); + let saved = + diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap(); assert!(saved.len() == 0); fs::remove_file(BGSAVE_DIRECTORY_TESTING_LOC).unwrap(); }
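
Note on the &[u8] lookups introduced in PATCH 3/3 (the `cmap.remove(key.as_bytes())` / `contains_key(key.as_bytes())` call sites): they work because htable.rs now gives `Data` a `Borrow<[u8]>` impl alongside a `Hash` impl that delegates to the inner blob, so the table can be queried with a plain byte slice. The following is a minimal standalone sketch of that mechanism; `Blob` is an illustrative stand-in type, not the actual `coredb::htable::Data`, but it mirrors the invariant the real type relies on: the wrapper must hash exactly like the `[u8]` it borrows as, otherwise slice lookups would silently miss.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

#[derive(Debug, PartialEq, Eq)]
struct Blob(Vec<u8>);

impl Hash for Blob {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash exactly like the borrowed [u8]; this keeps the Hash/Borrow
        // contract that HashMap lookups by &[u8] depend on
        self.0.hash(state)
    }
}

impl Borrow<[u8]> for Blob {
    fn borrow(&self) -> &[u8] {
        &self.0
    }
}

fn main() {
    let mut map: HashMap<Blob, &str> = HashMap::new();
    map.insert(Blob(b"sayan".to_vec()), "writes code");
    // No Blob has to be constructed for the lookup: a &[u8] is enough, which
    // is what allows calls such as `cmap.remove(key.as_bytes())` in the
    // kvengine actions after this patch
    assert_eq!(map.get("sayan".as_bytes()), Some(&"writes code"));
    assert!(map.contains_key("sayan".as_bytes()));
}

This mirrors the assertion added to `test_de` (`hmap.get("sayan".as_bytes()).is_some()`); the same reasoning covers the slice-keyed lookups in del.rs, exists.rs, get.rs, keylen.rs, mget.rs and strong.rs.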