Beacon/workers/python/activities.py

127 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 实现 Temporal Activity 逻辑
import os
import sys
import time
import asyncio
from temporalio import activity
# 确保能导入 gen 模块
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'gen')))
# 全局变量来存储 protobuf 模块
from gen import common_test_pb2 as pb
from api_tests import execute_api_test_case
from ui_tests import execute_ui_test_case
from utils import upload_file_to_s3, scalar_map_to_dict
class TestActivities:
async def _heartbeat_task(self, interval_seconds=30):
"""心跳任务,定期发送心跳信号"""
while True:
try:
await asyncio.sleep(interval_seconds)
activity.heartbeat()
activity.logger.debug("Activity heartbeat sent")
except asyncio.CancelledError:
activity.logger.debug("Heartbeat task cancelled")
break
except Exception as e:
activity.logger.warning(f"Failed to send heartbeat: {e}")
@activity.defn(name="run_api_test")
async def run_api_test(self, req: pb.ApiTestRequest) -> pb.ApiTestResult:
"""执行API测试的Temporal Activity实现"""
activity.logger.info(f"Received API Test Request: {req.test_case_id}")
start_time = time.time()
result = pb.ApiTestResult()
result.base_result.test_case_id = req.test_case_id
# 启动心跳任务
heartbeat_task = asyncio.create_task(self._heartbeat_task())
try:
# 发送初始心跳
activity.heartbeat()
# 调用实际的API测试逻辑
api_test_success, actual_status, response_body, log_output = execute_api_test_case(
req.test_case_id, req.endpoint, req.http_method, scalar_map_to_dict(req.headers), req.request_body,
req.expected_status_code
)
result.base_result.success = api_test_success
result.actual_status_code = actual_status
result.response_body = response_body.decode('utf-8') if isinstance(response_body, bytes) else str(response_body)
result.base_result.log_output = log_output
result.base_result.message = "API Test Passed" if api_test_success else "API Test Failed"
except Exception as e:
activity.logger.error(f"API Test Failed for {req.test_case_id}: {e}")
result.base_result.success = False
result.base_result.message = f"API Test Error: {e}"
result.base_result.error_details = str(e)
finally:
# 取消心跳任务
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
result.base_result.duration_seconds = time.time() - start_time
return result
@activity.defn(name="run_ui_test")
async def run_ui_test(self, req: pb.UiTestRequest) -> pb.UiTestResult:
"""执行UI测试的Temporal Activity实现"""
activity.logger.info(f"Received UI Test Request: {req.test_case_id}")
start_time = time.time()
result = pb.UiTestResult()
result.base_result.test_case_id = req.test_case_id
# 启动心跳任务
heartbeat_task = asyncio.create_task(self._heartbeat_task())
try:
# 发送初始心跳
activity.heartbeat()
# 调用实际的UI测试逻辑返回本地文件路径
ui_test_success, log_output, screenshot_path, html_report_path = await execute_ui_test_case(
req.test_case_id, req.url_path, req.browser_type, req.headless, scalar_map_to_dict(req.user_data)
)
result.base_result.success = ui_test_success
result.base_result.log_output = log_output
result.base_result.message = "UI Test Passed" if ui_test_success else "UI Test Failed"
# 上传截图和报告到对象存储并返回URL
if screenshot_path and os.path.exists(screenshot_path):
# 在长时间操作前发送心跳
activity.heartbeat()
result.screenshot_url = await upload_file_to_s3(screenshot_path, f"screenshots/{req.test_case_id}.png")
os.remove(screenshot_path) # 清理本地文件
if html_report_path and os.path.exists(html_report_path):
# 在长时间操作前发送心跳
activity.heartbeat()
result.html_report_url = await upload_file_to_s3(html_report_path, f"reports/{req.test_case_id}.html")
os.remove(html_report_path) # 清理本地文件
except Exception as e:
activity.logger.error(f"UI Test Failed for {req.test_case_id}: {e}")
result.base_result.success = False
result.base_result.message = f"UI Test Error: {e}"
result.base_result.error_details = str(e)
finally:
# 取消心跳任务
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
result.base_result.duration_seconds = time.time() - start_time
return result