我们团队维护一个复杂的内部运营平台,其前端架构在三年前就演进到了微前端(Micro-frontend)。最初,各个微应用间的通信依赖于 Custom Events 和一个共享的全局状态库,这种方案在系统规模尚小时尚可应付。但随着业务复杂度的指数级增长,问题开始集中爆发:跨应用的状态同步逻辑变得脆弱不堪,任何一个微应用的异常状态都可能污染全局,导致连锁崩溃。更致命的是,刷新页面后,所有临时状态全部丢失,用户需要重新执行一系列操作才能恢复工作上下文,而排查问题时,我们完全无法追溯用户的操作序列,因为它们只是浏览器内存中瞬时生灭的事件。
痛点已经明确:我们需要一种能够跨越页面生命周期、可追溯、且具备工业级解耦能力的通信机制。与其在前端领域修修补补,我们决定换一个思路——将后端成熟的事件驱动架构(EDA)思想引入前端,构建一个持久化的微前端事件总线。
初步构想与技术选型决策
构想的核心是将前端UI事件视为与后端业务事件同等重要的一等公民。每个有意义的用户操作(例如:触发一次构建、更新一项配置、启动一个任务)不再是简单的函数调用,而是被封装成一个事件对象,发送到一个中心化的、持久化的事件总线。其他微应用或后端服务可以订阅这些事件,并据此更新自身状态或执行相应逻辑。
这个构想的架构图如下:
graph TD subgraph Browser MFE_A[微应用 A] MFE_B[微应用 B] WS_Client[WebSocket Client] MFE_A -- publish event --> WS_Client MFE_B -- publish event --> WS_Client WS_Client -- push updates --> MFE_A WS_Client -- push updates --> MFE_B end subgraph Backend Infrastructure Gateway[WebSocket Gateway] MQ[ActiveMQ Broker] PersistenceSvc[事件持久化服务] StateSvc[状态物化服务] DB[(PostgreSQL)] end WS_Client <-->|STOMP over WebSocket| Gateway Gateway -- publish --> MQ PersistenceSvc -- subscribe --> MQ StateSvc -- subscribe --> MQ MQ -- push --> Gateway PersistenceSvc -- write event log --> DB StateSvc -- update materialized view --> DB
技术选型考量:
消息队列 (MQ) - ActiveMQ: 为什么选择 ActiveMQ 而不是更流行的 RabbitMQ 或 Kafka?在真实项目中,技术选型往往不只是看性能指标。我们的运维团队对 ActiveMQ Artemis 有着超过五年的生产环境维护经验,相关的监控、告警、灾备方案都非常成熟。此外,ActiveMQ 对 STOMP (Simple Text Oriented Messaging Protocol) 协议的良好支持,使其能非常方便地通过 WebSocket 与浏览器端直接集成。我们需要的是它的持久化主题(Durable Topics)和消息可靠性保证,确保前端发出的每一个“指令”都不会丢失。
ORM - Prisma: 事件溯源(Event Sourcing)是这个架构的自然选择。我们会记录所有事件(写模型),并根据事件流计算出当前系统的最新状态(读模型)。这意味着我们需要频繁地写入事件日志,并构建物化视图。直接编写 SQL 来处理这两种模型非常繁琐且容易出错。Prisma 以其强大的类型安全和简洁的 API,极大地简化了数据库操作,特别是对于 TypeScript 项目,它能提供从数据库 Schema 到应用代码的端到端类型校验。
前端构建工具 - Turbopack: 我们的微前端项目采用 Monorepo 结构管理。随着微应用数量增至两位数,传统的 Webpack 构建和本地开发服务器启动时间已经严重影响了开发效率。我们评估了 Vite 和 Turbopack,最终选择了 Turbopack。它基于 Rust 构建,其增量计算引擎在大型 Monorepo 项目中表现出的近乎瞬时的 HMR (Hot Module Replacement) 和极快的冷启动速度,是决定性的优势。这纯粹是一个为了提升开发者体验(DX)的选择。
步骤化实现:从后端到前端
1. 数据库模型定义 (Prisma Schema)
首先,我们需要定义事件日志和物化视图的模型。事件日志是不可变(Immutable)的,只增不改。物化视图则是可变的,它代表了系统的当前状态快照。
./packages/db/prisma/schema.prisma
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
generator client {
provider = "prisma-client-js"
output = "../src/generated/client"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// 记录所有前端发来的事件,这是系统的真相来源 (Source of Truth)
model MfeEventLog {
id String @id @default(cuid())
// 事件的唯一标识符,用于幂等性处理
eventId String @unique
// 事件类型,例如 'service.restart', 'config.update'
type String
// 事件负载,JSON格式
payload Json
// 事件来源的微应用标识
source String
// 用户标识
userId String
// 事件创建时间戳
createdAt DateTime @default(now())
@@index([type])
@@index([userId, createdAt])
}
// 物化视图:服务的当前状态,这是读模型
// 它由事件驱动更新,用于快速查询
model ServiceStatus {
id String @id
// 服务名称
name String
// 服务的当前状态: 'RUNNING', 'STOPPED', 'RESTARTING'
status String
// 配置的版本号
configVersion Int
// 最近一次操作的事件ID
lastEventId String
// 最后更新时间
updatedAt DateTime @updatedAt
@@index([status])
}
2. 后端核心服务:WebSocket 网关与事件处理器
我们使用 Node.js、Express 和 stomp-broker-js
库来构建 WebSocket 网关。这个网关是前端和 ActiveMQ 之间的桥梁。
./packages/gateway/src/server.ts
import http from 'http';
import express from 'express';
import { StompServer } from 'stomp-broker-js';
import WebSocket from 'ws';
import { logger } from './utils/logger';
const app = express();
const server = http.createServer(app);
// ActiveMQ STOMP 连接配置
// 在生产环境中,这些值应该来自环境变量
const stompConfig = {
host: 'activemq-broker.internal',
port: 61613,
login: 'admin',
passcode: 'secret_password',
// 启用重连机制
reconnect: {
retries: -1, // 无限重试
delay: 5000, // 5秒间隔
},
};
const stompServer = new StompServer({
server: new WebSocket.Server({ server }),
brokerURL: `stomp://${stompConfig.host}:${stompConfig.port}`,
wsHandler: (stompSocket, wsSocket, req) => {
logger.info({ ip: req.socket.remoteAddress }, 'New WebSocket client connected.');
// 这里可以添加认证逻辑,例如校验 req 中的 cookie 或 token
},
debug: (msg) => {
// 将 stomp-broker-js 的调试日志接入我们的日志系统
logger.trace(msg);
}
});
stompServer.on('error', (err) => {
logger.error({ error: err }, 'STOMP server error.');
});
stompServer.on('connected', () => {
logger.info('Gateway successfully connected to ActiveMQ broker.');
});
const PORT = process.env.PORT || 3001;
server.listen(PORT, () => {
logger.info(`WebSocket Gateway listening on port ${PORT}`);
});
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received. Closing connections.');
server.close(() => {
stompServer.close(() => {
logger.info('All connections closed. Exiting.');
process.exit(0);
});
});
});
这个网关本身不处理任何业务逻辑,它只负责透明地转发 STOMP 帧。
接下来是两个独立的微服务,它们订阅 ActiveMQ 的主题来处理业务。
./packages/persistence-service/src/index.ts
import { Client } from '@stomp/stompjs';
import { PrismaClient } from '@my-org/db';
import { logger } from './utils/logger';
import { z } from 'zod';
// 使用 Zod 定义事件的 schema,确保消息格式正确
const eventSchema = z.object({
eventId: z.string().uuid(),
type: z.string().min(1),
payload: z.record(z.unknown()),
source: z.string().min(1),
userId: z.string().min(1),
});
const prisma = new PrismaClient();
const client = new Client({
brokerURL: `ws://activemq-broker.internal:61613`, // 注意:这里用 ws 或 stomp 取决于broker配置
connectHeaders: {
login: 'admin',
passcode: 'secret_password',
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
client.onConnect = (frame) => {
logger.info('Persistence service connected to ActiveMQ.');
// 订阅通配符主题,接收所有微前端事件
client.subscribe('/topic/mfe.events.>', (message) => {
try {
const rawEvent = JSON.parse(message.body);
const parsedEvent = eventSchema.parse(rawEvent);
// 核心逻辑:将事件存入数据库
prisma.mfeEventLog.create({
data: {
eventId: parsedEvent.eventId,
type: parsedEvent.type,
payload: parsedEvent.payload,
source: parsedEvent.source,
userId: parsedEvent.userId,
}
}).then(() => {
logger.info({ eventId: parsedEvent.eventId, type: parsedEvent.type }, 'Event persisted successfully.');
}).catch(err => {
// 处理唯一键冲突,实现幂等性
if (err.code === 'P2002' && err.meta?.target?.includes('eventId')) {
logger.warn({ eventId: parsedEvent.eventId }, 'Duplicate event received. Ignoring.');
} else {
logger.error({ error: err, eventId: parsedEvent.eventId }, 'Failed to persist event.');
// 在真实项目中,这里应该将失败的消息推送到一个死信队列(DLQ)
}
});
} catch (error) {
logger.error({ error, body: message.body }, 'Failed to parse or process message.');
}
});
};
client.onStompError = (frame) => {
logger.error(`Broker reported error: ${frame.headers['message']}`);
logger.error(`Additional details: ${frame.body}`);
};
async function main() {
await prisma.$connect();
client.activate();
}
main().catch(e => {
logger.error(e);
process.exit(1);
});
这个服务是“写模型”的处理器,它唯一的职责就是把事件记录下来。
./packages/state-service/src/processor.ts
import { PrismaClient, ServiceStatus } from '@my-org/db';
import { logger } from './utils/logger';
const prisma = new PrismaClient();
// 事件处理逻辑的映射表
// 这里的关键是每个处理器都是一个纯函数:(state, event) => newState
const eventHandlers: { [key: string]: Function } = {
'service.restarted': async (event: any): Promise<void> => {
const { serviceId } = event.payload;
await prisma.serviceStatus.update({
where: { id: serviceId },
data: {
status: 'RUNNING',
lastEventId: event.eventId,
},
});
logger.info({ serviceId }, 'Service status updated to RUNNING.');
},
'service.restart.initiated': async (event: any): Promise<void> => {
const { serviceId, initiatedBy } = event.payload;
await prisma.serviceStatus.upsert({
where: { id: serviceId },
update: {
status: 'RESTARTING',
lastEventId: event.eventId,
},
create: {
id: serviceId,
name: `Service-${serviceId}`, // 实际项目中名称应来自事件或数据库
status: 'RESTARTING',
configVersion: 1,
lastEventId: event.eventId,
}
});
logger.info({ serviceId, initiatedBy }, 'Service status updated to RESTARTING.');
},
// ... 其他事件处理器
};
export async function processEvent(event: any): Promise<void> {
const handler = eventHandlers[event.type];
if (handler) {
try {
await handler(event);
} catch (error) {
logger.error({ error, eventId: event.eventId }, `Error processing event for state projection.`);
// 失败处理,例如记录错误或发送告警
}
} else {
logger.trace({ type: event.type }, 'No state handler for this event type.');
}
}
// StateService 的主入口会像 PersistenceService 一样订阅主题,
// 然后调用 processEvent 函数。
这个服务是“读模型”的构建器。它消费事件流,并更新物化视图,供前端快速查询。
3. 前端集成:Turbopack 配置与 React Hook
在 Monorepo 根目录下的 turbo.json
配置如下,Turbopack 会根据任务依赖关系并行执行构建和开发任务。
./turbo.json
{
"$schema": "https://turbo.build/schema.json",
"pipeline": {
"build": {
"dependsOn": ["^build"],
"outputs": ["dist/**"]
},
"lint": {},
"dev": {
"cache": false,
"persistent": true
}
}
}
接下来是前端的核心,一个可复用的 React Hook,用于封装与事件总线的交互。
./packages/ui-hooks/src/useEventBus.ts
import { useState, useEffect, useRef, useCallback } from 'react';
import { Client, IMessage, StompSubscription } from '@stomp/stompjs';
import { v4 as uuidv4 } from 'uuid';
// 注意:在真实项目中,URL应该是可配置的
const GATEWAY_URL = 'ws://localhost:3001';
interface EventBusState {
isConnected: boolean;
error: string | null;
}
interface PublishOptions {
source: string; // 微应用标识
userId: string;
}
export function useEventBus() {
const [state, setState] = useState<EventBusState>({ isConnected: false, error: null });
const clientRef = useRef<Client | null>(null);
const subscriptionsRef = useRef<Map<string, StompSubscription>>(new Map());
useEffect(() => {
// 初始化 STOMP client
const client = new Client({
brokerURL: GATEWAY_URL,
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
onConnect: () => {
setState({ isConnected: true, error: null });
console.log('EventBus connected.');
},
onDisconnect: () => {
setState({ isConnected: false, error: null });
console.log('EventBus disconnected.');
},
onStompError: (frame) => {
const errorMessage = `Broker error: ${frame.headers['message']} | Details: ${frame.body}`;
setState(s => ({ ...s, error: errorMessage }));
console.error(errorMessage);
},
});
client.activate();
clientRef.current = client;
return () => {
client.deactivate();
console.log('EventBus deactivated.');
};
}, []);
const publish = useCallback((type: string, payload: object, options: PublishOptions) => {
if (!clientRef.current || !state.isConnected) {
console.error('Cannot publish: EventBus is not connected.');
// 在生产环境中,可以考虑将事件放入一个队列,在重连后发送
return;
}
const event = {
eventId: uuidv4(),
type,
payload,
source: options.source,
userId: options.userId,
};
// 主题格式: mfe.events.<event_type>
const destination = `/topic/mfe.events.${type}`;
clientRef.current.publish({
destination,
body: JSON.stringify(event),
headers: { 'content-type': 'application/json', 'persistent': 'true' }
});
}, [state.isConnected]);
const subscribe = useCallback((topic: string, callback: (message: IMessage) => void) => {
if (!clientRef.current) {
console.error('Cannot subscribe: client is not initialized.');
return () => {};
}
// 如果已连接,则立即订阅
if (clientRef.current.connected) {
const sub = clientRef.current.subscribe(topic, callback);
subscriptionsRef.current.set(topic, sub);
} else {
// 如果未连接,则在连接成功后订阅
// 这里的实现需要对 client 的 onConnect 事件进行扩展
// 简单起见,我们假设调用 subscribe 时已经连接
console.warn('Subscribing while disconnected. Subscription will activate on connect.');
// A more robust solution would queue subscriptions.
}
// 返回一个取消订阅的函数
return () => {
const sub = subscriptionsRef.current.get(topic);
if (sub) {
sub.unsubscribe();
subscriptionsRef.current.delete(topic);
}
};
}, []);
return { ...state, publish, subscribe };
}
4. 微应用实战
假设我们有两个微应用:ControlPanel
用于执行操作,StatusDashboard
用于展示状态。
./apps/control-panel/src/components/ServiceController.tsx
import React from 'react';
import { useEventBus } from '@my-org/ui-hooks';
const CURRENT_USER_ID = 'user-123'; // 实际应从认证上下文中获取
const MFE_SOURCE = 'ControlPanel';
export function ServiceController({ serviceId }: { serviceId: string }) {
const { publish, isConnected } = useEventBus();
const handleRestart = () => {
if (!isConnected) {
alert('Connection to system bus lost. Please wait.');
return;
}
console.log(`Publishing restart event for ${serviceId}`);
publish(
'service.restart.initiated',
{ serviceId, initiatedBy: CURRENT_USER_ID },
{ source: MFE_SOURCE, userId: CURRENT_USER_ID }
);
};
return (
<div>
<h3>Service: {serviceId}</h3>
<button onClick={handleRestart} disabled={!isConnected}>
Restart Service
</button>
</div>
);
}
./apps/status-dashboard/src/components/ServiceStatusDisplay.tsx
import React, { useState, useEffect } from 'react';
import { useEventBus } from '@my-org/ui-hooks';
import { IMessage } from '@stomp/stompjs';
// 初始状态可以从 API 加载
const initialStatus = 'LOADING';
export function ServiceStatusDisplay({ serviceId }: { serviceId: string }) {
const [status, setStatus] = useState(initialStatus);
const { subscribe } = useEventBus();
useEffect(() => {
// 订阅与特定服务相关的状态更新
const topic = `/topic/state.updates.service.${serviceId}`;
// 假设后端 StateService 在物化视图更新后,会向这个更具体的主题发布更新消息
const unsubscribe = subscribe(topic, (message: IMessage) => {
try {
const update = JSON.parse(message.body);
setStatus(update.status);
} catch (e) {
console.error('Failed to parse status update', e);
}
});
// 组件卸载时取消订阅
return () => unsubscribe();
}, [serviceId, subscribe]);
return (
<div>
<span>Service {serviceId} Status:</span>
<strong>{status}</strong>
</div>
);
}
至此,我们完成了一个闭环。用户在 ControlPanel
点击按钮,事件通过 WebSocket 发送到 ActiveMQ,被 PersistenceService
和 StateService
处理,StateService
更新数据库后可能还会向另一个主题发布状态变更消息,最终被 StatusDashboard
接收并更新UI。整个过程是异步、解耦且持久化的。
方案的局限性与未来展望
此架构并非银弹。它引入的复杂度和延迟远高于传统的前端状态管理方案。对于需要毫秒级响应的UI交互(如拖拽、实时输入校验),这套机制完全不适用。它的价值在于处理那些代表重要业务状态变迁的、低频但高价值的用户操作。
当前实现中的 WebSocket 网关是一个潜在的单点故障和性能瓶颈。在生产环境中,它需要被设计成一个可水平扩展的集群,并使用负载均衡器分发连接。此外,事件 Schema 的管理至关重要,随着系统演进,需要引入类似 Schema Registry 的机制来确保向后兼容性,避免因事件格式变更导致的服务中断。
未来的一个优化方向是探索更细粒度的消息路由。例如,可以让前端直接订阅与自身相关的特定资源主题(如 /topic/state.updates/service/abc-123
),而不是订阅一个宽泛的主题然后在客户端进行过滤。这需要网关具备更智能的权限校验和主题映射能力,但可以显著降低不必要的网络流量和前端处理负载。