版本信息如下:
a、 *** 作系统: centos 7.6,amd64
b、kubernetes版本:v1.15.4
c、服务器docker版本:v18.09.2
2 liveness探针功能:
存活性探针用于探测目标容器是否正常运行,手段有http get请求、tcp请求以及exec命令。探测失败,则kubelet会调用停止容器的API(docker stop)来结束目标容器。
3 源码简析:
探针的实现是定时器,定时器会定期执行预先设置好的探测动作,探测结果存放在kubelet的成员livenessManager中。在kubelet主循环中,从livenessManager获取探测结果,如果探测结果是失败的,则调用停止容器的API来杀死容器。
func NewMainKubelet(...) { klet.probeManager = prober.NewManager( klet.statusManager, klet.livenessManager, klet.runner, containerRefManager, kubeDeps.Recorder) }
3.2 probeManager的AddPod(…)方法
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { start := kl.clock.Now() sort.Sort(sliceutils.PodsByCreationTime(pods)) for _, pod := range pods { // 同步目标pod kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) // 对目标pod进行探测 kl.probeManager.AddPod(pod) } }
func (m *manager) AddPod(pod *v1.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name // 目标容器设置了存活性探针 if c.LivenessProbe != nil { key.probeType = liveness if _, ok := m.workers[key]; ok { klog.Errorf("Liveness probe already exists! %v - %v", format.Pod(pod), c.Name) return } // 1)创建一个worker对象,并放入一个map对象中 w := newWorker(m, liveness, pod, c) m.workers[key] = w // 2)调用worker对象的run()方法开启定时器,定时执行探测动作 go w.run() } } }
3.3 func (w *worker) run()
定时执行预先设定的探测 *** 作,探测 *** 作是w.doProbe() 方法。
func (w *worker) run() { probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod))) probeTicker := time.NewTicker(probeTickerPeriod) // run()方法结束,进行清理工作 defer func() { // 1)关闭定时器 probeTicker.Stop() // 2)删除探针结果 if !w.containerID.IsEmpty() { w.resultsManager.Remove(w.containerID) } // 3)将worker对象从map中删除 w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType) }() probeLoop: for w.doProbe() { // 定时器返回数据,则进入下一个循环 select { case <-w.stopCh: break probeLoop case <-probeTicker.C: } } }
3.4 func (w *worker) doProbe()
执行探测动作,探测结果保存在worker对象的resultsManager中。
func (w *worker) doProbe() (keepGoing bool) { status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) if err != nil { // 探测失败,直接返回,不需要记录探测结果 return true } // worker对象的成员resultsManager,其实是kubelet的成员livenessManager对象。 // 将本次探测结果存放到worker对象的resultsManager中,即kubelet的livenessManager对象中。 w.resultsManager.Set(w.containerID, result, w.pod) return true }
kubelet的成员livenessManager对象,worker对象的成员resultsManager、manager对象(管理worker对象)的成员livenessManager,其实是同一个值。因此,探针的结果其实都保存在kubelet的成员livenessManager对象中。
// kubelet创建manager对象(管理worker对象),第二个入参livenessManager的值其实是klet.livenessManager func NewManager( statusManager status.Manager, livenessManager results.Manager, runner kubecontainer.ContainerCommandRunner, refManager *kubecontainer.RefManager, recorder record.EventRecorder) Manager { return &manager{ livenessManager: livenessManager, } }
创建worker对象的构造方法,第一个入参就是manager对象(管理worker对象)。
func newWorker( m *manager, probeType probeType, pod *v1.Pod, container v1.Container) *worker { w := &worker{ stopCh: make(chan struct{}, 1), pod: pod, container: container, probeType: probeType, probeManager: m, } switch probeType { case readiness: case liveness: // 入参manager对象的成员livenessManager赋予给worker对象的成员resultsManager w.resultsManager = m.livenessManager } return w }
3.5 func (m *kubeGenericRuntimeManager) SyncPod(…)
SyncPod(…)是用来同步一个pod,pod中待杀死的容器放在podContainerChanges.ContainersToKill中。
在本方法的Step 3中,遍历podContainerChanges.ContainersToKill,调用停止容器的API来杀死容器,存活性探针失败的情形就属于这个情景。
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: 计算sandbox和业务container的变化情况,返回值podContainerChanges有一个成员叫ContainersToKill podContainerChanges := m.computePodActions(pod, podStatus) // Step 2: Kill the pod if the sandbox has changed. if podContainerChanges.KillPod { } else { // Step 3: 杀死pod中的container,如果这些container理应不该运行 // m.computePodActions( )方法有机会填充 ContainersToKill,例如在liveness探针失败的情景。 // 遍历 ContainersToKill,调用m.killContainer(...)方法杀死容器 for containerID, containerInfo := range podContainerChanges.ContainersToKill { if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil { return } } } // Step 4: Create a sandbox for the pod if necessary. // Step 5: start the init container. // Step 6: start containers in podContainerChanges.ContainersToStart. return }
3.6 func (m *kubeGenericRuntimeManager) computePodActions(…)
computePodActions(…)方法用于计算容器的变化情况,其中包含了待杀死的容器。
代码逻辑是:从kubelet的成员livenessManager中获取存活性探测的结果,如果探测结果是失败的,则将目标容器放入changes.ContainersToKill中,最后返回。
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { changes := podActions{ ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo), } // 遍历pod中的每个容器 for idx, container := range pod.Spec.Containers { containerStatus := podStatus.FindContainerStatusByName(container.Name) var message string restart := shouldRestartonFailure(pod) if _, _, changed := containerChanged(&container, containerStatus); changed { // 从livenessManager中获取探测结果,livenessManager其实也是kubelet的成员livenessManager。 } else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure { // 遇见失败的探测结果则进来,不会执行下面的continue指令,则后续一定会将容器放入ContainersToKill message = fmt.Sprintf("Container %s failed liveness probe", container.Name) } else { // 代码来到此处,说明保留当前的container keepCount++ continue } // 把容器放入ContainersToKill changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{ name: containerStatus.Name, container: &pod.Spec.Containers[idx], message: message, } } return changes }
4 总结:
liveness探针的实现是定时器,探测结果存放在kubelet的成员livenessManager中,kubelet在同步一个pod时,会从成员livenessManager中获取探测结果,如果探测结果是失败的,则调用停止容器的API来杀死目标pod中的目标容器。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)