|
|
@ -34,10 +34,9 @@ use log::{info, trace};
|
|
|
|
use rand::thread_rng;
|
|
|
|
use rand::thread_rng;
|
|
|
|
use skytable::actions::Actions;
|
|
|
|
use skytable::actions::Actions;
|
|
|
|
use skytable::Connection;
|
|
|
|
use skytable::Connection;
|
|
|
|
use skytable::Query;
|
|
|
|
use skytable::{Element, Query, RespCode, Response};
|
|
|
|
|
|
|
|
use std::collections::HashSet;
|
|
|
|
use std::env;
|
|
|
|
use std::env;
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
use std::sync::Mutex;
|
|
|
|
|
|
|
|
use sysinfo::{System, SystemExt};
|
|
|
|
use sysinfo::{System, SystemExt};
|
|
|
|
|
|
|
|
|
|
|
|
pub const DEFAULT_SIZE_KV: usize = 4;
|
|
|
|
pub const DEFAULT_SIZE_KV: usize = 4;
|
|
|
@ -82,15 +81,21 @@ fn stress_linearity_concurrent_clients(mut rng: &mut impl rand::Rng) {
|
|
|
|
trace!("Will spawn a maximum of {} workers", num_workers * 2);
|
|
|
|
trace!("Will spawn a maximum of {} workers", num_workers * 2);
|
|
|
|
let mut current_thread_count = 1usize;
|
|
|
|
let mut current_thread_count = 1usize;
|
|
|
|
let mut temp_con = Connection::new("127.0.0.1", 2003).exit_error("Failed to connect to server");
|
|
|
|
let mut temp_con = Connection::new("127.0.0.1", 2003).exit_error("Failed to connect to server");
|
|
|
|
temp_con.flushdb().unwrap();
|
|
|
|
// keys can't repeat, so we use a hashset
|
|
|
|
let keys: Vec<String> = (0..DEFAULT_QUERY_COUNT)
|
|
|
|
let mut keys: HashSet<String> = HashSet::with_capacity(DEFAULT_QUERY_COUNT);
|
|
|
|
.into_iter()
|
|
|
|
(0..DEFAULT_QUERY_COUNT).into_iter().for_each(|_| {
|
|
|
|
.map(|_| ran_string(DEFAULT_SIZE_KV, &mut rng))
|
|
|
|
let mut ran = ran_string(DEFAULT_SIZE_KV, &mut rng);
|
|
|
|
.collect();
|
|
|
|
while keys.contains(&ran) {
|
|
|
|
|
|
|
|
ran = ran_string(DEFAULT_SIZE_KV, &mut rng);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
keys.insert(ran);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
// we don't care if the values do repeat
|
|
|
|
let values: Vec<String> = (0..DEFAULT_QUERY_COUNT)
|
|
|
|
let values: Vec<String> = (0..DEFAULT_QUERY_COUNT)
|
|
|
|
.into_iter()
|
|
|
|
.into_iter()
|
|
|
|
.map(|_| ran_string(DEFAULT_SIZE_KV, &mut rng))
|
|
|
|
.map(|_| ran_string(DEFAULT_SIZE_KV, &mut rng))
|
|
|
|
.collect();
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
temp_con.flushdb().unwrap();
|
|
|
|
while current_thread_count < (num_workers + 1) {
|
|
|
|
while current_thread_count < (num_workers + 1) {
|
|
|
|
log_client_linearity!(A, current_thread_count);
|
|
|
|
log_client_linearity!(A, current_thread_count);
|
|
|
|
let set_packs: Vec<Query> = keys
|
|
|
|
let set_packs: Vec<Query> = keys
|
|
|
@ -103,15 +108,14 @@ fn stress_linearity_concurrent_clients(mut rng: &mut impl rand::Rng) {
|
|
|
|
q
|
|
|
|
q
|
|
|
|
})
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
.collect();
|
|
|
|
let responses = Arc::new(Mutex::new(Vec::new()));
|
|
|
|
|
|
|
|
let workpool = Workpool::new(
|
|
|
|
let workpool = Workpool::new(
|
|
|
|
current_thread_count,
|
|
|
|
current_thread_count,
|
|
|
|
|| Connection::new("127.0.0.1", 2003).unwrap(),
|
|
|
|
|| Connection::new("127.0.0.1", 2003).unwrap(),
|
|
|
|
move |sock, query| {
|
|
|
|
move |sock, query| {
|
|
|
|
let resp = responses.clone();
|
|
|
|
assert_eq!(
|
|
|
|
resp.lock()
|
|
|
|
sock.run_simple_query(&query).unwrap(),
|
|
|
|
.unwrap()
|
|
|
|
Response::Item(Element::RespCode(RespCode::Okay))
|
|
|
|
.push(sock.run_simple_query(&query).unwrap());
|
|
|
|
);
|
|
|
|
},
|
|
|
|
},
|
|
|
|
|_| {},
|
|
|
|
|_| {},
|
|
|
|
true,
|
|
|
|
true,
|
|
|
|