Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect to the etcd in the startup phase #538

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Release Notes.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
- Fix panic when removing an expired segment.
- Fix panic when reading a disordered block of measure. This block's versions are not sorted in descending order.
- Fix the bug where the etcd client doesn't reconnect after hitting a context timeout in the startup phase.

### Documentation

Expand Down
76 changes: 62 additions & 14 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package metadata

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -68,6 +71,7 @@ type clientService struct {
etcdTLSCertFile string
etcdTLSKeyFile string
endpoints []string
registryTimeout time.Duration
forceRegisterNode bool
}

Expand All @@ -84,6 +88,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted certificate authority")
fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate")
fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.")
fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry")
return fs
}

Expand All @@ -95,17 +100,50 @@ func (s *clientService) Validate() error {
}

func (s *clientService) PreRun(ctx context.Context) error {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
)
if err != nil {
stopCh := make(chan struct{})
sn := make(chan os.Signal, 1)
l := logger.GetLogger(s.Name())
signal.Notify(sn,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
go func() {
select {
case si := <-sn:
logger.GetLogger(s.Name()).Info().Msgf("signal received: %s", si)
close(stopCh)
case <-s.closer.CloseNotify():
close(stopCh)
}
}()

for {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
)
if errors.Is(err, context.DeadlineExceeded) {
select {
case <-stopCh:
return errors.New("pre-run interrupted")
case <-time.After(s.registryTimeout):
return errors.New("pre-run timeout")
case <-s.closer.CloseNotify():
return errors.New("pre-run interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
break
}
return err
}

val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
Expand All @@ -116,7 +154,6 @@ func (s *clientService) PreRun(ctx context.Context) error {
return errors.New("node roles is empty")
}
nodeRoles := val.([]databasev1.Role)
l := logger.GetLogger(s.Name())
nodeInfo := &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
Expand All @@ -126,15 +163,26 @@ func (s *clientService) PreRun(ctx context.Context) error {
Roles: nodeRoles,
CreatedAt: timestamppb.Now(),
}
var cancel context.CancelFunc
for {
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
err := s.schemaRegistry.RegisterNode(ctx, nodeInfo, s.forceRegisterNode)
cancel()
if errors.Is(err, schema.ErrGRPCAlreadyExists) {
return errors.Wrapf(err, "node[%s] already exists in etcd", node.NodeID)
} else if errors.Is(err, context.DeadlineExceeded) {
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
continue
select {
case <-stopCh:
return errors.New("register node interrupted")
case <-time.After(s.registryTimeout):
return errors.New("register node timeout")
case <-s.closer.CloseNotify():
return errors.New("register node interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
l.Info().Stringer("info", nodeInfo).Msg("register node successfully")
Expand Down
1 change: 1 addition & 0 deletions scripts/push-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION"
cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION"

cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking BanyanDB release $VERSION"
cd "$VERSION"

cat << EOF
=========================================================================
Expand Down
Loading