重构: 迁移后端代码到 app 目录,前端移动到 WebUI,添加完整测试套件

主要变更:
- 后端代码从根目录迁移到 app/ 目录
- 前端代码从 frontend/ 重命名为 WebUI/
- 更新所有导入路径以适配新结构
- 提取公共 API 响应函数到 app/api/common.py
- 精简验证器服务代码
- 更新启动脚本和文档

测试:
- 新增完整测试套件 (tests/)
- 单元测试: 模型、仓库层
- 集成测试: 覆盖所有 22+ API 端点
- E2E 测试: 4个完整工作流场景
- 添加 pytest 配置和测试运行脚本
This commit is contained in:
祀梦
2026-04-04 13:32:36 +08:00
parent df3cc87f88
commit 38bd66128b
109 changed files with 2017 additions and 548 deletions

159
tests/README.md Normal file
View File

@@ -0,0 +1,159 @@
# 测试说明
## 测试结构
```
tests/
├── conftest.py # pytest 配置和 fixtures
├── README.md # 本文件
├── unit/ # 单元测试
│ ├── test_models.py # 模型测试
│ └── test_repositories.py # 仓库层测试
├── integration/ # 集成测试
│ ├── test_proxies_api.py # 代理 API 测试
│ ├── test_plugins_api.py # 插件 API 测试
│ ├── test_scheduler_api.py # 调度器 API 测试
│ ├── test_settings_api.py # 设置 API 测试
│ └── test_health_api.py # 健康检查测试
└── e2e/ # 端到端测试
└── test_full_workflow.py # 完整工作流测试
```
## 运行测试
### 安装测试依赖
```bash
pip install pytest pytest-asyncio httpx
```
### 运行所有测试
```bash
pytest
```
### 运行特定类型的测试
```bash
# 仅运行单元测试
pytest tests/unit -v
# 仅运行集成测试
pytest tests/integration -v
# 仅运行 E2E 测试
pytest tests/e2e -v
```
### 运行特定测试文件
```bash
pytest tests/integration/test_proxies_api.py -v
```
### 运行特定测试函数
```bash
pytest tests/integration/test_proxies_api.py::TestProxiesAPI::test_get_stats -v
```
## 测试覆盖的 API
### 代理 API (`/api/proxies/*`)
-`GET /api/proxies/stats` - 获取统计信息
-`POST /api/proxies` - 列出代理
-`GET /api/proxies/random` - 获取随机代理
-`GET /api/proxies/export/{format}` - 导出代理 (csv, txt, json)
-`DELETE /api/proxies/{ip}/{port}` - 删除代理
-`POST /api/proxies/batch-delete` - 批量删除
-`DELETE /api/proxies/clean-invalid` - 清理无效代理
### 插件 API (`/api/plugins/*`)
-`GET /api/plugins` - 列出插件
-`PUT /api/plugins/{id}/toggle` - 切换插件状态
-`GET /api/plugins/{id}/config` - 获取插件配置
-`POST /api/plugins/{id}/config` - 更新插件配置
-`POST /api/plugins/{id}/crawl` - 触发单个插件爬取
-`POST /api/plugins/crawl-all` - 触发所有插件爬取
### 调度器 API (`/api/scheduler/*`)
-`GET /api/scheduler/status` - 获取调度器状态
-`POST /api/scheduler/start` - 启动调度器
-`POST /api/scheduler/stop` - 停止调度器
-`POST /api/scheduler/validate-now` - 立即验证
### 设置 API (`/api/settings`)
-`GET /api/settings` - 获取设置
-`POST /api/settings` - 保存设置
### 健康检查
-`GET /` - 根端点
-`GET /health` - 健康检查
## 测试 Fixtures
### `client`
异步 HTTP 客户端,用于发送请求到测试应用。
```python
async def test_example(client):
response = await client.get("/api/proxies/stats")
assert response.status_code == 200
```
### `db`
数据库连接 fixture。
```python
async def test_example(db, proxy_repo):
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
```
### `sample_proxy`
创建一个测试代理并自动清理。
```python
async def test_example(client, sample_proxy):
# sample_proxy = {"ip": "192.168.1.1", "port": 8080, "protocol": "http", "score": 50}
response = await client.delete(f"/api/proxies/{sample_proxy['ip']}/{sample_proxy['port']}")
assert response.status_code == 200
```
## 编写新测试
### 单元测试示例
```python
# tests/unit/test_new_feature.py
import pytest
from app.models.domain import ProxyRaw
class TestProxyRaw:
def test_create(self):
proxy = ProxyRaw("192.168.1.1", 8080, "http")
assert proxy.ip == "192.168.1.1"
```
### 集成测试示例
```python
# tests/integration/test_new_api.py
import pytest
class TestNewAPI:
@pytest.mark.asyncio
async def test_new_endpoint(self, client):
response = await client.get("/api/new-endpoint")
assert response.status_code == 200
```

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""测试包"""

5
tests/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
"""直接运行 python -m tests 时执行的入口"""
from .run_tests import main
import sys
sys.exit(main())

Binary file not shown.

56
tests/conftest.py Normal file
View File

@@ -0,0 +1,56 @@
"""pytest 配置文件和 fixtures"""
import pytest
import asyncio
from typing import AsyncGenerator, Generator
from httpx import AsyncClient, ASGITransport
from app.api import create_app
from app.core.db import init_db, get_db
from app.repositories.proxy_repo import ProxyRepository
@pytest.fixture(scope="session")
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""创建事件循环"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def app():
"""创建应用实例"""
# 初始化测试数据库
await init_db()
app = create_app()
return app
@pytest.fixture
async def client(app) -> AsyncGenerator[AsyncClient, None]:
"""创建异步 HTTP 客户端"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.fixture
async def db():
"""获取数据库连接"""
async with get_db() as db:
yield db
@pytest.fixture
async def proxy_repo():
"""获取代理仓库"""
return ProxyRepository()
@pytest.fixture
async def sample_proxy(db, proxy_repo):
"""创建一个测试代理"""
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
yield {"ip": "192.168.1.1", "port": 8080, "protocol": "http", "score": 50}
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)

1
tests/e2e/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""端到端测试"""

View File

@@ -0,0 +1,182 @@
"""完整工作流 E2E 测试
这些测试模拟真实用户场景,验证整个系统的集成功能。
"""
import pytest
class TestFullWorkflow:
"""测试完整工作流"""
@pytest.mark.asyncio
async def test_proxy_management_workflow(self, client):
"""测试代理管理完整工作流
场景:
1. 查看统计信息
2. 列出代理
3. 触发爬取
4. 查看更新后的统计
5. 导出代理
6. 清理无效代理
"""
# 1. 获取初始统计
response = await client.get("/api/proxies/stats")
assert response.status_code == 200
initial_stats = response.json()["data"]
# 2. 列出代理
response = await client.post("/api/proxies", json={
"page": 1,
"page_size": 20,
})
assert response.status_code == 200
# 3. 触发所有插件爬取
response = await client.post("/api/plugins/crawl-all")
assert response.status_code == 200
crawl_result = response.json()["data"]
# 4. 获取更新后的统计
response = await client.get("/api/proxies/stats")
updated_stats = response.json()["data"]
# 5. 导出代理(所有格式)
for fmt in ["csv", "txt", "json"]:
response = await client.get(f"/api/proxies/export/{fmt}")
assert response.status_code == 200
# 6. 清理无效代理
response = await client.delete("/api/proxies/clean-invalid")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_plugin_management_workflow(self, client):
"""测试插件管理完整工作流
场景:
1. 列出所有插件
2. 禁用某个插件
3. 更新插件配置
4. 启用插件
5. 触发单个插件爬取
"""
# 1. 列出插件
response = await client.get("/api/plugins")
assert response.status_code == 200
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
# 2. 禁用插件
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": False})
assert response.status_code == 200
# 3. 获取插件配置
response = await client.get(f"/api/plugins/{plugin_id}/config")
assert response.status_code == 200
# 4. 更新插件配置
response = await client.post(
f"/api/plugins/{plugin_id}/config",
json={"config": {"max_pages": 3}}
)
assert response.status_code == 200
# 5. 启用插件
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": True})
assert response.status_code == 200
# 6. 触发爬取
response = await client.post(f"/api/plugins/{plugin_id}/crawl")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_scheduler_workflow(self, client):
"""测试调度器工作流
场景:
1. 启动调度器
2. 触发立即验证
3. 检查状态
4. 停止调度器
"""
# 1. 启动调度器
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
# 2. 触发立即验证
response = await client.post("/api/scheduler/validate-now")
assert response.status_code == 200
# 3. 检查状态
response = await client.get("/api/scheduler/status")
assert response.status_code == 200
assert response.json()["data"]["running"] is True
# 4. 停止调度器
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
assert response.json()["data"]["running"] is False
@pytest.mark.asyncio
async def test_settings_workflow(self, client):
"""测试设置工作流
场景:
1. 获取当前设置
2. 修改设置
3. 验证设置已保存
4. 恢复默认设置
"""
# 1. 获取当前设置
response = await client.get("/api/settings")
assert response.status_code == 200
original_settings = response.json()["data"]
# 2. 修改设置
new_settings = original_settings.copy()
new_settings["crawl_timeout"] = 45
new_settings["auto_validate"] = not original_settings["auto_validate"]
response = await client.post("/api/settings", json=new_settings)
assert response.status_code == 200
# 3. 验证设置已保存
response = await client.get("/api/settings")
saved_settings = response.json()["data"]
assert saved_settings["crawl_timeout"] == 45
# 4. 恢复原始设置
response = await client.post("/api/settings", json=original_settings)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_batch_operations_workflow(self, client):
"""测试批量操作工作流
场景:
1. 批量删除不存在的代理(幂等性测试)
2. 导出所有代理
3. 获取随机代理
"""
# 1. 批量删除(幂等性)
response = await client.post("/api/proxies/batch-delete", json={
"proxies": [
{"ip": "192.168.100.1", "port": 8080},
{"ip": "192.168.100.2", "port": 8081},
]
})
assert response.status_code == 200
# 2. 导出所有格式
for fmt in ["csv", "txt", "json"]:
response = await client.get(f"/api/proxies/export/{fmt}")
assert response.status_code == 200
# 3. 获取随机代理(可能返回 200 或 404
response = await client.get("/api/proxies/random")
assert response.status_code in [200, 404]

View File

@@ -0,0 +1 @@
"""集成测试"""

View File

@@ -0,0 +1,47 @@
"""健康检查 API 测试"""
import pytest
class TestHealthAPI:
"""测试健康检查端点"""
@pytest.mark.asyncio
async def test_root_endpoint(self, client):
"""测试根端点 /"""
response = await client.get("/")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "status" in data
assert data["status"] == "running"
@pytest.mark.asyncio
async def test_health_endpoint(self, client):
"""测试健康检查端点 /health"""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert "timestamp" in data
assert "database" in data
assert "scheduler" in data
assert "version" in data
@pytest.mark.asyncio
async def test_health_endpoint_structure(self, client):
"""测试健康检查端点返回结构"""
response = await client.get("/health")
data = response.json()
assert "status" in data
assert "timestamp" in data
assert "database" in data
assert "scheduler" in data
assert "version" in data
# 验证数据类型
assert isinstance(data["status"], str)
assert isinstance(data["timestamp"], str)
assert isinstance(data["database"], str)
assert isinstance(data["scheduler"], str)
assert isinstance(data["version"], str)

View File

@@ -0,0 +1,147 @@
"""插件 API 集成测试 - 测试 /api/plugins/* 所有接口"""
import pytest
class TestPluginsAPI:
"""测试插件相关 API"""
@pytest.mark.asyncio
async def test_list_plugins(self, client):
"""测试 GET /api/plugins"""
response = await client.get("/api/plugins")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "plugins" in data["data"]
assert isinstance(data["data"]["plugins"], list)
@pytest.mark.asyncio
async def test_list_plugins_structure(self, client):
"""测试 GET /api/plugins 返回结构"""
response = await client.get("/api/plugins")
data = response.json()
if data["data"]["plugins"]:
plugin = data["data"]["plugins"][0]
assert "id" in plugin
assert "name" in plugin
assert "display_name" in plugin
assert "description" in plugin
assert "enabled" in plugin
@pytest.mark.asyncio
async def test_toggle_plugin_enable(self, client):
"""测试 PUT /api/plugins/{id}/toggle - 启用"""
# 先获取一个插件 ID
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": True})
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["enabled"] is True
@pytest.mark.asyncio
async def test_toggle_plugin_disable(self, client):
"""测试 PUT /api/plugins/{id}/toggle - 禁用"""
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={"enabled": False})
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["enabled"] is False
@pytest.mark.asyncio
async def test_toggle_plugin_missing_enabled(self, client):
"""测试 PUT /api/plugins/{id}/toggle - 缺少 enabled 参数"""
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
response = await client.put(f"/api/plugins/{plugin_id}/toggle", json={})
assert response.status_code == 400
@pytest.mark.asyncio
async def test_toggle_nonexistent_plugin(self, client):
"""测试 PUT /api/plugins/{id}/toggle - 不存在的插件"""
response = await client.put("/api/plugins/nonexistent_plugin/toggle", json={"enabled": True})
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_plugin_config(self, client):
"""测试 GET /api/plugins/{id}/config"""
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
response = await client.get(f"/api/plugins/{plugin_id}/config")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "config" in data["data"]
@pytest.mark.asyncio
async def test_get_nonexistent_plugin_config(self, client):
"""测试 GET /api/plugins/{id}/config - 不存在的插件"""
response = await client.get("/api/plugins/nonexistent_plugin/config")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_update_plugin_config(self, client):
"""测试 POST /api/plugins/{id}/config"""
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
response = await client.post(
f"/api/plugins/{plugin_id}/config",
json={"config": {"max_pages": 3}}
)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
@pytest.mark.asyncio
async def test_crawl_plugin(self, client):
"""测试 POST /api/plugins/{id}/crawl"""
response = await client.get("/api/plugins")
plugins = response.json()["data"]["plugins"]
if not plugins:
pytest.skip("没有可用的插件")
plugin_id = plugins[0]["id"]
# 这个测试可能需要较长时间,设置较短的超时
response = await client.post(f"/api/plugins/{plugin_id}/crawl")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "proxy_count" in data["data"]
@pytest.mark.asyncio
async def test_crawl_nonexistent_plugin(self, client):
"""测试 POST /api/plugins/{id}/crawl - 不存在的插件"""
response = await client.post("/api/plugins/nonexistent_plugin/crawl")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_crawl_all_plugins(self, client):
"""测试 POST /api/plugins/crawl-all"""
response = await client.post("/api/plugins/crawl-all")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "total_crawled" in data["data"]

View File

@@ -0,0 +1,144 @@
"""代理 API 集成测试 - 测试 /api/proxies/* 所有接口"""
import pytest
class TestProxiesAPI:
"""测试代理相关 API"""
@pytest.mark.asyncio
async def test_get_stats(self, client):
"""测试 GET /api/proxies/stats"""
response = await client.get("/api/proxies/stats")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "data" in data
assert "total" in data["data"]
assert "available" in data["data"]
assert "scheduler_running" in data["data"]
@pytest.mark.asyncio
async def test_list_proxies(self, client):
"""测试 POST /api/proxies"""
request_data = {
"page": 1,
"page_size": 10,
"protocol": None,
"min_score": 0,
"sort_by": "last_check",
"sort_order": "DESC"
}
response = await client.post("/api/proxies", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "list" in data["data"]
assert "total" in data["data"]
assert "page" in data["data"]
@pytest.mark.asyncio
async def test_list_proxies_with_protocol_filter(self, client):
"""测试 POST /api/proxies 带协议过滤"""
request_data = {
"page": 1,
"page_size": 10,
"protocol": "http",
"min_score": 0,
}
response = await client.post("/api/proxies", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
@pytest.mark.asyncio
async def test_list_proxies_invalid_protocol(self, client):
"""测试 POST /api/proxies 无效协议"""
request_data = {
"page": 1,
"page_size": 10,
"protocol": "invalid",
}
response = await client.post("/api/proxies", json=request_data)
assert response.status_code == 422 # 验证错误
@pytest.mark.asyncio
async def test_get_random_proxy_empty(self, client):
"""测试 GET /api/proxies/random - 空数据库"""
response = await client.get("/api/proxies/random")
# 可能返回 200(有数据) 或 404(无数据)
assert response.status_code in [200, 404]
@pytest.mark.asyncio
async def test_delete_proxy(self, client, sample_proxy):
"""测试 DELETE /api/proxies/{ip}/{port}"""
response = await client.delete(f"/api/proxies/{sample_proxy['ip']}/{sample_proxy['port']}")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
@pytest.mark.asyncio
async def test_delete_nonexistent_proxy(self, client):
"""测试 DELETE /api/proxies/{ip}/{port} - 不存在的代理"""
response = await client.delete("/api/proxies/999.999.999.999/99999")
# 即使代理不存在也返回成功(幂等删除)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_batch_delete_proxies(self, client):
"""测试 POST /api/proxies/batch-delete"""
request_data = {
"proxies": [
{"ip": "192.168.1.1", "port": 8080},
{"ip": "192.168.1.2", "port": 8081},
]
}
response = await client.post("/api/proxies/batch-delete", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "deleted_count" in data["data"]
@pytest.mark.asyncio
async def test_batch_delete_too_many(self, client):
"""测试 POST /api/proxies/batch-delete - 超过限制"""
request_data = {
"proxies": [{"ip": f"192.168.1.{i}", "port": 8080} for i in range(1001)]
}
response = await client.post("/api/proxies/batch-delete", json=request_data)
assert response.status_code == 422 # 验证错误
@pytest.mark.asyncio
async def test_clean_invalid_proxies(self, client):
"""测试 DELETE /api/proxies/clean-invalid"""
response = await client.delete("/api/proxies/clean-invalid")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "deleted_count" in data["data"]
@pytest.mark.asyncio
async def test_export_proxies_csv(self, client):
"""测试 GET /api/proxies/export/csv"""
response = await client.get("/api/proxies/export/csv")
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv"
@pytest.mark.asyncio
async def test_export_proxies_txt(self, client):
"""测试 GET /api/proxies/export/txt"""
response = await client.get("/api/proxies/export/txt")
assert response.status_code == 200
assert response.headers["content-type"] == "text/plain"
@pytest.mark.asyncio
async def test_export_proxies_json(self, client):
"""测试 GET /api/proxies/export/json"""
response = await client.get("/api/proxies/export/json")
assert response.status_code == 200
assert response.headers["content-type"] == "application/json"
@pytest.mark.asyncio
async def test_export_proxies_invalid_format(self, client):
"""测试 GET /api/proxies/export/invalid - 无效格式"""
response = await client.get("/api/proxies/export/invalid")
assert response.status_code == 400 # 错误响应

View File

@@ -0,0 +1,101 @@
"""调度器 API 集成测试 - 测试 /api/scheduler/* 所有接口"""
import pytest
class TestSchedulerAPI:
"""测试调度器相关 API"""
@pytest.mark.asyncio
async def test_get_scheduler_status(self, client):
"""测试 GET /api/scheduler/status"""
response = await client.get("/api/scheduler/status")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "running" in data["data"]
assert "interval_minutes" in data["data"]
assert isinstance(data["data"]["running"], bool)
@pytest.mark.asyncio
async def test_start_scheduler(self, client):
"""测试 POST /api/scheduler/start"""
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["running"] is True
@pytest.mark.asyncio
async def test_start_scheduler_already_running(self, client):
"""测试 POST /api/scheduler/start - 已经运行"""
# 先启动
await client.post("/api/scheduler/start")
# 再次启动
response = await client.post("/api/scheduler/start")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "已在运行" in data["message"] or data["data"]["running"] is True
@pytest.mark.asyncio
async def test_stop_scheduler(self, client):
"""测试 POST /api/scheduler/stop"""
# 先确保调度器在运行
await client.post("/api/scheduler/start")
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["running"] is False
@pytest.mark.asyncio
async def test_stop_scheduler_already_stopped(self, client):
"""测试 POST /api/scheduler/stop - 已经停止"""
# 先停止
await client.post("/api/scheduler/stop")
# 再次停止
response = await client.post("/api/scheduler/stop")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "未运行" in data["message"] or data["data"]["running"] is False
@pytest.mark.asyncio
async def test_validate_now(self, client):
"""测试 POST /api/scheduler/validate-now"""
# 先启动调度器
await client.post("/api/scheduler/start")
response = await client.post("/api/scheduler/validate-now")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert data["data"]["started"] is True
@pytest.mark.asyncio
async def test_scheduler_full_workflow(self, client):
"""测试调度器完整工作流"""
# 1. 获取初始状态
response = await client.get("/api/scheduler/status")
initial_status = response.json()["data"]
# 2. 启动调度器
response = await client.post("/api/scheduler/start")
assert response.json()["data"]["running"] is True
# 3. 验证状态
response = await client.get("/api/scheduler/status")
assert response.json()["data"]["running"] is True
# 4. 触发立即验证
response = await client.post("/api/scheduler/validate-now")
assert response.json()["data"]["started"] is True
# 5. 停止调度器
response = await client.post("/api/scheduler/stop")
assert response.json()["data"]["running"] is False
# 6. 验证最终状态
response = await client.get("/api/scheduler/status")
assert response.json()["data"]["running"] is False

View File

@@ -0,0 +1,137 @@
"""设置 API 集成测试 - 测试 /api/settings 接口"""
import pytest
class TestSettingsAPI:
"""测试设置相关 API"""
@pytest.mark.asyncio
async def test_get_settings(self, client):
"""测试 GET /api/settings"""
response = await client.get("/api/settings")
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
assert "crawl_timeout" in data["data"]
assert "validation_timeout" in data["data"]
assert "auto_validate" in data["data"]
@pytest.mark.asyncio
async def test_get_settings_structure(self, client):
"""测试 GET /api/settings 返回结构"""
response = await client.get("/api/settings")
data = response.json()["data"]
# 验证所有预期的设置项
expected_keys = [
"crawl_timeout",
"validation_timeout",
"max_retries",
"default_concurrency",
"min_proxy_score",
"proxy_expiry_days",
"auto_validate",
"validate_interval_minutes",
]
for key in expected_keys:
assert key in data, f"缺少设置项: {key}"
@pytest.mark.asyncio
async def test_save_settings(self, client):
"""测试 POST /api/settings"""
settings = {
"crawl_timeout": 45,
"validation_timeout": 15,
"max_retries": 5,
"default_concurrency": 100,
"min_proxy_score": 10,
"proxy_expiry_days": 14,
"auto_validate": True,
"validate_interval_minutes": 60,
}
response = await client.post("/api/settings", json=settings)
assert response.status_code == 200
data = response.json()
assert data["code"] == 200
# 验证返回的数据与提交的一致
for key, value in settings.items():
assert data["data"][key] == value
@pytest.mark.asyncio
async def test_save_settings_partial(self, client):
"""测试 POST /api/settings - 部分更新(实际上会替换所有)"""
# 先获取当前设置
response = await client.get("/api/settings")
current_settings = response.json()["data"]
# 修改部分设置
new_settings = current_settings.copy()
new_settings["crawl_timeout"] = 60
new_settings["auto_validate"] = False
response = await client.post("/api/settings", json=new_settings)
assert response.status_code == 200
data = response.json()
assert data["data"]["crawl_timeout"] == 60
assert data["data"]["auto_validate"] is False
@pytest.mark.asyncio
async def test_save_settings_validation_error(self, client):
"""测试 POST /api/settings - 验证错误"""
# crawl_timeout 必须在 5-120 之间
invalid_settings = {
"crawl_timeout": 200, # 超出范围
"validation_timeout": 10,
"max_retries": 3,
"default_concurrency": 50,
"min_proxy_score": 0,
"proxy_expiry_days": 7,
"auto_validate": True,
"validate_interval_minutes": 30,
}
response = await client.post("/api/settings", json=invalid_settings)
assert response.status_code == 422 # 验证错误
@pytest.mark.asyncio
async def test_save_settings_invalid_type(self, client):
"""测试 POST /api/settings - 无效类型"""
invalid_settings = {
"crawl_timeout": "invalid", # 应该是整数
"validation_timeout": 10,
"max_retries": 3,
"default_concurrency": 50,
"min_proxy_score": 0,
"proxy_expiry_days": 7,
"auto_validate": True,
"validate_interval_minutes": 30,
}
response = await client.post("/api/settings", json=invalid_settings)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_settings_roundtrip(self, client):
"""测试设置读写一致性"""
# 生成随机但有效的设置
import random
test_settings = {
"crawl_timeout": random.randint(10, 60),
"validation_timeout": random.randint(5, 30),
"max_retries": random.randint(1, 5),
"default_concurrency": random.randint(20, 100),
"min_proxy_score": random.randint(0, 50),
"proxy_expiry_days": random.randint(1, 14),
"auto_validate": random.choice([True, False]),
"validate_interval_minutes": random.randint(10, 120),
}
# 写入设置
response = await client.post("/api/settings", json=test_settings)
assert response.status_code == 200
# 读取设置
response = await client.get("/api/settings")
saved_settings = response.json()["data"]
# 验证一致性
for key, value in test_settings.items():
assert saved_settings[key] == value, f"设置项 {key} 不一致"

89
tests/run_tests.py Normal file
View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python
"""测试运行脚本 - 方便运行各种类型的测试"""
import sys
import subprocess
import argparse
def run_command(cmd, description):
"""运行命令并打印结果"""
print(f"\n{'='*60}")
print(f"运行: {description}")
print(f"命令: {' '.join(cmd)}")
print('='*60)
result = subprocess.run(cmd, shell=False)
return result.returncode
def main():
parser = argparse.ArgumentParser(description='运行 ProxyPool 测试')
parser.add_argument(
'type',
nargs='?',
default='all',
choices=['all', 'unit', 'integration', 'e2e', 'coverage'],
help='测试类型 (默认: all)'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='详细输出'
)
parser.add_argument(
'-k',
metavar='EXPRESSION',
help='只运行匹配表达式的测试 (pytest -k 选项)'
)
args = parser.parse_args()
# 基础命令
base_cmd = [sys.executable, "-m", "pytest"]
if args.verbose:
base_cmd.append("-v")
if args.k:
base_cmd.extend(["-k", args.k])
exit_code = 0
if args.type == 'all':
exit_code = run_command(
base_cmd + ["tests/"],
"所有测试"
)
elif args.type == 'unit':
exit_code = run_command(
base_cmd + ["tests/unit/"],
"单元测试"
)
elif args.type == 'integration':
exit_code = run_command(
base_cmd + ["tests/integration/"],
"集成测试"
)
elif args.type == 'e2e':
exit_code = run_command(
base_cmd + ["tests/e2e/"],
"端到端测试"
)
elif args.type == 'coverage':
exit_code = run_command(
[sys.executable, "-m", "pytest", "tests/", "--cov=app", "--cov-report=html", "--cov-report=term"] + (["-v"] if args.verbose else []),
"覆盖率测试"
)
print(f"\n{'='*60}")
if exit_code == 0:
print("✅ 所有测试通过!")
else:
print(f"❌ 测试失败 (退出码: {exit_code})")
print('='*60)
return exit_code
if __name__ == "__main__":
sys.exit(main())

1
tests/unit/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""单元测试"""

Binary file not shown.

136
tests/unit/test_models.py Normal file
View File

@@ -0,0 +1,136 @@
"""模型单元测试"""
import pytest
from datetime import datetime
from app.models.domain import ProxyRaw, Proxy, PluginInfo
from app.models.schemas import (
ProxyCreate,
ProxyListRequest,
SettingsSchema,
BatchDeleteRequest,
)
class TestProxyRaw:
"""测试 ProxyRaw 领域模型"""
def test_create_proxy_raw(self):
"""测试创建原始代理"""
proxy = ProxyRaw("192.168.1.1", 8080, "http")
assert proxy.ip == "192.168.1.1"
assert proxy.port == 8080
assert proxy.protocol == "http"
def test_protocol_normalization(self):
"""测试协议标准化"""
proxy = ProxyRaw("192.168.1.1", 8080, "HTTP")
assert proxy.protocol == "http"
def test_invalid_protocol_defaults_to_http(self):
"""测试无效协议默认为 http"""
proxy = ProxyRaw("192.168.1.1", 8080, "invalid")
assert proxy.protocol == "http"
class TestProxy:
"""测试 Proxy 领域模型"""
def test_create_proxy(self):
"""测试创建代理实体"""
proxy = Proxy(
ip="192.168.1.1",
port=8080,
protocol="http",
score=50,
response_time_ms=100.5,
last_check=datetime.now(),
created_at=datetime.now(),
)
assert proxy.ip == "192.168.1.1"
assert proxy.score == 50
class TestPluginInfo:
"""测试 PluginInfo 领域模型"""
def test_create_plugin_info(self):
"""测试创建插件信息"""
plugin = PluginInfo(
id="test_plugin",
name="test_plugin",
display_name="测试插件",
description="用于测试",
enabled=True,
success_count=10,
failure_count=2,
)
assert plugin.id == "test_plugin"
assert plugin.enabled is True
assert plugin.success_count == 10
class TestProxyCreateSchema:
"""测试 ProxyCreate Pydantic 模型"""
def test_valid_proxy_create(self):
"""测试有效的代理创建"""
proxy = ProxyCreate(ip="192.168.1.1", port=8080, protocol="http")
assert proxy.ip == "192.168.1.1"
assert proxy.port == 8080
def test_port_validation(self):
"""测试端口验证"""
with pytest.raises(Exception):
ProxyCreate(ip="192.168.1.1", port=70000) # 超出范围
def test_protocol_validation(self):
"""测试协议验证"""
with pytest.raises(Exception):
ProxyCreate(ip="192.168.1.1", port=8080, protocol="invalid")
class TestProxyListRequest:
"""测试 ProxyListRequest 模式"""
def test_default_values(self):
"""测试默认值"""
request = ProxyListRequest()
assert request.page == 1
assert request.page_size == 20
assert request.sort_by == "last_check"
assert request.sort_order == "DESC"
def test_custom_values(self):
"""测试自定义值"""
request = ProxyListRequest(page=2, page_size=50, protocol="https")
assert request.page == 2
assert request.page_size == 50
assert request.protocol == "https"
class TestSettingsSchema:
"""测试 SettingsSchema"""
def test_default_settings(self):
"""测试默认设置"""
settings = SettingsSchema()
assert settings.crawl_timeout == 30
assert settings.validation_timeout == 10
assert settings.auto_validate is True
def test_custom_settings(self):
"""测试自定义设置"""
settings = SettingsSchema(crawl_timeout=60, auto_validate=False)
assert settings.crawl_timeout == 60
assert settings.auto_validate is False
class TestBatchDeleteRequest:
"""测试 BatchDeleteRequest"""
def test_valid_batch_delete(self):
"""测试有效的批量删除"""
request = BatchDeleteRequest(proxies=[
{"ip": "192.168.1.1", "port": 8080},
{"ip": "192.168.1.2", "port": 8081},
])
assert len(request.proxies) == 2

View File

@@ -0,0 +1,95 @@
"""仓库层单元测试"""
import pytest
import asyncio
from datetime import datetime
class TestProxyRepository:
"""测试 ProxyRepository"""
@pytest.mark.asyncio
async def test_insert_or_update(self, db, proxy_repo):
"""测试插入或更新代理"""
result = await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
assert result is True
# 验证插入成功
proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080)
assert proxy is not None
assert proxy.ip == "192.168.1.1"
assert proxy.port == 8080
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)
@pytest.mark.asyncio
async def test_get_random(self, db, proxy_repo):
"""测试获取随机代理"""
# 先插入一个代理
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
proxy = await proxy_repo.get_random(db)
# 可能有也可能没有(取决于数据库状态)
if proxy:
assert hasattr(proxy, 'ip')
assert hasattr(proxy, 'port')
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)
@pytest.mark.asyncio
async def test_list_all(self, db, proxy_repo):
"""测试列出所有代理"""
# 插入测试数据
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
await proxy_repo.insert_or_update(db, "192.168.1.2", 8081, "https", 60)
proxies = await proxy_repo.list_all(db, limit=100)
assert isinstance(proxies, list)
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)
await proxy_repo.delete(db, "192.168.1.2", 8081)
@pytest.mark.asyncio
async def test_update_score(self, db, proxy_repo):
"""测试更新分数"""
# 插入代理
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
# 更新分数
result = await proxy_repo.update_score(db, "192.168.1.1", 8080, 10)
assert result is True
# 验证
proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080)
assert proxy.score == 60
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)
@pytest.mark.asyncio
async def test_batch_delete(self, db, proxy_repo):
"""测试批量删除"""
# 插入测试数据
await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50)
await proxy_repo.insert_or_update(db, "192.168.1.2", 8081, "http", 50)
# 批量删除
count = await proxy_repo.batch_delete(db, [("192.168.1.1", 8080), ("192.168.1.2", 8081)])
assert count == 2
# 验证删除
proxy1 = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080)
proxy2 = await proxy_repo.get_by_ip_port(db, "192.168.1.2", 8081)
assert proxy1 is None
assert proxy2 is None
@pytest.mark.asyncio
async def test_get_stats(self, db, proxy_repo):
"""测试获取统计信息"""
stats = await proxy_repo.get_stats(db)
assert "total" in stats
assert "available" in stats
assert "avg_score" in stats
assert "http_count" in stats