|
|
@ -762,6 +762,7 @@ impl StoredRA {
|
|
|
|
left_iter: TupleIter<'a>,
|
|
|
|
left_iter: TupleIter<'a>,
|
|
|
|
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
|
|
|
|
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
|
|
|
|
eliminate_indices: BTreeSet<usize>,
|
|
|
|
eliminate_indices: BTreeSet<usize>,
|
|
|
|
|
|
|
|
left_tuple_len: usize,
|
|
|
|
) -> Result<TupleIter<'a>> {
|
|
|
|
) -> Result<TupleIter<'a>> {
|
|
|
|
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
|
|
|
|
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
|
|
|
|
right_invert_indices.sort_by_key(|(_, b)| **b);
|
|
|
|
right_invert_indices.sort_by_key(|(_, b)| **b);
|
|
|
@ -772,25 +773,44 @@ impl StoredRA {
|
|
|
|
|
|
|
|
|
|
|
|
let key_len = self.storage.metadata.keys.len();
|
|
|
|
let key_len = self.storage.metadata.keys.len();
|
|
|
|
if left_to_prefix_indices.len() >= key_len {
|
|
|
|
if left_to_prefix_indices.len() >= key_len {
|
|
|
|
|
|
|
|
let val_len = self.storage.metadata.non_keys.len();
|
|
|
|
|
|
|
|
let all_right_val_indices: BTreeSet<usize> =
|
|
|
|
|
|
|
|
(0..val_len).map(|i| left_tuple_len + key_len + i).collect();
|
|
|
|
|
|
|
|
let mut need_to_filter_values = false;
|
|
|
|
|
|
|
|
for f in self.filters.iter() {
|
|
|
|
|
|
|
|
let bind_indices = f.binding_indices();
|
|
|
|
|
|
|
|
if !all_right_val_indices.is_disjoint(&bind_indices) {
|
|
|
|
|
|
|
|
need_to_filter_values = true;
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
// We can use point lookups, finally
|
|
|
|
// We can use point lookups, finally
|
|
|
|
// TODO can use this branch if no values are touched!
|
|
|
|
// TODO can use this branch if no values are touched!
|
|
|
|
return if self.storage.metadata.non_keys.is_empty() {
|
|
|
|
return if !need_to_filter_values
|
|
|
|
|
|
|
|
&& eliminate_indices.is_superset(&all_right_val_indices)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
dbg!(left_tuple_len);
|
|
|
|
|
|
|
|
dbg!(key_len);
|
|
|
|
|
|
|
|
dbg!(val_len);
|
|
|
|
|
|
|
|
dbg!(&eliminate_indices);
|
|
|
|
let it = left_iter
|
|
|
|
let it = left_iter
|
|
|
|
.map_ok(move |tuple| -> Result<Option<Tuple>> {
|
|
|
|
.map_ok(move |tuple| -> Result<Option<Tuple>> {
|
|
|
|
let prefix = left_to_prefix_indices
|
|
|
|
let prefix = left_to_prefix_indices
|
|
|
|
.iter()
|
|
|
|
.iter()
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.collect_vec();
|
|
|
|
.collect_vec();
|
|
|
|
let filters = self.filters.clone();
|
|
|
|
|
|
|
|
let key = &prefix[0..key_len];
|
|
|
|
let key = &prefix[0..key_len];
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(key)? {
|
|
|
|
if !p.eval_pred(key)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if self.storage.exists(tx, key)? {
|
|
|
|
if self.storage.exists(tx, key)? {
|
|
|
|
let mut ret = tuple.clone();
|
|
|
|
let mut ret = tuple;
|
|
|
|
ret.extend_from_slice(key);
|
|
|
|
ret.extend_from_slice(key);
|
|
|
|
|
|
|
|
for _ in 0..val_len {
|
|
|
|
|
|
|
|
ret.push(DataValue::Bot);
|
|
|
|
|
|
|
|
}
|
|
|
|
Ok(Some(ret))
|
|
|
|
Ok(Some(ret))
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
Ok(None)
|
|
|
@ -810,17 +830,16 @@ impl StoredRA {
|
|
|
|
.iter()
|
|
|
|
.iter()
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.collect_vec();
|
|
|
|
.collect_vec();
|
|
|
|
let filters = self.filters.clone();
|
|
|
|
|
|
|
|
let key = &prefix[0..key_len];
|
|
|
|
let key = &prefix[0..key_len];
|
|
|
|
match self.storage.get(tx, key)? {
|
|
|
|
match self.storage.get(tx, key)? {
|
|
|
|
None => Ok(None),
|
|
|
|
None => Ok(None),
|
|
|
|
Some(found) => {
|
|
|
|
Some(found) => {
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let mut ret = tuple.clone();
|
|
|
|
let mut ret = tuple;
|
|
|
|
ret.extend(found);
|
|
|
|
ret.extend(found);
|
|
|
|
Ok(Some(ret))
|
|
|
|
Ok(Some(ret))
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -843,7 +862,6 @@ impl StoredRA {
|
|
|
|
.iter()
|
|
|
|
.iter()
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.collect_vec();
|
|
|
|
.collect_vec();
|
|
|
|
let filters = self.filters.clone();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !skip_range_check && !self.filters.is_empty() {
|
|
|
|
if !skip_range_check && !self.filters.is_empty() {
|
|
|
|
let other_bindings = &self.bindings[right_join_indices.len()..];
|
|
|
|
let other_bindings = &self.bindings[right_join_indices.len()..];
|
|
|
@ -859,7 +877,7 @@ impl StoredRA {
|
|
|
|
.scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound)
|
|
|
|
.scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound)
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
let found = res_found?;
|
|
|
|
let found = res_found?;
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -878,7 +896,7 @@ impl StoredRA {
|
|
|
|
.scan_prefix(tx, &prefix)
|
|
|
|
.scan_prefix(tx, &prefix)
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
let found = res_found?;
|
|
|
|
let found = res_found?;
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -1200,8 +1218,6 @@ impl InMemRelationRA {
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.map(|i| tuple[*i].clone())
|
|
|
|
.collect_vec();
|
|
|
|
.collect_vec();
|
|
|
|
|
|
|
|
|
|
|
|
let filters = self.filters.clone();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !skip_range_check && !self.filters.is_empty() {
|
|
|
|
if !skip_range_check && !self.filters.is_empty() {
|
|
|
|
let other_bindings = &self.bindings[right_join_indices.len()..];
|
|
|
|
let other_bindings = &self.bindings[right_join_indices.len()..];
|
|
|
|
let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
|
|
|
|
let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
|
|
|
@ -1218,7 +1234,7 @@ impl InMemRelationRA {
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
let found = res_found?;
|
|
|
|
let found = res_found?;
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -1237,7 +1253,7 @@ impl InMemRelationRA {
|
|
|
|
.scan_prefix_for_epoch(&prefix, scan_epoch)
|
|
|
|
.scan_prefix_for_epoch(&prefix, scan_epoch)
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
.map(move |res_found| -> Result<Option<Tuple>> {
|
|
|
|
let found = res_found?;
|
|
|
|
let found = res_found?;
|
|
|
|
for p in filters.iter() {
|
|
|
|
for p in self.filters.iter() {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
if !p.eval_pred(&found)? {
|
|
|
|
return Ok(None);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -1620,11 +1636,13 @@ impl InnerJoin {
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
.unwrap();
|
|
|
|
if join_is_prefix(&join_indices.1) {
|
|
|
|
if join_is_prefix(&join_indices.1) {
|
|
|
|
|
|
|
|
let left_len = self.left.bindings_after_eliminate().len();
|
|
|
|
r.prefix_join(
|
|
|
|
r.prefix_join(
|
|
|
|
tx,
|
|
|
|
tx,
|
|
|
|
self.left.iter(tx, epoch, use_delta)?,
|
|
|
|
self.left.iter(tx, epoch, use_delta)?,
|
|
|
|
join_indices,
|
|
|
|
join_indices,
|
|
|
|
eliminate_indices,
|
|
|
|
eliminate_indices,
|
|
|
|
|
|
|
|
left_len,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
self.materialized_join(tx, eliminate_indices, epoch, use_delta)
|
|
|
|
self.materialized_join(tx, eliminate_indices, epoch, use_delta)
|
|
|
|