diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9908687a..5f1421ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - [PR #1586](https://github.com/rqlite/rqlite/pull/1586): Auto-backups will be in WAL mode going forward, and not in DELETE mode.
 - [PR #1587](https://github.com/rqlite/rqlite/pull/1587): Refactor Store Backup Provider to use `io.Writer`.
 - [PR #1588](https://github.com/rqlite/rqlite/pull/1588): More consistent use of Sum types in Uploader.
+- [PR #1589](https://github.com/rqlite/rqlite/pull/1589): Avoid SQLite file copy during automatic backups.

 ## 8.16.3 (January 9th 2024)
 ### Implementation changes and bug fixes
diff --git a/auto/backup/uploader.go b/auto/backup/uploader.go
index 4dab1005..e0a10293 100644
--- a/auto/backup/uploader.go
+++ b/auto/backup/uploader.go
@@ -1,7 +1,6 @@
 package backup

 import (
-	"compress/gzip"
 	"context"
 	"expvar"
 	"fmt"
@@ -35,10 +34,10 @@ type DataProvider interface {
 	// last modified.
 	LastModified() (time.Time, error)

-	// Provide writes the data-for-upload to the file specified by path. Because
-	// Provide may change the data in the DataProvider, it returns the current
-	// modified time of the data, after the data has been written to path.
-	Provide(path string) (time.Time, error)
+	// Provide writes the data-for-upload to the writer. Because Provide may
+	// change the data in the DataProvider, it returns the current modified
+	// time of the data, after the data has been written to the writer.
+	Provide(w io.Writer) (time.Time, error)
 }

 // stats captures stats for the Uploader service.
@@ -158,27 +157,19 @@ func (u *Uploader) upload(ctx context.Context) error {
 	}

 	// Create a temporary file for the data to be uploaded
-	filetoUpload, err := tempFilename()
+	fd, err := tempFD()
 	if err != nil {
 		return err
 	}
-	defer os.Remove(filetoUpload)
-
-	lm, err = u.dataProvider.Provide(filetoUpload)
-	if err != nil {
-		return err
-	}
-	if err := u.compressIfNeeded(filetoUpload); err != nil {
-		return err
-	}
+	defer os.Remove(fd.Name())
+	defer fd.Close()

-	fd, err := os.Open(filetoUpload)
+	lm, err = u.dataProvider.Provide(fd)
 	if err != nil {
 		return err
 	}
-	defer fd.Close()

-	filesum, err := FileSHA256(filetoUpload)
+	filesum, err := FileSHA256(fd.Name())
 	if err != nil {
 		return err
 	}
@@ -195,6 +186,9 @@ func (u *Uploader) upload(ctx context.Context) error {
 		}
 	}

+	if _, err := fd.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
 	cr := progress.NewCountingReader(fd)
 	startTime := time.Now()
 	err = u.storageClient.Upload(ctx, cr, filesum)
@@ -221,54 +215,6 @@ func (u *Uploader) currentSum(ctx context.Context) (SHA256Sum, error) {
 	return SHA256Sum(s), nil
 }

-func (u *Uploader) compressIfNeeded(path string) error {
-	if !u.compress {
-		return nil
-	}
-
-	compressedFile, err := tempFilename()
-	if err != nil {
-		return err
-	}
-	defer os.Remove(compressedFile)
-
-	if err = compressFromTo(path, compressedFile); err != nil {
-		return err
-	}
-
-	return os.Rename(compressedFile, path)
-}
-
-func compressFromTo(from, to string) error {
-	uncompressedFd, err := os.Open(from)
-	if err != nil {
-		return err
-	}
-	defer uncompressedFd.Close()
-
-	compressedFd, err := os.Create(to)
-	if err != nil {
-		return err
-	}
-	defer compressedFd.Close()
-
-	gw := gzip.NewWriter(compressedFd)
-	_, err = io.Copy(gw, uncompressedFd)
-	if err != nil {
-		return err
-	}
-	err = gw.Close()
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-func tempFilename() (string, error) {
-	f, err := os.CreateTemp("", "rqlite-upload")
-	if err != nil {
-		return "", err
-	}
-	f.Close()
-	return f.Name(), nil
+func tempFD() (*os.File, error) {
+	return os.CreateTemp("", "rqlite-upload")
 }
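Note (editor's sketch, not part of the patch): the refactored upload flow now writes the provided data once, directly into a temp file, checksums it via FileSHA256(fd.Name()), then rewinds the same descriptor and streams it to storage — no intermediate SQLite file copy. A minimal, self-contained illustration of that write/rewind pattern, with a stand-in consumer function instead of the real StorageClient:

package main

import (
	"fmt"
	"io"
	"os"
)

// uploadViaTempFile mirrors the shape of the new upload() flow: the
// provider writes directly into a temp file, and the same *os.File is
// rewound and reused for the upload, avoiding a separate copy step.
func uploadViaTempFile(provide func(io.Writer) error, consume func(io.Reader) error) error {
	fd, err := os.CreateTemp("", "example-upload")
	if err != nil {
		return err
	}
	defer os.Remove(fd.Name())
	defer fd.Close()

	if err := provide(fd); err != nil {
		return err
	}
	// The provider left the file offset at EOF, so rewind before reading.
	if _, err := fd.Seek(0, io.SeekStart); err != nil {
		return err
	}
	return consume(fd)
}

func main() {
	err := uploadViaTempFile(
		func(w io.Writer) error { _, err := w.Write([]byte("backup bytes")); return err },
		func(r io.Reader) error {
			b, err := io.ReadAll(r)
			fmt.Printf("uploaded %q\n", b)
			return err
		},
	)
	if err != nil {
		fmt.Println("upload failed:", err)
	}
}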
diff --git a/auto/backup/uploader_test.go b/auto/backup/uploader_test.go
index bb876f3f..383fa505 100644
--- a/auto/backup/uploader_test.go
+++ b/auto/backup/uploader_test.go
@@ -1,7 +1,6 @@
 package backup

 import (
-	"compress/gzip"
 	"context"
 	"expvar"
 	"fmt"
@@ -66,43 +65,6 @@ func Test_UploaderSingleUpload(t *testing.T) {
 	}
 }

-func Test_UploaderSingleUploadCompress(t *testing.T) {
-	ResetStats()
-	var uploadedData []byte
-	var wg sync.WaitGroup
-	wg.Add(1)
-	sc := &mockStorageClient{
-		uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
-			defer wg.Done()
-
-			// Wrap a gzip reader about the reader, to ensure the data is compressed.
-			gzReader, err := gzip.NewReader(reader)
-			if err != nil {
-				return err
-			}
-			defer gzReader.Close()
-
-			uploadedData, err = io.ReadAll(gzReader)
-			return err
-		},
-	}
-	dp := &mockDataProvider{data: "my upload data"}
-	n := time.Now()
-	dp.lastModifiedFn = func() (time.Time, error) {
-		return n, nil // Single upload, since time doesn't change.
-	}
-	uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress)
-	ctx, cancel := context.WithCancel(context.Background())
-
-	done := uploader.Start(ctx, nil)
-	wg.Wait()
-	cancel()
-	<-done
-	if exp, got := "my upload data", string(uploadedData); exp != got {
-		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
-	}
-}
-
 // Test_UploaderSingleUpload_Checksum ensures that when the checksum in the
 // storage service is the same as the checksum of the data being uploaded, the
 // upload is skipped.
@@ -354,12 +316,15 @@ func (mp *mockDataProvider) LastModified() (time.Time, error) {
 	return time.Now(), nil
 }

-func (mp *mockDataProvider) Provide(path string) (time.Time, error) {
+func (mp *mockDataProvider) Provide(w io.Writer) (time.Time, error) {
 	if mp.err != nil {
 		return time.Time{}, mp.err
 	}

-	return time.Now(), os.WriteFile(path, []byte(mp.data), 0644)
+	if _, err := w.Write([]byte(mp.data)); err != nil {
+		return time.Time{}, err
+	}
+	return time.Now(), nil
 }

 func mustWriteToFile(s string) string {
diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go
index 7fa5fccd..89d040c9 100644
--- a/cmd/rqlited/main.go
+++ b/cmd/rqlited/main.go
@@ -252,7 +252,7 @@ func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*back
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse auto-backup file: %s", err.Error())
 	}
-	provider := store.NewProvider(str, false)
+	provider := store.NewProvider(str, uCfg.Vacuum, !uCfg.NoCompress)
 	sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
 		s3cfg.Bucket, s3cfg.Path, s3cfg.ForcePathStyle)
 	u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress)
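Note (editor's sketch, not part of the patch): the call site now forwards the auto-backup config's Vacuum flag, and the inverse of its NoCompress flag, into store.NewProvider — compression moves to where the data is produced, instead of being a post-processing step in the Uploader. A toy provider (not rqlite's actual store.Provider) showing that compress-at-source shape:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// toyProvider mimics the shape of store.Provider: it owns its data and
// the vacuum/compress options chosen at construction time.
type toyProvider struct {
	data     []byte
	vacuum   bool // ignored in this sketch
	compress bool
}

// Provide streams the data to w, gzip-compressing it when configured,
// analogous to the Store honoring BackupRequest.Compress.
func (p *toyProvider) Provide(w io.Writer) error {
	if !p.compress {
		_, err := w.Write(p.data)
		return err
	}
	gw := gzip.NewWriter(w)
	if _, err := gw.Write(p.data); err != nil {
		return err
	}
	return gw.Close() // Close flushes the gzip trailer.
}

func main() {
	p := &toyProvider{data: []byte("sqlite bytes"), compress: true}
	var buf bytes.Buffer
	if err := p.Provide(&buf); err != nil {
		panic(err)
	}
	fmt.Println("compressed size:", buf.Len())
}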
diff --git a/store/provider.go b/store/provider.go
index f0dd61d2..02c19e50 100644
--- a/store/provider.go
+++ b/store/provider.go
@@ -2,7 +2,6 @@ package store

 import (
 	"io"
-	"os"
 	"time"

 	"github.com/rqlite/rqlite/v8/command/proto"
@@ -11,18 +10,22 @@ import (
 // Provider implements the uploader Provider interface, allowing the
 // Store to be used as a DataProvider for an uploader.
 type Provider struct {
-	str    *Store
-	vacuum bool
+	str      *Store
+	vacuum   bool
+	compress bool

 	nRetries      int
 	retryInterval time.Duration
 }

-// NewProvider returns a new instance of Provider.
-func NewProvider(s *Store, v bool) *Provider {
+// NewProvider returns a new instance of Provider. If v is true, the
+// SQLite database will be VACUUMed before being provided. If c is
+// true, the SQLite database will be compressed before being provided.
+func NewProvider(s *Store, v, c bool) *Provider {
 	return &Provider{
 		str:           s,
 		vacuum:        v,
+		compress:      c,
 		nRetries:      10,
 		retryInterval: 500 * time.Millisecond,
 	}
@@ -37,7 +40,7 @@ func (p *Provider) LastModified() (time.Time, error) {

-// Provider writes the SQLite database to the given path. If path exists,
-// it will be overwritten.
-func (p *Provider) Provide(path string) (t time.Time, retErr error) {
+// Provide writes the SQLite database to the given writer, after optionally
+// VACUUMing and compressing it, per the Provider's configuration.
+func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) {
 	stats.Add(numProviderProvides, 1)
 	defer func() {
 		if retErr != nil {
@@ -45,18 +48,10 @@ func (p *Provider) Provide(path string) (t time.Time, retErr error) {
 		}
 	}()

-	fd, err := os.Create(path)
-	if err != nil {
-		return time.Time{}, err
-	}
-	defer fd.Close()
-	return p.provide(fd)
-}
-
-func (p *Provider) provide(w io.Writer) (time.Time, error) {
 	br := &proto.BackupRequest{
-		Format: proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
-		Vacuum: p.vacuum,
+		Format:   proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
+		Vacuum:   p.vacuum,
+		Compress: p.compress,
 	}
 	nRetries := 0
 	for {
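Note (editor's sketch, not part of the patch): Provide retries the backup request — the struct is built with nRetries: 10 and retryInterval: 500ms — but the loop body is truncated in the hunk above. A generic bounded-retry shape under those assumptions, not the literal rqlite implementation:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry attempts op up to nRetries times, sleeping retryInterval between
// failures, mirroring the Provider's nRetries/retryInterval fields.
func retry(nRetries int, retryInterval time.Duration, op func() error) error {
	var err error
	for i := 0; i < nRetries; i++ {
		if err = op(); err == nil {
			return nil
		}
		time.Sleep(retryInterval)
	}
	return fmt.Errorf("gave up after %d attempts: %w", nRetries, err)
}

func main() {
	calls := 0
	err := retry(10, 500*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(calls, err) // succeeds on the third attempt
}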
diff --git a/store/provider_test.go b/store/provider_test.go
index b46e9413..c3a1e4d1 100644
--- a/store/provider_test.go
+++ b/store/provider_test.go
@@ -1,6 +1,8 @@
 package store

 import (
+	"compress/gzip"
+	"io"
 	"os"
 	"testing"
 	"time"
@@ -46,10 +48,11 @@ func Test_SingleNodeProvide(t *testing.T) {
 		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
 	}

-	tempFile := mustCreateTempFile()
-	defer os.Remove(tempFile)
-	provider := NewProvider(s0, false)
-	if _, err := provider.Provide(tempFile); err != nil {
+	tmpFd := mustCreateTempFD()
+	defer os.Remove(tmpFd.Name())
+	defer tmpFd.Close()
+	provider := NewProvider(s0, false, false)
+	if _, err := provider.Provide(tmpFd); err != nil {
 		t.Fatalf("failed to provide SQLite data: %s", err.Error())
 	}

@@ -68,7 +71,89 @@ func Test_SingleNodeProvide(t *testing.T) {
 		t.Fatalf("Error waiting for leader: %s", err)
 	}

-	err = s1.Load(loadRequestFromFile(tempFile))
+	err = s1.Load(loadRequestFromFile(tmpFd.Name()))
+	if err != nil {
+		t.Fatalf("failed to load provided SQLite data: %s", err.Error())
+	}
+	qr = queryRequestFromString("SELECT * FROM foo", false, false)
+	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
+	r, err = s1.Query(qr)
+	if err != nil {
+		t.Fatalf("failed to query leader node: %s", err.Error())
+	}
+	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func Test_SingleNodeProvide_Compress(t *testing.T) {
+	s0, ln := mustNewStore(t)
+	defer ln.Close()
+
+	if err := s0.Open(); err != nil {
+		t.Fatalf("failed to open single-node store: %s", err.Error())
+	}
+	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
+		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
+	}
+	defer s0.Close(true)
+	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
+		t.Fatalf("Error waiting for leader: %s", err)
+	}
+
+	er := executeRequestFromStrings([]string{
+		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+	}, false, false)
+	_, err := s0.Execute(er)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+	qr := queryRequestFromString("SELECT * FROM foo", false, false)
+	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
+	r, err := s0.Query(qr)
+	if err != nil {
+		t.Fatalf("failed to query leader node: %s", err.Error())
+	}
+	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	tmpFd := mustCreateTempFD()
+	defer os.Remove(tmpFd.Name())
+	defer tmpFd.Close()
+	provider := NewProvider(s0, false, true)
+	if _, err := provider.Provide(tmpFd); err != nil {
+		t.Fatalf("failed to provide SQLite data: %s", err.Error())
+	}
+
+	unCompressedFile, err := gunzip(tmpFd.Name())
+	if err != nil {
+		t.Fatalf("failed to gunzip provided SQLite data: %s", err.Error())
+	}
+
+	// Load the provided data into a new store and check it.
+	s1, ln := mustNewStore(t)
+	defer ln.Close()
+
+	if err := s1.Open(); err != nil {
+		t.Fatalf("failed to open single-node store: %s", err.Error())
+	}
+	if err := s1.Bootstrap(NewServer(s1.ID(), s1.Addr(), true)); err != nil {
+		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
+	}
+	defer s1.Close(true)
+	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
+		t.Fatalf("Error waiting for leader: %s", err)
+	}
+
+	err = s1.Load(loadRequestFromFile(unCompressedFile))
 	if err != nil {
 		t.Fatalf("failed to load provided SQLite data: %s", err.Error())
 	}
@@ -103,10 +188,11 @@ func Test_SingleNodeProvideNoData(t *testing.T) {
 		t.Fatalf("Error waiting for leader: %s", err)
 	}

-	tmpFile := mustCreateTempFile()
-	defer os.Remove(tmpFile)
-	provider := NewProvider(s, false)
-	if _, err := provider.Provide(tmpFile); err != nil {
+	tmpFd := mustCreateTempFD()
+	defer os.Remove(tmpFd.Name())
+	defer tmpFd.Close()
+	provider := NewProvider(s, false, false)
+	if _, err := provider.Provide(tmpFd); err != nil {
 		t.Fatalf("store failed to provide: %s", err.Error())
 	}
 }
@@ -128,7 +214,7 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {

 	tmpFile := mustCreateTempFile()
 	defer os.Remove(tmpFile)
-	provider := NewProvider(s, false)
+	provider := NewProvider(s, false, false)

 	lm, err := provider.LastModified()
 	if err != nil {
@@ -222,3 +308,27 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
 		t.Fatalf("last modified time should have changed")
 	}
 }
+
+func gunzip(file string) (string, error) {
+	f, err := os.Open(file)
+	if err != nil {
+		return "", err
+	}
+	defer f.Close()
+
+	gz, err := gzip.NewReader(f)
+	if err != nil {
+		return "", err
+	}
+	defer gz.Close()
+
+	tmpFd := mustCreateTempFD()
+	defer tmpFd.Close()
+
+	_, err = io.Copy(tmpFd, gz)
+	if err != nil {
+		return "", err
+	}
+
+	return tmpFd.Name(), nil
+}
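Note (editor's sketch, not part of the patch): the test's gunzip helper round-trips the compressed backup through a second temp file so it can be passed to loadRequestFromFile. For contexts where a file isn't required, an in-memory equivalent of that decode step might look like this (illustrative only):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gunzipBytes is an in-memory analogue of the test's file-based gunzip
// helper: it decodes a gzip stream and returns the raw bytes.
func gunzipBytes(compressed []byte) ([]byte, error) {
	gz, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer gz.Close()
	return io.ReadAll(gz)
}

func main() {
	// Build a small gzip stream, then decode it again.
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	gw.Write([]byte("backup bytes"))
	gw.Close()

	out, err := gunzipBytes(buf.Bytes())
	fmt.Printf("%s %v\n", out, err)
}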