127 lines
5.3 KiB
Python
127 lines
5.3 KiB
Python
# UI 测试具体实现 (使用 Playwright)
|
|
from playwright.async_api import async_playwright
|
|
import os
|
|
import datetime
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
# Module-level browser session state shared by the functions in this file.
# Maps a session id (str) to the browser object launched for that session.
_browser_sessions = {}
# Started Playwright driver; stays None until the first test run starts it.
_playwright_instance = None
|
|
|
|
async def _launch_browser(playwright, browser_type: str, headless: bool):
    """Launch and return a browser for the requested engine.

    Raises:
        ValueError: if ``browser_type`` is not "chromium", "firefox" or "webkit".
    """
    if browser_type not in ("chromium", "firefox", "webkit"):
        raise ValueError(f"Unsupported browser type: {browser_type}")
    return await getattr(playwright, browser_type).launch(headless=headless)


async def _run_ui_step(page, step, log_output: list) -> None:
    """Perform a single UI step on ``page`` and append what happened to ``log_output``."""
    log_output.append(f"Executing step: {step.name} ({step.event_type})")
    event_type = step.event_type

    # The locator is only built in the branches that act on an element, so a
    # "wait" step does not need to carry a selector.
    if event_type == "click":
        await page.locator(step.selector).click()
        log_output.append(f"Clicked element with selector: {step.selector}")
    elif event_type == "input":
        await page.locator(step.selector).fill(step.input_value)
        log_output.append(f"Input '{step.input_value}' into element with selector: {step.selector}")
    elif event_type == "swipe":
        # There is no swipe call used here; the gesture is simulated as a
        # mouse drag from the element's centre by (offset_x, offset_y).
        box = await page.locator(step.selector).bounding_box()
        if box:
            start_x = box['x'] + box['width'] / 2
            start_y = box['y'] + box['height'] / 2
            await page.mouse.move(start_x, start_y)
            await page.mouse.down()
            await page.mouse.move(start_x + step.offset_x, start_y + step.offset_y)
            await page.mouse.up()
            log_output.append(f"Swiped on element with selector: {step.selector}")
        else:
            # Best effort: without a bounding box there is nothing to drag,
            # so record the skip instead of claiming the swipe happened.
            log_output.append(f"Swipe skipped, no bounding box for selector: {step.selector}")
    elif event_type == "wait":
        await page.wait_for_timeout(step.wait_time_seconds * 1000)
        log_output.append(f"Waited for {step.wait_time_seconds} seconds.")
    else:
        # Unknown step types are logged and skipped; they do not fail the test.
        log_output.append(f"Unsupported event type: {step.event_type}")


async def execute_ui_test_case(test_case_id: str, url_path: str, browser_type: str, headless: bool, steps: list, browser_session_id: Optional[str] = None, *, keep_session_open: bool = False):
    """Run one UI test case with Playwright and report the outcome.

    A page is opened in either a reused or a freshly launched browser, the
    target URL is loaded, and every step is executed in order. Any exception
    is caught and reported through the return value rather than raised.

    Args:
        test_case_id: Identifier used in log lines and the screenshot file name.
        url_path: Appended to the (currently empty) base URL to form the target.
        browser_type: "chromium", "firefox" or "webkit"; only used when a new
            browser has to be launched.
        headless: Passed to the browser launch call.
        steps: Objects exposing ``name``, ``event_type`` and, depending on the
            event type, ``selector``, ``input_value``, ``offset_x``,
            ``offset_y`` and ``wait_time_seconds``.
        browser_session_id: Id of a registered session to reuse. If it is
            missing or unknown, a new browser is launched under a new id.
        keep_session_open: When True, a browser launched by this call stays
            registered so later calls can reuse it via the returned id; the
            caller must then close it with ``close_browser_session``. The
            default (False) closes a browser launched by this call before
            returning, so the returned id then refers to a closed session.

    Returns:
        Tuple ``(success, log_text, screenshot_path, html_report_path,
        session_id)``. ``screenshot_path`` is None when no screenshot was
        written; ``html_report_path`` is always None.
    """
    global _playwright_instance

    base_url = ""  # Base address for UI tests; empty, so url_path is used as given.
    full_url = f"{base_url}{url_path}"

    log_output = [f"Executing UI test: {test_case_id} - {full_url} with {browser_type}"]
    success = False
    screenshot_path = None
    html_report_path = None  # No report is produced here; kept for the return shape.
    page = None
    created_new_session = False
    session_id = browser_session_id

    try:
        if _playwright_instance is None:
            _playwright_instance = await async_playwright().start()

        if session_id and session_id in _browser_sessions:
            browser = _browser_sessions[session_id]
            log_output.append(f"Reusing browser session: {session_id}")
        else:
            browser = await _launch_browser(_playwright_instance, browser_type, headless)
            session_id = str(uuid.uuid4())
            _browser_sessions[session_id] = browser
            created_new_session = True
            log_output.append(f"Created new browser session: {session_id}")

        page = await browser.new_page()
        await page.goto(full_url)
        log_output.append(f"Navigated to: {full_url}")

        for step in steps:
            await _run_ui_step(page, step, log_output)

        success = True
        log_output.append("UI Test PASSED.")
    except Exception as e:
        # Boundary handler: failures are reported to the caller via the log.
        log_output.append(f"UI Test FAILED (Exception): {e}")
        success = False
    finally:
        if page is not None:
            try:
                timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
                candidate_path = os.path.join("/tmp", f"screenshot_{test_case_id}_{timestamp}.png")
                await page.screenshot(path=candidate_path)
                # Only report the path once the capture has succeeded.
                screenshot_path = candidate_path
                log_output.append(f"Screenshot saved to: {screenshot_path}")
            except Exception as e:
                log_output.append(f"Failed to take screenshot: {e}")

            # Close the page opened for this run; otherwise every run on a
            # reused browser leaves one more open page behind.
            try:
                await page.close()
            except Exception as e:
                log_output.append(f"Failed to close page: {e}")

        # Reused sessions are never closed here. A browser launched by this
        # call is closed unless the caller asked to keep it for reuse.
        if created_new_session and not keep_session_open:
            launched_browser = _browser_sessions.pop(session_id, None)
            if launched_browser is not None:
                try:
                    await launched_browser.close()
                    log_output.append(f"Closed browser session: {session_id}")
                except Exception as e:
                    log_output.append(f"Failed to close browser: {e}")

    return success, "\n".join(log_output), screenshot_path, html_report_path, session_id
|
|
|
|
async def close_browser_session(browser_session_id: str):
    """Explicitly close one registered browser session.

    Args:
        browser_session_id: Id under which the browser was registered.

    Returns:
        Tuple ``(closed, message)``: ``(True, ...)`` when the browser was
        closed, ``(False, ...)`` when the id is unknown or closing raised.
        In both of the latter cases the id is no longer registered afterwards.
    """
    # Unregister before closing: a browser whose close() fails must not stay
    # in the registry where it could be handed out for reuse, and a second
    # overlapping call for the same id then simply finds nothing to close.
    browser = _browser_sessions.pop(browser_session_id, None)
    if browser is None:
        return False, f"No such browser session: {browser_session_id}"

    try:
        await browser.close()
    except Exception as e:
        return False, f"Failed to close browser: {e}"
    return True, f"Closed browser session: {browser_session_id}"