Деплой AI-агента через Telegram с расширенными возможностями: file upload, streaming ответов, inline клавиатура.
# Два режима получения обновлений от Telegram: # # 1. LONG POLLING (для разработки и простых ботов): # ▸ Бот постоянно опрашивает Telegram: "есть новые сообщения?" # ▸ Простота: не нужен домен и HTTPS # ▸ Минус: задержка до 1-2 секунд, сложнее масштабировать # # 2. WEBHOOK (для продакшена): # ▸ Telegram сам отправляет обновления на ваш URL # ▸ Мгновенная доставка, легко масштабировать # ▸ Требуется: домен + HTTPS (SSL) # Streaming ответов: отправляем токены по мере генерации curl -N http://localhost:11434/api/generate \ -d '{"model":"llama3.1","prompt":"Напиши код на Python"}' # -N флаг отключает буферизацию → токены приходят по одному
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, CallbackQueryHandler # Команды бота async def task(update: Update, context): """Команда /task — создать новую задачу.""" task_text = " ".join(context.args) if not task_text: await update.message.reply_text( "Использование: /task <описание задачи>") return await update.message.reply_text( f"📝 Задача создана: {task_text[:200]}", reply_markup=_task_keyboard() ) # Inline клавиатура с кнопками действий def _task_keyboard(): keyboard = [ [InlineKeyboardButton("▶️ Старт", callback_data="start_task")], [InlineKeyboardButton("📊 Статус", callback_data="status"), InlineKeyboardButton("📋 Результат", callback_data="result")], [InlineKeyboardButton("❌ Отмена", callback_data="cancel")], ] return InlineKeyboardMarkup(keyboard) async def button_handler(update: Update, context): """Обработка нажатий на кнопки.""" query = update.callback_query await query.answer() data = query.data if data == "start_task": # Запуск задачи асинхронно await query.edit_message_text("🔄 Выполняю...") elif data == "cancel": await query.edit_message_text("❌ Задача отменена.")
import os import aiohttp from telegram.ext import MessageHandler, filters async def handle_document(update: Update, context): """Загрузка файла из Telegram → обработка агентом.""" doc = update.message.document file = await context.bot.get_file(doc.file_id) # Скачивание во временную директорию tmp_dir = f"/tmp/agent_files/{update.effective_user.id}" os.makedirs(tmp_dir, exist_ok=True) filepath = os.path.join(tmp_dir, doc.file_name) await file.download_to_drive(filepath) # Передаём агенту на обработку result = await process_with_agent(filepath, doc.mime_type) # Отправка результата обратно файлом with open(result, 'rb') as f: await update.message.reply_document( document=f, filename=f"result_{doc.file_name}", caption="✅ Обработано агентом" )
# Streaming: токены приходят → обновляем сообщение в Telegram import asyncio import aiohttp async def stream_to_telegram(prompt: str, chat_id: int, bot): """Потоковая отправка ответа LLM в Telegram.""" # 1. Отправляем пустое сообщение — будем редактировать msg = await bot.send_message(chat_id=chat_id, text="🤔 Думаю...") # 2. Показываем индикатор печати await bot.send_chat_action(chat_id=chat_id, action="typing") # 3. Стримим ответ от Ollama buffer = "" token_count = 0 async with aiohttp.ClientSession() as session: async with session.post( "http://localhost:11434/api/generate", json={"model": "llama3.1", "prompt": prompt} ) as resp: async for line in resp.content: token = json.loads(line).get("response", "") buffer += token token_count += 1 # Обновляем сообщение каждые 5 токенов if token_count % 5 == 0: try: await msg.edit_text(buffer + " ▍") except Exception: pass # игнорируем ошибки редактирования # 4. Финальное сообщение без курсора await msg.edit_text(buffer)
import sqlite3 import json from datetime import datetime class SessionStore: """SQLite-хранилище сессий пользователей.""" def __init__(self, db_path="sessions.db"): self.conn = sqlite3.connect(db_path) self._init_db() def _init_db(self): self.conn.execute("""CREATE TABLE IF NOT EXISTS sessions ( user_id INTEGER PRIMARY KEY, context TEXT NOT NULL DEFAULT '[]', model TEXT DEFAULT 'llama3.1', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") self.conn.commit() def save_context(self, user_id: int, messages: list): """Сохранение контекста диалога (последние N сообщений).""" context_json = json.dumps(messages[-50:]) self.conn.execute( """INSERT INTO sessions (user_id, context, updated_at) VALUES (?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET context=excluded.context, updated_at=excluded.updated_at""", (user_id, context_json, datetime.now()) ) self.conn.commit() def load_context(self, user_id: int) -> list: """Загрузка контекста диалога пользователя.""" row = self.conn.execute( "SELECT context FROM sessions WHERE user_id=?", (user_id,) ).fetchone() if row: return json.loads(row[0]) return [] def clear_session(self, user_id: int): """Сброс сессии пользователя.""" self.conn.execute( "DELETE FROM sessions WHERE user_id=?", (user_id,)) self.conn.commit()
import asyncio from collections import deque class TaskQueue: """Очередь задач с пулом воркеров.""" def __init__(self, max_workers=4): self.queue = asyncio.Queue(maxsize=100) self.workers = [] self.max_workers = max_workers self._running = True async def start(self): """Запуск пула worker'ов.""" self.workers = [asyncio.create_task(self._worker(i)) for i in range(self.max_workers)] async def _worker(self, worker_id): while self._running: try: task, callback = await asyncio.wait_for( self.queue.get(), timeout=5.0) result = await execute_in_sandbox(task) await callback(result) except asyncio.TimeoutError: continue async def submit(self, task, callback): await self.queue.put((task, callback)) async def shutdown(self): """Graceful shutdown: завершаем текущие задачи.""" self._running = False for w in self.workers: w.cancel() await asyncio.gather(*self.workers, return_exceptions=True)