Skip to content

Commit

Permalink
pushing possible fix for dying connections using ping and pong mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
zephinzer committed Jun 30, 2021
1 parent 44f87bf commit c331fdc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Configurations can be set via flags or environment variables. To view available
| Arguments | `--arguments` | `ARGUMENTS` | `"-l"` | Comma delimited list of arguments that should be passed to the target binary |
| Command | `--command` | `COMMAND` | `"/bin/bash"` | Absolute path to the binary to run |
| Connection error limit | `--connection-error-limit` | `CONNECTION_ERROR_LIMIT` | `10` | Number of times a connection should be re-attempted by the server to the XTerm.js frontend before the connection is considered dead and shut down |
| Keepalive ping timeout | `--keepalive-ping-timeout` | `KEEPALIVE_PING_TIMEOUT` | `20` | Maximum duration in seconds between a ping and pong message to tolerate |
| Maximum buffer size in bytes | `--max-buffer-size-bytes` | `MAX_BUFFER_SIZE_BYTES` | `512` | Maximum length of input from the browser terminal |
| Log format | `--log-format` | `LOG_FORMAT` | `"text"` | Format with which to output logs, one of `"json"` or `"text"` |
| Log level | `--log-level` | `LOG_LEVEL` | `"debug"` | Minimum level of logs to output, one of `"trace"`, `"debug"`, `"info"`, `"warn"`, `"error"` |
Expand Down
5 changes: 5 additions & 0 deletions cmd/cloudshell/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var conf = config.Map{
Usage: "number of times a connection should be re-attempted before it's considered dead",
Shorthand: "l",
},
"keepalive-ping-timeout": &config.Int{
Default: 20,
Usage: "maximum duration in seconds between a ping message and its response to tolerate",
Shorthand: "k",
},
"max-buffer-size-bytes": &config.Int{
Default: 512,
Usage: "maximum length of input from terminal",
Expand Down
5 changes: 4 additions & 1 deletion cmd/cloudshell/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func runE(_ *cobra.Command, _ []string) error {
connectionErrorLimit := conf.GetInt("connection-error-limit")
arguments := conf.GetStringSlice("arguments")
allowedHostnames := conf.GetStringSlice("allowed-hostnames")
keepalivePingTimeout := time.Duration(conf.GetInt("keepalive-ping-timeout")) * time.Second
maxBufferSizeBytes := conf.GetInt("max-buffer-size-bytes")
pathLiveness := conf.GetString("path-liveness")
pathMetrics := conf.GetString("path-metrics")
Expand All @@ -64,6 +65,7 @@ func runE(_ *cobra.Command, _ []string) error {

log.Infof("allowed hosts : ['%s']", strings.Join(allowedHostnames, "', '"))
log.Infof("connection error limit: %v", connectionErrorLimit)
log.Infof("keepalive ping timeout: %v", keepalivePingTimeout)
log.Infof("max buffer size : %v bytes", maxBufferSizeBytes)
log.Infof("server address : '%s' ", serverAddress)
log.Infof("server port : %v", serverPort)
Expand All @@ -86,7 +88,8 @@ func runE(_ *cobra.Command, _ []string) error {
createRequestLog(r, map[string]interface{}{"connection_uuid": connectionUUID}).Infof("created logger for connection '%s'", connectionUUID)
return createRequestLog(nil, map[string]interface{}{"connection_uuid": connectionUUID})
},
MaxBufferSizeBytes: maxBufferSizeBytes,
KeepalivePingTimeout: keepalivePingTimeout,
MaxBufferSizeBytes: maxBufferSizeBytes,
}
router.HandleFunc(pathXTermJS, xtermjs.GetHandler(xtermjsHandlerOptions))

Expand Down
45 changes: 35 additions & 10 deletions pkg/xtermjs/handler_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/exec"
"strings"
"sync"
"time"

"github.com/creack/pty"
"github.com/google/uuid"
Expand All @@ -32,8 +33,11 @@ type HandlerOpts struct {
// CreateLogger when specified should return a logger that the handler will use.
// The string argument being passed in will be a unique identifier for the
// current connection. When not specified, logs will be sent to stdout
CreateLogger func(string, *http.Request) Logger
MaxBufferSizeBytes int
CreateLogger func(string, *http.Request) Logger
// KeepalivePingTimeout defines the maximum duration between which a ping and pong
// cycle should be tolerated, beyond this the connection should be deemed dead
KeepalivePingTimeout time.Duration
MaxBufferSizeBytes int
}

func GetHandler(opts HandlerOpts) func(http.ResponseWriter, *http.Request) {
Expand All @@ -43,6 +47,10 @@ func GetHandler(opts HandlerOpts) func(http.ResponseWriter, *http.Request) {
connectionErrorLimit = DefaultConnectionErrorLimit
}
maxBufferSizeBytes := opts.MaxBufferSizeBytes
keepalivePingTimeout := opts.KeepalivePingTimeout
if keepalivePingTimeout <= time.Second {
keepalivePingTimeout = 20 * time.Second
}

connectionUUID, err := uuid.NewUUID()
if err != nil {
Expand Down Expand Up @@ -98,6 +106,28 @@ func GetHandler(opts HandlerOpts) func(http.ResponseWriter, *http.Request) {
var waiter sync.WaitGroup
waiter.Add(1)

// this is a keep-alive loop that ensures connection does not hang-up itself
lastPongTime := time.Now()
connection.SetPongHandler(func(msg string) error {
lastPongTime = time.Now()
return nil
})
go func() {
for {
if err := connection.WriteMessage(websocket.PingMessage, []byte("keepalive")); err != nil {
clog.Warn("failed to write ping message")
return
}
time.Sleep(keepalivePingTimeout / 2)
if time.Now().Sub(lastPongTime) > keepalivePingTimeout {
clog.Warn("failed to get response from ping, triggering disconnect now...")
waiter.Done()
return
}
clog.Debug("received response from ping successfully")
}
}()

// tty >> xterm.js
go func() {
errorCounter := 0
Expand Down Expand Up @@ -133,20 +163,15 @@ func GetHandler(opts HandlerOpts) func(http.ResponseWriter, *http.Request) {
go func() {
for {
// data processing
messageType, reader, err := connection.NextReader()
messageType, data, err := connection.ReadMessage()
if err != nil {
if !connectionClosed {
clog.Warnf("failed to get next reader: %s", err)
}
return
}
dataBuffer := make([]byte, maxBufferSizeBytes)
dataLength, err := reader.Read(dataBuffer)
if err != nil {
clog.Warn("failed to get data from buffer: %s", err)
return
}
dataBuffer = bytes.Trim(dataBuffer, "\x00")
dataLength := len(data)
dataBuffer := bytes.Trim(data, "\x00")
dataType, ok := WebsocketMessageType[messageType]
if !ok {
dataType = "uunknown"
Expand Down

0 comments on commit c331fdc

Please sign in to comment.