1
0
Fork 0

More refactoring

master
Philip O'Toole 9 months ago
parent 87808a8307
commit 61fb89d418

@ -98,21 +98,26 @@ func (qp QueryParams) NonVoters() bool {
return qp.HasKey("nonvoters")
}
// NoLeader returns true if the query parameters request no leader mode
// NoLeader returns true if the query parameters request no leader mode.
func (qp QueryParams) NoLeader() bool {
return qp.HasKey("noleader")
}
// Redirect returns true if the query parameters request redirect mode
// Redirect returns true if the query parameters request redirect mode.
func (qp QueryParams) Redirect() bool {
return qp.HasKey("redirect")
}
// Vacuum returns true if the query parameters request vacuum mode
// Vacuum returns true if the query parameters request vacuum mode.
func (qp QueryParams) Vacuum() bool {
return qp.HasKey("vacuum")
}
// Key returns the value of the key named "key".
func (qp QueryParams) Key() string {
return qp["key"]
}
// Level returns the requested consistency level.
func (qp QueryParams) Level() command.QueryRequest_Level {
lvl := qp["level"]

@ -597,6 +597,7 @@ func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request, qp QueryP
Leader: !qp.NoLeader(),
Vacuum: qp.Vacuum(),
}
addBackupFormatHeader(w, qp)
err := s.store.Backup(br, w)
if err != nil {
@ -896,8 +897,7 @@ func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request, qp QueryP
return
}
key := keyParam(r)
b, err = getSubJSON(b, key)
b, err = getSubJSON(b, qp.Key())
if err != nil {
http.Error(w, fmt.Sprintf("JSON subkey: %s", err.Error()),
http.StatusInternalServerError)
@ -928,12 +928,6 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, qp QueryPa
return
}
includeNonVoters, err := nonVoters(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Get nodes in the cluster, and possibly filter out non-voters.
sNodes, err := s.store.Nodes()
if err != nil {
@ -945,7 +939,7 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, qp QueryPa
return
}
nodes := NewNodesFromServers(sNodes)
if !includeNonVoters {
if !qp.NonVoters() {
nodes = nodes.Voters()
}
@ -981,12 +975,7 @@ func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryP
return
}
noLeader, err := noLeader(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if noLeader {
if qp.NoLeader() {
// Simply handling the HTTP request is enough.
w.WriteHeader(http.StatusOK)
w.Write([]byte("[+]node ok"))
@ -1050,12 +1039,7 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request, qp Query
// Perform a leader check, unless disabled. This prevents generating queued writes on
// a node that does not appear to be connected to a cluster (even a single-node cluster).
noLeader, err := noLeader(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !noLeader {
if !qp.NoLeader() {
addr, err := s.store.LeaderAddr()
if err != nil || addr == "" {
stats.Add(numLeaderNotFound, 1)
@ -1064,12 +1048,6 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request, qp Query
}
}
wait, err := isWait(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -1079,23 +1057,18 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request, qp Query
stmts, err := ParseRequest(b)
if err != nil {
if errors.Is(err, ErrNoStatements) && !wait {
if errors.Is(err, ErrNoStatements) && !qp.Wait() {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
noRewriteRandom, err := noRewriteRandom(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := command.Rewrite(stmts, !noRewriteRandom); err != nil {
if err := command.Rewrite(stmts, !qp.NoRewriteRandom()); err != nil {
http.Error(w, fmt.Sprintf("SQL rewrite: %s", err.Error()), http.StatusInternalServerError)
return
}
var fc queue.FlushChannel
if wait {
if qp.Wait() {
stats.Add(numQueuedExecutionsWait, 1)
fc = make(queue.FlushChannel)
}
@ -1107,7 +1080,7 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request, qp Query
}
resp.SequenceNum = seqNum
if wait {
if qp.Wait() {
// Wait for the flush channel to close, or timeout.
select {
case <-fc:
@ -1375,12 +1348,11 @@ func (s *Service) handleExpvar(w http.ResponseWriter, r *http.Request, qp QueryP
w.WriteHeader(http.StatusUnauthorized)
return
}
key := keyParam(r)
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if key != "" && key != kv.Key {
if qp.Key() != "" && qp.Key() != kv.Key {
return
}
if !first {
@ -1610,6 +1582,14 @@ func (s *Service) addAllowHeaders(w http.ResponseWriter) {
}
}
// addBackupFormatHeader adds the Content-Type header for the backup format.
func addBackupFormatHeader(w http.ResponseWriter, qp QueryParams) {
w.Header().Set("Content-Type", "application/octet-stream")
if qp.BackupFormat() == command.BackupRequest_BACKUP_REQUEST_FORMAT_SQL {
w.Header().Set("Content-Type", "application/sql")
}
}
// tlsStats returns the TLS stats for the service.
func (s *Service) tlsStats() map[string]interface{} {
m := map[string]interface{}{
@ -1667,23 +1647,6 @@ func requestQueries(r *http.Request, qp QueryParams) ([]*command.Statement, erro
return ParseRequest(b)
}
// queryParam returns whether the given query param is present.
func queryParam(req *http.Request, param string) (bool, error) {
err := req.ParseForm()
if err != nil {
return false, err
}
if _, ok := req.Form[param]; ok {
return true, nil
}
return false, nil
}
func keyParam(req *http.Request) string {
q := req.URL.Query()
return strings.TrimSpace(q.Get("key"))
}
func getSubJSON(jsonBlob []byte, keyString string) (json.RawMessage, error) {
if keyString == "" {
return jsonBlob, nil
@ -1722,67 +1685,6 @@ func getSubJSON(jsonBlob []byte, keyString string) (json.RawMessage, error) {
return finalObjBytes, nil
}
// noLeader returns whether processing should skip the leader check.
func noLeader(req *http.Request) (bool, error) {
return queryParam(req, "noleader")
}
// nonVoters returns whether a query is requesting to include non-voter results
func nonVoters(req *http.Request) (bool, error) {
return queryParam(req, "nonvoters")
}
// isTimings returns whether timings are requested.
func isTimings(req *http.Request) (bool, error) {
return queryParam(req, "timings")
}
// isWait returns whether a wait operation is requested.
func isWait(req *http.Request) (bool, error) {
return queryParam(req, "wait")
}
func isAssociative(req *http.Request) (bool, error) {
return queryParam(req, "associative")
}
// noRewriteRandom returns whether a rewrite of RANDOM is disabled.
func noRewriteRandom(req *http.Request) (bool, error) {
return queryParam(req, "norwrandom")
}
// level returns the requested consistency level for a query
func level(req *http.Request) (command.QueryRequest_Level, error) {
q := req.URL.Query()
lvl := strings.TrimSpace(q.Get("level"))
switch strings.ToLower(lvl) {
case "none":
return command.QueryRequest_QUERY_REQUEST_LEVEL_NONE, nil
case "weak":
return command.QueryRequest_QUERY_REQUEST_LEVEL_WEAK, nil
case "strong":
return command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, nil
default:
return command.QueryRequest_QUERY_REQUEST_LEVEL_WEAK, nil
}
}
// freshness returns any freshness requested with a query.
func freshness(req *http.Request) (time.Duration, error) {
q := req.URL.Query()
f := strings.TrimSpace(q.Get("freshness"))
if f == "" {
return 0, nil
}
d, err := time.ParseDuration(f)
if err != nil {
return 0, err
}
return d, nil
}
func prettyEnabled(e bool) string {
if e {
return "enabled"

Loading…
Cancel
Save