row evaluation

main
Ziyang Hu 2 years ago
parent 007c36daa2
commit d393d4cf5f

@ -215,16 +215,8 @@ mod ffi {
val: &[u8],
status: &mut BridgeStatus,
);
fn del_raw(
self: &TDBBridge,
options: &WriteOptions,
key: &[u8],
status: &mut BridgeStatus,
);
fn iterator_raw(
self: &TDBBridge,
r_opts: &ReadOptions,
) -> UniquePtr<IteratorBridge>;
fn del_raw(self: &TDBBridge, options: &WriteOptions, key: &[u8], status: &mut BridgeStatus);
fn iterator_raw(self: &TDBBridge, r_opts: &ReadOptions) -> UniquePtr<IteratorBridge>;
fn open_db_raw(
options: &Options,

@ -12,9 +12,9 @@ pub use bridge::StatusBridgeCode;
pub use bridge::StatusCode;
pub use bridge::StatusSeverity;
pub use bridge::StatusSubCode;
pub use status::BridgeError;
use cxx::let_cxx_string;
pub use cxx::{SharedPtr, UniquePtr};
pub use status::BridgeError;
use status::Result;
use std::ops::Deref;
use std::pin::Pin;
@ -276,14 +276,16 @@ impl TransactionPtr {
}
}
#[inline]
pub fn get_owned(&self, options: &ReadOptions,
key: impl AsRef<[u8]>) -> Result<Option<PinnableSlicePtr>> {
pub fn get_owned(
&self,
options: &ReadOptions,
key: impl AsRef<[u8]>,
) -> Result<Option<PinnableSlicePtr>> {
let mut slice = PinnableSlicePtr::default();
if self.get(options, key, &mut slice)? {
Ok(Some(slice))
} else {
Ok(None)
}
}
#[inline]
@ -308,11 +310,7 @@ impl TransactionPtr {
status.check_err(ret)
}
#[inline]
pub fn put(
&self,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
let mut status = BridgeStatus::default();
let ret = self.put_txn(key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
@ -347,19 +345,14 @@ impl DbPtr {
DbPtr(SharedPtr::null())
}
pub fn open_non_txn(options: &Options,
path: impl AsRef<str>, ) -> Result<Self> {
pub fn open_non_txn(options: &Options, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = open_db_raw(options, &cname, &mut status);
status.check_err(Self(ret))
}
pub fn open(
options: &Options,
t_options: &TDbOptions,
path: impl AsRef<str>,
) -> Result<Self> {
pub fn open(options: &Options, t_options: &TDbOptions, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = match t_options {
@ -388,14 +381,16 @@ impl DbPtr {
}
}
#[inline]
pub fn get_owned(&self, options: &ReadOptions,
key: impl AsRef<[u8]>) -> Result<Option<PinnableSlicePtr>> {
pub fn get_owned(
&self,
options: &ReadOptions,
key: impl AsRef<[u8]>,
) -> Result<Option<PinnableSlicePtr>> {
let mut slice = PinnableSlicePtr::default();
if self.get(options, key, &mut slice)? {
Ok(Some(slice))
} else {
Ok(None)
}
}
#[inline]
@ -421,11 +416,7 @@ impl DbPtr {
}
#[inline]
pub fn txn(
&self,
options: TransactOptions,
write_ops: WriteOptionsPtr,
) -> TransactionPtr {
pub fn txn(&self, options: TransactOptions, write_ops: WriteOptionsPtr) -> TransactionPtr {
TransactionPtr(match options {
TransactOptions::Optimistic(o) => self.begin_o_transaction(write_ops.0, o.0),
TransactOptions::Pessimistic(o) => self.begin_t_transaction(write_ops.0, o.0),
@ -495,4 +486,4 @@ pub fn repair_db(options: &Options, path: impl AsRef<str>) -> Result<()> {
let mut status = BridgeStatus::default();
repair_db_raw(options, &cname, &mut status);
status.check_err(())
}
}

@ -131,7 +131,6 @@ pub struct ReadOptionsPtr(UniquePtr<ReadOptions>);
unsafe impl Send for ReadOptionsPtr {}
// unsafe impl Sync for ReadOptionsPtr {}
impl Deref for ReadOptionsPtr {
type Target = UniquePtr<ReadOptions>;

@ -1,8 +1,9 @@
pub(crate) mod eval;
pub(crate) mod expr;
pub(crate) mod expr_parser;
pub(crate) mod key_order;
pub(crate) mod op;
pub(crate) mod tuple;
pub(crate) mod tuple_set;
pub(crate) mod typing;
pub(crate) mod value;
pub(crate) mod key_order;

@ -0,0 +1,134 @@
use crate::data::expr::Expr;
use crate::data::expr_parser::ExprParseError;
use crate::data::tuple_set::{ColId, TableId, TupleSetIdx};
use crate::data::value::{StaticValue, Value};
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::result;
#[derive(thiserror::Error, Debug)]
pub(crate) enum EvalError {
#[error("Unresolved variable `{0}`")]
UnresolvedVariable(String),
#[error("Unresolved table col {0:?}{1:?}")]
UnresolveTableCol(TableId, ColId),
#[error("Unresolved tuple index {0:?}")]
UnresolveTupleIdx(TupleSetIdx),
#[error("Cannot access field {0} for {1}")]
FieldAccess(String, StaticValue),
#[error("Cannot access index {0} for {1}")]
IndexAccess(usize, StaticValue),
#[error(transparent)]
Parse(#[from] ExprParseError),
#[error("Cannot apply `{0}` to `{1:?}`")]
OpTypeMismatch(String, Vec<StaticValue>),
}
type Result<T> = result::Result<T, EvalError>;
pub(crate) trait RowEvalContext {
fn resolve<'a>(&'a self, idx: &TupleSetIdx) -> Result<&'a Value>;
}
impl RowEvalContext for () {
fn resolve<'a>(&'a self, idx: &TupleSetIdx) -> Result<&'a Value> {
Err(EvalError::UnresolveTupleIdx(*idx))
}
}
pub(crate) trait ExprEvalContext {}
impl<'a> Expr<'a> {
pub(crate) fn row_eval<C: RowEvalContext + 'a>(&'a self, ctx: &'a C) -> Result<Value<'a>> {
let res: Value = match self {
Expr::Const(v) => v.clone(),
Expr::List(l) => l
.iter()
.map(|v| v.row_eval(ctx))
.collect::<Result<Vec<_>>>()?
.into(),
Expr::Dict(d) => d
.iter()
.map(|(k, v)| -> Result<(Cow<str>, Value)> {
let v = v.row_eval(ctx)?;
Ok((k.into(), v))
})
.collect::<Result<BTreeMap<_, _>>>()?
.into(),
Expr::Variable(v) => return Err(EvalError::UnresolvedVariable(v.clone())),
Expr::TableCol(tid, cid) => return Err(EvalError::UnresolveTableCol(*tid, *cid)),
Expr::TupleSetIdx(idx) => ctx.resolve(idx)?.clone(),
Expr::Apply(op, vals) => {
// TODO for non-null operators, short-circuit
let (has_null, args) = vals.iter().try_fold(
(false, Vec::with_capacity(vals.len())),
|(has_null, mut acc), v| {
v.row_eval(ctx).map(|v| match v {
Value::Null => {
acc.push(Value::Null);
(true, acc)
}
v => {
acc.push(v);
(has_null, acc)
}
})
},
)?;
op.eval(has_null, args)?
}
Expr::ApplyAgg(_, _, _) => {
todo!()
}
Expr::FieldAcc(f, arg) => match arg.row_eval(ctx)? {
Value::Null => Value::Null,
Value::Dict(mut d) => d.remove(f as &str).unwrap_or(Value::Null),
v => return Err(EvalError::FieldAccess(f.clone(), v.to_static())),
},
Expr::IdxAcc(idx, arg) => match arg.row_eval(ctx)? {
Value::Null => Value::Null,
Value::List(mut d) => {
if *idx >= d.len() {
Value::Null
} else {
d.swap_remove(*idx)
}
}
v => return Err(EvalError::IndexAccess(*idx, v.to_static())),
},
};
Ok(res)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::expr_parser::tests::str2expr;
#[test]
fn evaluations() -> Result<()> {
dbg!(str2expr("123")?.row_eval(&())?);
dbg!(str2expr("123 + 457")?.row_eval(&())?);
dbg!(str2expr("123 + 457.1")?.row_eval(&())?);
dbg!(str2expr("'123' ++ '457.1'")?.row_eval(&())?);
dbg!(str2expr("null ~ null ~ 123 ~ null")?.row_eval(&())?);
dbg!(str2expr("2*3+1/10")?.row_eval(&())?);
dbg!(str2expr("1>null")?.row_eval(&())?);
dbg!(str2expr("'c'>'d'")?.row_eval(&())?);
dbg!(str2expr("null && true && null")?.row_eval(&())?);
dbg!(str2expr("null && false && null")?.row_eval(&())?);
dbg!(str2expr("null || true || null")?.row_eval(&())?);
dbg!(str2expr("null || false || null")?.row_eval(&())?);
dbg!(str2expr("!true")?.row_eval(&())?);
dbg!(str2expr("!null")?.row_eval(&())?);
Ok(())
}
}

@ -247,13 +247,13 @@ fn build_expr_infix<'a>(
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use crate::data::expr::StaticExpr;
use crate::parser::CozoParser;
use pest::Parser;
fn parse_expr_from_str(s: &str) -> Result<Expr> {
pub(crate) fn str2expr(s: &str) -> Result<Expr> {
let pair = CozoParser::parse(Rule::expr, s.as_ref())
.unwrap()
.next()
@ -263,95 +263,77 @@ mod tests {
#[test]
fn raw_string() {
assert!(dbg!(parse_expr_from_str(r#####"r#"x"#"#####)).is_ok());
assert!(dbg!(str2expr(r#####"r#"x"#"#####)).is_ok());
}
#[test]
fn unevaluated() {
assert!(dbg!(parse_expr_from_str("a+b*c+d")).is_ok());
assert!(dbg!(str2expr("a+b*c+d")).is_ok());
}
#[test]
fn parse_literals() {
assert_eq!(str2expr("1").unwrap(), Expr::Const(Value::Int(1)));
assert_eq!(str2expr("12_3").unwrap(), Expr::Const(Value::Int(123)));
assert_eq!(str2expr("0xaf").unwrap(), Expr::Const(Value::Int(0xaf)));
assert_eq!(
parse_expr_from_str("1").unwrap(),
Expr::Const(Value::Int(1))
);
assert_eq!(
parse_expr_from_str("12_3").unwrap(),
Expr::Const(Value::Int(123))
);
assert_eq!(
parse_expr_from_str("0xaf").unwrap(),
Expr::Const(Value::Int(0xaf))
);
assert_eq!(
parse_expr_from_str("0xafcE_f").unwrap(),
str2expr("0xafcE_f").unwrap(),
Expr::Const(Value::Int(0xafcef))
);
assert_eq!(
parse_expr_from_str("0o1234_567").unwrap(),
str2expr("0o1234_567").unwrap(),
Expr::Const(Value::Int(0o1234567))
);
assert_eq!(
parse_expr_from_str("0o0001234_567").unwrap(),
str2expr("0o0001234_567").unwrap(),
Expr::Const(Value::Int(0o1234567))
);
assert_eq!(
parse_expr_from_str("0b101010").unwrap(),
str2expr("0b101010").unwrap(),
Expr::Const(Value::Int(0b101010))
);
assert_eq!(
parse_expr_from_str("0.0").unwrap(),
str2expr("0.0").unwrap(),
Expr::Const(Value::Float((0.).into()))
);
assert_eq!(
parse_expr_from_str("10.022_3").unwrap(),
str2expr("10.022_3").unwrap(),
Expr::Const(Value::Float(10.0223.into()))
);
assert_eq!(
parse_expr_from_str("10.022_3e-100").unwrap(),
str2expr("10.022_3e-100").unwrap(),
Expr::Const(Value::Float(10.0223e-100.into()))
);
assert_eq!(str2expr("null").unwrap(), Expr::Const(Value::Null));
assert_eq!(str2expr("true").unwrap(), Expr::Const(Value::Bool(true)));
assert_eq!(str2expr("false").unwrap(), Expr::Const(Value::Bool(false)));
assert_eq!(
parse_expr_from_str("null").unwrap(),
Expr::Const(Value::Null)
);
assert_eq!(
parse_expr_from_str("true").unwrap(),
Expr::Const(Value::Bool(true))
);
assert_eq!(
parse_expr_from_str("false").unwrap(),
Expr::Const(Value::Bool(false))
);
assert_eq!(
parse_expr_from_str(r#""x \n \ty \"""#).unwrap(),
str2expr(r#""x \n \ty \"""#).unwrap(),
Expr::Const(Value::Text(Cow::Borrowed("x \n \ty \"")))
);
assert_eq!(
parse_expr_from_str(r#""x'""#).unwrap(),
str2expr(r#""x'""#).unwrap(),
Expr::Const(Value::Text("x'".into()))
);
assert_eq!(
parse_expr_from_str(r#"'"x"'"#).unwrap(),
str2expr(r#"'"x"'"#).unwrap(),
Expr::Const(Value::Text(r##""x""##.into()))
);
assert_eq!(
parse_expr_from_str(r#####"r###"x"yz"###"#####).unwrap(),
str2expr(r#####"r###"x"yz"###"#####).unwrap(),
(Expr::Const(Value::Text(r##"x"yz"##.into())))
);
}
#[test]
fn complex_cases() -> Result<()> {
dbg!(parse_expr_from_str("{}")?);
dbg!(parse_expr_from_str("{b:1,a,c:2,d,...e,}")?);
dbg!(parse_expr_from_str("{...a,...b,c:1,d:2,...e,f:3}")?);
dbg!(parse_expr_from_str("[]")?);
dbg!(parse_expr_from_str("[...a,...b,1,2,...e,3]")?);
dbg!(str2expr("{}")?);
dbg!(str2expr("{b:1,a,c:2,d,...e,}")?);
dbg!(str2expr("{...a,...b,c:1,d:2,...e,f:3}")?);
dbg!(str2expr("[]")?);
dbg!(str2expr("[...a,...b,1,2,...e,3]")?);
Ok(())
}
}

@ -1,29 +1,88 @@
use std::fmt::{Debug, Formatter};
use std::cmp::max;
use std::collections::BTreeMap;
use crate::data::eval::{EvalError, ExprEvalContext, RowEvalContext};
use crate::data::expr::Expr;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use std::fmt::{Debug, Formatter};
use std::result;
type Result<T> = result::Result<T, EvalError>;
pub(crate) trait Op: Send + Sync {
fn is_resolved(&self) -> bool;
fn name(&self) -> &str;
fn non_null_args(&self) -> bool {
fn is_resolved(&self) -> bool {
true
}
fn typing_eval(&self, args: ()) -> Typing {
Typing::Any
}
fn row_eval(&self, ctx: (), args: ()) -> () {
unimplemented!()
fn arity(&self) -> Option<usize> {
Some(1)
}
fn row_eval_non_null(&self) -> () {
panic!()
}
fn expr_eval(&self, ctx: (), args: ()) -> () {
self.row_eval(ctx, args)
fn name(&self) -> &str;
fn non_null_args(&self) -> bool {
true
}
fn typing_eval(&self, args: &[Typing]) -> Typing {
let representatives = args.iter().map(|v| v.representative_value()).collect();
match self.eval_non_null(representatives) {
Ok(t) => t.deduce_typing(),
Err(_) => Typing::Any,
}
}
fn eval<'a>(&self, has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
if self.non_null_args() {
if has_null {
Ok(Value::Null)
} else {
match self.arity() {
Some(0) => self.eval_zero(),
Some(1) => self.eval_one_non_null(args.into_iter().next().unwrap()),
Some(2) => {
let mut args = args.into_iter();
self.eval_two_non_null(args.next().unwrap(), args.next().unwrap())
}
_ => self.eval_non_null(args),
}
}
} else {
panic!(
"Required method `eval` not implemented for `{}`",
self.name()
)
}
}
fn eval_non_null<'a>(&self, args: Vec<Value<'a>>) -> Result<Value<'a>> {
panic!(
"Required method `eval_non_null` not implemented for `{}`",
self.name()
)
}
fn eval_zero(&self) -> Result<StaticValue> {
panic!(
"Required method `eval_zero` not implemented for `{}`",
self.name()
)
}
fn eval_one_non_null<'a>(&self, _arg: Value<'a>) -> Result<Value<'a>> {
panic!(
"Required method `eval_one` not implemented for `{}`",
self.name()
)
}
fn eval_two_non_null<'a>(&self, _left: Value<'a>, _right: Value<'a>) -> Result<Value<'a>> {
panic!(
"Required method `eval_two` not implemented for `{}`",
self.name()
)
}
fn expr_eval(&self, ctx: &dyn ExprEvalContext, args: ()) -> () {}
}
pub(crate) trait AggOp: Send + Sync {
fn is_resolved(&self) -> bool;
fn is_resolved(&self) -> bool {
true
}
fn arity(&self) -> Option<usize> {
Some(1)
}
fn name(&self) -> &str;
fn row_eval(&self, ctx: (), args: ()) -> () {
unimplemented!()
@ -33,11 +92,6 @@ pub(crate) trait AggOp: Send + Sync {
}
}
impl<'a> Expr<'a> {
pub(crate) fn expr_eval() {}
pub(crate) fn row_eval() {}
}
pub(crate) struct UnresolvedOp(pub String);
impl Op for UnresolvedOp {
@ -51,10 +105,6 @@ impl Op for UnresolvedOp {
}
impl AggOp for UnresolvedOp {
fn is_resolved(&self) -> bool {
false
}
fn name(&self) -> &str {
&self.0
}
@ -63,263 +113,517 @@ impl AggOp for UnresolvedOp {
pub(crate) struct OpAdd;
impl Op for OpAdd {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"+"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l + r).into(),
(Value::Float(l), Value::Int(r)) => (l + (r as f64)).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) + r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l.into_inner() + r.into_inner()).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpSub;
impl Op for OpSub {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"-"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l - r).into(),
(Value::Float(l), Value::Int(r)) => (l - (r as f64)).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) - r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l.into_inner() - r.into_inner()).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpMul;
impl Op for OpMul {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"*"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l * r).into(),
(Value::Float(l), Value::Int(r)) => (l * (r as f64)).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) * r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l.into_inner() * r.into_inner()).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpDiv;
impl Op for OpDiv {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"/"
}
}
pub(crate) struct OpStrCat;
impl Op for OpStrCat {
fn is_resolved(&self) -> bool {
true
}
fn name(&self) -> &str {
"++"
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l as f64 / r as f64).into(),
(Value::Float(l), Value::Int(r)) => (l / (r as f64)).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) / r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l.into_inner() / r.into_inner()).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpEq;
pub(crate) struct OpMod;
impl Op for OpEq {
fn is_resolved(&self) -> bool {
true
impl Op for OpMod {
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"=="
"%"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l % r).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpNe;
pub(crate) struct OpPow;
impl Op for OpNe {
fn is_resolved(&self) -> bool {
true
impl Op for OpPow {
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"!="
"**"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => ((l as f64).powf(r as f64)).into(),
(Value::Float(l), Value::Int(r)) => ((l.into_inner()).powf(r as f64)).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64).powf(r.into_inner())).into(),
(Value::Float(l), Value::Float(r)) => ((l.into_inner()).powf(r.into_inner())).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpOr;
pub(crate) struct OpStrCat;
impl Op for OpOr {
fn is_resolved(&self) -> bool {
true
impl Op for OpStrCat {
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"||"
"++"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
match (left, right) {
(Value::Text(l), Value::Text(r)) => {
let mut l = l.into_owned();
l += r.as_ref();
Ok(l.into())
}
(l, r) => Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
)),
}
}
}
pub(crate) struct OpAnd;
pub(crate) struct OpEq;
impl Op for OpAnd {
fn is_resolved(&self) -> bool {
true
impl Op for OpEq {
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"&&"
"=="
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
Ok((left == right).into())
}
}
pub(crate) struct OpMod;
pub(crate) struct OpNe;
impl Op for OpMod {
fn is_resolved(&self) -> bool {
true
impl Op for OpNe {
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"%"
"!="
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
Ok((left != right).into())
}
}
pub(crate) struct OpGt;
impl Op for OpGt {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
">"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l > r).into(),
(Value::Float(l), Value::Int(r)) => (l > (r as f64).into()).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) > r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l > r).into(),
(Value::Text(l), Value::Text(r)) => (l > r).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpGe;
impl Op for OpGe {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
">="
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l >= r).into(),
(Value::Float(l), Value::Int(r)) => (l >= (r as f64).into()).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) >= r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l >= r).into(),
(Value::Text(l), Value::Text(r)) => (l >= r).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpLt;
impl Op for OpLt {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"<"
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l < r).into(),
(Value::Float(l), Value::Int(r)) => (l < (r as f64).into()).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) < r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l < r).into(),
(Value::Text(l), Value::Text(r)) => (l < r).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpLe;
impl Op for OpLe {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
Some(2)
}
fn name(&self) -> &str {
"<="
}
fn eval_two_non_null<'a>(&self, left: Value<'a>, right: Value<'a>) -> Result<Value<'a>> {
let res: Value = match (left, right) {
(Value::Int(l), Value::Int(r)) => (l <= r).into(),
(Value::Float(l), Value::Int(r)) => (l <= (r as f64).into()).into(),
(Value::Int(l), Value::Float(r)) => ((l as f64) <= r.into_inner()).into(),
(Value::Float(l), Value::Float(r)) => (l <= r).into(),
(Value::Text(l), Value::Text(r)) => (l <= r).into(),
(l, r) => {
return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![l.to_static(), r.to_static()],
));
}
};
Ok(res)
}
}
pub(crate) struct OpPow;
impl Op for OpPow {
fn is_resolved(&self) -> bool {
true
}
pub(crate) struct OpNegate;
impl Op for OpNegate {
fn name(&self) -> &str {
"**"
"!"
}
fn eval_one_non_null<'a>(&self, arg: Value<'a>) -> Result<Value<'a>> {
match arg {
Value::Bool(b) => Ok((!b).into()),
v => Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
}
pub(crate) struct OpCoalesce;
impl Op for OpCoalesce {
fn is_resolved(&self) -> bool {
true
}
pub(crate) struct OpMinus;
impl Op for OpMinus {
fn name(&self) -> &str {
"~~"
"--"
}
fn eval_one_non_null<'a>(&self, arg: Value<'a>) -> Result<Value<'a>> {
match arg {
Value::Int(i) => Ok((-i).into()),
Value::Float(i) => Ok((-i).into()),
v => Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
}
pub(crate) struct OpNegate;
pub(crate) struct OpIsNull;
impl Op for OpNegate {
fn is_resolved(&self) -> bool {
true
impl Op for OpIsNull {
fn name(&self) -> &str {
"is_null"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, has_null: bool, _args: Vec<Value<'a>>) -> Result<Value<'a>> {
Ok(has_null.into())
}
}
pub(crate) struct OpNotNull;
impl Op for OpNotNull {
fn name(&self) -> &str {
"!"
"not_null"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, has_null: bool, _args: Vec<Value<'a>>) -> Result<Value<'a>> {
Ok((!has_null).into())
}
}
pub(crate) struct OpMinus;
pub(crate) struct OpCoalesce;
impl Op for OpMinus {
fn is_resolved(&self) -> bool {
true
impl Op for OpCoalesce {
fn arity(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"--"
"~~"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, _has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
for arg in args {
if arg != Value::Null {
return Ok(arg);
}
}
Ok(Value::Null)
}
}
pub(crate) struct OpIsNull;
pub(crate) struct OpOr;
impl Op for OpIsNull {
fn is_resolved(&self) -> bool {
true
impl Op for OpOr {
fn arity(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"is_null"
"||"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
for arg in args {
match arg {
Value::Null => {}
Value::Bool(true) => return Ok(Value::Bool(true)),
Value::Bool(false) => {}
v => return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
if has_null {
Ok(Value::Null)
} else {
Ok(Value::Bool(false))
}
}
}
pub(crate) struct OpNotNull;
pub(crate) struct OpAnd;
impl Op for OpNotNull {
fn is_resolved(&self) -> bool {
true
impl Op for OpAnd {
fn arity(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"not_null"
"&&"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
for arg in args {
match arg {
Value::Null => {}
Value::Bool(false) => return Ok(Value::Bool(false)),
Value::Bool(true) => {}
v => return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
if has_null {
Ok(Value::Null)
} else {
Ok(Value::Bool(true))
}
}
}
pub(crate) struct OpConcat;
impl Op for OpConcat {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"concat"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, _has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
let mut coll = vec![];
for v in args.into_iter() {
match v {
Value::Null => {}
Value::List(l) => coll.extend(l),
v => return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
Ok(coll.into())
}
}
pub(crate) struct OpMerge;
impl Op for OpMerge {
fn is_resolved(&self) -> bool {
true
fn arity(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"merge"
}
fn non_null_args(&self) -> bool {
false
}
fn eval<'a>(&self, has_null: bool, args: Vec<Value<'a>>) -> Result<Value<'a>> {
let mut coll = BTreeMap::new();
for v in args.into_iter() {
match v {
Value::Null => {}
Value::Dict(d) => coll.extend(d),
v => return Err(EvalError::OpTypeMismatch(
self.name().to_string(),
vec![v.to_static()],
)),
}
}
Ok(coll.into())
}
}

@ -1,5 +1,7 @@
use crate::data::tuple::TupleError::UndefinedDataTag;
use crate::data::value::Value;
use chrono::format::Item;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::{Ordering, Reverse};
@ -7,9 +9,7 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::result;
use chrono::format::Item;
use uuid::Uuid;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
#[derive(thiserror::Error, Debug)]
pub enum TupleError {
@ -118,8 +118,8 @@ impl From<DataKind> for u32 {
#[derive(Clone)]
pub struct Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) data: T,
idx_cache: RefCell<Vec<usize>>,
@ -129,15 +129,18 @@ unsafe impl<T: AsRef<[u8]>> Send for Tuple<T> {}
unsafe impl<T: AsRef<[u8]>> Sync for Tuple<T> {}
impl<T> From<T> for Tuple<T> where T: AsRef<[u8]> {
impl<T> From<T> for Tuple<T>
where
T: AsRef<[u8]>,
{
fn from(data: T) -> Self {
Tuple::new(data)
}
}
impl<T> Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
pub(crate) fn clear_cache(&self) {
self.idx_cache.borrow_mut().clear()
@ -145,8 +148,8 @@ impl<T> Tuple<T>
}
impl<T> AsRef<[u8]> for Tuple<T>
where
T: AsRef<[u8]>,
where
T: AsRef<[u8]>,
{
fn as_ref(&self) -> &[u8] {
self.data.as_ref()
@ -427,7 +430,7 @@ impl<T: AsRef<[u8]>> Tuple<T> {
let (val, offset) = self.parse_value_at(pos + 1)?;
(offset, Value::DescVal(Reverse(val.into())))
}
StorageTag::Max => (start, Value::Sentinel),
StorageTag::Max => (start, Value::Bottom),
};
Ok((val, nxt))
}
@ -640,7 +643,7 @@ impl OwnTuple {
cache.truncate(start_len);
cache.push(self.data.len());
}
Value::Sentinel => self.seal_with_sentinel(),
Value::Bottom => self.seal_with_sentinel(),
Value::DescVal(Reverse(v)) => {
self.push_reverse_value(v);
}
@ -696,7 +699,7 @@ impl OwnTuple {
impl<'a> Extend<Value<'a>> for OwnTuple {
#[inline]
fn extend<T: IntoIterator<Item=Value<'a>>>(&mut self, iter: T) {
fn extend<T: IntoIterator<Item = Value<'a>>>(&mut self, iter: T) {
for v in iter {
self.push_value(&v)
}
@ -718,10 +721,11 @@ impl<T: AsRef<[u8]>> Hash for Tuple<T> {
impl<T: AsRef<[u8]>> Eq for Tuple<T> {}
impl<'a, P, T> From<(P, T)> for OwnTuple
where T: IntoIterator<Item=&'a Value<'a>>,
P: Into<u32> {
where
T: IntoIterator<Item = &'a Value<'a>>,
P: Into<u32>,
{
fn from((prefix, it): (P, T)) -> Self {
let mut ret = OwnTuple::with_prefix(prefix.into());
for item in it.into_iter() {
@ -731,7 +735,6 @@ impl<'a, P, T> From<(P, T)> for OwnTuple
}
}
pub(crate) enum ReifiedTupleData {
Own(Vec<u8>),
Slice(SlicePtr),
@ -747,7 +750,9 @@ impl Clone for ReifiedTupleData {
ReifiedTupleData::Slice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()),
ReifiedTupleData::SharedSlice(s) => ReifiedTupleData::SharedSlice(s.clone()),
ReifiedTupleData::PinnableSlice(s) => ReifiedTupleData::Own(s.as_ref().to_vec()),
ReifiedTupleData::PinnableSliceShared(s) => ReifiedTupleData::PinnableSliceShared(s.clone())
ReifiedTupleData::PinnableSliceShared(s) => {
ReifiedTupleData::PinnableSliceShared(s.clone())
}
}
}
}
@ -782,7 +787,6 @@ impl From<PinnableSlicePtrShared> for ReifiedTupleData {
}
}
impl AsRef<[u8]> for ReifiedTupleData {
fn as_ref(&self) -> &[u8] {
match self {
@ -790,7 +794,7 @@ impl AsRef<[u8]> for ReifiedTupleData {
ReifiedTupleData::Slice(s) => s.as_ref(),
ReifiedTupleData::SharedSlice(s) => s.as_ref(),
ReifiedTupleData::PinnableSlice(s) => s.as_ref(),
ReifiedTupleData::PinnableSliceShared(s) => s.as_ref()
ReifiedTupleData::PinnableSliceShared(s) => s.as_ref(),
}
}
}

@ -1,9 +1,9 @@
use crate::data::tuple::{OwnTuple, ReifiedTuple, Tuple, TupleError};
use crate::data::value::Value;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::result;
use cozorocks::{PinnableSlicePtr, PinnableSlicePtrShared, SlicePtr, SlicePtrShared};
use crate::data::tuple::{OwnTuple, ReifiedTuple, Tuple, TupleError};
use crate::data::value::Value;
#[derive(thiserror::Error, Debug)]
pub(crate) enum TupleSetError {
@ -105,13 +105,17 @@ impl TupleSet {
self.vals.extend(o.vals);
}
pub(crate) fn extend_keys<I, T>(&mut self, keys: I)
where I: IntoIterator<Item=T>,
ReifiedTuple: From<T> {
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
{
self.keys.extend(keys.into_iter().map(ReifiedTuple::from));
}
pub(crate) fn extend_vals<I, T>(&mut self, keys: I)
where I: IntoIterator<Item=T>,
ReifiedTuple: From<T> {
where
I: IntoIterator<Item = T>,
ReifiedTuple: From<T>,
{
self.vals.extend(keys.into_iter().map(ReifiedTuple::from));
}
@ -136,19 +140,30 @@ impl TupleSet {
Ordering::Equal
}
pub(crate) fn get_value(&self, TupleSetIdx { is_key, t_set, col_idx }: TupleSetIdx) -> Result<Value> {
pub(crate) fn get_value(
&self,
TupleSetIdx {
is_key,
t_set,
col_idx,
}: TupleSetIdx,
) -> Result<Value> {
let tuples = if is_key { &self.keys } else { &self.vals };
let tuple = tuples.get(t_set).ok_or(TupleSetError::IndexOutOfBound(t_set))?;
let tuple = tuples
.get(t_set)
.ok_or(TupleSetError::IndexOutOfBound(t_set))?;
let res = tuple.get(col_idx)?;
Ok(res)
}
}
impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
where I1: IntoIterator<Item=T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item=T2>,
ReifiedTuple: From<T2> {
where
I1: IntoIterator<Item = T1>,
ReifiedTuple: From<T1>,
I2: IntoIterator<Item = T2>,
ReifiedTuple: From<T2>,
{
fn from((keys, vals): (I1, I2)) -> Self {
TupleSet {
keys: keys.into_iter().map(ReifiedTuple::from).collect(),
@ -159,8 +174,8 @@ impl<I1, T1, I2, T2> From<(I1, I2)> for TupleSet
#[cfg(test)]
mod tests {
use std::mem;
use super::*;
use std::mem;
#[test]
fn sizes() {
@ -171,4 +186,4 @@ mod tests {
dbg!(mem::size_of::<ReifiedTuple>());
dbg!(mem::size_of::<TupleSet>());
}
}
}

@ -3,8 +3,10 @@ use crate::parser::text_identifier::build_name_in_def;
use crate::parser::{CozoParser, Rule};
use pest::iterators::Pair;
use pest::Parser;
use std::collections::BTreeMap;
use std::fmt::{Debug, Display, Formatter};
use std::result;
use uuid::Uuid;
#[derive(thiserror::Error, Debug)]
pub(crate) enum TypingError {
@ -34,6 +36,7 @@ pub(crate) enum Typing {
Float,
Text,
Uuid,
Bytes,
Nullable(Box<Typing>),
Homogeneous(Box<Typing>),
UnnamedTuple(Vec<Typing>),
@ -66,6 +69,7 @@ impl Display for Typing {
write!(f, "{}", joined)?;
write!(f, "}}")
}
Typing::Bytes => write!(f, "Bytes"),
}
}
}
@ -78,7 +82,29 @@ impl Debug for Typing {
impl Typing {
pub(crate) fn representative_value(&self) -> StaticValue {
todo!()
match self {
Typing::Any => Value::Bottom,
Typing::Bool => Value::Bool(false),
Typing::Int => Value::Int(0),
Typing::Float => Value::Float((0.).into()),
Typing::Text => Value::Text("".into()),
Typing::Uuid => Value::Uuid(Uuid::nil()),
Typing::Nullable(n) => n.representative_value(),
Typing::Homogeneous(h) => vec![h.representative_value()].into(),
Typing::UnnamedTuple(v) => v
.iter()
.map(|v| v.representative_value())
.collect::<Vec<_>>()
.into(),
Typing::NamedTuple(nt) => {
let map = nt
.iter()
.map(|(k, v)| (k.clone().into(), v.representative_value()))
.collect::<BTreeMap<_, _>>();
Value::from(map)
}
Typing::Bytes => Value::from(b"".as_ref()),
}
}
pub(crate) fn coerce<'a>(&self, v: Value<'a>) -> Result<Value<'a>> {
if *self == Typing::Any {
@ -102,6 +128,7 @@ impl Typing {
Typing::Float => self.coerce_float(v),
Typing::Text => self.coerce_text(v),
Typing::Uuid => self.coerce_uuid(v),
Typing::Bytes => self.coerce_bytes(v),
Typing::Homogeneous(t) => match v {
Value::List(vs) => Ok(Value::List(
vs.into_iter()
@ -150,6 +177,30 @@ impl Typing {
_ => Err(TypingError::TypeMismatch(self.clone(), v.to_static())),
}
}
fn coerce_bytes<'a>(&self, v: Value<'a>) -> Result<Value<'a>> {
match v {
v @ Value::Bytes(_) => Ok(v),
_ => Err(TypingError::TypeMismatch(self.clone(), v.to_static())),
}
}
}
impl<'a> Value<'a> {
pub(crate) fn deduce_typing(&self) -> Typing {
match self {
Value::Null => Typing::Any,
Value::Bool(_) => Typing::Bool,
Value::Int(_) => Typing::Int,
Value::Float(_) => Typing::Float,
Value::Uuid(_) => Typing::Uuid,
Value::Text(_) => Typing::Text,
Value::Bytes(_) => Typing::Bytes,
Value::List(_) => Typing::Any,
Value::Dict(_) => Typing::Any,
Value::DescVal(_) => Typing::Any,
Value::Bottom => Typing::Any,
}
}
}
impl TryFrom<&str> for Typing {
@ -161,14 +212,6 @@ impl TryFrom<&str> for Typing {
}
}
impl<'a> TryFrom<Value<'a>> for Typing {
type Error = TypingError;
fn try_from(value: Value<'a>) -> result::Result<Self, Self::Error> {
todo!()
}
}
impl TryFrom<Pair<'_, Rule>> for Typing {
type Error = TypingError;

@ -20,7 +20,7 @@ pub enum Value<'a> {
DescVal(Reverse<Box<Value<'a>>>),
Sentinel, // Acts as "any" in type inference, end value in sorting
Bottom, // Acts as "any" in type inference, end value in sorting
}
pub(crate) type StaticValue = Value<'static>;
@ -204,7 +204,7 @@ impl<'a> Display for Value<'a> {
}
f.write_char('}')?;
}
Value::Sentinel => write!(f, "Sentinel")?,
Value::Bottom => write!(f, "Sentinel")?,
Value::DescVal(Reverse(v)) => {
write!(f, "~{}", v)?;
}
@ -233,7 +233,7 @@ impl<'a> Value<'a> {
.map(|(k, v)| (Cow::Owned(k.into_owned()), v.to_static()))
.collect::<BTreeMap<Cow<'static, str>, StaticValue>>()
.into(),
Value::Sentinel => panic!("Cannot process sentinel value"),
Value::Bottom => panic!("Cannot process sentinel value"),
Value::Bytes(t) => Value::from(t.into_owned()),
Value::DescVal(Reverse(val)) => Value::DescVal(Reverse(val.to_static().into())),
}

@ -8,4 +8,4 @@ pub(crate) mod logger;
pub(crate) mod parser;
pub(crate) mod runtime;
pub use runtime::instance::DbInstance;
pub use runtime::instance::DbInstance;

@ -8,7 +8,6 @@ pub(crate) fn init_test_logger() {
.try_init();
}
#[cfg(test)]
mod tests {
use super::*;

@ -1,3 +1,3 @@
pub(crate) mod instance;
pub(crate) mod interpreter;
pub(crate) mod options;
pub(crate) mod options;

@ -1,17 +1,23 @@
use std::{mem, result};
use std::collections::{BTreeMap, BTreeSet};
use cozorocks::{BridgeError, DbPtr, destroy_db, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions, TransactionPtr, TransactOptions, WriteOptionsPtr};
use std::sync::{Arc, LockResult, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicU32, Ordering};
use lazy_static::lazy_static;
use log::error;
use crate::data::expr::StaticExpr;
use crate::data::tuple::{DataKind, OwnTuple, Tuple, TupleError};
use crate::data::tuple_set::MIN_TABLE_ID_BOUND;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use crate::runtime::instance::DbInstanceError::TableDoesNotExist;
use crate::runtime::options::{default_options, default_read_options, default_txn_db_options, default_txn_options, default_write_options};
use crate::runtime::options::{
default_options, default_read_options, default_txn_db_options, default_txn_options,
default_write_options,
};
use cozorocks::{
destroy_db, BridgeError, DbPtr, OptionsPtrShared, PinnableSlicePtr, ReadOptionsPtr, TDbOptions,
TransactOptions, TransactionPtr, WriteOptionsPtr,
};
use lazy_static::lazy_static;
use log::error;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LockResult, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{mem, result};
#[derive(thiserror::Error, Debug)]
pub enum DbInstanceError {
@ -43,7 +49,6 @@ pub enum SessionStatus {
Completed,
}
struct SessionHandle {
id: usize,
db: DbPtr,
@ -84,29 +89,27 @@ impl DbInstance {
impl DbInstance {
pub fn session(&self) -> Result<Session> {
let mut handles = self.session_handles.lock()
let mut handles = self
.session_handles
.lock()
.map_err(|_| DbInstanceError::SessionLock)?;
let handle = handles.iter().find_map(|handle| {
match handle.try_lock() {
Ok(inner) => {
if inner.status == SessionStatus::Completed {
let db = inner.db.clone();
let idx = inner.id;
Some((db, idx, handle))
} else {
None
}
let handle = handles.iter().find_map(|handle| match handle.try_lock() {
Ok(inner) => {
if inner.status == SessionStatus::Completed {
let db = inner.db.clone();
let idx = inner.id;
Some((db, idx, handle))
} else {
None
}
Err(_) => None
}
Err(_) => None,
});
let (temp, handle) = match handle {
None => {
let idx = handles.len();
let temp_path = self.get_session_storage_path(idx);
let temp = DbPtr::open_non_txn(
&self.options,
&temp_path)?;
let temp = DbPtr::open_non_txn(&self.options, &temp_path)?;
let handle = Arc::new(Mutex::new(SessionHandle {
status: SessionStatus::Prepared,
id: idx,
@ -117,7 +120,7 @@ impl DbInstance {
(temp, handle)
}
Some((db, _, handle)) => (db, handle.clone())
Some((db, _, handle)) => (db, handle.clone()),
};
drop(handles);
@ -234,7 +237,9 @@ impl Session {
pub fn start(mut self) -> Result<Self> {
{
self.push_env();
let mut handle = self.session_handle.lock()
let mut handle = self
.session_handle
.lock()
.map_err(|_| DbInstanceError::SessionLock)?;
handle.status = SessionStatus::Running;
self.cur_table_id = handle.next_table_id.into();
@ -259,11 +264,10 @@ impl Session {
}
pub fn stop(&mut self) -> Result<()> {
self.clear_data()?;
let mut handle = self.session_handle.lock()
.map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLock
})?;
let mut handle = self.session_handle.lock().map_err(|_| {
error!("failed to stop interpreter");
DbInstanceError::SessionLock
})?;
handle.next_table_id = self.cur_table_id.load(Ordering::SeqCst);
handle.status = SessionStatus::Completed;
Ok(())
@ -272,14 +276,18 @@ impl Session {
pub(crate) fn get_next_temp_table_id(&self) -> u32 {
let mut res = self.cur_table_id.fetch_add(1, Ordering::SeqCst);
while res.wrapping_add(1) < MIN_TABLE_ID_BOUND {
res = self.cur_table_id.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst);
res = self
.cur_table_id
.fetch_add(MIN_TABLE_ID_BOUND, Ordering::SeqCst);
}
res + 1
}
pub(crate) fn txn(&self, w_opts: Option<WriteOptionsPtr>) -> TransactionPtr {
self.main.txn(default_txn_options(self.optimistic),
w_opts.unwrap_or_else(default_write_options))
self.main.txn(
default_txn_options(self.optimistic),
w_opts.unwrap_or_else(default_write_options),
)
}
pub(crate) fn get_next_main_table_id(&self) -> Result<u32> {
@ -287,8 +295,7 @@ impl Session {
let key = MAIN_DB_TABLE_ID_SEQ_KEY.as_ref();
let cur_id = match txn.get_owned(&self.r_opts_main, key)? {
None => {
let val = OwnTuple::from(
(DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()]));
let val = OwnTuple::from((DataKind::Data, &[(MIN_TABLE_ID_BOUND as i64).into()]));
txn.put(key, &val)?;
MIN_TABLE_ID_BOUND
}
@ -304,10 +311,14 @@ impl Session {
Ok(cur_id + 1)
}
pub(crate) fn table_access_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockReadGuard<()>> {
self.table_locks.try_read().map_err(|_| DbInstanceError::TableAccessLock)
self.table_locks
.try_read()
.map_err(|_| DbInstanceError::TableAccessLock)
}
pub(crate) fn table_mutation_guard(&self, ids: BTreeSet<u32>) -> Result<RwLockWriteGuard<()>> {
self.table_locks.write().map_err(|_| DbInstanceError::TableAccessLock)
self.table_locks
.write()
.map_err(|_| DbInstanceError::TableAccessLock)
}
}
@ -323,13 +334,12 @@ impl Drop for Session {
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use crate::logger::init_test_logger;
use super::*;
use crate::logger::init_test_logger;
use crate::runtime::instance::DbInstance;
use std::time::Instant;
fn test_send<T: Send>(_x: T) {}
@ -353,4 +363,4 @@ mod tests {
dbg!(start.elapsed());
Ok(())
}
}
}

@ -1,15 +1,17 @@
use lazy_static::lazy_static;
use cozorocks::{FlushOptionsPtr, OptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, PTxnDbOptionsPtr, PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, TransactOptions, WriteOptionsPtr};
use crate::data::tuple::PREFIX_LEN;
use cozorocks::{
FlushOptionsPtr, OTxnDbOptionsPtr, OTxnOptionsPtr, OptionsPtr, PTxnDbOptionsPtr,
PTxnOptionsPtr, ReadOptionsPtr, RustComparatorPtr, TDbOptions, TransactOptions,
WriteOptionsPtr,
};
use lazy_static::lazy_static;
const COMPARATOR_NAME: &str = "cozo_cmp_v1";
lazy_static! {
static ref DEFAULT_COMPARATOR: RustComparatorPtr = RustComparatorPtr::new(
COMPARATOR_NAME,
crate::data::key_order::compare,
false);
}
static ref DEFAULT_COMPARATOR: RustComparatorPtr =
RustComparatorPtr::new(COMPARATOR_NAME, crate::data::key_order::compare, false);
}
pub fn default_options() -> OptionsPtr {
let mut options = OptionsPtr::default();
@ -51,4 +53,4 @@ pub fn default_txn_options(optimistic: bool) -> TransactOptions {
o.set_deadlock_detect(true);
TransactOptions::Pessimistic(o)
}
}
}

Loading…
Cancel
Save