package raft import ( "sync" "time" ) // Future is used to represent an action that may occur in the future. type Future interface { // Error blocks until the future arrives and then // returns the error status of the future. // This may be called any number of times - all // calls will return the same value. // Note that it is not OK to call this method // twice concurrently on the same Future instance. Error() error } // ApplyFuture is used for Apply() and may return the FSM response. type ApplyFuture interface { Future // Response returns the FSM response as returned // by the FSM.Apply method. This must not be called // until after the Error method has returned. Response() interface{} // Index holds the index of the newly applied log entry. // This must not be called // until after the Error method has returned. Index() uint64 } // errorFuture is used to return a static error. type errorFuture struct { err error } func (e errorFuture) Error() error { return e.err } func (e errorFuture) Response() interface{} { return nil } func (e errorFuture) Index() uint64 { return 0 } // deferError can be embedded to allow a future // to provide an error in the future. type deferError struct { err error errCh chan error responded bool } func (d *deferError) init() { d.errCh = make(chan error, 1) } func (d *deferError) Error() error { if d.err != nil { // Note that when we've received a nil error, this // won't trigger, but the channel is closed after // send so we'll still return nil below. return d.err } if d.errCh == nil { panic("waiting for response on nil channel") } d.err = <-d.errCh return d.err } func (d *deferError) respond(err error) { if d.errCh == nil { return } if d.responded { return } d.errCh <- err close(d.errCh) d.responded = true } // logFuture is used to apply a log entry and waits until // the log is considered committed. type logFuture struct { deferError log Log policy quorumPolicy response interface{} dispatch time.Time } func (l *logFuture) Response() interface{} { return l.response } func (l *logFuture) Index() uint64 { return l.log.Index } type peerFuture struct { deferError peers []string } type shutdownFuture struct { raft *Raft } func (s *shutdownFuture) Error() error { if s.raft == nil { return nil } s.raft.waitShutdown() if closeable, ok := s.raft.trans.(WithClose); ok { closeable.Close() } return nil } // snapshotFuture is used for waiting on a snapshot to complete. type snapshotFuture struct { deferError } // reqSnapshotFuture is used for requesting a snapshot start. // It is only used internally. type reqSnapshotFuture struct { deferError // snapshot details provided by the FSM runner before responding index uint64 term uint64 peers []string snapshot FSMSnapshot } // restoreFuture is used for requesting an FSM to perform a // snapshot restore. Used internally only. type restoreFuture struct { deferError ID string } // verifyFuture is used to verify the current node is still // the leader. This is to prevent a stale read. type verifyFuture struct { deferError notifyCh chan *verifyFuture quorumSize int votes int voteLock sync.Mutex } // vote is used to respond to a verifyFuture. // This may block when responding on the notifyCh. func (v *verifyFuture) vote(leader bool) { v.voteLock.Lock() defer v.voteLock.Unlock() // Guard against having notified already if v.notifyCh == nil { return } if leader { v.votes++ if v.votes >= v.quorumSize { v.notifyCh <- v v.notifyCh = nil } } else { v.notifyCh <- v v.notifyCh = nil } } // appendFuture is used for waiting on a pipelined append // entries RPC. type appendFuture struct { deferError start time.Time args *AppendEntriesRequest resp *AppendEntriesResponse } func (a *appendFuture) Start() time.Time { return a.start } func (a *appendFuture) Request() *AppendEntriesRequest { return a.args } func (a *appendFuture) Response() *AppendEntriesResponse { return a.resp }