1
0
Fork 0

Merge pull request #1700 from rqlite/fix-non-open-store-panics

Fix non open store panics
master
Philip O'Toole 7 months ago committed by GitHub
commit c6920d23e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,3 +1,7 @@
## 8.21.3 (unreleased)
### Implementation changes and bug fixes
- [PR #1700](https://github.com/rqlite/rqlite/pull/1700): Accessing non-open Store shouldn't panic. Fixes issue [#1698](https://github.com/rqlite/rqlite/issues/1698).
## 8.21.2 (February 23rd 2024) ## 8.21.2 (February 23rd 2024)
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #1697](https://github.com/rqlite/rqlite/pull/1697): Use consistent file perms post Boot. - [PR #1697](https://github.com/rqlite/rqlite/pull/1697): Use consistent file perms post Boot.

@ -2,6 +2,7 @@ package store
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -43,3 +44,28 @@ func (t *AtomicTime) Sub(tt *AtomicTime) time.Duration {
defer t.mu.RUnlock() defer t.mu.RUnlock()
return t.t.Sub(tt.t) return t.t.Sub(tt.t)
} }
// AtomicBool is a boolean with atomic operations.
type AtomicBool struct {
state int32 // 1 for true, 0 for false
}
// NewAtomicBool returns a new AtomicBool initialized to false.
func NewAtomicBool() *AtomicBool {
return &AtomicBool{state: 0}
}
// Set sets the AtomicBool to true.
func (b *AtomicBool) Set() {
atomic.StoreInt32(&b.state, 1)
}
// Unset sets the AtomicBool to false.
func (b *AtomicBool) Unset() {
atomic.StoreInt32(&b.state, 0)
}
// Is returns true if the AtomicBool is true, false otherwise.
func (b *AtomicBool) Is() bool {
return atomic.LoadInt32(&b.state) == 1
}

@ -238,7 +238,7 @@ const (
// Store is a SQLite database, where all changes are made via Raft consensus. // Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct { type Store struct {
open bool open *AtomicBool
raftDir string raftDir string
snapshotDir string snapshotDir string
peersPath string peersPath string
@ -364,6 +364,7 @@ func New(ly Layer, c *Config) *Store {
} }
return &Store{ return &Store{
open: NewAtomicBool(),
ly: ly, ly: ly,
raftDir: c.Dir, raftDir: c.Dir,
snapshotDir: filepath.Join(c.Dir, snapshotsDirName), snapshotDir: filepath.Join(c.Dir, snapshotsDirName),
@ -398,7 +399,7 @@ func New(ly Layer, c *Config) *Store {
// and setting the restore path means the Store will not report // and setting the restore path means the Store will not report
// itself as ready until a restore has been attempted. // itself as ready until a restore has been attempted.
func (s *Store) SetRestorePath(path string) error { func (s *Store) SetRestorePath(path string) error {
if s.open { if s.open.Is() {
return ErrOpen return ErrOpen
} }
@ -415,11 +416,11 @@ func (s *Store) SetRestorePath(path string) error {
func (s *Store) Open() (retErr error) { func (s *Store) Open() (retErr error) {
defer func() { defer func() {
if retErr == nil { if retErr == nil {
s.open = true s.open.Set()
} }
}() }()
if s.open { if s.open.Is() {
return ErrOpen return ErrOpen
} }
@ -591,6 +592,9 @@ func (s *Store) Bootstrap(servers ...*Server) error {
// the cluster. If this node is not the leader, and 'wait' is true, an error // the cluster. If this node is not the leader, and 'wait' is true, an error
// will be returned. // will be returned.
func (s *Store) Stepdown(wait bool) error { func (s *Store) Stepdown(wait bool) error {
if !s.open.Is() {
return ErrNotOpen
}
f := s.raft.LeadershipTransfer() f := s.raft.LeadershipTransfer()
if !wait { if !wait {
return nil return nil
@ -650,10 +654,10 @@ func (s *Store) Close(wait bool) (retErr error) {
defer func() { defer func() {
if retErr == nil { if retErr == nil {
s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String()) s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
s.open = false s.open.Unset()
} }
}() }()
if !s.open { if !s.open.Is() {
// Protect against closing already-closed resource, such as channels. // Protect against closing already-closed resource, such as channels.
return nil return nil
} }
@ -760,11 +764,17 @@ func (s *Store) DBAppliedIndex() uint64 {
// IsLeader is used to determine if the current node is cluster leader // IsLeader is used to determine if the current node is cluster leader
func (s *Store) IsLeader() bool { func (s *Store) IsLeader() bool {
if !s.open.Is() {
return false
}
return s.raft.State() == raft.Leader return s.raft.State() == raft.Leader
} }
// HasLeader returns true if the cluster has a leader, false otherwise. // HasLeader returns true if the cluster has a leader, false otherwise.
func (s *Store) HasLeader() bool { func (s *Store) HasLeader() bool {
if !s.open.Is() {
return false
}
return s.raft.Leader() != "" return s.raft.Leader() != ""
} }
@ -772,6 +782,9 @@ func (s *Store) HasLeader() bool {
// is no reference to the current node in the current cluster configuration then // is no reference to the current node in the current cluster configuration then
// false will also be returned. // false will also be returned.
func (s *Store) IsVoter() (bool, error) { func (s *Store) IsVoter() (bool, error) {
if !s.open.Is() {
return false, ErrNotOpen
}
cfg := s.raft.GetConfiguration() cfg := s.raft.GetConfiguration()
if err := cfg.Error(); err != nil { if err := cfg.Error(); err != nil {
return false, err return false, err
@ -786,6 +799,9 @@ func (s *Store) IsVoter() (bool, error) {
// State returns the current node's Raft state // State returns the current node's Raft state
func (s *Store) State() ClusterState { func (s *Store) State() ClusterState {
if !s.open.Is() {
return Unknown
}
state := s.raft.State() state := s.raft.State()
switch state { switch state {
case raft.Leader: case raft.Leader:
@ -813,7 +829,7 @@ func (s *Store) Path() string {
// Addr returns the address of the store. // Addr returns the address of the store.
func (s *Store) Addr() string { func (s *Store) Addr() string {
if !s.open { if !s.open.Is() {
return "" return ""
} }
return string(s.raftTn.LocalAddr()) return string(s.raftTn.LocalAddr())
@ -827,7 +843,7 @@ func (s *Store) ID() string {
// LeaderAddr returns the address of the current leader. Returns a // LeaderAddr returns the address of the current leader. Returns a
// blank string if there is no leader or if the Store is not open. // blank string if there is no leader or if the Store is not open.
func (s *Store) LeaderAddr() (string, error) { func (s *Store) LeaderAddr() (string, error) {
if !s.open { if !s.open.Is() {
return "", nil return "", nil
} }
addr, _ := s.raft.LeaderWithID() addr, _ := s.raft.LeaderWithID()
@ -837,7 +853,7 @@ func (s *Store) LeaderAddr() (string, error) {
// LeaderID returns the node ID of the Raft leader. Returns a // LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error. // blank string if there is no leader, or an error.
func (s *Store) LeaderID() (string, error) { func (s *Store) LeaderID() (string, error) {
if !s.open { if !s.open.Is() {
return "", nil return "", nil
} }
_, id := s.raft.LeaderWithID() _, id := s.raft.LeaderWithID()
@ -847,7 +863,7 @@ func (s *Store) LeaderID() (string, error) {
// LeaderWithID is used to return the current leader address and ID of the cluster. // LeaderWithID is used to return the current leader address and ID of the cluster.
// It may return empty strings if there is no current leader or the leader is unknown. // It may return empty strings if there is no current leader or the leader is unknown.
func (s *Store) LeaderWithID() (string, string) { func (s *Store) LeaderWithID() (string, string) {
if !s.open { if !s.open.Is() {
return "", "" return "", ""
} }
addr, id := s.raft.LeaderWithID() addr, id := s.raft.LeaderWithID()
@ -856,6 +872,9 @@ func (s *Store) LeaderWithID() (string, string) {
// CommitIndex returns the Raft commit index. // CommitIndex returns the Raft commit index.
func (s *Store) CommitIndex() (uint64, error) { func (s *Store) CommitIndex() (uint64, error) {
if !s.open.Is() {
return 0, ErrNotOpen
}
return s.raft.CommitIndex(), nil return s.raft.CommitIndex(), nil
} }
@ -863,6 +882,9 @@ func (s *Store) CommitIndex() (uint64, error) {
// by the latest AppendEntries RPC. If this node is the Leader then the // by the latest AppendEntries RPC. If this node is the Leader then the
// commit index is returned directly from the Raft object. // commit index is returned directly from the Raft object.
func (s *Store) LeaderCommitIndex() (uint64, error) { func (s *Store) LeaderCommitIndex() (uint64, error) {
if !s.open.Is() {
return 0, ErrNotOpen
}
if s.raft.State() == raft.Leader { if s.raft.State() == raft.Leader {
return s.raft.CommitIndex(), nil return s.raft.CommitIndex(), nil
} }
@ -871,7 +893,7 @@ func (s *Store) LeaderCommitIndex() (uint64, error) {
// Nodes returns the slice of nodes in the cluster, sorted by ID ascending. // Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) { func (s *Store) Nodes() ([]*Server, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -989,7 +1011,7 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro
// Stats returns stats for the store. // Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) { func (s *Store) Stats() (map[string]interface{}, error) {
if !s.open { if !s.open.Is() {
return map[string]interface{}{ return map[string]interface{}{
"open": false, "open": false,
}, nil }, nil
@ -1099,7 +1121,7 @@ func (s *Store) Stats() (map[string]interface{}, error) {
// Execute executes queries that return no rows, but do modify the database. // Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) { func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -1142,7 +1164,7 @@ func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error
// Query executes queries that return rows, and do not modify the database. // Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) { func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -1201,7 +1223,7 @@ func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
// Request processes a request that may contain both Executes and Queries. // Request processes a request that may contain both Executes and Queries.
func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) { func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
nRW, _ := s.RORWCount(eqr) nRW, _ := s.RORWCount(eqr)
@ -1283,7 +1305,7 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
// will be written directly to that file. Otherwise a temporary file will be created, // will be written directly to that file. Otherwise a temporary file will be created,
// and that temporary file copied to dst. // and that temporary file copied to dst.
func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) { func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1368,7 +1390,7 @@ func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
// Loads an entire SQLite file into the database, sending the request // Loads an entire SQLite file into the database, sending the request
// through the Raft log. // through the Raft log.
func (s *Store) Load(lr *proto.LoadRequest) error { func (s *Store) Load(lr *proto.LoadRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1538,7 +1560,7 @@ func (s *Store) Database(leader bool) ([]byte, error) {
// //
// Notifying is idempotent. A node may repeatedly notify the Store without issue. // Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (s *Store) Notify(nr *proto.NotifyRequest) error { func (s *Store) Notify(nr *proto.NotifyRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1597,7 +1619,7 @@ func (s *Store) Notify(nr *proto.NotifyRequest) error {
// Join joins a node, identified by id and located at addr, to this store. // Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address. // The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(jr *proto.JoinRequest) error { func (s *Store) Join(jr *proto.JoinRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1665,7 +1687,7 @@ func (s *Store) Join(jr *proto.JoinRequest) error {
// Remove removes a node from the store. // Remove removes a node from the store.
func (s *Store) Remove(rn *proto.RemoveNodeRequest) error { func (s *Store) Remove(rn *proto.RemoveNodeRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
id := rn.Id id := rn.Id

@ -22,6 +22,47 @@ import (
"github.com/rqlite/rqlite/v8/testdata/chinook" "github.com/rqlite/rqlite/v8/testdata/chinook"
) )
// Test_StoreSingleNode tests that a non-open Store handles public methods correctly.
func Test_NonOpenStore(t *testing.T) {
s, ln := mustNewStore(t)
defer s.Close(true)
defer ln.Close()
if err := s.Stepdown(false); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if s.IsLeader() {
t.Fatalf("store incorrectly marked as leader")
}
if s.HasLeader() {
t.Fatalf("store incorrectly marked as having leader")
}
if _, err := s.IsVoter(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if s.State() != Unknown {
t.Fatalf("wrong cluster state returned for non-open store")
}
if _, err := s.CommitIndex(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.LeaderCommitIndex(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if addr, err := s.LeaderAddr(); addr != "" || err != nil {
t.Fatalf("wrong leader address returned for non-open store: %s", addr)
}
if id, err := s.LeaderID(); id != "" || err != nil {
t.Fatalf("wrong leader ID returned for non-open store: %s", id)
}
if addr, id := s.LeaderWithID(); addr != "" || id != "" {
t.Fatalf("wrong leader address and ID returned for non-open store: %s", id)
}
if _, err := s.Nodes(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
}
// Test_StoreSingleNode tests that a single node basically operates. // Test_StoreSingleNode tests that a single node basically operates.
func Test_OpenStoreSingleNode(t *testing.T) { func Test_OpenStoreSingleNode(t *testing.T) {
s, ln := mustNewStore(t) s, ln := mustNewStore(t)
@ -495,7 +536,7 @@ func Test_OpenStoreCloseSingleNode(t *testing.T) {
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
if !s.open { if !s.open.Is() {
t.Fatalf("store not marked as open") t.Fatalf("store not marked as open")
} }
@ -522,7 +563,7 @@ func Test_OpenStoreCloseSingleNode(t *testing.T) {
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
if s.open { if s.open.Is() {
t.Fatalf("store still marked as open") t.Fatalf("store still marked as open")
} }

Loading…
Cancel
Save