"""Proxy data-access layer - all SQL operations are consolidated here."""

from datetime import datetime, timedelta
from typing import AsyncIterator, List, Optional, Tuple, Union

import aiosqlite

from app.models.domain import Proxy, ProxyRaw
from app.core.log import logger

VALID_PROTOCOLS = ("http", "https", "socks4", "socks5")

# Timestamp layouts accepted by _to_datetime, tried in order.
_DATETIME_FORMATS = (
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%f",
)


def _to_datetime(value: Union[str, datetime, None]) -> Optional[datetime]:
    """Convert a database timestamp value into a ``datetime``.

    Args:
        value: ``None``, an existing ``datetime``, or a string in one of the
            layouts listed in ``_DATETIME_FORMATS``.

    Returns:
        The parsed ``datetime``; ``None`` when ``value`` is ``None``, is not a
        string/datetime, or matches none of the known layouts.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        for fmt in _DATETIME_FORMATS:
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
    return None


def _normalize_protocol(protocol: Optional[str]) -> str:
    """Return a lower-cased protocol from ``VALID_PROTOCOLS``, else ``"http"``.

    Matching is case-insensitive so that e.g. ``"SOCKS5"`` is stored as
    ``"socks5"`` (the read paths lower-case their protocol filter) instead of
    silently degrading to ``"http"``.
    """
    if isinstance(protocol, str):
        lowered = protocol.lower()
        if lowered in VALID_PROTOCOLS:
            return lowered
    return "http"


def _row_to_proxy(row: Tuple) -> Proxy:
    """Build a ``Proxy`` from a row selected with ``_SELECT_PROXY_COLS``.

    ``validated`` and ``use_count`` (columns 7 and 8) default to 0 when the
    row is shorter than expected or the column is NULL.
    """
    validated = int(row[7]) if len(row) > 7 and row[7] is not None else 0
    use_count = int(row[8]) if len(row) > 8 and row[8] is not None else 0
    return Proxy(
        ip=row[0],
        port=row[1],
        protocol=row[2],
        score=row[3],
        response_time_ms=row[4],
        last_check=_to_datetime(row[5]),
        created_at=_to_datetime(row[6]),
        validated=validated,
        use_count=use_count,
    )


# Column order here must stay in sync with the indices used in _row_to_proxy.
_SELECT_PROXY_COLS = (
    "ip, port, protocol, score, response_time_ms, last_check, created_at, validated, use_count"
)

# Shared by upsert_from_crawl / upsert_many_from_crawl: insert as pending
# (validated=0, use_count=0) or reset an existing (ip, port) row to pending.
_UPSERT_FROM_CRAWL_SQL = """
    INSERT INTO proxies (ip, port, protocol, score, last_check, created_at, validated, use_count)
    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0, 0)
    ON CONFLICT(ip, port) DO UPDATE SET
        protocol = excluded.protocol,
        score = excluded.score,
        last_check = CURRENT_TIMESTAMP,
        validated = 0,
        use_count = 0
"""


class ProxyRepository:
    """Proxy repository: static async helpers operating on an aiosqlite connection."""

    @staticmethod
    async def insert_or_update(
        db: aiosqlite.Connection,
        ip: str,
        port: int,
        protocol: str = "http",
        score: int = 10,
    ) -> bool:
        """Insert a proxy as validated, or refresh an existing (ip, port) row.

        On conflict the protocol and score are overwritten, ``last_check`` is
        refreshed and ``validated`` is set to 1. Commits on success.

        Returns:
            ``True`` on success, ``False`` if any exception was raised (the
            error is logged, not propagated).
        """
        protocol = _normalize_protocol(protocol)
        try:
            await db.execute(
                """
                INSERT INTO proxies (ip, port, protocol, score, last_check, created_at, validated, use_count)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 0)
                ON CONFLICT(ip, port) DO UPDATE SET
                    protocol = excluded.protocol,
                    score = excluded.score,
                    last_check = CURRENT_TIMESTAMP,
                    validated = 1
                """,
                (ip, port, protocol, score),
            )
            await db.commit()
            return True
        except Exception as e:
            logger.error(f"insert_or_update proxy failed: {e}", exc_info=True)
            return False

    @staticmethod
    async def upsert_from_crawl(
        db: aiosqlite.Connection,
        ip: str,
        port: int,
        protocol: str = "http",
        initial_score: int = 0,
    ) -> None:
        """Store a crawled proxy in the pending state (validated=0).

        Crawling the same (ip, port) again resets the row to pending with
        ``initial_score`` and ``use_count = 0``. Does not commit.
        """
        await db.execute(
            _UPSERT_FROM_CRAWL_SQL,
            (ip, port, _normalize_protocol(protocol), initial_score),
        )

    @staticmethod
    async def upsert_many_from_crawl(
        db: aiosqlite.Connection,
        proxies: List[ProxyRaw],
        initial_score: int = 0,
    ) -> None:
        """Bulk variant of ``upsert_from_crawl``.

        Does not commit; the surrounding transaction is expected to do so.
        An empty ``proxies`` list is a no-op.
        """
        if not proxies:
            return
        rows = [
            (p.ip, p.port, _normalize_protocol(p.protocol), initial_score)
            for p in proxies
        ]
        await db.executemany(_UPSERT_FROM_CRAWL_SQL, rows)

    @staticmethod
    async def update_score(
        db: aiosqlite.Connection,
        ip: str,
        port: int,
        delta: int,
        min_score: int = 0,
        max_score: int = 100,
    ) -> bool:
        """Add ``delta`` to a proxy's score, clamped to [min_score, max_score].

        After the update, the row is deleted if it is validated and its score
        has dropped to ``min_score`` or below. Commits on success.

        Returns:
            ``True`` if a row matching (ip, port) was updated, ``False`` if no
            such row exists or an exception occurred (logged, not raised).
        """
        try:
            # Atomic update: the new score is computed inside the statement.
            async with db.execute(
                """
                UPDATE proxies
                SET score = MAX(?, MIN(?, score + ?)),
                    last_check = CURRENT_TIMESTAMP
                WHERE ip = ? AND port = ?
                """,
                (min_score, max_score, delta, ip, port),
            ) as cursor:
                # rowcount is per-statement; db.total_changes is cumulative
                # for the whole connection and would report stale changes.
                updated = cursor.rowcount
            # Only remove proxies that are already in the pool (validated=1)
            # and whose score is exhausted; pending rows (score=0) are not
            # deleted by this path.
            await db.execute(
                """
                DELETE FROM proxies
                WHERE ip = ? AND port = ? AND score <= ? AND validated = 1
                """,
                (ip, port, min_score),
            )
            await db.commit()
            return updated > 0
        except Exception as e:
            logger.error(f"update_score failed: {e}", exc_info=True)
            return False

    @staticmethod
    async def update_response_time(
        db: aiosqlite.Connection,
        ip: str,
        port: int,
        response_time_ms: float,
    ) -> bool:
        """Set ``response_time_ms`` for the given proxy and commit.

        Returns:
            ``True`` when the statement ran without error (even if no row
            matched), ``False`` if an exception occurred (logged, not raised).
        """
        try:
            await db.execute(
                "UPDATE proxies SET response_time_ms = ? WHERE ip = ? AND port = ?",
                (response_time_ms, ip, port),
            )
            await db.commit()
            return True
        except Exception as e:
            logger.error(f"update_response_time failed: {e}", exc_info=True)
            return False

    @staticmethod
    async def set_use_count_and_score(
        db: aiosqlite.Connection,
        ip: str,
        port: int,
        use_count: int,
        score: int,
    ) -> bool:
        """Overwrite ``use_count`` and ``score`` of a validated proxy and commit.

        Returns:
            ``True`` if a validated row matching (ip, port) was updated,
            ``False`` if none matched or an exception occurred (logged).
        """
        try:
            async with db.execute(
                """
                UPDATE proxies
                SET use_count = ?, score = ?, last_check = CURRENT_TIMESTAMP
                WHERE ip = ? AND port = ? AND validated = 1
                """,
                (use_count, score, ip, port),
            ) as cursor:
                # Per-statement count; see update_score for why
                # db.total_changes is not used here.
                updated = cursor.rowcount
            await db.commit()
            return updated > 0
        except Exception as e:
            logger.error(f"set_use_count_and_score failed: {e}", exc_info=True)
            return False

    @staticmethod
    async def delete(db: aiosqlite.Connection, ip: str, port: int) -> None:
        """Delete the proxy identified by (ip, port) and commit."""
        await db.execute("DELETE FROM proxies WHERE ip = ? AND port = ?", (ip, port))
        await db.commit()

    @staticmethod
    async def batch_delete(db: aiosqlite.Connection, proxies: List[Tuple[str, int]]) -> int:
        """Delete every (ip, port) pair in ``proxies`` and commit.

        Returns:
            The number of rows changed on this connection while the batch
            ran (0 for an empty list).
        """
        if not proxies:
            return 0
        changes_before = db.total_changes
        await db.executemany("DELETE FROM proxies WHERE ip = ? AND port = ?", proxies)
        await db.commit()
        return db.total_changes - changes_before

    @staticmethod
    async def get_by_ip_port(
        db: aiosqlite.Connection, ip: str, port: int
    ) -> Optional[Proxy]:
        """Return the proxy with the given (ip, port), or ``None`` if absent."""
        async with db.execute(
            f"SELECT {_SELECT_PROXY_COLS} FROM proxies WHERE ip = ? AND port = ?",
            (ip, port),
        ) as cursor:
            row = await cursor.fetchone()
        return _row_to_proxy(row) if row else None

    @staticmethod
    async def get_random(db: aiosqlite.Connection) -> Optional[Proxy]:
        """Return one random usable proxy (validated=1, score>0), or ``None``."""
        async with db.execute(
            f"""
            SELECT {_SELECT_PROXY_COLS} FROM proxies
            WHERE validated = 1 AND score > 0
            ORDER BY RANDOM() LIMIT 1
            """
        ) as cursor:
            row = await cursor.fetchone()
        return _row_to_proxy(row) if row else None

    @staticmethod
    async def list_all(
        db: aiosqlite.Connection,
        protocol: Optional[str] = None,
        limit: int = 100000,
        offset: int = 0,
        validated: Optional[int] = None,
    ) -> List[Proxy]:
        """List proxies with optional protocol / validated filters.

        Args:
            protocol: When truthy, only rows with this protocol
                (lower-cased before comparison).
            limit: Maximum number of rows.
            offset: Number of rows to skip.
            validated: When not ``None``, only rows whose ``validated``
                column equals ``int(validated)``.
        """
        query = f"SELECT {_SELECT_PROXY_COLS} FROM proxies"
        params: List = []
        clauses = []
        if protocol:
            clauses.append("protocol = ?")
            params.append(protocol.lower())
        if validated is not None:
            clauses.append("validated = ?")
            params.append(int(validated))
        if clauses:
            query += " WHERE " + " AND ".join(clauses)
        query += " LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        async with db.execute(query, params) as cursor:
            rows = await cursor.fetchall()
        return [_row_to_proxy(row) for row in rows]

    @staticmethod
    async def list_for_validation(
        db: aiosqlite.Connection,
        protocol: Optional[str] = None,
    ) -> List[Proxy]:
        """Return proxies in the order they should be (re)validated.

        Pending rows (validated=0) come first, ordered by ``created_at``
        ascending, followed by validated rows ordered by ``last_check``
        ascending. Used for full / scheduled re-checks.
        """
        ordered: List[Proxy] = []
        # (validated flag, column to sort by) - pending first, then validated.
        for validated_flag, order_col in ((0, "created_at"), (1, "last_check")):
            query = (
                f"SELECT {_SELECT_PROXY_COLS} FROM proxies "
                f"WHERE validated = {validated_flag}"
            )
            params: List = []
            if protocol:
                query += " AND protocol = ?"
                params.append(protocol.lower())
            query += f" ORDER BY {order_col} ASC"
            async with db.execute(query, params) as cursor:
                rows = await cursor.fetchall()
            ordered.extend(_row_to_proxy(r) for r in rows)
        return ordered

    @staticmethod
    async def iter_batches(
        db: aiosqlite.Connection,
        protocol: Optional[str] = None,
        batch_size: int = 1000,
        only_usable: bool = False,
    ) -> AsyncIterator[List[Proxy]]:
        """Stream proxies in batches to avoid loading everything into memory.

        Args:
            protocol: Optional protocol filter.
            batch_size: Rows per batch. A value <= 0 yields nothing (a
                negative LIMIT means "unlimited" in SQLite and would
                otherwise make this loop re-read the table forever).
            only_usable: When true, only validated rows with score > 0.

        Yields:
            Non-empty lists of at most ``batch_size`` proxies.
        """
        if batch_size <= 0:
            return
        offset = 0
        while True:
            batch = await ProxyRepository._list_batch_offset(
                db, protocol, batch_size, offset, only_usable=only_usable
            )
            if not batch:
                break
            yield batch
            if len(batch) < batch_size:
                # A short batch is the last one; skip the extra empty query.
                break
            offset += batch_size

    @staticmethod
    async def _list_batch_offset(
        db: aiosqlite.Connection,
        protocol: Optional[str],
        batch_size: int,
        offset: int,
        only_usable: bool,
    ) -> List[Proxy]:
        """Fetch one LIMIT/OFFSET page for ``iter_batches``."""
        query = f"SELECT {_SELECT_PROXY_COLS} FROM proxies"
        params: List = []
        clauses = []
        if only_usable:
            clauses.append("validated = 1 AND score > 0")
        if protocol:
            clauses.append("protocol = ?")
            params.append(protocol.lower())
        if clauses:
            query += " WHERE " + " AND ".join(clauses)
        # OFFSET paging is only meaningful over a fixed row order. (ip, port)
        # is the conflict target of the upserts in this module, so ordering
        # by it gives a stable order across successive page queries.
        query += " ORDER BY ip, port LIMIT ? OFFSET ?"
        params.extend([batch_size, offset])
        async with db.execute(query, params) as cursor:
            rows = await cursor.fetchall()
        return [_row_to_proxy(row) for row in rows]

    @staticmethod
    async def list_paginated(
        db: aiosqlite.Connection,
        page: int = 1,
        page_size: int = 20,
        protocol: Optional[str] = None,
        min_score: int = 0,
        max_score: Optional[int] = None,
        sort_by: str = "last_check",
        sort_order: str = "DESC",
        pool_filter: Optional[str] = None,
    ) -> Tuple[List[Proxy], int]:
        """Return one page of proxies plus the total number of matching rows.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page.
            protocol: Optional protocol filter (lower-cased before
                comparison, like the other list methods).
            min_score: Inclusive lower score bound.
            max_score: Optional inclusive upper score bound.
            sort_by: One of ip / port / protocol / score / last_check.
            sort_order: ``ASC`` or ``DESC`` (case-insensitive).
            pool_filter: ``"pending"`` (validated=0), ``"available"``
                (validated=1 and score>0), or anything else for no filter.

        Returns:
            ``(proxies_on_page, total_matching_rows)``.
        """
        conditions = ["score >= ?"]
        params: List = [min_score]
        if protocol:
            conditions.append("protocol = ?")
            params.append(protocol.lower())
        if max_score is not None:
            conditions.append("score <= ?")
            params.append(max_score)
        if pool_filter == "pending":
            conditions.append("validated = 0")
        elif pool_filter == "available":
            conditions.append("validated = 1 AND score > 0")
        where_clause = " AND ".join(conditions)

        # sort_by / sort_order are interpolated into the SQL text, so they
        # are only used after matching a fixed whitelist.
        allowed_sort_by = {"ip", "port", "protocol", "score", "last_check"}
        allowed_sort_order = {"ASC", "DESC"}
        if sort_by not in allowed_sort_by or sort_order.upper() not in allowed_sort_order:
            order_clause = "last_check DESC"
        else:
            order_clause = f"{sort_by} {sort_order.upper()}"

        offset = (max(page, 1) - 1) * page_size

        count_query = f"SELECT COUNT(*) FROM proxies WHERE {where_clause}"
        async with db.execute(count_query, list(params)) as cursor:
            row = await cursor.fetchone()
        total = row[0] if row else 0

        data_query = f"""
            SELECT {_SELECT_PROXY_COLS} FROM proxies
            WHERE {where_clause}
            ORDER BY {order_clause}
            LIMIT ? OFFSET ?
        """
        params.extend([page_size, offset])
        async with db.execute(data_query, params) as cursor:
            rows = await cursor.fetchall()
        proxies = [_row_to_proxy(row) for row in rows]
        return proxies, total

    @staticmethod
    async def get_stats(db: aiosqlite.Connection) -> dict:
        """Return a statistics snapshot of the ``proxies`` table.

        The per-protocol counts (http/https/socks*) only include usable
        proxies (validated=1 and score>0), matching the "available" figure.
        The ``pending_*`` counts break the pending pool (validated=0) down by
        protocol. ``avg_response_ms`` is ``None`` when no usable proxy has a
        positive response time recorded.
        """
        query = """
            SELECT
                COUNT(*) as total,
                COUNT(CASE WHEN validated = 0 THEN 1 END) as pending,
                COUNT(CASE WHEN validated = 1 AND score > 0 THEN 1 END) as available,
                (SELECT AVG(score) FROM proxies WHERE validated = 1 AND score > 0) as avg_score,
                COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'http' THEN 1 END) as http_count,
                COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'https' THEN 1 END) as https_count,
                COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'socks4' THEN 1 END) as socks4_count,
                COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'socks5' THEN 1 END) as socks5_count,
                COUNT(CASE WHEN validated = 0 AND protocol = 'http' THEN 1 END) as pending_http_count,
                COUNT(CASE WHEN validated = 0 AND protocol = 'https' THEN 1 END) as pending_https_count,
                COUNT(CASE WHEN validated = 0 AND protocol = 'socks4' THEN 1 END) as pending_socks4_count,
                COUNT(CASE WHEN validated = 0 AND protocol = 'socks5' THEN 1 END) as pending_socks5_count,
                COUNT(CASE WHEN validated = 1 AND score <= 0 THEN 1 END) as invalid_count,
                (SELECT AVG(response_time_ms) FROM proxies
                 WHERE validated = 1 AND score > 0
                   AND response_time_ms IS NOT NULL AND response_time_ms > 0) as avg_response_ms
            FROM proxies
        """
        async with db.execute(query) as cursor:
            row = await cursor.fetchone()
        if not row:
            return {
                "total": 0,
                "pending": 0,
                "available": 0,
                "avg_score": 0,
                "http_count": 0,
                "https_count": 0,
                "socks4_count": 0,
                "socks5_count": 0,
                "pending_http_count": 0,
                "pending_https_count": 0,
                "pending_socks4_count": 0,
                "pending_socks5_count": 0,
                "invalid_count": 0,
                "avg_response_ms": None,
            }
        avg_lat = row[13]
        return {
            "total": row[0] or 0,
            "pending": row[1] or 0,
            "available": row[2] or 0,
            "avg_score": round(row[3], 2) if row[3] is not None else 0,
            "http_count": row[4] or 0,
            "https_count": row[5] or 0,
            "socks4_count": row[6] or 0,
            "socks5_count": row[7] or 0,
            "pending_http_count": row[8] or 0,
            "pending_https_count": row[9] or 0,
            "pending_socks4_count": row[10] or 0,
            "pending_socks5_count": row[11] or 0,
            "invalid_count": row[12] or 0,
            "avg_response_ms": round(avg_lat, 2) if avg_lat is not None else None,
        }

    @staticmethod
    async def get_today_new_count(db: aiosqlite.Connection) -> int:
        """Count proxies created today (local time) that are currently usable.

        "Usable" means validated=1 and score>0, the same definition as
        ``get_stats``'s ``available``. Returns 0 if the query fails (the
        error is logged).
        """
        try:
            # created_at is written with CURRENT_TIMESTAMP (UTC), so it must
            # be shifted to local time before comparing with the local date.
            async with db.execute(
                """
                SELECT COUNT(*) FROM proxies
                WHERE DATE(created_at, 'localtime') = DATE('now', 'localtime')
                  AND validated = 1 AND score > 0
                """
            ) as cursor:
                row = await cursor.fetchone()
            return row[0] if row else 0
        except Exception as e:
            logger.error(f"get_today_new_count failed: {e}", exc_info=True)
            return 0

    @staticmethod
    async def clean_invalid(db: aiosqlite.Connection) -> int:
        """Delete validated proxies whose score is <= 0 and commit.

        Returns:
            The number of rows deleted by this call.
        """
        async with db.execute(
            "DELETE FROM proxies WHERE validated = 1 AND score <= 0"
        ) as cursor:
            # Per-statement count; db.total_changes would return the
            # connection's lifetime total instead.
            deleted = cursor.rowcount
        await db.commit()
        return deleted

    @staticmethod
    async def clean_expired(db: aiosqlite.Connection, days: int) -> int:
        """Delete proxies whose ``last_check`` is older than ``days`` days and commit.

        Returns:
            The number of rows deleted by this call, or 0 if an exception
            occurred (logged, not raised).
        """
        try:
            async with db.execute(
                "DELETE FROM proxies WHERE last_check < datetime('now', '-' || ? || ' days')",
                (days,),
            ) as cursor:
                deleted = cursor.rowcount
            await db.commit()
            return deleted
        except Exception as e:
            logger.error(f"clean_expired failed: {e}", exc_info=True)
            return 0