various stuff

main
Ziyang Hu 2 years ago
parent 3160789922
commit f5de468548

@ -276,6 +276,17 @@ impl TransactionPtr {
}
}
#[inline]
pub fn get_owned(&self, options: &ReadOptions,
key: impl AsRef<[u8]>) -> Result<Option<PinnableSlicePtr>> {
let mut slice = PinnableSlicePtr::default();
if self.get(options, key, &mut slice)? {
Ok(Some(slice))
} else {
Ok(None)
}
}
#[inline]
pub fn get_for_update(
&self,
options: &ReadOptions,
@ -377,6 +388,17 @@ impl DbPtr {
}
}
#[inline]
pub fn get_owned(&self, options: &ReadOptions,
key: impl AsRef<[u8]>) -> Result<Option<PinnableSlicePtr>> {
let mut slice = PinnableSlicePtr::default();
if self.get(options, key, &mut slice)? {
Ok(Some(slice))
} else {
Ok(None)
}
}
#[inline]
pub fn del(&self, options: &WriteOptions, key: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.del_raw(options, key.as_ref(), &mut status);
@ -399,7 +421,7 @@ impl DbPtr {
}
#[inline]
pub fn make_transaction(
pub fn txn(
&self,
options: TransactOptions,
write_ops: WriteOptionsPtr,

@ -128,6 +128,10 @@ impl OptionsPtr {
pub struct ReadOptionsPtr(UniquePtr<ReadOptions>);
unsafe impl Send for ReadOptionsPtr {}
// unsafe impl Sync for ReadOptionsPtr {}
impl Deref for ReadOptionsPtr {
type Target = UniquePtr<ReadOptions>;
@ -174,6 +178,9 @@ impl ReadOptionsPtr {
pub struct WriteOptionsPtr(pub(crate) UniquePtr<WriteOptions>);
unsafe impl Send for WriteOptionsPtr {}
// unsafe impl Sync for WriteOptionsPtr {}
impl Deref for WriteOptionsPtr {
type Target = UniquePtr<WriteOptions>;

@ -28,8 +28,8 @@ pub(crate) enum Expr<'a> {
Variable(String),
TableCol(TableId, ColId),
TupleSetIdx(TupleSetIdx),
Apply(Arc<dyn Op>, Vec<Expr<'a>>),
ApplyAgg(Arc<dyn AggOp>, Vec<Expr<'a>>, Vec<Expr<'a>>),
Apply(Arc<dyn Op + Send + Sync>, Vec<Expr<'a>>),
ApplyAgg(Arc<dyn AggOp + Send + Sync>, Vec<Expr<'a>>, Vec<Expr<'a>>),
FieldAcc(String, Box<Expr<'a>>),
IdxAcc(usize, Box<Expr<'a>>),
}

@ -99,7 +99,7 @@ fn build_expr_primary(pair: Pair<Rule>) -> Result<Expr> {
let mut inner = pair.into_inner();
let p = inner.next().unwrap();
let op = p.as_rule();
let op: Arc<dyn Op> = match op {
let op: Arc<dyn Op + Send + Sync> = match op {
Rule::term => return build_expr_primary(p),
Rule::negate => Arc::new(OpNegate),
Rule::minus => Arc::new(OpMinus),
@ -224,7 +224,7 @@ fn build_expr_infix<'a>(
) -> Result<Expr<'a>> {
let lhs = lhs?;
let rhs = rhs?;
let op: Arc<dyn Op> = match op.as_rule() {
let op: Arc<dyn Op + Send + Sync> = match op.as_rule() {
Rule::op_add => Arc::new(OpAdd),
Rule::op_str_cat => Arc::new(OpStrCat),
Rule::op_sub => Arc::new(OpSub),

@ -1,11 +1,11 @@
use std::fmt::{Debug, Formatter};
pub(crate) trait Op {
pub(crate) trait Op: Send + Sync {
fn is_resolved(&self) -> bool;
fn name(&self) -> &str;
}
pub(crate) trait AggOp {
pub(crate) trait AggOp: Send + Sync {
fn is_resolved(&self) -> bool;
fn name(&self) -> &str;
}

@ -7,10 +7,11 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::result;
use chrono::format::Item;
use uuid::Uuid;
#[derive(thiserror::Error, Debug)]
pub(crate) enum TupleError {
pub enum TupleError {
#[error("Undefined data kind {0}")]
UndefinedDataKind(u32),
@ -108,8 +109,14 @@ impl<T: AsRef<[u8]>> Tuple<T> {
}
}
impl From<DataKind> for u32 {
fn from(dk: DataKind) -> Self {
dk as u32
}
}
#[derive(Clone)]
pub(crate) struct Tuple<T>
pub struct Tuple<T>
where
T: AsRef<[u8]>,
{
@ -117,6 +124,16 @@ pub(crate) struct Tuple<T>
idx_cache: RefCell<Vec<usize>>,
}
unsafe impl<T: AsRef<[u8]>> Send for Tuple<T> {}
unsafe impl<T: AsRef<[u8]>> Sync for Tuple<T> {}
impl<T> From<T> for Tuple<T> where T: AsRef<[u8]> {
fn from(data: T) -> Self {
Tuple::new(data)
}
}
impl<T> Tuple<T>
where
T: AsRef<[u8]>,
@ -135,7 +152,7 @@ impl<T> AsRef<[u8]> for Tuple<T>
}
}
pub(crate) type OwnTuple = Tuple<Vec<u8>>;
pub type OwnTuple = Tuple<Vec<u8>>;
pub(crate) const PREFIX_LEN: usize = 4;
@ -698,3 +715,16 @@ impl<T: AsRef<[u8]>> Hash for Tuple<T> {
}
impl<T: AsRef<[u8]>> Eq for Tuple<T> {}
impl<'a, P, T> From<(P, T)> for OwnTuple
where T: IntoIterator<Item=&'a Value<'a>>,
P: Into<u32> {
fn from((prefix, it): (P, T)) -> Self {
let mut ret = OwnTuple::with_prefix(prefix.into());
for item in it.into_iter() {
ret.push_value(item);
}
ret
}
}

@ -9,7 +9,7 @@ pub(crate) enum TypingError {
type Result<T> = result::Result<T, TypingError>;
const MIN_TABLE_ID: u32 = 10001;
pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000;
#[derive(Eq, PartialEq, Clone, Copy, Ord, PartialOrd, Hash)]
pub(crate) struct TableId {
@ -25,14 +25,14 @@ impl Debug for TableId {
impl TableId {
pub(crate) fn new(in_root: bool, id: u32) -> Result<Self> {
if id < MIN_TABLE_ID {
if id <= MIN_TABLE_ID_BOUND {
Err(TypingError::InvalidTableId(id))
} else {
Ok(TableId { in_root, id })
}
}
pub(crate) fn is_valid(&self) -> bool {
self.id >= MIN_TABLE_ID
self.id > MIN_TABLE_ID_BOUND
}
}

@ -1,17 +1,27 @@
use std::{mem, result};
use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, TDbOptions};
use std::collections::BTreeMap;
use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions, TransactionPtr, TransactOptions, WriteOptionsPtr};
use std::sync::{Arc, LockResult, Mutex, PoisonError};
use std::sync::atomic::{AtomicU32, Ordering};
use lazy_static::lazy_static;
use log::error;
use crate::data::tuple::Tuple;
use crate::runtime::options::{default_options, default_txn_options, default_write_options};
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::runtime::options::{default_options, default_read_options, default_txn_db_options, default_txn_options, default_write_options};
#[derive(thiserror::Error, Debug)]
pub enum DbInstanceError {
#[error(transparent)]
DbBridgeError(#[from] BridgeError),
DbBridge(#[from] BridgeError),
#[error("Cannot obtain session lock")]
SessionLockError,
SessionLock,
#[error(transparent)]
Tuple(#[from] TupleError),
}
type Result<T> = result::Result<T, DbInstanceError>;
@ -27,6 +37,7 @@ pub enum SessionStatus {
struct SessionHandle {
id: usize,
db: DbPtr,
next_table_id: u32,
status: SessionStatus,
}
@ -36,18 +47,20 @@ pub struct DbInstance {
tdb_options: TDbOptions,
path: String,
session_handles: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
optimistic: bool,
destroy_on_close: bool,
}
impl DbInstance {
pub fn new(path: &str, optimistic: bool) -> Result<Self> {
let options = default_options().make_shared();
let tdb_options = default_txn_options(optimistic);
let tdb_options = default_txn_db_options(optimistic);
let main = DbPtr::open(&options, &tdb_options, path)?;
Ok(Self {
options,
tdb_options,
main,
optimistic,
path: path.to_string(),
session_handles: vec![].into(),
destroy_on_close: false,
@ -58,7 +71,7 @@ impl DbInstance {
impl DbInstance {
pub fn session(&self) -> Result<Session> {
let mut handles = self.session_handles.lock()
.map_err(|_| DbInstanceError::SessionLockError)?;
.map_err(|_| DbInstanceError::SessionLock)?;
let handle = handles.iter().find_map(|handle| {
match handle.try_lock() {
Ok(inner) => {
@ -84,6 +97,7 @@ impl DbInstance {
status: SessionStatus::Prepared,
id: idx,
db: temp.clone(),
next_table_id: MIN_TABLE_ID_BOUND,
}));
handles.push(handle.clone());
@ -94,10 +108,21 @@ impl DbInstance {
drop(handles);
let mut w_opts_temp = default_write_options();
w_opts_temp.set_disable_wal(true);
Ok(Session {
main: self.main.clone(),
temp,
session_handle: handle,
optimistic: self.optimistic,
w_opts_main: default_write_options(),
w_opts_temp,
r_opts_main: default_read_options(),
r_opts_temp: default_read_options(),
stack: vec![],
cur_table_id: 0.into(),
params: Default::default(),
})
}
@ -153,24 +178,71 @@ impl Drop for DbInstance {
}
}
enum SessionDefinable {
Value(StaticValue),
Expr(StaticExpr),
Typing(Typing)
// TODO
}
type SessionStackFrame = BTreeMap<String, SessionDefinable>;
pub struct Session {
pub(crate) main: DbPtr,
pub(crate) temp: DbPtr,
pub(crate) r_opts_main: ReadOptionsPtr,
pub(crate) r_opts_temp: ReadOptionsPtr,
pub(crate) w_opts_main: WriteOptionsPtr,
pub(crate) w_opts_temp: WriteOptionsPtr,
optimistic: bool,
cur_table_id: AtomicU32,
stack: Vec<SessionStackFrame>,
params: BTreeMap<String, StaticValue>,
session_handle: Arc<Mutex<SessionHandle>>,
}
pub(crate) struct InterpretContext<'a> {
session: &'a Session,
}
impl <'a> InterpretContext<'a> {
pub(crate) fn resolve(&self, key: impl AsRef<str>) {
}
pub(crate) fn resolve_value(&self, key: impl AsRef<str>) {
}
pub(crate) fn resolve_typing(&self, key: impl AsRef<str>) {
todo!()
}
// also for expr, table, etc..
}
impl Session {
pub fn start(&mut self) -> Result<()> {
let mut handle = self.session_handle.lock()
.map_err(|_| DbInstanceError::SessionLockError)?;
handle.status = SessionStatus::Running;
Ok(())
pub fn start(mut self) -> Result<Self> {
{
self.push_env();
let mut handle = self.session_handle.lock()
.map_err(|_| DbInstanceError::SessionLock)?;
handle.status = SessionStatus::Running;
self.cur_table_id = handle.next_table_id.into();
}
Ok(self)
}
pub(crate) fn push_env(&mut self) {
self.stack.push(BTreeMap::new());
}
pub(crate) fn pop_env(&mut self) {
if self.stack.len() > 1 {
self.stack.pop();
}
}
fn clear_data(&self) -> Result<()> {
let w_opts = default_write_options();
self.temp
.del_range(&w_opts, Tuple::with_null_prefix(), Tuple::max_tuple())?;
// self.temp.compact_all()?;
self.temp.del_range(
&self.w_opts_temp,
Tuple::with_null_prefix(),
Tuple::max_tuple(),
)?;
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
@ -178,11 +250,51 @@ impl Session {
let mut handle = self.session_handle.lock()
.map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLockError
DbInstanceError::SessionLock
})?;
handle.next_table_id = self.cur_table_id.load(Ordering::SeqCst);
handle.status = SessionStatus::Completed;
Ok(())
}
pub(crate) fn get_next_temp_table_id(&self) -> u32 {
let mut res = self.cur_table_id.fetch_add(1, Ordering::SeqCst);
while res.wrapping_add(1) < MIN_TABLE_ID_BOUND {
res = self.cur_table_id.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst);
}
res + 1
}
pub(crate) fn txn(&self, w_opts: Option<WriteOptionsPtr>) -> TransactionPtr {
self.main.txn(default_txn_options(self.optimistic),
w_opts.unwrap_or_else(default_write_options))
}
pub(crate) fn get_next_main_table_id(&self) -> Result<u32> {
let txn = self.txn(None);
let key = MAIN_DB_TABLE_ID_SEQ_KEY.as_ref();
let cur_id = match txn.get_owned(&self.r_opts_main, key)? {
None => {
let val = OwnTuple::from(
(DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()]));
txn.put(key, &val)?;
MIN_TABLE_ID_BOUND
}
Some(pt) => {
let pt = Tuple::from(pt);
let prev_id = pt.get_int(0)?;
let val = OwnTuple::from((DataKind::Data, &[(prev_id + 1).into()]));
txn.put(key, &val)?;
(prev_id + 1) as u32
}
};
txn.commit()?;
Ok(cur_id + 1)
}
}
lazy_static! {
static ref MAIN_DB_TABLE_ID_SEQ_KEY: OwnTuple = OwnTuple::from((0u32, &[Value::Null]));
}
impl Drop for Session {
@ -201,7 +313,7 @@ mod tests {
use super::*;
use crate::runtime::instance::DbInstance;
fn test_send_sync<T: Send + Sync>(_x: T) {}
fn test_send<T: Send>(_x: T) {}
#[test]
fn creation() -> Result<()> {
@ -214,9 +326,11 @@ mod tests {
let start = Instant::now();
let mut db2 = DbInstance::new("_test2", true)?;
db2.set_destroy_on_close(true);
for _ in 0..1000 {
let i1 = db2.session()?;
test_send_sync(i1);
for _ in 0..100 {
let i1 = db2.session()?.start()?;
dbg!(i1.get_next_temp_table_id());
dbg!(i1.get_next_main_table_id()?);
test_send(i1);
}
dbg!(start.elapsed());
Ok(())

@ -1,5 +1,5 @@
use lazy_static::lazy_static;
use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, WriteOptionsPtr};
use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, TransactOptions, WriteOptionsPtr};
use crate::data::tuple::PREFIX_LEN;
const COMPARATOR_NAME: &str = "cozo_cmp_v1";
@ -34,10 +34,21 @@ pub fn default_flush_options() -> FlushOptionsPtr {
FlushOptionsPtr::default()
}
pub fn default_txn_options(optimistic: bool) -> TDbOptions {
pub fn default_txn_db_options(optimistic: bool) -> TDbOptions {
if optimistic {
TDbOptions::Optimistic(OTxnDbOptionsPtr::default())
} else {
TDbOptions::Pessimistic(PTxnDbOptionsPtr::default())
}
}
pub fn default_txn_options(optimistic: bool) -> TransactOptions {
if optimistic {
let o = OTxnOptionsPtr::new(&DEFAULT_COMPARATOR);
TransactOptions::Optimistic(o)
} else {
let mut o = PTxnOptionsPtr::default();
o.set_deadlock_detect(true);
TransactOptions::Pessimistic(o)
}
}
Loading…
Cancel
Save