Сообщить об ошибке.

Обвязка вокруг поиска для ассистентов / RAG

Содержание:

Цель: Построить поверх уже существующего поиска слой "достать контекст для LLM":

  • нарезать страницы на куски (chunks);
  • хранить куски в отдельной коллекции с эмбеддингами;
  • сделать удобный Python-интерфейс rag_retrieve(query);
  • добавить HTTP-endpoint /api/rag-search для внешнего ассистента.

Архитектура RAG поверх Typesense

Логика такая:

  1. У нас уже есть коллекция pages для сайта (поиск для людей).
  2. Для ассистента делаем отдельную коллекцию page_chunks, где каждый документ - это кусок текста (часть страницы).
  3. В page_chunks хранится:
    • текст фрагмента (500–1500 символов);
    • ссылка на исходную страницу (page_id, slug, title);
    • позиция фрагмента на странице (chunk_index);
    • эмбеддинг для семантического поиска.
  4. Ассистент, получив вопрос пользователя:
    • отправляет запрос к нашему RAG-endpoint’у;
    • тот ищет релевантные куски через Typesense (vector/hybrid);
    • возвращает ассистенту текст + метаданные (URL, заголовок);
    • ассистент вставляет это в промпт и генерирует ответ.

Схема коллекции для RAG: page_chunks

Пример схемы:

# schema_page_chunks.py
page_chunks_schema = {
    "name": "page_chunks",
    "fields": [
        {"name": "id", "type": "int64"},
        {"name": "page_id", "type": "int32"},
        {"name": "slug", "type": "string"},
        {"name": "title", "type": "string", "locale": "ru"},

        # собственно текст фрагмента
        {"name": "chunk_text", "type": "string", "locale": "ru"},

        # порядковый номер фрагмента внутри страницы
        {"name": "chunk_index", "type": "int32", "facet": True},

        # теги, если нужны фильтры/фасеты
        {"name": "tags", "type": "string[]", "facet": True},

        # эмбеддинг
        {
            "name": "embedding",
            "type": "float[]",
            "num_dim": 768,
        },
    ],
    "default_sorting_field": "chunk_index",
}

Создаёшь коллекцию примерно так же, как для pages:

from ts_client_admin import client as ts_admin_client
from schema_page_chunks import page_chunks_schema

ts_admin_client.collections.create(page_chunks_schema)

(используется админ-ключ, как в части про первоначальную индексацию.)

Нарезка страниц на chunks

Функция нарезки

Возьмём очищенный от HTML текст и порежем на куски по ~1000–1500 символов с небольшим overlap.

# chunking.py
from typing import List, Dict

def split_into_chunks(
    text: str,
    max_len: int = 1200,
    overlap: int = 200,
) -> List[str]:
    """
    Простейшая нарезка по символам с overlap.
    Можно усложнить: резать по предложениям/абзацам.
    """
    text = text.strip()
    if not text:
        return []

    chunks = []
    start = 0
    n = len(text)

    while start < n:
        end = start + max_len
        chunk = text[start:end]
        chunks.append(chunk.strip())
        if end >= n:
            break
        start = end - overlap

    return chunks

Маппер для chunk-документа

Предполагаем, что уже есть:

  • clean_html() - очистка HTML => plain text;
  • базовый row из MySQL с полями id, slug, title, content, tags.

Теперь:

# rag_mapper.py
from typing import List, Dict
from bs4 import BeautifulSoup
from chunking import split_into_chunks
from embeddings import embed_texts # та же функция, что и в части про векторный поиск


def clean_html(html: str) -> str:
    soup = BeautifulSoup(html or "", "html.parser")
    return soup.get_text(" ", strip=True)


def row_to_chunk_docs(row) -> List[Dict]:
    """
    Из одной строки pages → список документов для page_chunks.
    """
    text = clean_html(row["content"])
    chunks = split_into_chunks(text, max_len=1200, overlap=200)
    if not chunks:
        return []

    tags_str = row.get("tags") or ""
    tags = [t.strip() for t in tags_str.split(",") if t.strip()]

    # Считаем эмбеддинги сразу для всех фрагментов страницы
    embeddings = embed_texts(chunks) # List[List[float]]

    docs = []
    page_id = row["id"]

    for idx, (chunk_text, emb) in enumerate(zip(chunks, embeddings)):
        doc = {
            # уникальный id фрагмента: например page_id * 1000 + idx
            "id": page_id * 1000 + idx,
            "page_id": page_id,
            "slug": row["slug"],
            "title": row["title"],
            "chunk_text": chunk_text,
            "chunk_index": idx,
            "tags": tags,
            "embedding": emb,
        }
        docs.append(doc)

    return docs

Индексация коллекции page_chunks

Скрипт очень похож на уже существующий initial_index_all, только вместо row_to_typesense_doc вызываем row_to_chunk_docs и делаем bulk-insert.

# initial_index_chunks.py
from db import get_connection
from ts_client_admin import client as ts_admin_client
from rag_mapper import row_to_chunk_docs

COLLECTION_NAME = "page_chunks"


def fetch_pages(batch_size: int = 1000):
    conn = get_connection()
    try:
        with conn.cursor(dictionary=True) as cursor:
            cursor.execute("SELECT COUNT(*) AS cnt FROM pages")
            total = cursor.fetchone()["cnt"]
            offset = 0

            while offset < total:
                cursor.execute(
                    """
                    SELECT id, slug, title, content, tags
                    FROM pages
                    ORDER BY id
                    LIMIT %s OFFSET %s
                    """,
                    (batch_size, offset),
                )
                rows = cursor.fetchall()
                if not rows:
                    break
                yield rows
                offset += batch_size
    finally:
        conn.close()


def index_all_chunks():
    collection = ts_admin_client.collections[COLLECTION_NAME]

    for rows in fetch_pages(batch_size=200):
        docs_batch = []
        for row in rows:
            chunk_docs = row_to_chunk_docs(row)
            docs_batch.extend(chunk_docs)

        if not docs_batch:
            continue

        # bulk-import в JSONL
        import json
        lines = "\n".join(json.dumps(doc, ensure_ascii=False) for doc in docs_batch)

        res = collection.documents.import_(lines)
        # можно добавить логирование res для отладки


if __name__ == "__main__":
    index_all_chunks()

На 100k страниц количество chunk-документов вырастет (например, 3–10 кусочков на страницу => 300–1M документов).

Важно:

  • запускать это как оффлайновую задачу, не в веб-процессе;
  • при необходимости ограничить список страниц (например, только новые/обновлённые).

RAG-функция в Python: rag_retrieve(query)

Сделаем удобную функцию, которой будет пользоваться ассистентный слой (или HTTP-endpoint).

Векторный поиск по chunk’ам

# rag_retrieve.py
from typing import List, Dict, Any
from ts_client_search import client as ts_client
from embeddings import embed_texts

COLLECTION_NAME = "page_chunks"


def build_vector_query(embedding: List[float], k: int) -> str:
    vec_str = ",".join(str(x) for x in embedding)
    return f"embedding:([{vec_str}], k:{k})"


def rag_retrieve(
    query: str,
    *,
    top_k: int = 8,
    vector_k: int = 40,
    tag_filter: str | None = None,
) -> List[Dict[str, Any]]:
    """
    Возвращает список фрагментов для использования в RAG.
    Каждый элемент: {text, title, slug, score, chunk_index}.
    """
    q = (query or "").strip()
    if not q:
        return []

    query_vec = embed_texts([q])[0]
    vector_query = build_vector_query(query_vec, k=vector_k)

    search_params = {
        "q": q, # можно поставить "*" и делать чисто векторный поиск
        "query_by": "chunk_text",
        "per_page": top_k,
        "page": 1,
        "vector_query": vector_query,
        "exclude_fields": "embedding",
    }

    if tag_filter:
        search_params["filter_by"] = f"tags:=[{tag_filter}]"

    result = ts_client.collections[COLLECTION_NAME].documents.search(search_params)

    contexts: List[Dict[str, Any]] = []

    for hit in result.get("hits", []):
        doc = hit.get("document", {})
        # score в Typesense можно брать из "text_match" или "vector_distance",
        # конкретное поле зависит от версии. Оставим как есть:
        meta = hit.get("text_match", None)

        contexts.append({
            "text": doc.get("chunk_text", ""),
            "title": doc.get("title", ""),
            "slug": doc.get("slug", ""),
            "page_id": doc.get("page_id"),
            "chunk_index": doc.get("chunk_index", 0),
            "score": meta,
        })

    return contexts

При желании можно:

  • сделать dedup по page_id, чтобы не получать 10 кусочков с одной страницы;
  • склеивать соседние chunk’и, если ассистенту полезнее видеть более длинный контекст.

HTTP-endpoint /api/rag-search для ассистента

Сделаем JSON-endpoint, который:

  • принимает query и, опционально, limit, tag;
  • возвращает список контекстов с URL.
# app_rag.py
from flask import Flask, request, jsonify
from rag_retrieve import rag_retrieve

app = Flask(__name__)


@app.route("/api/rag-search", methods=["POST"])
def api_rag_search():
    """
    Бэкенд для ассистента: отдаёт контекст по запросу.
    Ожидает JSON {"query": "...", "limit": 8}.
    """
    data = request.get_json(force=True) or {}
    query = (data.get("query") or "").strip()
    limit = data.get("limit") or 8
    tag = data.get("tag") # опциональный фильтр

    try:
        limit = int(limit)
    except (TypeError, ValueError):
        limit = 8
    limit = max(1, min(limit, 16))

    if not query:
        return jsonify({"contexts": []})

    contexts = rag_retrieve(
        query,
        top_k=limit,
        vector_k=limit * 4,
        tag_filter=tag,
    )

    # добавим URL (собираем по slug)
    def build_url(slug: str) -> str:
        return f"https://example.com/pages/{slug}" # замени на свой домен

    for c in contexts:
        c["url"] = build_url(c["slug"])

    return jsonify({"contexts": contexts})

Теперь любой ассистент (внешний сервис, свой чат-бот и т.д.) может:

  1. Сделать POST /api/rag-search с {"query": "вопрос пользователя"}.
  2. Получить JSON:
    {
      "contexts": [
        {
          "text": "Фрагмент текста...",
          "title": "Заголовок страницы",
          "slug": "url-slug",
          "page_id": 123,
          "chunk_index": 0,
          "score": 42,
          "url": "https://example.com/pages/url-slug"
        },
        ...
      ]
    }
    
  3. Вставить contexts в промпт модели.

Пример промпта для ассистента

Грубый скелет (на стороне ассистента, не в Typesense):

Ты помощник, который отвечает только на основе предоставленного контекста.

Вопрос пользователя:
{user_question}

Контекст из базы знаний:
{context_blocks}

Инструкции:
- Отвечай по-русски.
- Если в контексте нет ответа, скажи, что не нашёл точного ответа в базе.
- В конце ответа сделай список ссылок на использованные источники.

Где context_blocks - это, например:

def build_context_block(contexts):
    lines = []
    for idx, c in enumerate(contexts, start=1):
        lines.append(f"[{idx}] {c['title']} ({c['url']})\n{c['text']}\n")
    return "\n\n".join(lines)

Несколько практичных советов

  1. Не пихать слишком много контекста. Обычно хватает 3–8 кусочков (зависит от длины). Больше => LLM начинает путаться и тратить токены впустую.
  2. Аккуратнее с HTML и мусором. Ассистенту лучше отдавать чистый текст, без JS, меню, футеров. Мы уже чистим HTML => это ровно то, что нужно.
  3. Многопоточность / очереди. Генерация эмбеддингов для chunk’ов - тяжёлая операция:
    • делай её в отдельном процессе/воркере;
    • подумай об инкрементальном обновлении (только новые/изменённые страницы).
  4. Фильтры по тегам / языку. Если на сайте есть разные разделы (документация, блог, новости) - полезно иметь tags и фильтровать контекст по ним, чтобы ассистент не мешал всё в одну кучу.
  5. Логирование RAG-поиска. Можно использовать уже сделанную таблицу search_log, но с source='rag' - это даст понимание, что спрашивают ассистента и какие запросы плохо покрыты знаниями.