Files
ProxyPool/tests/e2e/test_full_workflow.py
祀梦 ce667dba13 test: skip network/live crawl by default; fix settings e2e key
- pytest_collection_modifyitems: skip @pytest.mark.network unless PROXYPOOL_RUN_NETWORK_TESTS=1
- Document opt-in in tests/README.md
- e2e: replace removed crawl_timeout with validation_timeout

Made-with: Cursor
2026-04-05 14:33:29 +08:00

201 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""完整工作流 E2E 测试
这些测试模拟真实用户场景,验证整个系统的集成功能。
"""
import pytest
from tests.task_utils import poll_task_until_terminal
class TestFullWorkflow:
"""测试完整工作流"""
@pytest.mark.network
@pytest.mark.slow
@pytest.mark.asyncio
async def test_proxy_management_workflow(self, client):
"""测试代理管理完整工作流
场景:
1. 查看统计信息
2. 列出代理
3. 触发爬取
4. 查看更新后的统计
5. 导出代理
6. 清理无效代理
"""
# 1. 获取初始统计
response = await client.get("/api/proxies/stats")
assert response.status_code == 200
initial_stats = response.json()["data"]
# 2. 列出代理
response = await client.post("/api/proxies", json={
"page": 1,
"page_size": 20,
})
assert response.status_code == 200
# 3. 触发所有插件爬取
response = await client.post("/api/plugins/crawl-all")
assert response.status_code == 200
task_id = response.json()["data"]["task_id"]
task_data = await poll_task_until_terminal(
client, task_id, max_rounds=400, interval=0.5
)
assert task_data is not None
assert task_data["status"] in ("completed", "failed", "cancelled")
# 4. 获取更新后的统计
response = await client.get("/api/proxies/stats")
updated_stats = response.json()["data"]
assert "total" in initial_stats and "total" in updated_stats
# 5. 导出代理(所有格式)
for fmt in ["csv", "txt", "json"]:
response = await client.get(f"/api/proxies/export/{fmt}")
assert response.status_code == 200
# 6. 清理无效代理
response = await client.delete("/api/proxies/clean-invalid")
assert response.status_code == 200
@pytest.mark.network
@pytest.mark.slow
@pytest.mark.asyncio
async def test_plugin_management_workflow(self, client):
"""测试插件管理完整工作流
场景:
1. 列出所有插件
2. 禁用某个插件
3. 更新插件配置
4. 启用插件
5. 触发单个插件爬取
"""
# 1. 列出插件
response = await client.get("/api/plugins")
assert response.status_code == 200
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
# 2. 禁用插件
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": False})
assert response.status_code == 200
# 3. 获取插件配置
response = await client.get(f"/api/plugins/{plugin_id}/config")
assert response.status_code == 200
# 4. 更新插件配置
response = await client.post(
f"/api/plugins/{plugin_id}/config",
json={"config": {"max_pages": 3}}
)
assert response.status_code == 200
# 5. 启用插件
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": True})
assert response.status_code == 200
# 6. 触发爬取
response = await client.post(f"/api/plugins/{plugin_id}/crawl")
assert response.status_code == 200
crawl_task_id = response.json()["data"]["task_id"]
crawl_task = await poll_task_until_terminal(
client, crawl_task_id, max_rounds=140, interval=0.5
)
assert crawl_task is not None
assert crawl_task["status"] in ("completed", "failed", "cancelled")
@pytest.mark.asyncio
async def test_scheduler_workflow(self, client):
"""测试调度器工作流
场景:
1. 启动调度器
2. 触发立即验证
3. 检查状态
4. 停止调度器
"""
# 1. 启动调度器
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
# 2. 触发立即验证
response = await client.post("/api/scheduler/validate-now")
assert response.status_code == 200
# 3. 检查状态
response = await client.get("/api/scheduler/status")
assert response.status_code == 200
assert response.json()["data"]["running"] is True
# 4. 停止调度器
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
assert response.json()["data"]["running"] is False
@pytest.mark.asyncio
async def test_settings_workflow(self, client):
"""测试设置工作流
场景:
1. 获取当前设置
2. 修改设置
3. 验证设置已保存
4. 恢复默认设置
"""
# 1. 获取当前设置
response = await client.get("/api/settings")
assert response.status_code == 200
original_settings = response.json()["data"]
# 2. 修改设置(爬取限时已改为各插件 crawl_timeout_seconds不再使用全局 crawl_timeout
new_settings = original_settings.copy()
new_settings["validation_timeout"] = 8
new_settings["auto_validate"] = not original_settings["auto_validate"]
response = await client.post("/api/settings", json=new_settings)
assert response.status_code == 200
# 3. 验证设置已保存
response = await client.get("/api/settings")
saved_settings = response.json()["data"]
assert saved_settings["validation_timeout"] == 8
# 4. 恢复原始设置
response = await client.post("/api/settings", json=original_settings)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_batch_operations_workflow(self, client):
"""测试批量操作工作流
场景:
1. 批量删除不存在的代理(幂等性测试)
2. 导出所有代理
3. 获取随机代理
"""
# 1. 批量删除(幂等性)
response = await client.post("/api/proxies/batch-delete", json={
"proxies": [
{"ip": "192.168.100.1", "port": 8080},
{"ip": "192.168.100.2", "port": 8081},
]
})
assert response.status_code == 200
# 2. 导出所有格式
for fmt in ["csv", "txt", "json"]:
response = await client.get(f"/api/proxies/export/{fmt}")
assert response.status_code == 200
# 3. 获取随机代理(可能返回 200 或 404
response = await client.get("/api/proxies/random")
assert response.status_code in [200, 404]