BackFill Failed Delivery From Kinesis To RedShift With Lambda
If you stream real-time data from Kinesis to RedShift, you may run into a situation where Redshift is down for maintenance and Kinesis Firehose cannot ingest the data. Firehose does have a handy feature that retries the delivery for the next 60 minutes. In my case, the password was changed on the RedShift cluster in our staging infra, and we didn’t notice that the data load was failing. Now we need to backfill all that data into Redshift using the manifest files. One simple way is to list all the files in the errors manifest folders, generate the COPY commands, and run them in a loop or import them as a .sql file. But I wanted to automate this process with Lambda.
I know Lambda has a maximum runtime of 15 minutes, but if you have only a few files, it works well. It’s serverless, which is the only reason I chose Lambda. Alternatively, you can use shell scripts with the aws cli, or any other language, to process this.
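The manual approach mentioned above (list the error manifests, generate one COPY per file, save everything as a .sql file) can be sketched in a few lines of Python. This is only an illustration: the function names are made up for this example, and the `MANIFEST JSON 'auto' GZIP` options assume gzip-compressed JSON data, so adjust them to your own format.

```python
def build_copy_command(table, bucket, manifest_key, iam_role):
    """Return a Redshift COPY statement that loads one manifest file."""
    return (
        f"COPY {table} "
        f"FROM 's3://{bucket}/{manifest_key}' "
        f"IAM_ROLE '{iam_role}' "
        # Assumption: gzip-compressed JSON; change these options if needed.
        "MANIFEST JSON 'auto' GZIP;"
    )


def write_backfill_sql(path, table, bucket, manifest_keys, iam_role):
    """Write one COPY statement per manifest key and return how many were written."""
    count = 0
    with open(path, "w") as handle:
        for key in manifest_keys:
            handle.write(build_copy_command(table, bucket, key, iam_role) + "\n")
            count += 1
    return count
```

The resulting file can then be run against the cluster with any SQL client.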
Kinesis Firehose puts the manifest files for failed deliveries in a directory called errors. The files are stored there in a partitioned layout.
- Lambda should read all the manifest files to import (no manual step to specify the location every time).
- list-objects-v2 does not return more than 1000 objects per call, so we used paginate to loop and get all the objects.
- Redshift’s password is encrypted with KMS.
- Lambda needs psycopg2 to access Redshift, but the standard build does not work in the Lambda environment, so we used a custom-compiled version of psycopg2.
- Once a manifest file has been imported, the next execution should not load the same file again, so we move each file once it’s imported.
- I’m a great fan of tracking everything in a metadata table, so every import is tracked along with the COPY command it used.
- If you plan to launch the Lambda outside the VPC, this blog is not for you.
- Lambda needs to access KMS to decrypt the password, so the subnets you launch the Lambda in must have a NAT Gateway.
- Create a KMS key in the region where you are going to create this Lambda function.
- Add the following variables to the Lambda’s environment variables.
| Variable | Description |
|---|---|
| REDSHIFT_DATABASE | Your database name |
| REDSHIFT_TABLE | Your table name to import the data |
| REDSHIFT_USER | Redshift user name |
| REDSHIFT_PASSWD | Redshift user’s password |
| REDSHIFT_IAMROLE | IAM role to access S3 inside Redshift |
| SOURCE_BUCKET | Bucket name where you have the manifest files |
| SOURCE_PREFIX | Location of the error manifest files |
| TARGET_PREFIX | Location to move the loaded manifest files to |
- For this blog, I encrypt only the password. Under Encryption configuration, check Enable helpers for encryption in transit and Use a customer master key, then choose the KMS key you created for this.
- You will then see a button called Encrypt next to each environment variable. Click Encrypt on the password variable.
- Lambda’s IAM role should have the predefined policy AWSLambdaBasicExecutionRole and one custom policy that allows decryption with the KMS key.
- Under Network, choose the VPC, the subnets (they must be attached to a NAT/NAT Gateway), and a security group that allows all traffic from its own ID.
- Make sure the Redshift cluster’s security group accepts connections from the Lambda subnet IP range.
- 128MB of memory was fine for me, but the memory and the timeout can be configured as per your workload.
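Since the password variable is stored encrypted, the function has to decrypt it at runtime. A minimal sketch of that step is shown here, assuming the variable holds the base64-encoded ciphertext produced by the console’s Encrypt button. The helper name `decrypt_env_var` is made up for this example, and `kms_client` is expected to be a `boto3.client("kms")` object.

```python
import os
from base64 import b64decode


def decrypt_env_var(name, kms_client, encryption_context=None):
    """Decrypt a base64-encoded, KMS-encrypted environment variable."""
    ciphertext = b64decode(os.environ[name])
    kwargs = {"CiphertextBlob": ciphertext}
    # Assumption: pass an encryption context only if the value was
    # encrypted with one; otherwise leave it out.
    if encryption_context:
        kwargs["EncryptionContext"] = encryption_context
    response = kms_client.decrypt(**kwargs)
    # KMS returns the plaintext as bytes.
    return response["Plaintext"].decode("utf-8")
```

Inside the function this would be called once, outside the handler, for example `decrypt_env_var("REDSHIFT_PASSWD", boto3.client("kms"))`, so the decrypted value is reused across warm invocations.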
Code for the Function:
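As a rough sketch of the flow described in this post (list the error manifests, run a COPY for each, record it, then move the file), the core of such a function could look like the following. It is an illustration under stated assumptions, not the exact function: the names `list_manifest_keys` and `backfill`, the history table name `error_copy_history` and its columns, and the gzip JSON COPY options are all hypothetical. `s3_client` is expected to be a `boto3.client("s3")` and `cursor` a psycopg2 cursor.

```python
def list_manifest_keys(s3_client, bucket, prefix):
    """Yield every object key under `prefix`, following S3 pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page without matches has no "Contents" entry at all.
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith("/"):  # skip folder placeholder keys
                yield obj["Key"]


def backfill(s3_client, cursor, bucket, source_prefix, target_prefix,
             table, iam_role, history_table="error_copy_history"):
    """COPY each manifest, record it, then move it out of the error prefix."""
    moved = []
    # Materialise the listing first so moving files does not disturb paging.
    for key in list(list_manifest_keys(s3_client, bucket, source_prefix)):
        copy_sql = (
            f"COPY {table} FROM 's3://{bucket}/{key}' "
            f"IAM_ROLE '{iam_role}' MANIFEST JSON 'auto' GZIP;"
        )
        cursor.execute(copy_sql)
        # Track the load together with the exact COPY command that was used.
        cursor.execute(
            f"INSERT INTO {history_table} (manifest_file, copy_command) "
            "VALUES (%s, %s)",
            (key, copy_sql),
        )
        # S3 has no rename, so a move is a copy followed by a delete.
        new_key = target_prefix + key[len(source_prefix):]
        s3_client.copy_object(
            Bucket=bucket, Key=new_key,
            CopySource={"Bucket": bucket, "Key": key},
        )
        s3_client.delete_object(Bucket=bucket, Key=key)
        moved.append(new_key)
    return moved
```

In a Lambda handler the arguments would come from the environment variables listed earlier, and the transaction needs a commit after the loop (or autocommit enabled on the connection).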
How to Deploy it?
As I mentioned above, you need to use the custom-compiled psycopg2, which you can download from the link below.
This setup uses Python 3.6 on Lambda, so download this repo and rename the Python 3.6 folder to psycopg2. Then create a file named pgcode.py and paste the above Python code into it.
Now create a zip file with the psycopg2 folder and pgcode.py, and upload it to Lambda. In the Lambda Handler use
That’s it, your Lambda function is created, though it is not quite ready to execute yet.
Create the History Table:
To track the import process, we need to create a history table in RedShift.
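As an illustration only, a history table along these lines would be enough to record each load. The table name `error_copy_history`, its columns, and the helper `record_load` are assumptions made for this sketch, so adapt them to whatever your function actually writes.

```python
# Hypothetical layout: one row per imported manifest file.
HISTORY_TABLE_DDL = """
CREATE TABLE IF NOT EXISTS error_copy_history (
    manifest_file VARCHAR(1024),
    copy_command  VARCHAR(65535),
    loaded_at     TIMESTAMP
);
"""


def record_load(cursor, manifest_file, copy_command):
    """Insert one history row; GETDATE() is evaluated by Redshift."""
    cursor.execute(
        "INSERT INTO error_copy_history "
        "(manifest_file, copy_command, loaded_at) "
        "VALUES (%s, %s, GETDATE())",
        (manifest_file, copy_command),
    )
```

The `%s` placeholders follow the psycopg2 parameter style, so the COPY text is passed as a bound value instead of being spliced into the INSERT.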
Run the Function:
I’m giving my S3 paths and Lambda environment variables here for your reference.
- S3 bucket - bhuvi-datalake. Here I’m storing all the Kinesis data.
- S3 prefix - kinesis/errors/. Failed manifest files will go to this path (eg:
- Target prefix - kinesis/processed/. Once the data is imported to Redshift, the loaded manifest file will go to this location.
Once the execution is done, you can see the load history in the history table.
- I ran this on an ad-hoc basis, but if you want to run it automatically, use CloudWatch triggers to run it daily or at any N interval.
- I used KMS to encrypt the password; you can also use IAM temporary credentials.
- My S3 data is compressed and in JSON format. If you have a different file format or compression, modify the COPY command in the Python code.
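For the temporary-credentials option, Redshift’s GetClusterCredentials API can issue a short-lived password instead of storing one. A minimal sketch follows. The helper name and the `cluster_id` parameter are assumptions (the cluster identifier is not among the environment variables used in this post), and `redshift_client` is expected to be a `boto3.client("redshift")` whose role is allowed to call `redshift:GetClusterCredentials`.

```python
def get_temporary_credentials(redshift_client, cluster_id, db_user, db_name,
                              duration_seconds=900):
    """Request short-lived database credentials for an existing Redshift user."""
    response = redshift_client.get_cluster_credentials(
        ClusterIdentifier=cluster_id,
        DbUser=db_user,
        DbName=db_name,
        DurationSeconds=duration_seconds,  # allowed range is 900 to 3600
        AutoCreate=False,                  # do not create the user if missing
    )
    # The returned user name carries a prefix (for example "IAM:" + db_user),
    # so connect with the returned name, not the one that was passed in.
    return response["DbUser"], response["DbPassword"]
```

The returned pair can be passed straight to the database connection in place of REDSHIFT_USER and the decrypted REDSHIFT_PASSWD.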
Related interesting Reading:
- Get the email notification when kinesis failed to import the data into RedShift.
- List keys in S3 more than 1000 with list-objects-v2 include Prefix and Suffix.
- Psycopg2 - custom compiler for Python 2.7, 3.6, 3.7