|
|
@ -200,51 +200,13 @@ func main() {
|
|
|
|
raftTn := mux.Listen(cluster.MuxRaftHeader)
|
|
|
|
raftTn := mux.Listen(cluster.MuxRaftHeader)
|
|
|
|
log.Printf("Raft TCP mux Listener registered with %d", cluster.MuxRaftHeader)
|
|
|
|
log.Printf("Raft TCP mux Listener registered with %d", cluster.MuxRaftHeader)
|
|
|
|
|
|
|
|
|
|
|
|
// Create and open the store.
|
|
|
|
// Create the store.
|
|
|
|
dataPath, err = filepath.Abs(dataPath)
|
|
|
|
str, isNew, err := createStore(raftTn, dataPath)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("failed to determine absolute data path: %s", err.Error())
|
|
|
|
log.Fatalf("failed to create store: %s", err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dbConf := store.NewDBConfig(!onDisk)
|
|
|
|
|
|
|
|
dbConf.FKConstraints = fkConstraints
|
|
|
|
|
|
|
|
dbConf.OnDiskPath = onDiskPath
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
str := store.New(raftTn, &store.Config{
|
|
|
|
|
|
|
|
DBConf: dbConf,
|
|
|
|
|
|
|
|
Dir: dataPath,
|
|
|
|
|
|
|
|
ID: idOrRaftAddr(),
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Set optional parameters on store.
|
|
|
|
|
|
|
|
str.StartupOnDisk = onDiskStartup
|
|
|
|
|
|
|
|
str.SetRequestCompression(compressionBatch, compressionSize)
|
|
|
|
|
|
|
|
str.RaftLogLevel = raftLogLevel
|
|
|
|
|
|
|
|
str.ShutdownOnRemove = raftShutdownOnRemove
|
|
|
|
|
|
|
|
str.SnapshotThreshold = raftSnapThreshold
|
|
|
|
|
|
|
|
str.SnapshotInterval, err = time.ParseDuration(raftSnapInterval)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.Fatalf("failed to parse Raft Snapsnot interval %s: %s", raftSnapInterval, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.LeaderLeaseTimeout, err = time.ParseDuration(raftLeaderLeaseTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.Fatalf("failed to parse Raft Leader lease timeout %s: %s", raftLeaderLeaseTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.HeartbeatTimeout, err = time.ParseDuration(raftHeartbeatTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.Fatalf("failed to parse Raft heartbeat timeout %s: %s", raftHeartbeatTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.ElectionTimeout, err = time.ParseDuration(raftElectionTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.Fatalf("failed to parse Raft election timeout %s: %s", raftElectionTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.ApplyTimeout, err = time.ParseDuration(raftApplyTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.Fatalf("failed to parse Raft apply timeout %s: %s", raftApplyTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Any prexisting node state?
|
|
|
|
|
|
|
|
var enableBootstrap bool
|
|
|
|
var enableBootstrap bool
|
|
|
|
isNew := store.IsNewNode(dataPath)
|
|
|
|
|
|
|
|
if isNew {
|
|
|
|
if isNew {
|
|
|
|
log.Printf("no preexisting node state detected in %s, node may be bootstrapping", dataPath)
|
|
|
|
log.Printf("no preexisting node state detected in %s, node may be bootstrapping", dataPath)
|
|
|
|
enableBootstrap = true // New node, so we may be bootstrapping
|
|
|
|
enableBootstrap = true // New node, so we may be bootstrapping
|
|
|
@ -445,6 +407,53 @@ func waitForConsensus(str *store.Store) error {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func createStore(ln *tcp.Layer, dataPath string) (*store.Store, bool, error) {
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dataPath, err = filepath.Abs(dataPath)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("failed to determine absolute data path: %s", err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
dbConf := store.NewDBConfig(!onDisk)
|
|
|
|
|
|
|
|
dbConf.FKConstraints = fkConstraints
|
|
|
|
|
|
|
|
dbConf.OnDiskPath = onDiskPath
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
str := store.New(ln, &store.Config{
|
|
|
|
|
|
|
|
DBConf: dbConf,
|
|
|
|
|
|
|
|
Dir: dataPath,
|
|
|
|
|
|
|
|
ID: idOrRaftAddr(),
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Set optional parameters on store.
|
|
|
|
|
|
|
|
str.StartupOnDisk = onDiskStartup
|
|
|
|
|
|
|
|
str.SetRequestCompression(compressionBatch, compressionSize)
|
|
|
|
|
|
|
|
str.RaftLogLevel = raftLogLevel
|
|
|
|
|
|
|
|
str.ShutdownOnRemove = raftShutdownOnRemove
|
|
|
|
|
|
|
|
str.SnapshotThreshold = raftSnapThreshold
|
|
|
|
|
|
|
|
str.SnapshotInterval, err = time.ParseDuration(raftSnapInterval)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft Snapsnot interval %s: %s", raftSnapInterval, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.LeaderLeaseTimeout, err = time.ParseDuration(raftLeaderLeaseTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft Leader lease timeout %s: %s", raftLeaderLeaseTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.HeartbeatTimeout, err = time.ParseDuration(raftHeartbeatTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft heartbeat timeout %s: %s", raftHeartbeatTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.ElectionTimeout, err = time.ParseDuration(raftElectionTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft election timeout %s: %s", raftElectionTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
str.ApplyTimeout, err = time.ParseDuration(raftApplyTimeout)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, false, fmt.Errorf("invalid Raft apply timeout %s: %s", raftApplyTimeout, err.Error())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return str, store.IsNewNode(dataPath), nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func startHTTPService(str *store.Store, cltr *cluster.Client, credStr *auth.CredentialsStore) (*httpd.Service, error) {
|
|
|
|
func startHTTPService(str *store.Store, cltr *cluster.Client, credStr *auth.CredentialsStore) (*httpd.Service, error) {
|
|
|
|
// Create HTTP server and load authentication information if required.
|
|
|
|
// Create HTTP server and load authentication information if required.
|
|
|
|
var s *httpd.Service
|
|
|
|
var s *httpd.Service
|
|
|
|