1
0
Fork 0

Initial framework for setting sums

master
Philip O'Toole 8 months ago
parent 54622f423a
commit 58fbe3ee81

@ -15,7 +15,7 @@ import (
// StorageClient is an interface for uploading data to a storage service.
type StorageClient interface {
Upload(ctx context.Context, reader io.Reader) error
Upload(ctx context.Context, reader io.Reader, sum []byte) error
fmt.Stringer
}
@ -73,7 +73,7 @@ type Uploader struct {
lastUploadTime time.Time
lastUploadDuration time.Duration
lastModified time.Time
lastModified time.Time // The last-modified time of the data most-recently uploaded.
}
// NewUploader creates a new Uploader service.
@ -165,9 +165,14 @@ func (u *Uploader) upload(ctx context.Context) error {
}
defer fd.Close()
sum, err := FileSHA256(filetoUpload)
if err != nil {
return err
}
cr := progress.NewCountingReader(fd)
startTime := time.Now()
err = u.storageClient.Upload(ctx, cr)
err = u.storageClient.Upload(ctx, cr, sum)
if err != nil {
stats.Add(numUploadsFail, 1)
} else {

@ -38,7 +38,7 @@ func Test_UploaderSingleUpload(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
uploadedData, err = io.ReadAll(reader)
return err
@ -68,7 +68,7 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
// Wrap a gzip reader about the reader, to ensure the data is compressed.
@ -108,7 +108,7 @@ func Test_UploaderDoubleUpload(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
uploadedData = nil // Wipe out any previous state.
uploadedData, err = io.ReadAll(reader)
@ -138,7 +138,7 @@ func Test_UploaderFailThenOK(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
if uploadCount == 0 {
uploadCount++
@ -172,7 +172,7 @@ func Test_UploaderOKThenFail(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
if uploadCount == 1 {
@ -203,7 +203,7 @@ func Test_UploaderContextCancellation(t *testing.T) {
var uploadCount int32
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
atomic.AddInt32(&uploadCount, 1)
return nil
},
@ -246,7 +246,7 @@ func Test_UploaderEnabledTrue(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
uploadFn: func(ctx context.Context, reader io.Reader, sum []byte) error {
defer wg.Done()
uploadedData, err = io.ReadAll(reader)
return err
@ -287,12 +287,12 @@ func Test_UploaderStats(t *testing.T) {
}
type mockStorageClient struct {
uploadFn func(ctx context.Context, reader io.Reader) error
uploadFn func(ctx context.Context, reader io.Reader, sum []byte) error
}
func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader) error {
func (mc *mockStorageClient) Upload(ctx context.Context, reader io.Reader, sum []byte) error {
if mc.uploadFn != nil {
return mc.uploadFn(ctx, reader)
return mc.uploadFn(ctx, reader, sum)
}
return nil
}

@ -67,7 +67,7 @@ func (s *S3Client) String() string {
}
// Upload uploads data to S3.
func (s *S3Client) Upload(ctx context.Context, reader io.Reader) error {
func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) error {
sess, err := s.createSession()
if err != nil {
return err
@ -81,11 +81,18 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader) error {
uploader = s.uploader
}
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
input := &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
Body: reader,
})
}
if sum != nil {
input.Metadata = map[string]*string{
"x-rqlite-sum": aws.String(fmt.Sprintf("%x", sum)),
}
}
_, err = uploader.UploadWithContext(ctx, input)
if err != nil {
return fmt.Errorf("failed to upload to %v: %w", s, err)
}

@ -98,7 +98,7 @@ func TestS3ClientUploadOK(t *testing.T) {
}
reader := strings.NewReader("test data")
err := client.Upload(context.Background(), reader)
err := client.Upload(context.Background(), reader, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -130,7 +130,7 @@ func TestS3ClientUploadFail(t *testing.T) {
}
reader := strings.NewReader("test data")
err := client.Upload(context.Background(), reader)
err := client.Upload(context.Background(), reader, nil)
if err == nil {
t.Fatal("Expected error, got nil")
}

Loading…
Cancel
Save