Identity Resolution with Lambda, DynamoDB Streams and RDS Postgres
In my last post about Identity Resolution using Postgres, we saw how we can use Postgres triggers and functions to do Identity Resolution. But in practical scenarios, user profile data is stored in NoSQL databases such as Cassandra or DynamoDB because the profiles are schemaless (they are created across multiple platforms and therefore need not have a consistent structure).
In this post, we are going to substitute the triggers with AWS Lambda, because the data now comes from a different AWS service (a DynamoDB stream).
Step 1: Create a DynamoDB table with streams enabled using Terraform.
Note the stream_view_type of NEW_AND_OLD_IMAGES, which means that whenever a DynamoDB record is updated, both the old and the new image of the record are sent to the stream.
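A minimal Terraform sketch, assuming the table is named Profiles with a string hash key called profile_id (the key name and billing mode are my assumptions):

resource "aws_dynamodb_table" "profiles" {
  name         = "Profiles"
  billing_mode = "PAY_PER_REQUEST"   # assumption; provisioned capacity works too
  hash_key     = "profile_id"

  attribute {
    name = "profile_id"
    type = "S"
  }

  # Both the old and new item images are published to the stream.
  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"
}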
Step 2: Create VPC, Subnets, Security Groups and DB Parameter Group for RDS using Terraform.
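A hedged sketch of these resources; the CIDR ranges, availability zones and names are purely illustrative:

resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
}

# Two private subnets in different AZs, since RDS subnet groups need at least two.
resource "aws_subnet" "private_a" {
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.1.0/24"
  availability_zone = "us-east-1a"
}

resource "aws_subnet" "private_b" {
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.2.0/24"
  availability_zone = "us-east-1b"
}

resource "aws_db_subnet_group" "rds" {
  name       = "rds-subnet-group"
  subnet_ids = [aws_subnet.private_a.id, aws_subnet.private_b.id]
}

# Allow Postgres traffic only from inside the VPC.
resource "aws_security_group" "rds" {
  name   = "rds-sg"
  vpc_id = aws_vpc.main.id

  ingress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = [aws_vpc.main.cidr_block]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_db_parameter_group" "postgres" {
  name   = "rds-postgres-params"
  family = "postgres14"   # assumption; match your engine version
}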
Step 3: Create RDS Postgres Instance using Terraform.
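A minimal sketch, reusing the networking resources above; the instance size, engine version and credential handling are assumptions:

resource "aws_db_instance" "postgres" {
  identifier             = "identity-resolution-db"
  engine                 = "postgres"
  engine_version         = "14.7"
  instance_class         = "db.t3.micro"
  allocated_storage      = 20
  username               = var.db_username
  password               = var.db_password
  db_subnet_group_name   = aws_db_subnet_group.rds.name
  vpc_security_group_ids = [aws_security_group.rds.id]
  parameter_group_name   = aws_db_parameter_group.postgres.name
  publicly_accessible    = false
  skip_final_snapshot    = true
}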
Step 4: Create ECR repositories using Terraform.
Note that we are creating two ECR repositories because we have two Lambda functions: one for creating the RDS tables and indexes, and one for reading from the DynamoDB stream and updating RDS.
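A minimal sketch; the repository names are my assumptions (lambda-stream matches the log group mentioned later):

resource "aws_ecr_repository" "lambda_rds_setup" {
  name         = "lambda-rds-setup"
  force_delete = true   # lets terraform destroy remove non-empty repositories
}

resource "aws_ecr_repository" "lambda_stream" {
  name         = "lambda-stream"
  force_delete = true
}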
RDS instances are usually isolated in a VPC, and it is not recommended to make them publicly accessible outside the VPC. In general, our APIs run inside the same VPC as the RDS instance so that they can access the database.
Thus, in order to create tables or insert test data into RDS, we could create an EC2 instance in the same VPC, make it accessible through security groups, and run a script on it to insert mock data. This is inefficient: just to run a single Python script we have to create an EC2 instance.
A better alternative is to use AWS Lambda.
Step 5: Python script with lambda handler to create RDS schema, tables, indexes and function for doing union find operation.
The function ‘get_profiles’ performs the union find operation using breadth-first search over the adjacency_list table and returns all profile_ids discovered during the search.
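A minimal sketch of what such a handler could look like, not the author's exact code: the schema name, column layouts, and the implementation of get_profiles as a recursive SQL function (which explores the profile graph level by level, i.e. a breadth-first search) are all my assumptions.

import psycopg2

import rds_config  # assumed to expose DB_HOST, DB_USER, DB_PASSWORD, DB_NAME

DDL = """
CREATE SCHEMA IF NOT EXISTS identity;

-- Maps an attribute value (e.g. email, phone) to the profile_id that owns it.
CREATE TABLE IF NOT EXISTS identity.inverted_index (
    attribute_value TEXT NOT NULL,
    profile_id      TEXT NOT NULL,
    PRIMARY KEY (attribute_value, profile_id)
);

-- Edges between profiles that share at least one attribute value.
CREATE TABLE IF NOT EXISTS identity.adjacency_list (
    profile_id          TEXT NOT NULL,
    adjacent_profile_id TEXT NOT NULL,
    PRIMARY KEY (profile_id, adjacent_profile_id)
);

CREATE INDEX IF NOT EXISTS idx_inverted_index_profile
    ON identity.inverted_index (profile_id);

-- Search over adjacency_list: returns every profile_id reachable from the input.
CREATE OR REPLACE FUNCTION identity.get_profiles(start_id TEXT)
RETURNS SETOF TEXT AS $$
WITH RECURSIVE reachable(profile_id) AS (
    SELECT start_id
    UNION
    SELECT a.adjacent_profile_id
    FROM identity.adjacency_list a
    JOIN reachable r ON a.profile_id = r.profile_id
)
SELECT profile_id FROM reachable;
$$ LANGUAGE sql;
"""


def lambda_handler(event, context):
    conn = psycopg2.connect(
        host=rds_config.DB_HOST,
        user=rds_config.DB_USER,
        password=rds_config.DB_PASSWORD,
        dbname=rds_config.DB_NAME,
    )
    try:
        with conn, conn.cursor() as cur:
            cur.execute(DDL)
        return {"statusCode": 200, "body": "schema created"}
    finally:
        conn.close()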
rds_config.py is another file in the same directory as the above file containing the RDS instance credentials as constants.
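For reference, a placeholder version of rds_config.py; the constant names are my assumptions, matching the sketch above:

DB_HOST = "identity-resolution-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com"  # RDS endpoint
DB_USER = "postgres"
DB_PASSWORD = "change-me"
DB_NAME = "postgres"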
Step 6: Write Dockerfile for creation of RDS tables and indexes.
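A minimal sketch of the Dockerfile, assuming the handler above lives in create_tables.py and requirements.txt lists psycopg2-binary:

FROM public.ecr.aws/lambda/python:3.9

# Install dependencies into the Lambda task root.
COPY requirements.txt .
RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY create_tables.py rds_config.py ${LAMBDA_TASK_ROOT}/

# module_name.function_name of the Lambda handler
CMD ["create_tables.lambda_handler"]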
Step 7: Log in to AWS ECR and upload the Docker image to the repository.
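A sketch of what docker_build_ecr.sh could contain; the account id, region and repository name are placeholders:

#!/bin/sh
ACCOUNT_ID=123456789012
REGION=us-east-1
REPO=lambda-rds-setup
REGISTRY=$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com

# Authenticate Docker against the ECR registry.
aws ecr get-login-password --region $REGION \
  | docker login --username AWS --password-stdin $REGISTRY

# Build, tag and push the image.
docker build -t $REPO .
docker tag $REPO:latest $REGISTRY/$REPO:latest
docker push $REGISTRY/$REPO:latest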
chmod +x docker_build_ecr.sh && sh docker_build_ecr.sh
Step 8: Setup AWS Lambda using Terraform.
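A minimal sketch of the function definition, assuming the container image pushed above and an IAM role defined further below:

resource "aws_lambda_function" "rds_setup" {
  function_name = "lambda-rds-setup"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.lambda_rds_setup.repository_url}:latest"
  role          = aws_iam_role.lambda_role.arn
  timeout       = 60

  # Run inside the VPC so the function can reach the RDS instance.
  vpc_config {
    subnet_ids         = [aws_subnet.private_a.id, aws_subnet.private_b.id]
    security_group_ids = [aws_security_group.rds.id]
  }
}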
The above Lambda function only creates the RDS tables; it does not read from the DynamoDB stream.
The permissions required by the Lambda functions are as follows (a hedged Terraform sketch of the role and policy follows the list):
- Creating a log group in CloudWatch so that logs can be seen there.
- Creating the Lambda function in the same VPC we created earlier. For this we need to grant NetworkInterface permissions.
- Reading from DynamoDB streams as an event source. (This is not really required by the current Lambda function, but in order to reuse code for this tutorial, I have merged the IAM policies.)
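A hedged sketch of the merged role and policy; the exact statements and resource ARNs are assumptions:

resource "aws_iam_role" "lambda_role" {
  name = "identity-resolution-lambda-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "lambda_permissions" {
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # CloudWatch logging
        Effect   = "Allow"
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
        Resource = "*"
      },
      {
        # Network interfaces for running inside the VPC
        Effect   = "Allow"
        Action   = ["ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface"]
        Resource = "*"
      },
      {
        # Reading from the DynamoDB stream as an event source
        Effect   = "Allow"
        Action   = ["dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams"]
        Resource = aws_dynamodb_table.profiles.stream_arn
      }
    ]
  })
}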
Step 9: Invoke AWS Lambda to create RDS schema, tables, indexes and function for doing union find operation.
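I am guessing at the contents of update_lambda.sh here: it refreshes the function to the latest image and then invokes it. The names and region are placeholders.

#!/bin/sh
ACCOUNT_ID=123456789012
REGION=us-east-1
FUNCTION=lambda-rds-setup

# Point the function at the newest image in ECR.
aws lambda update-function-code \
  --function-name $FUNCTION \
  --image-uri $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$FUNCTION:latest

# Invoke it and print the response.
aws lambda invoke --function-name $FUNCTION response.json
cat response.json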
chmod +x update_lambda.sh && sh update_lambda.sh
Step 10: Python script with lambda handler to read from dynamodb stream and update inverted_index and adjacency_list tables in RDS.
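A minimal sketch of such a handler, following the table and column assumptions from the schema sketch in Step 5; the shape of the metadata attribute is also an assumption.

import json

import psycopg2

import rds_config  # assumed to expose DB_HOST, DB_USER, DB_PASSWORD, DB_NAME


def lambda_handler(event, context):
    conn = psycopg2.connect(
        host=rds_config.DB_HOST,
        user=rds_config.DB_USER,
        password=rds_config.DB_PASSWORD,
        dbname=rds_config.DB_NAME,
    )
    try:
        with conn, conn.cursor() as cur:
            for record in event.get("Records", []):
                image = record["dynamodb"].get("NewImage")
                if not image:
                    continue  # e.g. REMOVE events carry no new image
                profile_id = image["profile_id"]["S"]
                # All attributes in the metadata map are used for matching here;
                # in real life, attributes such as names and addresses should be skipped.
                metadata = image.get("metadata", {}).get("M", {})
                for attribute, typed_value in metadata.items():
                    value = list(typed_value.values())[0]
                    # Link this profile to every profile that already owns this value.
                    cur.execute(
                        "SELECT profile_id FROM identity.inverted_index"
                        " WHERE attribute_value = %s",
                        (value,),
                    )
                    for (other_id,) in cur.fetchall():
                        if other_id != profile_id:
                            cur.execute(
                                "INSERT INTO identity.adjacency_list VALUES (%s, %s), (%s, %s)"
                                " ON CONFLICT DO NOTHING",
                                (profile_id, other_id, other_id, profile_id),
                            )
                    cur.execute(
                        "INSERT INTO identity.inverted_index VALUES (%s, %s)"
                        " ON CONFLICT DO NOTHING",
                        (value, profile_id),
                    )
                # Union find via the get_profiles function; the output lands in CloudWatch.
                cur.execute("SELECT identity.get_profiles(%s)", (profile_id,))
                identities = [row[0] for row in cur.fetchall()]
                print(json.dumps({"profile_id": profile_id, "identities": identities}))
        return {"statusCode": 200}
    finally:
        conn.close()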
Note that all the logs we print in the script can be verified in the CloudWatch Log Group for the Lambda function.
The above script is quite simplistic because we have omitted some of the complexity of deciding which attributes to use for the union find operation. By default, all the attributes in the metadata field from the DynamoDB stream are considered, but in real life attributes such as names, addresses, etc. need to be omitted.
Step 11: Write the Dockerfile for the stream-processing Lambda, analogous to Step 6.
Step 12: Log in to AWS ECR and upload the Docker image to the repository.
Step 13: Set up AWS Lambda using Terraform.
Note that we need to add the DynamoDB stream as an event source for this Lambda function.
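A minimal sketch, assuming the same role and networking as before; the batch size and starting position are illustrative:

resource "aws_lambda_function" "stream_processor" {
  function_name = "lambda-stream"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.lambda_stream.repository_url}:latest"
  role          = aws_iam_role.lambda_role.arn
  timeout       = 60

  vpc_config {
    subnet_ids         = [aws_subnet.private_a.id, aws_subnet.private_b.id]
    security_group_ids = [aws_security_group.rds.id]
  }
}

# Wire the DynamoDB stream to the function.
resource "aws_lambda_event_source_mapping" "profiles_stream" {
  event_source_arn  = aws_dynamodb_table.profiles.stream_arn
  function_name     = aws_lambda_function.stream_processor.arn
  starting_position = "LATEST"
  batch_size        = 10
}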
Step 14: Invoke AWS Lambda with a mocked DynamoDB stream event to update the RDS instance and return the identities.
Create a sample payload.json file. This mocks the data coming from the DynamoDB stream.
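An illustrative payload.json, shaped like a DynamoDB stream event; the attribute names and values are made up:

{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventSource": "aws:dynamodb",
      "dynamodb": {
        "Keys": { "profile_id": { "S": "p1" } },
        "NewImage": {
          "profile_id": { "S": "p1" },
          "metadata": {
            "M": {
              "email": { "S": "jane@example.com" },
              "phone": { "S": "+15550100" }
            }
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      }
    }
  ]
}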
Then create and run the below script.
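A minimal sketch of that script, assuming the function name from the Terraform sketch above:

#!/bin/sh
# Invoke the stream-processing Lambda with the mocked stream event.
aws lambda invoke \
  --function-name lambda-stream \
  --payload file://payload.json \
  --cli-binary-format raw-in-base64-out \
  response.json
cat response.json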
Step 15: Test end to end by uploading data to DynamoDB.
Create a file called item.json with sample data for the DynamoDB table.
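An illustrative item.json; the email value matches the mocked payload above, so the new profile gets linked to it by the union find:

{
  "profile_id": { "S": "p2" },
  "metadata": {
    "M": {
      "email": { "S": "jane@example.com" },
      "device_id": { "S": "d-42" }
    }
  }
}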
aws dynamodb put-item --table-name Profiles --item file://item.json --cli-binary-format raw-in-base64-out
After this, go to CloudWatch Logs and look at the Log Group corresponding to lambda-stream. Pick up the latest log stream. It will have the result of the union find operation on the profiles graph.
Step 16: Tear down resources with Terraform.
Once our goal is accomplished, we should tear down all cloud resources as efficiently as possible.
terraform destroy
That’s all that is required here :)
All the code is available here.