Setting up Real-Time MySQL Data Synchronization with OpenSearch

Last updated: June 13, 2025

This guide explains how to set up real-time data synchronization between MySQL databases and OpenSearch for enhanced search performance in your applications.

STEP-BY-STEP: AWS CONSOLE SETUP


🔹 Step 1: Enable CDC on Aurora MySQL

  1. Go to RDS > Databases > Your Aurora Instance

  2. Open Configuration, locate Cluster parameter group

  3. If the cluster uses a default group (e.g. default.aurora-mysql5.x), go to Parameter groups → create a new cluster parameter group (default groups cannot be edited)

  4. Edit parameters:

    • binlog_format = ROW

    • binlog_row_image = FULL

  5. Attach this new parameter group to your DB cluster

  6. Reboot the DB cluster to apply changes

  7. Set binlog retention:

    CALL mysql.rds_set_configuration('binlog retention hours', 24);


🔹 Step 2: Create IAM Role for Lambda

  1. Go to IAM > Roles > Create Role

  2. Select Lambda

  3. Attach policies:

    • AmazonDMSRedshiftS3Role (or custom for DMS access)

    • AmazonOpenSearchServiceFullAccess (or scoped)

    • CloudWatchLogsFullAccess

  4. Name it: lambda-opensearch-transform-role
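When you select Lambda in step 2, the console attaches the standard Lambda trust policy for you. For reference (or for infrastructure-as-code), it can be sketched in Python like this:

```python
import json

# Trust policy that lets the Lambda service assume the role
# (the console adds this automatically when you select "Lambda" in Step 2)
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

print(json.dumps(trust_policy, indent=2))
```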


🔹 Step 3: Create the Lambda Function

  1. Go to Lambda > Create function

  2. Name: transform-and-index-opensearch

  3. Runtime: Python 3.12 (or Node.js)

  4. Choose existing role → lambda-opensearch-transform-role

  5. Deploy your transformation logic

Example handler:

import json
import base64
import os
import requests  # not in the Lambda runtime by default; bundle it or add it via a layer

OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL")  # e.g. https://your-opensearch-host:9200
# AUTH_USER = os.environ.get("OPENSEARCH_USER")      # basic auth user
# AUTH_PASS = os.environ.get("OPENSEARCH_PASS")      # basic auth pass

def lambda_handler(event, context):
    headers = {
        "Content-Type": "application/json"
    }

    for record in event['Records']:
        try:
            # Decode base64 Kinesis data
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            # print(payload)
            message = json.loads(payload)
            
            # print(f"Message: {message}")

            table = message['metadata']['table-name'].lower()
            operation = message['metadata']['operation']
            doc = message.get('data', {})

            # Map each table to its primary-key column; doc_id stays None
            # for unmapped tables so the record is skipped below
            doc_id = None
            if table == 'invoice':
                doc_id = doc.get('INVOICE_ID')

            # Skip if no ID
            if not doc_id:
                print("doc_id is empty")
                continue

            print(f"Table name: {table}")
            print(f"Operation: {operation}")

            index_name = f"app_{table}"

            # Handle insert/update
            if operation in ['insert', 'update', 'load']:
                url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                print("Making API Call")
                # response = requests.put(url, headers=headers, auth=(AUTH_USER, AUTH_PASS), data=json.dumps(doc))
                response = requests.put(url, headers=headers, data=json.dumps(doc))
                print(f"[{operation.upper()}] {url} => {response.status_code} {response.text}")

            # Handle delete
            elif operation == 'delete':
                url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                # response = requests.delete(url, headers=headers, auth=(AUTH_USER, AUTH_PASS))
                response = requests.delete(url, headers=headers)
                print(f"[DELETE] {url} => {response.status_code} {response.text}")

        except Exception as e:
            print(f"Error processing record: {e}")
            continue
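To exercise the handler locally before wiring up Kinesis, you can build a test event in the shape DMS writes to Kinesis ("data" holds the row, "metadata" describes the table and operation); the table and invoice values below are made-up samples:

```python
import base64
import json

# A single change record in the shape DMS writes to Kinesis.
# INVOICE_ID, STATUS, and the table name are sample values.
message = {
    "data": {"INVOICE_ID": "INV-1001", "STATUS": "PAID"},
    "metadata": {"table-name": "INVOICE", "operation": "insert"},
}

# Kinesis delivers the payload base64-encoded, so the test event must too
test_event = {
    "Records": [
        {
            "kinesis": {
                "data": base64.b64encode(
                    json.dumps(message).encode("utf-8")
                ).decode("utf-8")
            }
        }
    ]
}

print(json.dumps(test_event, indent=2))
```

Paste the printed JSON into the Lambda console's Test tab, or pass `test_event` directly to `lambda_handler(test_event, None)`.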


🔹 Step 4: Create DMS Subnet Group

  1. Go to DMS > Subnet Groups

  2. Click Create

    • Name: dms-subnet-group

    • Select VPC

    • Choose private subnets with access to Aurora & Lambda


🔹 Step 5: Create DMS Replication Instance

  1. Go to DMS > Replication instances > Create

  2. Name: dms-replication-instance

  3. Instance Class: dms.t3.medium or higher

  4. Choose the VPC & dms-subnet-group

  5. Attach security group that allows MySQL (3306) & Lambda access


🔹 Step 6: Create DMS Source Endpoint (MySQL)

  1. Go to DMS > Endpoints > Create

  2. Endpoint type: Source

  3. Engine: MySQL

  4. Provide:

    • Host, Port, DB Name

    • Username: dms_user

    • Password

  5. Test the connection (ensure connectivity & permissions)
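The same source endpoint can be created programmatically; a sketch with boto3, where the host, password, and database name are placeholders to substitute with your own:

```python
# import boto3  # uncomment when running against AWS

# Placeholder connection details -- substitute your Aurora writer endpoint
source_endpoint_params = {
    "EndpointIdentifier": "mysql-source-endpoint",
    "EndpointType": "source",
    "EngineName": "mysql",
    "ServerName": "your-aurora-cluster.cluster-xxxx.us-east-1.rds.amazonaws.com",
    "Port": 3306,
    "Username": "dms_user",
    "Password": "your-password",
    "DatabaseName": "appdb",
}

# dms = boto3.client("dms")
# dms.create_endpoint(**source_endpoint_params)
```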


🔹 Step 7: Create DMS Target Endpoint (Lambda)

💡 Lambda is not a native DMS target. Use Kinesis or S3 with Lambda trigger OR create a custom endpoint.

Options:

  • Create Kinesis or S3 endpoint as Target

  • Set up Lambda trigger on those services

  • OR use custom DMS target plugin (advanced)
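If you take the Kinesis route, the target endpoint and the Lambda trigger can be sketched as follows; all ARNs and the batch size are placeholder values:

```python
# import boto3  # uncomment when running against AWS

# DMS target endpoint that writes change records to a Kinesis stream
# (the stream and role ARNs are placeholders)
kinesis_target_params = {
    "EndpointIdentifier": "kinesis-target-endpoint",
    "EndpointType": "target",
    "EngineName": "kinesis",
    "KinesisSettings": {
        "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/dms-cdc-stream",
        "MessageFormat": "json",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-kinesis-role",
    },
}

# Event source mapping that triggers the transform Lambda from the same stream
trigger_params = {
    "EventSourceArn": "arn:aws:kinesis:us-east-1:123456789012:stream/dms-cdc-stream",
    "FunctionName": "transform-and-index-opensearch",
    "StartingPosition": "LATEST",
    "BatchSize": 100,
}

# dms = boto3.client("dms")
# dms.create_endpoint(**kinesis_target_params)
# boto3.client("lambda").create_event_source_mapping(**trigger_params)
```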


🔹 Step 8: Create DMS Replication Task

  1. Go to DMS > Tasks > Create

  2. Name: mysql-to-opensearch-sync

  3. Source: Your MySQL/Aurora endpoint

  4. Target: Your S3/Kinesis/Lambda proxy

  5. Migration type: "CDC only" or "Full load + CDC"

  6. Enable CloudWatch logs
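The task's table mappings control which tables are replicated; a minimal selection rule, where the schema name is a placeholder:

```python
import json

# Replicate every table in the "appdb" schema (schema name is a placeholder);
# narrow "table-name" to specific tables if you only sync a subset
table_mappings = {
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "include-app-tables",
            "object-locator": {"schema-name": "appdb", "table-name": "%"},
            "rule-action": "include",
        }
    ]
}

print(json.dumps(table_mappings, indent=2))
```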


🔹 Step 9: OpenSearch Setup (Duplo or Self-Managed)

  1. Create one index per replicated table (7 in this setup):

    • app_<TABLE_NAME>

  2. Define index templates for each:

    • Mappings for ID fields, timestamps, status, etc.

  3. Create ISM (Index State Management) policies for data retention

  4. Create an OpenSearch user/role with restricted access to app_* indexes
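An index template covering the app_* pattern might look like the sketch below; the field names are illustrative and should be mapped to your tables' actual columns:

```python
import json
# import requests  # uncomment to PUT the template against your cluster

# Composable index template matching every app_* index.
# The property names below are illustrative -- adapt them to your columns.
index_template = {
    "index_patterns": ["app_*"],
    "template": {
        "settings": {"number_of_shards": 1, "number_of_replicas": 1},
        "mappings": {
            "properties": {
                "INVOICE_ID": {"type": "keyword"},
                "STATUS": {"type": "keyword"},
                "CREATED_AT": {"type": "date"},
            }
        },
    },
}

# requests.put(
#     f"{OPENSEARCH_URL}/_index_template/app-template",
#     headers={"Content-Type": "application/json"},
#     data=json.dumps(index_template),
# )
print(json.dumps(index_template, indent=2))
```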


🔹 Step 10: Monitoring

  1. Go to CloudWatch > Log groups

    • Check logs for Lambda, DMS task

  2. Create a CloudWatch dashboard:

    • Metrics for Lambda errors, invocation count

    • DMS replication latency
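Beyond the dashboard, an alarm on Lambda errors catches silent indexing failures; a boto3 sketch, where the SNS topic ARN is a placeholder:

```python
# import boto3  # uncomment when running against AWS

# Alarm that fires when the transform Lambda reports any errors over
# two consecutive 5-minute periods (the SNS topic ARN is a placeholder)
alarm_params = {
    "AlarmName": "opensearch-sync-lambda-errors",
    "Namespace": "AWS/Lambda",
    "MetricName": "Errors",
    "Dimensions": [
        {"Name": "FunctionName", "Value": "transform-and-index-opensearch"}
    ],
    "Statistic": "Sum",
    "Period": 300,
    "EvaluationPeriods": 2,
    "Threshold": 1,
    "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    "AlarmActions": ["arn:aws:sns:us-east-1:123456789012:ops-alerts"],
}

# boto3.client("cloudwatch").put_metric_alarm(**alarm_params)
```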