package workflows import ( "encoding/json" "go.temporal.io/sdk/temporal" "sort" "strconv" "time" "beacon/activities" // 假设你的 activity 包在这个路径 "beacon/pkg/pb" // 你的 Protobuf 路径 "beacon/utils" // 导入参数处理器 "go.temporal.io/sdk/workflow" // 假设你有数据库访问层,Workflow 不能直接访问 DB,需要通过 Activity 或预加载数据 // "your_module_path/go-server/dal" // 例如 Data Access Layer ) // DynamicTestSuiteWorkflow 是通用的动态测试工作流 // 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理 // 输入参数: // - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入 // // 返回值: // - TestRunOutput: 包含整体成功状态、各类测试结果的输出 // - error: 执行过程中的错误 func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) { // 获取工作流日志记录器,用于记录执行过程 logger := workflow.GetLogger(ctx) logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId) // ======================================================================================== // 步骤1: 加载复合案例步骤定义 // ======================================================================================== // 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据 // 有两种设计方案: // 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入 // 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义 // 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置 ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ TaskQueue: "data-task-queue", // 指定任务队列名称 StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间 HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死 RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置 InitialInterval: time.Second, // 首次重试前的等待时间 BackoffCoefficient: 1.0, // 重试间隔的递增系数 MaximumInterval: time.Minute, // 重试间隔的最大值 MaximumAttempts: 3, // 最大重试次数 NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误类型 }, }) // 应用 Activity 选项到上下文 var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表 // 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义 // 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置 err := workflow.ExecuteActivity(ctxActivity, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps) if err != nil { logger.Error("Failed to load composite case steps", "error", err) return nil, err } // ======================================================================================== // 步骤2: 对步骤进行排序,确保按正确顺序执行 // ======================================================================================== // 按 step_order 字段升序排序,确保步骤按定义的顺序执行 // 这对于 DAG 结构的正确执行至关重要 sort.Slice(caseSteps, func(i, j int) bool { return caseSteps[i].StepOrder < caseSteps[j].StepOrder }) // ======================================================================================== // 步骤3: 初始化执行状态和结果收集器 // ======================================================================================== var ( overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false apiResults []*pb.ApiTestResult // 收集所有 API 测试结果 uiResults []*pb.UiTestResult // 收集所有 UI 测试结果 stepResults = make(map[string]bool) // 存储每个步骤的成功/失败状态,用于条件跳转判断 currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转 lastBrowserSessionID string // 用于UI测试的浏览器会话ID ) // 初始化全局变量,包含输入参数中的全局参数 globalVariables := make(map[string]interface{}) if input.GlobalParameters != nil { for key, value := range input.GlobalParameters { globalVariables[key] = value } } // 创建参数处理器 parameterProcessor := utils.NewParameterProcessor(globalVariables) logger.Info("Initialized global variables", "variables", globalVariables) // ======================================================================================== // 步骤4: 主执行循环 - 动态执行各个测试步骤 // ======================================================================================== // 使用 while 循环而非 for range,因为需要支持条件跳转(非线性执行) for currentStepOrder < len(caseSteps) { step := caseSteps[currentStepOrder] // 获取当前要执行的步骤 logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName) // ------------------------------------------------------------------------------------ // 步骤4.1: 动态构造 Activity 输入参数(支持参数模板解析) // ------------------------------------------------------------------------------------ // 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式 // 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数 var activityInput interface{} // 存储特定 Activity 的输入参数(类型由 activity_name 决定) var activityResult interface{} // 存储特定 Activity 的输出结果(类型由 activity_name 决定) // 使用参数处理器解析参数模板,替换其中的变量引用 processedParametersJson, err := parameterProcessor.ProcessTemplate(step.ParametersJson) if err != nil { logger.Error("Failed to process parameter template", "error", err, "stepOrder", step.StepOrder) overallSuccess = false break } // 验证模板中的变量引用是否有效 if isValid, errors := parameterProcessor.ValidateTemplate(step.ParametersJson); !isValid { logger.Error("Parameter template validation failed", "errors", errors, "stepOrder", step.StepOrder) overallSuccess = false break } // 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构 // 这是动态工作流的核心:同一个工作流可以执行不同类型的测试 switch step.ActivityName { case "RunApiTest": apiReq := &pb.ApiTestRequest{} // 1. 首先验证JSON格式 if !json.Valid([]byte(processedParametersJson)) { logger.Error("Invalid JSON format in parameters") overallSuccess = false break } // 2. 解析JSON时增加错误详情 if err := json.Unmarshal([]byte(processedParametersJson), apiReq); err != nil { logger.Error("Failed to unmarshal API test parameters", "error", err, "raw_json", processedParametersJson) overallSuccess = false break } // 3. 验证必要字段 if apiReq.Endpoint == "" || apiReq.HttpMethod == "" { logger.Error("Missing required fields in API test parameters") overallSuccess = false break } activityInput = apiReq activityResult = &pb.ApiTestResult{} // 预创建结果容器 case "RunUiTest": // UI 测试:创建 UI 测试请求结构并解析参数 uiReq := &pb.UiTestRequest{} if err := json.Unmarshal([]byte(processedParametersJson), uiReq); err != nil { logger.Error("Failed to unmarshal UI test parameters", "error", err) overallSuccess = false break // 参数解析失败,跳出当前步骤 } // 注入 browser_session_id if lastBrowserSessionID != "" { uiReq.BrowserSessionId = &lastBrowserSessionID } activityInput = uiReq activityResult = &pb.UiTestResult{} // 预创建结果容器 case "PrepareEnvironment": // 环境准备:可以扩展更多测试类型 // TODO: 实现环境准备的参数解析 /* prepReq := &pb.PrepareEnvRequest{} if err := json.Unmarshal([]byte(processedParametersJson), prepReq); err != nil { logger.Error("Failed to unmarshal prepare env parameters", "error", err) overallSuccess = false break } activityInput = prepReq activityResult = &pb.PrepareEnvResult{} */ break default: // 未知的 Activity 类型,记录错误并标记失败 logger.Error("Unknown activity name", "activityName", step.ActivityName) overallSuccess = false break } // 如果参数构造失败,跳过当前步骤 if activityInput == nil { overallSuccess = false logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder) break } // ------------------------------------------------------------------------------------ // 步骤4.2: 配置 Activity 执行选项 // ------------------------------------------------------------------------------------ // 为 Activity 设置超时、重试等策略,确保测试的可靠性 ao := workflow.ActivityOptions{ TaskQueue: "python-task-queue", // 指定任务队列名称 StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置 InitialInterval: time.Second, // 首次重试间隔 MaximumAttempts: 3, // 最大重试次数 }, } stepCtx := workflow.WithActivityOptions(ctx, ao) // 应用 Activity 选项到上下文 // ------------------------------------------------------------------------------------ // 步骤4.3: 动态执行 Activity // ------------------------------------------------------------------------------------ // 使用反射机制动态调用指定名称的 Activity 函数 // Temporal SDK 会根据 activity_name 找到对应的注册函数并执行 err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult) // ------------------------------------------------------------------------------------ // 步骤4.4: 处理 Activity 执行结果 // ------------------------------------------------------------------------------------ stepPassed := true // 当前步骤成功标志 if err != nil { // Activity 执行过程中发生错误(如超时、异常等) logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err) stepPassed = false overallSuccess = false // 标记整个工作流失败 } else { // Activity 执行成功,需要检查业务逻辑是否成功 // 通过类型断言获取具体的结果并检查 BaseResult.Success 字段 switch res := activityResult.(type) { case *pb.ApiTestResult: apiResults = append(apiResults, res) // 收集 API 测试结果 if !res.BaseResult.Success { // 检查业务逻辑是否成功 stepPassed = false overallSuccess = false } // 存储activity结果用于参数传递 activityResult := &utils.ActivityResult{ StepID: step.StepId, StepName: step.ActivityName, Success: res.BaseResult.Success, Data: res, } parameterProcessor.AddActivityResult(step.StepId, activityResult) case *pb.UiTestResult: uiResults = append(uiResults, res) // 收集 UI 测试结果 if !res.BaseResult.Success { // 检查业务逻辑是否成功 stepPassed = false overallSuccess = false } // 存储activity结果用于参数传递 activityResult := &utils.ActivityResult{ StepID: step.StepId, StepName: step.ActivityName, Success: res.BaseResult.Success, Data: res, } parameterProcessor.AddActivityResult(step.StepId, activityResult) // 记录 browser_session_id 以便下一个 UI 步骤复用 if res.BrowserSessionId != "" { lastBrowserSessionID = res.BrowserSessionId } // 可以在这里添加更多结果类型的处理 } logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed) } // 记录当前步骤的执行结果,用于后续条件跳转判断 stepResults[strconv.FormatInt(step.StepId, 10)] = stepPassed // ------------------------------------------------------------------------------------ // 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转) // ------------------------------------------------------------------------------------ // 默认情况下,顺序执行下一个步骤 nextStep := currentStepOrder + 1 // 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步 if stepPassed && step.SuccessNextStepOrder != nil { // 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤 nextStep = int(*step.SuccessNextStepOrder) - 1 // 转换为 0 索引(数据库中可能是 1 索引) } else if !stepPassed && step.FailureNextStepOrder != nil { // 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤 nextStep = int(*step.FailureNextStepOrder) - 1 // 转换为 0 索引 } // 验证跳转目标的有效性,防止数组越界 if nextStep < 0 || nextStep >= len(caseSteps) { logger.Info("Next step out of range, terminating workflow", "nextStep", nextStep, "totalSteps", len(caseSteps)) break // 跳转目标无效,退出执行循环 } // 更新当前步骤索引,继续下一轮循环 currentStepOrder = nextStep } // ======================================================================================== // 步骤5: 构造并返回最终执行结果 // ======================================================================================== logger.Info("DynamicTestSuiteWorkflow completed", "runID", input.RunId, "overallSuccess", overallSuccess, "apiResultsCount", len(apiResults), "uiResultsCount", len(uiResults)) // 工作流结束时关闭浏览器会话 if lastBrowserSessionID != "" { closeReq := &pb.CloseBrowserRequest{BrowserSessionId: lastBrowserSessionID} closeCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ TaskQueue: "python-task-queue", StartToCloseTimeout: 2 * time.Minute, }) var closeResp interface{} _ = workflow.ExecuteActivity(closeCtx, "CloseBrowser", closeReq).Get(closeCtx, &closeResp) logger.Info("Closed browser session at workflow end", "browserSessionId", lastBrowserSessionID) } // 返回包含所有测试结果和执行状态的输出结构 return &pb.TestRunOutput{ RunId: input.RunId, // 运行标识符 OverallSuccess: overallSuccess, // 整体成功状态 ApiResults: apiResults, // 所有 API 测试结果 UiResults: uiResults, // 所有 UI 测试结果 CompletionMessage: "Dynamic test suite finished.", // 完成消息 }, nil }