Практические примеры защиты: Docker-песочницы, валидация вывода, human-in-the-loop. Полный код на Python.
# STRIDE — модель угроз Microsoft для Agent System # S — Spoofing: злоумышленник выдаёт себя за легитимного пользователя # T — Tampering: модификация tool output или системных промптов # R — Repudiation: невозможность отследить, кто инициировал опасное действие # I — Info Disclosure: утечка системного промпта, ключей, данных пользователей # D — DoS: исчерпание токенов, бесконечный цикл tool calls # E — Elevation: повышение привилегий через несанкционированный доступ к tools class ThreatModel: """STRIDE-анализ для AI-агента.""" threats = { "Spoofing": {"mitigation": "JWT auth + session validation"}, "Tampering": {"mitigation": "Output validator + checksums"}, "Repudiation": {"mitigation": "Immutable audit log (hash chain)"}, "Info Disclosure": {"mitigation": "Redact secrets from logs, prompt isolation"}, "DoS": {"mitigation": "Rate limiting + max tokens + max steps"}, "Elevation": {"mitigation": "Least privilege + container sandbox"}, }
import re from html import escape class InputSanitizer: """Фильтрация пользовательского ввода перед передачей агенту.""" BLACKLIST = [ r'\[system\]', r'\[SYSTEM\]', r'<script.*?>', r'</?script', r'IGNORE\s+(PREVIOUS|ALL|ABOVE)', r'OVERRIDE\s+(PREVIOUS|SYSTEM)', r'You are now', r'new instructions?:', r'\[OVERRIDE\]', r'\[/OVERRIDE\]', r'\.\./\.\./', # path traversal ] @classmethod def sanitize(cls, user_input): """Проверка и очистка ввода.""" for pattern in cls.BLACKLIST: if re.search(pattern, user_input, re.IGNORECASE): raise ValueError(f"Blocked: suspicious pattern in input") # HTML escape для предотвращения инъекций в UI (не в LLM) cleaned = escape(user_input) # Ограничение длины if len(cleaned) > 32000: raise ValueError("Input too long (max 32k chars)") return cleaned
class OutputValidator: """Валидация вывода агента перед исполнением.""" FORBIDDEN_COMMANDS = [ 'rm -rf', 'mkfs', 'dd if=', '> /dev/', 'chmod 777', 'chown', 'shutdown', 'reboot', ':(){ :|:& };:', 'wget', 'curl', 'nc -', ] # Схема для tool call ответа TOOL_CALL_SCHEMA = { "type": "object", "properties": { "tool": {"type": "string", "enum": ["read", "write", "search", "calculate"]}, "args": {"type": "object"} }, "required": ["tool", "args"] } @classmethod def validate_command(cls, command): for forbidden in cls.FORBIDDEN_COMMANDS: if forbidden in command: return False, f"Forbidden: '{forbidden}'" return True, "OK" @classmethod def validate_tool_call(cls, output): try: parsed = json.loads(output) validate(parsed, cls.TOOL_CALL_SCHEMA) return True, parsed except Exception as e: return False, str(e)
# Принцип least privilege: минимальные права для каждой части агента # 1. LLM API клиент — только HTTP доступ к API эндпоинту # 2. Tool executor — доступ только к разрешённым инструментам, readonly FS # 3. Memory store — доступ только к своей коллекции ChromaDB class LeastPrivilegeAgent: """Агент с разделением привилегий.""" def __init__(self): # Каждый компонент имеет свой scope доступа self.allowed_dirs = ["/tmp/agent_workspace"] # только рабочая папка self.allowed_tools = {"read_file", "search", "calculate"} # whitelist self.max_file_size = 10 * 1024 * 1024 # 10 MB def can_access_path(self, path): import os real = os.path.realpath(path) return any(real.startswith(d) for d in self.allowed_dirs) def can_use_tool(self, tool_name): return tool_name in self.allowed_tools
import time from collections import defaultdict class RateLimiter: """Ограничение запросов: tool calls/мин, токенов/сессию.""" def __init__(self, max_per_min=10, max_tokens_per_session=100000, max_steps=50): self.max_per_min = max_per_min self.max_tokens = max_tokens_per_session self.max_steps = max_steps self.windows = defaultdict(list) self.session_tokens = defaultdict(int) self.session_steps = defaultdict(int) def check(self, session_id, tokens_used=0): now = time.time() window = self.windows[session_id] # Удаляем старые записи window[:] = [t for t in window if now - t < 60] if len(window) >= self.max_per_min: raise Exception("Rate limit: too many requests") self.session_tokens[session_id] += tokens_used if self.session_tokens[session_id] > self.max_tokens: raise Exception("Session token limit exceeded") self.session_steps[session_id] += 1 if self.session_steps[session_id] > self.max_steps: raise Exception("Max agent steps exceeded") window.append(now)
class SafeAgent: """Продакшн-готовый агент с полной валидацией на каждом шаге.""" def __init__(self): self.sanitizer = InputSanitizer() self.validator = OutputValidator() self.rate_limiter = RateLimiter(max_per_min=10) self.audit = AuditLog() self.privileges = LeastPrivilegeAgent() def run(self, user_input, session_id): # Шаг 1: Санитизация ввода cleaned = self.sanitizer.sanitize(user_input) # Шаг 2: Rate limit self.rate_limiter.check(session_id) # Шаг 3: LLM вызов (в песочнице) raw_output = self._call_llm(cleaned) # Шаг 4: Валидация tool call valid, tool_call = self.validator.validate_tool_call(raw_output) if not valid: raise Exception(f"Invalid tool call: {tool_call}") # Шаг 5: Проверка прав if not self.privileges.can_use_tool(tool_call['tool']): raise Exception(f"Forbidden tool: {tool_call['tool']}") # Шаг 6: Human-in-the-loop allowed, reason = human_in_the_loop(tool_call['tool'], tool_call['args']) if not allowed: raise Exception(reason) # Шаг 7: Выполнение с аудитом result = self._execute(tool_call) self.audit.log(session_id, "tool_exec", tool_call['tool'], tool_call['args'], result) return result