From 3e981f3dcb90b084d14badaa250d449287acfed0 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sat, 7 Oct 2023 08:54:28 +0000 Subject: [PATCH] Implement exec, fix delta handling and add resp --- server/src/engine/core/dml/del.rs | 8 +- server/src/engine/core/dml/ins.rs | 9 +- server/src/engine/core/dml/mod.rs | 18 +- server/src/engine/core/dml/sel.rs | 4 +- server/src/engine/core/dml/upd.rs | 18 +- server/src/engine/core/exec.rs | 163 ++++++++++++- server/src/engine/core/mod.rs | 20 +- server/src/engine/core/model/alt.rs | 23 +- server/src/engine/core/model/delta.rs | 24 +- server/src/engine/core/model/mod.rs | 12 +- server/src/engine/core/space.rs | 18 +- server/src/engine/core/tests/ddl_model/alt.rs | 16 +- server/src/engine/core/tests/ddl_model/crt.rs | 10 +- .../src/engine/core/tests/ddl_model/layer.rs | 4 +- .../src/engine/core/tests/ddl_space/alter.rs | 2 +- .../src/engine/core/tests/ddl_space/create.rs | 4 +- server/src/engine/core/tests/dml/delete.rs | 2 +- server/src/engine/core/tests/dml/insert.rs | 2 +- server/src/engine/core/tests/dml/select.rs | 2 +- server/src/engine/core/tests/dml/update.rs | 6 +- server/src/engine/core/util.rs | 2 +- server/src/engine/error.rs | 71 +++--- server/src/engine/fractal/mgr.rs | 5 - server/src/engine/fractal/mod.rs | 25 +- server/src/engine/macros.rs | 227 ++++++++++++++++++ server/src/engine/net/mod.rs | 10 +- server/src/engine/net/protocol/exchange.rs | 55 +++-- server/src/engine/net/protocol/mod.rs | 38 ++- server/src/engine/ql/ast/mod.rs | 15 +- server/src/engine/ql/ddl/alt.rs | 2 +- server/src/engine/ql/ddl/drop.rs | 2 +- server/src/engine/ql/ddl/ins.rs | 2 +- server/src/engine/ql/ddl/syn.rs | 2 +- server/src/engine/ql/lex/mod.rs | 20 +- server/src/engine/ql/lex/raw.rs | 45 ++-- server/src/engine/ql/macros.rs | 223 ----------------- .../engine/storage/v1/batch_jrnl/persist.rs | 6 +- server/src/engine/txn/gns/tests/full_chain.rs | 2 +- 38 files changed, 660 insertions(+), 457 deletions(-) diff --git a/server/src/engine/core/dml/del.rs b/server/src/engine/core/dml/del.rs index 1ba3b8e5..0802383c 100644 --- a/server/src/engine/core/dml/del.rs +++ b/server/src/engine/core/dml/del.rs @@ -25,7 +25,7 @@ */ use crate::engine::{ - core::{self, model::delta::DataDeltaKind}, + core::{self, dml::QueryExecMeta, model::delta::DataDeltaKind}, error::{QueryError, QueryResult}, fractal::GlobalInstanceLike, idx::MTIndex, @@ -47,16 +47,16 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) -> .mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g) { Some(row) => { - delta_state.append_new_data_delta_with( + let dp = delta_state.append_new_data_delta_with( DataDeltaKind::Delete, row.clone(), schema_version, new_version, &g, ); - Ok(()) + Ok(QueryExecMeta::new(dp)) } - None => Err(QueryError::QPDmlRowNotFound), + None => Err(QueryError::QExecDmlRowNotFound), } }) } diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs index 8b5d7992..8b0173dd 100644 --- a/server/src/engine/core/dml/ins.rs +++ b/server/src/engine/core/dml/ins.rs @@ -27,6 +27,7 @@ use crate::engine::{ core::{ self, + dml::QueryExecMeta, index::{DcFieldIndex, PrimaryIndexKey, Row}, model::{delta::DataDeltaKind, Fields, Model}, }, @@ -49,16 +50,16 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer let row = Row::new(pk, data, ds.schema_current_version(), new_version); if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) { // append delta for new version - ds.append_new_data_delta_with( + let dp = ds.append_new_data_delta_with( DataDeltaKind::Insert, row, ds.schema_current_version(), new_version, &g, ); - Ok(()) + Ok(QueryExecMeta::new(dp)) } else { - Err(QueryError::QPDmlDuplicate) + Err(QueryError::QExecDmlDuplicate) } }) } @@ -114,6 +115,6 @@ fn prepare_insert( }; Ok((primary_key, prepared_data)) } else { - Err(QueryError::QPDmlValidationError) + Err(QueryError::QExecDmlValidationError) } } diff --git a/server/src/engine/core/dml/mod.rs b/server/src/engine/core/dml/mod.rs index a7414382..dfe1dfb1 100644 --- a/server/src/engine/core/dml/mod.rs +++ b/server/src/engine/core/dml/mod.rs @@ -55,7 +55,23 @@ impl Model { { Ok(clause.rhs()) } - _ => compiler::cold_rerr(QueryError::QPDmlWhereHasUnindexedColumn), + _ => compiler::cold_rerr(QueryError::QExecDmlWhereHasUnindexedColumn), } } } + +pub struct QueryExecMeta { + delta_hint: usize, +} + +impl QueryExecMeta { + pub fn new(delta_hint: usize) -> Self { + Self { delta_hint } + } + pub fn zero() -> Self { + Self::new(0) + } + pub fn delta_hint(&self) -> usize { + self.delta_hint + } +} diff --git a/server/src/engine/core/dml/sel.rs b/server/src/engine/core/dml/sel.rs index 4ee9698c..f95a8239 100644 --- a/server/src/engine/core/dml/sel.rs +++ b/server/src/engine/core/dml/sel.rs @@ -52,7 +52,7 @@ where match fields.st_get(key) { Some(dc) => cellfn(dc), None if key == mdl.p_key() => cellfn(&pkdc), - None => return Err(QueryError::QPUnknownField), + None => return Err(QueryError::QExecUnknownField), } Ok(()) }; @@ -69,7 +69,7 @@ where } } } - None => return Err(QueryError::QPDmlRowNotFound), + None => return Err(QueryError::QExecDmlRowNotFound), } Ok(()) }) diff --git a/server/src/engine/core/dml/upd.rs b/server/src/engine/core/dml/upd.rs index 3be58b52..eeb2998a 100644 --- a/server/src/engine/core/dml/upd.rs +++ b/server/src/engine/core/dml/upd.rs @@ -30,7 +30,10 @@ use std::cell::RefCell; use { crate::{ engine::{ - core::{self, model::delta::DataDeltaKind, query_meta::AssignmentOperator}, + core::{ + self, dml::QueryExecMeta, model::delta::DataDeltaKind, + query_meta::AssignmentOperator, + }, data::{ cell::Datacell, lit::Lit, @@ -235,7 +238,7 @@ pub fn collect_trace_path() -> Vec<&'static str> { #[allow(unused)] pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> QueryResult<()> { core::with_model_for_data_update(global, update.entity(), |mdl| { - let mut ret = Ok(()); + let mut ret = Ok(QueryExecMeta::zero()); // prepare row fetch let key = mdl.resolve_where(update.clauses_mut())?; // freeze schema @@ -243,7 +246,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> // fetch row let g = sync::atm::cpin(); let Some(row) = mdl.primary_index().select(key, &g) else { - return Err(QueryError::QPDmlRowNotFound); + return Err(QueryError::QExecDmlRowNotFound); }; // lock row let mut row_data_wl = row.d_data().write(); @@ -280,7 +283,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> _ => { input_trace("fieldnotfound"); rollback_now = true; - ret = Err(QueryError::QPUnknownField); + ret = Err(QueryError::QExecUnknownField); break; } } @@ -321,13 +324,13 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> } else { input_trace("list;badtag"); rollback_now = true; - ret = Err(QueryError::QPDmlValidationError); + ret = Err(QueryError::QExecDmlValidationError); break; } } _ => { input_trace("unknown_reason;exitmainloop"); - ret = Err(QueryError::QPDmlValidationError); + ret = Err(QueryError::QExecDmlValidationError); rollback_now = true; break; } @@ -344,13 +347,14 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> // update revised tag row_data_wl.set_txn_revised(new_version); // publish delta - ds.append_new_data_delta_with( + let dp = ds.append_new_data_delta_with( DataDeltaKind::Update, row.clone(), ds.schema_current_version(), new_version, &g, ); + ret = Ok(QueryExecMeta::new(dp)) } ret }) diff --git a/server/src/engine/core/exec.rs b/server/src/engine/core/exec.rs index 642960df..703b2447 100644 --- a/server/src/engine/core/exec.rs +++ b/server/src/engine/core/exec.rs @@ -24,12 +24,169 @@ * */ -use crate::engine::{error::QueryResult, fractal::Global, net::protocol::SQuery}; +use { + crate::engine::{ + core::{dml, model::Model, space::Space}, + error::{QueryError, QueryResult}, + fractal::Global, + net::protocol::{Response, SQuery}, + ql::{ + ast::{traits::ASTNode, InplaceData, State}, + lex::{Keyword, KeywordStmt, Token}, + }, + }, + core::ops::Deref, +}; -pub async fn execute_query<'a>(_global: &Global, query: SQuery<'a>) -> QueryResult<()> { +pub async fn dispatch_to_executor<'a, 'b>( + global: &'b Global, + query: SQuery<'a>, +) -> QueryResult { let tokens = crate::engine::ql::lex::SecureLexer::new_with_segments(query.query(), query.params()) .lex()?; - let _ = crate::engine::ql::ast::compile(&tokens, crate::engine::ql::ast::InplaceData::new()); + let mut state = State::new_inplace(&tokens); + let stmt = match state.read() { + Token::Keyword(Keyword::Statement(stmt)) if state.remaining() >= 3 => *stmt, + _ => return Err(QueryError::QLExpectedStatement), + }; + state.cursor_ahead(); + if stmt.is_blocking() { + run_blocking_stmt(state, stmt, global).await + } else { + run_nb(global, state, stmt) + } +} + +/* + blocking exec + --- + trigger warning: disgusting hacks below (why can't async play nice with lifetimes :|) +*/ + +struct RawSlice { + t: *const T, + l: usize, +} + +unsafe impl Send for RawSlice {} +unsafe impl Sync for RawSlice {} + +impl RawSlice { + #[inline(always)] + unsafe fn new(t: *const T, l: usize) -> Self { + Self { t, l } + } +} + +impl Deref for RawSlice { + type Target = [T]; + #[inline(always)] + fn deref(&self) -> &Self::Target { + unsafe { + // UNSAFE(@ohsayan): the caller MUST guarantee that this remains valid throughout the usage of the slice + core::slice::from_raw_parts(self.t, self.l) + } + } +} + +#[inline(always)] +fn call + core::fmt::Debug, T>( + g: Global, + tokens: RawSlice>, + f: impl FnOnce(&Global, A) -> QueryResult, +) -> QueryResult { + let mut state = State::new_inplace(unsafe { + // UNSAFE(@ohsayan): nothing to drop. all cool + core::mem::transmute(tokens) + }); + _call(&g, &mut state, f) +} + +#[inline(always)] +fn _call + core::fmt::Debug, T>( + g: &Global, + state: &mut State<'static, InplaceData>, + f: impl FnOnce(&Global, A) -> Result, +) -> QueryResult { + let cs = ASTNode::from_state(state)?; + f(&g, cs) +} + +async fn run_blocking_stmt( + mut state: State<'_, InplaceData>, + stmt: KeywordStmt, + global: &Global, +) -> Result { + let (a, b) = (&state.current()[0], &state.current()[1]); + let sysctl = stmt == KeywordStmt::Sysctl; + let create = stmt == KeywordStmt::Create; + let alter = stmt == KeywordStmt::Alter; + let drop = stmt == KeywordStmt::Drop; + let last_id = b.is_ident(); + let c_s = (create & Token![space].eq(a) & last_id) as u8 * 2; + let c_m = (create & Token![model].eq(a) & last_id) as u8 * 3; + let a_s = (alter & Token![space].eq(a) & last_id) as u8 * 4; + let a_m = (alter & Token![model].eq(a) & last_id) as u8 * 5; + let d_s = (drop & Token![space].eq(a) & last_id) as u8 * 6; + let d_m = (drop & Token![model].eq(a) & last_id) as u8 * 7; + let fc = sysctl as u8 | c_s | c_m | a_s | a_m | d_s | d_m; + state.cursor_ahead(); + static BLK_EXEC: [fn(Global, RawSlice>) -> QueryResult<()>; 8] = [ + |_, _| Err(QueryError::QLUnknownStatement), // unknown + blocking_exec_sysctl, // sysctl + |g, t| call(g, t, Space::transactional_exec_create), + |g, t| call(g, t, Model::transactional_exec_create), + |g, t| call(g, t, Space::transactional_exec_alter), + |g, t| call(g, t, Model::transactional_exec_alter), + |g, t| call(g, t, Space::transactional_exec_drop), + |g, t| call(g, t, Model::transactional_exec_drop), + ]; + let r = unsafe { + // UNSAFE(@ohsayan): the only await is within this block + let c_glob = global.clone(); + let ptr = state.current().as_ptr() as usize; + let len = state.current().len(); + tokio::task::spawn_blocking(move || { + let tokens = RawSlice::new(ptr as *const Token, len); + BLK_EXEC[fc as usize](c_glob, tokens)?; + Ok(Response::Empty) + }) + .await + }; + r.unwrap() +} + +fn blocking_exec_sysctl(_: Global, _: RawSlice>) -> QueryResult<()> { todo!() } + +/* + nb exec +*/ + +fn run_nb( + global: &Global, + state: State<'_, InplaceData>, + stmt: KeywordStmt, +) -> QueryResult { + let stmt = stmt.value_u8() - KeywordStmt::Use.value_u8(); + static F: [fn(&Global, &mut State<'static, InplaceData>) -> QueryResult<()>; 8] = [ + |_, _| panic!("use not implemented"), + |_, _| panic!("inspect not implemented"), + |_, _| panic!("describe not implemented"), + |g, s| _call(g, s, dml::insert), + |_, _| panic!("select not implemented"), + |g, s| _call(g, s, dml::update), + |g, s| _call(g, s, dml::delete), + |_, _| panic!("exists not implemented"), + ]; + { + let mut state = unsafe { + // UNSAFE(@ohsayan): this is a lifetime issue with the token handle + core::mem::transmute(state) + }; + F[stmt as usize](global, &mut state)?; + } + Ok(Response::Empty) +} diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index 7385cf4c..6a759f71 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -57,13 +57,13 @@ pub struct GlobalNS { index_space: RWLIdx, Space>, } -pub(self) fn with_model_for_data_update<'a, T, E, F>( +pub(self) fn with_model_for_data_update<'a, E, F>( global: &impl GlobalInstanceLike, entity: E, f: F, -) -> QueryResult +) -> QueryResult<()> where - F: FnOnce(&Model) -> QueryResult, + F: FnOnce(&Model) -> QueryResult, E: 'a + EntityLocator<'a>, { let (space_name, model_name) = entity.parse_entity()?; @@ -71,11 +71,15 @@ where .namespace() .with_model((space_name, model_name), |mdl| { let r = f(mdl); - // see if this task local delta is full - if r.is_ok() { - model::DeltaState::guard_delta_overflow(global, space_name, model_name, mdl); + match r { + Ok(dhint) => { + model::DeltaState::guard_delta_overflow( + global, space_name, model_name, mdl, dhint, + ); + Ok(()) + } + Err(e) => Err(e), } - r }) } @@ -101,7 +105,7 @@ impl GlobalNS { ) -> QueryResult { let sread = self.index_space.read(); let Some(space) = sread.st_get(space) else { - return Err(QueryError::QPObjectNotFound); + return Err(QueryError::QExecObjectNotFound); }; f(space) } diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index d9716a70..fe2fdd79 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -84,7 +84,7 @@ fn no_field(mr: &IWModel, new: &str) -> bool { fn check_nullable(props: &mut HashMap, DictEntryGeneric>) -> QueryResult { match props.remove("nullable") { Some(DictEntryGeneric::Data(b)) if b.kind() == TagClass::Bool => Ok(b.bool()), - Some(_) => Err(QueryError::QPDdlInvalidProperties), + Some(_) => Err(QueryError::QExecDdlInvalidProperties), None => Ok(false), } } @@ -101,7 +101,7 @@ impl<'a> AlterPlan<'a> { AlterKind::Remove(r) => { let mut x = HashSet::new(); if !r.iter().all(|id| x.insert(id.as_str())) { - return Err(QueryError::QPDdlModelAlterIllegal); + return Err(QueryError::QExecDdlModelAlterIllegal); } let mut not_found = false; if r.iter().all(|id| { @@ -112,9 +112,9 @@ impl<'a> AlterPlan<'a> { }) { can_ignore!(AlterAction::Remove(r)) } else if not_found { - return Err(QueryError::QPUnknownField); + return Err(QueryError::QExecUnknownField); } else { - return Err(QueryError::QPDdlModelAlterIllegal); + return Err(QueryError::QExecDdlModelAlterIllegal); } } AlterKind::Add(new_fields) => { @@ -148,7 +148,7 @@ impl<'a> AlterPlan<'a> { mv.guard_pk(&field_name)?; // get the current field let Some(current_field) = wm.fields().st_get(field_name.as_str()) else { - return Err(QueryError::QPUnknownField); + return Err(QueryError::QExecUnknownField); }; // check props let is_nullable = check_nullable(&mut props)?; @@ -174,7 +174,7 @@ impl<'a> AlterPlan<'a> { no_lock, }) } else { - Err(QueryError::QPDdlModelAlterIllegal) + Err(QueryError::QExecDdlModelAlterIllegal) } } fn ldeltas( @@ -197,7 +197,7 @@ impl<'a> AlterPlan<'a> { } if layers.len() > current.layers().len() { // simply a dumb tomato; ELIMINATE THESE DUMB TOMATOES - return Err(QueryError::QPDdlModelAlterIllegal); + return Err(QueryError::QExecDdlModelAlterIllegal); } let mut no_lock = !(current.is_nullable() & !nullable); let mut deltasize = (current.is_nullable() ^ nullable) as usize; @@ -216,7 +216,7 @@ impl<'a> AlterPlan<'a> { // actually parse the new layer okay &= props.is_empty(); let Some(new_parsed_layer) = Layer::get_layer(&ty) else { - return Err(QueryError::QPDdlInvalidTypeDefinition); + return Err(QueryError::QExecDdlInvalidTypeDefinition); }; match ( current_layer.tag.tag_selector(), @@ -233,7 +233,7 @@ impl<'a> AlterPlan<'a> { } _ => { // can't cast this directly - return Err(QueryError::QPDdlInvalidTypeDefinition); + return Err(QueryError::QExecDdlInvalidTypeDefinition); } } *new_layer = new_parsed_layer; @@ -243,13 +243,12 @@ impl<'a> AlterPlan<'a> { if okay { Ok((deltasize != 0, new_field)) } else { - Err(QueryError::QPDdlModelAlterIllegal) + Err(QueryError::QExecDdlModelAlterIllegal) } } } impl Model { - #[allow(unused)] pub fn transactional_exec_alter( global: &G, alter: AlterModel, @@ -264,7 +263,7 @@ impl Model { // we have a legal plan; acquire exclusive if we need it if !plan.no_lock { // TODO(@ohsayan): allow this later on, once we define the syntax - return Err(QueryError::QPNeedLock); + return Err(QueryError::QExecNeedLock); } // fine, we're good let mut iwm = iwm; diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index 0565f481..af1c8d4d 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -30,7 +30,7 @@ use { super::{Fields, Model}, crate::{ engine::{ - core::index::Row, + core::{dml::QueryExecMeta, index::Row}, fractal::{FractalToken, GlobalInstanceLike}, sync::atm::Guard, sync::queue::Queue, @@ -186,19 +186,9 @@ impl DeltaState { space_name: &str, model_name: &str, model: &Model, + hint: QueryExecMeta, ) { - let current_deltas_size = model.delta_state().data_deltas_size.load(Ordering::Acquire); - let max_len = global - .get_max_delta_size() - .min((model.primary_index().count() as f64 * 0.05) as usize); - if compiler::unlikely(current_deltas_size >= max_len) { - global.request_batch_resolve( - space_name, - model_name, - model.get_uuid(), - current_deltas_size, - ); - } + global.request_batch_resolve_if_cache_full(space_name, model_name, model, hint) } } @@ -211,12 +201,12 @@ impl DeltaState { schema_version: DeltaVersion, data_version: DeltaVersion, g: &Guard, - ) { - self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g); + ) -> usize { + self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g) } - pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) { + pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) -> usize { self.data_deltas.blocking_enqueue(delta, g); - self.data_deltas_size.fetch_add(1, Ordering::Release); + self.data_deltas_size.fetch_add(1, Ordering::Release) + 1 } pub fn create_new_data_delta_version(&self) -> DeltaVersion { DeltaVersion(self.__data_delta_step()) diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index 82acd88c..61e1557f 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -134,7 +134,7 @@ impl Model { } fn guard_pk(&self, new: &str) -> QueryResult<()> { if self.is_pk(new) { - Err(QueryError::QPDdlModelAlterIllegal) + Err(QueryError::QExecDdlModelAlterIllegal) } else { Ok(()) } @@ -195,12 +195,11 @@ impl Model { return Ok(Self::new_restore(Uuid::new(), last_pk.into(), tag, fields)); } } - Err(QueryError::QPDdlModelBadDefinition) + Err(QueryError::QExecDdlModelBadDefinition) } } impl Model { - #[allow(unused)] pub fn transactional_exec_create( global: &G, stmt: CreateModel, @@ -210,7 +209,7 @@ impl Model { global.namespace().with_space(space_name, |space| { let mut w_space = space.models().write(); if w_space.st_contains(model_name) { - return Err(QueryError::QPDdlObjectAlreadyExists); + return Err(QueryError::QExecDdlObjectAlreadyExists); } if G::FS_IS_NON_NULL { let irm = model.intent_read_model(); @@ -251,7 +250,6 @@ impl Model { Ok(()) }) } - #[allow(unused)] pub fn transactional_exec_drop( global: &G, stmt: DropModel, @@ -260,7 +258,7 @@ impl Model { global.namespace().with_space(space_name, |space| { let mut w_space = space.models().write(); let Some(model) = w_space.get(model_name) else { - return Err(QueryError::QPObjectNotFound); + return Err(QueryError::QExecObjectNotFound); }; if G::FS_IS_NON_NULL { // prepare txn @@ -366,7 +364,7 @@ impl Field { nullable, }) } else { - Err(QueryError::QPDdlInvalidTypeDefinition) + Err(QueryError::QExecDdlInvalidTypeDefinition) } } #[inline(always)] diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index 52013191..05685d0e 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -93,7 +93,7 @@ impl Space { { Ok(()) } else { - Err(QueryError::QPDdlObjectAlreadyExists) + Err(QueryError::QExecDdlObjectAlreadyExists) } } pub fn get_uuid(&self) -> Uuid { @@ -112,7 +112,7 @@ impl Space { ) -> QueryResult { let mread = self.mns.read(); let Some(model) = mread.st_get(model) else { - return Err(QueryError::QPObjectNotFound); + return Err(QueryError::QExecObjectNotFound); }; f(model) } @@ -161,7 +161,7 @@ impl Space { None if props.is_empty() => IndexST::default(), _ => { // unknown properties - return Err(QueryError::QPDdlInvalidProperties); + return Err(QueryError::QExecDdlInvalidProperties); } }; Ok(ProcedureCreate { @@ -178,7 +178,6 @@ impl Space { } impl Space { - #[allow(unused)] pub fn transactional_exec_create( global: &G, space: CreateSpace, @@ -188,7 +187,7 @@ impl Space { // acquire access let mut wl = global.namespace().spaces().write(); if wl.st_contains(&space_name) { - return Err(QueryError::QPDdlObjectAlreadyExists); + return Err(QueryError::QExecDdlObjectAlreadyExists); } // commit txn if G::FS_IS_NON_NULL { @@ -229,13 +228,13 @@ impl Space { Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {} Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {} None if updated_props.is_empty() => return Ok(()), - _ => return Err(QueryError::QPDdlInvalidProperties), + _ => return Err(QueryError::QExecDdlInvalidProperties), } let mut space_props = space.meta.dict().write(); // create patch let patch = match dict::rprepare_metadata_patch(&space_props, updated_props) { Some(patch) => patch, - None => return Err(QueryError::QPDdlInvalidProperties), + None => return Err(QueryError::QExecDdlInvalidProperties), }; if G::FS_IS_NON_NULL { // prepare txn @@ -255,7 +254,6 @@ impl Space { Ok(()) }) } - #[allow(unused)] pub fn transactional_exec_drop( global: &G, DropSpace { space, force: _ }: DropSpace, @@ -266,11 +264,11 @@ impl Space { let mut wgns = global.namespace().spaces().write(); let space = match wgns.get(space_name.as_str()) { Some(space) => space, - None => return Err(QueryError::QPObjectNotFound), + None => return Err(QueryError::QExecObjectNotFound), }; let space_w = space.mns.write(); if space_w.st_len() != 0 { - return Err(QueryError::QPDdlNotEmpty); + return Err(QueryError::QExecDdlNotEmpty); } // we can remove this if G::FS_IS_NON_NULL { diff --git a/server/src/engine/core/tests/ddl_model/alt.rs b/server/src/engine/core/tests/ddl_model/alt.rs index 09ac8fc7..3d6246ef 100644 --- a/server/src/engine/core/tests/ddl_model/alt.rs +++ b/server/src/engine/core/tests/ddl_model/alt.rs @@ -164,7 +164,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPUnknownField + QueryError::QExecUnknownField ); } #[test] @@ -176,7 +176,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPDdlModelAlterIllegal + QueryError::QExecDdlModelAlterIllegal ); } #[test] @@ -188,7 +188,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPDdlModelAlterIllegal + QueryError::QExecDdlModelAlterIllegal ); } #[test] @@ -200,7 +200,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPDdlModelAlterIllegal + QueryError::QExecDdlModelAlterIllegal ); } #[test] @@ -212,7 +212,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPDdlModelAlterIllegal + QueryError::QExecDdlModelAlterIllegal ); } #[test] @@ -224,7 +224,7 @@ mod plan { |_| {} ) .unwrap_err(), - QueryError::QPUnknownField + QueryError::QExecUnknownField ); } fn bad_type_cast(orig_ty: &str, new_ty: &str) { @@ -235,7 +235,7 @@ mod plan { super::with_plan(&create, &alter, |_| {}).expect_err(&format!( "found no error in transformation: {orig_ty} -> {new_ty}" )), - QueryError::QPDdlInvalidTypeDefinition, + QueryError::QExecDdlInvalidTypeDefinition, "failed to match error in transformation: {orig_ty} -> {new_ty}", ) } @@ -445,7 +445,7 @@ mod exec { |_| {}, ) .unwrap_err(), - QueryError::QPNeedLock + QueryError::QExecNeedLock ); } } diff --git a/server/src/engine/core/tests/ddl_model/crt.rs b/server/src/engine/core/tests/ddl_model/crt.rs index 7193dc14..fa52a40d 100644 --- a/server/src/engine/core/tests/ddl_model/crt.rs +++ b/server/src/engine/core/tests/ddl_model/crt.rs @@ -89,7 +89,7 @@ mod validation { "create model mymodel(primary username: string, primary contract_location: binary)" ) .unwrap_err(), - QueryError::QPDdlModelBadDefinition + QueryError::QExecDdlModelBadDefinition ); } @@ -97,7 +97,7 @@ mod validation { fn duplicate_fields() { assert_eq!( create("create model mymodel(primary username: string, username: binary)").unwrap_err(), - QueryError::QPDdlModelBadDefinition + QueryError::QExecDdlModelBadDefinition ); } @@ -105,7 +105,7 @@ mod validation { fn illegal_props() { assert_eq!( create("create model mymodel(primary username: string, password: binary) with { lol_prop: false }").unwrap_err(), - QueryError::QPDdlModelBadDefinition + QueryError::QExecDdlModelBadDefinition ); } @@ -116,12 +116,12 @@ mod validation { "create model mymodel(primary username_bytes: list { type: uint8 }, password: binary)" ) .unwrap_err(), - QueryError::QPDdlModelBadDefinition + QueryError::QExecDdlModelBadDefinition ); assert_eq!( create("create model mymodel(primary username: float32, password: binary)") .unwrap_err(), - QueryError::QPDdlModelBadDefinition + QueryError::QExecDdlModelBadDefinition ); } } diff --git a/server/src/engine/core/tests/ddl_model/layer.rs b/server/src/engine/core/tests/ddl_model/layer.rs index 0ee2626e..345ed2ce 100644 --- a/server/src/engine/core/tests/ddl_model/layer.rs +++ b/server/src/engine/core/tests/ddl_model/layer.rs @@ -64,7 +64,7 @@ mod layer_spec_validation { fn invalid_list() { assert_eq!( layerview("list").unwrap_err(), - QueryError::QPDdlInvalidTypeDefinition + QueryError::QExecDdlInvalidTypeDefinition ); } @@ -72,7 +72,7 @@ mod layer_spec_validation { fn invalid_flat() { assert_eq!( layerview("string { type: string }").unwrap_err(), - QueryError::QPDdlInvalidTypeDefinition + QueryError::QExecDdlInvalidTypeDefinition ); } } diff --git a/server/src/engine/core/tests/ddl_space/alter.rs b/server/src/engine/core/tests/ddl_space/alter.rs index e71e7ec0..0ae95e23 100644 --- a/server/src/engine/core/tests/ddl_space/alter.rs +++ b/server/src/engine/core/tests/ddl_space/alter.rs @@ -122,7 +122,7 @@ fn alter_nx() { |_| {}, ) .unwrap_err(), - QueryError::QPObjectNotFound + QueryError::QExecObjectNotFound ); } diff --git a/server/src/engine/core/tests/ddl_space/create.rs b/server/src/engine/core/tests/ddl_space/create.rs index b6e70bb2..8a42cf55 100644 --- a/server/src/engine/core/tests/ddl_space/create.rs +++ b/server/src/engine/core/tests/ddl_space/create.rs @@ -73,7 +73,7 @@ fn exec_create_space_with_bad_env_type() { let global = TestGlobal::new_with_tmp_nullfs_driver(); assert_eq!( super::exec_create(&global, "create space myspace with { env: 100 }", |_| {}).unwrap_err(), - QueryError::QPDdlInvalidProperties + QueryError::QExecDdlInvalidProperties ); } @@ -87,6 +87,6 @@ fn exec_create_space_with_random_property() { |_| {} ) .unwrap_err(), - QueryError::QPDdlInvalidProperties + QueryError::QExecDdlInvalidProperties ); } diff --git a/server/src/engine/core/tests/dml/delete.rs b/server/src/engine/core/tests/dml/delete.rs index 5a8a32c4..017ba0ee 100644 --- a/server/src/engine/core/tests/dml/delete.rs +++ b/server/src/engine/core/tests/dml/delete.rs @@ -51,6 +51,6 @@ fn delete_nonexisting() { "sayan", ) .unwrap_err(), - QueryError::QPDmlRowNotFound + QueryError::QExecDmlRowNotFound ); } diff --git a/server/src/engine/core/tests/dml/insert.rs b/server/src/engine/core/tests/dml/insert.rs index 109d41c8..c1c99cbc 100644 --- a/server/src/engine/core/tests/dml/insert.rs +++ b/server/src/engine/core/tests/dml/insert.rs @@ -83,6 +83,6 @@ fn insert_duplicate() { assert_eq!( super::exec_insert_only(&global, "insert into myspace.mymodel('sayan', 'pass123')") .unwrap_err(), - QueryError::QPDmlDuplicate + QueryError::QExecDmlDuplicate ); } diff --git a/server/src/engine/core/tests/dml/select.rs b/server/src/engine/core/tests/dml/select.rs index 966b0411..2af7dfd6 100644 --- a/server/src/engine/core/tests/dml/select.rs +++ b/server/src/engine/core/tests/dml/select.rs @@ -97,6 +97,6 @@ fn select_nonexisting() { "select username, password from myspace.mymodel where username = 'notsayan'", ) .unwrap_err(), - QueryError::QPDmlRowNotFound + QueryError::QExecDmlRowNotFound ); } diff --git a/server/src/engine/core/tests/dml/update.rs b/server/src/engine/core/tests/dml/update.rs index b0cf57d3..577cf465 100644 --- a/server/src/engine/core/tests/dml/update.rs +++ b/server/src/engine/core/tests/dml/update.rs @@ -96,7 +96,7 @@ fn fail_operation_on_null() { "select * from myspace.mymodel where username='sayan'" ) .unwrap_err(), - QueryError::QPDmlValidationError + QueryError::QExecDmlValidationError ); assert_eq!( dml::update_flow_trace(), @@ -116,7 +116,7 @@ fn fail_unknown_fields() { "select * from myspace.mymodel where username='sayan'" ) .unwrap_err(), - QueryError::QPUnknownField + QueryError::QExecUnknownField ); assert_eq!(dml::update_flow_trace(), ["fieldnotfound", "rollback"]); // verify integrity @@ -142,7 +142,7 @@ fn fail_typedef_violation() { "select * from myspace.mymodel where username = 'sayan'" ) .unwrap_err(), - QueryError::QPDmlValidationError + QueryError::QExecDmlValidationError ); assert_eq!( dml::update_flow_trace(), diff --git a/server/src/engine/core/util.rs b/server/src/engine/core/util.rs index 15c07be6..f7f54895 100644 --- a/server/src/engine/core/util.rs +++ b/server/src/engine/core/util.rs @@ -46,6 +46,6 @@ impl<'a> EntityLocator<'a> for Entity<'a> { where Self: 'a, { - self.into_full_str().ok_or(QueryError::QPExpectedEntity) + self.into_full_str().ok_or(QueryError::QLExpectedEntity) } } diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index 1d0ef66f..a9aeae83 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -31,71 +31,78 @@ pub type QueryResult = Result; /// an enumeration of 'flat' errors that the server actually responds to the client with, since we do not want to send specific information /// about anything (as that will be a security hole). The variants correspond with their actual response codes -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, sky_macros::EnumMethods)] #[repr(u8)] pub enum QueryError { + // system /// I/O error - SysServerError, + SysServerError = 0, /// out of memory - SysOutOfMemory, + SysOutOfMemory = 1, /// unknown server error - SysUnknownError, + SysUnknownError = 2, + /// system auth error + SysAuthError = 3, + /// transactional error + SysTransactionalError = 4, + // exchange + NetworkSubsystemCorruptedPacket = 24, + // QL /// something like an integer that randomly has a character to attached to it like `1234q` - LexInvalidLiteral, + LexInvalidLiteral = 25, /// something like an invalid 'string" or a safe string with a bad length etc - LexInvalidEscapedLiteral, + LexInvalidParameter = 26, /// unexpected byte - LexUnexpectedByte, + LexUnexpectedByte = 27, /// expected a longer statement - QLUnexpectedEndOfStatement, + QLUnexpectedEndOfStatement = 28, /// incorrect syntax for "something" - QLInvalidSyntax, + QLInvalidSyntax = 29, /// invalid collection definition definition - QLInvalidCollectionSyntax, + QLInvalidCollectionSyntax = 30, /// invalid type definition syntax - QLInvalidTypeDefinitionSyntax, + QLInvalidTypeDefinitionSyntax = 31, /// expected a full entity definition - QPExpectedEntity, + QLExpectedEntity = 32, /// expected a statement, found something else - QPExpectedStatement, + QLExpectedStatement = 33, /// unknown statement - QPUnknownStatement, - /// this query needs a lock for execution, but that wasn't explicitly allowed anywhere - QPNeedLock, + QLUnknownStatement = 34, + // exec /// the object to be used as the "query container" is missing (for example, insert when the model was missing) - QPObjectNotFound, + QExecObjectNotFound = 100, /// an unknown field was attempted to be accessed/modified/... - QPUnknownField, + QExecUnknownField = 101, /// invalid property for an object - QPDdlInvalidProperties, + QExecDdlInvalidProperties = 102, /// create space/model, but the object already exists - QPDdlObjectAlreadyExists, + QExecDdlObjectAlreadyExists = 103, /// an object that was attempted to be removed is non-empty, and for this object, removals require it to be empty - QPDdlNotEmpty, + QExecDdlNotEmpty = 104, /// invalid type definition - QPDdlInvalidTypeDefinition, + QExecDdlInvalidTypeDefinition = 105, /// bad model definition - QPDdlModelBadDefinition, + QExecDdlModelBadDefinition = 106, /// illegal alter model query - QPDdlModelAlterIllegal, + QExecDdlModelAlterIllegal = 107, + // exec DML /// violated the uniqueness property - QPDmlDuplicate, + QExecDmlDuplicate = 150, /// the data could not be validated for being accepted into a field/function/etc. - QPDmlValidationError, + QExecDmlValidationError = 151, /// the where expression has an unindexed column essentially implying that we can't run this query because of perf concerns - QPDmlWhereHasUnindexedColumn, + QExecDmlWhereHasUnindexedColumn = 152, /// the row matching the given match expression was not found - QPDmlRowNotFound, - /// transactional error - TransactionalError, - SysAuthError, + QExecDmlRowNotFound = 153, + /// this query needs a lock for execution, but that wasn't explicitly allowed anywhere + QExecNeedLock = 154, } impl From for QueryError { fn from(e: super::fractal::error::Error) -> Self { match e.kind() { ErrorKind::IoError(_) | ErrorKind::Storage(_) => QueryError::SysServerError, - ErrorKind::Txn(_) => QueryError::TransactionalError, + ErrorKind::Txn(_) => QueryError::SysTransactionalError, ErrorKind::Other(_) => QueryError::SysUnknownError, ErrorKind::Config(_) => unreachable!("config error cannot propagate here"), } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 6f669c2d..e83e53da 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -271,11 +271,6 @@ impl FractalMgr { // branch returning. but it is okay return Ok(()); } - // mark that we're taking these deltas - model.delta_state().__fractal_take_from_data_delta( - observed_size, - super::FractalToken::new(), - ); Self::try_write_model_data_batch(model, observed_size, mdl_driver) }, ); diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 91235402..c23854a1 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -26,7 +26,7 @@ use { super::{ - core::GlobalNS, + core::{dml::QueryExecMeta, model::Model, GlobalNS}, data::uuid::Uuid, storage::{ self, @@ -123,17 +123,26 @@ pub trait GlobalInstanceLike { fn taskmgr_post_high_priority(&self, task: Task); fn taskmgr_post_standard_priority(&self, task: Task); // default impls - fn request_batch_resolve( + fn request_batch_resolve_if_cache_full( &self, space_name: &str, model_name: &str, - model_uuid: Uuid, - observed_len: usize, + model: &Model, + hint: QueryExecMeta, ) { - self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch( - ModelUniqueID::new(space_name, model_name, model_uuid), - observed_len, - ))) + let current_delta_size = hint.delta_hint(); + let index_size = model.primary_index().count(); + let five = (index_size as f64 * 0.05) as usize; + let max_delta = five.max(self.get_max_delta_size()); + if current_delta_size >= max_delta { + let obtained_delta_size = model + .delta_state() + .__fractal_take_full_from_data_delta(FractalToken::new()); + self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch( + ModelUniqueID::new(space_name, model_name, model.get_uuid()), + obtained_delta_size, + ))); + } } // config handle fn sys_cfg(&self) -> &config::SysConfig; diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index 0256eb7a..b23e8e72 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -88,6 +88,233 @@ macro_rules! flags { ); } +macro_rules! __kw_misc { + ($ident:ident) => { + $crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Misc( + $crate::engine::ql::lex::KeywordMisc::$ident, + )) + }; +} + +macro_rules! __kw_stmt { + ($ident:ident) => { + $crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Statement( + $crate::engine::ql::lex::KeywordStmt::$ident, + )) + }; +} + +/* + Frankly, this is just for lazy people like me. Do not judge + -- Sayan (@ohsayan) +*/ +macro_rules! Token { + // misc symbol + (@) => { + __sym_token!(SymAt) + }; + (#) => { + __sym_token!(SymHash) + }; + ($) => { + __sym_token!(SymDollar) + }; + (%) => { + __sym_token!(SymPercent) + }; + (.) => { + __sym_token!(SymPeriod) + }; + (,) => { + __sym_token!(SymComma) + }; + (_) => { + __sym_token!(SymUnderscore) + }; + (?) => { + __sym_token!(SymQuestion) + }; + (:) => { + __sym_token!(SymColon) + }; + (;) => { + __sym_token!(SymSemicolon) + }; + (~) => { + __sym_token!(SymTilde) + }; + // logical + (!) => { + __sym_token!(OpLogicalNot) + }; + (^) => { + __sym_token!(OpLogicalXor) + }; + (&) => { + __sym_token!(OpLogicalAnd) + }; + (|) => { + __sym_token!(OpLogicalOr) + }; + // operator misc. + (=) => { + __sym_token!(OpAssign) + }; + // arithmetic + (+) => { + __sym_token!(OpArithmeticAdd) + }; + (-) => { + __sym_token!(OpArithmeticSub) + }; + (*) => { + __sym_token!(OpArithmeticMul) + }; + (/) => { + __sym_token!(OpArithmeticDiv) + }; + // relational + (>) => { + __sym_token!(OpComparatorGt) + }; + (<) => { + __sym_token!(OpComparatorLt) + }; + // ddl keywords + (use) => { + __kw_stmt!(Use) + }; + (create) => { + __kw_stmt!(Create) + }; + (alter) => { + __kw_stmt!(Alter) + }; + (drop) => { + __kw_stmt!(Drop) + }; + (model) => { + __kw_misc!(Model) + }; + (space) => { + __kw_misc!(Space) + }; + (primary) => { + __kw_misc!(Primary) + }; + // ddl misc + (with) => { + __kw_misc!(With) + }; + (add) => { + __kw_misc!(Add) + }; + (remove) => { + __kw_misc!(Remove) + }; + (sort) => { + __kw_misc!(Sort) + }; + (type) => { + __kw_misc!(Type) + }; + // dml + (insert) => { + __kw_stmt!(Insert) + }; + (select) => { + __kw_stmt!(Select) + }; + (update) => { + __kw_stmt!(Update) + }; + (delete) => { + __kw_stmt!(Delete) + }; + // dml misc + (set) => { + __kw_misc!(Set) + }; + (limit) => { + __kw_misc!(Limit) + }; + (from) => { + __kw_misc!(From) + }; + (into) => { + __kw_misc!(Into) + }; + (where) => { + __kw_misc!(Where) + }; + (if) => { + __kw_misc!(If) + }; + (and) => { + __kw_misc!(And) + }; + (as) => { + __kw_misc!(As) + }; + (by) => { + __kw_misc!(By) + }; + (asc) => { + __kw_misc!(Asc) + }; + (desc) => { + __kw_misc!(Desc) + }; + // types + (string) => { + __kw_misc!(String) + }; + (binary) => { + __kw_misc!(Binary) + }; + (list) => { + __kw_misc!(List) + }; + (map) => { + __kw_misc!(Map) + }; + (bool) => { + __kw_misc!(Bool) + }; + (int) => { + __kw_misc!(Int) + }; + (double) => { + __kw_misc!(Double) + }; + (float) => { + __kw_misc!(Float) + }; + // tt + (open {}) => { + __sym_token!(TtOpenBrace) + }; + (close {}) => { + __sym_token!(TtCloseBrace) + }; + (() open) => { + __sym_token!(TtOpenParen) + }; + (() close) => { + __sym_token!(TtCloseParen) + }; + (open []) => { + __sym_token!(TtOpenSqBracket) + }; + (close []) => { + __sym_token!(TtCloseSqBracket) + }; + // misc + (null) => { + __kw_misc!(Null) + }; +} + macro_rules! union { ($(#[$attr:meta])* $vis:vis union $name:ident $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name] [] $tail);); ($(#[$attr:meta])* $vis:vis union $name:ident<$($lt:lifetime),*> $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name<$($lt),*>] [] $tail);); diff --git a/server/src/engine/net/mod.rs b/server/src/engine/net/mod.rs index 496a6730..2e1977c2 100644 --- a/server/src/engine/net/mod.rs +++ b/server/src/engine/net/mod.rs @@ -24,12 +24,13 @@ * */ -use super::config::ConfigEndpointTcp; - pub mod protocol; use { - crate::engine::{error::RuntimeResult, fractal::error::ErrorContext, fractal::Global}, + crate::engine::{ + config::ConfigEndpointTcp, error::RuntimeResult, fractal::error::ErrorContext, + fractal::Global, + }, bytes::BytesMut, openssl::{ pkey::PKey, @@ -39,7 +40,7 @@ use { }, std::{cell::Cell, net::SocketAddr, pin::Pin, time::Duration}, tokio::{ - io::{AsyncRead, AsyncWrite, BufWriter}, + io::{AsyncRead, AsyncWrite, BufWriter, AsyncWriteExt}, net::{TcpListener, TcpStream}, sync::{broadcast, mpsc, Semaphore}, }, @@ -128,6 +129,7 @@ impl ConnectionHandler { loop { tokio::select! { ret = protocol::query_loop(socket, buffer, global) => { + socket.flush().await?; match ret { Ok(QueryLoopResult::Fin) => return Ok(()), Ok(QueryLoopResult::Rst) => error!("connection reset while talking to client"), diff --git a/server/src/engine/net/protocol/exchange.rs b/server/src/engine/net/protocol/exchange.rs index 5bfe1904..1a1a1547 100644 --- a/server/src/engine/net/protocol/exchange.rs +++ b/server/src/engine/net/protocol/exchange.rs @@ -80,28 +80,7 @@ pub fn resume<'a>( state: QueryTimeExchangeState, ) -> QueryTimeExchangeResult<'a> { match state { - QueryTimeExchangeState::Initial => { - if cfg!(debug_assertions) { - if !scanner.has_left(EXCHANGE_MIN_SIZE) { - return STATE_READ_INITIAL; - } - } else { - assert!(scanner.has_left(EXCHANGE_MIN_SIZE)); - } - // attempt to read atleast one byte - if cfg!(debug_assertions) { - match scanner.try_next_byte() { - Some(b'S') => SQuery::resume_initial(scanner), - Some(_) => return STATE_ERROR, - None => return STATE_READ_INITIAL, - } - } else { - match unsafe { scanner.next_byte() } { - b'S' => SQuery::resume_initial(scanner), - _ => return STATE_ERROR, - } - } - } + QueryTimeExchangeState::Initial => SQuery::resume_initial(scanner), QueryTimeExchangeState::SQ1Meta1Partial { packet_size_part } => { SQuery::resume_at_sq1_meta1_partial(scanner, packet_size_part) } @@ -199,6 +178,26 @@ impl<'a> SQuery<'a> { impl<'a> SQuery<'a> { /// We're touching this packet for the first time fn resume_initial(scanner: &mut BufferedScanner<'a>) -> QueryTimeExchangeResult<'a> { + if cfg!(debug_assertions) { + if !scanner.has_left(EXCHANGE_MIN_SIZE) { + return STATE_READ_INITIAL; + } + } else { + assert!(scanner.has_left(EXCHANGE_MIN_SIZE)); + } + // attempt to read atleast one byte + if cfg!(debug_assertions) { + match scanner.try_next_byte() { + Some(b'S') => {} + Some(_) => return STATE_ERROR, + None => return STATE_READ_INITIAL, + } + } else { + match unsafe { scanner.next_byte() } { + b'S' => {} + _ => return STATE_ERROR, + } + } Self::resume_at_sq1_meta1_partial(scanner, 0) } /// We found some part of meta1, and need to resume @@ -239,11 +238,11 @@ impl<'a> SQuery<'a> { match parse_lf_separated(scanner, prev_qw_buffered) { LFTIntParseResult::Value(q_window) => { // we got the q window; can we complete the exchange? - Self::resume_at_final( - scanner, - q_window as usize, - Self::compute_df_size(scanner, static_size, packet_size), - ) + let df_size = Self::compute_df_size(scanner, static_size, packet_size); + if df_size == 0 { + return QueryTimeExchangeResult::Error; + } + Self::resume_at_final(scanner, q_window as usize, df_size) } LFTIntParseResult::Partial(q_window_partial) => { // not enough bytes for getting Q window @@ -290,7 +289,7 @@ impl<'a> SQuery<'a> { impl<'a> SQuery<'a> { fn compute_df_size(scanner: &BufferedScanner, static_size: usize, packet_size: usize) -> usize { - packet_size - scanner.cursor() + static_size + (packet_size + static_size) - scanner.cursor() } fn compute_df_remaining(scanner: &BufferedScanner<'_>, df_size: usize) -> usize { (scanner.cursor() + df_size) - scanner.buffer_len() diff --git a/server/src/engine/net/protocol/mod.rs b/server/src/engine/net/protocol/mod.rs index a2a2fa23..1044e2a1 100644 --- a/server/src/engine/net/protocol/mod.rs +++ b/server/src/engine/net/protocol/mod.rs @@ -24,8 +24,6 @@ * */ -use tokio::io::AsyncWriteExt; - mod exchange; mod handshake; #[cfg(test)] @@ -45,13 +43,19 @@ use { super::{IoResult, QueryLoopResult, Socket}, crate::engine::{ self, + error::QueryError, fractal::{Global, GlobalInstanceLike}, mem::BufferedScanner, }, bytes::{Buf, BytesMut}, - tokio::io::{AsyncReadExt, BufWriter}, + tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}, }; +#[derive(Debug, PartialEq)] +pub enum Response { + Empty, +} + pub(super) async fn query_loop( con: &mut BufWriter, buf: &mut BytesMut, @@ -64,13 +68,14 @@ pub(super) async fn query_loop( PostHandshake::ConnectionClosedRst => return Ok(QueryLoopResult::Rst), PostHandshake::Error(e) => { // failed to handshake; we'll close the connection - let hs_err_packet = [b'H', b'1', 1, e.value_u8()]; + let hs_err_packet = [b'H', 0, 1, e.value_u8()]; con.write_all(&hs_err_packet).await?; return Ok(QueryLoopResult::HSFailed); } }; // done handshaking - con.write_all(b"H1\x00\x00\x00").await?; + con.write_all(b"H\x00\x00\x00").await?; + con.flush().await?; let mut exchg_state = QueryTimeExchangeState::default(); let mut expect_more = exchange::EXCHANGE_MIN_SIZE; let mut cursor = 0; @@ -100,21 +105,27 @@ pub(super) async fn query_loop( } QueryTimeExchangeResult::SQCompleted(sq) => sq, QueryTimeExchangeResult::Error => { - con.write_all(b"!\x00").await?; + let [a, b] = + (QueryError::NetworkSubsystemCorruptedPacket.value_u8() as u16).to_le_bytes(); + con.write_all(&[0x10, a, b]).await?; + con.flush().await?; + buf.clear(); exchg_state = QueryTimeExchangeState::default(); continue; } }; // now execute query - match engine::core::exec::execute_query(global, sq).await { - Ok(()) => { - buf.clear(); + match engine::core::exec::dispatch_to_executor(global, sq).await { + Ok(Response::Empty) => { + con.write_all(&[0x12]).await?; + } + Err(e) => { + let [a, b] = (e.value_u8() as u16).to_le_bytes(); + con.write_all(&[0x10, a, b]).await?; } - Err(_e) => { - // TOOD(@ohsayan): actual error codes! - con.write_all(&[b'!', 1]).await?; - }, } + con.flush().await?; + buf.clear(); exchg_state = QueryTimeExchangeState::default(); } } @@ -152,6 +163,7 @@ async fn do_handshake( match handshake::CHandshake::resume_with(&mut scanner, state) { HandshakeResult::Completed(hs) => { handshake = hs; + cursor = scanner.cursor(); break; } HandshakeResult::ChangeState { new_state, expect } => { diff --git a/server/src/engine/ql/ast/mod.rs b/server/src/engine/ql/ast/mod.rs index 1072ef25..fbc75463 100644 --- a/server/src/engine/ql/ast/mod.rs +++ b/server/src/engine/ql/ast/mod.rs @@ -26,10 +26,11 @@ pub mod traits; +#[cfg(debug_assertions)] +use self::traits::ASTNode; #[cfg(test)] pub use traits::{parse_ast_node_full, parse_ast_node_multiple_full}; use { - self::traits::ASTNode, super::{ ddl, dml, lex::{Ident, Token}, @@ -57,7 +58,6 @@ pub struct State<'a, Qd> { f: bool, } -#[cfg(test)] impl<'a> State<'a, InplaceData> { pub const fn new_inplace(tok: &'a [Token<'a>]) -> Self { Self::new(tok, InplaceData::new()) @@ -392,7 +392,7 @@ impl<'a> Entity<'a> { *c += 1; Self::parse_uck_tokens_single(tok) }, - _ => return Err(QueryError::QPExpectedEntity), + _ => return Err(QueryError::QLExpectedEntity), }; Ok(r) } @@ -408,7 +408,7 @@ impl<'a> Entity<'a> { Ok(e.assume_init()) } } else { - Err(QueryError::QPExpectedEntity) + Err(QueryError::QLExpectedEntity) } } #[inline(always)] @@ -495,6 +495,7 @@ pub enum Statement<'a> { } #[inline(always)] +#[cfg(debug_assertions)] pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResult> { if compiler::unlikely(tok.len() < 2) { return Err(QueryError::QLUnexpectedEndOfStatement); @@ -506,12 +507,12 @@ pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResul Token![create] => match state.fw_read() { Token![model] => ASTNode::from_state(&mut state).map(Statement::CreateModel), Token![space] => ASTNode::from_state(&mut state).map(Statement::CreateSpace), - _ => compiler::cold_rerr(QueryError::QPUnknownStatement), + _ => compiler::cold_rerr(QueryError::QLUnknownStatement), }, Token![alter] => match state.fw_read() { Token![model] => ASTNode::from_state(&mut state).map(Statement::AlterModel), Token![space] => ASTNode::from_state(&mut state).map(Statement::AlterSpace), - _ => compiler::cold_rerr(QueryError::QPUnknownStatement), + _ => compiler::cold_rerr(QueryError::QLUnknownStatement), }, Token![drop] if state.remaining() >= 2 => ddl::drop::parse_drop(&mut state), Token::Ident(id) if id.eq_ignore_ascii_case("inspect") => { @@ -522,6 +523,6 @@ pub fn compile<'a, Qd: QueryData<'a>>(tok: &'a [Token<'a>], d: Qd) -> QueryResul Token![select] => ASTNode::from_state(&mut state).map(Statement::Select), Token![update] => ASTNode::from_state(&mut state).map(Statement::Update), Token![delete] => ASTNode::from_state(&mut state).map(Statement::Delete), - _ => compiler::cold_rerr(QueryError::QPUnknownStatement), + _ => compiler::cold_rerr(QueryError::QLUnknownStatement), } } diff --git a/server/src/engine/ql/ddl/alt.rs b/server/src/engine/ql/ddl/alt.rs index e4228375..4f833695 100644 --- a/server/src/engine/ql/ddl/alt.rs +++ b/server/src/engine/ql/ddl/alt.rs @@ -123,7 +123,7 @@ impl<'a> AlterModel<'a> { Token![add] => AlterKind::alter_add(state), Token![remove] => AlterKind::alter_remove(state), Token![update] => AlterKind::alter_update(state), - _ => Err(QueryError::QPExpectedStatement), + _ => Err(QueryError::QLExpectedStatement), }; kind.map(|kind| AlterModel::new(model_name, kind)) } diff --git a/server/src/engine/ql/ddl/drop.rs b/server/src/engine/ql/ddl/drop.rs index 6ac4b311..fed26836 100644 --- a/server/src/engine/ql/ddl/drop.rs +++ b/server/src/engine/ql/ddl/drop.rs @@ -97,7 +97,7 @@ pub fn parse_drop<'a, Qd: QueryData<'a>>(state: &mut State<'a, Qd>) -> QueryResu match state.fw_read() { Token![model] => DropModel::parse(state).map(Statement::DropModel), Token![space] => return DropSpace::parse(state).map(Statement::DropSpace), - _ => Err(QueryError::QPUnknownStatement), + _ => Err(QueryError::QLUnknownStatement), } } diff --git a/server/src/engine/ql/ddl/ins.rs b/server/src/engine/ql/ddl/ins.rs index 827dd84e..ff07600d 100644 --- a/server/src/engine/ql/ddl/ins.rs +++ b/server/src/engine/ql/ddl/ins.rs @@ -65,7 +65,7 @@ pub fn parse_inspect<'a, Qd: QueryData<'a>>( } _ => { state.cursor_back(); - Err(QueryError::QPExpectedStatement) + Err(QueryError::QLExpectedStatement) } } } diff --git a/server/src/engine/ql/ddl/syn.rs b/server/src/engine/ql/ddl/syn.rs index f4296e05..b2fcffe6 100644 --- a/server/src/engine/ql/ddl/syn.rs +++ b/server/src/engine/ql/ddl/syn.rs @@ -515,7 +515,7 @@ impl<'a> ExpandedField<'a> { Err(QueryError::QLInvalidSyntax) } } - _ => Err(QueryError::QPExpectedStatement), + _ => Err(QueryError::QLExpectedStatement), } } } diff --git a/server/src/engine/ql/lex/mod.rs b/server/src/engine/ql/lex/mod.rs index 49b3094b..ad9f3478 100644 --- a/server/src/engine/ql/lex/mod.rs +++ b/server/src/engine/ql/lex/mod.rs @@ -426,7 +426,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { let nb = slf.param_buffer.next_byte(); slf.l.push_token(Token::Lit(Lit::new_bool(nb == 1))); if nb > 1 { - slf.l.set_error(QueryError::LexInvalidEscapedLiteral); + slf.l.set_error(QueryError::LexInvalidParameter); } }, // uint @@ -435,7 +435,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { .try_next_ascii_u64_lf_separated_or_restore_cursor() { Some(int) => slf.l.push_token(Lit::new_uint(int)), - None => slf.l.set_error(QueryError::LexInvalidEscapedLiteral), + None => slf.l.set_error(QueryError::LexInvalidParameter), }, // sint |slf| { @@ -452,7 +452,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { .param_buffer .try_next_ascii_u64_lf_separated_or_restore_cursor() else { - slf.l.set_error(QueryError::LexInvalidEscapedLiteral); + slf.l.set_error(QueryError::LexInvalidParameter); return; }; let body = match slf @@ -461,13 +461,13 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { { Some(body) => body, None => { - slf.l.set_error(QueryError::LexInvalidEscapedLiteral); + slf.l.set_error(QueryError::LexInvalidParameter); return; } }; match core::str::from_utf8(body).map(core::str::FromStr::from_str) { Ok(Ok(fp)) => slf.l.push_token(Lit::new_float(fp)), - _ => slf.l.set_error(QueryError::LexInvalidEscapedLiteral), + _ => slf.l.set_error(QueryError::LexInvalidParameter), } }, // binary @@ -476,7 +476,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { .param_buffer .try_next_ascii_u64_lf_separated_or_restore_cursor() else { - slf.l.set_error(QueryError::LexInvalidEscapedLiteral); + slf.l.set_error(QueryError::LexInvalidParameter); return; }; match slf @@ -484,7 +484,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { .try_next_variable_block(size_of_body as usize) { Some(block) => slf.l.push_token(Lit::new_bin(block)), - None => slf.l.set_error(QueryError::LexInvalidEscapedLiteral), + None => slf.l.set_error(QueryError::LexInvalidParameter), } }, // string @@ -493,7 +493,7 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { .param_buffer .try_next_ascii_u64_lf_separated_or_restore_cursor() else { - slf.l.set_error(QueryError::LexInvalidEscapedLiteral); + slf.l.set_error(QueryError::LexInvalidParameter); return; }; match slf @@ -503,10 +503,10 @@ static SCAN_PARAM: [unsafe fn(&mut SecureLexer); 8] = unsafe { { // TODO(@ohsayan): obliterate this alloc Some(Ok(s)) => slf.l.push_token(Lit::new_string(s.to_owned())), - _ => slf.l.set_error(QueryError::LexInvalidEscapedLiteral), + _ => slf.l.set_error(QueryError::LexInvalidParameter), } }, // ecc - |s| s.l.set_error(QueryError::LexInvalidEscapedLiteral), + |s| s.l.set_error(QueryError::LexInvalidParameter), ] }; diff --git a/server/src/engine/ql/lex/raw.rs b/server/src/engine/ql/lex/raw.rs index cb751d29..d64cbafb 100644 --- a/server/src/engine/ql/lex/raw.rs +++ b/server/src/engine/ql/lex/raw.rs @@ -321,25 +321,26 @@ macro_rules! flattened_lut { } flattened_lut! { - static KW_LUT in kwlut; + static KW in kw; #[derive(Debug, PartialEq, Clone, Copy)] #[repr(u8)] pub enum Keyword { Statement => { - #[derive(Debug, PartialEq, Clone, Copy, sky_macros::EnumMethods)] + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, sky_macros::EnumMethods)] #[repr(u8)] /// A statement keyword pub enum KeywordStmt { - // sys + // system Sysctl = 0, - Describe = 1, - Inspect = 2, - // ddl - Use = 3, - Create = 4, - Alter = 5, - Drop = 6, - // dml + // DDL + Create = 1, + Alter = 2, + Drop = 3, + // system/DDL misc + Use = 4, + Inspect = 5, + Describe = 6, + // DML Insert = 7, Select = 8, Update = 9, @@ -425,18 +426,18 @@ impl Keyword { } } fn compute(key: &[u8]) -> Option { - static G: [u8; 67] = [ - 0, 42, 57, 0, 20, 61, 15, 46, 28, 0, 31, 2, 1, 44, 47, 10, 35, 53, 30, 28, 48, 9, 1, - 51, 61, 20, 20, 47, 23, 31, 0, 52, 55, 59, 27, 45, 54, 49, 29, 0, 66, 54, 23, 58, 13, - 31, 47, 56, 1, 30, 40, 0, 0, 42, 27, 63, 6, 24, 65, 45, 42, 63, 60, 14, 26, 4, 13, + static G: [u8; 64] = [ + 0, 27, 13, 56, 18, 0, 26, 30, 33, 56, 20, 41, 56, 39, 23, 34, 36, 23, 17, 40, 38, 45, + 8, 25, 26, 24, 53, 59, 30, 14, 9, 60, 12, 29, 6, 47, 3, 38, 19, 5, 13, 51, 41, 34, 0, + 22, 43, 13, 46, 33, 11, 12, 36, 58, 40, 0, 36, 2, 19, 49, 53, 23, 55, 0, ]; - static M1: [u8; 11] = *b"wsE1pgJgJMO"; - static M2: [u8; 11] = *b"fICAB04WegN"; + static M1: [u8; 11] = *b"RtEMxHylmiZ"; + static M2: [u8; 11] = *b"F1buDOZ2nzz"; let h1 = Self::_sum(key, M1) % G.len(); let h2 = Self::_sum(key, M2) % G.len(); let h = (G[h1] + G[h2]) as usize % G.len(); - if h < G.len() && KW_LUT[h].0.eq_ignore_ascii_case(key) { - Some(KW_LUT[h].1) + if h < KW.len() && KW[h].0.eq_ignore_ascii_case(key) { + Some(KW[h].1) } else { None } @@ -453,3 +454,9 @@ impl Keyword { sum } } + +impl KeywordStmt { + pub const fn is_blocking(&self) -> bool { + self.value_u8() <= Self::Drop.value_u8() + } +} diff --git a/server/src/engine/ql/macros.rs b/server/src/engine/ql/macros.rs index 2bc26cd6..fdad0dd5 100644 --- a/server/src/engine/ql/macros.rs +++ b/server/src/engine/ql/macros.rs @@ -30,229 +30,6 @@ macro_rules! __sym_token { }; } -macro_rules! __kw_misc { - ($ident:ident) => { - $crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Misc($crate::engine::ql::lex::KeywordMisc::$ident)) - }; -} - -macro_rules! __kw_stmt { - ($ident:ident) => { - $crate::engine::ql::lex::Token::Keyword($crate::engine::ql::lex::Keyword::Statement($crate::engine::ql::lex::KeywordStmt::$ident)) - }; -} - -/* - Frankly, this is just for lazy people like me. Do not judge - -- Sayan (@ohsayan) -*/ -macro_rules! Token { - // misc symbol - (@) => { - __sym_token!(SymAt) - }; - (#) => { - __sym_token!(SymHash) - }; - ($) => { - __sym_token!(SymDollar) - }; - (%) => { - __sym_token!(SymPercent) - }; - (.) => { - __sym_token!(SymPeriod) - }; - (,) => { - __sym_token!(SymComma) - }; - (_) => { - __sym_token!(SymUnderscore) - }; - (?) => { - __sym_token!(SymQuestion) - }; - (:) => { - __sym_token!(SymColon) - }; - (;) => { - __sym_token!(SymSemicolon) - }; - (~) => { - __sym_token!(SymTilde) - }; - // logical - (!) => { - __sym_token!(OpLogicalNot) - }; - (^) => { - __sym_token!(OpLogicalXor) - }; - (&) => { - __sym_token!(OpLogicalAnd) - }; - (|) => { - __sym_token!(OpLogicalOr) - }; - // operator misc. - (=) => { - __sym_token!(OpAssign) - }; - // arithmetic - (+) => { - __sym_token!(OpArithmeticAdd) - }; - (-) => { - __sym_token!(OpArithmeticSub) - }; - (*) => { - __sym_token!(OpArithmeticMul) - }; - (/) => { - __sym_token!(OpArithmeticDiv) - }; - // relational - (>) => { - __sym_token!(OpComparatorGt) - }; - (<) => { - __sym_token!(OpComparatorLt) - }; - // ddl keywords - (use) => { - __kw_stmt!(Use) - }; - (create) => { - __kw_stmt!(Create) - }; - (alter) => { - __kw_stmt!(Alter) - }; - (drop) => { - __kw_stmt!(Drop) - }; - (model) => { - __kw_misc!(Model) - }; - (space) => { - __kw_misc!(Space) - }; - (primary) => { - __kw_misc!(Primary) - }; - // ddl misc - (with) => { - __kw_misc!(With) - }; - (add) => { - __kw_misc!(Add) - }; - (remove) => { - __kw_misc!(Remove) - }; - (sort) => { - __kw_misc!(Sort) - }; - (type) => { - __kw_misc!(Type) - }; - // dml - (insert) => { - __kw_stmt!(Insert) - }; - (select) => { - __kw_stmt!(Select) - }; - (update) => { - __kw_stmt!(Update) - }; - (delete) => { - __kw_stmt!(Delete) - }; - // dml misc - (set) => { - __kw_misc!(Set) - }; - (limit) => { - __kw_misc!(Limit) - }; - (from) => { - __kw_misc!(From) - }; - (into) => { - __kw_misc!(Into) - }; - (where) => { - __kw_misc!(Where) - }; - (if) => { - __kw_misc!(If) - }; - (and) => { - __kw_misc!(And) - }; - (as) => { - __kw_misc!(As) - }; - (by) => { - __kw_misc!(By) - }; - (asc) => { - __kw_misc!(Asc) - }; - (desc) => { - __kw_misc!(Desc) - }; - // types - (string) => { - __kw_misc!(String) - }; - (binary) => { - __kw_misc!(Binary) - }; - (list) => { - __kw_misc!(List) - }; - (map) => { - __kw_misc!(Map) - }; - (bool) => { - __kw_misc!(Bool) - }; - (int) => { - __kw_misc!(Int) - }; - (double) => { - __kw_misc!(Double) - }; - (float) => { - __kw_misc!(Float) - }; - // tt - (open {}) => { - __sym_token!(TtOpenBrace) - }; - (close {}) => { - __sym_token!(TtCloseBrace) - }; - (() open) => { - __sym_token!(TtOpenParen) - }; - (() close) => { - __sym_token!(TtCloseParen) - }; - (open []) => { - __sym_token!(TtOpenSqBracket) - }; - (close []) => { - __sym_token!(TtCloseSqBracket) - }; - // misc - (null) => { - __kw_misc!(Null) - }; -} - #[cfg(test)] macro_rules! dict { () => { diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs index ee61ddb0..3dfcbb2b 100644 --- a/server/src/engine/storage/v1/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs @@ -134,9 +134,9 @@ impl DataBatchPersistDriver { Ok(()) => Ok(()), Err(_) => { // republish changes since we failed to commit - restore_list - .into_iter() - .for_each(|delta| model.delta_state().append_new_data_delta(delta, &g)); + restore_list.into_iter().for_each(|delta| { + model.delta_state().append_new_data_delta(delta, &g); + }); // now attempt to fix the file return self.attempt_fix_data_batchfile(); } diff --git a/server/src/engine/txn/gns/tests/full_chain.rs b/server/src/engine/txn/gns/tests/full_chain.rs index 4bac479e..ec66e073 100644 --- a/server/src/engine/txn/gns/tests/full_chain.rs +++ b/server/src/engine/txn/gns/tests/full_chain.rs @@ -318,7 +318,7 @@ fn drop_model() { .namespace() .with_model(("myspace", "mymodel"), |_| { Ok(()) }) .unwrap_err(), - QueryError::QPObjectNotFound + QueryError::QExecObjectNotFound ); }) })