restore some cases of prefix join in neg join

main
Ziyang Hu 2 years ago
parent 115873e61a
commit c5ac7d36e1

@ -756,24 +756,71 @@ impl RelationRA {
left_to_prefix_indices.push(left_join_indices[*idx]); left_to_prefix_indices.push(left_join_indices[*idx]);
} }
// TODO don't build the whole thing is prefix scan suffices if join_is_prefix(&right_join_indices) {
Ok(Box::new(
left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> {
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
'outer: for found in self.storage.scan_prefix(tx, &prefix) {
let found = found?;
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple.0[*left_idx] != found.0[*right_idx] {
continue 'outer;
}
}
return Ok(None);
}
Ok(Some(if !eliminate_indices.is_empty() {
Tuple(
tuple
.0
.into_iter()
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec(),
)
} else {
tuple
}))
})
.map(flatten_err)
.filter_map(invert_option_err),
))
} else {
let mut right_join_vals = BTreeSet::new(); let mut right_join_vals = BTreeSet::new();
for tuple in self.storage.scan_all(tx) { for tuple in self.storage.scan_all(tx) {
let tuple = tuple?; let tuple = tuple?;
let to_join: Box<[DataValue]> = right_join_indices.iter().map(|i| { let to_join: Box<[DataValue]> = right_join_indices
tuple.0[*i].clone() .iter()
}).collect(); .map(|i| tuple.0[*i].clone())
.collect();
right_join_vals.insert(to_join); right_join_vals.insert(to_join);
} }
Ok(Box::new( Ok(Box::new(
left_iter left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> { .map_ok(move |tuple| -> Result<Option<Tuple>> {
let left_join_vals: Box<[DataValue]> = left_join_indices.iter().map(|i| { let left_join_vals: Box<[DataValue]> = left_join_indices
tuple.0[*i].clone() .iter()
}).collect(); .map(|i| tuple.0[*i].clone())
.collect();
if right_join_vals.contains(&left_join_vals) { if right_join_vals.contains(&left_join_vals) {
return Ok(None) return Ok(None);
} }
Ok(Some(if !eliminate_indices.is_empty() { Ok(Some(if !eliminate_indices.is_empty() {
@ -799,6 +846,7 @@ impl RelationRA {
.filter_map(invert_option_err), .filter_map(invert_option_err),
)) ))
} }
}
fn iter(&self, tx: &SessionTx) -> Result<TupleIter<'_>> { fn iter(&self, tx: &SessionTx) -> Result<TupleIter<'_>> {
let it = self.storage.scan_all(tx); let it = self.storage.scan_all(tx);
@ -808,12 +856,13 @@ impl RelationRA {
Box::new(filter_iter(self.filters.clone(), it)) Box::new(filter_iter(self.filters.clone(), it))
}) })
} }
fn join_is_prefix(&self, right_join_indices: &[usize]) -> bool { }
fn join_is_prefix(right_join_indices: &[usize]) -> bool {
let mut indices = right_join_indices.to_vec(); let mut indices = right_join_indices.to_vec();
indices.sort(); indices.sort();
let l = indices.len(); let l = indices.len();
indices.into_iter().eq(0..l) indices.into_iter().eq(0..l)
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -900,24 +949,71 @@ impl StoredRelationRA {
} }
left_to_prefix_indices.push(left_join_indices[*idx]); left_to_prefix_indices.push(left_join_indices[*idx]);
} }
if join_is_prefix(&right_join_indices) {
Ok(Box::new(
left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> {
let prefix = Tuple(
left_to_prefix_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
'outer: for found in self.storage.scan_prefix(&prefix) {
let found = found?;
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple.0[*left_idx] != found.0[*right_idx] {
continue 'outer;
}
}
return Ok(None);
}
Ok(Some(if !eliminate_indices.is_empty() {
Tuple(
tuple
.0
.into_iter()
.enumerate()
.filter_map(|(i, v)| {
if eliminate_indices.contains(&i) {
None
} else {
Some(v)
}
})
.collect_vec(),
)
} else {
tuple
}))
})
.map(flatten_err)
.filter_map(invert_option_err),
))
} else {
let mut right_join_vals = BTreeSet::new(); let mut right_join_vals = BTreeSet::new();
for tuple in self.storage.scan_all() { for tuple in self.storage.scan_all() {
let tuple = tuple?; let tuple = tuple?;
let to_join: Box<[DataValue]> = right_join_indices.iter().map(|i| { let to_join: Box<[DataValue]> = right_join_indices
tuple.0[*i].clone() .iter()
}).collect(); .map(|i| tuple.0[*i].clone())
.collect();
right_join_vals.insert(to_join); right_join_vals.insert(to_join);
} }
Ok(Box::new( Ok(Box::new(
left_iter left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> { .map_ok(move |tuple| -> Result<Option<Tuple>> {
let left_join_vals: Box<[DataValue]> = left_join_indices.iter().map(|i| { let left_join_vals: Box<[DataValue]> = left_join_indices
tuple.0[*i].clone() .iter()
}).collect(); .map(|i| tuple.0[*i].clone())
.collect();
if right_join_vals.contains(&left_join_vals) { if right_join_vals.contains(&left_join_vals) {
return Ok(None) return Ok(None);
} }
Ok(Some(if !eliminate_indices.is_empty() { Ok(Some(if !eliminate_indices.is_empty() {
Tuple( Tuple(
@ -942,6 +1038,7 @@ impl StoredRelationRA {
.filter_map(invert_option_err), .filter_map(invert_option_err),
)) ))
} }
}
fn prefix_join<'a>( fn prefix_join<'a>(
&'a self, &'a self,
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
@ -1305,7 +1402,7 @@ impl InnerJoin {
&self.right.bindings_after_eliminate(), &self.right.bindings_after_eliminate(),
) )
.unwrap(); .unwrap();
if r.join_is_prefix(&join_indices.1) { if join_is_prefix(&join_indices.1) {
r.prefix_join( r.prefix_join(
tx, tx,
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta)?,

Loading…
Cancel
Save