From aca943d1945cf790255c201ccb88c2de64574cdd Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 29 Aug 2014 01:18:32 -0700 Subject: [PATCH] Prepare to snap after some number of log entries --- main.go | 4 +++- server/server.go | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 7a49a9d8..0132b071 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ var dbfile string var cpuprofile string var logFile string var logLevel string +var snapAfter int func init() { flag.StringVar(&host, "h", "localhost", "hostname") @@ -33,6 +34,7 @@ func init() { flag.StringVar(&cpuprofile, "cpuprofile", "", "write CPU profile to file") flag.StringVar(&logFile, "logfile", "stdout", "log file path") flag.StringVar(&logLevel, "loglevel", "INFO", "log level (ERROR|WARN|INFO|DEBUG|TRACE)") + flag.IntVar(&snapAfter, "s", 1000, "Snapshot after every number of log entries") flag.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: %s [arguments] \n", os.Args[0]) flag.PrintDefaults() @@ -114,7 +116,7 @@ func main() { log.Error("Unable to create path: %v", err) } - s := server.New(path, dbfile, host, port) + s := server.New(path, dbfile, snapAfter, host, port) go func() { log.Error(s.ListenAndServe(join)) }() diff --git a/server/server.go b/server/server.go index 53514c18..34a74638 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "net/http" "path" "path/filepath" + "strconv" "strings" "sync" "time" @@ -57,8 +58,18 @@ type ServerDiagnostics struct { startTime time.Time } +type SnapshotConf struct { + // The index when the last snapshot happened + lastIndex uint64 + + // If the incremental number of index entries since the last + // snapshot exceeds snapshotAfter rqlite will do a snapshot + snapshotAfter uint64 +} + // The raftd server is a combination of the Raft server and an HTTP // server which acts as the transport. + type Server struct { name string host string @@ -69,6 +80,7 @@ type Server struct { httpServer *http.Server dbFile string db *db.DB + snapConf *SnapshotConf metrics *ServerMetrics diagnostics *ServerDiagnostics mutex sync.Mutex @@ -132,7 +144,7 @@ func NewServerDiagnostics() *ServerDiagnostics { } // Creates a new server. -func New(dataDir string, dbfile string, host string, port int) *Server { +func New(dataDir string, dbfile string, snapAfter int, host string, port int) *Server { s := &Server{ host: host, port: port, @@ -168,6 +180,16 @@ func (s *Server) connectionString() string { return fmt.Sprintf("http://%s:%d", s.host, s.port) } +// logSnapshot logs about the snapshot that was taken. +func (s *Server) logSnapshot(err error, currentIndex, count uint64) { + info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.connectionString, count, currentIndex) + if err != nil { + log.Info("%s attempted and failed: %v", info, err) + } else { + log.Info("%s completed", info) + } +} + // Starts the server. func (s *Server) ListenAndServe(leader string) error { var err error @@ -355,6 +377,14 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { log.Trace("writeHandler for URL: %s", req.URL) s.metrics.executeReceived.Inc(1) + currentIndex := s.raftServer.CommitIndex() + count := currentIndex - s.snapConf.lastIndex + if uint64(count) > s.snapConf.snapshotAfter { + err := s.raftServer.TakeSnapshot() + s.logSnapshot(err, currentIndex, count) + s.snapConf.lastIndex = currentIndex + } + var startTime time.Time // Read the value from the POST body. @@ -440,6 +470,8 @@ func (s *Server) serveDiagnostics(w http.ResponseWriter, req *http.Request) { diagnostics["data"] = s.path diagnostics["database"] = s.dbFile diagnostics["connection"] = s.connectionString() + diagnostics["snapafter"] = strconv.FormatUint(s.snapConf.snapshotAfter, 10) + diagnostics["snapindex"] = strconv.FormatUint(s.snapConf.lastIndex, 10) var b []byte pretty, _ := isPretty(req) if pretty {