Merge pull request #154 from skytable/htablesd

Implement ser/de for `HTable` and add compat module
next
Sayan 3 years ago committed by GitHub
commit c764459d2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -79,3 +79,6 @@ args:
long: sslonly
takes_value: false
help: Tells the server to only accept SSL connections and disables the non-SSL port
subcommands:
- upgrade:
about: Upgrades old datasets to the latest format supported by this server edition

@ -0,0 +1,130 @@
/*
* Created on Sun May 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! Compatibility suite for Skytable
//!
//! This module will enable users from an earlier version of Skytable to migrate their data to match
//! the latest format
use crate::coredb::{htable::HTable, Data};
use crate::diskstore::snapshot::SNAP_MATCH;
use bytes::Bytes;
use libsky::TResult;
use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::iter::FromIterator;
use std::path::PathBuf;
/// The disk storage type since 0.3.1
type DiskStoreType = (Vec<String>, Vec<Vec<u8>>);
const SKY_UPGRADE_FOLDER: &str = "newdata";
const SKY_COMPLETE_UPGRADE_FOLDER: &str = "newdata/snapshots/remote";
/// Join `other` onto the upgrade staging directory (`SKY_UPGRADE_FOLDER`),
/// returning the full staging path for an upgraded file
pub fn concat_path(other: impl Into<PathBuf>) -> PathBuf {
    let mut staged: PathBuf = PathBuf::from(SKY_UPGRADE_FOLDER);
    staged.push(other.into());
    staged
}
/// Upgrade a pre-0.4 on-disk dataset (data file plus snapshots) to the format
/// used by this server edition.
///
/// Converted files are staged under `newdata/`; the live `data` directory is only
/// swapped out (`data` -> `olddata`, `newdata` -> `data`) after every file has been
/// converted, so a failure part-way through leaves the original data untouched.
pub fn upgrade() -> TResult<()> {
// make sure the full staging tree (including snapshots/remote) exists up front
fs::create_dir_all(SKY_COMPLETE_UPGRADE_FOLDER)?;
// first attempt to upgrade the data file
log::info!("Upgrading data file");
upgrade_file("data/data.bin", concat_path("data.bin"))
.map_err(|e| format!("Failed to upgrade data.bin file with error: {}", e))?;
log::info!("Finished upgrading data file");
// now let's check what files are there in the snapshots directory
log::info!("Upgrading snapshots");
let snapshot_dir = fs::read_dir("data/snapshots")?;
for path in snapshot_dir {
let path = path?.path();
// the only directory allowed here is `remote`; anything else is unrecognized
if path.is_dir() && path != PathBuf::from("data/snapshots/remote") {
return Err("The snapshot directory contains unrecognized files".into());
}
if path.is_file() {
let fname = path
.file_name()
.ok_or("Failed to get path name in snapshot directory")?
.to_string_lossy();
// snapshot files must match the YYYYMMDD-HHMMSS.snapshot naming pattern
if !SNAP_MATCH.is_match(&fname) {
return Err("The snapshot directory contains unexpected files".into());
}
upgrade_file(path.clone(), concat_path(format!("snapshots/{}", fname)))?;
}
}
log::info!("Finished upgrading snapshots");
log::info!("Upgrading remote snapshots");
// remote snapshots may be arbitrarily named, so no pattern check here —
// but subdirectories are not expected
let remote_snapshot_dir = fs::read_dir("data/snapshots/remote")?;
for path in remote_snapshot_dir {
let path = path?.path();
if path.is_file() {
let fname = path
.file_name()
.ok_or("Failed to get filename in remote snapshot directory")?
.to_string_lossy();
upgrade_file(
path.clone(),
concat_path(format!("snapshots/remote/{}", fname)),
)?;
} else {
return Err("Unexpected files in the remote snapshot directory".into());
}
}
log::info!("Finished upgrading remote snapshots");
log::info!("All files were upgraded. Updating directories");
// everything converted: atomically-ish swap the directory trees
fs::rename("data", "olddata")?;
log::info!("Moved old data into folder 'olddata'");
fs::rename(SKY_UPGRADE_FOLDER, "data")?;
log::info!("Successfully finished upgrade");
Ok(())
}
/// Read one legacy data file (`(Vec<String>, Vec<Vec<u8>>)` encoding), convert it
/// into the new `HTable<Data, Data>` format and write the result to `destination`
fn upgrade_file(src: impl Into<PathBuf>, destination: impl Into<PathBuf>) -> TResult<()> {
    let source = src.into();
    log::info!("Upgrading file: {}", source.to_string_lossy());
    // decode the legacy parallel-vector representation
    let raw = fs::read(&source)?;
    let old_store: DiskStoreType = bincode::deserialize(&raw)?;
    // zip keys with values, wrapping each value as a `Data` blob
    let intermediate: HashMap<String, Data> = old_store
        .0
        .into_iter()
        .zip(old_store.1.into_iter())
        .map(|(key, value)| (key, Data::from_blob(Bytes::from(value))))
        .collect();
    // re-key the table with `Data` keys to match the new on-disk format
    let upgraded: HTable<Data, Data> = intermediate
        .into_iter()
        .map(|(key, value)| (Data::from_string(key), value))
        .collect();
    let encoded = bincode::serialize(&upgraded)?;
    let destination = destination.into();
    let mut out = fs::File::create(&destination)?;
    log::info!("Writing upgraded file to {}", destination.to_string_lossy());
    out.write_all(&encoded)?;
    Ok(())
}

@ -26,6 +26,7 @@
//! This module provides tools to handle configuration files and settings
use crate::compat;
#[cfg(test)]
use libsky::TResult;
use serde::Deserialize;
@ -35,6 +36,7 @@ use std::fs;
#[cfg(test)]
use std::net::Ipv6Addr;
use std::net::{IpAddr, Ipv4Addr};
use std::process;
use toml;
const DEFAULT_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
@ -408,6 +410,15 @@ impl fmt::Display for ConfigError {
pub fn get_config_file_or_return_cfg() -> Result<ConfigType<ParsedConfig, String>, ConfigError> {
let cfg_layout = load_yaml!("../cli.yml");
let matches = App::from_yaml(cfg_layout).get_matches();
// check upgrades
if matches.subcommand_matches("upgrade").is_some() {
if let Err(e) = compat::upgrade() {
log::error!("Dataset upgrade failed with error: {}", e);
process::exit(0x100);
} else {
process::exit(0x000);
}
}
let restorefile = matches.value_of("restore").map(|v| v.to_string());
// Check flags
let sslonly = matches.is_present("sslonly");

@ -24,16 +24,19 @@
*
*/
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
pub use std::collections::hash_map::Entry;
use std::collections::hash_map::Keys;
use std::collections::hash_map::Values;
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::iter::FromIterator;
use std::ops::Deref;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HTable<K, V>
where
K: Eq + Hash,
@ -90,7 +93,6 @@ where
self.inner.values()
}
}
impl<K: Eq + Hash, V> IntoIterator for HTable<K, V> {
type Item = (K, V);
type IntoIter = std::collections::hash_map::IntoIter<K, V>;
@ -99,6 +101,25 @@ impl<K: Eq + Hash, V> IntoIterator for HTable<K, V> {
}
}
/// Let `Data` be used transparently wherever a byte slice is expected
impl Deref for Data {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        // `Bytes` itself derefs to `[u8]`, so a plain borrow coerces
        &self.blob
    }
}
/// Enables `HTable<Data, _>` lookups keyed by a plain `&[u8]`
/// (e.g. `map.get("sayan".as_bytes())`)
impl Borrow<[u8]> for Data {
    fn borrow(&self) -> &[u8] {
        // the old `&self.blob.borrow()` built a needless `&&[u8]` that was then
        // deref-coerced back down; `Bytes` derefs straight to `[u8]`
        &self.blob
    }
}
/// Cheap coercion of `Data` into its raw byte slice
impl AsRef<[u8]> for Data {
    fn as_ref(&self) -> &[u8] {
        // delegate to the `AsRef<[u8]>` impl on the inner `Bytes`
        self.blob.as_ref()
    }
}
impl<K, V> FromIterator<(K, V)> for HTable<K, V>
where
K: Eq + Hash,
@ -112,3 +133,111 @@ where
}
}
}
/// A wrapper for `Bytes`
#[derive(Debug, PartialEq, Clone)]
pub struct Data {
/// The blob of data
blob: Bytes,
}
impl Data {
/// Create a new blob from a string
pub fn from_string(val: String) -> Self {
Data {
blob: Bytes::from(val.into_bytes()),
}
}
/// Create a new blob from an existing `Bytes` instance
pub const fn from_blob(blob: Bytes) -> Self {
Data { blob }
}
/// Get the inner blob (raw `Bytes`)
pub const fn get_blob(&self) -> &Bytes {
&self.blob
}
/// Get the inner blob as an `u8` slice (coerced)
pub fn get_inner_ref(&self) -> &[u8] {
&self.blob
}
}
// Byte-wise equality on `Data` is total, so `Eq` is a free upgrade from `PartialEq`
impl Eq for Data {}
/// Hash `Data` by hashing its raw bytes, keeping `Hash` consistent with both
/// `PartialEq` and the `Borrow<[u8]>` impl (required for keyed lookups)
impl Hash for Data {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.blob.hash(state)
    }
}
/// Construct `Data` from anything convertible into `Bytes`
/// (e.g. `&'static str`, `String`, `Vec<u8>`)
impl<T> From<T> for Data
where
    T: Into<Bytes>,
{
    fn from(dat: T) -> Self {
        Self {
            // `dat.into()` already yields `Bytes`; the old
            // `Bytes::from(dat.into())` added a redundant identity conversion
            blob: dat.into(),
        }
    }
}
use serde::ser::{SerializeSeq, Serializer};
/// Serialize `Data` as a sequence of bytes.
///
/// NOTE: this is deliberately a seq (not `serialize_bytes`) so the wire format
/// stays compatible with `DataVisitor::visit_seq` on the decode side.
impl Serialize for Data {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(self.blob.len()))?;
        for byte in self.blob.as_ref() {
            seq.serialize_element(byte)?;
        }
        seq.end()
    }
}
use serde::de::{Deserializer, SeqAccess, Visitor};
/// Visitor that rebuilds a `Data` blob from a serialized byte sequence
struct DataVisitor;
impl<'de> Visitor<'de> for DataVisitor {
    type Value = Data;
    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("Expecting a coredb::htable::Data object")
    }
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        // preallocate when the deserializer reports the element count (bincode
        // does), avoiding repeated grow-and-copy for large blobs
        let mut bytes = Vec::with_capacity(seq.size_hint().unwrap_or(0));
        while let Some(byte) = seq.next_element()? {
            bytes.push(byte);
        }
        Ok(Data::from_blob(Bytes::from(bytes)))
    }
}
impl<'de> Deserialize<'de> for Data {
fn deserialize<D>(deserializer: D) -> Result<Data, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_seq(DataVisitor)
}
}
#[test]
fn test_de() {
    // round-trip a `HTable<String, Data>` through bincode
    let mut table: HTable<String, Data> = HTable::new();
    table.insert(
        String::from("Sayan"),
        Data::from_string("is writing open-source code".to_owned()),
    );
    let serialized = bincode::serialize(&table).unwrap();
    let roundtripped: HTable<String, Data> = bincode::deserialize(&serialized).unwrap();
    assert_eq!(roundtripped, table);
    // verify `Borrow<[u8]>`-based lookup on a `Data`-keyed table
    let mut keyed: HTable<Data, Data> = HTable::new();
    keyed.insert(Data::from("sayan"), Data::from("writes code"));
    assert!(keyed.get("sayan".as_bytes()).is_some());
}

@ -33,7 +33,7 @@ use crate::dbnet::connection::prelude::*;
use crate::diskstore;
use crate::protocol::Query;
use crate::queryengine;
use bytes::Bytes;
pub use htable::Data;
use libsky::TResult;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
@ -107,7 +107,7 @@ pub struct Shared {
#[derive(Debug)]
pub struct Coretable {
/// The core table contain key-value pairs
coremap: HTable<String, Data>,
coremap: HTable<Data, Data>,
/// Whether the database is poisoned or not
///
/// If the database is poisoned -> the database can no longer accept writes
@ -117,43 +117,15 @@ pub struct Coretable {
impl Coretable {
/// Get a reference to the inner `HTable`
pub const fn get_ref<'a>(&'a self) -> &'a HTable<String, Data> {
pub const fn get_ref<'a>(&'a self) -> &'a HTable<Data, Data> {
&self.coremap
}
/// Get a **mutable** reference to the inner `HTable`
pub fn get_mut_ref<'a>(&'a mut self) -> &'a mut HTable<String, Data> {
pub fn get_mut_ref<'a>(&'a mut self) -> &'a mut HTable<Data, Data> {
&mut self.coremap
}
}
/// A wrapper for `Bytes`
#[derive(Debug, PartialEq, Clone)]
pub struct Data {
/// The blob of data
blob: Bytes,
}
impl Data {
/// Create a new blob from a string
pub fn from_string(val: String) -> Self {
Data {
blob: Bytes::from(val.into_bytes()),
}
}
/// Create a new blob from an existing `Bytes` instance
pub const fn from_blob(blob: Bytes) -> Self {
Data { blob }
}
/// Get the inner blob (raw `Bytes`)
pub const fn get_blob(&self) -> &Bytes {
&self.blob
}
/// Get the inner blob as an `u8` slice (coerced)
pub fn get_inner_ref(&self) -> &[u8] {
&self.blob
}
}
impl CoreDB {
#[cfg(debug_assertions)]
#[allow(dead_code)] // This has been kept for debugging purposes, so we'll suppress this lint
@ -245,7 +217,7 @@ impl CoreDB {
CoreDB {
shared: Arc::new(Shared {
table: RwLock::new(Coretable {
coremap: HTable::<String, Data>::new(),
coremap: HTable::<Data, Data>::new(),
poisoned: false,
}),
}),
@ -276,7 +248,7 @@ impl CoreDB {
/// **⚠ Do note**: This is super inefficient since it performs an actual
/// clone of the `HTable` and doesn't do any `Arc`-business! This function
/// can be used by test functions and the server, but **use with caution!**
pub fn get_htable_deep_clone(&self) -> HTable<String, Data> {
pub fn get_htable_deep_clone(&self) -> HTable<Data, Data> {
(*self.acquire_read().get_ref()).clone()
}
}

@ -28,61 +28,27 @@
use crate::coredb::htable::HTable;
use crate::coredb::Data;
use crate::diskstore::snapshot::{DIR_OLD_SNAPSHOT, DIR_SNAPSHOT};
use crate::diskstore::snapshot::DIR_SNAPSHOT;
use bincode;
use bytes::Bytes;
use libsky::TResult;
use std::fs;
use std::io::{ErrorKind, Write};
use std::iter::FromIterator;
use std::path::PathBuf;
pub mod flock;
pub mod snapshot;
mod snapstore;
/// This type alias is to be used when deserializing binary data from disk
type DiskStoreFromDisk = (Vec<String>, Vec<Vec<u8>>);
/// This type alias is to be used when serializing data from the in-memory table
/// onto disk
type DiskStoreFromMemory<'a> = (Vec<&'a String>, Vec<&'a [u8]>);
lazy_static::lazy_static! {
pub static ref PERSIST_FILE: PathBuf = PathBuf::from("./data/data.bin");
pub static ref OLD_PATH: PathBuf = PathBuf::from("./data.bin");
}
fn get_snapshot(path: String) -> TResult<Option<HTable<String, Data>>> {
fn get_snapshot(path: String) -> TResult<Option<HTable<Data, Data>>> {
// the path just has the snapshot name, let's improve that
let mut snap_location = PathBuf::from(DIR_SNAPSHOT);
snap_location.push(&path);
let file = match fs::read(snap_location) {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
// Probably the old snapshot directory?
let mut old_snaploc = PathBuf::from(DIR_OLD_SNAPSHOT);
old_snaploc.push(path);
match fs::read(old_snaploc) {
Ok(f) => {
log::warn!("The new snapshot directory is under the data directory");
if let Err(e) = fs::rename(DIR_OLD_SNAPSHOT, DIR_SNAPSHOT) {
log::error!(
"Failed to migrate snapshot directory into new structure: {}",
e
);
return Err(e.into());
} else {
log::info!(
"Migrated old snapshot directory structure to newer structure"
);
log::warn!("This backwards compat will be removed in the future");
}
f
}
_ => return Err(e.into()),
}
}
_ => return Err(e.into()),
},
Err(e) => return Err(e.into()),
};
let parsed = deserialize(file)?;
Ok(Some(parsed))
@ -90,7 +56,7 @@ fn get_snapshot(path: String) -> TResult<Option<HTable<String, Data>>> {
/// Try to get the saved data from disk. This returns `None`, if the `data/data.bin` wasn't found
/// otherwise the `data/data.bin` file is deserialized and parsed into a `HTable`
pub fn get_saved(path: Option<String>) -> TResult<Option<HTable<String, Data>>> {
pub fn get_saved(path: Option<String>) -> TResult<Option<HTable<Data, Data>>> {
if let Some(path) = path {
get_snapshot(path)
} else {
@ -98,33 +64,7 @@ pub fn get_saved(path: Option<String>) -> TResult<Option<HTable<String, Data>>>
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
// TODO(@ohsayan): Drop support for this in the future
// This might be an old installation still not using the data/data.bin path
match fs::read(OLD_PATH.to_path_buf()) {
Ok(f) => {
log::warn!("Your data file was found to be in the current directory and not in data/data.bin");
if let Err(e) = fs::rename("data.bin", "data/data.bin") {
log::error!("Failed to move data.bin into data/data.bin directory. Consider moving it manually");
return Err(format!(
"Failed to move data.bin into data/data.bin: {}",
e
)
.into());
} else {
log::info!("The data file has been moved into the new directory");
log::warn!("This backwards compat directory support will be removed in the future");
}
f
}
Err(e) => match e.kind() {
ErrorKind::NotFound => return Ok(None),
_ => {
return Err(
format!("Coudln't read flushed data from disk: {}", e).into()
)
}
},
}
return Ok(None);
}
_ => return Err(format!("Couldn't read flushed data from disk: {}", e).into()),
},
@ -135,21 +75,11 @@ pub fn get_saved(path: Option<String>) -> TResult<Option<HTable<String, Data>>>
}
#[cfg(test)]
pub fn test_deserialize(file: Vec<u8>) -> TResult<HTable<String, Data>> {
pub fn test_deserialize(file: Vec<u8>) -> TResult<HTable<Data, Data>> {
deserialize(file)
}
fn deserialize(file: Vec<u8>) -> TResult<HTable<String, Data>> {
let parsed: DiskStoreFromDisk = bincode::deserialize(&file)?;
let parsed: HTable<String, Data> = HTable::from_iter(
parsed
.0
.into_iter()
.zip(parsed.1.into_iter())
.map(|(key, value)| {
let data = Data::from_blob(Bytes::from(value));
(key, data)
}),
);
/// Decode a bincode-encoded `HTable<Data, Data>` read from disk
fn deserialize(file: Vec<u8>) -> TResult<HTable<Data, Data>> {
    // `?` converts the bincode error into the boxed error inside `TResult`
    Ok(bincode::deserialize(&file)?)
}
@ -157,24 +87,20 @@ fn deserialize(file: Vec<u8>) -> TResult<HTable<String, Data>> {
///
/// This functions takes the entire in-memory table and writes it to the disk,
/// more specifically, the `data/data.bin` file
pub fn flush_data(file: &mut flock::FileLock, data: &HTable<String, Data>) -> TResult<()> {
/// Serialize the in-memory table and write it through the locked data file
pub fn flush_data(file: &mut flock::FileLock, data: &HTable<Data, Data>) -> TResult<()> {
    // `data` is already a reference; the old `serialize(&data)` passed `&&HTable`
    let encoded = serialize(data)?;
    file.write(&encoded)?;
    Ok(())
}
pub fn write_to_disk(file: &PathBuf, data: &HTable<String, Data>) -> TResult<()> {
/// Serialize `data` and write it to the file at `file`, creating/truncating it
pub fn write_to_disk(file: &PathBuf, data: &HTable<Data, Data>) -> TResult<()> {
    // both args are already references; the old `create(&file)`/`serialize(&data)`
    // passed needless double references (`&&PathBuf`, `&&HTable`)
    let mut file = fs::File::create(file)?;
    let encoded = serialize(data)?;
    file.write_all(&encoded)?;
    Ok(())
}
fn serialize(data: &HTable<String, Data>) -> TResult<Vec<u8>> {
let ds: DiskStoreFromMemory = (
data.keys().into_iter().collect(),
data.values().map(|val| val.get_inner_ref()).collect(),
);
let encoded = bincode::serialize(&ds)?;
/// Encode the in-memory table with bincode for on-disk storage
fn serialize(data: &HTable<Data, Data>) -> TResult<Vec<u8>> {
    // `data` is already a reference; `bincode::serialize(&data)` took `&&HTable`
    Ok(bincode::serialize(data)?)
}

@ -44,7 +44,7 @@ lazy_static::lazy_static! {
/// ```text
/// YYYYMMDD-HHMMSS.snapshot
/// ```
static ref SNAP_MATCH: Regex = Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)(.snapshot)$").unwrap();
pub static ref SNAP_MATCH: Regex = Regex::new("^\\d{4}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])(-)(?:(?:([01]?\\d|2[0-3]))?([0-5]?\\d))?([0-5]?\\d)(.snapshot)$").unwrap();
/// The directory for remote snapshots
pub static ref DIR_REMOTE_SNAPSHOT: PathBuf = PathBuf::from("./data/snapshots/remote");
}
@ -53,7 +53,6 @@ lazy_static::lazy_static! {
///
/// This is currently a `snapshot` directory under the current directory
pub const DIR_SNAPSHOT: &'static str = "data/snapshots";
pub const DIR_OLD_SNAPSHOT: &'static str = "snapshots";
/// The default snapshot count is 12, assuming that the user would take a snapshot
/// every 2 hours (or 7200 seconds)
const DEF_SNAPSHOT_COUNT: usize = 12;
@ -337,7 +336,7 @@ fn test_snapshot() {
let db = CoreDB::new_empty(std::sync::Arc::new(Some(SnapshotStatus::new(4))));
let mut write = db.acquire_write().unwrap();
let _ = write.get_mut_ref().insert(
String::from("ohhey"),
crate::coredb::Data::from(String::from("ohhey")),
crate::coredb::Data::from_string(String::from("heya!")),
);
drop(write);

@ -50,7 +50,7 @@ where
let mut many = 0;
let cmap = (*whandle).get_mut_ref();
act.into_iter().skip(1).for_each(|key| {
if cmap.remove(&key).is_some() {
if cmap.remove(key.as_bytes()).is_some() {
many += 1
}
});

@ -45,7 +45,7 @@ where
let rhandle = handle.acquire_read();
let cmap = rhandle.get_ref();
act.into_iter().skip(1).for_each(|key| {
if cmap.contains_key(&key) {
if cmap.contains_key(key.as_bytes()) {
how_many_of_them_exist += 1;
}
});

@ -50,7 +50,7 @@ where
// UNSAFE(@ohsayan): act.get_ref().get_unchecked() is safe because we've already if the action
// group contains one argument (excluding the action itself)
reader
.get(act.get_unchecked(1))
.get(act.get_unchecked(1).as_bytes())
.map(|b| b.get_blob().clone())
}
};

@ -47,7 +47,7 @@ where
// UNSAFE(@ohsayan): get_unchecked() is completely safe as we've already checked
// the number of arguments is one
reader
.get(act.get_unchecked(1))
.get(act.get_unchecked(1).as_bytes())
.map(|b| b.get_blob().len())
}
};

@ -47,7 +47,7 @@ where
let res: Option<Bytes> = {
let rhandle = handle.acquire_read();
let reader = rhandle.get_ref();
reader.get(&key).map(|b| b.get_blob().clone())
reader.get(key.as_bytes()).map(|b| b.get_blob().clone())
};
if let Some(value) = res {
// Good, we got the value, write it off to the stream

@ -26,6 +26,7 @@
use crate::coredb;
use crate::coredb::htable::Entry;
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
@ -53,7 +54,7 @@ where
let writer = whandle.get_mut_ref();
let mut didmany = 0;
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
if let Entry::Vacant(v) = writer.entry(key) {
if let Entry::Vacant(v) = writer.entry(Data::from(key)) {
let _ = v.insert(coredb::Data::from_string(val));
didmany += 1;
}

@ -26,6 +26,7 @@
use crate::coredb;
use crate::coredb::htable::Entry;
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
@ -53,7 +54,7 @@ where
let writer = whandle.get_mut_ref();
let mut didmany = 0;
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
if let Entry::Occupied(mut v) = writer.entry(key) {
if let Entry::Occupied(mut v) = writer.entry(Data::from(key)) {
let _ = v.insert(coredb::Data::from_string(val));
didmany += 1;
}

@ -53,11 +53,13 @@ where
let did_we = {
if let Some(mut writer) = handle.acquire_write() {
let writer = writer.get_mut_ref();
if let Entry::Vacant(e) = writer.entry(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
unreachable_unchecked()
})) {
if let Entry::Vacant(e) =
writer.entry(Data::from(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments
unreachable_unchecked()
})))
{
e.insert(Data::from_string(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): This is completely safe as we've already checked
// that there are exactly 2 arguments

@ -76,7 +76,7 @@ where
if let Some(mut whandle) = handle.acquire_write() {
let mut_table = whandle.get_mut_ref();
while let Some(key) = key_iter.next() {
if mut_table.contains_key(key.as_str()) {
if mut_table.contains_key(key.as_bytes()) {
// With one of the keys existing - this action can't clearly be done
// So we'll set `failed` to true and ensure that we check this while
// writing a response back to the client
@ -92,7 +92,10 @@ where
// So we can safely set the keys
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
if mut_table.insert(key, Data::from_string(value)).is_some() {
if mut_table
.insert(Data::from(key), Data::from_string(value))
.is_some()
{
// Tell the compiler that this will never be the case
unsafe {
// UNSAFE(@ohsayan): As none of the keys exist in the table, no
@ -151,7 +154,7 @@ where
if let Some(mut whandle) = handle.acquire_write() {
let mut_table = whandle.get_mut_ref();
while let Some(key) = key_iter.next() {
if !mut_table.contains_key(key.as_str()) {
if !mut_table.contains_key(key.as_bytes()) {
// With one of the keys not existing - this action can't clearly be done
// So we'll set `failed` to true and ensure that we check this while
// writing a response back to the client
@ -169,7 +172,7 @@ where
act.into_iter().skip(1).for_each(|key| {
// Since we've already checked that the keys don't exist
// We'll tell the compiler to optimize this
let _ = mut_table.remove(&key).unwrap_or_else(|| unsafe {
let _ = mut_table.remove(key.as_bytes()).unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): Since all the values exist, all of them will return
// some value. Hence, this branch won't ever be reached. Hence, this is safe.
unreachable_unchecked()
@ -223,7 +226,7 @@ where
if let Some(mut whandle) = handle.acquire_write() {
let mut_table = whandle.get_mut_ref();
while let Some(key) = key_iter.next() {
if !mut_table.contains_key(key.as_str()) {
if !mut_table.contains_key(key.as_bytes()) {
// With one of the keys failing to exist - this action can't clearly be done
// So we'll set `failed` to true and ensure that we check this while
// writing a response back to the client
@ -245,7 +248,10 @@ where
// So we can safely update the keys
let mut iter = act.into_iter().skip(1);
while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
if mut_table.insert(key, Data::from_string(value)).is_none() {
if mut_table
.insert(Data::from(key), Data::from_string(value))
.is_none()
{
// Tell the compiler that this will never be the case
unsafe { unreachable_unchecked() }
}

@ -53,11 +53,13 @@ where
let did_we = {
if let Some(mut whandle) = handle.acquire_write() {
let writer = whandle.get_mut_ref();
if let Entry::Occupied(mut e) = writer.entry(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action contains exactly
// two arguments (excluding the action itself). So, this branch won't ever be reached
unreachable_unchecked()
})) {
if let Entry::Occupied(mut e) =
writer.entry(Data::from(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action contains exactly
// two arguments (excluding the action itself). So, this branch won't ever be reached
unreachable_unchecked()
})))
{
e.insert(Data::from_string(it.next().unwrap_or_else(|| unsafe {
// UNSAFE(@ohsayan): We've already checked that the action contains exactly
// two arguments (excluding the action itself). So, this branch won't ever be reached

@ -24,7 +24,7 @@
*
*/
use crate::coredb::{self};
use crate::coredb::Data;
use crate::dbnet::connection::prelude::*;
use crate::protocol::responses;
@ -52,7 +52,7 @@ where
if let Some(mut whandle) = handle.acquire_write() {
let writer = whandle.get_mut_ref();
while let (Some(key), Some(val)) = (kviter.next(), kviter.next()) {
let _ = writer.insert(key, coredb::Data::from_string(val));
let _ = writer.insert(Data::from(key), Data::from(val));
}
drop(writer);
drop(whandle);

@ -46,6 +46,7 @@ mod queryengine;
mod resp;
use coredb::CoreDB;
use dbnet::run;
mod compat;
use env_logger::*;
use libsky::util::terminal;
use std::sync::Arc;

@ -44,7 +44,7 @@ mod bgsave {
// pre-initialize our maps for comparison
let mut map_should_be_with_one = HTable::new();
map_should_be_with_one.insert(
String::from("sayan"),
Data::from(String::from("sayan")),
Data::from_string("is testing bgsave".to_owned()),
);
#[allow(non_snake_case)]
@ -60,19 +60,21 @@ mod bgsave {
// sleep for 10 seconds with epsilon 1.5s
time::sleep(DUR_WITH_EPSILON).await;
// we should get an empty map
let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
let saved =
diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
assert!(saved.len() == 0);
// now let's quickly write some data
{
datahandle.acquire_write().unwrap().get_mut_ref().insert(
String::from("sayan"),
Data::from_string("is testing bgsave".to_owned()),
Data::from(String::from("sayan")),
Data::from("is testing bgsave".to_owned()),
);
}
// sleep for 10 seconds with epsilon 1.5s
time::sleep(DUR_WITH_EPSILON).await;
// we should get a map with the one key
let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
let saved =
diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
assert_eq!(saved, map_should_be_with_one);
// now let's remove all the data
{
@ -80,13 +82,15 @@ mod bgsave {
}
// sleep for 10 seconds with epsilon 1.5s
time::sleep(DUR_WITH_EPSILON).await;
let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
let saved =
diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
assert!(saved.len() == 0);
// drop the signal; all waiting tasks can now terminate
drop(signal);
handle.await.unwrap();
// check the file again after unlocking
let saved = diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
let saved =
diskstore::test_deserialize(fs::read(BGSAVE_DIRECTORY_TESTING_LOC).unwrap()).unwrap();
assert!(saved.len() == 0);
fs::remove_file(BGSAVE_DIRECTORY_TESTING_LOC).unwrap();
}

Loading…
Cancel
Save