diff --git a/CHANGELOG.md b/CHANGELOG.md index c05d45fe..bd252b58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 4.6.0 (unreleased) +- [PR #587](https://github.com/rqlite/rqlite/pull/587): Add expvar stats to store. - [PR #586](https://github.com/rqlite/rqlite/pull/586): rqlite CLI now supports command history. Thanks @rhnvrm. - [PR #585](https://github.com/rqlite/rqlite/pull/585): Add backup command to CLI: Thanks @eariassoto. - [PR #584](https://github.com/rqlite/rqlite/pull/584): Support showing timings in the CLI. Thanks @joaodrp. diff --git a/store/store.go b/store/store.go index 2ad2e16f..b6a6a1fd 100644 --- a/store/store.go +++ b/store/store.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "encoding/json" "errors" + "expvar" "fmt" "io" "io/ioutil" @@ -42,6 +43,22 @@ const ( appliedWaitDelay = 100 * time.Millisecond ) +const ( + numSnaphots = "num_snapshots" + numBackups = "num_backups" + numRestores = "num_restores" +) + +// stats captures stats for the Store. +var stats *expvar.Map + +func init() { + stats = expvar.NewMap("store") + stats.Add(numSnaphots, 0) + stats.Add(numBackups, 0) + stats.Add(numRestores, 0) +} + // QueryRequest represents a query that returns rows, and does not modify // the database. type QueryRequest struct { @@ -188,6 +205,7 @@ type Store struct { ShutdownOnRemove bool SnapshotThreshold uint64 + SnapshotInterval time.Duration HeartbeatTimeout time.Duration ElectionTimeout time.Duration ApplyTimeout time.Duration @@ -519,6 +537,7 @@ func (s *Store) Backup(leader bool) ([]byte, error) { if err != nil { return nil, err } + stats.Add(numBackups, 1) return b, nil } @@ -646,6 +665,9 @@ func (s *Store) raftConfig() *raft.Config { if s.SnapshotThreshold != 0 { config.SnapshotThreshold = s.SnapshotThreshold } + if s.SnapshotInterval != 0 { + config.SnapshotInterval = s.SnapshotInterval + } if s.HeartbeatTimeout != 0 { config.HeartbeatTimeout = s.HeartbeatTimeout } @@ -758,6 +780,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) { return nil, err } + stats.Add(numSnaphots, 1) return fsm, nil } @@ -819,11 +842,16 @@ func (s *Store) Restore(rc io.ReadCloser) error { return err } - return func() error { + err = func() error { s.metaMu.Lock() defer s.metaMu.Unlock() return json.Unmarshal(b, &s.meta) }() + if err != nil { + return err + } + stats.Add(numRestores, 1) + return nil } // RegisterObserver registers an observer of Raft events diff --git a/store/store_test.go b/store/store_test.go index 8a70e6d7..1aae4472 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -125,6 +125,74 @@ func Test_SingleNodeInMemExecuteQueryFail(t *testing.T) { } } +func Test_StoreLogTruncationMultinode(t *testing.T) { + s0 := mustNewStore(true) + defer os.RemoveAll(s0.Path()) + s0.SnapshotThreshold = 4 + s0.SnapshotInterval = 100 * time.Millisecond + + if err := s0.Open(true); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s0.Close(true) + s0.WaitForLeader(10 * time.Second) + nSnaps := stats.Get(numSnaphots).String() + + // Write more than s.SnapshotThreshold statements. + queries := []string{ + `CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + `INSERT INTO foo(id, name) VALUES(1, "fiona")`, + `INSERT INTO foo(id, name) VALUES(2, "fiona")`, + `INSERT INTO foo(id, name) VALUES(3, "fiona")`, + `INSERT INTO foo(id, name) VALUES(4, "fiona")`, + `INSERT INTO foo(id, name) VALUES(5, "fiona")`, + } + for i := range queries { + _, err := s0.Execute(&ExecuteRequest{[]string{queries[i]}, false, false}) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + } + + // Wait for the snapshot to happen and log to be truncated. + for { + time.Sleep(1000 * time.Millisecond) + if stats.Get(numSnaphots).String() != nSnaps { + // It's changed, so a snap and truncate has happened. + break + } + } + + // Fire up new node and ensure it picks up all changes. This will + // involve getting a snapshot and truncated log. + s1 := mustNewStore(true) + if err := s1.Open(true); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s1.Close(true) + + // Join the second node to the first. + if err := s0.Join(s1.Addr().String()); err != nil { + t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error()) + } + s1.WaitForLeader(10 * time.Second) + // Wait until the log entries have been applied to the follower, + // and then query. + if err := s1.WaitForAppliedIndex(8, 5*time.Second); err != nil { + t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) + } + r, err := s1.Query(&QueryRequest{[]string{`SELECT count(*) FROM foo`}, false, true, None}) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[5]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } +} + func Test_SingleNodeFileExecuteQuery(t *testing.T) { s := mustNewStore(false) defer os.RemoveAll(s.Path())