Beacon/workflows/dynamic_workflow.go
2025-06-24 22:10:31 +08:00

162 lines
5.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workflows
import (
"encoding/json"
"go.temporal.io/sdk/temporal"
"sort"
"time"
"beacon/activities" // 假设你的 activity 包在这个路径
"beacon/pkg/pb" // 你的 Protobuf 路径
"go.temporal.io/sdk/workflow"
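// Assumes a database access layer exists. Workflow code cannot access the DB directly; it must go through an Activity or receive preloaded data.
// "your_module_path/go-server/dal" // e.g. the data access layer (DAL)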
)
// DynamicTestSuiteWorkflow is a generic workflow that executes the steps of a
// composite test case in an order driven by the case's own configuration.
//
// The step definitions for input.CompositeCaseId are loaded through the
// LoadCompositeCaseSteps activity (workflow code cannot query the database
// itself), sorted by StepOrder, and executed one activity per step starting
// with the lowest StepOrder. After each step the next one is chosen as follows:
//   - the step passed and SuccessNextStepOrder is set: jump to that step order;
//   - the step failed and FailureNextStepOrder is set: jump to that step order;
//   - otherwise: continue with the following step in sorted order.
//
// A jump to a step order that does not exist ends the run. The run also ends
// when the input for a step cannot be constructed (unknown or unsupported
// activity name, or parameters that fail to decode) and when the execution cap
// is reached; both of those mark the run as failed.
//
// Parameters:
//   - ctx: the workflow context.
//   - input: identifies the run (RunId) and the composite case to execute
//     (CompositeCaseId). Must not be nil.
//
// Returns the collected API/UI results together with the overall success flag.
// An error is returned only when input is nil or the step definitions cannot
// be loaded; individual step failures are reported through OverallSuccess.
func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) {
	// maxStepExecutions bounds the total number of step executions in one run.
	// Success/failure jumps may point backwards, so without a bound a
	// misconfigured case could loop forever and grow the workflow history
	// without limit.
	const maxStepExecutions = 1000

	logger := workflow.GetLogger(ctx)

	// Dereferencing a nil input would panic inside workflow code; fail the
	// workflow with an explicit, non-retryable error instead.
	if input == nil {
		logger.Error("DynamicTestSuiteWorkflow received nil input")
		return nil, temporal.NewNonRetryableApplicationError("workflow input must not be nil", "InvalidInput", nil)
	}
	logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId)

	// The SDK rejects activities scheduled without a StartToCloseTimeout or
	// ScheduleToCloseTimeout, so the loader needs its own activity options.
	loadCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval: time.Second,
			MaximumAttempts: 3,
		},
	})

	var caseSteps []*pb.CompositeCaseStepDefinition
	if err := workflow.ExecuteActivity(loadCtx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(loadCtx, &caseSteps); err != nil {
		logger.Error("Failed to load composite case steps", "error", err)
		return nil, err
	}

	// Stable sort: steps sharing a StepOrder keep their loaded order, so the
	// execution order does not depend on the sort implementation.
	sort.SliceStable(caseSteps, func(i, j int) bool {
		return caseSteps[i].StepOrder < caseSteps[j].StepOrder
	})

	// indexByOrder maps a StepOrder to its position in the sorted slice so that
	// jumps work even when step orders are not exactly 1..N. For duplicated
	// orders the first step with that order is the jump target.
	indexByOrder := make(map[int]int, len(caseSteps))
	for i, s := range caseSteps {
		order := int(s.StepOrder)
		if _, seen := indexByOrder[order]; !seen {
			indexByOrder[order] = i
		}
	}

	// resolveOrder translates a jump target into a slice index. An unknown
	// target yields len(caseSteps), which terminates the step loop.
	resolveOrder := func(order int) int {
		if i, ok := indexByOrder[order]; ok {
			return i
		}
		logger.Warn("Next step order not found, ending run", "nextStepOrder", order)
		return len(caseSteps)
	}

	// The options are identical for every step, so build the context once.
	// NOTE(review): with HeartbeatTimeout set, an activity that does not
	// heartbeat within 30s has its attempt timed out — confirm the test
	// activities record heartbeats.
	stepCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Minute,
		HeartbeatTimeout:    30 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval: time.Second,
			MaximumAttempts: 3,
		},
	})

	var (
		overallSuccess = true
		apiResults     []*pb.ApiTestResult
		uiResults      []*pb.UiTestResult
	)

	executed := 0
	for idx := 0; idx < len(caseSteps); {
		if executed >= maxStepExecutions {
			logger.Error("Step execution limit reached, aborting run", "limit", maxStepExecutions)
			overallSuccess = false
			break
		}
		executed++

		step := caseSteps[idx]
		logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName)

		// Pick the request/result message types for this activity and decode
		// the step's JSON parameters into the request. activityInput stays nil
		// when the step cannot be prepared.
		// NOTE(review): encoding/json is applied to generated protobuf
		// messages here; confirm ParametersJson matches the Go JSON field tags
		// rather than the canonical proto JSON mapping.
		var activityInput interface{}
		var activityResult interface{}
		switch step.ActivityName {
		case "RunApiTest":
			apiReq := &pb.ApiTestRequest{}
			if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
				logger.Error("Failed to unmarshal API test parameters", "error", err)
			} else {
				activityInput = apiReq
				activityResult = &pb.ApiTestResult{}
			}
		case "RunUiTest":
			uiReq := &pb.UiTestRequest{}
			if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil {
				logger.Error("Failed to unmarshal UI test parameters", "error", err)
			} else {
				activityInput = uiReq
				activityResult = &pb.UiTestResult{}
			}
		case "PrepareEnvironment":
			// Not wired up yet: no request/result types are assigned, so the
			// nil check below ends the run as failed.
		default:
			logger.Error("Unknown activity name", "activityName", step.ActivityName)
		}

		if activityInput == nil {
			overallSuccess = false
			logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder)
			break
		}

		// Execute the activity by its registered name.
		stepPassed := true
		if err := workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult); err != nil {
			logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err)
			stepPassed = false
		} else {
			// A result without a BaseResult is treated as a failed step rather
			// than dereferencing a nil pointer.
			switch res := activityResult.(type) {
			case *pb.ApiTestResult:
				apiResults = append(apiResults, res)
				stepPassed = res.BaseResult != nil && res.BaseResult.Success
			case *pb.UiTestResult:
				uiResults = append(uiResults, res)
				stepPassed = res.BaseResult != nil && res.BaseResult.Success
			}
			logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed)
		}
		if !stepPassed {
			// Any failed step marks the whole run as failed.
			overallSuccess = false
		}

		// Choose the next step: a configured jump wins over sequential order.
		next := idx + 1
		switch {
		case stepPassed && step.SuccessNextStepOrder != nil:
			next = resolveOrder(int(*step.SuccessNextStepOrder))
		case !stepPassed && step.FailureNextStepOrder != nil:
			next = resolveOrder(int(*step.FailureNextStepOrder))
		}
		idx = next
	}

	return &pb.TestRunOutput{
		RunId:             input.RunId,
		OverallSuccess:    overallSuccess,
		ApiResults:        apiResults,
		UiResults:         uiResults,
		CompletionMessage: "Dynamic test suite finished.",
	}, nil
}