main
Ziyang Hu 2 years ago
parent 2e32e1a754
commit 860d973c12

@ -10,7 +10,7 @@
* [x] duplicate symbols in rule heads
* [x] limit, offset
* [x] public API
* [ ] sorting
* [x] sorting
* [ ] range scan
comparators can have problems when sorting mixed integers and floats

@ -107,12 +107,22 @@ fn aggr_max(accum: &DataValue, current: &DataValue) -> Result<DataValue> {
}
}
define_aggr!(AGGR_CHOICE, true);
fn aggr_choice(accum: &DataValue, current: &DataValue) -> Result<DataValue> {
Ok(if *accum == DataValue::Bottom {
current.clone()
} else {
accum.clone()
})
}
pub(crate) fn get_aggr(name: &str) -> Option<&'static Aggregation> {
Some(match name {
"Count" => &AGGR_COUNT,
"Sum" => &AGGR_SUM,
"Min" => &AGGR_MIN,
"Max" => &AGGR_MAX,
"Choice" => &AGGR_CHOICE,
_ => return None,
})
}

@ -111,7 +111,7 @@ impl SessionTx {
.map(|v| v as usize)
.ok_or_else(|| anyhow!("'offset' must be a positive number"))
}))?;
let sorters = payload
let mut sorters: Vec<_> = payload
.get("sort")
.unwrap_or(&json!([]))
.as_array()
@ -127,9 +127,39 @@ impl SessionTx {
);
let (k, v) = sorter.iter().next().unwrap();
todo!()
let k = Symbol::from(k as &str);
let d = SortDir::try_from(v)?;
Ok((k, d))
})
.try_collect()?;
if !sorters.is_empty() {
let entry = input_prog
.prog
.get(&PROG_ENTRY)
.ok_or_else(|| anyhow!("program entry point not found"))?;
ensure!(
entry.iter().map(|e| &e.head).all_equal(),
"program entry point must have equal bindings"
);
let entry_head = &entry[0].head;
if sorters
.iter()
.map(|(k, _v)| k)
.eq(entry_head.iter().take(sorters.len()))
{
if sorters.iter().all(|(_k, v)| *v == SortDir::Asc) {
sorters = vec![];
}
}
if !sorters.is_empty() {
let head_symbols: BTreeSet<_> = entry_head.iter().collect();
for (k, _) in sorters.iter() {
if !head_symbols.contains(k) {
bail!("sorted argument {} not found in program entry head", k);
}
}
}
}
Ok((
input_prog,
QueryOutOptions {

@ -6,4 +6,5 @@ pub(crate) mod magic;
pub(crate) mod pull;
pub(crate) mod relation;
pub(crate) mod reorder;
pub(crate) mod stratify;
pub(crate) mod stratify;
pub(crate) mod sort;

@ -14,10 +14,10 @@ use crate::data::id::{AttrId, EntityId, Validity};
use crate::data::json::JsonValue;
use crate::data::symb::Symbol;
use crate::data::triple::StoreOp;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::query::QueryOutOptions;
use crate::query::relation::flatten_err;
use crate::runtime::temp_store::TempStore;
use crate::runtime::transact::SessionTx;
pub(crate) type PullSpecs = Vec<PullSpec>;
@ -84,14 +84,14 @@ impl CurrentPath {
}
impl SessionTx {
pub(crate) fn run_pull_on_query_results(
&mut self,
res_store: TempStore,
pub(crate) fn run_pull_on_query_results<'a>(
&'a mut self,
res_iter: impl Iterator<Item = Result<Tuple>> + 'a,
out_opts: QueryOutOptions,
) -> Result<QueryResult<'_>> {
) -> Result<QueryResult<'a>> {
let out_iter = match out_opts.offset {
None => Left(res_store.scan_all()),
Some(n) => Right(res_store.scan_all().skip(n)),
None => Left(res_iter),
Some(n) => Right(res_iter.skip(n)),
};
match out_opts.out_spec {
None => Ok(Box::new(out_iter.map_ok(|tuple| {

@ -0,0 +1,47 @@
use std::cmp::Reverse;
use std::collections::BTreeMap;
use anyhow::Result;
use itertools::Itertools;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::query::SortDir;
use crate::runtime::temp_store::TempStore;
use crate::runtime::transact::SessionTx;
impl SessionTx {
pub(crate) fn sort_and_collect(
&mut self,
original: TempStore,
sorters: &[(Symbol, SortDir)],
head: &[Symbol],
) -> Result<TempStore> {
let head_indices: BTreeMap<_, _> = head.iter().enumerate().map(|(i, k)| (k, i)).collect();
let idx_sorters = sorters
.iter()
.map(|(k, dir)| (head_indices[k], *dir))
.collect_vec();
let ret = self.new_temp_store();
for tuple in original.scan_all() {
let tuple = tuple?;
let key = Tuple(
idx_sorters
.iter()
.map(|(idx, dir)| {
let mut val = tuple.0[*idx].clone();
if *dir == SortDir::Dsc {
val = DataValue::DescVal(Reverse(Box::new(val)));
}
val
})
.collect_vec(),
);
let encoded_key = key.encode_as_key_for_epoch(ret.id, 0);
let encoded_val = tuple.encode_as_key_for_epoch(ret.id, 0);
ret.db.put(&encoded_key, &encoded_val)?;
}
Ok(ret)
}
}

@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::Result;
use either::{Left, Right};
use itertools::Itertools;
use serde_json::json;
use uuid::Uuid;
@ -17,6 +18,7 @@ use crate::data::encode::{
};
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::json::JsonValue;
use crate::data::symb::PROG_ENTRY;
use crate::data::triple::StoreOp;
use crate::data::tuple::{rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN};
use crate::data::value::DataValue;
@ -281,16 +283,43 @@ impl Db {
pub fn run_query(&self, payload: &JsonValue) -> Result<JsonValue> {
let mut tx = self.transact()?;
let (input_program, out_opts) = tx.parse_query(payload)?;
let entry_head = &input_program.prog.get(&PROG_ENTRY).unwrap()[0].head.clone();
let program = input_program
.to_normalized_program()?
.stratify()?
.magic_sets_rewrite();
let (compiled, stores) = tx.stratified_magic_compile(&program)?;
let result = tx.stratified_magic_evaluate(&compiled, &stores, out_opts.num_to_take())?;
let ret: Vec<_> = tx
.run_pull_on_query_results(result, out_opts)?
.try_collect()?;
Ok(json!(ret))
let result = tx.stratified_magic_evaluate(
&compiled,
&stores,
if out_opts.sorters.is_empty() {
out_opts.num_to_take()
} else {
None
},
)?;
if !out_opts.sorters.is_empty() {
let sorted_result = tx.sort_and_collect(result, &out_opts.sorters, entry_head)?;
let sorted_iter = if let Some(offset) = out_opts.offset {
Left(sorted_result.scan_sorted().skip(offset))
} else {
Right(sorted_result.scan_sorted())
};
let sorted_iter = if let Some(limit) = out_opts.limit {
Left(sorted_iter.take(limit))
} else {
Right(sorted_iter)
};
let ret: Vec<_> = tx
.run_pull_on_query_results(sorted_iter, out_opts)?
.try_collect()?;
Ok(json!(ret))
} else {
let ret: Vec<_> = tx
.run_pull_on_query_results(result.scan_all(), out_opts)?
.try_collect()?;
Ok(json!(ret))
}
}
pub fn explain_query(&self, payload: &JsonValue) -> Result<JsonValue> {
let mut tx = self.transact()?;

@ -43,7 +43,7 @@ impl TempStore {
tuple: &Tuple,
aggrs: &[Option<Aggregation>],
epoch: u32,
) -> anyhow::Result<bool> {
) -> Result<bool> {
let key = Tuple(
aggrs
.iter()
@ -149,7 +149,7 @@ impl TempStore {
&self,
aggrs: &[Option<Aggregation>],
store: &TempStore,
mut limiter: Option<&mut QueryLimiter>
mut limiter: Option<&mut QueryLimiter>,
) -> Result<bool> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0);
let mut it = self
@ -235,10 +235,7 @@ impl TempStore {
Ok(false)
}
pub(crate) fn scan_all_for_epoch(
&self,
epoch: u32,
) -> impl Iterator<Item = anyhow::Result<Tuple>> {
pub(crate) fn scan_all_for_epoch(&self, epoch: u32) -> impl Iterator<Item = Result<Tuple>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, epoch);
let mut it = self
.db
@ -249,20 +246,28 @@ impl TempStore {
it.seek(&lower);
TempStoreIter { it, started: false }
}
pub(crate) fn scan_all(&self) -> impl Iterator<Item = anyhow::Result<Tuple>> {
pub(crate) fn scan_all(&self) -> impl Iterator<Item = Result<Tuple>> {
self.scan_all_for_epoch(0)
}
pub(crate) fn scan_prefix(
&self,
prefix: &Tuple,
) -> impl Iterator<Item = anyhow::Result<Tuple>> {
pub(crate) fn scan_sorted(&self) -> impl Iterator<Item = Result<Tuple>> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0);
let mut it = self
.db
.iterator()
.upper_bound(&upper)
.prefix_same_as_start(true)
.start();
it.seek(&lower);
SortedIter { it, started: false }
}
pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator<Item = Result<Tuple>> {
self.scan_prefix_for_epoch(prefix, 0)
}
pub(crate) fn scan_prefix_for_epoch(
&self,
prefix: &Tuple,
epoch: u32,
) -> impl Iterator<Item = anyhow::Result<Tuple>> {
) -> impl Iterator<Item = Result<Tuple>> {
let mut upper = prefix.0.clone();
upper.push(DataValue::Bottom);
let upper = Tuple(upper);
@ -279,13 +284,37 @@ impl TempStore {
}
}
struct SortedIter {
it: DbIter,
started: bool,
}
impl Iterator for SortedIter {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
if !self.started {
self.started = true;
} else {
self.it.next();
}
match self.it.pair() {
Err(e) => Some(Err(e.into())),
Ok(None) => None,
Ok(Some((_, v_slice))) => match EncodedTuple(v_slice).decode() {
Ok(res) => Some(Ok(res)),
Err(e) => Some(Err(e)),
},
}
}
}
struct TempStoreIter {
it: DbIter,
started: bool,
}
impl Iterator for TempStoreIter {
type Item = anyhow::Result<Tuple>;
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
if !self.started {

@ -151,7 +151,7 @@ fn creation() {
},
{
"rule": "?",
"args": [["?a"],
"args": [["?a", "?n"],
["?alice", "person.first_name", "Alice"],
// {"rule": "friend_of_friend", "args": ["?alice", "?a"]},
{"not_exists": {"rule": "friend_of_friend", "args": ["?alice", "?a"]}},
@ -160,6 +160,7 @@ fn creation() {
}
],
"out": {"friend": {"pull": "?a", "spec": ["person.first_name"]}},
"sort": [{"?n": "desc"}],
"limit": 1,
});
let ret = db.run_query(&query).unwrap();

Loading…
Cancel
Save