1
0
Fork 0

Provide should also return last modified

master
Philip O'Toole 8 months ago
parent d7a5d3676a
commit b111a7bf98

@ -30,8 +30,10 @@ type DataProvider interface {
// can be ignored.
Check(i int64) (int64, bool)
// Provide writes the data-for-upload to the file specified by path.
Provide(path string) error
// Provide writes the data-for-upload to the file specified by path. Because
// Provide may change the data in the DataProvider, it returns the current
// value of i, which should be passed to the next invocation of Check().
Provide(path string) (int64, error)
}
// stats captures stats for the Uploader service.
@ -139,13 +141,14 @@ func (u *Uploader) upload(ctx context.Context) error {
}
defer os.Remove(filetoUpload)
lastI, changed := u.dataProvider.Check(u.lastI)
_, changed := u.dataProvider.Check(u.lastI)
if !changed {
stats.Add(numUploadsSkipped, 1)
return nil
}
if err := u.dataProvider.Provide(filetoUpload); err != nil {
lastI, err := u.dataProvider.Provide(filetoUpload)
if err != nil {
return err
}
if err := u.compressIfNeeded(filetoUpload); err != nil {

@ -129,6 +129,14 @@ func Test_UploaderDoubleUpload(t *testing.T) {
}
return 2, false
}
provideLastFnCalled := int64(1)
dp.provideLastIFn = func() int64 {
// Keep it one ahead of checkFn.
defer func() { provideLastFnCalled++ }()
return provideLastFnCalled
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -166,6 +174,14 @@ func Test_UploaderFailThenOK(t *testing.T) {
dp.checkFn = func(i int64) (int64, bool) {
return 1, true
}
provideLastFnCalled := int64(1)
dp.provideLastIFn = func() int64 {
// Keep it one ahead of checkFn.
defer func() { provideLastFnCalled++ }()
return provideLastFnCalled
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -204,6 +220,12 @@ func Test_UploaderOKThenFail(t *testing.T) {
dp.checkFn = func(i int64) (int64, bool) {
return 1, true
}
provideLastFnCalled := int64(2)
dp.provideLastIFn = func() int64 {
// Keep it one ahead of checkFn.
defer func() { provideLastFnCalled++ }()
return provideLastFnCalled
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -278,6 +300,12 @@ func Test_UploaderEnabledTrue(t *testing.T) {
dp.checkFn = func(i int64) (int64, bool) {
return 1, true
}
provideLastFnCalled := int64(2)
dp.provideLastIFn = func() int64 {
// Keep it one ahead of checkFn.
defer func() { provideLastFnCalled++ }()
return provideLastFnCalled
}
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background())
@ -327,9 +355,10 @@ func (mc *mockStorageClient) String() string {
}
type mockDataProvider struct {
data string
err error
checkFn func(i int64) (int64, bool)
data string
err error
checkFn func(i int64) (int64, bool)
provideLastIFn func() int64
}
func (mp *mockDataProvider) Check(i int64) (int64, bool) {
@ -339,9 +368,15 @@ func (mp *mockDataProvider) Check(i int64) (int64, bool) {
return 0, false
}
func (mp *mockDataProvider) Provide(path string) error {
func (mp *mockDataProvider) Provide(path string) (int64, error) {
if mp.err != nil {
return mp.err
return 0, mp.err
}
return os.WriteFile(path, []byte(mp.data), 0644)
lastI := int64(0)
if mp.provideLastIFn != nil {
lastI = mp.provideLastIFn()
}
return lastI, os.WriteFile(path, []byte(mp.data), 0644)
}

@ -53,7 +53,7 @@ func (p *Provider) Provide(path string) (retErr error) {
fd, err := os.Create(path)
if err != nil {
return err
return 0, err
}
defer fd.Close()
@ -70,17 +70,21 @@ func (p *Provider) Provide(path string) (retErr error) {
time.Sleep(p.retryInterval)
nRetries++
if nRetries > p.nRetries {
return err
return 0, err
}
}
// Switch database to DELETE mode, to keep existing behaviour.
if err := fd.Close(); err != nil {
return err
return 0, err
}
if db.EnsureDeleteMode(path) != nil {
return err
return 0, err
}
lm, err := p.str.db.LastModified()
if err != nil {
return 0, err
}
stats.Add(numProvides, 1)
return nil
return lm.UnixNano(), nil
}

@ -49,7 +49,7 @@ func Test_SingleNodeProvide(t *testing.T) {
tempFile := mustCreateTempFile()
defer os.Remove(tempFile)
provider := NewProvider(s0, false)
if err := provider.Provide(tempFile); err != nil {
if _, err := provider.Provide(tempFile); err != nil {
t.Fatalf("failed to provide SQLite data: %s", err.Error())
}
@ -106,7 +106,7 @@ func Test_SingleNodeProvideNoData(t *testing.T) {
tmpFile := mustCreateTempFile()
defer os.Remove(tmpFile)
provider := NewProvider(s, false)
if err := provider.Provide(tmpFile); err != nil {
if _, err := provider.Provide(tmpFile); err != nil {
t.Fatalf("store failed to provide: %s", err.Error())
}
}

Loading…
Cancel
Save