From ae423361600105ea2f83e85c4315543e4dca9621 Mon Sep 17 00:00:00 2001 From: longpeng Date: Wed, 25 Jun 2025 14:49:45 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E9=85=8D=E7=BD=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84=EF=BC=8C=E6=96=B0=E5=A2=9E=20Temporal=20=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=94=AF=E6=8C=81=EF=BC=8C=E4=BC=98=E5=8C=96=20OpenAI?= =?UTF-8?q?=20=E9=85=8D=E7=BD=AE=E5=91=BD=E5=90=8D=EF=BC=8C=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=B7=A5=E4=BD=9C=E6=B5=81=E7=9B=B8=E5=85=B3=E8=B7=AF?= =?UTF-8?q?=E7=94=B1=E5=92=8C=E6=9C=8D=E5=8A=A1=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/main.go | 58 ------------------------- config/default.go | 10 ++++- main.go | 10 +++++ pkg/openai/openai.go | 8 ++-- routers/handlers/workflow.go | 76 +++++++++++++++++++++++++++++++++ routers/router.go | 14 +++++++ services/workflow.go | 79 +++++++++++++++++++++++++++++++++++ workers/go/main.go | 37 ---------------- workers/go/workers.go | 70 +++++++++++++++++++++++++++++++ workflows/dynamic_workflow.go | 4 ++ 10 files changed, 265 insertions(+), 101 deletions(-) delete mode 100644 client/main.go create mode 100644 routers/handlers/workflow.go create mode 100644 services/workflow.go delete mode 100644 workers/go/main.go create mode 100644 workers/go/workers.go diff --git a/client/main.go b/client/main.go deleted file mode 100644 index ab0a931..0000000 --- a/client/main.go +++ /dev/null @@ -1,58 +0,0 @@ -package client - -// Go 服务端入口,触发 Workflow -import ( - "beacon/pkg/pb" - "beacon/workflows" - "context" - "fmt" - "log" - - "github.com/google/uuid" - "go.temporal.io/sdk/client" -) - -func main() { - // 创建 Temporal 客户端 - c, err := client.Dial(client.Options{ - HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置 - Namespace: "default", - }) - if err != nil { - log.Fatalf("Unable to create Temporal client: %v", err) - } - defer c.Close() - - // 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD) - runID := uuid.New().String() - testInput := &pb.DynamicTestRunInput{ - RunId: runID, - CompositeCaseId: "11", - } - - workflowOptions := client.StartWorkflowOptions{ - ID: "test_workflow_" + runID, - TaskQueue: "test-task-queue", // 保持与 Python Worker 一致 - } - - fmt.Printf("Starting TestRunWorkflow for run ID: %s\n", runID) - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.DynamicTestSuiteWorkflow, testInput) - if err != nil { - log.Fatalf("Unable to execute workflows: %v", err) - } - - fmt.Printf("Workflow started. Workflow ID: %s, Run ID: %s\n", we.GetID(), we.GetRunID()) - - // 等待 Workflow 完成并获取结果 - var result pb.TestRunOutput - err = we.Get(context.Background(), &result) - if err != nil { - log.Fatalf("Unable to get workflows result: %v", err) - } - - fmt.Printf("Workflow finished. Overall Success: %t, Message: %s\n", result.OverallSuccess, result.CompletionMessage) - fmt.Printf("API Test Results: %+v\n", result.ApiResults) - fmt.Printf("UI Test Results: %+v\n", result.UiResults) - - // 后续可以根据 result 生成报告、发送通知等 -} diff --git a/config/default.go b/config/default.go index ef0c7bd..136f403 100755 --- a/config/default.go +++ b/config/default.go @@ -38,7 +38,8 @@ type SrvConfig struct { *ApolloConfig `mapstructure:"apollo"` *Registrar `mapstructure:"registrar"` *RcloneConfig `mapstructure:"rclone"` - *OpenAI `mapstructure:"openai"` + *OpenAIConfig `mapstructure:"openai"` + *TemporalConfig `mapstructure:"temporal"` // Temporal配置 } type LogConfig struct { @@ -113,13 +114,18 @@ type RcloneConfig struct { CustomDomains string `mapstructure:"custom_domains"` // 自定义域 } -type OpenAI struct { +type OpenAIConfig struct { BaseURL string `mapstructure:"base_url"` ApiKey string `mapstructure:"api_key"` Model string `mapstructure:"model"` Prompt string `mapstructure:"prompt"` } +type TemporalConfig struct { + Host string `mapstructure:"host"` // Temporal服务地址 + Port int `mapstructure:"port"` // Temporal服务端口 +} + // Init 整个服务配置文件初始化的方法 func Init(yamlContent string) (err error) { // 方式1:直接指定配置文件路径(相对路径或者绝对路径) diff --git a/main.go b/main.go index 4c59e8e..bc217a7 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "beacon/pkg/logger" "beacon/pkg/validator" "beacon/routers" + workers "beacon/workers/go" "flag" "fmt" "os" @@ -60,6 +61,15 @@ func Run() { zap.L().Info(fmt.Sprintf("Serving Gin on %s:%d", config.Conf.IP, config.Conf.Port)) zap.L().Info("service start...") + go func() { + // 启动 Temporal Worker + err := workers.StartWorkflow(config.Conf.TemporalConfig) + if err != nil { + zap.L().Error("启动 Temporal Worker 失败", zap.Error(err)) + os.Exit(1) // 如果启动 Temporal Worker 失败,退出程序 + } + }() + // 接收操作系统发来的中断信号 QuitChan := make(chan os.Signal) signal.Notify(QuitChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) // kill -15 CTRL+C kill -9 diff --git a/pkg/openai/openai.go b/pkg/openai/openai.go index 640a0a4..f3c51e5 100644 --- a/pkg/openai/openai.go +++ b/pkg/openai/openai.go @@ -32,8 +32,8 @@ func encodeImageToBase64(file *multipart.FileHeader) (string, error) { func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) { client := openai.NewClient( - option.WithAPIKey(config.Conf.OpenAI.ApiKey), // defaults to os.LookupEnv("OPENAI_API_KEY") - option.WithBaseURL(config.Conf.OpenAI.BaseURL), + option.WithAPIKey(config.Conf.OpenAIConfig.ApiKey), // defaults to os.LookupEnv("OPENAI_API_KEY") + option.WithBaseURL(config.Conf.OpenAIConfig.BaseURL), ) base64Image, _ := encodeImageToBase64(file) @@ -49,7 +49,7 @@ func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) { }, openai.ChatCompletionContentPartUnionParam{ OfText: &openai.ChatCompletionContentPartTextParam{ - Text: config.Conf.OpenAI.Prompt, + Text: config.Conf.OpenAIConfig.Prompt, }, }, ) @@ -57,7 +57,7 @@ func GetOllamaAICaptcha(file *multipart.FileHeader) (error, string) { Messages: []openai.ChatCompletionMessageParamUnion{ openai.UserMessage(message), }, - Model: config.Conf.OpenAI.Model, + Model: config.Conf.OpenAIConfig.Model, }) if err != nil { return err, "" diff --git a/routers/handlers/workflow.go b/routers/handlers/workflow.go new file mode 100644 index 0000000..bdba956 --- /dev/null +++ b/routers/handlers/workflow.go @@ -0,0 +1,76 @@ +package handlers + +import ( + "beacon/services" + "github.com/gin-gonic/gin" +) + +type WorkflowHandler struct { + service *services.WorkflowService +} + +func NewWorkflowHandler() *WorkflowHandler { + return &WorkflowHandler{ + service: &services.WorkflowService{}, + } +} + +// StartWorkflow POST /api/workflow/start +func (h *WorkflowHandler) StartWorkflow(c *gin.Context) { + + err := h.service.Start("11") + if err != nil { + c.JSON(500, gin.H{ + "code": -1, + "message": "Failed to start workflow", + }) + return + } + c.JSON(200, gin.H{ + "message": "Workflow started successfully", + }) +} + +// StopWorkflow POST /api/workflow/stop +func (h *WorkflowHandler) StopWorkflow(c *gin.Context) { + // 停止工作流的逻辑 + // 这里可以调用 Temporal 的 API 来停止指定的工作流 + // 例如,使用 WorkflowID 或 RunID 来停止工作流 + c.JSON(200, gin.H{ + "message": "Workflow stopped successfully", + }) +} + +// GetWorkflowStatus GET /api/workflow/status/{workflowID} +func (h *WorkflowHandler) GetWorkflowStatus(c *gin.Context) { + workflowID := c.Param("workflowID") + if workflowID == "" { + c.JSON(400, gin.H{"error": "Workflow ID is required"}) + return + } + + status, err := h.service.GetWorkflowStatus(workflowID) + if err != nil { + c.JSON(500, gin.H{"error": "Failed to get workflow status", "details": err.Error()}) + return + } + + c.JSON(200, gin.H{"status": status}) +} + +// GetWorkflowResults GET /api/workflow/results/{workflowID} +func (h *WorkflowHandler) GetWorkflowResults(c *gin.Context) { + workflowID := c.Param("workflowID") + if workflowID == "" { + c.JSON(400, gin.H{"error": "Workflow ID is required"}) + return + } + + results, err := h.service.GetWorkflowResults(workflowID) + if err != nil { + c.JSON(500, gin.H{"error": "Failed to get workflow results", "details": err.Error()}) + return + } + + c.JSON(200, gin.H{"results": results}) +} diff --git a/routers/router.go b/routers/router.go index 4d70186..d793d3d 100644 --- a/routers/router.go +++ b/routers/router.go @@ -86,6 +86,7 @@ func Init() *gin.Engine { // 路由分组 v1 := r.Group(config.Conf.Version) SetupCompositeCaseRoutes(v1) + SetupWorkflowRoutes(v1) return r } @@ -104,3 +105,16 @@ func SetupCompositeCaseRoutes(group *gin.RouterGroup) { api.DELETE("/composite-cases/:id", handler.DeleteCompositeCase) } } + +func SetupWorkflowRoutes(group *gin.RouterGroup) { + // 初始化服务和处理器 + handler := *handlers.NewWorkflowHandler() + + api := group.Group("/api") + { + // 工作流相关路由 + api.POST("/workflows/start", handler.StartWorkflow) + api.GET("/workflows/:id", handler.GetWorkflowStatus) + api.GET("/workflows/:id/results", handler.GetWorkflowResults) + } +} diff --git a/services/workflow.go b/services/workflow.go new file mode 100644 index 0000000..f7670ba --- /dev/null +++ b/services/workflow.go @@ -0,0 +1,79 @@ +package services + +// Go 服务端入口,触发 Workflow +import ( + "beacon/pkg/pb" + "beacon/workflows" + "context" + "github.com/google/uuid" + "go.temporal.io/sdk/client" + "go.uber.org/zap" +) + +type WorkflowService struct { +} + +func (s *WorkflowService) Start(compositeCaseId string) error { + // 创建 Temporal 客户端 + c, err := client.Dial(client.Options{ + HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置 + Namespace: "default", + }) + if err != nil { + zap.L().Error("Unable to create Temporal client", zap.Error(err)) + return err + } + defer c.Close() + + // 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD) + runID := uuid.New().String() + testInput := &pb.DynamicTestRunInput{ + RunId: runID, + CompositeCaseId: compositeCaseId, + } + + workflowOptions := client.StartWorkflowOptions{ + ID: "test_workflow_" + runID, + TaskQueue: "test-task-queue", // 保持与 Python Worker 一致 + } + + zap.L().Info("Starting TestRunWorkflow", zap.String("runID", runID)) + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.DynamicTestSuiteWorkflow, testInput) + if err != nil { + zap.L().Error("Unable to execute workflows", zap.Error(err)) + return err + } + + zap.L().Info("Workflow started", zap.String("workflowID", we.GetID()), zap.String("runID", we.GetRunID())) + + // 等待 Workflow 完成并获取结果 + var result pb.TestRunOutput + err = we.Get(context.Background(), &result) + if err != nil { + zap.L().Error("Unable to get workflows result", zap.Error(err)) + return err + } + + zap.L().Info("Workflow finished", zap.Bool("OverallSuccess", result.OverallSuccess), zap.String("Message", result.CompletionMessage)) + zap.L().Debug("API Test Results", zap.Any("ApiResults", result.ApiResults)) + zap.L().Debug("UI Test Results", zap.Any("UiResults", result.UiResults)) + + // 后续可以根据 result 生成报告、发送通知等 + return nil +} + +func (s *WorkflowService) GetWorkflowResults(workflowID string) (status string, err error) { + // 这里可以实现获取工作流结果的逻辑 + // 例如,查询 Temporal Server 获取指定工作流的结果 + // 可以使用 WorkflowID 或 RunID 来查询 + zap.L().Debug("GetWorkflowResults", zap.Any("workflowID", workflowID)) + return +} + +func (s *WorkflowService) GetWorkflowStatus(workflowID string) (status string, err error) { + // 这里可以实现获取工作流状态的逻辑 + // 例如,查询 Temporal Server 获取指定工作流的状态 + // 可以使用 WorkflowID 或 RunID 来查询 + zap.L().Debug("GetWorkflowStatus", zap.Any("workflowID", workflowID)) + return +} diff --git a/workers/go/main.go b/workers/go/main.go deleted file mode 100644 index ddb99af..0000000 --- a/workers/go/main.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import ( - "beacon/activities" - "beacon/workflows" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" - "log" -) - -func main() { - // 创建使用默认选项的 Temporal 客户端。 - c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"}) - if err != nil { - log.Fatalln("Unable to create Temporal client", err) - } - defer c.Close() - - taskQueue := "test-task-queue" - - // 创建一个监听指定任务队列的 Worker。 - w := worker.New(c, taskQueue, worker.Options{}) - - // 将工作流和带有真实 Go 后缀的活动注册到 Worker。 - w.RegisterWorkflow(workflows.TestRunWorkflow) - w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow) - w.RegisterActivity(activities.LoadCompositeCaseSteps) - //(for later) w.RegisterActivity(activities.AddSuffixActivity) - - // 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。 - - // 启动 Worker。此调用会阻塞,直到 Worker 被中断。 - err = w.Run(worker.InterruptCh()) - if err != nil { - log.Fatalln("Unable to start Worker", err) - } -} diff --git a/workers/go/workers.go b/workers/go/workers.go new file mode 100644 index 0000000..39b86f4 --- /dev/null +++ b/workers/go/workers.go @@ -0,0 +1,70 @@ +package workers + +import ( + "beacon/activities" + "beacon/config" + "beacon/workflows" + "fmt" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.uber.org/zap" + "log" +) + +func StartWorkflow(cnf *config.TemporalConfig) error { + // 创建使用默认选项的 Temporal 客户端。 + c, err := client.Dial(client.Options{HostPort: fmt.Sprintf("%s:%d", cnf.Host, cnf.Port)}) + if err != nil { + log.Fatalln("Unable to create Temporal client", err) + } + defer c.Close() + + taskQueue := "test-task-queue" + + // 创建一个监听指定任务队列的 Worker。 + w := worker.New(c, taskQueue, worker.Options{}) + + // 将工作流和带有真实 Go 后缀的活动注册到 Worker。 + w.RegisterWorkflow(workflows.TestRunWorkflow) + w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow) + w.RegisterActivity(activities.LoadCompositeCaseSteps) + //(for later) w.RegisterActivity(activities.AddSuffixActivity) + + // 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。 + + // 启动 Worker。此调用会阻塞,直到 Worker 被中断。 + err = w.Run(worker.InterruptCh()) + if err != nil { + zap.L().Error(fmt.Sprintf("Unable to start worker: %v", err)) + } + return nil +} + +/*func main() { + // 创建使用默认选项的 Temporal 客户端。 + c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"}) + if err != nil { + log.Fatalln("Unable to create Temporal client", err) + } + defer c.Close() + + taskQueue := "test-task-queue" + + // 创建一个监听指定任务队列的 Worker。 + w := worker.New(c, taskQueue, worker.Options{}) + + // 将工作流和带有真实 Go 后缀的活动注册到 Worker。 + w.RegisterWorkflow(workflows.TestRunWorkflow) + w.RegisterWorkflow(workflows.DynamicTestSuiteWorkflow) + w.RegisterActivity(activities.LoadCompositeCaseSteps) + //(for later) w.RegisterActivity(activities.AddSuffixActivity) + + // 注意:Python 和 ts 活动将由 Python/ts 进程处理,此处未进行注册。 + + // 启动 Worker。此调用会阻塞,直到 Worker 被中断。 + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start Worker", err) + } +} +*/ diff --git a/workflows/dynamic_workflow.go b/workflows/dynamic_workflow.go index da0debe..dbb4ec2 100644 --- a/workflows/dynamic_workflow.go +++ b/workflows/dynamic_workflow.go @@ -2,7 +2,9 @@ package workflows import ( "encoding/json" + "fmt" "go.temporal.io/sdk/temporal" + "reflect" "sort" "strconv" "time" @@ -98,6 +100,8 @@ func DynamicTestSuiteWorkflow(ctx workflow.Context, input *pb.DynamicTestRunInpu switch step.ActivityName { case "RunApiTest": // API 测试:创建 API 测试请求结构并解析参数 + logger.Info("Running api test activity", "ParametersJson", step.ParametersJson) + fmt.Println(reflect.TypeOf(step.ParametersJson)) apiReq := &pb.ApiTestRequest{} if err := json.Unmarshal([]byte(step.ParametersJson), apiReq); err != nil { logger.Error("Failed to unmarshal API test parameters", "error", err)