From 0d81e6f43833abb93b0d60b1675c0bf854aa1dcc Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jan 2024 19:01:09 -0500 Subject: [PATCH] Fetch sums from S3 and compare --- CHANGELOG.md | 1 + auto/backup/uploader.go | 17 +++++++++++++++++ auto/backup/uploader_test.go | 10 +++++++++- aws/s3.go | 31 ++++++++++++++++++++++++++++++- 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7531334a..a52e9984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 8.16.4 (unreleased) ### Implementation changes and bug fixes - [PR #1584](https://github.com/rqlite/rqlite/pull/1584): Count Snapshot reaping failures. +- [PR #1585](https://github.com/rqlite/rqlite/pull/1585): Avoid unnecessary auto-backups by storing sha256 sums in S3. ## 8.16.3 (January 9th 2024) ### Implementation changes and bug fixes diff --git a/auto/backup/uploader.go b/auto/backup/uploader.go index 70b04b10..be7cddeb 100644 --- a/auto/backup/uploader.go +++ b/auto/backup/uploader.go @@ -1,6 +1,7 @@ package backup import ( + "bytes" "compress/gzip" "context" "expvar" @@ -16,6 +17,9 @@ import ( // StorageClient is an interface for uploading data to a storage service. type StorageClient interface { Upload(ctx context.Context, reader io.Reader, sum []byte) error + + LastSum(ctx context.Context) ([]byte, error) + fmt.Stringer } @@ -73,6 +77,7 @@ type Uploader struct { lastUploadTime time.Time lastUploadDuration time.Duration + lastSum []byte // The SHA256 sum of the data most-recently uploaded. lastModified time.Time // The last-modified time of the data most-recently uploaded. } @@ -125,6 +130,7 @@ func (u *Uploader) Stats() (map[string]interface{}, error) { "compress": u.compress, "last_upload_time": u.lastUploadTime.Format(time.RFC3339), "last_upload_duration": u.lastUploadDuration.String(), + "last_sum": fmt.Sprintf("%x", u.lastSum), "last_modified": u.lastModified.String(), } return status, nil @@ -170,12 +176,23 @@ func (u *Uploader) upload(ctx context.Context) error { return err } + if u.lastSum == nil { + ls, err := u.storageClient.LastSum(ctx) + if err == nil { + u.lastSum = ls + } + } else if bytes.Equal(u.lastSum, sum) { + stats.Add(numUploadsSkipped, 1) + return nil + } + cr := progress.NewCountingReader(fd) startTime := time.Now() err = u.storageClient.Upload(ctx, cr, sum) if err != nil { stats.Add(numUploadsFail, 1) } else { + u.lastSum = sum u.lastModified = lm stats.Add(numUploadsOK, 1) stats.Add(totalUploadBytes, cr.Count()) diff --git a/auto/backup/uploader_test.go b/auto/backup/uploader_test.go index 92f4e0fe..56f8377e 100644 --- a/auto/backup/uploader_test.go +++ b/auto/backup/uploader_test.go @@ -287,7 +287,8 @@ func Test_UploaderStats(t *testing.T) { } type mockStorageClient struct { - uploadFn func(ctx context.Context, reader io.Reader, sum []byte) error + uploadFn func(ctx context.Context, reader io.Reader, sum []byte) error + lastSumFn func(ctx context.Context) ([]byte, error) } func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader, sum []byte) error { @@ -297,6 +298,13 @@ func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader, sum [ return nil } +func (mc *mockStorageClient) LastSum(ctx context.Context) ([]byte, error) { + if mc.lastSumFn != nil { + return mc.lastSumFn(ctx) + } + return nil, nil +} + func (mc *mockStorageClient) String() string { return "mockStorageClient" } diff --git a/aws/s3.go b/aws/s3.go index 8ed53d8a..3c74506f 100644 --- a/aws/s3.go +++ b/aws/s3.go @@ -2,6 +2,7 @@ package aws import ( "context" + "encoding/hex" "fmt" "io" "strings" @@ -13,6 +14,10 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" ) +const ( + AWSS3SumKey = "x-rqlite-sum" +) + // S3Config is the subconfig for the S3 storage type type S3Config struct { Endpoint string `json:"endpoint,omitempty"` @@ -89,7 +94,7 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err if sum != nil { input.Metadata = map[string]*string{ - "x-rqlite-sum": aws.String(fmt.Sprintf("%x", sum)), + AWSS3SumKey: aws.String(fmt.Sprintf("%x", sum)), } } _, err = uploader.UploadWithContext(ctx, input) @@ -100,6 +105,30 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err return nil } +func (s *S3Client) LastSum(ctx context.Context) ([]byte, error) { + sess, err := s.createSession() + if err != nil { + return nil, err + } + + svc := s3.New(sess) + input := &s3.HeadObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(s.key), + } + + result, err := svc.HeadObjectWithContext(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to get object head for %v: %w", s, err) + } + + sumHex, ok := result.Metadata[AWSS3SumKey] + if !ok { + return nil, fmt.Errorf("sum metadata not found for %v", s) + } + return hex.DecodeString(*sumHex) +} + // Download downloads data from S3. func (s *S3Client) Download(ctx context.Context, writer io.WriterAt) error { sess, err := s.createSession()