|
|
|
@ -270,10 +270,8 @@ type Service struct {
|
|
|
|
|
// the service performs no authentication and authorization checks.
|
|
|
|
|
func New(addr string, store Store, cluster Cluster, credentials CredentialStore) *Service {
|
|
|
|
|
return &Service{
|
|
|
|
|
closeCh: make(chan struct{}),
|
|
|
|
|
addr: addr,
|
|
|
|
|
store: store,
|
|
|
|
|
queueDone: make(chan struct{}),
|
|
|
|
|
DefaultQueueCap: 1024,
|
|
|
|
|
DefaultQueueBatchSz: 128,
|
|
|
|
|
DefaultQueueTimeout: 100 * time.Millisecond,
|
|
|
|
@ -311,6 +309,9 @@ func (s *Service) Start() error {
|
|
|
|
|
}
|
|
|
|
|
s.ln = ln
|
|
|
|
|
|
|
|
|
|
s.closeCh = make(chan struct{})
|
|
|
|
|
s.queueDone = make(chan struct{})
|
|
|
|
|
|
|
|
|
|
s.stmtQueue = queue.New(s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout)
|
|
|
|
|
go s.runQueue()
|
|
|
|
|
s.logger.Printf("execute queue processing started with capacity %d, batch size %d, timeout %s",
|
|
|
|
@ -330,7 +331,11 @@ func (s *Service) Start() error {
|
|
|
|
|
// Close closes the service.
|
|
|
|
|
func (s *Service) Close() {
|
|
|
|
|
s.stmtQueue.Close()
|
|
|
|
|
close(s.closeCh)
|
|
|
|
|
select {
|
|
|
|
|
case <-s.closeCh:
|
|
|
|
|
default:
|
|
|
|
|
close(s.closeCh)
|
|
|
|
|
}
|
|
|
|
|
s.ln.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|