1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
3.9 KiB
Go

package raft
import (
"bytes"
crand "crypto/rand"
"fmt"
"math"
"math/big"
"math/rand"
"time"
"github.com/hashicorp/go-msgpack/codec"
)
func init() {
// Ensure we use a high-entropy seed for the psuedo-random generator
rand.Seed(newSeed())
}
// returns an int64 from a crypto random source
// can be used to seed a source for a math/rand.
func newSeed() int64 {
r, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}
return r.Int64()
}
// randomTimeout returns a value that is between the minVal and 2x minVal.
func randomTimeout(minVal time.Duration) <-chan time.Time {
if minVal == 0 {
return nil
}
extra := (time.Duration(rand.Int63()) % minVal)
return time.After(minVal + extra)
}
// min returns the minimum.
func min(a, b uint64) uint64 {
if a <= b {
return a
}
return b
}
// max returns the maximum.
func max(a, b uint64) uint64 {
if a >= b {
return a
}
return b
}
// generateUUID is used to generate a random UUID.
func generateUUID() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}
// asyncNotifyCh is used to do an async channel send
// to a single channel without blocking.
func asyncNotifyCh(ch chan struct{}) {
select {
case ch <- struct{}{}:
default:
}
}
// asyncNotifyBool is used to do an async notification
// on a bool channel.
func asyncNotifyBool(ch chan bool, v bool) {
select {
case ch <- v:
default:
}
}
// ExcludePeer is used to exclude a single peer from a list of peers.
func ExcludePeer(peers []string, peer string) []string {
otherPeers := make([]string, 0, len(peers))
for _, p := range peers {
if p != peer {
otherPeers = append(otherPeers, p)
}
}
return otherPeers
}
// PeerContained checks if a given peer is contained in a list.
func PeerContained(peers []string, peer string) bool {
for _, p := range peers {
if p == peer {
return true
}
}
return false
}
// AddUniquePeer is used to add a peer to a list of existing
// peers only if it is not already contained.
func AddUniquePeer(peers []string, peer string) []string {
if PeerContained(peers, peer) {
return peers
}
return append(peers, peer)
}
// encodePeers is used to serialize a list of peers.
func encodePeers(peers []string, trans Transport) []byte {
// Encode each peer
var encPeers [][]byte
for _, p := range peers {
encPeers = append(encPeers, trans.EncodePeer(p))
}
// Encode the entire array
buf, err := encodeMsgPack(encPeers)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}
return buf.Bytes()
}
// decodePeers is used to deserialize a list of peers.
func decodePeers(buf []byte, trans Transport) []string {
// Decode the buffer first
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
}
// Deserialize each peer
var peers []string
for _, enc := range encPeers {
peers = append(peers, trans.DecodePeer(enc))
}
return peers
}
// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(r, &hd)
return dec.Decode(out)
}
// Encode writes an encoded object to a new bytes buffer.
func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
buf := bytes.NewBuffer(nil)
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(buf, &hd)
err := enc.Encode(in)
return buf, err
}
// backoff is used to compute an exponential backoff
// duration. Base time is scaled by the current round,
// up to some maximum scale factor.
func backoff(base time.Duration, round, limit uint64) time.Duration {
power := min(round, limit)
for power > 2 {
base *= 2
power--
}
return base
}