diff --git a/activities/activities.go b/activities/activities.go index 5d3bd02..9c44f94 100644 --- a/activities/activities.go +++ b/activities/activities.go @@ -1,6 +1,7 @@ package activities import ( + "beacon/pkg/pb" "context" "fmt" "math/rand" @@ -17,3 +18,12 @@ func AddSuffixActivity(ctx context.Context, data string) (string, error) { fmt.Println("Go Activity: Modified data to:", result) return result, nil } + +// LoadCompositeCaseSteps 辅助 Activity 用于加载复合案例步骤定义 (可选,如果 Workflow 输入不包含完整定义) +func LoadCompositeCaseSteps(ctx context.Context, compositeCaseId string) ([]*pb.CompositeCaseStepDefinition, error) { + // 这是一个 Activity,它可以在 Python Worker 或一个 Go Activity Worker 中实现 + // 这个 Activity 负责从数据库加载 composite_case_steps 表中的数据 + // 并将其转换为 Protobuf 列表返回 + // 请确保你在 Python Worker 或另一个 Go Worker 中注册并实现了这个 Activity + return nil, fmt.Errorf("activity not implemented yet") // 仅作示例 +} diff --git a/proto/common_test.proto b/proto/common_test.proto index 0678267..dfa03c2 100644 --- a/proto/common_test.proto +++ b/proto/common_test.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package test_pb; -option go_package = "Beacon/client/gen/pb"; // 替换 your_module_path +option go_package = "Beacon/pkg/pb"; // 替换 your_module_path // Python 生成时,会在 proto_gen 目录下直接生成 common_test_pb2.py, // Python 代码中 import proto_gen.common_test_pb2 即可。 diff --git a/proto/dynamic_test.proto b/proto/dynamic_test.proto new file mode 100644 index 0000000..78baa60 --- /dev/null +++ b/proto/dynamic_test.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +package test_pb; + +option go_package = "Beacon/pkg/pb"; // 替换 your_module_path + +message DynamicTestRunInput { + string run_id = 1; + string composite_case_id = 2; // 引用数据库中的复合案例ID + map global_parameters = 3; // 运行时传入的全局参数,例如 environment_url + + // 这里的 steps 会在 Go 服务端启动 Workflow 时,根据数据库查询结果动态构建 + // 也可以不在 Proto 中定义,直接在 Workflow 中根据 DB 数据拉取并使用 + // 但如果希望 Workflow 的输入本身包含整个流程定义,可以定义一个嵌套结构 + // repeated DynamicStep steps = 4; // 也可以通过 DB ID 在 Workflow 中获取 +} + +message DynamicStep { + string step_id = 1; // 对应数据库中的 step_id + string activity_name = 2; // 对应 Activity 函数名 + map parameters_json = 3; // 将 JSONB 转换为 string 或 bytes 传递 + // 对于条件和跳转,Workflow 内部逻辑会根据这些信息进行判断 + // 例如,可以传递一个表示整个流程图的 DAG 结构,或让 Workflow 每次都查询 DB +} + +// 定义一个复合测试用例中的单个步骤 +message CompositeCaseStepDefinition { + string step_id = 1; // 对应数据库中的 step_id + int32 step_order = 2; // 步骤执行顺序 + string step_type = 3; // e.g., "API_TEST", "UI_TEST" + string activity_name = 4; // 对应的 Temporal Activity 函数名 + + // 将参数作为 JSON 字符串传递,在 Workflow 中反序列化 + // 也可以根据 Activity 的具体类型定义一个 oneof 结构,但 JSON 字符串更灵活 + string parameters_json = 5; // JSON 格式的 Activity 参数 + + // 条件跳转 (如果成功/失败,跳转到哪个 step_order) + optional int32 success_next_step_order = 6; // 可选,如果成功则跳转到此步骤的顺序号 + optional int32 failure_next_step_order = 7; // 可选,如果失败则跳转到此步骤的顺序号 + + // 执行条件(例如:{"prev_step_id": "xyz", "status": "success"}) + // Workflow 内部需要解析和评估此 JSON + string run_condition_json = 8; +} \ No newline at end of file diff --git a/workflows/dynamic_workflow.go b/workflows/dynamic_workflow.go new file mode 100644 index 0000000..694fc42 --- /dev/null +++ b/workflows/dynamic_workflow.go @@ -0,0 +1,161 @@ +package workflows + +import ( + "encoding/json" + "go.temporal.io/sdk/temporal" + "sort" + "time" + + "beacon/activities" // 假设你的 activity 包在这个路径 + "beacon/pkg/pb" // 你的 Protobuf 路径 + "go.temporal.io/sdk/workflow" + // 假设你有数据库访问层,Workflow 不能直接访问 DB,需要通过 Activity 或预加载数据 + // "your_module_path/go-server/dal" // 例如 Data Access Layer +) + +// DynamicTestSuiteWorkflow 是通用的工作流,根据配置动态执行测试 +func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) { + logger := workflow.GetLogger(ctx) + logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId) + + // Workflow 不能直接访问数据库。 + // 方式一 (推荐): 在 Workflow 启动时,由 Go Client 将完整的复合案例步骤数据作为 DynamicTestRunInput 的一部分传入。 + // 方式二 (如果数据太大): Workflow 通过一个 "LoadCompositeCaseSteps" Activity 来从 DB 获取数据。 + // 这里假设 input 包含了所有步骤信息,或者我们在 Workflow 开始时通过 Activity 加载了 + + // 假设我们通过 Activity 加载步骤定义 + var caseSteps []*pb.CompositeCaseStepDefinition // 你需要为这个结构定义一个 Protobuf 消息 + err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps) + if err != nil { + logger.Error("Failed to load composite case steps", "error", err) + return nil, err + } + + // 按 step_order 排序 + sort.Slice(caseSteps, func(i, j int) bool { + return caseSteps[i].StepOrder < caseSteps[j].StepOrder + }) + + var ( + overallSuccess = true + apiResults []*pb.ApiTestResult + uiResults []*pb.UiTestResult + stepResults = make(map[string]bool) // 存储每个步骤的成功状态 + currentStepOrder = 0 + ) + + // 循环执行步骤 + for currentStepOrder < len(caseSteps) { + step := caseSteps[currentStepOrder] + logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName) + + // 动态解析 Activity 参数 + var activityInput interface{} // 使用 interface{} 来存储 Protobuf Activity Input + var activityResult interface{} // 使用 interface{} 来存储 Protobuf Activity Output + + // 将 JSON 参数字符串反序列化到对应 Activity 的 Protobuf 结构 + // 这是最复杂的部分,需要一个映射表或工厂函数来根据 activity_name 决定用哪个 Protobuf 结构 + // 例如: + switch step.ActivityName { + case "RunApiTest": + apiReq := &pb.ApiTestRequest{} + if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { + logger.Error("Failed to unmarshal API test parameters", "error", err) + overallSuccess = false + break // 跳出当前步骤 + } + activityInput = apiReq + activityResult = &pb.ApiTestResult{} + case "RunUiTest": + uiReq := &pb.UiTestRequest{} + if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil { + logger.Error("Failed to unmarshal UI test parameters", "error", err) + overallSuccess = false + break // 跳出当前步骤 + } + activityInput = uiReq + activityResult = &pb.UiTestResult{} + case "PrepareEnvironment": + // ... + /*activityInput = &pb.PrepareEnvRequest{} // 假设有这个 + activityResult = &pb.PrepareEnvResult{}*/ + break + default: + logger.Error("Unknown activity name", "activityName", step.ActivityName) + overallSuccess = false + break + } + + if activityInput == nil { + overallSuccess = false + logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder) + break + } + + // 设置 Activity Options + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Minute, + HeartbeatTimeout: 30 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + MaximumAttempts: 3, + }, + } + stepCtx := workflow.WithActivityOptions(ctx, ao) + + // 动态执行 Activity + err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult) + + stepPassed := true + if err != nil { + logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err) + stepPassed = false + overallSuccess = false // 标记整个流程失败 + } else { + // 根据 activityResult 提取 BaseTestResult.Success + // 这里需要类型断言来获取具体的结果 + switch res := activityResult.(type) { + case *pb.ApiTestResult: + apiResults = append(apiResults, res) + if !res.BaseResult.Success { + stepPassed = false + overallSuccess = false + } + case *pb.UiTestResult: + uiResults = append(uiResults, res) + if !res.BaseResult.Success { + stepPassed = false + overallSuccess = false + } + } + logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed) + } + + stepResults[step.StepId] = stepPassed + + // 根据结果和条件确定下一步 + nextStep := currentStepOrder + 1 // 默认顺序执行 + + if stepPassed && step.SuccessNextStepOrder != nil { + nextStep = int(*step.SuccessNextStepOrder) - 1 // Adjust for 0-indexed slice + } else if !stepPassed && step.FailureNextStepOrder != nil { + nextStep = int(*step.FailureNextStepOrder) - 1 + } + + // 确保跳转的 nextStep 索引有效 + if nextStep < 0 || nextStep >= len(caseSteps) { + break // 超出范围,退出循环 + } + + currentStepOrder = nextStep + } + + // 最终返回结果 + return &pb.TestRunOutput{ + RunId: input.RunId, + OverallSuccess: overallSuccess, + ApiResults: apiResults, + UiResults: uiResults, + CompletionMessage: "Dynamic test suite finished.", + }, nil +}