1
0
Fork 0

Correctly support read-only nodes with KV disco

master
Philip O'Toole 9 months ago
parent 31687195be
commit 2cb7c19087

@ -3,6 +3,7 @@ This release adds new control over Raft snapshotting, a key part of the Raft con
### New features
- [PR #1530](https://github.com/rqlite/rqlite/pull/1530), [PR #1533](https://github.com/rqlite/rqlite/pull/1533): Support automatically snapshotting when WAL reaches the SQLite default of 4MB.
- [PR #1541](https://github.com/rqlite/rqlite/pull/1541), [PR #1542](https://github.com/rqlite/rqlite/pull/1542): DNS-based autoclustering supports read-only (non-voting) nodes. Fixes issue [#1521](https://github.com/rqlite/rqlite/issues/1521)
- [PR #1544](https://github.com/rqlite/rqlite/pull/1544): Support autoclustering of read-only nodes with Consul and etcd.
### Implementation changes and bug fixes
- [PR #1531](https://github.com/rqlite/rqlite/pull/1531): Check for Raft snapshot condition every 10 seconds by default.

@ -519,7 +519,6 @@ func createCluster(cfg *Config, hasPeers bool, client *cluster.Client, str *stor
log.Printf("preexisting node configuration detected, not registering with discovery service")
return nil
}
log.Println("no preexisting nodes, registering with discovery service")
leader, addr, err := discoService.Register(str.ID(), cfg.HTTPURL(), cfg.RaftAdv)

@ -16,15 +16,32 @@ const (
// Client is the interface discovery clients must implement.
type Client interface {
// GetLeader returns the current Leader stored in the KV store. If the Leader
// is set, the returned ok flag will be true. If the Leader is not set, the
// returned ok flag will be false.
GetLeader() (id string, apiAddr string, addr string, ok bool, e error)
// InitializeLeader sets the leader to the given details, but only if no leader
// has already been set. This operation is a check-and-set type operation. If
// initialization succeeds, ok is set to true, otherwise false.
InitializeLeader(id, apiAddr, addr string) (bool, error)
// SetLeader unconditionally sets the leader to the given details.
SetLeader(id, apiAddr, addr string) error
fmt.Stringer
}
// Store is the interface the consensus system must implement.
type Store interface {
// IsLeader returns whether this node is the Leader.
IsLeader() bool
// IsVoter returns whether this node is a voter. Only Voters can be Leaders.
IsVoter() (bool, error)
// RegisterLeaderChange registers a channel that will be notified when
// a leadership change occurs.
RegisterLeaderChange(c chan<- struct{})
}
@ -55,8 +72,8 @@ func NewService(c Client, s Store) *Service {
}
// Register registers this node with the discovery service. It will block
// until a) the node registers itself as leader, b) learns of another node
// it can use to join the cluster, or c) an unrecoverable error occurs.
// until a) if the node is a voter, it registers itself, b) learns of another
// node it can use to join the cluster, or c) an unrecoverable error occurs.
func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
for {
_, _, cRaftAddr, ok, err := s.c.GetLeader()
@ -67,13 +84,15 @@ func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
return false, cRaftAddr, nil
}
ok, err = s.c.InitializeLeader(id, apiAddr, addr)
if err != nil {
s.logger.Printf("failed to initialize as Leader: %s", err.Error())
}
if ok {
s.updateContact(time.Now())
return true, addr, nil
if s.isVoter() {
ok, err = s.c.InitializeLeader(id, apiAddr, addr)
if err != nil {
s.logger.Printf("failed to initialize as Leader: %s", err.Error())
}
if ok {
s.updateContact(time.Now())
return true, addr, nil
}
}
time.Sleep(random.Jitter(s.RegisterInterval))
@ -138,3 +157,8 @@ func (s *Service) updateContact(t time.Time) {
defer s.mu.Unlock()
s.lastContact = t
}
func (s *Service) isVoter() bool {
v, _ := s.s.IsVoter()
return v
}

@ -67,6 +67,41 @@ func Test_RegisterInitializeLeader(t *testing.T) {
}
}
func Test_RegisterNonVoter(t *testing.T) {
m := &mockClient{}
getCalled := false
m.getLeaderFn = func() (id string, apiAddr string, addr string, ok bool, e error) {
if getCalled {
return "2", "localhost:4003", "localhost:4004", true, nil
}
getCalled = true
return "", "", "", false, nil
}
m.initializeLeaderFn = func(tID, tAPIAddr, tAddr string) (bool, error) {
t.Fatal("InitializeLeader called unexpectedly")
return false, nil
}
c := &mockStore{
isVoterFn: func() (bool, error) {
return false, nil
},
}
s := NewService(m, c)
s.RegisterInterval = 10 * time.Millisecond
ok, addr, err := s.Register("1", "localhost:4001", "localhost:4002")
if err != nil {
t.Fatalf("error registering with disco: %s", err.Error())
}
if ok {
t.Fatalf("registered incorrectly as non-voter")
}
if exp, got := "localhost:4004", addr; exp != got {
t.Fatalf("returned addressed incorrect, exp %s, got %s", exp, got)
}
}
func Test_StartReportingTimer(t *testing.T) {
// WaitGroups won't work because we don't know how many times
// setLeaderFn will be called.
@ -155,6 +190,7 @@ func (m *mockClient) String() string {
type mockStore struct {
isLeaderFn func() bool
isVoterFn func() (bool, error)
registerLeaderChangeFn func(c chan<- struct{})
}
@ -165,6 +201,13 @@ func (m *mockStore) IsLeader() bool {
return false
}
func (m *mockStore) IsVoter() (bool, error) {
if m.isVoterFn != nil {
return m.isVoterFn()
}
return true, nil
}
func (m *mockStore) RegisterLeaderChange(c chan<- struct{}) {
if m.registerLeaderChangeFn != nil {
m.registerLeaderChangeFn(c)

Loading…
Cancel
Save