Files
ProxyPool/app/repositories/settings_repo.py
祀梦 0131c8b408 feat: fpw plugins, validation/crawl perf, WS stats, test DB isolation
- Add Free_Proxy_Website-style fpw_* plugins and register them
- Per-plugin crawl timeout (crawl_timeout_seconds=120); remove global crawl_timeout setting
- Validator: fix connect vs total timeout on save; SOCKS session LRU cache; drop redundant semaphore
- Validation handler uses single DB connection; batch upsert after crawl; WorkerPool put_nowait
- Remove unused max_retries from settings API/UI; settings maintenance SQL + init_db cleanup of deprecated keys
- WebSocket dashboard stats; ProxyList pool_filter and API alignment
- POST /api/proxies/delete-one for IPv6-safe deletes; task poll stops on 404
- pytest uses PROXYPOOL_DB_PATH=db/proxies.test.sqlite so tests do not wipe production DB
- .gitignore: explicit proxies.test.sqlite patterns; fix plugin_service ValidationException import

Made-with: Cursor
2026-04-05 13:39:19 +08:00

197 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""设置数据访问层"""
import json
import aiosqlite
from typing import Optional, Dict, Any
from app.core.log import logger
DEFAULT_SETTINGS = {
"validation_timeout": 6,
"default_concurrency": 120,
"min_proxy_score": 0,
"proxy_expiry_days": 7,
"auto_validate": True,
"auto_validate_after_crawl": False,
"validate_interval_minutes": 30,
"validation_targets": [
"http://httpbin.org/ip",
"https://httpbin.org/ip",
"http://api.ipify.org",
"https://api.ipify.org",
"http://www.baidu.com",
"http://www.qq.com",
],
}
class SettingsRepository:
"""系统设置 Repository"""
@staticmethod
async def get_all(db: aiosqlite.Connection) -> Dict[str, Any]:
settings = DEFAULT_SETTINGS.copy()
try:
async with db.execute("SELECT key, value FROM settings") as cursor:
rows = await cursor.fetchall()
for key, value in rows:
# 类型转换
default = DEFAULT_SETTINGS.get(key)
if isinstance(default, bool):
settings[key] = value.lower() == "true"
elif isinstance(default, int):
settings[key] = int(value)
elif isinstance(default, list):
try:
settings[key] = json.loads(value)
except json.JSONDecodeError:
settings[key] = default
else:
settings[key] = value
except Exception as e:
logger.error(f"get_all settings failed: {e}")
# 已废弃:爬取限时改为每插件 crawl_timeout_seconds不再存全局项
settings.pop("crawl_timeout", None)
return settings
@staticmethod
async def save(db: aiosqlite.Connection, settings: Dict[str, Any]) -> bool:
try:
for key, value in settings.items():
if isinstance(value, list):
stored_value = json.dumps(value, ensure_ascii=False)
else:
stored_value = str(value)
await db.execute(
"""
INSERT INTO settings (key, value, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = CURRENT_TIMESTAMP
""",
(key, stored_value),
)
await db.commit()
return True
except Exception as e:
logger.error(f"save settings failed: {e}")
return False
class PluginSettingsRepository:
"""插件设置 Repository"""
@staticmethod
async def get_enabled(db: aiosqlite.Connection, plugin_id: str) -> Optional[bool]:
async with db.execute(
"SELECT enabled FROM plugin_settings WHERE plugin_id = ?", (plugin_id,)
) as cursor:
row = await cursor.fetchone()
if row:
return bool(row[0])
return None
@staticmethod
async def set_enabled(db: aiosqlite.Connection, plugin_id: str, enabled: bool) -> bool:
try:
await db.execute(
"""
INSERT INTO plugin_settings (plugin_id, enabled, created_at, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT(plugin_id) DO UPDATE SET
enabled = excluded.enabled,
updated_at = CURRENT_TIMESTAMP
""",
(plugin_id, int(enabled)),
)
await db.commit()
return True
except Exception as e:
logger.error(f"set_enabled failed for {plugin_id}: {e}")
return False
@staticmethod
async def get_config(db: aiosqlite.Connection, plugin_id: str) -> Optional[Dict[str, Any]]:
async with db.execute(
"SELECT config_json FROM plugin_settings WHERE plugin_id = ?", (plugin_id,)
) as cursor:
row = await cursor.fetchone()
if row and row[0]:
try:
return json.loads(row[0])
except json.JSONDecodeError:
return None
return None
@staticmethod
async def set_config(db: aiosqlite.Connection, plugin_id: str, config: Dict[str, Any]) -> bool:
try:
await db.execute(
"""
INSERT INTO plugin_settings (plugin_id, config_json, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(plugin_id) DO UPDATE SET
config_json = excluded.config_json,
updated_at = CURRENT_TIMESTAMP
""",
(plugin_id, json.dumps(config, ensure_ascii=False)),
)
await db.commit()
return True
except Exception as e:
logger.error(f"set_config failed for {plugin_id}: {e}")
return False
@staticmethod
async def get_stats(db: aiosqlite.Connection, plugin_id: str) -> Dict[str, Any]:
async with db.execute(
"SELECT stats_json FROM plugin_settings WHERE plugin_id = ?", (plugin_id,)
) as cursor:
row = await cursor.fetchone()
if row and row[0]:
try:
return json.loads(row[0])
except json.JSONDecodeError:
return {}
return {}
@staticmethod
async def set_stats(db: aiosqlite.Connection, plugin_id: str, stats: Dict[str, Any]) -> bool:
try:
await db.execute(
"""
INSERT INTO plugin_settings (plugin_id, stats_json, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(plugin_id) DO UPDATE SET
stats_json = excluded.stats_json,
updated_at = CURRENT_TIMESTAMP
""",
(plugin_id, json.dumps(stats, ensure_ascii=False)),
)
await db.commit()
return True
except Exception as e:
logger.error(f"set_stats failed for {plugin_id}: {e}")
return False
@staticmethod
async def list_all(db: aiosqlite.Connection) -> Dict[str, Dict[str, Any]]:
result = {}
async with db.execute("SELECT plugin_id, enabled, config_json, stats_json FROM plugin_settings") as cursor:
rows = await cursor.fetchall()
for plugin_id, enabled, config_json, stats_json in rows:
config = {}
if config_json:
try:
config = json.loads(config_json)
except json.JSONDecodeError:
pass
stats = {}
if stats_json:
try:
stats = json.loads(stats_json)
except json.JSONDecodeError:
pass
result[plugin_id] = {"enabled": bool(enabled), "config": config, "stats": stats}
return result