@ -24,10 +24,10 @@ import (
"unsafe"
"unsafe"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command"
legacy "github.com/rqlite/rqlite/command/legacy"
legacy "github.com/rqlite/rqlite/command/legacy"
sql "github.com/rqlite/rqlite/db"
sql "github.com/rqlite/rqlite/db"
rlog "github.com/rqlite/rqlite/log"
)
)
var (
var (
@ -122,12 +122,16 @@ type Store struct {
reqMarshaller * command . RequestMarshaler // Request marshaler for writing to log.
reqMarshaller * command . RequestMarshaler // Request marshaler for writing to log.
raftLog raft . LogStore // Persistent log store.
raftLog raft . LogStore // Persistent log store.
raftStable raft . StableStore // Persistent k-v store.
raftStable raft . StableStore // Persistent k-v store.
boltStore * r aftboltdb. BoltStore // Physical store.
boltStore * r log. Log // Physical store.
lastIdxOnOpen uint64 // Last index on log when Store opens.
onDiskCreated bool // On disk database actually created?
firstLogAppliedT time . Time // Time first log is applied
snapsExistOnOpen bool // Any snaps present when store opens?
appliedOnOpen uint64 // Number of logs applied at open.
firstIdxOnOpen uint64 // First index on log when Store opens.
openT time . Time // Timestamp when Store opens.
lastIdxOnOpen uint64 // Last index on log when Store opens.
lastCommandIdxOnOpen uint64 // Last command index on log when Store opens.
firstLogAppliedT time . Time // Time first log is applied
appliedOnOpen uint64 // Number of logs applied at open.
openT time . Time // Timestamp when Store opens.
txMu sync . RWMutex // Sync between snapshots and query-level transactions.
txMu sync . RWMutex // Sync between snapshots and query-level transactions.
queryMu sync . RWMutex // Sync queries generally with other operations.
queryMu sync . RWMutex // Sync queries generally with other operations.
@ -200,13 +204,6 @@ func (s *Store) Open(enableBootstrap bool) error {
return err
return err
}
}
// Open underlying database.
db , err := s . open ( )
if err != nil {
return err
}
s . db = db
// Create Raft-compatible network layer.
// Create Raft-compatible network layer.
s . raftTn = raft . NewNetworkTransport ( NewTransport ( s . ln ) , connectionPoolCount , connectionTimeout , nil )
s . raftTn = raft . NewNetworkTransport ( NewTransport ( s . ln ) , connectionPoolCount , connectionTimeout , nil )
@ -221,11 +218,17 @@ func (s *Store) Open(enableBootstrap bool) error {
if err != nil {
if err != nil {
return fmt . Errorf ( "file snapshot store: %s" , err )
return fmt . Errorf ( "file snapshot store: %s" , err )
}
}
snaps , err := snapshots . List ( )
if err != nil {
return fmt . Errorf ( "list snapshots: %s" , err )
}
s . logger . Printf ( "%d preexisting snapshots present" , len ( snaps ) )
s . snapsExistOnOpen = len ( snaps ) > 0
// Create the log store and stable store.
// Create the log store and stable store.
s . boltStore , err = raftboltdb . NewBoltStore ( filepath . Join ( s . raftDir , raftDBPath ) )
s . boltStore , err = r log. NewLog ( filepath . Join ( s . raftDir , raftDBPath ) )
if err != nil {
if err != nil {
return fmt . Errorf ( "new bolt store: %s" , err )
return fmt . Errorf ( "new log store: %s", err )
}
}
s . raftStable = s . boltStore
s . raftStable = s . boltStore
s . raftLog , err = raft . NewLogCache ( raftLogCacheSize , s . boltStore )
s . raftLog , err = raft . NewLogCache ( raftLogCacheSize , s . boltStore )
@ -234,9 +237,29 @@ func (s *Store) Open(enableBootstrap bool) error {
}
}
// Get some info about the log, before any more entries are committed.
// Get some info about the log, before any more entries are committed.
s . lastIdxOnOpen , err = s . raftLog . LastIndex ( )
if err := s . setLogInfo ( ) ; err != nil {
if err != nil {
return fmt . Errorf ( "set log info: %s" , err )
return fmt . Errorf ( "failed to get last index: %s" , err )
}
s . logger . Printf ( "first log index: %d, last log index: %d, last command log index: %d:" ,
s . firstIdxOnOpen , s . lastIdxOnOpen , s . lastCommandIdxOnOpen )
// If an on-disk database has been requested, and there are no snapshots, and
// there are no commands in the log, then this is the only opportunity to
// create that on-disk database file before Raft initializes.
if ! s . dbConf . Memory && ! s . snapsExistOnOpen && s . lastCommandIdxOnOpen == 0 {
db , err := s . openOnDisk ( )
if err != nil {
return fmt . Errorf ( "failed to open on-disk database" )
}
s . db = db
s . onDiskCreated = true
} else {
// We need an in-memory database, at least for bootstrapping purposes.
db , err := s . openInMemory ( )
if err != nil {
return fmt . Errorf ( "failed to open in-memory database" )
}
s . db = db
}
}
// Instantiate the Raft system.
// Instantiate the Raft system.
@ -437,13 +460,15 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"version" : sql . DBVersion ,
"version" : sql . DBVersion ,
"db_size" : dbSz ,
"db_size" : dbSz ,
}
}
if ! s . dbConf . Memory {
if s . dbConf . Memory {
dbStatus [ "path" ] = ":memory:"
} else {
dbStatus [ "path" ] = s . dbPath
dbStatus [ "path" ] = s . dbPath
if dbStatus [ "size" ] , err = s . db . FileSize ( ) ; err != nil {
if s . onDiskCreated {
return nil , err
if dbStatus [ "size" ] , err = s . db . FileSize ( ) ; err != nil {
return nil , err
}
}
}
} else {
dbStatus [ "path" ] = ":memory:"
}
}
nodes , err := s . Nodes ( )
nodes , err := s . Nodes ( )
@ -771,31 +796,47 @@ func (s *Store) setMetadata(id string, md map[string]string) error {
return nil
return nil
}
}
// open opens the in-memory or file-based database.
// openInMemory opens an empty in-memory database using the Store's configured DSN.
func ( s * Store ) open ( ) ( * sql . DB , error ) {
func ( s * Store ) openInMemory ( ) ( * sql . DB , error ) {
var db * sql . DB
db , err := sql . OpenInMemoryWithDSN ( s . dbConf . DSN )
var err error
if err != nil {
if s . dbConf . Memory {
return nil , err
db , err = sql . OpenInMemoryWithDSN ( s . dbConf . DSN )
}
if err != nil {
return db , nil
return nil , err
}
}
s . logger . Printf ( "SQLite %s database opened" , s . databaseTypePretty ( ) )
// openOnDisk opens an empty on-disk database at the Store's configured path.
} else {
func ( s * Store ) openOnDisk ( ) ( * sql . DB , error ) {
// Explicitly remove any pre-existing SQLite database file as it will be
// Explicitly remove any pre-existing SQLite database file as it will be
// completely rebuilt from committed log entries (and possibly a snapshot).
// completely rebuilt from committed log entries (and possibly a snapshot).
if err := os . Remove ( s . dbPath ) ; err != nil && ! os . IsNotExist ( err ) {
if err := os . Remove ( s . dbPath ) ; err != nil && ! os . IsNotExist ( err ) {
return nil , err
return nil , err
}
}
db , err = sql . OpenWithDSN ( s . dbPath , s . dbConf . DSN )
db , err := sql . OpenWithDSN ( s . dbPath , s . dbConf . DSN )
if err != nil {
if err != nil {
return nil , err
return nil , err
}
s . logger . Printf ( "SQLite %s database opened at %s" , s . databaseTypePretty ( ) , s . dbPath )
}
}
return db , nil
return db , nil
}
}
// setLogInfo records some key indexes about the log.
//
// It reads the first index, the last index, and the last command index from
// the Store's underlying log store, and saves them in firstIdxOnOpen,
// lastIdxOnOpen, and lastCommandIdxOnOpen respectively. The lookups run in
// that order, and the function returns at the first one that fails, with an
// error naming which lookup failed; fields for the lookups after the failing
// one are not assigned.
func (s *Store) setLogInfo() error {
	var err error
	s.firstIdxOnOpen, err = s.boltStore.FirstIndex()
	if err != nil {
		// Name the first index here so this failure can be told apart from
		// a LastIndex failure below.
		return fmt.Errorf("failed to get first index: %s", err)
	}
	s.lastIdxOnOpen, err = s.boltStore.LastIndex()
	if err != nil {
		return fmt.Errorf("failed to get last index: %s", err)
	}
	s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex()
	if err != nil {
		return fmt.Errorf("failed to get last command index: %s", err)
	}
	return nil
}
// remove removes the node, with the given ID, from the cluster.
// remove removes the node, with the given ID, from the cluster.
func ( s * Store ) remove ( id string ) error {
func ( s * Store ) remove ( id string ) error {
if s . raft . State ( ) != raft . Leader {
if s . raft . State ( ) != raft . Leader {
@ -877,13 +918,48 @@ type fsmGenericResponse struct {
}
}
// Apply applies a Raft log entry to the database.
// Apply applies a Raft log entry to the database.
func ( s * Store ) Apply ( l * raft . Log ) interface { } {
func ( s * Store ) Apply ( l * raft . Log ) ( e interface { } ) {
defer func ( ) {
defer func ( ) {
if l . Index <= s . lastIdxOnOpen {
if l . Index <= s . lastCommandIdxOnOpen {
// In here means at least one command entry was in the log when the Store
// opened.
s . appliedOnOpen ++
s . appliedOnOpen ++
if l . Index == s . lastIdxOnOpen {
if l . Index == s . last Command IdxOnOpen {
s . logger . Printf ( "%d committed log entries applied in %s, took %s since open" ,
s . logger . Printf ( "%d committed log entries applied in %s, took %s since open" ,
s . appliedOnOpen , time . Since ( s . firstLogAppliedT ) , time . Since ( s . openT ) )
s . appliedOnOpen , time . Since ( s . firstLogAppliedT ) , time . Since ( s . openT ) )
// Last command log applied. Time to switch to on-disk database?
if s . dbConf . Memory {
s . logger . Println ( "continuing use of in-memory database" )
} else {
// Since we're here, it means that a) an on-disk database was requested
// *and* there were commands in the log. A snapshot may or may not have
// been applied, but it wouldn't have created the on-disk database in that
// case since there were commands in the log. This is the very last chance
// to do it.
b , err := s . db . Serialize ( )
if err != nil {
e = & fsmGenericResponse { error : fmt . Errorf ( "serialize failed: %s" , err ) }
return
}
if err := s . db . Close ( ) ; err != nil {
e = & fsmGenericResponse { error : fmt . Errorf ( "close failed: %s" , err ) }
return
}
// Write new database to file on disk
if err := ioutil . WriteFile ( s . dbPath , b , 0660 ) ; err != nil {
e = & fsmGenericResponse { error : fmt . Errorf ( "write failed: %s" , err ) }
return
}
// Re-open it.
s . db , err = sql . OpenWithDSN ( s . dbPath , s . dbConf . DSN )
if err != nil {
e = & fsmGenericResponse { error : fmt . Errorf ( "open on-disk failed: %s" , err ) }
}
s . onDiskCreated = true
s . logger . Println ( "successfully switched to on-disk database" )
}
}
}
}
}
} ( )
} ( )
@ -1070,8 +1146,11 @@ func (s *Store) Restore(rc io.ReadCloser) error {
}
}
var db * sql . DB
var db * sql . DB
if ! s . dbConf . Memory {
if ! s . dbConf . Memory && s . lastCommandIdxOnOpen == 0 {
// Write snapshot over any existing database file.
// A snapshot clearly exists (this function has been called) but there
// are no command entries in the log -- so Apply will not be called.
// Therefore this is the last opportunity to create the on-disk database
// before Raft starts.
if err := ioutil . WriteFile ( s . dbPath , database , 0660 ) ; err != nil {
if err := ioutil . WriteFile ( s . dbPath , database , 0660 ) ; err != nil {
return err
return err
}
}
@ -1081,8 +1160,14 @@ func (s *Store) Restore(rc io.ReadCloser) error {
if err != nil {
if err != nil {
return fmt . Errorf ( "open with DSN: %s" , err )
return fmt . Errorf ( "open with DSN: %s" , err )
}
}
s . onDiskCreated = true
s . logger . Println ( "successfully switched to on-disk database due to restore" )
} else {
} else {
// In memory, so directly deserialize into the database.
// Deserialize into an in-memory database because a) an in-memory database
// has been requested, or b) while there was a snapshot, there are also
// command entries in the log. So by sticking with an in-memory database
// those entries will be applied in the fastest possible manner. We will
// defer creation of any database on disk until the Apply function.
db , err = sql . DeserializeInMemoryWithDSN ( database , s . dbConf . DSN )
db , err = sql . DeserializeInMemoryWithDSN ( database , s . dbConf . DSN )
if err != nil {
if err != nil {
return fmt . Errorf ( "DeserializeInMemoryWithDSN with DSN: %s" , err )
return fmt . Errorf ( "DeserializeInMemoryWithDSN with DSN: %s" , err )
@ -1093,8 +1178,6 @@ func (s *Store) Restore(rc io.ReadCloser) error {
return fmt . Errorf ( "failed to close pre-restore database: %s" , err )
return fmt . Errorf ( "failed to close pre-restore database: %s" , err )
}
}
s . db = db
s . db = db
s . logger . Printf ( "successfully restored %s database from snapshot" ,
s . databaseTypePretty ( ) )
// Unmarshal remaining bytes, and set to cluster meta.
// Unmarshal remaining bytes, and set to cluster meta.
err = func ( ) error {
err = func ( ) error {