更新 DynamicTestSuiteWorkflow,完善中文注释,新增步骤加载、排序及动态执行逻辑,支持条件跳转与错误处理

This commit is contained in:
longpeng 2025-06-24 22:49:05 +08:00
parent 1cf5e37f29
commit 71eb131e58
2 changed files with 131 additions and 57 deletions

View File

@ -12,7 +12,7 @@ message DynamicTestRunInput {
// steps Go Workflow // steps Go Workflow
// Proto Workflow DB 使 // Proto Workflow DB 使
// Workflow // Workflow
// repeated DynamicStep steps = 4; // DB ID Workflow repeated DynamicStep steps = 4; // DB ID Workflow
} }
message DynamicStep { message DynamicStep {

View File

@ -13,149 +13,223 @@ import (
// "your_module_path/go-server/dal" // 例如 Data Access Layer // "your_module_path/go-server/dal" // 例如 Data Access Layer
) )
// DynamicTestSuiteWorkflow 是通用的工作流,根据配置动态执行测试 // DynamicTestSuiteWorkflow 是通用的动态测试工作流
// 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理
// 输入参数:
// - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入
//
// 返回值:
// - TestRunOutput: 包含整体成功状态、各类测试结果的输出
// - error: 执行过程中的错误
func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) { func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) {
// 获取工作流日志记录器,用于记录执行过程
logger := workflow.GetLogger(ctx) logger := workflow.GetLogger(ctx)
logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId) logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId)
// Workflow 不能直接访问数据库。 // ========================================================================================
// 方式一 (推荐): 在 Workflow 启动时,由 Go Client 将完整的复合案例步骤数据作为 DynamicTestRunInput 的一部分传入。 // 步骤1: 加载复合案例步骤定义
// 方式二 (如果数据太大): Workflow 通过一个 "LoadCompositeCaseSteps" Activity 来从 DB 获取数据。 // ========================================================================================
// 这里假设 input 包含了所有步骤信息,或者我们在 Workflow 开始时通过 Activity 加载了 // 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据
// 有两种设计方案:
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
// 假设我们通过 Activity 加载步骤定义 var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
var caseSteps []*pb.CompositeCaseStepDefinition // 你需要为这个结构定义一个 Protobuf 消息
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps) err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
if err != nil { if err != nil {
logger.Error("Failed to load composite case steps", "error", err) logger.Error("Failed to load composite case steps", "error", err)
return nil, err return nil, err
} }
// 按 step_order 排序 // ========================================================================================
// 步骤2: 对步骤进行排序,确保按正确顺序执行
// ========================================================================================
// 按 step_order 字段升序排序,确保步骤按定义的顺序执行
// 这对于 DAG 结构的正确执行至关重要
sort.Slice(caseSteps, func(i, j int) bool { sort.Slice(caseSteps, func(i, j int) bool {
return caseSteps[i].StepOrder < caseSteps[j].StepOrder return caseSteps[i].StepOrder < caseSteps[j].StepOrder
}) })
// ========================================================================================
// 步骤3: 初始化执行状态和结果收集器
// ========================================================================================
var ( var (
overallSuccess = true overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false
apiResults []*pb.ApiTestResult apiResults []*pb.ApiTestResult // 收集所有 API 测试结果
uiResults []*pb.UiTestResult uiResults []*pb.UiTestResult // 收集所有 UI 测试结果
stepResults = make(map[string]bool) // 存储每个步骤的成功状态 stepResults = make(map[string]bool) // 存储每个步骤的成功/失败状态,用于条件跳转判断
currentStepOrder = 0 currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转
) )
// 循环执行步骤 // ========================================================================================
// 步骤4: 主执行循环 - 动态执行各个测试步骤
// ========================================================================================
// 使用 while 循环而非 for range因为需要支持条件跳转非线性执行
for currentStepOrder < len(caseSteps) { for currentStepOrder < len(caseSteps) {
step := caseSteps[currentStepOrder] step := caseSteps[currentStepOrder] // 获取当前要执行的步骤
logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName) logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName)
// 动态解析 Activity 参数 // ------------------------------------------------------------------------------------
var activityInput interface{} // 使用 interface{} 来存储 Protobuf Activity Input // 步骤4.1: 动态构造 Activity 输入参数
var activityResult interface{} // 使用 interface{} 来存储 Protobuf Activity Output // ------------------------------------------------------------------------------------
// 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式
// 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数
var activityInput interface{} // 存储特定 Activity 的输入参数(类型由 activity_name 决定)
var activityResult interface{} // 存储特定 Activity 的输出结果(类型由 activity_name 决定)
// 将 JSON 参数字符串反序列化到对应 Activity 的 Protobuf 结构 // 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构
// 这是最复杂的部分,需要一个映射表或工厂函数来根据 activity_name 决定用哪个 Protobuf 结构 // 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
// 例如:
switch step.ActivityName { switch step.ActivityName {
case "RunApiTest": case "RunApiTest":
// API 测试:创建 API 测试请求结构并解析参数
apiReq := &pb.ApiTestRequest{} apiReq := &pb.ApiTestRequest{}
if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
logger.Error("Failed to unmarshal API test parameters", "error", err) logger.Error("Failed to unmarshal API test parameters", "error", err)
overallSuccess = false overallSuccess = false
break // 跳出当前步骤 break // 参数解析失败,跳出当前步骤
} }
activityInput = apiReq activityInput = apiReq
activityResult = &pb.ApiTestResult{} activityResult = &pb.ApiTestResult{} // 预创建结果容器
case "RunUiTest": case "RunUiTest":
// UI 测试:创建 UI 测试请求结构并解析参数
uiReq := &pb.UiTestRequest{} uiReq := &pb.UiTestRequest{}
if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil { if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil {
logger.Error("Failed to unmarshal UI test parameters", "error", err) logger.Error("Failed to unmarshal UI test parameters", "error", err)
overallSuccess = false overallSuccess = false
break // 跳出当前步骤 break // 参数解析失败,跳出当前步骤
} }
activityInput = uiReq activityInput = uiReq
activityResult = &pb.UiTestResult{} activityResult = &pb.UiTestResult{} // 预创建结果容器
case "PrepareEnvironment": case "PrepareEnvironment":
// ... // 环境准备:可以扩展更多测试类型
/*activityInput = &pb.PrepareEnvRequest{} // 假设有这个 // TODO: 实现环境准备的参数解析
activityResult = &pb.PrepareEnvResult{}*/ /*
prepReq := &pb.PrepareEnvRequest{}
if err := json.Unmarshal([]byte(step.ParametersJson), prepReq); err != nil {
logger.Error("Failed to unmarshal prepare env parameters", "error", err)
overallSuccess = false
break break
}
activityInput = prepReq
activityResult = &pb.PrepareEnvResult{}
*/
break
default: default:
// 未知的 Activity 类型,记录错误并标记失败
logger.Error("Unknown activity name", "activityName", step.ActivityName) logger.Error("Unknown activity name", "activityName", step.ActivityName)
overallSuccess = false overallSuccess = false
break break
} }
// 如果参数构造失败,跳过当前步骤
if activityInput == nil { if activityInput == nil {
overallSuccess = false overallSuccess = false
logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder) logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder)
break break
} }
// 设置 Activity Options // ------------------------------------------------------------------------------------
// 步骤4.2: 配置 Activity 执行选项
// ------------------------------------------------------------------------------------
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
ao := workflow.ActivityOptions{ ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute, StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间
HeartbeatTimeout: 30 * time.Second, HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行
RetryPolicy: &temporal.RetryPolicy{ RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置
InitialInterval: time.Second, InitialInterval: time.Second, // 首次重试间隔
MaximumAttempts: 3, MaximumAttempts: 3, // 最大重试次数
}, },
} }
stepCtx := workflow.WithActivityOptions(ctx, ao) stepCtx := workflow.WithActivityOptions(ctx, ao) // 应用 Activity 选项到上下文
// 动态执行 Activity // ------------------------------------------------------------------------------------
// 步骤4.3: 动态执行 Activity
// ------------------------------------------------------------------------------------
// 使用反射机制动态调用指定名称的 Activity 函数
// Temporal SDK 会根据 activity_name 找到对应的注册函数并执行
err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult) err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult)
stepPassed := true // ------------------------------------------------------------------------------------
// 步骤4.4: 处理 Activity 执行结果
// ------------------------------------------------------------------------------------
stepPassed := true // 当前步骤成功标志
if err != nil { if err != nil {
// Activity 执行过程中发生错误(如超时、异常等)
logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err) logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err)
stepPassed = false stepPassed = false
overallSuccess = false // 标记整个流程失败 overallSuccess = false // 标记整个工作流失败
} else { } else {
// 根据 activityResult 提取 BaseTestResult.Success // Activity 执行成功,需要检查业务逻辑是否成功
// 这里需要类型断言来获取具体的结果 // 通过类型断言获取具体的结果并检查 BaseResult.Success 字段
switch res := activityResult.(type) { switch res := activityResult.(type) {
case *pb.ApiTestResult: case *pb.ApiTestResult:
apiResults = append(apiResults, res) apiResults = append(apiResults, res) // 收集 API 测试结果
if !res.BaseResult.Success { if !res.BaseResult.Success { // 检查业务逻辑是否成功
stepPassed = false stepPassed = false
overallSuccess = false overallSuccess = false
} }
case *pb.UiTestResult: case *pb.UiTestResult:
uiResults = append(uiResults, res) uiResults = append(uiResults, res) // 收集 UI 测试结果
if !res.BaseResult.Success { if !res.BaseResult.Success { // 检查业务逻辑是否成功
stepPassed = false stepPassed = false
overallSuccess = false overallSuccess = false
} }
// 可以在这里添加更多结果类型的处理
} }
logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed) logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed)
} }
// 记录当前步骤的执行结果,用于后续条件跳转判断
stepResults[step.StepId] = stepPassed stepResults[step.StepId] = stepPassed
// 根据结果和条件确定下一步 // ------------------------------------------------------------------------------------
nextStep := currentStepOrder + 1 // 默认顺序执行 // 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
// ------------------------------------------------------------------------------------
// 默认情况下,顺序执行下一个步骤
nextStep := currentStepOrder + 1
// 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步
if stepPassed && step.SuccessNextStepOrder != nil { if stepPassed && step.SuccessNextStepOrder != nil {
nextStep = int(*step.SuccessNextStepOrder) - 1 // Adjust for 0-indexed slice // 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤
nextStep = int(*step.SuccessNextStepOrder) - 1 // 转换为 0 索引(数据库中可能是 1 索引)
} else if !stepPassed && step.FailureNextStepOrder != nil { } else if !stepPassed && step.FailureNextStepOrder != nil {
nextStep = int(*step.FailureNextStepOrder) - 1 // 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤
nextStep = int(*step.FailureNextStepOrder) - 1 // 转换为 0 索引
} }
// 确保跳转的 nextStep 索引有效 // 验证跳转目标的有效性,防止数组越界
if nextStep < 0 || nextStep >= len(caseSteps) { if nextStep < 0 || nextStep >= len(caseSteps) {
break // 超出范围,退出循环 logger.Info("Next step out of range, terminating workflow", "nextStep", nextStep, "totalSteps", len(caseSteps))
break // 跳转目标无效,退出执行循环
} }
// 更新当前步骤索引,继续下一轮循环
currentStepOrder = nextStep currentStepOrder = nextStep
} }
// 最终返回结果 // ========================================================================================
// 步骤5: 构造并返回最终执行结果
// ========================================================================================
logger.Info("DynamicTestSuiteWorkflow completed",
"runID", input.RunId,
"overallSuccess", overallSuccess,
"apiResultsCount", len(apiResults),
"uiResultsCount", len(uiResults))
// 返回包含所有测试结果和执行状态的输出结构
return &pb.TestRunOutput{ return &pb.TestRunOutput{
RunId: input.RunId, RunId: input.RunId, // 运行标识符
OverallSuccess: overallSuccess, OverallSuccess: overallSuccess, // 整体成功状态
ApiResults: apiResults, ApiResults: apiResults, // 所有 API 测试结果
UiResults: uiResults, UiResults: uiResults, // 所有 UI 测试结果
CompletionMessage: "Dynamic test suite finished.", CompletionMessage: "Dynamic test suite finished.", // 完成消息
}, nil }, nil
} }