diff --git a/proto/common_test.proto b/proto/common_test.proto index dfa03c2..598bac2 100644 --- a/proto/common_test.proto +++ b/proto/common_test.proto @@ -22,7 +22,7 @@ message ApiTestRequest { string endpoint = 2; // API路径 string http_method = 3; // "GET", "POST", etc. map headers = 4; - bytes request_body = 5; // JSON或其他二进制数据 + string request_body = 5; // JSON或其他二进制数据 int32 expected_status_code = 6; } diff --git a/services/workflow.go b/services/workflow.go index f7670ba..b6bdcc2 100644 --- a/services/workflow.go +++ b/services/workflow.go @@ -34,7 +34,7 @@ func (s *WorkflowService) Start(compositeCaseId string) error { workflowOptions := client.StartWorkflowOptions{ ID: "test_workflow_" + runID, - TaskQueue: "test-task-queue", // 保持与 Python Worker 一致 + TaskQueue: "data-task-queue", // 保持与 Python Worker 一致 } zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID)) diff --git a/workers/python/activities.py b/workers/python/activities.py index b6ffc8e..759898a 100644 --- a/workers/python/activities.py +++ b/workers/python/activities.py @@ -42,7 +42,7 @@ class TestActivities: # 心跳发送失败,记录警告但继续尝试 activity.logger.warning(f"Failed to send heartbeat: {e}") - @activity.defn(name="run_api_test") + @activity.defn(name="RunApiTest") async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult: """ 执行API测试的Temporal Activity实现 @@ -100,7 +100,7 @@ class TestActivities: result.base_result.duration_seconds = time.time() - start_time return result - @activity.defn(name="run_ui_test") + @activity.defn(name="RunUiTest") async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult: """ 执行UI测试的Temporal Activity实现 diff --git a/workflows/dynamic_workflow.go b/workflows/dynamic_workflow.go index dbb4ec2..3d13402 100644 --- a/workflows/dynamic_workflow.go +++ b/workflows/dynamic_workflow.go @@ -2,9 +2,7 @@ package workflows import ( "encoding/json" - "fmt" "go.temporal.io/sdk/temporal" - "reflect" "sort" "strconv" "time" @@ -38,8 +36,9 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义 // 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置 ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间 - HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死 + TaskQueue: "data-task-queue", // 指定任务队列名称 + StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间 + HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死 RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置 InitialInterval: time.Second, // 首次重试前的等待时间 BackoffCoefficient: 1.0, // 重试间隔的递增系数 @@ -99,7 +98,30 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // 这是动态工作流的核心:同一个工作流可以执行不同类型的测试 switch step.ActivityName { case "RunApiTest": - // API 测试:创建 API 测试请求结构并解析参数 + apiReq := &pb.ApiTestRequest{} + // 1. 首先验证JSON格式 + if !json.Valid([]byte(step.ParametersJson)) { + logger.Error("Invalid JSON format in parameters") + overallSuccess = false + break + } + + // 2. 解析JSON时增加错误详情 + if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { + logger.Error("Failed to unmarshal API test parameters", + "error", err, + "raw_json", step.ParametersJson) + overallSuccess = false + break + } + + // 3. 验证必要字段 + if apiReq.Endpoint == "" || apiReq.HttpMethod == "" { + logger.Error("Missing required fields in API test parameters") + overallSuccess = false + break + } + /*// API 测试:创建 API 测试请求结构并解析参数 logger.Info("Running api test activity", "ParametersJson", step.ParametersJson) fmt.Println(reflect.TypeOf(step.ParametersJson)) apiReq := &pb.ApiTestRequest{} @@ -107,7 +129,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu logger.Error("Failed to unmarshal API test parameters", "error", err) overallSuccess = false break // 参数解析失败,跳出当前步骤 - } + }*/ activityInput = apiReq activityResult = &pb.ApiTestResult{} // 预创建结果容器 @@ -156,8 +178,9 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // ------------------------------------------------------------------------------------ // 为 Activity 设置超时、重试等策略,确保测试的可靠性 ao := workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 - HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 + TaskQueue: "python-task-queue", // 指定任务队列名称 + StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 + HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置 InitialInterval: time.Second, // 首次重试间隔 MaximumAttempts: 3, // 最大重试次数