diff --git a/go.mod b/go.mod index f9f9d99..2619eae 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.4 require ( github.com/google/uuid v1.6.0 go.temporal.io/sdk v1.34.0 + google.golang.org/protobuf v1.36.5 ) require ( @@ -28,6 +29,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect google.golang.org/grpc v1.66.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/server/main.go b/server/main.go index 96a2b3f..ed56d50 100644 --- a/server/main.go +++ b/server/main.go @@ -15,7 +15,8 @@ import ( func main() { // 创建 Temporal 客户端 c, err := client.Dial(client.Options{ - HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置 + HostPort: "temporal.newai.day:17233", // 根据你的 Temporal Server 配置 + Namespace: "default", }) if err != nil { log.Fatalf("Unable to create Temporal client: %v", err) diff --git a/worker/activities.py b/worker/activities.py index f3e1b8c..0715899 100644 --- a/worker/activities.py +++ b/worker/activities.py @@ -1,86 +1,82 @@ # 实现 Temporal Activity 逻辑 import os -# 确保能导入 proto_gen 模块 import sys import time from temporalio import activity -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen'))) - +# 确保能导入 gen 模块 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'gen'))) +# 全局变量来存储 protobuf 模块 from gen import common_test_pb2 as pb from api_tests import execute_api_test_case from ui_tests import execute_ui_test_case -from utils import upload_file_to_s3 # 假设有这个函数 - -from worker.utils import scalar_map_to_dict +from utils import upload_file_to_s3, scalar_map_to_dict -@activity.defn -async def RunApiTest(req: pb.ApiTestRequest) -> pb.ApiTestResult: - """执行API测试的Temporal Activity实现""" - activity.logger.info(f"Received API Test Request: {req.test_case_id}") - start_time = time.time() - result = pb.ApiTestResult() - result.base_result.test_case_id = req.test_case_id +class TestActivities: + @activity.defn + async def run_api_test(self,req: pb.ApiTestRequest) -> pb.ApiTestResult: + """执行API测试的Temporal Activity实现""" + activity.logger.info(f"Received API Test Request: {req.test_case_id}") + start_time = time.time() + result = pb.ApiTestResult() + result.base_result.test_case_id = req.test_case_id - try: - # 调用实际的API测试逻辑 - api_test_success, actual_status, response_body, log_output = execute_api_test_case( - req.test_case_id, req.endpoint, req.http_method, scalar_map_to_dict(req.headers), req.request_body, - req.expected_status_code - ) + try: + # 调用实际的API测试逻辑 + api_test_success, actual_status, response_body, log_output = execute_api_test_case( + req.test_case_id, req.endpoint, req.http_method, scalar_map_to_dict(req.headers), req.request_body, + req.expected_status_code + ) - result.base_result.success = api_test_success - result.actual_status_code = actual_status - result.response_body = response_body.decode('utf-8') # 假设是UTF-8 - result.base_result.log_output = log_output - result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed" + result.base_result.success = api_test_success + result.actual_status_code = actual_status + result.response_body = response_body.decode('utf-8') if isinstance(response_body, bytes) else str(response_body) + result.base_result.log_output = log_output + result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed" - except Exception as e: - activity.logger.error(f"API Test Failed for {req.test_case_id}: {e}") - result.base_result.success = False - result.base_result.message = f"API Test Error: {e}" - result.base_result.error_details = str(e) - # 如果是业务逻辑上的不可重试错误,可以抛出 ApplicationError - # raise ApplicationError("NonRetryableErrorType", details=str(e)) + except Exception as e: + activity.logger.error(f"API Test Failed for {req.test_case_id}: {e}") + result.base_result.success = False + result.base_result.message = f"API Test Error: {e}" + result.base_result.error_details = str(e) - result.base_result.duration_seconds = time.time() - start_time - return result + result.base_result.duration_seconds = time.time() - start_time + return result -@activity.defn -async def RunUiTest(req: pb.UiTestRequest) -> pb.UiTestResult: - """执行UI测试的Temporal Activity实现""" - activity.logger.info(f"Received UI Test Request: {req.test_case_id}") - start_time = time.time() - result = pb.UiTestResult() - result.base_result.test_case_id = req.test_case_id + @activity.defn + async def run_ui_test(self,req: pb.UiTestRequest) -> pb.UiTestResult: + """执行UI测试的Temporal Activity实现""" + activity.logger.info(f"Received UI Test Request: {req.test_case_id}") + start_time = time.time() + result = pb.UiTestResult() + result.base_result.test_case_id = req.test_case_id - try: - # 调用实际的UI测试逻辑,返回本地文件路径 - ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case( - req.test_case_id, req.url_path, req.browser_type, req.headless, scalar_map_to_dict(req.user_data) - ) + try: + # 调用实际的UI测试逻辑,返回本地文件路径 + ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case( + req.test_case_id, req.url_path, req.browser_type, req.headless, scalar_map_to_dict(req.user_data) + ) - result.base_result.success = ui_test_success - result.base_result.log_output = log_output - result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed" + result.base_result.success = ui_test_success + result.base_result.log_output = log_output + result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed" - # 上传截图和报告到对象存储,并返回URL - if screenshot_path: - result.screenshot_url = await upload_file_to_s3(screenshot_path, f"screenshots/{req.test_case_id}.png") - os.remove(screenshot_path) # 清理本地文件 - if html_report_path: - result.html_report_url = await upload_file_to_s3(html_report_path, f"reports/{req.test_case_id}.html") - os.remove(html_report_path) # 清理本地文件 + # 上传截图和报告到对象存储,并返回URL + if screenshot_path and os.path.exists(screenshot_path): + result.screenshot_url = await upload_file_to_s3(screenshot_path, f"screenshots/{req.test_case_id}.png") + os.remove(screenshot_path) # 清理本地文件 + if html_report_path and os.path.exists(html_report_path): + result.html_report_url = await upload_file_to_s3(html_report_path, f"reports/{req.test_case_id}.html") + os.remove(html_report_path) # 清理本地文件 - except Exception as e: - activity.logger.error(f"UI Test Failed for {req.test_case_id}: {e}") - result.base_result.success = False - result.base_result.message = f"UI Test Error: {e}" - result.base_result.error_details = str(e) - # 同样,可以抛出 ApplicationError + except Exception as e: + activity.logger.error(f"UI Test Failed for {req.test_case_id}: {e}") + result.base_result.success = False + result.base_result.message = f"UI Test Error: {e}" + result.base_result.error_details = str(e) - result.base_result.duration_seconds = time.time() - start_time - return result + result.base_result.duration_seconds = time.time() - start_time + return result \ No newline at end of file diff --git a/worker/main.py b/worker/main.py index 13eea59..d3178a6 100644 --- a/worker/main.py +++ b/worker/main.py @@ -6,20 +6,21 @@ import sys from temporalio.client import Client from temporalio.worker import Worker -# 确保能导入 proto_gen 模块 -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen'))) +# 确保能导入 gen 模块 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'gen'))) -from activities import RunApiTest, RunUiTest # 导入定义的 Activity +from activities import TestActivities # 导入定义的 Activity async def main(): # 连接 Temporal Server - client = await Client.connect("localhost:7233") # 根据你的 Temporal Server 配置 + client = await Client.connect("temporal.newai.day:17233", namespace="default") # 根据你的 Temporal Server 配置 + activities = TestActivities() # 创建 Worker worker = Worker( client, task_queue="test-task-queue", # 保持与 Go Client 一致 - activities=[RunApiTest, RunUiTest], + activities=[activities.run_ui_test, activities.run_api_test] ) print("Starting Python Temporal Worker...") await worker.run() diff --git a/worker/ui_tests.py b/worker/ui_tests.py index 58ad27e..27ccda5 100644 --- a/worker/ui_tests.py +++ b/worker/ui_tests.py @@ -1,6 +1,5 @@ # UI 测试具体实现 (使用 Playwright) -import asyncio -from playwright.async_api import Playwright, async_playwright, expect +from playwright.async_api import async_playwright, expect import os import datetime @@ -15,13 +14,15 @@ async def execute_ui_test_case(test_case_id: str, url_path: str, browser_type: s log_output = [] success = False screenshot_path = None - html_report_path = None # Playwright 默认生成HTML报告,通常在测试结束后生成 + html_report_path = None + + browser = None + page = None log_output.append(f"Executing UI test: {test_case_id} - {full_url} with {browser_type}") try: async with async_playwright() as p: - browser = None if browser_type == "chromium": browser = await p.chromium.launch(headless=headless) elif browser_type == "firefox": @@ -65,24 +66,19 @@ async def execute_ui_test_case(test_case_id: str, url_path: str, browser_type: s success = False finally: if page: - timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - screenshot_filename = f"screenshot_{test_case_id}_{timestamp}.png" - screenshot_path = os.path.join("/tmp", screenshot_filename) # 临时保存路径 - await page.screenshot(path=screenshot_path) - log_output.append(f"Screenshot saved to: {screenshot_path}") + try: + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + screenshot_filename = f"screenshot_{test_case_id}_{timestamp}.png" + screenshot_path = os.path.join("/tmp", screenshot_filename) + await page.screenshot(path=screenshot_path) + log_output.append(f"Screenshot saved to: {screenshot_path}") + except Exception as e: + log_output.append(f"Failed to take screenshot: {e}") + if browser: - await browser.close() + try: + await browser.close() + except Exception as e: + log_output.append(f"Failed to close browser: {e}") - return success, "\n".join(log_output), screenshot_path, html_report_path # html_report_path 留空,因为Playwright通常在测试套件结束后生成 - -# 辅助函数,模拟对象存储上传 -async def upload_file_to_s3(local_path: str, remote_path: str) -> str: - """ - 模拟将文件上传到S3(或任何对象存储)并返回可访问URL。 - 在实际项目中,这里会调用 AWS SDK, MinIO SDK 等。 - """ - # 模拟上传延迟 - await asyncio.sleep(0.1) - print(f"Mock Uploaded {local_path} to S3 bucket/path: {remote_path}") - # 返回一个模拟的URL - return f"https://your-s3-bucket.com/{remote_path}" + return success, "\n".join(log_output), screenshot_path, html_report_path \ No newline at end of file