1
0
Fork 0

Configure Store Provider

master
Philip O'Toole 8 months ago
parent 056bd639e1
commit aa2cf2dd46

@ -1,7 +1,6 @@
package backup
import (
"compress/gzip"
"context"
"expvar"
"fmt"
@ -35,10 +34,10 @@ type DataProvider interface {
// last modified.
LastModified() (time.Time, error)
// Provide writes the data-for-upload to the file specified by path. Because
// Provide may change the data in the DataProvider, it returns the current
// modified time of the data, after the data has been written to path.
Provide(path string) (time.Time, error)
// Provide writes the data-for-upload to the writer. Because Provide may
// change the data in the DataProvider, it returns the current modified
// time of the data, after the data has been written to path.
Provide(w io.Writer) (time.Time, error)
}
// stats captures stats for the Uploader service.
@ -158,27 +157,19 @@ func (u *Uploader) upload(ctx context.Context) error {
}
// Create a temporary file for the data to be uploaded
filetoUpload, err := tempFilename()
fd, err := tempFD()
if err != nil {
return err
}
defer os.Remove(filetoUpload)
lm, err = u.dataProvider.Provide(filetoUpload)
if err != nil {
return err
}
if err := u.compressIfNeeded(filetoUpload); err != nil {
return err
}
defer os.Remove(fd.Name())
defer fd.Close()
fd, err := os.Open(filetoUpload)
lm, err = u.dataProvider.Provide(fd)
if err != nil {
return err
}
defer fd.Close()
filesum, err := FileSHA256(filetoUpload)
filesum, err := FileSHA256(fd.Name())
if err != nil {
return err
}
@ -195,6 +186,9 @@ func (u *Uploader) upload(ctx context.Context) error {
}
}
if _, err := fd.Seek(0, io.SeekStart); err != nil {
return err
}
cr := progress.NewCountingReader(fd)
startTime := time.Now()
err = u.storageClient.Upload(ctx, cr, filesum)
@ -221,54 +215,6 @@ func (u *Uploader) currentSum(ctx context.Context) (SHA256Sum, error) {
return SHA256Sum(s), nil
}
func (u *Uploader) compressIfNeeded(path string) error {
if !u.compress {
return nil
}
compressedFile, err := tempFilename()
if err != nil {
return err
}
defer os.Remove(compressedFile)
if err = compressFromTo(path, compressedFile); err != nil {
return err
}
return os.Rename(compressedFile, path)
}
func compressFromTo(from, to string) error {
uncompressedFd, err := os.Open(from)
if err != nil {
return err
}
defer uncompressedFd.Close()
compressedFd, err := os.Create(to)
if err != nil {
return err
}
defer compressedFd.Close()
gw := gzip.NewWriter(compressedFd)
_, err = io.Copy(gw, uncompressedFd)
if err != nil {
return err
}
err = gw.Close()
if err != nil {
return err
}
return nil
}
func tempFilename() (string, error) {
f, err := os.CreateTemp("", "rqlite-upload")
if err != nil {
return "", err
}
f.Close()
return f.Name(), nil
func tempFD() (*os.File, error) {
return os.CreateTemp("", "rqlite-upload")
}

@ -1,7 +1,6 @@
package backup
import (
"compress/gzip"
"context"
"expvar"
"fmt"
@ -66,43 +65,6 @@ func Test_UploaderSingleUpload(t *testing.T) {
}
}
func Test_UploaderSingleUploadCompress(t *testing.T) {
ResetStats()
var uploadedData []byte
var wg sync.WaitGroup
wg.Add(1)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
// Wrap a gzip reader about the reader, to ensure the data is compressed.
gzReader, err := gzip.NewReader(reader)
if err != nil {
return err
}
defer gzReader.Close()
uploadedData, err = io.ReadAll(gzReader)
return err
},
}
dp := &mockDataProvider{data: "my upload data"}
n := time.Now()
dp.lastModifiedFn = func() (time.Time, error) {
return n, nil // Single upload, since time doesn't change.
}
uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress)
ctx, cancel := context.WithCancel(context.Background())
done := uploader.Start(ctx, nil)
wg.Wait()
cancel()
<-done
if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got)
}
}
// Test_UploaderSingleUpload_Checksum ensures that when the checksum in the
// storage service is the same as the checksum of the data being uploaded, the
// upload is skipped.
@ -354,12 +316,15 @@ func (mp *mockDataProvider) LastModified() (time.Time, error) {
return time.Now(), nil
}
func (mp *mockDataProvider) Provide(path string) (time.Time, error) {
func (mp *mockDataProvider) Provide(w io.Writer) (time.Time, error) {
if mp.err != nil {
return time.Time{}, mp.err
}
return time.Now(), os.WriteFile(path, []byte(mp.data), 0644)
if _, err := w.Write([]byte(mp.data)); err != nil {
return time.Time{}, err
}
return time.Now(), nil
}
func mustWriteToFile(s string) string {

@ -252,7 +252,7 @@ func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*back
if err != nil {
return nil, fmt.Errorf("failed to parse auto-backup file: %s", err.Error())
}
provider := store.NewProvider(str, false)
provider := store.NewProvider(str, uCfg.Vacuum, !uCfg.NoCompress)
sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
s3cfg.Bucket, s3cfg.Path, s3cfg.ForcePathStyle)
u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress)

@ -2,7 +2,6 @@ package store
import (
"io"
"os"
"time"
"github.com/rqlite/rqlite/v8/command/proto"
@ -11,18 +10,22 @@ import (
// Provider implements the uploader Provider interface, allowing the
// Store to be used as a DataProvider for an uploader.
type Provider struct {
str *Store
vacuum bool
str *Store
vacuum bool
compress bool
nRetries int
retryInterval time.Duration
}
// NewProvider returns a new instance of Provider.
func NewProvider(s *Store, v bool) *Provider {
// NewProvider returns a new instance of Provider. If v is true, the
// SQLite database will be VACUUMed before being provided. If c is
// true, the SQLite database will be compressed before being provided.
func NewProvider(s *Store, v, c bool) *Provider {
return &Provider{
str: s,
vacuum: v,
compress: c,
nRetries: 10,
retryInterval: 500 * time.Millisecond,
}
@ -37,7 +40,7 @@ func (p *Provider) LastModified() (time.Time, error) {
// Provider writes the SQLite database to the given path. If path exists,
// it will be overwritten.
func (p *Provider) Provide(path string) (t time.Time, retErr error) {
func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) {
stats.Add(numProviderProvides, 1)
defer func() {
if retErr != nil {
@ -45,18 +48,10 @@ func (p *Provider) Provide(path string) (t time.Time, retErr error) {
}
}()
fd, err := os.Create(path)
if err != nil {
return time.Time{}, err
}
defer fd.Close()
return p.provide(fd)
}
func (p *Provider) provide(w io.Writer) (time.Time, error) {
br := &proto.BackupRequest{
Format: proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
Vacuum: p.vacuum,
Format: proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
Vacuum: p.vacuum,
Compress: p.compress,
}
nRetries := 0
for {

@ -1,6 +1,8 @@
package store
import (
"compress/gzip"
"io"
"os"
"testing"
"time"
@ -46,10 +48,11 @@ func Test_SingleNodeProvide(t *testing.T) {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
tempFile := mustCreateTempFile()
defer os.Remove(tempFile)
provider := NewProvider(s0, false)
if _, err := provider.Provide(tempFile); err != nil {
tmpFd := mustCreateTempFD()
defer os.Remove(tmpFd.Name())
defer tmpFd.Close()
provider := NewProvider(s0, false, false)
if _, err := provider.Provide(tmpFd); err != nil {
t.Fatalf("failed to provide SQLite data: %s", err.Error())
}
@ -68,7 +71,89 @@ func Test_SingleNodeProvide(t *testing.T) {
t.Fatalf("Error waiting for leader: %s", err)
}
err = s1.Load(loadRequestFromFile(tempFile))
err = s1.Load(loadRequestFromFile(tmpFd.Name()))
if err != nil {
t.Fatalf("failed to load provided SQLite data: %s", err.Error())
}
qr = queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s1.Query(qr)
if err != nil {
t.Fatalf("failed to query leader node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeProvide_Compress(t *testing.T) {
s0, ln := mustNewStore(t)
defer ln.Close()
if err := s0.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s0.Close(true)
if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
er := executeRequestFromStrings([]string{
`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
}, false, false)
_, err := s0.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
r, err := s0.Query(qr)
if err != nil {
t.Fatalf("failed to query leader node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
tmpFd := mustCreateTempFD()
defer os.Remove(tmpFd.Name())
defer tmpFd.Close()
provider := NewProvider(s0, false, true)
if _, err := provider.Provide(tmpFd); err != nil {
t.Fatalf("failed to provide SQLite data: %s", err.Error())
}
unCompressedFile, err := gunzip(tmpFd.Name())
if err != nil {
t.Fatalf("failed to gunzip provided SQLite data: %s", err.Error())
}
// Load the provided data into a new store and check it.
s1, ln := mustNewStore(t)
defer ln.Close()
if err := s1.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s1.Bootstrap(NewServer(s1.ID(), s1.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s1.Close(true)
if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
err = s1.Load(loadRequestFromFile(unCompressedFile))
if err != nil {
t.Fatalf("failed to load provided SQLite data: %s", err.Error())
}
@ -103,10 +188,11 @@ func Test_SingleNodeProvideNoData(t *testing.T) {
t.Fatalf("Error waiting for leader: %s", err)
}
tmpFile := mustCreateTempFile()
defer os.Remove(tmpFile)
provider := NewProvider(s, false)
if _, err := provider.Provide(tmpFile); err != nil {
tmpFd := mustCreateTempFD()
defer os.Remove(tmpFd.Name())
defer tmpFd.Close()
provider := NewProvider(s, false, false)
if _, err := provider.Provide(tmpFd); err != nil {
t.Fatalf("store failed to provide: %s", err.Error())
}
}
@ -128,7 +214,7 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
tmpFile := mustCreateTempFile()
defer os.Remove(tmpFile)
provider := NewProvider(s, false)
provider := NewProvider(s, false, false)
lm, err := provider.LastModified()
if err != nil {
@ -222,3 +308,27 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
t.Fatalf("last modified time should have changed")
}
}
func gunzip(file string) (string, error) {
f, err := os.Open(file)
if err != nil {
return "", err
}
defer f.Close()
gz, err := gzip.NewReader(f)
if err != nil {
return "", err
}
defer gz.Close()
tmpFd := mustCreateTempFD()
defer tmpFd.Close()
_, err = io.Copy(tmpFd, gz)
if err != nil {
return "", err
}
return tmpFd.Name(), nil
}

Loading…
Cancel
Save