Beacon/workflows/dynamic_workflow.go
2025-06-27 01:01:58 +08:00

312 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workflows
import (
"encoding/json"
"go.temporal.io/sdk/temporal"
"sort"
"strconv"
"time"
"beacon/activities" // 假设你的 activity 包在这个路径
"beacon/pkg/pb" // 你的 Protobuf 路径
"beacon/utils" // 导入参数处理器
"go.temporal.io/sdk/workflow"
// 假设你有数据库访问层Workflow 不能直接访问 DB需要通过 Activity 或预加载数据
// "your_module_path/go-server/dal" // 例如 Data Access Layer
)
// DynamicTestSuiteWorkflow 是通用的动态测试工作流
// 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理
// 输入参数:
// - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入
//
// 返回值:
// - TestRunOutput: 包含整体成功状态、各类测试结果的输出
// - error: 执行过程中的错误
func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) {
// 获取工作流日志记录器,用于记录执行过程
logger := workflow.GetLogger(ctx)
logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId)
// ========================================================================================
// 步骤1: 加载复合案例步骤定义
// ========================================================================================
// 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据
// 有两种设计方案:
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskQueue: "data-task-queue", // 指定任务队列名称
StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间
HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死
RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置
InitialInterval: time.Second, // 首次重试前的等待时间
BackoffCoefficient: 1.0, // 重试间隔的递增系数
MaximumInterval: time.Minute, // 重试间隔的最大值
MaximumAttempts: 3, // 最大重试次数
NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误类型
},
}) // 应用 Activity 选项到上下文
var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
err := workflow.ExecuteActivity(ctxActivity, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
if err != nil {
logger.Error("Failed to load composite case steps", "error", err)
return nil, err
}
// ========================================================================================
// 步骤2: 对步骤进行排序,确保按正确顺序执行
// ========================================================================================
// 按 step_order 字段升序排序,确保步骤按定义的顺序执行
// 这对于 DAG 结构的正确执行至关重要
sort.Slice(caseSteps, func(i, j int) bool {
return caseSteps[i].StepOrder < caseSteps[j].StepOrder
})
// ========================================================================================
// 步骤3: 初始化执行状态和结果收集器
// ========================================================================================
var (
overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false
apiResults []*pb.ApiTestResult // 收集所有 API 测试结果
uiResults []*pb.UiTestResult // 收集所有 UI 测试结果
stepResults = make(map[string]bool) // 存储每个步骤的成功/失败状态,用于条件跳转判断
currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转
)
// 初始化全局变量,包含输入参数中的全局参数
globalVariables := make(map[string]interface{})
if input.GlobalParameters != nil {
for key, value := range input.GlobalParameters {
globalVariables[key] = value
}
}
// 创建参数处理器
parameterProcessor := utils.NewParameterProcessor(globalVariables)
logger.Info("Initialized global variables", "variables", globalVariables)
// ========================================================================================
// 步骤4: 主执行循环 - 动态执行各个测试步骤
// ========================================================================================
// 使用 while 循环而非 for range因为需要支持条件跳转非线性执行
for currentStepOrder < len(caseSteps) {
step := caseSteps[currentStepOrder] // 获取当前要执行的步骤
logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName)
// ------------------------------------------------------------------------------------
// 步骤4.1: 动态构造 Activity 输入参数(支持参数模板解析)
// ------------------------------------------------------------------------------------
// 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式
// 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数
var activityInput interface{} // 存储特定 Activity 的输入参数(类型由 activity_name 决定)
var activityResult interface{} // 存储特定 Activity 的输出结果(类型由 activity_name 决定)
// 使用参数处理器解析参数模板,替换其中的变量引用
processedParametersJson, err := parameterProcessor.ProcessTemplate(step.ParametersJson)
if err != nil {
logger.Error("Failed to process parameter template", "error", err, "stepOrder", step.StepOrder)
overallSuccess = false
break
}
// 验证模板中的变量引用是否有效
if isValid, errors := parameterProcessor.ValidateTemplate(step.ParametersJson); !isValid {
logger.Error("Parameter template validation failed", "errors", errors, "stepOrder", step.StepOrder)
overallSuccess = false
break
}
// 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构
// 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
switch step.ActivityName {
case "RunApiTest":
apiReq := &pb.ApiTestRequest{}
// 1. 首先验证JSON格式
if !json.Valid([]byte(processedParametersJson)) {
logger.Error("Invalid JSON format in parameters")
overallSuccess = false
break
}
// 2. 解析JSON时增加错误详情
if err := json.Unmarshal([]byte(processedParametersJson), apiReq); err != nil {
logger.Error("Failed to unmarshal API test parameters",
"error", err,
"raw_json", processedParametersJson)
overallSuccess = false
break
}
// 3. 验证必要字段
if apiReq.Endpoint == "" || apiReq.HttpMethod == "" {
logger.Error("Missing required fields in API test parameters")
overallSuccess = false
break
}
activityInput = apiReq
activityResult = &pb.ApiTestResult{} // 预创建结果容器
case "RunUiTest":
// UI 测试:创建 UI 测试请求结构并解析参数
uiReq := &pb.UiTestRequest{}
if err := json.Unmarshal([]byte(processedParametersJson), uiReq); err != nil {
logger.Error("Failed to unmarshal UI test parameters", "error", err)
overallSuccess = false
break // 参数解析失败,跳出当前步骤
}
activityInput = uiReq
activityResult = &pb.UiTestResult{} // 预创建结果容器
case "PrepareEnvironment":
// 环境准备:可以扩展更多测试类型
// TODO: 实现环境准备的参数解析
/*
prepReq := &pb.PrepareEnvRequest{}
if err := json.Unmarshal([]byte(processedParametersJson), prepReq); err != nil {
logger.Error("Failed to unmarshal prepare env parameters", "error", err)
overallSuccess = false
break
}
activityInput = prepReq
activityResult = &pb.PrepareEnvResult{}
*/
break
default:
// 未知的 Activity 类型,记录错误并标记失败
logger.Error("Unknown activity name", "activityName", step.ActivityName)
overallSuccess = false
break
}
// 如果参数构造失败,跳过当前步骤
if activityInput == nil {
overallSuccess = false
logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder)
break
}
// ------------------------------------------------------------------------------------
// 步骤4.2: 配置 Activity 执行选项
// ------------------------------------------------------------------------------------
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
ao := workflow.ActivityOptions{
TaskQueue: "python-task-queue", // 指定任务队列名称
StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间
HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行
RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置
InitialInterval: time.Second, // 首次重试间隔
MaximumAttempts: 3, // 最大重试次数
},
}
stepCtx := workflow.WithActivityOptions(ctx, ao) // 应用 Activity 选项到上下文
// ------------------------------------------------------------------------------------
// 步骤4.3: 动态执行 Activity
// ------------------------------------------------------------------------------------
// 使用反射机制动态调用指定名称的 Activity 函数
// Temporal SDK 会根据 activity_name 找到对应的注册函数并执行
err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult)
// ------------------------------------------------------------------------------------
// 步骤4.4: 处理 Activity 执行结果
// ------------------------------------------------------------------------------------
stepPassed := true // 当前步骤成功标志
if err != nil {
// Activity 执行过程中发生错误(如超时、异常等)
logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err)
stepPassed = false
overallSuccess = false // 标记整个工作流失败
} else {
// Activity 执行成功,需要检查业务逻辑是否成功
// 通过类型断言获取具体的结果并检查 BaseResult.Success 字段
switch res := activityResult.(type) {
case *pb.ApiTestResult:
apiResults = append(apiResults, res) // 收集 API 测试结果
if !res.BaseResult.Success { // 检查业务逻辑是否成功
stepPassed = false
overallSuccess = false
}
// 存储activity结果用于参数传递
activityResult := &utils.ActivityResult{
StepID: step.StepId,
StepName: step.ActivityName,
Success: res.BaseResult.Success,
Data: res,
}
parameterProcessor.AddActivityResult(step.StepId, activityResult)
case *pb.UiTestResult:
uiResults = append(uiResults, res) // 收集 UI 测试结果
if !res.BaseResult.Success { // 检查业务逻辑是否成功
stepPassed = false
overallSuccess = false
}
// 存储activity结果用于参数传递
activityResult := &utils.ActivityResult{
StepID: step.StepId,
StepName: step.ActivityName,
Success: res.BaseResult.Success,
Data: res,
}
parameterProcessor.AddActivityResult(step.StepId, activityResult)
// 可以在这里添加更多结果类型的处理
}
logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed)
}
// 记录当前步骤的执行结果,用于后续条件跳转判断
stepResults[strconv.FormatInt(step.StepId, 10)] = stepPassed
// ------------------------------------------------------------------------------------
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
// ------------------------------------------------------------------------------------
// 默认情况下,顺序执行下一个步骤
nextStep := currentStepOrder + 1
// 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步
if stepPassed && step.SuccessNextStepOrder != nil {
// 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤
nextStep = int(*step.SuccessNextStepOrder) - 1 // 转换为 0 索引(数据库中可能是 1 索引)
} else if !stepPassed && step.FailureNextStepOrder != nil {
// 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤
nextStep = int(*step.FailureNextStepOrder) - 1 // 转换为 0 索引
}
// 验证跳转目标的有效性,防止数组越界
if nextStep < 0 || nextStep >= len(caseSteps) {
logger.Info("Next step out of range, terminating workflow", "nextStep", nextStep, "totalSteps", len(caseSteps))
break // 跳转目标无效,退出执行循环
}
// 更新当前步骤索引,继续下一轮循环
currentStepOrder = nextStep
}
// ========================================================================================
// 步骤5: 构造并返回最终执行结果
// ========================================================================================
logger.Info("DynamicTestSuiteWorkflow completed",
"runID", input.RunId,
"overallSuccess", overallSuccess,
"apiResultsCount", len(apiResults),
"uiResultsCount", len(uiResults))
// 返回包含所有测试结果和执行状态的输出结构
return &pb.TestRunOutput{
RunId: input.RunId, // 运行标识符
OverallSuccess: overallSuccess, // 整体成功状态
ApiResults: apiResults, // 所有 API 测试结果
UiResults: uiResults, // 所有 UI 测试结果
CompletionMessage: "Dynamic test suite finished.", // 完成消息
}, nil
}