1
0
Fork 0

Migrate HTTP layer to chunked loading

master
Philip O'Toole 1 year ago
parent 4d1fbd4b60
commit 1107690c52

@ -21,6 +21,8 @@ import (
)
const (
SQLiteHeaderSize = 32
bkDelay = 250
defaultCheckpointTimeout = 30 * time.Second
)

@ -3,6 +3,7 @@
package http
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
@ -23,6 +24,7 @@ import (
"github.com/rqlite/rqlite/auth"
"github.com/rqlite/rqlite/cluster"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command/chunking"
"github.com/rqlite/rqlite/command/encoding"
"github.com/rqlite/rqlite/db"
"github.com/rqlite/rqlite/queue"
@ -58,8 +60,8 @@ type Database interface {
// an Execute or Query request.
Request(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
// Load loads a SQLite file into the system
Load(lr *command.LoadRequest) error
// LoadChunk loads a SQLite database into the node, chunk by chunk.
LoadChunk(lc *command.LoadChunkRequest) error
}
// Store is the interface the Raft-based database must implement.
@ -108,8 +110,8 @@ type Cluster interface {
// Backup retrieves a backup from a remote node and writes to the io.Writer.
Backup(br *command.BackupRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, w io.Writer) error
// Load loads a SQLite database into the node.
Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
// LoadChunk loads a SQLite database into the node, chunk by chunk.
LoadChunk(lc *command.LoadChunkRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
// RemoveNode removes a node from the cluster.
RemoveNode(rn *command.RemoveNodeRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
@ -813,8 +815,7 @@ func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request) {
s.lastBackup = time.Now()
}
// handleLoad loads the state contained in a .dump output. This API is different
// from others in that it expects a raw file, not wrapped in any kind of JSON.
// handleLoad loads the database from the given SQLite database file or SQLite dump.
func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
if !s.CheckRequestPerm(r, auth.PermLoad) {
w.WriteHeader(http.StatusUnauthorized)
@ -846,32 +847,39 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
r.Body.Close()
if db.IsValidSQLiteData(b) {
s.logger.Printf("SQLite database file detected as load data")
lr := &command.LoadRequest{
Data: b,
// Peek at the incoming bytes so we can determine if this is a SQLite database
validSQLite := false
bufReader := bufio.NewReader(r.Body)
peek, err := bufReader.Peek(db.SQLiteHeaderSize)
if err == nil {
validSQLite = db.IsValidSQLiteData(peek)
if validSQLite {
s.logger.Printf("SQLite database file detected as load data")
if db.IsWALModeEnabled(peek) {
s.logger.Printf("SQLite database file is in WAL mode - rejecting load request")
http.Error(w, `SQLite database file is in WAL mode - convert it to DELETE mode via 'PRAGMA journal_mode=DELETE'`,
http.StatusBadRequest)
return
}
}
if db.IsWALModeEnabled(b) {
s.logger.Printf("SQLite database file is in WAL mode - rejecting load request")
http.Error(w, `SQLite database file is in WAL mode - convert it to DELETE mode via 'PRAGMA journal_mode=DELETE'`,
http.StatusBadRequest)
}
if !validSQLite {
// Assume SQL text
b, err := io.ReadAll(bufReader)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
r.Body.Close()
err := s.store.Load(lr)
if err != nil && err != store.ErrNotLeader {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else if err != nil && err == store.ErrNotLeader {
if redirect {
queries := []string{string(b)}
er := executeRequestFromStrings(queries, timings, false)
results, err := s.store.Execute(er)
if err != nil {
if err == store.ErrNotLeader {
leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
stats.Add(numLeaderNotFound, 1)
@ -883,63 +891,75 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
return
}
resp.Error = err.Error()
} else {
resp.Results.ExecuteResult = results
}
resp.end = time.Now()
} else {
chunker := chunking.NewChunker(bufReader, 1024*1024)
addr, err := s.store.LeaderAddr()
for {
chunk, err := chunker.Next()
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
if addr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
err = s.store.LoadChunk(chunk)
if err != nil && err != store.ErrNotLeader {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
username, password, ok := r.BasicAuth()
if !ok {
username = ""
}
} else if err != nil && err == store.ErrNotLeader {
if redirect {
leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}
w.Header().Add(ServedByHTTPHeader, addr)
loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password), timeout)
if loadErr != nil {
if loadErr.Error() == "unauthorized" {
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
} else {
http.Error(w, loadErr.Error(), http.StatusInternalServerError)
redirect := s.FormRedirect(r, leaderAPIAddr)
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
return
}
return
}
stats.Add(numRemoteLoads, 1)
// Allow this if block to exit, so response remains as before request
// forwarding was put in place.
}
} else {
// No JSON structure expected for this API.
queries := []string{string(b)}
er := executeRequestFromStrings(queries, timings, false)
results, err := s.store.Execute(er)
if err != nil {
if err == store.ErrNotLeader {
leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
addr, err := s.store.LeaderAddr()
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
if addr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}
redirect := s.FormRedirect(r, leaderAPIAddr)
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
return
username, password, ok := r.BasicAuth()
if !ok {
username = ""
}
w.Header().Add(ServedByHTTPHeader, addr)
loadErr := s.cluster.LoadChunk(chunk, addr, makeCredentials(username, password), timeout)
if loadErr != nil {
if loadErr.Error() == "unauthorized" {
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
} else {
http.Error(w, loadErr.Error(), http.StatusInternalServerError)
}
return
}
stats.Add(numRemoteLoads, 1)
// Allow this if block to exit, so response remains as before request
// forwarding was put in place.
}
if chunk.IsLast {
break
}
resp.Error = err.Error()
} else {
resp.Results.ExecuteResult = results
}
resp.end = time.Now()
}
s.writeResponse(w, r, resp)
}

@ -2,9 +2,11 @@ package http
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
@ -741,14 +743,14 @@ func Test_LoadFlagsNoLeader(t *testing.T) {
t.Fatalf("failed to load test SQLite data")
}
m.loadFn = func(br *command.LoadRequest) error {
m.loadChunkFn = func(br *command.LoadChunkRequest) error {
return store.ErrNotLeader
}
clusterLoadCalled := false
c.loadFn = func(lr *command.LoadRequest, nodeAddr string, timeout time.Duration) error {
c.loadChunkFn = func(lc *command.LoadChunkRequest, nodeAddr string, timeout time.Duration) error {
clusterLoadCalled = true
if !bytes.Equal(lr.Data, testData) {
if !bytes.Equal(mustGunzip(lc.Data), testData) {
t.Fatalf("wrong data passed to cluster load")
}
return nil
@ -798,11 +800,11 @@ func Test_LoadRemoteError(t *testing.T) {
t.Fatalf("failed to load test SQLite data")
}
m.loadFn = func(br *command.LoadRequest) error {
m.loadChunkFn = func(br *command.LoadChunkRequest) error {
return store.ErrNotLeader
}
clusterLoadCalled := false
c.loadFn = func(lr *command.LoadRequest, addr string, t time.Duration) error {
c.loadChunkFn = func(lr *command.LoadChunkRequest, addr string, t time.Duration) error {
clusterLoadCalled = true
return fmt.Errorf("the load failed")
}
@ -1262,13 +1264,13 @@ func Test_timeoutQueryParam(t *testing.T) {
}
type MockStore struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error
leaderAddr string
notReady bool // Default value is true, easier to test.
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
loadChunkFn func(lr *command.LoadChunkRequest) error
leaderAddr string
notReady bool // Default value is true, easier to test.
}
func (m *MockStore) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
@ -1327,9 +1329,9 @@ func (m *MockStore) Backup(br *command.BackupRequest, w io.Writer) error {
return m.backupFn(br, w)
}
func (m *MockStore) Load(lr *command.LoadRequest) error {
if m.loadFn != nil {
return m.loadFn(lr)
func (m *MockStore) LoadChunk(lc *command.LoadChunkRequest) error {
if m.loadChunkFn != nil {
return m.loadChunkFn(lc)
}
return nil
}
@ -1340,7 +1342,7 @@ type mockClusterService struct {
queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest, nodeAddr string, timeout time.Duration) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error
loadFn func(lr *command.LoadRequest, addr string, t time.Duration) error
loadChunkFn func(lc *command.LoadChunkRequest, addr string, t time.Duration) error
removeNodeFn func(rn *command.RemoveNodeRequest, nodeAddr string, t time.Duration) error
}
@ -1376,9 +1378,9 @@ func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, cred
return nil
}
func (m *mockClusterService) Load(lr *command.LoadRequest, addr string, creds *cluster.Credentials, t time.Duration) error {
if m.loadFn != nil {
return m.loadFn(lr, addr, t)
func (m *mockClusterService) LoadChunk(lc *command.LoadChunkRequest, addr string, creds *cluster.Credentials, t time.Duration) error {
if m.loadChunkFn != nil {
return m.loadChunkFn(lc, addr, t)
}
return nil
}
@ -1440,3 +1442,18 @@ func mustParseDuration(d string) time.Duration {
return dur
}
}
func mustGunzip(b []byte) []byte {
r, err := gzip.NewReader(bytes.NewBuffer(b))
if err != nil {
panic(err)
}
defer r.Close()
dec, err := ioutil.ReadAll(r)
if err != nil {
panic(err)
}
return dec
}

Loading…
Cancel
Save