@ -1,7 +1,6 @@
package upload
import (
"bytes"
"compress/gzip"
"context"
"expvar"
@ -19,21 +18,21 @@ type StorageClient interface {
}
// DataProvider is an interface for providing data to be uploaded. The Uploader
// service will call Provide() to get a reader for the data to be uploaded. Once
// the upload completes the reader will be closed, regardless of whether the
// upload succeeded or failed.
// service will call Provide() to have the data-for-upload to be written to the
// to the file specified by path.
type DataProvider interface {
Provide ( ) ( io . ReadCloser , error )
Provide ( path string ) error
}
// stats captures stats for the Uploader service.
var stats * expvar . Map
const (
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
numUploadsSkipped = "num_uploads_skipped"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
UploadCompress = true
UploadNoCompress = false
@ -49,6 +48,7 @@ func ResetStats() {
stats . Init ( )
stats . Add ( numUploadsOK , 0 )
stats . Add ( numUploadsFail , 0 )
stats . Add ( numUploadsSkipped , 0 )
stats . Add ( totalUploadBytes , 0 )
stats . Add ( lastUploadBytes , 0 )
}
@ -63,6 +63,12 @@ type Uploader struct {
logger * log . Logger
lastUploadTime time . Time
lastUploadDuration time . Duration
lastSum SHA256Sum
// disableSumCheck is used for testing purposes to disable the check that
// prevents uploading the same data twice.
disableSumCheck bool
}
// NewUploader creates a new Uploader service.
@ -93,6 +99,10 @@ func (u *Uploader) Start(ctx context.Context, isUploadEnabled func() bool) {
return
case <- ticker . C :
if ! isUploadEnabled ( ) {
// Reset the lastSum so that the next time we're enabled upload will
// happen. We do this to be conservative, as we don't know what was
// happening while upload was disabled.
u . lastSum = nil
continue
}
if err := u . upload ( ctx ) ; err != nil {
@ -110,38 +120,47 @@ func (u *Uploader) Stats() (map[string]interface{}, error) {
"compress" : u . compress ,
"last_upload_time" : u . lastUploadTime . Format ( time . RFC3339 ) ,
"last_upload_duration" : u . lastUploadDuration . String ( ) ,
"last_upload_sum" : u . lastSum . String ( ) ,
}
return status , nil
}
func ( u * Uploader ) upload ( ctx context . Context ) error {
rc , err := u . dataProvider . Provide ( )
// create a temporary file for the data to be uploaded
filetoUpload , err := tempFilename ( )
if err != nil {
return err
}
defer rc. Close ( )
defer os. Remove ( filetoUpload )
r := rc . ( io . Reader )
if u . compress {
buffer := new ( bytes . Buffer )
gw := gzip . NewWriter ( buffer )
_ , err = io . Copy ( gw , rc )
if err != nil {
return err
}
err = gw . Close ( )
if err != nil {
return err
}
r = buffer
if err := u . dataProvider . Provide ( filetoUpload ) ; err != nil {
return err
}
if err := u . compressIfNeeded ( filetoUpload ) ; err != nil {
return err
}
sum , err := FileSHA256 ( filetoUpload )
if err != nil {
return err
}
if ! u . disableSumCheck && sum . Equals ( u . lastSum ) {
stats . Add ( numUploadsSkipped , 1 )
return nil
}
cr := & countingReader { reader : r }
fd , err := os . Open ( filetoUpload )
if err != nil {
return err
}
cr := & countingReader { reader : fd }
startTime := time . Now ( )
err = u . storageClient . Upload ( ctx , cr )
if err != nil {
stats . Add ( numUploadsFail , 1 )
} else {
u . lastSum = sum
stats . Add ( numUploadsOK , 1 )
stats . Add ( totalUploadBytes , cr . count )
stats . Get ( lastUploadBytes ) . ( * expvar . Int ) . Set ( cr . count )
@ -151,6 +170,49 @@ func (u *Uploader) upload(ctx context.Context) error {
return err
}
func ( u * Uploader ) compressIfNeeded ( path string ) error {
if ! u . compress {
return nil
}
compressedFile , err := tempFilename ( )
if err != nil {
return err
}
defer os . Remove ( compressedFile )
if err = compressFromTo ( path , compressedFile ) ; err != nil {
return err
}
return os . Rename ( compressedFile , path )
}
func compressFromTo ( from , to string ) error {
uncompressedFd , err := os . Open ( from )
if err != nil {
return err
}
defer uncompressedFd . Close ( )
compressedFd , err := os . Create ( to )
if err != nil {
return err
}
defer compressedFd . Close ( )
gw := gzip . NewWriter ( compressedFd )
_ , err = io . Copy ( gw , uncompressedFd )
if err != nil {
return err
}
err = gw . Close ( )
if err != nil {
return err
}
return nil
}
type countingReader struct {
reader io . Reader
count int64
@ -161,3 +223,12 @@ func (c *countingReader) Read(p []byte) (int, error) {
c . count += int64 ( n )
return n , err
}
func tempFilename ( ) ( string , error ) {
f , err := os . CreateTemp ( "" , "rqlite-upload" )
if err != nil {
return "" , err
}
f . Close ( )
return f . Name ( ) , nil
}