一个常见的 Elixir 架构场景是利用 GenServer
为每个 WebSocket 连接维护一个进程。这种模型提供了出色的隔离性,当一个连接的进程崩溃时, благодаря Supervisor
的存在,它可以被迅速重启,而不会影响到其他数万个并发连接。但这种隔离性和自愈能力也隐藏了一个微妙的数据一致性陷阱。
假设一个 WebSocket 进程(ChannelProcess
)的职责是接收来自客户端的消息,然后通过一个外部 HTTP API 将其转发出去,最后将 API 的响应回推给客户端。如果这个外部 API 暂时不可用,ChannelProcess
在尝试调用时可能会因为超时而崩溃。Supervisor 会尽职尽责地重启它,但那个正在处理中的、未能成功转发的消息呢?它随着进程的崩溃而永远消失了。这在需要保证消息至少一次送达(at-least-once delivery)的业务场景中是不可接受的。
方案权衡:内存重试与持久化死信队列
面对这个问题,最初的直觉可能是在 ChannelProcess
内部实现一个简单的内存重试机制。当 API 调用失败时,不立即让进程崩溃,而是将失败的消息暂存在进程状态(state)里,使用 Process.send_after/3
进行延时重试。
方案A:内存重试
- 优点: 实现简单,无外部依赖,对于短暂的网络抖动非常有效。
- 缺点:
- 数据易失性: 如果整个应用重启或节点宕机,所有进程状态中的待重试消息将全部丢失。
- 状态膨胀: 如果外部 API 长时间不可用,
ChannelProcess
的状态会持续增长,导致内存占用失控。 - 职责耦合: 消息的可靠传递逻辑与核心业务逻辑紧密耦合在同一个进程中,违反了单一职责原则。
- 背压失效: 大量消息积压在成千上万个独立的 Channel 进程中,系统缺乏一个全局视角来感知压力,无法实施有效的反压(Backpressure)策略。
方案B:外部持久化死信队列(Dead Letter Queue, DLQ)
这个方案的核心思想是将失败消息的处理职责从 ChannelProcess
中剥离出来。当 ChannelProcess
遇到可重试的失败时,它不再自己持有消息,而是将其发送到一个专用的、持久化的 DLQ 服务中。这个服务负责存储、调度和执行重试逻辑。
- 优点:
- 数据持久性: 消息被写入磁盘或数据库,即使应用重启也不会丢失。
- 责任分离:
ChannelProcess
只负责其核心业务,可靠性投递由 DLQ 子系统保证。 - 集中式控制: 所有失败消息集中管理,便于监控、告警和实施全局策略(如熔断、限流)。
- 反压机制: 当 DLQ 积压严重时,可以向上游(
ChannelProcess
)发出信号,暂停接收新消息,从而保护整个系统。
- 缺点:
- 架构复杂性增加: 需要引入新的组件,包括持久化存储和重试调度器。
- 性能开销: 消息的入队和出队涉及 I/O 操作,相比纯内存操作会有额外的延迟。
在生产环境中,数据的可靠性通常是首要考虑的。因此,方案 B 尽管更复杂,却是构建一个健壮系统的正确选择。我们将着手实现一个基于 Elixir/OTP 原语的、轻量级但功能完备的持久化 DLQ 系统。
架构设计与实现
我们的 DLQ 系统将由以下几个核心组件构成:
-
DLQ.Broker
: 一个GenServer
,作为系统的统一入口。它接收来自业务进程的“死信”,将其写入持久化存储,并通知调度器。 -
DLQ.Store
: 一个行为(Behaviour),定义了持久化存储的接口。我们将实现一个简单的基于文件系统的版本(DLQ.Store.File
),但架构上允许轻松替换为 Mnesia 或数据库实现。 -
DLQ.Scheduler
: 一个调度进程,定期检查存储中是否有待处理的消息,并为它们启动重试任务。 -
DLQ.RetrySupervisor
: 一个DynamicSupervisor
,为每个重试任务动态地创建RetryWorker
进程,以实现重试逻辑的隔离。 -
DLQ.RetryWorker
: 一个GenServer
,负责执行单个消息的重试逻辑,包含指数退避策略。
下面是这个架构的交互流程图:
sequenceDiagram participant ChannelProcess participant DLQ.Broker participant DLQ.Store participant DLQ.Scheduler participant DLQ.RetrySupervisor participant RetryWorker ChannelProcess->>+DLQ.Broker: enqueue({:http_request, ...}) DLQ.Broker->>+DLQ.Store: write(message) DLQ.Store-->>-DLQ.Broker: {:ok, message_id} DLQ.Broker-->>-ChannelProcess: {:ok, message_id} loop Periodic Check DLQ.Scheduler->>+DLQ.Store: fetch_retryable() DLQ.Store-->>-DLQ.Scheduler: [message1, message2] end DLQ.Scheduler->>+DLQ.RetrySupervisor: start_worker(message1) DLQ.RetrySupervisor->>+RetryWorker: start_link(message1) RetryWorker-->>-DLQ.RetrySupervisor: {:ok, pid} DLQ.RetrySupervisor-->>-DLQ.Scheduler: {:ok, pid} RetryWorker->>External API: Perform HTTP Request alt Request Successful RetryWorker->>+DLQ.Store: delete(message1.id) DLQ.Store-->>-RetryWorker: :ok Note over RetryWorker: Process terminates else Request Fails RetryWorker->>+DLQ.Store: update_retry_count(message1.id) DLQ.Store-->>-RetryWorker: :ok Note over RetryWorker: Process terminates after scheduling next retry (implicitly) end
1. 配置与应用启动
首先,定义应用的配置,这使得重试策略、存储路径等参数在生产环境中易于调整。
config/config.exs
:
import Config
config :my_app, MyApp.DLQ,
# 使用基于文件的存储模块
store_module: MyApp.DLQ.Store.File,
# 死信存储目录
store_path: "/var/data/dlq",
# 调度器轮询间隔(毫秒)
scheduler_interval_ms: 5_000,
# 最大重试次数
max_retries: 5,
# 初始重试延迟(毫秒)
initial_backoff_ms: 1_000,
# 指数退避的乘数因子
backoff_multiplier: 2
在应用启动时,我们需要将 DLQ 的核心组件添加到主应用的监督树中。
lib/my_app/application.ex
:
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# ... 其他服务
{MyApp.DLQ, name: MyApp.DLQ.Broker}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
lib/my_app/dlq.ex
:
defmodule MyApp.DLQ do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
# 从配置中读取 DLQ 设置
config = Application.get_env(:my_app, __MODULE__, [])
children = [
# 动态 감독자, 用于管理重试工作进程
{DynamicSupervisor, strategy: :one_for_one, name: MyApp.DLQ.RetrySupervisor},
# 存储模块,具体实现由配置决定
{config[:store_module], name: MyApp.DLQ.Store},
# 调度器进程
{MyApp.DLQ.Scheduler, name: MyApp.DLQ.Scheduler},
# Broker 进程,作为公共 API 入口
{MyApp.DLQ.Broker, name: MyApp.DLQ.Broker}
]
Supervisor.init(children, strategy: :one_for_one)
end
# 公共 API
def enqueue(payload) do
GenServer.call(MyApp.DLQ.Broker, {:enqueue, payload})
end
end
这里的监督树设计非常关键。我们将 RetrySupervisor
放在前面,确保在其他组件启动前,管理 worker 的基础已经就绪。
2. 持久化存储层 (DLQ.Store.File
)
为了让示例可独立运行,我们实现一个简单的文件存储。每个死信消息被序列化并存储为一个独立的文件。文件名包含时间戳和唯一ID,便于排序和查找。
lib/my_app/dlq/store/file.ex
:
defmodule MyApp.DLQ.Store.File do
use GenServer
require Logger
# --- Public API ---
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, :ok, name: name)
end
def write(message) do
GenServer.call(__MODULE__, {:write, message})
end
def fetch_retryable(limit) do
GenServer.call(__MODULE__, {:fetch_retryable, limit})
end
def update(message_id, attrs) do
GenServer.call(__MODULE__, {:update, message_id, attrs})
end
def delete(message_id) do
GenServer.cast(__MODULE__, {:delete, message_id})
end
# --- GenServer Callbacks ---
@impl true
def init(:ok) do
config = Application.get_env(:my_app, MyApp.DLQ, [])
path = config[:store_path]
File.mkdir_p!(path)
Logger.info("DLQ File Store initialized at path: #{path}")
{:ok, %{path: path, locked_files: MapSet.new()}}
end
@impl true
def handle_call({:write, message}, _from, state) do
# 生成一个基于时间的、大致有序的唯一ID
id = "#{DateTime.to_unix(DateTime.utc_now(), :nanosecond)}-#{Base.encode16(:crypto.strong_rand_bytes(8))}"
data = %{
id: id,
payload: message,
attempt: 0,
status: :pending,
# 初始时,下次重试时间就是现在
next_retry_at: DateTime.utc_now()
}
file_path = Path.join(state.path, id)
case :erlang.term_to_binary(data) |> File.write(file_path, [:binary, :write]) do
:ok -> {:reply, {:ok, data}, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@impl true
def handle_call({:fetch_retryable, limit}, _from, state) do
now = DateTime.utc_now()
files =
state.path
|> File.ls!()
|> Enum.reject(&MapSet.member?(state.locked_files, &1))
|> Enum.map(&Path.join(state.path, &1))
# 读取并反序列化所有未锁定的文件
|> Enum.map_reduce([], fn path, acc ->
case File.read(path) do
{:ok, binary} ->
try do
message = :erlang.binary_to_term(binary)
{message, acc}
rescue
_ -> {nil, acc} # 忽略损坏的文件
end
_ -> {nil, acc}
end
end)
|> elem(0)
|> Enum.filter(fn
%{status: :pending, next_retry_at: next_retry} -> DateTime.compare(next_retry, now) != :gt
_ -> false
end)
# 按下次重试时间排序,优先处理早期的
|> Enum.sort_by(& &1.next_retry_at, DateTime)
|> Enum.take(limit)
# 锁定将要处理的文件,防止被其他调度周期重复获取
locked_ids = Enum.map(files, & &1.id)
new_locked_files = Enum.reduce(locked_ids, state.locked_files, &MapSet.put(&2, &1))
{:reply, {:ok, files}, %{state | locked_files: new_locked_files}}
end
@impl true
def handle_call({:update, message_id, attrs}, _from, state) do
file_path = Path.join(state.path, message_id)
case File.read(file_path) do
{:ok, binary} ->
message = :erlang.binary_to_term(binary)
updated_message = Map.merge(message, Map.new(attrs))
:erlang.term_to_binary(updated_message) |> File.write(file_path, [:binary, :write])
# 更新后解锁文件
new_locked_files = MapSet.delete(state.locked_files, message_id)
{:reply, :ok, %{state | locked_files: new_locked_files}}
{:error, reason} ->
# 文件可能已被成功处理并删除
Logger.warning("Failed to update DLQ message #{message_id}: #{inspect(reason)}")
{:reply, {:error, reason}, state}
end
end
@impl true
def handle_cast({:delete, message_id}, state) do
file_path = Path.join(state.path, message_id)
File.rm(file_path)
# 删除后解锁
new_locked_files = MapSet.delete(state.locked_files, message_id)
{:noreply, %{state | locked_files: new_locked_files}}
end
end
关键设计点:
- 串行化访问: 使用
GenServer
来序列化所有文件系统操作,避免了复杂的竞争条件。 - 锁定机制: 通过
locked_files
集合,确保一个消息在被一个RetryWorker
处理期间,不会被Scheduler
再次拾取。这是一个简单的分布式锁的本地模拟。 - 健壮性: 文件读写和反序列化都包含基本的错误处理。
3. 调度器与重试工作者 (Scheduler
& RetryWorker
)
Scheduler
的职责很简单:定期轮询 Store
,并为每个可重试的消息启动一个 RetryWorker
。
lib/my_app/dlq/scheduler.ex
:
defmodule MyApp.DLQ.Scheduler do
use GenServer
require Logger
@fetch_limit 100
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, :ok, name: name)
end
@impl true
def init(:ok) do
config = Application.get_env(:my_app, MyApp.DLQ, [])
interval = config[:scheduler_interval_ms]
schedule_work()
{:ok, %{interval: interval}}
end
@impl true
def handle_info(:work, state) do
Logger.debug("DLQ Scheduler waking up to find work...")
case MyApp.DLQ.Store.fetch_retryable(@fetch_limit) do
{:ok, messages} ->
if Enum.any?(messages) do
Logger.info("Found #{length(messages)} retryable messages in DLQ.")
for message <- messages do
# 为每个消息启动一个独立的重试进程
DynamicSupervisor.start_child(
MyApp.DLQ.RetrySupervisor,
{MyApp.DLQ.RetryWorker, message}
)
end
end
{:error, reason} ->
Logger.error("Failed to fetch retryable messages from DLQ store: #{inspect(reason)}")
end
schedule_work()
{:noreply, state}
end
defp schedule_work do
Process.send_after(self(), :work, Application.get_env(:my_app, MyApp.DLQ)[:scheduler_interval_ms])
end
end
RetryWorker
是真正执行重试逻辑的地方。每个 worker 只关心一个消息,执行完它的生命周期(成功或最终失败)后就退出。
lib/my_app/dlq/retry_worker.ex
:
defmodule MyApp.DLQ.RetryWorker do
use GenServer
require Logger
def start_link(message) do
GenServer.start_link(__MODULE__, message, [])
end
@impl true
def init(message) do
# 立即开始第一次尝试
Process.send(self(), :retry, [])
{:ok, message}
end
@impl true
def handle_info(:retry, message) do
# 这里的 dispatch/1 是一个示例函数,它应该根据 payload 的类型
# 调用相应的业务逻辑。
case dispatch(message.payload) do
:ok ->
Logger.info("Successfully processed DLQ message #{message.id}")
MyApp.DLQ.Store.delete(message.id)
# 成功后,进程正常终止
{:stop, :normal, message}
{:error, _reason} ->
handle_failure(message)
end
end
# 模拟业务分发逻辑
# 在真实项目中,这里会更复杂,可能需要模式匹配 payload 结构
defp dispatch({:http_request, opts}) do
# 假设这是执行外部调用的函数
# MyApp.HTTPClient.post(opts.url, opts.body, opts.headers)
# 为了测试,我们让它随机成功或失败
if :rand.uniform() > 0.5 do
:ok
else
{:error, :timeout}
end
end
defp handle_failure(message) do
config = Application.get_env(:my_app, MyApp.DLQ)
next_attempt = message.attempt + 1
if next_attempt > config[:max_retries] do
# 超过最大重试次数,标记为永久失败(poisonous)
Logger.error("DLQ message #{message.id} exceeded max retries. Marking as failed.")
MyApp.DLQ.Store.update(message.id, status: :failed, finished_at: DateTime.utc_now())
{:stop, :normal, message}
else
# 计算下一次重试的延迟时间(指数退避)
delay = config[:initial_backoff_ms] * :math.pow(config[:backoff_multiplier], message.attempt) |> round()
next_retry_time = DateTime.add(DateTime.utc_now(), delay, :millisecond)
Logger.warn("DLQ message #{message.id} failed, attempt #{next_attempt}. Retrying at #{next_retry_time}.")
MyApp.DLQ.Store.update(message.id,
attempt: next_attempt,
status: :pending,
next_retry_at: next_retry_time
)
# 更新状态后,进程终止。调度器会在未来某个时间点重新拾取它。
{:stop, :normal, message}
end
end
end
架构洞察:
- 隔离性:
DynamicSupervisor
为每个重试任务创建一个进程。这意味着一个“有毒”的消息(poison pill message),即使在重试逻辑中导致 worker 崩溃,也绝不会影响到其他消息的重试。 - 无状态 Worker:
RetryWorker
本身是无状态的。它从init/1
接收消息,处理完毕后就退出。所有的状态变化(如重试次数、下次重试时间)都被持久化回Store
。这使得整个系统非常健壮。即使在RetryWorker
处理过程中整个节点崩溃,当节点重启后,Scheduler
仍能从Store
中找到这个未完成的消息并再次启动重试。 - 反压的基石: 虽然我们没有显式实现反压,但这个架构为它奠定了基础。我们可以轻易地在
DLQ.Broker
的enqueue/1
调用中增加逻辑:如果DLQ.Store
中的待处理消息超过某个阈值,enqueue
调用可以开始阻塞或直接返回错误,从而向上游的ChannelProcess
传递压力。
遗留问题与未来迭代路径
这个实现提供了一个健壮的 DLQ 基础,但在一个大规模的生产环境中,仍有几个方面需要考虑和优化。
- 存储层的瓶颈: 基于单进程访问本地文件的
DLQ.Store.File
无法在多节点环境中工作,并且在高并发写入时会成为性能瓶颈。一个自然的演进路径是将其替换为基于 Mnesia 的分布式存储,或者一个专门的数据库表(如 PostgreSQL),利用其事务和索引能力。 - “毒丸”消息处理: 当前实现中,超过最大重试次数的消息仅被标记为
failed
,但仍保留在存储中。需要一个归档或清理机制来处理这些消息,例如将它们移动到另一个长期存储中供人工分析。 - 可观测性: 系统目前只输出了日志。为了更好地进行生产环境运维,需要引入遥测(Telemetry)事件,暴露关键指标,如队列深度、消息处理成功率、平均重试次数等,并接入 Prometheus 和 Grafana 进行监控。
- 动态配置: 重试策略(次数、退避算法)目前是全局静态配置。更高级的系统可能会允许基于消息类型或元数据来动态决定重试策略。
- 精细化反压: 可以实现一个更智能的反压机制。例如,
DLQ.Broker
可以定期查询DLQ.Store
的队列深度,并动态调整其GenServer.call
的超时时间,或者使用类似GenStage
的模型来构建一个真正具备反压能力的数据处理管道。