# -*- coding: utf-8 -*-
# https://github.com/evilc0deooo/SwaggerHound
import json
import sys
import csv
import argparse
import requests
import re
import random
import urllib3
from datetime import datetime
from urllib.parse import urlparse
from loguru import logger
# Disable insecure request warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger.remove()
handler_id = logger.add(sys.stderr, level='DEBUG')  # Set the log output level

now_time = datetime.now().strftime('%Y%m%d_%H%M%S')

proxies = {
    'https': 'http://127.0.0.1:7890',
    'http': 'http://127.0.0.1:7890'
}

# Enable the proxy
SET_PROXY = False
header_agents = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Code/1.96.2 Chrome/128.0.6613.186 Electron/32.2.6 Safari/537.36'
]


def http_req(url, method='get', **kwargs):
    kwargs.setdefault('verify', False)
    kwargs.setdefault('timeout', (10.1, 30.1))
    kwargs.setdefault('allow_redirects', False)
    headers = kwargs.get('headers', {})
    headers.setdefault('User-Agent', random.choice(header_agents))
    # Disallow caching so every request fetches the latest resource from the server
    headers.setdefault('Cache-Control', 'max-age=0')
    kwargs['headers'] = headers
    if SET_PROXY:
        kwargs['proxies'] = proxies
    conn = getattr(requests, method)(url, **kwargs)
    return conn


def check_page(url):
    """
    Check what kind of page the target URL is.
    """
    res = http_req(url, method='get')
    if '<html' in res.text:
        logger.debug('[+] Input is a swagger index page, start resolving the api docs URL')
        return 3  # swagger-html
    elif '"parameters"' in res.text:
        logger.debug('[+] Input is an api docs URL, start building and sending requests')
        return 2  # api_docs
    elif '"location"' in res.text:
        logger.debug('[+] Input is a resource URL, start resolving the api docs URL')
        return 1  # resource


def fill_parameters(parameters, url):
    """
    Fill in test data and replace placeholders in the URL.
    """
    filled_params = {}
    path_params = {}
    for param in parameters:
        param_name = param['name']
        param_in = param['in']
        param_type = param['type']
        # Fill a default value based on the parameter type
        if param_type == 'string':
            value = 'a'
        elif param_type == 'integer':
            value = 1
        elif param_type == 'boolean':
            value = True
        else:
            value = ''
        if param_in == 'query':
            filled_params[param_name] = value
        elif param_in == 'path':
            path_params[param_name] = value
            filled_params[param_name] = value
        elif param_in == 'body':
            if 'body' not in filled_params:
                filled_params['body'] = {}
            filled_params['body'][param_name] = value
    # Replace placeholders in the URL
    for key, value in path_params.items():
        url = url.replace(f'{{{key}}}', str(value))
    return filled_params, url
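
# A minimal usage sketch of fill_parameters (hypothetical values, not part of the scan flow):
# given parameters like [{'name': 'petId', 'in': 'path', 'type': 'integer'},
#                        {'name': 'verbose', 'in': 'query', 'type': 'boolean'}]
# and the URL 'http://host/api/pets/{petId}', it returns
# ({'petId': 1, 'verbose': True}, 'http://host/api/pets/1').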


def get_api_docs_path(resource_url):
    """
    Resolve api docs URLs from a resource URL.
    """
    domain = urlparse(resource_url)
    domain = domain.scheme + '://' + domain.netloc
    try:
        res = http_req(resource_url, method='get')
        resources = json.loads(res.text)
    except Exception as e:
        logger.error(f'[-] {resource_url} error info {e}')
        return []
    paths = []
    if isinstance(resources, dict):
        if 'apis' in resources.keys():  # The format differs between versions
            for api_docs in resources.get('apis', {}):
                paths.append(domain + api_docs['path'])
            return paths
    else:
        for i in resources:
            paths.append(domain + i['location'])
        return paths
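
# For reference, the two resource response shapes handled above look roughly like this
# (illustrative values only):
#   {"apis": [{"path": "/v2/api-docs?group=default"}]}    -> dict format with an 'apis' key
#   [{"name": "default", "location": "/v2/api-docs"}]     -> list format with 'location' entries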


def output_to_csv(data):
    # Open in append mode so all results go into the same timestamped file
    with open(f'{now_time}.csv', 'a', newline='', encoding='utf-8') as _f:
        writer = csv.writer(_f)
        writer.writerow(data)


def go_resources(url):
    """
    Parse swagger-resources to get the api-docs URLs.
    """
    try:
        _domain = urlparse(url)
        domain = _domain.scheme + '://' + _domain.netloc
        domain_path = _domain.path
        stripped_path = domain_path.strip('/')
        res = http_req(url)
        data = json.loads(res.text)
        for _i in data:
            location = _i.get('location')
            # If the resource URL has no extra path, join the location onto the domain directly
            target = domain + location
            if len(stripped_path) > 0:
                # Otherwise keep joining onto the existing path
                target = url.rsplit('/', 1)[0] + location
            go_api_docs(target)  # Call go_api_docs to scan every endpoint
    except Exception as e:
        logger.error(f'[-] {url} error info {e}')


def go_swagger_html(url):
    """
    Parse swagger-ui.html to get the api docs path.
    """
    response = http_req(url)
    response.raise_for_status()
    html_content = response.text
    # Look for the swagger-initializer.js script that points at the swagger.json endpoint
    initializer_pattern = r'<script\s+src=["\']([^"\']*swagger-initializer\.js[^"\']*)["\']'
    initializer_match = re.search(initializer_pattern, html_content)
    if initializer_match:
        js_file_path = initializer_match.group(1)
        if js_file_path.startswith('http'):
            js_file_url = js_file_path
        else:
            _domain = urlparse(url)
            domain = _domain.scheme + '://' + _domain.netloc
            domain_path = _domain.path
            stripped_path = domain_path.strip('/')
            if len(stripped_path) > 0:
                base_url = url.rsplit('/', 1)[0]
                js_file_url = f'{base_url}/{js_file_path.lstrip("/")}'
            else:
                js_file_url = f'{domain}/{js_file_path.lstrip("/")}'
        js_response = http_req(js_file_url)
        js_response.raise_for_status()
        js_content = js_response.text
        # Extract the value of defaultDefinitionUrl, i.e. the swagger.json path
        js_pattern = r'const\s+defaultDefinitionUrl\s*=\s*["\']([^"\']+)["\'];'
        js_match = re.search(js_pattern, js_content)
        if js_match:
            api_docs_path = js_match.group(1)
            go_api_docs(api_docs_path)
            return
    # If swagger-initializer.js or the defaultDefinitionUrl definition was not found, fall back to springfox.js
    springfox_pattern = r'<script\s+src=["\']([^"\']*springfox\.js[^"\']*)["\']'
    springfox_match = re.search(springfox_pattern, html_content)
    if not springfox_match:
        logger.debug('[-] Neither the swagger-initializer.js nor the springfox.js path was found')
        return
    # Get the relative or absolute path of springfox.js
    springfox_file_path = springfox_match.group(1)
    if springfox_file_path.startswith('http'):
        springfox_file_url = springfox_file_path
    else:
        base_url = url.rsplit('/', 1)[0]
        springfox_file_url = f'{base_url}/{springfox_file_path.lstrip("/")}'
    # Fetch the content of springfox.js
    springfox_response = http_req(springfox_file_url)
    springfox_response.raise_for_status()
    springfox_content = springfox_response.text
    if '/swagger-resources' in springfox_content:
        base_url = url.rsplit('/', 1)[0]
        resource_url = f'{base_url}/swagger-resources'
        go_resources(resource_url)
        return
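
# For reference, the defaultDefinitionUrl regex above expects the initializer script to contain a
# declaration roughly like the line below (illustrative snippet, not fetched from any real target):
#   const defaultDefinitionUrl = "/v3/api-docs";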


def go_api_docs(url):
    """
    Parse api-docs and start scanning.
    """
    try:
        domain = urlparse(url)
        domain = domain.scheme + '://' + domain.netloc
        res = http_req(url)
        if res.status_code != 200:
            logger.error(f'[-] {url} req status is {res.status_code}')
            return
        try:
            data = json.loads(res.text)
        except json.JSONDecodeError:
            # json.loads fails when html tags inside the document contain double quotes, so handle that case
            data = res.text.replace("'", '"')
            result = re.sub(r'<[^>]*>', lambda match: match.group(0).replace('"', "'"), data)
            data = json.loads(result, strict=False)
        if 'basePath' in data.keys():
            base_path = data['basePath']
        elif 'servers' in data.keys():
            base_path = data['servers'][0]['url']
        else:
            base_path = ''
        paths = data.get('paths', {})
        definitions = data.get('definitions', {})
        swagger_result = []
        for path, methods in paths.items():
            for method, details in methods.items():  # get / post / put / update / delete / head...
                if method.upper() not in ['GET', 'POST']:  # HTTP method whitelist
                    continue
                req_path = domain + base_path + path
                summary = details.get('summary', path)  # Summary info
                consumes = details.get('consumes', [])  # Request content type, e.g. application/json
                params = details.get('parameters', [])
                logger.debug(f'test on {summary} => {method} => {req_path}')
                param_info = []
                for param in params:
                    param_name = param.get('name')
                    param_in = param.get('in')
                    schema = param.get('schema')
                    # Check whether the parameter references a custom model or object
                    if schema and '$ref' in schema:
                        ref = schema['$ref'].split('/')[-1]
                        if ref in definitions:  # If declared in definitions, pull the parameters and their attributes from there
                            # Walk the properties of the referenced definition
                            for prop_name, prop_details in definitions[ref].get('properties', {}).items():
                                param_info.append({
                                    'name': prop_name,
                                    'in': param_in,
                                    'type': prop_details.get('type')
                                })
                    else:
                        param_type = param.get('type')
                        param_info.append({
                            'name': param_name,
                            'in': param_in,
                            'type': param_type
                        })
                # All required data has now been extracted from the swagger document
                swagger_result.append({
                    'summary': summary,
                    'req_path': req_path,
                    'method': method,
                    'consumes': consumes,
                    'parameters': param_info
                })
        black_list_status = [401, 404, 502, 503]  # Status code blacklist
        for item in swagger_result:
            summary = item['summary']
            req_path = item['req_path']
            method = item['method']
            consumes = item['consumes']
            parameters = item['parameters']
            # Build the request data to send
            filled_params, new_url = fill_parameters(parameters, req_path)
            headers = {}
            if 'application/json' in consumes:
                headers = {'Content-Type': 'application/json'}
            if method.lower() == 'get':
                response = http_req(new_url, method='get', params=filled_params)
            elif method.lower() == 'post':
                if 'body' in filled_params:
                    response = http_req(new_url, method='post', json=filled_params['body'], headers=headers)
                else:
                    response = http_req(new_url, method='post', params=filled_params, headers=headers)
            else:
                continue
            if response.status_code in black_list_status:
                logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
                continue
            if response.status_code == 200:
                logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
                write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
                output_to_csv(write_result)
    except Exception as e:
        logger.error(f'[-] {url} error info {e}')
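
# For reference, go_api_docs expects a Swagger 2 style document shaped roughly like this
# (illustrative fragment, not taken from any real target):
#   {"basePath": "/api",
#    "paths": {"/pets/{petId}": {"get": {"summary": "getPet",
#                                        "parameters": [{"name": "petId", "in": "path", "type": "integer"}]}}},
#    "definitions": {"Pet": {"properties": {"name": {"type": "string"}}}}}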


def run(target):
    """
    Run the scanner against a single target.
    """
    url_type = check_page(target)
    if url_type == 1:
        logger.success(f'working on {target} type: resource')
        go_resources(target)
    elif url_type == 2:
        logger.success(f'working on {target} type: api-docs')
        go_api_docs(target)
    else:
        logger.success(f'working on {target} type: html')
        go_swagger_html(target)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-u', '--url', dest='target_url', help='resource URL, api docs URL or swagger index page URL')
    parser.add_argument('-f', '--file', dest='url_file', help='batch testing from a file of URLs')
    args = parser.parse_args()
    logger.add('debug.log', format='{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}')
    if args.target_url:
        run(args.target_url)
    elif args.url_file:
        with open(args.url_file, 'r') as f:
            urls = [line.strip() for line in f.readlines()]
        for target_url in urls:
            print(target_url)
            run(target_url)
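
# Usage sketch (the script filename below is an assumption, adjust to whatever this file is saved as):
#   python3 swagger_hound.py -u http://target.example/swagger-ui.html
#   python3 swagger_hound.py -f urls.txt
# Endpoints that answer 200 are appended to <timestamp>.csv, and verbose output also goes to debug.log.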