edge iterator

main
Ziyang Hu 2 years ago
parent d9b1be6273
commit 06159699f9

@ -131,17 +131,19 @@ impl<'a> ExecPlan<'a> {
}
ExecPlan::EdgeItPlan { it, info, .. } => {
let it = it.try_get()?;
let prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.src_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeIterator { it, started: false }))
Ok(Box::new(EdgeIterator { it, started: false, src_table_id: info.src_table_id.id }))
}
ExecPlan::EdgeKeyOnlyBwdItPlan { it, info } => {
let it = it.try_get()?;
let prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.dst_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeKeyOnlyBwdIterator { it, started: false }))
Ok(Box::new(EdgeKeyOnlyBwdIterator { it, started: false, dst_table_id: info.dst_table_id.id }))
}
ExecPlan::KeySortedWithAssocItPlan { main, associates } => {
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
@ -405,6 +407,7 @@ impl<'a> Iterator for NodeIterator<'a> {
pub struct EdgeIterator<'a> {
it: &'a IteratorPtr<'a>,
started: bool,
src_table_id: i64,
}
impl<'a> Iterator for EdgeIterator<'a> {
@ -420,15 +423,18 @@ impl<'a> Iterator for EdgeIterator<'a> {
match unsafe { self.it.pair() } {
None => return None,
Some((k, v)) => {
let kt = Tuple::new(k);
let vt = Tuple::new(v);
if matches!(vt.data_kind(), Ok(DataKind::Edge)) {
self.it.next()
} else {
let kt = Tuple::new(k);
if kt.get_int(0) != Some(self.src_table_id) {
return None;
}
if matches!(vt.data_kind(), Ok(DataKind::Data)) {
return Some(Ok(MegaTuple {
keys: vec![kt.into()],
vals: vec![vt.into()],
}));
} else {
self.it.next();
}
}
}
@ -439,6 +445,7 @@ impl<'a> Iterator for EdgeIterator<'a> {
pub struct EdgeKeyOnlyBwdIterator<'a> {
it: &'a IteratorPtr<'a>,
started: bool,
dst_table_id: i64,
}
impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
@ -455,6 +462,9 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
None => return None,
Some((_k, rev_k)) => {
let rev_k_tuple = Tuple::new(rev_k);
if rev_k_tuple.get_int(0) != Some(self.dst_table_id) {
return None;
}
if !matches!(rev_k_tuple.data_kind(), Ok(DataKind::Edge)) {
self.it.next()
} else {
@ -975,7 +985,7 @@ mod tests {
.unwrap();
let sel_pat = sess.parse_select_pattern(p).unwrap();
let sel_vals = Value::Dict(sel_pat.vals.into_iter().map(|(k, v)| (k.into(), v)).collect());
let amap = sess.base_relation_to_accessor_map(
let amap = sess.node_accessor_map(
&from_pat.binding,
&from_pat.info,
);
@ -1081,6 +1091,19 @@ mod tests {
for val in plan.iter()? {
println!("{}", val?)
}
let s = r##"from hj:HasJob
where hj.salary < 5000 || hj._dst_id == 19
select {src_id: hj._src_id, dst_id: hj._dst_id, salary: hj.salary, hire_date: hj.hire_date}"##;
let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap();
let plan = sess.query_to_plan(parsed)?;
println!("{:?}", plan);
let plan = sess.reify_output_plan(plan)?;
println!("{:?}", plan);
for val in plan.iter()? {
println!("{:?}", val)
}
}
drop(engine);
let _ = fs::remove_dir_all(db_path);

@ -193,7 +193,7 @@ impl<'a, 'b> MutationManager<'a, 'b> {
.iter()
.zip(src_key_list.into_iter())
{
let v = t.coerce(v)?;
let v = t.1.coerce(v)?;
key_tuple.push_value(&v);
src_keys.push(v);
}
@ -215,7 +215,7 @@ impl<'a, 'b> MutationManager<'a, 'b> {
.iter()
.zip(dst_key_list.into_iter())
{
let v = t.coerce(v)?;
let v = t.1.coerce(v)?;
key_tuple.push_value(&v);
ikey_tuple.push_value(&v);
}

@ -36,7 +36,7 @@ impl<'a> Session<'a> {
fn do_reify_intermediate_plan(&'a self, plan: ExecPlan<'a>) -> Result<(ExecPlan<'a>, AccessorMap)> {
let res = match plan {
ExecPlan::NodeItPlan { info, binding, .. } => {
let amap = self.base_relation_to_accessor_map(&binding, &info);
let amap = self.node_accessor_map(&binding, &info);
let amap = self.convert_to_relative_amap(amap);
let it = if info.table_id.in_root {
self.txn.iterator(true, &self.perm_cf)
@ -51,7 +51,22 @@ impl<'a> Session<'a> {
};
(plan, amap)
}
ExecPlan::EdgeItPlan { .. } => todo!(),
ExecPlan::EdgeItPlan { info, binding, .. } => {
let amap = self.edge_accessor_map(&binding, &info);
let amap = self.convert_to_relative_amap(amap);
let it = if info.table_id.in_root {
self.txn.iterator(true, &self.perm_cf)
} else {
self.txn.iterator(true, &self.temp_cf)
};
let it = IteratorSlot::Reified(it);
let plan = ExecPlan::EdgeItPlan {
it,
info,
binding,
};
(plan, amap)
}
ExecPlan::EdgeKeyOnlyBwdItPlan { .. } => todo!(),
ExecPlan::KeySortedWithAssocItPlan { .. } => todo!(),
ExecPlan::CartesianProdItPlan { .. } => todo!(),
@ -152,7 +167,7 @@ impl<'a> Session<'a> {
};
Ok(res)
}
pub(crate) fn base_relation_to_accessor_map(
pub(crate) fn node_accessor_map(
&self,
binding: &str,
info: &TableInfo,
@ -174,6 +189,36 @@ impl<'a> Session<'a> {
}
BTreeMap::from([(binding.to_string(), ret)])
}
pub(crate) fn edge_accessor_map(
&self,
binding: &str,
info: &TableInfo,
) -> AccessorMap {
let mut ret = BTreeMap::new();
let src_key_len = info.src_key_typing.len();
let dst_key_len = info.dst_key_typing.len();
for (i, (k, _)) in info.src_key_typing.iter().enumerate() {
ret.insert("_src_".to_string() + k, (info.table_id, (true, 1 + i).into()));
}
for (i, (k, _)) in info.dst_key_typing.iter().enumerate() {
ret.insert("_dst_".to_string() + k, (info.table_id, (true, 2 + src_key_len + i).into()));
}
for (i, (k, _)) in info.key_typing.iter().enumerate() {
ret.insert(k.into(), (info.table_id, (true, 2 + src_key_len + dst_key_len + i).into()));
}
for (i, (k, _)) in info.val_typing.iter().enumerate() {
ret.insert(k.into(), (info.table_id, (false, i).into()));
}
for assoc in &info.associates {
for (i, (k, _)) in assoc.key_typing.iter().enumerate() {
ret.insert(k.into(), (assoc.table_id, (true, i).into()));
}
for (i, (k, _)) in assoc.val_typing.iter().enumerate() {
ret.insert(k.into(), (assoc.table_id, (false, i).into()));
}
}
BTreeMap::from([(binding.to_string(), ret)])
}
fn convert_where_data_to_plan<'b>(
&self,
plan: ExecPlan<'b>,

@ -87,8 +87,8 @@ pub struct TableInfo {
pub data_keys: HashSet<String>,
pub key_typing: Vec<(String, Typing)>,
pub val_typing: Vec<(String, Typing)>,
pub src_key_typing: Vec<Typing>,
pub dst_key_typing: Vec<Typing>,
pub src_key_typing: Vec<(String, Typing)>,
pub dst_key_typing: Vec<(String, Typing)>,
pub associates: Vec<TableInfo>,
}
@ -183,7 +183,7 @@ impl<'a> Session<'a> {
)?
.extract_named_tuple()
.ok_or_else(|| CozoError::LogicError("Corrupt data".to_string()))?;
let src_key_typing = src_key.into_iter().map(|(_, v)| v).collect();
let src_key_typing = src_key.into_iter().collect();
let dst = self.table_data(dst_id, dst_in_root)?.ok_or_else(|| {
CozoError::LogicError("Getting dst failed".to_string())
@ -197,7 +197,7 @@ impl<'a> Session<'a> {
)?
.extract_named_tuple()
.ok_or_else(|| CozoError::LogicError("Corrupt data".to_string()))?;
let dst_key_typing = dst_key.into_iter().map(|(_, v)| v).collect();
let dst_key_typing = dst_key.into_iter().collect();
let in_root = tpl.get_bool(0).ok_or_else(|| {
CozoError::LogicError("Cannot extract in root".to_string())

Loading…
Cancel
Save