transact attributes over http

main
Ziyang Hu 2 years ago
parent 539083f7b5
commit 51a27163c6

@ -2,6 +2,8 @@
name = "cozo"
version = "0.1.0"
edition = "2021"
description = "A database in development"
authors = ["Ziyang Hu"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -24,6 +26,9 @@ rmpv = "1.0.0"
base64 = "0.13.0"
chrono = "0.4.19"
ordered-float = { version = "3.0", features = ["serde"] }
actix-web = "4.1.0"
clap = { version = "3.2.8", features = ["derive"] }
itertools = "0.10.3"
cozorocks = { path = "cozorocks" }
[target.'cfg(not(target_env = "msvc"))'.dependencies]

@ -0,0 +1,119 @@
use actix_web::{post, web, App, HttpResponse, HttpServer, Responder};
use clap::Parser;
use cozo::{AttrTxItem, Db};
use cozorocks::DbBuilder;
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
type Result<T> = std::result::Result<T, RespError>;
struct RespError {
err: anyhow::Error,
}
impl Debug for RespError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.err)
}
}
impl Display for RespError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.err)
}
}
impl actix_web::error::ResponseError for RespError {}
impl From<anyhow::Error> for RespError {
fn from(err: anyhow::Error) -> RespError {
RespError { err }
}
}
#[derive(Parser, Debug)]
#[clap(version, about, long_about=None)]
struct Args {
/// Path to the directory to store the database
#[clap(value_parser, default_value_t = String::from("cozo_db"))]
path: String,
/// Address to bind the service to
#[clap(short, long, default_value_t = String::from("127.0.0.1"))]
bind: String,
/// Port to use
#[clap(short, long, default_value_t = 9070)]
port: u16,
/// Temporary database, i.e. will be deleted when the program exits
#[clap(short, long, default_value_t = false, action)]
temp: bool,
}
struct AppStateWithDb {
db: Db,
}
#[post("/tx")]
async fn transact(
body: web::Json<serde_json::Value>,
data: web::Data<AppStateWithDb>,
) -> Result<impl Responder> {
dbg!(&body, &data.db);
Ok(HttpResponse::Ok().body("transact"))
}
#[post("/txa")]
async fn transact_attr(
body: web::Json<serde_json::Value>,
data: web::Data<AppStateWithDb>,
) -> Result<impl Responder> {
let (attrs, comment) = AttrTxItem::parse_request(&body)?;
let mut tx = data.db.transact_write()?;
tx.tx_attrs(attrs)?;
tx.commit_tx(&comment, false)?;
Ok(HttpResponse::Ok().body("transact-attr success"))
}
#[post("/q")]
async fn query(
body: web::Json<serde_json::Value>,
data: web::Data<AppStateWithDb>,
) -> Result<impl Responder> {
dbg!(&body, &data.db);
Ok(HttpResponse::Ok().body("query"))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
if args.temp && Path::new(&args.path).exists() {
panic!(
"cannot open database at '{}' as temporary since it already exists",
args.path
);
}
let builder = DbBuilder::default()
.path(&args.path)
.create_if_missing(true)
.destroy_on_exit(args.temp);
let db = Db::build(builder).unwrap();
let app_state = web::Data::new(AppStateWithDb { db });
let addr = (&args.bind as &str, args.port);
eprintln!("Serving database at {}:{}", addr.0, addr.1);
HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.service(query)
.service(transact)
.service(transact_attr)
})
.bind(addr)?
.run()
.await
}

@ -86,6 +86,9 @@ impl AttrId {
0, b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
pub(crate) fn is_perm(&self) -> bool {
*self > Self::MAX_TEMP && *self <= Self::MAX_PERM
}
}
impl From<u64> for AttrId {

@ -1,6 +1,6 @@
use crate::data::attr::{Attribute, AttributeCardinality, AttributeIndex, AttributeTyping};
use crate::data::id::{AttrId, EntityId, TxId};
use crate::data::keyword::Keyword;
use crate::data::keyword::{Keyword, KeywordError};
use crate::data::value::Value;
use serde_json::json;
pub(crate) use serde_json::Value as JsonValue;
@ -9,7 +9,7 @@ pub(crate) use serde_json::Value as JsonValue;
pub enum JsonError {
#[error("cannot convert JSON value {0} to {1}")]
Conversion(JsonValue, String),
#[error("missing field {1} in value {0}")]
#[error("missing field '{1}' in value {0}")]
MissingField(JsonValue, String),
}
@ -82,6 +82,9 @@ impl TryFrom<&'_ JsonValue> for Attribute {
.get("keyword")
.ok_or_else(|| JsonError::MissingField(value.clone(), "keyword".to_string()))?;
let keyword = Keyword::try_from(keyword)?;
if keyword.is_reserved() {
return Err(KeywordError::ReservedKeyword(keyword.clone()).into());
}
let cardinality = map
.get("cardinality")
.ok_or_else(|| JsonError::MissingField(value.clone(), "cardinality".to_string()))?
@ -97,14 +100,14 @@ impl TryFrom<&'_ JsonValue> for Attribute {
.ok_or_else(|| JsonError::Conversion(value.clone(), "AttributeTyping".to_string()))?;
let val_type = AttributeTyping::try_from(val_type)?;
let indexing = match map.get("type") {
let indexing = match map.get("index") {
None => AttributeIndex::None,
Some(v) => AttributeIndex::try_from(v.as_str().ok_or_else(|| {
JsonError::Conversion(value.clone(), "AttributeIndexing".to_string())
})?)?,
};
let with_history = match map.get("with_history") {
let with_history = match map.get("history") {
None => true,
Some(v) => v.as_bool().ok_or_else(|| {
JsonError::Conversion(value.clone(), "AttributeWithHistory".to_string())
@ -130,7 +133,7 @@ impl From<Attribute> for JsonValue {
"cardinality": attr.cardinality.to_string(),
"type": attr.val_type.to_string(),
"index": attr.indexing.to_string(),
"with_history": attr.with_history
"history": attr.with_history
})
}
}

@ -1,5 +1,7 @@
use lazy_static::lazy_static;
use serde_derive::{Deserialize, Serialize};
use smartstring::{LazyCompact, SmartString};
use std::collections::BTreeSet;
use std::fmt::{Debug, Display, Formatter};
use std::str::Utf8Error;
@ -8,6 +10,9 @@ pub enum KeywordError {
#[error("cannot convert to keyword: {0}")]
InvalidKeyword(String),
#[error("reserved keyword: {0}")]
ReservedKeyword(Keyword),
#[error(transparent)]
Utf8(#[from] Utf8Error),
}
@ -22,13 +27,17 @@ pub struct Keyword {
impl Display for Keyword {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.ns, self.ident)
if self.ns.is_empty() {
write!(f, ":{}", self.ident)
} else {
write!(f, ":{}/{}", self.ns, self.ident)
}
}
}
impl Debug for Keyword {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, ":{}", self)
write!(f, "{}", self)
}
}
@ -38,7 +47,15 @@ impl TryFrom<&str> for Keyword {
let make_err = || KeywordError::InvalidKeyword(value.to_string());
let mut kw_iter = value.split('/');
let ns = kw_iter.next().ok_or_else(make_err)?;
let ident = kw_iter.next().ok_or_else(make_err)?;
let ident = match kw_iter.next() {
None => {
return Ok(Keyword {
ns: "".into(),
ident: ns.into(),
})
}
Some(ident) => ident,
};
if kw_iter.next().is_none() {
Ok(Keyword {
ns: ns.into(),
@ -56,3 +73,9 @@ impl TryFrom<&[u8]> for Keyword {
std::str::from_utf8(value)?.try_into()
}
}
impl Keyword {
pub(crate) fn is_reserved(&self) -> bool {
self.ns.is_empty() && self.ident.starts_with('_')
}
}

@ -7,3 +7,5 @@ pub(crate) mod keyword;
pub(crate) mod triple;
pub(crate) mod tx;
pub(crate) mod value;
pub(crate) mod tx_attr;

@ -0,0 +1,74 @@
use crate::data::attr::Attribute;
use crate::data::triple::StoreOp;
use anyhow::Result;
use itertools::Itertools;
#[derive(Debug)]
pub struct AttrTxItem {
pub(crate) op: StoreOp,
pub(crate) attr: Attribute,
}
impl AttrTxItem {
pub fn parse_request(req: &serde_json::Value) -> Result<(Vec<AttrTxItem>, String)> {
let map = req
.as_object()
.ok_or_else(|| AttrTxItemError::Decoding(req.clone(), "expected object".to_string()))?;
let comment = match map.get("comment") {
None => "".to_string(),
Some(c) => c.to_string(),
};
let items = map.get("attrs").ok_or_else(|| {
AttrTxItemError::Decoding(req.clone(), "expected key 'attrs'".to_string())
})?;
let items = items.as_array().ok_or_else(|| {
AttrTxItemError::Decoding(items.clone(), "expected array".to_string())
})?;
if items.is_empty() {
return Err(AttrTxItemError::Decoding(
req.clone(),
"'attrs' cannot be empty".to_string(),
)
.into());
}
let res = items.iter().map(AttrTxItem::try_from).try_collect()?;
Ok((res, comment))
}
}
#[derive(Debug, thiserror::Error)]
pub enum AttrTxItemError {
#[error("Error decoding {0}: {1}")]
Decoding(serde_json::Value, String),
}
impl TryFrom<&'_ serde_json::Value> for AttrTxItem {
type Error = anyhow::Error;
fn try_from(value: &'_ serde_json::Value) -> Result<Self, Self::Error> {
let map = value.as_object().ok_or_else(|| {
AttrTxItemError::Decoding(value.clone(), "expected object".to_string())
})?;
if map.len() != 1 {
return Err(AttrTxItemError::Decoding(
value.clone(),
"object must have exactly one field".to_string(),
)
.into());
}
let (k, v) = map.into_iter().next().unwrap();
let op = match k as &str {
"put" => StoreOp::Assert,
"retract" => StoreOp::Retract,
_ => {
return Err(
AttrTxItemError::Decoding(value.clone(), format!("unknown op {}", k)).into(),
)
}
};
let attr = Attribute::try_from(v)?;
Ok(AttrTxItem { op, attr })
}
}

@ -14,3 +14,4 @@ pub(crate) mod transact;
pub(crate) mod utils;
pub use runtime::instance::Db;
pub use data::tx_attr::AttrTxItem;

@ -70,7 +70,7 @@ impl Db {
.store(tx.load_last_entity_id()?.0, Ordering::Release);
Ok(())
}
pub(crate) fn transact(&self) -> Result<SessionTx> {
pub fn transact(&self) -> Result<SessionTx> {
let ret = SessionTx {
tx: self.db.transact().set_snapshot(true).start(),
w_tx_id: None,
@ -85,7 +85,7 @@ impl Db {
};
Ok(ret)
}
pub(crate) fn transact_write(&self) -> Result<SessionTx> {
pub fn transact_write(&self) -> Result<SessionTx> {
let last_tx_id = self.last_tx_id.fetch_add(1, Ordering::AcqRel);
let cur_tx_id = TxId(last_tx_id + 1);

@ -13,7 +13,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
pub(crate) struct SessionTx {
pub struct SessionTx {
pub(crate) tx: Tx,
pub(crate) w_tx_id: Option<TxId>,
pub(crate) last_attr_id: Arc<AtomicU64>,
@ -96,7 +96,7 @@ impl SessionTx {
})
}
pub(crate) fn commit_tx(&mut self, comment: &str, refresh: bool) -> Result<()> {
pub fn commit_tx(&mut self, comment: &str, refresh: bool) -> Result<()> {
let tx_id = self.get_write_tx_id()?;
let encoded = encode_tx(tx_id);
@ -143,6 +143,8 @@ pub enum TransactError {
AttrConsistency(AttrId, String),
#[error("attribute not found {0:?}")]
AttrNotFound(AttrId),
#[error("attribute not found {0}")]
AttrNotFoundKw(Keyword),
#[error("attempt to write in read-only transaction")]
WriteInReadOnly,
#[error("attempt to change immutable property for attr {0:?}")]

@ -7,11 +7,33 @@ use crate::data::keyword::Keyword;
use crate::data::triple::StoreOp;
use crate::runtime::transact::{SessionTx, TransactError};
use crate::utils::swap_option_result;
use crate::AttrTxItem;
use anyhow::Result;
use cozorocks::{DbIter, IterBuilder};
use std::sync::atomic::Ordering;
impl SessionTx {
pub fn tx_attrs(&mut self, payloads: Vec<AttrTxItem>) -> Result<()> {
for item in payloads {
let id = item.attr.id;
let kw = item.attr.keyword.clone();
if item.op.is_retract() {
if item.attr.id.is_perm() {
self.retract_attr(item.attr.id)?;
} else {
self.retract_attr_by_kw(&item.attr.keyword)?;
}
} else if item.attr.id.is_perm() {
self.amend_attr(item.attr)?;
} else {
self.new_attr(item.attr)?;
}
self.attr_by_id_cache.remove(&id);
self.attr_by_kw_cache.remove(&kw);
}
Ok(())
}
pub(crate) fn attr_by_id(&mut self, aid: AttrId) -> Result<Option<Attribute>> {
if let Some(res) = self.attr_by_id_cache.get(&aid) {
return Ok(res.clone());
@ -155,6 +177,13 @@ impl SessionTx {
}
}
}
pub(crate) fn retract_attr_by_kw(&mut self, kw: &Keyword) -> Result<()> {
let attr = self
.attr_by_kw(kw)?
.ok_or_else(|| TransactError::AttrNotFoundKw(kw.clone()))?;
self.retract_attr(attr.id)
}
}
struct AttrIter {

Loading…
Cancel
Save