Безопасность AI-агентов: полный гайд

Практические примеры защиты: Docker-песочницы, валидация вывода, human-in-the-loop. Полный код на Python.

📊 Продвинутый⏱ 15 мин

# 1. STRIDE АНАЛИЗ УГРОЗ

# STRIDE — модель угроз Microsoft для Agent System
# S — Spoofing:       злоумышленник выдаёт себя за легитимного пользователя
# T — Tampering:      модификация tool output или системных промптов
# R — Repudiation:    невозможность отследить, кто инициировал опасное действие
# I — Info Disclosure: утечка системного промпта, ключей, данных пользователей
# D — DoS:            исчерпание токенов, бесконечный цикл tool calls
# E — Elevation:      повышение привилегий через несанкционированный доступ к tools

class ThreatModel:
    """STRIDE-анализ для AI-агента."""
    threats = {
        "Spoofing":       {"mitigation": "JWT auth + session validation"},
        "Tampering":      {"mitigation": "Output validator + checksums"},
        "Repudiation":    {"mitigation": "Immutable audit log (hash chain)"},
        "Info Disclosure": {"mitigation": "Redact secrets from logs, prompt isolation"},
        "DoS":            {"mitigation": "Rate limiting + max tokens + max steps"},
        "Elevation":      {"mitigation": "Least privilege + container sandbox"},
    }

# 2. INPUT SANITIZATION

import re
from html import escape

class InputSanitizer:
    """Фильтрация пользовательского ввода перед передачей агенту."""
    BLACKLIST = [
        r'\[system\]', r'\[SYSTEM\]',
        r'<script.*?>', r'</?script',
        r'IGNORE\s+(PREVIOUS|ALL|ABOVE)',
        r'OVERRIDE\s+(PREVIOUS|SYSTEM)',
        r'You are now',
        r'new instructions?:',
        r'\[OVERRIDE\]', r'\[/OVERRIDE\]',
        r'\.\./\.\./',  # path traversal
    ]

    @classmethod
    def sanitize(cls, user_input):
        """Проверка и очистка ввода."""
        for pattern in cls.BLACKLIST:
            if re.search(pattern, user_input, re.IGNORECASE):
                raise ValueError(f"Blocked: suspicious pattern in input")
        # HTML escape для предотвращения инъекций в UI (не в LLM)
        cleaned = escape(user_input)
        # Ограничение длины
        if len(cleaned) > 32000:
            raise ValueError("Input too long (max 32k chars)")
        return cleaned

# 3. OUTPUT VALIDATION

class OutputValidator:
    """Валидация вывода агента перед исполнением."""
    FORBIDDEN_COMMANDS = [
        'rm -rf', 'mkfs', 'dd if=', '> /dev/',
        'chmod 777', 'chown', 'shutdown', 'reboot',
        ':(){ :|:& };:', 'wget', 'curl', 'nc -',
    ]

    # Схема для tool call ответа
    TOOL_CALL_SCHEMA = {
        "type": "object",
        "properties": {
            "tool": {"type": "string", "enum": ["read", "write", "search", "calculate"]},
            "args": {"type": "object"}
        },
        "required": ["tool", "args"]
    }

    @classmethod
    def validate_command(cls, command):
        for forbidden in cls.FORBIDDEN_COMMANDS:
            if forbidden in command:
                return False, f"Forbidden: '{forbidden}'"
        return True, "OK"

    @classmethod
    def validate_tool_call(cls, output):
        try:
            parsed = json.loads(output)
            validate(parsed, cls.TOOL_CALL_SCHEMA)
            return True, parsed
        except Exception as e:
            return False, str(e)

# 4. PRIVILEGE SEPARATION

# Принцип least privilege: минимальные права для каждой части агента
# 1. LLM API клиент — только HTTP доступ к API эндпоинту
# 2. Tool executor — доступ только к разрешённым инструментам, readonly FS
# 3. Memory store — доступ только к своей коллекции ChromaDB

class LeastPrivilegeAgent:
    """Агент с разделением привилегий."""
    def __init__(self):
        # Каждый компонент имеет свой scope доступа
        self.allowed_dirs = ["/tmp/agent_workspace"]  # только рабочая папка
        self.allowed_tools = {"read_file", "search", "calculate"}  # whitelist
        self.max_file_size = 10 * 1024 * 1024  # 10 MB

    def can_access_path(self, path):
        import os
        real = os.path.realpath(path)
        return any(real.startswith(d) for d in self.allowed_dirs)

    def can_use_tool(self, tool_name):
        return tool_name in self.allowed_tools

# 5. RATE LIMITING И DOS ЗАЩИТА

import time
from collections import defaultdict

class RateLimiter:
    """Ограничение запросов: tool calls/мин, токенов/сессию."""
    def __init__(self, max_per_min=10, max_tokens_per_session=100000, max_steps=50):
        self.max_per_min = max_per_min
        self.max_tokens = max_tokens_per_session
        self.max_steps = max_steps
        self.windows = defaultdict(list)
        self.session_tokens = defaultdict(int)
        self.session_steps = defaultdict(int)

    def check(self, session_id, tokens_used=0):
        now = time.time()
        window = self.windows[session_id]
        # Удаляем старые записи
        window[:] = [t for t in window if now - t < 60]
        if len(window) >= self.max_per_min:
            raise Exception("Rate limit: too many requests")
        self.session_tokens[session_id] += tokens_used
        if self.session_tokens[session_id] > self.max_tokens:
            raise Exception("Session token limit exceeded")
        self.session_steps[session_id] += 1
        if self.session_steps[session_id] > self.max_steps:
            raise Exception("Max agent steps exceeded")
        window.append(now)

# 6. ПОЛНЫЙ БЕЗОПАСНЫЙ AGENT

class SafeAgent:
    """Продакшн-готовый агент с полной валидацией на каждом шаге."""
    def __init__(self):
        self.sanitizer    = InputSanitizer()
        self.validator    = OutputValidator()
        self.rate_limiter = RateLimiter(max_per_min=10)
        self.audit        = AuditLog()
        self.privileges   = LeastPrivilegeAgent()

    def run(self, user_input, session_id):
        # Шаг 1: Санитизация ввода
        cleaned = self.sanitizer.sanitize(user_input)

        # Шаг 2: Rate limit
        self.rate_limiter.check(session_id)

        # Шаг 3: LLM вызов (в песочнице)
        raw_output = self._call_llm(cleaned)

        # Шаг 4: Валидация tool call
        valid, tool_call = self.validator.validate_tool_call(raw_output)
        if not valid:
            raise Exception(f"Invalid tool call: {tool_call}")

        # Шаг 5: Проверка прав
        if not self.privileges.can_use_tool(tool_call['tool']):
            raise Exception(f"Forbidden tool: {tool_call['tool']}")

        # Шаг 6: Human-in-the-loop
        allowed, reason = human_in_the_loop(tool_call['tool'], tool_call['args'])
        if not allowed:
            raise Exception(reason)

        # Шаг 7: Выполнение с аудитом
        result = self._execute(tool_call)
        self.audit.log(session_id, "tool_exec", tool_call['tool'], tool_call['args'], result)
        return result

🔗 Полезные ссылки

📖 OWASP LLM Top 10📖 NIST AI Risk Framework📖 Guardrails AI📖 AI Safety Paper