diff --git a/aws/s3.go b/aws/s3.go index 0f44a86d..b632dbeb 100644 --- a/aws/s3.go +++ b/aws/s3.go @@ -2,7 +2,6 @@ package aws import ( "context" - "encoding/hex" "fmt" "io" "net/http" @@ -16,7 +15,7 @@ import ( ) var ( - AWSS3SumKey = http.CanonicalHeaderKey("x-rqlite-auto-backup-sum") + AWSS3IDKey = http.CanonicalHeaderKey("x-rqlite-auto-backup-id") ) // S3Config is the subconfig for the S3 storage type @@ -73,7 +72,7 @@ func (s *S3Client) String() string { } // Upload uploads data to S3. -func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) error { +func (s *S3Client) Upload(ctx context.Context, reader io.Reader, id string) error { sess, err := s.createSession() if err != nil { return err @@ -93,9 +92,9 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err Body: reader, } - if sum != nil { + if id != "" { input.Metadata = map[string]*string{ - AWSS3SumKey: aws.String(hex.EncodeToString(sum)), + AWSS3IDKey: aws.String(id), } } _, err = uploader.UploadWithContext(ctx, input) @@ -106,12 +105,11 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err return nil } -// CurrentSum returns the last SHA256 sum uploaded to S3. It is always -// read from S3 and never cached. -func (s *S3Client) CurrentSum(ctx context.Context) ([]byte, error) { +// CurrentID returns the last ID uploaded to S3. +func (s *S3Client) CurrentID(ctx context.Context) (string, error) { sess, err := s.createSession() if err != nil { - return nil, err + return "", err } svc := s3.New(sess) @@ -122,14 +120,14 @@ func (s *S3Client) CurrentSum(ctx context.Context) ([]byte, error) { result, err := svc.HeadObjectWithContext(ctx, input) if err != nil { - return nil, fmt.Errorf("failed to get object head for %v: %w", s, err) + return "", fmt.Errorf("failed to get object head for %v: %w", s, err) } - sumHex, ok := result.Metadata[AWSS3SumKey] + id, ok := result.Metadata[AWSS3IDKey] if !ok { - return nil, fmt.Errorf("sum metadata not found for %v", s) + return "", fmt.Errorf("sum metadata not found for %v", s) } - return hex.DecodeString(*sumHex) + return *id, nil } // Download downloads data from S3. diff --git a/aws/s3_test.go b/aws/s3_test.go index cee1dad5..95b6350c 100644 --- a/aws/s3_test.go +++ b/aws/s3_test.go @@ -87,7 +87,7 @@ func TestS3ClientUploadOK(t *testing.T) { if input.Metadata == nil { t.Errorf("expected metadata to be non-nil") } - exp, got := "736f6d652d7368613235362d73756d", *input.Metadata[http.CanonicalHeaderKey(AWSS3SumKey)] + exp, got := "some-id", *input.Metadata[http.CanonicalHeaderKey(AWSS3IDKey)] if exp != got { t.Errorf("expected metadata to contain %q, got %q", exp, got) } @@ -106,7 +106,7 @@ func TestS3ClientUploadOK(t *testing.T) { } reader := strings.NewReader("test data") - err := client.Upload(context.Background(), reader, []byte("some-sha256-sum")) + err := client.Upload(context.Background(), reader, "some-id") if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -138,7 +138,7 @@ func TestS3ClientUploadFail(t *testing.T) { } reader := strings.NewReader("test data") - err := client.Upload(context.Background(), reader, nil) + err := client.Upload(context.Background(), reader, "") if err == nil { t.Fatal("Expected error, got nil") } diff --git a/store/provider.go b/store/provider.go index 02c19e50..3b1117e9 100644 --- a/store/provider.go +++ b/store/provider.go @@ -31,16 +31,16 @@ func NewProvider(s *Store, v, c bool) *Provider { } } -// LastModified returns the time the data managed by the Provider was -// last modified. -func (p *Provider) LastModified() (time.Time, error) { +// LastIndex returns the cluster-wide index the data managed by the DataProvider was +// last modified by. +func (p *Provider) LastIndex() (uint64, error) { stats.Add(numProviderChecks, 1) - return p.str.db.LastModified() + return p.str.DBAppliedIndex(), nil } // Provider writes the SQLite database to the given path. If path exists, // it will be overwritten. -func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) { +func (p *Provider) Provide(w io.Writer) (retErr error) { stats.Add(numProviderProvides, 1) defer func() { if retErr != nil { @@ -62,8 +62,8 @@ func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) { time.Sleep(p.retryInterval) nRetries++ if nRetries > p.nRetries { - return time.Time{}, err + return err } } - return p.str.db.LastModified() + return nil } diff --git a/store/provider_test.go b/store/provider_test.go index afe2148d..c112d2ba 100644 --- a/store/provider_test.go +++ b/store/provider_test.go @@ -50,7 +50,7 @@ func test_SingleNodeProvide(t *testing.T, vaccuum, compress bool) { defer os.Remove(tmpFd.Name()) defer tmpFd.Close() provider := NewProvider(s0, vaccuum, compress) - if _, err := provider.Provide(tmpFd); err != nil { + if err := provider.Provide(tmpFd); err != nil { t.Fatalf("failed to provide SQLite data: %s", err.Error()) } @@ -113,7 +113,7 @@ func Test_SingleNodeProvide(t *testing.T) { }) } -func Test_SingleNodeProvideLastModified(t *testing.T) { +func Test_SingleNodeProvideLastIndex(t *testing.T) { s, ln := mustNewStore(t) defer ln.Close() @@ -132,7 +132,7 @@ func Test_SingleNodeProvideLastModified(t *testing.T) { defer os.Remove(tmpFile) provider := NewProvider(s, false, false) - lm, err := provider.LastModified() + lm, err := provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } @@ -149,14 +149,14 @@ func Test_SingleNodeProvideLastModified(t *testing.T) { t.Fatalf("failed to wait for FSM to apply") } - newLM, err := provider.LastModified() + newLI, err := provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } - if !newLM.After(lm) { - t.Fatalf("last modified time should have changed") + if newLI <= lm { + t.Fatalf("last index should have changed") } - lm = newLM + lm = newLI // Try various queries and commands which should not change the database. qr := queryRequestFromString("SELECT * FROM foo", false, false) @@ -168,26 +168,26 @@ func Test_SingleNodeProvideLastModified(t *testing.T) { if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil { t.Fatalf("failed to wait for FSM to apply") } - newLM, err = provider.LastModified() + newLI, err = provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } - if !newLM.Equal(lm) { - t.Fatalf("last modified time should not have changed") + if newLI != lm { + t.Fatalf("last index should not have changed") } - lm = newLM + lm = newLI if af, err := s.Noop("don't care"); err != nil || af.Error() != nil { t.Fatalf("failed to execute Noop") } - newLM, err = provider.LastModified() + newLI, err = provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } - if !newLM.Equal(lm) { - t.Fatalf("last modified time should not have changed") + if newLI != lm { + t.Fatalf("last index should not have changed") } - lm = newLM + lm = newLI er = executeRequestFromStrings([]string{ `INSERT INTO foo(id, name) VALUES(1, "fiona")`, // Constraint violation. @@ -196,14 +196,14 @@ func Test_SingleNodeProvideLastModified(t *testing.T) { if err != nil { t.Fatalf("failed to execute on single node: %s", err.Error()) } - newLM, err = provider.LastModified() + newLI, err = provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } - if !newLM.Equal(lm) { - t.Fatalf("last modified time should not have changed with constraint violation") + if newLI != lm { + t.Fatalf("last index should not have changed with constraint violation") } - lm = newLM + lm = newLI // This should change the database. er = executeRequestFromStrings([]string{ @@ -216,12 +216,12 @@ func Test_SingleNodeProvideLastModified(t *testing.T) { if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil { t.Fatalf("failed to wait for FSM to apply") } - newLM, err = provider.LastModified() + newLI, err = provider.LastIndex() if err != nil { t.Fatalf("failed to get last modified: %s", err.Error()) } - if !newLM.After(lm) { - t.Fatalf("last modified time should have changed") + if newLI <= lm { + t.Fatalf("last index should have changed") } }