更新活动名称以遵循命名规范,增强API测试参数验证逻辑,调整任务队列名称以保持一致性
This commit is contained in:
parent
a299cf384c
commit
6231ef37bf
@ -22,7 +22,7 @@ message ApiTestRequest {
|
|||||||
string endpoint = 2; // API路径
|
string endpoint = 2; // API路径
|
||||||
string http_method = 3; // "GET", "POST", etc.
|
string http_method = 3; // "GET", "POST", etc.
|
||||||
map<string, string> headers = 4;
|
map<string, string> headers = 4;
|
||||||
bytes request_body = 5; // JSON或其他二进制数据
|
string request_body = 5; // JSON或其他二进制数据
|
||||||
int32 expected_status_code = 6;
|
int32 expected_status_code = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ func (s *WorkflowService) Start(compositeCaseId string) error {
|
|||||||
|
|
||||||
workflowOptions := client.StartWorkflowOptions{
|
workflowOptions := client.StartWorkflowOptions{
|
||||||
ID: "test_workflow_" + runID,
|
ID: "test_workflow_" + runID,
|
||||||
TaskQueue: "test-task-queue", // 保持与 Python Worker 一致
|
TaskQueue: "data-task-queue", // 保持与 Python Worker 一致
|
||||||
}
|
}
|
||||||
|
|
||||||
zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID))
|
zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID))
|
||||||
|
@ -42,7 +42,7 @@ class TestActivities:
|
|||||||
# 心跳发送失败,记录警告但继续尝试
|
# 心跳发送失败,记录警告但继续尝试
|
||||||
activity.logger.warning(f"Failed to send heartbeat: {e}")
|
activity.logger.warning(f"Failed to send heartbeat: {e}")
|
||||||
|
|
||||||
@activity.defn(name="run_api_test")
|
@activity.defn(name="RunApiTest")
|
||||||
async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult:
|
async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult:
|
||||||
"""
|
"""
|
||||||
执行API测试的Temporal Activity实现
|
执行API测试的Temporal Activity实现
|
||||||
@ -100,7 +100,7 @@ class TestActivities:
|
|||||||
result.base_result.duration_seconds = time.time() - start_time
|
result.base_result.duration_seconds = time.time() - start_time
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@activity.defn(name="run_ui_test")
|
@activity.defn(name="RunUiTest")
|
||||||
async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult:
|
async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult:
|
||||||
"""
|
"""
|
||||||
执行UI测试的Temporal Activity实现
|
执行UI测试的Temporal Activity实现
|
||||||
|
@ -2,9 +2,7 @@ package workflows
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"go.temporal.io/sdk/temporal"
|
"go.temporal.io/sdk/temporal"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@ -38,6 +36,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
|
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
|
||||||
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
|
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
|
||||||
ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
|
ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
|
||||||
|
TaskQueue: "data-task-queue", // 指定任务队列名称
|
||||||
StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间
|
StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间
|
||||||
HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死
|
HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死
|
||||||
RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置
|
RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置
|
||||||
@ -99,7 +98,30 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
// 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
|
// 这是动态工作流的核心:同一个工作流可以执行不同类型的测试
|
||||||
switch step.ActivityName {
|
switch step.ActivityName {
|
||||||
case "RunApiTest":
|
case "RunApiTest":
|
||||||
// API 测试:创建 API 测试请求结构并解析参数
|
apiReq := &pb.ApiTestRequest{}
|
||||||
|
// 1. 首先验证JSON格式
|
||||||
|
if !json.Valid([]byte(step.ParametersJson)) {
|
||||||
|
logger.Error("Invalid JSON format in parameters")
|
||||||
|
overallSuccess = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 解析JSON时增加错误详情
|
||||||
|
if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
|
||||||
|
logger.Error("Failed to unmarshal API test parameters",
|
||||||
|
"error", err,
|
||||||
|
"raw_json", step.ParametersJson)
|
||||||
|
overallSuccess = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 验证必要字段
|
||||||
|
if apiReq.Endpoint == "" || apiReq.HttpMethod == "" {
|
||||||
|
logger.Error("Missing required fields in API test parameters")
|
||||||
|
overallSuccess = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
/*// API 测试:创建 API 测试请求结构并解析参数
|
||||||
logger.Info("Running api test activity", "ParametersJson", step.ParametersJson)
|
logger.Info("Running api test activity", "ParametersJson", step.ParametersJson)
|
||||||
fmt.Println(reflect.TypeOf(step.ParametersJson))
|
fmt.Println(reflect.TypeOf(step.ParametersJson))
|
||||||
apiReq := &pb.ApiTestRequest{}
|
apiReq := &pb.ApiTestRequest{}
|
||||||
@ -107,7 +129,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
logger.Error("Failed to unmarshal API test parameters", "error", err)
|
logger.Error("Failed to unmarshal API test parameters", "error", err)
|
||||||
overallSuccess = false
|
overallSuccess = false
|
||||||
break // 参数解析失败,跳出当前步骤
|
break // 参数解析失败,跳出当前步骤
|
||||||
}
|
}*/
|
||||||
activityInput = apiReq
|
activityInput = apiReq
|
||||||
activityResult = &pb.ApiTestResult{} // 预创建结果容器
|
activityResult = &pb.ApiTestResult{} // 预创建结果容器
|
||||||
|
|
||||||
@ -156,6 +178,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
// ------------------------------------------------------------------------------------
|
// ------------------------------------------------------------------------------------
|
||||||
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
|
// 为 Activity 设置超时、重试等策略,确保测试的可靠性
|
||||||
ao := workflow.ActivityOptions{
|
ao := workflow.ActivityOptions{
|
||||||
|
TaskQueue: "python-task-queue", // 指定任务队列名称
|
||||||
StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间
|
StartToCloseTimeout: 10 * time.Minute, // 单个测试最长执行时间
|
||||||
HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行
|
HeartbeatTimeout: 30 * time.Second, // 心跳超时,用于检测 Activity 是否还在运行
|
||||||
RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置
|
RetryPolicy: &temporal.RetryPolicy{ // 重试策略配置
|
||||||
|
Loading…
Reference in New Issue
Block a user