团队在 AWS EKS 上运行的微服务集群规模扩大后,一个老问题再次浮出水面:分布式锁。之前依赖 Redis 的 SETNX
做一些简单的锁,但在一些对一致性要求更高的场景,比如关键任务调度、资源竞态分配,这种方案的可靠性不足以让我们安心。引入 ZooKeeper 或 etcd 是标准答案,但为了一个锁服务而引入一整套重量级的组件,其运维成本和技术栈复杂度都让我们犹豫。我们的核心技术栈是 Node.js 和 RabbitMQ,能不能基于现有组件构建一个足够“好”的分布式锁服务,成了这次技术探索的起点。
初步构想是实现一个高可用的复制状态机(Replicated State Machine),锁的状态就是这个状态机需要维护的数据。要保证状态机在多个节点间的一致性,就需要一个共识算法。我们没有选择相对容易理解的 Raft,而是决定挑战 Paxos。原因很简单:一方面是为了深入理解这个共识算法的鼻祖,另一方面,Paxos 对网络模型的假设更弱,不强制要求一个稳定的 Leader,这给了我们更大的架构灵活性。
一个非主流但有趣的决策是,我们决定不采用节点间直接 RPC 通信的方式,而是使用 RabbitMQ 作为 Paxos 协议的消息总线。这个选择的好处是明显的:
- 解耦: Paxos 节点(我们称之为 Acceptor)无需知道其他节点具体的网络地址,只需订阅和发布到 RabbitMQ 的指定 Exchange 即可。这简化了在 EKS 这种动态环境中的服务发现。
- 缓冲与重试: RabbitMQ 提供了消息持久化和缓冲能力,可以应对节点短暂的不可用,提升了系统的整体韧性。
- 运维熟悉度: 团队已经有成熟的 RabbitMQ 集群运维经验。
当然,这个选择也带来了挑战:消息延迟会比直接的 gRPC 调用高得多,并且 RabbitMQ 默认不保证消息的绝对顺序,这对共识算法的实现提出了更高的要求。幸运的是,Paxos 算法本身的设计,通过提案编号 n
,天然地能够处理消息的乱序、重复和丢失。
最终的技术方案定型为:
- 共识核心: 基于 Node.js 实现的 Classic Paxos 算法。
- 服务节点: 使用 Express.js 封装 Paxos 实例,对外提供 RESTful API 用于获取和释放锁。
- 通信层: RabbitMQ 的 Topic Exchange,用于广播 Paxos 协议消息。
- 部署环境: AWS EKS,使用 StatefulSet 保证每个服务节点有稳定的网络标识。
第一步:Paxos 核心逻辑的 TypeScript 实现
首先要解决的是核心的 Paxos 算法。我们将其封装在一个 PaxosInstance
类中,这个类同时扮演 Proposer、Acceptor 和 Learner 的角色。一个实例代表对某个特定锁资源的一次共识过程。
在真实项目中,一个节点会同时处理多个 key 的共识,但为了清晰,我们先简化为单个 key 的实例。
// src/paxos/PaxosInstance.ts
import { EventEmitter } from 'events';
// 定义 Paxos 消息类型
export type PaxosMessage = {
type: 'PREPARE';
proposalNumber: number;
} | {
type: 'PROMISE';
proposalNumber: number;
acceptedProposal?: { number: number; value: any };
from: string; // Acceptor ID
} | {
type: 'PROPOSE';
proposalNumber: number;
value: any;
} | {
type: 'ACCEPTED';
proposalNumber: number;
value: any;
from: string; // Acceptor ID
};
// 定义节点角色状态
interface AcceptorState {
promisedProposalNumber: number;
acceptedProposal?: { number: number; value: any };
}
interface ProposerState {
// 当前正在进行的提案号
currentProposalNumber: number;
// 提议的值
proposedValue: any;
// 收到的 Promise 响应
promises: PaxosMessage[];
// 收到的 Accepted 响应
acceptances: PaxosMessage[];
}
export class PaxosInstance extends EventEmitter {
private readonly nodeId: string;
private readonly quorumSize: number;
// 作为 Acceptor 的状态
private acceptorState: AcceptorState = {
promisedProposalNumber: -1,
};
// 作为 Proposer 的状态
private proposerState?: ProposerState;
// 作为 Learner 的状态
private learnedValue: any | null = null;
private isConsensusReached: boolean = false;
/**
* @param nodeId 当前节点的唯一标识,例如 pod 名称 'lock-service-0'
* @param peerCount 集群中节点总数
*/
constructor(nodeId: string, peerCount: number) {
super();
this.nodeId = nodeId;
// 计算法定人数(Quorum)
this.quorumSize = Math.floor(peerCount / 2) + 1;
console.log(`[${this.nodeId}] Paxos instance created. Quorum size: ${this.quorumSize}`);
}
/**
* Proposer: 发起一个提案
* @param value 要达成共识的值 (例如: { action: 'ACQUIRE', lockId: 'res-1', owner: 'client-abc' })
*/
public startProposal(value: any): void {
if (this.isConsensusReached) {
console.warn(`[${this.nodeId}] Consensus already reached. Ignoring new proposal.`);
return;
}
// 初始化 Proposer 状态
const proposalNumber = this.generateProposalNumber();
this.proposerState = {
currentProposalNumber: proposalNumber,
proposedValue: value,
promises: [],
acceptances: [],
};
console.log(`[${this.nodeId}] Starting proposal #${proposalNumber} with value:`, value);
const prepareMessage: PaxosMessage = {
type: 'PREPARE',
proposalNumber: proposalNumber,
};
// 通过事件发射器将消息交由外部的通信模块广播
this.emit('broadcast', prepareMessage);
}
/**
* 核心消息处理逻辑,所有角色都通过这个入口
* @param message 从消息队列收到的消息
*/
public handleMessage(message: PaxosMessage): void {
if (this.isConsensusReached) {
return; // 共识已达成,不再处理消息
}
switch (message.type) {
case 'PREPARE':
this.handlePrepare(message);
break;
case 'PROMISE':
this.handlePromise(message);
break;
case 'PROPOSE':
this.handlePropose(message);
break;
case 'ACCEPTED':
this.handleAccepted(message);
break;
}
}
/**
* Acceptor: 处理 Prepare 请求
*/
private handlePrepare(message: PaxosMessage & { type: 'PREPARE' }): void {
if (message.proposalNumber > this.acceptorState.promisedProposalNumber) {
console.log(`[${this.nodeId}] Promising for proposal #${message.proposalNumber} (previously promised #${this.acceptorState.promisedProposalNumber})`);
this.acceptorState.promisedProposalNumber = message.proposalNumber;
const promiseMessage: PaxosMessage = {
type: 'PROMISE',
proposalNumber: message.proposalNumber,
acceptedProposal: this.acceptorState.acceptedProposal,
from: this.nodeId,
};
// 回复 Promise 给 Proposer,这里也是通过事件发射器
this.emit('send', promiseMessage);
} else {
console.log(`[${this.nodeId}] Rejecting proposal #${message.proposalNumber} (already promised #${this.acceptorState.promisedProposalNumber})`);
}
}
/**
* Proposer: 处理 Promise 响应
*/
private handlePromise(message: PaxosMessage & { type: 'PROMISE' }): void {
if (!this.proposerState || message.proposalNumber !== this.proposerState.currentProposalNumber) {
return; // 忽略不属于当前提案的 Promise
}
this.proposerState.promises.push(message);
console.log(`[${this.nodeId}] Received promise from ${message.from} for proposal #${message.proposalNumber}. Total promises: ${this.proposerState.promises.length}`);
if (this.proposerState.promises.length === this.quorumSize) {
console.log(`[${this.nodeId}] Quorum of promises reached for proposal #${this.proposerState.currentProposalNumber}.`);
let highestAcceptedProposal: { number: number; value: any } | undefined = undefined;
// 检查收到的 Promise 中是否包含已接受的提案
this.proposerState.promises.forEach(p => {
const promise = p as PaxosMessage & { type: 'PROMISE' };
if (promise.acceptedProposal) {
if (!highestAcceptedProposal || promise.acceptedProposal.number > highestAcceptedProposal.number) {
highestAcceptedProposal = promise.acceptedProposal;
}
}
});
let valueToPropose = this.proposerState.proposedValue;
if (highestAcceptedProposal) {
console.log(`[${this.nodeId}] A higher-numbered proposal (#${highestAcceptedProposal.number}) was already accepted. Proposing its value instead.`);
valueToPropose = highestAcceptedProposal.value;
}
const proposeMessage: PaxosMessage = {
type: 'PROPOSE',
proposalNumber: this.proposerState.currentProposalNumber,
value: valueToPropose,
};
this.emit('broadcast', proposeMessage);
}
}
/**
* Acceptor: 处理 Propose 请求
*/
private handlePropose(message: PaxosMessage & { type: 'PROPOSE' }): void {
if (message.proposalNumber >= this.acceptorState.promisedProposalNumber) {
console.log(`[${this.nodeId}] Accepting proposal #${message.proposalNumber}.`);
this.acceptorState.promisedProposalNumber = message.proposalNumber;
this.acceptorState.acceptedProposal = {
number: message.proposalNumber,
value: message.value,
};
const acceptedMessage: PaxosMessage = {
type: 'ACCEPTED',
proposalNumber: message.proposalNumber,
value: message.value,
from: this.nodeId
};
this.emit('broadcast', acceptedMessage);
} else {
console.log(`[${this.nodeId}] Rejecting propose #${message.proposalNumber} (already promised #${this.acceptorState.promisedProposalNumber})`);
}
}
/**
* Learner & Proposer: 处理 Accepted 消息
*/
private handleAccepted(message: PaxosMessage & { type: 'ACCEPTED' }): void {
// Proposer 角色统计
if (this.proposerState && message.proposalNumber === this.proposerState.currentProposalNumber) {
this.proposerState.acceptances.push(message);
console.log(`[${this.nodeId}] Received acceptance from ${message.from} for proposal #${message.proposalNumber}. Total acceptances: ${this.proposerState.acceptances.length}`);
if (this.proposerState.acceptances.length === this.quorumSize) {
console.log(`[${this.nodeId}] Quorum of acceptances reached for proposal #${message.proposalNumber}. Consensus achieved!`);
this.learnValue(message.value);
}
}
// Learner 角色学习
// 在一个简化实现中,Proposer 达到法定人数后自己学习即可。
// 在更完备的实现中,Learner 需要独立监听 Accepted 消息并统计。
// 这里我们允许任何节点在收到一个 Accepted 后,如果还未学习到值,就暂存它。
// 但真正的“共识达成”是由 Proposer 确认的。
}
private learnValue(value: any) {
if (!this.isConsensusReached) {
this.learnedValue = value;
this.isConsensusReached = true;
console.log(`[${this.nodeId}] Learned value:`, value);
this.emit('consensus', value); // 发出共识达成事件
this.proposerState = undefined; // 清理状态
}
}
/**
* 生成一个唯一的、递增的提案号。
* 在真实分布式系统中,这需要更鲁棒的机制,例如结合时间戳和节点ID。
* 这里简化为使用时间戳+随机数。
*/
private generateProposalNumber(): number {
// (时间戳 << 16) | (节点ID的哈希) 是一种常见做法
// 这里为了简单,我们用高精度时间戳
return Date.now() + Math.random();
}
}
这里的核心设计是 PaxosInstance
类通过 EventEmitter
与外部通信,它不关心消息是怎样发送的(HTTP, gRPC, or RabbitMQ),只负责在正确的时机 emit('broadcast', message)
或 emit('send', message)
。这使得核心逻辑和网络通信彻底分离。
第二步:集成 RabbitMQ 作为消息总线
接下来是棘手的部分:如何用 RabbitMQ 来承载 Paxos 协议。我们需要一个广播机制(用于 Prepare, Propose, Accepted)和一个点对点通信机制(用于 Promise)。Topic Exchange 是实现广播的最佳选择。
// src/services/MessageBroker.ts
import amqp, { Connection, Channel } from 'amqplib';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://user:password@localhost:5672';
const EXCHANGE_NAME = 'paxos_locks';
export class MessageBroker {
private connection!: Connection;
private channel!: Channel;
private readonly nodeId: string;
constructor(nodeId: string) {
this.nodeId = nodeId;
}
public async connect(): Promise<void> {
try {
this.connection = await amqp.connect(RABBITMQ_URL);
this.channel = await this.connection.createChannel();
// 声明一个 topic exchange 用于广播
await this.channel.assertExchange(EXCHANGE_NAME, 'topic', { durable: false });
console.log(`[${this.nodeId}] RabbitMQ connected and exchange '${EXCHANGE_NAME}' is ready.`);
} catch (error) {
console.error(`[${this.nodeId}] Failed to connect to RabbitMQ:`, error);
// 在生产环境中,这里应该有重连逻辑
process.exit(1);
}
}
/**
* 监听来自其他节点的消息
* @param onMessageCallback 收到消息后的回调函数
*/
public async subscribe(onMessageCallback: (message: any) => void): Promise<void> {
// 每个节点都创建一个唯一的、非持久的队列来接收消息
const { queue } = await this.channel.assertQueue('', { exclusive: true });
// 绑定队列到 exchange,接收所有广播消息
// routingKey '#' 意味着接收所有消息
this.channel.bindQueue(queue, EXCHANGE_NAME, '#');
this.channel.consume(queue, (msg) => {
if (msg?.content) {
try {
const content = JSON.parse(msg.content.toString());
onMessageCallback(content);
} catch (e) {
console.error(`[${this.nodeId}] Error parsing message:`, e);
}
}
}, { noAck: true });
console.log(`[${this.nodeId}] Subscribed to exchange '${EXCHANGE_NAME}'. Waiting for messages.`);
}
/**
* 广播消息给所有节点
* @param message PaxosMessage 对象
*/
public broadcast(message: any): void {
// 使用一个通用的 routing key 来广播
const routingKey = 'paxos.broadcast';
this.channel.publish(EXCHANGE_NAME, routingKey, Buffer.from(JSON.stringify(message)));
}
// 在这个模型中,我们简化了设计,所有消息都通过广播发送。
// Promise 消息也广播,但消息体内会包含 from 字段,
// 只有原始的 Proposer 会处理这个 Promise。
// 这样做简化了队列管理,代价是网络流量略有增加,对于节点不多的场景是可接受的。
public send(message: any): void {
this.broadcast(message);
}
public async close(): Promise<void> {
await this.channel.close();
await this.connection.close();
}
}
然后,我们将 PaxosInstance
和 MessageBroker
粘合起来:
// In the main application file, e.g., src/server.ts
const PEER_COUNT = parseInt(process.env.PEER_COUNT || '3', 10);
const NODE_ID = process.env.HOSTNAME || 'local-node'; // HOSTNAME is set by StatefulSet
const broker = new MessageBroker(NODE_ID);
const paxos = new PaxosInstance(NODE_ID, PEER_COUNT);
broker.connect().then(() => {
broker.subscribe((message) => {
paxos.handleMessage(message);
});
});
paxos.on('broadcast', (message) => {
broker.broadcast(message);
});
paxos.on('send', (message) => {
broker.send(message); // Using broadcast for simplicity
});
第三步:Express.js 接口层
现在,我们需要一个 HTTP 接口来触发和查询锁的状态。Express.js 是实现这个轻量级 API 服务的理想选择。
// src/api/server.ts
import express from 'express';
import { PaxosInstance } from '../paxos/PaxosInstance';
// 这是一个简化的内存锁状态管理器
const lockState = new Map<string, { owner: string; expires: number }>();
export function createApiServer(paxos: PaxosInstance) {
const app = express();
app.use(express.json());
// 尝试获取锁
app.post('/locks/:lockId', (req, res) => {
const { lockId } = req.params;
const { owner, ttl } = req.body;
if (!owner || !ttl) {
return res.status(400).json({ error: 'owner and ttl are required' });
}
if (lockState.has(lockId) && lockState.get(lockId)!.expires > Date.now()) {
return res.status(409).json({ error: 'Lock already held' });
}
// 提议的值,包含了锁的完整信息
const proposalValue = {
action: 'ACQUIRE',
lockId,
owner,
expires: Date.now() + ttl * 1000,
};
// 监听共识结果
const consensusListener = (learnedValue: any) => {
// 确保是本次请求的共识结果
if (
learnedValue.action === 'ACQUIRE' &&
learnedValue.lockId === lockId &&
learnedValue.owner === owner
) {
console.log(`[API] Consensus reached for lock ${lockId}, owner ${owner}.`);
lockState.set(lockId, { owner: learnedValue.owner, expires: learnedValue.expires });
res.status(200).json({ status: 'acquired', details: learnedValue });
} else {
// 其他 Proposer 赢得了这次共识
res.status(409).json({ error: 'Consensus lost to another proposal', winner: learnedValue });
}
paxos.removeListener('consensus', consensusListener);
};
// 设置一个超时,防止请求永远挂起
const timeout = setTimeout(() => {
paxos.removeListener('consensus', consensusListener);
if (!res.headersSent) {
res.status(504).json({ error: 'Consensus timeout' });
}
}, 10000); // 10秒超时
paxos.once('consensus', (v) => {
clearTimeout(timeout);
consensusListener(v);
});
paxos.startProposal(proposalValue);
});
// 获取锁状态
app.get('/locks/:lockId', (req, res) => {
const { lockId } = req.params;
const lock = lockState.get(lockId);
if (lock && lock.expires > Date.now()) {
res.status(200).json(lock);
} else {
lockState.delete(lockId);
res.status(404).json({ error: 'Lock not found or expired' });
}
});
return app;
}
这个 API 的设计关键在于 POST /locks/:lockId
是异步的。客户端发起请求后,服务器立即启动 Paxos 提议流程,并监听 consensus
事件。一旦共识达成,服务器才向客户端返回成功或失败。
第四步:容器化与 EKS 部署
部署到 Kubernetes,特别是 EKS,是最后但至关重要的一步。因为 Paxos 节点需要稳定的身份和彼此发现的能力,StatefulSet
是不二之选。
Dockerfile:
FROM node:18-alpine
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 3000
# HOSTNAME 将由 Kubernetes 自动注入,作为我们的节点ID
CMD ["node", "dist/main.js"]
Kubernetes Manifests (lock-service.yaml):
我们需要一个 Headless Service
来为 StatefulSet
的每个 Pod 提供一个唯一的、可预测的 DNS 名称(例如 lock-service-0.lock-service-headless.default.svc.cluster.local
)。
apiVersion: v1
kind: Service
metadata:
name: lock-service-headless
labels:
app: lock-service
spec:
ports:
- port: 3000
name: api
clusterIP: None # 关键:这让它成为一个 Headless Service
selector:
app: lock-service
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: lock-service
spec:
serviceName: "lock-service-headless"
replicas: 3 # 必须是奇数个副本
selector:
matchLabels:
app: lock-service
template:
metadata:
labels:
app: lock-service
spec:
terminationGracePeriodSeconds: 10
containers:
- name: lock-service
image: your-account-id.dkr.ecr.your-region.amazonaws.com/lock-service:latest
ports:
- containerPort: 3000
name: api
env:
# HOSTNAME 环境变量由 K8s 自动为 pod 设置,
# 格式为 statefulset-name-ordinal, e.g., lock-service-0
- name: HOSTNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: PEER_COUNT
value: "3" # 与 replicas 数量保持一致
- name: RABBITMQ_URL
value: "amqp://user:[email protected]:5672"
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "200m"
memory: "256Mi"
这个配置的几个关键点:
- StatefulSet: 确保 Pods 有稳定的网络标识(
lock-service-0
,lock-service-1
, …)和持久存储(如果需要的话)。我们的HOSTNAME
环境变量就利用了这个特性作为节点 ID。 - Headless Service: 配合
StatefulSet
,为服务发现提供了基础。虽然在这个 RabbitMQ 的实现里我们没有直接用到 Pod 间的 DNS,但这是StatefulSet
的标准实践。 - PEER_COUNT: 作为一个环境变量传入,让 Paxos 实例知道法定人数是多少。这个值必须与
replicas
数量一致。
部署流程很简单:先部署 RabbitMQ(可以使用 Bitnami Helm chart),然后构建 Docker 镜像推送到 ECR,最后 kubectl apply -f lock-service.yaml
。
部署后的架构图如下:
graph TD subgraph AWS EKS Cluster subgraph lock-service StatefulSet Pod0["Pod: lock-service-0\n(Express.js + Paxos)"] Pod1["Pod: lock-service-1\n(Express.js + Paxos)"] Pod2["Pod: lock-service-2\n(Express.js + Paxos)"] end subgraph RabbitMQ Cluster RMQ("RabbitMQ Topic Exchange\n'paxos_locks'") end Pod0 -- "Publish (Prepare)" --> RMQ Pod1 -- "Publish (Prepare)" --> RMQ Pod2 -- "Publish (Prepare)" --> RMQ RMQ -- "Consume (Broadcast)" --> Pod0 RMQ -- "Consume (Broadcast)" --> Pod1 RMQ -- "Consume (Broadcast)" --> Pod2 K8sService[("K8s Service\n(Load Balancer)")] end Client[("Client Application")] -- "HTTP POST /locks/res-1" --> K8sService K8sService --> Pod0
当一个客户端请求到达 lock-service-0
时,lock-service-0
作为 Proposer 发起 PREPARE
消息。这个消息通过 RabbitMQ 广播给包括自己在内的所有三个节点。所有节点作为 Acceptor 回复 PROMISE
(同样通过广播)。lock-service-0
收集到足够多的 PROMISE
后,再广播 PROPOSE
,最终达成共识。
遗留问题与未来迭代
这个实现成功地验证了我们的设想,但距离一个生产级的服务还有距离。
- 性能瓶颈: RabbitMQ 带来的延迟是最大的问题。对于需要低延迟锁的场景,这个架构并不合适。gRPC 或其他二进制 RPC 框架会是更好的选择。
- 活锁 (Livelock) 风险: Classic Paxos 在两个 Proposer 持续交替发起更高编号提案的极端情况下,可能导致活锁,即永远无法达成共识。Multi-Paxos 通过选举一个稳定的 Leader 作为唯一的 Proposer 来解决这个问题,这是后续优化的第一步。
- 日志与快照: 当前实现是内存态的,节点重启后状态会丢失。真正的复制状态机需要持久化 Paxos 日志。当日志过长时,还需要引入快照机制来压缩状态,否则节点重启恢复会非常缓慢。
- 成员变更: 目前的集群成员是静态配置的。一个生产系统需要支持动态地增加或减少节点,这需要实现 Paxos 的成员变更算法,复杂度很高。
尽管存在这些局限,这次实践的价值在于,它迫使我们深入 Paxos 的每一个细节,并探索了在云原生环境下,利用消息队列实现共识算法这一非典型但有趣的技术路径。它证明了即使是复杂的分布式共识,也可以用我们熟悉的工具栈,通过巧妙的架构设计来“拼装”出来,这本身就是一种工程上的胜利。