The Data Guy

BackFill Failed Delivery From Kinesis To RedShift With Lambda

If you are dealing with a realtime data stream from Kinesis to RedShift, then you may face this situation where RedShift was down due to some maintenance activity and Kinesis Firehose was not able to ingest the data. Firehose has an awesome feature to retry the delivery for the next 60 minutes; after that, it writes the failed batches' manifest files to S3. I had a situation where there was a password change on the RedShift cluster on our staging infra, and we didn't notice that the data load was failing. Now we need to backfill all of that data into RedShift using the manifest files. One simple way is to list all the files in the errors manifest folder, generate a COPY command for each, and run those commands in a loop or import them as a .sql file. But I wanted to automate this process with Lambda.
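The manual approach can be sketched in a few lines: build one COPY statement per error manifest and collect them into a .sql file. A minimal sketch, where the table name, bucket, manifest keys, and IAM role ARN are all hypothetical placeholders:

```python
def build_copy_command(table, bucket, key, iam_role):
    """Build a RedShift COPY statement for one error manifest file."""
    return (
        f"COPY {table} FROM 's3://{bucket}/{key}' "
        f"iam_role '{iam_role}' MANIFEST json 'auto' GZIP;"
    )

# Hypothetical manifest keys for illustration
manifests = [
    "errors/manifests/2019/09/26/13/delivery-1",
    "errors/manifests/2019/09/26/13/delivery-2",
]
sql = "\n".join(
    build_copy_command("my_table", "my-bucket", key,
                       "arn:aws:iam::123456789012:role/redshift-s3")
    for key in manifests
)
print(sql)  # paste into a .sql file or run statement by statement
```

This works fine for a handful of files, but it still means someone has to list the keys and run the output by hand, which is what the Lambda below automates.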

Why Lambda?

I know Lambda has a 15-minute runtime limit, but if you have fewer files then it's a good fit. It's serverless, and that is the only reason I chose Lambda. Alternatively, you can use shell scripts with the AWS CLI, or any other language, to do the same processing.

Solution Overview:

Kinesis Firehose puts the failed deliveries' manifest files in a directory called errors. The files are properly partitioned, like errors/manifests/yyyy/mm/dd/hh/.

  1. Lambda should read all the manifest files for importing (no manual step to mention the location every time).
  2. list-objects-v2 will not return more than 1000 objects, so we use a paginator to loop and get all of them.
  3. RedShift's password is encrypted with KMS.
  4. Lambda needs psycopg2 to access RedShift, but the official package will not run on Lambda, so we used a custom compiled version of psycopg2.
  5. Once the data from a manifest file is imported, the next execution should not load the same file again and again. So we move the file once it's imported.
  6. I'm a great fan of tracking everything in a metadata table. So every import is recorded along with what COPY command it used.
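Point 5 above can be sketched as a copy-then-delete, since S3 has no native move operation. A minimal sketch, where `s3` is a boto3 S3 client passed in by the caller, and the `.done` suffix mirrors what the full function below does:

```python
def processed_key(key, target_prefix):
    """Target key for a manifest that has already been imported."""
    return target_prefix + key + ".done"

def move_manifest(s3, bucket, key, target_prefix):
    """S3 has no rename: copy the object, then delete the original."""
    s3.copy({"Bucket": bucket, "Key": key}, bucket,
            processed_key(key, target_prefix))
    s3.delete_object(Bucket=bucket, Key=key)
```

Because the copy happens before the delete, a crash between the two calls leaves the manifest in both places, which is safe: a duplicate COPY is easier to clean up than a lost manifest.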

Lambda Setup:

| Variable Name     | Value                                           |
|-------------------|-------------------------------------------------|
| REDSHIFT_DATABASE | Your database name                              |
| REDSHIFT_TABLE    | Your table name to import the data              |
| REDSHIFT_USER     | RedShift user name                              |
| REDSHIFT_PASSWD   | RedShift user's password                        |
| REDSHIFT_PORT     | RedShift port                                   |
| REDSHIFT_IAMROLE  | IAM role to access S3 inside RedShift           |
| SOURCE_BUCKET     | Bucket name where you have the manifest files   |
| SOURCE_PREFIX     | Location of the error manifest files            |
| TARGET_PREFIX     | Location where to move the loaded manifest files|
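The function reads all of these with `os.environ`, so a missing variable fails at startup rather than mid-import. A small sketch of that idea (the names match the table above; the loader function itself is just for illustration):

```python
import os

REQUIRED = [
    "REDSHIFT_DATABASE", "REDSHIFT_TABLE", "REDSHIFT_USER", "REDSHIFT_PASSWD",
    "REDSHIFT_PORT", "REDSHIFT_IAMROLE", "SOURCE_BUCKET", "SOURCE_PREFIX",
    "TARGET_PREFIX",
]

def load_config(env=None):
    """Read every required setting, failing fast if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if name not in env]
    if missing:
        raise RuntimeError("Missing environment variables: %s" % missing)
    return {name: env[name] for name in REQUIRED}
```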
            "Version": "2012-10-17",
            "Statement": [
                    "Sid": "VisualEditor0",
                    "Effect": "Allow",
                    "Action": "kms:Decrypt",
                    "Resource": "<your-kms-arn>"

Code for the Function:

import os
import boto3
import psycopg2
import sys
from base64 import b64decode
from datetime import datetime

s3  = boto3.client('s3')
kms = boto3.client('kms')

# Get values from Env
REDSHIFT_DATABASE = os.environ['REDSHIFT_DATABASE']
REDSHIFT_TABLE    = os.environ['REDSHIFT_TABLE']
REDSHIFT_USER     = os.environ['REDSHIFT_USER']
REDSHIFT_PASSWD   = os.environ['REDSHIFT_PASSWD']
REDSHIFT_PORT     = os.environ['REDSHIFT_PORT']
REDSHIFT_IAMROLE  = os.environ['REDSHIFT_IAMROLE']
REDSHIFT_ENDPOINT = os.environ['REDSHIFT_ENDPOINT']  # cluster endpoint (host); set this env variable as well
SOURCE_BUCKET     = os.environ['SOURCE_BUCKET']
SOURCE_PREFIX     = os.environ['SOURCE_PREFIX']
TARGET_PREFIX     = os.environ['TARGET_PREFIX']

# Decrypt the KMS-encrypted password
DE_PASS = kms.decrypt(CiphertextBlob=b64decode(REDSHIFT_PASSWD))['Plaintext'].decode('utf-8')

# Declare other parameters
TRIGGER_TIME = str(datetime.now())

# Define the Functions
# Function 1: Get all manifest files.
# This function is written by alexwlchan:
# list_objects_v2 won't return more than 1000 keys,
# so it'll paginate to the next 1000 and so on.

def get_matching_s3_objects(bucket, prefix="", suffix=""):
    paginator = s3.get_paginator("list_objects_v2")
    kwargs = {'Bucket': bucket}
    if isinstance(prefix, str):
        prefixes = (prefix, )
    else:
        prefixes = prefix

    for key_prefix in prefixes:
        kwargs["Prefix"] = key_prefix

        for page in paginator.paginate(**kwargs):
            try:
                contents = page["Contents"]
            except KeyError:
                # No objects under this prefix
                continue

            for obj in contents:
                key = obj["Key"]
                if key.endswith(suffix):
                    yield obj

def get_matching_s3_keys(bucket, prefix="", suffix=""):
    for obj in get_matching_s3_objects(bucket, prefix, suffix):
        yield obj["Key"]

# Function 2: Connection string for RedShift.
# It's using a custom compiled psycopg2.
def get_pg_con():
    return psycopg2.connect(dbname=REDSHIFT_DATABASE,
                            host=REDSHIFT_ENDPOINT,
                            port=REDSHIFT_PORT,
                            user=REDSHIFT_USER,
                            password=DE_PASS)

# Function 3: Main function
def run_handler(event, context):
    all_files = get_matching_s3_keys(SOURCE_BUCKET, SOURCE_PREFIX)
    for file in all_files:
        source_file = {'Bucket': SOURCE_BUCKET, 'Key': file}
        target_file = TARGET_PREFIX + str(file) + ".done"
        print(SOURCE_BUCKET)
        print(SOURCE_PREFIX)
        print(file)

        # Process starting here
        start_time = str(datetime.now())
        copy_command = "COPY " + REDSHIFT_TABLE + " FROM 's3://" + SOURCE_BUCKET + "/" + file + "' iam_role '" + REDSHIFT_IAMROLE + "' MANIFEST json 'auto' GZIP;"
        # print(copy_command)  # For debug
        conn = get_pg_con()
        cur = conn.cursor()
        cur.execute(copy_command)

        # Insert to History Table
        end_time = str(datetime.now())
        history_insert = "insert into error_copy_history (trigger_time, start_time, end_time, db_name, table_name, file) values ('" + TRIGGER_TIME + "', '" + start_time + "', '" + end_time + "', '" + REDSHIFT_DATABASE + "', '" + REDSHIFT_TABLE + "', 's3://" + SOURCE_BUCKET + "/" + file + "');"
        cur.execute(history_insert)

        # Commit and Close
        conn.commit()
        cur.close()
        conn.close()

        # Move the files from Errors directory to processed directory
        s3.copy(source_file, SOURCE_BUCKET, target_file)
        print("copied", file)
        s3.delete_object(Bucket=SOURCE_BUCKET, Key=file)
        print("deleted", file)

How to Deploy it?

As I mentioned above, you need to use the custom compiled psycopg2, which you can download from the link below.

I'm using Python 3.6 on Lambda. So download the repo and rename the psycopg2-3.6 directory to psycopg2. Then create a file named pgcode.py and paste the above Python code into it.

Now create a zip file containing the psycopg2 directory and pgcode.py, and upload it to Lambda. For the Lambda handler, use pgcode.run_handler.

That's it, your Lambda function is deployed, but it's not quite ready to execute yet.

Create the History Table:

To maintain this import process in a table, we need to create a table in RedShift.

    CREATE TABLE error_copy_history
      (
         pid          INT IDENTITY(1, 1),
         trigger_time DATETIME,
         start_time   DATETIME,
         end_time     DATETIME,
         db_name      VARCHAR(100),
         table_name   VARCHAR(100),
         file         VARCHAR(65000)
      );

Run the Function:

I'm giving my S3 path and Lambda environment variables here for your reference.

Once the execution is done, you can see the load history in the history table.

    bhuvi=# select * from error_copy_history limit 1;
    pid          | 260
    trigger_time | 2019-10-17 08:14:23.495213
    start_time   | 2019-10-17 08:14:24.59309
    end_time     | 2019-10-17 08:14:24.917248
    db_name      | bhuvi
    table_name   | s3cp
    file         | s3://bhuvi-datalake/errors/manifests/2019/09/26/13/collect-redshift-2019-09-26-13-21-49-56371982-2375-4b28-8d79-45f01952667e

Further Customization:

  1. I ran this on an ad-hoc basis, but if you want to run it automatically, use a CloudWatch scheduled trigger to run it daily or at any interval.
  2. I used KMS for encrypting the password; you can also use IAM temporary credentials.
  3. My S3 data is JSON and GZIP compressed. If you have a different file format or compression, modify the COPY command in the Python code.
Further Reading:

  1. Get an email notification when Kinesis fails to import the data into RedShift.
  2. List more than 1000 keys in S3 with list-objects-v2, including Prefix and Suffix.
  3. Psycopg2 - custom compiled for Python 2.7, 3.6, 3.7.