|
|
@ -32,6 +32,10 @@ import (
|
|
|
|
"github.com/rqlite/rqlite/store"
|
|
|
|
"github.com/rqlite/rqlite/store"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
|
|
defaultChunkSize = 512 * 1024 * 1024 // 512 MB
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
var (
|
|
|
|
// ErrLeaderNotFound is returned when a node cannot locate a leader
|
|
|
|
// ErrLeaderNotFound is returned when a node cannot locate a leader
|
|
|
|
ErrLeaderNotFound = errors.New("leader not found")
|
|
|
|
ErrLeaderNotFound = errors.New("leader not found")
|
|
|
@ -847,6 +851,12 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
chunkSz, err := chunkSizeParam(r, defaultChunkSize)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Peek at the incoming bytes so we can determine if this is a SQLite database
|
|
|
|
// Peek at the incoming bytes so we can determine if this is a SQLite database
|
|
|
|
validSQLite := false
|
|
|
|
validSQLite := false
|
|
|
|
bufReader := bufio.NewReader(r.Body)
|
|
|
|
bufReader := bufio.NewReader(r.Body)
|
|
|
@ -897,8 +907,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
resp.end = time.Now()
|
|
|
|
resp.end = time.Now()
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
chunker := chunking.NewChunker(bufReader, 1024*1024)
|
|
|
|
chunker := chunking.NewChunker(bufReader, int64(chunkSz))
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
chunk, err := chunker.Next()
|
|
|
|
chunk, err := chunker.Next()
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
@ -2058,11 +2067,24 @@ func timeoutParam(req *http.Request, def time.Duration) (time.Duration, error) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
t, err := time.ParseDuration(timeout)
|
|
|
|
t, err := time.ParseDuration(timeout)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
return def, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return t, nil
|
|
|
|
return t, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func chunkSizeParam(req *http.Request, defSz int) (int, error) {
|
|
|
|
|
|
|
|
q := req.URL.Query()
|
|
|
|
|
|
|
|
chunkSize := strings.TrimSpace(q.Get("chunk_kb"))
|
|
|
|
|
|
|
|
if chunkSize == "" {
|
|
|
|
|
|
|
|
return defSz, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
sz, err := strconv.Atoi(chunkSize)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return defSz, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return sz * 1024, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// isTx returns whether the HTTP request is requesting a transaction.
|
|
|
|
// isTx returns whether the HTTP request is requesting a transaction.
|
|
|
|
func isTx(req *http.Request) (bool, error) {
|
|
|
|
func isTx(req *http.Request) (bool, error) {
|
|
|
|
return queryParam(req, "transaction")
|
|
|
|
return queryParam(req, "transaction")
|
|
|
|