optimize sqlite storage

main
Ziyang Hu 2 years ago
parent 810bc17e82
commit 71f1530368

@ -444,7 +444,7 @@ fn make_const_rule(
); );
let bindings_arity = bindings.len(); let bindings_arity = bindings.len();
program.prog.insert( program.prog.insert(
rule_symbol.clone(), rule_symbol,
InputInlineRulesOrAlgo::Algo { InputInlineRulesOrAlgo::Algo {
algo: AlgoApply { algo: AlgoApply {
algo: AlgoHandle { algo: AlgoHandle {

@ -180,11 +180,7 @@ impl<'s, S: Storage<'s>> Db<S> {
for data in tx.tx.range_scan(&start, &end) { for data in tx.tx.range_scan(&start, &end) {
let (k, v) = data?; let (k, v) = data?;
let tuple = decode_tuple_from_kv(&k, &v); let tuple = decode_tuple_from_kv(&k, &v);
let row = tuple let row = tuple.0.into_iter().map(JsonValue::from).collect_vec();
.0
.into_iter()
.map(|dv| JsonValue::from(dv))
.collect_vec();
rows.push(row); rows.push(row);
} }
let headers = cols.iter().map(|col| col.to_string()).collect_vec(); let headers = cols.iter().map(|col| col.to_string()).collect_vec();

@ -6,7 +6,8 @@
* You can obtain one at https://mozilla.org/MPL/2.0/. * You can obtain one at https://mozilla.org/MPL/2.0/.
*/ */
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::cell::RefCell;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use ::sqlite::Connection; use ::sqlite::Connection;
use either::{Either, Left, Right}; use either::{Either, Left, Right};
@ -22,6 +23,7 @@ use crate::storage::{Storage, StoreTx};
pub struct SqliteStorage { pub struct SqliteStorage {
lock: Arc<RwLock<()>>, lock: Arc<RwLock<()>>,
name: String, name: String,
pool: Arc<Mutex<Vec<Connection>>>,
} }
/// Create a sqlite backed database. /// Create a sqlite backed database.
@ -50,6 +52,7 @@ pub fn new_cozo_sqlite(path: String) -> Result<crate::Db<SqliteStorage>> {
let ret = crate::Db::new(SqliteStorage { let ret = crate::Db::new(SqliteStorage {
lock: Arc::new(Default::default()), lock: Arc::new(Default::default()),
name: path, name: path,
pool: Arc::new(Mutex::new(vec![])),
})?; })?;
ret.initialize()?; ret.initialize()?;
@ -60,7 +63,12 @@ impl<'s> Storage<'s> for SqliteStorage {
type Tx = SqliteTx<'s>; type Tx = SqliteTx<'s>;
fn transact(&'s self, write: bool) -> Result<Self::Tx> { fn transact(&'s self, write: bool) -> Result<Self::Tx> {
let conn = sqlite::open(&self.name).into_diagnostic()?; let conn = {
match self.pool.lock().unwrap().pop() {
None => sqlite::open(&self.name).into_diagnostic()?,
Some(conn) => conn,
}
};
let lock = if write { let lock = if write {
Right(self.lock.write().unwrap()) Right(self.lock.write().unwrap())
} else { } else {
@ -72,7 +80,14 @@ impl<'s> Storage<'s> for SqliteStorage {
} }
Ok(SqliteTx { Ok(SqliteTx {
lock, lock,
conn, storage: self,
conn: Some(conn),
stmts: [
RefCell::new(None),
RefCell::new(None),
RefCell::new(None),
RefCell::new(None),
],
committed: false, committed: false,
}) })
} }
@ -101,34 +116,74 @@ impl<'s> Storage<'s> for SqliteStorage {
} }
fn range_compact(&'_ self, _lower: &[u8], _upper: &[u8]) -> Result<()> { fn range_compact(&'_ self, _lower: &[u8], _upper: &[u8]) -> Result<()> {
let mut pool = self.pool.lock().unwrap();
while pool.pop().is_some() {}
Ok(()) Ok(())
} }
} }
pub struct SqliteTx<'a> { pub struct SqliteTx<'a> {
lock: Either<RwLockReadGuard<'a, ()>, RwLockWriteGuard<'a, ()>>, lock: Either<RwLockReadGuard<'a, ()>, RwLockWriteGuard<'a, ()>>,
conn: Connection, storage: &'a SqliteStorage,
conn: Option<Connection>,
stmts: [RefCell<Option<Statement<'static>>>; N_CACHED_QUERIES],
committed: bool, committed: bool,
} }
const N_QUERIES: usize = 5;
const N_CACHED_QUERIES: usize = 4;
const QUERIES: [&str; N_QUERIES] = [
"select v from cozo where k = ?;",
"insert into cozo(k, v) values (?, ?) on conflict(k) do update set v=excluded.v;",
"delete from cozo where k = ?;",
"select 1 from cozo where k = ?;",
"select k, v from cozo where k >= ? and k < ? order by k;",
];
const GET_QUERY: usize = 0;
const PUT_QUERY: usize = 1;
const DEL_QUERY: usize = 2;
const EXISTS_QUERY: usize = 3;
const RANGE_QUERY: usize = 4;
impl Drop for SqliteTx<'_> { impl Drop for SqliteTx<'_> {
fn drop(&mut self) { fn drop(&mut self) {
if let Right(RwLockWriteGuard { .. }) = self.lock { if let Right(RwLockWriteGuard { .. }) = self.lock {
if !self.committed { if !self.committed {
let query = r#"rollback;"#; let query = r#"rollback;"#;
let _ = self.conn.execute(query); let _ = self.conn.as_ref().unwrap().execute(query);
} }
} }
let mut pool = self.storage.pool.lock().unwrap();
let conn = self.conn.take().unwrap();
pool.push(conn)
}
}
impl<'s> SqliteTx<'s> {
fn ensure_stmt(&self, idx: usize) {
let mut stmt = self.stmts[idx].borrow_mut();
if stmt.is_none() {
let query = QUERIES[idx];
let prepared = self.conn.as_ref().unwrap().prepare(query).unwrap();
// Casting away the lifetime!
// This is OK because we are abiding by the contract of the underlying C pointer,
// as required by Sqlite's implementation
let prepared = unsafe { std::mem::transmute(prepared) };
*stmt = Some(prepared)
}
} }
} }
impl<'s> StoreTx<'s> for SqliteTx<'s> { impl<'s> StoreTx<'s> for SqliteTx<'s> {
fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> { fn get(&self, key: &[u8], _for_update: bool) -> Result<Option<Vec<u8>>> {
let query = r#" self.ensure_stmt(GET_QUERY);
select v from cozo where k = ?; let mut statement = self.stmts[GET_QUERY].borrow_mut();
"#; let statement = statement.as_mut().unwrap();
statement.reset().unwrap();
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap(); statement.bind((1, key)).unwrap();
Ok(match statement.next().into_diagnostic()? { Ok(match statement.next().into_diagnostic()? {
State::Row => { State::Row => {
@ -140,12 +195,11 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
} }
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> { fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
let query = r#" self.ensure_stmt(PUT_QUERY);
insert into cozo(k, v) values (?, ?) let mut statement = self.stmts[PUT_QUERY].borrow_mut();
on conflict(k) do update set v=excluded.v; let statement = statement.as_mut().unwrap();
"#; statement.reset().unwrap();
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, key)).unwrap(); statement.bind((1, key)).unwrap();
statement.bind((2, val)).unwrap(); statement.bind((2, val)).unwrap();
while statement.next().into_diagnostic()? != State::Done {} while statement.next().into_diagnostic()? != State::Done {}
@ -153,10 +207,11 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
} }
fn del(&mut self, key: &[u8]) -> Result<()> { fn del(&mut self, key: &[u8]) -> Result<()> {
let query = r#" self.ensure_stmt(DEL_QUERY);
delete from cozo where k = ?; let mut statement = self.stmts[DEL_QUERY].borrow_mut();
"#; let statement = statement.as_mut().unwrap();
let mut statement = self.conn.prepare(query).unwrap(); statement.reset().unwrap();
statement.bind((1, key)).unwrap(); statement.bind((1, key)).unwrap();
while statement.next().into_diagnostic()? != State::Done {} while statement.next().into_diagnostic()? != State::Done {}
@ -164,10 +219,11 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
} }
fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> { fn exists(&self, key: &[u8], _for_update: bool) -> Result<bool> {
let query = r#" self.ensure_stmt(EXISTS_QUERY);
select 1 from cozo where k = ?; let mut statement = self.stmts[EXISTS_QUERY].borrow_mut();
"#; let statement = statement.as_mut().unwrap();
let mut statement = self.conn.prepare(query).unwrap(); statement.reset().unwrap();
statement.bind((1, key)).unwrap(); statement.bind((1, key)).unwrap();
Ok(match statement.next().into_diagnostic()? { Ok(match statement.next().into_diagnostic()? {
State::Row => true, State::Row => true,
@ -179,7 +235,7 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
if let Right(RwLockWriteGuard { .. }) = self.lock { if let Right(RwLockWriteGuard { .. }) = self.lock {
if !self.committed { if !self.committed {
let query = r#"commit;"#; let query = r#"commit;"#;
let mut statement = self.conn.prepare(query).unwrap(); let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();
while statement.next().into_diagnostic()? != State::Done {} while statement.next().into_diagnostic()? != State::Done {}
self.committed = true; self.committed = true;
} else { } else {
@ -197,11 +253,10 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
where where
's: 'a, 's: 'a,
{ {
let query = r#" // Range scans cannot use cached prepared statements, as several of them
select k, v from cozo where k >= ? and k < ? // can be used at the same time.
order by k; let query = QUERIES[RANGE_QUERY];
"#; let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, lower)).unwrap(); statement.bind((1, lower)).unwrap();
statement.bind((2, upper)).unwrap(); statement.bind((2, upper)).unwrap();
Box::new(TupleIter(statement)) Box::new(TupleIter(statement))
@ -215,11 +270,8 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
where where
's: 'a, 's: 'a,
{ {
let query = r#" let query = QUERIES[RANGE_QUERY];
select k, v from cozo where k >= ? and k < ? let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();
order by k;
"#;
let mut statement = self.conn.prepare(query).unwrap();
statement.bind((1, lower)).unwrap(); statement.bind((1, lower)).unwrap();
statement.bind((2, upper)).unwrap(); statement.bind((2, upper)).unwrap();
Box::new(RawIter(statement)) Box::new(RawIter(statement))
@ -232,18 +284,17 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
where where
's: 'a, 's: 'a,
{ {
let query = r#" self.ensure_stmt(PUT_QUERY);
insert into cozo(k, v) values (?, ?) let mut statement = self.stmts[PUT_QUERY].borrow_mut();
on conflict(k) do update set v=excluded.v; let statement = statement.as_mut().unwrap();
"#; statement.reset().unwrap();
let mut statement = self.conn.prepare(query).unwrap();
for pair in data { for pair in data {
let (key, val) = pair?; let (key, val) = pair?;
statement.bind((1, key.as_slice())).unwrap(); statement.bind((1, key.as_slice())).unwrap();
statement.bind((2, val.as_slice())).unwrap(); statement.bind((2, val.as_slice())).unwrap();
while statement.next().into_diagnostic()? != State::Done {} while statement.next().into_diagnostic()? != State::Done {}
statement.reset().into_diagnostic()?; statement.reset().unwrap();
} }
Ok(()) Ok(())
} }

@ -26,7 +26,7 @@ lazy_static! {
_ = std::fs::remove_file(path); _ = std::fs::remove_file(path);
_ = std::fs::remove_dir_all(path); _ = std::fs::remove_dir_all(path);
let db_kind = env::var("COZO_TEST_DB_ENGINE").unwrap_or("mem".to_string()); let db_kind = env::var("COZO_TEST_DB_ENGINE").unwrap_or("mem".to_string());
println!("Using {} engine", db_kind);
let db = DbInstance::new(&db_kind, path, Default::default()).unwrap(); let db = DbInstance::new(&db_kind, path, Default::default()).unwrap();
dbg!(creation.elapsed()); dbg!(creation.elapsed());

@ -64,6 +64,9 @@ char *cozo_run_query(int32_t db_id, const char *script_raw, const char *params_r
/** /**
* Import data into relations * Import data into relations
* *
* Note that triggers are _not_ run for the relations, if any exists.
* If you need to activate triggers, use queries with parameters.
*
* `db_id`: the ID representing the database. * `db_id`: the ID representing the database.
* `json_payload`: a UTF-8 encoded JSON payload, in the same form as returned by exporting relations. * `json_payload`: a UTF-8 encoded JSON payload, in the same form as returned by exporting relations.
* *
@ -108,6 +111,9 @@ char *cozo_restore(int32_t db_id,
/** /**
* Import data into relations from a backup * Import data into relations from a backup
* *
* Note that triggers are _not_ run for the relations, if any exists.
* If you need to activate triggers, use queries with parameters.
*
* `db_id`: the ID representing the database. * `db_id`: the ID representing the database.
* `json_payload`: a UTF-8 encoded JSON payload: `{"path": ..., "relations": [...]}` * `json_payload`: a UTF-8 encoded JSON payload: `{"path": ..., "relations": [...]}`
* *

@ -69,7 +69,12 @@ fn main() {
eprintln!("{}", SECURITY_WARNING); eprintln!("{}", SECURITY_WARNING);
} }
let db = DbInstance::new(args.engine.as_str(), args.path.as_str(), &args.config.clone()).unwrap(); let db = DbInstance::new(
args.engine.as_str(),
args.path.as_str(),
&args.config.clone(),
)
.unwrap();
if let Some(restore_path) = &args.restore { if let Some(restore_path) = &args.restore {
db.restore_backup(restore_path).unwrap(); db.restore_backup(restore_path).unwrap();
@ -140,13 +145,7 @@ fn main() {
check_auth!(request, auth_guard); check_auth!(request, auth_guard);
} }
let relations = relations.split(",").filter_map(|t| { let relations = relations.split(',').filter(|t| !t.is_empty());
if t.is_empty() {
None
} else {
Some(t)
}
});
let result = db.export_relations(relations); let result = db.export_relations(relations);
match result { match result {
Ok(s) => { Ok(s) => {
@ -190,7 +189,7 @@ fn main() {
let payload: BackupPayload = try_or_400!(rouille::input::json_input(request)); let payload: BackupPayload = try_or_400!(rouille::input::json_input(request));
let result = db.backup_db(payload.path.clone()); let result = db.backup_db(payload.path);
match result { match result {
Ok(()) => { Ok(()) => {

Loading…
Cancel
Save