Setting up Real-Time MySQL Data Synchronization with OpenSearch
Last updated: June 13, 2025
This guide explains how to set up real-time data synchronization between MySQL databases and OpenSearch for enhanced search performance in your applications.
STEP-BY-STEP: AWS CONSOLE SETUP
🔹 Step 1: Enable CDC on Aurora MySQL
Go to RDS > Databases > Your Aurora Instance
Open Configuration, locate Cluster parameter group
If it is default.aurora-mysql5.x, go to Parameter groups → create a new cluster parameter group
Edit parameters:
binlog_format = ROW
binlog_row_image = FULL
Attach this new parameter group to your DB cluster
Reboot the DB cluster to apply changes
Set binlog retention:
CALL mysql.rds_set_configuration('binlog retention hours', 24);
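To confirm the binlog settings took effect after the reboot, you can query them from any MySQL client. Below is a minimal Python sketch using pymysql; the endpoint, user, and password are placeholders and pymysql must be installed locally:

import pymysql

# Placeholder connection details for the Aurora MySQL writer endpoint
conn = pymysql.connect(
    host="your-aurora-cluster.cluster-xxxx.us-east-1.rds.amazonaws.com",
    user="dms_user",
    password="your-password",
    database="mysql",
)

with conn.cursor() as cur:
    # Both should report ROW / FULL after the parameter group change and reboot
    cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
    print(cur.fetchone())
    cur.execute("SHOW VARIABLES LIKE 'binlog_row_image'")
    print(cur.fetchone())

conn.close()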
🔹 Step 2: Create IAM Role for Lambda
Go to IAM > Roles > Create Role
Select Lambda
Attach policies:
AmazonDMSRedshiftS3Role (or custom for DMS access)
AmazonOpenSearchServiceFullAccess (or scoped)
CloudWatchLogsFullAccess
Name it: lambda-opensearch-transform-role
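The same role can also be created programmatically. A minimal boto3 sketch is below; the policy list mirrors the console steps above, and you should verify the exact managed-policy ARNs (and scope them down) before relying on them:

import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets Lambda assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="lambda-opensearch-transform-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the managed policies from the step above (use scoped custom policies in production)
for policy_arn in [
    "arn:aws:iam::aws:policy/AmazonOpenSearchServiceFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
]:
    iam.attach_role_policy(
        RoleName="lambda-opensearch-transform-role",
        PolicyArn=policy_arn,
    )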
🔹 Step 3: Create the Lambda Function
Go to Lambda > Create function
Name: transform-and-index-opensearch
Runtime: Python 3.12 (or Node.js)
Choose existing role → lambda-opensearch-transform-role
Deploy your transformation logic
Example handler:
import json
import base64
import os

# Note: requests is not included in the Lambda Python runtime by default;
# package it with the function or add it via a Lambda layer.
import requests

OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL")  # e.g. https://your-opensearch-host:9200
# AUTH_USER = os.environ.get("OPENSEARCH_USER")  # basic auth user
# AUTH_PASS = os.environ.get("OPENSEARCH_PASS")  # basic auth pass


def lambda_handler(event, context):
    headers = {
        "Content-Type": "application/json"
    }

    for record in event['Records']:
        try:
            # Decode base64 Kinesis data
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            # print(payload)
            message = json.loads(payload)
            # print(f"Message: {message}")

            table = message['metadata']['table-name'].lower()
            operation = message['metadata']['operation']
            doc = message.get('data', {})

            if table == 'invoice':
                doc_id = doc.get('INVOICE_ID')
                # Skip records without an ID
                if not doc_id:
                    print("doc_id is empty")
                    continue

                print(f"Table name: {table}")
                print(f"Operation: {operation}")

                index_name = f"app_{table}"

                # Handle insert/update/full-load records
                if operation in ['insert', 'update', 'load']:
                    url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                    print("Making API call")
                    # response = requests.put(url, headers=headers, auth=(AUTH_USER, AUTH_PASS), data=json.dumps(doc))
                    response = requests.put(url, headers=headers, data=json.dumps(doc))
                    print(f"[{operation.upper()}] {url} => {response.status_code} {response.text}")

                # Handle delete records
                elif operation == 'delete':
                    url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                    # response = requests.delete(url, headers=headers, auth=(AUTH_USER, AUTH_PASS))
                    response = requests.delete(url, headers=headers)
                    print(f"[DELETE] {url} => {response.status_code} {response.text}")

        except Exception as e:
            print(f"Error processing record: {e}")
            continue
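You can exercise the handler locally before wiring up the Kinesis trigger. The sketch below builds a fake Kinesis event in the DMS message format the handler expects and invokes it directly; the module name lambda_function and the field names are assumptions, so adjust them to your code:

import base64
import json

from lambda_function import lambda_handler  # assumes the handler above lives in lambda_function.py

# A single DMS-style change record for the INVOICE table, wrapped like a Kinesis event
dms_message = {
    "metadata": {"table-name": "INVOICE", "operation": "insert"},
    "data": {"INVOICE_ID": 101, "STATUS": "PAID", "AMOUNT": 250.0},
}
event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(dms_message).encode("utf-8")).decode("utf-8")}}
    ]
}

lambda_handler(event, None)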
🔹 Step 4: Create DMS Subnet Group
Go to DMS > Subnet Groups
Click Create
Name: dms-subnet-group
Select VPC
Choose private subnets with access to Aurora & Lambda
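If you prefer to script this step, a minimal boto3 sketch looks like the following; the subnet IDs are placeholders and should be private subnets that can reach Aurora and Lambda:

import boto3

dms = boto3.client("dms")

# Subnet IDs are placeholders
dms.create_replication_subnet_group(
    ReplicationSubnetGroupIdentifier="dms-subnet-group",
    ReplicationSubnetGroupDescription="Subnets for MySQL-to-OpenSearch replication",
    SubnetIds=["subnet-0aaa1111", "subnet-0bbb2222"],
)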
🔹 Step 5: Create DMS Replication Instance
Go to DMS > Replication instances > Create
Name: dms-replication-instance
Instance Class: dms.t3.medium or higher
Choose the VPC & dms-subnet-group
Attach a security group that allows MySQL (3306) & Lambda access
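The equivalent boto3 call is sketched below; the security group ID is a placeholder and must allow outbound MySQL (3306) to Aurora:

import boto3

dms = boto3.client("dms")

response = dms.create_replication_instance(
    ReplicationInstanceIdentifier="dms-replication-instance",
    ReplicationInstanceClass="dms.t3.medium",
    ReplicationSubnetGroupIdentifier="dms-subnet-group",
    VpcSecurityGroupIds=["sg-0123456789abcdef0"],  # placeholder
    PubliclyAccessible=False,
)
# Keep the instance ARN; the endpoints and task below reference it
print(response["ReplicationInstance"]["ReplicationInstanceArn"])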
🔹 Step 6: Create DMS Source Endpoint (MySQL)
Go to DMS > Endpoints > Create
Endpoint type: Source
Engine: MySQL
Provide:
Host, Port, DB Name
Username: dms_user
Password
Test the connection (ensure connectivity & permissions)
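Scripted, the same step (including the connection test) looks roughly like this with boto3; the host, credentials, database name, and replication instance ARN are placeholders:

import boto3

dms = boto3.client("dms")

source = dms.create_endpoint(
    EndpointIdentifier="mysql-source-endpoint",
    EndpointType="source",
    EngineName="mysql",
    ServerName="your-aurora-cluster.cluster-xxxx.us-east-1.rds.amazonaws.com",  # placeholder
    Port=3306,
    Username="dms_user",
    Password="your-password",  # placeholder
    DatabaseName="appdb",      # placeholder
)

# Equivalent of the console's "Test connection" button
dms.test_connection(
    ReplicationInstanceArn="arn:aws:dms:us-east-1:123456789012:rep:EXAMPLE",  # placeholder
    EndpointArn=source["Endpoint"]["EndpointArn"],
)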
🔹 Step 7: Create DMS Target Endpoint (Lambda)
💡 Lambda is not a native DMS target. Use Kinesis or S3 with Lambda trigger OR create a custom endpoint.
Options:
Create Kinesis or S3 endpoint as Target
Set up Lambda trigger on those services
OR use custom DMS target plugin (advanced)
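For the Kinesis option, the sketch below creates the DMS target endpoint and wires the Lambda function from Step 3 to the stream. The stream ARN and the DMS-to-Kinesis service role ARN are placeholders you must create separately:

import boto3

dms = boto3.client("dms")
lambda_client = boto3.client("lambda")

STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/dms-cdc-stream"  # placeholder

# DMS target endpoint that writes change records to Kinesis as JSON
dms.create_endpoint(
    EndpointIdentifier="kinesis-target-endpoint",
    EndpointType="target",
    EngineName="kinesis",
    KinesisSettings={
        "StreamArn": STREAM_ARN,
        "MessageFormat": "json",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-kinesis-access-role",  # placeholder
    },
)

# Trigger the transform Lambda from the same stream
lambda_client.create_event_source_mapping(
    EventSourceArn=STREAM_ARN,
    FunctionName="transform-and-index-opensearch",
    StartingPosition="LATEST",
    BatchSize=100,
)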
🔹 Step 8: Create DMS Replication Task
Go to DMS > Tasks > Create
Name: mysql-to-opensearch-sync
Source: Your MySQL/Aurora endpoint
Target: Your S3/Kinesis/Lambda proxy
Migration type: "CDC only" or "Full load + CDC"
Enable CloudWatch logs
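The task can also be created with boto3. The sketch below uses a selection rule that replicates only the tables you index in OpenSearch; the endpoint and instance ARNs, schema name, and table name are placeholders:

import json
import boto3

dms = boto3.client("dms")

table_mappings = {
    "rules": [{
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "include-app-tables",
        "object-locator": {"schema-name": "appdb", "table-name": "INVOICE"},  # placeholders
        "rule-action": "include",
    }]
}

dms.create_replication_task(
    ReplicationTaskIdentifier="mysql-to-opensearch-sync",
    SourceEndpointArn="arn:aws:dms:us-east-1:123456789012:endpoint:SOURCE",   # placeholder
    TargetEndpointArn="arn:aws:dms:us-east-1:123456789012:endpoint:TARGET",   # placeholder
    ReplicationInstanceArn="arn:aws:dms:us-east-1:123456789012:rep:EXAMPLE",  # placeholder
    MigrationType="full-load-and-cdc",  # or "cdc" for change capture only
    TableMappings=json.dumps(table_mappings),
)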
🔹 Step 9: OpenSearch Setup (Duplo or Self-Managed)
Create one index per synchronized table (7 in this example):
app_<TABLE_NAME>
Define index templates for each:
Mappings for ID fields, timestamps, status, etc.
Create ISM (Index State Management) policies for data retention
Create an OpenSearch user/role with restricted access to app_* indexes
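One index template can cover every app_* index so mappings stay consistent. The sketch below uses the same requests-based style as the Lambda handler; the template name and field names are illustrative, and basic auth is optional depending on your domain configuration:

import json
import os
import requests

OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL")  # e.g. https://your-opensearch-host:9200

# Template applied to every index matching app_*; field names are examples only
template = {
    "index_patterns": ["app_*"],
    "template": {
        "mappings": {
            "properties": {
                "INVOICE_ID": {"type": "keyword"},
                "STATUS": {"type": "keyword"},
                "CREATED_AT": {"type": "date"},
            }
        }
    },
}

response = requests.put(
    f"{OPENSEARCH_URL}/_index_template/app_tables_template",
    headers={"Content-Type": "application/json"},
    data=json.dumps(template),
    # auth=("opensearch-user", "opensearch-pass"),  # add basic auth if your domain requires it
)
print(response.status_code, response.text)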
🔹 Step 10: Monitoring
Go to CloudWatch > Log groups
Check logs for Lambda, DMS task
Create a CloudWatch dashboard:
Metrics for Lambda errors, invocation count
DMS replication latency
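As a quick health check outside the dashboard, you can pull the Lambda error count for the transform function with boto3; the function name matches Step 3 and the time window is arbitrary:

from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch")

# Last hour of Lambda error counts for the transform function, in 5-minute buckets
now = datetime.now(timezone.utc)
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Lambda",
    MetricName="Errors",
    Dimensions=[{"Name": "FunctionName", "Value": "transform-and-index-opensearch"}],
    StartTime=now - timedelta(hours=1),
    EndTime=now,
    Period=300,
    Statistics=["Sum"],
)
for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Sum"])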