client-go实战 - 添加container

client-go实战 - 添加container,第1张

1. 需求分析

在平时进行pod故障排错时,经常会发现容器内某些基础命令或者基础诊断工具不全,在无法在线使用yum安装的时候很是麻烦。查阅官方文档可知,可以在创建pod时通过共享进程命名空间的方式来解决这个问题。原文如下:

https://kubernetes.io/zh/docs/tasks/configure-pod-container/share-process-namespace/

但是在操作的时候都需要去修改很多yaml文件,因此可以通过client-go调用内部资源,设置某种参数触发在原有container基础上append一个shell的container进去,从而简化操作。

源码参考:https://github.com/ylinyang/b_demo/tree/master/shareProcessNs

2. main函数 package info
import (
	"log"
	"time"

	"gihtub.com/ylinyang/b_demo/shareProcessNs/pkg"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)
获取kubeconfig 初始化客户端
// 1. get kubeconfig
// Try to build the client config from the kubeconfig file at
// clientcmd.RecommendedHomeFile first; if that fails, fall back to the
// in-cluster configuration.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		// This err shadows the outer one and only covers the in-cluster attempt.
		inClusterConfig, err := rest.InClusterConfig()
		if err != nil {
			// Neither source produced a usable config, so the program panics.
			log.Panicln("get kubeconfig is failed, ", err)
		}
		config = inClusterConfig
	}

	// 2. clientset
	// Build the typed Kubernetes client from the resolved config; panic on failure.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Panicln(err)
	}
构建informer
// 3. informer
// Create a shared informer factory from the clientset, passing 10*time.Minute
// as its second argument (the default resync period in client-go), and take
// the apps/v1 Deployment informer from it.
	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
	deploymentInformer := sharedInformerFactory.Apps().V1().Deployments()
3. controller函数

在操作controller函数时,思路主要就是informer + memory(本地缓存) + workqueue的处理流程

package info
import (
	"context"
	"fmt"
	"log"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informersv1 "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)
先定义controller的struct,同时通过NewController函数提供实例化controller的方法,返回值是一个controller, 同时里面也包含informer的事件函数
// Controller bundles what the shareProcessNs controller works with: a
// Kubernetes client, a Deployment lister with its cache-synced check, and a
// rate-limited work queue of keys to process.
type Controller struct {
	clientset kubernetes.Interface

	deploymentLister listersv1.DeploymentLister
	deploymentSynced cache.InformerSynced            // reports whether the informer cache has synced
	workqueue        workqueue.RateLimitingInterface // rate-limited queue of pending keys
}

// NewController builds a Controller around the given clientset and Deployment
// informer, and registers add/update event handlers on the informer that
// enqueue the affected object. No delete handler is registered.
func NewController(clientset kubernetes.Interface, deploymentInformer informersv1.DeploymentInformer) *Controller {
	ctrl := new(Controller)
	ctrl.clientset = clientset
	ctrl.deploymentLister = deploymentInformer.Lister()
	ctrl.deploymentSynced = deploymentInformer.Informer().HasSynced
	ctrl.workqueue = workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(),
		"shareProcessNs",
	)

	log.Println("Setting up event handlers")

	// Both handlers only turn the object into a key and push it onto the work
	// queue; the actual processing happens later in the worker.
	onAdd := func(obj interface{}) {
		log.Println("--------------------------------------add func")
		ctrl.queue(obj)
	}
	onUpdate := func(_, newObj interface{}) {
		log.Println("--------------------------------------add update")
		ctrl.queue(newObj)
	}
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAdd,
		UpdateFunc: onUpdate,
	})

	return ctrl
}

// queue computes the "namespace/name" key for obj and adds it to the work
// queue.
//
// If the key cannot be derived, the error is reported through
// utilruntime.HandleError and nothing is enqueued, so a meaningless empty key
// never reaches the worker.
func (c *Controller) queue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("getting key from cache %s\n", err.Error()))
		return
	}
	c.workqueue.Add(key)
}
定义Run方法,在确定缓存已经同步之后,去调用真正的执行者runWorker
// Run waits for the Deployment informer cache to sync, starts a single worker
// goroutine, and then blocks until stopCh is closed. The work queue is shut
// down when Run returns. It returns an error only when the cache sync wait
// fails.
func (c *Controller) Run(stopCh chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	log.Println("Starting shareProcessNs controller")
	log.Println("waiting for informer caches to sync")

	synced := cache.WaitForCacheSync(stopCh, c.deploymentSynced)
	if !synced {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	log.Println("Starting workers")
	// wait.Until re-invokes runWorker every second until stopCh is closed.
	go wait.Until(c.runWorker, time.Second, stopCh)
	log.Println("Started workers")

	<-stopCh
	log.Println("shutting down workers")
	return nil
}
// runWorker keeps processing work queue items until processNextWorkItem
// reports that it should stop.
func (c *Controller) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
定义processNextWorkItem方法,在这里取出资源 进行逻辑判断 为真正执行做准备
// processNextWorkItem takes one item off the work queue and hands it to
// syncHandler. It returns false only when the queue has been shut down;
// otherwise it returns true, whether or not the sync succeeded.
func (c *Controller) processNextWorkItem() bool {
	item, quit := c.workqueue.Get()
	if quit {
		return false
	}

	// The closure scopes the deferred Done call to this single item.
	handle := func(item interface{}) error {
		// Every item taken from the queue must be marked done after processing.
		defer c.workqueue.Done(item)

		key, isString := item.(string)
		if !isString {
			// A non-string item can never be processed; forget it so it is
			// not retried.
			c.workqueue.Forget(item)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
			return nil
		}

		// Run the sync handler with the namespace/name key.
		syncErr := c.syncHandler(key)
		if syncErr == nil {
			c.workqueue.Forget(item)
			log.Printf("Successfully synced %s \n", key)
			return nil
		}

		// Put the key back with rate limiting so it is retried later.
		c.workqueue.AddRateLimited(key)
		return fmt.Errorf("error syncing %s: %s, requeuing", key, syncErr.Error())
	}

	if err := handle(item); err != nil {
		utilruntime.HandleError(err)
	}
	return true
}
定义最终的事件处理函数,这里才是正儿八经的事件逻辑处理,也是大部分时候我们需要操作的地方。通过deployment.GetAnnotations()["shell"]判断是否设置了shell这个annotation:设置了且不存在shell container时新增一个shell container;未设置但存在shell container时将其删除
// syncHandler reconciles the Deployment identified by key ("namespace/name")
// against its "shell" annotation:
//
//   - annotation absent, container named "shell" present: remove the
//     container, set ShareProcessNamespace to false and update the Deployment;
//   - annotation present, no container named "shell": append a busybox "shell"
//     container, set ShareProcessNamespace to true and update the Deployment;
//   - otherwise: nothing to do.
//
// It returns nil when the Deployment no longer exists in the lister, and a
// non-nil error when the key is malformed, the lister lookup fails for another
// reason, or the Update call fails.
func (c *Controller) syncHandler(key string) error {
	log.Println(key + " 1. get ns and name")
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	log.Println(key + " 2. get shell and annotations")
	cached, err := c.deploymentLister.Deployments(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			// The Deployment was deleted after it was enqueued; there is
			// nothing to reconcile and dereferencing the nil result would panic.
			log.Println(key, " 2.1 deployment not found, skip")
			return nil
		}
		return err
	}

	// Objects returned by the lister belong to the shared informer cache and
	// must not be modified; work on a private copy instead.
	deployment := cached.DeepCopy()
	podSpec := &deployment.Spec.Template.Spec

	// Locate the first container named "shell", if any.
	shellIdx := -1
	for i := range podSpec.Containers {
		if podSpec.Containers[i].Name == "shell" {
			shellIdx = i
			break
		}
	}

	_, wantShell := deployment.GetAnnotations()["shell"]

	if !wantShell {
		if shellIdx < 0 {
			return nil
		}
		log.Println(key, " 2.1 find shell, but annotations is nil, start delete shell")
		podSpec.Containers = append(podSpec.Containers[:shellIdx], podSpec.Containers[shellIdx+1:]...)
		podSpec.ShareProcessNamespace = boolPtr(false)
		if _, err := c.clientset.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}); err != nil {
			return err
		}
		return nil
	}

	if shellIdx >= 0 {
		log.Println(key, " 2.1 find shell, not create")
		return nil
	}

	log.Println(key + " 3. add container")
	container := v1.Container{
		Name:  "shell",
		Image: "busybox:1.28",
		SecurityContext: &v1.SecurityContext{
			Capabilities: &v1.Capabilities{
				Add: []v1.Capability{"SYS_PTRACE"},
			},
		},
		Stdin: true,
		TTY:   true,
	}
	podSpec.ShareProcessNamespace = boolPtr(true)
	podSpec.Containers = append(podSpec.Containers, container)
	if _, err := c.clientset.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}); err != nil {
		return err
	}
	log.Println(key + "\t4. fix ok")
	return nil
}

// int32Ptr returns a pointer to a fresh int32 holding the value i.
func int32Ptr(i int32) *int32 {
	v := new(int32)
	*v = i
	return v
}
// boolPtr returns a pointer to a fresh bool holding the value b.
func boolPtr(b bool) *bool {
	v := new(bool)
	*v = b
	return v
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/926806.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存