在 EKS 上利用 RabbitMQ 作为通信层的 Paxos 分布式锁实现


团队在 AWS EKS 上运行的微服务集群规模扩大后,一个老问题再次浮出水面:分布式锁。之前依赖 Redis 的 SETNX 做一些简单的锁,但在一些对一致性要求更高的场景,比如关键任务调度、资源竞态分配,这种方案的可靠性不足以让我们安心。引入 ZooKeeper 或 etcd 是标准答案,但为了一个锁服务而引入一整套重量级的组件,其运维成本和技术栈复杂度都让我们犹豫。我们的核心技术栈是 Node.js 和 RabbitMQ,能不能基于现有组件构建一个足够“好”的分布式锁服务,成了这次技术探索的起点。

初步构想是实现一个高可用的复制状态机(Replicated State Machine),锁的状态就是这个状态机需要维护的数据。要保证状态机在多个节点间的一致性,就需要一个共识算法。我们没有选择相对容易理解的 Raft,而是决定挑战 Paxos。原因很简单:一方面是为了深入理解这个共识算法的鼻祖,另一方面,Paxos 对网络模型的假设更弱,不强制要求一个稳定的 Leader,这给了我们更大的架构灵活性。

一个非主流但有趣的决策是,我们决定不采用节点间直接 RPC 通信的方式,而是使用 RabbitMQ 作为 Paxos 协议的消息总线。这个选择的好处是明显的:

  1. 解耦: Paxos 节点(我们称之为 Acceptor)无需知道其他节点具体的网络地址,只需订阅和发布到 RabbitMQ 的指定 Exchange 即可。这简化了在 EKS 这种动态环境中的服务发现。
  2. 缓冲与重试: RabbitMQ 提供了消息持久化和缓冲能力,可以应对节点短暂的不可用,提升了系统的整体韧性。
  3. 运维熟悉度: 团队已经有成熟的 RabbitMQ 集群运维经验。

当然,这个选择也带来了挑战:消息延迟会比直接的 gRPC 调用高得多,并且 RabbitMQ 默认不保证消息的绝对顺序,这对共识算法的实现提出了更高的要求。幸运的是,Paxos 算法本身的设计,通过提案编号 n,天然地能够处理消息的乱序、重复和丢失。

最终的技术方案定型为:

  • 共识核心: 基于 Node.js 实现的 Classic Paxos 算法。
  • 服务节点: 使用 Express.js 封装 Paxos 实例,对外提供 RESTful API 用于获取和释放锁。
  • 通信层: RabbitMQ 的 Topic Exchange,用于广播 Paxos 协议消息。
  • 部署环境: AWS EKS,使用 StatefulSet 保证每个服务节点有稳定的网络标识。

第一步:Paxos 核心逻辑的 TypeScript 实现

首先要解决的是核心的 Paxos 算法。我们将其封装在一个 PaxosInstance 类中,这个类同时扮演 Proposer、Acceptor 和 Learner 的角色。一个实例代表对某个特定锁资源的一次共识过程。

在真实项目中,一个节点会同时处理多个 key 的共识,但为了清晰,我们先简化为单个 key 的实例。

// src/paxos/PaxosInstance.ts

import { EventEmitter } from 'events';

// 定义 Paxos 消息类型
export type PaxosMessage = {
  type: 'PREPARE';
  proposalNumber: number;
} | {
  type: 'PROMISE';
  proposalNumber: number;
  acceptedProposal?: { number: number; value: any };
  from: string; // Acceptor ID
} | {
  type: 'PROPOSE';
  proposalNumber: number;
  value: any;
} | {
  type: 'ACCEPTED';
  proposalNumber: number;
  value: any;
  from: string; // Acceptor ID
};

// 定义节点角色状态
interface AcceptorState {
  promisedProposalNumber: number;
  acceptedProposal?: { number: number; value: any };
}

interface ProposerState {
  // 当前正在进行的提案号
  currentProposalNumber: number;
  // 提议的值
  proposedValue: any;
  // 收到的 Promise 响应
  promises: PaxosMessage[];
  // 收到的 Accepted 响应
  acceptances: PaxosMessage[];
}

export class PaxosInstance extends EventEmitter {
  private readonly nodeId: string;
  private readonly quorumSize: number;

  // 作为 Acceptor 的状态
  private acceptorState: AcceptorState = {
    promisedProposalNumber: -1,
  };

  // 作为 Proposer 的状态
  private proposerState?: ProposerState;

  // 作为 Learner 的状态
  private learnedValue: any | null = null;
  private isConsensusReached: boolean = false;

  /**
   * @param nodeId 当前节点的唯一标识,例如 pod 名称 'lock-service-0'
   * @param peerCount 集群中节点总数
   */
  constructor(nodeId: string, peerCount: number) {
    super();
    this.nodeId = nodeId;
    // 计算法定人数(Quorum)
    this.quorumSize = Math.floor(peerCount / 2) + 1;
    console.log(`[${this.nodeId}] Paxos instance created. Quorum size: ${this.quorumSize}`);
  }

  /**
   * Proposer: 发起一个提案
   * @param value 要达成共识的值 (例如: { action: 'ACQUIRE', lockId: 'res-1', owner: 'client-abc' })
   */
  public startProposal(value: any): void {
    if (this.isConsensusReached) {
      console.warn(`[${this.nodeId}] Consensus already reached. Ignoring new proposal.`);
      return;
    }

    // 初始化 Proposer 状态
    const proposalNumber = this.generateProposalNumber();
    this.proposerState = {
      currentProposalNumber: proposalNumber,
      proposedValue: value,
      promises: [],
      acceptances: [],
    };

    console.log(`[${this.nodeId}] Starting proposal #${proposalNumber} with value:`, value);

    const prepareMessage: PaxosMessage = {
      type: 'PREPARE',
      proposalNumber: proposalNumber,
    };

    // 通过事件发射器将消息交由外部的通信模块广播
    this.emit('broadcast', prepareMessage);
  }

  /**
   * 核心消息处理逻辑,所有角色都通过这个入口
   * @param message 从消息队列收到的消息
   */
  public handleMessage(message: PaxosMessage): void {
    if (this.isConsensusReached) {
      return; // 共识已达成,不再处理消息
    }

    switch (message.type) {
      case 'PREPARE':
        this.handlePrepare(message);
        break;
      case 'PROMISE':
        this.handlePromise(message);
        break;
      case 'PROPOSE':
        this.handlePropose(message);
        break;
      case 'ACCEPTED':
        this.handleAccepted(message);
        break;
    }
  }

  /**
   * Acceptor: 处理 Prepare 请求
   */
  private handlePrepare(message: PaxosMessage & { type: 'PREPARE' }): void {
    if (message.proposalNumber > this.acceptorState.promisedProposalNumber) {
      console.log(`[${this.nodeId}] Promising for proposal #${message.proposalNumber} (previously promised #${this.acceptorState.promisedProposalNumber})`);
      this.acceptorState.promisedProposalNumber = message.proposalNumber;

      const promiseMessage: PaxosMessage = {
        type: 'PROMISE',
        proposalNumber: message.proposalNumber,
        acceptedProposal: this.acceptorState.acceptedProposal,
        from: this.nodeId,
      };
      // 回复 Promise 给 Proposer,这里也是通过事件发射器
      this.emit('send', promiseMessage);
    } else {
       console.log(`[${this.nodeId}] Rejecting proposal #${message.proposalNumber} (already promised #${this.acceptorState.promisedProposalNumber})`);
    }
  }

  /**
   * Proposer: 处理 Promise 响应
   */
  private handlePromise(message: PaxosMessage & { type: 'PROMISE' }): void {
    if (!this.proposerState || message.proposalNumber !== this.proposerState.currentProposalNumber) {
      return; // 忽略不属于当前提案的 Promise
    }
    
    this.proposerState.promises.push(message);
    console.log(`[${this.nodeId}] Received promise from ${message.from} for proposal #${message.proposalNumber}. Total promises: ${this.proposerState.promises.length}`);


    if (this.proposerState.promises.length === this.quorumSize) {
      console.log(`[${this.nodeId}] Quorum of promises reached for proposal #${this.proposerState.currentProposalNumber}.`);
      let highestAcceptedProposal: { number: number; value: any } | undefined = undefined;

      // 检查收到的 Promise 中是否包含已接受的提案
      this.proposerState.promises.forEach(p => {
        const promise = p as PaxosMessage & { type: 'PROMISE' };
        if (promise.acceptedProposal) {
          if (!highestAcceptedProposal || promise.acceptedProposal.number > highestAcceptedProposal.number) {
            highestAcceptedProposal = promise.acceptedProposal;
          }
        }
      });
      
      let valueToPropose = this.proposerState.proposedValue;
      if (highestAcceptedProposal) {
        console.log(`[${this.nodeId}] A higher-numbered proposal (#${highestAcceptedProposal.number}) was already accepted. Proposing its value instead.`);
        valueToPropose = highestAcceptedProposal.value;
      }
      
      const proposeMessage: PaxosMessage = {
        type: 'PROPOSE',
        proposalNumber: this.proposerState.currentProposalNumber,
        value: valueToPropose,
      };
      
      this.emit('broadcast', proposeMessage);
    }
  }

  /**
   * Acceptor: 处理 Propose 请求
   */
  private handlePropose(message: PaxosMessage & { type: 'PROPOSE' }): void {
    if (message.proposalNumber >= this.acceptorState.promisedProposalNumber) {
      console.log(`[${this.nodeId}] Accepting proposal #${message.proposalNumber}.`);
      this.acceptorState.promisedProposalNumber = message.proposalNumber;
      this.acceptorState.acceptedProposal = {
        number: message.proposalNumber,
        value: message.value,
      };

      const acceptedMessage: PaxosMessage = {
        type: 'ACCEPTED',
        proposalNumber: message.proposalNumber,
        value: message.value,
        from: this.nodeId
      };
      
      this.emit('broadcast', acceptedMessage);
    } else {
      console.log(`[${this.nodeId}] Rejecting propose #${message.proposalNumber} (already promised #${this.acceptorState.promisedProposalNumber})`);
    }
  }

  /**
   * Learner & Proposer: 处理 Accepted 消息
   */
  private handleAccepted(message: PaxosMessage & { type: 'ACCEPTED' }): void {
    // Proposer 角色统计
    if (this.proposerState && message.proposalNumber === this.proposerState.currentProposalNumber) {
        this.proposerState.acceptances.push(message);
        console.log(`[${this.nodeId}] Received acceptance from ${message.from} for proposal #${message.proposalNumber}. Total acceptances: ${this.proposerState.acceptances.length}`);

        if (this.proposerState.acceptances.length === this.quorumSize) {
            console.log(`[${this.nodeId}] Quorum of acceptances reached for proposal #${message.proposalNumber}. Consensus achieved!`);
            this.learnValue(message.value);
        }
    }
    
    // Learner 角色学习
    // 在一个简化实现中,Proposer 达到法定人数后自己学习即可。
    // 在更完备的实现中,Learner 需要独立监听 Accepted 消息并统计。
    // 这里我们允许任何节点在收到一个 Accepted 后,如果还未学习到值,就暂存它。
    // 但真正的“共识达成”是由 Proposer 确认的。
  }
  
  private learnValue(value: any) {
    if (!this.isConsensusReached) {
        this.learnedValue = value;
        this.isConsensusReached = true;
        console.log(`[${this.nodeId}] Learned value:`, value);
        this.emit('consensus', value); // 发出共识达成事件
        this.proposerState = undefined; // 清理状态
    }
  }
  
  /**
   * 生成一个唯一的、递增的提案号。
   * 在真实分布式系统中,这需要更鲁棒的机制,例如结合时间戳和节点ID。
   * 这里简化为使用时间戳+随机数。
   */
  private generateProposalNumber(): number {
    // (时间戳 << 16) | (节点ID的哈希) 是一种常见做法
    // 这里为了简单,我们用高精度时间戳
    return Date.now() + Math.random();
  }
}

这里的核心设计是 PaxosInstance 类通过 EventEmitter 与外部通信,它不关心消息是怎样发送的(HTTP, gRPC, or RabbitMQ),只负责在正确的时机 emit('broadcast', message)emit('send', message)。这使得核心逻辑和网络通信彻底分离。

第二步:集成 RabbitMQ 作为消息总线

接下来是棘手的部分:如何用 RabbitMQ 来承载 Paxos 协议。我们需要一个广播机制(用于 Prepare, Propose, Accepted)和一个点对点通信机制(用于 Promise)。Topic Exchange 是实现广播的最佳选择。

// src/services/MessageBroker.ts

import amqp, { Connection, Channel } from 'amqplib';

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://user:password@localhost:5672';
const EXCHANGE_NAME = 'paxos_locks';

export class MessageBroker {
  private connection!: Connection;
  private channel!: Channel;
  private readonly nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
  }

  public async connect(): Promise<void> {
    try {
      this.connection = await amqp.connect(RABBITMQ_URL);
      this.channel = await this.connection.createChannel();

      // 声明一个 topic exchange 用于广播
      await this.channel.assertExchange(EXCHANGE_NAME, 'topic', { durable: false });
      
      console.log(`[${this.nodeId}] RabbitMQ connected and exchange '${EXCHANGE_NAME}' is ready.`);
    } catch (error) {
      console.error(`[${this.nodeId}] Failed to connect to RabbitMQ:`, error);
      // 在生产环境中,这里应该有重连逻辑
      process.exit(1);
    }
  }

  /**
   * 监听来自其他节点的消息
   * @param onMessageCallback 收到消息后的回调函数
   */
  public async subscribe(onMessageCallback: (message: any) => void): Promise<void> {
    // 每个节点都创建一个唯一的、非持久的队列来接收消息
    const { queue } = await this.channel.assertQueue('', { exclusive: true });
    
    // 绑定队列到 exchange,接收所有广播消息
    // routingKey '#' 意味着接收所有消息
    this.channel.bindQueue(queue, EXCHANGE_NAME, '#');

    this.channel.consume(queue, (msg) => {
      if (msg?.content) {
        try {
          const content = JSON.parse(msg.content.toString());
          onMessageCallback(content);
        } catch (e) {
          console.error(`[${this.nodeId}] Error parsing message:`, e);
        }
      }
    }, { noAck: true });

    console.log(`[${this.nodeId}] Subscribed to exchange '${EXCHANGE_NAME}'. Waiting for messages.`);
  }

  /**
   * 广播消息给所有节点
   * @param message PaxosMessage 对象
   */
  public broadcast(message: any): void {
    // 使用一个通用的 routing key 来广播
    const routingKey = 'paxos.broadcast';
    this.channel.publish(EXCHANGE_NAME, routingKey, Buffer.from(JSON.stringify(message)));
  }
  
  // 在这个模型中,我们简化了设计,所有消息都通过广播发送。
  // Promise 消息也广播,但消息体内会包含 from 字段,
  // 只有原始的 Proposer 会处理这个 Promise。
  // 这样做简化了队列管理,代价是网络流量略有增加,对于节点不多的场景是可接受的。
  public send(message: any): void {
    this.broadcast(message);
  }

  public async close(): Promise<void> {
    await this.channel.close();
    await this.connection.close();
  }
}

然后,我们将 PaxosInstanceMessageBroker 粘合起来:

// In the main application file, e.g., src/server.ts

const PEER_COUNT = parseInt(process.env.PEER_COUNT || '3', 10);
const NODE_ID = process.env.HOSTNAME || 'local-node'; // HOSTNAME is set by StatefulSet

const broker = new MessageBroker(NODE_ID);
const paxos = new PaxosInstance(NODE_ID, PEER_COUNT);

broker.connect().then(() => {
  broker.subscribe((message) => {
    paxos.handleMessage(message);
  });
});

paxos.on('broadcast', (message) => {
  broker.broadcast(message);
});

paxos.on('send', (message) => {
  broker.send(message); // Using broadcast for simplicity
});

第三步:Express.js 接口层

现在,我们需要一个 HTTP 接口来触发和查询锁的状态。Express.js 是实现这个轻量级 API 服务的理想选择。

// src/api/server.ts
import express from 'express';
import { PaxosInstance } from '../paxos/PaxosInstance';

// 这是一个简化的内存锁状态管理器
const lockState = new Map<string, { owner: string; expires: number }>();

export function createApiServer(paxos: PaxosInstance) {
  const app = express();
  app.use(express.json());

  // 尝试获取锁
  app.post('/locks/:lockId', (req, res) => {
    const { lockId } = req.params;
    const { owner, ttl } = req.body;

    if (!owner || !ttl) {
      return res.status(400).json({ error: 'owner and ttl are required' });
    }

    if (lockState.has(lockId) && lockState.get(lockId)!.expires > Date.now()) {
      return res.status(409).json({ error: 'Lock already held' });
    }

    // 提议的值,包含了锁的完整信息
    const proposalValue = {
      action: 'ACQUIRE',
      lockId,
      owner,
      expires: Date.now() + ttl * 1000,
    };

    // 监听共识结果
    const consensusListener = (learnedValue: any) => {
      // 确保是本次请求的共识结果
      if (
        learnedValue.action === 'ACQUIRE' &&
        learnedValue.lockId === lockId &&
        learnedValue.owner === owner
      ) {
        console.log(`[API] Consensus reached for lock ${lockId}, owner ${owner}.`);
        lockState.set(lockId, { owner: learnedValue.owner, expires: learnedValue.expires });
        res.status(200).json({ status: 'acquired', details: learnedValue });
      } else {
        // 其他 Proposer 赢得了这次共识
        res.status(409).json({ error: 'Consensus lost to another proposal', winner: learnedValue });
      }
      paxos.removeListener('consensus', consensusListener);
    };

    // 设置一个超时,防止请求永远挂起
    const timeout = setTimeout(() => {
      paxos.removeListener('consensus', consensusListener);
      if (!res.headersSent) {
          res.status(504).json({ error: 'Consensus timeout' });
      }
    }, 10000); // 10秒超时

    paxos.once('consensus', (v) => {
        clearTimeout(timeout);
        consensusListener(v);
    });

    paxos.startProposal(proposalValue);
  });

  // 获取锁状态
  app.get('/locks/:lockId', (req, res) => {
    const { lockId } = req.params;
    const lock = lockState.get(lockId);

    if (lock && lock.expires > Date.now()) {
      res.status(200).json(lock);
    } else {
      lockState.delete(lockId);
      res.status(404).json({ error: 'Lock not found or expired' });
    }
  });

  return app;
}

这个 API 的设计关键在于 POST /locks/:lockId 是异步的。客户端发起请求后,服务器立即启动 Paxos 提议流程,并监听 consensus 事件。一旦共识达成,服务器才向客户端返回成功或失败。

第四步:容器化与 EKS 部署

部署到 Kubernetes,特别是 EKS,是最后但至关重要的一步。因为 Paxos 节点需要稳定的身份和彼此发现的能力,StatefulSet 是不二之选。

Dockerfile:

FROM node:18-alpine

WORKDIR /usr/src/app

COPY package*.json ./
RUN npm install

COPY . .
RUN npm run build

EXPOSE 3000

# HOSTNAME 将由 Kubernetes 自动注入,作为我们的节点ID
CMD ["node", "dist/main.js"]

Kubernetes Manifests (lock-service.yaml):
我们需要一个 Headless Service 来为 StatefulSet 的每个 Pod 提供一个唯一的、可预测的 DNS 名称(例如 lock-service-0.lock-service-headless.default.svc.cluster.local)。

apiVersion: v1
kind: Service
metadata:
  name: lock-service-headless
  labels:
    app: lock-service
spec:
  ports:
  - port: 3000
    name: api
  clusterIP: None # 关键:这让它成为一个 Headless Service
  selector:
    app: lock-service
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: lock-service
spec:
  serviceName: "lock-service-headless"
  replicas: 3 # 必须是奇数个副本
  selector:
    matchLabels:
      app: lock-service
  template:
    metadata:
      labels:
        app: lock-service
    spec:
      terminationGracePeriodSeconds: 10
      containers:
      - name: lock-service
        image: your-account-id.dkr.ecr.your-region.amazonaws.com/lock-service:latest
        ports:
        - containerPort: 3000
          name: api
        env:
        # HOSTNAME 环境变量由 K8s 自动为 pod 设置,
        # 格式为 statefulset-name-ordinal, e.g., lock-service-0
        - name: HOSTNAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: PEER_COUNT
          value: "3" # 与 replicas 数量保持一致
        - name: RABBITMQ_URL
          value: "amqp://user:[email protected]:5672"
        resources:
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "200m"
            memory: "256Mi"

这个配置的几个关键点:

  • StatefulSet: 确保 Pods 有稳定的网络标识(lock-service-0, lock-service-1, …)和持久存储(如果需要的话)。我们的 HOSTNAME 环境变量就利用了这个特性作为节点 ID。
  • Headless Service: 配合 StatefulSet,为服务发现提供了基础。虽然在这个 RabbitMQ 的实现里我们没有直接用到 Pod 间的 DNS,但这是 StatefulSet 的标准实践。
  • PEER_COUNT: 作为一个环境变量传入,让 Paxos 实例知道法定人数是多少。这个值必须与 replicas 数量一致。

部署流程很简单:先部署 RabbitMQ(可以使用 Bitnami Helm chart),然后构建 Docker 镜像推送到 ECR,最后 kubectl apply -f lock-service.yaml

部署后的架构图如下:

graph TD
    subgraph AWS EKS Cluster
        subgraph lock-service StatefulSet
            Pod0["Pod: lock-service-0\n(Express.js + Paxos)"]
            Pod1["Pod: lock-service-1\n(Express.js + Paxos)"]
            Pod2["Pod: lock-service-2\n(Express.js + Paxos)"]
        end

        subgraph RabbitMQ Cluster
            RMQ("RabbitMQ Topic Exchange\n'paxos_locks'")
        end

        Pod0 -- "Publish (Prepare)" --> RMQ
        Pod1 -- "Publish (Prepare)" --> RMQ
        Pod2 -- "Publish (Prepare)" --> RMQ

        RMQ -- "Consume (Broadcast)" --> Pod0
        RMQ -- "Consume (Broadcast)" --> Pod1
        RMQ -- "Consume (Broadcast)" --> Pod2
        
        K8sService[("K8s Service\n(Load Balancer)")]
    end

    Client[("Client Application")] -- "HTTP POST /locks/res-1" --> K8sService
    K8sService --> Pod0

当一个客户端请求到达 lock-service-0 时,lock-service-0 作为 Proposer 发起 PREPARE 消息。这个消息通过 RabbitMQ 广播给包括自己在内的所有三个节点。所有节点作为 Acceptor 回复 PROMISE(同样通过广播)。lock-service-0 收集到足够多的 PROMISE 后,再广播 PROPOSE,最终达成共识。

遗留问题与未来迭代

这个实现成功地验证了我们的设想,但距离一个生产级的服务还有距离。

  1. 性能瓶颈: RabbitMQ 带来的延迟是最大的问题。对于需要低延迟锁的场景,这个架构并不合适。gRPC 或其他二进制 RPC 框架会是更好的选择。
  2. 活锁 (Livelock) 风险: Classic Paxos 在两个 Proposer 持续交替发起更高编号提案的极端情况下,可能导致活锁,即永远无法达成共识。Multi-Paxos 通过选举一个稳定的 Leader 作为唯一的 Proposer 来解决这个问题,这是后续优化的第一步。
  3. 日志与快照: 当前实现是内存态的,节点重启后状态会丢失。真正的复制状态机需要持久化 Paxos 日志。当日志过长时,还需要引入快照机制来压缩状态,否则节点重启恢复会非常缓慢。
  4. 成员变更: 目前的集群成员是静态配置的。一个生产系统需要支持动态地增加或减少节点,这需要实现 Paxos 的成员变更算法,复杂度很高。

尽管存在这些局限,这次实践的价值在于,它迫使我们深入 Paxos 的每一个细节,并探索了在云原生环境下,利用消息队列实现共识算法这一非典型但有趣的技术路径。它证明了即使是复杂的分布式共识,也可以用我们熟悉的工具栈,通过巧妙的架构设计来“拼装”出来,这本身就是一种工程上的胜利。


  目录