"""仓库层单元测试""" import pytest import asyncio from datetime import datetime class TestProxyRepository: """测试 ProxyRepository""" @pytest.mark.asyncio async def test_insert_or_update(self, db, proxy_repo): """测试插入或更新代理""" result = await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50) assert result is True # 验证插入成功 proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080) assert proxy is not None assert proxy.ip == "192.168.1.1" assert proxy.port == 8080 # 清理 await proxy_repo.delete(db, "192.168.1.1", 8080) @pytest.mark.asyncio async def test_get_random(self, db, proxy_repo): """测试获取随机代理""" # 先插入一个代理 await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50) proxy = await proxy_repo.get_random(db) # 可能有也可能没有(取决于数据库状态) if proxy: assert hasattr(proxy, 'ip') assert hasattr(proxy, 'port') # 清理 await proxy_repo.delete(db, "192.168.1.1", 8080) @pytest.mark.asyncio async def test_list_all(self, db, proxy_repo): """测试列出所有代理""" # 插入测试数据 await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50) await proxy_repo.insert_or_update(db, "192.168.1.2", 8081, "https", 60) proxies = await proxy_repo.list_all(db, limit=100) assert isinstance(proxies, list) # 清理 await proxy_repo.delete(db, "192.168.1.1", 8080) await proxy_repo.delete(db, "192.168.1.2", 8081) @pytest.mark.asyncio async def test_update_score(self, db, proxy_repo): """测试更新分数""" # 插入代理 await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50) # 更新分数 result = await proxy_repo.update_score(db, "192.168.1.1", 8080, 10) assert result is True # 验证 proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080) assert proxy.score == 60 # 清理 await proxy_repo.delete(db, "192.168.1.1", 8080) @pytest.mark.asyncio async def test_update_score_deletes_on_zero_or_below(self, db, proxy_repo): """测试分数降至 0 及以下时自动删除代理""" await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 5) result = await proxy_repo.update_score(db, "192.168.1.1", 8080, -10) assert result is True proxy = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080) assert proxy is None @pytest.mark.asyncio async def test_iter_batches(self, db, proxy_repo): """测试流式分批读取(与库内已有数据共存,只校验增量与分批形状)""" async with db.execute("SELECT COUNT(*) FROM proxies") as c: before = (await c.fetchone())[0] for i in range(5): await proxy_repo.insert_or_update(db, f"192.168.99.{i}", 8000 + i, "http", 10) async with db.execute("SELECT COUNT(*) FROM proxies") as c: after = (await c.fetchone())[0] assert after == before + 5 batches = [] async for batch in proxy_repo.iter_batches(db, batch_size=2): batches.append(batch) assert sum(len(b) for b in batches) == after assert len(batches[-1]) in (1, 2) assert all(len(b) <= 2 for b in batches) for i in range(5): await proxy_repo.delete(db, f"192.168.99.{i}", 8000 + i) @pytest.mark.asyncio async def test_batch_delete(self, db, proxy_repo): """测试批量删除""" # 插入测试数据 await proxy_repo.insert_or_update(db, "192.168.1.1", 8080, "http", 50) await proxy_repo.insert_or_update(db, "192.168.1.2", 8081, "http", 50) # 批量删除 count = await proxy_repo.batch_delete(db, [("192.168.1.1", 8080), ("192.168.1.2", 8081)]) assert count == 2 # 验证删除 proxy1 = await proxy_repo.get_by_ip_port(db, "192.168.1.1", 8080) proxy2 = await proxy_repo.get_by_ip_port(db, "192.168.1.2", 8081) assert proxy1 is None assert proxy2 is None @pytest.mark.asyncio async def test_get_stats(self, db, proxy_repo): """测试获取统计信息""" stats = await proxy_repo.get_stats(db) assert "total" in stats assert "pending" in stats assert "available" in stats assert "avg_score" in stats assert "http_count" in stats @pytest.mark.asyncio async def test_get_today_new_count_only_validated_available(self, db, proxy_repo): """今日新增不计待验证;仅今日创建且 validated=1、score>0""" base = await proxy_repo.get_today_new_count(db) await proxy_repo.upsert_from_crawl(db, "192.168.88.20", 9020, "http", 0) assert await proxy_repo.get_today_new_count(db) == base await proxy_repo.insert_or_update(db, "192.168.88.21", 9021, "http", 55) assert await proxy_repo.get_today_new_count(db) == base + 1 await proxy_repo.delete(db, "192.168.88.20", 9020) await proxy_repo.delete(db, "192.168.88.21", 9021) @pytest.mark.asyncio async def test_upsert_many_from_crawl(self, db, proxy_repo): from app.models.domain import ProxyRaw raws = [ ProxyRaw("10.0.0.1", 18080, "http"), ProxyRaw("10.0.0.2", 18081, "socks5"), ] await proxy_repo.upsert_many_from_crawl(db, raws, 0) await db.commit() p1 = await proxy_repo.get_by_ip_port(db, "10.0.0.1", 18080) assert p1 is not None assert p1.validated == 0 p2 = await proxy_repo.get_by_ip_port(db, "10.0.0.2", 18081) assert p2.protocol == "socks5" await proxy_repo.delete(db, "10.0.0.1", 18080) await proxy_repo.delete(db, "10.0.0.2", 18081)