Skip to content

Commit 978fcce

Browse files
authored
Honor traffic_tunnel_endpoints and test tunneling E2E (viamrobotics#4839)
RSDK-9852 RSDK-9763
1 parent a07ada6 commit 978fcce

File tree

3 files changed

+176
-18
lines changed

3 files changed

+176
-18
lines changed

robot/server/server.go

+19-18
Original file line numberDiff line numberDiff line change
@@ -72,24 +72,25 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
7272

7373
dialTimeout := defaultTunnelConnectionTimeout
7474

75-
// TODO(RSDK-5763): Start rejecting requests to unavailable ports once `app` has been
76-
// updated to propagate `traffic_tunnel_endpoints` configs.
77-
/*
78-
var destAllowed bool
79-
// Ensure destination port is available; otherwise error.
80-
for _, tte := range s.robot.ListTunnels() {
81-
if int(req.DestinationPort) == tte.Port {
82-
destAllowed = true
83-
if tte.ConnectionTimeout != 0 {
84-
dialTimeout = tte.ConnectionTimeout
85-
}
86-
break
87-
}
88-
}
89-
if !destAllowed {
90-
return fmt.Errorf("tunnel not available at port %d", req.DestinationPort)
91-
}
92-
*/
75+
// Ensure destination port is available; otherwise error.
76+
var destAllowed bool
77+
ttes, err := s.robot.ListTunnels(srv.Context())
78+
if err != nil {
79+
return err
80+
}
81+
for _, tte := range ttes {
82+
if int(req.DestinationPort) == tte.Port {
83+
destAllowed = true
84+
if tte.ConnectionTimeout != 0 {
85+
// Honor specified timeout if one exists (0 is use-default.)
86+
dialTimeout = tte.ConnectionTimeout
87+
}
88+
break
89+
}
90+
}
91+
if !destAllowed {
92+
return fmt.Errorf("tunnel not available at port %d", req.DestinationPort)
93+
}
9394

9495
dest := strconv.Itoa(int(req.DestinationPort))
9596

tunnel/tunnel.go

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"net"
11+
"strings"
1112

1213
"go.viam.com/rdk/logging"
1314
)
@@ -31,6 +32,14 @@ func filterError(ctx context.Context, err error, closeChan <-chan struct{}, logg
3132
logger.CDebugw(ctx, "expected EOF received")
3233
return nil
3334
}
35+
36+
// Depending on when the tunnel is closed, the server may not have a chance to send
37+
// trailers.
38+
if err != nil && strings.Contains(err.Error(),
39+
"server closed the stream without sending trailers") {
40+
return nil
41+
}
42+
3443
return err
3544
}
3645

web/server/entrypoint_test.go

+148
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"net"
89
"os"
910
"os/exec"
1011
"path/filepath"
@@ -366,3 +367,150 @@ func TestMachineStateNoResources(t *testing.T) {
366367
cancel()
367368
wg.Wait()
368369
}
370+
371+
func TestTunnelE2E(t *testing.T) {
372+
// `TestTunnelE2E` attempts to send "Hello, World!" across a tunnel. The tunnel is:
373+
//
374+
// test-process <-> source-listener(localhost:23656) <-> machine(localhost:23655) <-> dest-listener(localhost:23654)
375+
376+
tunnelMsg := "Hello, World!"
377+
destPort := 23654
378+
destListenerAddr := net.JoinHostPort("localhost", strconv.Itoa(destPort))
379+
machineAddr := net.JoinHostPort("localhost", "23655")
380+
sourceListenerAddr := net.JoinHostPort("localhost", "23656")
381+
382+
logger := logging.NewTestLogger(t)
383+
ctx := context.Background()
384+
runServerCtx, runServerCtxCancel := context.WithCancel(ctx)
385+
var wg sync.WaitGroup
386+
387+
// Start "destination" listener.
388+
destListener, err := net.Listen("tcp", destListenerAddr)
389+
test.That(t, err, test.ShouldBeNil)
390+
defer func() {
391+
test.That(t, destListener.Close(), test.ShouldBeNil)
392+
}()
393+
394+
wg.Add(1)
395+
go func() {
396+
defer wg.Done()
397+
398+
logger.Infof("Listening on %s for tunnel message", destListenerAddr)
399+
conn, err := destListener.Accept()
400+
test.That(t, err, test.ShouldBeNil)
401+
defer func() {
402+
test.That(t, conn.Close(), test.ShouldBeNil)
403+
}()
404+
405+
bytes := make([]byte, 1024)
406+
n, err := conn.Read(bytes)
407+
test.That(t, err, test.ShouldBeNil)
408+
test.That(t, n, test.ShouldEqual, len(tunnelMsg))
409+
test.That(t, string(bytes), test.ShouldContainSubstring, tunnelMsg)
410+
logger.Info("Received expected tunnel message at", destListenerAddr)
411+
412+
// Write the same message back.
413+
n, err = conn.Write([]byte(tunnelMsg))
414+
test.That(t, err, test.ShouldBeNil)
415+
test.That(t, n, test.ShouldEqual, len(tunnelMsg))
416+
417+
// Cancel `runServerCtx` once message has made it all the way across and has been
418+
// echoed back. This should stop the `RunServer` goroutine below.
419+
runServerCtxCancel()
420+
}()
421+
422+
// Start a machine at `machineAddr` (`RunServer` in a goroutine.)
423+
wg.Add(1)
424+
go func() {
425+
defer wg.Done()
426+
427+
// Create a temporary config file.
428+
tempConfigFile, err := os.CreateTemp(t.TempDir(), "temp_config.json")
429+
test.That(t, err, test.ShouldBeNil)
430+
cfg := &config.Config{
431+
Network: config.NetworkConfig{
432+
NetworkConfigData: config.NetworkConfigData{
433+
TrafficTunnelEndpoints: []config.TrafficTunnelEndpoint{
434+
{
435+
Port: destPort, // allow tunneling to destination port
436+
},
437+
{
438+
Port: 65535, // allow tunneling to 65535
439+
ConnectionTimeout: time.Nanosecond, // specify an impossibly small timeout
440+
},
441+
},
442+
BindAddress: machineAddr,
443+
},
444+
},
445+
}
446+
cfgBytes, err := json.Marshal(&cfg)
447+
test.That(t, err, test.ShouldBeNil)
448+
test.That(t, os.WriteFile(tempConfigFile.Name(), cfgBytes, 0o755), test.ShouldBeNil)
449+
450+
args := []string{"viam-server", "-config", tempConfigFile.Name()}
451+
test.That(t, server.RunServer(runServerCtx, args, logger), test.ShouldBeNil)
452+
}()
453+
454+
// Open a robot client to `machineAddr`.
455+
rc := robottestutils.NewRobotClient(t, logger, machineAddr, time.Second)
456+
457+
// Test error paths for `Tunnel` with random `net.Conn`s.
458+
//
459+
// We will not be actually writing anything to/reading anything from the `net.Conn`, as
460+
// we only want to ensure that instantiation of the tunnel fails as expected.
461+
{
462+
googleConn, err := net.Dial("tcp", "google.com:443")
463+
test.That(t, err, test.ShouldBeNil)
464+
465+
// Assert that opening a tunnel to a disallowed port errors.
466+
err = rc.Tunnel(ctx, googleConn /* will be eventually closed by `Tunnel` */, 404)
467+
test.That(t, err, test.ShouldNotBeNil)
468+
test.That(t, err.Error(), test.ShouldContainSubstring, "tunnel not available at port")
469+
470+
googleConn, err = net.Dial("tcp", "google.com:443")
471+
test.That(t, err, test.ShouldBeNil)
472+
473+
// Assert that opening a tunnel to a port with a low `connection_timeout` results in a
474+
// timeout.
475+
err = rc.Tunnel(ctx, googleConn /* will be eventually closed by `Tunnel` */, 65535)
476+
test.That(t, err, test.ShouldNotBeNil)
477+
test.That(t, err.Error(), test.ShouldContainSubstring, "DeadlineExceeded")
478+
}
479+
480+
// Start "source" listener (a `RobotClient` running `Tunnel`.)
481+
sourceListener, err := net.Listen("tcp", sourceListenerAddr)
482+
test.That(t, err, test.ShouldBeNil)
483+
defer func() {
484+
test.That(t, sourceListener.Close(), test.ShouldBeNil)
485+
}()
486+
wg.Add(1)
487+
go func() {
488+
defer wg.Done()
489+
490+
logger.Infof("Connections opened at %s will be tunneled", sourceListenerAddr)
491+
conn, err := sourceListener.Accept()
492+
test.That(t, err, test.ShouldBeNil)
493+
494+
err = rc.Tunnel(ctx, conn /* will be eventually closed by `Tunnel` */, destPort)
495+
test.That(t, err, test.ShouldBeNil)
496+
}()
497+
498+
// Write `tunnelMsg` to "source" listener over TCP from this test process.
499+
conn, err := net.Dial("tcp", sourceListenerAddr)
500+
test.That(t, err, test.ShouldBeNil)
501+
defer func() {
502+
test.That(t, conn.Close(), test.ShouldBeNil)
503+
}()
504+
n, err := conn.Write([]byte(tunnelMsg))
505+
test.That(t, err, test.ShouldBeNil)
506+
test.That(t, n, test.ShouldEqual, len(tunnelMsg))
507+
508+
// Expect `tunnelMsg` to be written back.
509+
bytes := make([]byte, 1024)
510+
n, err = conn.Read(bytes)
511+
test.That(t, err, test.ShouldBeNil)
512+
test.That(t, n, test.ShouldEqual, len(tunnelMsg))
513+
test.That(t, string(bytes), test.ShouldContainSubstring, tunnelMsg)
514+
515+
wg.Wait()
516+
}

0 commit comments

Comments
 (0)