mirrored 5 minutes ago
0
yuanmengqiClean code 5d219e7
import json
import logging
import random
import time
from dataclasses import dataclass
from threading import Lock
from typing import List, Dict, Optional
from urllib.parse import quote

import requests

# Module logger; INFO is enabled so load/add/test events are emitted.
logger = logging.getLogger("desktopenv.providers.aws.ProxyPool")
logger.setLevel(logging.INFO)


@dataclass
class ProxyInfo:
    """Connection details and health bookkeeping for a single proxy."""

    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None
    protocol: str = "http"  # http, https, socks5
    failed_count: int = 0  # failures recorded since the last success / cooldown reset
    last_used: float = 0  # time.time() when ProxyPool.get_next_proxy last returned it
    is_active: bool = True  # False excludes the proxy from selection
    last_failed: float = 0  # time.time() of the most recent recorded failure


class ProxyPool:
    """Round-robin pool of proxies with failure tracking and cooldown.

    A proxy whose ``failed_count`` reaches ``max_failures`` is skipped until
    ``cooldown_time`` seconds have elapsed; its failure count is then reset
    and it becomes selectable again. Changes to the proxy list and to the
    failure counters are made while holding ``self.lock``.
    """

    def __init__(self, config_file: Optional[str] = None):
        """Create an empty pool, optionally pre-loading proxies from a JSON file."""
        self.proxies: List[ProxyInfo] = []
        self.current_index = 0
        self.lock = Lock()
        self.max_failures = 3  # failures before a proxy is put on cooldown
        self.cooldown_time = 300  # cooldown length in seconds (5 minutes)

        if config_file:
            self.load_proxies_from_file(config_file)

    def load_proxies_from_file(self, config_file: str):
        """Load proxy list from config file.

        The file must contain a JSON array of objects with ``host`` and
        ``port`` keys and optional ``username``, ``password`` and ``protocol``
        keys. Any error is logged rather than raised; entries appended before
        a malformed one stay in the pool.
        """
        try:
            # JSON is UTF-8 by specification; do not depend on the locale default.
            with open(config_file, 'r', encoding='utf-8') as f:
                proxy_configs = json.load(f)

            # Hold the lock like add_proxy does so concurrent readers never
            # observe the list while it is being extended.
            with self.lock:
                for config in proxy_configs:
                    self.proxies.append(ProxyInfo(
                        host=config['host'],
                        port=config['port'],
                        username=config.get('username'),
                        password=config.get('password'),
                        protocol=config.get('protocol', 'http'),
                    ))
                loaded_total = len(self.proxies)

            logger.info(f"Loaded {loaded_total} proxies from {config_file}")
        except Exception as e:
            # Deliberate best effort: a bad config leaves the pool usable.
            logger.error(f"Failed to load proxies from {config_file}: {e}")

    def add_proxy(self, host: str, port: int, username: Optional[str] = None,
                  password: Optional[str] = None, protocol: str = "http"):
        """Add proxy to pool."""
        proxy = ProxyInfo(host=host, port=port, username=username,
                          password=password, protocol=protocol)
        with self.lock:
            self.proxies.append(proxy)
        logger.info(f"Added proxy {host}:{port}")

    def get_next_proxy(self) -> Optional[ProxyInfo]:
        """Return the next available proxy in round-robin order.

        Returns ``None`` when the pool is empty or every proxy is inactive or
        cooling down. The returned proxy's ``last_used`` is set to now.
        """
        with self.lock:
            if not self.proxies:
                return None

            # Filter out proxies with too many failures
            active_proxies = [p for p in self.proxies if self._is_proxy_available(p)]

            if not active_proxies:
                logger.warning("No active proxies available")
                return None

            # Round-robin selection over the currently available proxies
            proxy = active_proxies[self.current_index % len(active_proxies)]
            self.current_index += 1
            proxy.last_used = time.time()

            return proxy

    def _is_proxy_available(self, proxy: ProxyInfo) -> bool:
        """Check if proxy is available.

        Side effect: once the cooldown of a failed proxy has expired, its
        ``failed_count`` is reset to 0. Callers are expected to hold
        ``self.lock``.
        """
        if not proxy.is_active:
            return False

        if proxy.failed_count >= self.max_failures:
            # The cooldown starts at the most recent failure. last_used is
            # still honoured so a proxy that was unavailable before this
            # field existed is never made available earlier than it was.
            cooldown_start = max(proxy.last_failed, proxy.last_used)
            if time.time() - cooldown_start < self.cooldown_time:
                return False
            # Cooldown elapsed: give the proxy a fresh set of attempts.
            proxy.failed_count = 0

        return True

    def mark_proxy_failed(self, proxy: ProxyInfo):
        """Record a failure for proxy and timestamp it for the cooldown."""
        with self.lock:
            proxy.failed_count += 1
            # Without this stamp a proxy that fails without ever having been
            # handed out (last_used == 0) would leave cooldown immediately.
            proxy.last_failed = time.time()
            if proxy.failed_count >= self.max_failures:
                logger.warning(f"Proxy {proxy.host}:{proxy.port} marked as failed "
                             f"(failures: {proxy.failed_count})")

    def mark_proxy_success(self, proxy: ProxyInfo):
        """Mark proxy as successful by clearing its failure count."""
        with self.lock:
            proxy.failed_count = 0

    def test_proxy(self, proxy: ProxyInfo, test_url: str = "http://httpbin.org/ip",
                   timeout: int = 10) -> bool:
        """Test if proxy is working.

        Sends a GET request for ``test_url`` through the proxy. An HTTP 200
        response marks the proxy successful and returns ``True``; any other
        status or any exception marks it failed and returns ``False``.
        """
        try:
            response = requests.get(test_url, proxies=self.get_proxy_dict(proxy),
                                    timeout=timeout)
            if response.status_code == 200:
                self.mark_proxy_success(proxy)
                return True
            else:
                self.mark_proxy_failed(proxy)
                return False

        except Exception as e:
            logger.debug(f"Proxy test failed for {proxy.host}:{proxy.port}: {e}")
            self.mark_proxy_failed(proxy)
            return False

    def _format_proxy_url(self, proxy: ProxyInfo) -> str:
        """Format proxy URL as ``protocol://[user:password@]host:port``.

        Credentials are included only when both username and password are
        set. They are percent-encoded so reserved characters such as ``@``,
        ``:`` or ``/`` cannot corrupt the URL structure; ProxyInfo is
        therefore expected to hold the raw, unencoded credentials.
        """
        if proxy.username and proxy.password:
            user = quote(proxy.username, safe="")
            secret = quote(proxy.password, safe="")
            return f"{proxy.protocol}://{user}:{secret}@{proxy.host}:{proxy.port}"
        else:
            return f"{proxy.protocol}://{proxy.host}:{proxy.port}"

    def get_proxy_dict(self, proxy: ProxyInfo) -> Dict[str, str]:
        """Get proxy dictionary for requests library.

        The same proxy URL is used for both the ``http`` and ``https`` keys.
        """
        proxy_url = self._format_proxy_url(proxy)
        return {
            'http': proxy_url,
            'https': proxy_url
        }

    def test_all_proxies(self, test_url: str = "http://httpbin.org/ip") -> int:
        """Test all proxies and return how many of them are working."""
        logger.info("Testing all proxies...")
        working_count = 0

        # Iterate over a snapshot: the lock cannot be held during the network
        # calls (test_proxy re-acquires it), and the live list may grow.
        with self.lock:
            snapshot = list(self.proxies)

        for proxy in snapshot:
            if self.test_proxy(proxy, test_url):
                working_count += 1
                logger.info(f"✓ Proxy {proxy.host}:{proxy.port} is working")
            else:
                logger.warning(f"✗ Proxy {proxy.host}:{proxy.port} failed")

        logger.info(f"Proxy test completed: {working_count}/{len(snapshot)} working")
        return working_count

    def get_stats(self) -> Dict:
        """Get proxy pool statistics.

        Returns a dict with ``total``, ``active``, ``failed`` and
        ``success_rate`` (active / total, or 0 for an empty pool). The
        availability check runs first and may reset expired failure counts,
        so such proxies are reported as active rather than failed.
        """
        with self.lock:
            total = len(self.proxies)
            active = sum(1 for p in self.proxies if self._is_proxy_available(p))
            failed = sum(1 for p in self.proxies if p.failed_count >= self.max_failures)

            return {
                'total': total,
                'active': active,
                'failed': failed,
                'success_rate': active / total if total > 0 else 0
            }

# Shared module-level pool; remains None until a pool is first created.
_proxy_pool = None


def get_global_proxy_pool() -> ProxyPool:
    """Return the shared ProxyPool, creating an empty one on first use."""
    global _proxy_pool
    if _proxy_pool is not None:
        return _proxy_pool
    _proxy_pool = ProxyPool()
    return _proxy_pool

def init_proxy_pool(config_file: str = None):
    """Replace the shared pool with a new ProxyPool built from config_file.

    Any pool previously stored in the module-level variable is discarded.
    Returns the newly created pool.
    """
    global _proxy_pool
    fresh_pool = ProxyPool(config_file)
    _proxy_pool = fresh_pool
    return fresh_pool