# Implements the Temporal Activity logic.
import os
import sys
import time
import random

from temporalio import activity

# Ensure the generated modules under ./gen can be imported.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'gen')))

# Generated protobuf message module, aliased as ``pb``.
from gen import common_test_pb2 as pb
from api_tests import execute_api_test_case
from ui_tests import execute_ui_test_case
from utils import upload_file_to_s3, scalar_map_to_dict


def _remove_if_exists(path):
    """Delete the local file at *path* if it exists; ignore empty paths."""
    if path and os.path.exists(path):
        os.remove(path)


class TestActivities:
    """Temporal activities that run API/UI test cases and a small demo activity."""

    @activity.defn(name="run_api_test")
    async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult:
        """Run one API test case and report the outcome.

        Args:
            req: Request carrying the test case id, endpoint, HTTP method,
                headers, request body and expected status code.

        Returns:
            A ``pb.ApiTestResult``. Errors raised while running the test are
            not propagated; they are recorded in ``base_result`` with
            ``success`` set to False.
        """
        activity.logger.info(f"Received API Test Request: {req.test_case_id}")
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        started = time.perf_counter()

        result = pb.ApiTestResult()
        result.base_result.test_case_id = req.test_case_id

        try:
            # Delegate to the actual API test logic.
            # NOTE(review): this call is not awaited; if it performs blocking
            # I/O it will stall the worker's event loop -- confirm.
            api_test_success, actual_status, response_body, log_output = execute_api_test_case(
                req.test_case_id,
                req.endpoint,
                req.http_method,
                scalar_map_to_dict(req.headers),
                req.request_body,
                req.expected_status_code,
            )

            result.base_result.success = api_test_success
            result.actual_status_code = actual_status
            if isinstance(response_body, bytes):
                # Replace undecodable bytes rather than raising, so a
                # non-UTF-8 body does not turn a finished test into an error.
                result.response_body = response_body.decode('utf-8', errors='replace')
            else:
                result.response_body = str(response_body)
            result.base_result.log_output = log_output
            result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed"
        except Exception as e:
            # Activity boundary: report the failure in the result instead of
            # raising. ``exception`` logs at ERROR level with the traceback.
            activity.logger.exception(f"API Test Failed for {req.test_case_id}: {e}")
            result.base_result.success = False
            result.base_result.message = f"API Test Error: {e}"
            result.base_result.error_details = str(e)

        result.base_result.duration_seconds = time.perf_counter() - started
        return result

    @activity.defn(name="run_ui_test")
    async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult:
        """Run one UI test case, upload its artifacts and report the outcome.

        Args:
            req: Request carrying the test case id, URL path, browser type,
                headless flag and user data.

        Returns:
            A ``pb.UiTestResult``. ``screenshot_url`` / ``html_report_url``
            are set only for artifacts that exist locally and were uploaded.
            Errors are not propagated; they are recorded in ``base_result``
            with ``success`` set to False.
        """
        activity.logger.info(f"Received UI Test Request: {req.test_case_id}")
        # Monotonic clock for the duration (see run_api_test).
        started = time.perf_counter()

        result = pb.UiTestResult()
        result.base_result.test_case_id = req.test_case_id

        try:
            # Delegate to the actual UI test logic; it returns local file paths.
            ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case(
                req.test_case_id,
                req.url_path,
                req.browser_type,
                req.headless,
                scalar_map_to_dict(req.user_data),
            )

            result.base_result.success = ui_test_success
            result.base_result.log_output = log_output
            result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed"

            # Upload the screenshot and report to object storage and record
            # their URLs.
            try:
                if screenshot_path and os.path.exists(screenshot_path):
                    result.screenshot_url = await upload_file_to_s3(
                        screenshot_path, f"screenshots/{req.test_case_id}.png"
                    )
                if html_report_path and os.path.exists(html_report_path):
                    result.html_report_url = await upload_file_to_s3(
                        html_report_path, f"reports/{req.test_case_id}.html"
                    )
            finally:
                # Always clean up the local files, including when an upload
                # fails, so artifacts do not accumulate on the worker.
                _remove_if_exists(screenshot_path)
                _remove_if_exists(html_report_path)
        except Exception as e:
            # Activity boundary: report the failure in the result instead of
            # raising. ``exception`` logs at ERROR level with the traceback.
            activity.logger.exception(f"UI Test Failed for {req.test_case_id}: {e}")
            result.base_result.success = False
            result.base_result.message = f"UI Test Error: {e}"
            result.base_result.error_details = str(e)

        result.base_result.duration_seconds = time.perf_counter() - started
        return result

    @activity.defn(name="PythonAddRandomPrefixActivity")
    async def python_add_random_prefix_activity(self, data: str) -> str:
        """Return *data* prefixed with one randomly chosen Greek-letter tag."""
        prefixes = [
            "alpha-",
            "beta-",
            "gamma-",
            "delta-",
            "epsilon-",
            "zeta-",
            "eta-",
            "theta-",
            "iota-",
            "kappa-",
        ]
        prefix = random.choice(prefixes)
        return f"{prefix}{data}"