Simplify schema changes

next
Sayan Nandan 10 months ago
parent 3e49971efc
commit 341a896c37
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -29,7 +29,7 @@ use crate::engine::{
self,
dml::QueryExecMeta,
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DataDeltaKind, Fields, Model},
model::{delta::DataDeltaKind, Model},
},
error::{QueryError, QueryResult},
fractal::GlobalInstanceLike,
@ -48,8 +48,7 @@ pub fn insert_resp(
pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> QueryResult<()> {
core::with_model_for_data_update(global, insert.entity(), |mdl| {
let irmwd = mdl.intent_write_new_data();
let (pk, data) = prepare_insert(mdl, irmwd.fields(), insert.data())?;
let (pk, data) = prepare_insert(mdl, insert.data())?;
let g = cpin();
let ds = mdl.delta_state();
// create new version
@ -68,9 +67,9 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer
// TODO(@ohsayan): optimize null case
fn prepare_insert(
model: &Model,
fields: &Fields,
insert: InsertData,
) -> QueryResult<(PrimaryIndexKey, DcFieldIndex)> {
let fields = model.fields();
let mut okay = fields.len() == insert.column_count();
let mut prepared_data = DcFieldIndex::idx_init_cap(fields.len());
match insert {

@ -102,7 +102,6 @@ where
F: FnMut(&Datacell),
{
global.namespace().with_model(select.entity(), |mdl| {
let irm = mdl.intent_read_model();
let target_key = mdl.resolve_where(select.clauses_mut())?;
let pkdc = VirtualDatacell::new(target_key.clone());
let g = sync::atm::cpin();
@ -118,7 +117,7 @@ where
Some(row) => {
let r = row.resolve_schema_deltas_and_freeze(mdl.delta_state());
if select.is_wildcard() {
for key in irm.fields().stseq_ord_key() {
for key in mdl.fields().stseq_ord_key() {
read_field(key.as_ref(), r.fields())?;
}
} else {

@ -266,8 +266,6 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
let mut ret = Ok(QueryExecMeta::zero());
// prepare row fetch
let key = mdl.resolve_where(update.clauses_mut())?;
// freeze schema
let irm = mdl.intent_read_model();
// fetch row
let g = sync::atm::cpin();
let Some(row) = mdl.primary_index().select(key, &g) else {
@ -298,7 +296,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
let field_definition;
let field_data;
match (
irm.fields().st_get(lhs.as_str()),
mdl.fields().st_get(lhs.as_str()),
row_data_wl.fields_mut().st_get_mut(lhs.as_str()),
) {
(Some(fdef), Some(fdata)) => {

@ -167,9 +167,8 @@ impl Row {
}
// we have deltas to apply
let mut wl = RwLockUpgradableReadGuard::upgrade(rwl_ug);
let delta_read = delta_state.schema_delta_read();
let mut max_delta = wl.txn_revised_schema_version;
for (delta_id, delta) in delta_read.resolve_iter_since(wl.txn_revised_schema_version) {
for (delta_id, delta) in delta_state.resolve_iter_since(wl.txn_revised_schema_version) {
match delta.kind() {
SchemaDeltaKind::FieldAdd(f) => {
wl.fields.st_insert(f.clone(), Datacell::null());

@ -87,13 +87,13 @@ impl GlobalNS {
let mut space = space.write();
f(&mut space)
}
pub fn with_model_space<'a, T, F>(&self, entity: Entity<'a>, f: F) -> QueryResult<T>
pub fn with_model_space_mut_for_ddl<'a, T, F>(&self, entity: Entity<'a>, f: F) -> QueryResult<T>
where
F: FnOnce(&Space, &Model) -> QueryResult<T>,
F: FnOnce(&Space, &mut Model) -> QueryResult<T>,
{
let (space, model_name) = entity.into_full_result()?;
let mdl_idx = self.idx_mdl.read();
let Some(model) = mdl_idx.get(&EntityIDRef::new(&space, &model_name)) else {
let mut mdl_idx = self.idx_mdl.write();
let Some(model) = mdl_idx.get_mut(&EntityIDRef::new(&space, &model_name)) else {
return Err(QueryError::QExecObjectNotFound);
};
let space_read = self.idx.read();

@ -25,7 +25,7 @@
*/
use {
super::{Field, IWModel, Layer, Model},
super::{Field, Layer, Model},
crate::{
engine::{
data::{
@ -76,7 +76,7 @@ macro_rules! can_ignore {
}
#[inline(always)]
fn no_field(mr: &IWModel, new: &str) -> bool {
fn no_field(mr: &Model, new: &str) -> bool {
!mr.fields().st_contains(new)
}
@ -90,8 +90,7 @@ fn check_nullable(props: &mut HashMap<Box<str>, DictEntryGeneric>) -> QueryResul
impl<'a> AlterPlan<'a> {
pub fn fdeltas(
mv: &Model,
wm: &IWModel,
mdl: &Model,
AlterModel { model, kind }: AlterModel<'a>,
) -> QueryResult<AlterPlan<'a>> {
let mut no_lock = true;
@ -104,8 +103,8 @@ impl<'a> AlterPlan<'a> {
}
let mut not_found = false;
if r.iter().all(|id| {
let not_pk = mv.not_pk(id);
let exists = !no_field(wm, id.as_str());
let not_pk = mdl.not_pk(id);
let exists = !no_field(mdl, id.as_str());
not_found = !exists;
not_pk & exists
}) {
@ -125,7 +124,7 @@ impl<'a> AlterPlan<'a> {
layers,
mut props,
} = fields.next().unwrap();
okay &= no_field(wm, &field_name) & mv.not_pk(&field_name);
okay &= no_field(mdl, &field_name) & mdl.not_pk(&field_name);
let is_nullable = check_nullable(&mut props)?;
let layers = Field::parse_layers(layers, is_nullable)?;
okay &= add.st_insert(field_name.to_string().into_boxed_str(), layers);
@ -144,9 +143,9 @@ impl<'a> AlterPlan<'a> {
mut props,
} = updated_fields.next().unwrap();
// enforce pk
mv.guard_pk(&field_name)?;
mdl.guard_pk(&field_name)?;
// get the current field
let Some(current_field) = wm.fields().st_get(field_name.as_str()) else {
let Some(current_field) = mdl.fields().st_get(field_name.as_str()) else {
return Err(QueryError::QExecUnknownField);
};
// check props
@ -255,22 +254,18 @@ impl Model {
let (space_name, model_name) = alter.model.into_full_result()?;
global
.namespace()
.with_model_space(alter.model, |space, model| {
// make intent
let iwm = model.intent_write_model();
.with_model_space_mut_for_ddl(alter.model, |space, model| {
// prepare plan
let plan = AlterPlan::fdeltas(model, &iwm, alter)?;
let plan = AlterPlan::fdeltas(model, alter)?;
// we have a legal plan; acquire exclusive if we need it
if !plan.no_lock {
// TODO(@ohsayan): allow this later on, once we define the syntax
return Err(QueryError::QExecNeedLock);
}
// fine, we're good
let mut iwm = iwm;
match plan.action {
AlterAction::Ignore => drop(iwm),
AlterAction::Ignore => {}
AlterAction::Add(new_fields) => {
let mut guard = model.delta_state().schema_delta_write();
// TODO(@ohsayan): this impacts lockdown duration; fix it
if G::FS_IS_NON_NULL {
// prepare txn
@ -291,13 +286,12 @@ impl Model {
.map(|(x, y)| (x.clone(), y.clone()))
.for_each(|(field_id, field)| {
model
.delta_state()
.schema_append_unresolved_wl_field_add(&mut guard, &field_id);
iwm.fields_mut().st_insert(field_id, field);
.delta_state_mut()
.schema_append_unresolved_wl_field_add(&field_id);
model.fields_mut().st_insert(field_id, field);
});
}
AlterAction::Remove(removed) => {
let mut guard = model.delta_state().schema_delta_write();
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelRemoveTxn::new(
@ -308,11 +302,10 @@ impl Model {
global.namespace_txn_driver().lock().try_commit(txn)?;
}
removed.iter().for_each(|field_id| {
model.delta_state().schema_append_unresolved_wl_field_rem(
&mut guard,
field_id.as_str(),
);
iwm.fields_mut().st_delete(field_id.as_str());
model
.delta_state_mut()
.schema_append_unresolved_wl_field_rem(field_id.as_str());
model.fields_mut().st_delete(field_id.as_str());
});
}
AlterAction::Update(updated) => {
@ -326,7 +319,7 @@ impl Model {
global.namespace_txn_driver().lock().try_commit(txn)?;
}
updated.into_iter().for_each(|(field_id, field)| {
iwm.fields_mut().st_update(&field_id, field);
model.fields_mut().st_update(&field_id, field);
});
}
}

@ -25,136 +25,25 @@
*/
use {
super::{Fields, Model},
super::Model,
crate::engine::{
core::{dml::QueryExecMeta, index::Row},
fractal::{FractalToken, GlobalInstanceLike},
sync::atm::Guard,
sync::queue::Queue,
},
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
collections::btree_map::{BTreeMap, Range},
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
},
};
/*
sync matrix
*/
// FIXME(@ohsayan): This an inefficient repr of the matrix; replace it with my other design
#[derive(Debug)]
/// A sync matrix enables different queries to have different access permissions on the data model, and the data in the
/// index
pub struct ISyncMatrix {
// virtual privileges
/// read/write model
v_priv_model_alter: RwLock<()>,
/// RW data/block all
v_priv_data_new_or_revise: RwLock<()>,
}
#[cfg(test)]
impl PartialEq for ISyncMatrix {
fn eq(&self, _: &Self) -> bool {
true
}
}
#[derive(Debug)]
/// Read model, write new data
pub struct IRModelSMData<'a> {
_rmodel: RwLockReadGuard<'a, ()>,
_mdata: RwLockReadGuard<'a, ()>,
fields: &'a Fields,
}
impl<'a> IRModelSMData<'a> {
pub fn new(m: &'a Model) -> Self {
let rmodel = m.sync_matrix().v_priv_model_alter.read();
let mdata = m.sync_matrix().v_priv_data_new_or_revise.read();
Self {
_rmodel: rmodel,
_mdata: mdata,
fields: unsafe {
// UNSAFE(@ohsayan): we already have acquired this resource
m._read_fields()
},
}
}
pub fn fields(&'a self) -> &'a Fields {
self.fields
}
}
#[derive(Debug)]
/// Read model
pub struct IRModel<'a> {
_rmodel: RwLockReadGuard<'a, ()>,
fields: &'a Fields,
}
impl<'a> IRModel<'a> {
pub fn new(m: &'a Model) -> Self {
Self {
_rmodel: m.sync_matrix().v_priv_model_alter.read(),
fields: unsafe {
// UNSAFE(@ohsayan): we already have acquired this resource
m._read_fields()
},
}
}
pub fn fields(&'a self) -> &'a Fields {
self.fields
}
}
#[derive(Debug)]
/// Write model
pub struct IWModel<'a> {
_wmodel: RwLockWriteGuard<'a, ()>,
fields: &'a mut Fields,
}
impl<'a> IWModel<'a> {
pub fn new(m: &'a Model) -> Self {
Self {
_wmodel: m.sync_matrix().v_priv_model_alter.write(),
fields: unsafe {
// UNSAFE(@ohsayan): we have exclusive access to this resource
m._read_fields_mut()
},
}
}
pub fn fields(&'a self) -> &'a Fields {
self.fields
}
// ALIASING
pub fn fields_mut(&mut self) -> &mut Fields {
self.fields
}
}
impl ISyncMatrix {
pub const fn new() -> Self {
Self {
v_priv_model_alter: RwLock::new(()),
v_priv_data_new_or_revise: RwLock::new(()),
}
}
}
/*
delta
*/
#[derive(Debug)]
/// A delta state for the model
pub struct DeltaState {
// schema
schema_current_version: AtomicU64,
schema_deltas: RwLock<BTreeMap<DeltaVersion, SchemaDeltaPart>>,
schema_current_version: u64,
schema_deltas: BTreeMap<DeltaVersion, SchemaDeltaPart>,
// data
data_current_version: AtomicU64,
data_deltas: Queue<DataDelta>,
@ -165,8 +54,8 @@ impl DeltaState {
/// A new, fully resolved delta state with version counters set to 0
pub fn new_resolved() -> Self {
Self {
schema_current_version: AtomicU64::new(0),
schema_deltas: RwLock::new(BTreeMap::new()),
schema_current_version: 0,
schema_deltas: BTreeMap::new(),
data_current_version: AtomicU64::new(0),
data_deltas: Queue::new(),
data_deltas_size: AtomicUsize::new(0),
@ -218,51 +107,32 @@ impl DeltaState {
// schema
impl DeltaState {
pub fn schema_delta_write<'a>(&'a self) -> SchemaDeltaIndexWGuard<'a> {
SchemaDeltaIndexWGuard(self.schema_deltas.write())
}
pub fn schema_delta_read<'a>(&'a self) -> SchemaDeltaIndexRGuard<'a> {
SchemaDeltaIndexRGuard(self.schema_deltas.read())
}
pub fn schema_current_version(&self) -> DeltaVersion {
self.__schema_delta_current()
}
pub fn schema_append_unresolved_wl_field_add(
pub fn resolve_iter_since(
&self,
guard: &mut SchemaDeltaIndexWGuard,
field_name: &str,
) {
self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_add(field_name));
current_version: DeltaVersion,
) -> Range<DeltaVersion, SchemaDeltaPart> {
self.schema_deltas.range(current_version.step()..)
}
pub fn schema_append_unresolved_wl_field_rem(
&self,
guard: &mut SchemaDeltaIndexWGuard,
field_name: &str,
) {
self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_rem(field_name));
pub fn schema_current_version(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version)
}
pub fn schema_append_unresolved_field_add(&self, field_name: &str) {
self.schema_append_unresolved_wl_field_add(&mut self.schema_delta_write(), field_name);
pub fn schema_append_unresolved_wl_field_add(&mut self, field_name: &str) {
self.__schema_append_unresolved_delta(SchemaDeltaPart::field_add(field_name));
}
pub fn schema_append_unresolved_field_rem(&self, field_name: &str) {
self.schema_append_unresolved_wl_field_rem(&mut self.schema_delta_write(), field_name);
pub fn schema_append_unresolved_wl_field_rem(&mut self, field_name: &str) {
self.__schema_append_unresolved_delta(SchemaDeltaPart::field_rem(field_name));
}
}
impl DeltaState {
fn __schema_delta_step(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version.fetch_add(1, Ordering::AcqRel))
}
fn __schema_delta_current(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version.load(Ordering::Acquire))
fn __schema_delta_step(&mut self) -> DeltaVersion {
let current = self.schema_current_version;
self.schema_current_version += 1;
DeltaVersion(current)
}
fn __schema_append_unresolved_delta(
&self,
w: &mut BTreeMap<DeltaVersion, SchemaDeltaPart>,
part: SchemaDeltaPart,
) -> DeltaVersion {
fn __schema_append_unresolved_delta(&mut self, part: SchemaDeltaPart) -> DeltaVersion {
let v = self.__schema_delta_step();
w.insert(v, part);
self.schema_deltas.insert(v, part);
v
}
}
@ -328,19 +198,6 @@ impl SchemaDeltaPart {
}
}
pub struct SchemaDeltaIndexWGuard<'a>(
RwLockWriteGuard<'a, BTreeMap<DeltaVersion, SchemaDeltaPart>>,
);
pub struct SchemaDeltaIndexRGuard<'a>(RwLockReadGuard<'a, BTreeMap<DeltaVersion, SchemaDeltaPart>>);
impl<'a> SchemaDeltaIndexRGuard<'a> {
pub fn resolve_iter_since(
&self,
current_version: DeltaVersion,
) -> Range<DeltaVersion, SchemaDeltaPart> {
self.0.range(current_version.step()..)
}
}
/*
data delta
*/

@ -31,7 +31,6 @@ pub(in crate::engine) mod delta;
use std::cell::RefCell;
use {
self::delta::{IRModel, IRModelSMData, ISyncMatrix, IWModel},
super::index::PrimaryIndex,
crate::engine::{
data::{
@ -50,7 +49,6 @@ use {
},
txn::gns::{self as gnstxn, SpaceIDRef},
},
std::cell::UnsafeCell,
};
pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind};
@ -63,8 +61,7 @@ pub struct Model {
uuid: Uuid,
p_key: Box<str>,
p_tag: FullTag,
fields: UnsafeCell<Fields>,
sync_matrix: ISyncMatrix,
fields: Fields,
data: PrimaryIndex,
delta: DeltaState,
}
@ -72,11 +69,10 @@ pub struct Model {
#[cfg(test)]
impl PartialEq for Model {
fn eq(&self, m: &Self) -> bool {
let mdl1 = self.intent_read_model();
let mdl2 = m.intent_read_model();
((self.p_key == m.p_key) & (self.p_tag == m.p_tag))
&& self.uuid == m.uuid
&& mdl1.fields() == mdl2.fields()
self.uuid == m.uuid
&& self.p_key == m.p_key
&& self.p_tag == m.p_tag
&& self.fields == m.fields
}
}
@ -85,8 +81,7 @@ impl Model {
uuid: Uuid,
p_key: Box<str>,
p_tag: FullTag,
fields: UnsafeCell<Fields>,
sync_matrix: ISyncMatrix,
fields: Fields,
data: PrimaryIndex,
delta: DeltaState,
) -> Self {
@ -95,12 +90,10 @@ impl Model {
p_key,
p_tag,
fields,
sync_matrix,
data,
delta,
}
}
pub fn get_uuid(&self) -> Uuid {
self.uuid
}
@ -110,24 +103,6 @@ impl Model {
pub fn p_tag(&self) -> FullTag {
self.p_tag
}
pub fn sync_matrix(&self) -> &ISyncMatrix {
&self.sync_matrix
}
unsafe fn _read_fields<'a>(&'a self) -> &'a Fields {
&*self.fields.get().cast_const()
}
unsafe fn _read_fields_mut<'a>(&'a self) -> &'a mut Fields {
&mut *self.fields.get()
}
pub fn intent_read_model<'a>(&'a self) -> IRModel<'a> {
IRModel::new(self)
}
pub fn intent_write_model<'a>(&'a self) -> IWModel<'a> {
IWModel::new(self)
}
pub fn intent_write_new_data<'a>(&'a self) -> IRModelSMData<'a> {
IRModelSMData::new(self)
}
fn is_pk(&self, new: &str) -> bool {
self.p_key.as_bytes() == new.as_bytes()
}
@ -147,6 +122,15 @@ impl Model {
pub fn delta_state(&self) -> &DeltaState {
&self.delta
}
pub fn delta_state_mut(&mut self) -> &mut DeltaState {
&mut self.delta
}
pub fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
pub fn fields(&self) -> &Fields {
&self.fields
}
}
impl Model {
@ -155,8 +139,7 @@ impl Model {
uuid,
p_key,
p_tag,
UnsafeCell::new(fields),
ISyncMatrix::new(),
fields,
PrimaryIndex::new_empty(),
DeltaState::new_resolved(),
)
@ -215,14 +198,12 @@ impl Model {
}
// since we've locked this down, no one else can parallely create another model in the same space (or remove)
if G::FS_IS_NON_NULL {
let irm = model.intent_read_model();
let mut txn_driver = global.namespace_txn_driver().lock();
// prepare txn
let txn = gnstxn::CreateModelTxn::new(
SpaceIDRef::new(&space_name, &space),
&model_name,
&model,
&irm,
);
// attempt to initialize driver
global.initialize_model_driver(

@ -39,8 +39,7 @@ fn with_plan(model: &str, plan: &str, f: impl Fn(AlterPlan)) -> QueryResult<()>
let model = create(model)?;
let tok = lex_insecure(plan.as_bytes()).unwrap();
let alter = parse_ast_node_full(&tok[2..]).unwrap();
let model_write = model.intent_write_model();
let mv = AlterPlan::fdeltas(&model, &model_write, alter)?;
let mv = AlterPlan::fdeltas(&model, alter)?;
Ok(f(mv))
}
fn plan(model: &str, plan: &str, f: impl Fn(AlterPlan)) {
@ -366,9 +365,8 @@ mod exec {
"create model myspace.mymodel(username: string, col1: uint64)",
"alter model myspace.mymodel add (col2 { type: uint32, nullable: true }, col3 { type: uint16, nullable: true })",
|model| {
let schema = model.intent_read_model();
assert_eq!(
schema
model
.fields()
.stseq_ord_kv()
.rev()
@ -397,9 +395,8 @@ mod exec {
"create model myspace.mymodel(username: string, col1: uint64, col2: uint32, col3: uint16, col4: uint8)",
"alter model myspace.mymodel remove (col1, col2, col3, col4)",
|mdl| {
let schema = mdl.intent_read_model();
assert_eq!(
schema
mdl
.fields()
.stseq_ord_kv()
.rev()
@ -423,8 +420,7 @@ mod exec {
"create model myspace.mymodel(username: string, password: binary)",
"alter model myspace.mymodel update password { nullable: true }",
|model| {
let schema = model.intent_read_model();
assert!(schema.fields().st_get("password").unwrap().is_nullable());
assert!(model.fields().st_get("password").unwrap().is_nullable());
assert_eq!(
model.delta_state().schema_current_version(),
DeltaVersion::genesis()

@ -42,7 +42,6 @@ mod validation {
assert_eq!(model.p_tag(), FullTag::STR);
assert_eq!(
model
.intent_read_model()
.fields()
.stseq_ord_value()
.cloned()
@ -66,7 +65,6 @@ mod validation {
assert_eq!(model.p_tag(), FullTag::STR);
assert_eq!(
model
.intent_read_model()
.fields()
.stseq_ord_value()
.cloned()
@ -153,7 +151,6 @@ mod exec {
.unwrap();
with_model(&global, SPACE, "mymodel", |model| {
let models: Vec<(String, Field)> = model
.intent_read_model()
.fields()
.stseq_ord_kv()
.map(|(k, v)| (k.to_string(), v.clone()))

@ -74,7 +74,6 @@ fn _exec_only_read_key_and_then<T>(
) -> QueryResult<T> {
let guard = sync::atm::cpin();
global.namespace().with_model(entity, |mdl| {
let _irm = mdl.intent_read_model();
let row = mdl
.primary_index()
.select(Lit::from(key_name), &guard)
@ -92,7 +91,6 @@ fn _exec_delete_only(global: &impl GlobalInstanceLike, delete: &str, key: &str)
dml::delete(global, delete)?;
assert_eq!(
global.namespace().with_model(entity, |model| {
let _ = model.intent_read_model();
let g = sync::atm::cpin();
Ok(model.primary_index().select(key.into(), &g).is_none())
}),

@ -36,7 +36,7 @@ use {
core::{
index::{PrimaryIndexKey, RowData},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion, IRModel},
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Model,
},
},
@ -76,7 +76,6 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
}
pub fn write_new_batch(&mut self, model: &Model, observed_len: usize) -> RuntimeResult<()> {
// pin model
let irm = model.intent_read_model();
let schema_version = model.delta_state().schema_current_version();
let g = pin();
// init restore list
@ -90,7 +89,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
observed_len,
schema_version,
model.p_tag().tag_unique(),
irm.fields().len() - 1,
model.fields().len() - 1,
)?;
while i < observed_len {
let delta = model.delta_state().__data_delta_dequeue(&g).unwrap();
@ -116,7 +115,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
self.write_batch_item_common_row_data(&delta)?;
// encode data
self.encode_pk_only(delta.row().d_key())?;
self.encode_row_data(model, &irm, &row_data)?;
self.encode_row_data(model, &row_data)?;
}
}
i += 1;
@ -227,18 +226,13 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
Ok(())
}
/// Encode row data
fn encode_row_data(
&mut self,
mdl: &Model,
irm: &IRModel,
row_data: &RowData,
) -> RuntimeResult<()> {
for field_name in irm.fields().stseq_ord_key() {
fn encode_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> {
for field_name in model.fields().stseq_ord_key() {
match row_data.fields().get(field_name) {
Some(cell) => {
self.encode_cell(cell)?;
}
None if field_name.as_ref() == mdl.p_key() => {}
None if field_name.as_ref() == model.p_key() => {}
None => self.f.write_unfsynced(&[0])?,
}
}

@ -215,7 +215,6 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
) -> RuntimeResult<()> {
// NOTE(@ohsayan): current complexity is O(n) which is good enough (in the future I might revise this to a fancier impl)
// pin model
let irm = m.intent_read_model();
let g = unsafe { crossbeam_epoch::unprotected() };
let mut pending_delete = HashMap::new();
let p_index = m.primary_index().__raw_index();
@ -235,7 +234,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
// new row (logically)
let _ = p_index.mt_delete(&pk, &g);
let mut data = DcFieldIndex::default();
for (field_name, new_data) in irm
for (field_name, new_data) in m
.fields()
.stseq_ord_key()
.filter(|key| key.as_ref() != m.p_key())

@ -29,7 +29,7 @@ use {
crate::{
engine::{
core::{
model::{delta::IRModel, Field, Layer, Model},
model::{Field, Layer, Model},
space::Space,
},
data::{
@ -405,10 +405,10 @@ impl ModelLayoutMD {
}
#[derive(Clone, Copy)]
pub struct ModelLayoutRef<'a>(pub(super) &'a Model, pub(super) &'a IRModel<'a>);
impl<'a> From<(&'a Model, &'a IRModel<'a>)> for ModelLayoutRef<'a> {
fn from((mdl, irm): (&'a Model, &'a IRModel<'a>)) -> Self {
Self(mdl, irm)
pub struct ModelLayoutRef<'a>(pub(super) &'a Model);
impl<'a> From<&'a Model> for ModelLayoutRef<'a> {
fn from(mdl: &'a Model) -> Self {
Self(mdl)
}
}
impl<'a> PersistObject for ModelLayoutRef<'a> {
@ -419,11 +419,11 @@ impl<'a> PersistObject for ModelLayoutRef<'a> {
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.p_key_len as usize)
}
fn meta_enc(buf: &mut VecU8, ModelLayoutRef(v, irm): Self::InputType) {
buf.extend(v.get_uuid().to_le_bytes());
buf.extend(v.p_key().len().u64_bytes_le());
buf.extend(v.p_tag().tag_selector().value_qword().to_le_bytes());
buf.extend(irm.fields().len().u64_bytes_le());
fn meta_enc(buf: &mut VecU8, ModelLayoutRef(model_def): Self::InputType) {
buf.extend(model_def.get_uuid().to_le_bytes());
buf.extend(model_def.p_key().len().u64_bytes_le());
buf.extend(model_def.p_tag().tag_selector().value_qword().to_le_bytes());
buf.extend(model_def.fields().len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(ModelLayoutMD::new(
@ -433,11 +433,11 @@ impl<'a> PersistObject for ModelLayoutRef<'a> {
scanner.next_u64_le(),
))
}
fn obj_enc(buf: &mut VecU8, ModelLayoutRef(mdl, irm): Self::InputType) {
buf.extend(mdl.p_key().as_bytes());
fn obj_enc(buf: &mut VecU8, ModelLayoutRef(model_definition): Self::InputType) {
buf.extend(model_definition.p_key().as_bytes());
<super::map::PersistMapImpl<super::map::FieldMapSpec> as PersistObject>::obj_enc(
buf,
irm.fields(),
model_definition.fields(),
)
}
unsafe fn obj_dec(

@ -102,8 +102,7 @@ fn model() {
"profile_pic" => Field::new([Layer::bin()].into(), true),
},
);
let model_irm = model.intent_read_model();
let enc = super::enc::enc_full::<obj::ModelLayoutRef>(obj::ModelLayoutRef(&model, &model_irm));
let enc = super::enc::enc_full::<obj::ModelLayoutRef>(obj::ModelLayoutRef(&model));
let dec = super::dec::dec_full::<obj::ModelLayoutRef>(&enc).unwrap();
assert_eq!(model, dec);
}

@ -29,7 +29,7 @@ use {
crate::{
engine::{
core::{
model::{delta::IRModel, Field, Model},
model::{Field, Model},
space::Space,
GlobalNS, {EntityID, EntityIDRef},
},
@ -187,15 +187,15 @@ fn with_space_mut<T>(
f(&mut space)
}
fn with_model<T>(
fn with_model_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
mut f: impl FnMut(&Model) -> RuntimeResult<T>,
mut f: impl FnMut(&mut Model) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
with_space(gns, space_id, |_| {
let models = gns.idx_models().read();
let Some(model) = models.get(&EntityIDRef::new(&space_id.name, &model_id.model_name))
let mut models = gns.idx_models().write();
let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name))
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
@ -217,7 +217,6 @@ pub struct CreateModelTxn<'a> {
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
model_read: &'a IRModel<'a>,
}
impl<'a> CreateModelTxn<'a> {
@ -225,13 +224,11 @@ impl<'a> CreateModelTxn<'a> {
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
model_read: &'a IRModel<'a>,
) -> Self {
Self {
space_id,
model_name,
model,
model_read,
}
}
}
@ -266,10 +263,7 @@ impl<'a> PersistObject for CreateModelTxn<'a> {
// model name
buf.extend(data.model_name.len().u64_bytes_le());
// model meta dump
<obj::ModelLayoutRef as PersistObject>::meta_enc(
buf,
obj::ModelLayoutRef::from((data.model, data.model_read)),
)
<obj::ModelLayoutRef as PersistObject>::meta_enc(buf, obj::ModelLayoutRef::from(data.model))
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let space_id = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
@ -287,10 +281,7 @@ impl<'a> PersistObject for CreateModelTxn<'a> {
// model name
buf.extend(data.model_name.as_bytes());
// model dump
<obj::ModelLayoutRef as PersistObject>::obj_enc(
buf,
obj::ModelLayoutRef::from((data.model, data.model_read)),
)
<obj::ModelLayoutRef as PersistObject>::obj_enc(buf, obj::ModelLayoutRef::from(data.model))
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
@ -432,16 +423,15 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> {
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model(gns, &model_id.space_id, &model_id, |model| {
let mut wmodel = model.intent_write_model();
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
for (i, (field_name, field)) in new_fields.stseq_ord_kv().enumerate() {
if !wmodel
if !model
.fields_mut()
.st_insert(field_name.to_owned(), field.clone())
{
// rollback; corrupted
new_fields.stseq_ord_key().take(i).for_each(|field_id| {
let _ = wmodel.fields_mut().st_delete(field_id);
let _ = model.fields_mut().st_delete(field_id);
});
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
@ -450,8 +440,8 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> {
// publish deltas
for field_name in new_fields.stseq_ord_key() {
model
.delta_state()
.schema_append_unresolved_field_add(field_name);
.delta_state_mut()
.schema_append_unresolved_wl_field_add(field_name);
}
Ok(())
})
@ -551,18 +541,17 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model(gns, &model_id.space_id, &model_id, |model| {
let mut iwm = model.intent_write_model();
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut removed_fields_rb = vec![];
for removed_field in removed_fields.iter() {
match iwm.fields_mut().st_delete_return(removed_field) {
match model.fields_mut().st_delete_return(removed_field) {
Some(field) => {
removed_fields_rb.push((removed_field as &str, field));
}
None => {
// rollback
removed_fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = iwm.fields_mut().st_insert(field_id.into(), field);
let _ = model.fields_mut().st_insert(field_id.into(), field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
@ -572,8 +561,8 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
// publish deltas
for field_name in removed_fields.iter() {
model
.delta_state()
.schema_append_unresolved_field_rem(field_name);
.delta_state_mut()
.schema_append_unresolved_wl_field_rem(field_name);
}
Ok(())
})
@ -667,16 +656,15 @@ impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model(gns, &model_id.space_id, &model_id, |model| {
let mut iwm = model.intent_write_model();
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut fields_rb = vec![];
for (field_id, field) in updated_fields.iter() {
match iwm.fields_mut().st_update_return(field_id, field.clone()) {
match model.fields_mut().st_update_return(field_id, field.clone()) {
Some(f) => fields_rb.push((field_id as &str, f)),
None => {
// rollback
fields_rb.into_iter().for_each(|(field_id, field)| {
let _ = iwm.fields_mut().st_update(field_id, field);
let _ = model.fields_mut().st_update(field_id, field);
});
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}

@ -219,11 +219,7 @@ fn alter_model_add() {
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model
.intent_read_model()
.fields()
.st_get("profile_pic")
.unwrap(),
model.fields().st_get("profile_pic").unwrap(),
&Field::new([Layer::bin()].into(), true)
);
Ok(())
@ -257,9 +253,8 @@ fn alter_model_remove() {
global
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
let irm = model.intent_read_model();
assert!(irm.fields().st_get("has_secure_key").is_none());
assert!(irm.fields().st_get("is_dumb").is_none());
assert!(model.fields().st_get("has_secure_key").is_none());
assert!(model.fields().st_get("is_dumb").is_none());
Ok(())
})
.unwrap();
@ -291,11 +286,7 @@ fn alter_model_update() {
.namespace()
.with_model(("myspace", "mymodel").into(), |model| {
assert_eq!(
model
.intent_read_model()
.fields()
.st_get("profile_pic")
.unwrap(),
model.fields().st_get("profile_pic").unwrap(),
&Field::new([Layer::bin()].into(), true)
);
Ok(())

@ -118,15 +118,8 @@ mod model_tests {
#[test]
fn create() {
let (space, model) = default_space_model();
let irm = model.intent_read_model();
let txn = CreateModelTxn::new(
super::SpaceIDRef::new("myspace", &space),
"mymodel",
&model,
&irm,
);
let txn = CreateModelTxn::new(super::SpaceIDRef::new("myspace", &space), "mymodel", &model);
let encoded = super::enc::enc_full_self(txn);
core::mem::drop(irm);
let decoded = super::dec::dec_full::<CreateModelTxn>(&encoded).unwrap();
assert_eq!(
CreateModelTxnRestorePL {

Loading…
Cancel
Save