1
0
Fork 0

Add method to Store that returns leader ID

With the advent of Raft IDs, this distinction matters.
master
Philip O'Toole 5 years ago
parent 3bcbf82dc3
commit e376812907

@ -39,8 +39,8 @@ type Transport interface {
// Store represents a store of information, managed via consensus.
type Store interface {
// Leader returns the leader of the consensus system.
Leader() string
// Leader returns the address of the leader of the consensus system.
LeaderAddr() string
// UpdateAPIPeers updates the API peers on the store.
UpdateAPIPeers(peers map[string]string) error
@ -102,10 +102,10 @@ func (s *Service) SetPeer(raftAddr, apiAddr string) error {
}
// Try talking to the leader over the network.
if leader := s.store.Leader(); leader == "" {
if leader := s.store.LeaderAddr(); leader == "" {
return fmt.Errorf("no leader available")
}
conn, err := s.tn.Dial(s.store.Leader(), connectionTimeout)
conn, err := s.tn.Dial(s.store.LeaderAddr(), connectionTimeout)
if err != nil {
return err
}

@ -178,7 +178,7 @@ func newMockStore() *mockStore {
}
}
func (ms *mockStore) Leader() string {
func (ms *mockStore) LeaderAddr() string {
return ms.leader
}

@ -49,8 +49,8 @@ type Store interface {
// Remove removes the node, specified by addr, from the cluster.
Remove(addr string) error
// Leader returns the Raft leader of the cluster.
Leader() string
// LeaderAddr returns the Raft address of the leader of the cluster.
LeaderAddr() string
// Peer returns the API peer for the given address
Peer(addr string) string
@ -309,7 +309,7 @@ func (s *Service) handleJoin(w http.ResponseWriter, r *http.Request) {
if err := s.store.Join(remoteID, remoteAddr); err != nil {
if err == store.ErrNotLeader {
leader := s.store.Peer(s.store.Leader())
leader := s.store.Peer(s.store.LeaderAddr())
if leader == "" {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
@ -363,7 +363,7 @@ func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request) {
if err := s.store.Remove(remoteAddr); err != nil {
if err == store.ErrNotLeader {
leader := s.store.Peer(s.store.Leader())
leader := s.store.Peer(s.store.LeaderAddr())
if leader == "" {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
@ -448,7 +448,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
results, err := s.store.ExecuteOrAbort(&store.ExecuteRequest{queries, timings, false})
if err != nil {
if err == store.ErrNotLeader {
leader := s.store.Peer(s.store.Leader())
leader := s.store.Peer(s.store.LeaderAddr())
if leader == "" {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
@ -498,7 +498,7 @@ func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request) {
httpStatus := map[string]interface{}{
"addr": s.Addr().String(),
"auth": prettyEnabled(s.credentialStore != nil),
"redirect": s.store.Peer(s.store.Leader()),
"redirect": s.store.Peer(s.store.LeaderAddr()),
}
nodeStatus := map[string]interface{}{
@ -595,7 +595,7 @@ func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request) {
results, err := s.store.Execute(&store.ExecuteRequest{queries, timings, isTx})
if err != nil {
if err == store.ErrNotLeader {
leader := s.store.Peer(s.store.Leader())
leader := s.store.Peer(s.store.LeaderAddr())
if leader == "" {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
@ -657,7 +657,7 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request) {
results, err := s.store.Query(&store.QueryRequest{queries, timings, isTx, lvl})
if err != nil {
if err == store.ErrNotLeader {
leader := s.store.Peer(s.store.Leader())
leader := s.store.Peer(s.store.LeaderAddr())
if leader == "" {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return

@ -499,7 +499,7 @@ func (m *MockStore) Remove(addr string) error {
return nil
}
func (m *MockStore) Leader() string {
func (m *MockStore) LeaderAddr() string {
return ""
}

@ -353,10 +353,28 @@ func (s *Store) ID() string {
// Leader returns the current leader. Returns a blank string if there is
// no leader.
func (s *Store) Leader() string {
func (s *Store) LeaderAddr() string {
return string(s.raft.Leader())
}
// LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error.
func (s *Store) LeaderID() (string, error) {
addr := s.LeaderAddr()
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("failed to get raft configuration: %v", err)
return "", err
}
for _, srv := range configFuture.Configuration().Servers {
if srv.Address == raft.ServerAddress(addr) {
return string(srv.ID), nil
}
}
return "", nil
}
// Peer returns the API address for the given addr. If there is no peer
// for the address, it returns the empty string.
func (s *Store) Peer(addr string) string {
@ -405,7 +423,7 @@ func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
for {
select {
case <-tck.C:
l := s.Leader()
l := s.LeaderAddr()
if l != "" {
return l, nil
}
@ -466,7 +484,7 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"node_id": s.raftID,
"raft": s.raft.Stats(),
"addr": s.Addr().String(),
"leader": s.Leader(),
"leader": s.LeaderAddr(),
"apply_timeout": s.ApplyTimeout.String(),
"open_timeout": s.OpenTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),

@ -50,6 +50,18 @@ func Test_OpenStoreSingleNode(t *testing.T) {
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
s.WaitForLeader(10 * time.Second)
if got, exp := s.LeaderAddr(), s.Addr().String(); got != exp {
t.Fatalf("wrong leader address returned, got: %s, exp %s", got, exp)
}
id, err := s.LeaderID()
if err != nil {
t.Fatalf("failed to retrieve leader ID: %s", err.Error())
}
if got, exp := id, s.raftID; got != exp {
t.Fatalf("wrong leader ID returned, got: %s, exp %s", got, exp)
}
}
func Test_OpenStoreCloseSingleNode(t *testing.T) {
@ -442,6 +454,20 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
t.Fatalf("failed to join to node at %s: %s", s0.Addr().String(), err.Error())
}
s1.WaitForLeader(10 * time.Second)
// Check leader state on follower.
if got, exp := s1.LeaderAddr(), s0.Addr().String(); got != exp {
t.Fatalf("wrong leader address returned, got: %s, exp %s", got, exp)
}
id, err := s1.LeaderID()
if err != nil {
t.Fatalf("failed to retrieve leader ID: %s", err.Error())
}
if got, exp := id, s0.raftID; got != exp {
t.Fatalf("wrong leader ID returned, got: %s, exp %s", got, exp)
}
nodes, err := s0.Nodes()
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
@ -804,7 +830,7 @@ func mustNewStore(inmem bool) *Store {
DBConf: cfg,
Dir: path,
Tn: tn,
ID: tn.Addr().String(), // Could be any unique string.
ID: path, // Could be any unique string.
})
if s == nil {
panic("failed to create new store")

Loading…
Cancel
Save