Indexing
Hybrid Search
pgvector設定
評価
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import PGVector
import os
# ── 1. ドキュメントロード ────────────────────────────────────
def load_documents(source_dir: str):
loader = DirectoryLoader(
source_dir,
glob="**/*.pdf",
loader_cls=PyPDFLoader,
show_progress=True,
)
return loader.load()
# ── 2. チャンキング戦略 ──────────────────────────────────────
# chunk_size と overlap はドメインによって要調整
# 技術文書: 1000-1500 / FAQ: 300-500 / コード: 500-800
def chunk_documents(docs, chunk_size=1000, chunk_overlap=200):
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["
", "
", "。", ".", " ", ""], # 日本語対応
length_function=len,
)
chunks = splitter.split_documents(docs)
print(f" {len(docs)} docs → {len(chunks)} chunks")
return chunks
# ── 3. Embeddingモデル選択 ───────────────────────────────────
# OpenAI text-embedding-3-small: コスト安・高品質
# Voyage voyage-3: Anthropic推奨・多言語強い
# BGE-M3: OSS・日本語強い・セルフホスト可
embedding_model = OpenAIEmbeddings(
model="text-embedding-3-small",
dimensions=1536,
)
# ── 4. pgvector に保存 ───────────────────────────────────────
CONNECTION_STRING = os.getenv("DATABASE_URL")
COLLECTION_NAME = "knowledge_base"
def index_documents(source_dir: str):
docs = load_documents(source_dir)
chunks = chunk_documents(docs)
# バッチでUpsert(大量データは100件ずつ)
vectorstore = PGVector.from_documents(
documents=chunks,
embedding=embedding_model,
collection_name=COLLECTION_NAME,
connection_string=CONNECTION_STRING,
pre_delete_collection=False, # 差分更新
)
print(f"✅ Indexed {len(chunks)} chunks to pgvector")
return vectorstore
# ── 5. メタデータフィルタリング ──────────────────────────────
# ソース・日付・カテゴリなどでフィルタリングして精度を上げる
def search_with_filter(query: str, category: str = None):
vectorstore = PGVector(
collection_name=COLLECTION_NAME,
connection_string=CONNECTION_STRING,
embedding_function=embedding_model,
)
filter_dict = {}
if category:
filter_dict["category"] = category
results = vectorstore.similarity_search_with_score(
query, k=5, filter=filter_dict
)
return [
{"content": doc.page_content, "source": doc.metadata.get("source"), "score": score}
for doc, score in results
if score > 0.75 # 類似度スコアでフィルタリング
]
💡 チャンキング戦略のポイント:① 親チャンク(大)で文脈を保持し、子チャンク(小)で精度よく検索する「Parent Document Retriever」が効果的 ② 日本語は separators に「。」を追加 ③ chunk_overlap で文脈の途切れを防ぐ
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
import cohere
import anthropic
# ── Hybrid Retriever: ベクトル検索 + BM25 ────────────────────
# ベクトル検索: 意味的類似性 (semantic)
# BM25: キーワード一致 (lexical)
# 組み合わせで両方の弱点を補完する
def create_hybrid_retriever(chunks, vectorstore):
# ベクトル検索 (k=10で広めに取る)
vector_retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 10}
)
# BM25(キーワードマッチ)
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 10
# アンサンブル(重み: ベクトル70% + BM25 30%)
hybrid = EnsembleRetriever(
retrievers=[vector_retriever, bm25_retriever],
weights=[0.7, 0.3],
)
return hybrid
# ── Reranker: Cohere Rerank で精度向上 ───────────────────────
# 上位10件を取得後、Reranker で再スコアリングしてTop-3に絞る
def rerank_results(query: str, docs: list, top_n: int = 3) -> list:
co = cohere.Client(os.getenv("COHERE_API_KEY"))
reranked = co.rerank(
query=query,
documents=[doc.page_content for doc in docs],
top_n=top_n,
model="rerank-multilingual-v3.0", # 日本語対応
)
return [docs[r.index] for r in reranked.results]
# ── HyDE: 仮の回答で検索精度を上げる ─────────────────────────
# ユーザークエリを直接ベクトル化するより、
# 「そのクエリに対する仮の回答」をベクトル化した方が精度が上がるケースがある
def hyde_search(query: str, vectorstore) -> list:
client = anthropic.Anthropic()
# 仮の回答を生成
hypothetical = client.messages.create(
model="claude-haiku-4-5",
max_tokens=300,
messages=[{"role": "user", "content": f"以下の質問に対する理想的な回答を書いてください(情報は作っても良い):
{query}"}]
).content[0].text
# 仮の回答でベクトル検索
results = vectorstore.similarity_search(hypothetical, k=5)
return results
# ── 統合RAGパイプライン ───────────────────────────────────────
def rag_query(question: str, vectorstore, chunks) -> dict:
# 1. Hybrid検索
hybrid = create_hybrid_retriever(chunks, vectorstore)
retrieved = hybrid.invoke(question)
# 2. Reranking
top_docs = rerank_results(question, retrieved, top_n=3)
# 3. コンテキスト構築(ソース引用のためメタデータも含める)
context_parts = []
for i, doc in enumerate(top_docs):
source = doc.metadata.get("source", "不明")
context_parts.append(f"[ソース{i+1}: {source}]
{doc.page_content}")
context = "
---
".join(context_parts)
# 4. Claude で回答生成
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
system="""あなたは社内ナレッジベースの質問応答AIです。
以下のルールを厳守してください:
1. 提供されたコンテキストの情報のみを使って回答する
2. コンテキストに情報がない場合は「提供された資料には該当情報がありません」と回答する
3. 回答の根拠となったソースを [ソースN] の形式で引用する
4. 推測や外部知識を混ぜない""",
messages=[{
"role": "user",
"content": f"コンテキスト:
{context}
質問: {question}"
}],
)
return {
"answer": response.content[0].text,
"sources": [doc.metadata.get("source") for doc in top_docs],
"retrieved_count": len(retrieved),
}
💡 Hybrid Search が重要な理由:ベクトル検索は「概念的に似た文書」を見つけるが、固有名詞・型番・コマンド名などのキーワード検索が苦手。BM25と組み合わせることで両方カバーできる。
-- ── PostgreSQL に pgvector 拡張をインストール ──────────────
CREATE EXTENSION IF NOT EXISTS vector;
-- ── ドキュメントチャンクテーブル ─────────────────────────────
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
embedding VECTOR(1536), -- text-embedding-3-small の次元数
metadata JSONB DEFAULT '{}', -- source, page, category など
collection TEXT NOT NULL DEFAULT 'default',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ── インデックス(近似最近傍探索)──────────────────────────────
-- IVFFlat: 高速だが精度は近似(大量データに)
CREATE INDEX ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100); -- lists = sqrt(行数) が目安
-- HNSW: より高精度・メモリ使用量大(中規模に)
-- CREATE INDEX ON document_chunks
-- USING hnsw (embedding vector_cosine_ops)
-- WITH (m = 16, ef_construction = 64);
-- ── 検索クエリ ────────────────────────────────────────────────
-- コサイン類似度でTop-5取得
SELECT
id,
content,
metadata->>'source' AS source,
1 - (embedding <=> $1::vector) AS similarity -- <=> はコサイン距離
FROM document_chunks
WHERE collection = 'knowledge_base'
AND metadata->>'category' = 'technical' -- メタデータフィルタ
ORDER BY embedding <=> $1::vector -- コサイン距離でソート
LIMIT 5;
-- ── メタデータフィルタ付きHybrid(pgvector + tsvector)─────
SELECT
id, content, metadata->>'source' AS source,
1 - (embedding <=> $1::vector) AS vector_score,
ts_rank(to_tsvector('japanese', content), query) AS bm25_score
FROM document_chunks,
to_tsquery('japanese', $2) query
WHERE to_tsvector('japanese', content) @@ query
OR (embedding <=> $1::vector) < 0.3
ORDER BY
(1 - (embedding <=> $1::vector)) * 0.7 +
ts_rank(to_tsvector('japanese', content), query) * 0.3 DESC
LIMIT 10;
💡 pgvector の選択基準:① 既存 PostgreSQL があれば追加コストゼロ ② 〜100万件規模なら IVFFlat インデックスで十分 ③ 100万件超・低レイテンシ必須なら Pinecone/Qdrant を検討 ④ Supabase は pgvector をマネージドで提供している
from ragas import evaluate
from ragas.metrics import (
faithfulness, # 回答がコンテキストに忠実か(ハルシネーション検出)
answer_relevancy, # 回答が質問に関連しているか
context_precision, # 取得コンテキストの精度
context_recall, # 必要な情報を取得できているか
)
from datasets import Dataset
# ── 評価データセット準備 ─────────────────────────────────────
eval_data = {
"question": [
"有給休暇の申請方法を教えてください",
"リモートワーク手当の条件は何ですか",
],
"answer": [
"有給休暇は社内システム HRCore から申請...", # RAGの回答
"リモートワーク手当は月額5000円で...",
],
"contexts": [
["就業規則第10条:有給休暇は...", "申請手順: 1. HRCoreにログイン..."],
["リモートワーク規程: 月額5,000円を支給..."],
],
"ground_truth": [
"HRCore システムから申請し、上長承認後に取得できます", # 正解
"月額5,000円、週3日以上のリモート勤務が条件です",
],
}
dataset = Dataset.from_dict(eval_data)
# ── 評価実行 ─────────────────────────────────────────────────
results = evaluate(
dataset=dataset,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)
print(results)
# faithfulness: 0.92 ← 1.0に近いほどハルシネーションなし
# answer_relevancy: 0.88 ← 1.0に近いほど質問に的確
# context_precision: 0.85 ← 取得コンテキストのノイズが少ない
# context_recall: 0.90 ← 必要な情報を取り逃していない
# ── 継続的評価(CI/CDに組み込む) ───────────────────────────
# GitHub Actions でRAG変更時に自動評価
# 閾値を下回ったら PR をブロックする
THRESHOLDS = {
"faithfulness": 0.85,
"answer_relevancy": 0.80,
"context_precision": 0.75,
"context_recall": 0.80,
}
for metric, threshold in THRESHOLDS.items():
score = results[metric]
status = "✅" if score >= threshold else "❌"
print(f"{status} {metric}: {score:.2f} (threshold: {threshold})")
💡 RAGASの4指標:① faithfulness(最重要)= ハルシネーションがないか ② answer_relevancy = 質問ズレがないか ③ context_precision = 無関係なチャンクを取ってないか ④ context_recall = 必要な情報を取り逃していないか。週次でモニタリングして劣化を検知する。