1
0
Fork 0

Move to move efficient index-checking for upload

master
Philip O'Toole 9 months ago
parent ca03ad3f1e
commit 7dcfea235c

@ -23,6 +23,14 @@ type StorageClient interface {
// service will call Provide() to have the data-for-upload to be written to the
// to the file specified by path.
type DataProvider interface {
// Check returns true if data in the DataProvider has changed since the
// last time Check() was called with the given value of i. Check() also
// returns the current value of i, which should be passed to the next
// invocation of Check(). If Check() returns false, the returned value of
// can be ignored.
Check(i uint64) (uint64, bool)
// Provide writes the data-for-upload to the file specified by path.
Provide(path string) error
}
@ -66,11 +74,7 @@ type Uploader struct {
lastUploadTime time.Time
lastUploadDuration time.Duration
lastSum SHA256Sum
// disableSumCheck is used for testing purposes to disable the check that
// prevents uploading the same data twice.
disableSumCheck bool
lastI uint64
}
// NewUploader creates a new Uploader service.
@ -103,10 +107,6 @@ func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) chan
return
case <-ticker.C:
if !isUploadEnabled() {
// Reset the lastSum so that the next time we're enabled upload will
// happen. We do this to be conservative, as we don't know what was
// happening while upload was disabled.
u.lastSum = nil
continue
}
if err := u.upload(ctx); err != nil {
@ -126,7 +126,7 @@ func (u *Uploader) Stats() (map[string]interface{}, error) {
"compress": u.compress,
"last_upload_time": u.lastUploadTime.Format(time.RFC3339),
"last_upload_duration": u.lastUploadDuration.String(),
"last_upload_sum": u.lastSum.String(),
"last_i": u.lastI,
}
return status, nil
}
@ -139,6 +139,12 @@ func (u *Uploader) upload(ctx context.Context) error {
}
defer os.Remove(filetoUpload)
lastI, changed := u.dataProvider.Check(u.lastI)
if !changed {
stats.Add(numUploadsSkipped, 1)
return nil
}
if err := u.dataProvider.Provide(filetoUpload); err != nil {
return err
}
@ -146,15 +152,6 @@ func (u *Uploader) upload(ctx context.Context) error {
return err
}
sum, err := FileSHA256(filetoUpload)
if err != nil {
return err
}
if !u.disableSumCheck && sum.Equals(u.lastSum) {
stats.Add(numUploadsSkipped, 1)
return nil
}
fd, err := os.Open(filetoUpload)
if err != nil {
return err
@ -167,7 +164,7 @@ func (u *Uploader) upload(ctx context.Context) error {
if err != nil {
stats.Add(numUploadsFail, 1)
} else {
u.lastSum = sum
u.lastI = lastI
stats.Add(numUploadsOK, 1)
stats.Add(totalUploadBytes, cr.Count())
stats.Get(lastUploadBytes).(*expvar.Int).Set(cr.Count())

@ -45,6 +45,12 @@ func Test_UploaderSingleUpload(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
if i == 0 {
return 1, true
}
return 1, false
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -79,6 +85,12 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
if i == 0 {
return 1, true
}
return 1, false
}
uploader := NewUploader(sc, dp, time.Second, UploadCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -108,8 +120,16 @@ func Test_UploaderDoubleUpload(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
if i == 0 {
return 1, true
}
if i == 1 {
return 2, true
}
return 2, false
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
uploader.disableSumCheck = true // Force upload of the same data
ctx, cancel := context.WithCancel(context.Background())
done := uploader.Start(ctx, nil)
@ -133,7 +153,6 @@ func Test_UploaderFailThenOK(t *testing.T) {
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
defer wg.Done()
if uploadCount == 0 {
uploadCount++
return fmt.Errorf("failed to upload")
@ -144,6 +163,9 @@ func Test_UploaderFailThenOK(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
return 1, true
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -179,8 +201,10 @@ func Test_UploaderOKThenFail(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
return 1, true
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
uploader.disableSumCheck = true // Disable because we want to upload twice.
ctx, cancel := context.WithCancel(context.Background())
done := uploader.Start(ctx, nil)
@ -220,6 +244,9 @@ func Test_UploaderEnabledFalse(t *testing.T) {
ResetStats()
sc := &mockStorageClient{}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
return 1, true // Upload if asked (which it shouldn't be).
}
uploader := NewUploader(sc, dp, 100*time.Millisecond, false)
ctx, cancel := context.WithCancel(context.Background())
@ -248,6 +275,9 @@ func Test_UploaderEnabledTrue(t *testing.T) {
},
}
dp := &mockDataProvider{data: "my upload data"}
dp.checkFn = func(i uint64) (uint64, bool) {
return 1, true
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -297,8 +327,16 @@ func (mc *mockStorageClient) String() string {
}
type mockDataProvider struct {
data string
err error
data string
err error
checkFn func(i uint64) (uint64, bool)
}
func (mp *mockDataProvider) Check(i uint64) (uint64, bool) {
if mp.checkFn != nil {
return mp.checkFn(i)
}
return 0, false
}
func (mp *mockDataProvider) Provide(path string) error {

Loading…
Cancel
Save