Implement exec, fix delta handling and add resp

next
Sayan Nandan 12 months ago
parent dfdb8a39ec
commit 3e981f3dcb
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -25,7 +25,7 @@
*/
use crate::engine::{
core::{self, model::delta::DataDeltaKind},
core::{self, dml::QueryExecMeta, model::delta::DataDeltaKind},
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
idx::MTIndex,
@ -47,16 +47,16 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) ->
.mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g)
{
Some(row) => {
delta_state.append_new_data_delta_with(
let dp = delta_state.append_new_data_delta_with(
DataDeltaKind::Delete,
row.clone(),
schema_version,
new_version,
&g,
);
Ok(())
Ok(QueryExecMeta::new(dp))
}
None => Err(QueryError::QPDmlRowNotFound),
None => Err(QueryError::QExecDmlRowNotFound),
}
})
}

@ -27,6 +27,7 @@
use crate::engine::{
core::{
self,
dml::QueryExecMeta,
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DataDeltaKind, Fields, Model},
},
@ -49,16 +50,16 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer
let row = Row::new(pk, data, ds.schema_current_version(), new_version);
if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
// append delta for new version
ds.append_new_data_delta_with(
let dp = ds.append_new_data_delta_with(
DataDeltaKind::Insert,
row,
ds.schema_current_version(),
new_version,
&g,
);
Ok(())
Ok(QueryExecMeta::new(dp))
} else {
Err(QueryError::QPDmlDuplicate)
Err(QueryError::QExecDmlDuplicate)
}
})
}
@ -114,6 +115,6 @@ fn prepare_insert(
};
Ok((primary_key, prepared_data))
} else {
Err(QueryError::QPDmlValidationError)
Err(QueryError::QExecDmlValidationError)
}
}

@ -55,7 +55,23 @@ impl Model {
{
Ok(clause.rhs())
}
_ => compiler::cold_rerr(QueryError::QPDmlWhereHasUnindexedColumn),
_ => compiler::cold_rerr(QueryError::QExecDmlWhereHasUnindexedColumn),
}
}
}
pub struct QueryExecMeta {
delta_hint: usize,
}
impl QueryExecMeta {
pub fn new(delta_hint: usize) -> Self {
Self { delta_hint }
}
pub fn zero() -> Self {
Self::new(0)
}
pub fn delta_hint(&self) -> usize {
self.delta_hint
}
}

@ -52,7 +52,7 @@ where
match fields.st_get(key) {
Some(dc) => cellfn(dc),
None if key == mdl.p_key() => cellfn(&pkdc),
None => return Err(QueryError::QPUnknownField),
None => return Err(QueryError::QExecUnknownField),
}
Ok(())
};
@ -69,7 +69,7 @@ where
}
}
}
None => return Err(QueryError::QPDmlRowNotFound),
None => return Err(QueryError::QExecDmlRowNotFound),
}
Ok(())
})

@ -30,7 +30,10 @@ use std::cell::RefCell;
use {
crate::{
engine::{
core::{self, model::delta::DataDeltaKind, query_meta::AssignmentOperator},
core::{
self, dml::QueryExecMeta, model::delta::DataDeltaKind,
query_meta::AssignmentOperator,
},
data::{
cell::Datacell,
lit::Lit,
@ -235,7 +238,7 @@ pub fn collect_trace_path() -> Vec<&'static str> {
#[allow(unused)]
pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> QueryResult<()> {
core::with_model_for_data_update(global, update.entity(), |mdl| {
let mut ret = Ok(());
let mut ret = Ok(QueryExecMeta::zero());
// prepare row fetch
let key = mdl.resolve_where(update.clauses_mut())?;
// freeze schema
@ -243,7 +246,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
// fetch row
let g = sync::atm::cpin();
let Some(row) = mdl.primary_index().select(key, &g) else {
return Err(QueryError::QPDmlRowNotFound);
return Err(QueryError::QExecDmlRowNotFound);
};
// lock row
let mut row_data_wl = row.d_data().write();
@ -280,7 +283,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
_ => {
input_trace("fieldnotfound");
rollback_now = true;
ret = Err(QueryError::QPUnknownField);
ret = Err(QueryError::QExecUnknownField);
break;
}
}
@ -321,13 +324,13 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
} else {
input_trace("list;badtag");
rollback_now = true;
ret = Err(QueryError::QPDmlValidationError);
ret = Err(QueryError::QExecDmlValidationError);
break;
}
}
_ => {
input_trace("unknown_reason;exitmainloop");
ret = Err(QueryError::QPDmlValidationError);
ret = Err(QueryError::QExecDmlValidationError);
rollback_now = true;
break;
}
@ -344,13 +347,14 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
// update revised tag
row_data_wl.set_txn_revised(new_version);
// publish delta
ds.append_new_data_delta_with(
let dp = ds.append_new_data_delta_with(
DataDeltaKind::Update,
row.clone(),
ds.schema_current_version(),
new_version,
&g,
);
ret = Ok(QueryExecMeta::new(dp))
}
ret
})

@ -24,12 +24,169 @@
*
*/
use crate::engine::{error::QueryResult, fractal::Global, net::protocol::SQuery};
use {
crate::engine::{
core::{dml, model::Model, space::Space},
error::{QueryError, QueryResult},
fractal::Global,
net::protocol::{Response, SQuery},
ql::{
ast::{traits::ASTNode, InplaceData, State},
lex::{Keyword, KeywordStmt, Token},
},
},
core::ops::Deref,
};
pub async fn execute_query<'a>(_global: &Global, query: SQuery<'a>) -> QueryResult<()> {
pub async fn dispatch_to_executor<'a, 'b>(
global: &'b Global,
query: SQuery<'a>,
) -> QueryResult<Response> {
let tokens =
crate::engine::ql::lex::SecureLexer::new_with_segments(query.query(), query.params())
.lex()?;
let _ = crate::engine::ql::ast::compile(&tokens, crate::engine::ql::ast::InplaceData::new());
let mut state = State::new_inplace(&tokens);
let stmt = match state.read() {
Token::Keyword(Keyword::Statement(stmt)) if state.remaining() >= 3 => *stmt,
_ => return Err(QueryError::QLExpectedStatement),
};
state.cursor_ahead();
if stmt.is_blocking() {
run_blocking_stmt(state, stmt, global).await
} else {
run_nb(global, state, stmt)
}
}
/*
blocking exec
---
trigger warning: disgusting hacks below (why can't async play nice with lifetimes :|)
*/
struct RawSlice<T> {
t: *const T,
l: usize,
}
unsafe impl<T: Send> Send for RawSlice<T> {}
unsafe impl<T: Sync> Sync for RawSlice<T> {}
impl<T> RawSlice<T> {
#[inline(always)]
unsafe fn new(t: *const T, l: usize) -> Self {
Self { t, l }
}
}
impl<T> Deref for RawSlice<T> {
type Target = [T];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe {
// UNSAFE(@ohsayan): the caller MUST guarantee that this remains valid throughout the usage of the slice
core::slice::from_raw_parts(self.t, self.l)
}
}
}
#[inline(always)]
fn call<A: ASTNode<'static> + core::fmt::Debug, T>(
g: Global,
tokens: RawSlice<Token<'static>>,
f: impl FnOnce(&Global, A) -> QueryResult<T>,
) -> QueryResult<T> {
let mut state = State::new_inplace(unsafe {
// UNSAFE(@ohsayan): nothing to drop. all cool
core::mem::transmute(tokens)
});
_call(&g, &mut state, f)
}
#[inline(always)]
fn _call<A: ASTNode<'static> + core::fmt::Debug, T>(
g: &Global,
state: &mut State<'static, InplaceData>,
f: impl FnOnce(&Global, A) -> Result<T, QueryError>,
) -> QueryResult<T> {
let cs = ASTNode::from_state(state)?;
f(&g, cs)
}
async fn run_blocking_stmt(
mut state: State<'_, InplaceData>,
stmt: KeywordStmt,
global: &Global,
) -> Result<Response, QueryError> {
let (a, b) = (&state.current()[0], &state.current()[1]);
let sysctl = stmt == KeywordStmt::Sysctl;
let create = stmt == KeywordStmt::Create;
let alter = stmt == KeywordStmt::Alter;
let drop = stmt == KeywordStmt::Drop;
let last_id = b.is_ident();
let c_s = (create & Token![space].eq(a) & last_id) as u8 * 2;
let c_m = (create & Token![model].eq(a) & last_id) as u8 * 3;
let a_s = (alter & Token![space].eq(a) & last_id) as u8 * 4;
let a_m = (alter & Token![model].eq(a) & last_id) as u8 * 5;
let d_s = (drop & Token![space].eq(a) & last_id) as u8 * 6;
let d_m = (drop & Token![model].eq(a) & last_id) as u8 * 7;
let fc = sysctl as u8 | c_s | c_m | a_s | a_m | d_s | d_m;
state.cursor_ahead();
static BLK_EXEC: [fn(Global, RawSlice<Token<'static>>) -> QueryResult<()>; 8] = [
|_, _| Err(QueryError::QLUnknownStatement), // unknown
blocking_exec_sysctl, // sysctl
|g, t| call(g, t, Space::transactional_exec_create),
|g, t| call(g, t, Model::transactional_exec_create),
|g, t| call(g, t, Space::transactional_exec_alter),
|g, t| call(g, t, Model::transactional_exec_alter),
|g, t| call(g, t, Space::transactional_exec_drop),
|g, t| call(g, t, Model::transactional_exec_drop),
];
let r = unsafe {
// UNSAFE(@ohsayan): the only await is within this block
let c_glob = global.clone();
let ptr = state.current().as_ptr() as usize;
let len = state.current().len();
tokio::task::spawn_blocking(move || {
let tokens = RawSlice::new(ptr as *const Token, len);
BLK_EXEC[fc as usize](c_glob, tokens)?;
Ok(Response::Empty)
})
.await
};
r.unwrap()
}
fn blocking_exec_sysctl(_: Global, _: RawSlice<Token<'static>>) -> QueryResult<()> {
todo!()
}
/*
nb exec
*/
fn run_nb(
global: &Global,
state: State<'_, InplaceData>,
stmt: KeywordStmt,
) -> QueryResult<Response> {
let stmt = stmt.value_u8() - KeywordStmt::Use.value_u8();
static F: [fn(&Global, &mut State<'static, InplaceData>) -> QueryResult<()>; 8] = [
|_, _| panic!("use not implemented"),
|_, _| panic!("inspect not implemented"),
|_, _| panic!("describe not implemented"),
|g, s| _call(g, s, dml::insert),
|_, _| panic!("select not implemented"),
|g, s| _call(g, s, dml::update),
|g, s| _call(g, s, dml::delete),
|_, _| panic!("exists not implemented"),
];
{
let mut state = unsafe {
// UNSAFE(@ohsayan): this is a lifetime issue with the token handle
core::mem::transmute(state)
};
F[stmt as usize](global, &mut state)?;
}
Ok(Response::Empty)
}

@ -57,13 +57,13 @@ pub struct GlobalNS {
index_space: RWLIdx<Box<str>, Space>,
}
pub(self) fn with_model_for_data_update<'a, T, E, F>(
pub(self) fn with_model_for_data_update<'a, E, F>(
global: &impl GlobalInstanceLike,
entity: E,
f: F,
) -> QueryResult<T>
) -> QueryResult<()>
where
F: FnOnce(&Model) -> QueryResult<T>,
F: FnOnce(&Model) -> QueryResult<dml::QueryExecMeta>,
E: 'a + EntityLocator<'a>,
{
let (space_name, model_name) = entity.parse_entity()?;
@ -71,11 +71,15 @@ where
.namespace()
.with_model((space_name, model_name), |mdl| {
let r = f(mdl);
// see if this task local delta is full
if r.is_ok() {
model::DeltaState::guard_delta_overflow(global, space_name, model_name, mdl);
match r {
Ok(dhint) => {
model::DeltaState::guard_delta_overflow(
global, space_name, model_name, mdl, dhint,
);
Ok(())
}
Err(e) => Err(e),
}
r
})
}
@ -101,7 +105,7 @@ impl GlobalNS {
) -> QueryResult<T> {
let sread = self.index_space.read();
let Some(space) = sread.st_get(space) else {
return Err(QueryError::QPObjectNotFound);
return Err(QueryError::QExecObjectNotFound);
};
f(space)
}

@ -84,7 +84,7 @@ fn no_field(mr: &IWModel, new: &str) -> bool {
fn check_nullable(props: &mut HashMap<Box<str>, DictEntryGeneric>) -> QueryResult<bool> {
match props.remove("nullable") {
Some(DictEntryGeneric::Data(b)) if b.kind() == TagClass::Bool => Ok(b.bool()),
Some(_) => Err(QueryError::QPDdlInvalidProperties),
Some(_) => Err(QueryError::QExecDdlInvalidProperties),
None => Ok(false),
}
}
@ -101,7 +101,7 @@ impl<'a> AlterPlan<'a> {
AlterKind::Remove(r) => {
let mut x = HashSet::new();
if !r.iter().all(|id| x.insert(id.as_str())) {
return Err(QueryError::QPDdlModelAlterIllegal);
return Err(QueryError::QExecDdlModelAlterIllegal);
}
let mut not_found = false;
if r.iter().all(|id| {
@ -112,9 +112,9 @@ impl<'a> AlterPlan<'a> {
}) {
can_ignore!(AlterAction::Remove(r))
} else if not_found {
return Err(QueryError::QPUnknownField);
return Err(QueryError::QExecUnknownField);
} else {
return Err(QueryError::QPDdlModelAlterIllegal);
return Err(QueryError::QExecDdlModelAlterIllegal);
}
}
AlterKind::Add(new_fields) => {
@ -148,7 +148,7 @@ impl<'a> AlterPlan<'a> {
mv.guard_pk(&field_name)?;
// get the current field
let Some(current_field) = wm.fields().st_get(field_name.as_str()) else {
return Err(QueryError::QPUnknownField);
return Err(QueryError::QExecUnknownField);
};
// check props
let is_nullable = check_nullable(&mut props)?;
@ -174,7 +174,7 @@ impl<'a> AlterPlan<'a> {
no_lock,
})
} else {
Err(QueryError::QPDdlModelAlterIllegal)
Err(QueryError::QExecDdlModelAlterIllegal)
}
}
fn ldeltas(
@ -197,7 +197,7 @@ impl<'a> AlterPlan<'a> {
}
if layers.len() > current.layers().len() {
// simply a dumb tomato; ELIMINATE THESE DUMB TOMATOES
return Err(QueryError::QPDdlModelAlterIllegal);
return Err(QueryError::QExecDdlModelAlterIllegal);
}
let mut no_lock = !(current.is_nullable() & !nullable);
let mut deltasize = (current.is_nullable() ^ nullable) as usize;
@ -216,7 +216,7 @@ impl<'a> AlterPlan<'a> {
// actually parse the new layer
okay &= props.is_empty();
let Some(new_parsed_layer) = Layer::get_layer(&ty) else {
return Err(QueryError::QPDdlInvalidTypeDefinition);
return Err(QueryError::QExecDdlInvalidTypeDefinition);
};
match (
current_layer.tag.tag_selector(),
@ -233,7 +233,7 @@ impl<'a> AlterPlan<'a> {
}
_ => {
// can't cast this directly
return Err(QueryError::QPDdlInvalidTypeDefinition);
return Err(QueryError::QExecDdlInvalidTypeDefinition);
}
}
*new_layer = new_parsed_layer;
@ -243,13 +243,12 @@ impl<'a> AlterPlan<'a> {
if okay {
Ok((deltasize != 0, new_field))
} else {
Err(QueryError::QPDdlModelAlterIllegal)
Err(QueryError::QExecDdlModelAlterIllegal)
}
}
}
impl Model {
#[allow(unused)]
pub fn transactional_exec_alter<G: GlobalInstanceLike>(
global: &G,
alter: AlterModel,
@ -264,7 +263,7 @@ impl Model {
// we have a legal plan; acquire exclusive if we need it
if !plan.no_lock {
// TODO(@ohsayan): allow this later on, once we define the syntax
return Err(QueryError::QPNeedLock);
return Err(QueryError::QExecNeedLock);
}
// fine, we're good
let mut iwm = iwm;

@ -30,7 +30,7 @@ use {
super::{Fields, Model},
crate::{
engine::{
core::index::Row,
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
sync::atm::Guard,
sync::queue::Queue,
@ -186,19 +186,9 @@ impl DeltaState {
space_name: &str,
model_name: &str,
model: &Model,
hint: QueryExecMeta,
) {
let current_deltas_size = model.delta_state().data_deltas_size.load(Ordering::Acquire);
let max_len = global
.get_max_delta_size()
.min((model.primary_index().count() as f64 * 0.05) as usize);
if compiler::unlikely(current_deltas_size >= max_len) {
global.request_batch_resolve(
space_name,
model_name,
model.get_uuid(),
current_deltas_size,
);
}
global.request_batch_resolve_if_cache_full(space_name, model_name, model, hint)
}
}
@ -211,12 +201,12 @@ impl DeltaState {
schema_version: DeltaVersion,
data_version: DeltaVersion,
g: &Guard,
) {
self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g);
) -> usize {
self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g)
}
pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) {
pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) -> usize {
self.data_deltas.blocking_enqueue(delta, g);
self.data_deltas_size.fetch_add(1, Ordering::Release);
self.data_deltas_size.fetch_add(1, Ordering::Release) + 1
}
pub fn create_new_data_delta_version(&self) -> DeltaVersion {
DeltaVersion(self.__data_delta_step())

@ -134,7 +134,7 @@ impl Model {
}
fn guard_pk(&self, new: &str) -> QueryResult<()> {
if self.is_pk(new) {
Err(QueryError::QPDdlModelAlterIllegal)
Err(QueryError::QExecDdlModelAlterIllegal)
} else {
Ok(())
}
@ -195,12 +195,11 @@ impl Model {
return Ok(Self::new_restore(Uuid::new(), last_pk.into(), tag, fields));
}
}
Err(QueryError::QPDdlModelBadDefinition)
Err(QueryError::QExecDdlModelBadDefinition)
}
}
impl Model {
#[allow(unused)]
pub fn transactional_exec_create<G: GlobalInstanceLike>(
global: &G,
stmt: CreateModel,
@ -210,7 +209,7 @@ impl Model {
global.namespace().with_space(space_name, |space| {
let mut w_space = space.models().write();
if w_space.st_contains(model_name) {
return Err(QueryError::QPDdlObjectAlreadyExists);
return Err(QueryError::QExecDdlObjectAlreadyExists);
}
if G::FS_IS_NON_NULL {
let irm = model.intent_read_model();
@ -251,7 +250,6 @@ impl Model {
Ok(())
})
}
#[allow(unused)]
pub fn transactional_exec_drop<G: GlobalInstanceLike>(
global: &G,
stmt: DropModel,
@ -260,7 +258,7 @@ impl Model {
global.namespace().with_space(space_name, |space| {
let mut w_space = space.models().write();
let Some(model) = w_space.get(model_name) else {
return Err(QueryError::QPObjectNotFound);
return Err(QueryError::QExecObjectNotFound);
};
if G::FS_IS_NON_NULL {
// prepare txn
@ -366,7 +364,7 @@ impl Field {
nullable,
})
} else {
Err(QueryError::QPDdlInvalidTypeDefinition)
Err(QueryError::QExecDdlInvalidTypeDefinition)
}
}
#[inline(always)]

@ -93,7 +93,7 @@ impl Space {
{
Ok(())
} else {
Err(QueryError::QPDdlObjectAlreadyExists)
Err(QueryError::QExecDdlObjectAlreadyExists)
}
}
pub fn get_uuid(&self) -> Uuid {
@ -112,7 +112,7 @@ impl Space {
) -> QueryResult<T> {
let mread = self.mns.read();
let Some(model) = mread.st_get(model) else {
return Err(QueryError::QPObjectNotFound);
return Err(QueryError::QExecObjectNotFound);
};
f(model)
}
@ -161,7 +161,7 @@ impl Space {
None if props.is_empty() => IndexST::default(),
_ => {
// unknown properties
return Err(QueryError::QPDdlInvalidProperties);
return Err(QueryError::QExecDdlInvalidProperties);
}
};
Ok(ProcedureCreate {
@ -178,7 +178,6 @@ impl Space {
}
impl Space {
#[allow(unused)]
pub fn transactional_exec_create<G: GlobalInstanceLike>(
global: &G,
space: CreateSpace,
@ -188,7 +187,7 @@ impl Space {
// acquire access
let mut wl = global.namespace().spaces().write();
if wl.st_contains(&space_name) {
return Err(QueryError::QPDdlObjectAlreadyExists);
return Err(QueryError::QExecDdlObjectAlreadyExists);
}
// commit txn
if G::FS_IS_NON_NULL {
@ -229,13 +228,13 @@ impl Space {
Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {}
Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {}
None if updated_props.is_empty() => return Ok(()),
_ => return Err(QueryError::QPDdlInvalidProperties),
_ => return Err(QueryError::QExecDdlInvalidProperties),
}
let mut space_props = space.meta.dict().write();
// create patch
let patch = match dict::rprepare_metadata_patch(&space_props, updated_props) {
Some(patch) => patch,
None => return Err(QueryError::QPDdlInvalidProperties),
None => return Err(QueryError::QExecDdlInvalidProperties),
};
if G::FS_IS_NON_NULL {
// prepare txn
@ -255,7 +254,6 @@ impl Space {
Ok(())
})
}
#[allow(unused)]
pub fn transactional_exec_drop<G: GlobalInstanceLike>(
global: &G,
DropSpace { space, force: _ }: DropSpace,
@ -266,11 +264,11 @@ impl Space {
let mut wgns = global.namespace().spaces().write();
let space = match wgns.get(space_name.as_str()) {
Some(space) => space,
None => return Err(QueryError::QPObjectNotFound),
None => return Err(QueryError::QExecObjectNotFound),
};
let space_w = space.mns.write();
if space_w.st_len() != 0 {
return Err(QueryError::QPDdlNotEmpty);
return Err(QueryError::QExecDdlNotEmpty);
}
// we can remove this
if G::FS_IS_NON_NULL {

@ -164,7 +164,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPUnknownField
QueryError::QExecUnknownField
);
}
#[test]
@ -176,7 +176,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPDdlModelAlterIllegal
QueryError::QExecDdlModelAlterIllegal
);
}
#[test]
@ -188,7 +188,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPDdlModelAlterIllegal
QueryError::QExecDdlModelAlterIllegal
);
}
#[test]
@ -200,7 +200,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPDdlModelAlterIllegal
QueryError::QExecDdlModelAlterIllegal
);
}
#[test]
@ -212,7 +212,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPDdlModelAlterIllegal
QueryError::QExecDdlModelAlterIllegal
);
}
#[test]
@ -224,7 +224,7 @@ mod plan {
|_| {}
)
.unwrap_err(),
QueryError::QPUnknownField
QueryError::QExecUnknownField
);
}
fn bad_type_cast(orig_ty: &str, new_ty: &str) {
@ -235,7 +235,7 @@ mod plan {
super::with_plan(&create, &alter, |_| {}).expect_err(&format!(
"found no error in transformation: {orig_ty} -> {new_ty}"
)),
QueryError::QPDdlInvalidTypeDefinition,
QueryError::QExecDdlInvalidTypeDefinition,
"failed to match error in transformation: {orig_ty} -> {new_ty}",
)
}
@ -445,7 +445,7 @@ mod exec {
|_| {},
)
.unwrap_err(),
QueryError::QPNeedLock
QueryError::QExecNeedLock
);
}
}

@ -89,7 +89,7 @@ mod validation {
"create model mymodel(primary username: string, primary contract_location: binary)"
)
.unwrap_err(),
QueryError::QPDdlModelBadDefinition
QueryError::QExecDdlModelBadDefinition
);
}
@ -97,7 +97,7 @@ mod validation {
fn duplicate_fields() {
assert_eq!(
create("create model mymodel(primary username: string, username: binary)").unwrap_err(),
QueryError::QPDdlModelBadDefinition
QueryError::QExecDdlModelBadDefinition
);
}
@ -105,7 +105,7 @@ mod validation {
fn illegal_props() {
assert_eq!(
create("create model mymodel(primary username: string, password: binary) with { lol_prop: false }").unwrap_err(),
QueryError::QPDdlModelBadDefinition
QueryError::QExecDdlModelBadDefinition
);
}
@ -116,12 +116,12 @@ mod validation {
"create model mymodel(primary username_bytes: list { type: uint8 }, password: binary)"
)
.unwrap_err(),
QueryError::QPDdlModelBadDefinition
QueryError::QExecDdlModelBadDefinition
);
assert_eq!(
create("create model mymodel(primary username: float32, password: binary)")
.unwrap_err(),
QueryError::QPDdlModelBadDefinition
QueryError::QExecDdlModelBadDefinition
);
}
}

@ -64,7 +64,7 @@ mod layer_spec_validation {
fn invalid_list() {
assert_eq!(
layerview("list").unwrap_err(),
QueryError::QPDdlInvalidTypeDefinition
QueryError::QExecDdlInvalidTypeDefinition
);
}
@ -72,7 +72,7 @@ mod layer_spec_validation {
fn invalid_flat() {
assert_eq!(
layerview("string { type: string }").unwrap_err(),
QueryError::QPDdlInvalidTypeDefinition
QueryError::QExecDdlInvalidTypeDefinition
);
}
}

@ -122,7 +122,7 @@ fn alter_nx() {
|_| {},
)
.unwrap_err(),
QueryError::QPObjectNotFound
QueryError::QExecObjectNotFound
);
}

@ -73,7 +73,7 @@ fn exec_create_space_with_bad_env_type() {
let global = TestGlobal::new_with_tmp_nullfs_driver();
assert_eq!(
super::exec_create(&global, "create space myspace with { env: 100 }", |_| {}).unwrap_err(),
QueryError::QPDdlInvalidProperties
QueryError::QExecDdlInvalidProperties
);
}
@ -87,6 +87,6 @@ fn exec_create_space_with_random_property() {
|_| {}
)
.unwrap_err(),
QueryError::QPDdlInvalidProperties
QueryError::QExecDdlInvalidProperties
);
}

@ -51,6 +51,6 @@ fn delete_nonexisting() {
"sayan",
)
.unwrap_err(),
QueryError::QPDmlRowNotFound
QueryError::QExecDmlRowNotFound
);
}

@ -83,6 +83,6 @@ fn insert_duplicate() {
assert_eq!(
super::exec_insert_only(&global, "insert into myspace.mymodel('sayan', 'pass123')")
.unwrap_err(),
QueryError::QPDmlDuplicate
QueryError::QExecDmlDuplicate
);
}

@ -97,6 +97,6 @@ fn select_nonexisting() {
"select username, password from myspace.mymodel where username = 'notsayan'",
)
.unwrap_err(),
QueryError::QPDmlRowNotFound
QueryError::QExecDmlRowNotFound
);
}

@ -96,7 +96,7 @@ fn fail_operation_on_null() {
"select * from myspace.mymodel where username='sayan'"
)
.unwrap_err(),
QueryError::QPDmlValidationError
QueryError::QExecDmlValidationError
);
assert_eq!(
dml::update_flow_trace(),
@ -116,7 +116,7 @@ fn fail_unknown_fields() {
"select * from myspace.mymodel where username='sayan'"
)
.unwrap_err(),
QueryError::QPUnknownField
QueryError::QExecUnknownField
);
assert_eq!(dml::update_flow_trace(), ["fieldnotfound", "rollback"]);
// verify integrity
@ -142,7 +142,7 @@ fn fail_typedef_violation() {
"select * from myspace.mymodel where username = 'sayan'"
)
.unwrap_err(),
QueryError::QPDmlValidationError
QueryError::QExecDmlValidationError
);
assert_eq!(
dml::update_flow_trace(),

@ -46,6 +46,6 @@ impl<'a> EntityLocator<'a> for Entity<'a> {
where
Self: 'a,
{
self.into_full_str().ok_or(QueryError::QPExpectedEntity)
self.into_full_str().ok_or(QueryError::QLExpectedEntity)
}
}

@ -31,71 +31,78 @@ pub type QueryResult<T> = Result<T, QueryError>;
/// an enumeration of 'flat' errors that the server actually responds to the client with, since we do not want to send specific information
/// about anything (as that will be a security hole). The variants correspond with their actual response codes
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, sky_macros::EnumMethods)]
#[repr(u8)]
pub enum QueryError {
// system
/// I/O error
SysServerError,
SysServerError = 0,
/// out of memory
SysOutOfMemory,
SysOutOfMemory = 1,
/// unknown server error
SysUnknownError,
SysUnknownError = 2,
/// system auth error
SysAuthError = 3,
/// transactional error
SysTransactionalError = 4,
// exchange
NetworkSubsystemCorruptedPacket = 24,
// QL
/// something like an integer that randomly has a character to attached to it like `1234q`
LexInvalidLiteral,
LexInvalidLiteral = 25,
/// something like an invalid 'string" or a safe string with a bad length etc
LexInvalidEscapedLiteral,
LexInvalidParameter = 26,
/// unexpected byte
LexUnexpectedByte,
LexUnexpectedByte = 27,
/// expected a longer statement
QLUnexpectedEndOfStatement,
QLUnexpectedEndOfStatement = 28,
/// incorrect syntax for "something"
QLInvalidSyntax,
QLInvalidSyntax = 29,
/// invalid collection definition definition
QLInvalidCollectionSyntax,
QLInvalidCollectionSyntax = 30,
/// invalid type definition syntax
QLInvalidTypeDefinitionSyntax,
QLInvalidTypeDefinitionSyntax = 31,
/// expected a full entity definition
QPExpectedEntity,
QLExpectedEntity = 32,
/// expected a statement, found something else
QPExpectedStatement,
QLExpectedStatement = 33,
/// unknown statement
QPUnknownStatement,
/// this query needs a lock for execution, but that wasn't explicitly allowed anywhere
QPNeedLock,
QLUnknownStatement = 34,
// exec
/// the object to be used as the "query container" is missing (for example, insert when the model was missing)
QPObjectNotFound,
QExecObjectNotFound = 100,
/// an unknown field was attempted to be accessed/modified/...
QPUnknownField,
QExecUnknownField = 101,
/// invalid property for an object
QPDdlInvalidProperties,
QExecDdlInvalidProperties = 102,
/// create space/model, but the object already exists
QPDdlObjectAlreadyExists,
QExecDdlObjectAlreadyExists = 103,
/// an object that was attempted to be removed is non-empty, and for this object, removals require it to be empty
QPDdlNotEmpty,
QExecDdlNotEmpty = 104,
/// invalid type definition
QPDdlInvalidTypeDefinition,
QExecDdlInvalidTypeDefinition = 105,
/// bad model definition
QPDdlModelBadDefinition,
QExecDdlModelBadDefinition = 106,
/// illegal alter model query
QPDdlModelAlterIllegal,
QExecDdlModelAlterIllegal = 107,
// exec DML
/// violated the uniqueness property
QPDmlDuplicate,
QExecDmlDuplicate = 150,
/// the data could not be validated for being accepted into a field/function/etc.
QPDmlValidationError,
QExecDmlValidationError = 151,
/// the where expression has an unindexed column essentially implying that we can't run this query because of perf concerns
QPDmlWhereHasUnindexedColumn,
QExecDmlWhereHasUnindexedColumn = 152,
/// the row matching the given match expression was not found
QPDmlRowNotFound,
/// transactional error
TransactionalError,
SysAuthError,
QExecDmlRowNotFound = 153,
/// this query needs a lock for execution, but that wasn't explicitly allowed anywhere
QExecNeedLock = 154,
}
impl From<super::fractal::error::Error> for QueryError {
fn from(e: super::fractal::error::Error) -> Self {
match e.kind() {
ErrorKind::IoError(_) | ErrorKind::Storage(_) => QueryError::SysServerError,
ErrorKind::Txn(_) => QueryError::TransactionalError,
ErrorKind::Txn(_) => QueryError::SysTransactionalError,
ErrorKind::Other(_) => QueryError::SysUnknownError,
ErrorKind::Config(_) => unreachable!("config error cannot propagate here"),
}

@ -271,11 +271,6 @@ impl FractalMgr {
// branch returning. but it is okay
return Ok(());
}
// mark that we're taking these deltas
model.delta_state().__fractal_take_from_data_delta(
observed_size,
super::FractalToken::new(),
);
Self::try_write_model_data_batch(model, observed_size, mdl_driver)
},
);

@ -26,7 +26,7 @@
use {
super::{
core::GlobalNS,
core::{dml::QueryExecMeta, model::Model, GlobalNS},
data::uuid::Uuid,
storage::{
self,
@ -123,17 +123,26 @@ pub trait GlobalInstanceLike {
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
// default impls
fn request_batch_resolve(
fn request_batch_resolve_if_cache_full(
&self,
space_name: &str,
model_name: &str,
model_uuid: Uuid,
observed_len: usize,
model: &Model,
hint: QueryExecMeta,
) {
self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
ModelUniqueID::new(space_name, model_name, model_uuid),
observed_len,
)))
let current_delta_size = hint.delta_hint();
let index_size = model.primary_index().count();
let five = (index_size as f64 * 0.05) as usize;
let max_delta = five.max(self.get_max_delta_size());
if current_delta_size >= max_delta {
let obtained_delta_size = model
.delta_state()
.__fractal_take_full_from_data_delta(FractalToken::new());
self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
obtained_delta_size,
)));
}
}
// config handle
fn sys_cfg(&self) -> &config::SysConfig;

@ -88,6 +88,233 @@ macro_rules! flags {
);
}
macro_rules! __kw_misc {
($ident:ident) => {
$crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Misc(
$crate::engine::ql::lex::KeywordMisc::$ident,
))
};
}
macro_rules! __kw_stmt {
($ident:ident) => {
$crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Statement(
$crate::engine::ql::lex::KeywordStmt::$ident,
))
};
}
/*
Frankly, this is just for lazy people like me. Do not judge
-- Sayan (@ohsayan)
*/
macro_rules! Token {
// misc symbol
(@) => {
__sym_token!(SymAt)
};
(#) => {
__sym_token!(SymHash)
};
($) => {
__sym_token!(SymDollar)
};
(%) => {
__sym_token!(SymPercent)
};
(.) => {
__sym_token!(SymPeriod)
};
(,) => {
__sym_token!(SymComma)
};
(_) => {
__sym_token!(SymUnderscore)
};
(?) => {
__sym_token!(SymQuestion)
};
(:) => {
__sym_token!(SymColon)
};
(;) => {
__sym_token!(SymSemicolon)
};
(~) => {
__sym_token!(SymTilde)
};
// logical
(!) => {
__sym_token!(OpLogicalNot)
};
(^) => {
__sym_token!(OpLogicalXor)
};
(&) => {
__sym_token!(OpLogicalAnd)
};
(|) => {
__sym_token!(OpLogicalOr)
};
// operator misc.
(=) => {
__sym_token!(OpAssign)
};
// arithmetic
(+) => {
__sym_token!(OpArithmeticAdd)
};
(-) => {
__sym_token!(OpArithmeticSub)
};
(*) => {
__sym_token!(OpArithmeticMul)
};
(/) => {
__sym_token!(OpArithmeticDiv)
};
// relational
(>) => {
__sym_token!(OpComparatorGt)
};
(<) => {
__sym_token!(OpComparatorLt)
};
// ddl keywords
(use) => {
__kw_stmt!(Use)
};
(create) => {
__kw_stmt!(Create)
};
(alter) => {
__kw_stmt!(Alter)
};
(drop) => {
__kw_stmt!(Drop)
};
(model) => {
__kw_misc!(Model)
};
(space) => {
__kw_misc!(Space)
};
(primary) => {
__kw_misc!(Primary)
};
// ddl misc
(with) => {
__kw_misc!(With)
};
(add) => {
__kw_misc!(Add)
};
(remove) => {
__kw_misc!(Remove)
};
(sort) => {
__kw_misc!(Sort)
};
(type) => {
__kw_misc!(Type)
};
// dml
(insert) => {
__kw_stmt!(Insert)
};
(select) => {
__kw_stmt!(Select)
};
(update) => {
__kw_stmt!(Update)
};
(delete) => {
__kw_stmt!(Delete)
};
// dml misc
(set) => {
__kw_misc!(Set)
};
(limit) => {
__kw_misc!(Limit)
};
(from) => {
__kw_misc!(From)
};
(into) => {
__kw_misc!(Into)
};
(where) => {
__kw_misc!(Where)
};
(if) => {
__kw_misc!(If)
};
(and) => {
__kw_misc!(And)
};
(as) => {
__kw_misc!(As)
};
(by) => {
__kw_misc!(By)
};
(asc) => {
__kw_misc!(Asc)
};
(desc) => {
__kw_misc!(Desc)
};
// types
(string) => {
__kw_misc!(String)
};
(binary) => {
__kw_misc!(Binary)
};
(list) => {
__kw_misc!(List)
};
(map) => {
__kw_misc!(Map)
};
(bool) => {
__kw_misc!(Bool)
};
(int) => {
__kw_misc!(Int)
};
(double) => {
__kw_misc!(Double)
};
(float) => {
__kw_misc!(Float)
};
// tt
(open {}) => {
__sym_token!(TtOpenBrace)
};
(close {}) => {
__sym_token!(TtCloseBrace)
};
(() open) => {
__sym_token!(TtOpenParen)
};
(() close) => {
__sym_token!(TtCloseParen)
};
(open []) => {
__sym_token!(TtOpenSqBracket)
};
(close []) => {
__sym_token!(TtCloseSqBracket)
};
// misc
(null) => {
__kw_misc!(Null)
};
}
macro_rules! union {
($(#[$attr:meta])* $vis:vis union $name:ident $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name] [] $tail););
($(#[$attr:meta])* $vis:vis union $name:ident<$($lt:lifetime),*> $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name<$($lt),*>] [] $tail););

@ -24,12 +24,13 @@
*
*/
use super::config::ConfigEndpointTcp;
pub mod protocol;
use {
crate::engine::{error::RuntimeResult, fractal::error::ErrorContext, fractal::Global},
crate::engine::{
config::ConfigEndpointTcp, error::RuntimeResult, fractal::error::ErrorContext,
fractal::Global,
},
bytes::BytesMut,
openssl::{
pkey::PKey,
@ -39,7 +40,7 @@ use {
},
std::{cell::Cell, net::SocketAddr, pin::Pin, time::Duration},
tokio::{
io::{AsyncRead, AsyncWrite, BufWriter},
io::{AsyncRead, AsyncWrite, BufWriter, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::{broadcast, mpsc, Semaphore},
},
@ -128,6 +129,7 @@ impl<S: Socket> ConnectionHandler<S> {
loop {
tokio::select! {
ret = protocol::query_loop(socket, buffer, global) => {
socket.flush().await?;
match ret {
Ok(QueryLoopResult::Fin) => return Ok(()),
Ok(QueryLoopResult::Rst) => error!("connection reset while talking to client"),

@ -80,28 +80,7 @@ pub fn resume<'a>(
state: QueryTimeExchangeState,
) -> QueryTimeExchangeResult<'a> {
match state {
QueryTimeExchangeState::Initial => {
if cfg!(debug_assertions) {
if !scanner.has_left(EXCHANGE_MIN_SIZE) {
return STATE_READ_INITIAL;
}
} else {
assert!(scanner.has_left(EXCHANGE_MIN_SIZE));
}
// attempt to read atleast one byte
if cfg!(debug_assertions) {
match scanner.try_next_byte() {
Some(b'S') => SQuery::resume_initial(scanner),
Some(_) => return STATE_ERROR,
None => return STATE_READ_INITIAL,
}
} else {
match unsafe { scanner.next_byte() } {
b'S' => SQuery::resume_initial(scanner),
_ => return STATE_ERROR,
}
}
}
QueryTimeExchangeState::Initial => SQuery::resume_initial(scanner),
QueryTimeExchangeState::SQ1Meta1Partial { packet_size_part } => {
SQuery::resume_at_sq1_meta1_partial(scanner, packet_size_part)
}
@ -199,6 +178,26 @@ impl<'a> SQuery<'a> {
impl<'a> SQuery<'a> {
/// We're touching this packet for the first time
fn resume_initial(scanner: &mut BufferedScanner<'a>) -> QueryTimeExchangeResult<'a> {
if cfg!(debug_assertions) {
if !scanner.has_left(EXCHANGE_MIN_SIZE) {
return STATE_READ_INITIAL;
}
} else {
assert!(scanner.has_left(EXCHANGE_MIN_SIZE));
}
// attempt to read atleast one byte
if cfg!(debug_assertions) {
match scanner.try_next_byte() {
Some(b'S') => {}
Some(_) => return STATE_ERROR,
None => return STATE_READ_INITIAL,
}
} else {
match unsafe { scanner.next_byte() } {
b'S' => {}
_ => return STATE_ERROR,
}
}
Self::resume_at_sq1_meta1_partial(scanner, 0)
}
/// We found some part of meta1, and need to resume
@ -239,11 +238,11 @@ impl<'a> SQuery<'a> {
match parse_lf_separated(scanner, prev_qw_buffered) {
LFTIntParseResult::Value(q_window) => {
// we got the q window; can we complete the exchange?
Self::resume_at_final(
scanner,
q_window as usize,
Self::compute_df_size(scanner, static_size, packet_size),
)
let df_size = Self::compute_df_size(scanner, static_size, packet_size);
if df_size == 0 {
return QueryTimeExchangeResult::Error;
}
Self::resume_at_final(scanner, q_window as usize, df_size)
}
LFTIntParseResult::Partial(q_window_partial) => {
// not enough bytes for getting Q window
@ -290,7 +289,7 @@ impl<'a> SQuery<'a> {
impl<'a> SQuery<'a> {
fn compute_df_size(scanner: &BufferedScanner, static_size: usize, packet_size: usize) -> usize {
packet_size - scanner.cursor() + static_size
(packet_size + static_size) - scanner.cursor()
}
fn compute_df_remaining(scanner: &BufferedScanner<'_>, df_size: usize) -> usize {
(scanner.cursor() + df_size) - scanner.buffer_len()

@ -24,8 +24,6 @@
*
*/
use tokio::io::AsyncWriteExt;
mod exchange;
mod handshake;
#[cfg(test)]
@ -45,13 +43,19 @@ use {
super::{IoResult, QueryLoopResult, Socket},
crate::engine::{
self,
error::QueryError,
fractal::{Global, GlobalInstanceLike},
mem::BufferedScanner,
},
bytes::{Buf, BytesMut},
tokio::io::{AsyncReadExt, BufWriter},
tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter},
};
#[derive(Debug, PartialEq)]
pub enum Response {
Empty,
}
pub(super) async fn query_loop<S: Socket>(
con: &mut BufWriter<S>,
buf: &mut BytesMut,
@ -64,13 +68,14 @@ pub(super) async fn query_loop<S: Socket>(
PostHandshake::ConnectionClosedRst => return Ok(QueryLoopResult::Rst),
PostHandshake::Error(e) => {
// failed to handshake; we'll close the connection
let hs_err_packet = [b'H', b'1', 1, e.value_u8()];
let hs_err_packet = [b'H', 0, 1, e.value_u8()];
con.write_all(&hs_err_packet).await?;
return Ok(QueryLoopResult::HSFailed);
}
};
// done handshaking
con.write_all(b"H1\x00\x00\x00").await?;
con.write_all(b"H\x00\x00\x00").await?;
con.flush().await?;
let mut exchg_state = QueryTimeExchangeState::default();
let mut expect_more = exchange::EXCHANGE_MIN_SIZE;
let mut cursor = 0;
@ -100,21 +105,27 @@ pub(super) async fn query_loop<S: Socket>(
}
QueryTimeExchangeResult::SQCompleted(sq) => sq,
QueryTimeExchangeResult::Error => {
con.write_all(b"!\x00").await?;
let [a, b] =
(QueryError::NetworkSubsystemCorruptedPacket.value_u8() as u16).to_le_bytes();
con.write_all(&[0x10, a, b]).await?;
con.flush().await?;
buf.clear();
exchg_state = QueryTimeExchangeState::default();
continue;
}
};
// now execute query
match engine::core::exec::execute_query(global, sq).await {
Ok(()) => {
buf.clear();
match engine::core::exec::dispatch_to_executor(global, sq).await {
Ok(Response::Empty) => {
con.write_all(&[0x12]).await?;
}
Err(e) => {
let [a, b] = (e.value_u8() as u16).to_le_bytes();
con.write_all(&[0x10, a, b]).await?;
}
Err(_e) => {
// TOOD(@ohsayan): actual error codes!
con.write_all(&[b'!', 1]).await?;
},
}
con.flush().await?;
buf.clear();
exchg_state = QueryTimeExchangeState::default();
}
}
@ -152,6 +163,7 @@ async fn do_handshake<S: Socket>(
match handshake::CHandshake::resume_with(&mut scanner, state) {
HandshakeResult::Completed(hs) => {
handshake = hs;
cursor = scanner.cursor();
break;
}
HandshakeResult::ChangeState { new_state, expect } => {

@ -26,10 +26,11 @@
pub mod traits;
#[cfg(debug_assertions)]
use self::traits::ASTNode;
#[cfg(test)]
pub use traits::{parse_ast_node_full, parse_ast_node_multiple_full};
use {
self::traits::ASTNode,
super::{
ddl, dml,
lex::{Ident, Token},
@ -57,7 +58,6 @@ pub struct State<'a, Qd> {
f: bool,
}
#[cfg(test)]
impl<'a> State<'a, InplaceData> {
pub const fn new_inplace(tok: &'a [Token<'a>]) -> Self {
Self::new(tok, InplaceData::new())
@ -392,7 +392,7 @@ impl<'a> Entity<'a> {
*c += 1;
Self::parse_uck_tokens_single(tok)
},
_ => return Err(QueryError::QPExpectedEntity),
_ => return Err(QueryError::QLExpectedEntity),
};
Ok(r)
}
@ -408,7 +408,7 @@ impl<'a> Entity<'a> {
Ok(e.assume_init())
}
} else {
Err(QueryError::QPExpectedEntity)
Err(QueryError::QLExpectedEntity)
}
}
#[inline(always)]
@ -495,6 +495,7 @@ pub enum Statement<'a> {
}
#[inline(always)]
#[cfg(debug_assertions)]
pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResult<Statement<'a>> {
if compiler::unlikely(tok.len() < 2) {
return Err(QueryError::QLUnexpectedEndOfStatement);
@ -506,12 +507,12 @@ pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResul
Token![create] => match state.fw_read() {
Token![model] => ASTNode::from_state(&mut state).map(Statement::CreateModel),
Token![space] => ASTNode::from_state(&mut state).map(Statement::CreateSpace),
_ => compiler::cold_rerr(QueryError::QPUnknownStatement),
_ => compiler::cold_rerr(QueryError::QLUnknownStatement),
},
Token![alter] => match state.fw_read() {
Token![model] => ASTNode::from_state(&mut state).map(Statement::AlterModel),
Token![space] => ASTNode::from_state(&mut state).map(Statement::AlterSpace),
_ => compiler::cold_rerr(QueryError::QPUnknownStatement),
_ => compiler::cold_rerr(QueryError::QLUnknownStatement),
},
Token![drop] if state.remaining() >= 2 => ddl::drop::parse_drop(&mut state),
Token::Ident(id) if id.eq_ignore_ascii_case("inspect") => {
@ -522,6 +523,6 @@ pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResul
Token![select] => ASTNode::from_state(&mut state).map(Statement::Select),
Token![update] => ASTNode::from_state(&mut state).map(Statement::Update),
Token![delete] => ASTNode::from_state(&mut state).map(Statement::Delete),
_ => compiler::cold_rerr(QueryError::QPUnknownStatement),
_ => compiler::cold_rerr(QueryError::QLUnknownStatement),
}
}

@ -123,7 +123,7 @@ impl<'a> AlterModel<'a> {
Token![add] => AlterKind::alter_add(state),
Token![remove] => AlterKind::alter_remove(state),
Token![update] => AlterKind::alter_update(state),
_ => Err(QueryError::QPExpectedStatement),
_ => Err(QueryError::QLExpectedStatement),
};
kind.map(|kind| AlterModel::new(model_name, kind))
}

@ -97,7 +97,7 @@ pub fn parse_drop<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResu
match state.fw_read() {
Token![model] => DropModel::parse(state).map(Statement::DropModel),
Token![space] => return DropSpace::parse(state).map(Statement::DropSpace),
_ => Err(QueryError::QPUnknownStatement),
_ => Err(QueryError::QLUnknownStatement),
}
}

@ -65,7 +65,7 @@ pub fn parse_inspect<'a, Qd: QueryData<'a>>(
}
_ => {
state.cursor_back();
Err(QueryError::QPExpectedStatement)
Err(QueryError::QLExpectedStatement)
}
}
}

@ -515,7 +515,7 @@ impl<'a> ExpandedField<'a> {
Err(QueryError::QLInvalidSyntax)
}
}
_ => Err(QueryError::QPExpectedStatement),
_ => Err(QueryError::QLExpectedStatement),
}
}
}

@ -426,7 +426,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
let nb = slf.param_buffer.next_byte();
slf.l.push_token(Token::Lit(Lit::new_bool(nb == 1)));
if nb > 1 {
slf.l.set_error(QueryError::LexInvalidEscapedLiteral);
slf.l.set_error(QueryError::LexInvalidParameter);
}
},
// uint
@ -435,7 +435,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
.try_next_ascii_u64_lf_separated_or_restore_cursor()
{
Some(int) => slf.l.push_token(Lit::new_uint(int)),
None => slf.l.set_error(QueryError::LexInvalidEscapedLiteral),
None => slf.l.set_error(QueryError::LexInvalidParameter),
},
// sint
|slf| {
@ -452,7 +452,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
.param_buffer
.try_next_ascii_u64_lf_separated_or_restore_cursor()
else {
slf.l.set_error(QueryError::LexInvalidEscapedLiteral);
slf.l.set_error(QueryError::LexInvalidParameter);
return;
};
let body = match slf
@ -461,13 +461,13 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
{
Some(body) => body,
None => {
slf.l.set_error(QueryError::LexInvalidEscapedLiteral);
slf.l.set_error(QueryError::LexInvalidParameter);
return;
}
};
match core::str::from_utf8(body).map(core::str::FromStr::from_str) {
Ok(Ok(fp)) => slf.l.push_token(Lit::new_float(fp)),
_ => slf.l.set_error(QueryError::LexInvalidEscapedLiteral),
_ => slf.l.set_error(QueryError::LexInvalidParameter),
}
},
// binary
@ -476,7 +476,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
.param_buffer
.try_next_ascii_u64_lf_separated_or_restore_cursor()
else {
slf.l.set_error(QueryError::LexInvalidEscapedLiteral);
slf.l.set_error(QueryError::LexInvalidParameter);
return;
};
match slf
@ -484,7 +484,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
.try_next_variable_block(size_of_body as usize)
{
Some(block) => slf.l.push_token(Lit::new_bin(block)),
None => slf.l.set_error(QueryError::LexInvalidEscapedLiteral),
None => slf.l.set_error(QueryError::LexInvalidParameter),
}
},
// string
@ -493,7 +493,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
.param_buffer
.try_next_ascii_u64_lf_separated_or_restore_cursor()
else {
slf.l.set_error(QueryError::LexInvalidEscapedLiteral);
slf.l.set_error(QueryError::LexInvalidParameter);
return;
};
match slf
@ -503,10 +503,10 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe {
{
// TODO(@ohsayan): obliterate this alloc
Some(Ok(s)) => slf.l.push_token(Lit::new_string(s.to_owned())),
_ => slf.l.set_error(QueryError::LexInvalidEscapedLiteral),
_ => slf.l.set_error(QueryError::LexInvalidParameter),
}
},
// ecc
|s| s.l.set_error(QueryError::LexInvalidEscapedLiteral),
|s| s.l.set_error(QueryError::LexInvalidParameter),
]
};

@ -321,25 +321,26 @@ macro_rules! flattened_lut {
}
flattened_lut! {
static KW_LUT in kwlut;
static KW in kw;
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(u8)]
pub enum Keyword {
Statement => {
#[derive(Debug, PartialEq, Clone, Copy, sky_macros::EnumMethods)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, sky_macros::EnumMethods)]
#[repr(u8)]
/// A statement keyword
pub enum KeywordStmt {
// sys
// system
Sysctl = 0,
Describe = 1,
Inspect = 2,
// ddl
Use = 3,
Create = 4,
Alter = 5,
Drop = 6,
// dml
// DDL
Create = 1,
Alter = 2,
Drop = 3,
// system/DDL misc
Use = 4,
Inspect = 5,
Describe = 6,
// DML
Insert = 7,
Select = 8,
Update = 9,
@ -425,18 +426,18 @@ impl Keyword {
}
}
fn compute(key: &[u8]) -> Option<Self> {
static G: [u8; 67] = [
0, 42, 57, 0, 20, 61, 15, 46, 28, 0, 31, 2, 1, 44, 47, 10, 35, 53, 30, 28, 48, 9, 1,
51, 61, 20, 20, 47, 23, 31, 0, 52, 55, 59, 27, 45, 54, 49, 29, 0, 66, 54, 23, 58, 13,
31, 47, 56, 1, 30, 40, 0, 0, 42, 27, 63, 6, 24, 65, 45, 42, 63, 60, 14, 26, 4, 13,
static G: [u8; 64] = [
0, 27, 13, 56, 18, 0, 26, 30, 33, 56, 20, 41, 56, 39, 23, 34, 36, 23, 17, 40, 38, 45,
8, 25, 26, 24, 53, 59, 30, 14, 9, 60, 12, 29, 6, 47, 3, 38, 19, 5, 13, 51, 41, 34, 0,
22, 43, 13, 46, 33, 11, 12, 36, 58, 40, 0, 36, 2, 19, 49, 53, 23, 55, 0,
];
static M1: [u8; 11] = *b"wsE1pgJgJMO";
static M2: [u8; 11] = *b"fICAB04WegN";
static M1: [u8; 11] = *b"RtEMxHylmiZ";
static M2: [u8; 11] = *b"F1buDOZ2nzz";
let h1 = Self::_sum(key, M1) % G.len();
let h2 = Self::_sum(key, M2) % G.len();
let h = (G[h1] + G[h2]) as usize % G.len();
if h < G.len() && KW_LUT[h].0.eq_ignore_ascii_case(key) {
Some(KW_LUT[h].1)
if h < KW.len() && KW[h].0.eq_ignore_ascii_case(key) {
Some(KW[h].1)
} else {
None
}
@ -453,3 +454,9 @@ impl Keyword {
sum
}
}
impl KeywordStmt {
pub const fn is_blocking(&self) -> bool {
self.value_u8() <= Self::Drop.value_u8()
}
}

@ -30,229 +30,6 @@ macro_rules! __sym_token {
};
}
macro_rules! __kw_misc {
($ident:ident) => {
$crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Misc($crate::engine::ql::lex::KeywordMisc::$ident))
};
}
macro_rules! __kw_stmt {
($ident:ident) => {
$crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Statement($crate::engine::ql::lex::KeywordStmt::$ident))
};
}
/*
Frankly, this is just for lazy people like me. Do not judge
-- Sayan (@ohsayan)
*/
macro_rules! Token {
// misc symbol
(@) => {
__sym_token!(SymAt)
};
(#) => {
__sym_token!(SymHash)
};
($) => {
__sym_token!(SymDollar)
};
(%) => {
__sym_token!(SymPercent)
};
(.) => {
__sym_token!(SymPeriod)
};
(,) => {
__sym_token!(SymComma)
};
(_) => {
__sym_token!(SymUnderscore)
};
(?) => {
__sym_token!(SymQuestion)
};
(:) => {
__sym_token!(SymColon)
};
(;) => {
__sym_token!(SymSemicolon)
};
(~) => {
__sym_token!(SymTilde)
};
// logical
(!) => {
__sym_token!(OpLogicalNot)
};
(^) => {
__sym_token!(OpLogicalXor)
};
(&) => {
__sym_token!(OpLogicalAnd)
};
(|) => {
__sym_token!(OpLogicalOr)
};
// operator misc.
(=) => {
__sym_token!(OpAssign)
};
// arithmetic
(+) => {
__sym_token!(OpArithmeticAdd)
};
(-) => {
__sym_token!(OpArithmeticSub)
};
(*) => {
__sym_token!(OpArithmeticMul)
};
(/) => {
__sym_token!(OpArithmeticDiv)
};
// relational
(>) => {
__sym_token!(OpComparatorGt)
};
(<) => {
__sym_token!(OpComparatorLt)
};
// ddl keywords
(use) => {
__kw_stmt!(Use)
};
(create) => {
__kw_stmt!(Create)
};
(alter) => {
__kw_stmt!(Alter)
};
(drop) => {
__kw_stmt!(Drop)
};
(model) => {
__kw_misc!(Model)
};
(space) => {
__kw_misc!(Space)
};
(primary) => {
__kw_misc!(Primary)
};
// ddl misc
(with) => {
__kw_misc!(With)
};
(add) => {
__kw_misc!(Add)
};
(remove) => {
__kw_misc!(Remove)
};
(sort) => {
__kw_misc!(Sort)
};
(type) => {
__kw_misc!(Type)
};
// dml
(insert) => {
__kw_stmt!(Insert)
};
(select) => {
__kw_stmt!(Select)
};
(update) => {
__kw_stmt!(Update)
};
(delete) => {
__kw_stmt!(Delete)
};
// dml misc
(set) => {
__kw_misc!(Set)
};
(limit) => {
__kw_misc!(Limit)
};
(from) => {
__kw_misc!(From)
};
(into) => {
__kw_misc!(Into)
};
(where) => {
__kw_misc!(Where)
};
(if) => {
__kw_misc!(If)
};
(and) => {
__kw_misc!(And)
};
(as) => {
__kw_misc!(As)
};
(by) => {
__kw_misc!(By)
};
(asc) => {
__kw_misc!(Asc)
};
(desc) => {
__kw_misc!(Desc)
};
// types
(string) => {
__kw_misc!(String)
};
(binary) => {
__kw_misc!(Binary)
};
(list) => {
__kw_misc!(List)
};
(map) => {
__kw_misc!(Map)
};
(bool) => {
__kw_misc!(Bool)
};
(int) => {
__kw_misc!(Int)
};
(double) => {
__kw_misc!(Double)
};
(float) => {
__kw_misc!(Float)
};
// tt
(open {}) => {
__sym_token!(TtOpenBrace)
};
(close {}) => {
__sym_token!(TtCloseBrace)
};
(() open) => {
__sym_token!(TtOpenParen)
};
(() close) => {
__sym_token!(TtCloseParen)
};
(open []) => {
__sym_token!(TtOpenSqBracket)
};
(close []) => {
__sym_token!(TtCloseSqBracket)
};
// misc
(null) => {
__kw_misc!(Null)
};
}
#[cfg(test)]
macro_rules! dict {
() => {

@ -134,9 +134,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
Ok(()) => Ok(()),
Err(_) => {
// republish changes since we failed to commit
restore_list
.into_iter()
.for_each(|delta| model.delta_state().append_new_data_delta(delta, &g));
restore_list.into_iter().for_each(|delta| {
model.delta_state().append_new_data_delta(delta, &g);
});
// now attempt to fix the file
return self.attempt_fix_data_batchfile();
}

@ -318,7 +318,7 @@ fn drop_model() {
.namespace()
.with_model(("myspace", "mymodel"), |_| { Ok(()) })
.unwrap_err(),
QueryError::QPObjectNotFound
QueryError::QExecObjectNotFound
);
})
})

Loading…
Cancel
Save