implement sorting

main
Ziyang Hu 2 years ago
parent ad36e2d86d
commit 9a07675f78

@ -177,6 +177,23 @@ impl<P: AsRef<[u8]>> Iterator for PrefixIterator<P> {
}
}
pub struct RowIterator {
iter: IteratorPtr,
started: bool,
}
impl Iterator for RowIterator {
type Item = (SlicePtr, SlicePtr);
fn next(&mut self) -> Option<Self::Item> {
if self.started {
self.iter.next()
} else {
self.started = true
}
self.iter.pair()
}
}
impl IteratorPtr {
#[inline]
pub fn to_first(&self) {
@ -203,6 +220,14 @@ impl IteratorPtr {
IteratorBridge::do_seek_for_prev(self, key.as_ref())
}
#[inline]
pub fn iter_rows<T: AsRef<[u8]>>(self, prefix: T) -> RowIterator {
self.seek(prefix.as_ref());
RowIterator {
iter: self,
started: false,
}
}
#[inline]
pub fn iter_prefix<T: AsRef<[u8]>>(self, prefix: T) -> PrefixIterator<T> {
self.seek(prefix.as_ref());
PrefixIterator {

@ -1,10 +1,17 @@
use crate::algebra::op::RelationalAlgebra;
use crate::algebra::parser::RaBox;
use crate::context::TempDbContext;
use crate::data::expr::StaticExpr;
use crate::data::tuple_set::{BindingMap, TupleSet, TupleSetIdx};
use crate::ddl::reify::TableInfo;
use crate::data::tuple::{DataKind, OwnTuple, Tuple};
use crate::data::tuple_set::{BindingMap, TupleSet, MIN_TABLE_ID_BOUND, BindingMapEvalContext};
use crate::ddl::reify::{DdlContext, TableInfo};
use crate::runtime::options::{default_read_options};
use anyhow::Result;
use std::collections::{BTreeMap, BTreeSet};
use log::error;
use std::cell::RefCell;
use std::cmp::Reverse;
use std::collections::{BTreeSet};
use crate::data::value::Value;
pub(crate) const NAME_SORT: &str = "Sort";
@ -16,12 +23,61 @@ pub(crate) enum SortDirection {
pub(crate) struct SortOp<'a> {
source: RaBox<'a>,
ctx: &'a TempDbContext<'a>,
sort_exprs: Vec<(StaticExpr, SortDirection)>,
temp_table_id: RefCell<u32>,
}
impl<'a> SortOp<'a> {
fn sort_data(&self) -> Result<()> {
let temp_table_id = *self.temp_table_id.borrow();
assert!(temp_table_id > MIN_TABLE_ID_BOUND);
let source_map = self.source.binding_map()?;
let binding_ctx = BindingMapEvalContext {
map: &source_map,
parent: self.ctx,
};
let sort_exprs = self.sort_exprs.iter().map(|(ex, dir)| -> Result<(StaticExpr, SortDirection)>{
let ex = ex.clone().partial_eval(&binding_ctx)?.into_static();
Ok((ex, *dir))
}).collect::<Result<Vec<_>>>()?;
let mut insertion_key = OwnTuple::with_prefix(temp_table_id);
let mut insertion_val = OwnTuple::with_data_prefix(DataKind::Data);
for (i, tset) in self.source.iter()?.enumerate() {
insertion_key.truncate_all();
insertion_val.truncate_all();
let tset = tset?;
for (expr, dir) in &sort_exprs {
let mut val = expr.row_eval(&tset)?;
if *dir == SortDirection::Dsc {
val = Value::DescVal(Reverse(val.into()))
}
insertion_key.push_value(&val);
}
insertion_key.push_int(i as i64);
tset.encode_as_tuple(&mut insertion_val);
self.ctx.sess.temp.put(&self.ctx.sess.w_opts_temp, &insertion_key, &insertion_val)?;
}
Ok(())
}
}
impl<'a> Drop for SortOp<'a> {
fn drop(&mut self) {
todo!()
let id = *self.temp_table_id.borrow();
if id > MIN_TABLE_ID_BOUND {
let start_key = OwnTuple::with_prefix(id);
let mut end_key = OwnTuple::with_prefix(id);
end_key.seal_with_sentinel();
if let Err(e) =
self.ctx
.sess
.temp
.del_range(&self.ctx.sess.w_opts_temp, start_key, end_key)
{
error!("Undefine temp table failed: {:?}", e)
}
}
}
}
@ -39,7 +95,18 @@ impl<'b> RelationalAlgebra for SortOp<'b> {
}
fn iter<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<TupleSet>> + 'a>> {
todo!()
if *self.temp_table_id.borrow() == 0 {
*self.temp_table_id.borrow_mut() = self.ctx.gen_temp_table_id().id;
self.sort_data()?;
}
let r_opts = default_read_options();
let iter = self.ctx.sess.temp.iterator(&r_opts);
let key = OwnTuple::with_prefix(*self.temp_table_id.borrow());
Ok(Box::new(iter.iter_rows(key).map(|(_k, v)| -> Result<TupleSet> {
let v = Tuple::new(v);
let tset = TupleSet::decode_from_tuple(&v)?;
Ok(tset)
})))
}
fn identity(&self) -> Option<TableInfo> {

@ -1,6 +1,7 @@
use crate::data::eval::{PartialEvalContext, RowEvalContext};
use crate::data::expr::Expr;
use crate::data::tuple::{DataKind, OwnTuple, ReifiedTuple, Tuple};
use crate::data::tuple_set::TupleSetError::DecodeFailure;
use crate::data::typing::Typing;
use crate::data::value::{StaticValue, Value};
use anyhow::Result;
@ -18,6 +19,8 @@ pub enum TupleSetError {
Deser(StaticValue),
#[error("resolve db on raw tuple set")]
RawTupleSetDbResolve,
#[error("Decode tupleset from tuple failed for {0:?}")]
DecodeFailure(OwnTuple),
}
pub(crate) const MIN_TABLE_ID_BOUND: u32 = 10000;
@ -138,9 +141,25 @@ impl TupleSet {
target.push_bytes(v.as_ref());
}
}
pub(crate) fn decode_from_tuple<T: AsRef<[u8]>>(source: &Tuple<T>) {
// let k_len =
todo!()
pub(crate) fn decode_from_tuple<T: AsRef<[u8]>>(source: &Tuple<T>) -> Result<Self> {
let gen_err = || DecodeFailure(source.to_owned());
let k_len = source.get(0)?.get_int().ok_or_else(gen_err)? as usize;
let v_len = source.get(1)?.get_int().ok_or_else(gen_err)? as usize;
let mut ret = TupleSet {
keys: Vec::with_capacity(k_len),
vals: Vec::with_capacity(v_len),
};
for i in 2..k_len + 2 {
let d = source.get(i)?;
let d = d.get_bytes().ok_or_else(gen_err)?;
ret.keys.push(OwnTuple::new(d.to_vec()).into());
}
for i in k_len + 2..k_len + v_len + 2 {
let d = source.get(i)?;
let d = d.get_bytes().ok_or_else(gen_err)?;
ret.vals.push(OwnTuple::new(d.to_vec()).into());
}
Ok(ret)
}
}

@ -51,6 +51,12 @@ impl<'a> Value<'a> {
_ => None,
}
}
pub(crate) fn get_bytes(&self) -> Option<&[u8]> {
match self {
Value::Bytes(b) => Some(b.as_ref()),
_ => None,
}
}
pub(crate) fn get_slice(&self) -> Option<&[Value<'a>]> {
match self {
Value::List(l) => Some(l),

Loading…
Cancel
Save