HOWTO: Lambda with S3 trigger using the AWS SDK and Python

In this tutorial, you will use the AWS SDK for Python (Boto3) to create a Lambda function and configure a trigger for an Amazon Simple Storage Service (Amazon S3) bucket. Every time you add an object to your Amazon S3 bucket, your Lambda function runs and outputs the object's content type to Amazon CloudWatch Logs.

You can use a Lambda function with an Amazon S3 trigger to perform many types of file processing tasks. For example, you can use a Lambda function to create a thumbnail whenever an image file is uploaded to your Amazon S3 bucket, or to convert uploaded documents into different formats. 

AWS Services Used:
  • Amazon S3
  • AWS IAM
  • AWS Lambda
  • Amazon CloudWatch Logs

To complete this tutorial, you perform the following steps:

  1. Review IAM user permissions
  2. Upload an object to your S3 bucket
  3. Create an IAM permissions policy
  4. Create an IAM execution role for your Lambda function
  5. Create a Lambda function that returns the content type of objects in an Amazon S3 bucket
  6. Deploy the Lambda function
  7. Configure a Lambda trigger that invokes your function when objects are uploaded to your bucket
  8. Test your function using the trigger
  9. Clean up resources
Prerequisites

Follow the instructions in the previous tutorial to set up your IAM users and create a bucket. After you have created the bucket, follow the steps below:

Assign permissions to your IAM user

Your IAM user requires the following policies:

  • AmazonS3FullAccess – Grants permissions to all Amazon S3 actions, including permissions to create and use an Object Lambda Access Point. 
  • AWSLambda_FullAccess – Grants permissions to all Lambda actions. 
  • IAMFullAccess – Grants permissions to all IAM actions. 
  • IAMAccessAnalyzerReadOnlyAccess – Grants permissions to read all access information provided by IAM Access Analyzer. 
  • CloudWatchLogsFullAccess – Grants full access to CloudWatch Logs.

In the previous tutorial, you granted your IAM user full administrative access to all AWS services, so strictly speaking you don't need the policies above for this tutorial. However, scoping permissions to only what is required is good security practice and should replace full administrative access in production workloads; a sketch of attaching these scoped policies follows.
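
If you choose to attach the scoped policies instead, a minimal sketch using boto3 might look like the following. The user name tutorial-user is a placeholder for whichever IAM user you created in the previous tutorial.

import boto3

iam = boto3.client('iam')

#the AWS managed policies listed above; tutorial-user is a placeholder
for policy_name in [
        'AmazonS3FullAccess',
        'AWSLambda_FullAccess',
        'IAMFullAccess',
        'IAMAccessAnalyzerReadOnlyAccess',
        'CloudWatchLogsFullAccess']:
    iam.attach_user_policy(
        UserName='tutorial-user',
        PolicyArn='arn:aws:iam::aws:policy/' + policy_name
    )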

Upload an image file to your bucket ‘vibstest’

For this tutorial to work, uncheck Block Public Access for your S3 bucket under the Permissions tab in the S3 console.
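
If you prefer to change this setting programmatically instead of through the console, a minimal sketch using the S3 PutPublicAccessBlock API might look like this (all four settings are turned off for the tutorial bucket 'vibstest'):

import boto3

s3 = boto3.client('s3')

#turn off all four Block Public Access settings for the bucket
s3.put_public_access_block(
    Bucket='vibstest',
    PublicAccessBlockConfiguration={
        'BlockPublicAcls': False,
        'IgnorePublicAcls': False,
        'BlockPublicPolicy': False,
        'RestrictPublicBuckets': False,
    }
)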

import boto3
from botocore.exceptions import ClientError
import logging
import os

def upload_file(file_name, bucket, object_name=None):

    print('***************upload_file *****************')
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = os.path.basename(file_name)

    # Upload the file
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#call the function to upload a file to the bucket
print(upload_file('vacation.jpg', 'vibstest'))
Create IAM permissions policy

Before you can create an execution role for your Lambda function, you first create a permissions policy to give your function permission to access the required AWS resources. For this tutorial, the policy allows Lambda to get objects from an Amazon S3 bucket and to write logs to Amazon CloudWatch Logs.

import boto3
from botocore.exceptions import ClientError
import logging
import json

def create_iam_policy(policy_name):

    print('***************create_iam_policy *****************')
    my_iam_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:PutLogEvents",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream"
                ],
                "Resource": "arn:aws:logs:*:*:*"
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": "arn:aws:s3:::*/*"
            }
        ]
    }

    # Convert the policy from a JSON dict to a string
    my_iam_policy = json.dumps(my_iam_policy)
    print(my_iam_policy)

    iam = boto3.client('iam')

    # Create the new policy
    try:
        response = iam.create_policy(PolicyName=policy_name, PolicyDocument=my_iam_policy)
        print(response)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#call the function
print(create_iam_policy('s3-trigger-vibs'))
Create IAM execution role

An execution role is an AWS Identity and Access Management (IAM) role that grants a Lambda function permission to access AWS services and resources. To enable your function to get objects from an Amazon S3 bucket, you attach the permissions policy you created in the previous step.

import boto3
from botocore.exceptions import ClientError
import logging
import json

def create_lambda_execution_role(rolename):
    iam = boto3.client('iam')

    print('***************create_lambda_execution_role *****************')
    # Trust policy that lets the Lambda service assume this role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sts:AssumeRole"],
                "Principal": {"Service": "lambda.amazonaws.com"}
            }
        ]
    }

    # Convert the policy from a JSON dict to a string
    trust_policy = json.dumps(trust_policy)

    try:
        #create the role
        response = iam.create_role(RoleName=rolename,
                                   AssumeRolePolicyDocument=trust_policy)
        print(response)

        #attach the IAM policy created earlier to this newly created role.
        #Replace 111111222222 with your AWS account ID.
        response = iam.attach_role_policy(
            PolicyArn='arn:aws:iam::111111222222:policy/s3-trigger-vibs',
            RoleName=rolename,
        )
        print(response)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#call the function
print(create_lambda_execution_role('lambda-s3-trigger-role'))
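
Note that IAM changes are eventually consistent: creating a Lambda function immediately after creating its execution role can fail with an error saying the role cannot be assumed. A short pause (or a retry loop) before the next step is a simple workaround:

import time

#IAM role propagation can take a few seconds; pausing briefly before
#calling create_function avoids "role cannot be assumed" errors
time.sleep(10)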
Create the function code

Your Lambda function will retrieve the key name of the uploaded object and the name of the bucket from the event parameter it receives from Amazon S3.

import urllib.parse
import json
import boto3

print('Loading function')

s3 = boto3.client('s3')

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

Save the above in a new Python file and name it test.py.
Zip the file and upload it to S3:

zip function.zip test.py  # I named the zip file function.zip
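
If you prefer to stay in Python rather than shelling out, a minimal sketch using the standard-library zipfile module does the same thing:

import zipfile

#package test.py into function.zip for the Lambda deployment
with zipfile.ZipFile('function.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    zf.write('test.py')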

Upload the function.zip file to your bucket

#call the function
upload_file('function.zip', 'vibstest')
Deploy the Lambda function

import boto3
from botocore.exceptions import ClientError
import logging

def create_lambda_function(functionName):

    lam = boto3.client('lambda')
    print('***************create_lambda_function *****************')

    try:
        response = lam.create_function(
            FunctionName=functionName,
            Runtime='python3.11',
            Handler='test.lambda_handler',  #test is the name of the file that contains the lambda_handler function
            Code={'S3Bucket': 'vibstest', 'S3Key': 'function.zip'},
            Role='arn:aws:iam::111111222222:role/lambda-s3-trigger-role'  #replace 111111222222 with your AWS account ID
        )
        print(response)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#call the function
print(create_lambda_function('vibsfunction'))
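
Before wiring up the trigger, you can optionally wait for the new function to finish creating. A minimal sketch using a boto3 waiter (function_active_v2 is available in recent boto3 versions):

import boto3

lam = boto3.client('lambda')

#block until the function's state becomes Active
lam.get_waiter('function_active_v2').wait(FunctionName='vibsfunction')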
Create the Amazon S3 trigger

Now that you have deployed your function code above, you create the Amazon S3 trigger that will invoke your function.

import boto3
from botocore.exceptions import ClientError
import logging


def create_trigger_s3(bucketName):
    s3 = boto3.client('s3')
    lam = boto3.client('lambda')

    print('***************create_trigger_s3 *****************')
    try:
        #first grant Amazon S3 permission to invoke the Lambda function.
        #Replace 111111222222 with your AWS account ID.
        response = lam.add_permission(
            FunctionName='vibsfunction',
            StatementId='1',
            Action='lambda:InvokeFunction',
            Principal='s3.amazonaws.com',
            SourceArn='arn:aws:s3:::vibstest',
            SourceAccount='111111222222'
        )
        print(response)

        #then create the S3 event notification for the Lambda function.
        #Replace the Region and account ID in the function ARN as needed.
        response = s3.put_bucket_notification_configuration(
            Bucket=bucketName,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [{
                    'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:111111222222:function:vibsfunction',
                    'Events': ['s3:ObjectCreated:*']
                }]
            }
        )
        print(response)
    except ClientError as e:
        logging.error(e)
        return False
    return True

#call the function
print(create_trigger_s3('vibstest'))
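
To confirm the trigger was attached, you can optionally read the notification configuration back from the bucket; a minimal sketch:

import boto3

s3 = boto3.client('s3')

#print the bucket's Lambda notification configuration
response = s3.get_bucket_notification_configuration(Bucket='vibstest')
print(response.get('LambdaFunctionConfigurations'))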
Test the Lambda function with the Amazon S3 trigger

import boto3

def check_logs():

    print('***************check_logs function*****************')
    client = boto3.client('logs')

    #list the function's log streams, ordered by last event time
    response = client.describe_log_streams(
        logGroupName='/aws/lambda/vibsfunction',
        orderBy='LastEventTime'
        )

    for log_stream in response['logStreams']:
        latestlogStreamName = log_stream['logStreamName']
        print('logstream=========', latestlogStreamName)

        log_events = client.get_log_events(
            logGroupName='/aws/lambda/vibsfunction',
            logStreamName=latestlogStreamName
        )

        #print each log message in the stream
        for event in log_events['events']:
            print(event['message'])

def test_lambda():

    print('***************test_lambda *****************')
    #upload a file to the bucket (upload_file is the helper defined earlier)
    response = upload_file('vacation.jpg', 'vibstest')
    print(response)

    #check the logs to confirm the function ran and to see the content type of the uploaded image.
    #Logs can take a few seconds to appear; re-run check_logs() if the output is empty.
    check_logs()

#call the function
test_lambda()

Verify correct operation using CloudWatch Logs

Running the test_lambda() function should produce output similar to the following. Note that the 'Loading function' and CONTENT TYPE lines show that the Lambda function executed successfully.

logstream========= 2023/12/27/[$LATEST]8800b75ff2594696aad710da757a5f62
INIT_START Runtime Version: python:3.11.v25     Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:a3304f2b48f740276b97ad9c52a9cc36a0bd9b44fecf74d0f1416aafb74e92fc

Loading function

START RequestId: 8549ec7a-f994-4cd7-9be6-ba7968edd8fc Version: $LATEST

CONTENT TYPE: binary/octet-stream

END RequestId: 8549ec7a-f994-4cd7-9be6-ba7968edd8fc

REPORT RequestId: 8549ec7a-f994-4cd7-9be6-ba7968edd8fc  Duration: 289.86 ms     Billed Duration: 290 ms Memory Size: 128 MB     Max Memory Used: 83 MB  Init Duration: 483.76 ms   
Clean up your resources

1. Delete the Lambda function

#imports used by the clean-up functions below
import boto3
from botocore.exceptions import ClientError
import logging

def delete_lambda_function(functionName):

    print('***************delete_lambda_function function*****************')
    lam = boto3.client('lambda')

    try:
        response = lam.delete_function(
            FunctionName=functionName
        )
        print(response)
        print('function deleted')
    except ClientError as e:
        logging.error(e)
        return False
    return True

2. Delete the IAM policy

def delete_iam_policy(rolename, policyarn):

    print('***************delete_iam_policy *****************')
    iam = boto3.client('iam')

    #first have to detach the policy
    response = iam.detach_role_policy(
                RoleName=rolename,
                PolicyArn=policyarn
            )
    print(response)

    #delete the policy
    response = iam.delete_policy(
                PolicyArn=policyarn
            )
    print(response)

3. Delete the Lambda execution role

def delete_lambda_execution_role(rolename):

    print('***************delete_lambda_execution_role *****************')
    iam = boto3.client('iam')
    response = iam.delete_role(RoleName=rolename)
    print(response)

4. Delete the S3 bucket and all of its contents

def delete_bucket(bucket_name):

    print('***************delete_bucket *****************')
    s3 = boto3.client('s3')

    #list the contents of the bucket (list_objects_v2 returns up to 1,000 objects per call)
    response = s3.list_objects_v2(Bucket=bucket_name)

    #delete each object; the bucket must be empty before it can be deleted
    for obj in response.get('Contents', []):
        print('Deleting', obj['Key'])
        s3.delete_object(Bucket=bucket_name, Key=obj['Key'])

    try:
        s3.delete_bucket(Bucket=bucket_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True

5. Run the above functions

Replace the variable values as needed.

def clean_up():

    print('***************clean_up function*****************')
    
    bucketname='vibstest'
    lambdafunctionname='vibsfunction'
    policyarn='arn:aws:iam::111111222222:policy/s3-trigger-vibs'  #replace 111111222222 with your AWS account ID
    rolename='lambda-s3-trigger-role'

    #delete lambda function 'vibsfunction'
    print(delete_lambda_function(lambdafunctionname))

    #detach and delete iam policy 's3-trigger-vibs'
    delete_iam_policy(rolename,policyarn)

    #delete lambda execution role 'lambda-s3-trigger-role'
    delete_lambda_execution_role(rolename)
    
    #delete bucket and its contents 'vibstest'
    delete_bucket(bucketname)

#call the function
clean_up()

Note: You may also delete the test user. 
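
If you want to remove the test user as well, a minimal sketch might look like the following (it assumes the user has no access keys, inline policies, or console login profile left; tutorial-user is a placeholder for your test user's name):

import boto3

iam = boto3.client('iam')
username = 'tutorial-user'  #placeholder: the test user you created earlier

#detach any managed policies before the user can be deleted
attached = iam.list_attached_user_policies(UserName=username)
for policy in attached['AttachedPolicies']:
    iam.detach_user_policy(UserName=username, PolicyArn=policy['PolicyArn'])

#delete the user
iam.delete_user(UserName=username)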
