基于 Nginx 结构化日志与 Nomad API 的动态服务拓扑发现实践


当 Nomad 集群中的微服务数量从 10 个增长到 50 个以上时,依赖关系图就成了一张无人能理清的蛛网。一次下游服务的变更,没人能准确说出到底会影响多少上游调用方,发布评审会变成了基于猜测的风险评估。传统的做法是靠人力维护文档,但这在敏捷迭代中几乎立刻过时。另一个选项是引入重量级的 APM 工具或服务网格,但对于现有技术栈而言,这意味着巨大的侵入性改造和陡峭的学习曲线。我们需要一个轻量级、非侵入式的方案,在不改变任何应用代码的前提下,动态地绘制出服务间的调用拓扑。

我们的突破口在于所有南北向流量都收敛到了作为边缘代理的 Nginx 集群。如果能让 Nginx 在转发流量时,以一种机器可读的方式记录下“谁调用了谁”,再结合 Nomad 作为调度系统所掌握的“谁在哪里”的全景信息,理论上就能拼凑出完整的调用链。

第一步:构建可观测的基础设施

要实现这个目标,首先需要一个标准化的环境。我们使用 Nomad 来部署所有服务,包括几个示例 Flask 应用、Nginx 网关,以及我们即将构建的拓扑收集器。

1. 示例微服务定义

我们定义三个 Flask 微服务:api-gatewayuser-serviceorder-serviceapi-gateway 是唯一的入口,它会分别调用 user-serviceorder-service。在真实项目中,这些服务会更复杂,但这里的关键是它们的网络关系和部署方式。

这是 user-service 的 Nomad job 文件,order-serviceapi-gateway 与此类似。

# file: user-service.nomad.hcl
job "user-service" {
  datacenters = ["dc1"]
  type        = "service"

  group "app" {
    count = 2 # 部署两个实例以模拟真实场景

    network {
      port "http" {}
    }

    service {
      name     = "user-service"
      tags     = ["python", "flask", "v1"]
      port     = "http"
      
      # Consul Connect 不是必须的,但注册到 Consul 是关键
      # Nomad 会自动将服务注册到 Consul
      check {
        type     = "http"
        path     = "/health"
        interval = "10s"
        timeout  = "2s"
      }
    }

    task "server" {
      driver = "docker"

      config {
        image = "python:3.9-slim"
        ports = ["http"]
        
        # 实际项目中,这里会是一个打包好的镜像
        # 为了演示,我们直接在容器内运行一个简单的 Flask 应用
        command = "sh"
        args = [
          "-c",
          """
          pip install Flask
          cat <<EOF > app.py
from flask import Flask, jsonify
import os

app = Flask(__name__)

@app.route('/')
def get_user():
    return jsonify({
        "user_id": 123, 
        "username": "testuser", 
        "service_instance": os.getenv("NOMAD_ALLOC_ID", "unknown")
    })

@app.route('/health')
def health_check():
    return "OK", 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=${NOMAD_PORT_http})
EOF
          python app.py
          """
        ]
      }
      
      resources {
        cpu    = 100 # MHz
        memory = 64  # MB
      }
    }
  }
}

这里的核心是 service 块。Nomad 会利用它自动将该服务的每个实例注册到 Consul,提供了服务发现的基础。NOMAD_ALLOC_ID 环境变量可以唯一标识一个服务实例。

2. 配置 Nginx 以生成结构化日志

下一步是部署 Nginx 作为边缘代理,并让它记录结构化的 JSON 日志。默认的 Nginx 日志格式是为人眼设计的,解析起来非常低效且容易出错。JSON 格式则完美解决了这个问题。

Nginx 的 Nomad job 文件:

# file: nginx-gateway.nomad.hcl
job "nginx-gateway" {
  datacenters = ["dc1"]
  type        = "service"

  group "gateway" {
    count = 1

    network {
      port "http" {
        static = 80
      }
    }
    
    # 共享日志卷,收集器可以从这里读取日志
    volume "nginx-logs" {
      type      = "host"
      source    = "nginx-logs-volume" # Nomad将在主机上创建此目录
      read_only = false
    }

    service {
      name = "nginx-gateway"
      port = "http"
    }

    task "nginx" {
      driver = "docker"

      config {
        image = "nginx:1.21"
        ports = ["http"]
        
        # 将日志目录挂载到容器中
        volumes = [
          "/var/log/nginx:/var/log/nginx",
        ]
      }
      
      # 使用 template 动态生成 Nginx 配置
      template {
        data = <<EOF
log_format topology_json escape=json
  '{'
    '"msec": "$msec", '
    '"request_time": "$request_time", '
    '"status": "$status", '
    '"request": "$request", '
    '"http_host": "$http_host", '
    '"remote_addr": "$remote_addr", '
    '"upstream_addr": "$upstream_addr", '
    '"upstream_service_name": "$upstream_http_x_nomad_service_name"'
  '}';

server {
    listen {{ env "NOMAD_PORT_http" }};
    access_log /var/log/nginx/access.log topology_json;

    # 使用 Consul Template 查询服务地址
    location /users/ {
        # 这里的关键是 proxy_pass 指向 Consul 中的服务
        # Nomad 默认集成了 consul-template
        proxy_pass http://user-service.service.consul;
        # 这个自定义 header 是一个关键技巧,用于在日志中直接标记目标服务
        proxy_set_header X-Nomad-Service-Name "user-service";
    }

    location /orders/ {
        proxy_pass http://order-service.service.consul;
        proxy_set_header X-Nomad-Service-Name "order-service";
    }
    
    # api-gateway 本身也是一个服务
    location / {
        proxy_pass http://api-gateway.service.consul;
        proxy_set_header X-Nomad-Service-Name "api-gateway";
    }
}
EOF
        destination = "local/nginx.conf"
      }
      
      # Nginx 启动命令
      config {
        command = "/usr/sbin/nginx"
        args    = ["-c", "/local/nginx.conf", "-g", "daemon off;"]
      }

      resources {
        cpu    = 200
        memory = 128
      }
    }
  }
}

这段配置中有几个关键点:

  1. log_format topology_json: 我们定义了一个名为 topology_json 的日志格式。它捕获了请求时间、状态码等信息,但最重要的是 $upstream_addr。这个变量记录了 Nginx 实际将请求转发到的后端服务的 IP:Port
  2. proxy_set_header X-Nomad-Service-Name: 这是一个重要的技巧。直接记录 $upstream_addr 还不够,因为我们不知道这个 IP 对应的是哪个逻辑服务。通过在 location 块中硬编码目标服务名并将其设置到一个自定义 Header 中,我们可以在日志里直接捕获到目标服务的逻辑名称 $upstream_http_x_nomad_service_name。这大大简化了后续的处理逻辑。
  3. Consul Integration: proxy_pass http://user-service.service.consul; 利用了 Nomad 与 Consul 的无缝集成,Nginx 可以通过 DNS 直接解析到健康的后端服务实例。

第二步:开发拓扑收集与分析服务

现在我们有了数据源(Nginx JSON 日志),接下来需要一个服务来消费这些数据,并结合 Nomad API 来构建拓扑图。我们使用 Flask 来构建这个 topology-collector 服务。

它需要做三件事:

  1. 实时读取和解析 Nginx 日志。
  2. 定期从 Nomad API 查询集群中所有服务实例的信息,建立一个 IP:Port -> Service Name 的映射。
  3. 整合以上两部分信息,构建服务依赖图,并通过 API 暴露出来。
# file: collector/app.py
import os
import json
import time
import threading
from flask import Flask, jsonify
import requests
from collections import defaultdict
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置
NOMAD_ADDR = os.getenv("NOMAD_ADDR", "http://127.0.0.1:4646")
NGINX_LOG_FILE = os.getenv("NGINX_LOG_FILE", "/path/to/nginx-logs/access.log") # 路径需要与 Nomad job volume 匹配

app = Flask(__name__)

# 全局变量,线程安全需要考量,但在这个简单示例中我们暂不引入锁
# 在生产环境中,应使用更健壮的并发数据结构或数据库
topology_graph = defaultdict(set)
instance_to_service_map = {}

def fetch_nomad_allocations():
    """
    从 Nomad API 获取所有分配信息,构建 IP:Port -> Service Name 的映射。
    这是一个核心函数,它将底层网络地址与高层服务概念连接起来。
    """
    global instance_to_service_map
    
    api_url = f"{NOMAD_ADDR}/v1/nodes"
    try:
        nodes_response = requests.get(api_url, timeout=5)
        nodes_response.raise_for_status()
        nodes = nodes_response.json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch nodes from Nomad: {e}")
        return

    new_map = {}
    for node in nodes:
        node_id = node['ID']
        allocs_url = f"{NOMAD_ADDR}/v1/node/{node_id}/allocations"
        try:
            allocs_response = requests.get(allocs_url, timeout=5)
            allocs_response.raise_for_status()
            allocations = allocs_response.json()
        except requests.RequestException as e:
            logging.warning(f"Failed to fetch allocations for node {node_id}: {e}")
            continue

        for alloc in allocations:
            if alloc['ClientStatus'] != 'running':
                continue
            
            job_id = alloc['JobID']
            # 我们需要服务名,它通常和 Job ID 相同,但也可以不同
            # TaskGroup 和 Task name 也可以用来区分
            
            if 'Services' in alloc['TaskResources'] and alloc['TaskResources']['Services']:
                for service in alloc['TaskResources']['Services']:
                    service_name = service['Name']
                    for network in service['Networks']:
                        address = f"{network['IP']}:{network['Port']}"
                        new_map[address] = service_name

    instance_to_service_map = new_map
    logging.info(f"Updated instance map. Total instances found: {len(instance_to_service_map)}")

def poll_nomad_api():
    """后台线程,定期刷新服务实例映射"""
    while True:
        fetch_nomad_allocations()
        time.sleep(30) # 生产环境中这个间隔可以调整

def follow(thefile):
    """一个简单的 'tail -f' 实现"""
    thefile.seek(0, 2) # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        yield line

def process_nginx_logs():
    """后台线程,持续读取并处理 Nginx 日志"""
    global topology_graph
    
    # 等待日志文件被 Nginx 创建
    while not os.path.exists(NGINX_LOG_FILE):
        logging.warning(f"Log file {NGINX_LOG_FILE} not found. Waiting...")
        time.sleep(5)

    logging.info(f"Start tailing log file: {NGINX_LOG_FILE}")
    logfile = open(NGINX_LOG_FILE, 'r')
    loglines = follow(logfile)
    
    for line in loglines:
        try:
            log_data = json.loads(line)
            
            # $upstream_addr 可能会有多个地址,用逗号分隔
            # 例如 "10.0.2.15:21856, 10.0.2.16:31234"
            upstream_addrs = log_data.get("upstream_addr", "").split(", ")
            if not upstream_addrs or not upstream_addrs[0]:
                continue
            
            # 在这个模型中,我们认为请求的发起方是 api-gateway
            # 这里的逻辑需要根据实际网关部署情况调整
            # 如果请求直接来自外部,source_service 可以标记为 'external'
            # $http_host 可以帮助我们识别请求打到哪个虚拟主机
            source_service = log_data.get("http_host", "external")
            if "api-gateway" in source_service: # 简化处理
                source_service = "api-gateway"

            # 优先使用我们自定义的 Header
            target_service_name = log_data.get("upstream_service_name")
            
            if not target_service_name:
                # 如果 Header 不存在,则回退到基于 IP 的查找
                # 这里的挑战在于一个请求可能被负载均衡到多个实例
                # 我们只取第一个
                target_addr = upstream_addrs[0]
                target_service_name = instance_to_service_map.get(target_addr)
            
            if target_service_name:
                # 防止 A->B, B->C 这样的链条中,将 B 误判为源头
                # 在我们的边缘代理模型中,所有记录到的请求源头都是 Nginx 的调用者
                # 这里需要更复杂的逻辑来重建内部调用链,但对于边缘拓扑已足够
                
                # 更新拓扑图,使用 set 避免重复
                current_deps = topology_graph[source_service]
                if target_service_name not in current_deps:
                    topology_graph[source_service].add(target_service_name)
                    logging.info(f"New dependency discovered: {source_service} -> {target_service_name}")
            else:
                logging.warning(f"Could not map upstream address '{upstream_addrs[0]}' to a known Nomad service.")

        except json.JSONDecodeError:
            logging.warning(f"Failed to decode JSON log line: {line.strip()}")
        except Exception as e:
            logging.error(f"Error processing log line: {e}")


@app.route('/topology')
def get_topology():
    """API 端点,以 JSON 格式返回拓扑图"""
    # 将 set 转换为 list 以便 JSON 序列化
    serializable_graph = {source: list(dests) for source, dests in topology_graph.items()}
    return jsonify(serializable_graph)
    
@app.route('/health')
def health_check():
    return "OK", 200

if __name__ == '__main__':
    # 启动后台线程
    nomad_poller = threading.Thread(target=poll_nomad_api, daemon=True)
    nomad_poller.start()
    
    log_processor = threading.Thread(target=process_nginx_logs, daemon=True)
    log_processor.start()
    
    app.run(host='0.0.0.0', port=5000)

这个 Flask 应用的设计有几个值得注意的地方:

  • 解耦: 读取日志和查询 Nomad API 是两个独立的后台线程。这确保了 API 的响应不会被 I/O 操作阻塞。
  • 容错: 代码中包含了对 API 请求失败和日志解析错误的异常处理。在生产环境中,这部分需要更健壮的重试和告警机制。
  • instance_to_service_map: 这个内存缓存是连接 Nginx 日志(物理层)和 Nomad 服务(逻辑层)的桥梁。它的数据新鲜度决定了拓扑发现的准确性。30秒的轮询间隔是一个权衡,兼顾了实时性和对 Nomad API 的压力。
  • 回退机制: proxy_set_header 是首选方案,因为它准确无误。但代码也保留了通过 IP 反查服务的能力,作为一种备用。在真实项目中,这种 IP 反查的逻辑会更复杂,需要处理 NAT、Overlay 网络等情况。

第三步:部署与可视化

最后,我们将 topology-collector 也部署到 Nomad 集群,并让它能够访问 Nginx 的日志卷和 Nomad 的 API。

# file: collector.nomad.hcl
job "topology-collector" {
  datacenters = ["dc1"]
  type        = "service"

  group "collector" {
    count = 1

    # 与 Nginx job 共享同一个日志卷,但设置为只读
    volume "nginx-logs" {
      type      = "host"
      source    = "nginx-logs-volume"
      read_only = true
    }

    network {
      port "http" {}
    }

    service {
      name = "topology-collector"
      port = "http"
    }

    task "server" {
      driver = "docker"

      config {
        image = "your-repo/topology-collector:latest" # 假设已将 Flask 应用打包成镜像
        ports = ["http"]
        
        # 将日志卷挂载到容器内,路径要与 Flask 应用的环境变量匹配
        volumes = [
          "/path/on/host/nginx-logs-volume:/var/log/nginx-logs"
        ]
      }
      
      env {
        NOMAD_ADDR = "http://${attr.unique.network.ip-address}:4646"
        NGINX_LOG_FILE = "/var/log/nginx-logs/access.log"
      }

      resources {
        cpu    = 200
        memory = 128
      }
    }
  }
}

部署完成后,我们可以通过 topology-collector/topology API 获取数据。假设我们向 api-gateway 发送了一些请求,它内部调用了 user-serviceorder-service。Nginx 日志会记录下这些 proxy_pass 操作。topology-collector 解析后,API 的返回可能如下:

{
  "api-gateway": [
    "user-service",
    "order-service"
  ],
  "external": [
    "api-gateway"
  ]
}

这些数据可以直接用于生成可视化图表。例如,可以使用 Mermaid.js 来绘制服务依赖图。

graph TD
    subgraph Nomad Cluster
        External_Client -- "requests" --> api-gateway;
        api-gateway -- "gets user data" --> user-service;
        api-gateway -- "creates order" --> order-service;
    end

    style External_Client fill:#f9f,stroke:#333,stroke-width:2px

局限性与未来迭代方向

这个方案虽然轻量且有效,但并非没有局限性。

  1. 覆盖范围: 它只能发现通过被监控的 Nginx 集群的流量。服务间的直接内部通信(例如,服务A直接调用服务B的内部地址,或通过消息队列通信)将无法被捕获。要覆盖这部分,需要扩展监控点,例如在每个节点上部署一个日志代理来捕获所有出口流量,或者转向服务网格。

  2. 调用关系粒度: 当前方案只能识别到“服务A调用了服务B”,但无法知道是哪个具体的 API 端点被调用。可以通过在 Nginx 日志中添加 $request_uri 字段来增强日志,从而获得更细粒度的依赖关系,例如 api-gateway:/v1/users -> user-service:/

  3. 数据持久化: 当前实现是纯内存的,重启后拓扑信息会丢失。一个自然的演进是引入一个时序数据库或图数据库(如 Prometheus 或 Neo4j)来存储拓扑关系,这样不仅可以持久化数据,还能分析依赖关系随时间的变化。

  4. 环境复杂性: 在更复杂的网络环境中,例如存在多层代理或 NAT 时,从日志中获取真实的调用方和被调用方 IP 会变得更加困难。可能需要依赖分布式追踪的 trace-id 来串联整条调用链,但这又回到了对应用代码有侵入性的方案上。

尽管存在这些局限,但对于许多以边缘代理为主要流量入口的系统而言,这是一个投入产出比极高的可观测性建设方案。它以最小的成本,解决了“谁依赖我,我依赖谁”这个核心问题,为架构决策、故障排查和容量规划提供了坚实的数据基础。


  目录