From c4d6e553ed182e96a20ce27c9ae25d536f062a5e Mon Sep 17 00:00:00 2001 From: longpeng Date: Fri, 20 Jun 2025 17:44:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E6=B3=A8=E9=87=8A=E7=BF=BB=E8=AF=91?= =?UTF-8?q?=E4=B8=BA=E4=B8=AD=E6=96=87=E4=BB=A5=E6=8F=90=E9=AB=98=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E5=8F=AF=E8=AF=BB=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workers/python/activities.py | 79 ++++++++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 17 deletions(-) diff --git a/workers/python/activities.py b/workers/python/activities.py index 75d79b6..12464c1 100644 --- a/workers/python/activities.py +++ b/workers/python/activities.py @@ -6,9 +6,9 @@ import asyncio from temporalio import activity -# 确保能导入 gen 模块 +# 确保能导入 gen 模块,将gen目录添加到Python路径 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'gen'))) -# 全局变量来存储 protobuf 模块 +# 导入protobuf生成的模块和其他依赖 from gen import common_test_pb2 as pb from api_tests import execute_api_test_case from ui_tests import execute_ui_test_case @@ -16,112 +16,157 @@ from utils import upload_file_to_s3, scalar_map_to_dict class TestActivities: + """ + 测试活动类,包含API测试和UI测试的Temporal Activity实现 + """ async def _heartbeat_task(self, interval_seconds=30): - """心跳任务,定期发送心跳信号""" + """ + 心跳任务,定期发送心跳信号,防止长时间运行的Activity被Temporal服务器认为已死亡 + + Args: + interval_seconds (int): 心跳发送间隔,默认30秒 + """ while True: try: + # 等待指定间隔时间 await asyncio.sleep(interval_seconds) + # 发送心跳信号告知Temporal服务器Activity仍在运行 activity.heartbeat() activity.logger.debug("Activity heartbeat sent") except asyncio.CancelledError: + # 心跳任务被取消,正常退出 activity.logger.debug("Heartbeat task cancelled") break except Exception as e: + # 心跳发送失败,记录警告但继续尝试 activity.logger.warning(f"Failed to send heartbeat: {e}") @activity.defn(name="run_api_test") async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult: - """执行API测试的Temporal Activity实现""" + """ + 执行API测试的Temporal Activity实现 + + Args: + req (pb.ApiTestRequest): API测试请求对象,包含测试用例ID、端点、HTTP方法等信息 + + Returns: + pb.ApiTestResult: API测试结果对象,包含测试状态、响应数据等信息 + """ activity.logger.info(f"Received API Test Request: {req.test_case_id}") + # 记录测试开始时间,用于计算执行时长 start_time = time.time() + # 初始化测试结果对象 result = pb.ApiTestResult() result.base_result.test_case_id = req.test_case_id - # 启动心跳任务 + # 启动后台心跳任务,确保长时间运行的测试不会超时 heartbeat_task = asyncio.create_task(self._heartbeat_task()) try: - # 发送初始心跳 + # 发送初始心跳信号 activity.heartbeat() - # 调用实际的API测试逻辑 + # 调用实际的API测试逻辑,执行HTTP请求并验证响应 api_test_success, actual_status, response_body, log_output = execute_api_test_case( req.test_case_id, req.endpoint, req.http_method, scalar_map_to_dict(req.headers), req.request_body, req.expected_status_code ) + # 填充测试结果 result.base_result.success = api_test_success result.actual_status_code = actual_status + # 处理响应体,确保为字符串格式 result.response_body = response_body.decode('utf-8') if isinstance(response_body, bytes) else str(response_body) result.base_result.log_output = log_output result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed" except Exception as e: + # 捕获测试执行过程中的异常 activity.logger.error(f"API Test Failed for {req.test_case_id}: {e}") result.base_result.success = False result.base_result.message = f"API Test Error: {e}" result.base_result.error_details = str(e) finally: - # 取消心跳任务 + # 清理工作:取消心跳任务 heartbeat_task.cancel() try: + # 等待心跳任务完全结束 await heartbeat_task except asyncio.CancelledError: pass + # 计算并记录测试执行时长 result.base_result.duration_seconds = time.time() - start_time return result @activity.defn(name="run_ui_test") async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult: - """执行UI测试的Temporal Activity实现""" + """ + 执行UI测试的Temporal Activity实现 + + Args: + req (pb.UiTestRequest): UI测试请求对象,包含测试用例ID、URL路径、浏览器类型等信息 + + Returns: + pb.UiTestResult: UI测试结果对象,包含测试状态、截图URL、报告URL等信息 + """ activity.logger.info(f"Received UI Test Request: {req.test_case_id}") + # 记录测试开始时间 start_time = time.time() + # 初始化测试结果对象 result = pb.UiTestResult() result.base_result.test_case_id = req.test_case_id - # 启动心跳任务 + # 启动后台心跳任务 heartbeat_task = asyncio.create_task(self._heartbeat_task()) try: - # 发送初始心跳 + # 发送初始心跳信号 activity.heartbeat() - # 调用实际的UI测试逻辑,返回本地文件路径 + # 调用实际的UI测试逻辑,执行浏览器自动化测试并返回本地文件路径 ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case( req.test_case_id, req.url_path, req.browser_type, req.headless, scalar_map_to_dict(req.user_data) ) + # 填充基本测试结果 result.base_result.success = ui_test_success result.base_result.log_output = log_output result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed" - # 上传截图和报告到对象存储,并返回URL + # 处理测试生成的文件:上传截图和报告到对象存储,并返回URL if screenshot_path and os.path.exists(screenshot_path): - # 在长时间操作前发送心跳 + # 在长时间操作前发送心跳,防止超时 activity.heartbeat() + # 上传截图到S3并获取访问URL result.screenshot_url = await upload_file_to_s3(screenshot_path, f"screenshots/{req.test_case_id}.png") - os.remove(screenshot_path) # 清理本地文件 + # 清理本地临时文件 + os.remove(screenshot_path) if html_report_path and os.path.exists(html_report_path): # 在长时间操作前发送心跳 activity.heartbeat() + # 上传HTML报告到S3并获取访问URL result.html_report_url = await upload_file_to_s3(html_report_path, f"reports/{req.test_case_id}.html") - os.remove(html_report_path) # 清理本地文件 + # 清理本地临时文件 + os.remove(html_report_path) except Exception as e: + # 捕获UI测试执行过程中的异常 activity.logger.error(f"UI Test Failed for {req.test_case_id}: {e}") result.base_result.success = False result.base_result.message = f"UI Test Error: {e}" result.base_result.error_details = str(e) finally: - # 取消心跳任务 + # 清理工作:取消心跳任务 heartbeat_task.cancel() try: + # 等待心跳任务完全结束 await heartbeat_task except asyncio.CancelledError: pass + # 计算并记录测试执行时长 result.base_result.duration_seconds = time.time() - start_time return result \ No newline at end of file