构建基于Puppeteer与Cassandra的CQRS数据采集架构及SwiftUI原生监控端实践


我们的核心挑战并非简单的数据抓取,而是要为上千个无API、结构频繁变更、且普遍采用JavaScript动态渲染的Web数据源,构建一个工业级的、可长期维护的数据采集平台。传统的定时任务(Cron Job)配合简单脚本的模式在这种规模和复杂性面前,其脆弱性暴露无遗:任何一个数据源的DOM结构微调都可能导致整个采集任务静默失败,数据丢失且难以追溯。系统的瓶颈很快会从采集效率转向故障排查与系统可维护性。

因此,问题被重新定义:如何设计一个架构,能够大规模并发执行采集任务,对采集过程中的每一次成功、失败、甚至环境变化都留下不可变痕迹,并且为运维团队提供一个高性能、高信息密度的原生操作界面,以实现对整个采集集群的实时干预与监控。

方案A: 传统一体化架构的局限

一种直接的思路是构建一个单体应用。该应用通过一个中心化的调度器,动态创建Puppeteer子进程执行抓取任务。抓取的数据经过清洗和转换后,直接写入一个关系型数据库(例如PostgreSQL)的目标表中。一个内嵌的Web前端(如React或Vue)则提供任务管理和数据显示的仪表盘。

优点:

  • 开发启动快: 单一代码库,技术栈统一,初期开发效率高。
  • 部署简单: 作为一个整体进行部署和管理。

不可忽视的缺点:

  • 紧密耦合: 采集逻辑、数据处理、存储写入和UI展示被紧密捆绑。对任何一环的修改都可能影响全局。例如,为适应某个网站的变更而调整采集脚本,可能会意外影响数据入库的公共模块。
  • 脆弱的事务链: 整个流程 抓取 -> 解析 -> 入库 形成一个隐式的长事务。如果在最后一步的数据库写入环节失败,之前通过Puppeteer获取的宝贵数据可能会被丢弃,除非实现复杂的重试和补偿逻辑,但这本身就在破坏单体应用的简洁性。
  • 扩展性受限: 如果瓶颈出现在CPU密集型的Puppeteer渲染上,我们必须水平扩展整个单体应用,这造成了资源浪费,因为数据处理和API服务部分可能远未达到负载上限。
  • 数据模型僵化: 关系型数据库的预定义Schema难以适应Web数据源的多变性。频繁的ALTER TABLE在生产环境中是高风险操作,且无法优雅地处理历史数据的结构差异。
  • 运维黑盒: 当任务失败时,我们能得到的通常只有一个最终的“失败”状态。失败的具体原因——是网络超时、目标页面改版、还是反爬虫策略——被淹没在日志的海洋中,难以进行系统性的归因分析。

在真实项目中,这种架构的维护成本会随着数据源数量的增加而指数级增长,最终陷入“修补旧漏洞比开发新功能更耗时”的泥沼。

方案B: 基于CQRS与事件溯源的解耦架构

为了克服上述缺点,我们决定采用命令查询职责分离(Command Query Responsibility Segregation, CQRS)模式,并结合事件溯源(Event Sourcing)的思想。

这个架构的核心是将系统的写操作(命令)与读操作(查询)彻底分离。

graph TD
    subgraph "命令侧 (Write Side)"
        A[SwiftUI/Admin API] -- 1. 发送采集命令 --> B(Command Bus/API Gateway);
        B -- 2. 分发Command --> C{Command Handler};
        C -- 3. 执行业务逻辑, 创建事件 --> D[Puppeteer Worker Pool];
        D -- 4. 执行抓取, 产生领域事件 --> E{Event Handler};
        E -- 5. 将事件持久化 --> F[(Cassandra Event Store)];
    end

    subgraph "查询侧 (Read Side)"
        F -- 6. 事件流 --> G(Projector/Projection Service);
        G -- 7. 构建/更新读模型 --> H1[(PostgreSQL Read Model)];
        G -- 7. 构建/更新读模型 --> H2[(Elasticsearch Read Model)];
        I(Query API) -- 8. 查询读模型 --> H1;
        I -- 8. 查询读模型 --> H2;
        J[SwiftUI Client] -- 9. 订阅/查询数据 --> I;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style J fill:#ccf,stroke:#333,stroke-width:2px

架构组件解析:

  1. 命令 (Commands): 表达意图的不可变对象,如 StartScrapingJobCommand。它们不包含业务逻辑,只携带执行所需的数据。
  2. 命令处理器 (Command Handlers): 接收命令,执行必要的验证和业务逻辑,最终产出一个或多个事件。
  3. 事件 (Events): 记录系统中已发生事实的不可变对象,如 JobCreated, PageRendered, DataExtracted, ScrapingFailed. 事件是本架构的“真相之源”。
  4. 事件存储 (Event Store): 我们选择Cassandra作为事件存储。这是整个系统的核心。所有事件都以仅追加(Append-Only)的方式写入。Cassandra的分布式、高可用、高写入吞吐量的特性完美契合事件存储的需求。我们不更新、不删除任何事件。
  5. 投影 (Projections): 一些独立的后台服务,它们订阅事件存储中的事件流。每当新事件发生时,投影会更新为特定查询场景优化的“读模型”(Read Model)。
  6. 读模型 (Read Models): 预先计算好的视图。例如,一个投影可以构建一个PostgreSQL表来展示所有任务的最新状态,另一个投影则可以将提取的数据写入Elasticsearch以供全文检索。
  7. 查询 (Queries): 客户端通过一个独立的查询API来读取这些为它们量身定制的读模型,实现高效查询。

技术选型决策:

  • 后端 (Backend): Node.js (TypeScript)。与Puppeteer(基于Node.js)保持技术栈一致,便于管理依赖和进程通信。其异步I/O模型也适合处理大量并发的网络请求和消息队列交互。
  • 事件存储 (Event Store): Apache Cassandra。选择它的理由是其卓越的写性能和线性扩展能力。Web采集会产生海量的状态变更事件,Cassandra的LSM-Tree架构就是为这种写密集型场景设计的。
  • 原生监控端 (Native Client): SwiftUI (macOS/iPadOS)。运维团队需要一个能处理高频数据更新、提供流畅交互和丰富图表的可视化工具。相比Web界面,原生应用能更好地利用系统资源,提供更低的延迟和更稳定的性能,尤其是在展示实时日志流、性能指标曲线等场景下,体验优势明显。

这个方案的初始复杂度的确更高,但它为我们提供了无与伦比的弹性、可扩展性和可观测性。

核心实现:代码与设计剖析

1. Cassandra事件存储的表结构设计

事件存储是架构的基石,其表结构设计至关重要。一个糟糕的设计会导致热点问题,扼杀Cassandra的性能优势。

我们的核心表是 job_events

-- job_events: 存储与单个采集任务相关的所有事件
-- 这是一个典型的事件溯源表设计
CREATE TABLE IF NOT EXISTS data_pipeline.job_events (
    -- Partition Key: job_id
    -- 所有与同一个任务相关的事件都存储在同一个分区中,确保按任务查询的高效性。
    job_id uuid,

    -- Clustering Key: event_time
    -- 在分区内部,事件按时间戳排序。这让我们能够轻松地按时间顺序检索一个任务的完整历史。
    -- 使用 timeuuid 可以保证时序性和唯一性,避免同一毫秒内事件的冲突。
    event_time timeuuid,

    -- Event Type: 用于反序列化事件负载
    event_type text,

    -- Event Payload: 存储事件具体内容的JSON/Protobuf
    -- 使用 text 存储 JSON 具有灵活性,但在超大规模下,二进制格式(blob)配合Protobuf会更高效。
    event_payload text,

    -- 元数据,用于快速过滤和索引
    -- 例如,可以添加 worker_id, source_url 等,但要谨慎,避免模型过度膨胀。
    metadata map<text, text>,

    PRIMARY KEY (job_id, event_time)
) WITH CLUSTERING ORDER BY (event_time ASC);

这个设计的关键在于 PRIMARY KEY (job_id, event_time)job_id 作为分区键,意味着一个任务的所有历史事件在物理上会存储在一起,查询一个任务的完整生命周期就变成了一次高效的分区内顺序扫描。event_time 作为聚类键,保证了事件在分区内的时序性。

2. Node.js Puppeteer Worker的实现

Worker是执行实际采集任务的单元。它必须是健壮的,能够处理各种异常,并将每一个关键步骤都作为事件报告给系统。

// src/worker/puppeteer-worker.ts
import puppeteer, { Browser, Page } from 'puppeteer';
import { v4 as uuidv4 } from 'uuid';
import { CassandraClient } from '../persistence/cassandra-client';
import { JobCommand, BaseEvent } from '../types';

// 生产级的Puppeteer配置
const puppeteerLaunchOptions = {
  headless: true,
  args: [
    '--no-sandbox',
    '--disable-setuid-sandbox',
    '--disable-dev-shm-usage',
    '--disable-accelerated-2d-canvas',
    '--disable-gpu',
    '--window-size=1920x1080',
  ],
};

export class PuppeteerWorker {
  private browser: Browser | null = null;
  private cassandraClient: CassandraClient;

  constructor() {
    // 依赖注入Cassandra客户端
    this.cassandraClient = new CassandraClient(); 
  }

  // 初始化浏览器实例,这是一个昂贵操作,应在Worker生命周期内复用
  public async initialize(): Promise<void> {
    this.logEvent('WorkerInitializing', {});
    this.browser = await puppeteer.launch(puppeteerLaunchOptions);
    this.logEvent('WorkerInitialized', { browserVersion: await this.browser.version() });
  }

  // 核心任务执行逻辑
  public async executeJob(command: JobCommand): Promise<void> {
    if (!this.browser) {
      throw new Error('Worker not initialized. Call initialize() first.');
    }

    const { jobId, url, steps } = command;
    let page: Page | null = null;

    try {
      await this.persistEvent(jobId, 'JobExecutionStarted', { url });

      page = await this.browser.newPage();
      await page.setViewport({ width: 1920, height: 1080 });

      // 导航到目标URL,包含详细的超时和错误处理
      await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
      await this.persistEvent(jobId, 'PageNavigationSucceeded', { url });

      // 模拟一个数据提取步骤
      const titleSelector = 'h1';
      await page.waitForSelector(titleSelector, { timeout: 10000 });
      const title = await page.$eval(titleSelector, el => el.textContent);
      
      await this.persistEvent(jobId, 'DataExtracted', { extractedData: { title } });

      await this.persistEvent(jobId, 'JobExecutionSucceeded', {});

    } catch (error: any) {
      // 捕获所有潜在错误,并将其作为事件记录
      // 这是可观测性的关键:失败不是终点,而是有价值的数据点
      const errorDetails = {
        name: error.name,
        message: error.message,
        stack: error.stack?.substring(0, 1024), // 限制堆栈大小
      };
      await this.persistEvent(jobId, 'JobExecutionFailed', { error: errorDetails });
    } finally {
      if (page) {
        await page.close();
      }
    }
  }

  private async persistEvent(jobId: string, eventType: string, payload: object): Promise<void> {
    const event: BaseEvent = {
      jobId,
      eventType,
      payload,
    };
    // 实际的数据库写入操作
    await this.cassandraClient.insertEvent(event);
  }

  private logEvent(eventType: string, payload: object): void {
      // 用于worker自身日志,非业务事件
      console.log(JSON.stringify({ eventType, payload, timestamp: new Date().toISOString() }));
  }

  public async shutdown(): Promise<void> {
    if (this.browser) {
      await this.browser.close();
      this.logEvent('WorkerShutdown', {});
    }
  }
}

这段代码的重点在于,executeJob方法中的每一个步骤,无论是成功还是失败,都通过persistEvent方法被记录下来。这种细粒度的事件日志使得事后复盘变得极为精确。当一个任务失败时,我们可以通过查询其在Cassandra中的事件流,准确地定位到是导航超时、选择器未找到,还是数据解析出错。

3. SwiftUI监控客户端的构建

SwiftUI客户端的核心是消费查询API提供的数据,并将其以高效、直观的方式呈现。

数据模型 (Data Models):
首先,定义与后端Query API响应匹配的Codable结构体。

// Models/JobStatus.swift
import Foundation

// 这是一个简化的读模型,用于在主列表中显示任务概览
struct JobStatus: Identifiable, Codable, Hashable {
    let id: UUID
    let url: String
    let status: JobState
    let startedAt: Date
    let lastUpdatedAt: Date
    let failureReason: String?

    enum JobState: String, Codable {
        case pending = "PENDING"
        case running = "RUNNING"
        case succeeded = "SUCCEEDED"
        case failed = "FAILED"
    }
}

API服务层 (API Service):
使用async/await封装网络请求。

// Services/APIService.swift
import Foundation

class APIService {
    private let baseURL = URL(string: "http://api.your-backend.com/query")!

    func fetchJobStatuses() async throws -> [JobStatus] {
        let (data, response) = try await URLSession.shared.data(from: baseURL.appendingPathComponent("jobs"))

        guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 else {
            throw URLError(.badServerResponse)
        }
        
        // 使用自定义的JSONDecoder处理日期格式
        let decoder = JSONDecoder()
        decoder.dateDecodingStrategy = .iso8601
        
        return try decoder.decode([JobStatus].self, from: data)
    }
    
    // 可以在此添加提交新任务(Command)的方法
    func submitNewJob(url: String) async throws {
        // ... 实现POST请求到Command API
    }
}

视图 (View) 与视图模型 (ViewModel):
使用StateObject@Published属性来驱动UI的响应式更新。

// ViewModels/JobListViewModel.swift
import Foundation
import Combine

@MainActor
class JobListViewModel: ObservableObject {
    @Published var jobs: [JobStatus] = []
    @Published var isLoading = false
    @Published var errorMessage: String? = nil

    private let apiService = APIService()

    func loadJobs() async {
        isLoading = true
        errorMessage = nil
        
        do {
            jobs = try await apiService.fetchJobStatuses()
        } catch {
            errorMessage = "Failed to load jobs: \(error.localizedDescription)"
        }
        
        isLoading = false
    }
}

// Views/JobListView.swift
import SwiftUI

struct JobListView: View {
    @StateObject private var viewModel = JobListViewModel()

    var body: some View {
        NavigationView {
            VStack {
                if viewModel.isLoading {
                    ProgressView()
                } else if let errorMessage = viewModel.errorMessage {
                    Text(errorMessage).foregroundColor(.red)
                } else {
                    List(viewModel.jobs) { job in
                        VStack(alignment: .leading) {
                            Text(job.url)
                                .font(.headline)
                                .lineLimit(1)
                            HStack {
                                Text(job.status.rawValue)
                                    .font(.caption)
                                    .padding(4)
                                    .background(statusColor(for: job.status))
                                    .foregroundColor(.white)
                                    .cornerRadius(4)
                                Spacer()
                                Text(job.lastUpdatedAt, style: .relative)
                                    .font(.caption2)
                                    .foregroundColor(.secondary)
                            }
                        }
                    }
                }
            }
            .navigationTitle("Scraping Jobs")
            .onAppear {
                Task {
                    await viewModel.loadJobs()
                }
            }
            .toolbar {
                ToolbarItem {
                    Button(action: {
                        Task { await viewModel.loadJobs() }
                    }) {
                        Image(systemName: "arrow.clockwise")
                    }
                }
            }
        }
    }

    private func statusColor(for status: JobStatus.JobState) -> Color {
        switch status {
        case .pending: return .gray
        case .running: return .blue
        case .succeeded: return .green
        case .failed: return .red
        }
    }
}

这个SwiftUI实现展示了一个基础但功能完备的监控列表。在真实项目中,点击列表项可以导航到详情页,该页面会查询并展示特定job_id的完整事件流,为运维人员提供根本原因分析所需的所有上下文。

架构的扩展性与局限性

该架构的优雅之处在于其天然的可扩展性。需要支持一种新型的数据源(例如,从FTP文件而非网页抓取)?只需开发一个新的Worker类型,它同样向Cassandra事件存储中发布标准的领域事件即可。业务部门需要一个新的数据报表?只需开发一个新的投影服务,从事件流中聚合数据,生成所需的读模型,而无需对核心的采集和写入逻辑做任何改动。

然而,这个架构并非银弹。它的主要代价是最终一致性。从一个命令被执行,到其产生的事件被投影服务处理并更新到读模型,存在一个时间延迟。对于我们的监控场景,秒级的延迟是完全可以接受的。但如果业务场景要求写入后必须能立刻读到最新结果,那么CQRS可能不是最合适的选择,或者需要引入更复杂的机制来处理读写一致性。

另一个需要管理的复杂性是事件模型的演进。随着业务发展,事件的结构可能会发生变化。这要求我们建立一套严格的版本控制和迁移策略(例如,upcasting),以确保新旧版本的事件都能被系统正确解析。

最后,选择SwiftUI作为客户端,意味着将运维工具链锁定在苹果生态系统。这是一个战略决策,其前提是运维团队已经或计划统一使用Mac/iPad作为工作设备,以换取原生应用带来的极致性能和用户体验。如果需要跨平台支持,则需要重新评估客户端的技术选型。


  目录