Istio 1.11.2 底层自动注入流程

Istio 1.11.2 底层自动注入流程,第1张

自动注入流程 简单说下

正式开始之前简单说一下 webhook,这样对以下理解更方便一些。

准入控制器也就是 webhook 会拦截 API Server 收到的请求,拦截发生在认证和鉴权完成之后,对象进行持久化之前(这个时候可以修改 pod 模版完成自动注入)。可以定义两种类型的 webhook: Mutating 和 Validating

Mutating:修改资源,可以对请求内容进行修改。

Validating:验证资源,根据请求的内容判断是继续执行该请求还是拒绝该请求。

实现流程

pkg/kube/inject/webhook.go 实例化了 webhook 并注册了注入函数

// NewWebhook constructs the sidecar-injection webhook described by p and wires
// it up: it subscribes to injection-config updates from p.Watcher, loads the
// current config once, registers serveInject on p.Mux, and subscribes to mesh
// config changes.
//
// It returns an error when p.Mux is nil or when the initial config cannot be
// read from the watcher.
func NewWebhook(p WebhookParameters) (*Webhook, error) {
	if p.Mux == nil {
		return nil, errors.New("expected mux to be passed, but was not passed")
	}

	hook := &Webhook{
		watcher:    p.Watcher,
		meshConfig: p.Env.Mesh(),
		env:        p.Env,
		revision:   p.Revision,
	}

	// Install the update handler first, then apply whatever config the
	// watcher currently holds.
	p.Watcher.SetHandler(hook.updateConfig)
	sidecarCfg, valuesCfg, err := p.Watcher.Get()
	if err != nil {
		return nil, err
	}
	hook.updateConfig(sidecarCfg, valuesCfg)

	// Register serveInject for the bare path and for the trailing-slash
	// pattern.
	for _, pattern := range []string{"/inject", "/inject/"} {
		p.Mux.HandleFunc(pattern, hook.serveInject)
	}

	// Keep the cached mesh config in sync; the write is guarded by hook.mu.
	p.Env.Watcher.AddMeshHandler(func() {
		hook.mu.Lock()
		hook.meshConfig = p.Env.Mesh()
		hook.mu.Unlock()
	})

	return hook, nil
}

根据 MutatingAdmissionWebhooks 中的配置,符合匹配条件的 pod(例如:带有自动注入标签、istio.io 标签)在创建时,apiserver 会回调 webhook 的 /inject 接口,随后执行 wh.serveInject。

// serveInject is the HTTP handler registered for the injection endpoint. It
//
//  1. validates the request (non-empty body, JSON content type),
//  2. decodes the body into an AdmissionReview,
//  3. runs wh.inject to compute the admission response, and
//  4. serializes the review back to the caller in the API version that the
//     request used.
//
// Decode/convert failures are reported to the caller as an AdmissionResponse
// built by toAdmissionResponse rather than as an HTTP error.
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
	totalInjections.Increment()

	// body holds the raw request payload.
	var body []byte
	if r.Body != nil {
		// A read error is not reported on its own: body stays empty and the
		// request is rejected by the length check below.
		if data, err := ioutil.ReadAll(r.Body); err == nil {
			body = data
		}
	}

	// Basic request validation.
	if len(body) == 0 {
		handleError("no body found")
		http.Error(w, "no body found", http.StatusBadRequest)
		return
	}

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		handleError(fmt.Sprintf("contentType=%s, expect application/json", contentType))
		http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
		return
	}

	path := ""
	if r.URL != nil {
		path = r.URL.Path
	}

	var reviewResponse *kube.AdmissionResponse
	var obj runtime.Object
	var ar *kube.AdmissionReview
	// Decode the payload into a runtime.Object using the Kubernetes
	// deserializer.
	if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
		handleError(fmt.Sprintf("Could not decode body: %v", err))
		reviewResponse = toAdmissionResponse(err)
	} else {
		log.Debugf("AdmissionRequest for path=%s\n", path)
		// Convert the version-specific review into the internal adapter type
		// (type meta, request, response).
		ar, err = kube.AdmissionReviewKubeToAdapter(out)
		if err != nil {
			// Do not attempt injection with a review that failed conversion;
			// report the failure to the API server instead.
			handleError(fmt.Sprintf("Could not decode object: %v", err))
			reviewResponse = toAdmissionResponse(err)
		} else {
			// Compute the actual pod mutation.
			reviewResponse = wh.inject(ar, path)
		}
	}

	// Build the outgoing review, echoing the request's type meta and UID when
	// they are available.
	response := kube.AdmissionReview{}
	response.Response = reviewResponse
	var responseKube runtime.Object
	var apiVersion string
	if ar != nil {
		apiVersion = ar.APIVersion
		response.TypeMeta = ar.TypeMeta
		if response.Response != nil {
			if ar.Request != nil {
				response.Response.UID = ar.Request.UID
			}
		}
	}

	// Convert back to the Kubernetes review type matching the request's API
	// version.
	responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
	resp, err := json.Marshal(responseKube)
	if err != nil {
		log.Errorf("Could not encode response: %v", err)
		http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
		// The error reply has been sent; writing resp afterwards would append
		// to an already-answered request.
		return
	}
	// Send the serialized review as the HTTP response body.
	if _, err := w.Write(resp); err != nil {
		log.Errorf("Could not write response: %v", err)
		http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
	}
}

这里大概可以分为四步

验证 http body 数据

http 请求反序列化(解码)

注入 sidecar

http 响应序列化(将结果编码后写入 http response 中)

// inject computes the admission response for a single pod-creation review.
//
// ar is the decoded admission review and path is the URL path the request
// arrived on (it is handed to parseInjectEnvs to derive proxy environment
// overrides). The returned response either allows the pod unchanged (injection
// not required), allows it with a JSON patch that adds the sidecar, or carries
// the error produced by toAdmissionResponse.
func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.AdmissionResponse {
	// A review without a request cannot be processed; fail the admission
	// cleanly instead of dereferencing a nil pointer below.
	if ar == nil || ar.Request == nil {
		err := fmt.Errorf("admission review for path %q carries no request", path)
		handleError(err.Error())
		return toAdmissionResponse(err)
	}

	req := ar.Request
	var pod corev1.Pod
	// Decode the pod object embedded in the request.
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		handleError(fmt.Sprintf("Could not unmarshal raw object: %v %s", err,
			string(req.Object.Raw)))
		return toAdmissionResponse(err)
	}
	// Managed fields is sometimes extremely large, leading to excessive CPU time on patch generation
	// It does not impact the injection output at all, so we can just remove it.
	pod.ManagedFields = nil

	// Deal with potential empty fields, e.g., when the pod is created by a deployment
	podName := potentialPodName(pod.ObjectMeta)
	if pod.ObjectMeta.Namespace == "" {
		pod.ObjectMeta.Namespace = req.Namespace
	}
	log.Infof("Sidecar injection request for %v/%v", req.Namespace, podName)
	log.Debugf("Object: %v", string(req.Object.Raw))
	log.Debugf("OldObject: %v", string(req.OldObject.Raw))

	// wh.Config, wh.meshConfig and wh.valuesConfig are read under the read
	// lock; it is released on both exits of this section.
	wh.mu.RLock()
	// Policy check: injectRequired decides from the ignored namespaces, the
	// injection config and the pod's spec/metadata whether this pod should be
	// injected at all.
	if !injectRequired(IgnoredNamespaces, wh.Config, &pod.Spec, pod.ObjectMeta) {
		log.Infof("Skipping %s/%s due to policy check", pod.ObjectMeta.Namespace, podName)
		totalSkippedInjections.Increment()
		wh.mu.RUnlock()
		return &kube.AdmissionResponse{
			Allowed: true,
		}
	}

	// Metadata of the workload that owns the pod, as derived by
	// kube.GetDeployMetaFromPod.
	deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
	params := InjectionParameters{
		pod:                 &pod,
		deployMeta:          deploy,
		typeMeta:            typeMeta,
		templates:           wh.Config.Templates,
		defaultTemplate:     wh.Config.DefaultTemplates,
		aliases:             wh.Config.Aliases,
		meshConfig:          wh.meshConfig,
		valuesConfig:        wh.valuesConfig,
		revision:            wh.revision,
		injectedAnnotations: wh.Config.InjectedAnnotations,
		proxyEnvs:           parseInjectEnvs(path),
	}
	wh.mu.RUnlock()

	// Run the actual injection; the result is the patch to apply to the pod.
	patchBytes, err := injectPod(params)
	if err != nil {
		handleError(fmt.Sprintf("Pod injection failed: %v", err))
		return toAdmissionResponse(err)
	}

	// The mutation is delivered as a patch, so wrap the patch bytes in an
	// allowing AdmissionResponse.
	patchType := "JSONPatch"
	reviewResponse := kube.AdmissionResponse{
		Allowed:   true,
		Patch:     patchBytes,
		PatchType: &patchType,
	}
	totalSuccessfulInjections.Increment()
	return &reviewResponse
}

下面这个函数的注释其实写得很明白:

injectPod 是核心的注入逻辑。这需要一个 pod 和注入模板,以及注入模板的一些输入,并生成一个 JSON 补丁。

在 webhook 中,我们将直接从 Kubernetes 那里接收到一个 Pod,并直接返回 patch; Kubernetes 将负责应用(apply)patch。

对于 kube-inject,我们将从 YAML 中解析出一个 Pod(这可能涉及从更高级别类型,如 Deployment 中获取)然后在本地应用 patch(这里指的应该是手动注入 istioctl kube-inject)

注入逻辑的工作方式是:首先将渲染后的注入模板叠加(应用)到输入的 pod 之上。

简单来说,注入逻辑就是把注入模版与 pod 合并(通过 strategic merge patch 完成)。

目前只支持单个模板;将来所使用的模板将可以配置,并且会支持多个模版。

// injectPod runs the injection pipeline for one pod and returns the patch
// that turns the original pod into the injected one.
//
// Steps: snapshot the incoming pod as JSON, render and merge the injection
// templates (RunTemplate), re-apply overwritten containers, post-process the
// merged pod, and finally build the patch from the snapshot to the merged pod.
// Any failing step aborts the pipeline and its error is returned.
func injectPod(req InjectionParameters) ([]byte, error) {
	checkPreconditions(req)

	// The patch is computed relative to the pod as it arrived, so capture
	// that state before anything is modified.
	originalPodJSON, err := json.Marshal(req.pod)
	if err != nil {
		return nil, err
	}

	// Render the injection template(s); merged is the pod with the templates
	// overlaid, injected is the template content on its own.
	merged, injected, err := RunTemplate(req)
	if err != nil {
		return nil, fmt.Errorf("failed to run injection template: %v", err)
	}

	merged, err = reapplyOverwrittenContainers(merged, req.pod, injected)
	if err != nil {
		return nil, fmt.Errorf("failed to re apply container: %v", err)
	}

	// Additional transformations on the merged pod.
	err = postProcessPod(merged, *injected, req)
	if err != nil {
		return nil, fmt.Errorf("failed to process pod: %v", err)
	}

	// Express the difference between the original and the merged pod as a
	// patch and return its bytes.
	patchBytes, err := createPatch(merged, originalPodJSON)
	if err != nil {
		return nil, fmt.Errorf("failed to create patch: %v", err)
	}

	log.Debugf("AdmissionResponse: patch=%v\n", string(patchBytes))
	return patchBytes, nil
}
// RunTemplate renders every selected injection template and overlays it onto
// two pods: mergedPod starts as the incoming params.pod, and templatePod
// starts as an empty pod. Both results are returned; on any failure both pods
// are nil and err describes the problem.
//
// NOTE(review): this excerpt elides parts of the function (marked "......"),
// including where valuesStruct, metadata, strippedPod, meshConfig and values
// are defined — consult the full source before changing it.
func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod, templatePod *corev1.Pod, err error) {
	......
	cluster := valuesStruct.GetGlobal().GetMultiCluster().GetClusterName()
	// TODO allow overriding the values.global network in injection with the system namespace label
	network := valuesStruct.GetGlobal().GetNetwork()
	// params may be set from webhook URL, take priority over values yaml
	if params.proxyEnvs["ISTIO_META_CLUSTER_ID"] != "" {
		cluster = params.proxyEnvs["ISTIO_META_CLUSTER_ID"]
	}
	if params.proxyEnvs["ISTIO_META_NETWORK"] != "" {
		network = params.proxyEnvs["ISTIO_META_NETWORK"]
	}
	// explicit label takes highest precedence
	if n, ok := metadata.Labels[label.TopologyNetwork.Name]; ok {
		network = n
	}

	// use network in values for template, and proxy env variables
	// Only non-empty results are written back into params.proxyEnvs.
	if cluster != "" {
		params.proxyEnvs["ISTIO_META_CLUSTER_ID"] = cluster
	}
	if network != "" {
		params.proxyEnvs["ISTIO_META_NETWORK"] = network
	}
    ......
	// Context handed to the templates when they are rendered.
	data := SidecarTemplateData{
		TypeMeta:             params.typeMeta,
		DeploymentMeta:       params.deployMeta,
		ObjectMeta:           strippedPod.ObjectMeta,
		Spec:                 strippedPod.Spec,
		ProxyConfig:          meshConfig.GetDefaultConfig(),
		MeshConfig:           meshConfig,
		Values:               values,
		Revision:             params.revision,
		EstimatedConcurrency: estimateConcurrency(meshConfig.GetDefaultConfig(), metadata.Annotations, valuesStruct),
	}
	funcMap := CreateInjectionFuncmap()

	// Need to use FuncMap and SidecarTemplateData context
	// "render" lets a template render another template with the same
	// function map and data; a failure yields the empty string and the error
	// is dropped.
	funcMap["render"] = func(template string) string {
		// Render the nested template.
		bbuf, err := parseTemplate(template, funcMap, data)
		if err != nil {
			return ""
		}

		return bbuf.String()
	}

	mergedPod = params.pod
	templatePod = &corev1.Pod{}
	for _, templateName := range selectTemplates(params) {
		templateYAML, f := params.templates[templateName]
		if !f {
			return nil, nil, fmt.Errorf("requested template %q not found; have %v",
				templateName, strings.Join(knownTemplates(params.templates), ", "))
		}
		bbuf, err := parseTemplate(templateYAML, funcMap, data)
		if err != nil {
			return nil, nil, err
		}

		// Convert the rendered template from YAML to JSON bytes.
		templateJSON, err := yaml.YAMLToJSON(bbuf.Bytes())
		if err != nil {
			return nil, nil, fmt.Errorf("yaml to json: %v", err)
		}

		// Overlay the rendered template onto the pod being injected.
		mergedPod, err = applyOverlay(mergedPod, templateJSON)
		if err != nil {
			return nil, nil, fmt.Errorf("failed parsing generated injected YAML (check Istio sidecar injector configuration): %v", err)
		}
    
		// Overlay the same template onto templatePod, which started empty and
		// therefore accumulates only template-provided content.
		templatePod, err = applyOverlay(templatePod, templateJSON)
		if err != nil {
			return nil, nil, fmt.Errorf("failed applying injection overlay: %v", err)
		}
	}

	return mergedPod, templatePod, nil
}

// applyOverlay merges overlayJSON onto target using a strategic merge patch
// and returns the result as a newly decoded pod; target itself is only read.
//
// An error is returned if target cannot be marshalled, if the strategic merge
// fails, or if the merged JSON cannot be decoded back into a pod.
func applyOverlay(target *corev1.Pod, overlayJSON []byte) (*corev1.Pod, error) {
	// Serialize the current pod so it can be patched at the JSON level.
	baseJSON, err := json.Marshal(target)
	if err != nil {
		return nil, err
	}

	// result is passed (still empty) as the data struct argument of the merge
	// and is then the destination the merged JSON is decoded into.
	result := &corev1.Pod{}

	// Overlay the injected template onto the original podSpec
	mergedJSON, err := strategicpatch.StrategicMergePatch(baseJSON, overlayJSON, *result)
	if err != nil {
		return nil, fmt.Errorf("strategic merge: %v", err)
	}

	// Decode the merged document back into a typed pod.
	err = json.Unmarshal(mergedJSON, result)
	if err != nil {
		return nil, fmt.Errorf("unmarshal patched pod: %v", err)
	}
	return result, nil
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994814.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存