
Operator Pattern

Overview

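An Operator is a Kubernetes extension pattern that encodes operational knowledge in software to automate the management of complex applications. Operators extend the Kubernetes API with a custom controller and a CustomResourceDefinition (CRD), making application lifecycle management automated, repeatable, and reliable. The Operator pattern is a core pattern for managing cloud-native applications and is widely used for stateful workloads such as databases, message queues, and monitoring systems.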

Core Concepts

1. Operator

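An Operator is a software extension to Kubernetes that uses custom resources to manage an application and its components:

  • Follows Kubernetes principles, in particular the control loop
  • Encodes operational knowledge in software
  • Automates complex operational tasks
  • Exposes a declarative API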

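2. Controller Pattern

The controller is a core design pattern in Kubernetes:

  • Watches the state of the cluster
  • Compares the desired state with the actual state
  • Adjusts the actual state to match the desired state
  • Runs continuously as a control loop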
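The compare-and-adjust steps above can be sketched without any Kubernetes libraries. In this minimal example, assuming Pods are named `<app>-<ordinal>` (an illustrative convention, not a Kubernetes API), the function diffs a desired replica count against the Pods that exist and returns the actions needed to converge. Applying those actions and calling it again yields no actions, which is the property a controller relies on.

```go
package main

import (
    "fmt"
    "sort"
)

// podName builds a hypothetical Pod name for ordinal i of an application.
func podName(app string, i int) string {
    return fmt.Sprintf("%s-%d", app, i)
}

// reconcileReplicas compares the desired replica count with the Pods that
// actually exist and returns the actions that move the actual state towards
// the desired state. Once those actions are applied, a second call returns none.
func reconcileReplicas(app string, desired int, actual map[string]bool) []string {
    var actions []string
    wanted := make(map[string]bool)
    // Compare: every ordinal 0..desired-1 should have a Pod.
    for i := 0; i < desired; i++ {
        name := podName(app, i)
        wanted[name] = true
        if !actual[name] {
            actions = append(actions, "create "+name)
        }
    }
    // Pods outside the desired set should be removed.
    var extra []string
    for name := range actual {
        if !wanted[name] {
            extra = append(extra, name)
        }
    }
    sort.Strings(extra) // map iteration order is random; sort for a stable result
    for _, name := range extra {
        actions = append(actions, "delete "+name)
    }
    return actions
}
```

Because the function only looks at the current state, it does not matter how many change events led up to it: the same inputs always produce the same actions.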

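3. Custom Resource (CR)

A custom resource is an extension of the Kubernetes API:

  • Lets users define their own resource types
  • New resource types are registered through a CRD
  • Can be operated on with kubectl just like built-in resources

4. Control Loop (Reconcile Loop)

The control loop is the core mechanism of an Operator: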

┌─────────────────────────────────────────────────────────────┐
│                        Control Loop                         │
│                                                             │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐            │
│  │   Watch   │ -> │  Compare  │ -> │ Reconcile │            │
│  │ Resources │    │  States   │    │  Actions  │            │
│  └───────────┘    └───────────┘    └───────────┘            │
│        ↑                                 │                  │
│        └─────────────────────────────────┘                  │
└─────────────────────────────────────────────────────────────┘
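The loop in the diagram can be approximated in plain Go. In this sketch, each watch event carries only an object key, and the reconcile callback is expected to re-read current state rather than trust the event contents; failures are retried a bounded number of times. `runLoop`, `flakyReconciler`, and the immediate-retry policy are hypothetical; real controllers get this behavior from an informer plus a rate-limited workqueue, which also adds backoff between retries.

```go
package main

import "errors"

// errNotReady stands in for a transient failure (e.g. a dependency not yet created).
var errNotReady = errors.New("not ready")

// runLoop drains object keys from a watch-like channel (Watch), and for each
// key calls reconcile, which compares desired and actual state and acts on
// the difference (Compare + Reconcile). A failing key is retried immediately,
// up to maxRetries extra attempts. It returns the number of attempts per key.
func runLoop(events <-chan string, reconcile func(key string) error, maxRetries int) map[string]int {
    attempts := map[string]int{}
    for key := range events {
        for try := 0; try <= maxRetries; try++ {
            attempts[key]++
            if err := reconcile(key); err == nil {
                break // converged; wait for the next event
            }
        }
    }
    return attempts
}

// flakyReconciler is a hypothetical reconcile function that fails the first
// failures[key] times it sees a key and succeeds afterwards.
func flakyReconciler(failures map[string]int) func(string) error {
    return func(key string) error {
        if failures[key] > 0 {
            failures[key]--
            return errNotReady
        }
        return nil
    }
}
```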

Operator Architecture

Overall Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Kubernetes Cluster                      │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                      API Server                       │  │
│  │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │  │
│  │  │  Built-in   │   │     CRD     │   │     CR      │  │  │
│  │  │  Resources  │   │ Definitions │   │  Instances  │  │  │
│  │  └─────────────┘   └─────────────┘   └─────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
│                              ↑                              │
│                              │ Watch/Update                 │
│                              ↓                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Custom Controller                   │  │
│  │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │  │
│  │  │  Informer   │   │  Workqueue  │   │ Reconciler  │  │  │
│  │  │   (Cache)   │   │             │   │             │  │  │
│  │  └─────────────┘   └─────────────┘   └─────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
│                              │                              │
│                              │ Create/Update/Delete         │
│                              ↓                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Managed Resources                   │  │
│  │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │  │
│  │  │    Pods     │   │  Services   │   │ ConfigMaps  │  │  │
│  │  └─────────────┘   └─────────────┘   └─────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

Controller Components

1. Informer

The Informer watches resources for changes and keeps a local cache of them:

go
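// Simplified view of client-go's cache.SharedInformer. In recent client-go
// versions AddEventHandler also returns a registration handle and an error.
type Informer interface {
    AddEventHandler(handler cache.ResourceEventHandler)
    Run(stopCh <-chan struct{})
    HasSynced() bool
}

// myClient is a placeholder for a typed client that can list and watch MyResource.
informer := cache.NewSharedInformer(
    &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            return myClient.List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return myClient.Watch(context.TODO(), options)
        },
    },
    &MyResource{}, // the object type this informer caches
    12*time.Hour,  // resync period: cached objects are periodically re-delivered to handlers
)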
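Conceptually, an informer keeps a local, key-indexed copy of the watched objects and calls the registered handlers after each change, so controllers can read from memory instead of querying the API server. The sketch below models only that part with hypothetical `Event` and `Store` types; it is not the client-go implementation, which also handles listing, re-listing, resyncs, and resource versions.

```go
package main

// EventType mirrors the three kinds of changes a watch delivers.
type EventType int

const (
    Added EventType = iota
    Modified
    Deleted
)

// Event is a hypothetical watch event carrying the object key and its new value.
type Event struct {
    Type  EventType
    Key   string // "namespace/name", the key format client-go caches use
    Value string // stand-in for the object itself
}

// Store is a minimal local cache plus the handlers to notify on change.
type Store struct {
    items    map[string]string
    handlers []func(Event)
}

func NewStore() *Store { return &Store{items: map[string]string{}} }

// AddEventHandler registers a callback, in the spirit of SharedInformer.AddEventHandler.
func (s *Store) AddEventHandler(h func(Event)) { s.handlers = append(s.handlers, h) }

// Apply updates the cache first and then notifies handlers, so a handler
// that reads the cache in this sketch already sees the new state.
func (s *Store) Apply(e Event) {
    switch e.Type {
    case Added, Modified:
        s.items[e.Key] = e.Value
    case Deleted:
        delete(s.items, e.Key)
    }
    for _, h := range s.handlers {
        h(e)
    }
}

// Get reads from the local cache instead of the API server.
func (s *Store) Get(key string) (string, bool) {
    v, ok := s.items[key]
    return v, ok
}
```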

2. Workqueue

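The Workqueue buffers the keys of changed objects until the reconciler processes them; it collapses duplicate keys that are still waiting and supports rate-limited retries: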

go
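// Simplified subset of client-go's workqueue.RateLimitingInterface.
type Workqueue interface {
    Add(item interface{})                   // enqueue; a key already waiting is not added twice
    Get() (item interface{}, shutdown bool) // block until an item is available
    Done(item interface{})                  // mark processing of the item as finished
    AddRateLimited(item interface{})        // re-enqueue after a rate-limited delay
    Forget(item interface{})                // reset the item's retry/backoff history
}

// Newer client-go releases also provide generic typed variants of this constructor.
queue := workqueue.NewRateLimitingQueue(
    workqueue.DefaultControllerRateLimiter(),
)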

3. Reconciler

The Reconciler implements the reconciliation logic:

go
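// From sigs.k8s.io/controller-runtime/pkg/reconcile.
type Reconciler interface {
    Reconcile(ctx context.Context, req Request) (Result, error)
}

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // Fetch the primary resource named in the request.
    instance := &MyResource{}
    if err := r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
        // A deleted object is not an error; objects it owns through owner
        // references are garbage-collected by Kubernetes.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Drive the actual state towards instance.Spec (reconcileResource is application-specific).
    return r.reconcileResource(ctx, instance)
}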
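Two properties make a Reconcile like the one above safe to call repeatedly: a missing primary object is treated as success (which is what `client.IgnoreNotFound` provides), and every step is idempotent (get first, create only if absent). The sketch below shows both with a hypothetical in-memory store standing in for the API client; `store`, `errNotFound`, `ignoreNotFound`, and `reconcile` are illustrative names.

```go
package main

import (
    "errors"
    "fmt"
)

var errNotFound = errors.New("not found")

// store is a hypothetical in-memory stand-in for the Kubernetes API.
type store map[string]string

func (s store) get(key string) (string, error) {
    v, ok := s[key]
    if !ok {
        return "", fmt.Errorf("get %q: %w", key, errNotFound)
    }
    return v, nil
}

// ignoreNotFound mirrors client.IgnoreNotFound: nil for not-found, err otherwise.
func ignoreNotFound(err error) error {
    if errors.Is(err, errNotFound) {
        return nil
    }
    return err
}

// reconcile ensures a child object "<key>-deploy" exists for the primary object.
// A second call with unchanged state creates nothing.
func reconcile(s store, key string) (created bool, err error) {
    image, err := s.get(key)
    if err != nil {
        return false, ignoreNotFound(err) // primary deleted: nothing to do
    }
    child := key + "-deploy"
    if _, err := s.get(child); err == nil {
        return false, nil // already exists
    } else if !errors.Is(err, errNotFound) {
        return false, err
    }
    s[child] = image
    return true, nil
}
```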

Operator Development Frameworks

Kubebuilder

Install Kubebuilder

bash
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder
sudo mv kubebuilder /usr/local/bin/

kubebuilder version

Create an Operator Project

bash
mkdir my-operator
cd my-operator

kubebuilder init --domain my.domain --repo my.domain/my-operator

kubebuilder create api --group webapp --version v1 --kind WebApp

kubebuilder create webhook --group webapp --version v1 --kind WebApp --defaulting --programmatic-validation

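Project Structure

The exact layout depends on the Kubebuilder version and plugin. For example, projects scaffolded with the go/v4 plugin place controllers under internal/controller/ rather than controllers/.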

my-operator/
├── cmd/
│   └── main.go
├── config/
│   ├── crd/
│   │   ├── bases/
│   │   └── kustomization.yaml
│   ├── rbac/
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   ├── manager/
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus/
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── webhook/
│   │   ├── kustomization.yaml
│   │   ├── kustomizeconfig.yaml
│   │   ├── manifests.yaml
│   │   └── service.yaml
│   └── samples/
│       └── webapp_v1_webapp.yaml
├── api/
│   └── v1/
│       ├── groupversion_info.go
│       ├── webapp_types.go
│       ├── webapp_webhook.go
│       └── zz_generated.deepcopy.go
├── controllers/
│   ├── suite_test.go
│   └── webapp_controller.go
├── hack/
│   └── boilerplate.go.txt
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
└── go.mod

Operator SDK

Install the Operator SDK

bash
curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.32.0/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
sudo mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk

operator-sdk version

Create an Operator Project

bash
mkdir my-operator
cd my-operator

operator-sdk init --domain my.domain --repo github.com/example/my-operator

operator-sdk create api --group webapp --version v1 --kind WebApp --resource --controller

operator-sdk create webhook --group webapp --version v1 --kind WebApp --defaulting --programmatic-validation

Practical Examples

Example 1: WebApp Operator

API Definition

go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// WebAppSpec defines the desired state of a WebApp.
type WebAppSpec struct {
    // Replicas is optional; the controller defaults it to 1 when unset.
    Replicas  *int32               `json:"replicas,omitempty"`
    Image     string               `json:"image"`
    Port      int32                `json:"port"`
    Resources ResourceRequirements `json:"resources,omitempty"`
    Config    map[string]string    `json:"config,omitempty"`
    Storage   StorageSpec          `json:"storage,omitempty"`
    Ingress   *IngressSpec         `json:"ingress,omitempty"`
}

type ResourceRequirements struct {
    Requests ResourceList `json:"requests,omitempty"`
    Limits   ResourceList `json:"limits,omitempty"`
}

type ResourceList struct {
    CPU    string `json:"cpu,omitempty"`
    Memory string `json:"memory,omitempty"`
}

type StorageSpec struct {
    Enabled      bool   `json:"enabled,omitempty"`
    Size         string `json:"size,omitempty"`
    StorageClass string `json:"storageClass,omitempty"`
}

type IngressSpec struct {
    Enabled     bool              `json:"enabled,omitempty"`
    Host        string            `json:"host,omitempty"`
    TLS         *IngressTLSSpec   `json:"tls,omitempty"`
    Annotations map[string]string `json:"annotations,omitempty"`
}

type IngressTLSSpec struct {
    Enabled    bool   `json:"enabled,omitempty"`
    SecretName string `json:"secretName,omitempty"`
}

// WebAppStatus defines the observed state of a WebApp.
type WebAppStatus struct {
    Replicas      int32  `json:"replicas"`
    ReadyReplicas int32  `json:"readyReplicas"`
    Phase         string `json:"phase,omitempty"`
    Message       string `json:"message,omitempty"`
}

// The markers below make controller-gen generate DeepCopyObject (required for
// the type to implement runtime.Object; run `make generate`) and enable the
// status and scale subresources declared in the CRD.

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas

// WebApp is the Schema for the webapps API.
type WebApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   WebAppSpec   `json:"spec,omitempty"`
    Status WebAppStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// WebAppList contains a list of WebApp.
type WebAppList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`

    Items []WebApp `json:"items"`
}

func init() {
    SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}
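Because `Replicas` is a pointer, the controller can distinguish "not set" from a concrete value and apply its own default of 1, as `createDeployment` in the controller does. A minimal sketch with `encoding/json`, using a trimmed-down copy of the spec (`webAppSpec`, `parseSpec`, and `effectiveReplicas` are illustrative names, not generated API code):

```go
package main

import "encoding/json"

// webAppSpec is a trimmed-down copy of WebAppSpec for illustration.
type webAppSpec struct {
    Replicas *int32 `json:"replicas,omitempty"`
    Image    string `json:"image"`
    Port     int32  `json:"port"`
}

// parseSpec decodes the spec portion of a WebApp manifest (already in JSON form).
func parseSpec(raw []byte) (webAppSpec, error) {
    var spec webAppSpec
    err := json.Unmarshal(raw, &spec)
    return spec, err
}

// effectiveReplicas applies the controller-side default: a nil pointer means
// the field was absent from the manifest, so fall back to 1.
func effectiveReplicas(spec webAppSpec) int32 {
    if spec.Replicas == nil {
        return 1
    }
    return *spec.Replicas
}
```

With a plain `int32` field, an absent `replicas` and an explicit `0` would both decode to `0`, and the controller could no longer tell them apart.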

Controller Implementation

go
package controllers

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    networkingv1 "k8s.io/api/networking/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    webappv1 "my.domain/my-operator/api/v1"
)

type WebAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

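// RBAC markers: controller-gen turns these into the manager's ClusterRole, so the
// operator may manage the WebApp CR and every resource type it creates.
// +kubebuilder:rbac:groups=webapp.my.domain,resources=webapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=webapp.my.domain,resources=webapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete

func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {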
    logger := log.FromContext(ctx)
    
    instance := &webappv1.WebApp{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            logger.Info("WebApp resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        logger.Error(err, "Failed to get WebApp")
        return ctrl.Result{}, err
    }
    
    if err := r.reconcileDeployment(ctx, instance); err != nil {
        logger.Error(err, "Failed to reconcile Deployment")
        return ctrl.Result{}, err
    }
    
    if err := r.reconcileService(ctx, instance); err != nil {
        logger.Error(err, "Failed to reconcile Service")
        return ctrl.Result{}, err
    }
    
    if instance.Spec.Ingress != nil && instance.Spec.Ingress.Enabled {
        if err := r.reconcileIngress(ctx, instance); err != nil {
            logger.Error(err, "Failed to reconcile Ingress")
            return ctrl.Result{}, err
        }
    }
    
    if err := r.updateStatus(ctx, instance); err != nil {
        logger.Error(err, "Failed to update status")
        return ctrl.Result{}, err
    }
    
    return ctrl.Result{RequeueAfter: time.Minute}, nil
}

func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1.WebApp) error {
    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, deployment)
    
    if err != nil && errors.IsNotFound(err) {
        deployment = r.createDeployment(webapp)
        if err := ctrl.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, deployment)
    } else if err != nil {
        return err
    }
    
    updated := r.updateDeployment(deployment, webapp)
    if updated {
        return r.Update(ctx, deployment)
    }
    
    return nil
}

func (r *WebAppReconciler) createDeployment(webapp *webappv1.WebApp) *appsv1.Deployment {
    replicas := int32(1)
    if webapp.Spec.Replicas != nil {
        replicas = *webapp.Spec.Replicas
    }
    
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
            Labels: map[string]string{
                "app":        webapp.Name,
                "app.kubernetes.io/name":      webapp.Name,
                "app.kubernetes.io/instance":  webapp.Name,
                "app.kubernetes.io/managed-by": "webapp-operator",
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": webapp.Name,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": webapp.Name,
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "webapp",
                            Image: webapp.Spec.Image,
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: webapp.Spec.Port,
                                },
                            },
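                            // toResourceRequirements skips empty values, so omitting
                            // spec.resources no longer makes resource.MustParse("") panic.
                            Resources: toResourceRequirements(webapp.Spec.Resources),
                            Env:       r.createEnvVars(webapp),
                        },
                    },
                },
            },
        },
    }
}

// toResourceRequirements converts the CR's string quantities into a
// corev1.ResourceRequirements, skipping empty values. Malformed quantities
// still panic in MustParse, so they should be rejected earlier (for example
// by the validating webhook).
func toResourceRequirements(res webappv1.ResourceRequirements) corev1.ResourceRequirements {
    toList := func(l webappv1.ResourceList) corev1.ResourceList {
        out := corev1.ResourceList{}
        if l.CPU != "" {
            out[corev1.ResourceCPU] = resource.MustParse(l.CPU)
        }
        if l.Memory != "" {
            out[corev1.ResourceMemory] = resource.MustParse(l.Memory)
        }
        return out
    }
    return corev1.ResourceRequirements{
        Requests: toList(res.Requests),
        Limits:   toList(res.Limits),
    }
}

// updateDeployment syncs the replica count and image from the WebApp spec into
// an existing Deployment and reports whether anything changed. Other fields
// (ports, env, resources) are not reconciled in this example.
func (r *WebAppReconciler) updateDeployment(deployment *appsv1.Deployment, webapp *webappv1.WebApp) bool {
    updated := false

    replicas := int32(1)
    if webapp.Spec.Replicas != nil {
        replicas = *webapp.Spec.Replicas
    }
    if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != replicas {
        deployment.Spec.Replicas = &replicas
        updated = true
    }

    containers := deployment.Spec.Template.Spec.Containers
    if len(containers) > 0 && containers[0].Image != webapp.Spec.Image {
        containers[0].Image = webapp.Spec.Image
        updated = true
    }

    return updated
}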

func (r *WebAppReconciler) createEnvVars(webapp *webappv1.WebApp) []corev1.EnvVar {
    envVars := []corev1.EnvVar{}
    for key, value := range webapp.Spec.Config {
        envVars = append(envVars, corev1.EnvVar{
            Name:  key,
            Value: value,
        })
    }
    return envVars
}

func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1.WebApp) error {
    service := &corev1.Service{}
    err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, service)
    
    if err != nil && errors.IsNotFound(err) {
        service = r.createService(webapp)
        if err := ctrl.SetControllerReference(webapp, service, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, service)
    } else if err != nil {
        return err
    }
    
    return nil
}

func (r *WebAppReconciler) createService(webapp *webappv1.WebApp) *corev1.Service {
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
            Labels: map[string]string{
                "app": webapp.Name,
            },
        },
        Spec: corev1.ServiceSpec{
            Selector: map[string]string{
                "app": webapp.Name,
            },
            Ports: []corev1.ServicePort{
                {
                    Port:       webapp.Spec.Port,
                    TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
                },
            },
        },
    }
}

func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *webappv1.WebApp) error {
    ingress := &networkingv1.Ingress{}
    err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, ingress)
    
    if err != nil && errors.IsNotFound(err) {
        ingress = r.createIngress(webapp)
        if err := ctrl.SetControllerReference(webapp, ingress, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, ingress)
    } else if err != nil {
        return err
    }
    
    return nil
}

func (r *WebAppReconciler) createIngress(webapp *webappv1.WebApp) *networkingv1.Ingress {
    pathType := networkingv1.PathTypePrefix
    
    ingress := &networkingv1.Ingress{
        ObjectMeta: metav1.ObjectMeta{
            Name:        webapp.Name,
            Namespace:   webapp.Namespace,
            Annotations: webapp.Spec.Ingress.Annotations,
        },
        Spec: networkingv1.IngressSpec{
            Rules: []networkingv1.IngressRule{
                {
                    Host: webapp.Spec.Ingress.Host,
                    IngressRuleValue: networkingv1.IngressRuleValue{
                        HTTP: &networkingv1.HTTPIngressRuleValue{
                            Paths: []networkingv1.HTTPIngressPath{
                                {
                                    Path:     "/",
                                    PathType: &pathType,
                                    Backend: networkingv1.IngressBackend{
                                        Service: &networkingv1.IngressServiceBackend{
                                            Name: webapp.Name,
                                            Port: networkingv1.ServiceBackendPort{
                                                Number: webapp.Spec.Port,
                                            },
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }
    
    if webapp.Spec.Ingress.TLS != nil && webapp.Spec.Ingress.TLS.Enabled {
        ingress.Spec.TLS = []networkingv1.IngressTLS{
            {
                Hosts:      []string{webapp.Spec.Ingress.Host},
                SecretName: webapp.Spec.Ingress.TLS.SecretName,
            },
        }
    }
    
    return ingress
}

func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1.WebApp) error {
    deployment := &appsv1.Deployment{}
    if err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, deployment); err != nil {
        return err
    }
    
    webapp.Status.Replicas = deployment.Status.Replicas
    webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas

    // Compare against the desired count: right after creation Status.Replicas is 0,
    // so comparing ReadyReplicas with it would report "Running" too early.
    desired := int32(1)
    if deployment.Spec.Replicas != nil {
        desired = *deployment.Spec.Replicas
    }
    if deployment.Status.ReadyReplicas == desired {
        webapp.Status.Phase = "Running"
        webapp.Status.Message = "All replicas are ready"
    } else {
        webapp.Status.Phase = "Updating"
        webapp.Status.Message = fmt.Sprintf("%d/%d replicas ready", deployment.Status.ReadyReplicas, desired)
    }
    
    return r.Status().Update(ctx, webapp)
}

func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&webappv1.WebApp{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        Owns(&networkingv1.Ingress{}).
        Complete(r)
}

CRD Definition

yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: webapps.webapp.my.domain
spec:
  group: webapp.my.domain
  names:
    kind: WebApp
    listKind: WebAppList
    plural: webapps
    singular: webapp
  scope: Namespaced
  versions:
  - name: v1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            required:
            - image
            - port
            properties:
              replicas:
                type: integer
                minimum: 1
                maximum: 100
              image:
                type: string
              port:
                type: integer
                minimum: 1
                maximum: 65535
              resources:
                type: object
                properties:
                  requests:
                    type: object
                    properties:
                      cpu:
                        type: string
                      memory:
                        type: string
                  limits:
                    type: object
                    properties:
                      cpu:
                        type: string
                      memory:
                        type: string
              config:
                type: object
                additionalProperties:
                  type: string
              storage:
                type: object
                properties:
                  enabled:
                    type: boolean
                  size:
                    type: string
                  storageClass:
                    type: string
              ingress:
                type: object
                properties:
                  enabled:
                    type: boolean
                  host:
                    type: string
                  annotations:
                    type: object
                    additionalProperties:
                      type: string
                  tls:
                    type: object
                    properties:
                      enabled:
                        type: boolean
                      secretName:
                        type: string
          status:
            type: object
            properties:
              replicas:
                type: integer
              readyReplicas:
                type: integer
              phase:
                type: string
              message:
                type: string
    subresources:
      status: {}
      scale:
        specReplicasPath: .spec.replicas
        statusReplicasPath: .status.replicas
    additionalPrinterColumns:
    - name: Replicas
      type: integer
      jsonPath: .spec.replicas
    - name: Ready
      type: integer
      jsonPath: .status.readyReplicas
    - name: Phase
      type: string
      jsonPath: .status.phase
    - name: Age
      type: date
      jsonPath: .metadata.creationTimestamp
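The OpenAPI schema above is enforced by the API server. The same rules can also be expressed in Go, for example inside the validating webhook scaffolded with `--programmatic-validation`, to give clearer messages or to add cross-field checks that the schema cannot express. The function below is a hypothetical sketch over a trimmed-down spec type; it re-implements the `required`, `minimum`, and `maximum` constraints plus one assumed cross-field rule (enabling TLS requires a secret name). It uses `errors.Join`, available since Go 1.20.

```go
package main

import (
    "errors"
    "fmt"
)

// spec is a trimmed-down, hypothetical copy of the WebApp fields being checked.
type spec struct {
    Replicas      *int32
    Image         string
    Port          int32
    TLSEnabled    bool
    TLSSecretName string
}

// validate mirrors the CRD schema constraints and collects every violation
// instead of stopping at the first one. It returns nil for a valid spec.
func validate(s spec) error {
    var errs []error
    if s.Image == "" {
        errs = append(errs, errors.New("spec.image is required"))
    }
    if s.Port < 1 || s.Port > 65535 {
        errs = append(errs, fmt.Errorf("spec.port must be in [1, 65535], got %d", s.Port))
    }
    if s.Replicas != nil && (*s.Replicas < 1 || *s.Replicas > 100) {
        errs = append(errs, fmt.Errorf("spec.replicas must be in [1, 100], got %d", *s.Replicas))
    }
    // Assumed cross-field rule; the schema above does not contain it.
    if s.TLSEnabled && s.TLSSecretName == "" {
        errs = append(errs, errors.New("spec.ingress.tls.secretName is required when TLS is enabled"))
    }
    return errors.Join(errs...)
}
```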

Sample CR

yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-webapp
  namespace: default
spec:
  replicas: 3
  image: nginx:1.25
  port: 80
  resources:
    requests:
      cpu: "100m"
      memory: "128Mi"
    limits:
      cpu: "500m"
      memory: "512Mi"
  config:
    ENVIRONMENT: "production"
    LOG_LEVEL: "info"
  ingress:
    enabled: true
    host: my-webapp.example.com
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt-prod
    tls:
      enabled: true
      secretName: my-webapp-tls
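The `cpu` values in this sample use Kubernetes quantity notation: `100m` means 100 millicores (0.1 CPU) and `500m` means half a core. The controller passes these strings to `resource.MustParse`. As a rough illustration only, the hypothetical helper below converts the two common CPU forms (plain cores such as `2` or `0.25`, and millicores with an `m` suffix) into millicores; it ignores the other suffixes and exponent forms that `resource.ParseQuantity` accepts.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// cpuMillis converts a CPU quantity like "500m", "2" or "0.25" to millicores.
// Simplified: only the plain-decimal and "m" forms are handled.
func cpuMillis(q string) (int64, error) {
    if strings.HasSuffix(q, "m") {
        return strconv.ParseInt(strings.TrimSuffix(q, "m"), 10, 64)
    }
    cores, err := strconv.ParseFloat(q, 64)
    if err != nil {
        return 0, fmt.Errorf("unsupported CPU quantity %q: %w", q, err)
    }
    return int64(cores*1000 + 0.5), nil // round to the nearest millicore
}
```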

Example 2: Database Operator

API Definition

go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseSpec defines the desired state of a Database.
// ResourceRequirements is the type declared in the WebApp example (same package).
type DatabaseSpec struct {
    Version      string               `json:"version"`
    StorageSize  string               `json:"storageSize"`
    StorageClass string               `json:"storageClass,omitempty"`
    Replicas     int32                `json:"replicas"`
    Resources    ResourceRequirements `json:"resources,omitempty"`
    Database     string               `json:"database"`
    Credentials  CredentialsSpec      `json:"credentials"`
    Backup       *BackupSpec          `json:"backup,omitempty"`
    Monitoring   *MonitoringSpec      `json:"monitoring,omitempty"`
}

type CredentialsSpec struct {
    SecretName  string `json:"secretName"`
    UsernameKey string `json:"usernameKey"`
    PasswordKey string `json:"passwordKey"`
}

type BackupSpec struct {
    Enabled     bool   `json:"enabled,omitempty"`
    Schedule    string `json:"schedule,omitempty"`
    StorageSize string `json:"storageSize,omitempty"`
    Retention   int    `json:"retention,omitempty"`
}

type MonitoringSpec struct {
    Enabled bool `json:"enabled,omitempty"`
}

// DatabaseStatus defines the observed state of a Database.
type DatabaseStatus struct {
    Phase           string             `json:"phase,omitempty"`
    CurrentReplicas int32              `json:"currentReplicas"`
    ReadyReplicas   int32              `json:"readyReplicas"`
    CurrentVersion  string             `json:"currentVersion,omitempty"`
    // Pointer so that omitempty can omit an unset timestamp (structs are never omitted).
    LastBackup      *metav1.Time       `json:"lastBackup,omitempty"`
    Conditions      []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// Database is the Schema for the databases API.
type Database struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   DatabaseSpec   `json:"spec,omitempty"`
    Status DatabaseStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// DatabaseList contains a list of Database.
type DatabaseList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`

    Items []Database `json:"items"`
}

func init() {
    SchemeBuilder.Register(&Database{}, &DatabaseList{})
}

Controller Implementation

go
package controllers

import (
    "context"
    "crypto/rand"
    "encoding/base64"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    dbv1 "my.domain/my-operator/api/v1"
)

type DatabaseReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    instance := &dbv1.Database{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            logger.Info("Database resource not found; ignoring since it must have been deleted")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // Each step creates its object only if it does not exist yet, so the
    // whole sequence is safe to repeat on every reconcile.
    if err := r.reconcileSecret(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    if err := r.reconcilePVC(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    if err := r.reconcileStatefulSet(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    if err := r.reconcileService(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    if instance.Spec.Backup != nil && instance.Spec.Backup.Enabled {
        if err := r.reconcileBackupJob(ctx, instance); err != nil {
            return ctrl.Result{}, err
        }
    }

    if err := r.updateStatus(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}

func (r *DatabaseReconciler) reconcileSecret(ctx context.Context, db *dbv1.Database) error {
    secret := &corev1.Secret{}
    err := r.Get(ctx, types.NamespacedName{Name: db.Spec.Credentials.SecretName, Namespace: db.Namespace}, secret)

    if err != nil && errors.IsNotFound(err) {
        password, err := r.generatePassword()
        if err != nil {
            return err
        }
        secret = &corev1.Secret{
            ObjectMeta: metav1.ObjectMeta{
                Name:      db.Spec.Credentials.SecretName,
                Namespace: db.Namespace,
            },
            StringData: map[string]string{
                db.Spec.Credentials.UsernameKey: "admin",
                db.Spec.Credentials.PasswordKey: password,
            },
        }

        if err := ctrl.SetControllerReference(db, secret, r.Scheme); err != nil {
            return err
        }

        return r.Create(ctx, secret)
    }

    return err
}

// generatePassword returns 24 random bytes from crypto/rand, base64url-encoded
// without padding (32 characters).
func (r *DatabaseReconciler) generatePassword() (string, error) {
    buf := make([]byte, 24)
    if _, err := rand.Read(buf); err != nil {
        return "", err
    }
    return base64.RawURLEncoding.EncodeToString(buf), nil
}

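// reconcilePVC pre-creates one claim per replica. This is optional: the StatefulSet's
// volumeClaimTemplates would create claims with the same "data-<name>-<ordinal>" names
// and reuses existing ones. Because of the controller reference set below, these PVCs
// are garbage-collected when the Database is deleted.
func (r *DatabaseReconciler) reconcilePVC(ctx context.Context, db *dbv1.Database) error {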
    for i := int32(0); i < db.Spec.Replicas; i++ {
        pvcName := fmt.Sprintf("data-%s-%d", db.Name, i)
        pvc := &corev1.PersistentVolumeClaim{}
        err := r.Get(ctx, types.NamespacedName{Name: pvcName, Namespace: db.Namespace}, pvc)
        
        if err != nil && errors.IsNotFound(err) {
            storageClass := db.Spec.StorageClass
            pvc = &corev1.PersistentVolumeClaim{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      pvcName,
                    Namespace: db.Namespace,
                },
                Spec: corev1.PersistentVolumeClaimSpec{
                    AccessModes: []corev1.PersistentVolumeAccessMode{
                        corev1.ReadWriteOnce,
                    },
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceStorage: resource.MustParse(db.Spec.StorageSize),
                        },
                    },
                },
            }
            
            if storageClass != "" {
                pvc.Spec.StorageClassName = &storageClass
            }
            
            if err := ctrl.SetControllerReference(db, pvc, r.Scheme); err != nil {
                return err
            }
            
            if err := r.Create(ctx, pvc); err != nil {
                return err
            }
        } else if err != nil {
            return err
        }
    }
    
    return nil
}

func (r *DatabaseReconciler) reconcileStatefulSet(ctx context.Context, db *dbv1.Database) error {
    sts := &appsv1.StatefulSet{}
    err := r.Get(ctx, types.NamespacedName{Name: db.Name, Namespace: db.Namespace}, sts)
    
    if err != nil && errors.IsNotFound(err) {
        sts = r.createStatefulSet(db)
        if err := ctrl.SetControllerReference(db, sts, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, sts)
    } else if err != nil {
        return err
    }
    
    return nil
}

func (r *DatabaseReconciler) createStatefulSet(db *dbv1.Database) *appsv1.StatefulSet {
    return &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name,
            Namespace: db.Namespace,
        },
        Spec: appsv1.StatefulSetSpec{
            ServiceName: db.Name,
            Replicas:    &db.Spec.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": db.Name,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": db.Name,
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "database",
                            Image: fmt.Sprintf("postgres:%s", db.Spec.Version),
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: 5432,
                                },
                            },
                            Env: []corev1.EnvVar{
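                                {
                                    Name:  "POSTGRES_DB",
                                    Value: db.Spec.Database,
                                },
                                {
                                    // Keep the cluster in a subdirectory of the mount: a fresh
                                    // volume root may contain lost+found, which initdb rejects.
                                    Name:  "PGDATA",
                                    Value: "/var/lib/postgresql/data/pgdata",
                                },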
                                {
                                    Name: "POSTGRES_USER",
                                    ValueFrom: &corev1.EnvVarSource{
                                        SecretKeyRef: &corev1.SecretKeySelector{
                                            LocalObjectReference: corev1.LocalObjectReference{
                                                Name: db.Spec.Credentials.SecretName,
                                            },
                                            Key: db.Spec.Credentials.UsernameKey,
                                        },
                                    },
                                },
                                {
                                    Name: "POSTGRES_PASSWORD",
                                    ValueFrom: &corev1.EnvVarSource{
                                        SecretKeyRef: &corev1.SecretKeySelector{
                                            LocalObjectReference: corev1.LocalObjectReference{
                                                Name: db.Spec.Credentials.SecretName,
                                            },
                                            Key: db.Spec.Credentials.PasswordKey,
                                        },
                                    },
                                },
                            },
                            VolumeMounts: []corev1.VolumeMount{
                                {
                                    Name:      "data",
                                    MountPath: "/var/lib/postgresql/data",
                                },
                            },
                        },
                    },
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
                {
                    ObjectMeta: metav1.ObjectMeta{
                        Name: "data",
                    },
                    Spec: corev1.PersistentVolumeClaimSpec{
                        AccessModes: []corev1.PersistentVolumeAccessMode{
                            corev1.ReadWriteOnce,
                        },
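                        // k8s.io/api v0.29+ types this field as VolumeResourceRequirements; older releases use ResourceRequirements.
                        Resources: corev1.VolumeResourceRequirements{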
                            Requests: corev1.ResourceList{
                                corev1.ResourceStorage: resource.MustParse(db.Spec.StorageSize),
                            },
                        },
                    },
                },
            },
        },
    }
}

func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&dbv1.Database{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{}).
        Owns(&corev1.Secret{}).
        Complete(r)
}
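
With `Owns()`, any change to a StatefulSet, Service, or Secret whose controller owner reference points at a `Database` enqueues a reconcile for that `Database`. This depends on the reconciler setting that owner reference on each child it creates, commonly with `controllerutil.SetControllerReference`. The stdlib sketch below models only the mapping step. The types are simplified stand-ins for the real API types, and the API group `db.example.com` is hypothetical because the source does not show it.

```go
package main

import (
	"fmt"
	"strings"
)

// OwnerRef is a simplified stand-in for metav1.OwnerReference.
type OwnerRef struct {
	APIVersion string // e.g. "db.example.com/v1" (hypothetical group)
	Kind       string
	Name       string
	Controller bool // true only for the managing controller's reference
}

// Child is a simplified stand-in for an owned object such as the StatefulSet.
type Child struct {
	Namespace string
	Name      string
	Owners    []OwnerRef
}

// Request carries the namespace/name pair that reconcile.Request holds.
type Request struct {
	Namespace string
	Name      string
}

// groupOf extracts the API group from an apiVersion ("v1" is the core group "").
func groupOf(apiVersion string) string {
	if group, _, found := strings.Cut(apiVersion, "/"); found {
		return group
	}
	return ""
}

// ownerRequest returns the request to enqueue when child changes. Only the
// controller owner of the watched group/kind counts, and owner references are
// namespace-local, so the owner is looked up in the child's namespace.
func ownerRequest(child Child, ownerGroup, ownerKind string) (Request, bool) {
	for _, ref := range child.Owners {
		if ref.Controller && ref.Kind == ownerKind && groupOf(ref.APIVersion) == ownerGroup {
			return Request{Namespace: child.Namespace, Name: ref.Name}, true
		}
	}
	return Request{}, false
}

func main() {
	sts := Child{
		Namespace: "prod",
		Name:      "orders-db",
		Owners: []OwnerRef{
			{APIVersion: "db.example.com/v1", Kind: "Database", Name: "orders", Controller: true},
		},
	}
	req, ok := ownerRequest(sts, "db.example.com", "Database")
	fmt.Println(req, ok) // {prod orders} true
}
```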

Example 3: Using a Helm Operator

ClusterServiceVersion (packaging the Operator for Operator Lifecycle Manager, OLM)

yaml
apiVersion: operators.coreos.com/v1alpha1
kind: ClusterServiceVersion
metadata:
  name: my-app-operator.v1.0.0
  namespace: operators
spec:
  displayName: My Application Operator
  version: 1.0.0
  provider:
    name: My Company
  maturity: stable
  installModes:
  - type: OwnNamespace
    supported: true
  - type: SingleNamespace
    supported: true
  - type: MultiNamespace
    supported: false
  - type: AllNamespaces
    supported: true
  install:
    strategy: deployment
    spec:
      deployments:
      - name: my-app-operator
        spec:
          replicas: 1
          selector:
            matchLabels:
              name: my-app-operator
          template:
            metadata:
              labels:
                name: my-app-operator
            spec:
              serviceAccountName: my-app-operator
              containers:
              - name: my-app-operator
                image: my-registry/my-app-operator:1.0.0
                env:
                - name: WATCH_NAMESPACE
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.annotations['olm.targetNamespaces']
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: OPERATOR_NAME
                  value: "my-app-operator"
  customresourcedefinitions:
    owned:
    - name: myapps.myapp.example.com
      version: v1alpha1
      kind: MyApp
      displayName: My Application
      description: Represents a deployment of My Application
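
The `fieldRef` above fills `WATCH_NAMESPACE` from the `olm.targetNamespaces` annotation, which OLM sets from the OperatorGroup. Its value is either a comma-separated list of namespaces or an empty string in `AllNamespaces` mode. The minimal sketch below shows how the operator's `main` might parse that value. The function name is hypothetical. The resulting list would then go into the manager's cache configuration, where an empty list means all namespaces.

```go
package main

import (
	"fmt"
	"strings"
)

// watchNamespaces parses a WATCH_NAMESPACE value populated from the
// olm.targetNamespaces annotation. A nil result means "watch all namespaces".
func watchNamespaces(value string) []string {
	var namespaces []string
	for _, ns := range strings.Split(value, ",") {
		ns = strings.TrimSpace(ns)
		if ns != "" {
			namespaces = append(namespaces, ns)
		}
	}
	return namespaces // nil when value is empty (AllNamespaces install mode)
}

func main() {
	fmt.Println(watchNamespaces("team-a,team-b")) // [team-a team-b]
	fmt.Println(watchNamespaces("") == nil)       // true
}
```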

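Lifecycle Management

Operator Lifecycle

Installation

A Subscription tells OLM which package (name) to install, from which catalog (source, sourceNamespace) and update channel. startingCSV pins the first version, and installPlanApproval: Automatic lets OLM apply later upgrades without manual approval. The target namespace also needs an OperatorGroup, which determines the namespaces the Operator watches.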

yaml
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-operator
  namespace: operators
spec:
  channel: stable
  name: my-operator
  source: operatorhubio-catalog
  sourceNamespace: olm
  startingCSV: my-operator.v1.0.0
  installPlanApproval: Automatic

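Upgrade

OLM normally creates the InstallPlan itself when a newer CSV appears in the subscribed channel; the manifest below shows the fields involved. With installPlanApproval: Manual in the Subscription, the upgrade is applied only after spec.approved is set to true, for example with kubectl patch installplan <name> -n operators --type merge -p '{"spec":{"approved":true}}'.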

yaml
apiVersion: operators.coreos.com/v1alpha1
kind: InstallPlan
metadata:
  name: install-plan
  namespace: operators
spec:
  approval: Automatic
  approved: true
  clusterServiceVersionNames:
  - my-operator.v1.1.0
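
OLM upgrades a subscription one CSV at a time. Each newer CSV names its predecessor in `spec.replaces`, and the channel head is the CSV that nothing else replaces. The sketch below models only that `replaces` chain and ignores `skips` and `skipRange`. The version names beyond v1.0.0 and v1.1.0 are hypothetical.

```go
package main

import "fmt"

// CSV keeps the two fields that define the upgrade graph.
type CSV struct {
	Name     string
	Replaces string // spec.replaces; empty for the first release in the channel
}

// upgradePath follows the replaces edges from the installed CSV to the channel head.
func upgradePath(installed string, channel []CSV) []string {
	replacedBy := map[string]string{} // predecessor -> successor
	for _, csv := range channel {
		if csv.Replaces != "" {
			replacedBy[csv.Replaces] = csv.Name
		}
	}
	path := []string{}
	seen := map[string]bool{installed: true}
	current := installed
	for {
		next, ok := replacedBy[current]
		if !ok || seen[next] { // reached the head (or a malformed cycle)
			return path
		}
		seen[next] = true
		path = append(path, next)
		current = next
	}
}

func main() {
	channel := []CSV{
		{Name: "my-operator.v1.0.0"},
		{Name: "my-operator.v1.1.0", Replaces: "my-operator.v1.0.0"},
		{Name: "my-operator.v1.2.0", Replaces: "my-operator.v1.1.0"},
	}
	fmt.Println(upgradePath("my-operator.v1.0.0", channel))
	// [my-operator.v1.1.0 my-operator.v1.2.0]
}
```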

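Uninstallation

Delete the custom resources first, while the Operator is still running, so that their finalizers can complete. Deleting a CRD also deletes every object of that type. Use the currently installed CSV name, which the Subscription reports in status.installedCSV.

bash
kubectl delete myapps.myapp.example.com --all --all-namespaces

kubectl delete subscription my-operator -n operators

kubectl delete clusterserviceversion my-operator.v1.0.0 -n operators

kubectl delete crd myapps.myapp.example.com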

Application Lifecycle

Create

yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-app
spec:
  replicas: 3
  image: nginx:latest
  port: 80

Update

yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-app
spec:
  replicas: 5
  image: nginx:1.25
  port: 80

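Scaling

kubectl patch works with any custom resource. The --type merge flag is required because custom resources do not support strategic merge patch. kubectl scale works only if the CRD enables the scale subresource (in Kubebuilder, via the +kubebuilder:subresource:scale marker).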

bash
kubectl patch webapp my-app --type merge -p '{"spec":{"replicas":10}}'

kubectl scale webapp my-app --replicas=5
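
`kubectl patch --type merge` sends a JSON merge patch as defined in RFC 7386. Objects are merged key by key, `null` deletes a key, and any other value replaces the old one. That is why `{"spec":{"replicas":10}}` leaves `image` and `port` untouched. The stdlib sketch below implements the merge algorithm and applies it to the `my-app` spec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergePatch applies an RFC 7386 JSON merge patch to target (modifying target
// in place when it is an object) and returns the result.
func mergePatch(target, patch any) any {
	patchObj, ok := patch.(map[string]any)
	if !ok {
		return patch // non-object patch values replace the target wholesale
	}
	targetObj, ok := target.(map[string]any)
	if !ok {
		targetObj = map[string]any{} // RFC 7386: a non-object target becomes {}
	}
	for key, value := range patchObj {
		if value == nil {
			delete(targetObj, key) // null removes the field
			continue
		}
		targetObj[key] = mergePatch(targetObj[key], value)
	}
	return targetObj
}

func main() {
	var cr, patch map[string]any
	_ = json.Unmarshal([]byte(`{"spec":{"replicas":3,"image":"nginx:1.25","port":80}}`), &cr)
	_ = json.Unmarshal([]byte(`{"spec":{"replicas":10}}`), &patch)
	out, _ := json.Marshal(mergePatch(cr, patch))
	fmt.Println(string(out)) // {"spec":{"image":"nginx:1.25","port":80,"replicas":10}}
}
```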

Delete

bash
kubectl delete webapp my-app

kubectl Command Reference

CRD Management

bash
kubectl get crd

kubectl describe crd webapps.webapp.my.domain

kubectl get crd webapps.webapp.my.domain -o yaml

kubectl apply -f crd.yaml

kubectl delete crd webapps.webapp.my.domain

CR Management

bash
kubectl get webapp

kubectl get webapp -o wide

kubectl describe webapp my-webapp

kubectl get webapp my-webapp -o yaml

kubectl apply -f webapp-cr.yaml

kubectl edit webapp my-webapp

kubectl delete webapp my-webapp

kubectl patch webapp my-webapp --type merge -p '{"spec":{"replicas":5}}'

Operator Management

bash
kubectl get deployment -n operators

kubectl get pods -n operators -l name=my-operator

kubectl logs -n operators -l name=my-operator -f

kubectl describe pod -n operators -l name=my-operator

kubectl get events -n operators --sort-by='.lastTimestamp'

Debugging Commands

bash
kubectl get webapp my-webapp -o jsonpath='{.status}'

kubectl get webapp my-webapp -o jsonpath='{.status.conditions}'

kubectl auth can-i create webapps.webapp.my.domain --as=system:serviceaccount:default:default

kubectl auth can-i list webapps.webapp.my.domain --all-namespaces

kubectl api-resources | grep webapp

kubectl explain webapp.spec

kubectl explain webapp.status
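
The `{.status.conditions}` output is a JSON array of `metav1.Condition` objects. For an interactive wait, `kubectl wait --for=condition=Ready webapp/my-webapp` handles this directly. For scripted checks, such as a test harness, the array can be decoded and searched by `type`. The sketch below assumes a condition type named `Ready`. That is a common convention, not something the CRD guarantees, and the reason string in the example is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// condition holds the metav1.Condition fields this check needs.
type condition struct {
	Type    string `json:"type"`
	Status  string `json:"status"`
	Reason  string `json:"reason"`
	Message string `json:"message"`
}

// conditionStatus returns the status ("True", "False", "Unknown") of the
// condition with the given type, or "" if it is absent.
func conditionStatus(raw []byte, condType string) (string, error) {
	var conditions []condition
	if err := json.Unmarshal(raw, &conditions); err != nil {
		return "", err
	}
	for _, c := range conditions {
		if c.Type == condType {
			return c.Status, nil
		}
	}
	return "", nil
}

func main() {
	out := []byte(`[{"type":"Ready","status":"False","reason":"DeploymentNotReady","message":"0/3 replicas ready"}]`)
	status, err := conditionStatus(out, "Ready")
	fmt.Println(status, err) // False <nil>
}
```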

Troubleshooting Guide

Problem 1: CRD not created

Symptom

bash
kubectl get webapp
error: the server doesn't have a resource type "webapp"

Diagnosis steps

bash
kubectl get crd | grep webapp

kubectl describe crd webapps.webapp.my.domain

kubectl api-resources | grep webapp

kubectl logs -n operators -l name=my-operator

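Solution

  • Check that the CRD manifest is valid and has actually been applied (kubectl apply -f crd.yaml, or make install in a Kubebuilder project)
  • Confirm that the Operator, and the bundle that ships its CRDs, is installed and running
  • Verify RBAC permissions
  • Check that the apiVersion used in the CR (webapp.my.domain/v1) is a version the CRD serves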

Problem 2: Operator does not reconcile the resource

Symptom

bash
kubectl get webapp my-webapp
NAME       REPLICAS   READY   PHASE     AGE
my-webapp  3          0       Pending   10m

Diagnosis steps

bash
kubectl describe webapp my-webapp

kubectl logs -n operators -l name=my-operator

kubectl get events --field-selector involvedObject.name=my-webapp

kubectl get deployment,service,ingress -l app=my-webapp

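Solution

  • Look for errors in the Operator logs
  • Verify that the controller watches this kind and this namespace (check WATCH_NAMESPACE)
  • Confirm the namespace has enough resource quota for the objects the Operator creates
  • Check whether NetworkPolicies block the Operator's access to the API server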

Problem 3: Webhook errors

Symptom

bash
Error from server (InternalError): error when creating "webapp-cr.yaml": Internal error occurred: failed calling webhook "mwebapp.kb.io": Post "https://my-operator-webhook.operators.svc:443/mutate-webapp-my-domain-v1-webapp?timeout=10s": no such host

Diagnosis steps

bash
kubectl get validatingwebhookconfigurations

kubectl get mutatingwebhookconfigurations

kubectl get svc -n operators

kubectl logs -n operators -l name=my-operator

kubectl get certificates.cert-manager.io -n operators

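Solution

  • Check that the webhook Service exists in the namespace named in the webhook configuration and has ready endpoints (no such host means its DNS name does not resolve)
  • Verify the serving certificate and the caBundle injected into the webhook configuration
  • Confirm that the service name, namespace, port, and path in the webhook configuration are correct
  • Check network connectivity, including NetworkPolicies, from the API server to the webhook Pod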
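
The API server builds the URL in the error from the webhook configuration's `clientConfig.service` fields (name, namespace, port, path). A `no such host` error therefore points to a Service name or namespace that does not exist. controller-runtime derives the default webhook path from the resource's group, version, and kind. The stdlib sketch below rebuilds the URL from the error message to make that mapping visible. The helper names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// mutatePath mirrors controller-runtime's default path for defaulting
// (mutating) webhooks: /mutate-<group with dots as dashes>-<version>-<kind>.
func mutatePath(group, version, kind string) string {
	return "/mutate-" + strings.ReplaceAll(group, ".", "-") + "-" + version + "-" + strings.ToLower(kind)
}

// serviceURL is the in-cluster URL the API server calls for a webhook
// whose clientConfig references a Service.
func serviceURL(service, namespace string, port int, path string) string {
	return fmt.Sprintf("https://%s.%s.svc:%d%s", service, namespace, port, path)
}

func main() {
	path := mutatePath("webapp.my.domain", "v1", "WebApp")
	fmt.Println(serviceURL("my-operator-webhook", "operators", 443, path))
	// https://my-operator-webhook.operators.svc:443/mutate-webapp-my-domain-v1-webapp
}
```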

Problem 4: Insufficient RBAC permissions

Symptom

bash
E0115 10:30:00.123456   12345 controller.go:123] failed to reconcile: webapps.webapp.my.domain is forbidden: User "system:serviceaccount:operators:my-operator" cannot create resource "webapps" in API group "webapp.my.domain" at the cluster scope

Diagnosis steps

bash
kubectl get clusterrole | grep my-operator

kubectl get clusterrolebinding | grep my-operator

kubectl describe clusterrole my-operator-role

kubectl auth can-i create webapps.webapp.my.domain --as=system:serviceaccount:operators:my-operator

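Solution

Grant the missing verbs in the Operator's ClusterRole, and bind the role to the service account system:serviceaccount:operators:my-operator with a ClusterRoleBinding: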

yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-operator-role
rules:
- apiGroups: ["webapp.my.domain"]
  resources: ["webapps"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["webapp.my.domain"]
  resources: ["webapps/status"]
  verbs: ["get", "patch", "update"]
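
RBAC has no deny rules. A request is allowed if any rule matches its API group, resource, and verb. A subresource such as `webapps/status` is matched as its own resource string, which is why it gets a separate rule above. This ClusterRole covers only the custom resource, so the objects the controller creates (Deployments, Services, and so on) need rules of their own. The simplified stdlib model below shows the matching. It ignores `resourceNames`, non-resource URLs, and role aggregation, and `Rule` is a stand-in for `rbacv1.PolicyRule`.

```go
package main

import "fmt"

// Rule is a simplified stand-in for rbacv1.PolicyRule.
type Rule struct {
	APIGroups []string
	Resources []string
	Verbs     []string
}

// matches reports whether want appears in values or values contains "*".
func matches(values []string, want string) bool {
	for _, v := range values {
		if v == want || v == "*" {
			return true
		}
	}
	return false
}

// allowed reports whether any rule grants verb on resource in group.
func allowed(rules []Rule, group, resource, verb string) bool {
	for _, r := range rules {
		if matches(r.APIGroups, group) && matches(r.Resources, resource) && matches(r.Verbs, verb) {
			return true
		}
	}
	return false
}

func main() {
	rules := []Rule{
		{APIGroups: []string{"webapp.my.domain"}, Resources: []string{"webapps"}, Verbs: []string{"create", "delete", "get", "list", "patch", "update", "watch"}},
		{APIGroups: []string{"webapp.my.domain"}, Resources: []string{"webapps/status"}, Verbs: []string{"get", "patch", "update"}},
	}
	fmt.Println(allowed(rules, "webapp.my.domain", "webapps", "create"))        // true
	fmt.Println(allowed(rules, "webapp.my.domain", "webapps/status", "update")) // true
	fmt.Println(allowed(rules, "apps", "deployments", "create"))                // false
}
```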

Problem 5: Status updates fail

Symptom

bash
kubectl get webapp my-webapp -o yaml
status:
  phase: ""
  replicas: 0

Diagnosis steps

bash
kubectl describe webapp my-webapp

kubectl logs -n operators -l name=my-operator | grep -i error

kubectl get webapp my-webapp -o jsonpath='{.metadata.resourceVersion}'

kubectl auth can-i update webapps.webapp.my.domain --subresource=status --as=system:serviceaccount:operators:my-operator

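Solution

  • Check RBAC for the status subresource (webapps/status), and check that the CRD enables subresources.status
  • Verify the status update logic: with the status subresource enabled, status must be written with r.Status().Update() or r.Status().Patch(), because a plain r.Update() ignores status changes
  • Handle resourceVersion conflicts by re-reading the object and retrying
  • Return errors from Reconcile so that failed updates are retried with backoff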
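
Status writes use optimistic concurrency. An update that carries a stale `metadata.resourceVersion` is rejected with a 409 Conflict. The usual fix is to re-read the object and reapply the change, and client-go provides `retry.RetryOnConflict` (package `k8s.io/client-go/util/retry`) for this. The stdlib model below shows the mechanism without backoff. It uses a hypothetical in-memory `store` in place of the API server.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("conflict: resourceVersion is stale")

type object struct {
	ResourceVersion int
	Phase           string
}

// store mimics the API server's compare-and-swap on resourceVersion.
type store struct{ current object }

func (s *store) get() object { return s.current }

func (s *store) update(o object) error {
	if o.ResourceVersion != s.current.ResourceVersion {
		return errConflict
	}
	o.ResourceVersion++
	s.current = o
	return nil
}

// retryOnConflict re-runs fn while it fails with a conflict, up to attempts times.
func retryOnConflict(attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); !errors.Is(err, errConflict) {
			return err
		}
	}
	return err
}

func main() {
	s := &store{current: object{ResourceVersion: 1}}
	stale := s.get()
	_ = s.update(object{ResourceVersion: 1, Phase: "Pending"}) // another writer wins the race

	stale.Phase = "Running"
	fmt.Println(s.update(stale)) // conflict: resourceVersion is stale

	err := retryOnConflict(3, func() error {
		latest := s.get() // re-read, then reapply the change
		latest.Phase = "Running"
		return s.update(latest)
	})
	fmt.Println(err, s.get().Phase, s.get().ResourceVersion) // <nil> Running 3
}
```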

Best Practices

1. Design Best Practices

API design

go
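type MyResourceSpec struct {
    // Version of the application to deploy.
    Version string `json:"version"`
    
    // Replicas is a pointer so that "unset" (defaulted by the controller or a
    // webhook) can be told apart from an explicit 0.
    // +kubebuilder:validation:Minimum=0
    // +optional
    Replicas *int32 `json:"replicas,omitempty"`
    
    // Image is the container image to run.
    Image string `json:"image"`
    
    // The scheduling and resource fields below are passed through to the pod template.
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
    
    Affinity *corev1.Affinity `json:"affinity,omitempty"`
    
    Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
    
    NodeSelector map[string]string `json:"nodeSelector,omitempty"`
}

type MyResourceStatus struct {
    // Phase is a short human-readable summary; tooling should rely on Conditions.
    Phase string `json:"phase,omitempty"`
    
    // Conditions follow the metav1.Condition convention, one entry per Type.
    // +listType=map
    // +listMapKey=type
    Conditions []metav1.Condition `json:"conditions,omitempty"`
    
    // ObservedGeneration is the metadata.generation this status was computed from.
    ObservedGeneration int64 `json:"observedGeneration,omitempty"`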
}
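
`ObservedGeneration` tells clients whether `Phase` and `Conditions` are current. The API server increments `metadata.generation` on spec changes; with the status subresource enabled, status writes do not increment it. Once the controller has acted on a spec, it copies that generation into `status.observedGeneration`. The stdlib sketch below shows the check a client or test might perform. The `resource` type is a hypothetical stand-in for the CR's metadata and status.

```go
package main

import "fmt"

// resource keeps only the two fields the staleness check compares.
type resource struct {
	Generation         int64 // metadata.generation, bumped by the API server on spec changes
	ObservedGeneration int64 // status.observedGeneration, written by the controller
}

// statusIsCurrent reports whether the controller has processed the latest spec.
func statusIsCurrent(r resource) bool {
	return r.ObservedGeneration == r.Generation
}

// markObserved is what a controller does at the end of a successful reconcile.
func markObserved(r *resource) {
	r.ObservedGeneration = r.Generation
}

func main() {
	r := resource{Generation: 1, ObservedGeneration: 1}
	r.Generation++                  // a user edits spec.replicas
	fmt.Println(statusIsCurrent(r)) // false
	markObserved(&r)
	fmt.Println(statusIsCurrent(r)) // true
}
```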

Status management

go
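func (r *MyReconciler) updateCondition(instance *MyResource, conditionType string, status metav1.ConditionStatus, reason, message string) {
    condition := metav1.Condition{
        Type:               conditionType,
        Status:             status,
        Reason:             reason, // required: a non-empty CamelCase identifier such as "Available"
        Message:            message,
        ObservedGeneration: instance.Generation,
        // SetStatusCondition keeps the existing timestamp unless Status changes.
        LastTransitionTime: metav1.Now(),
    }
    
    // meta is k8s.io/apimachinery/pkg/api/meta. This only mutates the in-memory object;
    // persist it with r.Status().Update(ctx, instance) or r.Status().Patch(...).
    meta.SetStatusCondition(&instance.Status.Conditions, condition)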
}
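
`meta.SetStatusCondition` keeps one entry per condition `Type`. It moves `LastTransitionTime` only when the `Status` value changes, so a change to the reason or message alone leaves the timestamp untouched. The stdlib model below reproduces that behavior with a simplified condition type and omits `ObservedGeneration`. It is a sketch of the semantics, not the library code.

```go
package main

import (
	"fmt"
	"time"
)

// Condition is a simplified stand-in for metav1.Condition.
type Condition struct {
	Type               string
	Status             string // "True", "False" or "Unknown"
	Reason             string
	Message            string
	LastTransitionTime time.Time
}

// setCondition upserts c by Type, mirroring meta.SetStatusCondition: the
// transition time changes only when Status flips. It reports whether anything changed.
func setCondition(conds *[]Condition, c Condition, now time.Time) bool {
	for i := range *conds {
		existing := &(*conds)[i]
		if existing.Type != c.Type {
			continue
		}
		changed := false
		if existing.Status != c.Status {
			existing.Status = c.Status
			existing.LastTransitionTime = now
			changed = true
		}
		if existing.Reason != c.Reason || existing.Message != c.Message {
			existing.Reason, existing.Message = c.Reason, c.Message
			changed = true
		}
		return changed
	}
	c.LastTransitionTime = now
	*conds = append(*conds, c)
	return true
}

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	var conds []Condition
	setCondition(&conds, Condition{Type: "Ready", Status: "False", Reason: "Progressing"}, t0)
	setCondition(&conds, Condition{Type: "Ready", Status: "False", Reason: "Scaling"}, t0.Add(time.Minute))
	fmt.Println(len(conds), conds[0].Reason, conds[0].LastTransitionTime.Equal(t0)) // 1 Scaling true
	setCondition(&conds, Condition{Type: "Ready", Status: "True", Reason: "Available"}, t0.Add(2*time.Minute))
	fmt.Println(conds[0].LastTransitionTime.Sub(t0)) // 2m0s
}
```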

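2. Controller Implementation Best Practices

Idempotency

Reconcile can run any number of times for the same object (after restarts, resyncs, or duplicate events), so each run must derive the desired state from the current object and converge to the same result.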

go
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    
    if instance.DeletionTimestamp != nil {
        return r.handleDeletion(ctx, instance)
    }
    
    return r.handleCreationOrUpdate(ctx, instance)
}
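
`handleDeletion` is typically where a finalizer is processed. The controller adds a finalizer when it first sees the object. On delete, the API server then only sets `deletionTimestamp`, and it removes the object once the controller has run its cleanup and removed the finalizer. controller-runtime offers `controllerutil.AddFinalizer`, `RemoveFinalizer` and `ContainsFinalizer` for the list handling. The model below uses a hypothetical finalizer name and an in-memory object instead.

```go
package main

import "fmt"

const finalizer = "webapp.my.domain/cleanup" // hypothetical finalizer name

// object keeps only the fields that matter for deletion.
type object struct {
	Finalizers []string
	Deleting   bool // stands in for a non-nil metadata.deletionTimestamp
}

// gone reports whether the API server would have removed the object.
func (o *object) gone() bool { return o.Deleting && len(o.Finalizers) == 0 }

func hasFinalizer(o *object) bool {
	for _, f := range o.Finalizers {
		if f == finalizer {
			return true
		}
	}
	return false
}

// reconcile is idempotent: on live objects it ensures the finalizer exists;
// on deleting objects it runs cleanup once and then releases the finalizer.
func reconcile(o *object, cleanup func()) {
	if !o.Deleting {
		if !hasFinalizer(o) {
			o.Finalizers = append(o.Finalizers, finalizer)
		}
		return
	}
	if hasFinalizer(o) {
		cleanup() // e.g. remove external resources the cluster cannot garbage-collect
		kept := []string{}
		for _, f := range o.Finalizers {
			if f != finalizer {
				kept = append(kept, f)
			}
		}
		o.Finalizers = kept
	}
}

func main() {
	o := &object{}
	cleanups := 0
	reconcile(o, func() { cleanups++ })
	reconcile(o, func() { cleanups++ }) // a second pass changes nothing
	o.Deleting = true                   // kubectl delete: the API server only marks the object
	fmt.Println(o.gone())               // false: the finalizer is still present
	reconcile(o, func() { cleanups++ })
	reconcile(o, func() { cleanups++ })
	fmt.Println(o.gone(), cleanups) // true 1
}
```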

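Error handling

Returning a non-nil error makes controller-runtime log the error and requeue the request with per-object exponential backoff. A conflict only means the object changed after it was read, so the example below requeues it with Requeue: true (still rate-limited) without reporting an error.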

go
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    
    result, err := r.reconcile(ctx, instance)
    if err != nil {
        if errors.IsConflict(err) {
            return ctrl.Result{Requeue: true}, nil
        }
        return ctrl.Result{}, err
    }
    
    return result, nil
}
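
Both a returned error and `Requeue: true` put the request back on the work queue through a rate limiter. By default, that rate limiter combines a per-object exponential backoff with an overall token bucket. The backoff starts at 5 ms, doubles per consecutive failure, and is capped at 1000 s, per client-go's `workqueue.DefaultControllerRateLimiter`. The sketch below models only the per-object part with stdlib types. `backoff` is a hypothetical name, not the client-go type.

```go
package main

import (
	"fmt"
	"time"
)

// backoff tracks consecutive failures per request key, modeled on client-go's
// ItemExponentialFailureRateLimiter.
type backoff struct {
	baseDelay, maxDelay time.Duration
	failures            map[string]int
}

func newBackoff(baseDelay, maxDelay time.Duration) *backoff {
	return &backoff{baseDelay: baseDelay, maxDelay: maxDelay, failures: map[string]int{}}
}

// when returns the delay before key is retried and records one more failure.
func (b *backoff) when(key string) time.Duration {
	n := b.failures[key]
	b.failures[key]++
	delay := b.baseDelay
	for i := 0; i < n && delay < b.maxDelay; i++ {
		delay *= 2 // base * 2^failures, stopping early once the cap is reached
	}
	if delay > b.maxDelay {
		return b.maxDelay
	}
	return delay
}

// forget resets the counter after a successful reconcile.
func (b *backoff) forget(key string) { delete(b.failures, key) }

func main() {
	b := newBackoff(5*time.Millisecond, 1000*time.Second)
	for i := 0; i < 4; i++ {
		fmt.Print(b.when("default/my-webapp"), " ")
	}
	fmt.Println() // 5ms 10ms 20ms 40ms
	b.forget("default/my-webapp")
	fmt.Println(b.when("default/my-webapp")) // 5ms
}
```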

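Event recording

r.Recorder is a record.EventRecorder, typically obtained with mgr.GetEventRecorderFor("my-operator") when the reconciler is created. Events are Kubernetes objects, so the Operator also needs RBAC permission to create them.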

go
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)
    recorder := r.Recorder
    
    instance := &MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    
    logger.Info("Reconciling MyResource", "name", instance.Name)
    
    recorder.Event(instance, corev1.EventTypeNormal, "Reconciling", "Starting reconciliation")
    
    return ctrl.Result{}, nil
}

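3. Testing Best Practices

Unit tests

The fake client from controller-runtime (sigs.k8s.io/controller-runtime/pkg/client/fake) runs Reconcile against an in-memory object store, without an API server.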

go
func TestReconcile(t *testing.T) {
    scheme := runtime.NewScheme()
    _ = MyResourceV1.AddToScheme(scheme)
    _ = appsv1.AddToScheme(scheme)
    _ = corev1.AddToScheme(scheme)
    
    ns := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{
            Name: "test",
        },
    }
    
    instance := &MyResource{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-instance",
            Namespace: "test",
        },
        Spec: MyResourceSpec{
            Replicas: pointer.Int32Ptr(3),
            Image:    "nginx:latest",
        },
    }
    
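    // Named fakeClient so it does not shadow the controller-runtime client package.
    fakeClient := fake.NewClientBuilder().
        WithScheme(scheme).
        WithObjects(ns, instance).
        WithStatusSubresource(instance). // controller-runtime v0.15+: lets r.Status().Update() work
        Build()
    
    reconciler := &MyReconciler{
        Client: fakeClient,
        Scheme: scheme,
    }
    
    req := ctrl.Request{
        NamespacedName: types.NamespacedName{
            Name:      "test-instance",
            Namespace: "test",
        },
    }
    
    result, err := reconciler.Reconcile(context.Background(), req)
    
    assert.NoError(t, err)
    // Assumes this reconciler asks to be requeued after one minute on success.
    assert.Equal(t, ctrl.Result{RequeueAfter: time.Minute}, result)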
}

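Integration tests

This test reads the current kubeconfig through ctrl.GetConfig, so it needs a reachable cluster (or an envtest API server) with the CRD installed and the controller running.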

go
func TestIntegration(t *testing.T) {
    cfg, err := ctrl.GetConfig()
    require.NoError(t, err)
    
    scheme := runtime.NewScheme()
    _ = MyResourceV1.AddToScheme(scheme)
    
    k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
    require.NoError(t, err)
    
    ctx := context.Background()
    
    instance := &MyResource{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-instance",
            Namespace: "default",
        },
        Spec: MyResourceSpec{
            Replicas: pointer.Int32Ptr(3),
            Image:    "nginx:latest",
        },
    }
    
    err = k8sClient.Create(ctx, instance)
    require.NoError(t, err)
    
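    defer k8sClient.Delete(ctx, instance)
    
    // Poll for the Deployment instead of sleeping a fixed time (wait is
    // k8s.io/apimachinery/pkg/util/wait). Assumes the controller names the
    // Deployment after the custom resource.
    deployment := &appsv1.Deployment{}
    key := types.NamespacedName{Name: "test-instance", Namespace: "default"}
    err = wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
        if getErr := k8sClient.Get(ctx, key, deployment); getErr != nil {
            return false, client.IgnoreNotFound(getErr) // keep polling while not found
        }
        return true, nil
    })
    require.NoError(t, err)
    assert.Equal(t, int32(3), *deployment.Spec.Replicas)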
}

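4. Performance Best Practices

Concurrency and cache sync

MaxConcurrentReconciles sets how many Reconcile calls may run in parallel; the work queue never hands the same object to two workers at once. CacheSyncTimeout bounds how long the controller waits for its informer caches to sync at startup.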

go
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}).
        Owns(&appsv1.Deployment{}).
        WithOptions(controller.Options{
            MaxConcurrentReconciles: 10,
            CacheSyncTimeout:        2 * time.Minute,
        }).
        Complete(r)
}
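
`MaxConcurrentReconciles: 10` starts ten workers on one work queue. The queue drops a key that is added again while it is still waiting. If an object changes while a worker is reconciling it, the key is re-queued only after that worker calls `Done`, so `Reconcile` never runs twice in parallel for the same object. The single-threaded stdlib model below shows that bookkeeping. The `queue` type is hypothetical, not client-go's `workqueue`.

```go
package main

import "fmt"

// queue models client-go's workqueue bookkeeping (single-threaded, no blocking).
type queue struct {
	items      []string
	dirty      map[string]bool // keys waiting to be processed
	processing map[string]bool // keys currently handed to a worker
}

func newQueue() *queue {
	return &queue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// add enqueues key unless it is already waiting; a key being processed is
// only marked dirty and re-queued when the worker calls done.
func (q *queue) add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	if !q.processing[key] {
		q.items = append(q.items, key)
	}
}

// get hands the next key to a worker; ok is false when nothing is queued.
func (q *queue) get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	delete(q.dirty, key)
	q.processing[key] = true
	return key, true
}

// done releases key and re-queues it if it changed while being processed.
func (q *queue) done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.items = append(q.items, key)
	}
}

func main() {
	q := newQueue()
	q.add("default/a")
	q.add("default/a") // duplicate while waiting: ignored
	key, _ := q.get()
	q.add("default/a") // changed during reconcile: not handed to a second worker
	_, ok := q.get()
	fmt.Println(key, ok) // default/a false
	q.done(key)
	key, ok = q.get()
	fmt.Println(key, ok) // default/a true
}
```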

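Event filtering

GenerationChangedPredicate drops update events that do not change metadata.generation, such as status-only or label and annotation changes. ResourceVersionChangedPredicate drops updates in which the object did not change at all, such as periodic resyncs.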

go
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
        Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
        Complete(r)
}
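
Both predicates compare the old and new objects of an update event. In the stdlib sketch below, the `meta` type is a hypothetical stand-in for object metadata. A status write changes only `resourceVersion`, so `GenerationChangedPredicate` drops it while `ResourceVersionChangedPredicate` lets it through. The latter drops only updates where nothing changed, such as informer resyncs.

```go
package main

import "fmt"

// meta holds the object metadata the two predicates look at.
type meta struct {
	Generation      int64
	ResourceVersion string
}

// generationChanged mirrors the Update check of predicate.GenerationChangedPredicate.
func generationChanged(oldObj, newObj meta) bool {
	return oldObj.Generation != newObj.Generation
}

// resourceVersionChanged mirrors the Update check of predicate.ResourceVersionChangedPredicate.
func resourceVersionChanged(oldObj, newObj meta) bool {
	return oldObj.ResourceVersion != newObj.ResourceVersion
}

func main() {
	before := meta{Generation: 4, ResourceVersion: "1001"}
	statusWrite := meta{Generation: 4, ResourceVersion: "1002"} // status subresource update
	specEdit := meta{Generation: 5, ResourceVersion: "1003"}

	fmt.Println(generationChanged(before, statusWrite), resourceVersionChanged(before, statusWrite)) // false true
	fmt.Println(generationChanged(statusWrite, specEdit))                                            // true
	fmt.Println(resourceVersionChanged(before, before))                                              // false (resync)
}
```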

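5. Security Best Practices

RBAC configuration

controller-gen turns these markers into the Operator's ClusterRole (make manifests in a Kubebuilder project). Every resource the controller reads or writes, including the events it records, therefore needs a marker.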

go
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    return ctrl.Result{}, nil
}
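
controller-gen reads each `//+kubebuilder:rbac` marker and emits one ClusterRole rule. It splits list values on `;` and maps `groups=core` to the empty (core) API group. The stdlib sketch below parses a single marker of the form used above to show that mapping. It is a simplified illustration, not controller-gen itself: the `rule` type is hypothetical, and `urls`, `namespace` and `resourceNames` are not supported.

```go
package main

import (
	"fmt"
	"strings"
)

type rule struct {
	APIGroups, Resources, Verbs []string
}

// parseRBACMarker turns "//+kubebuilder:rbac:groups=...,resources=...,verbs=a;b" into a rule.
func parseRBACMarker(marker string) (rule, error) {
	const prefix = "//+kubebuilder:rbac:"
	if !strings.HasPrefix(marker, prefix) {
		return rule{}, fmt.Errorf("not an rbac marker: %q", marker)
	}
	var r rule
	for _, field := range strings.Split(strings.TrimPrefix(marker, prefix), ",") {
		key, value, ok := strings.Cut(field, "=")
		if !ok {
			return rule{}, fmt.Errorf("malformed field %q", field)
		}
		values := strings.Split(value, ";")
		switch key { // other keys are ignored in this sketch
		case "groups":
			for i, g := range values {
				if g == "core" {
					values[i] = "" // the core API group has an empty name
				}
			}
			r.APIGroups = values
		case "resources":
			r.Resources = values
		case "verbs":
			r.Verbs = values
		}
	}
	return r, nil
}

func main() {
	r, err := parseRBACMarker("//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch")
	fmt.Printf("%q %q %q %v\n", r.APIGroups, r.Resources, r.Verbs, err)
	// [""] ["events"] ["create" "patch"] <nil>
}
```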

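Security context

The settings below follow the restricted Pod Security Standard, which also requires a RuntimeDefault or Localhost seccomp profile. ReadOnlyRootFilesystem works only if the image writes nothing outside mounted volumes.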

go
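func (r *MyReconciler) createDeployment(instance *MyResource) *appsv1.Deployment {
    labels := map[string]string{"app": instance.Name}
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      instance.Name,
            Namespace: instance.Namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: instance.Spec.Replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels}, // required by Deployments
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    SecurityContext: &corev1.PodSecurityContext{
                        RunAsNonRoot: pointer.BoolPtr(true),
                        RunAsUser:    pointer.Int64Ptr(1000),
                        FSGroup:      pointer.Int64Ptr(1000),
                        // Required by the restricted Pod Security Standard.
                        SeccompProfile: &corev1.SeccompProfile{
                            Type: corev1.SeccompProfileTypeRuntimeDefault,
                        },
                    },
                    Containers: []corev1.Container{
                        {
                            Name:  "app",
                            Image: instance.Spec.Image,
                            SecurityContext: &corev1.SecurityContext{
                                AllowPrivilegeEscalation: pointer.BoolPtr(false),
                                ReadOnlyRootFilesystem:   pointer.BoolPtr(true),
                                Capabilities: &corev1.Capabilities{
                                    Drop: []corev1.Capability{"ALL"},
                                },
                            },
                        },
                    },
                },
            },
        },
    }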
}
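
The `restricted` Pod Security Standard requires four of the settings discussed here: `runAsNonRoot: true`, `allowPrivilegeEscalation: false`, dropping `ALL` capabilities, and a `RuntimeDefault` or `Localhost` seccomp profile. The stdlib sketch below encodes just those four checks over a hypothetical flattened struct, so an Operator could unit-test the pod specs it generates. The real admission check covers more, such as host namespaces and volume types.

```go
package main

import "fmt"

// podSecurity flattens the fields the check reads (hypothetical, not corev1 types).
type podSecurity struct {
	RunAsNonRoot             *bool
	AllowPrivilegeEscalation *bool
	DropCapabilities         []string
	SeccompProfileType       string
}

// restrictedViolations lists which of the four checks fail.
func restrictedViolations(p podSecurity) []string {
	var v []string
	if p.RunAsNonRoot == nil || !*p.RunAsNonRoot {
		v = append(v, "runAsNonRoot must be true")
	}
	if p.AllowPrivilegeEscalation == nil || *p.AllowPrivilegeEscalation {
		v = append(v, "allowPrivilegeEscalation must be false")
	}
	dropsAll := false
	for _, c := range p.DropCapabilities {
		if c == "ALL" {
			dropsAll = true
		}
	}
	if !dropsAll {
		v = append(v, "capabilities must drop ALL")
	}
	if p.SeccompProfileType != "RuntimeDefault" && p.SeccompProfileType != "Localhost" {
		v = append(v, "seccompProfile must be RuntimeDefault or Localhost")
	}
	return v
}

func boolPtr(b bool) *bool { return &b }

func main() {
	spec := podSecurity{
		RunAsNonRoot:             boolPtr(true),
		AllowPrivilegeEscalation: boolPtr(false),
		DropCapabilities:         []string{"ALL"},
	}
	fmt.Println(restrictedViolations(spec)) // [seccompProfile must be RuntimeDefault or Localhost]
	spec.SeccompProfileType = "RuntimeDefault"
	fmt.Println(len(restrictedViolations(spec))) // 0
}
```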

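Summary

This chapter covered the core concepts and practical techniques of the Operator pattern:

  1. Fundamentals: Operators, controllers, CRDs, and custom resources
  2. Architecture: the control loop and how the components interact
  3. Development frameworks: building Operators with Kubebuilder and the Operator SDK
  4. Worked examples: the end-to-end Operator development workflow
  5. Lifecycle management: managing both the Operator and the applications it runs
  6. Troubleshooting: diagnosing and fixing common problems
  7. Best practices: API design, controller implementation, testing, performance, and security

The Operator pattern is the standard way to encode application-specific operational knowledge in Kubernetes, so that complex, stateful applications can be managed declaratively and automatically.

Next Steps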