Improving Query Performance via PostgreSQL Table Partitioning – Part II


Welcome to Part II of the series exploring PostgreSQL partitioning. If you haven’t already checked out Part I, it covers prerequisite information that helps you decide whether partitioning is the right solution for you, as it was for me.

Table of Contents

  1. Key Information To Know Before Starting
    1. Partitioned Table Characteristics
    2. pg_partman
      1. Without pg_partman:
      2. With pg_partman:
    3. pg_cron
  2. Data Migration Plan
    1. Here is the plan:
    2. My Tech stacks are:
    3. Pre-requisites: Install Extensions
    4. Step 1 – Step 4:
      1. Step 1 – Create Extensions In DB:
      2. Generate Migration files
      3. Steps 2-3 – Precutover
      4. Steps 4 – Cutover
    5. Next Steps:
  3. References:

Key Information To Know Before Starting

Partitioned Table Characteristics

  1. Which column will be used for partitioning?
    • timestamp. Example ‘2025-05-05 00:00:00’
  2. Which type of partitioning?
    • RANGE – because the column is time-series based.
  3. Declarative or Inheritance?
    • Declarative
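
To make these three choices concrete, here is a minimal sketch that builds the DDL for a declarative, RANGE-partitioned table with one monthly partition. The table name, the two-column layout, and the helper names (month_bounds, parent_ddl, child_ddl) are assumptions for illustration; pg_partman, introduced next, generates the child tables for you.

```python
from datetime import date


def month_bounds(year: int, month: int) -> tuple[date, date]:
    """Return the [start, end) bounds of one calendar month."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start, end


def parent_ddl(table: str, key: str) -> str:
    """DDL for a declarative parent table partitioned by RANGE on `key`.

    The columns are illustrative; note that the primary key includes the partition key.
    """
    return (
        f"CREATE TABLE {table} (id bigint NOT NULL, {key} timestamp NOT NULL, "
        f"PRIMARY KEY (id, {key})) PARTITION BY RANGE ({key});"
    )


def child_ddl(table: str, year: int, month: int) -> str:
    """DDL for one monthly partition; the upper bound is exclusive."""
    start, end = month_bounds(year, month)
    return (
        f"CREATE TABLE {table}_p{start:%Y%m} PARTITION OF {table} "
        f"FOR VALUES FROM ('{start}') TO ('{end}');"
    )


if __name__ == "__main__":
    print(parent_ddl("transactions", "timestamp"))
    print(child_ddl("transactions", 2025, 5))
```

A row with timestamp ‘2025-05-05 00:00:00’ would land in the May 2025 partition, because the lower bound is inclusive and the upper bound is exclusive.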

In addition, I would like to introduce two commonly used extensions. Together they automate creating new partitions and removing old ones, so no manual intervention is needed later.

pg_partman

Think of pg_partman as an automated filing system for your database.

Imagine you have millions of documents coming into your office every month. Instead of dumping them all in one giant filing cabinet (making it impossible to find anything quickly), you want to organize them into monthly folders.

pg_partman is like hiring an assistant who:

  • Creates new monthly folders before you need them
  • Labels each folder with the correct date range
  • Removes old folders you no longer need (based on your retention policy)
  • Maintains the filing system without you having to think about it

Without pg_partman:

“Boss, we need a June 2025 folder!”
“Oh no, I forgot to create it… now all June documents are in the ‘miscellaneous’ pile!”

With pg_partman:

“Boss, June 2025 folder is already ready – I created it 12 months ago!”
“Perfect, I don’t even have to think about it.”

pg_cron

pg_cron is like a scheduler or alarm clock for your database. It runs tasks automatically at times you specify.

Think of it like:

  • Setting your coffee maker to brew at 6 AM every day
  • Scheduling your sprinklers to water the lawn every evening
  • Having your bills auto-pay on the 1st of each month

When combined, these two extensions “schedule” the “assistant” to create new partitions and remove old ones. No need to log in a year later, when another team has taken over the project and calls you about a P1 production issue.
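
As a rough sketch of what that combination looks like, the function below builds a pg_cron call that runs pg_partman's maintenance procedure on a schedule. It assumes pg_partman is installed in a schema named partman and that your pg_cron version supports named jobs; the function name and job name are made up for illustration, and this is not necessarily how the helper shown later does it.

```python
def schedule_maintenance_sql(job_name: str, cron_expr: str = "0 0 * * *") -> str:
    """Build a SQL statement that schedules pg_partman maintenance via pg_cron.

    cron_expr uses the standard five-field cron format (minute hour day month weekday).
    """
    if len(cron_expr.split()) != 5:
        raise ValueError(f"expected 5 cron fields, got: {cron_expr!r}")
    return (
        f"SELECT cron.schedule('{job_name}', '{cron_expr}', "
        "$$CALL partman.run_maintenance_proc()$$);"
    )


if __name__ == "__main__":
    # Daily at midnight: create upcoming partitions and apply retention.
    print(schedule_maintenance_sql("partman-maintenance"))
```

Running the generated statement once is enough; pg_cron stores the job and runs it on every matching tick afterwards.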

Data Migration Plan

If you’re in a typical tech department:

  • Oh crap! I didn’t create this table as a partitioned table the first time. Now I need to migrate the data to the partitioned table.
  • Oh no, there is live, business-impacting activity using the table 24/7. I need to migrate this data while the business is running!

Well fear not. I have just the solution for you! (Because I went through the same thing).

Just a disclaimer: if your architecture does not allow any downtime (the cutover below needs about 5 minutes), you may need additional considerations beyond the ones I provide below. In my case, the application allows temporary downtime during off-hours for the cutover.

Here is the plan:

  1. Enable the pg_partman and pg_cron extensions in your PostgreSQL cluster. (Downtime 1 minute)
    • At my company, I used the Amazon Aurora PostgreSQL managed service (RDS). This involves setting the shared_preload_libraries parameter after creating a DB parameter group.
    • pg_partman library was automatically added for me. But for pg_cron, you still need to add it.
    • If you’re using terraform, here is a great module.
    • You may need to restart the database instance as it requires loading the library. This is the 1 minute downtime.
  2. Create the new partitioned table, migration functions, and sync triggers. (No downtime)
    • The new table is named <schema>.<your old table name>_partitioned. Ex: transactions –> transactions_partitioned
    • Sync triggers copy new writes into the partitioned table while data keeps arriving in real time.
    • A manual migration function is available in case you want to migrate everything in one shot. (Much faster)
    • A migration status table shows what data has been migrated.
  3. Precutover: Let the job run in the background to migrate existing data to the new table. (No downtime)
  4. Cutover: Switch to the new partitioned table. (Downtime 5 minutes)
    • Pause all queues/jobs related to inserting into the database. This is the 5 minute downtime.
    • Rename the old non-partitioned table transactions –> transactions_old, in case you want to revert or something goes wrong. This has to happen first so the original name is free.
    • Rename the partitioned table transactions_partitioned –> transactions.
    • Drop the sync triggers and the cron job that migrated existing data.
    • Resume all queues/jobs related to inserting into the database.
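
The rename part of the cutover can be sketched as a short list of SQL statements run in a single transaction. This is a simplified illustration under assumptions, not the helper's actual perform_cutover: the function name is hypothetical, and it assumes pg_partman's configuration lives in partman.part_config. The old table has to be renamed first, otherwise the original name is still taken.

```python
def cutover_statements(schema: str, table: str) -> list[str]:
    """Return the SQL statements for the table swap, in execution order."""
    return [
        "BEGIN;",
        # Free up the original name first.
        f"ALTER TABLE {schema}.{table} RENAME TO {table}_old;",
        # The partitioned table takes over the original name.
        f"ALTER TABLE {schema}.{table}_partitioned RENAME TO {table};",
        # Keep pg_partman pointed at the renamed parent table.
        f"UPDATE partman.part_config SET parent_table = '{schema}.{table}' "
        f"WHERE parent_table = '{schema}.{table}_partitioned';",
        "COMMIT;",
    ]


if __name__ == "__main__":
    for statement in cutover_statements("public", "transactions"):
        print(statement)
```

Because both renames commit together, readers of the table never see a moment where the original name is missing.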

Looking at the activities, one may think “wow, that is a lot of steps”. In my tech stack, I have this completely automated. I’ll share the code snippets here, and I’ll go over the steps so you can recreate them in another framework or language if you’re not using the same stack. Nowadays, you could probably just ask ChatGPT to translate it.

My Tech stacks are:

  • Terraformed Amazon Aurora RDS Postgresql
  • Python 3.12+
  • Alembic for database migrations
  • SQLAlchemy for database modeling/ORM
  • Poetry for environment managing (and scripts organization)
  • pg_partman version 5.1.0
  • pg_cron version 1.6.2
  • PostgreSQL 16+

Pre-requisites: Install Extensions

Extension code is not installed into databases by default. To use an extension, we first need to get it installed on the server, and then run the “CREATE EXTENSION” SQL command in the database.

  • As I mentioned earlier, pg_partman is already installed, but we need to explicitly add pg_cron.
terraform
module "aurora_postgres_cluster" {
  depends_on = [module.vpc]
  for_each = var.aurora_postgres
  source   = "terraform-aws-modules/rds-aurora/aws"
  version  = "9.9.1"
  #   https://github.com/terraform-aws-modules/terraform-aws-rds-aurora
  ## other inputs skipped
  create_db_parameter_group = true
  db_parameter_group_family = "aurora-postgresql16"
  db_parameter_group_parameters = lookup(each.value, "db_parameter_group_parameters", [
    {
      name         = "shared_preload_libraries"
      value        = "pg_stat_statements,pg_cron"
      apply_method = "pending-reboot"
    }
  ])
}

Upon deploying, you’ll need to reboot the database instance manually. THIS IS A DOWNTIME ACTIVITY. So be sure to pause any jobs and communicate to stakeholders.
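
After the reboot, running SHOW shared_preload_libraries; in the database returns the loaded libraries as a comma-separated string. A small check like the one below (the function name is hypothetical) can confirm pg_cron made it in before you move on to creating the extensions.

```python
def missing_preload_libraries(show_output: str, required=("pg_cron",)) -> list[str]:
    """Return the required libraries absent from SHOW shared_preload_libraries output."""
    loaded = {name.strip() for name in show_output.split(",") if name.strip()}
    return [lib for lib in required if lib not in loaded]


if __name__ == "__main__":
    # Paste the value returned by: SHOW shared_preload_libraries;
    missing = missing_preload_libraries("pg_stat_statements,pg_cron")
    print("All required libraries loaded" if not missing else f"Missing: {missing}")
```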

Step 1 – Step 4:

The rest of the steps only involve creating migration files and running them with alembic upgrade (or alembic downgrade to revert).

Step 1 – Create Extensions In DB:

Create the file below in your alembic repository folder. This is a one-time step, assuming the extensions have never been enabled in the database.

alembic/versions/2025_05_01_install_pg_partman.py
"""Install extensions pg_partman pg_cron

Revision ID: install_pgpartman
Revises: <your previous id>
Create Date: 2025-06-25 15:28:35.277865

"""

from alembic import op
import sqlalchemy as sa


import logging
from sqlalchemy import text

logger = logging.getLogger(__name__)

revision = 'install_pgpartman'
down_revision = '<your previous id>'
branch_labels = None
depends_on = None

def check_version(connection):
    result = connection.execute(text("SELECT version_num FROM alembic_version")).fetchone()
    if result:
        current_version = result[0]
        logger.info(f"Current DB version: {current_version}")
        if current_version != "<your current id>":
            raise Exception(f"Expected version <your current id> but found {current_version}")
    else:
        logger.info("No version found in alembic_version table.")

def upgrade():
    # Install pg_partman extension
    op.execute("CREATE SCHEMA IF NOT EXISTS partman")
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman")
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_cron")

    # Grant necessary permissions
    op.execute("GRANT ALL ON SCHEMA partman TO CURRENT_USER")
    op.execute("GRANT ALL ON ALL TABLES IN SCHEMA partman TO CURRENT_USER")
    op.execute("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA partman TO CURRENT_USER")

def downgrade():
    op.execute("DROP EXTENSION IF EXISTS pg_partman CASCADE")
    op.execute("DROP SCHEMA IF EXISTS partman CASCADE")
    op.execute("DROP EXTENSION IF EXISTS pg_cron CASCADE")

#afterwards run below:

> alembic upgrade install_pgpartman

Generate Migration files

Below is a migration file generator for any table you want to partition in the future. It generates the migration files for Steps 2-4. Create the scripts below under the same alembic folder.

alembic/scripts/migrations/generate_partition_migration.py
#!/usr/bin/env python
"""
Generate Alembic migration files for table partitioning.
"""
import argparse
import os
import subprocess
from datetime import datetime, timedelta
from pathlib import Path

# Template for partition creation migration
PARTITION_MIGRATION_TEMPLATE = '''"""Partition {table} table

Creates partitioned version of {schema}.{table} using pg_partman.
"""
from typing import Sequence, Union
import sys
from pathlib import Path
from alembic import op

# Add helpers to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from scripts.migrations.partitioning import PartitioningMigrationHelper, get_table_indexes, get_table_constraints

revision: str = '{revision}'
down_revision: Union[str, None] = {down_revision}
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create partitioned table and migrate data from {schema}.{table}."""
    connection = op.get_bind()
    
    # Get existing table structure
    indexes = get_table_indexes(connection, '{schema}', '{table}')
    constraints = get_table_constraints(connection, '{schema}', '{table}')
    
    # Initialize helper
    helper = PartitioningMigrationHelper(
        schema='{schema}',
        table_name='{table}',
        partition_key='{key}'
    )
    
    # Adjust primary keys to include partition key
    primary_keys = constraints['primary_keys']
    if '{key}' not in primary_keys:
        primary_keys.append('{key}')
    
    # Adjust unique constraints to include partition key
    unique_constraints = []
    for constraint in constraints['unique_constraints']:
        if '{key}' not in constraint:
            constraint.append('{key}')
        unique_constraints.append(constraint)
    
    # Adjust index names for partitioned table
    partitioned_indexes = []
    for idx in indexes:
        new_name = idx['name'].replace('{table}', '{table}_partitioned')
        partitioned_indexes.append({{'name': new_name, 'columns': idx['columns']}})
    
    # Step 1: Create partitioned table
    helper.create_partitioned_table_from_existing(
        primary_keys=primary_keys,
        unique_constraints=unique_constraints,
        indexes=partitioned_indexes
    )
    
    # Step 2: Setup pg_partman
    helper.setup_pg_partman(
        partition_type='native',
        partition_interval='{interval}',
        premake={premake}
    )
    
    # Step 3: Create sync trigger
    helper.create_sync_trigger()
    
    # Step 4: Migrate data
    helper.migrate_data_using_partman(
        batch_count={batch_size},
        batch_interval=100,
        analyze=True
    )
    
    # Step 5: Schedule batch migration job (optional)
    
    helper.create_batch_migration_job(
        schedule='*/1 * * * *',  # Every minute
        batch_size={batch_size}
    )
    
    # To run it manually: CALL {schema}.migrate_{table}_batch();
    
    # Step 6: Schedule pg_partman maintenance job
    helper.create_maintenance_job(
        schedule='0 0 * * *'  # Daily at 12am
    )


def downgrade() -> None:
    """Remove partitioned table and related objects."""
    connection = op.get_bind()
    
    # Drop sync trigger and function
    op.execute("DROP TRIGGER IF EXISTS sync_{table}_trigger ON {schema}.{table}")
    op.execute("DROP FUNCTION IF EXISTS {schema}.sync_{table}_to_partitioned() CASCADE")
    
    # Drop batch migration procedure
    op.execute("DROP PROCEDURE IF EXISTS {schema}.migrate_{table}_batch() CASCADE")
    
    # Clean up migration artifacts
    helper = PartitioningMigrationHelper(
        schema='{schema}',
        table_name='{table}'
    )
    helper.cleanup_migration_artifacts(drop_migration_table=True, rm_partman_maintenance=True)
    
    # Drop partitioned table
    op.execute("DROP TABLE IF EXISTS {schema}.{table}_partitioned CASCADE")
    
    # Clean up pg_partman configuration
    op.execute("DELETE FROM partman.part_config WHERE parent_table = '{schema}.{table}_partitioned'")


'''

# Template for cutover migration
CUTOVER_MIGRATION_TEMPLATE = '''"""Cutover {table} to partitioned table

Performs cutover from {schema}.{table} to partitioned version.
"""
from typing import Sequence, Union
import sys
from pathlib import Path
from alembic import op
from sqlalchemy import text

# Add helpers to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from scripts.migrations.partitioning import PartitioningMigrationHelper

revision: str = '{revision}'
down_revision: Union[str, None] = {down_revision}
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Perform cutover to partitioned table."""
    connection = op.get_bind()
    
    # Verify migration is complete
    result = connection.execute(text("""
        SELECT 
            (SELECT COUNT(*) FROM {schema}.{table}) as original_count,
            (SELECT COUNT(*) FROM {schema}.{table}_partitioned) as partitioned_count
    """)).fetchone()
    
    print(f"Original table rows: {{result[0]}}")
    print(f"Partitioned table rows: {{result[1]}}")
    
    if result[0] != result[1]:
        raise Exception(
            f"Row counts don't match! Original: {{result[0]}}, Partitioned: {{result[1]}}. "
            "Ensure migration is complete before cutover."
        )
    
    # Initialize helper
    helper = PartitioningMigrationHelper(
        schema='{schema}',
        table_name='{table}'
    )
    
    # Perform cutover
    helper.perform_cutover(drop_old_table=False)
    
    # Clean up migration artifacts
    helper.cleanup_migration_artifacts()
    
    print("Cutover completed. Old table preserved as {schema}.{table}_old")


def downgrade() -> None:
    """Revert the cutover."""
    connection = op.get_bind()
    
    # Revert table names
    connection.execute(text("BEGIN"))
    try:
        connection.execute(text(
            "ALTER TABLE {schema}.{table} RENAME TO {table}_partitioned"
        ))
        connection.execute(text(
            "ALTER TABLE {schema}.{table}_old RENAME TO {table}"
        ))
        
        # Update pg_partman
        connection.execute(text("""
            UPDATE partman.part_config 
            SET parent_table = '{schema}.{table}_partitioned'
            WHERE parent_table = '{schema}.{table}'
        """))
        
        connection.execute(text("COMMIT"))
    except Exception as e:
        connection.execute(text("ROLLBACK"))
        raise
    helper = PartitioningMigrationHelper(schema='{schema}', table_name='{table}', partition_key='{key}')
    helper.create_sync_trigger()
    helper.create_batch_migration_job(schedule='*/1 * * * *',  batch_size={batch_size})
    helper.create_maintenance_job(schedule='0 0 * * *' )
'''


def generate_revision_id():
    """Generate a unique revision ID."""
    return os.urandom(6).hex()


def get_latest_revision(versions_dir):
    """Get the latest Alembic revision."""
    try:
        common_dir = versions_dir.parent.parent  # common directory
        result = subprocess.run(
            ['poetry', 'run', 'alembic:local', 'current'],
            capture_output=True,
            text=True,
            check=True,
            cwd=str(common_dir)
        )
        for line in result.stdout.split('\n'):
            if ' (head)' in line:
                return line.split()[0]
        return None
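    except (subprocess.CalledProcessError, OSError):
        # alembic/poetry failed or is not installed; the caller falls back to a default revision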
        return None


def main():
    parser = argparse.ArgumentParser(
        description='Generate Alembic migrations for table partitioning'
    )
    parser.add_argument('--schema', required=True, help='Database schema')
    parser.add_argument('--table', required=True, help='Table name')
    parser.add_argument('--key', default='timestamp', help='Partition key column (default: timestamp)')
    parser.add_argument('--interval', default='monthly',
                        choices=['daily', 'weekly', 'monthly', 'yearly'],
                        help='Partition interval (default: monthly)')
    parser.add_argument('--premake', type=int, default=12,
                        help='Number of future partitions to create (default: 12)')
    parser.add_argument('--batch-size', type=int, default=10000,
                        help='Batch size for data migration (default: 10000)')
    parser.add_argument('--output-dir', default='versions',
                        help='Output directory for migrations')
    args = parser.parse_args()
    now = datetime.now()
    timestamp = now.strftime('%Y_%m_%d_%H_%M_%S')
    partition_timestamp = timestamp
    cutover_timestamp = (now + timedelta(seconds=1)).strftime('%Y_%m_%d_%H_%M_%S')
    common_dir = Path(__file__).parent.parent.parent  # common directory
    versions_dir = common_dir / 'alembic' / 'versions'
    if args.output_dir != 'versions':
        versions_dir = Path(args.output_dir)
    versions_dir.mkdir(parents=True, exist_ok=True)
    latest_revision = get_latest_revision(versions_dir)
    # Default to install_pgpartman if we don't have a latest revision
    # since pg_partman is required for partitioning
    if not latest_revision:
        latest_revision = 'install_pgpartman'
    partition_revision = generate_revision_id()
    partition_filename = f"{partition_timestamp}_{partition_revision}_partition_{args.table}.py"
    partition_content = PARTITION_MIGRATION_TEMPLATE.format(
        schema=args.schema,
        table=args.table,
        key=args.key,
        interval=args.interval,
        premake=args.premake,
        batch_size=args.batch_size,
        revision=partition_revision,
        down_revision=repr(latest_revision)
    )
    cutover_revision = generate_revision_id()
    cutover_filename = f"{cutover_timestamp}_{cutover_revision}_cutover_{args.table}.py"
    cutover_content = CUTOVER_MIGRATION_TEMPLATE.format(
        schema=args.schema,
        table=args.table,
        key=args.key,
        revision=cutover_revision,
        down_revision=repr(partition_revision),
        batch_size=args.batch_size
    )
    partition_path = versions_dir / partition_filename
    with open(partition_path, 'w') as f:
        f.write(partition_content)
    cutover_path = versions_dir / cutover_filename
    with open(cutover_path, 'w') as f:
        f.write(cutover_content)
    print(f"Generated migrations:")
    print(f"  1. Partition: {partition_path}")
    print(f"  2. Cutover:   {cutover_path}")
    print()
    print("Next steps:")
    print(f"  1. Review and edit the generated files")
    print(f"  2. Run: poetry run alembic upgrade {partition_revision}")
    print(f"  3. Monitor migration: SELECT * FROM {args.schema}.partition_migration_status")
    print(f"  4. When complete, run: poetry run alembic upgrade {cutover_revision}")


if __name__ == '__main__':
    main()
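
A note on the templates above: they are rendered with str.format, so single braces such as {schema} are placeholders, while doubled braces ({{ and }}) survive as literal braces in the generated file. The tiny example below shows the same mechanism in isolation; the template content and the Helper name are made up for illustration.

```python
TEMPLATE = '''def upgrade() -> None:
    helper = Helper(schema='{schema}', table_name='{table}')
    index = {{'name': 'ix_{table}_partitioned'}}
    print(f"Rows: {{result[0]}}")
'''


def render(schema: str, table: str) -> str:
    """Fill the placeholders; doubled braces come out as single literal braces."""
    return TEMPLATE.format(schema=schema, table=table)


if __name__ == "__main__":
    print(render("public", "transactions"))
```

This is why the dict literal and the f-string inside the generated migration use doubled braces in the template: without them, str.format would try to treat them as placeholders and fail.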

alembic/scripts/migrations/partitioning.py
"""
Alembic Partitioning Migration Helpers

This module provides reusable functions for creating partitioned tables,
setting up pg_partman, migrating data, and performing cutover operations.
The Original table is renamed to _old, and the Partitioned table is renamed to the original name.
Preconditions:
- The original table must exist
- The partitioned table must not exist
- The partition key column must exist in the original table eg. "timestamp"
- The pg_partman extension must be installed
- The pg_cron extension must be installed

Postcutover:
- The original table is renamed to _old
- The partitioned table is renamed to the original name
- Sequences are updated

"""
import logging
from typing import List, Dict, Optional, Any

from sqlalchemy import text

from alembic import op

logger = logging.getLogger(__name__)


class PartitioningMigrationHelper:
    """Helper class for partitioning migrations."""

    def __init__(self, schema: str, table_name: str, partition_key: str = 'timestamp'):
        """
        Initialize the partitioning helper.
        
        Args:
            schema: Database schema name
            table_name: Base table name (without _partitioned suffix)
            partition_key: Column to partition by (default: 'timestamp')
        """
        self.schema = schema
        self.table_name = table_name
        self.partition_key = partition_key
        self.partitioned_table = f"{table_name}_partitioned"
        self.old_table = f"{table_name}_old"
        self.node_name = 'localhost'

    def get_database_nodename(self) -> Optional[str]:
        """
        Get the database nodename for pg_cron configuration.
        
        Returns:
            The nodename (IP address or hostname) of the database server
        """
        connection = op.get_bind()
        result = connection.execute(text("SELECT inet_server_addr() as nodename"))
        row = result.fetchone()
        return row.nodename if row and row.nodename else None

    def create_migration_status_table(self):
        """Create the partition migration status table if it doesn't exist."""
        connection = op.get_bind()
        print("Creating partition migration status table")
        connection.execute(text(f"""
            CREATE TABLE IF NOT EXISTS {self.schema}.partition_migration_status (
                id SERIAL PRIMARY KEY,
                table_name TEXT NOT NULL,
                batch_start TIMESTAMP,
                batch_end TIMESTAMP,
                rows_migrated INTEGER,
                last_id INTEGER,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                error_message TEXT
            )
        """))

    def create_partitioned_table_from_existing(
        self,
        primary_keys: List[str],
        unique_constraints: List[List[str]],
        foreign_keys: Optional[List[Dict[str, Any]]] = None,
        indexes: Optional[List[Dict[str, str]]] = None,
        check_constraints: Optional[List[Dict[str, str]]] = None,
    ):
        """
        Create a partitioned table based on an existing table structure.
        
        Args:
            primary_keys: List of primary key columns (must include partition key)
            unique_constraints: List of unique constraint column lists
            foreign_keys: List of foreign key definitions
            indexes: List of index definitions with 'name' and 'columns'
            check_constraints: List of check constraint definitions
        """
        connection = op.get_bind()
        print(f"Creating partitioned table {self.schema}.{self.partitioned_table}")
        print("\n" + "=" * 80)
        print(f"PARTITIONING MIGRATION CONFIGURATION")
        print("=" * 80)
        print(f"Source Table:      {self.schema}.{self.table_name}")
        print(f"Target Table:      {self.schema}.{self.partitioned_table}")
        print(f"Partition Key:     {self.partition_key}")
        print(f"Partition Method:  RANGE partitioning")
        print("-" * 80)

        # Primary Keys
        print(f"Primary Keys:      {', '.join(primary_keys) if primary_keys else 'None detected'}")
        if primary_keys and self.partition_key not in primary_keys:
            print(f"                   → Will become: ({', '.join(primary_keys + [self.partition_key])})")

        # Unique Constraints  
        print(f"\nUnique Constraints: {len(unique_constraints)} found")
        for i, constraint in enumerate(unique_constraints, 1):
            print(f"  {i}. ({', '.join(constraint)})")
            if self.partition_key not in constraint:
                print(f"     → Will become: ({', '.join(constraint + [self.partition_key])})")

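        # Indexes (the argument is optional, so guard against None before calling len())
        index_list = indexes or []
        print(f"\nIndexes:           {len(index_list)} to be created")
        for idx in index_list:
            print(f"  - {idx['name']} on ({', '.join(idx['columns']) if isinstance(idx['columns'], list) else idx['columns']})")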

        # Foreign Keys
        if foreign_keys:
            print(f"\nForeign Keys:      {len(foreign_keys)} to be created")
            for fk in foreign_keys:
                print(f"  - {fk.get('name', 'unnamed')}: {fk['column']} → {fk['references']}({fk['ref_column']})")

        print("=" * 80 + "\n")
        user_response = input("Do you want to continue with the partition creation? (yes/y or no/n): ").strip().lower()
        if user_response not in ['yes', 'y']:
            print("Partition creation cancelled by user.")
            import sys
            sys.exit(0)
        print("\nContinuing with partition creation...\n")
        # Get column definitions from existing table
        create_sql = f"""
        CREATE TABLE {self.schema}.{self.partitioned_table} (
            LIKE {self.schema}.{self.table_name} INCLUDING DEFAULTS INCLUDING COMMENTS
        ) PARTITION BY RANGE ({self.partition_key})
        """
        connection.execute(text(create_sql))
        # Add primary key constraint
        pk_constraint = f"pk_{self.partitioned_table}"
        pk_columns = ', '.join(primary_keys)
        connection.execute(text(f"""
            ALTER TABLE {self.schema}.{self.partitioned_table} 
            ADD CONSTRAINT {pk_constraint} PRIMARY KEY ({pk_columns})
        """))
        # Add unique constraints
        for i, constraint_cols in enumerate(unique_constraints):
            constraint_name = f"uq_{self.partitioned_table}_{i}"
            columns = ', '.join(constraint_cols)
            connection.execute(text(f"""
                ALTER TABLE {self.schema}.{self.partitioned_table}
                ADD CONSTRAINT {constraint_name} UNIQUE ({columns})
            """))
        # Add foreign keys
        if foreign_keys:
            for fk in foreign_keys:
                fk_name = fk.get('name', f"fk_{self.partitioned_table}_{fk['column']}")
                connection.execute(text(f"""
                    ALTER TABLE {self.schema}.{self.partitioned_table}
                    ADD CONSTRAINT {fk_name} FOREIGN KEY ({fk['column']})
                    REFERENCES {fk['references']} ({fk['ref_column']})
                    {fk.get('options', '')}
                """))

        # Create indexes
        if indexes:
            for index in indexes:
                index_name = index['name']
                columns = ', '.join(index['columns']) if isinstance(index['columns'], list) else index['columns']
                connection.execute(text(f"""
                    CREATE INDEX IF NOT EXISTS {index_name} 
                    ON {self.schema}.{self.partitioned_table} ({columns})
                """))

        # Add check constraints
        if check_constraints:
            for constraint in check_constraints:
                connection.execute(text(f"""
                    ALTER TABLE {self.schema}.{self.partitioned_table}
                    ADD CONSTRAINT {constraint['name']} CHECK ({constraint['check']})
                """))

        self.create_migration_status_table()

    def setup_pg_partman(
        self,
        partition_type: str = 'native',
        partition_interval: str = '1 month',
        premake: int = 12,
        retention_keep_table: bool = True,
        retention_keep_index: bool = True,
        inherit_privileges: bool = True,
    ):
        """
        Set up pg_partman for the partitioned table.

        Args:
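            partition_type: Currently unused; the create_parent call below always passes 'range'
            partition_interval: Partitioning interval, either a keyword from interval_map (e.g., 'monthly', 'daily') or an interval string (e.g., '1 month')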
            premake: Number of partitions to create in advance
            retention_keep_table: Whether to keep tables when retention is applied
            retention_keep_index: Whether to keep indexes when retention is applied
            inherit_privileges: Whether child tables inherit parent privileges
        """
        connection = op.get_bind()
        print(f"Setting up pg_partman for {self.schema}.{self.partitioned_table}")
        interval_map = {
            'monthly': '1 month',
            'weekly': '1 week',
            'daily': '1 day',
            'hourly': '1 hour',
            'yearly': '1 year'  # the generator's --interval accepts 'yearly'
        }
        actual_interval = interval_map.get(partition_interval, partition_interval)

        # Create parent partition - tested on pg_partman 5.1.0. Earlier versions use a different syntax.
        connection.execute(text(f"""
            SELECT partman.create_parent(
                '{self.schema}.{self.partitioned_table}',  -- p_parent_table
                '{self.partition_key}',                     -- p_control
                '{actual_interval}',                        -- p_interval (must be before p_type!)
                'range',                                    -- p_type (always 'range' for v5)
                'none',                                     -- p_epoch
                {premake},                                  -- p_premake
                NULL,                                       -- p_start_partition
                true,                                       -- p_default_table
                'on'                                        -- p_automatic_maintenance
            )
        """))
        print(f"Created parent partition for {self.schema}.{self.partitioned_table}")
        connection.execute(text(f"""
            UPDATE partman.part_config 
            SET retention_keep_table = {retention_keep_table},
                retention_keep_index = {retention_keep_index},
                inherit_privileges = {inherit_privileges}
            WHERE parent_table = '{self.schema}.{self.partitioned_table}'
        """))

    def create_sync_trigger(self) -> None:
        """Create sync trigger for real-time data synchronization during migration."""
        connection = op.get_bind()
        columns = []
        result = connection.execute(text(f"""
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = '{self.schema}' 
            AND table_name = '{self.table_name}'
            ORDER BY ordinal_position
        """))
        columns = [row[0] for row in result]
        columns_str = ', '.join(columns)
        values_str = ', '.join([f'NEW.{col}' for col in columns])

        # Generate SET clause for UPDATE (excluding primary key columns)
        # Get primary key columns
        pk_result = connection.execute(text(f"""
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = '{self.schema}.{self.table_name}'::regclass
            AND i.indisprimary
        """))
        primary_keys = [row[0] for row in pk_result]

        # Exclude primary keys from update
        update_cols = [col for col in columns if col not in primary_keys]
        set_clause = ',\n                    '.join([f'{col} = NEW.{col}' for col in update_cols])

        # Create the sync function
        connection.execute(text(f"""
            CREATE OR REPLACE FUNCTION {self.schema}.sync_{self.table_name}_to_partitioned()
            RETURNS TRIGGER AS $$
            BEGIN
                IF TG_OP = 'INSERT' THEN
                    INSERT INTO {self.schema}.{self.partitioned_table} 
                        ({columns_str})
                    VALUES 
                        ({values_str})
                    ON CONFLICT DO NOTHING;
                    RETURN NEW;
                ELSIF TG_OP = 'UPDATE' THEN
                    UPDATE {self.schema}.{self.partitioned_table}
                    SET {set_clause}
                    WHERE {' AND '.join([f'{pk} = NEW.{pk}' for pk in primary_keys])};
                    RETURN NEW;
                ELSIF TG_OP = 'DELETE' THEN
                    DELETE FROM {self.schema}.{self.partitioned_table}
                    WHERE {' AND '.join([f'{pk} = OLD.{pk}' for pk in primary_keys])};
                    RETURN OLD;
                END IF;
            END;
            $$ LANGUAGE plpgsql;
        """))

        # Create the trigger
        connection.execute(text(f"""
            CREATE TRIGGER sync_{self.table_name}_trigger
            AFTER INSERT OR UPDATE OR DELETE ON {self.schema}.{self.table_name}
            FOR EACH ROW EXECUTE FUNCTION {self.schema}.sync_{self.table_name}_to_partitioned();
        """))

    def set_nodename(self, nodename: str):
        if not nodename and self.node_name != 'localhost':
            detected_nodename = self.get_database_nodename()
            print("\n" + "=" * 80)
            print("pg_cron Configuration Required")
            print("=" * 80)
            print("\nFor AWS RDS Aurora or other managed databases, you need to specify the correct nodename.")
            if detected_nodename:
                print(f"\nDetected current database nodename: {detected_nodename}")
            print("\nTo verify your nodename, run: SELECT inet_server_addr(), inet_server_port();")
            print("For local development, use: localhost")
            print("For AWS RDS, use your cluster endpoint (e.g., mydb-cluster.cluster-xxx.region.rds.amazonaws.com)")
            print("")
            if detected_nodename:
                default_prompt = f" [{detected_nodename}]"
            else:
                default_prompt = " [localhost]"
            nodename = input(f"Enter the database nodename for pg_cron{default_prompt}: ").strip()
            if not nodename:
                nodename = detected_nodename if detected_nodename else 'localhost'
                print(f"Using: {nodename}")
        self.node_name = nodename

    def create_batch_migration_job(self,
                                   batch_size: int = 100000,
                                   schedule: str = '*/5 * * * *',
                                   job_name: Optional[str] = None,
                                   nodename: Optional[str] = None):
        """
        Create a pg_cron job for batch data migration.

        Args:
            batch_size: Number of rows to migrate per batch
            schedule: Cron schedule (default: every 5 minutes)
            job_name: Custom job name (optional)
            nodename: Database node name for pg_cron (required for AWS RDS)
        """
        connection = op.get_bind()

        if not job_name:
            job_name = f"migrate_{self.table_name}_batch"

        self.set_nodename(nodename)

        # Create migration function
        migration_function = f"pgcron_migrate_{self.table_name}"
        connection.execute(text(f"""
            CREATE OR REPLACE FUNCTION {self.schema}.{migration_function}()
            RETURNS void AS $$
            DECLARE
                v_last_id BIGINT;
                v_max_id BIGINT;
                v_rows_migrated INT;
            BEGIN
                SELECT COALESCE(last_id, 0) INTO v_last_id
                FROM {self.schema}.partition_migration_status
                WHERE table_name = '{self.table_name}'
                  AND last_id IS NOT NULL
                ORDER BY id DESC
                LIMIT 1;
                
                IF v_last_id IS NULL THEN
                    v_last_id := 0;
                END IF;
                
                SELECT MIN(id) + {batch_size} - 1 INTO v_max_id
                FROM (
                    SELECT id 
                    FROM {self.schema}.{self.table_name}
                    WHERE id > v_last_id
                    ORDER BY id
                    LIMIT {batch_size}
                ) t;
                
                IF v_max_id IS NULL THEN
                    SELECT MAX(id) INTO v_max_id
                    FROM {self.schema}.{self.table_name}
                    WHERE id > v_last_id;
                END IF;
                
                IF v_max_id > v_last_id THEN
                    -- Migrate batch
                    INSERT INTO {self.schema}.{self.partitioned_table}
                    SELECT * FROM {self.schema}.{self.table_name}
                    WHERE id > v_last_id AND id <= v_max_id
                    ON CONFLICT DO NOTHING;
                    
                    GET DIAGNOSTICS v_rows_migrated = ROW_COUNT;
                    
                    -- Update status
                    INSERT INTO {self.schema}.partition_migration_status 
                        (table_name, batch_start, batch_end, rows_migrated, last_id)
                    VALUES 
                        ('{self.table_name}', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 
                         v_rows_migrated, v_max_id);
                    
                    RAISE NOTICE 'Migrated % rows for % (IDs % to %)', 
                        v_rows_migrated, '{self.table_name}', v_last_id + 1, v_max_id;
                END IF;
            END;
            $$ LANGUAGE plpgsql;
        """))
        print(f"Scheduling pg_cron job: {job_name}")
        result = connection.execute(text(f"""
            SELECT cron.schedule(
                '{job_name}',
                '{schedule}',
                'SELECT {self.schema}.{migration_function}();'
            ) as jobid
        """))

        jobid = result.fetchone().jobid
        # set_nodename() stores the resolved value on self.node_name
        if self.node_name and self.node_name != 'localhost':
            print(f"Updating job nodename to: {self.node_name}")
            connection.execute(text(f"""
                UPDATE cron.job 
                SET nodename = '{self.node_name}'
                WHERE jobid = {jobid}
            """))

        # Log migration setup
        connection.execute(text(f"""
            INSERT INTO {self.schema}.partition_migration_status 
                (table_name, started_at, error_message)
            VALUES 
                ('{self.table_name}', CURRENT_TIMESTAMP, 
                 'Batch migration job scheduled')
        """))

    def migrate_data_using_partman(self,
                                   batch_count: int = 10000,
                                   batch_interval: int = 100,
                                   analyze: bool = True) -> None:
        """
        Migrate data by creating a batch migration procedure.

        Note: The procedure contains COMMIT statements so it cannot be called
        directly within an Alembic transaction. Use create_batch_migration_job()
        to schedule it via pg_cron, or run it manually after the migration.

        Args:
            batch_count: Number of rows per batch
            batch_interval: Sleep interval between batches, in milliseconds (e.g., 100)
            analyze: Whether to analyze the partitioned table after migration
        """
        connection = op.get_bind()

        # Get column list
        result = connection.execute(text(f"""
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = '{self.schema}' 
            AND table_name = '{self.table_name}'
            ORDER BY ordinal_position
        """))
        columns = [row[0] for row in result]
        columns_str = ', '.join(columns)
        connection.execute(text(f"""
            CREATE OR REPLACE PROCEDURE {self.schema}.migrate_{self.table_name}_batch(
                batch_size INTEGER DEFAULT {batch_count},
                sleep_ms INTEGER DEFAULT {batch_interval}
            )
            LANGUAGE plpgsql
            AS $$
            DECLARE
                last_migrated_id BIGINT;
                rows_copied INTEGER;
                total_copied INTEGER := 0;
                batch_start_time TIMESTAMP;
                batch_end_time TIMESTAMP;
            BEGIN
                -- Start migration
                LOOP
                    batch_start_time := clock_timestamp();
                    
                    -- Get the last migrated ID
                    SELECT COALESCE(MAX(id), 0) INTO last_migrated_id
                    FROM {self.schema}.{self.partitioned_table};
                    
                    -- Copy next batch
                    WITH batch AS (
                        SELECT {columns_str}
                        FROM {self.schema}.{self.table_name}
                        WHERE id > last_migrated_id
                        ORDER BY id
                        LIMIT batch_size
                    )
                    INSERT INTO {self.schema}.{self.partitioned_table} ({columns_str})
                    SELECT {columns_str} FROM batch;
                    
                    GET DIAGNOSTICS rows_copied = ROW_COUNT;
                    
                    -- Exit if no more rows
                    EXIT WHEN rows_copied = 0;
                    
                    total_copied := total_copied + rows_copied;
                    batch_end_time := clock_timestamp();
                    
                    -- Log progress
                    INSERT INTO {self.schema}.partition_migration_status 
                        (table_name, batch_start, batch_end, rows_migrated, last_id)
                    VALUES 
                        ('{self.table_name}', batch_start_time, batch_end_time, 
                         rows_copied, last_migrated_id);
                    
                    -- Commit the batch
                    COMMIT;
                    
                    RAISE NOTICE 'Migrated % rows (total: %, last_id: %)', 
                        rows_copied, total_copied, last_migrated_id;
                    
                    -- Optional sleep to reduce load
                    IF sleep_ms > 0 THEN
                        PERFORM pg_sleep(sleep_ms::numeric / 1000);
                    END IF;
                END LOOP;
                
                RAISE NOTICE 'Migration complete. Total rows migrated: %', total_copied;
            END;
            $$;
        """))
        connection.execute(text(f"""
            INSERT INTO {self.schema}.partition_migration_status 
                (table_name, started_at, error_message)
            VALUES 
                ('{self.table_name}', CURRENT_TIMESTAMP, 
                 'Migration Procedure created for {self.table_name}. Run CALL {self.schema}.migrate_{self.table_name}_batch() to manually trigger or let pg_cron schedule it')
        """))
        print(f"Created batch migration procedure for {self.schema}.{self.table_name}")
        print("To run migration:")
        print(f"  CALL {self.schema}.migrate_{self.table_name}_batch();")
        if analyze:
            connection.execute(text(f"ANALYZE {self.schema}.{self.partitioned_table}"))

    def perform_cutover(self, drop_old_table: bool = False):
        """
        Perform the cutover from old table to partitioned table.

        Args:
            drop_old_table: Whether to drop the old table after cutover
        """
        connection = op.get_bind()
        print(f"Performing cutover for {self.table_name}")
        connection.execute(text("BEGIN"))

        try:
            # Rename original table to old
            connection.execute(text(f"""
                ALTER TABLE {self.schema}.{self.table_name} 
                RENAME TO {self.old_table}
            """))

            # Rename partitioned table to original name
            connection.execute(text(f"""
                ALTER TABLE {self.schema}.{self.partitioned_table} 
                RENAME TO {self.table_name}
            """))

            # Update sequences
            connection.execute(text(f"""
                DO $$
                DECLARE
                    seq_name TEXT;
                    max_id BIGINT;
                BEGIN
                    -- Find sequence name
                    SELECT pg_get_serial_sequence('{self.schema}.{self.old_table}', 'id') INTO seq_name;
                    
                    IF seq_name IS NOT NULL THEN
                        -- Get max ID from new table
                        EXECUTE format('SELECT COALESCE(MAX(id), 0) FROM %I.%I', 
                            '{self.schema}', '{self.table_name}') INTO max_id;
                        
                        -- Set sequence value
                        EXECUTE format('SELECT setval(%L, %s)', seq_name, max_id + 1);
                    END IF;
                END $$;
            """))

            # Update pg_partman configuration
            connection.execute(text(f"""
                UPDATE partman.part_config 
                SET parent_table = '{self.schema}.{self.table_name}'
                WHERE parent_table = '{self.schema}.{self.partitioned_table}'
            """))
            connection.execute(text(f"""
                DROP TRIGGER IF EXISTS sync_{self.table_name}_trigger ON {self.schema}.{self.old_table}
            """))
            recreate_foreign_keys(connection, self.schema, self.old_table, self.table_name)
            if drop_old_table:
                connection.execute(text(f"""
                    DROP TABLE IF EXISTS {self.schema}.{self.old_table} CASCADE
                """))
            connection.execute(text("COMMIT"))
            print("Cutover completed successfully")

        except Exception as e:
            connection.execute(text("ROLLBACK"))
            print(f"Cutover failed: {e}")
            raise

    def create_maintenance_job(self, schedule: str = '0 0 * * *', nodename: Optional[str] = None):
        """
        Create a pg_cron job for partition maintenance.

        Args:
            schedule: Cron schedule (default: midnight daily)
            nodename: Database node name for pg_cron (required for AWS RDS)
        """
        connection = op.get_bind()
        job_name = f"partman_maintenance_{self.table_name}"
        self.set_nodename(nodename)
        print(f"Creating maintenance job: {job_name}")
        result = connection.execute(text(f"""
            SELECT cron.schedule(
                '{job_name}',
                '{schedule}',
                'CALL partman.run_maintenance_proc()'
            ) as jobid
        """))
        jobid = result.fetchone().jobid
        connection.execute(text(f"""
            UPDATE cron.job 
            SET nodename = '{self.node_name}'
            WHERE jobid = {jobid}
        """))

    def cleanup_migration_artifacts(self, drop_migration_table: bool = False, rm_partman_maintenance: bool = False):
        """Clean up migration tracking and temporary objects."""
        connection = op.get_bind()
        if drop_migration_table:
            connection.execute(text(f"""
                DROP TABLE IF EXISTS {self.schema}.partition_migration_status
            """))

        if rm_partman_maintenance:
            connection.execute(text(f"""
            DO $$
            BEGIN
                IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
                    -- Check if job exists before trying to unschedule
                    IF EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'partman_maintenance_{self.table_name}') THEN
                        PERFORM cron.unschedule('partman_maintenance_{self.table_name}');
                    END IF;
                END IF;
            END $$;
        """))

        connection.execute(text(f"""
            DROP PROCEDURE IF EXISTS {self.schema}.migrate_{self.table_name}_batch
        """))
        connection.execute(text(f"""
            DO $$
            BEGIN
                IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
                    -- Check if job exists before trying to unschedule
                    IF EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'migrate_{self.table_name}_batch') THEN
                        PERFORM cron.unschedule('migrate_{self.table_name}_batch');
                    END IF;
                END IF;
            END $$;
        """))

        print(f"Cleaned up migration artifacts for {self.schema}.{self.table_name}")


# Convenience functions for common patterns

def create_monthly_range_partition(connection,
                                   schema: str,
                                   table_name: str,
                                   start_date: str,
                                   end_date: str):
    """
    Create a monthly range partition.
    
    Args:
        connection: Database connection
        schema: Schema name
        table_name: Parent table name
        start_date: Start date (YYYY-MM-DD)
        end_date: End date (YYYY-MM-DD)
    """
    partition_name = f"{table_name}_p{start_date[:4]}_{start_date[5:7]}"

    connection.execute(text(f"""
        CREATE TABLE {schema}.{partition_name} PARTITION OF {schema}.{table_name}
        FOR VALUES FROM ('{start_date}') TO ('{end_date}')
    """))


def get_table_indexes(connection, schema: str, table_name: str) -> List[Dict[str, Any]]:
    """
    Get all indexes for a table.
    
    Returns:
        List of index definitions with name and columns
    """
    result = connection.execute(text(f"""
        SELECT 
            i.relname as index_name,
            array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns
        FROM pg_index ix
        JOIN pg_class t ON t.oid = ix.indrelid
        JOIN pg_class i ON i.oid = ix.indexrelid
        JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
        JOIN pg_namespace n ON n.oid = t.relnamespace
        WHERE n.nspname = '{schema}'
        AND t.relname = '{table_name}'
        AND NOT ix.indisprimary
        AND NOT ix.indisunique
        GROUP BY i.relname
    """))

    return [{'name': row[0], 'columns': row[1]} for row in result]


def get_table_constraints(connection, schema: str, table_name: str) -> Dict[str, List[Any]]:
    """
    Get all constraints for a table.
    
    Returns:
        Dictionary with constraint types and their definitions
    """
    result = connection.execute(text(f"""
        SELECT 
            con.conname,
            con.contype,
            array_agg(att.attname ORDER BY att.attnum) as columns
        FROM pg_constraint con
        JOIN pg_class rel ON rel.oid = con.conrelid
        JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
        JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
        WHERE nsp.nspname = '{schema}'
        AND rel.relname = '{table_name}'
        GROUP BY con.conname, con.contype
    """))

    constraints = {
        'primary_keys': [],
        'unique_constraints': [],
        'check_constraints': []
    }

    for row in result:
        contype = row[1]
        # Handle both string and bytes comparison
        if isinstance(contype, bytes):
            contype = contype.decode('utf-8')
        if contype == 'p':  # Primary key
            constraints['primary_keys'] = row[2]
        elif contype == 'u':  # Unique
            constraints['unique_constraints'].append(row[2])
        elif contype == 'c':  # Check
            constraints['check_constraints'].append({'name': row[0], 'columns': row[2]})

    return constraints


def get_table_foreign_keys(connection, schema: str, table_name: str) -> List[Dict[str, Any]]:
    """
    Get all foreign key constraints for a table.
    
    Returns:
        List of foreign key definitions with details
    """
    result = connection.execute(text(f"""
        SELECT 
            con.conname,
            nsp2.nspname AS foreign_schema,
            cl2.relname AS foreign_table,
            array_agg(att.attname ORDER BY att.attnum) as columns,
            array_agg(att2.attname ORDER BY att2.attnum) as foreign_columns,
            con.confupdtype,
            con.confdeltype,
            con.condeferrable,
            con.condeferred
        FROM pg_constraint con
        JOIN pg_class cl ON cl.oid = con.conrelid
        JOIN pg_namespace nsp ON nsp.oid = cl.relnamespace
        JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
        JOIN pg_class cl2 ON cl2.oid = con.confrelid
        JOIN pg_namespace nsp2 ON nsp2.oid = cl2.relnamespace
        JOIN pg_attribute att2 ON att2.attrelid = con.confrelid AND att2.attnum = ANY(con.confkey)
        WHERE nsp.nspname = '{schema}'
        AND cl.relname = '{table_name}'
        AND con.contype = 'f'
        GROUP BY con.conname, con.confrelid, nsp2.nspname, cl2.relname, 
                 con.confupdtype, con.confdeltype, con.condeferrable, con.condeferred
    """))
    foreign_keys = []
    for row in result:
        # Map action codes to SQL keywords
        update_action = {'a': 'NO ACTION', 'r': 'RESTRICT', 'c': 'CASCADE', 'n': 'SET NULL', 'd': 'SET DEFAULT'}
        delete_action = {'a': 'NO ACTION', 'r': 'RESTRICT', 'c': 'CASCADE', 'n': 'SET NULL', 'd': 'SET DEFAULT'}
        fk = {
            'name': row[0],
            'foreign_schema': row[1],
            'foreign_table': row[2],
            'columns': row[3],
            'foreign_columns': row[4],
            'on_update': update_action.get(row[5], 'NO ACTION'),
            'on_delete': delete_action.get(row[6], 'NO ACTION'),
            'deferrable': row[7],
            'initially_deferred': row[8]
        }
        foreign_keys.append(fk)
    return foreign_keys


def recreate_foreign_keys(connection, schema: str, source_table: str, target_table: str):
    """
    Recreate foreign key constraints from source table on target table.
    """
    foreign_keys = get_table_foreign_keys(connection, schema, source_table)
    for fk in foreign_keys:
        columns = ', '.join(fk['columns'])
        foreign_columns = ', '.join(fk['foreign_columns'])
        constraint_def = f"""
            ALTER TABLE {schema}.{target_table}
            ADD CONSTRAINT {fk['name']} 
            FOREIGN KEY ({columns}) 
            REFERENCES {fk['foreign_schema']}.{fk['foreign_table']} ({foreign_columns})
            ON UPDATE {fk['on_update']}
            ON DELETE {fk['on_delete']}
        """
        if fk['deferrable']:
            constraint_def += " DEFERRABLE"
            if fk['initially_deferred']:
                constraint_def += " INITIALLY DEFERRED"
        connection.execute(text(constraint_def))
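
The create_monthly_range_partition helper above expects start_date and end_date as YYYY-MM-DD strings. Here is a minimal sketch of how those two values could be computed for a given month. The month_bounds function is a hypothetical helper of mine, not part of partitioning.py. It relies on the fact that the upper bound of a PostgreSQL range partition is exclusive, so end_date is the first day of the following month.

```python
from datetime import date


def month_bounds(year: int, month: int) -> tuple[str, str]:
    """Return (start_date, end_date) as YYYY-MM-DD strings for one month.

    end_date is the first day of the following month, because the upper
    bound of a range partition is exclusive.
    """
    start = date(year, month, 1)
    # Roll over to January of the next year after December.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()


start_date, end_date = month_bounds(2025, 5)
print(start_date, end_date)  # 2025-05-01 2025-06-01
```

The two strings can then be passed straight to create_monthly_range_partition as start_date and end_date.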

After the setup above, you can run:

python scripts/alembic/generate_partition_migration.py \
--schema billing \
--table event_transactions \
--key timestamp \
--interval monthly \
--premake 12 \
--batch-size 10000

If everything is set up correctly in your environment, this generates the following output:

migration generation
(shared-py3.12) schan common % python scripts/migrations/generate_partition_migration.py \
  --schema billing \
  --table event_transactions \
  --key timestamp \
  --interval monthly \
  --premake 12 \
  --batch-size 10000
Generated migrations:
  1. Partition: /Users/schan/Github/partition_poc/common/alembic/versions/2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py
  2. Cutover:   /Users/schan/Github/partition_poc/common/alembic/versions/2025_07_12_20_35_45_0b716fef61b8_cutover_event_transactions.py

Next steps:
  1. Review and edit the generated files
  2. Run: poetry run alembic upgrade 5b02cf04498e
  3. Monitor migration: SELECT * FROM billing.partition_migration_status
  4. When complete, run: poetry run alembic upgrade 0b716fef61b8

Take a look at the contents of the generated files. Most of them call into the “partitioning.py” script we created, which contains the reusable helper methods.

  • Steps 2-3 are merged together in the file “2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py”
  • Step 4 is the file “2025_07_12_20_35_45_0b716fef61b8_cutover_event_transactions.py”

As soon as you upgrade to the file “2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py”, a lot of magic happens behind the scenes:

  • Analyzes existing table – Captures current indexes and constraints from event_transactions table
  • Creates partitioned table – Builds new event_transactions_partitioned table with timestamp-based partitioning
  • Modifies constraints – Adds timestamp column to primary keys and unique constraints for partition compatibility, since the partitioning column is “timestamp”
  • Configures pg_partman – Sets up monthly partitioning with 12 future partitions pre-created
  • Establishes sync trigger – Ensures new data writes to both original and partitioned tables during migration
  • Initiates data migration – Starts copying existing data in 10,000 row batches
  • Schedules batch migration job – Creates a pg_cron job running every minute to migrate the remaining data. You can check the cron.job_run_details table in your database for the list of runs.
  • Sets up maintenance job – Schedules daily pg_partman maintenance at midnight for partition management
  • Preserves original table – Keeps source table intact for zero-downtime migration
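
The “Modifies constraints” step exists because PostgreSQL requires every primary key and unique constraint on a partitioned table to include the partition key columns. A minimal sketch of that adjustment is below. The with_partition_key function is a hypothetical name for illustration, not the actual code in partitioning.py.

```python
def with_partition_key(columns: list[str], partition_key: str) -> list[str]:
    """Return the constraint columns with the partition key appended if missing."""
    if partition_key in columns:
        # Already partition-compatible; return a copy unchanged.
        return list(columns)
    return [*columns, partition_key]


# Primary key (id) becomes (id, timestamp).
print(with_partition_key(["id"], "timestamp"))  # ['id', 'timestamp']
# Unique constraint (idempotency_key) becomes (idempotency_key, timestamp).
print(with_partition_key(["idempotency_key"], "timestamp"))  # ['idempotency_key', 'timestamp']
```

One consequence to keep in mind: uniqueness is now enforced on the pair of columns, so the same id could in principle appear with two different timestamps.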

You may want to estimate how long it will take to migrate all of the data, or make your pg_cron schedule more aggressive. For my 24 million rows on a single db.t3.large database instance, it was inserting around 10k rows/second, which works out to about 40 minutes if running non-stop.
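
The 40-minute figure is simple arithmetic, and the same arithmetic shows why the cron schedule matters. The sketch below uses two hypothetical helpers of mine. The second one assumes each pg_cron run migrates exactly one batch, and the one-minute interval is only an example value, so plug in your own numbers.

```python
import math


def estimate_minutes(total_rows: int, rows_per_second: float) -> float:
    """Minutes needed when inserting continuously at a fixed rate."""
    return total_rows / rows_per_second / 60


def estimate_cron_minutes(total_rows: int, batch_size: int, interval_minutes: int) -> int:
    """Minutes needed when each scheduled run migrates one batch."""
    runs = math.ceil(total_rows / batch_size)
    return runs * interval_minutes


# 24 million rows at ~10k rows/second, running non-stop.
print(estimate_minutes(24_000_000, 10_000))  # 40.0

# Same table, one 10,000-row batch per minute (assumed schedule), in hours.
print(estimate_cron_minutes(24_000_000, 10_000, 1) / 60)  # 40.0
```

In other words, under these assumptions the scheduled route takes roughly 40 hours instead of 40 minutes, which is why you may want a larger batch size or a tighter schedule.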

I exposed a procedure called “migrate_<your table name>_batch”. If you want to big bang migrate the entire table in one go without pg_cron, disable the pg_cron job first, then run “CALL migrate_<your table name>_batch()” in your database SQL console.
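
For reference, here is a small sketch that prints the two statements for the big bang route. The big_bang_statements function is a hypothetical helper. It assumes you kept the default job name, migrate_<table>_batch, and that the procedure lives in the same schema as the table. Because the procedure issues COMMIT between batches, run the CALL outside of an explicit transaction block.

```python
def big_bang_statements(schema: str, table_name: str) -> list[str]:
    """Build the SQL to stop the scheduled job and run the migration manually."""
    # Assumed default job name; adjust if you passed a custom job_name.
    job_name = f"migrate_{table_name}_batch"
    return [
        # Stop pg_cron from running batches in parallel with the manual run.
        f"SELECT cron.unschedule('{job_name}');",
        # Loop over all remaining batches in a single call.
        f"CALL {schema}.migrate_{table_name}_batch();",
    ]


for statement in big_bang_statements("billing", "event_transactions"):
    print(statement)
```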

If you encounter any issue during the next steps, you can revert the changes with the ‘alembic downgrade -1’ command.

Steps 2-3 – Precutover

Disclaimer – always test locally before running it in production!

alembic upgrade +1 (pre cutover)
(shared-py3.12) schan@ common % alembic upgrade +1                      
2025-07-12 21:04:57,167 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2025-07-12 21:04:57,167 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2025-07-12 21:04:57,180 INFO  [alembic.runtime.migration] Running upgrade install_pgpartman -> 4e697e5a94cc, Partition event_transactions table
Creating partitioned table billing.event_transactions_partitioned

================================================================================
PARTITIONING MIGRATION CONFIGURATION
================================================================================
Source Table:      billing.event_transactions
Target Table:      billing.event_transactions_partitioned
Partition Key:     timestamp
Partition Method:  RANGE partitioning
--------------------------------------------------------------------------------
Primary Keys:      id, timestamp

Unique Constraints: 1 found
  1. (idempotency_key, timestamp)

Indexes:           8 to be created
  - ix_billing_event_transactions_partitioned_anonymous_id on (anonymous_id)
  - ix_billing_event_transactions_partitioned_email on (email)
  - ix_billing_event_transactions_partitioned_event_code on (event_code)
  - ix_billing_event_transactions_partitioned_mbid on (mbid)
  - ix_billing_event_transactions_partitioned_proforma_invoice_id on (proforma_invoice_id)
  - ix_billing_event_transactions_partitioned_session_id on (session_id)
  - ix_billing_event_transactions_partitioned_timestamp on (timestamp)
  - ix_billing_event_transactions_partitioned_user_id on (user_id)
================================================================================

Do you want to continue with the partition creation? (yes/y or no/n): 

If the precutover step completed successfully, you will see a couple of new artifacts in your database:

cron.job table

cron.job_run_details table

<your schema>.partition_migration_status
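
To turn the status table into a single progress number, an aggregate over rows_migrated and last_id is enough. The sketch below is only an illustration: it uses an in-memory SQLite table as a stand-in for <your schema>.partition_migration_status so that it runs anywhere, and it keeps just the three columns it needs. Against PostgreSQL the same SELECT applies with the schema-qualified table name and your driver's placeholder style.

```python
import sqlite3

PROGRESS_SQL = """
    SELECT COALESCE(SUM(rows_migrated), 0) AS rows_migrated,
           MAX(last_id) AS last_id
    FROM partition_migration_status
    WHERE table_name = ?
"""


def migration_progress(conn: sqlite3.Connection, table_name: str) -> tuple:
    """Return (total rows migrated so far, highest id reached)."""
    return conn.execute(PROGRESS_SQL, (table_name,)).fetchone()


# Stand-in table with only the columns this query reads.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE partition_migration_status "
    "(table_name TEXT, rows_migrated INTEGER, last_id INTEGER)"
)
conn.executemany(
    "INSERT INTO partition_migration_status VALUES (?, ?, ?)",
    [
        ("event_transactions", None, None),    # setup log row, no batch data
        ("event_transactions", 10000, 10000),  # first batch
        ("event_transactions", 10000, 20000),  # second batch
    ],
)
print(migration_progress(conn, "event_transactions"))  # (20000, 20000)
```

The setup rows that carry only a message have NULL in both columns, and SUM and MAX skip them, so they do not distort the totals.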

Steps 4 – Cutover

When you’re ready to cut over, run the ‘alembic upgrade +1’ command again. It checks whether the record counts match.

  • You should temporarily turn off any jobs that insert into the table so that this check succeeds.
  • The old table is retained, along with the migration status table, for future reference.
  • The pg_cron job that runs pg_partman maintenance is kept so that future partitions continue to be created.
  • Resume any jobs that you paused for inserting into the table.
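
The record count check mentioned above can be pictured as a plain COUNT(*) comparison between the two tables. This is a minimal sketch of the idea, not the generated cutover migration itself. It uses in-memory SQLite tables as stand-ins so it is runnable as-is, and counts_match is a hypothetical helper name.

```python
import sqlite3


def counts_match(conn: sqlite3.Connection, source: str, target: str) -> bool:
    """Return True when both tables hold the same number of rows."""
    # Table names are interpolated directly; only pass trusted, hard-coded names.
    source_count = conn.execute(f"SELECT COUNT(*) FROM {source}").fetchone()[0]
    target_count = conn.execute(f"SELECT COUNT(*) FROM {target}").fetchone()[0]
    return source_count == target_count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_transactions (id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE event_transactions_partitioned (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO event_transactions VALUES (?)", [(1,), (2,)])
conn.executemany("INSERT INTO event_transactions_partitioned VALUES (?)", [(1,), (2,)])
before = counts_match(conn, "event_transactions", "event_transactions_partitioned")

# A row that exists in only one of the tables makes the check fail.
conn.execute("INSERT INTO event_transactions VALUES (3)")
after = counts_match(conn, "event_transactions", "event_transactions_partitioned")
print(before, after)  # True False
```

This is why pausing writers for the duration of the check is the safe option: any row that has reached only one of the two tables at the moment of counting fails the comparison.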

And now you’re done.

Next Steps:

  • Decide how you would like to detach old partitions, export them to S3, and delete them from your DB
  • Please like and subscribe!
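
As a starting point for the detach step listed above, here is a hedged sketch that builds ALTER TABLE ... DETACH PARTITION statements for monthly partitions older than a cutoff. It assumes partition names end in _pYYYY_MM, the convention used by create_monthly_range_partition. Your pg_partman version may name partitions differently, so check the real names first. The detach_statements helper is hypothetical, and the S3 export is not covered here.

```python
import re

# Assumed naming convention: <parent>_pYYYY_MM
PARTITION_SUFFIX = re.compile(r"_p(\d{4})_(\d{2})$")


def detach_statements(schema: str, parent: str, partitions: list[str],
                      keep_from: tuple[int, int]) -> list[str]:
    """Build DETACH statements for partitions older than keep_from (year, month)."""
    statements = []
    for name in sorted(partitions):
        match = PARTITION_SUFFIX.search(name)
        if match is None:
            continue  # skip names that do not follow the convention
        year, month = int(match.group(1)), int(match.group(2))
        if (year, month) < keep_from:
            statements.append(
                f"ALTER TABLE {schema}.{parent} DETACH PARTITION {schema}.{name};"
            )
    return statements


names = [
    "event_transactions_p2024_12",
    "event_transactions_p2025_01",
    "event_transactions_default",
]
for statement in detach_statements("billing", "event_transactions", names, (2025, 1)):
    print(statement)
# ALTER TABLE billing.event_transactions DETACH PARTITION billing.event_transactions_p2024_12;
```

A detached partition becomes a regular standalone table, so it can be exported and dropped without touching the parent.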

References:
