# Python Worker 入口,注册并运行 Activity import asyncio import os import sys from temporalio.client import Client from temporalio.worker import Worker # 确保能导入 proto_gen 模块 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'proto_gen'))) from activities import RunApiTest, RunUiTest # 导入定义的 Activity async def main(): # 连接 Temporal Server client = await Client.connect("localhost:7233") # 根据你的 Temporal Server 配置 # 创建 Worker worker = Worker( client, task_queue="test-task-queue", # 保持与 Go Client 一致 activities=[RunApiTest, RunUiTest], ) print("Starting Python Temporal Worker...") await worker.run() if __name__ == "__main__": asyncio.run(main())