AI/ML RAG Systems in Production: Building Enterprise Knowledge Search
Groovy Web Team | January 31, 2026

A comprehensive guide to building production-ready Retrieval-Augmented Generation (RAG) systems. Learn about vector databases, embedding strategies, retrieval optimization, and real-world implementation patterns for enterprise knowledge search.

Introduction

Retrieval-Augmented Generation (RAG) has revolutionized how enterprises build intelligent knowledge systems. By combining the power of large language models with domain-specific knowledge, RAG systems can answer questions, synthesize information, and provide insights that pure LLMs cannot achieve alone.

At Groovy Web, we've built and deployed RAG systems for Fortune 500 companies, helping them unlock the value of their organizational knowledge. This guide captures everything we've learned from building production RAG systems that serve millions of queries per month.

Key numbers:
- Fortune 500: enterprise clients with deployed RAG systems
- Millions: queries per month served by our production systems
- 90%: cost reduction versus managed vector database alternatives
- Days: time to production, days to weeks for RAG vs. months for fine-tuning

Understanding RAG Systems

What is RAG?

RAG (Retrieval-Augmented Generation) is a technique that enhances large language models by retrieving relevant context from a knowledge base before generating responses.

Without RAG: User Question -> LLM -> Answer (limited to training data)
With RAG: User Question -> Retrieve Relevant Documents -> LLM + Context -> Answer (grounded in knowledge base)

Why RAG for Enterprise?

1.
Domain-Specific Knowledge

LLMs are trained on public internet data, but enterprises have proprietary information:
- Internal documentation
- Product specifications
- Customer interactions
- Research papers
- Compliance documents

RAG systems enable LLMs to access this private knowledge.

2. Reduced Hallucinations

By grounding responses in retrieved documents, RAG systems:
- Cite sources
- Provide verifiable information
- Reduce false claims
- Build user trust

3. Cost-Effective

Compared to fine-tuning:
- No model training required
- Easy to update knowledge base
- Lower infrastructure costs
- Faster time to production

4. Transparency and Compliance

RAG systems provide:
- Source attribution
- Audit trails
- Compliance with regulations
- Explainable AI

RAG vs Fine-Tuning

Aspect              | RAG                       | Fine-Tuning
Knowledge updates   | Instant (add to database) | Requires retraining
Cost                | Low ($/query)             | High (training costs)
Domain specificity  | High (source data)        | Medium (pattern learning)
Hallucination risk  | Low (grounded)            | Medium (model-based)
Transparency        | High (citations)          | Low (black box)
Setup time          | Days to weeks             | Weeks to months
Maintenance         | Ongoing indexing          | Periodic retraining

Best Use Cases for RAG:
- Knowledge search and Q&A
- Document analysis
- Customer support automation
- Research assistance
- Compliance and legal review

Best Use Cases for Fine-Tuning:
- Style and tone customization
- Format standardization
- Domain-specific reasoning
- Specialized instruction following

System Architecture

End-to-End RAG Pipeline

+--------------------------------------------------------+
|                    KNOWLEDGE BASE                      |
|   [ Documents ]    [ Vectors ]    [ Metadata ]         |
+--------------------------------------------------------+
                          |
                          | Ingestion Pipeline
                          v
+--------------------------------------------------------+
|                   PROCESSING LAYER                     |
|      Chunk  ->  Embed  ->  Index  ->  Store            |
+--------------------------------------------------------+
                          |
                          | Query
                          v
+--------------------------------------------------------+
|                    RETRIEVAL LAYER                     |
|   Query Embedding -> Semantic Search -> Hybrid Search  |
|                          |                             |
|                          v                             |
|                   Rerank & Filter                      |
+--------------------------------------------------------+
                          |
                          | Context
                          v
+--------------------------------------------------------+
|                   GENERATION LAYER                     |
|  Prompt Building -> LLM Inference -> Response Synthesis|
+--------------------------------------------------------+
                          |
                          v
                    User Response

Component Breakdown

1. Ingestion Pipeline

# ingestion/pipeline.py
from typing import List, Dict
from pathlib import Path
import hashlib

class DocumentIngestionPipeline:
    """Process and ingest documents into knowledge base"""

    def __init__(self, config: Dict):
        self.chunker = DocumentChunker(config['chunking'])
        self.embedder = EmbeddingGenerator(config['embeddings'])
        self.vector_store = VectorStore(config['vector_db'])

    async def ingest_document(self, document: Dict) -> List[str]:
        """
        Ingest a document into the knowledge base

        Returns:
            List of chunk IDs
        """
        # 1. Extract text and metadata
        text = document['content']
        metadata = {
            'title': document['title'],
            'source': document['source'],
            'author': document.get('author'),
            'created_at': document.get('created_at'),
            'doc_type': document.get('type', 'unknown'),
            'language': document.get('language', 'en')
        }

        # 2. Split into chunks
        chunks = self.chunker.chunk(text)

        # 3. Generate embeddings
        chunk_texts = [chunk['text'] for chunk in chunks]
        embeddings = await self.embedder.generate_batch(chunk_texts)

        # 4.
Prepare records for storage
        records = []
        for chunk, embedding in zip(chunks, embeddings):
            record = {
                'id': self._generate_chunk_id(document['id'], chunk['index']),
                'document_id': document['id'],
                'text': chunk['text'],
                'embedding': embedding,
                'metadata': {
                    **metadata,
                    'chunk_index': chunk['index'],
                    'chunk_size': len(chunk['text']),
                    'start_char': chunk['start'],
                    'end_char': chunk['end']
                }
            }
            records.append(record)

        # 5. Store in vector database
        chunk_ids = await self.vector_store.insert(records)
        return chunk_ids

    def _generate_chunk_id(self, doc_id: str, chunk_index: int) -> str:
        """Generate unique chunk ID"""
        hash_input = f"{doc_id}_{chunk_index}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:32]

2. Retrieval Engine

# retrieval/engine.py
from typing import List, Dict, Optional
import numpy as np

class RetrievalEngine:
    """Retrieve relevant documents for queries"""

    def __init__(self, vector_store, embedder, config: Dict):
        self.vector_store = vector_store
        self.embedder = embedder
        self.config = config
        self.reranker = Reranker(config.get('reranking'))

    async def retrieve(
        self,
        query: str,
        top_k: int = 10,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Retrieve relevant chunks for a query

        Args:
            query: User query
            top_k: Number of results to return
            filters: Metadata filters (e.g., {'category': 'technology'})

        Returns:
            List of retrieved chunks with scores
        """
        # 1. Generate query embedding
        query_embedding = await self.embedder.generate(query)

        # 2. Semantic search
        results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2,  # Retrieve more for reranking
            filters=filters
        )

        # 3.
Rerank if configured
        if self.reranker and len(results) > top_k:
            results = await self.reranker.rerank(query, results, top_k)

        return results[:top_k]

    async def retrieve_with_hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Hybrid retrieval combining semantic and keyword search

        Args:
            query: User query
            top_k: Number of results
            alpha: Weight for semantic search (0-1)
            filters: Metadata filters

        Returns:
            Reranked combined results
        """
        # 1. Semantic search
        semantic_results = await self.vector_store.similarity_search(
            await self.embedder.generate(query),
            top_k=top_k * 2,
            filters=filters
        )

        # 2. Keyword search
        keyword_results = await self.vector_store.keyword_search(
            query,
            top_k=top_k * 2,
            filters=filters
        )

        # 3. Combine and rerank
        combined = self._combine_results(
            semantic_results, keyword_results, alpha
        )

        # 4. Rerank combined results
        if self.reranker:
            combined = await self.reranker.rerank(query, combined, top_k)

        return combined[:top_k]

    def _combine_results(
        self,
        semantic_results: List[Dict],
        keyword_results: List[Dict],
        alpha: float
    ) -> List[Dict]:
        """Combine semantic and keyword search results"""
        # Normalize scores to 0-1 (guard against identical scores)
        sem_scores = np.array([r['score'] for r in semantic_results])
        key_scores = np.array([r['score'] for r in keyword_results])
        sem_range = sem_scores.max() - sem_scores.min()
        key_range = key_scores.max() - key_scores.min()
        sem_normalized = (sem_scores - sem_scores.min()) / sem_range if sem_range > 0 else np.zeros_like(sem_scores)
        key_normalized = (key_scores - key_scores.min()) / key_range if key_range > 0 else np.zeros_like(key_scores)

        # Merge by chunk ID so a result found by both searches
        # accumulates both weighted scores
        combined = {}
        for i, result in enumerate(semantic_results):
            combined[result['id']] = {**result, 'combined_score': alpha * sem_normalized[i]}
        for i, result in enumerate(keyword_results):
            if result['id'] in combined:
                combined[result['id']]['combined_score'] += (1 - alpha) * key_normalized[i]
            else:
                combined[result['id']] = {**result, 'combined_score': (1 - alpha) * key_normalized[i]}

        # Sort by combined score
        return sorted(combined.values(), key=lambda x: x['combined_score'], reverse=True)

3.
Response Generator

# generation/generator.py
from typing import List, Dict, Optional
import openai

class ResponseGenerator:
    """Generate responses using retrieved context"""

    def __init__(self, config: Dict):
        self.client = openai.AsyncClient(api_key=config['api_key'])
        self.model = config['model']
        self.temperature = config.get('temperature', 0.3)
        self.max_tokens = config.get('max_tokens', 1000)

    async def generate_response(
        self,
        query: str,
        context: List[Dict],
        conversation_history: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Generate response using retrieved context

        Args:
            query: User query
            context: Retrieved chunks
            conversation_history: Previous messages (for chat)

        Returns:
            Generated response with citations
        """
        # 1. Build prompt with context
        prompt = self._build_prompt(query, context)

        # 2. Generate response
        messages = self._build_messages(prompt, conversation_history)
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )

        # 3. Extract response and citations
        answer = response.choices[0].message.content
        citations = self._extract_citations(response, context)

        return {
            'answer': answer,
            'citations': citations,
            'sources': self._get_unique_sources(context),
            'model': self.model,
            'tokens_used': response.usage.total_tokens
        }

    def _build_prompt(self, query: str, context: List[Dict]) -> str:
        """Build prompt with context"""
        context_str = "\n\n".join([
            f"[Source {i+1}] {chunk['text']}"
            for i, chunk in enumerate(context)
        ])

        prompt = f"""You are a helpful assistant that answers questions based on the provided context.

Context:
{context_str}

Question: {query}

Instructions:
1. Answer the question using only the provided context
2. If the context doesn't contain enough information, say so
3. Cite sources using [Source X] notation
4. Be concise and accurate
5. If asked for sources, provide them

Answer:"""
        return prompt

    def _build_messages(
        self,
        prompt: str,
        history: Optional[List[Dict]] = None
    ) -> List[Dict]:
        """Build message list for API"""
        messages = []
        if history:
            messages.extend(history)
        messages.append({"role": "user", "content": prompt})
        return messages

    def _extract_citations(
        self,
        response,
        context: List[Dict]
    ) -> List[Dict]:
        """Extract citations from response"""
        answer = response.choices[0].message.content

        # Find source references like [Source 1], [Source 2], etc.
        import re
        citations = re.findall(r'\[Source (\d+)\]', answer)

        # Map to actual source chunks
        unique_citations = []
        for citation in set(citations):
            idx = int(citation) - 1  # Convert to 0-based index
            if idx < len(context):
                unique_citations.append({
                    'index': int(citation),
                    'chunk_id': context[idx]['id'],
                    'document_id': context[idx]['metadata']['document_id'],
                    'title': context[idx]['metadata']['title'],
                    'source': context[idx]['metadata']['source']
                })
        return unique_citations

    def _get_unique_sources(self, context: List[Dict]) -> List[Dict]:
        """Get unique sources from context"""
        seen = set()
        sources = []
        for chunk in context:
            doc_id = chunk['metadata']['document_id']
            if doc_id not in seen:
                seen.add(doc_id)
                sources.append({
                    'document_id': doc_id,
                    'title': chunk['metadata']['title'],
                    'source': chunk['metadata']['source'],
                    'author': chunk['metadata'].get('author'),
                    'created_at': chunk['metadata'].get('created_at')
                })
        return sources

Vector Database Selection

Comparison Matrix

Database | Open Source | Cloud Managed        | Performance | Scalability | Features                    | Cost
pgvector | Yes         | Yes (Supabase, etc.) | ★★★★        | ★★★★        | Relational DB + vectors     | $
Pinecone | No          | Yes                  | ★★★★★       | ★★★★★       | Purpose-built, easy         | $$$
Weaviate | Yes         | Yes                  | ★★★★        | ★★★★        | GraphQL, multi-modal        | $$
Qdrant   | Yes         | Yes                  | ★★★★★       | ★★★★★       | Filter optimization, hybrid | $$
Milvus   | Yes         | Yes (Zilliz)         | ★★★★★       | ★★★★★       | Distributed, cloud-native   | $$
Chroma   | Yes         | No                   | ★★★         | ★★★         | Simple, embedded            | Free

Selection Criteria

Choose pgvector if:
- Already using PostgreSQL
- Need ACID transactions
- Want to minimize infrastructure
- Budget-conscious
- Need SQL joins with vector search

Choose Pinecone if:
- Want fully managed solution
- Need auto-scaling
- Prioritize ease of setup
- Have budget for managed service
- Want fastest time to production

Choose Qdrant if:
- Need advanced filtering
- Want hybrid search capabilities
- Require high performance
- Prefer open-source with managed option

Choose Weaviate if:
- Need multi-modal search (text + image)
- Want GraphQL API
- Require modular architecture
- Building knowledge graphs

Our Choice: pgvector

We recommend pgvector for most enterprise RAG systems because:

1. Unified Data Model

-- Single query for vectors + metadata
SELECT
    d.title,
    d.content,
    d.metadata->>'category' as category,
    1 - (d.embedding <=> query_embedding) as similarity
FROM documents d
JOIN document_tags dt ON d.id = dt.document_id
WHERE d.status = 'published'
    AND dt.tag_id = ANY(SELECT id FROM tags WHERE name IN ('AI', 'ML'))
    AND d.created_at > NOW() - INTERVAL '1 year'
ORDER BY d.embedding <=> query_embedding
LIMIT 20;

2. Cost Effective
- No separate vector database needed
- Use existing PostgreSQL infrastructure
- Self-hosted option available
- 90% cheaper than managed alternatives

3. Mature Ecosystem
- Backup/restore tools
- Replication and HA
- Monitoring and observability
- ORM support (SQLAlchemy, Django ORM)

4.
Performance

-- With proper indexing
CREATE INDEX idx_documents_embedding_hnsw
ON documents
USING hnsw(embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Query performance: 15-30ms for 1M vectors

Embedding Strategies

Model Selection

Model                  | Dimensions | Context Length | Speed  | Quality   | Cost/1M tokens
text-embedding-3-small | 1536       | 8191           | Fast   | Good      | $0.02
text-embedding-3-large | 3072       | 8191           | Medium | Excellent | $0.13
text-embedding-ada-002 | 1536       | 8191           | Fast   | Good      | $0.10
bge-large-en-v1.5      | 1024       | 512            | Fast   | Excellent | Free (self-hosted)
e5-large-v2            | 1024       | 512            | Fast   | Very Good | Free (self-hosted)

Recommendation

For most enterprise use cases: text-embedding-3-small

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    dimensions=1536  # Can truncate to 512 for faster search
)

Why?
- Best price/performance ratio
- Good quality for most domains
- Long context (8191 tokens)
- Multi-language support
- Lower storage costs

For specialized domains: Open-source models (self-hosted)

# For legal/medical/technical content
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('BAAI/bge-large-en-v1.5')
embeddings = model.encode(texts)

Embedding Optimization

1. Dimensionality Reduction

# Reduce from 1536 to 512 dimensions (faster search, lower storage)
import numpy as np
from sklearn.decomposition import PCA

def reduce_dimensions(embeddings: np.ndarray, target_dim: int = 512) -> np.ndarray:
    """Reduce embedding dimensions using PCA.

    Note: fit PCA once on a representative corpus and reuse the fitted
    transform for query embeddings, or queries and documents will end
    up in different spaces.
    """
    pca = PCA(n_components=target_dim)
    return pca.fit_transform(embeddings)

# Usage
full_embeddings = np.array([...])  # (N, 1536)
reduced_embeddings = reduce_dimensions(full_embeddings, 512)

Trade-offs:
- 1536 dims: Best quality, slower search
- 768 dims: Good balance
- 512 dims: Faster search, slight quality loss
- 256 dims: Fastest search, noticeable quality loss

2.
Hybrid Embeddings

# Combine semantic and keyword embeddings
class HybridEmbedding:
    def __init__(self):
        self.semantic_model = OpenAIEmbeddings(model="text-embedding-3-small")
        self.bm25 = BM25Encoder()

    def embed_documents(self, texts: List[str]) -> Dict[str, np.ndarray]:
        """Generate both semantic and keyword embeddings"""
        semantic = self.semantic_model.embed_documents(texts)
        keyword = self.bm25.encode_documents(texts)
        return {
            'semantic': np.array(semantic),
            'keyword': np.array(keyword)
        }

3. Query Expansion

# Expand queries with related terms for better retrieval
async def expand_query(query: str, llm) -> List[str]:
    """Generate query variations"""
    prompt = f"""Generate 3-5 alternative queries for: "{query}"

Consider:
- Synonyms
- Related concepts
- Different phrasings
- Broader/narrower terms

Return one query per line."""

    response = await llm.generate(prompt)
    variations = [line.strip() for line in response.split('\n') if line.strip()]
    return [query] + variations

# Usage
query_variations = await expand_query("How to implement RAG?", llm)
# Returns:
# [
#     "How to implement RAG?",
#     "Building retrieval-augmented generation systems",
#     "RAG implementation guide",
#     "Creating RAG applications",
#     "RAG system architecture"
# ]

Chunking Techniques

Why Chunking Matters

Chunking is the most critical decision in RAG systems:
- Too small -> loss of context
- Too large -> noisy retrieval, slow generation
- Poor boundaries -> fragmented information

Chunking Strategies

1.
Fixed-Size Chunking

# chunking/fixed_size.py
from typing import List, Dict

class FixedSizeChunker:
    """Split text into fixed-size chunks"""

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk(self, text: str) -> List[Dict]:
        """Split text into chunks"""
        chunks = []
        start = 0
        chunk_index = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]
            chunks.append({
                'text': chunk_text,
                'index': chunk_index,
                'start': start,
                'end': end,
                'size': len(chunk_text)
            })
            chunk_index += 1
            start = end - self.overlap
        return chunks

# Pros: Simple, predictable
# Cons: May split sentences, loses context

2. Sentence-Based Chunking

# chunking/sentence.py
import re
from typing import List, Dict

class SentenceChunker:
    """Split text into sentence-based chunks"""

    def __init__(self, sentences_per_chunk: int = 5, overlap: int = 1):
        self.sentences_per_chunk = sentences_per_chunk
        self.overlap = overlap

    def chunk(self, text: str) -> List[Dict]:
        """Split text into sentence-based chunks"""
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        chunk_index = 0
        i = 0
        while i < len(sentences):
            # Get sentences for this chunk
            end = min(i + self.sentences_per_chunk, len(sentences))
            chunk_sentences = sentences[i:end]
            chunk_text = ' '.join(chunk_sentences)
            start_char = text.find(chunk_sentences[0])
            end_char = start_char + len(chunk_text)
            chunks.append({
                'text': chunk_text,
                'index': chunk_index,
                'start': start_char,
                'end': end_char,
                'size': len(chunk_text),
                'sentence_count': len(chunk_sentences)
            })
            chunk_index += 1
            i += self.sentences_per_chunk - self.overlap
        return chunks

# Pros: Preserves sentence boundaries, better context
# Cons: Variable chunk sizes, may be too short/long

3.
Semantic Chunking (Recommended)

# chunking/semantic.py
from typing import List, Dict
import numpy as np

class SemanticChunker:
    """Split text into semantically coherent chunks"""

    def __init__(self, embedder, max_chunk_size: int = 1500, threshold: float = 0.7):
        self.embedder = embedder
        self.max_chunk_size = max_chunk_size
        self.threshold = threshold

    async def chunk(self, text: str) -> List[Dict]:
        """Split text into semantic chunks"""
        # 1. Split into sentences
        sentences = self._split_sentences(text)

        # 2. Generate embeddings for each sentence
        sentence_embeddings = await self.embedder.embed_documents(sentences)

        # 3. Calculate similarities between consecutive sentences
        similarities = [
            self._cosine_similarity(sentence_embeddings[i], sentence_embeddings[i+1])
            for i in range(len(sentence_embeddings) - 1)
        ]

        # 4. Identify chunk boundaries (where similarity drops below threshold)
        boundaries = [0]
        for i, sim in enumerate(similarities):
            if sim < self.threshold:
                boundaries.append(i + 1)
        boundaries.append(len(sentences))

        # 5. Create chunks
        chunks = []
        chunk_index = 0
        for i in range(len(boundaries) - 1):
            start_idx = boundaries[i]
            end_idx = boundaries[i+1]

            # Combine sentences in this segment
            chunk_sentences = sentences[start_idx:end_idx]
            chunk_text = ' '.join(chunk_sentences)

            # Further split if chunk is too long
            if len(chunk_text) > self.max_chunk_size:
                sub_chunks = self._split_long_chunk(chunk_text, self.max_chunk_size)
                for sub_chunk in sub_chunks:
                    chunks.append({
                        'text': sub_chunk,
                        'index': chunk_index,
                        'type': 'semantic'
                    })
                    chunk_index += 1
            else:
                chunks.append({
                    'text': chunk_text,
                    'index': chunk_index,
                    'sentence_count': len(chunk_sentences),
                    'type': 'semantic'
                })
                chunk_index += 1
        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences"""
        import re
        return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity"""
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def _split_long_chunk(self, text: str, max_size: int) -> List[str]:
        """Split long chunk into smaller pieces"""
        # Fallback to fixed-size splitting
        chunks = []
        start = 0
        while start < len(text):
            end = start + max_size
            chunks.append(text[start:end])
            start = end - 200  # Add overlap
        return chunks

# Pros: Semantically coherent, better retrieval
# Cons: Slower (requires embeddings), more complex

4.
Hierarchical Chunking

# chunking/hierarchical.py
class HierarchicalChunker:
    """Create multi-level chunk hierarchy for different use cases"""

    def __init__(self, embedder):
        self.embedder = embedder

    async def chunk(self, text: str, document_id: str) -> Dict[str, List[Dict]]:
        """Create hierarchical chunks"""
        # Level 1: Document-level (for broad queries)
        doc_chunk = {
            'id': f"{document_id}_doc",
            'level': 'document',
            'text': text[:2000],  # Summary/first part
            'metadata': {'type': 'document_summary'}
        }

        # Level 2: Section-level (for medium queries)
        section_chunks = self._chunk_by_sections(text)

        # Level 3: Paragraph-level (for specific queries)
        paragraph_chunks = self._chunk_by_paragraphs(text)

        # Level 4: Sentence-level (for precise queries)
        sentence_chunks = self._chunk_by_sentences(text)

        return {
            'document': [doc_chunk],
            'sections': section_chunks,
            'paragraphs': paragraph_chunks,
            'sentences': sentence_chunks
        }

    def _chunk_by_sections(self, text: str) -> List[Dict]:
        """Split by markdown/document sections"""
        import re
        sections = re.split(r'\n#{1,3}\s+', text)
        return [{'text': s, 'level': 'section'} for s in sections if s.strip()]

    def _chunk_by_paragraphs(self, text: str) -> List[Dict]:
        """Split by paragraphs"""
        paragraphs = text.split('\n\n')
        return [{'text': p, 'level': 'paragraph'} for p in paragraphs if p.strip()]

    def _chunk_by_sentences(self, text: str) -> List[Dict]:
        """Split by sentences"""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [{'text': s, 'level': 'sentence'} for s in sentences if s.strip()]

# Usage: Store all levels, retrieve based on query type

Recommended Strategy

For general enterprise knowledge:

chunker = SemanticChunker(
    embedder=OpenAIEmbeddings(model="text-embedding-3-small"),
    max_chunk_size=1000,
    threshold=0.75
)

For technical documentation:

chunker = HierarchicalChunker(embedder)
# Allows retrieval at appropriate granularity

Retrieval Optimization

Improving Retrieval Quality

1.
Query Rewriting

# retrieval/query_rewrite.py
from typing import List

class QueryRewriter:
    """Rewrite queries for better retrieval"""

    def __init__(self, llm):
        self.llm = llm

    async def rewrite(self, query: str, context: str = "") -> str:
        """Rewrite query to improve retrieval"""
        prompt = f"""Rewrite the following query to improve information retrieval.

Original query: {query}
Context: {context}

Guidelines:
1. Make the query more specific
2. Add relevant domain terms
3. Fix grammatical issues
4. Expand abbreviations
5. Keep the intent the same

Rewritten query:"""

        response = await self.llm.generate(prompt)
        return response.strip()

    async def decompose(self, query: str) -> List[str]:
        """Decompose complex query into sub-queries"""
        prompt = f"""Break down the following query into 2-4 simpler sub-queries.

Query: {query}

Return one sub-query per line."""

        response = await self.llm.generate(prompt)
        sub_queries = [line.strip() for line in response.split('\n') if line.strip()]
        return sub_queries

2. Reranking

# retrieval/reranker.py
from typing import List, Dict
import openai

class Reranker:
    """Rerank retrieved results for better relevance"""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = openai.AsyncClient()
        self.model = model

    async def rerank(
        self,
        query: str,
        results: List[Dict],
        top_k: int = 10
    ) -> List[Dict]:
        """Rerank results based on query relevance"""
        if len(results) <= top_k:
            return results

        # Prepare reranking prompt
        result_texts = [
            f"[{i+1}] {r['text'][:500]}"
            for i, r in enumerate(results[:20])  # Rerank top 20
        ]

        prompt = f"""Rank the following passages by their relevance to the query.

Query: {query}

Passages:
{chr(10).join(result_texts)}

Instructions:
1. Rank passages from most relevant (1) to least relevant (20)
2. Return only the rankings as a comma-separated list
3.
Consider: direct answers, completeness, specificity

Rankings:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1
        )

        # Parse rankings
        rankings = response.choices[0].message.content
        ranked_indices = [int(x.strip()) - 1 for x in rankings.split(',')]

        # Reorder results
        reranked = [results[i] for i in ranked_indices if i < len(results)]
        return reranked[:top_k]

3. Hybrid Search

# retrieval/hybrid.py
class HybridSearcher:
    """Combine semantic and keyword search"""

    def __init__(self, vector_store, keyword_index):
        self.vector_store = vector_store
        self.keyword_index = keyword_index  # BM25 or similar

    async def search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict]:
        """
        Hybrid search combining semantic and keyword

        Args:
            query: Search query
            top_k: Number of results
            alpha: Semantic search weight (0-1)

        Returns:
            Combined and reranked results
        """
        # Semantic search
        semantic_results = await self.vector_store.search(query, top_k * 2)

        # Keyword search
        keyword_results = await self.keyword_index.search(query, top_k * 2)

        # Combine scores
        combined = self._combine_results(
            semantic_results, keyword_results, alpha
        )

        # Remove duplicates and sort
        seen = set()
        unique_results = []
        for result in combined:
            if result['id'] not in seen:
                seen.add(result['id'])
                unique_results.append(result)
        return unique_results[:top_k]

    def _combine_results(
        self,
        semantic: List[Dict],
        keyword: List[Dict],
        alpha: float
    ) -> List[Dict]:
        """Combine and score results"""
        # Normalize scores
        sem_scores = [r['score'] for r in semantic]
        key_scores = [r['score'] for r in keyword]
        sem_max, sem_min = max(sem_scores), min(sem_scores)
        key_max, key_min = max(key_scores), min(key_scores)

        # Normalize to 0-1
        for r in semantic:
            r['normalized_score'] = (r['score'] - sem_min) / (sem_max - sem_min) if sem_max > sem_min else 0
        for r in keyword:
            r['normalized_score'] = (r['score'] - key_min) / (key_max - key_min) if key_max > key_min else 0

        # Combine
        combined = {}
        for r in semantic:
            combined[r['id']] = {**r, 'combined_score': alpha * r['normalized_score']}
        for r in keyword:
            if r['id'] in combined:
                combined[r['id']]['combined_score'] += (1 - alpha) * r['normalized_score']
            else:
                combined[r['id']] = {**r, 'combined_score': (1 - alpha) * r['normalized_score']}

        # Sort by combined score
        results = list(combined.values())
        results.sort(key=lambda x: x['combined_score'], reverse=True)
        return results

4. Metadata Filtering

# retrieval/filtering.py
from typing import Dict, List, Any

class MetadataFilter:
    """Apply metadata filters to search results"""

    @staticmethod
    def apply_filters(
        results: List[Dict],
        filters: Dict[str, Any]
    ) -> List[Dict]:
        """Filter results based on metadata"""
        filtered = results
        for key, value in filters.items():
            if isinstance(value, list):
                # Filter: value in list
                filtered = [r for r in filtered if r['metadata'].get(key) in value]
            elif isinstance(value, dict):
                # Range filter
                if '$gte' in value:
                    filtered = [r for r in filtered if r['metadata'].get(key, 0) >= value['$gte']]
                if '$lte' in value:
                    filtered = [r for r in filtered if r['metadata'].get(key, float('inf')) <= value['$lte']]
            else:
                # Exact match
                filtered = [r for r in filtered if r['metadata'].get(key) == value]
        return filtered

# Usage
filtered_results = MetadataFilter.apply_filters(results, {
    'category': ['technology', 'ai'],
    'created_at': {'$gte': '2025-01-01'},
    'status': 'published'
})

Generation and Synthesis

Prompt Engineering for RAG

1. Basic RAG Prompt

def build_basic_rag_prompt(query: str, context: List[Dict]) -> str:
    """Build basic RAG prompt"""
    context_str = "\n---\n".join([
        f"Document: {chunk['metadata']['title']}\n{chunk['text']}"
        for chunk in context
    ])

    return f"""You are a helpful assistant. Answer the following question using the provided context.

Context:
{context_str}

Question: {query}

Instructions:
1. Base your answer only on the provided context
2.
If the context doesn't contain the answer, say "I don't have enough information to answer this"
3. Cite sources using [Document X] notation
4. Be accurate and concise

Answer:"""

2. Advanced Multi-Source Prompt

def build_advanced_rag_prompt(
    query: str,
    context: List[Dict],
    conversation_history: List[Dict] = None
) -> str:
    """Build advanced RAG prompt with conversation history"""
    context_by_source = {}
    for chunk in context:
        source = chunk['metadata']['source']
        if source not in context_by_source:
            context_by_source[source] = []
        context_by_source[source].append(chunk)

    context_str = "\n\n".join([
        f"## {source}\n" + "\n".join([c['text'] for c in chunks])
        for source, chunks in context_by_source.items()
    ])

    history_str = ""
    if conversation_history:
        history_str = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-5:]  # Last 5 messages
        ])

    return f"""You are an expert knowledge assistant. Help answer questions by synthesizing information from multiple sources.

### Conversation History
{history_str}

### Available Sources
{context_str}

### Current Question
{query}

### Instructions
1. Synthesize information from multiple sources when relevant
2. Acknowledge when sources disagree or conflict
3. Prioritize recent and authoritative sources
4. Use [Source: Document Title] citations
5. If information is missing, explicitly state what's unknown
6. Provide a clear, well-structured answer

### Answer Format
- Start with a direct answer
- Follow with supporting details
- Include source citations
- End with limitations (if any)

Answer:"""

3. Chain-of-Thought Prompting

def build_cot_rag_prompt(query: str, context: List[Dict]) -> str:
    """Build chain-of-thought RAG prompt"""
    return f"""Answer the following question using the provided context. Show your reasoning.

Context:
{' '.join([c['text'] for c in context[:3]])}

Question: {query}

Think step by step:
1. What is the question asking?
2. What relevant information is in the context?
3.
What can I conclude from this information?
4. What information is missing?

Answer:"""

Response Post-Processing

# generation/post_process.py
from typing import Dict, List
import re

class ResponsePostProcessor:
    """Post-process generated responses"""

    @staticmethod
    def add_citations(response: str, context: List[Dict]) -> Dict:
        """Add citation links to response"""
        # Find [Source X] references
        citations = re.findall(r'\[Source (\d+)\]', response)

        # Create citation mapping
        citation_map = {}
        for cit in set(citations):
            idx = int(cit) - 1
            if idx < len(context):
                citation_map[cit] = {
                    'title': context[idx]['metadata']['title'],
                    'source': context[idx]['metadata']['source'],
                    'url': context[idx]['metadata'].get('url', '#')
                }
        return {
            'response': response,
            'citations': citation_map
        }

    @staticmethod
    def extract_key_points(response: str) -> List[str]:
        """Extract key points from response"""
        prompt = f"""Extract the key points from the following response.

Response: {response}

Return one key point per line."""
        # Use LLM to extract
        # Implementation depends on your LLM setup
        return []

    @staticmethod
    def format_response(
        response: str,
        citations: List[Dict],
        sources: List[Dict]
    ) -> Dict:
        """Format final response for API"""
        return {
            'answer': response,
            'citations': [
                {
                    'index': i + 1,
                    'title': c['title'],
                    'source': c['source'],
                    'url': c.get('url')
                }
                for i, c in enumerate(citations)
            ],
            'sources': sources,
            'answer_length': len(response),
            'citation_count': len(citations)
        }

Evaluation and Quality Assurance

Metrics for RAG Systems

1.
Retrieval Metrics

# evaluation/retrieval.py
from typing import List, Dict
import numpy as np

class RetrievalEvaluator:
    """Evaluate retrieval quality"""

    @staticmethod
    def precision_at_k(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Precision@K"""
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        relevant_retrieved = set(retrieved_ids) & set(relevant)
        return len(relevant_retrieved) / k

    @staticmethod
    def recall_at_k(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Recall@K"""
        if not relevant:  # Guard against division by zero
            return 0.0
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        relevant_retrieved = set(retrieved_ids) & set(relevant)
        return len(relevant_retrieved) / len(relevant)

    @staticmethod
    def mrr(retrieved: List[Dict], relevant: List[str]) -> float:
        """Calculate Mean Reciprocal Rank"""
        retrieved_ids = [r['id'] for r in retrieved]
        for i, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in relevant:
                return 1 / i
        return 0.0

    @staticmethod
    def ndcg(retrieved: List[Dict], relevant: List[str], k: int) -> float:
        """Calculate Normalized DCG"""
        retrieved_ids = [r['id'] for r in retrieved[:k]]
        dcg = 0.0
        for i, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in relevant:
                dcg += 1 / np.log2(i + 1)
        # Ideal DCG: all relevant documents ranked first
        idcg = sum(1 / np.log2(i + 1) for i in range(1, min(len(relevant), k) + 1))
        return dcg / idcg if idcg > 0 else 0.0

2. Generation Metrics

# evaluation/generation.py
import json
from typing import List, Dict

import openai

class GenerationEvaluator:
    """Evaluate generation quality using LLM-as-a-judge"""

    def __init__(self, model: str = "gpt-4o"):
        self.client = openai.Client()
        self.model = model

    def evaluate_relevance(self, query: str, response: str, context: List[Dict]) -> Dict:
        """Evaluate if response is relevant to query"""
        prompt = f"""Rate the relevance of the following response to the query.

Query: {query}

Response: {response}

Available Context: {' '.join([c['text'][:200] for c in context[:3]])}

Rate on:
1. Relevance (0-100): Does it answer the question?
2. Accuracy (0-100): Is it factually correct based on context?
3.
Completeness (0-100): Does it provide sufficient detail?
4. Citation Quality (0-100): Are citations appropriate?

Return as JSON: {{"relevance": X, "accuracy": Y, "completeness": Z, "citation_quality": W}}"""

        # Name the API result `result` so it doesn't shadow the
        # `response` argument we are evaluating
        result = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        return json.loads(result.choices[0].message.content)

    def evaluate_hallucination(self, response: str, context: List[Dict]) -> Dict:
        """Check for hallucinations"""
        context_text = " ".join([c['text'] for c in context])
        prompt = f"""Analyze the following response for hallucinations (information not supported by context).

Response: {response}

Context: {context_text}

Identify:
1. Factual claims not in context
2. Invented sources or citations
3. Contradictions to context
4. Speculative statements presented as fact

Return as JSON with hallucinations list and severity (low/medium/high)."""

        result = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        return json.loads(result.choices[0].message.content)

3. End-to-End Evaluation

# evaluation/e2e.py
from typing import List, Dict

from evaluation.retrieval import RetrievalEvaluator

class RAGEvaluator:
    """End-to-end RAG system evaluation"""

    def __init__(self, rag_system, evaluator):
        self.rag_system = rag_system
        self.evaluator = evaluator

    async def evaluate_on_dataset(
        self,
        test_questions: List[Dict],
        metrics: List[str] = None
    ) -> Dict:
        """
        Evaluate RAG system on test dataset

        Args:
            test_questions: List of {query, relevant_docs, expected_answer}
            metrics: Metrics to compute

        Returns:
            Evaluation results
        """
        if metrics is None:
            metrics = ['precision', 'recall', 'mrr', 'ndcg', 'relevance', 'accuracy']

        results = {'retrieval': {}, 'generation': {}, 'overall': {}}

        for question in test_questions:
            query = question['query']
            relevant = question['relevant_docs']
            # 1.
Retrieve retrieved = await self.rag_system.retrieve(query, top_k=10) # 2. Evaluate retrieval if 'precision' in metrics: prec = RetrievalEvaluator.precision_at_k(retrieved, relevant, 5) results['retrieval']['precision'] = results['retrieval'].get('precision', []) + [prec] if 'recall' in metrics: rec = RetrievalEvaluator.recall_at_k(retrieved, relevant, 10) results['retrieval']['recall'] = results['retrieval'].get('recall', []) + [rec] if 'mrr' in metrics: mrr = RetrievalEvaluator.mrr(retrieved, relevant) results['retrieval']['mrr'] = results['retrieval'].get('mrr', []) + [mrr] # 3. Generate and evaluate response = await self.rag_system.generate(query, retrieved[:5]) if 'relevance' in metrics: gen_eval = self.evaluator.evaluate_relevance(query, response, retrieved[:5]) for metric, value in gen_eval.items(): results['generation'][metric] = results['generation'].get(metric, []) + [value] # Compute averages for category in ['retrieval', 'generation']: for metric, values in results[category].items(): results['overall'][f'{category}_{metric}'] = sum(values) / len(values) return results # Usage evaluator = RAGEvaluator(rag_system, generation_evaluator) results = await evaluator.evaluate_on_dataset(test_questions) print(f"Overall Precision@5: {results['overall']['retrieval_precision']:.2f}") print(f"Overall Relevance: {results['overall']['generation_relevance']:.2f}") Scaling Considerations Horizontal Scaling # scaling/distributed.py from typing import List, Dict import asyncio import numpy as np class DistributedRAGSystem: """Distributed RAG system for horizontal scaling""" def __init__(self, config: Dict): # Multiple embedding models for parallel processing self.embedders = [ EmbeddingGenerator(config['embeddings']) for _ in range(config['embedding_workers']) ] # Sharded vector stores self.vector_stores = [ VectorStore(config['vector_db'], shard_id=i) for i in range(config['num_shards']) ] async def embed_batch_parallel(self, texts: List[str]) -> np.ndarray: """Embed 
texts in parallel"""
        # Ceiling division so the final remainder batch is not dropped
        # when len(texts) doesn't divide evenly across workers
        batch_size = -(-len(texts) // len(self.embedders))
        batches = [
            texts[i:i + batch_size]
            for i in range(0, len(texts), batch_size)
        ]

        # Parallel embedding
        results = await asyncio.gather(*[
            self.embedders[i].generate_batch(batch)
            for i, batch in enumerate(batches)
        ])

        # Combine results
        embeddings = np.concatenate(results)
        return embeddings

    async def retrieve_distributed(
        self,
        query_embedding: np.ndarray,
        top_k: int = 10
    ) -> List[Dict]:
        """Retrieve from all shards in parallel"""
        # Query all shards in parallel, over-fetching to allow deduplication
        shard_results = await asyncio.gather(*[
            shard.search(query_embedding, top_k=top_k * 2)
            for shard in self.vector_stores
        ])

        # Combine and deduplicate
        all_results = []
        seen = set()
        for results in shard_results:
            for result in results:
                if result['id'] not in seen:
                    seen.add(result['id'])
                    all_results.append(result)

        # Sort by score and return top_k
        all_results.sort(key=lambda x: x['score'], reverse=True)
        return all_results[:top_k]

Caching Strategy

# scaling/cache.py
from typing import Dict, Optional
import hashlib
import json
import time

class RAGCache:
    """Cache for RAG queries and responses"""

    def __init__(self, ttl: int = 3600):
        self.cache = {}  # In production, use Redis
        self.ttl = ttl

    def _generate_key(self, query: str, filters: Dict = None) -> str:
        """Generate cache key"""
        key_data = {'query': query, 'filters': filters}
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, query: str, filters: Dict = None) -> Optional[Dict]:
        """Get cached response"""
        key = self._generate_key(query, filters)
        if key in self.cache:
            cached = self.cache[key]
            if time.time() - cached['timestamp'] < self.ttl:
                return cached['response']
            del self.cache[key]  # Expired
        return None

    def set(self, query: str, response: Dict, filters: Dict = None):
        """Cache response"""
        key = self._generate_key(query, filters)
        self.cache[key] = {'response': response, 'timestamp': time.time()}

    def invalidate_document(self,
document_id: str):
        """Invalidate cache entries for a document"""
        # In production, implement smarter invalidation (e.g., tag entries
        # with the document IDs they were built from)
        self.cache.clear()

Production Deployment

Deployment Architecture

# docker-compose.yml for production RAG system
version: '3.8'
services:
  # API Gateway
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - postgres
      - redis

  # PostgreSQL with pgvector
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=rag
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  # Redis for caching
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Worker for background tasks
  worker:
    build: ./worker
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

  # Monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  redis_data:
  grafana_data:

API Implementation

# api/main.py
import time
from typing import List, Dict, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from rag_system import RAGSystem
from cache import RAGCache

app = FastAPI(title="Enterprise RAG API")

# Add CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize RAG system
rag_system = RAGSystem(config={
    'database_url': 'postgresql://...',
    'openai_api_key': '...',
    'cache_ttl': 3600
})
cache = RAGCache(ttl=3600)

class QueryRequest(BaseModel):
    query: str
    top_k: int = 10
    filters: Optional[Dict] = None
    conversation_id: Optional[str] = None

class QueryResponse(BaseModel):
    answer:
str citations: List[Dict] sources: List[Dict] retrieval_time: float generation_time: float total_time: float @app.post("/api/v1/query", response_model=QueryResponse) async def query(request: QueryRequest): """Query the RAG system""" start_time = time.time() # Check cache cached_response = cache.get(request.query, request.filters) if cached_response: return cached_response try: # 1. Retrieve retrieval_start = time.time() context = await rag_system.retrieve( request.query, top_k=request.top_k, filters=request.filters ) retrieval_time = time.time() - retrieval_start # 2. Generate generation_start = time.time() response = await rag_system.generate( request.query, context, conversation_id=request.conversation_id ) generation_time = time.time() - generation_start total_time = time.time() - start_time # Format response result = QueryResponse( answer=response['answer'], citations=response['citations'], sources=response['sources'], retrieval_time=retrieval_time, generation_time=generation_time, total_time=total_time ) # Cache response cache.set(request.query, result.dict(), request.filters) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v1/ingest") async def ingest_document(document: Dict): """Ingest a document into the knowledge base""" try: chunk_ids = await rag_system.ingest(document) return {"status": "success", "chunk_ids": chunk_ids} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v1/health") async def health(): """Health check endpoint""" return {"status": "healthy", "timestamp": time.time()} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) Monitoring and Observability Metrics Collection # monitoring/metrics.py from prometheus_client import Counter, Histogram, Gauge import time # Define metrics query_counter = Counter( 'rag_queries_total', 'Total number of RAG queries', ['status'] ) query_duration = Histogram( 
'rag_query_duration_seconds',
    'RAG query duration',
    ['stage']  # retrieval, generation, total
)
cache_hits = Counter('rag_cache_hits_total', 'Total cache hits')
cache_misses = Counter('rag_cache_misses_total', 'Total cache misses')
embedding_queue_size = Gauge('rag_embedding_queue_size', 'Current embedding queue size')

class RAGMetrics:
    """Collect and report RAG metrics"""

    @staticmethod
    def record_query(status: str):
        query_counter.labels(status=status).inc()

    @staticmethod
    def record_duration(stage: str, duration: float):
        query_duration.labels(stage=stage).observe(duration)

    @staticmethod
    def record_cache_hit():
        cache_hits.inc()

    @staticmethod
    def record_cache_miss():
        cache_misses.inc()

Logging

# monitoring/logging.py
import logging
import json
from datetime import datetime
from typing import Dict, List

class RAGLogger:
    """Structured logging for RAG systems"""

    def __init__(self):
        self.logger = logging.getLogger('rag_system')

    def log_query(
        self,
        query: str,
        context: List[Dict],
        response: Dict,
        duration: float
    ):
        """Log query with full context"""
        log_entry = {
            'event': 'query',
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'context_count': len(context),
            'context_sources': [c['metadata']['source'] for c in context],
            'response_length': len(response.get('answer', '')),
            'citation_count': len(response.get('citations', [])),
            'duration_ms': duration * 1000,
            'retrieval_time_ms': response.get('retrieval_time', 0) * 1000,
            'generation_time_ms': response.get('generation_time', 0) * 1000
        }
        self.logger.info(json.dumps(log_entry))

    def log_ingestion(self, document: Dict, chunk_ids: List[str]):
        """Log a successful document ingestion"""
        self.logger.info(json.dumps({
            'event': 'ingestion',
            'timestamp': datetime.now().isoformat(),
            'document_id': document.get('id'),
            'chunk_count': len(chunk_ids)
        }))

    def log_error(self, error: Exception, context: Dict):
        """Log error with context"""
        log_entry = {
            'event': 'error',
            'timestamp': datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'context': context
        }
        self.logger.error(json.dumps(log_entry))

Real-World Implementation

Complete Enterprise RAG System

# rag_system.py
import time
from typing import List, Dict, Optional

from ingestion.pipeline import DocumentIngestionPipeline
from
retrieval.engine import RetrievalEngine from generation.generator import ResponseGenerator from cache import RAGCache from monitoring.metrics import RAGMetrics from monitoring.logging import RAGLogger class RAGSystem: """Complete RAG system for enterprise""" def __init__(self, config: Dict): # Initialize components self.ingestion_pipeline = DocumentIngestionPipeline(config['ingestion']) self.retrieval_engine = RetrievalEngine( config['vector_store'], config['embeddings'], config['retrieval'] ) self.generator = ResponseGenerator(config['generation']) self.cache = RAGCache(ttl=config.get('cache_ttl', 3600)) self.metrics = RAGMetrics() self.logger = RAGLogger() async def ingest(self, document: Dict) -> List[str]: """Ingest document into knowledge base""" try: chunk_ids = await self.ingestion_pipeline.ingest_document(document) self.logger.log_ingestion(document, chunk_ids) return chunk_ids except Exception as e: self.logger.log_error(e, {'document': document}) raise async def query( self, query: str, top_k: int = 10, filters: Optional[Dict] = None, use_cache: bool = True ) -> Dict: """Query the RAG system""" start_time = time.time() # Check cache if use_cache: cached = self.cache.get(query, filters) if cached: self.metrics.record_cache_hit() return cached self.metrics.record_cache_miss() try: # Retrieve retrieval_start = time.time() context = await self.retrieval_engine.retrieve(query, top_k, filters) retrieval_time = time.time() - retrieval_start # Generate generation_start = time.time() response = await self.generator.generate_response(query, context) generation_time = time.time() - generation_start # Add timing info response['retrieval_time'] = retrieval_time response['generation_time'] = generation_time response['total_time'] = time.time() - start_time # Log query self.logger.log_query(query, context, response, response['total_time']) # Record metrics self.metrics.record_query('success') self.metrics.record_duration('total', response['total_time']) 
self.metrics.record_duration('retrieval', retrieval_time)
            self.metrics.record_duration('generation', generation_time)

            # Cache response
            if use_cache:
                self.cache.set(query, response, filters)
            return response
        except Exception as e:
            self.metrics.record_query('error')
            self.logger.log_error(e, {'query': query, 'filters': filters})
            raise

    async def bulk_ingest(self, documents: List[Dict]) -> Dict:
        """Ingest multiple documents"""
        results = {'successful': 0, 'failed': 0, 'errors': []}
        for document in documents:
            try:
                await self.ingest(document)
                results['successful'] += 1
            except Exception as e:
                results['failed'] += 1
                results['errors'].append({
                    'document_id': document.get('id'),
                    'error': str(e)
                })
        return results

    async def delete_document(self, document_id: str):
        """Delete document from knowledge base"""
        # Implementation depends on vector store
        pass

    async def update_document(self, document_id: str, updated_document: Dict):
        """Update document in knowledge base"""
        # Delete old, insert new
        await self.delete_document(document_id)
        await self.ingest(updated_document)

Key Takeaways

Building production RAG systems requires careful consideration of:

1. Vector Database Selection - Choose based on your specific needs
2. Embedding Strategy - Balance cost, quality, and performance
3. Chunking Technique - Use semantic chunking for best results
4. Retrieval Optimization - Hybrid search with reranking
5. Quality Evaluation - Continuous monitoring and improvement
6. Scaling Strategy - Plan for horizontal scaling from day one
7. Monitoring & Observability - Essential for production systems

The key to success is iterative improvement:

1. Start with a simple baseline
2. Measure everything
3. Optimize based on data
4. Scale as needed

RAG systems are transforming how enterprises access and utilize their knowledge. With the right architecture and implementation, you can build powerful, accurate, and scalable knowledge search systems that provide real business value.
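To make the "start with a simple baseline" advice concrete, here is a minimal, self-contained sketch of the retrieve-then-prompt loop. The `embed` function is a toy bag-of-words stand-in for a real embedding model, and the corpus, document fields, and scoring are illustrative assumptions rather than the production components described above:

```python
import math
from collections import Counter
from typing import Dict, List

def embed(text: str) -> Counter:
    # Toy "embedding": bag-of-words term counts (stand-in for a real model)
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    # Cosine similarity between two sparse term-count vectors
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(query: str, corpus: List[Dict], top_k: int = 2) -> List[Dict]:
    # Rank every document by similarity to the query and keep the top_k
    q = embed(query)
    scored = sorted(corpus, key=lambda d: cosine(q, embed(d['text'])), reverse=True)
    return scored[:top_k]

def build_prompt(query: str, context: List[Dict]) -> str:
    # Inject retrieved chunks as numbered sources, mirroring the RAG prompts above
    context_str = "\n".join(f"[Source {i + 1}] {c['text']}" for i, c in enumerate(context))
    return f"Context:\n{context_str}\n\nQuestion: {query}\nAnswer:"

corpus = [
    {'id': 'doc1', 'text': 'pgvector adds vector similarity search to PostgreSQL'},
    {'id': 'doc2', 'text': 'Redis is an in-memory cache often used for RAG responses'},
    {'id': 'doc3', 'text': 'Chunk overlap preserves sentence context between chunks'},
]

hits = retrieve('vector search in PostgreSQL', corpus, top_k=1)
print(hits[0]['id'])  # most similar document
print(build_prompt('vector search in PostgreSQL', hits))
```

Swapping `embed` for a real embedding model and the list scan for a vector database query turns this baseline into the full pipeline; the measurement and optimization steps above then apply unchanged.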
Sources: Vectara: Enterprise RAG Predictions (2025) · RAG Definitive Guide 2025: 70-90% Hallucination Reduction · Morphik: RAG at Scale – 3-5x Faster Information Retrieval (2025)

Frequently Asked Questions

What is RAG and how does it work? RAG (Retrieval-Augmented Generation) combines a vector search retrieval step with an LLM generation step. When a user submits a query, the system encodes it into a vector embedding, searches a vector database for the most semantically similar document chunks, and injects those chunks as context into the LLM prompt. The LLM then generates an answer grounded in the retrieved documents rather than relying solely on its training data.

How does RAG reduce AI hallucinations? Hallucinations occur when an LLM fabricates information not present in its training data. RAG mitigates this by providing the model with authoritative source documents as in-context evidence for every response. Studies show RAG reduces hallucination rates by 70-90% compared to standalone LLMs. Including source citations in the response allows end users to verify claims against original documents.

What chunk size should I use when indexing documents for RAG? Optimal chunk size depends on the document type and retrieval strategy. A common starting point is 512-1024 tokens per chunk with a 10-15% overlap between consecutive chunks to preserve sentence context. Smaller chunks (128-256 tokens) improve retrieval precision for Q&A tasks, while larger chunks (1024-2048 tokens) work better for summarization. Always evaluate chunk size empirically using retrieval hit rate metrics.

Which vector database is best for enterprise RAG? pgvector (PostgreSQL extension) is the top choice for organizations already running PostgreSQL, as it eliminates the need for a separate vector database and supports ACID transactions. Pinecone and Weaviate excel for large-scale, dedicated vector workloads requiring billions of vectors.
For self-hosted deployments, Qdrant and Chroma offer strong performance with simple operational footprints.

How do you evaluate RAG system quality? The three core RAG metrics are context recall (did retrieval find the relevant documents?), faithfulness (does the answer accurately reflect the retrieved context?), and answer relevancy (does the answer address the user's question?). Frameworks like RAGAS and TruLens provide automated evaluation pipelines for these metrics. Establishing a golden QA dataset specific to your domain is essential for ongoing quality monitoring.

How much does a production RAG system cost to run? Costs divide into three buckets: embedding generation (typically $0.0001 per 1000 tokens with text-embedding-3-small), vector database hosting ($100-500/month for most mid-size deployments), and LLM inference ($0.01-0.06 per 1000 output tokens for GPT-4o). For an enterprise system processing 10,000 queries per day, expect total costs of $500-3000/month depending on document volume and model choice.

Need Help Building a RAG System? Our AI Agent Teams have built production RAG systems for 200+ enterprise clients. Starting at $22/hr.

Related Articles: Building Multi-Agent Systems with LangChain · MongoDB to PostgreSQL + pgvector: Our Migration Journey · Building Production-Ready AI Agents: A Practical Guide · AI-First Development: Build Software 10-20X Faster

Published: January 2026 | Author: Groovy Web Team | Category: AI Development
Written by Groovy Web Team

Groovy Web is an AI-First development agency specializing in building production-grade AI applications, multi-agent systems, and enterprise solutions. We've helped 200+ clients achieve 10-20X development velocity using AI Agent Teams.