Files
log4j-finder/log4j-finder.py

332 lines
11 KiB
Python
Raw Normal View History

2021-12-14 11:40:41 +01:00
#!/usr/bin/env python3
#
# file: log4j-finder.py
# author: NCC Group / Fox-IT / Research and Intelligence Fusion Team (RIFT)
#
# Scan the filesystem to find Log4j2 files that are vulnerable to Log4Shell (CVE-2021-44228).
# It scans recursively both on disk and inside Java Archive files (JARs).
#
# Example usage to scan a path (defaults to /):
# $ python3 log4j-finder.py /path/to/scan
#
# Or directly a JAR file:
# $ python3 log4j-finder.py /path/to/jarfile.jar
#
# Or multiple directories:
# $ python3 log4j-finder.py /path/to/dir1 /path/to/dir2
#
import os
import io
import sys
import time
2021-12-14 11:40:41 +01:00
import zipfile
import logging
import argparse
import hashlib
import datetime
import functools
import itertools
import collections
from pathlib import Path
# Tool version string; it is interpolated into the start-up banner below.
__version__ = "1.0.1"
# ASCII-art banner (an f-string, so the version is filled in at import time).
# NOTE(review): the column alignment of the art looks collapsed in this copy of
# the file -- confirm the spacing against the upstream source before release.
FIGLET = f"""\
__ _____ __ ___ __ __
| |.-----.-----.| | ||__|______.' _|__|.-----.--| |.-----.----.
| || _ | _ ||__ | |______| _| || | _ || -__| _|
|__||_____|___ | |__|| | |__| |__||__|__|_____||_____|__|
|_____| |___| v{__version__} https://github.com/fox-it/log4j-finder
"""
# colorama is an optional dependency: when present it makes ANSI colour
# sequences work on Windows consoles, so colour can stay enabled everywhere.
try:
    import colorama

    colorama.init()
except ImportError:
    # No colorama available: only Windows needs colour switched off by default.
    NO_COLOR = sys.platform == "win32"
else:
    NO_COLOR = False
# Module-level logger; verbosity is configured by the command-line entry point.
log = logging.getLogger(__name__)

# Suffixes that mark a file as a Java archive. Kept as a tuple so it can be
# passed straight to str.endswith().
JAR_EXTENSIONS = (".jar", ".war", ".ear")

# Lower-cased names of the class files whose MD5 is checked, both on disk and
# inside archives. At the moment only JndiManager.class is of interest.
FILENAMES = list(map(str.lower, ["JndiManager.class"]))

# MD5 -> affected release(s) for builds that are known to be BAD.
MD5_BAD = {
    # JndiManager.class
    # (source: https://github.com/nccgroup/Cyber-Defence/blob/master/Intelligence/CVE-2021-44228/modified-classes/md5sum.txt)
    "04fdd701809d17465c17c7e603b1b202": "log4j 2.9.0 - 2.11.2",
    "21f055b62c15453f0d7970a9d994cab7": "log4j 2.13.0 - 2.13.3",
    "3bd9f41b89ce4fe8ccbf73e43195a5ce": "log4j 2.6 - 2.6.2",
    "415c13e7c8505fb056d540eac29b72fa": "log4j 2.7 - 2.8.1",
    "5824711d6c68162eb535cc4dbf7485d3": "log4j 2.12.0 - 2.12.1",
    "6b15f42c333ac39abacfeeeb18852a44": "log4j 2.1 - 2.3",
    "8b2260b1cce64144f6310876f94b1638": "log4j 2.4 - 2.5",
    "a193703904a3f18fb3c90a877eb5c8a7": "log4j 2.8.2",
    "f1d630c48928096a484e4b95ccb162a0": "log4j 2.14.0 - 2.14.1",
    # 2.15.0 is vulnerable to a Denial of Service attack
    # (source: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-45046)
    "5d253e53fa993e122ff012221aa49ec3": "log4j 2.15.0",
}

# MD5 -> release for builds that are known to be GOOD.
MD5_GOOD = {
    # JndiManager.class
    # (source: https://repo.maven.apache.org/maven2/org/apache/logging/log4j/log4j-core/2.16.0/log4j-core-2.16.0.jar)
    "ba1cf8f81e7b31c709768561ba8ab558": "log4j 2.16.0",
}
def md5_digest(fobj):
    """
    Return the hexadecimal MD5 digest of everything readable from `fobj`.

    `fobj` must be a binary file-like object; it is consumed from its current
    position in chunks of `io.DEFAULT_BUFFER_SIZE` bytes until `read()` returns
    an empty bytes object, so arbitrarily large files use constant memory.
    """
    hasher = hashlib.md5()
    while True:
        chunk = fobj.read(io.DEFAULT_BUFFER_SIZE)
        if chunk == b"":
            break
        hasher.update(chunk)
    return hasher.hexdigest()
def iter_scandir(path, stats=None):
    """
    Yield the paths of candidate files found at or below `path`.

    If `path` is itself a regular file it is yielded as-is, whatever its name,
    so a single archive or class file can be scanned directly. Otherwise `path`
    is walked recursively with `scantree` and every regular, non-symlink file
    whose lower-cased name ends with one of `JAR_EXTENSIONS` or equals one of
    `FILENAMES` is yielded.

    :param path: file or directory to scan
    :param stats: optional mapping whose "files" counter is incremented for a
        directly supplied file (entries found while walking are counted by
        `scantree`)
    :return: generator of `pathlib.Path` objects

    I/O errors raised while walking are logged at debug level and end the walk.
    """
    p = Path(path)
    if p.is_file():
        # Test against None, not truthiness: an empty mapping (for example a
        # fresh Counter) is falsy but must still be updated.
        if stats is not None:
            stats["files"] = stats.get("files", 0) + 1
        yield p
        # A regular file has no children, so there is nothing to walk.
        return
    try:
        for entry in scantree(path, stats=stats):
            if entry.is_symlink() or not entry.is_file():
                continue
            name = entry.name.lower()
            if name.endswith(JAR_EXTENSIONS) or name in FILENAMES:
                yield Path(entry.path)
    except IOError as e:
        log.debug(e)
def scantree(path, stats=None):
    """
    Recursively yield an `os.DirEntry` for every non-directory entry below `path`.

    Real directories are descended into and are not yielded themselves. Because
    directories are detected with `follow_symlinks=False`, a symlink that points
    to a directory is not followed; it is yielded like any other entry.

    :param path: directory to walk
    :param stats: optional mapping; its "directories" counter is incremented for
        each sub-directory entered and its "files" counter for each entry yielded
    :return: generator of `os.DirEntry` objects

    I/O errors (unreadable directory, `path` not being a directory, ...) are
    logged at debug level and end the walk of that directory only.
    """
    try:
        with os.scandir(path) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    # `is not None` instead of truthiness so an empty mapping
                    # (e.g. a fresh Counter) is still counted into.
                    if stats is not None:
                        stats["directories"] = stats.get("directories", 0) + 1
                    yield from scantree(entry.path, stats=stats)
                else:
                    if stats is not None:
                        stats["files"] = stats.get("files", 0) + 1
                    yield entry
    except IOError as e:
        log.debug(e)
def iter_jarfile(fobj, parents=None, stats=None):
    """
    Yield `(zinfo, zfile, zpath, parents)` for each archive member matching `FILENAMES`.

    Members whose name ends with one of `JAR_EXTENSIONS` are opened and scanned
    recursively, with the member path appended to `parents`.

    :param fobj: binary, seekable file object (or path) of the archive to read
    :param parents: list of paths leading to this archive; defaults to `[]`
    :param stats: accepted for signature parity and forwarded on recursion; it
        is not updated here
    :return: generator of tuples where `zinfo` is the member's `ZipInfo`, `zfile`
        the open `ZipFile` containing it, `zpath` the member name as a `Path`
        and `parents` the chain of enclosing archives

    `zfile` is closed once the generator has moved past that archive, so each
    tuple must be used before the generator is exhausted. Unreadable, corrupt,
    encrypted or unsupported archives are logged at debug level and skipped.
    """
    parents = parents or []
    try:
        with zipfile.ZipFile(fobj) as zfile:
            for zinfo in zfile.infolist():
                zpath = Path(zinfo.filename)
                name = zpath.name.lower()
                if name in FILENAMES:
                    yield (zinfo, zfile, zpath, parents)
                elif name.endswith(JAR_EXTENSIONS):
                    # Keep the nested stream open only while it is being
                    # walked, then release it.
                    with zfile.open(zinfo.filename) as nested:
                        yield from iter_jarfile(
                            nested, parents=parents + [zpath], stats=stats
                        )
    except (IOError, zipfile.BadZipFile, RuntimeError) as e:
        # zipfile raises RuntimeError for password-protected members and
        # NotImplementedError (a RuntimeError subclass) for unsupported
        # compression; neither should abort the whole scan.
        log.debug(f"{fobj}: {e}")
def _colorize(s, sgr_code):
    """
    Wrap `s` in the ANSI SGR escape sequence `sgr_code`.

    Returns `s` unchanged (same object, not converted to str) when the
    module-level `NO_COLOR` flag is set. The flag is read on every call, so
    changing it at runtime takes effect immediately.
    """
    if NO_COLOR:
        return s
    return f"\033[{sgr_code}m{s}\033[0m"


def red(s):
    """Return `s` in red, or `s` itself when colour output is disabled."""
    return _colorize(s, 31)


def green(s):
    """Return `s` in green, or `s` itself when colour output is disabled."""
    return _colorize(s, 32)


def yellow(s):
    """Return `s` in yellow, or `s` itself when colour output is disabled."""
    return _colorize(s, 33)


def cyan(s):
    """Return `s` in cyan, or `s` itself when colour output is disabled."""
    return _colorize(s, 36)


def bold(s):
    """Return `s` in bold, or `s` itself when colour output is disabled."""
    return _colorize(s, 1)
def check_vulnerable(fobj, path_chain, stats, has_jndilookup=True):
    """
    Classify `fobj` by its MD5 digest, print the verdict and update `stats`.

    The digest is looked up in `MD5_BAD` and `MD5_GOOD`, and one line is printed
    saying whether the file is vulnerable, patched, known good or unknown.

    If `has_jndilookup` is False, it means `lookup/JndiLookup.class` was not found and could
    indicate it was patched according to https://logging.apache.org/log4j/2.x/security.html using:
        zip -q -d log4j-core-*.jar org/apache/logging/log4j/core/lookup/JndiLookup.class

    :param fobj: binary file object positioned at the start of the class file
    :param path_chain: non-empty list of paths from the outermost file down to
        the class file; it is only read, never modified
    :param stats: mapping whose "vulnerable", "patched", "good" or "unknown"
        counter is incremented according to the verdict
    :param has_jndilookup: whether JndiLookup.class was found next to the file
    """
    md5sum = md5_digest(fobj)

    # Build the display string from a copy so the caller's list is left intact;
    # only the outermost path is emphasised.
    chain_parts = [bold(path_chain[0])] + list(path_chain[1:])
    chain_text = " -> ".join(str(p) for p in chain_parts)

    # Naive UTC timestamp: prints exactly like the deprecated utcnow() did.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    if md5sum in MD5_BAD:
        comment = MD5_BAD[md5sum]
        if has_jndilookup:
            label, counter = red("VULNERABLE"), "vulnerable"
        else:
            label, counter = cyan("PATCHED"), "patched"
        print(f"[{now}] {label}: {chain_text} [{md5sum}: {comment}]")
        stats[counter] += 1
    elif md5sum in MD5_GOOD:
        comment = MD5_GOOD[md5sum]
        good = green("GOOD")
        print(f"[{now}] {good}: {chain_text} [{md5sum}: {comment}]")
        stats["good"] += 1
    else:
        unknown = yellow("UNKNOWN")
        print(f"[{now}] {unknown}: MD5 not known for {chain_text} [{md5sum}]")
        stats["unknown"] += 1
def _utc_now():
    """Return the current UTC time as a naive datetime (prints like the deprecated utcnow())."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def main():
    """
    Command-line entry point.

    Parses the arguments, configures logging and colour output, scans every
    given path (class files on disk and class files inside Java archives,
    recursively), prints a verdict per file through `check_vulnerable` and
    finishes with a summary of the counters. Returns None.
    """
    global NO_COLOR

    parser = argparse.ArgumentParser(
        description=f"%(prog)s v{__version__} - Find vulnerable log4j2 on filesystem (Log4Shell CVE-2021-44228, CVE-2021-45046)",
        epilog="Files are scanned recursively, both on disk and in Java Archive Files",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "path",
        metavar="PATH",
        nargs="*",
        default=["/"],
        help="Directory or file(s) to scan (recursively)",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="verbose output (-v is info, -vv is debug)",
    )
    parser.add_argument(
        "-n", "--no-color", action="store_true", help="disable color output"
    )
    parser.add_argument("-b", "--no-banner", action="store_true", help="disable banner")
    parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")
    args = parser.parse_args()

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
    )
    if args.verbose == 1:
        log.setLevel(logging.INFO)
        log.info("info logging enabled")
    elif args.verbose >= 2:
        log.setLevel(logging.DEBUG)
        log.debug("debug logging enabled")

    if args.no_color:
        NO_COLOR = True

    stats = {
        "scanned": 0,
        "files": 0,
        "directories": 0,
        "vulnerable": 0,
        "patched": 0,
        "good": 0,
        "unknown": 0,
    }
    start_time = time.monotonic()

    if not args.no_banner:
        print(FIGLET)

    for directory in args.path:
        print(f"[{_utc_now()}] Scanning: {directory}")
        for p in iter_scandir(directory, stats=stats):
            if p.name.lower() in FILENAMES:
                stats["scanned"] += 1
                log.info(f"Found file: {p}")
                try:
                    with p.open("rb") as fobj:
                        # If we find JndiManager, we also check if JndiLookup.class exists
                        has_lookup = True
                        if p.name.lower().endswith("JndiManager.class".lower()):
                            lookup_path = p.parent.parent / "lookup/JndiLookup.class"
                            has_lookup = lookup_path.exists()
                        check_vulnerable(fobj, [p], stats, has_lookup)
                except IOError as e:
                    # One unreadable file must not abort the whole scan.
                    log.debug(f"{p}: {e}")

            if p.suffix.lower() in JAR_EXTENSIONS:
                try:
                    log.info(f"Found jar file: {p}")
                    stats["scanned"] += 1
                    jar_path = p.resolve()
                    # The context manager guarantees the archive handle is closed.
                    with jar_path.open("rb") as jar_fobj:
                        for (zinfo, zfile, zpath, parents) in iter_jarfile(
                            jar_fobj, parents=[jar_path]
                        ):
                            log.info(f"Found zfile: {zinfo} ({parents})")
                            with zfile.open(zinfo.filename) as zf:
                                # If we find JndiManager.class, we also check if JndiLookup.class exists
                                has_lookup = True
                                if zpath.name.lower().endswith("JndiManager.class".lower()):
                                    # Archive member names always use "/", so build the
                                    # lookup name with as_posix(); str() would produce
                                    # backslashes on Windows and never match.
                                    lookup_path = (
                                        zpath.parent.parent / "lookup/JndiLookup.class"
                                    ).as_posix()
                                    has_lookup = zipfile.Path(zfile, lookup_path).exists()
                                check_vulnerable(zf, parents + [zpath], stats, has_lookup)
                except (IOError, zipfile.BadZipFile, RuntimeError) as e:
                    # zipfile raises RuntimeError for encrypted members and
                    # BadZipFile for corrupt ones; skip this archive and go on.
                    log.debug(f"{p}: {e}")

    elapsed_time = time.monotonic() - start_time
    print(f"[{_utc_now()}] Finished scan, elapsed time: {elapsed_time:.2f} seconds")

    print("\nSummary:")
    print(f" Processed {stats['files']} files and {stats['directories']} directories")
    print(f" Scanned {stats['scanned']} files")
    if stats["vulnerable"]:
        print(f" Found {stats['vulnerable']} vulnerable files")
    if stats["good"]:
        print(f" Found {stats['good']} good files")
    if stats["patched"]:
        print(f" Found {stats['patched']} patched files")
    if stats["unknown"]:
        print(f" Found {stats['unknown']} unknown files")
    print(f"\nElapsed time: {elapsed_time:.2f} seconds ")
# Script entry point: run the scan and hand its result to sys.exit(); a Ctrl-C
# during the scan prints a short notice instead of a traceback.
if __name__ == "__main__":
    try:
        exit_status = main()
    except KeyboardInterrupt:
        print("\nAborted!")
    else:
        sys.exit(exit_status)