1
0
Fork 0

Use new AtomicBool

master
Philip O'Toole 7 months ago
parent 80f4a39ea1
commit 35348d393a

@ -238,7 +238,7 @@ const (
// Store is a SQLite database, where all changes are made via Raft consensus. // Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct { type Store struct {
open bool open *AtomicBool
raftDir string raftDir string
snapshotDir string snapshotDir string
peersPath string peersPath string
@ -398,7 +398,7 @@ func New(ly Layer, c *Config) *Store {
// and setting the restore path means the Store will not report // and setting the restore path means the Store will not report
// itself as ready until a restore has been attempted. // itself as ready until a restore has been attempted.
func (s *Store) SetRestorePath(path string) error { func (s *Store) SetRestorePath(path string) error {
if s.open { if s.open.Is() {
return ErrOpen return ErrOpen
} }
@ -415,11 +415,11 @@ func (s *Store) SetRestorePath(path string) error {
func (s *Store) Open() (retErr error) { func (s *Store) Open() (retErr error) {
defer func() { defer func() {
if retErr == nil { if retErr == nil {
s.open = true s.open.Set()
} }
}() }()
if s.open { if s.open.Is() {
return ErrOpen return ErrOpen
} }
@ -591,7 +591,7 @@ func (s *Store) Bootstrap(servers ...*Server) error {
// the cluster. If this node is not the leader, and 'wait' is true, an error // the cluster. If this node is not the leader, and 'wait' is true, an error
// will be returned. // will be returned.
func (s *Store) Stepdown(wait bool) error { func (s *Store) Stepdown(wait bool) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
f := s.raft.LeadershipTransfer() f := s.raft.LeadershipTransfer()
@ -653,10 +653,10 @@ func (s *Store) Close(wait bool) (retErr error) {
defer func() { defer func() {
if retErr == nil { if retErr == nil {
s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String()) s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
s.open = false s.open.Unset()
} }
}() }()
if !s.open { if !s.open.Is() {
// Protect against closing already-closed resource, such as channels. // Protect against closing already-closed resource, such as channels.
return nil return nil
} }
@ -763,7 +763,7 @@ func (s *Store) DBAppliedIndex() uint64 {
// IsLeader is used to determine if the current node is cluster leader // IsLeader is used to determine if the current node is cluster leader
func (s *Store) IsLeader() bool { func (s *Store) IsLeader() bool {
if !s.open { if !s.open.Is() {
return false return false
} }
return s.raft.State() == raft.Leader return s.raft.State() == raft.Leader
@ -771,7 +771,7 @@ func (s *Store) IsLeader() bool {
// HasLeader returns true if the cluster has a leader, false otherwise. // HasLeader returns true if the cluster has a leader, false otherwise.
func (s *Store) HasLeader() bool { func (s *Store) HasLeader() bool {
if !s.open { if !s.open.Is() {
return false return false
} }
return s.raft.Leader() != "" return s.raft.Leader() != ""
@ -781,7 +781,7 @@ func (s *Store) HasLeader() bool {
// is no reference to the current node in the current cluster configuration then // is no reference to the current node in the current cluster configuration then
// false will also be returned. // false will also be returned.
func (s *Store) IsVoter() (bool, error) { func (s *Store) IsVoter() (bool, error) {
if !s.open { if !s.open.Is() {
return false, ErrNotOpen return false, ErrNotOpen
} }
cfg := s.raft.GetConfiguration() cfg := s.raft.GetConfiguration()
@ -798,7 +798,7 @@ func (s *Store) IsVoter() (bool, error) {
// State returns the current node's Raft state // State returns the current node's Raft state
func (s *Store) State() ClusterState { func (s *Store) State() ClusterState {
if !s.open { if !s.open.Is() {
return Unknown return Unknown
} }
state := s.raft.State() state := s.raft.State()
@ -828,7 +828,7 @@ func (s *Store) Path() string {
// Addr returns the address of the store. // Addr returns the address of the store.
func (s *Store) Addr() string { func (s *Store) Addr() string {
if !s.open { if !s.open.Is() {
return "" return ""
} }
return string(s.raftTn.LocalAddr()) return string(s.raftTn.LocalAddr())
@ -842,7 +842,7 @@ func (s *Store) ID() string {
// LeaderAddr returns the address of the current leader. Returns a // LeaderAddr returns the address of the current leader. Returns a
// blank string if there is no leader or if the Store is not open. // blank string if there is no leader or if the Store is not open.
func (s *Store) LeaderAddr() (string, error) { func (s *Store) LeaderAddr() (string, error) {
if !s.open { if !s.open.Is() {
return "", nil return "", nil
} }
addr, _ := s.raft.LeaderWithID() addr, _ := s.raft.LeaderWithID()
@ -852,7 +852,7 @@ func (s *Store) LeaderAddr() (string, error) {
// LeaderID returns the node ID of the Raft leader. Returns a // LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error. // blank string if there is no leader, or an error.
func (s *Store) LeaderID() (string, error) { func (s *Store) LeaderID() (string, error) {
if !s.open { if !s.open.Is() {
return "", nil return "", nil
} }
_, id := s.raft.LeaderWithID() _, id := s.raft.LeaderWithID()
@ -862,7 +862,7 @@ func (s *Store) LeaderID() (string, error) {
// LeaderWithID is used to return the current leader address and ID of the cluster. // LeaderWithID is used to return the current leader address and ID of the cluster.
// It may return empty strings if there is no current leader or the leader is unknown. // It may return empty strings if there is no current leader or the leader is unknown.
func (s *Store) LeaderWithID() (string, string) { func (s *Store) LeaderWithID() (string, string) {
if !s.open { if !s.open.Is() {
return "", "" return "", ""
} }
addr, id := s.raft.LeaderWithID() addr, id := s.raft.LeaderWithID()
@ -871,7 +871,7 @@ func (s *Store) LeaderWithID() (string, string) {
// CommitIndex returns the Raft commit index. // CommitIndex returns the Raft commit index.
func (s *Store) CommitIndex() (uint64, error) { func (s *Store) CommitIndex() (uint64, error) {
if !s.open { if !s.open.Is() {
return 0, ErrNotOpen return 0, ErrNotOpen
} }
return s.raft.CommitIndex(), nil return s.raft.CommitIndex(), nil
@ -881,7 +881,7 @@ func (s *Store) CommitIndex() (uint64, error) {
// by the latest AppendEntries RPC. If this node is the Leader then the // by the latest AppendEntries RPC. If this node is the Leader then the
// commit index is returned directly from the Raft object. // commit index is returned directly from the Raft object.
func (s *Store) LeaderCommitIndex() (uint64, error) { func (s *Store) LeaderCommitIndex() (uint64, error) {
if !s.open { if !s.open.Is() {
return 0, ErrNotOpen return 0, ErrNotOpen
} }
if s.raft.State() == raft.Leader { if s.raft.State() == raft.Leader {
@ -892,7 +892,7 @@ func (s *Store) LeaderCommitIndex() (uint64, error) {
// Nodes returns the slice of nodes in the cluster, sorted by ID ascending. // Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) { func (s *Store) Nodes() ([]*Server, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -1010,7 +1010,7 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro
// Stats returns stats for the store. // Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) { func (s *Store) Stats() (map[string]interface{}, error) {
if !s.open { if !s.open.Is() {
return map[string]interface{}{ return map[string]interface{}{
"open": false, "open": false,
}, nil }, nil
@ -1120,7 +1120,7 @@ func (s *Store) Stats() (map[string]interface{}, error) {
// Execute executes queries that return no rows, but do modify the database. // Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) { func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -1163,7 +1163,7 @@ func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error
// Query executes queries that return rows, and do not modify the database. // Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) { func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
@ -1222,7 +1222,7 @@ func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
// Request processes a request that may contain both Executes and Queries. // Request processes a request that may contain both Executes and Queries.
func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) { func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) {
if !s.open { if !s.open.Is() {
return nil, ErrNotOpen return nil, ErrNotOpen
} }
nRW, _ := s.RORWCount(eqr) nRW, _ := s.RORWCount(eqr)
@ -1304,7 +1304,7 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
// will be written directly to that file. Otherwise a temporary file will be created, // will be written directly to that file. Otherwise a temporary file will be created,
// and that temporary file copied to dst. // and that temporary file copied to dst.
func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) { func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1389,7 +1389,7 @@ func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
// Loads an entire SQLite file into the database, sending the request // Loads an entire SQLite file into the database, sending the request
// through the Raft log. // through the Raft log.
func (s *Store) Load(lr *proto.LoadRequest) error { func (s *Store) Load(lr *proto.LoadRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1559,7 +1559,7 @@ func (s *Store) Database(leader bool) ([]byte, error) {
// //
// Notifying is idempotent. A node may repeatedly notify the Store without issue. // Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (s *Store) Notify(nr *proto.NotifyRequest) error { func (s *Store) Notify(nr *proto.NotifyRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1618,7 +1618,7 @@ func (s *Store) Notify(nr *proto.NotifyRequest) error {
// Join joins a node, identified by id and located at addr, to this store. // Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address. // The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(jr *proto.JoinRequest) error { func (s *Store) Join(jr *proto.JoinRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
@ -1686,7 +1686,7 @@ func (s *Store) Join(jr *proto.JoinRequest) error {
// Remove removes a node from the store. // Remove removes a node from the store.
func (s *Store) Remove(rn *proto.RemoveNodeRequest) error { func (s *Store) Remove(rn *proto.RemoveNodeRequest) error {
if !s.open { if !s.open.Is() {
return ErrNotOpen return ErrNotOpen
} }
id := rn.Id id := rn.Id

@ -536,7 +536,7 @@ func Test_OpenStoreCloseSingleNode(t *testing.T) {
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
if !s.open { if !s.open.Is() {
t.Fatalf("store not marked as open") t.Fatalf("store not marked as open")
} }
@ -563,7 +563,7 @@ func Test_OpenStoreCloseSingleNode(t *testing.T) {
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
if s.open { if s.open.Is() {
t.Fatalf("store still marked as open") t.Fatalf("store still marked as open")
} }

Loading…
Cancel
Save