// Package http provides the HTTP server for accessing the distributed database. package http import ( "bufio" "context" "crypto/tls" "encoding/json" "errors" "expvar" "fmt" "io" "log" "net" "net/http" "net/http/pprof" "os" "runtime" "strings" "sync" "time" "github.com/rqlite/rqlite/v8/auth" clstrPB "github.com/rqlite/rqlite/v8/cluster/proto" "github.com/rqlite/rqlite/v8/command" "github.com/rqlite/rqlite/v8/command/encoding" "github.com/rqlite/rqlite/v8/command/proto" "github.com/rqlite/rqlite/v8/db" "github.com/rqlite/rqlite/v8/queue" "github.com/rqlite/rqlite/v8/rtls" "github.com/rqlite/rqlite/v8/store" ) var ( // ErrLeaderNotFound is returned when a node cannot locate a leader ErrLeaderNotFound = errors.New("leader not found") ) type ResultsError interface { Error() string IsAuthorized() bool } // Database is the interface any queryable system must implement type Database interface { // Execute executes a slice of queries, each of which is not expected // to return rows. If timings is true, then timing information will // be return. If tx is true, then either all queries will be executed // successfully or it will as though none executed. Execute(er *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) // Query executes a slice of queries, each of which returns rows. If // timings is true, then timing information will be returned. If tx // is true, then all queries will take place while a read transaction // is held on the database. Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) // Request processes a slice of requests, each of which can be either // an Execute or Query request. Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) // Load loads a SQLite file into the system via Raft consensus. Load(lr *proto.LoadRequest) error } // Store is the interface the Raft-based database must implement. type Store interface { Database // Remove removes the node from the cluster. Remove(rn *proto.RemoveNodeRequest) error // LeaderAddr returns the Raft address of the leader of the cluster. LeaderAddr() (string, error) // Ready returns whether the Store is ready to service requests. Ready() bool // Committed blocks until the local commit index is greater than or // equal to the Leader index, as checked when the function is called. Committed(timeout time.Duration) (uint64, error) // Stats returns stats on the Store. Stats() (map[string]interface{}, error) // Nodes returns the slice of store.Servers in the cluster Nodes() ([]*store.Server, error) // Backup writes backup of the node state to dst Backup(br *proto.BackupRequest, dst io.Writer) error // ReadFrom reads and loads a SQLite database into the node, initially bypassing // the Raft system. It then triggers a Raft snapshot, which will then make // Raft aware of the new data. ReadFrom(r io.Reader) (int64, error) } // GetAddresser is the interface that wraps the GetNodeAPIAddr method. // GetNodeAPIAddr returns the HTTP API URL for the node at the given Raft address. type GetAddresser interface { GetNodeAPIAddr(addr string, timeout time.Duration) (string, error) } // Cluster is the interface node API services must provide type Cluster interface { GetAddresser // Execute performs an Execute Request on a remote node. Execute(er *proto.ExecuteRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteResult, error) // Query performs an Query Request on a remote node. Query(qr *proto.QueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.QueryRows, error) // Request performs an ExecuteQuery Request on a remote node. Request(eqr *proto.ExecuteQueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteQueryResponse, error) // Backup retrieves a backup from a remote node and writes to the io.Writer. Backup(br *proto.BackupRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, w io.Writer) error // Load loads a SQLite database into the node. Load(lr *proto.LoadRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) error // RemoveNode removes a node from the cluster. RemoveNode(rn *proto.RemoveNodeRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error // Stats returns stats on the Cluster. Stats() (map[string]interface{}, error) } // CredentialStore is the interface credential stores must support. type CredentialStore interface { // AA authenticates and checks authorization for the given perm. AA(username, password, perm string) bool } // StatusReporter is the interface status providers must implement. type StatusReporter interface { Stats() (map[string]interface{}, error) } // DBResults stores either an Execute result, a Query result, or // an ExecuteQuery result. type DBResults struct { ExecuteResult []*proto.ExecuteResult QueryRows []*proto.QueryRows ExecuteQueryResponse []*proto.ExecuteQueryResponse AssociativeJSON bool // Render in associative form } // Responser is the interface response objects must implement. type Responser interface { SetTime() } // MarshalJSON implements the JSON Marshaler interface. func (d *DBResults) MarshalJSON() ([]byte, error) { enc := encoding.Encoder{ Associative: d.AssociativeJSON, } if d.ExecuteResult != nil { return enc.JSONMarshal(d.ExecuteResult) } else if d.QueryRows != nil { return enc.JSONMarshal(d.QueryRows) } else if d.ExecuteQueryResponse != nil { return enc.JSONMarshal(d.ExecuteQueryResponse) } return json.Marshal(make([]interface{}, 0)) } // Response represents a response from the HTTP service. type Response struct { Results *DBResults `json:"results,omitempty"` Error string `json:"error,omitempty"` Time float64 `json:"time,omitempty"` SequenceNum int64 `json:"sequence_number,omitempty"` start time.Time end time.Time } // SetTime sets the Time attribute of the response. This way it will be present // in the serialized JSON version. func (r *Response) SetTime() { r.Time = r.end.Sub(r.start).Seconds() } // NewResponse returns a new instance of response. func NewResponse() *Response { return &Response{ Results: &DBResults{}, start: time.Now(), } } // stats captures stats for the HTTP service. var stats *expvar.Map const ( numLeaderNotFound = "leader_not_found" numExecutions = "executions" numExecuteStmtsRx = "execute_stmts_rx" numQueuedExecutions = "queued_executions" numQueuedExecutionsOK = "queued_executions_ok" numQueuedExecutionsStmtsRx = "queued_executions_num_stmts_rx" numQueuedExecutionsStmtsTx = "queued_executions_num_stmts_tx" numQueuedExecutionsNoLeader = "queued_executions_no_leader" numQueuedExecutionsNotLeader = "queued_executions_not_leader" numQueuedExecutionsLeadershipLost = "queued_executions_leadership_lost" numQueuedExecutionsUnknownError = "queued_executions_unknown_error" numQueuedExecutionsFailed = "queued_executions_failed" numQueuedExecutionsWait = "queued_executions_wait" numQueries = "queries" numQueryStmtsRx = "query_stmts_rx" numRequests = "requests" numRequestStmtsRx = "request_stmts_rx" numRemoteExecutions = "remote_executions" numRemoteExecutionsFailed = "remote_executions_failed" numRemoteQueries = "remote_queries" numRemoteQueriesFailed = "remote_queries_failed" numRemoteRequests = "remote_requests" numRemoteRequestsFailed = "remote_requests_failed" numRemoteBackups = "remote_backups" numRemoteLoads = "remote_loads" numRemoteRemoveNode = "remote_remove_node" numReadyz = "num_readyz" numStatus = "num_status" numBackups = "backups" numLoad = "loads" numLoadAborted = "loads_aborted" numBoot = "boot" numAuthOK = "authOK" numAuthFail = "authFail" // Default timeout for cluster communications. defaultTimeout = 30 * time.Second // VersionHTTPHeader is the HTTP header key for the version. VersionHTTPHeader = "X-RQLITE-VERSION" // ServedByHTTPHeader is the HTTP header used to report which // node (by node Raft address) actually served the request if // it wasn't served by this node. ServedByHTTPHeader = "X-RQLITE-SERVED-BY" // AllowOriginHeader is the HTTP header for allowing CORS compliant access from certain origins AllowOriginHeader = "Access-Control-Allow-Origin" // AllowMethodsHeader is the HTTP header for supporting the correct methods AllowMethodsHeader = "Access-Control-Allow-Methods" // AllowHeadersHeader is the HTTP header for supporting the correct request headers AllowHeadersHeader = "Access-Control-Allow-Headers" // AllowCredentialsHeader is the HTTP header for supporting specifying credentials AllowCredentialsHeader = "Access-Control-Allow-Credentials" ) func init() { stats = expvar.NewMap("http") ResetStats() } // ResetStats resets the expvar stats for this module. Mostly for test purposes. func ResetStats() { stats.Init() stats.Add(numLeaderNotFound, 0) stats.Add(numExecutions, 0) stats.Add(numExecuteStmtsRx, 0) stats.Add(numQueuedExecutions, 0) stats.Add(numQueuedExecutionsOK, 0) stats.Add(numQueuedExecutionsStmtsRx, 0) stats.Add(numQueuedExecutionsStmtsTx, 0) stats.Add(numQueuedExecutionsNoLeader, 0) stats.Add(numQueuedExecutionsNotLeader, 0) stats.Add(numQueuedExecutionsLeadershipLost, 0) stats.Add(numQueuedExecutionsUnknownError, 0) stats.Add(numQueuedExecutionsFailed, 0) stats.Add(numQueuedExecutionsWait, 0) stats.Add(numQueries, 0) stats.Add(numQueryStmtsRx, 0) stats.Add(numRequests, 0) stats.Add(numRequestStmtsRx, 0) stats.Add(numRemoteExecutions, 0) stats.Add(numRemoteExecutionsFailed, 0) stats.Add(numRemoteQueries, 0) stats.Add(numRemoteQueriesFailed, 0) stats.Add(numRemoteRequests, 0) stats.Add(numRemoteRequestsFailed, 0) stats.Add(numRemoteBackups, 0) stats.Add(numRemoteLoads, 0) stats.Add(numRemoteRemoveNode, 0) stats.Add(numReadyz, 0) stats.Add(numStatus, 0) stats.Add(numBackups, 0) stats.Add(numLoad, 0) stats.Add(numLoadAborted, 0) stats.Add(numBoot, 0) stats.Add(numAuthOK, 0) stats.Add(numAuthFail, 0) } // Service provides HTTP service. type Service struct { httpServer http.Server closeCh chan struct{} addr string // Bind address of the HTTP service. ln net.Listener // Service listener store Store // The Raft-backed database store. queueDone chan struct{} stmtQueue *queue.Queue // Queue for queued executes cluster Cluster // The Cluster service. start time.Time // Start up time. lastBackup time.Time // Time of last successful backup. statusMu sync.RWMutex statuses map[string]StatusReporter CACertFile string // Path to x509 CA certificate used to verify certificates. CertFile string // Path to server's own x509 certificate. KeyFile string // Path to server's own x509 private key. ClientVerify bool // Whether client certificates should verified. tlsConfig *tls.Config AllowOrigin string // Value to set for Access-Control-Allow-Origin DefaultQueueCap int DefaultQueueBatchSz int DefaultQueueTimeout time.Duration DefaultQueueTx bool seqNumMu sync.Mutex seqNum int64 // Last sequence number written OK. credentialStore CredentialStore BuildInfo map[string]interface{} logger *log.Logger } // New returns an uninitialized HTTP service. If credentials is nil, then // the service performs no authentication and authorization checks. func New(addr string, store Store, cluster Cluster, credentials CredentialStore) *Service { return &Service{ addr: addr, store: store, DefaultQueueCap: 1024, DefaultQueueBatchSz: 128, DefaultQueueTimeout: 100 * time.Millisecond, cluster: cluster, start: time.Now(), statuses: make(map[string]StatusReporter), credentialStore: credentials, logger: log.New(os.Stderr, "[http] ", log.LstdFlags), } } // Start starts the service. func (s *Service) Start() error { s.httpServer = http.Server{ Handler: s, } var ln net.Listener var err error if s.CertFile == "" || s.KeyFile == "" { ln, err = net.Listen("tcp", s.addr) if err != nil { return err } } else { mTLSState := rtls.MTLSStateDisabled if s.ClientVerify { mTLSState = rtls.MTLSStateEnabled } s.tlsConfig, err = rtls.CreateServerConfig(s.CertFile, s.KeyFile, s.CACertFile, mTLSState) if err != nil { return err } ln, err = tls.Listen("tcp", s.addr, s.tlsConfig) if err != nil { return err } var b strings.Builder b.WriteString(fmt.Sprintf("secure HTTPS server enabled with cert %s, key %s", s.CertFile, s.KeyFile)) if s.CACertFile != "" { b.WriteString(fmt.Sprintf(", CA cert %s", s.CACertFile)) } if s.ClientVerify { b.WriteString(", mutual TLS enabled") } else { b.WriteString(", mutual TLS disabled") } // print the message s.logger.Println(b.String()) } s.ln = ln s.closeCh = make(chan struct{}) s.queueDone = make(chan struct{}) s.stmtQueue = queue.New(s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout) go s.runQueue() s.logger.Printf("execute queue processing started with capacity %d, batch size %d, timeout %s", s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout.String()) go func() { err := s.httpServer.Serve(s.ln) if err != nil { s.logger.Printf("HTTP service on %s stopped: %s", s.ln.Addr().String(), err.Error()) } }() s.logger.Println("service listening on", s.Addr()) return nil } // Close closes the service. func (s *Service) Close() { s.logger.Println("closing HTTP service on", s.ln.Addr().String()) s.httpServer.Shutdown(context.Background()) s.stmtQueue.Close() select { case <-s.queueDone: default: close(s.closeCh) } <-s.queueDone s.ln.Close() } // HTTPS returns whether this service is using HTTPS. func (s *Service) HTTPS() bool { return s.CertFile != "" && s.KeyFile != "" } // ServeHTTP allows Service to serve HTTP requests. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.addBuildVersion(w) s.addAllowHeaders(w) if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) return } params, err := NewQueryParams(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } switch { case r.URL.Path == "/" || r.URL.Path == "": http.Redirect(w, r, "/status", http.StatusFound) case strings.HasPrefix(r.URL.Path, "/db/execute"): stats.Add(numExecutions, 1) s.handleExecute(w, r, params) case strings.HasPrefix(r.URL.Path, "/db/query"): stats.Add(numQueries, 1) s.handleQuery(w, r, params) case strings.HasPrefix(r.URL.Path, "/db/request"): stats.Add(numRequests, 1) s.handleRequest(w, r, params) case strings.HasPrefix(r.URL.Path, "/db/backup"): stats.Add(numBackups, 1) s.handleBackup(w, r, params) case strings.HasPrefix(r.URL.Path, "/db/load"): stats.Add(numLoad, 1) s.handleLoad(w, r, params) case r.URL.Path == "/boot": stats.Add(numBoot, 1) s.handleBoot(w, r, params) case strings.HasPrefix(r.URL.Path, "/remove"): s.handleRemove(w, r, params) case strings.HasPrefix(r.URL.Path, "/status"): stats.Add(numStatus, 1) s.handleStatus(w, r, params) case strings.HasPrefix(r.URL.Path, "/nodes"): s.handleNodes(w, r, params) case strings.HasPrefix(r.URL.Path, "/readyz"): stats.Add(numReadyz, 1) s.handleReadyz(w, r, params) case r.URL.Path == "/debug/vars": s.handleExpvar(w, r, params) case strings.HasPrefix(r.URL.Path, "/debug/pprof"): s.handlePprof(w, r, params) default: w.WriteHeader(http.StatusNotFound) } } // RegisterStatus allows other modules to register status for serving over HTTP. func (s *Service) RegisterStatus(key string, stat StatusReporter) error { s.statusMu.Lock() defer s.statusMu.Unlock() if _, ok := s.statuses[key]; ok { return fmt.Errorf("status already registered with key %s", key) } s.statuses[key] = stat return nil } // handleRemove handles cluster-remove requests. func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermRemove) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "DELETE" { w.WriteHeader(http.StatusMethodNotAllowed) return } b, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } m := map[string]string{} if err := json.Unmarshal(b, &m); err != nil { w.WriteHeader(http.StatusBadRequest) return } if len(m) != 1 { w.WriteHeader(http.StatusBadRequest) return } remoteID, ok := m["id"] if !ok { w.WriteHeader(http.StatusBadRequest) return } rn := &proto.RemoveNodeRequest{ Id: remoteID, } err = s.store.Remove(rn) if err != nil { if err == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) removeErr := s.cluster.RemoveNode(rn, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) if removeErr != nil { if removeErr.Error() == "unauthorized" { http.Error(w, "remote remove node not authorized", http.StatusUnauthorized) } else { http.Error(w, removeErr.Error(), http.StatusInternalServerError) } return } stats.Add(numRemoteRemoveNode, 1) return } http.Error(w, err.Error(), http.StatusInternalServerError) return } } // handleBackup returns the consistent database snapshot. func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermBackup) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) return } br := &proto.BackupRequest{ Format: qp.BackupFormat(), Leader: !qp.NoLeader(), Vacuum: qp.Vacuum(), Compress: qp.Compress(), } addBackupFormatHeader(w, qp) err := s.store.Backup(br, w) if err != nil { if err == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) backupErr := s.cluster.Backup(br, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout), w) if backupErr != nil { if backupErr.Error() == "unauthorized" { http.Error(w, "remote backup not authorized", http.StatusUnauthorized) } else { http.Error(w, backupErr.Error(), http.StatusInternalServerError) } return } stats.Add(numRemoteBackups, 1) return } else if err == store.ErrInvalidVacuum { http.Error(w, err.Error(), http.StatusBadRequest) return } http.Error(w, err.Error(), http.StatusInternalServerError) return } s.lastBackup = time.Now() } // handleLoad loads the database from the given SQLite database file or SQLite dump. func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermLoad) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } resp := NewResponse() b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } r.Body.Close() if db.IsValidSQLiteData(b) { s.logger.Printf("SQLite database file detected as load data") lr := &proto.LoadRequest{ Data: b, } err := s.store.Load(lr) if err != nil && err != store.ErrNotLeader { http.Error(w, err.Error(), http.StatusInternalServerError) return } else if err != nil && err == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout), qp.Retries(0)) if loadErr != nil { if loadErr.Error() == "unauthorized" { http.Error(w, "remote load not authorized", http.StatusUnauthorized) } else { http.Error(w, loadErr.Error(), http.StatusInternalServerError) } return } stats.Add(numRemoteLoads, 1) // Allow this if block to exit, so response remains as before request // forwarding was put in place. } } else { // No JSON structure expected for this API. queries := []string{string(b)} er := executeRequestFromStrings(queries, qp.Timings(), false) results, err := s.store.Execute(er) if err != nil { if err == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } } resp.Error = err.Error() } else { resp.Results.ExecuteResult = results } resp.end = time.Now() } s.writeResponse(w, r, qp, resp) } // handleBoot handles booting this node using a SQLite file. func (s *Service) handleBoot(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermLoad) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } bufReader := bufio.NewReader(r.Body) peek, err := bufReader.Peek(db.SQLiteHeaderSize) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) return } if !db.IsValidSQLiteData(peek) { http.Error(w, "invalid SQLite data", http.StatusBadRequest) return } s.logger.Printf("starting boot process") _, err = s.store.ReadFrom(bufReader) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) return } } // handleStatus returns status on the system. func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPerm(r, auth.PermStatus) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) return } storeStatus, err := s.store.Stats() if err != nil { http.Error(w, fmt.Sprintf("store stats: %s", err.Error()), http.StatusInternalServerError) return } clusterStatus, err := s.cluster.Stats() if err != nil { http.Error(w, fmt.Sprintf("cluster stats: %s", err.Error()), http.StatusInternalServerError) return } rt := map[string]interface{}{ "GOARCH": runtime.GOARCH, "GOOS": runtime.GOOS, "GOMAXPROCS": runtime.GOMAXPROCS(0), "num_cpu": runtime.NumCPU(), "num_goroutine": runtime.NumGoroutine(), "version": runtime.Version(), } oss := map[string]interface{}{ "pid": os.Getpid(), "ppid": os.Getppid(), "page_size": os.Getpagesize(), } executable, err := os.Executable() if err == nil { oss["executable"] = executable } hostname, err := os.Hostname() if err == nil { oss["hostname"] = hostname } qs, err := s.stmtQueue.Stats() if err != nil { http.Error(w, fmt.Sprintf("queue stats: %s", err.Error()), http.StatusInternalServerError) return } s.seqNumMu.Lock() qs["sequence_number"] = s.seqNum s.seqNumMu.Unlock() queueStats := map[string]interface{}{ "_default": qs, } httpStatus := map[string]interface{}{ "bind_addr": s.Addr().String(), "auth": prettyEnabled(s.credentialStore != nil), "cluster": clusterStatus, "queue": queueStats, "tls": s.tlsStats(), } nodeStatus := map[string]interface{}{ "start_time": s.start, "current_time": time.Now(), "uptime": time.Since(s.start).String(), } // Build the status response. status := map[string]interface{}{ "os": oss, "runtime": rt, "store": storeStatus, "http": httpStatus, "node": nodeStatus, } if !s.lastBackup.IsZero() { status["last_backup_time"] = s.lastBackup } if s.BuildInfo != nil { status["build"] = s.BuildInfo } // Add any registered StatusReporters. func() { s.statusMu.RLock() defer s.statusMu.RUnlock() for k, v := range s.statuses { stat, err := v.Stats() if err != nil { http.Error(w, fmt.Sprintf("registered stats: %s", err.Error()), http.StatusInternalServerError) return } status[k] = stat } }() var b []byte if qp.Pretty() { b, err = json.MarshalIndent(status, "", " ") } else { b, err = json.Marshal(status) } if err != nil { http.Error(w, fmt.Sprintf("JSON marshal: %s", err.Error()), http.StatusInternalServerError) return } b, err = getSubJSON(b, qp.Key()) if err != nil { http.Error(w, fmt.Sprintf("JSON subkey: %s", err.Error()), http.StatusInternalServerError) return } _, err = w.Write(b) if err != nil { http.Error(w, fmt.Sprintf("write: %s", err.Error()), http.StatusInternalServerError) return } } // handleNodes returns status on the other voting nodes in the system. // This attempts to contact all the nodes in the cluster, so may take // some time to return. func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPerm(r, auth.PermStatus) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Get nodes in the cluster, and possibly filter out non-voters. sNodes, err := s.store.Nodes() if err != nil { statusCode := http.StatusInternalServerError if err == store.ErrNotOpen { statusCode = http.StatusServiceUnavailable } http.Error(w, fmt.Sprintf("store nodes: %s", err.Error()), statusCode) return } nodes := NewNodesFromServers(sNodes) if !qp.NonVoters() { nodes = nodes.Voters() } // Now test the nodes lAddr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } nodes.Test(s.cluster, lAddr, qp.Timeout(defaultTimeout)) enc := NewNodesRespEncoder(w, qp.Version() != "2") if qp.Pretty() { enc.SetIndent("", " ") } err = enc.Encode(nodes) if err != nil { http.Error(w, fmt.Sprintf("JSON marshal: %s", err.Error()), http.StatusInternalServerError) } } // handleReadyz returns whether the node is ready. func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermReady) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) return } if qp.NoLeader() { // Simply handling the HTTP request is enough. w.WriteHeader(http.StatusOK) w.Write([]byte("[+]node ok")) return } lAddr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } if lAddr == "" { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte("[+]node ok\n[+]leader does not exist")) return } _, err = s.cluster.GetNodeAPIAddr(lAddr, qp.Timeout(defaultTimeout)) if err != nil { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte(fmt.Sprintf("[+]node ok\n[+]leader not contactable: %s", err.Error()))) return } if !s.store.Ready() { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte("[+]node ok\n[+]leader ok\n[+]store not ready")) return } okMsg := "[+]node ok\n[+]leader ok\n[+]store ok" if qp.Sync() { if _, err := s.store.Committed(qp.Timeout(defaultTimeout)); err != nil { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte(fmt.Sprintf("[+]node ok\n[+]leader ok\n[+]store ok\n[+]sync %s", err.Error()))) return } okMsg += "\n[+]sync ok" } w.WriteHeader(http.StatusOK) w.Write([]byte(okMsg)) } func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPerm(r, auth.PermExecute) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } if qp.Queue() { stats.Add(numQueuedExecutions, 1) s.queuedExecute(w, r, qp) } else { s.execute(w, r, qp) } } // queuedExecute handles queued queries that modify the database. func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request, qp QueryParams) { resp := NewResponse() // Perform a leader check, unless disabled. This prevents generating queued writes on // a node that does not appear to be connected to a cluster (even a single-node cluster). if !qp.NoLeader() { addr, err := s.store.LeaderAddr() if err != nil || addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } } b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } r.Body.Close() stmts, err := ParseRequest(b) if err != nil { if errors.Is(err, ErrNoStatements) && !qp.Wait() { http.Error(w, err.Error(), http.StatusBadRequest) return } } if err := command.Rewrite(stmts, !qp.NoRewriteRandom()); err != nil { http.Error(w, fmt.Sprintf("SQL rewrite: %s", err.Error()), http.StatusInternalServerError) return } var fc queue.FlushChannel if qp.Wait() { stats.Add(numQueuedExecutionsWait, 1) fc = make(queue.FlushChannel) } seqNum, err := s.stmtQueue.Write(stmts, fc) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } resp.SequenceNum = seqNum if qp.Wait() { // Wait for the flush channel to close, or timeout. select { case <-fc: break case <-time.NewTimer(qp.Timeout(defaultTimeout)).C: http.Error(w, "timeout", http.StatusRequestTimeout) return } } resp.end = time.Now() s.writeResponse(w, r, qp, resp) } // execute handles queries that modify the database. func (s *Service) execute(w http.ResponseWriter, r *http.Request, qp QueryParams) { resp := NewResponse() b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } r.Body.Close() stmts, err := ParseRequest(b) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } stats.Add(numExecuteStmtsRx, int64(len(stmts))) if err := command.Rewrite(stmts, !qp.NoRewriteRandom()); err != nil { http.Error(w, fmt.Sprintf("SQL rewrite: %s", err.Error()), http.StatusInternalServerError) return } er := &proto.ExecuteRequest{ Request: &proto.Request{ Transaction: qp.Tx(), DbTimeout: int64(qp.DBTimeout(0)), Statements: stmts, }, Timings: qp.Timings(), } results, resultsErr := s.store.Execute(er) if resultsErr != nil && resultsErr == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) results, resultsErr = s.cluster.Execute(er, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout), qp.Retries(0)) if resultsErr != nil { stats.Add(numRemoteExecutionsFailed, 1) if resultsErr.Error() == "unauthorized" { http.Error(w, "remote Execute not authorized", http.StatusUnauthorized) return } resultsErr = fmt.Errorf("node failed to process Execute on remote node at %s: %s", addr, resultsErr.Error()) } stats.Add(numRemoteExecutions, 1) } if resultsErr != nil { resp.Error = resultsErr.Error() } else { resp.Results.ExecuteResult = results } resp.end = time.Now() s.writeResponse(w, r, qp, resp) } // handleQuery handles queries that do not modify the database. func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPerm(r, auth.PermQuery) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "GET" && r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Get the query statement(s), and do tx if necessary. queries, err := requestQueries(r, qp) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } stats.Add(numQueryStmtsRx, int64(len(queries))) // No point rewriting queries if they don't go through the Raft log, since they // will never be replayed from the log anyway. if qp.Level() == proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG { if err := command.Rewrite(queries, qp.NoRewriteRandom()); err != nil { http.Error(w, fmt.Sprintf("SQL rewrite: %s", err.Error()), http.StatusInternalServerError) return } } resp := NewResponse() resp.Results.AssociativeJSON = qp.Associative() qr := &proto.QueryRequest{ Request: &proto.Request{ Transaction: qp.Tx(), DbTimeout: int64(qp.DBTimeout(0)), Statements: queries, }, Timings: qp.Timings(), Level: qp.Level(), Freshness: qp.Freshness().Nanoseconds(), FreshnessStrict: qp.FreshnessStrict(), } results, resultsErr := s.store.Query(qr) if resultsErr != nil && resultsErr == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) results, resultsErr = s.cluster.Query(qr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) if resultsErr != nil { stats.Add(numRemoteQueriesFailed, 1) if resultsErr.Error() == "unauthorized" { http.Error(w, "remote query not authorized", http.StatusUnauthorized) return } resultsErr = fmt.Errorf("node failed to process Query on remote node at %s: %s", addr, resultsErr.Error()) } stats.Add(numRemoteQueries, 1) } if resultsErr != nil { resp.Error = resultsErr.Error() } else { resp.Results.QueryRows = results } resp.end = time.Now() s.writeResponse(w, r, qp, resp) } func (s *Service) handleRequest(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPermAll(r, auth.PermQuery, auth.PermExecute) { w.WriteHeader(http.StatusUnauthorized) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } r.Body.Close() stmts, err := ParseRequest(b) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } stats.Add(numRequestStmtsRx, int64(len(stmts))) if err := command.Rewrite(stmts, qp.NoRewriteRandom()); err != nil { http.Error(w, fmt.Sprintf("SQL rewrite: %s", err.Error()), http.StatusInternalServerError) return } resp := NewResponse() resp.Results.AssociativeJSON = qp.Associative() eqr := &proto.ExecuteQueryRequest{ Request: &proto.Request{ Transaction: qp.Tx(), Statements: stmts, DbTimeout: int64(qp.DBTimeout(0)), }, Timings: qp.Timings(), Level: qp.Level(), Freshness: qp.Freshness().Nanoseconds(), FreshnessStrict: qp.FreshnessStrict(), } results, resultsErr := s.store.Request(eqr) if resultsErr != nil && resultsErr == store.ErrNotLeader { if s.DoRedirect(w, r, qp) { return } addr, err := s.store.LeaderAddr() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } username, password, ok := r.BasicAuth() if !ok { username = "" } w.Header().Add(ServedByHTTPHeader, addr) results, resultsErr = s.cluster.Request(eqr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout), qp.Retries(0)) if resultsErr != nil { stats.Add(numRemoteRequestsFailed, 1) if resultsErr.Error() == "unauthorized" { http.Error(w, "remote Request not authorized", http.StatusUnauthorized) return } resultsErr = fmt.Errorf("node failed to process Request on remote node at %s: %s", addr, resultsErr.Error()) } stats.Add(numRemoteRequests, 1) } if resultsErr != nil { resp.Error = resultsErr.Error() } else { resp.Results.ExecuteQueryResponse = results } resp.end = time.Now() s.writeResponse(w, r, qp, resp) } // handleExpvar serves registered expvar information over HTTP. func (s *Service) handleExpvar(w http.ResponseWriter, r *http.Request, qp QueryParams) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if !s.CheckRequestPerm(r, auth.PermStatus) { w.WriteHeader(http.StatusUnauthorized) return } fmt.Fprintf(w, "{\n") first := true expvar.Do(func(kv expvar.KeyValue) { if qp.Key() != "" && qp.Key() != kv.Key { return } if !first { fmt.Fprintf(w, ",\n") } first = false fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) }) fmt.Fprintf(w, "\n}\n") } // handlePprof serves pprof information over HTTP. func (s *Service) handlePprof(w http.ResponseWriter, r *http.Request, qp QueryParams) { if !s.CheckRequestPerm(r, auth.PermStatus) { w.WriteHeader(http.StatusUnauthorized) return } switch r.URL.Path { case "/debug/pprof/cmdline": pprof.Cmdline(w, r) case "/debug/pprof/profile": pprof.Profile(w, r) case "/debug/pprof/symbol": pprof.Symbol(w, r) default: pprof.Index(w, r) } } // Addr returns the address on which the Service is listening func (s *Service) Addr() net.Addr { return s.ln.Addr() } // DoRedirect checks if the request is a redirect, and if so, performs the redirect. // Returns true caller can consider the request handled. Returns false if the request // was not a redirect and the caller should continue processing the request. func (s *Service) DoRedirect(w http.ResponseWriter, r *http.Request, qp QueryParams) bool { if !qp.Redirect() { return false } rd, err := s.FormRedirect(r) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } else { http.Redirect(w, r, rd, http.StatusMovedPermanently) } return true } // FormRedirect returns the value for the "Location" header for a 301 response. func (s *Service) FormRedirect(r *http.Request) (string, error) { leaderAPIAddr := s.LeaderAPIAddr() if leaderAPIAddr == "" { stats.Add(numLeaderNotFound, 1) return "", ErrLeaderNotFound } rq := r.URL.RawQuery if rq != "" { rq = fmt.Sprintf("?%s", rq) } return fmt.Sprintf("%s%s%s", leaderAPIAddr, r.URL.Path, rq), nil } // CheckRequestPerm checks if the request is authenticated and authorized // with the given Perm. func (s *Service) CheckRequestPerm(r *http.Request, perm string) (b bool) { defer func() { if b { stats.Add(numAuthOK, 1) } else { stats.Add(numAuthFail, 1) } }() // No auth store set, so no checking required. if s.credentialStore == nil { return true } username, password, ok := r.BasicAuth() if !ok { username = "" } return s.credentialStore.AA(username, password, perm) } // CheckRequestPermAll checksif the request is authenticated and authorized // with all the given Perms. func (s *Service) CheckRequestPermAll(r *http.Request, perms ...string) (b bool) { defer func() { if b { stats.Add(numAuthOK, 1) } else { stats.Add(numAuthFail, 1) } }() // No auth store set, so no checking required. if s.credentialStore == nil { return true } username, password, ok := r.BasicAuth() if !ok { username = "" } for _, perm := range perms { if !s.credentialStore.AA(username, password, perm) { return false } } return true } // LeaderAPIAddr returns the API address of the leader, as known by this node. func (s *Service) LeaderAPIAddr() string { nodeAddr, err := s.store.LeaderAddr() if err != nil { return "" } apiAddr, err := s.cluster.GetNodeAPIAddr(nodeAddr, defaultTimeout) if err != nil { return "" } return apiAddr } func (s *Service) runQueue() { defer close(s.queueDone) retryDelay := time.Second var err error for { select { case <-s.closeCh: return case req := <-s.stmtQueue.C: er := &proto.ExecuteRequest{ Request: &proto.Request{ Statements: req.Statements, Transaction: s.DefaultQueueTx, }, } stats.Add(numQueuedExecutionsStmtsRx, int64(len(req.Statements))) // Nil statements are valid, as clients may want to just send // a "checkpoint" through the queue. if er.Request.Statements != nil { for { _, err = s.store.Execute(er) if err == nil { // Success! break } if err == store.ErrNotLeader { addr, err := s.store.LeaderAddr() if err != nil || addr == "" { s.logger.Printf("execute queue can't find leader for sequence number %d on node %s", req.SequenceNumber, s.Addr().String()) stats.Add(numQueuedExecutionsNoLeader, 1) } else { _, err = s.cluster.Execute(er, addr, nil, defaultTimeout, 0) if err != nil { s.logger.Printf("execute queue write failed for sequence number %d on node %s: %s", req.SequenceNumber, s.Addr().String(), err.Error()) if err.Error() == "leadership lost while committing log" { stats.Add(numQueuedExecutionsLeadershipLost, 1) } else if err.Error() == "not leader" { stats.Add(numQueuedExecutionsNotLeader, 1) } else { stats.Add(numQueuedExecutionsUnknownError, 1) } } else { // Success! stats.Add(numRemoteExecutions, 1) break } } } stats.Add(numQueuedExecutionsFailed, 1) time.Sleep(retryDelay) } } // Perform post-write processing. s.seqNumMu.Lock() s.seqNum = req.SequenceNumber s.seqNumMu.Unlock() req.Close() stats.Add(numQueuedExecutionsStmtsTx, int64(len(req.Statements))) stats.Add(numQueuedExecutionsOK, 1) } } } // addBuildVersion adds the build version to the HTTP response. func (s *Service) addBuildVersion(w http.ResponseWriter) { // Add version header to every response, if available. version := "unknown" if v, ok := s.BuildInfo["version"].(string); ok { version = v } w.Header().Add(VersionHTTPHeader, version) } // addAllowHeaders adds the Access-Control-Allow-Origin, Access-Control-Allow-Methods, // and Access-Control-Allow-Headers headers to the HTTP response. func (s *Service) addAllowHeaders(w http.ResponseWriter) { if s.AllowOrigin != "" { w.Header().Add(AllowOriginHeader, s.AllowOrigin) } w.Header().Add(AllowMethodsHeader, "OPTIONS, GET, POST") if s.credentialStore == nil { w.Header().Add(AllowHeadersHeader, "Content-Type") } else { w.Header().Add(AllowHeadersHeader, "Content-Type, Authorization") w.Header().Add(AllowCredentialsHeader, "true") } } // addBackupFormatHeader adds the Content-Type header for the backup format. func addBackupFormatHeader(w http.ResponseWriter, qp QueryParams) { w.Header().Set("Content-Type", "application/octet-stream") if qp.BackupFormat() == proto.BackupRequest_BACKUP_REQUEST_FORMAT_SQL { w.Header().Set("Content-Type", "application/sql") } } // tlsStats returns the TLS stats for the service. func (s *Service) tlsStats() map[string]interface{} { m := map[string]interface{}{ "enabled": fmt.Sprintf("%t", s.tlsConfig != nil), } if s.tlsConfig != nil { m["client_auth"] = s.tlsConfig.ClientAuth.String() m["cert_file"] = s.CertFile m["key_file"] = s.KeyFile m["ca_file"] = s.CACertFile m["next_protos"] = s.tlsConfig.NextProtos } return m } // writeResponse writes the given response to the given writer. func (s *Service) writeResponse(w http.ResponseWriter, r *http.Request, qp QueryParams, j Responser) { var b []byte var err error if qp.Timings() { j.SetTime() } if qp.Pretty() { b, err = json.MarshalIndent(j, "", " ") } else { b, err = json.Marshal(j) } if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } _, err = w.Write(b) if err != nil { s.logger.Println("writing response failed:", err.Error()) } } func requestQueries(r *http.Request, qp QueryParams) ([]*proto.Statement, error) { if r.Method == "GET" { return []*proto.Statement{ { Sql: qp.Query(), }, }, nil } b, err := io.ReadAll(r.Body) if err != nil { return nil, errors.New("bad query POST request") } r.Body.Close() return ParseRequest(b) } func getSubJSON(jsonBlob []byte, keyString string) (json.RawMessage, error) { if keyString == "" { return jsonBlob, nil } keys := strings.Split(keyString, ".") var obj interface{} if err := json.Unmarshal(jsonBlob, &obj); err != nil { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } for _, key := range keys { switch val := obj.(type) { case map[string]interface{}: if value, ok := val[key]; ok { obj = value } else { emptyObj := json.RawMessage("{}") return emptyObj, nil } default: // If a value is not a map, marshal and return this value finalObjBytes, err := json.Marshal(obj) if err != nil { return nil, fmt.Errorf("failed to marshal final object: %w", err) } return finalObjBytes, nil } } finalObjBytes, err := json.Marshal(obj) if err != nil { return nil, fmt.Errorf("failed to marshal final object: %w", err) } return finalObjBytes, nil } func prettyEnabled(e bool) string { if e { return "enabled" } return "disabled" } // queryRequestFromStrings converts a slice of strings into a command.QueryRequest func executeRequestFromStrings(s []string, timings, tx bool) *proto.ExecuteRequest { stmts := make([]*proto.Statement, len(s)) for i := range s { stmts[i] = &proto.Statement{ Sql: s[i], } } return &proto.ExecuteRequest{ Request: &proto.Request{ Statements: stmts, Transaction: tx, }, Timings: timings, } } func makeCredentials(username, password string) *clstrPB.Credentials { return &clstrPB.Credentials{ Username: username, Password: password, } }