Logging functionality (#193)

* Add files via upload

* Add files via upload

* Logging functionality (Resolves #146)

* Created customized logger and setup file

* Start replacing prints

* Add a custom StreamHandler that accepts '\r' as the line terminator, and update more prints

* Remove setup.py

* Add logger functionality to write red separator lines and unformatted records

* Allow setting the logging level when logging without format, and use the debug level instead of verboseOutput

* Replace utils logger function calls

* Fixes

* Import missing info color

* Move xsstrike.py imports so loggers initialize properly, and add a logger method to dump debug data as JSON

* Minor fix
Somdev Sangwan
2019-01-21 04:57:55 +05:30
committed by GitHub
parent 7907db26be
commit 98c6b347b4
15 changed files with 386 additions and 146 deletions
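
At a glance, the PR swaps raw print calls for per-module loggers built by core.log.setup_logger, adds three custom levels (RUN, GOOD, VULN) on top of the standard ones, and bolts on helpers for separator lines, unformatted records, and JSON debug dumps. A minimal sketch of the resulting API, based on the core/log.py added below (the URL is a hypothetical placeholder):

from core.log import setup_logger

logger = setup_logger(__name__)

logger.run('Crawling the target')       # RUN (22): progress/status records
logger.good('WAF Status: Offline')      # GOOD (25): positive results
logger.vuln('Vulnerable webpage: http://example.com/page')  # VULN (60)
logger.red_line()                       # bare red separator, no level prefix
logger.debug_json('Requester headers:', {'User-Agent': 'xsstrike'})
# debug_json records reach the console only with --console-log-level DEBUG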

View File

@@ -1,19 +1,22 @@
import concurrent.futures
import re
from core.colors import good, info, green, end
from core.colors import green, end
from core.config import blindParams, xsschecker, threadCount
from core.requester import requester
from core.log import setup_logger
logger = setup_logger(__name__)
def checky(param, paraNames, url, headers, GET, delay, timeout):
if param not in paraNames:
logger.debug('Checking param: {}'.format(param))
response = requester(url, {param: xsschecker},
headers, GET, delay, timeout).text
if '\'%s\'' % xsschecker in response or '"%s"' % xsschecker in response or ' %s ' % xsschecker in response:
paraNames[param] = ''
print('%s Valid parameter found : %s%s%s' %
(good, green, param, end))
logger.good('Valid parameter found: %s%s', green, param)
def arjun(url, GET, headers, delay, timeout):
@@ -26,8 +29,8 @@ def arjun(url, GET, headers, delay, timeout):
foundParam = match[1]
except UnicodeDecodeError:
continue
print('%s Heuristics found a potentially valid parameter: %s%s%s. Prioritizing it.' % (
good, green, foundParam, end))
logger.good('Heuristics found a potentially valid parameter: %s%s%s. Prioritizing it.' % (
green, foundParam, end))
if foundParam not in blindParams:
blindParams.insert(0, foundParam)
threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=threadCount)
@@ -35,5 +38,5 @@ def arjun(url, GET, headers, delay, timeout):
headers, GET, delay, timeout) for param in blindParams)
for i, _ in enumerate(concurrent.futures.as_completed(futures)):
if i + 1 == len(blindParams) or (i + 1) % threadCount == 0:
print('%s Progress: %i/%i' % (info, i + 1, len(blindParams)), end='\r')
logger.info('Progress: %i/%i\r' % (i + 1, len(blindParams)))
return paraNames
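
The trailing '\r' in the progress message replaces the old print(..., end='\r') idiom: the CustomStreamHandler added in core/log.py (further down) switches its terminator to '\r' whenever a record ends with one, so consecutive updates overwrite the same console line. A tiny sketch of the effect, assuming a logger built by setup_logger as above:

import time

for i in range(1, 6):
    logger.info('Progress: %i/%i\r' % (i, 5))  # each record overwrites the last
    time.sleep(0.2)
logger.no_format('')  # emit a plain newline so the next record starts clean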

View File

@@ -3,10 +3,13 @@ from random import randint
from time import sleep
from urllib.parse import unquote
from core.colors import end, red, green, yellow, bad, good, info
from core.colors import end, red, green, yellow
from core.config import fuzzes, xsschecker
from core.requester import requester
from core.utils import replaceValue, counter
from core.log import setup_logger
logger = setup_logger(__name__)
def fuzzer(url, params, headers, GET, delay, timeout, WAF, encoding):
@@ -21,23 +24,22 @@ def fuzzer(url, params, headers, GET, delay, timeout, WAF, encoding):
data = replaceValue(params, xsschecker, fuzz, copy.deepcopy)
response = requester(url, data, headers, GET, delay/2, timeout)
except:
print ('\n%s WAF is dropping suspicious requests.' % bad)
logger.error('WAF is dropping suspicious requests.')
if delay == 0:
print ('%s Delay has been increased to %s6%s seconds.' %
(info, green, end))
logger.info('Delay has been increased to %s6%s seconds.' % (green, end))
delay += 6
limit = (delay + 1) * 50
timer = -1
while timer < limit:
print ('\r%s Fuzzing will continue after %s%i%s seconds.\t\t' % (info, green, limit, end), end='\r')
logger.info('\rFuzzing will continue after %s%i%s seconds.\t\t\r' % (green, limit, end))
limit -= 1
sleep(1)
try:
requester(url, params, headers, GET, 0, 10)
print ('\n%s Pheww! Looks like sleeping for %s%i%s seconds worked!' % (
good, green, (delay + 1) * 2), end)
logger.good('Pheww! Looks like sleeping for %s%i%s seconds worked!' % (
green, ((delay + 1) * 2), end))
except:
print ('\n%s Looks like WAF has blocked our IP Address. Sorry!' % bad)
logger.error('\nLooks like WAF has blocked our IP Address. Sorry!')
break
if encoding:
fuzz = encoding(fuzz)
@@ -48,4 +50,4 @@ def fuzzer(url, params, headers, GET, delay, timeout, WAF, encoding):
result = ('%s[blocked] %s' % (red, end))
else: # if the fuzz string was not reflected in the response completely
result = ('%s[filtered]%s' % (yellow, end))
print ('%s %s' % (result, fuzz))
logger.info('%s %s' % (result, fuzz))

core/log.py (new file, 194 lines)
View File

@@ -0,0 +1,194 @@
import logging
import sys

from .colors import *

__all__ = ['setup_logger', 'console_log_level', 'file_log_level', 'log_file']

console_log_level = 'INFO'
file_log_level = None
log_file = 'xsstrike.log'

"""
Default Logging Levels
CRITICAL = 50
ERROR = 40
WARNING = 30
INFO = 20
DEBUG = 10
"""

VULN_LEVEL_NUM = 60
RUN_LEVEL_NUM = 22
GOOD_LEVEL_NUM = 25

logging.addLevelName(VULN_LEVEL_NUM, 'VULN')
logging.addLevelName(RUN_LEVEL_NUM, 'RUN')
logging.addLevelName(GOOD_LEVEL_NUM, 'GOOD')


def _vuln(self, msg, *args, **kwargs):
    if self.isEnabledFor(VULN_LEVEL_NUM):
        self._log(VULN_LEVEL_NUM, msg, args, **kwargs)


def _run(self, msg, *args, **kwargs):
    if self.isEnabledFor(RUN_LEVEL_NUM):
        self._log(RUN_LEVEL_NUM, msg, args, **kwargs)


def _good(self, msg, *args, **kwargs):
    if self.isEnabledFor(GOOD_LEVEL_NUM):
        self._log(GOOD_LEVEL_NUM, msg, args, **kwargs)


logging.Logger.vuln = _vuln
logging.Logger.run = _run
logging.Logger.good = _good

log_config = {
    'DEBUG': {
        'value': logging.DEBUG,
        'prefix': '{}[*]{}'.format(yellow, end),
    },
    'INFO': {
        'value': logging.INFO,
        'prefix': info,
    },
    'RUN': {
        'value': RUN_LEVEL_NUM,
        'prefix': run,
    },
    'GOOD': {
        'value': GOOD_LEVEL_NUM,
        'prefix': good,
    },
    'WARNING': {
        'value': logging.WARNING,
        'prefix': '{}[!!]{}'.format(yellow, end),
    },
    'ERROR': {
        'value': logging.ERROR,
        'prefix': bad,
    },
    'CRITICAL': {
        'value': logging.CRITICAL,
        'prefix': '{}[--]{}'.format(red, end),
    },
    'VULN': {
        'value': VULN_LEVEL_NUM,
        'prefix': '{}[++]{}'.format(green, red),
    }
}


class CustomFormatter(logging.Formatter):
    def format(self, record):
        msg = super().format(record)
        if record.levelname in log_config.keys():
            msg = '%s %s %s' % (log_config[record.levelname]['prefix'], msg, end)
        return msg


class CustomStreamHandler(logging.StreamHandler):
    default_terminator = '\n'

    def emit(self, record):
        """
        Overrides the emit method to temporarily change the terminator
        when the last character of the log record is '\r'.
        :param record:
        :return:
        """
        if record.msg.endswith('\r'):
            self.terminator = '\r'
            super().emit(record)
            self.terminator = self.default_terminator
        else:
            super().emit(record)


def _switch_to_no_format_loggers(self):
    self.removeHandler(self.console_handler)
    self.addHandler(self.no_format_console_handler)
    if hasattr(self, 'file_handler') and hasattr(self, 'no_format_file_handler'):
        self.removeHandler(self.file_handler)
        self.addHandler(self.no_format_file_handler)


def _switch_to_default_loggers(self):
    self.removeHandler(self.no_format_console_handler)
    self.addHandler(self.console_handler)
    if hasattr(self, 'file_handler') and hasattr(self, 'no_format_file_handler'):
        self.removeHandler(self.no_format_file_handler)
        self.addHandler(self.file_handler)


def _get_level_and_log(self, msg, level):
    if level.upper() in log_config.keys():
        log_method = getattr(self, level.lower())
        log_method(msg)
    else:
        self.info(msg)


def log_red_line(self, amount=60, level='INFO'):
    _switch_to_no_format_loggers(self)
    _get_level_and_log(self, red + ('-' * amount) + end, level)
    _switch_to_default_loggers(self)


def log_no_format(self, msg='', level='INFO'):
    _switch_to_no_format_loggers(self)
    _get_level_and_log(self, msg, level)
    _switch_to_default_loggers(self)


def log_debug_json(self, msg='', data={}):
    if self.isEnabledFor(logging.DEBUG):
        if isinstance(data, dict):
            import json
            try:
                self.debug('{} {}'.format(msg, json.dumps(data, indent=2)))
            except TypeError:
                self.debug('{} {}'.format(msg, data))
        else:
            self.debug('{} {}'.format(msg, data))


def setup_logger(name='xsstrike'):
    from types import MethodType
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    console_handler = CustomStreamHandler(sys.stdout)
    console_handler.setLevel(log_config[console_log_level]['value'])
    console_handler.setFormatter(CustomFormatter('%(message)s'))
    logger.addHandler(console_handler)
    # Set up a blank handler to use temporarily when logging without format
    no_format_console_handler = CustomStreamHandler(sys.stdout)
    no_format_console_handler.setLevel(log_config[console_log_level]['value'])
    no_format_console_handler.setFormatter(logging.Formatter(fmt=''))
    # Store current handlers
    logger.console_handler = console_handler
    logger.no_format_console_handler = no_format_console_handler

    if file_log_level:
        detailed_formatter = logging.Formatter('%(asctime)s %(name)s - %(levelname)s - %(message)s')
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(log_config[file_log_level]['value'])
        file_handler.setFormatter(detailed_formatter)
        logger.addHandler(file_handler)
        # Set up a blank handler to use temporarily when logging without format
        no_format_file_handler = logging.FileHandler(log_file)
        no_format_file_handler.setLevel(log_config[file_log_level]['value'])
        no_format_file_handler.setFormatter(logging.Formatter(fmt=''))
        # Store file handlers
        logger.file_handler = file_handler
        logger.no_format_file_handler = no_format_file_handler

    # Create logger method to only log a red line
    logger.red_line = MethodType(log_red_line, logger)
    # Create logger method to log without format
    logger.no_format = MethodType(log_no_format, logger)
    # Create logger method to convert data to JSON and log it at debug level
    logger.debug_json = MethodType(log_debug_json, logger)
    return logger
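
In short, setup_logger returns an ordinary logging.Logger wired with the console (and optional file) handlers above, plus the three bound helpers. A short usage sketch, assuming the default console level of INFO:

logger = setup_logger('demo')

logger.info('a formatted record')            # colored prefix + message + reset
logger.red_line(amount=20)                   # red '-' * 20 via the blank formatter
logger.no_format('raw text', level='good')   # skips CustomFormatter entirely
logger.debug_json('payload:', {'q': 1})      # record is created (logger level is
                                             # DEBUG) but the INFO console handler
                                             # drops it unless DEBUG is chosen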

View File

@@ -2,10 +2,13 @@ import concurrent.futures
from re import findall
from urllib.parse import urlparse
from core.colors import run
from core.utils import getUrl, getParams
from core.requester import requester
from core.zetanize import zetanize
from core.log import setup_logger
logger = setup_logger(__name__)
def photon(seedUrl, headers, level, threadCount, delay, timeout):
@@ -24,7 +27,7 @@ def photon(seedUrl, headers, level, threadCount, delay, timeout):
printableTarget = printableTarget[-40:]
else:
printableTarget = (printableTarget + (' ' * (40 - len(printableTarget))))
print ('%s Parsing %s' % (run, printableTarget), end='\r')
logger.run('Parsing %s\r' % printableTarget)
url = getUrl(target, True)
params = getParams(target, '', True)
if '=' in target: # if there's a = in the url, there should be GET parameters

View File

@@ -2,8 +2,10 @@ import os
import tempfile
from core.config import defaultEditor
from core.colors import info, white, bad, yellow
from core.colors import white, yellow
from core.log import setup_logger
logger = setup_logger(__name__)
def prompt(default=None):
@@ -21,10 +23,10 @@ def prompt(default=None):
try:
os.execvp(editor, [editor, tmpfile.name])
except FileNotFoundError:
print('%s You neither have a default $EDITOR \
value defined nor the \'nano\' text editor' % bad)
print('%s Execute %s`export EDITOR=/path/to/your/editor` \
%sthen run XSStrike again.\n\n' % (info, yellow, white))
logger.error('You neither have a default $EDITOR \
value defined nor the \'nano\' text editor')
logger.info('Execute %s`export EDITOR=/path/to/your/editor` \
%sthen run XSStrike again.\n\n' % (yellow, white))
exit(1)
else:
os.waitpid(child_pid, 0) # wait till the editor gets closed

View File

@@ -4,10 +4,12 @@ import time
from urllib3.exceptions import ProtocolError
import warnings
from core.colors import bad, info
import core.config
from core.config import globalVariables
from core.utils import converter
from core.log import setup_logger
logger = setup_logger(__name__)
warnings.filterwarnings('ignore') # Disable SSL related warnings
@@ -27,6 +29,10 @@ def requester(url, data, headers, GET, delay, timeout):
headers['User-Agent'] = random.choice(user_agents)
elif headers['User-Agent'] == '$':
headers['User-Agent'] = random.choice(user_agents)
logger.debug('Requester url: {}'.format(url))
logger.debug('Requester GET: {}'.format(GET))
logger.debug_json('Requester data:', data)
logger.debug_json('Requester headers:', headers)
try:
if GET:
response = requests.get(url, params=data, headers=headers,
@@ -36,6 +42,6 @@ def requester(url, data, headers, GET, delay, timeout):
timeout=timeout, verify=False, proxies=core.config.proxies)
return response
except ProtocolError:
print ('%s WAF is dropping suspicious requests.')
print ('%s Scanning will continue after 10 minutes.')
logger.warning('WAF is dropping suspicious requests.')
logger.warning('Scanning will continue after 10 minutes.')
time.sleep(600)

View File

@@ -4,10 +4,13 @@ from requests import get
from core.config import changes
from core.colors import run, que, good, info, end, green
from core.log import setup_logger
logger = setup_logger(__name__)
def updater():
print('%s Checking for updates' % run)
logger.run('Checking for updates')
latestCommit = get(
'https://raw.githubusercontent.com/s0md3v/XSStrike/master/core/config.py').text
@@ -15,22 +18,22 @@ def updater():
changelog = re.search(r"changes = '''(.*?)'''", latestCommit)
changelog = changelog.group(1).split(
';') # splitting the changes to form a list
print('%s A new version of XSStrike is available.' % good)
print('%s Changes:' % info)
for change in changelog: # print changes
print('%s>%s %s' % (green, end, change))
logger.good('A new version of XSStrike is available.')
changes_str = 'Changes:\n'
for change in changelog: # prepare changes to print
changes_str += '%s>%s %s\n' % (green, end, change)
logger.info(changes_str)
currentPath = os.getcwd().split('/') # if you know it, you know it
folder = currentPath[-1] # current directory name
path = '/'.join(currentPath) # current directory path
choice = input('%s Would you like to update? [Y/n] ' % que).lower()
if choice != 'n':
print('%s Updating XSStrike' % run)
logger.run('Updating XSStrike')
os.system(
'git clone --quiet https://github.com/s0md3v/XSStrike %s' % (folder))
os.system('cp -r %s/%s/* %s && rm -r %s/%s/ 2>/dev/null' %
(path, folder, path, path, folder))
print('%s Update successful!' % good)
logger.good('Update successful!')
else:
print('%s XSStrike is up to date!' % good)
logger.good('XSStrike is up to date!')

View File

@@ -4,9 +4,9 @@ import re
from urllib.parse import urlparse
import core.config
from core.colors import info, red, end
from core.config import xsschecker
def converter(data, url=False):
if 'str' in str(type(data)):
if url:
@@ -32,16 +32,6 @@ def counter(string):
return len(string)
def verboseOutput(data, name, verbose):
if core.config.globalVariables['verbose']:
if str(type(data)) == '<class \'dict\'>':
try:
print (json.dumps(data, indent=2))
except TypeError:
print (data)
print (data)
def closest(number, numbers):
difference = [abs(list(numbers.values())[0]), {}]
for index, i in numbers.items():

View File

@@ -3,6 +3,9 @@ import re
import sys
from core.requester import requester
from core.log import setup_logger
logger = setup_logger(__name__)
def wafDetector(url, params, headers, GET, delay, timeout):
@@ -16,6 +19,9 @@ def wafDetector(url, params, headers, GET, delay, timeout):
page = response.text
code = str(response.status_code)
headers = str(response.headers)
logger.debug('Waf Detector code: {}'.format(code))
logger.debug_json('Waf Detector headers:', response.headers)
if int(code) >= 400:
bestMatch = [0, None]
for wafName, wafSignature in wafSignatures.items():

modes/__init__.py (new file, 1 line)
View File

@@ -0,0 +1 @@

View File

@@ -1,26 +1,31 @@
import copy
from urllib.parse import urlparse, unquote
from core.colors import run, good, bad, green, end
from core.colors import good, green, end
from core.requester import requester
from core.utils import getUrl, getParams, verboseOutput
from core.utils import getUrl, getParams
from core.log import setup_logger
logger = setup_logger(__name__)
def bruteforcer(target, paramData, payloadList, verbose, encoding, headers, delay, timeout):
GET, POST = (False, True) if paramData else (True, False)
host = urlparse(target).netloc # Extracts host out of the url
verboseOutput(host, 'host', verbose)
logger.debug('Parsed host to bruteforce: {}'.format(host))
url = getUrl(target, GET)
verboseOutput(url, 'url', verbose)
logger.debug('Parsed url to bruteforce: {}'.format(url))
params = getParams(target, paramData, GET)
logger.debug_json('Bruteforcer params:', params)
if not params:
print('%s No parameters to test.' % bad)
logger.error('No parameters to test.')
quit()
verboseOutput(params, 'params', verbose)
for paramName in params.keys():
progress = 1
paramsCopy = copy.deepcopy(params)
for payload in payloadList:
print ('%s Bruteforcing %s[%s%s%s]%s: %i/%i' % (run, green, end, paramName, green, end, progress, len(payloadList)), end='\r')
logger.run('Bruteforcing %s[%s%s%s]%s: %i/%i\r' %
(green, end, paramName, green, end, progress, len(payloadList)))
if encoding:
payload = encoding(unquote(payload))
paramsCopy[paramName] = payload
@@ -29,6 +34,6 @@ def bruteforcer(target, paramData, payloadList, verbose, encoding, headers, dela
if encoding:
payload = encoding(payload)
if payload in response:
print('%s %s' % (good, payload))
logger.info('%s %s' % (good, payload))
progress += 1
print ()
logger.no_format('')

View File

@@ -8,18 +8,21 @@ from core.filterChecker import filterChecker
from core.generator import generator
from core.htmlParser import htmlParser
from core.requester import requester
from core.log import setup_logger
def crawl(scheme, host, main_url, form, domURL, verbose, blindXSS, blindPayload, headers, delay, timeout, skipDOM, encoding):
logger = setup_logger(__name__)
def crawl(scheme, host, main_url, form, domURL, blindXSS, blindPayload, headers, delay, timeout, skipDOM, encoding):
if domURL and not skipDOM:
response = requester(domURL, {}, headers, True, delay, timeout).text
highlighted = dom(response)
if highlighted:
print('%s Potentially vulnerable objects found at %s' %
(good, domURL))
print(red + ('-' * 60) + end)
logger.good('Potentially vulnerable objects found at %s' % domURL)
logger.red_line(level='good')
for line in highlighted:
print(line)
print(red + ('-' * 60) + end)
logger.no_format(line, level='good')
logger.red_line(level='good')
if form:
for each in form.values():
url = each['action']
@@ -53,10 +56,10 @@ def crawl(scheme, host, main_url, form, domURL, verbose, blindXSS, blindPayload,
for confidence, vects in vectors.items():
try:
payload = list(vects)[0]
print('%s Vulnerable webpage: %s%s%s' %
(good, green, url, end))
print('%s Vector for %s%s%s: %s' %
(good, green, paramName, end, payload))
logger.vuln('Vulnerable webpage: %s%s%s' %
(green, url, end))
logger.vuln('Vector for %s%s%s: %s' %
(green, paramName, end, payload))
break
except IndexError:
pass

View File

@@ -5,7 +5,7 @@ from urllib.parse import urlparse, quote, unquote
from core.arjun import arjun
from core.browserEngine import browserEngine
from core.checker import checker
from core.colors import good, bad, end, info, green, run, red, que
from core.colors import good, bad, end, info, green, red, que
import core.config
from core.config import xsschecker, minEfficiency
from core.dom import dom
@@ -13,10 +13,14 @@ from core.filterChecker import filterChecker
from core.generator import generator
from core.htmlParser import htmlParser
from core.requester import requester
from core.utils import getUrl, getParams, verboseOutput
from core.utils import getUrl, getParams
from core.wafDetector import wafDetector
from core.log import setup_logger
def scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM, find, skip):
logger = setup_logger(__name__)
def scan(target, paramData, encoding, headers, delay, timeout, skipDOM, find, skip):
GET, POST = (False, True) if paramData else (True, False)
# If the user hasn't supplied the root url with http(s), we will handle it
if not target.startswith('http'):
@@ -26,37 +30,38 @@ def scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM,
target = 'https://' + target
except:
target = 'http://' + target
logger.debug('Scan target: {}'.format(target))
response = requester(target, {}, headers, GET, delay, timeout).text
if not skipDOM:
print('%s Checking for DOM vulnerabilities' % run)
logger.run('Checking for DOM vulnerabilities')
highlighted = dom(response)
if highlighted:
print('%s Potentially vulnerable objects found' % good)
print(red + ('-' * 60) + end)
logger.good('Potentially vulnerable objects found')
logger.red_line(level='good')
for line in highlighted:
print(line)
print(red + ('-' * 60) + end)
logger.no_format(line, level='good')
logger.red_line(level='good')
host = urlparse(target).netloc # Extracts host out of the url
verboseOutput(host, 'host', verbose)
logger.debug('Host to scan: {}'.format(host))
url = getUrl(target, GET)
verboseOutput(url, 'url', verbose)
logger.debug('Url to scan: {}'.format(url))
params = getParams(target, paramData, GET)
verboseOutput(params, 'params', verbose)
logger.debug_json('Scan parameters:', params)
if find:
params = arjun(url, GET, headers, delay, timeout)
if not params:
print('%s No parameters to test.' % bad)
logger.error('No parameters to test.')
quit()
WAF = wafDetector(
url, {list(params.keys())[0]: xsschecker}, headers, GET, delay, timeout)
if WAF:
print('%s WAF detected: %s%s%s' % (bad, green, WAF, end))
logger.error('WAF detected: %s%s%s' % (green, WAF, end))
else:
print('%s WAF Status: %sOffline%s' % (good, green, end))
logger.good('WAF Status: %sOffline%s' % (green, end))
for paramName in params.keys():
paramsCopy = copy.deepcopy(params)
print('%s Testing parameter: %s' % (info, paramName))
logger.info('Testing parameter: %s' % paramName)
if encoding:
paramsCopy[paramName] = encoding(xsschecker)
else:
@@ -64,36 +69,36 @@ def scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM,
response = requester(url, paramsCopy, headers, GET, delay, timeout)
parsedResponse = htmlParser(response, encoding)
occurences = parsedResponse[0]
verboseOutput(occurences, 'occurences', verbose)
logger.debug('Scan occurences: {}'.format(occurences))
positions = parsedResponse[1]
verboseOutput(positions, 'positions', verbose)
logger.debug('Scan positions: {}'.format(positions))
if not occurences:
print('%s No reflection found' % bad)
logger.error('No reflection found')
continue
else:
print('%s Reflections found: %s' % (info, len(occurences)))
print('%s Analysing reflections' % run)
logger.info('Reflections found: %i' % len(occurences))
logger.run('Analysing reflections')
efficiencies = filterChecker(
url, paramsCopy, headers, GET, delay, occurences, timeout, encoding)
verboseOutput(efficiencies, 'efficiencies', verbose)
print('%s Generating payloads' % run)
logger.debug('Scan efficiencies: {}'.format(efficiencies))
logger.run('Generating payloads')
vectors = generator(occurences, response.text)
verboseOutput(vectors, 'vectors', verbose)
total = 0
for v in vectors.values():
total += len(v)
if total == 0:
print('%s No vectors were crafted' % bad)
logger.error('No vectors were crafted.')
continue
print('%s Payloads generated: %i' % (info, total))
logger.info('Payloads generated: %i' % total)
progress = 0
for confidence, vects in vectors.items():
for vect in vects:
if core.config.globalVariables['path']:
vect = vect.replace('/', '%2F')
printVector = vect
loggerVector = vect
progress += 1
print ('%s Progress: %i/%i' % (run, progress, total), end='\r')
logger.run('Progress: %i/%i\r' % (progress, total))
if confidence == 10:
if not GET:
vect = unquote(vect)
@@ -104,20 +109,20 @@ def scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM,
efficiencies.append(0)
bestEfficiency = max(efficiencies)
if bestEfficiency == 100 or (vect[0] == '\\' and bestEfficiency >= 95):
print(('%s-%s' % (red, end)) * 60)
print('%s Payload: %s' % (good, printVector))
print('%s Efficiency: %i' % (info, bestEfficiency))
print('%s Confidence: %i' % (info, confidence))
logger.red_line()
logger.good('Payload: %s' % loggerVector)
logger.info('Efficiency: %i' % bestEfficiency)
logger.info('Confidence: %i' % confidence)
if not skip:
choice = input(
'%s Would you like to continue scanning? [y/N] ' % que).lower()
if choice != 'y':
quit()
elif bestEfficiency > minEfficiency:
print(('%s-%s' % (red, end)) * 60)
print('%s Payload: %s' % (good, printVector))
print('%s Efficiency: %i' % (info, bestEfficiency))
print('%s Confidence: %i' % (info, confidence))
logger.red_line()
logger.good('Payload: %s' % loggerVector)
logger.info('Efficiency: %i' % bestEfficiency)
logger.info('Confidence: %i' % confidence)
else:
if re.search(r'<(a|d3|details)|lt;(a|d3|details)', vect.lower()):
continue
@@ -129,13 +134,13 @@ def scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM,
response = requester(url, paramsCopy, headers, GET, delay, timeout).text
success = browserEngine(response)
if success:
print(('%s-%s' % (red, end)) * 60)
print('%s Payload: %s' % (good, printVector))
print('%s Efficiency: %i' % (info, 100))
print('%s Confidence: %i' % (info, 10))
logger.red_line()
logger.good('Payload: %s' % loggerVector)
logger.info('Efficiency: %i' % 100)
logger.info('Confidence: %i' % 10)
if not skip:
choice = input(
'%s Would you like to continue scanning? [y/N] ' % que).lower()
if choice != 'y':
quit()
print ('')
logger.no_format('')

View File

@@ -1,14 +1,18 @@
import copy
from urllib.parse import urlparse
from core.colors import bad, green, end, good, info
from core.colors import green, end
from core.config import xsschecker
from core.fuzzer import fuzzer
from core.requester import requester
from core.utils import getUrl, getParams, verboseOutput
from core.utils import getUrl, getParams
from core.wafDetector import wafDetector
from core.log import setup_logger
def singleFuzz(target, paramData, verbose, encoding, headers, delay, timeout):
logger = setup_logger(__name__)
def singleFuzz(target, paramData, encoding, headers, delay, timeout):
GET, POST = (False, True) if paramData else (True, False)
# If the user hasn't supplied the root url with http(s), we will handle it
if not target.startswith('http'):
@@ -18,24 +22,25 @@ def singleFuzz(target, paramData, verbose, encoding, headers, delay, timeout):
target = 'https://' + target
except:
target = 'http://' + target
logger.debug('Single Fuzz target: {}'.format(target))
host = urlparse(target).netloc # Extracts host out of the url
verboseOutput(host, 'host', verbose)
logger.debug('Single fuzz host: {}'.format(host))
url = getUrl(target, GET)
verboseOutput(url, 'url', verbose)
logger.debug('Single fuzz url: {}'.format(url))
params = getParams(target, paramData, GET)
verboseOutput(params, 'params', verbose)
logger.debug_json('Single fuzz params:', params)
if not params:
print('%s No parameters to test.' % bad)
logger.error('No parameters to test.')
quit()
WAF = wafDetector(
url, {list(params.keys())[0]: xsschecker}, headers, GET, delay, timeout)
if WAF:
print('%s WAF detected: %s%s%s' % (bad, green, WAF, end))
logger.error('WAF detected: %s%s%s' % (green, WAF, end))
else:
print('%s WAF Status: %sOffline%s' % (good, green, end))
logger.good('WAF Status: %sOffline%s' % (green, end))
for paramName in params.keys():
print('%s Fuzzing parameter: %s' % (info, paramName))
logger.info('Fuzzing parameter: %s' % paramName)
paramsCopy = copy.deepcopy(params)
paramsCopy[paramName] = xsschecker
fuzzer(url, paramsCopy, headers, GET,

View File

@@ -2,7 +2,7 @@
from __future__ import print_function
from core.colors import end, info, red, run, white, bad
from core.colors import end, red, white, bad, info
# Just a fancy ass banner
print('''%s
@@ -27,19 +27,9 @@ except ImportError: # throws error in python2
# Let's import whatever we need from standard lib
import argparse
# ... and from core lib
# ... and configuration from the core lib
import core.config
from core.config import blindPayload
from core.encoders import base64
from core.photon import photon
from core.prompt import prompt
from core.updater import updater
from core.utils import extractHeaders, verboseOutput, reader, converter
from modes.bruteforcer import bruteforcer
from modes.crawl import crawl
from modes.scan import scan
from modes.singleFuzz import singleFuzz
import core.log
# Processing command line arguments, where dest var names will be mapped to local vars with the same name
parser = argparse.ArgumentParser()
@@ -78,19 +68,17 @@ parser.add_argument('--skip', help='don\'t ask to continue',
dest='skip', action='store_true')
parser.add_argument('--skip-dom', help='skip dom checking',
dest='skipDOM', action='store_true')
parser.add_argument('-v', '--vectors', help='verbose output',
dest='verbose', action='store_true')
parser.add_argument('--blind', help='inject blind XSS payload while crawling',
dest='blindXSS', action='store_true')
parser.add_argument('--console-log-level', help='Console logging level',
dest='console_log_level', default=core.log.console_log_level,
choices=core.log.log_config.keys())
parser.add_argument('--file-log-level', help='File logging level', dest='file_log_level',
choices=core.log.log_config.keys(), default=None)
parser.add_argument('--log-file', help='Name of the file to log', dest='log_file',
default=core.log.log_file)
args = parser.parse_args()
if type(args.add_headers) == bool:
headers = extractHeaders(prompt())
elif type(args.add_headers) == str:
headers = extractHeaders(args.add_headers)
else:
from core.config import headers
# Pull all parameter values of dict from argparse namespace into local variables of name == key
# The following works, but the static checkers are too static ;-) locals().update(vars(args))
target = args.target
@@ -112,11 +100,35 @@ threadCount = args.threadCount
delay = args.delay
skip = args.skip
skipDOM = args.skipDOM
verbose = args.verbose
blindXSS = args.blindXSS
core.log.console_log_level = args.console_log_level
core.log.file_log_level = args.file_log_level
core.log.log_file = args.log_file
logger = core.log.setup_logger()
core.config.globalVariables = vars(args)
# Import everything else required from core lib
from core.config import blindPayload
from core.encoders import base64
from core.photon import photon
from core.prompt import prompt
from core.updater import updater
from core.utils import extractHeaders, reader, converter
from modes.bruteforcer import bruteforcer
from modes.crawl import crawl
from modes.scan import scan
from modes.singleFuzz import singleFuzz
if type(args.add_headers) == bool:
headers = extractHeaders(prompt())
elif type(args.add_headers) == str:
headers = extractHeaders(args.add_headers)
else:
from core.config import headers
if path:
paramData = converter(target, target)
elif jsonData:
@@ -142,23 +154,23 @@ if update: # if the user has supplied --update argument
quit() # quitting because files have been changed
if not target and not args_seeds: # if the user hasn't supplied a url
print('\n' + parser.format_help().lower())
logger.no_format('\n' + parser.format_help().lower())
quit()
if fuzz:
singleFuzz(target, paramData, verbose, encoding, headers, delay, timeout)
singleFuzz(target, paramData, encoding, headers, delay, timeout)
elif not recursive and not args_seeds:
if args_file:
bruteforcer(target, paramData, payloadList, verbose, encoding, headers, delay, timeout)
bruteforcer(target, paramData, payloadList, encoding, headers, delay, timeout)
else:
scan(target, paramData, verbose, encoding, headers, delay, timeout, skipDOM, find, skip)
scan(target, paramData, encoding, headers, delay, timeout, skipDOM, find, skip)
else:
if target:
seedList.append(target)
for target in seedList:
print('%s Crawling the target' % run)
logger.run('Crawling the target')
scheme = urlparse(target).scheme
verboseOutput(scheme, 'scheme', verbose)
logger.debug('Target scheme: {}'.format(scheme))
host = urlparse(target).netloc
main_url = scheme + '://' + host
crawlingResult = photon(target, headers, level,
@@ -173,9 +185,9 @@ else:
for i in range(difference):
domURLs.append(0)
threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=threadCount)
futures = (threadpool.submit(crawl, scheme, host, main_url, form, domURL, verbose,
futures = (threadpool.submit(crawl, scheme, host, main_url, form, domURL,
blindXSS, blindPayload, headers, delay, timeout, skipDOM, encoding) for form, domURL in zip(forms, domURLs))
for i, _ in enumerate(concurrent.futures.as_completed(futures)):
if i + 1 == len(forms) or (i + 1) % threadCount == 0:
print('%s Progress: %i/%i' % (info, i + 1, len(forms)), end='\r')
print()
logger.info('Progress: %i/%i\r' % (i + 1, len(forms)))
logger.no_format('')
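
The import reshuffle in xsstrike.py is the point of the "Move xsstrike.py imports to properly initialize loggers" commit: setup_logger reads core.log.console_log_level, file_log_level, and log_file when it runs, and every core/modes module calls it at import time, so those imports are deferred until the CLI flags have been copied into core.log. A condensed sketch of the ordering constraint (args comes from the argparse setup above):

import core.log

# 1) Push the CLI choices into core.log before anything builds a logger.
core.log.console_log_level = args.console_log_level
core.log.file_log_level = args.file_log_level
core.log.log_file = args.log_file
logger = core.log.setup_logger()

# 2) Only now import modules that call setup_logger(__name__) at import time.
from core.photon import photon
from modes.scan import scan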