1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1565 lines
56 KiB
Python

#!/usr/bin/env python
# End-to-end testing using actual rqlited binary.
#
# To run a specific test, execute
#
# python system_test/full_system_test.py Class.test
import tempfile
import argparse
import ast
import subprocess
import requests
import json
import os
import random
import shutil
import time
import socket
import sqlite3
import string
import sys
import unittest
from urllib.parse import urlparse
# Path to the rqlited binary under test. It is read from the environment at
# import time, so importing this module raises KeyError if RQLITED_PATH is unset.
RQLITED_PATH = os.environ['RQLITED_PATH']
# Default timeout, in seconds, used by the start/wait helpers below.
TIMEOUT=20
x509cert = '''-----BEGIN CERTIFICATE-----
MIIFXTCCA0WgAwIBAgIJALrA6P0W35jRMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTcwNjEwMjIwMDM1WhcNMTgwNjEwMjIwMDM1WjBF
MQswCQYDVQQGEwJVUzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
CgKCAgEA2cxg1IcP1gDQezLJm9MDkEEHqOZEAn1iatoIHUoIlfu36Sripn4yoTxM
1pmOT37CFoaiRfj0biEbjrgfi0QXk9z4E7Vy0XGF6XB5KofOneqnUuSgnOnEkL0p
gQ3itCr/FLkvuT8/zYKL+PXsMnfHGORgJmHlu1/4rY6Z/dayaf4fUFlKRRziEVUn
3EMd/hHFHThXimWd3mtxE1YnpKimnFLmIYjXrK22QUZJ2MYVcRklJYaXhIJgHW2s
oe+ZRhFHxcYoY3znRFZXYkoCXETcExCmo7czLoN4/F92zFDEGbAMbwC/7Zo9AxQg
30Q4iCrLfwAx+M/0A2dRbSTqGReBeBVfEBWopfz7zV3W7kI+s5K2AIFi+1hbmJ6a
mKomv3f4z6Ml+yOqrq4KtrDSxnSf6Vh7EHsws6uyMG7Y6rLpPm1sLDiffPABlAti
/YlVT+3vlg86h7Vlw68CcNSclgyfFW+i1e5a+EV7WB0VmIQXzSkhA86b9aD8qWdL
N4H8sRlSZ3XfIil4u93QDC/NzJl22wRsN7926xR4DgbCesEsc361KYE8fBSx61fa
6EyvlQoI2I4r1aWCSHq7YGfV6guBZekR0BeaIsoNwfZDZrboL0sOrHGxiEfzYdVC
pAxjdG13zuPo+634fUfewBAq695kVYcy3aBt2wOkLyQGLu0CHHsCAwEAAaNQME4w
HQYDVR0OBBYEFAYLLJUqmUdXCNYTQIWX1ICBKGvWMB8GA1UdIwQYMBaAFAYLLJUq
mUdXCNYTQIWX1ICBKGvWMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIB
AGnvTPevCooo3xO8U/lq2YNo3cFxYnkurBwAn5pwzJrtPgogwezBltEp52+n39TY
5hSa//pfKdQz2GrQ9YvX1vB8gWkNLxBe6g2ksn0DsGTApC/te1p4M+yTKhogtE7a
qYmZBSEI46URe0JLYNirzdTu5dri7DzxFc7E/XlQ0riuMyHNqOP0JXKhxKN1dYOu
NEPxekq2Z2phoo1ul8hBXsz4IRwVeQOAtpRnfrKjxogOI1teP/RSikTsSLvFHxqo
UHVzwBexQs9isBlBUcmuKksxoGugqqSkGQRE+dSs5RSeEPLexMgACfFmKfpS+Vn4
ikb2ETQ3i76+JgMoDHKwb4u9xIyKTUToIsx5dUO+o7paPfyqRE6WbO4H+suM4VCd
VhNbG9qv02Fl8vdYAc/A6tVyV8b4fMbSsGEQnBlvKuOXf/uxAIcz11WUQ4gy/0/e
kHbMqGuBFPkg5nww3dBxkrBbtKq/1yrnQUjpBvjYtyUvoKrLSbQSGj586i52r4hF
+bqGPTxmk6hU4JZN+0wvkbVWLZBTRVNKs8Sb6fRWTd2Zd/o7a7QFhbnnAhv8bgyb
4472yLaXTL/siml+LlSrNGeZEsAaCVH4ETp+HzjpAMAyhhFGqCixG0e9BRPGV936
H/8+SUQK5KxnwDz3hqrAVJyimrvNlSaP1eZ5P8WXuvBl
-----END CERTIFICATE-----'''
x509key = '''-----BEGIN PRIVATE KEY-----
MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDZzGDUhw/WANB7
Msmb0wOQQQeo5kQCfWJq2ggdSgiV+7fpKuKmfjKhPEzWmY5PfsIWhqJF+PRuIRuO
uB+LRBeT3PgTtXLRcYXpcHkqh86d6qdS5KCc6cSQvSmBDeK0Kv8UuS+5Pz/Ngov4
9ewyd8cY5GAmYeW7X/itjpn91rJp/h9QWUpFHOIRVSfcQx3+EcUdOFeKZZ3ea3ET
ViekqKacUuYhiNesrbZBRknYxhVxGSUlhpeEgmAdbayh75lGEUfFxihjfOdEVldi
SgJcRNwTEKajtzMug3j8X3bMUMQZsAxvAL/tmj0DFCDfRDiIKst/ADH4z/QDZ1Ft
JOoZF4F4FV8QFail/PvNXdbuQj6zkrYAgWL7WFuYnpqYqia/d/jPoyX7I6qurgq2
sNLGdJ/pWHsQezCzq7Iwbtjqsuk+bWwsOJ988AGUC2L9iVVP7e+WDzqHtWXDrwJw
1JyWDJ8Vb6LV7lr4RXtYHRWYhBfNKSEDzpv1oPypZ0s3gfyxGVJndd8iKXi73dAM
L83MmXbbBGw3v3brFHgOBsJ6wSxzfrUpgTx8FLHrV9roTK+VCgjYjivVpYJIertg
Z9XqC4Fl6RHQF5oiyg3B9kNmtugvSw6scbGIR/Nh1UKkDGN0bXfO4+j7rfh9R97A
ECrr3mRVhzLdoG3bA6QvJAYu7QIcewIDAQABAoICAEKMgXXPAxa3zvwl65ZyZp9Y
T3fbTCKan0zY7CvO6EqzzGExmmmXG+9KVowoBWTi7XkmkETjKgTQlvQH7JOILdAf
b6nOApRepLVMialmL8ru3Uul0jG/+DDlq93kGUZF8QUrBJsM6XjpD831jsNo9+vy
NDLmLOURERIvBXybco6SeIz7i4cMqUL0iyZxV6O/WERyZ8VBAXjpyXZIF/rnEWmo
purOPmBj/9F4Ia5b8EdLkJ8jvf5eO/IiBeLBLEtNkmmq/8JOcvfdjfvZc1kwLTKi
HtjdbIUk5P3wSYNqllDnCxWL3BlEzKm5J8YwuTlaIi3fKGXHXN8BXc8EvYcHOKah
K89HIuexjQyQ0JAWKIIJTZs8jVvTMTjgYnEAB+sLfehBBOKmRdmYij28kIo18blx
tsx1HjdfImDd0QloofRW1Srp6FhcgDK0qfWXze/Vm6IfF40oTVE3fS/RgYzx0SSM
2pc6hTXOnrw1r/UBPyNkJ1D/4UK4m0x91BvTSi6MsThWnhicoaTZl1GP4Qpeo9+4
9Z7t0Yalm0PA55aiHZsm9S8OroasVun2QnDxfUC44PIov7nhqifGVcKA8hIDgSNT
WP8amq9cNjft5xQnP/y70fbioPPiwau2+Q0SXVn/BYxjqZrNp6OfbWSi2IRO1NOD
QZDo2rtnL1RrDdBmtDShAoIBAQD2MTJT6HNacu8+u7DDtbMdKvy7rXVblGbUouh9
cLWX7/zGVcNzB5GSVki3J7J+Kdrs4H45/1JR1YvWtWd93F/xKXmCGkUQCsturtRn
IX93by3zuWNdLv4giP2pk97wNYaaJWZmo87nXKV6BbL//eEl+Ospg5lGrLCsj+Mk
9V8oBBxsxqgVZYVyoevLDAuwUw3Cb52PhnEaLrv30ljGFHpsYb9lFlMs8vRosVWy
i3/T5ASfdnMXKQ1gxN/aPtN6yrFVpXe+S/A5JBzAQfrjiZk4SzNvE7R0eze1YLfO
IulTvlqpk3HVQEpgfq8D3l1x/zqsh0SpCH3VkV5sQQx262iRAoIBAQDieZsWv9eO
QzF5nZmh3NL53LkpqONRBjF5b9phppgxw2jiS8D2eEn2XWExmEaK/JppmzvfxSG4
cPaQHJFjkRGpnJyBlBUnyk4ua5hlXOTb9l5HsLIBlVdcWxwF+zJh8Ylwau+mcVF8
b8n86zke88du+xTvXfMDn6p6EACmBncyZGi424hSw72u8jS0cdmqJl3isLR6duG3
4yipWhEpLU5YuR+796jmjK5h+HQwl/Ra2dykbAw7vN0ofdK0+7LrVnGh7dDecOGK
0fElgFPTazeQQV+dEzqz6UwO0Z9koxqBwPqCLi7sXOeUWwqqb3ewYO7TMM4NlK/o
C8oG4yvWj9pLAoIBACEI9PHhbSkj5wqJ8OwyA3jUfdlJK0hAn5PE0GGUsClVIJwU
ggd7aoMyZMt+3iqjvyat8QIjSo6EkyEacmqnGZCoug9FKyM975JIj2PPUOVb29Sq
ebTVS3BeMXuBxhaBeDBS+GypamgNPH8lKKHFFWMdBaEqcXTUU1i0bgxViJE8C/xk
o8VLPB7nr1YtpZvhaSVACOprZd3Xi41zgkoCEXNdomsUFdEgQL+TnCY7Jcnu/NfQ
8xyWe58Si98jMwl1DVqqu2ijk/Z27Ay4TcweeJrfLGWpRTukFROXiNJ2SMzd7Bh5
Gns9Bz3vgdiJDAzx7JOeCw6LfycbPIpWKDAE4qECggEAZ5kPG7H4Dcio6iPwsj1M
eSXBwc/S5C58FTvYXtERT7o+0T2r8FMIKl1+52vr4Qo6LFLpaaxIh5GNCFE5JJ2o
wbi1UwUFRGVjrBJl7QA4ZHJnoE2wr8673rCCui21V15g637PT4kIqG6OrFaBk6oa
MadDZVfJoX+5QQru8QOGJRQPX3h0/L8zlsKO33gxBId2bQs+E8Mr761G3Wko7nge
HbHZVWet6IC0CHbZ15y7F5APQVt3oR/83tfnughlSQgLBPK/l/F1CsaMlAYG0nB6
Q0/USAsS0FfJBgJX8nY12uMG9OPhbRf2i0O2Nk61JobA2PS7XTUF3pT9/naOiCDX
zwKCAQBK9dPzoc9CfdAJVWjCqR2xS8Jr3/64k59Pya6e7Y4ca+e1g4N7hhQRLPW7
owTKloXrR0mAkwOIiJlk+gXsl2banuSajiPxumSfPYWE1Q/QNFD/WoPvo6rPYJ8N
yA/ORsMjWq51SfpzOU69+FdY7p3GvIVWhRtinqseaAIMOkNZBLVDXF4DvtFgiLZM
bKAjGuXsKOT3MPFU9tHxi4q/7flUb30mSUVXyPjh+C+UH7e0BS0pi/rDeRdEju4z
bJVERP8/VAJ61TDQJq+Il95fzKe4yTA3dDHnO+EG5W2eCsawTK4Ze5XAWqomgdew
62D3AkJQiflLfJL8zTFph1FZXLOm
-----END PRIVATE KEY-----'''
def d_(s):
    '''
    Return the Python object described by s.

    Every single quote in s is first replaced by a double quote, and the
    result is then evaluated as a Python literal.
    '''
    double_quoted = s.replace("'", '"')
    return ast.literal_eval(double_quoted)
def random_string(n):
    '''Return a string of n randomly chosen lowercase ASCII letters.'''
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(n)]
    return ''.join(picked)
def write_random_file(data):
    '''
    Write data to a newly created temporary file and return the file's path.

    The file is created with delete=False, so it is not removed when it is
    closed; removing it is left to the caller.
    '''
    # The context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile('w', delete=False) as f:
        f.write(data)
    return f.name
class Node(object):
    '''
    Node wraps a single rqlited process, the settings used to launch it,
    and helpers for talking to its HTTP API.
    '''
    def __init__(self, path, node_id,
                 api_addr=None, api_adv=None,
                 boostrap_expect=0,
                 raft_addr=None, raft_adv=None,
                 raft_voter=True,
                 raft_snap_threshold=8192, raft_snap_int="1s",
                 http_cert=None, http_key=None, http_no_verify=False,
                 node_cert=None, node_key=None, node_no_verify=False,
                 auth=None, dir=None, on_disk=False):
        '''
        Record the launch configuration; the process itself is not started
        until start() is called.

        api_addr and raft_addr default to randomly allocated localhost
        addresses, dir defaults to a new temporary directory, and api_adv
        defaults to api_addr. Files capturing the process's stdout and
        stderr are opened inside dir.
        '''
        if api_addr is None:
            s, addr = random_addr()
            api_addr = addr
            s.close()
        if raft_addr is None:
            s, addr = random_addr()
            raft_addr = addr
            s.close()
        if dir is None:
            dir = tempfile.mkdtemp()
        if api_adv is None:
            api_adv = api_addr
        self.dir = dir
        self.path = path
        self.peers_path = os.path.join(self.dir, "raft/peers.json")
        self.node_id = node_id
        self.api_addr = api_addr
        self.api_adv = api_adv
        self.boostrap_expect = boostrap_expect
        self.raft_addr = raft_addr
        self.raft_adv = raft_adv
        self.raft_voter = raft_voter
        self.raft_snap_threshold = raft_snap_threshold
        self.raft_snap_int = raft_snap_int
        self.http_cert = http_cert
        self.http_key = http_key
        self.http_no_verify = http_no_verify
        self.node_cert = node_cert
        self.node_key = node_key
        self.node_no_verify = node_no_verify
        self.auth = auth
        self.disco_key = random_string(10)
        self.on_disk = on_disk
        self.process = None
        self.stdout_file = os.path.join(dir, 'rqlited.log')
        self.stdout_fd = open(self.stdout_file, 'w')
        self.stderr_file = os.path.join(dir, 'rqlited.err')
        self.stderr_fd = open(self.stderr_file, 'w')

    def APIAddr(self):
        '''Return the advertised API address if set, else the API address.'''
        if self.api_adv is not None:
            return self.api_adv
        return self.api_addr

    def APIProtoAddr(self):
        '''Return APIAddr() prefixed with the http:// scheme.'''
        return "http://%s" % self.APIAddr()

    def scramble_network(self):
        '''
        Assign new random API and Raft addresses to this node. An advertised
        address that matched the old address is updated to match the new
        one; a distinct advertised address is left alone.
        '''
        if self.api_adv == self.api_addr:
            self.api_adv = None
        s, addr = random_addr()
        self.api_addr = addr
        s.close()
        if self.api_adv is None:
            self.api_adv = self.api_addr

        if self.raft_adv == self.raft_addr:
            self.raft_adv = None
        s, addr = random_addr()
        self.raft_addr = addr
        s.close()
        if self.raft_adv is None:
            self.raft_adv = self.raft_addr

    def start(self, join=None, join_as=None, join_attempts=None, join_interval=None,
              disco_mode=None, disco_key=None, disco_config=None, wait=True, timeout=TIMEOUT):
        '''
        Launch the rqlited process with this node's configuration.

        If wait is True, poll the status endpoint until it answers, raising
        an Exception if connections are still refused after timeout seconds.
        Returns self, or None if the process was already running.
        '''
        if self.process is not None:
            return
        command = [self.path,
                   '-node-id', self.node_id,
                   '-http-addr', self.api_addr,
                   '-bootstrap-expect', str(self.boostrap_expect),
                   '-raft-addr', self.raft_addr,
                   '-raft-snap', str(self.raft_snap_threshold),
                   '-raft-snap-int', self.raft_snap_int,
                   '-raft-non-voter=%s' % str(not self.raft_voter).lower()]
        if self.api_adv is not None:
            command += ['-http-adv-addr', self.api_adv]
        if self.raft_adv is not None:
            command += ['-raft-adv-addr', self.raft_adv]
        if self.http_cert is not None:
            command += ['-http-cert', self.http_cert, '-http-key', self.http_key]
        if self.http_no_verify:
            command += ['-http-no-verify']
        if self.node_cert is not None:
            command += ['-node-encrypt', '-node-cert', self.node_cert, '-node-key', self.node_key]
        if self.node_no_verify:
            command += ['-node-no-verify']
        if self.on_disk:
            command += ['-on-disk']
        if self.auth is not None:
            command += ['-auth', self.auth]
        if join is not None:
            if not join.startswith('http://'):
                join = 'http://' + join
            command += ['-join', join]
        if join_as is not None:
            command += ['-join-as', join_as]
        if join_attempts is not None:
            command += ['-join-attempts', str(join_attempts)]
        if join_interval is not None:
            command += ['-join-interval', join_interval]
        if disco_mode is not None:
            # Fall back to this node's own random key if none was supplied.
            dk = disco_key
            if dk is None:
                dk = self.disco_key
            command += ['-disco-mode', disco_mode, '-disco-key', dk]
            # NOTE(review): nesting inferred; -disco-config is only passed
            # together with -disco-mode here -- confirm against the original.
            if disco_config is not None:
                command += ['-disco-config', disco_config]
        command.append(self.dir)

        self.process = subprocess.Popen(command, stdout=self.stdout_fd, stderr=self.stderr_fd)
        t = 0
        while wait:
            if t > timeout:
                raise Exception('rqlite process failed to start within %d seconds' % timeout)
            try:
                self.status()
            except requests.exceptions.ConnectionError:
                time.sleep(1)
                t += 1
            else:
                break
        return self

    def stop(self):
        '''
        Kill the process and wait for it to exit. Returns self, or None if
        no process was running.
        '''
        if self.process is None:
            return
        self.process.kill()
        self.process.wait()
        self.process = None
        return self

    def pid(self):
        '''Return the PID of the running process, or None if not running.'''
        if self.process is None:
            return None
        return self.process.pid

    def status(self):
        '''Return the decoded JSON body of the status endpoint.'''
        r = requests.get(self._status_url())
        raise_for_status(r)
        return r.json()

    def nodes(self):
        '''Return the decoded JSON body of the nodes endpoint.'''
        r = requests.get(self._nodes_url())
        raise_for_status(r)
        return r.json()

    def ready(self, noleader=False):
        '''Return True if the readyz endpoint responds with HTTP 200.'''
        r = requests.get(self._ready_url(noleader))
        return r.status_code == 200

    def expvar(self):
        '''Return the decoded JSON body of the /debug/vars endpoint.'''
        r = requests.get(self._expvar_url())
        raise_for_status(r)
        return r.json()

    def is_leader(self):
        '''
        is_leader returns whether this node reports its Raft state as
        Leader. A node that cannot be connected to is not a leader.
        '''
        try:
            return self.status()['store']['raft']['state'] == 'Leader'
        except requests.exceptions.ConnectionError:
            return False

    def is_follower(self):
        '''
        is_follower returns whether this node reports its Raft state as
        Follower. A node that cannot be connected to is not a follower.
        '''
        try:
            return self.status()['store']['raft']['state'] == 'Follower'
        except requests.exceptions.ConnectionError:
            return False

    def disco_mode(self):
        '''
        Return the discovery mode reported by the node's status, or an
        empty string if the node cannot be connected to.
        '''
        try:
            return self.status()['disco']['mode']
        except requests.exceptions.ConnectionError:
            return ''

    def wait_for_leader(self, timeout=TIMEOUT):
        '''
        Poll the node's status until it reports a leader with a non-empty
        address, then return that leader record. Raises an Exception on
        timeout, or if the node then reports itself as not ready.
        '''
        lr = None
        t = 0
        while lr is None or lr['addr'] == '':
            if t > timeout:
                raise Exception('rqlite node failed to detect leader within %d seconds' % timeout)
            try:
                lr = self.status()['store']['leader']
            except requests.exceptions.ConnectionError:
                pass
            time.sleep(1)
            t += 1

        # Perform a check on readyness while we're here.
        if self.ready() is not True:
            raise Exception('leader is available but node reports not ready')
        return lr

    def db_applied_index(self):
        return int(self.status()['store']['db_applied_index'])

    def fsm_index(self):
        return int(self.status()['store']['fsm_index'])

    def commit_index(self):
        return int(self.status()['store']['raft']['commit_index'])

    def applied_index(self):
        return int(self.status()['store']['raft']['applied_index'])

    def last_log_index(self):
        return int(self.status()['store']['raft']['last_log_index'])

    def last_snapshot_index(self):
        return int(self.status()['store']['raft']['last_snapshot_index'])

    def num_join_requests(self):
        return int(self.expvar()['http']['joins'])

    def num_snapshots(self):
        return int(self.expvar()['store']['num_snapshots'])

    def num_restores(self):
        return int(self.expvar()['store']['num_restores'])

    def wait_for_fsm_index(self, index, timeout=TIMEOUT):
        '''
        Wait until the given index has been applied to the state machine.
        '''
        t = 0
        while self.fsm_index() < index:
            if t > timeout:
                raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index()))
            time.sleep(1)
            t += 1
        return self.fsm_index()

    def wait_for_commit_index(self, index, timeout=TIMEOUT):
        '''
        Wait until the commit index reaches the given value
        '''
        t = 0
        while self.commit_index() < index:
            if t > timeout:
                raise Exception('wait_for_commit_index timeout')
            time.sleep(1)
            t += 1
        return self.commit_index()

    def wait_for_all_applied(self, timeout=TIMEOUT):
        '''
        Wait until the applied index equals the commit index.
        '''
        t = 0
        while self.commit_index() != self.applied_index():
            if t > timeout:
                raise Exception('wait_for_all_applied timeout')
            time.sleep(1)
            t += 1
        return self.applied_index()

    def wait_for_all_fsm(self, timeout=TIMEOUT):
        '''
        Wait until all outstanding database commands have actually
        been applied to the database i.e. state machine.
        '''
        t = 0
        while self.fsm_index() != self.db_applied_index():
            if t > timeout:
                raise Exception('wait_for_all_fsm timeout')
            time.sleep(1)
            t += 1
        return self.fsm_index()

    def wait_for_restores(self, num, timeout=TIMEOUT):
        '''
        Wait until the number of snapshot-restores on this node reach
        the given value.
        '''
        t = 0
        while self.num_restores() != num:
            if t > timeout:
                raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores()))
            time.sleep(1)
            t += 1
        return self.num_restores()

    def _statement_body(self, statement, params):
        '''Build the [statement, params...] list sent for one statement.'''
        body = [statement]
        if params is not None:
            try:
                body = body + params
            except TypeError:
                # Presumably not a list, so append as an object.
                body.append(params)
        return body

    def query(self, statement, params=None, level='weak', pretty=False, text=False):
        '''
        POST a single statement to the query endpoint at the given level.
        Returns the raw response text if text is True, else decoded JSON.
        '''
        body = self._statement_body(statement, params)
        reqParams = {'level': level}
        if pretty:
            reqParams['pretty'] = "yes"
        r = requests.post(self._query_url(), params=reqParams, data=json.dumps([body]))
        raise_for_status(r)
        if text:
            return r.text
        return r.json()

    def execute(self, statement, params=None):
        '''POST a single statement to the execute endpoint; return decoded JSON.'''
        body = self._statement_body(statement, params)
        return self.execute_raw(json.dumps([body]))

    def execute_raw(self, body):
        '''POST body unchanged to the execute endpoint; return decoded JSON.'''
        r = requests.post(self._execute_url(), data=body)
        raise_for_status(r)
        return r.json()

    def backup(self, file):
        '''Fetch the backup endpoint and write the response bytes to file.'''
        with open(file, 'wb') as fd:
            r = requests.get(self._backup_url())
            raise_for_status(r)
            fd.write(r.content)

    def restore(self, file):
        '''
        Dump the SQLite database in file as SQL text and POST it to the
        load endpoint. Returns the decoded JSON response.
        '''
        # This is the one API that doesn't expect JSON.
        conn = sqlite3.connect(file)
        try:
            r = requests.post(self._load_url(), data='\n'.join(conn.iterdump()))
            raise_for_status(r)
        finally:
            # Close the connection even when the request fails.
            conn.close()
        return r.json()

    def redirect_addr(self):
        '''
        POST to the execute endpoint with redirects requested but not
        followed. If the response is a 301, return the scheme://host:port
        of its Location header; otherwise return None.
        '''
        r = requests.post(self._execute_url(redirect=True), data=json.dumps(['nonsense']), allow_redirects=False)
        raise_for_status(r)
        if r.status_code == 301:
            location = urlparse(r.headers['Location'])
            return "%s://%s" % (location.scheme, location.netloc)

    def set_peers(self, peers):
        '''Write peers, JSON-encoded, to this node's raft/peers.json file.'''
        with open(self.peers_path, "w") as f:
            f.write(json.dumps(peers))

    def _url(self, path):
        '''Return the http URL for path on this node's API address.'''
        return 'http://' + self.APIAddr() + path

    def _status_url(self):
        return self._url('/status')

    def _nodes_url(self):
        return self._url('/nodes?nonvoters') # Getting all nodes back makes testing easier

    def _ready_url(self, noleader=False):
        nl = ""
        if noleader:
            nl = "?noleader"
        return self._url('/readyz' + nl)

    def _expvar_url(self):
        return self._url('/debug/vars')

    def _query_url(self, redirect=False):
        rd = ""
        if redirect:
            rd = "?redirect"
        return self._url('/db/query' + rd)

    def _execute_url(self, redirect=False):
        rd = ""
        if redirect:
            rd = "?redirect"
        return self._url('/db/execute' + rd)

    def _backup_url(self):
        return self._url('/db/backup')

    def _load_url(self):
        return self._url('/db/load')

    def __eq__(self, other):
        return self.node_id == other.node_id

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable; hash on the same
        # attribute that equality uses.
        return hash(self.node_id)

    def __str__(self):
        return '%s:[%s]:[%s]:[%s]' % (self.node_id, self.APIAddr(), self.raft_addr, self.dir)

    def __del__(self):
        # __init__ may have failed before the log files were opened, so only
        # close the handles that actually exist.
        for attr in ('stdout_fd', 'stderr_fd'):
            fd = getattr(self, attr, None)
            if fd is not None:
                fd.close()
def raise_for_status(r):
    '''
    Call r.raise_for_status(). If that raises an HTTPError, print the error
    and the response text before re-raising it.
    '''
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print(err)
        print(r.text)
        raise
def random_addr():
    '''
    Bind a new TCP socket to port 0 on localhost and return the pair
    (socket, 'host:port'), where host and port are the address the socket
    ended up bound to. The socket is returned still open.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 0))
    host, port = sock.getsockname()
    return sock, ':'.join([host, str(port)])
def deprovision_node(node):
    '''
    Stop the given node and remove its directory, if the directory exists.
    '''
    node.stop()
    if os.path.isdir(node.dir):
        shutil.rmtree(node.dir)
class Cluster(object):
    '''Cluster holds a list of nodes and applies operations across them.'''
    def __init__(self, nodes):
        self.nodes = nodes

    def wait_for_leader(self, node_exc=None, timeout=TIMEOUT):
        '''
        Poll the nodes once a second until one reports itself as leader,
        and return that node. A node equal to node_exc is never considered.
        Raises an Exception once more than timeout polls have failed.
        '''
        elapsed = 0
        while elapsed <= timeout:
            for candidate in self.nodes:
                excluded = node_exc is not None and candidate == node_exc
                if not excluded and candidate.is_leader():
                    return candidate
            time.sleep(1)
            elapsed += 1
        raise Exception('timeout')

    def start(self):
        '''Start every node, in list order.'''
        for member in self.nodes:
            member.start()

    def stop(self):
        '''Stop every node, in list order.'''
        for member in self.nodes:
            member.stop()

    def followers(self):
        '''Return the nodes that currently report themselves as followers.'''
        found = []
        for member in self.nodes:
            if member.is_follower():
                found.append(member)
        return found

    def pids(self):
        '''
        Return a sorted list of all rqlited PIDs in the cluster.
        '''
        all_pids = [member.pid() for member in self.nodes]
        all_pids.sort()
        return all_pids

    def deprovision(self):
        '''Deprovision every node, in list order.'''
        for member in self.nodes:
            deprovision_node(member)
class TestSingleNode(unittest.TestCase):
    '''Tests run against a cluster consisting of a single node.'''
    def setUp(self):
        node = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s")
        node.start()
        node.wait_for_leader()
        self.cluster = Cluster([node])

    def tearDown(self):
        self.cluster.deprovision()

    def test_simple_raw_queries(self):
        '''Test simple queries work as expected'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(res, d_("{'results': [{}]}"))

        res = leader.execute('INSERT INTO bar(name) VALUES("fiona")')
        leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        res = leader.query('SELECT * from bar')
        self.assertEqual(res, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        # Ensure raw response from API is as expected.
        raw = leader.query('SELECT * from bar', text=True)
        self.assertEqual(str(raw), '{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]}')

    def test_simple_raw_queries_pretty(self):
        '''Test simple queries, requesting pretty output, work as expected'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(res, d_("{'results': [{}]}"))

        res = leader.execute('INSERT INTO bar(name) VALUES("fiona")')
        leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        raw = leader.query('SELECT * from bar', pretty=True, text=True)
        # NOTE(review): this literal is reproduced exactly as found; its lines
        # carry no leading whitespace -- confirm against the server's output.
        exp = '''{
"results": [
{
"columns": [
"id",
"name"
],
"types": [
"integer",
"text"
],
"values": [
[
1,
"fiona"
]
]
}
]
}'''
        self.assertEqual(str(raw), exp)

    def test_simple_parameterized_queries(self):
        '''Test parameterized queries work as expected'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(res, d_("{'results': [{}]}"))

        res = leader.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))

        fiona_row = d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}")
        res = leader.query('SELECT * from bar')
        self.assertEqual(res, fiona_row)
        res = leader.query('SELECT * from bar WHERE age=?', params=[20])
        self.assertEqual(res, fiona_row)
        res = leader.query('SELECT * from bar WHERE age=?', params=[21])
        self.assertEqual(res, d_("{'results': [{'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_simple_named_parameterized_queries(self):
        '''Test named parameterized queries work as expected'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(res, d_("{'results': [{}]}"))

        res = leader.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))

        fiona_row = d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}")
        res = leader.query('SELECT * from bar')
        self.assertEqual(res, fiona_row)
        res = leader.query('SELECT * from bar WHERE age=:age', params={"age": 20})
        self.assertEqual(res, fiona_row)

    def test_simple_parameterized_mixed_queries(self):
        '''Test a mix of parameterized and non-parameterized queries work as expected'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(res, d_("{'results': [{}]}"))

        statements = [
            ["INSERT INTO bar(name, age) VALUES(?,?)", "fiona", 20],
            ['INSERT INTO bar(name, age) VALUES("sinead", 25)'],
        ]
        res = leader.execute_raw(json.dumps(statements))
        leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}"))
        res = leader.query('SELECT * from bar')
        self.assertEqual(res, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_snapshot(self):
        ''' Test that a node peforms at least 1 snapshot'''
        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(res, d_("{'results': [{}]}"))
        res = leader.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        leader.wait_for_all_fsm()

        # Wait for a snapshot to happen.
        limit = 10
        waited = 0
        while True:
            snapshots = leader.num_snapshots()
            if snapshots > 0:
                return
            if waited > limit:
                raise Exception('timeout', snapshots)
            time.sleep(1)
            waited += 1
class TestSingleNodeReadyz(unittest.TestCase):
    def test(self):
        ''' Test /readyz behaves correctly'''
        n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s")
        try:
            # The join address is deliberately bogus; the checks below expect
            # the node to be ready only when the leader check is skipped.
            n0.start(join="http://nonsense")
            self.assertEqual(False, n0.ready())
            self.assertEqual(True, n0.ready(noleader=True))
            self.assertEqual(False, n0.ready(noleader=False))
        finally:
            # Clean up the process and its directory even if a check fails.
            deprovision_node(n0)
class TestEndToEnd(unittest.TestCase):
    '''Tests run against a three-node cluster, formed by joining via node 0.'''
    def setUp(self):
        first = Node(RQLITED_PATH, '0')
        first.start()
        first.wait_for_leader()

        members = [first]
        for node_id in ('1', '2'):
            joiner = Node(RQLITED_PATH, node_id)
            joiner.start(join=first.APIAddr())
            joiner.wait_for_leader()
            members.append(joiner)

        self.cluster = Cluster(members)

    def tearDown(self):
        self.cluster.deprovision()

    def test_election(self):
        '''Test basic leader election'''
        old_leader = self.cluster.wait_for_leader().stop()

        self.cluster.wait_for_leader(node_exc=old_leader)
        old_leader.start()
        old_leader.wait_for_leader()

    def test_full_restart(self):
        '''Test that a cluster can perform a full restart successfully'''
        self.cluster.wait_for_leader()
        pids_before = self.cluster.pids()

        self.cluster.stop()
        self.cluster.start()
        self.cluster.wait_for_leader()
        # Guard against any error in testing, by confirming that restarting the cluster
        # actually resulted in new rqlite processes.
        self.assertNotEqual(pids_before, self.cluster.pids())

    def test_execute_fail_rejoin(self):
        '''Test that a node that fails can rejoin the cluster, and picks up changes'''
        one_row = d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")

        leader = self.cluster.wait_for_leader()
        res = leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(res, d_("{'results': [{}]}"))
        res = leader.execute('INSERT INTO foo(name) VALUES("fiona")')
        fsm_idx = leader.wait_for_all_fsm()
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        res = leader.query('SELECT * FROM foo')
        self.assertEqual(res, one_row)

        stopped = self.cluster.wait_for_leader().stop()
        new_leader = self.cluster.wait_for_leader(node_exc=stopped)
        new_leader.wait_for_fsm_index(fsm_idx)
        res = new_leader.query('SELECT * FROM foo')
        self.assertEqual(res, one_row)
        res = new_leader.execute('INSERT INTO foo(name) VALUES("declan")')
        self.assertEqual(res, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))

        stopped.start()
        stopped.wait_for_leader()
        stopped.wait_for_fsm_index(new_leader.fsm_index())
        res = stopped.query('SELECT * FROM foo', level='none')
        self.assertEqual(res, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

    def test_leader_redirect(self):
        '''Test that followers supply the correct leader redirects (HTTP 301)'''
        leader = self.cluster.wait_for_leader()
        followers = self.cluster.followers()
        self.assertEqual(len(followers), 2)
        for follower in followers:
            self.assertEqual(leader.APIProtoAddr(), follower.redirect_addr())

        # Kill the leader, wait for a new leader, and check that the
        # redirect returns the new leader address.
        leader.stop()
        new_leader = self.cluster.wait_for_leader(node_exc=leader)
        for follower in self.cluster.followers():
            self.assertEqual(new_leader.APIProtoAddr(), follower.redirect_addr())

    def test_nodes(self):
        '''Test that the nodes/ endpoint operates as expected'''
        leader = self.cluster.wait_for_leader()
        followers = self.cluster.followers()
        self.assertEqual(len(followers), 2)

        listing = leader.nodes()
        leader_info = listing[leader.node_id]
        self.assertEqual(leader_info['leader'], True)
        self.assertEqual(leader_info['reachable'], True)
        self.assertEqual(leader_info['api_addr'], leader.APIProtoAddr())
        self.assertTrue('time' in listing[followers[0].node_id])
        for follower in [followers[0], followers[1]]:
            info = listing[follower.node_id]
            self.assertEqual(info['leader'], False)
            self.assertEqual(info['reachable'], True)
            self.assertEqual(info['api_addr'], follower.APIProtoAddr())
            self.assertTrue('time' in info)
        self.assertTrue('time' in listing[followers[0].node_id])

        # Stop one follower; only that node should become unreachable.
        followers[0].stop()
        listing = leader.nodes()
        self.assertEqual(listing[followers[0].node_id]['reachable'], False)
        self.assertTrue('error' in listing[followers[0].node_id])
        self.assertEqual(listing[followers[1].node_id]['reachable'], True)
        self.assertEqual(listing[leader.node_id]['reachable'], True)
class TestEndToEndOnDisk(TestEndToEnd):
    '''Runs the TestEndToEnd tests with every node created with on_disk=True.'''
    def setUp(self):
        first = Node(RQLITED_PATH, '0', on_disk=True)
        first.start()
        first.wait_for_leader()

        members = [first]
        for node_id in ('1', '2'):
            joiner = Node(RQLITED_PATH, node_id, on_disk=True)
            joiner.start(join=first.APIAddr())
            joiner.wait_for_leader()
            members.append(joiner)

        self.cluster = Cluster(members)
class TestEndToEndEncryptedNode(TestEndToEnd):
    '''
    Runs the TestEndToEnd tests with every node given the test certificate
    and key as its node cert/key, with node verification disabled.
    '''
    def setUp(self):
        certFile = write_random_file(x509cert)
        # The temporary files are not deleted automatically. Cleanups run
        # after tearDown, and also if setUp itself fails part-way through.
        self.addCleanup(os.remove, certFile)
        keyFile = write_random_file(x509key)
        self.addCleanup(os.remove, keyFile)

        n0 = Node(RQLITED_PATH, '0', node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1', node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n1.start(join=n0.APIAddr())
        n1.wait_for_leader()

        n2 = Node(RQLITED_PATH, '2', node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n2.start(join=n0.APIAddr())
        n2.wait_for_leader()

        self.cluster = Cluster([n0, n1, n2])
class TestEndToEndAdvAddr(TestEndToEnd):
    '''
    Runs the TestEndToEnd tests with node 0 bound to 0.0.0.0 and advertising
    localhost addresses for both its API and Raft endpoints.
    '''
    def setUp(self):
        first = Node(RQLITED_PATH, '0',
                     api_addr="0.0.0.0:4001", api_adv="localhost:4001",
                     raft_addr="0.0.0.0:4002", raft_adv="localhost:4002")
        first.start()
        first.wait_for_leader()

        members = [first]
        for node_id in ('1', '2'):
            joiner = Node(RQLITED_PATH, node_id)
            joiner.start(join=first.APIAddr())
            joiner.wait_for_leader()
            members.append(joiner)

        self.cluster = Cluster(members)
class TestEndToEndAdvAddrEncryptedNode(TestEndToEnd):
    '''
    Runs the TestEndToEnd tests with node 0 bound to 0.0.0.0 and advertising
    localhost addresses, and with every node given the test certificate and
    key as its node cert/key, with node verification disabled.
    '''
    def setUp(self):
        certFile = write_random_file(x509cert)
        # The temporary files are not deleted automatically. Cleanups run
        # after tearDown, and also if setUp itself fails part-way through.
        self.addCleanup(os.remove, certFile)
        keyFile = write_random_file(x509key)
        self.addCleanup(os.remove, keyFile)

        n0 = Node(RQLITED_PATH, '0',
                  api_addr="0.0.0.0:4001", api_adv="localhost:4001",
                  raft_addr="0.0.0.0:4002", raft_adv="localhost:4002",
                  node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1', node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n1.start(join=n0.APIAddr())
        n1.wait_for_leader()

        n2 = Node(RQLITED_PATH, '2', node_cert=certFile, node_key=keyFile, node_no_verify=True)
        n2.start(join=n0.APIAddr())
        n2.wait_for_leader()

        self.cluster = Cluster([n0, n1, n2])
class TestBootstrapping(unittest.TestCase):
    '''Test simple bootstrapping works via -bootstrap-expect'''
    def test(self):
        # Every node created is tracked so that all of them, including the
        # late joiner, are deprovisioned even if the test fails.
        nodes = []
        try:
            n0 = Node(RQLITED_PATH, '0', boostrap_expect=3)
            nodes.append(n0)
            n1 = Node(RQLITED_PATH, '1', boostrap_expect=3)
            nodes.append(n1)
            n2 = Node(RQLITED_PATH, '2', boostrap_expect=3)
            nodes.append(n2)

            # All nodes are launched with the same list of join addresses.
            join_addrs = ','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()])
            n0.start(join=join_addrs)
            n1.start(join=join_addrs)
            n2.start(join=join_addrs)

            self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
            self.assertEqual(n0.wait_for_leader(), n2.wait_for_leader())

            # Ensure a 4th node can join later, with same launch params.
            # NOTE(review): this node reuses node ID '1', which n1 already
            # has -- confirm whether a distinct ID was intended.
            n3 = Node(RQLITED_PATH, '1', boostrap_expect=3)
            nodes.append(n3)
            n3.start(join=join_addrs)

            n3.wait_for_leader()
            self.assertEqual(n3.wait_for_leader(), n0.wait_for_leader())
        finally:
            for n in nodes:
                deprovision_node(n)
class TestAutoClustering(unittest.TestCase):
DiscoModeConsulKV = "consul-kv"
DiscoModeEtcdKV = "etcd-kv"
def autocluster(self, mode):
disco_key = random_string(10)
n0 = Node(RQLITED_PATH, '0')
n0.start(disco_mode=mode, disco_key=disco_key)
n0.wait_for_leader()
self.assertEqual(n0.disco_mode(), mode)
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
n0.wait_for_all_fsm()
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Add second node, make sure it joins the cluster fine.
n1 = Node(RQLITED_PATH, '1')
n1.start(disco_mode=mode, disco_key=disco_key)
n1.wait_for_leader()
self.assertEqual(n1.disco_mode(), mode)
j = n1.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Now a third.
n2 = Node(RQLITED_PATH, '2')
n2.start(disco_mode=mode, disco_key=disco_key)
n2.wait_for_leader()
self.assertEqual(n2.disco_mode(), mode)
j = n2.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Now, kill the leader, which should trigger a different node to report leadership.
deprovision_node(n0)
# Add a fourth node, it should join fine using updated leadership details.
# Use quick retries, as we know the leader information may be changing while
# the node is coming up.
n3 = Node(RQLITED_PATH, '3')
n3.start(disco_mode=mode, disco_key=disco_key, join_interval='1s', join_attempts=1)
n3.wait_for_leader()
self.assertEqual(n3.disco_mode(), mode)
j = n3.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
deprovision_node(n1)
deprovision_node(n2)
deprovision_node(n3)
def autocluster_config(self, mode, config):
disco_key = random_string(10)
n0 = Node(RQLITED_PATH, '0')
n0.start(disco_mode=mode, disco_key=disco_key, disco_config=config)
n0.wait_for_leader()
self.assertEqual(n0.disco_mode(), mode)
j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}"))
j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
n0.wait_for_all_fsm()
j = n0.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
# Add second node, make sure it joins the cluster fine.
n1 = Node(RQLITED_PATH, '1')
n1.start(disco_mode=mode, disco_key=disco_key, disco_config=config)
n1.wait_for_leader()
self.assertEqual(n1.disco_mode(), mode)
j = n1.query('SELECT * FROM foo', level='none')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
deprovision_node(n0)
deprovision_node(n1)
def test_consul(self):
    '''Cluster via Consul and check that a leadership change is observed'''
    mode = TestAutoClustering.DiscoModeConsulKV
    self.autocluster(mode)
def test_etcd(self):
    '''Cluster via Etcd and check that a leadership change is observed'''
    mode = TestAutoClustering.DiscoModeEtcdKV
    self.autocluster(mode)
def test_consul_config(self):
    '''Test clustering via Consul with explicit file-based config'''
    filename = write_random_file('{"address": "localhost:8500"}')
    try:
        self.autocluster_config(TestAutoClustering.DiscoModeConsulKV, filename)
    finally:
        # Remove the config file even when the clustering check fails.
        os.remove(filename)
def test_etcd_config(self):
    '''Test clustering via Etcd with explicit file-based config'''
    filename = write_random_file('{"endpoints": ["localhost:2379"]}')
    try:
        self.autocluster_config(TestAutoClustering.DiscoModeEtcdKV, filename)
    finally:
        # Remove the config file even when the clustering check fails.
        os.remove(filename)
class TestAuthJoin(unittest.TestCase):
    '''Test that joining works with authentication'''

    def test(self):
        '''Check that a join without credentials fails and a join as "foo" succeeds'''
        # Track nodes as they are created so tearDown can still clean up if
        # the test fails before the Cluster object is assembled.
        self.nodes = []
        self.auth_file = tempfile.NamedTemporaryFile()
        with open(self.auth_file.name, 'w') as f:
            f.write('[{"username": "foo","password": "bar","perms": ["all"]}, {"username": "*", "perms": ["status", "ready"]}]')

        n0 = Node(RQLITED_PATH, '0', auth=self.auth_file.name)
        self.nodes.append(n0)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1', auth=self.auth_file.name)
        self.nodes.append(n1)
        n1.start(join=n0.APIAddr())
        self.assertRaises(Exception, n1.wait_for_leader) # Join should fail due to lack of auth.

        n2 = Node(RQLITED_PATH, '2', auth=self.auth_file.name)
        self.nodes.append(n2)
        n2.start(join=n0.APIAddr(), join_as="foo")
        n2.wait_for_leader()

        self.cluster = Cluster([n0, n1, n2])

    def tearDown(self):
        try:
            if hasattr(self, 'cluster'):
                self.cluster.deprovision()
            else:
                # The test failed before the cluster was built; clean up
                # whichever nodes were created.
                for n in getattr(self, 'nodes', []):
                    deprovision_node(n)
        finally:
            # Closing the NamedTemporaryFile is what removes the auth file,
            # so do it last and unconditionally.
            if hasattr(self, 'auth_file'):
                self.auth_file.close()
class TestClusterRecovery(unittest.TestCase):
    '''Test that a cluster can recover after all Raft network addresses change'''

    def setUp(self):
        # Filled in by the test as each node is created, so tearDown always
        # has a list to walk, even when the test fails part-way through.
        self.nodes = []

    def test(self):
        '''Restart a 3-node cluster on new Raft addresses and check the data is still there'''
        n0 = Node(RQLITED_PATH, '0')
        self.nodes.append(n0)
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1')
        self.nodes.append(n1)
        n1.start(join=n0.APIAddr())
        n1.wait_for_leader()

        n2 = Node(RQLITED_PATH, '2')
        self.nodes.append(n2)
        n2.start(join=n0.APIAddr())
        n2.wait_for_leader()

        j = n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        n0.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n0.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        n0.stop()
        n1.stop()
        n2.stop()

        # Create new cluster from existing data, but with new network addresses.
        addr3 = "127.0.0.1:10003"
        addr4 = "127.0.0.1:10004"
        addr5 = "127.0.0.1:10005"

        peers = [
            {"id": n0.node_id, "address": addr3},
            {"id": n1.node_id, "address": addr4},
            {"id": n2.node_id, "address": addr5},
        ]

        # The replacement nodes reuse the IDs and data directories of the
        # stopped nodes; only the Raft address differs.
        n3 = Node(RQLITED_PATH, n0.node_id, dir=n0.dir, raft_addr=addr3)
        n4 = Node(RQLITED_PATH, n1.node_id, dir=n1.dir, raft_addr=addr4)
        n5 = Node(RQLITED_PATH, n2.node_id, dir=n2.dir, raft_addr=addr5)
        self.nodes.extend([n3, n4, n5])

        n3.set_peers(peers)
        n4.set_peers(peers)
        n5.set_peers(peers)

        n3.start(wait=False)
        n4.start(wait=False)
        n5.start(wait=False)

        # New cluster ok?
        n3.wait_for_leader()

        j = n3.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

    def tearDown(self):
        for n in self.nodes:
            deprovision_node(n)
class TestRequestForwarding(unittest.TestCase):
    '''Test that followers transparently forward requests to leaders'''

    def setUp(self):
        n0 = Node(RQLITED_PATH, '0')
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1')
        n1.start(join=n0.APIAddr())
        n1.wait_for_leader()

        self.cluster = Cluster([n0, n1])

    def tearDown(self):
        self.cluster.deprovision()

    def test_execute_query_forward(self):
        '''Write through a follower, then read the row back from leader and follower'''
        leader = self.cluster.wait_for_leader()
        j = leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))

        # The write is sent to the follower, not the leader.
        follower = self.cluster.followers()[0]
        j = follower.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        leader.wait_for_all_fsm()

        j = leader.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        j = follower.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        j = follower.query('SELECT * FROM foo', level="strong")
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
class TestEndToEndNonVoter(unittest.TestCase):
    '''Tests for a two-node cluster of one leader and one node started with raft_voter=False'''

    def setUp(self):
        self.leader = Node(RQLITED_PATH, '0')
        self.leader.start()
        self.leader.wait_for_leader()

        self.non_voter = Node(RQLITED_PATH, '1', raft_voter=False)
        self.non_voter.start(join=self.leader.APIAddr())
        self.non_voter.wait_for_leader()

        self.cluster = Cluster([self.leader, self.non_voter])

    def tearDown(self):
        self.cluster.deprovision()

    def test_execute_fail_rejoin(self):
        '''Test that a non-voter that fails can rejoin the cluster, and pick up changes'''

        # Insert some records via the leader
        j = self.leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = self.leader.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.leader.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = self.leader.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        # Stop non-voter and then insert some more records
        self.non_voter.stop()
        j = self.leader.execute('INSERT INTO foo(name) VALUES("declan")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))

        # Restart non-voter and confirm it picks up changes
        self.non_voter.start()
        self.non_voter.wait_for_leader()
        self.non_voter.wait_for_fsm_index(self.leader.fsm_index())
        j = self.non_voter.query('SELECT * FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

    # This test was previously disabled by an unconditional early return,
    # which made it report as passing. Skip it explicitly so the test
    # report shows that it does not run. The reason it was disabled is not
    # recorded in this file.
    @unittest.skip('test_leader_redirect is disabled')
    def test_leader_redirect(self):
        '''Test that non-voters supply the correct leader redirects (HTTP 301)'''
        leader = self.cluster.wait_for_leader()
        fs = self.cluster.followers()
        self.assertEqual(len(fs), 1)
        for n in fs:
            self.assertEqual(leader.APIAddr(), n.redirect_addr())

        leader.stop()
        n = self.cluster.wait_for_leader(node_exc=leader)
        for f in self.cluster.followers():
            self.assertEqual(n.APIAddr(), f.redirect_addr())
class TestEndToEndNonVoterFollowsLeader(unittest.TestCase):
    '''Tests for a cluster of three default nodes plus one node started with raft_voter=False'''

    def setUp(self):
        n0 = Node(RQLITED_PATH, '0')
        n0.start()
        n0.wait_for_leader()

        n1 = Node(RQLITED_PATH, '1')
        n1.start(join=n0.APIAddr())
        n1.wait_for_leader()

        n2 = Node(RQLITED_PATH, '2')
        n2.start(join=n0.APIAddr())
        n2.wait_for_leader()

        self.non_voter = Node(RQLITED_PATH, '3', raft_voter=False)
        self.non_voter.start(join=n0.APIAddr())
        self.non_voter.wait_for_leader()

        self.cluster = Cluster([n0, n1, n2, self.non_voter])

    def tearDown(self):
        self.cluster.deprovision()

    def test_execute_fail_nonvoter(self):
        '''Test that a non-voter always picks up changes, even when leader changes'''

        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
        n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

        # Kill leader, and then make more changes.
        # Keep a reference to the node itself rather than to whatever stop()
        # returns, so the stopped leader is what gets excluded below.
        old_leader = self.cluster.wait_for_leader()
        old_leader.stop()
        new_leader = self.cluster.wait_for_leader(node_exc=old_leader)
        new_leader.wait_for_all_applied()
        new_leader.query('SELECT count(*) FROM foo', level='strong') ## Send one more operation through log before query.
        j = new_leader.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        j = new_leader.execute('INSERT INTO foo(name) VALUES("declan")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))

        # Confirm non-voter sees changes made through old and new leader.
        self.non_voter.wait_for_fsm_index(new_leader.fsm_index())
        j = self.non_voter.query('SELECT * FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona'], [2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
class TestEndToEndBackupRestore(unittest.TestCase):
    '''Test backing up one node to a SQLite file and restoring that file into another node'''

    def test_backup_restore(self):
        '''Back up node0, inspect the file with sqlite3, then restore it into node1'''
        fd, self.db_file = tempfile.mkstemp()
        os.close(fd)

        self.node0 = Node(RQLITED_PATH, '0')
        self.node0.start()
        self.node0.wait_for_leader()
        self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.node0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.node0.wait_for_all_fsm()

        self.node0.backup(self.db_file)

        # Read the backup directly with sqlite3. Close the connection even
        # if the query fails, so tearDown can remove the file cleanly.
        conn = sqlite3.connect(self.db_file)
        try:
            rows = conn.execute('SELECT * FROM foo').fetchall()
        finally:
            conn.close()
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0], (1, 'fiona'))

        self.node1 = Node(RQLITED_PATH, '1')
        self.node1.start()
        self.node1.wait_for_leader()
        j = self.node1.restore(self.db_file)
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = self.node1.query('SELECT * FROM foo')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))

    def tearDown(self):
        try:
            if hasattr(self, 'node0'):
                deprovision_node(self.node0)
            if hasattr(self, 'node1'):
                deprovision_node(self.node1)
        finally:
            # Remove the backup file even if deprovisioning a node raised.
            if hasattr(self, 'db_file'):
                os.remove(self.db_file)
class TestEndToEndSnapRestoreSingle(unittest.TestCase):
    '''Snapshot-and-restart test for a single node started with raft_snap_threshold=10'''

    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=10, raft_snap_int="1s")
        self.n0.start()
        self.n0.wait_for_leader()

    def waitForSnapIndex(self, n):
        '''Poll n0 once a second until its last snapshot index is at least n.

        Raises Exception('timeout') after 11 unsuccessful polls.
        '''
        timeout = 10
        for _ in range(timeout + 1):
            if self.n0.last_snapshot_index() >= n:
                return
            time.sleep(1)
        raise Exception('timeout')

    def test_snap_and_restart(self):
        '''Check that a node restarts correctly after multiple snapshots'''
        insert_stmt = 'INSERT INTO foo(name) VALUES("fiona")'
        count_stmt = 'SELECT count(*) FROM foo'
        expected_count = "{'results': [{'values': [[200]], 'types': [''], 'columns': ['count(*)']}]}"

        # Write enough rows that several snapshots get taken.
        self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        for _ in range(200):
            self.n0.execute(insert_stmt)
        self.n0.wait_for_all_fsm()
        self.waitForSnapIndex(175)

        # The node should hold all 200 rows before the restart.
        before = self.n0.query(count_stmt, level='none')
        self.assertEqual(before, d_(expected_count))

        # Bounce the node and check it comes back with the same state.
        self.n0.stop()
        self.n0.start()
        self.n0.wait_for_leader()
        self.n0.wait_for_all_applied()
        store_stats = self.n0.expvar()['store']
        self.assertEqual(store_stats['num_restores'], 1)

        after = self.n0.query(count_stmt, level='none')
        self.assertEqual(after, d_(expected_count))

    def tearDown(self):
        deprovision_node(self.n0)
class TestEndToEndSnapRestoreSingleOnDisk(TestEndToEndSnapRestoreSingle):
    '''Runs the inherited single-node snapshot tests against a node created with on_disk=True'''

    def setUp(self):
        self.n0 = node = Node(
            RQLITED_PATH,
            '0',
            raft_snap_threshold=10,
            raft_snap_int="1s",
            on_disk=True,
        )
        node.start()
        node.wait_for_leader()
class TestEndToEndSnapRestoreCluster(unittest.TestCase):
    '''Test that nodes joining or rejoining a cluster catch up when the leader has taken snapshots'''

    def waitForSnap(self, n):
        '''Poll n0 once a second until it reports exactly n snapshots.

        Raises Exception('timeout') after 11 unsuccessful polls.
        '''
        timeout = 10
        t = 0
        while True:
            if t > timeout:
                raise Exception('timeout')
            # Compare by value: identity ('is') on ints only works by
            # accident of the interpreter's small-integer caching.
            if self.n0.num_snapshots() == n:
                break
            time.sleep(1)
            t += 1

    def test_join_with_snap(self):
        '''Check that a node joins a cluster correctly via a snapshot'''
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=100, raft_snap_int="1s")
        self.n0.start()
        self.n0.wait_for_leader()

        # Let's get TWO snapshots done.
        self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        for _ in range(0, 100):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnap(1)

        for _ in range(0, 100):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnap(2)

        # Add two more nodes to the cluster
        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()

        self.n2 = Node(RQLITED_PATH, '2')
        self.n2.start(join=self.n0.APIAddr())
        self.n2.wait_for_leader()

        # Force the Apply loop to run on the node, so fsm_index is updated.
        self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')

        # Ensure those new nodes have the full correct state.
        self.n1.wait_for_fsm_index(self.n0.fsm_index())
        j = self.n1.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[201]], 'types': [''], 'columns': ['count(*)']}]}"))

        self.n2.wait_for_fsm_index(self.n0.fsm_index())
        j = self.n2.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[201]], 'types': [''], 'columns': ['count(*)']}]}"))

        # Kill one of the nodes, and make more changes, enough to trigger more snaps.
        self.n2.stop()

        for _ in range(0, 100):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnap(3)

        for _ in range(0, 100):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnap(4)

        # Restart killed node, check it has full state.
        self.n2.start()
        self.n2.wait_for_leader()

        # Force the Apply loop to run on the node twice, so fsm_index is updated on n2 to a point
        # where the SQLite database on n2 is updated.
        self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n2.query('SELECT count(*) FROM foo', level='strong')

        # Snapshot testing is tricky, as it's so timing-based -- and therefore hard to predict.
        # So adopt a polling approach.
        t = 0
        while True:
            if t > TIMEOUT:
                raise Exception('timeout waiting for n2 to have all data')
            j = self.n2.query('SELECT count(*) FROM foo', level='none')
            if j == d_("{'results': [{'values': [[402]], 'types': [''], 'columns': ['count(*)']}]}"):
                break
            time.sleep(1)
            t += 1

    def tearDown(self):
        # The nodes are created inside the test, so any of them may be
        # missing if the test failed early; only clean up what exists.
        for attr in ('n0', 'n1', 'n2'):
            if hasattr(self, attr):
                deprovision_node(getattr(self, attr))
class TestIdempotentJoin(unittest.TestCase):
    '''Test that the same node can join a cluster more than once'''

    def tearDown(self):
        # The nodes are created inside the test, so either may be missing
        # if the test failed early; only clean up what exists.
        if hasattr(self, 'n0'):
            deprovision_node(self.n0)
        if hasattr(self, 'n1'):
            deprovision_node(self.n1)

    def test(self):
        '''Test that a node performing two join requests works fine'''
        self.n0 = Node(RQLITED_PATH, '0')
        self.n0.start()
        self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertEqual(self.n0.num_join_requests(), 1)

        # Restart n1 with the same join address, which issues a second join.
        self.n1.stop()
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertEqual(self.n0.num_join_requests(), 2)
class TestJoinCatchup(unittest.TestCase):
    '''Tests that a follower which was stopped and then rejoins picks up the writes it missed'''

    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0')
        self.n0.start()
        self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()

        self.n2 = Node(RQLITED_PATH, '2')
        self.n2.start(join=self.n0.APIAddr())
        self.n2.wait_for_leader()

        self.cluster = Cluster([self.n0, self.n1, self.n2])

    def tearDown(self):
        self.cluster.deprovision()

    def _write_stop_follower_write_again(self):
        '''Run the steps common to both tests.

        Writes one row through the leader, checks n1 has it, stops n1, then
        writes a second row. Returns the leader and the value returned by the
        leader's final wait_for_all_fsm() call.
        '''
        single_row = "{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"
        insert_stmt = 'INSERT INTO foo(name) VALUES("fiona")'

        leader = self.cluster.wait_for_leader()
        resp = leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(resp, d_("{'results': [{}]}"))
        resp = leader.execute(insert_stmt)
        self.assertEqual(resp, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        resp = leader.query('SELECT * FROM foo')
        self.assertEqual(resp, d_(single_row))
        applied = leader.wait_for_all_fsm()

        # The follower's local database should match before it is stopped.
        self.n1.wait_for_fsm_index(applied)
        resp = self.n1.query('SELECT * FROM foo', level='none')
        self.assertEqual(resp, d_(single_row))
        self.n1.stop()

        # Write a second row while the follower is down.
        resp = leader.execute(insert_stmt)
        self.assertEqual(resp, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        resp = leader.query('SELECT COUNT(*) FROM foo')
        self.assertEqual(resp, d_("{'results': [{'values': [[2]], 'types': [''], 'columns': ['COUNT(*)']}]}"))
        applied = leader.wait_for_all_fsm()
        return leader, applied

    def test_no_change_id_addr(self):
        '''Test that a node rejoining without changing ID or address picks up changes'''
        leader, applied = self._write_stop_follower_write_again()

        # Bring the follower back with an explicit join and wait for it to catch up.
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.n1.wait_for_fsm_index(applied)
        store_stats = leader.expvar()['store']
        self.assertEqual(store_stats['num_ignored_joins'], 1)
        resp = self.n1.query('SELECT COUNT(*) FROM foo', level='none')
        self.assertEqual(resp, d_("{'results': [{'values': [[2]], 'types': [''], 'columns': ['COUNT(*)']}]}"))

    def test_change_addresses(self):
        '''Test that a node rejoining with new addresses works fine'''
        leader, applied = self._write_stop_follower_write_again()

        # Bring the follower back with new network attributes and an explicit
        # join, then wait for it to catch up.
        self.n1.scramble_network()
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        store_stats = leader.expvar()['store']
        self.assertEqual(store_stats['num_removed_before_joins'], 1)
        self.n1.wait_for_fsm_index(applied)
        resp = self.n1.query('SELECT COUNT(*) FROM foo', level='none')
        self.assertEqual(resp, d_("{'results': [{'values': [[2]], 'types': [''], 'columns': ['COUNT(*)']}]}"))
class TestRedirectedJoin(unittest.TestCase):
    '''Test that a node can join a cluster by contacting a follower instead of the leader'''

    def tearDown(self):
        # The nodes are created inside each test, so any of them may be
        # missing if the test failed early; only clean up what exists.
        for attr in ('n0', 'n1', 'n2'):
            if hasattr(self, attr):
                deprovision_node(getattr(self, attr))

    def test(self):
        '''Test that a node can join via a follower'''
        self.n0 = Node(RQLITED_PATH, '0')
        self.n0.start()
        l0 = self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertTrue(self.n1.is_follower())

        # n2 joins through the follower n1; it should report the same
        # wait_for_leader() result as n0 did.
        self.n2 = Node(RQLITED_PATH, '2')
        self.n2.start(join=self.n1.APIAddr())
        l2 = self.n2.wait_for_leader()
        self.assertEqual(l0, l2)

    def test_api_adv(self):
        '''Test that a node can join via a follower that advertises a different API address'''
        self.n0 = Node(RQLITED_PATH, '0',
                       api_addr="0.0.0.0:4001", api_adv="localhost:4001")
        self.n0.start()
        l0 = self.n0.wait_for_leader()

        self.n1 = Node(RQLITED_PATH, '1')
        self.n1.start(join=self.n0.APIAddr())
        self.n1.wait_for_leader()
        self.assertTrue(self.n1.is_follower())

        self.n2 = Node(RQLITED_PATH, '2')
        self.n2.start(join=self.n1.APIAddr())
        l2 = self.n2.wait_for_leader()
        self.assertEqual(l0, l2)
# Run the test suite with per-test output when this file is executed as a script.
if '__main__' == __name__:
    unittest.main(verbosity=2)