1
0
Fork 0

Fix AWS client and Store Provider

master
Philip O'Toole 8 months ago
parent f324e6b404
commit 68521d34a2

@ -2,7 +2,6 @@ package aws
import ( import (
"context" "context"
"encoding/hex"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -16,7 +15,7 @@ import (
) )
var ( var (
AWSS3SumKey = http.CanonicalHeaderKey("x-rqlite-auto-backup-sum") AWSS3IDKey = http.CanonicalHeaderKey("x-rqlite-auto-backup-id")
) )
// S3Config is the subconfig for the S3 storage type // S3Config is the subconfig for the S3 storage type
@ -73,7 +72,7 @@ func (s *S3Client) String() string {
} }
// Upload uploads data to S3. // Upload uploads data to S3.
func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) error { func (s *S3Client) Upload(ctx context.Context, reader io.Reader, id string) error {
sess, err := s.createSession() sess, err := s.createSession()
if err != nil { if err != nil {
return err return err
@ -93,9 +92,9 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err
Body: reader, Body: reader,
} }
if sum != nil { if id != "" {
input.Metadata = map[string]*string{ input.Metadata = map[string]*string{
AWSS3SumKey: aws.String(hex.EncodeToString(sum)), AWSS3IDKey: aws.String(id),
} }
} }
_, err = uploader.UploadWithContext(ctx, input) _, err = uploader.UploadWithContext(ctx, input)
@ -106,12 +105,11 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err
return nil return nil
} }
// CurrentSum returns the last SHA256 sum uploaded to S3. It is always // CurrentID returns the last ID uploaded to S3.
// read from S3 and never cached. func (s *S3Client) CurrentID(ctx context.Context) (string, error) {
func (s *S3Client) CurrentSum(ctx context.Context) ([]byte, error) {
sess, err := s.createSession() sess, err := s.createSession()
if err != nil { if err != nil {
return nil, err return "", err
} }
svc := s3.New(sess) svc := s3.New(sess)
@ -122,14 +120,14 @@ func (s *S3Client) CurrentSum(ctx context.Context) ([]byte, error) {
result, err := svc.HeadObjectWithContext(ctx, input) result, err := svc.HeadObjectWithContext(ctx, input)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get object head for %v: %w", s, err) return "", fmt.Errorf("failed to get object head for %v: %w", s, err)
} }
sumHex, ok := result.Metadata[AWSS3SumKey] id, ok := result.Metadata[AWSS3IDKey]
if !ok { if !ok {
return nil, fmt.Errorf("sum metadata not found for %v", s) return "", fmt.Errorf("sum metadata not found for %v", s)
} }
return hex.DecodeString(*sumHex) return *id, nil
} }
// Download downloads data from S3. // Download downloads data from S3.

@ -87,7 +87,7 @@ func TestS3ClientUploadOK(t *testing.T) {
if input.Metadata == nil { if input.Metadata == nil {
t.Errorf("expected metadata to be non-nil") t.Errorf("expected metadata to be non-nil")
} }
exp, got := "736f6d652d7368613235362d73756d", *input.Metadata[http.CanonicalHeaderKey(AWSS3SumKey)] exp, got := "some-id", *input.Metadata[http.CanonicalHeaderKey(AWSS3IDKey)]
if exp != got { if exp != got {
t.Errorf("expected metadata to contain %q, got %q", exp, got) t.Errorf("expected metadata to contain %q, got %q", exp, got)
} }
@ -106,7 +106,7 @@ func TestS3ClientUploadOK(t *testing.T) {
} }
reader := strings.NewReader("test data") reader := strings.NewReader("test data")
err := client.Upload(context.Background(), reader, []byte("some-sha256-sum")) err := client.Upload(context.Background(), reader, "some-id")
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -138,7 +138,7 @@ func TestS3ClientUploadFail(t *testing.T) {
} }
reader := strings.NewReader("test data") reader := strings.NewReader("test data")
err := client.Upload(context.Background(), reader, nil) err := client.Upload(context.Background(), reader, "")
if err == nil { if err == nil {
t.Fatal("Expected error, got nil") t.Fatal("Expected error, got nil")
} }

@ -31,16 +31,16 @@ func NewProvider(s *Store, v, c bool) *Provider {
} }
} }
// LastModified returns the time the data managed by the Provider was // LastIndex returns the cluster-wide index the data managed by the DataProvider was
// last modified. // last modified by.
func (p *Provider) LastModified() (time.Time, error) { func (p *Provider) LastIndex() (uint64, error) {
stats.Add(numProviderChecks, 1) stats.Add(numProviderChecks, 1)
return p.str.db.LastModified() return p.str.DBAppliedIndex(), nil
} }
// Provider writes the SQLite database to the given path. If path exists, // Provider writes the SQLite database to the given path. If path exists,
// it will be overwritten. // it will be overwritten.
func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) { func (p *Provider) Provide(w io.Writer) (retErr error) {
stats.Add(numProviderProvides, 1) stats.Add(numProviderProvides, 1)
defer func() { defer func() {
if retErr != nil { if retErr != nil {
@ -62,8 +62,8 @@ func (p *Provider) Provide(w io.Writer) (t time.Time, retErr error) {
time.Sleep(p.retryInterval) time.Sleep(p.retryInterval)
nRetries++ nRetries++
if nRetries > p.nRetries { if nRetries > p.nRetries {
return time.Time{}, err return err
} }
} }
return p.str.db.LastModified() return nil
} }

@ -50,7 +50,7 @@ func test_SingleNodeProvide(t *testing.T, vaccuum, compress bool) {
defer os.Remove(tmpFd.Name()) defer os.Remove(tmpFd.Name())
defer tmpFd.Close() defer tmpFd.Close()
provider := NewProvider(s0, vaccuum, compress) provider := NewProvider(s0, vaccuum, compress)
if _, err := provider.Provide(tmpFd); err != nil { if err := provider.Provide(tmpFd); err != nil {
t.Fatalf("failed to provide SQLite data: %s", err.Error()) t.Fatalf("failed to provide SQLite data: %s", err.Error())
} }
@ -113,7 +113,7 @@ func Test_SingleNodeProvide(t *testing.T) {
}) })
} }
func Test_SingleNodeProvideLastModified(t *testing.T) { func Test_SingleNodeProvideLastIndex(t *testing.T) {
s, ln := mustNewStore(t) s, ln := mustNewStore(t)
defer ln.Close() defer ln.Close()
@ -132,7 +132,7 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
defer os.Remove(tmpFile) defer os.Remove(tmpFile)
provider := NewProvider(s, false, false) provider := NewProvider(s, false, false)
lm, err := provider.LastModified() lm, err := provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
@ -149,14 +149,14 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
t.Fatalf("failed to wait for FSM to apply") t.Fatalf("failed to wait for FSM to apply")
} }
newLM, err := provider.LastModified() newLI, err := provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
if !newLM.After(lm) { if newLI <= lm {
t.Fatalf("last modified time should have changed") t.Fatalf("last index should have changed")
} }
lm = newLM lm = newLI
// Try various queries and commands which should not change the database. // Try various queries and commands which should not change the database.
qr := queryRequestFromString("SELECT * FROM foo", false, false) qr := queryRequestFromString("SELECT * FROM foo", false, false)
@ -168,26 +168,26 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil { if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil {
t.Fatalf("failed to wait for FSM to apply") t.Fatalf("failed to wait for FSM to apply")
} }
newLM, err = provider.LastModified() newLI, err = provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
if !newLM.Equal(lm) { if newLI != lm {
t.Fatalf("last modified time should not have changed") t.Fatalf("last index should not have changed")
} }
lm = newLM lm = newLI
if af, err := s.Noop("don't care"); err != nil || af.Error() != nil { if af, err := s.Noop("don't care"); err != nil || af.Error() != nil {
t.Fatalf("failed to execute Noop") t.Fatalf("failed to execute Noop")
} }
newLM, err = provider.LastModified() newLI, err = provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
if !newLM.Equal(lm) { if newLI != lm {
t.Fatalf("last modified time should not have changed") t.Fatalf("last index should not have changed")
} }
lm = newLM lm = newLI
er = executeRequestFromStrings([]string{ er = executeRequestFromStrings([]string{
`INSERT INTO foo(id, name) VALUES(1, "fiona")`, // Constraint violation. `INSERT INTO foo(id, name) VALUES(1, "fiona")`, // Constraint violation.
@ -196,14 +196,14 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error()) t.Fatalf("failed to execute on single node: %s", err.Error())
} }
newLM, err = provider.LastModified() newLI, err = provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
if !newLM.Equal(lm) { if newLI != lm {
t.Fatalf("last modified time should not have changed with constraint violation") t.Fatalf("last index should not have changed with constraint violation")
} }
lm = newLM lm = newLI
// This should change the database. // This should change the database.
er = executeRequestFromStrings([]string{ er = executeRequestFromStrings([]string{
@ -216,12 +216,12 @@ func Test_SingleNodeProvideLastModified(t *testing.T) {
if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil { if _, err := s.WaitForAppliedFSM(2 * time.Second); err != nil {
t.Fatalf("failed to wait for FSM to apply") t.Fatalf("failed to wait for FSM to apply")
} }
newLM, err = provider.LastModified() newLI, err = provider.LastIndex()
if err != nil { if err != nil {
t.Fatalf("failed to get last modified: %s", err.Error()) t.Fatalf("failed to get last modified: %s", err.Error())
} }
if !newLM.After(lm) { if newLI <= lm {
t.Fatalf("last modified time should have changed") t.Fatalf("last index should have changed")
} }
} }

Loading…
Cancel
Save