我们团队的移动应用开发流程中,功能开关(Feature Flag)的管理一直是个痛点。最初的方案是硬编码,发布周期长达数周。后来演进到后端 API 控制,但这引入了新的依赖和延迟。最新的方案是使用 Kubernetes 的 ConfigMap
,通过 CI/CD 流水线更新,再由应用后端读取。这套流程虽然实现了配置与代码分离,但对移动端开发者而言,整个过程仍然是一个黑盒,充满了延迟和不确定性。一个开关的变更请求,从提交 Git Commit 到最终在测试应用上生效,耗时可能从几分钟到半小时不等,并且状态完全不可见。
我们的目标很明确:为移动端团队提供一个近乎实时的、状态透明的功能开关管理平台。核心要求是,变更流程必须遵循 GitOps 原则,所有配置的变更源头都在 Git 仓库中;同时,需要有一个直观的界面,能实时反映线上开关的精确状态。这个界面不仅要服务于开发者,也要让产品经理能够一目了然。
初步构想与技术选型
这个系统的核心在于解决“实时性”和“GitOps”这两个看似矛盾的需求。GitOps 的流程是异步的,而用户界面需要同步的实时反馈。
GitOps引擎: 我们已经是 Flux CD 的重度用户,用它来管理整个 Kubernetes 集群的声明式状态。因此,继续使用 Flux CD 来管理功能开关的
Custom Resource
是最自然的选择。这保证了配置的唯一真相来源是 Git。状态存储与通知中心: 这是整个架构的关键。直接轮询 Kubernetes API Server 来获取
ConfigMap
或Custom Resource
的状态是不可行的,这会给 API Server 带来巨大压力,且实时性无法保证。我们需要一个能够主动“推送”变更的组件。- Redis Pub/Sub? 是一个备选方案,但它缺乏强大的持久化和一致性保证。我们不希望在订阅服务重启时丢失状态变更事件。
- etcd? 这非常诱人。作为 Kubernetes 自身的基石,etcd 提供了我们所需的一切:强一致性、事务性读写、以及一个极其强大的
Watch
API。通过Watch
机制,任何客户端都能实时订阅某个 key 前缀下的所有变更。我们可以为我们的平台搭建一个独立的 etcd 集群,用作功能开关的实时状态存储,从而与 Kubernetes 集群本身的 etcd 解耦,避免任何潜在的性能影响。
前端界面: 团队希望有一个跨平台的客户端,能在 iOS、Android 甚至 Web 上运行。Flutter 在这方面是最佳选择,一次开发,多端部署。
连接桥梁: 如何将 etcd 中的变更实时推送到 Flutter 客户端?HTTP Polling 被首先排除。WebSocket 是一个选项,但 gRPC Streaming 提供了更优的性能、更强的类型约束和更高效的二进制协议。我们可以构建一个 Go 服务,它作为 gRPC 服务端,一方面使用 etcd 的
Watch
API 监控状态,另一方面将收到的变更通过 gRPC stream 推送给 Flutter 客户端。
最终的架构图如下:
graph TD subgraph Git Repository A[Developer Commits FeatureFlag YAML] end subgraph Kubernetes Cluster B[Flux CD] --> C{Reconciles CR} C --> D[FeatureFlag CR] E[Custom Controller] -- Watches --> D E -- Writes State --> F[Application etcd Cluster] end subgraph Backend Service G[gRPC Streaming Service] -- Watches --> F end subgraph Client H[Flutter App] -- gRPC Stream --> G end A --> B
步骤化实现:从 CRD 到实时UI
第一阶段:定义 CRD 并同步状态至 etcd
首先,我们需要定义一个 FeatureFlag
的 CustomResourceDefinition
(CRD),让它成为我们 GitOps 流程中的一等公民。
crd/featureflag.crd.yaml
:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: featureflags.runtime.mycorp.com
spec:
group: runtime.mycorp.com
names:
kind: FeatureFlag
listKind: FeatureFlagList
plural: featureflags
singular: featureflag
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
appName:
type: string
description: "The name of the application this flag belongs to."
flags:
type: object
description: "A map of feature flags."
additionalProperties:
type: boolean
required:
- appName
- flags
接着,我们编写一个 Kubernetes Controller,它的唯一职责是 watch FeatureFlag
资源的变化,并将 spec.flags
的内容写入我们独立的 etcd 集群中。key 的格式设计为 /featureflags/{appName}/{flagName}
。
internal/controller/featureflag_controller.go
:
package controller
import (
"context"
"fmt"
"path"
"time"
"go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
runtimev1alpha1 "mycorp.com/featureflag-operator/api/v1alpha1"
)
// FeatureFlagReconciler reconciles a FeatureFlag object
type FeatureFlagReconciler struct {
client.Client
Scheme *runtime.Scheme
EtcdClient *clientv3.Client
}
const (
etcdRequestTimeout = 5 * time.Second
etcdKeyPrefix = "/featureflags"
)
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/finalizers,verbs=update
func (r *FeatureFlagReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling FeatureFlag")
var featureFlag runtimev1alpha1.FeatureFlag
if err := r.Get(ctx, req.NamespacedName, &featureFlag); err != nil {
// 如果资源被删除,我们忽略 'not found' 错误,因为我们需要处理删除逻辑
// 这里的简化处理是,当 CR 被删除时,我们不清理 etcd 中的 key,
// 在生产环境中,应该通过 finalizer 来确保 etcd 中的数据被清理。
return ctrl.Result{}, client.IgnoreNotFound(err)
}
appName := featureFlag.Spec.AppName
logger.Info("Syncing flags for application", "appName", appName)
// 使用 etcd 事务来确保原子性更新
// 这在真实项目中至关重要,可以防止部分更新导致的状态不一致
ops := []clientv3.Op{}
for flagName, flagValue := range featureFlag.Spec.Flags {
key := path.Join(etcdKeyPrefix, appName, flagName)
value := fmt.Sprintf("%t", flagValue)
// clientv3.OpPut 会创建一个 KV 写操作
ops = append(ops, clientv3.OpPut(key, value))
}
if len(ops) > 0 {
etcdCtx, cancel := context.WithTimeout(ctx, etcdRequestTimeout)
defer cancel()
// 在一个事务中执行所有 Put 操作
if _, err := r.EtcdClient.Txn(etcdCtx).Then(ops...).Commit(); err != nil {
logger.Error(err, "Failed to commit transaction to etcd")
// 返回错误,controller-runtime 会自动进行重试
return ctrl.Result{}, err
}
}
logger.Info("Successfully synced flags to etcd", "appName", appName, "flagCount", len(featureFlag.Spec.Flags))
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *FeatureFlagReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&runtimev1alpha1.FeatureFlag{}).
Complete(r)
}
这个 Controller 的核心逻辑在 Reconcile
函数中。它获取 FeatureFlag
资源,然后构建一个 etcd 事务,将所有的开关状态原子性地写入 etcd。使用事务是生产级代码的关键,它避免了在更新多个开关时出现中间状态。
现在,开发者只需要在 Git 仓库中提交一个 FeatureFlag
的 YAML 文件,Flux CD 就会自动将其应用到集群,我们的 Controller 会监听到变化并更新 etcd。
apps/my-mobile-app/featureflags.yaml
:
apiVersion: runtime.mycorp.com/v1alpha1
kind: FeatureFlag
metadata:
name: my-mobile-app-flags
namespace: mobile-apps
spec:
appName: "my-mobile-app"
flags:
newOnboardingFlow: true
enableChatV2: false
useNewPaymentGateway: true
第二阶段:构建 gRPC 实时流服务
这是连接后端状态与前端 UI 的桥梁。我们首先用 Protobuf 定义服务接口。
proto/flags.proto
:
syntax = "proto3";
package flags;
option go_package = "mycorp.com/flag-streamer/gen/flags";
service FlagStreamer {
// WatchAppFlags 建立一个流式连接,用于接收指定 App 的功能开关变更
// 客户端发起请求时,服务端会首先推送一次全量数据
// 之后,任何变更都会被实时推送下来
rpc WatchAppFlags(WatchRequest) returns (stream FlagUpdate);
}
message WatchRequest {
string app_name = 1;
}
message FlagUpdate {
// 事件类型:INITIAL, UPDATE, DELETE
enum EventType {
UNKNOWN = 0;
INITIAL = 1; // 初始全量数据
UPDATE = 2; // 增量更新
DELETE = 3; // 删除(暂未在 controller 中实现)
}
EventType event_type = 1;
// Key-value map of flags.
// For INITIAL, this contains all flags.
// For UPDATE, it contains only the changed flags.
map<string, bool> flags = 2;
}
Go 服务端的实现是这个项目的核心。它需要处理 gRPC 连接,并为每个连接启动一个 goroutine 去 watch etcd。
cmd/streamer/main.go
(部分实现):
package main
import (
// ... imports
"go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "mycorp.com/flag-streamer/gen/flags"
)
type server struct {
pb.UnimplementedFlagStreamerServer
etcdClient *clientv3.Client
}
const etcdKeyPrefix = "/featureflags"
func (s *server) WatchAppFlags(req *pb.WatchRequest, stream pb.FlagStreamer_WatchAppFlagsServer) error {
appName := req.GetAppName()
if appName == "" {
return status.Error(codes.InvalidArgument, "appName is required")
}
ctx := stream.Context()
logger := log.FromContext(ctx).WithValues("appName", appName)
logger.Info("New client connected for watching flags")
// 1. 发送初始全量数据
// 这对于客户端冷启动至关重要,使其能立刻获得当前状态
keyPrefix := path.Join(etcdKeyPrefix, appName)
resp, err := s.etcdClient.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
logger.Error(err, "Failed to get initial flags from etcd")
return status.Error(codes.Internal, "failed to fetch initial state")
}
initialFlags := make(map[string]bool)
for _, kv := range resp.Kvs {
flagName := path.Base(string(kv.Key))
val, _ := strconv.ParseBool(string(kv.Value))
initialFlags[flagName] = val
}
if err := stream.Send(&pb.FlagUpdate{
EventType: pb.FlagUpdate_INITIAL,
Flags: initialFlags,
}); err != nil {
logger.Error(err, "Failed to send initial flags")
return status.Error(codes.Internal, "failed to send initial state")
}
// 2. 启动 Watcher 监听后续变更
// 我们从获取全量数据时的 etcd revision + 1 开始 watch,确保不会丢失任何变更
watchChan := s.etcdClient.Watch(ctx, keyPrefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))
logger.Info("Starting to watch for updates", "fromRevision", resp.Header.Revision+1)
for {
select {
case <-ctx.Done():
// 客户端断开连接
logger.Info("Client disconnected")
return ctx.Err()
case watchResp, ok := <-watchChan:
if !ok {
logger.Info("Watch channel closed")
return status.Error(codes.Internal, "etcd watch channel closed")
}
if err := watchResp.Err(); err != nil {
logger.Error(err, "Etcd watch error")
return status.Error(codes.Internal, "etcd watch failed")
}
// 批处理收到的事件,减少 gRPC 消息发送频率
updatedFlags := make(map[string]bool)
for _, event := range watchResp.Events {
// 我们只关心 PUT 事件,因为 DELETE 逻辑暂未实现
if event.Type == clientv3.EventTypePut {
flagName := path.Base(string(event.Kv.Key))
val, _ := strconv.ParseBool(string(event.Kv.Value))
updatedFlags[flagName] = val
}
}
if len(updatedFlags) > 0 {
if err := stream.Send(&pb.FlagUpdate{
EventType: pb.FlagUpdate_UPDATE,
Flags: updatedFlags,
}); err != nil {
logger.Error(err, "Failed to send update to client")
// 发送失败通常意味着客户端已断开,循环将在下一次 select 时从 ctx.Done() 退出
return err
}
logger.Info("Sent update to client", "updateCount", len(updatedFlags))
}
}
}
}
// main function to setup and start gRPC server...
这段代码的健壮性体现在几个细节上:
- 先Get后Watch:客户端连接后,先
Get
一次全量数据,然后再从Get
操作的Revision
之后开始Watch
。这是一个保证数据无缝衔接的标准模式。 - 上下文管理:整个
Watch
循环都受到stream.Context()
的控制。一旦Flutter客户端断开连接,ctx.Done()
会立刻被触发,goroutine 能优雅地退出,释放资源。 - 错误处理:对 etcd 和 gRPC 的各种错误都进行了处理,并返回了合适的 gRPC 状态码。
第三阶段:Flutter 客户端集成
在 Flutter 端,我们需要使用 grpc
和 protobuf
包来与我们的服务进行通信。首先,通过 protoc
工具根据 .proto
文件生成 Dart 代码。
lib/services/flag_service.dart
:
import 'package:grpc/grpc.dart';
import 'package:rxdart/rxdart.dart';
import '../generated/flags.pbgrpc.dart';
// 一个简单的单例服务来管理功能开关状态
class FlagService {
FlagService._();
static final instance = FlagService._();
late FlagStreamerClient _client;
// BehaviorSubject 会缓存最新的值,新订阅者能立刻收到当前状态
final _flagsSubject = BehaviorSubject<Map<String, bool>>.seeded({});
// 对外暴露一个只读的 Stream
Stream<Map<String, bool>> get flags$ => _flagsSubject.stream;
Map<String, bool> get currentFlags => _flagsSubject.value;
ClientChannel? _channel;
ResponseStream<FlagUpdate>? _stream;
void connect(String appName) {
// 避免重复连接
if (_channel != null) {
disconnect();
}
print('Connecting to FlagStreamer service for app: $appName...');
_channel = ClientChannel(
'your-grpc-service-host', // 替换为你的 gRPC 服务地址
port: 50051,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
);
_client = FlagStreamerClient(_channel!);
_stream = _client.watchAppFlags(WatchRequest()..appName = appName);
_stream!.listen(
(FlagUpdate update) {
print('Received flag update: ${update.eventType}');
final current = Map<String, bool>.from(_flagsSubject.value);
switch (update.eventType) {
case FlagUpdate_EventType.INITIAL:
// 收到全量数据,直接替换
_flagsSubject.add(update.flags);
break;
case FlagUpdate_EventType.UPDATE:
// 收到增量更新,合并到当前状态
current.addAll(update.flags);
_flagsSubject.add(current);
break;
default:
// 忽略其他事件类型
break;
}
},
onError: (e) {
print('Flag stream error: $e');
// 在生产环境中,这里应该有重连逻辑
disconnect();
},
onDone: () {
print('Flag stream closed by server.');
disconnect();
},
);
}
void disconnect() {
print('Disconnecting from FlagStreamer service...');
_stream?.cancel();
_channel?.shutdown();
_stream = null;
_channel = null;
}
}
在 UI 层,使用 StreamBuilder
来消费这个 Stream
,可以非常优雅地实现界面的自动刷新。
lib/widgets/flag_display_widget.dart
:
import 'package:flutter/material.dart';
import '../services/flag_service.dart';
class FlagDisplayWidget extends StatelessWidget {
const FlagDisplayWidget({Key? key}) : super(key: key);
Widget build(BuildContext context) {
return StreamBuilder<Map<String, bool>>(
stream: FlagService.instance.flags$,
builder: (context, snapshot) {
if (!snapshot.hasData || snapshot.data!.isEmpty) {
return const Center(child: CircularProgressIndicator());
}
final flags = snapshot.data!;
final flagItems = flags.entries.toList()
..sort((a, b) => a.key.compareTo(b.key));
return ListView.builder(
itemCount: flagItems.length,
itemBuilder: (context, index) {
final item = flagItems[index];
return ListTile(
title: Text(item.key),
trailing: Switch(
value: item.value,
onChanged: null, // UI 是只读的
activeColor: Colors.green,
),
);
},
);
},
);
}
}
当开发者在 main.dart
或应用的某个入口点调用 FlagService.instance.connect('my-mobile-app')
后,整个数据流就建立起来了。现在,任何从 Git 提交到 etcd 的状态变更,都会在毫秒级内反映到 Flutter UI 上,无需任何用户操作。
方案的局限性与未来迭代路径
这个架构虽然实现了核心目标,但在生产环境中仍有需要完善的地方。
首先,gRPC 服务目前是单点的。为了实现高可用,需要部署多个实例,并通过 gRPC 负载均衡(如 L4 网络负载均衡器或使用服务网格)将客户端连接分发。
其次,安全性方面有所欠缺。gRPC 通信应该启用 TLS (Transport Layer Security),并且需要一套认证授权机制(例如基于 JWT 的 token)来确保只有合法的客户端才能订阅特定应用的开关信息。
最后,随着功能开关数量的增长,初始全量同步的数据包可能会变得很大。未来的优化可以考虑更精细的订阅模型,例如客户端可以只订阅它关心的部分开关,或者在服务端实现更高效的压缩算法。etcd 本身对于 value 的大小也有限制,超大规模的配置可能需要将 value 拆分或存储在其他地方,etcd 只存索引。