diff --git a/system_test/e2e/auto_clustering.py b/system_test/e2e/auto_clustering.py index 41757095..4f5fa60d 100644 --- a/system_test/e2e/auto_clustering.py +++ b/system_test/e2e/auto_clustering.py @@ -219,7 +219,7 @@ class TestAutoClusteringKVStores(unittest.TestCase): j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n0.execute('INSERT INTO foo(name) VALUES("fiona")') - n0.wait_for_all_fsm() + n0.wait_for_all_applied() j = n0.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -276,7 +276,7 @@ class TestAutoClusteringKVStores(unittest.TestCase): j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n0.execute('INSERT INTO foo(name) VALUES("fiona")') - n0.wait_for_all_fsm() + n0.wait_for_all_applied() j = n0.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) diff --git a/system_test/e2e/auto_state.py b/system_test/e2e/auto_state.py index a2e5f3f7..de527c4f 100644 --- a/system_test/e2e/auto_state.py +++ b/system_test/e2e/auto_state.py @@ -274,7 +274,7 @@ class TestAutoBackupS3(unittest.TestCase): # Then create a table and insert a row. Wait for another backup to happen. node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') - node.wait_for_all_fsm() + node.wait_for_all_applied() j = node.query('SELECT count(*) FROM foo', level='strong') self.assertEqual(j, d_("{'results': [{'values': [[0]], 'types': ['integer'], 'columns': ['count(*)']}]}")) node.wait_for_upload(2) @@ -404,7 +404,7 @@ class TestAutoBackupS3(unittest.TestCase): # Then create a table and insert a row. Wait for a backup to happen. leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') - leader.wait_for_all_fsm() + leader.wait_for_all_applied() leader.wait_for_upload(2) # Confirm that the follower has performed no backups. diff --git a/system_test/e2e/helpers.py b/system_test/e2e/helpers.py index 6c8502f2..974a8379 100644 --- a/system_test/e2e/helpers.py +++ b/system_test/e2e/helpers.py @@ -350,16 +350,19 @@ class Node(object): def fsm_index(self): return int(self.status()['store']['fsm_index']) - def commit_index(self): + def raft_commit_index(self): return int(self.status()['store']['raft']['commit_index']) - def applied_index(self): + def raft_applied_index(self): return int(self.status()['store']['raft']['applied_index']) - def last_log_index(self): + def raft_fsm_pending_index(self): + return int(self.status()['store']['raft']['fsm_pending']) + + def raft_last_log_index(self): return int(self.status()['store']['raft']['last_log_index']) - def last_snapshot_index(self): + def raft_last_snapshot_index(self): return int(self.status()['store']['raft']['last_snapshot_index']) def num_join_requests(self): @@ -428,46 +431,21 @@ class Node(object): while self.fsm_index() < index: if t > timeout: raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index())) - time.sleep(1) + time.sleep(0.1) t+=1 return self.fsm_index() - def wait_for_commit_index(self, index, timeout=TIMEOUT): - ''' - Wait until the commit index reaches the given value - ''' - t = 0 - while self.commit_index() < index: - if t > timeout: - raise Exception('wait_for_commit_index timeout') - time.sleep(1) - t+=1 - return self.commit_index() - def wait_for_all_applied(self, timeout=TIMEOUT): ''' Wait until the applied index equals the commit index. ''' t = 0 - while self.commit_index() != self.applied_index(): + while self.raft_commit_index() != self.raft_applied_index(): if t > timeout: raise Exception('wait_for_all_applied timeout') - time.sleep(1) - t+=1 - return self.applied_index() - - def wait_for_all_fsm(self, timeout=TIMEOUT): - ''' - Wait until all outstanding database commands have actually - been applied to the database i.e. state machine. - ''' - t = 0 - while self.fsm_index() != self.db_applied_index(): - if t > timeout: - raise Exception('wait_for_all_fsm timeout') - time.sleep(1) + time.sleep(0.1) t+=1 - return self.fsm_index() + return self.raft_applied_index() def wait_for_restores(self, num, timeout=TIMEOUT): ''' @@ -478,7 +456,7 @@ class Node(object): while self.num_restores() != num: if t > timeout: raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores())) - time.sleep(1) + time.sleep(0.1) t+=1 return self.num_restores() diff --git a/system_test/e2e/joining.py b/system_test/e2e/joining.py index 04fc4c64..19ff9c8b 100644 --- a/system_test/e2e/joining.py +++ b/system_test/e2e/joining.py @@ -186,7 +186,7 @@ class TestJoinCatchup(unittest.TestCase): self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n0.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) - applied = n0.wait_for_all_fsm() + applied = n0.wait_for_all_applied() # Test that follower node has correct state in local database, and then kill the follower self.n1.wait_for_fsm_index(applied) @@ -199,7 +199,7 @@ class TestJoinCatchup(unittest.TestCase): self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}")) j = n0.query('SELECT COUNT(*) FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': ['integer'], 'columns': ['COUNT(*)']}]}")) - applied = n0.wait_for_all_fsm() + applied = n0.wait_for_all_applied() # Restart follower, explicity rejoin, and ensure it picks up new records self.n1.start(join=self.n0.RaftAddr()) @@ -218,7 +218,7 @@ class TestJoinCatchup(unittest.TestCase): self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n0.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) - applied = n0.wait_for_all_fsm() + applied = n0.wait_for_all_applied() # Test that follower node has correct state in local database, and then kill the follower self.n1.wait_for_fsm_index(applied) @@ -231,7 +231,7 @@ class TestJoinCatchup(unittest.TestCase): self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}")) j = n0.query('SELECT COUNT(*) FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': ['integer'], 'columns': ['COUNT(*)']}]}")) - applied = n0.wait_for_all_fsm() + applied = n0.wait_for_all_applied() # Restart follower with new network attributes, explicity rejoin, and ensure it picks up new records self.n1.scramble_network() diff --git a/system_test/e2e/multi_node.py b/system_test/e2e/multi_node.py index e6cda904..46e4299f 100644 --- a/system_test/e2e/multi_node.py +++ b/system_test/e2e/multi_node.py @@ -65,7 +65,7 @@ class TestEndToEnd(unittest.TestCase): j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO foo(name) VALUES("fiona")') - fsmIdx = n.wait_for_all_fsm() + fsmIdx = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -227,7 +227,7 @@ class TestClusterRecovery(unittest.TestCase): j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n0.execute('INSERT INTO foo(name) VALUES("fiona")') - fsmIdx = n0.wait_for_all_fsm() + fsmIdx = n0.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n0.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -295,7 +295,7 @@ class TestRequestForwarding(unittest.TestCase): f = self.cluster.followers()[0] j = f.execute('INSERT INTO foo(name) VALUES("fiona")') self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - fsmIdx = l.wait_for_all_fsm() + fsmIdx = l.wait_for_all_applied() j = l.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -313,7 +313,7 @@ class TestRequestForwarding(unittest.TestCase): f = self.cluster.followers()[0] j = f.execute('INSERT INTO foo(name) VALUES("fiona")') self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - fsmIdx = l.wait_for_all_fsm() + fsmIdx = l.wait_for_all_applied() j = f.execute_queued('INSERT INTO foo(name) VALUES("declan")') self.assertTrue(is_sequence_number(str(j))) @@ -341,7 +341,7 @@ class TestRequestForwarding(unittest.TestCase): f = self.cluster.followers()[0] j = f.execute('INSERT INTO foo(name) VALUES("fiona")') self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - fsmIdx = l.wait_for_all_fsm() + fsmIdx = l.wait_for_all_applied() # Load up the queue! for i in range(0,2000): @@ -381,7 +381,7 @@ class TestEndToEndNonVoter(unittest.TestCase): j = self.leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = self.leader.execute('INSERT INTO foo(name) VALUES("fiona")') - self.leader.wait_for_all_fsm() + self.leader.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = self.leader.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -437,7 +437,7 @@ class TestEndToEndNonVoterFollowsLeader(unittest.TestCase): j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO foo(name) VALUES("fiona")') - n.wait_for_all_fsm() + n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -468,7 +468,7 @@ class TestEndToEndBackupRestore(unittest.TestCase): self.node0.wait_for_leader() self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.node0.execute('INSERT INTO foo(name) VALUES("fiona")') - self.node0.wait_for_all_fsm() + self.node0.wait_for_all_applied() self.node1 = Node(RQLITED_PATH, '1') self.node1.start(join=self.node0.RaftAddr()) self.node1.wait_for_leader() @@ -511,7 +511,7 @@ class TestEndToEndBackupRestore(unittest.TestCase): self.assertTrue(self.node3.is_leader()) self.node4.restore(self.db_file, fmt='binary') - self.node3.wait_for_all_fsm() + self.node3.wait_for_all_applied() j = self.node3.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -565,7 +565,7 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase): # Let's get multiple snapshots done. for i in range(0,300): self.n0.execute('INSERT INTO foo(name) VALUES("fiona")') - self.n0.wait_for_all_fsm() + self.n0.wait_for_all_applied() num_snaps = self.wait_for_snap(1) # Add two more nodes to the cluster @@ -593,7 +593,7 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase): # Let's get more snapshots done. for j in range(0, 200): self.n0.execute('INSERT INTO foo(name) VALUES("fiona")') - self.n0.wait_for_all_fsm() + self.n0.wait_for_all_applied() self.wait_for_snap(num_snaps+1) # Restart killed node, check it has full state. diff --git a/system_test/e2e/single_node.py b/system_test/e2e/single_node.py index 3c807c7e..2860f31d 100644 --- a/system_test/e2e/single_node.py +++ b/system_test/e2e/single_node.py @@ -32,13 +32,13 @@ class TestSingleNode(unittest.TestCase): self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO bar(name) VALUES("fiona")') - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) j = n.execute('INSERT INTO bar(name) VALUES("declan")') - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar where id=2') self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) @@ -57,7 +57,7 @@ class TestSingleNode(unittest.TestCase): j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO bar(name) VALUES("fiona")') - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar', pretty=True, text=True) exp = '''{ @@ -89,11 +89,11 @@ class TestSingleNode(unittest.TestCase): self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20]) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None]) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar WHERE age=?', params=[20]) @@ -110,7 +110,7 @@ class TestSingleNode(unittest.TestCase): self.assertEqual(j, d_("{'results': [{}]}")) j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20]) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar') @@ -120,7 +120,7 @@ class TestSingleNode(unittest.TestCase): self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}")) j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None]) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar WHERE name=:name', params={"name": "declan"}) @@ -137,7 +137,7 @@ class TestSingleNode(unittest.TestCase): ['INSERT INTO bar(name, age) VALUES("sinead", 25)'] ]) j = n.execute_raw(body) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}")) j = n.query('SELECT * from bar') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}")) @@ -153,7 +153,7 @@ class TestSingleNode(unittest.TestCase): ['INSERT INTO bar(name, age) VALUES("sinead", 25)'] ]) j = n.request_raw(body) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}")) j = n.request('SELECT * from bar') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}")) @@ -168,7 +168,7 @@ class TestSingleNode(unittest.TestCase): j = n.execute('INSERT INTO foo(name) VALUES("fiona")') self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - applied = n.wait_for_all_fsm() + applied = n.wait_for_all_applied() # Wait for a snapshot to happen. timeout = 10 @@ -244,7 +244,7 @@ class TestEndToEndSnapshotRestoreSingle(unittest.TestCase): while True: if t > timeout: raise Exception('timeout') - if self.n0.last_snapshot_index() >= n: + if self.n0.raft_last_snapshot_index() >= n: break time.sleep(1) t+=1 @@ -257,7 +257,7 @@ class TestEndToEndSnapshotRestoreSingle(unittest.TestCase): for i in range(0,200): self.n0.execute('INSERT INTO foo(name) VALUES("fiona")') - self.n0.wait_for_all_fsm() + self.n0.wait_for_all_applied() self.waitForSnapIndex(175) # Ensure node has the full correct state.