proxy从互联网收集代理的编写
一、代码的结构
我在分析SmallProxyPool代码之后找到了几个基础的结构用这些基础的结构就可以编写一个手机互联网代理的工具
- 收集模块 通过Fofa搜索到互联网上的Sockt5资产
- 检测模块 通过检测看什么Sockt5不需要账密并且可以发送和接收流量
- 流量转发模块 把本机的流量转发到远程的Sockt5服务器上
二、代码实现
1.收集模块
- 读取配置文件
- 调用Fofa接口发挥数据
- 存放到数组里
def getSocks5Data(): # 获取节点
rule = base64.b64encode(con['config']['rule'].encode())
counter = 0
nextPages = ""
while True:
url = "https://fofa.info/api/v1/search/next?email={}&key={}&qbase64={}&size=1000&next={}".format(emali, key, rule.decode(), nextPages)
if counter == 0:
url = "https://fofa.info/api/v1/search/next?email={}&key={}&qbase64={}&size=1000".format(emali, key,rule.decode())
counter += 1
ScoketData = requests.get(url, headers=Heade)
try:
nextPages = ScoketData.json()['next'] # 获取一次next
print(nextPages)
if nextPages != "" or ScoketData.json()['error']: # 不为空就添加一个next
print("[~]正在翻页")
else:
break
list_tmp = ScoketData.json()['results']
for item in list_tmp:
ListProxy.append(item[0])
except:
break
2.检测模块
使用多线程对收集的url进行测试
def ThreadcheckAlive():
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(checkAlive, proxy) for proxy in ListProxy]
for future in as_completed(futures):
result = future.result()
if result is not None:
print(f"Result: {result}")
然后使用进入checkAlive方法判代理是否联通
def checkAlive(ProxyIpProt):
proxies = {
"http": "socks5://" + ProxyIpProt,
"https": "socks5://" + ProxyIpProt,
}
try:
Hea_req = requests.head("http://www.baidu.com", headers=Heade, proxies=proxies, timeout=1, verify=False)
if Hea_req.status_code == 200:
#print(len(Hea_req.text))
filteredProxy.append(ProxyIpProt)
print(ProxyIpProt)
except(Exception):
pass
这个代码很简单通过socks5去测试每个节点的的响应包如果返回200则代表可以来联通
3、流量转发模块
流量转发模块实现的主要是Sock5的本地监听然后把流量 轮询的发送到远程的Socks5服务器上
流量的接收和发送可以使用多线程。
import socket
import threading
import FofaProxy
import random
# 定义一个函数,用于将数据从源套接字转发到目标套接字
def forward_data(src, dest):
while True:
data = src.recv(4096) # 从源套接字接收数据
print(data)
if not data: # 如果接收到的数据为空,跳出循环
break
dest.sendall(data) # 将数据发送到目标套接字
# 主函数
def SocktStart(List_Proxy):
# proxy_list = ["218.62.55.221:1080","61.160.213.169:1080","106.52.247.90:20005","124.221.108.177:1080","223.112.196.124:1080"] #
proxy_list_len = len(List_Proxy)
print(List_Proxy)
print("我被运行")
local_host = "127.0.0.1" # 本地主机地址
local_port = 10808 # 本地端口号
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建服务器套接字
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 设置套接字选项
server_socket.bind((local_host, local_port)) # 绑定本地主机地址和端口号
server_socket.listen(5) # 开始监听连接请求
print(f"[*] Listening on {local_host}:{local_port}") # 输出监听信息
while True:
client_socket, addr = server_socket.accept() # 接受客户端连接请求
print(f"[*] Accepted connection from: {addr[0]}:{addr[1]}") # 输出连接信息
Proxy = List_Proxy[random.randint(0, proxy_list_len-1)]
IpHost = Proxy.split(":")[0]
HProt = Proxy.split(":")[1]
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建远程套接字
remote_socket.connect((IpHost, int(HProt))) # 连接到远程主机列表中的第一个主机
# 创建两个线程,分别将数据从客户端套接字转发到远程套接字,以及从远程套接字转发到客户端套接字
forward_thread = threading.Thread(target=forward_data, args=(client_socket, remote_socket))
forward_thread.start()
forward_thread = threading.Thread(target=forward_data, args=(remote_socket, client_socket))
forward_thread.start()
...