Merge pull request 'memcmp' (#1) from memcmp into main

zh217 committed 7ec5c7edac to main, 2 years ago

Cargo.lock (generated)

@ -339,6 +339,7 @@ version = "0.1.0"
dependencies = [
"approx",
"base64",
"byteorder",
"casey",
"chrono",
"chrono-tz",

@ -34,6 +34,7 @@ chrono = "0.4.19"
chrono-tz = "0.6.3"
priority-queue = "1.2.3"
ordered-float = "3.0.0"
byteorder = "1.4.3"
num-traits = "0.2.15"
itertools = "0.10.3"
regex = "1.6.0"

@ -43,7 +43,6 @@ fn ensure_same_value_type(a: &DataValue, b: &DataValue) -> Result<()> {
| (Regex(_), Regex(_))
| (List(_), List(_))
| (Set(_), Set(_))
| (Rev(_), Rev(_))
| (Guard, Guard)
| (Bot, Bot)
) {

@ -84,7 +84,6 @@ impl From<DataValue> for JsonValue {
DataValue::List(l) => {
JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect())
}
DataValue::Rev(v) => JsonValue::from(*v.0),
DataValue::Bot => panic!("found bottom"),
DataValue::Guard => panic!("found guard"),
DataValue::Set(l) => {

@ -0,0 +1,334 @@
/*
* Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later.
*/
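//! Order-preserving ("memcomparable") encoding of `DataValue`s: the byte strings
//! produced here compare, as raw bytes, in the same order as the values they
//! encode, so keys can be ordered with plain memcmp and no custom comparator.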
use std::collections::BTreeSet;
use std::io::Write;
use std::str::FromStr;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use regex::Regex;
use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper};
const INIT_TAG: u8 = 0x00;
const NULL_TAG: u8 = 0x01;
const FALSE_TAG: u8 = 0x02;
const TRUE_TAG: u8 = 0x03;
const NUM_TAG: u8 = 0x05;
const STR_TAG: u8 = 0x06;
const BYTES_TAG: u8 = 0x07;
const UUID_TAG: u8 = 0x08;
const REGEX_TAG: u8 = 0x09;
const LIST_TAG: u8 = 0x0A;
const SET_TAG: u8 = 0x0B;
const GUARD_TAG: u8 = 0xFE;
const BOT_TAG: u8 = 0xFF;
pub(crate) trait MemCmpEncoder: Write {
fn encode_datavalue(&mut self, v: &DataValue) {
match v {
DataValue::Null => self.write_u8(NULL_TAG).unwrap(),
DataValue::Bool(false) => self.write_u8(FALSE_TAG).unwrap(),
DataValue::Bool(true) => self.write_u8(TRUE_TAG).unwrap(),
DataValue::Num(n) => {
self.write_u8(NUM_TAG).unwrap();
self.encode_num(*n);
}
DataValue::Str(s) => {
self.write_u8(STR_TAG).unwrap();
self.encode_bytes(s.as_bytes());
}
DataValue::Bytes(b) => {
self.write_u8(BYTES_TAG).unwrap();
self.encode_bytes(b)
}
DataValue::Uuid(u) => {
self.write_u8(UUID_TAG).unwrap();
let (s_l, s_m, s_h, s_rest) = u.0.as_fields();
self.write_u16::<BigEndian>(s_h).unwrap();
self.write_u16::<BigEndian>(s_m).unwrap();
self.write_u32::<BigEndian>(s_l).unwrap();
self.encode_bytes(s_rest)
}
DataValue::Regex(rx) => {
self.write_u8(REGEX_TAG).unwrap();
let s = rx.0.as_str().as_bytes();
self.encode_bytes(s)
}
DataValue::List(l) => {
self.write_u8(LIST_TAG).unwrap();
for el in l {
self.encode_datavalue(el);
}
self.write_u8(INIT_TAG).unwrap()
}
DataValue::Set(s) => {
self.write_u8(SET_TAG).unwrap();
for el in s {
self.encode_datavalue(el);
}
self.write_u8(INIT_TAG).unwrap()
}
DataValue::Guard => self.write_u8(GUARD_TAG).unwrap(),
DataValue::Bot => self.write_u8(BOT_TAG).unwrap(),
}
}
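// Numbers are written as the order-preserving bit pattern of their `f64` value
// (sign bit flipped for positives, all bits flipped for negatives), followed by a
// tag byte; integers additionally carry their two lowest-order bytes so that they
// can be distinguished from floats when decoding.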
fn encode_num(&mut self, v: Num) {
let f = v.get_float();
let u = order_encode_f64(f);
self.write_u64::<BigEndian>(u).unwrap();
match v {
Num::I(i) => {
self.write_u8(0b0).unwrap();
let i_lsb = order_encode_i64(i) as u16;
self.write_u16::<BigEndian>(i_lsb).unwrap();
}
Num::F(_) => {
self.write_u8(0b1000).unwrap();
}
}
}
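// Byte strings are written in groups of `ENC_GROUP_SIZE` (8); the final group is
// zero-padded and each group ends with a marker byte (`ENC_MARKER` minus the pad
// length). This keeps the encoding self-delimiting while preserving byte order.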
fn encode_bytes(&mut self, key: &[u8]) {
let len = key.len();
let mut index = 0;
while index <= len {
let remain = len - index;
let mut pad: usize = 0;
if remain > ENC_GROUP_SIZE {
self.write_all(&key[index..index + ENC_GROUP_SIZE]).unwrap();
} else {
pad = ENC_GROUP_SIZE - remain;
self.write_all(&key[index..]).unwrap();
self.write_all(&ENC_ASC_PADDING[..pad]).unwrap();
}
self.write_all(&[ENC_MARKER - (pad as u8)]).unwrap();
index += ENC_GROUP_SIZE;
}
}
}
pub fn decode_bytes(data: &[u8]) -> (Vec<u8>, &[u8]) {
let mut key = Vec::with_capacity(data.len() / (ENC_GROUP_SIZE + 1) * ENC_GROUP_SIZE);
let mut offset = 0;
let chunk_len = ENC_GROUP_SIZE + 1;
loop {
let next_offset = offset + chunk_len;
debug_assert!(next_offset <= data.len());
let chunk = &data[offset..next_offset];
offset = next_offset;
let (&marker, bytes) = chunk.split_last().unwrap();
let pad_size = (ENC_MARKER - marker) as usize;
if pad_size == 0 {
key.write_all(bytes).unwrap();
continue;
}
debug_assert!(pad_size <= ENC_GROUP_SIZE);
let (bytes, padding) = bytes.split_at(ENC_GROUP_SIZE - pad_size);
key.write_all(bytes).unwrap();
debug_assert!(!padding.iter().any(|x| *x != 0));
return (key, &data[offset..]);
}
}
const SIGN_MARK: u64 = 0x8000000000000000;
fn order_encode_i64(v: i64) -> u64 {
v as u64 ^ SIGN_MARK
}
fn order_encode_f64(v: f64) -> u64 {
let u = v.to_bits();
if v.is_sign_positive() {
u | SIGN_MARK
} else {
!u
}
}
fn order_decode_f64(u: u64) -> f64 {
let u = if u & SIGN_MARK > 0 {
u & (!SIGN_MARK)
} else {
!u
};
f64::from_bits(u)
}
const ENC_GROUP_SIZE: usize = 8;
const ENC_MARKER: u8 = b'\xff';
const ENC_ASC_PADDING: [u8; ENC_GROUP_SIZE] = [0; ENC_GROUP_SIZE];
impl Num {
pub(crate) fn decode_from_key(bs: &[u8]) -> (Self, &[u8]) {
let (float_part, remaining) = bs.split_at(8);
let fu = BigEndian::read_u64(float_part);
let f = order_decode_f64(fu);
let (tag, remaining) = remaining.split_first().unwrap();
if *tag == 0b1000 {
return (Num::F(f), remaining);
}
let (subtag, remaining) = remaining.split_at(2);
let n = f as i64;
let mut n_bytes = n.to_be_bytes();
n_bytes[6] = subtag[0];
n_bytes[7] = subtag[1];
let n = BigEndian::read_i64(&n_bytes);
return (Num::I(n), remaining);
}
}
impl DataValue {
pub(crate) fn decode_from_key(bs: &[u8]) -> (Self, &[u8]) {
let (tag, remaining) = bs.split_first().unwrap();
match *tag {
NULL_TAG => (DataValue::Null, remaining),
FALSE_TAG => (DataValue::Bool(false), remaining),
TRUE_TAG => (DataValue::Bool(true), remaining),
NUM_TAG => {
let (n, remaining) = Num::decode_from_key(remaining);
(DataValue::Num(n), remaining)
}
STR_TAG => {
let (bytes, remaining) = decode_bytes(remaining);
let s = unsafe { String::from_utf8_unchecked(bytes) };
(DataValue::Str(s.into()), remaining)
}
BYTES_TAG => {
let (bytes, remaining) = decode_bytes(remaining);
(DataValue::Bytes(bytes), remaining)
}
UUID_TAG => {
let (uuid_data, remaining) = remaining.split_at(16);
let s_h = BigEndian::read_u16(&uuid_data[0..2]);
let s_m = BigEndian::read_u16(&uuid_data[2..4]);
let s_l = BigEndian::read_u32(&uuid_data[4..8]);
let mut s_rest = [0u8; 8];
s_rest.copy_from_slice(&uuid_data[8..]);
let uuid = uuid::Uuid::from_fields(s_l, s_m, s_h, &s_rest);
(DataValue::Uuid(UuidWrapper(uuid)), remaining)
}
REGEX_TAG => {
let (bytes, remaining) = decode_bytes(remaining);
let s = unsafe { String::from_utf8_unchecked(bytes) };
(
DataValue::Regex(RegexWrapper(Regex::from_str(&s).unwrap())),
remaining,
)
}
LIST_TAG => {
let mut collected = vec![];
let mut remaining = remaining;
while remaining[0] != INIT_TAG {
let (val, next_chunk) = DataValue::decode_from_key(remaining);
remaining = next_chunk;
collected.push(val);
}
(DataValue::List(collected), &remaining[1..])
}
SET_TAG => {
let mut collected = BTreeSet::default();
let mut remaining = remaining;
while remaining[0] != INIT_TAG {
let (val, next_chunk) = DataValue::decode_from_key(remaining);
remaining = next_chunk;
collected.insert(val);
}
(DataValue::Set(collected), &remaining[1..])
}
GUARD_TAG => (DataValue::Guard, remaining),
BOT_TAG => (DataValue::Bot, remaining),
_ => unreachable!("{:?}", bs),
}
}
}
impl<T: Write> MemCmpEncoder for T {}
#[cfg(test)]
mod tests {
use smartstring::SmartString;
use crate::data::memcmp::{decode_bytes, MemCmpEncoder};
use crate::data::value::DataValue;
#[test]
fn encode_decode_bytes() {
let target = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit...";
for i in 0..target.len() {
let bs = &target[i..];
let mut encoder: Vec<u8> = vec![];
encoder.encode_bytes(bs);
let (decoded, remaining) = decode_bytes(&encoder);
assert!(remaining.is_empty());
assert_eq!(bs, decoded);
let mut encoder: Vec<u8> = vec![];
encoder.encode_bytes(target);
encoder.encode_bytes(bs);
encoder.encode_bytes(bs);
encoder.encode_bytes(target);
let (decoded, remaining) = decode_bytes(&encoder);
assert_eq!(&target[..], decoded);
let (decoded, remaining) = decode_bytes(remaining);
assert_eq!(bs, decoded);
let (decoded, remaining) = decode_bytes(remaining);
assert_eq!(bs, decoded);
let (decoded, remaining) = decode_bytes(remaining);
assert_eq!(&target[..], decoded);
assert!(remaining.is_empty());
}
}
#[test]
fn specific_encode() {
let mut encoder = vec![];
encoder.encode_datavalue(&DataValue::from(2095));
println!("e1 {:?}", encoder);
encoder.encode_datavalue(&DataValue::Str(SmartString::from("MSS")));
println!("e2 {:?}", encoder);
let (a, remaining) = DataValue::decode_from_key(&encoder);
println!("r {:?}", remaining);
let (b, remaining) = DataValue::decode_from_key(remaining);
assert!(remaining.is_empty());
assert_eq!(a, DataValue::from(2095));
assert_eq!(b, DataValue::Str(SmartString::from("MSS")));
}
#[test]
fn encode_decode_datavalues() {
let mut dv = vec![
DataValue::Null,
DataValue::Bool(false),
DataValue::Bool(true),
DataValue::from(1),
DataValue::from(1.0),
DataValue::from(i64::MAX),
DataValue::from(i64::MAX - 1),
DataValue::from(i64::MAX - 2),
DataValue::from(i64::MIN),
DataValue::from(i64::MIN + 1),
DataValue::from(i64::MIN + 2),
DataValue::from(f64::INFINITY),
DataValue::from(f64::NEG_INFINITY),
DataValue::List(vec![])
];
dv.push(DataValue::List(dv.clone()));
dv.push(DataValue::List(dv.clone()));
let mut encoded = vec![];
let v = DataValue::List(dv);
encoded.encode_datavalue(&v);
let (decoded, remaining) = DataValue::decode_from_key(&encoded);
assert!(remaining.is_empty());
assert_eq!(decoded, v);
}
}
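
The tests above show the intended round trip. The same usage as a compact sketch (crate-internal, since `MemCmpEncoder` and `decode_from_key` are `pub(crate)`; the concrete values are arbitrary):

use smartstring::SmartString;
use crate::data::memcmp::MemCmpEncoder;
use crate::data::value::DataValue;

fn roundtrip_sketch() {
    // Any std::io::Write sink gets the encoder via the blanket impl above.
    let mut buf: Vec<u8> = vec![];
    buf.encode_datavalue(&DataValue::from(42));
    buf.encode_datavalue(&DataValue::Str(SmartString::from("hello")));

    // Decoding peels values off the front of the slice, in order.
    let (first, rest) = DataValue::decode_from_key(&buf);
    let (second, rest) = DataValue::decode_from_key(rest);
    assert_eq!(first, DataValue::from(42));
    assert_eq!(second, DataValue::Str(SmartString::from("hello")));
    assert!(rest.is_empty());
}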

@ -11,6 +11,7 @@ pub(crate) mod program;
pub(crate) mod aggr;
pub(crate) mod functions;
pub(crate) mod relation;
pub(crate) mod memcmp;
#[cfg(test)]
mod tests;

@ -2,13 +2,11 @@
* Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later.
*/
use std::cmp::{max, min, Ordering};
use std::fmt::{Debug, Formatter};
use miette::Result;
use rmp_serde::Serializer;
use serde::Serialize;
use crate::data::memcmp::MemCmpEncoder;
use crate::data::value::DataValue;
use crate::runtime::relation::RelationId;
@ -23,157 +21,164 @@ impl Debug for Tuple {
}
}
pub(crate) type TupleIter<'a> = Box<dyn Iterator<Item=Result<Tuple>> + 'a>;
pub(crate) type TupleIter<'a> = Box<dyn Iterator<Item = Result<Tuple>> + 'a>;
impl Tuple {
pub(crate) fn encode_as_key(&self, prefix: RelationId) -> Vec<u8> {
let len = self.0.len();
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
let prefix_bytes = prefix.0.to_be_bytes();
ret.extend([
prefix_bytes[2],
prefix_bytes[3],
prefix_bytes[4],
prefix_bytes[5],
prefix_bytes[6],
prefix_bytes[7],
]);
ret.extend((len as u16).to_be_bytes());
ret.resize(max(6, 4 * (len + 1)), 0);
for (idx, val) in self.0.iter().enumerate() {
if idx > 0 {
let pos = (ret.len() as u32).to_be_bytes();
for (i, u) in pos.iter().enumerate() {
ret[4 * (1 + idx) + i] = *u;
}
}
val.serialize(&mut Serializer::new(&mut ret)).unwrap();
ret.extend(prefix_bytes);
for val in self.0.iter() {
ret.encode_datavalue(val);
}
// println!("encoded as key {:?}", ret);
ret
// for (idx, val) in self.0.iter().enumerate() {
// if idx > 0 {
// let pos = (ret.len() as u32).to_be_bytes();
// for (i, u) in pos.iter().enumerate() {
// ret[4 * (1 + idx) + i] = *u;
// }
// }
// val.serialize(&mut Serializer::new(&mut ret)).unwrap();
// }
// ret
}
pub(crate) fn decode_from_key(key: &[u8]) -> Self {
let mut remaining = &key[ENCODED_KEY_MIN_LEN..];
let mut ret = vec![];
while !remaining.is_empty() {
let (val, next) = DataValue::decode_from_key(remaining);
ret.push(val);
remaining = next;
}
Tuple(ret)
}
}
#[derive(Copy, Clone, Debug)]
pub(crate) struct EncodedTuple<'a>(pub(crate) &'a [u8]);
impl<'a> From<&'a [u8]> for EncodedTuple<'a> {
fn from(s: &'a [u8]) -> Self {
EncodedTuple(s)
}
}
impl<'a> EncodedTuple<'a> {
pub(crate) fn prefix(&self) -> RelationId {
debug_assert!(self.0.len() >= 6, "bad data: {:x?}", self.0);
let id = u64::from_be_bytes([
0, 0, self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5],
]);
RelationId(id)
}
pub(crate) fn arity(&self) -> usize {
if self.0.len() == 6 {
return 0;
}
debug_assert!(self.0.len() >= 8, "bad data: {:x?}", self.0);
u16::from_be_bytes([self.0[6], self.0[7]]) as usize
}
fn force_get(&self, idx: usize) -> DataValue {
let pos = if idx == 0 {
let arity = u16::from_be_bytes([self.0[6], self.0[7]]) as usize;
4 * (arity + 1)
} else {
let len_pos = (idx + 1) * 4;
u32::from_be_bytes([
self.0[len_pos],
self.0[len_pos + 1],
self.0[len_pos + 2],
self.0[len_pos + 3],
]) as usize
};
rmp_serde::from_slice(&self.0[pos..]).unwrap()
}
pub(crate) fn get(&self, idx: usize) -> DataValue {
let pos = if idx == 0 {
4 * (self.arity() + 1)
} else {
let len_pos = (idx + 1) * 4;
debug_assert!(self.0.len() >= len_pos + 4, "bad data: {:x?}", self.0);
u32::from_be_bytes([
self.0[len_pos],
self.0[len_pos + 1],
self.0[len_pos + 2],
self.0[len_pos + 3],
]) as usize
};
debug_assert!(
pos < self.0.len(),
"bad data length for data: {:x?}",
self.0
);
rmp_serde::from_slice(&self.0[pos..]).expect("data corruption when getting from tuple")
}
pub(crate) fn iter(&self) -> EncodedTupleIter<'a> {
EncodedTupleIter {
tuple: *self,
size: 0,
pos: 0,
}
}
pub(crate) fn decode(&self) -> Tuple {
Tuple(self.iter().collect())
}
}
pub(crate) struct EncodedTupleIter<'a> {
tuple: EncodedTuple<'a>,
size: usize,
pos: usize,
}
impl<'a> Iterator for EncodedTupleIter<'a> {
type Item = DataValue;
fn next(&mut self) -> Option<Self::Item> {
if self.size == 0 {
let arity = self.tuple.arity();
self.size = arity;
}
if self.pos == self.size {
None
} else {
let pos = self.pos;
self.pos += 1;
Some(self.tuple.get(pos))
}
}
}
pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 {
match compare_tuple_keys(a, b) {
Ordering::Greater => 1,
Ordering::Equal => 0,
Ordering::Less => -1,
}
}
pub(crate) fn compare_tuple_keys(a: &[u8], b: &[u8]) -> Ordering {
let a = EncodedTuple(a);
let b = EncodedTuple(b);
match a.prefix().cmp(&b.prefix()) {
Ordering::Equal => {}
o => return o,
}
let a_len = a.arity();
let b_len = b.arity();
for idx in 0..min(a_len, b_len) {
let av = a.force_get(idx);
let bv = b.force_get(idx);
match av.cmp(&bv) {
Ordering::Equal => {}
o => return o,
}
}
a_len.cmp(&b_len)
}
pub(crate) const ENCODED_KEY_MIN_LEN: usize = 8;
//
// #[derive(Copy, Clone, Debug)]
// pub(crate) struct EncodedTuple<'a>(pub(crate) &'a [u8]);
//
// impl<'a> From<&'a [u8]> for EncodedTuple<'a> {
// fn from(s: &'a [u8]) -> Self {
// EncodedTuple(s)
// }
// }
//
// impl<'a> EncodedTuple<'a> {
// pub(crate) fn prefix(&self) -> RelationId {
// debug_assert!(self.0.len() >= 6, "bad data: {:x?}", self.0);
// let id = u64::from_be_bytes([
// 0, 0, self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5],
// ]);
// RelationId(id)
// }
// pub(crate) fn arity(&self) -> usize {
// if self.0.len() == 6 {
// return 0;
// }
// debug_assert!(self.0.len() >= 8, "bad data: {:x?}", self.0);
// u16::from_be_bytes([self.0[6], self.0[7]]) as usize
// }
// fn force_get(&self, idx: usize) -> DataValue {
// let pos = if idx == 0 {
// let arity = u16::from_be_bytes([self.0[6], self.0[7]]) as usize;
// 4 * (arity + 1)
// } else {
// let len_pos = (idx + 1) * 4;
// u32::from_be_bytes([
// self.0[len_pos],
// self.0[len_pos + 1],
// self.0[len_pos + 2],
// self.0[len_pos + 3],
// ]) as usize
// };
// rmp_serde::from_slice(&self.0[pos..]).unwrap()
// }
// pub(crate) fn get(&self, idx: usize) -> DataValue {
// let pos = if idx == 0 {
// 4 * (self.arity() + 1)
// } else {
// let len_pos = (idx + 1) * 4;
// debug_assert!(self.0.len() >= len_pos + 4, "bad data: {:x?}", self.0);
// u32::from_be_bytes([
// self.0[len_pos],
// self.0[len_pos + 1],
// self.0[len_pos + 2],
// self.0[len_pos + 3],
// ]) as usize
// };
// debug_assert!(
// pos < self.0.len(),
// "bad data length for data: {:x?}",
// self.0
// );
// rmp_serde::from_slice(&self.0[pos..]).expect("data corruption when getting from tuple")
// }
//
// pub(crate) fn iter(&self) -> EncodedTupleIter<'a> {
// EncodedTupleIter {
// tuple: *self,
// size: 0,
// pos: 0,
// }
// }
// pub(crate) fn decode(&self) -> Tuple {
// Tuple(self.iter().collect())
// }
// }
//
// pub(crate) struct EncodedTupleIter<'a> {
// tuple: EncodedTuple<'a>,
// size: usize,
// pos: usize,
// }
//
// impl<'a> Iterator for EncodedTupleIter<'a> {
// type Item = DataValue;
//
// fn next(&mut self) -> Option<Self::Item> {
// if self.size == 0 {
// let arity = self.tuple.arity();
// self.size = arity;
// }
// if self.pos == self.size {
// None
// } else {
// let pos = self.pos;
// self.pos += 1;
// Some(self.tuple.get(pos))
// }
// }
// }
//
// pub(crate) fn rusty_scratch_cmp(a: &[u8], b: &[u8]) -> i8 {
// match compare_tuple_keys(a, b) {
// Ordering::Greater => 1,
// Ordering::Equal => 0,
// Ordering::Less => -1,
// }
// }
//
//
// pub(crate) fn compare_tuple_keys(a: &[u8], b: &[u8]) -> Ordering {
// let a = EncodedTuple(a);
// let b = EncodedTuple(b);
// match a.prefix().cmp(&b.prefix()) {
// Ordering::Equal => {}
// o => return o,
// }
// let a_len = a.arity();
// let b_len = b.arity();
// for idx in 0..min(a_len, b_len) {
// let av = a.force_get(idx);
// let bv = b.force_get(idx);
// match av.cmp(&bv) {
// Ordering::Equal => {}
// o => return o,
// }
// }
// a_len.cmp(&b_len)
// }
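
With this change a stored key is simply the 8-byte big-endian relation id followed by the memcomparable-encoded columns, so keys of the same relation compare correctly as raw bytes. A crate-internal sketch of the round trip (the relation id `7` and the values are arbitrary):

use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::runtime::relation::RelationId;

fn key_roundtrip_sketch() {
    let tuple = Tuple(vec![DataValue::from(1), DataValue::from(2.5)]);
    let key = tuple.encode_as_key(RelationId(7));
    // The first ENCODED_KEY_MIN_LEN (8) bytes are the relation id, big-endian.
    assert_eq!(&key[..8], &7u64.to_be_bytes()[..]);
    // The remainder decodes back to the original tuple.
    let decoded = Tuple::decode_from_key(&key);
    assert_eq!(decoded.0, tuple.0);
}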

@ -2,7 +2,7 @@
* Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later.
*/
use std::cmp::{Ordering, Reverse};
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
@ -102,8 +102,6 @@ pub(crate) enum DataValue {
List(Vec<DataValue>),
#[serde(rename = "H", alias = "Set")]
Set(BTreeSet<DataValue>),
#[serde(rename = "R", alias = "Rev")]
Rev(Reverse<Box<DataValue>>),
#[serde(rename = "G", alias = "Guard")]
Guard,
#[serde(rename = "_", alias = "Bot")]
@ -181,11 +179,10 @@ impl Display for Num {
} else {
write!(f, r#"to_float("INF")"#)
}
}
else {
} else {
write!(f, "{}", n)
}
},
}
}
}
}
@ -256,9 +253,6 @@ impl Display for DataValue {
}
DataValue::List(ls) => f.debug_list().entries(ls).finish(),
DataValue::Set(s) => f.debug_list().entries(s).finish(),
DataValue::Rev(rev) => {
write!(f, "{}", rev.0)
}
DataValue::Guard => {
write!(f, "null")
}
@ -355,7 +349,6 @@ mod tests {
dbg!(s);
}
#[test]
fn display_datavalues() {
println!("{}", DataValue::Null);

@ -2,7 +2,7 @@
* Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later.
*/
use std::cmp::Reverse;
use std::cmp::{Ordering};
use std::collections::BTreeMap;
use itertools::Itertools;
@ -11,7 +11,6 @@ use miette::Result;
use crate::data::program::SortDir;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::transact::SessionTx;
@ -21,29 +20,29 @@ impl SessionTx {
original: InMemRelation,
sorters: &[(Symbol, SortDir)],
head: &[Symbol],
) -> Result<InMemRelation> {
) -> Result<Vec<Tuple>> {
let head_indices: BTreeMap<_, _> = head.iter().enumerate().map(|(i, k)| (k, i)).collect();
let idx_sorters = sorters
.iter()
.map(|(k, dir)| (head_indices[k], *dir))
.collect_vec();
let ret = self.new_temp_store(original.rule_name.symbol().span);
for (idx, tuple) in original.scan_all().enumerate() {
let tuple = tuple?;
let mut key = idx_sorters
.iter()
.map(|(idx, dir)| {
let mut val = tuple.0[*idx].clone();
if *dir == SortDir::Dsc {
val = DataValue::Rev(Reverse(Box::new(val)));
let mut all_data: Vec<_> = original.scan_all().try_collect()?;
all_data.sort_by(|a, b| {
for (idx, dir) in &idx_sorters {
match a.0[*idx].cmp(&b.0[*idx]) {
Ordering::Equal => {}
o => {
return match dir {
SortDir::Asc => o,
SortDir::Dsc => o.reverse(),
}
val
})
.collect_vec();
key.push(DataValue::from(idx as i64));
let key = Tuple(key);
ret.put_kv(key, tuple, 0);
}
Ok(ret)
}
}
Ordering::Equal
});
Ok(all_data)
}
}
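
Sorting now happens entirely in memory: the tuples are collected and sorted with a comparator that walks the sort columns in priority order and reverses the ordering for descending ones. The same comparator pattern in isolation, using hypothetical stand-in types rather than the crate's `Tuple`/`SortDir`:

use std::cmp::Ordering;

// Hypothetical stand-ins for the crate's SortDir and row type, for illustration only.
#[derive(Clone, Copy)]
enum SortDir { Asc, Dsc }

// Walk the sort keys in priority order; flip the comparison for descending columns.
fn sort_rows(rows: &mut [Vec<i64>], idx_sorters: &[(usize, SortDir)]) {
    rows.sort_by(|a, b| {
        for (idx, dir) in idx_sorters {
            match a[*idx].cmp(&b[*idx]) {
                Ordering::Equal => {}
                o => {
                    return match dir {
                        SortDir::Asc => o,
                        SortDir::Dsc => o.reverse(),
                    }
                }
            }
        }
        Ordering::Equal
    });
}

fn main() {
    let mut rows = vec![vec![1, 9], vec![2, 3], vec![1, 4]];
    // Sort by column 0 ascending, then column 1 descending.
    sort_rows(&mut rows, &[(0, SortDir::Asc), (1, SortDir::Dsc)]);
    assert_eq!(rows, vec![vec![1, 9], vec![1, 4], vec![2, 3]]);
}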

@ -5,7 +5,7 @@
use std::collections::BTreeMap;
use itertools::Itertools;
use miette::{bail, Diagnostic, Result};
use miette::{bail, Diagnostic, Result, WrapErr};
use smartstring::SmartString;
use thiserror::Error;
@ -15,7 +15,7 @@ use crate::data::expr::Expr;
use crate::data::program::{AlgoApply, InputInlineRulesOrAlgo, InputProgram, RelationOp};
use crate::data::relation::{ColumnDef, NullableColType};
use crate::data::symb::Symbol;
use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue;
use crate::parse::parse_script;
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
@ -118,10 +118,16 @@ impl SessionTx {
if let Some(existing) = self.tx.get(&key, false)? {
let mut tup = extracted.clone();
if !existing.is_empty() {
let v_tup = EncodedTuple(&existing);
if v_tup.arity() > 0 {
tup.0.extend(v_tup.decode().0);
let mut remaining = &existing[ENCODED_KEY_MIN_LEN..];
while !remaining.is_empty() {
let (val, nxt) = DataValue::decode_from_key(remaining);
tup.0.push(val);
remaining = nxt;
}
// let v_tup = EncodedTuple(&existing);
// if v_tup.arity() > 0 {
// tup.0.extend(v_tup.decode().0);
// }
}
old_tuples.push(DataValue::List(tup.0));
}
@ -299,15 +305,24 @@ impl SessionTx {
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let val = relation_store.adhoc_encode_val(&extracted, *span)?;
// println!("adhoc encoded key {:?}, {:?}", key, extracted);
// println!("adhoc encoded val {:?}", val);
if has_triggers {
if let Some(existing) = self.tx.get(&key, false)? {
let mut tup = extracted.clone();
if !existing.is_empty() {
let v_tup = EncodedTuple(&existing);
if v_tup.arity() > 0 {
tup.0.extend(v_tup.decode().0);
}
}
let mut remaining = &existing[ENCODED_KEY_MIN_LEN..];
while !remaining.is_empty() {
let (val, nxt) = DataValue::decode_from_key(remaining);
tup.0.push(val);
remaining = nxt;
}
// if !existing.is_empty() {
// let v_tup = EncodedTuple(&existing);
// if v_tup.arity() > 0 {
// tup.0.extend(v_tup.decode().0);
// }
// }
old_tuples.push(DataValue::List(tup.0));
}
@ -371,10 +386,12 @@ enum DataExtractor {
impl DataExtractor {
fn extract_data(&self, tuple: &Tuple) -> Result<DataValue> {
Ok(match self {
DataExtractor::DefaultExtractor(expr, typ) => {
typ.coerce(expr.clone().eval_to_const()?)?
}
DataExtractor::IndexExtractor(i, typ) => typ.coerce(tuple.0[*i].clone())?,
DataExtractor::DefaultExtractor(expr, typ) => typ
.coerce(expr.clone().eval_to_const()?)
.wrap_err_with(|| format!("when processing tuple {:?}", tuple.0))?,
DataExtractor::IndexExtractor(i, typ) => typ
.coerce(tuple.0[*i].clone())
.wrap_err_with(|| format!("when processing tuple {:?}", tuple.0))?,
})
}
}

@ -3,7 +3,6 @@
*/
use std::{fs, thread};
use std::cmp::Ordering::Greater;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
@ -23,7 +22,7 @@ use cozorocks::{DbBuilder, RocksDb};
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::symb::Symbol;
use crate::data::tuple::{compare_tuple_keys, rusty_scratch_cmp, SCRATCH_DB_KEY_PREFIX_LEN, Tuple};
use crate::data::tuple::{SCRATCH_DB_KEY_PREFIX_LEN, Tuple};
use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::parse::{CozoScript, parse_script, SourceSpan};
use crate::parse::sys::SysOp;
@ -125,7 +124,7 @@ impl Db {
let db_builder = builder
.create_if_missing(is_new)
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false)
// .use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false)
.use_bloom_filter(true, 9.9, true)
.path(store_path.to_str().unwrap());
@ -603,15 +602,16 @@ impl Db {
let sorted_result =
tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?;
let sorted_iter = if let Some(offset) = input_program.out_opts.offset {
Left(sorted_result.scan_sorted().skip(offset))
Left(sorted_result.into_iter().skip(offset))
} else {
Right(sorted_result.scan_sorted())
Right(sorted_result.into_iter())
};
let sorted_iter = if let Some(limit) = input_program.out_opts.limit {
Left(sorted_iter.take(limit))
} else {
Right(sorted_iter)
};
let sorted_iter = sorted_iter.map(|t| Ok(t));
if let Some((meta, relation_op)) = &input_program.out_opts.store_relation {
let to_clear = tx
.execute_relation(
@ -726,9 +726,12 @@ impl Db {
it.seek(&lower);
let mut collected = vec![];
while let Some((k_slice, v_slice)) = it.pair()? {
if compare_tuple_keys(&upper, k_slice) != Greater {
if upper.as_slice() <= k_slice {
break;
}
// if compare_tuple_keys(&upper, k_slice) != Greater {
// break;
// }
let meta = RelationHandle::decode(v_slice)?;
let n_keys = meta.metadata.keys.len();
let n_dependents = meta.metadata.non_keys.len();
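
The custom comparator `rusty_scratch_cmp` can be dropped because memcomparable keys make byte order and value order coincide, so the bound check degenerates to a plain slice comparison. A crate-internal sketch of the property being relied on (arbitrary values):

use crate::data::memcmp::MemCmpEncoder;
use crate::data::value::DataValue;

fn byte_order_matches_value_order() {
    let mut a: Vec<u8> = vec![];
    let mut b: Vec<u8> = vec![];
    a.encode_datavalue(&DataValue::from(3));
    b.encode_datavalue(&DataValue::from(10));
    // 3 < 10 as values, and the raw encodings compare the same way,
    // so lexicographic byte comparison is enough for range bounds.
    assert!(a < b);
}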

@ -14,11 +14,9 @@ use either::{Left, Right};
use itertools::Itertools;
use miette::Result;
use cozorocks::DbIter;
use crate::data::aggr::Aggregation;
use crate::data::program::MagicSymbol;
use crate::data::tuple::{EncodedTuple, Tuple};
use crate::data::tuple::{Tuple};
use crate::data::value::DataValue;
use crate::query::eval::QueryLimiter;
use crate::runtime::db::Poison;
@ -141,12 +139,6 @@ impl InMemRelation {
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, Tuple::default());
}
pub(crate) fn put_kv(&self, tuple: Tuple, val: Tuple, epoch: u32) {
self.ensure_mem_db_for_epoch(epoch);
let db = self.mem_db.try_read().unwrap();
let mut target = db.get(epoch as usize).unwrap().try_write().unwrap();
target.insert(tuple, val);
}
pub(crate) fn normal_aggr_put(
&self,
tuple: &Tuple,
@ -315,12 +307,6 @@ impl InMemRelation {
pub(crate) fn scan_all(&self) -> impl Iterator<Item=Result<Tuple>> {
self.scan_all_for_epoch(0)
}
pub(crate) fn scan_sorted(&self) -> impl Iterator<Item=Result<Tuple>> {
self.ensure_mem_db_for_epoch(0);
let target = self.mem_db.try_read().unwrap();
let target = target.get(0).unwrap().try_read().unwrap();
target.clone().into_iter().map(|(_k, v)| Ok(v))
}
pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator<Item=Result<Tuple>> {
self.scan_prefix_for_epoch(prefix, 0)
}
@ -379,24 +365,24 @@ impl InMemRelation {
res.into_iter()
}
}
struct SortedIter {
it: DbIter,
started: bool,
}
impl Iterator for SortedIter {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
if !self.started {
self.started = true;
} else {
self.it.next();
}
match self.it.pair() {
Err(e) => Some(Err(e.into())),
Ok(None) => None,
Ok(Some((_, v_slice))) => Some(Ok(EncodedTuple(v_slice).decode())),
}
}
}
//
// struct SortedIter {
// it: DbIter,
// started: bool,
// }
//
// impl Iterator for SortedIter {
// type Item = Result<Tuple>;
// fn next(&mut self) -> Option<Self::Item> {
// if !self.started {
// self.started = true;
// } else {
// self.it.next();
// }
// match self.it.pair() {
// Err(e) => Some(Err(e.into())),
// Ok(None) => None,
// Ok(Some((_, v_slice))) => Some(Ok(EncodedTuple(v_slice).decode())),
// }
// }
// }

@ -2,8 +2,6 @@
* Copyright 2022, The Cozo Project Authors. Licensed under AGPL-3 or later.
*/
use std::cmp::max;
use std::cmp::Ordering::Greater;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::Ordering;
@ -16,9 +14,10 @@ use thiserror::Error;
use cozorocks::DbIter;
use crate::data::memcmp::MemCmpEncoder;
use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::Symbol;
use crate::data::tuple::{compare_tuple_keys, EncodedTuple, Tuple};
use crate::data::tuple::{ENCODED_KEY_MIN_LEN, Tuple};
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx;
@ -120,28 +119,18 @@ impl RelationHandle {
fn encode_key_prefix(&self, len: usize) -> Vec<u8> {
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
let prefix_bytes = self.id.0.to_be_bytes();
ret.extend([
prefix_bytes[2],
prefix_bytes[3],
prefix_bytes[4],
prefix_bytes[5],
prefix_bytes[6],
prefix_bytes[7],
]);
ret.extend((len as u16).to_be_bytes());
ret.resize(max(6, 4 * (len + 1)), 0);
ret.extend(prefix_bytes);
ret
}
fn encode_key_element(&self, ret: &mut Vec<u8>, idx: usize, val: &DataValue) {
if idx > 0 {
let pos = (ret.len() as u32).to_be_bytes();
for (i, u) in pos.iter().enumerate() {
ret[4 * (1 + idx) + i] = *u;
}
}
val.serialize(&mut Serializer::new(ret)).unwrap();
}
// fn encode_key_element(&self, ret: &mut Vec<u8>, idx: usize, val: &DataValue) {
// if idx > 0 {
// let pos = (ret.len() as u32).to_be_bytes();
// for (i, u) in pos.iter().enumerate() {
// ret[4 * (1 + idx) + i] = *u;
// }
// }
// val.serialize(&mut Serializer::new(ret)).unwrap();
// }
pub(crate) fn adhoc_encode_key(&self, tuple: &Tuple, span: SourceSpan) -> Result<Vec<u8>> {
let len = self.metadata.keys.len();
ensure!(
@ -154,18 +143,24 @@ impl RelationHandle {
}
);
let mut ret = self.encode_key_prefix(len);
for i in 0..len {
self.encode_key_element(&mut ret, i, &tuple.0[i])
for val in &tuple.0[0..len] {
ret.encode_datavalue(val);
}
// for i in 0..len {
// self.encode_key_element(&mut ret, i, &tuple.0[i])
// }
Ok(ret)
}
pub(crate) fn adhoc_encode_val(&self, tuple: &Tuple, _span: SourceSpan) -> Result<Vec<u8>> {
let start = self.metadata.keys.len();
let len = self.metadata.non_keys.len();
let mut ret = self.encode_key_prefix(len);
for i in 0..len {
self.encode_key_element(&mut ret, i, &tuple.0[i + start])
}
// for i in 0..len {
// self.encode_key_element(&mut ret, i, &tuple.0[i + start])
// }
tuple.0[start..]
.serialize(&mut Serializer::new(&mut ret))
.unwrap();
Ok(ret)
}
pub(crate) fn ensure_compatible(&self, inp: &InputRelationHandle) -> Result<()> {
@ -284,16 +279,23 @@ impl RelationIterator {
Ok(match self.inner.pair()? {
None => None,
Some((k_slice, v_slice)) => {
if compare_tuple_keys(&self.upper_bound, k_slice) != Greater {
if self.upper_bound.as_slice() <= k_slice {
//
// }
// if compare_tuple_keys(&self.upper_bound, k_slice) != Greater {
None
} else {
let mut tup = EncodedTuple(k_slice).decode();
let mut tup = Tuple::decode_from_key(k_slice);
if !v_slice.is_empty() {
let v_tup = EncodedTuple(v_slice);
if v_tup.arity() > 0 {
tup.0.extend(v_tup.decode().0);
}
}
let vals: Vec<DataValue> = rmp_serde::from_slice(&v_slice[ENCODED_KEY_MIN_LEN..]).unwrap();
tup.0.extend(vals);
}
// if !v_slice.is_empty() {
// let v_tup = EncodedTuple(v_slice);
// if v_tup.arity() > 0 {
// tup.0.extend(v_tup.decode().0);
// }
// }
Some(tup)
}
}
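
The stored value now mirrors the key layout: an 8-byte prefix (the relation id) followed by the msgpack serialization of the non-key columns, which readers skip past before calling `rmp_serde::from_slice`, as `RelationIterator` does above. A sketch of that layout using a stand-in prefix (the `RelationHandle` plumbing is omitted):

use rmp_serde::Serializer;
use serde::Serialize;
use crate::data::tuple::ENCODED_KEY_MIN_LEN;
use crate::data::value::DataValue;

fn value_layout_sketch() {
    let non_keys = vec![DataValue::from(1), DataValue::from(2.0)];
    // Stand-in for the 8-byte relation-id prefix written by encode_key_prefix.
    let mut val = 7u64.to_be_bytes().to_vec();
    non_keys.serialize(&mut Serializer::new(&mut val)).unwrap();
    // Readers skip the prefix and deserialize the remainder.
    let decoded: Vec<DataValue> = rmp_serde::from_slice(&val[ENCODED_KEY_MIN_LEN..]).unwrap();
    assert_eq!(decoded, non_keys);
}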

@ -23,7 +23,6 @@ lazy_static! {
let init = Instant::now();
db.run_script(r##"
{
res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon] <~
CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any', 'Any', 'Int?', 'Float?', 'Float?', 'Any', 'Any', 'Float?', 'Float?'],
url: 'file://./tests/air-routes-latest-nodes.csv',
@ -47,8 +46,9 @@ lazy_static! {
lat: Float,
lon: Float
}
}
{
"##, &Default::default()).unwrap();
db.run_script(r##"
res[idx, label, typ, code, icao, desc] <~
CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'],
url: 'file://./tests/air-routes-latest-nodes.csv',
@ -62,8 +62,9 @@ lazy_static! {
=>
desc: String
}
}
{
"##, &Default::default()).unwrap();
db.run_script(r##"
res[idx, label, typ, code, icao, desc] <~
CsvReader(types: ['Int', 'Any', 'Any', 'Any', 'Any', 'Any'],
url: 'file://./tests/air-routes-latest-nodes.csv',
@ -77,8 +78,9 @@ lazy_static! {
=>
desc: String
}
}
{
"##, &Default::default()).unwrap();
db.run_script(r##"
res[idx, label, typ, code] <~
CsvReader(types: ['Int', 'Any', 'Any', 'Any'],
url: 'file://./tests/air-routes-latest-nodes.csv',
@ -86,9 +88,12 @@ lazy_static! {
?[idx, code] :=
res[idx, label, typ, code],
:replace idx2code { idx => code }
}
{
:replace idx2code { idx: Int => code: String }
"##, &Default::default()).unwrap();
// println!("{}", db.run_script("?[idx, code] := *idx2code[idx, code]", &Default::default()).unwrap());
db.run_script(r##"
res[] <~
CsvReader(types: ['Int', 'Int', 'Int', 'String', 'Float?'],
url: 'file://./tests/air-routes-latest-edges.csv',
@ -100,8 +105,9 @@ lazy_static! {
*idx2code[to_i, to]
:replace route { fr: String, to: String => dist: Float }
}
{
"##, &Default::default()).unwrap();
db.run_script(r##"
res[] <~
CsvReader(types: ['Int', 'Int', 'Int', 'String'],
url: 'file://./tests/air-routes-latest-edges.csv',
@ -114,7 +120,6 @@ lazy_static! {
:replace contain { entity: String, contained: String }
}
"##, &Default::default()).unwrap();
db.run_script("::remove idx2code", &Default::default())
