构建基于 ActiveMQ 的持久化微前端事件总线与状态同步机制


我们团队维护一个复杂的内部运营平台,其前端架构在三年前就演进到了微前端(Micro-frontend)。最初,各个微应用间的通信依赖于 Custom Events 和一个共享的全局状态库,这种方案在系统规模尚小时尚可应付。但随着业务复杂度的指数级增长,问题开始集中爆发:跨应用的状态同步逻辑变得脆弱不堪,任何一个微应用的异常状态都可能污染全局,导致连锁崩溃。更致命的是,刷新页面后,所有临时状态全部丢失,用户需要重新执行一系列操作才能恢复工作上下文,而排查问题时,我们完全无法追溯用户的操作序列,因为它们只是浏览器内存中瞬时生灭的事件。

痛点已经明确:我们需要一种能够跨越页面生命周期、可追溯、且具备工业级解耦能力的通信机制。与其在前端领域修修补补,我们决定换一个思路——将后端成熟的事件驱动架构(EDA)思想引入前端,构建一个持久化的微前端事件总线。

初步构想与技术选型决策

构想的核心是将前端UI事件视为与后端业务事件同等重要的一等公民。每个有意义的用户操作(例如:触发一次构建、更新一项配置、启动一个任务)不再是简单的函数调用,而是被封装成一个事件对象,发送到一个中心化的、持久化的事件总线。其他微应用或后端服务可以订阅这些事件,并据此更新自身状态或执行相应逻辑。

这个构想的架构图如下:

graph TD
    subgraph Browser
        MFE_A[微应用 A]
        MFE_B[微应用 B]
        WS_Client[WebSocket Client]
        MFE_A -- publish event --> WS_Client
        MFE_B -- publish event --> WS_Client
        WS_Client -- push updates --> MFE_A
        WS_Client -- push updates --> MFE_B
    end

    subgraph Backend Infrastructure
        Gateway[WebSocket Gateway]
        MQ[ActiveMQ Broker]
        PersistenceSvc[事件持久化服务]
        StateSvc[状态物化服务]
        DB[(PostgreSQL)]
    end

    WS_Client <-->|STOMP over WebSocket| Gateway
    Gateway -- publish --> MQ
    PersistenceSvc -- subscribe --> MQ
    StateSvc -- subscribe --> MQ
    MQ -- push --> Gateway
    PersistenceSvc -- write event log --> DB
    StateSvc -- update materialized view --> DB

技术选型考量:

  1. 消息队列 (MQ) - ActiveMQ: 为什么选择 ActiveMQ 而不是更流行的 RabbitMQ 或 Kafka?在真实项目中,技术选型往往不只是看性能指标。我们的运维团队对 ActiveMQ Artemis 有着超过五年的生产环境维护经验,相关的监控、告警、灾备方案都非常成熟。此外,ActiveMQ 对 STOMP (Simple Text Oriented Messaging Protocol) 协议的良好支持,使其能非常方便地通过 WebSocket 与浏览器端直接集成。我们需要的是它的持久化主题(Durable Topics)和消息可靠性保证,确保前端发出的每一个“指令”都不会丢失。

  2. ORM - Prisma: 事件溯源(Event Sourcing)是这个架构的自然选择。我们会记录所有事件(写模型),并根据事件流计算出当前系统的最新状态(读模型)。这意味着我们需要频繁地写入事件日志,并构建物化视图。直接编写 SQL 来处理这两种模型非常繁琐且容易出错。Prisma 以其强大的类型安全和简洁的 API,极大地简化了数据库操作,特别是对于 TypeScript 项目,它能提供从数据库 Schema 到应用代码的端到端类型校验。

  3. 前端构建工具 - Turbopack: 我们的微前端项目采用 Monorepo 结构管理。随着微应用数量增至两位数,传统的 Webpack 构建和本地开发服务器启动时间已经严重影响了开发效率。我们评估了 Vite 和 Turbopack,最终选择了 Turbopack。它基于 Rust 构建,其增量计算引擎在大型 Monorepo 项目中表现出的近乎瞬时的 HMR (Hot Module Replacement) 和极快的冷启动速度,是决定性的优势。这纯粹是一个为了提升开发者体验(DX)的选择。

步骤化实现:从后端到前端

1. 数据库模型定义 (Prisma Schema)

首先,我们需要定义事件日志和物化视图的模型。事件日志是不可变(Immutable)的,只增不改。物化视图则是可变的,它代表了系统的当前状态快照。

./packages/db/prisma/schema.prisma

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
  output   = "../src/generated/client"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// 记录所有前端发来的事件,这是系统的真相来源 (Source of Truth)
model MfeEventLog {
  id        String   @id @default(cuid())
  // 事件的唯一标识符,用于幂等性处理
  eventId   String   @unique
  // 事件类型,例如 'service.restart', 'config.update'
  type      String
  // 事件负载,JSON格式
  payload   Json
  // 事件来源的微应用标识
  source    String
  // 用户标识
  userId    String
  // 事件创建时间戳
  createdAt DateTime @default(now())

  @@index([type])
  @@index([userId, createdAt])
}

// 物化视图:服务的当前状态,这是读模型
// 它由事件驱动更新,用于快速查询
model ServiceStatus {
  id          String    @id
  // 服务名称
  name        String
  // 服务的当前状态: 'RUNNING', 'STOPPED', 'RESTARTING'
  status      String
  // 配置的版本号
  configVersion Int
  // 最近一次操作的事件ID
  lastEventId String
  // 最后更新时间
  updatedAt   DateTime  @updatedAt

  @@index([status])
}

2. 后端核心服务:WebSocket 网关与事件处理器

我们使用 Node.js、Express 和 stomp-broker-js 库来构建 WebSocket 网关。这个网关是前端和 ActiveMQ 之间的桥梁。

./packages/gateway/src/server.ts

import http from 'http';
import express from 'express';
import { StompServer } from 'stomp-broker-js';
import WebSocket from 'ws';
import { logger } from './utils/logger';

const app = express();
const server = http.createServer(app);

// ActiveMQ STOMP 连接配置
// 在生产环境中,这些值应该来自环境变量
const stompConfig = {
    host: 'activemq-broker.internal',
    port: 61613,
    login: 'admin',
    passcode: 'secret_password',
    // 启用重连机制
    reconnect: {
        retries: -1, // 无限重试
        delay: 5000, // 5秒间隔
    },
};

const stompServer = new StompServer({
    server: new WebSocket.Server({ server }),
    brokerURL: `stomp://${stompConfig.host}:${stompConfig.port}`,
    wsHandler: (stompSocket, wsSocket, req) => {
        logger.info({ ip: req.socket.remoteAddress }, 'New WebSocket client connected.');
        // 这里可以添加认证逻辑,例如校验 req 中的 cookie 或 token
    },
    debug: (msg) => {
        // 将 stomp-broker-js 的调试日志接入我们的日志系统
        logger.trace(msg);
    }
});

stompServer.on('error', (err) => {
    logger.error({ error: err }, 'STOMP server error.');
});

stompServer.on('connected', () => {
    logger.info('Gateway successfully connected to ActiveMQ broker.');
});

const PORT = process.env.PORT || 3001;
server.listen(PORT, () => {
    logger.info(`WebSocket Gateway listening on port ${PORT}`);
});

process.on('SIGTERM', () => {
    logger.info('SIGTERM signal received. Closing connections.');
    server.close(() => {
        stompServer.close(() => {
            logger.info('All connections closed. Exiting.');
            process.exit(0);
        });
    });
});

这个网关本身不处理任何业务逻辑,它只负责透明地转发 STOMP 帧。

接下来是两个独立的微服务,它们订阅 ActiveMQ 的主题来处理业务。

./packages/persistence-service/src/index.ts

import { Client } from '@stomp/stompjs';
import { PrismaClient } from '@my-org/db';
import { logger } from './utils/logger';
import { z } from 'zod';

// 使用 Zod 定义事件的 schema,确保消息格式正确
const eventSchema = z.object({
    eventId: z.string().uuid(),
    type: z.string().min(1),
    payload: z.record(z.unknown()),
    source: z.string().min(1),
    userId: z.string().min(1),
});

const prisma = new PrismaClient();
const client = new Client({
    brokerURL: `ws://activemq-broker.internal:61613`, // 注意:这里用 ws 或 stomp 取决于broker配置
    connectHeaders: {
        login: 'admin',
        passcode: 'secret_password',
    },
    reconnectDelay: 5000,
    heartbeatIncoming: 4000,
    heartbeatOutgoing: 4000,
});

client.onConnect = (frame) => {
    logger.info('Persistence service connected to ActiveMQ.');
    // 订阅通配符主题,接收所有微前端事件
    client.subscribe('/topic/mfe.events.>', (message) => {
        try {
            const rawEvent = JSON.parse(message.body);
            const parsedEvent = eventSchema.parse(rawEvent);

            // 核心逻辑:将事件存入数据库
            prisma.mfeEventLog.create({
                data: {
                    eventId: parsedEvent.eventId,
                    type: parsedEvent.type,
                    payload: parsedEvent.payload,
                    source: parsedEvent.source,
                    userId: parsedEvent.userId,
                }
            }).then(() => {
                logger.info({ eventId: parsedEvent.eventId, type: parsedEvent.type }, 'Event persisted successfully.');
            }).catch(err => {
                // 处理唯一键冲突,实现幂等性
                if (err.code === 'P2002' && err.meta?.target?.includes('eventId')) {
                    logger.warn({ eventId: parsedEvent.eventId }, 'Duplicate event received. Ignoring.');
                } else {
                    logger.error({ error: err, eventId: parsedEvent.eventId }, 'Failed to persist event.');
                    // 在真实项目中,这里应该将失败的消息推送到一个死信队列(DLQ)
                }
            });

        } catch (error) {
            logger.error({ error, body: message.body }, 'Failed to parse or process message.');
        }
    });
};

client.onStompError = (frame) => {
    logger.error(`Broker reported error: ${frame.headers['message']}`);
    logger.error(`Additional details: ${frame.body}`);
};

async function main() {
    await prisma.$connect();
    client.activate();
}

main().catch(e => {
    logger.error(e);
    process.exit(1);
});

这个服务是“写模型”的处理器,它唯一的职责就是把事件记录下来。

./packages/state-service/src/processor.ts

import { PrismaClient, ServiceStatus } from '@my-org/db';
import { logger } from './utils/logger';

const prisma = new PrismaClient();

// 事件处理逻辑的映射表
// 这里的关键是每个处理器都是一个纯函数:(state, event) => newState
const eventHandlers: { [key: string]: Function } = {
    'service.restarted': async (event: any): Promise<void> => {
        const { serviceId } = event.payload;
        await prisma.serviceStatus.update({
            where: { id: serviceId },
            data: {
                status: 'RUNNING',
                lastEventId: event.eventId,
            },
        });
        logger.info({ serviceId }, 'Service status updated to RUNNING.');
    },
    'service.restart.initiated': async (event: any): Promise<void> => {
        const { serviceId, initiatedBy } = event.payload;
        await prisma.serviceStatus.upsert({
            where: { id: serviceId },
            update: {
                status: 'RESTARTING',
                lastEventId: event.eventId,
            },
            create: {
                id: serviceId,
                name: `Service-${serviceId}`, // 实际项目中名称应来自事件或数据库
                status: 'RESTARTING',
                configVersion: 1,
                lastEventId: event.eventId,
            }
        });
        logger.info({ serviceId, initiatedBy }, 'Service status updated to RESTARTING.');
    },
    // ... 其他事件处理器
};

export async function processEvent(event: any): Promise<void> {
    const handler = eventHandlers[event.type];
    if (handler) {
        try {
            await handler(event);
        } catch (error) {
            logger.error({ error, eventId: event.eventId }, `Error processing event for state projection.`);
            // 失败处理,例如记录错误或发送告警
        }
    } else {
        logger.trace({ type: event.type }, 'No state handler for this event type.');
    }
}

// StateService 的主入口会像 PersistenceService 一样订阅主题,
// 然后调用 processEvent 函数。

这个服务是“读模型”的构建器。它消费事件流,并更新物化视图,供前端快速查询。

3. 前端集成:Turbopack 配置与 React Hook

在 Monorepo 根目录下的 turbo.json 配置如下,Turbopack 会根据任务依赖关系并行执行构建和开发任务。

./turbo.json

{
  "$schema": "https://turbo.build/schema.json",
  "pipeline": {
    "build": {
      "dependsOn": ["^build"],
      "outputs": ["dist/**"]
    },
    "lint": {},
    "dev": {
      "cache": false,
      "persistent": true
    }
  }
}

接下来是前端的核心,一个可复用的 React Hook,用于封装与事件总线的交互。

./packages/ui-hooks/src/useEventBus.ts

import { useState, useEffect, useRef, useCallback } from 'react';
import { Client, IMessage, StompSubscription } from '@stomp/stompjs';
import { v4 as uuidv4 } from 'uuid';

// 注意:在真实项目中,URL应该是可配置的
const GATEWAY_URL = 'ws://localhost:3001';

interface EventBusState {
  isConnected: boolean;
  error: string | null;
}

interface PublishOptions {
  source: string; // 微应用标识
  userId: string;
}

export function useEventBus() {
  const [state, setState] = useState<EventBusState>({ isConnected: false, error: null });
  const clientRef = useRef<Client | null>(null);
  const subscriptionsRef = useRef<Map<string, StompSubscription>>(new Map());

  useEffect(() => {
    // 初始化 STOMP client
    const client = new Client({
      brokerURL: GATEWAY_URL,
      reconnectDelay: 5000,
      heartbeatIncoming: 4000,
      heartbeatOutgoing: 4000,
      onConnect: () => {
        setState({ isConnected: true, error: null });
        console.log('EventBus connected.');
      },
      onDisconnect: () => {
        setState({ isConnected: false, error: null });
        console.log('EventBus disconnected.');
      },
      onStompError: (frame) => {
        const errorMessage = `Broker error: ${frame.headers['message']} | Details: ${frame.body}`;
        setState(s => ({ ...s, error: errorMessage }));
        console.error(errorMessage);
      },
    });

    client.activate();
    clientRef.current = client;

    return () => {
      client.deactivate();
      console.log('EventBus deactivated.');
    };
  }, []);

  const publish = useCallback((type: string, payload: object, options: PublishOptions) => {
    if (!clientRef.current || !state.isConnected) {
      console.error('Cannot publish: EventBus is not connected.');
      // 在生产环境中,可以考虑将事件放入一个队列,在重连后发送
      return;
    }

    const event = {
      eventId: uuidv4(),
      type,
      payload,
      source: options.source,
      userId: options.userId,
    };

    // 主题格式: mfe.events.<event_type>
    const destination = `/topic/mfe.events.${type}`;
    clientRef.current.publish({
      destination,
      body: JSON.stringify(event),
      headers: { 'content-type': 'application/json', 'persistent': 'true' }
    });
  }, [state.isConnected]);

  const subscribe = useCallback((topic: string, callback: (message: IMessage) => void) => {
    if (!clientRef.current) {
        console.error('Cannot subscribe: client is not initialized.');
        return () => {};
    }
    
    // 如果已连接,则立即订阅
    if (clientRef.current.connected) {
        const sub = clientRef.current.subscribe(topic, callback);
        subscriptionsRef.current.set(topic, sub);
    } else {
        // 如果未连接,则在连接成功后订阅
        // 这里的实现需要对 client 的 onConnect 事件进行扩展
        // 简单起见,我们假设调用 subscribe 时已经连接
        console.warn('Subscribing while disconnected. Subscription will activate on connect.');
        // A more robust solution would queue subscriptions.
    }

    // 返回一个取消订阅的函数
    return () => {
        const sub = subscriptionsRef.current.get(topic);
        if (sub) {
            sub.unsubscribe();
            subscriptionsRef.current.delete(topic);
        }
    };
  }, []);

  return { ...state, publish, subscribe };
}

4. 微应用实战

假设我们有两个微应用:ControlPanel 用于执行操作,StatusDashboard 用于展示状态。

./apps/control-panel/src/components/ServiceController.tsx

import React from 'react';
import { useEventBus } from '@my-org/ui-hooks';

const CURRENT_USER_ID = 'user-123'; // 实际应从认证上下文中获取
const MFE_SOURCE = 'ControlPanel';

export function ServiceController({ serviceId }: { serviceId: string }) {
  const { publish, isConnected } = useEventBus();

  const handleRestart = () => {
    if (!isConnected) {
      alert('Connection to system bus lost. Please wait.');
      return;
    }
    console.log(`Publishing restart event for ${serviceId}`);
    publish(
      'service.restart.initiated',
      { serviceId, initiatedBy: CURRENT_USER_ID },
      { source: MFE_SOURCE, userId: CURRENT_USER_ID }
    );
  };

  return (
    <div>
      <h3>Service: {serviceId}</h3>
      <button onClick={handleRestart} disabled={!isConnected}>
        Restart Service
      </button>
    </div>
  );
}

./apps/status-dashboard/src/components/ServiceStatusDisplay.tsx

import React, { useState, useEffect } from 'react';
import { useEventBus } from '@my-org/ui-hooks';
import { IMessage } from '@stomp/stompjs';

// 初始状态可以从 API 加载
const initialStatus = 'LOADING';

export function ServiceStatusDisplay({ serviceId }: { serviceId: string }) {
  const [status, setStatus] = useState(initialStatus);
  const { subscribe } = useEventBus();

  useEffect(() => {
    // 订阅与特定服务相关的状态更新
    const topic = `/topic/state.updates.service.${serviceId}`;

    // 假设后端 StateService 在物化视图更新后,会向这个更具体的主题发布更新消息
    const unsubscribe = subscribe(topic, (message: IMessage) => {
      try {
        const update = JSON.parse(message.body);
        setStatus(update.status);
      } catch (e) {
        console.error('Failed to parse status update', e);
      }
    });

    // 组件卸载时取消订阅
    return () => unsubscribe();
  }, [serviceId, subscribe]);

  return (
    <div>
      <span>Service {serviceId} Status:</span>
      <strong>{status}</strong>
    </div>
  );
}

至此,我们完成了一个闭环。用户在 ControlPanel 点击按钮,事件通过 WebSocket 发送到 ActiveMQ,被 PersistenceServiceStateService 处理,StateService 更新数据库后可能还会向另一个主题发布状态变更消息,最终被 StatusDashboard 接收并更新UI。整个过程是异步、解耦且持久化的。

方案的局限性与未来展望

此架构并非银弹。它引入的复杂度和延迟远高于传统的前端状态管理方案。对于需要毫秒级响应的UI交互(如拖拽、实时输入校验),这套机制完全不适用。它的价值在于处理那些代表重要业务状态变迁的、低频但高价值的用户操作。

当前实现中的 WebSocket 网关是一个潜在的单点故障和性能瓶颈。在生产环境中,它需要被设计成一个可水平扩展的集群,并使用负载均衡器分发连接。此外,事件 Schema 的管理至关重要,随着系统演进,需要引入类似 Schema Registry 的机制来确保向后兼容性,避免因事件格式变更导致的服务中断。

未来的一个优化方向是探索更细粒度的消息路由。例如,可以让前端直接订阅与自身相关的特定资源主题(如 /topic/state.updates/service/abc-123),而不是订阅一个宽泛的主题然后在客户端进行过滤。这需要网关具备更智能的权限校验和主题映射能力,但可以显著降低不必要的网络流量和前端处理负载。


  目录