# coding:utf-8 from __future__ import print_function from logging import fatal from typing import Text from bs4 import BeautifulSoup import sys import string import argparse from urllib.parse import urlparse import random import configparser import requests import time import os import re import base64 from lib.core.data import vulnoutput,unvulnoutput,unreachoutput,lock,AliveList,root_path from lib.core.log import logverifyerror,logvuln,logunvuln from requests.exceptions import ConnectTimeout from requests.exceptions import ConnectionError from requests.exceptions import HTTPError from requests.exceptions import TooManyRedirects def get_title(htmlcode): """ 获取网站title use: get_title(html_source) return: title """ soup = BeautifulSoup(htmlcode, 'html.parser') end = str(soup.title).find("") title = str(soup.title)[7:end] + "" if "<title>" in title: title = title.replace("<title>","",10000) if "|" in title: title = title.replace("|"," ",100000) if "\n" in title: title = title.replace("\n","",100000) if "\t" in title: title = title.replace("\t","",100000) if "\r" in title: title = title.replace("\r","",100000) if "`" in title: title = title.replace("`","",100000) if "," in title: title = title.replace(",","",100000) return title def url_handle(url): """ url处理函数 return: dict urldict """ if url.startswith("http"): p = urlparse(url) # pass else: url = "http://" + url p = urlparse(url) if ":" in p.netloc: if "443" in p.netloc.split(":")[1]: url = url.replace("http://","https://") p = urlparse(url) else: pass else: pass return p.scheme+"://"+p.netloc def get_random_ua(): with open("./data/user_agents.txt","r") as f: UAs = [i.strip() for i in f.readlines()] return random.choice(UAs) def random_str(length=10, chars=string.ascii_letters + string.digits): return ''.join(random.sample(chars, length)) def get_local_version(path): cp = configparser.ConfigParser() cp.read(path) return cp["info"]["version"] def get_ceye_dns(): """ 根据你配置的dns,生成一个随机dns子域名 该函数会检查info.ini配置是否正确,并通过ture/false来反馈给调用者 如果配置读取失败会立即停止本次扫描并将错误信息打印到控制台提示用户 Returns: [type]: [description] """ cp = configparser.ConfigParser() cp.read(root_path+"/info.ini") dns = cp["ceye"]["dns"] token = cp["ceye"]["token"] flag = random_str() if len(dns) == 0 or len(token) == 0: return False,"<title>ceye dns或token 配置为空,请重新确认" else: return True,flag + "." + dns def verify_ceye_dns(flag): """ 接通ceye的dns的api 内置sleep功能,必须在3秒以上,否则极其不稳定,考虑到批量扫描时对ceye的负载,我决定最少延时6秒 Args: flag ([type]): [description] Returns: [type]: [description] """ cp = configparser.ConfigParser() cp.read(root_path+"/info.ini") token = cp["ceye"]["token"] api_url = "http://api.ceye.io/v1/records?token="+token+"&type=dns" time.sleep(random.randint(6,10)) req = requests.get(api_url,timeout=10) if "User Not Exists" in req.text: return "ceye 配置错误,无法获取数据" elif flag.lower() in req.text.lower(): return True else: return False def get_latest_revision(): lv = None cp = configparser.ConfigParser() try: req = requests.get("https://raw.githubusercontent.com/openx-org/BLEN/master/info.ini",timeout = 15) cp.read_string(req.text) lv = cp["info"]["version"] except Exception: pass return lv def run(POC_Class,target,proxy=False,output=True,PocRemain="",Alive_mode = False): global vulnoutput,unvulnoutput,unreachoutput,AliveList while not target.empty(): try: target_url = target.get() rVerify = POC_Class(target_url,proxy) poc_name = rVerify._info["name"] # print(poc_name) vuln = rVerify._verify() if Alive_mode == True: if vuln[0] == True: AliveList.add(target_url) continue # print("yes") else: continue if vuln[0] == True: try: vulntitle=get_title(vuln[1]) except: vulntitle = "" lock.acquire() logvuln("POC 剩余 : %s ╭☞ Target 剩余 : %d Vuln %s | WebSite Title:%s "%(PocRemain,target.qsize(),target_url,vulntitle)) # logvuln("%s ╭☞ %d Vuln %s | WebSite Title:%s | Server Response : %s"%(PocRemain,target.qsize(),target_url,vulntitle,vuln[1])) if poc_name in vulnoutput: vulnoutput[poc_name].append(target_url+" || "+vulntitle) else: vulnoutput.update({poc_name:list()}) vulnoutput[poc_name].append(target_url+" || "+vulntitle) lock.release() else: lock.acquire() logunvuln("POC 剩余 : %s ╭☞ Target 剩余 : %d UnVuln %s "%(PocRemain,target.qsize(),target_url)) unvulnoutput.append(target_url) lock.release() except NotImplementedError as e : lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d The POC does not support virtualized depiction scan mode Error details:%s "%(PocRemain,target.qsize(),str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass except TimeoutError as e: lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d Connection timed out %s Error details:%s "%(PocRemain,target.qsize(),target,str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass except HTTPError as e: lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d HTTPError occurred %s Error details:%s "%(PocRemain,target.qsize(),target,str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass except ConnectionError as e: lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d Connection error %s Error details:%s "%(PocRemain,target.qsize(),target,str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass except TooManyRedirects as e: lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d The number of resets exceeds the limit, and the goal is discarded %s Error details:%s "%(PocRemain,target.qsize(),target,str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass except BaseException as e: lock.acquire() logverifyerror("POC 剩余 : %s ╭☞ %d unknown mistake %s Error details:%s "%(PocRemain,target.qsize(),target,str(e))) unreachoutput.append(target_url+" || Error details"+str(e)) lock.release() pass def GetCommand(): parser = argparse.ArgumentParser(description="Blen framewark of POC test", usage="python3 Blen.py -f [target_path] / -u [url] -s [poc_path] --thread 50\n\ python3 Blen.py --fofa-search") searchengine = parser.add_argument_group("SearchEngine") searchengine.add_argument("--fofa-search",action="store_true",help="Fofa Search模式,该参数不需要跟值") # target = parser.add_argument_group("TARGET") target = parser.add_mutually_exclusive_group() target.add_argument("-u","--url",type=str,help="指定单个url,该模式不支持多POC或全量POC (e.g. www.baidu.com)") target.add_argument("-f","--file",type=str,help="指定存有url列表的文件路径,该模式支持多POC或全量POC (e.g. /root/urllist.txt)") script = parser.add_argument_group("Script") script.add_argument("-s","--script",type=str,help="指定POC相对路径,格式见readme.md (e.g. -s poc/jellyfin/jellyfin_fileread_scan/poc.py OR -s all)") system = parser.add_argument_group("System") system.add_argument("--thread",default=10,type=int,help="指定线程数,默认为10,仅扫描时指定线程数有效") system.add_argument("--proxy",default=False,help="指定Http Proxy,仅扫描时指定线程数有效,Example:127.0.0.1:8080 OR http://127.0.0.1:8080") system.add_argument("--output",default=True,help="不用管,扫完了看output目录即可") system.add_argument("--sound",action="store_true",help="扫完了会有铃声提醒,虚拟机里不一定能用") system.add_argument("--version",action="store_true",help="显示本地oFx版本,并根据网络状态给出最新版本号") developer = parser.add_argument_group("Developer(POC开发者工具箱)") developer.add_argument("--add-poc",action="store_true",help="生成POC标准目录结构,该参数不需要跟值") developer.add_argument("--show-error",action = "store_true",help="single mode下展示详细报错信息") if len(sys.argv) == 1: sys.argv.append("-h") args=parser.parse_args() return args def Str2Base64(string): """ str => base64 """ return base64.b64encode(str.encode(string)).decode()