1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

665 lines
19 KiB
Go

package snapshot
import (
"bytes"
"io"
"strings"
"testing"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/db"
)
func Test_NewStore(t *testing.T) {
tmpDir := t.TempDir()
s, err := NewStore(tmpDir)
if err != nil {
t.Fatal(err)
}
if s == nil {
t.Fatal("expected non-nil store")
}
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if len(generations) != 0 {
t.Fatalf("expected 0 generation, got %d", len(generations))
}
_, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if ok {
t.Fatalf("expected current generation dir not to exist")
}
nextGenDir, err := s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
if !strings.HasSuffix(nextGenDir, firstGeneration) {
t.Fatalf("expected next generation dir to be empty, got %s", nextGenDir)
}
}
func Test_NewStore_ListOpenEmpty(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !s.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
if snaps, err := s.List(); err != nil {
t.Fatalf("failed to list snapshots: %s", err)
} else if len(snaps) != 0 {
t.Fatalf("expected 1 snapshots, got %d", len(snaps))
}
if _, _, err := s.Open("non-existent"); err != ErrSnapshotNotFound {
t.Fatalf("expected ErrSnapshotNotFound, got %s", err)
}
}
// Test_WALSnapshotStore_CreateFull performs detailed testing of the
// snapshot creation process. It is critical that snapshots are created
// correctly, so this test is thorough.
func Test_Store_CreateFullThenIncremental(t *testing.T) {
checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta {
snaps, err := s.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := exp, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
if len(snaps) == 0 {
return nil
}
return snaps[0]
}
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !str.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
testConfig1 := makeTestConfiguration("1", "2")
sink, err := str.Create(1, 22, 33, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
//////////////////////////////////////////////////////////////////////////
// Create a full snapshot and write it to the sink.
fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta := checkSnapshotCount(str, 1)
if meta.Index != 22 || meta.Term != 33 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot and check that it's correct.
raftMeta, rc, err := str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc := &countingReadCloser{rc: rc}
streamHdr, _, err := NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap := streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
dbInfo := streamSnap.GetDb()
if dbInfo == nil {
t.Fatal("got nil DB info")
}
if !compareReaderToFile(crc, "testdata/db-and-wals/backup.db") {
t.Fatalf("database file does not match what is in snapshot")
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if err := crc.Close(); err != nil {
t.Fatalf("failed to close snapshot reader: %s", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// Incremental snapshot next
sink, err = str.Create(2, 55, 66, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
walData := mustReadFile("testdata/db-and-wals/wal-00")
incSnap := NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 55 || meta.Term != 66 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile := t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err := db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
// Database should now have 1 one after replaying the WAL.
rows, err := checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// Do it again!
sink, err = str.Create(2, 77, 88, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
walData = mustReadFile("testdata/db-and-wals/wal-01")
incSnap = NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 77 || meta.Term != 88 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile = t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err = db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
rows, err = checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"],[2,"Row 1"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// One last time, after a reaping took place in the middle.
sink, err = str.Create(2, 100, 200, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
walData = mustReadFile("testdata/db-and-wals/wal-02")
incSnap = NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 100 || meta.Term != 200 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile = t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err = db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
rows, err = checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"],[2,"Row 1"],[3,"Row 2"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
}
// Test_WALSnapshotStore_CreateFullThenFull ensures two full snapshots
// can be created and persisted back-to-back.
func Test_Store_CreateFullThenFull(t *testing.T) {
checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta {
snaps, err := s.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := exp, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
if len(snaps) == 0 {
return nil
}
return snaps[0]
}
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !str.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
testConfig1 := makeTestConfiguration("1", "2")
//////////////////////////////////////////////////////////////////////////
// Create a full snapshot and write it to the sink.
sink, err := str.Create(1, 22, 33, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta := checkSnapshotCount(str, 1)
if meta.Index != 22 || meta.Term != 33 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
//////////////////////////////////////////////////////////////////////////
// Create a second full snapshot and write it to the sink.
sink, err = str.Create(1, 44, 55, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
fullSnap = NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 44 || meta.Term != 55 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
}
func Test_Store_ReapGenerations(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
testCurrGenDirIs := func(exp string) string {
curGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if !ok {
t.Fatalf("expected current generation dir to exist")
}
if curGenDir != exp {
t.Fatalf("expected current generation dir to be %s, got %s", exp, curGenDir)
}
return curGenDir
}
testGenCountIs := func(exp int) {
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if exp, got := exp, len(generations); exp != got {
t.Fatalf("expected %d generations, got %d", exp, got)
}
}
testReapsOK := func(expN int) {
n, err := s.ReapGenerations()
if err != nil {
t.Fatalf("reaping failed: %s", err.Error())
}
if n != expN {
t.Fatalf("expected %d generations to be reaped, got %d", expN, n)
}
}
var nextGenDir string
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testCurrGenDirIs(nextGenDir)
testReapsOK(0)
// Create another generation and then tell the Store to reap.
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testGenCountIs(2)
testReapsOK(1)
testCurrGenDirIs(nextGenDir)
// Finally, test reaping lots of generations.
for i := 0; i < 10; i++ {
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
}
testGenCountIs(11)
testReapsOK(10)
testGenCountIs(1)
testCurrGenDirIs(nextGenDir)
}
func compareReaderToFile(r io.Reader, path string) bool {
b := mustReadFile(path)
rb := mustReadAll(r)
return bytes.Equal(b, rb)
}
func mustReadAll(r io.Reader) []byte {
b, err := io.ReadAll(r)
if err != nil {
panic(err)
}
return b
}
type countingReadCloser struct {
rc io.ReadCloser
n int
}
func (c *countingReadCloser) Read(p []byte) (int, error) {
n, err := c.rc.Read(p)
c.n += n
return n, err
}
func (c *countingReadCloser) Close() error {
return c.rc.Close()
}
func Test_StoreReaping(t *testing.T) {
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
str.noAutoreap = true
testConfig := makeTestConfiguration("1", "2")
// Create a full snapshot.
snapshot := NewFullSnapshot("testdata/db-and-wals/backup.db")
sink, err := str.Create(1, 1, 1, testConfig, 4, nil)
if err != nil {
t.Fatalf("failed to create snapshot sink: %s", err)
}
stream, err := snapshot.OpenStream()
if err != nil {
t.Fatalf("failed to open snapshot stream: %s", err)
}
_, err = io.Copy(sink, stream)
if err != nil {
t.Fatalf("failed to write snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close snapshot sink: %s", err)
}
createIncSnapshot := func(index, term uint64, file string) {
snapshot := NewWALSnapshot(mustReadFile(file))
sink, err := str.Create(1, index, term, testConfig, 4, nil)
if err != nil {
t.Fatalf("failed to create snapshot sink: %s", err)
}
stream, err := snapshot.OpenStream()
if err != nil {
t.Fatalf("failed to open snapshot stream: %s", err)
}
_, err = io.Copy(sink, stream)
if err != nil {
t.Fatalf("failed to write snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close snapshot sink: %s", err)
}
}
createIncSnapshot(3, 2, "testdata/db-and-wals/wal-00")
createIncSnapshot(5, 3, "testdata/db-and-wals/wal-01")
createIncSnapshot(7, 4, "testdata/db-and-wals/wal-02")
createIncSnapshot(9, 5, "testdata/db-and-wals/wal-03")
// There should be 5 snapshot directories in the current generation.
generationsDir, ok, err := str.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get generations dir: %s", err)
}
if !ok {
t.Fatalf("expected generations dir to exist")
}
snaps, err := str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 5, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
for _, snap := range snaps[0:4] {
if snap.Full {
t.Fatalf("snapshot %s is full", snap.ID)
}
}
if !snaps[4].Full {
t.Fatalf("snapshot %s is incremental", snaps[4].ID)
}
// Reap just the first snapshot, which is full.
n, err := str.ReapSnapshots(generationsDir, 4)
if err != nil {
t.Fatalf("failed to reap full snapshot: %s", err)
}
if exp, got := 1, n; exp != got {
t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
}
snaps, err = str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 4, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
// Reap all but the last two snapshots. The remaining snapshots
// should all be incremental.
n, err = str.ReapSnapshots(generationsDir, 2)
if err != nil {
t.Fatalf("failed to reap snapshots: %s", err)
}
if exp, got := 2, n; exp != got {
t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
}
snaps, err = str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 2, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
for _, snap := range snaps {
if snap.Full {
t.Fatalf("snapshot %s is full", snap.ID)
}
}
if snaps[0].Index != 9 && snaps[1].Term != 5 {
t.Fatal("snap 0 is wrong, exp: ", snaps[0].Index, snaps[1].Term)
}
if snaps[1].Index != 7 && snaps[1].Term != 3 {
t.Fatal("snap 1 is wrong, exp:", snaps[1].Index, snaps[1].Term)
}
// Open the latest snapshot, write it to disk, and check its contents.
_, rc, err := str.Open(snaps[0].ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", snaps[0].ID, err)
}
defer rc.Close()
strHdr, _, err := NewStreamHeaderFromReader(rc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap := strHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile := t.TempDir() + "/db"
if err := ReplayDB(streamSnap, rc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
// Check the database.
db, err := db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer db.Close()
rows, err := db.QueryStringStmt("SELECT COUNT(*) FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query exp: %s got: %s", exp, got)
}
}