# Python worker entry point: registers the test activities and runs the worker.
import asyncio
import os
import sys

from temporalio.client import Client
from temporalio.worker import Worker

# Make sure modules under the sibling "gen" directory can be imported.
# This has to run before the `activities` import below.
sys.path.insert(
    0,
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'gen')
    ),
)

from activities import TestActivities  # noqa: E402  # the Activity definitions


async def main():
    """Connect to the Temporal server and run a worker for the test activities.

    Registers ``run_ui_test`` and ``run_api_test`` from a ``TestActivities``
    instance on the "test-task-queue" task queue, then awaits ``worker.run()``
    until it returns.
    """
    # Server address and namespace: adjust to your Temporal server configuration.
    temporal_client = await Client.connect(
        "temporal.newai.day:17233",
        namespace="default",
    )

    test_activities = TestActivities()
    registered = [
        test_activities.run_ui_test,
        test_activities.run_api_test,
    ]

    # The task queue name must stay the same as the one used by the Go client.
    worker = Worker(
        temporal_client,
        task_queue="test-task-queue",
        activities=registered,
    )

    print("Starting Python Temporal Worker...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())