diff --git a/TODO.md b/TODO.md index 00ff125..5bb5d29 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ - server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.) - cluster management (raft commands, replica commands, etc.) (low priority) - other commands (backup, restore, etc.) (low priority) - - replace k6 with wrk for performance tests ## Add Module Tests @@ -16,9 +15,6 @@ - [ ] - Add option to specify export variables when ambiguous (?) - [ ] - check how global variable conflicts are handled -## Start using Pkl - -replace dgate server config with pkl. ## dgate-cli declaritive config @@ -70,10 +66,6 @@ expose metrics for the following: - Add Transactions - [ ] - Add transactional support for admin API -## DGate Documentation (dgate.io/docs) - -Use Docusaurus to create the documentation for DGate. - ## DGate Admin Console (low priority) Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser. @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster. -## DGate CLI - argument variable suggestions - -For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match. -``` -dgate-cli ns mk my-ns nmae=my-ns -Variable 'nmae' is not recognized. Did you mean 'name'? -``` - ## DGate CLI - help command show required variables When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables. @@ -159,4 +143,8 @@ Add stack tracing for typescript modules. Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation. -## Add Telemetry (sentry, datadog, etc.) \ No newline at end of file +## Add Telemetry (sentry, datadog, etc.) + +## ResourceManager callback for resource changes + +Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more. \ No newline at end of file diff --git a/functional-tests/raft_tests/test1.yaml b/functional-tests/raft_tests/test1.yaml index 5c18c1c..2bb8f42 100644 --- a/functional-tests/raft_tests/test1.yaml +++ b/functional-tests/raft_tests/test1.yaml @@ -1,4 +1,5 @@ version: v1 +debug: true log_level: info tags: diff --git a/internal/admin/admin_fsm.go b/internal/admin/admin_fsm.go index 5632be1..8f22399 100644 --- a/internal/admin/admin_fsm.go +++ b/internal/admin/admin_fsm.go @@ -21,27 +21,17 @@ func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdmi return &dgateAdminFSM{cs, logger} } -func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool { - return !fsm.cs.Ready() && - log.Index+1 >= fsm.cs.Raft().LastIndex() && - log.Index+1 >= fsm.cs.Raft().AppliedIndex() +func (fsm *dgateAdminFSM) isLatestLog(log *raft.Log) bool { + rft := fsm.cs.Raft() + return log.Index == rft.CommitIndex() || + log.Index+1 == rft.CommitIndex() } -func (fsm *dgateAdminFSM) checkLast(log *raft.Log) { - rft := fsm.cs.Raft() - if !fsm.cs.Ready() && fsm.isReplay(log) { - fsm.logger.Info("FSM is not ready, setting ready", - zap.Uint64("index", log.Index), - zap.Uint64("applied-index", rft.AppliedIndex()), - zap.Uint64("last-index", rft.LastIndex()), - ) - defer func() { - if err := fsm.cs.ReloadState(false); err != nil { - fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) - } else { - fsm.cs.SetReady() - } - }() +func (fsm *dgateAdminFSM) reload(cls ...*spec.ChangeLog) { + if err := fsm.cs.ReloadState(false, cls...); err != nil { + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } else { + fsm.cs.SetReady() } } @@ -82,43 +72,56 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { } func (fsm *dgateAdminFSM) Apply(log *raft.Log) any { - defer fsm.checkLast(log) - _, err := fsm.applyLog(log) + rft := fsm.cs.Raft() + fsm.logger.Debug("applying log", + zap.Uint64("current", log.Index), + zap.Uint64("applied", rft.AppliedIndex()), + zap.Uint64("commit", rft.CommitIndex()), + zap.Uint64("last", rft.LastIndex()), + ) + cl, err := fsm.applyLog(log) + if err != nil && !fsm.cs.Ready() { + fsm.reload(cl) + } else { + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } return err } func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any { - lastLog := logs[len(logs)-1] - if fsm.isReplay(lastLog) { - rft := fsm.cs.Raft() - fsm.logger.Info("applying log batch logs", - zap.Int("size", len(logs)), - zap.Uint64("current", lastLog.Index), - zap.Uint64("applied", rft.AppliedIndex()), - zap.Uint64("commit", rft.CommitIndex()), - zap.Uint64("last", rft.LastIndex()), - ) + if len(logs) == 0 || logs == nil { + fsm.logger.Warn("No logs to apply in ApplyBatch") + return nil } - cls := make([]*spec.ChangeLog, 0, len(logs)) - defer func() { - if !fsm.cs.Ready() { - fsm.checkLast(logs[len(logs)-1]) - return - } - - if err := fsm.cs.ReloadState(true, cls...); err != nil { - fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err)) - } - }() + lastLog := logs[len(logs)-1] + rft := fsm.cs.Raft() + fsm.logger.Info("applying batch logs", + zap.Int("size", len(logs)), + zap.Uint64("current", lastLog.Index), + zap.Uint64("applied", rft.AppliedIndex()), + zap.Uint64("commit", rft.CommitIndex()), + zap.Uint64("last", rft.LastIndex()), + ) + cls := make([]*spec.ChangeLog, 0, len(logs)) results := make([]any, len(logs)) for i, log := range logs { - var cl *spec.ChangeLog - cl, results[i] = fsm.applyLog(log) - if cl != nil { + var ( + cl *spec.ChangeLog + err error + ) + if cl, err = fsm.applyLog(log); err != nil { + results[i] = err + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } else { cls = append(cls, cl) } } + + if fsm.cs.Ready() || fsm.isLatestLog(lastLog) { + fsm.reload(cls...) + } + return results } diff --git a/internal/admin/admin_routes.go b/internal/admin/admin_routes.go index 871bdac..fbde34e 100644 --- a/internal/admin/admin_routes.go +++ b/internal/admin/admin_routes.go @@ -82,22 +82,16 @@ func configureRoutes( allowed, err := ipList.Contains(remoteHost) if !allowed && adminConfig.XForwardedForDepth > 0 { xForwardedForIps := r.Header.Values("X-Forwarded-For") - count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps)) - for i := 0; i < count; i++ { - allowed, err = ipList.Contains(xForwardedForIps[i]) - if err != nil { - logger.Error("error checking x-forwarded-for ip", - zap.Error(err), - ) - if conf.Debug { - http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest) - } - http.Error(w, "Bad Request", http.StatusBadRequest) - return - } - if allowed { - break + if adminConfig.XForwardedForDepth >= len(xForwardedForIps) { + depth := min(adminConfig.XForwardedForDepth, len(xForwardedForIps)) + targetIp := xForwardedForIps[len(xForwardedForIps)-depth] + allowed, err = ipList.Contains(targetIp) + if err == nil && allowed { + remoteHost = targetIp + } else { + allowed = false } + } } @@ -145,17 +139,18 @@ func configureRoutes( return } } - raftInstance := cs.Raft() - if r.Method == http.MethodPut && raftInstance != nil { - leader := raftInstance.Leader() - if leader == "" { - util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") - return - } - if raftInstance.State() != raft.Leader { - r.URL.Host = string(leader) - http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) - return + if raftInstance := cs.Raft(); raftInstance != nil { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + leader := raftInstance.Leader() + if leader == "" { + util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") + return + } + if raftInstance.State() != raft.Leader { + r.URL.Host = string(leader) + http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) + return + } } } @@ -165,10 +160,14 @@ func configureRoutes( server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") - w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil)) w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly)) w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash())) - w.Header().Set("X-DGate-AdminAPI", "true") + if raftInstance := cs.Raft(); raftInstance != nil { + w.Header().Set( + "X-DGate-Raft-State", + raftInstance.State().String(), + ) + } w.WriteHeader(http.StatusOK) w.Write([]byte("DGate Admin API")) })) diff --git a/internal/config/config.go b/internal/config/config.go index 1dcd90e..adee11c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,19 +39,21 @@ type ( } DGateProxyConfig struct { - Host string `koanf:"host"` - Port int `koanf:"port"` - TLS *DGateTLSConfig `koanf:"tls"` - EnableH2C bool `koanf:"enable_h2c"` - EnableHTTP2 bool `koanf:"enable_http2"` - EnableConsoleLogger bool `koanf:"enable_console_logger"` - RedirectHttpsDomains []string `koanf:"redirect_https"` - AllowedDomains []string `koanf:"allowed_domains"` - GlobalHeaders map[string]string `koanf:"global_headers"` - Transport DGateHttpTransportConfig `koanf:"client_transport"` + Host string `koanf:"host"` + Port int `koanf:"port"` + TLS *DGateTLSConfig `koanf:"tls"` + EnableH2C bool `koanf:"enable_h2c"` + EnableHTTP2 bool `koanf:"enable_http2"` + EnableConsoleLogger bool `koanf:"enable_console_logger"` + RedirectHttpsDomains []string `koanf:"redirect_https"` + AllowedDomains []string `koanf:"allowed_domains"` + GlobalHeaders map[string]string `koanf:"global_headers"` + Transport DGateHttpTransportConfig `koanf:"client_transport"` + DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + StrictMode bool `koanf:"strict_mode"` + // WARN: debug use only - InitResources *DGateResources `koanf:"init_resources"` - DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + InitResources *DGateResources `koanf:"init_resources"` } DGateTestServerConfig struct { diff --git a/internal/config/loader.go b/internal/config/loader.go index f7928e8..8e41595 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { shell := "/bin/sh" if shellEnv := os.Getenv("SHELL"); shellEnv != "" { shell = shellEnv - } + } resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) { cmdResult, err := exec.CommandContext( ctx, shell, "-c", results["cmd"]).Output() @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { } if k.Exists("admin") { kDefault(k, "admin.host", "127.0.0.1") + kDefault(k, "admin.x_forwarded_for_depth", -1) err = kRequireAll(k, "admin.port") if err != nil { return nil, err diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 5a2ab94..e672398 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -81,7 +81,7 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( err = fmt.Errorf("unknown command: %s", cl.Cmd) } if err != nil { - ps.logger.Error("decoding or processing change log", zap.Error(err)) + ps.logger.Error("error processing change log", zap.Error(err)) return } } @@ -97,10 +97,8 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( // even if the change is a noop, we still need to update the hash changeHash, err := HashAny(ps.changeHash, cl) if err != nil { - if !ps.config.Debug { - return err - } ps.logger.Error("error updating change log hash", zap.Error(err)) + return err } else { ps.changeHash = changeHash } @@ -254,25 +252,8 @@ func (ps *ProxyState) processSecret(scrt *spec.Secret, cl *spec.ChangeLog) (err return err } -// applyChange - apply a change to the proxy state, returns a channel that will receive an error when the state has been updated -func (ps *ProxyState) applyChange(changeLog *spec.ChangeLog) <-chan error { - done := make(chan error, 1) - if changeLog == nil { - changeLog = &spec.ChangeLog{ - Cmd: spec.NoopCommand, - } - } - changeLog.SetErrorChan(done) - if err := ps.processChangeLog(changeLog, true, true); err != nil { - done <- err - } - return done -} - func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { - // restore state change logs - logs, err := ps.store.FetchChangeLogs() - if err != nil { + if logs, err := ps.store.FetchChangeLogs(); err != nil { if err == badger.ErrKeyNotFound { ps.logger.Debug("no state change logs found in storage") } else { @@ -295,17 +276,12 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { return err } } - if !directApply { - if err = ps.processChangeLog(nil, true, false); err != nil { - return err - } - } else { + if directApply { if err = ps.reconfigureState(false, nil); err != nil { - return nil + return err } } - // TODO: optionally compact change logs through a flag in config? if len(logs) > 1 { removed, err := ps.compactChangeLogs(logs) if err != nil { diff --git a/internal/proxy/proxy_handler.go b/internal/proxy/proxy_handler.go index b6bb6fa..ec7ac74 100644 --- a/internal/proxy/proxy_handler.go +++ b/internal/proxy/proxy_handler.go @@ -25,12 +25,24 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) { With( zap.String("route", reqCtx.route.Name), zap.String("namespace", reqCtx.route.Namespace.Name), + zap.String("path", reqCtx.req.URL.Path), + zap.String("method", reqCtx.req.Method), + zap.String("query", reqCtx.req.URL.RawQuery), + zap.String("protocol", reqCtx.req.Proto), + zap.String("remote_address", reqCtx.req.RemoteAddr), + zap.String("user_agent", reqCtx.req.UserAgent()), + zap.Int64("content_length", reqCtx.req.ContentLength), + zap.String("content_type", reqCtx.req.Header.Get("Content-Type")), ) if reqCtx.route.Service != nil { event = event.With(zap.String("service", reqCtx.route.Service.Name)) } - event.Debug("Request Log") + if ps.config.ProxyConfig.StrictMode { + event.Info("Request log") + } else { + event.Debug("Request log") + } }() defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now()) diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 383055a..16d1318 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -169,8 +169,7 @@ func (ps *ProxyState) ChangeHash() uint32 { func (ps *ProxyState) SetReady() { if ps.replicationEnabled && !ps.raftReady.Load() { - ps.logger.Info("Replication status is now ready after " + - time.Since(ps.startTime).String()) + ps.logger.Info("Replication status is now ready") ps.raftReady.Store(true) return } @@ -199,7 +198,10 @@ func (ps *ProxyState) SetupRaft(r *raft.Raft, rc *raft.Config) { func (ps *ProxyState) WaitForChanges() error { ps.proxyLock.RLock() defer ps.proxyLock.RUnlock() - return <-ps.applyChange(nil) + if rft := ps.Raft(); rft != nil { + return rft.Barrier(time.Second * 5).Error() + } + return nil } func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { @@ -289,7 +291,7 @@ func (ps *ProxyState) ReloadState(check bool, logs ...*spec.ChangeLog) error { } } if reload { - <-ps.applyChange(nil) + return ps.processChangeLog(nil, true, true) } return nil } @@ -572,12 +574,13 @@ func (ps *ProxyState) ServeHTTP(w http.ResponseWriter, r *http.Request) { if router, ok := ps.routers.Find(ns.Name); ok { router.ServeHTTP(w, r) } else { - ps.logger.Debug("No router found for namespace", - zap.String("namespace", ns.Name), - ) util.WriteStatusCodeError(w, http.StatusNotFound) } } else { + if ps.config.ProxyConfig.StrictMode { + closeConnection(w) + return + } ps.logger.Debug("No namespace found for request", zap.String("protocol", r.Proto), zap.String("host", r.Host), @@ -588,3 +591,12 @@ func (ps *ProxyState) ServeHTTP(w http.ResponseWriter, r *http.Request) { util.WriteStatusCodeError(w, http.StatusNotFound) } } + +func closeConnection(w http.ResponseWriter) { + if loot, ok := w.(http.Hijacker); ok { + if conn, _, err := loot.Hijack(); err == nil { + defer conn.Close() + return + } + } +} diff --git a/pkg/dgclient/common.go b/pkg/dgclient/common.go index bfe9b1f..681216d 100644 --- a/pkg/dgclient/common.go +++ b/pkg/dgclient/common.go @@ -150,5 +150,5 @@ func parseApiError(body io.Reader, wrapErr error) error { if err := json.NewDecoder(body).Decode(&apiError); err != nil || apiError.Error == "" { return wrapErr } - return fmt.Errorf("%d: %s", wrapErr, apiError.Error) + return fmt.Errorf("%s: %s", wrapErr, apiError.Error) } diff --git a/pkg/spec/response_writer_tracker.go b/pkg/spec/response_writer_tracker.go index 26dcb08..6a872ee 100644 --- a/pkg/spec/response_writer_tracker.go +++ b/pkg/spec/response_writer_tracker.go @@ -1,6 +1,8 @@ package spec import ( + "bufio" + "net" "net/http" ) @@ -17,6 +19,7 @@ type rwTracker struct { bytesWritten int64 } +var _ http.Hijacker = (*rwTracker)(nil) var _ ResponseWriterTracker = (*rwTracker)(nil) func NewResponseWriterTracker(rw http.ResponseWriter) ResponseWriterTracker { @@ -61,3 +64,11 @@ func (t *rwTracker) HeadersSent() bool { func (t *rwTracker) BytesWritten() int64 { return t.bytesWritten } + +func (t *rwTracker) Hijack() (net.Conn, *bufio.ReadWriter, error) { + hijacker, ok := t.rw.(http.Hijacker) + if !ok { + return nil, nil, http.ErrNotSupported + } + return hijacker.Hijack() +} diff --git a/pkg/util/http.go b/pkg/util/http.go index 2f8e007..f7c6ce8 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -1,7 +1,6 @@ package util import ( - "fmt" "net/http" ) @@ -9,7 +8,4 @@ func WriteStatusCodeError(w http.ResponseWriter, code int) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(code) - w.Write([]byte( - fmt.Sprintf("DGate: %d %s", code, http.StatusText(code)), - )) }