Merge pull request #1465 from rqlite/uploader-done

Move uploader goroutine into Uploader
Branch: master
Philip O'Toole, 9 months ago, committed by GitHub
Commit 6320a95e27
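This commit changes Uploader.Start from a blocking method that callers had to wrap in `go`, into a method that launches its own goroutine and returns a `chan struct{}` that is closed when that goroutine exits. Callers now cancel the context and then receive from the returned channel to confirm the upload loop has fully stopped.

Below is a minimal, self-contained sketch of the same idiom. startWorker, its interval parameter, and the fmt.Println body are illustrative stand-ins, not rqlite code:

package main

import (
	"context"
	"fmt"
	"time"
)

// startWorker spawns its own goroutine and returns a channel that is
// closed when that goroutine exits, mirroring the new Uploader.Start.
func startWorker(ctx context.Context, interval time.Duration) chan struct{} {
	doneCh := make(chan struct{})
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		defer close(doneCh) // signals "fully stopped" to any waiter
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("tick") // stand-in for the periodic work
			}
		}
	}()
	return doneCh
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := startWorker(ctx, 100*time.Millisecond)
	time.Sleep(350 * time.Millisecond)
	cancel() // ask the worker to stop
	<-done   // returns only after the goroutine has exited
}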

@@ -2,8 +2,9 @@
 ### Implementation changes and bug fixes
 - [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Standardize on chunk size.
 - [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Set `TrailingLogs=0` to truncate log during user-initiated Snapshotting.
-- [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service
+- [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service.
+- [PR #1465](https://github.com/rqlite/rqlite/pull/1465): Move uploader goroutine into Uploader.

 ## 8.0.1 (December 8th 2023)
 This release fixes an edge case issue during restore-from-SQLite. It's possible if a rqlite system crashes shortly after restoring from SQLite it may not have loaded the data correctly.

@@ -83,33 +83,37 @@ func NewUploader(storageClient StorageClient, dataProvider DataProvider, interva
 }

 // Start starts the Uploader service.
-func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) {
+func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) chan struct{} {
+	doneCh := make(chan struct{})
 	if isUploadEnabled == nil {
 		isUploadEnabled = func() bool { return true }
 	}
 	u.logger.Printf("starting upload to %s every %s", u.storageClient, u.interval)

 	ticker := time.NewTicker(u.interval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			u.logger.Println("upload service shutting down")
-			return
-		case <-ticker.C:
-			if !isUploadEnabled() {
-				// Reset the lastSum so that the next time we're enabled upload will
-				// happen. We do this to be conservative, as we don't know what was
-				// happening while upload was disabled.
-				u.lastSum = nil
-				continue
-			}
-			if err := u.upload(ctx); err != nil {
-				u.logger.Printf("failed to upload to %s: %v", u.storageClient, err)
-			}
-		}
-	}
-}
+	go func() {
+		defer ticker.Stop()
+		defer close(doneCh)
+		for {
+			select {
+			case <-ctx.Done():
+				u.logger.Println("upload service shutting down")
+				return
+			case <-ticker.C:
+				if !isUploadEnabled() {
+					// Reset the lastSum so that the next time we're enabled upload will
+					// happen. We do this to be conservative, as we don't know what was
+					// happening while upload was disabled.
+					u.lastSum = nil
+					continue
+				}
+				if err := u.upload(ctx); err != nil {
+					u.logger.Printf("failed to upload to %s: %v", u.storageClient, err)
+				}
+			}
+		}
+	}()
+	return doneCh
+}

 // Stats returns the stats for the Uploader service.
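A detail worth noting in the new Start (presumably backup/uploader.go): both ticker.Stop() and close(doneCh) are deferred inside the goroutine, so they run on every exit path and the channel is closed exactly once, only after the loop has returned. A caller blocked on the returned channel is therefore guaranteed the goroutine has finished, not merely been asked to stop.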

@@ -14,6 +14,7 @@ import (
 )

 func Test_NewUploader(t *testing.T) {
+	ResetStats()
 	storageClient := &mockStorageClient{}
 	dataProvider := &mockDataProvider{}
 	interval := time.Second
@@ -47,10 +48,10 @@ func Test_UploaderSingleUpload(t *testing.T) {
 	uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, nil)
+	done := uploader.Start(ctx, nil)
 	wg.Wait()
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := "my upload data", string(uploadedData); exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
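The remaining test changes all apply the same substitution. Waiting on <-ctx.Done() only confirmed that the context had been cancelled, not that the uploader goroutine had exited, so a test could finish (and its mocks be torn down) while the goroutine was still running; waiting on the channel returned by Start closes that race.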
@@ -60,7 +61,6 @@ func Test_UploaderSingleUpload(t *testing.T) {
 func Test_UploaderSingleUploadCompress(t *testing.T) {
 	ResetStats()
-
 	var uploadedData []byte
 	var wg sync.WaitGroup
 	wg.Add(1)
 	sc := &mockStorageClient{
@@ -82,10 +82,10 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
 	uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress)
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, nil)
+	done := uploader.Start(ctx, nil)
 	wg.Wait()
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := "my upload data", string(uploadedData); exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@@ -94,7 +94,6 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
 func Test_UploaderDoubleUpload(t *testing.T) {
 	ResetStats()
-
 	var uploadedData []byte
 	var err error
@@ -113,10 +112,10 @@ func Test_UploaderDoubleUpload(t *testing.T) {
 	uploader.disableSumCheck = true // Force upload of the same data
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, nil)
+	done := uploader.Start(ctx, nil)
 	wg.Wait()
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := "my upload data", string(uploadedData); exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@@ -125,7 +124,6 @@ func Test_UploaderDoubleUpload(t *testing.T) {
 func Test_UploaderFailThenOK(t *testing.T) {
 	ResetStats()
-
 	var uploadedData []byte
 	uploadCount := 0
 	var err error
@@ -149,10 +147,10 @@ func Test_UploaderFailThenOK(t *testing.T) {
 	uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, nil)
+	done := uploader.Start(ctx, nil)
 	wg.Wait()
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := "my upload data", string(uploadedData); exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@@ -160,6 +158,7 @@ func Test_UploaderFailThenOK(t *testing.T) {
 }

 func Test_UploaderOKThenFail(t *testing.T) {
+	ResetStats()
 	var uploadedData []byte
 	uploadCount := 0
 	var err error
@@ -184,10 +183,10 @@ func Test_UploaderOKThenFail(t *testing.T) {
 	uploader.disableSumCheck = true // Disable because we want to upload twice.
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, nil)
+	done := uploader.Start(ctx, nil)
 	wg.Wait()
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := "my upload data", string(uploadedData); exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@@ -195,6 +194,7 @@ func Test_UploaderOKThenFail(t *testing.T) {
 }

 func Test_UploaderContextCancellation(t *testing.T) {
+	ResetStats()
 	var uploadCount int32
 	sc := &mockStorageClient{
@@ -207,10 +207,9 @@ func Test_UploaderContextCancellation(t *testing.T) {
 	uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
-	go uploader.Start(ctx, nil)
-	<-ctx.Done()
+	done := uploader.Start(ctx, nil)
 	cancel()
-	<-ctx.Done()
+	<-done

 	if exp, got := int32(0), atomic.LoadInt32(&uploadCount); exp != got {
 		t.Errorf("expected uploadCount to be %d, got %d", exp, got)
@@ -219,25 +218,25 @@
 func Test_UploaderEnabledFalse(t *testing.T) {
 	ResetStats()
-
 	sc := &mockStorageClient{}
 	dp := &mockDataProvider{data: "my upload data"}
 	uploader := NewUploader(sc, dp, 100*time.Millisecond, false)
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, func() bool { return false })
+	done := uploader.Start(ctx, func() bool { return false })
 	time.Sleep(time.Second)
-	defer cancel()

 	if exp, got := int64(0), stats.Get(numUploadsOK).(*expvar.Int); exp != got.Value() {
 		t.Errorf("expected numUploadsOK to be %d, got %d", exp, got)
 	}
+	cancel()
+	<-done
 }

 func Test_UploaderEnabledTrue(t *testing.T) {
+	ResetStats()
 	var uploadedData []byte
 	var err error
-	ResetStats()

 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -252,16 +251,17 @@ func Test_UploaderEnabledTrue(t *testing.T) {
 	uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
 	ctx, cancel := context.WithCancel(context.Background())
-	go uploader.Start(ctx, func() bool { return true })
-	defer cancel()
+	done := uploader.Start(ctx, func() bool { return true })
 	wg.Wait()

 	if exp, got := string(uploadedData), "my upload data"; exp != got {
 		t.Errorf("expected uploadedData to be %s, got %s", exp, got)
 	}
+	cancel()
+	<-done
 }

 func Test_UploaderStats(t *testing.T) {
+	ResetStats()
 	sc := &mockStorageClient{}
 	dp := &mockDataProvider{data: "my upload data"}
 	interval := 100 * time.Millisecond

@@ -241,7 +241,7 @@ func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*back
 	sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
 		s3cfg.Bucket, s3cfg.Path)
 	u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress)
-	go u.Start(ctx, nil)
+	u.Start(ctx, nil)
 	return u, nil
 }
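Since Start now returns immediately after spawning its goroutine, the `go` keyword is dropped at this call site in startAutoBackups (the daemon's startup path); the returned done channel is discarded here because shutdown is driven entirely by the context.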
