diff --git a/CHANGELOG.md b/CHANGELOG.md index 4888b0b8..9a221b2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,10 @@ ### Implementation changes and bug fixes -- [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Standardize on chunk size. +- [PR #1459](https://github.com/rqlite/rqlite/pull/1459): Standardize on chunk size. -- [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Set `TrailingLogs=0` to truncate log during user-initiated Snapshotting. +- [PR #1459](https://github.com/rqlite/rqlite/pull/1459): Set `TrailingLogs=0` to truncate log during user-initiated Snapshotting. -- [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service -- [PR #1464](https://github.com/rqlite/rqlite/pull/1464): Handle snapshotting of empty WAL files - +- [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service. +- [PR #1464](https://github.com/rqlite/rqlite/pull/1464): Handle snapshotting of empty WAL files. +- [PR #1465](https://github.com/rqlite/rqlite/pull/1465): Move uploader goroutine into Uploader. + ## 8.0.1 (December 8th 2023) This release fixes an edge case issue during restore-from-SQLite. It's possible if a rqlite system crashes shortly after restoring from SQLite it may not have loaded the data correctly. diff --git a/auto/backup/uploader.go b/auto/backup/uploader.go index 3883db8a..90b9e005 100644 --- a/auto/backup/uploader.go +++ b/auto/backup/uploader.go @@ -83,33 +83,37 @@ func NewUploader(storageClient StorageClient, dataProvider DataProvider, interva } -// Start starts the Uploader service. +// Start starts the Uploader service. It returns a channel which is closed once the Uploader has shut down. 
-func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) { +func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) chan struct{} { + doneCh := make(chan struct{}) if isUploadEnabled == nil { isUploadEnabled = func() bool { return true } } u.logger.Printf("starting upload to %s every %s", u.storageClient, u.interval) ticker := time.NewTicker(u.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - u.logger.Println("upload service shutting down") - return - case <-ticker.C: - if !isUploadEnabled() { - // Reset the lastSum so that the next time we're enabled upload will - // happen. We do this to be conservative, as we don't know what was - // happening while upload was disabled. - u.lastSum = nil - continue - } - if err := u.upload(ctx); err != nil { - u.logger.Printf("failed to upload to %s: %v", u.storageClient, err) + go func() { + defer ticker.Stop() + defer close(doneCh) + for { + select { + case <-ctx.Done(): + u.logger.Println("upload service shutting down") + return + case <-ticker.C: + if !isUploadEnabled() { + // Reset the lastSum so that the next time we're enabled upload will + // happen. We do this to be conservative, as we don't know what was + // happening while upload was disabled. + u.lastSum = nil + continue + } + if err := u.upload(ctx); err != nil { + u.logger.Printf("failed to upload to %s: %v", u.storageClient, err) + } } } - } + }() + return doneCh } // Stats returns the stats for the Uploader service. 
diff --git a/auto/backup/uploader_test.go b/auto/backup/uploader_test.go index 7b27aaa1..ff937c65 100644 --- a/auto/backup/uploader_test.go +++ b/auto/backup/uploader_test.go @@ -14,6 +14,7 @@ import ( ) func Test_NewUploader(t *testing.T) { + ResetStats() storageClient := &mockStorageClient{} dataProvider := &mockDataProvider{} interval := time.Second @@ -47,10 +48,10 @@ func Test_UploaderSingleUpload(t *testing.T) { uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, nil) + done := uploader.Start(ctx, nil) wg.Wait() cancel() - <-ctx.Done() + <-done if exp, got := "my upload data", string(uploadedData); exp != got { t.Errorf("expected uploadedData to be %s, got %s", exp, got) @@ -60,7 +61,6 @@ func Test_UploaderSingleUpload(t *testing.T) { func Test_UploaderSingleUploadCompress(t *testing.T) { ResetStats() var uploadedData []byte - var wg sync.WaitGroup wg.Add(1) sc := &mockStorageClient{ @@ -82,10 +82,10 @@ func Test_UploaderSingleUploadCompress(t *testing.T) { uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress) ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, nil) + done := uploader.Start(ctx, nil) wg.Wait() cancel() - <-ctx.Done() + <-done if exp, got := "my upload data", string(uploadedData); exp != got { t.Errorf("expected uploadedData to be %s, got %s", exp, got) @@ -94,7 +94,6 @@ func Test_UploaderSingleUploadCompress(t *testing.T) { func Test_UploaderDoubleUpload(t *testing.T) { ResetStats() - var uploadedData []byte var err error @@ -113,10 +112,10 @@ func Test_UploaderDoubleUpload(t *testing.T) { uploader.disableSumCheck = true // Force upload of the same data ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, nil) + done := uploader.Start(ctx, nil) wg.Wait() cancel() - <-ctx.Done() + <-done if exp, got := "my upload data", string(uploadedData); exp != got { t.Errorf("expected 
uploadedData to be %s, got %s", exp, got) @@ -125,7 +124,6 @@ func Test_UploaderDoubleUpload(t *testing.T) { func Test_UploaderFailThenOK(t *testing.T) { ResetStats() - var uploadedData []byte uploadCount := 0 var err error @@ -149,10 +147,10 @@ func Test_UploaderFailThenOK(t *testing.T) { uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, nil) + done := uploader.Start(ctx, nil) wg.Wait() cancel() - <-ctx.Done() + <-done if exp, got := "my upload data", string(uploadedData); exp != got { t.Errorf("expected uploadedData to be %s, got %s", exp, got) @@ -160,6 +158,7 @@ func Test_UploaderFailThenOK(t *testing.T) { } func Test_UploaderOKThenFail(t *testing.T) { + ResetStats() var uploadedData []byte uploadCount := 0 var err error @@ -184,10 +183,10 @@ func Test_UploaderOKThenFail(t *testing.T) { uploader.disableSumCheck = true // Disable because we want to upload twice. ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, nil) + done := uploader.Start(ctx, nil) wg.Wait() cancel() - <-ctx.Done() + <-done if exp, got := "my upload data", string(uploadedData); exp != got { t.Errorf("expected uploadedData to be %s, got %s", exp, got) @@ -195,6 +194,7 @@ func Test_UploaderOKThenFail(t *testing.T) { } func Test_UploaderContextCancellation(t *testing.T) { + ResetStats() var uploadCount int32 sc := &mockStorageClient{ @@ -207,10 +207,9 @@ func Test_UploaderContextCancellation(t *testing.T) { uploader := NewUploader(sc, dp, time.Second, UploadNoCompress) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) - go uploader.Start(ctx, nil) - <-ctx.Done() + done := uploader.Start(ctx, nil) cancel() - <-ctx.Done() + <-done if exp, got := int32(0), atomic.LoadInt32(&uploadCount); exp != got { t.Errorf("expected uploadCount to be %d, got %d", exp, got) @@ -219,25 +218,25 @@ func Test_UploaderContextCancellation(t *testing.T) { func 
Test_UploaderEnabledFalse(t *testing.T) { ResetStats() - sc := &mockStorageClient{} dp := &mockDataProvider{data: "my upload data"} uploader := NewUploader(sc, dp, 100*time.Millisecond, false) ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, func() bool { return false }) + done := uploader.Start(ctx, func() bool { return false }) time.Sleep(time.Second) - defer cancel() if exp, got := int64(0), stats.Get(numUploadsOK).(*expvar.Int); exp != got.Value() { t.Errorf("expected numUploadsOK to be %d, got %d", exp, got) } + cancel() + <-done } func Test_UploaderEnabledTrue(t *testing.T) { + ResetStats() var uploadedData []byte var err error - ResetStats() var wg sync.WaitGroup wg.Add(1) @@ -252,16 +251,17 @@ func Test_UploaderEnabledTrue(t *testing.T) { uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) ctx, cancel := context.WithCancel(context.Background()) - go uploader.Start(ctx, func() bool { return true }) - defer cancel() - + done := uploader.Start(ctx, func() bool { return true }) wg.Wait() if exp, got := string(uploadedData), "my upload data"; exp != got { t.Errorf("expected uploadedData to be %s, got %s", exp, got) } + cancel() + <-done } func Test_UploaderStats(t *testing.T) { + ResetStats() sc := &mockStorageClient{} dp := &mockDataProvider{data: "my upload data"} interval := 100 * time.Millisecond diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index 4a9007e5..20a7d852 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -241,7 +241,7 @@ func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*back sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey, s3cfg.Bucket, s3cfg.Path) u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress) - go u.Start(ctx, nil) + u.Start(ctx, nil) return u, nil } diff --git a/snapshot/store.go b/snapshot/store.go index 3e31b369..9e4e43ce 100644 --- a/snapshot/store.go +++ 
b/snapshot/store.go @@ -232,7 +232,7 @@ func (s *Store) check() (retError error) { syncDirMaybe(s.dir) s.logger.Printf("check complete") }() - s.logger.Printf("checking snapshot store at %s", s.dir) + s.logger.Printf("checking consistency of snapshot store at %s", s.dir) if err := RemoveAllTmpSnapshotData(s.dir); err != nil { return err