Kubernetes API的每个请求都会经过多阶段的访问控制之后才会被接受,这包括认证、授权以及 准入控制(Admission Control)等。
整体涉及:
- 认证
- 限流
- 授权
- 准入
- 审计
- 入口地址
- cmd/kube-apiserver/apiserver.go
- 初始化apiserver的 cmd 并执行
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
- cmd/kube-apiserver/app/server.go:NewAPIServerCommand()
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
// 设置默认 Options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// 校验 Options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// 添加Metrics特性
utilfeature.DefaultMutableFeatureGate.AddMetrics()
// 运行
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
- 默认 Options
kube-apiserver 命令行参数
- Option校验
- 文件:/cmd/kube-apiserver/app/options/validation.go
// Validate checks ServerRunOptions and return a slice of found errs.
func (s *ServerRunOptions) Validate() []error {
var errs []error
if s.MasterCount <= 0 {
errs = append(errs, fmt.Errorf("--apiserver-count should be a positive number, but value '%d' provided", s.MasterCount))
}
errs = append(errs, s.Etcd.Validate()...)
errs = append(errs, validateClusterIPFlags(s)...)
errs = append(errs, validateServiceNodePort(s)...)
errs = append(errs, validateAPIPriorityAndFairness(s)...)
errs = append(errs, s.SecureServing.Validate()...)
errs = append(errs, s.Authentication.Validate()...)
errs = append(errs, s.Authorization.Validate()...)
errs = append(errs, s.Audit.Validate()...)
errs = append(errs, s.Admission.Validate()...)
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
return errs
}
- Run
就会调用 Run() 启动函数,并传递经过验证的选项配置 completeOptions 以及一个停止信号的通道 stopCh ,函数的定义如下:
- 文件: vendor/k8s.io/apiserver/pkg/server/signal.go
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2)
ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
cancel()
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()
return ctx
}
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// 1、创建服务调用链
server, err := CreateServerChain(completeOptions)
if err != nil {
return err
}
// 2、进行服务启动的准备工作
prepared, err := server.PrepareRun()
if err != nil {
return err
}
// 3、服务启动
return prepared.Run(stopCh)
}
- CreateServerChain :创建服务调用链。该函数负责创建各种不同 API Server 的配置并初始化,最后构建出完整的 API Server 链式结构
- PrepareRun:服务启动前的准备工作。该函数负责进行健康检查、存活检查和 OpenAPI 路由的注册工作,以便 apiserver 能够顺利地运行
- Run:服务启动。该函数启动 HTTP Server 实例并开始监听和处理来自客户端的请求
- 服务调用链的创建
在这里会根据不同功能进行解耦,创建出三个不同的 API Server:
- AggregatorServer:API 聚合服务。用于实现 Kubernetes API 聚合层[1] 的功能,当 AggregatorServer 接收到请求之后,如果发现对应的是一个 APIService 的请求,则会直接转发到对应的服务上(自行编写和部署的 API 服务器),否则则委托给 KubeAPIServer 进行处理
- KubeAPIServer:API 核心服务。实现认证、鉴权以及所有 Kubernetes 内置资源的 REST API 接口(诸如 Pod 和 Service 等资源的接口),如果请求未能找到对应的处理,则委托给 APIExtensionsServer 进行处理
- APIExtensionsServer:API 扩展服务。处理 CustomResourceDefinitions(CRD)和 Custom Resource(CR)的 REST 请求(自定义资源的接口),如果请求仍不能被处理则委托给 404 Handler 处理
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}