文章总结: 文档针对AI编程引入第三方库带来的供应链安全风险,提出了一种基于ClaudeCodeHook机制与OpenSCA的实时防御方案。作者指出传统SCA扫描滞后,主张将检测左移至AI决策瞬间。通过编写Python脚本配置PreToolUse钩子,在AI执行Write、Edit或Bash命令前解析依赖信息,调用OpenSCA扫描漏洞并阻断高危操作。该方案支持Python、JS、Go等多生态,提供了详细的配置代码与逻辑实现,为防范AI编程引入幽灵依赖提供了可落地的自动化解决思路,具有较高的实战参考价值。 综合评分: 90 文章分类: AI安全,供应链安全,安全工具,安全开发,解决方案
当 AI 写代码开始帮你引入第三方库,供应链安全该怎么守
原创
帅气的Jumbo 帅气的Jumbo
中国白客联盟
2026年3月8日 17:43 上海
前言
AI 已经慢慢成为工作的一部分,尤其在 IT 行业,利用 AI 写代码已是日常。但 AI 写的代码就完全可信、安全吗?传统的 SCA 扫描面对 AI 编程场景,是否有更好的解决方案?
最近看到腾讯的一篇文章《幽灵依赖:Agentic Coding 范式下的新型供应链安全威胁》,文中提出了一个有意思的思路——将安全检测左移至 Coding Agent 产生决策的瞬间,在 AI 真正执行依赖操作之前就完成拦截。
但是因为其插件是基于CodeBuddy Code,因此本文就基于这个思路,结合 Claude Code 的 Hook 机制和 OpenSCA,实现一个在 AI 写代码时实时拦截高危依赖的方案。
Claude Code Hook机制
Claude Code提供了完善的Hook机制,其中PreToolUse符合本文的要求,也就是在AI输出方案但是还没产生文件前,安全介入:
然后利用Exit code 2来实现阻止当前Claude Code的操作,那么思路就很清晰了,只要通过Hook PreToolUse,判断当出现高危组件时则终止任务。
Hook具体操作
首先在Claude Code的全局配置文件~/.claude/settings.json 中添加hook操作:
{ "hooks": { "PreToolUse": [ { "matcher": "Write", "hooks": [ { "type": "command", "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py" } ] }, { "matcher": "Edit", "hooks": [ { "type": "command", "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py" } ] }, { "matcher": "Bash", "hooks": [ { "type": "command", "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py" } ] } ] }}
从几个方面来进行Hook来保证全面性:
EditWriteBASH
然后编写Hook代码,具体代码如下:
可访问 https://github.com/Jumbo-WJB/ai-code-guard 获取访问代码。
#!/usr/bin/env python3import sysimport jsonimport osimport reimport subprocessimport tempfileimport shutilfrom datetime import datetimefrom typing import Optional, Tuple# ── 配置区 ───────────────────────────────────────────────────────────────────OPENSCA_CLI = os.path.expanduser("~/Downloads/opensca-cli-v3.0.9-darwin-arm64/opensca-cli")OPENSCA_TOKEN = "your-token-here"OPENSCA_PROJ = "claude-hook-scan"BLOCK_LEVEL = 2 # 1=Critical, 2=High, 3=Medium, 4=LowLOG_FILE = os.path.expanduser("~/.claude/hooks/opensca_guard/opensca_guard.log")DEPENDENCY_FILES = { # Python - Pip "requirements.txt", "requirements.in", "requirements-dev.txt", "requirements-prod.txt", "setup.py", "Pipfile", "Pipfile.lock", # JavaScript - Npm "package.json", "package-lock.json", "yarn.lock", # Java - Maven "pom.xml", # Java - Gradle ".gradle", ".gradle.kts", # PHP - Composer "composer.json", "composer.lock", # Ruby - gem "Gemfile", "Gemfile.lock", # Golang - gomod "go.mod", "go.sum", "Gopkg.toml", "Gopkg.lock", # Rust - cargo "Cargo.toml", "Cargo.lock", # Erlang - Rebar "rebar.lock", "rebar.config",}INSTALL_CMD_PATTERNS = [ # Python r"\bpip3?\s+install\b", r"\bpipenv\s+install\b", # JavaScript r"\bnpm\s+install\b", r"\bnpm\s+i\b", r"\byarn\s+add\b", r"\bpnpm\s+add\b", r"\bpnpm\s+install\b", # Java r"\bmvn\s+install\b", r"\bmvn\s+package\b", r"\bgradle\s+build\b", r"\bgradle\s+dependencies\b", # PHP r"\bcomposer\s+require\b", r"\bcomposer\s+install\b", # Ruby r"\bbundle\s+install\b", r"\bgem\s+install\b", # Golang r"\bgo\s+get\b", r"\bgo\s+install\b", # Rust r"\bcargo\s+add\b", r"\bcargo\s+install\b", r"\bcargo\s+build\b", # Erlang r"\brebar3?\s+get-deps\b", r"\brebar3?\s+compile\b",]LEVEL_NAMES = {1: "Critical", 2: "High", 3: "Medium", 4: "Low"}# ── 日志 ─────────────────────────────────────────────────────────────────────def wlog(level: str, msg: str): with open(LOG_FILE, "a", encoding="utf-8") as f: ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write(f"[{ts}][{level}] {msg}\n")def log(msg: str): print(f"[OpenSCA Guard] {msg}", file=sys.stderr)# ── 判断逻辑 ─────────────────────────────────────────────────────────────────def is_dependency_file(path: str) -> bool: return os.path.basename(path) in DEPENDENCY_FILESdef is_install_command(command: str) -> bool: return any(re.search(p, command, re.IGNORECASE) for p in INSTALL_CMD_PATTERNS)# ── 各生态 Bash 命令包名提取 ──────────────────────────────────────────────────def extract_reqs_from_pip_command(command: str) -> list: """pip install django==4.2.7 requests -> ["django==4.2.7", "requests"]""" reqs = [] m = re.search(r"pip3?\s+install\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-") or arg.endswith((".txt", ".cfg")): continue reqs.append(arg) return reqsdef extract_reqs_from_npm_command(command: str) -> list: """npm install [email protected] -> ["[email protected]"]""" reqs = [] m = re.search(r"npm\s+(?:install|i)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-"): continue reqs.append(arg) return reqsdef extract_reqs_from_yarn_command(command: str) -> list: """yarn add [email protected] -> ["[email protected]"]""" reqs = [] m = re.search(r"yarn\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-"): continue reqs.append(arg) return reqsdef extract_reqs_from_go_command(command: str) -> list: """go get github.com/gin-gonic/[email protected] -> ["github.com/gin-gonic/[email protected]"]""" reqs = [] m = re.search(r"go\s+(?:get|install)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-"): continue reqs.append(arg) return reqsdef extract_reqs_from_cargo_command(command: str) -> list: """cargo add [email protected] -> ["[email protected]"]""" reqs = [] m = re.search(r"cargo\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-"): continue reqs.append(arg) return reqsdef extract_reqs_from_composer_command(command: str) -> list: """composer require laravel/framework:^10.0 -> ["laravel/framework:^10.0"]""" reqs = [] m = re.search(r"composer\s+require\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command) if not m: return reqs for arg in m.group(1).split(): arg = arg.strip() if not arg or arg.startswith("-"): continue reqs.append(arg) return reqs# ── Bash 提取器路由表(必须在所有 extract 函数定义之后)─────────────────────def _fmt_pip(pkgs: list) -> str: return "\n".join(pkgs)def _fmt_npm(pkgs: list) -> str: deps = {} for p in pkgs: if "@" in p and not p.startswith("@"): name, ver = p.split("@", 1) elif p.startswith("@") and p.count("@") == 2: # 处理 @scope/pkg@version 格式 parts = p.rsplit("@", 1) name, ver = parts[0], parts[1] else: name, ver = p, "latest" deps[name] = ver return json.dumps({"dependencies": deps}, indent=2)def _fmt_go(pkgs: list) -> str: requires = "\n".join(f"\t{p}" for p in pkgs) return f"module scan\n\ngo 1.21\n\nrequire (\n{requires}\n)"def _fmt_cargo(pkgs: list) -> str: lines = ["[dependencies]"] for p in pkgs: if "@" in p: name, ver = p.split("@", 1) lines.append(f'{name} = "{ver}"') else: lines.append(f'{p} = "*"') return "\n".join(lines)def _fmt_composer(pkgs: list) -> str: deps = {} for p in pkgs: if ":" in p: name, ver = p.split(":", 1) else: name, ver = p, "*" deps[name] = ver return json.dumps({"require": deps}, indent=2)BASH_EXTRACTORS = [ (r"\bpip3?\s+install\b", extract_reqs_from_pip_command, "requirements.txt", _fmt_pip), (r"\bpipenv\s+install\b", extract_reqs_from_pip_command, "requirements.txt", _fmt_pip), (r"\bnpm\s+(?:install|i)\b", extract_reqs_from_npm_command, "package.json", _fmt_npm), (r"\byarn\s+add\b", extract_reqs_from_yarn_command, "package.json", _fmt_npm), (r"\bgo\s+(?:get|install)\b", extract_reqs_from_go_command, "go.mod", _fmt_go), (r"\bcargo\s+add\b", extract_reqs_from_cargo_command, "Cargo.toml", _fmt_cargo), (r"\bcomposer\s+require\b", extract_reqs_from_composer_command, "composer.json", _fmt_composer),]# ── 核心:处理三种工具的不同字段结构 ─────────────────────────────────────────def get_scan_target(tool_name: str, tool_input: dict) -> Optional[Tuple[str, str]]: """ 返回 (filename, content_to_scan) 或 None 三种工具的字段结构: - Write : {"file_path": "...", "content": "..."} - Edit : {"file_path": "...", "old_string": "...", "new_string": "..."} - Bash : {"command": "pip install django==4.2.7"} """ # ── Write ──────────────────────────────────────────────── if tool_name == "Write": path = tool_input.get("file_path", "") content = tool_input.get("content", "") if not is_dependency_file(path): wlog("DEBUG", f"Write 非依赖文件,跳过: {path}") return None wlog("INFO", f"Write 依赖文件: {path}") return os.path.basename(path), content # ── Edit ───────────────────────────────────────────────── elif tool_name == "Edit": path = tool_input.get("file_path", "") old_string = tool_input.get("old_string", "") new_string = tool_input.get("new_string", "") if not is_dependency_file(path): wlog("DEBUG", f"Edit 非依赖文件,跳过: {path}") return None wlog("INFO", f"Edit 依赖文件: {path}") wlog("DEBUG", f"old_string: {old_string}") wlog("DEBUG", f"new_string: {new_string}") full_content = get_post_edit_content(path, old_string, new_string) wlog("DEBUG", f"合并后完整内容:\n{full_content}") return os.path.basename(path), full_content # ── Bash ───────────────────────────────────────────────── elif tool_name == "Bash": command = tool_input.get("command", "") for pattern, extractor, filename, formatter in BASH_EXTRACTORS: if re.search(pattern, command, re.IGNORECASE): pkgs = extractor(command) if not pkgs: wlog("WARN", f"未解析出包名: {command}") return None content = formatter(pkgs) wlog("INFO", f"Bash 解析出包: {pkgs} -> {filename}") return filename, content wlog("DEBUG", f"Bash 非安装命令,跳过: {command[:80]}") return None return Nonedef get_post_edit_content(file_path: str, old_string: str, new_string: str) -> str: """读取磁盘原文件,将 old_string 替换为 new_string,返回替换后的完整内容""" if os.path.exists(file_path): with open(file_path, "r", encoding="utf-8") as f: original = f.read() if old_string in original: return original.replace(old_string, new_string, 1) else: wlog("WARN", "old_string 未在原文件中找到,使用 new_string 作为扫描内容") return new_string else: wlog("INFO", "原文件不存在,使用 new_string 作为扫描内容") return new_string# ── OpenSCA 扫描 ──────────────────────────────────────────────────────────────def run_opensca_scan(filename: str, content: str) -> Optional[dict]: tmpdir = tempfile.mkdtemp(prefix="opensca_hook_") try: target_path = os.path.join(tmpdir, filename) with open(target_path, "w", encoding="utf-8") as f: f.write(content) wlog("INFO", f"临时扫描目录: {tmpdir}") result_path = os.path.join(tmpdir, "result.json") cmd = [ OPENSCA_CLI, "-token", OPENSCA_TOKEN, "-proj", OPENSCA_PROJ, "-path", tmpdir, "-out", result_path, ] wlog("INFO", f"执行命令: {' '.join(cmd)}") proc = subprocess.run( cmd, capture_output=True, text=True, timeout=60, cwd=tmpdir, # 避免 opensca.log 等产物污染项目目录 ) wlog("DEBUG", f"opensca stdout: {proc.stdout[:200]}") wlog("DEBUG", f"opensca stderr: {proc.stderr[:200]}") if not os.path.exists(result_path): wlog("WARN", "未生成 result.json") return None with open(result_path, "r", encoding="utf-8") as f: result = json.load(f) # 检测云端返回的错误信息 task_error = result.get("task_info", {}).get("error", "") if task_error: wlog("WARN", f"opensca 云端报错: {task_error}") wlog("INFO", "扫描结果解析成功") wlog("DEBUG", f"result.json 原始内容: {json.dumps(result)[:500]}") return result except subprocess.TimeoutExpired: wlog("WARN", "opensca-cli 超时") return None except Exception as e: wlog("ERROR", f"扫描异常: {e}") return None finally: shutil.rmtree(tmpdir, ignore_errors=True)# ── 漏洞解析 ─────────────────────────────────────────────────────────────────def collect_vulnerabilities(scan_result: dict) -> list: vulns = [] def walk(node: dict): pkg_name = node.get("name", "unknown") pkg_version = node.get("version", "unknown") for v in node.get("vulnerabilities", []): level_id = v.get("security_level_id", 4) vulns.append({ "pkg_name": pkg_name, "pkg_version": pkg_version, "vuln_name": v.get("name", ""), "cve_id": v.get("cve_id", ""), "level_id": level_id, "level_name": LEVEL_NAMES.get(level_id, f"Level{level_id}"), "suggestion": v.get("suggestion", ""), }) for child in node.get("children", []): walk(child) walk(scan_result) return vulnsdef build_block_message(vulns_to_block: list, all_vulns: list) -> str: lines = [ "=" * 60, "🚨 供应链安全扫描发现高危漏洞,已阻断本次依赖操作", "=" * 60, f"\n共发现 {len(all_vulns)} 个漏洞,其中 {len(vulns_to_block)} 个达到阻断阈值(Critical/High):\n", ] pkg_map = {} for v in vulns_to_block: key = f"{v['pkg_name']}=={v['pkg_version']}" pkg_map.setdefault(key, []).append(v) for pkg_key, pkg_vulns in pkg_map.items(): lines.append(f"📦 {pkg_key}") for v in pkg_vulns: lines.append(f" [{v['level_name']}] {v['cve_id']} — {v['vuln_name']}") ver_match = re.search(r"(\d+\.\d+[\.\d]*)", v["suggestion"]) if ver_match: lines.append(f" 💡 建议升级至: {ver_match.group(1)} 或更高版本") lines.append("") lines += ["─" * 60, "⚡ 请将依赖修改为安全版本后重新执行。", "=" * 60] return "\n".join(lines)# ── 主流程 ───────────────────────────────────────────────────────────────────def main(): wlog("INFO", "=" * 50) wlog("INFO", "Hook 被触发!") raw = sys.stdin.read() try: data = json.loads(raw) except json.JSONDecodeError as e: wlog("ERROR", f"JSON 解析失败: {e}") sys.exit(0) tool_name = data.get("tool_name", "") tool_input = data.get("tool_input", {}) wlog("INFO", f"tool_name={tool_name}") wlog("INFO", f"file_path={tool_input.get('file_path') or tool_input.get('command', '')[:80]}") scan_target = get_scan_target(tool_name, tool_input) if scan_target is None: sys.exit(0) filename, content = scan_target wlog("INFO", f"开始扫描: {filename}") scan_result = run_opensca_scan(filename, content) if scan_result is None: wlog("WARN", "扫描无结果,放行") sys.exit(0) all_vulns = collect_vulnerabilities(scan_result) vulns_to_block = [v for v in all_vulns if v["level_id"] <= BLOCK_LEVEL] wlog("INFO", f"漏洞总数={len(all_vulns)}, 阻断数={len(vulns_to_block)}") if not vulns_to_block: wlog("INFO", "✅ 无高危漏洞,放行") sys.exit(0) wlog("WARN", f"🚨 阻断!包: {set(v['pkg_name'] for v in vulns_to_block)}") msg = build_block_message(vulns_to_block, all_vulns) print(msg, file=sys.stderr) sys.exit(2)if __name__ == "__main__": main()
实现效果
当写入低版本的、存在漏洞的依赖包时,会触发Hook终止当前任务,并展示存在漏洞的依赖包和修复方案:
总结
AI 编程工具的普及极大提升了开发效率,但也带来了新的安全盲区——当 AI 开始替我们引入第三方依赖时,传统的事后扫描已经跟不上节奏。
本文基于 Claude Code 的 Hook 机制,结合 OpenSCA 实现了一套实时供应链安全拦截方案,核心思路是将安全检测的时机前移:
1.不等代码提交,在 AI 修改依赖文件的瞬间触发扫描
2.不等人工审查,由 Hook 自动完成漏洞识别与阻断
3.不放过任何入口,无论是直接写文件还是执行安装命令,均在拦截范围之内
整个方案的本质,是把安全审计员的角色嵌入到 Coding Agent 的决策链路中,让 AI 在”想引入一个有漏洞的依赖”时,在执行前就被叫停。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:中国白客联盟 帅气的Jumbo 帅气的Jumbo《当 AI 写代码开始帮你引入第三方库,供应链安全该怎么守》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。









评论