一个看似简单的需求摆在了面前:我们需要构建一个服务,接收非结构化的文本文档,利用自然语言处理(NLP)模型进行实体识别、情感分析等一系列处理,最后将结构化的结果存储起来供下游查询。但其中附加了几个严苛的生产级约束:第一,整个处理过程必须具备金融级别的可审计性,任何一次处理、模型更迭、甚至是失败尝试,都必须有迹可循;第二,系统必须是高可用的,NLP处理模块的故障或性能瓶颈不能阻塞整个文档接收流程;第三,架构必须支持未来业务的扩展,例如用新的NLP模型重新处理历史数据,或增加新的数据消费者。
一个直接的同步RESTful API调用 spaCy
模型进行处理,然后将结果写入 PostgreSQL
的方案,在第一轮评审中就被否决了。这种设计耦合度太高,同步调用意味着前端的延迟完全取决于后端最慢的处理环节——NLP。更重要的是,它完全无法满足“可审计性”和“可回溯性”的要求。如果数据被错误地更新,我们将永远失去变更前的状态和变更原因。
第二个方案是引入消息队列的异步处理模型。一个API服务接收文档,将其放入队列,然后由一个独立的NLP处理服务消费。这解决了服务解耦和异步处理的问题。但是,它仍然没有解决核心的审计问题。数据库中的记录依然是“当前状态”的快照,我们知道“是什么”,但不知道“为什么会变成这样”。当需要用新模型重新处理上百万份历史文档时,操作会变得异常复杂且危险,本质上是一次大规模的数据库覆盖操作。
最终,我们选择了事件溯源(Event Sourcing, ES)与命令查询职责分离(CQRS)的架构模式。这并非一个流行的银弹,而是一个经过深思熟虑的权衡。其核心思想是,系统中唯一的数据源(Source of Truth)不再是状态本身,而是一系列描述状态变更的、不可变的“事件”日志。这天然满足了审计需求。而 PostgreSQL
的事务能力和强大的 JSONB
类型,使其成为事件存储(Event Store)的理想选择。NATS JetStream
则作为事件在微服务间传递的高性能、高可靠性的总线。spaCy
负责核心的领域逻辑——NLP处理。
架构决策与数据流
我们的系统被拆分为几个职责明确的微服务。它们之间通过事件进行通信,而非直接的API调用。
graph TD subgraph "客户端" Client[外部调用方] end subgraph "写模型 (Commands)" Client -- 1. HTTP POST /documents --> CommandService[命令服务 Go] CommandService -- 2. 验证并生成 DocumentReceived 事件 --> PG_EventStore[(PostgreSQL 事件存储)] PG_EventStore -- 3. AFTER COMMIT Trigger --> NATS[NATS JetStream] end subgraph "NLP处理 (Business Logic)" NATS -- 4. 消费 DocumentReceived 事件 --> NlpProcessor[NLP处理服务 Python/spaCy] NlpProcessor -- 5. 执行NLP分析 --> NlpProcessor NlpProcessor -- 6. 生成 NlpCompleted/NlpFailed 事件 --> PG_EventStore end subgraph "读模型 (Queries)" NATS -- 7. 消费所有相关事件 --> Projector[投影服务 Go] Projector -- 8. 更新/插入读模型 --> PG_ReadModel[(PostgreSQL 读模型)] QueryService[查询服务 Go] -- 9. HTTP GET /documents/:id --> PG_ReadModel Client -- 10. 查询结果 --> QueryService end style PG_EventStore fill:#f9f,stroke:#333,stroke-width:2px style PG_ReadModel fill:#ccf,stroke:#333,stroke-width:2px
- 命令服务 (Command Service): 唯一接收外部写请求的入口。它负责验证命令的合法性,但不执行任何业务逻辑。它的唯一职责是生成一个代表意图的事件(例如
DocumentReceived
)并将其原子性地写入PostgreSQL
的事件存储中。 - 事件存储 (Event Store): 这是系统的核心。我们选择在
PostgreSQL
中用一张简单的表来实现。这张表只允许追加,不允许修改或删除。每次写入事件时,我们会检查业务实体(在这里是文档)的版本号,以实现乐观并发控制。 - 事件总线 (NATS JetStream): 事件成功持久化到数据库后,我们需要通知其他服务。与其让命令服务同时写入数据库和发布消息(这会引入分布式事务的复杂性),我们采用更可靠的事务性发件箱模式(Transactional Outbox)。事件被写入同一事务中的
outbox
表,一个独立的进程或数据库触发器负责将新事件可靠地推送到NATS
。 - NLP处理服务 (NLP Processor): 这是一个后台工作者。它订阅
NATS
中它感兴趣的事件(DocumentReceived
)。接收到事件后,它执行耗时的spaCy
分析,然后将结果封装成一个新的事件(如NlpCompleted
或NlpFailed
),同样写入事件存储。 - 投影服务 (Projector): 它订阅所有相关的事件流,并将这些事件“投影”成一个为查询优化的“读模型”。这个读模型可以是一个或多个反规范化的
PostgreSQL
表。它的存在是为了将写模型(事件流)和读模型(状态视图)解耦。 - 查询服务 (Query Service): 一个非常轻量级的服务,直接从读模型中读取数据并返回给客户端。它不包含任何业务逻辑。
核心实现:PostgreSQL作为事件存储
事件存储的表结构设计至关重要。它必须高效、可扩展且易于查询。
-- events.sql: 事件存储核心表结构
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- 用于存储所有事件的单一表
CREATE TABLE events (
-- 全局唯一的事件ID
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- 聚合根ID,这里是文档的ID
stream_id UUID NOT NULL,
-- 事件在流中的版本号,用于乐观锁
version BIGINT NOT NULL,
-- 事件类型,例如 'DocumentReceived', 'NlpCompleted'
event_type VARCHAR(255) NOT NULL,
-- 事件负载,存储具体数据
payload JSONB NOT NULL,
-- 事件元数据,例如 correlation_id, causation_id
metadata JSONB DEFAULT '{}'::jsonb,
-- 事件发生的时间戳
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 确保每个聚合根的版本号是唯一的
CONSTRAINT events_stream_id_version_unique UNIQUE (stream_id, version)
);
-- 常用查询的索引
CREATE INDEX idx_events_stream_id ON events (stream_id);
CREATE INDEX idx_events_event_type ON events (event_type);
CREATE INDEX idx_events_created_at ON events (created_at);
COMMENT ON COLUMN events.stream_id IS '关联的业务实体ID,即聚合根ID';
COMMENT ON COLUMN events.version IS '事件版本号,用于乐观并发控制';
COMMENT ON COLUMN events.payload IS '事件的具体数据,以JSON格式存储';
COMMENT ON COLUMN events.metadata IS '元数据,用于追踪和调试';
这里的 stream_id
代表一个业务实体(比如一个文档),而 version
字段是实现乐观锁的关键。当一个服务要为一个文档追加新事件时,它必须先读取该文档的当前版本号,然后在写入新事件时将版本号加一。如果此时有另一个进程已经写入了新事件,数据库的 UNIQUE
约束会触发,操作失败,应用层需要重试。
命令服务:原子性地追加事件
命令服务的实现(我们使用Go)必须确保事件写入的原子性。
// command-service/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/nats-io/nats.go"
)
// 全局变量,在生产环境中应通过配置管理
var db *sql.DB
var nc *nats.Conn
var js nats.JetStreamContext
const eventStoreTableName = "events"
// Event 定义了事件的基本结构
type Event struct {
StreamID uuid.UUID `json:"stream_id"`
Version int64 `json:"version"`
EventType string `json:"event_type"`
Payload json.RawMessage `json:"payload"`
}
// DocumentReceivedPayload 是 DocumentReceived 事件的具体内容
type DocumentReceivedPayload struct {
Content string `json:"content"`
Source string `json:"source"`
ReceivedAt time.Time `json:"received_at"`
}
func main() {
// ... 初始化DB和NATS连接 ...
var err error
db, err = sql.Open("postgres", "user=... password=... dbname=... sslmode=disable")
if err != nil {
log.Fatalf("Failed to connect to PostgreSQL: %v", err)
}
// ... NATS JetStream setup ...
nc, err = nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
js, err = nc.JetStream()
if err != nil {
log.Fatalf("Failed to get JetStream context: %v", err)
}
// 创建一个名为 'EVENTS' 的流
js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
})
http.HandleFunc("/documents", handleReceiveDocument)
log.Println("Command service listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func handleReceiveDocument(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var reqBody struct {
Content string `json:"content"`
Source string `json:"source"`
}
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
// 这是一个新文档,所以我们生成一个新的UUID
docID := uuid.New()
payload, _ := json.Marshal(DocumentReceivedPayload{
Content: reqBody.Content,
Source: reqBody.Source,
ReceivedAt: time.Now().UTC(),
})
event := Event{
StreamID: docID,
Version: 1, // 新文档的第一个事件,版本号为1
EventType: "DocumentReceived",
Payload: payload,
}
if err := appendToStream(context.Background(), event); err != nil {
// 这里的错误处理很关键
// 可能是版本冲突,也可能是数据库连接问题
log.Printf("Error appending event: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"document_id": docID.String()})
}
// appendToStream 将事件原子性地写入数据库并发布到NATS
func appendToStream(ctx context.Context, event Event) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("could not begin transaction: %w", err)
}
defer tx.Rollback() // Rollback is a no-op if Commit succeeds
query := fmt.Sprintf(
`INSERT INTO %s (stream_id, version, event_type, payload) VALUES ($1, $2, $3, $4)`,
eventStoreTableName,
)
_, err = tx.ExecContext(ctx, query, event.StreamID, event.Version, event.EventType, event.Payload)
if err != nil {
// 错误可能由UNIQUE约束(version冲突)引起
return fmt.Errorf("could not insert event: %w", err)
}
// 将事件发布到NATS
eventData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("could not marshal event for NATS: %w", err)
}
// 主题结构: events.<event_type>.<stream_id>
subject := fmt.Sprintf("events.%s.%s", event.EventType, event.StreamID.String())
_, err = js.Publish(subject, eventData)
if err != nil {
// 如果NATS发布失败,事务会回滚,保证数据一致性
return fmt.Errorf("could not publish event to NATS: %w", err)
}
// 只有当所有操作都成功时才提交事务
return tx.Commit()
}
注意 appendToStream
函数。它在一个数据库事务中执行 INSERT
和 js.Publish
。这是一个简化的实现。在真实项目中,为了极致的可靠性,我们不会在事务中直接发布到 NATS
。我们会写入一个 outbox
表,然后由另一个进程轮询该表并将事件发布出去(事务性发件箱模式),这样可以保证即使 NATS
服务短暂不可用,事件最终也一定会被发布。但对于大多数场景,将发布操作放在事务的最后,并在失败时回滚,已经足够健壮。
NLP处理服务:幂等的事件消费者
NLP
处理服务是核心业务逻辑的承载者。它必须被设计为幂等的,因为消息系统(如NATS)通常提供“至少一次”的投递保证,意味着同一个事件可能被重复消费。
# nlp-processor/main.py
import asyncio
import json
import logging
import os
import spacy
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from uuid import UUID
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 加载spaCy模型。这是一个耗时操作,应在服务启动时完成。
try:
nlp = spacy.load("en_core_web_sm")
logging.info("spaCy model 'en_core_web_sm' loaded successfully.")
except OSError:
logging.error("Model 'en_core_web_sm' not found. Please run 'python -m spacy download en_core_web_sm'")
exit(1)
# 数据库连接
DB_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/db_name")
engine = create_engine(DB_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class EventStore:
"""一个简单的用于与事件存储交互的类"""
def get_current_version(self, session, stream_id: UUID) -> int:
query = text("SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = :stream_id")
result = session.execute(query, {"stream_id": stream_id}).scalar_one()
return int(result)
def append_event(self, session, stream_id: UUID, event_type: str, payload: dict):
current_version = self.get_current_version(session, stream_id)
next_version = current_version + 1
# 这是一个关键步骤,确保幂等性。
# 如果已经存在一个同类型的事件在当前版本之后,我们假设它已被处理。
check_query = text("""
SELECT 1 FROM events
WHERE stream_id = :stream_id AND version > :current_version AND event_type = :event_type
""")
exists = session.execute(check_query, {
"stream_id": stream_id,
"current_version": current_version,
"event_type": event_type
}).first()
if exists:
logging.warning(f"Event '{event_type}' for stream {stream_id} seems to be already processed. Skipping.")
return
query = text("""
INSERT INTO events (stream_id, version, event_type, payload)
VALUES (:stream_id, :version, :event_type, :payload)
""")
session.execute(query, {
"stream_id": stream_id,
"version": next_version,
"event_type": event_type,
"payload": json.dumps(payload)
})
logging.info(f"Appended event '{event_type}' v{next_version} for stream {stream_id}")
event_store = EventStore()
async def message_handler(msg):
subject = msg.subject
data = json.loads(msg.data.decode())
logging.info(f"Received a message on '{subject}'")
stream_id = UUID(data['StreamID'])
content = data['Payload']['content']
db_session = SessionLocal()
try:
# 执行NLP处理
doc = nlp(content)
entities = [{"text": ent.text, "label": ent.label_} for ent in doc.ents]
result_payload = {
"entities": entities,
"model_version": nlp.meta['version']
}
# 将处理结果作为新事件写入
event_store.append_event(db_session, stream_id, "NlpCompleted", result_payload)
db_session.commit()
except Exception as e:
logging.error(f"Failed to process message for stream {stream_id}: {e}", exc_info=True)
db_session.rollback()
# 产生一个失败事件
failure_payload = {"error": str(e), "original_event": data}
try:
event_store.append_event(db_session, stream_id, "NlpFailed", failure_payload)
db_session.commit()
except Exception as commit_e:
logging.error(f"Failed to even commit failure event for stream {stream_id}: {commit_e}")
db_session.rollback()
finally:
db_session.close()
async def main():
nc = NATS()
try:
await nc.connect(servers=["nats://localhost:4222"])
logging.info("Connected to NATS.")
except ErrNoServers as e:
logging.error(f"Could not connect to NATS: {e}")
return
# 订阅 'DocumentReceived' 事件
# 使用持久化队列订阅,确保服务重启后能从上次中断的地方继续
await nc.jetstream().subscribe("events.DocumentReceived.>", durable="nlp-processor", cb=message_handler)
logging.info("Subscribed to 'events.DocumentReceived.>'")
# 保持运行
try:
await asyncio.Future()
except asyncio.CancelledError:
await nc.close()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
EventStore.append_event
中的幂等性检查是一种简单的实现方式。在更复杂的场景中,可能需要在读模型中维护一个已处理消息ID的集合,并在处理前进行检查。错误处理也很重要:当NLP处理失败时,我们不是简单地丢弃消息,而是产生一个 NlpFailed
事件。这使得系统的状态变更依然是可追踪的,我们可以后续对这些失败的文档进行分析或重试。
投影服务:构建查询友好的读模型
投影服务的工作是“扁平化”事件流。它消费事件,然后更新一个或多个反规范化的表,这些表专门为快速查询而设计。
-- read_models.sql: 读模型表结构
CREATE TABLE documents_summary (
id UUID PRIMARY KEY,
source VARCHAR(255),
content TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
entities JSONB,
model_version VARCHAR(50),
last_processed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_documents_summary_status ON documents_summary(status);
这个 documents_summary
表就是我们的读模型。它包含了我们希望快速查询的所有信息。投影服务的Go实现如下:
// projector-service/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/nats-io/nats.go"
)
// ... DB 和 NATS 连接设置 ...
type Event struct {
StreamID uuid.UUID `json:"StreamID"`
EventType string `json:"EventType"`
Payload json.RawMessage `json:"Payload"`
}
func main() {
// ... 初始化DB和NATS ...
js.Subscribe("events.>", func(msg *nats.Msg) {
var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("Error unmarshalling event: %v", err)
msg.Nak() // 告诉NATS消息处理失败,稍后重试
return
}
ctx := context.Background()
if err := handleEvent(ctx, event); err != nil {
log.Printf("Error handling event %s for stream %s: %v", event.EventType, event.StreamID, err)
msg.Nak()
return
}
msg.Ack() // 确认消息处理成功
}, nats.Durable("document-projector"), nats.ManualAck())
log.Println("Projector service running...")
select {} // Block forever
}
func handleEvent(ctx context.Context, event Event) error {
switch event.EventType {
case "DocumentReceived":
return handleDocumentReceived(ctx, event)
case "NlpCompleted":
return handleNlpCompleted(ctx, event)
case "NlpFailed":
return handleNlpFailed(ctx, event)
default:
log.Printf("Ignoring unknown event type: %s", event.EventType)
return nil
}
}
func handleDocumentReceived(ctx context.Context, event Event) error {
var payload struct {
Content string `json:"content"`
Source string `json:"source"`
ReceivedAt time.Time `json:"received_at"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
query := `
INSERT INTO documents_summary (id, source, content, status, created_at, updated_at)
VALUES ($1, $2, $3, 'PROCESSING', $4, $4)
ON CONFLICT (id) DO NOTHING;`
_, err := db.ExecContext(ctx, query, event.StreamID, payload.Source, payload.Content, payload.ReceivedAt)
return err
}
func handleNlpCompleted(ctx context.Context, event Event) error {
var payload struct {
Entities json.RawMessage `json:"entities"`
ModelVersion string `json:"model_version"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
query := `
UPDATE documents_summary
SET status = 'COMPLETED', entities = $2, model_version = $3, last_processed_at = NOW(), updated_at = NOW()
WHERE id = $1;`
_, err := db.ExecContext(ctx, query, event.StreamID, payload.Entities, payload.ModelVersion)
return err
}
// ... handleNlpFailed 类似 ...
投影服务的逻辑非常直接:监听所有事件,并根据事件类型执行相应的 INSERT
或 UPDATE
操作。ON CONFLICT (id) DO NOTHING
是一种处理重复事件的简单有效方法,保证了投影操作的幂等性。
架构的边界与未来展望
这套基于事件溯源的架构虽然解决了可审计性、解耦和可扩展性的核心问题,但它也引入了新的复杂性。首先是最终一致性。读模型的数据更新总是会滞后于写模型,客户端在写入后立刻查询可能无法看到最新的状态,这对于某些业务场景是不可接受的。
其次,事件的 schema 管理是一个长期挑战。随着业务发展,事件的结构必然会发生变化。如何处理旧版本的事件(事件升级)、如何保证新旧代码的兼容性,需要一套完整的版本控制和迁移策略。
最后,对于生命周期极长的业务实体,其事件流可能会变得非常庞大,每次需要重建状态时都需要从头读取所有事件,性能会成为瓶颈。对此,通常的解决方案是引入“快照”(Snapshot)机制,即定期将实体的当前状态保存下来,重建时只需从最近的快照开始应用后续事件即可。
尽管存在这些挑战,但这套架构赋予了我们巨大的灵活性。当需要用更先进的 spaCy
模型重新处理所有历史数据时,我们无需进行危险的数据库迁移。只需启动一个新的投影服务,从头读取 events
表中的所有 DocumentReceived
事件,将它们重新发送到 NATS
的一个新主题,由一个配置了新模型的NLP处理服务消费即可。旧的读模型可以保持不变,直到新的读模型完全构建完成,然后通过流量切换平滑地完成升级,整个过程对线上查询服务零影响。这正是事件溯源架构真正的威力所在。