1
0
Fork 0

Try an actual upload test

master
Philip O'Toole 1 year ago
parent ca2ac4b1ba
commit 35b3a157dc

@ -8,24 +8,53 @@
import os
import unittest
import random
import string
import time
from helpers import Node, deprovision_node
from helpers import Node, deprovision_node, write_random_file
RQLITED_PATH = os.environ['RQLITED_PATH']
def random_string(N):
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=N))
class TestAutoBackupS3(unittest.TestCase):
'''Test that automatic backups to AWS S3 work'''
def test(self):
if os.environ['RQLITE_S3_ACCESS_KEY'] is not None and os.environ['RQLITE_S3_SECRET_ACCESS_KEY'] is not None:
print("Running test because environment variables are set")
else:
print("Skipping test because environment variables are not set")
try:
if os.environ['RQLITE_S3_ACCESS_KEY'] == "":
return
except KeyError:
return
n0 = Node(RQLITED_PATH, '0')
n0.start()
n0.wait_until_leader()
deprovision_node(n0)
path = random_string(32)
auto_backup_cfg = {
"version": 1,
"type": "s3",
"interval": "5s",
"sub" : {
"access_key_id": os.environ['RQLITE_S3_ACCESS_KEY'],
"secret_access_key": os.environ['RQLITE_S3_SECRET_KEY'],
"region": "us-west-2",
"bucket": "rqlite-testing-circleci",
"path": "%s/db.sqlite3" % path
}
}
cfg = write_random_file(str(auto_backup_cfg))
node = Node(RQLITED_PATH, '0', auto_backup=cfg)
node.start()
node.wait_for_leader()
node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
node.execute('INSERT INTO foo(name) VALUES("fiona")')
node.wait_for_all_fsm()
time.Sleep(10)
deprovision_node(node)
os.remove(cfg)
if __name__ == "__main__":

Loading…
Cancel
Save