Skip to content

Commit

Permalink
fix snapshot issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaji-kharse committed Oct 19, 2023
1 parent 283fe63 commit 3245c3d
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 36 deletions.
42 changes: 25 additions & 17 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,25 @@ func AllUpgradeCombos(v20 bool) []UpgradeCombo {

// ClusterConfig stores all config for setting up a dgraph cluster
type ClusterConfig struct {
prefix string
numAlphas int
numZeros int
replicas int
verbosity int
acl bool
aclTTL time.Duration
aclAlg jwt.SigningMethod
encryption bool
version string
volumes map[string]string
refillInterval time.Duration
uidLease int
portOffset int // exposed port offset for grpc/http port for both alpha/zero
bulkOutDir string
featureFlags []string
customPlugins bool
prefix string
numAlphas int
numZeros int
replicas int
verbosity int
acl bool
aclTTL time.Duration
aclAlg jwt.SigningMethod
encryption bool
version string
volumes map[string]string
refillInterval time.Duration
uidLease int
portOffset int // exposed port offset for grpc/http port for both alpha/zero
bulkOutDir string
featureFlags []string
customPlugins bool
snapShotAfterEntries uint64
snapshotAfterDuration time.Duration
}

// NewClusterConfig generates a default ClusterConfig
Expand Down Expand Up @@ -237,3 +239,9 @@ func (cc ClusterConfig) WithCustomPlugins() ClusterConfig {
func (cc ClusterConfig) GetClusterVolume(volume string) string {
return cc.volumes[volume]
}

func (cc ClusterConfig) WithSnapshotConfig(snapShotAfterEntries uint64, snapshotAfterDuration time.Duration) ClusterConfig {

Check failure on line 243 in dgraphtest/config.go

View workflow job for this annotation

GitHub Actions / lint

line is 124 characters (lll)
cc.snapShotAfterEntries = snapShotAfterEntries
cc.snapshotAfterDuration = snapshotAfterDuration
return cc
}
17 changes: 17 additions & 0 deletions dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,15 @@ type dnode interface {
assignURL(*LocalCluster) (string, error)
alphaURL(*LocalCluster) (string, error)
zeroURL(*LocalCluster) (string, error)
changeStatus(bool)
}

type zero struct {
id int // 0, 1, 2
containerID string // container ID in docker world
containerName string // something like test-1234_zero2
aliasName string // something like alpha0, zero1
isRunning bool
}

func (z *zero) cname() string {
Expand Down Expand Up @@ -175,6 +177,10 @@ func (z *zero) healthURL(c *LocalCluster) (string, error) {
return "http://localhost:" + publicPort + "/health", nil
}

func (z *zero) changeStatus(isRunning bool) {
z.isRunning = isRunning
}

func (z *zero) assignURL(c *LocalCluster) (string, error) {
publicPort, err := publicPort(c.dcli, z, zeroHttpPort)
if err != nil {
Expand All @@ -200,6 +206,7 @@ type alpha struct {
containerID string
containerName string
aliasName string
isRunning bool
}

func (a *alpha) cname() string {
Expand Down Expand Up @@ -283,6 +290,12 @@ func (a *alpha) cmd(c *LocalCluster) []string {
acmd = append(acmd, fmt.Sprintf("--custom_tokenizers=%s", c.customTokenizers))
}

if c.conf.snapShotAfterEntries != 0 {
acmd = append(acmd, fmt.Sprintf("--raft=%s",
fmt.Sprintf(`snapshot-after-entries=%v;snapshot-after-duration=%v;`,
c.conf.snapShotAfterEntries, c.conf.snapshotAfterDuration)))
}

return acmd
}

Expand Down Expand Up @@ -351,6 +364,10 @@ func (a *alpha) alphaURL(c *LocalCluster) (string, error) {
return "localhost:" + publicPort + "", nil
}

func (a *alpha) changeStatus(isRunning bool) {
a.isRunning = isRunning
}

func (a *alpha) zeroURL(c *LocalCluster) (string, error) {
return "", errNotImplemented
}
Expand Down
15 changes: 15 additions & 0 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/pkg/errors"

"github.com/dgraph-io/dgo/v230/protos/api"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/x"
)
Expand Down Expand Up @@ -493,3 +494,17 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
return nil
}
}

// AddData will insert a total of end-start triples into the database.
func AddData(gc *GrpcClient, pred string, start, end int) error {
if err := gc.SetupSchema(fmt.Sprintf(`%v: string @index(exact) .`, pred)); err != nil {
return err
}

rdf := ""
for i := start; i <= end; i++ {
rdf = rdf + fmt.Sprintf("_:a%v <%v> \"%v%v\" . \n", i, pred, pred, i)
}
_, err := gc.Mutate(&api.Mutation{SetNquads: []byte(rdf), CommitNow: true})
return err
}
15 changes: 13 additions & 2 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func (c *LocalCluster) startContainer(dc dnode) error {
if err := c.dcli.ContainerStart(ctx, dc.cid(), types.ContainerStartOptions{}); err != nil {
return errors.Wrapf(err, "error starting container [%v]", dc.cname())
}
dc.changeStatus(true)
return nil
}

Expand Down Expand Up @@ -398,6 +399,7 @@ func (c *LocalCluster) stopContainer(dc dnode) error {
}
return errors.Wrapf(err, "error stopping container [%v]", dc.cname())
}
dc.changeStatus(false)
return nil
}

Expand All @@ -420,6 +422,9 @@ func (c *LocalCluster) killContainer(dc dnode) error {
func (c *LocalCluster) HealthCheck(zeroOnly bool) error {
log.Printf("[INFO] checking health of containers")
for _, zo := range c.zeros {
if !zo.isRunning {
break
}
url, err := zo.healthURL(c)
if err != nil {
return errors.Wrap(err, "error getting health URL")
Expand All @@ -438,6 +443,9 @@ func (c *LocalCluster) HealthCheck(zeroOnly bool) error {
}

for _, aa := range c.alphas {
if !aa.isRunning {
break
}
url, err := aa.healthURL(c)
if err != nil {
return errors.Wrap(err, "error getting health URL")
Expand Down Expand Up @@ -688,8 +696,11 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
// TODO(aman): can we cache the connections?
var apiClients []api.DgraphClient
var conns []*grpc.ClientConn
for i := 0; i < c.conf.numAlphas; i++ {
url, err := c.alphas[i].alphaURL(c)
for _, aa := range c.alphas {
if !aa.isRunning {
break
}
url, err := aa.alphaURL(c)
if err != nil {
return nil, nil, errors.Wrap(err, "error getting health URL")
}
Expand Down
73 changes: 73 additions & 0 deletions dgraphtest/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dgraphtest

import (
"encoding/json"
"time"

"github.com/pkg/errors"
)

func (hc *HTTPClient) GetCurrentSnapshotTs(group uint64) (uint64, error) {
snapTsRequest := `query {
state {
groups {
id
snapshotTs
}
}
}`
params := GraphQLParams{
Query: snapTsRequest,
}
resp, err := hc.RunGraphqlQuery(params, true)
if err != nil {
return 0, err
}

var stateResp struct {
State struct {
Groups []struct {
SnapshotTs uint64
}
}
}

err = json.Unmarshal(resp, &stateResp)
if err != nil {
return 0, err
}

return stateResp.State.Groups[group-1].SnapshotTs, nil
}

func (hc *HTTPClient) WaitForSnapshot(group, prevSnapshotTs uint64) (uint64, error) {

for i := 1; i <= 100; i++ {
currentSnapshotTs, err := hc.GetCurrentSnapshotTs(group)
if err != nil {
errors.Wrapf(err, "error while getting current snapshot timestamp: %v", err)

Check failure on line 64 in dgraphtest/snapshot.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `errors.Wrapf` is not checked (errcheck)
}
if currentSnapshotTs > prevSnapshotTs {
return currentSnapshotTs, nil
}

time.Sleep(time.Second)
}
return 0, errors.New("timeout excedded")
}
3 changes: 3 additions & 0 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte
return nil
}

// TODO(Aman): In the raft example here, we store the snapshot first, followed by entries
// and then, hard state. https://github.com/etcd-io/etcd/blob/main/contrib/raftexample/raft.go
// We should do the same here.
// Save would write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries
// first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
// writes then all of them can be written together. Note that when writing an Entry with Index i,
Expand Down
68 changes: 68 additions & 0 deletions systest/integration2/snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//go:build integration2

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"testing"
"time"

"github.com/dgraph-io/dgraph/dgraphtest"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

func TestSnapshotTranferAfterNewNodeJoins(t *testing.T) {
conf := dgraphtest.NewClusterConfig().WithNumAlphas(3).WithNumZeros(1).
WithACL(time.Hour).WithReplicas(3).WithSnapshotConfig(11, time.Second)
c, err := dgraphtest.NewLocalCluster(conf)
require.NoError(t, err)
defer func() { c.Cleanup(t.Failed()) }()

// start zero
require.NoError(t, c.StartZero(0))
require.NoError(t, c.HealthCheck(true))
require.NoError(t, c.StartAlpha(0))
require.NoError(t, c.HealthCheck(false))

hc, err := c.HTTPClient()
require.NoError(t, err)
hc.LoginIntoNamespace(dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)

gc, cleanup, err := c.Client()
require.NoError(t, err)
defer cleanup()
require.NoError(t, gc.LoginIntoNamespace(context.Background(),
dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))

prevSnapshotTs, err := hc.GetCurrentSnapshotTs(1)
require.NoError(t, err)

dgraphtest.AddData(gc, "name", 1, 20)

_, err = hc.WaitForSnapshot(1, prevSnapshotTs)
require.NoError(t, err)

require.NoError(t, c.StartAlpha(1))
require.NoError(t, c.StartAlpha(2))

// Wait for the other alpha nodes to receive the snapshot from the leader alpha.
// If they are healthy, there should be no issues with the snapshot streaming
time.Sleep(time.Second)
require.NoError(t, c.HealthCheck(false))
}
Loading

0 comments on commit 3245c3d

Please sign in to comment.