Skip to content

Commit 91a882e

Browse files
committed
Added code to improve timeout on blocked read.
Fixing unused change. Added test for deadline occurring.
1 parent c1a1e09 commit 91a882e

File tree

2 files changed

+141
-15
lines changed

2 files changed

+141
-15
lines changed

pkg/agent/client.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package agent
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io"
2324
"net"
@@ -151,6 +152,9 @@ type Client struct {
151152
serviceAccountTokenPath string
152153

153154
warnOnChannelLimit bool
155+
156+
// Here for testing
157+
readBlockInterval time.Duration
154158
}
155159

156160
func newAgentClient(address, agentID, agentIdentifiers string, cs *ClientSet, opts ...grpc.DialOption) (*Client, int, error) {
@@ -166,6 +170,7 @@ func newAgentClient(address, agentID, agentIdentifiers string, cs *ClientSet, op
166170
serviceAccountTokenPath: cs.serviceAccountTokenPath,
167171
connManager: newConnectionManager(),
168172
warnOnChannelLimit: cs.warnOnChannelLimit,
173+
readBlockInterval: 15 * time.Second,
169174
}
170175
serverCount, err := a.Connect()
171176
if err != nil {
@@ -538,10 +543,19 @@ func (a *Client) remoteToProxy(connID int64, eConn *endpointConn) {
538543
}
539544

540545
for {
546+
select {
547+
case <-a.stopCh:
548+
return
549+
default:
550+
}
551+
timeout := time.Now().Add(a.readBlockInterval)
552+
eConn.conn.SetReadDeadline(timeout)
541553
n, err := eConn.conn.Read(buf[:])
542554
klog.V(5).InfoS("received data from remote", "bytes", n, "connectionID", connID)
543555

544-
if err == io.EOF {
556+
if errors.Is(err, os.ErrDeadlineExceeded) {
557+
continue
558+
} else if err == io.EOF {
545559
klog.V(2).InfoS("remote connection EOF", "connectionID", connID)
546560
return
547561
} else if err != nil {

pkg/agent/client_test.go

+126-14
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ func TestServeData_HTTP(t *testing.T) {
4242
stopCh: stopCh,
4343
}
4444
testClient := &Client{
45-
connManager: newConnectionManager(),
46-
stopCh: stopCh,
47-
cs: cs,
45+
connManager: newConnectionManager(),
46+
stopCh: stopCh,
47+
cs: cs,
48+
readBlockInterval: 15 * time.Second,
4849
}
4950
testClient.stream, stream = pipe()
5051

@@ -133,7 +134,8 @@ func TestServeData_HTTP(t *testing.T) {
133134
waitForConnectionDeletion(t, testClient, connID)
134135
}
135136

136-
func TestClose_Client(t *testing.T) {
137+
func TestDelayedServedData_HTTP(t *testing.T) {
138+
var err error
137139
var stream agent.AgentService_ConnectClient
138140
stopCh := make(chan struct{})
139141
cs := &ClientSet{
@@ -144,6 +146,113 @@ func TestClose_Client(t *testing.T) {
144146
connManager: newConnectionManager(),
145147
stopCh: stopCh,
146148
cs: cs,
149+
// Set the readBlockInterval to a short value to check if
150+
// the agent can handle the SetReadDeadline.
151+
readBlockInterval: 1 * time.Second,
152+
}
153+
testClient.stream, stream = pipe()
154+
155+
// Start agent
156+
go testClient.Serve()
157+
defer close(stopCh)
158+
159+
// Start test http server as remote service
160+
expectedBody := "Hello, client"
161+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
162+
time.Sleep(4 * time.Second) // HTTPTest times out after 5 seconds.
163+
fmt.Fprint(w, expectedBody)
164+
}))
165+
defer ts.Close()
166+
167+
// Simulate sending KAS DIAL_REQ to (Agent) Client
168+
dialPacket := newDialPacket("tcp", ts.URL[len("http://"):], 111)
169+
err = stream.Send(dialPacket)
170+
if err != nil {
171+
t.Fatal(err.Error())
172+
}
173+
174+
// Expect receiving DIAL_RSP packet from (Agent) Client
175+
pkt, err := stream.Recv()
176+
if err != nil {
177+
t.Fatal(err.Error())
178+
}
179+
if pkt == nil {
180+
t.Fatal("unexpected nil packet")
181+
}
182+
if pkt.Type != client.PacketType_DIAL_RSP {
183+
t.Errorf("expect PacketType_DIAL_RSP; got %v", pkt.Type)
184+
}
185+
dialRsp := pkt.Payload.(*client.Packet_DialResponse)
186+
connID := dialRsp.DialResponse.ConnectID
187+
if dialRsp.DialResponse.Random != 111 {
188+
t.Errorf("expect random=111; got %v", dialRsp.DialResponse.Random)
189+
}
190+
191+
// Send Data (HTTP Request) via (Agent) Client to the test http server
192+
t.Logf("Sending data packet at %v", time.Now())
193+
dataPacket := newDataPacket(connID, []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"))
194+
err = stream.Send(dataPacket)
195+
if err != nil {
196+
t.Error(err.Error())
197+
}
198+
t.Logf("Sent data packet at %v", time.Now())
199+
200+
// Expect receiving http response via (Agent) Client
201+
t.Logf("Receiving http response at %v", time.Now())
202+
pkt, _ = stream.Recv()
203+
if pkt == nil {
204+
t.Fatalf("unexpected nil packet at %v", time.Now())
205+
}
206+
if pkt.Type != client.PacketType_DATA {
207+
t.Errorf("expect PacketType_DATA; got %v", pkt.Type)
208+
}
209+
data := pkt.Payload.(*client.Packet_Data).Data.Data
210+
211+
// Verify response data
212+
//
213+
// HTTP/1.1 200 OK\r\n
214+
// Date: Tue, 07 May 2019 06:44:57 GMT\r\n
215+
// Content-Length: 14\r\n
216+
// Content-Type: text/plain; charset=utf-8\r\n
217+
// \r\n
218+
// Hello, client
219+
headAndBody := strings.Split(string(data), "\r\n")
220+
if body := headAndBody[len(headAndBody)-1]; body != expectedBody {
221+
t.Errorf("expect body %v; got %v", expectedBody, body)
222+
}
223+
224+
// Force close the test server which will cause remote connection gets droped
225+
ts.Close()
226+
227+
// Verify receiving CLOSE_RSP
228+
pkt, _ = stream.Recv()
229+
if pkt == nil {
230+
t.Fatal("unexpected nil packet")
231+
}
232+
if pkt.Type != client.PacketType_CLOSE_RSP {
233+
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkt.Type)
234+
}
235+
closeErr := pkt.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
236+
if closeErr != "" {
237+
t.Errorf("expect nil closeErr; got %v", closeErr)
238+
}
239+
240+
// Verify internal state is consistent
241+
waitForConnectionDeletion(t, testClient, connID)
242+
}
243+
244+
func TestClose_Client(t *testing.T) {
245+
var stream agent.AgentService_ConnectClient
246+
stopCh := make(chan struct{})
247+
cs := &ClientSet{
248+
clients: make(map[string]*Client),
249+
stopCh: stopCh,
250+
}
251+
testClient := &Client{
252+
connManager: newConnectionManager(),
253+
stopCh: stopCh,
254+
cs: cs,
255+
readBlockInterval: 15 * time.Second,
147256
}
148257
testClient.stream, stream = pipe()
149258

@@ -229,9 +338,10 @@ func TestConnectionMismatch(t *testing.T) {
229338
stopCh: stopCh,
230339
}
231340
testClient := &Client{
232-
connManager: newConnectionManager(),
233-
stopCh: stopCh,
234-
cs: cs,
341+
connManager: newConnectionManager(),
342+
stopCh: stopCh,
343+
cs: cs,
344+
readBlockInterval: 15 * time.Second,
235345
}
236346
testClient.stream, stream = pipe()
237347

@@ -291,9 +401,10 @@ func TestFailedSend_DialResp_GRPC(t *testing.T) {
291401
stopCh: stopCh,
292402
}
293403
testClient := &Client{
294-
connManager: newConnectionManager(),
295-
stopCh: stopCh,
296-
cs: cs,
404+
connManager: newConnectionManager(),
405+
stopCh: stopCh,
406+
cs: cs,
407+
readBlockInterval: 15 * time.Second,
297408
}
298409
defer func() {
299410
close(stopCh)
@@ -353,10 +464,11 @@ func TestDrain(t *testing.T) {
353464
stopCh: stopCh,
354465
}
355466
testClient := &Client{
356-
connManager: newConnectionManager(),
357-
drainCh: drainCh,
358-
stopCh: stopCh,
359-
cs: cs,
467+
connManager: newConnectionManager(),
468+
drainCh: drainCh,
469+
stopCh: stopCh,
470+
cs: cs,
471+
readBlockInterval: 15 * time.Second,
360472
}
361473
testClient.stream, stream = pipe()
362474

0 commit comments

Comments
 (0)