Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Group enhances the ability to sync.Group in the Go standard library. It's used to execute the method to control the termination condition through Context or channel and execute the method in a stand-alone Go Routine.
From the comments in the code, we need to solve two key problems, how the sliding works and how the jitterFactor works. Once both of them are solved, everything will be clear.
From the code, it's not difficult to see that the sliding is whether the time interval contains execution time. Looking at the 170 lines of code, it's not difficult to guess that Backoff() returns a Timer type, then the Timer start time is the key. One detail needs to be noted in this code, the select starting at line 167 doesn't guarantee order, in other words, if the timer has been triggered and the stopCh has been closed, it isn't necessary to ensure exit. But after entering the lower wheel cycle, due to the 144th line code, it must ensure that the program is normal to exit.
The jitterFactor works rely on the BackoffManager, let's look at the creation process, the configuration parameters and other associated objects are simply preserved.
Let's look at the implementation of its backoff method again. Pay attention to lines 379-383 to ensure that only one timer is working.
Continue to see the Jitter implementation, add a dynamic value on a fixed duration, and the two problems are solved.
Hello everyone, this time I bring you Kubernetes source code reading.
The source code design drawing needs to be used in conjunction with the source code. In addition to showing the key design, the omission of some implementation details will allow you to ask questions as you read. If you find that there are problems when you read the diagram, you can look at the source code with the problem and see how others design and solve it, which will improve your design ability. Let's Study with questions.
If you find that the picture is blurred, it is caused by the picture compression, right click and save it as the original download image or contact me (WeChat: abser9216) to send the original SVG image.
Author of the picture: Abserari,Oiar
Sandbox: The protocol stack can contain multiple endpoints, which can be implemented by Namespace, Jail, etc.
Endpoint: Connect Sandbox with Network
Network: A collection of Endpoints that can communicate directly, which can be implemented using Bridge, VLAN, etc.
Docker Daemon Manages the available NetWorkController. When launching Daemon, all available NetWorkController under the current operating system will be created.
On Unix Operating system: with daemon_unix.go as an example, create None, Host, Bridge network controller.
The controller is the implementation of NetworkController in libnetwork. In this picture, the controller uses a registry map to distinguish network with different types, then use the Driver to create Network and Endpoint, attach the Endpoint to Sandbox or remove them from Sandbox.
The Container uses SandboxID and SandboxKey to find Sandbox. At the same time, Sandbox use containerID to determine which Container it belongs to.
As indicated by docker-network-sandbox.svg, not all the functions are listed, but only the functions which divide the border of Sandbox. The Namespace implementation will be an example of analytic. netlink provides the functions like route, interface. The Namespace could get netlink as below code. Attention: netlink configure only work in Namespace.
GetFromPath would return NsHandle structure, then NsHandle could be used in below methods to create specific SocketHandle.
According to the BridgeName value of the config file(networkConfiguration), attempt to find an existing bridge named with the specified name. If not, use the default Bridge -- docker0.
Create and set network handler in driver
If bridgeInterface exists the valid bridge device, the device and sysctl methods would be added to the queue; if already exists, just add sysctl methods.
Add a corresponding setting method to the setting queue according to the configuration file parameters.
Add the device start setting method to the setup queue and return to the execution result.
Create a NetLink.bridge structure, using BridGename in the configuration to create LinkAttrs, then create a bridge device using the NetLink method. If need to set MAC, randomly generates the MAC address.
So netlink could create and configure Bridge devices.
System Control
/proc/sys/net/ipv6/conf/BridgeName/accept_ra -> 0:Routing suggestions are not accepted
/proc/sys/net/ipv4/conf/BridgeName/route_localnet -> 1:Redirect external traffic to loopback, need to be used with iptables
INTERNAL
filter
DOCKER-ISOLATION-STAGE-1 -i BridgeInterface ! -d Network -j DROP
DOCKER-ISOLATION-STAGE-1 -o BridgeInterface ! -s Network -j DROP
NON INTERNAL
nat
DOCKER -t nat -i BridgeInterface -j RETURN
filter
FORWARD -i BridgeInterface ! -o BridgeInterface -j ACCEPT
HOST IP != nil
nat
POSTROUTING -t nat -s BridgeSubnet ! -o BridgeInterface -j SNAT --to-source HOSTIP
POSTROUTING -t nat -m addrtype --src-type LOCAL -o BridgeInterface -j SNAT --to-source HOSTIP
HOST IP == nil
nat
POSTROUTING -t nat -s BridgeSubnet ! -o BridgeInterface -j MASQUERADE
POSTROUTING -t nat -m addrtype --src-type LOCAL -o BridgeInterface -j MASQUERADE
Inter Container Communication Enabled
filter
FORWARD -i BridgeInterface -o __BridgeInterface -j ACCEPT
Inter Container Communication Disabled
filter
FORWARD -i BridgeInterface -o __BridgeInterface -j DROP
nat
PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
OUTPUT -m addrtype --dst-type LOCAL -j DOCKER
filter
FORWARD -o BridgeInterface -j DOCKER
FORWARD -o BridgeInterface -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
filter
-I FORWARD -j DOCKER-ISOLATION-STAGE-1
There is a default Bridge device docker0 globally, and each Container has its own independent network protocol stack. Container network and Bridge device communication via Veth pairs Different Container on the same node, 3-layer communication can be performed through the ARP protocol; when the Container traffic out of the Node network, should be redirected by the default gateway device Docker0, and then redirect to Eth0.
The timeBudget is defined as follows.
The creation code is as follows.
After the timeBudget starts running, a work collaboration is created. In this coroutine, the budget increase operation will be triggered once a second. If the budget is greater than the upper limit, the upper limit is taken.
The takeAvailable acquires all budgets at a time and resets the budget. The returnUnused returns the remaining budget.
Kubernetes 源码研习社是由 组织的 Kubernetes 源码特别兴趣小组(SIG),由热爱学习、注重个人成长的一帮小伙伴们自由、自愿成立的小组。每个人都非常希望从 Kubernetes 上学到知识,帮助自己实现成长和进步。欢迎加入,一起坚持,一起克服,一起成长。
本期主题:kube-scheduler 源码剖析
活动时间:2020.10.12 开始
如何报名:
Kubernetes 源码 scheduler 剖析,干就完事了。每周学习目标:
每周写笔记做总结。笔记链接:
每周六晚 7-10 点固定在线研讨 Kubernetes 调度器问题。腾讯会议号:4967324951
每日讨论 Kubernetes 源码问题
参阅本项目推荐的 kubernetes 相关文章
本期学习计划
坚持就是胜利
合适的文章有非常多,如果有合适的欢迎提交 PR(Pull Request)合入推荐阅读文章。
对 Kubernetes 核心源码有更深刻的理解
一群热爱云原生的志同道合的朋友
进入报名 excel 表,填写自己信息即被认为是报名参加活动,每周按要求完成总结笔记,参与每周周末的讨论即可
源码研习社也有自己的微信群,如何加入?
扫描下面的二维码,添加 Jimmy Song 好友,备注姓名-公司,留言“加入源码研习社”即可。
郑东旭(Derek Zheng) BFE(万亿流量转发引擎)开源项目的作者之一,《Kubernetes 源码剖析》作者,擅长 Linux 下高性能服务器的开发,对云计算、区块链相关技术领域有深刻的理解。
SIG 的全称是 Special Interests Group, 或称 Super Intellectual Genius。 源码研习社 SIG 小组负责源码研习社活动的日常维护,目前的核心成员包括:
金润森
王文虎
赵卫国
王冬
Know the information from the name of CacheableObject
. It could store the Object instance. In the 1.18 version of Kubernetes, cachingObject
is the only implementation of this interface - CacheableObject
. Its relation as this picture.
Combined with CacheableObject
definitions, it will be found that when the CacheableObject
is stored in Object, Identifier is specified, whether does it mean to save(cache) multiple Object?
The GetObject()
method is indeed returned to an Object instance, although the container class structure of Slice, Map cannot be eliminated to implement the possibility of Object interface, it is also a problem that needs to be deeply understood and resolved.
Let us continue and start by trying to solve these two questions and see what can have any gains.
Each cachingObject
actually stores an Object, which is the metaRuntimeInterface
instance. According to different Identifiers, this object is encoded into different formats and is cached in the map.
metaRuntimeInterface
simultaneously implemented runtime.Object
interface and metav1.Object
interface.
metav1.Object
interface as this defined is using to describe Kuberentes core Object.
From the new method, we can see that a cachingObject
stores an instance, and when it stores an instance, it does not store the original object, but a deep copy
GetObject method Gets a deep copy of the metaRuntimeInterface instance.
CachingObject itself also implement the runtime.Object
interface, implemented as follows. It is important to note that in the DeepCopyObject()
method, a new SerializationCache is created, and the old content is not copied.
The focus is to replace the atomic.Value
, the operation as shown below.
The Object instances are as follows. They are basically in the pkg/apis directory, you can find it yourself.
The scene of Unstructured and Object cooperation is as follows.
The example diagram is as follows.
The DefaultNameFunc implementation is as follows.
The ConversionFunc declaration is as follows.
FieldMappingFunc converts the key to Field in the source structure and the target structure.
Briefly explain the following methods:
Then process them separately according to dv.Kind().
dv.Kind() -> reflect.Struct
dv.Kind() -> reflect.Slice
dv.Kind() -> reflect.Ptr
dv.Kind() -> reflect.Interface
dv.Kind() -> reflect.Map
非常感谢社区整理的以下的学习资料
报名链接:
关于本次活动有任何问题,请在 中提问,也可以参与 反馈。
Look at the definition of , the first four are maintained the relationship between reflect.Type and Schema.GroupVersionKind. The defaulterFuncs is used to build a default object.
is used to convert label and value to internal label and value.
only needs to pay attention to one problem, that is, the GroupVersionKind is generated from the incoming GroupVersion through the Name method of reflect.Type as the Kind. Please see the simplified sample . The sample code can be executed under .
The principle of is as follows. Unversioned Type can be understood as an Object mounted on a Group, and the Version will never be updated.
The principle of is as follows, just pay attention to the return type priority to Internal Type.
The is used to represent the combination of source type and target type, stores the type and type name. is used as the conversion method from the default type to Name. defines the object conversion method.
calls ConversionFuncs.Add method directly.
calls ConversionFuncs.AddUntyped method.
will not do the type of conversion record in the mapping.
register input type Field conversion method.
When Converter executes object conversion methods, such as and , it is allowed to pass in a Meta object and execute the method to construct the scope object in this method.
The handles the default type conversion, the incoming sv, dv have been ensured to be addressable through . This part of the code is a nearly perfect application of the reflect package in Go.
First, deal with the conversion of basic types, which can be converted by or .
Return the result of the method directly. However, you need to pay attention to first convert sv and dv into the form of Key/Value respectively. Please study the method by yourself.
This paper studies the source code of the API Group section. You should read the source code at the same time. It can enhance your design capacity.
VersionedResourcesStorageMap saves the mapping of version->resources->rest.Storage, the first-level mapping is the version, the second-level is the resource, and the storage is used to solve the creation, modification, and deletion of resource objects.
Convert the rest.Storage interface to various operation interfaces, the code is shown below. It can be seen from this that the rest.Storage interface is the key, and we will discuss it in depth later.
Take creater as an example. Finally, register creater or namedCreater on the Post method.
In the registration code, we can see that when registering the API, available Resources and restful.WebService is returned. Afterward, immediately register the Resources available to the WebService on the root request of the WebService, and the action is GET.
本文研究了 Route 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
This paper studies the source code of the Route section. You should read the source code at the same time. It can enhance your design capacity.
The figure below shows the APIServerHandler
core assembly. It is mainly divided into Restful and NonRestful two parts.
- Restful is prioritized, and if the processing is successful, exit. does not execute the NonRestful section;
- if the RESTful section does not have a target function, the NonRestful section is executed. FullHandlerChain
is used for HTTP processing entry points, linking the middleware features, and boots the request to the Director
for processing.
The following is the APIServer
default HandlerChain
build process.
This paper studies the source code of the Storage section. You should read the source code at the same time. It can enhance your design capacity.
The role of StorageFactory is to encapsulate and simplify operations on resources. The main function of StorageFactory is to obtain the storage configuration Config corresponding to the resource according to the incoming GroupResource.
In the API Server, StorageFactory is generated by StorageFactoryConfig, and StorageFactoryConfig is generated by EtcdOption. After all, no matter what changes, etcd storage is the final destination.
DefaultStorageFactory is the only implementation of K8S internal StorageFactory before version 1.18. Let's analyze the mode of DefaultStorageFactory in detail.
The DefaultStorageFactory organizes the associated GroupResources together. As you can see from the above figure, each incoming GroupResource is processed in turn. Therefore, there are also priority issues among the associated GroupResources. The following figure shows the configuration of associated resources in the StorageFactory used when kube-apiserver is created.
Etcd configuration and StorageFactory are finally imported into RESTOptionsGetter. RESTOptionsGetter is used as the core configuration item to find the final storage through GroupResource.
The process of creating storage. The interface is shown in the figure below.
Taking StorageFactoryRestOptionFactory as an example, the steps of the GetRESTOptions method are as follows.
Use StorageFactory to generate Storage Config.
Create a RESTOptions structure and save the generated Storage Config.
Use generic.UndecoratedStorage method as a decorator by default.
If the EnableWatchCache option is turned on, the Decorator will be modified.
UndecoratedStorage only uses the passed storagebackend.Config parameter
Call factory.Create directly to create the back-end storage.
This article has studied the source code of the ETCD part, equipped with the source code for further understanding, which can deepen the understanding and enhance related design capabilities.
Hello everyone, this time I bring you ETCD source code reading. The three parts of this article are the Server part, the Storage part, and the Utility part. With the source code for further understanding, you can deepen your understanding and enhance related design capabilities.
Clients contain the address to be monitored by the etcd server. The address can be in the form of TCP or Unix Socket and supports http and https. The serverCtx matches a net.Listener and runs independently of a goroutine.
run
Start Timer regularly submits or submit and exit when receiving the stop signal. The code is simple, as shown below.
Transaction Relationship
Buffer
References
Landscape
Watcher Creation
Nofity Waiter
soheilhy/cmux: Connection multiplexer for GoLang: serve different services on the same port!
This paper studies the source code of the Storage section. You should read the source code at the same time. It can enhance your design capacity.
Cacher contains an instance of storage.Interface, which is a real storage backend instance. At the same time, Cacher also implements storage.Interface interface, which is a typical decorator pattern. There are a large number of elegant design patterns in the Kubernetes source code, so you can pay more attention when reading. After simply tracking the code, the current guessed relationship is as follows.
The registry package location is as follows.
The storage package location is as follows.
Store initialization code set DryRunnableStorage location.
The Store interface is defined in k8s.io/client-go. Pay attention to the Add/Update/Delete in the interface, which is used to add objects to the Store. Then the role of this interface is the glue between API Server and Etcd.
The Cacher structure is defined as follows, which contains a watchCache instance.
Look at the Cacher initialization method again. Line 373 is used to create a watchCache instance. The EventHandler passed in is a method of Cacher. In this way, watchCache has a channel for injecting events into Cacher.
The dispatchEvents method in the above code seems to be the part that processes the Event sent from the watchCache method. Let's continue, it seems that we are about to solve the event source problem.
Keep track of incoming, so does processEvent seem familiar?
Go to the watchCache structure and find the place where eventHandler is used.
Continue to dig, so far, we have found the complete source of the event, and there are only three types of events: Add/Update/Delete.
The generation of the original event to the final event is shown in the figure below. The keyFunc, getAttrsFunc, Indexer, etc. used are all passed in through configuration.
After the event created, refresh the cache.
The related structure of cacheWatcher in Cacher is shown in the figure below.
The cacheWatcher implements the watch.Interface interface for monitoring events. The watch.Interface declaration is as follows.
The definition of watch.Event is as follows.
The core processing flow of cacheWatcher is as follows.
The judgment processes of triggerValue and triggerSupported are as follows.
CacheWatcher's Input Channel cache size calculation is as follows.
The specific addition code is as follows
The forgetWatcher is as follows. clean watcher from Cacher.
In the Cacher event distribution process, a Timer is created. Each time this Timer is triggered, it is possible to generate a Bookmark Event event and distribute this event. The source code is as follows.
After the Bookmark Event is created, the ResourceVersion information of the event object is updated through Versioner, and then the event is distributed. Next, let's take a look at how to distribute.
The Bookmark Event distribution process is shown in the following figure. You can see that the event has been distributed to all cacheWatchers whose IDs are less than the current time.
After arriving at CacheWatcher, the processing is very simple, just returns the original object.
As you can see from the figure above, when the length of watchersBuffer is greater than or equal to 3, the object is cached for sending. When sending an event, if there is a failure, get an available time slice, within this time slice, try to block sending the event. If all the transmissions are successful, the waiting time slice is exhausted.
If sending fails within the time slice, delete the remaining cacheWatcher.
The ch of all watchers created through watchStream all point to watchStream.ch. The event direction is watcher -> watchStream. Watcher management is in charge of watcherGroup.
本文研究了 Aggregator Server 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Aggregator Server 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
通过 Informer 监控 APIService 资源变更,通过 ResourceEventHandler 放入 Controller 队列。Controller 内部处理逻辑与其他 Controller 一致,最终将 APIService 资源变更情况,反映至 Aggregator Server 的 HTTP 处理部分。
监听的是 APIService 资源变更
无论是 Add/Update/Delete,重建 cache 方法一致,使用的是从 API Server 获取的服务列表
AvailableConditionController 的运行协程从 queue 中取出内容,并检查该服务状态后,将服务当前上报至 API Server。
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
从接口间关系可以看出,SharedInformer 是核心组件,它通过 Controller 执行操作,并将结果存入 Store 中。SharedIndexInformer 为 SharedInformer 添加了 Index 功能。
[1] cache 根据 Object 生成 Key 的方式如下
[2] items 根据 Key 获取老对象,并设置新对象
[3] updateIndices 代码如下
[4] sharedIndexInformer 在创建 processorListener 时,如果处于工作状态,会调用 indexer 的 List 方法将全部缓存的 object 取出,并发送给新添加的 processorListener。
最终获取全部事件对象位置
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,是 Client 篇的第一部分,下面是全系列的链接。
This article has studied the source code of the Generic API Server part, equipped with the source code for further understanding, which can deepen the understanding and enhance related design capabilities.
The type of HandlerChainBuilderFn is defined as follows. Pass in an http.Handler instance and return an http.Handler instance. In this way, a middleware-like effect can be achieved.
When creating the ApiServerHandler, use the following method.
The final startup code of preparedAPIAggregator is as follows. It simply calls the Run method of runnable. In Server Chain, we know that runnable is an instance of preparedGenericAPIServer generated by GenericAPIServer included in APIAggregator.
The Run method of PreparedGenericapiServer is as follows.
This article has studied the source code of the Master Server part, equipped with the source code for further understanding, which can deepen the understanding and enhance related design capabilities.
First, determine whether the resource configuration of the v1 version is enabled. If enabled, the corresponding resource processing API will be installed. Note that the two core components, StorageFactory and RESTOptionsGetter, have been explained in more detail before.
Create a LegacyRESTStorageProvider object, save the StorageFactory and other necessary information, and then pass in the method InstallLegacyAPI, along with RESTOptionsGetter.
InstallLegacyAPI uses the passed parameters to create APIGroupInfo and install it.
Create APIGroupInfo.
Create various types of RESTStorage, not all of them are listed in the figure below.
Build resources for Storage mapping.
An associate resource to Storage mapping on version v1.
Each resource type has its own REST package. Generally speaking, REST only needs to simply encapsulate a Store. When creating, it will register NewFunc, NewListFunc, and behavior strategies that match the resource type.
Note that REST is not necessarily only one Store, such as Posstorage.
RESTStorageProvider cooperates with Resource Config and REST Options to create APIGroupInfo, which is used to register resource processing methods with API Server.
RESTOptionsGetter registers the Store with the Storage Map according to the version and resource check method in APIResourceConfigSource, and finally mounts the Storage Map to APIGroupInfo. Take Auto Scaling as an example, the code is as follows.
The code to create the v1 version of Storage is as follows, and the other parts are similar.
It is not difficult to see that RESTStorageProvider is the core component that undertakes configuration to API Group. Such a design can clearly divide the boundaries of each structure and interface, and set a reasonable process.
The Listener has only one Enqueue method and is registered somewhere through Notifier. ControllerRunner controls the execution of a task. If it is necessary to notify the outside during the execution process, it will broadcast (or unicast) to the target task queue through the registered Listener list. The queue owner may be a task waiting for the queue to output.
Through this design, use the queue feature to isolate the two related tasks and divide their boundaries. The Enqueue method of the Listener interface has no parameters. Therefore, the implementation of the Listener focuses more on the occurrence of the event rather than the specific details of the event content. This idea is worth learning.
PKI certificate and requirements
This article has studied the source code of the CRD part, equipped with the source code for further understanding, which can deepen the understanding and enhance related design capabilities.
Enabled resource configuration and disabled version.
Enabled the selection as follows.
The three are shown below.
The Store is expanded as shown below.
SharedInformerFactory is used to create SharedIndexInformer, which will periodically use Clientset to connect to the API Extension Services of v1beta1 or v1 and notify the respective ResourceEventHandler after obtaining the status change. Here, there are still some issues that need to dig deeper:
How SharedInformerFactory distinguishes different types of resource state changes
Can ResourceEventHandler pay attention to changes in the state of different types of resources at the same time
How are resource status changes obtained
The Clientset function is relatively simple. It encapsulates the available API Extension Services. Each RESTClient is connected to the "Loopback" address and sends requests to different services.
After the EstablishingController is started, it will start a scheduled execution task. This task checks every second whether there is a new Key value in the queue. If there is, update the corresponding resource status on the Server side to Established.
The sync code is as follows.
The CRD Handler registers event processing with SharedIndexInformer. When the Watch object type is Update, it may be that the state changes to the Established state and needs to be sent to the EstablingController.
When the CRD Handler processes the request, it first checks whether the cache contains the requested object, if so, returns the cached object; if not, it requests the Server and changes the cache status.
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
从接口间关系可以看出,SharedInformer 是核心组件,它通过 Controller 执行操作,并将结果存入 Store 中。SharedIndexInformer 为 SharedInformer 添加了 Index 功能。
[1] cache 根据 Object 生成 Key 的方式如下
[2] items 根据 Key 获取老对象,并设置新对象
[3] updateIndices 代码如下
[4] sharedIndexInformer 在创建 processorListener 时,如果处于工作状态,会调用 indexer 的 List 方法将全部缓存的 object 取出,并发送给新添加的 processorListener。
最终获取全部事件对象位置
本文研究了 Kubernetes 中 Client Shared Informer 部分的源码,是 Client 篇的第一部分,下面是全系列的链接。
创建 Broadcaster
使用 Broadcaster 创建 Recorder
各组件根据自己需要,使用 Recorder 向 Broadcaster 发送 Event
Service Informer 是一定存在的 Informer;EndpointSlice 及 Endpoint 二选一,1.18 默认使用 Ednpoit Informer;Node Informer 需要开启 ServiceTopology 选项。最终,都由 Provider 来处理各类事件。 具体来说,每一类型资源都会创建独自的 ResourceEventHandler。各自的 Handler 都是一个 slice 类型,slice 内存储真正的资源 Handler,如下图所示
本文研究了 Queue 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
源码设计图需要配合源码使用,除展示关键设计外,还会有问题性的留白。阅读图的时候发现存在问题的,带着问题去看源码,看别人如何设计解决的,会提高设计能力。
通过对上图的分析,可以知道 Type 实例中三个容器的作用如下:
queue: 保存任意类型实例,其中保存的对象不存在有相同内容的对象在 dirty 或 processing 中
processing: 对象正在处理中,处理完成后移除
dirty: 与要添加的对象内容相同的对象正在处理中
添加对象时,如果延时条件已达成,直接进入 Queue
延时条件未达成,进入 waitForPriorityQueue,其满足 Heap 接口,没进入一个对象,都会以延迟到达的绝对时刻进行重排
如果 waitForPriorityQueue 的“最小元素”不满足延时条件,其他不可能满足延时条件
MaxOfRateLimiter 是 RateLimiter 的 Controller,包含了一个 RateLimiter 列表,可填入随意数量的 RateLimiter 具体实现
MaxOfRateLimiter 本身也是 RateLimiter,其实现方法是使用全部存储的 RateLimiter 分别调用同名方法
MaxOfRateLimiter 中保存的独立的 RateLimiter 可提供不完整的 RateLimiter 能力,由其他 RateLimiter 补足
本文研究了 Controllers 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
大家好,这一次给大家带来的是 Controllers 的源码阅读。
Controller 启动过程是类似的,首先创建到 API Server 的客户端连接 clientset.Interface,它包含了访问 API Server 不同类型资源的客户端。
然后,启动 SharedInformer 接口实例,伴随其启动的,还有一个 Controller 实例。Controller 定期从 API Server 获取资源变更,并存入 Store 实例中。Controller 的 processLoop 协程,从 Store 中顺序读取资源变更事件,并交由 sharedIndexInformer 实例处理,最终到达 ResourceEventHandler。
Controller 实现的核心,在于其对监听资源的变更处理方法上。
简化后的工作流如下图
Controller Manager 负责启动 Controllers。通过注册不同类型 Controller 的初始化方法,并创建 ControllerContext,隔离了 Controller 具体实现。
Controller Manager ---Create--> ControllerContext ---Pass--> Initialization Function
1.18 版本,注册的 Controller
/proc/sys/net/ipv4/conf/all/route_localnet ---> 1
/proc/sys/net/bridge/bridge-nf-call-iptables ---> 1
/proc/sys/net/ipv4/vs/conntrack ---> 1
/proc/sys/net/ipv4/vs/conn_reuse_mode ---> 0
/proc/sys/net/ipv4/vs/expire_nodest_conn ---> 1
/proc/sys/net/ipv4/vs/expire_quiescent_template ---> 1
/proc/sys/net/ipv4/ip_forward ---> 1
StrictARP
/proc/sys/net/ipv4/conf/all/arp_ignore ---> 1
/proc/sys/net/ipv4/conf/all/arp_announce ---> 2
使用 Service 的 ClusterIP、Port 等信息,创建 VirtualServer,通过 netlink 来查询、创建、更新、删除 VirtualServer。
VirtualServer 通过 netlink 接口创建 libipvs.Service 与处理 ClusterIP 类型服务时相同。需要注意,如果开启特性 ExternalPolicyForExternalIP,并且当前处理的 Service 的 Endpoint 只存在与当前 Node,那么使用 KUBE-EXTERNAL-IP-LOCAL 存储 Entry 值。
在同步 Endpoints 前,需要获取当前服务的全部 RealServer,可通过 netlink 根据 VirtualServer 获取其对应的 Destination 列表,再根据 Destination 转化为 RealServer
处理过程比较简单,先处理 newEndpoints 中新增或更新的 Endpoint。处理完毕后,将处理删除的 Endpoints,被移除的 IP 及 Port 可通过 curEndpoints.Difference(newEndpoints) 获取。遍历删除列表,如果 IP、Port 已存在于 termination list,则不需要任何处理;如果不存在,将该 IP、Port 存入 termination list,同时存入的还有其对应的 netlink 创建的 Server。
找到本地地址,并根据当前 Service 使用的 NodePort 情况,创建监听的端口。遍历时,如果遇到零地址段,则退出循环,因为这意味着本机全部 IP 都要监听该 NodePort。 创建 Entry 结构时,与当前 Service 的协议相关,如果为 SCTP 协议,类型为:HashIPPort。如果当前 Service 不是 Node Only,则只需要添加至 KUBE-NODE-PORT-protocol 中即可。
在之前的处理中,在 ipsetList 中各种类型的 IPSet 上,添加了各自的 IP、端口信息,在这个方法中将其应用。utilipset.Interface 实现是基于 ipset 命令集的,ListEntries 方法中传入的 Name 是内部的 utilipset.IPSet 中 Name 域,注意区分。
在之前的处理中 RealServer 已创建完毕,但是,每个 Node 只创建了一个 dummy 类型的网络设备 kube-ipvs0,那么,需要通过 iptables 及 ipset 配置,将流量合理引导。 首先创建的是如下的 NAT 规则,因为使用 -A 选择,下图中顺序即规则顺序,现在需要添加规则,从系统内置链中跳转至不同 Chain 中处理。
跳转规则如下所示
图片来源:维基百科 iptables
Chain 如上图中虚线方框所示,代表了包处理过程中,实际的规则执行路径。Table 是可简单理解为规则的分类:
Raw:Connection Tracking 相关操作
Mangle:修改 IP 包
Nat:地址转换
Filter:过滤
创建 Proxier 结构时,启动了一个协程,用于确保上图的 Table、Chain 都是存在的,然后执行同步规则方法。如果检查存在性时失败,则删除全部的 Table 及 Chain。创建 Table、Chain 成功后,才会执行 syncProxyRules 方法。 Proxier 的 SyncLoop 启动时,注册了 syncProxyRules 的 BoundedFrequencyRunner 也同时启动,可以通过 Sync 方法,触发 syncProxyRules 方法执行,需要注意 BoundedFrequencyRunner 通过令牌桶算法,限制其运行方法的执行频率。
Service 到 ServiceMap 过程如上图所示,LoadBalancer 部分没有在图中标注。如果需要定制化操作,可以通过自定义 makeServiceInfo 方法来实现。ServiceMap 是 iptables 模式下的核心结构之一。
Service Informer 触发后,由 proxy.Provider 来处理。在 iptables 模式下,处理最终落在 ServiceChangeTracker 的 OnServiceAdd/Update/Delete 方法上。三个方法使用简单的技巧,统一为 Update:
Create 时调用 OnServiceUpdate(nil, service)
Delete 时调用 OnServiceDelete(service, nil)
在 ServiceChangeTracker 的 Update 方法中,将当前 Service 对象与前次的 Service 对象分别对应的 ServiceMap 做比较,决定其在 ServiceMap 中的去留,规则如下
同一 Service 对应的 NamespacedName 对象相同
如果 previous 与 current 保存的 ServiceMap 内容相同,则删除,否则更新
Create 时,current 为 Service 对应的 ServiceMap,previous 为 nil
Update 时,更新 current,不改变 previous
Delete 时,更新 current,不改变 previous
Endpoints 处理逻辑与 Service 基本一致,不同指出在于 Endpoints 中包含的 Ports 与 Addresses 可自由组合。同样的,可以通过自定义 makeEndpointInfo 获取 Endpoint 接口对象,这里的 Endpoint 接口,是 Proxy 中使用的,不是 Kubernetes 的资源对象。
处理 Endpoints 变更方法与 Service 并无本质区别,只是将 previous 与 current 指向的对象更根为 EndpointsMap。
Node 资源变更处理相对简单,只要变更 Proxier 的 nodeLabels 即可,以 OnNodeAdd 为例
OnNodeUpdate 使用新 Node 对象的 Labels;OnNodeDelete 则将 nodeLabels 设置为 nil。
原则上,Service、Endpoints(EndpointSlice) 任何一个的变更都会触发 syncProxyRules 的执行,但是,不要忘记 BoundedFrequencyRunner 限制调用频率的存在,因此,在执行规则同步时,有可能存在 Service、Endpoints(EnpointSlice) 同时变更的可能性。 Service、Endpoints、Node 的变更,全部 kube-proxy 都会收到,后续的处理如果没有特殊说明,每个 Node 都会处理。
将检测到的 Service 变更情况更新至 ServiceMap 中,已删除的 Service 如果包含 UDP 协议的端口,保留下来。 最终得到了所有 Created、Updated 的 ServiceMap、已删除的 UDP 端口及到目前为止仍然存活的 Service 的健康检查端口。
将 Endpoints 变更应用到 EndpointsMap 上,上图是没有开启 EndpointSlice 特性时的情况。不同于 ServiceMap 处理之处在于保留的是本地健康的 Endpoint IP 数量。
然后,将 staleServiceNames 合并至由 Service 变更引起的 staleServicesClusterIP 中,这样,全部变更的 Cluster IP 获取完毕。
创建了如下的自定义链并将默认链处理对应至自定义链
Nat
KUBE-SERVICES
KUBE-POSTROUTING
Filter
KUBE-SERVICES
KUBE-EXTERNAL-SERVICES
KUBE-FORWARD
在根据 ServiceMap 处理规则前,先使用 iptables-save 格式确保以下的 Chain 存在
Filter
KUBE-SERVICES
KUBE-EXTERNAL-SERVICES
KUBE-FORWARD
Nat
KUBE-SERVICES
KUBE-NODEPORTS
KUBE-POSTROUTING
KUBE-MARK-MASQ
添加 nat 规则
无 Endpoints 的服务,Node 在接收到请求后,直接拒绝,因此规则添加在 Filter 表;Cluster IP 要根据配置进行 NAT 转化。
是否只有本节点 Endpoints 判断代码如下所示
处理至此,与服务相关的基本规则均已建立,如果本次循环处理的 Service 没有 Endpoints,那么继续处理下一个 Service;如果有,则继续向下建立 Endpoints 规则。
处理到此处,Service 对应的 Endpoints 如果不是 NodeOnly,则处理下一个 Service。后续处理,仅对 NodeOnly 的服务起作用。
然后,对 Node 上每一个网络接口地址设置如下规则
最后,设置 KUBE-FORWARD 上规则如下
本文研究了 Endpoint Controller 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
大家好,这一次给大家带来的是 Endpoint Controller 部分的源码阅读。
EndpointController 在收到 v1.Service 资源变更时,将服务 Spec 中指定的 Selector 对象,保存在一个 map 中。该 map 的 key 通过 DeletionHandlingMetaNamespaceKeyFunc 方法生成,如下所示:
使用的 MetaNamespaceKeyFunc 如下所示:
最后,将生成的 key 放入 EndpointController 的工作队列,待后续处理。附上 Kubernetes 官方的 Service 对象配置示例。
上图为 syncService 完整的流程。第一步,先根据通过 Service 获取的 Key 值,还原 Namespace 与 Name,这两个值是后续向 API Server 查询的核心依据。
服务删除,那么通知 API Server 删除该服务相关的 Endpoint
如果是创建、更新服务
从 API Server 根据 (Namespace, Name) 拉取与现有与 Service.Spec.Selector 相关的 Pod 列表
根据 Pod 列表获取子网地址列表
获取当前与 Service 关联的 Endpoints
DeepCopy 返回的 Endpoints 对象,并更新 Copy 的 Endpoints 的子网地址列表
使用新 Endpoints,通知 API Server 创建或更新 Endpoints 资源
创建/更新过程发生错误时,记录事件
从 API Server 获取的全部 Pod 都会做如下处理来生成 EndpointAddress,以下两种情况下,Pod 被跳过,继续处理下个 Pod:
Pod.PodStatus.IP 长度为 0
没有设置 tolerateUnreadyEndpoints 且 Pod 将被删除
如果设置了 IPv6DualStack,则根据 Service.Spec.ClusterIP 的类型(IPv4 或 IPv6),从 Pod.PodStatus.PodIPs 中存在同类型的地址,找到即刻返回。如果同类型地址不存在,则报错。
获取到 IP 地址的 EndpointAddress 结构,会根据下图条件,设置 EndpointAddress 结构的 Hostname 域。
生成 EndpointAddress 后,根据 Service.ServiceSpec.Ports 配置,生成 EndpointSubset 列表,并存入全局的列表中。
如果设置了 tolerateUnreadyEndpoints 或当前遍历的 Pod 处于 Ready 状态,Ready 计数 +1
如果不满足上述情况,且当前遍历的 Pod 应该归属于 Endpoint,那么 Unready 计数 +1
从 API Server 获取 Pod.Namespace 下所有 Service。遍历 Service,如果缓存中没有该 Service 存在,更新缓存。从缓存中获取 Service 使用的 Selector,并与 Pod 的 Labels 比对,如果一致,说明该服务受到 Pod 影响,添加进队列等待处理。
删除的 Pod 无法从 API Server 上获取,那么从传入的 obj 中获取即可。找到被删除节点后,处理方式就和添加 Pod 一样了。
获取 Pod 对象的方法如下
本文研究了 Namespace Controller 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
NamespaceController 结构体中包含 RateLimitingInterface 示例,当从 Informer 监听到时间变更时,触发的 Event Handler 将事件对象转换为 key 值,并存入 RateLimitingInterface 实例中。
NamespaceController 在启动运行时,会根据要求,启动多个协程,这些协程都执行相同的功能:从 RateLimitingInterface 实例中获取 key 值,并做处理。
使用 Discover Client 获取服务端支持的全部 Resource 列表,并根据 Resource 是否需要归属于Namespace 来进行过滤,不需要关联于 Namespace 的资源被过滤。
将获取到的 Resources 进行遍历,根据 GroupVersion 进行分类,并获取该资源支持的操作列表(Verbs),将不支持 list、deletecollection 操作的 Resource 记录下来。
本文研究了 Kubernetes 中 Plugins 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Kubernetes 中 Plugins 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
Plugins 包含了一组 PluginSet,每个 PluginSet 又包含了两组 Plugin,一组为激活状态,一组为禁用状态。Plugin 包含一个唯一标识符和该 Plugin 的权重。
接口方法 Less 如上图所示,传入两个 PodInfo 实例,通过 Less 方法判定二者在排序时先后位置。由于 Plugin 接口只有通用方法 Name,因此,每个特定功能的 Plugin 原则上可随意定制自己需要的方法。
将与当前调度 Pod 的相关 Node 数据存储在 CycleState 的 PreFilterInterPodAffinity 关键字中,供后续调度使用。
Registry 用于组织 provider 与 Plugins 间关联关系,保存的是默认的 Plugins。在创建 Profile 结构时,会使用这些默认的 Plugins。
相关代码如下图所示
本文研究了 Kubernetes 中 Plugins 部分的源码,是 Scheduler 的第三部分。
本文研究了 Kubernetes 中 Priority Queue 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Kubernetes 中 Priority Queue 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
上图为 PriorityQueue 中 activeQ 域,它是一个 Heap 实例。Heap 中核心结构为 data,包含一个字符串到 heapItem 的映射,heapItem 存储了实际对象以及该对象的 Key 在 queue 中位置的变量。
PriorityQueue 中与 Pod 关联的两个数据结构如上图所示。UnschedulablePodsMap 中保存了从 Pod 信息到 key 值的方法。
优先使用传入的 nodeName,若 nodeName 为空时,使用 UID,若 UID 也为空,处理完毕。否则,按上图示意,添加对应的 map。
kube-scheduler 接收到添加 Pod 事件后,会将 Pod 添加进 SchedulerCache 中,随后,执行上图操作。Pod 的添加、更新操作执行相同代码,区别为状态不同,分别对应为 AssignedPodAdd 与 AssignedPodUpdate。
SchedulerCache 注
将 podInfoMap 中全部 PodInfo 移动至 podBackoffQ 或 activeQ 中,并删除 podInfoMap 中对应 K/V 对,标记状态为 AssignedPodDelete。
定时从 backoff queue 中获取一个 PodInfo 对象,检查其 backoff time 是否到期,如果没有到期,直接返回,等待下次触发,此时,PodInfo 对象仍然存在于 backoff queue 中。如果到期,则弹出该对象,并存入 active queue 中,此时,PodInfo 对象被移除出 backoff queue。
本文研究了 Kubernetes 中 Priority Queue 部分的源码,是 Scheduler 的第二部分。
本文研究了 Kubernetes 中 Scheduler Cache 部分的源码,通过画图表现其设计思想,希望读者能自行配备源码进行进一步理解,学会自己进行相关设计。
本文研究了 Kubernetes 中 Scheduler Cache 部分的源码,通过画图表现其设计思想,希望读者能自行配备源码进行进一步理解,学会自己进行相关设计。
Nodes 中保存了 Node.Name 到 nodeInfoListItem 链表的映射。每个 nodeInfoListItem 对应一个 NodeInfo 实例,实例中保存了 Node 信息及相关的 Pod 信息。
AddNode 方法执行时,需要传入一个 Node 实例。首先,根据 Node.Name 是否存在于 nodes 中来判断执行路径。 如果 Node.Name 不存在,那么创建一个新的 nodeInfoListItem 并存入 nodes 中。如果已经存在,那么获取对应的链表第一个对象,使用该对象包含的 Node 节点进行镜像清理,需要注意,这里并没有删除镜像,只是在 imageStates 中移除镜像名。
然后,将最近修改的 Node 对应的链表项移动至 headNode 表头,如下图所示,这样也解释了为什么一个 Node 对应的 Key 会关联一个链表。事实上,一个 Key 只有一个链表项,通过 headNode 关联起来的是最近使用顺序。
接着,将 Node 添加至 nodeTree 中,过程如下图
完成后,将 Node 中关联的镜像添加至 imageStates 中,关于 imageState 的清理操作,前面已详细说明,添加操作不再深入。
随后,再执行 addPod,这里不再描述,前面的图中已详细绘制。
本文研究了 Kubernetes 中 Scheduler Cache 部分的源码,进度在 1/5,接下来将整理 Kubernetes 1.18 版本下的全部源码设计图。 预计会有五个大模块,分别是 API Server,Client,Proxy,Controllers 和 Scheduler,和一些辅助工具如 Docker,Go Basic 和 Network 方面统共 123 张源码设计图。敬请期待吧。
本文研究了 Node Controllers 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
大家好,这一次给大家带来的是 Node Controllers 部分的源码阅读。
IPAM Controller 在 K8S 1.18 中有四种类型的 CIDRAllocator,分别为:RangeAllocatorType、CloudAllocatorType、IPAMFromClusterAllocatorType 及 IPAMFromClusterAllocatorType。
Controller 在 RangeAllocatorType 或 CloudAllocatorType 模式下,cidrAllocator 有具体指向的结构体负责处理 IPAM 逻辑。
Controller 在 IPAMFromClusterAllocatorType 或 IPAMFromClusterAllocatorType 模式下时,启动 ipam.Controller 来处理,但 K8S 1.18 代码中,仅支持 GCE。
再来看一下 Controller 的 Run 方法,如果类型为 RangeAllocatorType、CloudAllocatorType 时,启动 cidrAllocator 的 Run 方法。
Range Allocator 启动时,Node 有可能已启动完毕,因此,需要获取全部 Node,并标注使用的 IP 地址。根据 Node.Spec.PodCIDRs 获取 IPNet 后,使用如下方法获取索引值。
获取到起始位置后,设置 CidrSet 中对应位,防止重复使用。
nodesInProcessing 中存储正在处理的 Node 名称,新 Node 到达时,如果在其中已经存在该名称,说明该节点正在处理中,直接退出处理程序。
如果 Node 的Spec 中 PodCIDRs 不为空,直接更新 rangeAllocator 中 IP 使用位图即可;如果为空,则创建 nodeReservedCIDRs 结构体,并使用 rangeAllocator 分配 IP,只要有一个 IP 分配成功,那么从 nodesInProcessing 移除该 Node。最后,将新创建的 nodeReservedCIDRs 发送至 Worker 协程处理。
根据 Node 最新状态下 Spec 中 PodCIDRs 长度,决定是要执行的操作,如果长度为零,等同于创建操作,否则直接退出。
创建 Node 的逆操作,将占用的 IP 资源释放,根据 Node 配置,遍历 PodCIDRs,将占用的资源逐一释放。
相对于创建时的置 1 操作,释放资源时使用清 0 操作
在 Node 创建事件回调方法中,曾创建一个 nodeReservedCIDRs 并被发送至 Channel 中。在 Worker 协程中,会捕获该结构,并做处理。具体来说,先根据 Name 从 API Server 端获取 Node 最新信息,根据最新信息中的 PodCIDRs 与 该结构体中预留的 CIDR 对比:
长度相同,且每个 IP 信息均相同,则处理完毕
上述条件不满足,且 PodCIDRs 长度不为 0,则释放全部 CIDR 资源,并退出
PodCIDRs 长度为零,那么意味着该 Node 尚未包含 CIDR 资源,分配 CIDR,并通知 API Server,如果成功,则退出;如果 API Server 响应超时,则释放预留 CIDR 资源,退出
Worker 协程中,如果上述处理返回错误,则将 nodeReservedCIDRs 重新发送至 Channel,以待下次处理。
监听到 Pod 变化时,将 Pod 变化情况包装为 podUpdateItem,并放入队列。Pod 变更处理协程从队列中获取到该实例,并处理 Pod 变更情况。如果处理成功,在队列中移除该实例,如果失败,重新将该实例放入队列,以待下次处理。
变更监听代码如下所示,将 Create/Update/Delete 统一至相同方法 podUpdate 中进行后续处理,类似手法已经在之前章节有详细说明,这里不再赘述。
遍历 Node.Status.Conditions,并根据 Condition.Type 在 nodeConditionToKeyStatusMap 中获取到相应信息。nodeConditionToKeyStatusMap 内容如下
获取时如下所示
本文研究了 Kubernetes 中 Schedule 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
本文研究了 Kubernetes 中 Schedule 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。
Scheduler 中封装了具体调度算法,并与调度算法共享相同的 SchedulingQueue 实例对象。在执行调度时,首先需要通过 NextPod 方法获取待调度的 PodInfo。
Profile 中保存着调度框架基本信息。根据选中的当前调度 PodInfo 中的 Pod 实例,选中合适的调度 Profile。根据 Profile、Pod 来判断是否需要跳过当前 Pod 调度,如果判断结果不需要调度当前 Pod,本次调度完成;如果需要调度,则通过 Algorithm 进行调度。
ScheduleAlgorithm 定义了关于调度的两个核心方法,同时,也通过 Extenders 方法预留出了扩展空间。
genericScheduler 实现了 Scheduler 接口,接下来,我们详细看下 genericScheduler 的核心调度方法的实现。
根据 Pod 选择 Profile 后,执行 Profile 的 RunPreFilterPlugins 方法,该方法由 framework 结构提供。执行过程对每个注册的 PreFilterPlugin 接口,执行其 PreFilter 方法,如果执行中有错误发生,那么创建 Status 结构,记录错误信息,并返回,不再执行后续的 PreFilterPlugin 接口。如果全部 PreFilterPlugin 接口都执行成功,返回 nil。Status 结构定义非常简洁,如下
首先根据 Snapshot 中 nodeInfoList 的长度来计算最大 Node 数量,并预先分配 Node 切片。然后根据当前要调度的 NodeInfo,获取其 Node 的名称,在 SchedulingQueue 中查找对应的 Pod。遍历全部 Pod 数组,使用当前 Profile,并执行其 RunPreFilterExtensionAddPod 方法,如果通过,则将该 Pod 存入 Node 中。最后,对 Node 执行 Profile 的 RunFilterPlugins 方法。
进一步筛选已通过检查的 Node 列表,筛选出满足 SchedulerExtender 要求的 Node 列表。至此,确认了当前调度的 Pod 可选择的 Node 列表。筛选 Node 完成后,使用选中的 Profile 进行 PreScore 操作。
完成 PreScore 操作后,如果仍然存在多于一个可选 Node 的情况,将执行优先级运算,最终根据优先级运算结果经由 selectHost 方法确认要使用的 Host 名称,一次调度过程完成。
本文研究了 Kubernetes 中 Schedule 部分的源码,是 Scheduler 最后一部分。
当 podStates 中对应 Pod Key 中存储的 Pod 的 NodeName 与新 Pod 的 NodeName 不一致时,会执行 removePod 操作,其代码如下