1
0
Fork 0

Add Enabled checker to uploader

This will be used to ensure the upload only runs on the Leader.
master
Philip O'Toole 1 year ago
parent ae97060f2c
commit 4903d48d51

@ -70,7 +70,7 @@ func NewUploader(storageClient StorageClient, dataProvider DataProvider, interva
}
// Start starts the Uploader service.
func (u *Uploader) Start(ctx context.Context) {
func (u *Uploader) Start(ctx context.Context, enabled func() bool) {
ticker := time.NewTicker(u.interval)
defer ticker.Stop()
@ -79,6 +79,9 @@ func (u *Uploader) Start(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
if enabled != nil && !enabled() {
continue
}
if err := u.upload(ctx); err != nil {
u.logger.Printf("failed to upload to %s: %v", u.storageClient, err)
}

@ -46,7 +46,7 @@ func Test_UploaderSingleUpload(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx)
go uploader.Start(ctx, nil)
wg.Wait()
cancel()
@ -89,7 +89,7 @@ func Test_UploaderDoubleUpload(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx)
go uploader.Start(ctx, nil)
wg.Wait()
cancel()
@ -138,7 +138,7 @@ func Test_UploaderFailThenOK(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx)
go uploader.Start(ctx, nil)
wg.Wait()
cancel()
@ -185,7 +185,7 @@ func Test_UploaderOKThenFail(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx)
go uploader.Start(ctx, nil)
wg.Wait()
cancel()
@ -207,7 +207,7 @@ func Test_UploaderContextCancellation(t *testing.T) {
uploader := NewUploader(sc, dp, time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
go uploader.Start(ctx)
go uploader.Start(ctx, nil)
<-ctx.Done()
cancel()
@ -216,6 +216,50 @@ func Test_UploaderContextCancellation(t *testing.T) {
}
}
func Test_UploaderEnabledFalse(t *testing.T) {
ResetStats()
sc := &mockStorageClient{}
dp := &mockDataProvider{data: "my upload data"}
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, func() bool { return false })
time.Sleep(time.Second)
defer cancel()
if exp, got := int64(0), stats.Get(numUploadsOK).(*expvar.Int); exp != got.Value() {
t.Errorf("expected numUploadsOK to be %d, got %d", exp, got)
}
}
func Test_UploaderEnabledTrue(t *testing.T) {
var uploadedData []byte
var err error
ResetStats()
var wg sync.WaitGroup
wg.Add(1)
sc := &mockStorageClient{
uploadFn: func(ctx context.Context, reader io.Reader) error {
defer wg.Done()
uploadedData, err = io.ReadAll(reader)
return err
},
}
dp := &mockDataProvider{data: "my upload data"}
uploader := NewUploader(sc, dp, 100*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, func() bool { return true })
defer cancel()
wg.Wait()
if exp, got := string(uploadedData), "my upload data"; exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got)
}
}
func Test_UploaderStats(t *testing.T) {
sc := &mockStorageClient{}
dp := &mockDataProvider{data: "my upload data"}

Loading…
Cancel
Save