You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
226 lines
7.3 KiB
Go
226 lines
7.3 KiB
Go
package store
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
)
|
|
|
|
func openStoreCloseStartup(t *testing.T, s *Store) {
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
|
|
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
|
|
}
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
er := executeRequestFromStrings([]string{
|
|
`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
|
|
`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
|
|
}, false, false)
|
|
_, err := s.Execute(er)
|
|
if err != nil {
|
|
t.Fatalf("failed to execute on single node: %s", err.Error())
|
|
}
|
|
|
|
fsmIdx, err := s.WaitForAppliedFSM(5 * time.Second)
|
|
if err != nil {
|
|
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
|
|
}
|
|
|
|
if err := s.Close(true); err != nil {
|
|
t.Fatalf("failed to close single-node store: %s", err.Error())
|
|
}
|
|
|
|
// Reopen it and confirm data still there.
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
|
|
// Wait until the log entries have been applied to the voting follower,
|
|
// and then query.
|
|
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil {
|
|
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
|
|
}
|
|
|
|
qr := queryRequestFromString("SELECT * FROM foo", false, false)
|
|
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
|
|
r, err := s.Query(qr)
|
|
if err != nil {
|
|
t.Fatalf("failed to query single node: %s", err.Error())
|
|
}
|
|
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
|
|
if err := s.Close(true); err != nil {
|
|
t.Fatalf("failed to close single-node store: %s", err.Error())
|
|
}
|
|
|
|
// Tweak snapshot params to force a snap to take place.
|
|
s.SnapshotThreshold = 4
|
|
s.SnapshotInterval = 100 * time.Millisecond
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
|
|
// Insert new records to trigger a snapshot.
|
|
queryTest := func(s *Store, c int) {
|
|
qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
|
|
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
|
|
r, err := s.Query(qr)
|
|
if err != nil {
|
|
t.Fatalf("failed to query single node: %s", err.Error())
|
|
}
|
|
if exp, got := `["COUNT(*)"]`, asJSON(r[0].Columns); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
if exp, got := fmt.Sprintf(`[[%d]]`, c), asJSON(r[0].Values); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
}
|
|
for i := 0; i < 9; i++ {
|
|
er := executeRequestFromStrings([]string{
|
|
`INSERT INTO foo(name) VALUES("fiona")`,
|
|
}, false, false)
|
|
if _, err := s.Execute(er); err != nil {
|
|
t.Fatalf("failed to execute on single node: %s", err.Error())
|
|
}
|
|
}
|
|
queryTest(s, 10)
|
|
|
|
// Wait for a snapshot to take place.
|
|
for {
|
|
time.Sleep(100 * time.Millisecond)
|
|
s.numSnapshotsMu.Lock()
|
|
ns := s.numSnapshots
|
|
s.numSnapshotsMu.Unlock()
|
|
if ns > 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second)
|
|
if err != nil {
|
|
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
|
|
}
|
|
|
|
// Close and re-open to make sure all continues to work with recovery from snapshot.
|
|
if err := s.Close(true); err != nil {
|
|
t.Fatalf("failed to close single-node store: %s", err.Error())
|
|
}
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
|
|
// Wait until the log entries have been applied to the voting follower,
|
|
// and then query.
|
|
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil {
|
|
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
|
|
}
|
|
|
|
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
|
|
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
|
|
r, err = s.Query(qr)
|
|
if err != nil {
|
|
t.Fatalf("failed to query single node: %s", err.Error())
|
|
}
|
|
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[10]]}]`, asJSON(r); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
|
|
if err := s.Close(true); err != nil {
|
|
t.Fatalf("failed to close single-node store: %s", err.Error())
|
|
}
|
|
|
|
// Set snapshot threshold high to effectively disable, reopen store, write
|
|
// one more record, and then reopen again, ensure all data is there.
|
|
s.SnapshotThreshold = 8192
|
|
s.SnapshotInterval = 100 * time.Second
|
|
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
_, err = s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false))
|
|
if err != nil {
|
|
t.Fatalf("failed to execute on single node: %s", err.Error())
|
|
}
|
|
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
|
|
r, err = s.Query(qr)
|
|
if err != nil {
|
|
t.Fatalf("failed to query single node: %s", err.Error())
|
|
}
|
|
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
|
|
fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second)
|
|
if err != nil {
|
|
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
|
|
}
|
|
|
|
if err := s.Close(true); err != nil {
|
|
t.Fatalf("failed to close single-node store: %s", err.Error())
|
|
}
|
|
if err := s.Open(); err != nil {
|
|
t.Fatalf("failed to open single-node store: %s", err.Error())
|
|
}
|
|
defer s.Close(true)
|
|
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
|
t.Fatalf("Error waiting for leader: %s", err)
|
|
}
|
|
|
|
// Wait until the log entries have been applied to the voting follower,
|
|
// and then query.
|
|
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil {
|
|
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
|
|
}
|
|
|
|
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
|
|
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
|
|
r, err = s.Query(qr)
|
|
if err != nil {
|
|
t.Fatalf("failed to query single node: %s", err.Error())
|
|
}
|
|
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got {
|
|
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
|
|
}
|
|
}
|
|
|
|
// Test_OpenStoreCloseStartupOnDiskSingleNode tests that on-disk
|
|
// works fine during various restart scenarios.
|
|
func Test_OpenStoreCloseStartupOnDiskSingleNode(t *testing.T) {
|
|
s, ln := mustNewStore(t, false)
|
|
defer ln.Close()
|
|
|
|
openStoreCloseStartup(t, s)
|
|
}
|
|
|
|
// Test_OpenStoreCloseStartupMemoryOnlySingleNode tests that in-memory
|
|
// works fine during various restart scenarios.
|
|
func Test_OpenStoreCloseStartupMemoryOnlySingleNode(t *testing.T) {
|
|
s, ln := mustNewStore(t, true)
|
|
defer ln.Close()
|
|
|
|
openStoreCloseStartup(t, s)
|
|
}
|