利用 Dask MariaDB 与 Grafana 实现大规模 WebRTC 服务的可观测性管道


我们的视频会议平台开始频繁出现无法复现的质量投诉。用户反馈“卡顿”、“声音断断续续”,但我们的基础监控——CPU、内存、网络IO——一切正常。问题在于,这些宏观指标无法反映单个 WebRTC 流的微观体验。我们依赖 WebRTC 的 getStats() API,但它每秒为每个连接生成数百个指标,对于一个拥有数千并发流的平台,这意味着每秒数百万个数据点。用传统的 Prometheus 拉取模式来处理这种高基数、高频率的数据,无异于一场灾难。我们正在驾驶一架没有仪表盘的飞机,而且外面有雾。

最初的构想是简单的:客户端定期上报 getStats() 快照,服务端接收后存入时序数据库,再由 Grafana 展示。这个方案很快被否决。原始的 getStats() 数据是瞬时快照,直接存储价值有限。例如,packetsLost 是一个累计值,我们需要的是两次报告之间的增量,即丢包率。这意味着我们需要一个有状态的流处理层。此外,我们需要的不仅仅是展示原始数据,而是进行实时聚合分析,比如计算“过去五分钟内,某特定 ISP 提供商下所有用户的平均抖动(jitter)”。这种动态的多维度切片分析,对任何现成的时序数据库都是一个巨大的查询压力。

我们需要一个数据管道,它必须能:

  1. 稳定接收来自海量客户端的高频 getStats() JSON 数据。
  2. 对数据进行有状态的实时计算(如计算差值、速率)。
  3. 支持灵活的、分布式的聚合分析。
  4. 将聚合结果持久化,用于趋势分析和仪表盘展示。
  5. 整个系统的维护成本必须在可控范围内。

经过一番技术选型,我们确定了核心技术栈:Dask 用于分布式计算,MariaDB 用于聚合结果存储,Grafana 作为最终的可视化前端。选择 Dask 是因为我们的技术栈以 Python 为主,Dask 提供了与 Pandas、NumPy 相似的 API,能平滑地将单机分析代码扩展到分布式集群,且比 Spark 轻量,没有 JVM 的额外开销。MariaDB 对于我们的运维团队来说驾轻就熟,用于存储每分钟或每五分钟一次的聚合结果,性能绰绰有余,避免了引入新的、更复杂的时序数据库带来的学习和维护成本。

第一步:构建数据采集与分发层

这是整个管道的入口。我们选择使用 FastAPI 构建一个 WebSocket 端点,因为它异步、高性能,非常适合处理大量持久连接。客户端通过 WebSocket 连接上报数据,避免了 HTTP 的频繁建连开销。

客户端的 getStats 上报逻辑大致如下。这里的关键是,我们不只发送一次,而是使用 setInterval 定期上报,以便服务端计算速率。

// client_reporter.js

class WebRTCStatsReporter {
    constructor(peerConnection, endpointUrl, reportInterval = 5000) {
        if (!peerConnection || !(peerConnection instanceof RTCPeerConnection)) {
            throw new Error("A valid RTCPeerConnection instance is required.");
        }
        this.pc = peerConnection;
        this.endpointUrl = endpointUrl;
        this.reportInterval = reportInterval;
        this.ws = null;
        this.intervalId = null;
        this.sessionId = this.generateSessionId();
        this.lastReport = null; // 用于计算增量
    }

    generateSessionId() {
        return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
    }

    connect() {
        console.log(`[StatsReporter] Attempting to connect to ${this.endpointUrl}`);
        this.ws = new WebSocket(this.endpointUrl);

        this.ws.onopen = () => {
            console.log("[StatsReporter] WebSocket connection established.");
            this.start();
        };

        this.ws.onclose = (event) => {
            console.warn("[StatsReporter] WebSocket connection closed.", event);
            this.stop();
            // 在生产环境中,这里应该有重连逻辑
        };

        this.ws.onerror = (error) => {
            console.error("[StatsReporter] WebSocket error:", error);
            this.stop();
        };
    }

    async reportStats() {
        if (this.ws.readyState !== WebSocket.OPEN) {
            console.warn("[StatsReporter] WebSocket is not open. Skipping report.");
            return;
        }

        try {
            const stats = await this.pc.getStats();
            const report = {
                sessionId: this.sessionId,
                timestamp: new Date().toISOString(),
                stats: [],
            };

            stats.forEach(stat => {
                // 我们只关心特定类型的统计数据以减少负载
                if (stat.type === 'inbound-rtp' || stat.type === 'outbound-rtp' || stat.type === 'remote-outbound-rtp') {
                    // 为了演示,我们只挑选几个关键指标
                    const filteredStat = {
                        id: stat.id,
                        type: stat.type,
                        kind: stat.kind,
                        packetsSent: stat.packetsSent,
                        bytesSent: stat.bytesSent,
                        packetsReceived: stat.packetsReceived,
                        bytesReceived: stat.bytesReceived,
                        packetsLost: stat.packetsLost,
                        jitter: stat.jitter,
                        roundTripTime: stat.roundTripTime,
                        framesPerSecond: stat.framesPerSecond,
                    };
                    report.stats.push(filteredStat);
                }
            });
            
            // 发送完整报告
            this.ws.send(JSON.stringify(report));
            this.lastReport = report; // 保存上一次的报告,尽管计算在后端进行
        } catch (error) {
            console.error("[StatsReporter] Error getting stats:", error);
        }
    }

    start() {
        if (this.intervalId) {
            this.stop();
        }
        console.log(`[StatsReporter] Starting to report stats every ${this.reportInterval}ms.`);
        this.intervalId = setInterval(() => this.reportStats(), this.reportInterval);
    }

    stop() {
        if (this.intervalId) {
            clearInterval(this.intervalId);
            this.intervalId = null;
            console.log("[StatsReporter] Stopped reporting stats.");
        }
    }
}

// 使用示例:
// const pc = new RTCPeerConnection();
// const reporter = new WebRTCStatsReporter(pc, 'ws://localhost:8000/ws/stats');
// reporter.connect();

服务端 FastAPI 应用接收数据,然后将其推送到一个内部队列中,这个队列将是 Dask 集群的入口。我们这里使用 Dask 的 dask.distributed.Queue,它能很好地与 Dask 的调度器集成。

# collector_server.py
import asyncio
import logging
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from dask.distributed import Client, Queue

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 初始化 Dask Client 和 Queue
# 在生产环境中,这里的地址应该是 Dask Scheduler 的地址
DASK_SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'
try:
    dask_client = Client(DASK_SCHEDULER_ADDRESS)
    stats_queue = Queue('webrtc-stats', client=dask_client)
    logging.info(f"Successfully connected to Dask scheduler at {DASK_SCHEDULER_ADDRESS}")
except Exception as e:
    logging.error(f"Could not connect to Dask scheduler: {e}")
    # 在无法连接Dask时退出,或实现降级逻辑
    exit(1)

app = FastAPI()

@app.websocket("/ws/stats")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    logging.info(f"WebSocket connection accepted from {websocket.client.host}")
    try:
        while True:
            data = await websocket.receive_text()
            try:
                # 验证JSON格式,但不做深度业务验证,交给下游处理
                payload = json.loads(data)
                # 放入Dask队列,这是非阻塞的
                await stats_queue.put(payload)
            except json.JSONDecodeError:
                logging.warning(f"Received non-JSON message from {websocket.client.host}")
            except Exception as e:
                logging.error(f"Error processing message: {e}")
                # 考虑是否需要断开连接
    except WebSocketDisconnect:
        logging.info(f"WebSocket connection closed for {websocket.client.host}")
    except Exception as e:
        logging.error(f"An unexpected error occurred in WebSocket handler: {e}")

if __name__ == "__main__":
    import uvicorn
    # 在生产环境中,应该使用 Gunicorn + Uvicorn workers
    uvicorn.run(app, host="0.0.0.0", port=8000)

这个采集服务本身是无状态的,可以水平扩展。所有有状态的计算和聚合都将由 Dask 集群处理。

第二步:Dask 分布式处理管道

这是系统的核心。Dask worker 将从队列中拉取原始 getStats 数据,进行一系列转换和聚合。整个流程可以被看作一个流式处理任务。

graph TD
    subgraph Clients
        C1[WebRTC Client 1]
        C2[WebRTC Client 2]
        C3[...]
    end

    subgraph Collector Service
        WS[FastAPI WebSocket]
    end

    subgraph Dask Cluster
        A[Dask Queue]
        B1[Worker 1: Preprocessing]
        B2[Worker 2: Preprocessing]
        B3[...]
        C[Aggregation & State Management]
    end
    
    subgraph Storage & Viz
        DB[(MariaDB)]
        G[Grafana]
    end

    Clients -- getStats() JSON --> WS
    WS -- Raw Payload --> A
    A -- Distributes Tasks --> B1
    A -- Distributes Tasks --> B2
    A -- Distributes Tasks --> B3
    B1 -- Processed Data --> C
    B2 -- Processed Data --> C
    B3 -- Processed Data --> C
    C -- Aggregated Batches --> DB
    G -- SQL Queries --> DB

我们的 Dask 处理脚本需要持续运行,它定义了处理逻辑。主要分为三部分:

  1. 预处理: 计算指标的增量值。
  2. 状态管理: 缓存上一次的报告以计算增量。
  3. 聚合与入库: 按时间窗口聚合数据并批量写入 MariaDB。
# dask_processor.py
import time
import logging
from collections import defaultdict, deque
from datetime import datetime, timezone
import pandas as pd
from dask.distributed import Client, Queue, Actor
import sqlalchemy

# 配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('DaskProcessor')

DASK_SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'
MARIADB_CONN_STR = "mysql+pymysql://user:password@localhost:3306/webrtc_monitoring"

# 数据聚合的批处理大小和时间窗口
BATCH_SIZE = 1000
FLUSH_INTERVAL_SECONDS = 60

class StatsProcessor:
    """
    一个简单的 Dask Actor,用于维护状态(上一次的统计报告)
    在真实的生产环境中,这个状态可能需要持久化到Redis等外部存储,以防Actor失败
    """
    def __init__(self):
        # key: sessionId, value: last_report
        self.session_states = {}
        logger.info("StatsProcessor Actor initialized.")

    def process(self, report):
        session_id = report.get('sessionId')
        if not session_id:
            return None

        last_report = self.session_states.get(session_id)
        self.session_states[session_id] = report
        
        if not last_report:
            # 这是该会话的第一个报告,无法计算增量
            return None

        # 计算时间差(秒)
        current_ts = datetime.fromisoformat(report['timestamp'].replace('Z', '+00:00'))
        last_ts = datetime.fromisoformat(last_report['timestamp'].replace('Z', '+00:00'))
        delta_seconds = (current_ts - last_ts).total_seconds()
        
        if delta_seconds <= 0:
            return None

        processed_metrics = []
        
        # 将 stats 列表转换为以 id 为 key 的字典,方便查找
        current_stats_map = {s['id']: s for s in report.get('stats', [])}
        last_stats_map = {s['id']: s for s in last_report.get('stats', [])}

        for stat_id, current_stat in current_stats_map.items():
            last_stat = last_stats_map.get(stat_id)
            if not last_stat or current_stat['type'] != last_stat['type']:
                continue

            # 计算增量指标
            if current_stat['type'] == 'outbound-rtp':
                delta_packets_sent = (current_stat.get('packetsSent', 0) or 0) - (last_stat.get('packetsSent', 0) or 0)
                delta_bytes_sent = (current_stat.get('bytesSent', 0) or 0) - (last_stat.get('bytesSent', 0) or 0)
                
                processed_metrics.append({
                    'timestamp': current_ts,
                    'session_id': session_id,
                    'stat_id': stat_id,
                    'kind': current_stat.get('kind'),
                    'metric_name': 'packets_sent_rate',
                    'value': delta_packets_sent / delta_seconds
                })
                processed_metrics.append({
                    'timestamp': current_ts,
                    'session_id': session_id,
                    'stat_id': stat_id,
                    'kind': current_stat.get('kind'),
                    'metric_name': 'bitrate_sent_kbps',
                    'value': (delta_bytes_sent * 8) / (delta_seconds * 1024)
                })

            elif current_stat['type'] == 'inbound-rtp':
                delta_packets_lost = (current_stat.get('packetsLost', 0) or 0) - (last_stat.get('packetsLost', 0) or 0)
                delta_packets_received = (current_stat.get('packetsReceived', 0) or 0) - (last_stat.get('packetsReceived', 0) or 0)
                total_packets = delta_packets_received + delta_packets_lost
                
                packet_loss_ratio = (delta_packets_lost / total_packets) if total_packets > 0 else 0.0

                processed_metrics.append({
                    'timestamp': current_ts,
                    'session_id': session_id,
                    'stat_id': stat_id,
                    'kind': current_stat.get('kind'),
                    'metric_name': 'packet_loss_ratio',
                    'value': packet_loss_ratio
                })
                processed_metrics.append({
                    'timestamp': current_ts,
                    'session_id': session_id,
                    'stat_id': stat_id,
                    'kind': current_stat.get('kind'),
                    'metric_name': 'jitter',
                    'value': current_stat.get('jitter', 0) or 0.0
                })

        return processed_metrics


class DatabaseWriter:
    """
    负责将聚合数据批量写入MariaDB
    """
    def __init__(self, conn_str):
        self.engine = sqlalchemy.create_engine(conn_str)
        self.buffer = []
        self.last_flush_time = time.time()

    def write(self, df):
        try:
            # 这里的表名 'qos_aggregates_per_minute' 需要预先创建
            df.to_sql('qos_aggregates_per_minute', self.engine, if_exists='append', index=False)
            logger.info(f"Successfully wrote {len(df)} rows to MariaDB.")
        except Exception as e:
            logger.error(f"Failed to write to MariaDB: {e}")

    def aggregate_and_flush(self, metrics_buffer):
        if not metrics_buffer:
            return
            
        df = pd.DataFrame(metrics_buffer)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # 按分钟、媒体类型和指标名称进行聚合
        # 在真实场景中,可能还会按地域、ISP等维度聚合
        df_agg = df.groupby([
            pd.Grouper(key='timestamp', freq='1Min'),
            'kind',
            'metric_name'
        ]).agg(
            value_avg=('value', 'mean'),
            value_p95=('value', lambda x: x.quantile(0.95)),
            value_max=('value', 'max'),
            count=('value', 'count')
        ).reset_index()

        df_agg.rename(columns={'timestamp': 'time_bucket'}, inplace=True)
        
        logger.info(f"Aggregated {len(df)} metrics into {len(df_agg)} rows.")
        self.write(df_agg)


async def main():
    client = Client(DASK_SCHEDULER_ADDRESS)
    logger.info("Dask client connected.")
    
    stats_queue = Queue('webrtc-stats', client=client)
    
    # 将Processor实例化为Actor,使其在某个worker上运行并保持状态
    processor_actor = await client.submit(StatsProcessor, actor=True)
    
    db_writer = DatabaseWriter(MARIADB_CONN_STR)
    
    metrics_buffer = []
    last_flush_time = time.time()
    
    while True:
        try:
            # 从队列中批量获取数据以提高效率
            raw_reports = await stats_queue.get(batch=True, timeout=5)
            if not raw_reports:
                # 检查是否需要因为超时而 flush buffer
                if time.time() - last_flush_time > FLUSH_INTERVAL_SECONDS and metrics_buffer:
                    db_writer.aggregate_and_flush(metrics_buffer)
                    metrics_buffer = []
                    last_flush_time = time.time()
                continue

            # 将处理任务分发给Actor
            # 这里使用了简单的 client.map 模型,对于更复杂的DAG可以使用 dask.delayed
            futures = client.map(lambda report: processor_actor.process(report), raw_reports)
            results = await client.gather(futures)

            for res in results:
                if res:
                    metrics_buffer.extend(res)
            
            # 检查是否满足 flush 条件
            if len(metrics_buffer) >= BATCH_SIZE or \
               (time.time() - last_flush_time > FLUSH_INTERVAL_SECONDS and metrics_buffer):
                db_writer.aggregate_and_flush(metrics_buffer)
                metrics_buffer = []
                last_flush_time = time.time()

        except asyncio.TimeoutError:
            # 队列为空时超时是正常的
            pass
        except Exception as e:
            logger.error(f"An error occurred in the main processing loop: {e}", exc_info=True)
            # 等待一段时间再重试,防止CPU空转
            time.sleep(5)


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

第三步:MariaDB 存储层设计

我们的聚合数据不需要太复杂的结构。一张宽表足以满足 Grafana 的查询需求。关键在于索引的设计,time_bucket 列是查询的主要入口,必须建立索引。

-- mariadb_schema.sql
CREATE DATABASE IF NOT EXISTS webrtc_monitoring;
USE webrtc_monitoring;

CREATE TABLE `qos_aggregates_per_minute` (
  `time_bucket` datetime NOT NULL COMMENT '聚合时间窗口(分钟级)',
  `kind` varchar(10) NOT NULL COMMENT '媒体类型: audio/video',
  `metric_name` varchar(50) NOT NULL COMMENT '指标名称: packet_loss_ratio, jitter, bitrate_sent_kbps 等',
  `value_avg` double DEFAULT NULL COMMENT '窗口内的平均值',
  `value_p95` double DEFAULT NULL COMMENT '窗口内的95百分位值',
  `value_max` double DEFAULT NULL COMMENT '窗口内的最大值',
  `count` int(11) unsigned NOT NULL COMMENT '聚合的样本点数量',
  PRIMARY KEY (`time_bucket`, `kind`, `metric_name`),
  INDEX `idx_time_bucket` (`time_bucket`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 在生产环境中,可以考虑使用分区表来优化历史数据的查询和维护
-- ALTER TABLE qos_aggregates_per_minute
-- PARTITION BY RANGE (TO_DAYS(time_bucket)) (
--   PARTITION p202310 VALUES LESS THAN (TO_DAYS('2023-11-01')),
--   ...
-- );

这个表结构简单、清晰。主键确保了每个时间桶内同一种指标的唯一性。time_bucket 上的独立索引加速了按时间范围的查询,这是 Grafana 最常见的查询模式。

第四步:Grafana 可视化

最后一步是将存储在 MariaDB 中的数据呈现出来。在 Grafana 中,我们添加一个新的 MySQL 类型的数据源,指向我们的 MariaDB 实例。

然后,创建仪表盘面板,使用 SQL 查询数据。例如,要显示视频流的平均丢包率时序图:

-- Grafana Panel Query: Video Packet Loss Ratio
SELECT
  time_bucket AS "time",
  value_avg AS "Average Packet Loss Ratio"
FROM qos_aggregates_per_minute
WHERE
  $__timeFilter(time_bucket) AND
  kind = 'video' AND
  metric_name = 'packet_loss_ratio'
ORDER BY
  time_bucket;

要显示音频流的 P95 抖动:

-- Grafana Panel Query: Audio Jitter (P95)
SELECT
  time_bucket AS "time",
  value_p95 AS "Jitter (P95)"
FROM qos_aggregates_per_minute
WHERE
  $__timeFilter(time_bucket) AND
  kind = 'audio' AND
  metric_name = 'jitter'
ORDER BY
  time_bucket;

Grafana 的 $__timeFilter 宏会自动替换为仪表盘右上角选择的时间范围,非常方便。通过组合不同的查询和图表类型,我们可以构建一个全面的 QoS 监控仪表盘,从全局视角下钻到特定问题。

方案的局限性与未来展望

这个方案并非没有缺点。首先,Dask Actor 的状态是存储在内存中的,如果承载 Actor 的 worker 进程崩溃,会丢失部分会话的状态信息,导致一段时间内的增量计算不准确。生产级的实现需要将状态外部化到 Redis 或类似的快速 K-V 存储中。

其次,聚合粒度是固定的(每分钟)。虽然这对于宏观监控足够,但无法满足对特定用户会话进行深度问题排查的需求。一个可行的改进是实现一个“双写”策略:聚合数据写入 MariaDB 用于全局仪表盘,同时将部分原始或半处理过的数据采样后写入像 Elasticsearch 或 Loki 这样的日志系统中,用于特定 sessionId 的下钻查询。

最后,当前的聚合维度比较单一。随着业务发展,我们可能需要加入更多维度,如用户地理位置、ISP、设备类型等。这会增加 MariaDB 表的复杂度和数据量,届时可能需要考虑引入 ClickHouse 这样的 OLAP 数据库来替代 MariaDB,以保持高效的即席查询性能。但作为起点,当前这套架构在成本、性能和运维复杂度之间取得了务实的平衡,成功地为我们这架高速飞行的飞机装上了急需的仪表盘。


  目录