limit and offset

main
Ziyang Hu 2 years ago
parent 8612cbabfa
commit 2f3b31341c

@ -8,9 +8,9 @@
* [x] unification
* [x] aggregation
* [x] duplicate symbols in rule heads
* [ ] limit, offset
* [ ] range scan
* [ ] public API
* [x] limit, offset
* [x] public API
* [ ] sorting
* [ ] range scan
comparators can have problems when sorting mixed integers and floats

@ -14,10 +14,14 @@ class CozoDb:
def tx(self, payload):
return json.loads(self.inner.transact_triples(json.dumps({'tx': payload}, ensure_ascii=False)))
def run(self, q, out=None):
def run(self, q, out=None, limit=None, offset=None):
payload = {'q': q}
if out is not None:
payload['out'] = out
if limit is not None:
payload['limit'] = limit
if offset is not None:
payload['offset'] = offset
return json.loads(self.inner.run_query(json.dumps(payload, ensure_ascii=False)))

@ -190,6 +190,13 @@ impl MagicSymbol {
pub(crate) fn has_bound_adornment(&self) -> bool {
self.magic_adornment().iter().any(|b| *b)
}
pub(crate) fn is_prog_entry(&self) -> bool {
if let MagicSymbol::Muggle { inner } = self {
inner.is_prog_entry()
} else {
false
}
}
}
#[derive(Debug, Clone)]

@ -22,11 +22,28 @@ use crate::utils::swap_result_option;
pub(crate) type OutSpec = (Vec<(usize, Option<PullSpecs>)>, Option<Vec<String>>);
pub(crate) struct QueryOutOptions {
pub(crate) out_spec: Option<OutSpec>,
pub(crate) vld: Validity,
pub(crate) limit: Option<usize>,
pub(crate) offset: Option<usize>,
}
impl QueryOutOptions {
pub(crate) fn num_to_take(&self) -> Option<usize> {
match (self.limit, self.offset) {
(None, _) => None,
(Some(i), None) => Some(i),
(Some(i), Some(j)) => Some(i + j),
}
}
}
impl SessionTx {
pub(crate) fn parse_query(
&mut self,
payload: &JsonValue,
) -> Result<(InputProgram, Option<OutSpec>, Validity)> {
) -> Result<(InputProgram, QueryOutOptions)> {
let vld = match payload.get("since") {
None => Validity::current(),
Some(v) => Validity::try_from(v)?,
@ -55,7 +72,25 @@ impl SessionTx {
.get("out")
.map(|spec| self.parse_query_out_spec(spec, entry_bindings));
let out_spec = swap_result_option(out_spec)?;
Ok((input_prog, out_spec, vld))
let limit = swap_result_option(payload.get("limit").map(|v| {
v.as_u64()
.map(|v| v as usize)
.ok_or_else(|| anyhow!("'limit' must be a positive number"))
}))?;
let offset = swap_result_option(payload.get("offset").map(|v| {
v.as_u64()
.map(|v| v as usize)
.ok_or_else(|| anyhow!("'offset' must be a positive number"))
}))?;
Ok((
input_prog,
QueryOutOptions {
out_spec,
vld,
limit,
offset,
},
))
}
fn parse_query_out_spec(
&mut self,

@ -10,11 +10,28 @@ use crate::query::compile::{AggrKind, CompiledProgram};
use crate::runtime::temp_store::TempStore;
use crate::runtime::transact::SessionTx;
pub(crate) struct QueryLimiter {
limit: Option<usize>,
counter: usize,
}
impl QueryLimiter {
pub(crate) fn incr(&mut self) -> bool {
if let Some(limit) = self.limit {
self.counter += 1;
self.counter >= limit
} else {
false
}
}
}
impl SessionTx {
pub(crate) fn stratified_magic_evaluate(
&mut self,
strata: &[CompiledProgram],
stores: &BTreeMap<MagicSymbol, TempStore>,
num_to_take: Option<usize>,
) -> Result<TempStore> {
let ret_area = stores
.get(&MagicSymbol::Muggle {
@ -25,7 +42,7 @@ impl SessionTx {
for (idx, cur_prog) in strata.iter().enumerate() {
debug!("stratum {}", idx);
self.semi_naive_magic_evaluate(cur_prog, &stores)?;
self.semi_naive_magic_evaluate(cur_prog, &stores, num_to_take)?;
}
Ok(ret_area)
}
@ -33,6 +50,7 @@ impl SessionTx {
&mut self,
prog: &CompiledProgram,
stores: &BTreeMap<MagicSymbol, TempStore>,
num_to_take: Option<usize>,
) -> Result<()> {
if log_enabled!(Level::Debug) {
for (k, vs) in prog.iter() {
@ -44,6 +62,10 @@ impl SessionTx {
let mut changed: BTreeMap<_, _> = prog.keys().map(|k| (k, false)).collect();
let mut prev_changed = changed.clone();
let mut limiter = QueryLimiter {
limit: num_to_take,
counter: 0,
};
for epoch in 0u32.. {
debug!("epoch {}", epoch);
@ -52,6 +74,7 @@ impl SessionTx {
let aggr_kind = ruleset.aggr_kind();
let store = stores.get(k).unwrap();
let use_delta = BTreeSet::default();
let should_check_limit = num_to_take.is_some() && k.is_prog_entry();
match aggr_kind {
AggrKind::None | AggrKind::Meet => {
let is_meet = aggr_kind == AggrKind::Meet;
@ -63,7 +86,17 @@ impl SessionTx {
if is_meet {
store.aggr_meet_put(&item, &rule.aggr, 0)?;
} else {
store.put(&item, 0)?;
if should_check_limit {
if !store.exists(&item, 0)? {
store.put(&item, 0)?;
if limiter.incr() {
trace!("early stopping due to result count limit exceeded");
return Ok(());
}
}
} else {
store.put(&item, 0)?;
}
}
*changed.get_mut(k).unwrap() = true;
}
@ -86,12 +119,32 @@ impl SessionTx {
if rule_is_aggr {
store_to_use.normal_aggr_put(&item, &rule.aggr, serial)?;
} else {
store_to_use.put(&item, 0)?;
if should_check_limit {
if !store.exists(&item, 0)? {
store.put(&item, 0)?;
if limiter.incr() {
trace!("early stopping due to result count limit exceeded");
return Ok(());
}
}
} else {
store_to_use.put(&item, 0)?;
}
}
*changed.get_mut(k).unwrap() = true;
}
if rule_is_aggr {
store_to_use.normal_aggr_scan_and_put(&rule.aggr, store)?;
if store_to_use.normal_aggr_scan_and_put(
&rule.aggr,
store,
if should_check_limit {
Some(&mut limiter)
} else {
None
},
)? {
return Ok(());
}
}
}
}
@ -105,6 +158,7 @@ impl SessionTx {
for (k, ruleset) in prog.iter() {
let store = stores.get(k).unwrap();
let should_check_limit = num_to_take.is_some() && k.is_prog_entry();
for (rule_n, rule) in ruleset.rules.iter().enumerate() {
let mut should_do_calculation = false;
for d_rule in &rule.contained_rules {
@ -160,6 +214,12 @@ impl SessionTx {
*changed.get_mut(k).unwrap() = true;
store.put(&item, epoch)?;
store.put(&item, 0)?;
if should_check_limit {
if limiter.incr() {
trace!("early stopping due to result count limit exceeded");
return Ok(());
}
}
}
}
}

@ -1,6 +1,7 @@
use std::collections::{BTreeSet, HashSet};
use anyhow::Result;
use either::{Left, Right};
use itertools::Itertools;
use serde_json::{json, Map};
use smallvec::{smallvec, SmallVec, ToSmallVec};
@ -14,7 +15,7 @@ use crate::data::json::JsonValue;
use crate::data::symb::Symbol;
use crate::data::triple::StoreOp;
use crate::data::value::DataValue;
use crate::parse::query::OutSpec;
use crate::parse::query::QueryOutOptions;
use crate::query::relation::flatten_err;
use crate::runtime::temp_store::TempStore;
use crate::runtime::transact::SessionTx;
@ -86,18 +87,20 @@ impl SessionTx {
pub(crate) fn run_pull_on_query_results(
&mut self,
res_store: TempStore,
out_spec: Option<OutSpec>,
vld: Validity,
out_opts: QueryOutOptions,
) -> Result<QueryResult<'_>> {
match out_spec {
None => Ok(Box::new(res_store.scan_all().map_ok(|tuple| {
let out_iter = match out_opts.offset {
None => Left(res_store.scan_all()),
Some(n) => Right(res_store.scan_all().skip(n)),
};
match out_opts.out_spec {
None => Ok(Box::new(out_iter.map_ok(|tuple| {
JsonValue::Array(tuple.0.into_iter().map(JsonValue::from).collect_vec())
}))),
Some((pull_specs, out_keys)) => {
// type OutSpec = (Vec<(usize, Option<PullSpecs>)>, Option<Vec<String>>);
Ok(Box::new(
res_store
.scan_all()
out_iter
.map_ok(move |tuple| -> Result<JsonValue> {
let tuple = tuple.0;
let res_iter =
@ -115,7 +118,7 @@ impl SessionTx {
for (idx, spec) in specs.iter().enumerate() {
self.pull(
eid,
vld,
out_opts.vld,
spec,
0,
&specs,

@ -280,21 +280,21 @@ impl Db {
}
pub fn run_query(&self, payload: &JsonValue) -> Result<JsonValue> {
let mut tx = self.transact()?;
let (input_program, out_spec, vld) = tx.parse_query(payload)?;
let (input_program, out_opts) = tx.parse_query(payload)?;
let program = input_program
.to_normalized_program()?
.stratify()?
.magic_sets_rewrite();
let (compiled, stores) = tx.stratified_magic_compile(&program)?;
let result = tx.stratified_magic_evaluate(&compiled, &stores)?;
let result = tx.stratified_magic_evaluate(&compiled, &stores, out_opts.num_to_take())?;
let ret: Vec<_> = tx
.run_pull_on_query_results(result, out_spec, vld)?
.run_pull_on_query_results(result, out_opts)?
.try_collect()?;
Ok(json!(ret))
}
pub fn explain_query(&self, payload: &JsonValue) -> Result<JsonValue> {
let mut tx = self.transact()?;
let (input_program, out_spec, vld) = tx.parse_query(payload)?;
let (input_program, out_opts) = tx.parse_query(payload)?;
let normalized_program = input_program.to_normalized_program()?;
let stratified_program = normalized_program.stratify()?;
let magic_program = stratified_program.magic_sets_rewrite();

@ -1,3 +1,4 @@
use std::borrow::BorrowMut;
use std::fmt::{Debug, Formatter};
use anyhow::Result;
@ -10,6 +11,7 @@ use crate::data::aggr::Aggregation;
use crate::data::program::MagicSymbol;
use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::value::DataValue;
use crate::query::eval::QueryLimiter;
use crate::utils::swap_result_option;
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
@ -147,7 +149,8 @@ impl TempStore {
&self,
aggrs: &[Option<Aggregation>],
store: &TempStore,
) -> Result<()> {
mut limiter: Option<&mut QueryLimiter>
) -> Result<bool> {
let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0);
let mut it = self
.db
@ -214,12 +217,22 @@ impl TempStore {
}
}
}
store.put(&Tuple(aggr_res), 0)?;
let res_tpl = Tuple(aggr_res);
if let Some(lmt) = limiter.borrow_mut() {
if !store.exists(&res_tpl, 0)? {
store.put(&res_tpl, 0)?;
if lmt.incr() {
return Ok(true);
}
}
} else {
store.put(&res_tpl, 0)?;
}
} else {
return group.into_iter().next().unwrap().map(|_| ());
return group.into_iter().next().unwrap().map(|_| true);
}
}
Ok(())
Ok(false)
}
pub(crate) fn scan_all_for_epoch(

@ -159,7 +159,8 @@ fn creation() {
]
}
],
"out": {"friend": {"pull": "?a", "spec": ["person.first_name"]}}
"out": {"friend": {"pull": "?a", "spec": ["person.first_name"]}},
"limit": 1,
});
let ret = db.run_query(&query).unwrap();
let res = to_string_pretty(&ret).unwrap();

Loading…
Cancel
Save