1
0
Fork 0

Merge pull request #228 from rqlite/no_batch_load

Start removing batch load
master
Philip O'Toole 8 years ago committed by GitHub
commit ec0922ec62

@ -56,7 +56,7 @@ type Store interface {
Backup(leader bool) ([]byte, error) Backup(leader bool) ([]byte, error)
// Load loads a SQLite .dump state from a reader // Load loads a SQLite .dump state from a reader
Load(r io.Reader, sz int) (int64, error) Load(r io.Reader) (int, error)
} }
// CredentialStore is the interface credential stores must support. // CredentialStore is the interface credential stores must support.
@ -101,9 +101,6 @@ const (
PermStatus = "status" PermStatus = "status"
// PermBackup means user can backup node. // PermBackup means user can backup node.
PermBackup = "backup" PermBackup = "backup"
// LoadBatchSz is the batch size for loading a dump file.
LoadBatchSz = 1000
) )
func init() { func init() {
@ -385,7 +382,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
return return
} }
n, err := s.store.Load(r.Body, LoadBatchSz) n, err := s.store.Load(r.Body)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

@ -393,7 +393,7 @@ func (m *MockStore) Backup(leader bool) ([]byte, error) {
return nil, nil return nil, nil
} }
func (m *MockStore) Load(r io.Reader, sz int) (int64, error) { func (m *MockStore) Load(r io.Reader) (int, error) {
return 0, nil return 0, nil
} }

@ -442,7 +442,7 @@ func (s *Store) Execute(queries []string, timings, tx bool) ([]*sql.Result, erro
} }
// Load loads a SQLite .dump state from a reader. // Load loads a SQLite .dump state from a reader.
func (s *Store) Load(r io.Reader, sz int) (int64, error) { func (s *Store) Load(r io.Reader) (int, error) {
if s.raft.State() != raft.Leader { if s.raft.State() != raft.Leader {
return 0, ErrNotLeader return 0, ErrNotLeader
} }
@ -458,15 +458,13 @@ func (s *Store) Load(r io.Reader, sz int) (int64, error) {
// Read the dump, executing the commands. // Read the dump, executing the commands.
var queries []string var queries []string
var n int64
scanner := parser.NewScanner(r) scanner := parser.NewScanner(r)
for { for {
cmd, err := scanner.Scan() cmd, err := scanner.Scan()
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return n, err return len(queries), err
} }
if cmd == "PRAGMA foreign_keys=OFF" || if cmd == "BEGIN TRANSACTION" ||
cmd == "BEGIN TRANSACTION" ||
cmd == "COMMIT" { cmd == "COMMIT" {
continue continue
} }
@ -478,30 +476,20 @@ func (s *Store) Load(r io.Reader, sz int) (int64, error) {
} }
queries = append(queries, cmd) queries = append(queries, cmd)
if len(queries) == sz {
_, err = s.Execute(queries, false, true)
if err != nil {
return n, err
}
n += int64(len(queries))
queries = nil
}
} }
// Flush residual
if len(queries) > 0 { if len(queries) > 0 {
_, err = s.Execute(queries, false, true) _, err = s.Execute(queries, false, true)
if err != nil { if err != nil {
return n, err return len(queries), err
} }
n += int64(len(queries))
} }
// Restore FK constraints to starting state. // Restore FK constraints to starting state.
if err := s.db.EnableFKConstraints(currFK); err != nil { if err := s.db.EnableFKConstraints(currFK); err != nil {
return n, err return len(queries), err
} }
return n, nil return len(queries), nil
} }
// Backup return a consistent snapshot of the underlying database. // Backup return a consistent snapshot of the underlying database.

@ -221,87 +221,11 @@ INSERT INTO "foo" VALUES(1,'fiona');
COMMIT; COMMIT;
` `
buf := bytes.NewBufferString(dump) buf := bytes.NewBufferString(dump)
n, err := s.Load(buf, 0) n, err := s.Load(buf)
if err != nil { if err != nil {
t.Fatalf("failed to load dump: %s", err.Error()) t.Fatalf("failed to load dump: %s", err.Error())
} }
if n != 2 { if n != 3 {
t.Fatal("wrong number of statements loaded")
}
// Check that data were loaded correctly.
r, err := s.Query([]string{`SELECT * FROM foo`}, false, true, Strong)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeLoadBatch1(t *testing.T) {
s := mustNewStore(true)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE foo (id integer not null primary key, name text);
INSERT INTO "foo" VALUES(1,'fiona');
COMMIT;
`
buf := bytes.NewBufferString(dump)
n, err := s.Load(buf, 1)
if err != nil {
t.Fatalf("failed to load dump: %s", err.Error())
}
if n != 2 {
t.Fatalf("wrong number of statements loaded, exp %d, got %d", 2, n)
}
// Check that data were loaded correctly.
r, err := s.Query([]string{`SELECT * FROM foo`}, false, true, Strong)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeLoadBatchLarge(t *testing.T) {
s := mustNewStore(true)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE foo (id integer not null primary key, name text);
INSERT INTO "foo" VALUES(1,'fiona');
COMMIT;
`
buf := bytes.NewBufferString(dump)
n, err := s.Load(buf, 100)
if err != nil {
t.Fatalf("failed to load dump: %s", err.Error())
}
if n != 2 {
t.Fatal("wrong number of statements loaded") t.Fatal("wrong number of statements loaded")
} }
@ -337,12 +261,12 @@ INSERT INTO "foo" VALUES(1,'fiona');
COMMIT; COMMIT;
` `
buf := bytes.NewBufferString(dump) buf := bytes.NewBufferString(dump)
n, err := s.Load(buf, 100) n, err := s.Load(buf)
if err != nil { if err != nil {
t.Fatalf("failed to load dump: %s", err.Error()) t.Fatalf("failed to load dump: %s", err.Error())
} }
if n != 2 { if n != 3 {
t.Fatal("wrong number of statements loaded") t.Fatal("wrong number of statements loaded, exp: 2, got: ", n)
} }
// Check that data were loaded correctly. // Check that data were loaded correctly.
@ -373,12 +297,12 @@ BEGIN TRANSACTION;
COMMIT; COMMIT;
` `
buf := bytes.NewBufferString(dump) buf := bytes.NewBufferString(dump)
n, err := s.Load(buf, 0) n, err := s.Load(buf)
if err != nil { if err != nil {
t.Fatalf("failed to load dump: %s", err.Error()) t.Fatalf("failed to load dump: %s", err.Error())
} }
if n != 0 { if n != 1 {
t.Fatal("wrong number of statements loaded") t.Fatal("wrong number of statements loaded, exp: 1, got: ", n)
} }
} }
@ -393,7 +317,7 @@ func Test_SingleNodeLoadEmpty(t *testing.T) {
s.WaitForLeader(10 * time.Second) s.WaitForLeader(10 * time.Second)
buf := bytes.NewBufferString("") buf := bytes.NewBufferString("")
n, err := s.Load(buf, 0) n, err := s.Load(buf)
if err != nil { if err != nil {
t.Fatalf("failed to load dump: %s", err.Error()) t.Fatalf("failed to load dump: %s", err.Error())
} }

Loading…
Cancel
Save