
End-to-end test for skipped uploads due to sum

master
Philip O'Toole 8 months ago
parent 22c42eed0a
commit 7c11295863

@@ -46,11 +46,12 @@ type DataProvider interface {
var stats *expvar.Map
const (
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
numUploadsSkipped = "num_uploads_skipped"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
numUploadsSkipped = "num_uploads_skipped"
numUploadsSkippedSum = "num_uploads_skipped_sum"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
UploadCompress = true
UploadNoCompress = false
@@ -67,6 +68,7 @@ func ResetStats() {
stats.Add(numUploadsOK, 0)
stats.Add(numUploadsFail, 0)
stats.Add(numUploadsSkipped, 0)
stats.Add(numUploadsSkippedSum, 0)
stats.Add(totalUploadBytes, 0)
stats.Add(lastUploadBytes, 0)
}
@@ -184,7 +186,7 @@ func (u *Uploader) upload(ctx context.Context) error {
// uploader started. Double-check that we really need to upload.
cloudSum, err := u.storageClient.CurrentSum(ctx)
if err == nil && bytes.Equal(cloudSum, filesum) {
stats.Add(numUploadsSkipped, 1)
stats.Add(numUploadsSkippedSum, 1)
return nil
}
}
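
The hunk above is the heart of the change: before re-uploading after a restart, the uploader asks the storage client for the sum recorded with the last backup and, if it matches the local data, skips the upload and counts it under its own stat. Below is a minimal, self-contained sketch of that double-check; the package layout, the StorageClient interface, and the doUpload helper are stand-ins for illustration, not the actual rqlite code (only CurrentSum, bytes.Equal, and the num_uploads_skipped_sum counter come from the diff, and the "uploader" map name comes from the expvar key the Python test reads).

// Sketch only: assumed types and helpers, not the real rqlite uploader.
package upload

import (
	"bytes"
	"context"
	"expvar"
)

// The Python test reads these counters under the "uploader" expvar map.
var stats = expvar.NewMap("uploader")

const numUploadsSkippedSum = "num_uploads_skipped_sum"

// StorageClient is a stand-in for the cloud storage abstraction.
type StorageClient interface {
	// CurrentSum returns the sum stored with the most recent backup.
	CurrentSum(ctx context.Context) ([]byte, error)
}

type Uploader struct {
	storageClient StorageClient
}

// maybeUpload skips the upload when the cloud copy already carries the
// same sum as the local snapshot, incrementing the dedicated counter.
func (u *Uploader) maybeUpload(ctx context.Context, filesum []byte) error {
	cloudSum, err := u.storageClient.CurrentSum(ctx)
	if err == nil && bytes.Equal(cloudSum, filesum) {
		stats.Add(numUploadsSkippedSum, 1)
		return nil
	}
	return u.doUpload(ctx, filesum)
}

// doUpload is a placeholder for the real upload path.
func (u *Uploader) doUpload(ctx context.Context, sum []byte) error {
	return nil
}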

@@ -94,7 +94,7 @@ func (s *S3Client) Upload(ctx context.Context, reader io.Reader, sum []byte) err
if sum != nil {
input.Metadata = map[string]*string{
AWSS3SumKey: aws.String(fmt.Sprintf("%x", sum)),
AWSS3SumKey: aws.String(hex.EncodeToString(sum)),
}
}
_, err = uploader.UploadWithContext(ctx, input)
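
A small aside on the one-line change above: hex.EncodeToString(sum) and fmt.Sprintf("%x", sum) produce the same lowercase hex string for a byte slice, so the swap simply uses the dedicated encoder (it does require the encoding/hex import, which isn't visible in this hunk). A quick standalone check:

package main

import (
	"encoding/hex"
	"fmt"
)

func main() {
	sum := []byte{0xde, 0xad, 0xbe, 0xef}
	fmt.Printf("%x\n", sum)              // deadbeef
	fmt.Println(hex.EncodeToString(sum)) // deadbeef, identical output
}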

@@ -245,8 +245,6 @@ class TestAutoBackupS3(unittest.TestCase):
node = None
cfg = None
path = None
backup_file = None
access_key_id = os.environ['RQLITE_S3_ACCESS_KEY']
secret_access_key_id = os.environ['RQLITE_S3_SECRET_ACCESS_KEY']
@@ -292,6 +290,54 @@ class TestAutoBackupS3(unittest.TestCase):
deprovision_node(follower)
os.remove(cfg)
@unittest.skipUnless(env_present('RQLITE_S3_ACCESS_KEY'), "S3 credentials not available")
def test_no_upload_restart(self):
'''Test that restarting a node that has already uploaded a backup doesn't upload again'''
node = None
cfg = None
path = None
access_key_id = os.environ['RQLITE_S3_ACCESS_KEY']
secret_access_key_id = os.environ['RQLITE_S3_SECRET_ACCESS_KEY']
# Create the auto-backup config file
path = random_string(32)
auto_backup_cfg = {
"version": 1,
"type": "s3",
"interval": "100ms",
"no_compress": True,
"sub" : {
"access_key_id": access_key_id,
"secret_access_key": secret_access_key_id,
"region": S3_BUCKET_REGION,
"bucket": S3_BUCKET,
"path": path
}
}
cfg = write_random_file(json.dumps(auto_backup_cfg))
# Create a cluster with automatic backups enabled.
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
# Then create a table and insert a row. Wait for a backup to happen.
i = node.num_auto_backups()[0]
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.execute('INSERT INTO foo(name) VALUES("fiona")')
node.wait_for_all_fsm()
node.wait_for_upload(i+1)
# Restart the node, and confirm no further backups are made.
node.stop(graceful=True)
node.start()
node.wait_for_leader()
node.wait_for_upload_skipped_sum(1)
delete_s3_object(access_key_id, secret_access_key_id, S3_BUCKET, path)
deprovision_node(node)
os.remove(cfg)
@unittest.skipUnless(env_present('RQLITE_S3_ACCESS_KEY'), "S3 credentials not available")
def test_no_compress_vacuum(self):
'''Test that automatic backups to S3 work with compression off and vacuum on'''

@@ -373,11 +373,12 @@ class Node(object):
def num_auto_backups(self):
'''
Return a tuple of the number of successful, failed, and skipped auto-backups.
Return a tuple of the number of successful, failed, skipped, and skipped-due-to-sum auto-backups.
'''
return (int(self.expvar()['uploader']['num_uploads_ok']),
int(self.expvar()['uploader']['num_uploads_fail']),
int(self.expvar()['uploader']['num_uploads_skipped']))
int(self.expvar()['uploader']['num_uploads_skipped']),
int(self.expvar()['uploader']['num_uploads_skipped_sum']))
def wait_for_upload(self, i, timeout=TIMEOUT):
'''
@@ -390,7 +391,20 @@ class Node(object):
time.sleep(1)
t+=1
n = self.num_auto_backups()
raise Exception('rqlite node failed to upload backup within %d seconds (%d, %d, %d)' % (timeout, n[0], n[1], n[2]))
raise Exception('rqlite node failed to upload backup within %d seconds (%d, %d, %d, %d)' % (timeout, n[0], n[1], n[2], n[3]))
def wait_for_upload_skipped_sum(self, i, timeout=TIMEOUT):
'''
Wait until the number of skipped sum uploads is at least as great as the given value.
'''
t = 0
while t < timeout:
if self.num_auto_backups()[3] >= i:
return self.num_auto_backups()
time.sleep(1)
t+=1
n = self.num_auto_backups()
raise Exception('rqlite node failed to skip backup within %d seconds (%d, %d, %d, %d)' % (timeout, n[0], n[1], n[2], n[3]))
def wait_for_fsm_index(self, index, timeout=TIMEOUT):
'''
