Building a Local RAG Pipeline: Complete Guide with llama.cpp and ChromaDB
Step-by-step guide to building a production-ready RAG (Retrieval-Augmented Generation) pipeline using llama.cpp, ChromaDB, and Python with evaluation metrics.
TL;DR
Learn to build a complete local RAG (Retrieval-Augmented Generation) pipeline using open-source tools. We'll use llama.cpp for inference, ChromaDB for vector storage, and Python for orchestration. This guide includes evaluation metrics, optimization techniques, and production considerations.
Introduction
Retrieval-Augmented Generation (RAG) has revolutionized how we build AI applications that need to work with specific knowledge bases. While cloud-based solutions are popular, running RAG locally offers several advantages:
- Privacy: Your data never leaves your infrastructure
- Cost Control: No per-token pricing or API rate limits
- Customization: Full control over models and parameters
- Offline Operation: Works without internet connectivity
This guide will walk you through building a production-ready local RAG system from scratch.
This tutorial assumes basic familiarity with Python and machine learning concepts. We'll provide code examples and explanations for all components.
Architecture Overview
Our RAG pipeline consists of several key components:
graph TD
A[Documents] --> B[Text Splitter]
B --> C[Embedding Model]
C --> D[ChromaDB Vector Store]
E[User Query] --> F[Query Embedding]
F --> D
D --> G[Retrieved Context]
G --> H[llama.cpp LLM]
E --> H
H --> I[Generated Response]
Core Components
- Document Processing: Text extraction and chunking
- Embedding Generation: Convert text to vectors using local models
- Vector Storage: ChromaDB for similarity search
- LLM Inference: llama.cpp for local text generation
- Evaluation: Metrics to measure RAG performance
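Before building each piece, here is the whole query path in miniature. This is a hedged sketch only: vector_store and llm stand in for the ChromaVectorStore and LlamaCppLLM classes developed later in this guide.

# Sketch of the query path; vector_store and llm are implemented in later sections
def answer(question: str, vector_store, llm, k: int = 5) -> str:
    # 1. Embed the query and fetch the k nearest chunks
    chunks = vector_store.similarity_search(question, k=k)
    # 2. Stuff the retrieved text into the prompt as grounding context
    context = "\n\n".join(c['content'] for c in chunks)
    prompt = f"Use the context to answer.\n\nContext:\n{context}\n\nQuestion: {question}\nAnswer:"
    # 3. Generate a grounded answer with the local LLM
    return llm.generate(prompt)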
Environment Setup
Prerequisites
First, let's set up our development environment:
# Create virtual environment
python -m venv rag-env
source rag-env/bin/activate # On Windows: rag-env\Scripts\activate
# Install required packages
pip install chromadb
pip install sentence-transformers
pip install langchain
pip install pypdf
pip install python-dotenv
pip install numpy
pip install pandas
# scikit-learn and requests are used by the evaluation and model-download code later
pip install scikit-learn
pip install requests
pip install tqdm
Installing llama.cpp
# Clone and build llama.cpp
git clone https://github.com/ggerganov/llama.cpp.git
cd llama.cpp
make
# Install Python bindings
pip install llama-cpp-python
Building llama.cpp requires a C++ compiler. On Windows, you may need Visual Studio Build Tools. On macOS, ensure Xcode Command Line Tools are installed. If make fails on a recent checkout, note that llama.cpp has been migrating its build system to CMake; follow the repository README for current instructions. Likewise, GPU offload (n_gpu_layers > 0, used later in this guide) only works if llama-cpp-python was built with the matching backend (CUDA, Metal, etc.); see its README for the platform-specific build flags.
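Once the bindings are installed, a quick import check confirms the build succeeded (a minimal sanity check):

# Sanity check: the Python bindings import and report their version
import llama_cpp
print(llama_cpp.__version__)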
Project Structure
rag-pipeline/
├── data/
│ ├── documents/
│ └── models/
├── src/
│ ├── embeddings.py
│ ├── vector_store.py
│ ├── llm_interface.py
│ ├── rag_pipeline.py
│ └── evaluation.py
├── config/
│ └── config.yaml
├── tests/
└── requirements.txt
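The tree above includes config/config.yaml, which this guide never shows in full. Below is a minimal sketch of loading it with PyYAML (pip install pyyaml); the keys are assumptions chosen to mirror parameters used later, not a fixed schema.

# Hedged sketch: load config/config.yaml (keys below are illustrative assumptions)
import yaml  # pip install pyyaml

with open("config/config.yaml") as f:
    config = yaml.safe_load(f)

# Assumed contents, e.g.:
#   model_path: ./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf
#   embedding_model: all-MiniLM-L6-v2
#   chunk_size: 800
#   chunk_overlap: 150
print(config.get("chunk_size", 800))

Document Processing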
Text Extraction and Chunking
Let's start by creating a document processor that can handle various file formats:
# src/document_processor.py
import os
import re
from typing import List, Dict
from pathlib import Path
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
class DocumentProcessor:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
def extract_text_from_pdf(self, pdf_path: str) -> str:
"""Extract text from PDF file."""
text = ""
try:
with open(pdf_path, 'rb') as file:
pdf_reader = PdfReader(file)
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
except Exception as e:
print(f"Error extracting text from {pdf_path}: {e}")
return text
def extract_text_from_txt(self, txt_path: str) -> str:
"""Extract text from plain text file."""
try:
with open(txt_path, 'r', encoding='utf-8') as file:
return file.read()
except Exception as e:
print(f"Error reading {txt_path}: {e}")
return ""
def clean_text(self, text: str) -> str:
"""Clean and normalize text."""
# Drop page numbers and very short header/footer lines first,
# while line boundaries still exist (basic heuristic)
lines = text.split('\n')
cleaned_lines = []
for line in lines:
line = line.strip()
if len(line) > 10 and not re.match(r'^\d+$', line):
cleaned_lines.append(line)
text = ' '.join(cleaned_lines)
# Collapse remaining whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters but keep punctuation
text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text)
return text
def process_document(self, file_path: str) -> List[Dict[str, str]]:
"""Process a single document and return chunks with metadata."""
file_extension = Path(file_path).suffix.lower()
# Extract text based on file type
if file_extension == '.pdf':
raw_text = self.extract_text_from_pdf(file_path)
elif file_extension == '.txt':
raw_text = self.extract_text_from_txt(file_path)
else:
print(f"Unsupported file type: {file_extension}")
return []
if not raw_text.strip():
print(f"No text extracted from {file_path}")
return []
# Clean text
cleaned_text = self.clean_text(raw_text)
# Split into chunks
chunks = self.text_splitter.split_text(cleaned_text)
# Create chunk objects with metadata
processed_chunks = []
for i, chunk in enumerate(chunks):
processed_chunks.append({
'content': chunk,
'metadata': {
'source': file_path,
'chunk_id': i,
'total_chunks': len(chunks),
'file_type': file_extension
}
})
return processed_chunks
def process_directory(self, directory_path: str) -> List[Dict[str, str]]:
"""Process all supported documents in a directory."""
all_chunks = []
supported_extensions = ['.pdf', '.txt']
for file_path in Path(directory_path).rglob('*'):
if file_path.suffix.lower() in supported_extensions:
print(f"Processing: {file_path}")
chunks = self.process_document(str(file_path))
all_chunks.extend(chunks)
print(f"Processed {len(all_chunks)} chunks from directory")
return all_chunks
# Example usage
if __name__ == "__main__":
processor = DocumentProcessor(chunk_size=800, chunk_overlap=150)
chunks = processor.process_directory("data/documents/")
print(f"Total chunks created: {len(chunks)}")Embedding Generation
Local Embedding Model
We'll use SentenceTransformers for generating embeddings locally:
# src/embeddings.py
import numpy as np
from typing import List, Union
from sentence_transformers import SentenceTransformer
import torch
class LocalEmbeddings:
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
"""Initialize local embedding model.
Args:
model_name: HuggingFace model name for embeddings
"""
self.model_name = model_name
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Loading embedding model: {model_name} on {self.device}")
self.model = SentenceTransformer(model_name, device=self.device)
self.embedding_dimension = self.model.get_sentence_embedding_dimension()
print(f"Embedding dimension: {self.embedding_dimension}")
def embed_text(self, text: str) -> np.ndarray:
"""Generate embedding for a single text."""
return self.model.encode([text])[0]
def embed_texts(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
"""Generate embeddings for multiple texts."""
embeddings = self.model.encode(
texts,
batch_size=batch_size,
show_progress_bar=True,
convert_to_numpy=True
)
return embeddings
def similarity(self, text1: str, text2: str) -> float:
"""Calculate cosine similarity between two texts."""
emb1 = self.embed_text(text1)
emb2 = self.embed_text(text2)
# Cosine similarity
dot_product = np.dot(emb1, emb2)
norm1 = np.linalg.norm(emb1)
norm2 = np.linalg.norm(emb2)
return dot_product / (norm1 * norm2)
# Alternative: OpenAI embeddings, for comparison with local models
class OpenAICompatibleEmbeddings:
def __init__(self, model_name: str = "text-embedding-ada-002", api_key: str = None):
"""For comparison with cloud-based embeddings (uses the openai>=1.0 client API)."""
from openai import OpenAI
self.model_name = model_name
self.client = OpenAI(api_key=api_key)
def embed_text(self, text: str) -> np.ndarray:
response = self.client.embeddings.create(
input=[text],
model=self.model_name
)
return np.array(response.data[0].embedding)
# Example usage and benchmarking
if __name__ == "__main__":
embeddings = LocalEmbeddings()
# Test embedding generation
sample_texts = [
"Machine learning is a subset of artificial intelligence.",
"Deep learning uses neural networks with multiple layers.",
"Natural language processing helps computers understand text."
]
# Generate embeddings
embeds = embeddings.embed_texts(sample_texts)
print(f"Generated embeddings shape: {embeds.shape}")
# Test similarity
similarity_score = embeddings.similarity(sample_texts[0], sample_texts[1])
print(f"Similarity between first two texts: {similarity_score:.4f}")Vector Store with ChromaDB
ChromaDB Integration
ChromaDB provides an excellent local vector database solution:
# src/vector_store.py
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Optional, Tuple
import uuid
from embeddings import LocalEmbeddings
class ChromaVectorStore:
def __init__(self,
collection_name: str = "rag_documents",
persist_directory: str = "./chroma_db",
embedding_model: str = "all-MiniLM-L6-v2"):
"""Initialize ChromaDB vector store.
Args:
collection_name: Name of the ChromaDB collection
persist_directory: Directory to persist the database
embedding_model: Model name for generating embeddings
"""
self.collection_name = collection_name
self.embeddings = LocalEmbeddings(embedding_model)
# Initialize ChromaDB client with persistence
self.client = chromadb.PersistentClient(path=persist_directory)
# Create or get collection
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"} # Use cosine similarity
)
print(f"Initialized ChromaDB collection: {collection_name}")
print(f"Collection count: {self.collection.count()}")
def add_documents(self, documents: List[Dict[str, str]], batch_size: int = 100):
"""Add documents to the vector store."""
print(f"Adding {len(documents)} documents to vector store...")
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Prepare batch data
texts = [doc['content'] for doc in batch]
metadatas = [doc['metadata'] for doc in batch]
ids = [str(uuid.uuid4()) for _ in batch]
# Generate embeddings
embeddings = self.embeddings.embed_texts(texts)
# Add to collection
self.collection.add(
documents=texts,
metadatas=metadatas,
ids=ids,
embeddings=embeddings.tolist()
)
print(f"Added batch {i//batch_size + 1}/{(len(documents)-1)//batch_size + 1}")
print(f"Total documents in collection: {self.collection.count()}")
def similarity_search(self,
query: str,
k: int = 5,
filter_dict: Optional[Dict] = None) -> List[Dict]:
"""Search for similar documents."""
# Generate query embedding
query_embedding = self.embeddings.embed_text(query)
# Search in ChromaDB
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=k,
where=filter_dict
)
# Format results
formatted_results = []
for i in range(len(results['documents'][0])):
formatted_results.append({
'content': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
'distance': results['distances'][0][i],
'id': results['ids'][0][i]
})
return formatted_results
def similarity_search_with_score(self,
query: str,
k: int = 5) -> List[Tuple[Dict, float]]:
"""Search with similarity scores."""
results = self.similarity_search(query, k)
return [(result, 1 - result['distance']) for result in results]
def delete_collection(self):
"""Delete the entire collection."""
self.client.delete_collection(name=self.collection_name)
print(f"Deleted collection: {self.collection_name}")
def get_collection_stats(self) -> Dict:
"""Get collection statistics."""
count = self.collection.count()
return {
'name': self.collection_name,
'document_count': count,
'embedding_dimension': self.embeddings.embedding_dimension
}
# Advanced ChromaDB features
class AdvancedChromaStore(ChromaVectorStore):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def hybrid_search(self,
query: str,
k: int = 5,
keyword_weight: float = 0.3,
semantic_weight: float = 0.7) -> List[Dict]:
"""Combine semantic and keyword search."""
# Semantic search
semantic_results = self.similarity_search(query, k * 2)
# Simple keyword matching (can be improved with BM25)
keyword_results = []
query_words = set(query.lower().split())
for result in semantic_results:
content_words = set(result['content'].lower().split())
keyword_overlap = len(query_words.intersection(content_words))
keyword_score = keyword_overlap / len(query_words) if query_words else 0
# Combine scores
semantic_score = 1 - result['distance']
combined_score = (semantic_weight * semantic_score +
keyword_weight * keyword_score)
result['combined_score'] = combined_score
keyword_results.append(result)
# Sort by combined score and return top k
keyword_results.sort(key=lambda x: x['combined_score'], reverse=True)
return keyword_results[:k]
def get_document_by_metadata(self, metadata_filter: Dict) -> List[Dict]:
"""Retrieve documents by metadata filter."""
results = self.collection.get(where=metadata_filter)
formatted_results = []
for i in range(len(results['documents'])):
formatted_results.append({
'content': results['documents'][i],
'metadata': results['metadatas'][i],
'id': results['ids'][i]
})
return formatted_results
# Example usage
if __name__ == "__main__":
# Initialize vector store
vector_store = ChromaVectorStore(
collection_name="test_documents",
persist_directory="./test_chroma_db"
)
# Sample documents
sample_docs = [
{
'content': "Machine learning is a method of data analysis that automates analytical model building.",
'metadata': {'source': 'ml_intro.txt', 'category': 'machine_learning'}
},
{
'content': "Deep learning is part of a broader family of machine learning methods based on artificial neural networks.",
'metadata': {'source': 'dl_intro.txt', 'category': 'deep_learning'}
}
]
# Add documents
vector_store.add_documents(sample_docs)
# Search
results = vector_store.similarity_search("What is machine learning?", k=2)
for result in results:
print(f"Content: {result['content'][:100]}...")
print(f"Score: {1 - result['distance']:.4f}")
print("---")LLM Integration with llama.cpp
Local LLM Interface
Now let's create an interface for llama.cpp:
# src/llm_interface.py
import os
from typing import List, Dict, Optional, Generator
from llama_cpp import Llama
import json
class LlamaCppLLM:
def __init__(self,
model_path: str,
n_ctx: int = 4096,
n_threads: int = 8,
n_gpu_layers: int = 0,
verbose: bool = False):
"""Initialize llama.cpp model.
Args:
model_path: Path to GGUF model file
n_ctx: Context length
n_threads: Number of CPU threads
n_gpu_layers: Number of layers to offload to GPU
verbose: Enable verbose logging
"""
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
print(f"Loading model: {model_path}")
print(f"Context length: {n_ctx}")
print(f"CPU threads: {n_threads}")
print(f"GPU layers: {n_gpu_layers}")
self.llm = Llama(
model_path=model_path,
n_ctx=n_ctx,
n_threads=n_threads,
n_gpu_layers=n_gpu_layers,
verbose=verbose
)
print("Model loaded successfully!")
def generate(self,
prompt: str,
max_tokens: int = 512,
temperature: float = 0.7,
top_p: float = 0.9,
stop: Optional[List[str]] = None) -> str:
"""Generate text completion."""
response = self.llm(
prompt,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop,
echo=False
)
return response['choices'][0]['text'].strip()
def generate_stream(self,
prompt: str,
max_tokens: int = 512,
temperature: float = 0.7,
top_p: float = 0.9,
stop: Optional[List[str]] = None) -> Generator[str, None, None]:
"""Generate text with streaming."""
stream = self.llm(
prompt,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop,
stream=True,
echo=False
)
for output in stream:
token = output['choices'][0]['text']
yield token
def create_chat_completion(self,
messages: List[Dict[str, str]],
max_tokens: int = 512,
temperature: float = 0.7) -> str:
"""Create chat completion from messages."""
# Format messages into a prompt
prompt = self._format_chat_prompt(messages)
return self.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
stop=["Human:", "Assistant:", "\n\n"]
)
def _format_chat_prompt(self, messages: List[Dict[str, str]]) -> str:
"""Format chat messages into a prompt."""
formatted_prompt = ""
for message in messages:
role = message.get('role', 'user')
content = message.get('content', '')
if role == 'system':
formatted_prompt += f"System: {content}\n\n"
elif role == 'user':
formatted_prompt += f"Human: {content}\n\n"
elif role == 'assistant':
formatted_prompt += f"Assistant: {content}\n\n"
formatted_prompt += "Assistant: "
return formatted_prompt
class RAGPromptTemplate:
"""Template for RAG prompts."""
@staticmethod
def create_rag_prompt(query: str, context_chunks: List[str], max_context_length: int = 2000) -> str:
"""Create a RAG prompt with context and query."""
# Combine context chunks
context = "\n\n".join(context_chunks)
# Truncate context if too long (character-based; a rough proxy for the model's token budget)
if len(context) > max_context_length:
context = context[:max_context_length] + "..."
prompt = f"""You are a helpful AI assistant. Use the following context to answer the user's question. If the answer cannot be found in the context, say "I don't have enough information to answer that question."
Context:
{context}
Question: {query}
Answer:"""
return prompt
@staticmethod
def create_chat_rag_prompt(query: str,
context_chunks: List[str],
chat_history: List[Dict[str, str]] = None,
max_context_length: int = 2000) -> List[Dict[str, str]]:
"""Create chat messages for RAG with history."""
context = "\n\n".join(context_chunks)
if len(context) > max_context_length:
context = context[:max_context_length] + "..."
messages = [
{
"role": "system",
"content": f"""You are a helpful AI assistant. Use the following context to answer questions. If the answer cannot be found in the context, say "I don't have enough information to answer that question."
Context:
{context}"""
}
]
# Add chat history
if chat_history:
messages.extend(chat_history)
# Add current query
messages.append({
"role": "user",
"content": query
})
return messages
# Example usage and model downloading helper
class ModelManager:
"""Helper class for managing llama.cpp models."""
RECOMMENDED_MODELS = {
"llama-2-7b-chat": {
"url": "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_K_M.gguf",
"filename": "llama-2-7b-chat.Q4_K_M.gguf",
"size_gb": 4.1
},
"mistral-7b-instruct": {
"url": "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.1-GGUF/resolve/main/mistral-7b-instruct-v0.1.Q4_K_M.gguf",
"filename": "mistral-7b-instruct-v0.1.Q4_K_M.gguf",
"size_gb": 4.1
}
}
@staticmethod
def download_model(model_name: str, models_dir: str = "./data/models/"):
"""Download a recommended model."""
import requests
from tqdm import tqdm
if model_name not in ModelManager.RECOMMENDED_MODELS:
print(f"Model {model_name} not in recommended list.")
print(f"Available models: {list(ModelManager.RECOMMENDED_MODELS.keys())}")
return None
model_info = ModelManager.RECOMMENDED_MODELS[model_name]
os.makedirs(models_dir, exist_ok=True)
file_path = os.path.join(models_dir, model_info["filename"])
if os.path.exists(file_path):
print(f"Model already exists: {file_path}")
return file_path
print(f"Downloading {model_name} ({model_info['size_gb']} GB)...")
response = requests.get(model_info["url"], stream=True, timeout=30)
response.raise_for_status()  # fail fast on a bad URL or HTTP error
total_size = int(response.headers.get('content-length', 0))
with open(file_path, 'wb') as file:
with tqdm(total=total_size, unit='B', unit_scale=True, desc="Downloading") as pbar:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
pbar.update(len(chunk))
print(f"Model downloaded: {file_path}")
return file_path
# Example usage
if __name__ == "__main__":
# Download a model (uncomment to use)
# model_path = ModelManager.download_model("mistral-7b-instruct")
# For testing, use a placeholder path
model_path = "./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf"
if os.path.exists(model_path):
# Initialize LLM
llm = LlamaCppLLM(
model_path=model_path,
n_ctx=4096,
n_threads=8,
n_gpu_layers=0 # Set to > 0 if you have a compatible GPU
)
# Test generation
prompt = "What is machine learning?"
response = llm.generate(prompt, max_tokens=200, temperature=0.7)
print(f"Response: {response}")
# Test RAG prompt
context_chunks = [
"Machine learning is a method of data analysis that automates analytical model building.",
"It is a branch of artificial intelligence based on the idea that systems can learn from data."
]
rag_prompt = RAGPromptTemplate.create_rag_prompt(
"What is machine learning?",
context_chunks
)
rag_response = llm.generate(rag_prompt, max_tokens=200, temperature=0.7)
print(f"RAG Response: {rag_response}")
else:
print(f"Model file not found: {model_path}")
print("Please download a model first using ModelManager.download_model()")Complete RAG Pipeline
Orchestrating Everything Together
Now let's create the main RAG pipeline that ties everything together:
# src/rag_pipeline.py
import os
import time
from typing import List, Dict, Optional, Tuple, Any
from document_processor import DocumentProcessor
from vector_store import ChromaVectorStore
from llm_interface import LlamaCppLLM, RAGPromptTemplate
import json
class LocalRAGPipeline:
def __init__(self,
model_path: str,
documents_dir: str = "./data/documents/",
chroma_db_dir: str = "./chroma_db",
collection_name: str = "rag_documents",
embedding_model: str = "all-MiniLM-L6-v2",
chunk_size: int = 1000,
chunk_overlap: int = 200):
"""Initialize the complete RAG pipeline.
Args:
model_path: Path to the llama.cpp model
documents_dir: Directory containing documents to index
chroma_db_dir: Directory for ChromaDB persistence
collection_name: Name of the ChromaDB collection
embedding_model: Model for generating embeddings
chunk_size: Size of text chunks
chunk_overlap: Overlap between chunks
"""
print("Initializing Local RAG Pipeline...")
# Initialize components
self.document_processor = DocumentProcessor(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
self.vector_store = ChromaVectorStore(
collection_name=collection_name,
persist_directory=chroma_db_dir,
embedding_model=embedding_model
)
self.llm = LlamaCppLLM(model_path=model_path)
self.documents_dir = documents_dir
self.chroma_db_dir = chroma_db_dir
self.chat_history = []
print("RAG Pipeline initialized successfully!")
def index_documents(self, force_reindex: bool = False):
"""Index all documents in the documents directory."""
# Check if collection already has documents
current_count = self.vector_store.collection.count()
if current_count > 0 and not force_reindex:
print(f"Collection already contains {current_count} documents.")
print("Use force_reindex=True to reindex all documents.")
return
if force_reindex and current_count > 0:
print("Force reindexing: clearing existing collection...")
self.vector_store.delete_collection()
self.vector_store = ChromaVectorStore(
collection_name=self.vector_store.collection_name,
persist_directory=self.chroma_db_dir,  # avoids reaching into ChromaDB's private settings
embedding_model=self.vector_store.embeddings.model_name
)
print(f"Processing documents from: {self.documents_dir}")
# Process all documents
chunks = self.document_processor.process_directory(self.documents_dir)
if not chunks:
print("No documents found to index!")
return
# Add to vector store
self.vector_store.add_documents(chunks)
print(f"Successfully indexed {len(chunks)} document chunks!")
def query(self,
question: str,
k: int = 5,
use_chat_history: bool = False,
max_tokens: int = 512,
temperature: float = 0.7) -> Dict[str, Any]:
"""Query the RAG pipeline.
Args:
question: User question
k: Number of relevant chunks to retrieve
use_chat_history: Whether to include chat history in context
max_tokens: Maximum tokens for LLM response
temperature: Temperature for LLM generation
Returns:
Dictionary containing answer, sources, and metadata
"""
start_time = time.time()
# 1. Retrieve relevant documents
print(f"Retrieving {k} most relevant documents...")
retrieved_docs = self.vector_store.similarity_search(question, k=k)
if not retrieved_docs:
return {
'answer': "I couldn't find any relevant information to answer your question.",
'sources': [],
'retrieval_time': time.time() - start_time,
'generation_time': 0,
'total_time': time.time() - start_time
}
retrieval_time = time.time() - start_time
# 2. Prepare context
context_chunks = [doc['content'] for doc in retrieved_docs]
# 3. Generate response
generation_start = time.time()
if use_chat_history and self.chat_history:
# Use chat-based RAG
messages = RAGPromptTemplate.create_chat_rag_prompt(
question, context_chunks, self.chat_history[-10:]  # last 10 messages (5 exchanges)
)
answer = self.llm.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
else:
# Use simple RAG
prompt = RAGPromptTemplate.create_rag_prompt(question, context_chunks)
answer = self.llm.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature
)
generation_time = time.time() - generation_start
total_time = time.time() - start_time
# 4. Update chat history
if use_chat_history:
self.chat_history.append({"role": "user", "content": question})
self.chat_history.append({"role": "assistant", "content": answer})
# 5. Prepare sources information
sources = []
for doc in retrieved_docs:
sources.append({
'content_preview': doc['content'][:200] + "..." if len(doc['content']) > 200 else doc['content'],
'metadata': doc['metadata'],
'similarity_score': 1 - doc['distance']
})
return {
'answer': answer,
'sources': sources,
'retrieval_time': retrieval_time,
'generation_time': generation_time,
'total_time': total_time,
'num_retrieved_docs': len(retrieved_docs)
}
def query_stream(self,
question: str,
k: int = 5,
use_chat_history: bool = False,
max_tokens: int = 512,
temperature: float = 0.7):
"""Query with streaming response."""
# Retrieve documents
retrieved_docs = self.vector_store.similarity_search(question, k=k)
if not retrieved_docs:
yield "I couldn't find any relevant information to answer your question."
return
# Prepare context and prompt
context_chunks = [doc['content'] for doc in retrieved_docs]
if use_chat_history and self.chat_history:
messages = RAGPromptTemplate.create_chat_rag_prompt(
question, context_chunks, self.chat_history[-10:]
)
prompt = self.llm._format_chat_prompt(messages)
else:
prompt = RAGPromptTemplate.create_rag_prompt(question, context_chunks)
# Stream response
full_response = ""
for token in self.llm.generate_stream(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature
):
full_response += token
yield token
# Update chat history
if use_chat_history:
self.chat_history.append({"role": "user", "content": question})
self.chat_history.append({"role": "assistant", "content": full_response})
def clear_chat_history(self):
"""Clear the chat history."""
self.chat_history = []
print("Chat history cleared.")
def get_stats(self) -> Dict:
"""Get pipeline statistics."""
return {
'vector_store_stats': self.vector_store.get_collection_stats(),
'chat_history_length': len(self.chat_history),
'model_info': {
'context_length': self.llm.llm.n_ctx(),
'vocab_size': self.llm.llm.n_vocab()
}
}
def save_conversation(self, filename: str):
"""Save current conversation to file."""
conversation_data = {
'chat_history': self.chat_history,
'timestamp': time.time(),
'stats': self.get_stats()
}
with open(filename, 'w') as f:
json.dump(conversation_data, f, indent=2)
print(f"Conversation saved to: {filename}")
def load_conversation(self, filename: str):
"""Load conversation from file."""
try:
with open(filename, 'r') as f:
conversation_data = json.load(f)
self.chat_history = conversation_data.get('chat_history', [])
print(f"Conversation loaded from: {filename}")
print(f"Chat history length: {len(self.chat_history)}")
except Exception as e:
print(f"Error loading conversation: {e}")
# Interactive CLI interface
class RAGChatInterface:
def __init__(self, rag_pipeline: LocalRAGPipeline):
self.rag = rag_pipeline
self.use_streaming = True
self.use_chat_history = True
def run(self):
"""Run interactive chat interface."""
print("\n" + "="*60)
print("🤖 Local RAG Chat Interface")
print("="*60)
print("Commands:")
print(" /help - Show this help")
print(" /stats - Show pipeline statistics")
print(" /clear - Clear chat history")
print(" /stream - Toggle streaming mode")
print(" /history - Toggle chat history usage")
print(" /save - Save conversation")
print(" /load - Load conversation")
print(" /quit - Exit")
print("="*60)
while True:
try:
question = input("\n💬 You: ").strip()
if not question:
continue
if question.startswith('/'):
self._handle_command(question)
continue
print("\n🤖 Assistant: ", end="", flush=True)
if self.use_streaming:
for token in self.rag.query_stream(
question,
use_chat_history=self.use_chat_history
):
print(token, end="", flush=True)
print() # New line after streaming
else:
result = self.rag.query(
question,
use_chat_history=self.use_chat_history
)
print(result['answer'])
# Show performance metrics
print(f"\n⏱️ Retrieval: {result['retrieval_time']:.2f}s, "
f"Generation: {result['generation_time']:.2f}s, "
f"Total: {result['total_time']:.2f}s")
except KeyboardInterrupt:
print("\n\nGoodbye! 👋")
break
except Exception as e:
print(f"\n❌ Error: {e}")
def _handle_command(self, command: str):
"""Handle chat commands."""
cmd = command.lower().strip()
if cmd == '/help':
print("\n📖 Available commands:")
print(" /help - Show this help")
print(" /stats - Show pipeline statistics")
print(" /clear - Clear chat history")
print(" /stream - Toggle streaming mode")
print(" /history - Toggle chat history usage")
print(" /save - Save conversation")
print(" /load - Load conversation")
print(" /quit - Exit")
elif cmd == '/stats':
stats = self.rag.get_stats()
print("\n📊 Pipeline Statistics:")
print(f" Documents indexed: {stats['vector_store_stats']['document_count']}")
print(f" Chat history length: {stats['chat_history_length']}")
print(f" Model context length: {stats['model_info']['context_length']}")
print(f" Streaming mode: {self.use_streaming}")
print(f" Chat history mode: {self.use_chat_history}")
elif cmd == '/clear':
self.rag.clear_chat_history()
print("\n🧹 Chat history cleared!")
elif cmd == '/stream':
self.use_streaming = not self.use_streaming
print(f"\n🔄 Streaming mode: {'ON' if self.use_streaming else 'OFF'}")
elif cmd == '/history':
self.use_chat_history = not self.use_chat_history
print(f"\n🔄 Chat history mode: {'ON' if self.use_chat_history else 'OFF'}")
elif cmd == '/save':
filename = input("Enter filename (or press Enter for default): ").strip()
if not filename:
filename = f"conversation_{int(time.time())}.json"
self.rag.save_conversation(filename)
elif cmd == '/load':
filename = input("Enter filename to load: ").strip()
if filename:
self.rag.load_conversation(filename)
elif cmd == '/quit':
print("\nGoodbye! 👋")
exit(0)
else:
print(f"\n❓ Unknown command: {command}")
print("Type /help for available commands.")
# Example usage
if __name__ == "__main__":
# Configuration
MODEL_PATH = "./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf"
DOCUMENTS_DIR = "./data/documents/"
# Check if model exists
if not os.path.exists(MODEL_PATH):
print(f"Model not found: {MODEL_PATH}")
print("Please download a model first using the ModelManager class.")
exit(1)
# Initialize RAG pipeline
rag_pipeline = LocalRAGPipeline(
model_path=MODEL_PATH,
documents_dir=DOCUMENTS_DIR,
chunk_size=800,
chunk_overlap=150
)
# Index documents (if not already done)
rag_pipeline.index_documents()
# Start interactive chat
chat_interface = RAGChatInterface(rag_pipeline)
chat_interface.run()
Evaluation and Optimization
RAG Evaluation Metrics
Evaluating RAG systems is crucial for understanding their performance:
# src/evaluation.py
import json
import numpy as np
from typing import List, Dict, Tuple, Any
from dataclasses import dataclass
import time
from sklearn.metrics.pairwise import cosine_similarity
from embeddings import LocalEmbeddings
from document_processor import DocumentProcessor
@dataclass
class EvaluationResult:
"""Container for evaluation results."""
retrieval_accuracy: float
answer_relevance: float
answer_faithfulness: float
response_time: float
context_precision: float
context_recall: float
class RAGEvaluator:
def __init__(self, rag_pipeline, embedding_model: str = "all-MiniLM-L6-v2"):
"""Initialize RAG evaluator.
Args:
rag_pipeline: The RAG pipeline to evaluate
embedding_model: Model for computing semantic similarity
"""
self.rag_pipeline = rag_pipeline
self.embeddings = LocalEmbeddings(embedding_model)
def evaluate_retrieval(self,
queries_and_expected: List[Dict[str, Any]],
k: int = 5) -> Dict[str, float]:
"""Evaluate retrieval performance.
Args:
queries_and_expected: List of dicts with 'query' and 'expected_docs'
k: Number of documents to retrieve
Returns:
Dictionary with retrieval metrics
"""
precision_scores = []
recall_scores = []
mrr_scores = [] # Mean Reciprocal Rank
for item in queries_and_expected:
query = item['query']
expected_doc_ids = set(item['expected_docs'])
# Retrieve documents
retrieved_docs = self.rag_pipeline.vector_store.similarity_search(query, k=k)
retrieved_doc_ids = set([doc['id'] for doc in retrieved_docs])
# Calculate metrics
if retrieved_doc_ids:
precision = len(expected_doc_ids.intersection(retrieved_doc_ids)) / len(retrieved_doc_ids)
recall = len(expected_doc_ids.intersection(retrieved_doc_ids)) / len(expected_doc_ids) if expected_doc_ids else 0
# Mean Reciprocal Rank
reciprocal_rank = 0
for i, doc_id in enumerate([doc['id'] for doc in retrieved_docs]):
if doc_id in expected_doc_ids:
reciprocal_rank = 1 / (i + 1)
break
precision_scores.append(precision)
recall_scores.append(recall)
mrr_scores.append(reciprocal_rank)
mean_precision = np.mean(precision_scores) if precision_scores else 0
mean_recall = np.mean(recall_scores) if recall_scores else 0
f1 = (2 * mean_precision * mean_recall / (mean_precision + mean_recall)
if (mean_precision + mean_recall) > 0 else 0)
return {
'precision': mean_precision,
'recall': mean_recall,
'mrr': np.mean(mrr_scores) if mrr_scores else 0,
'f1': f1
}
def evaluate_answer_relevance(self,
queries_and_answers: List[Dict[str, str]]) -> float:
"""Evaluate how relevant answers are to queries using semantic similarity.
Args:
queries_and_answers: List of dicts with 'query' and 'answer'
Returns:
Average relevance score
"""
relevance_scores = []
for item in queries_and_answers:
query = item['query']
answer = item['answer']
# Calculate semantic similarity
query_embedding = self.embeddings.embed_text(query)
answer_embedding = self.embeddings.embed_text(answer)
similarity = cosine_similarity([query_embedding], [answer_embedding])[0][0]
relevance_scores.append(similarity)
return np.mean(relevance_scores) if relevance_scores else 0
def evaluate_answer_faithfulness(self,
answers_and_contexts: List[Dict[str, Any]]) -> float:
"""Evaluate how faithful answers are to the provided context.
Args:
answers_and_contexts: List of dicts with 'answer' and 'context_chunks'
Returns:
Average faithfulness score
"""
faithfulness_scores = []
for item in answers_and_contexts:
answer = item['answer']
context_chunks = item['context_chunks']
# Combine context
full_context = " ".join(context_chunks)
# Calculate semantic similarity between answer and context
answer_embedding = self.embeddings.embed_text(answer)
context_embedding = self.embeddings.embed_text(full_context)
similarity = cosine_similarity([answer_embedding], [context_embedding])[0][0]
faithfulness_scores.append(similarity)
return np.mean(faithfulness_scores) if faithfulness_scores else 0
def evaluate_response_time(self, queries: List[str], num_runs: int = 3) -> Dict[str, float]:
"""Evaluate response time performance.
Args:
queries: List of queries to test
num_runs: Number of runs for each query
Returns:
Dictionary with timing statistics
"""
all_times = []
retrieval_times = []
generation_times = []
for query in queries:
for _ in range(num_runs):
result = self.rag_pipeline.query(query)
all_times.append(result['total_time'])
retrieval_times.append(result['retrieval_time'])
generation_times.append(result['generation_time'])
return {
'mean_total_time': np.mean(all_times),
'std_total_time': np.std(all_times),
'mean_retrieval_time': np.mean(retrieval_times),
'mean_generation_time': np.mean(generation_times),
'p95_total_time': np.percentile(all_times, 95),
'p99_total_time': np.percentile(all_times, 99)
}
def comprehensive_evaluation(self,
test_dataset: Dict[str, List[Dict]]) -> EvaluationResult:
"""Run comprehensive evaluation.
Args:
test_dataset: Dictionary with test data for different metrics
Returns:
EvaluationResult object with all metrics
"""
print("Running comprehensive RAG evaluation...")
# Retrieval evaluation
if 'retrieval' in test_dataset:
print("Evaluating retrieval performance...")
retrieval_metrics = self.evaluate_retrieval(test_dataset['retrieval'])
retrieval_accuracy = retrieval_metrics['f1']
else:
retrieval_accuracy = 0
# Answer relevance
if 'relevance' in test_dataset:
print("Evaluating answer relevance...")
answer_relevance = self.evaluate_answer_relevance(test_dataset['relevance'])
else:
answer_relevance = 0
# Answer faithfulness
if 'faithfulness' in test_dataset:
print("Evaluating answer faithfulness...")
answer_faithfulness = self.evaluate_answer_faithfulness(test_dataset['faithfulness'])
else:
answer_faithfulness = 0
# Response time
if 'performance' in test_dataset:
print("Evaluating response time...")
queries = [item['query'] for item in test_dataset['performance']]
timing_metrics = self.evaluate_response_time(queries)
response_time = timing_metrics['mean_total_time']
else:
response_time = 0
return EvaluationResult(
retrieval_accuracy=retrieval_accuracy,
answer_relevance=answer_relevance,
answer_faithfulness=answer_faithfulness,
response_time=response_time,
context_precision=0, # Placeholder for more advanced metrics
context_recall=0 # Placeholder for more advanced metrics
)
class RAGOptimizer:
"""Optimizer for RAG pipeline parameters."""
def __init__(self, rag_pipeline, evaluator: RAGEvaluator):
self.rag_pipeline = rag_pipeline
self.evaluator = evaluator
def optimize_chunk_size(self,
test_queries: List[str],
chunk_sizes: List[int] = [500, 800, 1000, 1500, 2000]) -> Dict[int, float]:
"""Optimize chunk size parameter.
Args:
test_queries: Queries to test with
chunk_sizes: List of chunk sizes to test
Returns:
Dictionary mapping chunk sizes to performance scores
"""
print("Optimizing chunk size...")
results = {}
original_processor = self.rag_pipeline.document_processor
for chunk_size in chunk_sizes:
print(f"Testing chunk size: {chunk_size}")
# Rebuild the processor so its text splitter picks up the new size;
# mutating chunk_size alone would leave the old splitter unchanged
self.rag_pipeline.document_processor = DocumentProcessor(
chunk_size=chunk_size,
chunk_overlap=original_processor.chunk_overlap
)
self.rag_pipeline.index_documents(force_reindex=True)
# Evaluate performance
timing_metrics = self.evaluator.evaluate_response_time(test_queries, num_runs=2)
# Simple scoring function (can be improved)
score = 1 / timing_metrics['mean_total_time'] # Higher is better
results[chunk_size] = score
print(f"Chunk size {chunk_size}: score = {score:.4f}")
# Restore the original processor
self.rag_pipeline.document_processor = original_processor
return results
def optimize_retrieval_k(self,
test_queries: List[str],
k_values: List[int] = [3, 5, 7, 10, 15]) -> Dict[int, float]:
"""Optimize number of retrieved documents.
Args:
test_queries: Queries to test with
k_values: List of k values to test
Returns:
Dictionary mapping k values to performance scores
"""
print("Optimizing retrieval k...")
results = {}
for k in k_values:
print(f"Testing k={k}")
total_time = 0
for query in test_queries:
start_time = time.time()
self.rag_pipeline.query(query, k=k)
total_time += time.time() - start_time
avg_time = total_time / len(test_queries)
score = 1 / avg_time # Higher is better
results[k] = score
print(f"k={k}: score = {score:.4f}")
return results
# Example evaluation dataset creation
def create_sample_evaluation_dataset() -> Dict[str, List[Dict]]:
"""Create a sample evaluation dataset for testing."""
return {
'retrieval': [
{
'query': 'What is machine learning?',
'expected_docs': ['doc1', 'doc2'] # These would be actual document IDs
}
],
'relevance': [
{
'query': 'What is machine learning?',
'answer': 'Machine learning is a method of data analysis that automates analytical model building.'
}
],
'faithfulness': [
{
'answer': 'Machine learning is a method of data analysis that automates analytical model building.',
'context_chunks': [
'Machine learning is a method of data analysis that automates analytical model building.',
'It is a branch of artificial intelligence based on the idea that systems can learn from data.'
]
}
],
'performance': [
{'query': 'What is machine learning?'},
{'query': 'How does deep learning work?'},
{'query': 'What are neural networks?'}
]
}
# Example usage
if __name__ == "__main__":
# This would typically be run with an actual RAG pipeline
print("RAG Evaluation Framework")
print("This module provides tools for evaluating RAG pipeline performance.")
# Create sample dataset
test_dataset = create_sample_evaluation_dataset()
print(f"Sample dataset created with {len(test_dataset)} evaluation categories")Key Takeaways
- Local Advantage: Running RAG locally provides privacy, cost control, and offline capabilities
- Component Integration: Success depends on properly integrating document processing, embeddings, vector storage, and LLM inference
- Chunking Strategy: Optimal chunk size and overlap significantly impact retrieval quality
- Evaluation is Critical: Implement comprehensive evaluation metrics to measure and improve performance
- Optimization Opportunities: Systematically optimize parameters like chunk size, retrieval count, and model parameters
- Production Considerations: Include error handling, logging, and monitoring for production deployments
- Scalability: ChromaDB and llama.cpp provide good scalability for local deployments
Conclusion
Building a local RAG pipeline offers significant advantages for privacy-sensitive applications and cost-conscious deployments. The combination of llama.cpp for efficient LLM inference and ChromaDB for vector storage provides a robust foundation for production-ready systems.
Key success factors include:
- Proper document preprocessing and chunking
- High-quality embedding models
- Systematic evaluation and optimization
- Robust error handling and monitoring
This implementation provides a solid starting point that can be extended with additional features like multi-modal support, advanced retrieval strategies, and integration with existing systems.
Remember to continuously evaluate and optimize your RAG pipeline as you add more documents and encounter new use cases. The evaluation framework provided here will help you measure improvements and identify areas for optimization.
The complete code for this tutorial is available in the accompanying GitHub repository, including example documents and evaluation datasets.