我们的模型服务迭代正变得越来越痛苦。最初,BentoML为我们提供了一个清晰的打包和部署规范,这很好。但随着业务逻辑的膨胀,问题也随之而来:每次上游需要一个新的特征预处理,或者下游需要一种新的结果格式化方式,整个BentoML服务就必须重新构建、测试、打包和部署。一次小小的逻辑变更,触发的是一整套重量级的CI/CD流程,发布周期被拉长,风险也随之增加。模型推理的核心代码与琐碎的业务胶水代码紧紧地耦合在一起,技术债像滚雪球一样越积越多。
我们需要的不是一个静态的、编译好的服务,而是一个具备动态能力的推理运行时(Inference Runtime)。这个运行时本身应该保持稳定,而其内部执行的业务逻辑——我们称之为“工作流”——则能够根据外部配置动态地加载、组合和更新。这不仅能将模型团队从频繁的部署中解放出来,也能让业务迭代更加敏捷。
初步构想是,将BentoML服务改造为一个“微内核”,它只负责承载工作流的执行环境。真正的业务逻辑以“插件”的形式存在,而这些插件如何串联成一个完整的工作流,则由一个外部的配置中心来定义。当需要变更逻辑时,我们只需修改配置,服务就能实时感知并应用新的工作流。
为了实现这个构想,技术选型变得至关重要:
- 执行内核: 继续使用 BentoML。它成熟的Serving能力、依赖管理和对多种模型框架的支持,使其成为一个理想的执行环境宿主。我们要做的是在其之上构建我们的动态调度层。
- 触发机制: 从同步的HTTP API调用转向异步的消息队列(以RabbitMQ为例)。这彻底实现了服务间的解耦。上游服务只需将原始数据投递到MQ,我们的BentoML服务作为消费者,按需处理,这天然地支持了削峰填谷、重试和背压,提高了系统的整体韧性。
- 动态配置: Nacos。它的配置管理能力,特别是其“配置监听器”机制,是我们实现工作流热更新的核心。工作流的定义(比如,哪个插件在前,哪个在后)将以JSON格式存储在Nacos中。
- 配置规范化: Prettier。这是一个看似无关的选择,但在真实项目中,它解决了配置管理中的一个痛点。随着工作流定义变得复杂,多人协作维护这些JSON配置文件极易出错,格式混乱。我们将Prettier集成到配置管理的Git流程中,通过pre-commit钩子强制统一格式,极大地提升了配置文件的可读性和可维护性。
整个系统的架构图如下所示:
graph TD subgraph "上游服务" Producer[数据生产者] end subgraph "消息中间件" MQ[RabbitMQ] end subgraph "BentoML 动态推理服务" BentoService[BentoML Service] subgraph "内部组件" Consumer[MQ Consumer] -->|消息| Dispatcher[工作流调度器] Dispatcher -->|获取工作流| WorkflowRegistry[工作流注册表] WorkflowRegistry -->|加载插件| PluginLoader[插件加载器] PluginLoader -->|读取插件代码| Plugins[插件目录] ConfigManager[配置管理器] -.->|更新配置| WorkflowRegistry end end subgraph "配置与代码" Nacos[Nacos Server] PluginsRepo[插件代码 Git Repo] ConfigRepo[工作流定义 Git Repo] end Producer -- "发送任务消息" --> MQ MQ -- "消费任务消息" --> Consumer BentoService -- "启动时加载" --> ConfigManager ConfigManager -- "监听配置变更" --> Nacos style BentoService fill:#f9f,stroke:#333,stroke-width:2px style Nacos fill:#bbf,stroke:#333,stroke-width:2px
第一步:定义工作流与插件的抽象
一切动态能力的基础是稳定的抽象。我们首先需要定义什么是“插件”(ProcessingStep)以及它们如何被组合成“工作流”(Workflow)。一个插件就是一个实现了特定接口的Python类,负责处理工作流中的一个步骤。
common/steps.py
:
import abc
import logging
from typing import Any, Dict
# 配置日志记录器
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class StepContext:
"""
工作流上下文,用于在不同步骤之间传递数据。
这是一个简单的实现,实际项目中可能需要更复杂的结构。
"""
def __init__(self, initial_data: Any):
self._data: Dict[str, Any] = {"initial": initial_data}
self.error: Exception | None = None
self.is_failed: bool = False
def set(self, key: str, value: Any):
self._data[key] = value
def get(self, key: str, default: Any = None) -> Any:
return self._data.get(key, default)
def get_all_data(self) -> Dict[str, Any]:
return self._data
def fail(self, error: Exception):
self.is_failed = True
self.error = error
logger.error(f"Workflow step failed: {error}", exc_info=True)
class ProcessingStep(abc.ABC):
"""
所有处理步骤插件的基类。
每个插件必须实现 process 方法。
"""
def __init__(self, params: Dict[str, Any] | None = None):
self.params = params or {}
logger.info(f"Initializing step: {self.__class__.__name__} with params: {self.params}")
@abc.abstractmethod
def process(self, context: StepContext) -> StepContext:
"""
执行处理逻辑。
:param context: 工作流上下文,包含需要处理的数据。
:return: 更新后的上下文。
"""
raise NotImplementedError
def __repr__(self) -> str:
return f"{self.__class__.__name__}(params={self.params})"
有了这个基类,我们可以编写具体的插件。例如,一个用于数据验证的插件和一个用于模型推理的插件。
plugins/validation.py
:
from common.steps import ProcessingStep, StepContext, logger
class SchemaValidator(ProcessingStep):
"""一个简单的数据结构验证插件"""
def process(self, context: StepContext) -> StepContext:
required_keys = self.params.get("required_keys", [])
if not required_keys:
logger.warning("No required_keys configured for SchemaValidator.")
return context
data = context.get("initial")
if not isinstance(data, dict):
context.fail(TypeError("Initial data is not a dictionary."))
return context
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
context.fail(ValueError(f"Missing required keys: {', '.join(missing_keys)}"))
logger.info(f"Schema validation passed for keys: {required_keys}")
context.set("validated_data", data)
return context
plugins/inference.py
:
import bentoml
from common.steps import ProcessingStep, StepContext, logger
# 假设我们有一个预先打包好的BentoML模型
# bentoml models pull iris_classifier:latest
iris_classifier_runner = bentoml.sklearn.get("iris_classifier:latest").to_runner()
class ModelInferrer(ProcessingStep):
"""执行BentoML模型推理的插件"""
def __init__(self, params: dict | None = None):
super().__init__(params)
# 在真实场景中,runner应该在服务启动时注入,而不是每次实例化时获取
# 这里为了简化示例,我们直接获取。
# 更好的做法是在服务启动时初始化所有需要的runners并缓存。
self.runner = iris_classifier_runner
def process(self, context: StepContext) -> StepContext:
validated_data = context.get("validated_data")
if validated_data is None:
context.fail(ValueError("No validated_data found in context. SchemaValidator must run first."))
return context
# 假设模型需要一个特征列表
features = validated_data.get("features")
if not isinstance(features, list):
context.fail(TypeError("Features must be a list."))
return context
try:
# 这里的 .predict.run 是BentoML 1.0+ 的标准调用方式
result = self.runner.predict.run([features])
prediction = result[0]
logger.info(f"Inference successful. Input: {features}, Prediction: {prediction}")
context.set("prediction", int(prediction))
except Exception as e:
context.fail(e)
return context
第二步:实现动态配置管理器与插件加载器
这是系统的核心。我们需要一个管理器,它能连接到Nacos,监听工作流定义的变更,并在变更发生时,安全地重新加载工作流。
services/config_manager.py
:
import json
import logging
import threading
import time
from typing import Dict, List, Any, Callable
from nacos import NacosClient
from importlib import import_module
from common.steps import ProcessingStep
logger = logging.getLogger(__name__)
class WorkflowRegistry:
"""
负责加载、存储和管理所有动态工作流。
"""
def __init__(self):
self._workflows: Dict[str, List[ProcessingStep]] = {}
self._lock = threading.RLock()
logger.info("WorkflowRegistry initialized.")
def update_workflows(self, config: Dict[str, Any]):
"""
根据新的配置解析并构建工作流。
这是一个原子操作,要么全部成功,要么全部失败,保持旧的工作流不变。
"""
if not isinstance(config, dict) or "workflows" not in config:
logger.error("Invalid config format: 'workflows' key not found.")
return
new_workflows = {}
try:
for name, steps_config in config["workflows"].items():
pipeline = []
for step_config in steps_config:
module_path, class_name = step_config["class"].rsplit('.', 1)
params = step_config.get("params", {})
# 动态导入模块和类
module = import_module(module_path)
step_class = getattr(module, class_name)
if not issubclass(step_class, ProcessingStep):
raise TypeError(f"{step_class.__name__} is not a subclass of ProcessingStep")
instance = step_class(params)
pipeline.append(instance)
new_workflows[name] = pipeline
logger.info(f"Successfully loaded workflow '{name}' with {len(pipeline)} steps.")
except (ImportError, AttributeError, TypeError, KeyError) as e:
logger.error(f"Failed to parse and load new workflows: {e}", exc_info=True)
# 加载失败,不更新现有工作流
return
with self._lock:
self._workflows = new_workflows
logger.info("WorkflowRegistry updated successfully with new configuration.")
def get_workflow(self, name: str) -> List[ProcessingStep] | None:
"""根据名称获取一个已注册的工作流管道"""
with self._lock:
return self._workflows.get(name)
class NacosConfigManager:
"""
连接Nacos,监听配置变更,并通知WorkflowRegistry更新。
"""
def __init__(self, server_address: str, namespace: str, data_id: str, group: str, registry: WorkflowRegistry):
self.client = NacosClient(server_addresses=server_address, namespace=namespace)
self.data_id = data_id
self.group = group
self.registry = registry
self._stop_event = threading.Event()
def _config_watcher(self, config_data: Dict[str, Any]):
"""Nacos回调函数,当配置变更时被调用"""
try:
logger.info(f"Received new configuration update for data_id='{self.data_id}'")
# Nacos推送的是字符串,需要解析
content = config_data.get("content", "{}")
if not content:
logger.warning("Received empty configuration content.")
return
config = json.loads(content)
self.registry.update_workflows(config)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON from Nacos config: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred in config watcher: {e}", exc_info=True)
def start(self):
"""启动监听"""
logger.info(f"Starting Nacos listener for data_id='{self.data_id}', group='{self.group}'")
# 首次拉取配置
try:
initial_config_str = self.client.get_config(self.data_id, self.group)
if initial_config_str:
self._config_watcher({"content": initial_config_str})
except Exception as e:
logger.error(f"Failed to get initial config from Nacos: {e}")
# 添加监听器,这里的实现使用一个后台线程轮询模拟长轮询
thread = threading.Thread(target=self._watch_loop, daemon=True)
thread.start()
def _watch_loop(self):
"""
一个简化的长轮询模拟,实际Nacos Python SDK有更复杂的实现。
这里为了清晰,我们手动实现。
"""
self.client.add_config_watcher(self.data_id, self.group, self._config_watcher)
logger.info("Nacos watcher added. Entering watch loop.")
while not self._stop_event.is_set():
time.sleep(10) # 保持主线程存活,让回调线程工作
def stop(self):
"""停止监听"""
self._stop_event.set()
self.client.remove_config_watcher(self.data_id, self.group, self._config_watcher)
self.client.close()
logger.info("Nacos listener stopped.")
这里的关键在于update_workflows
方法的健壮性。它必须能处理配置错误(如类名写错、JSON格式错误),并且在更新失败时,不能破坏当前正在运行的工作流。这是一个“全有或全无”的原子性操作。
第三步:配置Nacos中的工作流
我们在Nacos中创建一个新的配置,dataId
为bento-workflows
,group
为DEFAULT_GROUP
。内容为JSON格式:
{
"workflows": {
"iris_standard_flow": [
{
"class": "plugins.validation.SchemaValidator",
"params": {
"required_keys": ["features"]
}
},
{
"class": "plugins.inference.ModelInferrer",
"params": {}
}
],
"iris_no_validation_flow": [
{
"class": "plugins.inference.ModelInferrer",
"params": {}
}
]
}
}
这份配置定义了两个工作流:iris_standard_flow
包含验证和推理两个步骤,而iris_no_validation_flow
则直接进行推理。
为了保证这份JSON的质量,我们可以在配置的Git仓库中引入Prettier。package.json
:
{
"devDependencies": {
"husky": "^8.0.0",
"prettier": "^2.8.0"
},
"scripts": {
"prepare": "husky install"
}
}
.prettierrc
:
{
"jsonWhitespaceSensitivity": "strict",
"printWidth": 80,
"tabWidth": 2
}
然后设置一个pre-commit钩子(使用husky
),在提交前自动格式化所有JSON文件。这确保了所有推送到Nacos的配置都遵循统一的、清晰的格式,减少了因格式问题导致的人为错误。
第四步:构建消息消费者与BentoML服务
现在,我们将所有组件集成到BentoML服务中。服务在启动时会初始化NacosConfigManager
和消息队列消费者。消费者从队列中获取消息,并使用WorkflowRegistry
来执行相应的工作流。
services/mq_consumer.py
:
import pika
import json
import logging
import threading
from typing import Callable
logger = logging.getLogger(__name__)
class RabbitMQConsumer:
def __init__(self, amqp_url: str, queue_name: str, dispatch_callback: Callable):
self.amqp_url = amqp_url
self.queue_name = queue_name
self.dispatch_callback = dispatch_callback
self._connection = None
self._channel = None
self._thread = None
self._stop_event = threading.Event()
def _connect(self):
logger.info("Connecting to RabbitMQ...")
self._connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self._channel = self._connection.channel()
self._channel.queue_declare(queue=self.queue_name, durable=True)
self._channel.basic_qos(prefetch_count=1)
logger.info("Successfully connected to RabbitMQ.")
def _on_message(self, channel, method, properties, body):
try:
logger.info(f"Received message with delivery_tag: {method.delivery_tag}")
message_data = json.loads(body)
workflow_name = properties.headers.get('workflow_name', 'default_flow')
# 调用Bento服务中的调度逻辑
self.dispatch_callback(workflow_name, message_data)
channel.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Message {method.delivery_tag} processed and acked.")
except json.JSONDecodeError:
logger.error(f"Failed to decode message body: {body}. Rejecting message.")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"Error processing message: {e}. Requeueing message.", exc_info=True)
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def _run(self):
while not self._stop_event.is_set():
try:
self._connect()
self._channel.basic_consume(queue=self.queue_name, on_message_callback=self._on_message)
self._channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...")
time.sleep(5)
except Exception as e:
logger.error(f"An unexpected error occurred in consumer loop: {e}", exc_info=True)
break
logger.info("RabbitMQ consumer loop finished.")
def start(self):
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
logger.info("RabbitMQ consumer started in background thread.")
def stop(self):
logger.info("Stopping RabbitMQ consumer...")
self._stop_event.set()
if self._channel and self._channel.is_open:
self._channel.stop_consuming()
if self._connection and self._connection.is_open:
self._connection.close()
self._thread.join()
logger.info("RabbitMQ consumer stopped.")
service.py
(BentoML服务定义):
import bentoml
import logging
from services.config_manager import WorkflowRegistry, NacosConfigManager
from services.mq_consumer import RabbitMQConsumer
from common.steps import StepContext
# 环境变量,实际项目中通过配置注入
NACOS_SERVER = "localhost:8848"
NACOS_NAMESPACE = "" # 通常是 public 或自定义
NACOS_DATA_ID = "bento-workflows"
NACOS_GROUP = "DEFAULT_GROUP"
RABBITMQ_URL = "amqp://guest:guest@localhost:5672/"
QUEUE_NAME = "inference_tasks"
logger = logging.getLogger(__name__)
# 全局单例
workflow_registry = WorkflowRegistry()
config_manager = NacosConfigManager(
server_address=NACOS_SERVER,
namespace=NACOS_NAMESPACE,
data_id=NACOS_DATA_ID,
group=NACOS_GROUP,
registry=workflow_registry
)
def workflow_dispatcher(workflow_name: str, message_data: dict):
"""
工作流调度核心逻辑。
由MQ消费者回调此函数。
"""
logger.info(f"Dispatching task to workflow: '{workflow_name}'")
workflow = workflow_registry.get_workflow(workflow_name)
if not workflow:
logger.error(f"Workflow '{workflow_name}' not found. Task discarded.")
return
context = StepContext(initial_data=message_data)
for step in workflow:
if context.is_failed:
logger.warning(f"Workflow '{workflow_name}' aborted due to previous step failure.")
break
try:
logger.info(f"Executing step: {step}")
context = step.process(context)
except Exception as e:
context.fail(e)
if context.is_failed:
logger.error(f"Workflow '{workflow_name}' execution failed. Final error: {context.error}")
else:
logger.info(f"Workflow '{workflow_name}' executed successfully. Final data: {context.get_all_data()}")
# 初始化MQ消费者
mq_consumer = RabbitMQConsumer(
amqp_url=RABBITMQ_URL,
queue_name=QUEUE_NAME,
dispatch_callback=workflow_dispatcher
)
# 创建BentoML服务
svc = bentoml.Service("dynamic_workflow_runner")
@svc.on_startup
def startup():
"""服务启动时的钩子,启动后台任务"""
logger.info("Service starting up...")
config_manager.start()
mq_consumer.start()
logger.info("Background services (Nacos, RabbitMQ) started.")
@svc.on_shutdown
def shutdown():
"""服务关闭时的钩子,优雅关闭后台任务"""
logger.info("Service shutting down...")
mq_consumer.stop()
config_manager.stop()
logger.info("Background services stopped.")
# 我们不再需要一个主要的API端点来做推理,但可以保留一个用于健康检查
@svc.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
def health_check(data: dict):
return {"status": "ok", "active_workflows": list(workflow_registry._workflows.keys())}
至此,一个完整的动态推理服务已经构建完成。当服务启动时,它会从Nacos拉取最新的工作流配置,并开始监听消息队列。当一条消息到达时,它会根据消息头中的workflow_name
选择一个工作流来执行。如果我们此时去Nacos控制台修改bento-workflows
的配置——比如给iris_standard_flow
增加一个日志记录插件,或者删除iris_no_validation_flow
——服务将在几秒钟内自动应用这些变更,无需任何重启。
当前方案的局限性与未来迭代
尽管这个架构解决了动态配置和解耦的核心问题,但在生产环境中,它还有一些需要完善的地方:
- 插件依赖管理: 目前所有插件都假设其依赖已存在于主BentoML服务的环境中。如果插件需要独立的、甚至是有冲突的依赖,当前的
importlib
方案无法解决。未来的迭代可以探索更复杂的加载机制,例如使用子进程或轻量级容器来隔离每个插件的执行环境,但这会显著增加架构的复杂性。 - 工作流状态管理: 当前的工作流是无状态的。对于需要跨多个步骤、甚至跨多个消息进行状态跟踪的复杂场景(如Saga模式),需要引入外部的状态存储(如Redis或数据库)并在
StepContext
中进行管理。 - 可观测性: 动态系统的调试和监控是个挑战。我们需要为每个插件和工作流引入更精细化的Metrics(执行时间、成功率、失败率),并将这些指标聚合到Prometheus等监控系统中。分布式追踪对于理解一个请求在动态工作流中的完整生命周期也至关重要。
- 配置的版本控制与回滚: 直接在Nacos控制台修改配置是危险的。一个更安全的实践是,将配置也纳入Git进行版本管理(我们已经通过Prettier规范化了它),通过CI/CD流水线来发布配置变更。这提供了审计日志,并允许在出现问题时快速回滚到上一个稳定版本。