Compare commits
No commits in common. "71eb131e581ec511cbd0e544427c60d8429f10c5" and "950d73ade44762b2bdce9adea6923a295c1d14e6" have entirely different histories.
71eb131e58
...
950d73ade4
@ -1,7 +1,6 @@
|
|||||||
package activities
|
package activities
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"beacon/pkg/pb"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@ -18,12 +17,3 @@ func AddSuffixActivity(ctx context.Context, data string) (string, error) {
|
|||||||
fmt.Println("Go Activity: Modified data to:", result)
|
fmt.Println("Go Activity: Modified data to:", result)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadCompositeCaseSteps 辅助 Activity 用于加载复合案例步骤定义 (可选,如果 Workflow 输入不包含完整定义)
|
|
||||||
func LoadCompositeCaseSteps(ctx context.Context, compositeCaseId string) ([]*pb.CompositeCaseStepDefinition, error) {
|
|
||||||
// 这是一个 Activity,它可以在 Python Worker 或一个 Go Activity Worker 中实现
|
|
||||||
// 这个 Activity 负责从数据库加载 composite_case_steps 表中的数据
|
|
||||||
// 并将其转换为 Protobuf 列表返回
|
|
||||||
// 请确保你在 Python Worker 或另一个 Go Worker 中注册并实现了这个 Activity
|
|
||||||
return nil, fmt.Errorf("activity not implemented yet") // 仅作示例
|
|
||||||
}
|
|
||||||
|
@ -2,7 +2,7 @@ syntax = "proto3";
|
|||||||
|
|
||||||
package test_pb;
|
package test_pb;
|
||||||
|
|
||||||
option go_package = "Beacon/pkg/pb"; // 替换 your_module_path
|
option go_package = "Beacon/client/gen/pb"; // 替换 your_module_path
|
||||||
// Python 生成时,会在 proto_gen 目录下直接生成 common_test_pb2.py,
|
// Python 生成时,会在 proto_gen 目录下直接生成 common_test_pb2.py,
|
||||||
// Python 代码中 import proto_gen.common_test_pb2 即可。
|
// Python 代码中 import proto_gen.common_test_pb2 即可。
|
||||||
|
|
||||||
|
@ -1,44 +0,0 @@
|
|||||||
syntax = "proto3";
|
|
||||||
|
|
||||||
package test_pb;
|
|
||||||
|
|
||||||
option go_package = "Beacon/pkg/pb"; // 替换 your_module_path
|
|
||||||
|
|
||||||
message DynamicTestRunInput {
|
|
||||||
string run_id = 1;
|
|
||||||
string composite_case_id = 2; // 引用数据库中的复合案例ID
|
|
||||||
map<string, string> global_parameters = 3; // 运行时传入的全局参数,例如 environment_url
|
|
||||||
|
|
||||||
// 这里的 steps 会在 Go 服务端启动 Workflow 时,根据数据库查询结果动态构建
|
|
||||||
// 也可以不在 Proto 中定义,直接在 Workflow 中根据 DB 数据拉取并使用
|
|
||||||
// 但如果希望 Workflow 的输入本身包含整个流程定义,可以定义一个嵌套结构
|
|
||||||
repeated DynamicStep steps = 4; // 也可以通过 DB ID 在 Workflow 中获取
|
|
||||||
}
|
|
||||||
|
|
||||||
message DynamicStep {
|
|
||||||
string step_id = 1; // 对应数据库中的 step_id
|
|
||||||
string activity_name = 2; // 对应 Activity 函数名
|
|
||||||
map<string, string> parameters_json = 3; // 将 JSONB 转换为 string 或 bytes 传递
|
|
||||||
// 对于条件和跳转,Workflow 内部逻辑会根据这些信息进行判断
|
|
||||||
// 例如,可以传递一个表示整个流程图的 DAG 结构,或让 Workflow 每次都查询 DB
|
|
||||||
}
|
|
||||||
|
|
||||||
// 定义一个复合测试用例中的单个步骤
|
|
||||||
message CompositeCaseStepDefinition {
|
|
||||||
string step_id = 1; // 对应数据库中的 step_id
|
|
||||||
int32 step_order = 2; // 步骤执行顺序
|
|
||||||
string step_type = 3; // e.g., "API_TEST", "UI_TEST"
|
|
||||||
string activity_name = 4; // 对应的 Temporal Activity 函数名
|
|
||||||
|
|
||||||
// 将参数作为 JSON 字符串传递,在 Workflow 中反序列化
|
|
||||||
// 也可以根据 Activity 的具体类型定义一个 oneof 结构,但 JSON 字符串更灵活
|
|
||||||
string parameters_json = 5; // JSON 格式的 Activity 参数
|
|
||||||
|
|
||||||
// 条件跳转 (如果成功/失败,跳转到哪个 step_order)
|
|
||||||
optional int32 success_next_step_order = 6; // 可选,如果成功则跳转到此步骤的顺序号
|
|
||||||
optional int32 failure_next_step_order = 7; // 可选,如果失败则跳转到此步骤的顺序号
|
|
||||||
|
|
||||||
// 执行条件(例如:{"prev_step_id": "xyz", "status": "success"})
|
|
||||||
// Workflow 内部需要解析和评估此 JSON
|
|
||||||
string run_condition_json = 8;
|
|
||||||
}
|
|
@ -1,235 +0,0 @@
|
|||||||
package workflows
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"go.temporal.io/sdk/temporal"
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"beacon/activities" // 假设你的 activity 包在这个路径
|
|
||||||
"beacon/pkg/pb" // 你的 Protobuf 路径
|
|
||||||
"go.temporal.io/sdk/workflow"
|
|
||||||
// 假设你有数据库访问层,Workflow 不能直接访问 DB,需要通过 Activity 或预加载数据
|
|
||||||
// "your_module_path/go-server/dal" // 例如 Data Access Layer
|
|
||||||
)
|
|
||||||
|
|
||||||
// DynamicTestSuiteWorkflow 是通用的动态测试工作流
|
|
||||||
// 该工作流根据数据库中的配置动态执行不同类型的测试步骤,支持条件跳转和错误处理
|
|
||||||
// 输入参数:
|
|
||||||
// - input: 包含运行ID、复合案例ID和全局参数的动态测试运行输入
|
|
||||||
//
|
|
||||||
// 返回值:
|
|
||||||
// - TestRunOutput: 包含整体成功状态、各类测试结果的输出
|
|
||||||
// - error: 执行过程中的错误
|
|
||||||
func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInput) (*pb.TestRunOutput, error) {
|
|
||||||
// 获取工作流日志记录器,用于记录执行过程
|
|
||||||
logger := workflow.GetLogger(ctx)
|
|
||||||
logger.Info("DynamicTestSuiteWorkflow started", "runID", input.RunId, "compositeCaseID", input.CompositeCaseId)
|
|
||||||
|
|
||||||
// ========================================================================================
|
|
||||||
// 步骤1: 加载复合案例步骤定义
|
|
||||||
// ========================================================================================
|
|
||||||
// 注意: Temporal Workflow 不能直接访问数据库,必须通过 Activity 来获取数据
|
|
||||||
// 有两种设计方案:
|
|
||||||
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
|
|
||||||
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
|
|
||||||
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
|
|
||||||
|
|
||||||
var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
|
|
||||||
|
|
||||||
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
|
|
||||||
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
|
|
||||||
err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Failed to load composite case steps", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// ========================================================================================
|
|
||||||
// 步骤2: 对步骤进行排序,确保按正确顺序执行
|
|
||||||
// ========================================================================================
|
|
||||||
// 按 step_order 字段升序排序,确保步骤按定义的顺序执行
|
|
||||||
// 这对于 DAG 结构的正确执行至关重要
|
|
||||||
sort.Slice(caseSteps, func(i, j int) bool {
|
|
||||||
return caseSteps[i].StepOrder < caseSteps[j].StepOrder
|
|
||||||
})
|
|
||||||
|
|
||||||
// ========================================================================================
|
|
||||||
// 步骤3: 初始化执行状态和结果收集器
|
|
||||||
// ========================================================================================
|
|
||||||
var (
|
|
||||||
overallSuccess = true // 整体执行成功标志,任何一个步骤失败都会置为 false
|
|
||||||
apiResults []*pb.ApiTestResult // 收集所有 API 测试结果
|
|
||||||
uiResults []*pb.UiTestResult // 收集所有 UI 测试结果
|
|
||||||
stepResults = make(map[string]bool) // 存储每个步骤的成功/失败状态,用于条件跳转判断
|
|
||||||
currentStepOrder = 0 // 当前执行步骤的索引,支持非线性跳转
|
|
||||||
)
|
|
||||||
|
|
||||||
// ========================================================================================
|
|
||||||
// 步骤4: 主执行循环 - 动态执行各个测试步骤
|
|
||||||
// ========================================================================================
|
|
||||||
// 使用 while 循环而非 for range,因为需要支持条件跳转(非线性执行)
|
|
||||||
for currentStepOrder < len(caseSteps) {
|
|
||||||
step := caseSteps[currentStepOrder] // 获取当前要执行的步骤
|
|
||||||
logger.Info("Executing step", "stepOrder", step.StepOrder, "activityName", step.ActivityName)
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 步骤4.1: 动态构造 Activity 输入参数
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 因为不同的 Activity 需要不同类型的输入参数,这里使用工厂模式
|
|
||||||
// 根据 activity_name 决定使用哪个 Protobuf 结构来反序列化 JSON 参数
|
|
||||||
var activityInput interface{} // 存储特定 Activity 的输入参数(类型由 activity_name 决定)
|
|
||||||
var activityResult interface{} // 存储特定 Activity 的输出结果(类型由 activity_name 决定)
|
|
||||||
|
|
||||||
// 根据不同的 Activity 类型,将 JSON 字符串参数反序列化为对应的 Protobuf 结构
|
|
||||||
// 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
|
|
||||||
switch step.ActivityName {
|
|
||||||
case "RunApiTest":
|
|
||||||
// API 测试:创建 API 测试请求结构并解析参数
|
|
||||||
apiReq := &pb.ApiTestRequest{}
|
|
||||||
if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
|
|
||||||
logger.Error("Failed to unmarshal API test parameters", "error", err)
|
|
||||||
overallSuccess = false
|
|
||||||
break // 参数解析失败,跳出当前步骤
|
|
||||||
}
|
|
||||||
activityInput = apiReq
|
|
||||||
activityResult = &pb.ApiTestResult{} // 预创建结果容器
|
|
||||||
|
|
||||||
case "RunUiTest":
|
|
||||||
// UI 测试:创建 UI 测试请求结构并解析参数
|
|
||||||
uiReq := &pb.UiTestRequest{}
|
|
||||||
if err := json.Unmarshal([]byte(step.ParametersJson), uiReq); err != nil {
|
|
||||||
logger.Error("Failed to unmarshal UI test parameters", "error", err)
|
|
||||||
overallSuccess = false
|
|
||||||
break // 参数解析失败,跳出当前步骤
|
|
||||||
}
|
|
||||||
activityInput = uiReq
|
|
||||||
activityResult = &pb.UiTestResult{} // 预创建结果容器
|
|
||||||
|
|
||||||
case "PrepareEnvironment":
|
|
||||||
// 环境准备:可以扩展更多测试类型
|
|
||||||
// TODO: 实现环境准备的参数解析
|
|
||||||
/*
|
|
||||||
prepReq := &pb.PrepareEnvRequest{}
|
|
||||||
if err := json.Unmarshal([]byte(step.ParametersJson), prepReq); err != nil {
|
|
||||||
logger.Error("Failed to unmarshal prepare env parameters", "error", err)
|
|
||||||
overallSuccess = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
activityInput = prepReq
|
|
||||||
activityResult = &pb.PrepareEnvResult{}
|
|
||||||
*/
|
|
||||||
break
|
|
||||||
|
|
||||||
default:
|
|
||||||
// 未知的 Activity 类型,记录错误并标记失败
|
|
||||||
logger.Error("Unknown activity name", "activityName", step.ActivityName)
|
|
||||||
overallSuccess = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果参数构造失败,跳过当前步骤
|
|
||||||
if activityInput == nil {
|
|
||||||
overallSuccess = false
|
|
||||||
logger.Error("Activity input could not be constructed for step", "stepOrder", step.StepOrder)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 步骤4.2: 配置 Activity 执行选项
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
|
|
||||||
ao := workflow.ActivityOptions{
|
|
||||||
StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间
|
|
||||||
HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行
|
|
||||||
RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置
|
|
||||||
InitialInterval: time.Second, // 首次重试间隔
|
|
||||||
MaximumAttempts: 3, // 最大重试次数
|
|
||||||
},
|
|
||||||
}
|
|
||||||
stepCtx := workflow.WithActivityOptions(ctx, ao) // 应用 Activity 选项到上下文
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 步骤4.3: 动态执行 Activity
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 使用反射机制动态调用指定名称的 Activity 函数
|
|
||||||
// Temporal SDK 会根据 activity_name 找到对应的注册函数并执行
|
|
||||||
err = workflow.ExecuteActivity(stepCtx, step.ActivityName, activityInput).Get(stepCtx, activityResult)
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 步骤4.4: 处理 Activity 执行结果
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
stepPassed := true // 当前步骤成功标志
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
// Activity 执行过程中发生错误(如超时、异常等)
|
|
||||||
logger.Error("Activity execution failed", "activityName", step.ActivityName, "error", err)
|
|
||||||
stepPassed = false
|
|
||||||
overallSuccess = false // 标记整个工作流失败
|
|
||||||
} else {
|
|
||||||
// Activity 执行成功,需要检查业务逻辑是否成功
|
|
||||||
// 通过类型断言获取具体的结果并检查 BaseResult.Success 字段
|
|
||||||
switch res := activityResult.(type) {
|
|
||||||
case *pb.ApiTestResult:
|
|
||||||
apiResults = append(apiResults, res) // 收集 API 测试结果
|
|
||||||
if !res.BaseResult.Success { // 检查业务逻辑是否成功
|
|
||||||
stepPassed = false
|
|
||||||
overallSuccess = false
|
|
||||||
}
|
|
||||||
case *pb.UiTestResult:
|
|
||||||
uiResults = append(uiResults, res) // 收集 UI 测试结果
|
|
||||||
if !res.BaseResult.Success { // 检查业务逻辑是否成功
|
|
||||||
stepPassed = false
|
|
||||||
overallSuccess = false
|
|
||||||
}
|
|
||||||
// 可以在这里添加更多结果类型的处理
|
|
||||||
}
|
|
||||||
logger.Info("Activity execution finished", "activityName", step.ActivityName, "success", stepPassed)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 记录当前步骤的执行结果,用于后续条件跳转判断
|
|
||||||
stepResults[step.StepId] = stepPassed
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
|
|
||||||
// ------------------------------------------------------------------------------------
|
|
||||||
// 默认情况下,顺序执行下一个步骤
|
|
||||||
nextStep := currentStepOrder + 1
|
|
||||||
|
|
||||||
// 根据当前步骤的成功/失败状态和预定义的跳转规则确定下一步
|
|
||||||
if stepPassed && step.SuccessNextStepOrder != nil {
|
|
||||||
// 如果当前步骤成功且定义了成功跳转目标,则跳转到指定步骤
|
|
||||||
nextStep = int(*step.SuccessNextStepOrder) - 1 // 转换为 0 索引(数据库中可能是 1 索引)
|
|
||||||
} else if !stepPassed && step.FailureNextStepOrder != nil {
|
|
||||||
// 如果当前步骤失败且定义了失败跳转目标,则跳转到指定步骤
|
|
||||||
nextStep = int(*step.FailureNextStepOrder) - 1 // 转换为 0 索引
|
|
||||||
}
|
|
||||||
|
|
||||||
// 验证跳转目标的有效性,防止数组越界
|
|
||||||
if nextStep < 0 || nextStep >= len(caseSteps) {
|
|
||||||
logger.Info("Next step out of range, terminating workflow", "nextStep", nextStep, "totalSteps", len(caseSteps))
|
|
||||||
break // 跳转目标无效,退出执行循环
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新当前步骤索引,继续下一轮循环
|
|
||||||
currentStepOrder = nextStep
|
|
||||||
}
|
|
||||||
|
|
||||||
// ========================================================================================
|
|
||||||
// 步骤5: 构造并返回最终执行结果
|
|
||||||
// ========================================================================================
|
|
||||||
logger.Info("DynamicTestSuiteWorkflow completed",
|
|
||||||
"runID", input.RunId,
|
|
||||||
"overallSuccess", overallSuccess,
|
|
||||||
"apiResultsCount", len(apiResults),
|
|
||||||
"uiResultsCount", len(uiResults))
|
|
||||||
|
|
||||||
// 返回包含所有测试结果和执行状态的输出结构
|
|
||||||
return &pb.TestRunOutput{
|
|
||||||
RunId: input.RunId, // 运行标识符
|
|
||||||
OverallSuccess: overallSuccess, // 整体成功状态
|
|
||||||
ApiResults: apiResults, // 所有 API 测试结果
|
|
||||||
UiResults: uiResults, // 所有 UI 测试结果
|
|
||||||
CompletionMessage: "Dynamic test suite finished.", // 完成消息
|
|
||||||
}, nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user