MongoDB to PostgreSQL + pgvector: Our Migration Journey

A deep dive into migrating 2.3M documents from MongoDB to PostgreSQL + pgvector. We cover schema design, ETL architecture, and a zero-downtime strategy, and we achieved a 10x query latency improvement with an 80% infrastructure cost reduction.

Why We Migrated

At Groovy Web, we built an enterprise knowledge management platform on MongoDB as part of a broader shift toward AI-first engineering. MongoDB worked well for document storage and flexible schemas, but as we added AI-powered semantic search and RAG (Retrieval-Augmented Generation) capabilities, its limitations became apparent.

The tipping point? We needed to support:

  • Vector similarity search for semantic queries
  • Complex joins across related entities
  • ACID transactions for data consistency
  • Advanced analytics with window functions
  • Full-text search combined with vector search

After extensive evaluation, we chose PostgreSQL with pgvector extension. This decision transformed our application's capabilities and performance.

In this guide, I'll share our complete migration journey, including the mistakes we made, lessons learned, and a reusable migration framework.

The Migration Decision

Evaluation Criteria

We evaluated database options based on these requirements:

| Requirement | MongoDB | PostgreSQL | MySQL | Pinecone |
|---|---|---|---|---|
| Vector similarity | ❌ Requires external | ✅ pgvector | ⚠️ Limited | ✅ Native |
| ACID transactions | ✅ Document-level | ✅ Full support | ✅ Full support | ❌ No |
| Complex joins | ❌ $lookup (slow) | ✅ Optimized | ✅ Optimized | ❌ No |
| Schema flexibility | ✅ Excellent | ⚠️ Migration needed | ⚠️ Migration needed | N/A |
| Full-text search | ✅ Built-in | ✅ tsvector | ⚠️ Basic | ✅ Hybrid |
| Window functions | ❌ No | ✅ Full support | ✅ Full support | ❌ No |
| Materialized views | ❌ No | ✅ Native | ⚠️ Limited | ❌ No |
| Cost | $$$$ (Atlas) | $$ (self-hosted) | $$ (self-hosted) | $$$$ (managed) |

Why PostgreSQL + pgvector?

1. Unified Data Store

Previously, we used:

  • MongoDB for documents
  • Pinecone for vectors (additional cost and complexity)
  • Redis for caching

With PostgreSQL + pgvector:

  • Documents → JSONB columns
  • Vectors → pgvector columns
  • Caching → materialized views
  • Single source of truth

2. Cost Savings

Before:
- MongoDB Atlas (M50 cluster): $2,400/month
- Pinecone (1M vectors): $1,200/month
- Redis Cloud (Large): $600/month
Total: $4,200/month

After:
- PostgreSQL (Managed, 8 vCPU, 32GB RAM): $800/month
- Savings: $3,400/month (81% reduction)

3. Query Performance

Vector search with metadata filtering:

MongoDB (with external vector DB):

// Required two queries
const results = await pinecone.query(vector, { topK: 100 });
const ids = results.map(r => r.id);
const documents = await mongo.collection('docs').find({
  _id: { $in: ids },
  status: 'published'  // Additional filter
}).toArray();
// Total: ~450ms for 100 results

PostgreSQL + pgvector:

-- Single query with vector + metadata filter
SELECT
  id,
  title,
  content,
  1 - (embedding <=> $1) as similarity
FROM documents
WHERE status = 'published'
ORDER BY embedding <=> $1
LIMIT 100;
-- Total: ~45ms for 100 results (10x faster)

4. ACID Transactions

-- Atomic update across multiple tables
BEGIN;
  UPDATE documents SET content = $1 WHERE id = $2;
  UPDATE document_stats SET word_count = $3 WHERE doc_id = $2;
  INSERT INTO document_revisions (doc_id, content, created_at)
    VALUES ($2, $4, NOW());
COMMIT;

Migration at a Glance

| Metric | Before (MongoDB) | After (PostgreSQL) |
|---|---|---|
| Database instances | 3 (Mongo, Pinecone, Redis) | 1 (PostgreSQL) |
| Monthly cost | $4,200 | $800 |
| Vector query latency | 450ms | 45ms |
| Complex join queries | Not possible | 25ms |
| Data consistency | Eventual | Strong |
| Backup/restore | 4 hours | 30 minutes |
| Team familiarity | High | Medium |

Pre-Migration Planning

Step 1: Schema Analysis

First, we analyzed our MongoDB schemas:

// MongoDB collections analyzed
db.listCollections().toArray()

// Example: documents collection
{
  _id: ObjectId("..."),
  title: "Building AI Systems",
  content: "Full text content...",
  metadata: {
    author: "John Doe",
    category: "Technology",
    tags: ["AI", "Machine Learning"],
    created_at: ISODate("2026-01-15"),
    updated_at: ISODate("2026-01-20")
  },
  embedding: [0.1, 0.2, ...],  // 1536-dimensional vector
  status: "published",
  version: 3
}

// Indexes
db.documents.getIndexes()

Schema mapping strategy:

| MongoDB Type | PostgreSQL Type | Notes |
|---|---|---|
| ObjectId | UUID | More readable, widely supported |
| String | VARCHAR(n) or TEXT | Use VARCHAR for indexed fields |
| Number | NUMERIC or INTEGER | Preserve precision |
| Date | TIMESTAMPTZ | Always use timezone-aware |
| Array | TEXT[] or JSONB | Depends on array contents |
| Object | JSONB | Preserve flexible schemas |
| Vector (Array) | VECTOR(1536) | pgvector extension |
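The mapping above can be sketched as a small conversion helper. This is illustrative rather than our production code: `convert_value` is a hypothetical name, and ObjectIds (already stringified) are detected naively as 24-character hex strings.

```python
import json
import re
import uuid
from datetime import datetime, timezone

OBJECT_ID_RE = re.compile(r"^[0-9a-f]{24}$")

def convert_value(value):
    """Convert one MongoDB value into a PostgreSQL-ready Python value."""
    if isinstance(value, str) and OBJECT_ID_RE.match(value):
        # ObjectId (hex string) -> deterministic UUID, so repeated
        # references to the same id always map to the same UUID
        return str(uuid.uuid5(uuid.NAMESPACE_OID, value))
    if isinstance(value, datetime):
        # TIMESTAMPTZ wants timezone-aware datetimes
        return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
    if isinstance(value, dict):
        # nested object -> JSONB (serialized here for illustration)
        return json.dumps(value)
    return value

print(convert_value("65b8a3d2e4b0f3a9c8d7e6f5"))
```

The deterministic ObjectId-to-UUID mapping matters: it keeps cross-collection references intact without a lookup table.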

Step 2: Dependency Mapping

We mapped all application dependencies:

# scripts/analyze_dependencies.py
import pymongo
from bson import ObjectId
from collections import defaultdict

client = pymongo.MongoClient("mongodb://localhost:27017")
db = client["knowledge_base"]

# Find all collections
collections = db.list_collection_names()

# Analyze relationships
relationships = defaultdict(set)

for collection in collections:
    # Scan sample documents
    for doc in db[collection].find().limit(1000):
        for key, value in doc.items():
            # Look for references (ObjectId fields ending with _id)
            if key.endswith('_id') and isinstance(value, ObjectId):
                # naive pluralization heuristic: author_id -> authors
                referenced_collection = key.replace('_id', 's')
                relationships[collection].add(referenced_collection)

# Output dependency graph
for source, targets in relationships.items():
    print(f"{source} -> {', '.join(targets)}")

Output:

documents -> users, categories, tags
comments -> documents, users
revisions -> documents
versions -> documents
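These edges also dictate load order: referenced collections (users, categories, tags) must be migrated before the tables that point at them, or foreign key inserts will fail. A topological sort over the graph above, using Python's stdlib graphlib:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# collection -> collections it references (from the analysis output above)
deps = {
    "documents": {"users", "categories", "tags"},
    "comments": {"documents", "users"},
    "revisions": {"documents"},
    "versions": {"documents"},
}

# static_order() yields referenced collections before their dependents
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Feeding this order into the migration config guarantees parents exist before children are inserted.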

Step 3: Performance Baseline

Establish baseline metrics before migration:

# scripts/benchmark_mongodb.py
import time
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
db = client["knowledge_base"]
doc_id = db.documents.find_one()["_id"]  # any existing document id

def benchmark_query(db, query_name, query_func, iterations=100):
    times = []
    for _ in range(iterations):
        start = time.time()
        query_func()
        times.append((time.time() - start) * 1000)  # ms

    times.sort()
    return {
        'query': query_name,
        'avg_ms': sum(times) / len(times),
        'p50_ms': times[len(times) // 2],
        'p95_ms': times[int(len(times) * 0.95)],
        'p99_ms': times[int(len(times) * 0.99)]
    }

# Define benchmark queries (pymongo cursors are consumed with list())
queries = [
    ('simple_find', lambda: db.documents.find_one({'_id': doc_id})),
    ('complex_aggregate', lambda: list(db.documents.aggregate([
        {'$match': {'status': 'published'}},
        {'$lookup': {'from': 'users', 'localField': 'author_id', 'foreignField': '_id', 'as': 'author'}},
        {'$limit': 100}
    ]))),
    ('text_search', lambda: list(db.documents.find({'$text': {'$search': 'machine learning'}}).limit(50))),
]

for name, func in queries:
    metrics = benchmark_query(db, name, func)
    print(f"{name}: {metrics['p95_ms']:.2f}ms (p95)")

Results saved for post-migration comparison.
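One simple way to persist that baseline for the later comparison (the `save_baseline` helper and file name are illustrative, not from our actual tooling):

```python
import json
import os
import tempfile
from datetime import datetime, timezone

def save_baseline(metrics, path):
    """Write benchmark metrics plus a capture timestamp for post-migration diffing."""
    payload = {
        "captured_at": datetime.now(timezone.utc).isoformat(),
        "metrics": metrics,
    }
    with open(path, "w") as f:
        json.dump(payload, f, indent=2)

path = os.path.join(tempfile.gettempdir(), "baseline_mongodb.json")
save_baseline([{"query": "simple_find", "p95_ms": 12.4}], path)
```

After the migration, rerun the same queries against PostgreSQL and diff the two JSON files.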

Schema Design Strategy

Document Collection Schema

MongoDB:

{
  _id: ObjectId("65b8a3d2e4b0f3a9c8d7e6f5"),
  title: "Building AI Systems with LangChain",
  content: "Full article content...",
  metadata: {
    author_id: ObjectId("65b8a3d2e4b0f3a9c8d7e6f4"),
    category: "Technology",
    tags: ["AI", "Machine Learning", "LangChain"],
    published_at: ISODate("2026-01-15T10:30:00Z"),
    word_count: 2500
  },
  embedding: [0.123, 0.456, ...],  // 1536 dimensions
  status: "published",
  version: 3,
  created_at: ISODate("2026-01-10T08:00:00Z"),
  updated_at: ISODate("2026-01-20T14:30:00Z")
}

PostgreSQL:

-- Main documents table
CREATE TABLE documents (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  title VARCHAR(500) NOT NULL,
  content TEXT NOT NULL,
  author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  category_id INTEGER REFERENCES categories(id) ON DELETE SET NULL,
  status document_status NOT NULL DEFAULT 'draft',
  version INTEGER NOT NULL DEFAULT 1,
  word_count INTEGER,
  published_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Full-text search index
CREATE INDEX idx_documents_content_gin ON documents
  USING gin(to_tsvector('english', content));

-- Vector embedding column
ALTER TABLE documents ADD COLUMN embedding vector(1536);
CREATE INDEX idx_documents_embedding_ivfflat ON documents
  USING ivfflat(embedding vector_cosine_ops)
  WITH (lists = 100);

-- JSONB for flexible metadata
ALTER TABLE documents ADD COLUMN metadata JSONB DEFAULT '{}';
CREATE INDEX idx_documents_metadata_gin ON documents USING gin(metadata);

-- Composite index for common queries
CREATE INDEX idx_documents_status_published ON documents(status, published_at DESC)
  WHERE status = 'published';

-- Tag many-to-many relationship
CREATE TABLE document_tags (
  document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
  tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
  PRIMARY KEY (document_id, tag_id)
);

-- Document revisions (history tracking)
CREATE TABLE document_revisions (
  id SERIAL PRIMARY KEY,
  document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
  version INTEGER NOT NULL,
  title VARCHAR(500),
  content TEXT,
  metadata JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  created_by_id UUID REFERENCES users(id),
  UNIQUE (document_id, version)
);

-- Document statistics (materialized view)
CREATE MATERIALIZED VIEW document_statistics AS
SELECT
  d.id,
  d.title,
  d.status,
  d.word_count,
  COUNT(DISTINCT c.id) as comment_count,
  COUNT(DISTINCT v.id) as view_count,
  AVG(r.rating) as avg_rating,
  MAX(d.updated_at) as last_updated
FROM documents d
LEFT JOIN comments c ON c.document_id = d.id
LEFT JOIN views v ON v.document_id = d.id
LEFT JOIN ratings r ON r.document_id = d.id
GROUP BY d.id, d.title, d.status, d.word_count, d.updated_at;

CREATE UNIQUE INDEX ON document_statistics(id);

-- Refresh strategy (cron job or trigger)
CREATE OR REPLACE FUNCTION refresh_document_statistics()
RETURNS TRIGGER AS $$
BEGIN
  REFRESH MATERIALIZED VIEW CONCURRENTLY document_statistics;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Key Design Decisions

1. UUID vs ObjectId

-- We chose UUID over serial/auto-increment
CREATE TABLE documents (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),  -- UUID v4
  -- or
  id UUID PRIMARY KEY DEFAULT uuid_generate_v7()  -- UUID v7 (time-ordered)
);

Why UUID v7?

  • Time-ordered like MongoDB ObjectId
  • Globally unique across databases
  • Better indexing performance than random UUID v4
  • No exposure of record counts
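Python's standard library has no UUIDv7 generator, so generating matching IDs on the application side needs a library or a small helper. A minimal sketch of the v7 layout (48-bit millisecond timestamp, then version/variant bits, then random bits); this `uuid7` is our own illustration, not a stdlib function:

```python
import os
import time
import uuid

def uuid7() -> uuid.UUID:
    """Minimal UUIDv7 sketch: 48-bit ms timestamp + version/variant + random bits."""
    ts_ms = time.time_ns() // 1_000_000
    rand_a = int.from_bytes(os.urandom(2), "big") & 0x0FFF              # 12 random bits
    rand_b = int.from_bytes(os.urandom(8), "big") & 0x3FFFFFFFFFFFFFFF  # 62 random bits
    value = (ts_ms & 0xFFFFFFFFFFFF) << 80   # top 48 bits: Unix time in ms
    value |= 0x7 << 76                       # version nibble = 7
    value |= rand_a << 64
    value |= 0b10 << 62                      # RFC 4122 variant
    value |= rand_b
    return uuid.UUID(int=value)

print(uuid7())
```

Because the timestamp occupies the high bits, values generated later sort later, which is what keeps B-tree inserts append-mostly.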

2. JSONB for Flexible Metadata

-- Store flexible metadata in JSONB
UPDATE documents
SET metadata = jsonb_set(
  metadata,
  '{seo_keywords}',
  '["AI", "LangChain", "Multi-Agent Systems"]'::jsonb
)
WHERE id = '...';

-- Query JSONB fields
SELECT title, metadata->'seo_keywords' as keywords
FROM documents
WHERE metadata @> '{"featured": true}';

-- Index on JSONB paths
CREATE INDEX idx_documents_metadata_featured
  ON documents ((metadata->>'featured'))
  WHERE metadata ? 'featured';

3. Vector Index Strategy

-- IVFFlat index for approximate search (faster build, less accurate)
CREATE INDEX idx_documents_embedding_ivfflat ON documents
  USING ivfflat(embedding vector_cosine_ops)
  WITH (lists = 100);  -- pgvector suggests lists = rows/1000 up to ~1M rows, sqrt(rows) beyond

-- HNSW index for better accuracy (slower build, faster query)
CREATE INDEX idx_documents_embedding_hnsw ON documents
  USING hnsw(embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

Index selection guide:

| Index Type | Build Speed | Query Speed | Memory Usage | Best For |
|---|---|---|---|---|
| IVFFlat | Fast | Fast | Low | Large datasets (>1M vectors) |
| HNSW | Slow | Very Fast | High | High accuracy requirements |
| Exact (none) | N/A | Slow | N/A | Small datasets (<100K vectors) |
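The pgvector README's rule of thumb for sizing IVFFlat is lists ≈ rows/1000 up to about 1M rows and ≈ sqrt(rows) beyond, with probes ≈ sqrt(lists) as a starting point when tuning recall. A small helper encoding that heuristic (the function name is ours):

```python
import math

def ivfflat_params(row_count: int) -> dict:
    """Rule-of-thumb IVFFlat sizing from the pgvector README."""
    if row_count <= 1_000_000:
        lists = max(row_count // 1000, 1)
    else:
        lists = int(math.sqrt(row_count))
    # probes trades recall for speed; sqrt(lists) is a common starting point
    probes = max(int(math.sqrt(lists)), 1)
    return {"lists": lists, "probes": probes}

print(ivfflat_params(2_300_000))  # {'lists': 1516, 'probes': 38}
```

For our 2.3M-document table this lands close to the lists value we eventually settled on by benchmarking.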

The ETL Pipeline

Architecture

MongoDB (Source)
    │
    ▼
┌─────────────────────────────────────────┐
│         Extraction Script               │
│  - Batch read with pagination           │
│  - Progress tracking                    │
│  - Error logging                        │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│      Transformation Layer               │
│  - Schema mapping                       │
│  - Type conversion                      │
│  - Data validation                      │
│  - Embedding generation                 │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│          Load Script                    │
│  - Batch insert with COPY               │
│  - Parallel processing                  │
│  - Transaction management               │
└─────────────────────────────────────────┘
    │
    ▼
PostgreSQL (Destination)

Implementation

1. Extraction

# etl/extract.py
import pymongo
from typing import Iterator, Dict, Any
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class MongoExtractor:
    def __init__(self, connection_string: str, database: str):
        self.client = pymongo.MongoClient(connection_string)
        self.db = self.client[database]

    def extract_collection(
        self,
        collection_name: str,
        batch_size: int = 1000
    ) -> Iterator[Dict[str, Any]]:
        """
        Extract documents from MongoDB in batches

        Yields batches of documents to avoid memory issues
        """
        collection = self.db[collection_name]
        total = collection.estimated_document_count()

        logger.info(f"Extracting {total:,} documents from {collection_name}")

        batch = []
        for i, doc in enumerate(collection.find(), 1):
            batch.append(doc)

            if len(batch) >= batch_size:
                logger.info(f"Extracted {i:,}/{total:,} documents")
                yield batch
                batch = []

        # Yield final batch
        if batch:
            yield batch

    def extract_with_relations(
        self,
        collection_name: str,
        relations: Dict[str, str],
        batch_size: int = 1000
    ) -> Iterator[Dict[str, Any]]:
        """
        Extract documents with related data in a single query

        relations: {'field_name': 'related_collection'}
        """
        collection = self.db[collection_name]

        # Build aggregation pipeline with $lookup
        pipeline = [{'$match': {}}]

        for field, related_collection in relations.items():
            pipeline.append({
                '$lookup': {
                    'from': related_collection,
                    'localField': field,
                    'foreignField': '_id',
                    'as': field.replace('_id', '') + '_data'
                }
            })

        batch = []
        for i, doc in enumerate(collection.aggregate(pipeline), 1):
            batch.append(doc)

            if len(batch) >= batch_size:
                yield batch
                batch = []

        if batch:
            yield batch

2. Transformation

# etl/transform.py
import logging
import uuid
from typing import Any, Dict, List
from datetime import datetime

import numpy as np
from bson import ObjectId

logger = logging.getLogger(__name__)

class MongoToPostgresTransformer:
    """Transform MongoDB documents to PostgreSQL format"""

    def __init__(self, collection_mapping: Dict[str, Any]):
        self.collection_mapping = collection_mapping

    def transform_document(self, mongo_doc: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform MongoDB document to PostgreSQL row

        Handles:
        - ObjectId → UUID
        - ISODate → TIMESTAMPTZ
        - Array → ARRAY or JSONB
        - Nested object → JSONB
        """
        transformed = {}

        for key, value in mongo_doc.items():
            # Map _id to id with the same deterministic UUID scheme used for
            # references, so foreign keys in other collections still line up
            if key == '_id':
                transformed['id'] = str(uuid.uuid5(uuid.NAMESPACE_OID, str(value)))
                continue

            # Transform based on field type
            if isinstance(value, ObjectId):
                # ObjectId → UUID: uuid5 is deterministic, so repeated
                # references to the same ObjectId map to the same UUID
                transformed[key] = str(uuid.uuid5(uuid.NAMESPACE_OID, str(value)))
            elif isinstance(value, datetime):
                # ISODate → TIMESTAMPTZ
                transformed[key] = value
            elif isinstance(value, list):
                # Array → PostgreSQL array or JSONB
                if key == 'embedding':
                    # Vector array
                    transformed[key] = np.array(value, dtype=np.float32)
                else:
                    # Regular array
                    transformed[key] = value
            elif isinstance(value, dict):
                # Nested object → JSONB
                transformed[key] = value
            else:
                transformed[key] = value

        return transformed

    def transform_batch(self, batch: List[Dict]) -> List[Dict]:
        """Transform a batch of documents"""
        return [self.transform_document(doc) for doc in batch]

    def validate_document(self, doc: Dict[str, Any], schema: Dict) -> bool:
        """Validate transformed document against schema"""
        required_fields = schema.get('required', [])

        for field in required_fields:
            if field not in doc:
                logger.error(f"Missing required field: {field}")
                return False

        return True

3. Load

# etl/load.py
import json
import logging
import uuid
from typing import List, Dict, Any

import numpy as np
import psycopg

logger = logging.getLogger(__name__)

class PostgresLoader:
    def __init__(self, connection_string: str):
        self.conn = psycopg.connect(connection_string)

    def create_tables(self, schema_file: str = 'schema.sql'):
        """Create tables from schema file"""
        with open(schema_file, 'r') as f:
            schema_sql = f.read()

        with self.conn.cursor() as cur:
            cur.execute(schema_sql)
        self.conn.commit()
        logger.info("Tables created successfully")

    def load_batch(
        self,
        table_name: str,
        batch: List[Dict[str, Any]],
        batch_size: int = 1000
    ) -> int:
        """
        Load a batch of data using COPY for performance

        Returns: Number of rows inserted
        """
        if not batch:
            return 0

        # Prepare data for COPY
        columns = list(batch[0].keys())

        # Use COPY for bulk insert (much faster than INSERT)
        with self.conn.cursor() as cur:
            # Create temporary table
            temp_table = f"temp_{table_name}"
            cur.execute(f"""
                CREATE TEMP TABLE {temp_table} AS
                SELECT * FROM {table_name}
                WITH NO DATA
            """)

            # Use COPY to load into temp table
            with cur.copy(f"COPY {temp_table} ({', '.join(columns)}) FROM STDIN") as copy:
                for row in batch:
                    # Convert row values to text forms PostgreSQL can parse
                    values = []
                    for col in columns:
                        val = row[col]
                        if isinstance(val, np.ndarray):
                            # pgvector literal: [0.1,0.2,...]
                            values.append(f"[{','.join(map(str, val))}]")
                        elif isinstance(val, list):
                            # Array literal: {a,b,c} (naive; assumes no commas/quotes in elements)
                            values.append('{' + ','.join(map(str, val)) + '}')
                        elif isinstance(val, dict):
                            # JSONB
                            values.append(json.dumps(val))
                        elif isinstance(val, uuid.UUID):
                            values.append(str(val))
                        else:
                            values.append(val)

                    # psycopg's write_row takes a sequence of values, not a joined string
                    copy.write_row(values)

            # Insert from temp to main table (handles duplicates)
            cur.execute(f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                SELECT {', '.join(columns)} FROM {temp_table}
                ON CONFLICT (id) DO NOTHING
            """)

            # Drop temp table
            cur.execute(f"DROP TABLE {temp_table}")

        self.conn.commit()
        logger.info(f"Loaded {len(batch)} rows into {table_name}")

        return len(batch)

    def create_indexes(self, indexes: List[Dict[str, Any]]):
        """Create indexes after data loading"""
        with self.conn.cursor() as cur:
            for index_def in indexes:
                try:
                    cur.execute(index_def['sql'])
                    logger.info(f"Created index: {index_def['name']}")
                except Exception as e:
                    logger.error(f"Failed to create index {index_def['name']}: {e}")

        self.conn.commit()

4. Orchestration

# etl/migrate.py
import logging
from datetime import datetime
from typing import Any, Dict

from etl.extract import MongoExtractor
from etl.transform import MongoToPostgresTransformer
from etl.load import PostgresLoader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MigrationOrchestrator:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.extractor = MongoExtractor(
            config['mongo_connection_string'],
            config['mongo_database']
        )
        self.transformer = MongoToPostgresTransformer(
            config['schema_mapping']
        )
        self.loader = PostgresLoader(
            config['postgres_connection_string']
        )

    def migrate_collection(
        self,
        collection_name: str,
        target_table: str
    ) -> Dict[str, Any]:
        """Migrate a single collection"""
        start_time = datetime.now()

        logger.info(f"Starting migration: {collection_name} -> {target_table}")

        stats = {
            'collection': collection_name,
            'table': target_table,
            'extracted': 0,
            'loaded': 0,
            'errors': 0,
            'start_time': start_time,
            'end_time': None,
            'duration_seconds': 0
        }

        try:
            # Extract, transform, load in batches
            for batch in self.extractor.extract_collection(
                collection_name,
                batch_size=self.config.get('batch_size', 1000)
            ):
                # Transform
                transformed = self.transformer.transform_batch(batch)

                # Validate
                valid_batch = [
                    row for row in transformed
                    if self.transformer.validate_document(
                        row,
                        self.config['schema_mapping'][collection_name]
                    )
                ]

                # Load
                loaded = self.loader.load_batch(target_table, valid_batch)

                stats['extracted'] += len(batch)
                stats['loaded'] += loaded
                stats['errors'] += len(batch) - len(valid_batch)

        except Exception as e:
            logger.error(f"Migration failed for {collection_name}: {e}")
            stats['error'] = str(e)

        stats['end_time'] = datetime.now()
        stats['duration_seconds'] = (stats['end_time'] - stats['start_time']).total_seconds()

        logger.info(
            f"Migration complete: {stats['loaded']:,} rows "
            f"in {stats['duration_seconds']:.1f}s"
        )

        return stats

    def migrate_all(self):
        """Migrate all collections based on config"""
        results = []

        for collection, table in self.config['collections'].items():
            result = self.migrate_collection(collection, table)
            results.append(result)

        return results

# Usage
if __name__ == '__main__':
    config = {
        'mongo_connection_string': 'mongodb://localhost:27017',
        'mongo_database': 'knowledge_base',
        'postgres_connection_string': 'postgresql://user:pass@localhost:5432/kb',
        'batch_size': 5000,
        'collections': {
            'documents': 'documents',
            'users': 'users',
            'categories': 'categories',
            'tags': 'tags'
        },
        'schema_mapping': {
            'documents': {
                'required': ['title', 'content'],
                'optional': ['metadata', 'status']
            }
        }
    }

    orchestrator = MigrationOrchestrator(config)
    results = orchestrator.migrate_all()

    # Print summary
    print("\n=== Migration Summary ===")
    for result in results:
        print(f"{result['collection']:20} -> {result['loaded']:>10,} rows "
              f"({result['duration_seconds']:.1f}s)")

pgvector Setup and Configuration

Installation

# Install pgvector extension
# Ubuntu/Debian
sudo apt-get install postgresql-16-pgvector

# macOS (Homebrew)
brew install pgvector

# Or build from source
git clone --branch v0.5.0 https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install

Database Setup

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Verify installation
SELECT * FROM pg_extension WHERE extname = 'vector';

-- Add vector column
ALTER TABLE documents ADD COLUMN embedding vector(1536);

-- Create index (IVFFlat for approximate search)
CREATE INDEX ON documents
  USING ivfflat(embedding vector_cosine_ops)
  WITH (lists = 100);

-- For small datasets (<100K rows), exact search is fine
-- No index needed, but sort by distance

-- For large datasets, use HNSW for better performance
CREATE INDEX ON documents
  USING hnsw(embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

Vector Generation

# embeddings/generate.py
import openai
import psycopg
from typing import List
import numpy as np

class EmbeddingGenerator:
    def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        # text-embedding-3-large produces 3072 dimensions; -small produces 1536
        self.dimensions = 3072 if "3-large" in model else 1536

    def generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for a single text"""
        response = self.client.embeddings.create(
            model=self.model,
            input=text
        )
        return response.data[0].embedding

    def generate_batch(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Generate embeddings for multiple texts efficiently"""
        embeddings = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            embeddings.extend([item.embedding for item in response.data])

        return embeddings

    def update_document_embeddings(self, pg_conn, doc_ids: List[str]):
        """Generate and store embeddings for documents"""
        # Fetch documents
        with pg_conn.cursor() as cur:
            cur.execute("""
                SELECT id, title, content
                FROM documents
                WHERE id = ANY(%s)
                  AND embedding IS NULL
            """, (doc_ids,))

            documents = cur.fetchall()

        if not documents:
            return

        # Prepare texts for embedding (title and body separated by a blank line)
        texts = [
            f"{title}\n\n{content}"
            for _, title, content in documents
        ]

        # Generate embeddings
        embeddings = self.generate_batch(texts)

        # Update database
        with pg_conn.cursor() as cur:
            for (doc_id, _, _), embedding in zip(documents, embeddings):
                # Convert to PostgreSQL vector format
                vector_str = f"[{','.join(map(str, embedding))}]"
                cur.execute("""
                    UPDATE documents
                    SET embedding = %s::vector
                    WHERE id = %s
                """, (vector_str, doc_id))

        pg_conn.commit()
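pgvector exchanges vectors as bracketed text literals like '[0.1,0.2,0.3]'. The official pgvector Python package ships proper psycopg adapters; these two stdlib-only helpers (names are ours) just show the format for scripts that roll their own:

```python
from typing import List

def to_pgvector(values: List[float]) -> str:
    """Format a Python list as a pgvector text literal."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"

def from_pgvector(literal: str) -> List[float]:
    """Parse a pgvector text literal back into a list of floats."""
    return [float(x) for x in literal.strip("[]").split(",")]

print(to_pgvector([0.1, 0.2, 0.3]))  # [0.1,0.2,0.3]
```

Pass the formatted string as a query parameter with a ::vector cast, as the UPDATE above does.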

Similarity Search Queries

-- Cosine similarity search (most common for text embeddings)
SELECT
  id,
  title,
  1 - (embedding <=> '[0.1,0.2,...]') as similarity
FROM documents
ORDER BY embedding <=> '[0.1,0.2,...]'
LIMIT 10;

-- Euclidean distance (L2): smaller distance means closer
SELECT
  id,
  title,
  embedding <-> '[0.1,0.2,...]' as distance
FROM documents
ORDER BY embedding <-> '[0.1,0.2,...]'
LIMIT 10;

-- Inner product (for normalized vectors); <#> returns the negative
-- inner product, so negate it to get a similarity score
SELECT
  id,
  title,
  -(embedding <#> '[0.1,0.2,...]') as similarity
FROM documents
ORDER BY embedding <#> '[0.1,0.2,...]'
LIMIT 10;

-- Hybrid search: vector similarity + metadata filters
SELECT
  d.id,
  d.title,
  d.content,
  1 - (d.embedding <=> $1) as similarity
FROM documents d
JOIN document_tags dt ON d.id = dt.document_id
JOIN tags t ON dt.tag_id = t.id
WHERE d.status = 'published'
  AND t.name IN ('AI', 'Machine Learning')
  AND d.published_at > NOW() - INTERVAL '1 year'
ORDER BY d.embedding <=> $1
LIMIT 20;

-- Vector search with reranking
WITH vector_search AS (
  SELECT
    id,
    title,
    1 - (embedding <=> $1) as vector_similarity
  FROM documents
  ORDER BY embedding <=> $1
  LIMIT 100
),
text_search AS (
  -- assumes a stored tsvector column (e.g. maintained by a trigger);
  -- alternatively, compute to_tsvector('english', content) inline
  SELECT
    id,
    ts_rank(text_search_vector, query) as text_similarity
  FROM documents,
       to_tsquery('english', $2) query
  WHERE text_search_vector @@ query
  LIMIT 100
)
SELECT
  v.id,
  v.title,
  (v.vector_similarity * 0.7 + COALESCE(t.text_similarity, 0) * 0.3) as combined_score
FROM vector_search v
LEFT JOIN text_search t ON v.id = t.id
ORDER BY combined_score DESC
LIMIT 20;
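One caveat with the fixed 0.7/0.3 weights above: ts_rank and cosine similarity live on different scales, so one signal can swamp the other. A client-side sketch that min-max normalizes each result set before fusing (function names and weights are illustrative):

```python
from typing import Dict

def normalize(scores: Dict[str, float]) -> Dict[str, float]:
    """Min-max normalize a {doc_id: score} mapping into [0, 1]."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    span = (hi - lo) or 1.0  # avoid division by zero when all scores are equal
    return {k: (v - lo) / span for k, v in scores.items()}

def combine(vector_scores, text_scores, w_vec=0.7, w_text=0.3):
    """Weighted fusion of normalized vector and text scores, best first."""
    v, t = normalize(vector_scores), normalize(text_scores)
    ids = set(v) | set(t)
    return sorted(
        ((doc_id, w_vec * v.get(doc_id, 0.0) + w_text * t.get(doc_id, 0.0))
         for doc_id in ids),
        key=lambda pair: pair[1],
        reverse=True,
    )

print(combine({"a": 0.9, "b": 0.5}, {"b": 12.0, "c": 3.0}))  # [('a', 0.7), ('b', 0.3), ('c', 0.0)]
```

The same normalization can be done in SQL with window functions; doing it client-side keeps the query simple while you experiment with weights.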

Data Validation Strategy

Record Count Validation

# validation/validate_counts.py
import pymongo
import psycopg

def validate_record_counts(mongo_uri: str, pg_uri: str, db_name: str):
    """Validate that record counts match between MongoDB and PostgreSQL"""

    # Connect to both databases
    mongo_client = pymongo.MongoClient(mongo_uri)
    mongo_db = mongo_client[db_name]

    pg_conn = psycopg.connect(pg_uri)

    # Get MongoDB collection counts
    mongo_counts = {}
    for collection_name in mongo_db.list_collection_names():
        count = mongo_db[collection_name].estimated_document_count()
        mongo_counts[collection_name] = count

    # Get PostgreSQL table counts
    pg_counts = {}
    with pg_conn.cursor() as cur:
        for collection in mongo_counts.keys():
            table_name = collection  # Assuming 1:1 mapping
            cur.execute(f"SELECT COUNT(*) FROM {table_name}")
            pg_counts[table_name] = cur.fetchone()[0]

    # Compare
    mismatches = []
    for name in mongo_counts.keys():
        if mongo_counts[name] != pg_counts.get(name, 0):
            mismatches.append({
                'name': name,
                'mongo_count': mongo_counts[name],
                'pg_count': pg_counts.get(name, 0),
                'difference': mongo_counts[name] - pg_counts.get(name, 0)
            })

    if mismatches:
        print("⚠️  Record count mismatches found:")
        for m in mismatches:
            print(f"  {m['name']}: {m['mongo_count']} (Mongo) vs {m['pg_count']} (PG) "
                  f"(diff: {m['difference']})")
    else:
        print("✅ All record counts match!")

    return len(mismatches) == 0

Data Integrity Validation

# validation/validate_data.py
import random
import pymongo
import psycopg
from psycopg.rows import dict_row

def validate_sample_data(mongo_uri: str, pg_uri: str, db_name: str, sample_size: int = 100):
    """Validate random sample records match between databases"""

    mongo_client = pymongo.MongoClient(mongo_uri)
    mongo_db = mongo_client[db_name]
    # dict_row makes fetchone() return dicts, matching validate_document's signature
    pg_conn = psycopg.connect(pg_uri, row_factory=dict_row)

    # Sample random collection
    collection_name = random.choice(mongo_db.list_collection_names())

    # Get random sample from MongoDB
    mongo_samples = list(mongo_db[collection_name].aggregate([
        {'$sample': {'size': sample_size}}
    ]))

    # Fetch corresponding records from PostgreSQL
    with pg_conn.cursor() as cur:
        for mongo_doc in mongo_samples:
            doc_id = str(mongo_doc['_id'])
            cur.execute(f"SELECT * FROM {collection_name} WHERE id = %s", (doc_id,))
            pg_row = cur.fetchone()

            if not pg_row:
                print(f"❌ Document {doc_id} not found in PostgreSQL")
                continue

            # Validate fields
            validate_document(mongo_doc, pg_row)

    print("βœ… Sample validation complete")

def validate_document(mongo_doc: dict, pg_row: dict):
    """Compare shared fields, tolerating the _id -> id rename"""
    for field, mongo_value in mongo_doc.items():
        column = 'id' if field == '_id' else field
        # String-compare as a first pass; tighten per-type (dates, decimals) as needed
        if column in pg_row and str(pg_row[column]) != str(mongo_value):
            print(f"❌ Mismatch on {column}: {mongo_value!r} (Mongo) vs {pg_row[column]!r} (PG)")

Performance Validation

# validation/validate_performance.py
import time
from typing import List

import psycopg

def benchmark_vector_search(pg_uri: str, query_vector: List[float]):
    """Benchmark vector search performance"""

    conn = psycopg.connect(pg_uri)
    vec = str(query_vector)  # pgvector accepts the '[0.1, 0.2, ...]' string literal

    # Warm-up (fills caches so the benchmark measures steady-state latency)
    with conn.cursor() as cur:
        for _ in range(10):
            cur.execute("""
                SELECT id FROM documents
                ORDER BY embedding <=> %s::vector
                LIMIT 10
            """, (vec,))
            cur.fetchall()

    # Benchmark
    times = []
    for _ in range(100):
        start = time.time()
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, title,
                       1 - (embedding <=> %s::vector) as similarity
                FROM documents
                ORDER BY embedding <=> %s::vector
                LIMIT 10
            """, (vec, vec))
            cur.fetchall()
        times.append((time.time() - start) * 1000)  # ms

    # Statistics (index N-1 approximates the Nth percentile over 100 samples)
    times_sorted = sorted(times)
    print("Vector search performance (100 iterations):")
    print(f"  Mean: {sum(times)/len(times):.2f}ms")
    print(f"  P50: {times_sorted[49]:.2f}ms")
    print(f"  P95: {times_sorted[94]:.2f}ms")
    print(f"  P99: {times_sorted[98]:.2f}ms")

The Cutover Plan

Phase 1: Dual-Write Period (1 Week)

# middleware/dual_write.py
from bson import ObjectId
from pymongo import MongoClient
import psycopg

class DualWriteMiddleware:
    """
    Write to both MongoDB and PostgreSQL during transition period
    """
    def __init__(self, mongo_uri: str, pg_uri: str):
        self.mongo_client = MongoClient(mongo_uri)
        self.pg_conn = psycopg.connect(pg_uri)

    def insert_document(self, collection: str, data: dict):
        """Insert into both databases"""
        # MongoDB
        mongo_result = self.mongo_client.knowledge_base[collection].insert_one(data)

        # PostgreSQL (convert_to_postgres is our ETL document-to-row converter)
        pg_data = convert_to_postgres(data)
        with self.pg_conn.cursor() as cur:
            cur.execute(f"""
                INSERT INTO {collection} (...)
                VALUES (...)
            """, pg_data)
        self.pg_conn.commit()

        return mongo_result.inserted_id

    def update_document(self, collection: str, doc_id: str, update: dict):
        """Update in both databases"""
        # MongoDB
        self.mongo_client.knowledge_base[collection].update_one(
            {'_id': ObjectId(doc_id)},
            {'$set': update}
        )

        # PostgreSQL -- build the SET clause from the update dict
        set_clause = ", ".join(f"{col} = %s" for col in update)
        with self.pg_conn.cursor() as cur:
            cur.execute(
                f"UPDATE {collection} SET {set_clause} WHERE id = %s",
                (*update.values(), doc_id)
            )
        self.pg_conn.commit()
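The INSERT in `insert_document` leaves its column list as `(...)`. A minimal sketch of how that list can be built from the converted document -- a hypothetical helper, assuming a flat dict whose keys are trusted column names from our own schema mapping, not user input:

```python
def build_insert(table: str, data: dict) -> tuple[str, list]:
    """Build a parameterized INSERT from a flat dict of column -> value.

    Identifiers (table, column names) come from our schema mapping and are
    trusted; only the values are passed as bind parameters.
    """
    columns = list(data.keys())
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})'
    return sql, [data[c] for c in columns]
```

The returned SQL and parameter list plug straight into `cur.execute(sql, params)`, keeping values out of the SQL string.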

Phase 2: Read-Only MongoDB (3 Days)

# Application configuration change
# config.py
DATABASE_BACKEND = 'postgresql'  # Change from 'mongodb'
MONGODB_READONLY = True  # MongoDB for fallback only

# middleware/read_adapter.py
from bson import ObjectId

class ReadAdapter:
    def __init__(self, pg_conn, mongo_client):
        self.pg_conn = pg_conn
        self.mongo_client = mongo_client  # Fallback

    def get_document(self, doc_id: str):
        """Read from PostgreSQL, fall back to MongoDB"""
        try:
            with self.pg_conn.cursor() as cur:
                cur.execute("SELECT * FROM documents WHERE id = %s", (doc_id,))
                row = cur.fetchone()
                if row is not None:
                    return row
        except Exception:
            pass  # fall through to the MongoDB fallback below

        # Fallback covers both query errors and rows not yet migrated
        doc = self.mongo_client.knowledge_base.documents.find_one({
            '_id': ObjectId(doc_id)
        })
        if doc is not None:
            self.sync_to_postgres(doc)  # backfill so the next read hits PostgreSQL
        return doc

Phase 3: PostgreSQL Only (Final)

# Remove MongoDB dependencies
# Update all database calls to use PostgreSQL only

Post-Migration Optimization

Query Optimization

-- Analyze query performance
EXPLAIN ANALYZE
SELECT *
FROM documents
WHERE status = 'published'
ORDER BY embedding <=> '[0.1,0.2,...]'
LIMIT 10;

-- Update statistics
ANALYZE documents;

-- Vacuum and reindex
VACUUM ANALYZE documents;
REINDEX TABLE documents;

-- Check for bloat
SELECT
  schemaname,
  tablename,
  pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
  pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) AS index_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

Partitioning for Large Tables

-- Partition documents by date (for very large datasets)
CREATE TABLE documents (
  id UUID,
  title VARCHAR(500),
  content TEXT,
  -- ... other columns
  created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Create partitions
CREATE TABLE documents_2025_q1 PARTITION OF documents
  FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

CREATE TABLE documents_2025_q2 PARTITION OF documents
  FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

-- Automatically create future partitions
CREATE OR REPLACE FUNCTION create_partitions()
RETURNS void AS $$
DECLARE
  start_date DATE := date_trunc('quarter', CURRENT_DATE + INTERVAL '3 months');
  end_date DATE := start_date + INTERVAL '3 months';
  partition_name TEXT := 'documents_' || to_char(start_date, 'YYYY_"q"Q');
BEGIN
  EXECUTE format(
    'CREATE TABLE IF NOT EXISTS %I PARTITION OF documents FOR VALUES FROM (%L) TO (%L)',
    partition_name, start_date, end_date
  );
END;
$$ LANGUAGE plpgsql;

Query Comparison: MongoDB vs PostgreSQL

MongoDB (requires Atlas Vector Search):

// Requires Atlas, not available in self-hosted
const results = await db.collection('documents').aggregate([
  {
    "$vectorSearch": {
      "index": "vector_index",
      "path": "embedding",
      "queryVector": embedding,
      "numCandidates": 100,
      "limit": 10
    }
  },
  {
    "$project": {
      "title": 1,
      "content": 1,
      "score": { "$meta": "vectorSearchScore" }
    }
  }
]).toArray();
// Performance: ~200ms for 100K documents

PostgreSQL + pgvector:

SELECT
  id,
  title,
  content,
  1 - (embedding <=> $1::vector) as similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;
-- Performance: ~15ms for 100K documents (13x faster)

Complex Joins

MongoDB:

// Requires $lookup (slow, memory-intensive)
const results = await db.collection('documents').aggregate([
  { "$match": { "status": "published" } },
  {
    "$lookup": {
      "from": "users",
      "localField": "author_id",
      "foreignField": "_id",
      "as": "author"
    }
  },
  {
    "$lookup": {
      "from": "categories",
      "localField": "category_id",
      "foreignField": "_id",
      "as": "category"
    }
  },
  { "$unwind": "$author" },
  { "$unwind": "$category" }
]).toArray();
// Performance: ~850ms for 1000 documents

PostgreSQL:

SELECT
  d.id,
  d.title,
  u.name as author_name,
  c.name as category_name
FROM documents d
JOIN users u ON d.author_id = u.id
JOIN categories c ON d.category_id = c.id
WHERE d.status = 'published'
LIMIT 1000;
-- Performance: ~45ms (19x faster)

Aggregation

MongoDB:

const stats = await db.collection('documents').aggregate([
  {
    "$group": {
      "_id": "$category",
      "count": { "$sum": 1 },
      "avg_words": { "$avg": "$metadata.word_count" }
    }
  }
]).toArray();
// Performance: ~320ms for 100K documents

PostgreSQL:

SELECT
  c.name as category,
  COUNT(*) as count,
  AVG(d.word_count) as avg_words
FROM documents d
JOIN categories c ON d.category_id = c.id
GROUP BY c.name;
-- Performance: ~35ms (9x faster)

Lessons Learned

Mistakes We Made

1. Insufficient Testing of Edge Cases

  • Problem: Special characters in JSONB caused issues
  • Fix: Comprehensive validation before migration

2. Underestimated Embedding Generation Time

  • Problem: Generating 1M embeddings took 72 hours
  • Fix: Parallel processing with batch API calls

3. Index Build Blocked Production

  • Problem: Building IVFFlat index locked tables
  • Fix: Use CREATE INDEX CONCURRENTLY

4. Memory Issues During ETL

  • Problem: Batch size too large caused OOM errors
  • Fix: Dynamic batch sizing based on memory usage
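The dynamic batch sizing fix from mistake #4 boils down to a small feedback loop: shrink the batch when memory pressure is high, grow it back when there is headroom. A sketch with illustrative thresholds (the 80%/50% cutoffs and size bounds are assumptions to tune for your workload):

```python
def next_batch_size(current: int, mem_used_frac: float,
                    min_size: int = 100, max_size: int = 10_000) -> int:
    """Adjust the ETL batch size based on observed memory usage.

    mem_used_frac is the fraction of available memory in use (0.0-1.0),
    e.g. from psutil.virtual_memory().percent / 100 between batches.
    """
    if mem_used_frac > 0.8:          # under pressure: halve the batch
        return max(min_size, current // 2)
    if mem_used_frac < 0.5:          # plenty of headroom: grow it back
        return min(max_size, current * 2)
    return current                   # comfortable zone: hold steady
```

Calling this between batches lets the loader recover throughput automatically after a memory spike instead of crashing with an OOM.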

Success Factors

1. Incremental Migration

  • Started with read-only replica
  • Gradually shifted traffic
  • Maintained rollback option

2. Comprehensive Monitoring

  • Real-time validation checks
  • Performance metrics tracking
  • Automated alerting

3. Team Training

  • PostgreSQL training for MongoDB-focused team
  • Pair programming sessions
  • Documentation and runbooks

4. Clear Communication

  • Weekly migration updates
  • Stakeholder dashboards
  • Transparent risk reporting

Migration Checklist

Pre-Migration

  • [ ] Schema analysis and mapping completed
  • [ ] ETL scripts written and tested
  • [ ] Performance baselines recorded
  • [ ] Backup strategy documented
  • [ ] Rollback plan defined
  • [ ] Team training completed

Migration Day

  • [ ] Full backup taken
  • [ ] Read-only mode enabled on MongoDB
  • [ ] Final data sync completed
  • [ ] Data validation passed
  • [ ] Smoke tests executed
  • [ ] Monitoring alerts configured

Post-Migration

  • [ ] Performance benchmarks met
  • [ ] Error rates normal
  • [ ] User acceptance testing passed
  • [ ] Documentation updated
  • [ ] Team debrief conducted
  • [ ] Success criteria validated

Conclusion

Migrating from MongoDB to PostgreSQL with pgvector was a significant undertaking, but the results were transformative:

  • 81% cost reduction ($4,200 β†’ $800/month)
  • 10x faster vector search (450ms β†’ 45ms)
  • Unified architecture (3 databases β†’ 1)
  • Advanced analytics (window functions, CTEs, materialized views)
  • Strong consistency (ACID transactions)

The key to success was thorough planning, incremental migration, and comprehensive validation. PostgreSQL's maturity and pgvector's capabilities made it the ideal choice for our AI-powered knowledge management platform.

If you're considering a similar migration, start small, validate often, and don't underestimate the importance of team training. The investment pays dividends in performance, cost, and developer productivity.

Frequently Asked Questions

Why migrate from MongoDB to PostgreSQL?

The primary drivers are ACID compliance, advanced analytics capabilities, and cost reduction. PostgreSQL supports window functions, CTEs, and materialized views that MongoDB cannot match, enabling complex analytical queries without a separate data warehouse. Organizations that migrate report storage cost reductions of 70-85% and significant improvements in query performance for relational data patterns.

What is pgvector and why is it used after migrating from MongoDB?

pgvector is a PostgreSQL extension that adds native vector storage and similarity search capabilities, enabling semantic search and AI-powered retrieval directly within the database. After migrating from MongoDB, teams can store document embeddings alongside structured data in a single PostgreSQL instance, eliminating the need for a separate vector database like Pinecone. Instacart's production migration to pgvector achieved 80% cost savings versus their previous search infrastructure.

How do you handle MongoDB's flexible schema during PostgreSQL migration?

MongoDB's schema flexibility is addressed through PostgreSQL's JSONB column type, which stores variable or nested JSON data with full indexing support. Fields with consistent structure are migrated to typed columns for performance and query simplicity, while genuinely variable nested data is retained in JSONB. A data profiling step before migration identifies which fields have consistent types (suitable for columns) versus which are genuinely polymorphic (suitable for JSONB).
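The profiling step described above can be as simple as sampling documents and tallying the types seen per field; fields with a single consistent type become columns, the rest stay in JSONB. A minimal sketch (top-level fields only; nested paths would need a recursive walk):

```python
from collections import defaultdict

def profile_field_types(docs: list[dict]) -> dict[str, set[str]]:
    """Tally the Python types observed for each top-level field in a sample.

    Fields that map to a single type are candidates for typed columns;
    fields with multiple types are candidates for a JSONB column.
    """
    types: dict[str, set[str]] = defaultdict(set)
    for doc in docs:
        for field, value in doc.items():
            types[field].add(type(value).__name__)
    return dict(types)
```

Feed it the output of a `$sample` aggregation (a few thousand documents is usually enough) rather than the full collection.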

What is the typical downtime during a MongoDB to PostgreSQL migration?

A well-planned incremental migration can achieve near-zero downtime by running both databases in parallel during the transition period. The dual-write phase synchronizes writes to both MongoDB and PostgreSQL while reads gradually shift to PostgreSQL. Final cutover (switching all reads to PostgreSQL) typically requires only 5-15 minutes of maintenance window. Large migrations with 100GB+ of data require careful bulk export and import strategies to keep this window minimal.

How do you migrate MongoDB indexes to PostgreSQL?

MongoDB compound indexes map to PostgreSQL multi-column B-tree indexes, text indexes map to PostgreSQL full-text search indexes (using GIN), and geospatial indexes map to PostGIS. Vector indexes from Atlas Vector Search migrate to pgvector's IVFFlat or HNSW index types. A critical step is profiling the most frequent queries in MongoDB and ensuring equivalent PostgreSQL indexes exist before cutover to prevent performance regressions.
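The compound-index mapping is mechanical enough to script. A hypothetical translator for the plain B-tree case (it deliberately ignores text, geospatial, and vector indexes, which need the hand-mapping described above):

```python
def mongo_index_to_sql(table: str, keys: list[tuple[str, int]]) -> str:
    """Translate a MongoDB compound index spec to a CREATE INDEX statement.

    keys mirrors the Mongo spec: [("status", 1), ("created_at", -1)]
    where 1 is ascending and -1 is descending. Assumes field names were
    migrated to identically named columns.
    """
    cols = ", ".join(
        f"{name} {'ASC' if direction == 1 else 'DESC'}"
        for name, direction in keys
    )
    return f"CREATE INDEX ON {table} ({cols})"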

What tools are used for MongoDB to PostgreSQL data migration?

The most common migration stack is: MongoDB's mongoexport for data extraction, custom Python or Node.js ETL scripts for schema transformation and type mapping, and PostgreSQL's COPY command for bulk loading. For ongoing dual-write synchronization, tools like Debezium (change data capture) or AWS DMS handle real-time replication between the two databases during the transition window. Always validate row counts and checksums after each migration batch before proceeding.
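The checksum validation mentioned above needs to be order-independent, since mongoexport and a PostgreSQL SELECT will not return rows in the same order. One sketch: sort each batch by id and hash its canonical JSON (assumes every row carries an `id` field; `default=str` coerces ObjectId/UUID/datetime values):

```python
import hashlib
import json

def batch_checksum(rows: list[dict]) -> str:
    """Order-independent checksum for a batch of rows.

    Rows are sorted by id and serialized to canonical JSON (sorted keys),
    so the same logical batch hashes identically regardless of which
    database produced it or in what order rows arrived.
    """
    canonical = json.dumps(
        sorted(rows, key=lambda r: str(r["id"])),
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(canonical.encode()).hexdigest()
```

Compare the MongoDB-side and PostgreSQL-side digests per batch; any mismatch pinpoints the batch to re-examine.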


Need Help with Your Database Migration?

At Groovy Web, we specialize in database migrations, PostgreSQL optimization, and AI infrastructure. Whether you're moving from NoSQL to SQL, adding vector search capabilities, or optimizing performance, we can help.

Schedule a free consultation to discuss your migration project.



Published: January 29, 2026 | Author: Groovy Web Team | Category: Technical Deep-Dive
