85 lines
3.5 KiB
Python
85 lines
3.5 KiB
Python
# 实现 Temporal Activity 逻辑
|
||
import os
|
||
# 确保能导入 proto_gen 模块
|
||
import sys
|
||
import time
|
||
|
||
from temporalio.exceptions import ApplicationError
|
||
from temporalio.worker import activity
|
||
|
||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen')))
|
||
|
||
from gen import common_test_pb2 as pb
|
||
from api_tests import execute_api_test_case
|
||
from ui_tests import execute_ui_test_case
|
||
from utils import upload_file_to_s3 # 假设有这个函数
|
||
|
||
|
||
@activity.defn
async def RunApiTest(req: pb.ApiTestRequest) -> pb.ApiTestResult:
    """Run one API test case as a Temporal activity.

    Delegates to ``execute_api_test_case`` and copies its outcome into a
    ``pb.ApiTestResult``. Any ``Exception`` raised while running the test or
    filling in the result is caught and reported through ``base_result``
    (``success`` set to ``False`` plus ``message`` and ``error_details``)
    rather than propagated.

    Args:
        req: Request carrying the test case id, endpoint, HTTP method,
            headers, request body and expected status code.

    Returns:
        The populated result, including the elapsed time in seconds.
    """
    activity.logger.info(f"Received API Test Request: {req.test_case_id}")
    # Monotonic clock: the measured duration cannot go negative or jump when
    # the system wall clock is adjusted while the test is running.
    started = time.monotonic()
    result = pb.ApiTestResult()
    result.base_result.test_case_id = req.test_case_id

    try:
        # Run the actual API test logic.
        api_test_success, actual_status, response_body, log_output = execute_api_test_case(
            req.test_case_id,
            req.endpoint,
            req.http_method,
            req.headers,
            req.request_body,
            req.expected_status_code,
        )

        result.base_result.success = api_test_success
        result.actual_status_code = actual_status
        # Decode leniently so a body that is not valid UTF-8 (or is already
        # text, or absent) does not turn a completed test into an error.
        if isinstance(response_body, (bytes, bytearray)):
            result.response_body = response_body.decode("utf-8", errors="replace")
        else:
            result.response_body = "" if response_body is None else str(response_body)
        result.base_result.log_output = log_output
        result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed"

    except Exception as e:
        # logger.exception keeps the same message and also records the traceback.
        activity.logger.exception(f"API Test Failed for {req.test_case_id}: {e}")
        result.base_result.success = False
        result.base_result.message = f"API Test Error: {e}"
        result.base_result.error_details = str(e)
        # NOTE: for business-level errors that must not be retried, an
        # ApplicationError marked non-retryable could be raised here instead
        # of returning a failed result.

    result.base_result.duration_seconds = time.monotonic() - started
    return result
|
||
|
||
|
||
@activity.defn
async def RunUiTest(req: pb.UiTestRequest) -> pb.UiTestResult:
    """Run one UI test case as a Temporal activity.

    Awaits ``execute_ui_test_case``, which yields the test outcome plus local
    paths of a screenshot and an HTML report. Each artifact that is present is
    uploaded via ``upload_file_to_s3`` and the returned value is stored in the
    result. Any ``Exception`` raised along the way is caught and reported
    through ``base_result`` (``success`` set to ``False`` plus ``message`` and
    ``error_details``) rather than propagated.

    Local artifact files are removed in all cases, including when an upload
    fails. A failure to remove a file is logged as a warning and does not
    change the test outcome.

    Args:
        req: Request carrying the test case id, URL path, browser type,
            headless flag and user data.

    Returns:
        The populated result, including the elapsed time in seconds.
    """
    activity.logger.info(f"Received UI Test Request: {req.test_case_id}")
    # Monotonic clock: the measured duration cannot go negative or jump when
    # the system wall clock is adjusted while the test is running.
    started = time.monotonic()
    result = pb.UiTestResult()
    result.base_result.test_case_id = req.test_case_id

    # Bound before the try so the cleanup below is safe even when the test
    # itself raises before returning any paths.
    screenshot_path = None
    html_report_path = None

    try:
        # Run the actual UI test logic; it returns local file paths.
        ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case(
            req.test_case_id,
            req.url_path,
            req.browser_type,
            req.headless,
            req.user_data,
        )

        result.base_result.success = ui_test_success
        result.base_result.log_output = log_output
        result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed"

        # Upload the screenshot and report to object storage and record the URLs.
        if screenshot_path:
            result.screenshot_url = await upload_file_to_s3(
                screenshot_path, f"screenshots/{req.test_case_id}.png"
            )
        if html_report_path:
            result.html_report_url = await upload_file_to_s3(
                html_report_path, f"reports/{req.test_case_id}.html"
            )

    except Exception as e:
        # logger.exception keeps the same message and also records the traceback.
        activity.logger.exception(f"UI Test Failed for {req.test_case_id}: {e}")
        result.base_result.success = False
        result.base_result.message = f"UI Test Error: {e}"
        result.base_result.error_details = str(e)
        # NOTE: as with the API test, an ApplicationError marked non-retryable
        # could be raised here for errors that must not be retried.

    finally:
        # Remove local artifacts whether or not the uploads succeeded, so a
        # failed upload does not leave files piling up on the worker.
        for local_path in (screenshot_path, html_report_path):
            if not local_path:
                continue
            try:
                os.remove(local_path)
            except OSError as cleanup_error:
                activity.logger.warning(
                    f"Could not remove local file {local_path}: {cleanup_error}"
                )

    result.base_result.duration_seconds = time.monotonic() - started
    return result
|