我们面临的挑战是构建一个大型、多团队协作的在线诊断平台。平台由多个独立的微前端(如视频通话模块、实时遥测图表模块、共享白板模块)组成,它们需要共享并同步一个统一的实时会话状态。这个状态不仅包含谁在线,还包括每个用户的光标位置、正在交互的组件ID,以及WebRTC通话的信令数据。系统的核心要求是低延迟、高可扩展性,并保证微前端之间的技术隔离。
方案权衡:中心化WebSocket vs. 去中心化WebRTC DataChannel
一个直接的方案是使用一个中心化的WebSocket服务器,结合一个全局的Redux或Vuex store。所有微前端通过一个事件总线与这个全局store通信,WebSocket服务器负责广播状态变化。
方案A: 中心化WebSocket + 全局Store
- 优势:
- 实现直观,业务逻辑集中。
- 状态变更的来源单一,易于调试。
- 劣势:
- 强耦合: 所有微前端都必须依赖同一个全局store的结构,任何一个模块对store的修改都可能影响其他模块,违背了微前端的独立性原则。
- 性能瓶颈: 所有状态更新,无论大小,都经过中心服务器转发。在高频交互场景(如光标移动),中心服务器和客户端的单一WebSocket连接会成为瓶颈。
- 扩展性差: 服务端需要维护所有房间的所有状态,随着用户和房间数增加,内存和CPU压力剧增。
方案B: WebRTC DataChannel + NoSQL信令 + 共享MobX实例
这个方案将状态同步的职责进行拆分。
- WebRTC DataChannel: 用于客户端之间高频、低延迟的状态同步,例如光标位置、绘图操作。数据直接在对等端之间传输,不经过中心服务器。
- NoSQL (Redis) + WebSocket信令服务器: 用于处理WebRTC的连接建立(SDP、ICE Candidate交换)和低频的全局状态同步(用户加入/离开房间)。Redis的Pub/Sub能力天然适合做信令广播,并且性能极高。
- 共享的MobX实例: 在微前端的宿主应用(Shell)中创建一个MobX store实例,并通过props或依赖注入的方式传递给每个微前端。这个store负责管理会话的“元数据”(如房间成员列表),并提供响应式API。微前端内部的状态变化可以直接通过DataChannel广播,接收到的对端数据再更新到这个共享的MobX store中,从而驱动UI更新。
决策:
对于一个要求长期演进和团队隔离的大型平台,方案B尽管初始复杂度更高,但其解耦能力、性能优势和可扩展性远超方案A。在真实项目中,长期的可维护性往往比初期的开发速度更重要。我们选择方案B。
架构设计与核心实现
我们设计的架构包含四个核心部分:宿主应用(Shell)、信令服务器、共享状态模块,以及各个业务微前端。
graph TD subgraph Browser Client Shell(宿主应用) --> SharedMobX(共享MobX Store) Shell --> MF_Video(视频微前端) Shell --> MF_Board(白板微前端) MF_Video -- 使用/更新 --> SharedMobX MF_Board -- 使用/更新 --> SharedMobX MF_Video -- WebRTC --> MF_Video_Peer(其他客户端的视频微前端) MF_Board -- WebRTC --> MF_Board_Peer(其他客户端的白板微前端) subgraph "WebRTC & Signaling Logic" MF_Video -- WebSocket --> SignalingServer MF_Board -- WebSocket --> SignalingServer end end subgraph Backend SignalingServer(Node.js 信令服务器) Redis(Redis键值存储/PubSub) SignalingServer <--> Redis end style Shell fill:#f9f,stroke:#333,stroke-width:2px style MF_Video fill:#ccf,stroke:#333,stroke-width:2px style MF_Board fill:#ccf,stroke:#333,stroke-width:2px style SharedMobX fill:#9cf,stroke:#333,stroke-width:2px
1. 信令服务器 (Node.js + Redis)
信令服务器不处理任何业务逻辑,它的唯一职责是转发WebRTC信令消息和管理房间成员关系。使用Redis的Pub/Sub模型,可以轻松实现服务器的水平扩展。
signaling-server.js
import { WebSocketServer } from 'ws';
import { createClient } from 'redis';
import { v4 as uuidv4 } from 'uuid';
const PORT = process.env.PORT || 8080;
const wss = new WebSocketServer({ port: PORT });
// 使用两个Redis客户端,一个用于发布,一个用于订阅
// 这是Redis Pub/Sub的推荐实践,因为订阅连接会进入阻塞模式
const publisher = createClient({ url: 'redis://localhost:6379' });
const subscriber = publisher.duplicate();
await publisher.connect();
await subscriber.connect();
// 存储WebSocket连接与用户ID的映射
const clients = new Map();
wss.on('connection', (ws) => {
const userId = uuidv4();
clients.set(userId, ws);
console.log(`Client connected: ${userId}`);
ws.on('message', async (message) => {
try {
const data = JSON.parse(message);
const { type, roomId, payload } = data;
// 核心逻辑:将消息发布到对应房间的Redis channel
// 这样所有订阅了该channel的信令服务器实例都能收到消息
// 并转发给其下的客户端
if (roomId) {
const messageToPublish = JSON.stringify({
senderId: userId,
type,
payload
});
await publisher.publish(roomId, messageToPublish);
}
} catch (error) {
console.error(`Failed to process message: ${message}`, error);
}
});
ws.on('close', () => {
// 当用户断开连接时,也需要通知房间内的其他人
// 实际生产中,需要维护一个userId到roomId的映射
// 这里为了简化,省略了这部分逻辑
clients.delete(userId);
console.log(`Client disconnected: ${userId}`);
// 可以在这里发布一个 'user-left' 事件到相关的房间channel
});
ws.on('error', (err) => {
console.error(`WebSocket error for user ${userId}:`, err);
});
});
// 订阅逻辑:处理来自Redis的消息并转发给客户端
async function setupSubscription() {
// 这里的订阅模式可以更精细,例如使用 psubscribe('room:*')
// 但为了演示,我们假设客户端在加入房间时会发送一个'join'消息
// 服务器根据这个消息来决定订阅哪个channel
// 下面的代码是一个简化的广播模型
// 实际项目中,不能直接订阅所有消息。
// 服务器应该动态维护一个 `Map<roomId, Set<WebSocket>>` 的映射
// 当收到Redis消息时,根据roomId找到对应的WebSocket连接集合并转发
// 这里为了演示Pub/Sub机制,我们做一个伪实现
// 伪代码: 当客户端发送 `{"type": "join", "roomId": "room-123"}`
// server.joinRoom(userId, roomId);
// 在 joinRoom 内部,如果这个roomId是首次被此实例处理,则 subscriber.subscribe(roomId, listener);
// 监听所有消息的示例 (不推荐用于生产)
// subscriber.subscribe('__all__', (message, channel) => { ... });
console.log(`Signaling server listening on port ${PORT}`);
}
// 模拟一个房间的订阅
// 在真实应用中,这个订阅应该在有客户端加入该房间时才动态创建
const exampleRoomId = 'diagnostics-room-1';
subscriber.subscribe(exampleRoomId, (message) => {
try {
const { senderId, type, payload } = JSON.parse(message);
console.log(`Received from Redis on channel ${exampleRoomId}:`, { senderId, type });
// 将消息广播给所有连接到此服务器实例且在该房间的客户端
// 这里需要一个从roomId到clients的映射,再次简化
clients.forEach((ws, userId) => {
if (userId !== senderId && ws.readyState === ws.OPEN) {
// 不把消息发回给发送者
ws.send(JSON.stringify({ type, payload, senderId }));
}
});
} catch(err) {
console.error('Error processing redis message', err);
}
});
setupSubscription().catch(console.error);
// 这是一个简化的模型。生产级服务器需要处理:
// 1. 更健壮的房间管理,比如用Redis Hash存储房间成员。
// 2. 心跳检测,清理死连接。
// 3. 错误处理和重连逻辑。
// 4. 认证与授权。
2. 共享状态模块 (MobX)
这个模块定义了跨微前端共享的状态结构。它会被宿主应用实例化。
shared-state/sessionStore.js
import { makeAutoObservable, observable, action, computed } from 'mobx';
class PeerState {
id = null;
connection = null; // RTCPeerConnection instance
dataChannel = null; // RTCDataChannel instance
connectionState = 'new'; // 'new', 'connecting', 'connected', 'disconnected', 'failed', 'closed'
mediaStream = null;
constructor(id) {
makeAutoObservable(this);
this.id = id;
}
@action setConnection(connection) {
this.connection = connection;
}
@action setDataChannel(dataChannel) {
this.dataChannel = dataChannel;
}
@action setConnectionState(state) {
console.log(`Peer ${this.id} connection state changed to: ${state}`);
this.connectionState = state;
}
@action setMediaStream(stream) {
this.mediaStream = stream;
}
}
export class SessionStore {
roomId = null;
userId = null;
peers = observable.map(); // Map<userId, PeerState>
isSignalingConnected = false;
// 业务相关的共享状态
cursorPositions = observable.map(); // Map<userId, {x: number, y: number}>
constructor() {
// makeAutoObservable 会自动将属性变为 observable, 方法变为 action, getter 变为 computed
makeAutoObservable(this);
}
@action setSessionInfo(roomId, userId) {
this.roomId = roomId;
this.userId = userId;
}
@action setSignalingConnected(isConnected) {
this.isSignalingConnected = isConnected;
}
@action addPeer(userId, connection) {
if (!this.peers.has(userId)) {
const peerState = new PeerState(userId);
peerState.setConnection(connection);
this.peers.set(userId, peerState);
return peerState;
}
return this.peers.get(userId);
}
@action removePeer(userId) {
const peer = this.peers.get(userId);
if (peer) {
peer.connection?.close();
this.peers.delete(userId);
this.cursorPositions.delete(userId);
}
}
getPeer(userId) {
return this.peers.get(userId);
}
@action updateCursorPosition(userId, position) {
// 这是一个业务状态更新的例子
this.cursorPositions.set(userId, position);
}
@computed get connectedPeers() {
return Array.from(this.peers.values()).filter(p => p.connectionState === 'connected');
}
@action broadcastToAllPeers(message) {
// 通过DataChannel广播消息
const messageString = JSON.stringify(message);
for (const peer of this.connectedPeers) {
if (peer.dataChannel && peer.dataChannel.readyState === 'open') {
try {
peer.dataChannel.send(messageString);
} catch(error) {
console.error(`Failed to send message to peer ${peer.id}:`, error);
}
}
}
}
}
3. WebRTC封装与微前端集成
创建一个WebRTC控制器,它将与信令服务器和MobX store交互。
webrtc-controller/WebRTCController.js
import { runInAction } from 'mobx';
const ICE_SERVERS = [
{ urls: 'stun:stun.l.google.com:19302' },
// 在生产环境中,必须配置TURN服务器以应对NAT穿透失败
// {
// urls: 'turn:your.turn.server:3478',
// username: 'user',
// credential: 'password'
// }
];
export class WebRTCController {
constructor(sessionStore, signalingSocket) {
this.store = sessionStore;
this.socket = signalingSocket;
this.localStream = null;
// 绑定this上下文,确保在事件回调中this指向正确
this.handleSignalingMessage = this.handleSignalingMessage.bind(this);
}
async initialize(localStream) {
this.localStream = localStream;
this.socket.addEventListener('message', this.handleSignalingMessage);
// 加入房间
this.sendMessage('join-room', { roomId: this.store.roomId });
}
sendMessage(type, payload) {
if (this.socket.readyState !== this.socket.OPEN) {
console.error('Signaling socket is not open.');
return;
}
this.socket.send(JSON.stringify({
type,
roomId: this.store.roomId,
payload
}));
}
async handleSignalingMessage(event) {
try {
const { type, payload, senderId } = JSON.parse(event.data);
if (senderId === this.store.userId) return; // 忽略自己发出的消息
switch (type) {
// 当一个新用户加入时,由房间内的“老”用户发起连接
case 'user-joined':
await this.createAndSendOffer(senderId);
break;
case 'offer':
await this.handleOffer(senderId, payload.sdp);
break;
case 'answer':
await this.handleAnswer(senderId, payload.sdp);
break;
case 'ice-candidate':
await this.handleNewICECandidate(senderId, payload.candidate);
break;
case 'user-left':
this.store.removePeer(senderId);
break;
default:
console.warn(`Unknown signaling message type: ${type}`);
}
} catch (error) {
console.error('Error handling signaling message:', error);
}
}
async createPeerConnection(peerId) {
const pc = new RTCPeerConnection({ iceServers: ICE_SERVERS });
const peerState = this.store.addPeer(peerId, pc);
pc.onicecandidate = (event) => {
if (event.candidate) {
this.sendMessage('ice-candidate', { to: peerId, candidate: event.candidate });
}
};
pc.onconnectionstatechange = () => {
runInAction(() => {
peerState.setConnectionState(pc.connectionState);
});
if (pc.connectionState === 'failed' || pc.connectionState === 'disconnected') {
// 实现重连逻辑
console.warn(`Connection to ${peerId} failed/disconnected. Consider ICE restart.`);
}
};
pc.ondatachannel = (event) => {
const dataChannel = event.channel;
console.log(`Data channel received from ${peerId}`);
this.setupDataChannel(peerId, dataChannel);
};
pc.ontrack = (event) => {
console.log(`Track received from ${peerId}`);
if(event.streams && event.streams[0]) {
runInAction(() => {
peerState.setMediaStream(event.streams[0]);
});
}
};
if (this.localStream) {
this.localStream.getTracks().forEach(track => {
pc.addTrack(track, this.localStream);
});
}
return pc;
}
setupDataChannel(peerId, dataChannel) {
const peerState = this.store.getPeer(peerId);
if (!peerState) return;
runInAction(() => {
peerState.setDataChannel(dataChannel);
});
dataChannel.onopen = () => console.log(`Data channel to ${peerId} is open.`);
dataChannel.onclose = () => console.log(`Data channel to ${peerId} is closed.`);
dataChannel.onerror = (error) => console.error(`Data channel error with ${peerId}:`, error);
dataChannel.onmessage = (event) => {
const message = JSON.parse(event.data);
// 这里是处理对等端发来的业务状态的地方
// 例如:光标移动
if (message.type === 'cursor-move') {
this.store.updateCursorPosition(peerId, message.payload);
}
};
}
async createAndSendOffer(peerId) {
const pc = await this.createPeerConnection(peerId);
// 为发起方创建Data Channel
const dataChannel = pc.createDataChannel('main-channel');
this.setupDataChannel(peerId, dataChannel);
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
this.sendMessage('offer', { to: peerId, sdp: pc.localDescription });
}
async handleOffer(peerId, sdp) {
const pc = await this.createPeerConnection(peerId);
await pc.setRemoteDescription(new RTCSessionDescription({ type: 'offer', sdp }));
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
this.sendMessage('answer', { to: peerId, sdp: pc.localDescription });
}
async handleAnswer(peerId, sdp) {
const peer = this.store.getPeer(peerId);
if (peer && peer.connection) {
await peer.connection.setRemoteDescription(new RTCSessionDescription({ type: 'answer', sdp }));
}
}
async handleNewICECandidate(peerId, candidate) {
const peer = this.store.getPeer(peerId);
if (peer && peer.connection) {
try {
await peer.connection.addIceCandidate(new RTCIceCandidate(candidate));
} catch (e) {
console.error('Error adding received ice candidate', e);
}
}
}
destroy() {
this.socket.removeEventListener('message', this.handleSignalingMessage);
this.store.peers.forEach(peer => peer.connection?.close());
this.localStream?.getTracks().forEach(track => track.stop());
}
}
4. 微前端实现 (React + MobX)
这是一个视频微前端的例子,它使用WebRTCController
并响应SessionStore
的变化。
VideoMicrofrontend.jsx
import React, { useEffect, useRef, useState } from 'react';
import { observer } from 'mobx-react-lite';
import { SessionStore } from '../shared-state/sessionStore';
import { WebRTCController } from '../webrtc-controller/WebRTCController';
// 假设 sessionStore 和 signalingSocket 由宿主应用通过props传入
const VideoMicrofrontend = observer(({ sessionStore, signalingSocket }) => {
const [controller, setController] = useState(null);
const localVideoRef = useRef(null);
const peers = Array.from(sessionStore.peers.values());
useEffect(() => {
let webrtcController;
const init = async () => {
try {
const stream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true });
if (localVideoRef.current) {
localVideoRef.current.srcObject = stream;
}
webrtcController = new WebRTCController(sessionStore, signalingSocket);
webrtcController.initialize(stream);
setController(webrtcController);
} catch (err) {
console.error("Failed to get local media stream", err);
// 应该在这里向用户显示错误信息
}
};
if (signalingSocket && signalingSocket.readyState === WebSocket.OPEN) {
init();
}
return () => {
// 组件卸载时清理资源
webrtcController?.destroy();
};
}, [sessionStore, signalingSocket]);
// 这是一个业务逻辑的例子:广播光标位置
useEffect(() => {
const handleMouseMove = (event) => {
const payload = { x: event.clientX, y: event.clientY };
sessionStore.updateCursorPosition(sessionStore.userId, payload);
sessionStore.broadcastToAllPeers({
type: 'cursor-move',
payload
});
};
window.addEventListener('mousemove', handleMouseMove);
return () => window.removeEventListener('mousemove', handleMouseMove);
}, [sessionStore]);
return (
<div>
<h3>My Video</h3>
<video ref={localVideoRef} autoPlay muted playsInline style={{ width: '200px' }} />
<hr />
<h3>Peers</h3>
<div style={{ display: 'flex', flexWrap: 'wrap' }}>
{peers.map(peer => (
<PeerVideo key={peer.id} peer={peer} />
))}
</div>
</div>
);
});
// 单独的Peer Video组件,以便于响应式更新
const PeerVideo = observer(({ peer }) => {
const videoRef = useRef(null);
useEffect(() => {
if (videoRef.current && peer.mediaStream) {
videoRef.current.srcObject = peer.mediaStream;
}
}, [peer.mediaStream]);
return (
<div style={{ margin: '10px', border: '1px solid #ccc', padding: '5px' }}>
<p>Peer ID: {peer.id}</p>
<p>State: {peer.connectionState}</p>
<video ref={videoRef} autoPlay playsInline style={{ width: '200px' }} />
</div>
);
});
export default VideoMicrofrontend;
架构的局限性与未来迭代方向
当前这套P2P(Mesh)架构在参与者较少(< 8人)的场景下表现优异,因为它将媒体和数据的流量负载分散到了每个客户端。然而,它也存在明显的局限性。
首先,连接复杂性。在Mesh网络中,每个客户端都需要与其他所有客户端建立连接。N个参与者需要N * (N - 1) / 2
个连接,这会导致客户端的上行带宽和CPU资源随着人数增加而急剧消耗。
其次,NAT穿透问题。尽管STUN服务器能解决大部分NAT问题,但在复杂的企业网络或对称NAT背后,必须依赖成本更高的TURN服务器来中继流量。这在一定程度上削弱了P2P的成本优势。
未来的优化路径可以探索选择性转发单元(SFU)架构。在SFU模式下,每个客户端只需与中心的SFU服务器建立一个上行和一个下行连接。客户端将自己的媒体流发送给SFU,SFU再将其转发给房间内的其他所有客户端。这种架构极大地降低了客户端的负担,能够支持数百甚至数千人规模的实时会话,但代价是需要维护一组高性能的SFU服务器集群,这对服务端的运维能力提出了更高要求。对于状态同步,SFU也可以作为DataChannel消息的中央转发节点,解决新加入者获取历史状态的问题。