Add `create table` and `create keyspace`

next
Sayan Nandan 3 years ago
parent 5bab0fb91b
commit 608e008a65

@ -33,6 +33,7 @@ use crate::coredb::memstore::ObjectID;
use crate::coredb::memstore::DEFAULT; use crate::coredb::memstore::DEFAULT;
use crate::coredb::table::Table; use crate::coredb::table::Table;
use crate::kvengine::KVEngine; use crate::kvengine::KVEngine;
use crate::registry;
use crate::storage; use crate::storage;
use crate::util::Unwrappable; use crate::util::Unwrappable;
use crate::IoResult; use crate::IoResult;
@ -120,8 +121,8 @@ impl Corestore {
pub fn get_kvstore(&self) -> KeyspaceResult<&KVEngine> { pub fn get_kvstore(&self) -> KeyspaceResult<&KVEngine> {
match &self.ctable { match &self.ctable {
Some(tbl) => match tbl.get_kvstore() { Some(tbl) => match tbl.get_kvstore() {
Some(kvs) => Ok(kvs), Ok(kvs) => Ok(kvs),
None => Err(DdlError::WrongModel), _ => Err(DdlError::WrongModel),
}, },
None => Err(DdlError::DefaultNotFound), None => Err(DdlError::DefaultNotFound),
} }
@ -129,4 +130,54 @@ impl Corestore {
pub fn is_snapshot_enabled(&self) -> bool { pub fn is_snapshot_enabled(&self) -> bool {
self.store.snap_config.is_some() self.store.snap_config.is_some()
} }
/// Create a table: in-memory; **no transactional guarantees**. Two tables can be created
/// simultaneously, but are never flushed unless we are very lucky. If the global flush
/// system is close to a flush cycle -- then we are in luck: we pause the flush cycle
/// through a global flush lock and then allow it to resume once we're done adding the table.
/// This enables the flush routine to permanently write the table to disk. But it's all about
/// luck -- the next mutual access may be yielded to the next `create table` command
pub fn create_table(
&self,
ksid: ObjectID,
tblid: ObjectID,
modelcode: u8,
volatile: bool,
) -> KeyspaceResult<()> {
// first lock the global flush state
let flush_lock = registry::lock_flush_state();
let ret = match &self.store.get_keyspace_atomic_ref(ksid) {
Some(ks) => {
let tbl = Table::from_model_code(modelcode, volatile);
if let Some(tbl) = tbl {
if ks.create_table(tblid.clone(), tbl) {
Ok(())
} else {
Err(DdlError::AlreadyExists)
}
} else {
Err(DdlError::WrongModel)
}
}
None => Err(DdlError::ObjectNotFound),
};
// free the global flush lock
drop(flush_lock);
ret
}
/// Create a keyspace **without any transactional guarantees**
pub fn create_keyspace(&self, ksid: ObjectID) -> KeyspaceResult<()> {
// lock the global flush lock (see comment in create_table to know why)
let flush_lock = registry::lock_flush_state();
let ret = if self.store.create_keyspace(ksid) {
// woo, created
Ok(())
} else {
// ugh, already exists
Err(DdlError::AlreadyExists)
};
drop(flush_lock);
ret
}
} }

@ -27,6 +27,7 @@
#![allow(dead_code)] // TODO(@ohsayan): Remove this once we're done #![allow(dead_code)] // TODO(@ohsayan): Remove this once we're done
use crate::coredb::array::LenScopeGuard; use crate::coredb::array::LenScopeGuard;
use crate::util::compiler::{likely, unlikely};
use core::alloc::Layout; use core::alloc::Layout;
use core::borrow::Borrow; use core::borrow::Borrow;
use core::borrow::BorrowMut; use core::borrow::BorrowMut;
@ -159,28 +160,6 @@ pub struct IArray<A: MemoryBlock> {
store: InlineArray<A>, store: InlineArray<A>,
} }
/*
use branch prediction hints for optimizations as we don't expect our
ks/ns names to exceed the memory block sizes we pre-allocate for them
*/
#[cold]
fn cold() {}
fn likely(b: bool) -> bool {
if !b {
cold()
}
b
}
fn unlikely(b: bool) -> bool {
if b {
cold()
}
b
}
impl IArray<[u8; 48]> { impl IArray<[u8; 48]> {
/// Returns a new 48-bit, stack allocated array of bytes /// Returns a new 48-bit, stack allocated array of bytes
fn new_bytearray() -> Self { fn new_bytearray() -> Self {

@ -59,6 +59,7 @@
use super::corestore::KeyspaceResult; use super::corestore::KeyspaceResult;
use crate::coredb::array::Array; use crate::coredb::array::Array;
use crate::coredb::htable::Coremap; use crate::coredb::htable::Coremap;
use crate::coredb::lock::{QLGuard, QuickLock};
use crate::coredb::table::Table; use crate::coredb::table::Table;
use crate::coredb::SnapshotStatus; use crate::coredb::SnapshotStatus;
use crate::SnapshotConfig; use crate::SnapshotConfig;
@ -132,6 +133,12 @@ pub enum DdlError {
DefaultNotFound, DefaultNotFound,
/// Incorrect data model semantics were used on a data model /// Incorrect data model semantics were used on a data model
WrongModel, WrongModel,
/// The object already exists
AlreadyExists,
/// The target object is not ready
NotReady,
/// The DDL transaction failed
DdlTransactionFailure,
} }
#[derive(Debug)] #[derive(Debug)]
@ -145,6 +152,8 @@ pub struct Memstore {
pub keyspaces: Coremap<ObjectID, Arc<Keyspace>>, pub keyspaces: Coremap<ObjectID, Arc<Keyspace>>,
/// the snapshot configuration /// the snapshot configuration
pub snap_config: Option<SnapshotStatus>, pub snap_config: Option<SnapshotStatus>,
/// A **virtual lock** on the preload file
preload_lock: QuickLock<()>,
} }
impl Memstore { impl Memstore {
@ -153,6 +162,7 @@ impl Memstore {
Self { Self {
keyspaces: Coremap::new(), keyspaces: Coremap::new(),
snap_config: None, snap_config: None,
preload_lock: QuickLock::new(()),
} }
} }
pub fn init_with_all( pub fn init_with_all(
@ -166,6 +176,7 @@ impl Memstore {
} else { } else {
None None
}, },
preload_lock: QuickLock::new(()),
} }
} }
/// Create a new in-memory table with the default keyspace and the default /// Create a new in-memory table with the default keyspace and the default
@ -195,6 +206,7 @@ impl Memstore {
n n
}, },
snap_config: None, snap_config: None,
preload_lock: QuickLock::new(()),
} }
} }
/// Get an atomic reference to a keyspace /// Get an atomic reference to a keyspace
@ -236,6 +248,8 @@ pub struct Keyspace {
pub tables: Coremap<ObjectID, Arc<Table>>, pub tables: Coremap<ObjectID, Arc<Table>>,
/// the replication strategy for this keyspace /// the replication strategy for this keyspace
replication_strategy: cluster::ReplicationStrategy, replication_strategy: cluster::ReplicationStrategy,
/// A **virtual lock** on the partmap for this keyspace
partmap_lock: QuickLock<()>,
} }
macro_rules! unsafe_objectid_from_slice { macro_rules! unsafe_objectid_from_slice {
@ -258,12 +272,14 @@ impl Keyspace {
ht ht
}, },
replication_strategy: cluster::ReplicationStrategy::default(), replication_strategy: cluster::ReplicationStrategy::default(),
partmap_lock: QuickLock::new(()),
} }
} }
pub fn init_with_all_def_strategy(tables: Coremap<ObjectID, Arc<Table>>) -> Self { pub fn init_with_all_def_strategy(tables: Coremap<ObjectID, Arc<Table>>) -> Self {
Self { Self {
tables, tables,
replication_strategy: cluster::ReplicationStrategy::default(), replication_strategy: cluster::ReplicationStrategy::default(),
partmap_lock: QuickLock::new(()),
} }
} }
/// Create a new empty keyspace with zero tables /// Create a new empty keyspace with zero tables
@ -271,6 +287,7 @@ impl Keyspace {
Self { Self {
tables: Coremap::new(), tables: Coremap::new(),
replication_strategy: cluster::ReplicationStrategy::default(), replication_strategy: cluster::ReplicationStrategy::default(),
partmap_lock: QuickLock::new(()),
} }
} }
/// Get an atomic reference to a table in this keyspace if it exists /// Get an atomic reference to a table in this keyspace if it exists
@ -281,6 +298,11 @@ impl Keyspace {
pub fn create_table(&self, tableid: ObjectID, table: Table) -> bool { pub fn create_table(&self, tableid: ObjectID, table: Table) -> bool {
self.tables.true_if_insert(tableid, Arc::new(table)) self.tables.true_if_insert(tableid, Arc::new(table))
} }
/// Drop a table if it exists, if it is not forbidden and if no one references
/// back to it. We don't want any looming table references i.e table gets deleted
/// for the current connection and newer connections, but older instances still
/// refer to the table.
// FIXME(@ohsayan): Should we actually care?
pub fn drop_table(&self, table_identifier: ObjectID) -> KeyspaceResult<()> { pub fn drop_table(&self, table_identifier: ObjectID) -> KeyspaceResult<()> {
if table_identifier.eq(&unsafe_objectid_from_slice!("default")) { if table_identifier.eq(&unsafe_objectid_from_slice!("default")) {
Err(DdlError::ProtectedObject) Err(DdlError::ProtectedObject)
@ -301,6 +323,14 @@ impl Keyspace {
} }
} }
} }
/// Remove a table without doing any reference checks. This will just pull it off
pub unsafe fn force_remove_table(&self, tblid: &ObjectID) {
// atomic remember? nobody cares about the result
self.tables.remove(tblid);
}
pub fn lock_partmap(&self) -> QLGuard<'_, ()> {
self.partmap_lock.lock()
}
} }
#[test] #[test]

@ -26,7 +26,9 @@
#![allow(dead_code)] // TODO(@ohsayan): Remove this once we're done #![allow(dead_code)] // TODO(@ohsayan): Remove this once we're done
use crate::coredb::corestore::KeyspaceResult;
use crate::coredb::htable::Coremap; use crate::coredb::htable::Coremap;
use crate::coredb::memstore::DdlError;
use crate::coredb::Data; use crate::coredb::Data;
use crate::kvengine::KVEngine; use crate::kvengine::KVEngine;
use crate::storage::bytemarks; use crate::storage::bytemarks;
@ -43,17 +45,18 @@ pub enum DataModel {
pub struct Table { pub struct Table {
/// a key/value store /// a key/value store
model_store: DataModel, model_store: DataModel,
/// is the table volatile
volatile: bool, volatile: bool,
} }
impl Table { impl Table {
/// Get the key/value store if the table is a key/value store /// Get the key/value store if the table is a key/value store
pub const fn get_kvstore(&self) -> Option<&KVEngine> { pub const fn get_kvstore(&self) -> KeyspaceResult<&KVEngine> {
#[allow(irrefutable_let_patterns)] #[allow(irrefutable_let_patterns)]
if let DataModel::KV(kvs) = &self.model_store { if let DataModel::KV(kvs) = &self.model_store {
Some(&kvs) Ok(kvs)
} else { } else {
None Err(DdlError::WrongModel)
} }
} }
/// Returns the storage type as an 8-bit uint /// Returns the storage type as an 8-bit uint
@ -76,6 +79,22 @@ impl Table {
model_store: DataModel::KV(KVEngine::init_with_data(k_enc, v_enc, data)), model_store: DataModel::KV(KVEngine::init_with_data(k_enc, v_enc, data)),
} }
} }
pub fn new_kve_with_encoding(volatile: bool, k_enc: bool, v_enc: bool) -> Self {
Self {
volatile,
model_store: DataModel::KV(KVEngine::init(k_enc, v_enc)),
}
}
pub fn from_model_code(code: u8, volatile: bool) -> Option<Self> {
let ret = match code {
0 => Self::new_kve_with_encoding(volatile, false, false),
1 => Self::new_kve_with_encoding(volatile, false, true),
2 => Self::new_kve_with_encoding(volatile, true, true),
3 => Self::new_kve_with_encoding(volatile, true, false),
_ => return None,
};
Some(ret)
}
/// Create a new kve with default settings but with provided volatile configuration /// Create a new kve with default settings but with provided volatile configuration
pub fn new_kve_with_volatile(volatile: bool) -> Self { pub fn new_kve_with_volatile(volatile: bool) -> Self {
Self::new_kve_with_data(Coremap::new(), volatile, false, false) Self::new_kve_with_data(Coremap::new(), volatile, false, false)

@ -43,6 +43,8 @@ pub fn flush_keyspace_full(ksid: &ObjectID, keyspace: &Keyspace) -> IoResult<()>
/// Flush the entire **preload + keyspaces + their partmaps** /// Flush the entire **preload + keyspaces + their partmaps**
pub fn flush_full(store: &Memstore) -> IoResult<()> { pub fn flush_full(store: &Memstore) -> IoResult<()> {
// re-init the tree as new tables/keyspaces may have been added
super::interface::create_tree(store)?;
self::oneshot::flush_preload(store)?; self::oneshot::flush_preload(store)?;
for keyspace in store.keyspaces.iter() { for keyspace in store.keyspaces.iter() {
self::flush_keyspace_full(keyspace.key(), keyspace.value())?; self::flush_keyspace_full(keyspace.key(), keyspace.value())?;
@ -60,6 +62,8 @@ pub fn snap_flush_keyspace_full(
} }
pub fn snap_flush_full(snapid: &str, store: &Memstore) -> IoResult<()> { pub fn snap_flush_full(snapid: &str, store: &Memstore) -> IoResult<()> {
// re-init the tree as new tables/keyspaces may have been added
super::interface::create_tree(store)?;
self::oneshot::snap_flush_preload(snapid, store)?; self::oneshot::snap_flush_preload(snapid, store)?;
for keyspace in store.keyspaces.iter() { for keyspace in store.keyspaces.iter() {
self::snap_flush_keyspace_full(snapid, keyspace.key(), keyspace.value())?; self::snap_flush_keyspace_full(snapid, keyspace.key(), keyspace.value())?;

@ -31,6 +31,8 @@ use crate::coredb::htable::Data;
use crate::coredb::memstore::Keyspace; use crate::coredb::memstore::Keyspace;
use crate::coredb::memstore::Memstore; use crate::coredb::memstore::Memstore;
use crate::IoResult; use crate::IoResult;
use std::collections::HashSet;
use std::fs;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
pub const DIR_KSROOT: &str = "data/ks"; pub const DIR_KSROOT: &str = "data/ks";
@ -46,15 +48,6 @@ pub const DIR_ROOT: &str = "data";
/// ks2/ /// ks2/
/// ks3/ /// ks3/
/// snaps/ /// snaps/
/// ks1/
/// tbl1/
/// tbl2/
/// ks2/
/// tbl1/
/// tbl2/
/// ks3/
/// tbl1/
/// tbl2/
/// backups/ /// backups/
/// ``` /// ```
/// ///
@ -64,14 +57,42 @@ pub fn create_tree(memroot: &Memstore) -> IoResult<()> {
for ks in memroot.keyspaces.iter() { for ks in memroot.keyspaces.iter() {
unsafe { unsafe {
try_dir_ignore_existing!(concat_path!(DIR_KSROOT, ks.key().as_str()))?; try_dir_ignore_existing!(concat_path!(DIR_KSROOT, ks.key().as_str()))?;
for tbl in ks.value().tables.iter() {
try_dir_ignore_existing!(concat_path!(
DIR_SNAPROOT,
ks.key().as_str(),
tbl.key().as_str()
))?;
} }
} }
Ok(())
}
/// Clean up the tree
///
/// **Warning**: Calling this is quite inefficient so consider calling it once or twice
/// throughout the lifecycle of the server
pub fn cleanup_tree(memroot: &Memstore) -> IoResult<()> {
// hashset because the fs itself will not allow duplicate entries
let dir_keyspaces: HashSet<String> = read_dir_to_col!(DIR_KSROOT);
let our_keyspaces: HashSet<String> = memroot
.keyspaces
.iter()
.map(|kv| unsafe { kv.key().as_str() }.to_owned())
.collect();
// these are the folders that we need to remove; plonk the deleted keyspaces first
for folder in dir_keyspaces.difference(&our_keyspaces) {
let ks_path = concat_str!(DIR_KSROOT, "/", folder);
fs::remove_dir_all(ks_path)?;
}
// now plonk the data files
for keyspace in memroot.keyspaces.iter() {
let ks_path = unsafe { concat_str!(DIR_KSROOT, "/", keyspace.key().as_str()) };
let dir_tbls: HashSet<String> = read_dir_to_col!(&ks_path);
let our_tbls: HashSet<String> = keyspace
.value()
.tables
.iter()
.map(|v| unsafe { v.key().as_str() }.to_owned())
.collect();
for old_file in dir_tbls.difference(&our_tbls) {
// plonk this data file; we don't need it anymore
fs::remove_file(concat_path!(&ks_path, old_file))?;
}
} }
Ok(()) Ok(())
} }

@ -113,3 +113,16 @@ macro_rules! bad_data {
std::io::Error::from(std::io::ErrorKind::InvalidData) std::io::Error::from(std::io::ErrorKind::InvalidData)
}; };
} }
macro_rules! read_dir_to_col {
($root:expr) => {
std::fs::read_dir($root)?
.map(|v| {
v.expect("Unexpected directory parse failure")
.file_name()
.to_string_lossy()
.to_string()
})
.collect()
};
}

@ -170,6 +170,28 @@ macro_rules! action {
}; };
} }
pub mod compiler {
//! BP hints for added optim
#[cold]
#[inline(never)]
pub fn cold() {}
pub fn likely(b: bool) -> bool {
if !b {
cold()
}
b
}
pub fn unlikely(b: bool) -> bool {
if b {
cold()
}
b
}
}
#[macro_export] #[macro_export]
macro_rules! kve { macro_rules! kve {
($db:ident, $con:ident) => { ($db:ident, $con:ident) => {

Loading…
Cancel
Save