1
0
Fork 0

Start hooking in queuing to HTTP layer

master
Philip O'Toole 2 years ago
parent e03cfd836d
commit 02b8776c29

@ -144,20 +144,22 @@ type Response struct {
var stats *expvar.Map
const (
numLeaderNotFound = "leader_not_found"
numExecutions = "executions"
numQueuedExecutions = "queued_executions"
numQueries = "queries"
numRemoteExecutions = "remote_executions"
numRemoteQueries = "remote_queries"
numReadyz = "num_readyz"
numStatus = "num_status"
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
numLeaderNotFound = "leader_not_found"
numExecutions = "executions"
numQueuedExecutions = "queued_executions"
numQueuedExecutionsOK = "queued_executions_ok"
numQueuedExecutionsFailed = "queued_executions_failed"
numQueries = "queries"
numRemoteExecutions = "remote_executions"
numRemoteQueries = "remote_queries"
numReadyz = "num_readyz"
numStatus = "num_status"
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
// Default timeout for cluster communications.
defaultTimeout = 30 * time.Second
@ -188,12 +190,17 @@ const (
// node (by node Raft address) actually served the request if
// it wasn't served by this node.
ServedByHTTPHeader = "X-RQLITE-SERVED-BY"
noTimeout = 0
)
func init() {
stats = expvar.NewMap("http")
stats.Add(numLeaderNotFound, 0)
stats.Add(numExecutions, 0)
stats.Add(numQueuedExecutions, 0)
stats.Add(numQueuedExecutionsOK, 0)
stats.Add(numQueuedExecutionsFailed, 0)
stats.Add(numQueries, 0)
stats.Add(numRemoteExecutions, 0)
stats.Add(numRemoteQueries, 0)
@ -223,11 +230,13 @@ func NewResponse() *Response {
// Service provides HTTP service.
type Service struct {
addr string // Bind address of the HTTP service.
ln net.Listener // Service listener
closeCh chan struct{}
addr string // Bind address of the HTTP service.
ln net.Listener // Service listener
store Store // The Raft-backed database store.
queueDone chan struct{}
stmtQueue *queue.Queue // Queue for queued executes
cluster Cluster // The Cluster service.
@ -257,8 +266,10 @@ type Service struct {
// the service performs no authentication and authorization checks.
func New(addr string, store Store, cluster Cluster, credentials CredentialStore) *Service {
return &Service{
closeCh: make(chan struct{}),
addr: addr,
store: store,
queueDone: make(chan struct{}),
stmtQueue: queue.New(1024, 64, 100*time.Millisecond),
cluster: cluster,
start: time.Now(),
@ -294,6 +305,9 @@ func (s *Service) Start() error {
}
s.ln = ln
go s.runQueue()
s.logger.Println("execute queue processing started")
go func() {
err := server.Serve(s.ln)
if err != nil {
@ -307,6 +321,9 @@ func (s *Service) Start() error {
// Close closes the service.
func (s *Service) Close() {
close(s.closeCh)
<-s.queueDone
s.ln.Close()
return
}
@ -1212,6 +1229,46 @@ func (s *Service) LeaderAPIAddr() string {
return apiAddr
}
func (s *Service) runQueue() {
defer close(s.queueDone)
for {
select {
case <-s.closeCh:
return
case stmts := <-s.stmtQueue.C:
er := &command.ExecuteRequest{
Request: &command.Request{
Statements: stmts,
},
}
for {
_, resultsErr := s.store.Execute(er)
if resultsErr != nil && resultsErr == store.ErrNotLeader {
addr, err := s.store.LeaderAddr()
if err == nil && addr != "" {
_, resultsErr = s.cluster.Execute(er, addr, noTimeout)
if resultsErr == nil {
stats.Add(numRemoteExecutions, 1)
stats.Add(numQueuedExecutionsOK, 1)
break
} else {
stats.Add(numQueuedExecutionsFailed, 1)
}
}
}
if resultsErr != nil {
stats.Add(numQueuedExecutionsFailed, 1)
break
} else {
stats.Add(numQueuedExecutionsOK, 1)
}
}
}
}
}
type checkNodesResponse struct {
apiAddr string
reachable bool

Loading…
Cancel
Save