#!/usr/bin/env python
#
# End-to-end testing using actual rqlited binary.
#
# To run a specific test, execute
#
#  python system_test/full_system_test.py Class.test

import tempfile
import os
import unittest

from certs import x509cert, x509key
from helpers import Node, Cluster, d_, write_random_file, deprovision_node

RQLITED_PATH = os.environ['RQLITED_PATH']


def _deprovision_all(nodes):
    """Deprovision each node in ``nodes`` in order, skipping ``None`` entries.

    ``None`` stands for a node a test never got as far as creating, so
    cleanup keeps working when a test fails part-way through.
    """
    for node in nodes:
        if node is not None:
            deprovision_node(node)


class TestJoinEncryptedNoVerify(unittest.TestCase):
    def test(self):
        """ Test that a joining node will not operate if remote cert can't be trusted"""
        # Register cleanup before any node exists so that a failing wait or
        # assertion cannot leave nodes behind. The list is filled in as
        # nodes are created and is read only when the cleanup runs.
        nodes = []
        self.addCleanup(_deprovision_all, nodes)

        certFile = write_random_file(x509cert)
        keyFile = write_random_file(x509key)

        n0 = Node(RQLITED_PATH, '0', node_cert=certFile, node_key=keyFile, node_no_verify=False)
        nodes.append(n0)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1', node_cert=certFile, node_key=keyFile, node_no_verify=False)
        nodes.append(n1)
        n1.start(join=n0.APIAddr())
        self.assertRaises(Exception, n1.wait_for_leader)  # Join should fail due to bad cert.


class TestAuthJoin(unittest.TestCase):
    '''Test that joining works with authentication'''

    def setUp(self):
        # Everything tearDown touches is created here, so tearDown never
        # hits a missing attribute when the test body fails early.
        self.nodes = []
        self.auth_file = tempfile.NamedTemporaryFile()
        with open(self.auth_file.name, 'w') as f:
            f.write('[{"username": "foo","password": "bar","perms": ["all"]}, {"username": "*", "perms": ["status", "ready"]}]')

    def tearDown(self):
        self.auth_file.close()
        # Only the nodes actually created are deprovisioned.
        if self.nodes:
            Cluster(self.nodes).deprovision()

    def test(self):
        """Test that a join without credentials fails and a join as user "foo" succeeds"""
        n0 = Node(RQLITED_PATH, '0', auth=self.auth_file.name)
        self.nodes.append(n0)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1', auth=self.auth_file.name)
        self.nodes.append(n1)
        n1.start(join=n0.APIAddr())
        self.assertTrue(n1.expect_leader_fail())  # Join should fail due to lack of auth.

        n2 = Node(RQLITED_PATH, '2', auth=self.auth_file.name)
        self.nodes.append(n2)
        n2.start(join=n0.APIAddr(), join_as="foo")
        n2.wait_for_leader()


class TestIdempotentJoin(unittest.TestCase):
    def setUp(self):
        # Pre-declare the node slots so tearDown is safe after an early failure.
        self.n0 = None
        self.n1 = None

    def tearDown(self):
        _deprovision_all([self.n0, self.n1])

    def test(self):
        '''Test that a node performing two join requests works fine'''
        self.n0 = Node(RQLITED_PATH, '0')
        self.n0.start()
        self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertEqual(self.n0.num_join_requests(), 1)

        # Restart the same node and join again; the leader should count a
        # second join request.
        self.n1.stop()
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertEqual(self.n0.num_join_requests(), 2)


class TestRedirectedJoin(unittest.TestCase):
    def setUp(self):
        # Pre-declare the node slots so tearDown is safe after an early failure.
        self.n0 = None
        self.n1 = None
        self.n2 = None

    def tearDown(self):
        _deprovision_all([self.n0, self.n1, self.n2])

    def _join_via_follower(self):
        """Start ``self.n0``, join n1 to it, then join n2 through n1.

        Expects ``self.n0`` to be constructed but not yet started. Asserts
        that n1 is a follower and that n2 reports the same leader as n0.
        """
        self.n0.start()
        l0 = self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertTrue(self.n1.is_follower())

        self.n2 = Node(RQLITED_PATH, '2')
        self.n2.start(join=self.n1.APIAddr())
        l2 = self.n2.wait_for_leader()
        self.assertEqual(l0, l2)

    def test(self):
        '''Test that a node can join via a follower'''
        self.n0 = Node(RQLITED_PATH, '0')
        self._join_via_follower()

    def test_api_adv(self):
        '''Test that a node can join via a follower that advertises a different API address'''
        self.n0 = Node(RQLITED_PATH, '0',
                       api_addr="0.0.0.0:4001", api_adv="localhost:4001")
        self._join_via_follower()


class TestJoinCatchup(unittest.TestCase):
    def setUp(self):
        # unittest does not call tearDown when setUp raises, so cleanup is
        # registered via addCleanup before the first node is created.
        self.nodes = []
        self.addCleanup(self._deprovision_nodes)

        self.n0 = self._new_node('0')
        self.n0.start()
        self.n0.wait_for_leader()

        self.n1 = self._new_node('1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()

        self.n2 = self._new_node('2')
        self.n2.start(join=self.n0.APIAddr())
        self.n2.wait_for_leader()

        self.cluster = Cluster([self.n0, self.n1, self.n2])

    def _new_node(self, node_id):
        """Create a node with the given ID and record it for cleanup."""
        node = Node(RQLITED_PATH, node_id)
        self.nodes.append(node)
        return node

    def _deprovision_nodes(self):
        """Deprovision every node created so far (possibly fewer than three)."""
        if self.nodes:
            Cluster(self.nodes).deprovision()

    def _write_while_follower_down(self):
        """Write one row, stop follower n1, then write a second row.

        Returns a ``(leader, applied)`` tuple: the node returned by the
        cluster's ``wait_for_leader`` and the value of its
        ``wait_for_all_fsm()`` call made after the second insert.
        """
        leader = self.cluster.wait_for_leader()
        j = leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = leader.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = leader.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        applied = leader.wait_for_all_fsm()

        # Test that follower node has correct state in local database, and then kill the follower
        self.n1.wait_for_fsm_index(applied)
        j = self.n1.query('SELECT * FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        self.n1.stop()

        # Insert a new record
        j = leader.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = leader.query('SELECT COUNT(*) FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': [''], 'columns': ['COUNT(*)']}]}"))
        applied = leader.wait_for_all_fsm()
        return leader, applied

    def _assert_follower_has_both_rows(self):
        """Assert that n1's local (level='none') row count for ``foo`` is 2."""
        j = self.n1.query('SELECT COUNT(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[2]], 'types': [''], 'columns': ['COUNT(*)']}]}"))

    def test_no_change_id_addr(self):
        '''Test that a node rejoining without changing ID or address picks up changes'''
        leader, applied = self._write_while_follower_down()

        # Restart follower, explicitly rejoin, and ensure it picks up new records
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.n1.wait_for_fsm_index(applied)
        self.assertEqual(leader.expvar()['store']['num_ignored_joins'], 1)
        self._assert_follower_has_both_rows()

    def test_change_addresses(self):
        '''Test that a node rejoining with new addresses works fine'''
        leader, applied = self._write_while_follower_down()

        # Restart follower with new network attributes, explicitly rejoin, and ensure it picks up new records
        self.n1.scramble_network()
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertEqual(leader.expvar()['store']['num_removed_before_joins'], 1)
        self.n1.wait_for_fsm_index(applied)
        self._assert_follower_has_both_rows()


if __name__ == "__main__":
    unittest.main(verbosity=2)