Enable entity group based table creation

next
Sayan Nandan 3 years ago
parent 5402028b26
commit 2f39e6808b

@ -190,13 +190,17 @@ impl Corestore {
/// luck -- the next mutual access may be yielded to the next `create table` command /// luck -- the next mutual access may be yielded to the next `create table` command
pub fn create_table( pub fn create_table(
&self, &self,
tblid: ObjectID, entity: EntityGroup,
modelcode: u8, modelcode: u8,
volatile: bool, volatile: bool,
) -> KeyspaceResult<()> { ) -> KeyspaceResult<()> {
// first lock the global flush state // first lock the global flush state
let flush_lock = registry::lock_flush_state(); let flush_lock = registry::lock_flush_state();
let ret = match &self.cks { let ret;
match entity {
// Important: create table <tblname> is only ks
(Some(tblid), None) => {
ret = match &self.cks {
Some(ks) => { Some(ks) => {
let tbl = Table::from_model_code(modelcode, volatile); let tbl = Table::from_model_code(modelcode, volatile);
if let Some(tbl) = tbl { if let Some(tbl) = tbl {
@ -211,6 +215,26 @@ impl Corestore {
} }
None => Err(DdlError::DefaultNotFound), None => Err(DdlError::DefaultNotFound),
}; };
}
(Some(ksid), Some(tblid)) => {
ret = match self.store.get_keyspace_atomic_ref(&ksid) {
Some(kspace) => {
let tbl = Table::from_model_code(modelcode, volatile);
if let Some(tbl) = tbl {
if kspace.create_table(tblid.clone(), tbl) {
Ok(())
} else {
Err(DdlError::AlreadyExists)
}
} else {
Err(DdlError::WrongModel)
}
}
None => Err(DdlError::ObjectNotFound),
}
}
_ => unsafe { impossible!() },
}
// free the global flush lock // free the global flush lock
drop(flush_lock); drop(flush_lock);
ret ret

@ -79,11 +79,11 @@ action!(
/// We should have `<tableid> <model>(args)` /// We should have `<tableid> <model>(args)`
fn create_table(handle: &Corestore, con: &mut T, act: ActionIter) { fn create_table(handle: &Corestore, con: &mut T, act: ActionIter) {
err_if_len_is!(act, con, not 2); err_if_len_is!(act, con, not 2);
let (table_name, model_code) = match parser::parse_table_args(act) { let (table_entity, model_code) = match parser::parse_table_args(act) {
Ok(v) => v, Ok(v) => v,
Err(e) => return con.write_response(e).await, Err(e) => return con.write_response(e).await,
}; };
match handle.create_table(table_name, model_code, false) { match handle.create_table(table_entity, model_code, false) {
Ok(_) => con.write_response(responses::groups::OKAY).await?, Ok(_) => con.write_response(responses::groups::OKAY).await?,
Err(DdlError::AlreadyExists) => { Err(DdlError::AlreadyExists) => {
con.write_response(responses::groups::ALREADY_EXISTS) con.write_response(responses::groups::ALREADY_EXISTS)

@ -32,7 +32,6 @@ use crate::protocol::responses;
use crate::queryengine::ActionIter; use crate::queryengine::ActionIter;
use crate::util::compiler; use crate::util::compiler;
use crate::util::Unwrappable; use crate::util::Unwrappable;
use bytes::Bytes;
use core::str; use core::str;
use regex::Regex; use regex::Regex;
@ -49,17 +48,16 @@ fn cold_err<T>(v: T) -> T {
v v
} }
pub(super) fn parse_table_args(mut act: ActionIter) -> Result<(ObjectID, u8), &'static [u8]> { pub(super) fn parse_table_args(mut act: ActionIter) -> Result<(EntityGroup, u8), &'static [u8]> {
let table_name = unsafe { act.next().unsafe_unwrap() }; let table_name = unsafe { act.next().unsafe_unwrap() };
let model_name = unsafe { act.next().unsafe_unwrap() }; let model_name = unsafe { act.next().unsafe_unwrap() };
if compiler::unlikely(!encoding::is_utf8(&table_name) || !encoding::is_utf8(&model_name)) { if compiler::unlikely(!encoding::is_utf8(&table_name) || !encoding::is_utf8(&model_name)) {
return Err(responses::groups::ENCODING_ERROR); return Err(responses::groups::ENCODING_ERROR);
} }
let table_name_str = unsafe { str::from_utf8_unchecked(&table_name) };
let model_name_str = unsafe { str::from_utf8_unchecked(&model_name) }; let model_name_str = unsafe { str::from_utf8_unchecked(&model_name) };
if compiler::unlikely(!VALID_CONTAINER_NAME.is_match(table_name_str)) {
return Err(responses::groups::BAD_EXPRESSION); // get the entity group
} let entity_group = get_query_entity(&table_name)?;
let splits: Vec<&str> = model_name_str.split('(').collect(); let splits: Vec<&str> = model_name_str.split('(').collect();
if compiler::unlikely(splits.len() != 2) { if compiler::unlikely(splits.len() != 2) {
return Err(responses::groups::BAD_EXPRESSION); return Err(responses::groups::BAD_EXPRESSION);
@ -124,15 +122,10 @@ pub(super) fn parse_table_args(mut act: ActionIter) -> Result<(ObjectID, u8), &'
(STR, BINSTR) => 3, (STR, BINSTR) => 3,
_ => return Err(responses::groups::UNKNOWN_DATA_TYPE), _ => return Err(responses::groups::UNKNOWN_DATA_TYPE),
}; };
Ok((entity_group, model_code))
if compiler::unlikely(table_name_str.len() > 64) {
Err(responses::groups::CONTAINER_NAME_TOO_LONG)
} else {
Ok((unsafe { ObjectID::from_slice(table_name_str) }, model_code))
}
} }
pub(super) fn get_query_entity<'a>(input: &'a Bytes) -> Result<EntityGroup, &'static [u8]> { pub(super) fn get_query_entity<'a>(input: &'a [u8]) -> Result<EntityGroup, &'static [u8]> {
let y: Vec<&[u8]> = input.split(|v| *v == b':').collect(); let y: Vec<&[u8]> = input.split(|v| *v == b':').collect();
unsafe { unsafe {
if y.len() == 1 { if y.len() == 1 {
@ -140,6 +133,10 @@ pub(super) fn get_query_entity<'a>(input: &'a Bytes) -> Result<EntityGroup, &'st
let ksret = y.get_unchecked(0); let ksret = y.get_unchecked(0);
if compiler::unlikely(ksret.len() > 64 || ksret.is_empty()) { if compiler::unlikely(ksret.len() > 64 || ksret.is_empty()) {
Err(responses::groups::BAD_CONTAINER_NAME) Err(responses::groups::BAD_CONTAINER_NAME)
} else if compiler::unlikely(
!VALID_CONTAINER_NAME.is_match(str::from_utf8_unchecked(ksret)),
) {
Err(responses::groups::BAD_EXPRESSION)
} else { } else {
Ok((Some(ObjectID::from_slice(ksret)), None)) Ok((Some(ObjectID::from_slice(ksret)), None))
} }
@ -151,6 +148,11 @@ pub(super) fn get_query_entity<'a>(input: &'a Bytes) -> Result<EntityGroup, &'st
Err(responses::groups::BAD_CONTAINER_NAME) Err(responses::groups::BAD_CONTAINER_NAME)
} else if compiler::unlikely(tblret.is_empty() || ksret.is_empty()) { } else if compiler::unlikely(tblret.is_empty() || ksret.is_empty()) {
Err(responses::groups::BAD_EXPRESSION) Err(responses::groups::BAD_EXPRESSION)
} else if compiler::unlikely(
!VALID_CONTAINER_NAME.is_match(str::from_utf8_unchecked(ksret))
|| !VALID_CONTAINER_NAME.is_match(str::from_utf8_unchecked(tblret)),
) {
Err(responses::groups::BAD_CONTAINER_NAME)
} else { } else {
Ok(( Ok((
Some(ObjectID::from_slice(ksret)), Some(ObjectID::from_slice(ksret)),

Loading…
Cancel
Save