
Download S3 object and check

master
Philip O'Toole 1 year ago
parent 2c9dcec40b
commit e7171b914e

@@ -6,10 +6,12 @@
#
# python system_test/full_system_test.py Class.test
import boto3
import os
import json
import unittest
import random
import sqlite3
import string
import time
@@ -20,6 +22,37 @@ RQLITED_PATH = os.environ['RQLITED_PATH']
def random_string(N):
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=N))
def delete_s3_object(bucket_name, object_key):
    """
    Delete an object from an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        object_key (str): The key of the object to delete.
    """
    # Create a boto3 client for S3
    s3_client = boto3.client('s3')

    # Delete the object from the S3 bucket
    s3_client.delete_object(Bucket=bucket_name, Key=object_key)

def download_s3_object(bucket_name, object_key):
    """
    Download an object from an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        object_key (str): The key of the object to download.
    """
    # Create a boto3 client for S3
    s3_client = boto3.client('s3')

    # Download the object from the S3 bucket
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)

    # Return the object contents
    return response['Body'].read()

class TestAutoBackupS3(unittest.TestCase):
    '''Test that automatic backups to AWS S3 work'''
    def test(self):
@@ -29,6 +62,7 @@ class TestAutoBackupS3(unittest.TestCase):
        except KeyError:
            return

        # Create the auto-backup config file
        path = random_string(32)
        auto_backup_cfg = {
            "version": 1,
@@ -42,24 +76,39 @@ class TestAutoBackupS3(unittest.TestCase):
                "path": path
            }
        }
        cfg = write_random_file(json.dumps(auto_backup_cfg))
        # Create a node, enable automatic backups, and start it. Then
        # create a table and insert a row.
        node = Node(RQLITED_PATH, '0', auto_backup=cfg)
        node.start()
        node.wait_for_leader()
        node.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
        node.execute('INSERT INTO foo(name) VALUES("fiona")')
        node.wait_for_all_fsm()
        time.sleep(10)
        deprovision_node(node)
        os.remove(cfg)

        # Download the backup file from S3 and check it.
        os.environ['AWS_ACCESS_KEY_ID'] = os.environ['RQLITE_S3_ACCESS_KEY']
        os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ['RQLITE_S3_SECRET_ACCESS_KEY']
        backupData = download_s3_object('rqlite-testing-circleci', path)
        backupFile = write_random_file(backupData)
        conn = sqlite3.connect(backupFile)
        c = conn.cursor()
        c.execute('SELECT * FROM foo')
        rows = c.fetchall()
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0][1], 'fiona')
        conn.close()

        # Remove the backup file and S3 object
        os.remove(backupFile)
        delete_s3_object('rqlite-testing-circleci', path)

if __name__ == "__main__":
    unittest.main(verbosity=2)
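
A note on timing: the test sleeps a fixed 10 seconds after the insert so the node's automatic backup has a chance to reach S3 before the check. If that window ever proves flaky in CI, polling S3 for the object is one alternative. The helper below is only a sketch and is not part of this commit; it assumes AWS credentials are already present in the environment (as the test arranges before downloading) and uses the same bucket/key naming as above.

import time
import boto3
from botocore.exceptions import ClientError

def wait_for_s3_object(bucket_name, object_key, timeout=30, interval=1.0):
    """Poll S3 until the object exists, raising TimeoutError otherwise.

    Sketch only, not part of this change. Assumes AWS credentials are
    already set in the environment, as the test does before its check.
    """
    s3_client = boto3.client('s3')
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A HEAD request is enough to confirm the auto-backup has landed.
            s3_client.head_object(Bucket=bucket_name, Key=object_key)
            return
        except ClientError as e:
            # A 404 just means the upload hasn't happened yet; keep polling.
            if e.response['Error']['Code'] not in ('404', 'NoSuchKey'):
                raise
            time.sleep(interval)
    raise TimeoutError('timed out waiting for s3://%s/%s' % (bucket_name, object_key))

Used in the test, wait_for_s3_object('rqlite-testing-circleci', path) would stand in for the time.sleep(10) call, though the AWS_* environment variables would then need to be exported before the node is deprovisioned rather than after.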