1
0
Fork 0

Implement concurrent reads and writes

Disk doesn't support it yet, as it has not moved to WAL mode. Perhaps
wait for next release to do that.
master
Philip O'Toole 3 years ago
parent db1768c63f
commit 6dc9583dab

@ -54,10 +54,11 @@ func init() {
// DB is the SQL database.
type DB struct {
rwDB *sql.DB
roDB *sql.DB
path string // Path to database file.
memory bool // In-memory only.
rwDB *sql.DB
roDB *sql.DB
}
// PoolStats represents connection pool statistics
@ -92,31 +93,48 @@ type Rows struct {
// Open opens a file-based database, creating it if it does not exist.
func Open(dbPath string) (*DB, error) {
return nil, nil
// db, err := open(dbPath)
// if err != nil {
// return nil, err
// }
// db.db.SetConnMaxIdleTime(onDiskMaxIdleTime)
// db.db.SetConnMaxLifetime(0)
// db.db.SetMaxIdleConns(onDiskMaxOpenConns)
// db.db.SetMaxOpenConns(onDiskMaxOpenConns)
// return db, nil
}
rwOpts := strings.Join(
[]string{
"mode=rw",
}, "&")
// OpenInMemory returns a new in-memory database.
func OpenInMemory() (*DB, error) {
inMemPath := randomInMemoryDB()
rwDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbPath, rwOpts))
if err != nil {
return nil, err
}
// XXX NOT SURE IF WAL WILL WORK. SIMPLY COPYING THE DATABASE FILE
// WONT GET THE ENTIRE DATABASE. CAN WAL COMPACT BE FORCED?
// Create database connection for executes and queries
inMemPathRW := strings.Join(
roOpts := strings.Join(
[]string{
inMemPath + "&",
"mode=rw",
"_txlock=immediate",
"mode=ro",
}, "&")
rwDB, err := sql.Open("sqlite3", inMemPathRW)
roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbPath, roOpts))
if err != nil {
return nil, err
}
// Set some reasonable connection pool behaviour.
rwDB.SetConnMaxIdleTime(30 * time.Second)
rwDB.SetConnMaxLifetime(0)
roDB.SetConnMaxIdleTime(30 * time.Second)
roDB.SetConnMaxLifetime(0)
return &DB{
path: dbPath,
rwDB: rwDB,
roDB: roDB,
}, nil
}
// OpenInMemory returns a new in-memory database.
func OpenInMemory() (*DB, error) {
inMemPath := fmt.Sprintf("file:/%s", randomString())
//rwDB, err := sql.Open("sqlite3", fmt.Sprintf("file:/%s?%s", inMemPath, rwOpts))
rwDB, err := sql.Open("sqlite3", fmt.Sprintf("file:/%s?mode=rw&vfs=memdb&_txlock=immediate", inMemPath))
if err != nil {
return nil, err
}
@ -128,15 +146,7 @@ func OpenInMemory() (*DB, error) {
rwDB.SetMaxIdleConns(1)
rwDB.SetMaxOpenConns(1)
// Create database connection just for queries
inMemPathRO := strings.Join(
[]string{
inMemPath + "&",
"mode=ro",
"_txlock=deferred",
}, "&")
roDB, err := sql.Open("sqlite3", inMemPathRO)
roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:/%s?mode=ro&vfs=memdb&_txlock=deferred", inMemPath))
if err != nil {
return nil, err
}
@ -147,10 +157,10 @@ func OpenInMemory() (*DB, error) {
}, nil
}
// LoadInMemory loads an in-memory database with that at the path.
// LoadIntoMemory loads an in-memory database with that at the path.
// Not safe to call while other operations are happening with the
// source database.
func LoadInMemory(dbPath string) (*DB, error) {
func LoadIntoMemory(dbPath string) (*DB, error) {
dstDB, err := OpenInMemory()
if err != nil {
return nil, err
@ -172,42 +182,27 @@ func LoadInMemory(dbPath string) (*DB, error) {
return dstDB, nil
}
// DeserializeInMemory loads an in-memory database with that contained
// DeserializeIntoMemory loads an in-memory database with that contained
// in the byte slide. The byte slice must not be changed or garbage-collected
// until after this function returns.
func DeserializeInMemory(b []byte) (retDB *DB, retErr error) {
tmpDB, err := OpenInMemory()
func DeserializeIntoMemory(b []byte) (retDB *DB, retErr error) {
// Get a plain-ol' in-memory database.
tmpDB, err := sql.Open("sqlite3", ":memory:")
if err != nil {
return nil, fmt.Errorf("DeserializeInMemory: %s", err.Error())
return nil, fmt.Errorf("DeserializeIntoMemory: %s", err.Error())
}
defer tmpDB.Close()
tmpConn, err := tmpDB.rwDB.Conn(context.Background())
tmpConn, err := tmpDB.Conn(context.Background())
if err != nil {
return nil, err
}
if err := tmpConn.Raw(func(driverConn interface{}) error {
c := driverConn.(*sqlite3.SQLiteConn)
err2 := c.Deserialize(b, "")
if err2 != nil {
return fmt.Errorf("DeserializeInMemory: %s", err.Error())
}
return nil
}); err != nil {
return nil, err
}
// Testing shows closing the temp conn is necessary.
if err := tmpConn.Close(); err != nil {
return nil, fmt.Errorf("DeserializeInMemory: %s", err.Error())
}
// tmpDB is still using memory in Go space, so tmpDB needs to be explicitly
// copied to a new database.
// tmpDB will still be using memory in Go space, so tmpDB needs to be explicitly
// copied to a new database, which we create now.
db, err := OpenInMemory()
if err != nil {
return nil, fmt.Errorf("DeserializeInMemory: %s", err.Error())
return nil, fmt.Errorf("DeserializeIntoMemory: %s", err.Error())
}
defer func() {
// Don't leak a database if deserialization fails.
@ -216,8 +211,28 @@ func DeserializeInMemory(b []byte) (retDB *DB, retErr error) {
}
}()
if err := copyDatabase(db, tmpDB); err != nil {
return nil, fmt.Errorf("DeserializeInMemory: %s", err.Error())
if err := tmpConn.Raw(func(driverConn interface{}) error {
srcConn := driverConn.(*sqlite3.SQLiteConn)
err2 := srcConn.Deserialize(b, "")
if err2 != nil {
return fmt.Errorf("DeserializeIntoMemory: %s", err.Error())
}
defer srcConn.Close()
// Now copy from tmp database to the database this function will return.
dbConn, err3 := db.rwDB.Conn(context.Background())
if err3 != nil {
return fmt.Errorf("DeserializeIntoMemory: %s", err.Error())
}
defer dbConn.Close()
return dbConn.Raw(func(driverConn interface{}) error {
dstConn := driverConn.(*sqlite3.SQLiteConn)
return copyDatabaseConnection(dstConn, srcConn)
})
}); err != nil {
return nil, err
}
return db, nil
@ -231,24 +246,6 @@ func (db *DB) Close() error {
return db.roDB.Close()
}
func open(dbPath string) (*DB, error) {
return nil, nil
// db, err := sql.Open("sqlite3", dbPath)
// if err != nil {
// return nil, err
// }
// // Ensure database is basically healthy.
// if err := db.Ping(); err != nil {
// return nil, fmt.Errorf("Ping database: %s", err.Error())
// }
// return &DB{
// db: db,
// path: dbPath,
// }, nil
}
// EnableFKConstraints allows control of foreign key constraint checks.
func (db *DB) EnableFKConstraints(e bool) error {
q := fkChecksEnabled
@ -260,27 +257,6 @@ func (db *DB) EnableFKConstraints(e bool) error {
return err
}
// FKConstraints returns whether FK constraints are set or not.
func (db *DB) FKConstraints() (bool, error) {
r, err := db.QueryStringStmt(fkChecks)
if err != nil {
return false, err
}
if r[0].Values[0][0] == int64(1) {
return true, nil
}
return false, nil
}
// JournalMode returns the current journal mode.
func (db *DB) JournalMode() (string, error) {
r, err := db.QueryStringStmt(journalCheck)
if err != nil {
return "", err
}
return r[0].Values[0][0].(string), nil
}
// Size returns the size of the database in bytes. "Size" is defined as
// page_count * schema.page_size.
func (db *DB) Size() (int64, error) {
@ -724,28 +700,7 @@ func copyDatabase(dst *DB, src *DB) error {
// Define the backup function.
bf := func(driverConn interface{}) error {
srcSQLiteConn := driverConn.(*sqlite3.SQLiteConn)
bk, err := dstSQLiteConn.Backup("main", srcSQLiteConn, "main")
if err != nil {
return err
}
for {
done, err := bk.Step(-1)
if err != nil {
bk.Finish()
return err
}
if done {
break
}
time.Sleep(bkDelay * time.Millisecond)
}
if err := bk.Finish(); err != nil {
return err
}
return nil
return copyDatabaseConnection(dstSQLiteConn, srcSQLiteConn)
}
return dstConn.Raw(
@ -755,6 +710,26 @@ func copyDatabase(dst *DB, src *DB) error {
})
}
func copyDatabaseConnection(dst, src *sqlite3.SQLiteConn) error {
bk, err := dst.Backup("main", src, "main")
if err != nil {
return err
}
for {
done, err := bk.Step(-1)
if err != nil {
bk.Finish()
return err
}
if done {
break
}
time.Sleep(bkDelay * time.Millisecond)
}
return bk.Finish()
}
// parametersToValues maps values in the proto params to SQL driver values.
func parametersToValues(parameters []*command.Parameter) ([]interface{}, error) {
if parameters == nil {
@ -811,7 +786,7 @@ func isTextType(t string) bool {
strings.HasPrefix(t, "clob")
}
func randomInMemoryDB() string {
func randomString() string {
var output strings.Builder
chars := "abcdedfghijklmnopqrstABCDEFGHIJKLMNOP"
for i := 0; i < 20; i++ {
@ -819,5 +794,5 @@ func randomInMemoryDB() string {
randomChar := chars[random]
output.WriteString(string(randomChar))
}
return fmt.Sprintf("file:/%s?vfs=memdb", output.String())
return output.String()
}

@ -102,7 +102,7 @@ func Test_SQLiteMasterTable(t *testing.T) {
}
}
func Test_LoadInMemory(t *testing.T) {
func Test_LoadIntoMemory(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
defer os.Remove(path)
@ -120,7 +120,7 @@ func Test_LoadInMemory(t *testing.T) {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
inmem, err := LoadInMemory(path)
inmem, err := LoadIntoMemory(path)
if err != nil {
t.Fatalf("failed to create loaded in-memory database: %s", err.Error())
}
@ -135,7 +135,7 @@ func Test_LoadInMemory(t *testing.T) {
}
}
func Test_DeserializeInMemory(t *testing.T) {
func Test_DeserializeIntoMemory(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
defer os.Remove(path)
@ -174,7 +174,7 @@ func Test_DeserializeInMemory(t *testing.T) {
t.Fatalf("failed to read database on disk: %s", err.Error())
}
newDB, err := DeserializeInMemory(b)
newDB, err := DeserializeIntoMemory(b)
if err != nil {
t.Fatalf("failed to deserialize database: %s", err.Error())
}
@ -709,89 +709,6 @@ func Test_CommonTableExpressions(t *testing.T) {
}
}
func Test_ForeignKeyConstraints(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
defer os.Remove(path)
_, err := db.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, ref INTEGER REFERENCES foo(id))")
if err != nil {
t.Fatalf("failed to create table: %s", err.Error())
}
// Explicitly disable constraints.
if err := db.EnableFKConstraints(false); err != nil {
t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
}
// Check constraints
fk, err := db.FKConstraints()
if err != nil {
t.Fatalf("failed to check FK constraints: %s", err.Error())
}
if fk != false {
t.Fatal("FK constraints are not disabled")
}
r, err := db.ExecuteStringStmt(`INSERT INTO foo(id, ref) VALUES(1, 2)`)
if err != nil {
t.Fatalf("failed to execute FK test statement: %s", err.Error())
}
if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Explicitly enable constraints.
if err := db.EnableFKConstraints(true); err != nil {
t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
}
// Check constraints
fk, err = db.FKConstraints()
if err != nil {
t.Fatalf("failed to check FK constraints: %s", err.Error())
}
if fk != true {
t.Fatal("FK constraints are not enabled")
}
r, err = db.ExecuteStringStmt(`INSERT INTO foo(id, ref) VALUES(1, 3)`)
if err != nil {
t.Fatalf("failed to execute FK test statement: %s", err.Error())
}
if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_JournalMode(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
defer os.Remove(path)
m, err := db.JournalMode()
if err != nil {
t.Fatalf("failed to check journal mode: %s", err.Error())
}
if exp, got := "delete", m; exp != got {
t.Fatalf("got wrong mode for journal, expected %s, got %s", exp, got)
}
_, err = db.ExecuteStringStmt(`PRAGMA journal_mode=off`)
if err != nil {
t.Fatalf(`failed to execute 'PRAGMA journal_mode' statement: %s`, err.Error())
}
m, err = db.JournalMode()
if err != nil {
t.Fatalf("failed to check journal mode: %s", err.Error())
}
if exp, got := "off", m; exp != got {
t.Fatalf("got wrong mode for journal, expected %s, got %s", exp, got)
}
}
func Test_UniqueConstraints(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
@ -1170,7 +1087,7 @@ func Test_DumpMemory(t *testing.T) {
defer db.Close()
defer os.Remove(path)
inmem, err := LoadInMemory(path)
inmem, err := LoadIntoMemory(path)
if err != nil {
t.Fatalf("failed to create loaded in-memory database: %s", err.Error())
}

@ -450,25 +450,14 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
fkEnabled, err := s.db.FKConstraints()
if err != nil {
return nil, err
}
dbSz, err := s.db.Size()
if err != nil {
return nil, err
}
jm, err := s.db.JournalMode()
if err != nil {
return nil, err
}
dbStatus := map[string]interface{}{
"fk_constraints": enabledFromBool(fkEnabled),
"version": sql.DBVersion,
"db_size": dbSz,
"conn_pool_stats": s.db.ConnectionPoolStats(),
"journal_mode": jm,
}
if s.dbConf.Memory {
dbStatus["path"] = ":memory:"
@ -756,7 +745,7 @@ func (s *Store) createInMemory(b []byte) (db *sql.DB, err error) {
if b == nil {
db, err = sql.OpenInMemory()
} else {
db, err = sql.DeserializeInMemory(b)
db, err = sql.DeserializeIntoMemory(b)
}
return
}

Loading…
Cancel
Save