1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

690 lines
25 KiB
Python

#!/usr/bin/env python
#
# End-to-end testing using actual rqlited binary.
#
# To run a specific test, execute
#
# python system_test/full_system_test.py Class.test
import tempfile
import os
import time
import sqlite3
import unittest
from certs import x509cert, x509key, caCert, caSignedCertExampleDotCom, caSignedKeyExampleDotCom
from helpers import Node, Cluster, d_, write_random_file, deprovision_node, is_sequence_number, TIMEOUT
RQLITED_PATH = os.environ['RQLITED_PATH']
class TestEndToEnd(unittest.TestCase):
def setUp(self):
n0 = Node(RQLITED_PATH, '0')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1')
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
n2 = Node(RQLITED_PATH, '2')
n2.start(join=n0.RaftAddr())
n2.wait_for_leader()
self.cluster = Cluster([n0, n1, n2])
def tearDown(self):
self.cluster.deprovision()
def test_election(self):
'''Test basic leader election'''
n = self.cluster.wait_for_leader().stop()
self.cluster.wait_for_leader(node_exc=n)
n.start()
n.wait_for_leader()
def test_full_restart(self):
'''Test that a cluster can perform a full restart successfully'''
self.cluster.wait_for_leader()
pids = set(self.cluster.pids())
self.cluster.stop()
self.cluster.start()
self.cluster.wait_for_leader()
self.cluster.cross_check_leader()
# Guard against any error in testing, by confirming that restarting the cluster
# actually resulted in all new rqlited processes.
self.assertTrue(pids.isdisjoint(set(self.cluster.pids())))
def test_execute_fail_restart(self):
'''Test that a node that fails picks up changes after restarting'''
n = self.cluster.wait_for_leader()
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
fsmIdx = n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
n0 = self.cluster.wait_for_leader().stop()
n1 = self.cluster.wait_for_leader(node_exc=n0)
n1.wait_for_fsm_index(fsmIdx)
j = n1.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
j = n1.execute('INSERT INTO foo(name) VALUES("declan")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
n0.start()
n0.wait_for_leader()
n0.wait_for_fsm_index(n1.fsm_index())
j = n0.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def test_leader_redirect(self):
'''Test that followers supply the correct leader redirects (HTTP 301)'''
l = self.cluster.wait_for_leader()
fs = self.cluster.followers()
self.assertEqual(len(fs), 2)
for n in fs:
self.assertEqual(l.APIProtoAddr(), n.redirect_addr())
# Kill the leader, wait for a new leader, and check that the
# redirect returns the new leader address.
l.stop()
n = self.cluster.wait_for_leader(node_exc=l)
for f in self.cluster.followers():
self.assertEqual(n.APIProtoAddr(), f.redirect_addr())
def test_nodes(self):
'''Test that the nodes/ endpoint operates as expected'''
l = self.cluster.wait_for_leader()
fs = self.cluster.followers()
self.assertEqual(len(fs), 2)
nodes = l.nodes()
self.assertEqual(nodes[l.node_id]['leader'], True)
self.assertEqual(nodes[l.node_id]['reachable'], True)
self.assertEqual(nodes[l.node_id]['api_addr'], l.APIProtoAddr())
self.assertTrue('time' in nodes[fs[0].node_id])
for n in [fs[0], fs[1]]:
self.assertEqual(nodes[n.node_id]['leader'], False)
self.assertEqual(nodes[n.node_id]['reachable'], True)
self.assertEqual(nodes[n.node_id]['api_addr'], n.APIProtoAddr())
self.assertTrue('time' in nodes[n.node_id])
self.assertTrue('time' in nodes[fs[0].node_id])
fs[0].stop()
nodes = l.nodes()
self.assertEqual(nodes[fs[0].node_id]['reachable'], False)
self.assertTrue('error' in nodes[fs[0].node_id])
self.assertEqual(nodes[fs[1].node_id]['reachable'], True)
self.assertEqual(nodes[l.node_id]['reachable'], True)
def test_remove_node_via_leader(self):
'''Test that removing a node via the leader works'''
l = self.cluster.wait_for_leader()
fs = self.cluster.followers()
# Validate state of cluster.
self.assertEqual(len(fs), 2)
nodes = l.nodes()
self.assertEqual(len(nodes), 3)
l.remove_node(fs[0].node_id)
nodes = l.nodes()
self.assertEqual(len(nodes), 2)
def test_remove_node_via_follower(self):
'''Test that removing a node via a follower works'''
l = self.cluster.wait_for_leader()
fs = self.cluster.followers()
# Validate state of cluster.
self.assertEqual(len(fs), 2)
nodes = l.nodes()
self.assertEqual(len(nodes), 3)
fs[0].remove_node(fs[1].node_id)
nodes = l.nodes()
self.assertEqual(len(nodes), 2)
class TestEndToEndEncryptedNode(TestEndToEnd):
def setUp(self):
certFile = write_random_file(x509cert)
keyFile = write_random_file(x509key)
n0 = Node(RQLITED_PATH, '0', node_cert=certFile, node_key=keyFile, node_no_verify=True)
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1', node_cert=certFile, node_key=keyFile, node_no_verify=True)
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
n2 = Node(RQLITED_PATH, '2', node_cert=certFile, node_key=keyFile, node_no_verify=True)
n2.start(join=n0.RaftAddr())
n2.wait_for_leader()
self.cluster = Cluster([n0, n1, n2])
class TestEndToEndEncryptedNode_ServerName(unittest.TestCase):
caCertFile = write_random_file(caCert)
caSignedKey = write_random_file(caSignedKeyExampleDotCom)
caSignedCert = write_random_file(caSignedCertExampleDotCom)
def test_ok(self):
'''Test that a cluster can be created when nodes present the right server name'''
n0 = Node(RQLITED_PATH, '0', node_cert=self.caSignedCert, node_key=self.caSignedKey, node_ca_cert=self.caCertFile,
node_verify_server_name='example.com')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1', node_cert=self.caSignedCert, node_key=self.caSignedKey, node_ca_cert=self.caCertFile,
node_verify_server_name='example.com')
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
deprovision_node(n0)
deprovision_node(n1)
def test_bad(self):
'''Test that a cluster fails to be created when a node has a bad server name'''
n0 = Node(RQLITED_PATH, '0', node_cert=self.caSignedCert, node_key=self.caSignedKey, node_ca_cert=self.caCertFile,
node_verify_server_name='example.com')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1', node_cert=self.caSignedCert, node_key=self.caSignedKey, node_ca_cert=self.caCertFile,
node_verify_server_name='bad.com')
n1.start(join=n0.RaftAddr())
self.assertTrue(n1.expect_leader_fail()) # Should fail to join due to bad server name
deprovision_node(n0)
deprovision_node(n1)
class TestClusterRecovery(unittest.TestCase):
'''Test that a cluster can recover after all Raft network addresses change'''
def test(self):
n0 = Node(RQLITED_PATH, '0')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1')
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
n2 = Node(RQLITED_PATH, '2')
n2.start(join=n0.RaftAddr())
n2.wait_for_leader()
self.nodes = [n0, n1, n2]
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
fsmIdx = n0.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
n0.stop()
n1.stop()
n2.stop()
# Create new cluster from existing data, but with new network addresses.
addr3 = "127.0.0.1:10003"
addr4 = "127.0.0.1:10004"
addr5 = "127.0.0.1:10005"
peers = [
{"id": n0.node_id, "address": addr3},
{"id": n1.node_id, "address": addr4},
{"id": n2.node_id, "address": addr5},
]
n3 = Node(RQLITED_PATH, n0.node_id, dir=n0.dir, raft_addr=addr3)
n4 = Node(RQLITED_PATH, n1.node_id, dir=n1.dir, raft_addr=addr4)
n5 = Node(RQLITED_PATH, n2.node_id, dir=n2.dir, raft_addr=addr5)
self.nodes = self.nodes + [n3, n4, n5]
n3.set_peers(peers)
n4.set_peers(peers)
n5.set_peers(peers)
n3.start(wait=False)
n4.start(wait=False)
n5.start(wait=False)
# New cluster ok?
n3.wait_for_leader()
j = n3.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def tearDown(self):
for n in self.nodes:
deprovision_node(n)
class TestRequestForwarding(unittest.TestCase):
'''Test that followers transparently forward requests to leaders'''
def setUp(self):
n0 = Node(RQLITED_PATH, '0')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1')
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
self.cluster = Cluster([n0, n1])
def tearDown(self):
self.cluster.deprovision()
def test_execute_query_forward(self):
l = self.cluster.wait_for_leader()
j = l.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_applied()
j = l.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
j = f.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
j = f.query('SELECT * FROM foo', level="strong")
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def test_execute_queued_forward(self):
l = self.cluster.wait_for_leader()
j = l.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_applied()
j = f.execute_queued('INSERT INTO foo(name) VALUES("declan")')
self.assertTrue(is_sequence_number(str(j)))
j = f.execute_queued('INSERT INTO foo(name) VALUES(?)', params=["aoife"])
self.assertTrue(is_sequence_number(str(j)))
# Wait for queued write to happen.
timeout = 10
t = 0
while True:
j = l.query('SELECT * FROM foo')
if j == d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan'], [3, 'aoife']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"):
break
if t > timeout:
raise Exception('timeout')
time.sleep(1)
t+=1
def test_execute_queued_forward_wait(self):
l = self.cluster.wait_for_leader()
j = l.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
f = self.cluster.followers()[0]
j = f.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
fsmIdx = l.wait_for_all_applied()
# Load up the queue!
for i in range(0,2000):
j = f.execute_queued('INSERT INTO foo(name) VALUES("declan")')
self.assertTrue(is_sequence_number(str(j)))
j = f.execute_queued('INSERT INTO foo(name) VALUES(?)', wait=True, params=["aoife"])
self.assertTrue(is_sequence_number(str(j)))
# Data should be ready immediately, since we waited.
j = l.query('SELECT COUNT(*) FROM foo')
self.assertEqual(j, d_("{'results': [{'columns': ['COUNT(*)'], 'types': ['integer'], 'values': [[2002]]}]}"))
class TestEndToEndNonVoter(unittest.TestCase):
def setUp(self):
self.leader = Node(RQLITED_PATH, '0')
self.leader.start()
self.leader.wait_for_leader()
self.non_voter = Node(RQLITED_PATH, '1', raft_voter=False)
self.non_voter.start(join=self.leader.RaftAddr())
self.non_voter.wait_for_leader()
self.cluster = Cluster([self.leader, self.non_voter])
def tearDown(self):
self.cluster.deprovision()
def test_execute_fail_rejoin(self):
'''Test that a non-voter that fails can rejoin the cluster, and pick up changes'''
# Confirm that voting status reporting is correct
self.assertTrue(self.leader.is_voter())
self.assertFalse(self.non_voter.is_voter())
# Insert some records via the leader
j = self.leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = self.leader.execute('INSERT INTO foo(name) VALUES("fiona")')
self.leader.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = self.leader.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Stop non-voter and then insert some more records
self.non_voter.stop()
j = self.leader.execute('INSERT INTO foo(name) VALUES("declan")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
# Restart non-voter and confirm it picks up changes
self.non_voter.start()
self.non_voter.wait_for_leader()
self.non_voter.wait_for_fsm_index(self.leader.fsm_index())
j = self.non_voter.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def test_leader_redirect(self):
'''Test that non-voters supply the correct leader redirects (HTTP 301)'''
l = self.cluster.wait_for_leader()
fs = self.cluster.followers()
self.assertEqual(len(fs), 1)
for n in fs:
self.assertEqual(l.APIProtoAddr(), n.redirect_addr())
class TestEndToEndNonVoterFollowsLeader(unittest.TestCase):
def setUp(self):
n0 = Node(RQLITED_PATH, '0')
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1')
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
n2 = Node(RQLITED_PATH, '2')
n2.start(join=n0.RaftAddr())
n2.wait_for_leader()
self.non_voter = Node(RQLITED_PATH, '3', raft_voter=False)
self.non_voter.start(join=n0.RaftAddr())
self.non_voter.wait_for_leader()
self.cluster = Cluster([n0, n1, n2, self.non_voter])
def tearDown(self):
self.cluster.deprovision()
def test_execute_fail_nonvoter(self):
'''Test that a non-voter always picks up changes, even when leader changes'''
n = self.cluster.wait_for_leader()
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
n.wait_for_all_applied()
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = n.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Kill leader, and then make more changes.
n0 = self.cluster.wait_for_leader().stop()
n1 = self.cluster.wait_for_leader(node_exc=n0)
n1.wait_for_all_applied()
n1.query('SELECT count(*) FROM foo', level='strong') ## Send one more operation through log before query.
j = n1.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
j = n1.execute('INSERT INTO foo(name) VALUES("declan")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
# Confirm non-voter sees changes made through old and new leader.
self.non_voter.wait_for_fsm_index(n1.fsm_index())
j = self.non_voter.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
class TestEndToEndBackupRestore(unittest.TestCase):
def test_backup_restore(self):
fd, self.db_file = tempfile.mkstemp()
os.close(fd)
# Create a two-node cluster.
self.node0 = Node(RQLITED_PATH, '0')
self.node0.start()
self.node0.wait_for_leader()
self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.node0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.node0.wait_for_all_applied()
self.node1 = Node(RQLITED_PATH, '1')
self.node1.start(join=self.node0.RaftAddr())
self.node1.wait_for_leader()
# Get a backup from the first node and check it.
self.node0.backup(self.db_file)
conn = sqlite3.connect(self.db_file)
rows = conn.execute('SELECT * FROM foo').fetchall()
conn.execute('PRAGMA journal_mode=DELETE')
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0], (1, 'fiona'))
conn.close()
# Get a backup from the other node and check it too.
self.node1.backup(self.db_file)
conn = sqlite3.connect(self.db_file)
rows = conn.execute('SELECT * FROM foo').fetchall()
conn.execute('PRAGMA journal_mode=DELETE')
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0], (1, 'fiona'))
conn.close()
# Load file into a brand new single node, check the data is right.
self.node2 = Node(RQLITED_PATH, '3')
self.node2.start()
self.node2.wait_for_leader()
j = self.node2.restore(self.db_file)
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = self.node2.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Start another 2-node cluster, load data via follower and check.
self.node3 = Node(RQLITED_PATH, '3')
self.node3.start()
self.node3.wait_for_leader()
self.node4 = Node(RQLITED_PATH, '4')
self.node4.start(join=self.node3.RaftAddr())
self.node4.wait_for_leader()
self.assertTrue(self.node3.is_leader())
self.node4.restore(self.db_file, fmt='binary')
self.node3.wait_for_all_applied()
j = self.node3.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def tearDown(self):
if hasattr(self, 'node0'):
deprovision_node(self.node0)
if hasattr(self, 'node1'):
deprovision_node(self.node1)
if hasattr(self, 'node2'):
deprovision_node(self.node2)
if hasattr(self, 'node3'):
deprovision_node(self.node3)
if hasattr(self, 'node4'):
deprovision_node(self.node4)
os.remove(self.db_file)
class TestEndToEndSnapRestoreCluster(unittest.TestCase):
def wait_for_snap(self, n):
'''
Wait for at least n snapshots to be taken.
'''
timeout = 10
t = 0
while True:
if t > timeout:
raise Exception('timeout')
if self.n0.num_snapshots() >= n:
break
time.sleep(1)
t+=1
return self.n0.num_snapshots()
def poll_query(self, node, exp):
t = 0
while True:
if t > TIMEOUT:
raise Exception('timeout waiting for node %s to have all data' % node.node_id)
j = node.query('SELECT count(*) FROM foo', level='none')
if j == exp:
break
time.sleep(1)
t+=1
def test_join_with_snap(self):
'''Check that nodes join a cluster correctly via a snapshot'''
self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=100, raft_snap_int="1s")
self.n0.start()
self.n0.wait_for_leader()
self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
# Let's get multiple snapshots done.
for i in range(0,300):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
num_snaps = self.wait_for_snap(1)
# Add two more nodes to the cluster
self.n1 = Node(RQLITED_PATH, '1')
self.n1.start(join=self.n0.RaftAddr())
self.n1.wait_for_leader()
self.n2 = Node(RQLITED_PATH, '2')
self.n2.start(join=self.n0.RaftAddr())
self.n2.wait_for_leader()
# Force the Apply loop to run on the node, so fsm_index is updated.
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
# Ensure those new nodes have the full correct state.
self.n1.wait_for_fsm_index(self.n0.fsm_index())
self.poll_query(self.n1, d_("{'results': [{'values': [[301]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
self.n2.wait_for_fsm_index(self.n0.fsm_index())
self.poll_query(self.n2, d_("{'results': [{'values': [[301]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
# Kill one of the nodes, and make more changes, enough to trigger more snaps.
self.n2.stop()
# Let's get more snapshots done.
for j in range(0, 200):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.wait_for_snap(num_snaps+1)
# Restart killed node, check it has full state.
self.n2.start()
self.n2.wait_for_leader()
# Force the Apply loop to run on the node twice, so fsm_index is updated on n2 to a point
# where the SQLite database on n2 is updated.
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n2.query('SELECT count(*) FROM foo', level='strong')
# Snapshot testing is tricky, as it's so timing-based -- and therefore hard to predict.
# So adopt a polling approach.
self.poll_query(self.n2, d_("{'results': [{'values': [[502]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
# Launch a brand-new node, and check that is catches up via a single restore from snapshot.
self.n3 = Node(RQLITED_PATH, '3')
self.n3.start(join=self.n0.RaftAddr())
self.n3.wait_for_leader()
self.poll_query(self.n3, d_("{'results': [{'values': [[502]], 'types': ['integer'], 'columns': ['count(*)']}]}"))
self.assertEqual(self.n3.expvar()['store']['num_restores'], 1)
def tearDown(self):
if hasattr(self, 'n0'):
deprovision_node(self.n0)
if hasattr(self, 'n1'):
deprovision_node(self.n1)
if hasattr(self, 'n2'):
deprovision_node(self.n2)
if hasattr(self, 'n3'):
deprovision_node(self.n3)
class TestShutdown(unittest.TestCase):
def test_cluster_leader_remove_on_shutdown(self):
'''Test that removing the leader on shutdown leaves a good cluster'''
n0 = Node(RQLITED_PATH, '0', raft_cluster_remove_shutdown=True)
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1', raft_cluster_remove_shutdown=True)
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
nodes = n0.nodes()
self.assertEqual(len(nodes), 2)
nodes = n1.nodes()
self.assertEqual(len(nodes), 2)
n0.stop(graceful=True)
nodes = n1.nodes()
self.assertEqual(len(nodes), 1)
# Check that we have a working single-node cluster with a leader by doing
# a write.
n1.wait_for_ready()
n1.wait_for_ready(sync=True)
j = n1.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
deprovision_node(n0)
deprovision_node(n1)
def test_cluster_follower_remove_on_shutdown(self):
'''Test that removing a follower on shutdown leaves a good cluster'''
n0 = Node(RQLITED_PATH, '0', raft_cluster_remove_shutdown=True)
n0.start()
n0.wait_for_leader()
n1 = Node(RQLITED_PATH, '1', raft_cluster_remove_shutdown=True)
n1.start(join=n0.RaftAddr())
n1.wait_for_leader()
nodes = n0.nodes()
self.assertEqual(len(nodes), 2)
nodes = n1.nodes()
self.assertEqual(len(nodes), 2)
n1.stop(graceful=True)
nodes = n0.nodes()
self.assertEqual(len(nodes), 1)
# Check that we have a working single-node cluster with a leader by doing
# a write.
n0.wait_for_ready()
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
deprovision_node(n0)
deprovision_node(n1)
if __name__ == "__main__":
unittest.main(verbosity=2)