Files
ProxyPool/tests/integration/test_scheduler_api.py
祀梦 0131c8b408 feat: fpw plugins, validation/crawl perf, WS stats, test DB isolation
- Add Free_Proxy_Website-style fpw_* plugins and register them
- Per-plugin crawl timeout (crawl_timeout_seconds=120); remove global crawl_timeout setting
- Validator: fix connect vs total timeout on save; SOCKS session LRU cache; drop redundant semaphore
- Validation handler uses single DB connection; batch upsert after crawl; WorkerPool put_nowait
- Remove unused max_retries from settings API/UI; settings maintenance SQL + init_db cleanup of deprecated keys
- WebSocket dashboard stats; ProxyList pool_filter and API alignment
- POST /api/proxies/delete-one for IPv6-safe deletes; task poll stops on 404
- pytest uses PROXYPOOL_DB_PATH=db/proxies.test.sqlite so tests do not wipe production DB
- .gitignore: explicit proxies.test.sqlite patterns; fix plugin_service ValidationException import

Made-with: Cursor
2026-04-05 13:39:19 +08:00

150 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""调度器 API 集成测试 - 测试 /api/scheduler/* 所有接口"""
import pytest
from app.api.deps import get_settings_repo
from app.repositories.settings_repo import SettingsRepository
class FailingSettingsRepository(SettingsRepository):
"""save 恒为 False用于覆盖「设置保存失败」分支非 MagicMock。"""
@staticmethod
async def save(db, settings):
return False
class TestSchedulerAPI:
"""测试调度器相关 API"""
@pytest.mark.asyncio
async def test_get_scheduler_status(self, client):
"""测试 GET /api/scheduler/status"""
response = await client.get("/api/scheduler/status")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "running" in data["data"]
assert "interval_minutes" in data["data"]
assert isinstance(data["data"]["running"], bool)
@pytest.mark.asyncio
async def test_start_scheduler(self, client):
"""测试 POST /api/scheduler/start"""
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["running"] is True
@pytest.mark.asyncio
async def test_start_scheduler_already_running(self, client):
"""测试 POST /api/scheduler/start - 已经运行"""
# 先启动
await client.post("/api/scheduler/start")
# 再次启动
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "已在运行" in data["message"] or data["data"]["running"] is True
@pytest.mark.asyncio
async def test_stop_scheduler(self, client):
"""测试 POST /api/scheduler/stop"""
# 先确保调度器在运行
await client.post("/api/scheduler/start")
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["running"] is False
@pytest.mark.asyncio
async def test_stop_scheduler_already_stopped(self, client):
"""测试 POST /api/scheduler/stop - 已经停止"""
# 先停止
await client.post("/api/scheduler/stop")
# 再次停止
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "未运行" in data["message"] or data["data"]["running"] is False
@pytest.mark.asyncio
async def test_validate_now(self, client):
"""测试 POST /api/scheduler/validate-now"""
# 先启动调度器
await client.post("/api/scheduler/start")
response = await client.post("/api/scheduler/validate-now")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["started"] is True
@pytest.mark.asyncio
async def test_validate_now_returns_valid_job(self, client):
"""测试 POST /api/scheduler/validate-now 返回有效 job_id"""
await client.post("/api/scheduler/start")
response = await client.post("/api/scheduler/validate-now")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
job_id = data["data"]["job_id"]
assert isinstance(job_id, str) and len(job_id) > 0
# 通过应用状态验证 job 已被提交到 executor
from app.api.main import create_app
# 使用 client 的 app 实例
app = client._transport.app
executor = app.state.executor
job = executor.get_job(job_id)
assert job is not None
@pytest.mark.asyncio
async def test_start_scheduler_db_save_failure(self, client, app):
"""测试启动调度器时数据库保存失败应返回 running=False"""
# lifespan 启动时调度器可能已自动启动,先停止它
await client.post("/api/scheduler/stop")
app.dependency_overrides[get_settings_repo] = lambda: FailingSettingsRepository()
try:
response = await client.post("/api/scheduler/start")
finally:
app.dependency_overrides.pop(get_settings_repo, None)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["running"] is False
assert "失败" in data["message"]
@pytest.mark.asyncio
async def test_scheduler_full_workflow(self, client):
"""测试调度器完整工作流"""
# 1. 获取初始状态
response = await client.get("/api/scheduler/status")
initial_status = response.json()["data"]
# 2. 启动调度器
response = await client.post("/api/scheduler/start")
assert response.json()["data"]["running"] is True
# 3. 验证状态
response = await client.get("/api/scheduler/status")
assert response.json()["data"]["running"] is True
# 4. 触发立即验证
response = await client.post("/api/scheduler/validate-now")
assert response.json()["data"]["started"] is True
# 5. 停止调度器
response = await client.post("/api/scheduler/stop")
assert response.json()["data"]["running"] is False
# 6. 验证最终状态
response = await client.get("/api/scheduler/status")
assert response.json()["data"]["running"] is False