1
0
Fork 0

Prepare to snap after some number of log entries

master
Philip O'Toole 10 years ago
parent af48ccb4a8
commit aca943d194

@ -24,6 +24,7 @@ var dbfile string
var cpuprofile string
var logFile string
var logLevel string
var snapAfter int
func init() {
flag.StringVar(&host, "h", "localhost", "hostname")
@ -33,6 +34,7 @@ func init() {
flag.StringVar(&cpuprofile, "cpuprofile", "", "write CPU profile to file")
flag.StringVar(&logFile, "logfile", "stdout", "log file path")
flag.StringVar(&logLevel, "loglevel", "INFO", "log level (ERROR|WARN|INFO|DEBUG|TRACE)")
flag.IntVar(&snapAfter, "s", 1000, "Snapshot after every number of log entries")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
flag.PrintDefaults()
@ -114,7 +116,7 @@ func main() {
log.Error("Unable to create path: %v", err)
}
s := server.New(path, dbfile, host, port)
s := server.New(path, dbfile, snapAfter, host, port)
go func() {
log.Error(s.ListenAndServe(join))
}()

@ -10,6 +10,7 @@ import (
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -57,8 +58,18 @@ type ServerDiagnostics struct {
startTime time.Time
}
type SnapshotConf struct {
// The index when the last snapshot happened
lastIndex uint64
// If the incremental number of index entries since the last
// snapshot exceeds snapshotAfter rqlite will do a snapshot
snapshotAfter uint64
}
// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type Server struct {
name string
host string
@ -69,6 +80,7 @@ type Server struct {
httpServer *http.Server
dbFile string
db *db.DB
snapConf *SnapshotConf
metrics *ServerMetrics
diagnostics *ServerDiagnostics
mutex sync.Mutex
@ -132,7 +144,7 @@ func NewServerDiagnostics() *ServerDiagnostics {
}
// Creates a new server.
func New(dataDir string, dbfile string, host string, port int) *Server {
func New(dataDir string, dbfile string, snapAfter int, host string, port int) *Server {
s := &Server{
host: host,
port: port,
@ -168,6 +180,16 @@ func (s *Server) connectionString() string {
return fmt.Sprintf("http://%s:%d", s.host, s.port)
}
// logSnapshot logs about the snapshot that was taken.
func (s *Server) logSnapshot(err error, currentIndex, count uint64) {
info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.connectionString, count, currentIndex)
if err != nil {
log.Info("%s attempted and failed: %v", info, err)
} else {
log.Info("%s completed", info)
}
}
// Starts the server.
func (s *Server) ListenAndServe(leader string) error {
var err error
@ -355,6 +377,14 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
log.Trace("writeHandler for URL: %s", req.URL)
s.metrics.executeReceived.Inc(1)
currentIndex := s.raftServer.CommitIndex()
count := currentIndex - s.snapConf.lastIndex
if uint64(count) > s.snapConf.snapshotAfter {
err := s.raftServer.TakeSnapshot()
s.logSnapshot(err, currentIndex, count)
s.snapConf.lastIndex = currentIndex
}
var startTime time.Time
// Read the value from the POST body.
@ -440,6 +470,8 @@ func (s *Server) serveDiagnostics(w http.ResponseWriter, req *http.Request) {
diagnostics["data"] = s.path
diagnostics["database"] = s.dbFile
diagnostics["connection"] = s.connectionString()
diagnostics["snapafter"] = strconv.FormatUint(s.snapConf.snapshotAfter, 10)
diagnostics["snapindex"] = strconv.FormatUint(s.snapConf.lastIndex, 10)
var b []byte
pretty, _ := isPretty(req)
if pretty {

Loading…
Cancel
Save