利用事件溯源模式在PostgreSQL与NATS上构建可审计的spaCy处理管道


一个看似简单的需求摆在了面前:我们需要构建一个服务,接收非结构化的文本文档,利用自然语言处理(NLP)模型进行实体识别、情感分析等一系列处理,最后将结构化的结果存储起来供下游查询。但其中附加了几个严苛的生产级约束:第一,整个处理过程必须具备金融级别的可审计性,任何一次处理、模型更迭、甚至是失败尝试,都必须有迹可循;第二,系统必须是高可用的,NLP处理模块的故障或性能瓶颈不能阻塞整个文档接收流程;第三,架构必须支持未来业务的扩展,例如用新的NLP模型重新处理历史数据,或增加新的数据消费者。

一个直接的同步RESTful API调用 spaCy 模型进行处理,然后将结果写入 PostgreSQL 的方案,在第一轮评审中就被否决了。这种设计耦合度太高,同步调用意味着前端的延迟完全取决于后端最慢的处理环节——NLP。更重要的是,它完全无法满足“可审计性”和“可回溯性”的要求。如果数据被错误地更新,我们将永远失去变更前的状态和变更原因。

第二个方案是引入消息队列的异步处理模型。一个API服务接收文档,将其放入队列,然后由一个独立的NLP处理服务消费。这解决了服务解耦和异步处理的问题。但是,它仍然没有解决核心的审计问题。数据库中的记录依然是“当前状态”的快照,我们知道“是什么”,但不知道“为什么会变成这样”。当需要用新模型重新处理上百万份历史文档时,操作会变得异常复杂且危险,本质上是一次大规模的数据库覆盖操作。

最终,我们选择了事件溯源(Event Sourcing, ES)与命令查询职责分离(CQRS)的架构模式。这并非一个流行的银弹,而是一个经过深思熟虑的权衡。其核心思想是,系统中唯一的数据源(Source of Truth)不再是状态本身,而是一系列描述状态变更的、不可变的“事件”日志。这天然满足了审计需求。而 PostgreSQL 的事务能力和强大的 JSONB 类型,使其成为事件存储(Event Store)的理想选择。NATS JetStream 则作为事件在微服务间传递的高性能、高可靠性的总线。spaCy 负责核心的领域逻辑——NLP处理。

架构决策与数据流

我们的系统被拆分为几个职责明确的微服务。它们之间通过事件进行通信,而非直接的API调用。

graph TD
    subgraph "客户端"
        Client[外部调用方]
    end

    subgraph "写模型 (Commands)"
        Client -- 1. HTTP POST /documents --> CommandService[命令服务 Go]
        CommandService -- 2. 验证并生成 DocumentReceived 事件 --> PG_EventStore[(PostgreSQL 事件存储)]
        PG_EventStore -- 3.
        AFTER COMMIT Trigger --> NATS[NATS JetStream]
    end

    subgraph "NLP处理 (Business Logic)"
        NATS -- 4. 消费 DocumentReceived 事件 --> NlpProcessor[NLP处理服务 Python/spaCy]
        NlpProcessor -- 5. 执行NLP分析 --> NlpProcessor
        NlpProcessor -- 6. 生成 NlpCompleted/NlpFailed 事件 --> PG_EventStore
    end

    subgraph "读模型 (Queries)"
        NATS -- 7. 消费所有相关事件 --> Projector[投影服务 Go]
        Projector -- 8. 更新/插入读模型 --> PG_ReadModel[(PostgreSQL 读模型)]
        QueryService[查询服务 Go] -- 9. HTTP GET /documents/:id --> PG_ReadModel
        Client -- 10. 查询结果 --> QueryService
    end

    style PG_EventStore fill:#f9f,stroke:#333,stroke-width:2px
    style PG_ReadModel fill:#ccf,stroke:#333,stroke-width:2px
  1. 命令服务 (Command Service): 唯一接收外部写请求的入口。它负责验证命令的合法性,但不执行任何业务逻辑。它的唯一职责是生成一个代表意图的事件(例如 DocumentReceived)并将其原子性地写入 PostgreSQL 的事件存储中。
  2. 事件存储 (Event Store): 这是系统的核心。我们选择在 PostgreSQL 中用一张简单的表来实现。这张表只允许追加,不允许修改或删除。每次写入事件时,我们会检查业务实体(在这里是文档)的版本号,以实现乐观并发控制。
  3. 事件总线 (NATS JetStream): 事件成功持久化到数据库后,我们需要通知其他服务。与其让命令服务同时写入数据库和发布消息(这会引入分布式事务的复杂性),我们采用更可靠的事务性发件箱模式(Transactional Outbox)。事件被写入同一事务中的 outbox 表,一个独立的进程或数据库触发器负责将新事件可靠地推送到 NATS
  4. NLP处理服务 (NLP Processor): 这是一个后台工作者。它订阅 NATS 中它感兴趣的事件(DocumentReceived)。接收到事件后,它执行耗时的 spaCy 分析,然后将结果封装成一个新的事件(如 NlpCompletedNlpFailed),同样写入事件存储。
  5. 投影服务 (Projector): 它订阅所有相关的事件流,并将这些事件“投影”成一个为查询优化的“读模型”。这个读模型可以是一个或多个反规范化的 PostgreSQL 表。它的存在是为了将写模型(事件流)和读模型(状态视图)解耦。
  6. 查询服务 (Query Service): 一个非常轻量级的服务,直接从读模型中读取数据并返回给客户端。它不包含任何业务逻辑。

核心实现:PostgreSQL作为事件存储

事件存储的表结构设计至关重要。它必须高效、可扩展且易于查询。

-- events.sql: 事件存储核心表结构

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- 用于存储所有事件的单一表
CREATE TABLE events (
    -- 全局唯一的事件ID
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    -- 聚合根ID,这里是文档的ID
    stream_id UUID NOT NULL,
    -- 事件在流中的版本号,用于乐观锁
    version BIGINT NOT NULL,
    -- 事件类型,例如 'DocumentReceived', 'NlpCompleted'
    event_type VARCHAR(255) NOT NULL,
    -- 事件负载,存储具体数据
    payload JSONB NOT NULL,
    -- 事件元数据,例如 correlation_id, causation_id
    metadata JSONB DEFAULT '{}'::jsonb,
    -- 事件发生的时间戳
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- 确保每个聚合根的版本号是唯一的
    CONSTRAINT events_stream_id_version_unique UNIQUE (stream_id, version)
);

-- 常用查询的索引
CREATE INDEX idx_events_stream_id ON events (stream_id);
CREATE INDEX idx_events_event_type ON events (event_type);
CREATE INDEX idx_events_created_at ON events (created_at);

COMMENT ON COLUMN events.stream_id IS '关联的业务实体ID,即聚合根ID';
COMMENT ON COLUMN events.version IS '事件版本号,用于乐观并发控制';
COMMENT ON COLUMN events.payload IS '事件的具体数据,以JSON格式存储';
COMMENT ON COLUMN events.metadata IS '元数据,用于追踪和调试';

这里的 stream_id 代表一个业务实体(比如一个文档),而 version 字段是实现乐观锁的关键。当一个服务要为一个文档追加新事件时,它必须先读取该文档的当前版本号,然后在写入新事件时将版本号加一。如果此时有另一个进程已经写入了新事件,数据库的 UNIQUE 约束会触发,操作失败,应用层需要重试。

命令服务:原子性地追加事件

命令服务的实现(我们使用Go)必须确保事件写入的原子性。

// command-service/main.go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
	"github.com/nats-io/nats.go"
)

// 全局变量,在生产环境中应通过配置管理
var db *sql.DB
var nc *nats.Conn
var js nats.JetStreamContext

const eventStoreTableName = "events"

// Event 定义了事件的基本结构
type Event struct {
	StreamID  uuid.UUID       `json:"stream_id"`
	Version   int64           `json:"version"`
	EventType string          `json:"event_type"`
	Payload   json.RawMessage `json:"payload"`

}

// DocumentReceivedPayload 是 DocumentReceived 事件的具体内容
type DocumentReceivedPayload struct {
	Content   string    `json:"content"`
	Source    string    `json:"source"`
	ReceivedAt time.Time `json:"received_at"`
}

func main() {
	// ... 初始化DB和NATS连接 ...
    var err error
	db, err = sql.Open("postgres", "user=... password=... dbname=... sslmode=disable")
	if err != nil {
		log.Fatalf("Failed to connect to PostgreSQL: %v", err)
	}
	// ... NATS JetStream setup ...
    nc, err = nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatalf("Failed to connect to NATS: %v", err)
    }
    js, err = nc.JetStream()
    if err != nil {
        log.Fatalf("Failed to get JetStream context: %v", err)
    }
    // 创建一个名为 'EVENTS' 的流
    js.AddStream(&nats.StreamConfig{
        Name:     "EVENTS",
        Subjects: []string{"events.>"},
    })


	http.HandleFunc("/documents", handleReceiveDocument)
	log.Println("Command service listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleReceiveDocument(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var reqBody struct {
		Content string `json:"content"`
		Source  string `json:"source"`
	}
	if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}
    // 这是一个新文档,所以我们生成一个新的UUID
	docID := uuid.New()

	payload, _ := json.Marshal(DocumentReceivedPayload{
		Content:   reqBody.Content,
		Source:    reqBody.Source,
		ReceivedAt: time.Now().UTC(),
	})

	event := Event{
		StreamID:  docID,
		Version:   1, // 新文档的第一个事件,版本号为1
		EventType: "DocumentReceived",
		Payload:   payload,
	}

	if err := appendToStream(context.Background(), event); err != nil {
		// 这里的错误处理很关键
        // 可能是版本冲突,也可能是数据库连接问题
		log.Printf("Error appending event: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"document_id": docID.String()})
}

// appendToStream 将事件原子性地写入数据库并发布到NATS
func appendToStream(ctx context.Context, event Event) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("could not begin transaction: %w", err)
	}
	defer tx.Rollback() // Rollback is a no-op if Commit succeeds

	query := fmt.Sprintf(
		`INSERT INTO %s (stream_id, version, event_type, payload) VALUES ($1, $2, $3, $4)`,
		eventStoreTableName,
	)

	_, err = tx.ExecContext(ctx, query, event.StreamID, event.Version, event.EventType, event.Payload)
	if err != nil {
		// 错误可能由UNIQUE约束(version冲突)引起
		return fmt.Errorf("could not insert event: %w", err)
	}

    // 将事件发布到NATS
    eventData, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("could not marshal event for NATS: %w", err)
    }

    // 主题结构: events.<event_type>.<stream_id>
    subject := fmt.Sprintf("events.%s.%s", event.EventType, event.StreamID.String())
    _, err = js.Publish(subject, eventData)
    if err != nil {
        // 如果NATS发布失败,事务会回滚,保证数据一致性
        return fmt.Errorf("could not publish event to NATS: %w", err)
    }

    // 只有当所有操作都成功时才提交事务
	return tx.Commit()
}

注意 appendToStream 函数。它在一个数据库事务中执行 INSERTjs.Publish。这是一个简化的实现。在真实项目中,为了极致的可靠性,我们不会在事务中直接发布到 NATS。我们会写入一个 outbox 表,然后由另一个进程轮询该表并将事件发布出去(事务性发件箱模式),这样可以保证即使 NATS 服务短暂不可用,事件最终也一定会被发布。但对于大多数场景,将发布操作放在事务的最后,并在失败时回滚,已经足够健壮。

NLP处理服务:幂等的事件消费者

NLP 处理服务是核心业务逻辑的承载者。它必须被设计为幂等的,因为消息系统(如NATS)通常提供“至少一次”的投递保证,意味着同一个事件可能被重复消费。

# nlp-processor/main.py
import asyncio
import json
import logging
import os
import spacy
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from uuid import UUID

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 加载spaCy模型。这是一个耗时操作,应在服务启动时完成。
try:
    nlp = spacy.load("en_core_web_sm")
    logging.info("spaCy model 'en_core_web_sm' loaded successfully.")
except OSError:
    logging.error("Model 'en_core_web_sm' not found. Please run 'python -m spacy download en_core_web_sm'")
    exit(1)


# 数据库连接
DB_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/db_name")
engine = create_engine(DB_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

class EventStore:
    """一个简单的用于与事件存储交互的类"""
    def get_current_version(self, session, stream_id: UUID) -> int:
        query = text("SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = :stream_id")
        result = session.execute(query, {"stream_id": stream_id}).scalar_one()
        return int(result)

    def append_event(self, session, stream_id: UUID, event_type: str, payload: dict):
        current_version = self.get_current_version(session, stream_id)
        next_version = current_version + 1
        
        # 这是一个关键步骤,确保幂等性。
        # 如果已经存在一个同类型的事件在当前版本之后,我们假设它已被处理。
        check_query = text("""
            SELECT 1 FROM events 
            WHERE stream_id = :stream_id AND version > :current_version AND event_type = :event_type
        """)
        exists = session.execute(check_query, {
            "stream_id": stream_id, 
            "current_version": current_version, 
            "event_type": event_type
        }).first()
        if exists:
            logging.warning(f"Event '{event_type}' for stream {stream_id} seems to be already processed. Skipping.")
            return

        query = text("""
            INSERT INTO events (stream_id, version, event_type, payload)
            VALUES (:stream_id, :version, :event_type, :payload)
        """)
        session.execute(query, {
            "stream_id": stream_id,
            "version": next_version,
            "event_type": event_type,
            "payload": json.dumps(payload)
        })
        logging.info(f"Appended event '{event_type}' v{next_version} for stream {stream_id}")

event_store = EventStore()

async def message_handler(msg):
    subject = msg.subject
    data = json.loads(msg.data.decode())
    logging.info(f"Received a message on '{subject}'")

    stream_id = UUID(data['StreamID'])
    content = data['Payload']['content']

    db_session = SessionLocal()
    try:
        # 执行NLP处理
        doc = nlp(content)
        entities = [{"text": ent.text, "label": ent.label_} for ent in doc.ents]
        
        result_payload = {
            "entities": entities,
            "model_version": nlp.meta['version']
        }
        
        # 将处理结果作为新事件写入
        event_store.append_event(db_session, stream_id, "NlpCompleted", result_payload)
        db_session.commit()
        
    except Exception as e:
        logging.error(f"Failed to process message for stream {stream_id}: {e}", exc_info=True)
        db_session.rollback()
        # 产生一个失败事件
        failure_payload = {"error": str(e), "original_event": data}
        try:
            event_store.append_event(db_session, stream_id, "NlpFailed", failure_payload)
            db_session.commit()
        except Exception as commit_e:
            logging.error(f"Failed to even commit failure event for stream {stream_id}: {commit_e}")
            db_session.rollback()
    finally:
        db_session.close()


async def main():
    nc = NATS()
    try:
        await nc.connect(servers=["nats://localhost:4222"])
        logging.info("Connected to NATS.")
    except ErrNoServers as e:
        logging.error(f"Could not connect to NATS: {e}")
        return

    # 订阅 'DocumentReceived' 事件
    # 使用持久化队列订阅,确保服务重启后能从上次中断的地方继续
    await nc.jetstream().subscribe("events.DocumentReceived.>", durable="nlp-processor", cb=message_handler)
    logging.info("Subscribed to 'events.DocumentReceived.>'")

    # 保持运行
    try:
        await asyncio.Future()
    except asyncio.CancelledError:
        await nc.close()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

EventStore.append_event 中的幂等性检查是一种简单的实现方式。在更复杂的场景中,可能需要在读模型中维护一个已处理消息ID的集合,并在处理前进行检查。错误处理也很重要:当NLP处理失败时,我们不是简单地丢弃消息,而是产生一个 NlpFailed 事件。这使得系统的状态变更依然是可追踪的,我们可以后续对这些失败的文档进行分析或重试。

投影服务:构建查询友好的读模型

投影服务的工作是“扁平化”事件流。它消费事件,然后更新一个或多个反规范化的表,这些表专门为快速查询而设计。

-- read_models.sql: 读模型表结构
CREATE TABLE documents_summary (
    id UUID PRIMARY KEY,
    source VARCHAR(255),
    content TEXT,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
    entities JSONB,
    model_version VARCHAR(50),
    last_processed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_documents_summary_status ON documents_summary(status);

这个 documents_summary 表就是我们的读模型。它包含了我们希望快速查询的所有信息。投影服务的Go实现如下:

// projector-service/main.go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
	"github.com/nats-io/nats.go"
)

// ... DB 和 NATS 连接设置 ...

type Event struct {
	StreamID  uuid.UUID       `json:"StreamID"`
	EventType string          `json:"EventType"`
	Payload   json.RawMessage `json:"Payload"`
}

func main() {
    // ... 初始化DB和NATS ...
	js.Subscribe("events.>", func(msg *nats.Msg) {
		var event Event
		if err := json.Unmarshal(msg.Data, &event); err != nil {
			log.Printf("Error unmarshalling event: %v", err)
			msg.Nak() // 告诉NATS消息处理失败,稍后重试
			return
		}

		ctx := context.Background()
		if err := handleEvent(ctx, event); err != nil {
			log.Printf("Error handling event %s for stream %s: %v", event.EventType, event.StreamID, err)
			msg.Nak()
			return
		}
		msg.Ack() // 确认消息处理成功
	}, nats.Durable("document-projector"), nats.ManualAck())

	log.Println("Projector service running...")
	select {} // Block forever
}

func handleEvent(ctx context.Context, event Event) error {
	switch event.EventType {
	case "DocumentReceived":
		return handleDocumentReceived(ctx, event)
	case "NlpCompleted":
		return handleNlpCompleted(ctx, event)
	case "NlpFailed":
		return handleNlpFailed(ctx, event)
	default:
		log.Printf("Ignoring unknown event type: %s", event.EventType)
		return nil
	}
}

func handleDocumentReceived(ctx context.Context, event Event) error {
	var payload struct {
		Content string `json:"content"`
		Source  string `json:"source"`
		ReceivedAt time.Time `json:"received_at"`
	}
	if err := json.Unmarshal(event.Payload, &payload); err != nil {
		return err
	}

	query := `
		INSERT INTO documents_summary (id, source, content, status, created_at, updated_at)
		VALUES ($1, $2, $3, 'PROCESSING', $4, $4)
		ON CONFLICT (id) DO NOTHING;`
	
	_, err := db.ExecContext(ctx, query, event.StreamID, payload.Source, payload.Content, payload.ReceivedAt)
	return err
}

func handleNlpCompleted(ctx context.Context, event Event) error {
	var payload struct {
		Entities     json.RawMessage `json:"entities"`
		ModelVersion string          `json:"model_version"`
	}
	if err := json.Unmarshal(event.Payload, &payload); err != nil {
		return err
	}

	query := `
		UPDATE documents_summary
		SET status = 'COMPLETED', entities = $2, model_version = $3, last_processed_at = NOW(), updated_at = NOW()
		WHERE id = $1;`

	_, err := db.ExecContext(ctx, query, event.StreamID, payload.Entities, payload.ModelVersion)
	return err
}
// ... handleNlpFailed 类似 ...

投影服务的逻辑非常直接:监听所有事件,并根据事件类型执行相应的 INSERTUPDATE 操作。ON CONFLICT (id) DO NOTHING 是一种处理重复事件的简单有效方法,保证了投影操作的幂等性。

架构的边界与未来展望

这套基于事件溯源的架构虽然解决了可审计性、解耦和可扩展性的核心问题,但它也引入了新的复杂性。首先是最终一致性。读模型的数据更新总是会滞后于写模型,客户端在写入后立刻查询可能无法看到最新的状态,这对于某些业务场景是不可接受的。

其次,事件的 schema 管理是一个长期挑战。随着业务发展,事件的结构必然会发生变化。如何处理旧版本的事件(事件升级)、如何保证新旧代码的兼容性,需要一套完整的版本控制和迁移策略。

最后,对于生命周期极长的业务实体,其事件流可能会变得非常庞大,每次需要重建状态时都需要从头读取所有事件,性能会成为瓶颈。对此,通常的解决方案是引入“快照”(Snapshot)机制,即定期将实体的当前状态保存下来,重建时只需从最近的快照开始应用后续事件即可。

尽管存在这些挑战,但这套架构赋予了我们巨大的灵活性。当需要用更先进的 spaCy 模型重新处理所有历史数据时,我们无需进行危险的数据库迁移。只需启动一个新的投影服务,从头读取 events 表中的所有 DocumentReceived 事件,将它们重新发送到 NATS 的一个新主题,由一个配置了新模型的NLP处理服务消费即可。旧的读模型可以保持不变,直到新的读模型完全构建完成,然后通过流量切换平滑地完成升级,整个过程对线上查询服务零影响。这正是事件溯源架构真正的威力所在。


  目录