Skip to content

Commit

Permalink
wip: debug outgoing best selection
Browse files Browse the repository at this point in the history
  • Loading branch information
ainghazal committed Jan 17, 2024
1 parent 1e4d886 commit d31bd13
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 27 deletions.
2 changes: 1 addition & 1 deletion internal/reliabletransport/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type inFlightPacket interface {
Retry()
Retries() uint8
MarkExpired()
MarkSent()
MarkSent(time.Time)
Ready() bool
}

Expand Down
17 changes: 9 additions & 8 deletions internal/reliabletransport/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type inFlightWrappedPacket struct {

// how many acks we've received for packets with higher PID.
withHigherACK int

sent time.Time
}

func newInFlightWrappedPacket(p *model.Packet) *inFlightWrappedPacket {
Expand Down Expand Up @@ -62,14 +64,6 @@ func (p *inFlightWrappedPacket) MarkExpired() {
p.readyToSend = true
}

func (p *inFlightWrappedPacket) MarkSent() {
p.readyToSend = false
fmt.Printf(">> p.id: %v, ready=false %v\n", p.ID(), p)
backoff := p.Backoff()
// FIXME debug...
p.retransmitDeadline = time.Now().Add(backoff).Add(time.Second)
}

func (p *inFlightWrappedPacket) Ready() bool {
return p.readyToSend
}
Expand Down Expand Up @@ -100,6 +94,13 @@ func (p *inFlightWrappedPacket) Retry() {
fmt.Println("new deadline:", p.retransmitDeadline)
}

// the packet we just sent needs to mark the deadline to the next backoff
func (p *inFlightWrappedPacket) MarkSent(t time.Time) {
p.readyToSend = false
p.sent = t
fmt.Printf(">> p.id: %v, ready=false %v\n", p.ID(), p)
}

func (p *inFlightWrappedPacket) Retries() uint8 {
return p.retryCount
}
Expand Down
43 changes: 35 additions & 8 deletions internal/reliabletransport/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ func (r *reliableOutgoing) NextOutgoingPacket() sequentialPacket {

// TODO implement a proper "best" strategy that takes into account timers & fast rexmit
// this is a naive selection for now: get the one with a lower pid
// sort.Sort(inflightSequence(r.inFlight))
// candidate := r.inFlight[0]

var candidate *inFlightWrappedPacket

Expand All @@ -154,21 +152,50 @@ func (r *reliableOutgoing) NextOutgoingPacket() sequentialPacket {
candidate = p
continue
}
if p.Ready() && p.ID() <= candidate.ID() {
if p.ID() <= candidate.ID() {
candidate = p
}
}

r.logger.Infof(">> selected %v to send at %v", candidate.ID(), time.Now())
r.logger.Infof(">> retries: %v", candidate.Retries())
r.logger.Infof(">> ready: %v", candidate.Ready())
candidate.MarkSent()
// DEBUG: if there's no candidate that is ready and we return nil here
// we end up "stalling" for significant periods of time. Why??
// this is rather slow

/*
got := false
for _, p := range r.inFlight {
if p.Ready() {
if candidate == nil {
candidate = p
got = true
continue
}
if p.ID() <= candidate.ID() {
candidate = p
}
}
}
if !got {
return nil
}
*/

d := time.Since(candidate.sent)
if d < time.Duration(time.Second*2) {
fmt.Println(":: bananas! sent too little ago")
return nil

}
//r.logger.Infof(">> selected %v to send at %v", candidate.ID(), time.Now())
//r.logger.Infof(">> retries: %v", candidate.Retries())
//r.logger.Infof(">> ready: %v", candidate.Ready())
candidate.MarkSent(time.Now())
return candidate
}

func (r *reliableOutgoing) RescheduleExpiredPackets() {
for _, p := range r.inFlight {
if p.Ready() {
if p.Deadline().Before(time.Now()) {
p.Retry()
}
}
Expand Down
56 changes: 46 additions & 10 deletions internal/reliabletransport/reliabletransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reliabletransport

import (
"bytes"
"fmt"

"github.com/ooni/minivpn/internal/model"
"github.com/ooni/minivpn/internal/session"
Expand Down Expand Up @@ -40,8 +41,14 @@ func (s *Service) StartWorkers(
incoming := newReliableIncoming(logger, incomingSeen)
outgoing := newReliableOutgoing(logger, incomingSeen)

bufferToMuxer := make(chan *model.Packet, 20)

ws := &workersState{
logger: logger,
logger: logger,

// ???
bufferToMuxer: bufferToMuxer,

dataOrControlToMuxer: *s.DataOrControlToMuxer,
controlToReliable: s.ControlToReliable,
muxerToReliable: s.MuxerToReliable,
Expand All @@ -60,6 +67,9 @@ type workersState struct {
// logger is the logger to use
logger model.Logger

// DEBUG -- trying stuff here
bufferToMuxer chan *model.Packet

// dataOrControlToMuxer is the channel where we write packets going down the stack.
dataOrControlToMuxer chan<- *model.Packet

Expand Down Expand Up @@ -185,22 +195,48 @@ func (ws *workersState) moveDownWorker() {
ws.outgoing.OnIncomingPacketSeen(incomingSeen)

case <-ws.outgoing.ticker.C:
//ws.logger.Debugf("TICK-- %v", time.Now())

/*
ws.logger.Debugf("TICK-- %v", time.Now())
fmt.Println()
fmt.Println("::: mark expired")
*/
ws.outgoing.MarkExpiredPackets()

/*
fmt.Println()
fmt.Println("::: select to send")
*/
// TODO(ainghazal): use optional here
if nextPacket := ws.outgoing.NextOutgoingPacket(); nextPacket != nil {
pkt := nextPacket.Unwrap()
// TODO: is this the best place to check if there's a packet to write?
nextPacket := ws.outgoing.NextOutgoingPacket()
if nextPacket != nil {
ws.bufferToMuxer <- nextPacket.Unwrap()

// pkt := nextPacket.Unwrap()
// POSSIBLY BLOCK delivering this packet to the lower layer
select {
case ws.dataOrControlToMuxer <- pkt:
ws.logger.Debugf("sent packet down: %v", nextPacket.ID())
case <-ws.workersManager.ShouldShutdown():
return
}
/*
select {
case ws.dataOrControlToMuxer <- pkt:
ws.logger.Debugf("sent packet down: %v", nextPacket.ID())
case <-ws.workersManager.ShouldShutdown():
return
}
*/
}
//fmt.Println()
fmt.Println("::: reschedule")
ws.outgoing.RescheduleExpiredPackets()

case packet := <-ws.bufferToMuxer:
// POSSIBLY BLOCK delivering this packet to the lower layer
select {
case ws.dataOrControlToMuxer <- packet:
ws.logger.Debugf("sent packet down: %v", packet.ID)
case <-ws.workersManager.ShouldShutdown():
return
}

case <-ws.workersManager.ShouldShutdown():
return
}
Expand Down

0 comments on commit d31bd13

Please sign in to comment.