Files
py-security-audit-tool/judge_injection.py
2014-12-15 22:26:13 -08:00

756 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
#coding=utf-8
#
# Author: liaoxinxi
#
# Created Time: Fri 21 Nov 2014 10:49:03 AM GMT-8
#
# FileName: judge_injection.py
#
# Description:
#
# ChangeLog:
import dump_python
import logging
import color_log
import json
import os
import re
import traceback
import sys
from optparse import OptionParser
from collections import OrderedDict
# NOTE(review): `logger` is rebound four times in a row; only the object
# returned by the last call (logging.ERROR) stays bound to the name. Whether
# the earlier init_log calls leave handlers or other side effects behind
# depends on color_log, which is not visible here -- confirm before removing.
logger = color_log.init_log(logging.DEBUG)
logger = color_log.init_log(logging.INFO)
logger = color_log.init_log(logging.WARNING)
logger = color_log.init_log(logging.ERROR)
# When true, judge_injection.__init__ writes the pruned tree to <filename>.json.
DEBUG = False
# Shared analysis state. parse_func rebinds these through `global` statements
# and look_up_arg updates args_ori (in place) and is_arg_in while walking a
# function body.
args_ori = set([])
is_arg_in = False
is_arg_return_op = False
# Callee names treated as command-execution sinks. Matching is done on the
# bare function/attribute name (see get_func_id), not on a dotted path.
UNSAFE_FUNCS = ["system", "popen", "call", "Popen", "getoutput", "getstatusoutput", \
"eval", "spawnl", 'popen2', 'popen3', 'popen4' ]
# Names of analysed functions found to pass their own parameters to a command
# sink (filled by parse_func); they are then treated as sinks themselves.
FILE_UNSAFE_FUNCS = set()
# Same idea for SQL sinks (filled by parse_func).
FILE_SQL_UNSAFE_FUNCS = set()
# Callee names whose result is still considered attacker-controlled.
# look_up_arg adds a function's name here when it returns a tainted name.
UNTREATED_FUNS = set(['open','readline'])
# String helpers treated as keeping the taint of their input.
STR_FUNCS = ['str','unicode','encode','strip','rstrip','lstrip','lower','upper','split','splitlines', 'replace','join']
# Not referenced anywhere in the visible source.
SAFE_FUNCS = []
# NOTE(review): parse_func spells out ['execute', 'raw'] inline instead of
# reading this list.
SQL_FUNCS = ['execute', 'raw']
# Incremented by parse_func each time it reaches the point of recording a
# command finding.
CMD_COUNT = 0
class judge_injection(object):
    """Flag likely command/SQL injection points in one Python source file.

    The file is parsed into a JSON syntax tree by dump_python; every function
    definition is then scanned for calls to known sinks whose arguments can be
    traced back to the function's own parameters.
    """

    def __init__(self, filename, check_type):
        """Load and prune the syntax tree of ``filename`` and read its lines.

        filename   -- path of the Python file to analyse.
        check_type -- indexable of three flags: [0] run the command-injection
                      check, [1] run the SQL-injection check, [2] verbose
                      printing of every sink call that has arguments.
        """
        try:
            self.tree = dump_python.parse_json(filename)
        except Exception as e:
            # Best effort: an unparsable file is analysed as an empty tree.
            self.tree = "{}"
            print(e)
        self.tree = json.loads(self.tree)
        rec_decrease_tree(self.tree)
        if DEBUG:
            # Debug aid only; a failed dump must not abort the analysis.
            try:
                with open(filename + ".json", 'w') as fd:
                    json.dump(self.tree, fd)
            except (IOError, OSError) as e:
                logger.warn("can not dump tree of %s: %s" % (filename, e))
        # An empty tree (failed parse) carries no "filename"; fall back to the
        # path we were given instead of passing None to open() below.
        self.filename = self.tree.get("filename") or filename
        # rec_decrease_tree removes the "start" key, so this is normally None.
        self.start = self.tree.get("start")
        self.body = self.tree.get("body")
        self.funcs = []
        self.func_lines = {}  # function name -> statements containing calls
        self.check_type = check_type
        with open(self.filename, 'r') as fd:
            self.lines = fd.readlines()
        self.unsafe_func = set()  # dangerous functions defined in this file
        self.untreated_func = set()  # functions whose return value stays controllable
        self.record_unsafe_func = OrderedDict({})  # findings to print, keyed by line number
        self.record_param = {}
        logger.debug("filename:%s" % (self.filename,))

    def get_risk_func(self):
        """Return the names of the built-in dangerous (command) functions."""
        return ["system", "popen", "call", "Popen", "getoutput", "getstatusoutput",
                "eval", "spawnl", 'popen2', 'popen3', 'popen4']

    def get_func_objects(self, body):
        """Append every FunctionDef in ``body`` to self.funcs.

        Class bodies are searched recursively; functions nested inside other
        functions are not collected. A missing body (None) is ignored.
        """
        for obj in body or []:  # one entry per statement
            if obj.get("type") == "FunctionDef":
                self.funcs.append(obj)
                logger.debug("func:%r" % (obj,))
            elif obj.get('type') == 'ClassDef':
                self.get_func_objects(obj.get('body'))
        return

    def get_func_lines(self, func, func_name):
        """Collect into self.func_lines[func_name] the statements with calls.

        ``func`` may be a node with a 'body', a list of nodes, or a single
        Call node. A statement is collected when it is a Call itself or when
        its 'value' holds a 'func'; otherwise its body / orelse / handlers /
        test operands / args are searched recursively.
        """
        if isinstance(func, dict) and 'body' in func:
            lines = func.get('body')
        elif isinstance(func, list):
            lines = func
        elif isinstance(func, dict) and func.get('type') == 'Call':
            lines = [func]
        else:
            lines = []
        collected = self.func_lines.setdefault(func_name, [])
        for line in lines:
            value = line.get('value')
            if (value and "func" in value) or line.get('type') == 'Call':
                collected.append(line)
                continue
            ast_body = line.get('body')
            ast_orelse = line.get('orelse')
            ast_handlers = line.get('handlers')
            ast_test = line.get('test')
            ast_args = line.get('args')
            if ast_body:
                self.get_func_lines(ast_body, func_name)
            if ast_orelse:
                self.get_func_lines(ast_orelse, func_name)
            if ast_handlers:
                self.get_func_lines(ast_handlers, func_name)
            if ast_test and ast_test.get('comparators'):
                self.get_func_lines(ast_test.get('comparators'), func_name)
            if ast_test and ast_test.get('left'):
                self.get_func_lines(ast_test.get('left'), func_name)
            if ast_args:
                self.get_func_lines(ast_args, func_name)
        return

    def parse_func(self, func, analyse_all):
        """Analyse one function definition and record suspicious sink calls.

        func        -- FunctionDef node.
        analyse_all -- accepted for the two-pass driver in parse_py; the
                       analysis itself does not read it.

        Rebinds the module globals args_ori, is_arg_in, is_arg_return_op and
        CMD_COUNT, and may add ``func``'s name to FILE_UNSAFE_FUNCS or
        FILE_SQL_UNSAFE_FUNCS.
        """
        global args_ori
        global is_arg_in
        global CMD_COUNT
        global is_arg_return_op
        is_arg_return_op = False
        arg_leafs = []
        func_name = func.get("name")
        logger.debug("function_name:%s" % (func_name,))
        args_ori = set([arg.get("id") for arg in func.get('args').get("args")])
        logger.debug("args:%s" % str(args_ori))
        # Start from a clean list: parse_py analyses every function twice and
        # different classes may define methods with the same name, so reusing
        # the old list would mix in statements that do not belong to ``func``.
        self.func_lines[func_name] = []
        self.get_func_lines(func, func_name)
        lines = self.func_lines[func_name]
        logger.debug("func_lines:%r" % (lines,))
        # Grow/shrink args_ori by following assignments inside the function.
        look_up_arg(func, args_ori, arg_leafs, func_name)
        self.record_param[func_name] = args_ori
        # Examine every statement that contains a call.
        for line in lines:
            arg_leafs = []
            is_arg_in = False
            value = line.get("value")
            lineno = line.get("lineno")
            value_is_call = bool(value) and value.get("type") == "Call"
            if not (value_is_call or line.get('type') == 'Call'):
                continue
            logger.debug("value:%r" % (value,))
            # The call under inspection is either the statement's value or
            # the statement itself; its arguments are read from the same node.
            line_func = value if value_is_call else line
            value_args = line_func.get('args')
            func_ids = []
            rec_get_func_ids(line_func, func_ids)
            func_ids = set(func_ids)
            find_all_leafs(value_args, arg_leafs)
            logger.info("arg_leafs:%r" % (arg_leafs,))
            logger.info("func_ids:%r" % (func_ids,))
            tainted = set(arg_leafs) & set(self.record_param.get(func_name))
            if self.check_type[0] and func_ids and (func_ids & (set(UNSAFE_FUNCS) | set(FILE_UNSAFE_FUNCS))) and value_args:
                if self.check_type[2] and arg_leafs:
                    print("CMD--FILE:%s,FUNCTION:%s,LINE:%s" % (self.filename, func_name, lineno))
                if tainted:
                    # Exact comparison: the former `not in ("__init__")` was a
                    # substring test on a string and also skipped functions
                    # named e.g. "init" or "_".
                    if not is_arg_return_op and func_name != "__init__":
                        FILE_UNSAFE_FUNCS.add(func_name)
                        self.record_unsafe_func.setdefault(lineno, {'func_name': func_name, 'args': args_ori, 'func_ids': func_ids})
                        CMD_COUNT = CMD_COUNT + 1
            if self.check_type[1] and func_ids and (func_ids & (set(SQL_FUNCS) | FILE_SQL_UNSAFE_FUNCS)) and value_args:
                if self.check_type[2] and arg_leafs:
                    print("SQL--FILE:%s,FUNCTION:%s,LINE:%s" % (self.filename, func_name, lineno))
                if tainted:
                    print(self.lines[lineno - 1])
                    FILE_SQL_UNSAFE_FUNCS.add(func_name)
                    self.record_unsafe_func.setdefault(lineno, {'func_name': func_name, 'args': args_ori, 'func_ids': func_ids})

    def parse_py(self):
        """Analyse every function of the file, twice.

        The second pass lets calls to functions that the first pass added to
        FILE_UNSAFE_FUNCS / FILE_SQL_UNSAFE_FUNCS be recognised as sinks.
        """
        self.get_func_objects(self.body)
        for func in self.funcs:
            self.parse_func(func, True)
        for func in self.funcs:
            self.parse_func(func, False)

    def record_all_func(self):
        """Print and log every recorded finding followed by its source line."""
        for key, value in self.record_unsafe_func.items():
            message = "maybe injected File:%s,function:%s,line:%s,dangerous_func:%r" % (
                self.filename, value.get('func_name'), key, value.get('func_ids'))
            print(message)
            logger.error(message)
            print(self.lines[key - 1])
def find_all_leafs(args, leafs):
    """Gather the leaf identifiers of every node in ``args`` into ``leafs``.

    Each element of ``args`` is handed to find_arg_leafs in order; ``leafs``
    is extended in place and nothing is returned.
    """
    for node in args:
        find_arg_leafs(node, leafs)
def find_arg_leafs(arg, leafs):
"""Recursively collect into ``leafs`` the variable names that ``arg`` is built from.

``arg`` is a syntax-tree node (dict); ``leafs`` is a list extended in place.
The plural-style name is kept for historical reasons.

NOTE(review): indentation was lost in this copy of the file, so how the
statements of the Call branch nest (in particular the argument loop and
the non-Name callee check) must be confirmed against the original.
"""
fields = arg.get("_fields")
_type = arg.get('type')
# a.b -> follow the object the attribute is read from
if _type == "Attribute":
find_arg_leafs(arg.get('value'), leafs)
# plain variable: this is a leaf
if _type == "Name":
leafs.append(arg.get('id'))
if _type == 'Call':
func_ids = []
rec_get_func_ids(arg.get('func'), func_ids)
# Calls to string helpers, "untreated" functions and known sinks are looked
# through: the top-most name of the callee expression counts as a leaf.
if set(func_ids)&(set(STR_FUNCS)|set(UNTREATED_FUNS)|set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS)):
parent, topids = {}, []
rec_get_attr_top_id(arg.get('func'), parent, topids)
if topids:
leafs.append(topids[0])
for arg_item in arg.get('args'):
find_arg_leafs(arg_item, leafs)
# A callee that is not a bare name (e.g. obj.method) is searched as well.
if arg.get('func') and arg.get('func').get('type') != 'Name':
find_arg_leafs(arg.get('func'), leafs)
# x[...] -> follow the subscripted object
if _type == 'Subscript':
find_arg_leafs(arg.get('value'), leafs)
# Binary operations such as "fmt" % value or a + b
if _type == "BinOp" and fields:
if "right" in fields:
if arg.get('right').get('type') == "Name":
right_id = arg.get("right").get("id")
if right_id:
leafs.append(right_id)
elif arg.get('right').get('type') == 'Tuple':
for elt in arg.get('right').get('elts'):
find_arg_leafs(elt, leafs)
# Left operand without "_fields": take its id directly when it has one.
if "left" in fields and not arg.get("left").get("_fields"):
left_id = arg.get('left').get('id')
if left_id:
leafs.append(left_id)
# Left operand with "_fields": recurse into it.
if "left" in fields and arg.get("left").get("_fields"):
find_arg_leafs(arg.get("left"), leafs)
return
def is_arg_return(func, args_ori):
"""
Decide whether the function guards one of its parameters with a check that
returns early (for example testing whether it is a number or a file).

func     -- a node with a 'body' (dict) or a list of statements.
args_ori -- names of the parameters being tracked.

The result is stored in the module global is_arg_return_op; nothing is
returned. In the visible source the only call to this function is commented
out in judge_injection.parse_func.

NOTE(review): indentation was lost in this copy, so the nesting of the
statements that compute and test is_arg_return_op must be confirmed.
NOTE(review): `lines` is unbound when func is neither a dict nor a list.
"""
global is_arg_return_op
if isinstance(func, dict):
lines = func.get('body')
elif isinstance(func, list):
lines = func
for line in lines:
# is_arg_in is a local here: this function has no `global is_arg_in`.
is_return = False
is_arg_in = False
is_param = False
ast_body = line.get('body')
ast_orelse = line.get('orelse')
ast_handlers = line.get('handlers')
if line.get('type') == "If":
# Does the if-body contain a direct return?
for body in line.get('body'):
if body.get('type') == "Return":
is_return = True
test = line.get('test')
# `if not <expr>:` -> does <expr> mention a tracked parameter?
if line.get('test') and line.get('test').get('type') == "UnaryOp":
operand = line.get('test').get('operand')
if operand:
args = []
rec_find_args(line.get('test'), args)
if set(args)&set(args_ori):
is_arg_in = True
# `if a <op> b:` -> the left side must be a tracked name and the compared
# list/tuple must not contain a Name element.
elif test and test.get('type') == 'Compare':
args = []
for key,value in test.iteritems():
if key == 'left':
if test[key].get('type') == 'Name':
args = [test[key].get('id')]
if key == 'comparators':
for comparator in test[key]:
if comparator.get('type') in ("List", 'Tuple'):
for elt in comparator.get('elts'):
if elt.get('type') == 'Name':
is_param = True
if set(args)&set(args_ori) and not is_param:
is_arg_in = True
is_arg_return_op = is_return&is_arg_in
if is_arg_return_op:# return as soon as one guard is found
logger.info("is_arg_return:%r" %(line))
return
if ast_body:
is_arg_return(ast_body, args_ori)
# if ast_orelse:
# is_arg_return(ast_orelse, args_ori)
# if ast_handlers:
# is_arg_return(ast_handlers, args_ori)
def rec_find_args(operand, args):
    """Collect into ``args`` the variable names used by ``operand``.

    A list/tuple of nodes is passed to find_all_leafs. For a Call node the
    call arguments are searched and, when the callee has a 'value', that
    value is searched recursively. A UnaryOp is unwrapped and a BinOp is
    handed to find_arg_leafs. Anything else is ignored.
    """
    if isinstance(operand, (list, tuple)):
        find_all_leafs(operand, args)
        return
    if not isinstance(operand, dict):
        return
    node_type = operand.get('type')
    if node_type == 'Call':
        if "args" in operand:
            find_all_leafs(operand.get('args'), args)
        callee = operand.get('func')
        if "value" in callee:
            rec_find_args(callee.get('value'), args)
    elif node_type == 'UnaryOp':  # e.g. the operand of `not param`
        rec_find_args(operand.get('operand'), args)
    elif node_type == 'BinOp':
        find_arg_leafs(operand, args)
def rec_get_attr_top_id(func, parent, ids):
    """Find the top-most name of an attribute/call chain, e.g. ``request``.

    For ``request.POST.get`` the chain is walked from the outermost
    Attribute down to the Name node; the name ("request") is appended to
    ``ids``, which therefore receives at most one entry. Every Attribute or
    Call node passed on the way is merged into ``parent`` with dict.update,
    so ``parent`` ends up holding the keys of the innermost such node.
    Nodes of any other type stop the walk. Nothing is returned.
    """
    node = func
    while True:
        kind = node.get('type')
        if kind == 'Name':
            ids.append(node.get('id'))
            break
        if kind == 'Attribute':
            next_key = 'value'
        elif kind == 'Call':
            next_key = 'func'
        else:
            break
        parent.update(node)
        node = node.get(next_key)
    return
def look_up_arg(func, args_ori, args, func_name):
"""Recursively track which local names are derived from the function's parameters.

func      -- the function under test: a node with a 'body', a list of
             statements, or a single Call node.
args_ori  -- set of tracked ("tainted") names, initially the parameters of
             the function under test; updated in place.
args      -- arguments of the dangerous call; only handed on to the
             recursive calls, never read here.
func_name -- name of the function under test; added to UNTREATED_FUNS when
             the function returns a tracked name.

Also sets the module global is_arg_in when a tracked name reaches a call.

NOTE(review): indentation was lost in this copy of the file; the attachment
of the elif branches in the "Assign from Call" section in particular must be
confirmed against the original.
"""
"""
func 代表测试的函数,args_ori是要被测试的函数的参数args则是危险函数中的参数
"""
global is_arg_in
if isinstance(func, dict) and 'body' in func:
lines = func.get('body')
elif isinstance(func, list):
lines = func
elif isinstance(func, dict) and func.get('type') == 'Call':
lines = [func]
else:
lines = []
for line in lines:
# print 'look_up_arg:line:',line
ast_body = line.get('body')
ast_orelse = line.get('orelse')
ast_handlers = line.get('handlers')
ast_test = line.get('test')
ast_args = line.get('args')
# Names assigned by this statement (only targets that carry an 'id').
if line.get('type') == 'Assign':
target_ids = [target.get("id") for target in line.get("targets") if target.get('id') ]
else:
target_ids = []
# x = tracked_name
if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Name":
if target_ids and line.get("value").get("id") in args_ori:
args_ori.update(target_ids)
logger.info("In Assign,Name add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# x = a.b / x = a.b.c (plain attribute access)
if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Attribute":
value_func = line.get('value').get('value')
if value_func and value_func.get("type") == 'Name':
if target_ids and value_func.get("id") in args_ori:
args_ori.update(target_ids)
logger.info("In Assign,Attr add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
else:
topids = []
parent = {}
rec_get_attr_top_id(value_func, parent, topids)
if (set(topids)&set(args_ori)):
# For a chain rooted at `request` only GET/POST/FILES keep the taint;
# any other attribute chain clears the targets.
if topids and topids[0].lower() == 'request':
if parent and parent.get('type')=='Attribute' and parent.get('attr') in ['GET','POST','FILES']:
args_ori.update(target_ids)
logger.info("In Assign,Attr add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
elif parent and parent.get('type')=='Attribute':
args_ori.difference_update(set(target_ids))
logger.warn("In Assign,Attr delete (%r) from (%r) where line=(%r)***************************** line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# String concatenation / formatting: x = a + b, x = "..." % (a, b)
if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="BinOp":
# right = line.get('value').get('right')
# if right.get('type') == 'Tuple':
# rec_find_args(right.get('elts'))
leafs = []
find_arg_leafs(line.get("value"), leafs)
if (set(args_ori)&set(leafs)):
if target_ids:
args_ori.update(target_ids)
logger.info("In Assign,BinOp add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# List / set comprehensions: tainted when any iterated source is tracked
if line.get("type") == "Assign" and "value" in line and line.get("value").get("type") in ("ListComp","SetComp"):
generators = line.get('value').get('generators')
leafs = []
for generator in generators:
find_arg_leafs(generator.get('iter'), leafs)
if target_ids and (set(args_ori)&set(leafs)):
args_ori.update(target_ids)
logger.info("In Assign,ListComp,SetComp add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# Subscript (indexing / slicing): x = a.b[...]
if line.get('type') == 'Assign' and 'value' in line and line.get('value').get('type')=='Subscript':
value_type = line.get('value').get('value').get('type')
value_func_ids = []
rec_get_func_ids(line.get('value').get('value'), value_func_ids)
value_func_ids = set(value_func_ids)
value_arg_ids = []
find_arg_leafs(line.get('value').get('value'), value_arg_ids)
if value_type == 'Attribute':
if value_func_ids and value_func_ids.issubset((set(['POST','GET','FILES'])|set(STR_FUNCS))):
if target_ids and not (set(value_arg_ids)&set(target_ids)):
args_ori.update(target_ids)
logger.info("In Assign,Subscript add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# Assignment from a function call; str()/get()-style calls keep the taint
if line.get("type") == "Assign" and "value" in line and line.get("value").get("type")=="Call":
value_arg_ids = []
rec_find_args(line.get('value'), value_arg_ids)
value_func_ids = []
rec_get_func_ids(line.get('value').get('func'), value_func_ids)
value_func_ids = set(value_func_ids)
value_func_type = line.get("value").get('func').get('type')
value_func = line.get('value').get('func')
(topids, parent) = ([], {})
rec_get_attr_top_id(value_func, parent, topids)
if value_arg_ids or topids:
# Plain function call (the callee is a Name)
if value_func_type == 'Name' and (set(value_arg_ids)&set(args_ori)):
if target_ids and value_func_ids and value_func_ids.issubset((set(STR_FUNCS)|set(UNTREATED_FUNS))):
# args_ori = args_ori|set(target_ids)
args_ori.update(target_ids)
logger.info("In Assign,Call:Name add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
elif target_ids and value_func_ids and (value_func_ids&((set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS)))):
is_arg_in = True
elif target_ids:
# Any other call is treated as sanitising its result.
args_ori.difference_update(target_ids)
logger.warn("In Assign,Call delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# for target in target_ids:# handle the cmd=int(cmd) case
# args_ori.difference_update(target_ids)
# if target in args_ori:
# args_ori.discard(target)
# logger.info("arg_id,assign31:%r,args_ori:%r" %(value_arg_ids, args_ori))
elif value_func_type == 'Attribute':# attribute method call, e.g. reading a value from a dict
if (set(topids)&set(args_ori)):
if topids[0].lower() == 'request':
if parent and parent.get('type')=='Attribute' and parent.get('attr') in ['GET','POST','FILES']:
if target_ids and not (set(value_arg_ids)&set(target_ids)):
args_ori.update(target_ids)
logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,parent.get('lineno'), line))
elif parent and parent.get('type')=='Attribute':
args_ori.difference_update(set(target_ids))# remove target_ids
logger.warn("In Assign,Call:attr delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
elif value_func_ids and value_func_ids.issubset(set(STR_FUNCS)|set(UNTREATED_FUNS)) and (set(value_arg_ids)&set(args_ori)):
if target_ids and not (set(value_arg_ids)&set(target_ids)):
args_ori.update(target_ids)
logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
else:
if target_ids and not (set(value_arg_ids)&set(target_ids)):
args_ori.update(target_ids)
logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# handle r=unicode(s).encode('utf8')
elif value_func_ids and value_func_ids.issubset(set(STR_FUNCS)|set(UNTREATED_FUNS)) and (set(value_arg_ids)&set(args_ori)):
if target_ids and not (set(value_arg_ids)&set(target_ids)):
args_ori.update(target_ids)
logger.info("In Assign,Call:attr add (%r) to (%r) where line=(%r) type=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
elif value_func_ids and (value_func_ids&(set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS))):# dangerous function call
# NOTE(review): the empty list is immediately replaced by value_arg_ids.
leafs = []
leafs = value_arg_ids
if set(leafs)&set(args_ori):
is_arg_in = True
# `return tracked_name`: the function hands its input back to the caller
if line.get('type') == 'Return' and 'value' in line and line.get('value'):
value_id = line.get('value').get('id')
if value_id and value_id in args_ori :
print 'untrited_func_name',func_name
UNTREATED_FUNS.add(func_name)
# `for x in tracked:` taints the loop variable(s)
if line.get('type') == 'For':
iter_args = []
find_arg_leafs(line.get('iter'), iter_args)
if set(iter_args)&set(args_ori):
targets = []
find_arg_leafs(line.get('target'), targets)
if targets:
args_ori.update(targets)
# NOTE(review): this logs target_ids (always empty for a For statement),
# not the `targets` that were just added.
logger.info("In For Call add (%r) to (%r) where line=(%r) line=(%r)" %(target_ids,args_ori,line.get('lineno'), line))
# Bare call statement using a tracked name
if line.get("type") == "Expr" and "value" in line and line.get("value").get("type")=="Call":
value_arg_ids = []
rec_find_args(line.get('value'), value_arg_ids)
if set(value_arg_ids)&set(args_ori):
is_arg_in = True
if line.get('type') == 'Call': # handle eval-like calls inside if statements
func_ids = []
rec_get_func_ids(line.get('func'), func_ids)
args_tmp = []
rec_find_args(line, args_tmp)
if (set(args_tmp)&args_ori) and func_ids and (set(func_ids)&(set(UNSAFE_FUNCS)|set(FILE_UNSAFE_FUNCS))):
is_arg_in = True
logger.info('type:call')
# if line.get('type') == 'Ififif':
# Guard clauses: `if <check on tracked name>: return` stops tracking that name
if line.get('type') == 'If':
is_if_return = False
is_if_param = False
is_in_param = False
if_judge_func = set(['exists','isfile','isdir','isabs','isdigit'])
for body in line.get('body'):
if body.get('type') == 'Return':
is_if_return = True
test = line.get('test')
# `if not os.path.exists(x): return` and similar validators
if test and test.get('type') == 'UnaryOp':
operand = test.get('operand')
args_tmp = []
if operand:
rec_find_args(operand, args_tmp)
if set(args_tmp)&set(args_ori):
is_if_param = True
func_ids = []
rec_get_func_ids(operand, func_ids)
if set(func_ids)&if_judge_func and is_if_return and is_if_param:
args_ori.difference_update(args_tmp)
logger.warn("In If delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(args_tmp,args_ori,test.get('lineno'),test.get('type')))
# `if x <op> [...]: return` where the list/tuple holds no tracked name
if test and test.get('type') == 'Compare':
args_tmp = []
for key,value in test.iteritems():
if key == 'left':
if test[key].get('type') == 'Name':
args_tmp = [test[key].get('id')]
if key == 'comparators':
for comparator in test[key]:
if comparator.get('type') in ('List', 'Tuple'):
for elt in comparator.get('elts'):
if elt.get('type') == 'Name' and elt.get('id') in args_ori:
is_in_param = True
if set(args_tmp)&set(args_ori) and is_if_return and not is_in_param:
args_ori.difference_update(args_tmp)
logger.warn("In If delete (%r) from (%r) where line=(%r)***************************** type=(%r)" %(args_tmp,args_ori,test.get('lineno'),test.get('type')))
# Descend into nested statement lists and into the operands of the test.
if ast_body:
look_up_arg(ast_body, args_ori, args, func_name)
if ast_orelse:
look_up_arg(ast_orelse, args_ori, args, func_name)
if ast_handlers:
look_up_arg(ast_handlers, args_ori, args, func_name)
if ast_test and ast_test.get('comparators'):
look_up_arg(ast_test.get('comparators'),args_ori, args, func_name)
if ast_test and ast_test.get('left'):
look_up_arg(ast_test.get('left'),args_ori, args, func_name)
if ast_args :
look_up_arg(ast_args, args_ori, args, func_name)
return
def get_func_id(func, func_ids):
    """Append the name of the called function to ``func_ids``.

    A Name node contributes its 'id' and an Attribute node its 'attr'; any
    other node type, or an empty name, adds nothing.
    """
    name_key = {"Name": 'id', 'Attribute': 'attr'}.get(func.get("type"))
    func_id = func.get(name_key) if name_key else None
    if func_id:
        func_ids.append(func_id)
def rec_get_func_ids(func, func_ids):# handles chained calls such as unicode(...).encode(...)
"""Append to ``func_ids`` the function/method names referenced by ``func``.

func     -- syntax-tree node (dict): a callee expression or a Call node.
func_ids -- list extended in place; may receive duplicates.

NOTE(review): indentation was lost in this copy; whether the 'value' check
is nested under the Name/Attribute test must be confirmed.
"""
# Name / Attribute: record the id / attr itself.
if func.get('type') in ("Name","Attribute"):
get_func_id(func, func_ids)
# The node hangs off another call (e.g. unicode(s).encode): follow that
# call's callee as well.
if 'value' in func and func.get('value').get('func'):
rec_get_func_ids(func.get('value').get('func'), func_ids)
# Call: record the callee and search every argument that is not a bare name.
if func.get('type') == 'Call':
rec_get_func_ids(func.get('func'), func_ids)
for args in func.get('args'):
if args.get('type') != 'Name':
rec_get_func_ids(args, func_ids)
return
"""
def decrease_tree(tree):
tree = {k:v for k, v in tree.iteritems() if k not in ['col_offset', 'start', 'end', 'ctx', 'extra_attr']}
for key, value in tree.iteritems():
if isinstance(value, dict):
decrease_tree(value)
if isinstance(value, list):
for l in value:
if isinstance(l, dict):
decrease_tree(l)
return tree
"""
def rec_decrease_tree(tree):
    """Strip position/bookkeeping keys from a syntax-tree dump, in place.

    The keys 'col_offset', 'start', 'end', 'ctx', 'extra_attr' and
    'attr_name' are deleted from every dict reachable through dict values
    and lists (including lists nested in lists). ``tree`` may be a dict or a
    list; any other value is left untouched. Returns None.
    """
    dropped = ('col_offset', 'start', 'end', 'ctx', 'extra_attr', 'attr_name')
    if isinstance(tree, list):
        for item in tree:
            rec_decrease_tree(item)
    elif isinstance(tree, dict):
        # Iterate over a snapshot of the keys: entries are deleted on the way.
        for key in list(tree.keys()):
            if key in dropped:
                del tree[key]
            else:
                rec_decrease_tree(tree[key])
def walk_dir(file_path):
    """Return the Python files to analyse for ``file_path``.

    file_path -- a file or a directory.

    A regular file is returned as a one-element list whatever its extension.
    A directory is walked recursively and every file whose name (ignoring
    surrounding whitespace) ends in ".py" is returned, joined to its
    directory with os.path.join so that no separator is doubled. Anything
    else yields an empty list.
    """
    if os.path.isfile(file_path):
        return [file_path]
    files = []
    if os.path.isdir(file_path):
        for root, _dirs, filenames in os.walk(file_path):
            for filename in filenames:
                if filename.strip().endswith(".py"):
                    files.append(os.path.join(root, filename))
    return files
def print_func(filename, lineno):
    """Print line number ``lineno`` (1-based) of the file ``filename``.

    The line is printed as read, i.e. including its own line ending.
    """
    with open(filename, 'r') as source:
        content = source.readlines()
    wanted = content[lineno - 1]
    print(wanted)
def usage():
    """Print the short help text (purpose of the tool and the -d option)."""
    help_text = """用途本程序主要用于测试py代码中命令注入和sql注入\n用法python judge_injection.py -d path
path即为需要测试的目录"""
    print(help_text)
def main():
    """Command-line entry point.

    Options: -d/--dir path to check (file or directory), -c/--cmd command
    check, -s/--sql SQL check, -a/--all both checks, -v/--verbose print all
    unsafe calls. Without -d the usage text is printed and the program
    exits; a path that is neither a file nor a directory also exits. Errors
    raised while analysing a file are printed as a traceback and the next
    file is processed.
    """
    parser = OptionParser()
    parser.add_option("-d", "--dir", dest="file_path", help="files to be checked")
    switches = (
        ("-c", "--cmd", "cmd_check", "cmd check"),
        ("-s", "--sql", "sql_check", "sql check"),
        ("-a", "--all", "cmd_sql_check", "cmd check and sql check"),
        ("-v", "--verbose", "verbose", "print all unsafe func"),
    )
    for short_flag, long_flag, dest, help_text in switches:
        parser.add_option(short_flag, long_flag, action="store_true", dest=dest, help=help_text, default=False)
    (options, args) = parser.parse_args()

    target = options.file_path
    # --all switches both individual checks on.
    run_cmd = True if options.cmd_sql_check else options.cmd_check
    run_sql = True if options.cmd_sql_check else options.sql_check
    check_type = (run_cmd, run_sql, options.verbose)

    if not target:
        usage()
        sys.exit()
    if not (os.path.isfile(target) or os.path.isdir(target)):
        print("您输入的文件或者路径不存在")
        sys.exit()

    for filename in walk_dir(target):
        print("filename %s" % filename)
        try:
            judge = judge_injection(filename, check_type)
            judge.parse_py()
            judge.record_all_func()
        except Exception:
            traceback.print_exc()
if __name__ == "__main__":
    # Script entry point. Every input (path to scan, checks to run) comes
    # from the command line; the hard-coded developer paths and the unused
    # `files` list that used to be assigned here were never read.
    main()