1
0
Fork 0

Merge pull request #587 from rqlite/expvar_for_store

Ad expvar stats to Store
master
Philip O'Toole 5 years ago committed by GitHub
commit 412328c988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,5 @@
## 4.6.0 (unreleased)
- [PR #587](https://github.com/rqlite/rqlite/pull/587): Add expvar stats to store.
- [PR #586](https://github.com/rqlite/rqlite/pull/586): rqlite CLI now supports command history. Thanks @rhnvrm.
- [PR #585](https://github.com/rqlite/rqlite/pull/585): Add backup command to CLI: Thanks @eariassoto.
- [PR #584](https://github.com/rqlite/rqlite/pull/584): Support showing timings in the CLI. Thanks @joaodrp.

@ -8,6 +8,7 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"io/ioutil"
@ -42,6 +43,22 @@ const (
appliedWaitDelay = 100 * time.Millisecond
)
const (
numSnaphots = "num_snapshots"
numBackups = "num_backups"
numRestores = "num_restores"
)
// stats captures stats for the Store.
var stats *expvar.Map
func init() {
stats = expvar.NewMap("store")
stats.Add(numSnaphots, 0)
stats.Add(numBackups, 0)
stats.Add(numRestores, 0)
}
// QueryRequest represents a query that returns rows, and does not modify
// the database.
type QueryRequest struct {
@ -188,6 +205,7 @@ type Store struct {
ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotInterval time.Duration
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
ApplyTimeout time.Duration
@ -519,6 +537,7 @@ func (s *Store) Backup(leader bool) ([]byte, error) {
if err != nil {
return nil, err
}
stats.Add(numBackups, 1)
return b, nil
}
@ -646,6 +665,9 @@ func (s *Store) raftConfig() *raft.Config {
if s.SnapshotThreshold != 0 {
config.SnapshotThreshold = s.SnapshotThreshold
}
if s.SnapshotInterval != 0 {
config.SnapshotInterval = s.SnapshotInterval
}
if s.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = s.HeartbeatTimeout
}
@ -758,6 +780,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
return nil, err
}
stats.Add(numSnaphots, 1)
return fsm, nil
}
@ -819,11 +842,16 @@ func (s *Store) Restore(rc io.ReadCloser) error {
return err
}
return func() error {
err = func() error {
s.metaMu.Lock()
defer s.metaMu.Unlock()
return json.Unmarshal(b, &s.meta)
}()
if err != nil {
return err
}
stats.Add(numRestores, 1)
return nil
}
// RegisterObserver registers an observer of Raft events

@ -125,6 +125,74 @@ func Test_SingleNodeInMemExecuteQueryFail(t *testing.T) {
}
}
func Test_StoreLogTruncationMultinode(t *testing.T) {
s0 := mustNewStore(true)
defer os.RemoveAll(s0.Path())
s0.SnapshotThreshold = 4
s0.SnapshotInterval = 100 * time.Millisecond
if err := s0.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s0.Close(true)
s0.WaitForLeader(10 * time.Second)
nSnaps := stats.Get(numSnaphots).String()
// Write more than s.SnapshotThreshold statements.
queries := []string{
`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
`INSERT INTO foo(id, name) VALUES(5, "fiona")`,
}
for i := range queries {
_, err := s0.Execute(&ExecuteRequest{[]string{queries[i]}, false, false})
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
}
// Wait for the snapshot to happen and log to be truncated.
for {
time.Sleep(1000 * time.Millisecond)
if stats.Get(numSnaphots).String() != nSnaps {
// It's changed, so a snap and truncate has happened.
break
}
}
// Fire up new node and ensure it picks up all changes. This will
// involve getting a snapshot and truncated log.
s1 := mustNewStore(true)
if err := s1.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s1.Close(true)
// Join the second node to the first.
if err := s0.Join(s1.Addr().String()); err != nil {
t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
}
s1.WaitForLeader(10 * time.Second)
// Wait until the log entries have been applied to the follower,
// and then query.
if err := s1.WaitForAppliedIndex(8, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
r, err := s1.Query(&QueryRequest{[]string{`SELECT count(*) FROM foo`}, false, true, None})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[5]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeFileExecuteQuery(t *testing.T) {
s := mustNewStore(false)
defer os.RemoveAll(s.Path())

Loading…
Cancel
Save