-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathproxy_manager.py
More file actions
74 lines (67 loc) · 2.49 KB
/
proxy_manager.py
File metadata and controls
74 lines (67 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import random
import requests
import time
from typing import List, Dict, Optional
from datetime import datetime
import threading
import queue
class ProxyManager:
"""Sequential proxy manager for main.py"""
def __init__(self, proxy_file: str = "proxies.txt"):
self.proxy_file = proxy_file
self.working_proxies: queue.Queue = queue.Queue()
self.failed_proxies: Dict[str, datetime] = {}
self.lock = threading.Lock()
self.load_proxies()
def load_proxies(self):
"""Load proxies from file"""
try:
with open(self.proxy_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
proxy = self.parse_proxy(line)
if proxy:
self.working_proxies.put(proxy)
print(f"📋 Loaded {self.working_proxies.qsize()} proxies")
except FileNotFoundError:
print(f"⚠ No proxy file found. Create {self.proxy_file}")
def parse_proxy(self, proxy_str: str) -> Optional[Dict]:
"""Parse proxy string"""
try:
if '@' in proxy_str:
proto_rest, auth_host = proxy_str.split('://', 1) if '://' in proxy_str else ('http', proxy_str)
auth, host = auth_host.split('@')
user, password = auth.split(':')
ip, port = host.split(':')
return {
'protocol': proto_rest,
'user': user,
'password': password,
'ip': ip,
'port': port,
'url': proxy_str
}
else:
if '://' in proxy_str:
proto, rest = proxy_str.split('://')
ip, port = rest.split(':')
else:
proto = 'http'
ip, port = proxy_str.split(':')
return {
'protocol': proto,
'ip': ip,
'port': port,
'url': f"{proto}://{ip}:{port}"
}
except Exception:
return None
def get_proxy(self) -> Optional[Dict]:
"""Get a working proxy"""
try:
proxy = self.working_proxies.get_nowait()
self.working_proxies.put(proxy)
return proxy
except queue.Empty:
return None