1
0
Fork 0

Clarify end-to-end test code

master
Philip O'Toole 8 months ago
parent 0774faa04f
commit 9b8e2559d2

@ -219,7 +219,7 @@ class TestAutoClusteringKVStores(unittest.TestCase):
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
n0.wait_for_all_fsm()
n0.wait_for_all_applied()
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -276,7 +276,7 @@ class TestAutoClusteringKVStores(unittest.TestCase):
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
n0.wait_for_all_fsm()
n0.wait_for_all_applied()
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

@ -274,7 +274,7 @@ class TestAutoBackupS3(unittest.TestCase):
# Then create a table and insert a row. Wait for another backup to happen.
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.wait_for_all_fsm()
node.wait_for_all_applied()
j = node.query('SELECT count(*) FROM foo', level='strong')
self.assertEqual(j, d_("{'results': [{'values': [[0]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
node.wait_for_upload(2)
@ -404,7 +404,7 @@ class TestAutoBackupS3(unittest.TestCase):
# Then create a table and insert a row. Wait for a backup to happen.
leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
leader.wait_for_all_fsm()
leader.wait_for_all_applied()
leader.wait_for_upload(2)
# Confirm that the follower has performed no backups.

@ -350,16 +350,19 @@ class Node(object):
def fsm_index(self):
return int(self.status()['store']['fsm_index'])
def commit_index(self):
def raft_commit_index(self):
return int(self.status()['store']['raft']['commit_index'])
def applied_index(self):
def raft_applied_index(self):
return int(self.status()['store']['raft']['applied_index'])
def last_log_index(self):
def raft_fsm_pending_index(self):
return int(self.status()['store']['raft']['fsm_pending'])
def raft_last_log_index(self):
return int(self.status()['store']['raft']['last_log_index'])
def last_snapshot_index(self):
def raft_last_snapshot_index(self):
return int(self.status()['store']['raft']['last_snapshot_index'])
def num_join_requests(self):
@ -428,46 +431,21 @@ class Node(object):
while self.fsm_index() < index:
if t > timeout:
raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index()))
time.sleep(1)
time.sleep(0.1)
t+=1
return self.fsm_index()
def wait_for_commit_index(self, index, timeout=TIMEOUT):
'''
Wait until the commit index reaches the given value
'''
t = 0
while self.commit_index() < index:
if t > timeout:
raise Exception('wait_for_commit_index timeout')
time.sleep(1)
t+=1
return self.commit_index()
def wait_for_all_applied(self, timeout=TIMEOUT):
'''
Wait until the applied index equals the commit index.
'''
t = 0
while self.commit_index() != self.applied_index():
while self.raft_commit_index() != self.raft_applied_index():
if t > timeout:
raise Exception('wait_for_all_applied timeout')
time.sleep(1)
t+=1
return self.applied_index()
def wait_for_all_fsm(self, timeout=TIMEOUT):
'''
Wait until all outstanding database commands have actually
been applied to the database i.e. state machine.
'''
t = 0
while self.fsm_index() != self.db_applied_index():
if t > timeout:
raise Exception('wait_for_all_fsm timeout')
time.sleep(1)
time.sleep(0.1)
t+=1
return self.fsm_index()
return self.raft_applied_index()
def wait_for_restores(self, num, timeout=TIMEOUT):
'''
@ -478,7 +456,7 @@ class Node(object):
while self.num_restores() != num:
if t > timeout:
raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores()))
time.sleep(1)
time.sleep(0.1)
t+=1
return self.num_restores()

@ -186,7 +186,7 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
applied = n0.wait_for_all_fsm()
applied = n0.wait_for_all_applied()
# Test that follower node has correct state in local database, and then kill the follower
self.n1.wait_for_fsm_index(applied)
@ -199,7 +199,7 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n0.query('SELECT COUNT(*) FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': ['integer'], 'columns': ['COUNT(*)']}]}"))
applied = n0.wait_for_all_fsm()
applied = n0.wait_for_all_applied()
# Restart follower, explicity rejoin, and ensure it picks up new records
self.n1.start(join=self.n0.RaftAddr())
@ -218,7 +218,7 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
applied = n0.wait_for_all_fsm()
applied = n0.wait_for_all_applied()
# Test that follower node has correct state in local database, and then kill the follower
self.n1.wait_for_fsm_index(applied)
@ -231,7 +231,7 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n0.query('SELECT COUNT(*) FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': ['integer'], 'columns': ['COUNT(*)']}]}"))
applied = n0.wait_for_all_fsm()
applied = n0.wait_for_all_applied()
# Restart follower with new network attributes, explicity rejoin, and ensure it picks up new records
self.n1.scramble_network()

@ -65,7 +65,7 @@ class TestEndToEnd(unittest.TestCase):
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
fsmIdx = n.wait_for_all_fsm()
fsmIdx = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -227,7 +227,7 @@ class TestClusterRecovery(unittest.TestCase):
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
fsmIdx = n0.wait_for_all_fsm()
fsmIdx = n0.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -295,7 +295,7 @@ class TestRequestForwarding(unittest.TestCase):
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_fsm()
fsmIdx = l.wait_for_all_applied()
j = l.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -313,7 +313,7 @@ class TestRequestForwarding(unittest.TestCase):
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_fsm()
fsmIdx = l.wait_for_all_applied()
j = f.execute_queued('INSERT INTO foo(name) VALUES("declan")')
self.assertTrue(is_sequence_number(str(j)))
@ -341,7 +341,7 @@ class TestRequestForwarding(unittest.TestCase):
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_fsm()
fsmIdx = l.wait_for_all_applied()
# Load up the queue!
for i in range(0,2000):
@ -381,7 +381,7 @@ class TestEndToEndNonVoter(unittest.TestCase):
j = self.leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = self.leader.execute('INSERT INTO foo(name) VALUES("fiona")')
self.leader.wait_for_all_fsm()
self.leader.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = self.leader.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -437,7 +437,7 @@ class TestEndToEndNonVoterFollowsLeader(unittest.TestCase):
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
n.wait_for_all_fsm()
n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -468,7 +468,7 @@ class TestEndToEndBackupRestore(unittest.TestCase):
self.node0.wait_for_leader()
self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.node0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.node0.wait_for_all_fsm()
self.node0.wait_for_all_applied()
self.node1 = Node(RQLITED_PATH, '1')
self.node1.start(join=self.node0.RaftAddr())
self.node1.wait_for_leader()
@ -511,7 +511,7 @@ class TestEndToEndBackupRestore(unittest.TestCase):
self.assertTrue(self.node3.is_leader())
self.node4.restore(self.db_file, fmt='binary')
self.node3.wait_for_all_fsm()
self.node3.wait_for_all_applied()
j = self.node3.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -565,7 +565,7 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase):
# Let's get multiple snapshots done.
for i in range(0,300):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_fsm()
self.n0.wait_for_all_applied()
num_snaps = self.wait_for_snap(1)
# Add two more nodes to the cluster
@ -593,7 +593,7 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase):
# Let's get more snapshots done.
for j in range(0, 200):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_fsm()
self.n0.wait_for_all_applied()
self.wait_for_snap(num_snaps+1)
# Restart killed node, check it has full state.

@ -32,13 +32,13 @@ class TestSingleNode(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
j = n.execute('INSERT INTO bar(name) VALUES("declan")')
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar where id=2')
self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
@ -57,7 +57,7 @@ class TestSingleNode(unittest.TestCase):
j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar', pretty=True, text=True)
exp = '''{
@ -89,11 +89,11 @@ class TestSingleNode(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar WHERE age=?', params=[20])
@ -110,7 +110,7 @@ class TestSingleNode(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar')
@ -120,7 +120,7 @@ class TestSingleNode(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar WHERE name=:name', params={"name": "declan"})
@ -137,7 +137,7 @@ class TestSingleNode(unittest.TestCase):
['INSERT INTO bar(name, age) VALUES("sinead", 25)']
])
j = n.execute_raw(body)
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n.query('SELECT * from bar')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
@ -153,7 +153,7 @@ class TestSingleNode(unittest.TestCase):
['INSERT INTO bar(name, age) VALUES("sinead", 25)']
])
j = n.request_raw(body)
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}"))
j = n.request('SELECT * from bar')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
@ -168,7 +168,7 @@ class TestSingleNode(unittest.TestCase):
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
applied = n.wait_for_all_fsm()
applied = n.wait_for_all_applied()
# Wait for a snapshot to happen.
timeout = 10
@ -244,7 +244,7 @@ class TestEndToEndSnapshotRestoreSingle(unittest.TestCase):
while True:
if t > timeout:
raise Exception('timeout')
if self.n0.last_snapshot_index() >= n:
if self.n0.raft_last_snapshot_index() >= n:
break
time.sleep(1)
t+=1
@ -257,7 +257,7 @@ class TestEndToEndSnapshotRestoreSingle(unittest.TestCase):
for i in range(0,200):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_fsm()
self.n0.wait_for_all_applied()
self.waitForSnapIndex(175)
# Ensure node has the full correct state.

Loading…
Cancel
Save