Welcome to Part II series of exploring postgresql partitioning. If you haven’t already checked out Part I, it has some pre-requisite information. This helps you find out if partitioning is the right solution for you, as it did for me.
Table of Contents
Key Information To Know Before Starting
Partitioned Table Characteristics
- Which column will be used for partitioning?
- timestamp. Example ‘2025-05-05 00:00:00’
- Which type of partitioning?
- RANGE – because the column is time series base.
- Declarative or Inheritance?
- Declarative
In addition, I would like to introduce two commonly used extensions. The extensions will automate everything needed. This ensures no future interventions are needed.
pg_partman
Think of pg_partman as an automated filing system for your database.
Imagine you have millions of documents coming into your office every month. Instead of dumping them all in one giant filing cabinet (making it impossible to find anything quickly), you want to organize them into monthly folders.
pg_partman is like hiring an assistant who:
- Creates new monthly folders before you need them
- Labels each folder with the correct date range
- Removes old folders you no longer need (based on your retention policy)
- Maintains the filing system without you having to think about it
Without pg_partman:
“Boss, we need a June 2025 folder!”
“Oh no, I forgot to create it… now all June documents are in the ‘miscellaneous’ pile!”
With pg_partman:
“Boss, June 2025 folder is already ready – I created it 12 months ago!”
“Perfect, I don’t even have to think about it.”
pg_cron
pg_cron is like a scheduler or alarm clock for your database. It runs tasks automatically at times you specify.
Think of it like:
- Setting your coffee maker to brew at 6 AM every day
- Scheduling your sprinklers to water the lawn every evening
- Having your bills auto-pay on the 1st of each month
When combined together, these two extensions will “schedule” the “assistant” to create new partitions, and remove existing ones. No need to login 1 year later when another team has taken over the project and then calls you up due to P1 production issue.
Data Migration Plan
If you’re in a typical tech department:
- Oh crap! I didn’t create this table as a partitioned table the first time. Now I need to migrate the data to the partitioned table.
- Oh no, there is live business impacting activity that is using the table 24/7, I need to migrate this data while the business is running!
Well fear not. I have just the solution for you! (Because I went through the same thing).
Just a disclaimer, if your architecture does not allow downtime at all (more than 5 minutes), you may need additional considerations aside from the ones I provide you below. In my case, the application allow temporary downtime in some odd hours for cutover.
Here is the plan:
- Enable the extensions of pg_partman, and pg_cron in your postgres cluster. (Downtime 1 minute)
- In my company, I used Amazon Aurora RDS Postgresql managed service. This involves enabling the shared_preload_libraries name after creating db parameters group.
- pg_partman library was automatically added for me. But for pg_cron, you still need to add it.
- If you’re using terraform, here is a great module.
- You may need to restart the database instance as it requires loading the library. This is the 1 minute downtime.
- Create the New Partitioned Table, migration functions, and sync triggers. (No Downtime)
- <schema>.<your old table name>_partitioned. Ex: transactions –> transactions_partitioned
- sync triggers are used for migrating data while there are new data coming in real time
- manual migration function is available in case you want to do this in one shot. (Much faster)
- migration table status to see what data has been migrated.
- Precutover: Let the job run in the background for migrating to the new table. (No downtime)
- Postcutover: Move to using the new partitioned table (Downtime 5 minutes)
- Pause all queues/jobs related to inserting into the database. This is the 5 minute downtime.
- rename the partitioned table transactions_partitioned –> transactions
- rename the old non-partitioned table transactions –> transactions_old in case you want to revert or something went wrong.
- drop sync triggers and cron job for migrating existing data.
- Resume all queues/jobs related to inserting into the database.
Looking at the activities, one may think “wow that is a lot of steps”. In my tech stack, I have this completely automated. I’ll share the code snippets here, but even if you’re not using the same tech stack, I’ll try to go over some of the steps in case you have to recreate them in another framework/language. Nowadays, you probably could just chatgpt translate it.
My Tech stacks are:
- Terraformed Amazon Aurora RDS Postgresql
- Python 3.12+
- Alembic for database migrations
- SQLAlechemy for database modeling/ORM
- Poetry for environment managing (and scripts organization)
- pg_partman version 5.1.0
- pg_cron version 1.6.2
- postgres 16+
Pre-requisites: Install Extensions
Extensions code are not installed by default into databases. To use extensions, first we need to get them installed, second, use the “CREATE EXTENSIONS” sql command in the database.
- As I mentioned earlier, pg_partman is already installed, but we need to explicitly install pg_cron.
terraform
module "aurora_postgres_cluster" {
depends_on = [module.vpc]
for_each = var.aurora_postgres
source = "terraform-aws-modules/rds-aurora/aws"
version = "9.9.1"
# https://github.com/terraform-aws-modules/terraform-aws-rds-aurora
## other inputs skipped
create_db_parameter_group = true
db_parameter_group_family = "aurora-postgresql16"
db_parameter_group_parameters = lookup(each.value, "db_parameter_group_parameters", [
{
name = "shared_preload_libraries"
value = "pg_stat_statements,pg_cron"
apply_method = "pending-reboot"
}
])
Upon deploying, you’ll need to reboot the database instance manually. THIS IS A DOWNTIME ACTIVITY. So be sure to pause any jobs and communicate to stakeholders.
Step 1 – Step 4:
The rest of the steps only involves creation of migration files and using alembic +upgrade or -downgrade to run the migrations.
Step 1 – Create Extensions In DB:
Create below file in your alembic repository folder. This is a one time creation assuming the extension never been enabled in the database.
alembic/versions/2025_05_01_install_pg_partman.py
"""Install extensions pg_partman pg_cron
Revision ID: install_pgpartman
Revises: <your previous id>
Create Date: 2025-06-25 15:28:35.277865
"""
from alembic import op
import sqlalchemy as sa
import logging
from sqlalchemy import text
logger = logging.getLogger(__name__)
revision = 'install_pgpartman'
down_revision = '<your previous id>'
branch_labels = None
depends_on = None
def check_version(connection):
result = connection.execute(text("SELECT version_num FROM alembic_version")).fetchone()
if result:
current_version = result[0]
logger.info(f"Current DB version: {current_version}")
if current_version != "<your current id>":
raise Exception(f"Expected version <your current id> but found {current_version}")
else:
logger.info("No version found in alembic_version table.")
def upgrade():
# Install pg_partman extension
op.execute("CREATE SCHEMA IF NOT EXISTS partman")
op.execute("CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman")
op.execute("CREATE EXTENSION IF NOT EXISTS pg_cron")
# Grant necessary permissions
op.execute("GRANT ALL ON SCHEMA partman TO CURRENT_USER")
op.execute("GRANT ALL ON ALL TABLES IN SCHEMA partman TO CURRENT_USER")
op.execute("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA partman TO CURRENT_USER")
def downgrade():
op.execute("DROP EXTENSION IF EXISTS pg_partman CASCADE")
op.execute("DROP SCHEMA IF EXISTS partman CASCADE")
op.execute("DROP EXTENSION IF EXISTS pg_cron CASCADE")
#afterwards run below:
> alembic upgrade install_pgpartman
Generate Migration files
Below is a migration file generator for any tables you want to partition in future. Generates files for Steps 2-4. Create them under the same alembic folder
alembic/scripts/migrations/generate_partition_migration.py
#!/usr/bin/env python
"""
Generate Alembic migration files for table partitioning.
"""
import argparse
import os
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
# Template for partition creation migration
PARTITION_MIGRATION_TEMPLATE = '''"""Partition {table} table
Creates partitioned version of {schema}.{table} using pg_partman.
"""
from typing import Sequence, Union
import sys
from pathlib import Path
from alembic import op
# Add helpers to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from scripts.migrations.partitioning import PartitioningMigrationHelper, get_table_indexes, get_table_constraints
revision: str = '{revision}'
down_revision: Union[str, None] = {down_revision}
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create partitioned table and migrate data from {schema}.{table}."""
connection = op.get_bind()
# Get existing table structure
indexes = get_table_indexes(connection, '{schema}', '{table}')
constraints = get_table_constraints(connection, '{schema}', '{table}')
# Initialize helper
helper = PartitioningMigrationHelper(
schema='{schema}',
table_name='{table}',
partition_key='{key}'
)
# Adjust primary keys to include partition key
primary_keys = constraints['primary_keys']
if '{key}' not in primary_keys:
primary_keys.append('{key}')
# Adjust unique constraints to include partition key
unique_constraints = []
for constraint in constraints['unique_constraints']:
if '{key}' not in constraint:
constraint.append('{key}')
unique_constraints.append(constraint)
# Adjust index names for partitioned table
partitioned_indexes = []
for idx in indexes:
new_name = idx['name'].replace('{table}', '{table}_partitioned')
partitioned_indexes.append({{'name': new_name, 'columns': idx['columns']}})
# Step 1: Create partitioned table
helper.create_partitioned_table_from_existing(
primary_keys=primary_keys,
unique_constraints=unique_constraints,
indexes=partitioned_indexes
)
# Step 2: Setup pg_partman
helper.setup_pg_partman(
partition_type='native',
partition_interval='{interval}',
premake={premake}
)
# Step 3: Create sync trigger
helper.create_sync_trigger()
# Step 4: Migrate data
helper.migrate_data_using_partman(
batch_count={batch_size},
batch_interval=100,
analyze=True
)
# Step 5: Schedule batch migration job (optional)
helper.create_batch_migration_job(
schedule='*/1 * * * *', # Every minute
batch_size={batch_size}
)
# To run it manually: CALL {schema}.migrate_{table}_batch();
# Step 6: Schedule pg_partman maintenance job
helper.create_maintenance_job(
schedule='0 0 * * *' # Daily at 12am
)
def downgrade() -> None:
"""Remove partitioned table and related objects."""
connection = op.get_bind()
# Drop sync trigger and function
op.execute("DROP TRIGGER IF EXISTS sync_{table}_trigger ON {schema}.{table}")
op.execute("DROP FUNCTION IF EXISTS {schema}.sync_{table}_to_partitioned() CASCADE")
# Drop batch migration procedure
op.execute("DROP PROCEDURE IF EXISTS {schema}.migrate_{table}_batch() CASCADE")
# Clean up migration artifacts
helper = PartitioningMigrationHelper(
schema='{schema}',
table_name='{table}'
)
helper.cleanup_migration_artifacts(drop_migration_table=True, rm_partman_maintenance=True)
# Drop partitioned table
op.execute("DROP TABLE IF EXISTS {schema}.{table}_partitioned CASCADE")
# Clean up pg_partman configuration
op.execute("DELETE FROM partman.part_config WHERE parent_table = '{schema}.{table}_partitioned'")
'''
# Template for cutover migration
CUTOVER_MIGRATION_TEMPLATE = '''"""Cutover {table} to partitioned table
Performs cutover from {schema}.{table} to partitioned version.
"""
from typing import Sequence, Union
import sys
from pathlib import Path
from alembic import op
from sqlalchemy import text
# Add helpers to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from scripts.migrations.partitioning import PartitioningMigrationHelper
revision: str = '{revision}'
down_revision: Union[str, None] = {down_revision}
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Perform cutover to partitioned table."""
connection = op.get_bind()
# Verify migration is complete
result = connection.execute(text("""
SELECT
(SELECT COUNT(*) FROM {schema}.{table}) as original_count,
(SELECT COUNT(*) FROM {schema}.{table}_partitioned) as partitioned_count
""")).fetchone()
print(f"Original table rows: {{result[0]}}")
print(f"Partitioned table rows: {{result[1]}}")
if result[0] != result[1]:
raise Exception(
f"Row counts don't match! Original: {{result[0]}}, Partitioned: {{result[1]}}. "
"Ensure migration is complete before cutover."
)
# Initialize helper
helper = PartitioningMigrationHelper(
schema='{schema}',
table_name='{table}'
)
# Perform cutover
helper.perform_cutover(drop_old_table=False)
# Clean up migration artifacts
helper.cleanup_migration_artifacts()
print("Cutover completed. Old table preserved as {schema}.{table}_old")
def downgrade() -> None:
"""Revert the cutover."""
connection = op.get_bind()
# Revert table names
connection.execute(text("BEGIN"))
try:
connection.execute(text(
"ALTER TABLE {schema}.{table} RENAME TO {table}_partitioned"
))
connection.execute(text(
"ALTER TABLE {schema}.{table}_old RENAME TO {table}"
))
# Update pg_partman
connection.execute(text("""
UPDATE partman.part_config
SET parent_table = '{schema}.{table}_partitioned'
WHERE parent_table = '{schema}.{table}'
"""))
connection.execute(text("COMMIT"))
except Exception as e:
connection.execute(text("ROLLBACK"))
raise
helper = PartitioningMigrationHelper(schema='{schema}', table_name='{table}', partition_key='{key}')
helper.create_sync_trigger()
helper.create_batch_migration_job(schedule='*/1 * * * *', batch_size={batch_size})
helper.create_maintenance_job(schedule='0 0 * * *' )
'''
def generate_revision_id():
"""Generate a unique revision ID."""
return os.urandom(6).hex()
def get_latest_revision(versions_dir):
"""Get the latest Alembic revision."""
try:
common_dir = versions_dir.parent.parent # common directory
result = subprocess.run(
['poetry', 'run', 'alembic:local', 'current'],
capture_output=True,
text=True,
check=True,
cwd=str(common_dir)
)
for line in result.stdout.split('\n'):
if ' (head)' in line:
return line.split()[0]
return None
except:
return None
def main():
parser = argparse.ArgumentParser(
description='Generate Alembic migrations for table partitioning'
)
parser.add_argument('--schema', required=True, help='Database schema')
parser.add_argument('--table', required=True, help='Table name')
parser.add_argument('--key', default='timestamp', help='Partition key column (default: timestamp)')
parser.add_argument('--interval', default='monthly',
choices=['daily', 'weekly', 'monthly', 'yearly'],
help='Partition interval (default: monthly)')
parser.add_argument('--premake', type=int, default=12,
help='Number of future partitions to create (default: 12)')
parser.add_argument('--batch-size', type=int, default=10000,
help='Batch size for data migration (default: 10000)')
parser.add_argument('--output-dir', default='versions',
help='Output directory for migrations')
args = parser.parse_args()
now = datetime.now()
timestamp = now.strftime('%Y_%m_%d_%H_%M_%S')
partition_timestamp = timestamp
cutover_timestamp = (now + timedelta(seconds=1)).strftime('%Y_%m_%d_%H_%M_%S')
common_dir = Path(__file__).parent.parent.parent # common directory
versions_dir = common_dir / 'alembic' / 'versions'
if args.output_dir != 'versions':
versions_dir = Path(args.output_dir)
versions_dir.mkdir(exist_ok=True)
latest_revision = get_latest_revision(versions_dir)
# Default to install_pgpartman if we don't have a latest revision
# since pg_partman is required for partitioning
if not latest_revision:
latest_revision = 'install_pgpartman'
partition_revision = generate_revision_id()
partition_filename = f"{partition_timestamp}_{partition_revision}_partition_{args.table}.py"
partition_content = PARTITION_MIGRATION_TEMPLATE.format(
schema=args.schema,
table=args.table,
key=args.key,
interval=args.interval,
premake=args.premake,
batch_size=args.batch_size,
revision=partition_revision,
down_revision=repr(latest_revision)
)
cutover_revision = generate_revision_id()
cutover_filename = f"{cutover_timestamp}_{cutover_revision}_cutover_{args.table}.py"
cutover_content = CUTOVER_MIGRATION_TEMPLATE.format(
schema=args.schema,
table=args.table,
key=args.key,
revision=cutover_revision,
down_revision=repr(partition_revision),
batch_size=args.batch_size
)
partition_path = versions_dir / partition_filename
with open(partition_path, 'w') as f:
f.write(partition_content)
cutover_path = versions_dir / cutover_filename
with open(cutover_path, 'w') as f:
f.write(cutover_content)
print(f"Generated migrations:")
print(f" 1. Partition: {partition_path}")
print(f" 2. Cutover: {cutover_path}")
print()
print("Next steps:")
print(f" 1. Review and edit the generated files")
print(f" 2. Run: poetry run alembic upgrade {partition_revision}")
print(f" 3. Monitor migration: SELECT * FROM partman.partition_migration_status")
print(f" 4. When complete, run: poetry run alembic upgrade {cutover_revision}")
if __name__ == '__main__':
main()
alembic/scripts/migrations/partitioning.py
"""
Alembic Partitioning Migration Helpers
This module provides reusable functions for creating partitioned tables,
setting up pg_partman, migrating data, and performing cutover operations.
The Original table is renamed to _old, and the Partitioned table is renamed to the original name.
Preconditions:
- The original table must exist
- The partitioned table must not exist
- The partition key column must exist in the original table eg. "timestamp"
- The pg_partman extension must be installed
- The pg_cron extension must be installed
Postcutover:
- The original table is renamed to _old
- The partitioned table is renamed to the original name
- Sequences are updated
"""
import logging
from typing import List, Dict, Optional, Any
from sqlalchemy import text
from alembic import op
logger = logging.getLogger(__name__)
class PartitioningMigrationHelper:
"""Helper class for partitioning migrations."""
def __init__(self, schema: str, table_name: str, partition_key: str = 'timestamp'):
"""
Initialize the partitioning helper.
Args:
schema: Database schema name
table_name: Base table name (without _partitioned suffix)
partition_key: Column to partition by (default: 'timestamp')
"""
self.schema = schema
self.table_name = table_name
self.partition_key = partition_key
self.partitioned_table = f"{table_name}_partitioned"
self.old_table = f"{table_name}_old"
self.node_name = 'localhost'
def get_database_nodename(self) -> Optional[str]:
"""
Get the database nodename for pg_cron configuration.
Returns:
The nodename (IP address or hostname) of the database server
"""
connection = op.get_bind()
result = connection.execute(text("SELECT inet_server_addr() as nodename"))
row = result.fetchone()
return row.nodename if row and row.nodename else None
def create_migration_status_table(self):
"""Create the partition migration status table if it doesn't exist."""
connection = op.get_bind()
print("Creating partition migration status table")
connection.execute(text(f"""
CREATE TABLE IF NOT EXISTS {self.schema}.partition_migration_status (
id SERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
batch_start TIMESTAMP,
batch_end TIMESTAMP,
rows_migrated INTEGER,
last_id INTEGER,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT
)
"""))
def create_partitioned_table_from_existing(self, primary_keys: List[str], unique_constraints: List[List[str]], foreign_keys: Optional[List[Dict[str, Any]]] = None,
indexes: Optional[List[Dict[str, str]]] = None,
check_constraints: Optional[List[Dict[str, str]]] = None):
"""
Create a partitioned table based on an existing table structure.
Args:
primary_keys: List of primary key columns (must include partition key)
unique_constraints: List of unique constraint column lists
foreign_keys: List of foreign key definitions
indexes: List of index definitions with 'name' and 'columns'
check_constraints: List of check constraint definitions
"""
connection = op.get_bind()
print(f"Creating partitioned table {self.schema}.{self.partitioned_table}")
print("\n" + "=" * 80)
print(f"PARTITIONING MIGRATION CONFIGURATION")
print("=" * 80)
print(f"Source Table: {self.schema}.{self.table_name}")
print(f"Target Table: {self.schema}.{self.partitioned_table}")
print(f"Partition Key: {self.partition_key}")
print(f"Partition Method: RANGE partitioning")
print("-" * 80)
# Primary Keys
print(f"Primary Keys: {', '.join(primary_keys) if primary_keys else 'None detected'}")
if primary_keys and self.partition_key not in primary_keys:
print(f" → Will become: ({', '.join(primary_keys + [self.partition_key])})")
# Unique Constraints
print(f"\nUnique Constraints: {len(unique_constraints)} found")
for i, constraint in enumerate(unique_constraints, 1):
print(f" {i}. ({', '.join(constraint)})")
if self.partition_key not in constraint:
print(f" → Will become: ({', '.join(constraint + [self.partition_key])})")
# Indexes
print(f"\nIndexes: {len(indexes)} to be created")
for idx in indexes:
print(f" - {idx['name']} on ({', '.join(idx['columns']) if isinstance(idx['columns'], list) else idx['columns']})")
# Foreign Keys
if foreign_keys:
print(f"\nForeign Keys: {len(foreign_keys)} to be created")
for fk in foreign_keys:
print(f" - {fk.get('name', 'unnamed')}: {fk['column']} → {fk['references']}({fk['ref_column']})")
print("=" * 80 + "\n")
user_response = input("Do you want to continue with the partition creation? (yes/y or no/n): ").strip().lower()
if user_response not in ['yes', 'y']:
print("Partition creation cancelled by user.")
import sys
sys.exit(0)
print("\nContinuing with partition creation...\n")
# Get column definitions from existing table
create_sql = f"""
CREATE TABLE {self.schema}.{self.partitioned_table} (
LIKE {self.schema}.{self.table_name} INCLUDING DEFAULTS INCLUDING COMMENTS
) PARTITION BY RANGE ({self.partition_key})
"""
connection.execute(text(create_sql))
# Add primary key constraint
pk_constraint = f"pk_{self.partitioned_table}"
pk_columns = ', '.join(primary_keys)
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.partitioned_table}
ADD CONSTRAINT {pk_constraint} PRIMARY KEY ({pk_columns})
"""))
# Add unique constraints
for i, constraint_cols in enumerate(unique_constraints):
constraint_name = f"uq_{self.partitioned_table}_{i}"
columns = ', '.join(constraint_cols)
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.partitioned_table}
ADD CONSTRAINT {constraint_name} UNIQUE ({columns})
"""))
# Add foreign keys
if foreign_keys:
for fk in foreign_keys:
fk_name = fk.get('name', f"fk_{self.partitioned_table}_{fk['column']}")
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.partitioned_table}
ADD CONSTRAINT {fk_name} FOREIGN KEY ({fk['column']})
REFERENCES {fk['references']} ({fk['ref_column']})
{fk.get('options', '')}
"""))
# Create indexes
if indexes:
for index in indexes:
index_name = index['name']
columns = ', '.join(index['columns']) if isinstance(index['columns'], list) else index['columns']
connection.execute(text(f"""
CREATE INDEX IF NOT EXISTS {index_name}
ON {self.schema}.{self.partitioned_table} ({columns})
"""))
# Add check constraints
if check_constraints:
for constraint in check_constraints:
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.partitioned_table}
ADD CONSTRAINT {constraint['name']} CHECK ({constraint['check']})
"""))
self.create_migration_status_table()
def setup_pg_partman(self, partition_type: str = 'native', partition_interval: str = '1 month', premake: int = 12, retention_keep_table: bool = True,
retention_keep_index: bool = True,
inherit_privileges: bool = True):
"""
Set up pg_partman for the partitioned table.
Args:
partition_type: Type of partitioning ('native' or 'trigger')
partition_interval: Partitioning interval (e.g., 'monthly', 'daily')
premake: Number of partitions to create in advance
retention_keep_table: Whether to keep tables when retention is applied
retention_keep_index: Whether to keep indexes when retention is applied
inherit_privileges: Whether child tables inherit parent privileges
"""
connection = op.get_bind()
print(f"Setting up pg_partman for {self.schema}.{self.partitioned_table}")
interval_map = {
'monthly': '1 month',
'weekly': '1 week',
'daily': '1 day',
'hourly': '1 hour'
}
actual_interval = interval_map.get(partition_interval, partition_interval)
# Create parent partition - tested on version pg_partman 5.1.0. Versions below 5.1.0 uses different syntax.
connection.execute(text(f"""
SELECT partman.create_parent(
'{self.schema}.{self.partitioned_table}', -- p_parent_table
'{self.partition_key}', -- p_control
'{actual_interval}', -- p_interval (must be before p_type!)
'range', -- p_type (always 'range' for v5)
'none', -- p_epoch
{premake}, -- p_premake
NULL, -- p_start_partition
true, -- p_default_table
'on' -- p_automatic_maintenance
)
"""))
print(f"Created parent partition for {self.schema}.{self.partitioned_table}")
connection.execute(text(f"""
UPDATE partman.part_config
SET retention_keep_table = {retention_keep_table},
retention_keep_index = {retention_keep_index},
inherit_privileges = {inherit_privileges}
WHERE parent_table = '{self.schema}.{self.partitioned_table}'
"""))
def create_sync_trigger(self) -> None:
"""Create sync trigger for real-time data synchronization during migration."""
connection = op.get_bind()
columns = []
result = connection.execute(text(f"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = '{self.schema}'
AND table_name = '{self.table_name}'
ORDER BY ordinal_position
"""))
columns = [row[0] for row in result]
columns_str = ', '.join(columns)
values_str = ', '.join([f'NEW.{col}' for col in columns])
# Generate SET clause for UPDATE (excluding primary key columns)
# Get primary key columns
pk_result = connection.execute(text(f"""
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '{self.schema}.{self.table_name}'::regclass
AND i.indisprimary
"""))
primary_keys = [row[0] for row in pk_result]
# Exclude primary keys from update
update_cols = [col for col in columns if col not in primary_keys]
set_clause = ',\n '.join([f'{col} = NEW.{col}' for col in update_cols])
# Create the sync function
connection.execute(text(f"""
CREATE OR REPLACE FUNCTION {self.schema}.sync_{self.table_name}_to_partitioned()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO {self.schema}.{self.partitioned_table}
({columns_str})
VALUES
({values_str})
ON CONFLICT DO NOTHING;
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE {self.schema}.{self.partitioned_table}
SET {set_clause}
WHERE {' AND '.join([f'{pk} = NEW.{pk}' for pk in primary_keys])};
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM {self.schema}.{self.partitioned_table}
WHERE {' AND '.join([f'{pk} = OLD.{pk}' for pk in primary_keys])};
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql;
"""))
# Create the trigger
connection.execute(text(f"""
CREATE TRIGGER sync_{self.table_name}_trigger
AFTER INSERT OR UPDATE OR DELETE ON {self.schema}.{self.table_name}
FOR EACH ROW EXECUTE FUNCTION {self.schema}.sync_{self.table_name}_to_partitioned();
"""))
def set_nodename(self, nodename: str):
if not nodename and self.node_name != 'localhost':
detected_nodename = self.get_database_nodename()
print("\n" + "=" * 80)
print("pg_cron Configuration Required")
print("=" * 80)
print("\nFor AWS RDS Aurora or other managed databases, you need to specify the correct nodename.")
if detected_nodename:
print(f"\nDetected current database nodename: {detected_nodename}")
print("\nTo verify your nodename, run: SELECT inet_server_addr(), inet_server_port();")
print("For local development, use: localhost")
print("For AWS RDS, use your cluster endpoint (e.g., mydb-cluster.cluster-xxx.region.rds.amazonaws.com)")
print("")
if detected_nodename:
default_prompt = f" [{detected_nodename}]"
else:
default_prompt = " [localhost]"
nodename = input(f"Enter the database nodename for pg_cron{default_prompt}: ").strip()
if not nodename:
nodename = detected_nodename if detected_nodename else 'localhost'
print(f"Using: {nodename}")
self.node_name = nodename
def create_batch_migration_job(self,
batch_size: int = 100000,
schedule: str = '*/5 * * * *',
job_name: Optional[str] = None,
nodename: Optional[str] = None):
"""
Create a pg_cron job for batch data migration.
Args:
batch_size: Number of rows to migrate per batch
schedule: Cron schedule (default: every 5 minutes)
job_name: Custom job name (optional)
nodename: Database node name for pg_cron (required for AWS RDS)
"""
connection = op.get_bind()
if not job_name:
job_name = f"migrate_{self.table_name}_batch"
self.set_nodename(nodename)
# Create migration function
migration_function = f"pgcron_migrate_{self.table_name}"
connection.execute(text(f"""
CREATE OR REPLACE FUNCTION {self.schema}.{migration_function}()
RETURNS void AS $$
DECLARE
v_last_id BIGINT;
v_max_id BIGINT;
v_rows_migrated INT;
BEGIN
SELECT COALESCE(last_id, 0) INTO v_last_id
FROM {self.schema}.partition_migration_status
WHERE table_name = '{self.table_name}'
AND last_id IS NOT NULL
ORDER BY id DESC
LIMIT 1;
IF v_last_id IS NULL THEN
v_last_id := 0;
END IF;
SELECT MIN(id) + {batch_size} - 1 INTO v_max_id
FROM (
SELECT id
FROM {self.schema}.{self.table_name}
WHERE id > v_last_id
ORDER BY id
LIMIT {batch_size}
) t;
IF v_max_id IS NULL THEN
SELECT MAX(id) INTO v_max_id
FROM {self.schema}.{self.table_name}
WHERE id > v_last_id;
END IF;
IF v_max_id > v_last_id THEN
-- Migrate batch
INSERT INTO {self.schema}.{self.partitioned_table}
SELECT * FROM {self.schema}.{self.table_name}
WHERE id > v_last_id AND id <= v_max_id
ON CONFLICT DO NOTHING;
GET DIAGNOSTICS v_rows_migrated = ROW_COUNT;
-- Update status
INSERT INTO {self.schema}.partition_migration_status
(table_name, batch_start, batch_end, rows_migrated, last_id)
VALUES
('{self.table_name}', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP,
v_rows_migrated, v_max_id);
RAISE NOTICE 'Migrated % rows for % (IDs % to %)',
v_rows_migrated, '{self.table_name}', v_last_id + 1, v_max_id;
END IF;
END;
$$ LANGUAGE plpgsql;
"""))
print(f"Scheduling pg_cron job: {job_name}")
result = connection.execute(text(f"""
SELECT cron.schedule(
'{job_name}',
'{schedule}',
'SELECT {self.schema}.{migration_function}();'
) as jobid
"""))
jobid = result.fetchone().jobid
if nodename and nodename != 'localhost':
print(f"Updating job nodename to: {self.nodename}")
connection.execute(text(f"""
UPDATE cron.job
SET nodename = '{self.nodename}'
WHERE jobid = {jobid}
"""))
# Log migration setup
connection.execute(text(f"""
INSERT INTO {self.schema}.partition_migration_status
(table_name, started_at, error_message)
VALUES
('{self.table_name}', CURRENT_TIMESTAMP,
'Batch migration job scheduled')
"""))
def migrate_data_using_partman(self,
batch_count: int = 10000,
batch_interval: int = 100,
analyze: bool = True) -> None:
"""
Migrate data by creating a batch migration procedure.
Note: The procedure contains COMMIT statements so it cannot be called
directly within an Alembic transaction. Use create_batch_migration_job()
to schedule it via pg_cron, or run it manually after the migration.
Args:
batch_count: Number of rows per batch
batch_interval: Sleep interval between batches (e.g., '100 milliseconds')
analyze: Whether to analyze the partitioned table after migration
"""
connection = op.get_bind()
# Get column list
result = connection.execute(text(f"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = '{self.schema}'
AND table_name = '{self.table_name}'
ORDER BY ordinal_position
"""))
columns = [row[0] for row in result]
columns_str = ', '.join(columns)
connection.execute(text(f"""
CREATE OR REPLACE PROCEDURE {self.schema}.migrate_{self.table_name}_batch(
batch_size INTEGER DEFAULT {batch_count},
sleep_ms INTEGER DEFAULT {batch_interval}
)
LANGUAGE plpgsql
AS $$
DECLARE
last_migrated_id INTEGER;
rows_copied INTEGER;
total_copied INTEGER := 0;
batch_start_time TIMESTAMP;
batch_end_time TIMESTAMP;
BEGIN
-- Start migration
LOOP
batch_start_time := clock_timestamp();
-- Get the last migrated ID
SELECT COALESCE(MAX(id), 0) INTO last_migrated_id
FROM {self.schema}.{self.partitioned_table};
-- Copy next batch
WITH batch AS (
SELECT {columns_str}
FROM {self.schema}.{self.table_name}
WHERE id > last_migrated_id
ORDER BY id
LIMIT batch_size
)
INSERT INTO {self.schema}.{self.partitioned_table} ({columns_str})
SELECT {columns_str} FROM batch;
GET DIAGNOSTICS rows_copied = ROW_COUNT;
-- Exit if no more rows
EXIT WHEN rows_copied = 0;
total_copied := total_copied + rows_copied;
batch_end_time := clock_timestamp();
-- Log progress
INSERT INTO {self.schema}.partition_migration_status
(table_name, batch_start, batch_end, rows_migrated, last_id)
VALUES
('{self.table_name}', batch_start_time, batch_end_time,
rows_copied, last_migrated_id);
-- Commit the batch
COMMIT;
RAISE NOTICE 'Migrated % rows (total: %, last_id: %)',
rows_copied, total_copied, last_migrated_id;
-- Optional sleep to reduce load
IF sleep_ms > 0 THEN
PERFORM pg_sleep(sleep_ms::numeric / 1000);
END IF;
END LOOP;
RAISE NOTICE 'Migration complete. Total rows migrated: %', total_copied;
END;
$$;
"""))
connection.execute(text(f"""
INSERT INTO {self.schema}.partition_migration_status
(table_name, started_at, error_message)
VALUES
('{self.table_name}', CURRENT_TIMESTAMP,
'Migration Procedure created for {self.table_name}. Run CALL {self.schema}.migrate_{self.table_name}_batch() to manually trigger or let pg_cron schedule it')
"""))
print(f"Created batch migration procedure for {self.schema}.{self.table_name}")
print("To run migration:")
print(f" CALL {self.schema}.migrate_{self.table_name}_batch();")
if analyze:
connection.execute(text(f"ANALYZE {self.schema}.{self.partitioned_table}"))
def perform_cutover(self, drop_old_table: bool = False):
"""
Perform the cutover from old table to partitioned table.
Args:
drop_old_table: Whether to drop the old table after cutover
"""
connection = op.get_bind()
print(f"Performing cutover for {self.table_name}")
connection.execute(text("BEGIN"))
try:
# Rename original table to old
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.table_name}
RENAME TO {self.old_table}
"""))
# Rename partitioned table to original name
connection.execute(text(f"""
ALTER TABLE {self.schema}.{self.partitioned_table}
RENAME TO {self.table_name}
"""))
# Update sequences
connection.execute(text(f"""
DO $$
DECLARE
seq_name TEXT;
max_id BIGINT;
BEGIN
-- Find sequence name
SELECT pg_get_serial_sequence('{self.schema}.{self.old_table}', 'id') INTO seq_name;
IF seq_name IS NOT NULL THEN
-- Get max ID from new table
EXECUTE format('SELECT COALESCE(MAX(id), 0) FROM %I.%I',
'{self.schema}', '{self.table_name}') INTO max_id;
-- Set sequence value
EXECUTE format('SELECT setval(%L, %s)', seq_name, max_id + 1);
END IF;
END $$;
"""))
# Update pg_partman configuration
connection.execute(text(f"""
UPDATE partman.part_config
SET parent_table = '{self.schema}.{self.table_name}'
WHERE parent_table = '{self.schema}.{self.partitioned_table}'
"""))
connection.execute(text(f"""
DROP TRIGGER IF EXISTS tr_sync_{self.table_name} ON {self.schema}.{self.old_table}
"""))
recreate_foreign_keys(connection, self.schema, self.old_table, self.table_name)
if drop_old_table:
connection.execute(text(f"""
DROP TABLE IF EXISTS {self.schema}.{self.old_table} CASCADE
"""))
connection.execute(text("COMMIT"))
print("Cutover completed successfully")
except Exception as e:
connection.execute(text("ROLLBACK"))
print(f"Cutover failed: {e}")
raise
def create_maintenance_job(self, schedule: str = '0 0 * * *', nodename: Optional[str] = None):
"""
Create a pg_cron job for partition maintenance.
Args:
schedule: Cron schedule (default: 2 AM daily)
nodename: Database node name for pg_cron (required for AWS RDS)
"""
connection = op.get_bind()
job_name = f"partman_maintenance_{self.table_name}"
self.set_nodename(nodename)
print(f"Creating maintenance job: {job_name}")
result = connection.execute(text(f"""
SELECT cron.schedule(
'{job_name}',
'{schedule}',
'CALL partman.run_maintenance_proc()'
) as jobid
"""))
jobid = result.fetchone().jobid
connection.execute(text(f"""
UPDATE cron.job
SET nodename = '{self.node_name}'
WHERE jobid = {jobid}
"""))
def cleanup_migration_artifacts(self, drop_migration_table: bool = False, rm_partman_maintenance: bool = False):
"""Clean up migration tracking and temporary objects."""
connection = op.get_bind()
if drop_migration_table:
connection.execute(text(f"""
DROP TABLE IF EXISTS {self.schema}.partition_migration_status
"""))
if rm_partman_maintenance:
connection.execute(text(f"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
-- Check if job exists before trying to unschedule
IF EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'migrate_{self.table_name}_batch') THEN
PERFORM cron.unschedule('partman_maintenance_{self.table_name}');
END IF;
END IF;
END $$;
"""))
connection.execute(text(f"""
DROP PROCEDURE IF EXISTS {self.schema}.migrate_{self.table_name}_batch
"""))
connection.execute(text(f"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
-- Check if job exists before trying to unschedule
IF EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'migrate_{self.table_name}_batch') THEN
PERFORM cron.unschedule('migrate_{self.table_name}_batch');
END IF;
END IF;
END $$;
"""))
print(f"Cleaned up migration artifacts for {self.schema}.{self.table_name}")
# Convenience functions for common patterns
def create_monthly_range_partition(connection,
schema: str,
table_name: str,
start_date: str,
end_date: str):
"""
Create a monthly range partition.
Args:
connection: Database connection
schema: Schema name
table_name: Parent table name
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
"""
partition_name = f"{table_name}_p{start_date[:4]}_{start_date[5:7]}"
connection.execute(text(f"""
CREATE TABLE {schema}.{partition_name} PARTITION OF {schema}.{table_name}
FOR VALUES FROM ('{start_date}') TO ('{end_date}')
"""))
def get_table_indexes(connection, schema: str, table_name: str) -> List[Dict[str, Any]]:
"""
Get all indexes for a table.
Returns:
List of index definitions with name and columns
"""
result = connection.execute(text(f"""
SELECT
i.relname as index_name,
array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns
FROM pg_index ix
JOIN pg_class t ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
JOIN pg_namespace n ON n.oid = t.relnamespace
WHERE n.nspname = '{schema}'
AND t.relname = '{table_name}'
AND NOT ix.indisprimary
AND NOT ix.indisunique
GROUP BY i.relname
"""))
return [{'name': row[0], 'columns': row[1]} for row in result]
def get_table_constraints(connection, schema: str, table_name: str) -> Dict[str, List[Any]]:
"""
Get all constraints for a table.
Returns:
Dictionary with constraint types and their definitions
"""
result = connection.execute(text(f"""
SELECT
con.conname,
con.contype,
array_agg(att.attname ORDER BY att.attnum) as columns
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
WHERE nsp.nspname = '{schema}'
AND rel.relname = '{table_name}'
GROUP BY con.conname, con.contype
"""))
constraints = {
'primary_keys': [],
'unique_constraints': [],
'check_constraints': []
}
for row in result:
contype = row[1]
# Handle both string and bytes comparison
if isinstance(contype, bytes):
contype = contype.decode('utf-8')
if contype == 'p': # Primary key
constraints['primary_keys'] = row[2]
elif contype == 'u': # Unique
constraints['unique_constraints'].append(row[2])
elif contype == 'c': # Check
constraints['check_constraints'].append({'name': row[0], 'columns': row[2]})
return constraints
def get_table_foreign_keys(connection, schema: str, table_name: str) -> List[Dict[str, Any]]:
"""
Get all foreign key constraints for a table.
Returns:
List of foreign key definitions with details
"""
result = connection.execute(text(f"""
SELECT
con.conname,
nsp2.nspname AS foreign_schema,
cl2.relname AS foreign_table,
array_agg(att.attname ORDER BY att.attnum) as columns,
array_agg(att2.attname ORDER BY att2.attnum) as foreign_columns,
con.confupdtype,
con.confdeltype,
con.condeferrable,
con.condeferred
FROM pg_constraint con
JOIN pg_class cl ON cl.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = cl.relnamespace
JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
JOIN pg_class cl2 ON cl2.oid = con.confrelid
JOIN pg_namespace nsp2 ON nsp2.oid = cl2.relnamespace
JOIN pg_attribute att2 ON att2.attrelid = con.confrelid AND att2.attnum = ANY(con.confkey)
WHERE nsp.nspname = '{schema}'
AND cl.relname = '{table_name}'
AND con.contype = 'f'
GROUP BY con.conname, con.confrelid, nsp2.nspname, cl2.relname,
con.confupdtype, con.confdeltype, con.condeferrable, con.condeferred
"""))
foreign_keys = []
for row in result:
# Map action codes to SQL keywords
update_action = {'a': 'NO ACTION', 'r': 'RESTRICT', 'c': 'CASCADE', 'n': 'SET NULL', 'd': 'SET DEFAULT'}
delete_action = {'a': 'NO ACTION', 'r': 'RESTRICT', 'c': 'CASCADE', 'n': 'SET NULL', 'd': 'SET DEFAULT'}
fk = {
'name': row[0],
'foreign_schema': row[1],
'foreign_table': row[2],
'columns': row[3],
'foreign_columns': row[4],
'on_update': update_action.get(row[5], 'NO ACTION'),
'on_delete': delete_action.get(row[6], 'NO ACTION'),
'deferrable': row[7],
'initially_deferred': row[8]
}
foreign_keys.append(fk)
return foreign_keys
def recreate_foreign_keys(connection, schema: str, source_table: str, target_table: str):
"""
Recreate foreign key constraints from source table on target table.
"""
foreign_keys = get_table_foreign_keys(connection, schema, source_table)
for fk in foreign_keys:
columns = ', '.join(fk['columns'])
foreign_columns = ', '.join(fk['foreign_columns'])
constraint_def = f"""
ALTER TABLE {schema}.{target_table}
ADD CONSTRAINT {fk['name']}
FOREIGN KEY ({columns})
REFERENCES {fk['foreign_schema']}.{fk['foreign_table']} ({foreign_columns})
ON UPDATE {fk['on_update']}
ON DELETE {fk['on_delete']}
"""
if fk['deferrable']:
constraint_def += " DEFERRABLE"
if fk['initially_deferred']:
constraint_def += " INITIALLY DEFERRED"
connection.execute(text(constraint_def))
After the setup of above, you can run
python scripts/alembic/generate_partition_migration.py \
--schema billing \
--table event_transactions \
--key timestamp \
--interval monthly \
--premake 12 \
--batch-size 10000
This would generate the following output if everything was setup correctly in the environment
migration generation
(shared-py3.12) schan common % python scripts/migrations/generate_partition_migration.py \
--schema billing \
--table event_transactions \
--key timestamp \
--interval monthly \
--premake 12 \
--batch-size 10000
Generated migrations:
1. Partition: /Users/schan/Github/partition_poc/common/alembic/versions/2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py
2. Cutover: /Users/schan/Github/partition_poc/common/alembic/versions/2025_07_12_20_35_45_0b716fef61b8_cutover_event_transactions.py
Next steps:
1. Review and edit the generated files
2. Run: poetry run alembic upgrade 5b02cf04498e
3. Monitor migration: SELECT * FROM billing.partition_migration_status
4. When complete, run: poetry run alembic upgrade 0b716fef61b8
You should take a look at the generated file contents. A majority of them points to the “partitioning.py” script we created which are reusable helper methods.
- Steps 2-3 is merged together this is the file “2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py”
- Step 4 is the “2025_07_12_20_35_45_0b716fef61b8_cutover_event_transactions.py“
As soon as you upgrade to the file “2025_07_12_20_35_44_5b02cf04498e_partition_event_transactions.py”, a lot of magic happen behind the scene.
- Analyzes existing table – Captures current indexes and constraints from event_transactions table
- Creates partitioned table – Builds new event_transactions_partitioned table with timestamp-based partitioning
- Modifies constraints – Adds timestamp column to primary keys and unique constraints for partition compatibility, since the partitioning column is “timestamp”
- Configures pg_partman – Sets up monthly partitioning with 12 future partitions pre-created
- Establishes sync trigger – Ensures new data writes to both original and partitioned tables during migration
- Initiates data migration – Starts copying existing data in 10,000 row batches
- Schedules batch migration job – Creates pg_cron job running every minute to migrate remaining data. You can check in your new database under the cron.job_details table for the list of runs.
- Sets up maintenance job – Schedules daily pg_partman maintenance at midnight for partition management
- Preserves original table – Keeps source table intact for zero-downtime migration
You may want to calculate how much time it takes to migrate all of the data, or make your pg_cron schedule even more aggressive. For my 24 million rows of data, with one instance of db.t3.large instance of database, it was inserting around 10k rows/seconds. This was about 40 minutes if running non-stop.
I exposed a function called “migrate_<your table name>_batch”. If you want to big bang migrate the entire table in one go without pg_cron, disable in pg_cron first, then run “CALL migrate_<your table name>_batch” in your database sql console.
During the next steps, if you encounter any issue, there is an ‘alembic downgrade -1’ command which you can use to revert changes.
Steps 2-3 – Precutover
Dislcaimer – always test locally before running it in production!
alembic upgrade +1 (pre cutover)
(shared-py3.12) schan@ common % alembic upgrade +1
2025-07-12 21:04:57,167 INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
2025-07-12 21:04:57,167 INFO [alembic.runtime.migration] Will assume transactional DDL.
2025-07-12 21:04:57,180 INFO [alembic.runtime.migration] Running upgrade install_pgpartman -> 4e697e5a94cc, Partition event_transactions table
Creating partitioned table billing.event_transactions_partitioned
================================================================================
PARTITIONING MIGRATION CONFIGURATION
================================================================================
Source Table: billing.event_transactions
Target Table: billing.event_transactions_partitioned
Partition Key: timestamp
Partition Method: RANGE partitioning
--------------------------------------------------------------------------------
Primary Keys: id, timestamp
Unique Constraints: 1 found
1. (idempotency_key, timestamp)
Indexes: 8 to be created
- ix_billing_event_transactions_partitioned_anonymous_id on (anonymous_id)
- ix_billing_event_transactions_partitioned_email on (email)
- ix_billing_event_transactions_partitioned_event_code on (event_code)
- ix_billing_event_transactions_partitioned_mbid on (mbid)
- ix_billing_event_transactions_partitioned_proforma_invoice_id on (proforma_invoice_id)
- ix_billing_event_transactions_partitioned_session_id on (session_id)
- ix_billing_event_transactions_partitioned_timestamp on (timestamp)
- ix_billing_event_transactions_partitioned_user_id on (user_id)
================================================================================
Do you want to continue with the partition creation? (yes/y or no/n):
If you successfully completed the precutover step, in your database, you will be able to see couple artifacts.
cron.job table

cron.job_run_details table

<your schema>.partition_migration_status

Steps 4 – Cutover
When you’re ready to cut over, run the alembic_upgrade +1 command again and it checks if the record counts matches.
- You should turn off any insert jobs temporarily so that this check succeeds

- The old table will be retained, also the migration status table for future references.
- pg_cron for running the pg_partman maintenance job will be kept for creating future partitions.
- Resume any jobs that you may have paused for inserting the table.
And now you’re done.
Next Steps:
- Check how you would like to to detach old partitioned tables, export them to S3 and delete it from your DB
- Please like and subscribe!

Leave a comment