Compare commits
No commits in common. "ae423361600105ea2f83e85c4315543e4dca9621" and "6b4350b915fcbe4255c0ad6186adb2f2d5d360e8" have entirely different histories.
ae42336160
...
6b4350b915
@ -1,71 +1,29 @@
|
|||||||
package activities
|
package activities
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"beacon/pkg/dao/mysql"
|
|
||||||
"beacon/pkg/pb"
|
"beacon/pkg/pb"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.temporal.io/sdk/activity"
|
"math/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadCompositeCaseStepsActivity 用于加载复合案例步骤的Activity结构
|
// AddSuffixActivity appends a fixed suffix to the input data.
|
||||||
|
func AddSuffixActivity(ctx context.Context, data string) (string, error) {
|
||||||
// LoadCompositeCaseSteps 加载复合案例步骤定义
|
suffixes := []string{
|
||||||
func LoadCompositeCaseSteps(ctx context.Context, compositeCaseId string) ([]*pb.CompositeCaseStepDefinition, error) {
|
"-one", "-two", "-three", "-four", "-five",
|
||||||
// 获取 Activity 日志记录器
|
"-six", "-seven", "-eight", "-nine", "-ten",
|
||||||
logger := activity.GetLogger(ctx)
|
|
||||||
logger.Info("Loading composite case steps", "compositeCaseId", compositeCaseId)
|
|
||||||
|
|
||||||
// 参数验证
|
|
||||||
if compositeCaseId == "" {
|
|
||||||
return nil, fmt.Errorf("compositeCaseId cannot be empty")
|
|
||||||
}
|
}
|
||||||
|
suffix := suffixes[rand.Intn(len(suffixes))]
|
||||||
// 发送心跳信号,表明 Activity 正在运行
|
result := fmt.Sprintf("%s%s", data, suffix)
|
||||||
activity.RecordHeartbeat(ctx)
|
fmt.Println("Go Activity: Modified data to:", result)
|
||||||
|
return result, nil
|
||||||
// 通过DAO从数据库加载复合案例步骤数据
|
}
|
||||||
steps, err := dao.GetCompositeCaseSteps(compositeCaseId)
|
|
||||||
if err != nil {
|
// LoadCompositeCaseSteps 辅助 Activity 用于加载复合案例步骤定义 (可选,如果 Workflow 输入不包含完整定义)
|
||||||
logger.Error("Failed to load composite case steps from database", "error", err)
|
func LoadCompositeCaseSteps(ctx context.Context, compositeCaseId string) ([]*pb.CompositeCaseStepDefinition, error) {
|
||||||
return nil, fmt.Errorf("failed to load composite case steps: %w", err)
|
// 这是一个 Activity,它可以在 Python Worker 或一个 Go Activity Worker 中实现
|
||||||
}
|
// 这个 Activity 负责从数据库加载 composite_case_steps 表中的数据
|
||||||
|
// 并将其转换为 Protobuf 列表返回
|
||||||
// 如果没有找到步骤,返回空切片而不是错误
|
// 请确保你在 Python Worker 或另一个 Go Worker 中注册并实现了这个 Activity
|
||||||
if len(steps) == 0 {
|
return nil, fmt.Errorf("activity not implemented yet") // 仅作示例
|
||||||
logger.Warn("No steps found for composite case", "compositeCaseId", compositeCaseId)
|
|
||||||
return []*pb.CompositeCaseStepDefinition{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 转换数据库模型为 Protobuf 结构
|
|
||||||
var pbSteps []*pb.CompositeCaseStepDefinition
|
|
||||||
for _, step := range steps {
|
|
||||||
pbStep := &pb.CompositeCaseStepDefinition{
|
|
||||||
StepId: int64(step.ID),
|
|
||||||
StepOrder: int32(step.StepOrder),
|
|
||||||
StepType: step.StepType,
|
|
||||||
ActivityName: step.ActivityName,
|
|
||||||
ParametersJson: step.ParametersJson,
|
|
||||||
//SuccessNextStepOrder: convertToInt32Ptr(step.SuccessNextStepOrder),
|
|
||||||
//FailureNextStepOrder: convertToInt32Ptr(step.FailureNextStepOrder),
|
|
||||||
//IsParallel: step.IsParallel,
|
|
||||||
//DependsOnStepIds: step.DependsOnStepIds,
|
|
||||||
//ContinueOnFailure: step.ContinueOnFailure,
|
|
||||||
//TimeoutSeconds: int32(step.TimeoutSeconds),
|
|
||||||
//RetryCount: int32(step.RetryCount),
|
|
||||||
//Description: step.Description,
|
|
||||||
//CreatedAt: step.CreatedAt.Unix(),
|
|
||||||
//UpdatedAt: step.UpdatedAt.Unix(),
|
|
||||||
}
|
|
||||||
pbSteps = append(pbSteps, pbStep)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 再次发送心跳信号
|
|
||||||
activity.RecordHeartbeat(ctx)
|
|
||||||
|
|
||||||
logger.Info("Successfully loaded composite case steps",
|
|
||||||
"compositeCaseId", compositeCaseId,
|
|
||||||
"stepCount", len(pbSteps))
|
|
||||||
|
|
||||||
return pbSteps, nil
|
|
||||||
}
|
}
|
||||||
|
61
client/main.go
Normal file
61
client/main.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// Go 服务端入口,触发 Workflow
|
||||||
|
import (
|
||||||
|
"beacon/pkg/pb"
|
||||||
|
"beacon/workflows"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"go.temporal.io/sdk/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 创建 Temporal 客户端
|
||||||
|
c, err := client.Dial(client.Options{
|
||||||
|
HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置
|
||||||
|
Namespace: "default",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to create Temporal client: %v", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD)
|
||||||
|
runID := uuid.New().String()
|
||||||
|
testInput := &pb.TestRunInput{
|
||||||
|
RunId: runID,
|
||||||
|
EnvironmentUrl: "https://example.com",
|
||||||
|
Tags: []string{"smoke", "critical"},
|
||||||
|
RunApiTests: true,
|
||||||
|
RunUiTests: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
workflowOptions := client.StartWorkflowOptions{
|
||||||
|
ID: "test_workflow_" + runID,
|
||||||
|
TaskQueue: "test-task-queue", // 保持与 Python Worker 一致
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Starting TestRunWorkflow for run ID: %s\n", runID)
|
||||||
|
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.TestRunWorkflow, testInput)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to execute workflows: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Workflow started. Workflow ID: %s, Run ID: %s\n", we.GetID(), we.GetRunID())
|
||||||
|
|
||||||
|
// 等待 Workflow 完成并获取结果
|
||||||
|
var result pb.TestRunOutput
|
||||||
|
err = we.Get(context.Background(), &result)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to get workflows result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Workflow finished. Overall Success: %t, Message: %s\n", result.OverallSuccess, result.CompletionMessage)
|
||||||
|
fmt.Printf("API Test Results: %+v\n", result.ApiResults)
|
||||||
|
fmt.Printf("UI Test Results: %+v\n", result.UiResults)
|
||||||
|
|
||||||
|
// 后续可以根据 result 生成报告、发送通知等
|
||||||
|
}
|
@ -38,8 +38,7 @@ type SrvConfig struct {
|
|||||||
*ApolloConfig `mapstructure:"apollo"`
|
*ApolloConfig `mapstructure:"apollo"`
|
||||||
*Registrar `mapstructure:"registrar"`
|
*Registrar `mapstructure:"registrar"`
|
||||||
*RcloneConfig `mapstructure:"rclone"`
|
*RcloneConfig `mapstructure:"rclone"`
|
||||||
*OpenAIConfig `mapstructure:"openai"`
|
*OpenAI `mapstructure:"openai"`
|
||||||
*TemporalConfig `mapstructure:"temporal"` // Temporal配置
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type LogConfig struct {
|
type LogConfig struct {
|
||||||
@ -114,18 +113,13 @@ type RcloneConfig struct {
|
|||||||
CustomDomains string `mapstructure:"custom_domains"` // 自定义域
|
CustomDomains string `mapstructure:"custom_domains"` // 自定义域
|
||||||
}
|
}
|
||||||
|
|
||||||
type OpenAIConfig struct {
|
type OpenAI struct {
|
||||||
BaseURL string `mapstructure:"base_url"`
|
BaseURL string `mapstructure:"base_url"`
|
||||||
ApiKey string `mapstructure:"api_key"`
|
ApiKey string `mapstructure:"api_key"`
|
||||||
Model string `mapstructure:"model"`
|
Model string `mapstructure:"model"`
|
||||||
Prompt string `mapstructure:"prompt"`
|
Prompt string `mapstructure:"prompt"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TemporalConfig struct {
|
|
||||||
Host string `mapstructure:"host"` // Temporal服务地址
|
|
||||||
Port int `mapstructure:"port"` // Temporal服务端口
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init 整个服务配置文件初始化的方法
|
// Init 整个服务配置文件初始化的方法
|
||||||
func Init(yamlContent string) (err error) {
|
func Init(yamlContent string) (err error) {
|
||||||
// 方式1:直接指定配置文件路径(相对路径或者绝对路径)
|
// 方式1:直接指定配置文件路径(相对路径或者绝对路径)
|
||||||
|
10
main.go
10
main.go
@ -6,7 +6,6 @@ import (
|
|||||||
"beacon/pkg/logger"
|
"beacon/pkg/logger"
|
||||||
"beacon/pkg/validator"
|
"beacon/pkg/validator"
|
||||||
"beacon/routers"
|
"beacon/routers"
|
||||||
workers "beacon/workers/go"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@ -61,15 +60,6 @@ func Run() {
|
|||||||
zap.L().Info(fmt.Sprintf("Serving Gin on %s:%d", config.Conf.IP, config.Conf.Port))
|
zap.L().Info(fmt.Sprintf("Serving Gin on %s:%d", config.Conf.IP, config.Conf.Port))
|
||||||
zap.L().Info("service start...")
|
zap.L().Info("service start...")
|
||||||
|
|
||||||
go func() {
|
|
||||||
// 启动 Temporal Worker
|
|
||||||
err := workers.StartWorkflow(config.Conf.TemporalConfig)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("启动 Temporal Worker 失败", zap.Error(err))
|
|
||||||
os.Exit(1) // 如果启动 Temporal Worker 失败,退出程序
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// 接收操作系统发来的中断信号
|
// 接收操作系统发来的中断信号
|
||||||
QuitChan := make(chan os.Signal)
|
QuitChan := make(chan os.Signal)
|
||||||
signal.Notify(QuitChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) // kill -15 CTRL+C kill -9
|
signal.Notify(QuitChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) // kill -15 CTRL+C kill -9
|
||||||
|
@ -1,157 +0,0 @@
|
|||||||
package dao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"beacon/models"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"strconv"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CreateCompositeCase 创建复合案例
|
|
||||||
func CreateCompositeCase(tx *gorm.DB, compositeCase *models.CompositeCase) error {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
if compositeCase.Status == "" {
|
|
||||||
compositeCase.Status = "active"
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Create(compositeCase).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCompositeCaseByID 根据ID获取复合案例
|
|
||||||
func GetCompositeCaseByID(id uint) (*models.CompositeCase, error) {
|
|
||||||
var compositeCase models.CompositeCase
|
|
||||||
|
|
||||||
err := DB.Preload("Steps", func(db *gorm.DB) *gorm.DB {
|
|
||||||
return db.Order("step_order ASC")
|
|
||||||
}).First(&compositeCase, id).Error
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
||||||
return nil, fmt.Errorf("复合案例不存在")
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("获取复合案例失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &compositeCase, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateCompositeCase 更新复合案例基本信息
|
|
||||||
func UpdateCompositeCase(tx *gorm.DB, id uint, updates map[string]interface{}) error {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(updates) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Model(&models.CompositeCase{}).Where("id = ?", id).Updates(updates).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteCompositeCase 删除复合案例
|
|
||||||
func DeleteCompositeCase(tx *gorm.DB, id uint) error {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Delete(&models.CompositeCase{}, id).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListCompositeCases 获取复合案例列表
|
|
||||||
func ListCompositeCases(page, pageSize int, status string) ([]models.CompositeCase, int64, error) {
|
|
||||||
var compositeCases []models.CompositeCase
|
|
||||||
var total int64
|
|
||||||
|
|
||||||
query := DB.Model(&models.CompositeCase{})
|
|
||||||
|
|
||||||
if status != "" {
|
|
||||||
query = query.Where("status = ?", status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取总数
|
|
||||||
if err := query.Count(&total).Error; err != nil {
|
|
||||||
return nil, 0, fmt.Errorf("获取复合案例总数失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 分页查询
|
|
||||||
offset := (page - 1) * pageSize
|
|
||||||
err := query.Preload("Steps", func(db *gorm.DB) *gorm.DB {
|
|
||||||
return db.Order("step_order ASC")
|
|
||||||
}).Offset(offset).Limit(pageSize).Order("created_at DESC").Find(&compositeCases).Error
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, fmt.Errorf("获取复合案例列表失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return compositeCases, total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExistsCompositeCase 检查复合案例是否存在
|
|
||||||
func ExistsCompositeCase(tx *gorm.DB, id uint) (*models.CompositeCase, error) {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
var compositeCase models.CompositeCase
|
|
||||||
err := tx.First(&compositeCase, id).Error
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
||||||
return nil, fmt.Errorf("复合案例不存在")
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("查询复合案例失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &compositeCase, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateCompositeCaseSteps 创建复合案例步骤
|
|
||||||
func CreateCompositeCaseSteps(tx *gorm.DB, steps []models.CompositeCaseStep) error {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(steps) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Create(&steps).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCompositeCaseSteps 根据复合案例ID获取步骤列表
|
|
||||||
func GetCompositeCaseSteps(compositeCaseId string) ([]models.CompositeCaseStep, error) {
|
|
||||||
var steps []models.CompositeCaseStep
|
|
||||||
|
|
||||||
// 将字符串ID转换为uint
|
|
||||||
id, err := strconv.ParseUint(compositeCaseId, 10, 32)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid composite case id: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = DB.Where("composite_case_id = ?", uint(id)).
|
|
||||||
Order("step_order ASC").
|
|
||||||
Find(&steps).Error
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("获取复合案例步骤失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return steps, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteCompositeCaseStepsByCompositeCaseID 根据复合案例ID删除所有步骤
|
|
||||||
func DeleteCompositeCaseStepsByCompositeCaseID(tx *gorm.DB, compositeCaseId uint) error {
|
|
||||||
if tx == nil {
|
|
||||||
tx = DB
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Where("composite_case_id = ?", compositeCaseId).Delete(&models.CompositeCaseStep{}).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// BeginTransaction 开启事务
|
|
||||||
func BeginTransaction() *gorm.DB {
|
|
||||||
return DB.Begin()
|
|
||||||
}
|
|
@ -32,8 +32,8 @@ func encodeImageToBase64(file *multipart.FileHeader) (string, error) {
|
|||||||
|
|
||||||
func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) {
|
func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) {
|
||||||
client := openai.NewClient(
|
client := openai.NewClient(
|
||||||
option.WithAPIKey(config.Conf.OpenAIConfig.ApiKey), // defaults to os.LookupEnv("OPENAI_API_KEY")
|
option.WithAPIKey(config.Conf.OpenAI.ApiKey), // defaults to os.LookupEnv("OPENAI_API_KEY")
|
||||||
option.WithBaseURL(config.Conf.OpenAIConfig.BaseURL),
|
option.WithBaseURL(config.Conf.OpenAI.BaseURL),
|
||||||
)
|
)
|
||||||
|
|
||||||
base64Image, _ := encodeImageToBase64(file)
|
base64Image, _ := encodeImageToBase64(file)
|
||||||
@ -49,7 +49,7 @@ func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) {
|
|||||||
},
|
},
|
||||||
openai.ChatCompletionContentPartUnionParam{
|
openai.ChatCompletionContentPartUnionParam{
|
||||||
OfText: &openai.ChatCompletionContentPartTextParam{
|
OfText: &openai.ChatCompletionContentPartTextParam{
|
||||||
Text: config.Conf.OpenAIConfig.Prompt,
|
Text: config.Conf.OpenAI.Prompt,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@ -57,7 +57,7 @@ func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) {
|
|||||||
Messages: []openai.ChatCompletionMessageParamUnion{
|
Messages: []openai.ChatCompletionMessageParamUnion{
|
||||||
openai.UserMessage(message),
|
openai.UserMessage(message),
|
||||||
},
|
},
|
||||||
Model: config.Conf.OpenAIConfig.Model,
|
Model: config.Conf.OpenAI.Model,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, ""
|
return err, ""
|
||||||
|
@ -25,7 +25,7 @@ message DynamicStep {
|
|||||||
|
|
||||||
// 定义一个复合测试用例中的单个步骤
|
// 定义一个复合测试用例中的单个步骤
|
||||||
message CompositeCaseStepDefinition {
|
message CompositeCaseStepDefinition {
|
||||||
int64 step_id = 1; // 对应数据库中的 step_id
|
string step_id = 1; // 对应数据库中的 step_id
|
||||||
int32 step_order = 2; // 步骤执行顺序
|
int32 step_order = 2; // 步骤执行顺序
|
||||||
string step_type = 3; // e.g., "API_TEST", "UI_TEST"
|
string step_type = 3; // e.g., "API_TEST", "UI_TEST"
|
||||||
string activity_name = 4; // 对应的 Temporal Activity 函数名
|
string activity_name = 4; // 对应的 Temporal Activity 函数名
|
||||||
|
@ -1,76 +0,0 @@
|
|||||||
package handlers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"beacon/services"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
type WorkflowHandler struct {
|
|
||||||
service *services.WorkflowService
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWorkflowHandler() *WorkflowHandler {
|
|
||||||
return &WorkflowHandler{
|
|
||||||
service: &services.WorkflowService{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartWorkflow POST /api/workflow/start
|
|
||||||
func (h *WorkflowHandler) StartWorkflow(c *gin.Context) {
|
|
||||||
|
|
||||||
err := h.service.Start("11")
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(500, gin.H{
|
|
||||||
"code": -1,
|
|
||||||
"message": "Failed to start workflow",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.JSON(200, gin.H{
|
|
||||||
"message": "Workflow started successfully",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// StopWorkflow POST /api/workflow/stop
|
|
||||||
func (h *WorkflowHandler) StopWorkflow(c *gin.Context) {
|
|
||||||
// 停止工作流的逻辑
|
|
||||||
// 这里可以调用 Temporal 的 API 来停止指定的工作流
|
|
||||||
// 例如,使用 WorkflowID 或 RunID 来停止工作流
|
|
||||||
c.JSON(200, gin.H{
|
|
||||||
"message": "Workflow stopped successfully",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetWorkflowStatus GET /api/workflow/status/{workflowID}
|
|
||||||
func (h *WorkflowHandler) GetWorkflowStatus(c *gin.Context) {
|
|
||||||
workflowID := c.Param("workflowID")
|
|
||||||
if workflowID == "" {
|
|
||||||
c.JSON(400, gin.H{"error": "Workflow ID is required"})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
status, err := h.service.GetWorkflowStatus(workflowID)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(500, gin.H{"error": "Failed to get workflow status", "details": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(200, gin.H{"status": status})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetWorkflowResults GET /api/workflow/results/{workflowID}
|
|
||||||
func (h *WorkflowHandler) GetWorkflowResults(c *gin.Context) {
|
|
||||||
workflowID := c.Param("workflowID")
|
|
||||||
if workflowID == "" {
|
|
||||||
c.JSON(400, gin.H{"error": "Workflow ID is required"})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
results, err := h.service.GetWorkflowResults(workflowID)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(500, gin.H{"error": "Failed to get workflow results", "details": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(200, gin.H{"results": results})
|
|
||||||
}
|
|
@ -86,7 +86,6 @@ func Init() *gin.Engine {
|
|||||||
// 路由分组
|
// 路由分组
|
||||||
v1 := r.Group(config.Conf.Version)
|
v1 := r.Group(config.Conf.Version)
|
||||||
SetupCompositeCaseRoutes(v1)
|
SetupCompositeCaseRoutes(v1)
|
||||||
SetupWorkflowRoutes(v1)
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,16 +104,3 @@ func SetupCompositeCaseRoutes(group *gin.RouterGroup) {
|
|||||||
api.DELETE("/composite-cases/:id", handler.DeleteCompositeCase)
|
api.DELETE("/composite-cases/:id", handler.DeleteCompositeCase)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetupWorkflowRoutes(group *gin.RouterGroup) {
|
|
||||||
// 初始化服务和处理器
|
|
||||||
handler := *handlers.NewWorkflowHandler()
|
|
||||||
|
|
||||||
api := group.Group("/api")
|
|
||||||
{
|
|
||||||
// 工作流相关路由
|
|
||||||
api.POST("/workflows/start", handler.StartWorkflow)
|
|
||||||
api.GET("/workflows/:id", handler.GetWorkflowStatus)
|
|
||||||
api.GET("/workflows/:id/results", handler.GetWorkflowResults)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -4,8 +4,10 @@ import (
|
|||||||
"beacon/models"
|
"beacon/models"
|
||||||
"beacon/pkg/dao/mysql"
|
"beacon/pkg/dao/mysql"
|
||||||
"beacon/utils"
|
"beacon/utils"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CompositeCaseService struct {
|
type CompositeCaseService struct {
|
||||||
@ -20,7 +22,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
|
|||||||
zap.Int("steps_count", len(req.Steps)))
|
zap.Int("steps_count", len(req.Steps)))
|
||||||
|
|
||||||
// 开启事务
|
// 开启事务
|
||||||
tx := dao.BeginTransaction()
|
tx := dao.DB.Begin()
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
zap.L().Error("创建复合案例失败 - 事务开启失败", zap.Error(tx.Error))
|
zap.L().Error("创建复合案例失败 - 事务开启失败", zap.Error(tx.Error))
|
||||||
return nil, tx.Error
|
return nil, tx.Error
|
||||||
@ -39,7 +41,12 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
|
|||||||
Status: req.Status,
|
Status: req.Status,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dao.CreateCompositeCase(tx, compositeCase); err != nil {
|
if compositeCase.Status == "" {
|
||||||
|
compositeCase.Status = "active"
|
||||||
|
zap.L().Debug("设置默认状态", zap.String("status", "active"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Create(compositeCase).Error; err != nil {
|
||||||
zap.L().Error("创建复合案例失败 - 数据库创建失败", zap.Error(err))
|
zap.L().Error("创建复合案例失败 - 数据库创建失败", zap.Error(err))
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return nil, fmt.Errorf("创建复合案例失败: %w", err)
|
return nil, fmt.Errorf("创建复合案例失败: %w", err)
|
||||||
@ -52,7 +59,6 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
|
|||||||
// 创建步骤
|
// 创建步骤
|
||||||
if len(req.Steps) > 0 {
|
if len(req.Steps) > 0 {
|
||||||
zap.L().Info("开始创建步骤", zap.Int("steps_count", len(req.Steps)))
|
zap.L().Info("开始创建步骤", zap.Int("steps_count", len(req.Steps)))
|
||||||
|
|
||||||
var steps []models.CompositeCaseStep
|
var steps []models.CompositeCaseStep
|
||||||
for _, stepReq := range req.Steps {
|
for _, stepReq := range req.Steps {
|
||||||
activityName, _ := utils.GetActivityName(stepReq.StepType)
|
activityName, _ := utils.GetActivityName(stepReq.StepType)
|
||||||
@ -69,7 +75,7 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
|
|||||||
steps = append(steps, step)
|
steps = append(steps, step)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dao.CreateCompositeCaseSteps(tx, steps); err != nil {
|
if err := tx.Create(&steps).Error; err != nil {
|
||||||
zap.L().Error("创建复合案例步骤失败",
|
zap.L().Error("创建复合案例步骤失败",
|
||||||
zap.Uint("composite_case_id", compositeCase.ID),
|
zap.Uint("composite_case_id", compositeCase.ID),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -94,17 +100,26 @@ func (s *CompositeCaseService) CreateCompositeCase(req *models.CreateCompositeCa
|
|||||||
func (s *CompositeCaseService) GetCompositeCaseByID(id uint) (*models.CompositeCase, error) {
|
func (s *CompositeCaseService) GetCompositeCaseByID(id uint) (*models.CompositeCase, error) {
|
||||||
zap.L().Debug("开始查询复合案例", zap.Uint("id", id))
|
zap.L().Debug("开始查询复合案例", zap.Uint("id", id))
|
||||||
|
|
||||||
compositeCase, err := dao.GetCompositeCaseByID(id)
|
var compositeCase models.CompositeCase
|
||||||
|
|
||||||
|
err := dao.DB.Preload("Steps", func(db *gorm.DB) *gorm.DB {
|
||||||
|
return db.Order("step_order ASC")
|
||||||
|
}).First(&compositeCase, id).Error
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
zap.L().Warn("复合案例不存在", zap.Uint("id", id))
|
||||||
|
return nil, fmt.Errorf("复合案例不存在")
|
||||||
|
}
|
||||||
zap.L().Error("获取复合案例失败", zap.Uint("id", id), zap.Error(err))
|
zap.L().Error("获取复合案例失败", zap.Uint("id", id), zap.Error(err))
|
||||||
return nil, err
|
return nil, fmt.Errorf("获取复合案例失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
zap.L().Debug("复合案例查询成功",
|
zap.L().Debug("复合案例查询成功",
|
||||||
zap.Uint("id", compositeCase.ID),
|
zap.Uint("id", compositeCase.ID),
|
||||||
zap.String("name", compositeCase.Name),
|
zap.String("name", compositeCase.Name),
|
||||||
zap.Int("steps_count", len(compositeCase.Steps)))
|
zap.Int("steps_count", len(compositeCase.Steps)))
|
||||||
return compositeCase, nil
|
return &compositeCase, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateCompositeCase 更新复合案例
|
// UpdateCompositeCase 更新复合案例
|
||||||
@ -112,7 +127,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
|
|||||||
zap.L().Info("开始更新复合案例", zap.Uint("id", id))
|
zap.L().Info("开始更新复合案例", zap.Uint("id", id))
|
||||||
|
|
||||||
// 开启事务
|
// 开启事务
|
||||||
tx := dao.BeginTransaction()
|
tx := dao.DB.Begin()
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
zap.L().Error("更新复合案例失败 - 事务开启失败",
|
zap.L().Error("更新复合案例失败 - 事务开启失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
@ -129,10 +144,17 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 检查复合案例是否存在
|
// 检查复合案例是否存在
|
||||||
if _, err := dao.ExistsCompositeCase(tx, id); err != nil {
|
var compositeCase models.CompositeCase
|
||||||
|
if err := tx.First(&compositeCase, id).Error; err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
zap.L().Warn("更新复合案例失败 - 复合案例不存在", zap.Uint("id", id))
|
zap.L().Warn("更新复合案例失败 - 复合案例不存在", zap.Uint("id", id))
|
||||||
return nil, err
|
return nil, fmt.Errorf("复合案例不存在")
|
||||||
|
}
|
||||||
|
zap.L().Error("更新复合案例失败 - 查询失败",
|
||||||
|
zap.Uint("id", id),
|
||||||
|
zap.Error(err))
|
||||||
|
return nil, fmt.Errorf("查询复合案例失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新基本信息
|
// 更新基本信息
|
||||||
@ -151,7 +173,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
|
|||||||
zap.L().Info("更新复合案例基本信息",
|
zap.L().Info("更新复合案例基本信息",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Any("updates", updates))
|
zap.Any("updates", updates))
|
||||||
if err := dao.UpdateCompositeCase(tx, id, updates); err != nil {
|
if err := tx.Model(&compositeCase).Updates(updates).Error; err != nil {
|
||||||
zap.L().Error("更新复合案例基本信息失败",
|
zap.L().Error("更新复合案例基本信息失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -167,7 +189,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
|
|||||||
zap.Int("new_steps_count", len(req.Steps)))
|
zap.Int("new_steps_count", len(req.Steps)))
|
||||||
|
|
||||||
// 删除现有步骤
|
// 删除现有步骤
|
||||||
if err := dao.DeleteCompositeCaseStepsByCompositeCaseID(tx, id); err != nil {
|
if err := tx.Where("composite_case_id = ?", id).Delete(&models.CompositeCaseStep{}).Error; err != nil {
|
||||||
zap.L().Error("删除现有步骤失败",
|
zap.L().Error("删除现有步骤失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -193,7 +215,7 @@ func (s *CompositeCaseService) UpdateCompositeCase(id uint, req *models.UpdateCo
|
|||||||
steps = append(steps, step)
|
steps = append(steps, step)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dao.CreateCompositeCaseSteps(tx, steps); err != nil {
|
if err := tx.Create(&steps).Error; err != nil {
|
||||||
zap.L().Error("创建新步骤失败",
|
zap.L().Error("创建新步骤失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -218,7 +240,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
|
|||||||
zap.L().Info("开始删除复合案例", zap.Uint("id", id))
|
zap.L().Info("开始删除复合案例", zap.Uint("id", id))
|
||||||
|
|
||||||
// 开启事务
|
// 开启事务
|
||||||
tx := dao.BeginTransaction()
|
tx := dao.DB.Begin()
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
zap.L().Error("删除复合案例失败 - 事务开启失败",
|
zap.L().Error("删除复合案例失败 - 事务开启失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
@ -235,11 +257,17 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 检查复合案例是否存在
|
// 检查复合案例是否存在
|
||||||
compositeCase, err := dao.ExistsCompositeCase(tx, id)
|
var compositeCase models.CompositeCase
|
||||||
if err != nil {
|
if err := tx.First(&compositeCase, id).Error; err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
zap.L().Warn("删除复合案例失败", zap.Uint("id", id), zap.Error(err))
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
return err
|
zap.L().Warn("删除复合案例失败 - 复合案例不存在", zap.Uint("id", id))
|
||||||
|
return fmt.Errorf("复合案例不存在")
|
||||||
|
}
|
||||||
|
zap.L().Error("删除复合案例失败 - 查询失败",
|
||||||
|
zap.Uint("id", id),
|
||||||
|
zap.Error(err))
|
||||||
|
return fmt.Errorf("查询复合案例失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
zap.L().Info("找到复合案例,开始删除",
|
zap.L().Info("找到复合案例,开始删除",
|
||||||
@ -247,7 +275,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
|
|||||||
zap.String("name", compositeCase.Name))
|
zap.String("name", compositeCase.Name))
|
||||||
|
|
||||||
// 删除关联的步骤
|
// 删除关联的步骤
|
||||||
if err := dao.DeleteCompositeCaseStepsByCompositeCaseID(tx, id); err != nil {
|
if err := tx.Where("composite_case_id = ?", id).Delete(&models.CompositeCaseStep{}).Error; err != nil {
|
||||||
zap.L().Error("删除复合案例步骤失败",
|
zap.L().Error("删除复合案例步骤失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -258,7 +286,7 @@ func (s *CompositeCaseService) DeleteCompositeCase(id uint) error {
|
|||||||
zap.L().Debug("复合案例步骤删除成功", zap.Uint("id", id))
|
zap.L().Debug("复合案例步骤删除成功", zap.Uint("id", id))
|
||||||
|
|
||||||
// 删除复合案例
|
// 删除复合案例
|
||||||
if err := dao.DeleteCompositeCase(tx, id); err != nil {
|
if err := tx.Delete(&compositeCase).Error; err != nil {
|
||||||
zap.L().Error("删除复合案例失败",
|
zap.L().Error("删除复合案例失败",
|
||||||
zap.Uint("id", id),
|
zap.Uint("id", id),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
@ -280,10 +308,33 @@ func (s *CompositeCaseService) ListCompositeCases(page, pageSize int, status str
|
|||||||
zap.Int("page_size", pageSize),
|
zap.Int("page_size", pageSize),
|
||||||
zap.String("status", status))
|
zap.String("status", status))
|
||||||
|
|
||||||
compositeCases, total, err := dao.ListCompositeCases(page, pageSize, status)
|
var compositeCases []models.CompositeCase
|
||||||
|
var total int64
|
||||||
|
|
||||||
|
query := dao.DB.Model(&models.CompositeCase{})
|
||||||
|
|
||||||
|
if status != "" {
|
||||||
|
query = query.Where("status = ?", status)
|
||||||
|
zap.L().Debug("添加状态过滤条件", zap.String("status", status))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取总数
|
||||||
|
if err := query.Count(&total).Error; err != nil {
|
||||||
|
zap.L().Error("获取复合案例总数失败", zap.Error(err))
|
||||||
|
return nil, 0, fmt.Errorf("获取复合案例总数失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("查询到复合案例总数", zap.Int64("total", total))
|
||||||
|
|
||||||
|
// 分页查询
|
||||||
|
offset := (page - 1) * pageSize
|
||||||
|
err := query.Preload("Steps", func(db *gorm.DB) *gorm.DB {
|
||||||
|
return db.Order("step_order ASC")
|
||||||
|
}).Offset(offset).Limit(pageSize).Order("created_at DESC").Find(&compositeCases).Error
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("获取复合案例列表失败", zap.Error(err))
|
zap.L().Error("获取复合案例列表失败", zap.Error(err))
|
||||||
return nil, 0, err
|
return nil, 0, fmt.Errorf("获取复合案例列表失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
zap.L().Info("复合案例列表查询成功",
|
zap.L().Info("复合案例列表查询成功",
|
||||||
|
@ -1,79 +0,0 @@
|
|||||||
package services
|
|
||||||
|
|
||||||
// Go 服务端入口,触发 Workflow
|
|
||||||
import (
|
|
||||||
"beacon/pkg/pb"
|
|
||||||
"beacon/workflows"
|
|
||||||
"context"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"go.temporal.io/sdk/client"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type WorkflowService struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *WorkflowService) Start(compositeCaseId string) error {
|
|
||||||
// 创建 Temporal 客户端
|
|
||||||
c, err := client.Dial(client.Options{
|
|
||||||
HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置
|
|
||||||
Namespace: "default",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Unable to create Temporal client", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer c.Close()
|
|
||||||
|
|
||||||
// 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD)
|
|
||||||
runID := uuid.New().String()
|
|
||||||
testInput := &pb.DynamicTestRunInput{
|
|
||||||
RunId: runID,
|
|
||||||
CompositeCaseId: compositeCaseId,
|
|
||||||
}
|
|
||||||
|
|
||||||
workflowOptions := client.StartWorkflowOptions{
|
|
||||||
ID: "test_workflow_" + runID,
|
|
||||||
TaskQueue: "test-task-queue", // 保持与 Python Worker 一致
|
|
||||||
}
|
|
||||||
|
|
||||||
zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID))
|
|
||||||
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.DynamicTestSuiteWorkflow, testInput)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Unable to execute workflows", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
zap.L().Info("Workflow started", zap.String("workflowID", we.GetID()), zap.String("runID", we.GetRunID()))
|
|
||||||
|
|
||||||
// 等待 Workflow 完成并获取结果
|
|
||||||
var result pb.TestRunOutput
|
|
||||||
err = we.Get(context.Background(), &result)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Unable to get workflows result", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
zap.L().Info("Workflow finished", zap.Bool("OverallSuccess", result.OverallSuccess), zap.String("Message", result.CompletionMessage))
|
|
||||||
zap.L().Debug("API Test Results", zap.Any("ApiResults", result.ApiResults))
|
|
||||||
zap.L().Debug("UI Test Results", zap.Any("UiResults", result.UiResults))
|
|
||||||
|
|
||||||
// 后续可以根据 result 生成报告、发送通知等
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *WorkflowService) GetWorkflowResults(workflowID string) (status string, err error) {
|
|
||||||
// 这里可以实现获取工作流结果的逻辑
|
|
||||||
// 例如,查询 Temporal Server 获取指定工作流的结果
|
|
||||||
// 可以使用 WorkflowID 或 RunID 来查询
|
|
||||||
zap.L().Debug("GetWorkflowResults", zap.Any("workflowID", workflowID))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *WorkflowService) GetWorkflowStatus(workflowID string) (status string, err error) {
|
|
||||||
// 这里可以实现获取工作流状态的逻辑
|
|
||||||
// 例如,查询 Temporal Server 获取指定工作流的状态
|
|
||||||
// 可以使用 WorkflowID 或 RunID 来查询
|
|
||||||
zap.L().Debug("GetWorkflowStatus", zap.Any("workflowID", workflowID))
|
|
||||||
return
|
|
||||||
}
|
|
36
workers/go/main.go
Normal file
36
workers/go/main.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"beacon/activities"
|
||||||
|
"beacon/workflows"
|
||||||
|
"go.temporal.io/sdk/client"
|
||||||
|
"go.temporal.io/sdk/worker"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 创建使用默认选项的 Temporal 客户端。
|
||||||
|
c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalln("Unable to create Temporal client", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
taskQueue := "test-task-queue"
|
||||||
|
|
||||||
|
// 创建一个监听指定任务队列的 Worker。
|
||||||
|
w := worker.New(c, taskQueue, worker.Options{})
|
||||||
|
|
||||||
|
// 将工作流和带有真实 Go 后缀的活动注册到 Worker。
|
||||||
|
w.RegisterWorkflow(workflows.TestRunWorkflow)
|
||||||
|
w.RegisterActivity(activities.AddSuffixActivity)
|
||||||
|
//(for later) w.RegisterActivity(activities.AddSuffixActivity)
|
||||||
|
|
||||||
|
// 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。
|
||||||
|
|
||||||
|
// 启动 Worker。此调用会阻塞,直到 Worker 被中断。
|
||||||
|
err = w.Run(worker.InterruptCh())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalln("Unable to start Worker", err)
|
||||||
|
}
|
||||||
|
}
|
@ -1,70 +0,0 @@
|
|||||||
package workers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"beacon/activities"
|
|
||||||
"beacon/config"
|
|
||||||
"beacon/workflows"
|
|
||||||
"fmt"
|
|
||||||
"go.temporal.io/sdk/client"
|
|
||||||
"go.temporal.io/sdk/worker"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func StartWorkflow(cnf *config.TemporalConfig) error {
|
|
||||||
// 创建使用默认选项的 Temporal 客户端。
|
|
||||||
c, err := client.Dial(client.Options{HostPort: fmt.Sprintf("%s:%d", cnf.Host, cnf.Port)})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalln("Unable to create Temporal client", err)
|
|
||||||
}
|
|
||||||
defer c.Close()
|
|
||||||
|
|
||||||
taskQueue := "test-task-queue"
|
|
||||||
|
|
||||||
// 创建一个监听指定任务队列的 Worker。
|
|
||||||
w := worker.New(c, taskQueue, worker.Options{})
|
|
||||||
|
|
||||||
// 将工作流和带有真实 Go 后缀的活动注册到 Worker。
|
|
||||||
w.RegisterWorkflow(workflows.TestRunWorkflow)
|
|
||||||
w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow)
|
|
||||||
w.RegisterActivity(activities.LoadCompositeCaseSteps)
|
|
||||||
//(for later) w.RegisterActivity(activities.AddSuffixActivity)
|
|
||||||
|
|
||||||
// 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。
|
|
||||||
|
|
||||||
// 启动 Worker。此调用会阻塞,直到 Worker 被中断。
|
|
||||||
err = w.Run(worker.InterruptCh())
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error(fmt.Sprintf("Unable to start worker: %v", err))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
/*func main() {
|
|
||||||
// 创建使用默认选项的 Temporal 客户端。
|
|
||||||
c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalln("Unable to create Temporal client", err)
|
|
||||||
}
|
|
||||||
defer c.Close()
|
|
||||||
|
|
||||||
taskQueue := "test-task-queue"
|
|
||||||
|
|
||||||
// 创建一个监听指定任务队列的 Worker。
|
|
||||||
w := worker.New(c, taskQueue, worker.Options{})
|
|
||||||
|
|
||||||
// 将工作流和带有真实 Go 后缀的活动注册到 Worker。
|
|
||||||
w.RegisterWorkflow(workflows.TestRunWorkflow)
|
|
||||||
w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow)
|
|
||||||
w.RegisterActivity(activities.LoadCompositeCaseSteps)
|
|
||||||
//(for later) w.RegisterActivity(activities.AddSuffixActivity)
|
|
||||||
|
|
||||||
// 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。
|
|
||||||
|
|
||||||
// 启动 Worker。此调用会阻塞,直到 Worker 被中断。
|
|
||||||
err = w.Run(worker.InterruptCh())
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalln("Unable to start Worker", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
@ -7,7 +7,7 @@ def execute_api_test_case(test_case_id: str, endpoint: str, http_method: str, he
|
|||||||
实际执行API测试的函数。
|
实际执行API测试的函数。
|
||||||
可以集成 pytest, requests 等库。
|
可以集成 pytest, requests 等库。
|
||||||
"""
|
"""
|
||||||
base_url = "http://101.89.127.197:9080" # 假设 API 服务的基地址
|
base_url = "http://localhost:8080" # 假设 API 服务的基地址
|
||||||
|
|
||||||
full_url = f"{base_url}{endpoint}"
|
full_url = f"{base_url}{endpoint}"
|
||||||
log_output = []
|
log_output = []
|
||||||
|
@ -2,11 +2,8 @@ package workflows
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"go.temporal.io/sdk/temporal"
|
"go.temporal.io/sdk/temporal"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"beacon/activities" // 假设你的 activity 包在这个路径
|
"beacon/activities" // 假设你的 activity 包在这个路径
|
||||||
@ -37,23 +34,12 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
|
// 方式一 (推荐): 在启动工作流时,由 Go Client 将完整的步骤数据作为输入参数传入
|
||||||
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
|
// 方式二 (适用于大数据): 工作流通过 Activity 从数据库动态加载步骤定义
|
||||||
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
|
// 这里采用方式二,通过 LoadCompositeCaseSteps Activity 从数据库加载步骤配置
|
||||||
ctxActivity := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
|
|
||||||
StartToCloseTimeout: 10 * time.Minute, // Activity 从开始到完成的最大允许时间
|
|
||||||
HeartbeatTimeout: 30 * time.Second, // Heartbeat 超时时间,防止 Worker 假死
|
|
||||||
RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略配置
|
|
||||||
InitialInterval: time.Second, // 首次重试前的等待时间
|
|
||||||
BackoffCoefficient: 1.0, // 重试间隔的递增系数
|
|
||||||
MaximumInterval: time.Minute, // 重试间隔的最大值
|
|
||||||
MaximumAttempts: 3, // 最大重试次数
|
|
||||||
NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误类型
|
|
||||||
},
|
|
||||||
}) // 应用 Activity 选项到上下文
|
|
||||||
|
|
||||||
var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
|
var caseSteps []*pb.CompositeCaseStepDefinition // 存储从数据库加载的步骤定义列表
|
||||||
|
|
||||||
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
|
// 执行 LoadCompositeCaseSteps Activity 来获取复合案例的所有步骤定义
|
||||||
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
|
// 这个 Activity 会根据 CompositeCaseId 查询数据库并返回步骤配置
|
||||||
err := workflow.ExecuteActivity(ctxActivity, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
|
err := workflow.ExecuteActivity(ctx, activities.LoadCompositeCaseSteps, input.CompositeCaseId).Get(ctx, &caseSteps)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to load composite case steps", "error", err)
|
logger.Error("Failed to load composite case steps", "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -100,8 +86,6 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
switch step.ActivityName {
|
switch step.ActivityName {
|
||||||
case "RunApiTest":
|
case "RunApiTest":
|
||||||
// API 测试:创建 API 测试请求结构并解析参数
|
// API 测试:创建 API 测试请求结构并解析参数
|
||||||
logger.Info("Running api test activity", "ParametersJson", step.ParametersJson)
|
|
||||||
fmt.Println(reflect.TypeOf(step.ParametersJson))
|
|
||||||
apiReq := &pb.ApiTestRequest{}
|
apiReq := &pb.ApiTestRequest{}
|
||||||
if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
|
if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil {
|
||||||
logger.Error("Failed to unmarshal API test parameters", "error", err)
|
logger.Error("Failed to unmarshal API test parameters", "error", err)
|
||||||
@ -204,7 +188,7 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 记录当前步骤的执行结果,用于后续条件跳转判断
|
// 记录当前步骤的执行结果,用于后续条件跳转判断
|
||||||
stepResults[strconv.FormatInt(step.StepId, 10)] = stepPassed
|
stepResults[step.StepId] = stepPassed
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------
|
// ------------------------------------------------------------------------------------
|
||||||
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
|
// 步骤4.5: 根据执行结果确定下一步骤(实现 DAG 的条件跳转)
|
||||||
|
Loading…
Reference in New Issue
Block a user