diff --git a/router/xgress_common/connection.go b/router/xgress_common/connection.go index 451305363..ccf7088c9 100644 --- a/router/xgress_common/connection.go +++ b/router/xgress_common/connection.go @@ -23,6 +23,7 @@ import ( "github.com/openziti/channel/v2" "github.com/openziti/fabric/router/xgress" "github.com/openziti/foundation/v2/concurrenz" + "github.com/openziti/foundation/v2/info" "github.com/openziti/sdk-golang/ziti/edge" "github.com/openziti/sdk-golang/ziti/edge/impl" "github.com/pkg/errors" @@ -41,6 +42,10 @@ const ( xgressTypeFlag = 6 ) +const ( + DefaultBufferSize = 10 * 1024 +) + type outOfBand struct { data []byte headers map[uint8][]byte @@ -54,8 +59,9 @@ type XgressConn struct { receiver secretstream.Decryptor sender secretstream.Encryptor - writeDone chan struct{} - flags concurrenz.AtomicBitSet + writeDone chan struct{} + flags concurrenz.AtomicBitSet + bufferSize int } func NewXgressConn(conn net.Conn, halfClose bool, isTransport bool) *XgressConn { @@ -63,7 +69,13 @@ func NewXgressConn(conn net.Conn, halfClose bool, isTransport bool) *XgressConn Conn: conn, outOfBandTx: make(chan *outOfBand, 1), writeDone: make(chan struct{}), + bufferSize: DefaultBufferSize, } + + if _, isUdpConn := conn.(*net.UDPConn); isUdpConn { + result.bufferSize = info.MaxUdpPacketSize + } + result.flags.Set(halfCloseFlag, halfClose) result.flags.Set(xgressTypeFlag, isTransport) return result @@ -171,7 +183,7 @@ func (self *XgressConn) ReadPayload() ([]byte, map[uint8][]byte, error) { } } - buffer := make([]byte, 10240) + buffer := make([]byte, self.bufferSize) n, err := self.Conn.Read(buffer) buffer = buffer[:n] diff --git a/router/xgress_edge_tunnel/servicepoll.go b/router/xgress_edge_tunnel/servicepoll.go index 8d248a41c..a54ac2772 100644 --- a/router/xgress_edge_tunnel/servicepoll.go +++ b/router/xgress_edge_tunnel/servicepoll.go @@ -17,6 +17,7 @@ limitations under the License. */ import ( + "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v2" "github.com/openziti/edge/tunnel/intercept" "github.com/openziti/sdk-golang/ziti" @@ -121,6 +122,10 @@ func (self *servicePoller) pollServices(pollInterval time.Duration, notifyClose func (self *servicePoller) requestServiceListUpdate() { ctrlCh := self.fabricProvider.factory.ctrls.AnyCtrlChannel() - lastUpdateToken, _ := self.servicesLastUpdateToken.Get(ctrlCh.Id()) - self.fabricProvider.requestServiceList(ctrlCh, lastUpdateToken) + if ctrlCh != nil { // not currently connected to any controllers + lastUpdateToken, _ := self.servicesLastUpdateToken.Get(ctrlCh.Id()) + self.fabricProvider.requestServiceList(ctrlCh, lastUpdateToken) + } else { + pfxlog.Logger().Warn("unable to request service list update, no controllers connected") + } } diff --git a/tests/list_services_perf_test.go b/tests/list_services_perf_test.go new file mode 100644 index 000000000..ea9237c7e --- /dev/null +++ b/tests/list_services_perf_test.go @@ -0,0 +1,56 @@ +//go:build perftests + +package tests + +import ( + "fmt" + "github.com/openziti/foundation/v2/concurrenz" + "github.com/openziti/metrics" + "net/url" + "testing" + "time" +) + +func TestServicePerf(t *testing.T) { + ctx := NewTestContext(t) + ctx.ApiHost = "127.0.0.1:1280" + ctx.AdminAuthenticator.Username = "admin" + ctx.AdminAuthenticator.Password = "admin" + ctx.RequireAdminManagementApiLogin() + + identities := ctx.AdminManagementSession.requireQuery("identities?filter=" + url.QueryEscape("true limit 50")) + ids, err := identities.S("data").Children() + ctx.NoError(err) + + registry := metrics.NewRegistry("test", nil) + lookupTimer := registry.Timer("serviceLookup") + + wg := concurrenz.NewWaitGroup() + + for _, id := range ids { + identityId := id.S("id").Data().(string) + + doneC := make(chan struct{}) + wg.AddNotifier(doneC) + + go func() { + for i := 0; i < 100; i++ { + start := time.Now() + _ = ctx.AdminManagementSession.requireQuery("services?asIdentity=" + identityId) + lookupTimer.UpdateSince(start) + } + close(doneC) + }() + } + fmt.Println("all queries started") + wg.WaitForDone(2 * time.Minute) + + msg := registry.Poll() + lookupTimeSnapshot := msg.Timers["serviceLookup"] + fmt.Printf("mean: %v\n", time.Duration(lookupTimeSnapshot.Mean).String()) + fmt.Printf("min: %v\n", time.Duration(lookupTimeSnapshot.Min).String()) + fmt.Printf("max: %v\n", time.Duration(lookupTimeSnapshot.Max).String()) + fmt.Printf("p95: %v\n", time.Duration(lookupTimeSnapshot.P95).String()) + fmt.Printf("p99: %v\n", time.Duration(lookupTimeSnapshot.P99).String()) + fmt.Printf("count: %v\n", lookupTimeSnapshot.Count) +}