|
|
|
@ -5,7 +5,7 @@
|
|
|
|
|
use std::collections::BTreeMap;
|
|
|
|
|
use std::fmt::{Debug, Formatter};
|
|
|
|
|
use std::path::PathBuf;
|
|
|
|
|
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
|
|
|
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
|
|
|
|
use std::{fs, thread};
|
|
|
|
@ -62,19 +62,13 @@ const CURRENT_STORAGE_VERSION: u64 = 1;
|
|
|
|
|
pub struct Db {
|
|
|
|
|
db: RocksDb,
|
|
|
|
|
relation_store_id: Arc<AtomicU64>,
|
|
|
|
|
n_sessions: Arc<AtomicUsize>,
|
|
|
|
|
queries_count: Arc<AtomicU64>,
|
|
|
|
|
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
|
|
|
|
|
session_id: usize,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Debug for Db {
|
|
|
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
write!(
|
|
|
|
|
f,
|
|
|
|
|
"Db<session {}, sessions: {:?}>",
|
|
|
|
|
self.session_id, self.n_sessions
|
|
|
|
|
)
|
|
|
|
|
write!(f, "Db")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -131,46 +125,27 @@ impl Db {
|
|
|
|
|
let ret = Self {
|
|
|
|
|
db,
|
|
|
|
|
relation_store_id: Arc::new(Default::default()),
|
|
|
|
|
n_sessions: Arc::new(Default::default()),
|
|
|
|
|
queries_count: Arc::new(Default::default()),
|
|
|
|
|
running_queries: Arc::new(Mutex::new(Default::default())),
|
|
|
|
|
session_id: Default::default(),
|
|
|
|
|
};
|
|
|
|
|
ret.load_last_ids()?;
|
|
|
|
|
Ok(ret)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn get_session_id(&self) -> usize {
|
|
|
|
|
self.session_id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn compact_relation(&self) -> Result<()> {
|
|
|
|
|
fn compact_relation(&self) -> Result<()> {
|
|
|
|
|
let l = Tuple::default().encode_as_key(RelationId(0));
|
|
|
|
|
let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX));
|
|
|
|
|
self.db.range_compact(&l, &u)?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn new_session(&self) -> Result<Self> {
|
|
|
|
|
let old_count = self.n_sessions.fetch_add(1, Ordering::AcqRel);
|
|
|
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
|
db: self.db.clone(),
|
|
|
|
|
relation_store_id: self.relation_store_id.clone(),
|
|
|
|
|
n_sessions: self.n_sessions.clone(),
|
|
|
|
|
queries_count: self.queries_count.clone(),
|
|
|
|
|
running_queries: self.running_queries.clone(),
|
|
|
|
|
session_id: old_count + 1,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn load_last_ids(&self) -> Result<()> {
|
|
|
|
|
let tx = self.transact()?;
|
|
|
|
|
self.relation_store_id
|
|
|
|
|
.store(tx.load_last_relation_store_id()?.0, Ordering::Release);
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
pub fn transact(&self) -> Result<SessionTx> {
|
|
|
|
|
fn transact(&self) -> Result<SessionTx> {
|
|
|
|
|
let ret = SessionTx {
|
|
|
|
|
tx: self.db.transact().set_snapshot(true).start(),
|
|
|
|
|
mem_store_id: Default::default(),
|
|
|
|
@ -178,7 +153,7 @@ impl Db {
|
|
|
|
|
};
|
|
|
|
|
Ok(ret)
|
|
|
|
|
}
|
|
|
|
|
pub fn transact_write(&self) -> Result<SessionTx> {
|
|
|
|
|
fn transact_write(&self) -> Result<SessionTx> {
|
|
|
|
|
let ret = SessionTx {
|
|
|
|
|
tx: self.db.transact().set_snapshot(true).start(),
|
|
|
|
|
mem_store_id: Default::default(),
|
|
|
|
@ -611,7 +586,7 @@ impl Db {
|
|
|
|
|
} else {
|
|
|
|
|
Right(sorted_iter)
|
|
|
|
|
};
|
|
|
|
|
let sorted_iter = sorted_iter.map(|t| Ok(t));
|
|
|
|
|
let sorted_iter = sorted_iter.map(Ok);
|
|
|
|
|
if let Some((meta, relation_op)) = &input_program.out_opts.store_relation {
|
|
|
|
|
let to_clear = tx
|
|
|
|
|
.execute_relation(
|
|
|
|
@ -682,7 +657,7 @@ impl Db {
|
|
|
|
|
.collect_vec();
|
|
|
|
|
Ok(json!({"rows": res, "headers": ["id", "started_at"]}))
|
|
|
|
|
}
|
|
|
|
|
pub fn list_relation(&self, name: &str) -> Result<JsonValue> {
|
|
|
|
|
fn list_relation(&self, name: &str) -> Result<JsonValue> {
|
|
|
|
|
let tx = self.transact()?;
|
|
|
|
|
let handle = tx.get_relation(name, false)?;
|
|
|
|
|
let mut ret = vec![];
|
|
|
|
@ -709,7 +684,7 @@ impl Db {
|
|
|
|
|
}
|
|
|
|
|
Ok(json!({"rows": ret, "headers": ["column", "is_key", "index", "type", "has_default"]}))
|
|
|
|
|
}
|
|
|
|
|
pub fn list_relations(&self) -> Result<JsonValue> {
|
|
|
|
|
fn list_relations(&self) -> Result<JsonValue> {
|
|
|
|
|
let lower =
|
|
|
|
|
Tuple(vec![DataValue::Str(SmartString::from(""))]).encode_as_key(RelationId::SYSTEM);
|
|
|
|
|
let upper = Tuple(vec![DataValue::Str(SmartString::from(String::from(
|
|
|
|
|