AI & LLMs · 24 min read · January 10, 2024

Building a Local RAG Pipeline: Complete Guide with llama.cpp and ChromaDB

Step-by-step guide to building a production-ready RAG (Retrieval-Augmented Generation) pipeline using llama.cpp, ChromaDB, and Python with evaluation metrics.

TL;DR

Learn to build a complete local RAG (Retrieval-Augmented Generation) pipeline using open-source tools. We'll use llama.cpp for inference, ChromaDB for vector storage, and Python for orchestration. This guide includes evaluation metrics, optimization techniques, and production considerations.

Introduction

Retrieval-Augmented Generation (RAG) has revolutionized how we build AI applications that need to work with specific knowledge bases. While cloud-based solutions are popular, running RAG locally offers several advantages:

  • Privacy: Your data never leaves your infrastructure
  • Cost Control: No per-token pricing or API rate limits
  • Customization: Full control over models and parameters
  • Offline Operation: Works without internet connectivity

This guide will walk you through building a production-ready local RAG system from scratch.

Architecture Overview

Our RAG pipeline consists of several key components:

graph TD
    A[Documents] --> B[Text Splitter]
    B --> C[Embedding Model]
    C --> D[ChromaDB Vector Store]
    E[User Query] --> F[Query Embedding]
    F --> D
    D --> G[Retrieved Context]
    G --> H[llama.cpp LLM]
    E --> H
    H --> I[Generated Response]

Core Components

  1. Document Processing: Text extraction and chunking
  2. Embedding Generation: Convert text to vectors using local models
  3. Vector Storage: ChromaDB for similarity search
  4. LLM Inference: llama.cpp for local text generation
  5. Evaluation: Metrics to measure RAG performance
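Before building each component for real, the five stages above can be sketched end-to-end with deliberately trivial stand-ins (naive fixed-size chunking, a bag-of-words "embedding", and an echo "LLM" — placeholders to show the data flow, not the implementations we build below):

```python
from collections import Counter

def chunk(text, size=40):
    # Stage 1: naive fixed-size chunking (placeholder for a real splitter)
    return [text[i:i + size] for i in range(0, len(text), size)]

def embed(text):
    # Stage 2: bag-of-words "embedding" (placeholder for a real model)
    return Counter(text.lower().split())

def similarity(a, b):
    # Cosine-like overlap between two bags of words
    shared = sum((a & b).values())
    return shared / ((sum(a.values()) * sum(b.values())) ** 0.5 or 1)

def retrieve(query, chunks, k=1):
    # Stages 3-4: score every chunk against the query, keep top-k
    scored = sorted(chunks, key=lambda c: similarity(embed(query), embed(c)), reverse=True)
    return scored[:k]

def generate(query, context):
    # Stage 5: placeholder "LLM" that just echoes the retrieved context
    return f"Based on: {' '.join(context)}"

doc = "RAG retrieves relevant chunks. Unrelated text about cooking pasta."
answer = generate("what does RAG do", retrieve("RAG retrieves", chunk(doc)))
print(answer)
```

The rest of the guide replaces each placeholder with a production-grade component.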

Environment Setup

Prerequisites

First, let's set up our development environment:

# Create virtual environment
python -m venv rag-env
source rag-env/bin/activate  # On Windows: rag-env\Scripts\activate
 
# Install required packages
pip install chromadb
pip install sentence-transformers
pip install langchain
pip install pypdf
pip install python-dotenv
pip install numpy
pip install pandas
pip install tqdm

Installing llama.cpp

# Clone and build llama.cpp
git clone https://github.com/ggerganov/llama.cpp.git
cd llama.cpp
make
 
# Install Python bindings
pip install llama-cpp-python

Project Structure

rag-pipeline/
├── data/
│   ├── documents/
│   └── models/
├── src/
│   ├── embeddings.py
│   ├── vector_store.py
│   ├── llm_interface.py
│   ├── rag_pipeline.py
│   └── evaluation.py
├── config/
│   └── config.yaml
├── tests/
└── requirements.txt
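The tree includes a config/config.yaml whose contents the guide leaves up to you. One hypothetical shape, mirroring the parameters used throughout this guide, with a small dataclass to keep settings in one place (in practice parse the file with PyYAML's yaml.safe_load; a plain dict stands in here to stay dependency-free):

```python
from dataclasses import dataclass

@dataclass
class RAGConfig:
    # Hypothetical keys — adjust to whatever your config.yaml actually holds
    model_path: str = "./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf"
    documents_dir: str = "./data/documents/"
    chroma_db_dir: str = "./chroma_db"
    collection_name: str = "rag_documents"
    embedding_model: str = "all-MiniLM-L6-v2"
    chunk_size: int = 1000
    chunk_overlap: int = 200

def load_config(raw: dict) -> RAGConfig:
    # In practice: raw = yaml.safe_load(open("config/config.yaml"))
    return RAGConfig(**raw)

config = load_config({"chunk_size": 800, "chunk_overlap": 150})
print(config.chunk_size, config.collection_name)
```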

Document Processing

Text Extraction and Chunking

Let's start by creating a document processor that can handle various file formats:

# src/document_processor.py
import os
import re
from typing import List, Dict
from pathlib import Path
from pypdf import PdfReader  # the maintained successor to PyPDF2; matches "pip install pypdf" above
from langchain.text_splitter import RecursiveCharacterTextSplitter
 
class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
    
    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Extract text from PDF file."""
        text = ""
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
        except Exception as e:
            print(f"Error extracting text from {pdf_path}: {e}")
        return text
    
    def extract_text_from_txt(self, txt_path: str) -> str:
        """Extract text from plain text file."""
        try:
            with open(txt_path, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            print(f"Error reading {txt_path}: {e}")
            return ""
    
    def clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Remove page numbers and very short lines (basic header/footer heuristic)
        # BEFORE collapsing whitespace, while line boundaries still exist
        lines = text.split('\n')
        cleaned_lines = []
        for line in lines:
            line = line.strip()
            if len(line) > 10 and not re.match(r'^\d+$', line):
                cleaned_lines.append(line)
        text = ' '.join(cleaned_lines)
        
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text)
        
        # Collapse excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        
        return text
    
    def process_document(self, file_path: str) -> List[Dict[str, str]]:
        """Process a single document and return chunks with metadata."""
        file_extension = Path(file_path).suffix.lower()
        
        # Extract text based on file type
        if file_extension == '.pdf':
            raw_text = self.extract_text_from_pdf(file_path)
        elif file_extension == '.txt':
            raw_text = self.extract_text_from_txt(file_path)
        else:
            print(f"Unsupported file type: {file_extension}")
            return []
        
        if not raw_text.strip():
            print(f"No text extracted from {file_path}")
            return []
        
        # Clean text
        cleaned_text = self.clean_text(raw_text)
        
        # Split into chunks
        chunks = self.text_splitter.split_text(cleaned_text)
        
        # Create chunk objects with metadata
        processed_chunks = []
        for i, chunk in enumerate(chunks):
            processed_chunks.append({
                'content': chunk,
                'metadata': {
                    'source': file_path,
                    'chunk_id': i,
                    'total_chunks': len(chunks),
                    'file_type': file_extension
                }
            })
        
        return processed_chunks
    
    def process_directory(self, directory_path: str) -> List[Dict[str, str]]:
        """Process all supported documents in a directory."""
        all_chunks = []
        supported_extensions = ['.pdf', '.txt']
        
        for file_path in Path(directory_path).rglob('*'):
            if file_path.suffix.lower() in supported_extensions:
                print(f"Processing: {file_path}")
                chunks = self.process_document(str(file_path))
                all_chunks.extend(chunks)
        
        print(f"Processed {len(all_chunks)} chunks from directory")
        return all_chunks
 
# Example usage
if __name__ == "__main__":
    processor = DocumentProcessor(chunk_size=800, chunk_overlap=150)
    chunks = processor.process_directory("data/documents/")
    print(f"Total chunks created: {len(chunks)}")
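The interaction between chunk_size and chunk_overlap is easiest to see with a stripped-down splitter: fixed-size character windows that advance by chunk_size minus chunk_overlap (RecursiveCharacterTextSplitter is smarter, preferring to break on paragraph and sentence separators, but the overlap arithmetic is the same):

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Fixed-size character windows advancing by chunk_size - chunk_overlap."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step) if text[i:i + chunk_size]]

chunks = sliding_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # each chunk repeats the last 2 characters of its predecessor
```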

Embedding Generation

Local Embedding Model

We'll use SentenceTransformers for generating embeddings locally:

# src/embeddings.py
import numpy as np
from typing import List, Union
from sentence_transformers import SentenceTransformer
import torch
 
class LocalEmbeddings:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Initialize local embedding model.
        
        Args:
            model_name: HuggingFace model name for embeddings
        """
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Loading embedding model: {model_name} on {self.device}")
        
        self.model = SentenceTransformer(model_name, device=self.device)
        self.embedding_dimension = self.model.get_sentence_embedding_dimension()
        
        print(f"Embedding dimension: {self.embedding_dimension}")
    
    def embed_text(self, text: str) -> np.ndarray:
        """Generate embedding for a single text."""
        return self.model.encode([text])[0]
    
    def embed_texts(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """Generate embeddings for multiple texts."""
        embeddings = self.model.encode(
            texts, 
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True
        )
        return embeddings
    
    def similarity(self, text1: str, text2: str) -> float:
        """Calculate cosine similarity between two texts."""
        emb1 = self.embed_text(text1)
        emb2 = self.embed_text(text2)
        
        # Cosine similarity
        dot_product = np.dot(emb1, emb2)
        norm1 = np.linalg.norm(emb1)
        norm2 = np.linalg.norm(emb2)
        
        return float(dot_product / (norm1 * norm2))
 
# Alternative: OpenAI-compatible embedding models (for comparison with cloud-based embeddings)
class OpenAICompatibleEmbeddings:
    def __init__(self, model_name: str = "text-embedding-ada-002", api_key: str = None):
        from openai import OpenAI  # openai >= 1.0 client interface
        self.model_name = model_name
        self.client = OpenAI(api_key=api_key)
    
    def embed_text(self, text: str) -> np.ndarray:
        response = self.client.embeddings.create(
            input=[text],
            model=self.model_name
        )
        return np.array(response.data[0].embedding)
 
# Example usage and benchmarking
if __name__ == "__main__":
    embeddings = LocalEmbeddings()
    
    # Test embedding generation
    sample_texts = [
        "Machine learning is a subset of artificial intelligence.",
        "Deep learning uses neural networks with multiple layers.",
        "Natural language processing helps computers understand text."
    ]
    
    # Generate embeddings
    embeds = embeddings.embed_texts(sample_texts)
    print(f"Generated embeddings shape: {embeds.shape}")
    
    # Test similarity
    similarity_score = embeddings.similarity(sample_texts[0], sample_texts[1])
    print(f"Similarity between first two texts: {similarity_score:.4f}")
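Note that similarity() above re-encodes both texts on every call; when you need many pairwise comparisons, it is cheaper to embed once and compare vectors with plain NumPy. A small helper, independent of the embedding model (any 2-D array of row vectors works):

```python
import numpy as np

def pairwise_cosine(embeddings: np.ndarray) -> np.ndarray:
    """All-pairs cosine similarity for an (n, d) array of row vectors."""
    # Normalize each row to unit length; a single matrix product then
    # yields every pairwise cosine similarity at once.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / np.clip(norms, 1e-12, None)
    return unit @ unit.T

vecs = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
sims = pairwise_cosine(vecs)
print(np.round(sims, 4))
```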

Vector Store with ChromaDB

ChromaDB Integration

ChromaDB provides an excellent local vector database solution:

# src/vector_store.py
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Optional, Tuple
import uuid
from embeddings import LocalEmbeddings
 
class ChromaVectorStore:
    def __init__(self, 
                 collection_name: str = "rag_documents",
                 persist_directory: str = "./chroma_db",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        """Initialize ChromaDB vector store.
        
        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the database
            embedding_model: Model name for generating embeddings
        """
        self.collection_name = collection_name
        self.embeddings = LocalEmbeddings(embedding_model)
        
        # Initialize ChromaDB client with persistence
        self.client = chromadb.PersistentClient(path=persist_directory)
        
        # Create or get collection
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}  # Use cosine similarity
        )
        
        print(f"Initialized ChromaDB collection: {collection_name}")
        print(f"Collection count: {self.collection.count()}")
    
    def add_documents(self, documents: List[Dict[str, str]], batch_size: int = 100):
        """Add documents to the vector store."""
        print(f"Adding {len(documents)} documents to vector store...")
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # Prepare batch data
            texts = [doc['content'] for doc in batch]
            metadatas = [doc['metadata'] for doc in batch]
            ids = [str(uuid.uuid4()) for _ in batch]
            
            # Generate embeddings
            embeddings = self.embeddings.embed_texts(texts)
            
            # Add to collection
            self.collection.add(
                documents=texts,
                metadatas=metadatas,
                ids=ids,
                embeddings=embeddings.tolist()
            )
            
            print(f"Added batch {i//batch_size + 1}/{(len(documents)-1)//batch_size + 1}")
        
        print(f"Total documents in collection: {self.collection.count()}")
    
    def similarity_search(self, 
                         query: str, 
                         k: int = 5,
                         filter_dict: Optional[Dict] = None) -> List[Dict]:
        """Search for similar documents."""
        # Generate query embedding
        query_embedding = self.embeddings.embed_text(query)
        
        # Search in ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=k,
            where=filter_dict
        )
        
        # Format results
        formatted_results = []
        for i in range(len(results['documents'][0])):
            formatted_results.append({
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i],
                'id': results['ids'][0][i]
            })
        
        return formatted_results
    
    def similarity_search_with_score(self, 
                                   query: str, 
                                   k: int = 5) -> List[Tuple[Dict, float]]:
        """Search with similarity scores."""
        results = self.similarity_search(query, k)
        return [(result, 1 - result['distance']) for result in results]
    
    def delete_collection(self):
        """Delete the entire collection."""
        self.client.delete_collection(name=self.collection_name)
        print(f"Deleted collection: {self.collection_name}")
    
    def get_collection_stats(self) -> Dict:
        """Get collection statistics."""
        count = self.collection.count()
        return {
            'name': self.collection_name,
            'document_count': count,
            'embedding_dimension': self.embeddings.embedding_dimension
        }
 
# Advanced ChromaDB features
class AdvancedChromaStore(ChromaVectorStore):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    
    def hybrid_search(self, 
                     query: str, 
                     k: int = 5,
                     keyword_weight: float = 0.3,
                     semantic_weight: float = 0.7) -> List[Dict]:
        """Combine semantic and keyword search."""
        # Semantic search
        semantic_results = self.similarity_search(query, k * 2)
        
        # Simple keyword matching (can be improved with BM25)
        keyword_results = []
        query_words = set(query.lower().split())
        
        for result in semantic_results:
            content_words = set(result['content'].lower().split())
            keyword_overlap = len(query_words.intersection(content_words))
            keyword_score = keyword_overlap / len(query_words) if query_words else 0
            
            # Combine scores
            semantic_score = 1 - result['distance']
            combined_score = (semantic_weight * semantic_score + 
                            keyword_weight * keyword_score)
            
            result['combined_score'] = combined_score
            keyword_results.append(result)
        
        # Sort by combined score and return top k
        keyword_results.sort(key=lambda x: x['combined_score'], reverse=True)
        return keyword_results[:k]
    
    def get_document_by_metadata(self, metadata_filter: Dict) -> List[Dict]:
        """Retrieve documents by metadata filter."""
        results = self.collection.get(where=metadata_filter)
        
        formatted_results = []
        for i in range(len(results['documents'])):
            formatted_results.append({
                'content': results['documents'][i],
                'metadata': results['metadatas'][i],
                'id': results['ids'][i]
            })
        
        return formatted_results
 
# Example usage
if __name__ == "__main__":
    # Initialize vector store
    vector_store = ChromaVectorStore(
        collection_name="test_documents",
        persist_directory="./test_chroma_db"
    )
    
    # Sample documents
    sample_docs = [
        {
            'content': "Machine learning is a method of data analysis that automates analytical model building.",
            'metadata': {'source': 'ml_intro.txt', 'category': 'machine_learning'}
        },
        {
            'content': "Deep learning is part of a broader family of machine learning methods based on artificial neural networks.",
            'metadata': {'source': 'dl_intro.txt', 'category': 'deep_learning'}
        }
    ]
    
    # Add documents
    vector_store.add_documents(sample_docs)
    
    # Search
    results = vector_store.similarity_search("What is machine learning?", k=2)
    for result in results:
        print(f"Content: {result['content'][:100]}...")
        print(f"Score: {1 - result['distance']:.4f}")
        print("---")
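The hybrid_search method above scores keywords by raw word overlap and notes that BM25 would be an improvement. BM25 down-weights common terms via inverse document frequency and normalizes for document length; here is a minimal, dependency-free version of the Okapi BM25 formula (a sketch to illustrate the scoring with whitespace tokens, not a tuned implementation):

```python
import math

def bm25_scores(query: str, documents: list, k1: float = 1.5, b: float = 0.75) -> list:
    """Okapi BM25 score of each document against a query (whitespace tokens)."""
    docs = [d.lower().split() for d in documents]
    n = len(docs)
    avgdl = sum(len(d) for d in docs) / n
    scores = []
    for doc in docs:
        score = 0.0
        for term in query.lower().split():
            tf = doc.count(term)
            df = sum(1 for d in docs if term in d)
            # IDF with +1 inside the log to keep scores non-negative
            idf = math.log((n - df + 0.5) / (df + 0.5) + 1)
            score += idf * tf * (k1 + 1) / (tf + k1 * (1 - b + b * len(doc) / avgdl))
        scores.append(score)
    return scores

docs = [
    "machine learning automates model building",
    "deep learning uses neural networks",
    "the the the the the the",
]
print(bm25_scores("neural networks", docs))
```

Swapping this in for the keyword_score computation would make hybrid_search a true semantic + BM25 blend.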

LLM Integration with llama.cpp

Local LLM Interface

Now let's create an interface for llama.cpp:

# src/llm_interface.py
import os
from typing import List, Dict, Optional, Generator
from llama_cpp import Llama
import json
 
class LlamaCppLLM:
    def __init__(self, 
                 model_path: str,
                 n_ctx: int = 4096,
                 n_threads: int = 8,
                 n_gpu_layers: int = 0,
                 verbose: bool = False):
        """Initialize llama.cpp model.
        
        Args:
            model_path: Path to GGUF model file
            n_ctx: Context length
            n_threads: Number of CPU threads
            n_gpu_layers: Number of layers to offload to GPU
            verbose: Enable verbose logging
        """
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")
        
        print(f"Loading model: {model_path}")
        print(f"Context length: {n_ctx}")
        print(f"CPU threads: {n_threads}")
        print(f"GPU layers: {n_gpu_layers}")
        
        self.llm = Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_gpu_layers=n_gpu_layers,
            verbose=verbose
        )
        
        print("Model loaded successfully!")
    
    def generate(self, 
                prompt: str,
                max_tokens: int = 512,
                temperature: float = 0.7,
                top_p: float = 0.9,
                stop: Optional[List[str]] = None) -> str:
        """Generate text completion."""
        response = self.llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            echo=False
        )
        
        return response['choices'][0]['text'].strip()
    
    def generate_stream(self,
                       prompt: str,
                       max_tokens: int = 512,
                       temperature: float = 0.7,
                       top_p: float = 0.9,
                       stop: Optional[List[str]] = None) -> Generator[str, None, None]:
        """Generate text with streaming."""
        stream = self.llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            stream=True,
            echo=False
        )
        
        for output in stream:
            token = output['choices'][0]['text']
            yield token
    
    def create_chat_completion(self,
                              messages: List[Dict[str, str]],
                              max_tokens: int = 512,
                              temperature: float = 0.7) -> str:
        """Create chat completion from messages."""
        # Format messages into a prompt
        prompt = self._format_chat_prompt(messages)
        
        return self.generate(
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            stop=["Human:", "System:"]  # note: a "\n\n" stop would truncate multi-paragraph answers
        )
    
    def _format_chat_prompt(self, messages: List[Dict[str, str]]) -> str:
        """Format chat messages into a prompt."""
        formatted_prompt = ""
        
        for message in messages:
            role = message.get('role', 'user')
            content = message.get('content', '')
            
            if role == 'system':
                formatted_prompt += f"System: {content}\n\n"
            elif role == 'user':
                formatted_prompt += f"Human: {content}\n\n"
            elif role == 'assistant':
                formatted_prompt += f"Assistant: {content}\n\n"
        
        formatted_prompt += "Assistant: "
        return formatted_prompt
 
class RAGPromptTemplate:
    """Template for RAG prompts."""
    
    @staticmethod
    def create_rag_prompt(query: str, context_chunks: List[str], max_context_length: int = 2000) -> str:
        """Create a RAG prompt with context and query."""
        # Combine context chunks
        context = "\n\n".join(context_chunks)
        
        # Truncate context if too long
        if len(context) > max_context_length:
            context = context[:max_context_length] + "..."
        
        prompt = f"""You are a helpful AI assistant. Use the following context to answer the user's question. If the answer cannot be found in the context, say "I don't have enough information to answer that question."
 
Context:
{context}
 
Question: {query}
 
Answer:"""
        
        return prompt
    
    @staticmethod
    def create_chat_rag_prompt(query: str, 
                              context_chunks: List[str], 
                              chat_history: List[Dict[str, str]] = None,
                              max_context_length: int = 2000) -> List[Dict[str, str]]:
        """Create chat messages for RAG with history."""
        context = "\n\n".join(context_chunks)
        if len(context) > max_context_length:
            context = context[:max_context_length] + "..."
        
        messages = [
            {
                "role": "system",
                "content": f"""You are a helpful AI assistant. Use the following context to answer questions. If the answer cannot be found in the context, say "I don't have enough information to answer that question."
 
Context:
{context}"""
            }
        ]
        
        # Add chat history
        if chat_history:
            messages.extend(chat_history)
        
        # Add current query
        messages.append({
            "role": "user",
            "content": query
        })
        
        return messages
 
# Example usage and model downloading helper
class ModelManager:
    """Helper class for managing llama.cpp models."""
    
    RECOMMENDED_MODELS = {
        "llama-2-7b-chat": {
            "url": "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_K_M.gguf",
            "filename": "llama-2-7b-chat.Q4_K_M.gguf",
            "size_gb": 4.1
        },
        "mistral-7b-instruct": {
            "url": "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.1-GGUF/resolve/main/mistral-7b-instruct-v0.1.Q4_K_M.gguf",
            "filename": "mistral-7b-instruct-v0.1.Q4_K_M.gguf",
            "size_gb": 4.1
        }
    }
    
    @staticmethod
    def download_model(model_name: str, models_dir: str = "./data/models/"):
        """Download a recommended model."""
        import requests
        from tqdm import tqdm
        
        if model_name not in ModelManager.RECOMMENDED_MODELS:
            print(f"Model {model_name} not in recommended list.")
            print(f"Available models: {list(ModelManager.RECOMMENDED_MODELS.keys())}")
            return None
        
        model_info = ModelManager.RECOMMENDED_MODELS[model_name]
        os.makedirs(models_dir, exist_ok=True)
        
        file_path = os.path.join(models_dir, model_info["filename"])
        
        if os.path.exists(file_path):
            print(f"Model already exists: {file_path}")
            return file_path
        
        print(f"Downloading {model_name} ({model_info['size_gb']} GB)...")
        
        response = requests.get(model_info["url"], stream=True)
        total_size = int(response.headers.get('content-length', 0))
        
        with open(file_path, 'wb') as file:
            with tqdm(total=total_size, unit='B', unit_scale=True, desc="Downloading") as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        pbar.update(len(chunk))
        
        print(f"Model downloaded: {file_path}")
        return file_path
 
# Example usage
if __name__ == "__main__":
    # Download a model (uncomment to use)
    # model_path = ModelManager.download_model("mistral-7b-instruct")
    
    # For testing, use a placeholder path
    model_path = "./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf"
    
    if os.path.exists(model_path):
        # Initialize LLM
        llm = LlamaCppLLM(
            model_path=model_path,
            n_ctx=4096,
            n_threads=8,
            n_gpu_layers=0  # Set to > 0 if you have a compatible GPU
        )
        
        # Test generation
        prompt = "What is machine learning?"
        response = llm.generate(prompt, max_tokens=200, temperature=0.7)
        print(f"Response: {response}")
        
        # Test RAG prompt
        context_chunks = [
            "Machine learning is a method of data analysis that automates analytical model building.",
            "It is a branch of artificial intelligence based on the idea that systems can learn from data."
        ]
        
        rag_prompt = RAGPromptTemplate.create_rag_prompt(
            "What is machine learning?", 
            context_chunks
        )
        
        rag_response = llm.generate(rag_prompt, max_tokens=200, temperature=0.7)
        print(f"RAG Response: {rag_response}")
    else:
        print(f"Model file not found: {model_path}")
        print("Please download a model first using ModelManager.download_model()")
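One caveat with create_rag_prompt above: truncating the joined context at a character count can cut the final chunk mid-sentence. An alternative is to pack whole chunks, in retrieval order, until the budget is spent (a sketch; a character budget remains a crude proxy for the model's token limit):

```python
def pack_context(chunks: list, max_chars: int = 2000) -> str:
    """Keep whole chunks, in retrieval order, until the character budget is spent."""
    packed, used = [], 0
    for chunk in chunks:
        # +2 accounts for the "\n\n" separator between chunks;
        # a chunk that would exceed the budget stops the packing
        cost = len(chunk) + (2 if packed else 0)
        if used + cost > max_chars:
            break
        packed.append(chunk)
        used += cost
    return "\n\n".join(packed)

chunks = ["a" * 900, "b" * 900, "c" * 900]
context = pack_context(chunks, max_chars=2000)
print(len(context))  # first two chunks fit; the third does not
```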

Complete RAG Pipeline

Orchestrating Everything Together

Now let's create the main RAG pipeline that ties everything together:

# src/rag_pipeline.py
import os
import time
from typing import List, Dict, Optional, Tuple
from document_processor import DocumentProcessor
from vector_store import ChromaVectorStore
from llm_interface import LlamaCppLLM, RAGPromptTemplate
import json
 
class LocalRAGPipeline:
    def __init__(self, 
                 model_path: str,
                 documents_dir: str = "./data/documents/",
                 chroma_db_dir: str = "./chroma_db",
                 collection_name: str = "rag_documents",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_size: int = 1000,
                 chunk_overlap: int = 200):
        """Initialize the complete RAG pipeline.
        
        Args:
            model_path: Path to the llama.cpp model
            documents_dir: Directory containing documents to index
            chroma_db_dir: Directory for ChromaDB persistence
            collection_name: Name of the ChromaDB collection
            embedding_model: Model for generating embeddings
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
        """
        print("Initializing Local RAG Pipeline...")
        
        # Initialize components
        self.document_processor = DocumentProcessor(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        
        self.vector_store = ChromaVectorStore(
            collection_name=collection_name,
            persist_directory=chroma_db_dir,
            embedding_model=embedding_model
        )
        
        self.llm = LlamaCppLLM(model_path=model_path)
        
        self.documents_dir = documents_dir
        self.chat_history = []
        
        print("RAG Pipeline initialized successfully!")
    
    def index_documents(self, force_reindex: bool = False):
        """Index all documents in the documents directory."""
        # Check if collection already has documents
        current_count = self.vector_store.collection.count()
        
        if current_count > 0 and not force_reindex:
            print(f"Collection already contains {current_count} documents.")
            print("Use force_reindex=True to reindex all documents.")
            return
        
        if force_reindex and current_count > 0:
            print("Force reindexing: clearing existing collection...")
            self.vector_store.delete_collection()
            # Recreate the collection on the same persistent client rather than
            # rebuilding the whole store (which would reload the embedding model)
            self.vector_store.collection = self.vector_store.client.get_or_create_collection(
                name=self.vector_store.collection_name,
                metadata={"hnsw:space": "cosine"}
            )
        
        print(f"Processing documents from: {self.documents_dir}")
        
        # Process all documents
        chunks = self.document_processor.process_directory(self.documents_dir)
        
        if not chunks:
            print("No documents found to index!")
            return
        
        # Add to vector store
        self.vector_store.add_documents(chunks)
        
        print(f"Successfully indexed {len(chunks)} document chunks!")
    
    def query(self, 
              question: str,
              k: int = 5,
              use_chat_history: bool = False,
              max_tokens: int = 512,
              temperature: float = 0.7) -> Dict:
        """Query the RAG pipeline.
        
        Args:
            question: User question
            k: Number of relevant chunks to retrieve
            use_chat_history: Whether to include chat history in context
            max_tokens: Maximum tokens for LLM response
            temperature: Temperature for LLM generation
            
        Returns:
            Dictionary containing answer, sources, and metadata
        """
        start_time = time.time()
        
        # 1. Retrieve relevant documents
        print(f"Retrieving {k} most relevant documents...")
        retrieved_docs = self.vector_store.similarity_search(question, k=k)
        
        if not retrieved_docs:
            return {
                'answer': "I couldn't find any relevant information to answer your question.",
                'sources': [],
                'retrieval_time': time.time() - start_time,
                'generation_time': 0,
                'total_time': time.time() - start_time
            }
        
        retrieval_time = time.time() - start_time
        
        # 2. Prepare context
        context_chunks = [doc['content'] for doc in retrieved_docs]
        
        # 3. Generate response
        generation_start = time.time()
        
        if use_chat_history and self.chat_history:
            # Use chat-based RAG
            messages = RAGPromptTemplate.create_chat_rag_prompt(
                question, context_chunks, self.chat_history[-10:]  # last 10 messages (5 exchanges)
            )
            answer = self.llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature
            )
        else:
            # Use simple RAG
            prompt = RAGPromptTemplate.create_rag_prompt(question, context_chunks)
            answer = self.llm.generate(
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=temperature
            )
        
        generation_time = time.time() - generation_start
        total_time = time.time() - start_time
        
        # 4. Update chat history
        if use_chat_history:
            self.chat_history.append({"role": "user", "content": question})
            self.chat_history.append({"role": "assistant", "content": answer})
        
        # 5. Prepare sources information
        sources = []
        for doc in retrieved_docs:
            sources.append({
                'content_preview': doc['content'][:200] + "..." if len(doc['content']) > 200 else doc['content'],
                'metadata': doc['metadata'],
                'similarity_score': 1 - doc['distance']  # assumes a cosine distance space
            })
        
        return {
            'answer': answer,
            'sources': sources,
            'retrieval_time': retrieval_time,
            'generation_time': generation_time,
            'total_time': total_time,
            'num_retrieved_docs': len(retrieved_docs)
        }
    
    def query_stream(self, 
                    question: str,
                    k: int = 5,
                    use_chat_history: bool = False,
                    max_tokens: int = 512,
                    temperature: float = 0.7):
        """Query with streaming response."""
        # Retrieve documents
        retrieved_docs = self.vector_store.similarity_search(question, k=k)
        
        if not retrieved_docs:
            yield "I couldn't find any relevant information to answer your question."
            return
        
        # Prepare context and prompt
        context_chunks = [doc['content'] for doc in retrieved_docs]
        
        if use_chat_history and self.chat_history:
            messages = RAGPromptTemplate.create_chat_rag_prompt(
                question, context_chunks, self.chat_history[-10:]
            )
            prompt = self.llm._format_chat_prompt(messages)
        else:
            prompt = RAGPromptTemplate.create_rag_prompt(question, context_chunks)
        
        # Stream response
        full_response = ""
        for token in self.llm.generate_stream(
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=temperature
        ):
            full_response += token
            yield token
        
        # Update chat history
        if use_chat_history:
            self.chat_history.append({"role": "user", "content": question})
            self.chat_history.append({"role": "assistant", "content": full_response})
    
    def clear_chat_history(self):
        """Clear the chat history."""
        self.chat_history = []
        print("Chat history cleared.")
    
    def get_stats(self) -> Dict:
        """Get pipeline statistics."""
        return {
            'vector_store_stats': self.vector_store.get_collection_stats(),
            'chat_history_length': len(self.chat_history),
            'model_info': {
                'context_length': self.llm.llm.n_ctx(),
                'vocab_size': self.llm.llm.n_vocab()
            }
        }
    
    def save_conversation(self, filename: str):
        """Save current conversation to file."""
        conversation_data = {
            'chat_history': self.chat_history,
            'timestamp': time.time(),
            'stats': self.get_stats()
        }
        
        with open(filename, 'w') as f:
            json.dump(conversation_data, f, indent=2)
        
        print(f"Conversation saved to: {filename}")
    
    def load_conversation(self, filename: str):
        """Load conversation from file."""
        try:
            with open(filename, 'r') as f:
                conversation_data = json.load(f)
            
            self.chat_history = conversation_data.get('chat_history', [])
            print(f"Conversation loaded from: {filename}")
            print(f"Chat history length: {len(self.chat_history)}")
            
        except Exception as e:
            print(f"Error loading conversation: {e}")
 
# Interactive CLI interface
class RAGChatInterface:
    def __init__(self, rag_pipeline: LocalRAGPipeline):
        self.rag = rag_pipeline
        self.use_streaming = True
        self.use_chat_history = True
    
    def run(self):
        """Run interactive chat interface."""
        print("\n" + "="*60)
        print("🤖 Local RAG Chat Interface")
        print("="*60)
        print("Commands:")
        print("  /help     - Show this help")
        print("  /stats    - Show pipeline statistics")
        print("  /clear    - Clear chat history")
        print("  /stream   - Toggle streaming mode")
        print("  /history  - Toggle chat history usage")
        print("  /save     - Save conversation")
        print("  /load     - Load conversation")
        print("  /quit     - Exit")
        print("="*60)
        
        while True:
            try:
                question = input("\n💬 You: ").strip()
                
                if not question:
                    continue
                
                if question.startswith('/'):
                    self._handle_command(question)
                    continue
                
                print("\n🤖 Assistant: ", end="", flush=True)
                
                if self.use_streaming:
                    for token in self.rag.query_stream(
                        question, 
                        use_chat_history=self.use_chat_history
                    ):
                        print(token, end="", flush=True)
                    print()  # New line after streaming
                else:
                    result = self.rag.query(
                        question, 
                        use_chat_history=self.use_chat_history
                    )
                    print(result['answer'])
                    
                    # Show performance metrics
                    print(f"\n⏱️  Retrieval: {result['retrieval_time']:.2f}s, "
                          f"Generation: {result['generation_time']:.2f}s, "
                          f"Total: {result['total_time']:.2f}s")
            
            except KeyboardInterrupt:
                print("\n\nGoodbye! 👋")
                break
            except Exception as e:
                print(f"\n❌ Error: {e}")
    
    def _handle_command(self, command: str):
        """Handle chat commands."""
        cmd = command.lower().strip()
        
        if cmd == '/help':
            print("\n📖 Available commands:")
            print("  /help     - Show this help")
            print("  /stats    - Show pipeline statistics")
            print("  /clear    - Clear chat history")
            print("  /stream   - Toggle streaming mode")
            print("  /history  - Toggle chat history usage")
            print("  /save     - Save conversation")
            print("  /load     - Load conversation")
            print("  /quit     - Exit")
        
        elif cmd == '/stats':
            stats = self.rag.get_stats()
            print("\n📊 Pipeline Statistics:")
            print(f"  Documents indexed: {stats['vector_store_stats']['document_count']}")
            print(f"  Chat history length: {stats['chat_history_length']}")
            print(f"  Model context length: {stats['model_info']['context_length']}")
            print(f"  Streaming mode: {self.use_streaming}")
            print(f"  Chat history mode: {self.use_chat_history}")
        
        elif cmd == '/clear':
            self.rag.clear_chat_history()
            print("\n🧹 Chat history cleared!")
        
        elif cmd == '/stream':
            self.use_streaming = not self.use_streaming
            print(f"\n🔄 Streaming mode: {'ON' if self.use_streaming else 'OFF'}")
        
        elif cmd == '/history':
            self.use_chat_history = not self.use_chat_history
            print(f"\n🔄 Chat history mode: {'ON' if self.use_chat_history else 'OFF'}")
        
        elif cmd == '/save':
            filename = input("Enter filename (or press Enter for default): ").strip()
            if not filename:
                filename = f"conversation_{int(time.time())}.json"
            self.rag.save_conversation(filename)
        
        elif cmd == '/load':
            filename = input("Enter filename to load: ").strip()
            if filename:
                self.rag.load_conversation(filename)
        
        elif cmd == '/quit':
            print("\nGoodbye! 👋")
            exit(0)
        
        else:
            print(f"\n❓ Unknown command: {command}")
            print("Type /help for available commands.")
 
# Example usage
if __name__ == "__main__":
    # Configuration
    MODEL_PATH = "./data/models/mistral-7b-instruct-v0.1.Q4_K_M.gguf"
    DOCUMENTS_DIR = "./data/documents/"
    
    # Check if model exists
    if not os.path.exists(MODEL_PATH):
        print(f"Model not found: {MODEL_PATH}")
        print("Please download a model first using the ModelManager class.")
        exit(1)
    
    # Initialize RAG pipeline
    rag_pipeline = LocalRAGPipeline(
        model_path=MODEL_PATH,
        documents_dir=DOCUMENTS_DIR,
        chunk_size=800,
        chunk_overlap=150
    )
    
    # Index documents (if not already done)
    rag_pipeline.index_documents()
    
    # Start interactive chat
    chat_interface = RAGChatInterface(rag_pipeline)
    chat_interface.run()
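The conversation persistence above is a plain JSON round-trip, which you can verify in isolation without loading a model. A minimal standalone sketch of the same save/load logic (using a temporary file instead of the pipeline class):

```python
import json
import tempfile

# A toy chat history in the same shape the pipeline maintains.
chat_history = [
    {"role": "user", "content": "What is RAG?"},
    {"role": "assistant", "content": "Retrieval-Augmented Generation."},
]

# Save: dump to JSON, as save_conversation does.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"chat_history": chat_history}, f, indent=2)
    path = f.name

# Load: read it back, defaulting to an empty history on missing keys.
with open(path) as f:
    restored = json.load(f).get("chat_history", [])

assert restored == chat_history
```

Because the format is plain JSON, saved conversations remain readable and editable outside the pipeline.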

Evaluation and Optimization

RAG Evaluation Metrics

Evaluating RAG systems is crucial for understanding their performance:

# src/evaluation.py
import json
import numpy as np
from typing import Any, Dict, List
from dataclasses import dataclass
import time
from sklearn.metrics.pairwise import cosine_similarity
from embeddings import LocalEmbeddings
 
@dataclass
class EvaluationResult:
    """Container for evaluation results."""
    retrieval_accuracy: float
    answer_relevance: float
    answer_faithfulness: float
    response_time: float
    context_precision: float
    context_recall: float
 
class RAGEvaluator:
    def __init__(self, rag_pipeline, embedding_model: str = "all-MiniLM-L6-v2"):
        """Initialize RAG evaluator.
        
        Args:
            rag_pipeline: The RAG pipeline to evaluate
            embedding_model: Model for computing semantic similarity
        """
        self.rag_pipeline = rag_pipeline
        self.embeddings = LocalEmbeddings(embedding_model)
    
    def evaluate_retrieval(self, 
                          queries_and_expected: List[Dict[str, Any]],
                          k: int = 5) -> Dict[str, float]:
        """Evaluate retrieval performance.
        
        Args:
            queries_and_expected: List of dicts with 'query' and 'expected_docs'
            k: Number of documents to retrieve
            
        Returns:
            Dictionary with retrieval metrics
        """
        precision_scores = []
        recall_scores = []
        mrr_scores = []  # Mean Reciprocal Rank
        
        for item in queries_and_expected:
            query = item['query']
            expected_doc_ids = set(item['expected_docs'])
            
            # Retrieve documents
            retrieved_docs = self.rag_pipeline.vector_store.similarity_search(query, k=k)
            retrieved_doc_ids = set([doc['id'] for doc in retrieved_docs])
            
            # Calculate metrics
            if retrieved_doc_ids:
                precision = len(expected_doc_ids.intersection(retrieved_doc_ids)) / len(retrieved_doc_ids)
                recall = len(expected_doc_ids.intersection(retrieved_doc_ids)) / len(expected_doc_ids) if expected_doc_ids else 0
                
                # Mean Reciprocal Rank
                reciprocal_rank = 0
                for i, doc_id in enumerate([doc['id'] for doc in retrieved_docs]):
                    if doc_id in expected_doc_ids:
                        reciprocal_rank = 1 / (i + 1)
                        break
                
                precision_scores.append(precision)
                recall_scores.append(recall)
                mrr_scores.append(reciprocal_rank)
        
        mean_precision = np.mean(precision_scores) if precision_scores else 0
        mean_recall = np.mean(recall_scores) if recall_scores else 0
        denom = mean_precision + mean_recall
        
        return {
            'precision': mean_precision,
            'recall': mean_recall,
            'mrr': np.mean(mrr_scores) if mrr_scores else 0,
            'f1': 2 * mean_precision * mean_recall / denom if denom > 0 else 0
        }
    
    def evaluate_answer_relevance(self, 
                                 queries_and_answers: List[Dict[str, str]]) -> float:
        """Evaluate how relevant answers are to queries using semantic similarity.
        
        Args:
            queries_and_answers: List of dicts with 'query' and 'answer'
            
        Returns:
            Average relevance score
        """
        relevance_scores = []
        
        for item in queries_and_answers:
            query = item['query']
            answer = item['answer']
            
            # Calculate semantic similarity
            query_embedding = self.embeddings.embed_text(query)
            answer_embedding = self.embeddings.embed_text(answer)
            
            similarity = cosine_similarity([query_embedding], [answer_embedding])[0][0]
            relevance_scores.append(similarity)
        
        return np.mean(relevance_scores) if relevance_scores else 0
    
    def evaluate_answer_faithfulness(self, 
                                   answers_and_contexts: List[Dict[str, Any]]) -> float:
        """Evaluate how faithful answers are to the provided context.
        
        Args:
            answers_and_contexts: List of dicts with 'answer' and 'context_chunks'
            
        Returns:
            Average faithfulness score
        """
        faithfulness_scores = []
        
        for item in answers_and_contexts:
            answer = item['answer']
            context_chunks = item['context_chunks']
            
            # Combine context
            full_context = " ".join(context_chunks)
            
            # Calculate semantic similarity between answer and context
            answer_embedding = self.embeddings.embed_text(answer)
            context_embedding = self.embeddings.embed_text(full_context)
            
            similarity = cosine_similarity([answer_embedding], [context_embedding])[0][0]
            faithfulness_scores.append(similarity)
        
        return np.mean(faithfulness_scores) if faithfulness_scores else 0
    
    def evaluate_response_time(self, queries: List[str], num_runs: int = 3) -> Dict[str, float]:
        """Evaluate response time performance.
        
        Args:
            queries: List of queries to test
            num_runs: Number of runs for each query
            
        Returns:
            Dictionary with timing statistics
        """
        all_times = []
        retrieval_times = []
        generation_times = []
        
        for query in queries:
            for _ in range(num_runs):
                result = self.rag_pipeline.query(query)
                all_times.append(result['total_time'])
                retrieval_times.append(result['retrieval_time'])
                generation_times.append(result['generation_time'])
        
        return {
            'mean_total_time': np.mean(all_times),
            'std_total_time': np.std(all_times),
            'mean_retrieval_time': np.mean(retrieval_times),
            'mean_generation_time': np.mean(generation_times),
            'p95_total_time': np.percentile(all_times, 95),
            'p99_total_time': np.percentile(all_times, 99)
        }
    
    def comprehensive_evaluation(self, 
                               test_dataset: Dict[str, List[Dict]]) -> EvaluationResult:
        """Run comprehensive evaluation.
        
        Args:
            test_dataset: Dictionary with test data for different metrics
            
        Returns:
            EvaluationResult object with all metrics
        """
        print("Running comprehensive RAG evaluation...")
        
        # Retrieval evaluation
        if 'retrieval' in test_dataset:
            print("Evaluating retrieval performance...")
            retrieval_metrics = self.evaluate_retrieval(test_dataset['retrieval'])
            retrieval_accuracy = retrieval_metrics['f1']
        else:
            retrieval_accuracy = 0
        
        # Answer relevance
        if 'relevance' in test_dataset:
            print("Evaluating answer relevance...")
            answer_relevance = self.evaluate_answer_relevance(test_dataset['relevance'])
        else:
            answer_relevance = 0
        
        # Answer faithfulness
        if 'faithfulness' in test_dataset:
            print("Evaluating answer faithfulness...")
            answer_faithfulness = self.evaluate_answer_faithfulness(test_dataset['faithfulness'])
        else:
            answer_faithfulness = 0
        
        # Response time
        if 'performance' in test_dataset:
            print("Evaluating response time...")
            queries = [item['query'] for item in test_dataset['performance']]
            timing_metrics = self.evaluate_response_time(queries)
            response_time = timing_metrics['mean_total_time']
        else:
            response_time = 0
        
        return EvaluationResult(
            retrieval_accuracy=retrieval_accuracy,
            answer_relevance=answer_relevance,
            answer_faithfulness=answer_faithfulness,
            response_time=response_time,
            context_precision=0,  # Placeholder for more advanced metrics
            context_recall=0      # Placeholder for more advanced metrics
        )
 
class RAGOptimizer:
    """Optimizer for RAG pipeline parameters."""
    
    def __init__(self, rag_pipeline, evaluator: RAGEvaluator):
        self.rag_pipeline = rag_pipeline
        self.evaluator = evaluator
    
    def optimize_chunk_size(self, 
                           test_queries: List[str],
                           chunk_sizes: List[int] = [500, 800, 1000, 1500, 2000]) -> Dict[int, float]:
        """Optimize chunk size parameter.
        
        Args:
            test_queries: Queries to test with
            chunk_sizes: List of chunk sizes to test
            
        Returns:
            Dictionary mapping chunk sizes to performance scores
        """
        print("Optimizing chunk size...")
        results = {}
        
        original_chunk_size = self.rag_pipeline.document_processor.chunk_size
        
        for chunk_size in chunk_sizes:
            print(f"Testing chunk size: {chunk_size}")
            
            # Update chunk size and reindex
            self.rag_pipeline.document_processor.chunk_size = chunk_size
            self.rag_pipeline.index_documents(force_reindex=True)
            
            # Evaluate performance
            timing_metrics = self.evaluator.evaluate_response_time(test_queries, num_runs=2)
            
            # Simple scoring function (can be improved)
            score = 1 / timing_metrics['mean_total_time']  # Higher is better
            results[chunk_size] = score
            
            print(f"Chunk size {chunk_size}: score = {score:.4f}")
        
        # Restore original chunk size
        self.rag_pipeline.document_processor.chunk_size = original_chunk_size
        
        return results
    
    def optimize_retrieval_k(self, 
                           test_queries: List[str],
                           k_values: List[int] = [3, 5, 7, 10, 15]) -> Dict[int, float]:
        """Optimize number of retrieved documents.
        
        Args:
            test_queries: Queries to test with
            k_values: List of k values to test
            
        Returns:
            Dictionary mapping k values to performance scores
        """
        print("Optimizing retrieval k...")
        results = {}
        
        for k in k_values:
            print(f"Testing k={k}")
            
            total_time = 0
            for query in test_queries:
                start_time = time.time()
                self.rag_pipeline.query(query, k=k)
                total_time += time.time() - start_time
            
            avg_time = total_time / len(test_queries)
            score = 1 / avg_time  # Higher is better
            results[k] = score
            
            print(f"k={k}: score = {score:.4f}")
        
        return results
 
# Example evaluation dataset creation
def create_sample_evaluation_dataset() -> Dict[str, List[Dict]]:
    """Create a sample evaluation dataset for testing."""
    return {
        'retrieval': [
            {
                'query': 'What is machine learning?',
                'expected_docs': ['doc1', 'doc2']  # These would be actual document IDs
            }
        ],
        'relevance': [
            {
                'query': 'What is machine learning?',
                'answer': 'Machine learning is a method of data analysis that automates analytical model building.'
            }
        ],
        'faithfulness': [
            {
                'answer': 'Machine learning is a method of data analysis that automates analytical model building.',
                'context_chunks': [
                    'Machine learning is a method of data analysis that automates analytical model building.',
                    'It is a branch of artificial intelligence based on the idea that systems can learn from data.'
                ]
            }
        ],
        'performance': [
            {'query': 'What is machine learning?'},
            {'query': 'How does deep learning work?'},
            {'query': 'What are neural networks?'}
        ]
    }
 
# Example usage
if __name__ == "__main__":
    # This would typically be run with an actual RAG pipeline
    print("RAG Evaluation Framework")
    print("This module provides tools for evaluating RAG pipeline performance.")
    
    # Create sample dataset
    test_dataset = create_sample_evaluation_dataset()
    print(f"Sample dataset created with {len(test_dataset)} evaluation categories")
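To make the retrieval metrics concrete, here is a small self-contained worked example of the precision, recall, and reciprocal-rank arithmetic that `evaluate_retrieval` performs for a single query (the document IDs are made up for illustration):

```python
# Hypothetical IDs -- 'doc1'..'doc4' are illustrative, not from a real index.
expected = {"doc1", "doc3"}                    # ground-truth relevant docs
retrieved = ["doc2", "doc1", "doc4", "doc3"]   # ranked retrieval results (k=4)

hits = expected.intersection(retrieved)
precision = len(hits) / len(retrieved)         # 2 of 4 retrieved are relevant -> 0.5
recall = len(hits) / len(expected)             # both relevant docs found -> 1.0

# Reciprocal rank: 1 / (rank of the first relevant document)
reciprocal_rank = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
    if doc_id in expected:
        reciprocal_rank = 1 / rank             # first hit at rank 2 -> 0.5
        break

print(precision, recall, reciprocal_rank)      # 0.5 1.0 0.5
```

`evaluate_retrieval` averages these per-query values over the whole test set (MRR is the mean of the reciprocal ranks).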

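Both `evaluate_answer_relevance` and `evaluate_answer_faithfulness` reduce to a cosine similarity between two embedding vectors; the sklearn call can be sanity-checked with a few lines of plain NumPy:

```python
import numpy as np

def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two 1-D vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = np.array([1.0, 0.0, 1.0])
b = np.array([1.0, 1.0, 0.0])
print(round(cosine_sim(a, b), 3))  # dot = 1, norms = sqrt(2) each -> 0.5
```

Note that similarity between embeddings is a proxy, not a proof: a fluent but wrong answer can still score well, so treat these metrics as trend indicators rather than absolute quality guarantees.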
Key Takeaways

  1. Local Advantage: Running RAG locally provides privacy, cost control, and offline capabilities
  2. Component Integration: Success depends on properly integrating document processing, embeddings, vector storage, and LLM inference
  3. Chunking Strategy: Optimal chunk size and overlap significantly impact retrieval quality
  4. Evaluation is Critical: Implement comprehensive evaluation metrics to measure and improve performance
  5. Optimization Opportunities: Systematically optimize parameters like chunk size, retrieval count, and model parameters
  6. Production Considerations: Include error handling, logging, and monitoring for production deployments
  7. Scalability: ChromaDB and llama.cpp provide good scalability for local deployments
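Takeaway 3 is easy to see with a toy chunker: the overlap controls how much context neighbouring chunks share, which determines whether a fact split across a chunk boundary remains retrievable. A minimal character-based sketch (the pipeline's real splitter is more sophisticated, but the size/overlap interaction is the same):

```python
def chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Naive fixed-size chunking with overlap (character-based)."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "abcdefghij" * 10  # 100 characters
chunks = chunk_text(text, chunk_size=40, overlap=10)
print(len(chunks), len(chunks[0]))  # 4 chunks; each starts 30 chars after the last
```

Larger overlap improves boundary recall at the cost of more chunks to embed, store, and search, which is exactly the trade-off `RAGOptimizer.optimize_chunk_size` explores empirically.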

Conclusion

Building a local RAG pipeline offers significant advantages for privacy-sensitive applications and cost-conscious deployments. The combination of llama.cpp for efficient LLM inference and ChromaDB for vector storage provides a robust foundation for production-ready systems.

Key success factors include:

  • Proper document preprocessing and chunking
  • High-quality embedding models
  • Systematic evaluation and optimization
  • Robust error handling and monitoring

This implementation provides a solid starting point that can be extended with additional features like multi-modal support, advanced retrieval strategies, and integration with existing systems.

Remember to continuously evaluate and optimize your RAG pipeline as you add more documents and encounter new use cases. The evaluation framework provided here will help you measure improvements and identify areas for optimization.


The complete code for this tutorial is available in the accompanying GitHub repository, including example documents and evaluation datasets.

Tags

#RAG #LLM #AI #Python #ChromaDB #llama.cpp #VectorDatabase