利用 Flux CD、etcd 与 Flutter 构建 GitOps 驱动的实时配置分发系统


我们团队的移动应用开发流程中,功能开关(Feature Flag)的管理一直是个痛点。最初的方案是硬编码,发布周期长达数周。后来演进到后端 API 控制,但这引入了新的依赖和延迟。最新的方案是使用 Kubernetes 的 ConfigMap,通过 CI/CD 流水线更新,再由应用后端读取。这套流程虽然实现了配置与代码分离,但对移动端开发者而言,整个过程仍然是一个黑盒,充满了延迟和不确定性。一个开关的变更请求,从提交 Git Commit 到最终在测试应用上生效,耗时可能从几分钟到半小时不等,并且状态完全不可见。

我们的目标很明确:为移动端团队提供一个近乎实时的、状态透明的功能开关管理平台。核心要求是,变更流程必须遵循 GitOps 原则,所有配置的变更源头都在 Git 仓库中;同时,需要有一个直观的界面,能实时反映线上开关的精确状态。这个界面不仅要服务于开发者,也要让产品经理能够一目了然。

初步构想与技术选型

这个系统的核心在于解决“实时性”和“GitOps”这两个看似矛盾的需求。GitOps 的流程是异步的,而用户界面需要同步的实时反馈。

  1. GitOps引擎: 我们已经是 Flux CD 的重度用户,用它来管理整个 Kubernetes 集群的声明式状态。因此,继续使用 Flux CD 来管理功能开关的 Custom Resource 是最自然的选择。这保证了配置的唯一真相来源是 Git。

  2. 状态存储与通知中心: 这是整个架构的关键。直接轮询 Kubernetes API Server 来获取 ConfigMapCustom Resource 的状态是不可行的,这会给 API Server 带来巨大压力,且实时性无法保证。我们需要一个能够主动“推送”变更的组件。

    • Redis Pub/Sub? 是一个备选方案,但它缺乏强大的持久化和一致性保证。我们不希望在订阅服务重启时丢失状态变更事件。
    • etcd? 这非常诱人。作为 Kubernetes 自身的基石,etcd 提供了我们所需的一切:强一致性、事务性读写、以及一个极其强大的 Watch API。通过 Watch 机制,任何客户端都能实时订阅某个 key 前缀下的所有变更。我们可以为我们的平台搭建一个独立的 etcd 集群,用作功能开关的实时状态存储,从而与 Kubernetes 集群本身的 etcd 解耦,避免任何潜在的性能影响。
  3. 前端界面: 团队希望有一个跨平台的客户端,能在 iOS、Android 甚至 Web 上运行。Flutter 在这方面是最佳选择,一次开发,多端部署。

  4. 连接桥梁: 如何将 etcd 中的变更实时推送到 Flutter 客户端?HTTP Polling 被首先排除。WebSocket 是一个选项,但 gRPC Streaming 提供了更优的性能、更强的类型约束和更高效的二进制协议。我们可以构建一个 Go 服务,它作为 gRPC 服务端,一方面使用 etcd 的 Watch API 监控状态,另一方面将收到的变更通过 gRPC stream 推送给 Flutter 客户端。

最终的架构图如下:

graph TD
    subgraph Git Repository
        A[Developer Commits FeatureFlag YAML]
    end

    subgraph Kubernetes Cluster
        B[Flux CD] --> C{Reconciles CR}
        C --> D[FeatureFlag CR]
        E[Custom Controller] -- Watches --> D
        E -- Writes State --> F[Application etcd Cluster]
    end

    subgraph Backend Service
        G[gRPC Streaming Service] -- Watches --> F
    end

    subgraph Client
        H[Flutter App] -- gRPC Stream --> G
    end

    A --> B

步骤化实现:从 CRD 到实时UI

第一阶段:定义 CRD 并同步状态至 etcd

首先,我们需要定义一个 FeatureFlagCustomResourceDefinition (CRD),让它成为我们 GitOps 流程中的一等公民。

crd/featureflag.crd.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: featureflags.runtime.mycorp.com
spec:
  group: runtime.mycorp.com
  names:
    kind: FeatureFlag
    listKind: FeatureFlagList
    plural: featureflags
    singular: featureflag
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                appName:
                  type: string
                  description: "The name of the application this flag belongs to."
                flags:
                  type: object
                  description: "A map of feature flags."
                  additionalProperties:
                    type: boolean
              required:
                - appName
                - flags

接着,我们编写一个 Kubernetes Controller,它的唯一职责是 watch FeatureFlag 资源的变化,并将 spec.flags 的内容写入我们独立的 etcd 集群中。key 的格式设计为 /featureflags/{appName}/{flagName}

internal/controller/featureflag_controller.go:

package controller

import (
	"context"
	"fmt"
	"path"
	"time"

	"go.etcd.io/etcd/client/v3"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	runtimev1alpha1 "mycorp.com/featureflag-operator/api/v1alpha1"
)

// FeatureFlagReconciler reconciles a FeatureFlag object
type FeatureFlagReconciler struct {
	client.Client
	Scheme   *runtime.Scheme
	EtcdClient *clientv3.Client
}

const (
	etcdRequestTimeout = 5 * time.Second
	etcdKeyPrefix      = "/featureflags"
)

//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/finalizers,verbs=update

func (r *FeatureFlagReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling FeatureFlag")

	var featureFlag runtimev1alpha1.FeatureFlag
	if err := r.Get(ctx, req.NamespacedName, &featureFlag); err != nil {
		// 如果资源被删除,我们忽略 'not found' 错误,因为我们需要处理删除逻辑
		// 这里的简化处理是,当 CR 被删除时,我们不清理 etcd 中的 key,
		// 在生产环境中,应该通过 finalizer 来确保 etcd 中的数据被清理。
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	appName := featureFlag.Spec.AppName
	logger.Info("Syncing flags for application", "appName", appName)

	// 使用 etcd 事务来确保原子性更新
	// 这在真实项目中至关重要,可以防止部分更新导致的状态不一致
	ops := []clientv3.Op{}
	for flagName, flagValue := range featureFlag.Spec.Flags {
		key := path.Join(etcdKeyPrefix, appName, flagName)
		value := fmt.Sprintf("%t", flagValue)
		
		// clientv3.OpPut 会创建一个 KV 写操作
		ops = append(ops, clientv3.OpPut(key, value))
	}

	if len(ops) > 0 {
		etcdCtx, cancel := context.WithTimeout(ctx, etcdRequestTimeout)
		defer cancel()

		// 在一个事务中执行所有 Put 操作
		if _, err := r.EtcdClient.Txn(etcdCtx).Then(ops...).Commit(); err != nil {
			logger.Error(err, "Failed to commit transaction to etcd")
			// 返回错误,controller-runtime 会自动进行重试
			return ctrl.Result{}, err
		}
	}
	
	logger.Info("Successfully synced flags to etcd", "appName", appName, "flagCount", len(featureFlag.Spec.Flags))

	return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *FeatureFlagReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&runtimev1alpha1.FeatureFlag{}).
		Complete(r)
}

这个 Controller 的核心逻辑在 Reconcile 函数中。它获取 FeatureFlag 资源,然后构建一个 etcd 事务,将所有的开关状态原子性地写入 etcd。使用事务是生产级代码的关键,它避免了在更新多个开关时出现中间状态。

现在,开发者只需要在 Git 仓库中提交一个 FeatureFlag 的 YAML 文件,Flux CD 就会自动将其应用到集群,我们的 Controller 会监听到变化并更新 etcd。

apps/my-mobile-app/featureflags.yaml:

apiVersion: runtime.mycorp.com/v1alpha1
kind: FeatureFlag
metadata:
  name: my-mobile-app-flags
  namespace: mobile-apps
spec:
  appName: "my-mobile-app"
  flags:
    newOnboardingFlow: true
    enableChatV2: false
    useNewPaymentGateway: true

第二阶段:构建 gRPC 实时流服务

这是连接后端状态与前端 UI 的桥梁。我们首先用 Protobuf 定义服务接口。

proto/flags.proto:

syntax = "proto3";

package flags;

option go_package = "mycorp.com/flag-streamer/gen/flags";

service FlagStreamer {
  // WatchAppFlags 建立一个流式连接,用于接收指定 App 的功能开关变更
  // 客户端发起请求时,服务端会首先推送一次全量数据
  // 之后,任何变更都会被实时推送下来
  rpc WatchAppFlags(WatchRequest) returns (stream FlagUpdate);
}

message WatchRequest {
  string app_name = 1;
}

message FlagUpdate {
  // 事件类型:INITIAL, UPDATE, DELETE
  enum EventType {
    UNKNOWN = 0;
    INITIAL = 1; // 初始全量数据
    UPDATE = 2;  // 增量更新
    DELETE = 3;  // 删除(暂未在 controller 中实现)
  }

  EventType event_type = 1;
  // Key-value map of flags.
  // For INITIAL, this contains all flags.
  // For UPDATE, it contains only the changed flags.
  map<string, bool> flags = 2;
}

Go 服务端的实现是这个项目的核心。它需要处理 gRPC 连接,并为每个连接启动一个 goroutine 去 watch etcd。

cmd/streamer/main.go (部分实现):

package main

import (
	// ... imports
	"go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "mycorp.com/flag-streamer/gen/flags"
)

type server struct {
	pb.UnimplementedFlagStreamerServer
	etcdClient *clientv3.Client
}

const etcdKeyPrefix = "/featureflags"

func (s *server) WatchAppFlags(req *pb.WatchRequest, stream pb.FlagStreamer_WatchAppFlagsServer) error {
	appName := req.GetAppName()
	if appName == "" {
		return status.Error(codes.InvalidArgument, "appName is required")
	}

	ctx := stream.Context()
	logger := log.FromContext(ctx).WithValues("appName", appName)
	logger.Info("New client connected for watching flags")

	// 1. 发送初始全量数据
	// 这对于客户端冷启动至关重要,使其能立刻获得当前状态
	keyPrefix := path.Join(etcdKeyPrefix, appName)
	resp, err := s.etcdClient.Get(ctx, keyPrefix, clientv3.WithPrefix())
	if err != nil {
		logger.Error(err, "Failed to get initial flags from etcd")
		return status.Error(codes.Internal, "failed to fetch initial state")
	}

	initialFlags := make(map[string]bool)
	for _, kv := range resp.Kvs {
		flagName := path.Base(string(kv.Key))
		val, _ := strconv.ParseBool(string(kv.Value))
		initialFlags[flagName] = val
	}

	if err := stream.Send(&pb.FlagUpdate{
		EventType: pb.FlagUpdate_INITIAL,
		Flags:     initialFlags,
	}); err != nil {
		logger.Error(err, "Failed to send initial flags")
		return status.Error(codes.Internal, "failed to send initial state")
	}

	// 2. 启动 Watcher 监听后续变更
	// 我们从获取全量数据时的 etcd revision + 1 开始 watch,确保不会丢失任何变更
	watchChan := s.etcdClient.Watch(ctx, keyPrefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))

	logger.Info("Starting to watch for updates", "fromRevision", resp.Header.Revision+1)

	for {
		select {
		case <-ctx.Done():
			// 客户端断开连接
			logger.Info("Client disconnected")
			return ctx.Err()
		case watchResp, ok := <-watchChan:
			if !ok {
				logger.Info("Watch channel closed")
				return status.Error(codes.Internal, "etcd watch channel closed")
			}
			if err := watchResp.Err(); err != nil {
				logger.Error(err, "Etcd watch error")
				return status.Error(codes.Internal, "etcd watch failed")
			}

			// 批处理收到的事件,减少 gRPC 消息发送频率
			updatedFlags := make(map[string]bool)
			for _, event := range watchResp.Events {
				// 我们只关心 PUT 事件,因为 DELETE 逻辑暂未实现
				if event.Type == clientv3.EventTypePut {
					flagName := path.Base(string(event.Kv.Key))
					val, _ := strconv.ParseBool(string(event.Kv.Value))
					updatedFlags[flagName] = val
				}
			}

			if len(updatedFlags) > 0 {
				if err := stream.Send(&pb.FlagUpdate{
					EventType: pb.FlagUpdate_UPDATE,
					Flags:     updatedFlags,
				}); err != nil {
					logger.Error(err, "Failed to send update to client")
					// 发送失败通常意味着客户端已断开,循环将在下一次 select 时从 ctx.Done() 退出
					return err
				}
				logger.Info("Sent update to client", "updateCount", len(updatedFlags))
			}
		}
	}
}

// main function to setup and start gRPC server...

这段代码的健壮性体现在几个细节上:

  • 先Get后Watch:客户端连接后,先Get一次全量数据,然后再从Get操作的Revision之后开始Watch。这是一个保证数据无缝衔接的标准模式。
  • 上下文管理:整个Watch循环都受到stream.Context()的控制。一旦Flutter客户端断开连接,ctx.Done()会立刻被触发,goroutine 能优雅地退出,释放资源。
  • 错误处理:对 etcd 和 gRPC 的各种错误都进行了处理,并返回了合适的 gRPC 状态码。

第三阶段:Flutter 客户端集成

在 Flutter 端,我们需要使用 grpcprotobuf 包来与我们的服务进行通信。首先,通过 protoc 工具根据 .proto 文件生成 Dart 代码。

lib/services/flag_service.dart:

import 'package:grpc/grpc.dart';
import 'package:rxdart/rxdart.dart';

import '../generated/flags.pbgrpc.dart';

// 一个简单的单例服务来管理功能开关状态
class FlagService {
  FlagService._();
  static final instance = FlagService._();

  late FlagStreamerClient _client;
  // BehaviorSubject 会缓存最新的值,新订阅者能立刻收到当前状态
  final _flagsSubject = BehaviorSubject<Map<String, bool>>.seeded({});

  // 对外暴露一个只读的 Stream
  Stream<Map<String, bool>> get flags$ => _flagsSubject.stream;
  Map<String, bool> get currentFlags => _flagsSubject.value;

  ClientChannel? _channel;
  ResponseStream<FlagUpdate>? _stream;

  void connect(String appName) {
    // 避免重复连接
    if (_channel != null) {
      disconnect();
    }

    print('Connecting to FlagStreamer service for app: $appName...');
    _channel = ClientChannel(
      'your-grpc-service-host', // 替换为你的 gRPC 服务地址
      port: 50051,
      options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
    );
    _client = FlagStreamerClient(_channel!);

    _stream = _client.watchAppFlags(WatchRequest()..appName = appName);

    _stream!.listen(
      (FlagUpdate update) {
        print('Received flag update: ${update.eventType}');
        final current = Map<String, bool>.from(_flagsSubject.value);

        switch (update.eventType) {
          case FlagUpdate_EventType.INITIAL:
            // 收到全量数据,直接替换
            _flagsSubject.add(update.flags);
            break;
          case FlagUpdate_EventType.UPDATE:
            // 收到增量更新,合并到当前状态
            current.addAll(update.flags);
            _flagsSubject.add(current);
            break;
          default:
            // 忽略其他事件类型
            break;
        }
      },
      onError: (e) {
        print('Flag stream error: $e');
        // 在生产环境中,这里应该有重连逻辑
        disconnect();
      },
      onDone: () {
        print('Flag stream closed by server.');
        disconnect();
      },
    );
  }

  void disconnect() {
    print('Disconnecting from FlagStreamer service...');
    _stream?.cancel();
    _channel?.shutdown();
    _stream = null;
    _channel = null;
  }
}

在 UI 层,使用 StreamBuilder 来消费这个 Stream,可以非常优雅地实现界面的自动刷新。

lib/widgets/flag_display_widget.dart:

import 'package:flutter/material.dart';
import '../services/flag_service.dart';

class FlagDisplayWidget extends StatelessWidget {
  const FlagDisplayWidget({Key? key}) : super(key: key);

  
  Widget build(BuildContext context) {
    return StreamBuilder<Map<String, bool>>(
      stream: FlagService.instance.flags$,
      builder: (context, snapshot) {
        if (!snapshot.hasData || snapshot.data!.isEmpty) {
          return const Center(child: CircularProgressIndicator());
        }

        final flags = snapshot.data!;
        final flagItems = flags.entries.toList()
          ..sort((a, b) => a.key.compareTo(b.key));

        return ListView.builder(
          itemCount: flagItems.length,
          itemBuilder: (context, index) {
            final item = flagItems[index];
            return ListTile(
              title: Text(item.key),
              trailing: Switch(
                value: item.value,
                onChanged: null, // UI 是只读的
                activeColor: Colors.green,
              ),
            );
          },
        );
      },
    );
  }
}

当开发者在 main.dart 或应用的某个入口点调用 FlagService.instance.connect('my-mobile-app') 后,整个数据流就建立起来了。现在,任何从 Git 提交到 etcd 的状态变更,都会在毫秒级内反映到 Flutter UI 上,无需任何用户操作。

方案的局限性与未来迭代路径

这个架构虽然实现了核心目标,但在生产环境中仍有需要完善的地方。

首先,gRPC 服务目前是单点的。为了实现高可用,需要部署多个实例,并通过 gRPC 负载均衡(如 L4 网络负载均衡器或使用服务网格)将客户端连接分发。

其次,安全性方面有所欠缺。gRPC 通信应该启用 TLS (Transport Layer Security),并且需要一套认证授权机制(例如基于 JWT 的 token)来确保只有合法的客户端才能订阅特定应用的开关信息。

最后,随着功能开关数量的增长,初始全量同步的数据包可能会变得很大。未来的优化可以考虑更精细的订阅模型,例如客户端可以只订阅它关心的部分开关,或者在服务端实现更高效的压缩算法。etcd 本身对于 value 的大小也有限制,超大规模的配置可能需要将 value 拆分或存储在其他地方,etcd 只存索引。


  目录