Beacon/workers/python/ui_tests.py

108 lines
4.2 KiB
Python

# UI 测试具体实现 (使用 Playwright)
from playwright.async_api import async_playwright, expect
import os
import datetime
import uuid
from typing import Optional
# 全局浏览器会话管理
_browser_sessions = {}
_playwright_instance = None
async def execute_ui_test_case(test_case_id: str, url_path: str, browser_type: str, headless: bool, browser_session_id: Optional[str] = None):
"""
实际执行UI测试的函数。
支持浏览器会话复用。
"""
global _browser_sessions, _playwright_instance
base_url = "https://playwright.dev" # 假设 UI 测试的基地址
full_url = f"{base_url}{url_path}"
log_output = []
success = False
screenshot_path = None
html_report_path = None
page = None
created_new_session = False
session_id = browser_session_id
log_output.append(f"Executing UI test: {test_case_id} - {full_url} with {browser_type}")
try:
if _playwright_instance is None:
_playwright_instance = await async_playwright().start()
# 浏览器复用逻辑
if session_id and session_id in _browser_sessions:
browser = _browser_sessions[session_id]
log_output.append(f"Reusing browser session: {session_id}")
else:
# 新建浏览器
if browser_type == "chromium":
browser = await _playwright_instance.chromium.launch(headless=headless)
elif browser_type == "firefox":
browser = await _playwright_instance.firefox.launch(headless=headless)
elif browser_type == "webkit":
browser = await _playwright_instance.webkit.launch(headless=headless)
else:
raise ValueError(f"Unsupported browser type: {browser_type}")
# 生成新的 session_id
session_id = str(uuid.uuid4())
_browser_sessions[session_id] = browser
created_new_session = True
log_output.append(f"Created new browser session: {session_id}")
page = await browser.new_page()
await page.goto(full_url)
log_output.append(f"Navigated to: {full_url}")
# 示例UI操作和断言
element = page.locator("text=Playwright enables reliable end-to-end testing for modern web apps.")
await expect(element).to_be_visible()
log_output.append("Found expected text on page.")
await page.click("text=Docs")
await page.wait_for_url("**/docs/intro")
log_output.append("Clicked 'Docs' link and navigated.")
success = True
log_output.append("UI Test PASSED.")
except Exception as e:
log_output.append(f"UI Test FAILED (Exception): {e}")
success = False
finally:
if page:
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
screenshot_filename = f"screenshot_{test_case_id}_{timestamp}.png"
screenshot_path = os.path.join("/tmp", screenshot_filename)
await page.screenshot(path=screenshot_path)
log_output.append(f"Screenshot saved to: {screenshot_path}")
except Exception as e:
log_output.append(f"Failed to take screenshot: {e}")
# 注意:只有不是复用时才自动关闭浏览器
if created_new_session and session_id in _browser_sessions:
try:
await _browser_sessions[session_id].close()
del _browser_sessions[session_id]
log_output.append(f"Closed browser session: {session_id}")
except Exception as e:
log_output.append(f"Failed to close browser: {e}")
return success, "\n".join(log_output), screenshot_path, html_report_path, session_id
async def close_browser_session(browser_session_id: str):
"""
显式关闭指定的浏览器会话。
"""
global _browser_sessions
if browser_session_id in _browser_sessions:
try:
await _browser_sessions[browser_session_id].close()
del _browser_sessions[browser_session_id]
return True, f"Closed browser session: {browser_session_id}"
except Exception as e:
return False, f"Failed to close browser: {e}"
return False, f"No such browser session: {browser_session_id}"