Skip to content

Commit

Permalink
Merge pull request #109 from klihub/devel/pass-timeouts-to-plugins
Browse files Browse the repository at this point in the history
api: pass configured timeouts to plugins.
  • Loading branch information
dmcgowan authored Oct 10, 2024
2 parents 7b3bcee + 320e4e7 commit 43b5c7f
Show file tree
Hide file tree
Showing 9 changed files with 806 additions and 578 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ ginkgo-tests:
--coverprofile coverprofile \
--succinct \
--skip-package $(SKIPPED_PKGS) \
-r .; \
-r && \
$(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html

test-ulimits:
Expand Down
34 changes: 33 additions & 1 deletion pkg/adaptation/adaptation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Adaptation struct {
serverOpts []ttrpc.ServerOpt
listener net.Listener
plugins []*plugin
syncLock sync.RWMutex
}

var (
Expand Down Expand Up @@ -135,6 +136,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
pluginPath: DefaultPluginPath,
dropinPath: DefaultPluginConfigPath,
socketPath: DefaultSocketPath,
syncLock: sync.RWMutex{},
}

for _, o := range opts {
Expand Down Expand Up @@ -464,6 +466,8 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
continue
}

r.requestPluginSync()

err = r.syncFn(ctx, p.synchronize)
if err != nil {
log.Infof(ctx, "failed to synchronize plugin: %v", err)
Expand All @@ -472,9 +476,10 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
r.plugins = append(r.plugins, p)
r.sortPlugins()
r.Unlock()

log.Infof(ctx, "plugin %q connected and synchronized", p.name())
}

r.finishedPluginSync()
}
}()

Expand Down Expand Up @@ -545,3 +550,30 @@ func (r *Adaptation) sortPlugins() {
}
}
}

func (r *Adaptation) requestPluginSync() {
r.syncLock.Lock()
}

func (r *Adaptation) finishedPluginSync() {
r.syncLock.Unlock()
}

type PluginSyncBlock struct {
r *Adaptation
}

// BlockPluginSync blocks plugins from being synchronized/fully registered.
func (r *Adaptation) BlockPluginSync() *PluginSyncBlock {
r.syncLock.RLock()
return &PluginSyncBlock{r: r}
}

// Unblock a plugin sync. block put in place by BlockPluginSync. Safe to call
// multiple times but only from a single goroutine.
func (b *PluginSyncBlock) Unblock() {
if b != nil && b.r != nil {
b.r.syncLock.RUnlock()
b.r = nil
}
}
102 changes: 84 additions & 18 deletions pkg/adaptation/adaptation_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,12 @@ var _ = Describe("Plugin container creation adjustments", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod,
Container: ctr,
}
reply, err := runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())
Expect(stripAdjustment(reply.Adjust)).Should(Equal(stripAdjustment(expected)))
},
Expand Down Expand Up @@ -792,12 +792,12 @@ var _ = Describe("Plugin container creation adjustments", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod,
Container: ctr,
}
reply, err := runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err := runtime.CreateContainer(ctx, ctrReq)
if shouldFail {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -983,21 +983,21 @@ var _ = Describe("Plugin container updates during creation", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

podReq = &api.RunPodSandboxRequest{Pod: pod1}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq = &api.CreateContainerRequest{
Pod: pod1,
Container: ctr1,
}
reply, err = runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err = runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

Expect(len(reply.Update)).To(Equal(1))
Expand Down Expand Up @@ -1137,21 +1137,21 @@ var _ = Describe("Plugin container updates during creation", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

podReq = &api.RunPodSandboxRequest{Pod: pod1}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq = &api.CreateContainerRequest{
Pod: pod1,
Container: ctr1,
}
reply, err = runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err = runtime.CreateContainer(ctx, ctrReq)
if which == "both" {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -1341,12 +1341,12 @@ var _ = Describe("Solicited container updates by plugins", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

updReq := &api.UpdateContainerRequest{
Expand Down Expand Up @@ -1374,7 +1374,7 @@ var _ = Describe("Solicited container updates by plugins", func() {
},
},
}
reply, err = runtime.runtime.UpdateContainer(ctx, updReq)
reply, err = runtime.UpdateContainer(ctx, updReq)

Expect(len(reply.Update)).To(Equal(1))
Expect(err).To(BeNil())
Expand Down Expand Up @@ -1580,12 +1580,12 @@ var _ = Describe("Solicited container updates by plugins", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

updReq := &api.UpdateContainerRequest{
Expand Down Expand Up @@ -1613,7 +1613,7 @@ var _ = Describe("Solicited container updates by plugins", func() {
},
},
}
reply, err = runtime.runtime.UpdateContainer(ctx, updReq)
reply, err = runtime.UpdateContainer(ctx, updReq)
if which == "both" {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -1869,6 +1869,72 @@ var _ = Describe("Unsolicited container update requests", func() {
})
})

var _ = Describe("Plugin configuration request", func() {
var (
s = &Suite{}
)

AfterEach(func() {
s.Cleanup()
})

BeforeEach(func() {
s.Prepare(&mockRuntime{}, &mockPlugin{idx: "00", name: "test"})
})

It("should pass runtime version information to plugins", func() {
var (
runtimeName = "test-runtime"
runtimeVersion = "1.2.3"
)

s.runtime.name = runtimeName
s.runtime.version = runtimeVersion

s.Startup()

Expect(s.plugins[0].RuntimeName()).To(Equal(runtimeName))
Expect(s.plugins[0].RuntimeVersion()).To(Equal(runtimeVersion))
})

When("unchanged", func() {
It("should pass default timeout information to plugins", func() {
var (
registerTimeout = nri.DefaultPluginRegistrationTimeout
requestTimeout = nri.DefaultPluginRequestTimeout
)

s.Startup()
Expect(s.plugins[0].stub.RegistrationTimeout()).To(Equal(registerTimeout))
Expect(s.plugins[0].stub.RequestTimeout()).To(Equal(requestTimeout))
})
})

When("reconfigured", func() {
var (
registerTimeout = nri.DefaultPluginRegistrationTimeout + 5*time.Millisecond
requestTimeout = nri.DefaultPluginRequestTimeout + 7*time.Millisecond
)

BeforeEach(func() {
nri.SetPluginRegistrationTimeout(registerTimeout)
nri.SetPluginRequestTimeout(requestTimeout)
s.Prepare(&mockRuntime{}, &mockPlugin{idx: "00", name: "test"})
})

AfterEach(func() {
nri.SetPluginRegistrationTimeout(nri.DefaultPluginRegistrationTimeout)
nri.SetPluginRequestTimeout(nri.DefaultPluginRequestTimeout)
})

It("should pass configured timeout information to plugins", func() {
s.Startup()
Expect(s.plugins[0].stub.RegistrationTimeout()).To(Equal(registerTimeout))
Expect(s.plugins[0].stub.RequestTimeout()).To(Equal(requestTimeout))
})
})
})

// Notes:
//
// XXX FIXME KLUDGE
Expand Down
12 changes: 7 additions & 5 deletions pkg/adaptation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (

const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second
DefaultPluginRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second
DefaultPluginRequestTimeout = api.DefaultPluginRequestTimeout
)

var (
Expand Down Expand Up @@ -384,9 +384,11 @@ func (p *plugin) configure(ctx context.Context, name, version, config string) er
defer cancel()

rpl, err := p.stub.Configure(ctx, &ConfigureRequest{
Config: config,
RuntimeName: name,
RuntimeVersion: version,
Config: config,
RuntimeName: name,
RuntimeVersion: version,
RegistrationTimeout: getPluginRegistrationTimeout().Milliseconds(),
RequestTimeout: getPluginRequestTimeout().Milliseconds(),
})
if err != nil {
return fmt.Errorf("failed to configure plugin: %w", err)
Expand Down
Loading

0 comments on commit 43b5c7f

Please sign in to comment.