/
OS-World5d219e7
import random
import requests
import logging
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from threading import Lock
import json
logger = logging.getLogger("desktopenv.providers.aws.ProxyPool")
logger.setLevel(logging.INFO)
@dataclass
class ProxyInfo:
host: str
port: int
username: Optional[str] = None
password: Optional[str] = None
protocol: str = "http" # http, https, socks5
failed_count: int = 0
last_used: float = 0
is_active: bool = True
class ProxyPool:
def __init__(self, config_file: str = None):
self.proxies: List[ProxyInfo] = []
self.current_index = 0
self.lock = Lock()
self.max_failures = 3 # 最大失败次数
self.cooldown_time = 300 # 5分钟冷却时间
if config_file:
self.load_proxies_from_file(config_file)
def load_proxies_from_file(self, config_file: str):
"""Load proxy list from config file"""
try:
with open(config_file, 'r') as f:
proxy_configs = json.load(f)
for config in proxy_configs:
proxy = ProxyInfo(
host=config['host'],
port=config['port'],
username=config.get('username'),
password=config.get('password'),
protocol=config.get('protocol', 'http')
)
self.proxies.append(proxy)
logger.info(f"Loaded {len(self.proxies)} proxies from {config_file}")
except Exception as e:
logger.error(f"Failed to load proxies from {config_file}: {e}")
def add_proxy(self, host: str, port: int, username: str = None,
password: str = None, protocol: str = "http"):
"""Add proxy to pool"""
proxy = ProxyInfo(host=host, port=port, username=username,
password=password, protocol=protocol)
with self.lock:
self.proxies.append(proxy)
logger.info(f"Added proxy {host}:{port}")
def get_next_proxy(self) -> Optional[ProxyInfo]:
"""Get next available proxy"""
with self.lock:
if not self.proxies:
return None
# Filter out proxies with too many failures
active_proxies = [p for p in self.proxies if self._is_proxy_available(p)]
if not active_proxies:
logger.warning("No active proxies available")
return None
# Round-robin selection of proxy
proxy = active_proxies[self.current_index % len(active_proxies)]
self.current_index += 1
proxy.last_used = time.time()
return proxy
def _is_proxy_available(self, proxy: ProxyInfo) -> bool:
"""Check if proxy is available"""
if not proxy.is_active:
return False
if proxy.failed_count >= self.max_failures:
# Check if cooldown time has passed
if time.time() - proxy.last_used < self.cooldown_time:
return False
else:
# Reset failure count
proxy.failed_count = 0
return True
def mark_proxy_failed(self, proxy: ProxyInfo):
"""Mark proxy as failed"""
with self.lock:
proxy.failed_count += 1
if proxy.failed_count >= self.max_failures:
logger.warning(f"Proxy {proxy.host}:{proxy.port} marked as failed "
f"(failures: {proxy.failed_count})")
def mark_proxy_success(self, proxy: ProxyInfo):
"""Mark proxy as successful"""
with self.lock:
proxy.failed_count = 0
def test_proxy(self, proxy: ProxyInfo, test_url: str = "http://httpbin.org/ip",
timeout: int = 10) -> bool:
"""Test if proxy is working"""
try:
proxy_url = self._format_proxy_url(proxy)
proxies = {
'http': proxy_url,
'https': proxy_url
}
response = requests.get(test_url, proxies=proxies, timeout=timeout)
if response.status_code == 200:
self.mark_proxy_success(proxy)
return True
else:
self.mark_proxy_failed(proxy)
return False
except Exception as e:
logger.debug(f"Proxy test failed for {proxy.host}:{proxy.port}: {e}")
self.mark_proxy_failed(proxy)
return False
def _format_proxy_url(self, proxy: ProxyInfo) -> str:
"""Format proxy URL"""
if proxy.username and proxy.password:
return f"{proxy.protocol}://{proxy.username}:{proxy.password}@{proxy.host}:{proxy.port}"
else:
return f"{proxy.protocol}://{proxy.host}:{proxy.port}"
def get_proxy_dict(self, proxy: ProxyInfo) -> Dict[str, str]:
"""Get proxy dictionary for requests library"""
proxy_url = self._format_proxy_url(proxy)
return {
'http': proxy_url,
'https': proxy_url
}
def test_all_proxies(self, test_url: str = "http://httpbin.org/ip"):
"""Test all proxies"""
logger.info("Testing all proxies...")
working_count = 0
for proxy in self.proxies:
if self.test_proxy(proxy, test_url):
working_count += 1
logger.info(f"✓ Proxy {proxy.host}:{proxy.port} is working")
else:
logger.warning(f"✗ Proxy {proxy.host}:{proxy.port} failed")
logger.info(f"Proxy test completed: {working_count}/{len(self.proxies)} working")
return working_count
def get_stats(self) -> Dict:
"""Get proxy pool statistics"""
with self.lock:
total = len(self.proxies)
active = len([p for p in self.proxies if self._is_proxy_available(p)])
failed = len([p for p in self.proxies if p.failed_count >= self.max_failures])
return {
'total': total,
'active': active,
'failed': failed,
'success_rate': active / total if total > 0 else 0
}
# Global proxy pool instance
_proxy_pool = None
def get_global_proxy_pool() -> ProxyPool:
"""Get global proxy pool instance"""
global _proxy_pool
if _proxy_pool is None:
_proxy_pool = ProxyPool()
return _proxy_pool
def init_proxy_pool(config_file: str = None):
"""Initialize global proxy pool"""
global _proxy_pool
_proxy_pool = ProxyPool(config_file)
return _proxy_pool