Skip to content

Commit

Permalink
refactor: improve config check
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Aug 4, 2023
1 parent b97d6bd commit 94d1c0a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 101 deletions.
12 changes: 5 additions & 7 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,18 @@ func main() {
return
}

if len(args) < 1 {
fmt.Printf("fatal: no data directory set\n")
os.Exit(1)
return
}

// Ensure no args come after the data directory.
if len(args) > 1 {
fmt.Printf("fatal: arguments after data directory are not accepted\n")
os.Exit(1)
return
}

cfg.DataPath = args[0]
dataPath := ""
if len(args) == 1 {
dataPath = args[0]
}
cfg.DataPath = dataPath

s, err := server.NewServer(&cfg)
if err != nil {
Expand Down
93 changes: 49 additions & 44 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ import (
)

type Server struct {
cfg *CmdConfig
grpcd *grpc.Server
str *store.Store
pprofLn net.Listener
logger *zap.Logger
closed atomic.Bool
cfg *CmdConfig
grpcd *grpc.Server
str *store.Store
strConfig *store.StoreConfig
pprofLn net.Listener
logger *zap.Logger
closed atomic.Bool
}

func NewServer(cfg *CmdConfig) (*Server, error) {
Expand All @@ -64,6 +65,45 @@ func NewServer(cfg *CmdConfig) (*Server, error) {
cfg: cfg,
}

if cfg.DataPath == "" {
return nil, errors.New("no data directory set")
}

cfg.DataPath, err = filepath.Abs(cfg.DataPath)
if err != nil {
return nil, fmt.Errorf("failed to determine absolute data path: %w", err)
}

server.strConfig = &store.StoreConfig{
Dir: cfg.DataPath,
ID: idOrRaftAddr(cfg),
Logger: server.logger,
}
server.strConfig.SnapshotInterval, err = time.ParseDuration(cfg.RaftSnapInterval)
if err != nil {
return nil, fmt.Errorf("failed to parse Raft Snapsnot interval: %w", err)
}
server.strConfig.LeaderLeaseTimeout, err = time.ParseDuration(cfg.RaftLeaderLeaseTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Raft Leader lease timeout: %w", err)
}
server.strConfig.HeartbeatTimeout, err = time.ParseDuration(cfg.RaftHeartbeatTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Raft heartbeat timeout: %w", err)
}
server.strConfig.ElectionTimeout, err = time.ParseDuration(cfg.RaftElectionTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Raft election timeout: %w", err)
}
server.strConfig.ApplyTimeout, err = time.ParseDuration(cfg.RaftApplyTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Raft apply timeout: %w", err)
}

server.strConfig.RaftLogLevel = cfg.RaftLogLevel
server.strConfig.ShutdownOnRemove = cfg.RaftShutdownOnRemove
server.strConfig.SnapshotThreshold = cfg.RaftSnapThreshold

return server, nil
}

Expand Down Expand Up @@ -140,12 +180,6 @@ func (s *Server) Start() error {
}
raftTransport = store.NewTransportFromListener(s.logger, raftLn, raftTlsConfig)

// Create and open the store.
cfg.DataPath, err = filepath.Abs(cfg.DataPath)
if err != nil {
return fmt.Errorf("failed to determine absolute data path: %w", err)
}

authType := auth.Noop
var credentialsStore *auth.CredentialsStore
if cfg.EnableAuth {
Expand All @@ -158,38 +192,9 @@ func (s *Server) Start() error {
}
}

s.str = store.New(raftTransport, &store.StoreConfig{
Dir: cfg.DataPath,
ID: idOrRaftAddr(cfg),
AuthType: authType,
CredentialsStore: credentialsStore,
Logger: s.logger,
})

// Set optional parameters on store.
s.str.RaftLogLevel = cfg.RaftLogLevel
s.str.ShutdownOnRemove = cfg.RaftShutdownOnRemove
s.str.SnapshotThreshold = cfg.RaftSnapThreshold
s.str.SnapshotInterval, err = time.ParseDuration(cfg.RaftSnapInterval)
if err != nil {
return fmt.Errorf("failed to parse Raft Snapsnot interval: %w", err)
}
s.str.LeaderLeaseTimeout, err = time.ParseDuration(cfg.RaftLeaderLeaseTimeout)
if err != nil {
return fmt.Errorf("failed to parse Raft Leader lease timeout: %w", err)
}
s.str.HeartbeatTimeout, err = time.ParseDuration(cfg.RaftHeartbeatTimeout)
if err != nil {
return fmt.Errorf("failed to parse Raft heartbeat timeout: %w", err)
}
s.str.ElectionTimeout, err = time.ParseDuration(cfg.RaftElectionTimeout)
if err != nil {
return fmt.Errorf("failed to parse Raft election timeout: %w", err)
}
s.str.ApplyTimeout, err = time.ParseDuration(cfg.RaftApplyTimeout)
if err != nil {
return fmt.Errorf("failed to parse Raft apply timeout: %w", err)
}
s.strConfig.AuthType = authType
s.strConfig.CredentialsStore = credentialsStore
s.str = store.New(raftTransport, s.strConfig)

// Any prexisting node state?
var enableBootstrap bool
Expand Down
11 changes: 11 additions & 0 deletions server/store/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package store

import (
"time"

"github.com/casbin/casbin-mesh/server/auth"
"go.uber.org/zap"
)
Expand All @@ -28,4 +30,13 @@ type StoreConfig struct {
AuthType auth.AuthType
*auth.CredentialsStore
AdvAddr string

RaftLogLevel string
ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotInterval time.Duration
LeaderLeaseTimeout time.Duration
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
ApplyTimeout time.Duration
}
86 changes: 36 additions & 50 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,12 @@ const (

// Store is casbin memory data, where all changes are made via Raft consensus.
type Store struct {
authCredStore *auth.CredentialsStore
authType auth.AuthType
raftDir string
rootUsername string
raft *raft.Raft // The consensus mechanism.
ln Listener
raftTn *raft.NetworkTransport
raftID string // Node ID.
raftDir string
rootUsername string
raft *raft.Raft // The consensus mechanism.
ln Listener
raftTn *raft.NetworkTransport
raftID string // Node ID.

raftLog raft.LogStore // Persistent log store.
raftStable raft.StableStore // Persistent k-v store.
Expand All @@ -147,28 +145,20 @@ type Store struct {
enforcersState *adapter.BadgerStore
logger *zap.Logger

ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotInterval time.Duration
LeaderLeaseTimeout time.Duration
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
ApplyTimeout time.Duration
RaftLogLevel string

cfg *StoreConfig
numTrailingLogs uint64
}

func (s *Store) AuthType() auth.AuthType {
return s.authType
return s.cfg.AuthType
}

// Check validates username and password
func (s *Store) Check(username, password string) bool {
if s.authCredStore == nil {
if s.cfg.CredentialsStore == nil {
return false
}
return s.authCredStore.Check(username, password)
return s.cfg.CredentialsStore.Check(username, password)
}

// IsNewNode returns whether a node using raftDir would be a brand new node.
Expand All @@ -187,24 +177,20 @@ func New(ln Listener, c *StoreConfig) *Store {
}

store := &Store{
ln: ln,
raftDir: c.Dir,
raftID: c.ID,
meta: make(map[string]map[string]string),
logger: logger,
ApplyTimeout: applyTimeout,
authType: c.AuthType,
authCredStore: c.CredentialsStore,
ln: ln,
raftDir: c.Dir,
raftID: c.ID,
meta: make(map[string]map[string]string),
logger: logger,
cfg: c,
}
logger.Info("store is ready")

return store

}

// InitRoot init a root account
func (s *Store) InitRoot(username, password string) error {
err := s.authCredStore.Add(username, password)
err := s.cfg.CredentialsStore.Add(username, password)
if err != nil {
return err
}
Expand All @@ -231,7 +217,7 @@ func (s *Store) Open(enableBootstrap bool) error {
s.raftTn = raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil)

// Don't allow control over trailing logs directly, just implement a policy.
s.numTrailingLogs = uint64(float64(s.SnapshotThreshold) * trailingScale)
s.numTrailingLogs = uint64(float64(s.cfg.SnapshotThreshold) * trailingScale)

config := s.raftConfig()
config.LocalID = raft.ServerID(s.raftID)
Expand Down Expand Up @@ -300,23 +286,23 @@ func (s *Store) Open(enableBootstrap bool) error {
// raftConfig returns a new Raft config for the store.
func (s *Store) raftConfig() *raft.Config {
config := raft.DefaultConfig()
config.ShutdownOnRemove = s.ShutdownOnRemove
config.LogLevel = s.RaftLogLevel
if s.SnapshotThreshold != 0 {
config.SnapshotThreshold = s.SnapshotThreshold
config.ShutdownOnRemove = s.cfg.ShutdownOnRemove
config.LogLevel = s.cfg.RaftLogLevel
if s.cfg.SnapshotThreshold != 0 {
config.SnapshotThreshold = s.cfg.SnapshotThreshold
config.TrailingLogs = s.numTrailingLogs
}
if s.SnapshotInterval != 0 {
config.SnapshotInterval = s.SnapshotInterval
if s.cfg.SnapshotInterval != 0 {
config.SnapshotInterval = s.cfg.SnapshotInterval
}
if s.LeaderLeaseTimeout != 0 {
config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
if s.cfg.LeaderLeaseTimeout != 0 {
config.LeaderLeaseTimeout = s.cfg.LeaderLeaseTimeout
}
if s.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = s.HeartbeatTimeout
if s.cfg.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = s.cfg.HeartbeatTimeout
}
if s.ElectionTimeout != 0 {
config.ElectionTimeout = s.ElectionTimeout
if s.cfg.ElectionTimeout != 0 {
config.ElectionTimeout = s.cfg.ElectionTimeout
}
return config
}
Expand Down Expand Up @@ -526,11 +512,11 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"node_id": leaderID,
"addr": s.LeaderAddr(),
},
"apply_timeout": s.ApplyTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),
"election_timeout": s.ElectionTimeout.String(),
"snapshot_threshold": s.SnapshotThreshold,
"snapshot_interval": s.SnapshotInterval,
"apply_timeout": s.cfg.ApplyTimeout.String(),
"heartbeat_timeout": s.cfg.HeartbeatTimeout.String(),
"election_timeout": s.cfg.ElectionTimeout.String(),
"snapshot_threshold": s.cfg.SnapshotThreshold,
"snapshot_interval": s.cfg.SnapshotInterval,
"trailing_logs": s.numTrailingLogs,
"metadata": s.meta,
"nodes": nodes,
Expand Down Expand Up @@ -636,7 +622,7 @@ func (s *Store) remove(id string) error {
return err
}

f = s.raft.Apply(bc, s.ApplyTimeout)
f = s.raft.Apply(bc, s.cfg.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return ErrNotLeader
Expand Down

0 comments on commit 94d1c0a

Please sign in to comment.