Skip to content

Commit

Permalink
add priority cache, improve reqCtx interface, add option to disable p…
Browse files Browse the repository at this point in the history
…rivate IPs, add ctx to metrics, add timeout to module pool
  • Loading branch information
bubbajoe committed Jun 30, 2024
1 parent 4edf5ea commit 9d4e376
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 142 deletions.
5 changes: 3 additions & 2 deletions config.dgate.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: v1
debug: true
log_level: ${LOG_LEVEL:-debug}
log_level: ${LOG_LEVEL:-info}
disable_default_namespace: true
tags: [debug, local, test]
storage:
Expand All @@ -15,7 +15,8 @@ proxy:
port: ${PORT:-80}
host: 0.0.0.0
console_log_level: info
transport:
client_transport:
disable_private_ips: false
dns_prefer_go: true
init_resources:
namespaces:
Expand Down
10 changes: 3 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ type (
}

LoggingConfig struct {
ZapConfig *zap.Config `koanf:",squash"`
LogOutputs []*LogOutput `koanf:"log_outputs"`
}

LogOutput struct {
Name string `koanf:"name"`
Config map[string]any `koanf:",remain"`
ZapConfig *zap.Config `koanf:",squash"`
}

DGateProxyConfig struct {
Expand Down Expand Up @@ -176,6 +170,7 @@ type (
KeepAlive time.Duration `koanf:"keep_alive"`
ResponseHeaderTimeout time.Duration `koanf:"response_header_timeout"`
DialTimeout time.Duration `koanf:"dial_timeout"`
DisablePrivateIPs bool `koanf:"disable_private_ips"`
}

DGateStorageConfig struct {
Expand Down Expand Up @@ -255,6 +250,7 @@ func (conf *DGateConfig) GetLogger() (*zap.Logger, error) {
if logger, err := conf.Logging.ZapConfig.Build(); err != nil {
return nil, err
} else {
zap.ReplaceGlobals(logger)

Check warning on line 253 in internal/config/config.go

View check run for this annotation

Codecov / codecov/patch

internal/config/config.go#L253

Added line #L253 was not covered by tests
return logger, nil
}
}
10 changes: 6 additions & 4 deletions internal/proxy/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (err error) {
if reload {
defer func(start time.Time) {
ps.logger.Debug("processing change log",
zap.String("id", cl.ID),
zap.Duration("duration", time.Since(start)),
)
if err != nil {
ps.logger.Debug("processed change log",
zap.String("id", cl.ID),
zap.Duration("duration", time.Since(start)),
)

Check warning on line 24 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L21-L24

Added lines #L21 - L24 were not covered by tests
}
}(time.Now())
}
ps.proxyLock.Lock()
Expand Down
10 changes: 7 additions & 3 deletions internal/proxy/dynamic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,19 @@ func (ps *ProxyState) setupRoutes(
if len(rt.Modules) > 0 {
modExtFunc := ps.createModuleExtractorFunc(rt)
if modPool, err := NewModulePool(
256, 1024, reqCtxProvider, modExtFunc,
0, 1024, time.Minute*5,
reqCtxProvider, modExtFunc,
); err != nil {
ps.logger.Error("Error creating module buffer", zap.Error(err))
return err
} else {
reqCtxProvider.SetModulePool(modPool)
reqCtxProvider.UpdateModulePool(modPool)
}
}
ps.providers.Insert(rt.Namespace.Name+"/"+rt.Name, reqCtxProvider)
oldReqCtxProvider := ps.providers.Insert(rt.Namespace.Name+"/"+rt.Name, reqCtxProvider)
if oldReqCtxProvider != nil {
oldReqCtxProvider.Close()

Check warning on line 187 in internal/proxy/dynamic_proxy.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/dynamic_proxy.go#L187

Added line #L187 was not covered by tests
}
for _, path := range rt.Paths {
if len(rt.Methods) > 0 && rt.Methods[0] == "*" {
if len(rt.Methods) > 1 {
Expand Down
66 changes: 39 additions & 27 deletions internal/proxy/module_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"time"

"go.uber.org/zap"
)

type ModulePool interface {
Expand All @@ -11,57 +14,66 @@ type ModulePool interface {
}

type modulePool struct {
modExtBuffer chan ModuleExtractor
min, max int

ctxCancel context.CancelFunc
ctx context.Context
modExtChan chan ModuleExtractor
min, max int
cancel context.CancelFunc
ctx context.Context

createModuleExtract func() (ModuleExtractor, error)
createModExt func() (ModuleExtractor, error)
}

func NewModulePool(
minBuffers, maxBuffers int,
bufferTimeout time.Duration,
reqCtxProvider *RequestContextProvider,
createModExts ModuleExtractorFunc,
) (ModulePool, error) {
if minBuffers < 1 {
panic("module concurrency must be greater than 0")
}
if maxBuffers < minBuffers {
panic("maxBuffers must be greater than minBuffers")
}

if _, err := createModExts(reqCtxProvider); err != nil {
return nil, err
}
modExtChan := make(chan ModuleExtractor, maxBuffers)
mb := &modulePool{
min: minBuffers,
max: maxBuffers,
modExtBuffer: make(chan ModuleExtractor, maxBuffers),
min: minBuffers,
max: maxBuffers,
modExtChan: modExtChan,
createModExt: func() (ModuleExtractor, error) {
return createModExts(reqCtxProvider)
},

Check warning on line 44 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}
mb.createModuleExtract = func() (ModuleExtractor, error) {
return createModExts(reqCtxProvider)
}
mb.ctx, mb.ctxCancel = context.WithCancel(reqCtxProvider.ctx)
mb.ctx, mb.cancel = context.WithCancel(reqCtxProvider.ctx)

// add min module extractors to the pool
defer func() {
for i := 0; i < minBuffers; i++ {
me, err := mb.createModExt()
if err == nil {
mb.modExtChan <- me

Check warning on line 53 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L51-L53

Added lines #L51 - L53 were not covered by tests
}
}
}()

return mb, nil
}

func (mb *modulePool) Borrow() ModuleExtractor {
if mb == nil || mb.ctx == nil || mb.ctx.Err() != nil {
if mb == nil || mb.ctx.Err() != nil {
zap.L().Warn("stale use of module pool",
zap.Any("modPool", mb),
)

Check warning on line 65 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L62-L65

Added lines #L62 - L65 were not covered by tests
return nil
}
var (
me ModuleExtractor
err error
)
select {
case me = <-mb.modExtBuffer:
case me = <-mb.modExtChan:

Check warning on line 73 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L73

Added line #L73 was not covered by tests
break
// NOTE: important for performance
default:
me, err = mb.createModuleExtract()
if err != nil {
if me, err = mb.createModExt(); err != nil {

Check warning on line 76 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L76

Added line #L76 was not covered by tests
return nil
}
}
Expand All @@ -72,18 +84,18 @@ func (mb *modulePool) Return(me ModuleExtractor) {
// if context is canceled, do not return module extract
if mb.ctx != nil && mb.ctx.Err() == nil {
select {
case mb.modExtBuffer <- me:
case mb.modExtChan <- me:

Check warning on line 87 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L87

Added line #L87 was not covered by tests
return
default:
// if buffer is full, discard module extract
}
}
me.Stop(true)
me.Stop(false)

Check warning on line 93 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L93

Added line #L93 was not covered by tests
}

func (mb *modulePool) Close() {
if mb.ctxCancel != nil {
mb.ctxCancel()
if mb.cancel != nil {
mb.cancel()

Check warning on line 98 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}
close(mb.modExtBuffer)
close(mb.modExtChan)

Check warning on line 100 in internal/proxy/module_executor.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/module_executor.go#L100

Added line #L100 was not covered by tests
}
39 changes: 20 additions & 19 deletions internal/proxy/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
event.Debug("Request log")
}()

defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now())
defer ps.metrics.MeasureProxyRequest(reqCtx.ctx, reqCtx, time.Now())

var modExt ModuleExtractor
if len(reqCtx.route.Modules) != 0 {
Expand All @@ -53,7 +53,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
} else {
if modExt = modPool.Borrow(); modExt == nil {
ps.metrics.MeasureModuleDuration(
reqCtx, "module_extract", runtimeStart,
reqCtx.ctx, reqCtx, "module_extract", runtimeStart,

Check warning on line 56 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L56

Added line #L56 was not covered by tests
errors.New("error borrowing module"),
)
ps.logger.Error("Error borrowing module")
Expand All @@ -66,7 +66,7 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) {
modExt.Start(reqCtx)
defer modExt.Stop(true)
ps.metrics.MeasureModuleDuration(
reqCtx, "module_extract",
reqCtx.ctx, reqCtx, "module_extract",
runtimeStart, nil,
)
} else {
Expand All @@ -86,7 +86,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
fetchUpstreamStart := time.Now()
hostUrl, err := fetchUpstreamUrl(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "fetch_upstream",
reqCtx.ctx, reqCtx, "fetch_upstream",
fetchUpstreamStart, err,
)
if err != nil {
Expand Down Expand Up @@ -147,7 +147,8 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
resModifierStart := time.Now()
err = responseModifier(modExt.ModuleContext(), res)
ps.metrics.MeasureModuleDuration(
reqCtx, "response_modifier",
reqCtx.ctx, reqCtx,
"response_modifier",

Check warning on line 151 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L150-L151

Added lines #L150 - L151 were not covered by tests
resModifierStart, err,
)
if err != nil {
Expand All @@ -164,7 +165,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
}).
ErrorHandler(func(w http.ResponseWriter, r *http.Request, reqErr error) {
upstreamErr = reqErr
ps.logger.Debug("Error proxying request",
ps.logger.Error("Error proxying request",
zap.String("error", reqErr.Error()),
zap.String("route", reqCtx.route.Name),
zap.String("service", reqCtx.route.Service.Name),
Expand All @@ -178,7 +179,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
errorHandlerStart := time.Now()
err = errorHandler(modExt.ModuleContext(), reqErr)
ps.metrics.MeasureModuleDuration(
reqCtx, "error_handler",
reqCtx.ctx, reqCtx, "error_handler",
errorHandlerStart, err,
)
if err != nil {
Expand All @@ -193,12 +194,6 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
}
}
if !reqCtx.rw.HeadersSent() && reqCtx.rw.BytesWritten() == 0 {
ps.logger.Error("Writing error response",
zap.String("error", reqErr.Error()),
zap.String("route", reqCtx.route.Name),
zap.String("service", reqCtx.route.Service.Name),
zap.String("namespace", reqCtx.route.Namespace.Name),
)
util.WriteStatusCodeError(reqCtx.rw, http.StatusBadGateway)
}
})
Expand All @@ -207,7 +202,8 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
reqModifierStart := time.Now()
err = requestModifier(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "request_modifier",
reqCtx.ctx, reqCtx,
"request_modifier",

Check warning on line 206 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L205-L206

Added lines #L205 - L206 were not covered by tests
reqModifierStart, err,
)
if err != nil {
Expand Down Expand Up @@ -241,8 +237,10 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt
upstreamStart := time.Now()
rp.ServeHTTP(reqCtx.rw, reqCtx.req)
ps.metrics.MeasureUpstreamDuration(
reqCtx, upstreamStart,
upstreamUrl.String(), upstreamErr,
reqCtx.ctx, reqCtx,
upstreamStart,
upstreamUrl.String(),
upstreamErr,
)
}

Expand All @@ -252,7 +250,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
reqModifierStart := time.Now()
err = requestModifier(modExt.ModuleContext())
ps.metrics.MeasureModuleDuration(
reqCtx, "request_modifier",
reqCtx.ctx, reqCtx,
"request_modifier",

Check warning on line 254 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L253-L254

Added lines #L253 - L254 were not covered by tests
reqModifierStart, err,
)
if err != nil {
Expand All @@ -269,7 +268,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
requestHandlerStart := time.Now()
err := requestHandler(modExt.ModuleContext())
defer ps.metrics.MeasureModuleDuration(
reqCtx, "request_handler",
reqCtx.ctx, reqCtx,
"request_handler",

Check warning on line 272 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L271-L272

Added lines #L271 - L272 were not covered by tests
requestHandlerStart, err,
)
if err != nil {
Expand All @@ -283,7 +283,8 @@ func requestHandlerModule(ps *ProxyState, reqCtx *RequestContext, modExt ModuleE
errorHandlerStart := time.Now()
err = errorHandler(modExt.ModuleContext(), err)
ps.metrics.MeasureModuleDuration(
reqCtx, "error_handler",
reqCtx.ctx, reqCtx,
"error_handler",

Check warning on line 287 in internal/proxy/proxy_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/proxy_handler.go#L286-L287

Added lines #L286 - L287 were not covered by tests
errorHandlerStart, err,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/proxy/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func TestProxyHandler_ReverseProxy(t *testing.T) {
modBuf.On("Borrow").Return(modExt).Once()
modBuf.On("Return", modExt).Return().Once()
modBuf.On("Close").Return().Once()
reqCtxProvider.SetModulePool(modBuf)
reqCtxProvider.UpdateModulePool(modBuf)

modPool := NewMockModulePool()
modPool.On("Borrow").Return(modExt).Once()
modPool.On("Return", modExt).Return().Once()
reqCtxProvider.SetModulePool(modPool)
reqCtxProvider.UpdateModulePool(modPool)
ps.ProxyHandler(ps, reqCtx)

wr.AssertExpectations(t)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestProxyHandler_ProxyHandler(t *testing.T) {
modPool := NewMockModulePool()
modPool.On("Borrow").Return(modExt).Once()
modPool.On("Return", modExt).Return().Once()
reqCtxProvider.SetModulePool(modPool)
reqCtxProvider.UpdateModulePool(modPool)

reqCtx := reqCtxProvider.CreateRequestContext(
context.Background(), wr, req, "/")
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestProxyHandler_ProxyHandlerError(t *testing.T) {
modPool.On("Borrow").Return(modExt).Once()
modPool.On("Return", modExt).Return().Once()
reqCtxProvider := proxy.NewRequestContextProvider(rt, ps)
reqCtxProvider.SetModulePool(modPool)
reqCtxProvider.UpdateModulePool(modPool)
reqCtx := reqCtxProvider.CreateRequestContext(
context.Background(), wr, req, "/")
ps.ProxyHandler(ps, reqCtx)
Expand Down
Loading

0 comments on commit 9d4e376

Please sign in to comment.