重构复合案例相关逻辑,新增DAO层方法以简化数据库操作,优化活动加载步骤的实现

This commit is contained in:
longpeng 2025-06-25 11:03:00 +08:00
parent 6b4350b915
commit 182bbbd215
8 changed files with 263 additions and 105 deletions

View File

@ -1,29 +1,71 @@
package activities
import (
"beacon/pkg/dao/mysql"
"beacon/pkg/pb"
"context"
"fmt"
"math/rand"
"go.temporal.io/sdk/activity"
)
// AddSuffixActivity appends a fixed suffix to the input data.
func AddSuffixActivity(ctx context.Context, data string) (string, error) {
suffixes := []string{
"-one", "-two", "-three", "-four", "-five",
"-six", "-seven", "-eight", "-nine", "-ten",
}
suffix := suffixes[rand.Intn(len(suffixes))]
result := fmt.Sprintf("%s%s", data, suffix)
fmt.Println("Go Activity: Modified data to:", result)
return result, nil
}
// LoadCompositeCaseStepsActivity 用于加载复合案例步骤的Activity结构
// LoadCompositeCaseSteps 辅助 Activity 用于加载复合案例步骤定义 (可选,如果 Workflow 输入不包含完整定义)
// LoadCompositeCaseSteps 加载复合案例步骤定义
func LoadCompositeCaseSteps(ctx context.Context, compositeCaseId string) ([]*pb.CompositeCaseStepDefinition, error) {
// 这是一个 Activity它可以在 Python Worker 或一个 Go Activity Worker 中实现
// 这个 Activity 负责从数据库加载 composite_case_steps 表中的数据
// 并将其转换为 Protobuf 列表返回
// 请确保你在 Python Worker 或另一个 Go Worker 中注册并实现了这个 Activity
return nil, fmt.Errorf("activity not implemented yet") // 仅作示例
// 获取 Activity 日志记录器
logger := activity.GetLogger(ctx)
logger.Info("Loading composite case steps", "compositeCaseId", compositeCaseId)
// 参数验证
if compositeCaseId == "" {
return nil, fmt.Errorf("compositeCaseId cannot be empty")
}
// 发送心跳信号,表明 Activity 正在运行
activity.RecordHeartbeat(ctx)
// 通过DAO从数据库加载复合案例步骤数据
steps, err := dao.GetCompositeCaseSteps(compositeCaseId)
if err != nil {
logger.Error("Failed to load composite case steps from database", "error", err)
return nil, fmt.Errorf("failed to load composite case steps: %w", err)
}
// 如果没有找到步骤,返回空切片而不是错误
if len(steps) == 0 {
logger.Warn("No steps found for composite case", "compositeCaseId", compositeCaseId)
return []*pb.CompositeCaseStepDefinition{}, nil
}
// 转换数据库模型为 Protobuf 结构
var pbSteps []*pb.CompositeCaseStepDefinition
for _, step := range steps {
pbStep := &pb.CompositeCaseStepDefinition{
StepId: int64(step.ID),
StepOrder: int32(step.StepOrder),
StepType: step.StepType,
ActivityName: step.ActivityName,
ParametersJson: step.ParametersJson,
//SuccessNextStepOrder: convertToInt32Ptr(step.SuccessNextStepOrder),
//FailureNextStepOrder: convertToInt32Ptr(step.FailureNextStepOrder),
//IsParallel: step.IsParallel,
//DependsOnStepIds: step.DependsOnStepIds,
//ContinueOnFailure: step.ContinueOnFailure,
//TimeoutSeconds: int32(step.TimeoutSeconds),
//RetryCount: int32(step.RetryCount),
//Description: step.Description,
//CreatedAt: step.CreatedAt.Unix(),
//UpdatedAt: step.UpdatedAt.Unix(),
}
pbSteps = append(pbSteps, pbStep)
}
// 再次发送心跳信号
activity.RecordHeartbeat(ctx)
logger.Info("Successfully loaded composite case steps",
"compositeCaseId", compositeCaseId,
"stepCount", len(pbSteps))
return pbSteps, nil
}

View File

@ -1,4 +1,4 @@
package main
package client
// Go 服务端入口,触发 Workflow
import (
@ -25,12 +25,9 @@ func main() {
// 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD)
runID := uuid.New().String()
testInput := &pb.TestRunInput{
RunId: runID,
EnvironmentUrl: "https://example.com",
Tags: []string{"smoke", "critical"},
RunApiTests: true,
RunUiTests: true,
testInput := &pb.DynamicTestRunInput{
RunId: runID,
CompositeCaseId: "11",
}
workflowOptions := client.StartWorkflowOptions{
@ -39,7 +36,7 @@ func main() {
}
fmt.Printf("Starting TestRunWorkflow for run ID: %s\n", runID)
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.TestRunWorkflow, testInput)
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.DynamicTestSuiteWorkflow, testInput)
if err != nil {
log.Fatalf("Unable to execute workflows: %v", err)
}

157
pkg/dao/mysql/composite.go Normal file
View File

@ -0,0 +1,157 @@
package dao
import (
"beacon/models"
"errors"
"fmt"
"gorm.io/gorm"
"strconv"
)
// CreateCompositeCase 创建复合案例
func CreateCompositeCase(tx *gorm.DB, compositeCase *models.CompositeCase) error {
if tx == nil {
tx = DB
}
if compositeCase.Status == "" {
compositeCase.Status = "active"
}
return tx.Create(compositeCase).Error
}
// GetCompositeCaseByID 根据ID获取复合案例
func GetCompositeCaseByID(id uint) (*models.CompositeCase, error) {
var compositeCase models.CompositeCase
err := DB.Preload("Steps", func(db *gorm.DB) *gorm.DB {
return db.Order("step_order ASC")
}).First(&compositeCase, id).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("复合案例不存在")
}
return nil, fmt.Errorf("获取复合案例失败: %w", err)
}
return &compositeCase, nil
}
// UpdateCompositeCase 更新复合案例基本信息
func UpdateCompositeCase(tx *gorm.DB, id uint, updates map[string]interface{}) error {
if tx == nil {
tx = DB
}
if len(updates) == 0 {
return nil
}
return tx.Model(&models.CompositeCase{}).Where("id = ?", id).Updates(updates).Error
}
// DeleteCompositeCase 删除复合案例
func DeleteCompositeCase(tx *gorm.DB, id uint) error {
if tx == nil {
tx = DB
}
return tx.Delete(&models.CompositeCase{}, id).Error
}
// ListCompositeCases 获取复合案例列表
func ListCompositeCases(page, pageSize int, status string) ([]models.CompositeCase, int64, error) {
var compositeCases []models.CompositeCase
var total int64
query := DB.Model(&models.CompositeCase{})
if status != "" {
query = query.Where("status = ?", status)
}
// 获取总数
if err := query.Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("获取复合案例总数失败: %w", err)
}
// 分页查询
offset := (page - 1) * pageSize
err := query.Preload("Steps", func(db *gorm.DB) *gorm.DB {
return db.Order("step_order ASC")
}).Offset(offset).Limit(pageSize).Order("created_at DESC").Find(&compositeCases).Error
if err != nil {
return nil, 0, fmt.Errorf("获取复合案例列表失败: %w", err)
}
return compositeCases, total, nil
}
// ExistsCompositeCase 检查复合案例是否存在
func ExistsCompositeCase(tx *gorm.DB, id uint) (*models.CompositeCase, error) {
if tx == nil {
tx = DB
}
var compositeCase models.CompositeCase
err := tx.First(&compositeCase, id).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("复合案例不存在")
}
return nil, fmt.Errorf("查询复合案例失败: %w", err)
}
return &compositeCase, nil
}
// CreateCompositeCaseSteps 创建复合案例步骤
func CreateCompositeCaseSteps(tx *gorm.DB, steps []models.CompositeCaseStep) error {
if tx == nil {
tx = DB
}
if len(steps) == 0 {
return nil
}
return tx.Create(&steps).Error
}
// GetCompositeCaseSteps 根据复合案例ID获取步骤列表
func GetCompositeCaseSteps(compositeCaseId string) ([]models.CompositeCaseStep, error) {
var steps []models.CompositeCaseStep
// 将字符串ID转换为uint
id, err := strconv.ParseUint(compositeCaseId, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid composite case id: %w", err)
}
err = DB.Where("composite_case_id = ?", uint(id)).
Order("step_order ASC").
Find(&steps).Error
if err != nil {
return nil, fmt.Errorf("获取复合案例步骤失败: %w", err)
}
return steps, nil
}
// DeleteCompositeCaseStepsByCompositeCaseID 根据复合案例ID删除所有步骤
func DeleteCompositeCaseStepsByCompositeCaseID(tx *gorm.DB, compositeCaseId uint) error {
if tx == nil {
tx = DB
}
return tx.Where("composite_case_id = ?", compositeCaseId).Delete(&models.CompositeCaseStep{}).Error
}
// BeginTransaction 开启事务
func BeginTransaction() *gorm.DB {
return DB.Begin()
}

View File

@ -25,7 +25,7 @@ message DynamicStep {
//
message CompositeCaseStepDefinition {
string step_id = 1; // step_id
int64 step_id = 1; // step_id
int32 step_order = 2; //
string step_type = 3; // e.g., "API_TEST", "UI_TEST"
string activity_name = 4; // Temporal Activity

View File

@ -4,10 +4,8 @@ import (
"beacon/models"
"beacon/pkg/dao/mysql"
"beacon/utils"
"errors"
"fmt"
"go.uber.org/zap"
"gorm.io/gorm"
)
type CompositeCaseService struct {
@ -22,7 +20,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
zap.Int("steps_count", len(req.Steps)))
// 开启事务
tx := dao.DB.Begin()
tx := dao.BeginTransaction()
if tx.Error != nil {
zap.L().Error("创建复合案例失败 - 事务开启失败", zap.Error(tx.Error))
return nil, tx.Error
@ -41,12 +39,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
Status: req.Status,
}
if compositeCase.Status == "" {
compositeCase.Status = "active"
zap.L().Debug("设置默认状态", zap.String("status", "active"))
}
if err := tx.Create(compositeCase).Error; err != nil {
if err := dao.CreateCompositeCase(tx, compositeCase); err != nil {
zap.L().Error("创建复合案例失败 - 数据库创建失败", zap.Error(err))
tx.Rollback()
return nil, fmt.Errorf("创建复合案例失败: %w", err)
@ -59,6 +52,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
// 创建步骤
if len(req.Steps) > 0 {
zap.L().Info("开始创建步骤", zap.Int("steps_count", len(req.Steps)))
var steps []models.CompositeCaseStep
for _, stepReq := range req.Steps {
activityName, _ := utils.GetActivityName(stepReq.StepType)
@ -75,7 +69,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
steps = append(steps, step)
}
if err := tx.Create(&steps).Error; err != nil {
if err := dao.CreateCompositeCaseSteps(tx, steps); err != nil {
zap.L().Error("创建复合案例步骤失败",
zap.Uint("composite_case_id", compositeCase.ID),
zap.Error(err))
@ -100,26 +94,17 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
func (s *CompositeCaseService) GetCompositeCaseByID(id uint) (*models.CompositeCase, error) {
zap.L().Debug("开始查询复合案例", zap.Uint("id", id))
var compositeCase models.CompositeCase
err := dao.DB.Preload("Steps", func(db *gorm.DB) *gorm.DB {
return db.Order("step_order ASC")
}).First(&compositeCase, id).Error
compositeCase, err := dao.GetCompositeCaseByID(id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
zap.L().Warn("复合案例不存在", zap.Uint("id", id))
return nil, fmt.Errorf("复合案例不存在")
}
zap.L().Error("获取复合案例失败", zap.Uint("id", id), zap.Error(err))
return nil, fmt.Errorf("获取复合案例失败: %w", err)
return nil, err
}
zap.L().Debug("复合案例查询成功",
zap.Uint("id", compositeCase.ID),
zap.String("name", compositeCase.Name),
zap.Int("steps_count", len(compositeCase.Steps)))
return &compositeCase, nil
return compositeCase, nil
}
// UpdateCompositeCase 更新复合案例
@ -127,7 +112,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
zap.L().Info("开始更新复合案例", zap.Uint("id", id))
// 开启事务
tx := dao.DB.Begin()
tx := dao.BeginTransaction()
if tx.Error != nil {
zap.L().Error("更新复合案例失败 - 事务开启失败",
zap.Uint("id", id),
@ -144,17 +129,10 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
}()
// 检查复合案例是否存在
var compositeCase models.CompositeCase
if err := tx.First(&compositeCase, id).Error; err != nil {
if _, err := dao.ExistsCompositeCase(tx, id); err != nil {
tx.Rollback()
if errors.Is(err, gorm.ErrRecordNotFound) {
zap.L().Warn("更新复合案例失败 - 复合案例不存在", zap.Uint("id", id))
return nil, fmt.Errorf("复合案例不存在")
}
zap.L().Error("更新复合案例失败 - 查询失败",
zap.Uint("id", id),
zap.Error(err))
return nil, fmt.Errorf("查询复合案例失败: %w", err)
zap.L().Warn("更新复合案例失败 - 复合案例不存在", zap.Uint("id", id))
return nil, err
}
// 更新基本信息
@ -173,7 +151,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
zap.L().Info("更新复合案例基本信息",
zap.Uint("id", id),
zap.Any("updates", updates))
if err := tx.Model(&compositeCase).Updates(updates).Error; err != nil {
if err := dao.UpdateCompositeCase(tx, id, updates); err != nil {
zap.L().Error("更新复合案例基本信息失败",
zap.Uint("id", id),
zap.Error(err))
@ -189,7 +167,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
zap.Int("new_steps_count", len(req.Steps)))
// 删除现有步骤
if err := tx.Where("composite_case_id = ?", id).Delete(&models.CompositeCaseStep{}).Error; err != nil {
if err := dao.DeleteCompositeCaseStepsByCompositeCaseID(tx, id); err != nil {
zap.L().Error("删除现有步骤失败",
zap.Uint("id", id),
zap.Error(err))
@ -215,7 +193,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
steps = append(steps, step)
}
if err := tx.Create(&steps).Error; err != nil {
if err := dao.CreateCompositeCaseSteps(tx, steps); err != nil {
zap.L().Error("创建新步骤失败",
zap.Uint("id", id),
zap.Error(err))
@ -240,7 +218,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
zap.L().Info("开始删除复合案例", zap.Uint("id", id))
// 开启事务
tx := dao.DB.Begin()
tx := dao.BeginTransaction()
if tx.Error != nil {
zap.L().Error("删除复合案例失败 - 事务开启失败",
zap.Uint("id", id),
@ -257,17 +235,11 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
}()
// 检查复合案例是否存在
var compositeCase models.CompositeCase
if err := tx.First(&compositeCase, id).Error; err != nil {
compositeCase, err := dao.ExistsCompositeCase(tx, id)
if err != nil {
tx.Rollback()
if errors.Is(err, gorm.ErrRecordNotFound) {
zap.L().Warn("删除复合案例失败 - 复合案例不存在", zap.Uint("id", id))
return fmt.Errorf("复合案例不存在")
}
zap.L().Error("删除复合案例失败 - 查询失败",
zap.Uint("id", id),
zap.Error(err))
return fmt.Errorf("查询复合案例失败: %w", err)
zap.L().Warn("删除复合案例失败", zap.Uint("id", id), zap.Error(err))
return err
}
zap.L().Info("找到复合案例,开始删除",
@ -275,7 +247,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
zap.String("name", compositeCase.Name))
// 删除关联的步骤
if err := tx.Where("composite_case_id = ?", id).Delete(&models.CompositeCaseStep{}).Error; err != nil {
if err := dao.DeleteCompositeCaseStepsByCompositeCaseID(tx, id); err != nil {
zap.L().Error("删除复合案例步骤失败",
zap.Uint("id", id),
zap.Error(err))
@ -286,7 +258,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
zap.L().Debug("复合案例步骤删除成功", zap.Uint("id", id))
// 删除复合案例
if err := tx.Delete(&compositeCase).Error; err != nil {
if err := dao.DeleteCompositeCase(tx, id); err != nil {
zap.L().Error("删除复合案例失败",
zap.Uint("id", id),
zap.Error(err))
@ -308,33 +280,10 @@ func (s *CompositeCaseService) ListCompositeCases(page, pageSize int, status str
zap.Int("page_size", pageSize),
zap.String("status", status))
var compositeCases []models.CompositeCase
var total int64
query := dao.DB.Model(&models.CompositeCase{})
if status != "" {
query = query.Where("status = ?", status)
zap.L().Debug("添加状态过滤条件", zap.String("status", status))
}
// 获取总数
if err := query.Count(&total).Error; err != nil {
zap.L().Error("获取复合案例总数失败", zap.Error(err))
return nil, 0, fmt.Errorf("获取复合案例总数失败: %w", err)
}
zap.L().Debug("查询到复合案例总数", zap.Int64("total", total))
// 分页查询
offset := (page - 1) * pageSize
err := query.Preload("Steps", func(db *gorm.DB) *gorm.DB {
return db.Order("step_order ASC")
}).Offset(offset).Limit(pageSize).Order("created_at DESC").Find(&compositeCases).Error
compositeCases, total, err := dao.ListCompositeCases(page, pageSize, status)
if err != nil {
zap.L().Error("获取复合案例列表失败", zap.Error(err))
return nil, 0, fmt.Errorf("获取复合案例列表失败: %w", err)
return nil, 0, err
}
zap.L().Info("复合案例列表查询成功",

View File

@ -23,7 +23,8 @@ func main() {
// 将工作流和带有真实 Go 后缀的活动注册到 Worker。
w.RegisterWorkflow(workflows.TestRunWorkflow)
w.RegisterActivity(activities.AddSuffixActivity)
w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow)
w.RegisterActivity(activities.LoadCompositeCaseSteps)
//(for later) w.RegisterActivity(activities.AddSuffixActivity)
// 注意Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。

View File

@ -7,7 +7,7 @@ def execute_api_test_case(test_case_id: str, endpoint: str, http_method: str, he
实际执行API测试的函数
可以集成 pytest, requests 等库
"""
base_url = "http://localhost:8080" # 假设 API 服务的基地址
base_url = "http://101.89.127.197:9080" # 假设 API 服务的基地址
full_url = f"{base_url}{endpoint}"
log_output = []

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"go.temporal.io/sdk/temporal"
"sort"
"strconv"
"time"
"beacon/activities" // 假设你的 activity 包在这个路径
@ -34,12 +35,23 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间
HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死
RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置
InitialInterval: time.Second, // 首次重试前的等待时间
BackoffCoefficient: 1.0, // 重试间隔的递增系数
MaximumInterval: time.Minute, // 重试间隔的最大值
MaximumAttempts: 3, // 最大重试次数
NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误类型
},
}) // 应用 Activity 选项到上下文
var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
err := workflow.ExecuteActivity(ctxActivity, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
if err != nil {
logger.Error("Failed to load composite case steps", "error", err)
return nil, err
@ -188,7 +200,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
}
// 记录当前步骤的执行结果,用于后续条件跳转判断
stepResults[step.StepId] = stepPassed
stepResults[strconv.FormatInt(step.StepId, 10)] = stepPassed
// ------------------------------------------------------------------------------------
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)