Skip to content

Commit

Permalink
Fix/state issues (#12)
Browse files Browse the repository at this point in the history
* fix: persistent/fsm issues

* fix!: apply fixes from feat/fix-raft-bug branch
  • Loading branch information
bubbajoe authored Jun 13, 2024
1 parent d577368 commit 9bcbce4
Show file tree
Hide file tree
Showing 31 changed files with 300 additions and 283 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/discord.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
on:
release:
types: [published]

jobs:
github-releases-to-discord:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Github Releases To Discord
uses: SethCohen/[email protected]
with:
webhook_url: ${{ secrets.DISCORD_WEBHOOK_URL }}
color: "2105893"
username: "Release Changelog"
avatar_url: "https://github.com/dgate-io.png"
content: "||@everyone||"
footer_title: "Changelog"
footer_timestamp: true
22 changes: 5 additions & 17 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
- server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.)
- cluster management (raft commands, replica commands, etc.) (low priority)
- other commands (backup, restore, etc.) (low priority)
- replace k6 with wrk for performance tests

## Add Module Tests

Expand All @@ -16,9 +15,6 @@
- [ ] - Add option to specify export variables when ambiguous (?)
- [ ] - check how global variable conflicts are handled

## Start using Pkl

replace dgate server config with pkl.

## dgate-cli declaritive config

Expand Down Expand Up @@ -70,10 +66,6 @@ expose metrics for the following:
- Add Transactions
- [ ] - Add transactional support for admin API

## DGate Documentation (dgate.io/docs)

Use Docusaurus to create the documentation for DGate.

## DGate Admin Console (low priority)

Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser.
Expand Down Expand Up @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen
Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster.


## DGate CLI - argument variable suggestions

For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match.
```
dgate-cli ns mk my-ns nmae=my-ns
Variable 'nmae' is not recognized. Did you mean 'name'?
```

## DGate CLI - help command show required variables

When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables.
Expand All @@ -159,4 +143,8 @@ Add stack tracing for typescript modules.

Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation.

## Add Telemetry (sentry, datadog, etc.)
## Add Telemetry (sentry, datadog, etc.)

## ResourceManager callback for resource changes

Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more.
6 changes: 3 additions & 3 deletions functional-tests/raft_tests/raft_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dgate-cli -f domain create name=dm-$id \

dgate-cli -f service create \
name=svc-$id namespace=ns-$id \
urls="http://localhost:8888/$RANDOM"
urls="http://localhost:8081/$RANDOM"

dgate-cli -f route create \
name=rt-$id \
Expand All @@ -55,14 +55,14 @@ for i in {1..5}; do
done

if dgate-cli --admin $ADMIN_URL4 namespace create name=0; then
echo "Expected error when creating namespace"
echo "Expected error when creating namespace on non-voter"
exit 1
fi

export DGATE_ADMIN_API=$ADMIN_URL5

if dgate-cli --admin $ADMIN_URL5 namespace create name=0; then
echo "Expected error when creating namespace"
echo "Expected error when creating namespace on non-voter"
exit 1
fi

Expand Down
2 changes: 1 addition & 1 deletion functional-tests/raft_tests/test1.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: v1
log_level: info

debug: true
tags:
- "dev"
- "internal"
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/knadh/koanf/v2 v2.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.31.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/spf13/pflag v1.0.5
github.com/stoewer/go-strcase v1.3.0
Expand All @@ -31,8 +30,8 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.21.0
golang.org/x/sync v0.6.0
golang.org/x/term v0.19.0
)

Expand Down Expand Up @@ -81,7 +80,6 @@ require (
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ github.com/clarkmcc/go-typescript v0.7.0 h1:3nVeaPYyTCWjX6Lf8GoEOTxME2bM5tLuWmwh
github.com/clarkmcc/go-typescript v0.7.0/go.mod h1:IZ/nzoVeydAmyfX7l6Jmp8lJDOEnae3jffoXwP4UyYg=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -78,7 +77,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
Expand Down Expand Up @@ -228,9 +226,6 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c h1:fPpdjePK1atuOg28PXfNSqgwf9I/qD1Hlo39JFwKBXk=
github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
Expand Down Expand Up @@ -278,6 +273,8 @@ go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZH
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
Expand Down Expand Up @@ -317,8 +314,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -340,7 +335,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
87 changes: 24 additions & 63 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,21 @@ import (
type dgateAdminFSM struct {
cs changestate.ChangeState
logger *zap.Logger
index uint64
}

var _ raft.BatchingFSM = (*dgateAdminFSM)(nil)

func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdminFSM {
return &dgateAdminFSM{cs, logger}
return &dgateAdminFSM{cs, logger, 0}
}

func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool {
return !fsm.cs.Ready() &&
log.Index+1 >= fsm.cs.Raft().LastIndex() &&
log.Index+1 >= fsm.cs.Raft().AppliedIndex()
func (fsm *dgateAdminFSM) SetIndex(index uint64) {
fsm.index = index
}

func (fsm *dgateAdminFSM) checkLast(log *raft.Log) {
rft := fsm.cs.Raft()
if !fsm.cs.Ready() && fsm.isReplay(log) {
fsm.logger.Info("FSM is not ready, setting ready",
zap.Uint64("index", log.Index),
zap.Uint64("applied-index", rft.AppliedIndex()),
zap.Uint64("last-index", rft.LastIndex()),
)
defer func() {
if err := fsm.cs.ReloadState(false); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()
}
}()
}
}

func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
func (fsm *dgateAdminFSM) applyLog(log *raft.Log, replay bool) (*spec.ChangeLog, error) {
log.Index = fsm.index
switch log.Type {
case raft.LogCommand:
var cl spec.ChangeLog
Expand All @@ -58,10 +40,8 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
fsm.logger.Error("Change log ID is empty", zap.Error(err))
panic("change log ID is empty")
}
// find a way to apply only if latest index to save time
return &cl, fsm.cs.ProcessChangeLog(&cl, false)
case raft.LogNoop:
fsm.logger.Debug("Noop Log - current leader is still leader")
// find a way to only reload if latest index to save time
return &cl, fsm.cs.ProcessChangeLog(&cl, replay)
case raft.LogConfiguration:
servers := raft.DecodeConfiguration(log.Data).Servers
for i, server := range servers {
Expand All @@ -70,54 +50,35 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
zap.Int("index", i),
)
}
case raft.LogBarrier:
err := fsm.cs.WaitForChanges()
if err != nil {
fsm.logger.Error("Error waiting for changes", zap.Error(err))
}
default:
fsm.logger.Error("Unknown log type in FSM Apply")
}
return nil, nil
}

func (fsm *dgateAdminFSM) Apply(log *raft.Log) any {
defer fsm.checkLast(log)
_, err := fsm.applyLog(log)
_, err := fsm.applyLog(log, true)
return err
}

func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any {
lastLog := logs[len(logs)-1]
if fsm.isReplay(lastLog) {
rft := fsm.cs.Raft()
fsm.logger.Info("applying log batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
}
cls := make([]*spec.ChangeLog, 0, len(logs))
defer func() {
if !fsm.cs.Ready() {
fsm.checkLast(logs[len(logs)-1])
return
}

if err := fsm.cs.ReloadState(true, cls...); err != nil {
fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err))
}
}()

rft := fsm.cs.Raft()
lastIndex := len(logs) - 1
fsm.logger.Debug("apply log batch",
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
zap.Uint64("fsmLastIndex", fsm.index),
zap.Uint64("log[0]", logs[0].Index),
zap.Uint64("log[-1]", logs[lastIndex].Index),
zap.Int("logs", len(logs)),
)
results := make([]any, len(logs))
for i, log := range logs {
var cl *spec.ChangeLog
cl, results[i] = fsm.applyLog(log)
if cl != nil {
cls = append(cls, cl)
}
// TODO: check to see if this can be optimized channels raft node provides
_, results[i] = fsm.applyLog(
log, lastIndex == i,
)
}
return results
}
Expand Down
Loading

0 comments on commit 9bcbce4

Please sign in to comment.