1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
2.6 KiB
Go

package progress
import (
"context"
"io"
"sync"
"time"
)
const (
countingMonitorInterval = 10 * time.Second
)
// CountingReader is an io.Reader that counts the number of bytes read.
type CountingReader struct {
reader io.Reader
mu sync.RWMutex
count int64
}
// NewCountingReader returns a new CountingReader.
func NewCountingReader(reader io.Reader) *CountingReader {
return &CountingReader{reader: reader}
}
// Read reads from the underlying reader, and counts the number of bytes read.
func (c *CountingReader) Read(p []byte) (int, error) {
n, err := c.reader.Read(p)
c.mu.Lock()
defer c.mu.Unlock()
c.count += int64(n)
return n, err
}
// Count returns the number of bytes read.
func (c *CountingReader) Count() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
// CountingWriter is an io.Writer that counts the number of bytes written.
type CountingWriter struct {
writer io.Writer
mu sync.RWMutex
count int64
}
// NewCountingWriter returns a new CountingWriter.
func NewCountingWriter(writer io.Writer) *CountingWriter {
return &CountingWriter{writer: writer}
}
// Write writes to the underlying writer, and counts the number of bytes written.
func (c *CountingWriter) Write(p []byte) (int, error) {
n, err := c.writer.Write(p)
c.mu.Lock()
defer c.mu.Unlock()
c.count += int64(n)
return n, err
}
// Count returns the number of bytes written.
func (c *CountingWriter) Count() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
// LoggerFunc is a function that can be used to log the current count.
type LoggerFunc func(n int64)
// Counter is an interface that can be used to get the current count.
type Counter interface {
Count() int64
}
// CountingMonitor is a monitor that periodically logs the current count.
type CountingMonitor struct {
loggerFn LoggerFunc
ctr Counter
once sync.Once
cancel func()
doneCh chan struct{}
}
// StartCountingMonitor starts a CountingMonitor.
func StartCountingMonitor(loggerFn LoggerFunc, ctr Counter) *CountingMonitor {
ctx, cancel := context.WithCancel(context.Background())
m := &CountingMonitor{
loggerFn: loggerFn,
ctr: ctr,
cancel: cancel,
doneCh: make(chan struct{}),
}
go m.run(ctx)
return m
}
func (cm *CountingMonitor) run(ctx context.Context) {
defer close(cm.doneCh)
ticker := time.NewTicker(countingMonitorInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cm.runOnce()
}
}
}
func (cm *CountingMonitor) runOnce() {
cm.loggerFn(cm.ctr.Count())
}
func (m *CountingMonitor) StopAndWait() {
m.once.Do(func() {
m.cancel()
<-m.doneCh
})
}