Tim Wang Tech Blog

深入了解Kubernetes控制器对象存储(object stores)和索引器(indexers)

本文是Understanding Kubernetes controllers part II – object stores and indexers的中文翻译版本,内容有删减

基本上,我们已经学会了如何使用 Kubernetes Go client来检索 Kubernetes 资源的信息,因此我们可以在我们的controller中简单地执行这个操作。然而,这有点低效。假设,例如,你正在使用多个工作线程,就像我们所做的那样。那么你可能会一遍又一遍地检索相同的信息,对 API 服务器造成很大负载。为了避免这种情况,可以使用一种特殊的 Kubernetes Informers 类型,称为index informers,它们构建一个线程安全的对象存储作为缓存。当集群的状态发生变化时,Informer 不仅会调用我们controller的处理函数,还会执行必要的更新以保持缓存的最新状态。由于缓存具有处理索引的额外能力,因此被称为 Indexer。因此,在今天的文章末尾,以下图片将呈现出来。

在本文的其余部分,我们将更详细地讨论索引器及其与 Informer 的交互,而在下一篇文章中,我们将学习如何创建和使用 Informer,并深入了解它们的内部运作。

Watches and resource versions

在我们讨论 Informers 和 Indexers 之前,我们必须了解客户端可以使用的基本机制,以跟踪集群状态。为了实现这一点,Kubernetes API 提供了一种称为 watch 的机制。最好通过一个例子来解释这个概念。

在继续之前,请确保你有一个运行中的 Kubernetes 集群。我们将使用 curl 直接与 API 进行交互。为了避免必须在请求中添加令牌或证书,我们将使用 kubectl 代理机制。因此,请在另一个单独的终端中运行:

$ kubectl proxy

此时你应该看到一个消息,表示该代理正在本地主机的某个端口上监听(通常是8001)。发送到该端口的任何请求将被转发到 Kubernetes API 服务器。为了访问我们的集群,让我们首先启动一个单独的 HTTPD。

$ kubectl run alpine --image=httpd:alpine

然后我们使用CURL来获取默认命名空间中正在运行的 pod 列表。

$ curl localhost:8001/api/v1/namespaces/default/pods
{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "selfLink": "/api/v1/namespaces/default/pods",
    "resourceVersion": "6834"
  },
  "items": [
    {
      "metadata": {
        "name": "alpine-56cf65bbfc-tzqqx",
        "generateName": "alpine-56cf65bbfc-",
        "namespace": "default",
        "selfLink": "/api/v1/namespaces/default/pods/alpine-56cf65bbfc-tzqqx",
        "uid": "584ddf85-5f8d-11e9-80c0-080027696a3f",
        "resourceVersion": "6671",
--- REDACTED ---

正如预期的那样,你将获得一个 JSON 编码的对象类型 PodList。有趣的部分是元数据中的数据。你会看到有一个字段 resourceVersion。本质上,资源版本是一个随时间增加的数字,它唯一地标识集群的某个状态。

现在,Kubernetes API允许您请求watch,并以此资源版本作为起点。要手动完成此操作,请输入

$ curl -v localhost:8001/api/v1/namespaces/default/pods?watch=1&resourceVersion=6834

如果我们查看输出,你会发现这个请求返回了一个带有传输编码“chunked”的 HTTP 响应。这在 RFC 7230 中有规定,并将客户端置于流模式,即连接将保持打开状态,API server将继续以小块的方式发送更新。这将使 curl 进入后台运行,同时 curl 将继续将接收到的数据打印到终端。如果你现在在集群中创建额外的 pod 或删除现有的 pod,你将继续看到接收到的通知,通知你有关事件的信息。每个通知都包含一个类型(ADDED、MODIFIED、ERROR 或 DELETED)和一个Event

这为我们提供了一种以高效的方式获取集群完整状态的方法。我们首先使用普通的 API 请求列出所有资源。然后,我们记住响应中的资源版本,并将该资源版本作为监视的起点。每当我们收到关于更改的通知时,我们相应地更新本地数据。基本上,这正是InformerIndexer组合所做的事情。

Caching mechanisms and indexers

一个indexer是一个实现了接口 cache.Indexer 的对象。这个接口是从 cache.Store 派生出来的,所以让我们先来研究一下它。它的定义在 store.go

type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	Replace([]interface{}, string) error
	Resync() error
}

所以 store 基本上是一个我们可以向其中添加对象、检索对象、更新对象或删除对象的东西。接口本身并不对键做任何假设,但是当你创建一个新的 store 时,你提供了一个key function,它从一个对象中提取键,并具有以下签名。

`type KeyFunc func(obj interface{}) (string, error)`

使用store是非常方便和简单的,你可以在这里找到一个简短的例子。

让我们验证一下,正如上面的图表所述,InformerLister都引用了相同的索引器。为了查看这一点,让我们看一下我们的 sample controller 的创建过程。

当由函数_NewController_ 创建一个新controller时,此函数接受一个 _DeploymentInformer_ 和一个 _FooInformer_。这些都是提供对相应资源的实际 InformerLister 访问的接口。让我们以 _FooInformer_ 为例。Lister 的实际创建方法如下。

func (f *fooInformer) Lister() v1alpha1.FooLister {
	return v1alpha1.NewFooLister(f.Informer().GetIndexer())
}

InformerIndexer之间的通信是通过函数 _handleDeltas_ 完成的,该函数接收一个 _Delta_ 对象列表,其定义在 delta_fifo.go 中(我们将在下一篇文章中了解更多关于这是如何工作的)。如果我们查看这个函数,我们会发现它不仅调用所有注册的处理函数(通过 _processor_),而且根据 delta 的类型还调用存储的 _Add__Update__Delete_ 方法。

// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {

	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional, the default is MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer
	// understands the Replaced DeltaType. Before the `Replaced` event type was
	// added, calls to Replace() were handled the same as Sync(). For
	// backwards-compatibility purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a Replace() call.
	// When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool

	// If set, will be called for objects before enqueueing them. Please
	// see the comment on TransformFunc for details.
	Transformer TransformFunc
}

我们现在对sample controller的工作原理有了一个相当完整的了解。Informer利用Kubernetes API及其基于资源版本的监视机制来获取集群状态的更新。这些更新用于维护一个对象存储,反映集群的当前状态,并调用定义的事件处理函数。controllerInformer注册其函数,以在资源发生更改时调用这些函数。然后,它可以访问对象存储,轻松检索资源的当前状态,并采取必要的措施推动系统朝向目标状态发展。