commit 2039833a859b4a4e1ca00cec344f25dae2212aed Author: shengqi158 Date: Mon Dec 15 22:26:13 2014 -0800 create py securit audit tool diff --git a/README.md b/README.md new file mode 100644 index 0000000..9a93c99 --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +1,python的语法树 + 根据王垠的python静态分析工具PySonar(https://github.com/yinwang0/pysonar2)得到静态语法树,这是一个庞大的dict结构,递归去除一些不必要的参数得到稍微简单点的一个语法树,以免影响后续分析。 + 这是文件test_lookuparg.py得到的解析树,body里面包含两个dict,每个dict都是一个函数,另外还有type字段表明这个节点的类型。 + 下图是一个函数的基本实现: + 首先是”type”:”FunctionDef” 表明这一段代码是函数定义,函数中则会有args,表明函数的参数,lineno是代码所在的行,name是函数名。更详细的接口文档见https://greentreesnakes.readthedocs.org/en/latest/nodes.html 在这里包含了各个结构的定义,分析整个树就可以依照这个来实现。 + 1.2危险函数+可控参数 + 危险函数有eval,system,popen等系统函数,同时也有咱们自定义的包含这些危险函数的函数,如果这些函数的参数是可控的,就会认为这行代码是有注入风险的,那么这个函数也是有注入风险的。 + 可控参数,首先会从函数参数入手,认为函数参数是可控的,分析程序会根据前面的语法树去分析代码结构,发现有将函数参数赋值的操作,并且这个赋值是简单的转换,这些简单的转换包含如下类型: + (1) 简单的取属性,如get取值,对request单独处理,只认为 + GET,POST,FILES可控,其他request字段如META,user,session,url等都是不可控的。 + (2) 字符串拼接,被拼接的字符串中包含可控参数,则认为赋值后的值也是可控的 + (3) 列表解析式,如果列表解析式是基于某个可控因子进行迭代的,则认为赋值后的列表也是可控的 + (4) 分片符取值,一般认为分片后的值也是可控的,当然这个也不绝对。 + (5) 一般的函数处理过程:a,函数是常见的字符串操作函数(str,encode,strip等)或者是简单的未过滤函数;b,处理属性;c,如果经过了未知的函数处理则将赋值后的值从可控列表中去掉。 + (6) 如果代码中的if中有exists,isdigit等带可控参数的return语句,则将该参数从可控参数列表中去掉(if not os.path.isdir(parentPath):return None),或者将可控参数定死在某个范围之内的(if type not in ["R", "B"]:return HttpResponse("2")) + +2,使用方法 + 使用方法如下: + liaoxinxi@RCM-RSAS-V6-Dev ~/tools/auto_judge $ python judge_injection.py -h + Usage: judge_injection.py [options] + + Options: + -h, --help show this help message and exit + -d FILE_PATH, --dir=FILE_PATH + files to be checked + -c, --cmd cmd check + -s, --sql sql check + -a, --all cmd check and sql check + -v, --verbose print all unsafe func + +3,代码结构 + 一个judge_injection类,首先是初始化得到一个简化的python语法树,提炼出代码中包含的函数语句,分析每一行语句,在碰到函数的时候会调用look_up_arg函数,该函数就是可以得出函数中的可变变量。如果可变变量在危险函数中出现了,则认为该函数是危险的。 + +4,不足之处 +(1)目前只做了两层函数分析,对于django的web代码来说已经足够应付 +(2)对类的分析还不彻底 
+(3)目前是基于单个文件来进行分析的,没有考虑模块导入 +(4)该模块还可以扩展为分析任意文件下载,任意文件删除等。 + + + diff --git a/color_log.py b/color_log.py new file mode 100644 index 0000000..58126d1 --- /dev/null +++ b/color_log.py @@ -0,0 +1,107 @@ +# +# Copyright (C) 2010, 2011 Vinay Sajip. All rights reserved. +# +import logging +import os + +class ColorizingStreamHandler(logging.StreamHandler): + # color names to indices + color_map = { + 'black': 0, + 'red': 1, + 'green': 2, + 'yellow': 3, + 'blue': 4, + 'magenta': 5, + 'cyan': 6, + 'white': 7, + } + + #levels to (background, foreground, bold/intense) + level_map = { + #logging.DEBUG: (None, 'blue', False), + logging.DEBUG: (None, 'white', False), + logging.INFO: (None, 'blue', False), + logging.WARNING: (None, 'yellow', False), + logging.ERROR: (None, 'red', False), + logging.CRITICAL: ('red', 'white', True), + } + csi = '\x1b[' + reset = '\x1b[0m' + + @property + def is_tty(self): + isatty = getattr(self.stream, 'isatty', None) + return isatty and isatty() + + def emit(self, record): + try: + message = self.format(record) + stream = self.stream + stream.write(message) + stream.write(getattr(self, 'terminator', '\n')) + self.flush() + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + + def output_colorized(self, message): + self.stream.write(message) + + def colorize(self, message, record): + if record.levelno in self.level_map: + bg, fg, bold = self.level_map[record.levelno] + params = [] + if bg in self.color_map: + params.append(str(self.color_map[bg] + 40)) + if fg in self.color_map: + params.append(str(self.color_map[fg] + 30)) + if bold: + params.append('1') + if params: + message = ''.join((self.csi, ';'.join(params), + 'm', message, self.reset)) + return message + + def format(self, record): + message = logging.StreamHandler.format(self, record) + if self.is_tty: + # Don't colorize any traceback + parts = message.split('\n', 1) + parts[0] = self.colorize(parts[0], record) + message = '\n'.join(parts) + return 
message + +def main(): + root = logging.getLogger() + root.setLevel(logging.DEBUG) + root.addHandler(ColorizingStreamHandler()) + logging.debug('DEBUG') + logging.info('INFO') + logging.warning('WARNING') + logging.error('ERROR') + logging.critical('CRITICAL') + +def init_log(log_level): + """log_level = logging.NOTSET logging.DEBUG logging.INFO logging.ERROR logging.CRITICAL""" + root = logging.getLogger() + root.setLevel(log_level) + stream_handler = ColorizingStreamHandler() + formatter = logging.Formatter('[%(funcName)-10s %(lineno)d %(levelname)-8s] %(message)s') + #logging.StreamHandler.setFormatter(formatter) + + stream_handler.setFormatter(formatter) + #root.addHandler(ColorizingStreamHandler()) + root.addHandler(stream_handler) + return root + +if __name__ == '__main__': + # main() + logger = init_log(logging.DEBUG) + logger.debug('DEBUG..........................') + logger.info('INFO----------------------------') + logger.warning('WARNING======================') + logger.error('ERROR**************************') + logger.error('ERROR**************************%r' %({'value':'111'})) + #logger.error('ERROR**************************', {'value':'111'}) diff --git a/dump_python.py b/dump_python.py new file mode 100644 index 0000000..9910579 --- /dev/null +++ b/dump_python.py @@ -0,0 +1,650 @@ +import ast +import re +import sys +import codecs + +from json import JSONEncoder +from ast import * + + +# Is it Python 3? 
+is_python3 = hasattr(sys.version_info, 'major') and (sys.version_info.major == 3) + + +class AstEncoder(JSONEncoder): + def default(self, o): + if hasattr(o, '__dict__'): + d = o.__dict__ + # workaround: decode strings if it's not Python3 code + if not is_python3: + for k in d: + if isinstance(d[k], str): + if k == 's': + d[k] = lines[d['start']:d['end']] + else: + d[k] = d[k].decode(enc) + d['type'] = o.__class__.__name__ + return d + else: + return str(o) + + +enc = 'latin1' +lines = '' + +def parse_dump(filename, output, end_mark): + try: + if is_python3: + encoder = AstEncoder() + else: + encoder = AstEncoder(encoding=enc) + + tree = parse_file(filename) + encoded = encoder.encode(tree) + f = open(output, "w") + f.write(encoded) + f.close() + finally: + # write marker file to signal write end + f = open(end_mark, "w") + f.close() + +def parse_json(filename): + try: + if is_python3: + encoder = AstEncoder() + else: + encoder = AstEncoder(encoding=enc) + + tree = parse_file(filename) + encoded = encoder.encode(tree) + return encoded + except: + return "" + + +def parse_file(filename): + global enc, lines + enc, enc_len = detect_encoding(filename) + f = codecs.open(filename, 'r', enc) + lines = f.read() + + # remove BOM + lines = re.sub(u'\ufeff', ' ', lines) + + # replace the encoding decl by spaces to fool python parser + # otherwise you get 'encoding decl in unicode string' syntax error + # print('enc:', enc, 'enc_len', enc_len) + if enc_len > 0: + lines = re.sub('#.*coding\s*[:=]\s*[\w\d\-]+', '#' + ' ' * (enc_len-1), lines) + + f.close() + return parse_string(lines, filename) + + +def parse_string(string, filename=None): + tree = ast.parse(string) + improve_ast(tree, string) + if filename: + tree.filename = filename + return tree + + +# short function for experiments +def p(filename): + parse_dump(filename, "json1", "end1") + + +def detect_encoding(path): + fin = open(path, 'rb') + prefix = str(fin.read(80)) + encs = 
re.findall('#.*coding\s*[:=]\s*([\w\d\-]+)', prefix) + decl = re.findall('#.*coding\s*[:=]\s*[\w\d\-]+', prefix) + + if encs: + enc1 = encs[0] + enc_len = len(decl[0]) + try: + info = codecs.lookup(enc1) + # print('lookedup: ', info) + except LookupError: + # print('encoding not exist: ' + enc1) + return 'latin1', enc_len + return enc1, enc_len + else: + return 'latin1', -1 + + +#------------------------------------------------------------- +# improvements to the AST +#------------------------------------------------------------- +def improve_ast(node, s): + build_index_map(s) + improve_node(node, s) + + +# build global table 'idxmap' for lineno <-> index oonversion +def build_index_map(s): + global line_starts + idx = 0 + line_starts = [0] + while idx < len(s): + if s[idx] == '\n': + line_starts.append(idx + 1) + idx += 1 + + +# convert (line, col) to offset index +def map_idx(line, col): + return line_starts[line - 1] + col + + +# convert offset index into (line, col) +def map_line_col(idx): + line = 0 + for start in line_starts: + if idx < start: + break + line += 1 + col = idx - line_starts[line - 1] + return (line, col) + + +def improve_node(node, s): + if isinstance(node, list): + for n in node: + improve_node(n, s) + + elif isinstance(node, AST): + + find_start(node, s) + find_end(node, s) + add_missing_names(node, s) + + for f in node_fields(node): + improve_node(f, s) + + +def find_start(node, s): + ret = None # default value + + if hasattr(node, 'start'): + ret = node.start + + elif isinstance(node, list): + if node != []: + ret = find_start(node[0], s) + + elif isinstance(node, Module): + if node.body != []: + ret = find_start(node.body[0], s) + + elif isinstance(node, BinOp): + leftstart = find_start(node.left, s) + if leftstart != None: + ret = leftstart + else: + ret = map_idx(node.lineno, node.col_offset) + + elif hasattr(node, 'lineno'): + if node.col_offset >= 0: + ret = map_idx(node.lineno, node.col_offset) + else: # special case for """ strings + 
i = map_idx(node.lineno, node.col_offset) + while i > 0 and i + 2 < len(s) and s[i:i + 3] != '"""' and s[i:i + 3] != "'''": + i -= 1 + ret = i + else: + return None + + if ret == None and hasattr(node, 'lineno'): + raise TypeError("got None for node that has lineno", node) + + if isinstance(node, AST) and ret != None: + node.start = ret + + return ret + + +def find_end(node, s): + the_end = None + + if hasattr(node, 'end'): + return node.end + + elif isinstance(node, list): + if node != []: + the_end = find_end(node[-1], s) + + elif isinstance(node, Module): + if node.body != []: + the_end = find_end(node.body[-1], s) + + elif isinstance(node, Expr): + the_end = find_end(node.value, s) + + elif isinstance(node, Str): + i = find_start(node, s) + while s[i] != '"' and s[i] != "'": + i += 1 + + if i + 2 < len(s) and s[i:i + 3] == '"""': + q = '"""' + i += 3 + elif i + 2 < len(s) and s[i:i + 3] == "'''": + q = "'''" + i += 3 + elif s[i] == '"': + q = '"' + i += 1 + elif s[i] == "'": + q = "'" + i += 1 + else: + print("illegal quote:", i, s[i]) + q = '' + + if q != '': + the_end = end_seq(s, q, i) + + elif isinstance(node, Name): + the_end = find_start(node, s) + len(node.id) + + elif isinstance(node, Attribute): + the_end = end_seq(s, node.attr, find_end(node.value, s)) + + elif isinstance(node, FunctionDef): + the_end = find_end(node.body, s) + + elif isinstance(node, Lambda): + the_end = find_end(node.body, s) + + elif isinstance(node, ClassDef): + the_end = find_end(node.body, s) + + # print will be a Call in Python 3 + elif not is_python3 and isinstance(node, Print): + the_end = start_seq(s, '\n', find_start(node, s)) + + elif isinstance(node, Call): + start = find_end(node.func, s) + if start != None: + the_end = match_paren(s, '(', ')', start) + + elif isinstance(node, Yield): + the_end = find_end(node.value, s) + + elif isinstance(node, Return): + if node.value != None: + the_end = find_end(node.value, s) + else: + the_end = find_start(node, s) + len('return') + 
+ elif (isinstance(node, For) or + isinstance(node, While) or + isinstance(node, If) or + isinstance(node, IfExp)): + if node.orelse != []: + the_end = find_end(node.orelse, s) + else: + the_end = find_end(node.body, s) + + elif isinstance(node, Assign) or isinstance(node, AugAssign): + the_end = find_end(node.value, s) + + elif isinstance(node, BinOp): + the_end = find_end(node.right, s) + + elif isinstance(node, BoolOp): + the_end = find_end(node.values[-1], s) + + elif isinstance(node, Compare): + the_end = find_end(node.comparators[-1], s) + + elif isinstance(node, UnaryOp): + the_end = find_end(node.operand, s) + + elif isinstance(node, Num): + the_end = find_start(node, s) + len(str(node.n)) + + elif isinstance(node, List): + the_end = match_paren(s, '[', ']', find_start(node, s)); + + elif isinstance(node, Subscript): + the_end = match_paren(s, '[', ']', find_start(node, s)); + + elif isinstance(node, Tuple): + if node.elts != []: + the_end = find_end(node.elts[-1], s) + + elif isinstance(node, Dict): + the_end = match_paren(s, '{', '}', find_start(node, s)); + + elif ((not is_python3 and isinstance(node, TryExcept)) or + (is_python3 and isinstance(node, Try))): + if node.orelse != []: + the_end = find_end(node.orelse, s) + elif node.handlers != []: + the_end = find_end(node.handlers, s) + else: + the_end = find_end(node.body, s) + + elif isinstance(node, ExceptHandler): + the_end = find_end(node.body, s) + + elif isinstance(node, Pass): + the_end = find_start(node, s) + len('pass') + + elif isinstance(node, Break): + the_end = find_start(node, s) + len('break') + + elif isinstance(node, Continue): + the_end = find_start(node, s) + len('continue') + + elif isinstance(node, Global): + the_end = start_seq(s, '\n', find_start(node, s)) + + elif isinstance(node, Import): + the_end = find_start(node, s) + len('import') + + elif isinstance(node, ImportFrom): + the_end = find_start(node, s) + len('from') + + else: # can't determine node end, set to 3 chars after 
start + start = find_start(node, s) + if start != None: + the_end = start + 3 + + if isinstance(node, AST) and the_end != None: + node.end = the_end + + return the_end + + +def add_missing_names(node, s): + if hasattr(node, 'extra_attr'): + return + + if isinstance(node, list): + for n in node: + add_missing_names(n, s) + + elif isinstance(node, ClassDef): + head = find_start(node, s) + start = s.find("class", head) + len("class") + if start != None: + node.name_node = str_to_name(s, start) + node._fields += ('name_node',) + + elif isinstance(node, FunctionDef): + # skip to "def" because it may contain decorators like @property + head = find_start(node, s) + start = s.find("def", head) + len("def") + if start != None: + node.name_node = str_to_name(s, start) + node._fields += ('name_node',) + + # keyword_start = find_start(node, s) + # node.keyword_node = str_to_name(s, keyword_start) + # node._fields += ('keyword_node',) + + if node.args.vararg != None: + if len(node.args.args) > 0: + vstart = find_end(node.args.args[-1], s) + else: + vstart = find_end(node.name_node, s) + if vstart != None: + vname = str_to_name(s, vstart) + node.vararg_name = vname + else: + node.vararg_name = None + node._fields += ('vararg_name',) + + if node.args.kwarg != None: + if len(node.args.args) > 0: + kstart = find_end(node.args.args[-1], s) + else: + kstart = find_end(node.vararg_name, s) + if kstart: + kname = str_to_name(s, kstart) + node.kwarg_name = kname + else: + node.kwarg_name = None + node._fields += ('kwarg_name',) + + elif isinstance(node, Attribute): + start = find_end(node.value, s) + if start is not None: + name = str_to_name(s, start) + node.attr_name = name + node._fields = ('value', 'attr_name') # remove attr for node size accuracy + + elif isinstance(node, Compare): + start = find_start(node, s) + if start is not None: + node.opsName = convert_ops(node.ops, s, start) + node._fields += ('opsName',) + + elif (isinstance(node, BoolOp) or + isinstance(node, BinOp) or + 
isinstance(node, UnaryOp) or + isinstance(node, AugAssign)): + if hasattr(node, 'left'): + start = find_end(node.left, s) + else: + start = find_start(node, s) + if start is not None: + ops = convert_ops([node.op], s, start) + else: + ops = [] + if ops != []: + node.op_node = ops[0] + node._fields += ('op_node',) + + elif isinstance(node, Num): + if isinstance(node.n, int) or (not is_python3 and isinstance(node.n, long)): + type = 'int' + node.n = str(node.n) + elif isinstance(node.n, float): + type = 'float' + node.n = str(node.n) + elif isinstance(node.n, complex): + type = 'complex' + node.real = node.n.real + node.imag = node.n.imag + node._fields += ('real', 'imag') + + node.num_type = type + node._fields += ('num_type',) + + node.extra_attr = True + + +#------------------------------------------------------------- +# utilities used by improve AST functions +#------------------------------------------------------------- + +# find a sequence in a string s, returning the start point +def start_seq(s, pat, start): + try: + return s.index(pat, start) + except ValueError: + return len(s) + + +# find a sequence in a string s, returning the end point +def end_seq(s, pat, start): + try: + return s.index(pat, start) + len(pat) + except ValueError: + return len(s) + + +# find matching close paren from start +def match_paren(s, open, close, start): + while start < len(s) and s[start] != open: + start += 1 + if start >= len(s): + return len(s) + + left = 1 + i = start + 1 + while left > 0 and i < len(s): + if s[i] == open: + left += 1 + elif s[i] == close: + left -= 1 + i += 1 + return i + + +# convert string to Name +def str_to_name(s, start): + i = start; + while i < len(s) and not is_alpha(s[i]): + i += 1 + name_start = i + + ret = [] + while i < len(s) and is_alpha(s[i]): + ret.append(s[i]) + i += 1 + name_end = i + + id1 = ''.join(ret) + if id1 == '': + return None + else: + name = Name(id1, None) + name.start = name_start + name.end = name_end + name.lineno, 
name.col_offset = map_line_col(name_start) + return name + + +def convert_ops(ops, s, start): + syms = [] + for op in ops: + if type(op) in ops_map: + syms.append(ops_map[type(op)]) + else: + print("[WARNING] operator %s is missing from ops_map, " + "please report the bug on GitHub" % op) + + i = start + j = 0 + ret = [] + while i < len(s) and j < len(syms): + oplen = len(syms[j]) + if s[i:i + oplen] == syms[j]: + op_node = Name(syms[j], None) + op_node.start = i + op_node.end = i + oplen + op_node.lineno, op_node.col_offset = map_line_col(i) + ret.append(op_node) + j += 1 + i = op_node.end + else: + i += 1 + return ret + + +# lookup table for operators for convert_ops +ops_map = { + # compare: + Eq: '==', + NotEq: '!=', + LtE: '<=', + Lt: '<', + GtE: '>=', + Gt: '>', + NotIn: 'not in', + In: 'in', + IsNot: 'is not', + Is: 'is', + + # BoolOp + Or: 'or', + And: 'and', + Not: 'not', + Invert: '~', + + # bit operators + BitOr: '|', + BitAnd: '&', + BitXor: '^', + RShift: '>>', + LShift: '<<', + + + # BinOp + Add: '+', + Sub: '-', + Mult: '*', + Div: '/', + FloorDiv: '//', + Mod: '%', + Pow: '**', + + # UnaryOp + USub: '-', + UAdd: '+', +} + + +# get list of fields from a node +def node_fields(node): + ret = [] + for field in node._fields: + if field != 'ctx' and hasattr(node, field): + ret.append(getattr(node, field)) + return ret + + +# get full source text where the node is from +def node_source(node): + if hasattr(node, 'node_source'): + return node.node_source + else: + return None + + +# utility for getting exact source code part of the node +def src(node): + return node.node_source[node.start: node.end] + + +def start(node): + if hasattr(node, 'start'): + return node.start + else: + return 0 + + +def end(node): + if hasattr(node, 'end'): + return node.end + else: + return None + + +def is_alpha(c): + return (c == '_' + or ('0' <= c <= '9') + or ('a' <= c <= 'z') + or ('A' <= c <= 'Z')) + + +# 
p('/Users/yinwang/Code/django/tests/invalid_models/invalid_models/models.py') +# p('/Users/yinwang/Dropbox/prog/pysonar2/tests/test-unicode/test1.py') +# p('/Users/yinwang/Code/cpython/Lib/lib2to3/tests/data/bom.py') +# p('/Users/yinwang/Code/cpython/Lib/test/test_decimal.py') +# p('/Users/yinwang/Code/cpython/Lib/test/test_pep3131.py') +# p('/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/tarfile.py') +# p('/Users/yinwang/Code/cpython/Lib/lib2to3/tests/data/false_encoding.py') +# p('/System/Library/Frameworks/Python.framework/Versions/2.5/lib/python2.5/test/test_marshal.py') +# p('/System/Library/Frameworks/Python.framework/Versions/2.5/lib/python2.5/lib-tk/Tix.py') +#p('/home/liaoxinxi/trunk/src/bvs/login/test_system.py') +#p('/home/liaoxinxi/trunk/src/www/npai/systemforpa.py') +p('libssh2_login_test.py.bak') +#p('arg.py') diff --git a/judge_injection.py b/judge_injection.py new file mode 100644 index 0000000..11c5a9b --- /dev/null +++ b/judge_injection.py @@ -0,0 +1,755 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi +# +# Created Time: Fri 21 Nov 2014 10:49:03 AM GMT-8 +# +# FileName: judge_injection.py +# +# Description: +# +# ChangeLog: + +import dump_python +import logging +import color_log +import json +import os +import re +import traceback +import sys +from optparse import OptionParser +from collections import OrderedDict + +logger = color_log.init_log(logging.DEBUG) +logger = color_log.init_log(logging.INFO) +logger = color_log.init_log(logging.WARNING) +logger = color_log.init_log(logging.ERROR) +DEBUG = False +args_ori = set([]) +is_arg_in = False +is_arg_return_op = False +UNSAFE_FUNCS = ["system", "popen", "call", "Popen", "getoutput", "getstatusoutput", \ + "eval", "spawnl", 'popen2', 'popen3', 'popen4' ] +FILE_UNSAFE_FUNCS = set() +FILE_SQL_UNSAFE_FUNCS = set() +UNTREATED_FUNS = set(['open','readline']) +STR_FUNCS = ['str','unicode','encode','strip','rstrip','lstrip','lower','upper','split','splitlines', 
'replace','join'] +SAFE_FUNCS = [] +SQL_FUNCS = ['execute', 'raw'] +CMD_COUNT = 0 + + + +class judge_injection(object): + """根据语法树自动判断注入攻击""" + def __init__(self, filename, check_type): + try: + self.tree = dump_python.parse_json(filename) + except Exception,e: + self.tree = "{}" + print e + self.tree = json.loads(self.tree) + rec_decrease_tree(self.tree) + if DEBUG: +# rec_decrease_tree(self.tree) + try: + fd = open(filename+".json", 'w') + json.dump(self.tree, fd) + fd.flush() + fd.close() + except: + pass + self.filename = self.tree.get("filename") + self.start = self.tree.get("start") + self.body = self.tree.get("body") + self.funcs = [] + self.func_lines = {}#获取一个函数的执行代码 + self.check_type = check_type + with open(self.filename, 'r') as fd: + self.lines = fd.readlines() + self.unsafe_func = set()#记录本文件中自己的危险函数 + self.untreated_func = set()#记录那些函数参数到返回值是可控的函数 + self.record_unsafe_func = OrderedDict({}) #用于打印危险函数 + self.record_param = {} + logger.debug("filename:%s" %(self.filename)) + + + def get_risk_func(self): + """用于输入系统危险函数""" + funcs = ["os.system", "os.popen", "subprocess.call", "subprocess.Popen",\ + "commands.getoutput", "commands.getstatusoutput","pickle.loads"] + funcs = ["system", "popen", "call", "Popen", "getoutput", "getstatusoutput", \ + "eval", "spawnl", 'popen2', 'popen3', 'popen4'] + return funcs + + def get_func_objects(self, body): + """获取语法树中的函数结构们""" + for obj in body:#代码行 + if obj.get("type") == "FunctionDef": + self.funcs.append(obj) + logger.debug("func:%r" %(obj)) + elif obj.get('type') == 'ClassDef': + self.get_func_objects(obj.get('body')) + + return + + + + def get_func_lines(self, func, func_name): + """获取函数的执行的行,找到func""" + #if "body" in func: + if isinstance(func, dict) and 'body' in func: + lines = func.get('body') + elif isinstance(func, list): + lines = func + elif isinstance(func, dict) and func.get('type') == 'Call': + lines = [func] + else: + lines = [] + + for line in lines: + ast_body = line.get('body') + ast_orelse = 
line.get('orelse') + ast_handlers = line.get('handlers') + ast_test_comparators = line.get('test') + ast_args = line.get('args') +# print "line:",line + if "value" in line and line.get('value') and "func" in line.get("value"): + self.func_lines[func_name].append(line) + continue + elif line.get('type') == 'Call': + self.func_lines[func_name].append(line) + continue + + if ast_body: + self.get_func_lines(ast_body, func_name) + if ast_orelse: + self.get_func_lines(ast_orelse, func_name) + if ast_handlers: + self.get_func_lines(ast_handlers, func_name) + if ast_test_comparators and ast_test_comparators.get('comparators'): + self.get_func_lines(ast_test_comparators.get('comparators'), func_name) + if ast_test_comparators and ast_test_comparators.get('left'): + self.get_func_lines(ast_test_comparators.get('left'), func_name) + if ast_args: + self.get_func_lines(ast_args, func_name) + + + return + + def parse_func(self, func, analyse_all): + global leafs + global args_ori + global is_arg_in + global CMD_COUNT + global is_arg_return_op + is_arg_return_op = False + arg_leafs = [] + func_name = func.get("name") + logger.debug("function_name:%s" %(func_name)) + args_ori = set([arg.get("id") for arg in func.get('args').get("args")]) #arg.id + logger.debug("args:%s" %str(args_ori)) + self.func_lines.setdefault(func_name, []) + self.get_func_lines(func, func_name) + lines = self.func_lines[func_name] + logger.debug("func_lines:%r" %(lines)) +# if analyse_all: + look_up_arg(func, args_ori, arg_leafs,func_name) +# self.record_param.setdefault(func_name, args_ori) + self.record_param[func_name] = args_ori +# if not analyse_all: +# print 'func,record_param:', func_name,self.record_param.get(func_name) +# is_arg_return(func, args_ori) +# print 'is_arg_return_op:',is_arg_return_op +# if is_arg_in and not is_arg_return_op: +# if func_name not in ("__init__"): +# FILE_UNSAFE_FUNCS.add(func_name) +# print "func_lines:", lines +# print "func_:", func + + #对所有有函数执行的语句做进一步处理 + for line in 
lines: + #print "all:%r" %(line) +# print "*"*20 + arg_leafs = [] + is_arg_in = False + value = line.get("value") + lineno = line.get("lineno") + if (value and value.get("type") == "Call") or (line and line.get('type') == 'Call'): + logger.debug("value:%r" %(value)) + line_func = value.get("func") if value else line.get('func') + line_func = value if value and value.get('type')=='Call' else line + value_args = value.get('args') if value else line.get('args') + func_ids = [] + rec_get_func_ids(line_func, func_ids) + func_ids = set(func_ids) + find_all_leafs(value_args, arg_leafs) + logger.info("arg_leafs:%r" %(arg_leafs)) + logger.info("func_ids:%r" %(func_ids)) +# if analyse_all: +# look_up_arg(func, args_ori, arg_leafs,func_name) +# print "UNTREATED_FUNS", UNTREATED_FUNS + if self.check_type[0] and func_ids and (func_ids&((set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS)))) and value_args: + if self.check_type[2] and arg_leafs: + print "CMD--FILE:%s,FUNCTION:%s,LINE:%s" %(self.filename, func_name, lineno ) + if set(arg_leafs)&set(self.record_param.get(func_name)): + if not is_arg_return_op and func_name not in ("__init__"): + FILE_UNSAFE_FUNCS.add(func_name) + self.record_unsafe_func.setdefault(lineno, {'func_name':func_name, 'args':args_ori, 'func_ids':func_ids}, ) + CMD_COUNT = CMD_COUNT + 1 + +# if self.check_type[1] and line_func.get("attr") in ['execute', 'raw'] and value_args: + if self.check_type[1] and func_ids and (func_ids&((set(['execute','raw'])|FILE_SQL_UNSAFE_FUNCS))) and value_args: + if self.check_type[2] and arg_leafs: + print "SQL--FILE:%s,FUNCTION:%s,LINE:%s" %(self.filename, func_name, lineno ) + if set(arg_leafs)&set(self.record_param.get(func_name)): + print self.lines[lineno - 1] + FILE_SQL_UNSAFE_FUNCS.add(func_name) + self.record_unsafe_func.setdefault(lineno, {'func_name':func_name, 'args':args_ori, 'func_ids':func_ids}, ) +# print "cmd_count:",CMD_COUNT + + def parse_py(self): + self.get_func_objects(self.body) + + for func in self.funcs: + 
self.parse_func(func, True) +# print "file_unsafe_func:", FILE_UNSAFE_FUNCS +# print "*****"*50 + for func in self.funcs: + self.parse_func(func, False) + +# print 'COUNT',CMD_COUNT + + def record_all_func(self): + for key, value in self.record_unsafe_func.iteritems(): + print "maybe injected File:%s,function:%s,line:%s,dangerous_func:%r" %(self.filename, value.get('func_name'), key, value.get('func_ids')) + logger.error("maybe injected File:%s,function:%s,line:%s,dangerous_func:%r" %(self.filename, value.get('func_name'), key, value.get('func_ids'))) + print self.lines[key - 1] + #print "FILE_UNSAFE_FUNCS",FILE_UNSAFE_FUNCS + + +def find_all_leafs(args, leafs): + + for arg in args: + find_arg_leafs(arg, leafs) + + +def find_arg_leafs(arg, leafs): + """通过递归找到全所有子节点,历史原因复数格式不修正""" + fields = arg.get("_fields") + _type = arg.get('type') + if _type == "Attribute": + find_arg_leafs(arg.get('value'), leafs) + if _type == "Name": + leafs.append(arg.get('id')) + if _type == 'Call': + func_ids = [] + rec_get_func_ids(arg.get('func'), func_ids) + if set(func_ids)&(set(STR_FUNCS)|set(UNTREATED_FUNS)|set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS)): + parent, topids = {}, [] + rec_get_attr_top_id(arg.get('func'), parent, topids) + if topids: + leafs.append(topids[0]) + for arg_item in arg.get('args'): + find_arg_leafs(arg_item, leafs) + if arg.get('func') and arg.get('func').get('type') != 'Name': + find_arg_leafs(arg.get('func'), leafs) + if _type == 'Subscript': + find_arg_leafs(arg.get('value'), leafs) + if _type == "BinOp" and fields: + if "right" in fields: + if arg.get('right').get('type') == "Name": + right_id = arg.get("right").get("id") + if right_id: + leafs.append(right_id) + elif arg.get('right').get('type') == 'Tuple': + for elt in arg.get('right').get('elts'): + find_arg_leafs(elt, leafs) + + if "left" in fields and not arg.get("left").get("_fields"): + left_id = arg.get('left').get('id') + if left_id: + leafs.append(left_id) + if "left" in fields and 
arg.get("left").get("_fields"): + find_arg_leafs(arg.get("left"), leafs) + + return + +def is_arg_return(func, args_ori): + """ + 判断是否有对arg参数的可控性判断,比如判读是否数字,是否file等 + """ + global is_arg_return_op + + if isinstance(func, dict): + lines = func.get('body') + elif isinstance(func, list): + lines = func + + for line in lines: + is_return = False + is_arg_in = False + is_param = False + ast_body = line.get('body') + ast_orelse = line.get('orelse') + ast_handlers = line.get('handlers') + if line.get('type') == "If": + for body in line.get('body'): + if body.get('type') == "Return": + is_return = True + test = line.get('test') + if line.get('test') and line.get('test').get('type') == "UnaryOp": + operand = line.get('test').get('operand') + if operand: + args = [] + rec_find_args(line.get('test'), args) + if set(args)&set(args_ori): + is_arg_in = True + elif test and test.get('type') == 'Compare': + args = [] + for key,value in test.iteritems(): + if key == 'left': + if test[key].get('type') == 'Name': + args = [test[key].get('id')] + if key == 'comparators': + for comparator in test[key]: + if comparator.get('type') in ("List", 'Tuple'): + for elt in comparator.get('elts'): + if elt.get('type') == 'Name': + is_param = True + + if set(args)&set(args_ori) and not is_param: + is_arg_in = True + + is_arg_return_op = is_return&is_arg_in + if is_arg_return_op:#找到即返回 + logger.info("is_arg_return:%r" %(line)) + return + if ast_body: + is_arg_return(ast_body, args_ori) +# if ast_orelse: +# is_arg_return(ast_orelse, args_ori) +# if ast_handlers: +# is_arg_return(ast_handlers, args_ori) + +def rec_find_args(operand, args): + if isinstance(operand, list) or isinstance(operand, tuple): + find_all_leafs(operand, args) + elif isinstance(operand, dict): + if operand.get('type') == 'Call': + if "args" in operand: + find_all_leafs(operand.get('args'), args) + if "value" in operand.get('func'): + rec_find_args(operand.get('func').get('value'), args) + elif operand.get('type') == 'UnaryOp':# 
not param判断中 + rec_find_args(operand.get('operand'), args) + elif operand.get('type') == 'BinOp': + find_arg_leafs(operand, args) + + else: + return + +def rec_get_attr_top_id(func, parent, ids):#获取最顶端的值,eg:request + """ + func = {u'_fields': [u'value', u'attr_name'], u'type': u'Attribute', u'attr': u'get', u'value': {u'_fields': [u'value', u'attr_name'], u'type': u'Attribute', u'attr': u'POST', u'value': {u'type': u'Name', u'lineno': 15, u'id': u'request'}, u'lineno': 15}, u'lineno': 15} + ids: 用于回传结果,只有一个 + """ + if func.get('type') == 'Name': + ids.append(func.get('id')) + if func.get('type') == 'Attribute': + parent.update(func) + rec_get_attr_top_id(func.get('value'), parent, ids) + if func.get('type') == 'Call': + parent.update(func) + rec_get_attr_top_id(func.get('func'), parent, ids) + return + + + +def look_up_arg(func, args_ori, args, func_name): + """递归找出危险函数中的参数是否属于函数参数入口的""" + """ + func 代表测试的函数,args_ori是要被测试的函数的参数,args则是危险函数中的参数 + """ + global is_arg_in + if isinstance(func, dict) and 'body' in func: + lines = func.get('body') + elif isinstance(func, list): + lines = func + elif isinstance(func, dict) and func.get('type') == 'Call': + lines = [func] + else: + lines = [] + + for line in lines: +# print 'look_up_arg:line:',line + ast_body = line.get('body') + ast_orelse = line.get('orelse') + ast_handlers = line.get('handlers') + ast_test = line.get('test') + ast_args = line.get('args') + #处理单纯属性 + if line.get('type') == 'Assign': + target_ids = [target.get("id") for target in line.get("targets") if target.get('id') ] + else: + target_ids = [] + + if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Name": + if target_ids and line.get("value").get("id") in args_ori: + args_ori.update(target_ids) + logger.info("In Assign,Name add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Attribute": 
+ value_func = line.get('value').get('value') + if value_func and value_func.get("type") == 'Name': + if target_ids and value_func.get("id") in args_ori: + args_ori.update(target_ids) + logger.info("In Assign,Attr add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + else: + topids = [] + parent = {} + rec_get_attr_top_id(value_func, parent, topids) + if (set(topids)&set(args_ori)): + if topids and topids[0].lower() == 'request': + if parent and parent.get('type')=='Attribute' and parent.get('attr') in ['GET','POST','FILES']: + args_ori.update(target_ids) + logger.info("In Assign,Attr add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + elif parent and parent.get('type')=='Attribute': + args_ori.difference_update(set(target_ids)) + logger.warn("In Assign,Attr delete (%r) from (%r) where line=(%r)***************************** line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + #处理字符串拼接过程 + if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="BinOp": +# right = line.get('value').get('right') +# if right.get('type') == 'Tuple': +# rec_find_args(right.get('elts')) + leafs = [] + find_arg_leafs(line.get("value"), leafs) + if (set(args_ori)&set(leafs)): + if target_ids: + args_ori.update(target_ids) + logger.info("In Assign,BinOp add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + #列表解析式 + if line.get("type") == "Assign" and "value" in line and line.get("value").get("type") in ("ListComp","SetComp"): + generators = line.get('value').get('generators') + leafs = [] + for generator in generators: + find_arg_leafs(generator.get('iter'), leafs) + if target_ids and (set(args_ori)&set(leafs)): + args_ori.update(target_ids) + logger.info("In Assign,ListComp,SetComp add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + + #处理Subscript分片符情况 + if line.get('type') == 
'Assign' and 'value' in line and line.get('value').get('type')=='Subscript': + value_type = line.get('value').get('value').get('type') + value_func_ids = [] + rec_get_func_ids(line.get('value').get('value'), value_func_ids) + value_func_ids = set(value_func_ids) + value_arg_ids = [] + find_arg_leafs(line.get('value').get('value'), value_arg_ids) + if value_type == 'Attribute': + if value_func_ids and value_func_ids.issubset((set(['POST','GET','FILES'])|set(STR_FUNCS))): + if target_ids and not (set(value_arg_ids)&set(target_ids)): + args_ori.update(target_ids) + logger.info("In Assign,Subscript add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + + #处理调用函数后的赋值,像str,get取值都保留 + if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Call": + value_arg_ids = [] + rec_find_args(line.get('value'), value_arg_ids) + value_func_ids = [] + rec_get_func_ids(line.get('value').get('func'), value_func_ids) + value_func_ids = set(value_func_ids) + value_func_type = line.get("value").get('func').get('type') + value_func = line.get('value').get('func') + (topids, parent) = ([], {}) + rec_get_attr_top_id(value_func, parent, topids) + + if value_arg_ids or topids: + #处理普通方法 + if value_func_type == 'Name' and (set(value_arg_ids)&set(args_ori)): + + if target_ids and value_func_ids and value_func_ids.issubset((set(STR_FUNCS)|set(UNTREATED_FUNS))): +# args_ori = args_ori|set(target_ids) + args_ori.update(target_ids) + logger.info("In Assign,Call:Name add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + elif target_ids and value_func_ids and (value_func_ids&((set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS)))): + is_arg_in = True + elif target_ids: + args_ori.difference_update(target_ids) + logger.warn("In Assign,Call delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) +# for target in 
target_ids:#处理cmd=int(cmd) 这种情况 +# args_ori.difference_update(target_ids) +# if target in args_ori: +# args_ori.discard(target) +# logger.info("arg_id,assign31:%r,args_ori:%r" %(value_arg_ids, args_ori)) + + elif value_func_type == 'Attribute':#处理属性方法,如从dict取值 + + if (set(topids)&set(args_ori)): + if topids[0].lower() == 'request': + if parent and parent.get('type')=='Attribute' and parent.get('attr') in ['GET','POST','FILES']: + if target_ids and not (set(value_arg_ids)&set(target_ids)): + args_ori.update(target_ids) + logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,parent.get('lineno'), line)) + elif parent and parent.get('type')=='Attribute': + args_ori.difference_update(set(target_ids))#去除target_ids + logger.warn("In Assign,Call:attr delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + elif value_func_ids and value_func_ids.issubset(set(STR_FUNCS)|set(UNTREATED_FUNS)) and (set(value_arg_ids)&set(args_ori)): + if target_ids and not (set(value_arg_ids)&set(target_ids)): + args_ori.update(target_ids) + logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + else: + if target_ids and not (set(value_arg_ids)&set(target_ids)): + args_ori.update(target_ids) + logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + #处理r=unicode(s).encode('utf8') + elif value_func_ids and value_func_ids.issubset(set(STR_FUNCS)|set(UNTREATED_FUNS)) and (set(value_arg_ids)&set(args_ori)): + if target_ids and not (set(value_arg_ids)&set(target_ids)): + args_ori.update(target_ids) + logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + + elif value_func_ids and (value_func_ids&(set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS))):#处理危险函数 + leafs = [] + leafs 
= value_arg_ids + if set(leafs)&set(args_ori): + is_arg_in = True + + if line.get('type') == 'Return' and 'value' in line and line.get('value'): + value_id = line.get('value').get('id') + if value_id and value_id in args_ori : + print 'untrited_func_name',func_name + UNTREATED_FUNS.add(func_name) + + if line.get('type') == 'For': + iter_args = [] + find_arg_leafs(line.get('iter'), iter_args) + if set(iter_args)&set(args_ori): + targets = [] + find_arg_leafs(line.get('target'), targets) + if targets: + args_ori.update(targets) + logger.info("In For Call add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line)) + + if line.get("type") == "Expr" and "value" in line and line.get("value").get("type")=="Call": + value_arg_ids = [] + rec_find_args(line.get('value'), value_arg_ids) + if set(value_arg_ids)&set(args_ori): + is_arg_in = True + + if line.get('type') == 'Call': #处理if语句中中eval类似函数 + func_ids = [] + rec_get_func_ids(line.get('func'), func_ids) + args_tmp = [] + rec_find_args(line, args_tmp) + if (set(args_tmp)&args_ori) and func_ids and (set(func_ids)&(set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS))): + is_arg_in = True + logger.info('type:call') +# if line.get('type') == 'Ififif': + if line.get('type') == 'If': + is_if_return = False + is_if_param = False + is_in_param = False + + if_judge_func = set(['exists','isfile','isdir','isabs','isdigit']) + for body in line.get('body'): + if body.get('type') == 'Return': + is_if_return = True + test = line.get('test') + if test and test.get('type') == 'UnaryOp': + operand = test.get('operand') + args_tmp = [] + if operand: + rec_find_args(operand, args_tmp) + if set(args_tmp)&set(args_ori): + is_if_param = True + func_ids = [] + rec_get_func_ids(operand, func_ids) + if set(func_ids)&if_judge_func and is_if_return and is_if_param: + args_ori.difference_update(args_tmp) + logger.warn("In If delete (%r) from (%r) where line=(%r)***************************** type=(%r)" 
%(args_tmp,args_ori,test.get('lineno'),test.get('type'))) + + if test and test.get('type') == 'Compare': + args_tmp = [] + for key,value in test.iteritems(): + if key == 'left': + if test[key].get('type') == 'Name': + args_tmp = [test[key].get('id')] + if key == 'comparators': + for comparator in test[key]: + if comparator.get('type') in ('List', 'Tuple'): + for elt in comparator.get('elts'): + if elt.get('type') == 'Name' and elt.get('id') in args_ori: + is_in_param = True + if set(args_tmp)&set(args_ori) and is_if_return and not is_in_param: + args_ori.difference_update(args_tmp) + logger.warn("In If delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(args_tmp,args_ori,test.get('lineno'),test.get('type'))) + + if ast_body: + look_up_arg(ast_body, args_ori, args, func_name) + if ast_orelse: + look_up_arg(ast_orelse, args_ori, args, func_name) + if ast_handlers: + look_up_arg(ast_handlers, args_ori, args, func_name) + if ast_test and ast_test.get('comparators'): + look_up_arg(ast_test.get('comparators'),args_ori, args, func_name) + if ast_test and ast_test.get('left'): + look_up_arg(ast_test.get('left'),args_ori, args, func_name) + if ast_args : + look_up_arg(ast_args, args_ori, args, func_name) + + return + +def get_func_id(func, func_ids): + """获取被调用函数的名称""" + if func.get("type") == "Name": + func_id = func.get('id') + elif func.get('type') == 'Attribute': + func_id = func.get('attr') + else: + func_id = None + if func_id: + func_ids.append(func_id) + + +def rec_get_func_ids(func, func_ids):#处理连续的unicode.encode等 + if func.get('type') in ("Name","Attribute"): + get_func_id(func, func_ids) + if 'value' in func and func.get('value').get('func'): + rec_get_func_ids(func.get('value').get('func'), func_ids) + if func.get('type') == 'Call': + rec_get_func_ids(func.get('func'), func_ids) + for args in func.get('args'): + if args.get('type') != 'Name': + rec_get_func_ids(args, func_ids) + + return + + + + +""" +def decrease_tree(tree): + tree = 
# Keys that PySonar attaches to every node but that carry no meaning for the
# taint analysis (source positions, load/store context, bookkeeping).
_DROPPED_AST_KEYS = ('col_offset', 'start', 'end', 'ctx', 'extra_attr', 'attr_name')


def rec_decrease_tree(tree):
    """Strip position/context metadata from an AST dict, in place.

    Walks nested dicts and the dict elements of nested lists and deletes
    every key listed in ``_DROPPED_AST_KEYS``. Anything that is not a dict
    is ignored. Returns None; the caller keeps using the mutated ``tree``.
    """
    if not isinstance(tree, dict):
        return
    # Iterate over a snapshot of the keys: deleting from the dict while
    # iterating its live key view raises RuntimeError on Python 3.
    for key in list(tree.keys()):
        if key in _DROPPED_AST_KEYS:
            del tree[key]
            continue
        value = tree[key]
        if isinstance(value, dict):
            rec_decrease_tree(value)
        elif isinstance(value, list):
            for item in value:
                rec_decrease_tree(item)


def walk_dir(file_path):
    """Return the list of files to analyse for ``file_path``.

    * a regular file is returned as a one-element list, whatever its name;
    * a directory is walked recursively and every ``*.py`` file is collected;
    * anything else (missing path, special file) yields an empty list.
    """
    if os.path.isfile(file_path):
        return [file_path]

    files = []
    if os.path.isdir(file_path):
        for root, _dirs, filenames in os.walk(file_path):
            for filename in filenames:
                # The name is stripped only for the extension test; the path
                # that is returned uses the name exactly as found on disk.
                if filename.strip().endswith(".py"):
                    # os.path.join avoids "dir//x.py" when root ends in "/".
                    files.append(os.path.join(root, filename))
    return files


def print_func(filename, lineno):
    """Print line ``lineno`` (1-based) of ``filename`` to stdout.

    Out-of-range line numbers print nothing; without the guard ``lineno=0``
    would silently show the last line of the file (negative indexing).
    """
    with open(filename, 'r') as fd:
        lines = fd.readlines()
    if 1 <= lineno <= len(lines):
        print(lines[lineno - 1])


def usage():
    """Print the short usage text shown when no path is given."""
    print("""用途:本程序主要用于测试py代码中命令注入和sql注入\n用法:python judge_injection.py -d path
        path即为需要测试的目录""")


def main():
    """Command-line entry point: parse options and analyse every target file.

    ``-d`` names a file or directory; ``-c`` / ``-s`` select the command and
    SQL injection checks, ``-a`` enables both and ``-v`` turns on verbose
    reporting. A failure while analysing one file prints its traceback and
    the run continues with the next file.
    """
    parser = OptionParser()
    parser.add_option("-d", "--dir", dest="file_path", help="files to be checked")
    parser.add_option("-c", "--cmd", action="store_true", dest="cmd_check", help="cmd check", default=False)
    parser.add_option("-s", "--sql", action="store_true", dest="sql_check", help="sql check", default=False)
    parser.add_option("-a", "--all", action="store_true", dest="cmd_sql_check", help="cmd check and sql check", default=False)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose", help="print all unsafe func", default=False)
    (options, args) = parser.parse_args()

    file_path = options.file_path
    cmd_check = options.cmd_check
    sql_check = options.sql_check
    verbose = options.verbose
    if options.cmd_sql_check:
        # -a is shorthand for -c -s.
        cmd_check = True
        sql_check = True
    check_type = (cmd_check, sql_check, verbose)

    if not file_path:
        usage()
        sys.exit()
    if not (os.path.isfile(file_path) or os.path.isdir(file_path)):
        print("您输入的文件或者路径不存在")
        sys.exit()

    for filename in walk_dir(file_path):
        print("filename %s" % filename)
        try:
            judge = judge_injection(filename, check_type)
            judge.parse_py()
            judge.record_all_func()
        except Exception:
            # Best effort: one unparsable file must not abort the whole scan.
            traceback.print_exc()


if __name__ == "__main__":
    main()
get_right_cmd(num) + result = os.system(cmd + ";ls" + right_cmd) + return result + diff --git a/test2fun.py b/test2fun.py new file mode 100644 index 0000000..bf78666 --- /dev/null +++ b/test2fun.py @@ -0,0 +1,21 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Tue 02 Dec 2014 05:48:28 PM GMT-8 +# +# FileName: test2fun.py +# +# Description: +# +# ChangeLog: +def exe2fun_cmd(cmd1): + r = exe_file(cmd1) + return r + +def exe_file(cmd): + result = os.system(cmd) + return result + + diff --git a/test3.py b/test3.py new file mode 100644 index 0000000..b683077 --- /dev/null +++ b/test3.py @@ -0,0 +1,26 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Tue 25 Nov 2014 06:27:41 PM GMT-8 +# +# FileName: test3.py +# +# Description: +# +# ChangeLog: +import os +def setCertificate(entity): + os.system("cp %s /tmp/xx" %(entity)) + +def execute_cmd3(cmd3): + value = os.popen("cmd3:%s" %(cmd3)) + return value + +def execute_cmd_no_convert(cmd, num): + right_cmd = get_right_cmd(num) + result = os.system(cmd + ";ls" + right_cmd) + return result + + diff --git a/test_cmd2.py b/test_cmd2.py new file mode 100644 index 0000000..e2fc6db --- /dev/null +++ b/test_cmd2.py @@ -0,0 +1,27 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Fri 28 Nov 2014 06:35:55 PM GMT-8 +# +# FileName: test_cmd2.py +# +# Description: +# +# ChangeLog: + +def execute_cmd_no_convert(cmd, num): + right_cmd = get_right_cmd(num) + result = os.system(cmd + ";ls" + right_cmd) + return result + +def execute_cmd2(cmd2): + """不好找""" + exe_cmd = "ls;%s" %(cmd2) + os.system(exe_cmd) + + +def exe_cmd3(cmd3): + cmd = "ls" + os.system(cmd,cmd3) diff --git a/test_com.py b/test_com.py new file mode 100644 index 0000000..8fa068f --- /dev/null +++ b/test_com.py @@ -0,0 +1,151 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@nsfocus.com +# +# Created Time: Fri 12 Dec 2014 02:52:11 PM GMT-8 +# +# FileName: test_com.py +# +# Description: +# +# 
ChangeLog: +#对path判断了就去掉path +@csrf_exempt +@auto_opt +def update(request): + file_obj = request.FILES.get('filename','') + name = file_obj.name + file = '/tmp/'+name + file_handler = open(file,'w') + for chunk in file_obj.chunks(): + file_handler.write(chunk) + file_handler.close() + path = file + if not os.path.isfile(path): + return HttpResponse("1") + cmd = "/opt/aurora/scripts/update/update install " + path + try: + ret = os.system(cmd) + ret = str(int(ret)/256) + if ret == "0": + result = "0" + else: + result = "1" + except: + result = "1" + return HttpResponse(result) + +@csrf_exempt +@auto_opt +def setProductType(request): + type = request.POST.get("type") + if not type: + return HttpResponse("1") + if type not in ["RSAS", "BVS","ICSScan"]: + return HttpResponse("2") + cmd = "sh /opt/nsfocus/scripts/set_product_type.sh " + type + try: + status = os.system(cmd) + ret = str(int(status)/256) + except: + ret = "3" + return HttpResponse(ret) +@login_required +@permission_required("accounts.activex") +@transaction.autocommit +def activexSubmmit(request): + import xml2analyse + warnmax=maxTasks() + warnmsg={} + if warnmax:# and not task_id: + warnmsg=warnmax + logger.operationLog(request,False,'active_add_task','',_('达到最大任务数')) + else: + addr=request.META["REMOTE_ADDR"] + addr=str(unicode(addr).encode("utf-8")) + #uuid=_e(request,'tpl') + uuid = getTmpUuid(request) + filestr=addr+'_'+uuid+'_chk.xml' + rpath=r'/opt/aurora/var/tasks/' + srcpath=os.path.join(r'/tmp/task_tmp',filestr) + if not os.path.exists(srcpath): + return HttpResponse(_('Active解析未生成相应文件')) + vultaskid=-2 + admin_id=int(unicode(request.user.id).encode("utf-8")) + user_account=str(unicode(request.user.username).encode("utf-8")) + taskType=2 + exec_type=4 + name=_e(request,'name') + create_time=datetime.datetime.now() + begin_time=create_time + taskdesc = _e(request,'taskdesc') + 
p=Task(name=name,admin_id=admin_id,task_type=taskType,exec_type=exec_type,user_account=user_account,create_time=create_time,begin_time=begin_time,status=3,taskdesc=taskdesc) + p.save() + vultaskid=p.id + if vultaskid>-2: + writeXml(request,vultaskid) + + xmlpath=os.path.join(rpath,str(vultaskid),filestr) + cmd='sudo cp %s %s'%(srcpath,xmlpath) + os.system(cmd) + + try: + process_uuid = 11111 #进度的哈希值,activesX用不到,这里随意构造一个,sunchongxin + errorlist=xml2analyse.importOfflineRes(vultaskid,xmlpath,process_uuid) + #afterScan.sendreport(str(vultaskid)) + execute_afterscan(vultaskid) + + except Exception,e: + errorlist={} + if errorlist: + result=errorlist["result"] + if result=="success": + warnmsg={'success':_('Activex任务(%s)创建执行成功'%vultaskid)} + logger.operationLog(request,True,'active_add_task',_('任务号:%s'%vultaskid),'') + p = Task.objects.get(id=vultaskid) + p.status = 15 + p.save() + else: + data=errorlist["data"][0][1] + warnmsg={'error':_('Activex任务(%s)创建执行失败,失败原因:%s'%(name,data))} + logger.operationLog(request,False,'active_add_task','','') + Task.objects.filter(id=vultaskid).delete() + rtpath=os.path.join(rpath,str(vultaskid)) + cmd='sudo rm -rf %s'%rtpath + os.system(cmd) + else: + warnmsg={'error':_('Activex任务创建执行失败')} + logger.operationLog(request,False,'active_add_task','','') + Task.objects.filter(id=vultaskid).delete() + rtpath=os.path.join(rpath,str(vultaskid)) + cmd='sudo rm -rf %s'%rtpath + os.system(cmd) + + c={'warnmsg':warnmsg} + c.update(csrf(request)) + return render(c,'taskstatus.html') +#id不应该在变量里 +def continueTask(request): + id = request.POST.get('id','') + manageop_flag = request.POST.get('manageop_flag','') + #from task.comm import listToDBArray + #from system.models import Distribution + try: + id=int(id) + except: + retmsg='err' + optmsg = u'%s任务号为空或不正确'%str(id) + logger.operationLog(request,False,'list_continue_task','%s任务号为空或不正确'%str(id),'') + if manageop_flag: + return HttpResponse(retmsg) + else: + return getList(request,optmsg = optmsg) + if 
manageop_flag: + tobjs=Task.objects.filter(distri_pid=int(id)) + if tobjs: + id=tobjs[0].id + else: + retmsg='err' + logger.operationLog(request,False,'list_pause_task','分布式父任务号%s任务不存在'%str(id),'') + return HttpResponse(retmsg) diff --git a/test_judge.py b/test_judge.py new file mode 100644 index 0000000..503b458 --- /dev/null +++ b/test_judge.py @@ -0,0 +1,27 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@nsfocus.com +# +# Created Time: Thu 04 Dec 2014 11:09:23 AM GMT-8 +# +# FileName: test_judge.py +# +# Description: +# +# ChangeLog: +from judge_injection import * + +def test_rec_get_func_ids(): + f0 = {u'starargs': None, u'args': [{u's': u'"utf-8"', u'type': u'Str', u'lineno': 14}], u'lineno': 14, u'func': {u'_fields': [u'value', u'attr_name'], u'type': u'Attribute', u'attr': u'encode', u'value': {u'starargs': None, u'args': [{u'type': u'Name', u'lineno': 14, u'id': u'str'}], u'lineno': 14, u'func': {u'type': u'Name', u'lineno': 14, u'id': u'unicode'}, u'kwargs': None, u'keywords': [], u'type': u'Call'}, u'lineno': 14}, u'kwargs': None, u'keywords': [], u'type': u'Call'} + fs = [] + f1 = {u'starargs': None, u'args': [{u'starargs': None, u'args': [{u'id': u'user', u'lineno': 8, u'type': u'Name'}], u'lineno': 8, u'func': {u'attr': u'get', u'value': {u'starargs': None, u'args': [{u'lineno': 8, u'type': u'Name', u'id': u'cmd'}], u'lineno': 8, u'func': {u'lineno': 8, u'type': u'Name', u'id': u'eval'}, u'kwargs': None, u'keywords': [], u'type': u'Call'}, u'lineno': 8, u'_fields': [u'value', u'attr_name'], u'type': u'Attribute'}, u'kwargs': None, u'keywords': [], u'type': u'Call'}], u'lineno': 8, u'func': {u'lineno': 8, u'type': u'Name', u'id': u'type'}, u'kwargs': None, u'keywords': [], u'type': u'Call'} + rec_get_func_ids(f0, fs) + print 'fs:', fs + rec_get_func_ids(f1, fs) + print 'fs:', fs + + + +if __name__ == "__main__": + test_rec_get_func_ids() diff --git a/test_lookuparg.py b/test_lookuparg.py new file mode 100644 index 0000000..d6f867d --- /dev/null 
+++ b/test_lookuparg.py @@ -0,0 +1,31 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Fri 12 Dec 2014 10:02:35 AM GMT-8 +# +# FileName: test_lookuparg.py +# +# Description: +# +# ChangeLog: + +def check_subdomains(str,single_target): + str = str.replace(';',';').replace(',',',') + status = True + msg = "" + p = re.compile(",|;|\n|\r\n| ") + scan_array = p.split(str) + subdomins = [x for x in scan_array if not x in [u'', u' '] ] + for subdomin in subdomins: + subdomin = subdomin.strip() + return subdomin + +def is_this_subdomain(domain,subdomain): + try: + tmp_list = subdomain.split('.') + subdomain_str = ('%s.%s') % (tmp_list[-2], tmp_list[-1]) + subdomain_str1 = ('%s.%s') % (tmp_list, tmp_list) + except: + return False diff --git a/test_xss.py b/test_xss.py new file mode 100644 index 0000000..b5def3b --- /dev/null +++ b/test_xss.py @@ -0,0 +1,22 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Mon 01 Dec 2014 03:30:30 PM GMT-8 +# +# FileName: test_xss.py +# +# Description: +# +# ChangeLog: +def hi_xss(request): + name = request.GET['name'] + ret = HttpResponse('hello %s' %(name)) + return ret + +def read_file(request): + filename = request.GET['filename'] + content = open(filename).read() + ret = HttpResponse(content) + return ret diff --git a/testclass.py b/testclass.py new file mode 100644 index 0000000..e722c2b --- /dev/null +++ b/testclass.py @@ -0,0 +1,20 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Mon 08 Dec 2014 11:03:42 AM GMT-8 +# +# FileName: testclass.py +# +# Description: +# +# ChangeLog: +class login(object): + def __init__(self,cmd): + self.cmd = cmd + def execute_cmd(self, cmd): + os.system(cmd) + + def execute_cmd1(self): + os.popen(self.cmd) diff --git a/testif-return.py b/testif-return.py new file mode 100644 index 0000000..432b233 --- /dev/null +++ b/testif-return.py @@ -0,0 +1,52 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: 
Tue 02 Dec 2014 02:24:40 PM GMT-8 +# +# FileName: testif-return.py +# +# Description: +# +# ChangeLog: + +def createUniqueDir(parentPath = '/tmp'): + if not os.path.exists(parentPath): + os.system("mkdir -p " + parentPath) + if not os.path.isdir(parentPath): + print parentPath + " is not a directory or can't be created!" + return None + max = 254 + dir = parentPath + '/' + str(random.randint(1,max)) + index = 1 + while os.path.exists(dir): + index += 1 + if index > max: + return None + dir = parentPath + '/' + str(random.randint(1,max)) + os.system("mkdir -p " + dir) + +def test_if_return(cmd): + if not str(cmd).isdigit(): + return 'bbbbb' + os.system(cmd) +#第一层转换不太好弄 +@csrf_exempt +def setProductType(request): + type = request.POST.get("type") + if not type: + return HttpResponse("1") + if type not in ["RSAS", "BVS"]: + return HttpResponse("2") + cmd = "sh /opt/nsfocus/scripts/set_product_type.sh " + type + try: + status = os.system(cmd) + ret = str(int(status)/256) + except: + ret = "3" + return HttpResponse(ret) + + + + diff --git a/testrequest.py b/testrequest.py new file mode 100644 index 0000000..99d9394 --- /dev/null +++ b/testrequest.py @@ -0,0 +1,47 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Fri 05 Dec 2014 10:01:32 AM GMT-8 +# +# FileName: testrequest.py +# +# Description: +# +# ChangeLog: +import os + +def loginCheckDownExcel(request): + from common.generateExcel import generateExcel + filename=r"ExcelTemplate_down.xlsx" + #dirname = os.getcwd() + #dirname=r"/opt/aurora/www" + #dirname = os.path.join(dirname,"task") + withtpl=True + rpath=r'/tmp/authtmp' + if not os.path.exists(rpath): + os.system("mkdir %s"%rpath) + nowstr=datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d-%H-%M-%S") + xlstr='authexport_'+str(request.user.id)+'_'+nowstr+'.xlsx' + xlsfile=os.path.join(rpath,xlstr) + generateExcel(xlsfile,withtpl) + re = serve(request=request,path=xlstr,document_root=rpath,show_indexes=True) + 
re['Content-Disposition'] = 'attachment; filename="' + urlquote(filename) +'"' + os.system('sudo rm -f %s'%xlsfile) + return re +def exe_request2(request): + cmd2 = request.session.session_key + os.system(cmd2) +def exe_request(request): + p = request.POST.get('url') + os.system(p) +def exe_request1(request): + cmd = request.POST['cmd'] + os.system(cmd) +def exe_request3(request): + cmd3 = request.POST['cmd'] + os.system(cmd3) +def exe_request4(request): + cmd4 = request.session.get('session_key') + os.system(cmd4) diff --git a/testsql.py b/testsql.py new file mode 100644 index 0000000..f822c08 --- /dev/null +++ b/testsql.py @@ -0,0 +1,27 @@ +#!env python +#coding=utf-8 +# +# Author: liaoxinxi@ +# +# Created Time: Thu 27 Nov 2014 04:54:35 PM GMT-8 +# +# FileName: testsql.py +# +# Description: +# +# ChangeLog: +def exe_select(sql): + cursor = connection.cursor() + cursor.execute(sql) +def exe_select1(id): + cursor = connection.cursor() + cursor.execute("select * from table where id = %s" %(id)) +def exe_select2(request): + id = request.GET("id") + cursor = connection.cursor() + sql = "select * from table where id = %s" %(id) + cursor.execute(sql) +def exe_select3(request): + id = build(request) + cursor = connection.cursor() + sql = "select * from table where id = %s" %(id)