Skip to content

Commit

Permalink
koordlet: fix nri reconnect params (#2067)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <[email protected]>
  • Loading branch information
saintube authored May 27, 2024
1 parent 1a66b95 commit f436876
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 25 deletions.
7 changes: 7 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type Config struct {
RuntimeHooksNRIBackOffFactor float64
RuntimeHooksNRIBackOffSteps int
RuntimeHooksNRISocketPath string
RuntimeHooksNRIPluginName string
RuntimeHooksNRIPluginIndex string
RuntimeHookReconcileInterval time.Duration
}

Expand All @@ -139,6 +141,8 @@ func NewDefaultConfig() *Config {
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHooksNRIPluginName: "koordlet_nri",
RuntimeHooksNRIPluginIndex: "00",
RuntimeHookReconcileInterval: 10 * time.Second,
}
}
Expand All @@ -155,6 +159,9 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.RuntimeHooksNRIBackOffCap, "runtime-hooks-nri-backoff-cap", c.RuntimeHooksNRIBackOffCap, "nri server backoff cap")
fs.IntVar(&c.RuntimeHooksNRIBackOffSteps, "runtime-hooks-nri-backoff-steps", c.RuntimeHooksNRIBackOffSteps, "nri server backoff steps")
fs.Float64Var(&c.RuntimeHooksNRIBackOffFactor, "runtime-hooks-nri-backoff-factor", c.RuntimeHooksNRIBackOffFactor, "nri server reconnect backoff factor")
fs.StringVar(&c.RuntimeHooksNRISocketPath, "runtime-hooks-nri-socket-path", c.RuntimeHooksNRISocketPath, "nri server socket path")
fs.StringVar(&c.RuntimeHooksNRIPluginName, "runtime-hooks-nri-plugin-name", c.RuntimeHooksNRISocketPath, "nri plugin name of the koordlet runtime hooks")
fs.StringVar(&c.RuntimeHooksNRIPluginIndex, "runtime-hooks-nri-plugin-index", c.RuntimeHooksNRIPluginIndex, "nri plugin index of the koordlet runtime hooks")
fs.Var(cliflag.NewStringSlice(&c.RuntimeHookDisableStages), "runtime-hooks-disable-stages", "disable stages for runtime hooks")
fs.BoolVar(&c.RuntimeHooksNRI, "enable-nri-runtime-hook", c.RuntimeHooksNRI, "enable/disable runtime hooks nri mode")
fs.DurationVar(&c.RuntimeHookReconcileInterval, "runtime-hooks-reconcile-interval", c.RuntimeHookReconcileInterval, "reconcile interval for each plugins")
Expand Down
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func Test_NewDefaultConfig(t *testing.T) {
RuntimeHooksNRIBackOffSteps: math.MaxInt32,
RuntimeHooksNRIBackOffFactor: 2,
RuntimeHooksNRISocketPath: "nri/nri.sock",
RuntimeHooksNRIPluginName: "koordlet_nri",
RuntimeHooksNRIPluginIndex: "00",
RuntimeHookReconcileInterval: 10 * time.Second,
}
defaultConfig := NewDefaultConfig()
Expand Down
63 changes: 40 additions & 23 deletions pkg/koordlet/runtimehooks/nri/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
Expand All @@ -41,6 +42,8 @@ type nriConfig struct {
}

type Options struct {
NriPluginName string
NriPluginIdx string
NriSocketPath string
NriConnectTimeout time.Duration
// support stop running other hooks once someone failed
Expand All @@ -62,26 +65,25 @@ func (o Options) Validate() error {
}

type NriServer struct {
cfg nriConfig
stub stub.Stub
mask stub.EventMask
options Options // server options
cfg nriConfig
stub stub.Stub
mask stub.EventMask
options Options // server options
stubOpts []stub.Option // nri stub options
stopped *atomic.Bool // if false, the stub will try to reconnect when stub.OnClose is invoked
}

const (
events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer"
pluginName = "koordlet_nri"
pluginIdx = "00"
events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer"
)

var (
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.RemovePodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
opts []stub.Option
_ = stub.ConfigureInterface(&NriServer{})
_ = stub.SynchronizeInterface(&NriServer{})
_ = stub.RunPodInterface(&NriServer{})
_ = stub.RemovePodInterface(&NriServer{})
_ = stub.CreateContainerInterface(&NriServer{})
_ = stub.UpdateContainerInterface(&NriServer{})
)

func NewNriServer(opt Options) (*NriServer, error) {
Expand All @@ -90,17 +92,23 @@ func NewNriServer(opt Options) (*NriServer, error) {
return nil, fmt.Errorf("failed to validate nri server, err: %w", err)
}

opts = append(opts, stub.WithPluginName(pluginName))
opts = append(opts, stub.WithPluginIdx(pluginIdx))
opts = append(opts, stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)))
p := &NriServer{options: opt}
stubOpts := []stub.Option{
stub.WithPluginName(opt.NriPluginName),
stub.WithPluginIdx(opt.NriPluginIdx),
stub.WithSocketPath(filepath.Join(system.Conf.VarRunRootDir, opt.NriSocketPath)),
}
p := &NriServer{
options: opt,
stubOpts: stubOpts,
stopped: atomic.NewBool(false),
}
if p.mask, err = api.ParseEventMask(events); err != nil {
klog.Errorf("failed to parse events %v", err)
return p, err
}
p.cfg.Events = strings.Split(events, ",")

if p.stub, err = stub.New(p, append(opts, stub.WithOnClose(p.onClose))...); err != nil {
if p.stub, err = stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...); err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return nil, err
}
Expand Down Expand Up @@ -135,10 +143,15 @@ func (p *NriServer) Start() error {
case <-success:
return nil
case <-errorChan:
return fmt.Errorf("nri start fail")
return fmt.Errorf("nri start fail, err: %w", err)
}
}

func (p *NriServer) Stop() {
p.stopped.Store(true)
p.stub.Stop()
}

func (p *NriServer) Configure(config, runtime, version string) (stub.EventMask, error) {
klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version)
if config == "" {
Expand Down Expand Up @@ -248,13 +261,17 @@ func (p *NriServer) RemovePodSandbox(pod *api.PodSandbox) error {
func (p *NriServer) onClose() {
//TODO: consider the pod status during restart
retryFunc := func() (bool, error) {
stub, err := stub.New(p, append(opts, stub.WithOnClose(p.onClose))...)
if p.stopped.Load() { // if set to stopped, no longer reconnect
return true, nil
}

newStub, err := stub.New(p, append(p.stubOpts, stub.WithOnClose(p.onClose))...)
if err != nil {
klog.Errorf("failed to create plugin stub: %v", err)
return false, nil
}

p.stub = stub
p.stub = newStub
err = p.Start()
if err != nil {
completeNriSocketPath := filepath.Join(system.Conf.VarRunRootDir, p.options.NriSocketPath)
Expand All @@ -263,7 +280,7 @@ func (p *NriServer) onClose() {
return false, err
}
//TODO: check the error type, if nri server disable nri, we should also break backoff
klog.Warningf("nri reconnect failed")
klog.Warningf("nri reconnect failed, err: %s", err)
return false, nil
} else {
klog.V(4).Info("nri server restart success")
Expand Down
30 changes: 28 additions & 2 deletions pkg/koordlet/runtimehooks/nri/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func TestNriServer_Start(t *testing.T) {
wantErr bool
}{
{
name: "stub is nil",
name: "nri socket not found",
fields: fields{
stub: nil,
mask: api.EventMask(1),
options: Options{
NriSocketPath: "nri/nri.sock",
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -81,19 +82,34 @@ func TestNriServer_Start(t *testing.T) {
wantErr: true,
},
{
name: "stub is nil",
fields: fields{
stub: nil,
mask: api.EventMask(1),
options: Options{
NriSocketPath: "",
NriConnectTimeout: time.Second,
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Executor: nil,
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := system.NewFileTestUtil(t)
defer helper.Cleanup()

s := &NriServer{
stub: tt.fields.stub,
mask: tt.fields.mask,
options: tt.fields.options,
}
if s.stub != nil {
defer s.Stop()
}

if err := s.Start(); (err != nil) != tt.wantErr {
t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -122,6 +138,8 @@ func TestNewNriServer(t *testing.T) {
isNriSocketExist: false,
},
args: args{opt: Options{
NriPluginName: "test_newNriServer_0",
NriPluginIdx: "00",
NriSocketPath: "nri/nri.sock",
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -135,6 +153,8 @@ func TestNewNriServer(t *testing.T) {
isNriSocketExist: true,
},
args: args{opt: Options{
NriPluginName: "test_newNriServer_1",
NriPluginIdx: "01",
NriSocketPath: "nri/nri.sock",
PluginFailurePolicy: "Ignore",
DisableStages: getDisableStagesMap([]string{"PreRunPodSandbox"}),
Expand All @@ -150,7 +170,10 @@ func TestNewNriServer(t *testing.T) {
helper.WriteFileContents("nri/nri.sock", "")
}

_, err := NewNriServer(tt.args.opt)
s, err := NewNriServer(tt.args.opt)
if s != nil && s.stub != nil {
defer s.Stop()
}
if (err != nil) != tt.wantErr {
t.Errorf("NewNriServer() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -597,6 +620,9 @@ func TestNriServer_RemovePodSandbox(t *testing.T) {
mask: tt.fields.mask,
options: tt.fields.options,
}
if p.stub != nil {
defer p.Stop()
}
if tt.fields.plugin != nil {
tt.fields.plugin.Register(hooks.Options{})
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
var nriServer *nri.NriServer
if cfg.RuntimeHooksNRI {
nriServerOptions := nri.Options{
NriPluginName: cfg.RuntimeHooksNRIPluginName,
NriPluginIdx: cfg.RuntimeHooksNRIPluginIndex,
NriSocketPath: cfg.RuntimeHooksNRISocketPath,
NriConnectTimeout: cfg.RuntimeHooksNRIConnectTimeout,
PluginFailurePolicy: pluginFailurePolicy,
Expand Down

0 comments on commit f436876

Please sign in to comment.