AWS Lambda & S3 | Automate CSV File Processing From S3 Bucket And Push To DynamoDB Using Lambda [Python]
In this blog we are going to pick up a CSV file from an S3 bucket as soon as it is created/uploaded, process the file, and push its contents to a DynamoDB table.
Create Role For Lambda
- Create the policy mentioned below.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "s3:PutObject",
                "s3:GetObject",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:UpdateTable",
                "logs:CreateLogDelivery",
                "logs:PutMetricFilter",
                "logs:CreateLogStream",
                "logs:GetLogRecord",
                "logs:DeleteLogGroup",
                "logs:GetLogEvents",
                "logs:FilterLogEvents",
                "logs:GetLogGroupFields",
                "logs:CreateLogGroup",
                "logs:DeleteLogStream",
                "logs:GetLogDelivery",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
```
- Now create a new role for Lambda and attach this policy to the role (a programmatic sketch of this step is shown below).
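If you prefer to script this step instead of using the console, here is a minimal sketch using boto3's IAM client. It assumes the policy document above is saved locally as lambda_policy.json; the role and policy names are hypothetical, so use whatever naming fits your setup.

```python
import json
import boto3

iam = boto3.client('iam')

# Trust policy that lets the Lambda service assume this role
assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

# Create the role (role name is hypothetical)
iam.create_role(
    RoleName='csv-s3-lambda-role',
    AssumeRolePolicyDocument=json.dumps(assume_role_policy)
)

# Attach the policy from the previous step as an inline policy
# (assumes the JSON above was saved as lambda_policy.json)
with open('lambda_policy.json') as f:
    policy_document = f.read()

iam.put_role_policy(
    RoleName='csv-s3-lambda-role',
    PolicyName='csv-s3-lambda-policy',
    PolicyDocument=policy_document
)
```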
Create S3 Bucket And Attach Tags
In this step we create a new S3 bucket. To create a bucket, you must be registered with Amazon S3 and have a valid AWS Access Key ID to authenticate requests; anonymous requests are never allowed to create buckets. By creating the bucket, you become the bucket owner.
- Let's import the boto3 module.
import boto3
- We will invoke the client for S3
client = boto3.client('s3')
- Now we will use input() to take the name of the bucket to be created as user input and store it in the variable "bucket_name".
Note:- Make sure to check the bucket naming rules here.

```python
bucket_name = str(input('Please input bucket name to be created: '))
```
- Go to the link where you will find the full list of arguments for create_bucket(). Based on your requirement you can pass these arguments to the call; the documentation also mentions the data type of each parameter.
Note:- The Bucket name argument is mandatory and the bucket name must be globally unique.

```python
response1 = client.create_bucket(
    ACL='public-read-write',
    Bucket=bucket_name
)
```
- Now we will use input() to confirm whether the user wants to go ahead with bucket tagging and store the response in the variable "tag_resp".
tag_resp=str(input('Press "y" if you want to tag your bucket?: '))
- Now we will use an if condition and take user input for the tags that need to be defined for the bucket.
We will store the tag key in the variable "tag_key" and the tag value in "tag_value". To add a tag to the bucket we are going to use the put_bucket_tagging() method; make sure to check the official documentation here. In the method parameters we are passing the variables "bucket_name", "tag_key" and "tag_value".
To view the entire code, please check the GitHub repo here.

```python
if tag_resp == 'y':
    tag_key = str(input("Please enter key for the tag: "))
    tag_value = str(input("Please enter value for the tag: "))
    response2 = client.put_bucket_tagging(
        Bucket=bucket_name,
        Tagging={
            'TagSet': [
                {
                    'Key': tag_key,
                    'Value': tag_value
                }
            ]
        }
    )
```
Create DynamoDB Table
- Python code in one module gains access to the code in another module by the process of importing it. The import statement combines two operations: it searches for the named module, then it binds the result of that search to a name in the local scope.
import boto3
- We will invoke the resource for DynamoDB.
dynamodb = boto3.resource('dynamodb')
- We will use the create_table() function to create a table in DynamoDB with the arguments listed below. Here we will see two examples: one with a "primary key only" and another with a "primary key and sort key". You can find the official documentation here.
Example 1:- The code below creates a table with a primary key (partition key) only.
You can find the working code for this example in the Git repo here.

```python
table = dynamodb.create_table(
    TableName='user',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'  # Partition key only
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    },
)
```
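Example 2:- This is the "primary key and sort key" variant mentioned above. The snippet below is a minimal sketch under assumed names: the table name and the sort key attribute ('created_at') are hypothetical, so substitute your own.

```python
table2 = dynamodb.create_table(
    TableName='user_activity',              # hypothetical table name
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'               # Partition key
        },
        {
            'AttributeName': 'created_at',  # hypothetical sort key attribute
            'KeyType': 'RANGE'              # Sort key
        }
    ],
    AttributeDefinitions=[
        {'AttributeName': 'id', 'AttributeType': 'S'},
        {'AttributeName': 'created_at', 'AttributeType': 'S'}
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    },
)
```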
Lambda Function To Read CSV File From S3 Bucket And Push Into DynamoDB Table
- Go to the Lambda console and click on Create function.
- Select "Author From Scratch" , Function name = csv_s3_Lambda, Runtime= Python and role we created with above policy attached to this blog and click on create function.
- Go to the code editor and start writing the code.
- We will import the required modules.
import boto3
- We will invoke the client for S3 and the resource for DynamoDB, and grab a reference to the "user" table we created earlier so we can write items to it later.

```python
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('user')  # table created in the DynamoDB section above
```
- First we will fetch the bucket name from the event JSON object.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
```
- Now we will fetch the name of the file uploaded to the S3 bucket from the event JSON object.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
```
- We will now call the get_object() function to retrieve the object from Amazon S3. To use GET, you must have READ access to the object. If you grant READ access to the anonymous user, you can return the object without using an authorization header. You can view this function's official documentation here.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket, Key=csv_file_name)
```
- Let's read the object body returned by the function and decode the bytes into a string.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket, Key=csv_file_name)
    file_reader = csv_object['Body'].read().decode("utf-8")
```
- Use split(), which splits a string into a list of items; here we split on newlines so each CSV row becomes a list item. Make sure to check the official documentation here.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket, Key=csv_file_name)
    file_reader = csv_object['Body'].read().decode("utf-8")
    users = file_reader.split("\n")
- We will use the filter() function, which filters the given sequence with the help of a function that tests whether each element is true or not; passing None drops the empty strings left over from trailing newlines. Make sure to check the official documentation here.
```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket, Key=csv_file_name)
    file_reader = csv_object['Body'].read().decode("utf-8")
    users = file_reader.split("\n")
    users = list(filter(None, users))
```
- Now we will traverse the list, pick elements one by one, and push them to the DynamoDB table using table.put_item(). You can find the official documentation here.
To view the entire code, please check the GitHub repo here.

```python
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket, Key=csv_file_name)
    file_reader = csv_object['Body'].read().decode("utf-8")
    users = file_reader.split("\n")
    users = list(filter(None, users))
    for user in users:
        user_data = user.split(",")
        table.put_item(Item={
            "id": user_data[0],
            "name": user_data[1],
            "salary": user_data[2]
        })
    return 'success'
```
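If you want to test the handler from the Lambda console (or locally) before wiring up the trigger, you can feed it a trimmed-down event like the sketch below. Only the fields the handler actually reads are included, and the bucket and file names are hypothetical placeholders.

```python
# Minimal test event: only the fields the handler reads are present.
# Bucket and key names below are hypothetical; use an object that exists.
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-csv-bucket"},
                "object": {"key": "users.csv"}
            }
        }
    ]
}

print(lambda_handler(test_event, None))  # expect 'success' if the object exists
```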
Set Event For S3 Bucket
- Open the Lambda function and click on Add trigger.
- Select S3 as the trigger source, select the bucket we created above, set the event type to "PUT", add the suffix ".csv", and click on Add.
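The console steps above are all you need, but if you prefer to configure the trigger programmatically, here is a rough boto3 sketch. The bucket name is a placeholder, and the statement ID is hypothetical; the function name matches the one created earlier.

```python
import boto3

lambda_client = boto3.client('lambda')
s3_client = boto3.client('s3')

bucket_name = 'my-csv-bucket'    # placeholder: use the bucket created above
function_name = 'csv_s3_Lambda'
function_arn = lambda_client.get_function(
    FunctionName=function_name)['Configuration']['FunctionArn']

# Allow S3 to invoke the Lambda function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='s3-invoke-csv-lambda',   # hypothetical statement ID
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn=f'arn:aws:s3:::{bucket_name}'
)

# Fire the function on PUT events for objects ending in .csv
s3_client.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'suffix', 'Value': '.csv'}
            ]}}
        }]
    }
)
```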
Create CSV File And Upload It To S3 Bucket
- Create a .csv file with the data below.
```
1,ABC,200
2,DEF,300
3,XYZ,400
```
- Now upload this file to the S3 bucket; the Lambda function will process the data and push it to DynamoDB (a programmatic upload sketch is shown below).
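You can upload the file through the S3 console, or with a couple of lines of boto3. In this sketch the local file name, bucket name, and object key are placeholders; note that the object key must end in ".csv" to match the trigger suffix.

```python
import boto3

s3_client = boto3.client('s3')

# Upload the local CSV file; the .csv suffix matches the trigger filter
s3_client.upload_file('users.csv', 'my-csv-bucket', 'users.csv')
```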
Resource Cleanup
- Delete Lambda Function
- Delete DynamoDB Table
- Delete S3 Bucket Object First And Then Bucket
- Delete Lambda Role
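If you would rather clean up programmatically, here is a rough boto3 sketch. The function and table names match the ones used in this post; the bucket name is a placeholder, and the role and policy names come from the role sketch earlier, so adjust them if you created yours in the console with different names.

```python
import boto3

lambda_client = boto3.client('lambda')
dynamodb = boto3.resource('dynamodb')
s3 = boto3.resource('s3')
iam = boto3.client('iam')

# Delete the Lambda function
lambda_client.delete_function(FunctionName='csv_s3_Lambda')

# Delete the DynamoDB table
dynamodb.Table('user').delete()

# Empty the S3 bucket first, then delete it (bucket name is a placeholder)
bucket = s3.Bucket('my-csv-bucket')
bucket.objects.all().delete()
bucket.delete()

# Remove the inline policy, then delete the role (names from the role sketch above)
iam.delete_role_policy(RoleName='csv-s3-lambda-role',
                       PolicyName='csv-s3-lambda-policy')
iam.delete_role(RoleName='csv-s3-lambda-role')
```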
Conclusion
In this blog we picked up a CSV file from an S3 bucket as soon as it was uploaded, processed the file, and pushed its contents to a DynamoDB table.
Stay tuned for my next blog.....
So, did you find my content helpful? If you did, or if you like my other content, feel free to buy me a coffee. Thanks.