diff --git a/Cargo.lock b/Cargo.lock index 24b1758d..3984fbb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,6 +339,7 @@ version = "0.1.0" dependencies = [ "approx", "base64", + "byteorder", "casey", "chrono", "chrono-tz", diff --git a/Cargo.toml b/Cargo.toml index 83216f9a..ba59184c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ chrono = "0.4.19" chrono-tz = "0.6.3" priority-queue = "1.2.3" ordered-float = "3.0.0" +byteorder = "1.4.3" num-traits = "0.2.15" itertools = "0.10.3" regex = "1.6.0" diff --git a/src/data/functions.rs b/src/data/functions.rs index de2f7f0b..9591e5dd 100644 --- a/src/data/functions.rs +++ b/src/data/functions.rs @@ -43,7 +43,6 @@ fn ensure_same_value_type(a: &DataValue, b: &DataValue) -> Result<()> { | (Regex(_), Regex(_)) | (List(_), List(_)) | (Set(_), Set(_)) - | (Rev(_), Rev(_)) | (Guard, Guard) | (Bot, Bot) ) { diff --git a/src/data/json.rs b/src/data/json.rs index d655fed2..9afcc81c 100644 --- a/src/data/json.rs +++ b/src/data/json.rs @@ -84,7 +84,6 @@ impl From for JsonValue { DataValue::List(l) => { JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect()) } - DataValue::Rev(v) => JsonValue::from(*v.0), DataValue::Bot => panic!("found bottom"), DataValue::Guard => panic!("found guard"), DataValue::Set(l) => { diff --git a/src/data/memcmp.rs b/src/data/memcmp.rs new file mode 100644 index 00000000..67cd4d56 --- /dev/null +++ b/src/data/memcmp.rs @@ -0,0 +1,334 @@ +/* + * Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later. + */ + +use std::collections::BTreeSet; +use std::io::Write; +use std::str::FromStr; + +use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use regex::Regex; + +use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper}; + +const INIT_TAG: u8 = 0x00; +const NULL_TAG: u8 = 0x01; +const FALSE_TAG: u8 = 0x02; +const TRUE_TAG: u8 = 0x03; +const NUM_TAG: u8 = 0x05; +const STR_TAG: u8 = 0x06; +const BYTES_TAG: u8 = 0x07; +const UUID_TAG: u8 = 0x08; +const REGEX_TAG: u8 = 0x09; +const LIST_TAG: u8 = 0x0A; +const SET_TAG: u8 = 0x0B; +const GUARD_TAG: u8 = 0xFE; +const BOT_TAG: u8 = 0xFF; + +pub(crate) trait MemCmpEncoder: Write { + fn encode_datavalue(&mut self, v: &DataValue) { + match v { + DataValue::Null => self.write_u8(NULL_TAG).unwrap(), + DataValue::Bool(false) => self.write_u8(FALSE_TAG).unwrap(), + DataValue::Bool(true) => self.write_u8(TRUE_TAG).unwrap(), + DataValue::Num(n) => { + self.write_u8(NUM_TAG).unwrap(); + self.encode_num(*n); + } + DataValue::Str(s) => { + self.write_u8(STR_TAG).unwrap(); + self.encode_bytes(s.as_bytes()); + } + DataValue::Bytes(b) => { + self.write_u8(BYTES_TAG).unwrap(); + self.encode_bytes(b) + } + DataValue::Uuid(u) => { + self.write_u8(UUID_TAG).unwrap(); + let (s_l, s_m, s_h, s_rest) = u.0.as_fields(); + self.write_u16::(s_h).unwrap(); + self.write_u16::(s_m).unwrap(); + self.write_u32::(s_l).unwrap(); + self.encode_bytes(s_rest) + } + DataValue::Regex(rx) => { + self.write_u8(REGEX_TAG).unwrap(); + let s = rx.0.as_str().as_bytes(); + self.encode_bytes(s) + } + DataValue::List(l) => { + self.write_u8(LIST_TAG).unwrap(); + for el in l { + self.encode_datavalue(el); + } + self.write_u8(INIT_TAG).unwrap() + } + DataValue::Set(s) => { + self.write_u8(SET_TAG).unwrap(); + for el in s { + self.encode_datavalue(el); + } + self.write_u8(INIT_TAG).unwrap() + } + DataValue::Guard => self.write_u8(GUARD_TAG).unwrap(), + DataValue::Bot => self.write_u8(BOT_TAG).unwrap(), + } + } + fn encode_num(&mut self, v: Num) { + let f = v.get_float(); + let u = order_encode_f64(f); + self.write_u64::(u).unwrap(); + match v { + Num::I(i) => { + self.write_u8(0b0).unwrap(); + let i_lsb = order_encode_i64(i) as u16; + self.write_u16::(i_lsb).unwrap(); + } + Num::F(_) => { + self.write_u8(0b1000).unwrap(); + } + } + } + + fn encode_bytes(&mut self, key: &[u8]) { + let len = key.len(); + let mut index = 0; + while index <= len { + let remain = len - index; + let mut pad: usize = 0; + if remain > ENC_GROUP_SIZE { + self.write_all(&key[index..index + ENC_GROUP_SIZE]).unwrap(); + } else { + pad = ENC_GROUP_SIZE - remain; + self.write_all(&key[index..]).unwrap(); + self.write_all(&ENC_ASC_PADDING[..pad]).unwrap(); + } + self.write_all(&[ENC_MARKER - (pad as u8)]).unwrap(); + index += ENC_GROUP_SIZE; + } + } +} + +pub fn decode_bytes(data: &[u8]) -> (Vec, &[u8]) { + let mut key = Vec::with_capacity(data.len() / (ENC_GROUP_SIZE + 1) * ENC_GROUP_SIZE); + let mut offset = 0; + let chunk_len = ENC_GROUP_SIZE + 1; + loop { + let next_offset = offset + chunk_len; + debug_assert!(next_offset <= data.len()); + let chunk = &data[offset..next_offset]; + offset = next_offset; + + let (&marker, bytes) = chunk.split_last().unwrap(); + let pad_size = (ENC_MARKER - marker) as usize; + + if pad_size == 0 { + key.write_all(bytes).unwrap(); + continue; + } + debug_assert!(pad_size <= ENC_GROUP_SIZE); + + let (bytes, padding) = bytes.split_at(ENC_GROUP_SIZE - pad_size); + key.write_all(bytes).unwrap(); + + debug_assert!(!padding.iter().any(|x| *x != 0)); + + return (key, &data[offset..]); + } +} + +const SIGN_MARK: u64 = 0x8000000000000000; + +fn order_encode_i64(v: i64) -> u64 { + v as u64 ^ SIGN_MARK +} + +fn order_encode_f64(v: f64) -> u64 { + let u = v.to_bits(); + if v.is_sign_positive() { + u | SIGN_MARK + } else { + !u + } +} + +fn order_decode_f64(u: u64) -> f64 { + let u = if u & SIGN_MARK > 0 { + u & (!SIGN_MARK) + } else { + !u + }; + f64::from_bits(u) +} + +const ENC_GROUP_SIZE: usize = 8; +const ENC_MARKER: u8 = b'\xff'; +const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE]; + +impl Num { + pub(crate) fn decode_from_key(bs: &[u8]) -> (Self, &[u8]) { + let (float_part, remaining) = bs.split_at(8); + let fu = BigEndian::read_u64(float_part); + let f = order_decode_f64(fu); + let (tag, remaining) = remaining.split_first().unwrap(); + if *tag == 0b1000 { + return (Num::F(f), remaining); + } + let (subtag, remaining) = remaining.split_at(2); + let n = f as i64; + let mut n_bytes = n.to_be_bytes(); + n_bytes[6] = subtag[0]; + n_bytes[7] = subtag[1]; + let n = BigEndian::read_i64(&n_bytes); + return (Num::I(n), remaining); + } +} + +impl DataValue { + pub(crate) fn decode_from_key(bs: &[u8]) -> (Self, &[u8]) { + let (tag, remaining) = bs.split_first().unwrap(); + match *tag { + NULL_TAG => (DataValue::Null, remaining), + FALSE_TAG => (DataValue::Bool(false), remaining), + TRUE_TAG => (DataValue::Bool(true), remaining), + NUM_TAG => { + let (n, remaining) = Num::decode_from_key(remaining); + (DataValue::Num(n), remaining) + } + STR_TAG => { + let (bytes, remaining) = decode_bytes(remaining); + let s = unsafe { String::from_utf8_unchecked(bytes) }; + (DataValue::Str(s.into()), remaining) + } + BYTES_TAG => { + let (bytes, remaining) = decode_bytes(remaining); + (DataValue::Bytes(bytes), remaining) + } + UUID_TAG => { + let (uuid_data, remaining) = remaining.split_at(16); + let s_h = BigEndian::read_u16(&uuid_data[0..2]); + let s_m = BigEndian::read_u16(&uuid_data[2..4]); + let s_l = BigEndian::read_u32(&uuid_data[4..8]); + let mut s_rest = [0u8; 8]; + s_rest.copy_from_slice(&uuid_data[8..]); + let uuid = uuid::Uuid::from_fields(s_l, s_m, s_h, &s_rest); + (DataValue::Uuid(UuidWrapper(uuid)), remaining) + } + REGEX_TAG => { + let (bytes, remaining) = decode_bytes(remaining); + let s = unsafe { String::from_utf8_unchecked(bytes) }; + ( + DataValue::Regex(RegexWrapper(Regex::from_str(&s).unwrap())), + remaining, + ) + } + LIST_TAG => { + let mut collected = vec![]; + let mut remaining = remaining; + while remaining[0] != INIT_TAG { + let (val, next_chunk) = DataValue::decode_from_key(remaining); + remaining = next_chunk; + collected.push(val); + } + (DataValue::List(collected), &remaining[1..]) + } + SET_TAG => { + let mut collected = BTreeSet::default(); + let mut remaining = remaining; + while remaining[0] != INIT_TAG { + let (val, next_chunk) = DataValue::decode_from_key(remaining); + remaining = next_chunk; + collected.insert(val); + } + (DataValue::Set(collected), &remaining[1..]) + } + GUARD_TAG => (DataValue::Guard, remaining), + BOT_TAG => (DataValue::Bot, remaining), + _ => unreachable!("{:?}", bs), + } + } +} + +impl MemCmpEncoder for T {} + +#[cfg(test)] +mod tests { + use smartstring::SmartString; + use crate::data::memcmp::{decode_bytes, MemCmpEncoder}; + use crate::data::value::DataValue; + + #[test] + fn encode_decode_bytes() { + let target = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit..."; + for i in 0..target.len() { + let bs = &target[i..]; + let mut encoder: Vec = vec![]; + encoder.encode_bytes(bs); + let (decoded, remaining) = decode_bytes(&encoder); + assert!(remaining.is_empty()); + assert_eq!(bs, decoded); + + let mut encoder: Vec = vec![]; + encoder.encode_bytes(target); + encoder.encode_bytes(bs); + encoder.encode_bytes(bs); + encoder.encode_bytes(target); + + let (decoded, remaining) = decode_bytes(&encoder); + assert_eq!(&target[..], decoded); + + let (decoded, remaining) = decode_bytes(remaining); + assert_eq!(bs, decoded); + + let (decoded, remaining) = decode_bytes(remaining); + assert_eq!(bs, decoded); + + let (decoded, remaining) = decode_bytes(remaining); + assert_eq!(&target[..], decoded); + assert!(remaining.is_empty()); + } + } + + #[test] + fn specific_encode() { + let mut encoder = vec![]; + encoder.encode_datavalue(&DataValue::from(2095)); + println!("e1 {:?}", encoder); + encoder.encode_datavalue(&DataValue::Str(SmartString::from("MSS"))); + println!("e2 {:?}", encoder); + let (a, remaining) = DataValue::decode_from_key(&encoder); + println!("r {:?}", remaining); + let (b, remaining) = DataValue::decode_from_key(remaining); + assert!(remaining.is_empty()); + assert_eq!(a, DataValue::from(2095)); + assert_eq!(b, DataValue::Str(SmartString::from("MSS"))); + } + + #[test] + fn encode_decode_datavalues() { + let mut dv = vec![ + DataValue::Null, + DataValue::Bool(false), + DataValue::Bool(true), + DataValue::from(1), + DataValue::from(1.0), + DataValue::from(i64::MAX), + DataValue::from(i64::MAX - 1), + DataValue::from(i64::MAX - 2), + DataValue::from(i64::MIN), + DataValue::from(i64::MIN + 1), + DataValue::from(i64::MIN + 2), + DataValue::from(f64::INFINITY), + DataValue::from(f64::NEG_INFINITY), + DataValue::List(vec![]) + ]; + dv.push(DataValue::List(dv.clone())); + dv.push(DataValue::List(dv.clone())); + let mut encoded = vec![]; + let v = DataValue::List(dv); + encoded.encode_datavalue(&v); + let (decoded, remaining) = DataValue::decode_from_key(&encoded); + assert!(remaining.is_empty()); + assert_eq!(decoded, v); + } +} diff --git a/src/data/mod.rs b/src/data/mod.rs index f642301d..5d9ea6b7 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod program; pub(crate) mod aggr; pub(crate) mod functions; pub(crate) mod relation; +pub(crate) mod memcmp; #[cfg(test)] mod tests; diff --git a/src/data/tuple.rs b/src/data/tuple.rs index d01a6be8..fc9388ec 100644 --- a/src/data/tuple.rs +++ b/src/data/tuple.rs @@ -2,13 +2,11 @@ * Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later. */ -use std::cmp::{max, min, Ordering}; use std::fmt::{Debug, Formatter}; use miette::Result; -use rmp_serde::Serializer; -use serde::Serialize; +use crate::data::memcmp::MemCmpEncoder; use crate::data::value::DataValue; use crate::runtime::relation::RelationId; @@ -23,157 +21,164 @@ impl Debug for Tuple { } } -pub(crate) type TupleIter<'a> = Box> + 'a>; +pub(crate) type TupleIter<'a> = Box> + 'a>; impl Tuple { pub(crate) fn encode_as_key(&self, prefix: RelationId) -> Vec { let len = self.0.len(); let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len); let prefix_bytes = prefix.0.to_be_bytes(); - ret.extend([ - prefix_bytes[2], - prefix_bytes[3], - prefix_bytes[4], - prefix_bytes[5], - prefix_bytes[6], - prefix_bytes[7], - ]); - ret.extend((len as u16).to_be_bytes()); - ret.resize(max(6, 4 * (len + 1)), 0); - for (idx, val) in self.0.iter().enumerate() { - if idx > 0 { - let pos = (ret.len() as u32).to_be_bytes(); - for (i, u) in pos.iter().enumerate() { - ret[4 * (1 + idx) + i] = *u; - } - } - val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + ret.extend(prefix_bytes); + for val in self.0.iter() { + ret.encode_datavalue(val); } + // println!("encoded as key {:?}", ret); ret + // for (idx, val) in self.0.iter().enumerate() { + // if idx > 0 { + // let pos = (ret.len() as u32).to_be_bytes(); + // for (i, u) in pos.iter().enumerate() { + // ret[4 * (1 + idx) + i] = *u; + // } + // } + // val.serialize(&mut Serializer::new(&mut ret)).unwrap(); + // } + // ret } -} - -#[derive(Copy, Clone, Debug)] -pub(crate) struct EncodedTuple<'a>(pub(crate) &'a [u8]); - -impl<'a> From<&'a [u8]> for EncodedTuple<'a> { - fn from(s: &'a [u8]) -> Self { - EncodedTuple(s) - } -} - -impl<'a> EncodedTuple<'a> { - pub(crate) fn prefix(&self) -> RelationId { - debug_assert!(self.0.len() >= 6, "bad data: {:x?}", self.0); - let id = u64::from_be_bytes([ - 0, 0, self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], - ]); - RelationId(id) - } - pub(crate) fn arity(&self) -> usize { - if self.0.len() == 6 { - return 0; - } - debug_assert!(self.0.len() >= 8, "bad data: {:x?}", self.0); - u16::from_be_bytes([self.0[6], self.0[7]]) as usize - } - fn force_get(&self, idx: usize) -> DataValue { - let pos = if idx == 0 { - let arity = u16::from_be_bytes([self.0[6], self.0[7]]) as usize; - 4 * (arity + 1) - } else { - let len_pos = (idx + 1) * 4; - u32::from_be_bytes([ - self.0[len_pos], - self.0[len_pos + 1], - self.0[len_pos + 2], - self.0[len_pos + 3], - ]) as usize - }; - rmp_serde::from_slice(&self.0[pos..]).unwrap() - } - pub(crate) fn get(&self, idx: usize) -> DataValue { - let pos = if idx == 0 { - 4 * (self.arity() + 1) - } else { - let len_pos = (idx + 1) * 4; - debug_assert!(self.0.len() >= len_pos + 4, "bad data: {:x?}", self.0); - u32::from_be_bytes([ - self.0[len_pos], - self.0[len_pos + 1], - self.0[len_pos + 2], - self.0[len_pos + 3], - ]) as usize - }; - debug_assert!( - pos < self.0.len(), - "bad data length for data: {:x?}", - self.0 - ); - rmp_serde::from_slice(&self.0[pos..]).expect("data corruption when getting from tuple") - } - - pub(crate) fn iter(&self) -> EncodedTupleIter<'a> { - EncodedTupleIter { - tuple: *self, - size: 0, - pos: 0, - } - } - pub(crate) fn decode(&self) -> Tuple { - Tuple(self.iter().collect()) - } -} - -pub(crate) struct EncodedTupleIter<'a> { - tuple: EncodedTuple<'a>, - size: usize, - pos: usize, -} - -impl<'a> Iterator for EncodedTupleIter<'a> { - type Item = DataValue; - - fn next(&mut self) -> Option { - if self.size == 0 { - let arity = self.tuple.arity(); - self.size = arity; - } - if self.pos == self.size { - None - } else { - let pos = self.pos; - self.pos += 1; - Some(self.tuple.get(pos)) - } - } -} - -pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 { - match compare_tuple_keys(a, b) { - Ordering::Greater => 1, - Ordering::Equal => 0, - Ordering::Less => -1, - } -} - - -pub(crate) fn compare_tuple_keys(a: &[u8], b: &[u8]) -> Ordering { - let a = EncodedTuple(a); - let b = EncodedTuple(b); - match a.prefix().cmp(&b.prefix()) { - Ordering::Equal => {} - o => return o, - } - let a_len = a.arity(); - let b_len = b.arity(); - for idx in 0..min(a_len, b_len) { - let av = a.force_get(idx); - let bv = b.force_get(idx); - match av.cmp(&bv) { - Ordering::Equal => {} - o => return o, + pub(crate) fn decode_from_key(key: &[u8]) -> Self { + let mut remaining = &key[ENCODED_KEY_MIN_LEN..]; + let mut ret = vec![]; + while !remaining.is_empty() { + let (val, next) = DataValue::decode_from_key(remaining); + ret.push(val); + remaining = next; } + Tuple(ret) } - a_len.cmp(&b_len) } +pub(crate) const ENCODED_KEY_MIN_LEN: usize = 8; +// +// #[derive(Copy, Clone, Debug)] +// pub(crate) struct EncodedTuple<'a>(pub(crate) &'a [u8]); +// +// impl<'a> From<&'a [u8]> for EncodedTuple<'a> { +// fn from(s: &'a [u8]) -> Self { +// EncodedTuple(s) +// } +// } +// +// impl<'a> EncodedTuple<'a> { +// pub(crate) fn prefix(&self) -> RelationId { +// debug_assert!(self.0.len() >= 6, "bad data: {:x?}", self.0); +// let id = u64::from_be_bytes([ +// 0, 0, self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5], +// ]); +// RelationId(id) +// } +// pub(crate) fn arity(&self) -> usize { +// if self.0.len() == 6 { +// return 0; +// } +// debug_assert!(self.0.len() >= 8, "bad data: {:x?}", self.0); +// u16::from_be_bytes([self.0[6], self.0[7]]) as usize +// } +// fn force_get(&self, idx: usize) -> DataValue { +// let pos = if idx == 0 { +// let arity = u16::from_be_bytes([self.0[6], self.0[7]]) as usize; +// 4 * (arity + 1) +// } else { +// let len_pos = (idx + 1) * 4; +// u32::from_be_bytes([ +// self.0[len_pos], +// self.0[len_pos + 1], +// self.0[len_pos + 2], +// self.0[len_pos + 3], +// ]) as usize +// }; +// rmp_serde::from_slice(&self.0[pos..]).unwrap() +// } +// pub(crate) fn get(&self, idx: usize) -> DataValue { +// let pos = if idx == 0 { +// 4 * (self.arity() + 1) +// } else { +// let len_pos = (idx + 1) * 4; +// debug_assert!(self.0.len() >= len_pos + 4, "bad data: {:x?}", self.0); +// u32::from_be_bytes([ +// self.0[len_pos], +// self.0[len_pos + 1], +// self.0[len_pos + 2], +// self.0[len_pos + 3], +// ]) as usize +// }; +// debug_assert!( +// pos < self.0.len(), +// "bad data length for data: {:x?}", +// self.0 +// ); +// rmp_serde::from_slice(&self.0[pos..]).expect("data corruption when getting from tuple") +// } +// +// pub(crate) fn iter(&self) -> EncodedTupleIter<'a> { +// EncodedTupleIter { +// tuple: *self, +// size: 0, +// pos: 0, +// } +// } +// pub(crate) fn decode(&self) -> Tuple { +// Tuple(self.iter().collect()) +// } +// } +// +// pub(crate) struct EncodedTupleIter<'a> { +// tuple: EncodedTuple<'a>, +// size: usize, +// pos: usize, +// } +// +// impl<'a> Iterator for EncodedTupleIter<'a> { +// type Item = DataValue; +// +// fn next(&mut self) -> Option { +// if self.size == 0 { +// let arity = self.tuple.arity(); +// self.size = arity; +// } +// if self.pos == self.size { +// None +// } else { +// let pos = self.pos; +// self.pos += 1; +// Some(self.tuple.get(pos)) +// } +// } +// } +// +// pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 { +// match compare_tuple_keys(a, b) { +// Ordering::Greater => 1, +// Ordering::Equal => 0, +// Ordering::Less => -1, +// } +// } +// +// +// pub(crate) fn compare_tuple_keys(a: &[u8], b: &[u8]) -> Ordering { +// let a = EncodedTuple(a); +// let b = EncodedTuple(b); +// match a.prefix().cmp(&b.prefix()) { +// Ordering::Equal => {} +// o => return o, +// } +// let a_len = a.arity(); +// let b_len = b.arity(); +// for idx in 0..min(a_len, b_len) { +// let av = a.force_get(idx); +// let bv = b.force_get(idx); +// match av.cmp(&bv) { +// Ordering::Equal => {} +// o => return o, +// } +// } +// a_len.cmp(&b_len) +// } diff --git a/src/data/value.rs b/src/data/value.rs index d4b8a55b..e171f1dc 100644 --- a/src/data/value.rs +++ b/src/data/value.rs @@ -2,7 +2,7 @@ * Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later. */ -use std::cmp::{Ordering, Reverse}; +use std::cmp::Ordering; use std::collections::BTreeSet; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -102,8 +102,6 @@ pub(crate) enum DataValue { List(Vec), #[serde(rename = "H", alias = "Set")] Set(BTreeSet), - #[serde(rename = "R", alias = "Rev")] - Rev(Reverse>), #[serde(rename = "G", alias = "Guard")] Guard, #[serde(rename = "_", alias = "Bot")] @@ -181,11 +179,10 @@ impl Display for Num { } else { write!(f, r#"to_float("INF")"#) } - } - else { + } else { write!(f, "{}", n) } - }, + } } } } @@ -256,9 +253,6 @@ impl Display for DataValue { } DataValue::List(ls) => f.debug_list().entries(ls).finish(), DataValue::Set(s) => f.debug_list().entries(s).finish(), - DataValue::Rev(rev) => { - write!(f, "{}", rev.0) - } DataValue::Guard => { write!(f, "null") } @@ -355,7 +349,6 @@ mod tests { dbg!(s); } - #[test] fn display_datavalues() { println!("{}", DataValue::Null); diff --git a/src/query/sort.rs b/src/query/sort.rs index 2c0dca72..f7910fc4 100644 --- a/src/query/sort.rs +++ b/src/query/sort.rs @@ -2,7 +2,7 @@ * Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later. */ -use std::cmp::Reverse; +use std::cmp::{Ordering}; use std::collections::BTreeMap; use itertools::Itertools; @@ -11,7 +11,6 @@ use miette::Result; use crate::data::program::SortDir; use crate::data::symb::Symbol; use crate::data::tuple::Tuple; -use crate::data::value::DataValue; use crate::runtime::in_mem::InMemRelation; use crate::runtime::transact::SessionTx; @@ -21,29 +20,29 @@ impl SessionTx { original: InMemRelation, sorters: &[(Symbol, SortDir)], head: &[Symbol], - ) -> Result { + ) -> Result> { let head_indices: BTreeMap<_, _> = head.iter().enumerate().map(|(i, k)| (k, i)).collect(); let idx_sorters = sorters .iter() .map(|(k, dir)| (head_indices[k], *dir)) .collect_vec(); - let ret = self.new_temp_store(original.rule_name.symbol().span); - for (idx, tuple) in original.scan_all().enumerate() { - let tuple = tuple?; - let mut key = idx_sorters - .iter() - .map(|(idx, dir)| { - let mut val = tuple.0[*idx].clone(); - if *dir == SortDir::Dsc { - val = DataValue::Rev(Reverse(Box::new(val))); + + let mut all_data: Vec<_> = original.scan_all().try_collect()?; + all_data.sort_by(|a, b| { + for (idx, dir) in &idx_sorters { + match a.0[*idx].cmp(&b.0[*idx]) { + Ordering::Equal => {} + o => { + return match dir { + SortDir::Asc => o, + SortDir::Dsc => o.reverse(), + } } - val - }) - .collect_vec(); - key.push(DataValue::from(idx as i64)); - let key = Tuple(key); - ret.put_kv(key, tuple, 0); - } - Ok(ret) + } + } + Ordering::Equal + }); + + Ok(all_data) } } diff --git a/src/query/stored.rs b/src/query/stored.rs index ffbae088..01205244 100644 --- a/src/query/stored.rs +++ b/src/query/stored.rs @@ -5,7 +5,7 @@ use std::collections::BTreeMap; use itertools::Itertools; -use miette::{bail, Diagnostic, Result}; +use miette::{bail, Diagnostic, Result, WrapErr}; use smartstring::SmartString; use thiserror::Error; @@ -15,7 +15,7 @@ use crate::data::expr::Expr; use crate::data::program::{AlgoApply, InputInlineRulesOrAlgo, InputProgram, RelationOp}; use crate::data::relation::{ColumnDef, NullableColType}; use crate::data::symb::Symbol; -use crate::data::tuple::{EncodedTuple, Tuple}; +use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN}; use crate::data::value::DataValue; use crate::parse::parse_script; use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; @@ -118,10 +118,16 @@ impl SessionTx { if let Some(existing) = self.tx.get(&key, false)? { let mut tup = extracted.clone(); if !existing.is_empty() { - let v_tup = EncodedTuple(&existing); - if v_tup.arity() > 0 { - tup.0.extend(v_tup.decode().0); + let mut remaining = &existing[ENCODED_KEY_MIN_LEN..]; + while !remaining.is_empty() { + let (val, nxt) = DataValue::decode_from_key(remaining); + tup.0.push(val); + remaining = nxt; } + // let v_tup = EncodedTuple(&existing); + // if v_tup.arity() > 0 { + // tup.0.extend(v_tup.decode().0); + // } } old_tuples.push(DataValue::List(tup.0)); } @@ -299,15 +305,24 @@ impl SessionTx { let key = relation_store.adhoc_encode_key(&extracted, *span)?; let val = relation_store.adhoc_encode_val(&extracted, *span)?; + // println!("adhoc encoded key {:?}, {:?}", key, extracted); + // println!("adhoc encoded val {:?}", val); + if has_triggers { if let Some(existing) = self.tx.get(&key, false)? { let mut tup = extracted.clone(); - if !existing.is_empty() { - let v_tup = EncodedTuple(&existing); - if v_tup.arity() > 0 { - tup.0.extend(v_tup.decode().0); - } + let mut remaining = &existing[ENCODED_KEY_MIN_LEN..]; + while !remaining.is_empty() { + let (val, nxt) = DataValue::decode_from_key(remaining); + tup.0.push(val); + remaining = nxt; } + // if !existing.is_empty() { + // let v_tup = EncodedTuple(&existing); + // if v_tup.arity() > 0 { + // tup.0.extend(v_tup.decode().0); + // } + // } old_tuples.push(DataValue::List(tup.0)); } @@ -371,10 +386,12 @@ enum DataExtractor { impl DataExtractor { fn extract_data(&self, tuple: &Tuple) -> Result { Ok(match self { - DataExtractor::DefaultExtractor(expr, typ) => { - typ.coerce(expr.clone().eval_to_const()?)? - } - DataExtractor::IndexExtractor(i, typ) => typ.coerce(tuple.0[*i].clone())?, + DataExtractor::DefaultExtractor(expr, typ) => typ + .coerce(expr.clone().eval_to_const()?) + .wrap_err_with(|| format!("when processing tuple {:?}", tuple.0))?, + DataExtractor::IndexExtractor(i, typ) => typ + .coerce(tuple.0[*i].clone()) + .wrap_err_with(|| format!("when processing tuple {:?}", tuple.0))?, }) } } diff --git a/src/runtime/db.rs b/src/runtime/db.rs index 567022ac..d4eb04d9 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -3,7 +3,6 @@ */ use std::{fs, thread}; -use std::cmp::Ordering::Greater; use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; @@ -23,7 +22,7 @@ use cozorocks::{DbBuilder, RocksDb}; use crate::data::json::JsonValue; use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::symb::Symbol; -use crate::data::tuple::{compare_tuple_keys, rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN, Tuple}; +use crate::data::tuple::{SCRATCH_DB_KEY_PREFIX_LEN, Tuple}; use crate::data::value::{DataValue, LARGEST_UTF_CHAR}; use crate::parse::{CozoScript, parse_script, SourceSpan}; use crate::parse::sys::SysOp; @@ -125,7 +124,7 @@ impl Db { let db_builder = builder .create_if_missing(is_new) .use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN) - .use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false) + // .use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false) .use_bloom_filter(true, 9.9, true) .path(store_path.to_str().unwrap()); @@ -603,15 +602,16 @@ impl Db { let sorted_result = tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?; let sorted_iter = if let Some(offset) = input_program.out_opts.offset { - Left(sorted_result.scan_sorted().skip(offset)) + Left(sorted_result.into_iter().skip(offset)) } else { - Right(sorted_result.scan_sorted()) + Right(sorted_result.into_iter()) }; let sorted_iter = if let Some(limit) = input_program.out_opts.limit { Left(sorted_iter.take(limit)) } else { Right(sorted_iter) }; + let sorted_iter = sorted_iter.map(|t| Ok(t)); if let Some((meta, relation_op)) = &input_program.out_opts.store_relation { let to_clear = tx .execute_relation( @@ -726,9 +726,12 @@ impl Db { it.seek(&lower); let mut collected = vec![]; while let Some((k_slice, v_slice)) = it.pair()? { - if compare_tuple_keys(&upper, k_slice) != Greater { + if upper.as_slice() <= k_slice { break; } + // if compare_tuple_keys(&upper, k_slice) != Greater { + // break; + // } let meta = RelationHandle::decode(v_slice)?; let n_keys = meta.metadata.keys.len(); let n_dependents = meta.metadata.non_keys.len(); diff --git a/src/runtime/in_mem.rs b/src/runtime/in_mem.rs index 5b182f80..931f92a1 100644 --- a/src/runtime/in_mem.rs +++ b/src/runtime/in_mem.rs @@ -14,11 +14,9 @@ use either::{Left, Right}; use itertools::Itertools; use miette::Result; -use cozorocks::DbIter; - use crate::data::aggr::Aggregation; use crate::data::program::MagicSymbol; -use crate::data::tuple::{EncodedTuple, Tuple}; +use crate::data::tuple::{Tuple}; use crate::data::value::DataValue; use crate::query::eval::QueryLimiter; use crate::runtime::db::Poison; @@ -141,12 +139,6 @@ impl InMemRelation { let mut target = db.get(epoch as usize).unwrap().try_write().unwrap(); target.insert(tuple, Tuple::default()); } - pub(crate) fn put_kv(&self, tuple: Tuple, val: Tuple, epoch: u32) { - self.ensure_mem_db_for_epoch(epoch); - let db = self.mem_db.try_read().unwrap(); - let mut target = db.get(epoch as usize).unwrap().try_write().unwrap(); - target.insert(tuple, val); - } pub(crate) fn normal_aggr_put( &self, tuple: &Tuple, @@ -315,12 +307,6 @@ impl InMemRelation { pub(crate) fn scan_all(&self) -> impl Iterator> { self.scan_all_for_epoch(0) } - pub(crate) fn scan_sorted(&self) -> impl Iterator> { - self.ensure_mem_db_for_epoch(0); - let target = self.mem_db.try_read().unwrap(); - let target = target.get(0).unwrap().try_read().unwrap(); - target.clone().into_iter().map(|(_k, v)| Ok(v)) - } pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator> { self.scan_prefix_for_epoch(prefix, 0) } @@ -379,24 +365,24 @@ impl InMemRelation { res.into_iter() } } - -struct SortedIter { - it: DbIter, - started: bool, -} - -impl Iterator for SortedIter { - type Item = Result; - fn next(&mut self) -> Option { - if !self.started { - self.started = true; - } else { - self.it.next(); - } - match self.it.pair() { - Err(e) => Some(Err(e.into())), - Ok(None) => None, - Ok(Some((_, v_slice))) => Some(Ok(EncodedTuple(v_slice).decode())), - } - } -} +// +// struct SortedIter { +// it: DbIter, +// started: bool, +// } +// +// impl Iterator for SortedIter { +// type Item = Result; +// fn next(&mut self) -> Option { +// if !self.started { +// self.started = true; +// } else { +// self.it.next(); +// } +// match self.it.pair() { +// Err(e) => Some(Err(e.into())), +// Ok(None) => None, +// Ok(Some((_, v_slice))) => Some(Ok(EncodedTuple(v_slice).decode())), +// } +// } +// } diff --git a/src/runtime/relation.rs b/src/runtime/relation.rs index 2350c623..044d9127 100644 --- a/src/runtime/relation.rs +++ b/src/runtime/relation.rs @@ -2,8 +2,6 @@ * Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later. */ -use std::cmp::max; -use std::cmp::Ordering::Greater; use std::fmt::{Debug, Display, Formatter}; use std::sync::atomic::Ordering; @@ -16,9 +14,10 @@ use thiserror::Error; use cozorocks::DbIter; +use crate::data::memcmp::MemCmpEncoder; use crate::data::relation::StoredRelationMetadata; use crate::data::symb::Symbol; -use crate::data::tuple::{compare_tuple_keys, EncodedTuple, Tuple}; +use crate::data::tuple::{ENCODED_KEY_MIN_LEN, Tuple}; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::transact::SessionTx; @@ -120,28 +119,18 @@ impl RelationHandle { fn encode_key_prefix(&self, len: usize) -> Vec { let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len); let prefix_bytes = self.id.0.to_be_bytes(); - ret.extend([ - prefix_bytes[2], - prefix_bytes[3], - prefix_bytes[4], - prefix_bytes[5], - prefix_bytes[6], - prefix_bytes[7], - ]); - ret.extend((len as u16).to_be_bytes()); - ret.resize(max(6, 4 * (len + 1)), 0); - + ret.extend(prefix_bytes); ret } - fn encode_key_element(&self, ret: &mut Vec, idx: usize, val: &DataValue) { - if idx > 0 { - let pos = (ret.len() as u32).to_be_bytes(); - for (i, u) in pos.iter().enumerate() { - ret[4 * (1 + idx) + i] = *u; - } - } - val.serialize(&mut Serializer::new(ret)).unwrap(); - } + // fn encode_key_element(&self, ret: &mut Vec, idx: usize, val: &DataValue) { + // if idx > 0 { + // let pos = (ret.len() as u32).to_be_bytes(); + // for (i, u) in pos.iter().enumerate() { + // ret[4 * (1 + idx) + i] = *u; + // } + // } + // val.serialize(&mut Serializer::new(ret)).unwrap(); + // } pub(crate) fn adhoc_encode_key(&self, tuple: &Tuple, span: SourceSpan) -> Result> { let len = self.metadata.keys.len(); ensure!( @@ -154,18 +143,24 @@ impl RelationHandle { } ); let mut ret = self.encode_key_prefix(len); - for i in 0..len { - self.encode_key_element(&mut ret, i, &tuple.0[i]) + for val in &tuple.0[0..len] { + ret.encode_datavalue(val); } + // for i in 0..len { + // self.encode_key_element(&mut ret, i, &tuple.0[i]) + // } Ok(ret) } pub(crate) fn adhoc_encode_val(&self, tuple: &Tuple, _span: SourceSpan) -> Result> { let start = self.metadata.keys.len(); let len = self.metadata.non_keys.len(); let mut ret = self.encode_key_prefix(len); - for i in 0..len { - self.encode_key_element(&mut ret, i, &tuple.0[i + start]) - } + // for i in 0..len { + // self.encode_key_element(&mut ret, i, &tuple.0[i + start]) + // } + tuple.0[start..] + .serialize(&mut Serializer::new(&mut ret)) + .unwrap(); Ok(ret) } pub(crate) fn ensure_compatible(&self, inp: &InputRelationHandle) -> Result<()> { @@ -284,16 +279,23 @@ impl RelationIterator { Ok(match self.inner.pair()? { None => None, Some((k_slice, v_slice)) => { - if compare_tuple_keys(&self.upper_bound, k_slice) != Greater { + if self.upper_bound.as_slice() <= k_slice { + // + // } + // if compare_tuple_keys(&self.upper_bound, k_slice) != Greater { None } else { - let mut tup = EncodedTuple(k_slice).decode(); + let mut tup = Tuple::decode_from_key(k_slice); if !v_slice.is_empty() { - let v_tup = EncodedTuple(v_slice); - if v_tup.arity() > 0 { - tup.0.extend(v_tup.decode().0); - } + let vals: Vec = rmp_serde::from_slice(&v_slice[ENCODED_KEY_MIN_LEN..]).unwrap(); + tup.0.extend(vals); } + // if !v_slice.is_empty() { + // let v_tup = EncodedTuple(v_slice); + // if v_tup.arity() > 0 { + // tup.0.extend(v_tup.decode().0); + // } + // } Some(tup) } } diff --git a/tests/air_routes.rs b/tests/air_routes.rs index 3e33e65c..231bdc19 100644 --- a/tests/air_routes.rs +++ b/tests/air_routes.rs @@ -23,98 +23,103 @@ lazy_static! { let init = Instant::now(); db.run_script(r##" -{ - res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon] <~ - CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any', 'Any', 'Int?', 'Float?', 'Float?', 'Any', 'Any', 'Float?', 'Float?'], - url: 'file://./tests/air-routes-latest-nodes.csv', - has_headers: true) - - ?[code, icao, desc, region, runways, longest, elev, country, city, lat, lon] := - res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon], - label == 'airport' - - :replace airport { - code: String - => - icao: String, - desc: String, - region: String, - runways: Int, - longest: Float, - elev: Float, - country: String, - city: String, - lat: Float, - lon: Float - } -} -{ - res[idx, label, typ, code, icao, desc] <~ - CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'], - url: 'file://./tests/air-routes-latest-nodes.csv', - has_headers: true) - ?[code, desc] := - res[idx, label, typ, code, icao, desc], - label == 'country' - - :replace country { - code: String - => - desc: String - } -} -{ - res[idx, label, typ, code, icao, desc] <~ - CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'], - url: 'file://./tests/air-routes-latest-nodes.csv', - has_headers: true) - ?[idx, code, desc] := - res[idx, label, typ, code, icao, desc], - label == 'continent' - - :replace continent { - code: String - => - desc: String - } -} -{ - res[idx, label, typ, code] <~ - CsvReader(types: ['Int', 'Any', 'Any', 'Any'], - url: 'file://./tests/air-routes-latest-nodes.csv', - has_headers: true) - ?[idx, code] := - res[idx, label, typ, code], - - :replace idx2code { idx => code } -} -{ - res[] <~ - CsvReader(types: ['Int', 'Int', 'Int', 'String', 'Float?'], - url: 'file://./tests/air-routes-latest-edges.csv', - has_headers: true) - ?[fr, to, dist] := - res[idx, fr_i, to_i, typ, dist], - typ == 'route', - *idx2code[fr_i, fr], - *idx2code[to_i, to] - - :replace route { fr: String, to: String => dist: Float } -} -{ - res[] <~ - CsvReader(types: ['Int', 'Int', 'Int', 'String'], - url: 'file://./tests/air-routes-latest-edges.csv', - has_headers: true) - ?[entity, contained] := - res[idx, fr_i, to_i, typ], - typ == 'contains', - *idx2code[fr_i, entity], - *idx2code[to_i, contained] - - - :replace contain { entity: String, contained: String } -} + res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon] <~ + CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any', 'Any', 'Int?', 'Float?', 'Float?', 'Any', 'Any', 'Float?', 'Float?'], + url: 'file://./tests/air-routes-latest-nodes.csv', + has_headers: true) + + ?[code, icao, desc, region, runways, longest, elev, country, city, lat, lon] := + res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon], + label == 'airport' + + :replace airport { + code: String + => + icao: String, + desc: String, + region: String, + runways: Int, + longest: Float, + elev: Float, + country: String, + city: String, + lat: Float, + lon: Float + } + "##, &Default::default()).unwrap(); + + db.run_script(r##" + res[idx, label, typ, code, icao, desc] <~ + CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'], + url: 'file://./tests/air-routes-latest-nodes.csv', + has_headers: true) + ?[code, desc] := + res[idx, label, typ, code, icao, desc], + label == 'country' + + :replace country { + code: String + => + desc: String + } + "##, &Default::default()).unwrap(); + + db.run_script(r##" + res[idx, label, typ, code, icao, desc] <~ + CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'], + url: 'file://./tests/air-routes-latest-nodes.csv', + has_headers: true) + ?[idx, code, desc] := + res[idx, label, typ, code, icao, desc], + label == 'continent' + + :replace continent { + code: String + => + desc: String + } + "##, &Default::default()).unwrap(); + + db.run_script(r##" + res[idx, label, typ, code] <~ + CsvReader(types: ['Int', 'Any', 'Any', 'Any'], + url: 'file://./tests/air-routes-latest-nodes.csv', + has_headers: true) + ?[idx, code] := + res[idx, label, typ, code], + + :replace idx2code { idx: Int => code: String } + "##, &Default::default()).unwrap(); + + // println!("{}", db.run_script("?[idx, code] := *idx2code[idx, code]", &Default::default()).unwrap()); + + db.run_script(r##" + res[] <~ + CsvReader(types: ['Int', 'Int', 'Int', 'String', 'Float?'], + url: 'file://./tests/air-routes-latest-edges.csv', + has_headers: true) + ?[fr, to, dist] := + res[idx, fr_i, to_i, typ, dist], + typ == 'route', + *idx2code[fr_i, fr], + *idx2code[to_i, to] + + :replace route { fr: String, to: String => dist: Float } + "##, &Default::default()).unwrap(); + + db.run_script(r##" + res[] <~ + CsvReader(types: ['Int', 'Int', 'Int', 'String'], + url: 'file://./tests/air-routes-latest-edges.csv', + has_headers: true) + ?[entity, contained] := + res[idx, fr_i, to_i, typ], + typ == 'contains', + *idx2code[fr_i, entity], + *idx2code[to_i, contained] + + + :replace contain { entity: String, contained: String } "##, &Default::default()).unwrap(); db.run_script("::remove idx2code", &Default::default())