1
0
Fork 0

Port DB layer to sql/database

Empty statements now cause panic.
master
Philip O'Toole 3 years ago
parent eb8dcf74ff
commit 561bb47a14

@ -5,13 +5,12 @@ package db
import (
"context"
"database/sql"
"database/sql/driver"
"expvar"
"fmt"
"io"
"math/rand"
"os"
"strings"
"sync"
"time"
"github.com/rqlite/go-sqlite3"
@ -21,6 +20,10 @@ import (
const bkDelay = 250
const (
connMaxIdleTime = time.Duration(30 * time.Second)
maxOpenConns = 128
maxIdleConns = 16
fkChecks = "PRAGMA foreign_keys"
fkChecksEnabled = "PRAGMA foreign_keys=ON"
fkChecksDisabled = "PRAGMA foreign_keys=OFF"
@ -51,12 +54,23 @@ func init() {
// DB is the SQL database.
type DB struct {
db *sql.DB // Std library database connection
sqlite3conn *sqlite3.SQLiteConn // Driver connection to database.
path string // Path to database file.
dsn string // DSN, if any.
memory bool // In-memory only.
mu sync.Mutex // Serialize use of DB driver connection
db *sql.DB // Std library database connection
path string // Path to database file.
dsn string // DSN, if any.
memory bool // In-memory only.
}
// PoolStats represents connection pool statistics
type PoolStats struct {
MaxOpenConnections int `json:"max_open_connections"`
OpenConnections int `json:"open_connections"`
InUse int `json:"in_use"`
Idle int `json:"idle"`
WaitCount int64 `json:"wait_count"`
WaitDuration time.Duration `json:"wait_duration"`
MaxIdleClosed int64 `json:"max_idle_closed"`
MaxIdleTimeClosed int64 `json:"max_idle_time_closed"`
MaxLifetimeClosed int64 `json:"max_lifetime_closed"`
}
// Result represents the outcome of an operation that changes rows.
@ -88,18 +102,19 @@ func OpenWithDSN(dbPath, dsn string) (*DB, error) {
// OpenInMemory opens an in-memory database.
func OpenInMemory() (*DB, error) {
return open(fqdsn("file:rqlite?mode=memory", ""))
return open(fqdsn(randomInMemoryDB(), ""))
}
// OpenInMemoryWithDSN opens an in-memory database with a specific DSN.
func OpenInMemoryWithDSN(dsn string) (*DB, error) {
return open(fqdsn("file:rqlite?mode=memory", dsn))
return open(fqdsn(randomInMemoryDB(), dsn))
}
// LoadInMemoryWithDSN loads an in-memory database with that at the path,
// with the specified DSN
// with the specified DSN. Not safe to call while other operations
// are happening with the source database.
func LoadInMemoryWithDSN(dbPath, dsn string) (*DB, error) {
db, err := OpenInMemoryWithDSN(dsn)
dstDB, err := OpenInMemoryWithDSN(dsn)
if err != nil {
return nil, err
}
@ -109,7 +124,7 @@ func LoadInMemoryWithDSN(dbPath, dsn string) (*DB, error) {
return nil, err
}
if err := copyDatabase(db.sqlite3conn, srcDB.sqlite3conn); err != nil {
if err := copyDatabase(dstDB, srcDB); err != nil {
return nil, err
}
@ -117,7 +132,7 @@ func LoadInMemoryWithDSN(dbPath, dsn string) (*DB, error) {
return nil, err
}
return db, nil
return dstDB, nil
}
// DeserializeInMemoryWithDSN loads an in-memory database with that contained
@ -130,18 +145,35 @@ func DeserializeInMemoryWithDSN(b []byte, dsn string) (*DB, error) {
}
defer tmpDB.Close()
if err := tmpDB.sqlite3conn.Deserialize(b, ""); err != nil {
tmpConn, err := tmpDB.db.Conn(context.Background())
if err != nil {
return nil, err
}
if err := tmpConn.Raw(func(driverConn interface{}) error {
c := driverConn.(*sqlite3.SQLiteConn)
err2 := c.Deserialize(b, "")
if err2 != nil {
return fmt.Errorf("DeserializeInMemoryWithDSN: %s", err.Error())
}
return nil
}); err != nil {
return nil, err
}
// Testing shows closing the temp conn is necessary.
if err := tmpConn.Close(); err != nil {
return nil, fmt.Errorf("DeserializeInMemoryWithDSN: %s", err.Error())
}
// tmpDB is still using memory in Go space, so it needs to be explicitly
// tmpDB is still using memory in Go space, so tmpDB needs to be explicitly
// copied to a new database.
db, err := OpenInMemoryWithDSN(dsn)
if err != nil {
return nil, fmt.Errorf("DeserializeInMemoryWithDSN: %s", err.Error())
}
if err := copyDatabase(db.sqlite3conn, tmpDB.sqlite3conn); err != nil {
if err := copyDatabase(db, tmpDB); err != nil {
return nil, fmt.Errorf("DeserializeInMemoryWithDSN: %s", err.Error())
}
@ -150,9 +182,6 @@ func DeserializeInMemoryWithDSN(b []byte, dsn string) (*DB, error) {
// Close closes the underlying database connection.
func (db *DB) Close() error {
if err := db.sqlite3conn.Close(); err != nil {
return err
}
return db.db.Close()
}
@ -162,16 +191,21 @@ func open(dbPath string) (*DB, error) {
return nil, err
}
d := sqlite3.SQLiteDriver{}
dbc, err := d.Open(dbPath)
if err != nil {
return nil, err
// Configure connection pool parameters such that rqlite behavior
// remains generally similar to previous versions that didn't use
// a pool. The pool may be configurable in a future release
db.SetConnMaxIdleTime(connMaxIdleTime)
db.SetMaxIdleConns(maxIdleConns)
db.SetMaxOpenConns(maxOpenConns)
// Ensure database is basically healthy.
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("Ping database: %s", err.Error())
}
return &DB{
db: db,
sqlite3conn: dbc.(*sqlite3.SQLiteConn),
path: dbPath,
db: db,
path: dbPath,
}, nil
}
@ -255,106 +289,103 @@ func (db *DB) ExecuteStringStmt(query string) ([]*Result, error) {
func (db *DB) Execute(req *command.Request, xTime bool) ([]*Result, error) {
stats.Add(numExecutions, int64(len(req.Statements)))
tx := req.Transaction
if tx {
stats.Add(numETx, 1)
conn, err := db.db.Conn(context.Background())
if err != nil {
return nil, err
}
defer conn.Close()
var allResults []*Result
err := func() error {
// // Check for the err, if set rollback.
// defer func() {
// if t != nil {
// if rollback {
// t.Rollback()
// return
// }
// t.Commit()
// }
// }()
// handleError sets the error field on the given result. It returns
// whether the caller should continue processing or break.
handleError := func(result *Result, err error) bool {
stats.Add(numExecutionErrors, 1)
result.Error = err.Error()
allResults = append(allResults, result)
return true
}
// // Create the correct execution object, depending on whether a
// // transaction was requested.
// if false {
// t, err = db.sqlite3conn.Begin()
// if err != nil {
// return err
// }
// }
conn, err := db.db.Conn(context.Background())
type Execer interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
var execer Execer
var tx *sql.Tx
if req.Transaction {
stats.Add(numETx, 1)
tx, err = conn.BeginTx(context.Background(), nil)
if err != nil {
return err
return nil, err
}
defer conn.Close()
// Execute each statement.
for _, stmt := range req.Statements {
ss := stmt.Sql
if ss == "" {
continue
defer func() {
if tx != nil {
tx.Rollback() // Will be ignored if tx is committed
}
}()
execer = tx
} else {
execer = conn
}
result := &Result{}
start := time.Now()
var allResults []*Result
parameters, err := parametersToValues(stmt.Parameters)
if err != nil {
if handleError(result, err) {
continue
}
break
}
// handleError sets the error field on the given result. It returns
// whether the caller should continue processing or break.
handleError := func(result *Result, err error) bool {
stats.Add(numExecutionErrors, 1)
result.Error = err.Error()
allResults = append(allResults, result)
if tx != nil {
tx.Rollback()
tx = nil
return false
}
return true
}
r, err := conn.ExecContext(context.Background(), ss, parameters...)
if err != nil {
if handleError(result, err) {
continue
}
break
}
// Execute each statement.
for _, stmt := range req.Statements {
ss := stmt.Sql
result := &Result{}
start := time.Now()
if r == nil {
parameters, err := parametersToValues(stmt.Parameters)
if err != nil {
if handleError(result, err) {
continue
}
break
}
lid, err := r.LastInsertId()
if err != nil {
if handleError(result, err) {
continue
}
break
r, err := execer.ExecContext(context.Background(), ss, parameters...)
if err != nil {
if handleError(result, err) {
continue
}
result.LastInsertID = lid
break
}
ra, err := r.RowsAffected()
if err != nil {
if handleError(result, err) {
continue
}
break
}
result.RowsAffected = ra
if xTime {
result.Time = time.Now().Sub(start).Seconds()
if r == nil {
continue
}
lid, err := r.LastInsertId()
if err != nil {
if handleError(result, err) {
continue
}
allResults = append(allResults, result)
break
}
result.LastInsertID = lid
return nil
}()
ra, err := r.RowsAffected()
if err != nil {
if handleError(result, err) {
continue
}
break
}
result.RowsAffected = ra
if xTime {
result.Time = time.Now().Sub(start).Seconds()
}
allResults = append(allResults, result)
}
if tx != nil {
err = tx.Commit()
}
return allResults, err
}
@ -373,126 +404,112 @@ func (db *DB) QueryStringStmt(query string) ([]*Rows, error) {
// Query executes queries that return rows, but don't modify the database.
func (db *DB) Query(req *command.Request, xTime bool) ([]*Rows, error) {
stats.Add(numQueries, int64(len(req.Statements)))
conn, err := db.db.Conn(context.Background())
if err != nil {
return nil, err
}
defer conn.Close()
return db.queryWithConn(req, xTime, conn)
}
tx := req.Transaction
if tx {
func (db *DB) queryWithConn(req *command.Request, xTime bool, conn *sql.Conn) ([]*Rows, error) {
var err error
type Queryer interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
var queryer Queryer
var tx *sql.Tx
if req.Transaction {
stats.Add(numQTx, 1)
tx, err = conn.BeginTx(context.Background(), nil)
if err != nil {
return nil, err
}
defer tx.Rollback() // Will be ignored if tx is committed
queryer = tx
} else {
queryer = conn
}
var allRows []*Rows
err := func() (err error) {
var t driver.Tx
defer func() {
// XXX THIS DOESN'T ACTUALLY WORK! Might as WELL JUST COMMIT?
if t != nil {
if err != nil {
t.Rollback()
return
}
t.Commit()
}
}()
// Create the correct query object, depending on whether a
// transaction was requested. XXXX NEED FIXING
if tx {
t, err = db.sqlite3conn.Begin()
if err != nil {
return err
}
for _, stmt := range req.Statements {
sql := stmt.Sql
if sql == "" {
continue
}
conn, err := db.db.Conn(context.Background())
rows := &Rows{}
start := time.Now()
parameters, err := parametersToValues(stmt.Parameters)
if err != nil {
return err
rows.Error = err.Error()
allRows = append(allRows, rows)
continue
}
defer conn.Close()
for _, stmt := range req.Statements {
sql := stmt.Sql
if sql == "" {
continue
}
rows := &Rows{}
start := time.Now()
parameters, err := parametersToValues(stmt.Parameters)
if err != nil {
rows.Error = err.Error()
allRows = append(allRows, rows)
continue
}
rs, err := conn.QueryContext(context.Background(), sql, parameters...)
if err != nil {
rows.Error = err.Error()
allRows = append(allRows, rows)
continue
}
defer rs.Close()
rs, err := queryer.QueryContext(context.Background(), sql, parameters...)
if err != nil {
rows.Error = err.Error()
allRows = append(allRows, rows)
continue
}
defer rs.Close()
rows.Columns, err = rs.Columns()
if err != nil {
return err
}
rows.Columns, err = rs.Columns()
if err != nil {
return nil, err
}
types, err := rs.ColumnTypes()
if err != nil {
return err
}
rows.Types = make([]string, len(types))
for i := range types {
rows.Types[i] = strings.ToLower(types[i].DatabaseTypeName())
}
types, err := rs.ColumnTypes()
if err != nil {
return nil, err
}
rows.Types = make([]string, len(types))
for i := range types {
rows.Types[i] = strings.ToLower(types[i].DatabaseTypeName())
}
dest := make([]interface{}, 0)
for rs.Next() {
var v driver.Value
if err := rs.Scan(dest); err != nil {
return err
}
dest = append(dest, v)
values := normalizeRowValues(dest, rows.Types)
rows.Values = append(rows.Values, values)
for rs.Next() {
dest := make([]interface{}, len(rows.Columns))
ptrs := make([]interface{}, len(dest))
for i := range ptrs {
ptrs[i] = &dest[i]
}
// Check for errors from iterating over rows.
if err := rs.Err(); err != nil {
return err
if err := rs.Scan(ptrs...); err != nil {
return nil, err
}
values := normalizeRowValues(dest, rows.Types)
rows.Values = append(rows.Values, values)
}
if xTime {
rows.Time = time.Now().Sub(start).Seconds()
}
allRows = append(allRows, rows)
// Check for errors from iterating over rows.
if err := rs.Err(); err != nil {
return nil, err
}
return nil
}()
if xTime {
rows.Time = time.Now().Sub(start).Seconds()
}
allRows = append(allRows, rows)
}
if tx != nil {
err = tx.Commit()
}
return allRows, err
}
// Backup writes a consistent snapshot of the database to the given file.
// This function can be called when changes to the database are in flight.
func (db *DB) Backup(path string) error {
db.mu.Lock()
defer db.mu.Unlock()
dstDB, err := Open(path)
if err != nil {
return err
}
defer func(db *DB, err *error) {
cerr := db.sqlite3conn.Close()
if *err == nil {
*err = cerr
}
}(dstDB, &err)
if err := copyDatabase(dstDB.sqlite3conn, db.sqlite3conn); err != nil {
if err := copyDatabase(dstDB, db); err != nil {
return fmt.Errorf("backup database: %s", err)
}
return nil
@ -503,10 +520,7 @@ func (db *DB) Backup(path string) error {
// on-disk database. This function can be called when changes to the source
// database are in flight.
func (db *DB) Copy(dstDB *DB) error {
db.mu.Lock()
defer db.mu.Unlock()
if err := copyDatabase(dstDB.sqlite3conn, db.sqlite3conn); err != nil {
if err := copyDatabase(dstDB, db); err != nil {
return fmt.Errorf("copy database: %s", err)
}
return nil
@ -518,12 +532,24 @@ func (db *DB) Copy(dstDB *DB) error {
// is the same sequence of bytes which would be written to disk if that database
// were backed up to disk.
func (db *DB) Serialize() ([]byte, error) {
db.mu.Lock()
defer db.mu.Unlock()
conn, err := db.db.Conn(context.Background())
if err != nil {
return nil, err
}
defer conn.Close()
b := db.sqlite3conn.Serialize("")
if b == nil {
return nil, fmt.Errorf("failed to serialize database")
var b []byte
f := func(driverConn interface{}) error {
c := driverConn.(*sqlite3.SQLiteConn)
b = c.Serialize("")
if b == nil {
return fmt.Errorf("failed to serialize database")
}
return nil
}
if err := conn.Raw(f); err != nil {
return nil, err
}
return b, nil
}
@ -531,34 +557,31 @@ func (db *DB) Serialize() ([]byte, error) {
// Dump writes a consistent snapshot of the database in SQL text format.
// This function can be called when changes to the database are in flight.
func (db *DB) Dump(w io.Writer) error {
if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
return err
}
// Get a new connection, so the dump creation is isolated from other activity.
dstDB, err := OpenInMemory()
conn, err := db.db.Conn(context.Background())
if err != nil {
return err
}
defer func(db *DB, err *error) {
cerr := db.sqlite3conn.Close()
if *err == nil {
*err = cerr
defer conn.Close()
// Convenience function to convert string query to protobuf.
commReq := func(query string) *command.Request {
return &command.Request{
Statements: []*command.Statement{
{
Sql: query,
},
},
}
}(dstDB, &err)
}
if err := func() error {
db.mu.Lock()
defer db.mu.Unlock()
return copyDatabase(dstDB.sqlite3conn, db.sqlite3conn)
}(); err != nil {
if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
return err
}
// Get the schema.
query := `SELECT "name", "type", "sql" FROM "sqlite_master"
WHERE "sql" NOT NULL AND "type" == 'table' ORDER BY "name"`
rows, err := dstDB.QueryStringStmt(query)
rows, err := db.queryWithConn(commReq(query), false, conn)
if err != nil {
return err
}
@ -582,7 +605,8 @@ func (db *DB) Dump(w io.Writer) error {
}
tableIndent := strings.Replace(table, `"`, `""`, -1)
r, err := dstDB.QueryStringStmt(fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent))
r, err := db.queryWithConn(commReq(fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent)),
false, conn)
if err != nil {
return err
}
@ -595,7 +619,8 @@ func (db *DB) Dump(w io.Writer) error {
tableIndent,
strings.Join(columnNames, ","),
tableIndent)
r, err = dstDB.QueryStringStmt(query)
r, err = db.queryWithConn(commReq(query), false, conn)
if err != nil {
return err
}
@ -610,7 +635,7 @@ func (db *DB) Dump(w io.Writer) error {
// Do indexes, triggers, and views.
query = `SELECT "name", "type", "sql" FROM "sqlite_master"
WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
rows, err = dstDB.QueryStringStmt(query)
rows, err = db.queryWithConn(commReq(query), false, conn)
if err != nil {
return err
}
@ -628,29 +653,52 @@ func (db *DB) Dump(w io.Writer) error {
return nil
}
func copyDatabase(dst *sqlite3.SQLiteConn, src *sqlite3.SQLiteConn) error {
bk, err := dst.Backup("main", src, "main")
func copyDatabase(dst *DB, src *DB) error {
dstConn, err := dst.db.Conn(context.Background())
if err != nil {
return err
}
defer dstConn.Close()
srcConn, err := src.db.Conn(context.Background())
if err != nil {
return err
}
defer srcConn.Close()
var dstSQLiteConn *sqlite3.SQLiteConn
for {
done, err := bk.Step(-1)
// Define the backup function.
bf := func(driverConn interface{}) error {
srcSQLiteConn := driverConn.(*sqlite3.SQLiteConn)
bk, err := dstSQLiteConn.Backup("main", srcSQLiteConn, "main")
if err != nil {
bk.Finish()
return err
}
if done {
break
for {
done, err := bk.Step(-1)
if err != nil {
bk.Finish()
return err
}
if done {
break
}
time.Sleep(bkDelay * time.Millisecond)
}
time.Sleep(bkDelay * time.Millisecond)
}
if err := bk.Finish(); err != nil {
return err
if err := bk.Finish(); err != nil {
return err
}
return nil
}
return nil
return dstConn.Raw(
func(driverConn interface{}) error {
dstSQLiteConn = driverConn.(*sqlite3.SQLiteConn)
return srcConn.Raw(bf)
})
}
// parametersToValues maps values in the proto params to SQL driver values.
@ -716,3 +764,14 @@ func fqdsn(path, dsn string) string {
}
return path
}
func randomInMemoryDB() string {
var output strings.Builder
chars := "abcdedfghijklmnopqrstABCDEFGHIJKLMNOP"
for i := 0; i < 20; i++ {
random := rand.Intn(len(chars))
randomChar := chars[random]
output.WriteString(string(randomChar))
}
return fmt.Sprintf("file:%s?mode=memory", output.String())
}

@ -56,6 +56,31 @@ func Test_TableCreation(t *testing.T) {
}
}
// Test_TableCreationInMemory tests basic operation of an in-memory database,
// ensuring that using different connection objects (as the Execute and Query
// will do) works properly i.e. that the connections object work on the same
// in-memory database.
func Test_TableCreationInMemory(t *testing.T) {
db := mustCreateInMemoryDatabase()
defer db.Close()
r, err := db.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)")
if err != nil {
t.Fatalf("failed to create table: %s", err.Error())
}
if exp, got := `[{}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
q, err := db.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query empty table: %s", err.Error())
}
if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(q); exp != got {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
}
func Test_SQLiteMasterTable(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
@ -785,48 +810,6 @@ func Test_DBFileSize(t *testing.T) {
}
}
func Test_ActiveTransaction(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
defer os.Remove(path)
if db.TransactionActive() {
t.Fatal("transaction incorrectly marked as active")
}
if _, err := db.ExecuteStringStmt(`BEGIN`); err != nil {
t.Fatalf("error starting transaction: %s", err.Error())
}
if !db.TransactionActive() {
t.Fatal("transaction incorrectly marked as inactive")
}
if _, err := db.ExecuteStringStmt(`COMMIT`); err != nil {
t.Fatalf("error starting transaction: %s", err.Error())
}
if db.TransactionActive() {
t.Fatal("transaction incorrectly marked as active")
}
if _, err := db.ExecuteStringStmt(`BEGIN`); err != nil {
t.Fatalf("error starting transaction: %s", err.Error())
}
if !db.TransactionActive() {
t.Fatal("transaction incorrectly marked as inactive")
}
if _, err := db.ExecuteStringStmt(`ROLLBACK`); err != nil {
t.Fatalf("error starting transaction: %s", err.Error())
}
if db.TransactionActive() {
t.Fatal("transaction incorrectly marked as active")
}
}
func Test_AbortTransaction(t *testing.T) {
db, path := mustCreateDatabase()
defer db.Close()
@ -840,17 +823,9 @@ func Test_AbortTransaction(t *testing.T) {
t.Fatalf("error starting transaction: %s", err.Error())
}
if !db.TransactionActive() {
t.Fatal("transaction incorrectly marked as inactive")
}
if err := db.AbortTransaction(); err != nil {
t.Fatalf("error abrorting non-active transaction: %s", err.Error())
}
if db.TransactionActive() {
t.Fatal("transaction incorrectly marked as active")
}
}
func Test_PartialFail(t *testing.T) {
@ -1214,6 +1189,14 @@ func mustCreateDatabase() (*DB, string) {
return db, f
}
func mustCreateInMemoryDatabase() *DB {
db, err := OpenInMemory()
if err != nil {
panic("failed to open in-memory database")
}
return db
}
func mustWriteAndOpenDatabase(b []byte) (*DB, string) {
var err error
f := mustTempFile()

@ -460,10 +460,11 @@ func (s *Store) Stats() (map[string]interface{}, error) {
return nil, err
}
dbStatus := map[string]interface{}{
"dsn": s.dbConf.DSN,
"fk_constraints": enabledFromBool(fkEnabled),
"version": sql.DBVersion,
"db_size": dbSz,
"dsn": s.dbConf.DSN,
"fk_constraints": enabledFromBool(fkEnabled),
"version": sql.DBVersion,
"db_size": dbSz,
"conn_pool_stats": s.db.ConnectionPoolStats(),
}
if s.dbConf.Memory {
dbStatus["path"] = ":memory:"

Loading…
Cancel
Save