Files
log4j-finder/log4j-finder.py
yunzheng c39e02cdbf Restored figlet
It's important :)
2021-12-17 21:37:26 +01:00

350 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
#
# file: log4j-finder.py
# author: NCC Group / Fox-IT / Research and Intelligence Fusion Team (RIFT)
#
# Scan the filesystem to find Log4j2 files that is vulnerable to Log4Shell (CVE-2021-44228)
# It scans recursively both on disk and inside Java Archive files (JARs).
#
# Example usage to scan a path (defaults to /):
# $ python3 log4j-finder.py /path/to/scan
#
# Or directly a JAR file:
# $ python3 log4j-finder.py /path/to/jarfile.jar
#
# Or multiple directories:
# $ python3 log4j-finder.py /path/to/dir1 /path/to/dir2
#
import os
import io
import sys
import time
import zipfile
import logging
import argparse
import hashlib
import platform
import datetime
import functools
import itertools
import collections
from pathlib import Path
__version__ = "1.0.1"
FIGLET = f"""\
__ _____ __ ___ __ __
| |.-----.-----.| | ||__|______.' _|__|.-----.--| |.-----.----.
| || _ | _ ||__ | |______| _| || | _ || -__| _|
|__||_____|___ | |__|| | |__| |__||__|__|_____||_____|__|
|_____| |___| v{__version__} https://github.com/fox-it/log4j-finder
"""
# Optionally import colorama to enable colored output for Windows
try:
import colorama
colorama.init()
NO_COLOR = False
except ImportError:
NO_COLOR = True if sys.platform == "win32" else False
log = logging.getLogger(__name__)
# Java Archive Extensions
JAR_EXTENSIONS = (".jar", ".war", ".ear")
# Filenames to find and MD5 hash (also recursively in JAR_EXTENSIONS)
# Currently we just look for JndiManager.class
FILENAMES = [
p.lower()
for p in [
"JndiManager.class",
]
]
# Known BAD
MD5_BAD = {
# JndiManager.class (source: https://github.com/nccgroup/Cyber-Defence/blob/master/Intelligence/CVE-2021-44228/modified-classes/md5sum.txt)
"04fdd701809d17465c17c7e603b1b202": "log4j 2.9.0 - 2.11.2",
"21f055b62c15453f0d7970a9d994cab7": "log4j 2.13.0 - 2.13.3",
"3bd9f41b89ce4fe8ccbf73e43195a5ce": "log4j 2.6 - 2.6.2",
"415c13e7c8505fb056d540eac29b72fa": "log4j 2.7 - 2.8.1",
"5824711d6c68162eb535cc4dbf7485d3": "log4j 2.12.0 - 2.12.1",
"6b15f42c333ac39abacfeeeb18852a44": "log4j 2.1 - 2.3",
"8b2260b1cce64144f6310876f94b1638": "log4j 2.4 - 2.5",
"a193703904a3f18fb3c90a877eb5c8a7": "log4j 2.8.2",
"f1d630c48928096a484e4b95ccb162a0": "log4j 2.14.0 - 2.14.1",
# 2.15.0 vulnerable to Denial of Service attack (source: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-45046)
"5d253e53fa993e122ff012221aa49ec3": "log4j 2.15.0",
}
# Known GOOD
MD5_GOOD = {
# JndiManager.class (source: https://repo.maven.apache.org/maven2/org/apache/logging/log4j/log4j-core/2.16.0/log4j-core-2.16.0.jar)
"ba1cf8f81e7b31c709768561ba8ab558": "log4j 2.16.0",
}
HOSTNAME = platform.node()
def md5_digest(fobj):
"""Calculate the MD5 digest of a file object."""
d = hashlib.md5()
for buf in iter(functools.partial(fobj.read, io.DEFAULT_BUFFER_SIZE), b""):
d.update(buf)
return d.hexdigest()
def iter_scandir(path, stats=None):
"""
Yields all files matcthing JAR_EXTENSIONS or FILENAMES recursively in path
"""
p = Path(path)
if p.is_file():
if stats:
stats["files"] += 1
yield p
try:
for entry in scantree(path, stats=stats):
if entry.is_symlink():
continue
elif entry.is_file():
name = entry.name.lower()
if name.endswith(JAR_EXTENSIONS):
yield Path(entry.path)
elif name in FILENAMES:
yield Path(entry.path)
except IOError as e:
log.debug(e)
def scantree(path, stats=None):
"""Recursively yield DirEntry objects for given directory."""
try:
with os.scandir(path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
if stats:
stats["directories"] += 1
yield from scantree(entry.path, stats=stats)
else:
if stats:
stats["files"] += 1
yield entry
except IOError as e:
log.debug(e)
def iter_jarfile(fobj, parents=None, stats=None):
"""
Yields (zfile, zinfo, zpath, parents) for each file in zipfile that matches `FILENAMES` or `JAR_EXTENSIONS` (recursively)
"""
parents = parents or []
try:
with zipfile.ZipFile(fobj) as zfile:
for zinfo in zfile.infolist():
# log.debug(zinfo.filename)
zpath = Path(zinfo.filename)
if zpath.name.lower() in FILENAMES:
yield (zinfo, zfile, zpath, parents)
elif zpath.name.lower().endswith(JAR_EXTENSIONS):
zfobj = zfile.open(zinfo.filename)
try:
# Test if we can open the zfobj without errors, fallback to BytesIO otherwise
# see https://github.com/fox-it/log4j-finder/pull/22
zipfile.ZipFile(zfobj)
except zipfile.BadZipFile as e:
log.debug(f"Got {zinfo}: {e}, falling back to BytesIO")
zfobj = io.BytesIO(zfile.open(zinfo.filename).read())
yield from iter_jarfile(zfobj, parents=parents + [zpath])
except IOError as e:
log.debug(f"{fobj}: {e}")
except zipfile.BadZipFile as e:
log.debug(f"{fobj}: {e}")
def red(s):
if NO_COLOR:
return s
return f"\033[31m{s}\033[0m"
def green(s):
if NO_COLOR:
return s
return f"\033[32m{s}\033[0m"
def yellow(s):
if NO_COLOR:
return s
return f"\033[33m{s}\033[0m"
def cyan(s):
if NO_COLOR:
return s
return f"\033[36m{s}\033[0m"
def magenta(s):
if NO_COLOR:
return s
return f"\033[35m{s}\033[0m"
def bold(s):
if NO_COLOR:
return s
return f"\033[1m{s}\033[0m"
def check_vulnerable(fobj, path_chain, stats, has_jndilookup=True):
"""
Test if fobj matches any of the known bad or known good MD5 hashes.
Also prints message if fobj is vulnerable or known good or unknown.
if `has_jndilookup` is False, it means `lookup/JndiLookup.class` was not found and could
indicate it was patched according to https://logging.apache.org/log4j/2.x/security.html using:
zip -q -d log4j-core-*.jar org/apache/logging/log4j/core/lookup/JndiLookup.class
"""
md5sum = md5_digest(fobj)
first_path = bold(path_chain.pop(0))
path_chain = " -> ".join(str(p) for p in [first_path] + path_chain)
comment = collections.ChainMap(MD5_BAD, MD5_GOOD).get(md5sum, "Unknown MD5")
color_map = {"vulnerable": red, "good": green, "patched": cyan, "unknown": yellow}
if md5sum in MD5_BAD:
status = "vulnerable" if has_jndilookup else "patched"
elif md5sum in MD5_GOOD:
status = "good"
else:
status = "unknown"
stats[status] += 1
color = color_map.get(status, red)
now = datetime.datetime.utcnow().replace(microsecond=0)
hostname = magenta(HOSTNAME)
status = bold(color(status.upper()))
md5sum = color(md5sum)
comment = bold(color(comment))
print(f"[{now}] {hostname} {status}: {path_chain} [{md5sum}: {comment}]")
def print_summary(stats):
print("\nSummary:")
print(f" Processed {stats['files']} files and {stats['directories']} directories")
print(f" Scanned {stats['scanned']} files")
if stats["vulnerable"]:
print(" Found {} vulnerable files".format(stats["vulnerable"]))
if stats["good"]:
print(" Found {} good files".format(stats["good"]))
if stats["patched"]:
print(" Found {} patched files".format(stats["patched"]))
if stats["unknown"]:
print(" Found {} unknown files".format(stats["unknown"]))
def main():
parser = argparse.ArgumentParser(
description=f"%(prog)s v{__version__} - Find vulnerable log4j2 on filesystem (Log4Shell CVE-2021-4428, CVE-2021-45046)",
epilog="Files are scanned recursively, both on disk and in Java Archive Files",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"path",
metavar="PATH",
nargs="*",
default=["/"],
help="Directory or file(s) to scan (recursively)",
)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="verbose output (-v is info, -vv is debug)",
)
parser.add_argument(
"-n", "--no-color", action="store_true", help="disable color output"
)
parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="be more quiet, disables banner and summary",
)
parser.add_argument("-b", "--no-banner", action="store_true", help="disable banner")
parser.add_argument(
"-V", "--version", action="version", version=f"%(prog)s {__version__}"
)
args = parser.parse_args()
logging.basicConfig(
format="%(asctime)s %(levelname)s %(message)s",
)
if args.verbose == 1:
log.setLevel(logging.INFO)
log.info("info logging enabled")
elif args.verbose >= 2:
log.setLevel(logging.DEBUG)
log.debug("debug logging enabled")
if args.no_color:
global NO_COLOR
NO_COLOR = True
stats = collections.Counter()
start_time = time.monotonic()
hostname = magenta(HOSTNAME)
if not args.no_banner and not args.quiet:
print(FIGLET)
for directory in args.path:
now = datetime.datetime.utcnow().replace(microsecond=0)
if not args.quiet:
print(f"[{now}] {hostname} Scanning: {directory}")
for p in iter_scandir(directory, stats=stats):
if p.name.lower() in FILENAMES:
stats["scanned"] += 1
log.info(f"Found file: {p}")
with p.open("rb") as fobj:
# If we find JndiManager, we also check if JndiLookup.class exists
has_lookup = True
if p.name.lower().endswith("JndiManager.class".lower()):
lookup_path = p.parent.parent / "lookup/JndiLookup.class"
has_lookup = lookup_path.exists()
check_vulnerable(fobj, [p], stats, has_lookup)
if p.suffix.lower() in JAR_EXTENSIONS:
try:
log.info(f"Found jar file: {p}")
stats["scanned"] += 1
for (zinfo, zfile, zpath, parents) in iter_jarfile(
p.resolve().open("rb"), parents=[p.resolve()]
):
log.info(f"Found zfile: {zinfo} ({parents}")
with zfile.open(zinfo.filename) as zf:
# If we find JndiManager.class, we also check if JndiLookup.class exists
has_lookup = True
if zpath.name.lower().endswith("JndiManager.class".lower()):
lookup_path = zpath.parent.parent / "lookup/JndiLookup.class"
try:
has_lookup = zfile.open(lookup_path.as_posix())
except KeyError:
has_lookup = False
check_vulnerable(zf, parents + [zpath], stats, has_lookup)
except IOError as e:
log.debug(f"{p}: {e}")
elapsed = time.monotonic() - start_time
now = datetime.datetime.utcnow().replace(microsecond=0)
if not args.quiet:
print(f"[{now}] {hostname} Finished scan, elapsed time: {elapsed:.2f} seconds")
print_summary(stats)
print(f"\nElapsed time: {elapsed:.2f} seconds")
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("\nAborted!")