Add files via upload

This commit is contained in:
achenc1013
2025-03-09 19:44:06 +08:00
committed by GitHub
parent ba7339b28c
commit 6f49427932
60 changed files with 10119 additions and 2 deletions

View File

@@ -0,0 +1 @@
"""XSS扫描器 - 核心包"""

Binary file not shown.

Binary file not shown.

Binary file not shown.

405
xss_scanner/core/config.py Normal file
View File

@@ -0,0 +1,405 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
配置模块,负责管理扫描器的配置参数
"""
import os
import json
import logging
from urllib.parse import urlparse
logger = logging.getLogger('xss_scanner')
class Config:
    """Scanner configuration container.

    Holds every tunable option of the scanner, and knows how to populate
    itself from parsed CLI arguments or a JSON file, persist itself back to
    JSON, and locate payload files on disk.
    """

    # JSON keys that map 1:1 onto attributes of the same name, in the order
    # they are written out by save_to_file().
    _FILE_KEYS = (
        # General options
        'url', 'depth', 'threads', 'timeout', 'user_agent', 'cookies',
        'headers', 'proxy', 'scan_level', 'scan_type', 'payload_level',
        'output_file', 'output_format', 'verbose', 'no_color',
        # Advanced options
        'use_browser', 'exploit', 'custom_payloads',
        'exclude_pattern', 'include_pattern', 'auth',
    )

    def __init__(self):
        """Initialise every option with its default value."""
        # General options
        self.url = None
        self.depth = 2
        self.threads = 5
        self.timeout = 10
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        self.cookies = {}
        self.headers = {}
        self.proxy = None
        self.scan_level = 2
        self.scan_type = 'all'
        self.payload_level = 2
        self.output_file = None
        self.output_format = 'html'
        self.verbose = False
        self.no_color = False
        # Advanced options
        self.use_browser = False
        self.exploit = False
        self.custom_payloads = None
        self.exclude_pattern = None
        self.include_pattern = None
        self.auth = None
        # Internal paths: base_dir is the package directory, project_root its parent.
        self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.project_root = os.path.dirname(self.base_dir)
        # Prefer a payloads/ directory in the project root; fall back to the
        # copy shipped inside the package.
        self.payloads_dir = os.path.join(self.project_root, 'payloads')
        if not os.path.isdir(self.payloads_dir):
            self.payloads_dir = os.path.join(self.base_dir, 'payloads')
            logger.debug(f"使用模块内部payloads目录: {self.payloads_dir}")
        else:
            logger.debug(f"使用项目根目录payloads目录: {self.payloads_dir}")

    def load_from_args(self, args):
        """Populate the configuration from parsed command-line arguments.

        Args:
            args: argparse.Namespace (or any object) carrying CLI options;
                  missing attributes are simply skipped.
        """
        # Numeric options: copied when present and not None (0 is a valid value).
        for name in ('depth', 'threads', 'timeout', 'scan_level', 'payload_level'):
            value = getattr(args, name, None)
            if value is not None:
                setattr(self, name, value)
        # String options: copied when present and truthy. Some CLI names
        # differ from the attribute name (output -> output_file, etc.).
        string_options = (
            ('url', 'url'),
            ('user_agent', 'user_agent'),
            ('proxy', 'proxy'),
            ('scan_type', 'scan_type'),
            ('output', 'output_file'),
            ('format', 'output_format'),
            ('custom_payloads', 'custom_payloads'),
            ('exclude', 'exclude_pattern'),
            ('include', 'include_pattern'),
        )
        for arg_name, attr in string_options:
            value = getattr(args, arg_name, None)
            if value:
                setattr(self, attr, value)
        # Boolean flags: copied verbatim whenever the attribute exists,
        # so an explicit False on the command line is honoured.
        for arg_name, attr in (('verbose', 'verbose'), ('no_color', 'no_color'),
                               ('browser', 'use_browser'), ('exploit', 'exploit')):
            if hasattr(args, arg_name):
                setattr(self, attr, getattr(args, arg_name))
        # Options that need parsing into structured values.
        if getattr(args, 'cookie', None):
            self._parse_cookies(args.cookie)
        if getattr(args, 'headers', None):
            self._parse_headers(args.headers)
        if getattr(args, 'auth', None):
            self._parse_auth(args.auth)

    def load_from_file(self, config_file):
        """Load the configuration from a JSON file.

        Args:
            config_file: path to the JSON configuration file.

        Returns:
            bool: True on success, False if the file is missing or unreadable.
        """
        if not os.path.exists(config_file):
            logger.error(f"配置文件不存在: {config_file}")
            return False
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
            # Only recognised keys are applied; anything else is ignored.
            for key in self._FILE_KEYS:
                if key in config_data:
                    setattr(self, key, config_data[key])
            logger.info(f"成功从 {config_file} 加载配置")
            return True
        except Exception as e:
            logger.error(f"加载配置文件时出错: {str(e)}")
            return False

    def save_to_file(self, config_file):
        """Write the current configuration to a JSON file.

        Args:
            config_file: destination path.

        Returns:
            bool: True on success, False on any I/O or serialisation error.
        """
        config_data = {key: getattr(self, key) for key in self._FILE_KEYS}
        try:
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(config_data, f, indent=4)
            logger.info(f"配置已保存到 {config_file}")
            return True
        except Exception as e:
            logger.error(f"保存配置文件时出错: {str(e)}")
            return False

    def _parse_cookies(self, cookie_str):
        """Parse a cookie header string into self.cookies.

        Args:
            cookie_str: string of the form "name=value; name2=value2".
        """
        if not cookie_str:
            return
        try:
            cookies = {}
            for cookie in cookie_str.split(';'):
                if '=' in cookie:
                    # Split on the first '=' only so values may contain '='.
                    name, value = cookie.strip().split('=', 1)
                    cookies[name] = value
            self.cookies = cookies
        except Exception as e:
            logger.error(f"解析Cookie时出错: {str(e)}")

    def _parse_headers(self, headers_str):
        """Parse an HTTP header string into self.headers.

        Args:
            headers_str: string of the form "Header1:Value1;Header2:Value2".
        """
        if not headers_str:
            return
        try:
            headers = {}
            for header in headers_str.split(';'):
                if ':' in header:
                    # Split on the first ':' only so values may contain ':'.
                    name, value = header.strip().split(':', 1)
                    headers[name] = value
            self.headers = headers
        except Exception as e:
            logger.error(f"解析HTTP头时出错: {str(e)}")

    def _parse_auth(self, auth_str):
        """Parse a basic-auth string into self.auth.

        Args:
            auth_str: string of the form "username:password".
        """
        if not auth_str:
            return
        try:
            if ':' in auth_str:
                # Split on the first ':' only so passwords may contain ':'.
                username, password = auth_str.split(':', 1)
                self.auth = {
                    'username': username,
                    'password': password
                }
        except Exception as e:
            logger.error(f"解析认证信息时出错: {str(e)}")

    def get_payloads_file(self, payload_type):
        """Locate the payload file for a given vulnerability type.

        Search order: the user-supplied custom payload file, then the
        project-root payloads/ directory, then the in-package payloads/
        directory. Within each directory the level-specific file is tried
        first, then (for payload_level >= 2) the WAF-bypass file, then
        level 1 as a fallback; finally any *.txt in the root type directory.

        Args:
            payload_type: payload category, e.g. 'xss' or 'sqli'.

        Returns:
            str: path to an existing payload file, or None if nothing usable
            was found.
        """
        if self.custom_payloads and os.path.exists(self.custom_payloads):
            logger.info(f"使用自定义有效载荷文件: {self.custom_payloads}")
            return self.custom_payloads
        # Candidate file names for the configured payload level.
        level_filename = f"{payload_type}_level{self.payload_level}.txt"
        waf_bypass_filename = f"{payload_type}_waf_bypass.txt"
        level1_filename = f"{payload_type}_level1.txt"
        root_payload_dir = os.path.join(self.project_root, 'payloads')
        module_payload_dir = os.path.join(self.base_dir, 'payloads')
        # Build the ordered candidate list: project root first, module second.
        payload_paths = []
        for payload_dir in (root_payload_dir, module_payload_dir):
            if not os.path.isdir(payload_dir):
                continue
            type_dir = os.path.join(payload_dir, payload_type)
            payload_paths.append(os.path.join(type_dir, level_filename))
            # WAF-bypass payloads only make sense from level 2 upwards.
            if self.payload_level >= 2:
                payload_paths.append(os.path.join(type_dir, waf_bypass_filename))
            # level 1 always included as a fallback.
            payload_paths.append(os.path.join(type_dir, level1_filename))
        # Return the first candidate that actually exists.
        # (The original code re-scanned the very same list a second time
        # after this loop — that fallback was dead code and was removed.)
        for path in payload_paths:
            if os.path.isfile(path):
                logger.debug(f"找到有效载荷文件: {path}")
                return path
        logger.warning(f"找不到类型为 {payload_type} 级别为 {self.payload_level} 的有效载荷文件")
        logger.warning(f"尝试搜索的路径: {', '.join(payload_paths)}")
        # Last resort: any .txt file in the root payload type directory.
        root_payload_type_dir = os.path.join(root_payload_dir, payload_type)
        if os.path.isdir(root_payload_type_dir):
            for file in os.listdir(root_payload_type_dir):
                if file.endswith('.txt'):
                    path = os.path.join(root_payload_type_dir, file)
                    logger.info(f"使用最终备选的有效载荷文件: {path}")
                    return path
        # Nothing usable was found anywhere.
        logger.error(f"没有找到任何适用于{payload_type}的有效载荷文件")
        return None

138
xss_scanner/core/logger.py Normal file
View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
日志模块,负责管理日志输出
"""
import os
import logging
import sys
from datetime import datetime
class ColoredFormatter(logging.Formatter):
    """Log formatter that wraps each record in per-level ANSI colour codes."""

    # ANSI escape codes per level name.
    COLORS = {
        'DEBUG': '\033[94m',             # blue
        'INFO': '\033[92m',              # green
        'WARNING': '\033[93m',           # yellow
        'ERROR': '\033[91m',             # red
        'CRITICAL': '\033[91m\033[1m',   # bold red
        'RESET': '\033[0m'               # reset
    }

    def __init__(self, fmt=None, datefmt=None, style='%', use_color=True):
        """Initialise the formatter.

        Args:
            fmt: log format string
            datefmt: date format string
            style: format style ('%', '{' or '$')
            use_color: whether coloured output is requested
        """
        super().__init__(fmt, datefmt, style)
        # BUG FIX: the original expression
        #   use_color and sys.platform != 'win32' or os.name == 'posix'
        # parsed as (use_color and A) or B, which forced colour ON for every
        # POSIX system even when use_color=False (i.e. --no-color was
        # silently ignored on Linux/macOS). Colour is only enabled when it
        # was requested AND the platform supports ANSI escapes.
        self.use_color = use_color and (sys.platform != 'win32' or os.name == 'posix')

    def format(self, record):
        """Format a log record, adding colour codes when enabled.

        Args:
            record: the logging.LogRecord to format

        Returns:
            str: the formatted (possibly colourised) message
        """
        levelname = record.levelname
        # Delegate the actual formatting to the base class.
        message = super().format(record)
        # Wrap the whole line in the level's colour when colour is enabled.
        if self.use_color and levelname in self.COLORS:
            message = f"{self.COLORS[levelname]}{message}{self.COLORS['RESET']}"
        return message
def setup_logger(log_level=logging.INFO, no_color=False):
    """Configure the 'xss_scanner' console logger.

    Args:
        log_level: logging level applied to the logger and its console handler
        no_color: disable coloured console output

    Returns:
        logging.Logger: the configured logger
    """
    logger = logging.getLogger('xss_scanner')
    logger.setLevel(log_level)
    # Remove previous handlers so repeated calls don't stack duplicates.
    # BUG FIX: the original iterated logger.handlers directly while calling
    # removeHandler(), which mutates the list during iteration and skips
    # every other handler — iterate over a snapshot instead.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
    # Console handler at the same level as the logger.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(log_level)
    # Log line and timestamp formats.
    log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    # Colourising formatter (colour suppressed when no_color is set).
    formatter = ColoredFormatter(
        fmt=log_format,
        datefmt=date_format,
        use_color=not no_color
    )
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger
def setup_file_logger(log_file, log_level=logging.INFO):
    """Attach a file handler to the 'xss_scanner' logger.

    Args:
        log_file: path of the log file to write to
        log_level: logging level for the file handler

    Returns:
        logging.Logger: the 'xss_scanner' logger (with the file handler
        attached when creation succeeded)
    """
    logger = logging.getLogger('xss_scanner')
    # Plain (uncoloured) format for file output.
    log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    try:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(log_level)
        file_handler.setFormatter(
            logging.Formatter(fmt=log_format, datefmt=date_format)
        )
        logger.addHandler(file_handler)
    except Exception as e:
        # Best-effort: report the failure but still hand back the logger.
        logger.error(f"设置文件日志失败: {str(e)}")
    return logger

View File

@@ -0,0 +1,339 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
扫描引擎核心模块
负责协调各种类型的扫描器和管理扫描过程
"""
import time
import logging
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, urljoin
from xss_scanner.utils.crawler import Crawler
from xss_scanner.utils.http_client import HttpClient
from xss_scanner.scanners.xss_scanner import XSSScanner
from xss_scanner.scanners.csrf_scanner import CSRFScanner
from xss_scanner.scanners.sql_injection_scanner import SQLInjectionScanner
from xss_scanner.scanners.lfi_scanner import LFIScanner
from xss_scanner.scanners.rfi_scanner import RFIScanner
from xss_scanner.scanners.ssrf_scanner import SSRFScanner
from xss_scanner.scanners.xxe_scanner import XXEScanner
logger = logging.getLogger('xss_scanner')
class ScannerEngine:
    """Scan engine core.

    Coordinates the crawler and all configured vulnerability scanners,
    de-duplicates pages/forms/parameters so nothing is tested twice, and
    aggregates findings into a single results dict.
    """
    def __init__(self, config):
        """
        Initialise the scan engine.

        Args:
            config: configuration object carrying all scan parameters
                    (timeouts, depth, threads, scan type, patterns, ...)
        """
        self.config = config
        # Shared HTTP client used by the crawler and every scanner.
        self.http_client = HttpClient(
            timeout=config.timeout,
            user_agent=config.user_agent,
            proxy=config.proxy,
            cookies=config.cookies,
            headers=config.headers
        )
        self.crawler = Crawler(
            http_client=self.http_client,
            max_depth=config.depth,
            threads=config.threads,
            exclude_pattern=config.exclude_pattern,
            include_pattern=config.include_pattern
        )
        # Instantiate the scanners selected by the configuration.
        self.scanners = []
        self.initialize_scanners()
        # Thread pool.  NOTE(review): created here but not visibly used by
        # any method in this class — confirm whether it is used elsewhere.
        self.thread_pool = ThreadPoolExecutor(max_workers=config.threads)
        # Aggregated scan results; start/end times are filled in by scan().
        self.results = {
            'target': None,
            'start_time': None,
            'end_time': None,
            'scan_info': {
                'scan_level': config.scan_level,
                'scan_type': config.scan_type,
                'threads': config.threads,
                'depth': config.depth
            },
            'vulnerabilities': [],
            'statistics': {
                'pages_scanned': 0,
                'forms_tested': 0,
                'parameters_tested': 0,
                'vulnerabilities_found': 0
            }
        }
        # Track already-scanned URLs, forms and parameter combinations to
        # prevent duplicate scanning.
        self.scanned_urls = set()
        self.scanned_forms = set()
        self.scanned_param_combinations = set()
    def initialize_scanners(self):
        """Instantiate all scanners selected by the configuration.

        The XSS scanner is always added regardless of scan_type; the other
        scanners are added only when scan_type is 'all' or matches them.
        """
        # XSS scanner (unconditional).
        self.scanners.append(XSSScanner(
            http_client=self.http_client,
            payload_level=self.config.payload_level,
            use_browser=self.config.use_browser
        ))
        # Optional scanners, gated on the configured scan type.
        if self.config.scan_type in ['all', 'csrf']:
            self.scanners.append(CSRFScanner(self.http_client))
        if self.config.scan_type in ['all', 'sqli']:
            self.scanners.append(SQLInjectionScanner(self.http_client))
        if self.config.scan_type in ['all', 'lfi']:
            self.scanners.append(LFIScanner(self.http_client))
        if self.config.scan_type in ['all', 'rfi']:
            self.scanners.append(RFIScanner(self.http_client))
        if self.config.scan_type in ['all', 'ssrf']:
            self.scanners.append(SSRFScanner(self.http_client))
        if self.config.scan_type in ['all', 'xxe']:
            self.scanners.append(XXEScanner(self.http_client))
    def scan(self, target_url):
        """
        Scan the target: crawl, de-duplicate, test forms and URL parameters,
        and optionally exploit any findings.

        Args:
            target_url: target URL

        Returns:
            dict: the aggregated scan results (self.results)
        """
        self.results['target'] = target_url
        self.results['start_time'] = time.time()
        # Crawl the target site.
        logger.info(f"开始爬取目标站点: {target_url}")
        pages = self.crawler.crawl(target_url)
        # De-duplicate crawled pages by normalised URL
        # (scheme + host + path; query string and fragment are ignored).
        unique_pages = []
        page_urls = set()
        for page in pages:
            url = page['url']
            parsed_url = urlparse(url)
            normalized_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
            if normalized_url not in page_urls:
                page_urls.add(normalized_url)
                unique_pages.append(page)
        self.results['statistics']['pages_scanned'] = len(unique_pages)
        logger.info(f"爬取完成,发现 {len(unique_pages)} 个唯一页面")
        # Scan every unique page.
        for page in unique_pages:
            page_url = page['url']
            if page_url in self.scanned_urls:
                logger.debug(f"跳过已扫描的页面: {page_url}")
                continue
            self.scanned_urls.add(page_url)
            logger.debug(f"扫描页面: {page_url}")
            # Test each form on the page, skipping forms already fingerprinted.
            for form in page.get('forms', []):
                # Fingerprint built from URL, form id/action/method and fields.
                form_id = self._get_form_identifier(page_url, form)
                if form_id in self.scanned_forms:
                    logger.debug(f"跳过已扫描的表单: {form_id}")
                    continue
                self.scanned_forms.add(form_id)
                self.results['statistics']['forms_tested'] += 1
                self._scan_form(page_url, form)
            # Test the page's URL parameters, skipping combinations seen before.
            if page.get('params', []):
                params_to_scan = []
                for param in page.get('params', []):
                    # Unique identifier for this URL/parameter combination.
                    param_id = f"{page_url}:{param}"
                    if param_id in self.scanned_param_combinations:
                        logger.debug(f"跳过已扫描的参数: {param_id}")
                        continue
                    self.scanned_param_combinations.add(param_id)
                    params_to_scan.append(param)
                if params_to_scan:
                    self._scan_params(page_url, params_to_scan)
                    self.results['statistics']['parameters_tested'] += len(params_to_scan)
        # When exploitation is enabled, attempt to exploit any findings.
        if self.config.exploit and self.results['vulnerabilities']:
            self._exploit_vulnerabilities()
        self.results['end_time'] = time.time()
        self.results['statistics']['vulnerabilities_found'] = len(self.results['vulnerabilities'])
        return self.results
    def _get_form_identifier(self, url, form):
        """
        Build a unique identifier (fingerprint) for a form.

        Args:
            url: page URL
            form: form description dict

        Returns:
            str: fingerprint combining URL, form id, action, method and the
            sorted list of field name:type pairs
        """
        form_id = form.get('id', '')
        form_action = form.get('action', '')
        form_method = form.get('method', 'get')
        # Sort the field signatures so field order doesn't change the fingerprint.
        fields = sorted([f"{field['name']}:{field['type']}" for field in form.get('fields', [])])
        fields_str = ','.join(fields)
        return f"{url}:{form_id}:{form_action}:{form_method}:{fields_str}"
    def _scan_form(self, url, form):
        """
        Run every form-capable scanner against each text-like field of a form,
        recording non-duplicate findings in self.results.

        Args:
            url: page URL
            form: form description dict
        """
        logger.debug(f"扫描表单: {form.get('id', 'unknown')}")
        # Only text-like input fields are injectable targets.
        for field in form.get('fields', []):
            if field['type'] in ['text', 'search', 'url', 'email', 'password', 'tel', 'number']:
                for scanner in self.scanners:
                    # Skip scanners that do not support form scanning.
                    if not scanner.can_scan_form():
                        continue
                    result = scanner.scan_form(url, form, field)
                    if result:
                        # Duplicate check: same type and URL, then compare
                        # 'location' when both findings carry it, otherwise
                        # fall back to comparing form ids.
                        is_duplicate = False
                        for vuln in self.results['vulnerabilities']:
                            if (vuln['type'] == result['type'] and
                                vuln.get('url') == result.get('url')):
                                if 'location' in vuln and 'location' in result:
                                    if vuln['location'] == result['location']:
                                        is_duplicate = True
                                        break
                                elif vuln.get('form_id') == result.get('form_id'):
                                    is_duplicate = True
                                    break
                        if not is_duplicate:
                            self.results['vulnerabilities'].append(result)
                            logger.warning(f"在表单中发现漏洞: {result['type']} - {result['description']}")
    def _scan_params(self, url, params):
        """
        Run every parameter-capable scanner against each URL parameter,
        recording non-duplicate findings in self.results.

        Args:
            url: page URL
            params: list of URL parameter names
        """
        logger.debug(f"扫描URL参数: {', '.join(params)}")
        for param in params:
            for scanner in self.scanners:
                # Skip scanners that do not support URL-parameter scanning.
                if not scanner.can_scan_params():
                    continue
                result = scanner.scan_parameter(url, param)
                if result:
                    # Duplicate check: same type and URL, then compare
                    # 'parameter' when both findings carry it, otherwise fall
                    # back to comparing vulnerability ids.
                    is_duplicate = False
                    for vuln in self.results['vulnerabilities']:
                        if (vuln['type'] == result['type'] and
                            vuln.get('url') == result.get('url')):
                            if 'parameter' in vuln and 'parameter' in result:
                                if vuln['parameter'] == result['parameter']:
                                    is_duplicate = True
                                    break
                            elif vuln.get('vulnerability_id') == result.get('vulnerability_id'):
                                is_duplicate = True
                                break
                    if not is_duplicate:
                        self.results['vulnerabilities'].append(result)
                        logger.warning(f"在URL参数中发现漏洞: {result['type']} - {result['description']}")
    def _exploit_vulnerabilities(self):
        """Attempt to exploit every recorded vulnerability, attaching each
        successful exploit's result to the vulnerability entry in-place."""
        logger.info("尝试利用发现的漏洞...")
        for vuln in self.results['vulnerabilities']:
            # Pick the exploit module matching the vulnerability type.
            exploit_module = self._get_exploit_module(vuln['type'])
            if exploit_module:
                exploit_result = exploit_module.exploit(vuln)
                if exploit_result:
                    vuln['exploit_result'] = exploit_result
                    logger.warning(f"成功利用漏洞: {vuln['type']} - {exploit_result['description']}")
    def _get_exploit_module(self, vuln_type):
        """
        Map a vulnerability type to its exploit module.

        Exploit classes are imported lazily here so unused exploit modules
        are never loaded.

        Args:
            vuln_type: vulnerability type string (e.g. 'XSS', 'SQL_INJECTION')

        Returns:
            An exploit module instance, or None for unknown types.
        """
        if vuln_type == 'XSS':
            from xss_scanner.exploits.xss_exploit import XSSExploit
            return XSSExploit(self.http_client)
        elif vuln_type == 'CSRF':
            from xss_scanner.exploits.csrf_exploit import CSRFExploit
            return CSRFExploit(self.http_client)
        elif vuln_type == 'SQL_INJECTION':
            from xss_scanner.exploits.sqli_exploit import SQLInjectionExploit
            return SQLInjectionExploit(self.http_client)
        elif vuln_type == 'LFI':
            from xss_scanner.exploits.lfi_exploit import LFIExploit
            return LFIExploit(self.http_client)
        elif vuln_type == 'RFI':
            from xss_scanner.exploits.rfi_exploit import RFIExploit
            return RFIExploit(self.http_client)
        elif vuln_type == 'SSRF':
            from xss_scanner.exploits.ssrf_exploit import SSRFExploit
            return SSRFExploit(self.http_client)
        elif vuln_type == 'XXE':
            from xss_scanner.exploits.xxe_exploit import XXEExploit
            return XXEExploit(self.http_client)
        return None