diff --git a/.github/workflows/release-assets.yml b/.github/workflows/release-assets.yml index bfb238b..06a3906 100644 --- a/.github/workflows/release-assets.yml +++ b/.github/workflows/release-assets.yml @@ -8,7 +8,9 @@ on: - created env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - GO_VERSION: 1.21.x + GO_VERSION: '1.22.0-rc.1' + LINUX_AMD64_BUILD_OPTIONS: '-tags cgo_zstd' + GOAMD64: v3 jobs: build: name: Upload Release Assets diff --git a/.golangci.yml b/.golangci.yml index a035208..c2a2977 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -16,6 +16,10 @@ linters-settings: check-exported: false unparam: check-exported: true + funlen: + lines: 80 + cyclop: + max-complexity: 15 linters: enable-all: true diff --git a/Makefile b/Makefile index 06f5916..650b826 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,6 @@ ifeq ($(DEVGO_PATH),) endif -export CGO_ENABLED ?= 0 BUILD_PKG = ./cmd/catp BUILD_LDFLAGS=-s -w diff --git a/cmd/catp/README.md b/cmd/catp/README.md index 0e98765..ab94610 100644 --- a/cmd/catp/README.md +++ b/cmd/catp/README.md @@ -21,10 +21,25 @@ wget https://github.com/bool64/progress/releases/latest/download/linux_amd64.tar Usage of catp: -dbg-cpu-prof string write first 10 seconds of CPU profile to file - -grep string - grep pattern, may contain multiple patterns separated by \| + -dbg-mem-prof string + write heap profile to file after 10 seconds + -grep value + grep pattern, may contain multiple OR patterns separated by \|, + each -grep value is added with AND logic, akin to extra '| grep foo', + for example, you can use '-grep bar\|baz -grep foo' to only keep lines that have (bar OR baz) AND foo + -no-progress + disable progress printing + -out-dir string + output to directory instead of STDOUT + files will be written to out dir with original base names + disables output flag -output string output to file instead of STDOUT + -parallel int + number of parallel readers if multiple files are provided + lines from different files will go to output 
simultaneously use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU) (default 1) + -progress-json string + write current progress to a file -version print version and exit ``` diff --git a/cmd/catp/bytesContains.pprof b/cmd/catp/bytesContains.pprof new file mode 100644 index 0000000..7b30a45 Binary files /dev/null and b/cmd/catp/bytesContains.pprof differ diff --git a/cmd/catp/catp/app.go b/cmd/catp/catp/app.go index f23d1bd..0b78cfc 100644 --- a/cmd/catp/catp/app.go +++ b/cmd/catp/catp/app.go @@ -4,13 +4,16 @@ package catp import ( "bufio" "bytes" + "encoding/json" "flag" "fmt" "io" "log" "os" + "path" "runtime/pprof" "strings" + "sync" "sync/atomic" "time" @@ -21,27 +24,53 @@ import ( ) type runner struct { - output io.Writer - pr *progress.Progress + mu sync.Mutex + output io.Writer + + pr *progress.Progress + progressJSON string + sizes map[string]int64 matches int64 totalBytes int64 + outDir string - grep [][]byte + parallel int + currentBytes int64 + currentLines int64 + + // grep is a slice of AND items, that are slices of OR items. + grep [][][]byte currentFile *progress.CountingReader currentTotal int64 - lastErr error + + lastErr error } // st renders Status as a string. 
func (r *runner) st(s progress.Status) string { var res string - if len(r.sizes) > 1 { - fileDonePercent := 100 * float64(r.currentFile.Bytes()) / float64(r.currentTotal) + type progressJSON struct { + progress.Status + BytesCompleted int64 `json:"bytes_completed"` + BytesTotal int64 `json:"bytes_total"` + CurrentFilePercent float64 `json:"current_file_percent,omitempty"` + Matches *int64 `json:"matches,omitempty"` + ElapsedSeconds float64 `json:"elapsed_sec"` + RemainingSeconds float64 `json:"remaining_sec"` + } + + pr := progressJSON{ + Status: s, + } + + if len(r.sizes) > 1 && r.parallel <= 1 { + pr.CurrentFilePercent = 100 * float64(r.currentFile.Bytes()) / float64(r.currentTotal) + res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", - s.DonePercent, s.Task, fileDonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, + s.DonePercent, s.Task, pr.CurrentFilePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) } else { res = fmt.Sprintf("%s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", @@ -50,46 +79,109 @@ func (r *runner) st(s progress.Status) string { } if r.grep != nil { - res += fmt.Sprintf(", matches %d", atomic.LoadInt64(&r.matches)) + m := atomic.LoadInt64(&r.matches) + pr.Matches = &m + res += fmt.Sprintf(", matches %d", m) + } + + if r.progressJSON != "" { + pr.ElapsedSeconds = pr.Elapsed.Truncate(time.Second).Seconds() + pr.RemainingSeconds = pr.Remaining.Round(time.Second).Seconds() + pr.BytesCompleted = atomic.LoadInt64(&r.currentBytes) + pr.BytesTotal = atomic.LoadInt64(&r.totalBytes) + + if j, err := json.Marshal(pr); err == nil { + if err = os.WriteFile(r.progressJSON, append(j, '\n'), 0o600); err != nil { + println("failed to write progress JSON: " + err.Error()) + } + } } return res } -func (r *runner) readFile(rd io.Reader) { +func (r *runner) readFile(rd 
io.Reader, out io.Writer) { b := bufio.NewReaderSize(rd, 64*1024) - _, err := io.Copy(r.output, b) + _, err := io.Copy(out, b) if err != nil { log.Fatal(err) } } -func (r *runner) scanFile(rd io.Reader) { +func (r *runner) scanFile(rd io.Reader, out io.Writer) { s := bufio.NewScanner(rd) s.Buffer(make([]byte, 64*1024), 10*1024*1024) + lines := 0 + for s.Scan() { - for _, g := range r.grep { - if bytes.Contains(s.Bytes(), g) { - if _, err := r.output.Write(append(s.Bytes(), '\n')); err != nil { - r.lastErr = err + lines++ - return - } + if lines >= 1000 { + atomic.AddInt64(&r.currentLines, int64(lines)) + lines = 0 + } - atomic.AddInt64(&r.matches, 1) + if !r.shouldWrite(s.Bytes()) { + continue + } - break + atomic.AddInt64(&r.matches, 1) + + if r.parallel > 1 && r.outDir == "" { + r.mu.Lock() + } + + if _, err := out.Write(append(s.Bytes(), '\n')); err != nil { + r.lastErr = err + + if r.parallel > 1 && r.outDir == "" { + r.mu.Unlock() } + + return + } + + if r.parallel > 1 && r.outDir == "" { + r.mu.Unlock() } } + atomic.AddInt64(&r.currentLines, int64(lines)) + if err := s.Err(); err != nil { + r.mu.Lock() + defer r.mu.Unlock() + r.lastErr = err } } +func (r *runner) shouldWrite(line []byte) bool { + shouldWrite := true + + for _, andGrep := range r.grep { + andPassed := false + + for _, orGrep := range andGrep { + if bytes.Contains(line, orGrep) { + andPassed = true + + break + } + } + + if !andPassed { + shouldWrite = false + + break + } + } + + return shouldWrite +} + func (r *runner) cat(filename string) (err error) { file, err := os.Open(filename) //nolint:gosec if err != nil { @@ -102,51 +194,100 @@ func (r *runner) cat(filename string) (err error) { } }() - r.currentFile = &progress.CountingReader{Reader: file} - r.currentTotal = r.sizes[filename] - rd := io.Reader(r.currentFile) - lines := r.currentFile + cr := progress.NewCountingReader(file) + cr.SetBytes(&r.currentBytes) + cr.SetLines(nil) - switch { - case strings.HasSuffix(filename, ".gz"): - if rd, 
err = gzip.NewReader(rd); err != nil { - return fmt.Errorf("failed to init gzip reader: %w", err) + if r.parallel <= 1 { + cr = progress.NewCountingReader(file) + cr.SetLines(nil) + r.currentFile = cr + r.currentTotal = r.sizes[filename] + } + + rd := io.Reader(cr) + + if rd, err = r.openReader(rd, filename); err != nil { + return err + } + + out := r.output + + if r.outDir != "" { + fn := r.outDir + "/" + path.Base(filename) + if strings.HasSuffix(fn, ".gz") { + fn = strings.TrimSuffix(fn, ".gz") + } else { + fn = strings.TrimSuffix(fn, ".zst") } - lines = &progress.CountingReader{Reader: rd} - rd = lines - case strings.HasSuffix(filename, ".zst"): - if rd, err = zstd.NewReader(rd); err != nil { - return fmt.Errorf("failed to init zst reader: %w", err) + w, err := os.Create(fn) //nolint:gosec + if err != nil { + return err } - lines = &progress.CountingReader{Reader: rd} - rd = lines + defer func() { + if clErr := w.Close(); clErr != nil && err == nil { + err = clErr + } + }() + + out = w } - r.pr.Start(func(t *progress.Task) { - t.TotalBytes = func() int64 { - return r.totalBytes - } - t.CurrentBytes = r.currentFile.Bytes - t.CurrentLines = lines.Lines + if r.parallel <= 1 { + r.pr.Start(func(t *progress.Task) { + t.TotalBytes = func() int64 { + return r.totalBytes + } - t.Task = filename - t.Continue = true - }) + t.CurrentBytes = r.currentFile.Bytes + t.CurrentLines = func() int64 { return atomic.LoadInt64(&r.currentLines) } + t.Task = filename + t.Continue = true + t.PrintOnStart = true + }) + } if len(r.grep) > 0 { - r.scanFile(rd) + r.scanFile(rd, out) } else { - r.readFile(rd) + r.readFile(rd, out) } - r.pr.Stop() + cr.Close() + + if r.parallel <= 1 { + r.pr.Stop() + } return r.lastErr } -func startProfiling(cpuProfile string) { +func (r *runner) openReader(rd io.Reader, filename string) (io.Reader, error) { + var err error + + switch { + case strings.HasSuffix(filename, ".gz"): + if rd, err = gzip.NewReader(rd); err != nil { + return nil, 
fmt.Errorf("failed to init gzip reader: %w", err) + } + case strings.HasSuffix(filename, ".zst"): + if r.parallel >= 1 { + if rd, err = zstdReader(rd); err != nil { + return nil, fmt.Errorf("failed to init zst reader: %w", err) + } + } else { + if rd, err = zstd.NewReader(rd); err != nil { + return nil, fmt.Errorf("failed to init zst reader: %w", err) + } + } + } + + return rd, nil +} + +func startProfiling(cpuProfile string, memProfile string) { f, err := os.Create(cpuProfile) //nolint:gosec if err != nil { log.Fatal(err) @@ -160,14 +301,54 @@ func startProfiling(cpuProfile string) { time.Sleep(10 * time.Second) pprof.StopCPUProfile() println("CPU profile written to", cpuProfile) + + if memProfile != "" { + f, err := os.Create(memProfile) //nolint:gosec + if err != nil { + log.Fatal(err) + } + + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal("writing heap profile:", err) + } + + println("Memory profile written to", memProfile) + } }() } +type stringFlags []string + +func (i *stringFlags) String() string { + return "my string representation" +} + +func (i *stringFlags) Set(value string) error { + *i = append(*i, value) + + return nil +} + // Main is the entry point for catp CLI tool. 
-func Main() error { //nolint:funlen,cyclop - grep := flag.String("grep", "", "grep pattern, may contain multiple patterns separated by \\|") +func Main() error { //nolint:funlen,cyclop,gocognit,gocyclo + var grep stringFlags + + flag.Var(&grep, "grep", "grep pattern, may contain multiple OR patterns separated by \\|,\n"+ + "each -grep value is added with AND logic, akin to extra '| grep foo',\n"+ + "for example, you can use '-grep bar\\|baz -grep foo' to only keep lines that have (bar OR baz) AND foo") + + parallel := flag.Int("parallel", 1, "number of parallel readers if multiple files are provided\n"+ + "lines from different files will go to output simultaneously\n"+ + "use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU)") + cpuProfile := flag.String("dbg-cpu-prof", "", "write first 10 seconds of CPU profile to file") + memProfile := flag.String("dbg-mem-prof", "", "write heap profile to file after 10 seconds") output := flag.String("output", "", "output to file instead of STDOUT") + outDir := flag.String("out-dir", "", "output to directory instead of STDOUT\n"+ + "files will be written to out dir with original base names\n"+ + "disables output flag") + noProgress := flag.Bool("no-progress", false, "disable progress printing") + progressJSON := flag.String("progress-json", "", "write current progress to a file") ver := flag.Bool("version", false, "print version and exit") flag.Parse() @@ -179,41 +360,68 @@ func Main() error { //nolint:funlen,cyclop } if *cpuProfile != "" { - startProfiling(*cpuProfile) + startProfiling(*cpuProfile, *memProfile) defer pprof.StopCPUProfile() } r := &runner{} - r.output = os.Stdout + r.parallel = *parallel + r.outDir = *outDir - if *output != "" { + if *output != "" && *outDir == "" { //nolint:nestif out, err := os.Create(*output) if err != nil { return fmt.Errorf("failed to create output file %s: %w", *output, err) } - r.output = out + w := bufio.NewWriterSize(out, 64*1024) + r.output = w defer func() { + if 
err := w.Flush(); err != nil { + log.Fatalf("failed to flush STDOUT buffer: %s", err) + } + if err := out.Close(); err != nil { log.Fatalf("failed to close output file %s: %s", *output, err) } }() + } else { + w := bufio.NewWriterSize(os.Stdout, 64*1024) + r.output = w + + defer func() { + if err := w.Flush(); err != nil { + log.Fatalf("failed to flush STDOUT buffer: %s", err) + } + }() } - if *grep != "" { - for _, s := range strings.Split(*grep, "\\|") { - r.grep = append(r.grep, []byte(s)) + if len(grep) > 0 { + for _, andGrep := range grep { + var og [][]byte + for _, orGrep := range strings.Split(andGrep, "\\|") { + og = append(og, []byte(orGrep)) + } + + r.grep = append(r.grep, og) } } r.sizes = make(map[string]int64) + r.progressJSON = *progressJSON r.pr = &progress.Progress{ Interval: 5 * time.Second, Print: func(status progress.Status) { - println(r.st(status)) + s := r.st(status) + + if *noProgress { + return + } + + println(s) }, } @@ -229,9 +437,49 @@ func Main() error { //nolint:funlen,cyclop r.sizes[fn] = st.Size() } - for i := 0; i < flag.NArg(); i++ { - if err := r.cat(flag.Arg(i)); err != nil { - return err + if *parallel >= 2 { + pr := r.pr + pr.Start(func(t *progress.Task) { + t.TotalBytes = func() int64 { return r.totalBytes } + t.CurrentBytes = func() int64 { return atomic.LoadInt64(&r.currentBytes) } + t.CurrentLines = func() int64 { return atomic.LoadInt64(&r.currentLines) } + t.Task = "all" + t.PrintOnStart = true + }) + + sem := make(chan struct{}, *parallel) + errs := make(chan error, 1) + + for i := 0; i < flag.NArg(); i++ { + i := i + select { + case err := <-errs: + return err + case sem <- struct{}{}: + } + + go func() { + defer func() { + <-sem + }() + + if err := r.cat(flag.Arg(i)); err != nil { + errs <- err + } + }() + } + + // Wait for goroutines to finish by acquiring all slots. 
+ for i := 0; i < cap(sem); i++ { + sem <- struct{}{} + } + + pr.Stop() + } else { + for i := 0; i < flag.NArg(); i++ { + if err := r.cat(flag.Arg(i)); err != nil { + return err + } } } diff --git a/cmd/catp/catp/cgo_zstd.go b/cmd/catp/catp/cgo_zstd.go new file mode 100644 index 0000000..d00213e --- /dev/null +++ b/cmd/catp/catp/cgo_zstd.go @@ -0,0 +1,13 @@ +//go:build cgo_zstd + +package catp + +import ( + "io" + + "github.com/DataDog/zstd" +) + +func zstdReader(rd io.Reader) (io.Reader, error) { + return zstd.NewReader(rd), nil +} diff --git a/cmd/catp/catp/zstd.go b/cmd/catp/catp/zstd.go new file mode 100644 index 0000000..472f7d3 --- /dev/null +++ b/cmd/catp/catp/zstd.go @@ -0,0 +1,18 @@ +//go:build !cgo_zstd + +package catp + +import ( + "fmt" + "io" + + "github.com/klauspost/compress/zstd" +) + +func zstdReader(rd io.Reader) (io.Reader, error) { + if rd, err := zstd.NewReader(rd, zstd.WithDecoderConcurrency(1)); err == nil { + return rd, nil + } else { //nolint:revive + return nil, fmt.Errorf("failed to init zst reader: %w", err) + } +} diff --git a/cmd/catp/default.pgo b/cmd/catp/default.pgo index 699f91d..d57301b 100644 Binary files a/cmd/catp/default.pgo and b/cmd/catp/default.pgo differ diff --git a/go.mod b/go.mod index e48be65..538a66b 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,10 @@ module github.com/bool64/progress -go 1.20 +go 1.21 require ( - github.com/bool64/dev v0.2.31 - github.com/klauspost/compress v1.16.7 + github.com/DataDog/zstd v1.5.5 + github.com/bool64/dev v0.2.33 + github.com/klauspost/compress v1.17.4 github.com/klauspost/pgzip v1.2.6 ) diff --git a/go.sum b/go.sum index 8e4258d..9c457dc 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ -github.com/bool64/dev v0.2.31 h1:OS57EqYaYe2M/2bw9uhDCIFiZZwywKFS/4qMLN6JUmQ= -github.com/bool64/dev v0.2.31/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod 
h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= +github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/bool64/dev v0.2.33 h1:ETAcSa8H9w4talcCdSQCCnLX7PMHmuxdLcDl6TpSDj4= +github.com/bool64/dev v0.2.33/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU= github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= diff --git a/progress.go b/progress.go index 314f8a1..6de875e 100644 --- a/progress.go +++ b/progress.go @@ -11,14 +11,14 @@ import ( // Status describes current progress. type Status struct { - Task string - DonePercent float64 - LinesCompleted int64 - SpeedMBPS float64 - SpeedLPS float64 - Elapsed time.Duration - Remaining time.Duration - Metrics []Metric + Task string `json:"task"` + DonePercent float64 `json:"done_percent"` + LinesCompleted int64 `json:"lines_completed"` + SpeedMBPS float64 `json:"speed_mbps"` + SpeedLPS float64 `json:"speed_lps"` + Elapsed time.Duration `json:"-"` + Remaining time.Duration `json:"-"` + Metrics []Metric `json:"-"` } // Progress reports reading performance. @@ -104,6 +104,7 @@ type Task struct { CurrentLines func() int64 Task string Continue bool + PrintOnStart bool } // Start spawns background progress reporter. @@ -168,6 +169,13 @@ func (p *Progress) startPrinter(interval time.Duration) { } } }() + + if p.task.PrintOnStart { + go func() { + time.Sleep(time.Millisecond) + p.printStatus(false) + }() + } } // AddMetrics adds more metrics to progress status message. 
@@ -195,10 +203,14 @@ func (p *Progress) printStatus(last bool) { s.SpeedMBPS = (b / s.Elapsed.Seconds()) / (1024 * 1024) s.SpeedLPS = float64(s.LinesCompleted) / s.Elapsed.Seconds() - s.Remaining = time.Duration(float64(100*s.Elapsed)/s.DonePercent) - s.Elapsed - s.Remaining = s.Remaining.Truncate(time.Second) + if s.DonePercent > 0 { + s.Remaining = time.Duration(float64(100*s.Elapsed)/s.DonePercent) - s.Elapsed + s.Remaining = s.Remaining.Truncate(time.Second) + } else { + s.Remaining = 0 + } - if s.Remaining > 100*time.Millisecond || last { + if s.Remaining > 100*time.Millisecond || s.Remaining == 0 || last { p.prnt(s) } } @@ -232,72 +244,116 @@ func (p *Progress) Lines() int64 { return p.continuedLines } +// NewCountingReader wraps an io.Reader with counters of bytes and lines. +func NewCountingReader(r io.Reader) *CountingReader { + cr := &CountingReader{ + Reader: r, + } + cr.lines = new(int64) + cr.bytes = new(int64) + + return cr +} + // CountingReader wraps io.Reader to count bytes. type CountingReader struct { Reader io.Reader + sharedCounters +} + +type sharedCounters struct { + lines *int64 + bytes *int64 - lines int64 - readBytes int64 + localBytes int64 + localLines int64 } -// Read reads and counts bytes. -func (cr *CountingReader) Read(p []byte) (n int, err error) { - n, err = cr.Reader.Read(p) +func (cr *sharedCounters) SetLines(lines *int64) { + cr.lines = lines +} + +func (cr *sharedCounters) SetBytes(bytes *int64) { + cr.bytes = bytes +} - atomic.AddInt64(&cr.readBytes, int64(n)) +func (cr *sharedCounters) count(n int, p []byte) { + cr.localBytes += int64(n) + + if cr.localBytes > 100000 && cr.bytes != nil { + atomic.AddInt64(cr.bytes, cr.localBytes) + cr.localBytes = 0 + } + + if cr.lines == nil { + return + } for i := 0; i < n; i++ { if p[i] == '\n' { - atomic.AddInt64(&cr.lines, 1) + cr.localLines++ + + if cr.localLines > 1000 { + atomic.AddInt64(cr.lines, cr.localLines) + cr.localLines = 0 + } } } +} + +// Read reads and counts bytes. 
+func (cr *CountingReader) Read(p []byte) (n int, err error) { + n, err = cr.Reader.Read(p) + cr.count(n, p) return n, err } -// Bytes returns number of read bytes. -func (cr *CountingReader) Bytes() int64 { - return atomic.LoadInt64(&cr.readBytes) +func (cr *sharedCounters) Close() { + if cr.localBytes > 0 && cr.bytes != nil { + atomic.AddInt64(cr.bytes, cr.localBytes) + cr.localBytes = 0 + } + + if cr.localLines > 0 && cr.lines != nil { + atomic.AddInt64(cr.lines, cr.localLines) + cr.localLines = 0 + } } -// Lines returns number of read lines. -func (cr *CountingReader) Lines() int64 { - return atomic.LoadInt64(&cr.lines) +// Bytes returns number of processed bytes. +func (cr *sharedCounters) Bytes() int64 { + return atomic.LoadInt64(cr.bytes) +} + +// Lines returns number of processed lines. +func (cr *sharedCounters) Lines() int64 { + return atomic.LoadInt64(cr.lines) +} + +// NewCountingWriter wraps an io.Writer with counters of bytes and lines. +func NewCountingWriter(w io.Writer) *CountingWriter { + cw := &CountingWriter{Writer: w} + cw.lines = new(int64) + cw.bytes = new(int64) + + return cw } // CountingWriter wraps io.Writer to count bytes. type CountingWriter struct { Writer io.Writer - - lines int64 - writtenBytes int64 + sharedCounters } // Write writes and counts bytes. func (cr *CountingWriter) Write(p []byte) (n int, err error) { n, err = cr.Writer.Write(p) - - atomic.AddInt64(&cr.writtenBytes, int64(n)) - - for i := 0; i < n; i++ { - if p[i] == '\n' { - atomic.AddInt64(&cr.lines, 1) - } - } + cr.count(n, p) return n, err } -// Bytes returns number of written bytes. -func (cr *CountingWriter) Bytes() int64 { - return atomic.LoadInt64(&cr.writtenBytes) -} - -// Lines returns number of written bytes. -func (cr *CountingWriter) Lines() int64 { - return atomic.LoadInt64(&cr.lines) -} - // MetricsExposer provides metric counters. type MetricsExposer interface { Metrics() []Metric