From 44db31b9cc0524dd5be798894a8330580659bf9f Mon Sep 17 00:00:00 2001 From: Sebastiaan van Stijn Date: Sat, 28 Dec 2024 17:30:39 +0100 Subject: [PATCH] remove pkg/broadcaster and make it internal to container/streams This package was only used internally in container/streams and had no external consumers. Signed-off-by: Sebastiaan van Stijn --- container/stream/streams.go | 13 ++++++------- {pkg/broadcaster => container/stream}/unbuffered.go | 12 ++++++------ .../stream}/unbuffered_test.go | 8 ++++---- 3 files changed, 16 insertions(+), 17 deletions(-) rename {pkg/broadcaster => container/stream}/unbuffered.go (72%) rename {pkg/broadcaster => container/stream}/unbuffered_test.go (95%) diff --git a/container/stream/streams.go b/container/stream/streams.go index 034548edb9fbe..280cafe27984b 100644 --- a/container/stream/streams.go +++ b/container/stream/streams.go @@ -9,7 +9,6 @@ import ( "github.com/containerd/containerd/cio" "github.com/containerd/log" - "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" ) @@ -25,8 +24,8 @@ import ( // a kind of "broadcaster". type Config struct { wg sync.WaitGroup - stdout *broadcaster.Unbuffered - stderr *broadcaster.Unbuffered + stdout *unbuffered + stderr *unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser dio *cio.DirectIO @@ -36,18 +35,18 @@ type Config struct { // the standard err and standard out to new unbuffered broadcasters. func NewConfig() *Config { return &Config{ - stderr: new(broadcaster.Unbuffered), - stdout: new(broadcaster.Unbuffered), + stderr: new(unbuffered), + stdout: new(unbuffered), } } // Stdout returns the standard output in the configuration. -func (c *Config) Stdout() *broadcaster.Unbuffered { +func (c *Config) Stdout() io.Writer { return c.stdout } // Stderr returns the standard error in the configuration. -func (c *Config) Stderr() *broadcaster.Unbuffered { +func (c *Config) Stderr() io.Writer { return c.stderr } diff --git a/pkg/broadcaster/unbuffered.go b/container/stream/unbuffered.go similarity index 72% rename from pkg/broadcaster/unbuffered.go rename to container/stream/unbuffered.go index 6bb285123f20f..ff6f07924b00d 100644 --- a/pkg/broadcaster/unbuffered.go +++ b/container/stream/unbuffered.go @@ -1,18 +1,18 @@ -package broadcaster // import "github.com/docker/docker/pkg/broadcaster" +package stream import ( "io" "sync" ) -// Unbuffered accumulates multiple io.WriteCloser by stream. -type Unbuffered struct { +// unbuffered accumulates multiple io.WriteCloser by stream. +type unbuffered struct { mu sync.Mutex writers []io.WriteCloser } // Add adds new io.WriteCloser. -func (w *Unbuffered) Add(writer io.WriteCloser) { +func (w *unbuffered) Add(writer io.WriteCloser) { w.mu.Lock() w.writers = append(w.writers, writer) w.mu.Unlock() @@ -20,7 +20,7 @@ func (w *Unbuffered) Add(writer io.WriteCloser) { // Write writes bytes to all writers. Failed writers will be evicted during // this call. -func (w *Unbuffered) Write(p []byte) (n int, err error) { +func (w *unbuffered) Write(p []byte) (n int, err error) { w.mu.Lock() var evict []int for i, sw := range w.writers { @@ -38,7 +38,7 @@ func (w *Unbuffered) Write(p []byte) (n int, err error) { // Clean closes and removes all writers. Last non-eol-terminated part of data // will be saved. -func (w *Unbuffered) Clean() error { +func (w *unbuffered) Clean() error { w.mu.Lock() for _, sw := range w.writers { sw.Close() diff --git a/pkg/broadcaster/unbuffered_test.go b/container/stream/unbuffered_test.go similarity index 95% rename from pkg/broadcaster/unbuffered_test.go rename to container/stream/unbuffered_test.go index c510584aa355f..7042937bbe2b0 100644 --- a/pkg/broadcaster/unbuffered_test.go +++ b/container/stream/unbuffered_test.go @@ -1,4 +1,4 @@ -package broadcaster // import "github.com/docker/docker/pkg/broadcaster" +package stream import ( "bytes" @@ -28,7 +28,7 @@ func (dw *dummyWriter) Close() error { } func TestUnbuffered(t *testing.T) { - writer := new(Unbuffered) + writer := new(unbuffered) // Test 1: Both bufferA and bufferB should contain "foo" bufferA := &dummyWriter{} @@ -114,7 +114,7 @@ func (d devNullCloser) Write(buf []byte) (int, error) { // This test checks for races. It is only useful when run with the race detector. func TestRaceUnbuffered(t *testing.T) { - writer := new(Unbuffered) + writer := new(unbuffered) c := make(chan bool) go func() { writer.Add(devNullCloser(0)) @@ -125,7 +125,7 @@ func TestRaceUnbuffered(t *testing.T) { } func BenchmarkUnbuffered(b *testing.B) { - writer := new(Unbuffered) + writer := new(unbuffered) setUpWriter := func() { for i := 0; i < 100; i++ { writer.Add(devNullCloser(0))