diff --git a/server/activities/activities.go b/server/activities/activities.go new file mode 100644 index 0000000..5d3bd02 --- /dev/null +++ b/server/activities/activities.go @@ -0,0 +1,19 @@ +package activities + +import ( + "context" + "fmt" + "math/rand" +) + +// AddSuffixActivity appends a fixed suffix to the input data. +func AddSuffixActivity(ctx context.Context, data string) (string, error) { + suffixes := []string{ + "-one", "-two", "-three", "-four", "-five", + "-six", "-seven", "-eight", "-nine", "-ten", + } + suffix := suffixes[rand.Intn(len(suffixes))] + result := fmt.Sprintf("%s%s", data, suffix) + fmt.Println("Go Activity: Modified data to:", result) + return result, nil +} diff --git a/server/activity/activity.go b/server/activity/activity.go deleted file mode 100644 index a9f17a9..0000000 --- a/server/activity/activity.go +++ /dev/null @@ -1,23 +0,0 @@ -package activity - -// 定义 Temporal Activity 接口 -import ( - "beacon/server/gen/pb" // 替换为你的模块路径 - "context" -) - -// 定义活动接口,Python Worker 将实现这些接口 -// Temporal Go SDK 会在编译时通过 go-temporal 插件自动生成这些接口的实现桩 -// 使得你可以直接调用这些接口,而实际执行在 Python Worker 中。 - -// RunApiTest 是执行接口测试的活动 -func RunApiTest(ctx context.Context, req *pb.ApiTestRequest) (*pb.ApiTestResult, error) { - // 实际调用会被转发到 Python Worker - return nil, nil // Go 侧不需要实现,由 Temporal SDK 代理 -} - -// RunUiTest 是执行 UI 测试的活动 -func RunUiTest(ctx context.Context, req *pb.UiTestRequest) (*pb.UiTestResult, error) { - // 实际调用会被转发到 Python Worker - return nil, nil // Go 侧不需要实现,由 Temporal SDK 代理 -} diff --git a/server/client/main.go b/server/client/main.go new file mode 100644 index 0000000..56df608 --- /dev/null +++ b/server/client/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "beacon/server/workflows" + "context" + "fmt" + "github.com/google/uuid" + "go.temporal.io/sdk/client" + "log" + "os" +) + +func main() { + if len(os.Args) < 2 { + log.Fatalln("Usage: go run main.go ") + } + inputData := os.Args[1] + + // Create a Temporal client using the new Dial method. + c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"}) + if err != nil { + log.Fatalln("Unable to create Temporal client", err) + } + defer c.Close() + + // Generate a unique Workflow ID. + workflowID := "data-processing-workflow-" + uuid.New().String() + + // Set up Workflow start options. + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: "data-processing-task-queue", + } + + // Start the Workflow. + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.TestRunWorkflow, inputData) + if err != nil { + log.Fatalln("Unable to execute Workflow", err) + } + + // Wait for the Workflow to complete and get the result. + var result string + err = we.Get(context.Background(), &result) + if err != nil { + log.Fatalln("Unable to get Workflow result", err) + } + + fmt.Println("Processed Data:", result) +} diff --git a/server/main.go b/server/main.go deleted file mode 100644 index ed56d50..0000000 --- a/server/main.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -// Go 服务端入口,触发 Workflow -import ( - "beacon/server/gen/pb" - "beacon/server/workflow" - "context" - "fmt" - "log" - - "github.com/google/uuid" - "go.temporal.io/sdk/client" -) - -func main() { - // 创建 Temporal 客户端 - c, err := client.Dial(client.Options{ - HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置 - Namespace: "default", - }) - if err != nil { - log.Fatalf("Unable to create Temporal client: %v", err) - } - defer c.Close() - - // 模拟一个触发测试的事件 (例如来自 Web UI 或 CI/CD) - runID := uuid.New().String() - testInput := &pb.TestRunInput{ - RunId: runID, - EnvironmentUrl: "https://example.com", - Tags: []string{"smoke", "critical"}, - RunApiTests: true, - RunUiTests: true, - } - - workflowOptions := client.StartWorkflowOptions{ - ID: "test_workflow_" + runID, - TaskQueue: "test-task-queue", // 保持与 Python Worker 一致 - } - - fmt.Printf("Starting TestRunWorkflow for run ID: %s\n", runID) - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflow.TestRunWorkflow, testInput) - if err != nil { - log.Fatalf("Unable to execute workflow: %v", err) - } - - fmt.Printf("Workflow started. Workflow ID: %s, Run ID: %s\n", we.GetID(), we.GetRunID()) - - // 等待 Workflow 完成并获取结果 - var result pb.TestRunOutput - err = we.Get(context.Background(), &result) - if err != nil { - log.Fatalf("Unable to get workflow result: %v", err) - } - - fmt.Printf("Workflow finished. Overall Success: %t, Message: %s\n", result.OverallSuccess, result.CompletionMessage) - fmt.Printf("API Test Results: %+v\n", result.ApiResults) - fmt.Printf("UI Test Results: %+v\n", result.UiResults) - - // 后续可以根据 result 生成报告、发送通知等 -} diff --git a/server/workflow/workflow.go b/server/workflow/workflow.go deleted file mode 100644 index 76c1a4e..0000000 --- a/server/workflow/workflow.go +++ /dev/null @@ -1,91 +0,0 @@ -package workflow - -// 定义 Temporal Workflow -import ( - "beacon/server/activity" - "beacon/server/gen/pb" - "fmt" - "go.temporal.io/sdk/temporal" - "go.temporal.io/sdk/workflow" - "time" -) - -// TestRunWorkflow 定义了整个测试执行的工作流 -func TestRunWorkflow(ctx workflow.Context, input *pb.TestRunInput) (*pb.TestRunOutput, error) { - logger := workflow.GetLogger(ctx) - logger.Info("TestRunWorkflow started", "runID", input.RunId) - - ao := workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Minute, // Activity 执行超时时间 - HeartbeatTimeout: 30 * time.Second, // Heartbeat 防止 Worker 假死 - RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略 - InitialInterval: time.Second, - BackoffCoefficient: 2.0, - MaximumInterval: time.Minute, - MaximumAttempts: 3, - NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误 - }, - } - ctx = workflow.WithActivityOptions(ctx, ao) - - var ( - apiResults []*pb.ApiTestResult - uiResults []*pb.UiTestResult - overallSuccess = true - completionMessage = "Test run completed successfully." - ) - - // 执行 API 测试 Activity - if input.RunApiTests { - apiTestInput := &pb.ApiTestRequest{ - TestCaseId: "api-example-1", - Endpoint: "/api/v1/data", - HttpMethod: "GET", - Headers: map[string]string{"Authorization": "Bearer token123"}, - ExpectedStatusCode: 200, - } - var apiRes pb.ApiTestResult - err := workflow.ExecuteActivity(ctx, activity.RunApiTest, apiTestInput).Get(ctx, &apiRes) - if err != nil { - logger.Error("API test activity failed", "error", err) - // 可以选择标记为失败,或者继续执行UI测试 - overallSuccess = false - apiRes.BaseResult.Success = false - apiRes.BaseResult.Message = fmt.Sprintf("API Test Failed: %v", err) - } - apiResults = append(apiResults, &apiRes) - } - - // 执行 UI 测试 Activity - if input.RunUiTests { - uiTestInput := &pb.UiTestRequest{ - TestCaseId: "ui-example-1", - UrlPath: "/dashboard", - BrowserType: "chromium", - Headless: true, - UserData: map[string]string{"user": "test", "pass": "password"}, - } - var uiRes pb.UiTestResult - err := workflow.ExecuteActivity(ctx, activity.RunUiTest, uiTestInput).Get(ctx, &uiRes) - if err != nil { - logger.Error("UI test activity failed", "error", err) - overallSuccess = false - uiRes.BaseResult.Success = false - uiRes.BaseResult.Message = fmt.Sprintf("UI Test Failed: %v", err) - } - uiResults = append(uiResults, &uiRes) - } - - if !overallSuccess { - completionMessage = "Test run completed with failures." - } - - logger.Info("TestRunWorkflow completed", "overallSuccess", overallSuccess) - return &pb.TestRunOutput{ - RunId: input.RunId, - OverallSuccess: overallSuccess, - CompletionMessage: completionMessage, - ApiResults: apiResults, - UiResults: uiResults, - }, nil -} diff --git a/server/workflows/workflow.go b/server/workflows/workflow.go new file mode 100644 index 0000000..fa1202d --- /dev/null +++ b/server/workflows/workflow.go @@ -0,0 +1,52 @@ +package workflows + +// 定义 Temporal Workflow +import ( + "beacon/server/activities" + "fmt" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "strings" + "time" +) + +// TestRunWorkflow 定义了整个测试执行的工作流 +func TestRunWorkflow(ctx workflow.Context, data string) (string, error) { + logger := workflow.GetLogger(ctx) + retryPolicy := &temporal.RetryPolicy{ + InitialInterval: time.Second, // First retry after 1 second + BackoffCoefficient: 2.0, // Double the wait time on each retry (1s → 2s → 4s → 8s, etc.) + MaximumInterval: 100 * time.Second, // Cap wait time at 100 seconds + MaximumAttempts: 50, // Retry up to 5 times before giving up + } + + // Step 1: Add a prefix(Python) + pythonCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + TaskQueue: "python-task-queue", + StartToCloseTimeout: time.Minute, + RetryPolicy: retryPolicy, + }) + + var prefixed string + if err := workflow.ExecuteActivity(pythonCtx, "PythonAddRandomPrefixActivity", data).Get(pythonCtx, &prefixed); err != nil { + return "", fmt.Errorf("failed to add prefix: %w", err) + } + + goCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, //activity must complete within 1 minute + RetryPolicy: retryPolicy, + }) + + var suffixed string + err := workflow.ExecuteActivity(goCtx, activities.AddSuffixActivity, prefixed).Get(goCtx, &suffixed) + if err != nil { + return "", fmt.Errorf("failed to add suffix: %w", err) + } + + var processed string + // Demo Activity 3: Simulate uppercasing the data. + logger.Info("Demo Activity 3: Pretending to uppercase the data:", data) + processed = strings.ToUpper(suffixed) + + return processed, nil +} diff --git a/workers/go/main.go b/workers/go/main.go new file mode 100644 index 0000000..5694225 --- /dev/null +++ b/workers/go/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "beacon/server/activities" + "beacon/server/workflows" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "log" +) + +func main() { + // Create a Temporal client with the default options. + c, err := client.Dial(client.Options{HostPort: "temporal.newai.day:17233"}) + if err != nil { + log.Fatalln("Unable to create Temporal client", err) + } + defer c.Close() + + taskQueue := "data-processing-task-queue" + + // Create a Worker that listens on the specified Task Queue. + w := worker.New(c, taskQueue, worker.Options{}) + + // Register the Workflow and the real Go suffix Activity with the Worker. + w.RegisterWorkflow(workflows.TestRunWorkflow) + w.RegisterActivity(activities.AddSuffixActivity) + //(for later) w.RegisterActivity(activities.AddSuffixActivity) + + // Start the Worker. This call blocks until the Worker is interrupted. + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start Worker", err) + } +} diff --git a/workers/python/__init__.py b/workers/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worker/activities.py b/workers/python/activities.py similarity index 88% rename from worker/activities.py rename to workers/python/activities.py index 0715899..cda7f8f 100644 --- a/worker/activities.py +++ b/workers/python/activities.py @@ -2,6 +2,7 @@ import os import sys import time +import random from temporalio import activity @@ -15,7 +16,7 @@ from utils import upload_file_to_s3, scalar_map_to_dict class TestActivities: - @activity.defn + @activity.defn(name="run_api_test") async def run_api_test(self,req: pb.ApiTestRequest) -> pb.ApiTestResult: """执行API测试的Temporal Activity实现""" activity.logger.info(f"Received API Test Request: {req.test_case_id}") @@ -46,7 +47,7 @@ class TestActivities: return result - @activity.defn + @activity.defn(name="run_ui_test") async def run_ui_test(self,req: pb.UiTestRequest) -> pb.UiTestResult: """执行UI测试的Temporal Activity实现""" activity.logger.info(f"Received UI Test Request: {req.test_case_id}") @@ -79,4 +80,13 @@ class TestActivities: result.base_result.error_details = str(e) result.base_result.duration_seconds = time.time() - start_time - return result \ No newline at end of file + return result + + @activity.defn(name="PythonAddRandomPrefixActivity") + async def python_add_random_prefix_activity(self,data: str) -> str: + prefixes = [ + "alpha-", "beta-", "gamma-", "delta-", "epsilon-", + "zeta-", "eta-", "theta-", "iota-", "kappa-", + ] + prefix = random.choice(prefixes) + return f"{prefix}{data}" \ No newline at end of file diff --git a/worker/gen/__init__.py b/workers/python/gen/__init__.py similarity index 100% rename from worker/gen/__init__.py rename to workers/python/gen/__init__.py diff --git a/worker/main.py b/workers/python/main.py similarity index 68% rename from worker/main.py rename to workers/python/main.py index d3178a6..d036354 100644 --- a/worker/main.py +++ b/workers/python/main.py @@ -14,15 +14,16 @@ from activities import TestActivities # 导入定义的 Activity async def main(): # 连接 Temporal Server client = await Client.connect("temporal.newai.day:17233", namespace="default") # 根据你的 Temporal Server 配置 - + print("Python Worker: Successfully connected to Temporal Server!") activities = TestActivities() + task_queue = "python-task-queue" # 创建 Worker worker = Worker( client, - task_queue="test-task-queue", # 保持与 Go Client 一致 - activities=[activities.run_ui_test, activities.run_api_test] + task_queue=task_queue, # 保持与 Go Client 一致 + activities=[activities.run_api_test,activities.python_add_random_prefix_activity] ) - print("Starting Python Temporal Worker...") + print("Starting Python Temporal Worker...",task_queue) await worker.run() if __name__ == "__main__": diff --git a/worker/requirements.txt b/workers/python/requirements.txt similarity index 100% rename from worker/requirements.txt rename to workers/python/requirements.txt diff --git a/worker/ui_tests.py b/workers/python/ui_tests.py similarity index 100% rename from worker/ui_tests.py rename to workers/python/ui_tests.py diff --git a/worker/utils.py b/workers/python/utils.py similarity index 100% rename from worker/utils.py rename to workers/python/utils.py