From 6231ef37bf830c938c2b127dcba5dca818ae4ed7 Mon Sep 17 00:00:00 2001 From: longpeng Date: Wed, 25 Jun 2025 15:27:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=B4=BB=E5=8A=A8=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E4=BB=A5=E9=81=B5=E5=BE=AA=E5=91=BD=E5=90=8D=E8=A7=84?= =?UTF-8?q?=E8=8C=83=EF=BC=8C=E5=A2=9E=E5=BC=BAAPI=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E9=AA=8C=E8=AF=81=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E4=BB=A5=E4=BF=9D=E6=8C=81=E4=B8=80=E8=87=B4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proto/common_test.proto | 2 +- services/workflow.go | 2 +- workers/python/activities.py | 4 ++-- workflows/dynamic_workflow.go | 39 ++++++++++++++++++++++++++++------- 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/proto/common_test.proto b/proto/common_test.proto index dfa03c2..598bac2 100644 --- a/proto/common_test.proto +++ b/proto/common_test.proto @@ -22,7 +22,7 @@ message ApiTestRequest { string endpoint = 2; // API路径 string http_method = 3; // "GET", "POST", etc. map headers = 4; - bytes request_body = 5; // JSON或其他二进制数据 + string request_body = 5; // JSON或其他二进制数据 int32 expected_status_code = 6; } diff --git a/services/workflow.go b/services/workflow.go index f7670ba..b6bdcc2 100644 --- a/services/workflow.go +++ b/services/workflow.go @@ -34,7 +34,7 @@ func (s *WorkflowService) Start(compositeCaseId string) error { workflowOptions := client.StartWorkflowOptions{ ID: "test_workflow_" + runID, - TaskQueue: "test-task-queue", // 保持与 Python Worker 一致 + TaskQueue: "data-task-queue", // 保持与 Python Worker 一致 } zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID)) diff --git a/workers/python/activities.py b/workers/python/activities.py index b6ffc8e..759898a 100644 --- a/workers/python/activities.py +++ b/workers/python/activities.py @@ -42,7 +42,7 @@ class TestActivities: # 心跳发送失败,记录警告但继续尝试 activity.logger.warning(f"Failed to send heartbeat: {e}") - @activity.defn(name="run_api_test") + @activity.defn(name="RunApiTest") async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult: """ 执行API测试的Temporal Activity实现 @@ -100,7 +100,7 @@ class TestActivities: result.base_result.duration_seconds = time.time() - start_time return result - @activity.defn(name="run_ui_test") + @activity.defn(name="RunUiTest") async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult: """ 执行UI测试的Temporal Activity实现 diff --git a/workflows/dynamic_workflow.go b/workflows/dynamic_workflow.go index dbb4ec2..3d13402 100644 --- a/workflows/dynamic_workflow.go +++ b/workflows/dynamic_workflow.go @@ -2,9 +2,7 @@ package workflows import ( "encoding/json" - "fmt" "go.temporal.io/sdk/temporal" - "reflect" "sort" "strconv" "time" @@ -38,8 +36,9 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义 // 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置 ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间 - HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死 + TaskQueue: "data-task-queue", // 指定任务队列名称 + StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间 + HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死 RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置 InitialInterval: time.Second, // 首次重试前的等待时间 BackoffCoefficient: 1.0, // 重试间隔的递增系数 @@ -99,7 +98,30 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // 这是动态工作流的核心:同一个工作流可以执行不同类型的测试 switch step.ActivityName { case "RunApiTest": - // API 测试:创建 API 测试请求结构并解析参数 + apiReq := &pb.ApiTestRequest{} + // 1. 首先验证JSON格式 + if !json.Valid([]byte(step.ParametersJson)) { + logger.Error("Invalid JSON format in parameters") + overallSuccess = false + break + } + + // 2. 解析JSON时增加错误详情 + if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { + logger.Error("Failed to unmarshal API test parameters", + "error", err, + "raw_json", step.ParametersJson) + overallSuccess = false + break + } + + // 3. 验证必要字段 + if apiReq.Endpoint == "" || apiReq.HttpMethod == "" { + logger.Error("Missing required fields in API test parameters") + overallSuccess = false + break + } + /*// API 测试:创建 API 测试请求结构并解析参数 logger.Info("Running api test activity", "ParametersJson", step.ParametersJson) fmt.Println(reflect.TypeOf(step.ParametersJson)) apiReq := &pb.ApiTestRequest{} @@ -107,7 +129,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu logger.Error("Failed to unmarshal API test parameters", "error", err) overallSuccess = false break // 参数解析失败,跳出当前步骤 - } + }*/ activityInput = apiReq activityResult = &pb.ApiTestResult{} // 预创建结果容器 @@ -156,8 +178,9 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu // ------------------------------------------------------------------------------------ // 为 Activity 设置超时、重试等策略,确保测试的可靠性 ao := workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 - HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 + TaskQueue: "python-task-queue", // 指定任务队列名称 + StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间 + HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行 RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置 InitialInterval: time.Second, // 首次重试间隔 MaximumAttempts: 3, // 最大重试次数