基于WebRTC与NoSQL构建微前端架构下的实时状态同步引擎


我们面临的挑战是构建一个大型、多团队协作的在线诊断平台。平台由多个独立的微前端(如视频通话模块、实时遥测图表模块、共享白板模块)组成,它们需要共享并同步一个统一的实时会话状态。这个状态不仅包含谁在线,还包括每个用户的光标位置、正在交互的组件ID,以及WebRTC通话的信令数据。系统的核心要求是低延迟、高可扩展性,并保证微前端之间的技术隔离。

方案权衡:中心化WebSocket vs. 去中心化WebRTC DataChannel

一个直接的方案是使用一个中心化的WebSocket服务器,结合一个全局的Redux或Vuex store。所有微前端通过一个事件总线与这个全局store通信,WebSocket服务器负责广播状态变化。

方案A: 中心化WebSocket + 全局Store

  • 优势:
    • 实现直观,业务逻辑集中。
    • 状态变更的来源单一,易于调试。
  • 劣势:
    • 强耦合: 所有微前端都必须依赖同一个全局store的结构,任何一个模块对store的修改都可能影响其他模块,违背了微前端的独立性原则。
    • 性能瓶颈: 所有状态更新,无论大小,都经过中心服务器转发。在高频交互场景(如光标移动),中心服务器和客户端的单一WebSocket连接会成为瓶颈。
    • 扩展性差: 服务端需要维护所有房间的所有状态,随着用户和房间数增加,内存和CPU压力剧增。

方案B: WebRTC DataChannel + NoSQL信令 + 共享MobX实例

这个方案将状态同步的职责进行拆分。

  1. WebRTC DataChannel: 用于客户端之间高频、低延迟的状态同步,例如光标位置、绘图操作。数据直接在对等端之间传输,不经过中心服务器。
  2. NoSQL (Redis) + WebSocket信令服务器: 用于处理WebRTC的连接建立(SDP、ICE Candidate交换)和低频的全局状态同步(用户加入/离开房间)。Redis的Pub/Sub能力天然适合做信令广播,并且性能极高。
  3. 共享的MobX实例: 在微前端的宿主应用(Shell)中创建一个MobX store实例,并通过props或依赖注入的方式传递给每个微前端。这个store负责管理会话的“元数据”(如房间成员列表),并提供响应式API。微前端内部的状态变化可以直接通过DataChannel广播,接收到的对端数据再更新到这个共享的MobX store中,从而驱动UI更新。

决策:

对于一个要求长期演进和团队隔离的大型平台,方案B尽管初始复杂度更高,但其解耦能力、性能优势和可扩展性远超方案A。在真实项目中,长期的可维护性往往比初期的开发速度更重要。我们选择方案B。

架构设计与核心实现

我们设计的架构包含四个核心部分:宿主应用(Shell)、信令服务器、共享状态模块,以及各个业务微前端。

graph TD
    subgraph Browser Client
        Shell(宿主应用) --> SharedMobX(共享MobX Store)
        Shell --> MF_Video(视频微前端)
        Shell --> MF_Board(白板微前端)
        
        MF_Video -- 使用/更新 --> SharedMobX
        MF_Board -- 使用/更新 --> SharedMobX

        MF_Video -- WebRTC --> MF_Video_Peer(其他客户端的视频微前端)
        MF_Board -- WebRTC --> MF_Board_Peer(其他客户端的白板微前端)
        
        subgraph "WebRTC & Signaling Logic"
            MF_Video -- WebSocket --> SignalingServer
            MF_Board -- WebSocket --> SignalingServer
        end
    end

    subgraph Backend
        SignalingServer(Node.js 信令服务器)
        Redis(Redis键值存储/PubSub)
        SignalingServer <--> Redis
    end

    style Shell fill:#f9f,stroke:#333,stroke-width:2px
    style MF_Video fill:#ccf,stroke:#333,stroke-width:2px
    style MF_Board fill:#ccf,stroke:#333,stroke-width:2px
    style SharedMobX fill:#9cf,stroke:#333,stroke-width:2px

1. 信令服务器 (Node.js + Redis)

信令服务器不处理任何业务逻辑,它的唯一职责是转发WebRTC信令消息和管理房间成员关系。使用Redis的Pub/Sub模型,可以轻松实现服务器的水平扩展。

signaling-server.js

import { WebSocketServer } from 'ws';
import { createClient } from 'redis';
import { v4 as uuidv4 } from 'uuid';

const PORT = process.env.PORT || 8080;
const wss = new WebSocketServer({ port: PORT });

// 使用两个Redis客户端,一个用于发布,一个用于订阅
// 这是Redis Pub/Sub的推荐实践,因为订阅连接会进入阻塞模式
const publisher = createClient({ url: 'redis://localhost:6379' });
const subscriber = publisher.duplicate();

await publisher.connect();
await subscriber.connect();

// 存储WebSocket连接与用户ID的映射
const clients = new Map();

wss.on('connection', (ws) => {
    const userId = uuidv4();
    clients.set(userId, ws);
    console.log(`Client connected: ${userId}`);

    ws.on('message', async (message) => {
        try {
            const data = JSON.parse(message);
            const { type, roomId, payload } = data;

            // 核心逻辑:将消息发布到对应房间的Redis channel
            // 这样所有订阅了该channel的信令服务器实例都能收到消息
            // 并转发给其下的客户端
            if (roomId) {
                const messageToPublish = JSON.stringify({
                    senderId: userId,
                    type,
                    payload
                });
                await publisher.publish(roomId, messageToPublish);
            }
        } catch (error) {
            console.error(`Failed to process message: ${message}`, error);
        }
    });

    ws.on('close', () => {
        // 当用户断开连接时,也需要通知房间内的其他人
        // 实际生产中,需要维护一个userId到roomId的映射
        // 这里为了简化,省略了这部分逻辑
        clients.delete(userId);
        console.log(`Client disconnected: ${userId}`);
        // 可以在这里发布一个 'user-left' 事件到相关的房间channel
    });
    
    ws.on('error', (err) => {
        console.error(`WebSocket error for user ${userId}:`, err);
    });
});

// 订阅逻辑:处理来自Redis的消息并转发给客户端
async function setupSubscription() {
    // 这里的订阅模式可以更精细,例如使用 psubscribe('room:*')
    // 但为了演示,我们假设客户端在加入房间时会发送一个'join'消息
    // 服务器根据这个消息来决定订阅哪个channel
    // 下面的代码是一个简化的广播模型
    
    // 实际项目中,不能直接订阅所有消息。
    // 服务器应该动态维护一个 `Map<roomId, Set<WebSocket>>` 的映射
    // 当收到Redis消息时,根据roomId找到对应的WebSocket连接集合并转发
    // 这里为了演示Pub/Sub机制,我们做一个伪实现
    
    // 伪代码: 当客户端发送 `{"type": "join", "roomId": "room-123"}`
    // server.joinRoom(userId, roomId);
    // 在 joinRoom 内部,如果这个roomId是首次被此实例处理,则 subscriber.subscribe(roomId, listener);
    
    // 监听所有消息的示例 (不推荐用于生产)
    // subscriber.subscribe('__all__', (message, channel) => { ... });
    
    console.log(`Signaling server listening on port ${PORT}`);
}

// 模拟一个房间的订阅
// 在真实应用中,这个订阅应该在有客户端加入该房间时才动态创建
const exampleRoomId = 'diagnostics-room-1';
subscriber.subscribe(exampleRoomId, (message) => {
    try {
        const { senderId, type, payload } = JSON.parse(message);
        console.log(`Received from Redis on channel ${exampleRoomId}:`, { senderId, type });

        // 将消息广播给所有连接到此服务器实例且在该房间的客户端
        // 这里需要一个从roomId到clients的映射,再次简化
        clients.forEach((ws, userId) => {
            if (userId !== senderId && ws.readyState === ws.OPEN) {
                // 不把消息发回给发送者
                ws.send(JSON.stringify({ type, payload, senderId }));
            }
        });
    } catch(err) {
        console.error('Error processing redis message', err);
    }
});


setupSubscription().catch(console.error);

// 这是一个简化的模型。生产级服务器需要处理:
// 1. 更健壮的房间管理,比如用Redis Hash存储房间成员。
// 2. 心跳检测,清理死连接。
// 3. 错误处理和重连逻辑。
// 4. 认证与授权。

2. 共享状态模块 (MobX)

这个模块定义了跨微前端共享的状态结构。它会被宿主应用实例化。

shared-state/sessionStore.js

import { makeAutoObservable, observable, action, computed } from 'mobx';

class PeerState {
    id = null;
    connection = null; // RTCPeerConnection instance
    dataChannel = null; // RTCDataChannel instance
    connectionState = 'new'; // 'new', 'connecting', 'connected', 'disconnected', 'failed', 'closed'
    mediaStream = null;

    constructor(id) {
        makeAutoObservable(this);
        this.id = id;
    }

    @action setConnection(connection) {
        this.connection = connection;
    }

    @action setDataChannel(dataChannel) {
        this.dataChannel = dataChannel;
    }
    
    @action setConnectionState(state) {
        console.log(`Peer ${this.id} connection state changed to: ${state}`);
        this.connectionState = state;
    }

    @action setMediaStream(stream) {
        this.mediaStream = stream;
    }
}

export class SessionStore {
    roomId = null;
    userId = null;
    peers = observable.map(); // Map<userId, PeerState>
    isSignalingConnected = false;
    
    // 业务相关的共享状态
    cursorPositions = observable.map(); // Map<userId, {x: number, y: number}>

    constructor() {
        // makeAutoObservable 会自动将属性变为 observable, 方法变为 action, getter 变为 computed
        makeAutoObservable(this);
    }

    @action setSessionInfo(roomId, userId) {
        this.roomId = roomId;
        this.userId = userId;
    }

    @action setSignalingConnected(isConnected) {
        this.isSignalingConnected = isConnected;
    }
    
    @action addPeer(userId, connection) {
        if (!this.peers.has(userId)) {
            const peerState = new PeerState(userId);
            peerState.setConnection(connection);
            this.peers.set(userId, peerState);
            return peerState;
        }
        return this.peers.get(userId);
    }

    @action removePeer(userId) {
        const peer = this.peers.get(userId);
        if (peer) {
            peer.connection?.close();
            this.peers.delete(userId);
            this.cursorPositions.delete(userId);
        }
    }

    getPeer(userId) {
        return this.peers.get(userId);
    }
    
    @action updateCursorPosition(userId, position) {
        // 这是一个业务状态更新的例子
        this.cursorPositions.set(userId, position);
    }

    @computed get connectedPeers() {
        return Array.from(this.peers.values()).filter(p => p.connectionState === 'connected');
    }

    @action broadcastToAllPeers(message) {
        // 通过DataChannel广播消息
        const messageString = JSON.stringify(message);
        for (const peer of this.connectedPeers) {
            if (peer.dataChannel && peer.dataChannel.readyState === 'open') {
                try {
                    peer.dataChannel.send(messageString);
                } catch(error) {
                    console.error(`Failed to send message to peer ${peer.id}:`, error);
                }
            }
        }
    }
}

3. WebRTC封装与微前端集成

创建一个WebRTC控制器,它将与信令服务器和MobX store交互。

webrtc-controller/WebRTCController.js

import { runInAction } from 'mobx';

const ICE_SERVERS = [
    { urls: 'stun:stun.l.google.com:19302' },
    // 在生产环境中,必须配置TURN服务器以应对NAT穿透失败
    // { 
    //   urls: 'turn:your.turn.server:3478',
    //   username: 'user',
    //   credential: 'password'
    // }
];

export class WebRTCController {
    constructor(sessionStore, signalingSocket) {
        this.store = sessionStore;
        this.socket = signalingSocket;
        this.localStream = null;

        // 绑定this上下文,确保在事件回调中this指向正确
        this.handleSignalingMessage = this.handleSignalingMessage.bind(this);
    }

    async initialize(localStream) {
        this.localStream = localStream;
        this.socket.addEventListener('message', this.handleSignalingMessage);
        
        // 加入房间
        this.sendMessage('join-room', { roomId: this.store.roomId });
    }

    sendMessage(type, payload) {
        if (this.socket.readyState !== this.socket.OPEN) {
            console.error('Signaling socket is not open.');
            return;
        }
        this.socket.send(JSON.stringify({
            type,
            roomId: this.store.roomId,
            payload
        }));
    }

    async handleSignalingMessage(event) {
        try {
            const { type, payload, senderId } = JSON.parse(event.data);
            if (senderId === this.store.userId) return; // 忽略自己发出的消息

            switch (type) {
                // 当一个新用户加入时,由房间内的“老”用户发起连接
                case 'user-joined':
                    await this.createAndSendOffer(senderId);
                    break;
                case 'offer':
                    await this.handleOffer(senderId, payload.sdp);
                    break;
                case 'answer':
                    await this.handleAnswer(senderId, payload.sdp);
                    break;
                case 'ice-candidate':
                    await this.handleNewICECandidate(senderId, payload.candidate);
                    break;
                case 'user-left':
                    this.store.removePeer(senderId);
                    break;
                default:
                    console.warn(`Unknown signaling message type: ${type}`);
            }
        } catch (error) {
            console.error('Error handling signaling message:', error);
        }
    }
    
    async createPeerConnection(peerId) {
        const pc = new RTCPeerConnection({ iceServers: ICE_SERVERS });
        const peerState = this.store.addPeer(peerId, pc);

        pc.onicecandidate = (event) => {
            if (event.candidate) {
                this.sendMessage('ice-candidate', { to: peerId, candidate: event.candidate });
            }
        };

        pc.onconnectionstatechange = () => {
            runInAction(() => {
                peerState.setConnectionState(pc.connectionState);
            });
             if (pc.connectionState === 'failed' || pc.connectionState === 'disconnected') {
                // 实现重连逻辑
                console.warn(`Connection to ${peerId} failed/disconnected. Consider ICE restart.`);
            }
        };

        pc.ondatachannel = (event) => {
            const dataChannel = event.channel;
            console.log(`Data channel received from ${peerId}`);
            this.setupDataChannel(peerId, dataChannel);
        };
        
        pc.ontrack = (event) => {
            console.log(`Track received from ${peerId}`);
            if(event.streams && event.streams[0]) {
                 runInAction(() => {
                    peerState.setMediaStream(event.streams[0]);
                });
            }
        };
        
        if (this.localStream) {
            this.localStream.getTracks().forEach(track => {
                pc.addTrack(track, this.localStream);
            });
        }

        return pc;
    }
    
    setupDataChannel(peerId, dataChannel) {
        const peerState = this.store.getPeer(peerId);
        if (!peerState) return;
        
        runInAction(() => {
            peerState.setDataChannel(dataChannel);
        });

        dataChannel.onopen = () => console.log(`Data channel to ${peerId} is open.`);
        dataChannel.onclose = () => console.log(`Data channel to ${peerId} is closed.`);
        dataChannel.onerror = (error) => console.error(`Data channel error with ${peerId}:`, error);

        dataChannel.onmessage = (event) => {
            const message = JSON.parse(event.data);
            // 这里是处理对等端发来的业务状态的地方
            // 例如:光标移动
            if (message.type === 'cursor-move') {
                this.store.updateCursorPosition(peerId, message.payload);
            }
        };
    }

    async createAndSendOffer(peerId) {
        const pc = await this.createPeerConnection(peerId);
        
        // 为发起方创建Data Channel
        const dataChannel = pc.createDataChannel('main-channel');
        this.setupDataChannel(peerId, dataChannel);

        const offer = await pc.createOffer();
        await pc.setLocalDescription(offer);

        this.sendMessage('offer', { to: peerId, sdp: pc.localDescription });
    }

    async handleOffer(peerId, sdp) {
        const pc = await this.createPeerConnection(peerId);
        await pc.setRemoteDescription(new RTCSessionDescription({ type: 'offer', sdp }));

        const answer = await pc.createAnswer();
        await pc.setLocalDescription(answer);

        this.sendMessage('answer', { to: peerId, sdp: pc.localDescription });
    }

    async handleAnswer(peerId, sdp) {
        const peer = this.store.getPeer(peerId);
        if (peer && peer.connection) {
            await peer.connection.setRemoteDescription(new RTCSessionDescription({ type: 'answer', sdp }));
        }
    }

    async handleNewICECandidate(peerId, candidate) {
        const peer = this.store.getPeer(peerId);
        if (peer && peer.connection) {
            try {
                await peer.connection.addIceCandidate(new RTCIceCandidate(candidate));
            } catch (e) {
                console.error('Error adding received ice candidate', e);
            }
        }
    }

    destroy() {
        this.socket.removeEventListener('message', this.handleSignalingMessage);
        this.store.peers.forEach(peer => peer.connection?.close());
        this.localStream?.getTracks().forEach(track => track.stop());
    }
}

4. 微前端实现 (React + MobX)

这是一个视频微前端的例子,它使用WebRTCController并响应SessionStore的变化。

VideoMicrofrontend.jsx

import React, { useEffect, useRef, useState } from 'react';
import { observer } from 'mobx-react-lite';
import { SessionStore } from '../shared-state/sessionStore';
import { WebRTCController } from '../webrtc-controller/WebRTCController';

// 假设 sessionStore 和 signalingSocket 由宿主应用通过props传入
const VideoMicrofrontend = observer(({ sessionStore, signalingSocket }) => {
    const [controller, setController] = useState(null);
    const localVideoRef = useRef(null);
    const peers = Array.from(sessionStore.peers.values());

    useEffect(() => {
        let webrtcController;
        const init = async () => {
            try {
                const stream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true });
                if (localVideoRef.current) {
                    localVideoRef.current.srcObject = stream;
                }
                
                webrtcController = new WebRTCController(sessionStore, signalingSocket);
                webrtcController.initialize(stream);
                setController(webrtcController);

            } catch (err) {
                console.error("Failed to get local media stream", err);
                // 应该在这里向用户显示错误信息
            }
        };

        if (signalingSocket && signalingSocket.readyState === WebSocket.OPEN) {
            init();
        }

        return () => {
            // 组件卸载时清理资源
            webrtcController?.destroy();
        };
    }, [sessionStore, signalingSocket]);

    // 这是一个业务逻辑的例子:广播光标位置
    useEffect(() => {
        const handleMouseMove = (event) => {
            const payload = { x: event.clientX, y: event.clientY };
            sessionStore.updateCursorPosition(sessionStore.userId, payload);
            sessionStore.broadcastToAllPeers({
                type: 'cursor-move',
                payload
            });
        };
        window.addEventListener('mousemove', handleMouseMove);
        return () => window.removeEventListener('mousemove', handleMouseMove);
    }, [sessionStore]);


    return (
        <div>
            <h3>My Video</h3>
            <video ref={localVideoRef} autoPlay muted playsInline style={{ width: '200px' }} />
            <hr />
            <h3>Peers</h3>
            <div style={{ display: 'flex', flexWrap: 'wrap' }}>
                {peers.map(peer => (
                    <PeerVideo key={peer.id} peer={peer} />
                ))}
            </div>
        </div>
    );
});

// 单独的Peer Video组件,以便于响应式更新
const PeerVideo = observer(({ peer }) => {
    const videoRef = useRef(null);

    useEffect(() => {
        if (videoRef.current && peer.mediaStream) {
            videoRef.current.srcObject = peer.mediaStream;
        }
    }, [peer.mediaStream]);

    return (
        <div style={{ margin: '10px', border: '1px solid #ccc', padding: '5px' }}>
            <p>Peer ID: {peer.id}</p>
            <p>State: {peer.connectionState}</p>
            <video ref={videoRef} autoPlay playsInline style={{ width: '200px' }} />
        </div>
    );
});

export default VideoMicrofrontend;

架构的局限性与未来迭代方向

当前这套P2P(Mesh)架构在参与者较少(< 8人)的场景下表现优异,因为它将媒体和数据的流量负载分散到了每个客户端。然而,它也存在明显的局限性。

首先,连接复杂性。在Mesh网络中,每个客户端都需要与其他所有客户端建立连接。N个参与者需要N * (N - 1) / 2个连接,这会导致客户端的上行带宽和CPU资源随着人数增加而急剧消耗。

其次,NAT穿透问题。尽管STUN服务器能解决大部分NAT问题,但在复杂的企业网络或对称NAT背后,必须依赖成本更高的TURN服务器来中继流量。这在一定程度上削弱了P2P的成本优势。

未来的优化路径可以探索选择性转发单元(SFU)架构。在SFU模式下,每个客户端只需与中心的SFU服务器建立一个上行和一个下行连接。客户端将自己的媒体流发送给SFU,SFU再将其转发给房间内的其他所有客户端。这种架构极大地降低了客户端的负担,能够支持数百甚至数千人规模的实时会话,但代价是需要维护一组高性能的SFU服务器集群,这对服务端的运维能力提出了更高要求。对于状态同步,SFU也可以作为DataChannel消息的中央转发节点,解决新加入者获取历史状态的问题。


  目录