@ -1,235 +0,0 @@
package workflows
import (
"encoding/json"
"go.temporal.io/sdk/temporal"
"sort"
"time"
"beacon/activities" // 假设你的 activity 包在这个路径
"beacon/pkg/pb" // 你的 Protobuf 路径
"go.temporal.io/sdk/workflow"
// 假设你有数据库访问层, Workflow 不能直接访问 DB, 需要通过 Activity 或预加载数据
// "your_module_path/go-server/dal" // 例如 Data Access Layer
)
// DynamicTestSuiteWorkflow 是通用的动态测试工作流
// 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理
// 输入参数:
// - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入
//
// 返回值:
// - TestRunOutput: 包含整体成功状态、各类测试结果的输出
// - error: 执行过程中的错误
func DynamicTestSuiteWorkflow ( ctx workflow . Context , input * pb . DynamicTestRunInput ) ( * pb . TestRunOutput , error ) {
// 获取工作流日志记录器,用于记录执行过程
logger := workflow . GetLogger ( ctx )
logger . Info ( "DynamicTestSuiteWorkflow started" , "runID" , input . RunId , "compositeCaseID" , input . CompositeCaseId )
// ========================================================================================
// 步骤1: 加载复合案例步骤定义
// ========================================================================================
// 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据
// 有两种设计方案:
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
var caseSteps [ ] * pb . CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
err := workflow . ExecuteActivity ( ctx , activities . LoadCompositeCaseSteps , input . CompositeCaseId ) . Get ( ctx , & caseSteps )
if err != nil {
logger . Error ( "Failed to load composite case steps" , "error" , err )
return nil , err
}
// ========================================================================================
// 步骤2: 对步骤进行排序,确保按正确顺序执行
// ========================================================================================
// 按 step_order 字段升序排序,确保步骤按定义的顺序执行
// 这对于 DAG 结构的正确执行至关重要
sort . Slice ( caseSteps , func ( i , j int ) bool {
return caseSteps [ i ] . StepOrder < caseSteps [ j ] . StepOrder
} )
// ========================================================================================
// 步骤3: 初始化执行状态和结果收集器
// ========================================================================================
var (
overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false
apiResults [ ] * pb . ApiTestResult // 收集所有 API 测试结果
uiResults [ ] * pb . UiTestResult // 收集所有 UI 测试结果
stepResults = make ( map [ string ] bool ) // 存储每个步骤的成功/失败状态,用于条件跳转判断
currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转
)
// ========================================================================================
// 步骤4: 主执行循环 - 动态执行各个测试步骤
// ========================================================================================
// 使用 while 循环而非 for range, 因为需要支持条件跳转( 非线性执行)
for currentStepOrder < len ( caseSteps ) {
step := caseSteps [ currentStepOrder ] // 获取当前要执行的步骤
logger . Info ( "Executing step" , "stepOrder" , step . StepOrder , "activityName" , step . ActivityName )
// ------------------------------------------------------------------------------------
// 步骤4.1: 动态构造 Activity 输入参数
// ------------------------------------------------------------------------------------
// 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式
// 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数
var activityInput interface { } // 存储特定 Activity 的输入参数(类型由 activity_name 决定)
var activityResult interface { } // 存储特定 Activity 的输出结果(类型由 activity_name 决定)
// 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构
// 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
switch step . ActivityName {
case "RunApiTest" :
// API 测试:创建 API 测试请求结构并解析参数
apiReq := & pb . ApiTestRequest { }
if err := json . Unmarshal ( [ ] byte ( step . ParametersJson ) , apiReq ) ; err != nil {
logger . Error ( "Failed to unmarshal API test parameters" , "error" , err )
overallSuccess = false
break // 参数解析失败,跳出当前步骤
}
activityInput = apiReq
activityResult = & pb . ApiTestResult { } // 预创建结果容器
case "RunUiTest" :
// UI 测试:创建 UI 测试请求结构并解析参数
uiReq := & pb . UiTestRequest { }
if err := json . Unmarshal ( [ ] byte ( step . ParametersJson ) , uiReq ) ; err != nil {
logger . Error ( "Failed to unmarshal UI test parameters" , "error" , err )
overallSuccess = false
break // 参数解析失败,跳出当前步骤
}
activityInput = uiReq
activityResult = & pb . UiTestResult { } // 预创建结果容器
case "PrepareEnvironment" :
// 环境准备:可以扩展更多测试类型
// TODO: 实现环境准备的参数解析
/ *
prepReq := & pb . PrepareEnvRequest { }
if err := json . Unmarshal ( [ ] byte ( step . ParametersJson ) , prepReq ) ; err != nil {
logger . Error ( "Failed to unmarshal prepare env parameters" , "error" , err )
overallSuccess = false
break
}
activityInput = prepReq
activityResult = & pb . PrepareEnvResult { }
* /
break
default :
// 未知的 Activity 类型,记录错误并标记失败
logger . Error ( "Unknown activity name" , "activityName" , step . ActivityName )
overallSuccess = false
break
}
// 如果参数构造失败,跳过当前步骤
if activityInput == nil {
overallSuccess = false
logger . Error ( "Activity input could not be constructed for step" , "stepOrder" , step . StepOrder )
break
}
// ------------------------------------------------------------------------------------
// 步骤4.2: 配置 Activity 执行选项
// ------------------------------------------------------------------------------------
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
ao := workflow . ActivityOptions {
StartToCloseTimeout : 10 * time . Minute , // 单个测试最长执行时间
HeartbeatTimeout : 30 * time . Second , // 心跳超时,用于检测 Activity 是否还在运行
RetryPolicy : & temporal . RetryPolicy { // 重试策略配置
InitialInterval : time . Second , // 首次重试间隔
MaximumAttempts : 3 , // 最大重试次数
} ,
}
stepCtx := workflow . WithActivityOptions ( ctx , ao ) // 应用 Activity 选项到上下文
// ------------------------------------------------------------------------------------
// 步骤4.3: 动态执行 Activity
// ------------------------------------------------------------------------------------
// 使用反射机制动态调用指定名称的 Activity 函数
// Temporal SDK 会根据 activity_name 找到对应的注册函数并执行
err = workflow . ExecuteActivity ( stepCtx , step . ActivityName , activityInput ) . Get ( stepCtx , activityResult )
// ------------------------------------------------------------------------------------
// 步骤4.4: 处理 Activity 执行结果
// ------------------------------------------------------------------------------------
stepPassed := true // 当前步骤成功标志
if err != nil {
// Activity 执行过程中发生错误(如超时、异常等)
logger . Error ( "Activity execution failed" , "activityName" , step . ActivityName , "error" , err )
stepPassed = false
overallSuccess = false // 标记整个工作流失败
} else {
// Activity 执行成功,需要检查业务逻辑是否成功
// 通过类型断言获取具体的结果并检查 BaseResult.Success 字段
switch res := activityResult . ( type ) {
case * pb . ApiTestResult :
apiResults = append ( apiResults , res ) // 收集 API 测试结果
if ! res . BaseResult . Success { // 检查业务逻辑是否成功
stepPassed = false
overallSuccess = false
}
case * pb . UiTestResult :
uiResults = append ( uiResults , res ) // 收集 UI 测试结果
if ! res . BaseResult . Success { // 检查业务逻辑是否成功
stepPassed = false
overallSuccess = false
}
// 可以在这里添加更多结果类型的处理
}
logger . Info ( "Activity execution finished" , "activityName" , step . ActivityName , "success" , stepPassed )
}
// 记录当前步骤的执行结果,用于后续条件跳转判断
stepResults [ step . StepId ] = stepPassed
// ------------------------------------------------------------------------------------
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
// ------------------------------------------------------------------------------------
// 默认情况下,顺序执行下一个步骤
nextStep := currentStepOrder + 1
// 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步
if stepPassed && step . SuccessNextStepOrder != nil {
// 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤
nextStep = int ( * step . SuccessNextStepOrder ) - 1 // 转换为 0 索引(数据库中可能是 1 索引)
} else if ! stepPassed && step . FailureNextStepOrder != nil {
// 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤
nextStep = int ( * step . FailureNextStepOrder ) - 1 // 转换为 0 索引
}
// 验证跳转目标的有效性,防止数组越界
if nextStep < 0 || nextStep >= len ( caseSteps ) {
logger . Info ( "Next step out of range, terminating workflow" , "nextStep" , nextStep , "totalSteps" , len ( caseSteps ) )
break // 跳转目标无效,退出执行循环
}
// 更新当前步骤索引,继续下一轮循环
currentStepOrder = nextStep
}
// ========================================================================================
// 步骤5: 构造并返回最终执行结果
// ========================================================================================
logger . Info ( "DynamicTestSuiteWorkflow completed" ,
"runID" , input . RunId ,
"overallSuccess" , overallSuccess ,
"apiResultsCount" , len ( apiResults ) ,
"uiResultsCount" , len ( uiResults ) )
// 返回包含所有测试结果和执行状态的输出结构
return & pb . TestRunOutput {
RunId : input . RunId , // 运行标识符
OverallSuccess : overallSuccess , // 整体成功状态
ApiResults : apiResults , // 所有 API 测试结果
UiResults : uiResults , // 所有 UI 测试结果
CompletionMessage : "Dynamic test suite finished." , // 完成消息
} , nil
}