# 2021-12-21 16:37:18 +08:00
# coding:utf-8
from __future__ import print_function
from logging import fatal
from typing import Text
from bs4 import BeautifulSoup
import sys
import string
import argparse
from urllib . parse import urlparse
import random
import configparser
import requests
import time
import os
import re
import base64
from lib . core . data import vulnoutput , unvulnoutput , unreachoutput , lock , AliveList , root_path
from lib . core . log import logverifyerror , logvuln , logunvuln
from requests . exceptions import ConnectTimeout
from requests . exceptions import ConnectionError
from requests . exceptions import HTTPError
from requests . exceptions import TooManyRedirects
def get_title(htmlcode):
    """Return the page title of an HTML document, cleaned for one-line output.

    The markup is parsed with BeautifulSoup's ``html.parser`` backend and the
    inner markup of the first ``<title>`` element is returned with the
    characters ``|``, newline, tab, carriage return, backtick and comma
    removed.

    Args:
        htmlcode: HTML source (anything BeautifulSoup accepts as markup).

    Returns:
        str: The cleaned title, or ``""`` when the document has no
        ``<title>`` element.
    """
    soup = BeautifulSoup(htmlcode, "html.parser")
    title_tag = soup.title
    if title_tag is None:
        return ""

    # decode_contents() yields only the markup *inside* the tag, so a title
    # written as <title lang="en">...</title> no longer leaks its attributes
    # the way a fixed-width slice of str(tag) did.
    title = title_tag.decode_contents()

    # The previous implementation also dropped any literal "<title>" text
    # left in the result; keep doing so.
    title = title.replace("<title>", "")

    # Single pass removing every character that would break the one-line,
    # delimiter-separated records this title ends up in.
    return title.translate(str.maketrans("", "", "|\n\t\r`,"))
def url_handle(url):
    """Normalise a target into a ``scheme://netloc`` base URL.

    * Surrounding whitespace is ignored.
    * ``http://`` is prepended when the input carries no http/https scheme
      (the check is case-insensitive and requires the ``://`` part, so hosts
      such as ``httpbin.org`` are handled correctly).
    * A plain-http URL whose port number contains ``443`` (443, 8443, ...)
      is upgraded to ``https``.
    * Path, query and fragment are discarded.

    Args:
        url (str): Host, ``host:port`` or full URL.

    Returns:
        str: ``"<scheme>://<netloc>"``.
    """
    url = url.strip()
    if not url.lower().startswith(("http://", "https://")):
        url = "http://" + url
    parsed = urlparse(url)

    # ParseResult.port understands userinfo and IPv6 literals; it raises
    # ValueError for a non-numeric or out-of-range port, which simply means
    # "no usable port" here.
    try:
        port = parsed.port
    except ValueError:
        port = None

    scheme = parsed.scheme
    # Substring match on purpose: ports like 8443 were upgraded before too.
    if scheme == "http" and port is not None and "443" in str(port):
        scheme = "https"
    return scheme + "://" + parsed.netloc
def get_random_ua():
    """Return a random User-Agent string from ``./data/user_agents.txt``.

    The path is relative to the current working directory. Each line is
    stripped of surrounding whitespace and blank lines are skipped, so an
    empty User-Agent is never returned.

    Returns:
        str: One non-empty line of the file.

    Raises:
        OSError: If the file cannot be opened.
        IndexError: If the file contains no non-blank line.
    """
    with open("./data/user_agents.txt", "r") as f:
        stripped_lines = (line.strip() for line in f)
        user_agents = [ua for ua in stripped_lines if ua]
    return random.choice(user_agents)
def random_str(length=10, chars=string.ascii_letters + string.digits):
    """Return a random string of ``length`` characters drawn from ``chars``.

    Characters are drawn independently (with replacement), so ``length`` may
    exceed ``len(chars)`` and characters may repeat. Not suitable for
    secrets: it uses the ``random`` module.

    Args:
        length (int): Number of characters to produce; values <= 0 give "".
        chars (str): Alphabet to draw from (ASCII letters and digits by
            default).

    Returns:
        str: The generated string.
    """
    return "".join(random.choice(chars) for _ in range(length))
def get_local_version(path):
    """Read the ``version`` option of the ``[info]`` section from an INI file.

    Args:
        path: Location of the INI file.

    Returns:
        str: The configured version string.

    Raises:
        KeyError: If the file is unreadable or lacks ``[info]``/``version``
            (``ConfigParser.read`` silently skips files it cannot open).
    """
    parser = configparser.ConfigParser()
    parser.read(path)
    info_section = parser["info"]
    return info_section["version"]
def get_ceye_dns():
    """Build a random sub-domain under the ceye DNS domain configured in info.ini.

    Reads the ``dns`` and ``token`` options of the ``[ceye]`` section from
    ``<root_path>/info.ini``. A missing file, section, option or an empty
    value is reported through the return value instead of raising, so the
    caller can always branch on the first tuple element.

    Returns:
        tuple: ``(True, "<random label>.<dns>")`` when both options are set,
        otherwise ``(False, message)`` where ``message`` is a ``<title>``
        wrapped explanation.
    """
    cp = configparser.ConfigParser()
    cp.read(root_path + "/info.ini")
    # fallback="" folds "section/option missing" into the "empty" case.
    dns = cp.get("ceye", "dns", fallback="")
    token = cp.get("ceye", "token", fallback="")
    if not dns or not token:
        return False, "<title>ceye dns或token 配置为空,请重新确认</title>"
    # Only spend a random label once the configuration is known to be usable.
    return True, random_str() + "." + dns
def verify_ceye_dns(flag):
    """Ask the ceye API whether a DNS record containing ``flag`` was logged.

    The ceye token is read from the ``[ceye]`` section of
    ``<root_path>/info.ini``. Before querying, the call sleeps a random
    6-10 seconds; the original author notes that anything under 3 seconds
    is very unstable and that 6 seconds was chosen as the minimum to limit
    load on ceye during batch scans.

    Args:
        flag (str): Marker to look for (compared case-insensitively).

    Returns:
        ``True`` / ``False`` depending on whether ``flag`` occurs in the API
        response, or a ``<title>``-wrapped error string when the API answers
        "User Not Exists".
        NOTE(review): that error string is truthy, so callers need to
        compare against ``True`` explicitly rather than rely on truthiness.
    """
    config = configparser.ConfigParser()
    config.read(root_path + "/info.ini")
    api_url = (
        "http://api.ceye.io/v1/records?token="
        + config["ceye"]["token"]
        + "&type=dns"
    )

    # Give the DNS record time to show up on the ceye side.
    time.sleep(random.randint(6, 10))
    body = requests.get(api_url, timeout=10).text

    if "User Not Exists" in body:
        return "<title>ceye 配置错误,无法获取数据</title>"
    return flag.lower() in body.lower()
def get_latest_revision():
    """Fetch the latest published version string from the upstream info.ini.

    Downloads the raw ``info.ini`` from GitHub (15 s timeout) and returns the
    ``version`` option of its ``[info]`` section.

    Returns:
        str | None: The remote version, or ``None`` when the download or the
        parsing fails for any reason.
    """
    remote_ini = "https://raw.githubusercontent.com/openx-org/BLEN/master/info.ini"
    parser = configparser.ConfigParser()
    try:
        response = requests.get(remote_ini, timeout=15)
        parser.read_string(response.text)
        return parser["info"]["version"]
    except Exception:
        # Best effort by design: any failure just means "latest version unknown".
        return None
def _record_unreachable(target_url, message, error):
    """Log a failed verification and file the target under ``unreachoutput``."""
    lock.acquire()
    try:
        logverifyerror(message)
        unreachoutput.append(target_url + " || Error details " + str(error))
    finally:
        # Always release, even if logging itself raised, so other workers
        # are never left blocked on the lock.
        lock.release()


def run(POC_Class, target, proxy=False, output=True, PocRemain="", Alive_mode=False):
    """Worker loop: drain ``target`` and verify every URL with ``POC_Class``.

    For each URL taken from the queue a POC instance is created as
    ``POC_Class(url, proxy)`` and its ``_verify()`` result is inspected:
    element ``[0]`` is the verdict and element ``[1]`` is handed to
    ``get_title`` to label a hit. Results go to the shared module-level
    containers (``vulnoutput``, ``unvulnoutput``, ``unreachoutput``,
    ``AliveList``); logging and list/dict updates happen under ``lock``.

    Args:
        POC_Class: Callable producing an object with ``_info["name"]`` and
            ``_verify()``.
        target: Queue-like object providing ``get_nowait()`` and ``qsize()``
            (presumably a ``queue.Queue`` shared by several workers --
            confirm against the caller).
        proxy: Passed through to ``POC_Class`` unchanged.
        output: Unused here; kept for interface compatibility.
        PocRemain: Text shown in log lines as the remaining-POC indicator.
        Alive_mode: When ``True``, only record positive targets in
            ``AliveList`` and skip all logging/result bookkeeping.

    Returns:
        None. Any exception raised while handling one target is logged and
        the target is recorded in ``unreachoutput``; the loop then continues.
    """
    # Function-scope import keeps this block self-contained.
    from queue import Empty

    # Strict "== True" comparisons are kept on purpose: a truthy non-boolean
    # value (e.g. a non-empty string) must not count as a hit, as before.
    alive_only = Alive_mode == True  # noqa: E712

    while True:
        # Non-blocking get: with several workers, an "empty() then get()"
        # sequence can block forever when another worker takes the last item
        # between the two calls.
        try:
            target_url = target.get_nowait()
        except Empty:
            break

        try:
            rVerify = POC_Class(target_url, proxy)
            poc_name = rVerify._info["name"]
            vuln = rVerify._verify()
            is_vuln = vuln[0] == True  # noqa: E712

            if alive_only:
                if is_vuln:
                    AliveList.add(target_url)
                continue

            if is_vuln:
                try:
                    vulntitle = get_title(vuln[1])
                except Exception:
                    # The title is cosmetic; never lose a hit because of it.
                    vulntitle = ""
                lock.acquire()
                try:
                    logvuln("POC 剩余 : %s ╭☞ Target 剩余 : %d Vuln %s | WebSite Title: %s" % (PocRemain, target.qsize(), target_url, vulntitle))
                    vulnoutput.setdefault(poc_name, []).append(target_url + " || " + vulntitle)
                finally:
                    lock.release()
            else:
                lock.acquire()
                try:
                    logunvuln("POC 剩余 : %s ╭☞ Target 剩余 : %d UnVuln %s" % (PocRemain, target.qsize(), target_url))
                    unvulnoutput.append(target_url)
                finally:
                    lock.release()
        except NotImplementedError as e:
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d The POC does not support virtualized depiction scan mode Error details: %s" % (PocRemain, target.qsize(), str(e)),
                e,
            )
        except (TimeoutError, ConnectTimeout) as e:
            # ConnectTimeout (requests) is not a builtin TimeoutError; list it
            # here so connect timeouts are reported as timeouts.
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d Connection timed out %s Error details: %s" % (PocRemain, target.qsize(), target_url, str(e)),
                e,
            )
        except HTTPError as e:
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d HTTPError occurred %s Error details: %s" % (PocRemain, target.qsize(), target_url, str(e)),
                e,
            )
        except ConnectionError as e:
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d Connection error %s Error details: %s" % (PocRemain, target.qsize(), target_url, str(e)),
                e,
            )
        except TooManyRedirects as e:
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d The number of resets exceeds the limit, and the goal is discarded %s Error details: %s" % (PocRemain, target.qsize(), target_url, str(e)),
                e,
            )
        except BaseException as e:
            # NOTE(review): BaseException also swallows KeyboardInterrupt and
            # SystemExit. Kept as-is so a misbehaving POC cannot stop the
            # worker, but consider narrowing to Exception.
            _record_unreachable(
                target_url,
                "POC 剩余 : %s ╭☞ %d unknown mistake %s Error details: %s" % (PocRemain, target.qsize(), target_url, str(e)),
                e,
            )
def GetCommand():
    """Build the command-line parser and parse ``sys.argv``.

    When the program is started without any argument the help text is
    printed and the process exits (argparse raises ``SystemExit(0)``).
    ``sys.argv`` itself is never modified.

    Returns:
        argparse.Namespace: Parsed options (``fofa_search``, ``url``,
        ``file``, ``script``, ``thread``, ``proxy``, ``output``, ``sound``,
        ``version``, ``add_poc``, ``show_error``).
    """
    parser = argparse.ArgumentParser(
        description="Blen framewark of POC test",
        usage=(
            "python3 Blen.py -f [target_path] / -u [url] -s [poc_path] --thread 50 \n "
            "python3 Blen.py --fofa-search"
        ),
    )

    searchengine = parser.add_argument_group("SearchEngine")
    searchengine.add_argument("--fofa-search", action="store_true", help="Fofa Search模式, 该参数不需要跟值")

    # -u and -f are alternatives: a single URL or a file of URLs, never both.
    target = parser.add_mutually_exclusive_group()
    target.add_argument("-u", "--url", type=str, help="指定单个url, 该模式不支持多POC或全量POC (e.g. www.baidu.com)")
    target.add_argument("-f", "--file", type=str, help="指定存有url列表的文件路径, 该模式支持多POC或全量POC (e.g. /root/urllist.txt)")

    script = parser.add_argument_group("Script")
    script.add_argument("-s", "--script", type=str, help="指定POC相对路径, 格式见readme.md (e.g. -s poc/jellyfin/jellyfin_fileread_scan/poc.py OR -s all)")

    system = parser.add_argument_group("System")
    system.add_argument("--thread", default=10, type=int, help="指定线程数, 默认为10, 仅扫描时指定线程数有效")
    system.add_argument("--proxy", default=False, help="指定Http Proxy, 仅扫描时指定线程数有效, Example: 127.0.0.1:8080 OR http://127.0.0.1:8080")
    system.add_argument("--output", default=True, help="不用管, 扫完了看output目录即可")
    system.add_argument("--sound", action="store_true", help="扫完了会有铃声提醒,虚拟机里不一定能用")
    system.add_argument("--version", action="store_true", help="显示本地oFx版本, 并根据网络状态给出最新版本号")

    developer = parser.add_argument_group("Developer(POC开发者工具箱)")
    developer.add_argument("--add-poc", action="store_true", help="生成POC标准目录结构, 该参数不需要跟值")
    developer.add_argument("--show-error", action="store_true", help="single mode下展示详细报错信息")

    # No arguments at all: behave as if "-h" had been given, without
    # touching the process-wide sys.argv list.
    if len(sys.argv) == 1:
        argv = ["-h"]
    else:
        argv = sys.argv[1:]
    return parser.parse_args(argv)
def Str2Base64(string):
    """Return the Base64 encoding of ``string`` as text.

    Args:
        string (str | bytes | bytearray): Data to encode. Text is encoded as
            UTF-8 first; bytes-like input is encoded as-is.

    Returns:
        str: The Base64 representation.

    Raises:
        TypeError: If ``string`` is neither text nor bytes-like.
    """
    if isinstance(string, (bytes, bytearray)):
        raw = bytes(string)
    else:
        # str.encode(obj) (rather than obj.encode()) keeps raising TypeError
        # for non-string input, as before.
        raw = str.encode(string, "utf-8")
    return base64.b64encode(raw).decode("ascii")