parse pull specs

main
Ziyang Hu 2 years ago
parent 85c500bf62
commit 3fe5a4865a

@ -9,6 +9,7 @@ use smallvec::SmallVec;
use crate::data::encode::EncodedVec;
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::json::JsonValue;
use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::data::value::Value;
@ -21,7 +22,7 @@ pub enum AttributeError {
}
#[repr(u8)]
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
#[derive(Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Debug, Deserialize, Serialize)]
pub(crate) enum AttributeCardinality {
One = 1,
Many = 2,
@ -292,7 +293,7 @@ impl Attribute {
pub(crate) fn decode(data: &[u8]) -> Result<Self> {
Ok(rmp_serde::from_slice(data)?)
}
pub(crate) fn to_json(&self) -> serde_json::Value {
pub(crate) fn to_json(&self) -> JsonValue {
json!({
"id": self.id.0,
"keyword": self.keyword.to_string(),

@ -3,6 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, TimeZone, Utc};
use serde_derive::{Deserialize, Serialize};
use crate::data::json::JsonValue;
use crate::data::triple::StoreOp;
@ -51,13 +52,13 @@ impl TryFrom<&str> for Validity {
#[derive(Debug, thiserror::Error)]
pub enum IdError {
#[error("Cannot convert to validity: {0}")]
JsonValidityError(serde_json::Value),
JsonValidityError(JsonValue),
}
impl TryFrom<&serde_json::Value> for Validity {
impl TryFrom<&JsonValue> for Validity {
type Error = anyhow::Error;
fn try_from(value: &serde_json::Value) -> Result<Self, Self::Error> {
fn try_from(value: &JsonValue) -> Result<Self, Self::Error> {
if let Some(v) = value.as_i64() {
return Ok(v.into());
}

@ -104,8 +104,8 @@ impl TryFrom<&'_ JsonValue> for Attribute {
let indexing = match map.get("index") {
None => AttributeIndex::None,
Some(serde_json::Value::Bool(true)) => AttributeIndex::Indexed,
Some(serde_json::Value::Bool(false)) => AttributeIndex::None,
Some(JsonValue::Bool(true)) => AttributeIndex::Indexed,
Some(JsonValue::Bool(false)) => AttributeIndex::None,
Some(v) => AttributeIndex::try_from(v.as_str().ok_or_else(|| {
JsonError::Conversion(value.clone(), "AttributeIndexing".to_string())
})?)?,

@ -4,6 +4,8 @@ use std::str::Utf8Error;
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
use crate::data::json::JsonValue;
#[derive(Debug, thiserror::Error)]
pub enum KeywordError {
#[error("cannot convert to keyword: {0}")]
@ -16,7 +18,7 @@ pub enum KeywordError {
Utf8(#[from] Utf8Error),
#[error("unexpected json {0}")]
UnexpectedJson(serde_json::Value),
UnexpectedJson(JsonValue),
}
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Deserialize, Serialize)]

@ -2,6 +2,7 @@ use anyhow::Result;
use itertools::Itertools;
use crate::data::attr::Attribute;
use crate::data::json::JsonValue;
use crate::data::triple::StoreOp;
#[derive(Debug)]
@ -11,7 +12,7 @@ pub struct AttrTxItem {
}
impl AttrTxItem {
pub fn parse_request(req: &serde_json::Value) -> Result<(Vec<AttrTxItem>, String)> {
pub fn parse_request(req: &JsonValue) -> Result<(Vec<AttrTxItem>, String)> {
let map = req
.as_object()
.ok_or_else(|| AttrTxItemError::Decoding(req.clone(), "expected object".to_string()))?;
@ -30,7 +31,7 @@ impl AttrTxItem {
req.clone(),
"'attrs' cannot be empty".to_string(),
)
.into());
.into());
}
let res = items.iter().map(AttrTxItem::try_from).try_collect()?;
Ok((res, comment))
@ -40,13 +41,13 @@ impl AttrTxItem {
#[derive(Debug, thiserror::Error)]
pub enum AttrTxItemError {
#[error("Error decoding {0}: {1}")]
Decoding(serde_json::Value, String),
Decoding(JsonValue, String),
}
impl TryFrom<&'_ serde_json::Value> for AttrTxItem {
impl TryFrom<&'_ JsonValue> for AttrTxItem {
type Error = anyhow::Error;
fn try_from(value: &'_ serde_json::Value) -> Result<Self, Self::Error> {
fn try_from(value: &'_ JsonValue) -> Result<Self, Self::Error> {
let map = value.as_object().ok_or_else(|| {
AttrTxItemError::Decoding(value.clone(), "expected object".to_string())
})?;
@ -55,7 +56,7 @@ impl TryFrom<&'_ serde_json::Value> for AttrTxItem {
value.clone(),
"object must have exactly one field".to_string(),
)
.into());
.into());
}
let (k, v) = map.into_iter().next().unwrap();
let op = match k as &str {

@ -1,2 +1,3 @@
pub(crate) mod attr;
pub(crate) mod triple;
pub(crate) mod pull;

@ -0,0 +1,193 @@
use anyhow::Result;
use itertools::Itertools;
use serde_json::{json, Map};
use crate::data::attr::AttributeCardinality;
use crate::data::json::JsonValue;
use crate::data::keyword::Keyword;
use crate::data::value::Value;
use crate::preprocess::triple::TxError;
use crate::runtime::transact::SessionTx;
use crate::transact::pull::{AttrPullSpec, PullSpec, PullSpecs};
#[derive(Debug, thiserror::Error)]
pub enum PullError {
#[error("cannot parse pull format {0}: {1}")]
InvalidFormat(JsonValue, String),
}
impl SessionTx {
pub(crate) fn parse_pull(&mut self, desc: &JsonValue, no_parent: bool) -> Result<PullSpecs> {
if let Some(inner) = desc.as_array() {
inner
.iter()
.map(|v| self.parse_pull_element(v, if no_parent { None } else { Some(inner) }))
.try_collect()
} else {
Err(PullError::InvalidFormat(desc.clone(), "expect array".to_string()).into())
}
}
pub(crate) fn parse_pull_element(
&mut self,
desc: &JsonValue,
parent: Option<&Vec<JsonValue>>,
) -> Result<PullSpec> {
match desc {
JsonValue::String(s) if s == "*" => Ok(PullSpec::PullAll),
JsonValue::String(s) => {
let input_kw = Keyword::from(s.as_ref());
let reverse = input_kw.0.starts_with('<');
let kw = if reverse {
Keyword::from(input_kw.0.strip_prefix('<').unwrap())
} else {
input_kw.clone()
};
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;
let cardinality = attr.cardinality;
Ok(PullSpec::Attr(AttrPullSpec {
attr,
default_val: Value::Null,
reverse,
name: input_kw,
cardinality,
take: None,
nested: vec![],
recursive: false,
recursion_limit: None,
}))
}
JsonValue::Object(m) => self.parse_pull_obj(m, parent),
v => Err(
PullError::InvalidFormat(v.clone(), "expect string or object".to_string()).into(),
),
}
}
pub(crate) fn parse_pull_obj(
&mut self,
desc: &Map<String, JsonValue>,
parent: Option<&Vec<JsonValue>>,
) -> Result<PullSpec> {
let mut default_val = Value::Null;
let mut as_override = None;
let mut take = None;
let mut cardinality_override = None;
let mut input_kw = None;
let mut sub_target = vec![];
let mut recursive = false;
let mut recursion_limit = None;
for (k, v) in desc {
match k as &str {
"_as" => {
as_override = Some(Keyword::from(v.as_str().ok_or_else(|| {
PullError::InvalidFormat(v.clone(), "expect string".to_string())
})?))
}
"_limit" => {
take = Some(v.as_u64().ok_or_else(|| {
PullError::InvalidFormat(v.clone(), "expect limit".to_string())
})? as usize)
}
"_cardinality" => {
cardinality_override =
Some(AttributeCardinality::try_from(v.as_str().ok_or_else(
|| PullError::InvalidFormat(v.clone(), "expect string".to_string()),
)?)?)
}
"_default" => default_val = Value::from(v).to_static(),
k if !k.starts_with('_') => {
if input_kw.is_some() {
return Err(PullError::InvalidFormat(
JsonValue::Object(desc.clone()),
"only one sublevel target expected".to_string(),
)
.into());
}
input_kw = Some(Keyword::from(k));
sub_target = {
if let Some(arr) = v.as_array() {
arr.clone()
} else {
if let Some(u) = v.as_u64() {
recursion_limit = Some(u as usize);
} else if *v != json!("...") {
return Err(PullError::InvalidFormat(
v.clone(),
"expect array".to_string(),
)
.into());
}
let parent = parent.ok_or_else(|| {
PullError::InvalidFormat(
JsonValue::Object(desc.clone()),
"cannot recurse at top level".to_string(),
)
})?;
// not clear what two recursions would do
if recursive {
return Err(PullError::InvalidFormat(
JsonValue::Object(desc.clone()),
"cannot have two recursions".to_string(),
)
.into());
}
recursive = true;
// remove self to prevent infinite recursion
parent
.iter()
.filter(|p| {
if let Some(o) = p.as_object() {
o != desc
} else {
true
}
})
.cloned()
.collect_vec()
}
};
}
v => {
return Err(PullError::InvalidFormat(
v.into(),
"unexpected spec key".to_string(),
)
.into())
}
}
}
if input_kw.is_none() {
return Err(PullError::InvalidFormat(
JsonValue::Object(desc.clone()),
"expect target key".to_string(),
)
.into());
}
let input_kw = input_kw.unwrap();
// let recurse_target = sub_target.unwrap();
let reverse = input_kw.0.starts_with('<');
let kw = if reverse {
Keyword::from(input_kw.0.strip_prefix('<').unwrap())
} else {
input_kw.clone()
};
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;
let cardinality = cardinality_override.unwrap_or(attr.cardinality);
let nested = self.parse_pull(&JsonValue::Array(sub_target), recursive)?;
Ok(PullSpec::Attr(AttrPullSpec {
attr,
default_val,
reverse,
name: as_override.unwrap_or(input_kw),
cardinality,
take,
nested,
recursive,
recursion_limit,
}))
}
}

@ -7,6 +7,7 @@ use serde_json::Map;
use crate::data::attr::{Attribute, AttributeIndex, AttributeTyping};
use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::json::JsonValue;
use crate::data::keyword::Keyword;
use crate::data::value::Value;
use crate::runtime::transact::SessionTx;
@ -44,7 +45,7 @@ impl Display for TxAction {
#[derive(Debug, thiserror::Error)]
pub enum TxError {
#[error("Error decoding {0}: {1}")]
Decoding(serde_json::Value, String),
Decoding(JsonValue, String),
#[error("triple length error")]
TripleLength,
#[error("attribute not found: {0}")]
@ -125,7 +126,7 @@ impl SessionTx {
/// nesting is allowed for values of type `ref` and `component`
pub fn parse_tx_requests<'a>(
&mut self,
req: &'a serde_json::Value,
req: &'a JsonValue,
) -> Result<(Vec<Quintuple<'a>>, String)> {
let map = req
.as_object()
@ -158,7 +159,7 @@ impl SessionTx {
}
fn parse_tx_request_item<'a>(
&mut self,
item: &'a serde_json::Value,
item: &'a JsonValue,
default_since: Validity,
temp_id_ctx: &mut TempIdCtx,
collected: &mut Vec<Quintuple<'a>>,
@ -175,10 +176,10 @@ impl SessionTx {
(inner, TxAction::Ensure)
} else {
return Err(TxError::Decoding(
serde_json::Value::Object(item.clone()),
JsonValue::Object(item.clone()),
"expect any of the keys 'put', 'retract', 'erase', 'ensure'".to_string(),
)
.into());
.into());
}
};
let since = match item.get("since") {
@ -201,7 +202,7 @@ impl SessionTx {
&mut self,
eid: EntityId,
attr: &Attribute,
value: &'a serde_json::Value,
value: &'a JsonValue,
action: TxAction,
since: Validity,
temp_id_ctx: &mut TempIdCtx,
@ -228,10 +229,10 @@ impl SessionTx {
action,
"using temp id instead of perm id".to_string(),
)
.into());
.into());
}
let v = if let serde_json::Value::Object(inner) = value {
let v = if let JsonValue::Object(inner) = value {
self.parse_tx_component(&attr, inner, action, since, temp_id_ctx, collected)?
} else {
attr.coerce_value(Value::from(value), temp_id_ctx)?
@ -252,7 +253,7 @@ impl SessionTx {
fn parse_tx_component<'a>(
&mut self,
parent_attr: &Attribute,
comp: &'a Map<String, serde_json::Value>,
comp: &'a Map<String, JsonValue>,
action: TxAction,
since: Validity,
temp_id_ctx: &mut TempIdCtx,
@ -263,7 +264,7 @@ impl SessionTx {
action,
"component shorthand cannot be used".to_string(),
)
.into());
.into());
}
let (eid, has_unique_attr) =
self.parse_tx_request_obj(comp, true, action, since, temp_id_ctx, collected)?;
@ -275,7 +276,7 @@ impl SessionTx {
}
fn parse_tx_request_arr<'a>(
&mut self,
item: &'a [serde_json::Value],
item: &'a [JsonValue],
action: TxAction,
since: Validity,
temp_id_ctx: &mut TempIdCtx,
@ -288,7 +289,7 @@ impl SessionTx {
action,
"singlet only allowed for 'retract'".to_string(),
)
.into());
.into());
}
let eid = eid.as_u64().ok_or_else(|| {
TxError::Decoding(eid.clone(), "cannot parse as entity id".to_string())
@ -316,7 +317,7 @@ impl SessionTx {
action,
"doublet only allowed for 'retract'".to_string(),
)
.into());
.into());
}
let kw: Keyword = attr.try_into()?;
let attr = self.attr_by_kw(&kw)?.ok_or(TxError::AttrNotFound(kw))?;
@ -349,9 +350,9 @@ impl SessionTx {
}
fn parse_tx_triple<'a>(
&mut self,
eid: &serde_json::Value,
attr_kw: &serde_json::Value,
val: &'a serde_json::Value,
eid: &JsonValue,
attr_kw: &JsonValue,
val: &'a JsonValue,
action: TxAction,
since: Validity,
temp_id_ctx: &mut TempIdCtx,
@ -367,7 +368,7 @@ impl SessionTx {
}
let id = if attr.indexing.is_unique_index() {
let value = if let serde_json::Value::Object(inner) = val {
let value = if let JsonValue::Object(inner) = val {
self.parse_tx_component(&attr, inner, action, since, temp_id_ctx, collected)?
} else {
attr.coerce_value(val.into(), temp_id_ctx)?
@ -398,7 +399,7 @@ impl SessionTx {
id.0,
"conflicting id for identity value".into(),
)
.into());
.into());
}
id
} else if eid.is_string() {
@ -406,7 +407,7 @@ impl SessionTx {
existing_id.0,
"specifying temp_id string together with unique constraint".into(),
)
.into());
.into());
} else {
existing_id
}
@ -436,7 +437,7 @@ impl SessionTx {
}
fn parse_tx_request_obj<'a>(
&mut self,
item: &'a Map<String, serde_json::Value>,
item: &'a Map<String, JsonValue>,
is_sub_component: bool,
action: TxAction,
since: Validity,
@ -456,7 +457,7 @@ impl SessionTx {
has_unique_attr = has_unique_attr || attr.indexing.is_unique_index();
has_identity_attr = has_identity_attr || attr.indexing == AttributeIndex::Identity;
if attr.indexing == AttributeIndex::Identity {
let value = if let serde_json::Value::Object(inner) = v {
let value = if let JsonValue::Object(inner) = v {
self.parse_tx_component(
&attr,
inner,
@ -478,7 +479,7 @@ impl SessionTx {
existing_eid.0,
"conflicting entity id given".to_string(),
)
.into());
.into());
}
}
eid = Some(existing_eid)
@ -501,7 +502,7 @@ impl SessionTx {
given_id.0,
"temp id given where perm id is required".to_string(),
)
.into());
.into());
}
if let Some(prev_id) = eid {
if prev_id != given_id {
@ -509,7 +510,7 @@ impl SessionTx {
given_id.0,
"conflicting entity id given".to_string(),
)
.into());
.into());
}
}
eid = Some(given_id);
@ -520,7 +521,7 @@ impl SessionTx {
eid_inner.0,
"conflicting entity id given".to_string(),
)
.into());
.into());
}
let temp_id_str = temp_id.as_str().ok_or_else(|| {
TxError::Decoding(
@ -543,7 +544,7 @@ impl SessionTx {
action,
"upsert requires identity attribute present".to_string(),
)
.into());
.into());
}
for (attr, v) in pairs {
self.parse_tx_request_inner(eid, &attr, v, action, since, temp_id_ctx, collected)?;
@ -559,7 +560,7 @@ impl SessionTx {
action,
"cannot use non-unique fields to specify entity".to_string(),
)
.into());
.into());
}
}
}
@ -567,14 +568,14 @@ impl SessionTx {
}
}
fn assert_absence_of_keys(m: &Map<String, serde_json::Value>, keys: &[&str]) -> Result<()> {
fn assert_absence_of_keys(m: &Map<String, JsonValue>, keys: &[&str]) -> Result<()> {
for k in keys {
if m.contains_key(*k) {
return Err(TxError::Decoding(
serde_json::Value::Object(m.clone()),
JsonValue::Object(m.clone()),
format!("object must not contain key {}", k),
)
.into());
.into());
}
}
Ok(())

@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::Result;
use itertools::Itertools;
@ -9,15 +9,16 @@ use serde_json::json;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use crate::AttrTxItem;
use crate::data::compare::{DB_KEY_PREFIX_LEN, rusty_cmp};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::{
decode_ea_key, decode_value_from_key, decode_value_from_val, encode_eav_key, StorageTag,
};
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::json::JsonValue;
use crate::data::triple::StoreOp;
use crate::data::value::Value;
use crate::runtime::transact::SessionTx;
use crate::AttrTxItem;
pub struct Db {
db: RocksDb,
@ -118,10 +119,10 @@ impl Db {
it.seek_to_start();
it
}
pub fn transact_triples(&self, payload: &serde_json::Value) -> Result<serde_json::Value> {
pub fn transact_triples(&self, payload: &JsonValue) -> Result<JsonValue> {
let mut tx = self.transact_write()?;
let (payloads, comment) = tx.parse_tx_requests(payload)?;
let res: serde_json::Value = tx
let res: JsonValue = tx
.tx_triples(payloads)?
.iter()
.map(|(eid, size)| json!([eid.0, size]))
@ -133,10 +134,10 @@ impl Db {
"results": res
}))
}
pub fn transact_attributes(&self, payload: &serde_json::Value) -> Result<serde_json::Value> {
pub fn transact_attributes(&self, payload: &JsonValue) -> Result<JsonValue> {
let (attrs, comment) = AttrTxItem::parse_request(payload)?;
let mut tx = self.transact_write()?;
let res: serde_json::Value = tx
let res: JsonValue = tx
.tx_attrs(attrs)?
.iter()
.map(|(op, aid)| json!([aid.0, op.to_string()]))
@ -148,11 +149,11 @@ impl Db {
"results": res
}))
}
pub fn current_schema(&self) -> Result<serde_json::Value> {
pub fn current_schema(&self) -> Result<JsonValue> {
let mut tx = self.transact()?;
tx.all_attrs().map_ok(|v| v.to_json()).try_collect()
}
pub fn entities_at(&self, vld: Option<Validity>) -> Result<serde_json::Value> {
pub fn entities_at(&self, vld: Option<Validity>) -> Result<JsonValue> {
let vld = vld.unwrap_or_else(Validity::current);
let mut tx = self.transact()?;
let mut current = encode_eav_key(
@ -173,7 +174,7 @@ impl Db {
.upper_bound(&upper_bound)
.total_order_seek(true)
.start();
let mut collected: BTreeMap<EntityId, serde_json::Value> = BTreeMap::default();
let mut collected: BTreeMap<EntityId, JsonValue> = BTreeMap::default();
it.seek(&current);
while let Some((k_slice, v_slice)) = it.pair()? {
debug_assert_eq!(

@ -25,6 +25,7 @@ pub(crate) enum PullSpec {
#[derive(Debug, Clone)]
pub(crate) struct AttrPullSpec {
pub(crate) attr: Attribute,
pub(crate) default_val: StaticValue,
pub(crate) reverse: bool,
pub(crate) name: Keyword,
pub(crate) cardinality: AttributeCardinality,
@ -70,7 +71,7 @@ impl SessionTx {
let (_, _, value) = found?;
self.pull_attr_collect(spec, value, vld, collector, recursive_seen)?;
} else {
self.pull_attr_collect(spec, Value::Null, vld, collector, recursive_seen)?;
self.pull_attr_collect(spec, spec.default_val.clone(), vld, collector, recursive_seen)?;
}
} else {
let mut collection: Vec<StaticValue> = vec![];
@ -231,7 +232,7 @@ impl SessionTx {
let (_, _, value) = found?;
self.pull_attr_collect(spec, Value::EnId(value), vld, collector, recursive_seen)?;
} else {
self.pull_attr_collect(spec, Value::Null, vld, collector, recursive_seen)?;
self.pull_attr_collect(spec, spec.default_val.clone(), vld, collector, recursive_seen)?;
}
} else {
let mut collection: Vec<StaticValue> = vec![];

Loading…
Cancel
Save