Skip to content

Commit

Permalink
feat: fix raft bug, add request log, and many other nits
Browse files Browse the repository at this point in the history
  • Loading branch information
bubbajoe committed Jun 7, 2024
1 parent d577368 commit 63940c1
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 145 deletions.
22 changes: 5 additions & 17 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
- server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.)
- cluster management (raft commands, replica commands, etc.) (low priority)
- other commands (backup, restore, etc.) (low priority)
- replace k6 with wrk for performance tests

## Add Module Tests

Expand All @@ -16,9 +15,6 @@
- [ ] - Add option to specify export variables when ambiguous (?)
- [ ] - check how global variable conflicts are handled

## Start using Pkl

replace dgate server config with pkl.

## dgate-cli declaritive config

Expand Down Expand Up @@ -70,10 +66,6 @@ expose metrics for the following:
- Add Transactions
- [ ] - Add transactional support for admin API

## DGate Documentation (dgate.io/docs)

Use Docusaurus to create the documentation for DGate.

## DGate Admin Console (low priority)

Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser.
Expand Down Expand Up @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen
Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster.


## DGate CLI - argument variable suggestions

For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match.
```
dgate-cli ns mk my-ns nmae=my-ns
Variable 'nmae' is not recognized. Did you mean 'name'?
```

## DGate CLI - help command show required variables

When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables.
Expand All @@ -159,4 +143,8 @@ Add stack tracing for typescript modules.

Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation.

## Add Telemetry (sentry, datadog, etc.)
## Add Telemetry (sentry, datadog, etc.)

## ResourceManager callback for resource changes

Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more.
1 change: 1 addition & 0 deletions functional-tests/raft_tests/test1.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: v1
debug: true
log_level: info

tags:
Expand Down
93 changes: 48 additions & 45 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,17 @@ func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdmi
return &dgateAdminFSM{cs, logger}
}

func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool {
return !fsm.cs.Ready() &&
log.Index+1 >= fsm.cs.Raft().LastIndex() &&
log.Index+1 >= fsm.cs.Raft().AppliedIndex()
func (fsm *dgateAdminFSM) isLatestLog(log *raft.Log) bool {
rft := fsm.cs.Raft()
return log.Index == rft.CommitIndex() ||
log.Index+1 == rft.CommitIndex()
}

func (fsm *dgateAdminFSM) checkLast(log *raft.Log) {
rft := fsm.cs.Raft()
if !fsm.cs.Ready() && fsm.isReplay(log) {
fsm.logger.Info("FSM is not ready, setting ready",
zap.Uint64("index", log.Index),
zap.Uint64("applied-index", rft.AppliedIndex()),
zap.Uint64("last-index", rft.LastIndex()),
)
defer func() {
if err := fsm.cs.ReloadState(false); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()
}
}()
func (fsm *dgateAdminFSM) reload(cls ...*spec.ChangeLog) {
if err := fsm.cs.ReloadState(false, cls...); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()
}
}

Expand Down Expand Up @@ -82,43 +72,56 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
}

func (fsm *dgateAdminFSM) Apply(log *raft.Log) any {
defer fsm.checkLast(log)
_, err := fsm.applyLog(log)
rft := fsm.cs.Raft()
fsm.logger.Debug("applying log",
zap.Uint64("current", log.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
cl, err := fsm.applyLog(log)
if err != nil && !fsm.cs.Ready() {
fsm.reload(cl)
} else {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
}
return err
}

func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any {
lastLog := logs[len(logs)-1]
if fsm.isReplay(lastLog) {
rft := fsm.cs.Raft()
fsm.logger.Info("applying log batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
if len(logs) == 0 || logs == nil {
fsm.logger.Warn("No logs to apply in ApplyBatch")
return nil
}
cls := make([]*spec.ChangeLog, 0, len(logs))
defer func() {
if !fsm.cs.Ready() {
fsm.checkLast(logs[len(logs)-1])
return
}

if err := fsm.cs.ReloadState(true, cls...); err != nil {
fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err))
}
}()
lastLog := logs[len(logs)-1]
rft := fsm.cs.Raft()
fsm.logger.Info("applying batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)

cls := make([]*spec.ChangeLog, 0, len(logs))
results := make([]any, len(logs))
for i, log := range logs {
var cl *spec.ChangeLog
cl, results[i] = fsm.applyLog(log)
if cl != nil {
var (
cl *spec.ChangeLog
err error
)
if cl, err = fsm.applyLog(log); err != nil {
results[i] = err
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
cls = append(cls, cl)
}
}

if fsm.cs.Ready() || fsm.isLatestLog(lastLog) {
fsm.reload(cls...)
}

return results
}

Expand Down
55 changes: 27 additions & 28 deletions internal/admin/admin_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,16 @@ func configureRoutes(
allowed, err := ipList.Contains(remoteHost)
if !allowed && adminConfig.XForwardedForDepth > 0 {
xForwardedForIps := r.Header.Values("X-Forwarded-For")
count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps))
for i := 0; i < count; i++ {
allowed, err = ipList.Contains(xForwardedForIps[i])
if err != nil {
logger.Error("error checking x-forwarded-for ip",
zap.Error(err),
)
if conf.Debug {
http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest)
}
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
if allowed {
break
if adminConfig.XForwardedForDepth >= len(xForwardedForIps) {
depth := min(adminConfig.XForwardedForDepth, len(xForwardedForIps))
targetIp := xForwardedForIps[len(xForwardedForIps)-depth]
allowed, err = ipList.Contains(targetIp)
if err == nil && allowed {
remoteHost = targetIp
} else {
allowed = false
}

}
}

Expand Down Expand Up @@ -145,17 +139,18 @@ func configureRoutes(
return
}
}
raftInstance := cs.Raft()
if r.Method == http.MethodPut && raftInstance != nil {
leader := raftInstance.Leader()
if leader == "" {
util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader")
return
}
if raftInstance.State() != raft.Leader {
r.URL.Host = string(leader)
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
return
if raftInstance := cs.Raft(); raftInstance != nil {
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
leader := raftInstance.Leader()
if leader == "" {
util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader")
return
}
if raftInstance.State() != raft.Leader {
r.URL.Host = string(leader)
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
return
}
}
}

Expand All @@ -165,10 +160,14 @@ func configureRoutes(

server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil))
w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly))
w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash()))
w.Header().Set("X-DGate-AdminAPI", "true")
if raftInstance := cs.Raft(); raftInstance != nil {
w.Header().Set(
"X-DGate-Raft-State",
raftInstance.State().String(),
)
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("DGate Admin API"))
}))
Expand Down
26 changes: 14 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ type (
}

DGateProxyConfig struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
TLS *DGateTLSConfig `koanf:"tls"`
EnableH2C bool `koanf:"enable_h2c"`
EnableHTTP2 bool `koanf:"enable_http2"`
EnableConsoleLogger bool `koanf:"enable_console_logger"`
RedirectHttpsDomains []string `koanf:"redirect_https"`
AllowedDomains []string `koanf:"allowed_domains"`
GlobalHeaders map[string]string `koanf:"global_headers"`
Transport DGateHttpTransportConfig `koanf:"client_transport"`
Host string `koanf:"host"`
Port int `koanf:"port"`
TLS *DGateTLSConfig `koanf:"tls"`
EnableH2C bool `koanf:"enable_h2c"`
EnableHTTP2 bool `koanf:"enable_http2"`
EnableConsoleLogger bool `koanf:"enable_console_logger"`
RedirectHttpsDomains []string `koanf:"redirect_https"`
AllowedDomains []string `koanf:"allowed_domains"`
GlobalHeaders map[string]string `koanf:"global_headers"`
Transport DGateHttpTransportConfig `koanf:"client_transport"`
DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"`
StrictMode bool `koanf:"strict_mode"`

// WARN: debug use only
InitResources *DGateResources `koanf:"init_resources"`
DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"`
InitResources *DGateResources `koanf:"init_resources"`
}

DGateTestServerConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion internal/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
shell := "/bin/sh"
if shellEnv := os.Getenv("SHELL"); shellEnv != "" {
shell = shellEnv
}
}
resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) {
cmdResult, err := exec.CommandContext(
ctx, shell, "-c", results["cmd"]).Output()
Expand Down Expand Up @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
}
if k.Exists("admin") {
kDefault(k, "admin.host", "127.0.0.1")
kDefault(k, "admin.x_forwarded_for_depth", -1)
err = kRequireAll(k, "admin.port")
if err != nil {
return nil, err
Expand Down
34 changes: 5 additions & 29 deletions internal/proxy/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (
err = fmt.Errorf("unknown command: %s", cl.Cmd)
}
if err != nil {
ps.logger.Error("decoding or processing change log", zap.Error(err))
ps.logger.Error("error processing change log", zap.Error(err))
return
}
}
Expand All @@ -97,10 +97,8 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (
// even if the change is a noop, we still need to update the hash
changeHash, err := HashAny(ps.changeHash, cl)
if err != nil {
if !ps.config.Debug {
return err
}
ps.logger.Error("error updating change log hash", zap.Error(err))
return err
} else {
ps.changeHash = changeHash
}
Expand Down Expand Up @@ -254,25 +252,8 @@ func (ps *ProxyState) processSecret(scrt *spec.Secret, cl *spec.ChangeLog) (err
return err
}

// applyChange - apply a change to the proxy state, returns a channel that will receive an error when the state has been updated
func (ps *ProxyState) applyChange(changeLog *spec.ChangeLog) <-chan error {
done := make(chan error, 1)
if changeLog == nil {
changeLog = &spec.ChangeLog{
Cmd: spec.NoopCommand,
}
}
changeLog.SetErrorChan(done)
if err := ps.processChangeLog(changeLog, true, true); err != nil {
done <- err
}
return done
}

func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error {
// restore state change logs
logs, err := ps.store.FetchChangeLogs()
if err != nil {
if logs, err := ps.store.FetchChangeLogs(); err != nil {
if err == badger.ErrKeyNotFound {
ps.logger.Debug("no state change logs found in storage")
} else {
Expand All @@ -295,17 +276,12 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error {
return err
}
}
if !directApply {
if err = ps.processChangeLog(nil, true, false); err != nil {
return err
}
} else {
if directApply {
if err = ps.reconfigureState(false, nil); err != nil {
return nil
return err
}
}

// TODO: optionally compact change logs through a flag in config?
if len(logs) > 1 {
removed, err := ps.compactChangeLogs(logs)
if err != nil {
Expand Down
Loading

0 comments on commit 63940c1

Please sign in to comment.