Files
ProxyPool/app/repositories/proxy_repo.py

599 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""代理数据访问层 - 所有 SQL 操作收敛于此"""
import aiosqlite
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union
from app.core.config import settings as app_settings
from app.models.domain import Proxy, ProxyRaw
from app.core.log import logger
# Protocol names accepted by the write paths in this module; any other value
# is stored as "http".
VALID_PROTOCOLS = (
    "http",
    "https",
    "socks4",
    "socks5",
)
def _to_datetime(value: Union[str, datetime, None]) -> Optional[datetime]:
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
return None
def _row_to_proxy(row: Tuple) -> Proxy:
    """Build a ``Proxy`` from a result row laid out like ``_SELECT_PROXY_COLS``.

    Positions 0-6 are read directly (ip, port, protocol, score,
    response_time_ms, last_check, created_at); the two timestamps go through
    ``_to_datetime``. Positions 7 (validated) and 8 (use_count) are optional:
    a missing column or a NULL value becomes ``0``.
    """

    def _int_or_zero(index: int) -> int:
        # Tolerate rows that are too short or hold NULL at this position.
        if len(row) <= index or row[index] is None:
            return 0
        return int(row[index])

    validated_flag = _int_or_zero(7)
    times_used = _int_or_zero(8)
    return Proxy(
        ip=row[0],
        port=row[1],
        protocol=row[2],
        score=row[3],
        response_time_ms=row[4],
        last_check=_to_datetime(row[5]),
        created_at=_to_datetime(row[6]),
        validated=validated_flag,
        use_count=times_used,
    )
# Column list shared by every SELECT in this module. The order must match the
# positional indexes read by _row_to_proxy.
_SELECT_PROXY_COLS = ", ".join(
    (
        "ip",
        "port",
        "protocol",
        "score",
        "response_time_ms",
        "last_check",
        "created_at",
        "validated",
        "use_count",
    )
)
class ProxyRepository:
"""代理 Repository"""
@staticmethod
async def insert_or_update(
    db: aiosqlite.Connection,
    ip: str,
    port: int,
    protocol: str = "http",
    score: Optional[int] = None,
) -> bool:
    """Insert a proxy as validated, or refresh the existing ``(ip, port)`` row.

    A ``protocol`` outside ``VALID_PROTOCOLS`` is stored as ``"http"``. When
    ``score`` is omitted, ``app_settings.score_valid`` is used. On conflict
    the protocol, score and ``last_check`` are overwritten and ``validated``
    is set to 1; ``created_at`` and ``use_count`` of the existing row are not
    touched. The change is committed before returning.

    Returns:
        True on success; False if executing or committing raised (the
        exception is logged, not propagated).
    """
    stored_protocol = protocol if protocol in VALID_PROTOCOLS else "http"
    stored_score = int(app_settings.score_valid) if score is None else score
    upsert_sql = """
INSERT INTO proxies (ip, port, protocol, score, last_check, created_at, validated, use_count)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 0)
ON CONFLICT(ip, port) DO UPDATE SET
protocol = excluded.protocol,
score = excluded.score,
last_check = CURRENT_TIMESTAMP,
validated = 1
"""
    try:
        await db.execute(upsert_sql, (ip, port, stored_protocol, stored_score))
        await db.commit()
    except Exception as e:
        logger.error(f"insert_or_update proxy failed: {e}", exc_info=True)
        return False
    return True
@staticmethod
async def upsert_from_crawl(
    db: aiosqlite.Connection,
    ip: str,
    port: int,
    protocol: str = "http",
    initial_score: int = 0,
) -> None:
    """Store a crawled proxy as pending validation (``validated = 0``).

    The row's score is ``initial_score`` (per the original author's note this
    normally comes from the ``score_valid`` setting). A ``protocol`` outside
    ``VALID_PROTOCOLS`` is stored as ``"http"``. If the ``(ip, port)`` row
    already exists, its protocol and score are overwritten, ``last_check`` is
    refreshed, and ``validated`` and ``use_count`` are reset to 0.

    This method does not call ``commit``; the caller has to.
    """
    values = (
        ip,
        port,
        protocol if protocol in VALID_PROTOCOLS else "http",
        initial_score,
    )
    crawl_upsert_sql = """
INSERT INTO proxies (ip, port, protocol, score, last_check, created_at, validated, use_count)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0, 0)
ON CONFLICT(ip, port) DO UPDATE SET
protocol = excluded.protocol,
score = excluded.score,
last_check = CURRENT_TIMESTAMP,
validated = 0,
use_count = 0
"""
    await db.execute(crawl_upsert_sql, values)
@staticmethod
async def upsert_many_from_crawl(
    db: aiosqlite.Connection,
    proxies: List[ProxyRaw],
    initial_score: int = 0,
) -> None:
    """Bulk version of the crawl upsert: store many proxies as pending.

    Every item is written with ``validated = 0``, ``use_count = 0`` and
    ``initial_score``; protocols outside ``VALID_PROTOCOLS`` become
    ``"http"``. Existing ``(ip, port)`` rows are reset the same way. An empty
    ``proxies`` list is a no-op.

    Does not commit; the surrounding transaction is expected to.
    """
    if not proxies:
        return

    batch_values = [
        (
            item.ip,
            item.port,
            item.protocol if item.protocol in VALID_PROTOCOLS else "http",
            initial_score,
        )
        for item in proxies
    ]
    crawl_upsert_sql = """
INSERT INTO proxies (ip, port, protocol, score, last_check, created_at, validated, use_count)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0, 0)
ON CONFLICT(ip, port) DO UPDATE SET
protocol = excluded.protocol,
score = excluded.score,
last_check = CURRENT_TIMESTAMP,
validated = 0,
use_count = 0
"""
    await db.executemany(crawl_upsert_sql, batch_values)
@staticmethod
async def update_score(
    db: aiosqlite.Connection,
    ip: str,
    port: int,
    delta: int,
    min_score: int = 0,
    max_score: int = 100,
) -> bool:
    """Add ``delta`` to a proxy's score, clamped to ``[min_score, max_score]``.

    ``last_check`` is refreshed in the same UPDATE. Afterwards the row is
    deleted if it is validated and its score is at or below ``min_score``.
    Both statements are committed together.

    Returns:
        True if the UPDATE matched the ``(ip, port)`` row (even when that row
        is then evicted); False if no such row exists or an exception was
        raised (it is logged, not propagated).
    """
    try:
        # Atomic update: the clamped score is computed inside the statement.
        async with db.execute(
            """
UPDATE proxies
SET score = MAX(?, MIN(?, score + ?)),
last_check = CURRENT_TIMESTAMP
WHERE ip = ? AND port = ?
""",
            (min_score, max_score, delta, ip, port),
        ) as cursor:
            # rowcount is specific to this statement. Connection.total_changes
            # is cumulative for the whole connection, so it cannot tell
            # whether this particular proxy was found.
            updated_rows = cursor.rowcount
        # Only evict proxies that are already in the pool (validated = 1) and
        # have run out of score; pending rows (score = 0) do not go through
        # this path.
        await db.execute(
            """
DELETE FROM proxies
WHERE ip = ? AND port = ? AND score <= ? AND validated = 1
""",
            (ip, port, min_score),
        )
        await db.commit()
        return updated_rows > 0
    except Exception as e:
        logger.error(f"update_score failed: {e}", exc_info=True)
        return False
@staticmethod
async def update_response_time(
    db: aiosqlite.Connection,
    ip: str,
    port: int,
    response_time_ms: float,
) -> bool:
    """Record the latest response time for the ``(ip, port)`` row and commit.

    Returns:
        True when the statement and commit ran without raising, whether or
        not a row matched; False if an exception occurred (it is logged).
    """
    statement = "UPDATE proxies SET response_time_ms = ? WHERE ip = ? AND port = ?"
    try:
        await db.execute(statement, (response_time_ms, ip, port))
        await db.commit()
    except Exception as e:
        logger.error(f"update_response_time failed: {e}", exc_info=True)
        return False
    return True
@staticmethod
async def set_use_count_and_score(
    db: aiosqlite.Connection,
    ip: str,
    port: int,
    use_count: int,
    score: int,
) -> bool:
    """Overwrite ``use_count`` and ``score`` of a validated proxy and commit.

    Only rows with ``validated = 1`` are touched; ``last_check`` is refreshed
    in the same statement.

    Returns:
        True if a validated ``(ip, port)`` row was updated; False if none
        matched or an exception was raised (it is logged, not propagated).
    """
    try:
        async with db.execute(
            """
UPDATE proxies
SET use_count = ?, score = ?, last_check = CURRENT_TIMESTAMP
WHERE ip = ? AND port = ? AND validated = 1
""",
            (use_count, score, ip, port),
        ) as cursor:
            # Use the per-statement rowcount. Connection.total_changes counts
            # every change made on the connection so far and would report
            # success even when this UPDATE matched nothing.
            updated_rows = cursor.rowcount
        await db.commit()
        return updated_rows > 0
    except Exception as e:
        logger.error(f"set_use_count_and_score failed: {e}", exc_info=True)
        return False
@staticmethod
async def delete(db: aiosqlite.Connection, ip: str, port: int) -> None:
    """Delete the ``(ip, port)`` row, if any, and commit."""
    key = (ip, port)
    await db.execute("DELETE FROM proxies WHERE ip = ? AND port = ?", key)
    await db.commit()
@staticmethod
async def batch_delete(db: aiosqlite.Connection, proxies: List[Tuple[str, int]]) -> int:
    """Delete every ``(ip, port)`` pair in ``proxies`` and commit.

    Returns:
        How much the connection's ``total_changes`` counter grew across the
        delete, i.e. the number of rows removed; 0 for an empty list (no
        statement is run in that case).
    """
    if not proxies:
        return 0

    # total_changes is cumulative, so measure the delta around the statement.
    baseline = db.total_changes
    await db.executemany("DELETE FROM proxies WHERE ip = ? AND port = ?", proxies)
    await db.commit()
    removed = db.total_changes - baseline
    return removed
@staticmethod
async def get_by_ip_port(
    db: aiosqlite.Connection, ip: str, port: int
) -> Optional[Proxy]:
    """Fetch the proxy stored under ``(ip, port)``.

    Returns:
        The matching ``Proxy``, or ``None`` when no row exists.
    """
    lookup_sql = f"SELECT {_SELECT_PROXY_COLS} FROM proxies WHERE ip = ? AND port = ?"
    async with db.execute(lookup_sql, (ip, port)) as cursor:
        found = await cursor.fetchone()
    return _row_to_proxy(found) if found else None
@staticmethod
async def get_random(
    db: aiosqlite.Connection, min_score: int = 1
) -> Optional[Proxy]:
    """Pick one validated proxy at random among those scoring high enough.

    ``min_score`` is coerced to ``int`` and floored at 1, so a proxy with a
    score of 0 or less is never returned.

    Returns:
        A randomly chosen ``Proxy``, or ``None`` when nothing qualifies.
    """
    score_floor = max(1, int(min_score))
    random_sql = f"""
SELECT {_SELECT_PROXY_COLS} FROM proxies
WHERE validated = 1 AND score >= ?
ORDER BY RANDOM() LIMIT 1
"""
    async with db.execute(random_sql, (score_floor,)) as cursor:
        picked = await cursor.fetchone()
    return _row_to_proxy(picked) if picked else None
@staticmethod
async def list_all(
    db: aiosqlite.Connection,
    protocol: Optional[str] = None,
    limit: int = 100000,
    offset: int = 0,
    validated: Optional[int] = None,
) -> List[Proxy]:
    """List proxies, optionally filtered by protocol and validation state.

    Args:
        protocol: When truthy, only rows with this protocol (compared
            lower-cased) are returned.
        limit: Value bound to the SQL ``LIMIT``.
        offset: Value bound to the SQL ``OFFSET``.
        validated: When not ``None``, only rows whose ``validated`` column
            equals ``int(validated)`` are returned.

    The query has no ORDER BY, so the row order is whatever SQLite produces.
    """
    # Each filter is a (SQL fragment, bound value) pair so both stay in step.
    filters = []
    if protocol:
        filters.append(("protocol = ?", protocol.lower()))
    if validated is not None:
        filters.append(("validated = ?", int(validated)))

    query = f"SELECT {_SELECT_PROXY_COLS} FROM proxies"
    if filters:
        query += " WHERE " + " AND ".join(fragment for fragment, _ in filters)
    query += " LIMIT ? OFFSET ?"

    params: List = [value for _, value in filters]
    params.extend([limit, offset])
    async with db.execute(query, params) as cursor:
        records = await cursor.fetchall()
    return [_row_to_proxy(record) for record in records]
@staticmethod
async def list_for_validation(
    db: aiosqlite.Connection,
    protocol: Optional[str] = None,
) -> List[Proxy]:
    """Return proxies in the order they should be (re)validated.

    Pending rows (``validated = 0``) come first, oldest ``created_at`` first,
    followed by validated rows ordered by ascending ``last_check``. When
    ``protocol`` is truthy both groups are restricted to that protocol
    (compared lower-cased).
    """

    async def _fetch(base_query: str, order_suffix: str) -> List[Proxy]:
        # Run one of the two group queries with the optional protocol filter.
        sql = base_query
        args: List = []
        if protocol:
            sql += " AND protocol = ?"
            args.append(protocol.lower())
        sql += order_suffix
        async with db.execute(sql, args) as cursor:
            records = await cursor.fetchall()
        return [_row_to_proxy(record) for record in records]

    pending = await _fetch(
        f"SELECT {_SELECT_PROXY_COLS} FROM proxies WHERE validated = 0",
        " ORDER BY created_at ASC",
    )
    already_validated = await _fetch(
        f"SELECT {_SELECT_PROXY_COLS} FROM proxies WHERE validated = 1",
        " ORDER BY last_check ASC",
    )
    return pending + already_validated
@staticmethod
async def iter_batches(
    db: aiosqlite.Connection,
    protocol: Optional[str] = None,
    batch_size: int = 1000,
    only_usable: bool = False,
    usable_min_score: int = 1,
):
    """Stream proxies in batches instead of loading them all into memory.

    This is an async generator: each iteration yields a non-empty list of up
    to ``batch_size`` proxies obtained from ``_list_batch_offset``, and it
    stops at the first empty batch.

    Args:
        protocol: Optional protocol filter, passed through.
        batch_size: Rows per batch. A non-positive value yields nothing.
        only_usable: Passed through; restricts to validated proxies at or
            above ``usable_min_score``.
        usable_min_score: Passed through to ``_list_batch_offset``.
    """
    # SQLite treats a negative LIMIT as "no limit" and a negative OFFSET as 0,
    # so a negative batch_size would return the full table on every pass and
    # never terminate. A batch_size of 0 already produced no batches.
    if batch_size <= 0:
        return

    offset = 0
    while True:
        batch = await ProxyRepository._list_batch_offset(
            db,
            protocol,
            batch_size,
            offset,
            only_usable=only_usable,
            usable_min_score=usable_min_score,
        )
        if not batch:
            break
        yield batch
        offset += batch_size
@staticmethod
async def _list_batch_offset(
    db: aiosqlite.Connection,
    protocol: Optional[str],
    batch_size: int,
    offset: int,
    only_usable: bool,
    usable_min_score: int = 1,
) -> List[Proxy]:
    """Fetch one LIMIT/OFFSET window of proxies for batch iteration.

    Args:
        protocol: When truthy, restrict to this protocol (compared
            lower-cased).
        batch_size: Value bound to the SQL ``LIMIT``.
        offset: Value bound to the SQL ``OFFSET``.
        only_usable: When true, restrict to ``validated = 1`` rows whose
            score is at least ``usable_min_score``.
        usable_min_score: Score floor for ``only_usable``; coerced to ``int``
            and never lower than 1.

    Returns:
        The proxies in the requested window, ordered by ``(ip, port)``.
    """
    query = f"SELECT {_SELECT_PROXY_COLS} FROM proxies"
    params: List = []
    clauses = []
    if only_usable:
        ms = max(1, int(usable_min_score))
        clauses.append("validated = 1 AND score >= ?")
        params.append(ms)
    if protocol:
        clauses.append("protocol = ?")
        params.append(protocol.lower())
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    # Without ORDER BY the row order of a SELECT is undefined, so consecutive
    # OFFSET windows are not guaranteed to tile the result set. Ordering by
    # the (ip, port) conflict key used by the upserts makes windows
    # deterministic.
    query += " ORDER BY ip, port LIMIT ? OFFSET ?"
    params.extend([batch_size, offset])
    async with db.execute(query, params) as cursor:
        rows = await cursor.fetchall()
    return [_row_to_proxy(row) for row in rows]
@staticmethod
async def list_paginated(
    db: aiosqlite.Connection,
    page: int = 1,
    page_size: int = 20,
    protocol: Optional[str] = None,
    min_score: int = 0,
    max_score: Optional[int] = None,
    sort_by: str = "last_check",
    sort_order: str = "DESC",
    pool_filter: Optional[str] = None,
) -> Tuple[List[Proxy], int]:
    """Return one page of proxies plus the total number of matching rows.

    Args:
        page: 1-based page number; values below 1 behave like page 1.
        page_size: Rows per page (bound to the SQL ``LIMIT``).
        protocol: When truthy, restrict to this protocol. It is compared
            lower-cased, like the other listing methods in this class.
        min_score: Inclusive lower score bound (always applied).
        max_score: Inclusive upper score bound, when not ``None``.
        sort_by: One of ``ip``, ``port``, ``protocol``, ``score``,
            ``last_check``.
        sort_order: ``ASC`` or ``DESC`` (case-insensitive).
        pool_filter: ``"pending"`` keeps ``validated = 0`` rows,
            ``"available"`` keeps validated rows with ``score > 0``; any
            other value adds no condition.

    If ``sort_by`` or ``sort_order`` is not an allowed value, the ordering
    falls back to ``last_check DESC``.

    Returns:
        ``(proxies_on_page, total_matching_rows)``.
    """
    conditions = ["score >= ?"]
    params: List = [min_score]
    if protocol:
        conditions.append("protocol = ?")
        # The write paths store lower-case protocol names, so an upper-case
        # filter value would otherwise never match.
        params.append(protocol.lower())
    if max_score is not None:
        conditions.append("score <= ?")
        params.append(max_score)
    if pool_filter == "pending":
        conditions.append("validated = 0")
    elif pool_filter == "available":
        conditions.append("validated = 1 AND score > 0")
    where_clause = " AND ".join(conditions)

    # sort_by / sort_order are interpolated into the SQL text, so they are
    # only used after being checked against these whitelists.
    allowed_sort_by = {"ip", "port", "protocol", "score", "last_check"}
    allowed_sort_order = {"ASC", "DESC"}
    direction = sort_order.upper()
    if sort_by not in allowed_sort_by or direction not in allowed_sort_order:
        order_clause = "last_check DESC"
    else:
        order_clause = f"{sort_by} {direction}"

    # SQLite treats a negative OFFSET as 0; make that floor explicit.
    offset = max(0, (page - 1) * page_size)

    count_query = f"SELECT COUNT(*) FROM proxies WHERE {where_clause}"
    async with db.execute(count_query, list(params)) as cursor:
        row = await cursor.fetchone()
    total = row[0] if row else 0

    data_query = f"""
SELECT {_SELECT_PROXY_COLS}
FROM proxies
WHERE {where_clause}
ORDER BY {order_clause}
LIMIT ? OFFSET ?
"""
    params.extend([page_size, offset])
    async with db.execute(data_query, params) as cursor:
        rows = await cursor.fetchall()
    proxies = [_row_to_proxy(row) for row in rows]
    return proxies, total
@staticmethod
async def get_stats(
    db: aiosqlite.Connection, low_score_threshold: int = 0
) -> dict:
    """Return a statistics snapshot of the ``proxies`` table.

    * ``total`` / ``pending`` / ``available``: all rows, ``validated = 0``
      rows, and validated rows with ``score > 0``.
    * ``http_count`` ... ``socks5_count``: per-protocol counts of validated
      rows with ``score > 0`` only (the original note says this keeps the
      home-page chart in line with the "available" figure).
    * ``pending_*_count``: per-protocol counts of the pending pool
      (``validated = 0``).
    * ``invalid_count``: validated rows with ``score <= 0``, or with a score
      below ``low_score_threshold`` when that threshold is above 0.
    * ``avg_score``: average over available rows, rounded to 2 decimals
      (0 when there are none).
    * ``avg_response_ms``: average positive response time over available
      rows, rounded to 2 decimals (``None`` when there are none).

    ``low_score_threshold`` is coerced to ``int`` and floored at 0.
    """
    floor = max(0, int(low_score_threshold))
    stats_sql = """
SELECT
COUNT(*) as total,
COUNT(CASE WHEN validated = 0 THEN 1 END) as pending,
COUNT(CASE WHEN validated = 1 AND score > 0 THEN 1 END) as available,
(SELECT AVG(score) FROM proxies WHERE validated = 1 AND score > 0) as avg_score,
COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'http' THEN 1 END) as http_count,
COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'https' THEN 1 END) as https_count,
COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'socks4' THEN 1 END) as socks4_count,
COUNT(CASE WHEN validated = 1 AND score > 0 AND protocol = 'socks5' THEN 1 END) as socks5_count,
COUNT(CASE WHEN validated = 0 AND protocol = 'http' THEN 1 END) as pending_http_count,
COUNT(CASE WHEN validated = 0 AND protocol = 'https' THEN 1 END) as pending_https_count,
COUNT(CASE WHEN validated = 0 AND protocol = 'socks4' THEN 1 END) as pending_socks4_count,
COUNT(CASE WHEN validated = 0 AND protocol = 'socks5' THEN 1 END) as pending_socks5_count,
COUNT(CASE WHEN validated = 1 AND (score <= 0 OR (? > 0 AND score < ?)) THEN 1 END) as invalid_count,
(SELECT AVG(response_time_ms) FROM proxies WHERE validated = 1 AND score > 0
AND response_time_ms IS NOT NULL AND response_time_ms > 0) as avg_response_ms
FROM proxies
"""
    # Result keys, in the same order as the SELECT list above.
    field_names = (
        "total",
        "pending",
        "available",
        "avg_score",
        "http_count",
        "https_count",
        "socks4_count",
        "socks5_count",
        "pending_http_count",
        "pending_https_count",
        "pending_socks4_count",
        "pending_socks5_count",
        "invalid_count",
        "avg_response_ms",
    )
    async with db.execute(stats_sql, (floor, floor)) as cursor:
        row = await cursor.fetchone()

    if not row:
        # No result row: everything is zero and the latency average is unknown.
        snapshot = dict.fromkeys(field_names, 0)
        snapshot["avg_response_ms"] = None
        return snapshot

    snapshot = {name: (row[index] or 0) for index, name in enumerate(field_names)}
    # The two averages are rounded and have their own "no data" values.
    snapshot["avg_score"] = round(row[3], 2) if row[3] is not None else 0
    snapshot["avg_response_ms"] = round(row[13], 2) if row[13] is not None else None
    return snapshot
@staticmethod
async def get_today_new_count(db: aiosqlite.Connection) -> int:
    """Count proxies created today (local time) that are currently available.

    "Available" means ``validated = 1`` and ``score > 0``, the same condition
    ``get_stats`` uses for its ``available`` figure.

    Returns:
        The count, or 0 if the query raised (the exception is logged).
    """
    try:
        # created_at is written with CURRENT_TIMESTAMP, which SQLite records
        # in UTC. It has to be converted to local time before it is compared
        # with the local calendar date; otherwise rows created near midnight
        # land on the wrong day for any non-UTC timezone.
        async with db.execute(
            """
SELECT COUNT(*) FROM proxies
WHERE DATE(created_at, 'localtime') = DATE('now', 'localtime')
AND validated = 1
AND score > 0
"""
        ) as cursor:
            row = await cursor.fetchone()
        return row[0] if row else 0
    except Exception as e:
        logger.error(f"get_today_new_count failed: {e}", exc_info=True)
        return 0
@staticmethod
async def clean_invalid(
    db: aiosqlite.Connection, low_score_threshold: int = 0
) -> int:
    """Delete validated proxies whose score marks them as invalid, then commit.

    With a threshold of 0 (after coercion to ``int`` and flooring at 0) only
    validated rows with ``score <= 0`` are removed. With a positive threshold,
    validated rows scoring below it are removed as well. Pending rows
    (``validated = 0``) are never touched.

    Returns:
        The number of rows deleted by this call.
    """
    thr = max(0, int(low_score_threshold))
    if thr > 0:
        statement = "DELETE FROM proxies WHERE validated = 1 AND (score <= 0 OR score < ?)"
        params: Tuple = (thr,)
    else:
        statement = "DELETE FROM proxies WHERE validated = 1 AND score <= 0"
        params = ()
    async with db.execute(statement, params) as cursor:
        # rowcount covers this DELETE only. Connection.total_changes is
        # cumulative for the connection and would over-report.
        removed = cursor.rowcount
    await db.commit()
    return removed
@staticmethod
async def clean_expired(db: aiosqlite.Connection, days: int) -> int:
    """Delete proxies whose ``last_check`` is older than ``days`` days; commit.

    The cutoff is computed by SQLite as ``datetime('now', '-<days> days')``.

    Returns:
        The number of rows deleted by this call, or 0 if an exception was
        raised (it is logged, not propagated).
    """
    try:
        async with db.execute(
            "DELETE FROM proxies WHERE last_check < datetime('now', '-' || ? || ' days')",
            (days,),
        ) as cursor:
            # rowcount covers this DELETE only. Connection.total_changes is
            # cumulative for the connection and would over-report.
            removed = cursor.rowcount
        await db.commit()
        return removed
    except Exception as e:
        logger.error(f"clean_expired failed: {e}", exc_info=True)
        return 0
@staticmethod
async def get_latency_distribution(db: aiosqlite.Connection) -> dict:
    """Bucket the response times of available proxies into five ranges.

    Only validated rows with ``score > 0`` and a positive, non-NULL
    ``response_time_ms`` are counted. Buckets are ``<500``, ``[500, 1000)``,
    ``[1000, 2000)``, ``[2000, 3000)`` and ``>=3000`` milliseconds.

    Returns:
        ``{"ranges": [...labels...], "counts": [...]}``; both lists are empty
        when no row qualifies or when the query raised (the exception is
        logged).
    """
    labels = ["<500ms", "500-1s", "1-2s", "2-3s", ">3s"]
    # Exclusive upper bound of each bucket except the last, open-ended one.
    upper_bounds = (500, 1000, 2000, 3000)
    try:
        async with db.execute(
            """
SELECT response_time_ms FROM proxies
WHERE validated = 1 AND score > 0 AND response_time_ms IS NOT NULL AND response_time_ms > 0
"""
        ) as cursor:
            records = await cursor.fetchall()
        if not records:
            return {"ranges": [], "counts": []}

        tally = [0] * len(labels)
        for record in records:
            latency = record[0]
            for slot, bound in enumerate(upper_bounds):
                if latency < bound:
                    tally[slot] += 1
                    break
            else:
                # Not below any bound: falls in the open-ended last bucket.
                tally[-1] += 1
        return {"ranges": labels, "counts": tally}
    except Exception as e:
        logger.error(f"get_latency_distribution failed: {e}", exc_info=True)
        return {"ranges": [], "counts": []}
@staticmethod
async def get_score_distribution(db: aiosqlite.Connection) -> dict:
    """Bucket the scores of available proxies into five ranges.

    Only validated rows with ``score > 0`` are counted. A score lands in the
    first bucket whose lower bound it reaches: ``>=80``, ``>=60``, ``>=40``,
    ``>=20``, otherwise the lowest bucket.

    Returns:
        ``{"ranges": [...labels...], "counts": [...]}``; both lists are empty
        when no row qualifies or when the query raised (the exception is
        logged).
    """
    labels = ["80-100", "60-80", "40-60", "20-40", "0-20"]
    # Inclusive lower bound of each bucket except the last, catch-all one.
    lower_bounds = (80, 60, 40, 20)
    try:
        async with db.execute(
            """
SELECT score FROM proxies
WHERE validated = 1 AND score > 0
"""
        ) as cursor:
            records = await cursor.fetchall()
        if not records:
            return {"ranges": [], "counts": []}

        tally = [0] * len(labels)
        for record in records:
            value = record[0]
            for slot, bound in enumerate(lower_bounds):
                if value >= bound:
                    tally[slot] += 1
                    break
            else:
                # Below every bound: falls in the lowest bucket.
                tally[-1] += 1
        return {"ranges": labels, "counts": tally}
    except Exception as e:
        logger.error(f"get_score_distribution failed: {e}", exc_info=True)
        return {"ranges": [], "counts": []}