From 422140c63f5af93973425c6a5de4b7f6f9139663 Mon Sep 17 00:00:00 2001
From: evilc0deooo
Date: Mon, 10 Mar 2025 18:21:57 +0800
Subject: [PATCH] Create swagger-hound.py

---
 swagger-hound.py | 378 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 378 insertions(+)
 create mode 100644 swagger-hound.py

diff --git a/swagger-hound.py b/swagger-hound.py
new file mode 100644
index 0000000..f292a8a
--- /dev/null
+++ b/swagger-hound.py
@@ -0,0 +1,378 @@
+# -*- coding: utf-8 -*-
+
+import json
+import sys
+import csv
+import argparse
+import requests
+import re
+import random
+import urllib3
+from datetime import datetime
+from urllib.parse import urlparse
+from loguru import logger
+
+# Disable insecure request warnings
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+logger.remove()
+handler_id = logger.add(sys.stderr, level='DEBUG')  # set the console log level
+
+now_time = datetime.now().strftime("%Y%m%d_%H%M%S")
+
+proxies = {
+    'https': 'http://127.0.0.1:7890',
+    'http': 'http://127.0.0.1:7890'
+}
+
+# Toggle the proxy
+SET_PROXY = False
+
+header_agents = [
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Code/1.96.2 Chrome/128.0.6613.186 Electron/32.2.6 Safari/537.36'
+]
+
+
+def http_req(url, method='get', **kwargs):
+    kwargs.setdefault('verify', False)
+    kwargs.setdefault('timeout', (10.1, 30.1))
+    kwargs.setdefault('allow_redirects', False)
+
+    headers = kwargs.get('headers', {})
+    headers.setdefault('User-Agent', random.choice(header_agents))
+    # Disallow caching so every request fetches the latest resource from the server
+    headers.setdefault('Cache-Control', 'max-age=0')
+    kwargs['headers'] = headers
+    if SET_PROXY:
+        kwargs['proxies'] = proxies
+
+    conn = getattr(requests, method)(url, **kwargs)
+    return conn
+
+
+def check_page(url):
+    """
+    Check what kind of page the target is.
+    """
+    res = http_req(url, method='get')
+    # Assumed detection markers, following the contract used in run():
+    # 1 = swagger-resources listing, 2 = api-docs JSON, anything else = swagger-ui HTML page
+    if 'swagger-resources' in url or res.text.strip().startswith('['):
+        return 1
+    if '"paths"' in res.text or '"swagger"' in res.text or '"openapi"' in res.text:
+        return 2
+    return 3
+
+
+def go_resources(url):
+    """
+    Parse the swagger-resources listing and scan every location it points to.
+    """
+    try:
+        res = http_req(url, method='get')
+        # Assumed: the listing is a JSON array of objects carrying a 'location' field
+        data = json.loads(res.text, strict=False)
+        for resource in data:
+            location = resource.get('location', '')
+            if len(location) > 0:
+                # If a location exists, append it to the original path
+                target = url.rsplit('/', 1)[0] + location
+                go_api_docs(target)  # call go_api_docs to scan every endpoint
+
+    except Exception as e:
+        logger.error(f'[-] {url} error info {e}')
+
+
+def go_swagger_html(url):
+    """
+    Parse swagger-ui.html to obtain the api-docs path.
+    """
+    response = http_req(url)
+    response.raise_for_status()
+    html_content = response.text
+    # Get the swagger.json endpoint from swagger-initializer.js
+    initializer_pattern = r'<script\s+src=["\']([^"\']*swagger-initializer\.js[^"\']*)["\']'  # assumed pattern
+    initializer_match = re.search(initializer_pattern, html_content)
+    u = urlparse(url)
+    domain = f'{u.scheme}://{u.netloc}'
+    if initializer_match:
+        js_file_path = initializer_match.group(1)
+        # If the page URL still carries a path, resolve the js file relative to it (condition assumed)
+        if len(u.path.rsplit('/', 1)[0]) > 0:
+            base_url = url.rsplit('/', 1)[0]
+            js_file_url = f'{base_url}/{js_file_path.lstrip("/")}'
+        else:
+            js_file_url = f'{domain}/{js_file_path.lstrip("/")}'
+
+        js_response = http_req(js_file_url)
+        js_response.raise_for_status()
+        js_content = js_response.text
+
+        # Regex for the value of defaultDefinitionUrl, i.e. the swagger.json path
+        js_pattern = r'const\s+defaultDefinitionUrl\s*=\s*["\']([^"\']+)["\'];'
+        js_match = re.search(js_pattern, js_content)
+        if js_match:
+            api_docs_path = js_match.group(1)
+            go_api_docs(api_docs_path)
+            return
+
+    # If swagger-initializer.js or the defaultDefinitionUrl definition is not found, try the springfox.js file instead
+    springfox_pattern = r'<script\s+src=["\']([^"\']*springfox\.js[^"\']*)["\']'  # assumed pattern
+    springfox_match = re.search(springfox_pattern, html_content)
+    if springfox_match:
+        # Springfox UIs normally serve the definition from /v2/api-docs (assumed default)
+        go_api_docs(f'{domain}/v2/api-docs')
+
+
+def go_api_docs(url):
+    """
+    Parse the api-docs document and probe every endpoint described in it.
+    """
+    try:
+        u = urlparse(url)
+        domain = f'{u.scheme}://{u.netloc}'
+        res = http_req(url, method='get')
+        data = res.text
+        # Replace double quotes inside embedded HTML-style tags with single quotes so json.loads() succeeds
+        result = re.sub(r'<[^>]*>', lambda match: match.group(0).replace('"', "'"), data)
+        data = json.loads(result, strict=False)
+
+        if 'basePath' in data.keys():
+            base_path = data['basePath']
+        elif 'servers' in data.keys():
+            base_path = data['servers'][0]['url']
+        else:
+            base_path = ''
+
+        paths = data.get('paths', {})
+        definitions = data.get('definitions', {})
+        swagger_result = []
+        for path, methods in paths.items():
+            for method, details in methods.items():  # get / post / put / patch / delete / head ...
+                if method.upper() not in ['GET', 'POST']:  # HTTP request method whitelist
+                    continue
+                req_path = domain + base_path + path
+                summary = details.get('summary', path)  # summary info
+                consumes = details.get('consumes', [])  # request content type, e.g. application/json
+                params = details.get('parameters', [])
+                logger.debug(f'test on {summary} => {method} => {req_path}')
+                param_info = []
+                for param in params:
+                    param_name = param.get('name')
+                    param_in = param.get('in')
+                    schema = param.get('schema')
+                    # Check whether the parameter references a custom model or object
+                    if schema and '$ref' in schema:
+                        ref = schema['$ref'].split('/')[-1]
+                        if ref in definitions:  # if the properties are declared in definitions, pull the parameter names and types from there
+                            # Walk the properties declared in the definition
+                            for prop_name, prop_details in definitions[ref].get('properties', {}).items():
+                                param_info.append({
+                                    'name': prop_name,
+                                    'in': param_in,
+                                    'type': prop_details.get('type')
+                                })
+                    else:
+                        param_type = param.get('type')
+                        param_info.append({
+                            'name': param_name,
+                            'in': param_in,
+                            'type': param_type
+                        })
+
+                # Everything needed has now been extracted from the swagger document
+                swagger_result.append({
+                    'summary': summary,
+                    'req_path': req_path,
+                    'method': method,
+                    'consumes': consumes,
+                    'parameters': param_info
+                })
+
+        black_list_status = [401, 404, 502, 503]  # status code blacklist
+        for item in swagger_result:
+            summary = item['summary']
+            req_path = item['req_path']
+            method = item['method']
+            consumes = item['consumes']
+            parameters = item['parameters']
+            # Build the request data to send
+            filled_params, new_url = fill_parameters(parameters, req_path)
+            headers = {}
+
+            if 'application/json' in consumes:
+                headers = {'Content-Type': 'application/json'}
+            if method.lower() == 'get':
+                response = http_req(new_url, method='get', params=filled_params)
+                if response.status_code in black_list_status:
+                    logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
+                    continue
+                if response.status_code == 200:
+                    logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
+                    write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
+                    output_to_csv(write_result)
+
+            elif method.lower() == 'post':
+                if 'body' in filled_params:
+                    response = http_req(new_url, method='post', json=filled_params['body'], headers=headers)
+                    if response.status_code in black_list_status:
+                        logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
+                        continue
+                    if response.status_code == 200:
+                        logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
+                        write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
+                        output_to_csv(write_result)
+
+                else:
+                    response = http_req(new_url, method='post', params=filled_params, headers=headers)
+                    if response.status_code in black_list_status:
+                        logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
+                        continue
+                    if response.status_code == 200:
+                        logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
+                        write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
+                        output_to_csv(write_result)
+
+    except Exception as e:
+        logger.error(f'[-] {url} error info {e}')
+
+
+def run(target):
+    """
+    Run the scan against a single target.
+    """
+    url_type = check_page(target)
+    if url_type == 1:
+        logger.success('working on {} type: source'.format(target))
+        go_resources(target)
+    elif url_type == 2:
+        logger.success('working on {} type: api-docs'.format(target))
+        go_api_docs(target)
+    else:
+        logger.success('working on {} type: html'.format(target))
+        go_swagger_html(target)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-u', '--url', dest='target_url', help='swagger-resources URL, api-docs URL, or swagger-ui page URL')
+    parser.add_argument('-f', '--file', dest='url_file', help='batch testing: file with one target URL per line')
+    args = parser.parse_args()
+
+    logger.add('debug.log', format='{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}')
+    if args.target_url:
+        run(args.target_url)
+    elif args.url_file:
+        with open(args.url_file, 'r') as f:
+            urls = [line.strip() for line in f.readlines()]
+            for target_url in urls:
+                print(target_url)
+                run(target_url)
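
The scan loop in go_api_docs() calls two helpers, fill_parameters() and output_to_csv(), whose definitions are not shown above. Below is a minimal sketch of what they might look like, assuming dummy placeholder values chosen per swagger parameter type and results appended to a timestamped CSV; the placeholder map and the CSV file name are assumptions, not the author's implementation.

# Hypothetical helper sketch, not part of the patch above.
import csv
from datetime import datetime

# Assumed placeholder value per swagger parameter type
PLACEHOLDERS = {'string': 'test', 'integer': 1, 'number': 1, 'boolean': True, 'array': [], 'object': {}}

CSV_PATH = f'swagger_hound_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'  # assumed output name


def fill_parameters(parameters, req_path):
    """Build dummy values for each parameter and substitute any {path} placeholders in the URL."""
    filled, body = {}, {}
    new_url = req_path
    for param in parameters:
        name = param.get('name')
        value = PLACEHOLDERS.get(param.get('type'), 'test')
        where = param.get('in')
        if where == 'path':
            new_url = new_url.replace('{%s}' % name, str(value))
        elif where == 'body':
            body[name] = value
        else:  # query, header, formData, ...
            filled[name] = value
    if body:
        filled['body'] = body  # go_api_docs() sends this dict as the JSON body for POST requests
    return filled, new_url


def output_to_csv(row):
    """Append one result row to the CSV report."""
    with open(CSV_PATH, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(row)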