在K8s中將Pod調(diào)度到某一臺Node節(jié)點之后,后續(xù)的狀態(tài)維護信息則是由對應(yīng)機器上的kubelet進行維護,如何實時反饋本地運行狀態(tài),并通知apiserver則是設(shè)計的難點, 本節(jié)主要是通過感知Pod狀態(tài)變化和探測狀態(tài)改變兩個流程來實際分析其核心數(shù)據(jù)結(jié)構(gòu),來了解內(nèi)部設(shè)計
金山網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站開發(fā)等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)公司2013年開創(chuàng)至今到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。
靜態(tài)Pod主要是指的那些不是通過感知apiserver創(chuàng)建的pod, 因為apiserver上并不包含,但是同時也需要維護和獲取這類Pod的狀態(tài), k8s中就設(shè)計了一個鏡像Pod的概念,其實就是為靜態(tài)Pod鏡像出來一個Pod該Pod的主要信息與靜態(tài)Pod一致,并且在apiserver中進行創(chuàng)建,通過apiserver可以感知的這個鏡像Pod來反映真實的靜態(tài)Pod的狀態(tài),?
statusManager是進行狀態(tài)同步的關(guān)鍵組件其需要綜合當前Pod運行中的數(shù)據(jù)和apiserver存儲的數(shù)據(jù),從而決定最終的狀態(tài)轉(zhuǎn)換, 這里先關(guān)注圖上畫出來的,更多的狀態(tài)等后續(xù)會一一介紹
type versionedPodStatus struct {
status v1.PodStatus
// 單調(diào)遞增的版本號(每個pod)
version uint64
// Pod name & namespace, for sending updates to API server.
podName string
podNamespace string
}
在Kubelet中為保證與apiserver端信息的同步,在本地保存了一個Pod狀態(tài)版本信息,其里面除了保存當前Pod的狀態(tài)數(shù)據(jù)還有一個版本版本號,通過單調(diào)遞增的版本號的對比來確定是否進行狀態(tài)的同步
statusManager的流程其實還是蠻復雜的,今天我們就只講一個場景,即kubelet通過apiserver感知到一個Pod更新,然后順著該功能的數(shù)據(jù)流來進行梳理statusMangaer里面的數(shù)據(jù)流轉(zhuǎn)
manager中的核心狀態(tài)相關(guān)的數(shù)據(jù)結(jié)構(gòu)可以主要分為兩大類:映射數(shù)據(jù)維護(podManager、podStatuses、apiStatusVersions)數(shù)據(jù)通信管道(podStatusChannel), 剩余的則是對與apiserver通信的kublet和進行pod刪除檢查的?podDeletionSafety
type manager struct {
kubeClient clientset.Interface
// 管理緩存Pod,包含鏡像pod和靜態(tài)pod的映射
podManager kubepod.Manager
// 從pod UID映射到相應(yīng)pod的版本狀態(tài)信息 。
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest
// 存儲鏡像pod的版本
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider
}
設(shè)置Pod狀態(tài)主要是位于kubelet中的syncPod中,在接收到pod事件變更之后,會與apiserver進行 Pod最新數(shù)據(jù)的同步從而獲取當前pod在apiserver端的最新狀態(tài)
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
for _, c := range pod.Status.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
"But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
}
}
// Make sure we're caching a deep copy.
status = *status.DeepCopy()
// 如果Pod被刪除了則需要強制與apiserver進行信息的同步
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}
var oldStatus v1.PodStatus
// 檢測之前的本地緩存數(shù)據(jù)
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
oldStatus = cachedStatus.status
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
oldStatus = pod.Status
}
檢測容器狀態(tài)主要是針對容器終止狀態(tài)轉(zhuǎn)發(fā)的合法性進行檢測,其實就是根據(jù)設(shè)定的Pod的RestartPolicy來檢測針對一個終止的容器是否可以進行重啟
if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
return false
}
if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
return false
}
通過最新的status里面的condition設(shè)定對應(yīng)PodCondition的LastTransitionTime更新時間未當前時間
// Set ContainersReadyCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
// Set ReadyCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
// Set InitializedCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
// Set PodScheduledCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
首先會根據(jù)當前容器的個數(shù),從而決定每個容器最大的字節(jié)數(shù)大小,然后對容器里面的終止狀態(tài)里面的Message信息,進行截斷,同時進行時間的校對
normalizeStatus(pod, &status)
如果之前已經(jīng)緩存了對應(yīng)的數(shù)據(jù),并且緩存的數(shù)據(jù)與當前的狀態(tài)未發(fā)生改變,也不需要強制更新,就直接返回
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
// 如果不強制更新 ,默認是true此處不會成立
klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
return false // No new status.
}
生成最新的狀態(tài)緩存數(shù)據(jù),并且遞增本地的版本信息
// 構(gòu)建新的狀態(tài)
newStatus := versionedPodStatus{
status: status,
version: cachedStatus.version + 1, // 更新器緩存
podName: pod.Name,
podNamespace: pod.Namespace,
}
// 更新新的緩存狀態(tài)
m.podStatuses[pod.UID] = newStatus
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: // 構(gòu)建一個新的同步請求
klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
pod.UID, newStatus.version, newStatus.status)
return true
default:
// Let the periodic syncBatch handle the update if the channel is full.
// We can't block, since we hold the mutex lock.
klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
format.Pod(pod), status)
return false
}
探測狀態(tài)其實就是Pod內(nèi)容器的運行狀態(tài),比如如果設(shè)置了Readiness探測,當某個容器探測失敗的時候,就會通知對應(yīng)的service從后端的enpoint中移除該Pod, 讓我們一起看看Kubelet是如何將運行狀態(tài)通知到apiserver端的
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
// 獲取本地的容器
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
return
}
// 獲取當前的狀態(tài)
oldStatus, found := m.podStatuses[pod.UID]
if !found {
klog.Warningf("Container readiness changed before pod has synced: %q - %q",
format.Pod(pod), containerID.String())
return
}
// 獲取當前的容器狀態(tài)
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
klog.Warningf("Container readiness changed for unknown container: %q - %q",
format.Pod(pod), containerID.String())
return
}
// 檢測前后的就緒狀態(tài)是否發(fā)生改變
if containerStatus.Ready == ready {
klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
format.Pod(pod), containerID.String())
return
}
獲取容器的狀態(tài),修改就緒為當前的狀態(tài)
status := *oldStatus.status.DeepCopy()
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Ready = ready
會根據(jù)當前運行時的容器探測的狀態(tài),來修改對應(yīng)PodCondition里面的狀態(tài),最后調(diào)用內(nèi)部的更新邏輯
updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
conditionIndex := -1
// 獲取Pod對應(yīng)的PodCondition狀態(tài)
for i, condition := range status.Conditions {
if condition.Type == conditionType {
conditionIndex = i
break
}
}
// 修改或追加Pod對應(yīng)的PodCondition狀態(tài)
if conditionIndex != -1 {
status.Conditions[conditionIndex] = condition
} else {
klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
status.Conditions = append(status.Conditions, condition)
}
}
// 計算Ready狀態(tài)
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
// 計算容器Ready狀態(tài)
updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
m.updateStatusInternal(pod, status, false)
statusManager會啟動一個后臺的線程來進行更新管道里面同步請求的消費
func (m *manager) Start() {
// 省略非核心代碼
go wait.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
// 獲取最新的狀態(tài)信息,更新apiserver
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, 0)
}
同步條件檢測主要是檢測鏡像Pod的版本是否發(fā)送變化、Pod當前是否被刪除,如果pod沒有被刪除則返回false,即對一個沒有刪除的Pod我們還是需要繼續(xù)更新其狀態(tài)的
if !m.needsUpdate(uid, status) {
klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
return
}
如果沒有獲取到Pod信息,則直接進行退出即可
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
// 如果Pod已經(jīng)被刪除了,就直接退出就行
return
}
if err != nil {
klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
return
}
這里面會通過將最小的狀態(tài)與之前的狀態(tài)來進行merge合并,然后調(diào)用kubeClient進行apiserver端狀態(tài)的修改
oldStatus := pod.Status.DeepCopy()
// 更新服務(wù)端的狀態(tài)
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil {
klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
return
}
// 當前是最新的狀態(tài)
pod = newPod
klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
這里主要是最后階段,即Pod對應(yīng)的資源都已經(jīng)釋放了,則才最終刪除apiserver端的Pod
// 如果pod的DeletionTimestamp被設(shè)置,則對應(yīng)的Pod需要被刪除
if m.canBeDeleted(pod, status.status) {
deleteOptions := metav1.NewDeleteOptions(0)
deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
// 調(diào)用apiserver對Pod進行刪除
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
if err != nil {
klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
return
}
klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
m.deletePodStatus(uid)
}
探活整體的設(shè)計大概就是這樣,希望大佬們多多關(guān)注,一起交流。
k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3
文章題目:#IT明星不是夢#圖解kubernetes容器狀態(tài)同步
URL網(wǎng)址:http://chinadenli.net/article16/giiddg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、網(wǎng)站排名、電子商務(wù)、定制網(wǎng)站、用戶體驗、
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)