Skip to content

Commit

Permalink
add priority cache, improve reqCtx interface, add option to disable p… (
Browse files Browse the repository at this point in the history
#15)

* add priority cache, improve reqCtx interface, add option to disable private IPs, add ctx to metrics, add timeout to module pool

* remove raft tests from e2e
  • Loading branch information
bubbajoe authored Jun 30, 2024
1 parent 2c07ee8 commit eee44cb
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 159 deletions.
19 changes: 2 additions & 17 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,12 @@ jobs:
run: |
sudo apt install -y jq
jq --version
- name: Install goreman
run: |
go install github.com/mattn/goreman@latest
goreman version
- run: go run cmd/dgate-server/main.go &
- name: Start and wait 5 seconds
run: go run cmd/dgate-server/main.go & sleep 5

- run: cd functional-tests/raft_tests && goreman start &

- name: Wait for server to start
run: sleep 10

- name: Functional Standalone Tests
run: |
for i in functional-tests/admin_tests/*.sh; \
do bash -c $i; done
- name: Run local k6 test
uses: grafana/[email protected]
with:
filename: performance-tests/perf-test.js

5 changes: 3 additions & 2 deletions config.dgate.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: v1
debug: true
log_level: ${LOG_LEVEL:-debug}
log_level: ${LOG_LEVEL:-info}
disable_default_namespace: true
tags: [debug, local, test]
storage:
Expand All @@ -15,7 +15,8 @@ proxy:
port: ${PORT:-80}
host: 0.0.0.0
console_log_level: info
transport:
client_transport:
disable_private_ips: false
dns_prefer_go: true
init_resources:
namespaces:
Expand Down
10 changes: 3 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ type (
}

LoggingConfig struct {
ZapConfig *zap.Config `koanf:",squash"`
LogOutputs []*LogOutput `koanf:"log_outputs"`
}

LogOutput struct {
Name string `koanf:"name"`
Config map[string]any `koanf:",remain"`
ZapConfig *zap.Config `koanf:",squash"`
}

DGateProxyConfig struct {
Expand Down Expand Up @@ -176,6 +170,7 @@ type (
KeepAlive time.Duration `koanf:"keep_alive"`
ResponseHeaderTimeout time.Duration `koanf:"response_header_timeout"`
DialTimeout time.Duration `koanf:"dial_timeout"`
DisablePrivateIPs bool `koanf:"disable_private_ips"`
}

DGateStorageConfig struct {
Expand Down Expand Up @@ -255,6 +250,7 @@ func (conf *DGateConfig) GetLogger() (*zap.Logger, error) {
if logger, err := conf.Logging.ZapConfig.Build(); err != nil {
return nil, err
} else {
zap.ReplaceGlobals(logger)
return logger, nil
}
}
10 changes: 6 additions & 4 deletions internal/proxy/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (err error) {
if reload {
defer func(start time.Time) {
ps.logger.Debug("processing change log",
zap.String("id", cl.ID),
zap.Duration("duration", time.Since(start)),
)
if err != nil {
ps.logger.Debug("processed change log",
zap.String("id", cl.ID),
zap.Duration("duration", time.Since(start)),
)
}
}(time.Now())
}
ps.proxyLock.Lock()
Expand Down
10 changes: 7 additions & 3 deletions internal/proxy/dynamic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,19 @@ func (ps *ProxyState) setupRoutes(
if len(rt.Modules) > 0 {
modExtFunc := ps.createModuleExtractorFunc(rt)
if modPool, err := NewModulePool(
256, 1024, reqCtxProvider, modExtFunc,
0, 1024, time.Minute*5,
reqCtxProvider, modExtFunc,
); err != nil {
ps.logger.Error("Error creating module buffer", zap.Error(err))
return err
} else {
reqCtxProvider.SetModulePool(modPool)
reqCtxProvider.UpdateModulePool(modPool)
}
}
ps.providers.Insert(rt.Namespace.Name+"/"+rt.Name, reqCtxProvider)
oldReqCtxProvider := ps.providers.Insert(rt.Namespace.Name+"/"+rt.Name, reqCtxProvider)
if oldReqCtxProvider != nil {
oldReqCtxProvider.Close()
}
for _, path := range rt.Paths {
if len(rt.Methods) > 0 && rt.Methods[0] == "*" {
if len(rt.Methods) > 1 {
Expand Down
66 changes: 39 additions & 27 deletions internal/proxy/module_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"time"

"go.uber.org/zap"
)

type ModulePool interface {
Expand All @@ -11,57 +14,66 @@ type ModulePool interface {
}

type modulePool struct {
modExtBuffer chan ModuleExtractor
min, max int

ctxCancel context.CancelFunc
ctx context.Context
modExtChan chan ModuleExtractor
min, max int
cancel context.CancelFunc
ctx context.Context

createModuleExtract func() (ModuleExtractor, error)
createModExt func() (ModuleExtractor, error)
}

func NewModulePool(
minBuffers, maxBuffers int,
bufferTimeout time.Duration,
reqCtxProvider *RequestContextProvider,
createModExts ModuleExtractorFunc,
) (ModulePool, error) {
if minBuffers < 1 {
panic("module concurrency must be greater than 0")
}
if maxBuffers < minBuffers {
panic("maxBuffers must be greater than minBuffers")
}

if _, err := createModExts(reqCtxProvider); err != nil {
return nil, err
}
modExtChan := make(chan ModuleExtractor, maxBuffers)
mb := &modulePool{
min: minBuffers,
max: maxBuffers,
modExtBuffer: make(chan ModuleExtractor, maxBuffers),
min: minBuffers,
max: maxBuffers,
modExtChan: modExtChan,
createModExt: func() (ModuleExtractor, error) {
return createModExts(reqCtxProvider)
},
}
mb.createModuleExtract = func() (ModuleExtractor, error) {
return createModExts(reqCtxProvider)
}
mb.ctx, mb.ctxCancel = context.WithCancel(reqCtxProvider.ctx)
mb.ctx, mb.cancel = context.WithCancel(reqCtxProvider.ctx)

// add min module extractors to the pool
defer func() {
for i := 0; i < minBuffers; i++ {
me, err := mb.createModExt()
if err == nil {
mb.modExtChan <- me
}
}
}()

return mb, nil
}

func (mb *modulePool) Borrow() ModuleExtractor {
if mb == nil || mb.ctx == nil || mb.ctx.Err() != nil {
if mb == nil || mb.ctx.Err() != nil {
zap.L().Warn("stale use of module pool",
zap.Any("modPool", mb),
)
return nil
}
var (
me ModuleExtractor
err error
)
select {
case me = <-mb.modExtBuffer:
case me = <-mb.modExtChan:
break
// NOTE: important for performance
default:
me, err = mb.createModuleExtract()
if err != nil {
if me, err = mb.createModExt(); err != nil {
return nil
}
}
Expand All @@ -72,18 +84,18 @@ func (mb *modulePool) Return(me ModuleExtractor) {
// if context is canceled, do not return module extract
if mb.ctx != nil && mb.ctx.Err() == nil {
select {
case mb.modExtBuffer <- me:
case mb.modExtChan <- me:
return
default:
// if buffer is full, discard module extract
}
}
me.Stop(true)
me.Stop(false)
}

func (mb *modulePool) Close() {
if mb.ctxCancel != nil {
mb.ctxCancel()
if mb.cancel != nil {
mb.cancel()
}
close(mb.modExtBuffer)
close(mb.modExtChan)
}
39 changes: 20 additions & 19 deletions internal/proxy/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
event.Debug("Request log")
}()

defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now())
defer ps.metrics.MeasureProxyRequest(reqCtx.ctx, reqCtx, time.Now())

var modExt ModuleExtractor
if len(reqCtx.route.Modules) != 0 {
Expand All @@ -53,7 +53,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
} else {
if modExt = modPool.Borrow(); modExt == nil {
ps.metrics.MeasureModuleDuration(
reqCtx, "module_extract", runtimeStart,
reqCtx.ctx, reqCtx, "module_extract", runtimeStart,
errors.New("error borrowing module"),
)
ps.logger.Error("Error borrowing module")
Expand All @@ -66,7 +66,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
modExt.Start(reqCtx)
defer modExt.Stop(true)
ps.metrics.MeasureModuleDuration(
reqCtx, "module_extract",
reqCtx.ctx, reqCtx, "module_extract",
runtimeStart, nil,
)
} else {
Expand All @@ -86,7 +86,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
fetchUpstreamStart := time.Now()
hostUrl, err := fetchUpstreamUrl(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "fetch_upstream",
reqCtx.ctx, reqCtx, "fetch_upstream",
fetchUpstreamStart, err,
)
if err != nil {
Expand Down Expand Up @@ -147,7 +147,8 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
resModifierStart := time.Now()
err = responseModifier(modExt.ModuleContext(), res)
ps.metrics.MeasureModuleDuration(
reqCtx, "response_modifier",
reqCtx.ctx, reqCtx,
"response_modifier",
resModifierStart, err,
)
if err != nil {
Expand All @@ -164,7 +165,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
}).
ErrorHandler(func(w http.ResponseWriter, r *http.Request, reqErr error) {
upstreamErr = reqErr
ps.logger.Debug("Error proxying request",
ps.logger.Error("Error proxying request",
zap.String("error", reqErr.Error()),
zap.String("route", reqCtx.route.Name),
zap.String("service", reqCtx.route.Service.Name),
Expand All @@ -178,7 +179,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
errorHandlerStart := time.Now()
err = errorHandler(modExt.ModuleContext(), reqErr)
ps.metrics.MeasureModuleDuration(
reqCtx, "error_handler",
reqCtx.ctx, reqCtx, "error_handler",
errorHandlerStart, err,
)
if err != nil {
Expand All @@ -193,12 +194,6 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
}
}
if !reqCtx.rw.HeadersSent() && reqCtx.rw.BytesWritten() == 0 {
ps.logger.Error("Writing error response",
zap.String("error", reqErr.Error()),
zap.String("route", reqCtx.route.Name),
zap.String("service", reqCtx.route.Service.Name),
zap.String("namespace", reqCtx.route.Namespace.Name),
)
util.WriteStatusCodeError(reqCtx.rw, http.StatusBadGateway)
}
})
Expand All @@ -207,7 +202,8 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
reqModifierStart := time.Now()
err = requestModifier(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "request_modifier",
reqCtx.ctx, reqCtx,
"request_modifier",
reqModifierStart, err,
)
if err != nil {
Expand Down Expand Up @@ -241,8 +237,10 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
upstreamStart := time.Now()
rp.ServeHTTP(reqCtx.rw, reqCtx.req)
ps.metrics.MeasureUpstreamDuration(
reqCtx, upstreamStart,
upstreamUrl.String(), upstreamErr,
reqCtx.ctx, reqCtx,
upstreamStart,
upstreamUrl.String(),
upstreamErr,
)
}

Expand All @@ -252,7 +250,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
reqModifierStart := time.Now()
err = requestModifier(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "request_modifier",
reqCtx.ctx, reqCtx,
"request_modifier",
reqModifierStart, err,
)
if err != nil {
Expand All @@ -269,7 +268,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
requestHandlerStart := time.Now()
err := requestHandler(modExt.ModuleContext())
defer ps.metrics.MeasureModuleDuration(
reqCtx, "request_handler",
reqCtx.ctx, reqCtx,
"request_handler",
requestHandlerStart, err,
)
if err != nil {
Expand All @@ -283,7 +283,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
errorHandlerStart := time.Now()
err = errorHandler(modExt.ModuleContext(), err)
ps.metrics.MeasureModuleDuration(
reqCtx, "error_handler",
reqCtx.ctx, reqCtx,
"error_handler",
errorHandlerStart, err,
)
if err != nil {
Expand Down
Loading

0 comments on commit eee44cb

Please sign in to comment.