
RAG Systems in Production: Building Enterprise Knowledge Search

A comprehensive guide to building production-ready Retrieval-Augmented Generation (RAG) systems. Learn about vector databases, embedding strategies, retrieval optimization, and real-world implementation patterns for enterprise knowledge search.

Introduction

Retrieval-Augmented Generation (RAG) has revolutionized how enterprises build intelligent knowledge systems. By combining the power of large language models with domain-specific knowledge, RAG systems can answer questions, synthesize information, and provide insights that pure LLMs cannot achieve alone.

At Groovy Web, we've built and deployed RAG systems for Fortune 500 companies, helping them unlock the value of their organizational knowledge. This guide captures everything we've learned from building production RAG systems that serve millions of queries per month.

At a glance:

  • Enterprise clients: RAG systems deployed for Fortune 500 companies
  • Scale: production systems serving millions of queries per month
  • Cost: roughly 90% cheaper than managed vector database alternatives
  • Time to production: days to weeks, versus months for fine-tuning

Understanding RAG Systems

What is RAG?

RAG (Retrieval-Augmented Generation) is a technique that enhances large language models by retrieving relevant context from a knowledge base before generating responses.

Without RAG:

User Question → LLM → Answer (Limited to training data)

With RAG:

User Question → Retrieve Relevant Documents → LLM + Context → Answer (Grounded in knowledge base)
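In code, the difference is one extra retrieval step before the model call. The sketch below is a toy: `answer_with_rag`, its word-overlap retriever, and the stand-in `llm` callable are illustrative placeholders for the embedding-based components described later in this guide.

```python
from typing import Callable, List

def answer_with_rag(
    question: str,
    documents: List[str],
    llm: Callable[[str], str],
    top_k: int = 2,
) -> str:
    """Retrieve the top_k most relevant documents, then call the LLM with them as context."""
    # Toy retriever: score documents by word overlap with the question
    # (a real system would use embeddings, as covered below)
    q_words = set(question.lower().split())
    scored = sorted(
        documents,
        key=lambda d: len(q_words & set(d.lower().split())),
        reverse=True,
    )
    context = "\n\n".join(scored[:top_k])
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {question}"
    return llm(prompt)

# Usage with a stand-in LLM that simply echoes its prompt
docs = ["Paris is the capital of France.", "The Nile is a river in Africa."]
response = answer_with_rag("What is the capital of France?", docs, llm=lambda p: p, top_k=1)
```

The retriever's only job is to put the right text in front of the model; everything else in a RAG system is refinement of that step.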

Why RAG for Enterprise?

1. Domain-Specific Knowledge

LLMs are trained on public internet data, but enterprises have proprietary information:

  • Internal documentation
  • Product specifications
  • Customer interactions
  • Research papers
  • Compliance documents

RAG systems enable LLMs to access this private knowledge.

2. Reduced Hallucinations

By grounding responses in retrieved documents, RAG systems:

  • Cite sources
  • Provide verifiable information
  • Reduce false claims
  • Build user trust

3. Cost-Effective

Compared to fine-tuning:

  • No model training required
  • Easy to update knowledge base
  • Lower infrastructure costs
  • Faster time to production

4. Transparency and Compliance

RAG systems provide:

  • Source attribution
  • Audit trails
  • Compliance with regulations
  • Explainable AI

RAG vs Fine-Tuning

| Aspect | RAG | Fine-Tuning |
|---|---|---|
| Knowledge updates | Instant (add to database) | Requires retraining |
| Cost | Low ($/query) | High (training costs) |
| Domain specificity | High (source data) | Medium (pattern learning) |
| Hallucination risk | Low (grounded) | Medium (model-based) |
| Transparency | High (citations) | Low (black box) |
| Setup time | Days to weeks | Weeks to months |
| Maintenance | Ongoing indexing | Periodic retraining |

Best Use Cases for RAG:

  • Knowledge search and Q&A
  • Document analysis
  • Customer support automation
  • Research assistance
  • Compliance and legal review

Best Use Cases for Fine-Tuning:

  • Style and tone customization
  • Format standardization
  • Domain-specific reasoning
  • Specialized instruction following

System Architecture

End-to-End RAG Pipeline

┌──────────────────────────────────────────────────────────────┐
│                        KNOWLEDGE BASE                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐           │
│  │  Documents  │  │   Vectors   │  │  Metadata   │           │
│  └─────────────┘  └─────────────┘  └─────────────┘           │
└──────────────────────────────────────────────────────────────┘
                        │
                        │ Ingestion Pipeline
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                       PROCESSING LAYER                       │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   │
│  │  Chunk   │ → │  Embed   │ → │  Index   │ → │  Store   │   │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘   │
└──────────────────────────────────────────────────────────────┘
                        │
                        │ Query
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                       RETRIEVAL LAYER                        │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐            │
│  │   Query    │ → │  Semantic  │ → │   Hybrid   │            │
│  │ Embedding  │   │   Search   │   │   Search   │            │
│  └────────────┘   └────────────┘   └────────────┘            │
│                      │                                       │
│                      ▼                                       │
│              ┌──────────────┐                                │
│              │   Rerank &   │                                │
│              │   Filter     │                                │
│              └──────────────┘                                │
└──────────────────────────────────────────────────────────────┘
                        │
                        │ Context
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                       GENERATION LAYER                       │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐            │
│  │   Prompt   │ → │    LLM     │ → │  Response  │            │
│  │  Building  │   │ Inference  │   │ Synthesis  │            │
│  └────────────┘   └────────────┘   └────────────┘            │
└──────────────────────────────────────────────────────────────┘
                        │
                        ▼
                    User Response

Component Breakdown

1. Ingestion Pipeline

# ingestion/pipeline.py
from typing import List, Dict
from pathlib import Path
import hashlib

class DocumentIngestionPipeline:
    """Process and ingest documents into knowledge base"""

    def __init__(self, config: Dict):
        self.chunker = DocumentChunker(config['chunking'])
        self.embedder = EmbeddingGenerator(config['embeddings'])
        self.vector_store = VectorStore(config['vector_db'])

    async def ingest_document(self, document: Dict) -> List[str]:
        """
        Ingest a document into the knowledge base

        Returns: List of chunk IDs
        """
        # 1. Extract text and metadata
        text = document['content']
        metadata = {
            'title': document['title'],
            'source': document['source'],
            'author': document.get('author'),
            'created_at': document.get('created_at'),
            'doc_type': document.get('type', 'unknown'),
            'language': document.get('language', 'en')
        }

        # 2. Split into chunks
        chunks = self.chunker.chunk(text)

        # 3. Generate embeddings
        chunk_texts = [chunk['text'] for chunk in chunks]
        embeddings = await self.embedder.generate_batch(chunk_texts)

        # 4. Prepare records for storage
        records = []
        for chunk, embedding in zip(chunks, embeddings):
            record = {
                'id': self._generate_chunk_id(document['id'], chunk['index']),
                'document_id': document['id'],
                'text': chunk['text'],
                'embedding': embedding,
                'metadata': {
                    **metadata,
                    'chunk_index': chunk['index'],
                    'chunk_size': len(chunk['text']),
                    'start_char': chunk['start'],
                    'end_char': chunk['end']
                }
            }
            records.append(record)

        # 5. Store in vector database
        chunk_ids = await self.vector_store.insert(records)

        return chunk_ids

    def _generate_chunk_id(self, doc_id: str, chunk_index: int) -> str:
        """Generate unique chunk ID"""
        hash_input = f"{doc_id}_{chunk_index}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:32]

2. Retrieval Engine

# retrieval/engine.py
from typing import List, Dict, Optional
import numpy as np

class RetrievalEngine:
    """Retrieve relevant documents for queries"""

    def __init__(self, vector_store, embedder, config: Dict):
        self.vector_store = vector_store
        self.embedder = embedder
        self.config = config
        self.reranker = Reranker(config.get('reranking'))

    async def retrieve(
        self,
        query: str,
        top_k: int = 10,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Retrieve relevant chunks for a query

        Args:
            query: User query
            top_k: Number of results to return
            filters: Metadata filters (e.g., {category: 'technology'})

        Returns:
            List of retrieved chunks with scores
        """
        # 1. Generate query embedding
        query_embedding = await self.embedder.generate(query)

        # 2. Semantic search
        results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2,  # Retrieve more for reranking
            filters=filters
        )

        # 3. Rerank if configured
        if self.reranker and len(results) > top_k:
            results = await self.reranker.rerank(query, results, top_k)

        return results[:top_k]

    async def retrieve_with_hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Hybrid retrieval combining semantic and keyword search

        Args:
            query: User query
            top_k: Number of results
            alpha: Weight for semantic search (0-1)
            filters: Metadata filters

        Returns:
            Reranked combined results
        """
        # 1. Semantic search
        semantic_results = await self.vector_store.similarity_search(
            await self.embedder.generate(query),
            top_k=top_k * 2,
            filters=filters
        )

        # 2. Keyword search
        keyword_results = await self.vector_store.keyword_search(
            query,
            top_k=top_k * 2,
            filters=filters
        )

        # 3. Combine and rerank
        combined = self._combine_results(
            semantic_results,
            keyword_results,
            alpha
        )

        # 4. Rerank combined results
        if self.reranker:
            combined = await self.reranker.rerank(query, combined, top_k)

        return combined[:top_k]

    def _combine_results(
        self,
        semantic_results: List[Dict],
        keyword_results: List[Dict],
        alpha: float
    ) -> List[Dict]:
        """Combine semantic and keyword results with weighted score fusion"""

        def normalize(scores: np.ndarray) -> np.ndarray:
            # Min-max normalization; guard against empty input and a zero range
            if scores.size == 0:
                return scores
            span = scores.max() - scores.min()
            return (scores - scores.min()) / span if span > 0 else np.ones_like(scores)

        sem_normalized = normalize(np.array([r['score'] for r in semantic_results]))
        key_normalized = normalize(np.array([r['score'] for r in keyword_results]))

        # Accumulate weighted scores per chunk ID (a chunk can appear in both lists)
        combined: Dict[str, Dict] = {}
        for result, score in zip(semantic_results, sem_normalized):
            entry = combined.setdefault(result['id'], {**result, 'combined_score': 0.0})
            entry['combined_score'] += alpha * float(score)
        for result, score in zip(keyword_results, key_normalized):
            entry = combined.setdefault(result['id'], {**result, 'combined_score': 0.0})
            entry['combined_score'] += (1 - alpha) * float(score)

        # Sort merged results by combined score
        return sorted(combined.values(), key=lambda x: x['combined_score'], reverse=True)

3. Response Generator

# generation/generator.py
from typing import List, Dict, Optional
import openai

class ResponseGenerator:
    """Generate responses using retrieved context"""

    def __init__(self, config: Dict):
        self.client = openai.AsyncClient(api_key=config['api_key'])
        self.model = config['model']
        self.temperature = config.get('temperature', 0.3)
        self.max_tokens = config.get('max_tokens', 1000)

    async def generate_response(
        self,
        query: str,
        context: List[Dict],
        conversation_history: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Generate response using retrieved context

        Args:
            query: User query
            context: Retrieved chunks
            conversation_history: Previous messages (for chat)

        Returns:
            Generated response with citations
        """
        # 1. Build prompt with context
        prompt = self._build_prompt(query, context)

        # 2. Generate response
        messages = self._build_messages(prompt, conversation_history)

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )

        # 3. Extract response and citations
        answer = response.choices[0].message.content
        citations = self._extract_citations(response, context)

        return {
            'answer': answer,
            'citations': citations,
            'sources': self._get_unique_sources(context),
            'model': self.model,
            'tokens_used': response.usage.total_tokens
        }

    def _build_prompt(self, query: str, context: List[Dict]) -> str:
        """Build prompt with context"""
        context_str = "\n\n".join([
            f"[Source {i+1}]\n{chunk['text']}"
            for i, chunk in enumerate(context)
        ])

        prompt = f"""You are a helpful assistant that answers questions based on the provided context.

Context:
{context_str}

Question: {query}

Instructions:
1. Answer the question using only the provided context
2. If the context doesn't contain enough information, say so
3. Cite sources using [Source X] notation
4. Be concise and accurate
5. If asked for sources, provide them

Answer:"""

        return prompt

    def _build_messages(
        self,
        prompt: str,
        history: Optional[List[Dict]] = None
    ) -> List[Dict]:
        """Build message list for API"""
        messages = []

        if history:
            messages.extend(history)

        messages.append({
            "role": "user",
            "content": prompt
        })

        return messages

    def _extract_citations(
        self,
        response,  # openai.types.chat.ChatCompletion in the v1 SDK
        context: List[Dict]
    ) -> List[Dict]:
        """Extract citations from response"""
        answer = response.choices[0].message.content

        # Find source references like [Source 1], [Source 2], etc.
        import re
        citations = re.findall(r'\[Source (\d+)\]', answer)

        # Map to actual source chunks
        unique_citations = []
        for citation in set(citations):
            idx = int(citation) - 1  # Convert to 0-based index
            if idx < len(context):
                unique_citations.append({
                    'index': int(citation),
                    'chunk_id': context[idx]['id'],
                    'document_id': context[idx]['metadata']['document_id'],
                    'title': context[idx]['metadata']['title'],
                    'source': context[idx]['metadata']['source']
                })

        return unique_citations

    def _get_unique_sources(self, context: List[Dict]) -> List[Dict]:
        """Get unique sources from context"""
        seen = set()
        sources = []

        for chunk in context:
            doc_id = chunk['metadata']['document_id']
            if doc_id not in seen:
                seen.add(doc_id)
                sources.append({
                    'document_id': doc_id,
                    'title': chunk['metadata']['title'],
                    'source': chunk['metadata']['source'],
                    'author': chunk['metadata'].get('author'),
                    'created_at': chunk['metadata'].get('created_at')
                })

        return sources

Vector Database Selection

Comparison Matrix

| Database | Open Source | Cloud Managed | Performance | Scalability | Features | Cost |
|---|---|---|---|---|---|---|
| pgvector | ✅ | ✅ (Supabase, etc.) | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Relational DB + vectors | $ |
| Pinecone | ❌ | ✅ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Purpose-built, easy | $$$ |
| Weaviate | ✅ | ✅ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | GraphQL, multi-modal | $$ |
| Qdrant | ✅ | ✅ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Filter optimization, hybrid | $$ |
| Milvus | ✅ | ✅ (Zilliz) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Distributed, cloud-native | $$ |
| Chroma | ✅ | ❌ | ⭐⭐⭐ | ⭐⭐⭐ | Simple, embedded | Free |

Selection Criteria

Choose pgvector if:

  • Already using PostgreSQL
  • Need ACID transactions
  • Want to minimize infrastructure
  • Budget-conscious
  • Need SQL joins with vector search

Choose Pinecone if:

  • Want fully managed solution
  • Need auto-scaling
  • Prioritize ease of setup
  • Have budget for managed service
  • Want fastest time to production

Choose Qdrant if:

  • Need advanced filtering
  • Want hybrid search capabilities
  • Require high performance
  • Prefer open-source with managed option

Choose Weaviate if:

  • Need multi-modal search (text + image)
  • Want GraphQL API
  • Require modular architecture
  • Building knowledge graphs

Our Choice: pgvector

We recommend pgvector for most enterprise RAG systems because:

1. Unified Data Model

-- Single query for vectors + metadata
SELECT
  d.title,
  d.content,
  d.metadata->>'category' as category,
  1 - (d.embedding <=> query_embedding) as similarity
FROM documents d
JOIN document_tags dt ON d.id = dt.document_id
WHERE d.status = 'published'
  AND dt.tag_id = ANY(SELECT id FROM tags WHERE name IN ('AI', 'ML'))
  AND d.created_at > NOW() - INTERVAL '1 year'
ORDER BY d.embedding <=> query_embedding
LIMIT 20;

2. Cost Effective

  • No separate vector database needed
  • Use existing PostgreSQL infrastructure
  • Self-hosted option available
  • 90% cheaper than managed alternatives

3. Mature Ecosystem

  • Backup/restore tools
  • Replication and HA
  • Monitoring and observability
  • ORM support (SQLAlchemy, Django ORM)

4. Performance

-- With proper indexing
CREATE INDEX idx_documents_embedding_hnsw ON documents
  USING hnsw(embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- Query performance: 15-30ms for 1M vectors
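Calling the indexed query from application code is straightforward. The sketch below assumes psycopg and an open connection to the database holding the `documents` table from the SQL above; `to_pgvector` is a small helper we define here to serialize an embedding into pgvector's `'[x,y,z]'` text literal (pgvector's Python adapters can also bind vectors directly).

```python
from typing import List

def to_pgvector(embedding: List[float]) -> str:
    """Serialize an embedding into pgvector's '[x,y,z]' text literal."""
    return "[" + ",".join(str(x) for x in embedding) + "]"

# Parameterized cosine-distance query; uses the HNSW index created above
SIMILARITY_SQL = """
SELECT id, title, 1 - (embedding <=> %s::vector) AS similarity
FROM documents
ORDER BY embedding <=> %s::vector
LIMIT %s;
"""

def search(conn, query_embedding: List[float], top_k: int = 20):
    """Run a top-k similarity search over the documents table."""
    vec = to_pgvector(query_embedding)
    with conn.cursor() as cur:
        cur.execute(SIMILARITY_SQL, (vec, vec, top_k))
        return cur.fetchall()
```

Keeping the query parameterized (rather than interpolating the vector into the SQL string) preserves plan caching and avoids injection issues.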

Embedding Strategies

Model Selection

| Model | Dimensions | Context Length | Speed | Quality | Cost / 1M tokens |
|---|---|---|---|---|---|
| text-embedding-3-small | 1536 | 8191 | Fast | Good | $0.02 |
| text-embedding-3-large | 3072 | 8191 | Medium | Excellent | $0.13 |
| text-embedding-ada-002 | 1536 | 8191 | Fast | Good | $0.10 |
| bge-large-en-v1.5 | 1024 | 512 | Fast | Excellent | Free (self-hosted) |
| e5-large-v2 | 1024 | 512 | Fast | Very Good | Free (self-hosted) |

Recommendation

For most enterprise use cases: text-embedding-3-small

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    dimensions=1536  # Can truncate to 512 for faster search
)

Why?

  • Best price/performance ratio
  • Good quality for most domains
  • Long context (8191 tokens)
  • Multi-language support
  • Lower storage costs

For specialized domains: Open-source models (self-hosted)

# For legal/medical/technical content
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('BAAI/bge-large-en-v1.5')
embeddings = model.encode(texts)

Embedding Optimization

1. Dimensionality Reduction

# Reduce from 1536 to 512 dimensions (faster search, lower storage)
import numpy as np
from sklearn.decomposition import PCA

def reduce_dimensions(embeddings: np.ndarray, target_dim: int = 512) -> np.ndarray:
    """Reduce embedding dimensions using PCA"""
    pca = PCA(n_components=target_dim)
    return pca.fit_transform(embeddings)

# Usage
full_embeddings = np.array([...])  # (N, 1536)
reduced_embeddings = reduce_dimensions(full_embeddings, 512)

Trade-offs:

  • 1536 dims: Best quality, slower search
  • 768 dims: Good balance
  • 512 dims: Faster search, slight quality loss
  • 256 dims: Fastest search, noticeable quality loss

2. Hybrid Embeddings

# Combine semantic and keyword embeddings
class HybridEmbedding:
    def __init__(self):
        self.semantic_model = OpenAIEmbeddings(model="text-embedding-3-small")
        self.bm25 = BM25Encoder()

    def embed_documents(self, texts: List[str]) -> Dict[str, np.ndarray]:
        """Generate both semantic and keyword embeddings"""
        semantic = self.semantic_model.embed_documents(texts)
        keyword = self.bm25.encode_documents(texts)

        return {
            'semantic': np.array(semantic),
            'keyword': np.array(keyword)
        }

3. Query Expansion

# Expand queries with related terms for better retrieval
async def expand_query(query: str, llm) -> List[str]:
    """Generate query variations"""
    prompt = f"""Generate 3-5 alternative queries for: "{query}"

    Consider:
    - Synonyms
    - Related concepts
    - Different phrasings
    - Broader/narrower terms

    Return one query per line."""

    response = await llm.generate(prompt)
    variations = [line.strip() for line in response.split('\n') if line.strip()]

    return [query] + variations

# Usage
query_variations = await expand_query("How to implement RAG?", llm)
# Returns: [
#   "How to implement RAG?",
#   "Building retrieval-augmented generation systems",
#   "RAG implementation guide",
#   "Creating RAG applications",
#   "RAG system architecture"
# ]

Chunking Techniques

Why Chunking Matters

Chunking is one of the most consequential design decisions in a RAG system:

  • Too small β€” Loss of context
  • Too large β€” Noisy retrieval, slow generation
  • Poor boundaries β€” Fragmented information

Chunking Strategies

1. Fixed-Size Chunking

# chunking/fixed_size.py
from typing import List, Dict

class FixedSizeChunker:
    """Split text into fixed-size chunks"""

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk(self, text: str) -> List[Dict]:
        """Split text into chunks"""
        chunks = []
        start = 0
        chunk_index = 0

        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]

            chunks.append({
                'text': chunk_text,
                'index': chunk_index,
                'start': start,
                'end': end,
                'size': len(chunk_text)
            })

            chunk_index += 1
            start = end - self.overlap

        return chunks

# Pros: Simple, predictable
# Cons: May split sentences, loses context

2. Sentence-Based Chunking

# chunking/sentence.py
import re
from typing import List, Dict

class SentenceChunker:
    """Split text into sentence-based chunks"""

    def __init__(self, sentences_per_chunk: int = 5, overlap: int = 1):
        self.sentences_per_chunk = sentences_per_chunk
        self.overlap = overlap

    def chunk(self, text: str) -> List[Dict]:
        """Split text into sentence-based chunks"""
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        chunk_index = 0
        i = 0
        search_from = 0  # Search offset so repeated sentences resolve to the right position

        while i < len(sentences):
            # Get sentences for this chunk
            end = min(i + self.sentences_per_chunk, len(sentences))
            chunk_sentences = sentences[i:end]
            chunk_text = ' '.join(chunk_sentences)

            start_char = text.find(chunk_sentences[0], search_from)
            if start_char == -1:
                start_char = search_from  # Fallback if whitespace was normalized
            end_char = start_char + len(chunk_text)

            chunks.append({
                'text': chunk_text,
                'index': chunk_index,
                'start': start_char,
                'end': end_char,
                'size': len(chunk_text),
                'sentence_count': len(chunk_sentences)
            })

            chunk_index += 1
            i += self.sentences_per_chunk - self.overlap
            search_from = start_char + 1  # Next chunk's first sentence lies further on

        return chunks

# Pros: Preserves sentence boundaries, better context
# Cons: Variable chunk sizes, may be too short/long

3. Semantic Chunking (Recommended)

# chunking/semantic.py
from typing import List, Dict
import numpy as np

class SemanticChunker:
    """Split text into semantically coherent chunks"""

    def __init__(self, embedder, max_chunk_size: int = 1500, threshold: float = 0.7):
        self.embedder = embedder
        self.max_chunk_size = max_chunk_size
        self.threshold = threshold

    async def chunk(self, text: str) -> List[Dict]:
        """Split text into semantic chunks"""
        # 1. Split into sentences
        sentences = self._split_sentences(text)

        # 2. Generate embeddings for each sentence
        sentence_embeddings = await self.embedder.embed_documents(sentences)

        # 3. Calculate similarities between consecutive sentences
        similarities = [
            self._cosine_similarity(sentence_embeddings[i], sentence_embeddings[i+1])
            for i in range(len(sentence_embeddings) - 1)
        ]

        # 4. Identify chunk boundaries (where similarity drops below threshold)
        boundaries = [0]
        for i, sim in enumerate(similarities):
            if sim < self.threshold:
                boundaries.append(i + 1)
        boundaries.append(len(sentences))

        # 5. Create chunks
        chunks = []
        chunk_index = 0

        for i in range(len(boundaries) - 1):
            start_idx = boundaries[i]
            end_idx = boundaries[i+1]

            # Combine sentences in this segment
            chunk_sentences = sentences[start_idx:end_idx]
            chunk_text = ' '.join(chunk_sentences)

            # Further split if chunk is too long
            if len(chunk_text) > self.max_chunk_size:
                sub_chunks = self._split_long_chunk(chunk_text, self.max_chunk_size)
                for sub_chunk in sub_chunks:
                    chunks.append({
                        'text': sub_chunk,
                        'index': chunk_index,
                        'type': 'semantic'
                    })
                    chunk_index += 1
            else:
                chunks.append({
                    'text': chunk_text,
                    'index': chunk_index,
                    'sentence_count': len(chunk_sentences),
                    'type': 'semantic'
                })
                chunk_index += 1

        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences"""
        import re
        return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity"""
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def _split_long_chunk(self, text: str, max_size: int) -> List[str]:
        """Split long chunk into smaller pieces"""
        # Fallback to fixed-size splitting with overlap; cap the overlap so the
        # window always advances and the loop terminates
        overlap = min(200, max_size // 2)
        chunks = []
        start = 0
        while start < len(text):
            end = start + max_size
            chunks.append(text[start:end])
            start = end - overlap
        return chunks

# Pros: Semantically coherent, better retrieval
# Cons: Slower (requires embeddings), more complex

4. Hierarchical Chunking

# chunking/hierarchical.py
from typing import Dict, List

class HierarchicalChunker:
    """Create multi-level chunk hierarchy for different use cases"""

    def __init__(self, embedder):
        self.embedder = embedder

    async def chunk(self, text: str, document_id: str) -> Dict[str, List[Dict]]:
        """Create hierarchical chunks"""
        # Level 1: Document-level (for broad queries)
        doc_chunk = {
            'id': f"{document_id}_doc",
            'level': 'document',
            'text': text[:2000],  # Summary/first part
            'metadata': {'type': 'document_summary'}
        }

        # Level 2: Section-level (for medium queries)
        section_chunks = self._chunk_by_sections(text)

        # Level 3: Paragraph-level (for specific queries)
        paragraph_chunks = self._chunk_by_paragraphs(text)

        # Level 4: Sentence-level (for precise queries)
        sentence_chunks = self._chunk_by_sentences(text)

        return {
            'document': [doc_chunk],
            'sections': section_chunks,
            'paragraphs': paragraph_chunks,
            'sentences': sentence_chunks
        }

    def _chunk_by_sections(self, text: str) -> List[Dict]:
        """Split by markdown/document sections"""
        import re
        sections = re.split(r'\n#{1,3}\s+', text)
        return [{'text': s, 'level': 'section'} for s in sections if s.strip()]

    def _chunk_by_paragraphs(self, text: str) -> List[Dict]:
        """Split by paragraphs"""
        paragraphs = text.split('\n\n')
        return [{'text': p, 'level': 'paragraph'} for p in paragraphs if p.strip()]

    def _chunk_by_sentences(self, text: str) -> List[Dict]:
        """Split by sentences"""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [{'text': s, 'level': 'sentence'} for s in sentences if s.strip()]

# Usage: Store all levels, retrieve based on query type
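For the hierarchy to pay off at query time, something has to decide which level to search. A rough routing sketch is shown below; the keyword lists and the six-word cutoff are invented for illustration and would be tuned (or replaced by a classifier) per corpus.

```python
def select_level(query: str) -> str:
    """Map a query to a chunk granularity: broader questions get coarser chunks."""
    q = query.lower()
    if any(w in q for w in ("summarize", "overview", "about")):
        return "document"      # Broad questions: document-level summaries
    if any(w in q for w in ("section", "chapter", "explain")):
        return "sections"      # Medium scope: section-level chunks
    if len(q.split()) <= 6:
        return "sentences"     # Short, factual queries: precise matches
    return "paragraphs"        # Default middle ground

level = select_level("Summarize this report")
```

The selected key indexes straight into the dict returned by `HierarchicalChunker.chunk`, so each query searches only the granularity it needs.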

Recommended Strategy

For general enterprise knowledge:

chunker = SemanticChunker(
    embedder=OpenAIEmbeddings(model="text-embedding-3-small"),
    max_chunk_size=1000,
    threshold=0.75
)

For technical documentation:

chunker = HierarchicalChunker(embedder)
# Allows retrieval at appropriate granularity

Retrieval Optimization

Improving Retrieval Quality

1. Query Rewriting

# retrieval/query_rewrite.py
from typing import List

class QueryRewriter:
    """Rewrite queries for better retrieval"""

    def __init__(self, llm):
        self.llm = llm

    async def rewrite(self, query: str, context: str = "") -> str:
        """Rewrite query to improve retrieval"""
        prompt = f"""Rewrite the following query to improve information retrieval.

Original query: {query}

Context: {context}

Guidelines:
1. Make the query more specific
2. Add relevant domain terms
3. Fix grammatical issues
4. Expand abbreviations
5. Keep the intent the same

Rewritten query:"""

        response = await self.llm.generate(prompt)
        return response.strip()

    async def decompose(self, query: str) -> List[str]:
        """Decompose complex query into sub-queries"""
        prompt = f"""Break down the following query into 2-4 simpler sub-queries.

Query: {query}

Return one sub-query per line."""

        response = await self.llm.generate(prompt)
        sub_queries = [line.strip() for line in response.split('\n') if line.strip()]

        return sub_queries

2. Reranking

# retrieval/reranker.py
from typing import List, Dict
import openai

class Reranker:
    """Rerank retrieved results for better relevance"""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = openai.AsyncClient()
        self.model = model

    async def rerank(
        self,
        query: str,
        results: List[Dict],
        top_k: int = 10
    ) -> List[Dict]:
        """Rerank results based on query relevance"""
        if len(results) <= top_k:
            return results

        # Prepare reranking prompt
        result_texts = [
            f"[{i+1}] {r['text'][:500]}"
            for i, r in enumerate(results[:20])  # Rerank top 20
        ]

        prompt = f"""Rank the following passages by their relevance to the query.

Query: {query}

Passages:
{chr(10).join(result_texts)}

Instructions:
1. Rank passages from most relevant (1) to least relevant (20)
2. Return only the rankings as a comma-separated list
3. Consider: direct answers, completeness, specificity

Rankings:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1
        )

        # Parse rankings (fall back to the original order if the output is malformed)
        rankings = response.choices[0].message.content
        try:
            ranked_indices = [int(x.strip()) - 1 for x in rankings.split(',')]
        except ValueError:
            return results[:top_k]

        # Reorder results
        reranked = [results[i] for i in ranked_indices if 0 <= i < len(results)]

        return reranked[:top_k]

3. Hybrid Search

# retrieval/hybrid.py
class HybridSearcher:
    """Combine semantic and keyword search"""

    def __init__(self, vector_store, keyword_index):
        self.vector_store = vector_store
        self.keyword_index = keyword_index  # BM25 or similar

    async def search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict]:
        """
        Hybrid search combining semantic and keyword

        Args:
            query: Search query
            top_k: Number of results
            alpha: Semantic search weight (0-1)

        Returns:
            Combined and reranked results
        """
        # Semantic search
        semantic_results = await self.vector_store.search(query, top_k * 2)

        # Keyword search
        keyword_results = await self.keyword_index.search(query, top_k * 2)

        # Combine scores
        combined = self._combine_results(
            semantic_results,
            keyword_results,
            alpha
        )

        # Remove duplicates and sort
        seen = set()
        unique_results = []
        for result in combined:
            if result['id'] not in seen:
                seen.add(result['id'])
                unique_results.append(result)

        return unique_results[:top_k]

    def _combine_results(
        self,
        semantic: List[Dict],
        keyword: List[Dict],
        alpha: float
    ) -> List[Dict]:
        """Combine and score results"""
        # Normalize scores (guard against empty result lists)
        sem_scores = [r['score'] for r in semantic] or [0.0]
        key_scores = [r['score'] for r in keyword] or [0.0]

        sem_max, sem_min = max(sem_scores), min(sem_scores)
        key_max, key_min = max(key_scores), min(key_scores)

        # Normalize to 0-1
        for r in semantic:
            r['normalized_score'] = (r['score'] - sem_min) / (sem_max - sem_min) if sem_max > sem_min else 0

        for r in keyword:
            r['normalized_score'] = (r['score'] - key_min) / (key_max - key_min) if key_max > key_min else 0

        # Combine
        combined = {}

        for r in semantic:
            combined[r['id']] = {
                **r,
                'combined_score': alpha * r['normalized_score']
            }

        for r in keyword:
            if r['id'] in combined:
                combined[r['id']]['combined_score'] += (1 - alpha) * r['normalized_score']
            else:
                combined[r['id']] = {
                    **r,
                    'combined_score': (1 - alpha) * r['normalized_score']
                }

        # Sort by combined score
        results = list(combined.values())
        results.sort(key=lambda x: x['combined_score'], reverse=True)

        return results
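
The alpha-weighted combination above can be checked by hand. Here is a self-contained sketch using hypothetical pre-normalized scores (the document ids and values are invented for illustration):

```python
# Minimal worked example of alpha-weighted score fusion.

def fuse(sem: dict, key: dict, alpha: float = 0.5) -> dict:
    """Combine normalized semantic and keyword scores per document id."""
    combined = {}
    for doc_id, score in sem.items():
        combined[doc_id] = alpha * score
    for doc_id, score in key.items():
        combined[doc_id] = combined.get(doc_id, 0.0) + (1 - alpha) * score
    return combined

semantic_scores = {'doc_a': 1.0, 'doc_b': 0.4}   # normalized cosine scores
keyword_scores = {'doc_b': 1.0, 'doc_c': 0.7}    # normalized BM25 scores

scores = fuse(semantic_scores, keyword_scores, alpha=0.6)
# doc_a: 0.6*1.0 = 0.60; doc_b: 0.6*0.4 + 0.4*1.0 = 0.64; doc_c: 0.4*0.7 = 0.28
print(sorted(scores, key=scores.get, reverse=True))  # ['doc_b', 'doc_a', 'doc_c']
```

Note how with `alpha=0.6` a document that scores moderately on both channels (`doc_b`) outranks one that only scores well semantically (`doc_a`).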

4. Metadata Filtering

# retrieval/filtering.py
from typing import Dict, List, Any

class MetadataFilter:
    """Apply metadata filters to search results"""

    @staticmethod
    def apply_filters(
        results: List[Dict],
        filters: Dict[str, Any]
    ) -> List[Dict]:
        """Filter results based on metadata"""
        filtered = results

        for key, value in filters.items():
            if isinstance(value, list):
                # Filter: value in list
                filtered = [
                    r for r in filtered
                    if r['metadata'].get(key) in value
                ]
            elif isinstance(value, dict):
                # Range filter (works for numbers and ISO date strings;
                # documents missing the field are excluded)
                if '$gte' in value:
                    filtered = [
                        r for r in filtered
                        if r['metadata'].get(key) is not None
                        and r['metadata'][key] >= value['$gte']
                    ]
                if '$lte' in value:
                    filtered = [
                        r for r in filtered
                        if r['metadata'].get(key) is not None
                        and r['metadata'][key] <= value['$lte']
                    ]
            else:
                # Exact match
                filtered = [
                    r for r in filtered
                    if r['metadata'].get(key) == value
                ]

        return filtered

# Usage
filtered_results = MetadataFilter.apply_filters(results, {
    'category': ['technology', 'ai'],
    'created_at': {'$gte': '2025-01-01'},
    'status': 'published'
})

Generation and Synthesis

Prompt Engineering for RAG

1. Basic RAG Prompt

from typing import Dict, List

def build_basic_rag_prompt(query: str, context: List[Dict]) -> str:
    """Build basic RAG prompt"""
    context_str = "\n\n---\n\n".join([
        f"Document: {chunk['metadata']['title']}\n{chunk['text']}"
        for chunk in context
    ])

    return f"""You are a helpful assistant. Answer the following question using the provided context.

Context:
{context_str}

Question: {query}

Instructions:
1. Base your answer only on the provided context
2. If the context doesn't contain the answer, say "I don't have enough information to answer this"
3. Cite sources using [Document X] notation
4. Be accurate and concise

Answer:"""

2. Advanced Multi-Source Prompt

def build_advanced_rag_prompt(
    query: str,
    context: List[Dict],
    conversation_history: List[Dict] = None
) -> str:
    """Build advanced RAG prompt with conversation history"""

    context_by_source = {}
    for chunk in context:
        source = chunk['metadata']['source']
        if source not in context_by_source:
            context_by_source[source] = []
        context_by_source[source].append(chunk)

    context_str = "\n\n".join([
        f"## {source}\n" + "\n\n".join([c['text'] for c in chunks])
        for source, chunks in context_by_source.items()
    ])

    history_str = ""
    if conversation_history:
        history_str = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-5:]  # Last 5 messages
        ])

    return f"""You are an expert knowledge assistant. Help answer questions by synthesizing information from multiple sources.

### Conversation History
{history_str}

### Available Sources
{context_str}

### Current Question
{query}

### Instructions
1. Synthesize information from multiple sources when relevant
2. Acknowledge when sources disagree or conflict
3. Prioritize recent and authoritative sources
4. Use [Source: Document Title] citations
5. If information is missing, explicitly state what's unknown
6. Provide a clear, well-structured answer

### Answer Format
- Start with a direct answer
- Follow with supporting details
- Include source citations
- End with limitations (if any)

Answer:"""

3. Chain-of-Thought Prompting

def build_cot_rag_prompt(query: str, context: List[Dict]) -> str:
    """Build chain-of-thought RAG prompt"""
    return f"""Answer the following question using the provided context. Show your reasoning.

Context:
{' '.join([c['text'] for c in context[:3]])}

Question: {query}

Think step by step:
1. What is the question asking?
2. What relevant information is in the context?
3. What can I conclude from this information?
4. What information is missing?

Answer:"""

Response Post-Processing

# generation/post_process.py
from typing import Dict, List
import re

class ResponsePostProcessor:
    """Post-process generated responses"""

    @staticmethod
    def add_citations(response: str, context: List[Dict]) -> Dict:
        """Add citation links to response"""
        # Find [Source X] references
        citations = re.findall(r'\[Source (\d+)\]', response)

        # Create citation mapping
        citation_map = {}
        for cit in set(citations):
            idx = int(cit) - 1
            if idx < len(context):
                citation_map[cit] = {
                    'title': context[idx]['metadata']['title'],
                    'source': context[idx]['metadata']['source'],
                    'url': context[idx]['metadata'].get('url', '#')
                }

        return {
            'response': response,
            'citations': citation_map
        }

    @staticmethod
    def extract_key_points(response: str) -> List[str]:
        """Extract key points from response"""
        prompt = f"""Extract the key points from the following response.

Response:
{response}

Return one key point per line."""

        # Use LLM to extract
        # Implementation depends on your LLM setup
        return []

    @staticmethod
    def format_response(
        response: str,
        citations: List[Dict],
        sources: List[Dict]
    ) -> Dict:
        """Format final response for API"""
        return {
            'answer': response,
            'citations': [
                {
                    'index': i + 1,
                    'title': c['title'],
                    'source': c['source'],
                    'url': c.get('url')
                }
                for i, c in enumerate(citations)
            ],
            'sources': sources,
            'answer_length': len(response),
            'citation_count': len(citations)
        }

Evaluation and Quality Assurance

Metrics for RAG Systems

1. Retrieval Metrics

# evaluation/retrieval.py
from typing import List, Dict
import numpy as np

class RetrievalEvaluator:
    """Evaluate retrieval quality"""

    @staticmethod
    def precision_at_k(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Precision@K"""
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        relevant_retrieved = set(retrieved_ids) & set(relevant)
        return len(relevant_retrieved) / k

    @staticmethod
    def recall_at_k(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Recall@K"""
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        relevant_retrieved = set(retrieved_ids) & set(relevant)
        return len(relevant_retrieved) / len(relevant) if relevant else 0.0

    @staticmethod
    def mrr(retrieved: List[Dict], relevant: List[str]) -> float:
        """Calculate Mean Reciprocal Rank"""
        retrieved_ids = [r['id'] for r in retrieved]
        for i, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in relevant:
                return 1 / i
        return 0.0

    @staticmethod
    def ndcg(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Normalized DCG"""
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        dcg = 0.0
        for i, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in relevant:
                dcg += 1 / np.log2(i + 1)

        # Ideal DCG
        idcg = sum(1 / np.log2(i + 1) for i in range(1, min(len(relevant), k) + 1))

        return dcg / idcg if idcg > 0 else 0.0
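
As a sanity check, the formulas above can be computed by hand on a toy query. The chunk ids and relevance labels below are invented for illustration:

```python
# 5 retrieved chunk ids evaluated against 2 hand-labeled relevant ids.

retrieved_ids = ['c3', 'c7', 'c1', 'c9', 'c2']
relevant_ids = {'c7', 'c2'}

k = 3
precision_at_3 = len(set(retrieved_ids[:k]) & relevant_ids) / k          # 1 hit in top 3
recall_at_3 = len(set(retrieved_ids[:k]) & relevant_ids) / len(relevant_ids)
mrr = next((1 / rank for rank, cid in enumerate(retrieved_ids, 1)
            if cid in relevant_ids), 0.0)                                # first hit at rank 2

print(round(precision_at_3, 3), recall_at_3, mrr)  # 0.333 0.5 0.5
```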

2. Generation Metrics

# evaluation/generation.py
import openai

class GenerationEvaluator:
    """Evaluate generation quality using LLM-as-a-judge"""

    def __init__(self, model: str = "gpt-4o"):
        self.client = openai.Client()
        self.model = model

    def evaluate_relevance(self, query: str, response: str, context: List[Dict]) -> Dict:
        """Evaluate if response is relevant to query"""
        prompt = f"""Rate the relevance of the following response to the query.

Query: {query}

Response: {response}

Available Context:
{' '.join([c['text'][:200] for c in context[:3]])}

Rate on:
1. Relevance (0-100): Does it answer the question?
2. Accuracy (0-100): Is it factually correct based on context?
3. Completeness (0-100): Does it provide sufficient detail?
4. Citation Quality (0-100): Are citations appropriate?

Return as JSON:
{{"relevance": X, "accuracy": Y, "completeness": Z, "citation_quality": W}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        import json
        return json.loads(response.choices[0].message.content)

    def evaluate_hallucination(self, response: str, context: List[Dict]) -> Dict:
        """Check for hallucinations"""
        context_text = " ".join([c['text'] for c in context])

        prompt = f"""Analyze the following response for hallucinations (information not supported by context).

Response:
{response}

Context:
{context_text}

Identify:
1. Factual claims not in context
2. Invented sources or citations
3. Contradictions to context
4. Speculative statements presented as fact

Return as JSON with hallucinations list and severity (low/medium/high)."""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        import json
        return json.loads(response.choices[0].message.content)

3. End-to-End Evaluation

# evaluation/e2e.py
from typing import List, Dict

from evaluation.retrieval import RetrievalEvaluator

class RAGEvaluator:
    """End-to-end RAG system evaluation"""

    def __init__(self, rag_system, evaluator):
        self.rag_system = rag_system
        self.evaluator = evaluator

    async def evaluate_on_dataset(
        self,
        test_questions: List[Dict],
        metrics: List[str] = None
    ) -> Dict:
        """
        Evaluate RAG system on test dataset

        Args:
            test_questions: List of {query, relevant_docs, expected_answer}
            metrics: Metrics to compute

        Returns:
            Evaluation results
        """
        if metrics is None:
            metrics = ['precision', 'recall', 'mrr', 'ndcg', 'relevance', 'accuracy']

        results = {
            'retrieval': {},
            'generation': {},
            'overall': {}
        }

        for question in test_questions:
            query = question['query']
            relevant = question['relevant_docs']

            # 1. Retrieve
            retrieved = await self.rag_system.retrieve(query, top_k=10)

            # 2. Evaluate retrieval
            if 'precision' in metrics:
                prec = RetrievalEvaluator.precision_at_k(retrieved, relevant, 5)
                results['retrieval']['precision'] = results['retrieval'].get('precision', []) + [prec]

            if 'recall' in metrics:
                rec = RetrievalEvaluator.recall_at_k(retrieved, relevant, 10)
                results['retrieval']['recall'] = results['retrieval'].get('recall', []) + [rec]

            if 'mrr' in metrics:
                mrr = RetrievalEvaluator.mrr(retrieved, relevant)
                results['retrieval']['mrr'] = results['retrieval'].get('mrr', []) + [mrr]

            if 'ndcg' in metrics:
                ndcg = RetrievalEvaluator.ndcg(retrieved, relevant, 10)
                results['retrieval']['ndcg'] = results['retrieval'].get('ndcg', []) + [ndcg]

            # 3. Generate and evaluate
            response = await self.rag_system.generate(query, retrieved[:5])

            if 'relevance' in metrics:
                gen_eval = self.evaluator.evaluate_relevance(query, response, retrieved[:5])
                for metric, value in gen_eval.items():
                    results['generation'][metric] = results['generation'].get(metric, []) + [value]

        # Compute averages
        for category in ['retrieval', 'generation']:
            for metric, values in results[category].items():
                results['overall'][f'{category}_{metric}'] = sum(values) / len(values)

        return results

# Usage
evaluator = RAGEvaluator(rag_system, generation_evaluator)
results = await evaluator.evaluate_on_dataset(test_questions)
print(f"Overall Precision@5: {results['overall']['retrieval_precision']:.2f}")
print(f"Overall Relevance: {results['overall']['generation_relevance']:.2f}")

Scaling Considerations

Horizontal Scaling

# scaling/distributed.py
from typing import List, Dict
import asyncio
import numpy as np

class DistributedRAGSystem:
    """Distributed RAG system for horizontal scaling"""

    def __init__(self, config: Dict):
        # Multiple embedding models for parallel processing
        self.embedders = [
            EmbeddingGenerator(config['embeddings'])
            for _ in range(config['embedding_workers'])
        ]

        # Sharded vector stores
        self.vector_stores = [
            VectorStore(config['vector_db'], shard_id=i)
            for i in range(config['num_shards'])
        ]

    async def embed_batch_parallel(self, texts: List[str]) -> np.ndarray:
        """Embed texts in parallel"""
        # Ceil-divide so trailing texts are not silently dropped
        n = len(self.embedders)
        batch_size = (len(texts) + n - 1) // n
        batches = [
            texts[i * batch_size:(i + 1) * batch_size]
            for i in range(n)
        ]

        # Parallel embedding (skip empty batches)
        results = await asyncio.gather(*[
            self.embedders[i].generate_batch(batch)
            for i, batch in enumerate(batches)
            if batch
        ])

        # Combine results
        embeddings = np.concatenate(results)
        return embeddings

    async def retrieve_distributed(
        self,
        query_embedding: np.ndarray,
        top_k: int = 10
    ) -> List[Dict]:
        """Retrieve from all shards in parallel"""
        # Query all shards in parallel
        shard_results = await asyncio.gather(*[
            shard.search(query_embedding, top_k=top_k * 2)
            for shard in self.vector_stores
        ])

        # Combine and deduplicate
        all_results = []
        seen = set()
        for results in shard_results:
            for result in results:
                if result['id'] not in seen:
                    seen.add(result['id'])
                    all_results.append(result)

        # Sort by score and return top_k
        all_results.sort(key=lambda x: x['score'], reverse=True)
        return all_results[:top_k]

Caching Strategy

# scaling/cache.py
from typing import Dict, List, Optional
import hashlib
import json
import time

class RAGCache:
    """Cache for RAG queries and responses"""

    def __init__(self, ttl: int = 3600):
        self.cache = {}  # In production, use Redis
        self.ttl = ttl

    def _generate_key(self, query: str, filters: Dict = None) -> str:
        """Generate cache key"""
        key_data = {'query': query, 'filters': filters}
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, query: str, filters: Dict = None) -> Optional[Dict]:
        """Get cached response"""
        key = self._generate_key(query, filters)

        if key in self.cache:
            cached = self.cache[key]
            if time.time() - cached['timestamp'] < self.ttl:
                return cached['response']
            else:
                del self.cache[key]  # Expired

        return None

    def set(self, query: str, response: Dict, filters: Dict = None):
        """Cache response"""
        key = self._generate_key(query, filters)
        self.cache[key] = {
            'response': response,
            'timestamp': time.time()
        }

    def invalidate_document(self, document_id: str):
        """Invalidate cache entries for a document"""
        # In production, implement smarter invalidation
        self.cache.clear()

Production Deployment

Deployment Architecture

# docker-compose.yml for production RAG system
version: '3.8'

services:
  # API Gateway
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - postgres
      - redis

  # PostgreSQL with pgvector
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=rag
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  # Redis for caching
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Worker for background tasks
  worker:
    build: ./worker
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

  # Monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  redis_data:
  grafana_data:

API Implementation

# api/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Optional
import time

from rag_system import RAGSystem
from cache import RAGCache

app = FastAPI(title="Enterprise RAG API")

# Add CORS (restrict allow_origins before deploying to production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize RAG system
rag_system = RAGSystem(config={
    'database_url': 'postgresql://...',
    'openai_api_key': '...',
    'cache_ttl': 3600
})

cache = RAGCache(ttl=3600)

class QueryRequest(BaseModel):
    query: str
    top_k: int = 10
    filters: Optional[Dict] = None
    conversation_id: Optional[str] = None

class QueryResponse(BaseModel):
    answer: str
    citations: List[Dict]
    sources: List[Dict]
    retrieval_time: float
    generation_time: float
    total_time: float

@app.post("/api/v1/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Query the RAG system"""
    start_time = time.time()

    # Check cache
    cached_response = cache.get(request.query, request.filters)
    if cached_response:
        return cached_response

    try:
        # 1. Retrieve
        retrieval_start = time.time()
        context = await rag_system.retrieve(
            request.query,
            top_k=request.top_k,
            filters=request.filters
        )
        retrieval_time = time.time() - retrieval_start

        # 2. Generate
        generation_start = time.time()
        response = await rag_system.generate(
            request.query,
            context,
            conversation_id=request.conversation_id
        )
        generation_time = time.time() - generation_start

        total_time = time.time() - start_time

        # Format response
        result = QueryResponse(
            answer=response['answer'],
            citations=response['citations'],
            sources=response['sources'],
            retrieval_time=retrieval_time,
            generation_time=generation_time,
            total_time=total_time
        )

        # Cache response
        cache.set(request.query, result.dict(), request.filters)

        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/v1/ingest")
async def ingest_document(document: Dict):
    """Ingest a document into the knowledge base"""
    try:
        chunk_ids = await rag_system.ingest(document)
        return {"status": "success", "chunk_ids": chunk_ids}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/v1/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy", "timestamp": time.time()}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Monitoring and Observability

Metrics Collection

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
query_counter = Counter(
    'rag_queries_total',
    'Total number of RAG queries',
    ['status']
)

query_duration = Histogram(
    'rag_query_duration_seconds',
    'RAG query duration',
    ['stage']  # retrieval, generation, total
)

cache_hits = Counter(
    'rag_cache_hits_total',
    'Total cache hits'
)

cache_misses = Counter(
    'rag_cache_misses_total',
    'Total cache misses'
)

embedding_queue_size = Gauge(
    'rag_embedding_queue_size',
    'Current embedding queue size'
)

class RAGMetrics:
    """Collect and report RAG metrics"""

    @staticmethod
    def record_query(status: str):
        query_counter.labels(status=status).inc()

    @staticmethod
    def record_duration(stage: str, duration: float):
        query_duration.labels(stage=stage).observe(duration)

    @staticmethod
    def record_cache_hit():
        cache_hits.inc()

    @staticmethod
    def record_cache_miss():
        cache_misses.inc()

Logging

# monitoring/logging.py
import logging
import json
from datetime import datetime
from typing import Dict, List

class RAGLogger:
    """Structured logging for RAG systems"""

    def __init__(self):
        self.logger = logging.getLogger('rag_system')

    def log_query(
        self,
        query: str,
        context: List[Dict],
        response: Dict,
        duration: float
    ):
        """Log query with full context"""
        log_entry = {
            'event': 'query',
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'context_count': len(context),
            'context_sources': [c['metadata']['source'] for c in context],
            'response_length': len(response.get('answer', '')),
            'citation_count': len(response.get('citations', [])),
            'duration_ms': duration * 1000,
            'retrieval_time_ms': response.get('retrieval_time', 0) * 1000,
            'generation_time_ms': response.get('generation_time', 0) * 1000
        }

        self.logger.info(json.dumps(log_entry))

    def log_ingestion(self, document: Dict, chunk_ids: List[str]):
        """Log document ingestion (called by RAGSystem.ingest)"""
        log_entry = {
            'event': 'ingestion',
            'timestamp': datetime.now().isoformat(),
            'document_id': document.get('id'),
            'chunk_count': len(chunk_ids)
        }

        self.logger.info(json.dumps(log_entry))

    def log_error(self, error: Exception, context: Dict):
        """Log error with context"""
        log_entry = {
            'event': 'error',
            'timestamp': datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'context': context
        }

        self.logger.error(json.dumps(log_entry))

Real-World Implementation

Complete Enterprise RAG System

# rag_system.py
from typing import List, Dict, Optional
import time

from ingestion.pipeline import DocumentIngestionPipeline
from retrieval.engine import RetrievalEngine
from generation.generator import ResponseGenerator
from cache import RAGCache
from monitoring.metrics import RAGMetrics
from monitoring.logging import RAGLogger

class RAGSystem:
    """Complete RAG system for enterprise"""

    def __init__(self, config: Dict):
        # Initialize components
        self.ingestion_pipeline = DocumentIngestionPipeline(config['ingestion'])
        self.retrieval_engine = RetrievalEngine(
            config['vector_store'],
            config['embeddings'],
            config['retrieval']
        )
        self.generator = ResponseGenerator(config['generation'])
        self.cache = RAGCache(ttl=config.get('cache_ttl', 3600))
        self.metrics = RAGMetrics()
        self.logger = RAGLogger()

    async def ingest(self, document: Dict) -> List[str]:
        """Ingest document into knowledge base"""
        try:
            chunk_ids = await self.ingestion_pipeline.ingest_document(document)
            self.logger.log_ingestion(document, chunk_ids)
            return chunk_ids
        except Exception as e:
            self.logger.log_error(e, {'document': document})
            raise

    async def query(
        self,
        query: str,
        top_k: int = 10,
        filters: Optional[Dict] = None,
        use_cache: bool = True
    ) -> Dict:
        """Query the RAG system"""
        start_time = time.time()

        # Check cache
        if use_cache:
            cached = self.cache.get(query, filters)
            if cached:
                self.metrics.record_cache_hit()
                return cached
            self.metrics.record_cache_miss()

        try:
            # Retrieve
            retrieval_start = time.time()
            context = await self.retrieval_engine.retrieve(query, top_k, filters)
            retrieval_time = time.time() - retrieval_start

            # Generate
            generation_start = time.time()
            response = await self.generator.generate_response(query, context)
            generation_time = time.time() - generation_start

            # Add timing info
            response['retrieval_time'] = retrieval_time
            response['generation_time'] = generation_time
            response['total_time'] = time.time() - start_time

            # Log query
            self.logger.log_query(query, context, response, response['total_time'])

            # Record metrics
            self.metrics.record_query('success')
            self.metrics.record_duration('total', response['total_time'])
            self.metrics.record_duration('retrieval', retrieval_time)
            self.metrics.record_duration('generation', generation_time)

            # Cache response
            if use_cache:
                self.cache.set(query, response, filters)

            return response

        except Exception as e:
            self.metrics.record_query('error')
            self.logger.log_error(e, {'query': query, 'filters': filters})
            raise

    async def bulk_ingest(self, documents: List[Dict]) -> Dict:
        """Ingest multiple documents"""
        results = {
            'successful': 0,
            'failed': 0,
            'errors': []
        }

        for document in documents:
            try:
                await self.ingest(document)
                results['successful'] += 1
            except Exception as e:
                results['failed'] += 1
                results['errors'].append({
                    'document_id': document.get('id'),
                    'error': str(e)
                })

        return results

    async def delete_document(self, document_id: str):
        """Delete document from knowledge base"""
        # Implementation depends on vector store
        pass

    async def update_document(self, document_id: str, updated_document: Dict):
        """Update document in knowledge base"""
        # Delete old, insert new
        await self.delete_document(document_id)
        await self.ingest(updated_document)

Key Takeaways

Building production RAG systems requires careful consideration of:

  1. Vector Database Selection - Choose based on your specific needs
  2. Embedding Strategy - Balance cost, quality, and performance
  3. Chunking Technique - Use semantic chunking for best results
  4. Retrieval Optimization - Hybrid search with reranking
  5. Quality Evaluation - Continuous monitoring and improvement
  6. Scaling Strategy - Plan for horizontal scaling from day one
  7. Monitoring & Observability - Essential for production systems

The key to success is iterative improvement:

  • Start with a simple baseline
  • Measure everything
  • Optimize based on data
  • Scale as needed

RAG systems are transforming how enterprises access and utilize their knowledge. With the right architecture and implementation, you can build powerful, accurate, and scalable knowledge search systems that provide real business value.

Frequently Asked Questions

What is RAG and how does it work?

RAG (Retrieval-Augmented Generation) combines a vector search retrieval step with an LLM generation step. When a user submits a query, the system encodes it into a vector embedding, searches a vector database for the most semantically similar document chunks, and injects those chunks as context into the LLM prompt. The LLM then generates an answer grounded in the retrieved documents rather than relying solely on its training data.
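
The flow above can be sketched end to end with a toy in-memory index. This is illustrative only: the vectors are hand-written stand-ins for real embeddings, and the `retrieve` / `build_prompt` helpers are hypothetical names, not any library's API. A production system would call an embedding model and a vector database instead.

```python
import math

def cosine(a, b):
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def retrieve(query_vec, index, top_k=2):
    """Return the top_k chunks most similar to the query embedding."""
    ranked = sorted(index, key=lambda item: cosine(query_vec, item['vector']), reverse=True)
    return [item['text'] for item in ranked[:top_k]]

def build_prompt(question, chunks):
    """Inject retrieved chunks as grounding context for the LLM."""
    context = "\n".join(f"- {c}" for c in chunks)
    return f"Answer using only this context:\n{context}\n\nQuestion: {question}"

# Toy index: in production these vectors come from an embedding model.
index = [
    {'text': 'Refunds are processed within 5 business days.', 'vector': [0.9, 0.1, 0.0]},
    {'text': 'Our office is closed on public holidays.',      'vector': [0.0, 0.2, 0.9]},
]
query_vec = [0.8, 0.2, 0.1]  # pretend embedding of the question below
chunks = retrieve(query_vec, index, top_k=1)
prompt = build_prompt('How long do refunds take?', chunks)
```

The prompt built here would then be sent to the LLM, which answers from the injected context rather than from memory.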

How does RAG reduce AI hallucinations?

Hallucinations occur when an LLM generates plausible-sounding but unsupported information. RAG mitigates this by providing the model with authoritative source documents as in-context evidence for every response; published evaluations commonly report hallucination-rate reductions in the range of 70-90% compared to standalone LLMs. Including source citations in the response also lets end users verify claims against the original documents.

What chunk size should I use when indexing documents for RAG?

Optimal chunk size depends on the document type and retrieval strategy. A common starting point is 512-1024 tokens per chunk with a 10-15% overlap between consecutive chunks to preserve sentence context. Smaller chunks (128-256 tokens) improve retrieval precision for Q&A tasks, while larger chunks (1024-2048 tokens) work better for summarization. Always evaluate chunk size empirically using retrieval hit rate metrics.
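
A minimal chunker following the numbers above, using word counts as a rough token proxy (production systems usually count tokens with the model's own tokenizer, e.g. via a tokenizer library):

```python
def chunk_text(text, chunk_size=200, overlap_ratio=0.1):
    """Split text into word-based chunks with fractional overlap.

    chunk_size is in words here as a rough token proxy; overlap_ratio
    controls how many words each chunk shares with the previous one.
    """
    words = text.split()
    overlap = int(chunk_size * overlap_ratio)
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break
    return chunks

# 500-word document, 200-word chunks, 10% (20-word) overlap -> 3 chunks.
doc = " ".join(f"w{i}" for i in range(500))
chunks = chunk_text(doc, chunk_size=200, overlap_ratio=0.1)
```

The overlap ensures a sentence split at a chunk boundary still appears whole in at least one chunk, at the cost of slightly more storage and embedding spend.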

Which vector database is best for enterprise RAG?

pgvector (PostgreSQL extension) is the top choice for organizations already running PostgreSQL, as it eliminates the need for a separate vector database and supports ACID transactions. Pinecone and Weaviate excel for large-scale, dedicated vector workloads requiring billions of vectors. For self-hosted deployments, Qdrant and Chroma offer strong performance with simple operational footprints.

How do you evaluate RAG system quality?

The three core RAG metrics are context recall (did retrieval find the relevant documents?), faithfulness (does the answer accurately reflect the retrieved context?), and answer relevancy (does the answer address the user's question?). Frameworks like RAGAS and TruLens provide automated evaluation pipelines for these metrics. Establishing a golden QA dataset specific to your domain is essential for ongoing quality monitoring.
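
The retrieval half of this can be scored with plain set arithmetic over a golden dataset. A sketch of context recall only (the example data and field names are hypothetical; frameworks like RAGAS add LLM-judged faithfulness and relevancy on top):

```python
def context_recall(retrieved_ids, relevant_ids):
    """Fraction of the gold relevant chunks that retrieval actually returned."""
    if not relevant_ids:
        return 1.0
    hits = len(set(retrieved_ids) & set(relevant_ids))
    return hits / len(relevant_ids)

# Golden QA dataset: each entry records which chunks a correct answer needs.
golden_set = [
    {'question': 'How long do refunds take?',
     'relevant': {'doc1#3'},
     'retrieved': ['doc1#3', 'doc2#1']},
    {'question': 'What is the SLA?',
     'relevant': {'doc4#0', 'doc4#1'},
     'retrieved': ['doc4#0', 'doc9#2']},
]
scores = [context_recall(ex['retrieved'], ex['relevant']) for ex in golden_set]
mean_recall = sum(scores) / len(scores)
```

Tracking this number over time makes regressions visible when you change chunking, embeddings, or the retriever.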

How much does a production RAG system cost to run?

Costs divide into three buckets: embedding generation (typically $0.0001 per 1000 tokens with text-embedding-3-small), vector database hosting ($100-500/month for most mid-size deployments), and LLM inference ($0.01-0.06 per 1000 output tokens for GPT-4o). For an enterprise system processing 10,000 queries per day, expect total costs of $500-3000/month depending on document volume and model choice.
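
A back-of-the-envelope calculator using the figures quoted above (the default prices and token counts below are assumptions drawn from that paragraph, not current vendor pricing, so substitute your own rates):

```python
def monthly_rag_cost(queries_per_day,
                     avg_output_tokens=300,
                     llm_price_per_1k_output=0.01,  # low end of the $0.01-0.06 range above
                     vector_db_monthly=300.0,       # mid-size deployment estimate
                     embedding_monthly=50.0):       # depends on re-indexing volume
    """Rough monthly cost estimate from the three buckets described above."""
    queries_per_month = queries_per_day * 30
    llm_cost = queries_per_month * (avg_output_tokens / 1000) * llm_price_per_1k_output
    return llm_cost + vector_db_monthly + embedding_monthly

# The article's 10,000 queries/day example, at the low end of the range.
cost = monthly_rag_cost(10_000)
```

At these assumptions the estimate lands around $1,250/month; raising the per-token price or output length toward the top of the quoted ranges pushes it toward the $3,000 figure.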


Published: January 2026 | Author: Groovy Web Team | Category: AI Development
