Software Development
MongoDB to PostgreSQL + pgvector: Our Migration Journey
Groovy Web Team | January 30, 2026 | 23 min read

A deep dive into migrating 2.3M documents from MongoDB to PostgreSQL + pgvector. We cover schema design, ETL architecture, and a zero-downtime cutover strategy, and we achieved a 10x query latency improvement alongside an 80% infrastructure cost reduction.

Why We Migrated

At Groovy Web, we built an enterprise knowledge management platform, part of a broader shift to AI-first engineering, on MongoDB. It worked well for document storage and flexible schemas. But as we added AI-powered semantic search and RAG (Retrieval-Augmented Generation) capabilities, MongoDB's limitations became apparent. The tipping point? We needed to support:

- Vector similarity search for semantic queries
- Complex joins across related entities
- ACID transactions for data consistency
- Advanced analytics with window functions
- Full-text search combined with vector search

After extensive evaluation, we chose PostgreSQL with the pgvector extension. This decision transformed our application's capabilities and performance. In this guide, I'll share our complete migration journey, including the mistakes we made, the lessons we learned, and a reusable migration framework.
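A quick note on the metric used throughout this post: pgvector's `<=>` operator returns cosine *distance*, so the similarity we report in queries is `1 - distance`. A minimal pure-Python sketch of what that number means (illustrative only, not part of our pipeline):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """What `1 - (embedding <=> query)` computes in pgvector."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing the same way score 1.0; orthogonal vectors score 0.0.
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 5.0]))  # 0.0
```

Because similarity depends only on direction, not magnitude, it suits text embeddings, which encode meaning in orientation.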
The Migration Decision

Evaluation Criteria

We evaluated database options against these requirements:

| Requirement | MongoDB | PostgreSQL | MySQL | Pinecone |
| --- | --- | --- | --- | --- |
| Vector similarity | ❌ Requires external | ✅ pgvector | ⚠️ Limited | ✅ Native |
| ACID transactions | ⚠️ Document-level | ✅ Full support | ✅ Full support | ❌ No |
| Complex joins | ⚠️ $lookup (slow) | ✅ Optimized | ✅ Optimized | ❌ No |
| Schema flexibility | ✅ Excellent | ⚠️ Migration needed | ⚠️ Migration needed | N/A |
| Full-text search | ✅ Built-in | ✅ tsvector | ⚠️ Basic | ✅ Hybrid |
| Window functions | ❌ No | ✅ Full support | ✅ Full support | ❌ No |
| Materialized views | ❌ No | ✅ Native | ⚠️ Limited | ❌ No |
| Cost | $$$$ (Atlas) | $$ (self-hosted) | $$ (self-hosted) | $$$$ (managed) |

Why PostgreSQL + pgvector?

1. Unified Data Store

Previously, we used:
- MongoDB for documents
- Pinecone for vectors (additional cost and complexity)
- Redis for caching

With PostgreSQL + pgvector:
- Documents → JSONB columns
- Vectors → pgvector columns
- Caching → materialized views
- Single source of truth

2. Cost Savings

Before:
- MongoDB Atlas (M50 cluster): $2,400/month
- Pinecone (1M vectors): $1,200/month
- Redis Cloud (Large): $600/month
- Total: $4,200/month

After:
- PostgreSQL (managed, 8 vCPU, 32 GB RAM): $800/month
- Savings: $3,400/month (81% reduction)

3. Query Performance

Vector search with metadata filtering:

MongoDB (with external vector DB):

// Required two queries
const results = await pinecone.query(vector, { topK: 100 });
const ids = results.map(r => r.id);
const documents = await mongo.collection('docs').find({
  _id: { $in: ids },
  status: 'published' // Additional filter
}).toArray();
// Total: ~450ms for 100 results

PostgreSQL + pgvector:

-- Single query with vector + metadata filter
SELECT id, title, content,
       1 - (embedding <=> $1) AS similarity
FROM documents
WHERE status = 'published'
ORDER BY embedding <=> $1
LIMIT 100;
-- Total: ~45ms for 100 results (10x faster)

4.
ACID Transactions

-- Atomic update across multiple tables
BEGIN;
UPDATE documents SET content = $1 WHERE id = $2;
UPDATE document_stats SET word_count = $3 WHERE doc_id = $2;
INSERT INTO document_revisions (doc_id, content, created_at)
VALUES ($2, $4, NOW());
COMMIT;

Migration at a Glance

| Metric | Before (MongoDB) | After (PostgreSQL) |
| --- | --- | --- |
| Database instances | 3 (Mongo, Pinecone, Redis) | 1 (PostgreSQL) |
| Monthly cost | $4,200 | $800 |
| Vector query latency | 450ms | 45ms |
| Complex join queries | Not possible | 25ms |
| Data consistency | Eventual | Strong |
| Backup/restore | 4 hours | 30 minutes |
| Team familiarity | High | Medium |

Pre-Migration Planning

Step 1: Schema Analysis

First, we analyzed our MongoDB schemas:

// MongoDB collections analyzed
db.listCollections().toArray()

// Example: documents collection
{
  _id: ObjectId("..."),
  title: "Building AI Systems",
  content: "Full text content...",
  metadata: {
    author: "John Doe",
    category: "Technology",
    tags: ["AI", "Machine Learning"],
    created_at: ISODate("2026-01-15"),
    updated_at: ISODate("2026-01-20")
  },
  embedding: [0.1, 0.2, ...], // 1536-dimensional vector
  status: "published",
  version: 3
}

// Indexes
db.documents.getIndexes()

Schema mapping strategy:

| MongoDB Type | PostgreSQL Type | Notes |
| --- | --- | --- |
| ObjectId | UUID | More readable, widely supported |
| String | VARCHAR(n) or TEXT | Use VARCHAR for indexed fields |
| Number | NUMERIC or INTEGER | Preserve precision |
| Date | TIMESTAMPTZ | Always use timezone-aware |
| Array | TEXT[] or JSONB | Depends on array contents |
| Object | JSONB | Preserve flexible schemas |
| Vector (Array) | VECTOR(1536) | pgvector extension |

Step 2: Dependency Mapping

We mapped all application dependencies:

# scripts/analyze_dependencies.py
import pymongo
from bson import ObjectId  # needed for the isinstance check below
from collections import defaultdict

client = pymongo.MongoClient("mongodb://localhost:27017")
db = client["knowledge_base"]

# Find all collections
collections = db.list_collection_names()

# Analyze relationships
relationships = defaultdict(set)
for collection in collections:
    # Scan sample documents
    for doc in db[collection].find().limit(1000):
        for key, value in doc.items():
            # Look for references (ObjectId fields ending with _id)
            if key.endswith('_id') and isinstance(value, ObjectId):
                referenced_collection = key.replace('_id', 's')
                relationships[collection].add(referenced_collection)

# Output dependency graph
for source, targets in relationships.items():
    print(f"{source} -> {', '.join(targets)}")

Output:
documents -> users, categories, tags
comments -> documents, users
revisions -> documents
versions -> documents

Step 3: Performance Baseline

Establish baseline metrics before migration:

# scripts/benchmark_mongodb.py
import time
import pymongo

def benchmark_query(db, query_name, query_func, iterations=100):
    times = []
    for _ in range(iterations):
        start = time.time()
        result = query_func()
        times.append((time.time() - start) * 1000)  # ms
    return {
        'query': query_name,
        'avg_ms': sum(times) / len(times),
        'p50_ms': sorted(times)[len(times) // 2],
        'p95_ms': sorted(times)[int(len(times) * 0.95)],
        'p99_ms': sorted(times)[int(len(times) * 0.99)]
    }

# Define benchmark queries (doc_id is any known document id)
queries = [
    ('simple_find', lambda: db.documents.find_one({'_id': doc_id})),
    ('complex_aggregate', lambda: list(db.documents.aggregate([
        {'$match': {'status': 'published'}},
        {'$lookup': {'from': 'users', 'localField': 'author_id',
                     'foreignField': '_id', 'as': 'author'}},
        {'$limit': 100}
    ]))),
    ('text_search', lambda: list(db.documents.find(
        {'$text': {'$search': 'machine learning'}}).limit(50))),
]

for name, func in queries:
    metrics = benchmark_query(db, name, func)
    print(f"{name}: {metrics['p95_ms']:.2f}ms (p95)")

Results saved for post-migration comparison.
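The percentile arithmetic in `benchmark_query` is easy to get subtly wrong at the boundaries, so it is worth isolating in a helper. A small sketch using the same nearest-rank convention as the script above (the `percentile` function is ours, not part of the original tooling):

```python
def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile over latency samples (ms), using the
    same indexing convention as benchmark_query above."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # Clamp so pct=100 maps to the last element instead of overflowing.
    idx = min(int(len(ordered) * pct / 100), len(ordered) - 1)
    return ordered[idx]

times = [float(t) for t in range(1, 101)]  # 1..100 ms
print(percentile(times, 50))   # 51.0
print(percentile(times, 95))   # 96.0
print(percentile(times, 99))   # 100.0
```

The clamp matters: with exactly 100 samples, `int(len * 0.99)` is 99 (the last valid index), but any request above that would raise an `IndexError` in the unclamped version.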
Schema Design Strategy

Document Collection Schema

MongoDB:

{
  _id: ObjectId("65b8a3d2e4b0f3a9c8d7e6f5"),
  title: "Building AI Systems with LangChain",
  content: "Full article content...",
  metadata: {
    author_id: ObjectId("65b8a3d2e4b0f3a9c8d7e6f4"),
    category: "Technology",
    tags: ["AI", "Machine Learning", "LangChain"],
    published_at: ISODate("2026-01-15T10:30:00Z"),
    word_count: 2500
  },
  embedding: [0.123, 0.456, ...], // 1536 dimensions
  status: "published",
  version: 3,
  created_at: ISODate("2026-01-10T08:00:00Z"),
  updated_at: ISODate("2026-01-20T14:30:00Z")
}

PostgreSQL:

-- Status enum referenced by the documents table (extend values as needed)
CREATE TYPE document_status AS ENUM ('draft', 'published');

-- Main documents table
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title VARCHAR(500) NOT NULL,
    content TEXT NOT NULL,
    author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    category_id INTEGER REFERENCES categories(id) ON DELETE SET NULL,
    status document_status NOT NULL DEFAULT 'draft',
    version INTEGER NOT NULL DEFAULT 1,
    word_count INTEGER,
    published_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Full-text search index
CREATE INDEX idx_documents_content_gin ON documents
    USING gin(to_tsvector('english', content));

-- Vector embedding column
ALTER TABLE documents ADD COLUMN embedding vector(1536);
CREATE INDEX idx_documents_embedding_ivfflat ON documents
    USING ivfflat(embedding vector_cosine_ops) WITH (lists = 100);

-- JSONB for flexible metadata
ALTER TABLE documents ADD COLUMN metadata JSONB DEFAULT '{}';
CREATE INDEX idx_documents_metadata_gin ON documents USING gin(metadata);

-- Composite index for common queries
CREATE INDEX idx_documents_status_published
    ON documents(status, published_at DESC)
    WHERE status = 'published';

-- Tag many-to-many relationship
CREATE TABLE document_tags (
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
    PRIMARY KEY (document_id, tag_id)
);

-- Document revisions (history tracking)
CREATE TABLE document_revisions (
    id SERIAL PRIMARY KEY,
    document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    version INTEGER NOT NULL,
    title VARCHAR(500),
    content TEXT,
    metadata JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by_id UUID REFERENCES users(id),
    UNIQUE (document_id, version)
);

-- Document statistics (materialized view)
CREATE MATERIALIZED VIEW document_statistics AS
SELECT d.id, d.title, d.status, d.word_count,
       COUNT(DISTINCT c.id) AS comment_count,
       COUNT(DISTINCT v.id) AS view_count,
       AVG(r.rating) AS avg_rating,
       MAX(d.updated_at) AS last_updated
FROM documents d
LEFT JOIN comments c ON c.document_id = d.id
LEFT JOIN views v ON v.document_id = d.id
LEFT JOIN ratings r ON r.document_id = d.id
GROUP BY d.id, d.title, d.status, d.word_count, d.updated_at;

CREATE UNIQUE INDEX ON document_statistics(id);

-- Refresh strategy (cron job, or attach this function with CREATE TRIGGER)
CREATE OR REPLACE FUNCTION refresh_document_statistics() RETURNS TRIGGER AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY document_statistics;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Key Design Decisions

1. UUID vs ObjectId

-- We chose UUID over serial/auto-increment
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid() -- UUID v4
    -- or, time-ordered (via an extension, or uuidv7() on PostgreSQL 18+):
    -- id UUID PRIMARY KEY DEFAULT uuid_generate_v7() -- UUID v7
);

Why UUID v7?
- Time-ordered like MongoDB ObjectId
- Globally unique across databases
- Better indexing performance than random UUID v4
- No exposure of record counts

2. JSONB for Flexible Metadata

-- Store flexible metadata in JSONB
UPDATE documents
SET metadata = jsonb_set(
    metadata,
    '{seo_keywords}',
    '["AI", "LangChain", "Multi-Agent Systems"]'::jsonb
)
WHERE id = '...';

-- Query JSONB fields
SELECT title, metadata->'seo_keywords' AS keywords
FROM documents
WHERE metadata @> '{"featured": true}';

-- Index on JSONB paths
CREATE INDEX idx_documents_metadata_featured
    ON documents ((metadata->>'featured'))
    WHERE metadata ? 'featured';

3.
Vector Index Strategy

-- IVFFlat index for approximate search (faster, less accurate)
CREATE INDEX idx_documents_embedding_ivfflat ON documents
    USING ivfflat(embedding vector_cosine_ops) WITH (lists = 100);
-- lists = sqrt(rows) for optimal performance

-- HNSW index for better accuracy (slower build, faster query)
CREATE INDEX idx_documents_embedding_hnsw ON documents
    USING hnsw(embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

Index selection guide:

| Index Type | Build Speed | Query Speed | Memory Usage | Best For |
| --- | --- | --- | --- | --- |
| IVFFlat | Fast | Fast | Low | Large datasets (>1M vectors) |
| HNSW | Slow | Very Fast | High | High accuracy requirements |
| Exact (none) | N/A | Slow | N/A | Small datasets (<100K vectors) |

The ETL Pipeline

Architecture

MongoDB (Source)
       |
       v
+-------------------------------------------+
|            Extraction Script              |
|  - Batch read with pagination             |
|  - Progress tracking                      |
|  - Error logging                          |
+-------------------------------------------+
       |
       v
+-------------------------------------------+
|           Transformation Layer            |
|  - Schema mapping                         |
|  - Type conversion                        |
|  - Data validation                        |
|  - Embedding generation                   |
+-------------------------------------------+
       |
       v
+-------------------------------------------+
|               Load Script                 |
|  - Batch insert with COPY                 |
|  - Parallel processing                    |
|  - Transaction management                 |
+-------------------------------------------+
       |
       v
PostgreSQL (Destination)

Implementation

1.
Extraction

# etl/extract.py
import pymongo
from typing import Any, Dict, Iterator, List
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class MongoExtractor:
    def __init__(self, connection_string: str, database: str):
        self.client = pymongo.MongoClient(connection_string)
        self.db = self.client[database]

    def extract_collection(
        self,
        collection_name: str,
        batch_size: int = 1000
    ) -> Iterator[List[Dict[str, Any]]]:
        """
        Extract documents from MongoDB in batches.
        Yields batches of documents to avoid memory issues.
        """
        collection = self.db[collection_name]
        total = collection.estimated_document_count()
        logger.info(f"Extracting {total:,} documents from {collection_name}")

        batch = []
        for i, doc in enumerate(collection.find(), 1):
            batch.append(doc)
            if len(batch) >= batch_size:
                logger.info(f"Extracted {i:,}/{total:,} documents")
                yield batch
                batch = []
        # Yield final batch
        if batch:
            yield batch

    def extract_with_relations(
        self,
        collection_name: str,
        relations: Dict[str, str],
        batch_size: int = 1000
    ) -> Iterator[List[Dict[str, Any]]]:
        """
        Extract documents with related data in a single query.
        relations: {'field_name': 'related_collection'}
        """
        collection = self.db[collection_name]

        # Build aggregation pipeline with $lookup
        pipeline = [{'$match': {}}]
        for field, related_collection in relations.items():
            pipeline.append({
                '$lookup': {
                    'from': related_collection,
                    'localField': field,
                    'foreignField': '_id',
                    'as': field.replace('_id', '') + '_data'
                }
            })

        batch = []
        for i, doc in enumerate(collection.aggregate(pipeline), 1):
            batch.append(doc)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

2.
Transformation

# etl/transform.py
import logging
import uuid
from typing import Any, Dict, List
from datetime import datetime
from bson import ObjectId  # needed for the isinstance check below
import numpy as np

logger = logging.getLogger(__name__)

class MongoToPostgresTransformer:
    """Transform MongoDB documents to PostgreSQL format"""

    def __init__(self, collection_mapping: Dict[str, Any]):
        self.collection_mapping = collection_mapping

    def transform_document(self, mongo_doc: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform a MongoDB document to a PostgreSQL row.
        Handles:
        - ObjectId -> UUID
        - ISODate -> TIMESTAMPTZ
        - Array -> ARRAY or JSONB
        - Nested object -> JSONB
        """
        transformed = {}
        for key, value in mongo_doc.items():
            # Skip _id (will be generated)
            if key == '_id':
                continue
            # Transform based on field type
            if isinstance(value, ObjectId):
                # ObjectId -> UUID. Derive a deterministic UUID v5 from the
                # ObjectId so references to the same record map to the same
                # UUID in every collection (a random uuid4 here would break
                # foreign-key relationships after migration).
                transformed[key] = str(uuid.uuid5(uuid.NAMESPACE_OID, str(value)))
            elif isinstance(value, datetime):
                # ISODate -> TIMESTAMPTZ
                transformed[key] = value
            elif isinstance(value, list):
                # Array -> PostgreSQL array or JSONB
                if key == 'embedding':
                    # Vector array
                    transformed[key] = np.array(value, dtype=np.float32)
                else:
                    # Regular array
                    transformed[key] = value
            elif isinstance(value, dict):
                # Nested object -> JSONB
                transformed[key] = value
            else:
                transformed[key] = value
        return transformed

    def transform_batch(self, batch: List[Dict]) -> List[Dict]:
        """Transform a batch of documents"""
        return [self.transform_document(doc) for doc in batch]

    def validate_document(self, doc: Dict[str, Any], schema: Dict) -> bool:
        """Validate transformed document against schema"""
        required_fields = schema.get('required', [])
        for field in required_fields:
            if field not in doc:
                logger.error(f"Missing required field: {field}")
                return False
        return True

3.
Load

# etl/load.py
import json
import uuid
import psycopg
from typing import List, Dict, Any
import numpy as np
import logging

logger = logging.getLogger(__name__)

class PostgresLoader:
    def __init__(self, connection_string: str):
        self.conn = psycopg.connect(connection_string)

    def create_tables(self, schema_file: str = 'schema.sql'):
        """Create tables from schema file"""
        with open(schema_file, 'r') as f:
            schema_sql = f.read()
        with self.conn.cursor() as cur:
            cur.execute(schema_sql)
        self.conn.commit()
        logger.info("Tables created successfully")

    def load_batch(
        self,
        table_name: str,
        batch: List[Dict[str, Any]],
        batch_size: int = 1000
    ) -> int:
        """
        Load a batch of data using COPY for performance.
        Returns: Number of rows inserted
        """
        if not batch:
            return 0

        # Prepare data for COPY
        columns = list(batch[0].keys())

        # Use COPY for bulk insert (much faster than INSERT)
        with self.conn.cursor() as cur:
            # Create temporary table
            temp_table = f"temp_{table_name}"
            cur.execute(f"""
                CREATE TEMP TABLE {temp_table} AS
                SELECT * FROM {table_name} WITH NO DATA
            """)

            # Use COPY to load into temp table
            with cur.copy(f"COPY {temp_table} ({', '.join(columns)}) FROM STDIN") as copy:
                for row in batch:
                    # Convert row values to PostgreSQL text representations
                    values = []
                    for col in columns:
                        val = row[col]
                        if isinstance(val, np.ndarray):
                            # Vector array -> pgvector literal
                            values.append(f"[{','.join(map(str, val))}]")
                        elif isinstance(val, list):
                            # Regular array -> array literal
                            values.append("{" + ",".join(map(str, val)) + "}")
                        elif isinstance(val, dict):
                            # JSONB
                            values.append(json.dumps(val))
                        elif isinstance(val, uuid.UUID):
                            # UUID
                            values.append(str(val))
                        else:
                            # Leave other values (incl. None) to psycopg
                            values.append(val)
                    # psycopg3's Copy.write_row takes a sequence of values,
                    # not a pre-joined tab-separated string
                    copy.write_row(values)

            # Insert from temp to main table (handles duplicates)
            cur.execute(f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                SELECT {', '.join(columns)} FROM {temp_table}
                ON CONFLICT (id) DO NOTHING
            """)

            # Drop temp table
            cur.execute(f"DROP TABLE {temp_table}")

        self.conn.commit()
        logger.info(f"Loaded {len(batch)} rows into {table_name}")
        return len(batch)

    def create_indexes(self, indexes: List[Dict[str, Any]]):
        """Create indexes after data loading"""
        with self.conn.cursor() as cur:
            for index_def in indexes:
                try:
                    cur.execute(index_def['sql'])
                    logger.info(f"Created index: {index_def['name']}")
                except Exception as e:
                    logger.error(f"Failed to create index {index_def['name']}: {e}")
        self.conn.commit()

4. Orchestration

# etl/migrate.py
from typing import Any, Dict
from etl.extract import MongoExtractor
from etl.transform import MongoToPostgresTransformer
from etl.load import PostgresLoader
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MigrationOrchestrator:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.extractor = MongoExtractor(
            config['mongo_connection_string'],
            config['mongo_database']
        )
        self.transformer = MongoToPostgresTransformer(
            config['schema_mapping']
        )
        self.loader = PostgresLoader(
            config['postgres_connection_string']
        )

    def migrate_collection(
        self,
        collection_name: str,
        target_table: str
    ) -> Dict[str, Any]:
        """Migrate a single collection"""
        start_time = datetime.now()
        logger.info(f"Starting migration: {collection_name} -> {target_table}")

        stats = {
            'collection': collection_name,
            'table': target_table,
            'extracted': 0,
            'loaded': 0,
            'errors': 0,
            'start_time': start_time,
            'end_time': None,
            'duration_seconds': 0
        }

        try:
            # Extract, transform, load in batches
            for batch in self.extractor.extract_collection(
                collection_name,
                batch_size=self.config.get('batch_size', 1000)
            ):
                # Transform
                transformed = self.transformer.transform_batch(batch)

                # Validate (collections without a schema pass through)
                valid_batch = [
                    row for row in transformed
                    if self.transformer.validate_document(
                        row,
                        self.config['schema_mapping'].get(collection_name, {})
                    )
                ]

                # Load
                loaded = self.loader.load_batch(target_table, valid_batch)

                stats['extracted'] += len(batch)
                stats['loaded'] += loaded
                stats['errors'] += len(batch) - len(valid_batch)
        except
Exception as e:
            logger.error(f"Migration failed for {collection_name}: {e}")
            stats['error'] = str(e)

        stats['end_time'] = datetime.now()
        stats['duration_seconds'] = (stats['end_time'] - stats['start_time']).total_seconds()

        logger.info(
            f"Migration complete: {stats['loaded']:,} rows "
            f"in {stats['duration_seconds']:.1f}s"
        )
        return stats

    def migrate_all(self):
        """Migrate all collections based on config"""
        results = []
        for collection, table in self.config['collections'].items():
            result = self.migrate_collection(collection, table)
            results.append(result)
        return results

# Usage
if __name__ == '__main__':
    config = {
        'mongo_connection_string': 'mongodb://localhost:27017',
        'mongo_database': 'knowledge_base',
        'postgres_connection_string': 'postgresql://user:pass@localhost:5432/kb',
        'batch_size': 5000,
        'collections': {
            'documents': 'documents',
            'users': 'users',
            'categories': 'categories',
            'tags': 'tags'
        },
        'schema_mapping': {
            'documents': {
                'required': ['title', 'content'],
                'optional': ['metadata', 'status']
            }
        }
    }

    orchestrator = MigrationOrchestrator(config)
    results = orchestrator.migrate_all()

    # Print summary
    print("\n=== Migration Summary ===")
    for result in results:
        print(f"{result['collection']:20} -> {result['loaded']:>10,} rows "
              f"({result['duration_seconds']:.1f}s)")

pgvector Setup and Configuration

Installation

# Install the pgvector extension

# Ubuntu/Debian
sudo apt-get install postgresql-16-pgvector

# macOS (Homebrew)
brew install pgvector

# Or build from source
git clone --branch v0.5.0 https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install

Database Setup

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Verify installation
SELECT * FROM pg_extension WHERE extname = 'vector';

-- Add vector column
ALTER TABLE documents ADD COLUMN embedding vector(1536);

-- Create index (IVFFlat for approximate search)
CREATE INDEX ON documents USING ivfflat(embedding vector_cosine_ops) WITH (lists = 100);

-- For small datasets (<100K rows),
exact search is fine
-- No index needed, but sort by distance

-- For large datasets, use HNSW for better performance
CREATE INDEX ON documents USING hnsw(embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

Vector Generation

# embeddings/generate.py
import openai
import psycopg
from typing import List
import numpy as np

class EmbeddingGenerator:
    def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        # text-embedding-3-large produces 3072-dimensional vectors by default
        self.dimensions = 3072 if "3-large" in model else 1536

    def generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for a single text"""
        response = self.client.embeddings.create(
            model=self.model,
            input=text
        )
        return response.data[0].embedding

    def generate_batch(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Generate embeddings for multiple texts efficiently"""
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            embeddings.extend([item.embedding for item in response.data])
        return embeddings

    def update_document_embeddings(self, pg_conn, doc_ids: List[str]):
        """Generate and store embeddings for documents"""
        # Fetch documents
        with pg_conn.cursor() as cur:
            cur.execute("""
                SELECT id, title, content
                FROM documents
                WHERE id = ANY(%s) AND embedding IS NULL
            """, (doc_ids,))
            documents = cur.fetchall()

        if not documents:
            return

        # Prepare texts for embedding
        texts = [f"{title} {content}" for _, title, content in documents]

        # Generate embeddings
        embeddings = self.generate_batch(texts)

        # Update database
        with pg_conn.cursor() as cur:
            for (doc_id, _, _), embedding in zip(documents, embeddings):
                # Convert to PostgreSQL vector format
                vector_str = f"[{','.join(map(str, embedding))}]"
                cur.execute("""
                    UPDATE documents
                    SET embedding = %s::vector
                    WHERE id = %s
                """, (vector_str, doc_id))
        pg_conn.commit()

Similarity Search

-- Cosine similarity
search (most common for text embeddings)
SELECT id, title, 1 - (embedding <=> '[0.1,0.2,...]') AS similarity
FROM documents
ORDER BY embedding <=> '[0.1,0.2,...]'
LIMIT 10;

-- Euclidean distance (L2); lower is more similar
SELECT id, title, embedding <-> '[0.1,0.2,...]' AS distance
FROM documents
ORDER BY embedding <-> '[0.1,0.2,...]'
LIMIT 10;

-- Inner product (for normalized vectors); <#> returns the negative inner product
SELECT id, title, -(embedding <#> '[0.1,0.2,...]') AS similarity
FROM documents
ORDER BY embedding <#> '[0.1,0.2,...]'
LIMIT 10;

-- Hybrid search: vector similarity + metadata filters
SELECT d.id, d.title, d.content,
       1 - (d.embedding <=> $1) AS similarity
FROM documents d
JOIN document_tags dt ON d.id = dt.document_id
JOIN tags t ON dt.tag_id = t.id
WHERE d.status = 'published'
  AND t.name IN ('AI', 'Machine Learning')
  AND d.published_at > NOW() - INTERVAL '1 year'
ORDER BY d.embedding <=> $1
LIMIT 20;

-- Vector search with reranking
WITH vector_search AS (
    SELECT id, title, 1 - (embedding <=> $1) AS vector_similarity
    FROM documents
    ORDER BY embedding <=> $1
    LIMIT 100
),
text_search AS (
    SELECT id, ts_rank(to_tsvector('english', content), query) AS text_similarity
    FROM documents, to_tsquery('english', $2) query
    WHERE to_tsvector('english', content) @@ query
    LIMIT 100
)
SELECT v.id, v.title,
       (v.vector_similarity * 0.7 + COALESCE(t.text_similarity, 0) * 0.3) AS combined_score
FROM vector_search v
LEFT JOIN text_search t ON v.id = t.id
ORDER BY combined_score DESC
LIMIT 20;

Data Validation Strategy

Record Count Validation

# validation/validate_counts.py
import pymongo
import psycopg

def validate_record_counts(mongo_uri: str, pg_uri: str, db_name: str):
    """Validate that record counts match between MongoDB and PostgreSQL"""
    # Connect to both databases
    mongo_client = pymongo.MongoClient(mongo_uri)
    mongo_db = mongo_client[db_name]
    pg_conn = psycopg.connect(pg_uri)

    # Get MongoDB collection counts
    # (estimated_document_count() is fast but approximate; use
    # count_documents({}) when exact counts are required)
    mongo_counts = {}
    for collection_name in mongo_db.list_collection_names():
        count = mongo_db[collection_name].estimated_document_count()
        mongo_counts[collection_name] = count

    # Get PostgreSQL table counts
    pg_counts = {}
    with pg_conn.cursor() as cur:
        for collection in mongo_counts.keys():
            table_name = collection  # Assuming 1:1 mapping
            cur.execute(f"SELECT COUNT(*) FROM {table_name}")
            pg_counts[table_name] = cur.fetchone()[0]

    # Compare
    mismatches = []
    for name in mongo_counts.keys():
        if mongo_counts[name] != pg_counts.get(name, 0):
            mismatches.append({
                'name': name,
                'mongo_count': mongo_counts[name],
                'pg_count': pg_counts.get(name, 0),
                'difference': mongo_counts[name] - pg_counts.get(name, 0)
            })

    if mismatches:
        print("⚠️ Record count mismatches found:")
        for m in mismatches:
            print(f"  {m['name']}: {m['mongo_count']} (Mongo) vs {m['pg_count']} (PG) "
                  f"(diff: {m['difference']})")
    else:
        print("✅ All record counts match!")

    return len(mismatches) == 0

Data Integrity Validation

# validation/validate_data.py
import random
import pymongo
import psycopg

def validate_sample_data(mongo_uri: str, pg_uri: str, db_name: str, sample_size: int = 100):
    """Validate that random sample records match between databases"""
    mongo_client = pymongo.MongoClient(mongo_uri)
    mongo_db = mongo_client[db_name]
    pg_conn = psycopg.connect(pg_uri)

    # Sample random collection
    collection_name = random.choice(mongo_db.list_collection_names())

    # Get random sample from MongoDB
    mongo_samples = list(mongo_db[collection_name].aggregate([
        {'$sample': {'size': sample_size}}
    ]))

    # Fetch corresponding records from PostgreSQL
    # (assumes ObjectIds were mapped to PostgreSQL ids deterministically
    # during the transform step, so the lookup key can be derived here)
    with pg_conn.cursor() as cur:
        for mongo_doc in mongo_samples:
            doc_id = str(mongo_doc['_id'])
            cur.execute(f"SELECT * FROM {collection_name} WHERE id = %s", (doc_id,))
            pg_row = cur.fetchone()

            if not pg_row:
                print(f"❌ Document {doc_id} not found in PostgreSQL")
                continue

            # Validate fields
            validate_document(mongo_doc, pg_row)

    print("✅ Sample validation complete")

def validate_document(mongo_doc: dict, pg_row: dict):
    """Validate individual document"""
    # Implement field-by-field validation
    # Compare values, handle type conversions, etc.
    pass

Performance Validation

# validation/validate_performance.py
import time
from typing import List
import psycopg

def benchmark_vector_search(pg_uri: str, query_vector: List[float]):
    """Benchmark vector search performance"""
    conn = psycopg.connect(pg_uri)

    # pgvector accepts a '[x,y,...]' text literal; build it once
    vec = f"[{','.join(map(str, query_vector))}]"

    # Warm-up
    with conn.cursor() as cur:
        for _ in range(10):
            cur.execute("""
                SELECT id FROM documents
                ORDER BY embedding <=> %s::vector LIMIT 10
            """, (vec,))
            cur.fetchall()

    # Benchmark
    times = []
    for _ in range(100):
        start = time.time()
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, title, 1 - (embedding <=> %s::vector) AS similarity
                FROM documents
                ORDER BY embedding <=> %s::vector
                LIMIT 10
            """, (vec, vec))
            results = cur.fetchall()
        times.append((time.time() - start) * 1000)  # ms

    # Statistics
    times_sorted = sorted(times)
    print("Vector search performance (100 iterations):")
    print(f"  Mean: {sum(times)/len(times):.2f}ms")
    print(f"  P50: {times_sorted[50]:.2f}ms")
    print(f"  P95: {times_sorted[95]:.2f}ms")
    print(f"  P99: {times_sorted[99]:.2f}ms")

The Cutover Plan

Phase 1: Dual-Write Period (1 Week)

# middleware/dual_write.py
from pymongo import MongoClient
import psycopg

class DualWriteMiddleware:
    """
    Write to both MongoDB and PostgreSQL during the transition period
    """
    def __init__(self, mongo_uri: str, pg_uri: str):
        self.mongo_client = MongoClient(mongo_uri)
        self.pg_conn = psycopg.connect(pg_uri)

    def insert_document(self, collection: str, data: dict):
        """Insert into both databases"""
        # MongoDB
        mongo_result = self.mongo_client.knowledge_base[collection].insert_one(data)

        # PostgreSQL (with converted ID)
        pg_data = convert_to_postgres(data)
        with self.pg_conn.cursor() as cur:
            cur.execute(f"""
                INSERT INTO {collection} (...) VALUES (...)
            """, pg_data)
        self.pg_conn.commit()
        return mongo_result.inserted_id

    def update_document(self, collection: str, doc_id: str, update: dict):
        """Update in both databases"""
        # MongoDB
        self.mongo_client.knowledge_base[collection].update_one(
            {'_id': ObjectId(doc_id)},
            {'$set': update}
        )

        # PostgreSQL (build the SET clause from update's keys;
        # pass the new values first, then doc_id for the WHERE clause)
        with self.pg_conn.cursor() as cur:
            cur.execute(f"""
                UPDATE {collection} SET ... WHERE id = %s
            """, (*update.values(), doc_id))
        self.pg_conn.commit()

Phase 2: Read-Only MongoDB (3 Days)

# Application configuration change
# config.py
DATABASE_BACKEND = 'postgresql'  # Change from 'mongodb'
MONGODB_READONLY = True  # MongoDB for fallback only

# middleware/read_adapter.py
class ReadAdapter:
    def __init__(self, pg_conn, mongo_client):
        self.pg_conn = pg_conn
        self.mongo_client = mongo_client  # Fallback

    def get_document(self, doc_id: str):
        """Read from PostgreSQL, fall back to MongoDB"""
        try:
            with self.pg_conn.cursor() as cur:
                cur.execute("SELECT * FROM documents WHERE id = %s", (doc_id,))
                return cur.fetchone()
        except Exception as e:
            # Fallback to MongoDB
            doc = self.mongo_client.knowledge_base.documents.find_one({
                '_id': ObjectId(doc_id)
            })
            # Sync to PostgreSQL
            self.sync_to_postgres(doc)
            return doc

Phase 3: PostgreSQL Only (Final)

# Remove MongoDB dependencies
# Update all database calls to use PostgreSQL only

Post-Migration Optimization

Query Optimization

-- Analyze query performance
EXPLAIN ANALYZE
SELECT * FROM documents
WHERE status = 'published'
ORDER BY embedding <=> '[0.1,0.2,...]'
LIMIT 10;

-- Update statistics
ANALYZE documents;

-- Vacuum and reindex
VACUUM ANALYZE documents;
REINDEX TABLE documents;

-- Check for bloat
SELECT schemaname, tablename,
       pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
       pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) AS index_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

Partitioning for Large Tables

-- Partition documents by date (for very large datasets)
CREATE TABLE documents (
    id UUID,
    title VARCHAR(500),
    content TEXT,
    -- ... other columns
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Create partitions
CREATE TABLE documents_2025_q1 PARTITION OF documents
    FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');
CREATE TABLE documents_2025_q2 PARTITION OF documents
    FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

-- Automatically create future partitions
CREATE OR REPLACE FUNCTION create_partitions() RETURNS void AS $$
DECLARE
    start_date DATE := date_trunc('quarter', CURRENT_DATE + INTERVAL '3 months');
    end_date DATE := start_date + INTERVAL '3 months';
    -- 'YYYY_"q"Q' renders e.g. documents_2025_q3 ("q" is a literal, Q the quarter)
    partition_name TEXT := 'documents_' || to_char(start_date, 'YYYY_"q"Q');
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF documents FOR VALUES FROM (%L) TO (%L)',
        partition_name, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;

Query Comparison: MongoDB vs PostgreSQL

Vector Similarity Search

MongoDB (requires Atlas Vector Search):

// Requires Atlas, not available in self-hosted
const results = await db.collection('documents').aggregate([
  {
    "$vectorSearch": {
      "index": "vector_index",
      "path": "embedding",
      "queryVector": embedding,
      "numCandidates": 100,
      "limit": 10
    }
  },
  {
    "$project": {
      "title": 1,
      "content": 1,
      "score": { "$meta": "vectorSearchScore" }
    }
  }
]).toArray();
// Performance: ~200ms for 100K documents

PostgreSQL + pgvector:

SELECT id, title, content,
       1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;
-- Performance: ~15ms for 100K documents (13x faster)

Complex Joins

MongoDB:

// Requires $lookup (slow, memory-intensive)
const results = await db.collection('documents').aggregate([
  { "$match": { "status": "published" } },
  { "$lookup": { "from": "users", "localField": "author_id",
                 "foreignField": "_id", "as": "author" } },
  { "$lookup": { "from": "categories", "localField": "category_id",
                 "foreignField": "_id", "as": "category" } },
  { "$unwind": "$author" },
  { "$unwind":
"$category" } ]).toArray(); // Performance: ~850ms for 1000 documents PostgreSQL: SELECT d.id, d.title, u.name as author_name, c.name as category_name FROM documents d JOIN users u ON d.author_id = u.id JOIN categories c ON d.category_id = c.id WHERE d.status = 'published' LIMIT 1000; -- Performance: ~45ms (19x faster) Aggregation MongoDB: const stats = await db.collection('documents').aggregate([ { "$group": { "_id": "$category", "count": { "$sum": 1 }, "avg_words": { "$avg": "$metadata.word_count" } } } ]).toArray(); // Performance: ~320ms for 100K documents PostgreSQL: SELECT c.name as category, COUNT(*) as count, AVG(d.word_count) as avg_words FROM documents d JOIN categories c ON d.category_id = c.id GROUP BY c.name; -- Performance: ~35ms (9x faster) Lessons Learned Mistakes We Made 1. Insufficient Testing of Edge Cases Problem: Special characters in JSONB caused issues Fix: Comprehensive validation before migration 2. Underestimated Embedding Generation Time Problem: Generating 1M embeddings took 72 hours Fix: Parallel processing with batch API calls 3. Index Build Blocked Production Problem: Building IVFFlat index locked tables Fix: Use CREATE INDEX CONCURRENTLY 4. Memory Issues During ETL Problem: Batch size too large caused OOM errors Fix: Dynamic batch sizing based on memory usage Success Factors 1. Incremental Migration Started with read-only replica Gradually shifted traffic Maintained rollback option 2. Comprehensive Monitoring Real-time validation checks Performance metrics tracking Automated alerting 3. Team Training PostgreSQL training for MongoDB-focused team Pair programming sessions Documentation and runbooks 4. 
Clear Communication Weekly migration updates Stakeholder dashboards Transparent risk reporting Migration Checklist Pre-Migration [ ] Schema analysis and mapping completed [ ] ETL scripts written and tested [ ] Performance baselines recorded [ ] Backup strategy documented [ ] Rollback plan defined [ ] Team training completed Migration Day [ ] Full backup taken [ ] Read-only mode enabled on MongoDB [ ] Final data sync completed [ ] Data validation passed [ ] Smoke tests executed [ ] Monitoring alerts configured Post-Migration [ ] Performance benchmarks met [ ] Error rates normal [ ] User acceptance testing passed [ ] Documentation updated [ ] Team debrief conducted [ ] Success criteria validated Conclusion Migrating from MongoDB to PostgreSQL with pgvector was a significant undertaking, but the results were transformative: 81% cost reduction ($4,200 β $800/month) 10x faster vector search (450ms β 45ms) Unified architecture (3 databases β 1) Advanced analytics (window functions, CTEs, materialized views) Strong consistency (ACID transactions) The key to success was thorough planning, incremental migration, and comprehensive validation. PostgreSQL's maturity and pgvector's capabilities made it the ideal choice for our AI-powered knowledge management platform. If you're considering a similar migration, start small, validate often, and don't underestimate the importance of team training. The investment pays dividends in performance, cost, and developer productivity. Sources: Stack Overflow Developer Survey 2025: PostgreSQL 55% Adoption Β· Instaclustr: pgvector Guide (2026) Β· SingleStore: Vector Database Landscape 2024 Frequently Asked Questions Why migrate from MongoDB to PostgreSQL? The primary drivers are ACID compliance, advanced analytics capabilities, and cost reduction. PostgreSQL supports window functions, CTEs, and materialized views that MongoDB cannot match, enabling complex analytical queries without a separate data warehouse. 
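To make the analytics point concrete: the kind of per-group ranking that takes a multi-stage aggregation pipeline in MongoDB is a single window-function query in SQL. Here is a minimal, self-contained sketch; it uses Python's built-in sqlite3 as a stand-in engine and an invented schema, since the SQL shape is what matters, not the database:

```python
import sqlite3

# SQLite (3.25+) stands in for PostgreSQL so this runs anywhere;
# the window-function SQL is identical in spirit.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE documents (title TEXT, category TEXT, views INTEGER);
    INSERT INTO documents VALUES
        ('Intro to pgvector', 'ai',   900),
        ('JSONB deep dive',   'sql',  700),
        ('HNSW tuning',       'ai',   400),
        ('CTE patterns',      'sql', 1100);
""")

# Rank documents within each category by views: one window function
# in SQL versus a $group/$sort/$unwind pipeline in MongoDB.
rows = conn.execute("""
    SELECT title, category,
           RANK() OVER (PARTITION BY category ORDER BY views DESC) AS rnk
    FROM documents
    ORDER BY category, rnk
""").fetchall()

for title, category, rnk in rows:
    print(f"{category} #{rnk}: {title}")
```

The same `RANK() OVER (PARTITION BY ...)` clause runs unchanged on PostgreSQL, where it can also be combined with JSONB extraction or a pgvector similarity ordering in one statement.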
Organizations that migrate report storage cost reductions of 70-85% and significant improvements in query performance for relational data patterns.

What is pgvector and why is it used after migrating from MongoDB?

pgvector is a PostgreSQL extension that adds native vector storage and similarity search, enabling semantic search and AI-powered retrieval directly within the database. After migrating from MongoDB, teams can store document embeddings alongside structured data in a single PostgreSQL instance, eliminating the need for a separate vector database such as Pinecone. Instacart's production migration to pgvector achieved 80% cost savings versus their previous search infrastructure.

How do you handle MongoDB's flexible schema during PostgreSQL migration?

MongoDB's schema flexibility is handled with PostgreSQL's JSONB column type, which stores variable or nested JSON data with full indexing support. Fields with consistent structure are migrated to typed columns for performance and query simplicity, while genuinely variable nested data stays in JSONB. A data profiling step before migration identifies which fields have consistent types (suitable for columns) and which are genuinely polymorphic (suitable for JSONB).

What is the typical downtime during a MongoDB to PostgreSQL migration?

A well-planned incremental migration can achieve near-zero downtime by running both databases in parallel during the transition. The dual-write phase synchronizes writes to both MongoDB and PostgreSQL while reads gradually shift to PostgreSQL. The final cutover (switching all reads to PostgreSQL) typically requires only a 5-15 minute maintenance window. Migrations involving 100GB+ of data need careful bulk export and import strategies to keep that window minimal.

How do you migrate MongoDB indexes to PostgreSQL?
MongoDB compound indexes map to PostgreSQL multi-column B-tree indexes, text indexes map to PostgreSQL full-text search indexes (using GIN), and geospatial indexes map to PostGIS. Vector indexes from Atlas Vector Search migrate to pgvector's IVFFlat or HNSW index types. A critical step is profiling the most frequent MongoDB queries and ensuring equivalent PostgreSQL indexes exist before cutover, to prevent performance regressions.

What tools are used for MongoDB to PostgreSQL data migration?

The most common migration stack is MongoDB's mongoexport for data extraction, custom Python or Node.js ETL scripts for schema transformation and type mapping, and PostgreSQL's COPY command for bulk loading. For ongoing dual-write synchronization, tools like Debezium (change data capture) or AWS DMS handle real-time replication between the two databases during the transition window. Always validate row counts and checksums after each migration batch before proceeding.

Need Help with Your Database Migration?

At Groovy Web, we specialize in database migrations, PostgreSQL optimization, and AI infrastructure. Whether you're moving from NoSQL to SQL, adding vector search capabilities, or optimizing performance, we can help. Schedule a free consultation to discuss your migration project.

Related Services: Database Migration Services · PostgreSQL Consulting · RAG System Development

Further Reading: pgvector Documentation · PostgreSQL JSONB Guide · MongoDB to PostgreSQL Migration Guide

Published: January 29, 2026 | Author: Groovy Web Team | Category: Technical Deep-Dive
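As a postscript to the FAQ advice on validating row counts and checksums per batch, here is a minimal, database-agnostic sketch. The helper names and the XOR-of-hashes scheme are our illustration, not any specific tool's API; a real pipeline would stream rows from both databases instead of taking in-memory lists:

```python
import hashlib
import json

def batch_checksum(rows):
    """Order-insensitive checksum over a batch of row dicts.

    Each row is serialized canonically (sorted keys), hashed, and the
    digests are XORed so row ordering does not affect the result.
    """
    acc = 0
    for row in rows:
        digest = hashlib.sha256(
            json.dumps(row, sort_keys=True, default=str).encode()
        ).digest()
        acc ^= int.from_bytes(digest, "big")
    return acc

def validate_batch(mongo_rows, pg_rows):
    """Compare row counts, then checksums, for one migration batch."""
    if len(mongo_rows) != len(pg_rows):
        return False, "row count mismatch"
    if batch_checksum(mongo_rows) != batch_checksum(pg_rows):
        return False, "checksum mismatch"
    return True, "ok"

source = [{"id": 1, "title": "Intro"}, {"id": 2, "title": "JSONB"}]
target = [{"id": 2, "title": "JSONB"}, {"id": 1, "title": "Intro"}]
ok, reason = validate_batch(source, target)
print(ok, reason)  # -> True ok
```

Because the checksum is order-insensitive, it tolerates the different natural orderings of a MongoDB cursor and a PostgreSQL result set; any type-mapping drift (for example, dates serialized differently) surfaces as a checksum mismatch for that batch.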
Written by Groovy Web Team

Groovy Web is an AI-First development agency specializing in building production-grade AI applications, multi-agent systems, and enterprise solutions. We've helped 200+ clients achieve 10-20X development velocity using AI Agent Teams.