
Remove ability to trigger chunked-loading

The low-level Raft system still recognizes the Chunk command, so this
version can interoperate with older versions, whose Raft logs might
still contain Chunk commands.
branch: master
Philip O'Toole, 9 months ago
parent a0ee2e58fb
commit e6b703a0c0
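
To make the interop point above concrete, here is a minimal, hypothetical Go sketch of the pattern this commit relies on: the log-apply path keeps decoding a legacy command type that may still appear in Raft logs written by older versions, while no remaining code path appends new entries of that type. All identifiers below (CommandType, CmdLoadChunk, apply) are invented for illustration and are not rqlite's actual names.

// compat_sketch.go — hypothetical illustration, not rqlite code.
package main

import "fmt"

type CommandType int

const (
	CmdNoop CommandType = iota
	CmdLoad
	CmdLoadChunk // legacy: only ever read back from logs written by older versions
)

type Command struct {
	Type CommandType
	Data []byte
}

// apply replays one log entry. Dropping the CmdLoadChunk case would make an
// upgraded node fail while replaying its own pre-upgrade history, so the case
// stays even though nothing appends such entries anymore.
func apply(c Command) error {
	switch c.Type {
	case CmdNoop:
		return nil
	case CmdLoad:
		fmt.Printf("loading %d bytes\n", len(c.Data))
		return nil
	case CmdLoadChunk:
		fmt.Printf("replaying legacy chunk of %d bytes\n", len(c.Data))
		return nil
	default:
		return fmt.Errorf("unknown command type %d", c.Type)
	}
}

func main() {
	// A log written by an older version can still contain chunk entries.
	if err := apply(Command{Type: CmdLoadChunk, Data: []byte("old entry")}); err != nil {
		fmt.Println(err)
	}
}

The same split is visible in the diff below: the Store keeps its dechunkManager so applied LOAD_CHUNK entries can still be processed, while the cluster service now answers "unsupported" for new LoadChunk requests and the HTTP layer drops the chunk_kb parameter.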

@@ -233,32 +233,6 @@ func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *Credentia
return nil
}
// LoadChunk loads a chunk of a SQLite file into the database.
func (c *Client) LoadChunk(lcr *command.LoadChunkRequest, nodeAddr string, creds *Credentials, timeout time.Duration) error {
command := &Command{
Type: Command_COMMAND_TYPE_LOAD_CHUNK,
Request: &Command_LoadChunkRequest{
LoadChunkRequest: lcr,
},
Credentials: creds,
}
p, err := c.retry(command, nodeAddr, timeout)
if err != nil {
return err
}
a := &CommandLoadChunkResponse{}
err = proto.Unmarshal(p, a)
if err != nil {
return err
}
if a.Error != "" {
return errors.New(a.Error)
}
return nil
}
// RemoveNode removes a node from the cluster
func (c *Client) RemoveNode(rn *command.RemoveNodeRequest, nodeAddr string, creds *Credentials, timeout time.Duration) error {
conn, err := c.dial(nodeAddr, c.timeout)

@@ -30,7 +30,6 @@ const (
numRequestRequest = "num_request_req"
numBackupRequest = "num_backup_req"
numLoadRequest = "num_load_req"
numLoadChunkRequest = "num_load_chunk_req"
numRemoveNodeRequest = "num_remove_node_req"
numNotifyRequest = "num_notify_req"
numJoinRequest = "num_join_req"
@@ -57,7 +56,6 @@ func init() {
stats.Add(numRequestRequest, 0)
stats.Add(numBackupRequest, 0)
stats.Add(numLoadRequest, 0)
stats.Add(numLoadChunkRequest, 0)
stats.Add(numRemoveNodeRequest, 0)
stats.Add(numGetNodeAPIRequestLocal, 0)
stats.Add(numNotifyRequest, 0)
@@ -89,9 +87,6 @@ type Database interface {
// Loads an entire SQLite file into the database
Load(lr *command.LoadRequest) error
// LoadChunk loads a chunk of a SQLite file into the database
LoadChunk(lcr *command.LoadChunkRequest) error
}
// Manager is the interface node-management systems must implement
@@ -401,18 +396,8 @@ func (s *Service) handleConn(conn net.Conn) {
marshalAndWrite(conn, resp)
case Command_COMMAND_TYPE_LOAD_CHUNK:
stats.Add(numLoadChunkRequest, 1)
resp := &CommandLoadChunkResponse{}
lcr := c.GetLoadChunkRequest()
if lcr == nil {
resp.Error = "LoadChunkRequest is nil"
} else if !s.checkCommandPerm(c, auth.PermLoad) {
resp.Error = "unauthorized"
} else {
if err := s.db.LoadChunk(lcr); err != nil {
resp.Error = fmt.Sprintf("remote node failed to load chunk: %s", err.Error())
}
resp := &CommandLoadChunkResponse{
Error: "unsupported",
}
marshalAndWrite(conn, resp)

@@ -356,53 +356,6 @@ func Test_ServiceLoad(t *testing.T) {
}
}
func Test_ServiceLoadChunk(t *testing.T) {
ln, mux := mustNewMux()
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
mgr := mustNewMockManager()
cred := mustNewMockCredentialStore()
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
c := NewClient(mustNewDialer(1, false, false), 30*time.Second)
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service: %s", err.Error())
}
// Ready for Load tests now.
called := false
testData := []byte("this is SQLite data")
db.loadChunkFn = func(lc *command.LoadChunkRequest) error {
called = true
if !bytes.Equal(lc.Data, testData) {
t.Fatalf("load data is not as expected, exp: %s, got: %s", testData, lc.Data)
}
return nil
}
err := c.LoadChunk(loadChunkRequest(testData), s.Addr(), NO_CREDS, longWait)
if err != nil {
t.Fatalf("failed to load database: %s", err.Error())
}
if !called {
t.Fatal("load not called on database")
}
// Clean up resources.
if err := ln.Close(); err != nil {
t.Fatalf("failed to close Mux's listener: %s", err)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
func Test_ServiceRemoveNode(t *testing.T) {
ln, mux := mustNewMux()
go mux.Serve()
@@ -658,12 +611,6 @@ func loadRequest(b []byte) *command.LoadRequest {
}
}
func loadChunkRequest(b []byte) *command.LoadChunkRequest {
return &command.LoadChunkRequest{
Data: b,
}
}
func removeNodeRequest(id string) *command.RemoveNodeRequest {
return &command.RemoveNodeRequest{
Id: id,

@@ -432,12 +432,11 @@ func mustNewMockTLSTransport() *mockTransport {
}
type mockDatabase struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(rr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error
loadChunkFn func(lcr *command.LoadChunkRequest) error
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(rr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error
}
func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
@@ -469,13 +468,6 @@ func (m *mockDatabase) Load(lr *command.LoadRequest) error {
return m.loadFn(lr)
}
func (m *mockDatabase) LoadChunk(lcr *command.LoadChunkRequest) error {
if m.loadChunkFn == nil {
return nil
}
return m.loadChunkFn(lcr)
}
func mustNewMockDatabase() *mockDatabase {
e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
return []*command.ExecuteResult{}, nil

@@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -35,13 +34,6 @@ func NewQueryParams(r *http.Request) (QueryParams, error) {
}
}
}
sz, ok := qp["chunk_kb"]
if ok {
_, err := strconv.Atoi(sz)
if err != nil {
return nil, fmt.Errorf("chunk_kb is not an integer")
}
}
q, ok := qp["q"]
if ok {
if q == "" {
@@ -81,16 +73,6 @@ func (qp QueryParams) Wait() bool {
return qp.HasKey("wait")
}
// ChunkKB returns the requested chunk size.
func (qp QueryParams) ChunkKB(defSz int) int {
s, ok := qp["chunk_kb"]
if !ok {
return defSz
}
sz, _ := strconv.Atoi(s)
return sz * 1024
}
// Associative returns true if the query parameters request associative results.
func (qp QueryParams) Associative() bool {
return qp.HasKey("associative")

@@ -16,9 +16,8 @@ func Test_NewQueryParams(t *testing.T) {
expectError bool
}{
{"Empty Query", "", QueryParams{}, false},
{"Valid Query", "timeout=10s&chunk_kb=1024&q=test", QueryParams{"timeout": "10s", "chunk_kb": "1024", "q": "test"}, false},
{"Valid Query", "timeout=10s&q=test", QueryParams{"timeout": "10s", "q": "test"}, false},
{"Invalid Timeout", "timeout=invalid", nil, true},
{"Invalid ChunkKB", "chunk_kb=invalid", nil, true},
{"Empty Q", "q=", nil, true},
{"Invalid Q", "q", nil, true},
{"Valid Q, no case changes", "q=SELeCT", QueryParams{"q": "SELeCT"}, false},

@@ -29,16 +29,9 @@ import (
"github.com/rqlite/rqlite/store"
)
const (
defaultChunkSize = 64 * 1024 * 1024 // 64 MB
)
var (
// ErrLeaderNotFound is returned when a node cannot locate a leader
ErrLeaderNotFound = errors.New("leader not found")
// ErrRemoteLoadNotAuthorized is returned when a remote node is not authorized to load a chunk
ErrRemoteLoadNotAuthorized = errors.New("remote load not authorized")
)
type ResultsError interface {
@@ -64,9 +57,6 @@ type Database interface {
// an Execute or Query request.
Request(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
// LoadChunk loads a SQLite database into the node, chunk by chunk.
LoadChunk(lc *command.LoadChunkRequest) error
// Load loads a SQLite file into the system
Load(lr *command.LoadRequest) error
}
@@ -116,9 +106,6 @@ type Cluster interface {
// Backup retrieves a backup from a remote node and writes to the io.Writer.
Backup(br *command.BackupRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, w io.Writer) error
// LoadChunk loads a SQLite database into the node, chunk by chunk.
LoadChunk(lc *command.LoadChunkRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
// Load loads a SQLite database into the node.
Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error

@@ -1236,11 +1236,6 @@ func Test_timeoutVersionPrettyQueryParam(t *testing.T) {
u: "http://localhost:4001/nodes?timeout=zdfjkh",
parseErr: true,
},
{
u: "http://localhost:4001/db/load?chunk_kb=aaaa",
dur: defStr,
parseErr: true,
},
{
u: "http://localhost:4001/db/query?q=",
dur: defStr,
@@ -1275,14 +1270,13 @@ func Test_timeoutVersionPrettyQueryParam(t *testing.T) {
}
type MockStore struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error
loadChunkFn func(lr *command.LoadChunkRequest) error
leaderAddr string
notReady bool // Default value is true, easier to test.
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error
leaderAddr string
notReady bool // Default value is true, easier to test.
}
func (m *MockStore) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
@@ -1341,13 +1335,6 @@ func (m *MockStore) Backup(br *command.BackupRequest, w io.Writer) error {
return m.backupFn(br, w)
}
func (m *MockStore) LoadChunk(lc *command.LoadChunkRequest) error {
if m.loadChunkFn != nil {
return m.loadChunkFn(lc)
}
return nil
}
func (m *MockStore) Load(lr *command.LoadRequest) error {
if m.loadFn != nil {
return m.loadFn(lr)
@@ -1362,7 +1349,6 @@ type mockClusterService struct {
requestFn func(eqr *command.ExecuteQueryRequest, nodeAddr string, timeout time.Duration) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error
loadFn func(lr *command.LoadRequest, addr string, t time.Duration) error
loadChunkFn func(lc *command.LoadChunkRequest, addr string, t time.Duration) error
removeNodeFn func(rn *command.RemoveNodeRequest, nodeAddr string, t time.Duration) error
}
@@ -1398,13 +1384,6 @@ func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, cred
return nil
}
func (m *mockClusterService) LoadChunk(lc *command.LoadChunkRequest, addr string, creds *cluster.Credentials, t time.Duration) error {
if m.loadChunkFn != nil {
return m.loadChunkFn(lc, addr, t)
}
return nil
}
func (m *mockClusterService) Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error {
if m.loadFn != nil {
return m.loadFn(lr, nodeAddr, timeout)

@@ -88,8 +88,6 @@ const (
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
defaultChunkSize = 64 * 1024 * 1024 // 64 MB
)
const (
@@ -195,9 +193,8 @@ type Store struct {
peersPath string
peersInfoPath string
restoreChunkSize int64
restorePath string
restoreDoneCh chan struct{}
restorePath string
restoreDoneCh chan struct{}
raft *raft.Raft // The consensus mechanism.
ln Listener
@@ -213,9 +210,7 @@ type Store struct {
dbAppliedIndex uint64
appliedIdxUpdateDone chan struct{}
dechunkManager *chunking.DechunkerManager
loadsInProgressMu sync.Mutex
loadsInProgress map[string]struct{}
dechunkManager *chunking.DechunkerManager
// Channels that must be closed for the Store to be considered ready.
readyChans []<-chan struct{}
@@ -314,21 +309,19 @@ func New(ln Listener, c *Config) *Store {
}
return &Store{
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
restoreChunkSize: defaultChunkSize,
restoreDoneCh: make(chan struct{}),
loadsInProgress: make(map[string]struct{}),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
restoreDoneCh: make(chan struct{}),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
}
}
@@ -357,12 +350,6 @@ func (s *Store) SetRestorePath(path string) error {
return nil
}
// SetRestoreChunkSize sets the chunk size to use when restoring a database.
// If not set, the default chunk size is used.
func (s *Store) SetRestoreChunkSize(size int64) {
s.restoreChunkSize = size
}
// Open opens the Store.
func (s *Store) Open() (retErr error) {
defer func() {
@@ -981,7 +968,6 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"trailing_logs": s.numTrailingLogs,
"request_marshaler": s.reqMarshaller.Stats(),
"nodes": nodes,
"loads_in_progress": s.NumLoadsInProgress(),
"dir": s.raftDir,
"dir_size": dirSz,
"sqlite3": dbStatus,
@@ -1211,100 +1197,6 @@ func (s *Store) Backup(br *command.BackupRequest, dst io.Writer) (retErr error)
return ErrInvalidBackupFormat
}
// LoadFromReader reads data from r chunk-by-chunk, and loads it into the
// database.
func (s *Store) LoadFromReader(r io.Reader, chunkSize int64) error {
if !s.open {
return ErrNotOpen
}
if !s.Ready() {
return ErrNotReady
}
return s.loadFromReader(r, chunkSize)
}
// loadFromReader reads data from r chunk-by-chunk, and loads it into the
// database. It is for internal use only. It does not check for readiness.
func (s *Store) loadFromReader(r io.Reader, chunkSize int64) error {
chunker := chunking.NewChunker(r, chunkSize)
for {
chunk, err := chunker.Next()
if err != nil {
return err
}
if err := s.loadChunk(chunk); err != nil {
return err
}
if chunk.IsLast {
break
}
}
return nil
}
// LoadChunk loads a chunk of data into the database, sending the request
// through the Raft log.
func (s *Store) LoadChunk(lcr *command.LoadChunkRequest) error {
if !s.open {
return ErrNotOpen
}
if !s.Ready() {
return ErrNotReady
}
return s.loadChunk(lcr)
}
// loadChunk loads a chunk of data into the database, and is for internal use
// only. It does not check for readiness.
func (s *Store) loadChunk(lcr *command.LoadChunkRequest) error {
b, err := command.MarshalLoadChunkRequest(lcr)
if err != nil {
return err
}
c := &command.Command{
Type: command.Command_COMMAND_TYPE_LOAD_CHUNK,
SubCommand: b,
}
b, err = command.Marshal(c)
if err != nil {
return err
}
af := s.raft.Apply(b, s.ApplyTimeout)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
return af.Error()
}
// Send one last command through the log to deal with issues in
// underlying Raft code when truncating log to zero trailing.
if lcr.Abort || lcr.IsLast {
af, err = s.Noop("load-chunk-trailing")
if err != nil {
return err
}
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
return af.Error()
}
}
s.dbAppliedIndexMu.Lock()
s.dbAppliedIndex = af.Index()
s.dbAppliedIndexMu.Unlock()
return nil
}
// Loads an entire SQLite file into the database, sending the request
// through the Raft log.
func (s *Store) Load(lr *command.LoadRequest) error {
@@ -1671,36 +1563,9 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
s.logger.Printf("first log applied since node start, log at index %d", l.Index)
}
snapshotNeeded := false
cmd, r := applyCommand(s.logger, l.Data, &s.db, s.dechunkManager)
switch cmd.Type {
case command.Command_COMMAND_TYPE_NOOP:
if cmd.Type == command.Command_COMMAND_TYPE_NOOP {
s.numNoops++
case command.Command_COMMAND_TYPE_LOAD:
snapshotNeeded = true
case command.Command_COMMAND_TYPE_LOAD_CHUNK:
var lcr command.LoadChunkRequest
if err := command.UnmarshalLoadChunkRequest(cmd.SubCommand, &lcr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error()))
}
snapshotNeeded = lcr.IsLast
func() {
s.loadsInProgressMu.Lock()
defer s.loadsInProgressMu.Unlock()
if lcr.IsLast || lcr.Abort {
delete(s.loadsInProgress, lcr.StreamId)
} else {
s.loadsInProgress[lcr.StreamId] = struct{}{}
}
}()
}
if snapshotNeeded {
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return &fsmGenericResponse{error: fmt.Errorf("failed to SetFullNeeded post load: %s", err)}
}
s.logger.Printf("last chunk loaded, forcing snapshot of database")
s.snapshotTChan <- struct{}{}
}
return r
}
@@ -1733,34 +1598,6 @@ func (s *Store) Database(leader bool) ([]byte, error) {
func (s *Store) fsmSnapshot() (raft.FSMSnapshot, error) {
startT := time.Now()
if s.NumLoadsInProgress() > 0 {
// There are loads in progress. Now, the next thing that has to be true
// is that the last log entry must be a load-chunk command, and it must
// indicate an ongoing load. If that is not the case, then some other
// request has been sent to this system, and all load operations are invalid.
lastIdx, err := s.boltStore.LastIndex()
if err != nil {
return nil, err
}
lastLog := &raft.Log{}
if err := s.boltStore.GetLog(lastIdx, lastLog); err != nil {
return nil, err
}
var c command.Command
if err := command.Unmarshal(lastLog.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal last log entry: %s", err.Error()))
}
if c.Type == command.Command_COMMAND_TYPE_LOAD_CHUNK {
var lcr command.LoadChunkRequest
if err := command.UnmarshalLoadChunkRequest(c.SubCommand, &lcr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error()))
}
if !lcr.IsLast && !lcr.Abort {
return nil, ErrLoadInProgress
}
}
}
fullNeeded, err := s.snapshotStore.FullNeeded()
if err != nil {
return nil, err
@@ -2075,13 +1912,6 @@ func (s *Store) logSize() (int64, error) {
return fi.Size(), nil
}
// NumLoadsInProgress returns the number of loads currently in progress.
func (s *Store) NumLoadsInProgress() int {
s.loadsInProgressMu.Lock()
defer s.loadsInProgressMu.Unlock()
return len(s.loadsInProgress)
}
// tryCompress attempts to compress the given command. If the command is
// successfully compressed, the compressed byte slice is returned, along with
// a boolean true. If the command cannot be compressed, the uncompressed byte

@@ -2,13 +2,11 @@ package store
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command/chunking"
)
func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
@@ -280,7 +278,7 @@ func Test_StoreSnapshotStressSingleNode(t *testing.T) {
test_SnapshotStress(t, s)
}
func Test_StoreLoadChunk_Restart(t *testing.T) {
func Test_StoreLoad_Restart(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
@@ -295,36 +293,10 @@ func Test_StoreLoadChunk_Restart(t *testing.T) {
t.Fatalf("Error waiting for leader: %s", err)
}
// Open the SQLite file for loading.
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
err := s.Load(loadRequestFromFile(filepath.Join("testdata", "load.sqlite")))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
t.Fatalf("failed to load: %s", err.Error())
}
defer f.Close()
numSnapshots := s.numSnapshots
chunker := chunking.NewChunker(f, 2048)
for {
chunk, err := chunker.Next()
if err != nil {
t.Fatalf("failed to read next chunk: %s", err.Error())
}
err = s.LoadChunk(chunk)
if err != nil {
t.Fatalf("failed to load chunk: %s", err.Error())
}
if chunk.IsLast {
break
}
}
// Chunked loading should trigger a snapshot, so check that the snapshot
// count has increased by one.
testPoll(t, func() bool {
s.numSnapshotsMu.Lock()
defer s.numSnapshotsMu.Unlock()
return s.numSnapshots == numSnapshots+1
}, 100*time.Millisecond, 3*time.Second)
// Check store can be re-opened.
if err := s.Close(true); err != nil {

@@ -13,7 +13,6 @@ import (
"time"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command/chunking"
"github.com/rqlite/rqlite/command/encoding"
"github.com/rqlite/rqlite/db"
"github.com/rqlite/rqlite/random"
@@ -1249,436 +1248,6 @@ COMMIT;
}
}
// Test_SingleNodeLoadChunkBinary tests that a Store correctly loads data in SQLite
// binary format from a file using chunked loading.
func Test_SingleNodeLoadChunkBinary(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Load a dataset, to check it's erased by the SQLite file load.
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE bar (id integer not null primary key, name text);
INSERT INTO "bar" VALUES(1,'declan');
COMMIT;
`
_, err := s.Execute(executeRequestFromString(dump, false, false))
if err != nil {
t.Fatalf("failed to load simple dump: %s", err.Error())
}
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Open the SQLite file for loading.
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
numSnapshots := s.numSnapshots
chunker := chunking.NewChunker(f, 2048)
for {
chunk, err := chunker.Next()
if err != nil {
t.Fatalf("failed to read next chunk: %s", err.Error())
}
err = s.LoadChunk(chunk)
if err != nil {
t.Fatalf("failed to load chunk: %s", err.Error())
}
if chunk.IsLast {
break
}
}
// Chunked loading should trigger a snapshot, so check that the snapshot
// count has increased by one.
testPoll(t, func() bool {
s.numSnapshotsMu.Lock()
defer s.numSnapshotsMu.Unlock()
return s.numSnapshots == numSnapshots+1
}, 100*time.Millisecond, 3*time.Second)
// Check that data were loaded correctly.
qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
qr = queryRequestFromString("SELECT count(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Check pre-existing data is gone.
qr = queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
// Test_SingleNodeLoadChunkBinaryReopen tests that a Store reopens with the correct
// state after loading data in SQLite binary format from a file using chunked loading.
// This is important because chunked loading involves forcing the creation of a new
// snapshot.
func Test_SingleNodeLoadChunkBinaryReopen(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Open the file for loading.
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
numSnapshots := s.numSnapshots
chunker := chunking.NewChunker(f, 2048)
for {
chunk, err := chunker.Next()
if err != nil {
t.Fatalf("failed to read next chunk: %s", err.Error())
}
err = s.LoadChunk(chunk)
if err != nil {
t.Fatalf("failed to load chunk: %s", err.Error())
}
if chunk.IsLast {
break
}
}
// Chunked loading should trigger a snapshot, so check that the snapshot
// count has increased by one.
testPoll(t, func() bool {
s.numSnapshotsMu.Lock()
defer s.numSnapshotsMu.Unlock()
return s.numSnapshots == numSnapshots+1
}, 100*time.Millisecond, 3*time.Second)
// Close and re-open the store.
if err := s.Close(true); err != nil {
t.Fatalf("failed to close store: %s", err.Error())
}
if err := s.Open(); err != nil {
t.Fatalf("failed to re-open store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["COUNT(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Insert one more row to be sure everything is still working.
er := executeRequestFromString("INSERT INTO foo VALUES(4, 'bob')", false, true)
_, err = s.Execute(er)
if err != nil {
t.Fatalf("failed to insert into foo: %s", err.Error())
}
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["COUNT(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[4]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Close and re-open the store, and check one last time.
if err := s.Close(true); err != nil {
t.Fatalf("failed to close store: %s", err.Error())
}
if err := s.Open(); err != nil {
t.Fatalf("failed to re-open store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["COUNT(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[4]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
// Test_SingleNodeLoadChunk_SnapshotBlock tests that a Store correctly loads data in SQLite
// binary format from a file using chunked loading, and that snapshotting is blocked
// during the load.
func Test_SingleNodeLoadChunk_SnapshotBlock(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
getChunker := func() (*os.File, *chunking.Chunker) {
// Open the SQLite file for loading.
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
return f, chunking.NewChunker(f, 2048)
}
// Load the first chunk, which should block snapshotting, complete
// the load, and then check that snapshotting is no longer blocked.
fd, chunker := getChunker()
defer fd.Close()
chunk, err := chunker.Next()
if err != nil {
t.Fatalf("failed to read first chunk: %s", err.Error())
}
if err := s.LoadChunk(chunk); err != nil {
t.Fatalf("failed to load first chunk: %s", err.Error())
}
if s.NumLoadsInProgress() != 1 {
t.Fatalf("expected 1 load in progress, got %d", s.NumLoadsInProgress())
}
if err := s.Snapshot(); err != ErrLoadInProgress {
t.Fatalf("snapshot should have been blocked")
}
for {
chunk, err := chunker.Next()
if err != nil {
t.Fatalf("failed to read next chunk: %s", err.Error())
}
err = s.LoadChunk(chunk)
if err != nil {
t.Fatalf("failed to load chunk: %s", err.Error())
}
if chunk.IsLast {
break
}
}
if err := s.Snapshot(); err != nil {
t.Fatalf("snapshot should not have been blocked")
}
if s.NumLoadsInProgress() != 0 {
t.Fatalf("expected 0 load in progress, got %d", s.NumLoadsInProgress())
}
// Load the first chunk, which should block snapshotting, send an
// Abort chunk, then check that snapshotting is no longer blocked.
fd, chunker = getChunker()
defer fd.Close()
chunk, err = chunker.Next()
if err != nil {
t.Fatalf("failed to read first chunk: %s", err.Error())
}
if err := s.LoadChunk(chunk); err != nil {
t.Fatalf("failed to load first chunk: %s", err.Error())
}
if err := s.Snapshot(); err != ErrLoadInProgress {
t.Fatalf("snapshot should have been blocked")
}
if err := s.LoadChunk(chunker.Abort()); err != nil {
t.Fatalf("failed to load abort chunk: %s", err.Error())
}
if err := s.Snapshot(); err != nil {
t.Fatalf("snapshot should not have been blocked")
}
// Load the first chunk, which should block snapshotting, then send
// execute, then check that snapshotting is no longer blocked.
fd, chunker = getChunker()
defer fd.Close()
chunk, err = chunker.Next()
if err != nil {
t.Fatalf("failed to read first chunk: %s", err.Error())
}
if err := s.LoadChunk(chunk); err != nil {
t.Fatalf("failed to load first chunk: %s", err.Error())
}
if err := s.Snapshot(); err != ErrLoadInProgress {
t.Fatalf("snapshot should have been blocked")
}
er := executeRequestFromString("anything at all, doesn't matter", false, false)
_, err = s.Execute(er)
if err != nil {
t.Fatalf("failed to execute: %s", err.Error())
}
if err := s.Snapshot(); err != nil {
t.Fatalf("snapshot should not have been blocked, err: %s", err.Error())
}
}
// Test_SingleNodeLoadBinaryFromReader tests that a Store correctly loads data in SQLite
// binary format from a reader.
func Test_SingleNodeLoadBinaryFromReader(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Load a dataset, to check it's erased by the SQLite file load.
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE bar (id integer not null primary key, name text);
INSERT INTO "bar" VALUES(1,'declan');
COMMIT;
`
_, err := s.Execute(executeRequestFromString(dump, false, false))
if err != nil {
t.Fatalf("failed to load simple dump: %s", err.Error())
}
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
err = s.LoadFromReader(f, 1024*1024)
if err != nil {
t.Fatalf("failed to load SQLite file via Reader: %s", err.Error())
}
// Check that data were loaded correctly.
qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
qr = queryRequestFromString("SELECT count(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Check pre-existing data is gone.
qr = queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeAutoRestore(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()

@@ -494,18 +494,18 @@ class Node(object):
r = requests.delete(self._remove_url(), data=json.dumps(body))
raise_for_status(r)
def restore(self, file, fmt=None, chunk_kb=None):
def restore(self, file, fmt=None):
# This is the one API that doesn't expect JSON.
if fmt != "binary":
conn = sqlite3.connect(file)
r = requests.post(self._load_url(chunk_kb), data='\n'.join(conn.iterdump()))
r = requests.post(self._load_url(), data='\n'.join(conn.iterdump()))
raise_for_status(r)
conn.close()
return r.json()
else:
with open(file, 'rb') as f:
data = f.read()
r = requests.post(self._load_url(chunk_kb), data=data, headers={'Content-Type': 'application/octet-stream'})
r = requests.post(self._load_url(), data=data, headers={'Content-Type': 'application/octet-stream'})
raise_for_status(r)
def redirect_addr(self):
@@ -559,10 +559,7 @@ class Node(object):
return 'http://' + self.APIAddr() + '/db/request' + rd
def _backup_url(self):
return 'http://' + self.APIAddr() + '/db/backup'
def _load_url(self, chunk_kb=None):
ckb = ""
if chunk_kb is not None:
ckb = '?chunk_kb=%d' % chunk_kb
return 'http://' + self.APIAddr() + '/db/load' + ckb
def _load_url(self):
return 'http://' + self.APIAddr() + '/db/load'
def _remove_url(self):
return 'http://' + self.APIAddr() + '/remove'

@@ -188,14 +188,11 @@ class TestSingleNodeLoadRestart(unittest.TestCase):
self.n = Node(RQLITED_PATH, '0', raft_snap_threshold=8192, raft_snap_int="30s")
self.n.start()
n = self.n.wait_for_leader()
j = self.n.restore('system_test/e2e/testdata/1000-numbers.db', fmt='binary',chunk_kb=4)
j = self.n.restore('system_test/e2e/testdata/1000-numbers.db', fmt='binary')
j = self.n.query('SELECT COUNT(*) from test')
self.assertEqual(j, d_("{'results': [{'values': [[1000]], 'types': ['integer'], 'columns': ['COUNT(*)']}]}"))
# Wait for a snapshot and persist to happen.
time.sleep(5)
# Ensure node can restart after chunked loading -- and the log truncation it forces.
# Ensure node can restart after loading
self.n.stop()
self.n.start()
self.n.wait_for_leader()
