Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: support cri-o container runtime #1983

Merged
merged 42 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
13f7501
Add files via upload
ocichina001 Mar 27, 2024
9fa6b84
Add files via upload
ocichina001 Mar 27, 2024
f20ed46
Update config.go
ocichina001 Mar 27, 2024
9d2c8b3
Add files via upload
ocichina001 Mar 27, 2024
02e430b
Add files via upload
ocichina001 Mar 27, 2024
fd3c34a
Update Makefile
ocichina001 Mar 27, 2024
0574594
Update pod.go
ocichina001 Mar 27, 2024
4f18730
Update pod.go
ocichina001 Mar 27, 2024
c99a6a6
Update pod.go
ocichina001 Mar 27, 2024
6cb5f9b
Update pod.go
ocichina001 Mar 27, 2024
86a6303
Update pod.go
ocichina001 Mar 27, 2024
2855768
Update pod.go
ocichina001 Mar 27, 2024
4ee8a95
Update pod.go
ocichina001 Mar 27, 2024
6b28f83
Update pod.go
ocichina001 Mar 27, 2024
67eacc3
Update pod.go
ocichina001 Mar 27, 2024
3993903
Update pod.go
ocichina001 Mar 27, 2024
e8f7e7c
Update pod.go
ocichina001 Mar 27, 2024
9923a3d
Update pod.go
ocichina001 Mar 27, 2024
c8a3002
update pod.go
ocichina001 Mar 27, 2024
3ed8b5e
koordlet: record invoke metrics for runtime hook (#1961)
saintube Mar 29, 2024
b8626d8
scheduler: optimize QueueSort func when Gang and Barepod Coexists (#1…
ZiMengSheng Mar 29, 2024
1ba2f1a
Modify the name of the REG_NS variable to koordinator-sh
georgexiang Apr 3, 2024
dd886e4
scheduler: fix reservation nominator residual bug (#1985)
xulinfei1996 Apr 7, 2024
a2157ee
Add codecov token (#1987)
FillZpp Apr 7, 2024
c415bfb
slo-controller: refactor codes for reading (#1973)
hormes Apr 7, 2024
c875583
CI workflow with codecov token (#1989)
FillZpp Apr 8, 2024
e60793f
chores: fix some fmt err (#1970)
ls-2018 Apr 8, 2024
fbdc1e0
Revert "scheduler: pick vf by random (#1953)" (#1986)
ZiMengSheng Apr 9, 2024
e4c83e6
proposal: tc plugin for netqos (#1976)
lucming Apr 9, 2024
b175fe8
scheduler: assure Fairness and DeadlockFree (#1996)
ZiMengSheng Apr 11, 2024
4c9822c
scheduler: sort gang of same gangGroup by gangId (#1997)
ZiMengSheng Apr 11, 2024
756ca75
apis & slo-controller: allow specifying node-wise total bandwidth via…
sjtufl Apr 12, 2024
cec2fe0
scheduler: no rejecting sbiling when invalid scheduling cycle (#1999)
ZiMengSheng Apr 12, 2024
7d0d17a
descheduler: allow annotated pod pass non-retrievable filter (#1994)
ZiMengSheng Apr 12, 2024
f3b35a8
gofmt for "cri-o-runtime" version updated go files
georgexiang Apr 15, 2024
7dd79f0
Merge branch 'main' into cri-o-runtime
georgexiang Apr 15, 2024
59da7aa
Merged back to original k8s version =1.22
georgexiang Apr 15, 2024
bf39f32
Merge branch 'main' into cri-o-runtime
georgexiang Apr 15, 2024
6a4bbe7
Merge branch 'cri-o-runtime' of https://github.com/ocichina/koordinat…
georgexiang Apr 15, 2024
575a6e3
Merge branch 'main' into cri-o-runtime
ocichina001 Apr 16, 2024
c3ed869
Merge branch 'main' into cri-o-runtime
georgexiang Apr 17, 2024
688249e
Merge branch 'main' into cri-o-runtime
ocichina001 Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/koordlet/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -79,7 +80,13 @@ func GetPodSandboxContainerID(pod *corev1.Pod) (string, error) {
continue
}
if _, exist := containerSubDirNames[containerDir.Name()]; !exist {
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
if strings.HasPrefix(containerDir.Name(), "crio-") {
if !strings.HasSuffix(containerDir.Name(), ".scope") {
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
}
} else {
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
}
}
}

Expand Down
100 changes: 100 additions & 0 deletions pkg/koordlet/util/runtime/handler/crio_runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"

runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

func GetCrioEndpoint() string {
return filepath.Join(system.Conf.VarRunRootDir, "crio/crio.sock")
}

func GetCrioEndpoint2() string {
return filepath.Join(system.Conf.VarRunRootDir, "crio.sock")
}

type CrioRuntimeHandler struct {
runtimeServiceClient runtimeapi.RuntimeServiceClient
timeout time.Duration
endpoint string
}

func NewCrioRuntimeHandler(endpoint string) (ContainerRuntimeHandler, error) {
ep := strings.TrimPrefix(endpoint, "unix://")
if _, err := os.Stat(ep); err != nil {
return nil, err
}

client, err := getRuntimeClient(endpoint)
if err != nil {
return nil, err
}

return &CrioRuntimeHandler{
runtimeServiceClient: client,
timeout: defaultConnectionTimeout,
endpoint: endpoint,
}, nil
}

func (c *CrioRuntimeHandler) StopContainer(containerID string, timeout int64) error {
if containerID == "" {
return fmt.Errorf("containerID cannot be empty")
}
t := c.timeout + time.Duration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), t)
defer cancel()

request := &runtimeapi.StopContainerRequest{
ContainerId: containerID,
Timeout: timeout,
}
_, err := c.runtimeServiceClient.StopContainer(ctx, request)
return err
}

func (c *CrioRuntimeHandler) UpdateContainerResources(containerID string, opts UpdateOptions) error {
if containerID == "" {
return fmt.Errorf("containerID cannot be empty")
}
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
request := &runtimeapi.UpdateContainerResourcesRequest{
ContainerId: containerID,
Linux: &runtimeapi.LinuxContainerResources{
CpuPeriod: opts.CPUPeriod,
CpuQuota: opts.CPUQuota,
CpuShares: opts.CPUShares,
CpusetCpus: opts.CpusetCpus,
CpusetMems: opts.CpusetMems,
MemoryLimitInBytes: opts.MemoryLimitInBytes,
OomScoreAdj: opts.OomScoreAdj,
},
}
_, err := c.runtimeServiceClient.UpdateContainerResources(ctx, request)
return err
}
130 changes: 130 additions & 0 deletions pkg/koordlet/util/runtime/handler/crio_runtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"context"
"fmt"
"path/filepath"
"testing"

"github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"

mockclient "github.com/koordinator-sh/koordinator/pkg/koordlet/util/runtime/handler/mockclient"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

func Test_NewCrioRuntimeHandler(t *testing.T) {
stubs := gostub.Stub(&GrpcDial, func(context context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return &grpc.ClientConn{}, nil
})
defer stubs.Reset()

helper := system.NewFileTestUtil(t)
defer helper.Cleanup()

helper.WriteFileContents("/var/run/crio/crio.sock", "test")
system.Conf.VarRunRootDir = filepath.Join(helper.TempDir, "/var/run")
georgexiang marked this conversation as resolved.
Show resolved Hide resolved
CrioEndpoint1 := GetCrioEndpoint()
unixEndPoint := fmt.Sprintf("unix://%s", CrioEndpoint1)
crioRuntime, err := NewCrioRuntimeHandler(unixEndPoint)
assert.NoError(t, err)
assert.NotNil(t, crioRuntime)

// custom VarRunRootDir
helper.WriteFileContents("/host-var-run/crio/crio.sock", "test1")
system.Conf.VarRunRootDir = filepath.Join(helper.TempDir, "/host-var-run")
CrioEndpoint1 = GetCrioEndpoint()
unixEndPoint = fmt.Sprintf("unix://%s", CrioEndpoint1)
crioRuntime, err = NewCrioRuntimeHandler(unixEndPoint)
assert.NoError(t, err)
assert.NotNil(t, crioRuntime)
}

func Test_Crio_StopContainer(t *testing.T) {
type args struct {
name string
containerId string
runtimeError error
expectError bool
}
tests := []args{
{
name: "test_stopContainer_success",
containerId: "test_container_id",
runtimeError: nil,
expectError: false,
},
{
name: "test_stopContainer_fail",
containerId: "test_container_id",
runtimeError: fmt.Errorf("stopContainer error"),
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockRuntimeClient := mockclient.NewMockRuntimeServiceClient(ctl)
mockRuntimeClient.EXPECT().StopContainer(gomock.Any(), gomock.Any()).Return(nil, tt.runtimeError)

runtimeHandler := ContainerdRuntimeHandler{runtimeServiceClient: mockRuntimeClient, timeout: 1, endpoint: GetCrioEndpoint()}
gotErr := runtimeHandler.StopContainer(tt.containerId, 1)
assert.Equal(t, gotErr != nil, tt.expectError)

})
}
}

func Test_Crio_UpdateContainerResources(t *testing.T) {
type args struct {
name string
containerId string
runtimeError error
expectError bool
}
tests := []args{
{
name: "test_UpdateContainerResources_success",
containerId: "test_container_id",
runtimeError: nil,
expectError: false,
},
{
name: "test_UpdateContainerResources_fail",
containerId: "test_container_id",
runtimeError: fmt.Errorf("UpdateContainerResources error"),
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockRuntimeClient := mockclient.NewMockRuntimeServiceClient(ctl)
mockRuntimeClient.EXPECT().UpdateContainerResources(gomock.Any(), gomock.Any()).Return(nil, tt.runtimeError)

runtimeHandler := ContainerdRuntimeHandler{runtimeServiceClient: mockRuntimeClient, timeout: 1, endpoint: GetCrioEndpoint()}
gotErr := runtimeHandler.UpdateContainerResources(tt.containerId, UpdateOptions{})
assert.Equal(t, tt.expectError, gotErr != nil)
})
}
}
40 changes: 40 additions & 0 deletions pkg/koordlet/util/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
DockerHandler handler.ContainerRuntimeHandler
ContainerdHandler handler.ContainerRuntimeHandler
PouchHandler handler.ContainerRuntimeHandler
CrioHandler handler.ContainerRuntimeHandler
mutex = &sync.Mutex{}
)

Expand All @@ -46,6 +47,8 @@ func GetRuntimeHandler(runtimeType string) (handler.ContainerRuntimeHandler, err
return getContainerdHandler()
case system.RuntimeTypePouch:
return getPouchHandler()
case system.RuntimeTypeCrio:
return getCrioHandler()
default:
return nil, fmt.Errorf("runtime type %v is not supported", runtimeType)
}
Expand Down Expand Up @@ -152,6 +155,43 @@ func getPouchEndpoint() (string, error) {
return "", fmt.Errorf("pouch endpoint does not exist")
}

func getCrioHandler() (handler.ContainerRuntimeHandler, error) {
if CrioHandler != nil {
return CrioHandler, nil
}

unixEndpoint, err := getCrioEndpoint()
if err != nil {
klog.Errorf("failed to get cri-o endpoint, error: %v", err)
return nil, err
}

CrioHandler, err = handler.NewCrioRuntimeHandler(unixEndpoint)
if err != nil {
klog.Errorf("failed to create cri-o runtime handler, error: %v", err)
return nil, err
}

return CrioHandler, nil
}

func getCrioEndpoint() (string, error) {
if crioEndpoint := handler.GetCrioEndpoint(); isFile(crioEndpoint) {
return fmt.Sprintf("unix://%s", crioEndpoint), nil
}

if crioEndpoint2 := handler.GetCrioEndpoint2(); isFile(crioEndpoint2) {
return fmt.Sprintf("unix://%s", crioEndpoint2), nil
}

if len(system.Conf.CrioEndPoint) > 0 && isFile(system.Conf.CrioEndPoint) {
klog.Infof("find cri-o Endpoint : %v", system.Conf.CrioEndPoint)
return fmt.Sprintf("unix://%s", system.Conf.CrioEndPoint), nil
}

return "", fmt.Errorf("cri-o endpoint does not exist")
}

func isFile(path string) bool {
s, err := os.Stat(path)
if err != nil || s == nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/koordlet/util/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ func Test_GetRuntimeHandler(t *testing.T) {
expectRuntimeHandler: "PouchRuntimeHandler",
expectErr: false,
},
{
name: "test_/var/run/crio.sock",
endPoint: "/var/run/crio.sock",
runtimeType: "cri-o",
expectRuntimeHandler: "CrioRuntimeHandler",
expectErr: false,
},
{
name: "test_/var/run/crio/crio.sock",
endPoint: "/var/run/crio/crio.sock",
runtimeType: "cri-o",
expectRuntimeHandler: "CrioRuntimeHandler",
expectErr: false,
},
{
name: "custom containerd",
endPoint: "/var/run/test1/containerd.sock",
Expand All @@ -119,6 +133,14 @@ func Test_GetRuntimeHandler(t *testing.T) {
expectRuntimeHandler: "PouchRuntimeHandler",
expectErr: false,
},
{
name: "custom crio",
endPoint: "/var/run/test4/crio.sock",
flag: "test4/crio.sock",
runtimeType: "cri-o",
expectRuntimeHandler: "CrioRuntimeHandler",
expectErr: false,
},
}

for _, tt := range tests {
Expand Down
Loading
Loading