使用 Pulumi 与 Saga 模式构建具备 ACID 特性的基础设施编排器


为新租户或新环境自动化部署一套完整的基础设施,远比执行一个 pulumi up 命令要复杂。在真实项目中,一个完整的“环境就绪”流程可能包含三个步骤:1) 使用 Pulumi 创建云资源(如数据库、存储桶、消息队列);2) 调用应用服务A,初始化数据库 Schema 并填入基础数据;3) 调用应用服务B,将新环境的元数据注册到全局配置中心。这三个步骤必须构成一个原子操作:要么全部成功,要么全部回滚到初始状态。如果 Pulumi 成功创建了数据库,但应用服务A初始化失败,我们必须销毁刚刚创建的云资源,避免产生无人管理的“孤儿资源”和不必要的云开销。

这种跨越基础设施层和应用服务层的复合操作,本质上是一个分布式事务。直接套用两阶段提交(2PC)这样的强一致性方案,在技术上和成本上都极不现实。基础设施API并非为XA协议设计,长时间锁定资源会带来稳定性问题。

一个更务实的方案是放弃强一致性,转而追求最终一致性,并借鉴数据库事务ACID中的原子性(Atomicity)和持久性(Durability)思想。这里的挑战在于,如何设计一个编排器,能可靠地管理这个长周期、跨边界的工作流状态,并具备失败后自动补偿(回滚)的能力。

方案权衡:命令式脚本 vs. 声明式Saga

一个直接的想法是编写一个命令式脚本(如 Bash 或 Python),按顺序执行任务:

  1. 执行 pulumi up --yes
  2. 若成功,解析 Pulumi 输出,获取数据库连接字符串。
  3. 调用服务A的初始化API。
  4. 若成功,调用服务B的注册API。
  5. 在任何一步失败时,捕获异常并执行 pulumi destroy --yes

这个方案的主要缺陷在于其脆弱性。如果脚本执行器自身在步骤3之后崩溃,整个工作流就处于一个未知的中间状态。重启脚本无法安全地恢复执行,因为它不知道之前完成了哪些步骤。它缺乏对工作流状态的持久化管理。

Saga 模式提供了一种更健壮的解决方案。它将一个长事务拆分为一系列本地事务(Saga Steps),每个本地事务都有一个对应的补偿操作(Compensation)。编排器负责按顺序执行每个步骤,并持久化记录工作流的状态。如果任何步骤失败,编排器会根据记录的状态,反向执行已成功步骤的补偿操作。

graph TD
    subgraph Saga Transaction: ProvisionNewEnvironment
        A[Start] --> B(1. Create Infrastructure);
        B -- Success --> C(2. Initialize Database);
        C -- Success --> D(3. Register Metadata);
        D -- Success --> E[End];
        C -- Failure --> F(Compensate: Destroy Infrastructure);
        F --> G[End Failed];
        B -- Failure --> G;
        D -- Failure --> H(Compensate: Uninitialize Database);
        H --> F;
    end

在这个模型中:

  • 本地事务1: Create Infrastructure (通过 Pulumi 实现)。
  • 补偿操作1: Destroy Infrastructure (通过 Pulumi 实现)。
  • 本地事务2: Initialize Database (调用服务A)。
  • 补偿操作2: Uninitialize Database (调用服务A的另一个接口,清除数据)。
  • 本地事务3: Register Metadata (调用服务B)。
  • 补偿操作3: Deregister Metadata (调用服务B的接口,删除元数据)。

这个方案的核心在于状态管理。我们需要一个持久化的状态机来跟踪每个Saga实例的进展。选择将Saga的状态存储在关系型数据库中,是因为我们可以利用数据库本身的ACID事务来保证Saga状态更新的原子性和持久性。

核心实现:基于Go的Saga编排器与状态持久化

我们使用Go语言来构建这个编排器,因为它非常适合编写系统级和网络服务。Pulumi也提供了原生的Go Automation SDK,使我们能以编程方式驱动基础设施的创建和销毁,而不是通过执行CLI命令。

1. Saga状态管理与数据模型

首先,定义Saga事务日志的数据模型。这张表是整个系统可靠性的基石。

CREATE TABLE saga_execution_logs (
    id VARCHAR(36) PRIMARY KEY,
    saga_name VARCHAR(100) NOT NULL,
    current_step INT NOT NULL,
    status VARCHAR(20) NOT NULL, -- PENDING, EXECUTING, COMPENSATING, SUCCEEDED, FAILED
    payload JSONB NOT NULL,
    step_history JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- status: 记录整个Saga实例的宏观状态
-- current_step: 指示当前正在执行或下一个要执行的步骤索引
-- payload: 存储Saga执行所需的上下文数据,如租户ID、环境配置等
-- step_history: 记录每个步骤的执行结果和状态,用于补偿

2. Pulumi Automation SDK的封装

直接在Saga步骤中调用Pulumi SDK 会让业务逻辑和基础设施逻辑耦合。一个更好的实践是将其封装成一个独立的 InfraManager

package infra

import (
	"context"
	"fmt"
	"os"

	"github.com/pulumi/pulumi/sdk/v3/go/auto"
	"github.com/pulumi/pulumi/sdk/v3/go/auto/optdestroy"
	"github.com/pulumi/pulumi/sdk/v3/go/auto/optup"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

// InfraManager encapsulates Pulumi Automation API operations.
type InfraManager struct {
	ProjectName string
	StackName   string
	Program     pulumi.RunFunc
}

// ProvisionResult holds the output of a successful provisioning.
type ProvisionResult struct {
	Outputs map[string]interface{}
}

// NewInfraManager creates a new infrastructure manager.
func NewInfraManager(projectName, stackName string, program pulumi.RunFunc) *InfraManager {
	return &InfraManager{
		ProjectName: projectName,
		StackName:   stackName,
		Program:     program,
	}
}

// Up creates or updates the infrastructure.
// It's designed to be a single step in a Saga.
func (m *InfraManager) Up(ctx context.Context, config map[string]string) (*ProvisionResult, error) {
	// OpenTelemetry tracing for observability
	tracer := otel.Tracer("infra-manager")
	ctx, span := tracer.Start(ctx, "Pulumi.Up")
	defer span.End()

	span.SetAttributes(
		attribute.String("pulumi.project", m.ProjectName),
		attribute.String("pulumi.stack", m.StackName),
	)

	// Context handling is crucial for cancellation and timeouts.
	s, err := auto.UpsertStackInlineSource(ctx, m.StackName, m.ProjectName, m.Program)
	if err != nil {
		return nil, fmt.Errorf("failed to create or select stack: %w", err)
	}

	// Set configuration for the stack.
	for k, v := range config {
		s.SetConfig(ctx, k, auto.ConfigValue{Value: v})
	}

	// Wire up stdout to the current process.
	stdoutStreamer := optup.ProgressStreams(os.Stdout)
	res, err := s.Up(ctx, stdoutStreamer)
	if err != nil {
		span.RecordError(err)
		return nil, fmt.Errorf("pulumi up failed: %w", err)
	}

	return &ProvisionResult{Outputs: res.Outputs}, nil
}

// Destroy tears down the infrastructure.
// This is the compensation action for 'Up'.
func (m *InfraManager) Destroy(ctx context.Context) error {
	tracer := otel.Tracer("infra-manager")
	ctx, span := tracer.Start(ctx, "Pulumi.Destroy")
	defer span.End()
	
	span.SetAttributes(
		attribute.String("pulumi.project", m.ProjectName),
		attribute.String("pulumi.stack", m.StackName),
	)

	s, err := auto.SelectStackInlineSource(ctx, m.StackName, m.ProjectName, m.Program)
	if err != nil {
		return fmt.Errorf("failed to select stack for destroy: %w", err)
	}

	stdoutStreamer := optdestroy.ProgressStreams(os.Stdout)
	_, err = s.Destroy(ctx, stdoutStreamer)
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("pulumi destroy failed: %w", err)
	}

	return nil
}

这段代码的关键在于它将Pulumi操作封装为幂等的、可被Saga编排器调度的单元。同时,我们集成了OpenTelemetry,为后续使用Zipkin进行全链路追踪埋下伏笔。

3. Saga编排器的核心逻辑

编排器是Saga模式的核心。它驱动整个流程,并负责状态的持久化和失败时的补偿。

package saga

import (
	"context"
	"encoding/json"
	"fmt"
	
	// Assuming a repository for DB operations exists
	"myapp/repository" 
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// Step defines a single step in the Saga.
type Step struct {
	Name        string
	Execute     func(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
	Compensate  func(ctx context.Context, payload map[string]interface{}) error
}

// Saga defines the entire workflow.
type Saga struct {
	Name  string
	Steps []Step
}

// Coordinator manages the execution of a Saga instance.
type Coordinator struct {
	repo repository.SagaLogRepository
	tracer trace.Tracer
}

func NewCoordinator(repo repository.SagaLogRepository) *Coordinator {
	return &Coordinator{
		repo: repo,
		tracer: otel.Tracer("saga-coordinator"),
	}
}

// Execute starts a new Saga and runs it to completion or failure.
func (c *Coordinator) Execute(ctx context.Context, saga Saga, initialPayload map[string]interface{}) error {
	ctx, span := c.tracer.Start(ctx, fmt.Sprintf("Saga.Execute: %s", saga.Name))
	defer span.End()

	// 1. Create and persist the initial state of the Saga instance.
	logEntry, err := c.repo.CreateLog(ctx, saga.Name, initialPayload)
	if err != nil {
		return fmt.Errorf("failed to create initial saga log: %w", err)
	}
	
	span.SetAttributes(attribute.String("saga.id", logEntry.ID))

	currentPayload := initialPayload
	
	// 2. Execute steps sequentially.
	for i, step := range saga.Steps {
		logEntry.CurrentStep = i
		logEntry.Status = "EXECUTING"
		if err := c.repo.UpdateLog(ctx, logEntry); err != nil {
			// If we can't even update the log, we have a critical problem.
			// This requires manual intervention.
			return fmt.Errorf("FATAL: failed to update saga log before step %d: %w", i, err)
		}

		ctx, stepSpan := c.tracer.Start(ctx, fmt.Sprintf("SagaStep: %s", step.Name))
		
		// Execute the local transaction.
		outputPayload, err := step.Execute(ctx, currentPayload)
		if err != nil {
			stepSpan.RecordError(err)
			stepSpan.End()
			// Execution failed, start compensation.
			c.compensate(ctx, saga, logEntry)
			return fmt.Errorf("step '%s' failed, compensation triggered: %w", step.Name, err)
		}

		// Update payload for the next step.
		for k, v := range outputPayload {
			currentPayload[k] = v
		}
		
		// Persist the success of the step.
		logEntry.StepHistory[step.Name] = map[string]interface{}{"status": "SUCCEEDED"}
		logEntry.Payload = currentPayload
		stepSpan.End()
	}

	// 3. All steps succeeded. Mark the Saga as completed.
	logEntry.Status = "SUCCEEDED"
	logEntry.CurrentStep = len(saga.Steps)
	if err := c.repo.UpdateLog(ctx, logEntry); err != nil {
		return fmt.Errorf("failed to mark saga as succeeded: %w", err)
	}

	return nil
}

func (c *Coordinator) compensate(ctx context.Context, saga Saga, logEntry *repository.SagaLog) {
	ctx, span := c.tracer.Start(ctx, fmt.Sprintf("Saga.Compensate: %s", saga.Name))
	defer span.End()
	
	span.SetAttributes(attribute.String("saga.id", logEntry.ID))
	logEntry.Status = "COMPENSATING"
	c.repo.UpdateLog(ctx, logEntry) // Best effort update

	// Compensate backwards from the last successfully completed step.
	for i := logEntry.CurrentStep - 1; i >= 0; i-- {
		step := saga.Steps[i]
		
		// Check if this step actually completed before starting compensation.
		history, exists := logEntry.StepHistory[step.Name]
		if !exists || history["status"] != "SUCCEEDED" {
			continue
		}
		
		ctx, compSpan := c.tracer.Start(ctx, fmt.Sprintf("SagaCompensate: %s", step.Name))
		err := step.Compensate(ctx, logEntry.Payload)
		if err != nil {
			// Compensation failure is a critical error that requires manual intervention.
			// Log it and mark the Saga as terminally failed.
			compSpan.RecordError(err)
			logEntry.Status = "COMPENSATION_FAILED"
			logEntry.StepHistory[step.Name] = map[string]interface{}{
				"status": "COMPENSATION_FAILED", 
				"error": err.Error(),
			}
			c.repo.UpdateLog(ctx, logEntry)
			compSpan.End()
			return // Stop further compensation.
		}
		
		logEntry.StepHistory[step.Name] = map[string]interface{}{"status": "COMPENSATED"}
		compSpan.End()
	}

	logEntry.Status = "FAILED"
	c.repo.UpdateLog(ctx, logEntry)
}

此编排器逻辑的健壮性体现在每个状态转换前都会持久化日志。即使进程崩溃,我们也可以通过读取 saga_execution_logs 表来恢复Saga的状态,决定是继续执行还是开始补偿。

集成全链路追踪:使用Zipkin洞察Saga执行过程

一个复杂的Saga流程,尤其当它调用多个微服务时,会变成一个调试黑洞。当某个环节出错时,我们需要一种方式来可视化整个调用链,快速定位问题。这就是Zipkin的用武之地。

通过在 InfraManagerCoordinator 中集成OpenTelemetry,我们已经生成了追踪数据。现在只需配置一个Exporter将这些数据发送到Zipkin实例。

package tracing

import (
	"log"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/zipkin"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

// InitTracer initializes an OTel tracer provider with a Zipkin exporter.
func InitTracer(serviceName, zipkinURL string) (func(context.Context), error) {
	exporter, err := zipkin.New(zipkinURL)
	if err != nil {
		return nil, err
	}

	// Create a new batch span processor.
	batcher := sdktrace.NewBatchSpanProcessor(exporter)

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSpanProcessor(batcher),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName(serviceName),
		)),
	)
	otel.SetTracerProvider(tp)
	
	// Set the global propagator to W3C Trace Context.
	// This is crucial for propagating context across service boundaries.
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

	return func(ctx context.Context) {
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}, nil
}

在应用启动时初始化Tracer,之后所有通过otel.Tracer()创建的Span都会被自动导出。当 Coordinator 调用外部服务(如 Initialize Database API)时,我们需要手动注入追踪上下文到HTTP请求头中,以便Zipkin能将不同服务产生的Span关联起来。

// Example of an application service client call inside a Saga step
func initializeDatabase(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error) {
	dbConnStr := payload["dbConnectionString"].(string)
	
	req, err := http.NewRequestWithContext(ctx, "POST", "http://service-a/init", strings.NewReader(dbConnStr))
	if err != nil {
		return nil, err
	}
	
	// Inject the trace context into the outgoing request headers.
	otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))

	// ... execute request ...
	
	return nil, nil
}

当这个Saga流程执行后,我们可以在Zipkin的UI上看到一条完整的轨迹,它清晰地展示了从Saga启动,到Pulumi执行,再到各个应用服务调用的完整流程、耗时和依赖关系。如果某一步失败,错误的Span会高亮显示,极大地简化了故障排查。

最终架构与局限性

我们构建的这套系统,本质上是一个具备事务性保证和深度可观测性的基础设施编排引擎。它通过Saga模式和持久化状态管理,为跨基础设施和应用边界的复杂操作提供了类似ACID的原子性保证。通过集成Zipkin,我们获得了对这个“黑盒”流程的透明度。

然而,这个方案并非银弹。它的主要局限性在于:

  1. 补偿逻辑的复杂性: 编写正确且幂等的补偿操作是一项挑战。pulumi destroy是天然的补偿,但应用层的补偿逻辑(如删除已初始化的数据)可能非常复杂,且容易出错。
  2. 最终一致性: 在失败和补偿期间,系统会处于一个中间状态。依赖此环境的下游系统需要能容忍这种短暂的不一致。
  3. 补偿失败: 补偿操作本身也可能失败。我们的设计中将此标记为COMPENSATION_FAILED状态,并停止后续补偿,这需要人工介入处理,是架构上的一个“兜底”策略。
  4. 状态管理数据库的单点问题: Saga日志数据库的可用性和性能至关重要。在生产环境中,需要为其配置高可用方案。

未来的优化路径可以探索将Saga编排器本身部署为高可用的集群,引入更复杂的重试和错误处理逻辑,或者为补偿失败的场景设计半自动化的恢复工具,以进一步减少人工干预的需要。


  目录