When AI Coding Starts Pulling In Third-Party Libraries for You, How Do You Guard the Supply Chain?

admin, 2026-03-09 01:29:29, Cybersecurity articles, Source: ZONE.CI 全球网

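Article summary: This article addresses the supply-chain security risk that arises when AI coding tools introduce third-party libraries, and proposes a real-time defense built on the Claude Code Hook mechanism and OpenSCA. The author argues that traditional SCA scanning lags behind and that detection should be shifted left to the moment the AI makes its decision. A Python script is registered as a PreToolUse hook; before the AI runs a Write, Edit, or Bash operation, the script parses the dependency information, calls OpenSCA to scan for vulnerabilities, and blocks high-risk operations. The approach covers several ecosystems, including Python, JS, and Go, and comes with the configuration and the implementation logic in full.
Overall score: 90
Categories: AI security, supply-chain security, security tools, secure development, solutions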



Original

Author: 帅气的Jumbo

中国白客联盟

March 8, 2026, 17:43, Shanghai

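Preface

AI has gradually become part of everyday work. In the IT industry in particular, writing code with AI is now routine. But is AI-written code fully trustworthy and secure? And is there a better answer than traditional SCA scanning for the AI coding scenario?

I recently read an article from Tencent, 《幽灵依赖:Agentic Coding 范式下的新型供应链安全威胁》 ("Ghost Dependencies: A New Supply-Chain Security Threat under the Agentic Coding Paradigm"). It proposes an interesting idea: shift security detection left to the moment the Coding Agent makes its decision, and intercept before the AI actually carries out the dependency operation.

The plugin in that article is built on CodeBuddy Code, however. This post takes the same idea and combines Claude Code's Hook mechanism with OpenSCA to intercept high-risk dependencies in real time while the AI is writing code.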

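The Claude Code Hook Mechanism

Claude Code provides a well-developed Hook mechanism. Among its events, PreToolUse matches what this post needs: it fires after the AI has decided on an action but before any file is produced, which is where the security check steps in.

A hook that exits with code 2 blocks the pending Claude Code operation. The approach is then straightforward: hook PreToolUse, and terminate the task whenever a high-risk component is detected.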
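The contract described above can be sketched in a few lines. This is a minimal illustration, not the article's script: it assumes the hook receives the pending tool call as JSON on stdin with `tool_name` and `tool_input` fields (the shape the full script in this article reads), and the `is_risky` check together with the package name `left-pad-evil` are hypothetical placeholders for a real scanner.

```python
import json

ALLOW = 0  # exit code 0: let the tool call proceed
BLOCK = 2  # exit code 2: stop the tool call before it runs


def is_risky(tool_input: dict) -> bool:
    """Hypothetical stand-in for a real scan: flags one made-up package name."""
    return "left-pad-evil" in json.dumps(tool_input)


def decide(raw: str) -> int:
    """Map a raw hook payload (the text read from stdin) to an exit code."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return ALLOW  # fail open on malformed input
    if not isinstance(payload, dict):
        return ALLOW
    return BLOCK if is_risky(payload.get("tool_input", {})) else ALLOW
```

A real hook script would finish with something like `sys.exit(decide(sys.stdin.read()))`, printing the reason for a block to stderr first.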

Setting Up the Hook

First, add the hook entries to Claude Code's global configuration file, ~/.claude/settings.json:

{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Write",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      },
      {
        "matcher": "Edit",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      },
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      }
    ]
  }
}
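If the settings file already holds other configuration, editing it by hand is easy to get wrong. The sketch below is one possible way to merge the three entries in programmatically; the helper name `add_guard_hooks` is made up for illustration, and it only reproduces the structure shown in the configuration above.

```python
import json

HOOK_COMMAND = "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
TOOLS = ("Write", "Edit", "Bash")


def add_guard_hooks(settings: dict, command: str = HOOK_COMMAND) -> dict:
    """Return a copy of settings with one PreToolUse entry per tool, skipping duplicates."""
    merged = json.loads(json.dumps(settings))  # deep copy of JSON-compatible data
    entries = merged.setdefault("hooks", {}).setdefault("PreToolUse", [])
    for tool in TOOLS:
        already_present = any(
            entry.get("matcher") == tool
            and any(h.get("command") == command for h in entry.get("hooks", []))
            for entry in entries
        )
        if not already_present:
            entries.append(
                {"matcher": tool, "hooks": [{"type": "command", "command": command}]}
            )
    return merged
```

To use it, load ~/.claude/settings.json with `json.load`, pass the result through `add_guard_hooks`, and write the returned dict back with `json.dump(..., indent=2)`. Running it twice leaves the file unchanged.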

To keep coverage complete, the hook is attached to three tools:

Edit
Write
Bash

Next, write the hook script. The full code is below.

It is also available at https://github.com/Jumbo-WJB/ai-code-guard.

#!/usr/bin/env python3
import sys
import json
import os
import re
import subprocess
import tempfile
import shutil
from datetime import datetime
from typing import Optional, Tuple

# ── Configuration ────────────────────────────────────────────────────────────
OPENSCA_CLI   = os.path.expanduser("~/Downloads/opensca-cli-v3.0.9-darwin-arm64/opensca-cli")
OPENSCA_TOKEN = "your-token-here"
OPENSCA_PROJ  = "claude-hook-scan"
BLOCK_LEVEL   = 2  # 1=Critical, 2=High, 3=Medium, 4=Low
LOG_FILE      = os.path.expanduser("~/.claude/hooks/opensca_guard/opensca_guard.log")

DEPENDENCY_FILES = {
    # Python - Pip
    "requirements.txt",
    "requirements.in",
    "requirements-dev.txt",
    "requirements-prod.txt",
    "setup.py",
    "Pipfile",
    "Pipfile.lock",
    # JavaScript - Npm
    "package.json",
    "package-lock.json",
    "yarn.lock",
    # Java - Maven
    "pom.xml",
    # PHP - Composer
    "composer.json",
    "composer.lock",
    # Ruby - gem
    "Gemfile",
    "Gemfile.lock",
    # Golang - gomod
    "go.mod",
    "go.sum",
    "Gopkg.toml",
    "Gopkg.lock",
    # Rust - cargo
    "Cargo.toml",
    "Cargo.lock",
    # Erlang - Rebar
    "rebar.lock",
    "rebar.config",
}

# Java - Gradle: ".gradle" and ".gradle.kts" are suffixes (build.gradle, settings.gradle.kts),
# so they are matched with endswith() instead of an exact file-name lookup.
DEPENDENCY_SUFFIXES = (".gradle", ".gradle.kts")

INSTALL_CMD_PATTERNS = [
    # Python
    r"\bpip3?\s+install\b",
    r"\bpipenv\s+install\b",
    # JavaScript
    r"\bnpm\s+install\b",
    r"\bnpm\s+i\b",
    r"\byarn\s+add\b",
    r"\bpnpm\s+add\b",
    r"\bpnpm\s+install\b",
    # Java
    r"\bmvn\s+install\b",
    r"\bmvn\s+package\b",
    r"\bgradle\s+build\b",
    r"\bgradle\s+dependencies\b",
    # PHP
    r"\bcomposer\s+require\b",
    r"\bcomposer\s+install\b",
    # Ruby
    r"\bbundle\s+install\b",
    r"\bgem\s+install\b",
    # Golang
    r"\bgo\s+get\b",
    r"\bgo\s+install\b",
    # Rust
    r"\bcargo\s+add\b",
    r"\bcargo\s+install\b",
    r"\bcargo\s+build\b",
    # Erlang
    r"\brebar3?\s+get-deps\b",
    r"\brebar3?\s+compile\b",
]

LEVEL_NAMES = {1: "Critical", 2: "High", 3: "Medium", 4: "Low"}


# ── Logging ──────────────────────────────────────────────────────────────────
def wlog(level: str, msg: str):
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"[{ts}][{level}] {msg}\n")


def log(msg: str):
    print(f"[OpenSCA Guard] {msg}", file=sys.stderr)


# ── Detection logic ──────────────────────────────────────────────────────────
def is_dependency_file(path: str) -> bool:
    name = os.path.basename(path)
    return name in DEPENDENCY_FILES or name.endswith(DEPENDENCY_SUFFIXES)


def is_install_command(command: str) -> bool:
    return any(re.search(p, command, re.IGNORECASE) for p in INSTALL_CMD_PATTERNS)


# ── Package extraction from Bash commands, per ecosystem ─────────────────────
def extract_reqs_from_pip_command(command: str) -> list:
    """pip install django==4.2.7 requests -> ["django==4.2.7", "requests"]"""
    reqs = []
    # Also accepts "pipenv install", which the routing table sends to this extractor
    m = re.search(r"(?:pip3?|pipenv)\s+install\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-") or arg.endswith((".txt", ".cfg")):
            continue
        reqs.append(arg)
    return reqs


def extract_reqs_from_npm_command(command: str) -> list:
    """npm install <pkg>@<version> -> ["<pkg>@<version>"]"""
    reqs = []
    m = re.search(r"npm\s+(?:install|i)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs


def extract_reqs_from_yarn_command(command: str) -> list:
    """yarn add <pkg>@<version> -> ["<pkg>@<version>"]"""
    reqs = []
    m = re.search(r"yarn\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs


def extract_reqs_from_go_command(command: str) -> list:
    """go get github.com/gin-gonic/<pkg>@<version> -> ["github.com/gin-gonic/<pkg>@<version>"]"""
    reqs = []
    m = re.search(r"go\s+(?:get|install)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs


def extract_reqs_from_cargo_command(command: str) -> list:
    """cargo add <crate>@<version> -> ["<crate>@<version>"]"""
    reqs = []
    m = re.search(r"cargo\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs


def extract_reqs_from_composer_command(command: str) -> list:
    """composer require laravel/framework:^10.0 -> ["laravel/framework:^10.0"]"""
    reqs = []
    m = re.search(r"composer\s+require\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs


# ── Bash extractor routing table (must come after all extract functions) ─────
def _fmt_pip(pkgs: list) -> str:
    return "\n".join(pkgs)


def _fmt_npm(pkgs: list) -> str:
    deps = {}
    for p in pkgs:
        if "@" in p and not p.startswith("@"):
            name, ver = p.split("@", 1)
        elif p.startswith("@") and p.count("@") == 2:
            # Handle the @scope/pkg@version form
            parts = p.rsplit("@", 1)
            name, ver = parts[0], parts[1]
        else:
            name, ver = p, "latest"
        deps[name] = ver
    return json.dumps({"dependencies": deps}, indent=2)


def _fmt_go(pkgs: list) -> str:
    # go.mod require lines are written "module/path version", so "path@version" is split on "@"
    requires = "\n".join("\t" + p.replace("@", " ", 1) for p in pkgs)
    return f"module scan\n\ngo 1.21\n\nrequire (\n{requires}\n)"


def _fmt_cargo(pkgs: list) -> str:
    lines = ["[dependencies]"]
    for p in pkgs:
        if "@" in p:
            name, ver = p.split("@", 1)
            lines.append(f'{name} = "{ver}"')
        else:
            lines.append(f'{p} = "*"')
    return "\n".join(lines)


def _fmt_composer(pkgs: list) -> str:
    deps = {}
    for p in pkgs:
        if ":" in p:
            name, ver = p.split(":", 1)
        else:
            name, ver = p, "*"
        deps[name] = ver
    return json.dumps({"require": deps}, indent=2)


BASH_EXTRACTORS = [
    (r"\bpip3?\s+install\b",       extract_reqs_from_pip_command,      "requirements.txt", _fmt_pip),
    (r"\bpipenv\s+install\b",      extract_reqs_from_pip_command,      "requirements.txt", _fmt_pip),
    (r"\bnpm\s+(?:install|i)\b",   extract_reqs_from_npm_command,      "package.json",     _fmt_npm),
    (r"\byarn\s+add\b",            extract_reqs_from_yarn_command,     "package.json",     _fmt_npm),
    (r"\bgo\s+(?:get|install)\b",  extract_reqs_from_go_command,       "go.mod",           _fmt_go),
    (r"\bcargo\s+add\b",           extract_reqs_from_cargo_command,    "Cargo.toml",       _fmt_cargo),
    (r"\bcomposer\s+require\b",    extract_reqs_from_composer_command, "composer.json",    _fmt_composer),
]


# ── Core: handle the different field layouts of the three tools ──────────────
def get_scan_target(tool_name: str, tool_input: dict) -> Optional[Tuple[str, str]]:
    """
    Return (filename, content_to_scan) or None.

    Field layout of the three tools:
    - Write : {"file_path": "...", "content": "..."}
    - Edit  : {"file_path": "...", "old_string": "...", "new_string": "..."}
    - Bash  : {"command": "pip install django==4.2.7"}
    """
    # ── Write ────────────────────────────────────────────────
    if tool_name == "Write":
        path    = tool_input.get("file_path", "")
        content = tool_input.get("content", "")
        if not is_dependency_file(path):
            wlog("DEBUG", f"Write: not a dependency file, skipping: {path}")
            return None
        wlog("INFO", f"Write: dependency file: {path}")
        return os.path.basename(path), content

    # ── Edit ─────────────────────────────────────────────────
    elif tool_name == "Edit":
        path       = tool_input.get("file_path", "")
        old_string = tool_input.get("old_string", "")
        new_string = tool_input.get("new_string", "")
        if not is_dependency_file(path):
            wlog("DEBUG", f"Edit: not a dependency file, skipping: {path}")
            return None
        wlog("INFO", f"Edit: dependency file: {path}")
        wlog("DEBUG", f"old_string: {old_string}")
        wlog("DEBUG", f"new_string: {new_string}")
        full_content = get_post_edit_content(path, old_string, new_string)
        wlog("DEBUG", f"Full content after merge:\n{full_content}")
        return os.path.basename(path), full_content

    # ── Bash ─────────────────────────────────────────────────
    elif tool_name == "Bash":
        command = tool_input.get("command", "")
        for pattern, extractor, filename, formatter in BASH_EXTRACTORS:
            if re.search(pattern, command, re.IGNORECASE):
                pkgs = extractor(command)
                if not pkgs:
                    wlog("WARN", f"No package names parsed from: {command}")
                    return None
                content = formatter(pkgs)
                wlog("INFO", f"Bash: parsed packages: {pkgs} -> {filename}")
                return filename, content
        wlog("DEBUG", f"Bash: not an install command, skipping: {command[:80]}")
        return None

    return None


def get_post_edit_content(file_path: str, old_string: str, new_string: str) -> str:
    """Read the file from disk, replace old_string with new_string, and return the full result."""
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            original = f.read()
        if old_string in original:
            return original.replace(old_string, new_string, 1)
        else:
            wlog("WARN", "old_string not found in the original file; scanning new_string instead")
            return new_string
    else:
        wlog("INFO", "Original file does not exist; scanning new_string instead")
        return new_string


# ── OpenSCA scan ─────────────────────────────────────────────────────────────
def run_opensca_scan(filename: str, content: str) -> Optional[dict]:
    tmpdir = tempfile.mkdtemp(prefix="opensca_hook_")
    try:
        target_path = os.path.join(tmpdir, filename)
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(content)
        wlog("INFO", f"Temporary scan directory: {tmpdir}")

        result_path = os.path.join(tmpdir, "result.json")
        cmd = [
            OPENSCA_CLI,
            "-token", OPENSCA_TOKEN,
            "-proj",  OPENSCA_PROJ,
            "-path",  tmpdir,
            "-out",   result_path,
        ]
        wlog("INFO", f"Running command: {' '.join(cmd)}")
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=60,
            cwd=tmpdir,  # keep opensca.log and similar artifacts out of the project directory
        )
        wlog("DEBUG", f"opensca stdout: {proc.stdout[:200]}")
        wlog("DEBUG", f"opensca stderr: {proc.stderr[:200]}")

        if not os.path.exists(result_path):
            wlog("WARN", "result.json was not generated")
            return None

        with open(result_path, "r", encoding="utf-8") as f:
            result = json.load(f)

        # Check for an error message returned by the cloud service
        task_error = result.get("task_info", {}).get("error", "")
        if task_error:
            wlog("WARN", f"opensca cloud error: {task_error}")

        wlog("INFO", "Scan result parsed successfully")
        wlog("DEBUG", f"result.json raw content: {json.dumps(result)[:500]}")
        return result
    except subprocess.TimeoutExpired:
        wlog("WARN", "opensca-cli timed out")
        return None
    except Exception as e:
        wlog("ERROR", f"Scan error: {e}")
        return None
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


# ── Vulnerability parsing ────────────────────────────────────────────────────
def collect_vulnerabilities(scan_result: dict) -> list:
    vulns = []

    def walk(node: dict):
        pkg_name    = node.get("name", "unknown")
        pkg_version = node.get("version", "unknown")
        for v in node.get("vulnerabilities", []):
            level_id = v.get("security_level_id", 4)
            vulns.append({
                "pkg_name":    pkg_name,
                "pkg_version": pkg_version,
                "vuln_name":   v.get("name", ""),
                "cve_id":      v.get("cve_id", ""),
                "level_id":    level_id,
                "level_name":  LEVEL_NAMES.get(level_id, f"Level{level_id}"),
                "suggestion":  v.get("suggestion", ""),
            })
        for child in node.get("children", []):
            walk(child)

    walk(scan_result)
    return vulns


def build_block_message(vulns_to_block: list, all_vulns: list) -> str:
    lines = [
        "=" * 60,
        "🚨 Supply-chain scan found high-risk vulnerabilities; this dependency operation was blocked",
        "=" * 60,
        f"\nFound {len(all_vulns)} vulnerabilities, {len(vulns_to_block)} of which reach the blocking threshold (Critical/High):\n",
    ]
    pkg_map = {}
    for v in vulns_to_block:
        key = f"{v['pkg_name']}=={v['pkg_version']}"
        pkg_map.setdefault(key, []).append(v)

    for pkg_key, pkg_vulns in pkg_map.items():
        lines.append(f"📦 {pkg_key}")
        for v in pkg_vulns:
            lines.append(f"   [{v['level_name']}] {v['cve_id']} - {v['vuln_name']}")
            ver_match = re.search(r"(\d+\.\d+[\.\d]*)", v["suggestion"])
            if ver_match:
                lines.append(f"   💡 Suggested upgrade: {ver_match.group(1)} or later")
        lines.append("")

    lines += ["─" * 60, "⚡ Change the dependency to a safe version and run again.", "=" * 60]
    return "\n".join(lines)


# ── Main flow ────────────────────────────────────────────────────────────────
def main():
    wlog("INFO", "=" * 50)
    wlog("INFO", "Hook triggered!")

    raw = sys.stdin.read()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        wlog("ERROR", f"JSON parsing failed: {e}")
        sys.exit(0)

    tool_name  = data.get("tool_name", "")
    tool_input = data.get("tool_input", {})
    wlog("INFO", f"tool_name={tool_name}")
    wlog("INFO", f"file_path={tool_input.get('file_path') or tool_input.get('command', '')[:80]}")

    scan_target = get_scan_target(tool_name, tool_input)
    if scan_target is None:
        sys.exit(0)

    filename, content = scan_target
    wlog("INFO", f"Starting scan: {filename}")

    scan_result = run_opensca_scan(filename, content)
    if scan_result is None:
        wlog("WARN", "Scan produced no result, allowing")
        sys.exit(0)

    all_vulns      = collect_vulnerabilities(scan_result)
    vulns_to_block = [v for v in all_vulns if v["level_id"] <= BLOCK_LEVEL]
    wlog("INFO", f"total vulnerabilities={len(all_vulns)}, blocking={len(vulns_to_block)}")

    if not vulns_to_block:
        wlog("INFO", "✅ No high-risk vulnerabilities, allowing")
        sys.exit(0)

    wlog("WARN", f"🚨 Blocked! Packages: {set(v['pkg_name'] for v in vulns_to_block)}")
    msg = build_block_message(vulns_to_block, all_vulns)
    print(msg, file=sys.stderr)
    sys.exit(2)


if __name__ == "__main__":
    main()
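Before wiring the script into Claude Code, it can help to exercise it by hand with a simulated payload. The harness below is a sketch under that assumption: `run_hook` and `STUB_HOOK` are names invented here, and the stub is a stand-in that always blocks so the harness can be tried without OpenSCA installed. The sample command is the one used in the script's own docstring.

```python
import json
import subprocess
import sys


def run_hook(hook_cmd: list, payload: dict, timeout: int = 90):
    """Feed one simulated tool call to a hook command; return (exit_code, stderr)."""
    proc = subprocess.run(
        hook_cmd,
        input=json.dumps(payload),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return proc.returncode, proc.stderr


# Stand-in hook that consumes stdin and always blocks (illustration only).
STUB_HOOK = [
    sys.executable,
    "-c",
    "import sys; sys.stdin.read(); print('blocked', file=sys.stderr); sys.exit(2)",
]

SAMPLE_PAYLOAD = {
    "tool_name": "Bash",
    "tool_input": {"command": "pip install django==4.2.7"},
}
```

Calling `run_hook(STUB_HOOK, SAMPLE_PAYLOAD)` exercises the harness itself. To test the real script, replace `STUB_HOOK` with the `python3` command from settings.json (with `~` expanded); exit code 0 means the call would be allowed, and 2 means it would be blocked with the message on stderr.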

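Result

When an outdated, vulnerable version of a dependency is about to be written, the hook fires, terminates the current task, and reports the vulnerable packages together with the suggested fix.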
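Whether a finding terminates the task depends on the severity threshold. The sketch below isolates that decision using the same scale as the script (1=Critical, 2=High, 3=Medium, 4=Low); the helper name `split_by_threshold` and the sample findings are made up for illustration and are not real scan output.

```python
BLOCK_LEVEL = 2  # same scale as the script: 1=Critical, 2=High, 3=Medium, 4=Low


def split_by_threshold(vulns: list, block_level: int = BLOCK_LEVEL):
    """Partition findings into (blocking, tolerated) by severity level."""
    blocking = [v for v in vulns if v["level_id"] <= block_level]
    tolerated = [v for v in vulns if v["level_id"] > block_level]
    return blocking, tolerated


# Made-up findings for illustration only.
sample = [
    {"pkg_name": "pkg-a", "level_id": 1},
    {"pkg_name": "pkg-b", "level_id": 2},
    {"pkg_name": "pkg-c", "level_id": 3},
]
blocking, tolerated = split_by_threshold(sample)
```

With the threshold at 2, `pkg-a` and `pkg-b` block the operation and `pkg-c` is let through. Lowering the threshold to 1 blocks only Critical findings.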

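Summary

The spread of AI coding tools has greatly improved development efficiency, but it has also opened a new security blind spot: once AI starts introducing third-party dependencies on our behalf, traditional after-the-fact scanning can no longer keep up.

Building on Claude Code's Hook mechanism and OpenSCA, this post implements a real-time supply-chain interception scheme. The core idea is to move the security check earlier:

1. Don't wait for the commit: scanning is triggered the moment the AI modifies a dependency file.

2. Don't wait for manual review: the hook identifies vulnerabilities and blocks automatically.

3. Don't leave any entry point open: both direct file writes and install commands fall within the interception scope.

In essence, the scheme embeds the role of a security auditor into the Coding Agent's decision chain, so that when the AI "wants to introduce a vulnerable dependency", it is stopped before execution.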



Reposted from: 中国白客联盟, 帅气的Jumbo, "When AI Coding Starts Pulling In Third-Party Libraries for You, How Do You Guard the Supply Chain?"
