Building an MLOps Operator with Kubebuilder to Manage the Jupyter Notebook Lifecycle


Managing the development environments for our team's data scientists was spiraling out of control. Initially we maintained a set of Deployment and Service YAML files per project, but as projects multiplied, environment differences (different Python libraries, resource requirements, storage mounts) made the YAML grow explosively. Worse, resource cleanup was routinely forgotten: idle Notebook Pods held on to GPUs for long stretches, keeping costs high. The manual kubectl apply -f / kubectl delete -f workflow was riddled with human error and simply did not scale.

What we needed was a declarative approach. Data scientists should only have to state what environment they need (say, "a TensorFlow 2.11 image, 1 V100 GPU, and a 100Gi persistent workspace"), not how to realize it with a pile of Kubernetes resources. This is exactly where the Kubernetes Operator pattern shines. Our goal is to create a JupyterNotebook custom resource (CRD) that makes the Kubernetes API the single entry point for managing Notebook environments.

Initial Design and Tooling Choices

The core idea is an Operator that continuously watches for changes to JupyterNotebook resources. When a new JupyterNotebook is created, the Operator automatically creates a matching Deployment, Service, and PersistentVolumeClaim for it. When it is deleted, the Operator cleans up all associated resources so nothing leaks.

For tooling, the two mainstream choices are Operator SDK and Kubebuilder, and both simplify Operator development enormously. Since our team's stack is Go-centric and we wanted to work directly with the native controller-runtime library, we chose Kubebuilder. Its code generation provides a clean project layout and solid scaffolding, letting us focus on the core business logic: the reconciliation loop.

Our JupyterNotebook CRD spec needs to carry the following key information:

  • image: the container image used for the Notebook environment.
  • resources: CPU, memory, and GPU requests and limits for the Pod.
  • volumeSize: the size of the persistent volume allocated for the workspace.

And its status reports back to the user:

  • phase: the current phase, such as Creating, Ready, or Deleting.
  • url: the in-cluster Service address for accessing the Notebook.
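Putting the two halves together, a reconciled object might look like this (the name and values are illustrative; the status block is written by the Operator, never by the user):

```yaml
apiVersion: mlops.my.domain/v1
kind: JupyterNotebook
metadata:
  name: example-notebook       # illustrative name
spec:
  image: jupyter/tensorflow-notebook:latest
  volumeSize: 100Gi
  resources:
    limits:
      nvidia.com/gpu: "1"
status:                        # filled in by the Operator
  phase: Ready
  url: http://example-notebook.default.svc.cluster.local:8888
```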

Step-by-Step Implementation: Building the Operator from Scratch

1. Initializing the Project and Defining the API

We use the Kubebuilder CLI to scaffold the project.

# Install Kubebuilder
# (requires Go 1.19+ and Docker)
os=$(go env GOOS)
arch=$(go env GOARCH)
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/${os}/${arch}"
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/

# Initialize the project
mkdir jupyter-operator && cd jupyter-operator
kubebuilder init --domain my.domain --repo my.domain/jupyter-operator

# Create the API (CRD and Controller)
kubebuilder create api --group mlops --version v1 --kind JupyterNotebook

After these commands run, Kubebuilder generates the Go type definitions for the CRD in api/v1/jupyternotebook_types.go and the controller skeleton in controllers/jupyternotebook_controller.go. Now let's flesh out the JupyterNotebook type definitions.
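For orientation, the generated layout looks roughly like this (exact contents vary with the Kubebuilder version; newer releases may place controllers under internal/):

```
jupyter-operator/
├── Makefile                 # build, test, and manifest-generation targets
├── PROJECT                  # Kubebuilder project metadata
├── main.go                  # manager entrypoint; registers the controller
├── api/
│   └── v1/
│       └── jupyternotebook_types.go
├── controllers/
│   └── jupyternotebook_controller.go
└── config/                  # CRD, RBAC, manager, and sample manifests
    ├── crd/
    ├── rbac/
    ├── samples/
    └── default/
```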

api/v1/jupyternotebook_types.go:

package v1

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// JupyterNotebookSpec defines the desired state of JupyterNotebook
type JupyterNotebookSpec struct {
	// Image is the container image for the Jupyter Notebook.
	// +kubebuilder:validation:Required
	Image string `json:"image"`

	// Resources defines the compute resources required by the notebook.
	// +kubebuilder:validation:Optional
	Resources corev1.ResourceRequirements `json:"resources,omitempty"`

	// VolumeSize is the size of the persistent volume for the workspace,
	// e.g. 10Gi. The controller always provisions a PVC of this size.
	// +kubebuilder:validation:Required
	VolumeSize resource.Quantity `json:"volumeSize"`

	// ServiceType defines the type of the service to expose the notebook.
	// Defaults to ClusterIP.
	// +kubebuilder:validation:Enum=ClusterIP;NodePort;LoadBalancer
	// +kubebuilder:default:=ClusterIP
	ServiceType corev1.ServiceType `json:"serviceType,omitempty"`
}

// JupyterNotebookPhase defines the observed state of JupyterNotebook
type JupyterNotebookPhase string

const (
	PhaseCreating JupyterNotebookPhase = "Creating"
	PhaseReady    JupyterNotebookPhase = "Ready"
	PhaseDeleting JupyterNotebookPhase = "Deleting"

	PhaseFailed JupyterNotebookPhase = "Failed"
)

// JupyterNotebookStatus defines the observed state of JupyterNotebook
type JupyterNotebookStatus struct {
	// Phase is the current phase of the notebook.
	Phase JupyterNotebookPhase `json:"phase,omitempty"`

	// URL is the address to access the Jupyter Notebook service.
	URL string `json:"url,omitempty"`

	// ReadyReplicas is the number of Pods running and ready.
	ReadyReplicas int32 `json:"readyReplicas,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="URL",type="string",JSONPath=".status.url"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// JupyterNotebook is the Schema for the jupyternotebooks API
type JupyterNotebook struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   JupyterNotebookSpec   `json:"spec,omitempty"`
	Status JupyterNotebookStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// JupyterNotebookList contains a list of JupyterNotebook
type JupyterNotebookList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []JupyterNotebook `json:"items"`
}

func init() {
	SchemeBuilder.Register(&JupyterNotebook{}, &JupyterNotebookList{})
}

These comments are more than comments: they are controller-gen markers, used to generate the CRD YAML, RBAC rules, and more. For example, +kubebuilder:subresource:status tells the Kubernetes API server that the status field must be updated through the /status subresource, which prevents the controller from accidentally modifying the spec.
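Concretely, after running make manifests these markers materialize in the generated CRD. An excerpt might look like the following (illustrative; the exact output depends on your controller-gen version):

```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: jupyternotebooks.mlops.my.domain
spec:
  group: mlops.my.domain
  names:
    kind: JupyterNotebook
    plural: jupyternotebooks
  scope: Namespaced
  versions:
    - name: v1
      served: true
      storage: true
      subresources:
        status: {}               # from +kubebuilder:subresource:status
      additionalPrinterColumns:  # from the printcolumn markers
        - name: Image
          type: string
          jsonPath: .spec.image
        - name: Status
          type: string
          jsonPath: .status.phase
```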

2. Implementing the Core Reconciliation Logic

The reconciliation logic is the Operator's brain and lives in the Reconcile method. It is invoked whenever a JupyterNotebook resource changes (create, update, delete) or whenever one of its owned child resources (Deployment, Service) changes. The goal is to drive the cluster's actual state toward the user's desired state.

Here is the core flow of the reconciliation logic:

graph TD
    A[Start Reconciliation] --> B{Fetch JupyterNotebook CR};
    B --> C{Is CR being deleted?};
    C -- Yes --> D[Run Finalizer Logic];
    D --> E[Remove Finalizer];
    E --> F[End];
    C -- No --> G{Ensure Finalizer Exists};
    G --> H[Reconcile PVC];
    H --> I[Reconcile Deployment];
    I --> J[Reconcile Service];
    J --> K[Update CR Status];
    K --> F;

Below is the key implementation in controllers/jupyternotebook_controller.go. The code is long, but every part matters.

package controllers

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	mlopsv1 "my.domain/jupyter-operator/api/v1"
)

const (
	jupyterNotebookFinalizer = "mlops.my.domain/finalizer"
)

// JupyterNotebookReconciler reconciles a JupyterNotebook object
type JupyterNotebookReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services;persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

func (r *JupyterNotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	
	// 1. Fetch the JupyterNotebook instance
	notebook := &mlopsv1.JupyterNotebook{}
	err := r.Get(ctx, req.NamespacedName, notebook)
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Info("JupyterNotebook resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get JupyterNotebook")
		return ctrl.Result{}, err
	}

	// 2. Handle deletion: Check if the deletion timestamp is set
	if notebook.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is not being deleted, so if it does not have our finalizer,
		// then lets add the finalizer and update the object.
		if !controllerutil.ContainsFinalizer(notebook, jupyterNotebookFinalizer) {
			controllerutil.AddFinalizer(notebook, jupyterNotebookFinalizer)
			if err := r.Update(ctx, notebook); err != nil {
				return ctrl.Result{}, err
			}
		}
	} else {
		// The object is being deleted
		if controllerutil.ContainsFinalizer(notebook, jupyterNotebookFinalizer) {
			// Our finalizer is present, so let's handle any external dependency
			logger.Info("Performing finalizer cleanup for JupyterNotebook")
			
			// In a real project, you would add cleanup logic here, e.g.,
			// deleting external resources, unregistering from a service, etc.
			// For this example, our owned resources (Deployment, PVC, Service)
			// are garbage collected by Kubernetes, so there's not much to do here.

			// Remove our finalizer from the list and update it.
			controllerutil.RemoveFinalizer(notebook, jupyterNotebookFinalizer)
			if err := r.Update(ctx, notebook); err != nil {
				return ctrl.Result{}, err
			}
		}
		// Stop reconciliation as the item is being deleted
		return ctrl.Result{}, nil
	}
	
	// Set initial status
	if notebook.Status.Phase == "" {
		notebook.Status.Phase = mlopsv1.PhaseCreating
		if err := r.Status().Update(ctx, notebook); err != nil {
			logger.Error(err, "Failed to update JupyterNotebook status")
			return ctrl.Result{}, err
		}
		// Requeue to process after status update
		return ctrl.Result{Requeue: true}, nil
	}

	// 3. Reconcile PersistentVolumeClaim
	pvc := &corev1.PersistentVolumeClaim{}
	pvcName := fmt.Sprintf("%s-workspace", notebook.Name)
	err = r.Get(ctx, types.NamespacedName{Name: pvcName, Namespace: notebook.Namespace}, pvc)
	if err != nil && errors.IsNotFound(err) {
		// Define a new PVC
		newPvc := r.pvcForJupyterNotebook(notebook, pvcName)
		logger.Info("Creating a new PVC", "PVC.Namespace", newPvc.Namespace, "PVC.Name", newPvc.Name)
		if err = r.Create(ctx, newPvc); err != nil {
			logger.Error(err, "Failed to create new PVC")
			return ctrl.Result{}, err
		}
		// PVC created successfully - return and requeue
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		logger.Error(err, "Failed to get PVC")
		return ctrl.Result{}, err
	}

	// 4. Reconcile Deployment
	deployment := &appsv1.Deployment{}
	err = r.Get(ctx, types.NamespacedName{Name: notebook.Name, Namespace: notebook.Namespace}, deployment)
	if err != nil && errors.IsNotFound(err) {
		// Define a new Deployment
		newDep := r.deploymentForJupyterNotebook(notebook, pvcName)
		logger.Info("Creating a new Deployment", "Deployment.Namespace", newDep.Namespace, "Deployment.Name", newDep.Name)
		if err = r.Create(ctx, newDep); err != nil {
			logger.Error(err, "Failed to create new Deployment")
			return ctrl.Result{}, err
		}
		// Deployment created successfully - return and requeue
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		logger.Error(err, "Failed to get Deployment")
		return ctrl.Result{}, err
	}
	
	// 5. Reconcile Service
	service := &corev1.Service{}
	err = r.Get(ctx, types.NamespacedName{Name: notebook.Name, Namespace: notebook.Namespace}, service)
	if err != nil && errors.IsNotFound(err) {
		// Define a new Service
		newSvc := r.serviceForJupyterNotebook(notebook)
		logger.Info("Creating a new Service", "Service.Namespace", newSvc.Namespace, "Service.Name", newSvc.Name)
		if err = r.Create(ctx, newSvc); err != nil {
			logger.Error(err, "Failed to create new Service")
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		logger.Error(err, "Failed to get Service")
		return ctrl.Result{}, err
	}


	// 6. Update Status
	// In a real-world scenario, you'd check for spec drift and update sub-resources if necessary.
	// For this example, we keep it simple.
	
	newStatus := mlopsv1.JupyterNotebookStatus{}
	newStatus.ReadyReplicas = deployment.Status.ReadyReplicas
	
	if deployment.Status.ReadyReplicas > 0 {
		newStatus.Phase = mlopsv1.PhaseReady
		// Standard Jupyter port is 8888
		newStatus.URL = fmt.Sprintf("http://%s.%s.svc.cluster.local:8888", service.Name, service.Namespace)
	} else {
		newStatus.Phase = mlopsv1.PhaseCreating
	}
	
	// Compare old and new status to avoid unnecessary updates
	if notebook.Status.Phase != newStatus.Phase || notebook.Status.URL != newStatus.URL {
		notebook.Status = newStatus
		if err := r.Status().Update(ctx, notebook); err != nil {
			logger.Error(err, "Failed to update JupyterNotebook status")
			return ctrl.Result{}, err
		}
	}
	
	return ctrl.Result{}, nil
}

// Helper functions to create desired K8s objects
func (r *JupyterNotebookReconciler) deploymentForJupyterNotebook(nb *mlopsv1.JupyterNotebook, pvcName string) *appsv1.Deployment {
	labels := map[string]string{"app": "jupyter-notebook", "notebook-name": nb.Name}
	replicas := int32(1)

	dep := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      nb.Name,
			Namespace: nb.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:            "notebook",
						Image:           nb.Spec.Image,
						Resources:       nb.Spec.Resources,
						ImagePullPolicy: corev1.PullIfNotPresent,
						Ports: []corev1.ContainerPort{{
							ContainerPort: 8888,
							Name:          "http",
						}},
						// A common practice for jupyter images: disable auth for simplicity.
						// In production, you MUST secure this.
						Args: []string{
							"start-notebook.sh",
							"--NotebookApp.token=''",
							"--NotebookApp.password=''",
						},
						VolumeMounts: []corev1.VolumeMount{
							{
								Name:      "workspace",
								MountPath: "/home/jovyan/work",
							},
						},
					}},
					Volumes: []corev1.Volume{
						{
							Name: "workspace",
							VolumeSource: corev1.VolumeSource{
								PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
									ClaimName: pvcName,
								},
							},
						},
					},
				},
			},
		},
	}
	// Set JupyterNotebook instance as the owner and controller
	ctrl.SetControllerReference(nb, dep, r.Scheme)
	return dep
}

func (r *JupyterNotebookReconciler) serviceForJupyterNotebook(nb *mlopsv1.JupyterNotebook) *corev1.Service {
	labels := map[string]string{"app": "jupyter-notebook", "notebook-name": nb.Name}

	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      nb.Name,
			Namespace: nb.Namespace,
			Labels:    labels,
		},
		Spec: corev1.ServiceSpec{
			Selector: labels,
			Ports: []corev1.ServicePort{{
				Protocol:   corev1.ProtocolTCP,
				Port:       8888,
				TargetPort: intstr.FromString("http"),
			}},
			Type: nb.Spec.ServiceType,
		},
	}
	ctrl.SetControllerReference(nb, svc, r.Scheme)
	return svc
}

func (r *JupyterNotebookReconciler) pvcForJupyterNotebook(nb *mlopsv1.JupyterNotebook, pvcName string) *corev1.PersistentVolumeClaim {
	// A practical default storage class should be configured in a real cluster.
	storageClassName := "standard" 

	pvc := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pvcName,
			Namespace: nb.Namespace,
		},
		Spec: corev1.PersistentVolumeClaimSpec{
			AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
			Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceStorage: nb.Spec.VolumeSize,
				},
			},
			StorageClassName: &storageClassName,
		},
	}
	ctrl.SetControllerReference(nb, pvc, r.Scheme)
	return pvc
}


// SetupWithManager sets up the controller with the Manager.
func (r *JupyterNotebookReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&mlopsv1.JupyterNotebook{}).
		Owns(&appsv1.Deployment{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.PersistentVolumeClaim{}).
		Complete(r)
}

A few key points worth examining:

  1. Finalizer: jupyterNotebookFinalizer is a crucial mechanism. When a user deletes the JupyterNotebook object, the Kubernetes API server does not remove it immediately; it sets DeletionTimestamp instead. Once our reconciliation loop observes that timestamp, it runs the cleanup logic (empty here, because the child resources are garbage-collected automatically) and then removes the finalizer. Only when the finalizer list is empty is the object actually deleted. This guarantees orderly cleanup.
  2. Idempotency: the entire Reconcile method is designed to be idempotent. No matter how many times it runs, as long as the JupyterNotebook's spec is unchanged, the cluster converges to the same state. We use r.Get to check whether each resource exists and create it only on an IsNotFound error.
  3. Owner References: ctrl.SetControllerReference is what links the child resources (Deployment, Service, PVC) to the parent JupyterNotebook. It tells Kubernetes that these children are "owned" by the JupyterNotebook, so when the JupyterNotebook is deleted, the garbage collector automatically cleans up every resource it owns.
  4. Status updates: status updates are asynchronous and should be kept separate from spec updates. We call r.Status().Update() only when the status has actually changed, avoiding unnecessary API calls.
  5. Unit testing approach: although no test code is shown, the standard way to test this Operator is the envtest library. It spins up a temporary etcd and kube-apiserver, letting our controller run in a controlled environment. We can create JupyterNotebook objects and then assert that the corresponding Deployment and Service are created correctly and that the Status is updated as expected.
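To make the finalizer and owner-reference points concrete, here is roughly what the metadata of a CR pending deletion, and of an owned child Deployment, looks like on the API server (illustrative values; the timestamp is a placeholder):

```yaml
# JupyterNotebook after `kubectl delete`: kept alive until the finalizer is removed
metadata:
  name: datascience-project-alpha
  finalizers:
    - mlops.my.domain/finalizer
  deletionTimestamp: "2023-01-01T00:00:00Z"
---
# Child Deployment created by the Operator: ownerReferences set by SetControllerReference
metadata:
  name: datascience-project-alpha
  ownerReferences:
    - apiVersion: mlops.my.domain/v1
      kind: JupyterNotebook
      name: datascience-project-alpha
      controller: true
      blockOwnerDeletion: true
```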

3. Deployment and Verification

First, generate the manifests and install the CRD into the cluster:

make manifests
make install

Then run the Operator locally for debugging:

make run

Now, in another terminal, create a JupyterNotebook instance.

config/samples/mlops_v1_jupyternotebook.yaml:

apiVersion: mlops.my.domain/v1
kind: JupyterNotebook
metadata:
  name: datascience-project-alpha
spec:
  image: jupyter/scipy-notebook:latest
  volumeSize: 10Gi
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "2"
      memory: "4Gi"

Apply it:

kubectl apply -f config/samples/mlops_v1_jupyternotebook.yaml

You should soon see log output from the Operator as it creates the PVC, Deployment, and Service. A few minutes later, check the resource state:

# View the CR status
kubectl get jupyternotebook datascience-project-alpha -o wide

# NAME                          IMAGE                          STATUS   URL                                                      AGE
# datascience-project-alpha   jupyter/scipy-notebook:latest   Ready    http://datascience-project-alpha.default.svc.cluster.local:8888   2m

# View the created child resources
kubectl get pvc,deployment,service -l notebook-name=datascience-project-alpha

# NAME                                                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
# persistentvolumeclaim/datascience-project-alpha-workspace   Bound    pvc-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx   10Gi       RWO            standard       2m

# NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/datascience-project-alpha   1/1     1            1           2m

# NAME                                      TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
# service/datascience-project-alpha         ClusterIP   10.96.123.123   <none>        8888/TCP   2m

When the JupyterNotebook object is deleted, all associated resources are cleaned up automatically.

kubectl delete jupyternotebook datascience-project-alpha

Limitations and Future Iterations

This Operator is only a starting point and is still some distance from production readiness. The current implementation has some obvious limitations:

  • Security: the Notebook container starts with no password and no token, which is absolutely unacceptable in a multi-tenant environment. The next step is to integrate an authentication mechanism such as OIDC, or inject an auth proxy as a sidecar.
  • Network exposure: a ClusterIP Service is reachable only inside the cluster. For external access, the Operator needs to create and configure Ingress resources automatically and handle TLS certificates.
  • Resource management: there is no automatic shutdown of idle Notebooks, which wastes resources. An idleTimeoutSeconds field could be added to the Spec, with the Operator monitoring Pod activity and scaling down according to policy.
  • Update strategy: the current implementation is naive and has no logic for Spec changes (e.g. swapping the image). A complete Operator must diff the Spec against the current Deployment and trigger a rolling update when it detects drift.
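As a sketch of the idle-shutdown direction, the Spec could grow a field like the one below. Note that idleTimeoutSeconds is a hypothetical extension, not part of the CRD built in this article:

```yaml
apiVersion: mlops.my.domain/v1
kind: JupyterNotebook
metadata:
  name: datascience-project-alpha
spec:
  image: jupyter/scipy-notebook:latest
  volumeSize: 10Gi
  # Hypothetical field: scale the Deployment to 0 replicas
  # after one hour without observed kernel activity.
  idleTimeoutSeconds: 3600
```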

The future direction is clear: evolve this Operator into a fuller MLOps platform component, integrating authentication, network policy, resource quota management, and cost accounting, to deliver truly automated, self-service data science development environments.
