From 71eb131e581ec511cbd0e544427c60d8429f10c5 Mon Sep 17 00:00:00 2001 From: longpeng Date: Tue, 24 Jun 2025 22:49:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20DynamicTestSuiteWorkflow?= =?UTF-8?q?=EF=BC=8C=E5=AE=8C=E5=96=84=E4=B8=AD=E6=96=87=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=EF=BC=8C=E6=96=B0=E5=A2=9E=E6=AD=A5=E9=AA=A4=E5=8A=A0=E8=BD=BD?= =?UTF-8?q?=E3=80=81=E6=8E=92=E5=BA=8F=E5=8F=8A=E5=8A=A8=E6=80=81=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E9=80=BB=E8=BE=91=EF=BC=8C=E6=94=AF=E6=8C=81=E6=9D=A1?= =?UTF-8?q?=E4=BB=B6=E8=B7=B3=E8=BD=AC=E4=B8=8E=E9=94=99=E8=AF=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proto/dynamic_test.proto | 2 +- workflows/dynamic_workflow.go | 186 ++++++++++++++++++++++++---------- 2 files changed, 131 insertions(+), 57 deletions(-) diff --git a/proto/dynamic_test.proto b/proto/dynamic_test.proto index 78baa60..6ab937f 100644 --- a/proto/dynamic_test.proto +++ b/proto/dynamic_test.proto @@ -12,7 +12,7 @@ message DynamicTestRunInput { // 这里的 steps 会在 Go 服务端启动 Workflow 时,根据数据库查询结果动态构建 // 也可以不在 Proto 中定义,直接在 Workflow 中根据 DB 数据拉取并使用 // 但如果希望 Workflow 的输入本身包含整个流程定义,可以定义一个嵌套结构 - // repeated DynamicStep steps = 4; // 也可以通过 DB ID 在 Workflow 中获取 + repeated DynamicStep steps = 4; // 也可以通过 DB ID 在 Workflow 中获取 } message DynamicStep { diff --git a/workflows/dynamic_workflow.go b/workflows/dynamic_workflow.go index 694fc42..5786f50 100644 --- a/workflows/dynamic_workflow.go +++ b/workflows/dynamic_workflow.go @@ -13,149 +13,223 @@ import ( // "your_module_path/go-server/dal" // 例如 Data Access Layer ) -// DynamicTestSuiteWorkflow 是通用的工作流,根据配置动态执行测试 +// DynamicTestSuiteWorkflow 是通用的动态测试工作流 +// 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理 +// 输入参数: +// - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入 +// +// 返回值: +// - TestRunOutput: 包含整体成功状态、各类测试结果的输出 +// - error: 执行过程中的错误 func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) { + // 获取工作流日志记录器,用于记录执行过程 logger := workflow.GetLogger(ctx) logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId) - // Workflow 不能直接访问数据库。 - // 方式一 (推荐): 在 Workflow 启动时,由 Go Client 将完整的复合案例步骤数据作为 DynamicTestRunInput 的一部分传入。 - // 方式二 (如果数据太大): Workflow 通过一个 "LoadCompositeCaseSteps" Activity 来从 DB 获取数据。 - // 这里假设 input 包含了所有步骤信息,或者我们在 Workflow 开始时通过 Activity 加载了 + // ======================================================================================== + // 步骤1: 加载复合案例步骤定义 + // ======================================================================================== + // 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据 + // 有两种设计方案: + // 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入 + // 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义 + // 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置 - // 假设我们通过 Activity 加载步骤定义 - var caseSteps []*pb.CompositeCaseStepDefinition // 你需要为这个结构定义一个 Protobuf 消息 + var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表 + + // 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义 + // 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置 err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps) if err != nil { logger.Error("Failed to load composite case steps", "error", err) return nil, err } - // 按 step_order 排序 + // ======================================================================================== + // 步骤2: 对步骤进行排序,确保按正确顺序执行 + // ======================================================================================== + // 按 step_order 字段升序排序,确保步骤按定义的顺序执行 + // 这对于 DAG 结构的正确执行至关重要 sort.Slice(caseSteps, func(i, j int) bool { return caseSteps[i].StepOrder < caseSteps[j].StepOrder }) + // ======================================================================================== + // 步骤3: 初始化执行状态和结果收集器 + // ======================================================================================== var ( - overallSuccess = true - apiResults []*pb.ApiTestResult - uiResults []*pb.UiTestResult - stepResults = make(map[string]bool) // 存储每个步骤的成功状态 - currentStepOrder = 0 + overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false + apiResults []*pb.ApiTestResult // 收集所有 API 测试结果 + uiResults []*pb.UiTestResult // 收集所有 UI 测试结果 + stepResults = make(map[string]bool) // 存储每个步骤的成功/失败状态,用于条件跳转判断 + currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转 ) - // 循环执行步骤 + // ======================================================================================== + // 步骤4: 主执行循环 - 动态执行各个测试步骤 + // ======================================================================================== + // 使用 while 循环而非 for range,因为需要支持条件跳转(非线性执行) for currentStepOrder < len(caseSteps) { - step := caseSteps[currentStepOrder] + step := caseSteps[currentStepOrder] // 获取当前要执行的步骤 logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName) - // 动态解析 Activity 参数 - var activityInput interface{} // 使用 interface{} 来存储 Protobuf Activity Input - var activityResult interface{} // 使用 interface{} 来存储 Protobuf Activity Output + // ------------------------------------------------------------------------------------ + // 步骤4.1: 动态构造 Activity 输入参数 + // ------------------------------------------------------------------------------------ + // 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式 + // 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数 + var activityInput interface{} // 存储特定 Activity 的输入参数(类型由 activity_name 决定) + var activityResult interface{} // 存储特定 Activity 的输出结果(类型由 activity_name 决定) - // 将 JSON 参数字符串反序列化到对应 Activity 的 Protobuf 结构 - // 这是最复杂的部分,需要一个映射表或工厂函数来根据 activity_name 决定用哪个 Protobuf 结构 - // 例如: + // 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构 + // 这是动态工作流的核心:同一个工作流可以执行不同类型的测试 switch step.ActivityName { case "RunApiTest": + // API 测试:创建 API 测试请求结构并解析参数 apiReq := &pb.ApiTestRequest{} if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { logger.Error("Failed to unmarshal API test parameters", "error", err) overallSuccess = false - break // 跳出当前步骤 + break // 参数解析失败,跳出当前步骤 } activityInput = apiReq - activityResult = &pb.ApiTestResult{} + activityResult = &pb.ApiTestResult{} // 预创建结果容器 + case "RunUiTest": + // UI 测试:创建 UI 测试请求结构并解析参数 uiReq := &pb.UiTestRequest{} if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil { logger.Error("Failed to unmarshal UI test parameters", "error", err) overallSuccess = false - break // 跳出当前步骤 + break // 参数解析失败,跳出当前步骤 } activityInput = uiReq - activityResult = &pb.UiTestResult{} + activityResult = &pb.UiTestResult{} // 预创建结果容器 + case "PrepareEnvironment": - // ... - /*activityInput = &pb.PrepareEnvRequest{} // 假设有这个 - activityResult = &pb.PrepareEnvResult{}*/ + // 环境准备:可以扩展更多测试类型 + // TODO: 实现环境准备的参数解析 + /* + prepReq := &pb.PrepareEnvRequest{} + if err := json.Unmarshal([]byte(step.ParametersJson), prepReq); err != nil { + logger.Error("Failed to unmarshal prepare env parameters", "error", err) + overallSuccess = false + break + } + activityInput = prepReq + activityResult = &pb.PrepareEnvResult{} + */ break + default: + // 未知的 Activity 类型,记录错误并标记失败 logger.Error("Unknown activity name", "activityName", step.ActivityName) overallSuccess = false break } + // 如果参数构造失败,跳过当前步骤 if activityInput == nil { overallSuccess = false logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder) break } - // 设置 Activity Options + // ------------------------------------------------------------------------------------ + // 步骤4.2: 配置 Activity 执行选项 + // ------------------------------------------------------------------------------------ + // 为 Activity 设置超时、重试等策略,确保测试的可靠性 ao := workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, - HeartbeatTimeout: 30 * time.Second, - RetryPolicy: &temporal.RetryPolicy{ - InitialInterval: time.Second, - MaximumAttempts: 3, + StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 + HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 + RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置 + InitialInterval: time.Second, // 首次重试间隔 + MaximumAttempts: 3, // 最大重试次数 }, } - stepCtx := workflow.WithActivityOptions(ctx, ao) + stepCtx := workflow.WithActivityOptions(ctx, ao) // 应用 Activity 选项到上下文 - // 动态执行 Activity + // ------------------------------------------------------------------------------------ + // 步骤4.3: 动态执行 Activity + // ------------------------------------------------------------------------------------ + // 使用反射机制动态调用指定名称的 Activity 函数 + // Temporal SDK 会根据 activity_name 找到对应的注册函数并执行 err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult) - stepPassed := true + // ------------------------------------------------------------------------------------ + // 步骤4.4: 处理 Activity 执行结果 + // ------------------------------------------------------------------------------------ + stepPassed := true // 当前步骤成功标志 + if err != nil { + // Activity 执行过程中发生错误(如超时、异常等) logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err) stepPassed = false - overallSuccess = false // 标记整个流程失败 + overallSuccess = false // 标记整个工作流失败 } else { - // 根据 activityResult 提取 BaseTestResult.Success - // 这里需要类型断言来获取具体的结果 + // Activity 执行成功,需要检查业务逻辑是否成功 + // 通过类型断言获取具体的结果并检查 BaseResult.Success 字段 switch res := activityResult.(type) { case *pb.ApiTestResult: - apiResults = append(apiResults, res) - if !res.BaseResult.Success { + apiResults = append(apiResults, res) // 收集 API 测试结果 + if !res.BaseResult.Success { // 检查业务逻辑是否成功 stepPassed = false overallSuccess = false } case *pb.UiTestResult: - uiResults = append(uiResults, res) - if !res.BaseResult.Success { + uiResults = append(uiResults, res) // 收集 UI 测试结果 + if !res.BaseResult.Success { // 检查业务逻辑是否成功 stepPassed = false overallSuccess = false } + // 可以在这里添加更多结果类型的处理 } logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed) } + // 记录当前步骤的执行结果,用于后续条件跳转判断 stepResults[step.StepId] = stepPassed - // 根据结果和条件确定下一步 - nextStep := currentStepOrder + 1 // 默认顺序执行 + // ------------------------------------------------------------------------------------ + // 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转) + // ------------------------------------------------------------------------------------ + // 默认情况下,顺序执行下一个步骤 + nextStep := currentStepOrder + 1 + // 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步 if stepPassed && step.SuccessNextStepOrder != nil { - nextStep = int(*step.SuccessNextStepOrder) - 1 // Adjust for 0-indexed slice + // 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤 + nextStep = int(*step.SuccessNextStepOrder) - 1 // 转换为 0 索引(数据库中可能是 1 索引) } else if !stepPassed && step.FailureNextStepOrder != nil { - nextStep = int(*step.FailureNextStepOrder) - 1 + // 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤 + nextStep = int(*step.FailureNextStepOrder) - 1 // 转换为 0 索引 } - // 确保跳转的 nextStep 索引有效 + // 验证跳转目标的有效性,防止数组越界 if nextStep < 0 || nextStep >= len(caseSteps) { - break // 超出范围,退出循环 + logger.Info("Next step out of range, terminating workflow", "nextStep", nextStep, "totalSteps", len(caseSteps)) + break // 跳转目标无效,退出执行循环 } + // 更新当前步骤索引,继续下一轮循环 currentStepOrder = nextStep } - // 最终返回结果 + // ======================================================================================== + // 步骤5: 构造并返回最终执行结果 + // ======================================================================================== + logger.Info("DynamicTestSuiteWorkflow completed", + "runID", input.RunId, + "overallSuccess", overallSuccess, + "apiResultsCount", len(apiResults), + "uiResultsCount", len(uiResults)) + + // 返回包含所有测试结果和执行状态的输出结构 return &pb.TestRunOutput{ - RunId: input.RunId, - OverallSuccess: overallSuccess, - ApiResults: apiResults, - UiResults: uiResults, - CompletionMessage: "Dynamic test suite finished.", + RunId: input.RunId, // 运行标识符 + OverallSuccess: overallSuccess, // 整体成功状态 + ApiResults: apiResults, // 所有 API 测试结果 + UiResults: uiResults, // 所有 UI 测试结果 + CompletionMessage: "Dynamic test suite finished.", // 完成消息 }, nil }