
Capture pre- and post-migration record counts from AWS DMS in an Athena table!

At this moment I do not have a personal relationship with a computer.

Janet Reno

Capturing record counts is important for several reasons in the context of data migration and ingestion:

  1. Data completeness: Record counts help to ensure that all the data from the source system has been migrated to the target system without any data loss or truncation.
  2. Data quality: Record counts also help to check the quality of the data by comparing the number of records in the source and target systems. Any discrepancies in the record counts can indicate data integrity issues.
  3. Data profiling: Record counts can be used to profile the data in the source and target systems, such as identifying the distribution of data by various dimensions or finding data anomalies.
  4. Data lineage: Record counts can help to establish data lineage by tracking the flow of data from the source to the target system. This can be useful for compliance and auditing purposes.

Overall, record counts play an important role in ensuring the accuracy, completeness, and integrity of data during migration and ingestion processes.
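
The completeness and quality checks above come down to comparing counts per table. Below is a minimal sketch of that comparison; the table names, the counts, and the `reconcile` helper are all hypothetical and only illustrate the idea.

```python
from dataclasses import dataclass


@dataclass
class CountCheck:
    """Source and target row counts for one table."""
    table: str
    source_rows: int
    target_rows: int

    @property
    def difference(self) -> int:
        # Positive means rows are missing in the target
        return self.source_rows - self.target_rows

    @property
    def matches(self) -> bool:
        return self.difference == 0


def reconcile(source_counts: dict[str, int], target_counts: dict[str, int]) -> list[CountCheck]:
    """Compare per-table counts; a table missing on either side counts as 0 rows."""
    tables = sorted(set(source_counts) | set(target_counts))
    return [
        CountCheck(t, source_counts.get(t, 0), target_counts.get(t, 0))
        for t in tables
    ]


if __name__ == "__main__":
    # Made-up numbers for illustration only
    source = {"orders": 1200, "customers": 300}
    target = {"orders": 1198, "customers": 300}
    for check in reconcile(source, target):
        status = "OK" if check.matches else f"MISMATCH ({check.difference:+d})"
        print(f"{check.table}: {status}")
```

A mismatch only says that something differs; finding which rows are missing still needs a key-level comparison.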

Key terms

  • Data Lineage
  • Data Profiling
  • Data Integrity
  • Step Functions
  • Lambda Function

Pseudo steps to capture the record counts:

To get the pre-migration and post-migration record counts from AWS DMS and store them in an Athena table, you can use the following steps:

  1. Configure AWS DMS to capture metrics and send them to CloudWatch.
  2. Create a CloudWatch log group and stream to store the metrics.
  3. Create a Lambda function to extract the pre-migration and post-migration record counts from the CloudWatch log group.
  4. Use Step Functions to orchestrate the AWS DMS migration, including invoking the Lambda function to get the record counts.
  5. Modify the Lambda function to write the pre-migration and post-migration record counts to an S3 bucket in CSV format.
  6. Create a Glue crawler to crawl the S3 bucket and create a table in the Glue Data Catalog.
  7. Query the resulting table from Athena, which reads its schema directly from the Glue Data Catalog.
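
If a crawler feels like more than a four-column CSV needs, the last two steps could also be covered by registering the table with a DDL statement run through Athena. The sketch below assumes the column names used by the Lambda function in this post; the database name, table name, and S3 locations are hypothetical placeholders, and the Athena client is passed in so the SQL builder can be tried without AWS credentials.

```python
def build_create_table_sql(database: str, table: str, s3_location: str) -> str:
    """Return Athena DDL for a comma-separated file with one header row."""
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (\n"
        "  StartTime string,\n"
        "  EndTime string,\n"
        "  PreMigrationCount bigint,\n"
        "  PostMigrationCount bigint\n"
        ")\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        "STORED AS TEXTFILE\n"
        f"LOCATION '{s3_location}'\n"
        "TBLPROPERTIES ('skip.header.line.count'='1')"
    )


def create_table(athena_client, database, table, s3_location, output_location):
    """Submit the DDL to Athena and return the query execution id."""
    response = athena_client.start_query_execution(
        QueryString=build_create_table_sql(database, table, s3_location),
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


if __name__ == "__main__":
    # Hypothetical names; LOCATION must be an S3 prefix, not a single file
    print(build_create_table_sql(
        "migration_audit", "dms_record_counts", "s3://my-s3-bucket/metrics/"
    ))
```

To run it for real, pass `boto3.client("athena")` as `athena_client`. Note that `LOCATION` points at a prefix, so the CSV file would need to live under its own folder in the bucket.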

Here’s an example of how you can modify the Lambda function to write the pre-migration and post-migration record counts to an S3 bucket in CSV format:

Pseudo Code:

import csv
from datetime import datetime, timedelta, timezone

import boto3

s3 = boto3.client('s3')
logs = boto3.client('logs')

# Phrases to look for in the DMS task log; adjust them to match your task's log output
PRE_MARKER = 'Total changes applied by DMS'
POST_MARKER = 'Total number of rows successfully loaded into'


def lambda_handler(event, context):
    # Get the log group and log stream names from the event
    log_group_name = event['logGroupName']
    log_stream_name = event['logStreamName']

    # Look at the last two hours of log events (timezone-aware UTC)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=2)

    # Query the CloudWatch logs for lines containing either marker phrase.
    # A paginator is used because filter_log_events returns results in pages.
    paginator = logs.get_paginator('filter_log_events')
    pages = paginator.paginate(
        logGroupName=log_group_name,
        logStreamNames=[log_stream_name],
        startTime=int(start_time.timestamp() * 1000),
        endTime=int(end_time.timestamp() * 1000),
        filterPattern=f'?"{PRE_MARKER}" ?"{POST_MARKER}"',
    )

    # Extract the pre-migration and post-migration record counts from the log events.
    # Each count is expected after the last ':' in the message.
    pre_migration_count = 0
    post_migration_count = 0
    for page in pages:
        for log_event in page['events']:
            message = log_event['message']
            if PRE_MARKER in message:
                pre_migration_count = int(message.split(':')[-1].strip())
            elif POST_MARKER in message:
                post_migration_count = int(message.split(':')[-1].strip())

    # Write the counts to a CSV file; /tmp is the only writable path in Lambda
    bucket_name = 'my-s3-bucket'
    file_name = 'migration_metrics.csv'
    local_path = f'/tmp/{file_name}'
    with open(local_path, mode='w', newline='') as csv_file:
        fieldnames = ['StartTime', 'EndTime', 'PreMigrationCount', 'PostMigrationCount']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerow({
            'StartTime': start_time.isoformat(),
            'EndTime': end_time.isoformat(),
            'PreMigrationCount': pre_migration_count,
            'PostMigrationCount': post_migration_count
        })

    # Upload the CSV file to the S3 bucket
    s3.upload_file(local_path, bucket_name, file_name)

    # Return the pre-migration and post-migration record counts
    return {
        'PreMigrationCount': pre_migration_count,
        'PostMigrationCount': post_migration_count
    }
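
As a variation on the write step, the CSV can be built in memory and sent with `put_object`, which avoids touching the Lambda file system at all. This is only a sketch: `build_metrics_csv` and `upload_metrics` are hypothetical helper names, and the S3 client is passed in as a parameter.

```python
import csv
import io

FIELDNAMES = ["StartTime", "EndTime", "PreMigrationCount", "PostMigrationCount"]


def build_metrics_csv(start_time: str, end_time: str, pre_count: int, post_count: int) -> str:
    """Return the metrics as CSV text: one header row and one data row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDNAMES)
    writer.writeheader()
    writer.writerow({
        "StartTime": start_time,
        "EndTime": end_time,
        "PreMigrationCount": pre_count,
        "PostMigrationCount": post_count,
    })
    return buffer.getvalue()


def upload_metrics(s3_client, bucket: str, key: str, csv_text: str) -> None:
    """Upload the CSV text to S3 as a UTF-8 encoded object."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=csv_text.encode("utf-8"))


if __name__ == "__main__":
    # Made-up values for illustration only
    print(build_metrics_csv("2024-01-01T00:00:00+00:00", "2024-01-01T02:00:00+00:00", 100, 98))
```

Inside the handler, `upload_metrics(s3, bucket_name, file_name, csv_text)` would replace the `open` and `upload_file` calls. Putting the run timestamp in the key would keep one file per run instead of overwriting the previous one.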

This modified Lambda function writes the pre-migration and post-migration record counts to a CSV file and uploads it to an S3 bucket. The CSV file has four columns: StartTime, EndTime, PreMigrationCount, and PostMigrationCount.

The same code can be orchestrated with AWS Step Functions. You can check my gist: WSDMSRecordCount.json
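
For readers who want a rough picture of the orchestration without opening the gist, here is a minimal sketch of a state machine definition built as a Python dictionary. It is not the gist's content. It assumes the Step Functions AWS SDK integration exposes DMS under the `databasemigration` service name (worth verifying against the Step Functions documentation), and it uses a fixed wait as a simplification where a real workflow would poll the task status. All ARNs and names are hypothetical.

```python
import json


def build_state_machine(replication_task_arn: str, lambda_arn: str,
                        log_group: str, log_stream: str,
                        wait_seconds: int = 300) -> dict:
    """Return an Amazon States Language definition as a dict."""
    return {
        "Comment": "Start a DMS task, wait, then capture record counts",
        "StartAt": "StartReplicationTask",
        "States": {
            "StartReplicationTask": {
                "Type": "Task",
                # Assumed SDK integration ARN for DMS; verify before use
                "Resource": "arn:aws:states:::aws-sdk:databasemigration:startReplicationTask",
                "Parameters": {
                    "ReplicationTaskArn": replication_task_arn,
                    "StartReplicationTaskType": "start-replication",
                },
                "Next": "WaitForLoad",
            },
            # Simplification: a fixed wait instead of polling the task status
            "WaitForLoad": {
                "Type": "Wait",
                "Seconds": wait_seconds,
                "Next": "CaptureRecordCounts",
            },
            "CaptureRecordCounts": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                    "FunctionName": lambda_arn,
                    # Becomes the Lambda event read by lambda_handler
                    "Payload": {
                        "logGroupName": log_group,
                        "logStreamName": log_stream,
                    },
                },
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    definition = build_state_machine(
        "arn:aws:dms:REGION:ACCOUNT:task:EXAMPLE",            # hypothetical
        "arn:aws:lambda:REGION:ACCOUNT:function:record-counts",  # hypothetical
        "example-log-group",
        "example-log-stream",
    )
    print(json.dumps(definition, indent=2))
```

The printed JSON is what would be pasted into the Step Functions console or passed as the `definition` argument when creating the state machine.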
