From 9a07675f786134936641894902824151747c254c Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 27 May 2022 21:35:26 +0800 Subject: [PATCH] implement sorting --- cozorocks/src/lib.rs | 25 ++++++++++++++ src/algebra/op/sort.rs | 77 +++++++++++++++++++++++++++++++++++++++--- src/data/tuple_set.rs | 25 ++++++++++++-- src/data/value.rs | 6 ++++ 4 files changed, 125 insertions(+), 8 deletions(-) diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index f93d9327..a5ba958a 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -177,6 +177,23 @@ impl> Iterator for PrefixIterator

{ } } +pub struct RowIterator { + iter: IteratorPtr, + started: bool, +} + +impl Iterator for RowIterator { + type Item = (SlicePtr, SlicePtr); + fn next(&mut self) -> Option { + if self.started { + self.iter.next() + } else { + self.started = true + } + self.iter.pair() + } +} + impl IteratorPtr { #[inline] pub fn to_first(&self) { @@ -203,6 +220,14 @@ impl IteratorPtr { IteratorBridge::do_seek_for_prev(self, key.as_ref()) } #[inline] + pub fn iter_rows>(self, prefix: T) -> RowIterator { + self.seek(prefix.as_ref()); + RowIterator { + iter: self, + started: false, + } + } + #[inline] pub fn iter_prefix>(self, prefix: T) -> PrefixIterator { self.seek(prefix.as_ref()); PrefixIterator { diff --git a/src/algebra/op/sort.rs b/src/algebra/op/sort.rs index 52faee27..388cc0e0 100644 --- a/src/algebra/op/sort.rs +++ b/src/algebra/op/sort.rs @@ -1,10 +1,17 @@ use crate::algebra::op::RelationalAlgebra; use crate::algebra::parser::RaBox; +use crate::context::TempDbContext; use crate::data::expr::StaticExpr; -use crate::data::tuple_set::{BindingMap, TupleSet, TupleSetIdx}; -use crate::ddl::reify::TableInfo; +use crate::data::tuple::{DataKind, OwnTuple, Tuple}; +use crate::data::tuple_set::{BindingMap, TupleSet, MIN_TABLE_ID_BOUND, BindingMapEvalContext}; +use crate::ddl::reify::{DdlContext, TableInfo}; +use crate::runtime::options::{default_read_options}; use anyhow::Result; -use std::collections::{BTreeMap, BTreeSet}; +use log::error; +use std::cell::RefCell; +use std::cmp::Reverse; +use std::collections::{BTreeSet}; +use crate::data::value::Value; pub(crate) const NAME_SORT: &str = "Sort"; @@ -16,12 +23,61 @@ pub(crate) enum SortDirection { pub(crate) struct SortOp<'a> { source: RaBox<'a>, + ctx: &'a TempDbContext<'a>, sort_exprs: Vec<(StaticExpr, SortDirection)>, + temp_table_id: RefCell, +} + +impl<'a> SortOp<'a> { + fn sort_data(&self) -> Result<()> { + let temp_table_id = *self.temp_table_id.borrow(); + assert!(temp_table_id > MIN_TABLE_ID_BOUND); + let source_map = self.source.binding_map()?; + let binding_ctx = BindingMapEvalContext { + map: &source_map, + parent: self.ctx, + }; + let sort_exprs = self.sort_exprs.iter().map(|(ex, dir)| -> Result<(StaticExpr, SortDirection)>{ + let ex = ex.clone().partial_eval(&binding_ctx)?.into_static(); + Ok((ex, *dir)) + }).collect::>>()?; + let mut insertion_key = OwnTuple::with_prefix(temp_table_id); + let mut insertion_val = OwnTuple::with_data_prefix(DataKind::Data); + for (i, tset) in self.source.iter()?.enumerate() { + insertion_key.truncate_all(); + insertion_val.truncate_all(); + let tset = tset?; + for (expr, dir) in &sort_exprs { + let mut val = expr.row_eval(&tset)?; + if *dir == SortDirection::Dsc { + val = Value::DescVal(Reverse(val.into())) + } + insertion_key.push_value(&val); + } + insertion_key.push_int(i as i64); + tset.encode_as_tuple(&mut insertion_val); + self.ctx.sess.temp.put(&self.ctx.sess.w_opts_temp, &insertion_key, &insertion_val)?; + } + Ok(()) + } } impl<'a> Drop for SortOp<'a> { fn drop(&mut self) { - todo!() + let id = *self.temp_table_id.borrow(); + if id > MIN_TABLE_ID_BOUND { + let start_key = OwnTuple::with_prefix(id); + let mut end_key = OwnTuple::with_prefix(id); + end_key.seal_with_sentinel(); + if let Err(e) = + self.ctx + .sess + .temp + .del_range(&self.ctx.sess.w_opts_temp, start_key, end_key) + { + error!("Undefine temp table failed: {:?}", e) + } + } } } @@ -39,7 +95,18 @@ impl<'b> RelationalAlgebra for SortOp<'b> { } fn iter<'a>(&'a self) -> Result> + 'a>> { - todo!() + if *self.temp_table_id.borrow() == 0 { + *self.temp_table_id.borrow_mut() = self.ctx.gen_temp_table_id().id; + self.sort_data()?; + } + let r_opts = default_read_options(); + let iter = self.ctx.sess.temp.iterator(&r_opts); + let key = OwnTuple::with_prefix(*self.temp_table_id.borrow()); + Ok(Box::new(iter.iter_rows(key).map(|(_k, v)| -> Result { + let v = Tuple::new(v); + let tset = TupleSet::decode_from_tuple(&v)?; + Ok(tset) + }))) } fn identity(&self) -> Option { diff --git a/src/data/tuple_set.rs b/src/data/tuple_set.rs index 3d35e22d..7c36eca6 100644 --- a/src/data/tuple_set.rs +++ b/src/data/tuple_set.rs @@ -1,6 +1,7 @@ use crate::data::eval::{PartialEvalContext, RowEvalContext}; use crate::data::expr::Expr; use crate::data::tuple::{DataKind, OwnTuple, ReifiedTuple, Tuple}; +use crate::data::tuple_set::TupleSetError::DecodeFailure; use crate::data::typing::Typing; use crate::data::value::{StaticValue, Value}; use anyhow::Result; @@ -18,6 +19,8 @@ pub enum TupleSetError { Deser(StaticValue), #[error("resolve db on raw tuple set")] RawTupleSetDbResolve, + #[error("Decode tupleset from tuple failed for {0:?}")] + DecodeFailure(OwnTuple), } pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000; @@ -138,9 +141,25 @@ impl TupleSet { target.push_bytes(v.as_ref()); } } - pub(crate) fn decode_from_tuple>(source: &Tuple) { - // let k_len = - todo!() + pub(crate) fn decode_from_tuple>(source: &Tuple) -> Result { + let gen_err = || DecodeFailure(source.to_owned()); + let k_len = source.get(0)?.get_int().ok_or_else(gen_err)? as usize; + let v_len = source.get(1)?.get_int().ok_or_else(gen_err)? as usize; + let mut ret = TupleSet { + keys: Vec::with_capacity(k_len), + vals: Vec::with_capacity(v_len), + }; + for i in 2..k_len + 2 { + let d = source.get(i)?; + let d = d.get_bytes().ok_or_else(gen_err)?; + ret.keys.push(OwnTuple::new(d.to_vec()).into()); + } + for i in k_len + 2..k_len + v_len + 2 { + let d = source.get(i)?; + let d = d.get_bytes().ok_or_else(gen_err)?; + ret.vals.push(OwnTuple::new(d.to_vec()).into()); + } + Ok(ret) } } diff --git a/src/data/value.rs b/src/data/value.rs index c15c8c2a..fd94b1a0 100644 --- a/src/data/value.rs +++ b/src/data/value.rs @@ -51,6 +51,12 @@ impl<'a> Value<'a> { _ => None, } } + pub(crate) fn get_bytes(&self) -> Option<&[u8]> { + match self { + Value::Bytes(b) => Some(b.as_ref()), + _ => None, + } + } pub(crate) fn get_slice(&self) -> Option<&[Value<'a>]> { match self { Value::List(l) => Some(l),