正式开始之前简单说一下 webhook,这样对以下理解更方便一些。
准入控制器也就是 webhook 会拦截 API Server 收到的请求,拦截发生在认证和鉴权完成之后,对象进行持久化之前(这个时候可以修改 pod 模版完成自动注入)。可以定义两种类型的 webhook: Mutating 和 Validating
pkg/kube/inject/webhook.go
实例化了 webhook 并注册了注入函数
func NewWebhook(p WebhookParameters) (*Webhook, error) {
if p.Mux == nil {
return nil, errors.New("expected mux to be passed, but was not passed")
}
wh := &Webhook{
watcher: p.Watcher,
meshConfig: p.Env.Mesh(),
env: p.Env,
revision: p.Revision,
}
p.Watcher.SetHandler(wh.updateConfig)
sidecarConfig, valuesConfig, err := p.Watcher.Get()
if err != nil {
return nil, err
}
wh.updateConfig(sidecarConfig, valuesConfig)
// 注册 serveInject 函数
p.Mux.HandleFunc("/inject", wh.serveInject)
p.Mux.HandleFunc("/inject/", wh.serveInject)
p.Env.Watcher.AddMeshHandler(func() {
wh.mu.Lock()
wh.meshConfig = p.Env.Mesh()
wh.mu.Unlock()
})
return wh, nil
}
根据 MutatingAdmissionWebhooks
中的配置,符合定义的 pod(例如:自动注入标签、istio.io 标签)在创建时 apiserver 会回调 /inject
请求。然后执行 wh.serveInject
。
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
totalInjections.Increment()
// body 保存着请求体
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
// 读取请求体内容
body = data
}
}
// 一些验证校验
if len(body) == 0 {
handleError("no body found")
http.Error(w, "no body found", http.StatusBadRequest)
return
}
// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
handleError(fmt.Sprintf("contentType=%s, expect application/json", contentType))
http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
return
}
path := ""
if r.URL != nil {
path = r.URL.Path
}
var reviewResponse *kube.AdmissionResponse
var obj runtime.Object
var ar *kube.AdmissionReview
// deserializer.Decode k8s 提供的反序列化功能,会返回一个 object 接口
if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
handleError(fmt.Sprintf("Could not decode body: %v", err))
reviewResponse = toAdmissionResponse(err)
} else {
log.Debugf("AdmissionRequest for path=%s\n", path)
// AdmissionReviewKubeToAdapter 内部判断了当前 out 的 apiVersion 是 kubeApiAdmissionv1beta1 或者 kubeApiAdmissionv1
// 然后封装了 AdmissionReview 包含 typeMeta、request、response
ar, err = kube.AdmissionReviewKubeToAdapter(out)
if err != nil {
handleError(fmt.Sprintf("Could not decode object: %v", err))
}
// 注入,真正的修改 pod 模版,修改的逻辑在下面
reviewResponse = wh.inject(ar, path)
}
// 构建返回值
response := kube.AdmissionReview{}
response.Response = reviewResponse
var responseKube runtime.Object
var apiVersion string
if ar != nil {
apiVersion = ar.APIVersion
response.TypeMeta = ar.TypeMeta
if response.Response != nil {
if ar.Request != nil {
response.Response.UID = ar.Request.UID
}
}
}
// 判断当前 response 的 api version:admission.k8s.io/v1、admission.k8s.io/v1beta1
// 根据不同的 version 返回一个 AdmissionResponse
responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
// 响应值序列化为 json
resp, err := json.Marshal(responseKube)
if err != nil {
log.Errorf("Could not encode response: %v", err)
http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
}
// 写到 http response 请求中
if _, err := w.Write(resp); err != nil {
log.Errorf("Could not write response: %v", err)
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
}
这里大概可以分为四步
验证 http body 数据
http 请求反序列化(解码)
注入 sidecar
http 请求序列化(重新写到 http response 请求中)
func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.AdmissionResponse {
req := ar.Request
var pod corev1.Pod
// 反序列化解析 pod 对象
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
handleError(fmt.Sprintf("Could not unmarshal raw object: %v %s", err,
string(req.Object.Raw)))
return toAdmissionResponse(err)
}
// Managed fields is sometimes extremely large, leading to excessive CPU time on patch generation
// It does not impact the injection output at all, so we can just remove it.
pod.ManagedFields = nil
// Deal with potential empty fields, e.g., when the pod is created by a deployment
podName := potentialPodName(pod.ObjectMeta)
if pod.ObjectMeta.Namespace == "" {
pod.ObjectMeta.Namespace = req.Namespace
}
// 打印一些 debug 信息
log.Infof("Sidecar injection request for %v/%v", req.Namespace, podName)
log.Debugf("Object: %v", string(req.Object.Raw))
log.Debugf("OldObject: %v", string(req.OldObject.Raw))
wh.mu.RLock()
// 设置注入必须的以及跳过特殊 pod, 例如:设置了 HostNetwork、特殊的命名空间 kube-system 跳过注入
if !injectRequired(IgnoredNamespaces, wh.Config, &pod.Spec, pod.ObjectMeta) {
log.Infof("Skipping %s/%s due to policy check", pod.ObjectMeta.Namespace, podName)
totalSkippedInjections.Increment()
wh.mu.RUnlock()
return &kube.AdmissionResponse{
Allowed: true,
}
}
// 获取 deploy 的元数据 metav1.ObjectMeta, metav1.TypeMeta
deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
params := InjectionParameters{
pod: &pod,
deployMeta: deploy,
typeMeta: typeMeta,
templates: wh.Config.Templates,
defaultTemplate: wh.Config.DefaultTemplates,
aliases: wh.Config.Aliases,
meshConfig: wh.meshConfig,
valuesConfig: wh.valuesConfig,
revision: wh.revision,
injectedAnnotations: wh.Config.InjectedAnnotations,
proxyEnvs: parseInjectEnvs(path),
}
wh.mu.RUnlock()
//看了一大圈终于注入了同样写在下面
patchBytes, err := injectPod(params)
if err != nil {
handleError(fmt.Sprintf("Pod injection failed: %v", err))
return toAdmissionResponse(err)
}
// 因为是以将 patch 的方式更新,所以上面返回的是 []byte
// 然后封装 patchBytes 返回 AdmissionResponse
reviewResponse := kube.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *string {
pt := "JSONPatch"
return &pt
}(),
}
totalSuccessfulInjections.Increment()
return &reviewResponse
}
下面这个函数实际注释写的很明白
injectPod 是核心的注入逻辑。这需要一个 pod 和注入模板,以及注入模板的一些输入,并生成一个 JSON 补丁。
在 webhook 中,我们将直接从 Kubernetes 那里接收到一个 Pod,并直接返回 patch; Kubernetes 将负责应用(apply)patch。
对于 kube-inject,我们将从 YAML 中解析出一个 Pod(这可能涉及从更高级别类型,如 Deployment 中获取)然后在本地应用 patch(这里指的应该是手动注入 istioctl kube-inject)
注入逻辑的工作方式是首先将渲染的注入模板应用于
注入逻辑的是:注入模版与 pod 合并。(使用 merge patch 完成)
目前只支持单个模板,将来使用的模板将是可配置的可支持多模版
func injectPod(req InjectionParameters) ([]byte, error) {
checkPreconditions(req)
// The patch will be built relative to the initial pod, capture its current state
// 保存原始的 pod 配置
originalPodSpec, err := json.Marshal(req.pod)
if err != nil {
return nil, err
}
// Run the injection template, giving us a partial pod spec
// 渲染 sidecar 模版(合并 pod 模版)
mergedPod, injectedPodData, err := RunTemplate(req)
if err != nil {
return nil, fmt.Errorf("failed to run injection template: %v", err)
}
mergedPod, err = reapplyOverwrittenContainers(mergedPod, req.pod, injectedPodData)
if err != nil {
return nil, fmt.Errorf("failed to re apply container: %v", err)
}
// Apply some additional transformations to the pod
if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
return nil, fmt.Errorf("failed to process pod: %v", err)
}
// mergedPod 封装成 patch *** 作返回 []byte
patch, err := createPatch(mergedPod, originalPodSpec)
if err != nil {
return nil, fmt.Errorf("failed to create patch: %v", err)
}
log.Debugf("AdmissionResponse: patch=%v\n", string(patch))
return patch, nil
}
func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod, templatePod *corev1.Pod, err error) {
......
cluster := valuesStruct.GetGlobal().GetMultiCluster().GetClusterName()
// TODO allow overriding the values.global network in injection with the system namespace label
network := valuesStruct.GetGlobal().GetNetwork()
// params may be set from webhook URL, take priority over values yaml
if params.proxyEnvs["ISTIO_META_CLUSTER_ID"] != "" {
cluster = params.proxyEnvs["ISTIO_META_CLUSTER_ID"]
}
if params.proxyEnvs["ISTIO_META_NETWORK"] != "" {
network = params.proxyEnvs["ISTIO_META_NETWORK"]
}
// explicit label takes highest precedence
if n, ok := metadata.Labels[label.TopologyNetwork.Name]; ok {
network = n
}
// use network in values for template, and proxy env variables
if cluster != "" {
params.proxyEnvs["ISTIO_META_CLUSTER_ID"] = cluster
}
if network != "" {
params.proxyEnvs["ISTIO_META_NETWORK"] = network
}
......
data := SidecarTemplateData{
TypeMeta: params.typeMeta,
DeploymentMeta: params.deployMeta,
ObjectMeta: strippedPod.ObjectMeta,
Spec: strippedPod.Spec,
ProxyConfig: meshConfig.GetDefaultConfig(),
MeshConfig: meshConfig,
Values: values,
Revision: params.revision,
EstimatedConcurrency: estimateConcurrency(meshConfig.GetDefaultConfig(), metadata.Annotations, valuesStruct),
}
funcMap := CreateInjectionFuncmap()
// Need to use FuncMap and SidecarTemplateData context
funcMap["render"] = func(template string) string {
// 解析模版
bbuf, err := parseTemplate(template, funcMap, data)
if err != nil {
return ""
}
return bbuf.String()
}
mergedPod = params.pod
templatePod = &corev1.Pod{}
for _, templateName := range selectTemplates(params) {
templateYAML, f := params.templates[templateName]
if !f {
return nil, nil, fmt.Errorf("requested template %q not found; have %v",
templateName, strings.Join(knownTemplates(params.templates), ", "))
}
bbuf, err := parseTemplate(templateYAML, funcMap, data)
if err != nil {
return nil, nil, err
}
// sidecar 模版 byte 数组
templateJSON, err := yaml.YAMLToJSON(bbuf.Bytes())
if err != nil {
return nil, nil, fmt.Errorf("yaml to json: %v", err)
}
// 合并模版
mergedPod, err = applyOverlay(mergedPod, templateJSON)
if err != nil {
return nil, nil, fmt.Errorf("failed parsing generated injected YAML (check Istio sidecar injector configuration): %v", err)
}
// 合并模版
templatePod, err = applyOverlay(templatePod, templateJSON)
if err != nil {
return nil, nil, fmt.Errorf("failed applying injection overlay: %v", err)
}
}
return mergedPod, templatePod, nil
}
func applyOverlay(target *corev1.Pod, overlayJSON []byte) (*corev1.Pod, error) {
// 序列化 pod 转换 json
currentJSON, err := json.Marshal(target)
if err != nil {
return nil, err
}
pod := corev1.Pod{}
// Overlay the injected template onto the original podSpec
// 合并 json pod
patched, err := strategicpatch.StrategicMergePatch(currentJSON, overlayJSON, pod)
if err != nil {
return nil, fmt.Errorf("strategic merge: %v", err)
}
// 反序列化解析 pod
if err := json.Unmarshal(patched, &pod); err != nil {
return nil, fmt.Errorf("unmarshal patched pod: %v", err)
}
return &pod, nil
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)