|
|
|
@ -69,12 +69,12 @@ var fkConstraints bool
|
|
|
|
|
var raftLogLevel string
|
|
|
|
|
var raftNonVoter bool
|
|
|
|
|
var raftSnapThreshold uint64
|
|
|
|
|
var raftSnapInterval string
|
|
|
|
|
var raftLeaderLeaseTimeout string
|
|
|
|
|
var raftHeartbeatTimeout string
|
|
|
|
|
var raftElectionTimeout string
|
|
|
|
|
var raftApplyTimeout string
|
|
|
|
|
var raftOpenTimeout string
|
|
|
|
|
var raftSnapInterval time.Duration
|
|
|
|
|
var raftLeaderLeaseTimeout time.Duration
|
|
|
|
|
var raftHeartbeatTimeout time.Duration
|
|
|
|
|
var raftElectionTimeout time.Duration
|
|
|
|
|
var raftApplyTimeout time.Duration
|
|
|
|
|
var raftOpenTimeout time.Duration
|
|
|
|
|
var raftWaitForLeader bool
|
|
|
|
|
var raftShutdownOnRemove bool
|
|
|
|
|
var compressionSize int
|
|
|
|
@ -119,14 +119,14 @@ func init() {
|
|
|
|
|
flag.BoolVar(&fkConstraints, "fk", false, "Enable SQLite foreign key constraints")
|
|
|
|
|
flag.BoolVar(&showVersion, "version", false, "Show version information and exit")
|
|
|
|
|
flag.BoolVar(&raftNonVoter, "raft-non-voter", false, "Configure as non-voting node")
|
|
|
|
|
flag.StringVar(&raftHeartbeatTimeout, "raft-timeout", "1s", "Raft heartbeat timeout")
|
|
|
|
|
flag.StringVar(&raftElectionTimeout, "raft-election-timeout", "1s", "Raft election timeout")
|
|
|
|
|
flag.StringVar(&raftApplyTimeout, "raft-apply-timeout", "10s", "Raft apply timeout")
|
|
|
|
|
flag.StringVar(&raftOpenTimeout, "raft-open-timeout", "120s", "Time for initial Raft logs to be applied. Use 0s duration to skip wait")
|
|
|
|
|
flag.DurationVar(&raftHeartbeatTimeout, "raft-timeout", time.Second, "Raft heartbeat timeout")
|
|
|
|
|
flag.DurationVar(&raftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout")
|
|
|
|
|
flag.DurationVar(&raftApplyTimeout, "raft-apply-timeout", 10*time.Second, "Raft apply timeout")
|
|
|
|
|
flag.DurationVar(&raftOpenTimeout, "raft-open-timeout", 120*time.Second, "Time for initial Raft logs to be applied. Use 0s duration to skip wait")
|
|
|
|
|
flag.BoolVar(&raftWaitForLeader, "raft-leader-wait", true, "Node waits for a leader before answering requests")
|
|
|
|
|
flag.Uint64Var(&raftSnapThreshold, "raft-snap", 8192, "Number of outstanding log entries that trigger snapshot")
|
|
|
|
|
flag.StringVar(&raftSnapInterval, "raft-snap-int", "30s", "Snapshot threshold check interval")
|
|
|
|
|
flag.StringVar(&raftLeaderLeaseTimeout, "raft-leader-lease-timeout", "0s", "Raft leader lease timeout. Use 0s for Raft default")
|
|
|
|
|
flag.DurationVar(&raftSnapInterval, "raft-snap-int", 30*time.Second, "Snapshot threshold check interval")
|
|
|
|
|
flag.DurationVar(&raftLeaderLeaseTimeout, "raft-leader-lease-timeout", 0, "Raft leader lease timeout. Use 0s for Raft default")
|
|
|
|
|
flag.BoolVar(&raftShutdownOnRemove, "raft-remove-shutdown", false, "Shutdown Raft if node removed")
|
|
|
|
|
flag.StringVar(&raftLogLevel, "raft-log-level", "INFO", "Minimum log level for Raft module")
|
|
|
|
|
flag.IntVar(&compressionSize, "compression-size", 150, "Request query size for compression attempt")
|
|
|
|
@ -387,18 +387,14 @@ func determineJoinAddresses(isNew bool) ([]string, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func waitForConsensus(str *store.Store) error {
|
|
|
|
|
openTimeout, err := time.ParseDuration(raftOpenTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to parse Raft open timeout %s: %s", raftOpenTimeout, err.Error())
|
|
|
|
|
}
|
|
|
|
|
if _, err := str.WaitForLeader(openTimeout); err != nil {
|
|
|
|
|
if _, err := str.WaitForLeader(raftOpenTimeout); err != nil {
|
|
|
|
|
if raftWaitForLeader {
|
|
|
|
|
return fmt.Errorf("leader did not appear within timeout: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
log.Println("ignoring error while waiting for leader")
|
|
|
|
|
}
|
|
|
|
|
if openTimeout != 0 {
|
|
|
|
|
if err := str.WaitForInitialLogs(openTimeout); err != nil {
|
|
|
|
|
if raftOpenTimeout != 0 {
|
|
|
|
|
if err := str.WaitForInitialLogs(raftOpenTimeout); err != nil {
|
|
|
|
|
return fmt.Errorf("log was not fully applied within timeout: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -430,26 +426,11 @@ func createStore(ln *tcp.Layer, dataPath string) (*store.Store, bool, error) {
|
|
|
|
|
str.RaftLogLevel = raftLogLevel
|
|
|
|
|
str.ShutdownOnRemove = raftShutdownOnRemove
|
|
|
|
|
str.SnapshotThreshold = raftSnapThreshold
|
|
|
|
|
str.SnapshotInterval, err = time.ParseDuration(raftSnapInterval)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft Snapsnot interval %s: %s", raftSnapInterval, err.Error())
|
|
|
|
|
}
|
|
|
|
|
str.LeaderLeaseTimeout, err = time.ParseDuration(raftLeaderLeaseTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft Leader lease timeout %s: %s", raftLeaderLeaseTimeout, err.Error())
|
|
|
|
|
}
|
|
|
|
|
str.HeartbeatTimeout, err = time.ParseDuration(raftHeartbeatTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft heartbeat timeout %s: %s", raftHeartbeatTimeout, err.Error())
|
|
|
|
|
}
|
|
|
|
|
str.ElectionTimeout, err = time.ParseDuration(raftElectionTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft election timeout %s: %s", raftElectionTimeout, err.Error())
|
|
|
|
|
}
|
|
|
|
|
str.ApplyTimeout, err = time.ParseDuration(raftApplyTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft apply timeout %s: %s", raftApplyTimeout, err.Error())
|
|
|
|
|
}
|
|
|
|
|
str.SnapshotInterval = raftSnapInterval
|
|
|
|
|
str.LeaderLeaseTimeout = raftLeaderLeaseTimeout
|
|
|
|
|
str.HeartbeatTimeout = raftHeartbeatTimeout
|
|
|
|
|
str.ElectionTimeout = raftElectionTimeout
|
|
|
|
|
str.ApplyTimeout = raftApplyTimeout
|
|
|
|
|
|
|
|
|
|
return str, store.IsNewNode(dataPath), nil
|
|
|
|
|
}
|
|
|
|
|