我们的核心挑战并非简单的数据抓取,而是要为上千个无API、结构频繁变更、且普遍采用JavaScript动态渲染的Web数据源,构建一个工业级的、可长期维护的数据采集平台。传统的定时任务(Cron Job)配合简单脚本的模式在这种规模和复杂性面前,其脆弱性暴露无遗:任何一个数据源的DOM结构微调都可能导致整个采集任务静默失败,数据丢失且难以追溯。系统的瓶颈很快会从采集效率转向故障排查与系统可维护性。
因此,问题被重新定义:如何设计一个架构,能够大规模并发执行采集任务,对采集过程中的每一次成功、失败、甚至环境变化都留下不可变痕迹,并且为运维团队提供一个高性能、高信息密度的原生操作界面,以实现对整个采集集群的实时干预与监控。
方案A: 传统一体化架构的局限
一种直接的思路是构建一个单体应用。该应用通过一个中心化的调度器,动态创建Puppeteer子进程执行抓取任务。抓取的数据经过清洗和转换后,直接写入一个关系型数据库(例如PostgreSQL)的目标表中。一个内嵌的Web前端(如React或Vue)则提供任务管理和数据显示的仪表盘。
优点:
- 开发启动快: 单一代码库,技术栈统一,初期开发效率高。
- 部署简单: 作为一个整体进行部署和管理。
不可忽视的缺点:
- 紧密耦合: 采集逻辑、数据处理、存储写入和UI展示被紧密捆绑。对任何一环的修改都可能影响全局。例如,为适应某个网站的变更而调整采集脚本,可能会意外影响数据入库的公共模块。
- 脆弱的事务链: 整个流程
抓取 -> 解析 -> 入库
形成一个隐式的长事务。如果在最后一步的数据库写入环节失败,之前通过Puppeteer获取的宝贵数据可能会被丢弃,除非实现复杂的重试和补偿逻辑,但这本身就在破坏单体应用的简洁性。 - 扩展性受限: 如果瓶颈出现在CPU密集型的Puppeteer渲染上,我们必须水平扩展整个单体应用,这造成了资源浪费,因为数据处理和API服务部分可能远未达到负载上限。
- 数据模型僵化: 关系型数据库的预定义Schema难以适应Web数据源的多变性。频繁的
ALTER TABLE
在生产环境中是高风险操作,且无法优雅地处理历史数据的结构差异。 - 运维黑盒: 当任务失败时,我们能得到的通常只有一个最终的“失败”状态。失败的具体原因——是网络超时、目标页面改版、还是反爬虫策略——被淹没在日志的海洋中,难以进行系统性的归因分析。
在真实项目中,这种架构的维护成本会随着数据源数量的增加而指数级增长,最终陷入“修补旧漏洞比开发新功能更耗时”的泥沼。
方案B: 基于CQRS与事件溯源的解耦架构
为了克服上述缺点,我们决定采用命令查询职责分离(Command Query Responsibility Segregation, CQRS)模式,并结合事件溯源(Event Sourcing)的思想。
这个架构的核心是将系统的写操作(命令)与读操作(查询)彻底分离。
graph TD subgraph "命令侧 (Write Side)" A[SwiftUI/Admin API] -- 1. 发送采集命令 --> B(Command Bus/API Gateway); B -- 2. 分发Command --> C{Command Handler}; C -- 3. 执行业务逻辑, 创建事件 --> D[Puppeteer Worker Pool]; D -- 4. 执行抓取, 产生领域事件 --> E{Event Handler}; E -- 5. 将事件持久化 --> F[(Cassandra Event Store)]; end subgraph "查询侧 (Read Side)" F -- 6. 事件流 --> G(Projector/Projection Service); G -- 7. 构建/更新读模型 --> H1[(PostgreSQL Read Model)]; G -- 7. 构建/更新读模型 --> H2[(Elasticsearch Read Model)]; I(Query API) -- 8. 查询读模型 --> H1; I -- 8. 查询读模型 --> H2; J[SwiftUI Client] -- 9. 订阅/查询数据 --> I; end style F fill:#f9f,stroke:#333,stroke-width:2px style J fill:#ccf,stroke:#333,stroke-width:2px
架构组件解析:
- 命令 (Commands): 表达意图的不可变对象,如
StartScrapingJobCommand
。它们不包含业务逻辑,只携带执行所需的数据。 - 命令处理器 (Command Handlers): 接收命令,执行必要的验证和业务逻辑,最终产出一个或多个事件。
- 事件 (Events): 记录系统中已发生事实的不可变对象,如
JobCreated
,PageRendered
,DataExtracted
,ScrapingFailed
. 事件是本架构的“真相之源”。 - 事件存储 (Event Store): 我们选择Cassandra作为事件存储。这是整个系统的核心。所有事件都以仅追加(Append-Only)的方式写入。Cassandra的分布式、高可用、高写入吞吐量的特性完美契合事件存储的需求。我们不更新、不删除任何事件。
- 投影 (Projections): 一些独立的后台服务,它们订阅事件存储中的事件流。每当新事件发生时,投影会更新为特定查询场景优化的“读模型”(Read Model)。
- 读模型 (Read Models): 预先计算好的视图。例如,一个投影可以构建一个PostgreSQL表来展示所有任务的最新状态,另一个投影则可以将提取的数据写入Elasticsearch以供全文检索。
- 查询 (Queries): 客户端通过一个独立的查询API来读取这些为它们量身定制的读模型,实现高效查询。
技术选型决策:
- 后端 (Backend): Node.js (TypeScript)。与Puppeteer(基于Node.js)保持技术栈一致,便于管理依赖和进程通信。其异步I/O模型也适合处理大量并发的网络请求和消息队列交互。
- 事件存储 (Event Store): Apache Cassandra。选择它的理由是其卓越的写性能和线性扩展能力。Web采集会产生海量的状态变更事件,Cassandra的LSM-Tree架构就是为这种写密集型场景设计的。
- 原生监控端 (Native Client): SwiftUI (macOS/iPadOS)。运维团队需要一个能处理高频数据更新、提供流畅交互和丰富图表的可视化工具。相比Web界面,原生应用能更好地利用系统资源,提供更低的延迟和更稳定的性能,尤其是在展示实时日志流、性能指标曲线等场景下,体验优势明显。
这个方案的初始复杂度的确更高,但它为我们提供了无与伦比的弹性、可扩展性和可观测性。
核心实现:代码与设计剖析
1. Cassandra事件存储的表结构设计
事件存储是架构的基石,其表结构设计至关重要。一个糟糕的设计会导致热点问题,扼杀Cassandra的性能优势。
我们的核心表是 job_events
。
-- job_events: 存储与单个采集任务相关的所有事件
-- 这是一个典型的事件溯源表设计
CREATE TABLE IF NOT EXISTS data_pipeline.job_events (
-- Partition Key: job_id
-- 所有与同一个任务相关的事件都存储在同一个分区中,确保按任务查询的高效性。
job_id uuid,
-- Clustering Key: event_time
-- 在分区内部,事件按时间戳排序。这让我们能够轻松地按时间顺序检索一个任务的完整历史。
-- 使用 timeuuid 可以保证时序性和唯一性,避免同一毫秒内事件的冲突。
event_time timeuuid,
-- Event Type: 用于反序列化事件负载
event_type text,
-- Event Payload: 存储事件具体内容的JSON/Protobuf
-- 使用 text 存储 JSON 具有灵活性,但在超大规模下,二进制格式(blob)配合Protobuf会更高效。
event_payload text,
-- 元数据,用于快速过滤和索引
-- 例如,可以添加 worker_id, source_url 等,但要谨慎,避免模型过度膨胀。
metadata map<text, text>,
PRIMARY KEY (job_id, event_time)
) WITH CLUSTERING ORDER BY (event_time ASC);
这个设计的关键在于 PRIMARY KEY (job_id, event_time)
。job_id
作为分区键,意味着一个任务的所有历史事件在物理上会存储在一起,查询一个任务的完整生命周期就变成了一次高效的分区内顺序扫描。event_time
作为聚类键,保证了事件在分区内的时序性。
2. Node.js Puppeteer Worker的实现
Worker是执行实际采集任务的单元。它必须是健壮的,能够处理各种异常,并将每一个关键步骤都作为事件报告给系统。
// src/worker/puppeteer-worker.ts
import puppeteer, { Browser, Page } from 'puppeteer';
import { v4 as uuidv4 } from 'uuid';
import { CassandraClient } from '../persistence/cassandra-client';
import { JobCommand, BaseEvent } from '../types';
// 生产级的Puppeteer配置
const puppeteerLaunchOptions = {
headless: true,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-accelerated-2d-canvas',
'--disable-gpu',
'--window-size=1920x1080',
],
};
export class PuppeteerWorker {
private browser: Browser | null = null;
private cassandraClient: CassandraClient;
constructor() {
// 依赖注入Cassandra客户端
this.cassandraClient = new CassandraClient();
}
// 初始化浏览器实例,这是一个昂贵操作,应在Worker生命周期内复用
public async initialize(): Promise<void> {
this.logEvent('WorkerInitializing', {});
this.browser = await puppeteer.launch(puppeteerLaunchOptions);
this.logEvent('WorkerInitialized', { browserVersion: await this.browser.version() });
}
// 核心任务执行逻辑
public async executeJob(command: JobCommand): Promise<void> {
if (!this.browser) {
throw new Error('Worker not initialized. Call initialize() first.');
}
const { jobId, url, steps } = command;
let page: Page | null = null;
try {
await this.persistEvent(jobId, 'JobExecutionStarted', { url });
page = await this.browser.newPage();
await page.setViewport({ width: 1920, height: 1080 });
// 导航到目标URL,包含详细的超时和错误处理
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
await this.persistEvent(jobId, 'PageNavigationSucceeded', { url });
// 模拟一个数据提取步骤
const titleSelector = 'h1';
await page.waitForSelector(titleSelector, { timeout: 10000 });
const title = await page.$eval(titleSelector, el => el.textContent);
await this.persistEvent(jobId, 'DataExtracted', { extractedData: { title } });
await this.persistEvent(jobId, 'JobExecutionSucceeded', {});
} catch (error: any) {
// 捕获所有潜在错误,并将其作为事件记录
// 这是可观测性的关键:失败不是终点,而是有价值的数据点
const errorDetails = {
name: error.name,
message: error.message,
stack: error.stack?.substring(0, 1024), // 限制堆栈大小
};
await this.persistEvent(jobId, 'JobExecutionFailed', { error: errorDetails });
} finally {
if (page) {
await page.close();
}
}
}
private async persistEvent(jobId: string, eventType: string, payload: object): Promise<void> {
const event: BaseEvent = {
jobId,
eventType,
payload,
};
// 实际的数据库写入操作
await this.cassandraClient.insertEvent(event);
}
private logEvent(eventType: string, payload: object): void {
// 用于worker自身日志,非业务事件
console.log(JSON.stringify({ eventType, payload, timestamp: new Date().toISOString() }));
}
public async shutdown(): Promise<void> {
if (this.browser) {
await this.browser.close();
this.logEvent('WorkerShutdown', {});
}
}
}
这段代码的重点在于,executeJob
方法中的每一个步骤,无论是成功还是失败,都通过persistEvent
方法被记录下来。这种细粒度的事件日志使得事后复盘变得极为精确。当一个任务失败时,我们可以通过查询其在Cassandra中的事件流,准确地定位到是导航超时、选择器未找到,还是数据解析出错。
3. SwiftUI监控客户端的构建
SwiftUI客户端的核心是消费查询API提供的数据,并将其以高效、直观的方式呈现。
数据模型 (Data Models):
首先,定义与后端Query API响应匹配的Codable
结构体。
// Models/JobStatus.swift
import Foundation
// 这是一个简化的读模型,用于在主列表中显示任务概览
struct JobStatus: Identifiable, Codable, Hashable {
let id: UUID
let url: String
let status: JobState
let startedAt: Date
let lastUpdatedAt: Date
let failureReason: String?
enum JobState: String, Codable {
case pending = "PENDING"
case running = "RUNNING"
case succeeded = "SUCCEEDED"
case failed = "FAILED"
}
}
API服务层 (API Service):
使用async/await
封装网络请求。
// Services/APIService.swift
import Foundation
class APIService {
private let baseURL = URL(string: "http://api.your-backend.com/query")!
func fetchJobStatuses() async throws -> [JobStatus] {
let (data, response) = try await URLSession.shared.data(from: baseURL.appendingPathComponent("jobs"))
guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 else {
throw URLError(.badServerResponse)
}
// 使用自定义的JSONDecoder处理日期格式
let decoder = JSONDecoder()
decoder.dateDecodingStrategy = .iso8601
return try decoder.decode([JobStatus].self, from: data)
}
// 可以在此添加提交新任务(Command)的方法
func submitNewJob(url: String) async throws {
// ... 实现POST请求到Command API
}
}
视图 (View) 与视图模型 (ViewModel):
使用StateObject
和@Published
属性来驱动UI的响应式更新。
// ViewModels/JobListViewModel.swift
import Foundation
import Combine
@MainActor
class JobListViewModel: ObservableObject {
@Published var jobs: [JobStatus] = []
@Published var isLoading = false
@Published var errorMessage: String? = nil
private let apiService = APIService()
func loadJobs() async {
isLoading = true
errorMessage = nil
do {
jobs = try await apiService.fetchJobStatuses()
} catch {
errorMessage = "Failed to load jobs: \(error.localizedDescription)"
}
isLoading = false
}
}
// Views/JobListView.swift
import SwiftUI
struct JobListView: View {
@StateObject private var viewModel = JobListViewModel()
var body: some View {
NavigationView {
VStack {
if viewModel.isLoading {
ProgressView()
} else if let errorMessage = viewModel.errorMessage {
Text(errorMessage).foregroundColor(.red)
} else {
List(viewModel.jobs) { job in
VStack(alignment: .leading) {
Text(job.url)
.font(.headline)
.lineLimit(1)
HStack {
Text(job.status.rawValue)
.font(.caption)
.padding(4)
.background(statusColor(for: job.status))
.foregroundColor(.white)
.cornerRadius(4)
Spacer()
Text(job.lastUpdatedAt, style: .relative)
.font(.caption2)
.foregroundColor(.secondary)
}
}
}
}
}
.navigationTitle("Scraping Jobs")
.onAppear {
Task {
await viewModel.loadJobs()
}
}
.toolbar {
ToolbarItem {
Button(action: {
Task { await viewModel.loadJobs() }
}) {
Image(systemName: "arrow.clockwise")
}
}
}
}
}
private func statusColor(for status: JobStatus.JobState) -> Color {
switch status {
case .pending: return .gray
case .running: return .blue
case .succeeded: return .green
case .failed: return .red
}
}
}
这个SwiftUI实现展示了一个基础但功能完备的监控列表。在真实项目中,点击列表项可以导航到详情页,该页面会查询并展示特定job_id
的完整事件流,为运维人员提供根本原因分析所需的所有上下文。
架构的扩展性与局限性
该架构的优雅之处在于其天然的可扩展性。需要支持一种新型的数据源(例如,从FTP文件而非网页抓取)?只需开发一个新的Worker类型,它同样向Cassandra事件存储中发布标准的领域事件即可。业务部门需要一个新的数据报表?只需开发一个新的投影服务,从事件流中聚合数据,生成所需的读模型,而无需对核心的采集和写入逻辑做任何改动。
然而,这个架构并非银弹。它的主要代价是最终一致性。从一个命令被执行,到其产生的事件被投影服务处理并更新到读模型,存在一个时间延迟。对于我们的监控场景,秒级的延迟是完全可以接受的。但如果业务场景要求写入后必须能立刻读到最新结果,那么CQRS可能不是最合适的选择,或者需要引入更复杂的机制来处理读写一致性。
另一个需要管理的复杂性是事件模型的演进。随着业务发展,事件的结构可能会发生变化。这要求我们建立一套严格的版本控制和迁移策略(例如,upcasting),以确保新旧版本的事件都能被系统正确解析。
最后,选择SwiftUI作为客户端,意味着将运维工具链锁定在苹果生态系统。这是一个战略决策,其前提是运维团队已经或计划统一使用Mac/iPad作为工作设备,以换取原生应用带来的极致性能和用户体验。如果需要跨平台支持,则需要重新评估客户端的技术选型。