1
0
Fork 0

Merge remote-tracking branch 'origin' into zero-wal

master
Philip O'Toole 9 months ago
commit 88971bb883

@ -2,9 +2,10 @@
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Standardize on chunk size. - [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Standardize on chunk size.
- [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Set `TrailingLogs=0` to truncate log during user-initiated Snapshotting. - [PR #1456](https://github.com/rqlite/rqlite/pull/1459): Set `TrailingLogs=0` to truncate log during user-initiated Snapshotting.
- [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service - [PR #1462](https://github.com/rqlite/rqlite/pull/1462): Refactor redirect logic in HTTP service.
- [PR #1464](https://github.com/rqlite/rqlite/pull/1464): Handle snapshotting of empty WAL files - [PR #1464](https://github.com/rqlite/rqlite/pull/1464): Handle snapshotting of empty WAL files.
- [PR #1465](https://github.com/rqlite/rqlite/pull/1465): Move uploader goroutine into Uploader.
## 8.0.1 (December 8th 2023) ## 8.0.1 (December 8th 2023)
This release fixes an edge case issue during restore-from-SQLite. It's possible if a rqlite system crashes shortly after restoring from SQLite it may not have loaded the data correctly. This release fixes an edge case issue during restore-from-SQLite. It's possible if a rqlite system crashes shortly after restoring from SQLite it may not have loaded the data correctly.

@ -83,33 +83,37 @@ func NewUploader(storageClient StorageClient, dataProvider DataProvider, interva
} }
// Start starts the Uploader service. // Start starts the Uploader service.
func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) { func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) chan struct{} {
doneCh := make(chan struct{})
if isUploadEnabled == nil { if isUploadEnabled == nil {
isUploadEnabled = func() bool { return true } isUploadEnabled = func() bool { return true }
} }
u.logger.Printf("starting upload to %s every %s", u.storageClient, u.interval) u.logger.Printf("starting upload to %s every %s", u.storageClient, u.interval)
ticker := time.NewTicker(u.interval) ticker := time.NewTicker(u.interval)
defer ticker.Stop() go func() {
defer ticker.Stop()
for { defer close(doneCh)
select { for {
case <-ctx.Done(): select {
u.logger.Println("upload service shutting down") case <-ctx.Done():
return u.logger.Println("upload service shutting down")
case <-ticker.C: return
if !isUploadEnabled() { case <-ticker.C:
// Reset the lastSum so that the next time we're enabled upload will if !isUploadEnabled() {
// happen. We do this to be conservative, as we don't know what was // Reset the lastSum so that the next time we're enabled upload will
// happening while upload was disabled. // happen. We do this to be conservative, as we don't know what was
u.lastSum = nil // happening while upload was disabled.
continue u.lastSum = nil
} continue
if err := u.upload(ctx); err != nil { }
u.logger.Printf("failed to upload to %s: %v", u.storageClient, err) if err := u.upload(ctx); err != nil {
u.logger.Printf("failed to upload to %s: %v", u.storageClient, err)
}
} }
} }
} }()
return doneCh
} }
// Stats returns the stats for the Uploader service. // Stats returns the stats for the Uploader service.

@ -14,6 +14,7 @@ import (
) )
func Test_NewUploader(t *testing.T) { func Test_NewUploader(t *testing.T) {
ResetStats()
storageClient := &mockStorageClient{} storageClient := &mockStorageClient{}
dataProvider := &mockDataProvider{} dataProvider := &mockDataProvider{}
interval := time.Second interval := time.Second
@ -47,10 +48,10 @@ func Test_UploaderSingleUpload(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
wg.Wait() wg.Wait()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := "my upload data", string(uploadedData); exp != got { if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@ -60,7 +61,6 @@ func Test_UploaderSingleUpload(t *testing.T) {
func Test_UploaderSingleUploadCompress(t *testing.T) { func Test_UploaderSingleUploadCompress(t *testing.T) {
ResetStats() ResetStats()
var uploadedData []byte var uploadedData []byte
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
sc := &mockStorageClient{ sc := &mockStorageClient{
@ -82,10 +82,10 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress) uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadCompress)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
wg.Wait() wg.Wait()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := "my upload data", string(uploadedData); exp != got { if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@ -94,7 +94,6 @@ func Test_UploaderSingleUploadCompress(t *testing.T) {
func Test_UploaderDoubleUpload(t *testing.T) { func Test_UploaderDoubleUpload(t *testing.T) {
ResetStats() ResetStats()
var uploadedData []byte var uploadedData []byte
var err error var err error
@ -113,10 +112,10 @@ func Test_UploaderDoubleUpload(t *testing.T) {
uploader.disableSumCheck = true // Force upload of the same data uploader.disableSumCheck = true // Force upload of the same data
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
wg.Wait() wg.Wait()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := "my upload data", string(uploadedData); exp != got { if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@ -125,7 +124,6 @@ func Test_UploaderDoubleUpload(t *testing.T) {
func Test_UploaderFailThenOK(t *testing.T) { func Test_UploaderFailThenOK(t *testing.T) {
ResetStats() ResetStats()
var uploadedData []byte var uploadedData []byte
uploadCount := 0 uploadCount := 0
var err error var err error
@ -149,10 +147,10 @@ func Test_UploaderFailThenOK(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
wg.Wait() wg.Wait()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := "my upload data", string(uploadedData); exp != got { if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@ -160,6 +158,7 @@ func Test_UploaderFailThenOK(t *testing.T) {
} }
func Test_UploaderOKThenFail(t *testing.T) { func Test_UploaderOKThenFail(t *testing.T) {
ResetStats()
var uploadedData []byte var uploadedData []byte
uploadCount := 0 uploadCount := 0
var err error var err error
@ -184,10 +183,10 @@ func Test_UploaderOKThenFail(t *testing.T) {
uploader.disableSumCheck = true // Disable because we want to upload twice. uploader.disableSumCheck = true // Disable because we want to upload twice.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
wg.Wait() wg.Wait()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := "my upload data", string(uploadedData); exp != got { if exp, got := "my upload data", string(uploadedData); exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
@ -195,6 +194,7 @@ func Test_UploaderOKThenFail(t *testing.T) {
} }
func Test_UploaderContextCancellation(t *testing.T) { func Test_UploaderContextCancellation(t *testing.T) {
ResetStats()
var uploadCount int32 var uploadCount int32
sc := &mockStorageClient{ sc := &mockStorageClient{
@ -207,10 +207,9 @@ func Test_UploaderContextCancellation(t *testing.T) {
uploader := NewUploader(sc, dp, time.Second, UploadNoCompress) uploader := NewUploader(sc, dp, time.Second, UploadNoCompress)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
go uploader.Start(ctx, nil) done := uploader.Start(ctx, nil)
<-ctx.Done()
cancel() cancel()
<-ctx.Done() <-done
if exp, got := int32(0), atomic.LoadInt32(&uploadCount); exp != got { if exp, got := int32(0), atomic.LoadInt32(&uploadCount); exp != got {
t.Errorf("expected uploadCount to be %d, got %d", exp, got) t.Errorf("expected uploadCount to be %d, got %d", exp, got)
@ -219,25 +218,25 @@ func Test_UploaderContextCancellation(t *testing.T) {
func Test_UploaderEnabledFalse(t *testing.T) { func Test_UploaderEnabledFalse(t *testing.T) {
ResetStats() ResetStats()
sc := &mockStorageClient{} sc := &mockStorageClient{}
dp := &mockDataProvider{data: "my upload data"} dp := &mockDataProvider{data: "my upload data"}
uploader := NewUploader(sc, dp, 100*time.Millisecond, false) uploader := NewUploader(sc, dp, 100*time.Millisecond, false)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, func() bool { return false }) done := uploader.Start(ctx, func() bool { return false })
time.Sleep(time.Second) time.Sleep(time.Second)
defer cancel()
if exp, got := int64(0), stats.Get(numUploadsOK).(*expvar.Int); exp != got.Value() { if exp, got := int64(0), stats.Get(numUploadsOK).(*expvar.Int); exp != got.Value() {
t.Errorf("expected numUploadsOK to be %d, got %d", exp, got) t.Errorf("expected numUploadsOK to be %d, got %d", exp, got)
} }
cancel()
<-done
} }
func Test_UploaderEnabledTrue(t *testing.T) { func Test_UploaderEnabledTrue(t *testing.T) {
ResetStats()
var uploadedData []byte var uploadedData []byte
var err error var err error
ResetStats()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -252,16 +251,17 @@ func Test_UploaderEnabledTrue(t *testing.T) {
uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress) uploader := NewUploader(sc, dp, 100*time.Millisecond, UploadNoCompress)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go uploader.Start(ctx, func() bool { return true }) done := uploader.Start(ctx, func() bool { return true })
defer cancel()
wg.Wait() wg.Wait()
if exp, got := string(uploadedData), "my upload data"; exp != got { if exp, got := string(uploadedData), "my upload data"; exp != got {
t.Errorf("expected uploadedData to be %s, got %s", exp, got) t.Errorf("expected uploadedData to be %s, got %s", exp, got)
} }
cancel()
<-done
} }
func Test_UploaderStats(t *testing.T) { func Test_UploaderStats(t *testing.T) {
ResetStats()
sc := &mockStorageClient{} sc := &mockStorageClient{}
dp := &mockDataProvider{data: "my upload data"} dp := &mockDataProvider{data: "my upload data"}
interval := 100 * time.Millisecond interval := 100 * time.Millisecond

@ -241,7 +241,7 @@ func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*back
sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey, sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
s3cfg.Bucket, s3cfg.Path) s3cfg.Bucket, s3cfg.Path)
u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress) u := backup.NewUploader(sc, provider, time.Duration(uCfg.Interval), !uCfg.NoCompress)
go u.Start(ctx, nil) u.Start(ctx, nil)
return u, nil return u, nil
} }

@ -232,7 +232,7 @@ func (s *Store) check() (retError error) {
syncDirMaybe(s.dir) syncDirMaybe(s.dir)
s.logger.Printf("check complete") s.logger.Printf("check complete")
}() }()
s.logger.Printf("checking snapshot store at %s", s.dir) s.logger.Printf("checking consistency of snapshot store at %s", s.dir)
if err := RemoveAllTmpSnapshotData(s.dir); err != nil { if err := RemoveAllTmpSnapshotData(s.dir); err != nil {
return err return err

Loading…
Cancel
Save