Построим RAG-систему без LangChain. Только numpy, sentence-transformers и httpx.
# Install the dependencies from a shell first:
#     pip install numpy sentence-transformers httpx
# No LangChain/LlamaIndex: a plain, hand-built RAG pipeline.
import os

import httpx
import numpy as np
from sentence_transformers import SentenceTransformer

# Sentence-embedding model shared by the rest of the script.
embedder = SentenceTransformer("intfloat/multilingual-e5-large")

# Prefer the OPENAI_API_KEY environment variable so the secret does not have
# to be written into the source; the original placeholder stays as fallback.
API_KEY = os.environ.get("OPENAI_API_KEY", "sk-...")
def chunk_text(text: str, chunk_size=512, overlap=128) -> list[str]:
    """Split text into overlapping chunks of whitespace-separated words.

    Args:
        text: Source text; it is tokenised with ``str.split()``.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words shared by two consecutive chunks.

    Returns:
        The chunks in document order, each re-joined with single spaces.
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is
            outside ``[0, chunk_size)``. With such values the window would
            never advance (or would skip words).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # This window already reaches the last word; one more step would only
        # emit a tail that is fully contained in the chunk just added.
        if start + chunk_size >= len(words):
            break
    return chunks


# Example: load the document and split it into chunks.
# NOTE(review): assumes the knowledge base is stored as UTF-8; the explicit
# encoding keeps the result independent of the OS locale.
with open("docs/knowledge_base.txt", encoding="utf-8") as f:
    raw_text = f.read()

all_chunks = chunk_text(raw_text)
print(f"Чанков: {len(all_chunks)}")
# The E5 model expects a role prefix on every input: indexed texts are encoded
# as "passage: ..." and search queries as "query: ...".
chunk_embeddings = embedder.encode(
    [f"passage: {chunk}" for chunk in all_chunks],
    normalize_embeddings=True,
)


def vector_search(query: str, top_k=5) -> tuple[list[str], np.ndarray]:
    """Return the chunks most similar to ``query`` and their scores.

    Args:
        query: Search text; it is embedded with the module-level ``embedder``.
        top_k: Maximum number of chunks to return.

    Returns:
        A ``(chunks, scores)`` pair ordered from most to least similar.
        ``chunks`` holds the original chunk texts (without the model prefix)
        and ``scores`` the matching similarity values. Both are empty when
        ``top_k`` is not positive or nothing has been indexed.
    """
    # A zero top_k would otherwise slice [-0:], i.e. select every chunk.
    if top_k <= 0 or len(all_chunks) == 0:
        return [], np.empty(0, dtype=np.float32)

    q_emb = embedder.encode(f"query: {query}", normalize_embeddings=True)
    # Both sides are normalised, so the dot product is the cosine similarity.
    scores = np.dot(chunk_embeddings, q_emb)
    # argsort is ascending: keep the last top_k positions, best first.
    top_indices = np.argsort(scores)[-top_k:][::-1]
    return [all_chunks[i] for i in top_indices], scores[top_indices]
from sentence_transformers import CrossEncoder

# NOTE(review): this is an MS MARCO checkpoint; confirm it ranks non-English
# text well enough, since the sample question in this script is Russian.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank(query: str, candidates: list[str], top_n=3) -> list[str]:
    """Re-order retrieved chunks with the cross-encoder and keep the best.

    Each ``(query, chunk)`` pair is scored by the module-level ``reranker``;
    chunks are then sorted by that score in descending order.

    Args:
        query: The user question.
        candidates: Chunks produced by the first-stage search.
        top_n: Maximum number of chunks to keep.

    Returns:
        At most ``top_n`` chunks, highest score first. Empty when there are
        no candidates or ``top_n`` is not positive.
    """
    # Nothing to score: avoid handing the model an empty batch, and keep a
    # negative top_n from slicing off the tail instead of limiting the result.
    if not candidates or top_n <= 0:
        return []

    pairs = [[query, chunk] for chunk in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(
        zip(candidates, scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [chunk for chunk, _ in ranked[:top_n]]
import asyncio
import json


def _delta_text(line: str) -> str:
    """Return the text fragment carried by one streamed line, or ""."""
    if not line.startswith("data: "):
        return ""
    payload = line[len("data: "):].strip()
    # The stream is closed by a literal "[DONE]" marker, which is not JSON.
    if payload == "[DONE]":
        return ""
    choices = json.loads(payload).get("choices")
    if not choices:
        return ""
    # "content" can be absent or null in a chunk; treat both as no text.
    return choices[0].get("delta", {}).get("content") or ""


async def stream_answer(query: str, context_chunks: list[str]):
    """Stream the LLM answer for ``query`` to stdout, fragment by fragment.

    The context chunks are joined with spaces and embedded in the prompt,
    which is sent to the chat-completions endpoint with streaming enabled.
    Every text fragment is printed as soon as it arrives.

    Args:
        query: The user question.
        context_chunks: Retrieved chunks the model is told to rely on.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    context = " ".join(context_chunks)
    prompt = (
        f"Используй ТОЛЬКО этот контекст для ответа: {context} "
        f"Вопрос: {query} Ответ:"
    )
    request_body = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }
    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json=request_body,
        ) as response:
            # Surface auth/quota/server errors instead of silently printing
            # nothing: an error body contains no "data: " lines.
            response.raise_for_status()
            async for line in response.aiter_lines():
                piece = _delta_text(line)
                if piece:
                    print(piece, end="", flush=True)


# Full pipeline: search -> reranking -> streamed answer.
async def rag_pipeline(question: str):
    """Answer ``question``: retrieve 10 chunks, keep the best 3, stream."""
    candidates, _ = vector_search(question, top_k=10)
    top_chunks = rerank(question, candidates, top_n=3)
    await stream_answer(question, top_chunks)


asyncio.run(rag_pipeline("Как работает attention mechanism?"))