ai-agent-security

SKILL.md

AI Agent 安全开发最佳实践

本技能基于生产级AI Agent防护方案整理,覆盖AI系统全生命周期的安全风险防控,符合等保2.0三级要求与数据安全法规定。

When to Activate

  • 开发企业级AI Agent应用
  • 对接敏感内部数据的AI系统
  • 实现代码执行能力的AI助手
  • 合规审计要求的AI系统建设
  • AI安全风险评估与加固

AI Agent 安全风险矩阵

风险类型 危害等级 典型场景
Prompt注入攻击 高危 诱导Agent执行恶意指令、绕过安全限制
敏感信息泄露 高危 Agent输出内部数据、密钥、用户隐私
恶意代码执行 极高危 Agent生成并执行恶意代码,控制服务器
数据投毒 中危 污染训练数据/知识库,导致Agent输出错误信息
越权访问 高危 Agent绕过权限控制,访问未授权资源
合规风险 中危 违反数据安全法、个人信息保护法等法规

四层安全防护架构

1. 输入层防护

Prompt注入检测

from typing import List
import re

class PromptInjectionDetector:
    def __init__(self):
        self.risk_patterns = [
            # 指令绕过模式
            r"(ignore|disregard|forget).*(previous|above|prior).*(instructions|prompt|rules)",
            r"(you are|act as|pretend to be).*(not|no longer).*(assistant|AI|bot)",
            r"(override|bypass|disable).*(security|safety|content).*(filters|policies|restrictions)",
            # 系统指令模式
            r"```system\s*",
            r"<\|system\|>",
            r"SYSTEM:",
            # 诱导输出模式
            r"(output|print|reveal|disclose).*(prompt|instructions|rules|system)",
            r"(show|tell|list).*(all|full|entire).*(prompt|context|memory)"
        ]
        self.suspicious_keywords = ["jailbreak", "DAN", "dev mode", "developer mode", "unrestricted"]
    
    def detect(self, prompt: str, threshold: float = 0.7) -> dict:
        risk_score = 0.0
        matched_patterns = []
        
        # 正则匹配检测
        for pattern in self.risk_patterns:
            if re.search(pattern, prompt, re.IGNORECASE):
                risk_score += 0.2
                matched_patterns.append(pattern)
        
        # 关键词检测
        for keyword in self.suspicious_keywords:
            if keyword.lower() in prompt.lower():
                risk_score += 0.15
        
        # 特殊字符检测
        special_char_ratio = len(re.findall(r'[^\w\s,.,。?!;:""''()()、]', prompt)) / len(prompt) if prompt else 0
        if special_char_ratio > 0.3:
            risk_score += 0.25
        
        return {
            "is_risk": risk_score >= threshold,
            "risk_score": risk_score,
            "matched_patterns": matched_patterns
        }

# 使用示例
detector = PromptInjectionDetector()
result = detector.detect(user_input)
if result["is_risk"]:
    raise SecurityError("疑似Prompt注入攻击,请求已拦截")

输入规范化

def normalize_input(prompt: str) -> str:
    # 移除特殊标记
    prompt = re.sub(r'<\|.*?\|>', '', prompt)
    # 移除系统指令关键词
    prompt = re.sub(r'(?i)\b(system|assistant|user):\s*', '', prompt)
    # 移除代码块标记
    prompt = re.sub(r'```[\s\S]*?```', '[CODE_BLOCK_REMOVED]', prompt)
    # 截断过长输入
    if len(prompt) > 4000:
        prompt = prompt[:4000] + "[TRUNCATED]"
    return prompt

2. 生成层防护

输出校验

import re
from typing import List

class OutputValidator:
    def __init__(self):
        self.sensitive_patterns = [
            # 密钥模式
            r'(sk_|api_key|secret|token|password)\s*[:=]\s*[\w-]+',
            # 隐私数据模式
            r'\b\d{11}\b',  # 手机号
            r'\b\d{18}\b',  # 身份证号
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # 邮箱
            # 内部信息模式
            r'(内部|机密|绝密|保密|敏感)',
            r'(内网|192\.168\.|10\.|172\.1[6-9]\.|172\.2[0-9]\.|172\.3[0-1]\.)'
        ]
    
    def validate(self, output: str) -> dict:
        risks = []
        cleaned_output = output
        
        # 敏感信息检测与脱敏
        for pattern in self.sensitive_patterns:
            matches = re.findall(pattern, output, re.IGNORECASE)
            if matches:
                risks.append(f"检测到敏感信息: {pattern}")
                # 脱敏处理
                cleaned_output = re.sub(pattern, '***', cleaned_output, flags=re.IGNORECASE)
        
        # 恶意代码检测
        malicious_code_patterns = [
            r'(rm\s+-rf|format\s+/|del\s+/f/s/q|reg\s+delete)',
            r'(curl|wget).*http.*\|.*sh',
            r'(eval|exec|system|popen|subprocess\.call)',
            r'(base64.*\|.*bash|bash.*<\(curl)'
        ]
        
        for pattern in malicious_code_patterns:
            if re.search(pattern, output, re.IGNORECASE):
                risks.append(f"检测到恶意代码: {pattern}")
                cleaned_output = "输出包含恶意代码,已拦截"
                break
        
        return {
            "is_risk": len(risks) > 0,
            "risks": risks,
            "cleaned_output": cleaned_output
        }

幻觉检测

def verify_facts(output: str, knowledge_base: List[str]) -> dict:
    """基于知识库验证输出事实正确性"""
    import spacy
    nlp = spacy.load("zh_core_web_sm")
    
    doc = nlp(output)
    facts = [ent.text for ent in doc.ents if ent.label_ in ["PERSON", "ORG", "DATE", "EVENT"]]
    
    unverified_facts = []
    for fact in facts:
        if not any(fact in kb_entry for kb_entry in knowledge_base):
            unverified_facts.append(fact)
    
    return {
        "has_hallucination": len(unverified_facts) > 0,
        "unverified_facts": unverified_facts,
        "confidence": 1.0 - (len(unverified_facts) / max(len(facts), 1))
    }

3. 执行层防护

代码执行沙箱

import subprocess
import tempfile
import os
from pathlib import Path

class CodeSandbox:
    def __init__(self, memory_limit: str = "256m", cpu_limit: float = 0.5, timeout: int = 10):
        self.memory_limit = memory_limit
        self.cpu_limit = cpu_limit
        self.timeout = timeout
    
    def execute(self, code: str, language: str = "python") -> dict:
        # 创建临时目录
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpdir = Path(tmpdir)
            
            # 写入代码文件
            code_file = tmpdir / "code"
            code_file.write_text(code)
            
            try:
                if language == "python":
                    cmd = [
                        "docker", "run", "--rm",
                        "--memory", self.memory_limit,
                        "--cpus", str(self.cpu_limit),
                        "--network", "none",  # 禁用网络
                        "--read-only",        # 只读文件系统
                        "-v", f"{tmpdir}:/app",
                        "python:3.11-slim",
                        "python", "/app/code"
                    ]
                elif language == "javascript":
                    cmd = [
                        "docker", "run", "--rm",
                        "--memory", self.memory_limit,
                        "--cpus", str(self.cpu_limit),
                        "--network", "none",
                        "--read-only",
                        "-v", f"{tmpdir}:/app",
                        "node:20-slim",
                        "node", "/app/code"
                    ]
                else:
                    return {"error": "不支持的语言"}
                
                # 执行代码
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout
                )
                
                return {
                    "success": result.returncode == 0,
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "returncode": result.returncode
                }
                
            except subprocess.TimeoutExpired:
                return {"error": "执行超时"}
            except Exception as e:
                return {"error": f"执行错误: {str(e)}"}

工具调用权限控制

from typing import Dict, List, Callable

class ToolAuthorization:
    def __init__(self):
        # 角色-权限映射
        self.role_permissions: Dict[str, List[str]] = {
            "admin": ["*"],
            "developer": ["code_execute", "git_*", "database_query"],
            "user": ["web_search", "file_read", "calculator"]
        }
        # 工具风险等级
        self.tool_risk: Dict[str, str] = {
            "code_execute": "high",
            "database_write": "high",
            "file_delete": "high",
            "email_send": "medium",
            "web_search": "low",
            "calculator": "low"
        }
    
    def check_permission(self, user_role: str, tool_name: str) -> bool:
        permissions = self.role_permissions.get(user_role, [])
        
        # 通配符匹配
        for perm in permissions:
            if perm == "*":
                return True
            if perm.endswith("*") and tool_name.startswith(perm[:-1]):
                return True
            if perm == tool_name:
                return True
        
        return False
    
    def require_mfa(self, tool_name: str) -> bool:
        """高风险工具需要二次验证"""
        return self.tool_risk.get(tool_name, "low") == "high"

4. 审计层防护

全链路审计日志

import json
from datetime import datetime
from typing import Any

class AuditLogger:
    def __init__(self, log_path: str = "audit.log"):
        self.log_path = log_path
    
    def log_event(self, 
                 event_type: str,
                 user_id: str,
                 session_id: str,
                 tool_name: str = None,
                 input_data: Any = None,
                 output_data: Any = None,
                 is_risk: bool = False,
                 risk_details: List[str] = None):
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "session_id": session_id,
            "tool_name": tool_name,
            "input_hash": hash(str(input_data)) if input_data else None,
            "output_hash": hash(str(output_data)) if output_data else None,
            "is_risk": is_risk,
            "risk_details": risk_details or []
        }
        
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

# 使用示例
audit_logger = AuditLogger()
audit_logger.log_event(
    event_type="tool_call",
    user_id="user123",
    session_id="session456",
    tool_name="code_execute",
    input_data=user_code,
    output_data=execution_result,
    is_risk=False
)

合规要求实现

数据安全法合规

class DataCompliance:
    def __init__(self):
        self.sensitive_data_types = ["个人信息", "隐私数据", "商业秘密", "重要数据"]
    
    def data_processing_approval(self, data_type: str, processing_purpose: str) -> bool:
        """数据处理审批流程"""
        if data_type in self.sensitive_data_types:
            # 需要审批流程
            return self.check_approval_flow(data_type, processing_purpose)
        return True
    
    def data_retention_policy(self, data_type: str) -> int:
        """数据留存周期"""
        retention_policy = {
            "个人信息": 30,  # 30天
            "业务数据": 365,  # 1年
            "日志数据": 180,  # 6个月
            "审计数据": 365*3  # 3年
        }
        return retention_policy.get(data_type, 90)

个人信息保护法合规

def desensitize_personal_info(data: dict) -> dict:
    """个人信息脱敏"""
    if "phone" in data:
        data["phone"] = data["phone"][:3] + "****" + data["phone"][7:]
    if "id_card" in data:
        data["id_card"] = data["id_card"][:6] + "********" + data["id_card"][14:]
    if "email" in data:
        local, domain = data["email"].split("@")
        data["email"] = local[0] + "****@" + domain
    if "address" in data:
        data["address"] = data["address"][:3] + "****"
    return data

AI Agent安全检查清单

开发阶段

  • 实现Prompt注入检测与防护
  • 输出敏感信息自动脱敏
  • 代码执行能力通过沙箱隔离
  • 工具调用实现最小权限控制
  • 高风险操作需要二次验证
  • 全链路审计日志完备
  • 敏感数据加密存储与传输

测试阶段

  • 完成Prompt注入攻击测试(覆盖常见攻击模式)
  • 完成敏感信息泄露测试
  • 完成恶意代码执行测试
  • 完成越权访问测试
  • 性能压测下安全防护不失效
  • 异常场景下安全策略不绕过

运行阶段

  • 安全规则实时更新
  • 异常行为实时告警
  • 定期安全审计与漏洞扫描
  • 数据定期备份与恢复演练
  • 安全事件应急响应流程完备
  • 定期安全培训与意识提升
Weekly Installs
2
GitHub Stars
122
First Seen
7 days ago
Installed on
opencode2
gemini-cli2
deepagents2
antigravity2
claude-code2
github-copilot2