为新租户或新环境自动化部署一套完整的基础设施,远比执行一个 pulumi up
命令要复杂。在真实项目中,一个完整的“环境就绪”流程可能包含三个步骤:1) 使用 Pulumi 创建云资源(如数据库、存储桶、消息队列);2) 调用应用服务A,初始化数据库 Schema 并填入基础数据;3) 调用应用服务B,将新环境的元数据注册到全局配置中心。这三个步骤必须构成一个原子操作:要么全部成功,要么全部回滚到初始状态。如果 Pulumi 成功创建了数据库,但应用服务A初始化失败,我们必须销毁刚刚创建的云资源,避免产生无人管理的“孤儿资源”和不必要的云开销。
这种跨越基础设施层和应用服务层的复合操作,本质上是一个分布式事务。直接套用两阶段提交(2PC)这样的强一致性方案,在技术上和成本上都极不现实。基础设施API并非为XA协议设计,长时间锁定资源会带来稳定性问题。
一个更务实的方案是放弃强一致性,转而追求最终一致性,并借鉴数据库事务ACID中的原子性(Atomicity)和持久性(Durability)思想。这里的挑战在于,如何设计一个编排器,能可靠地管理这个长周期、跨边界的工作流状态,并具备失败后自动补偿(回滚)的能力。
方案权衡:命令式脚本 vs. 声明式Saga
一个直接的想法是编写一个命令式脚本(如 Bash 或 Python),按顺序执行任务:
- 执行
pulumi up --yes
。 - 若成功,解析 Pulumi 输出,获取数据库连接字符串。
- 调用服务A的初始化API。
- 若成功,调用服务B的注册API。
- 在任何一步失败时,捕获异常并执行
pulumi destroy --yes
。
这个方案的主要缺陷在于其脆弱性。如果脚本执行器自身在步骤3之后崩溃,整个工作流就处于一个未知的中间状态。重启脚本无法安全地恢复执行,因为它不知道之前完成了哪些步骤。它缺乏对工作流状态的持久化管理。
Saga 模式提供了一种更健壮的解决方案。它将一个长事务拆分为一系列本地事务(Saga Steps),每个本地事务都有一个对应的补偿操作(Compensation)。编排器负责按顺序执行每个步骤,并持久化记录工作流的状态。如果任何步骤失败,编排器会根据记录的状态,反向执行已成功步骤的补偿操作。
graph TD subgraph Saga Transaction: ProvisionNewEnvironment A[Start] --> B(1. Create Infrastructure); B -- Success --> C(2. Initialize Database); C -- Success --> D(3. Register Metadata); D -- Success --> E[End]; C -- Failure --> F(Compensate: Destroy Infrastructure); F --> G[End Failed]; B -- Failure --> G; D -- Failure --> H(Compensate: Uninitialize Database); H --> F; end
在这个模型中:
- 本地事务1:
Create Infrastructure
(通过 Pulumi 实现)。 - 补偿操作1:
Destroy Infrastructure
(通过 Pulumi 实现)。 - 本地事务2:
Initialize Database
(调用服务A)。 - 补偿操作2:
Uninitialize Database
(调用服务A的另一个接口,清除数据)。 - 本地事务3:
Register Metadata
(调用服务B)。 - 补偿操作3:
Deregister Metadata
(调用服务B的接口,删除元数据)。
这个方案的核心在于状态管理。我们需要一个持久化的状态机来跟踪每个Saga实例的进展。选择将Saga的状态存储在关系型数据库中,是因为我们可以利用数据库本身的ACID事务来保证Saga状态更新的原子性和持久性。
核心实现:基于Go的Saga编排器与状态持久化
我们使用Go语言来构建这个编排器,因为它非常适合编写系统级和网络服务。Pulumi也提供了原生的Go Automation SDK,使我们能以编程方式驱动基础设施的创建和销毁,而不是通过执行CLI命令。
1. Saga状态管理与数据模型
首先,定义Saga事务日志的数据模型。这张表是整个系统可靠性的基石。
CREATE TABLE saga_execution_logs (
id VARCHAR(36) PRIMARY KEY,
saga_name VARCHAR(100) NOT NULL,
current_step INT NOT NULL,
status VARCHAR(20) NOT NULL, -- PENDING, EXECUTING, COMPENSATING, SUCCEEDED, FAILED
payload JSONB NOT NULL,
step_history JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- status: 记录整个Saga实例的宏观状态
-- current_step: 指示当前正在执行或下一个要执行的步骤索引
-- payload: 存储Saga执行所需的上下文数据,如租户ID、环境配置等
-- step_history: 记录每个步骤的执行结果和状态,用于补偿
2. Pulumi Automation SDK的封装
直接在Saga步骤中调用Pulumi SDK 会让业务逻辑和基础设施逻辑耦合。一个更好的实践是将其封装成一个独立的 InfraManager
。
package infra
import (
"context"
"fmt"
"os"
"github.com/pulumi/pulumi/sdk/v3/go/auto"
"github.com/pulumi/pulumi/sdk/v3/go/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v3/go/auto/optup"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
// InfraManager encapsulates Pulumi Automation API operations.
type InfraManager struct {
ProjectName string
StackName string
Program pulumi.RunFunc
}
// ProvisionResult holds the output of a successful provisioning.
type ProvisionResult struct {
Outputs map[string]interface{}
}
// NewInfraManager creates a new infrastructure manager.
func NewInfraManager(projectName, stackName string, program pulumi.RunFunc) *InfraManager {
return &InfraManager{
ProjectName: projectName,
StackName: stackName,
Program: program,
}
}
// Up creates or updates the infrastructure.
// It's designed to be a single step in a Saga.
func (m *InfraManager) Up(ctx context.Context, config map[string]string) (*ProvisionResult, error) {
// OpenTelemetry tracing for observability
tracer := otel.Tracer("infra-manager")
ctx, span := tracer.Start(ctx, "Pulumi.Up")
defer span.End()
span.SetAttributes(
attribute.String("pulumi.project", m.ProjectName),
attribute.String("pulumi.stack", m.StackName),
)
// Context handling is crucial for cancellation and timeouts.
s, err := auto.UpsertStackInlineSource(ctx, m.StackName, m.ProjectName, m.Program)
if err != nil {
return nil, fmt.Errorf("failed to create or select stack: %w", err)
}
// Set configuration for the stack.
for k, v := range config {
s.SetConfig(ctx, k, auto.ConfigValue{Value: v})
}
// Wire up stdout to the current process.
stdoutStreamer := optup.ProgressStreams(os.Stdout)
res, err := s.Up(ctx, stdoutStreamer)
if err != nil {
span.RecordError(err)
return nil, fmt.Errorf("pulumi up failed: %w", err)
}
return &ProvisionResult{Outputs: res.Outputs}, nil
}
// Destroy tears down the infrastructure.
// This is the compensation action for 'Up'.
func (m *InfraManager) Destroy(ctx context.Context) error {
tracer := otel.Tracer("infra-manager")
ctx, span := tracer.Start(ctx, "Pulumi.Destroy")
defer span.End()
span.SetAttributes(
attribute.String("pulumi.project", m.ProjectName),
attribute.String("pulumi.stack", m.StackName),
)
s, err := auto.SelectStackInlineSource(ctx, m.StackName, m.ProjectName, m.Program)
if err != nil {
return fmt.Errorf("failed to select stack for destroy: %w", err)
}
stdoutStreamer := optdestroy.ProgressStreams(os.Stdout)
_, err = s.Destroy(ctx, stdoutStreamer)
if err != nil {
span.RecordError(err)
return fmt.Errorf("pulumi destroy failed: %w", err)
}
return nil
}
这段代码的关键在于它将Pulumi操作封装为幂等的、可被Saga编排器调度的单元。同时,我们集成了OpenTelemetry,为后续使用Zipkin进行全链路追踪埋下伏笔。
3. Saga编排器的核心逻辑
编排器是Saga模式的核心。它驱动整个流程,并负责状态的持久化和失败时的补偿。
package saga
import (
"context"
"encoding/json"
"fmt"
// Assuming a repository for DB operations exists
"myapp/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// Step defines a single step in the Saga.
type Step struct {
Name string
Execute func(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
Compensate func(ctx context.Context, payload map[string]interface{}) error
}
// Saga defines the entire workflow.
type Saga struct {
Name string
Steps []Step
}
// Coordinator manages the execution of a Saga instance.
type Coordinator struct {
repo repository.SagaLogRepository
tracer trace.Tracer
}
func NewCoordinator(repo repository.SagaLogRepository) *Coordinator {
return &Coordinator{
repo: repo,
tracer: otel.Tracer("saga-coordinator"),
}
}
// Execute starts a new Saga and runs it to completion or failure.
func (c *Coordinator) Execute(ctx context.Context, saga Saga, initialPayload map[string]interface{}) error {
ctx, span := c.tracer.Start(ctx, fmt.Sprintf("Saga.Execute: %s", saga.Name))
defer span.End()
// 1. Create and persist the initial state of the Saga instance.
logEntry, err := c.repo.CreateLog(ctx, saga.Name, initialPayload)
if err != nil {
return fmt.Errorf("failed to create initial saga log: %w", err)
}
span.SetAttributes(attribute.String("saga.id", logEntry.ID))
currentPayload := initialPayload
// 2. Execute steps sequentially.
for i, step := range saga.Steps {
logEntry.CurrentStep = i
logEntry.Status = "EXECUTING"
if err := c.repo.UpdateLog(ctx, logEntry); err != nil {
// If we can't even update the log, we have a critical problem.
// This requires manual intervention.
return fmt.Errorf("FATAL: failed to update saga log before step %d: %w", i, err)
}
ctx, stepSpan := c.tracer.Start(ctx, fmt.Sprintf("SagaStep: %s", step.Name))
// Execute the local transaction.
outputPayload, err := step.Execute(ctx, currentPayload)
if err != nil {
stepSpan.RecordError(err)
stepSpan.End()
// Execution failed, start compensation.
c.compensate(ctx, saga, logEntry)
return fmt.Errorf("step '%s' failed, compensation triggered: %w", step.Name, err)
}
// Update payload for the next step.
for k, v := range outputPayload {
currentPayload[k] = v
}
// Persist the success of the step.
logEntry.StepHistory[step.Name] = map[string]interface{}{"status": "SUCCEEDED"}
logEntry.Payload = currentPayload
stepSpan.End()
}
// 3. All steps succeeded. Mark the Saga as completed.
logEntry.Status = "SUCCEEDED"
logEntry.CurrentStep = len(saga.Steps)
if err := c.repo.UpdateLog(ctx, logEntry); err != nil {
return fmt.Errorf("failed to mark saga as succeeded: %w", err)
}
return nil
}
func (c *Coordinator) compensate(ctx context.Context, saga Saga, logEntry *repository.SagaLog) {
ctx, span := c.tracer.Start(ctx, fmt.Sprintf("Saga.Compensate: %s", saga.Name))
defer span.End()
span.SetAttributes(attribute.String("saga.id", logEntry.ID))
logEntry.Status = "COMPENSATING"
c.repo.UpdateLog(ctx, logEntry) // Best effort update
// Compensate backwards from the last successfully completed step.
for i := logEntry.CurrentStep - 1; i >= 0; i-- {
step := saga.Steps[i]
// Check if this step actually completed before starting compensation.
history, exists := logEntry.StepHistory[step.Name]
if !exists || history["status"] != "SUCCEEDED" {
continue
}
ctx, compSpan := c.tracer.Start(ctx, fmt.Sprintf("SagaCompensate: %s", step.Name))
err := step.Compensate(ctx, logEntry.Payload)
if err != nil {
// Compensation failure is a critical error that requires manual intervention.
// Log it and mark the Saga as terminally failed.
compSpan.RecordError(err)
logEntry.Status = "COMPENSATION_FAILED"
logEntry.StepHistory[step.Name] = map[string]interface{}{
"status": "COMPENSATION_FAILED",
"error": err.Error(),
}
c.repo.UpdateLog(ctx, logEntry)
compSpan.End()
return // Stop further compensation.
}
logEntry.StepHistory[step.Name] = map[string]interface{}{"status": "COMPENSATED"}
compSpan.End()
}
logEntry.Status = "FAILED"
c.repo.UpdateLog(ctx, logEntry)
}
此编排器逻辑的健壮性体现在每个状态转换前都会持久化日志。即使进程崩溃,我们也可以通过读取 saga_execution_logs
表来恢复Saga的状态,决定是继续执行还是开始补偿。
集成全链路追踪:使用Zipkin洞察Saga执行过程
一个复杂的Saga流程,尤其当它调用多个微服务时,会变成一个调试黑洞。当某个环节出错时,我们需要一种方式来可视化整个调用链,快速定位问题。这就是Zipkin的用武之地。
通过在 InfraManager
和 Coordinator
中集成OpenTelemetry,我们已经生成了追踪数据。现在只需配置一个Exporter将这些数据发送到Zipkin实例。
package tracing
import (
"log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/zipkin"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
// InitTracer initializes an OTel tracer provider with a Zipkin exporter.
func InitTracer(serviceName, zipkinURL string) (func(context.Context), error) {
exporter, err := zipkin.New(zipkinURL)
if err != nil {
return nil, err
}
// Create a new batch span processor.
batcher := sdktrace.NewBatchSpanProcessor(exporter)
tp := sdktrace.NewTracerProvider(
sdktrace.WithSpanProcessor(batcher),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceName),
)),
)
otel.SetTracerProvider(tp)
// Set the global propagator to W3C Trace Context.
// This is crucial for propagating context across service boundaries.
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return func(ctx context.Context) {
if err := tp.Shutdown(ctx); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}, nil
}
在应用启动时初始化Tracer,之后所有通过otel.Tracer()
创建的Span都会被自动导出。当 Coordinator
调用外部服务(如 Initialize Database
API)时,我们需要手动注入追踪上下文到HTTP请求头中,以便Zipkin能将不同服务产生的Span关联起来。
// Example of an application service client call inside a Saga step
func initializeDatabase(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error) {
dbConnStr := payload["dbConnectionString"].(string)
req, err := http.NewRequestWithContext(ctx, "POST", "http://service-a/init", strings.NewReader(dbConnStr))
if err != nil {
return nil, err
}
// Inject the trace context into the outgoing request headers.
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
// ... execute request ...
return nil, nil
}
当这个Saga流程执行后,我们可以在Zipkin的UI上看到一条完整的轨迹,它清晰地展示了从Saga启动,到Pulumi执行,再到各个应用服务调用的完整流程、耗时和依赖关系。如果某一步失败,错误的Span会高亮显示,极大地简化了故障排查。
最终架构与局限性
我们构建的这套系统,本质上是一个具备事务性保证和深度可观测性的基础设施编排引擎。它通过Saga模式和持久化状态管理,为跨基础设施和应用边界的复杂操作提供了类似ACID的原子性保证。通过集成Zipkin,我们获得了对这个“黑盒”流程的透明度。
然而,这个方案并非银弹。它的主要局限性在于:
- 补偿逻辑的复杂性: 编写正确且幂等的补偿操作是一项挑战。
pulumi destroy
是天然的补偿,但应用层的补偿逻辑(如删除已初始化的数据)可能非常复杂,且容易出错。 - 最终一致性: 在失败和补偿期间,系统会处于一个中间状态。依赖此环境的下游系统需要能容忍这种短暂的不一致。
- 补偿失败: 补偿操作本身也可能失败。我们的设计中将此标记为
COMPENSATION_FAILED
状态,并停止后续补偿,这需要人工介入处理,是架构上的一个“兜底”策略。 - 状态管理数据库的单点问题: Saga日志数据库的可用性和性能至关重要。在生产环境中,需要为其配置高可用方案。
未来的优化路径可以探索将Saga编排器本身部署为高可用的集群,引入更复杂的重试和错误处理逻辑,或者为补偿失败的场景设计半自动化的恢复工具,以进一步减少人工干预的需要。