From 2f3b31341cd64192290a004631fc47a17e62da6e Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 5 Aug 2022 16:21:23 +0800 Subject: [PATCH] limit and offset --- README.md | 6 ++-- cozopy/cozo.py | 6 +++- src/data/program.rs | 7 ++++ src/parse/query.rs | 39 ++++++++++++++++++++-- src/query/eval.rs | 68 ++++++++++++++++++++++++++++++++++++--- src/query/pull.rs | 19 ++++++----- src/runtime/db.rs | 8 ++--- src/runtime/temp_store.rs | 21 +++++++++--- tests/creation.rs | 3 +- 9 files changed, 150 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index fcdc4042..de1b237c 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ * [x] unification * [x] aggregation * [x] duplicate symbols in rule heads -* [ ] limit, offset -* [ ] range scan -* [ ] public API +* [x] limit, offset +* [x] public API * [ ] sorting +* [ ] range scan comparators can have problems when sorting mixed integers and floats diff --git a/cozopy/cozo.py b/cozopy/cozo.py index 795da0f7..1d295d0a 100644 --- a/cozopy/cozo.py +++ b/cozopy/cozo.py @@ -14,10 +14,14 @@ class CozoDb: def tx(self, payload): return json.loads(self.inner.transact_triples(json.dumps({'tx': payload}, ensure_ascii=False))) - def run(self, q, out=None): + def run(self, q, out=None, limit=None, offset=None): payload = {'q': q} if out is not None: payload['out'] = out + if limit is not None: + payload['limit'] = limit + if offset is not None: + payload['offset'] = offset return json.loads(self.inner.run_query(json.dumps(payload, ensure_ascii=False))) diff --git a/src/data/program.rs b/src/data/program.rs index c8427780..a3a093c1 100644 --- a/src/data/program.rs +++ b/src/data/program.rs @@ -190,6 +190,13 @@ impl MagicSymbol { pub(crate) fn has_bound_adornment(&self) -> bool { self.magic_adornment().iter().any(|b| *b) } + pub(crate) fn is_prog_entry(&self) -> bool { + if let MagicSymbol::Muggle { inner } = self { + inner.is_prog_entry() + } else { + false + } + } } #[derive(Debug, Clone)] diff --git a/src/parse/query.rs b/src/parse/query.rs index 41637435..4f9c911c 100644 --- a/src/parse/query.rs +++ b/src/parse/query.rs @@ -22,11 +22,28 @@ use crate::utils::swap_result_option; pub(crate) type OutSpec = (Vec<(usize, Option)>, Option>); +pub(crate) struct QueryOutOptions { + pub(crate) out_spec: Option, + pub(crate) vld: Validity, + pub(crate) limit: Option, + pub(crate) offset: Option, +} + +impl QueryOutOptions { + pub(crate) fn num_to_take(&self) -> Option { + match (self.limit, self.offset) { + (None, _) => None, + (Some(i), None) => Some(i), + (Some(i), Some(j)) => Some(i + j), + } + } +} + impl SessionTx { pub(crate) fn parse_query( &mut self, payload: &JsonValue, - ) -> Result<(InputProgram, Option, Validity)> { + ) -> Result<(InputProgram, QueryOutOptions)> { let vld = match payload.get("since") { None => Validity::current(), Some(v) => Validity::try_from(v)?, @@ -55,7 +72,25 @@ impl SessionTx { .get("out") .map(|spec| self.parse_query_out_spec(spec, entry_bindings)); let out_spec = swap_result_option(out_spec)?; - Ok((input_prog, out_spec, vld)) + let limit = swap_result_option(payload.get("limit").map(|v| { + v.as_u64() + .map(|v| v as usize) + .ok_or_else(|| anyhow!("'limit' must be a positive number")) + }))?; + let offset = swap_result_option(payload.get("offset").map(|v| { + v.as_u64() + .map(|v| v as usize) + .ok_or_else(|| anyhow!("'offset' must be a positive number")) + }))?; + Ok(( + input_prog, + QueryOutOptions { + out_spec, + vld, + limit, + offset, + }, + )) } fn parse_query_out_spec( &mut self, diff --git a/src/query/eval.rs b/src/query/eval.rs index d3a25a45..f6b15d68 100644 --- a/src/query/eval.rs +++ b/src/query/eval.rs @@ -10,11 +10,28 @@ use crate::query::compile::{AggrKind, CompiledProgram}; use crate::runtime::temp_store::TempStore; use crate::runtime::transact::SessionTx; +pub(crate) struct QueryLimiter { + limit: Option, + counter: usize, +} + +impl QueryLimiter { + pub(crate) fn incr(&mut self) -> bool { + if let Some(limit) = self.limit { + self.counter += 1; + self.counter >= limit + } else { + false + } + } +} + impl SessionTx { pub(crate) fn stratified_magic_evaluate( &mut self, strata: &[CompiledProgram], stores: &BTreeMap, + num_to_take: Option, ) -> Result { let ret_area = stores .get(&MagicSymbol::Muggle { @@ -25,7 +42,7 @@ impl SessionTx { for (idx, cur_prog) in strata.iter().enumerate() { debug!("stratum {}", idx); - self.semi_naive_magic_evaluate(cur_prog, &stores)?; + self.semi_naive_magic_evaluate(cur_prog, &stores, num_to_take)?; } Ok(ret_area) } @@ -33,6 +50,7 @@ impl SessionTx { &mut self, prog: &CompiledProgram, stores: &BTreeMap, + num_to_take: Option, ) -> Result<()> { if log_enabled!(Level::Debug) { for (k, vs) in prog.iter() { @@ -44,6 +62,10 @@ impl SessionTx { let mut changed: BTreeMap<_, _> = prog.keys().map(|k| (k, false)).collect(); let mut prev_changed = changed.clone(); + let mut limiter = QueryLimiter { + limit: num_to_take, + counter: 0, + }; for epoch in 0u32.. { debug!("epoch {}", epoch); @@ -52,6 +74,7 @@ impl SessionTx { let aggr_kind = ruleset.aggr_kind(); let store = stores.get(k).unwrap(); let use_delta = BTreeSet::default(); + let should_check_limit = num_to_take.is_some() && k.is_prog_entry(); match aggr_kind { AggrKind::None | AggrKind::Meet => { let is_meet = aggr_kind == AggrKind::Meet; @@ -63,7 +86,17 @@ impl SessionTx { if is_meet { store.aggr_meet_put(&item, &rule.aggr, 0)?; } else { - store.put(&item, 0)?; + if should_check_limit { + if !store.exists(&item, 0)? { + store.put(&item, 0)?; + if limiter.incr() { + trace!("early stopping due to result count limit exceeded"); + return Ok(()); + } + } + } else { + store.put(&item, 0)?; + } } *changed.get_mut(k).unwrap() = true; } @@ -86,12 +119,32 @@ impl SessionTx { if rule_is_aggr { store_to_use.normal_aggr_put(&item, &rule.aggr, serial)?; } else { - store_to_use.put(&item, 0)?; + if should_check_limit { + if !store.exists(&item, 0)? { + store.put(&item, 0)?; + if limiter.incr() { + trace!("early stopping due to result count limit exceeded"); + return Ok(()); + } + } + } else { + store_to_use.put(&item, 0)?; + } } *changed.get_mut(k).unwrap() = true; } if rule_is_aggr { - store_to_use.normal_aggr_scan_and_put(&rule.aggr, store)?; + if store_to_use.normal_aggr_scan_and_put( + &rule.aggr, + store, + if should_check_limit { + Some(&mut limiter) + } else { + None + }, + )? { + return Ok(()); + } } } } @@ -105,6 +158,7 @@ impl SessionTx { for (k, ruleset) in prog.iter() { let store = stores.get(k).unwrap(); + let should_check_limit = num_to_take.is_some() && k.is_prog_entry(); for (rule_n, rule) in ruleset.rules.iter().enumerate() { let mut should_do_calculation = false; for d_rule in &rule.contained_rules { @@ -160,6 +214,12 @@ impl SessionTx { *changed.get_mut(k).unwrap() = true; store.put(&item, epoch)?; store.put(&item, 0)?; + if should_check_limit { + if limiter.incr() { + trace!("early stopping due to result count limit exceeded"); + return Ok(()); + } + } } } } diff --git a/src/query/pull.rs b/src/query/pull.rs index 61318027..5828ae94 100644 --- a/src/query/pull.rs +++ b/src/query/pull.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeSet, HashSet}; use anyhow::Result; +use either::{Left, Right}; use itertools::Itertools; use serde_json::{json, Map}; use smallvec::{smallvec, SmallVec, ToSmallVec}; @@ -14,7 +15,7 @@ use crate::data::json::JsonValue; use crate::data::symb::Symbol; use crate::data::triple::StoreOp; use crate::data::value::DataValue; -use crate::parse::query::OutSpec; +use crate::parse::query::QueryOutOptions; use crate::query::relation::flatten_err; use crate::runtime::temp_store::TempStore; use crate::runtime::transact::SessionTx; @@ -86,18 +87,20 @@ impl SessionTx { pub(crate) fn run_pull_on_query_results( &mut self, res_store: TempStore, - out_spec: Option, - vld: Validity, + out_opts: QueryOutOptions, ) -> Result> { - match out_spec { - None => Ok(Box::new(res_store.scan_all().map_ok(|tuple| { + let out_iter = match out_opts.offset { + None => Left(res_store.scan_all()), + Some(n) => Right(res_store.scan_all().skip(n)), + }; + match out_opts.out_spec { + None => Ok(Box::new(out_iter.map_ok(|tuple| { JsonValue::Array(tuple.0.into_iter().map(JsonValue::from).collect_vec()) }))), Some((pull_specs, out_keys)) => { // type OutSpec = (Vec<(usize, Option)>, Option>); Ok(Box::new( - res_store - .scan_all() + out_iter .map_ok(move |tuple| -> Result { let tuple = tuple.0; let res_iter = @@ -115,7 +118,7 @@ impl SessionTx { for (idx, spec) in specs.iter().enumerate() { self.pull( eid, - vld, + out_opts.vld, spec, 0, &specs, diff --git a/src/runtime/db.rs b/src/runtime/db.rs index 1d3a0b20..affe7821 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -280,21 +280,21 @@ impl Db { } pub fn run_query(&self, payload: &JsonValue) -> Result { let mut tx = self.transact()?; - let (input_program, out_spec, vld) = tx.parse_query(payload)?; + let (input_program, out_opts) = tx.parse_query(payload)?; let program = input_program .to_normalized_program()? .stratify()? .magic_sets_rewrite(); let (compiled, stores) = tx.stratified_magic_compile(&program)?; - let result = tx.stratified_magic_evaluate(&compiled, &stores)?; + let result = tx.stratified_magic_evaluate(&compiled, &stores, out_opts.num_to_take())?; let ret: Vec<_> = tx - .run_pull_on_query_results(result, out_spec, vld)? + .run_pull_on_query_results(result, out_opts)? .try_collect()?; Ok(json!(ret)) } pub fn explain_query(&self, payload: &JsonValue) -> Result { let mut tx = self.transact()?; - let (input_program, out_spec, vld) = tx.parse_query(payload)?; + let (input_program, out_opts) = tx.parse_query(payload)?; let normalized_program = input_program.to_normalized_program()?; let stratified_program = normalized_program.stratify()?; let magic_program = stratified_program.magic_sets_rewrite(); diff --git a/src/runtime/temp_store.rs b/src/runtime/temp_store.rs index 90a94145..33e93366 100644 --- a/src/runtime/temp_store.rs +++ b/src/runtime/temp_store.rs @@ -1,3 +1,4 @@ +use std::borrow::BorrowMut; use std::fmt::{Debug, Formatter}; use anyhow::Result; @@ -10,6 +11,7 @@ use crate::data::aggr::Aggregation; use crate::data::program::MagicSymbol; use crate::data::tuple::{EncodedTuple, Tuple}; use crate::data::value::DataValue; +use crate::query::eval::QueryLimiter; use crate::utils::swap_result_option; #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] @@ -147,7 +149,8 @@ impl TempStore { &self, aggrs: &[Option], store: &TempStore, - ) -> Result<()> { + mut limiter: Option<&mut QueryLimiter> + ) -> Result { let (lower, upper) = EncodedTuple::bounds_for_prefix_and_epoch(self.id, 0); let mut it = self .db @@ -214,12 +217,22 @@ impl TempStore { } } } - store.put(&Tuple(aggr_res), 0)?; + let res_tpl = Tuple(aggr_res); + if let Some(lmt) = limiter.borrow_mut() { + if !store.exists(&res_tpl, 0)? { + store.put(&res_tpl, 0)?; + if lmt.incr() { + return Ok(true); + } + } + } else { + store.put(&res_tpl, 0)?; + } } else { - return group.into_iter().next().unwrap().map(|_| ()); + return group.into_iter().next().unwrap().map(|_| true); } } - Ok(()) + Ok(false) } pub(crate) fn scan_all_for_epoch( diff --git a/tests/creation.rs b/tests/creation.rs index a42411de..59dc0687 100644 --- a/tests/creation.rs +++ b/tests/creation.rs @@ -159,7 +159,8 @@ fn creation() { ] } ], - "out": {"friend": {"pull": "?a", "spec": ["person.first_name"]}} + "out": {"friend": {"pull": "?a", "spec": ["person.first_name"]}}, + "limit": 1, }); let ret = db.run_query(&query).unwrap(); let res = to_string_pretty(&ret).unwrap();