Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bookinfo部署 #41

Merged
merged 7 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ type NodeStatus struct {
Address string `json:"address,omitempty"`
}

// ServiceName -> ClusterIP
type SidecarServiceNameMapping map[string]string

type SidecarMapping map[string][]SidecarEndpoints

type SidecarEndpoints struct {
Expand Down
47 changes: 45 additions & 2 deletions pkg/kubeapiserver/app/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ const (
NamespaceSubsetsURL = "/api/v1/namespaces/:namespace/subsets"
SingleSubsetURL = "/api/v1/namespaces/:namespace/subsets/:subsetname"

SidecarMappingURL = "/api/v1/sidecar-mapping"
SidecarMappingURL = "/api/v1/sidecar-mapping"
SidecarServiceNameMappingURL = "/api/v1/sidecar-service-name-mapping"
)

/* NAMESPACE
Expand Down Expand Up @@ -237,6 +238,7 @@ func (ser *kubeApiServer) binder() {

ser.router.GET(SidecarMappingURL, ser.GetSidecarMapping)
ser.router.POST(SidecarMappingURL, ser.SaveSidecarMapping)
ser.router.GET(SidecarServiceNameMappingURL, ser.GetSidecarServiceNameMapping)
}

func (s *kubeApiServer) GetStatsDataHandler(c *gin.Context) {
Expand Down Expand Up @@ -673,6 +675,30 @@ func (ser *kubeApiServer) AddPodHandler(con *gin.Context) {

pod.Status.Phase = v1.PodPending

namespace := con.Param("namespace")
if namespace == "" {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace is required",
})
return
}
if pod.Namespace == "" {
if namespace != Default_Namespace {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace does not match",
})
return
}
} else {
if pod.Namespace != namespace {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace does not match",
})
return
}
}
pod.Namespace = namespace

/* fake store pod to:
1. namespace , store the binding of podname and uid
2. node , only uid
Expand All @@ -687,7 +713,7 @@ func (ser *kubeApiServer) AddPodHandler(con *gin.Context) {
all_pod_keystr := prefix + "/pods/" + string(pod.ObjectMeta.UID)

// namespace里面对应的是podname和uid的映射
namespace_pod_keystr := prefix + "/namespaces/" + Default_Namespace + "/pods/" + pod_name
namespace_pod_keystr := prefix + "/namespaces/" + namespace + "/pods/" + pod_name

// node里面对应的也是podname和uid的映射
// node_pod_keystr := prefix + "/nodes/" + Default_Nodename + "/pods/" + pod_name
Expand Down Expand Up @@ -2705,3 +2731,20 @@ func (s *kubeApiServer) GetSidecarMapping(c *gin.Context) {
Data: &mapping,
})
}

func (s *kubeApiServer) GetSidecarServiceNameMapping(c *gin.Context) {
services, err := s.getAllServicesFromEtcd()
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[v1.SidecarServiceNameMapping]{
Error: err.Error(),
})
return
}
mapping := make(v1.SidecarServiceNameMapping)
for _, svc := range services {
mapping[svc.Name] = svc.Spec.ClusterIP
}
c.JSON(http.StatusOK, v1.BaseResponse[v1.SidecarServiceNameMapping]{
Data: mapping,
})
}
25 changes: 24 additions & 1 deletion pkg/kubeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Client interface {
GetSubsetByName(name, namespace string) (*v1.Subset, error)

AddSidecarMapping(maps v1.SidecarMapping) error

GetSidecarServiceNameMapping() (v1.SidecarServiceNameMapping, error)
}

type client struct {
Expand Down Expand Up @@ -300,7 +302,7 @@ func (c *client) UploadPodMetrics(metrics []*v1.PodRawMetrics) error {

metricsStr, _ := json.Marshal(metrics)

fmt.Printf("upload metrics str: %s\n", string(metricsStr))
// fmt.Printf("upload metrics str: %s\n", string(metricsStr))

req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(metricsStr))
if err != nil {
Expand Down Expand Up @@ -432,3 +434,24 @@ func (c *client) AddSidecarMapping(maps v1.SidecarMapping) error {
}
return nil
}

func (c *client) GetSidecarServiceNameMapping() (v1.SidecarServiceNameMapping, error) {
resp, err := http.Get(fmt.Sprintf("http://%s:8001/api/v1/sidecar-service-name-mapping", c.apiServerIP))
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var baseResponse v1.BaseResponse[v1.SidecarServiceNameMapping]
err = json.Unmarshal(body, &baseResponse)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("get sidecar service name mapping failed, error: %s", baseResponse.Error)
}
return baseResponse.Data, nil
}
1 change: 1 addition & 0 deletions pkg/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (kls *KubeletServer) RunKubelet(ctx context.Context, wg *sync.WaitGroup) {
func (kls *KubeletServer) createAndInitKubelet() (*kubelet.Kubelet, error) {
kl, err := kubelet.NewMainKubelet(kls.nodeName, kls.kubeClient)
if err != nil {
log.Printf("Failed to create kubelet: %v", err)
return nil, err
}
return kl, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan types.
case <-syncCh:
// TODO 定时同步Pod信息到metrics collector
allPods, err := kl.runtimeManager.GetAllPods()
log.Printf("allPods: %v\n", allPods)
// log.Printf("allPods: %v\n", allPods)
if err != nil {
log.Printf("Failed to get all pods: %v\n", err)
return true
}
// 由allPods取到所有的status
var podStatusList []*runtime.PodStatus
for _, pod := range allPods {
log.Printf("pod: %v\n", pod)
// log.Printf("pod: %v\n", pod)
podStatus, err := kl.runtimeManager.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
if err != nil {
log.Printf("Failed to get pod %v status: %v\n", pod.Name, err)
Expand Down Expand Up @@ -186,6 +186,7 @@ func (kl *Kubelet) SyncPod(pod *v1.Pod, syncPodType types.SyncPodType, podStatus
log.Printf("Creating pod %v using container manager.\n", pod.Name)
err := kl.runtimeManager.AddPod(pod)
if err != nil {
log.Printf("Failed to create pod %v: %v\n", pod.Name, err)
return
}
log.Printf("Pod %v created.\n", pod.Name)
Expand Down
79 changes: 53 additions & 26 deletions pkg/kubelet/runtime/runtime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (rm *runtimeManager) GetAllPods() ([]*Pod, error) {
defer rm.lock.Unlock()
containers, err := rm.getAllContainers()
if err != nil {
panic(err)
// panic(err)
return nil, err
}
var ret []*Pod

Expand Down Expand Up @@ -87,7 +88,8 @@ func (rm *runtimeManager) GetPodStatus(ID v1.UID, PodName string, PodSpace strin
defer rm.lock.Unlock()
containers, err := rm.getPodContainers(PodName)
if err != nil {
panic(err)
//panic(err)
return nil, err
}
podStatus := &PodStatus{
ID: ID,
Expand All @@ -107,7 +109,8 @@ func (rm *runtimeManager) GetPodStatus(ID v1.UID, PodName string, PodSpace strin
func (rm *runtimeManager) getPodContainers(PodName string) ([]*ContainerStatus, error) {
containers, err := rm.getAllContainers()
if err != nil {
panic(err)
//panic(err)
return nil, err
}
var ret []*ContainerStatus
for _, container := range containers {
Expand Down Expand Up @@ -138,7 +141,8 @@ func (rm *runtimeManager) AddPod(pod *v1.Pod) error {
//PauseId, err := rm.CreatePauseContainer(pod.UID, pod.Name, pod.Namespace)
PauseId, err := rm.CreatePauseContainer(pod)
if err != nil {
panic(err)
//panic(err)
return err
}

for _, c := range pod.Spec.InitContainers {
Expand All @@ -156,7 +160,8 @@ func (rm *runtimeManager) AddPod(pod *v1.Pod) error {
}
_, err = rm.createContainer(&container, PauseId, pod.UID, pod.Name, pod.Namespace, volumes)
if err != nil {
panic(err)
//panic(err)
return err
}
}

Expand All @@ -171,19 +176,22 @@ func (rm *runtimeManager) CreatePauseContainer(pod *v1.Pod) (string, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer cli.Close()
PauseContainerImage := "registry.aliyuncs.com/google_containers/pause:3.6"
exi, err := rm.checkImages(PauseContainerImage)
if err != nil {
panic(err)
//panic(err)
return "", err
}
if !exi {
//fmt.Println("yes")
reader, err := cli.ImagePull(ctx, PauseContainerImage, image.PullOptions{})
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer reader.Close()
io.Copy(os.Stdout, reader)
Expand All @@ -206,16 +214,19 @@ func (rm *runtimeManager) CreatePauseContainer(pod *v1.Pod) (string, error) {
DNS: []string{rm.nameserverIP},
}, nil, nil, "")
if err != nil {
panic(err)
//panic(err)
return "", err
}

if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
panic(err)
//panic(err)
return "", err
}

ip, err := nw.Attach(resp.ID)
if err != nil {
panic(err)
//panic(err)
return "", err
}
rm.IpMap[PodID] = ip

Expand Down Expand Up @@ -276,10 +287,13 @@ func (rm *runtimeManager) createInitContainer(c *v1.Container, pauseID string) e
return err
}
if !exist {
_, err = cli.ImagePull(context.Background(), c.Image, image.PullOptions{})
readCloser, err := cli.ImagePull(context.Background(), c.Image, image.PullOptions{})
if err != nil {
return err
}
// 读取pull的输出
_, _ = io.ReadAll(readCloser)
_ = readCloser.Close()
}
hostConfig := &container.HostConfig{
NetworkMode: container.NetworkMode("container:" + pauseID),
Expand Down Expand Up @@ -325,19 +339,22 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer cli.Close()

exi, err := rm.checkImages(repotag)
if err != nil {
panic(err)
//panic(err)
return "", err
}
if !exi {
//fmt.Println("yes")
reader, err := cli.ImagePull(ctx, "docker.io/library/"+repotag, image.PullOptions{})
reader, err := cli.ImagePull(ctx, repotag, image.PullOptions{})
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer reader.Close()
io.Copy(os.Stdout, reader)
Expand Down Expand Up @@ -403,11 +420,13 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI

resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, nil, "")
if err != nil {
panic(err)
//panic(err)
return "", err
}

if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
panic(err)
//panic(err)
return "", err
}

// statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
Expand All @@ -421,7 +440,8 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI

out, err := cli.ContainerLogs(ctx, resp.ID, container.LogsOptions{ShowStdout: true})
if err != nil {
panic(err)
//panic(err)
return "", err
}

stdcopy.StdCopy(os.Stdout, os.Stderr, out)
Expand All @@ -432,13 +452,15 @@ func (rm *runtimeManager) checkImages(repotag string) (bool, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return false, err
}
defer cli.Close()
images, err := cli.ImageList(ctx, image.ListOptions{})
if err != nil {
//fmt.Println("fail to get images", err)
panic(err)
//panic(err)
return false, err
}
//fmt.Println("Docker Images:")
for _, image := range images {
Expand Down Expand Up @@ -477,13 +499,15 @@ func (rm *runtimeManager) getAllContainers() ([]types.Container, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return nil, err
}
defer cli.Close()

containers, err := cli.ContainerList(ctx, container.ListOptions{All: true})
if err != nil {
panic(err)
//panic(err)
return nil, err
}
var ret []types.Container
for _, container := range containers {
Expand Down Expand Up @@ -610,17 +634,20 @@ func (rm *runtimeManager) deleteContainer(ct types.Container) error {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return err
}
defer cli.Close()
noWaitTimeout := 0
if ct.State == "running" {
if err := cli.ContainerStop(ctx, ct.ID, container.StopOptions{Timeout: &noWaitTimeout}); err != nil {
panic(err)
//panic(err)
return err
}
}
if err := cli.ContainerRemove(ctx, ct.ID, container.RemoveOptions{}); err != nil {
panic(err)
//panic(err)
return err
}

return nil
Expand Down
Loading
Loading