sqlite support

main
Ziyang Hu 2 years ago
parent d8f37a7b2d
commit c7877749ac

31
Cargo.lock generated

@ -508,6 +508,7 @@ dependencies = [
"sled",
"smallvec",
"smartstring",
"sqlite",
"thiserror",
"tikv-client",
"tikv-jemallocator-global",
@ -2733,6 +2734,36 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlite"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaae786470902daa541da257e5d1d519b1d16f4eef3f5426bee4fd6e7d78f5f7"
dependencies = [
"libc",
"sqlite3-sys",
]
[[package]]
name = "sqlite3-src"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1815a7a02c996eb8e5c64f61fcb6fd9b12e593ce265c512c5853b2513635691"
dependencies = [
"cc",
"pkg-config",
]
[[package]]
name = "sqlite3-sys"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d47c99824fc55360ba00caf28de0b8a0458369b832e016a64c13af0ad9fbb9ee"
dependencies = [
"libc",
"sqlite3-src",
]
[[package]]
name = "static_assertions"
version = "1.1.0"

@ -64,6 +64,7 @@ cozorocks = { path = "../cozorocks", version = "0.1.0" }
sled = "0.34.7"
tikv-client = "0.1.0"
tokio = "1.21.2"
sqlite = "0.30.1"
#redb = "0.9.0"
#ouroboros = "0.15.5"

@ -21,6 +21,7 @@ pub use miette::Error;
pub use runtime::db::Db;
pub use storage::rocks::{new_cozo_rocksdb, RocksDbStorage};
pub use storage::sled::{new_cozo_sled, SledStorage};
pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage};
pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
// pub use storage::re::{new_cozo_redb, ReStorage};

@ -100,6 +100,7 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(ret)
}
/// should be called after creation of the database to initialize the runtime data.
pub fn initialize(&'s self) -> Result<()> {
self.load_last_ids()?;
Ok(())

@ -199,7 +199,10 @@ impl RelationHandle {
RelationDeserError
})?)
}
pub(crate) fn scan_all<'a>(&self, tx: &'a SessionTx<'_>) -> impl Iterator<Item = Result<Tuple>> + 'a {
pub(crate) fn scan_all<'a>(
&self,
tx: &'a SessionTx<'_>,
) -> impl Iterator<Item = Result<Tuple>> + 'a {
let lower = Tuple::default().encode_as_key(self.id);
let upper = Tuple::default().encode_as_key(self.id.next());
tx.tx.range_scan(&lower, &upper)

@ -9,6 +9,7 @@ use crate::data::tuple::Tuple;
pub(crate) mod rocks;
pub(crate) mod sled;
pub(crate) mod tikv;
pub(crate) mod sqlite;
// pub(crate) mod re;
pub trait Storage<'s> {

@ -2,21 +2,27 @@
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/
use std::borrow::Borrow;
use std::cell::RefCell;
use std::path::Path;
use std::sync::Arc;
use std::{iter, thread};
use clap::builder::TypedValueParser;
use itertools::Itertools;
use miette::{miette, IntoDiagnostic, Result};
use miette::{miette, IntoDiagnostic, Report, Result};
use ouroboros::self_referencing;
use redb::{
Builder, Database, ReadOnlyTable, ReadTransaction, ReadableTable, Table, TableDefinition,
WriteStrategy, WriteTransaction,
Builder, Database, RangeIter, ReadOnlyTable, ReadTransaction, ReadableTable, Table,
TableDefinition, WriteStrategy, WriteTransaction,
};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
/// This currently does not work even after pulling in ouroboros: ReDB's lifetimes are really maddening
/// Creates a ReDB database object.
pub fn new_cozo_redb(path: impl AsRef<Path>) -> Result<crate::Db<ReStorage>> {
let ret = crate::Db::new(ReStorage::new(path)?)?;
@ -55,14 +61,18 @@ impl ReStorage {
}
}
impl Storage for ReStorage {
type Tx = ReTx;
impl<'s> Storage<'s> for ReStorage {
type Tx = ReTx<'s>;
fn transact(&self, write: bool) -> Result<Self::Tx> {
fn transact(&'s self, write: bool) -> Result<Self::Tx> {
Ok(if write {
ReTx::Write(ReTxWrite::new(self.db.clone()))
let tx = self.db.begin_write().into_diagnostic()?;
ReTx::Write(ReTxWrite {
tx: Some(RefCell::new(tx)),
})
} else {
ReTx::Read(ReTxRead::new(self.db.clone()))
let tx = self.db.begin_read().into_diagnostic()?;
ReTx::Read(ReTxRead { tx })
})
}
@ -93,211 +103,179 @@ impl Storage for ReStorage {
}
}
pub enum ReTx {
Read(ReTxRead),
Write(ReTxWrite),
}
pub struct ReTxRead {
db_ptr: Option<*const Database>,
tx_ptr: Option<*mut ReadTransaction<'static>>,
tbl_ptr: Option<*mut ReadOnlyTable<'static, [u8], [u8]>>,
}
impl ReTxRead {
fn new(db_arc: Arc<Database>) -> Self {
unsafe {
let db_ptr = Arc::into_raw(db_arc);
let tx_ptr = Box::into_raw(Box::new(
(&*db_ptr)
.begin_read()
.expect("fatal: open read transaction failed"),
));
let tbl_ptr = Box::into_raw(Box::new(
(&*tx_ptr)
.open_table(TABLE)
.expect("fatal: open table failed"),
));
ReTxRead {
db_ptr: Some(db_ptr),
tx_ptr: Some(tx_ptr),
tbl_ptr: Some(tbl_ptr),
}
}
}
pub enum ReTx<'s> {
Read(ReTxRead<'s>),
Write(ReTxWrite<'s>),
}
impl Drop for ReTxRead {
fn drop(&mut self) {
unsafe {
let db_ptr = self.db_ptr.take();
let tx_ptr = self.tx_ptr.take();
let tbl_ptr = self.tbl_ptr.take();
let _db = Arc::from_raw(db_ptr.unwrap());
let _tx = Box::from_raw(tx_ptr.unwrap());
let _tbl = Box::from_raw(tbl_ptr.unwrap());
}
}
pub struct ReTxRead<'s> {
tx: ReadTransaction<'s>,
}
pub struct ReTxWrite {
db_ptr: Option<*const Database>,
tx_ptr: Option<*mut WriteTransaction<'static>>,
tbl_ptr: Option<*mut Table<'static, 'static, [u8], [u8]>>,
pub struct ReTxWrite<'s> {
tx: Option<RefCell<WriteTransaction<'s>>>,
}
impl ReTxWrite {
fn new(db_arc: Arc<Database>) -> Self {
unsafe {
let db_ptr = Arc::into_raw(db_arc);
let tx_ptr = Box::into_raw(Box::new(
(&*db_ptr)
.begin_write()
.expect("fatal: open write transaction failed"),
));
let tbl_ptr = Box::into_raw(Box::new(
(&*tx_ptr)
.open_table(TABLE)
.expect("fatal: open table failed"),
));
ReTxWrite {
db_ptr: Some(db_ptr),
tx_ptr: Some(tx_ptr),
tbl_ptr: Some(tbl_ptr),
}
}
}
}
impl Drop for ReTxWrite {
fn drop(&mut self) {
unsafe {
let db_ptr = self.db_ptr.take();
let _db = Arc::from_raw(db_ptr.unwrap());
if self.tx_ptr.is_some() {
let tx_ptr = self.tx_ptr.take();
let tbl_ptr = self.tbl_ptr.take();
let _tx = Box::from_raw(tx_ptr.unwrap());
let _tbl = Box::from_raw(tbl_ptr.unwrap());
}
}
}
}
impl StoreTx for ReTx {
impl<'s> StoreTx<'s> for ReTx<'s> {
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
unsafe {
match self {
ReTx::Read(inner) => {
let tbl = &*inner.tbl_ptr.unwrap();
let tbl = inner.tx.open_table(TABLE).into_diagnostic()?;
tbl.get(key)
.map(|op| op.map(|s| s.to_vec()))
.into_diagnostic()
}
ReTx::Write(inner) => {
let tbl = &*inner.tbl_ptr.unwrap();
let tx = inner.tx.as_ref().unwrap().borrow_mut();
let tbl = tx.open_table(TABLE).into_diagnostic()?;
tbl.get(key)
.map(|op| op.map(|s| s.to_vec()))
.into_diagnostic()
}
}
}
}
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
unsafe {
match self {
ReTx::Read(_) => unreachable!(),
ReTx::Write(inner) => {
let tbl = &mut *inner.tbl_ptr.unwrap();
let tx = inner.tx.as_ref().unwrap().borrow_mut();
let mut tbl = tx.open_table(TABLE).into_diagnostic()?;
tbl.insert(key, val).into_diagnostic()?;
Ok(())
}
}
}
}
fn del(&mut self, key: &[u8]) -> Result<()> {
unsafe {
match self {
ReTx::Read(_) => unreachable!(),
ReTx::Write(inner) => {
let tbl = &mut *inner.tbl_ptr.unwrap();
let tx = inner.tx.as_ref().unwrap().borrow_mut();
let mut tbl = tx.open_table(TABLE).into_diagnostic()?;
tbl.remove(key).into_diagnostic()?;
Ok(())
}
}
}
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
unsafe {
match self {
ReTx::Read(inner) => {
let tbl = &*inner.tbl_ptr.unwrap();
let tbl = inner.tx.open_table(TABLE).into_diagnostic()?;
tbl.get(key).map(|op| op.is_some()).into_diagnostic()
}
ReTx::Write(inner) => {
let tbl = &*inner.tbl_ptr.unwrap();
let tx = inner.tx.as_ref().unwrap().borrow_mut();
let tbl = tx.open_table(TABLE).into_diagnostic()?;
tbl.get(key).map(|op| op.is_some()).into_diagnostic()
}
}
}
}
fn commit(&mut self) -> Result<()> {
match self {
ReTx::Read(_) => Ok(()),
ReTx::Write(inner) => unsafe {
let tx_ptr = inner.tx_ptr.take();
let tbl_ptr = inner.tbl_ptr.take();
let _tbl = Box::from_raw(tbl_ptr.unwrap());
let tx = Box::from_raw(tx_ptr.unwrap());
tx.commit().into_diagnostic()
},
ReTx::Write(inner) => {
let tx_cell = inner.tx.take().unwrap();
let tx = tx_cell.into_inner();
tx.commit().into_diagnostic()?;
Ok(())
}
}
}
fn range_scan(&self, lower: &[u8], upper: &[u8]) -> Box<dyn Iterator<Item = Result<Tuple>>> {
fn range_scan<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>
where
's: 'a,
{
match self {
ReTx::Read(inner) => unsafe {
let tbl = &*inner.tbl_ptr.unwrap();
match tbl.range(lower.to_vec()..upper.to_vec()) {
Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
Err(err) => Box::new(iter::once(Err(miette!(err)))),
ReTx::Read(inner) => {
let tbl = match inner.tx.open_table(TABLE).into_diagnostic() {
Ok(tbl) => tbl,
Err(err) => return Box::new(iter::once(Err(miette!(err)))),
};
let it = ReadTableIterBuilder {
tbl,
it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(),
}
},
ReTx::Write(inner) => unsafe {
let tbl = &*inner.tbl_ptr.unwrap();
match tbl.range(lower.to_vec()..upper.to_vec()) {
Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
Err(err) => Box::new(iter::once(Err(miette!(err)))),
.build();
todo!()
// match tbl.range(lower.to_vec()..upper.to_vec()) {
// Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
// Err(err) => Box::new(iter::once(Err(miette!(err)))),
// }
}
ReTx::Write(inner) => {
let tx = inner.tx.as_ref().unwrap().borrow_mut();
let tbl = match tx.open_table(TABLE) {
Ok(tbl) => tbl,
Err(err) => return Box::new(iter::once(Err(miette!(err)))),
};
let it = WriteTableIterBuilder {
tbl,
it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(),
}
.build();
todo!()
// let tbl = &*inner.tbl_ptr.unwrap();
// match tbl.range(lower.to_vec()..upper.to_vec()) {
// Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
// Err(err) => Box::new(iter::once(Err(miette!(err)))),
// }
}
},
}
}
fn range_scan_raw(
&self,
fn range_scan_raw<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>> {
match self {
ReTx::Read(inner) => unsafe {
let tbl = &*inner.tbl_ptr.unwrap();
match tbl.range(lower.to_vec()..upper.to_vec()) {
Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),
Err(err) => Box::new(iter::once(Err(miette!(err)))),
}
},
ReTx::Write(inner) => unsafe {
let tbl = &*inner.tbl_ptr.unwrap();
match tbl.range(lower.to_vec()..upper.to_vec()) {
Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),
Err(err) => Box::new(iter::once(Err(miette!(err)))),
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
todo!()
// match self {
// ReTx::Read(inner) => unsafe {
// let tbl = &*inner.tbl_ptr.unwrap();
// match tbl.range(lower.to_vec()..upper.to_vec()) {
// Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),
// Err(err) => Box::new(iter::once(Err(miette!(err)))),
// }
// },
// ReTx::Write(inner) => unsafe {
// todo!()
// // let tbl = &*inner.tbl_ptr.unwrap();
// // match tbl.range(lower.to_vec()..upper.to_vec()) {
// // Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),
// // Err(err) => Box::new(iter::once(Err(miette!(err)))),
// // }
// },
// }
}
},
}
#[self_referencing]
struct ReadTableIter<'txn> {
tbl: ReadOnlyTable<'txn, [u8], [u8]>,
#[borrows(tbl)]
#[not_covariant]
it: RangeIter<'this, [u8], [u8]>,
}
#[self_referencing]
struct WriteTableIter<'db, 'txn>
where
'txn: 'db,
{
tbl: Table<'db, 'txn, [u8], [u8]>,
#[borrows(tbl)]
#[not_covariant]
it: RangeIter<'this, [u8], [u8]>,
}

@ -0,0 +1,209 @@
/*
* Copyright 2022, The Cozo Project Authors. Licensed under MPL-2.0.
*/
use ::sqlite::Connection;
use miette::{miette, IntoDiagnostic, Result, WrapErr};
use sqlite::{State, Statement};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
/// The Sqlite storage engine.
///
/// It stores only the location of the database: no connection is kept here,
/// a new one is opened from `path` each time one is needed.
pub struct SqliteStorage {
// Passed to `sqlite::open` by `transact` and `del_range`.
path: String,
}
/// Creates a sqlite-backed database stored at `path`.
///
/// The `cozo` key-value table is created if it does not exist yet, and the
/// returned database has already been initialized.
///
/// `:memory:` is not OK: the storage opens a fresh connection to `path` for
/// every transaction, and each in-memory connection would get its own,
/// empty database.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, if the table cannot be
/// created, or if constructing or initializing the `Db` fails.
pub fn new_cozo_sqlite(path: String) -> Result<crate::Db<SqliteStorage>> {
    let connection = sqlite::open(&path).into_diagnostic()?;
    let query = r#"
        create table if not exists cozo
        (
            k BLOB primary key,
            v BLOB
        );
    "#;
    // Report schema-creation failures to the caller instead of panicking.
    let mut statement = connection.prepare(query).into_diagnostic()?;
    while statement.next().into_diagnostic()? != State::Done {}
    let ret = crate::Db::new(SqliteStorage { path })?;
    ret.initialize()?;
    Ok(ret)
}
impl Storage<'_> for SqliteStorage {
    type Tx = SqliteTx;

    /// Opens a new connection to the database and starts a transaction on it
    /// with `begin;`.
    ///
    /// The `_write` flag is ignored: read and write transactions are started
    /// in the same way.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or the transaction
    /// cannot be started.
    fn transact(&'_ self, _write: bool) -> Result<Self::Tx> {
        let conn = sqlite::open(&self.path).into_diagnostic()?;
        {
            // Scoped so that the statement's borrow of `conn` ends before
            // `conn` is moved into the returned transaction.
            let mut statement = conn.prepare(r#"begin;"#).into_diagnostic()?;
            while statement.next().into_diagnostic()? != State::Done {}
        }
        Ok(SqliteTx { conn })
    }

    /// Deletes every key `k` with `lower <= k < upper`.
    ///
    /// The deletion runs on its own, freshly opened connection rather than
    /// inside a transaction obtained from [`Storage::transact`].
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or the delete
    /// statement fails.
    fn del_range(&'_ self, lower: &[u8], upper: &[u8]) -> Result<()> {
        let connection = sqlite::open(&self.path).into_diagnostic()?;
        let query = r#"
            delete from cozo where k >= ? and k < ?;
        "#;
        let mut statement = connection.prepare(query).into_diagnostic()?;
        // The slices can be bound directly; no owned copies are needed.
        statement.bind((1, lower)).into_diagnostic()?;
        statement.bind((2, upper)).into_diagnostic()?;
        while statement.next().into_diagnostic()? != State::Done {}
        Ok(())
    }

    /// Does nothing for this backend and always succeeds.
    fn range_compact(&'_ self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
        Ok(())
    }
}
/// A transaction on the Sqlite storage engine.
///
/// `SqliteStorage::transact` opens a dedicated connection, runs `begin;` on
/// it and wraps it in this struct.
pub struct SqliteTx {
// Connection on which every statement of this transaction is prepared.
conn: Connection,
}
impl<'s> StoreTx<'s> for SqliteTx {
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
let query = r#"
select v from cozo where k = ?;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap();
Ok(match statement.next().into_diagnostic()? {
State::Row => {
let res = statement.read::<Vec<u8>, _>(0).into_diagnostic()?;
Some(res)
}
State::Done => None,
})
}
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
let query = r#"
insert into cozo(k, v) values (?, ?)
on conflict(k) do update set v=excluded.v;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap();
statement.bind((2, val)).unwrap();
while statement
.next()
.into_diagnostic()
.with_context(|| format!("{:x?} {:?} {:x?}", key, val, Tuple::decode_from_key(key)))?
!= State::Done
{}
Ok(())
}
fn del(&mut self, key: &[u8]) -> Result<()> {
let query = r#"
delete from cozo where k = ?;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap();
while statement.next().into_diagnostic()? != State::Done {}
Ok(())
}
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
let query = r#"
select 1 from cozo where k = ?;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap();
Ok(match statement.next().into_diagnostic()? {
State::Row => true,
State::Done => false,
})
}
fn commit(&mut self) -> Result<()> {
let query = r#"commit;"#;
let mut statement = self.conn.prepare(query).unwrap();
while statement.next().unwrap() != State::Done {}
Ok(())
}
fn range_scan<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>
where
's: 'a,
{
let query = r#"
select k, v from cozo where k >= ? and k < ?
order by k;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, lower)).unwrap();
statement.bind((2, upper)).unwrap();
Box::new(TupleIter(statement))
}
fn range_scan_raw<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
where
's: 'a,
{
let query = r#"
select k, v from cozo where k >= ? and k < ?
order by k;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, lower)).unwrap();
statement.bind((2, upper)).unwrap();
Box::new(RawIter(statement))
}
}
/// Iterator over the rows of a prepared `select k, v ...` statement, yielding
/// each row decoded into a `Tuple`.
struct TupleIter<'l>(Statement<'l>);

impl<'l> Iterator for TupleIter<'l> {
    type Item = Result<Tuple>;

    /// Steps the statement once. Returns `None` when the statement is done,
    /// and `Some(Err(..))` if stepping or reading a column fails.
    fn next(&mut self) -> Option<Self::Item> {
        match self.0.next() {
            Ok(State::Done) => None,
            Ok(State::Row) => {
                // Column 0 is the key and column 1 the value; a read failure
                // is reported as an item rather than panicking.
                let k = match self.0.read::<Vec<u8>, _>(0) {
                    Ok(k) => k,
                    Err(err) => return Some(Err(miette!(err))),
                };
                let v = match self.0.read::<Vec<u8>, _>(1) {
                    Ok(v) => v,
                    Err(err) => return Some(Err(miette!(err))),
                };
                Some(Ok(decode_tuple_from_kv(&k, &v)))
            }
            Err(err) => Some(Err(miette!(err))),
        }
    }
}
/// Iterator over the rows of a prepared `select k, v ...` statement, yielding
/// each row as a raw `(key, value)` pair of byte vectors.
struct RawIter<'l>(Statement<'l>);

impl<'l> Iterator for RawIter<'l> {
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    /// Steps the statement once. Returns `None` when the statement is done,
    /// and `Some(Err(..))` if stepping or reading a column fails.
    fn next(&mut self) -> Option<Self::Item> {
        match self.0.next() {
            Ok(State::Done) => None,
            Ok(State::Row) => {
                // Column 0 is the key and column 1 the value; a read failure
                // is reported as an item rather than panicking.
                let k = match self.0.read::<Vec<u8>, _>(0) {
                    Ok(k) => k,
                    Err(err) => return Some(Err(miette!(err))),
                };
                let v = match self.0.read::<Vec<u8>, _>(1) {
                    Ok(v) => v,
                    Err(err) => return Some(Err(miette!(err))),
                };
                Some(Ok((k, v)))
            }
            Err(err) => Some(Err(miette!(err))),
        }
    }
}

@ -24,7 +24,7 @@ pub fn new_cozo_tikv(pd_endpoints: Vec<String>, optimistic: bool) -> Result<Db<T
.block_on(RawClient::new(pd_endpoints.clone()))
.into_diagnostic()?;
let client = RT
.block_on(TransactionClient::new(pd_endpoints.clone()))
.block_on(TransactionClient::new(pd_endpoints))
.into_diagnostic()?;
let ret = Db::new(TiKvStorage {
client: Arc::new(client),
@ -184,7 +184,7 @@ impl BatchScannerRaw {
);
let res = RT.block_on(fut).into_diagnostic()?;
let res_vec = res
.map(|pair| -> (Vec<u8>, Vec<u8>) { (pair.0.into(), pair.1.into()) })
.map(|pair| -> (Vec<u8>, Vec<u8>) { (pair.0.into(), pair.1) })
.collect_vec();
let has_content = !res_vec.is_empty();
if has_content {
@ -204,7 +204,7 @@ impl BatchScannerRaw {
);
let res = RT.block_on(fut).into_diagnostic()?;
let res_vec = res
.map(|pair| -> (Vec<u8>, Vec<u8>) { (pair.0.into(), pair.1.into()) })
.map(|pair| -> (Vec<u8>, Vec<u8>) { (pair.0.into(), pair.1) })
.collect_vec();
let has_content = !res_vec.is_empty();
if has_content {

File diff suppressed because it is too large Load Diff

@ -9,8 +9,10 @@ use cbindgen::{Config, Language};
fn main() {
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let mut config = Config::default();
config.cpp_compat = true;
let config = Config {
cpp_compat: true,
..Config::default()
};
cbindgen::Builder::new()
.with_config(config)
.with_crate(crate_dir)

@ -120,7 +120,7 @@ pub unsafe extern "C" fn cozo_run_query(
}
};
let result = db.run_script_str(script, &params_str);
let result = db.run_script_str(script, params_str);
CString::new(result).unwrap().into_raw()
}

Loading…
Cancel
Save