#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Crawler module, responsible for crawling the pages of the target website.
"""

import re
import logging
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, urljoin, parse_qsl

from bs4 import BeautifulSoup

logger = logging.getLogger('xss_scanner')


class Crawler:
    """Crawler class, responsible for crawling the pages of a website."""

    def __init__(self, http_client, max_depth=2, threads=5, exclude_pattern=None, include_pattern=None):
        """
        Initialize the crawler.

        Args:
            http_client: HTTP client used to fetch pages
            max_depth: maximum crawl depth
            threads: number of worker threads
            exclude_pattern: regex pattern for URLs to exclude
            include_pattern: regex pattern URLs must match to be crawled
        """
        self.http_client = http_client
        self.max_depth = max_depth
        self.threads = threads
        self.exclude_pattern = exclude_pattern
        self.include_pattern = include_pattern

        # URLs that have already been visited
        self.visited_urls = set()

        # Queue of URLs waiting to be visited
        self.url_queue = queue.Queue()

        # Crawl results
        self.results = []

        # Lock protecting shared state (visited_urls, results, in_flight)
        self.lock = threading.Lock()

        # Thread pool (currently unused; the workers are plain threads)
        self.thread_pool = None

        # Number of URLs currently being processed by worker threads
        self.in_flight = 0

        # Load status of each page
        self.page_status = {}

        # Common static resource file extensions
        self.static_extensions = {
            '.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.ico', '.pdf',
            '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.tar',
            '.gz', '.mp3', '.mp4', '.avi', '.mov', '.flv', '.wmv'
        }

    def crawl(self, base_url):
        """
        Crawl the given website.

        Args:
            base_url: base URL to start from

        Returns:
            list: crawl results, one entry of page information per page
        """
        logger.info(f"Starting crawl of site: {base_url}")

        # Reset state
        self.visited_urls = set()
        self.url_queue = queue.Queue()
        self.results = []
        self.page_status = {}
        self.in_flight = 0

        # Parse the base URL
        parsed_base_url = urlparse(base_url)
        self.base_domain = parsed_base_url.netloc

        # Seed the queue with the base URL
        self.url_queue.put((base_url, 0))  # (url, depth)

        # Create the thread pool (currently unused; see _worker)
        self.thread_pool = ThreadPoolExecutor(max_workers=self.threads)

        # Create and start the crawler threads
        workers = []
        for _ in range(self.threads):
            worker = threading.Thread(target=self._worker)
            workers.append(worker)
            worker.start()

        # Wait for all threads to finish
        for worker in workers:
            worker.join()

        logger.info(f"Crawl finished, {len(self.results)} pages discovered")

        return self.results

    def _worker(self):
        """Crawler worker thread: pulls URLs off the queue until the crawl is done."""
        while True:
            try:
                # Get a URL and its depth; pair the get with the in-flight
                # counter under the lock so the exit check below is race-free
                with self.lock:
                    url, depth = self.url_queue.get(block=False)
                    self.in_flight += 1
            except queue.Empty:
                # Queue is empty: exit only when no worker is still processing
                # a page (it might enqueue new links); otherwise wait briefly
                with self.lock:
                    if self.url_queue.empty() and self.in_flight == 0:
                        break
                time.sleep(0.1)
                continue

            try:
                # Process the URL
                self._process_url(url, depth)
            except Exception as e:
                logger.error(f"Crawler thread error: {str(e)}")
            finally:
                with self.lock:
                    self.in_flight -= 1
                # Mark the task as done
                self.url_queue.task_done()

    def _process_url(self, url, depth):
        """
        Process a single URL.

        Args:
            url: URL to process
            depth: current crawl depth
        """
        # Skip if the maximum depth has been exceeded
        if depth > self.max_depth:
            return

        # Skip URLs that have already been visited; check and add atomically
        # so two threads cannot claim the same URL
        with self.lock:
            if url in self.visited_urls:
                return
            self.visited_urls.add(url)

        # Check whether the URL passes the crawl filters
        if not self._should_crawl(url):
            return

        logger.debug(f"Crawling page: {url}")

        # Send the HTTP request
        response = self.http_client.get(url)
        if not response or response.status_code != 200:
            logger.debug(f"Failed to fetch page: {url}, status code: {response.status_code if response else 'None'}")
            return

        # Skip non-HTML responses
        content_type = response.headers.get('Content-Type', '')
        if 'text/html' not in content_type:
            logger.debug(f"Skipping non-HTML page: {url}, Content-Type: {content_type}")
            return

        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract page information
        page_info = self._extract_page_info(url, soup, response)

        # Add it to the results
        with self.lock:
            self.results.append(page_info)

        # If the maximum depth has not been reached, extract and enqueue links
        if depth < self.max_depth:
            links = self._extract_links(url, soup)
            for link in links:
                self.url_queue.put((link, depth + 1))

    def _extract_page_info(self, url, soup, response):
        """
        Extract information about a page.

        Args:
            url: page URL
            soup: BeautifulSoup object
            response: response object

        Returns:
            dict: page information
        """
        # Extract the page title
        title = soup.title.string if soup.title and soup.title.string else "No Title"

        # Extract forms
        forms = self._extract_forms(url, soup)

        # Extract URL parameters
        params = self._extract_params(url)

        # Extract JavaScript event handlers
        events = self._extract_js_events(soup)

        # Extract HTTP headers
        headers = dict(response.headers)

        return {
            'url': url,
            'title': title,
            'forms': forms,
            'params': params,
            'events': events,
            'headers': headers,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'cookies': dict(response.cookies)
        }
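
    # Illustrative only (hypothetical values, not taken from a real scan):
    # each entry appended to self.results has roughly this shape:
    #
    #   {
    #       'url': 'http://example.com/search?q=test',
    #       'title': 'Search',
    #       'forms': [{'id': '', 'name': 'search', 'method': 'GET',
    #                  'action': 'http://example.com/search',
    #                  'fields': [{'name': 'q', 'id': 'q', 'type': 'text', 'value': ''}]}],
    #       'params': ['q'],
    #       'events': [{'tag': 'input', 'event': 'onfocus', 'code': 'this.select()'}],
    #       'headers': {'Content-Type': 'text/html; charset=utf-8'},
    #       'status_code': 200,
    #       'content_length': 1024,
    #       'cookies': {'session': 'abc123'}
    #   }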

    def _extract_links(self, base_url, soup):
        """
        Extract the links found in a page.

        Args:
            base_url: base URL used to resolve relative links
            soup: BeautifulSoup object

        Returns:
            list: extracted links
        """
        links = []

        # Extract <a> tag links
        for a in soup.find_all('a', href=True):
            link = a['href'].strip()
            if link:
                full_url = urljoin(base_url, link)
                links.append(full_url)

        # Extract <form> action links
        for form in soup.find_all('form', action=True):
            link = form['action'].strip()
            if link:
                full_url = urljoin(base_url, link)
                links.append(full_url)

        # Filter the links
        filtered_links = []
        for link in links:
            # Strip fragments; skip pure anchor links
            if '#' in link:
                link = link.split('#')[0]
                if not link:
                    continue

            # Skip JavaScript links
            if link.startswith('javascript:'):
                continue

            # Skip mail links
            if link.startswith('mailto:'):
                continue

            # Skip telephone links
            if link.startswith('tel:'):
                continue

            # Skip static resource files
            parsed_link = urlparse(link)
            path = parsed_link.path.lower()
            if any(path.endswith(ext) for ext in self.static_extensions):
                continue

            # Only crawl the same domain
            if parsed_link.netloc and parsed_link.netloc != self.base_domain:
                continue

            filtered_links.append(link)

        return list(set(filtered_links))

    def _extract_forms(self, base_url, soup):
        """
        Extract the forms found in a page.

        Args:
            base_url: base URL used to resolve the form action
            soup: BeautifulSoup object

        Returns:
            list: forms
        """
        forms = []

        for form in soup.find_all('form'):
            form_info = {
                'id': form.get('id', ''),
                'name': form.get('name', ''),
                'method': form.get('method', 'get').upper(),
                'action': urljoin(base_url, form.get('action', '')),
                'fields': []
            }

            # Extract the form fields
            for field in form.find_all(['input', 'textarea', 'select']):
                # Skip hidden fields
                if field.name == 'input' and field.get('type') == 'hidden':
                    continue

                field_info = {
                    'name': field.get('name', ''),
                    'id': field.get('id', ''),
                    'type': field.get('type', 'text') if field.name == 'input' else field.name,
                    'value': field.get('value', '')
                }

                form_info['fields'].append(field_info)

            forms.append(form_info)

        return forms

    def _extract_params(self, url):
        """
        Extract the query string parameter names from a URL.

        Args:
            url: URL

        Returns:
            list: parameter names
        """
        parsed_url = urlparse(url)
        params = [p[0] for p in parse_qsl(parsed_url.query)]
        return params

    def _extract_js_events(self, soup):
        """
        Extract inline JavaScript event handlers.

        Args:
            soup: BeautifulSoup object

        Returns:
            list: events
        """
        events = []

        # Common JavaScript event attributes
        js_events = [
            'onclick', 'onmouseover', 'onmouseout', 'onload', 'onerror', 'onsubmit',
            'onchange', 'onkeyup', 'onkeydown', 'onkeypress', 'onblur', 'onfocus',
            'onreset', 'onselect', 'onabort', 'ondblclick', 'onmousedown', 'onmouseup',
            'onmousemove', 'onunload'
        ]

        # Find every tag that carries a JavaScript event attribute
        for tag in soup.find_all():
            for event in js_events:
                if tag.has_attr(event):
                    events.append({
                        'tag': tag.name,
                        'event': event,
                        'code': tag[event]
                    })

        return events

    def _should_crawl(self, url):
        """
        Decide whether a URL should be crawled.

        Args:
            url: URL

        Returns:
            bool: True if the URL should be crawled
        """
        # Only HTTP and HTTPS URLs are crawled
        if not url.startswith(('http://', 'https://')):
            return False

        # Check the exclude pattern
        if self.exclude_pattern and re.search(self.exclude_pattern, url):
            return False

        # Check the include pattern
        if self.include_pattern and not re.search(self.include_pattern, url):
            return False

        # Skip static resource files
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        if any(path.endswith(ext) for ext in self.static_extensions):
            return False

        return True
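

# A minimal usage sketch, not part of the original module: the crawler only
# needs an http_client whose get(url) returns a requests-style response
# (status_code, headers, text, content, cookies) or None on failure.
# requests.Session is used here as a stand-in for the project's own HTTP
# client, and the target URL is a placeholder.
if __name__ == '__main__':
    import requests

    class _RequestsClient:
        """Thin adapter exposing the get(url) interface the crawler expects."""

        def __init__(self, timeout=10):
            self.session = requests.Session()
            self.timeout = timeout

        def get(self, url):
            try:
                return self.session.get(url, timeout=self.timeout)
            except requests.RequestException as e:
                logger.debug(f"Request failed: {url}, {e}")
                return None

    logging.basicConfig(level=logging.INFO)
    crawler = Crawler(_RequestsClient(), max_depth=1, threads=3)
    for page in crawler.crawl('http://example.com/'):
        print(page['url'], page['title'], len(page['forms']))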