#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Crawler module, responsible for crawling the pages of the target website.
"""
import re
import logging
import threading
import queue
from urllib.parse import urlparse, urljoin, parse_qsl

from bs4 import BeautifulSoup

logger = logging.getLogger('xss_scanner')


class Crawler:
    """Crawler class, responsible for crawling the pages of a website."""

    def __init__(self, http_client, max_depth=2, threads=5, exclude_pattern=None, include_pattern=None):
        """
        Initialize the crawler.

        Args:
            http_client: HTTP client used to fetch pages
            max_depth: maximum crawl depth
            threads: number of worker threads
            exclude_pattern: regex pattern for URLs to exclude
            include_pattern: regex pattern for URLs to include
        """
        self.http_client = http_client
        self.max_depth = max_depth
        self.threads = threads
        self.exclude_pattern = exclude_pattern
        self.include_pattern = include_pattern
        # URLs that have already been visited
        self.visited_urls = set()
        # Queue of URLs waiting to be visited
        self.url_queue = queue.Queue()
        # Crawl results
        self.results = []
        # Lock protecting shared state
        self.lock = threading.Lock()
        # Number of workers currently processing a page
        self.active_workers = 0
        # Domain of the base URL (set in crawl())
        self.base_domain = None
        # Per-page load status
        self.page_status = {}
        # Common static resource file extensions
        self.static_extensions = {
            '.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.ico', '.pdf',
            '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.tar',
            '.gz', '.mp3', '.mp4', '.avi', '.mov', '.flv', '.wmv'
        }

    def crawl(self, base_url):
        """
        Crawl the given website.

        Args:
            base_url: base URL to start from

        Returns:
            list: crawl results with per-page information
        """
        logger.info(f"Starting crawl of: {base_url}")
        # Reset state
        self.visited_urls = set()
        self.url_queue = queue.Queue()
        self.results = []
        self.page_status = {}
        self.active_workers = 0
        # Parse the base URL
        parsed_base_url = urlparse(base_url)
        self.base_domain = parsed_base_url.netloc
        # Seed the queue with the base URL
        self.url_queue.put((base_url, 0))  # (url, depth)
        # Create and start the worker threads
        workers = []
        for _ in range(self.threads):
            worker = threading.Thread(target=self._worker)
            workers.append(worker)
            worker.start()
        # Wait for all workers to finish
        for worker in workers:
            worker.join()
        logger.info(f"Crawl finished, {len(self.results)} pages discovered")
        return self.results

    def _worker(self):
        """Crawler worker thread."""
        while True:
            try:
                # Get the next URL and its depth, waiting briefly for new work
                url, depth = self.url_queue.get(timeout=1)
            except queue.Empty:
                # Stop only when no other worker is still processing a page,
                # since a busy worker may still enqueue new links
                with self.lock:
                    if self.active_workers == 0:
                        break
                continue
            with self.lock:
                self.active_workers += 1
            try:
                self._process_url(url, depth)
            except Exception as e:
                logger.error(f"Crawler thread error: {str(e)}")
            finally:
                with self.lock:
                    self.active_workers -= 1
                # Mark the task as done
                self.url_queue.task_done()

    def _process_url(self, url, depth):
        """
        Process a single URL.

        Args:
            url: URL to process
            depth: current crawl depth
        """
        # Skip if the maximum depth has been exceeded
        if depth > self.max_depth:
            return
        # Atomically skip already-visited URLs and mark this one as visited
        with self.lock:
            if url in self.visited_urls:
                return
            self.visited_urls.add(url)
        # Check whether the URL passes the crawl filters
        if not self._should_crawl(url):
            return
        logger.debug(f"Crawling page: {url}")
        # Send the HTTP request
        response = self.http_client.get(url)
        if not response or response.status_code != 200:
            logger.debug(f"Failed to fetch page: {url}, status code: {response.status_code if response else 'None'}")
            return
        # Only parse HTML responses
        content_type = response.headers.get('Content-Type', '')
        if 'text/html' not in content_type:
            logger.debug(f"Skipping non-HTML page: {url}, Content-Type: {content_type}")
            return
        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract page information
        page_info = self._extract_page_info(url, soup, response)
        # Add to the results
        with self.lock:
            self.results.append(page_info)
        # If the maximum depth has not been reached, extract links
        if depth < self.max_depth:
            links = self._extract_links(url, soup)
            for link in links:
                # Enqueue the link for crawling
                self.url_queue.put((link, depth + 1))

    def _extract_page_info(self, url, soup, response):
        """
        Extract information from a page.

        Args:
            url: page URL
            soup: BeautifulSoup object
            response: response object

        Returns:
            dict: page information
        """
        # Extract the page title
        title = soup.title.string.strip() if soup.title and soup.title.string else "No Title"
        # Extract forms
        forms = self._extract_forms(url, soup)
        # Extract URL parameters
        params = self._extract_params(url)
        # Extract JavaScript event handlers
        events = self._extract_js_events(soup)
        # Extract HTTP headers
        headers = dict(response.headers)
        return {
            'url': url,
            'title': title,
            'forms': forms,
            'params': params,
            'events': events,
            'headers': headers,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'cookies': dict(response.cookies)
        }

    def _extract_links(self, base_url, soup):
        """
        Extract links from a page.

        Args:
            base_url: base URL used to resolve relative links
            soup: BeautifulSoup object

        Returns:
            list: extracted links
        """
        links = []
        # Extract links from <a> tags
        for a in soup.find_all('a', href=True):
            link = a['href'].strip()
            if link:
                full_url = urljoin(base_url, link)
                links.append(full_url)
        # Extract links from <form> actions
        for form in soup.find_all('form', action=True):
            link = form['action'].strip()
            if link:
                full_url = urljoin(base_url, link)
                links.append(full_url)
        # Filter the links
        filtered_links = []
        for link in links:
            # Strip fragment anchors
            if '#' in link:
                link = link.split('#')[0]
            if not link:
                continue
            # Skip javascript: links
            if link.startswith('javascript:'):
                continue
            # Skip mailto: links
            if link.startswith('mailto:'):
                continue
            # Skip tel: links
            if link.startswith('tel:'):
                continue
            # Skip static resource files
            parsed_link = urlparse(link)
            path = parsed_link.path.lower()
            if any(path.endswith(ext) for ext in self.static_extensions):
                continue
            # Only crawl URLs on the same domain
            if parsed_link.netloc and parsed_link.netloc != self.base_domain:
                continue
            filtered_links.append(link)
        return list(set(filtered_links))

    def _extract_forms(self, base_url, soup):
        """
        Extract forms from a page.

        Args:
            base_url: base URL used to resolve the form action
            soup: BeautifulSoup object

        Returns:
            list: extracted forms
        """
        forms = []
        for form in soup.find_all('form'):
            form_info = {
                'id': form.get('id', ''),
                'name': form.get('name', ''),
                'method': form.get('method', 'get').upper(),
                'action': urljoin(base_url, form.get('action', '')),
                'fields': []
            }
            # Extract the form fields
            for field in form.find_all(['input', 'textarea', 'select']):
                # Skip hidden fields
                if field.name == 'input' and field.get('type') == 'hidden':
                    continue
                field_info = {
                    'name': field.get('name', ''),
                    'id': field.get('id', ''),
                    'type': field.get('type', 'text') if field.name == 'input' else field.name,
                    'value': field.get('value', '')
                }
                form_info['fields'].append(field_info)
            forms.append(form_info)
        return forms

    def _extract_params(self, url):
        """
        Extract query parameter names from a URL.

        Args:
            url: URL

        Returns:
            list: parameter names
        """
        parsed_url = urlparse(url)
        params = [p[0] for p in parse_qsl(parsed_url.query)]
        return params

    def _extract_js_events(self, soup):
        """
        Extract inline JavaScript event handlers.

        Args:
            soup: BeautifulSoup object

        Returns:
            list: extracted events
        """
        events = []
        # Common JavaScript event attributes
        js_events = [
            'onclick', 'onmouseover', 'onmouseout', 'onload', 'onerror', 'onsubmit',
            'onchange', 'onkeyup', 'onkeydown', 'onkeypress', 'onblur', 'onfocus',
            'onreset', 'onselect', 'onabort', 'ondblclick', 'onmousedown', 'onmouseup',
            'onmousemove', 'onunload'
        ]
        # Find all tags carrying JavaScript event attributes
        for tag in soup.find_all():
            for event in js_events:
                if tag.has_attr(event):
                    events.append({
                        'tag': tag.name,
                        'event': event,
                        'code': tag[event]
                    })
        return events

    def _should_crawl(self, url):
        """
        Check whether a URL should be crawled.

        Args:
            url: URL

        Returns:
            bool: True if the URL should be crawled
        """
        # Only crawl HTTP and HTTPS URLs
        if not url.startswith(('http://', 'https://')):
            return False
        # Check the exclude pattern
        if self.exclude_pattern and re.search(self.exclude_pattern, url):
            return False
        # Check the include pattern
        if self.include_pattern and not re.search(self.include_pattern, url):
            return False
        # Skip static resource files
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        if any(path.endswith(ext) for ext in self.static_extensions):
            return False
        return True
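

# A minimal usage sketch, not part of the original module: it assumes a
# requests-like HTTP client exposing get(url) and returning a response with
# status_code, headers, text, content, and cookies, which is the interface
# the Crawler uses above. SimpleHttpClient and the target URL are
# illustrative placeholders only.
if __name__ == '__main__':
    import requests

    class SimpleHttpClient:
        """Thin wrapper around requests.Session providing a get() method."""

        def __init__(self, timeout=10):
            self.session = requests.Session()
            self.timeout = timeout

        def get(self, url):
            try:
                return self.session.get(url, timeout=self.timeout)
            except requests.RequestException:
                # The crawler treats a falsy response as a failed fetch
                return None

    logging.basicConfig(level=logging.INFO)
    crawler = Crawler(SimpleHttpClient(), max_depth=1, threads=3)
    for page in crawler.crawl('http://example.com'):
        print(page['url'], page['title'], len(page['forms']), 'forms')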