我们的视频会议平台开始频繁出现无法复现的质量投诉。用户反馈“卡顿”、“声音断断续续”,但我们的基础监控——CPU、内存、网络IO——一切正常。问题在于,这些宏观指标无法反映单个 WebRTC 流的微观体验。我们依赖 WebRTC 的 getStats()
API,但它每秒为每个连接生成数百个指标,对于一个拥有数千并发流的平台,这意味着每秒数百万个数据点。用传统的 Prometheus 拉取模式来处理这种高基数、高频率的数据,无异于一场灾难。我们正在驾驶一架没有仪表盘的飞机,而且外面有雾。
最初的构想是简单的:客户端定期上报 getStats()
快照,服务端接收后存入时序数据库,再由 Grafana 展示。这个方案很快被否决。原始的 getStats()
数据是瞬时快照,直接存储价值有限。例如,packetsLost
是一个累计值,我们需要的是两次报告之间的增量,即丢包率。这意味着我们需要一个有状态的流处理层。此外,我们需要的不仅仅是展示原始数据,而是进行实时聚合分析,比如计算“过去五分钟内,某特定 ISP 提供商下所有用户的平均抖动(jitter)”。这种动态的多维度切片分析,对任何现成的时序数据库都是一个巨大的查询压力。
我们需要一个数据管道,它必须能:
- 稳定接收来自海量客户端的高频
getStats()
JSON 数据。 - 对数据进行有状态的实时计算(如计算差值、速率)。
- 支持灵活的、分布式的聚合分析。
- 将聚合结果持久化,用于趋势分析和仪表盘展示。
- 整个系统的维护成本必须在可控范围内。
经过一番技术选型,我们确定了核心技术栈:Dask 用于分布式计算,MariaDB 用于聚合结果存储,Grafana 作为最终的可视化前端。选择 Dask 是因为我们的技术栈以 Python 为主,Dask 提供了与 Pandas、NumPy 相似的 API,能平滑地将单机分析代码扩展到分布式集群,且比 Spark 轻量,没有 JVM 的额外开销。MariaDB 对于我们的运维团队来说驾轻就熟,用于存储每分钟或每五分钟一次的聚合结果,性能绰绰有余,避免了引入新的、更复杂的时序数据库带来的学习和维护成本。
第一步:构建数据采集与分发层
这是整个管道的入口。我们选择使用 FastAPI 构建一个 WebSocket 端点,因为它异步、高性能,非常适合处理大量持久连接。客户端通过 WebSocket 连接上报数据,避免了 HTTP 的频繁建连开销。
客户端的 getStats
上报逻辑大致如下。这里的关键是,我们不只发送一次,而是使用 setInterval
定期上报,以便服务端计算速率。
// client_reporter.js
class WebRTCStatsReporter {
constructor(peerConnection, endpointUrl, reportInterval = 5000) {
if (!peerConnection || !(peerConnection instanceof RTCPeerConnection)) {
throw new Error("A valid RTCPeerConnection instance is required.");
}
this.pc = peerConnection;
this.endpointUrl = endpointUrl;
this.reportInterval = reportInterval;
this.ws = null;
this.intervalId = null;
this.sessionId = this.generateSessionId();
this.lastReport = null; // 用于计算增量
}
generateSessionId() {
return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
}
connect() {
console.log(`[StatsReporter] Attempting to connect to ${this.endpointUrl}`);
this.ws = new WebSocket(this.endpointUrl);
this.ws.onopen = () => {
console.log("[StatsReporter] WebSocket connection established.");
this.start();
};
this.ws.onclose = (event) => {
console.warn("[StatsReporter] WebSocket connection closed.", event);
this.stop();
// 在生产环境中,这里应该有重连逻辑
};
this.ws.onerror = (error) => {
console.error("[StatsReporter] WebSocket error:", error);
this.stop();
};
}
async reportStats() {
if (this.ws.readyState !== WebSocket.OPEN) {
console.warn("[StatsReporter] WebSocket is not open. Skipping report.");
return;
}
try {
const stats = await this.pc.getStats();
const report = {
sessionId: this.sessionId,
timestamp: new Date().toISOString(),
stats: [],
};
stats.forEach(stat => {
// 我们只关心特定类型的统计数据以减少负载
if (stat.type === 'inbound-rtp' || stat.type === 'outbound-rtp' || stat.type === 'remote-outbound-rtp') {
// 为了演示,我们只挑选几个关键指标
const filteredStat = {
id: stat.id,
type: stat.type,
kind: stat.kind,
packetsSent: stat.packetsSent,
bytesSent: stat.bytesSent,
packetsReceived: stat.packetsReceived,
bytesReceived: stat.bytesReceived,
packetsLost: stat.packetsLost,
jitter: stat.jitter,
roundTripTime: stat.roundTripTime,
framesPerSecond: stat.framesPerSecond,
};
report.stats.push(filteredStat);
}
});
// 发送完整报告
this.ws.send(JSON.stringify(report));
this.lastReport = report; // 保存上一次的报告,尽管计算在后端进行
} catch (error) {
console.error("[StatsReporter] Error getting stats:", error);
}
}
start() {
if (this.intervalId) {
this.stop();
}
console.log(`[StatsReporter] Starting to report stats every ${this.reportInterval}ms.`);
this.intervalId = setInterval(() => this.reportStats(), this.reportInterval);
}
stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
console.log("[StatsReporter] Stopped reporting stats.");
}
}
}
// 使用示例:
// const pc = new RTCPeerConnection();
// const reporter = new WebRTCStatsReporter(pc, 'ws://localhost:8000/ws/stats');
// reporter.connect();
服务端 FastAPI 应用接收数据,然后将其推送到一个内部队列中,这个队列将是 Dask 集群的入口。我们这里使用 Dask 的 dask.distributed.Queue
,它能很好地与 Dask 的调度器集成。
# collector_server.py
import asyncio
import logging
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from dask.distributed import Client, Queue
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化 Dask Client 和 Queue
# 在生产环境中,这里的地址应该是 Dask Scheduler 的地址
DASK_SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'
try:
dask_client = Client(DASK_SCHEDULER_ADDRESS)
stats_queue = Queue('webrtc-stats', client=dask_client)
logging.info(f"Successfully connected to Dask scheduler at {DASK_SCHEDULER_ADDRESS}")
except Exception as e:
logging.error(f"Could not connect to Dask scheduler: {e}")
# 在无法连接Dask时退出,或实现降级逻辑
exit(1)
app = FastAPI()
@app.websocket("/ws/stats")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
logging.info(f"WebSocket connection accepted from {websocket.client.host}")
try:
while True:
data = await websocket.receive_text()
try:
# 验证JSON格式,但不做深度业务验证,交给下游处理
payload = json.loads(data)
# 放入Dask队列,这是非阻塞的
await stats_queue.put(payload)
except json.JSONDecodeError:
logging.warning(f"Received non-JSON message from {websocket.client.host}")
except Exception as e:
logging.error(f"Error processing message: {e}")
# 考虑是否需要断开连接
except WebSocketDisconnect:
logging.info(f"WebSocket connection closed for {websocket.client.host}")
except Exception as e:
logging.error(f"An unexpected error occurred in WebSocket handler: {e}")
if __name__ == "__main__":
import uvicorn
# 在生产环境中,应该使用 Gunicorn + Uvicorn workers
uvicorn.run(app, host="0.0.0.0", port=8000)
这个采集服务本身是无状态的,可以水平扩展。所有有状态的计算和聚合都将由 Dask 集群处理。
第二步:Dask 分布式处理管道
这是系统的核心。Dask worker 将从队列中拉取原始 getStats
数据,进行一系列转换和聚合。整个流程可以被看作一个流式处理任务。
graph TD subgraph Clients C1[WebRTC Client 1] C2[WebRTC Client 2] C3[...] end subgraph Collector Service WS[FastAPI WebSocket] end subgraph Dask Cluster A[Dask Queue] B1[Worker 1: Preprocessing] B2[Worker 2: Preprocessing] B3[...] C[Aggregation & State Management] end subgraph Storage & Viz DB[(MariaDB)] G[Grafana] end Clients -- getStats() JSON --> WS WS -- Raw Payload --> A A -- Distributes Tasks --> B1 A -- Distributes Tasks --> B2 A -- Distributes Tasks --> B3 B1 -- Processed Data --> C B2 -- Processed Data --> C B3 -- Processed Data --> C C -- Aggregated Batches --> DB G -- SQL Queries --> DB
我们的 Dask 处理脚本需要持续运行,它定义了处理逻辑。主要分为三部分:
- 预处理: 计算指标的增量值。
- 状态管理: 缓存上一次的报告以计算增量。
- 聚合与入库: 按时间窗口聚合数据并批量写入 MariaDB。
# dask_processor.py
import time
import logging
from collections import defaultdict, deque
from datetime import datetime, timezone
import pandas as pd
from dask.distributed import Client, Queue, Actor
import sqlalchemy
# 配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('DaskProcessor')
DASK_SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'
MARIADB_CONN_STR = "mysql+pymysql://user:password@localhost:3306/webrtc_monitoring"
# 数据聚合的批处理大小和时间窗口
BATCH_SIZE = 1000
FLUSH_INTERVAL_SECONDS = 60
class StatsProcessor:
"""
一个简单的 Dask Actor,用于维护状态(上一次的统计报告)
在真实的生产环境中,这个状态可能需要持久化到Redis等外部存储,以防Actor失败
"""
def __init__(self):
# key: sessionId, value: last_report
self.session_states = {}
logger.info("StatsProcessor Actor initialized.")
def process(self, report):
session_id = report.get('sessionId')
if not session_id:
return None
last_report = self.session_states.get(session_id)
self.session_states[session_id] = report
if not last_report:
# 这是该会话的第一个报告,无法计算增量
return None
# 计算时间差(秒)
current_ts = datetime.fromisoformat(report['timestamp'].replace('Z', '+00:00'))
last_ts = datetime.fromisoformat(last_report['timestamp'].replace('Z', '+00:00'))
delta_seconds = (current_ts - last_ts).total_seconds()
if delta_seconds <= 0:
return None
processed_metrics = []
# 将 stats 列表转换为以 id 为 key 的字典,方便查找
current_stats_map = {s['id']: s for s in report.get('stats', [])}
last_stats_map = {s['id']: s for s in last_report.get('stats', [])}
for stat_id, current_stat in current_stats_map.items():
last_stat = last_stats_map.get(stat_id)
if not last_stat or current_stat['type'] != last_stat['type']:
continue
# 计算增量指标
if current_stat['type'] == 'outbound-rtp':
delta_packets_sent = (current_stat.get('packetsSent', 0) or 0) - (last_stat.get('packetsSent', 0) or 0)
delta_bytes_sent = (current_stat.get('bytesSent', 0) or 0) - (last_stat.get('bytesSent', 0) or 0)
processed_metrics.append({
'timestamp': current_ts,
'session_id': session_id,
'stat_id': stat_id,
'kind': current_stat.get('kind'),
'metric_name': 'packets_sent_rate',
'value': delta_packets_sent / delta_seconds
})
processed_metrics.append({
'timestamp': current_ts,
'session_id': session_id,
'stat_id': stat_id,
'kind': current_stat.get('kind'),
'metric_name': 'bitrate_sent_kbps',
'value': (delta_bytes_sent * 8) / (delta_seconds * 1024)
})
elif current_stat['type'] == 'inbound-rtp':
delta_packets_lost = (current_stat.get('packetsLost', 0) or 0) - (last_stat.get('packetsLost', 0) or 0)
delta_packets_received = (current_stat.get('packetsReceived', 0) or 0) - (last_stat.get('packetsReceived', 0) or 0)
total_packets = delta_packets_received + delta_packets_lost
packet_loss_ratio = (delta_packets_lost / total_packets) if total_packets > 0 else 0.0
processed_metrics.append({
'timestamp': current_ts,
'session_id': session_id,
'stat_id': stat_id,
'kind': current_stat.get('kind'),
'metric_name': 'packet_loss_ratio',
'value': packet_loss_ratio
})
processed_metrics.append({
'timestamp': current_ts,
'session_id': session_id,
'stat_id': stat_id,
'kind': current_stat.get('kind'),
'metric_name': 'jitter',
'value': current_stat.get('jitter', 0) or 0.0
})
return processed_metrics
class DatabaseWriter:
"""
负责将聚合数据批量写入MariaDB
"""
def __init__(self, conn_str):
self.engine = sqlalchemy.create_engine(conn_str)
self.buffer = []
self.last_flush_time = time.time()
def write(self, df):
try:
# 这里的表名 'qos_aggregates_per_minute' 需要预先创建
df.to_sql('qos_aggregates_per_minute', self.engine, if_exists='append', index=False)
logger.info(f"Successfully wrote {len(df)} rows to MariaDB.")
except Exception as e:
logger.error(f"Failed to write to MariaDB: {e}")
def aggregate_and_flush(self, metrics_buffer):
if not metrics_buffer:
return
df = pd.DataFrame(metrics_buffer)
df['timestamp'] = pd.to_datetime(df['timestamp'])
# 按分钟、媒体类型和指标名称进行聚合
# 在真实场景中,可能还会按地域、ISP等维度聚合
df_agg = df.groupby([
pd.Grouper(key='timestamp', freq='1Min'),
'kind',
'metric_name'
]).agg(
value_avg=('value', 'mean'),
value_p95=('value', lambda x: x.quantile(0.95)),
value_max=('value', 'max'),
count=('value', 'count')
).reset_index()
df_agg.rename(columns={'timestamp': 'time_bucket'}, inplace=True)
logger.info(f"Aggregated {len(df)} metrics into {len(df_agg)} rows.")
self.write(df_agg)
async def main():
client = Client(DASK_SCHEDULER_ADDRESS)
logger.info("Dask client connected.")
stats_queue = Queue('webrtc-stats', client=client)
# 将Processor实例化为Actor,使其在某个worker上运行并保持状态
processor_actor = await client.submit(StatsProcessor, actor=True)
db_writer = DatabaseWriter(MARIADB_CONN_STR)
metrics_buffer = []
last_flush_time = time.time()
while True:
try:
# 从队列中批量获取数据以提高效率
raw_reports = await stats_queue.get(batch=True, timeout=5)
if not raw_reports:
# 检查是否需要因为超时而 flush buffer
if time.time() - last_flush_time > FLUSH_INTERVAL_SECONDS and metrics_buffer:
db_writer.aggregate_and_flush(metrics_buffer)
metrics_buffer = []
last_flush_time = time.time()
continue
# 将处理任务分发给Actor
# 这里使用了简单的 client.map 模型,对于更复杂的DAG可以使用 dask.delayed
futures = client.map(lambda report: processor_actor.process(report), raw_reports)
results = await client.gather(futures)
for res in results:
if res:
metrics_buffer.extend(res)
# 检查是否满足 flush 条件
if len(metrics_buffer) >= BATCH_SIZE or \
(time.time() - last_flush_time > FLUSH_INTERVAL_SECONDS and metrics_buffer):
db_writer.aggregate_and_flush(metrics_buffer)
metrics_buffer = []
last_flush_time = time.time()
except asyncio.TimeoutError:
# 队列为空时超时是正常的
pass
except Exception as e:
logger.error(f"An error occurred in the main processing loop: {e}", exc_info=True)
# 等待一段时间再重试,防止CPU空转
time.sleep(5)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
第三步:MariaDB 存储层设计
我们的聚合数据不需要太复杂的结构。一张宽表足以满足 Grafana 的查询需求。关键在于索引的设计,time_bucket
列是查询的主要入口,必须建立索引。
-- mariadb_schema.sql
CREATE DATABASE IF NOT EXISTS webrtc_monitoring;
USE webrtc_monitoring;
CREATE TABLE `qos_aggregates_per_minute` (
`time_bucket` datetime NOT NULL COMMENT '聚合时间窗口(分钟级)',
`kind` varchar(10) NOT NULL COMMENT '媒体类型: audio/video',
`metric_name` varchar(50) NOT NULL COMMENT '指标名称: packet_loss_ratio, jitter, bitrate_sent_kbps 等',
`value_avg` double DEFAULT NULL COMMENT '窗口内的平均值',
`value_p95` double DEFAULT NULL COMMENT '窗口内的95百分位值',
`value_max` double DEFAULT NULL COMMENT '窗口内的最大值',
`count` int(11) unsigned NOT NULL COMMENT '聚合的样本点数量',
PRIMARY KEY (`time_bucket`, `kind`, `metric_name`),
INDEX `idx_time_bucket` (`time_bucket`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 在生产环境中,可以考虑使用分区表来优化历史数据的查询和维护
-- ALTER TABLE qos_aggregates_per_minute
-- PARTITION BY RANGE (TO_DAYS(time_bucket)) (
-- PARTITION p202310 VALUES LESS THAN (TO_DAYS('2023-11-01')),
-- ...
-- );
这个表结构简单、清晰。主键确保了每个时间桶内同一种指标的唯一性。time_bucket
上的独立索引加速了按时间范围的查询,这是 Grafana 最常见的查询模式。
第四步:Grafana 可视化
最后一步是将存储在 MariaDB 中的数据呈现出来。在 Grafana 中,我们添加一个新的 MySQL
类型的数据源,指向我们的 MariaDB 实例。
然后,创建仪表盘面板,使用 SQL 查询数据。例如,要显示视频流的平均丢包率时序图:
-- Grafana Panel Query: Video Packet Loss Ratio
SELECT
time_bucket AS "time",
value_avg AS "Average Packet Loss Ratio"
FROM qos_aggregates_per_minute
WHERE
$__timeFilter(time_bucket) AND
kind = 'video' AND
metric_name = 'packet_loss_ratio'
ORDER BY
time_bucket;
要显示音频流的 P95 抖动:
-- Grafana Panel Query: Audio Jitter (P95)
SELECT
time_bucket AS "time",
value_p95 AS "Jitter (P95)"
FROM qos_aggregates_per_minute
WHERE
$__timeFilter(time_bucket) AND
kind = 'audio' AND
metric_name = 'jitter'
ORDER BY
time_bucket;
Grafana 的 $__timeFilter
宏会自动替换为仪表盘右上角选择的时间范围,非常方便。通过组合不同的查询和图表类型,我们可以构建一个全面的 QoS 监控仪表盘,从全局视角下钻到特定问题。
方案的局限性与未来展望
这个方案并非没有缺点。首先,Dask Actor 的状态是存储在内存中的,如果承载 Actor 的 worker 进程崩溃,会丢失部分会话的状态信息,导致一段时间内的增量计算不准确。生产级的实现需要将状态外部化到 Redis 或类似的快速 K-V 存储中。
其次,聚合粒度是固定的(每分钟)。虽然这对于宏观监控足够,但无法满足对特定用户会话进行深度问题排查的需求。一个可行的改进是实现一个“双写”策略:聚合数据写入 MariaDB 用于全局仪表盘,同时将部分原始或半处理过的数据采样后写入像 Elasticsearch 或 Loki 这样的日志系统中,用于特定 sessionId
的下钻查询。
最后,当前的聚合维度比较单一。随着业务发展,我们可能需要加入更多维度,如用户地理位置、ISP、设备类型等。这会增加 MariaDB 表的复杂度和数据量,届时可能需要考虑引入 ClickHouse 这样的 OLAP 数据库来替代 MariaDB,以保持高效的即席查询性能。但作为起点,当前这套架构在成本、性能和运维复杂度之间取得了务实的平衡,成功地为我们这架高速飞行的飞机装上了急需的仪表盘。