"""Database connection management.

Connections are handed out through async context managers so that no global
singleton connection can leak.
"""
import os
import aiosqlite
from contextlib import asynccontextmanager
from typing import AsyncIterator

from app.core.config import settings
from app.core.log import logger

DB_PATH = os.path.join(settings.base_dir, settings.db_path)

# aiosqlite/sqlite3: maximum time (seconds) to wait for a lock; chosen to
# cope with highly concurrent validation writes.
_SQLITE_CONNECT_TIMEOUT = 30.0


def ensure_db_dir():
    """Create the directory that holds the database file if it is missing."""
    db_dir = os.path.dirname(DB_PATH)
    if db_dir and not os.path.exists(db_dir):
        # exist_ok covers the race where another task creates it first.
        os.makedirs(db_dir, exist_ok=True)


async def _apply_connection_pragmas(db: aiosqlite.Connection) -> None:
    """Apply the per-connection pragmas shared by every connection."""
    await db.execute("PRAGMA journal_mode=WAL")
    await db.execute("PRAGMA synchronous=NORMAL")
    await db.execute("PRAGMA busy_timeout=30000")


async def _column_exists(db: aiosqlite.Connection, table: str, column: str) -> bool:
    """Return True if *table* already has a column named *column*.

    Reads the schema via ``PRAGMA table_info`` instead of probing with a
    ``SELECT`` and catching the failure, so unrelated errors (locked
    database, I/O problems) are not mistaken for a missing column.

    *table* is interpolated into the statement; callers must only pass
    trusted, hard-coded identifiers.
    """
    async with db.execute(f"PRAGMA table_info({table})") as cursor:
        rows = await cursor.fetchall()
    wanted = column.lower()
    # Each table_info row is (cid, name, type, notnull, dflt_value, pk).
    return any(row[1].lower() == wanted for row in rows)


async def _add_missing_column(
    db: aiosqlite.Connection, table: str, column: str, definition: str
) -> bool:
    """Add *column* to *table* when absent; return True if it was added.

    *table*, *column* and *definition* are interpolated into the DDL and
    must be trusted, hard-coded values.
    """
    if await _column_exists(db, table, column):
        return False
    await db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    return True


async def init_db():
    """Initialize the database schema, migrating older schemas in place."""
    ensure_db_dir()
    async with aiosqlite.connect(DB_PATH, timeout=_SQLITE_CONNECT_TIMEOUT) as db:
        await _apply_connection_pragmas(db)
        await db.execute("PRAGMA cache_size=-64000")
        await db.execute("PRAGMA temp_store=MEMORY")

        await db.execute("""
            CREATE TABLE IF NOT EXISTS proxies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL,
                port INTEGER NOT NULL,
                protocol TEXT DEFAULT 'http',
                score INTEGER DEFAULT 10,
                response_time_ms REAL,
                last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(ip, port)
            )
        """)

        # Migration: add response_time_ms if an old table lacks it.
        if await _add_missing_column(db, "proxies", "response_time_ms", "REAL"):
            logger.info("Migrated: added response_time_ms column")

        # Migration: add created_at if an old table lacks it, then backfill
        # existing rows (the column is added without a default).
        if await _add_missing_column(db, "proxies", "created_at", "TIMESTAMP"):
            await db.execute(
                "UPDATE proxies SET created_at = CURRENT_TIMESTAMP WHERE created_at IS NULL"
            )
            logger.info("Migrated: added created_at column")

        # Migration: validated 0 = pending validation, 1 = validated and in
        # the pool (takes part in score maintenance).
        if await _add_missing_column(
            db, "proxies", "validated", "INTEGER NOT NULL DEFAULT 0"
        ):
            await db.execute(
                "UPDATE proxies SET validated = 1 WHERE score > 0"
            )
            logger.info("Migrated: added validated column")

        if await _add_missing_column(
            db, "proxies", "use_count", "INTEGER NOT NULL DEFAULT 0"
        ):
            logger.info("Migrated: added use_count column")

        await db.execute("CREATE INDEX IF NOT EXISTS idx_score ON proxies(score)")
        await db.execute("CREATE INDEX IF NOT EXISTS idx_protocol ON proxies(protocol)")
        await db.execute("CREATE INDEX IF NOT EXISTS idx_last_check ON proxies(last_check)")
        await db.execute("CREATE INDEX IF NOT EXISTS idx_ip_port ON proxies(ip, port)")
        await db.execute("CREATE INDEX IF NOT EXISTS idx_validated ON proxies(validated)")

        # Plugin settings table.
        await db.execute("""
            CREATE TABLE IF NOT EXISTS plugin_settings (
                plugin_id TEXT PRIMARY KEY,
                enabled INTEGER DEFAULT 1,
                config_json TEXT DEFAULT '{}',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Migration: add config_json to an older plugin_settings table.
        if await _add_missing_column(
            db, "plugin_settings", "config_json", "TEXT DEFAULT '{}'"
        ):
            logger.info("Migrated: added config_json column to plugin_settings")

        # Migration: add stats_json to an older plugin_settings table.
        if await _add_missing_column(
            db, "plugin_settings", "stats_json", "TEXT DEFAULT '{}'"
        ):
            logger.info("Migrated: added stats_json column to plugin_settings")

        # The validation task queue table is deprecated and no longer
        # created; an old copy left in the database does no harm.

        # System settings table.
        await db.execute("""
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Only remove deprecated setting keys; proxies data is untouched.
        await db.execute(
            "DELETE FROM settings WHERE key IN ('crawl_timeout', 'max_retries')"
        )

        await db.commit()
        logger.info("Database initialized")


@asynccontextmanager
async def get_db() -> AsyncIterator[aiosqlite.Connection]:
    """Async context manager yielding a connection with the shared pragmas.

    The connection is always closed on exit, including when the pragmas or
    the caller's body raise.
    """
    ensure_db_dir()
    db = await aiosqlite.connect(DB_PATH, timeout=_SQLITE_CONNECT_TIMEOUT)
    try:
        await _apply_connection_pragmas(db)
        yield db
    finally:
        await db.close()


@asynccontextmanager
async def get_db_connection() -> AsyncIterator[aiosqlite.Connection]:
    """Alias of get_db (same pragmas and timeout).

    Kept for call sites that want to express "long-lived connection"
    semantics.
    """
    async with get_db() as db:
        yield db


@asynccontextmanager
async def transaction() -> AsyncIterator[aiosqlite.Connection]:
    """Yield a connection wrapped in an explicit transaction.

    Commits when the body finishes normally; rolls back and re-raises when
    the body (or the commit) raises an Exception. The connection is closed
    in every case.

    Usage:
        async with transaction() as db:
            await repo.update(db, ...)
            # an exception raised here triggers an automatic rollback
    """
    async with get_db() as db:
        await db.execute("BEGIN")
        try:
            yield db
            await db.commit()
        except Exception:
            await db.rollback()
            raise