
Better timeout handling

master
Philip O'Toole 8 months ago
parent 0e00c5d2d4
commit 078d0eacaf

@ -45,13 +45,13 @@ type DataProvider interface {
var stats *expvar.Map
const (
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
numUploadsSkipped = "num_uploads_skipped"
numUploadsSkippedSum = "num_uploads_skipped_sum"
numSumGetFail = "num_sum_get_fail"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
numUploadsOK = "num_uploads_ok"
numUploadsFail = "num_uploads_fail"
numUploadsSkipped = "num_uploads_skipped"
numUploadsSkippedID = "num_uploads_skipped_id"
numSumGetFail = "num_sum_get_fail"
totalUploadBytes = "total_upload_bytes"
lastUploadBytes = "last_upload_bytes"
UploadCompress = true
UploadNoCompress = false
@ -68,7 +68,7 @@ func ResetStats() {
stats.Add(numUploadsOK, 0)
stats.Add(numUploadsFail, 0)
stats.Add(numUploadsSkipped, 0)
stats.Add(numUploadsSkippedSum, 0)
stats.Add(numUploadsSkippedID, 0)
stats.Add(numSumGetFail, 0)
stats.Add(totalUploadBytes, 0)
stats.Add(lastUploadBytes, 0)
@ -175,7 +175,7 @@ func (u *Uploader) upload(ctx context.Context) error {
stats.Add(numSumGetFail, 1)
u.logger.Printf("failed to get current sum from %s: %v", u.storageClient, err)
} else if err == nil && cloudID == strconv.FormatUint(li, 10) {
stats.Add(numUploadsSkippedSum, 1)
stats.Add(numUploadsSkippedID, 1)
return nil
}
}
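On the Go side the skip statistic is renamed from a checksum comparison ("sum") to an ID comparison: the upload is skipped when the ID already stored in the cloud matches the ID of the data about to be uploaded. The Python harness further down reads this counter through the node's expvar output; here is a hedged sketch of fetching it directly, assuming the node serves Go's standard expvar JSON at /debug/vars (the URL and helper name are assumptions, not shown in this diff):

import requests

def num_uploads_skipped_id(api_addr):
    '''Read the uploader's skipped-by-ID counter from a node's expvar output.
    Assumes the node exposes expvar JSON at /debug/vars.'''
    v = requests.get('http://%s/debug/vars' % api_addr).json()
    return int(v['uploader']['num_uploads_skipped_id'])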

@ -61,10 +61,10 @@ func Test_UploaderSingleUpload(t *testing.T) {
}
}
// Test_UploaderSingleUpload_Checksum ensures that when the checksum in the
// storage service is the same as the checksum of the data being uploaded, the
// Test_UploaderSingleUpload_ID ensures that when the ID in the
// storage service is the same as the ID of the data being uploaded, the
// upload is skipped.
func Test_UploaderSingleUpload_Checksum(t *testing.T) {
func Test_UploaderSingleUpload_ID(t *testing.T) {
ResetStats()
var wg sync.WaitGroup
wg.Add(1)
@ -83,7 +83,7 @@ func Test_UploaderSingleUpload_Checksum(t *testing.T) {
wg.Wait()
cancel()
<-done
if exp, got := int64(1), stats.Get(numUploadsSkippedSum).(*expvar.Int); exp != got.Value() {
if exp, got := int64(1), stats.Get(numUploadsSkippedID).(*expvar.Int); exp != got.Value() {
t.Errorf("expected numUploadsSkippedSum to be %d, got %d", exp, got)
}
}

@ -199,21 +199,19 @@ class TestAutoBackupS3(unittest.TestCase):
cfg = write_random_file(json.dumps(auto_backup_cfg))
# Create a node, enable automatic backups, and start it. Then
# create a table and insert a row. Wait for an initial backup
# to happen because there is no data in the cloud.
# create a table and insert a row.
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
node.wait_for_upload(1)
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.wait_for_upload(2)
node.wait_for_upload(1)
# Wait and check that no further backups have been made.
node.wait_until_uploads_idle()
# Write one more row, confirm another backup is made.
node.execute('INSERT INTO foo(name) VALUES("fiona")')
node.wait_for_upload(3)
node.wait_for_upload(2)
# Wait and check that no further backups have been made.
node.wait_until_uploads_idle()
@ -265,16 +263,15 @@ class TestAutoBackupS3(unittest.TestCase):
cfg = write_random_file(json.dumps(auto_backup_cfg))
# Create a node, enable automatic backups, and start it. Then
# create a table and insert a row. An initial backup will happen
# because there is no data in the cloud.
# create a table and insert a row.
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
node.wait_for_upload(1)
# Then create a table and insert a row. Wait for another backup to happen.
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.wait_for_all_applied()
node.wait_for_upload(1)
j = node.query('SELECT count(*) FROM foo', level='strong')
self.assertEqual(j, d_("{'results': [{'values': [[0]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
node.wait_for_upload(2)
@ -324,12 +321,10 @@ class TestAutoBackupS3(unittest.TestCase):
cfg = write_random_file(json.dumps(auto_backup_cfg))
# Create a node, enable automatic backups, and start it. Then
# create a table and insert a row. Wait for a backup to happen
# because there is no data in the cloud.
# create a table and insert a row.
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
node.wait_for_upload(1)
# Then create a table and insert rows. Wait for another backup to happen.
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
@ -442,22 +437,20 @@ class TestAutoBackupS3(unittest.TestCase):
}
cfg = write_random_file(json.dumps(auto_backup_cfg))
# Create a cluster with automatic backups enabled. An initial
# backup will happen because there is no data in the cloud.
# Create a cluster with automatic backups enabled.
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
node.wait_for_upload(1)
# Then create a table and insert a row. Wait for another backup to happen.
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.wait_for_upload(2)
node.wait_for_upload(1)
# Restart the node, and confirm no backup is uploaded due to the restart.
node.stop(graceful=True)
node.start()
node.wait_for_leader()
node.wait_for_upload_skipped_sum(1)
node.wait_for_upload_skipped_id(1)
# Insert a row, make sure a backup will happen now.
node.execute('INSERT INTO foo(name) VALUES("fiona")')

@ -220,16 +220,15 @@ class Node(object):
command.append(self.dir)
self.process = subprocess.Popen(command, stdout=self.stdout_fd, stderr=self.stderr_fd)
t = 0
deadline = time.time() + timeout
while wait:
if t > timeout:
if time.time() > deadline:
self.dump_log("dumping log due to timeout during start")
raise Exception('rqlite process failed to start within %d seconds' % timeout)
try:
self.status()
except requests.exceptions.ConnectionError:
time.sleep(1)
t+=1
time.sleep(0.1)
else:
break
return self
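The pattern adopted throughout these helpers is to compute a wall-clock deadline once and poll until it passes, rather than counting loop iterations. Under the old counter approach, dropping the sleep from 1 second to 0.1 seconds would also have cut the effective timeout roughly tenfold; a deadline keeps the timeout independent of the polling interval. A minimal sketch of the pattern, with an illustrative wait_until helper that is not part of this change:

import time

def wait_until(predicate, timeout=30, interval=0.1):
    '''Poll predicate() until it returns truthy or the deadline passes.'''
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)  # the interval no longer affects the effective timeout
    raise Exception('condition not met within %d seconds' % timeout)

# e.g. wait_until(lambda: node.ready(), timeout=TIMEOUT)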
@ -304,9 +303,9 @@ class Node(object):
def wait_for_leader(self, timeout=TIMEOUT, log=True, ready=True):
lr = None
t = 0
deadline = time.time() + timeout
while lr == None or lr['addr'] == '':
if t > timeout:
if time.time() > deadline:
if log:
self.dump_log("dumping log due to timeout waiting for leader")
raise Exception('rqlite node failed to detect leader within %d seconds' % timeout)
@ -314,8 +313,7 @@ class Node(object):
lr = self.status()['store']['leader']
except (KeyError, requests.exceptions.ConnectionError):
pass
time.sleep(1)
t+=1
time.sleep(0.1)
# Perform a check on readiness while we're here.
if ready and (self.ready() is not True):
@ -329,12 +327,11 @@ class Node(object):
return lr
def wait_for_ready(self, timeout=TIMEOUT):
t = 0
while t < timeout:
deadline = time.time() + timeout
while time.time() < deadline:
if self.ready():
return
time.sleep(1)
t+=1
time.sleep(0.1)
raise Exception('rqlite node failed to become ready within %d seconds' % timeout)
def expect_leader_fail(self, timeout=TIMEOUT):
@ -382,53 +379,47 @@ class Node(object):
'ok': int(self.expvar()['uploader']['num_uploads_ok']),
'fail': int(self.expvar()['uploader']['num_uploads_fail']),
'skip': int(self.expvar()['uploader']['num_uploads_skipped']),
'skip_sum': int(self.expvar()['uploader']['num_uploads_skipped_sum'])
'skip_id': int(self.expvar()['uploader']['num_uploads_skipped_id'])
}
def wait_for_upload(self, i, timeout=TIMEOUT):
'''
Wait until the number of uploads is equal to the given value.
'''
t = 0
while t < timeout:
deadline = time.time() + timeout
while time.time() < deadline:
if self.num_auto_backups()['ok'] == i:
return self.num_auto_backups()
time.sleep(0.1)
t+=1
n = self.num_auto_backups()
raise Exception('rqlite node failed to upload backup within %d seconds (%d, %d, %d, %d)' % (timeout, n[0], n[1], n[2], n[3]))
raise Exception('rqlite node failed to upload backup within %d seconds (%s)' % (timeout, self.num_auto_backups()))
def wait_for_upload_skipped_sum(self, i, timeout=TIMEOUT):
def wait_for_upload_skipped_id(self, i, timeout=TIMEOUT):
'''
Wait until the number of skipped sum uploads is at least as great as the given value.
Wait until the number of skipped ID uploads is at least as great as the given value.
'''
t = 0
while t < timeout:
if self.num_auto_backups()['skip_sum'] >= i:
deadline = time.time() + timeout
while time.time() < deadline:
if self.num_auto_backups()['skip_id'] >= i:
return self.num_auto_backups()
time.sleep(0.1)
t+=1
n = self.num_auto_backups()
raise Exception('rqlite node failed to skip backup due sum within %d seconds (%d, %d, %d, %d)' % (timeout, n[0], n[1], n[2], n[3]))
raise Exception('rqlite node failed to skip backup due to ID within %d seconds (%s)' % (timeout, self.num_auto_backups()))
def wait_until_uploads_idle(self, timeout=TIMEOUT):
'''
Wait until uploads go idle.
'''
t = 0
while t < timeout:
deadline = time.time() + timeout
while time.time() < deadline:
backups = self.num_auto_backups()['ok']
skipped = self.num_auto_backups()['skip']
skipped_sum = self.num_auto_backups()['skip_sum']
time.sleep(0.5)
if self.num_auto_backups()['skip'] + self.num_auto_backups()['skip_sum'] == skipped + skipped_sum:
skipped_sum = self.num_auto_backups()['skip_id']
time.sleep(0.1)
if self.num_auto_backups()['skip'] + self.num_auto_backups()['skip_id'] == skipped + skipped_sum:
# Skipped uploads are not increasing, so uploads are not idle
t+=1
continue
# OK, skipped uploads are increasing, but has the number of backups stayed the same?
if self.num_auto_backups()['ok'] != backups:
t+=1
continue
# Backups are idle
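wait_until_uploads_idle infers idleness from successive samples of the uploader counters: if the skip counters keep advancing while the count of successful uploads stays flat, the uploader is still polling but finds nothing new to ship. A condensed sketch of that check, with a hypothetical sample() callable standing in for num_auto_backups():

import time

def uploads_idle(sample, settle=0.1):
    '''Return True when skips advanced but successful uploads did not between two samples.'''
    before = sample()                 # e.g. {'ok': 3, 'skip': 10, 'skip_id': 2}
    time.sleep(settle)
    after = sample()
    skips_advanced = (after['skip'] + after['skip_id']) > (before['skip'] + before['skip_id'])
    return skips_advanced and after['ok'] == before['ok']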
@ -441,24 +432,22 @@ class Node(object):
'''
Wait until the given index has been applied to the state machine.
'''
t = 0
deadline = time.time() + timeout
while self.fsm_index() < index:
if t > timeout:
if time.time() > deadline:
raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index()))
time.sleep(0.1)
t+=1
return self.fsm_index()
def wait_for_all_applied(self, timeout=TIMEOUT):
'''
Wait until the applied index equals the commit index.
'''
t = 0
deadline = time.time() + timeout
while self.raft_commit_index() != self.raft_applied_index():
if t > timeout:
if time.time() > deadline:
raise Exception('wait_for_all_applied timeout')
time.sleep(0.1)
t+=1
return self.raft_applied_index()
def wait_for_restores(self, num, timeout=TIMEOUT):
@ -466,12 +455,11 @@ class Node(object):
Wait until the number of snapshot-restores on this node reach
the given value.
'''
t = 0
deadline = time.time() + timeout
while self.num_restores() != num:
if t > timeout:
if time.time() > deadline:
raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores()))
time.sleep(0.1)
t+=1
return self.num_restores()
def query(self, statement, params=None, level='weak', pretty=False, text=False, associative=False):
@ -651,17 +639,16 @@ class Cluster(object):
def __init__(self, nodes):
self.nodes = nodes
def wait_for_leader(self, node_exc=None, timeout=TIMEOUT):
t = 0
deadline = time.time() + timeout
while True:
if t > timeout:
if time.time() > deadline:
raise Exception('timeout')
for n in self.nodes:
if node_exc is not None and n == node_exc:
continue
if n.is_leader():
return n
time.sleep(1)
t+=1
time.sleep(0.1)
def cross_check_leader(self):
'''
Check that all nodes agree on who the leader is.
