当 Nomad 集群中的微服务数量从 10 个增长到 50 个以上时,依赖关系图就成了一张无人能理清的蛛网。一次下游服务的变更,没人能准确说出到底会影响多少上游调用方,发布评审会变成了基于猜测的风险评估。传统的做法是靠人力维护文档,但这在敏捷迭代中几乎立刻过时。另一个选项是引入重量级的 APM 工具或服务网格,但对于现有技术栈而言,这意味着巨大的侵入性改造和陡峭的学习曲线。我们需要一个轻量级、非侵入式的方案,在不改变任何应用代码的前提下,动态地绘制出服务间的调用拓扑。
我们的突破口在于所有南北向流量都收敛到了作为边缘代理的 Nginx 集群。如果能让 Nginx 在转发流量时,以一种机器可读的方式记录下“谁调用了谁”,再结合 Nomad 作为调度系统所掌握的“谁在哪里”的全景信息,理论上就能拼凑出完整的调用链。
第一步:构建可观测的基础设施
要实现这个目标,首先需要一个标准化的环境。我们使用 Nomad 来部署所有服务,包括几个示例 Flask 应用、Nginx 网关,以及我们即将构建的拓扑收集器。
1. 示例微服务定义
我们定义三个 Flask 微服务:api-gateway
,user-service
和 order-service
。api-gateway
是唯一的入口,它会分别调用 user-service
和 order-service
。在真实项目中,这些服务会更复杂,但这里的关键是它们的网络关系和部署方式。
这是 user-service
的 Nomad job 文件,order-service
和 api-gateway
与此类似。
# file: user-service.nomad.hcl
job "user-service" {
datacenters = ["dc1"]
type = "service"
group "app" {
count = 2 # 部署两个实例以模拟真实场景
network {
port "http" {}
}
service {
name = "user-service"
tags = ["python", "flask", "v1"]
port = "http"
# Consul Connect 不是必须的,但注册到 Consul 是关键
# Nomad 会自动将服务注册到 Consul
check {
type = "http"
path = "/health"
interval = "10s"
timeout = "2s"
}
}
task "server" {
driver = "docker"
config {
image = "python:3.9-slim"
ports = ["http"]
# 实际项目中,这里会是一个打包好的镜像
# 为了演示,我们直接在容器内运行一个简单的 Flask 应用
command = "sh"
args = [
"-c",
"""
pip install Flask
cat <<EOF > app.py
from flask import Flask, jsonify
import os
app = Flask(__name__)
@app.route('/')
def get_user():
return jsonify({
"user_id": 123,
"username": "testuser",
"service_instance": os.getenv("NOMAD_ALLOC_ID", "unknown")
})
@app.route('/health')
def health_check():
return "OK", 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=${NOMAD_PORT_http})
EOF
python app.py
"""
]
}
resources {
cpu = 100 # MHz
memory = 64 # MB
}
}
}
}
这里的核心是 service
块。Nomad 会利用它自动将该服务的每个实例注册到 Consul,提供了服务发现的基础。NOMAD_ALLOC_ID
环境变量可以唯一标识一个服务实例。
2. 配置 Nginx 以生成结构化日志
下一步是部署 Nginx 作为边缘代理,并让它记录结构化的 JSON 日志。默认的 Nginx 日志格式是为人眼设计的,解析起来非常低效且容易出错。JSON 格式则完美解决了这个问题。
Nginx 的 Nomad job 文件:
# file: nginx-gateway.nomad.hcl
job "nginx-gateway" {
datacenters = ["dc1"]
type = "service"
group "gateway" {
count = 1
network {
port "http" {
static = 80
}
}
# 共享日志卷,收集器可以从这里读取日志
volume "nginx-logs" {
type = "host"
source = "nginx-logs-volume" # Nomad将在主机上创建此目录
read_only = false
}
service {
name = "nginx-gateway"
port = "http"
}
task "nginx" {
driver = "docker"
config {
image = "nginx:1.21"
ports = ["http"]
# 将日志目录挂载到容器中
volumes = [
"/var/log/nginx:/var/log/nginx",
]
}
# 使用 template 动态生成 Nginx 配置
template {
data = <<EOF
log_format topology_json escape=json
'{'
'"msec": "$msec", '
'"request_time": "$request_time", '
'"status": "$status", '
'"request": "$request", '
'"http_host": "$http_host", '
'"remote_addr": "$remote_addr", '
'"upstream_addr": "$upstream_addr", '
'"upstream_service_name": "$upstream_http_x_nomad_service_name"'
'}';
server {
listen {{ env "NOMAD_PORT_http" }};
access_log /var/log/nginx/access.log topology_json;
# 使用 Consul Template 查询服务地址
location /users/ {
# 这里的关键是 proxy_pass 指向 Consul 中的服务
# Nomad 默认集成了 consul-template
proxy_pass http://user-service.service.consul;
# 这个自定义 header 是一个关键技巧,用于在日志中直接标记目标服务
proxy_set_header X-Nomad-Service-Name "user-service";
}
location /orders/ {
proxy_pass http://order-service.service.consul;
proxy_set_header X-Nomad-Service-Name "order-service";
}
# api-gateway 本身也是一个服务
location / {
proxy_pass http://api-gateway.service.consul;
proxy_set_header X-Nomad-Service-Name "api-gateway";
}
}
EOF
destination = "local/nginx.conf"
}
# Nginx 启动命令
config {
command = "/usr/sbin/nginx"
args = ["-c", "/local/nginx.conf", "-g", "daemon off;"]
}
resources {
cpu = 200
memory = 128
}
}
}
}
这段配置中有几个关键点:
-
log_format topology_json
: 我们定义了一个名为topology_json
的日志格式。它捕获了请求时间、状态码等信息,但最重要的是$upstream_addr
。这个变量记录了 Nginx 实际将请求转发到的后端服务的IP:Port
。 -
proxy_set_header X-Nomad-Service-Name
: 这是一个重要的技巧。直接记录$upstream_addr
还不够,因为我们不知道这个 IP 对应的是哪个逻辑服务。通过在location
块中硬编码目标服务名并将其设置到一个自定义 Header 中,我们可以在日志里直接捕获到目标服务的逻辑名称$upstream_http_x_nomad_service_name
。这大大简化了后续的处理逻辑。 - Consul Integration:
proxy_pass http://user-service.service.consul;
利用了 Nomad 与 Consul 的无缝集成,Nginx 可以通过 DNS 直接解析到健康的后端服务实例。
第二步:开发拓扑收集与分析服务
现在我们有了数据源(Nginx JSON 日志),接下来需要一个服务来消费这些数据,并结合 Nomad API 来构建拓扑图。我们使用 Flask 来构建这个 topology-collector
服务。
它需要做三件事:
- 实时读取和解析 Nginx 日志。
- 定期从 Nomad API 查询集群中所有服务实例的信息,建立一个
IP:Port -> Service Name
的映射。 - 整合以上两部分信息,构建服务依赖图,并通过 API 暴露出来。
# file: collector/app.py
import os
import json
import time
import threading
from flask import Flask, jsonify
import requests
from collections import defaultdict
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 从环境变量获取配置
NOMAD_ADDR = os.getenv("NOMAD_ADDR", "http://127.0.0.1:4646")
NGINX_LOG_FILE = os.getenv("NGINX_LOG_FILE", "/path/to/nginx-logs/access.log") # 路径需要与 Nomad job volume 匹配
app = Flask(__name__)
# 全局变量,线程安全需要考量,但在这个简单示例中我们暂不引入锁
# 在生产环境中,应使用更健壮的并发数据结构或数据库
topology_graph = defaultdict(set)
instance_to_service_map = {}
def fetch_nomad_allocations():
"""
从 Nomad API 获取所有分配信息,构建 IP:Port -> Service Name 的映射。
这是一个核心函数,它将底层网络地址与高层服务概念连接起来。
"""
global instance_to_service_map
api_url = f"{NOMAD_ADDR}/v1/nodes"
try:
nodes_response = requests.get(api_url, timeout=5)
nodes_response.raise_for_status()
nodes = nodes_response.json()
except requests.RequestException as e:
logging.error(f"Failed to fetch nodes from Nomad: {e}")
return
new_map = {}
for node in nodes:
node_id = node['ID']
allocs_url = f"{NOMAD_ADDR}/v1/node/{node_id}/allocations"
try:
allocs_response = requests.get(allocs_url, timeout=5)
allocs_response.raise_for_status()
allocations = allocs_response.json()
except requests.RequestException as e:
logging.warning(f"Failed to fetch allocations for node {node_id}: {e}")
continue
for alloc in allocations:
if alloc['ClientStatus'] != 'running':
continue
job_id = alloc['JobID']
# 我们需要服务名,它通常和 Job ID 相同,但也可以不同
# TaskGroup 和 Task name 也可以用来区分
if 'Services' in alloc['TaskResources'] and alloc['TaskResources']['Services']:
for service in alloc['TaskResources']['Services']:
service_name = service['Name']
for network in service['Networks']:
address = f"{network['IP']}:{network['Port']}"
new_map[address] = service_name
instance_to_service_map = new_map
logging.info(f"Updated instance map. Total instances found: {len(instance_to_service_map)}")
def poll_nomad_api():
"""后台线程,定期刷新服务实例映射"""
while True:
fetch_nomad_allocations()
time.sleep(30) # 生产环境中这个间隔可以调整
def follow(thefile):
"""一个简单的 'tail -f' 实现"""
thefile.seek(0, 2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1)
continue
yield line
def process_nginx_logs():
"""后台线程,持续读取并处理 Nginx 日志"""
global topology_graph
# 等待日志文件被 Nginx 创建
while not os.path.exists(NGINX_LOG_FILE):
logging.warning(f"Log file {NGINX_LOG_FILE} not found. Waiting...")
time.sleep(5)
logging.info(f"Start tailing log file: {NGINX_LOG_FILE}")
logfile = open(NGINX_LOG_FILE, 'r')
loglines = follow(logfile)
for line in loglines:
try:
log_data = json.loads(line)
# $upstream_addr 可能会有多个地址,用逗号分隔
# 例如 "10.0.2.15:21856, 10.0.2.16:31234"
upstream_addrs = log_data.get("upstream_addr", "").split(", ")
if not upstream_addrs or not upstream_addrs[0]:
continue
# 在这个模型中,我们认为请求的发起方是 api-gateway
# 这里的逻辑需要根据实际网关部署情况调整
# 如果请求直接来自外部,source_service 可以标记为 'external'
# $http_host 可以帮助我们识别请求打到哪个虚拟主机
source_service = log_data.get("http_host", "external")
if "api-gateway" in source_service: # 简化处理
source_service = "api-gateway"
# 优先使用我们自定义的 Header
target_service_name = log_data.get("upstream_service_name")
if not target_service_name:
# 如果 Header 不存在,则回退到基于 IP 的查找
# 这里的挑战在于一个请求可能被负载均衡到多个实例
# 我们只取第一个
target_addr = upstream_addrs[0]
target_service_name = instance_to_service_map.get(target_addr)
if target_service_name:
# 防止 A->B, B->C 这样的链条中,将 B 误判为源头
# 在我们的边缘代理模型中,所有记录到的请求源头都是 Nginx 的调用者
# 这里需要更复杂的逻辑来重建内部调用链,但对于边缘拓扑已足够
# 更新拓扑图,使用 set 避免重复
current_deps = topology_graph[source_service]
if target_service_name not in current_deps:
topology_graph[source_service].add(target_service_name)
logging.info(f"New dependency discovered: {source_service} -> {target_service_name}")
else:
logging.warning(f"Could not map upstream address '{upstream_addrs[0]}' to a known Nomad service.")
except json.JSONDecodeError:
logging.warning(f"Failed to decode JSON log line: {line.strip()}")
except Exception as e:
logging.error(f"Error processing log line: {e}")
@app.route('/topology')
def get_topology():
"""API 端点,以 JSON 格式返回拓扑图"""
# 将 set 转换为 list 以便 JSON 序列化
serializable_graph = {source: list(dests) for source, dests in topology_graph.items()}
return jsonify(serializable_graph)
@app.route('/health')
def health_check():
return "OK", 200
if __name__ == '__main__':
# 启动后台线程
nomad_poller = threading.Thread(target=poll_nomad_api, daemon=True)
nomad_poller.start()
log_processor = threading.Thread(target=process_nginx_logs, daemon=True)
log_processor.start()
app.run(host='0.0.0.0', port=5000)
这个 Flask 应用的设计有几个值得注意的地方:
- 解耦: 读取日志和查询 Nomad API 是两个独立的后台线程。这确保了 API 的响应不会被 I/O 操作阻塞。
- 容错: 代码中包含了对 API 请求失败和日志解析错误的异常处理。在生产环境中,这部分需要更健壮的重试和告警机制。
-
instance_to_service_map
: 这个内存缓存是连接 Nginx 日志(物理层)和 Nomad 服务(逻辑层)的桥梁。它的数据新鲜度决定了拓扑发现的准确性。30秒的轮询间隔是一个权衡,兼顾了实时性和对 Nomad API 的压力。 - 回退机制:
proxy_set_header
是首选方案,因为它准确无误。但代码也保留了通过 IP 反查服务的能力,作为一种备用。在真实项目中,这种 IP 反查的逻辑会更复杂,需要处理 NAT、Overlay 网络等情况。
第三步:部署与可视化
最后,我们将 topology-collector
也部署到 Nomad 集群,并让它能够访问 Nginx 的日志卷和 Nomad 的 API。
# file: collector.nomad.hcl
job "topology-collector" {
datacenters = ["dc1"]
type = "service"
group "collector" {
count = 1
# 与 Nginx job 共享同一个日志卷,但设置为只读
volume "nginx-logs" {
type = "host"
source = "nginx-logs-volume"
read_only = true
}
network {
port "http" {}
}
service {
name = "topology-collector"
port = "http"
}
task "server" {
driver = "docker"
config {
image = "your-repo/topology-collector:latest" # 假设已将 Flask 应用打包成镜像
ports = ["http"]
# 将日志卷挂载到容器内,路径要与 Flask 应用的环境变量匹配
volumes = [
"/path/on/host/nginx-logs-volume:/var/log/nginx-logs"
]
}
env {
NOMAD_ADDR = "http://${attr.unique.network.ip-address}:4646"
NGINX_LOG_FILE = "/var/log/nginx-logs/access.log"
}
resources {
cpu = 200
memory = 128
}
}
}
}
部署完成后,我们可以通过 topology-collector
的 /topology
API 获取数据。假设我们向 api-gateway
发送了一些请求,它内部调用了 user-service
和 order-service
。Nginx 日志会记录下这些 proxy_pass
操作。topology-collector
解析后,API 的返回可能如下:
{
"api-gateway": [
"user-service",
"order-service"
],
"external": [
"api-gateway"
]
}
这些数据可以直接用于生成可视化图表。例如,可以使用 Mermaid.js 来绘制服务依赖图。
graph TD subgraph Nomad Cluster External_Client -- "requests" --> api-gateway; api-gateway -- "gets user data" --> user-service; api-gateway -- "creates order" --> order-service; end style External_Client fill:#f9f,stroke:#333,stroke-width:2px
局限性与未来迭代方向
这个方案虽然轻量且有效,但并非没有局限性。
覆盖范围: 它只能发现通过被监控的 Nginx 集群的流量。服务间的直接内部通信(例如,服务A直接调用服务B的内部地址,或通过消息队列通信)将无法被捕获。要覆盖这部分,需要扩展监控点,例如在每个节点上部署一个日志代理来捕获所有出口流量,或者转向服务网格。
调用关系粒度: 当前方案只能识别到“服务A调用了服务B”,但无法知道是哪个具体的 API 端点被调用。可以通过在 Nginx 日志中添加
$request_uri
字段来增强日志,从而获得更细粒度的依赖关系,例如api-gateway:/v1/users
->user-service:/
。数据持久化: 当前实现是纯内存的,重启后拓扑信息会丢失。一个自然的演进是引入一个时序数据库或图数据库(如 Prometheus 或 Neo4j)来存储拓扑关系,这样不仅可以持久化数据,还能分析依赖关系随时间的变化。
环境复杂性: 在更复杂的网络环境中,例如存在多层代理或 NAT 时,从日志中获取真实的调用方和被调用方 IP 会变得更加困难。可能需要依赖分布式追踪的
trace-id
来串联整条调用链,但这又回到了对应用代码有侵入性的方案上。
尽管存在这些局限,但对于许多以边缘代理为主要流量入口的系统而言,这是一个投入产出比极高的可观测性建设方案。它以最小的成本,解决了“谁依赖我,我依赖谁”这个核心问题,为架构决策、故障排查和容量规划提供了坚实的数据基础。