Files
ProxyPool/main.py
2026-01-27 21:17:36 +08:00

81 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from core.plugin_manager import PluginManager
from core.sqlite import SQLiteManager
from core.validator import ProxyValidator
from core.log import logger
# Shared asyncio queue connecting the crawler (producer) to the validator
# workers (consumers); the buffer is sized at 500 to accommodate higher
# concurrency.
# NOTE(review): the queue is constructed at import time, before asyncio.run()
# creates its event loop. On Python < 3.10 asyncio.Queue binds to the loop
# current at construction — confirm the minimum supported Python version.
proxy_queue = asyncio.Queue(maxsize=500)
async def run_crawler():
    """Producer: collect candidate proxies and push them onto the queue.

    Iterates over everything yielded by ``PluginManager().run_all()``,
    unpacks each item as an ``(ip, port, protocol)`` triple and awaits
    ``proxy_queue.put`` for it. Logs the number of candidates once the
    iteration is exhausted.
    """
    logger.info("后台爬虫任务启动...")
    discovered = 0
    plugins = PluginManager()
    async for host, port_no, scheme in plugins.run_all():
        candidate = (host, port_no, scheme)
        await proxy_queue.put(candidate)
        discovered = discovered + 1
    logger.info(f"爬虫抓取阶段完成,共发现 {discovered} 个潜在代理。")
async def run_validator(db, validator):
    """Consumer: take proxies off the queue, validate them, store the good ones.

    Loops until a ``None`` sentinel is received from ``proxy_queue``. Every
    other item is unpacked as ``(ip, port, protocol)`` and passed to
    ``validator.validate``; when that reports success the proxy is logged
    and handed to ``db.insert_proxy``.

    Args:
        db: Object providing an awaitable ``insert_proxy(ip, port, protocol)``.
        validator: Object providing an awaitable
            ``validate(ip, port, protocol)`` that returns an
            ``(is_valid, latency)`` pair.
    """
    accepted = 0
    while True:
        item = await proxy_queue.get()
        if item is None:
            # Shutdown sentinel: acknowledge it and stop this worker.
            proxy_queue.task_done()
            break

        host, port_no, scheme = item
        try:
            ok, delay = await validator.validate(host, port_no, scheme)
            if not ok:
                # The ``finally`` clause below still acknowledges the item.
                continue
            logger.info(f"验证通过: {host}:{port_no} ({scheme}) - 延迟: {delay}ms")
            await db.insert_proxy(host, port_no, scheme)
            accepted += 1
        except Exception as exc:
            # A failure on one proxy is logged and must not stop the worker.
            logger.error(f"验证器异常: {exc}")
        finally:
            # Acknowledge every dequeued item, whatever the outcome.
            proxy_queue.task_done()

    if accepted:
        logger.info(f"验证协程完成,入库 {accepted} 个代理。")
async def main():
    """Run one crawl-and-validate pass and log the resulting pool size.

    Initializes the SQLite store, then starts one crawler task (producer)
    and a fixed number of validator tasks (consumers) that share
    ``proxy_queue``. Once the crawler has finished, one ``None`` sentinel
    per worker is queued, the queue is drained and the workers are awaited.

    If the crawler or a worker raises, or this coroutine is cancelled, every
    task that is still running is cancelled and awaited before the
    ``ProxyValidator`` context is exited, so no worker keeps using the
    validator after it has been closed. The original exception then
    propagates to the caller.
    """
    logger.info("=== ProxyPool 加速启动 ===")
    db = SQLiteManager()
    await db.init_db()

    # Concurrency settings, raised substantially.
    # max_concurrency is passed to ProxyValidator (presumably capping the
    # number of in-flight low-level requests — confirm in core.validator);
    # num_validators is the number of consumer tasks draining the queue.
    async with ProxyValidator(max_concurrency=200) as validator:
        num_validators = 100

        # Start the producer.
        crawler_task = asyncio.create_task(run_crawler())
        # Start the validator workers.
        validator_tasks = [
            asyncio.create_task(run_validator(db, validator))
            for _ in range(num_validators)
        ]

        try:
            await crawler_task

            # Send one exit signal per worker.
            for _ in range(num_validators):
                await proxy_queue.put(None)

            await proxy_queue.join()
            await asyncio.gather(*validator_tasks)
        finally:
            # On the normal path every task is already done and this is a
            # no-op. On failure or cancellation, stop whatever is still
            # running before the validator context is torn down.
            unfinished = [
                task
                for task in (crawler_task, *validator_tasks)
                if not task.done()
            ]
            for task in unfinished:
                task.cancel()
            if unfinished:
                await asyncio.gather(*unfinished, return_exceptions=True)

    total = await db.count_proxies()
    logger.info(f"=== 运行结束,当前池内总数: {total} ===")
def _run_from_cli():
    """Run ``main()`` to completion; log and swallow a KeyboardInterrupt."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("程序手动停止")


if __name__ == "__main__":
    _run_from_cli()