From 0b21510c9fabd51144367850fc2b788c71f319b1 Mon Sep 17 00:00:00 2001 From: longpeng Date: Thu, 19 Jun 2025 23:24:23 +0800 Subject: [PATCH] Implement end-to-end Temporal Workflow with Go and Python integration for API and UI test execution --- server/activity.go | 3 -- server/activity/activity.go | 23 ++++++++++ server/workflow.go | 3 -- server/workflow/workflow.go | 91 +++++++++++++++++++++++++++++++++++++ worker/activities.py | 85 +++++++++++++++++++++++++++++++++- worker/api_tests.py | 50 +++++++++++++++++++- worker/main.py | 29 +++++++++++- worker/requirements.txt | 8 +++- worker/ui_tests.py | 89 +++++++++++++++++++++++++++++++++++- worker/utils.py | 21 ++++++++- 10 files changed, 390 insertions(+), 12 deletions(-) delete mode 100644 server/activity.go create mode 100644 server/activity/activity.go delete mode 100644 server/workflow.go create mode 100644 server/workflow/workflow.go diff --git a/server/activity.go b/server/activity.go deleted file mode 100644 index 56ba981..0000000 --- a/server/activity.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -// 定义 Temporal Activity 接口 diff --git a/server/activity/activity.go b/server/activity/activity.go new file mode 100644 index 0000000..a9f17a9 --- /dev/null +++ b/server/activity/activity.go @@ -0,0 +1,23 @@ +package activity + +// 定义 Temporal Activity 接口 +import ( + "beacon/server/gen/pb" // 替换为你的模块路径 + "context" +) + +// 定义活动接口,Python Worker 将实现这些接口 +// Temporal Go SDK 会在编译时通过 go-temporal 插件自动生成这些接口的实现桩 +// 使得你可以直接调用这些接口,而实际执行在 Python Worker 中。 + +// RunApiTest 是执行接口测试的活动 +func RunApiTest(ctx context.Context, req *pb.ApiTestRequest) (*pb.ApiTestResult, error) { + // 实际调用会被转发到 Python Worker + return nil, nil // Go 侧不需要实现,由 Temporal SDK 代理 +} + +// RunUiTest 是执行 UI 测试的活动 +func RunUiTest(ctx context.Context, req *pb.UiTestRequest) (*pb.UiTestResult, error) { + // 实际调用会被转发到 Python Worker + return nil, nil // Go 侧不需要实现,由 Temporal SDK 代理 +} diff --git a/server/workflow.go b/server/workflow.go deleted file mode 100644 index 93cbcdc..0000000 --- a/server/workflow.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -// 定义 Temporal Workflow diff --git a/server/workflow/workflow.go b/server/workflow/workflow.go new file mode 100644 index 0000000..76c1a4e --- /dev/null +++ b/server/workflow/workflow.go @@ -0,0 +1,91 @@ +package workflow + +// 定义 Temporal Workflow +import ( + "beacon/server/activity" + "beacon/server/gen/pb" + "fmt" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "time" +) + +// TestRunWorkflow 定义了整个测试执行的工作流 +func TestRunWorkflow(ctx workflow.Context, input *pb.TestRunInput) (*pb.TestRunOutput, error) { + logger := workflow.GetLogger(ctx) + logger.Info("TestRunWorkflow started", "runID", input.RunId) + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Minute, // Activity 执行超时时间 + HeartbeatTimeout: 30 * time.Second, // Heartbeat 防止 Worker 假死 + RetryPolicy: &temporal.RetryPolicy{ // Activity 级别的重试策略 + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 3, + NonRetryableErrorTypes: []string{"NonRetryableErrorType"}, // 自定义不可重试的错误 + }, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var ( + apiResults []*pb.ApiTestResult + uiResults []*pb.UiTestResult + overallSuccess = true + completionMessage = "Test run completed successfully." + ) + + // 执行 API 测试 Activity + if input.RunApiTests { + apiTestInput := &pb.ApiTestRequest{ + TestCaseId: "api-example-1", + Endpoint: "/api/v1/data", + HttpMethod: "GET", + Headers: map[string]string{"Authorization": "Bearer token123"}, + ExpectedStatusCode: 200, + } + var apiRes pb.ApiTestResult + err := workflow.ExecuteActivity(ctx, activity.RunApiTest, apiTestInput).Get(ctx, &apiRes) + if err != nil { + logger.Error("API test activity failed", "error", err) + // 可以选择标记为失败,或者继续执行UI测试 + overallSuccess = false + apiRes.BaseResult.Success = false + apiRes.BaseResult.Message = fmt.Sprintf("API Test Failed: %v", err) + } + apiResults = append(apiResults, &apiRes) + } + + // 执行 UI 测试 Activity + if input.RunUiTests { + uiTestInput := &pb.UiTestRequest{ + TestCaseId: "ui-example-1", + UrlPath: "/dashboard", + BrowserType: "chromium", + Headless: true, + UserData: map[string]string{"user": "test", "pass": "password"}, + } + var uiRes pb.UiTestResult + err := workflow.ExecuteActivity(ctx, activity.RunUiTest, uiTestInput).Get(ctx, &uiRes) + if err != nil { + logger.Error("UI test activity failed", "error", err) + overallSuccess = false + uiRes.BaseResult.Success = false + uiRes.BaseResult.Message = fmt.Sprintf("UI Test Failed: %v", err) + } + uiResults = append(uiResults, &uiRes) + } + + if !overallSuccess { + completionMessage = "Test run completed with failures." + } + + logger.Info("TestRunWorkflow completed", "overallSuccess", overallSuccess) + return &pb.TestRunOutput{ + RunId: input.RunId, + OverallSuccess: overallSuccess, + CompletionMessage: completionMessage, + ApiResults: apiResults, + UiResults: uiResults, + }, nil +} diff --git a/worker/activities.py b/worker/activities.py index c6d435c..b04a7c6 100644 --- a/worker/activities.py +++ b/worker/activities.py @@ -1 +1,84 @@ -# 实现 Temporal Activity 逻辑 \ No newline at end of file +# 实现 Temporal Activity 逻辑 +import os +# 确保能导入 proto_gen 模块 +import sys +import time + +from temporalio.exceptions import ApplicationError +from temporalio.worker import activity + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen'))) + +from gen import common_test_pb2 as pb +from api_tests import execute_api_test_case +from ui_tests import execute_ui_test_case +from utils import upload_file_to_s3 # 假设有这个函数 + + +@activity.defn +async def RunApiTest(req: pb.ApiTestRequest) -> pb.ApiTestResult: + """执行API测试的Temporal Activity实现""" + activity.logger.info(f"Received API Test Request: {req.test_case_id}") + start_time = time.time() + result = pb.ApiTestResult() + result.base_result.test_case_id = req.test_case_id + + try: + # 调用实际的API测试逻辑 + api_test_success, actual_status, response_body, log_output = execute_api_test_case( + req.test_case_id, req.endpoint, req.http_method, req.headers, req.request_body, req.expected_status_code + ) + + result.base_result.success = api_test_success + result.actual_status_code = actual_status + result.response_body = response_body.decode('utf-8') # 假设是UTF-8 + result.base_result.log_output = log_output + result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed" + + except Exception as e: + activity.logger.error(f"API Test Failed for {req.test_case_id}: {e}") + result.base_result.success = False + result.base_result.message = f"API Test Error: {e}" + result.base_result.error_details = str(e) + # 如果是业务逻辑上的不可重试错误,可以抛出 ApplicationError + # raise ApplicationError("NonRetryableErrorType", details=str(e)) + + result.base_result.duration_seconds = time.time() - start_time + return result + + +@activity.defn +async def RunUiTest(req: pb.UiTestRequest) -> pb.UiTestResult: + """执行UI测试的Temporal Activity实现""" + activity.logger.info(f"Received UI Test Request: {req.test_case_id}") + start_time = time.time() + result = pb.UiTestResult() + result.base_result.test_case_id = req.test_case_id + + try: + # 调用实际的UI测试逻辑,返回本地文件路径 + ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case( + req.test_case_id, req.url_path, req.browser_type, req.headless, req.user_data + ) + + result.base_result.success = ui_test_success + result.base_result.log_output = log_output + result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed" + + # 上传截图和报告到对象存储,并返回URL + if screenshot_path: + result.screenshot_url = await upload_file_to_s3(screenshot_path, f"screenshots/{req.test_case_id}.png") + os.remove(screenshot_path) # 清理本地文件 + if html_report_path: + result.html_report_url = await upload_file_to_s3(html_report_path, f"reports/{req.test_case_id}.html") + os.remove(html_report_path) # 清理本地文件 + + except Exception as e: + activity.logger.error(f"UI Test Failed for {req.test_case_id}: {e}") + result.base_result.success = False + result.base_result.message = f"UI Test Error: {e}" + result.base_result.error_details = str(e) + # 同样,可以抛出 ApplicationError + + result.base_result.duration_seconds = time.time() - start_time + return result diff --git a/worker/api_tests.py b/worker/api_tests.py index 5607c2f..d2c5403 100644 --- a/worker/api_tests.py +++ b/worker/api_tests.py @@ -1 +1,49 @@ -# 接口测试具体实现 \ No newline at end of file +# 接口测试具体实现 +import requests +import json + +def execute_api_test_case(test_case_id: str, endpoint: str, http_method: str, headers: dict, request_body: bytes, expected_status_code: int): + """ + 实际执行API测试的函数。 + 可以集成 pytest, requests 等库。 + """ + base_url = "http://localhost:8080" # 假设 API 服务的基地址 + + full_url = f"{base_url}{endpoint}" + log_output = [] + success = False + actual_status = 0 + response_body = b"" + + log_output.append(f"Executing API test: {test_case_id} - {http_method} {full_url}") + + try: + if http_method.upper() == "GET": + response = requests.get(full_url, headers=headers, timeout=10) + elif http_method.upper() == "POST": + response = requests.post(full_url, headers=headers, data=request_body, timeout=10) + # ... 其他 HTTP 方法 + else: + raise ValueError(f"Unsupported HTTP method: {http_method}") + + actual_status = response.status_code + response_body = response.content + + log_output.append(f"Response Status: {actual_status}") + log_output.append(f"Response Body: {response_body.decode('utf-8')[:500]}...") # 只显示前500字符 + + if actual_status == expected_status_code: + success = True + log_output.append("API Test PASSED: Status code matched.") + else: + success = False + log_output.append(f"API Test FAILED: Expected {expected_status_code}, got {actual_status}.") + + except requests.exceptions.RequestException as e: + log_output.append(f"API Test Failed (Request Exception): {e}") + success = False + except Exception as e: + log_output.append(f"API Test Failed (Unexpected Error): {e}") + success = False + + return success, actual_status, response_body, "\n".join(log_output) \ No newline at end of file diff --git a/worker/main.py b/worker/main.py index 0950274..13eea59 100644 --- a/worker/main.py +++ b/worker/main.py @@ -1 +1,28 @@ -# Python Worker 入口,注册并运行 Activity \ No newline at end of file +# Python Worker 入口,注册并运行 Activity +import asyncio +import os +import sys + +from temporalio.client import Client +from temporalio.worker import Worker + +# 确保能导入 proto_gen 模块 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen'))) + +from activities import RunApiTest, RunUiTest # 导入定义的 Activity + +async def main(): + # 连接 Temporal Server + client = await Client.connect("localhost:7233") # 根据你的 Temporal Server 配置 + + # 创建 Worker + worker = Worker( + client, + task_queue="test-task-queue", # 保持与 Go Client 一致 + activities=[RunApiTest, RunUiTest], + ) + print("Starting Python Temporal Worker...") + await worker.run() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/worker/requirements.txt b/worker/requirements.txt index a80f052..fd7c77a 100644 --- a/worker/requirements.txt +++ b/worker/requirements.txt @@ -1 +1,7 @@ -# Python 依赖 \ No newline at end of file +# Python 依赖 +temporalio[aiohttp] +protobuf +grpcio-tools # 用于生成 protobuf 代码 +requests # 用于API测试 +pytest # 测试框架 +playwright # UI自动化测试库,或使用 selenium \ No newline at end of file diff --git a/worker/ui_tests.py b/worker/ui_tests.py index e206773..58ad27e 100644 --- a/worker/ui_tests.py +++ b/worker/ui_tests.py @@ -1 +1,88 @@ -# UI 测试具体实现 (使用 Playwright) \ No newline at end of file +# UI 测试具体实现 (使用 Playwright) +import asyncio +from playwright.async_api import Playwright, async_playwright, expect +import os +import datetime + +async def execute_ui_test_case(test_case_id: str, url_path: str, browser_type: str, headless: bool, user_data: dict): + """ + 实际执行UI测试的函数。 + 这里使用 Playwright,你也可以替换成 Selenium。 + """ + base_url = "https://playwright.dev" # 假设 UI 测试的基地址 + full_url = f"{base_url}{url_path}" + + log_output = [] + success = False + screenshot_path = None + html_report_path = None # Playwright 默认生成HTML报告,通常在测试结束后生成 + + log_output.append(f"Executing UI test: {test_case_id} - {full_url} with {browser_type}") + + try: + async with async_playwright() as p: + browser = None + if browser_type == "chromium": + browser = await p.chromium.launch(headless=headless) + elif browser_type == "firefox": + browser = await p.firefox.launch(headless=headless) + elif browser_type == "webkit": + browser = await p.webkit.launch(headless=headless) + else: + raise ValueError(f"Unsupported browser type: {browser_type}") + + page = await browser.new_page() + + # 模拟登录(如果需要) + if user_data: + log_output.append(f"Attempting to log in with user: {user_data.get('user')}") + # 假设有一个登录页面 + await page.goto(f"{base_url}/login") + await page.fill('input[name="username"]', user_data.get('user', '')) + await page.fill('input[name="password"]', user_data.get('pass', '')) + await page.click('button[type="submit"]') + await page.wait_for_url(full_url) # 等待跳转到目标页面 + + await page.goto(full_url) + log_output.append(f"Navigated to: {full_url}") + + # 示例UI操作和断言 + # 查找一个元素并验证其文本 + element = page.locator("text=Playwright enables reliable end-to-end testing for modern web apps.") + await expect(element).to_be_visible() + log_output.append("Found expected text on page.") + + # 点击一个链接 + await page.click("text=Docs") + await page.wait_for_url("**/docs/intro") + log_output.append("Clicked 'Docs' link and navigated.") + + success = True + log_output.append("UI Test PASSED.") + + except Exception as e: + log_output.append(f"UI Test FAILED (Exception): {e}") + success = False + finally: + if page: + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + screenshot_filename = f"screenshot_{test_case_id}_{timestamp}.png" + screenshot_path = os.path.join("/tmp", screenshot_filename) # 临时保存路径 + await page.screenshot(path=screenshot_path) + log_output.append(f"Screenshot saved to: {screenshot_path}") + if browser: + await browser.close() + + return success, "\n".join(log_output), screenshot_path, html_report_path # html_report_path 留空,因为Playwright通常在测试套件结束后生成 + +# 辅助函数,模拟对象存储上传 +async def upload_file_to_s3(local_path: str, remote_path: str) -> str: + """ + 模拟将文件上传到S3(或任何对象存储)并返回可访问URL。 + 在实际项目中,这里会调用 AWS SDK, MinIO SDK 等。 + """ + # 模拟上传延迟 + await asyncio.sleep(0.1) + print(f"Mock Uploaded {local_path} to S3 bucket/path: {remote_path}") + # 返回一个模拟的URL + return f"https://your-s3-bucket.com/{remote_path}" diff --git a/worker/utils.py b/worker/utils.py index 01090f0..1a50670 100644 --- a/worker/utils.py +++ b/worker/utils.py @@ -1 +1,20 @@ -# 辅助函数 (例如截图、报告生成) \ No newline at end of file +# 辅助函数 (例如截图、报告生成) +import asyncio +import os + +# 辅助函数,模拟对象存储上传 +async def upload_file_to_s3(local_path: str, remote_path: str) -> str: + """ + 模拟将文件上传到S3(或任何对象存储)并返回可访问URL。 + 在实际项目中,这里会调用 AWS SDK, MinIO SDK 等。 + """ + if not os.path.exists(local_path): + return "" # 文件不存在则不上传 + + # 模拟上传延迟 + await asyncio.sleep(0.1) + print(f"Mock Uploaded {local_path} to S3 bucket/path: {remote_path}") + # 返回一个模拟的URL + return f"https://your-s3-bucket.com/{remote_path}" + +# 其他通用工具函数,例如日志格式化,报告生成辅助等。 \ No newline at end of file