Managing development environments for the data scientists on our team was slowly getting out of hand. At first we maintained one set of Deployment and Service YAML files per project, but as projects multiplied, environment differences (different Python libraries, resource requirements, storage mounts) made the YAML grow explosively. Worse, cleanup was routinely forgotten: idle notebook Pods held on to GPUs for weeks and costs stayed high. The manual kubectl apply -f / kubectl delete -f workflow was riddled with human error and simply did not scale.
What we needed was a declarative approach. Data scientists should only state what environment they need (say, "a TensorFlow 2.11 image, one V100 GPU, and a 100Gi persistent workspace"), not how to assemble it from a pile of Kubernetes resources. This is exactly what the Kubernetes Operator pattern is for. Our goal is a JupyterNotebook custom resource (CRD) that makes the Kubernetes API the single entry point for managing notebook environments.
Initial Design and Tooling Choices
The core idea is an Operator that continuously watches JupyterNotebook resources. When a new JupyterNotebook is created, the Operator automatically creates the corresponding Deployment, Service and PersistentVolumeClaim for it. When it is deleted, the Operator is responsible for cleaning up every associated resource so nothing leaks.
For tooling there are two mainstream options, Operator SDK and Kubebuilder, and both dramatically simplify Operator development. Since our team's stack is mostly Go and we wanted to stay close to the native controller-runtime library, we chose Kubebuilder. Its code generation provides a clean project layout and solid scaffolding, letting us focus on the core business logic: the reconciliation loop.
Our JupyterNotebook CRD spec needs to carry the following key information:
- image: the container image used for the notebook environment.
- resources: the Pod's CPU, memory and GPU requests and limits.
- volumeSize: the size of the persistent volume allocated for the workspace.
Its status, in turn, reports back to the user:
- phase: the current state, such as Creating, Ready or Deleting.
- url: the in-cluster Service address for reaching the notebook.
Step-by-Step Implementation: Building the Operator from Scratch
1. Initializing the Project and Defining the API
We use the Kubebuilder CLI to scaffold the project.
# Install Kubebuilder
# (requires Go 1.19+ and Docker)
os=$(go env GOOS)
arch=$(go env GOARCH)
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/${os}/${arch}"
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/
# Initialize the project
mkdir jupyter-operator && cd jupyter-operator
kubebuilder init --domain my.domain --repo my.domain/jupyter-operator
# Create the API (CRD and controller)
kubebuilder create api --group mlops --version v1 --kind JupyterNotebook
After these commands finish, Kubebuilder has generated the Go type definitions for the CRD in api/v1/jupyternotebook_types.go and a controller skeleton in controllers/jupyternotebook_controller.go. Now let's flesh out the JupyterNotebook type definitions.
api/v1/jupyternotebook_types.go:
package v1
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// JupyterNotebookSpec defines the desired state of JupyterNotebook
type JupyterNotebookSpec struct {
// Image is the container image for the Jupyter Notebook.
// +kubebuilder:validation:Required
Image string `json:"image"`
// Resources defines the compute resources required by the notebook.
// +kubebuilder:validation:Optional
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
// VolumeSize is the size of the persistent volume for the workspace, e.g. 10Gi.
// +kubebuilder:validation:Optional
VolumeSize resource.Quantity `json:"volumeSize,omitempty"`
// ServiceType defines the type of the service to expose the notebook.
// Defaults to ClusterIP.
// +kubebuilder:validation:Enum=ClusterIP;NodePort;LoadBalancer
// +kubebuilder:default:=ClusterIP
ServiceType corev1.ServiceType `json:"serviceType,omitempty"`
}
// JupyterNotebookPhase represents the lifecycle phase of a JupyterNotebook
type JupyterNotebookPhase string
const (
PhaseCreating JupyterNotebookPhase = "Creating"
PhaseReady JupyterNotebookPhase = "Ready"
PhaseDeleting JupyterNotebookPhase = "Deleting"
PhaseFailed JupyterNotebookPhase = "Failed"
)
// JupyterNotebookStatus defines the observed state of JupyterNotebook
type JupyterNotebookStatus struct {
// Phase is the current phase of the notebook.
Phase JupyterNotebookPhase `json:"phase,omitempty"`
// URL is the address to access the Jupyter Notebook service.
URL string `json:"url,omitempty"`
// ReadyReplicas is the number of Pods running and ready.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="URL",type="string",JSONPath=".status.url"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// JupyterNotebook is the Schema for the jupyternotebooks API
type JupyterNotebook struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec JupyterNotebookSpec `json:"spec,omitempty"`
Status JupyterNotebookStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// JupyterNotebookList contains a list of JupyterNotebook
type JupyterNotebookList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []JupyterNotebook `json:"items"`
}
func init() {
SchemeBuilder.Register(&JupyterNotebook{}, &JupyterNotebookList{})
}
These comments are not just comments: they are controller-gen markers, and they drive the generation of the CRD YAML, RBAC rules and more. For example, +kubebuilder:subresource:status tells the Kubernetes API server that the status field is updated through the /status subresource, so writes to status cannot accidentally modify spec (and ordinary updates cannot clobber status).
2. Implementing the Core Reconciliation Logic
The reconciliation logic is the Operator's brain and lives in the Reconcile method. It is invoked whenever a JupyterNotebook resource changes (create, update, delete) or whenever one of the child resources it owns (Deployment, Service) changes. The goal is to drive the actual state of the system towards the desired state expressed by the user.
The core reconciliation flow looks like this:
graph TD
    A[Start Reconciliation] --> B{Fetch JupyterNotebook CR};
    B --> C{Is CR being deleted?};
    C -- Yes --> D[Run Finalizer Logic];
    D --> E[Remove Finalizer];
    E --> F[End];
    C -- No --> G{Ensure Finalizer Exists};
    G --> H[Reconcile PVC];
    H --> I[Reconcile Deployment];
    I --> J[Reconcile Service];
    J --> K[Update CR Status];
    K --> F;
Here is the key part of controllers/jupyternotebook_controller.go. The code is long, but every piece matters.
package controllers
import (
// ... imports
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
mlopsv1 "my.domain/jupyter-operator/api/v1"
)
const (
jupyterNotebookFinalizer = "mlops.my.domain/finalizer"
)
// JupyterNotebookReconciler reconciles a JupyterNotebook object
type JupyterNotebookReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=mlops.my.domain,resources=jupyternotebooks/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services;persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
func (r *JupyterNotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 1. Fetch the JupyterNotebook instance
notebook := &mlopsv1.JupyterNotebook{}
err := r.Get(ctx, req.NamespacedName, notebook)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("JupyterNotebook resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get JupyterNotebook")
return ctrl.Result{}, err
}
// 2. Handle deletion: Check if the deletion timestamp is set
if notebook.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !controllerutil.ContainsFinalizer(notebook, jupyterNotebookFinalizer) {
controllerutil.AddFinalizer(notebook, jupyterNotebookFinalizer)
if err := r.Update(ctx, notebook); err != nil {
return ctrl.Result{}, err
}
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(notebook, jupyterNotebookFinalizer) {
// Our finalizer is present, so let's handle any external dependency
logger.Info("Performing finalizer cleanup for JupyterNotebook")
// In a real project, you would add cleanup logic here, e.g.,
// deleting external resources, unregistering from a service, etc.
// For this example, our owned resources (Deployment, PVC, Service)
// are garbage collected by Kubernetes, so there's not much to do here.
// Remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(notebook, jupyterNotebookFinalizer)
if err := r.Update(ctx, notebook); err != nil {
return ctrl.Result{}, err
}
}
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}
// Set initial status
if notebook.Status.Phase == "" {
notebook.Status.Phase = mlopsv1.PhaseCreating
if err := r.Status().Update(ctx, notebook); err != nil {
logger.Error(err, "Failed to update JupyterNotebook status")
return ctrl.Result{}, err
}
// Requeue to process after status update
return ctrl.Result{Requeue: true}, nil
}
// 3. Reconcile PersistentVolumeClaim
pvc := &corev1.PersistentVolumeClaim{}
pvcName := fmt.Sprintf("%s-workspace", notebook.Name)
err = r.Get(ctx, types.NamespacedName{Name: pvcName, Namespace: notebook.Namespace}, pvc)
if err != nil && errors.IsNotFound(err) {
// Define a new PVC
newPvc := r.pvcForJupyterNotebook(notebook, pvcName)
logger.Info("Creating a new PVC", "PVC.Namespace", newPvc.Namespace, "PVC.Name", newPvc.Name)
if err = r.Create(ctx, newPvc); err != nil {
logger.Error(err, "Failed to create new PVC")
return ctrl.Result{}, err
}
// PVC created successfully - return and requeue
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
logger.Error(err, "Failed to get PVC")
return ctrl.Result{}, err
}
// 4. Reconcile Deployment
deployment := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: notebook.Name, Namespace: notebook.Namespace}, deployment)
if err != nil && errors.IsNotFound(err) {
// Define a new Deployment
newDep := r.deploymentForJupyterNotebook(notebook, pvcName)
logger.Info("Creating a new Deployment", "Deployment.Namespace", newDep.Namespace, "Deployment.Name", newDep.Name)
if err = r.Create(ctx, newDep); err != nil {
logger.Error(err, "Failed to create new Deployment")
return ctrl.Result{}, err
}
// Deployment created successfully - return and requeue
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
logger.Error(err, "Failed to get Deployment")
return ctrl.Result{}, err
}
// 5. Reconcile Service
service := &corev1.Service{}
err = r.Get(ctx, types.NamespacedName{Name: notebook.Name, Namespace: notebook.Namespace}, service)
if err != nil && errors.IsNotFound(err) {
// Define a new Service
newSvc := r.serviceForJupyterNotebook(notebook)
logger.Info("Creating a new Service", "Service.Namespace", newSvc.Namespace, "Service.Name", newSvc.Name)
if err = r.Create(ctx, newSvc); err != nil {
logger.Error(err, "Failed to create new Service")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
logger.Error(err, "Failed to get Service")
return ctrl.Result{}, err
}
// 6. Update Status
// In a real-world scenario, you'd check for spec drift and update sub-resources if necessary.
// For this example, we keep it simple.
newStatus := mlopsv1.JupyterNotebookStatus{}
newStatus.ReadyReplicas = deployment.Status.ReadyReplicas
if deployment.Status.ReadyReplicas > 0 {
newStatus.Phase = mlopsv1.PhaseReady
// Standard Jupyter port is 8888
newStatus.URL = fmt.Sprintf("http://%s.%s.svc.cluster.local:8888", service.Name, service.Namespace)
} else {
newStatus.Phase = mlopsv1.PhaseCreating
}
// Compare old and new status to avoid unnecessary updates
if notebook.Status.Phase != newStatus.Phase || notebook.Status.URL != newStatus.URL || notebook.Status.ReadyReplicas != newStatus.ReadyReplicas {
notebook.Status = newStatus
if err := r.Status().Update(ctx, notebook); err != nil {
logger.Error(err, "Failed to update JupyterNotebook status")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// Helper functions to create desired K8s objects
func (r *JupyterNotebookReconciler) deploymentForJupyterNotebook(nb *mlopsv1.JupyterNotebook, pvcName string) *appsv1.Deployment {
labels := map[string]string{"app": "jupyter-notebook", "notebook-name": nb.Name}
replicas := int32(1)
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: nb.Name,
Namespace: nb.Namespace,
Labels: labels,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "notebook",
Image: nb.Spec.Image,
Resources: nb.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{{
ContainerPort: 8888,
Name: "http",
}},
// A common practice for jupyter images: disable auth for simplicity.
// In production, you MUST secure this.
Args: []string{
"start-notebook.sh",
"--NotebookApp.token=''",
"--NotebookApp.password=''",
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "workspace",
MountPath: "/home/jovyan/work",
},
},
}},
Volumes: []corev1.Volume{
{
Name: "workspace",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
},
},
},
},
},
}
// Set JupyterNotebook instance as the owner and controller
ctrl.SetControllerReference(nb, dep, r.Scheme)
return dep
}
func (r *JupyterNotebookReconciler) serviceForJupyterNotebook(nb *mlopsv1.JupyterNotebook) *corev1.Service {
labels := map[string]string{"app": "jupyter-notebook", "notebook-name": nb.Name}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: nb.Name,
Namespace: nb.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: labels,
Ports: []corev1.ServicePort{{
Protocol: corev1.ProtocolTCP,
Port: 8888,
TargetPort: intstr.FromString("http"),
}},
Type: nb.Spec.ServiceType,
},
}
ctrl.SetControllerReference(nb, svc, r.Scheme)
return svc
}
func (r *JupyterNotebookReconciler) pvcForJupyterNotebook(nb *mlopsv1.JupyterNotebook, pvcName string) *corev1.PersistentVolumeClaim {
// Label the PVC like the Deployment and Service so it can be listed with the same selector.
labels := map[string]string{"app": "jupyter-notebook", "notebook-name": nb.Name}
// A practical default storage class should be configured in a real cluster.
storageClassName := "standard"
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: nb.Namespace,
Labels: labels,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: nb.Spec.VolumeSize,
},
},
StorageClassName: &storageClassName,
},
}
ctrl.SetControllerReference(nb, pvc, r.Scheme)
return pvc
}
// SetupWithManager sets up the controller with the Manager.
func (r *JupyterNotebookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mlopsv1.JupyterNotebook{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&corev1.PersistentVolumeClaim{}).
Complete(r)
}
A few key points worth unpacking:
- Finalizer: jupyterNotebookFinalizer is a key mechanism. When a user deletes the JupyterNotebook object, the Kubernetes API server does not remove it right away; it sets DeletionTimestamp instead. Once the reconcile loop sees that timestamp, it performs its cleanup logic (empty here, since the child resources are garbage-collected automatically) and then removes the finalizer. Only when the finalizer list is empty is the object actually deleted, which guarantees orderly cleanup.
- Idempotency: the whole Reconcile method is designed to be idempotent. No matter how many times it runs, as long as the JupyterNotebook spec does not change, the cluster converges to the same state. We use r.Get to check whether each resource exists and only create it on an IsNotFound error.
- Owner references: ctrl.SetControllerReference links the child resources (Deployment, Service, PVC) to their parent JupyterNotebook. It tells Kubernetes that these objects are "owned" by the JupyterNotebook, so when the JupyterNotebook is deleted the garbage collector automatically cleans up everything it owns.
- Status updates: status updates are asynchronous and should be kept separate from spec updates. We only call r.Status().Update() when the status has actually changed, avoiding unnecessary API calls.
- Unit testing: the test code is not shown above, but the standard way to test this Operator is with the envtest library, which starts a temporary etcd and kube-apiserver so the controller runs in a controlled environment. We can create JupyterNotebook objects and then assert that the corresponding Deployment and Service are created and the Status is updated correctly; a sketch follows this list.
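To make that last point concrete, here is a minimal sketch of such a test case. It assumes the suite_test.go scaffolded by Kubebuilder has already started envtest, registered our API scheme, launched the controller under a manager, and exposed the ctx and k8sClient package variables, mirroring the Ginkgo/Gomega setup the scaffold generates (ginkgo/v2 in recent versions); the notebook name used here is purely illustrative.
package controllers

import (
	"time"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	mlopsv1 "my.domain/jupyter-operator/api/v1"
)

var _ = Describe("JupyterNotebook controller", func() {
	It("creates the owned Deployment and Service for a new notebook", func() {
		// Create a JupyterNotebook CR through the envtest API server.
		nb := &mlopsv1.JupyterNotebook{
			ObjectMeta: metav1.ObjectMeta{Name: "test-notebook", Namespace: "default"},
			Spec: mlopsv1.JupyterNotebookSpec{
				Image:      "jupyter/scipy-notebook:latest",
				VolumeSize: resource.MustParse("1Gi"),
			},
		}
		Expect(k8sClient.Create(ctx, nb)).To(Succeed())

		key := types.NamespacedName{Name: "test-notebook", Namespace: "default"}

		// The reconciler should eventually create a Deployment with the requested image.
		dep := &appsv1.Deployment{}
		Eventually(func() error {
			return k8sClient.Get(ctx, key, dep)
		}, 10*time.Second, 250*time.Millisecond).Should(Succeed())
		Expect(dep.Spec.Template.Spec.Containers[0].Image).To(Equal("jupyter/scipy-notebook:latest"))

		// ...and a Service exposing the notebook port 8888.
		svc := &corev1.Service{}
		Eventually(func() error {
			return k8sClient.Get(ctx, key, svc)
		}, 10*time.Second, 250*time.Millisecond).Should(Succeed())
		Expect(svc.Spec.Ports[0].Port).To(Equal(int32(8888)))
	})
})
With the scaffolded Makefile, make test runs cases like this against the temporary control plane.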
3. Deployment and Verification
First, generate the CRD manifests and install them into the cluster:
make manifests
make install
Then run the Operator locally for debugging:
make run
Now open another terminal and create a JupyterNotebook instance.
config/samples/mlops_v1_jupyternotebook.yaml:
apiVersion: mlops.my.domain/v1
kind: JupyterNotebook
metadata:
name: datascience-project-alpha
spec:
image: jupyter/scipy-notebook:latest
volumeSize: 10Gi
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
Apply it:
kubectl apply -f config/samples/mlops_v1_jupyternotebook.yaml
Before long you will see log output from the Operator showing it creating the PVC, Deployment and Service. A couple of minutes later, check the resources:
# Inspect the CR status
kubectl get jupyternotebook datascience-project-alpha -o wide
# NAME IMAGE STATUS URL AGE
# datascience-project-alpha jupyter/scipy-notebook:latest Ready http://datascience-project-alpha.default.svc.cluster.local:8888 2m
# Inspect the child resources that were created
kubectl get pvc,deployment,service -l notebook-name=datascience-project-alpha
# NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
# persistentvolumeclaim/datascience-project-alpha-workspace Bound pvc-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 10Gi RWO standard 2m
# NAME READY UP-TO-DATE AVAILABLE AGE
# deployment.apps/datascience-project-alpha 1/1 1 1 2m
# NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
# service/datascience-project-alpha ClusterIP 10.96.123.123 <none> 8888/TCP 2m
When the JupyterNotebook object is deleted, all of its associated resources are cleaned up automatically as well.
kubectl delete jupyternotebook datascience-project-alpha
Limitations and Future Iterations
This Operator is only a starting point and still some distance from production readiness. The current implementation has a few obvious limitations:
- Security: the notebook container starts with no password and no token, which is unacceptable in any multi-tenant environment. The next step is to integrate authentication, for example OIDC, or inject an auth proxy as a sidecar.
- Network exposure: a ClusterIP Service is only reachable from inside the cluster. For external access the Operator needs to create and configure Ingress resources automatically and handle TLS certificates.
- Resource management: there is no automatic shutdown of idle notebooks, which wastes resources. We could add an idleTimeoutSeconds field to the Spec and have the Operator watch Pod activity and scale down according to policy.
- Update strategy: the current implementation is intentionally simple and does not handle Spec changes (for example swapping the image). A complete Operator needs to compare the Spec against the current Deployment and trigger a rolling update when it detects drift; a sketch of what that could look like follows this list.
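To make that last item a bit more concrete, here is a minimal sketch of such a drift check, slotted into Reconcile right after the existing Deployment has been fetched in step 4. It assumes, for simplicity, that the image is the only field we care about; a fuller implementation would also compare resources, volume mounts and other fields.
// Sketch: detect image drift between the CR spec and the owned Deployment,
// and let Kubernetes perform a rolling update when they differ.
desiredImage := notebook.Spec.Image
currentImage := deployment.Spec.Template.Spec.Containers[0].Image
if currentImage != desiredImage {
	logger.Info("Image drift detected, updating Deployment", "current", currentImage, "desired", desiredImage)
	deployment.Spec.Template.Spec.Containers[0].Image = desiredImage
	if err := r.Update(ctx, deployment); err != nil {
		logger.Error(err, "Failed to update Deployment image")
		return ctrl.Result{}, err
	}
	// Requeue so the next pass can refresh the status as the rollout progresses.
	return ctrl.Result{Requeue: true}, nil
}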
The direction for future iterations is clear: grow this Operator into a fuller MLOps platform component with authentication, network policies, resource quota management and cost accounting built in, making data science development environments truly automated and self-service.