Skip to content

Commit c1d832d

Browse files
authored
Merge pull request #16118 from vrothberg/proxy-mcproxface
play kube: notifyproxy: listen before starting the pod
2 parents a344928 + 7b84a3a commit c1d832d

File tree

1 file changed

+74
-47
lines changed

1 file changed

+74
-47
lines changed

pkg/systemd/notifyproxy/notifyproxy.go

+74-47
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,16 @@ type NotifyProxy struct {
4545
connection *net.UnixConn
4646
socketPath string
4747
container Container // optional
48+
49+
// Channels for synchronizing the goroutine waiting for the READY
50+
// message and the one checking if the optional container is still
51+
// running.
52+
errorChan chan error
53+
readyChan chan bool
4854
}
4955

50-
// New creates a NotifyProxy. The specified temp directory can be left empty.
56+
// New creates a NotifyProxy that starts listening immediately. The specified
57+
// temp directory can be left empty.
5158
func New(tmpDir string) (*NotifyProxy, error) {
5259
tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock")
5360
if err != nil {
@@ -69,7 +76,60 @@ func New(tmpDir string) (*NotifyProxy, error) {
6976
return nil, err
7077
}
7178

72-
return &NotifyProxy{connection: conn, socketPath: socketPath}, nil
79+
errorChan := make(chan error, 1)
80+
readyChan := make(chan bool, 1)
81+
82+
proxy := &NotifyProxy{
83+
connection: conn,
84+
socketPath: socketPath,
85+
errorChan: errorChan,
86+
readyChan: readyChan,
87+
}
88+
89+
// Start waiting for the READY message in the background. This way,
90+
// the proxy can be created prior to starting the container and
91+
// circumvents a race condition on writing/reading on the socket.
92+
proxy.waitForReady()
93+
94+
return proxy, nil
95+
}
96+
97+
// waitForReady waits for the READY message in the background. The goroutine
98+
// returns on receiving READY or when the socket is closed.
99+
func (p *NotifyProxy) waitForReady() {
100+
go func() {
101+
// Read until the `READY` message is received or the connection
102+
// is closed.
103+
const bufferSize = 1024
104+
sBuilder := strings.Builder{}
105+
for {
106+
for {
107+
buffer := make([]byte, bufferSize)
108+
num, err := p.connection.Read(buffer)
109+
if err != nil {
110+
if !errors.Is(err, io.EOF) {
111+
p.errorChan <- err
112+
return
113+
}
114+
}
115+
sBuilder.Write(buffer[:num])
116+
if num != bufferSize || buffer[num-1] == '\n' {
117+
// Break as we read an entire line that
118+
// we can inspect for the `READY`
119+
// message.
120+
break
121+
}
122+
}
123+
124+
for _, line := range strings.Split(sBuilder.String(), "\n") {
125+
if line == daemon.SdNotifyReady {
126+
p.readyChan <- true
127+
return
128+
}
129+
}
130+
sBuilder.Reset()
131+
}
132+
}()
73133
}
74134

75135
// SocketPath returns the path of the socket the proxy is listening on.
@@ -105,54 +165,21 @@ type Container interface {
105165
// the waiting gets canceled and ErrNoReadyMessage is returned.
106166
func (p *NotifyProxy) WaitAndClose() error {
107167
defer func() {
168+
// Closing the socket/connection makes sure that the other
169+
// goroutine reading/waiting for the READY message returns.
108170
if err := p.close(); err != nil {
109171
logrus.Errorf("Closing notify proxy: %v", err)
110172
}
111173
}()
112174

113-
// Since reading from the connection is blocking, we need to spin up two
114-
// goroutines. One waiting for the `READY` message, the other waiting
115-
// for the container to stop running.
116-
errorChan := make(chan error, 1)
117-
readyChan := make(chan bool, 1)
118-
119-
go func() {
120-
// Read until the `READY` message is received or the connection
121-
// is closed.
122-
const bufferSize = 1024
123-
sBuilder := strings.Builder{}
124-
for {
125-
for {
126-
buffer := make([]byte, bufferSize)
127-
num, err := p.connection.Read(buffer)
128-
if err != nil {
129-
if !errors.Is(err, io.EOF) {
130-
errorChan <- err
131-
return
132-
}
133-
}
134-
sBuilder.Write(buffer[:num])
135-
if num != bufferSize || buffer[num-1] == '\n' {
136-
// Break as we read an entire line that
137-
// we can inspect for the `READY`
138-
// message.
139-
break
140-
}
141-
}
142-
143-
for _, line := range strings.Split(sBuilder.String(), "\n") {
144-
if line == daemon.SdNotifyReady {
145-
readyChan <- true
146-
return
147-
}
148-
}
149-
sBuilder.Reset()
150-
}
151-
}()
152-
175+
// If the proxy has a container we need to watch it as it may exit
176+
// without sending a READY message. The goroutine below returns when
177+
// the container exits OR when the function returns (see deferred the
178+
// cancel()) in which case we either we've either received the READY
179+
// message or encountered an error reading from the socket.
153180
if p.container != nil {
154181
// Create a cancellable context to make sure the goroutine
155-
// below terminates.
182+
// below terminates on function return.
156183
ctx, cancel := context.WithCancel(context.Background())
157184
defer cancel()
158185
go func() {
@@ -162,11 +189,11 @@ func (p *NotifyProxy) WaitAndClose() error {
162189
default:
163190
state, err := p.container.State()
164191
if err != nil {
165-
errorChan <- err
192+
p.errorChan <- err
166193
return
167194
}
168195
if state != define.ContainerStateRunning {
169-
errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
196+
p.errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
170197
return
171198
}
172199
time.Sleep(time.Second)
@@ -176,9 +203,9 @@ func (p *NotifyProxy) WaitAndClose() error {
176203

177204
// Wait for the ready/error channel.
178205
select {
179-
case <-readyChan:
206+
case <-p.readyChan:
180207
return nil
181-
case err := <-errorChan:
208+
case err := <-p.errorChan:
182209
return err
183210
}
184211
}

0 commit comments

Comments
 (0)