Skip to content

Commit

Permalink
Merge pull request #1330 from openziti/buffer-size-fixes
Browse files Browse the repository at this point in the history
Ensure buffers are big enough for UDP datagrams. Fixes #1329
  • Loading branch information
plorenz authored Mar 15, 2023
2 parents 6c9a0af + cce22eb commit 1b8b134
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 5 deletions.
18 changes: 15 additions & 3 deletions router/xgress_common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/openziti/channel/v2"
"github.com/openziti/fabric/router/xgress"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/sdk-golang/ziti/edge/impl"
"github.com/pkg/errors"
Expand All @@ -41,6 +42,10 @@ const (
xgressTypeFlag = 6
)

const (
DefaultBufferSize = 10 * 1024
)

type outOfBand struct {
data []byte
headers map[uint8][]byte
Expand All @@ -54,16 +59,23 @@ type XgressConn struct {
receiver secretstream.Decryptor
sender secretstream.Encryptor

writeDone chan struct{}
flags concurrenz.AtomicBitSet
writeDone chan struct{}
flags concurrenz.AtomicBitSet
bufferSize int
}

func NewXgressConn(conn net.Conn, halfClose bool, isTransport bool) *XgressConn {
result := &XgressConn{
Conn: conn,
outOfBandTx: make(chan *outOfBand, 1),
writeDone: make(chan struct{}),
bufferSize: DefaultBufferSize,
}

if _, isUdpConn := conn.(*net.UDPConn); isUdpConn {
result.bufferSize = info.MaxUdpPacketSize
}

result.flags.Set(halfCloseFlag, halfClose)
result.flags.Set(xgressTypeFlag, isTransport)
return result
Expand Down Expand Up @@ -171,7 +183,7 @@ func (self *XgressConn) ReadPayload() ([]byte, map[uint8][]byte, error) {
}
}

buffer := make([]byte, 10240)
buffer := make([]byte, self.bufferSize)
n, err := self.Conn.Read(buffer)
buffer = buffer[:n]

Expand Down
9 changes: 7 additions & 2 deletions router/xgress_edge_tunnel/servicepoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
*/

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/edge/tunnel/intercept"
"github.com/openziti/sdk-golang/ziti"
Expand Down Expand Up @@ -121,6 +122,10 @@ func (self *servicePoller) pollServices(pollInterval time.Duration, notifyClose

func (self *servicePoller) requestServiceListUpdate() {
ctrlCh := self.fabricProvider.factory.ctrls.AnyCtrlChannel()
lastUpdateToken, _ := self.servicesLastUpdateToken.Get(ctrlCh.Id())
self.fabricProvider.requestServiceList(ctrlCh, lastUpdateToken)
if ctrlCh != nil { // not currently connected to any controllers
lastUpdateToken, _ := self.servicesLastUpdateToken.Get(ctrlCh.Id())
self.fabricProvider.requestServiceList(ctrlCh, lastUpdateToken)
} else {
pfxlog.Logger().Warn("unable to request service list update, no controllers connected")
}
}
56 changes: 56 additions & 0 deletions tests/list_services_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//go:build perftests

package tests

import (
"fmt"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/metrics"
"net/url"
"testing"
"time"
)

func TestServicePerf(t *testing.T) {
ctx := NewTestContext(t)
ctx.ApiHost = "127.0.0.1:1280"
ctx.AdminAuthenticator.Username = "admin"
ctx.AdminAuthenticator.Password = "admin"
ctx.RequireAdminManagementApiLogin()

identities := ctx.AdminManagementSession.requireQuery("identities?filter=" + url.QueryEscape("true limit 50"))
ids, err := identities.S("data").Children()
ctx.NoError(err)

registry := metrics.NewRegistry("test", nil)
lookupTimer := registry.Timer("serviceLookup")

wg := concurrenz.NewWaitGroup()

for _, id := range ids {
identityId := id.S("id").Data().(string)

doneC := make(chan struct{})
wg.AddNotifier(doneC)

go func() {
for i := 0; i < 100; i++ {
start := time.Now()
_ = ctx.AdminManagementSession.requireQuery("services?asIdentity=" + identityId)
lookupTimer.UpdateSince(start)
}
close(doneC)
}()
}
fmt.Println("all queries started")
wg.WaitForDone(2 * time.Minute)

msg := registry.Poll()
lookupTimeSnapshot := msg.Timers["serviceLookup"]
fmt.Printf("mean: %v\n", time.Duration(lookupTimeSnapshot.Mean).String())
fmt.Printf("min: %v\n", time.Duration(lookupTimeSnapshot.Min).String())
fmt.Printf("max: %v\n", time.Duration(lookupTimeSnapshot.Max).String())
fmt.Printf("p95: %v\n", time.Duration(lookupTimeSnapshot.P95).String())
fmt.Printf("p99: %v\n", time.Duration(lookupTimeSnapshot.P99).String())
fmt.Printf("count: %v\n", lookupTimeSnapshot.Count)
}

0 comments on commit 1b8b134

Please sign in to comment.