From 3ee407b1d9c207f8bb4c34e9e84a1e2e28d8d9e9 Mon Sep 17 00:00:00 2001 From: its-a-feature Date: Thu, 8 Feb 2024 10:20:08 -0600 Subject: [PATCH] v3.2.17.1 added some better checking and deadlock preventing checks around socks traffic --- CHANGELOG.MD | 7 + VERSION | 2 +- .../rabbitmq/util_agent_message_push_c2.go | 14 +- .../src/rabbitmq/utils_proxy_traffic.go | 333 ++++++++++-------- 4 files changed, 207 insertions(+), 149 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 8f97236d1..01859f141 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.2.17.1] - 2024-02-08 + +### Changed + +- Adjusted the SOCKS handling functions to use non-blocking sends when dealing with channels to help prevent deadlock +- Adjusted the SOCKS channels to have increased capacity + ## [3.2.17] - 2024-02-06 ### Changed diff --git a/VERSION b/VERSION index 8649cd418..09635791b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.2.17 \ No newline at end of file +3.2.17.1 \ No newline at end of file diff --git a/mythic-docker/src/rabbitmq/util_agent_message_push_c2.go b/mythic-docker/src/rabbitmq/util_agent_message_push_c2.go index 6acca47b0..4150a584a 100644 --- a/mythic-docker/src/rabbitmq/util_agent_message_push_c2.go +++ b/mythic-docker/src/rabbitmq/util_agent_message_push_c2.go @@ -98,7 +98,7 @@ type interceptProxyToAgentMessage struct { CallbackID int } -var interceptProxyToAgentMessageChan = make(chan interceptProxyToAgentMessage, 200) +var interceptProxyToAgentMessageChan = make(chan interceptProxyToAgentMessage, 2000) // interceptProxyDataToAgentForPushC2 checks if Proxy messages can be sent to a PushC2 agent first func interceptProxyDataToAgentForPushC2() { @@ -155,14 +155,20 @@ func interceptProxyDataToAgentForPushC2() { // we don't have a PushC2 client available, so save it like normal switch msg.ProxyType { case CALLBACK_PORT_TYPE_INTERACTIVE: - msg.InteractiveMessagesToAgent <- msg.InteractiveMessage + select { + case msg.InteractiveMessagesToAgent <- msg.InteractiveMessage: + default: + } //.LogInfo("saved to msg") case CALLBACK_PORT_TYPE_SOCKS: fallthrough case CALLBACK_PORT_TYPE_RPORTFWD: - msg.MessagesToAgent <- msg.Message + select { + case msg.MessagesToAgent <- msg.Message: + default: + logging.LogError(nil, "dropping message because channel is full", "type", msg.ProxyType) + } } - } } } diff --git a/mythic-docker/src/rabbitmq/utils_proxy_traffic.go b/mythic-docker/src/rabbitmq/utils_proxy_traffic.go index 0031e846e..e63125f9a 100644 --- a/mythic-docker/src/rabbitmq/utils_proxy_traffic.go +++ b/mythic-docker/src/rabbitmq/utils_proxy_traffic.go @@ -140,7 +140,7 @@ func ManuallyToggleProxy(input ProxyStop) ProxyStopResponse { func (c *callbackPortsInUse) Initialize() { callbackPorts := []databaseStructs.Callbackport{} c.ports = make([]*callbackPortUsage, 0) - c.proxyFromAgentMessageChannel = make(chan ProxyFromAgentMessageForMythic, 100) + c.proxyFromAgentMessageChannel = make(chan ProxyFromAgentMessageForMythic, 2000) go c.ListenForProxyFromAgentMessage() if err := database.DB.Select(&callbackPorts, `SELECT * FROM callbackport WHERE deleted=false`); err != nil { logging.LogError(err, "Failed to load callback ports from database") @@ -154,12 +154,12 @@ func (c *callbackPortsInUse) Initialize() { RemotePort: proxy.RemotePort, PortType: proxy.PortType, OperationID: proxy.OperationID, - messagesToAgent: make(chan proxyToAgentMessage, 200), - newConnectionChannel: make(chan *acceptedConnection, 200), - removeConnectionsChannel: make(chan *acceptedConnection, 200), - messagesFromAgent: make(chan proxyFromAgentMessage, 200), - interactiveMessagesToAgent: make(chan agentMessagePostResponseInteractive, 200), - interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 200), + messagesToAgent: make(chan proxyToAgentMessage, 2000), + newConnectionChannel: make(chan *acceptedConnection, 2000), + removeConnectionsChannel: make(chan *acceptedConnection, 2000), + messagesFromAgent: make(chan proxyFromAgentMessage, 2000), + interactiveMessagesToAgent: make(chan agentMessagePostResponseInteractive, 2000), + interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 2000), stopAllConnections: make(chan bool), } acceptedConnections := make([]*acceptedConnection, 0) @@ -376,12 +376,12 @@ func (c *callbackPortsInUse) Add(callbackId int, portType CallbackPortType, loca RemoteIP: remoteIP, OperationID: operationId, PortType: portType, - messagesToAgent: make(chan proxyToAgentMessage, 200), - newConnectionChannel: make(chan *acceptedConnection, 200), - removeConnectionsChannel: make(chan *acceptedConnection, 200), - messagesFromAgent: make(chan proxyFromAgentMessage, 200), - interactiveMessagesToAgent: make(chan agentMessagePostResponseInteractive, 200), - interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 200), + messagesToAgent: make(chan proxyToAgentMessage, 2000), + newConnectionChannel: make(chan *acceptedConnection, 2000), + removeConnectionsChannel: make(chan *acceptedConnection, 2000), + messagesFromAgent: make(chan proxyFromAgentMessage, 2000), + interactiveMessagesToAgent: make(chan agentMessagePostResponseInteractive, 2000), + interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 2000), stopAllConnections: make(chan bool), } acceptedConnections := make([]*acceptedConnection, 0) @@ -524,18 +524,32 @@ func (p *callbackPortUsage) Stop() error { if err := (*p.listener).Close(); err != nil { if err == net.ErrClosed { logging.LogInfo("tasking to stop all connections via channel") - p.stopAllConnections <- true + select { + case p.stopAllConnections <- true: + default: + } + return nil } logging.LogError(err, "Error calling close for the listener in the Stop function") + select { + case p.stopAllConnections <- true: + default: + } return err } logging.LogInfo("tasking to stop all connections via channel") - p.stopAllConnections <- true + select { + case p.stopAllConnections <- true: + default: + } return nil } else { logging.LogInfo("tasking to stop all connections via channel") - p.stopAllConnections <- true + select { + case p.stopAllConnections <- true: + default: + } return nil } @@ -590,15 +604,32 @@ func (p *callbackPortUsage) manageConnections() { select { case newConn := <-p.newConnectionChannel: //logging.LogInfo("adding new connection channel in manageConnections") + if _, exists := connectionMap[newConn.ServerID]; exists { + logging.LogWarning("Got a new connection with a ServerID that already exists, aborting it") + select { + case newConn.shouldClose <- true: + default: + } + newConn.conn.Close() + if newConn.TaskUUID != nil { + close(newConn.interactiveMessagesFromAgent) + } else { + close(newConn.messagesFromAgent) + } + continue + } connectionMap[newConn.ServerID] = newConn case removeCon := <-p.removeConnectionsChannel: - logging.LogInfo("removing connection channel in manageConnection") + logging.LogDebug("removing connection channel", "server_id", removeCon.ServerID) if removeCon.TaskUUID != nil { // remove all connections for interactive task closeIDs := []uint32{} for serverID, _ := range connectionMap { if connectionMap[serverID].TaskUUID != nil && connectionMap[serverID].TaskUUID == removeCon.TaskUUID { - //connectionMap[serverID].shouldClose <- true + select { + case connectionMap[serverID].shouldClose <- true: + default: + } close(connectionMap[serverID].interactiveMessagesFromAgent) connectionMap[serverID].conn.Close() closeIDs = append(closeIDs, serverID) @@ -609,7 +640,10 @@ func (p *callbackPortUsage) manageConnections() { } } else { if _, ok := connectionMap[removeCon.ServerID]; ok { - connectionMap[removeCon.ServerID].shouldClose <- true + select { + case connectionMap[removeCon.ServerID].shouldClose <- true: + default: + } close(connectionMap[removeCon.ServerID].messagesFromAgent) connectionMap[removeCon.ServerID].conn.Close() delete(connectionMap, removeCon.ServerID) @@ -679,7 +713,7 @@ func (p *callbackPortUsage) manageConnections() { newConnection := acceptedConnection{ conn: conn, shouldClose: make(chan bool, 1), - messagesFromAgent: make(chan proxyFromAgentMessage, 200), + messagesFromAgent: make(chan proxyFromAgentMessage, 2000), ServerID: newMsg.ServerID, // randomized id port: p.LocalPort, } @@ -704,12 +738,15 @@ func (p *callbackPortUsage) manageConnections() { case <-p.stopAllConnections: for _, rmProxyData := range connectionMap { + select { + case rmProxyData.shouldClose <- true: + default: + } if rmProxyData.TaskUUID != nil { close(rmProxyData.interactiveMessagesFromAgent) } else { close(rmProxyData.messagesFromAgent) } - rmProxyData.conn.Close() delete(connectionMap, rmProxyData.ServerID) } @@ -719,117 +756,120 @@ func (p *callbackPortUsage) manageConnections() { // "p.removeConnectionsChannel", len(p.removeConnectionsChannel), // "p.messagesFromAgent", len(p.messagesFromAgent)) } - } - } func (p *callbackPortUsage) handleSocksConnections() { + defer func() { + logging.LogError(nil, "exiting handleSocksConnectionsLoop", "port", p.LocalPort) + }() for { // Listen for an incoming connection - if p.listener != nil { - if conn, err := (*p.listener).Accept(); err != nil { - logging.LogError(err, "Failed to accept new connection on port", "port", p.LocalPort) - if err := (*p.listener).Close(); err != nil { - logging.LogError(err, "Failed to close listener", "port", p.LocalPort) - } - //p.listener = nil - return - } else { - // Handle connections in a new goroutine - newConnection := acceptedConnection{ - conn: conn, - shouldClose: make(chan bool, 1), - messagesFromAgent: make(chan proxyFromAgentMessage, 200), - ServerID: uint32(rand.Intn(math.MaxInt32)), // randomized id - port: p.LocalPort, - } - // add this connection for tracking so it can be cancelled later if needed + if p.listener == nil { + logging.LogError(nil, "Listener is nil, exiting the handleSocksConnections loop", "port", p.LocalPort) + return + } + conn, err := (*p.listener).Accept() + if err != nil { + logging.LogError(err, "Failed to accept new connection on port", "port", p.LocalPort) + err = (*p.listener).Close() + if err != nil { + logging.LogError(err, "Failed to close listener", "port", p.LocalPort) + } + //p.listener = nil + return + } + // this reads from the connection and writes data for the agent to process + initial := make([]byte, 4) + _, err = conn.Read(initial) + if err != nil { + logging.LogError(err, "failed to read initial SOCKS connection data") + continue + } + _, err = conn.Write([]byte{'\x05', '\x00'}) + if err != nil { + logging.LogError(err, "Failed to send the \\x05\\x00 no-auth bytes for new socks connection") + continue + } + newConnection := acceptedConnection{ + conn: conn, + shouldClose: make(chan bool, 1), + messagesFromAgent: make(chan proxyFromAgentMessage, 2000), + ServerID: uint32(rand.Intn(math.MaxInt32)), // randomized id + port: p.LocalPort, + } + p.newConnectionChannel <- &newConnection - // this reads from the connection and writes data for the agent to process - initial := make([]byte, 4) - if _, err := conn.Read(initial); err != nil { - logging.LogError(err, "failed to read initial SOCKS connection data") - return - } else if _, err := conn.Write([]byte{'\x05', '\x00'}); err != nil { - logging.LogError(err, "Failed to send the \\x05\\x00 no-auth bytes for new socks connection") + //logging.LogDebug("Got new connection", "server_id", newConnection.ServerID) + go func(conn net.Conn) { + // function for reading from agents to send to Mythic's connections + for { + select { + case <-newConnection.shouldClose: + //logging.LogDebug("got message to close connection", "server_id", newConnection.ServerID) + //p.removeConnectionsChannel <- &newConnection return + case agentMessage := <-newConnection.messagesFromAgent: + //logging.LogDebug("Writing to connection from agent", "serverID", agentMessage.ServerID) + dataBytes, err := base64.StdEncoding.DecodeString(agentMessage.Message) + if err != nil { + logging.LogError(err, "Failed to base64 decode agent socks message", "server_id", newConnection.ServerID) + continue + } + _, err = conn.Write(dataBytes) + if err != nil { + logging.LogError(err, "Failed to write to connection", "server_id", newConnection.ServerID) + p.removeConnectionsChannel <- &newConnection + return + } + if agentMessage.IsExit { + //logging.LogDebug("got message isExit", "server_id", newConnection.ServerID) + // cleanup the connection data + p.removeConnectionsChannel <- &newConnection + return + } } - p.newConnectionChannel <- &newConnection - - //logging.LogDebug("Got new connection", "server_id", newConnection.ServerID) - go func(conn net.Conn) { - // function for reading from agents to send to Mythic's connections - for { - select { - case <-newConnection.shouldClose: - //logging.LogDebug("got message to close connection", "server_id", newConnection.ServerID) - p.removeConnectionsChannel <- &newConnection - return - case agentMessage := <-newConnection.messagesFromAgent: - //logging.LogDebug("Writing to connection from agent", "serverID", agentMessage.ServerID) - if dataBytes, err := base64.StdEncoding.DecodeString(agentMessage.Message); err != nil { - logging.LogError(err, "Failed to base64 decode agent socks message", "server_id", newConnection.ServerID) - } else if _, err := conn.Write(dataBytes); err != nil { - logging.LogError(err, "Failed to write to connection", "server_id", newConnection.ServerID) - p.removeConnectionsChannel <- &newConnection - return - } else if agentMessage.IsExit { - //logging.LogDebug("got message isExit", "server_id", newConnection.ServerID) - // cleanup the connection data - p.removeConnectionsChannel <- &newConnection - return - } - - } + } + }(conn) + go func(conn net.Conn) { + // function for reading from Mythic's connections to send to agents + for { + buf := make([]byte, 4096) + //logging.LogDebug("looping to read from connection again", "server_id", newConnection.ServerID) + length, err := conn.Read(buf) + if err != nil { + //logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) + interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ + Message: proxyToAgentMessage{ + Message: nil, + IsExit: true, + ServerID: newConnection.ServerID, + Port: p.LocalPort, + }, + MessagesToAgent: p.messagesToAgent, + CallbackID: p.CallbackID, + ProxyType: p.PortType, } - - }(conn) - go func(conn net.Conn) { - // function for reading from Mythic's connections to send to agents - for { - buf := make([]byte, 4096) - //logging.LogDebug("looping to read from connection again", "server_id", newConnection.ServerID) - if length, err := conn.Read(buf); err != nil { - logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) - interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ - Message: proxyToAgentMessage{ - Message: nil, - IsExit: true, - ServerID: newConnection.ServerID, - Port: p.LocalPort, - }, - MessagesToAgent: p.messagesToAgent, - CallbackID: p.CallbackID, - ProxyType: p.PortType, - } - p.removeConnectionsChannel <- &newConnection - return - } else { - if length > 0 { - //fmt.Printf("Message received for chan %d: length %v\n", newConnection.ServerID, length) - interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ - Message: proxyToAgentMessage{ - Message: buf[:length], - IsExit: false, - ServerID: newConnection.ServerID, - Port: p.LocalPort, - }, - MessagesToAgent: p.messagesToAgent, - CallbackID: p.CallbackID, - ProxyType: p.PortType, - } - //fmt.Printf("Message sent to p.messagesToAgent channel for chan %d\n", newConnection.ServerID) - } - } + p.removeConnectionsChannel <- &newConnection + return + } + if length > 0 { + //fmt.Printf("Message received for chan %d: length %v\n", newConnection.ServerID, length) + interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ + Message: proxyToAgentMessage{ + Message: buf[:length], + IsExit: false, + ServerID: newConnection.ServerID, + Port: p.LocalPort, + }, + MessagesToAgent: p.messagesToAgent, + CallbackID: p.CallbackID, + ProxyType: p.PortType, } - - }(conn) + //fmt.Printf("Message sent to p.messagesToAgent channel for chan %d\n", newConnection.ServerID) + } } - } else { - logging.LogError(nil, "Listener is nil, exiting the handleSocksConnections loop", "port", p.LocalPort) - return - } + }(conn) } } func (p *callbackPortUsage) handleRpfwdConnections(newConnection *acceptedConnection) { @@ -840,17 +880,22 @@ func (p *callbackPortUsage) handleRpfwdConnections(newConnection *acceptedConnec select { case <-newConnection.shouldClose: //logging.LogDebug("got message to close connection", "server_id", newConnection.ServerID) - p.removeConnectionsChannel <- newConnection + //p.removeConnectionsChannel <- newConnection return case agentMessage := <-newConnection.messagesFromAgent: //logging.LogDebug("Writing to connection from agent", "serverID", agentMessage.ServerID) - if dataBytes, err := base64.StdEncoding.DecodeString(agentMessage.Message); err != nil { + dataBytes, err := base64.StdEncoding.DecodeString(agentMessage.Message) + if err != nil { logging.LogError(err, "Failed to base64 decode agent socks message", "server_id", newConnection.ServerID) - } else if _, err := conn.Write(dataBytes); err != nil { + continue + } + _, err = conn.Write(dataBytes) + if err != nil { logging.LogError(err, "Failed to write to connection", "server_id", newConnection.ServerID) p.removeConnectionsChannel <- newConnection return - } else if agentMessage.IsExit { + } + if agentMessage.IsExit { //logging.LogDebug("got message isExit", "server_id", newConnection.ServerID) // cleanup the connection data p.removeConnectionsChannel <- newConnection @@ -864,8 +909,9 @@ func (p *callbackPortUsage) handleRpfwdConnections(newConnection *acceptedConnec for { buf := make([]byte, 4096) //logging.LogDebug("looping to read from connection", "server_id", newConnection.ServerID) - if length, err := conn.Read(buf); err != nil { - logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) + length, err := conn.Read(buf) + if err != nil { + //logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ Message: proxyToAgentMessage{ Message: nil, @@ -879,22 +925,21 @@ func (p *callbackPortUsage) handleRpfwdConnections(newConnection *acceptedConnec } p.removeConnectionsChannel <- newConnection return - } else { - if length > 0 { - //logging.LogDebug("Message received for chan %d: length %v\n", newConnection.ServerID, length) - interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ - Message: proxyToAgentMessage{ - Message: buf[:length], - IsExit: false, - ServerID: newConnection.ServerID, - Port: p.LocalPort, - }, - MessagesToAgent: p.messagesToAgent, - ProxyType: p.PortType, - CallbackID: p.CallbackID, - } - //fmt.Printf("Message sent to p.messagesToAgent channel for chan %d\n", newConnection.ServerID) + } + if length > 0 { + //logging.LogDebug("Message received for chan %d: length %v\n", newConnection.ServerID, length) + interceptProxyToAgentMessageChan <- interceptProxyToAgentMessage{ + Message: proxyToAgentMessage{ + Message: buf[:length], + IsExit: false, + ServerID: newConnection.ServerID, + Port: p.LocalPort, + }, + MessagesToAgent: p.messagesToAgent, + ProxyType: p.PortType, + CallbackID: p.CallbackID, } + //fmt.Printf("Message sent to p.messagesToAgent channel for chan %d\n", newConnection.ServerID) } } }(newConnection.conn) @@ -921,7 +966,7 @@ func (p *callbackPortUsage) handleInteractiveConnections() { newConnection := acceptedConnection{ conn: conn, shouldClose: make(chan bool, 1), - interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 200), + interactiveMessagesFromAgent: make(chan agentMessagePostResponseInteractive, 2000), ServerID: uint32(rand.Intn(math.MaxInt32)), // randomized id TaskUUID: &taskUUID, port: p.LocalPort, @@ -935,7 +980,7 @@ func (p *callbackPortUsage) handleInteractiveConnections() { select { case <-newConnection.shouldClose: //logging.LogDebug("got message to close connection", "server_id", newConnection.ServerID) - p.removeConnectionsChannel <- &newConnection + //p.removeConnectionsChannel <- &newConnection return case agentMessage := <-newConnection.interactiveMessagesFromAgent: //logging.LogDebug("Writing to connection from agent", "serverID", agentMessage.ServerID) @@ -962,7 +1007,7 @@ func (p *callbackPortUsage) handleInteractiveConnections() { buf := make([]byte, 4096) //logging.LogDebug("looping to read from connection again", "server_id", newConnection.ServerID) if length, err := conn.Read(buf); err != nil { - logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) + //logging.LogError(err, "Failed to read from connection, sending exit", "server_id", newConnection.ServerID) p.removeConnectionsChannel <- &newConnection return } else {