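Operator Pattern
Overview
An Operator is a Kubernetes extension pattern that encodes operational knowledge in software to automate the management of complex applications. Operators extend the Kubernetes API with Custom Resource Definitions (CRDs) and custom controllers, so an application's lifecycle management becomes automated, repeatable, and reliable. The pattern is central to managing cloud-native applications and is widely used for stateful workloads such as databases, message queues, and monitoring systems.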
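Core Concepts
1. Operator
An Operator is a software extension to Kubernetes that uses custom resources to manage an application and its components. It:
- follows Kubernetes principles, in particular the control loop
- encodes operational knowledge in software
- automates complex operational tasks
- exposes a declarative API
2. The Controller Pattern
The controller is Kubernetes' core design pattern. A controller:
- watches the state of the cluster
- compares the desired state with the actual state
- adjusts the actual state to match the desired state
- repeats this in a continuously running control loop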
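The four steps above can be sketched without any Kubernetes dependencies. This is a toy model, not controller-runtime code. The names `desiredState`, `observedState`, `reconcile` and `runLoop` are hypothetical, and a replica count plus an image stand in for whatever a real controller manages. The point it illustrates is that `reconcile` compares two complete states instead of reacting to individual events (a level-triggered design). Once the states match, further passes do nothing.

```go
package main

import "fmt"

// desiredState is what the user declared, i.e. the spec of a custom resource.
type desiredState struct {
	Replicas int
	Image    string
}

// observedState is what currently exists in the cluster.
type observedState struct {
	Replicas int
	Image    string
}

// reconcile compares the two full states and returns the actions needed to
// converge. It never asks "what changed?", so the same inputs always give the
// same plan, and a converged state gives an empty plan.
func reconcile(want desiredState, have observedState) []string {
	var actions []string
	if have.Image != want.Image {
		actions = append(actions, fmt.Sprintf("set image %s", want.Image))
	}
	if have.Replicas < want.Replicas {
		actions = append(actions, fmt.Sprintf("scale up %d -> %d", have.Replicas, want.Replicas))
	} else if have.Replicas > want.Replicas {
		actions = append(actions, fmt.Sprintf("scale down %d -> %d", have.Replicas, want.Replicas))
	}
	return actions
}

// runLoop repeats observe -> compare -> act until nothing is left to do and
// returns the final state plus the number of passes that had work.
func runLoop(want desiredState, have observedState) (observedState, int) {
	passes := 0
	for {
		if len(reconcile(want, have)) == 0 {
			return have, passes
		}
		passes++
		// Act. In this toy model one pass fixes the image and moves the replica
		// count one step; a real controller would call the API server here.
		have.Image = want.Image
		if have.Replicas < want.Replicas {
			have.Replicas++
		} else if have.Replicas > want.Replicas {
			have.Replicas--
		}
	}
}
```

Consider a start of one replica of `nginx:1.24` and a desired state of three replicas of `nginx:1.25`. `runLoop` converges after two passes that have work to do. In a real Operator the next pass is triggered by watch events or a requeue, not by a tight loop.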
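3. Custom Resources (CR)
Custom resources extend the Kubernetes API:
- they let users define their own resource types
- new resource types are created through a CRD
- they can be managed with kubectl just like built-in resources
4. The Control Loop (Reconcile Loop)
The control loop is the core mechanism of an Operator: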
┌─────────────────────────────────────────────────────────────┐
│                        Control Loop                         │
│                                                             │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐              │
│   │  Watch   │ -> │ Compare  │ -> │ Reconcile│              │
│   │ Resources│    │  States  │    │ Actions  │              │
│   └──────────┘    └──────────┘    └──────────┘              │
│        ↑                               │                    │
│        └───────────────────────────────┘                    │
└─────────────────────────────────────────────────────────────┘
Operator Architecture
Overall Architecture
┌─────────────────────────────────────────────────────────────┐
│                     Kubernetes Cluster                      │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                      API Server                       │  │
│  │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │  │
│  │   │  Built-in  │   │    CRD     │   │     CR     │    │  │
│  │   │ Resources  │   │Definitions │   │ Instances  │    │  │
│  │   └────────────┘   └────────────┘   └────────────┘    │  │
│  └───────────────────────────────────────────────────────┘  │
│                              ↑                              │
│                              │ Watch/Update                 │
│                              ↓                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Custom Controller                   │  │
│  │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │  │
│  │   │  Informer  │   │ Workqueue  │   │ Reconciler │    │  │
│  │   │  (Cache)   │   │            │   │            │    │  │
│  │   └────────────┘   └────────────┘   └────────────┘    │  │
│  └───────────────────────────────────────────────────────┘  │
│                              │                              │
│                              │ Create/Update/Delete         │
│                              ↓                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Managed Resources                   │  │
│  │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │  │
│  │   │    Pods    │   │  Services  │   │ ConfigMaps │    │  │
│  │   └────────────┘   └────────────┘   └────────────┘    │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
Controller Components
1. Informer
The Informer watches for resource changes and keeps a local cache of the objects it watches:
go
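// Simplified view of an informer; the real interface is cache.SharedInformer in
// k8s.io/client-go/tools/cache, and its method signatures vary between versions.
type Informer interface {
AddEventHandler(handler ResourceEventHandler)
Run(stopCh <-chan struct{})
HasSynced() bool
}
// A shared informer built from a ListWatch. The two function bodies are
// placeholders: a real operator would call a clientset generated for MyResource.
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return nil, nil // placeholder: list MyResource objects
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return nil, nil // placeholder: watch MyResource objects
},
},
&MyResource{},
12*time.Hour, // resync period: cached objects are re-delivered to handlers as updates
)
2. Workqueue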
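The workqueue holds the keys of changed objects until a worker processes them. Duplicate keys are collapsed, and failed keys can be re-added with rate limiting.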
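The deduplication rules are what make a workqueue safe for controllers. The same key is never handed to two workers at once, and a key that changes while it is being processed is processed again afterwards. The sketch below models those rules with plain maps. It is a simplified, single-goroutine illustration that assumes no locking, a non-blocking `Get` and no shutdown handling. It is not client-go's implementation. Keys use the `namespace/name` form produced by `cache.MetaNamespaceKeyFunc`.

```go
package main

// queue is a toy model of the deduplication rules of client-go's workqueue.
type queue struct {
	items      []string            // keys waiting to be processed, in order
	dirty      map[string]struct{} // keys that need (re)processing
	processing map[string]struct{} // keys currently held by a worker
}

func newQueue() *queue {
	return &queue{dirty: map[string]struct{}{}, processing: map[string]struct{}{}}
}

// Add marks a key as needing work. A key that is already waiting is not added
// twice, and a key held by a worker is only re-queued when that worker calls Done.
func (q *queue) Add(key string) {
	if _, ok := q.dirty[key]; ok {
		return
	}
	q.dirty[key] = struct{}{}
	if _, ok := q.processing[key]; ok {
		return
	}
	q.items = append(q.items, key)
}

// Get hands the next key to a worker; ok is false when nothing is waiting.
func (q *queue) Get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	q.processing[key] = struct{}{}
	delete(q.dirty, key)
	return key, true
}

// Done releases a key; if it was added again while being processed, it is re-queued.
func (q *queue) Done(key string) {
	delete(q.processing, key)
	if _, ok := q.dirty[key]; ok {
		q.items = append(q.items, key)
	}
}

// Len reports how many keys are waiting.
func (q *queue) Len() int { return len(q.items) }
```

Because events are collapsed into keys, the reconciler always re-reads the current object instead of replaying individual events.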
go
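// Simplified view of workqueue.RateLimitingInterface from
// k8s.io/client-go/util/workqueue (the real interface has more methods).
type Workqueue interface {
Add(item interface{}) // enqueue a key (duplicates are collapsed)
Get() (interface{}, bool) // block until a key is available; the bool is true on shutdown
Done(item interface{}) // mark processing of a key as finished
AddRateLimited(item interface{}) // re-enqueue after the rate limiter's delay
Forget(item interface{}) // reset the rate limiter's failure count for a key
}
// Per-item exponential backoff combined with an overall token bucket.
queue := workqueue.NewRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
)
3. Reconciler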
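The Reconciler implements the reconciliation logic. It receives a request that identifies one object by namespace and name. It returns a Result and an error, which together tell the controller whether and when to requeue.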
go
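// The interface from sigs.k8s.io/controller-runtime/pkg/reconcile.
type Reconciler interface {
Reconcile(ctx context.Context, req Request) (Result, error)
}
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Fetch the current version of the object named in the request.
instance := &MyResource{}
if err := r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
// NotFound means the object was deleted; there is nothing left to do.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// reconcileResource (not shown) drives the cluster towards instance.Spec.
return r.reconcileResource(ctx, instance)
}
Operator Development Frameworks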
Kubebuilder
Installing Kubebuilder
bash
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder
sudo mv kubebuilder /usr/local/bin/
kubebuilder version
Creating an Operator project
bash
mkdir my-operator
cd my-operator
kubebuilder init --domain my.domain --repo my.domain/my-operator
kubebuilder create api --group webapp --version v1 --kind WebApp
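kubebuilder create webhook --group webapp --version v1 --kind WebApp --defaulting --programmatic-validation
Project structure
The exact layout depends on the Kubebuilder version and project plugin. For example, the go/v4 layout places controllers under internal/controller/ rather than controllers/.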
my-operator/
├── cmd/
│ └── main.go
├── config/
│ ├── crd/
│ │ ├── bases/
│ │ └── kustomization.yaml
│ ├── rbac/
│ │ ├── auth_proxy_client_clusterrole.yaml
│ │ ├── auth_proxy_role.yaml
│ │ ├── kustomization.yaml
│ │ ├── leader_election_role.yaml
│ │ ├── leader_election_role_binding.yaml
│ │ ├── role.yaml
│ │ ├── role_binding.yaml
│ │ └── service_account.yaml
│ ├── manager/
│ │ ├── kustomization.yaml
│ │ └── manager.yaml
│ ├── prometheus/
│ │ ├── kustomization.yaml
│ │ └── monitor.yaml
│ ├── webhook/
│ │ ├── kustomization.yaml
│ │ ├── kustomizeconfig.yaml
│ │ ├── manifests.yaml
│ │ └── service.yaml
│ └── samples/
│ └── webapp_v1_webapp.yaml
├── api/
│ └── v1/
│ ├── groupversion_info.go
│ ├── webapp_types.go
│ ├── webapp_webhook.go
│ └── zz_generated.deepcopy.go
├── controllers/
│ ├── suite_test.go
│ └── webapp_controller.go
├── hack/
│ └── boilerplate.go.txt
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
└── go.mod
Operator SDK
Installing the Operator SDK
bash
curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.32.0/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
sudo mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk
operator-sdk version
Creating an Operator project
bash
mkdir my-operator
cd my-operator
operator-sdk init --domain my.domain --repo github.com/example/my-operator
operator-sdk create api --group webapp --version v1 --kind WebApp --resource --controller
operator-sdk create webhook --group webapp --version v1 --kind WebApp --defaulting --programmatic-validation
Practical Examples
Example 1: WebApp Operator
API definition
go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// WebAppSpec defines the desired state of a WebApp.
type WebAppSpec struct {
// Replicas is optional; the controller falls back to 1 when it is unset.
Replicas *int32 `json:"replicas,omitempty"`
Image string `json:"image"`
Port int32 `json:"port"`
Resources ResourceRequirements `json:"resources,omitempty"`
Config map[string]string `json:"config,omitempty"`
Storage StorageSpec `json:"storage,omitempty"`
Ingress *IngressSpec `json:"ingress,omitempty"`
}
type ResourceRequirements struct {
Requests ResourceList `json:"requests,omitempty"`
Limits ResourceList `json:"limits,omitempty"`
}
type ResourceList struct {
CPU string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
}
type StorageSpec struct {
Enabled bool `json:"enabled,omitempty"`
Size string `json:"size,omitempty"`
StorageClass string `json:"storageClass,omitempty"`
}
type IngressSpec struct {
Enabled bool `json:"enabled,omitempty"`
Host string `json:"host,omitempty"`
TLS *IngressTLSSpec `json:"tls,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}
type IngressTLSSpec struct {
Enabled bool `json:"enabled,omitempty"`
SecretName string `json:"secretName,omitempty"`
}
// WebAppStatus defines the observed state of a WebApp.
type WebAppStatus struct {
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas"`
Phase string `json:"phase,omitempty"`
Message string `json:"message,omitempty"`
}
// The root marker makes controller-gen generate the DeepCopyObject method that
// SchemeBuilder.Register requires; the subresource markers match the CRD below.
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
type WebApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec WebAppSpec `json:"spec,omitempty"`
Status WebAppStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
type WebAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []WebApp `json:"items"`
}
func init() {
SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}
Controller implementation
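One detail of the controller that follows is worth isolating. `createEnvVars` ranges over `spec.config`, which is a Go map, and Go deliberately randomizes map iteration order. The env list can therefore come out in a different order on each reconcile. If a controller compares or rewrites the container env, that shows up as a spurious change and an unnecessary rollout. A common remedy is to sort the keys first. In this sketch a local `envVar` struct stands in for `corev1.EnvVar`, and `envFromConfig` is a hypothetical helper, so it runs with the standard library only.

```go
package main

import "sort"

// envVar stands in for corev1.EnvVar (name/value pairs only).
type envVar struct {
	Name  string
	Value string
}

// envFromConfig turns a config map into env vars in a stable, sorted order,
// so the generated pod template is identical on every reconcile.
func envFromConfig(config map[string]string) []envVar {
	keys := make([]string, 0, len(config))
	for k := range config {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	env := make([]envVar, 0, len(keys))
	for _, k := range keys {
		env = append(env, envVar{Name: k, Value: config[k]})
	}
	return env
}
```

With the sample CR's config, this always yields `ENVIRONMENT` before `LOG_LEVEL`.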
go
package controllers
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
webappv1 "my.domain/my-operator/api/v1"
)
type WebAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
instance := &webappv1.WebApp{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
if errors.IsNotFound(err) {
logger.Info("WebApp resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get WebApp")
return ctrl.Result{}, err
}
if err := r.reconcileDeployment(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile Deployment")
return ctrl.Result{}, err
}
if err := r.reconcileService(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
if instance.Spec.Ingress != nil && instance.Spec.Ingress.Enabled {
if err := r.reconcileIngress(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile Ingress")
return ctrl.Result{}, err
}
}
if err := r.updateStatus(ctx, instance); err != nil {
logger.Error(err, "Failed to update status")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1.WebApp) error {
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, deployment)
if err != nil && errors.IsNotFound(err) {
deployment = r.createDeployment(webapp)
if err := ctrl.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
return err
}
return r.Create(ctx, deployment)
} else if err != nil {
return err
}
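// Update only when the fields the operator owns have drifted from the spec.
if r.updateDeployment(deployment, webapp) {
return r.Update(ctx, deployment)
}
return nil
}
// updateDeployment copies the desired replicas and image from the WebApp into an
// existing Deployment and reports whether anything changed.
func (r *WebAppReconciler) updateDeployment(deployment *appsv1.Deployment, webapp *webappv1.WebApp) bool {
desired := int32(1)
if webapp.Spec.Replicas != nil {
desired = *webapp.Spec.Replicas
}
changed := false
if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != desired {
deployment.Spec.Replicas = &desired
changed = true
}
containers := deployment.Spec.Template.Spec.Containers
if len(containers) > 0 && containers[0].Image != webapp.Spec.Image {
containers[0].Image = webapp.Spec.Image
changed = true
}
return changed
}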
func (r *WebAppReconciler) createDeployment(webapp *webappv1.WebApp) *appsv1.Deployment {
replicas := int32(1)
if webapp.Spec.Replicas != nil {
replicas = *webapp.Spec.Replicas
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Labels: map[string]string{
"app": webapp.Name,
"app.kubernetes.io/name": webapp.Name,
"app.kubernetes.io/instance": webapp.Name,
"app.kubernetes.io/managed-by": "webapp-operator",
},
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": webapp.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": webapp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "webapp",
Image: webapp.Spec.Image,
Ports: []corev1.ContainerPort{
{
ContainerPort: webapp.Spec.Port,
},
},
Resources: corev1.ResourceRequirements{
Requests: toResourceList(webapp.Spec.Resources.Requests),
Limits: toResourceList(webapp.Spec.Resources.Limits),
},
Env: r.createEnvVars(webapp),
},
},
},
},
},
}
}
func (r *WebAppReconciler) createEnvVars(webapp *webappv1.WebApp) []corev1.EnvVar {
envVars := []corev1.EnvVar{}
for key, value := range webapp.Spec.Config {
envVars = append(envVars, corev1.EnvVar{
Name: key,
Value: value,
})
}
return envVars
}
// toResourceList converts the WebApp's string quantities into a corev1.ResourceList.
// Empty fields are skipped because resources are optional in the spec and
// resource.MustParse panics on an empty string.
func toResourceList(rl webappv1.ResourceList) corev1.ResourceList {
list := corev1.ResourceList{}
if rl.CPU != "" {
list[corev1.ResourceCPU] = resource.MustParse(rl.CPU)
}
if rl.Memory != "" {
list[corev1.ResourceMemory] = resource.MustParse(rl.Memory)
}
return list
}
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1.WebApp) error {
service := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, service)
if err != nil && errors.IsNotFound(err) {
service = r.createService(webapp)
if err := ctrl.SetControllerReference(webapp, service, r.Scheme); err != nil {
return err
}
return r.Create(ctx, service)
} else if err != nil {
return err
}
return nil
}
func (r *WebAppReconciler) createService(webapp *webappv1.WebApp) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Labels: map[string]string{
"app": webapp.Name,
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": webapp.Name,
},
Ports: []corev1.ServicePort{
{
Port: webapp.Spec.Port,
TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
},
},
},
}
}
func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *webappv1.WebApp) error {
ingress := &networkingv1.Ingress{}
err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, ingress)
if err != nil && errors.IsNotFound(err) {
ingress = r.createIngress(webapp)
if err := ctrl.SetControllerReference(webapp, ingress, r.Scheme); err != nil {
return err
}
return r.Create(ctx, ingress)
} else if err != nil {
return err
}
return nil
}
func (r *WebAppReconciler) createIngress(webapp *webappv1.WebApp) *networkingv1.Ingress {
pathType := networkingv1.PathTypePrefix
ingress := &networkingv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Annotations: webapp.Spec.Ingress.Annotations,
},
Spec: networkingv1.IngressSpec{
Rules: []networkingv1.IngressRule{
{
Host: webapp.Spec.Ingress.Host,
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: webapp.Name,
Port: networkingv1.ServiceBackendPort{
Number: webapp.Spec.Port,
},
},
},
},
},
},
},
},
},
},
}
if webapp.Spec.Ingress.TLS != nil && webapp.Spec.Ingress.TLS.Enabled {
ingress.Spec.TLS = []networkingv1.IngressTLS{
{
Hosts: []string{webapp.Spec.Ingress.Host},
SecretName: webapp.Spec.Ingress.TLS.SecretName,
},
}
}
return ingress
}
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1.WebApp) error {
deployment := &appsv1.Deployment{}
if err := r.Get(ctx, types.NamespacedName{Name: webapp.Name, Namespace: webapp.Namespace}, deployment); err != nil {
return err
}
webapp.Status.Replicas = deployment.Status.Replicas
webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas
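// Compare against the desired replica count: status.replicas is 0 before any Pod
// exists, which would otherwise report "Running" for an empty Deployment.
desired := int32(1)
if deployment.Spec.Replicas != nil {
desired = *deployment.Spec.Replicas
}
if deployment.Status.ReadyReplicas >= desired {
webapp.Status.Phase = "Running"
webapp.Status.Message = "All replicas are ready"
} else {
webapp.Status.Phase = "Updating"
webapp.Status.Message = fmt.Sprintf("%d/%d replicas ready", deployment.Status.ReadyReplicas, desired)
}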
return r.Status().Update(ctx, webapp)
}
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.WebApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&networkingv1.Ingress{}).
Complete(r)
}
CRD Definition
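The openAPIV3Schema that follows is enforced by the API server before an object is stored. To make its rules concrete, here they are as ordinary Go checks:
- image and port are required;
- port must lie in 1..65535;
- replicas, when set, must lie in 1..100.

This is only an illustration, and `webAppSpec` and `validate` are hypothetical names. With Kubebuilder the schema is normally generated from validation markers and the API server performs the checks, so the operator itself does not need this code.

```go
package main

import (
	"errors"
	"fmt"
)

// webAppSpec mirrors the WebApp fields that the schema constrains.
type webAppSpec struct {
	Replicas *int32
	Image    string
	Port     int32
}

// validate reports every violation of the rules declared in the CRD schema.
func validate(s webAppSpec) error {
	var errs []error
	if s.Image == "" {
		errs = append(errs, errors.New("spec.image: Required value"))
	}
	if s.Port < 1 || s.Port > 65535 {
		errs = append(errs, fmt.Errorf("spec.port: %d must be in [1, 65535]", s.Port))
	}
	if s.Replicas != nil && (*s.Replicas < 1 || *s.Replicas > 100) {
		errs = append(errs, fmt.Errorf("spec.replicas: %d must be in [1, 100]", *s.Replicas))
	}
	return errors.Join(errs...) // nil when there are no violations (Go 1.20+)
}
```

In this sketch a missing port decodes to 0 and is caught by the range check, standing in for the schema's `required` rule.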
yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: webapps.webapp.my.domain
spec:
  group: webapp.my.domain
  names:
    kind: WebApp
    listKind: WebAppList
    plural: webapps
    singular: webapp
  scope: Namespaced
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required:
                - image
                - port
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 100
                image:
                  type: string
                port:
                  type: integer
                  minimum: 1
                  maximum: 65535
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        cpu:
                          type: string
                        memory:
                          type: string
                    limits:
                      type: object
                      properties:
                        cpu:
                          type: string
                        memory:
                          type: string
                config:
                  type: object
                  additionalProperties:
                    type: string
                storage:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    size:
                      type: string
                    storageClass:
                      type: string
                ingress:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    host:
                      type: string
                    annotations:
                      type: object
                      additionalProperties:
                        type: string
                    tls:
                      type: object
                      properties:
                        enabled:
                          type: boolean
                        secretName:
                          type: string
            status:
              type: object
              properties:
                replicas:
                  type: integer
                readyReplicas:
                  type: integer
                phase:
                  type: string
                message:
                  type: string
      subresources:
        status: {}
        scale:
          specReplicasPath: .spec.replicas
          statusReplicasPath: .status.replicas
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyReplicas
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
Sample CR
yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-webapp
  namespace: default
spec:
  replicas: 3
  image: nginx:1.25
  port: 80
  resources:
    requests:
      cpu: "100m"
      memory: "128Mi"
    limits:
      cpu: "500m"
      memory: "512Mi"
  config:
    ENVIRONMENT: "production"
    LOG_LEVEL: "info"
  ingress:
    enabled: true
    host: my-webapp.example.com
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt-prod
    tls:
      enabled: true
      secretName: my-webapp-tls
Example 2: Database Operator
API definition
go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type DatabaseSpec struct {
Version string `json:"version"`
StorageSize string `json:"storageSize"`
StorageClass string `json:"storageClass,omitempty"`
Replicas int32 `json:"replicas"`
Resources ResourceRequirements `json:"resources,omitempty"`
Database string `json:"database"`
Credentials CredentialsSpec `json:"credentials"`
Backup *BackupSpec `json:"backup,omitempty"`
Monitoring *MonitoringSpec `json:"monitoring,omitempty"`
}
type CredentialsSpec struct {
SecretName string `json:"secretName"`
UsernameKey string `json:"usernameKey"`
PasswordKey string `json:"passwordKey"`
}
type BackupSpec struct {
Enabled bool `json:"enabled,omitempty"`
Schedule string `json:"schedule,omitempty"`
StorageSize string `json:"storageSize,omitempty"`
Retention int `json:"retention,omitempty"`
}
type MonitoringSpec struct {
Enabled bool `json:"enabled,omitempty"`
}
type DatabaseStatus struct {
Phase string `json:"phase,omitempty"`
CurrentReplicas int32 `json:"currentReplicas"`
ReadyReplicas int32 `json:"readyReplicas"`
CurrentVersion string `json:"currentVersion,omitempty"`
// A pointer, so that omitempty can actually omit the field before the first backup.
LastBackup *metav1.Time `json:"lastBackup,omitempty"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
type Database struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec DatabaseSpec `json:"spec,omitempty"`
Status DatabaseStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
type DatabaseList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Database `json:"items"`
}
func init() {
SchemeBuilder.Register(&Database{}, &DatabaseList{})
}
Controller implementation
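`reconcilePVC` in the controller that follows pre-creates claims named `data-<name>-<i>`. The names are not arbitrary. A StatefulSet names the claim for each volumeClaimTemplate `<template>-<statefulset>-<ordinal>`, and it uses an existing claim with that name instead of creating a new one. The hypothetical helper below computes those names.

```go
package main

import "fmt"

// pvcNamesFor returns the PersistentVolumeClaim names a StatefulSet uses for one
// volumeClaimTemplate: <template>-<statefulset>-<ordinal>, one per replica.
func pvcNamesFor(template, statefulSet string, replicas int32) []string {
	if replicas <= 0 {
		return nil
	}
	names := make([]string, 0, replicas)
	for i := int32(0); i < replicas; i++ {
		names = append(names, fmt.Sprintf("%s-%s-%d", template, statefulSet, i))
	}
	return names
}
```

`reconcilePVC` sets the Database as the claims' controller owner. Deleting the Database therefore lets the garbage collector delete the claims, and with them the data, subject to the StorageClass reclaim policy. By default a StatefulSet on its own keeps its claims when it is scaled down or deleted.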
go
package controllers
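import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
dbv1 "my.domain/my-operator/api/v1"
)
type DatabaseReconciler struct {
client.Client
Scheme *runtime.Scheme
}
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
instance := &dbv1.Database{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Each step creates what is missing and leaves existing objects alone, so the
// sequence is safe to repeat (it does not yet propagate later spec changes).
if err := r.reconcileSecret(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile credentials Secret")
return ctrl.Result{}, err
}
if err := r.reconcilePVC(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile PersistentVolumeClaims")
return ctrl.Result{}, err
}
if err := r.reconcileStatefulSet(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile StatefulSet")
return ctrl.Result{}, err
}
if err := r.reconcileService(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
// reconcileBackupJob (for example, a CronJob that runs pg_dump) is not shown here.
if instance.Spec.Backup != nil && instance.Spec.Backup.Enabled {
if err := r.reconcileBackupJob(ctx, instance); err != nil {
logger.Error(err, "Failed to reconcile backup job")
return ctrl.Result{}, err
}
}
if err := r.updateStatus(ctx, instance); err != nil {
logger.Error(err, "Failed to update status")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
// generatePassword returns a random 24-character password: 18 bytes from
// crypto/rand, encoded with unpadded URL-safe base64.
func (r *DatabaseReconciler) generatePassword() string {
b := make([]byte, 18)
if _, err := rand.Read(b); err != nil {
// crypto/rand only fails if the OS entropy source is unusable.
panic(fmt.Sprintf("generating password: %v", err))
}
return base64.RawURLEncoding.EncodeToString(b)
}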
func (r *DatabaseReconciler) reconcileSecret(ctx context.Context, db *dbv1.Database) error {
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{Name: db.Spec.Credentials.SecretName, Namespace: db.Namespace}, secret)
if err != nil && errors.IsNotFound(err) {
secret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: db.Spec.Credentials.SecretName,
Namespace: db.Namespace,
},
StringData: map[string]string{
db.Spec.Credentials.UsernameKey: "admin",
db.Spec.Credentials.PasswordKey: r.generatePassword(),
},
}
if err := ctrl.SetControllerReference(db, secret, r.Scheme); err != nil {
return err
}
return r.Create(ctx, secret)
}
return err
}
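// reconcilePVC pre-creates the claims that the StatefulSet's "data" volumeClaimTemplate
// would otherwise create, using the same data-<name>-<ordinal> names so the StatefulSet
// uses them instead of creating its own; this is what applies spec.storageClass.
func (r *DatabaseReconciler) reconcilePVC(ctx context.Context, db *dbv1.Database) error {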
for i := int32(0); i < db.Spec.Replicas; i++ {
pvcName := fmt.Sprintf("data-%s-%d", db.Name, i)
pvc := &corev1.PersistentVolumeClaim{}
err := r.Get(ctx, types.NamespacedName{Name: pvcName, Namespace: db.Namespace}, pvc)
if err != nil && errors.IsNotFound(err) {
storageClass := db.Spec.StorageClass
pvc = &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: db.Namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(db.Spec.StorageSize),
},
},
},
}
if storageClass != "" {
pvc.Spec.StorageClassName = &storageClass
}
if err := ctrl.SetControllerReference(db, pvc, r.Scheme); err != nil {
return err
}
if err := r.Create(ctx, pvc); err != nil {
return err
}
} else if err != nil {
return err
}
}
return nil
}
func (r *DatabaseReconciler) reconcileStatefulSet(ctx context.Context, db *dbv1.Database) error {
sts := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: db.Name, Namespace: db.Namespace}, sts)
if err != nil && errors.IsNotFound(err) {
sts = r.createStatefulSet(db)
if err := ctrl.SetControllerReference(db, sts, r.Scheme); err != nil {
return err
}
return r.Create(ctx, sts)
} else if err != nil {
return err
}
return nil
}
func (r *DatabaseReconciler) createStatefulSet(db *dbv1.Database) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: db.Name,
Namespace: db.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: db.Name,
Replicas: &db.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": db.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": db.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "database",
Image: fmt.Sprintf("postgres:%s", db.Spec.Version),
Ports: []corev1.ContainerPort{
{
ContainerPort: 5432,
},
},
Env: []corev1.EnvVar{
{
Name: "POSTGRES_DB",
Value: db.Spec.Database,
},
{
Name: "POSTGRES_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: db.Spec.Credentials.SecretName,
},
Key: db.Spec.Credentials.UsernameKey,
},
},
},
{
Name: "POSTGRES_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: db.Spec.Credentials.SecretName,
},
Key: db.Spec.Credentials.PasswordKey,
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data",
MountPath: "/var/lib/postgresql/data",
},
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "data",
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(db.Spec.StorageSize),
},
},
},
},
},
},
}
}
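// reconcileService creates the headless Service that the StatefulSet's ServiceName
// refers to, which gives each database Pod a stable DNS name.
func (r *DatabaseReconciler) reconcileService(ctx context.Context, db *dbv1.Database) error {
svc := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: db.Name, Namespace: db.Namespace}, svc)
if err != nil && errors.IsNotFound(err) {
svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: db.Name,
Namespace: db.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Selector: map[string]string{"app": db.Name},
Ports: []corev1.ServicePort{{Name: "postgres", Port: 5432}},
},
}
if err := ctrl.SetControllerReference(db, svc, r.Scheme); err != nil {
return err
}
return r.Create(ctx, svc)
}
return err
}
// updateStatus copies replica counts from the StatefulSet into the Database status.
func (r *DatabaseReconciler) updateStatus(ctx context.Context, db *dbv1.Database) error {
sts := &appsv1.StatefulSet{}
if err := r.Get(ctx, types.NamespacedName{Name: db.Name, Namespace: db.Namespace}, sts); err != nil {
return err
}
db.Status.CurrentReplicas = sts.Status.CurrentReplicas
db.Status.ReadyReplicas = sts.Status.ReadyReplicas
if sts.Status.ReadyReplicas >= db.Spec.Replicas {
db.Status.Phase = "Running"
} else {
db.Status.Phase = "Updating"
}
return r.Status().Update(ctx, db)
}
func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dbv1.Database{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&corev1.Secret{}).
Complete(r)
}
Example 3: Using a Helm Operator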
Operator packaging for OLM (ClusterServiceVersion)
yaml
apiVersion: operators.coreos.com/v1alpha1
kind: ClusterServiceVersion
metadata:
  name: my-app-operator.v1.0.0
  namespace: operators
spec:
  displayName: My Application Operator
  version: 1.0.0
  provider:
    name: My Company
  maturity: stable
  installModes:
    - type: OwnNamespace
      supported: true
    - type: SingleNamespace
      supported: true
    - type: MultiNamespace
      supported: false
    - type: AllNamespaces
      supported: true
  install:
    strategy: deployment
    spec:
      deployments:
        - name: my-app-operator
          spec:
            replicas: 1
            selector:
              matchLabels:
                name: my-app-operator
            template:
              metadata:
                labels:
                  name: my-app-operator
              spec:
                serviceAccountName: my-app-operator
                containers:
                  - name: my-app-operator
                    image: my-registry/my-app-operator:1.0.0
                    env:
                      - name: WATCH_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.annotations['olm.targetNamespaces']
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: OPERATOR_NAME
                        value: "my-app-operator"
  customresourcedefinitions:
    owned:
      - name: myapps.myapp.example.com
        version: v1alpha1
        kind: MyApp
        displayName: My Application
        description: Represents a deployment of My Application
Lifecycle Management
Operator lifecycle
Installation
yaml
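apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-operator
  namespace: operators
spec:
  channel: stable
  name: my-operator
  source: operatorhubio-catalog
  sourceNamespace: olm
  startingCSV: my-operator.v1.0.0
  installPlanApproval: Automatic
Upgrade
Upgrades are carried out through InstallPlan objects, which OLM generates from the Subscription when a newer CSV appears in the channel. With installPlanApproval: Manual, the upgrade waits until spec.approved is set to true on the generated plan. An approved plan looks like this: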
yaml
apiVersion: operators.coreos.com/v1alpha1
kind: InstallPlan
metadata:
  name: install-plan
  namespace: operators
spec:
  approval: Automatic
  approved: true
  clusterServiceVersionNames:
    - my-operator.v1.1.0
Uninstallation
bash
kubectl delete subscription my-operator -n operators
kubectl delete clusterserviceversion my-operator.v1.0.0 -n operators
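kubectl delete crd myapps.myapp.example.com
Deleting a CRD also deletes every custom resource of that type. Remove it only when the applications it manages are no longer needed.
Application lifecycle
Create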
yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-app
spec:
  replicas: 3
  image: nginx:latest
  port: 80
Update
yaml
apiVersion: webapp.my.domain/v1
kind: WebApp
metadata:
  name: my-app
spec:
  replicas: 5
  image: nginx:1.25
  port: 80
Scaling
bash
kubectl patch webapp my-app --type merge -p '{"spec":{"replicas":10}}'
kubectl scale webapp my-app --replicas=5
Delete
bash
kubectl delete webapp my-app
kubectl Commands
Managing CRDs
bash
kubectl get crd
kubectl describe crd webapps.webapp.my.domain
kubectl get crd webapps.webapp.my.domain -o yaml
kubectl apply -f crd.yaml
kubectl delete crd webapps.webapp.my.domain
Managing CRs
bash
kubectl get webapp
kubectl get webapp -o wide
kubectl describe webapp my-webapp
kubectl get webapp my-webapp -o yaml
kubectl apply -f webapp-cr.yaml
kubectl edit webapp my-webapp
kubectl delete webapp my-webapp
kubectl patch webapp my-webapp --type merge -p '{"spec":{"replicas":5}}'
Managing the Operator
bash
kubectl get deployment -n operators
kubectl get pods -n operators -l name=my-operator
kubectl logs -n operators -l name=my-operator -f
kubectl describe pod -n operators -l name=my-operator
kubectl get events -n operators --sort-by='.lastTimestamp'
Debugging commands
bash
kubectl get webapp my-webapp -o jsonpath='{.status}'
kubectl get webapp my-webapp -o jsonpath='{.status.conditions}'
kubectl auth can-i create webapps.webapp.my.domain --as=system:serviceaccount:default:default
kubectl auth can-i list webapps.webapp.my.domain --all-namespaces
kubectl api-resources | grep webapp
kubectl explain webapp.spec
kubectl explain webapp.status
Troubleshooting Guide
Problem 1: CRD not created
Symptom
bash
kubectl get webapp
error: the server doesn't have a resource type "webapp"
Diagnosis
bash
kubectl get crd | grep webapp
kubectl describe crd webapps.webapp.my.domain
kubectl api-resources | grep webapp
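kubectl logs -n operators -l name=my-operator
Solutions
- Check that the CRD definition is valid and has been applied
- Confirm that the Operator is installed and running
- Verify RBAC permissions
- Check API version compatibility
Problem 2: The Operator does not reconcile resources
Symptom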
bash
kubectl get webapp my-webapp
NAME        REPLICAS   READY   PHASE     AGE
my-webapp   3          0       Pending   10m
Diagnosis
bash
kubectl describe webapp my-webapp
kubectl logs -n operators -l name=my-operator
kubectl get events --field-selector involvedObject.name=my-webapp
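kubectl get deployment,service,ingress -l app=my-webapp
Solutions
- Look for errors in the Operator logs
- Verify the controller logic
- Confirm that resource quotas are sufficient
- Check for restrictive network policies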
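When a WebApp stays at READY 0, it helps to know exactly how a phase is derived. The sketch below is a hypothetical, dependency-free version of that logic. It compares ready Pods with the desired replica count. It reports Pending until the Deployment controller has observed the latest spec, which a real controller would check as `deployment.Status.ObservedGeneration >= deployment.Generation`. The Running and Updating names follow the WebApp example. Pending matches the symptom above but is an assumption, since the example controller never sets it.

```go
package main

import "fmt"

// phaseFor derives a WebApp-style phase and message. desired is the Deployment's
// spec.replicas, ready its status.readyReplicas, and observed says whether the
// Deployment controller has caught up with the latest spec.
func phaseFor(desired, ready int32, observed bool) (phase, message string) {
	switch {
	case !observed:
		return "Pending", "waiting for the Deployment controller to observe the latest spec"
	case ready >= desired:
		return "Running", "All replicas are ready"
	default:
		return "Updating", fmt.Sprintf("%d/%d replicas ready", ready, desired)
	}
}
```

A phase stuck at Pending therefore points at the Deployment controller or at Pod scheduling rather than at the Operator's status logic.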
Problem 3: Webhook errors
Symptom
bash
Error from server (InternalError): error when creating "webapp-cr.yaml": Internal error occurred: failed calling webhook "mwebapp.kb.io": Post "https://my-operator-webhook.operators.svc:443/mutate-webapp-my-domain-v1-webapp?timeout=10s": no such host
Diagnosis
bash
kubectl get validatingwebhookconfigurations
kubectl get mutatingwebhookconfigurations
kubectl get svc -n operators
kubectl logs -n operators -l name=my-operator
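kubectl get cert -n operators # requires cert-manager
Solutions
- Check that the webhook Service exists in the expected namespace and has ready endpoints ("no such host" means its DNS name does not resolve)
- Verify the certificate configuration, for example that the serving certificate has been issued
- Confirm that the webhook configuration points at the right Service, namespace, and path
- Check network connectivity from the API server to the webhook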
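The failing call above goes to the mutating webhook (`mwebapp.kb.io`). That is the defaulting half of what `kubebuilder create webhook --defaulting --programmatic-validation` scaffolds. Once the Service, certificate and configuration are fixed, the defaulting logic itself is usually small. Below is a dependency-free sketch. It assumes the same default of one replica that `createDeployment` in the WebApp controller falls back to. `webAppSpec` and `applyDefaults` are hypothetical names, and the real Kubebuilder defaulter interface differs between versions.

```go
package main

// webAppSpec holds the WebApp fields relevant to this sketch.
type webAppSpec struct {
	Replicas *int32
	Image    string
}

// defaultReplicas matches the fallback used by createDeployment in the WebApp controller.
const defaultReplicas int32 = 1

// applyDefaults fills in fields the user left unset and never overwrites
// explicit values, so applying it twice gives the same result as applying it once.
func applyDefaults(spec *webAppSpec) {
	if spec.Replicas == nil {
		r := defaultReplicas
		spec.Replicas = &r
	}
}
```

Defaulting has to be idempotent because the API server can invoke a mutating webhook more than once for the same request, for example with `reinvocationPolicy: IfNeeded`.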
Problem 4: Insufficient RBAC permissions
Symptom
bash
E0115 10:30:00.123456 12345 controller.go:123] failed to reconcile: webapps.webapp.my.domain is forbidden: User "system:serviceaccount:operators:my-operator" cannot create resource "webapps" in API group "webapp.my.domain" at the cluster scope
Diagnosis
bash
kubectl get clusterrole | grep my-operator
kubectl get clusterrolebinding | grep my-operator
kubectl describe clusterrole my-operator-role
kubectl auth can-i create webapps.webapp.my.domain --as=system:serviceaccount:operators:my-operator
Solution
yaml
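apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-operator-role
rules:
  - apiGroups: ["webapp.my.domain"]
    resources: ["webapps"]
    verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
  - apiGroups: ["webapp.my.domain"]
    resources: ["webapps/status"]
    verbs: ["get", "patch", "update"]
The ClusterRole only takes effect once a ClusterRoleBinding grants it to the system:serviceaccount:operators:my-operator service account.
Problem 5: Resource status updates fail
Symptom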
bash
kubectl get webapp my-webapp -o yaml
status:
  phase: ""
  replicas: 0
Diagnosis
bash
kubectl describe webapp my-webapp
kubectl logs -n operators -l name=my-operator | grep -i error
kubectl get webapp my-webapp -o jsonpath='{.metadata.resourceVersion}'
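kubectl auth can-i update webapps.webapp.my.domain --subresource=status --as=system:serviceaccount:operators:my-operator
Solutions
- Check permissions on the status subresource (webapps/status)
- Verify the status update logic, and that the CRD enables the status subresource
- Make sure resourceVersion conflicts are handled by re-reading the object and retrying
- Check the controller's retry behavior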
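Resource-version conflicts are a frequent cause of this symptom. `Status().Update` sends the resourceVersion the controller read. If the object changed in the meantime, the API server rejects the write with 409 Conflict. client-go provides `retry.RetryOnConflict` (in `k8s.io/client-go/util/retry`) for re-reading and retrying. The stdlib-only model below shows the mechanism using a hypothetical in-memory store. Unlike the real helper, it does not back off between attempts.

```go
package main

import (
	"errors"
	"strconv"
)

// errConflict plays the role of the API server's 409 Conflict response.
var errConflict = errors.New("conflict: the object has been modified")

// object is a minimal stand-in for a custom resource with a status field.
type object struct {
	ResourceVersion string
	Phase           string
}

// store simulates the API server's optimistic concurrency check.
type store struct {
	obj     object
	version int
}

// Get returns a copy of the stored object, like a read from the API server.
func (s *store) Get() object { return s.obj }

// UpdateStatus accepts a write only if it is based on the latest resourceVersion,
// then assigns a new resourceVersion.
func (s *store) UpdateStatus(o object) error {
	if o.ResourceVersion != s.obj.ResourceVersion {
		return errConflict
	}
	s.version++
	o.ResourceVersion = strconv.Itoa(s.version)
	s.obj = o
	return nil
}

// retryOnConflict runs fn up to attempts times while it fails with errConflict.
// Success or any other error is returned immediately.
func retryOnConflict(attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); !errors.Is(err, errConflict) {
			return err
		}
	}
	return err
}
```

The important detail is that the retried function re-reads the object on every attempt. Retrying the same stale copy would conflict every time.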
Best Practices
1. Design Best Practices
API design
go
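// Prefer core Kubernetes types (ResourceRequirements, Affinity, Toleration) over
// home-grown equivalents; pointer fields with omitempty mark optional settings.
type MyResourceSpec struct {
Version string `json:"version"`
Replicas *int32 `json:"replicas,omitempty"`
Image string `json:"image"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Affinity *corev1.Affinity `json:"affinity,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
}
type MyResourceStatus struct {
Phase string `json:"phase,omitempty"`
// Conditions use the standard metav1.Condition type.
Conditions []metav1.Condition `json:"conditions,omitempty"`
// ObservedGeneration is the metadata.generation this status was computed from.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}
Status management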
go
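// meta is k8s.io/apimachinery/pkg/api/meta. SetStatusCondition updates the entry with
// the same Type and only changes LastTransitionTime when Status changes.
func (r *MyReconciler) updateCondition(instance *MyResource, conditionType string, status metav1.ConditionStatus, reason, message string) {
condition := metav1.Condition{
Type: conditionType,
Status: status,
Reason: reason,
Message: message,
ObservedGeneration: instance.Generation,
LastTransitionTime: metav1.Now(),
}
meta.SetStatusCondition(&instance.Status.Conditions, condition)
}
2. Controller Implementation Best Practices
Idempotency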
go
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
instance := &MyResource{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
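// A set deletion timestamp means the object is being deleted and only finalizers remain.
if !instance.DeletionTimestamp.IsZero() {
return r.handleDeletion(ctx, instance)
}
// handleCreationOrUpdate must be safe to run repeatedly: create what is missing,
// update what drifted, and leave everything else untouched.
return r.handleCreationOrUpdate(ctx, instance)
}
Error handling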
go
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
instance := &MyResource{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
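result, err := r.reconcile(ctx, instance)
if err != nil {
// A conflict means our copy was stale: requeue to re-read instead of reporting an error.
if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
// Any other error is retried with exponential backoff by the workqueue.
return ctrl.Result{}, err
}
return result, nil
}
Event recording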
go
// Assumes MyReconciler has a field Recorder record.EventRecorder
// (k8s.io/client-go/tools/record), set in main.go via mgr.GetEventRecorderFor("my-operator").
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
instance := &MyResource{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
logger.Info("Reconciling MyResource", "name", instance.Name)
// Events appear in kubectl describe and kubectl get events for this object.
r.Recorder.Event(instance, corev1.EventTypeNormal, "Reconciling", "Starting reconciliation")
return ctrl.Result{}, nil
}
3. Testing Best Practices
Unit tests
go
func TestReconcile(t *testing.T) {
scheme := runtime.NewScheme()
_ = MyResourceV1.AddToScheme(scheme)
_ = appsv1.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
}
instance := &MyResource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-instance",
Namespace: "test",
},
Spec: MyResourceSpec{
Replicas: pointer.Int32Ptr(3),
Image: "nginx:latest",
},
}
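// Named fakeClient so it does not shadow the controller-runtime client package.
// WithStatusSubresource (controller-runtime v0.15+) makes Status().Update behave
// as it does against an API server with the status subresource enabled.
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(ns, instance).
WithStatusSubresource(&MyResource{}).
Build()
reconciler := &MyReconciler{
Client: fakeClient,
Scheme: scheme,
}
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "test-instance",
Namespace: "test",
},
}
result, err := reconciler.Reconcile(context.Background(), req)
assert.NoError(t, err)
// Assumes a reconciler that returns RequeueAfter: time.Minute on success.
assert.Equal(t, ctrl.Result{RequeueAfter: time.Minute}, result)
}
Integration tests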
go
// Assumes a reachable cluster (from the current kubeconfig) with the CRD installed
// and the operator running.
func TestIntegration(t *testing.T) {
cfg, err := ctrl.GetConfig()
require.NoError(t, err)
scheme := runtime.NewScheme()
_ = MyResourceV1.AddToScheme(scheme)
// The test reads Deployments, so apps/v1 must be registered as well.
_ = appsv1.AddToScheme(scheme)
k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
require.NoError(t, err)
ctx := context.Background()
instance := &MyResource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-instance",
Namespace: "default",
},
Spec: MyResourceSpec{
Replicas: pointer.Int32Ptr(3),
Image: "nginx:latest",
},
}
err = k8sClient.Create(ctx, instance)
require.NoError(t, err)
defer k8sClient.Delete(ctx, instance)
// Poll until the operator has created the Deployment instead of sleeping a fixed time.
deployment := &appsv1.Deployment{}
key := types.NamespacedName{Name: "test-instance", Namespace: "default"}
require.Eventually(t, func() bool {
return k8sClient.Get(ctx, key, deployment) == nil
}, 30*time.Second, time.Second)
assert.Equal(t, int32(3), *deployment.Spec.Replicas)
}
4. Performance Best Practices
Concurrency and cache sync settings
go
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
Owns(&appsv1.Deployment{}).
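WithOptions(controller.Options{
// Up to 10 different objects are reconciled in parallel; the same key never is.
MaxConcurrentReconciles: 10,
// How long to wait for the informer caches to fill at startup before failing.
CacheSyncTimeout: 2 * time.Minute,
}).
Complete(r)
}
Event filtering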
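`GenerationChangedPredicate`, used in the builder chain that follows, drops update events whose `metadata.generation` did not change. For a custom resource with the status subresource enabled, the API server increments generation on spec changes but not on status or metadata changes. The controller is therefore not re-triggered by its own status writes. Below is a dependency-free model of the Update filter; `meta`, `update` and `countTriggered` are hypothetical names.

```go
package main

// meta holds the one metadata field the predicate compares.
type meta struct {
	Generation int64
}

// generationChanged mirrors the Update filter of
// predicate.GenerationChangedPredicate: let the event through only if
// metadata.generation differs between the old and the new object.
func generationChanged(oldObj, newObj *meta) bool {
	if oldObj == nil || newObj == nil {
		return false
	}
	return oldObj.Generation != newObj.Generation
}

// update describes one write to a custom resource whose CRD enables the
// status subresource: spec writes bump generation, status writes do not.
type update struct {
	specChanged bool
}

// countTriggered replays a sequence of writes and counts how many update
// events would reach the reconciler through the predicate.
func countTriggered(updates []update) int {
	current := meta{Generation: 1}
	triggered := 0
	for _, u := range updates {
		next := current
		if u.specChanged {
			next.Generation++
		}
		if generationChanged(&current, &next) {
			triggered++
		}
		current = next
	}
	return triggered
}
```

The trade-off is that label and annotation edits are filtered out too. If the controller reacts to them, `predicate.Or` can combine `GenerationChangedPredicate` with `LabelChangedPredicate` or `AnnotationChangedPredicate`.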
go
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
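// Reconcile MyResource only when metadata.generation changes (spec edits),
// not on status-only or metadata-only updates.
For(&MyResource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
// Drop periodic resync events for owned Deployments (their resourceVersion is unchanged).
Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}
5. Security Best Practices
RBAC configuration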
go
// controller-gen (run by make manifests) turns these markers into config/rbac/role.yaml.
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=webapp.my.domain,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}
Security context
go
// Only the security-related fields are shown; metadata, selector, labels and image are omitted.
func (r *MyReconciler) createDeployment(instance *MyResource) *appsv1.Deployment {
return &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: pointer.BoolPtr(true),
RunAsUser: pointer.Int64Ptr(1000),
FSGroup: pointer.Int64Ptr(1000),
},
Containers: []corev1.Container{
{
Name: "app",
SecurityContext: &corev1.SecurityContext{
AllowPrivilegeEscalation: pointer.BoolPtr(false),
ReadOnlyRootFilesystem: pointer.BoolPtr(true),
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
},
},
},
},
},
},
}
}
Summary
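This chapter covered the core concepts and practical techniques of the Operator pattern:
- Core concepts: Operators, controllers, and CRDs
- Architecture: the control loop and how the controller components interact
- Development frameworks: building Operators with Kubebuilder and the Operator SDK
- Practical examples: the end-to-end Operator development workflow, through complete examples
- Lifecycle management: managing the lifecycle of both Operators and the applications they run
- Troubleshooting: diagnosing and fixing common problems
The Operator pattern is the primary way to extend Kubernetes, and it provides powerful automation for managing complex applications.
Next Steps
- Custom Resources: CRD definitions and validation in depth
- Autoscaling: configuring HPA/VPA
- Service Mesh: the Istio service mesh architecture
- Helm Charts: a review of the Helm package manager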