Skip to content

Commit

Permalink
Various tweaks and improvements
Browse files Browse the repository at this point in the history
Still arguably broken, but this might get some people in #1 out of trouble.

Signed-off-by: duck <[email protected]>
  • Loading branch information
duckfullstop committed Dec 3, 2021
1 parent 431514c commit ccf1a51
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 61 deletions.
21 changes: 16 additions & 5 deletions pkg/admin/enum/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
UpdateTypeGamescript
)

type UpdateFrequency uint8
type UpdateFrequency uint16

const (
UpdateFrequencyPoll UpdateFrequency = 0x01 << iota
Expand Down Expand Up @@ -56,15 +56,15 @@ const (
type Destination uint8

const (
DestinationBroadcast Destination = 0x00 // All destinations
DestinationTeam // A specific team
DestinationClient // A specific client
DestinationBroadcast Destination = 0x00 << iota // All destinations
DestinationTeam // A specific team
DestinationClient // A specific client
)

type NetError uint8

const (
NetErrorGeneral NetError = 0x00 // A general network failure
NetErrorGeneral NetError = 0x00 << iota // A general network failure

// Signals from clients
NetErrorDesync
Expand All @@ -85,3 +85,14 @@ const (
NetErrorFull
NetErrorTooManyCommands // 0x0F
)

type ClientID uint32

const (
// Client is not part of anything
ClientIDInvalid = 0x00 << iota
// Server is guaranteed to have this Client ID
ClientIDServer = 0x01
// The first Client ID
ClientIDFirst = 0x02
)
4 changes: 2 additions & 2 deletions pkg/admin/packets/packets_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func (p AdminQuit) PacketType() openttd_packets_admin.AdminPacketIndex {
}

type AdminUpdateFrequency struct { // Type 2
Type enum.UpdateType // Update type (see #AdminUpdateType).
Type enum.UpdateType // Update type (see #AdminUpdateType). -- THIS HAS TO BE SENT AS A UINT16 FOR WHO KNOWS WHAT REASON, USE CAUTION
Frequency enum.UpdateFrequency // Update frequency (see #AdminUpdateFrequency), setting #ADMIN_FREQUENCY_POLL is always ignored.
}

func (p AdminUpdateFrequency) Pack() (out bytes.Buffer) {
binary.Write(&out, binary.LittleEndian, p.Type)
binary.Write(&out, binary.LittleEndian, uint16(p.Type))
binary.Write(&out, binary.LittleEndian, p.Frequency)
return out
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/admin/proto_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ func (s *Session) Open() error {
s.log(LogWarning, "Expected PROTOCOL, instead got:\n%#v\n", e)
}

// Repeat the above to get the Welcome packet
mt, m, err = readPacketFromTcpConn(s.conn)
if err != nil {
return err
}
e, err = s.onEvent(mt, m)
if err != nil {
return err
}
if e.Type != welcomeEventType {
// This is not fatal, but it does not follow the standard.
s.log(LogWarning, "Expected WELCOME, instead got:\n%#v\n", e)
}

s.log(LogInformational, "We are now connected to OpenTTD, emitting connect event")
s.handleEvent(connectEventType, &Connect{})

Expand Down Expand Up @@ -124,6 +138,9 @@ func (s *Session) Close() (err error) {
// Nil out the connection
s.conn = nil

// Close the listener
close(s.listening)

s.log(LogInformational, "emit disconnect event")
s.handleEvent(disconnectEventType, &Disconnect{})
s.Unlock()
Expand Down Expand Up @@ -241,7 +258,7 @@ func (s *Session) GamescriptCommand(json string) (err error) {
return err
}

// listen polls the websocket connection for events, it will stop when the
// listen polls the admin connection for events, it will stop when the
// listening channel is closed, or an error occurs.
func (s *Session) listen(conn *net.TCPConn, listening <-chan interface{}) {

Expand Down Expand Up @@ -305,6 +322,7 @@ func (s *Session) reconnect() {
err = s.Open()
if err == nil {
s.log(LogInformational, "successfully reconnected to game")
return
}

// Certain race conditions can call reconnect() twice. If this happens, we
Expand All @@ -326,7 +344,7 @@ func (s *Session) reconnect() {
}

// FailedPongs is the Number of pong intervals to wait until forcing a connection restart.
const FailedPongs = 5 * time.Millisecond
const FailedPongs = 6

// HeartbeatLatency returns the latency between heartbeat acknowledgement and heartbeat send.
func (s *Session) HeartbeatLatency() time.Duration {
Expand All @@ -344,10 +362,10 @@ func (s *Session) heartbeat(conn *net.TCPConn, listening <-chan interface{}) {
return
}

heartbeatIntervalMsec := time.Duration(10000)
heartbeatInterval := time.Duration(10 * time.Second)

var err error
ticker := time.NewTicker(heartbeatIntervalMsec * time.Second)
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()

for {
Expand All @@ -360,7 +378,7 @@ func (s *Session) heartbeat(conn *net.TCPConn, listening <-chan interface{}) {
// very lazy implementation of token for very lazy people
err = writePacketToTcpConn(conn, packets.AdminPing{Token: 1})
s.connMutex.Unlock()
if err != nil || time.Now().UTC().Sub(last) > (heartbeatIntervalMsec*FailedPongs) {
if err != nil || time.Now().UTC().Sub(last) > (heartbeatInterval*FailedPongs) {
if err != nil {
s.log(LogError, "error sending heartbeat to server %s, %s", s.Hostname, err)
} else {
Expand Down
70 changes: 26 additions & 44 deletions pkg/admin/rcon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,18 @@ type rconResp struct {
func (s *Session) Rcon(command string) (err error) {
// we have to add this to the queue because the handleRconRequests queue will get out of step with commands otherwise

// is this too wide of a scope to lock? we could have an RconLock otherwise
s.Lock()
obj := rconRequest{Command: command}
s.rconQueue = append(s.rconQueue, obj)
s.Unlock()
s.rconQueue <- &obj
return nil
}

// RconSync sends a blocking RCON command to the server, waits for a response, then returns a set of response packets.
// Please note: This will block your thread until we get a complete response from the server!
// If you don't care about the result, use Rcon(command).
func (s *Session) RconSync(command string) (ret []Rcon, err error) {
s.rconMtx.Lock()
rchan := make(chan []Rcon)
obj := rconRequest{Command: command, responseChan: rchan}
s.rconQueue = append(s.rconQueue, obj)
s.rconMtx.Unlock()
s.rconQueue <- &obj
// Block on a response
ret = <-obj.responseChan
return ret, nil
Expand All @@ -57,49 +52,36 @@ func (s *Session) handleRconRequests(listening <-chan interface{}) {
s.log(LogDebug, "called")

for {
var cmd rconRequest
if len(s.rconQueue) == 0 {
// no requests available right now
continue
}

s.rconMtx.Lock()

// Pop the last thing on the stack
cmd, s.rconQueue = s.rconQueue[len(s.rconQueue)-1], s.rconQueue[:len(s.rconQueue)-1]

// Send it
err := s.sendRconCommand(cmd.Command)
if err != nil {
if cmd.responseChan != nil {
cmd.responseChan <- []Rcon{}
}
continue
}
var data []Rcon
var run = true
for run {
v := <-s.rconChan
switch {
case v.rcon != nil:
// not an ending packet
data = append(data, *v.rcon)
case v.rconEnd != nil:
// ending packet
if v.rconEnd.Command == cmd.Command {
run = false
}
}
}
select {
case <-listening:
s.Unlock()
return
default:
case cmd := <-s.rconQueue:
// Send it
err := s.sendRconCommand(cmd.Command)
if err != nil {
if cmd.responseChan != nil {
cmd.responseChan <- []Rcon{}
}
continue
}
var data []Rcon
var run = true
for run {
v := <-s.rconChan
switch {
case v.rcon != nil:
// not an ending packet
data = append(data, *v.rcon)
case v.rconEnd != nil:
// ending packet
if v.rconEnd.Command == cmd.Command {
run = false
}
}
}
if cmd.responseChan != nil {
cmd.responseChan <- data
}
}
s.rconMtx.Unlock()
}
}
2 changes: 1 addition & 1 deletion pkg/admin/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func New(hostname string, port int, password string) (s *Session, err error) {
Hostname: hostname,
Port: port,
Password: password,
rconQueue: []rconRequest{},
rconQueue: make(chan *rconRequest),
rconChan: make(chan *rconResp),
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/admin/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,8 @@ type Session struct {
// Acceptable polling rates
pollrates map[enum.UpdateType]uint16

// Mutex for rconQueue
rconMtx sync.Mutex

// Pending RCON commands
rconQueue []rconRequest
rconQueue chan *rconRequest

// Channel for RCON responses to be sent to
rconChan chan *rconResp
Expand Down

0 comments on commit ccf1a51

Please sign in to comment.