Burpsuite→Yakit→Reqable三工具联动抓包:构建多层网络安全测试工作流

admin 2026-01-28 06:48:00 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 本文阐述构建Burpsuite、Yakit与Reqable三工具联动的流量捕获系统,通过串联代理架构实现Web及移动端流量的精细化控制与漏洞挖掘。内容涵盖环境配置、证书信任链、实战场景及自动化集成,提供详细的故障排除方案,旨在优化安全测试工作流,提升测试效率与覆盖面。 综合评分: 91 文章分类: 渗透测试,安全工具,移动安全,WEB安全,解决方案


5.2.2 自定义规则脚本

# 三工具协同规则脚本

class ProxyChainRuleEngine:

    def \_\_init\_\_(self):

        self.rules = self.load\_rules()

    def load\_rules(self):

        """加载联动规则"""

        return {

            'sqli\_detection': {

                'pattern': r"(['\"])[^'\"]\*?(union|select|insert|update|delete|drop|exec)[^'\"]\*?\1",

                'action': {

                    'burpsuite': 'highlight',

                    'yakit': 'auto\_scan',

                    'reqable': 'alert'

                }

            },

            'sensitive\_data': {

                'pattern': r"(password|token|secret|key|credential)\s\*[=:]\s\*['\"]?([^'\"]+)",

                'action': {

                    'burpsuite': 'mask',

                    'yakit': 'log',

                    'reqable': 'block'

                }

            }

        }

    def apply\_rule(self, request\_data, response\_data):

        """应用规则到三工具"""

        for rule\_name, rule\_config in self.rules.items():

            if re.search(rule\_config['pattern'], request\_data, re.IGNORECASE):

                # 触发Burpsuite动作

                if 'highlight' in rule\_config['action']['burpsuite']:

                    self.highlight\_in\_burpsuite(request\_data)

                # 触发Yakit动作

                if 'auto\_scan' in rule\_config['action']['yakit']:

                    self.send\_to\_yakit\_scan(request\_data)

                # 触发Reqable动作

                if 'alert' in rule\_config['action']['reqable']:

                    self.send\_alert\_to\_reqable(rule\_name)

5.3 自动化与集成

5.3.1 CI/CD集成

# GitHub Actions 配置示例

name: Security Scan with Proxy Chain

on:

  push:

    branches: [ main ]

  pull\_request:

    branches: [ main ]

jobs:

  security-scan:

    runs-on: ubuntu-latest

    steps:

    - uses: actions/checkout@v3

    - name: Setup proxy chain

      run: |

        docker-compose up -d burpsuite yakit reqable

        sleep 30  # 等待服务启动

    - name: Run automated tests

      run: |

        python run\_security\_tests.py \

          --proxy http://localhost:8080 \

          --target ${{ secrets.TARGET\_URL }} \

          --report-dir ./reports

    - name: Generate combined report

      run: |

        python generate\_report.py \

          --burp ./reports/burp.xml \

          --yakit ./reports/yakit.json \

          --reqable ./reports/reqable.db \

          --output ./reports/combined.html

    - name: Upload security report

      uses: actions/upload-artifact@v3

      with:

        name: security-scan-report

        path: ./reports/

5.3.2 API接口集成

# 三工具API集成示例

import requests

import time

class ProxyChainManager:

    def \_\_init\_\_(self, config):

        self.config = config

        self.burp\_api = "http://127.0.0.1:1337"

        self.yakit\_api = "http://127.0.0.1:8084"

        self.reqable\_api = "http://127.0.0.1:8086"

    def start\_proxy\_chain(self):

        """启动三工具服务链"""

        # 1. 启动Reqable

        requests.post(f"{self.reqable\_api}/api/start", json={

            "port": 8085,

            "ssl": True,

            "capture": True

        })

        # 2. 启动Yakit MITM

        requests.post(f"{self.yakit\_api}/mitm/start", json={

            "host": "0.0.0.0",

            "port": 8083,

            "upstream\_proxy": "127.0.0.1:8085"

        })

        # 3. 配置Burpsuite

        requests.post(f"{self.burp\_api}/burp/proxy/intercept", json={"enabled": False})

        requests.post(f"{self.burp\_api}/burp/proxy/upstream", json={

            "enabled": True,

            "proxy": "127.0.0.1:8083"

        })

        print("三工具代理链启动完成")

    def stop\_proxy\_chain(self):

        """停止服务链"""

        requests.post(f"{self.burp\_api}/burp/proxy/intercept", json={"enabled": False})

        requests.post(f"{self.yakit\_api}/mitm/stop")

        requests.post(f"{self.reqable\_api}/api/stop")

六、故障排除与维护

6.1 常见问题解决

6.1.1 连接问题诊断

# 诊断脚本:check\_proxy\_chain.py

import socket

import requests

def check\_port(host, port):

    try:

        sock = socket.socket(socket.AF\_INET, socket.SOCK\_STREAM)

        sock.settimeout(2)

        result = sock.connect\_ex((host, port))

        sock.close()

        return result == 0

    except:

        return False

def diagnose\_proxy\_chain():

    ports = [

        ("Burpsuite", "127.0.0.1", 8080),

        ("Yakit", "127.0.0.1", 8083),

        ("Reqable", "127.0.0.1", 8085)

    ]

    print("代理链连通性诊断:")

    for name, host, port in ports:

        status = "✓ 正常" if check\_port(host, port) else "✗ 异常"

        print(f"{name} ({host}:{port}): {status}")

    # 测试代理链完整路径

    print("\n测试完整代理链:")

    try:

        resp = requests.get(

            "http://httpbin.org/ip",

            proxies={"http": "http://127.0.0.1:8080"},

            timeout=10

        )

        print(f"完整代理链测试: ✓ 成功 (IP: {resp.json().get('origin')})")

    except Exception as e:

        print(f"完整代理链测试: ✗ 失败 ({str(e)})")

if \_\_name\_\_ == "\_\_main\_\_":

    diagnose\_proxy\_chain()

6.1.2 HTTPS问题排查

HTTPS解密故障排查清单:

1. 证书链验证:

   - 客户端是否安装Burpsuite证书

   - Burpsuite是否信任Yakit证书

   - Yakit是否信任Reqable证书

   - Reqable是否信任目标证书

2. 常见错误解决:

   - ERR\_CERT\_AUTHORITY\_INVALID: 重新安装根证书

   - ERR\_SSL\_VERSION\_OR\_CIPHER\_MISMATCH: 调整SSL/TLS设置

   - ERR\_CONNECTION\_RESET: 检查防火墙/杀毒软件

3. 应用层问题:

   - Android 7+ 需要将证书安装到系统分区

   - iOS 需要在设置中完全信任证书

   - 某些应用使用证书锁定(Pinning): 需要绕过

6.2 日常维护建议

6.2.1 配置备份脚本

#!/bin/bash

# backup\_proxy\_config.sh

BACKUP\_DIR="$HOME/proxy\_config\_backup/$(date +%Y%m%d\_%H%M%S)"

mkdir -p "$BACKUP\_DIR"

echo "备份三工具配置..."

# 备份Burpsuite配置

if [ -d "$HOME/.BurpSuite" ]; then

    cp -r "$HOME/.BurpSuite" "$BACKUP\_DIR/BurpSuite"

    echo "✓ Burpsuite配置已备份"

fi

# 备份Yakit配置

if [ -d "$HOME/.yakit" ]; then

    cp -r "$HOME/.yakit" "$BACKUP\_DIR/yakit"

    echo "✓ Yakit配置已备份"

fi

# 备份Reqable配置

if [ -d "$HOME/.reqable" ]; then

    cp -r "$HOME/.reqable" "$BACKUP\_DIR/reqable"

    echo "✓ Reqable配置已备份"

fi

# 备份证书

if [ -f "$HOME/Desktop/burp\_cert.der" ]; then

    cp "$HOME/Desktop/burp\_cert.der" "$BACKUP\_DIR/"

fi

echo "备份完成!位置: $BACKUP\_DIR"

# 创建恢复脚本

cat > "$BACKUP\_DIR/restore.sh" << 'EOF'

#!/bin/bash

echo "恢复代理配置..."

cp -r BurpSuite "$HOME/.BurpSuite"

cp -r yakit "$HOME/.yakit"

cp -r reqable "$HOME/.reqable"

cp burp\_cert.der "$HOME/Desktop/"

echo "恢复完成!请重新启动各工具。"

EOF

chmod +x "$BACKUP\_DIR/restore.sh"

6.2.2 性能监控

# performance\_monitor.py

import psutil

import time

import json

from datetime import datetime

class ProxyChainMonitor:

&nbsp; &nbsp; def \_\_init\_\_(self, interval=5):

&nbsp; &nbsp; &nbsp; &nbsp; self.interval = interval

&nbsp; &nbsp; &nbsp; &nbsp; self.metrics = []

&nbsp; &nbsp; &nbsp; &nbsp; self.process\_names = ['java', 'yakit', 'reqable']

&nbsp; &nbsp; def get\_process\_metrics(self):

&nbsp; &nbsp; &nbsp; &nbsp; """获取三工具进程指标"""

&nbsp; &nbsp; &nbsp; &nbsp; metrics = {

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'timestamp': datetime.now().isoformat(),

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'tools': {}

&nbsp; &nbsp; &nbsp; &nbsp; }

&nbsp; &nbsp; &nbsp; &nbsp; for proc in psutil.process\_iter(['name', 'memory\_percent', 'cpu\_percent']):

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; proc\_name = proc.info['name'].lower()

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for tool in self.process\_names:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if tool in proc\_name:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; metrics['tools'][tool] = {

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'memory\_mb': proc.memory\_info().rss / 1024 / 1024,

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'memory\_percent': proc.info['memory\_percent'],

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'cpu\_percent': proc.info['cpu\_percent'],

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'threads': proc.num\_threads()

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except (psutil.NoSuchProcess, psutil.AccessDenied):

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continue

&nbsp; &nbsp; &nbsp; &nbsp; return metrics

&nbsp; &nbsp; def monitor(self, duration=300):

&nbsp; &nbsp; &nbsp; &nbsp; """监控指定时长"""

&nbsp; &nbsp; &nbsp; &nbsp; start\_time = time.time()

&nbsp; &nbsp; &nbsp; &nbsp; print(f"开始监控代理链性能,时长: {duration}秒")

&nbsp; &nbsp; &nbsp; &nbsp; print("-" \* 50)

&nbsp; &nbsp; &nbsp; &nbsp; while time.time() - start\_time < duration:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; metrics = self.get\_process\_metrics()

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.metrics.append(metrics)

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # 打印当前状态

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f"\n[{metrics['timestamp']}]")

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for tool, data in metrics['tools'].items():

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f"{tool.upper():10} | CPU: {data['cpu\_percent']:5.1f}% | "

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f"内存: {data['memory\_mb']:7.1f}MB")

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(self.interval)

&nbsp; &nbsp; &nbsp; &nbsp; self.save\_report()

&nbsp; &nbsp; def save\_report(self):

&nbsp; &nbsp; &nbsp; &nbsp; """保存监控报告"""

&nbsp; &nbsp; &nbsp; &nbsp; filename = f"proxy\_chain\_perf\_{datetime.now().strftime('%Y%m%d\_%H%M%S')}.json"

&nbsp; &nbsp; &nbsp; &nbsp; with open(filename, 'w') as f:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; json.dump(self.metrics, f, indent=2)

&nbsp; &nbsp; &nbsp; &nbsp; print(f"\n监控报告已保存: {filename}")

&nbsp; &nbsp; &nbsp; &nbsp; # 生成摘要

&nbsp; &nbsp; &nbsp; &nbsp; self.generate\_summary()

&nbsp; &nbsp; def generate\_summary(self):

&nbsp; &nbsp; &nbsp; &nbsp; """生成性能摘要"""

&nbsp; &nbsp; &nbsp; &nbsp; if not self.metrics:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return

&nbsp; &nbsp; &nbsp; &nbsp; print("\n" + "="\*50)

&nbsp; &nbsp; &nbsp; &nbsp; print("性能监控摘要")

&nbsp; &nbsp; &nbsp; &nbsp; print("="\*50)

&nbsp; &nbsp; &nbsp; &nbsp; for tool in self.process\_names:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tool\_data = [m['tools'].get(tool, {}) for m in self.metrics if tool in m['tools']]

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if tool\_data:

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; avg\_cpu = sum(d.get('cpu\_percent', 0) for d in tool\_data) / len(tool\_data)

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; max\_mem = max(d.get('memory\_mb', 0) for d in tool\_data)

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f"{tool.upper():10} | 平均CPU: {avg\_cpu:5.1f}% | "

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f"峰值内存: {max\_mem:7.1f}MB")

if \_\_name\_\_ == '\_\_main\_\_':

&nbsp; &nbsp; monitor = ProxyChainMonitor(interval=10)

&nbsp; &nbsp; monitor.monitor(duration=600) &nbsp;# 监控10分钟

免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:Alfadi组织 萧瑶 萧瑶《Burpsuite → Yakit → Reqable 三工具联动抓包:构建多层网络安全测试工作流》

评论:0   参与:  0