diff --git a/pipeline/linechartpipeline.go b/pipeline/linechartpipeline.go index 780d619..ff59500 100644 --- a/pipeline/linechartpipeline.go +++ b/pipeline/linechartpipeline.go @@ -3,9 +3,10 @@ package pipeline import ( "fmt" "os" + "sync" + "time" "github.com/kbence/logan/types" - "github.com/kbence/logan/utils" "github.com/kbence/logan/utils/terminfo" ) @@ -22,7 +23,7 @@ type LineChartPipeline struct { settings LineChartSettings chartSettings *types.ChartSettings sampler *types.TimelineSampler - timer *utils.UpdateTimer + renderLock *sync.Mutex } func NewLineChartPipeline(input types.LogLineChannel, settings LineChartSettings) *LineChartPipeline { @@ -43,7 +44,7 @@ func NewLineChartPipeline(input types.LogLineChannel, settings LineChartSettings settings: settings, chartSettings: chartSettings, sampler: sampler, - timer: utils.NewUpdateTimer(), + renderLock: &sync.Mutex{}, } } @@ -53,20 +54,30 @@ func (p *LineChartPipeline) render() { fmt.Println(chartRenderer.Render()) } -func (p *LineChartPipeline) updateIfNeeded() { - if !p.settings.FrequentUpdates { - return - } - - if p.timer.IsUpdateNeeded() { - p.render() - fmt.Print(terminfo.GoUpBy(p.settings.Height - 1)) - os.Stdout.Sync() +func (p *LineChartPipeline) updateLoop(exitChannel chan bool) { + for { + select { + case <-time.After(time.Second): + p.renderLock.Lock() + p.render() + fmt.Print(terminfo.GoUpBy(p.settings.Height - 1)) + os.Stdout.Sync() + p.renderLock.Unlock() + break + + case <-exitChannel: + return + } } } func (p *LineChartPipeline) Start() chan bool { exitChannel := make(chan bool) + updateExitChannel := make(chan bool) + + if p.settings.FrequentUpdates { + go p.updateLoop(updateExitChannel) + } go func() { for { @@ -77,10 +88,16 @@ func (p *LineChartPipeline) Start() chan bool { } p.sampler.Inc(line.Date, 1) - p.updateIfNeeded() } + if p.settings.FrequentUpdates { + // Exit update goproc _before_ we start rendering + updateExitChannel <- true + } + + p.renderLock.Lock() p.render() + p.renderLock.Unlock() exitChannel <- true }() diff --git a/pipeline/uniquepipeline.go b/pipeline/uniquepipeline.go index 6909244..dd8837f 100644 --- a/pipeline/uniquepipeline.go +++ b/pipeline/uniquepipeline.go @@ -3,10 +3,11 @@ package pipeline import ( "fmt" "strings" + "sync" + "time" "unicode/utf8" "github.com/kbence/logan/types" - "github.com/kbence/logan/utils" "github.com/kbence/logan/utils/terminfo" ) @@ -19,16 +20,16 @@ type UniquePipeline struct { input types.LogLineChannel counter *types.LogLineCounter settings UniqueSettings - timer *utils.UpdateTimer + renderLock *sync.Mutex lastPrintedLines int } func NewUniquePipeline(input types.LogLineChannel, settings UniqueSettings) *UniquePipeline { return &UniquePipeline{ - input: input, - counter: types.NewLogLineCounter(), - settings: settings, - timer: &utils.UpdateTimer{}, + input: input, + counter: types.NewLogLineCounter(), + settings: settings, + renderLock: &sync.Mutex{}, } } @@ -102,26 +103,36 @@ func (p *UniquePipeline) clearLines(lines, width int) { fmt.Println(terminfo.GoUpBy(lines)) } -func (p *UniquePipeline) updateIfNeeded() { - if p.settings.TopLimit <= 0 { - return - } +func (p *UniquePipeline) updateLoop(exitChannel chan bool) { + for { + select { + case <-time.After(time.Second): + p.renderLock.Lock() + p.clearLines(p.lastPrintedLines, p.settings.TerminalWidth) - if p.timer.IsUpdateNeeded() { - p.clearLines(p.lastPrintedLines, p.settings.TerminalWidth) + numPrintedLines := p.printUniqueLines() - numPrintedLines := p.printUniqueLines() + if numPrintedLines > 0 { + fmt.Print(terminfo.GoUpBy(numPrintedLines)) + } - if numPrintedLines > 0 { - fmt.Print(terminfo.GoUpBy(numPrintedLines)) - } + p.lastPrintedLines = numPrintedLines + p.renderLock.Unlock() + break - p.lastPrintedLines = numPrintedLines + case <-exitChannel: + return + } } } func (p *UniquePipeline) Start() chan bool { exitChannel := make(chan bool) + updateExitChannel := make(chan bool) + + if p.settings.TopLimit > 0 { + go p.updateLoop(updateExitChannel) + } go func() { for { @@ -132,10 +143,16 @@ func (p *UniquePipeline) Start() chan bool { } p.storeUniqueLine(line) - p.updateIfNeeded() } + if p.settings.TopLimit > 0 { + updateExitChannel <- true + } + + p.renderLock.Lock() p.printUniqueLines() + p.renderLock.Unlock() + exitChannel <- true }() diff --git a/utils/updatetimer.go b/utils/updatetimer.go deleted file mode 100644 index e1f297e..0000000 --- a/utils/updatetimer.go +++ /dev/null @@ -1,37 +0,0 @@ -package utils - -import "time" - -type UpdateTimer struct { - lastUpdateTime *time.Time - lastUpdate uint64 - received uint64 - nextUpdate uint64 -} - -func NewUpdateTimer() *UpdateTimer { - return &UpdateTimer{} -} - -func (t *UpdateTimer) IsUpdateNeeded() bool { - t.received++ - - if t.lastUpdateTime == nil || t.received >= t.nextUpdate { - currentTime := time.Now() - - if t.lastUpdateTime == nil { - t.nextUpdate = t.received + 10 - } else { - next := 1000000000 * (t.received - t.lastUpdate) / - uint64(currentTime.Sub(*t.lastUpdateTime)) - t.nextUpdate = t.received + next - } - - t.lastUpdateTime = ¤tTime - t.lastUpdate = t.received - - return true - } - - return false -}