make cf creation/deletion thread-safe

main
Ziyang Hu 2 years ago
parent f9fa75765d
commit fe44c00e3d

@ -5,7 +5,7 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <mutex> #include <shared_mutex>
#include "rust/cxx.h" #include "rust/cxx.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -14,6 +14,12 @@
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
typedef std::shared_mutex Lock;
typedef std::unique_lock <Lock> WriteLock;
typedef std::shared_lock <Lock> ReadLock;
using namespace ROCKSDB_NAMESPACE; using namespace ROCKSDB_NAMESPACE;
using std::unique_ptr; using std::unique_ptr;
using std::shared_ptr; using std::shared_ptr;
@ -235,13 +241,14 @@ inline unique_ptr <WriteBatchBridge> new_write_batch_raw() {
struct DBBridge { struct DBBridge {
mutable unique_ptr <DB> db; mutable unique_ptr <DB> db;
mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles; mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles;
mutable std::mutex handle_lock; mutable Lock handle_lock;
DBBridge(DB *db_, DBBridge(DB *db_,
unordered_map <string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {} unordered_map <string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {}
inline shared_ptr <ColumnFamilyHandle> get_cf_handle_raw(const string &name) const { inline shared_ptr <ColumnFamilyHandle> get_cf_handle_raw(const string &name) const {
ReadLock r_lock(handle_lock);
try { try {
return handles.at(name); return handles.at(name);
} catch (const std::out_of_range &) { } catch (const std::out_of_range &) {
@ -311,20 +318,22 @@ struct DBBridge {
} }
inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const { inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const {
{
ReadLock r_lock(handle_lock);
if (handles.find(name) != handles.end()) { if (handles.find(name) != handles.end()) {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2); write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2);
return; return;
} }
handle_lock.lock(); }
WriteLock w_lock(handle_lock);
ColumnFamilyHandle *handle; ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options.inner, name, &handle); auto s = db->CreateColumnFamily(options.inner, name, &handle);
write_status(std::move(s), status); write_status(std::move(s), status);
handles[name] = shared_ptr<ColumnFamilyHandle>(handle); handles[name] = shared_ptr<ColumnFamilyHandle>(handle);
handle_lock.unlock();
} }
inline void drop_column_family_raw(const string &name, BridgeStatus &status) const { inline void drop_column_family_raw(const string &name, BridgeStatus &status) const {
handle_lock.lock(); WriteLock w_lock(handle_lock);
auto cf_it = handles.find(name); auto cf_it = handles.find(name);
if (cf_it != handles.end()) { if (cf_it != handles.end()) {
auto s = db->DropColumnFamily(cf_it->second.get()); auto s = db->DropColumnFamily(cf_it->second.get());
@ -333,11 +342,11 @@ struct DBBridge {
} else { } else {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3);
} }
handle_lock.unlock();
// When should we call DestroyColumnFamilyHandle? // When should we call DestroyColumnFamilyHandle?
} }
inline unique_ptr <vector<string>> get_column_family_names_raw() const { inline unique_ptr <vector<string>> get_column_family_names_raw() const {
ReadLock r_lock(handle_lock);
auto ret = make_unique < vector < string >> (); auto ret = make_unique < vector < string >> ();
for (auto entry: handles) { for (auto entry: handles) {
ret->push_back(entry.first); ret->push_back(entry.first);

Loading…
Cancel
Save