package cluster import ( "errors" "fmt" "log" "os" "sync" "time" "github.com/rqlite/rqlite/command" "github.com/rqlite/rqlite/random" ) var ( // ErrBootTimeout is returned when a boot operation does not // complete within the timeout. ErrBootTimeout = errors.New("boot timeout") ) // BootStatus is the reason the boot process completed. type BootStatus int const ( // BootUnknown is the initial state of the boot process. BootUnknown BootStatus = iota // BootJoin means boot completed due to a successful join. BootJoin // BootDone means boot completed due to Done being "true". BootDone // BootTimeout means the boot process timed out. BootTimeout ) const ( requestTimeout = 5 * time.Second numJoinAttempts = 1 bootInterval = 2 * time.Second ) // String returns a string representation of the BootStatus. func (b BootStatus) String() string { switch b { case BootUnknown: return "unknown" case BootJoin: return "join" case BootDone: return "done" case BootTimeout: return "timeout" default: panic("unknown boot status") } } // AddressProvider is the interface types must implement to provide // addresses to a Bootstrapper. type AddressProvider interface { Lookup() ([]string, error) } // Bootstrapper performs a bootstrap of this node. type Bootstrapper struct { provider AddressProvider client *Client logger *log.Logger Interval time.Duration bootStatusMu sync.RWMutex bootStatus BootStatus } // NewBootstrapper returns an instance of a Bootstrapper. func NewBootstrapper(p AddressProvider, client *Client) *Bootstrapper { bs := &Bootstrapper{ provider: p, client: client, logger: log.New(os.Stderr, "[cluster-bootstrap] ", log.LstdFlags), Interval: bootInterval, } return bs } // Boot performs the bootstrapping process for this node. This means it will // ensure this node becomes part of a cluster. It does this by either joining // an existing cluster by explicitly joining it through one of these nodes, // or by notifying those nodes that it exists, allowing a cluster-wide bootstap // take place. // // Returns nil if the boot operation was successful, or if done() ever returns // true. done() is periodically polled by the boot process. Returns an error // the boot process encounters an unrecoverable error, or booting does not // occur within the given timeout. // // id and raftAddr are those of the node calling Boot. All operations // performed by this function are done as a voting node. func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.Duration) error { timeoutT := time.NewTimer(timeout) defer timeoutT.Stop() tickerT := time.NewTimer(random.Jitter(time.Millisecond)) defer tickerT.Stop() joiner := NewJoiner(b.client, numJoinAttempts, requestTimeout) for { select { case <-timeoutT.C: b.setBootStatus(BootTimeout) return ErrBootTimeout case <-tickerT.C: if done() { b.logger.Printf("boot operation marked done") b.setBootStatus(BootDone) return nil } tickerT.Reset(random.Jitter(b.Interval)) // Move to longer-period polling targets, err := b.provider.Lookup() if err != nil { b.logger.Printf("provider lookup failed %s", err.Error()) } if len(targets) == 0 { continue } // Try an explicit join first. Joining an existing cluster is always given priority // over trying to form a new cluster. if j, err := joiner.Do(targets, id, raftAddr, true); err == nil { b.logger.Printf("succeeded directly joining cluster via node at %s", j) b.setBootStatus(BootJoin) return nil } // This is where we have to be careful. This node failed to join with any node // in the targets list. This could be because none of the nodes are contactable, // or none of the nodes are in a functioning cluster with a leader. That means that // this node could be part of a set nodes that are bootstrapping to form a cluster // de novo. For that to happen it needs to now let the other nodes know it is here. // If this is a new cluster, some node will then reach the bootstrap-expect value // first, form the cluster, beating all other nodes to it. if err := b.notify(targets, id, raftAddr); err != nil { b.logger.Printf("failed to notify all targets: %s (%s, will retry)", targets, err.Error()) } else { b.logger.Printf("succeeded notifying all targets: %s", targets) } } } } // Status returns the reason for the boot process completing. func (b *Bootstrapper) Status() BootStatus { b.bootStatusMu.RLock() defer b.bootStatusMu.RUnlock() return b.bootStatus } func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error { nr := &command.NotifyRequest{ Address: raftAddr, Id: id, } for _, t := range targets { if err := b.client.Notify(nr, t, requestTimeout); err != nil { return fmt.Errorf("failed to notify node at %s: %s", t, err) } } return nil } func (b *Bootstrapper) setBootStatus(status BootStatus) { b.bootStatusMu.Lock() defer b.bootStatusMu.Unlock() b.bootStatus = status } type stringAddressProvider struct { ss []string } func (s *stringAddressProvider) Lookup() ([]string, error) { return s.ss, nil } // NewAddressProviderString wraps an AddressProvider around a string slice. func NewAddressProviderString(ss []string) AddressProvider { return &stringAddressProvider{ss} }