Oracle to PostgreSQL Time-Window Data Reload Implementation Guide

Overview

This guide walks through implementing a solution for reloading data from a specific time window from Oracle to PostgreSQL while maintaining ongoing CDC (Change Data Capture) replication.

Prerequisites

  • AWS DMS (Database Migration Service) set up with:

    • Source Oracle endpoint configured

    • Target PostgreSQL endpoint configured

    • Replication instance with adequate capacity

  • Access to both Oracle and PostgreSQL databases with proper permissions

  • AWS CLI installed and configured on your management workstation

  • Basic knowledge of SQL, PostgreSQL, and Oracle database concepts

Step 1: Prepare Target PostgreSQL Environment

  1. Create Staging Schema and Tables:

    -- Connect to your PostgreSQL database first (shell command, run outside psql):
    --   psql -h <postgres-host> -p <postgres-port> -U <username> -d <database>
    
    -- Create staging schema
    CREATE SCHEMA IF NOT EXISTS staging;
    
    -- Create staging tables mirroring your production tables
    CREATE TABLE staging.customers (
        customer_id INTEGER PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100),
        status VARCHAR(20),
        created_at TIMESTAMP,
        updated_at TIMESTAMP,
        _dms_processing_status VARCHAR(20) DEFAULT 'NEW'
    );
    
    CREATE TABLE staging.orders (
        order_id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        amount NUMERIC(10,2),
        order_date TIMESTAMP,
        updated_at TIMESTAMP,
        _dms_processing_status VARCHAR(20) DEFAULT 'NEW'
    );
    
    -- Create indexes for optimization
    CREATE INDEX idx_customers_updated_at ON staging.customers(updated_at);
    CREATE INDEX idx_orders_updated_at ON staging.orders(updated_at);
    
    -- Create admin schema for logging and monitoring
    CREATE SCHEMA IF NOT EXISTS admin;
    
    -- Create log tables
    CREATE TABLE admin.merge_logs (
        id SERIAL PRIMARY KEY,
        table_name VARCHAR(100),
        merge_time TIMESTAMP,
        records_updated INT,
        records_inserted INT
    );
    
    CREATE TABLE admin.merge_summary (
        id SERIAL PRIMARY KEY,
        merge_start TIMESTAMP,
        merge_end TIMESTAMP,
        status VARCHAR(20),
        notes TEXT
    );

  2. Create Merge Procedure:

    CREATE OR REPLACE PROCEDURE merge_time_window_data()
    LANGUAGE plpgsql
    AS $$
    DECLARE
        merge_start_time TIMESTAMP;
        merge_end_time TIMESTAMP;
        records_updated INT;
        records_inserted INT;
    BEGIN
        -- Record start time for logging
        merge_start_time := CURRENT_TIMESTAMP;
        
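        -- Start a sub-block. Because it has an EXCEPTION clause, PostgreSQL runs
        -- it as an implicit subtransaction (this BEGIN does not open a transaction).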
        BEGIN
            -- Merge customers table
            WITH updated_rows AS (
                UPDATE production.customers p
                SET 
                    name = s.name,
                    email = s.email,
                    status = s.status,
                    updated_at = s.updated_at
                FROM staging.customers s
                WHERE p.customer_id = s.customer_id
                AND s.updated_at > p.updated_at
                RETURNING p.customer_id
            )
            SELECT COUNT(*) INTO records_updated FROM updated_rows;
            
            -- Insert new records
            WITH inserted_rows AS (
                INSERT INTO production.customers
                    (customer_id, name, email, status, created_at, updated_at)
                SELECT 
                    s.customer_id, s.name, s.email, s.status, s.created_at, s.updated_at
                FROM staging.customers s
                LEFT JOIN production.customers p ON s.customer_id = p.customer_id
                WHERE p.customer_id IS NULL
                RETURNING 1
            )
            SELECT COUNT(*) INTO records_inserted FROM inserted_rows;
            
            -- Log merge results
            INSERT INTO admin.merge_logs
            (table_name, merge_time, records_updated, records_inserted)
            VALUES ('customers', CURRENT_TIMESTAMP, records_updated, records_inserted);
            
            -- Repeat for orders table
            WITH updated_orders AS (
                UPDATE production.orders p
                SET 
                    customer_id = s.customer_id,
                    amount = s.amount,
                    order_date = s.order_date,
                    updated_at = s.updated_at
                FROM staging.orders s
                WHERE p.order_id = s.order_id
                AND s.updated_at > p.updated_at
                RETURNING p.order_id
            )
            SELECT COUNT(*) INTO records_updated FROM updated_orders;
            
            WITH inserted_orders AS (
                INSERT INTO production.orders
                    (order_id, customer_id, amount, order_date, updated_at)
                SELECT 
                    s.order_id, s.customer_id, s.amount, s.order_date, s.updated_at
                FROM staging.orders s
                LEFT JOIN production.orders p ON s.order_id = p.order_id
                WHERE p.order_id IS NULL
                RETURNING 1
            )
            SELECT COUNT(*) INTO records_inserted FROM inserted_orders;
            
            INSERT INTO admin.merge_logs
            (table_name, merge_time, records_updated, records_inserted)
            VALUES ('orders', CURRENT_TIMESTAMP, records_updated, records_inserted);
            
            -- Mark all staging records as processed
            UPDATE staging.customers 
            SET _dms_processing_status = 'PROCESSED';
            
            UPDATE staging.orders 
            SET _dms_processing_status = 'PROCESSED';
            
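            -- No COMMIT here: PL/pgSQL does not allow transaction control inside
            -- a block that has an EXCEPTION clause. The work is committed when
            -- the procedure returns.
            merge_end_time := clock_timestamp();
            
            -- Log overall merge statistics
            INSERT INTO admin.merge_summary
            (merge_start, merge_end, status, notes)
            VALUES (merge_start_time, merge_end_time, 'SUCCESS', 
                    'Merged Oracle data from time window');
                    
        EXCEPTION WHEN OTHERS THEN
            -- The sub-block's changes have already been rolled back at this point,
            -- so an explicit ROLLBACK is neither needed nor allowed.
            merge_end_time := NULL;
            
            -- Log error
            INSERT INTO admin.merge_summary
            (merge_start, merge_end, status, notes)
            VALUES (merge_start_time, clock_timestamp(), 'FAILED', 
                    'Error: ' || SQLERRM);
        END;
        
        -- merge_end_time is set only on success. On failure, commit the FAILED
        -- log row so it survives, then raise so the caller sees the error.
        -- The COMMIT requires that CALL is not issued inside an explicit
        -- transaction block.
        IF merge_end_time IS NULL THEN
            COMMIT;
            RAISE EXCEPTION 'merge_time_window_data failed; see admin.merge_summary';
        END IF;
    END;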
    $$;

  3. Create Monitoring Views:

    CREATE OR REPLACE VIEW admin.oracle_pg_migration_status AS
    SELECT 
        'customers' AS table_name,
        COUNT(*) AS total_records,
        SUM(CASE WHEN _dms_processing_status = 'NEW' THEN 1 ELSE 0 END) AS pending_records,
        SUM(CASE WHEN _dms_processing_status = 'PROCESSED' THEN 1 ELSE 0 END) AS processed_records,
        SUM(CASE WHEN _dms_processing_status = 'ERROR' THEN 1 ELSE 0 END) AS error_records
    FROM staging.customers
    UNION ALL
    SELECT 
        'orders' AS table_name,
        COUNT(*) AS total_records,
        SUM(CASE WHEN _dms_processing_status = 'NEW' THEN 1 ELSE 0 END) AS pending_records,
        SUM(CASE WHEN _dms_processing_status = 'PROCESSED' THEN 1 ELSE 0 END) AS processed_records,
        SUM(CASE WHEN _dms_processing_status = 'ERROR' THEN 1 ELSE 0 END) AS error_records
    FROM staging.orders;

Step 2: Configure DMS Time-Window Migration

  1. Create DMS Task Settings File: Create a file named oracle-pg-task-settings.json with the following content:
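
The settings content is not shown above. As a minimal sketch, assuming a one-time full-load task that writes into the pre-created staging tables from Step 1, the file could be written like this. The keys are standard DMS task settings, but every value here is an illustrative assumption rather than the original configuration.

```shell
# Write a minimal DMS task settings file (all values are assumptions).
# DO_NOTHING keeps the staging tables from Step 1 exactly as created,
# including the extra _dms_processing_status column.
cat > oracle-pg-task-settings.json <<'EOF'
{
  "FullLoadSettings": {
    "TargetTablePrepMode": "DO_NOTHING",
    "MaxFullLoadSubTasks": 8,
    "CommitRate": 10000
  },
  "Logging": {
    "EnableLogging": true
  }
}
EOF
```

With DO_NOTHING, rows left in staging from an earlier run would collide on the primary keys, so the staging tables should be empty before the task starts.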

  2. Create Table Mappings File: Create a file named oracle-pg-time-window-mapping.json with the following content (adjust the date range as needed):
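
The mapping content is not shown above. The sketch below assumes the Oracle tables live in a schema called APP_OWNER (hypothetical), that the window is defined on the UPDATED_AT column, and that rows should land in the lowercase staging tables from Step 1. The dates are placeholders to replace with your own window.

```shell
# Write a table-mapping file with a source filter per table (assumed names and dates).
cat > oracle-pg-time-window-mapping.json <<'EOF'
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "customers-time-window",
      "object-locator": { "schema-name": "APP_OWNER", "table-name": "CUSTOMERS" },
      "rule-action": "include",
      "filters": [
        {
          "filter-type": "source",
          "column-name": "UPDATED_AT",
          "filter-conditions": [
            {
              "filter-operator": "between",
              "start-value": "2024-01-01 00:00:00",
              "end-value": "2024-01-07 23:59:59"
            }
          ]
        }
      ]
    },
    {
      "rule-type": "selection",
      "rule-id": "2",
      "rule-name": "orders-time-window",
      "object-locator": { "schema-name": "APP_OWNER", "table-name": "ORDERS" },
      "rule-action": "include",
      "filters": [
        {
          "filter-type": "source",
          "column-name": "UPDATED_AT",
          "filter-conditions": [
            {
              "filter-operator": "between",
              "start-value": "2024-01-01 00:00:00",
              "end-value": "2024-01-07 23:59:59"
            }
          ]
        }
      ]
    },
    {
      "rule-type": "transformation",
      "rule-id": "3",
      "rule-name": "rename-schema-to-staging",
      "rule-target": "schema",
      "object-locator": { "schema-name": "APP_OWNER" },
      "rule-action": "rename",
      "value": "staging"
    },
    {
      "rule-type": "transformation",
      "rule-id": "4",
      "rule-name": "lowercase-tables",
      "rule-target": "table",
      "object-locator": { "schema-name": "APP_OWNER", "table-name": "%" },
      "rule-action": "convert-lowercase"
    },
    {
      "rule-type": "transformation",
      "rule-id": "5",
      "rule-name": "lowercase-columns",
      "rule-target": "column",
      "object-locator": { "schema-name": "APP_OWNER", "table-name": "%", "column-name": "%" },
      "rule-action": "convert-lowercase"
    }
  ]
}
EOF
```

The schema rename is what keeps the reload out of the production tables, so it is worth double-checking before the task is started.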

Step 3: Create and Run the Orchestration Script

  1. Create the Orchestration Script: Create a file named oracle-pg-time-window-reload.sh with the following content:
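
The script content is not shown above. One possible shape, offered as a sketch rather than the original script, is: create a full-load task from the two JSON files, wait for it to finish, then call the merge procedure from Step 1. The task identifier and the three ARN environment variables are hypothetical names, and psql is assumed to pick up its connection from the standard PGHOST, PGPORT, PGUSER, and PGDATABASE variables.

```shell
#!/usr/bin/env bash
# oracle-pg-time-window-reload.sh: sketch of a one-time reload orchestrator.
# SOURCE_ENDPOINT_ARN, TARGET_ENDPOINT_ARN, REPLICATION_INSTANCE_ARN and
# TASK_ID are hypothetical names that must be supplied via the environment.
set -eu

TASK_ID="${TASK_ID:-oracle-pg-time-window-reload}"

# Create the full-load task and print its ARN.
create_task() {
    aws dms create-replication-task \
        --replication-task-identifier "$TASK_ID" \
        --source-endpoint-arn "$SOURCE_ENDPOINT_ARN" \
        --target-endpoint-arn "$TARGET_ENDPOINT_ARN" \
        --replication-instance-arn "$REPLICATION_INSTANCE_ARN" \
        --migration-type full-load \
        --table-mappings file://oracle-pg-time-window-mapping.json \
        --replication-task-settings file://oracle-pg-task-settings.json \
        --query 'ReplicationTask.ReplicationTaskArn' --output text
}

# Start the task, wait until it stops, and confirm the full load finished.
run_task() {
    local task_arn stop_reason
    task_arn="$1"
    aws dms wait replication-task-ready \
        --filters "Name=replication-task-arn,Values=$task_arn"
    aws dms start-replication-task \
        --replication-task-arn "$task_arn" \
        --start-replication-task-type start-replication > /dev/null
    aws dms wait replication-task-stopped \
        --filters "Name=replication-task-arn,Values=$task_arn"
    stop_reason="$(aws dms describe-replication-tasks \
        --filters "Name=replication-task-arn,Values=$task_arn" \
        --query 'ReplicationTasks[0].StopReason' --output text)"
    case "$stop_reason" in
        *FULL_LOAD_ONLY_FINISHED*) ;;
        *) echo "Task stopped unexpectedly: $stop_reason" >&2; return 1 ;;
    esac
}

# Merge staging into production with the procedure from Step 1.
run_merge() {
    psql -v ON_ERROR_STOP=1 -c 'CALL merge_time_window_data();'
}

main() {
    local task_arn
    task_arn="$(create_task)"
    echo "Created task: $task_arn"
    run_task "$task_arn"
    run_merge
    echo "Reload and merge complete."
}

# Require an explicit --run so the script never starts a task by accident.
if [ "${1:-}" = "--run" ]; then
    main
else
    echo "Usage: $0 --run"
fi
```

The CLI waiters stop polling after a fixed number of attempts, so a long-running load may need the wait step wrapped in a retry loop.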

  2. Make the Script Executable:

  3. Run the Orchestration Script:
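
The commands for steps 2 and 3 are not shown above. A small wrapper such as the following covers both, assuming the script sits in the current directory. The function name and log file naming are illustrative, and any arguments are forwarded to the script unchanged.

```shell
# Make the orchestration script executable, run it, and keep a timestamped log.
RELOAD_SCRIPT="./oracle-pg-time-window-reload.sh"

run_reload() {
    log="reload-$(date +%Y%m%d-%H%M%S).log"
    if [ ! -f "$RELOAD_SCRIPT" ]; then
        echo "Not found: $RELOAD_SCRIPT" >&2
        return 1
    fi
    chmod u+x "$RELOAD_SCRIPT"              # step 2: make it executable
    if "$RELOAD_SCRIPT" "$@" > "$log" 2>&1; then   # step 3: run it
        echo "Reload finished; log: $log"
    else
        echo "Reload failed; see $log" >&2
        return 1
    fi
}
```

Calling run_reload with whatever arguments your script expects starts the reload. Keeping the log file makes the later archive and documentation steps easier.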

Step 4: Monitor and Validate the Process

  1. Check Migration Status:
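
The query for this step is not shown above. A likely candidate is a read of the monitoring view created in Step 1, sketched here as a SQL file; the file name is an assumption.

```shell
# Save a status query against the Step 1 monitoring view.
cat > check-migration-status.sql <<'EOF'
-- Per-table staging counts: total, pending (NEW), processed, and errored rows.
SELECT table_name,
       total_records,
       pending_records,
       processed_records,
       error_records
FROM admin.oracle_pg_migration_status
ORDER BY table_name;
EOF

# Run it with the same connection placeholders used in Step 1:
#   psql -h <postgres-host> -p <postgres-port> -U <username> -d <database> -f check-migration-status.sql
```

Before the merge, pending_records should match the number of rows DMS loaded. After a successful merge it should be zero.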

  2. View Merge Logs:
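
The queries for this step are not shown above. As a sketch, the two log tables from Step 1 can be read most-recent-first; the file name and the row limits are assumptions.

```shell
# Save queries for the per-table merge log and the overall merge summary.
cat > view-merge-logs.sql <<'EOF'
-- Rows updated and inserted per table, newest first.
SELECT table_name, merge_time, records_updated, records_inserted
FROM admin.merge_logs
ORDER BY merge_time DESC
LIMIT 20;

-- Outcome of recent merge runs, including any error text in notes.
SELECT merge_start, merge_end, status, notes
FROM admin.merge_summary
ORDER BY merge_start DESC
LIMIT 5;
EOF

# Run it with the same connection placeholders used in Step 1:
#   psql -h <postgres-host> -p <postgres-port> -U <username> -d <database> -f view-merge-logs.sql
```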

  3. Check for Data Type Issues:
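
The check for this step is not shown above. One way to approach it, offered as an assumption about what is useful rather than the original check, is to compare column definitions between the staging and production schemas using information_schema. The file name is hypothetical.

```shell
# Save a query that lists columns whose type, length, or precision differs
# between staging and production.
cat > check-data-types.sql <<'EOF'
SELECT s.table_name,
       s.column_name,
       s.data_type                AS staging_type,
       p.data_type                AS production_type,
       s.character_maximum_length AS staging_length,
       p.character_maximum_length AS production_length,
       s.numeric_precision        AS staging_precision,
       p.numeric_precision        AS production_precision,
       s.numeric_scale            AS staging_scale,
       p.numeric_scale            AS production_scale
FROM information_schema.columns s
JOIN information_schema.columns p
  ON p.table_schema = 'production'
 AND p.table_name   = s.table_name
 AND p.column_name  = s.column_name
WHERE s.table_schema = 'staging'
  AND (s.data_type IS DISTINCT FROM p.data_type
       OR s.character_maximum_length IS DISTINCT FROM p.character_maximum_length
       OR s.numeric_precision IS DISTINCT FROM p.numeric_precision
       OR s.numeric_scale IS DISTINCT FROM p.numeric_scale)
ORDER BY s.table_name, s.column_name;
EOF

# Run it with the same connection placeholders used in Step 1:
#   psql -h <postgres-host> -p <postgres-port> -U <username> -d <database> -f check-data-types.sql
```

An empty result means the shared columns are declared identically in both schemas. It does not prove that individual values converted cleanly from Oracle.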

Step 5: Troubleshooting Common Issues

  1. DMS Task Failures:

    • Check CloudWatch logs for detailed error messages

    • Verify Oracle supplemental logging is enabled for all source tables

    • Ensure PostgreSQL target has sufficient disk space

    • Validate that all Oracle data types are properly mapped to PostgreSQL equivalents
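
For the supplemental logging check in the list above, a sketch of the Oracle-side queries follows. APP_OWNER is a hypothetical schema name, and the file is meant to be run with an Oracle client such as sqlplus.

```shell
# Save Oracle queries that report database-level and table-level supplemental logging.
cat > check-supplemental-logging.sql <<'EOF'
-- Database level: minimal supplemental logging should report YES or IMPLICIT.
SELECT supplemental_log_data_min,
       supplemental_log_data_pk,
       supplemental_log_data_all
FROM v$database;

-- Table level: log groups defined on the source tables.
SELECT owner, table_name, log_group_name, log_group_type
FROM all_log_groups
WHERE owner = 'APP_OWNER'
  AND table_name IN ('CUSTOMERS', 'ORDERS');
EOF
```

Supplemental logging matters for the ongoing CDC task rather than the one-time full load, so a gap here usually shows up as missing changes in CDC, not as a reload failure.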

  2. Data Type Conversion Issues:

    • Oracle NUMBER to PostgreSQL NUMERIC may require explicit precision/scale definitions

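    • Oracle DATE stores a time component (to the second), so map it to PostgreSQL TIMESTAMP rather than DATE; TIMESTAMP columns with time zones may need explicit conversion rules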

    • Character encoding differences between databases might cause string conversion issues

  3. Merge Procedure Errors:

    • Check admin.merge_summary for specific error messages

    • Verify that primary keys are defined correctly in both staging and production tables

    • Ensure there are no constraint violations during the merge process

Step 6: Finalizing the Process

  1. Cleanup Tasks:

    • After successful verification, truncate staging tables if needed

    • Consider deleting the one-time DMS task to save resources

    • Archive logs for audit purposes
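
The first two cleanup tasks above can be sketched as a shell function. It assumes psql reads its connection from the standard PG* environment variables and that the task ARN is passed in by the caller; the function name is illustrative.

```shell
# Empty the staging tables and delete the one-time DMS task.
# Usage: cleanup_reload <replication-task-arn>
cleanup_reload() {
    task_arn="${1:?usage: cleanup_reload <replication-task-arn>}"

    # Run only after the merge has been verified: this discards staged rows.
    psql -v ON_ERROR_STOP=1 -c 'TRUNCATE staging.customers, staging.orders;'

    # Remove the task and wait until DMS reports it as deleted.
    aws dms delete-replication-task --replication-task-arn "$task_arn"
    aws dms wait replication-task-deleted \
        --filters "Name=replication-task-arn,Values=$task_arn"
}
```

Only the reload task should be passed here. The continuous CDC task has its own ARN and must be left running.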

  2. Documentation:

    • Document the entire process, including any issues encountered and resolutions

    • Record the time window that was reloaded for future reference

    • Note any data type conversion or mapping decisions made

Additional Considerations

  1. Performance Optimization:

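    • Increase DMS MaxFullLoadSubTasks (default 8) to load more tables in parallel; to speed up a single large table, consider the parallel-load table setting instead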

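    • Adjust PostgreSQL configuration for better bulk load performance (for example, raise maintenance_work_mem to speed up index builds and max_wal_size to reduce checkpoint frequency; work_mem mainly affects the sorts and joins in the merge queries)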

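    • Consider dropping the secondary staging indexes before the load and recreating them afterward (PostgreSQL cannot disable an index in place); keep the primary keys, which the merge joins rely on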

  2. Parallel Ongoing CDC:

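    • Keep the reload writing only to the staging schema so it never touches tables the continuous CDC task is applying changes to; the merge's updated_at comparison then stops older reloaded rows from overwriting newer CDC changes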

    • Monitor both processes carefully for any signs of interference

  3. Recovery Options:

    • Create a backup of the target tables before running the merge process

    • Have a rollback plan in case of issues during the merge
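
The backup step above can be sketched with pg_dump. This assumes the connection comes from the standard PG* environment variables and that the two production tables from Step 1 are the only merge targets; the function name and default file name are illustrative.

```shell
# Dump the production tables touched by the merge into a custom-format archive.
# Usage: backup_production_tables [output-file]
backup_production_tables() {
    backup_file="${1:-production-pre-merge-$(date +%Y%m%d-%H%M%S).dump}"
    pg_dump --format=custom \
        --table=production.customers \
        --table=production.orders \
        --file="$backup_file"
    # Print the archive name so the caller can record it.
    echo "$backup_file"
}

# Rollback sketch (restores both tables from the archive):
#   pg_restore --clean --if-exists -d <database> <backup-file>
```

Restoring from this archive also discards any CDC changes applied to those tables after the backup was taken, so the rollback plan should say how those changes will be replayed.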

Together, these steps reload a selected time window from Oracle into PostgreSQL through a staging schema while ongoing CDC replication continues.
