1
0
Fork 0

Fetch sums from S3 and compare

master
Philip O'Toole 8 months ago
parent 58fbe3ee81
commit 0d81e6f438

@ -1,6 +1,7 @@
## 8.16.4 (unreleased)
### Implementation changes and bug fixes
- [PR #1584](https://github.com/rqlite/rqlite/pull/1584): Count Snapshot reaping failures.
- [PR #1585](https://github.com/rqlite/rqlite/pull/1585): Avoid unnecessary auto-backups by storing sha256 sums in S3.
## 8.16.3 (January 9th 2024)
### Implementation changes and bug fixes

@ -1,6 +1,7 @@
package backup
import (
"bytes"
"compress/gzip"
"context"
"expvar"
@ -16,6 +17,9 @@ import (
// StorageClient is an interface for uploading data to a storage service.
type StorageClient interface {
Upload(ctx context.Context, reader io.Reader, sum []byte) error
LastSum(ctx context.Context) ([]byte, error)
fmt.Stringer
}
@ -73,6 +77,7 @@ type Uploader struct {
lastUploadTime time.Time
lastUploadDuration time.Duration
lastSum []byte // The SHA256 sum of the data most-recently uploaded.
lastModified time.Time // The last-modified time of the data most-recently uploaded.
}
@ -125,6 +130,7 @@ func (u *Uploader) Stats() (map[string]interface{}, error) {
"compress": u.compress,
"last_upload_time": u.lastUploadTime.Format(time.RFC3339),
"last_upload_duration": u.lastUploadDuration.String(),
"last_sum": fmt.Sprintf("%x", u.lastSum),
"last_modified": u.lastModified.String(),
}
return status, nil
@ -170,12 +176,23 @@ func (u *Uploader) upload(ctx context.Context) error {
return err
}
if u.lastSum == nil {
ls, err := u.storageClient.LastSum(ctx)
if err == nil {
u.lastSum = ls
}
} else if bytes.Equal(u.lastSum, sum) {
stats.Add(numUploadsSkipped, 1)
return nil
}
cr := progress.NewCountingReader(fd)
startTime := time.Now()
err = u.storageClient.Upload(ctx, cr, sum)
if err != nil {
stats.Add(numUploadsFail, 1)
} else {
u.lastSum = sum
u.lastModified = lm
stats.Add(numUploadsOK, 1)
stats.Add(totalUploadBytes, cr.Count())

@ -287,7 +287,8 @@ func Test_UploaderStats(t *testing.T) {
}
type mockStorageClient struct {
uploadFn func(ctx context.Context, reader io.Reader, sum []byte) error
uploadFn func(ctx context.Context, reader io.Reader, sum []byte) error
lastSumFn func(ctx context.Context) ([]byte, error)
}
func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader, sum []byte) error {
@ -297,6 +298,13 @@ func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader, sum [
return nil
}
func (mc *mockStorageClient) LastSum(ctx context.Context) ([]byte, error) {
if mc.lastSumFn != nil {
return mc.lastSumFn(ctx)
}
return nil, nil
}
func (mc *mockStorageClient) String() string {
return "mockStorageClient"
}

@ -2,6 +2,7 @@ package aws
import (
"context"
"encoding/hex"
"fmt"
"io"
"strings"
@ -13,6 +14,10 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
const (
AWSS3SumKey = "x-rqlite-sum"
)
// S3Config is the subconfig for the S3 storage type
type S3Config struct {
Endpoint string `json:"endpoint,omitempty"`
@ -89,7 +94,7 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err
if sum != nil {
input.Metadata = map[string]*string{
"x-rqlite-sum": aws.String(fmt.Sprintf("%x", sum)),
AWSS3SumKey: aws.String(fmt.Sprintf("%x", sum)),
}
}
_, err = uploader.UploadWithContext(ctx, input)
@ -100,6 +105,30 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err
return nil
}
func (s *S3Client) LastSum(ctx context.Context) ([]byte, error) {
sess, err := s.createSession()
if err != nil {
return nil, err
}
svc := s3.New(sess)
input := &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
}
result, err := svc.HeadObjectWithContext(ctx, input)
if err != nil {
return nil, fmt.Errorf("failed to get object head for %v: %w", s, err)
}
sumHex, ok := result.Metadata[AWSS3SumKey]
if !ok {
return nil, fmt.Errorf("sum metadata not found for %v", s)
}
return hex.DecodeString(*sumHex)
}
// Download downloads data from S3.
func (s *S3Client) Download(ctx context.Context, writer io.WriterAt) error {
sess, err := s.createSession()

Loading…
Cancel
Save