|
|
|
@ -120,88 +120,98 @@ func (u *Uploader) Stats() (map[string]interface{}, error) {
|
|
|
|
|
"compress": u.compress,
|
|
|
|
|
"last_upload_time": u.lastUploadTime.Format(time.RFC3339),
|
|
|
|
|
"last_upload_duration": u.lastUploadDuration.String(),
|
|
|
|
|
"last_upload_sum": u.lastSum.String(),
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (u *Uploader) upload(ctx context.Context) error {
|
|
|
|
|
// create a temporary file to hold the data to be uploaded
|
|
|
|
|
tmpfile, err := tempFilename()
|
|
|
|
|
// create a temporary file for the data to be uploaded
|
|
|
|
|
filetoUpload, err := tempFilename()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer os.Remove(tmpfile)
|
|
|
|
|
defer os.Remove(filetoUpload)
|
|
|
|
|
|
|
|
|
|
if err := u.dataProvider.Provide(tmpfile); err != nil {
|
|
|
|
|
if err := u.dataProvider.Provide(filetoUpload); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := u.compressIfNeeded(filetoUpload); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var sum SHA256Sum
|
|
|
|
|
if !u.disableSumCheck {
|
|
|
|
|
// Get the SHA256 sum of the file. If it is the same as the last one, then
|
|
|
|
|
// there is no need to upload the file.
|
|
|
|
|
sum, err = FileSHA256(tmpfile)
|
|
|
|
|
sum, err := FileSHA256(filetoUpload)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if sum.Equals(u.lastSum) {
|
|
|
|
|
if !u.disableSumCheck && sum.Equals(u.lastSum) {
|
|
|
|
|
stats.Add(numUploadsSkipped, 1)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Re-open the file for reading.
|
|
|
|
|
uncompressedF, err := os.Open(tmpfile)
|
|
|
|
|
fd, err := os.Open(filetoUpload)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer uncompressedF.Close()
|
|
|
|
|
|
|
|
|
|
r := io.Reader(uncompressedF)
|
|
|
|
|
if u.compress {
|
|
|
|
|
compressedF, err := os.CreateTemp("", "rqlite-upload")
|
|
|
|
|
cr := &countingReader{reader: fd}
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
err = u.storageClient.Upload(ctx, cr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
stats.Add(numUploadsFail, 1)
|
|
|
|
|
} else {
|
|
|
|
|
u.lastSum = sum
|
|
|
|
|
stats.Add(numUploadsOK, 1)
|
|
|
|
|
stats.Add(totalUploadBytes, cr.count)
|
|
|
|
|
stats.Get(lastUploadBytes).(*expvar.Int).Set(cr.count)
|
|
|
|
|
u.lastUploadTime = time.Now()
|
|
|
|
|
u.lastUploadDuration = time.Since(startTime)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer os.Remove(compressedF.Name())
|
|
|
|
|
|
|
|
|
|
gw := gzip.NewWriter(compressedF)
|
|
|
|
|
_, err = io.Copy(gw, uncompressedF)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
func (u *Uploader) compressIfNeeded(path string) error {
|
|
|
|
|
if !u.compress {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
err = gw.Close()
|
|
|
|
|
|
|
|
|
|
compressedFile, err := tempFilename()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := compressedF.Close(); err != nil {
|
|
|
|
|
defer os.Remove(compressedFile)
|
|
|
|
|
|
|
|
|
|
if err = compressFromTo(path, compressedFile); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compressedF, err = os.Open(compressedF.Name())
|
|
|
|
|
return os.Rename(compressedFile, path)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func compressFromTo(from, to string) error {
|
|
|
|
|
uncompressedFd, err := os.Open(from)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer compressedF.Close()
|
|
|
|
|
r = io.Reader(compressedF)
|
|
|
|
|
defer uncompressedFd.Close()
|
|
|
|
|
|
|
|
|
|
compressedFd, err := os.Create(to)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer compressedFd.Close()
|
|
|
|
|
|
|
|
|
|
cr := &countingReader{reader: r}
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
err = u.storageClient.Upload(ctx, cr)
|
|
|
|
|
gw := gzip.NewWriter(compressedFd)
|
|
|
|
|
_, err = io.Copy(gw, uncompressedFd)
|
|
|
|
|
if err != nil {
|
|
|
|
|
stats.Add(numUploadsFail, 1)
|
|
|
|
|
} else {
|
|
|
|
|
u.lastSum = sum
|
|
|
|
|
stats.Add(numUploadsOK, 1)
|
|
|
|
|
stats.Add(totalUploadBytes, cr.count)
|
|
|
|
|
stats.Get(lastUploadBytes).(*expvar.Int).Set(cr.count)
|
|
|
|
|
u.lastUploadTime = time.Now()
|
|
|
|
|
u.lastUploadDuration = time.Since(startTime)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
err = gw.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type countingReader struct {
|
|
|
|
|
reader io.Reader
|
|
|
|
|