"""调度器 API 集成测试 - 测试 /api/scheduler/* 所有接口""" import pytest class TestSchedulerAPI: """测试调度器相关 API""" @pytest.mark.asyncio async def test_get_scheduler_status(self, client): """测试 GET /api/scheduler/status""" response = await client.get("/api/scheduler/status") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert "running" in data["data"] assert "interval_minutes" in data["data"] assert isinstance(data["data"]["running"], bool) @pytest.mark.asyncio async def test_start_scheduler(self, client): """测试 POST /api/scheduler/start""" response = await client.post("/api/scheduler/start") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert data["data"]["running"] is True @pytest.mark.asyncio async def test_start_scheduler_already_running(self, client): """测试 POST /api/scheduler/start - 已经运行""" # 先启动 await client.post("/api/scheduler/start") # 再次启动 response = await client.post("/api/scheduler/start") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert "已在运行" in data["message"] or data["data"]["running"] is True @pytest.mark.asyncio async def test_stop_scheduler(self, client): """测试 POST /api/scheduler/stop""" # 先确保调度器在运行 await client.post("/api/scheduler/start") response = await client.post("/api/scheduler/stop") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert data["data"]["running"] is False @pytest.mark.asyncio async def test_stop_scheduler_already_stopped(self, client): """测试 POST /api/scheduler/stop - 已经停止""" # 先停止 await client.post("/api/scheduler/stop") # 再次停止 response = await client.post("/api/scheduler/stop") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert "未运行" in data["message"] or data["data"]["running"] is False @pytest.mark.asyncio async def test_validate_now(self, client): """测试 POST /api/scheduler/validate-now""" # 先启动调度器 await client.post("/api/scheduler/start") response = await client.post("/api/scheduler/validate-now") assert response.status_code == 200 data = response.json() assert data["code"] == 200 assert data["data"]["started"] is True @pytest.mark.asyncio async def test_scheduler_full_workflow(self, client): """测试调度器完整工作流""" # 1. 获取初始状态 response = await client.get("/api/scheduler/status") initial_status = response.json()["data"] # 2. 启动调度器 response = await client.post("/api/scheduler/start") assert response.json()["data"]["running"] is True # 3. 验证状态 response = await client.get("/api/scheduler/status") assert response.json()["data"]["running"] is True # 4. 触发立即验证 response = await client.post("/api/scheduler/validate-now") assert response.json()["data"]["started"] is True # 5. 停止调度器 response = await client.post("/api/scheduler/stop") assert response.json()["data"]["running"] is False # 6. 验证最终状态 response = await client.get("/api/scheduler/status") assert response.json()["data"]["running"] is False