[k8s源码分析][kubelet] devicemanager
1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/kubelet/cm/devicemanager
分支: tming-v1.13 (基于v1.13版本)
k8s-device-plugin
分支: tming-v1.11(基于v1.11版本)
device manager and device plugin
1. [k8s源码分析][kubelet] devicemanager 之 pod_devices 和 checkpoint
2. [k8s源码分析][kubelet] devicemanager 之 使用device-plugin(模拟gpu)
3. [k8s源码分析][kubelet] devicemanager 之 device-plugin向kubelet注册
4. [k8s源码分析][kubelet] devicemanager 之 kubelet申请资源
5. [k8s源码分析][kubelet] devicemanager 之 重启kubelet和device-plugin
在上文的基础上来分析
device-plugin是如何向kubelet注册自身的资源的. 因为要走这个流程, 所以分析的时候两个项目会同时一起看.
2. pluginapi
在
k8s-device-plugin/server.go中开头有一句pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1", 关于grpc知识自行了解, 这里定义了方法.
// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1
// constants.go
const (
Healthy = "Healthy"
Unhealthy = "Unhealthy"
Version = "v1beta1"
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
KubeletSocket = DevicePluginPath + "kubelet.sock"
KubeletPreStartContainerRPCTimeoutInSecs = 30
)
var SupportedVersions = [...]string{"v1beta1"}
// api.proto
service DevicePlugin {
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}
所有的
device plugin必须实现DevicePlugin中的四个方法, 此时device plugin会成为它的服务器端, 那客户端是什么呢?自然是device manager, 下面可以看到.
3. 启动device plugin
kubelet已经启动了, 因为集群肯定是要存在的, 不然怎么注册, 所以/var/lib/kubelet/device-plugins目录已经有kubelet.sock文件.
看看
device plugin的main方法.
// k8s-device-plugin/main.go
func main() {
...
L:
for {
if restart {
...
devicePlugin = NewNvidiaDevicePlugin()
if err := devicePlugin.Serve(); err != nil {
...
} else {
restart = false
}
}
...
}
}
// k8s-device-plugin/server.go
var serverSock = pluginapi.DevicePluginPath + "nvidia.sock"
func NewNvidiaDevicePlugin() *NvidiaDevicePlugin {
return &NvidiaDevicePlugin{
devs: getDevices(),
socket: serverSock,
stop: make(chan interface{}),
health: make(chan *pluginapi.Device),
}
}
// k8s-device-plugin/nvidia.go
func getDevices() []*pluginapi.Device {
n := uint(10)
var devs []*pluginapi.Device
for i := uint(0); i < n; i++ {
devs = append(devs, &pluginapi.Device{
ID: fmt.Sprintf("%v-%v", resourceName, i),
Health: pluginapi.Healthy,
})
}
return devs
}
关注这几行就行, 别的部分会在后面博客介绍.
NewNvidiaDevicePlugin初始化了一个NvidiaDevicePlugin对象.
devs:里面含有该device plugin的所有设备.
serverSock:此服务器端(device plugin)的地址, 供device manager调用.
Serve方法
// k8s-device-plugin/server.go
func (m *NvidiaDevicePlugin) Serve() error {
// 启动当前服务 作为pluginapi的服务器端 供device manager调用
err := m.Start()
if err != nil {
log.Printf("Could not start device plugin: %s", err)
return err
}
log.Println("Starting to serve on", m.socket)
// 向kubelet发请求 其实就是向device manager发请求
err = m.Register(pluginapi.KubeletSocket, resourceName)
if err != nil {
log.Printf("Could not register device plugin: %s", err)
m.Stop()
return err
}
log.Println("Registered device plugin with Kubelet")
return nil
}
1. m.Start(): 启动本地服务, 为
kubelet其实就是device manager提供服务,device manager会通过pluginapi中那四个方法请求服务器端(也就是NvidiaDevicePlugin).
2. m.Register: 是向kubelet(pluginapi.KubeletSocket=/var/lib/kubelet/device-plugins/kubelet.sock)发请求注册资源resouceName.
来具体看一下
Start方法:
Start
// k8s-device-plugin/server.go
func (m *NvidiaDevicePlugin) Start() error {
// 删除以前的服务器地址 因为要启动一个新的
// 就是删除m.socket=/var/lib/kubelet/device-plugins/nvidia.sock
err := m.cleanup()
if err != nil {
return err
}
// 启动服务 由于是本地进程间交流 所以用unix启动
sock, err := net.Listen("unix", m.socket)
if err != nil {
return err
}
// 注册m为pluginapi的服务器端
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m)
// goroutine方法启动
go m.server.Serve(sock)
// Wait for server to start by launching a blocking connexion
// 试一下有没有创建成功
conn, err := dial(m.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()
go m.healthcheck()
return nil
}
没有别的什么意思, 就是启动当前
NvidiaDevicePlugin对象m为pluginapi的服务器端, 地址为/var/lib/kubelet/device-plugins/nvidia.sock
Register
// k8s-device-plugin/server.go
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
// kubeletEndpoint = /var/lib/kubelet/device-plugins/kubelet.sock
conn, err := dial(kubeletEndpoint, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
// 创建一个连接服务器(地址为kubeletEndpoint)的客户端 就是device manager
client := pluginapi.NewRegistrationClient(conn)
// 构造请求内容
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
// 向device manager注册信息
// 可以看到device manager也是pluginapi的服务器
// 此时的NvidiaDevicePlugin为客户端 实现了Register方法
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
// k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}
1. 生成服务器端地址为
/var/lib/kubelet/device-plugins/kubelet.sock的客户端, 并向该服务器发送要注册的资源以及资源所拥有的设备.
device-plugin 端总结
关于
device plugin端的内容已经分析完了, 总结可以分为以下几个步骤:
device-plugin.png
可以看到向
kubelet注册资源的时候并没有把该资源所有的设备传过去, 那kubelet如何知道的呢? 接下来将分析kubelet中的device manager收到请求后会如何处理.
4. device manager 处理注册请求
在看注册方法之前先看一下它是如何启动的.
启动 Start
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
klog.V(2).Infof("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
// Loads in allocatedDevices information from disk.
// 加载kubelet_internal-checkpoint
err := m.readCheckpoint()
if err != nil {
klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
socketPath := filepath.Join(m.socketdir, m.socketname)
os.MkdirAll(m.socketdir, 0755)
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := m.removeContents(m.socketdir); err != nil {
klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
}
// 启动服务端 地址为/var/lib/kubelet/device-plugins/kubelet.sock
s, err := net.Listen("unix", socketPath)
if err != nil {
klog.Errorf(errListenSocket+" %v", err)
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
// 注册自己为该服务器的服务
pluginapi.RegisterRegistrationServer(m.server, m)
go func() {
defer m.wg.Done()
m.server.Serve(s)
}()
klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
return nil
}
该方法就是启动服务端, 地址为
/var/lib/kubelet/device-plugins/kubelet.sock, 客户端就是所有的device plugin, 从上面device plugin中可以看到它生成了一个客户端, 并发送了请求.
所以接下来看一下Register方法
Register方法
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
// 看看是不是支持的version v1beta1
var versionCompatible bool
for _, v := range pluginapi.SupportedVersions {
if r.Version == v {
versionCompatible = true
break
}
}
// 如果不支持该version 返回错误信息给客户端的device-plugin
if !versionCompatible {
errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
return &pluginapi.Empty{}, fmt.Errorf(errorString)
}
if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
klog.Infof("Bad registration request from device plugin: %s", errorString)
return &pluginapi.Empty{}, fmt.Errorf(errorString)
}
go m.addEndpoint(r)
// 代表注册成功
return &pluginapi.Empty{}, nil
}
1. 判断是不是支持的
version. 不是直接返回错误.
2. 然后判断resouceName是不是ExtendedResourceName. 不是直接返回错误.
3. 异步调用addEndpoint. 从字面意思上看为此次注册添加一个endpoint.
4. 返回, 代表注册成功.
addEndpoint
在分析该方法之前有必要了解
Endpoint到底是什么以及它的作用.
Endpoint
type endpoint interface {
run()
stop()
allocate(devs []string) (*pluginapi.AllocateResponse, error)
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
callback(resourceName string, devices []pluginapi.Device)
isStopped() bool
stopGracePeriodExpired() bool
}
type endpointImpl struct {
// 访问device plugin的客户端
client pluginapi.DevicePluginClient
clientConn *grpc.ClientConn
// device-plugin的地址
socketPath string
// device-plugin的资源名
resourceName string
// 停止的时间
stopTime time.Time
mutex sync.Mutex
// 回调函数
cb monitorCallback
}
endpoint:是一个接口, 定义了一些方法.
endpointImpl:是endpoint的一个实现类, 可以看到它的属性大部分都是记录的device plugin的信息, 其实endpointImpl是一个专门与device plugin打交道的结构体, 可以这样理解, 每当有一个device plugin请求注册的时候,device manager都会分配一个endpointImpl来处理与该device plugin之间的业务交流.
newEndpointImpl
func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
// 生成一个客户端 可以访问地址为socketPat(/var/lib/kubelet/device-plugins/nvidia.sock)h的device plugin
client, c, err := dial(socketPath)
if err != nil {
klog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
return nil, err
}
return &endpointImpl{
client: client,
clientConn: c,
socketPath: socketPath,
resourceName: resourceName,
cb: callback,
}, nil
}
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
...
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
...
return pluginapi.NewDevicePluginClient(c), c, nil
}
从这里可以看到
newEndpointImpl中有可以访问某个device plugin的客户方.
addEndpoint
现在回过头来看看
addEndpoint方法, 关于callback方法在后面涉及.
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// 生成一个endpointImpl实例
new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
if err != nil {
klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
// 注册该endpoint
m.registerEndpoint(r.ResourceName, r.Options, new)
go func() {
// 运行该endpoint
m.runEndpoint(r.ResourceName, new)
}()
}
从客户端的
device plugin有是三个参数, 从上面的分析中可以看到:
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version, // v1beta1
Endpoint: path.Base(m.socket), // /var/lib/kubelet/device-plugins/nvidia.sock
ResourceName: resourceName, // 资源名字
}
现在已经通过
newEndpointImpl生成了一个可以与device plugin打交道的endpoint实例new. 那么device manager当然需要知道哪个resourceName是由哪个endpoint来进行处理的. 所以调用了registerEndpoint方法保存对应关系.
registerEndpoint
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
m.mutex.Lock()
defer m.mutex.Unlock()
// 保存到了一个map对象中
m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
klog.V(2).Infof("Registered endpoint %v", e)
}
将
resourceName与endpoint的对应关系保存到一个map中, 让device manager知道哪个resouceName绑定的哪个endpoint.
runEndpoint
// pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
// 调用run方法 run方法是死循环
e.run()
e.stop()
m.mutex.Lock()
defer m.mutex.Unlock()
if old, ok := m.endpoints[resourceName]; ok && old.e == e {
m.markResourceUnhealthy(resourceName)
}
klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
看
run方法
// pkg/kubelet/cm/devicemanager/endpoint.go
func (e *endpointImpl) run() {
// 调用device plugin的ListAndWatch
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
for {
// 有变化的时候就会接受到信息
response, err := stream.Recv()
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
devs := response.Devices
klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
var newDevs []pluginapi.Device
for _, d := range devs {
newDevs = append(newDevs, *d)
}
// 然后调用endpoint的回调函数
e.callback(e.resourceName, newDevs)
}
}
可以看到此时利用
grpc与device plugin的ListAndWatch方法进行通信了. 看device plugin的ListAndWatch方法
// k8s-device-plugin/server.go
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
}
}
}
重点: 可以看到
device plugin首先会把所有的设备发送给endpoint. 然后endpoint会调用自己的回调函数进行处理, 然后device plugin与endpoint之间就形成了通信, 只有device plugin中的ListAndWatch方法有变动,endpoint这边都会感知并调用自己的回调函数进行处理.
genericDeviceUpdateCallback
接下来看看
endpoint的回调函数做了什么操作. 在newEnpointImpl的时候传入了device manger的callback方法. 这里说明一下, 默认是genericDeviceUpdateCallback方法.
// pkg/kubelet/cm/devicemanager/endpoint.go
func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) {
e.cb(resourceName, devices)
}
// pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
m.mutex.Lock()
m.healthyDevices[resourceName] = sets.NewString()
m.unhealthyDevices[resourceName] = sets.NewString()
for _, dev := range devices {
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
} else {
m.unhealthyDevices[resourceName].Insert(dev.ID)
}
}
m.mutex.Unlock()
m.writeCheckpoint()
}
看到该方法一目了然, 就是把
resourceName对应的设备放入到device manger中保存. 用两个map来保存哪些设备目前是healthy的, 哪些设备是unhealthy的.
所以可以看到
device plugin中发生变化, 对应的endpoint会感知, 进而将信息传到device manager中的healthyDevices和unhealthyDevices中.
device plugin --> endpoint --> device manager(healthyDevices, unhealthyDevices)
总结
到此处,
device manager端是如何处理device plugin中来注册资源的请求已经分析完了.
1. 只要有device plugin前来注册资源,device manager都会分配一个新的endpoint来与该device plugin通过ListAndWatch方法建立连接以及通信.
2. 但是device manager需要哪个resouceName是由哪个endpoint负责的, 所以有一个map结构的endpoints来保存它们之间的对应关系.
3. 只要device plugin中有设备信息变化, 对应的endpoint会立马感知并通过回调函数告诉device manager, 进而device manager可以更新资源的设备信息(healthyDevices和unhealthyDevices).
device-manager.png
5. 整体总结
1.
device plugin端启动自己服务, 地址为(/var/lib/kubelet/device-plugins/sock.sock).
2.device plugin向地址为(/var/lib/kubelet/device-plugins/kubelet.sock)发送注册请求(含有resoucename以及自己服务的地址/var/lib/kubelet/device-plugins/sock.sock).
3.device manager收到请求分配一个新的endpoint与该device plugin通过device plugin的ListAndWatch进行连接并通信.
4. 当device plugin的ListAndWatch有变化时, 对应的endpoint会感知并通过回调函数告知device manager需要更新它的资源以及对应设备信息(healthyDevices和unhealthyDevices)
overall.png
device-plugin.png
device-manager.png
overall.png