1
0
Fork 0

Try splitting end-to-end testing

master
Philip O'Toole 1 year ago
parent 6b09bcba13
commit 12aa97de41

@ -36,7 +36,29 @@ jobs:
environment:
GORACE: "halt_on_error=1"
end_to_end:
end_to_end_single:
docker:
- image: cimg/go:1.20.0
steps:
- checkout
- restore_cache:
keys:
- go-mod-v4-{{ checksum "go.sum" }}
- run: sudo apt-get update
- run: sudo apt-get install python3
- run: sudo apt install python3-pip
- run: python3 -m pip install requests
- run: go version
- run: go get -t -d -v ./...
- run: go install -tags osusergo,netgo,sqlite_omit_load_extension
-ldflags="-extldflags=-static" ./...
- run:
command: python3 system_test/e2e/single_node.py
environment:
RQLITED_PATH: /home/circleci/go/bin/rqlited
resource_class: large
end_to_end_multi:
docker:
- image: cimg/go:1.20.0
- image: consul
@ -55,7 +77,7 @@ jobs:
- run: go install -tags osusergo,netgo,sqlite_omit_load_extension
-ldflags="-extldflags=-static" ./...
- run:
command: python3 system_test/full_system_test.py
command: python3 system_test/e2e/multi_node.py
environment:
RQLITED_PATH: /home/circleci/go/bin/rqlited
resource_class: large

@ -0,0 +1,84 @@
# Self-signed X.509 certificate used by the end-to-end tests to exercise
# rqlite's TLS support.
# NOTE(review): this key pair is deliberately committed, public test data —
# it must never be used outside these tests.
x509cert = '''-----BEGIN CERTIFICATE-----
MIIFXTCCA0WgAwIBAgIJALrA6P0W35jRMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTcwNjEwMjIwMDM1WhcNMTgwNjEwMjIwMDM1WjBF
MQswCQYDVQQGEwJVUzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
CgKCAgEA2cxg1IcP1gDQezLJm9MDkEEHqOZEAn1iatoIHUoIlfu36Sripn4yoTxM
1pmOT37CFoaiRfj0biEbjrgfi0QXk9z4E7Vy0XGF6XB5KofOneqnUuSgnOnEkL0p
gQ3itCr/FLkvuT8/zYKL+PXsMnfHGORgJmHlu1/4rY6Z/dayaf4fUFlKRRziEVUn
3EMd/hHFHThXimWd3mtxE1YnpKimnFLmIYjXrK22QUZJ2MYVcRklJYaXhIJgHW2s
oe+ZRhFHxcYoY3znRFZXYkoCXETcExCmo7czLoN4/F92zFDEGbAMbwC/7Zo9AxQg
30Q4iCrLfwAx+M/0A2dRbSTqGReBeBVfEBWopfz7zV3W7kI+s5K2AIFi+1hbmJ6a
mKomv3f4z6Ml+yOqrq4KtrDSxnSf6Vh7EHsws6uyMG7Y6rLpPm1sLDiffPABlAti
/YlVT+3vlg86h7Vlw68CcNSclgyfFW+i1e5a+EV7WB0VmIQXzSkhA86b9aD8qWdL
N4H8sRlSZ3XfIil4u93QDC/NzJl22wRsN7926xR4DgbCesEsc361KYE8fBSx61fa
6EyvlQoI2I4r1aWCSHq7YGfV6guBZekR0BeaIsoNwfZDZrboL0sOrHGxiEfzYdVC
pAxjdG13zuPo+634fUfewBAq695kVYcy3aBt2wOkLyQGLu0CHHsCAwEAAaNQME4w
HQYDVR0OBBYEFAYLLJUqmUdXCNYTQIWX1ICBKGvWMB8GA1UdIwQYMBaAFAYLLJUq
mUdXCNYTQIWX1ICBKGvWMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIB
AGnvTPevCooo3xO8U/lq2YNo3cFxYnkurBwAn5pwzJrtPgogwezBltEp52+n39TY
5hSa//pfKdQz2GrQ9YvX1vB8gWkNLxBe6g2ksn0DsGTApC/te1p4M+yTKhogtE7a
qYmZBSEI46URe0JLYNirzdTu5dri7DzxFc7E/XlQ0riuMyHNqOP0JXKhxKN1dYOu
NEPxekq2Z2phoo1ul8hBXsz4IRwVeQOAtpRnfrKjxogOI1teP/RSikTsSLvFHxqo
UHVzwBexQs9isBlBUcmuKksxoGugqqSkGQRE+dSs5RSeEPLexMgACfFmKfpS+Vn4
ikb2ETQ3i76+JgMoDHKwb4u9xIyKTUToIsx5dUO+o7paPfyqRE6WbO4H+suM4VCd
VhNbG9qv02Fl8vdYAc/A6tVyV8b4fMbSsGEQnBlvKuOXf/uxAIcz11WUQ4gy/0/e
kHbMqGuBFPkg5nww3dBxkrBbtKq/1yrnQUjpBvjYtyUvoKrLSbQSGj586i52r4hF
+bqGPTxmk6hU4JZN+0wvkbVWLZBTRVNKs8Sb6fRWTd2Zd/o7a7QFhbnnAhv8bgyb
4472yLaXTL/siml+LlSrNGeZEsAaCVH4ETp+HzjpAMAyhhFGqCixG0e9BRPGV936
H/8+SUQK5KxnwDz3hqrAVJyimrvNlSaP1eZ5P8WXuvBl
-----END CERTIFICATE-----'''

# The private key corresponding to x509cert above (test data only).
x509key = '''-----BEGIN PRIVATE KEY-----
MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDZzGDUhw/WANB7
Msmb0wOQQQeo5kQCfWJq2ggdSgiV+7fpKuKmfjKhPEzWmY5PfsIWhqJF+PRuIRuO
uB+LRBeT3PgTtXLRcYXpcHkqh86d6qdS5KCc6cSQvSmBDeK0Kv8UuS+5Pz/Ngov4
9ewyd8cY5GAmYeW7X/itjpn91rJp/h9QWUpFHOIRVSfcQx3+EcUdOFeKZZ3ea3ET
ViekqKacUuYhiNesrbZBRknYxhVxGSUlhpeEgmAdbayh75lGEUfFxihjfOdEVldi
SgJcRNwTEKajtzMug3j8X3bMUMQZsAxvAL/tmj0DFCDfRDiIKst/ADH4z/QDZ1Ft
JOoZF4F4FV8QFail/PvNXdbuQj6zkrYAgWL7WFuYnpqYqia/d/jPoyX7I6qurgq2
sNLGdJ/pWHsQezCzq7Iwbtjqsuk+bWwsOJ988AGUC2L9iVVP7e+WDzqHtWXDrwJw
1JyWDJ8Vb6LV7lr4RXtYHRWYhBfNKSEDzpv1oPypZ0s3gfyxGVJndd8iKXi73dAM
L83MmXbbBGw3v3brFHgOBsJ6wSxzfrUpgTx8FLHrV9roTK+VCgjYjivVpYJIertg
Z9XqC4Fl6RHQF5oiyg3B9kNmtugvSw6scbGIR/Nh1UKkDGN0bXfO4+j7rfh9R97A
ECrr3mRVhzLdoG3bA6QvJAYu7QIcewIDAQABAoICAEKMgXXPAxa3zvwl65ZyZp9Y
T3fbTCKan0zY7CvO6EqzzGExmmmXG+9KVowoBWTi7XkmkETjKgTQlvQH7JOILdAf
b6nOApRepLVMialmL8ru3Uul0jG/+DDlq93kGUZF8QUrBJsM6XjpD831jsNo9+vy
NDLmLOURERIvBXybco6SeIz7i4cMqUL0iyZxV6O/WERyZ8VBAXjpyXZIF/rnEWmo
purOPmBj/9F4Ia5b8EdLkJ8jvf5eO/IiBeLBLEtNkmmq/8JOcvfdjfvZc1kwLTKi
HtjdbIUk5P3wSYNqllDnCxWL3BlEzKm5J8YwuTlaIi3fKGXHXN8BXc8EvYcHOKah
K89HIuexjQyQ0JAWKIIJTZs8jVvTMTjgYnEAB+sLfehBBOKmRdmYij28kIo18blx
tsx1HjdfImDd0QloofRW1Srp6FhcgDK0qfWXze/Vm6IfF40oTVE3fS/RgYzx0SSM
2pc6hTXOnrw1r/UBPyNkJ1D/4UK4m0x91BvTSi6MsThWnhicoaTZl1GP4Qpeo9+4
9Z7t0Yalm0PA55aiHZsm9S8OroasVun2QnDxfUC44PIov7nhqifGVcKA8hIDgSNT
WP8amq9cNjft5xQnP/y70fbioPPiwau2+Q0SXVn/BYxjqZrNp6OfbWSi2IRO1NOD
QZDo2rtnL1RrDdBmtDShAoIBAQD2MTJT6HNacu8+u7DDtbMdKvy7rXVblGbUouh9
cLWX7/zGVcNzB5GSVki3J7J+Kdrs4H45/1JR1YvWtWd93F/xKXmCGkUQCsturtRn
IX93by3zuWNdLv4giP2pk97wNYaaJWZmo87nXKV6BbL//eEl+Ospg5lGrLCsj+Mk
9V8oBBxsxqgVZYVyoevLDAuwUw3Cb52PhnEaLrv30ljGFHpsYb9lFlMs8vRosVWy
i3/T5ASfdnMXKQ1gxN/aPtN6yrFVpXe+S/A5JBzAQfrjiZk4SzNvE7R0eze1YLfO
IulTvlqpk3HVQEpgfq8D3l1x/zqsh0SpCH3VkV5sQQx262iRAoIBAQDieZsWv9eO
QzF5nZmh3NL53LkpqONRBjF5b9phppgxw2jiS8D2eEn2XWExmEaK/JppmzvfxSG4
cPaQHJFjkRGpnJyBlBUnyk4ua5hlXOTb9l5HsLIBlVdcWxwF+zJh8Ylwau+mcVF8
b8n86zke88du+xTvXfMDn6p6EACmBncyZGi424hSw72u8jS0cdmqJl3isLR6duG3
4yipWhEpLU5YuR+796jmjK5h+HQwl/Ra2dykbAw7vN0ofdK0+7LrVnGh7dDecOGK
0fElgFPTazeQQV+dEzqz6UwO0Z9koxqBwPqCLi7sXOeUWwqqb3ewYO7TMM4NlK/o
C8oG4yvWj9pLAoIBACEI9PHhbSkj5wqJ8OwyA3jUfdlJK0hAn5PE0GGUsClVIJwU
ggd7aoMyZMt+3iqjvyat8QIjSo6EkyEacmqnGZCoug9FKyM975JIj2PPUOVb29Sq
ebTVS3BeMXuBxhaBeDBS+GypamgNPH8lKKHFFWMdBaEqcXTUU1i0bgxViJE8C/xk
o8VLPB7nr1YtpZvhaSVACOprZd3Xi41zgkoCEXNdomsUFdEgQL+TnCY7Jcnu/NfQ
8xyWe58Si98jMwl1DVqqu2ijk/Z27Ay4TcweeJrfLGWpRTukFROXiNJ2SMzd7Bh5
Gns9Bz3vgdiJDAzx7JOeCw6LfycbPIpWKDAE4qECggEAZ5kPG7H4Dcio6iPwsj1M
eSXBwc/S5C58FTvYXtERT7o+0T2r8FMIKl1+52vr4Qo6LFLpaaxIh5GNCFE5JJ2o
wbi1UwUFRGVjrBJl7QA4ZHJnoE2wr8673rCCui21V15g637PT4kIqG6OrFaBk6oa
MadDZVfJoX+5QQru8QOGJRQPX3h0/L8zlsKO33gxBId2bQs+E8Mr761G3Wko7nge
HbHZVWet6IC0CHbZ15y7F5APQVt3oR/83tfnughlSQgLBPK/l/F1CsaMlAYG0nB6
Q0/USAsS0FfJBgJX8nY12uMG9OPhbRf2i0O2Nk61JobA2PS7XTUF3pT9/naOiCDX
zwKCAQBK9dPzoc9CfdAJVWjCqR2xS8Jr3/64k59Pya6e7Y4ca+e1g4N7hhQRLPW7
owTKloXrR0mAkwOIiJlk+gXsl2banuSajiPxumSfPYWE1Q/QNFD/WoPvo6rPYJ8N
yA/ORsMjWq51SfpzOU69+FdY7p3GvIVWhRtinqseaAIMOkNZBLVDXF4DvtFgiLZM
bKAjGuXsKOT3MPFU9tHxi4q/7flUb30mSUVXyPjh+C+UH7e0BS0pi/rDeRdEju4z
bJVERP8/VAJ61TDQJq+Il95fzKe4yTA3dDHnO+EG5W2eCsawTK4Ze5XAWqomgdew
62D3AkJQiflLfJL8zTFph1FZXLOm
-----END PRIVATE KEY-----'''

@ -0,0 +1,544 @@
#!/usr/bin/env python
import tempfile
import ast
import subprocess
import requests
import json
import os
import random
import re
from urllib.parse import urlparse
import shutil
import time
import socket
import sqlite3
import string
# Maximum number of seconds most polling helpers wait before giving up.
TIMEOUT=20

# Matches the printed form of a queued-write response, e.g.
# "{'results': [], 'sequence_number': 1234}". A raw string is used so the
# regex escapes (\[ and \d) are not interpreted as string escape
# sequences, which is deprecated in Python 3.
seqRe = re.compile(r"^{'results': \[\], 'sequence_number': \d+}$")

def d_(s):
    '''Parse a single-quoted dict/list literal string into a Python object.

    NOTE(review): every single quote is swapped for a double quote before
    evaluation, so values that themselves contain quote characters would
    be misparsed — acceptable for the controlled strings these tests use.
    '''
    return ast.literal_eval(s.replace("'", "\""))

def is_sequence_number(r):
    '''Return a truthy match object if r looks like a queued-write response.'''
    return seqRe.match(r)
def random_string(n):
    '''Return a string of n randomly chosen lowercase ASCII letters.'''
    chars = []
    for _ in range(n):
        chars.append(random.choice(string.ascii_lowercase))
    return ''.join(chars)
def write_random_file(data):
    '''Write data to a fresh temporary file and return the file's path.

    The file is created with delete=False, so it survives close();
    the caller is responsible for removing it.
    '''
    # Context manager guarantees the descriptor is closed even if
    # write() raises, instead of leaking it as the open/close form did.
    with tempfile.NamedTemporaryFile('w', delete=False) as f:
        f.write(data)
    return f.name
def raise_for_status(r):
    '''Re-raise the HTTP error for a failed response, after logging it.

    Prints the exception and the response body so the test log shows why
    a request failed, then propagates the original HTTPError. A successful
    response passes through silently.
    '''
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print(err)
        print(r.text)
        raise err
def random_addr():
    '''Bind a TCP socket to an OS-chosen free port on localhost.

    Returns (socket, "host:port"). The caller must close the returned
    socket before the address can actually be reused.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 0))
    host, port = sock.getsockname()[0], sock.getsockname()[1]
    return sock, '%s:%d' % (host, port)
class Node(object):
    '''
    Node launches and drives a single rqlited process, talking to it over
    its HTTP API. Addresses default to random free localhost ports, and
    process stdout/stderr are captured to files in the node's data dir.
    '''
    def __init__(self, path, node_id,
                 api_addr=None, api_adv=None,
                 boostrap_expect=0,
                 raft_addr=None, raft_adv=None,
                 raft_voter=True,
                 raft_snap_threshold=8192, raft_snap_int="1s",
                 http_cert=None, http_key=None, http_no_verify=False,
                 node_cert=None, node_key=None, node_no_verify=False,
                 auth=None, dir=None, on_disk=False):
        # path is the rqlited binary to launch; node_id must be unique
        # within a cluster. NOTE(review): "boostrap_expect" (sic) is the
        # existing spelling used by callers — left unchanged.
        s_api = None
        s_raft = None
        if api_addr is None:
            s_api, addr = random_addr()
            api_addr = addr
        if raft_addr is None:
            s_raft, addr = random_addr()
            raft_addr = addr
        # Only now close any sockets used to get random addresses, so there is
        # no chance a randomly selected address would get re-used by the HTTP
        # system and Raft system.
        if s_api is not None:
            s_api.close()
        if s_raft is not None:
            s_raft.close()
        if api_adv is None:
            api_adv = api_addr
        if dir is None:
            dir = tempfile.mkdtemp()
        self.dir = dir
        self.path = path
        self.peers_path = os.path.join(self.dir, "raft/peers.json")
        self.node_id = node_id
        self.api_addr = api_addr
        self.api_adv = api_adv
        self.boostrap_expect = boostrap_expect
        self.raft_addr = raft_addr
        self.raft_adv = raft_adv
        self.raft_voter = raft_voter
        self.raft_snap_threshold = raft_snap_threshold
        self.raft_snap_int = raft_snap_int
        self.http_cert = http_cert
        self.http_key = http_key
        self.http_no_verify = http_no_verify
        self.node_cert = node_cert
        self.node_key = node_key
        self.node_no_verify = node_no_verify
        self.auth = auth
        self.disco_key = random_string(10)
        self.on_disk = on_disk
        self.process = None
        # Capture process output for post-mortem via dump_log().
        self.stdout_file = os.path.join(dir, 'rqlited.log')
        self.stdout_fd = open(self.stdout_file, 'w')
        self.stderr_file = os.path.join(dir, 'rqlited.err')
        self.stderr_fd = open(self.stderr_file, 'w')

    def APIAddr(self):
        '''Return the advertised HTTP API address if set, else the bind address.'''
        if self.api_adv is not None:
            return self.api_adv
        return self.api_addr

    def APIProtoAddr(self):
        '''Return the HTTP API address including the protocol scheme.'''
        return "http://%s" % self.APIAddr()

    def scramble_network(self):
        '''Replace this node's API and Raft addresses with fresh random ones.

        If an advertised address mirrored the bind address it is kept in
        sync with the new bind address.
        '''
        if self.api_adv == self.api_addr:
            self.api_adv = None

        s, addr = random_addr()
        self.api_addr = addr
        s.close()

        if self.api_adv is None:
            self.api_adv = self.api_addr

        if self.raft_adv == self.raft_addr:
            self.raft_adv = None

        s, addr = random_addr()
        self.raft_addr = addr
        s.close()

        if self.raft_adv is None:
            self.raft_adv = self.raft_addr

    def start(self, join=None, join_as=None, join_attempts=None, join_interval=None,
              disco_mode=None, disco_key=None, disco_config=None, wait=True, timeout=TIMEOUT):
        '''Launch the rqlited process for this node.

        A no-op if the process is already running. When wait is True,
        polls /status until the HTTP API responds, raising if it does not
        come up within timeout seconds. Returns self.
        '''
        if self.process is not None:
            return

        command = [self.path,
                   '-node-id', self.node_id,
                   '-http-addr', self.api_addr,
                   '-bootstrap-expect', str(self.boostrap_expect),
                   '-raft-addr', self.raft_addr,
                   '-raft-snap', str(self.raft_snap_threshold),
                   '-raft-snap-int', self.raft_snap_int,
                   '-raft-non-voter=%s' % str(not self.raft_voter).lower()]
        if self.api_adv is not None:
            command += ['-http-adv-addr', self.api_adv]
        if self.raft_adv is not None:
            command += ['-raft-adv-addr', self.raft_adv]
        if self.http_cert is not None:
            command += ['-http-cert', self.http_cert, '-http-key', self.http_key]
            if self.http_no_verify:
                command += ['-http-no-verify']
        if self.node_cert is not None:
            command += ['-node-encrypt', '-node-cert', self.node_cert, '-node-key', self.node_key]
            if self.node_no_verify:
                command += ['-node-no-verify']
        if self.on_disk:
            command += ['-on-disk']
        if self.auth is not None:
            command += ['-auth', self.auth]
        if join is not None:
            if join.startswith('http://') is False:
                join = 'http://' + join
            command += ['-join', join]
        if join_as is not None:
            command += ['-join-as', join_as]
        if join_attempts is not None:
            command += ['-join-attempts', str(join_attempts)]
        if join_interval is not None:
            command += ['-join-interval', join_interval]
        if disco_mode is not None:
            dk = disco_key
            if dk is None:
                dk = self.disco_key
            command += ['-disco-mode', disco_mode, '-disco-key', dk]
            if disco_config is not None:
                command += ['-disco-config', disco_config]
        # The data directory is the final positional argument.
        command.append(self.dir)

        self.process = subprocess.Popen(command, stdout=self.stdout_fd, stderr=self.stderr_fd)
        t = 0
        while wait:
            if t > timeout:
                self.dump_log("dumping log due to timeout during start")
                raise Exception('rqlite process failed to start within %d seconds' % timeout)
            try:
                self.status()
            except requests.exceptions.ConnectionError:
                time.sleep(1)
                t+=1
            else:
                break
        return self

    def stop(self):
        '''Kill the rqlited process, if running, and reap it. Returns self.'''
        if self.process is None:
            return
        self.process.kill()
        self.process.wait()
        self.process = None
        return self

    def pid(self):
        '''Return the rqlited process PID, or None if not running.'''
        if self.process is None:
            return None
        return self.process.pid

    def status(self):
        '''Return the node's /status output as a dict.'''
        r = requests.get(self._status_url())
        raise_for_status(r)
        return r.json()

    def nodes(self):
        '''Return the node's /nodes output (including non-voters) as a dict.'''
        r = requests.get(self._nodes_url())
        raise_for_status(r)
        return r.json()

    def ready(self, noleader=False):
        '''Return True if the node's /readyz endpoint reports ready.'''
        r = requests.get(self._ready_url(noleader))
        return r.status_code == 200

    def expvar(self):
        '''Return the node's /debug/vars (expvar) output as a dict.'''
        r = requests.get(self._expvar_url())
        raise_for_status(r)
        return r.json()

    def is_leader(self):
        '''
        is_leader returns whether this node is the cluster leader
        '''
        try:
            return self.status()['store']['raft']['state'] == 'Leader'
        except requests.exceptions.ConnectionError:
            return False

    def is_follower(self):
        '''Return whether this node is currently a Raft follower.'''
        try:
            return self.status()['store']['raft']['state'] == 'Follower'
        except requests.exceptions.ConnectionError:
            return False

    def is_voter(self):
        '''Return whether this node is a voting member of the cluster.'''
        try:
            return self.status()['store']['raft']['voter'] == True
        except requests.exceptions.ConnectionError:
            return False

    def disco_mode(self):
        '''Return the node's discovery mode, or '' if unreachable.'''
        try:
            return self.status()['disco']['mode']
        except requests.exceptions.ConnectionError:
            return ''

    def wait_for_leader(self, timeout=TIMEOUT, log=True):
        '''Poll until this node reports a leader, returning the leader info.

        Raises if no leader is detected within timeout seconds; when log
        is True the node's stderr is dumped first.
        '''
        lr = None
        t = 0
        while lr == None or lr['addr'] == '':
            if t > timeout:
                if log:
                    self.dump_log("dumping log due to timeout waiting for leader")
                raise Exception('rqlite node failed to detect leader within %d seconds' % timeout)
            try:
                lr = self.status()['store']['leader']
            except (KeyError, requests.exceptions.ConnectionError):
                pass
            time.sleep(1)
            t+=1

        # Perform a check on readyness while we're here.
        if self.ready() is not True:
            raise Exception('leader is available but node reports not ready')
        return lr

    def expect_leader_fail(self, timeout=TIMEOUT):
        '''Return True if waiting for a leader fails within the timeout.

        NOTE(review): self is passed twice here, so wait_for_leader
        receives self as its timeout argument and timeout as log. The
        resulting TypeError is swallowed by the bare except, making this
        return True regardless of leader state — confirm intent.
        '''
        try:
            self.wait_for_leader(self, timeout, log=False)
        except:
            return True
        return False

    def db_applied_index(self):
        '''Return the index of the last log entry applied to the database.'''
        return int(self.status()['store']['db_applied_index'])

    def fsm_index(self):
        '''Return the state machine's applied index.'''
        return int(self.status()['store']['fsm_index'])

    def commit_index(self):
        '''Return the Raft commit index.'''
        return int(self.status()['store']['raft']['commit_index'])

    def applied_index(self):
        '''Return the Raft applied index.'''
        return int(self.status()['store']['raft']['applied_index'])

    def last_log_index(self):
        '''Return the index of the last entry in the Raft log.'''
        return int(self.status()['store']['raft']['last_log_index'])

    def last_snapshot_index(self):
        '''Return the index of the last Raft snapshot.'''
        return int(self.status()['store']['raft']['last_snapshot_index'])

    def num_join_requests(self):
        '''Return the number of join requests this node's HTTP service has seen.'''
        return int(self.expvar()['http']['joins'])

    def num_snapshots(self):
        '''Return the number of snapshots this node has taken.'''
        return int(self.expvar()['store']['num_snapshots'])

    def num_restores(self):
        '''Return the number of snapshot-restores this node has performed.'''
        return int(self.expvar()['store']['num_restores'])

    def wait_for_fsm_index(self, index, timeout=TIMEOUT):
        '''
        Wait until the given index has been applied to the state machine.
        '''
        t = 0
        while self.fsm_index() < index:
            if t > timeout:
                raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index()))
            time.sleep(1)
            t+=1
        return self.fsm_index()

    def wait_for_commit_index(self, index, timeout=TIMEOUT):
        '''
        Wait until the commit index reaches the given value
        '''
        t = 0
        while self.commit_index() < index:
            if t > timeout:
                raise Exception('wait_for_commit_index timeout')
            time.sleep(1)
            t+=1
        return self.commit_index()

    def wait_for_all_applied(self, timeout=TIMEOUT):
        '''
        Wait until the applied index equals the commit index.
        '''
        t = 0
        while self.commit_index() != self.applied_index():
            if t > timeout:
                raise Exception('wait_for_all_applied timeout')
            time.sleep(1)
            t+=1
        return self.applied_index()

    def wait_for_all_fsm(self, timeout=TIMEOUT):
        '''
        Wait until all outstanding database commands have actually
        been applied to the database i.e. state machine.
        '''
        t = 0
        while self.fsm_index() != self.db_applied_index():
            if t > timeout:
                raise Exception('wait_for_all_fsm timeout')
            time.sleep(1)
            t+=1
        return self.fsm_index()

    def wait_for_restores(self, num, timeout=TIMEOUT):
        '''
        Wait until the number of snapshot-restores on this node reach
        the given value.
        '''
        t = 0
        while self.num_restores() != num:
            if t > timeout:
                raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores()))
            time.sleep(1)
            t+=1
        return self.num_restores()

    def query(self, statement, params=None, level='weak', pretty=False, text=False, associative=False):
        '''Execute a read statement via /db/query.

        params may be a list (positional) or a single object (named).
        Returns the raw response text when text is True, else parsed JSON.
        '''
        body = [statement]
        if params is not None:
            try:
                body = body + params
            except TypeError:
                # Presumably not a list, so append as an object.
                body.append(params)
        reqParams = {'level': level}
        if pretty:
            reqParams['pretty'] = "yes"
        if associative:
            reqParams['associative'] = "yes"
        r = requests.post(self._query_url(), params=reqParams, data=json.dumps([body]))
        raise_for_status(r)
        if text:
            return r.text
        return r.json()

    def execute(self, statement, params=None):
        '''Execute a write statement via /db/execute, returning parsed JSON.'''
        body = [statement]
        if params is not None:
            try:
                body = body + params
            except TypeError:
                # Presumably not a list, so append as an object.
                body.append(params)
        return self.execute_raw(json.dumps([body]))

    def execute_raw(self, body):
        '''POST a pre-serialized request body to /db/execute.'''
        r = requests.post(self._execute_url(), data=body)
        raise_for_status(r)
        return r.json()

    def execute_queued(self, statement, wait=False, params=None):
        '''Execute a write via the queued-write endpoint (/db/execute?queue).'''
        body = [statement]
        if params is not None:
            try:
                body = body + params
            except TypeError:
                # Presumably not a list, so append as an object.
                body.append(params)
        r = requests.post(self._execute_queued_url(wait), data=json.dumps([body]))
        raise_for_status(r)
        return r.json()

    def backup(self, file):
        '''Download the node's database backup into the given file path.'''
        with open(file, 'wb') as fd:
            r = requests.get(self._backup_url())
            raise_for_status(r)
            fd.write(r.content)

    def remove_node(self, id):
        '''Ask this node to remove the node with the given ID from the cluster.'''
        body = {"id": id}
        r = requests.delete(self._remove_url(), data=json.dumps(body))
        raise_for_status(r)

    def restore(self, file, fmt=None):
        '''Load a database into the node from a SQLite file.

        When fmt is "binary" the raw file bytes are uploaded; otherwise
        the file is dumped as SQL text first.
        NOTE(review): only the SQL path returns the response JSON; the
        binary path returns None — confirm callers expect this.
        '''
        # This is the one API that doesn't expect JSON.
        if fmt != "binary":
            conn = sqlite3.connect(file)
            r = requests.post(self._load_url(), data='\n'.join(conn.iterdump()))
            raise_for_status(r)
            conn.close()
            return r.json()
        else:
            with open(file, 'rb') as f:
                data = f.read()
            r = requests.post(self._load_url(), data=data, headers={'Content-Type': 'application/octet-stream'})
            raise_for_status(r)

    def redirect_addr(self):
        '''Return the scheme://host:port this node redirects writes to, if any.'''
        r = requests.post(self._execute_url(redirect=True), data=json.dumps(['nonsense']), allow_redirects=False)
        raise_for_status(r)
        if r.status_code == 301:
            return "%s://%s" % (urlparse(r.headers['Location']).scheme, urlparse(r.headers['Location']).netloc)

    def set_peers(self, peers):
        '''Write the given peers configuration to this node's peers.json.'''
        f = open(self.peers_path, "w")
        f.write(json.dumps(peers))
        f.close()

    def dump_log(self, msg):
        '''Print msg followed by the node's captured stderr output.'''
        print(msg)
        self.stderr_fd.close()
        f = open(self.stderr_file, 'r')
        for l in f.readlines():
            print(l.strip())

    def _status_url(self):
        return 'http://' + self.APIAddr() + '/status'
    def _nodes_url(self):
        return 'http://' + self.APIAddr() + '/nodes?nonvoters' # Getting all nodes back makes testing easier
    def _ready_url(self, noleader=False):
        nl = ""
        if noleader:
            nl = "?noleader"
        return 'http://' + self.APIAddr() + '/readyz' + nl
    def _expvar_url(self):
        return 'http://' + self.APIAddr() + '/debug/vars'
    def _query_url(self, redirect=False):
        rd = ""
        if redirect:
            rd = "?redirect"
        return 'http://' + self.APIAddr() + '/db/query' + rd
    def _execute_url(self, redirect=False):
        rd = ""
        if redirect:
            rd = "?redirect"
        return 'http://' + self.APIAddr() + '/db/execute' + rd
    def _execute_queued_url(self, wait=False):
        u = '/db/execute?queue'
        if wait:
            u = u + '&wait'
        return 'http://' + self.APIAddr() + u
    def _backup_url(self):
        return 'http://' + self.APIAddr() + '/db/backup'
    def _load_url(self):
        return 'http://' + self.APIAddr() + '/db/load'
    def _remove_url(self):
        return 'http://' + self.APIAddr() + '/remove'

    def __eq__(self, other):
        # Nodes are identified solely by node_id.
        return self.node_id == other.node_id
    def __str__(self):
        return '%s:[%s]:[%s]:[%s]' % (self.node_id, self.APIAddr(), self.raft_addr, self.dir)
    def __del__(self):
        # Release the captured-output file descriptors.
        self.stdout_fd.close()
        self.stderr_fd.close()
def deprovision_node(node):
    '''Stop the given node and delete its on-disk state.'''
    node.stop()
    data_dir = node.dir
    if os.path.isdir(data_dir):
        shutil.rmtree(data_dir)
    # Rebind the local name only; has no effect on the caller's reference.
    node = None
class Cluster(object):
    '''A convenience wrapper around a group of Node objects.'''

    def __init__(self, nodes):
        self.nodes = nodes

    def wait_for_leader(self, node_exc=None, timeout=TIMEOUT):
        '''Poll until some node (other than node_exc) reports itself
        leader, returning that node. Raises on timeout.'''
        elapsed = 0
        while True:
            if elapsed > timeout:
                raise Exception('timeout')
            for candidate in self.nodes:
                if node_exc is not None and candidate == node_exc:
                    continue
                if candidate.is_leader():
                    return candidate
            time.sleep(1)
            elapsed += 1

    def start(self):
        '''Start every node in the cluster.'''
        for node in self.nodes:
            node.start()

    def stop(self):
        '''Stop every node in the cluster.'''
        for node in self.nodes:
            node.stop()

    def followers(self):
        '''Return the nodes currently reporting as Raft followers.'''
        flws = []
        for node in self.nodes:
            if node.is_follower():
                flws.append(node)
        return flws

    def pids(self):
        '''
        Return a sorted list of all rqlited PIDs in the cluster.
        '''
        pid_list = [node.pid() for node in self.nodes]
        pid_list.sort()
        return pid_list

    def deprovision(self):
        '''Deprovision every node in the cluster.'''
        for node in self.nodes:
            deprovision_node(node)

@ -7,803 +7,15 @@
# python system_test/full_system_test.py Class.test
import tempfile
import argparse
import ast
import subprocess
import requests
import json
import os
import random
import re
import shutil
import time
import socket
import sqlite3
import string
import sys
import unittest
from urllib.parse import urlparse
from certs import x509cert, x509key
from helpers import Node, Cluster, d_, write_random_file, deprovision_node, is_sequence_number, random_string, TIMEOUT
RQLITED_PATH = os.environ['RQLITED_PATH']
TIMEOUT=20
x509cert = '''-----BEGIN CERTIFICATE-----
MIIFXTCCA0WgAwIBAgIJALrA6P0W35jRMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTcwNjEwMjIwMDM1WhcNMTgwNjEwMjIwMDM1WjBF
MQswCQYDVQQGEwJVUzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
CgKCAgEA2cxg1IcP1gDQezLJm9MDkEEHqOZEAn1iatoIHUoIlfu36Sripn4yoTxM
1pmOT37CFoaiRfj0biEbjrgfi0QXk9z4E7Vy0XGF6XB5KofOneqnUuSgnOnEkL0p
gQ3itCr/FLkvuT8/zYKL+PXsMnfHGORgJmHlu1/4rY6Z/dayaf4fUFlKRRziEVUn
3EMd/hHFHThXimWd3mtxE1YnpKimnFLmIYjXrK22QUZJ2MYVcRklJYaXhIJgHW2s
oe+ZRhFHxcYoY3znRFZXYkoCXETcExCmo7czLoN4/F92zFDEGbAMbwC/7Zo9AxQg
30Q4iCrLfwAx+M/0A2dRbSTqGReBeBVfEBWopfz7zV3W7kI+s5K2AIFi+1hbmJ6a
mKomv3f4z6Ml+yOqrq4KtrDSxnSf6Vh7EHsws6uyMG7Y6rLpPm1sLDiffPABlAti
/YlVT+3vlg86h7Vlw68CcNSclgyfFW+i1e5a+EV7WB0VmIQXzSkhA86b9aD8qWdL
N4H8sRlSZ3XfIil4u93QDC/NzJl22wRsN7926xR4DgbCesEsc361KYE8fBSx61fa
6EyvlQoI2I4r1aWCSHq7YGfV6guBZekR0BeaIsoNwfZDZrboL0sOrHGxiEfzYdVC
pAxjdG13zuPo+634fUfewBAq695kVYcy3aBt2wOkLyQGLu0CHHsCAwEAAaNQME4w
HQYDVR0OBBYEFAYLLJUqmUdXCNYTQIWX1ICBKGvWMB8GA1UdIwQYMBaAFAYLLJUq
mUdXCNYTQIWX1ICBKGvWMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIB
AGnvTPevCooo3xO8U/lq2YNo3cFxYnkurBwAn5pwzJrtPgogwezBltEp52+n39TY
5hSa//pfKdQz2GrQ9YvX1vB8gWkNLxBe6g2ksn0DsGTApC/te1p4M+yTKhogtE7a
qYmZBSEI46URe0JLYNirzdTu5dri7DzxFc7E/XlQ0riuMyHNqOP0JXKhxKN1dYOu
NEPxekq2Z2phoo1ul8hBXsz4IRwVeQOAtpRnfrKjxogOI1teP/RSikTsSLvFHxqo
UHVzwBexQs9isBlBUcmuKksxoGugqqSkGQRE+dSs5RSeEPLexMgACfFmKfpS+Vn4
ikb2ETQ3i76+JgMoDHKwb4u9xIyKTUToIsx5dUO+o7paPfyqRE6WbO4H+suM4VCd
VhNbG9qv02Fl8vdYAc/A6tVyV8b4fMbSsGEQnBlvKuOXf/uxAIcz11WUQ4gy/0/e
kHbMqGuBFPkg5nww3dBxkrBbtKq/1yrnQUjpBvjYtyUvoKrLSbQSGj586i52r4hF
+bqGPTxmk6hU4JZN+0wvkbVWLZBTRVNKs8Sb6fRWTd2Zd/o7a7QFhbnnAhv8bgyb
4472yLaXTL/siml+LlSrNGeZEsAaCVH4ETp+HzjpAMAyhhFGqCixG0e9BRPGV936
H/8+SUQK5KxnwDz3hqrAVJyimrvNlSaP1eZ5P8WXuvBl
-----END CERTIFICATE-----'''
x509key = '''-----BEGIN PRIVATE KEY-----
MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDZzGDUhw/WANB7
Msmb0wOQQQeo5kQCfWJq2ggdSgiV+7fpKuKmfjKhPEzWmY5PfsIWhqJF+PRuIRuO
uB+LRBeT3PgTtXLRcYXpcHkqh86d6qdS5KCc6cSQvSmBDeK0Kv8UuS+5Pz/Ngov4
9ewyd8cY5GAmYeW7X/itjpn91rJp/h9QWUpFHOIRVSfcQx3+EcUdOFeKZZ3ea3ET
ViekqKacUuYhiNesrbZBRknYxhVxGSUlhpeEgmAdbayh75lGEUfFxihjfOdEVldi
SgJcRNwTEKajtzMug3j8X3bMUMQZsAxvAL/tmj0DFCDfRDiIKst/ADH4z/QDZ1Ft
JOoZF4F4FV8QFail/PvNXdbuQj6zkrYAgWL7WFuYnpqYqia/d/jPoyX7I6qurgq2
sNLGdJ/pWHsQezCzq7Iwbtjqsuk+bWwsOJ988AGUC2L9iVVP7e+WDzqHtWXDrwJw
1JyWDJ8Vb6LV7lr4RXtYHRWYhBfNKSEDzpv1oPypZ0s3gfyxGVJndd8iKXi73dAM
L83MmXbbBGw3v3brFHgOBsJ6wSxzfrUpgTx8FLHrV9roTK+VCgjYjivVpYJIertg
Z9XqC4Fl6RHQF5oiyg3B9kNmtugvSw6scbGIR/Nh1UKkDGN0bXfO4+j7rfh9R97A
ECrr3mRVhzLdoG3bA6QvJAYu7QIcewIDAQABAoICAEKMgXXPAxa3zvwl65ZyZp9Y
T3fbTCKan0zY7CvO6EqzzGExmmmXG+9KVowoBWTi7XkmkETjKgTQlvQH7JOILdAf
b6nOApRepLVMialmL8ru3Uul0jG/+DDlq93kGUZF8QUrBJsM6XjpD831jsNo9+vy
NDLmLOURERIvBXybco6SeIz7i4cMqUL0iyZxV6O/WERyZ8VBAXjpyXZIF/rnEWmo
purOPmBj/9F4Ia5b8EdLkJ8jvf5eO/IiBeLBLEtNkmmq/8JOcvfdjfvZc1kwLTKi
HtjdbIUk5P3wSYNqllDnCxWL3BlEzKm5J8YwuTlaIi3fKGXHXN8BXc8EvYcHOKah
K89HIuexjQyQ0JAWKIIJTZs8jVvTMTjgYnEAB+sLfehBBOKmRdmYij28kIo18blx
tsx1HjdfImDd0QloofRW1Srp6FhcgDK0qfWXze/Vm6IfF40oTVE3fS/RgYzx0SSM
2pc6hTXOnrw1r/UBPyNkJ1D/4UK4m0x91BvTSi6MsThWnhicoaTZl1GP4Qpeo9+4
9Z7t0Yalm0PA55aiHZsm9S8OroasVun2QnDxfUC44PIov7nhqifGVcKA8hIDgSNT
WP8amq9cNjft5xQnP/y70fbioPPiwau2+Q0SXVn/BYxjqZrNp6OfbWSi2IRO1NOD
QZDo2rtnL1RrDdBmtDShAoIBAQD2MTJT6HNacu8+u7DDtbMdKvy7rXVblGbUouh9
cLWX7/zGVcNzB5GSVki3J7J+Kdrs4H45/1JR1YvWtWd93F/xKXmCGkUQCsturtRn
IX93by3zuWNdLv4giP2pk97wNYaaJWZmo87nXKV6BbL//eEl+Ospg5lGrLCsj+Mk
9V8oBBxsxqgVZYVyoevLDAuwUw3Cb52PhnEaLrv30ljGFHpsYb9lFlMs8vRosVWy
i3/T5ASfdnMXKQ1gxN/aPtN6yrFVpXe+S/A5JBzAQfrjiZk4SzNvE7R0eze1YLfO
IulTvlqpk3HVQEpgfq8D3l1x/zqsh0SpCH3VkV5sQQx262iRAoIBAQDieZsWv9eO
QzF5nZmh3NL53LkpqONRBjF5b9phppgxw2jiS8D2eEn2XWExmEaK/JppmzvfxSG4
cPaQHJFjkRGpnJyBlBUnyk4ua5hlXOTb9l5HsLIBlVdcWxwF+zJh8Ylwau+mcVF8
b8n86zke88du+xTvXfMDn6p6EACmBncyZGi424hSw72u8jS0cdmqJl3isLR6duG3
4yipWhEpLU5YuR+796jmjK5h+HQwl/Ra2dykbAw7vN0ofdK0+7LrVnGh7dDecOGK
0fElgFPTazeQQV+dEzqz6UwO0Z9koxqBwPqCLi7sXOeUWwqqb3ewYO7TMM4NlK/o
C8oG4yvWj9pLAoIBACEI9PHhbSkj5wqJ8OwyA3jUfdlJK0hAn5PE0GGUsClVIJwU
ggd7aoMyZMt+3iqjvyat8QIjSo6EkyEacmqnGZCoug9FKyM975JIj2PPUOVb29Sq
ebTVS3BeMXuBxhaBeDBS+GypamgNPH8lKKHFFWMdBaEqcXTUU1i0bgxViJE8C/xk
o8VLPB7nr1YtpZvhaSVACOprZd3Xi41zgkoCEXNdomsUFdEgQL+TnCY7Jcnu/NfQ
8xyWe58Si98jMwl1DVqqu2ijk/Z27Ay4TcweeJrfLGWpRTukFROXiNJ2SMzd7Bh5
Gns9Bz3vgdiJDAzx7JOeCw6LfycbPIpWKDAE4qECggEAZ5kPG7H4Dcio6iPwsj1M
eSXBwc/S5C58FTvYXtERT7o+0T2r8FMIKl1+52vr4Qo6LFLpaaxIh5GNCFE5JJ2o
wbi1UwUFRGVjrBJl7QA4ZHJnoE2wr8673rCCui21V15g637PT4kIqG6OrFaBk6oa
MadDZVfJoX+5QQru8QOGJRQPX3h0/L8zlsKO33gxBId2bQs+E8Mr761G3Wko7nge
HbHZVWet6IC0CHbZ15y7F5APQVt3oR/83tfnughlSQgLBPK/l/F1CsaMlAYG0nB6
Q0/USAsS0FfJBgJX8nY12uMG9OPhbRf2i0O2Nk61JobA2PS7XTUF3pT9/naOiCDX
zwKCAQBK9dPzoc9CfdAJVWjCqR2xS8Jr3/64k59Pya6e7Y4ca+e1g4N7hhQRLPW7
owTKloXrR0mAkwOIiJlk+gXsl2banuSajiPxumSfPYWE1Q/QNFD/WoPvo6rPYJ8N
yA/ORsMjWq51SfpzOU69+FdY7p3GvIVWhRtinqseaAIMOkNZBLVDXF4DvtFgiLZM
bKAjGuXsKOT3MPFU9tHxi4q/7flUb30mSUVXyPjh+C+UH7e0BS0pi/rDeRdEju4z
bJVERP8/VAJ61TDQJq+Il95fzKe4yTA3dDHnO+EG5W2eCsawTK4Ze5XAWqomgdew
62D3AkJQiflLfJL8zTFph1FZXLOm
-----END PRIVATE KEY-----'''
seqRe = re.compile("^{'results': \[\], 'sequence_number': \d+}$")
def d_(s):
return ast.literal_eval(s.replace("'", "\""))
def is_sequence_number(r):
return seqRe.match(r)
def random_string(n):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(n))
def write_random_file(data):
f = tempfile.NamedTemporaryFile('w', delete=False)
f.write(data)
f.close()
return f.name
class Node(object):
def __init__(self, path, node_id,
api_addr=None, api_adv=None,
boostrap_expect=0,
raft_addr=None, raft_adv=None,
raft_voter=True,
raft_snap_threshold=8192, raft_snap_int="1s",
http_cert=None, http_key=None, http_no_verify=False,
node_cert=None, node_key=None, node_no_verify=False,
auth=None, dir=None, on_disk=False):
s_api = None
s_raft = None
if api_addr is None:
s_api, addr = random_addr()
api_addr = addr
if raft_addr is None:
s_raft, addr = random_addr()
raft_addr = addr
# Only now close any sockets used to get random addresses, so there is
# no chance a randomly selected address would get re-used by the HTTP
# system and Raft system.
if s_api is not None:
s_api.close()
if s_raft is not None:
s_raft.close()
if api_adv is None:
api_adv = api_addr
if dir is None:
dir = tempfile.mkdtemp()
self.dir = dir
self.path = path
self.peers_path = os.path.join(self.dir, "raft/peers.json")
self.node_id = node_id
self.api_addr = api_addr
self.api_adv = api_adv
self.boostrap_expect = boostrap_expect
self.raft_addr = raft_addr
self.raft_adv = raft_adv
self.raft_voter = raft_voter
self.raft_snap_threshold = raft_snap_threshold
self.raft_snap_int = raft_snap_int
self.http_cert = http_cert
self.http_key = http_key
self.http_no_verify = http_no_verify
self.node_cert = node_cert
self.node_key = node_key
self.node_no_verify = node_no_verify
self.auth = auth
self.disco_key = random_string(10)
self.on_disk = on_disk
self.process = None
self.stdout_file = os.path.join(dir, 'rqlited.log')
self.stdout_fd = open(self.stdout_file, 'w')
self.stderr_file = os.path.join(dir, 'rqlited.err')
self.stderr_fd = open(self.stderr_file, 'w')
def APIAddr(self):
if self.api_adv is not None:
return self.api_adv
return self.api_addr
def APIProtoAddr(self):
return "http://%s" % self.APIAddr()
def scramble_network(self):
if self.api_adv == self.api_addr:
self.api_adv = None
s, addr = random_addr()
self.api_addr = addr
s.close()
if self.api_adv is None:
self.api_adv = self.api_addr
if self.raft_adv == self.raft_addr:
self.raft_adv = None
s, addr = random_addr()
self.raft_addr = addr
s.close()
if self.raft_adv is None:
self.raft_adv = self.raft_addr
def start(self, join=None, join_as=None, join_attempts=None, join_interval=None,
disco_mode=None, disco_key=None, disco_config=None, wait=True, timeout=TIMEOUT):
if self.process is not None:
return
command = [self.path,
'-node-id', self.node_id,
'-http-addr', self.api_addr,
'-bootstrap-expect', str(self.boostrap_expect),
'-raft-addr', self.raft_addr,
'-raft-snap', str(self.raft_snap_threshold),
'-raft-snap-int', self.raft_snap_int,
'-raft-non-voter=%s' % str(not self.raft_voter).lower()]
if self.api_adv is not None:
command += ['-http-adv-addr', self.api_adv]
if self.raft_adv is not None:
command += ['-raft-adv-addr', self.raft_adv]
if self.http_cert is not None:
command += ['-http-cert', self.http_cert, '-http-key', self.http_key]
if self.http_no_verify:
command += ['-http-no-verify']
if self.node_cert is not None:
command += ['-node-encrypt', '-node-cert', self.node_cert, '-node-key', self.node_key]
if self.node_no_verify:
command += ['-node-no-verify']
if self.on_disk:
command += ['-on-disk']
if self.auth is not None:
command += ['-auth', self.auth]
if join is not None:
if join.startswith('http://') is False:
join = 'http://' + join
command += ['-join', join]
if join_as is not None:
command += ['-join-as', join_as]
if join_attempts is not None:
command += ['-join-attempts', str(join_attempts)]
if join_interval is not None:
command += ['-join-interval', join_interval]
if disco_mode is not None:
dk = disco_key
if dk is None:
dk = self.disco_key
command += ['-disco-mode', disco_mode, '-disco-key', dk]
if disco_config is not None:
command += ['-disco-config', disco_config]
command.append(self.dir)
self.process = subprocess.Popen(command, stdout=self.stdout_fd, stderr=self.stderr_fd)
t = 0
while wait:
if t > timeout:
self.dump_log("dumping log due to timeout during start")
raise Exception('rqlite process failed to start within %d seconds' % timeout)
try:
self.status()
except requests.exceptions.ConnectionError:
time.sleep(1)
t+=1
else:
break
return self
def stop(self):
if self.process is None:
return
self.process.kill()
self.process.wait()
self.process = None
return self
def pid(self):
if self.process is None:
return None
return self.process.pid
def status(self):
r = requests.get(self._status_url())
raise_for_status(r)
return r.json()
def nodes(self):
r = requests.get(self._nodes_url())
raise_for_status(r)
return r.json()
def ready(self, noleader=False):
r = requests.get(self._ready_url(noleader))
return r.status_code == 200
def expvar(self):
r = requests.get(self._expvar_url())
raise_for_status(r)
return r.json()
def is_leader(self):
'''
is_leader returns whether this node is the cluster leader
'''
try:
return self.status()['store']['raft']['state'] == 'Leader'
except requests.exceptions.ConnectionError:
return False
def is_follower(self):
try:
return self.status()['store']['raft']['state'] == 'Follower'
except requests.exceptions.ConnectionError:
return False
def is_voter(self):
try:
return self.status()['store']['raft']['voter'] == True
except requests.exceptions.ConnectionError:
return False
def disco_mode(self):
try:
return self.status()['disco']['mode']
except requests.exceptions.ConnectionError:
return ''
def wait_for_leader(self, timeout=TIMEOUT, log=True):
lr = None
t = 0
while lr == None or lr['addr'] == '':
if t > timeout:
if log:
self.dump_log("dumping log due to timeout waiting for leader")
raise Exception('rqlite node failed to detect leader within %d seconds' % timeout)
try:
lr = self.status()['store']['leader']
except (KeyError, requests.exceptions.ConnectionError):
pass
time.sleep(1)
t+=1
# Perform a check on readyness while we're here.
if self.ready() is not True:
raise Exception('leader is available but node reports not ready')
return lr
def expect_leader_fail(self, timeout=TIMEOUT):
    '''
    Return True if this node fails to detect a leader within the timeout,
    False if a leader was detected.
    '''
    try:
        # Bug fix: this previously called self.wait_for_leader(self, ...),
        # passing self twice. The resulting TypeError was swallowed by a
        # bare except, so this method always returned True regardless of
        # whether a leader existed.
        self.wait_for_leader(timeout=timeout, log=False)
    except Exception:
        return True
    return False
# Convenience accessors over /status and /debug/vars counters.
def db_applied_index(self):
    return int(self.status()['store']['db_applied_index'])
def fsm_index(self):
    return int(self.status()['store']['fsm_index'])
def commit_index(self):
    return int(self.status()['store']['raft']['commit_index'])
def applied_index(self):
    return int(self.status()['store']['raft']['applied_index'])
def last_log_index(self):
    return int(self.status()['store']['raft']['last_log_index'])
def last_snapshot_index(self):
    return int(self.status()['store']['raft']['last_snapshot_index'])
def num_join_requests(self):
    # Count of join requests served, from the expvar HTTP counters.
    return int(self.expvar()['http']['joins'])
def num_snapshots(self):
    # Number of Raft snapshots this node has performed.
    return int(self.expvar()['store']['num_snapshots'])
def num_restores(self):
    # Number of snapshot-restores this node has performed.
    return int(self.expvar()['store']['num_restores'])
# Polling helpers: each checks a condition once a second until it holds or
# the timeout (in seconds) elapses, in which case an Exception is raised.
def wait_for_fsm_index(self, index, timeout=TIMEOUT):
    '''
    Wait until the given index has been applied to the state machine.
    '''
    t = 0
    while self.fsm_index() < index:
        if t > timeout:
            raise Exception('timeout, target index: %d, actual index %d' % (index, self.fsm_index()))
        time.sleep(1)
        t+=1
    return self.fsm_index()
def wait_for_commit_index(self, index, timeout=TIMEOUT):
    '''
    Wait until the commit index reaches the given value
    '''
    t = 0
    while self.commit_index() < index:
        if t > timeout:
            raise Exception('wait_for_commit_index timeout')
        time.sleep(1)
        t+=1
    return self.commit_index()
def wait_for_all_applied(self, timeout=TIMEOUT):
    '''
    Wait until the applied index equals the commit index.
    '''
    t = 0
    while self.commit_index() != self.applied_index():
        if t > timeout:
            raise Exception('wait_for_all_applied timeout')
        time.sleep(1)
        t+=1
    return self.applied_index()
def wait_for_all_fsm(self, timeout=TIMEOUT):
    '''
    Wait until all outstanding database commands have actually
    been applied to the database i.e. state machine.
    '''
    t = 0
    while self.fsm_index() != self.db_applied_index():
        if t > timeout:
            raise Exception('wait_for_all_fsm timeout')
        time.sleep(1)
        t+=1
    return self.fsm_index()
def wait_for_restores(self, num, timeout=TIMEOUT):
    '''
    Wait until the number of snapshot-restores on this node reach
    the given value.
    '''
    t = 0
    while self.num_restores() != num:
        if t > timeout:
            raise Exception('wait_for_restores timeout wanted %d, got %d' % (num, self.num_restores()))
        time.sleep(1)
        t+=1
    return self.num_restores()
def query(self, statement, params=None, level='weak', pretty=False, text=False, associative=False):
    '''
    Execute a read via /db/query at the given read-consistency level.
    params may be a list (positional) or a single object (named params).
    Returns the decoded JSON response, or raw text when text is True.
    '''
    body = [statement]
    if params is not None:
        try:
            body = body + params
        except TypeError:
            # Presumably not a list, so append as an object.
            body.append(params)
    reqParams = {'level': level}
    if pretty:
        reqParams['pretty'] = "yes"
    if associative:
        reqParams['associative'] = "yes"
    r = requests.post(self._query_url(), params=reqParams, data=json.dumps([body]))
    raise_for_status(r)
    if text:
        return r.text
    return r.json()
def execute(self, statement, params=None):
    '''
    Execute a write via /db/execute. params may be a list (positional) or
    a single object (named params). Returns the decoded JSON response.
    '''
    body = [statement]
    if params is not None:
        try:
            body = body + params
        except TypeError:
            # Presumably not a list, so append as an object.
            body.append(params)
    return self.execute_raw(json.dumps([body]))
def execute_raw(self, body):
    # POST a pre-serialized JSON body straight to /db/execute.
    r = requests.post(self._execute_url(), data=body)
    raise_for_status(r)
    return r.json()
def execute_queued(self, statement, wait=False, params=None):
    '''
    Execute a write via the queued-writes API (/db/execute?queue). When
    wait is True the request blocks until the write is committed.
    '''
    body = [statement]
    if params is not None:
        try:
            body = body + params
        except TypeError:
            # Presumably not a list, so append as an object.
            body.append(params)
    r = requests.post(self._execute_queued_url(wait), data=json.dumps([body]))
    raise_for_status(r)
    return r.json()
def backup(self, file):
    '''
    Write a binary backup of the node's database to the given file path.
    '''
    with open(file, 'wb') as fd:
        r = requests.get(self._backup_url())
        raise_for_status(r)
        fd.write(r.content)
def remove_node(self, id):
    # Ask this node (via DELETE /remove) to remove the node with the given
    # ID from the cluster.
    body = {"id": id}
    r = requests.delete(self._remove_url(), data=json.dumps(body))
    raise_for_status(r)
def restore(self, file, fmt=None):
    '''
    Load data into the node via /db/load and return the decoded JSON
    response. When fmt is "binary" the file is streamed as a raw SQLite
    database; otherwise it is opened as a SQLite database and its SQL
    dump is posted.
    '''
    # This is the one API that doesn't expect JSON.
    if fmt != "binary":
        conn = sqlite3.connect(file)
        try:
            r = requests.post(self._load_url(), data='\n'.join(conn.iterdump()))
            raise_for_status(r)
        finally:
            # Close the connection even when the request fails; the
            # original leaked it on error.
            conn.close()
        return r.json()
    else:
        with open(file, 'rb') as f:
            data = f.read()
        r = requests.post(self._load_url(), data=data, headers={'Content-Type': 'application/octet-stream'})
        raise_for_status(r)
        # Bug fix: the binary branch previously returned None, unlike the
        # SQL-dump branch.
        return r.json()
def redirect_addr(self):
    '''
    Probe /db/execute with redirects disabled and return the
    "scheme://host" this node redirects to (i.e. the leader's API
    address). Implicitly returns None when no 301 redirect is issued.
    '''
    r = requests.post(self._execute_url(redirect=True), data=json.dumps(['nonsense']), allow_redirects=False)
    raise_for_status(r)
    if r.status_code == 301:
        return "%s://%s" % (urlparse(r.headers['Location']).scheme, urlparse(r.headers['Location']).netloc)
def set_peers(self, peers):
    '''
    Overwrite this node's Raft peers file with the given peers object,
    serialized as JSON.
    '''
    # Use a context manager so the file is closed even if the write fails.
    with open(self.peers_path, "w") as f:
        f.write(json.dumps(peers))
def dump_log(self, msg):
    '''
    Print msg, then dump this node's captured stderr (the rqlited log),
    one stripped line at a time.
    '''
    print(msg)
    # Close the write side first so buffered log output is flushed to disk.
    self.stderr_fd.close()
    # Use a context manager so the read handle is closed; the original
    # leaked it.
    with open(self.stderr_file, 'r') as f:
        for l in f.readlines():
            print(l.strip())
# URL builders for this node's HTTP API endpoints.
def _status_url(self):
    return 'http://' + self.APIAddr() + '/status'
def _nodes_url(self):
    return 'http://' + self.APIAddr() + '/nodes?nonvoters' # Getting all nodes back makes testing easier
def _ready_url(self, noleader=False):
    # Optionally ask /readyz to skip the leader check.
    nl = ""
    if noleader:
        nl = "?noleader"
    return 'http://' + self.APIAddr() + '/readyz' + nl
def _expvar_url(self):
    return 'http://' + self.APIAddr() + '/debug/vars'
def _query_url(self, redirect=False):
    # When redirect is set, ask the node to redirect to the leader rather
    # than transparently forwarding the request.
    rd = ""
    if redirect:
        rd = "?redirect"
    return 'http://' + self.APIAddr() + '/db/query' + rd
def _execute_url(self, redirect=False):
    rd = ""
    if redirect:
        rd = "?redirect"
    return 'http://' + self.APIAddr() + '/db/execute' + rd
def _execute_queued_url(self, wait=False):
    # Queued-write endpoint; optionally wait for the write to commit.
    u = '/db/execute?queue'
    if wait:
        u = u + '&wait'
    return 'http://' + self.APIAddr() + u
def _backup_url(self):
    return 'http://' + self.APIAddr() + '/db/backup'
def _load_url(self):
    return 'http://' + self.APIAddr() + '/db/load'
def _remove_url(self):
    return 'http://' + self.APIAddr() + '/remove'
def __eq__(self, other):
    # Nodes are equal when they share a node ID.
    # NOTE(review): assumes `other` has a node_id attribute; comparing with
    # a non-Node raises AttributeError — confirm no caller relies on that.
    return self.node_id == other.node_id
def __str__(self):
    # Human-readable summary: id:[api addr]:[raft addr]:[data dir].
    return '%s:[%s]:[%s]:[%s]' % (self.node_id, self.APIAddr(), self.raft_addr, self.dir)
def __del__(self):
    # Release the captured stdout/stderr file handles on garbage collection.
    self.stdout_fd.close()
    self.stderr_fd.close()
def raise_for_status(r):
    '''
    Re-raise any HTTP error carried by the response, printing the error and
    the response body first to aid debugging of failed tests.
    '''
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError as err:
        # Surface the server's explanation before propagating the failure.
        print(err)
        print(r.text)
        raise
def random_addr():
    '''
    Bind a TCP socket to an OS-assigned port on localhost and return the
    (socket, "host:port") pair. The socket is returned still bound so the
    caller controls when the address is released.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 0))
    host, port = sock.getsockname()[0], sock.getsockname()[1]
    return sock, '%s:%d' % (host, port)
def deprovision_node(node):
    '''
    Stop the given node and delete its on-disk state.
    '''
    node.stop()
    if os.path.isdir(node.dir):
        shutil.rmtree(node.dir)
    # Dead code removed: the original rebound the local name `node` to None,
    # which had no effect on the caller's reference.
class Cluster(object):
    '''
    A collection of Node objects operated on as a unit.
    '''
    def __init__(self, nodes):
        self.nodes = nodes
    def wait_for_leader(self, node_exc=None, timeout=TIMEOUT):
        '''
        Poll until some node (other than node_exc, if given) reports itself
        leader, returning that node. Raises after timeout seconds.
        '''
        waited = 0
        while True:
            if waited > timeout:
                raise Exception('timeout')
            for candidate in self.nodes:
                if node_exc is not None and candidate == node_exc:
                    continue
                if candidate.is_leader():
                    return candidate
            time.sleep(1)
            waited += 1
    def start(self):
        # Bring up every node in the cluster.
        for member in self.nodes:
            member.start()
    def stop(self):
        # Shut down every node in the cluster.
        for member in self.nodes:
            member.stop()
    def followers(self):
        # All nodes currently reporting the Raft follower state.
        return [member for member in self.nodes if member.is_follower()]
    def pids(self):
        '''
        Return a sorted list of all rqlited PIDs in the cluster.
        '''
        return sorted([member.pid() for member in self.nodes])
    def deprovision(self):
        # Stop every node and remove its on-disk state.
        for member in self.nodes:
            deprovision_node(member)
class TestSingleNode(unittest.TestCase):
    '''End-to-end tests exercising a single-node rqlite cluster.'''
    def setUp(self):
        # Low snapshot threshold/interval so snapshotting is exercised quickly.
        n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s")
        n0.start()
        n0.wait_for_leader()
        self.cluster = Cluster([n0])

    def tearDown(self):
        self.cluster.deprovision()

    def test_simple_raw_queries(self):
        '''Test simple queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("declan")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar where id=2')
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        # Ensure raw response from API is as expected.
        j = n.query('SELECT * from bar', text=True)
        self.assertEqual(str(j), '{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"declan"]]}]}')
        # Ensure raw associative response from API is as expected.
        j = n.query('SELECT * from bar', text=True, associative=True)
        self.assertEqual(str(j), '{"results":[{"types":{"id":"integer","name":"text"},"rows":[{"id":1,"name":"fiona"},{"id":2,"name":"declan"}]}]}')

    def test_simple_raw_queries_pretty(self):
        '''Test simple queries, requesting pretty output, work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar', pretty=True, text=True)
        # NOTE(review): the expected text below must match rqlite's pretty
        # output byte-for-byte — confirm its internal indentation survived
        # reformatting of this file.
        exp = '''{
"results": [
{
"columns": [
"id",
"name"
],
"types": [
"integer",
"text"
],
"values": [
[
1,
"fiona"
]
]
}
]
}'''
        self.assertEqual(str(j), exp)

    def test_simple_parameterized_queries(self):
        '''Test parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar WHERE age=?', params=[20])
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE age=?', params=[21])
        self.assertEqual(j, d_("{'results': [{'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE name=?', params=['declan'])
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan', None]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_simple_named_parameterized_queries(self):
        '''Test named parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE age=:age', params={"age": 20})
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar WHERE name=:name', params={"name": "declan"})
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan', None]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_simple_parameterized_mixed_queries(self):
        '''Test a mix of parameterized and non-parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        body = json.dumps([
            ["INSERT INTO bar(name, age) VALUES(?,?)", "fiona", 20],
            ['INSERT INTO bar(name, age) VALUES("sinead", 25)']
        ])
        j = n.execute_raw(body)
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_snapshot(self):
        ''' Test that a node performs at least 1 snapshot'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        applied = n.wait_for_all_fsm()
        # Wait for a snapshot to happen.
        timeout = 10
        t = 0
        while True:
            nSnaps = n.num_snapshots()
            if nSnaps > 0:
                return
            if t > timeout:
                raise Exception('timeout', nSnaps)
            time.sleep(1)
            t+=1
class TestSingleNodeOnDisk(TestSingleNode):
    '''Re-run the TestSingleNode suite with an on-disk SQLite database.'''
    def setUp(self):
        n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s", on_disk=True)
        n0.start()
        n0.wait_for_leader()
        self.cluster = Cluster([n0])
class TestSingleNodeReadyz(unittest.TestCase):
    def test(self):
        ''' Test /readyz behaves correctly'''
        # Join against a nonsense address: the node can never find a leader,
        # so it is "ready" only when the leader check is skipped.
        n0 = Node(RQLITED_PATH, '0')
        n0.start(join="http://nonsense")
        self.assertEqual(False, n0.ready())
        self.assertEqual(True, n0.ready(noleader=True))
        self.assertEqual(False, n0.ready(noleader=False))
        deprovision_node(n0)
class TestEndToEnd(unittest.TestCase):
def setUp(self):
@ -975,7 +187,7 @@ class TestEndToEndEncryptedNode(TestEndToEnd):
self.cluster = Cluster([n0, n1, n2])
class TestSingleNodeEncryptedNoVerify(unittest.TestCase):
class TestJoinEncryptedNoVerify(unittest.TestCase):
def test(self):
''' Test that a joining node will not operate if remote cert can't be trusted'''
certFile = write_random_file(x509cert)
@ -1507,57 +719,6 @@ class TestEndToEndBackupRestore(unittest.TestCase):
deprovision_node(self.node4)
os.remove(self.db_file)
class TestEndToEndSnapRestoreSingle(unittest.TestCase):
    '''Check snapshotting and restart-restore behavior on a single node.'''
    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=10, raft_snap_int="1s")
        self.n0.start()
        self.n0.wait_for_leader()

    def waitForSnapIndex(self, n):
        # Poll (up to 10s) until the node's last snapshot index reaches n.
        timeout = 10
        t = 0
        while True:
            if t > timeout:
                raise Exception('timeout')
            if self.n0.last_snapshot_index() >= n:
                break
            time.sleep(1)
            t+=1

    def test_snap_and_restart(self):
        '''Check that an node restarts correctly after multiple snapshots'''
        # Let's get multiple snapshots done.
        self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        for i in range(0,200):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnapIndex(175)
        # Ensure node has the full correct state.
        j = self.n0.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[200]], 'types': [''], 'columns': ['count(*)']}]}"))
        # Restart node, and make sure it comes back with the correct state
        self.n0.stop()
        self.n0.start()
        self.n0.wait_for_leader()
        self.n0.wait_for_all_applied()
        self.assertEqual(self.n0.expvar()['store']['num_restores'], 1)
        j = self.n0.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[200]], 'types': [''], 'columns': ['count(*)']}]}"))

    def tearDown(self):
        deprovision_node(self.n0)
class TestEndToEndSnapRestoreSingleOnDisk(TestEndToEndSnapRestoreSingle):
    '''Re-run the snapshot/restore checks with an on-disk SQLite database.'''
    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=10, raft_snap_int="1s", on_disk=True)
        self.n0.start()
        self.n0.wait_for_leader()
class TestEndToEndSnapRestoreCluster(unittest.TestCase):
def waitForSnap(self, n):
timeout = 10

@ -0,0 +1,230 @@
#!/usr/bin/env python
import json
import os
import time
import unittest
from helpers import Node, Cluster, d_, deprovision_node, TIMEOUT
RQLITED_PATH = os.environ['RQLITED_PATH']
class TestSingleNode(unittest.TestCase):
    '''End-to-end tests exercising a single-node rqlite cluster.'''
    def setUp(self):
        # Low snapshot threshold/interval so snapshotting is exercised quickly.
        n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s")
        n0.start()
        n0.wait_for_leader()
        self.cluster = Cluster([n0])

    def tearDown(self):
        self.cluster.deprovision()

    def test_simple_raw_queries(self):
        '''Test simple queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("declan")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar where id=2')
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
        # Ensure raw response from API is as expected.
        j = n.query('SELECT * from bar', text=True)
        self.assertEqual(str(j), '{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"declan"]]}]}')
        # Ensure raw associative response from API is as expected.
        j = n.query('SELECT * from bar', text=True, associative=True)
        self.assertEqual(str(j), '{"results":[{"types":{"id":"integer","name":"text"},"rows":[{"id":1,"name":"fiona"},{"id":2,"name":"declan"}]}]}')

    def test_simple_raw_queries_pretty(self):
        '''Test simple queries, requesting pretty output, work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar', pretty=True, text=True)
        # NOTE(review): the expected text below must match rqlite's pretty
        # output byte-for-byte — confirm its internal indentation survived
        # reformatting of this file.
        exp = '''{
"results": [
{
"columns": [
"id",
"name"
],
"types": [
"integer",
"text"
],
"values": [
[
1,
"fiona"
]
]
}
]
}'''
        self.assertEqual(str(j), exp)

    def test_simple_parameterized_queries(self):
        '''Test parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar WHERE age=?', params=[20])
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE age=?', params=[21])
        self.assertEqual(j, d_("{'results': [{'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE name=?', params=['declan'])
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan', None]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_simple_named_parameterized_queries(self):
        '''Test named parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.query('SELECT * from bar WHERE age=:age', params={"age": 20})
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))
        j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["declan", None])
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar WHERE name=:name', params={"name": "declan"})
        self.assertEqual(j, d_("{'results': [{'values': [[2, 'declan', None]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_simple_parameterized_mixed_queries(self):
        '''Test a mix of parameterized and non-parameterized queries work as expected'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        body = json.dumps([
            ["INSERT INTO bar(name, age) VALUES(?,?)", "fiona", 20],
            ['INSERT INTO bar(name, age) VALUES("sinead", 25)']
        ])
        j = n.execute_raw(body)
        applied = n.wait_for_all_fsm()
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}, {'last_insert_id': 2, 'rows_affected': 1}]}"))
        j = n.query('SELECT * from bar')
        self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona', 20], [2, 'sinead', 25]], 'types': ['integer', 'text', 'integer'], 'columns': ['id', 'name', 'age']}]}"))

    def test_snapshot(self):
        ''' Test that a node performs at least 1 snapshot'''
        n = self.cluster.wait_for_leader()
        j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        self.assertEqual(j, d_("{'results': [{}]}"))
        j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
        applied = n.wait_for_all_fsm()
        # Wait for a snapshot to happen.
        timeout = 10
        t = 0
        while True:
            nSnaps = n.num_snapshots()
            if nSnaps > 0:
                return
            if t > timeout:
                raise Exception('timeout', nSnaps)
            time.sleep(1)
            t+=1
class TestSingleNodeOnDisk(TestSingleNode):
    '''Re-run the TestSingleNode suite with an on-disk SQLite database.'''
    def setUp(self):
        n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=2, raft_snap_int="1s", on_disk=True)
        n0.start()
        n0.wait_for_leader()
        self.cluster = Cluster([n0])
class TestSingleNodeReadyz(unittest.TestCase):
    def test(self):
        ''' Test /readyz behaves correctly'''
        # Join against a nonsense address: the node can never find a leader,
        # so it is "ready" only when the leader check is skipped.
        n0 = Node(RQLITED_PATH, '0')
        n0.start(join="http://nonsense")
        self.assertEqual(False, n0.ready())
        self.assertEqual(True, n0.ready(noleader=True))
        self.assertEqual(False, n0.ready(noleader=False))
        deprovision_node(n0)
class TestEndToEndSnapRestoreSingle(unittest.TestCase):
    '''Check snapshotting and restart-restore behavior on a single node.'''
    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=10, raft_snap_int="1s")
        self.n0.start()
        self.n0.wait_for_leader()

    def waitForSnapIndex(self, n):
        # Poll (up to 10s) until the node's last snapshot index reaches n.
        timeout = 10
        t = 0
        while True:
            if t > timeout:
                raise Exception('timeout')
            if self.n0.last_snapshot_index() >= n:
                break
            time.sleep(1)
            t+=1

    def test_snap_and_restart(self):
        '''Check that an node restarts correctly after multiple snapshots'''
        # Let's get multiple snapshots done.
        self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        for i in range(0,200):
            self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
        self.n0.wait_for_all_fsm()
        self.waitForSnapIndex(175)
        # Ensure node has the full correct state.
        j = self.n0.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[200]], 'types': [''], 'columns': ['count(*)']}]}"))
        # Restart node, and make sure it comes back with the correct state
        self.n0.stop()
        self.n0.start()
        self.n0.wait_for_leader()
        self.n0.wait_for_all_applied()
        self.assertEqual(self.n0.expvar()['store']['num_restores'], 1)
        j = self.n0.query('SELECT count(*) FROM foo', level='none')
        self.assertEqual(j, d_("{'results': [{'values': [[200]], 'types': [''], 'columns': ['count(*)']}]}"))

    def tearDown(self):
        deprovision_node(self.n0)
class TestEndToEndSnapRestoreSingleOnDisk(TestEndToEndSnapRestoreSingle):
    '''Re-run the snapshot/restore checks with an on-disk SQLite database.'''
    def setUp(self):
        self.n0 = Node(RQLITED_PATH, '0', raft_snap_threshold=10, raft_snap_int="1s", on_disk=True)
        self.n0.start()
        self.n0.wait_for_leader()
# Run the suite directly with verbose per-test output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
Loading…
Cancel
Save