diff --git a/.gitignore b/.gitignore
index c45a9ea..97a145a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,7 +53,7 @@ logs/
# Test
test/
-tests/
+# tests/ # 保留测试目录以便版本控制
# Share Directory
share/
diff --git a/README.md b/README.md
index f69e1e3..d10eb14 100644
--- a/README.md
+++ b/README.md
@@ -14,12 +14,12 @@
## 📦 技术栈
### 后端
-- **框架**: FastAPI (端口 9949)
+- **框架**: FastAPI (端口 18080)
- **数据库**: SQLite + aiosqlite
- **异步**: asyncio
### 前端
-- **框架**: Vue 3 + Vite (端口 9948)
+- **框架**: Vue 3 + Vite (端口 18081)
- **UI库**: Element Plus
- **状态管理**: Pinia
- **图表**: ECharts
@@ -53,7 +53,7 @@ start.bat
**启动后端服务**(终端 1)
```bash
-python api_server.py
+python main.py
```
**启动前端服务**(终端 2)
@@ -71,7 +71,7 @@ stop.bat
### 4. 访问 WebUI
-打开浏览器访问:**http://localhost:9948**
+打开浏览器访问:**http://localhost:18081**
## 📁 项目结构
@@ -202,10 +202,10 @@ POST /api/settings
2. **手动测试 API**
```bash
# 获取统计信息
- curl http://localhost:9949/api/stats
+ curl http://localhost:18080/api/stats
# 获取代理列表
- curl -X POST http://localhost:9949/api/proxies \
+ curl -X POST http://localhost:18080/api/proxies \
-H "Content-Type: application/json" \
-d '{"page": 1, "page_size": 20}'
```
@@ -229,7 +229,7 @@ POST /api/settings
## 🔧 常见问题
### Q: 启动后端口被占用?
-A: 修改 `config.py` 中的端口号(默认9949)或 `frontend/vite.config.js` 中的端口号(默认9948)
+A: 修改 `.env` 中的端口号(默认18080)或 `WebUI/vite.config.js` 中的端口号(默认18081)
### Q: 爬虫无法抓取代理?
A: 检查网络连接,确保能访问目标网站,或尝试更换代理源插件
diff --git a/WebUI/index.html b/WebUI/index.html
index 8769dc1..2410242 100644
--- a/WebUI/index.html
+++ b/WebUI/index.html
@@ -1,5 +1,5 @@
-
+
diff --git a/WebUI/src/api/index.js b/WebUI/src/api/index.js
index d9b781b..3568518 100644
--- a/WebUI/src/api/index.js
+++ b/WebUI/src/api/index.js
@@ -61,8 +61,8 @@ export const statsAPI = {
}
export const proxiesAPI = {
- getProxies: (params, signal) =>
- api.post('/api/proxies', cleanParams(params), signal ? { signal } : {}),
+ getProxies: (params, signal) =>
+ api.post('/api/proxies', cleanParams(params), { signal }),
deleteProxy: (ip, port) => api.delete(`/api/proxies/${ip}/${port}`),
diff --git a/WebUI/src/stores/proxy.js b/WebUI/src/stores/proxy.js
index 970863b..476b67a 100644
--- a/WebUI/src/stores/proxy.js
+++ b/WebUI/src/stores/proxy.js
@@ -89,7 +89,7 @@ export const useProxyStore = defineStore('proxy', () => {
/**
* 批量删除代理
- * @param {Array<[string, number|string]>} proxyList
+ * @param {Array<{ip: string, port: number}>} proxyList
* @returns {Promise} 实际删除的数量
*/
async function batchDeleteProxies(proxyList) {
diff --git a/WebUI/src/views/Plugins.vue b/WebUI/src/views/Plugins.vue
index 8874b83..73c039f 100644
--- a/WebUI/src/views/Plugins.vue
+++ b/WebUI/src/views/Plugins.vue
@@ -46,7 +46,7 @@
handleToggle(row.id, val)"
class="theme-switch"
/>
diff --git a/WebUI/src/views/ProxyList.vue b/WebUI/src/views/ProxyList.vue
index 178b582..c445176 100644
--- a/WebUI/src/views/ProxyList.vue
+++ b/WebUI/src/views/ProxyList.vue
@@ -212,7 +212,7 @@ function handleSearch() {
}
function handleSelectionChange(selection) {
- selectedProxies.value = selection.map(item => [item.ip, item.port])
+ selectedProxies.value = selection.map(item => ({ ip: item.ip, port: item.port }))
}
async function handleCopy(proxy) {
diff --git a/app/api/deps.py b/app/api/deps.py
index add4572..43ff92c 100644
--- a/app/api/deps.py
+++ b/app/api/deps.py
@@ -48,6 +48,6 @@ def create_scheduler_service(db_settings: dict | None = None) -> SchedulerServic
)
svc = SchedulerService(validation_queue=queue, proxy_repo=proxy_repo)
svc.interval_minutes = db_settings.get(
- "validate_interval_minutes", app_settings.validator_timeout
+ "validate_interval_minutes", 30
)
return svc
diff --git a/app/api/main.py b/app/api/main.py
index 25c8cf4..f30753c 100644
--- a/app/api/main.py
+++ b/app/api/main.py
@@ -53,6 +53,3 @@ def create_app() -> FastAPI:
}
return app
-
-
-app = create_app()
diff --git a/app/api/routes/settings.py b/app/api/routes/settings.py
index 14f8bde..d4576b9 100644
--- a/app/api/routes/settings.py
+++ b/app/api/routes/settings.py
@@ -1,9 +1,10 @@
"""设置相关路由"""
-from fastapi import APIRouter
+from fastapi import APIRouter, Request
from app.core.db import get_db
from app.repositories.settings_repo import SettingsRepository
from app.models.schemas import SettingsSchema
from app.api.common import success_response, error_response
+from app.core.log import logger
router = APIRouter(prefix="/api/settings", tags=["settings"])
settings_repo = SettingsRepository()
@@ -17,9 +18,18 @@ async def get_settings():
@router.post("")
-async def save_settings(request: SettingsSchema):
+async def save_settings(request: SettingsSchema, http_request: Request):
async with get_db() as db:
success = await settings_repo.save(db, request.model_dump())
if not success:
return error_response("保存设置失败", 500)
+
+ # 热更新运行中调度器的间隔时间
+ scheduler = getattr(http_request.app.state, "scheduler_service", None)
+ if scheduler and scheduler.running:
+ new_interval = request.validate_interval_minutes
+ if scheduler.interval_minutes != new_interval:
+ scheduler.interval_minutes = new_interval
+ logger.info(f"Scheduler interval updated to {new_interval} minutes")
+
return success_response("保存设置成功", request.model_dump())
diff --git a/app/core/log.py b/app/core/log.py
index 38bf6fa..9d0069c 100644
--- a/app/core/log.py
+++ b/app/core/log.py
@@ -29,7 +29,16 @@ console_handler.setFormatter(formatter)
# 获取标准 logger
logger = logging.getLogger('ProxyPool')
-logger.setLevel(logging.INFO)
+
+# 尝试从配置读取日志级别,默认 INFO
+try:
+ from app.core.config import settings
+ log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
+except Exception:
+ log_level = logging.INFO
+logger.setLevel(log_level)
+file_handler.setLevel(log_level)
+console_handler.setLevel(log_level)
# 防止重复添加 handler(如模块重导入)
if not logger.handlers:
diff --git a/app/models/schemas.py b/app/models/schemas.py
index 142ee8a..c261caf 100644
--- a/app/models/schemas.py
+++ b/app/models/schemas.py
@@ -23,6 +23,7 @@ class ProxyResponse(BaseModel):
port: int
protocol: str
score: int
+ response_time_ms: Optional[float] = None
last_check: Optional[str] = None
diff --git a/app/plugins/ip3366.py b/app/plugins/ip3366.py
index 8b79333..87f0edd 100644
--- a/app/plugins/ip3366.py
+++ b/app/plugins/ip3366.py
@@ -53,8 +53,11 @@ class Ip3366Plugin(BaseHTTPPlugin):
protocol = tds[4].get_text(strip=True).lower() if len(tds) > 4 else "http"
if protocol not in VALID_PROTOCOLS:
protocol = "http"
- if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
- results.append(ProxyRaw(ip, int(port), protocol))
+                if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdecimal() and 1 <= int(port) <= 65535:
+ try:
+ results.append(ProxyRaw(ip, int(port), protocol))
+ except ValueError:
+ continue
if results:
logger.info(f"{self.display_name} 解析完成,获得 {len(results)} 个潜在代理")
diff --git a/app/plugins/ip89.py b/app/plugins/ip89.py
index 9a449e6..c972130 100644
--- a/app/plugins/ip89.py
+++ b/app/plugins/ip89.py
@@ -34,8 +34,11 @@ class Ip89Plugin(BaseHTTPPlugin):
if len(tds) >= 2:
ip = tds[0].get_text(strip=True)
port = tds[1].get_text(strip=True)
- if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
- results.append(ProxyRaw(ip, int(port), "http"))
+                if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdecimal() and 1 <= int(port) <= 65535:
+ try:
+ results.append(ProxyRaw(ip, int(port), "http"))
+ except ValueError:
+ continue
await asyncio.sleep(random.uniform(1, 2))
diff --git a/app/plugins/kuaidaili.py b/app/plugins/kuaidaili.py
index 94fa5f1..b80825a 100644
--- a/app/plugins/kuaidaili.py
+++ b/app/plugins/kuaidaili.py
@@ -61,8 +61,11 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
protocol = tds[4].get_text(strip=True).lower() if len(tds) > 4 else "http"
if protocol not in VALID_PROTOCOLS:
protocol = "http"
- if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
- results.append(ProxyRaw(ip, int(port), protocol))
+                if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdecimal() and 1 <= int(port) <= 65535:
+ try:
+ results.append(ProxyRaw(ip, int(port), protocol))
+ except ValueError:
+ continue
await asyncio.sleep(random.uniform(5, 8))
if results:
diff --git a/app/plugins/proxylist_download.py b/app/plugins/proxylist_download.py
index 96863b1..3b83910 100644
--- a/app/plugins/proxylist_download.py
+++ b/app/plugins/proxylist_download.py
@@ -59,12 +59,14 @@ class ProxyListDownloadPlugin(BaseHTTPPlugin):
line = line.strip()
if not line or ":" not in line:
continue
- parts = line.split(":")
- if len(parts) >= 2:
- ip = parts[0].strip()
- port = parts[1].strip()
- if ip and port.isdigit():
+ ip, _, port = line.rpartition(":")
+ ip = ip.strip()
+ port = port.strip()
+            if ip and port.isdecimal() and 1 <= int(port) <= 65535:
+ try:
results.append(ProxyRaw(ip, int(port), protocol))
+ except ValueError:
+ continue
return results
async def crawl(self) -> List[ProxyRaw]:
diff --git a/app/plugins/proxyscrape.py b/app/plugins/proxyscrape.py
index 2bc3c5a..4a3f8f4 100644
--- a/app/plugins/proxyscrape.py
+++ b/app/plugins/proxyscrape.py
@@ -42,12 +42,14 @@ class ProxyScrapePlugin(BaseHTTPPlugin):
line = line.strip()
if not line or ":" not in line:
continue
- parts = line.split(":")
- if len(parts) >= 2:
- ip = parts[0].strip()
- port_str = parts[1].strip()
- if port_str.isdigit():
+ ip, _, port_str = line.rpartition(":")
+ ip = ip.strip()
+ port_str = port_str.strip()
+            if port_str.isdecimal() and 1 <= int(port_str) <= 65535:
+ try:
proxies.append(ProxyRaw(ip, int(port_str), protocol))
+ except ValueError:
+ continue
return proxies
async def crawl(self) -> List[ProxyRaw]:
@@ -71,7 +73,7 @@ class ProxyScrapePlugin(BaseHTTPPlugin):
htmls.append("")
except Exception:
htmls.append("")
- done_protocols.add(protocols[i])
+ # 异常时不加入 done_protocols,以便触发 API fallback
fallback_protocols = []
for protocol, html in zip(protocols, htmls):
diff --git a/app/plugins/speedx.py b/app/plugins/speedx.py
index 2a9dc3e..8c9dec9 100644
--- a/app/plugins/speedx.py
+++ b/app/plugins/speedx.py
@@ -42,15 +42,17 @@ class SpeedXPlugin(BaseHTTPPlugin):
line = line.strip()
if not line or ":" not in line:
continue
- parts = line.split(":")
- if len(parts) >= 2:
- ip = parts[0].strip()
- port = parts[1].strip()
- if not re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
- continue
- if not port.isdigit() or not (1 <= int(port) <= 65535):
- continue
+ ip, _, port = line.rpartition(":")
+ ip = ip.strip()
+ port = port.strip()
+ if not re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
+ continue
+            if not port.isdecimal() or not (1 <= int(port) <= 65535):
+ continue
+ try:
results.append(ProxyRaw(ip, int(port), protocol))
+ except ValueError:
+ continue
return results
async def crawl(self) -> List[ProxyRaw]:
diff --git a/app/plugins/yundaili.py b/app/plugins/yundaili.py
index 1487916..729978e 100644
--- a/app/plugins/yundaili.py
+++ b/app/plugins/yundaili.py
@@ -40,17 +40,18 @@ class YunDaiLiPlugin(BaseHTTPPlugin):
line = line.strip()
if not line or ":" not in line:
continue
- parts = line.split(":")
- if len(parts) < 2:
- continue
- ip = parts[0].strip()
- port_str = parts[1].strip()
+ ip, _, port_str = line.rpartition(":")
+ ip = ip.strip()
+ port_str = port_str.strip()
if not re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
continue
if not port_str.isdigit() or not (1 <= int(port_str) <= 65535):
continue
final_protocol = protocol if protocol in VALID_PROTOCOLS else "http"
- results.append(ProxyRaw(ip, int(port_str), final_protocol))
+ try:
+ results.append(ProxyRaw(ip, int(port_str), final_protocol))
+ except ValueError:
+ continue
count += 1
if count:
diff --git a/app/repositories/proxy_repo.py b/app/repositories/proxy_repo.py
index 1a9bd6b..13fafc9 100644
--- a/app/repositories/proxy_repo.py
+++ b/app/repositories/proxy_repo.py
@@ -76,23 +76,23 @@ class ProxyRepository:
max_score: int = 100,
) -> bool:
try:
- async with db.execute(
- "SELECT score FROM proxies WHERE ip = ? AND port = ?", (ip, port)
- ) as cursor:
- row = await cursor.fetchone()
- if not row:
- return False
- current_score = row[0]
- new_score = max(min_score, min(max_score, current_score + delta))
- await db.execute(
- "UPDATE proxies SET score = ?, last_check = CURRENT_TIMESTAMP WHERE ip = ? AND port = ?",
- (new_score, ip, port),
- )
- if new_score <= 0:
- # 只删除当前代理,避免误删其他无效代理
- await db.execute("DELETE FROM proxies WHERE ip = ? AND port = ?", (ip, port))
- await db.commit()
- return True
+            # 原子更新:计算新分数并直接更新
+            cursor = await db.execute(
+                """
+                UPDATE proxies
+                SET score = MAX(?, MIN(?, score + ?)),
+                    last_check = CURRENT_TIMESTAMP
+                WHERE ip = ? AND port = ?
+                """,
+                (min_score, max_score, delta, ip, port),
+            )
+            updated = cursor.rowcount > 0
+            await db.execute(
+                "DELETE FROM proxies WHERE ip = ? AND port = ? AND score <= 0",
+                (ip, port),
+            )
+            await db.commit()
+            return updated
except Exception as e:
logger.error(f"update_score failed: {e}", exc_info=True)
return False
@@ -156,19 +156,35 @@ class ProxyRepository:
db: aiosqlite.Connection,
protocol: Optional[str] = None,
limit: int = 100000,
+ offset: int = 0,
) -> List[Proxy]:
query = "SELECT ip, port, protocol, score, response_time_ms, last_check, created_at FROM proxies"
params: List = []
if protocol:
query += " WHERE protocol = ?"
params.append(protocol.lower())
- query += " LIMIT ?"
- params.append(limit)
+        query += " ORDER BY ip, port LIMIT ? OFFSET ?"
+ params.extend([limit, offset])
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [_row_to_proxy(row) for row in rows]
+ @staticmethod
+ async def iter_batches(
+ db: aiosqlite.Connection,
+ protocol: Optional[str] = None,
+ batch_size: int = 1000,
+ ):
+ """流式分批读取代理,避免一次性加载大量数据到内存"""
+ offset = 0
+ while True:
+ batch = await ProxyRepository.list_all(db, protocol, batch_size, offset)
+ if not batch:
+ break
+ yield batch
+ offset += batch_size
+
@staticmethod
async def list_paginated(
db: aiosqlite.Connection,
diff --git a/app/services/plugin_service.py b/app/services/plugin_service.py
index 6c856d7..02c40b8 100644
--- a/app/services/plugin_service.py
+++ b/app/services/plugin_service.py
@@ -24,10 +24,9 @@ class PluginService:
result = []
for plugin in registry.list_plugins():
- # 合并持久化状态
+ # 合并持久化状态(不修改全局实例,避免并发竞争)
state = db_states.get(plugin.name, {})
- if "enabled" in state:
- plugin.enabled = state["enabled"]
+ enabled = state.get("enabled", plugin.enabled)
if "config" in state and isinstance(state["config"], dict):
plugin.update_config(state["config"])
@@ -50,7 +49,7 @@ class PluginService:
name=plugin.name,
display_name=plugin.display_name or plugin.name,
description=plugin.description or f"从 {plugin.name} 爬取代理",
- enabled=plugin.enabled,
+ enabled=enabled,
last_run=stat.get("last_run"),
success_count=stat.get("success_count", 0),
failure_count=stat.get("failure_count", 0),
@@ -133,11 +132,11 @@ class PluginService:
logger.error(f"Run all plugins error: {results}")
continue
all_results.extend(results)
- # 去重
+ # 去重(与数据库 UNIQUE(ip, port) 约束保持一致)
seen = set()
unique = []
for p in all_results:
- key = (p.ip, p.port, p.protocol)
+ key = (p.ip, p.port)
if key not in seen:
seen.add(key)
unique.append(p)
diff --git a/app/services/proxy_service.py b/app/services/proxy_service.py
index 4340a73..8209805 100644
--- a/app/services/proxy_service.py
+++ b/app/services/proxy_service.py
@@ -61,28 +61,41 @@ class ProxyService:
protocol: Optional[str] = None,
limit: int = 10000,
) -> AsyncIterator[str]:
- async with get_db() as db:
- proxies = await self.proxy_repo.list_all(db, protocol=protocol, limit=limit)
-
if fmt == "csv":
- yield "IP,Port,Protocol,Score,Last Check\n"
- for p in proxies:
- yield f"{p.ip},{p.port},{p.protocol},{p.score},{self._fmt_time(p.last_check)}\n"
+ yield "\ufeffIP,Port,Protocol,Score,Last Check\n"
elif fmt == "txt":
- for p in proxies:
- yield f"{p.ip}:{p.port}\n"
+ pass
elif fmt == "json":
- data = [
- {
- "ip": p.ip,
- "port": p.port,
- "protocol": p.protocol,
- "score": p.score,
- "last_check": self._fmt_time(p.last_check),
- }
- for p in proxies
- ]
- yield json.dumps(data, ensure_ascii=False, indent=2)
+ yield "["
+ first = True
+
+ exported = 0
+ async with get_db() as db:
+ async for batch in self.proxy_repo.iter_batches(db, protocol=protocol, batch_size=1000):
+ for p in batch:
+ if exported >= limit:
+ break
+ if fmt == "csv":
+ yield f"{p.ip},{p.port},{p.protocol},{p.score},{self._fmt_time(p.last_check)}\n"
+ elif fmt == "txt":
+ yield f"{p.ip}:{p.port}\n"
+ elif fmt == "json":
+ item = {
+ "ip": p.ip,
+ "port": p.port,
+ "protocol": p.protocol,
+ "score": p.score,
+ "last_check": self._fmt_time(p.last_check),
+ }
+ prefix = "" if first else ","
+ yield prefix + json.dumps(item, ensure_ascii=False)
+ first = False
+ exported += 1
+ if exported >= limit:
+ break
+
+ if fmt == "json":
+ yield "]"
@staticmethod
def _fmt_time(dt: Optional[datetime]) -> str:
diff --git a/app/services/scheduler_service.py b/app/services/scheduler_service.py
index 1a70516..3996319 100644
--- a/app/services/scheduler_service.py
+++ b/app/services/scheduler_service.py
@@ -8,6 +8,7 @@ from app.core.tasks.queue import ValidationQueue
from app.core.config import settings as app_settings
from app.core.log import logger
from app.models.domain import ProxyRaw
+from app.services.task_service import task_service
class SchedulerService:
@@ -58,16 +59,15 @@ class SchedulerService:
"""立即执行一次全量验证(后台运行,不阻塞)"""
if self._validate_task and not self._validate_task.done():
return
- self._validate_task = asyncio.create_task(self._do_validate_all())
+ self._validate_task = asyncio.create_task(self._do_validate_all(from_loop=False))
async def _run_loop(self):
"""定时循环"""
while self.running:
try:
# 清理过期任务,防止内存无限增长
- from app.services.task_service import task_service
task_service.cleanup_old_tasks()
- await self._do_validate_all()
+ await self._do_validate_all(from_loop=True)
except Exception as e:
logger.error(f"Scheduler loop error: {e}", exc_info=True)
# 等待下一次
@@ -76,10 +76,17 @@ class SchedulerService:
except asyncio.TimeoutError:
pass
- async def _do_validate_all(self):
+ async def _do_validate_all(self, from_loop: bool = True):
"""验证数据库中所有存量代理"""
+ queue_started_here = False
try:
logger.info("Starting scheduled validation for all proxies")
+
+ # 如果队列未运行,临时启动它(适用于 validate_all_now 在调度器停止时调用)
+ if not self.validation_queue._running:
+ await self.validation_queue.start()
+ queue_started_here = True
+
async with get_db() as db:
# 清理 7 天前的验证任务记录,防止表无限增长
cleaned = await ValidationTaskRepository.cleanup_old(db, days=7)
@@ -95,7 +102,7 @@ class SchedulerService:
batch_size = 100
total_batches = (len(proxies) - 1) // batch_size + 1
for i in range(0, len(proxies), batch_size):
- if not self.running:
+ if from_loop and not self.running:
break
batch = proxies[i : i + batch_size]
await self.validation_queue.submit([
@@ -106,3 +113,6 @@ class SchedulerService:
logger.info("Scheduled validation batches submitted")
except Exception as e:
logger.error(f"Scheduled validation error: {e}", exc_info=True)
+ finally:
+ if queue_started_here:
+ await self.validation_queue.stop()
diff --git a/app/services/validator_service.py b/app/services/validator_service.py
index 3383a9e..4b3cec2 100644
--- a/app/services/validator_service.py
+++ b/app/services/validator_service.py
@@ -95,17 +95,13 @@ class ValidatorService:
timeout = aiohttp.ClientTimeout(total=self.timeout, connect=self.connect_timeout)
test_url = self._get_test_url("http")
- try:
- async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
- async with session.get(test_url, allow_redirects=True) as response:
- if response.status in (200, 301, 302):
- latency = round((time.time() - start) * 1000, 2)
- logger.info(f"SOCKS valid: {ip}:{port} ({protocol}) {latency}ms")
- return True, latency
- return False, 0.0
- finally:
- # ClientSession 的 async with 退出时会自动关闭 connector,无需手动重复关闭
- pass
+ async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
+ async with session.get(test_url, allow_redirects=True) as response:
+ if response.status in (200, 301, 302):
+ latency = round((time.time() - start) * 1000, 2)
+ logger.info(f"SOCKS valid: {ip}:{port} ({protocol}) {latency}ms")
+ return True, latency
+ return False, 0.0
async def close(self):
"""关闭共享的 HTTP ClientSession"""
diff --git a/pytest.ini b/pytest.ini
index 98047b6..f3e68c3 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -14,4 +14,4 @@ markers =
e2e: 端到端测试
slow: 慢速测试
async_test: 异步测试
-asyncio_mode = auto
+asyncio_default_fixture_loop_scope = function
diff --git a/requirements.txt b/requirements.txt
index a96639b..526a10c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,3 +6,4 @@ aiohttp-socks==0.9.1
beautifulsoup4==4.12.3
lxml==5.1.0
pydantic-settings==2.8.1
+httpx==0.27.0
diff --git a/tests/unit/test_repositories.py b/tests/unit/test_repositories.py
index dc99a67..b7b3a7f 100644
--- a/tests/unit/test_repositories.py
+++ b/tests/unit/test_repositories.py
@@ -68,6 +68,37 @@ class TestProxyRepository:
# 清理
await proxy_repo.delete(db, "192.168.1.1", 8080)
+ @pytest.mark.asyncio
+ async def test_update_score_deletes_on_zero_or_below(self, db, proxy_repo):
+ """测试分数降至 0 及以下时自动删除代理"""
+ await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 5)
+
+ result = await proxy_repo.update_score(db, "192.168.1.1", 8080, -10)
+ assert result is True
+
+ proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080)
+ assert proxy is None
+
+ @pytest.mark.asyncio
+ async def test_iter_batches(self, db, proxy_repo):
+ """测试流式分批读取"""
+ # 插入 5 条测试数据
+ for i in range(5):
+ await proxy_repo.insert_or_update(db, f"192.168.1.{i}", 8000 + i, "http", 10)
+
+ batches = []
+ async for batch in proxy_repo.iter_batches(db, batch_size=2):
+ batches.append(batch)
+
+ assert len(batches) == 3
+ assert len(batches[0]) == 2
+ assert len(batches[1]) == 2
+ assert len(batches[2]) == 1
+
+ # 清理
+ for i in range(5):
+ await proxy_repo.delete(db, f"192.168.1.{i}", 8000 + i)
+
@pytest.mark.asyncio
async def test_batch_delete(self, db, proxy_repo):
"""测试批量删除"""