Use AWS Lambda to Forward Event Notifications
    • Dark
      Light

    Use AWS Lambda to Forward Event Notifications

    • Dark
      Light

    Article Summary

    If you use Amazon S3, you may have integrations that run in AWS that rely on event notifications through Amazon SNS topics or Amazon SQS queues. This tutorial shows you how to deploy a serverless function in AWS Lambda to forward incoming event notification messages from Backblaze B2 Cloud Storage to an Amazon SNS topic or an Amazon SQS queue (depending on which of those options you choose) using the same message structure as Amazon S3. This approach may allow you to migrate data from Amazon S3 to Backblaze B2 while making few or no changes to your integrations in AWS.

    Note
    The code in this tutorial assumes that all AWS resources are located in the same region. As you create each resource, verify that you are in the same AWS region (this is shown in the upper-right corner of the AWS console).
    Note
    This feature is currently in private preview. For information about enabling this feature, see Automate Your Data Workflows With Backblaze B2 Event Notifications.

    Create an Amazon SNS Topic

    1. Sign in to the AWS console, and go to the Amazon SNS Topics page.
    2. Create an SNS topic.
      1. Click Create topic.
      2. For purposes of this guide, select Standard as the type since it allows a wider range of subscription protocols.
      3. Enter a topic name.
      4. Optionally, enter a display name.
      5. Scroll to the bottom of the page, and click Create topic.
      6. Copy the topic's ARN value, and save it for use in another step.
    3. Create a subscription.
      1. Click Create subscription.
      2. For purposes of this guide, select Email as the protocol.
      3. Enter your email address as the endpoint.
      4. Click Create subscription.
    4. Check your email inbox for the SNS subscription confirmation email.
    5. Click Confirm subscription in the email.

    An email is sent to you whenever a message is sent to your SNS topic.

    Create an Amazon SQS Queue

    1. Sign in to the AWS console, and go to the Amazon SQS Queue page.
    2. Create a queue.
      1. Click Create queue.
      2. For purposes of this guide, select Standard as the type.
      3. Enter a queue name.
      4. Scroll to the bottom of the page, and click Create queue.
      5. Copy both of the queue's ARN and URL values, and save them for use in another step.

    Create an AWS Lambda Function

    1. Sign in to the AWS console, and go to https://console.aws.amazon.com/lambda/home.
    2. Create a function.
      1. Click Create Function.
      2. Leave Author from scratch as the default value.
      3. Enter a function name.
      4. Select Python 3.12 as the Runtime.
      5. If necessary, scroll to and select Advanced Settings.
      6. Select Enable function URL.
      7. Select None as the auth type.
        You will implement your own authorization logic in your function.
      8. Click Create function.
        The default “Hello World” example returns an HTTP 200 status code with a JSON body, so it suffices for testing the event notification rule.
    3. In the Function URL field, click (copy) and save the value.

    Edit and Test your Event Notification Rule

    1. Sign in to your Backblaze account.
    2. In the left navigation menu under B2 Cloud Storage, click Buckets.
    3. Click Edit Event Notification.
    4. Replace the Target URL with your Lambda function URL that you copied in a previous step.
    5. Click Test Rule.
      A success message is displayed if there are no errors.
    6. In the Custom HTTP Header section, enter X-B2-Region as the Key.
    7. In the Value field, enter the region from your bucket’s endpoint. For example, in the endpoint “s3.us-west-004.backblazeb2.com,” the region is “us-west-004.”
    8. Click Save Rule, and close the Event Notifications dialog.

    Assign a New Permission to the AWS Lambda Function

    Now you need to give your Lambda function permission to publish to your SNS topic or SQS queue.

    1. In your AWS Lambda Console, select the Configuration tab.
    2. Click Permissions in the left menu.
    3. Click the link under Role name to open the Lambda function’s execution role page.
    4. Copy the execution role’s ARN, and save it for use in a later step.
    5. Scroll to and select the Add Permissions menu, click Create inline policy, and click JSON.
    6. If you created an SNS topic, replace the policy editor template with the following JSON, replacing the Resource value with the SNS topic ARN that you copied earlier.
      {
      	"Version": "2012-10-17",
      	"Statement": [
      		{
      			"Sid": "PublishSNSMessage",
      			"Effect": "Allow",
      			"Action": "sns:Publish",
      			"Resource": "<sns-topic-arn>"
      		}
      	]
      }
    7. If you created an SQS queue, replace the policy editor template with the following JSON, replacing the Resource value with the SQS queue ARN that you copied earlier.
      {
      	"Version": "2012-10-17",
      	"Statement": [
      		{
      			"Sid": "SendSQSMessage",
      			"Effect": "Allow",
      			"Action": "sqs:SendMessage",
      			"Resource": "<sqs-queue-arn>"
      		}
      	]
      }
    8. Click Next.
    9. Enter a policy name.
    10. Click Create policy.

    Add the Signing Secret to AWS Secrets Manager

    You should securely store credentials such as the event notification rule’s signing secret rather than simply pasting the value into source code. Create a secret in AWS Secrets Manager for this purpose.

    1. Sign in to the AWS console, and go to https://console.aws.amazon.com/secretsmanager/listsecrets.
    2. Click Store a new secret.
    3. Select Other type of secret as the secret type.
    4. Enter SigningSecret as the key, and enter the value of the secret that you copied from the Event Notifications dialog earlier.
    5. Click Next.
    6. Enter b2-object-creation-rule-signing-secret as the secret name, and click Edit permissions.
    7. Paste the following JSON into Resource permissions in place of the template policy, setting the AWS value to the AWS Lambda function execution role ARN that you saved earlier:
      {
        "Version" : "2012-10-17",
        "Statement" : [ {
          "Effect" : "Allow",
          "Principal" : {
            "AWS" : "<lambda-function-execution-role-arn>"
          },
          "Action" : "secretsmanager:GetSecretValue",
          "Resource" : "*"
        } ]
      }
    8. Click Save, and click Next.
    9. Click Next on the Configure rotation page.
    10. Click Store on the Review page.

    Add Code to the AWS Lambda Function to Publish an SNS Notification

    1. In the AWS console, navigate to your AWS Lambda function and select the Code tab.
    2. Delete the existing code, and paste the following Python code, replacing the value of topic_arn with your SNS topic ARN. The code populates the S3-style notification message with values from the B2 notification message, using the value of the X-B2-Region custom HTTP header for awsRegion.
      Click here for Python code.
      import boto3
      import datetime
      import hashlib
      import hmac
      import json
      
      from botocore.exceptions import ClientError
      
      # Set these values according to your environment and configuration
      topic_arn = '<sns-topic-arn>'
      
      secret_name = "b2-object-creation-rule-signing-secret"
      
      # SDK clients for SNS and Secrets Manager
      sns_client = boto3.client('sns')
      secrets_client = boto3.client('secretsmanager')
      
      
      class SignatureError(Exception):
          """Raised when there's an error validating the message signature."""
      
      
      def get_signing_secret():
          """Retrieve the signing secret from AWS Secrets Manager."""
          secret_value_response = secrets_client.get_secret_value(SecretId=secret_name)
      
          # get_secret_value() returns JSON string, so we need to parse it
          secret_value = json.loads(secret_value_response['SecretString'])
          return secret_value['SigningSecret']
      
      
      def validate_signed_message(event):
          """Validate the signature on the event notification message.
      
          Verify that the x-bz-event-notification-signature header is present, 
          well formatted, has the correct version, and matches the HMAC-SHA256
          digest generated from the signing secret and message body.
          """
          if 'x-bz-event-notification-signature' not in event['headers']:
              raise SignatureError('Missing signature header')
      
          signature = event['headers']['x-bz-event-notification-signature']
          pair = signature.split('=')
          if len(pair) != 2:
              raise SignatureError('Invalid signature format')
      
          version = pair[0]
          if version != 'v1':
              raise SignatureError('Invalid signature version')
      
          try:
              signing_secret = get_signing_secret()
          except ClientError as err:
              raise SignatureError('Can\'t get signing secret')
      
          received_sig = pair[1]
          calculated_sig = hmac.new(
               bytes(signing_secret, 'utf-8'),
               msg=bytes(event['body'], 'utf-8'),
               digestmod=hashlib.sha256
          ).hexdigest().lower()
      
          if received_sig != calculated_sig:
              raise SignatureError('Invalid signature')
      
          return None
      
      
      def create_response(status, message, err=None):
          """Print the message and return a formatted response"""
          body = f'{message}: {str(err)}' if err else message
          print(body)
          return {
              'statusCode': status,
              'headers': {
                  'Content-Type': 'text/plain',
              },
              'body': body,
              'isBase64Encoded': False,
          }
      
      
      def lambda_handler(event, context):
          """Entry point for the AWS Lambda function"""
          try:
              validate_signed_message(event)
          except SignatureError as err:
              return create_response(401, 'Signature error', err)
      
          print(f'Incoming message: \n{event['body']}')
      
          # Event body is a JSON-formatted string, so we need to parse it
          try:
              body = json.loads(event['body'])
          except json.JSONDecodeError as err:
              return create_response(400, 'Error parsing JSON event body', err)
      
          # Right now, event notification messages contain just a single event
          b2_event = body['events'][0];
      
          # Incoming event timestamp is in epoch milliseconds
          # Outgoing message needs ISO 8601
          timestamp = b2_event['eventTimestamp'] / 1000.0
          eventTime = ( 
             datetime.datetime.fromtimestamp(timestamp)
             .strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
          )
      
          # B2 region is in a custom HTTP header
          # Note that AWS changes all HTTP header keys to lowercase
          b2_region = event['headers']['x-b2-region']
      
          # This message is based on the example S3 event notification at
          # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
          message = f'''{{  
          "Records":[  
              {{  
                  "eventVersion":"2.1",
                  "eventSource":"aws:s3",
                  "awsRegion":"{b2_region}",
                  "eventTime":"{eventTime}",
                  "eventName":"ObjectCreated:Put",
                  "userIdentity":{{  
                      "principalId":"{b2_event['accountId']}"
                  }},
                  "requestParameters":{{  
                      "sourceIPAddress":"{event['requestContext']['http']['sourceIp']}"
                  }},
                  "responseElements":{{  
                      "x-amz-request-id":"{event['requestContext']['requestId']}",
                      "x-amz-id-2":"dummy/value/abc123"
                  }},
                  "s3":{{  
                      "s3SchemaVersion":"1.0",
                      "configurationId":"{b2_event['matchedRuleName']}",
                      "bucket":{{  
                          "name":"{b2_event['bucketName']}",
                          "ownerIdentity":{{  
                              "principalId":"{b2_event['accountId']}"
                          }},
                          "arn":"arn:aws:s3:::{b2_event['bucketName']}"
                      }},
                      "object":{{  
                          "key":"{b2_event['objectName']}",
                          "size":{b2_event['objectSize']},
                          "eTag":"dummy_value",
                          "versionId":"{b2_event['objectVersionId']}",
                          "sequencer":"dummy_value"
                      }}
                  }}
              }}
          ]
      }}'''
      
          print(f'Outgoing message: \n{message}')
      
          try:
              response = sns_client.publish(
                  TopicArn=topic_arn,
                  Message=message)
          except ClientError as err:
              return create_response(500, 'Error publishing message', err)
      
          return create_response(200, 'Success')
    3. Click Deploy.
    4. Upload another file to your Backblaze B2 bucket.
      Within a few seconds to a few minutes, depending on your email provider, you should receive an email containing a JSON event notification that is structured similarly to an S3 notification.
    5. If you do not receive an email, in your AWS Lambda function’s console page, select the Monitor tab and click View CloudWatch logs.
    6. Examine the log streams to gather insight into why your AWS Lambda function is failing.

    Add Code to the AWS Lambda Function to Send a Message to an SQS Queue

    1. In the AWS console, navigate to your AWS Lambda function and select the Code tab.
    2. Paste the following Python code in place of the example replacing the value of queue_url with your SQS queue URL. The code populates the S3-style notification message with values from the B2 notification message, using the value of the X-B2-Region custom HTTP header for awsRegion.
      Click here for Python code.
      import boto3
      import datetime
      import hashlib
      import hmac
      import json
      
      from botocore.exceptions import ClientError
      
      # Set these values according to your environment and configuration
      queue_url = '<sqs-queue-url>'
      secret_name = 'b2-object-creation-rule-signing-secret'
      
      # SDK clients for SQS and Secrets Manager
      sqs_client = boto3.client('sqs')
      secrets_client = boto3.client('secretsmanager')
      
      
      class SignatureError(Exception):
          """Raised when there's an error validating the message signature."""
      
      
      def get_signing_secret():
          """Retrieve the signing secret from AWS Secrets Manager."""
          secret_value_response = secrets_client.get_secret_value(SecretId=secret_name)
      
          # get_secret_value() returns JSON string, so we need to parse it
          secret_value = json.loads(secret_value_response['SecretString'])
          return secret_value['SigningSecret']
      
      
      def validate_signed_message(event):
          """Validate the signature on the event notification message.
      
          Verify that the x-bz-event-notification-signature header is present, 
          well formatted, has the correct version, and matches the HMAC-SHA256
          digest generated from the signing secret and message body.
          """
          if 'x-bz-event-notification-signature' not in event['headers']:
              raise SignatureError('Missing signature header')
      
          signature = event['headers']['x-bz-event-notification-signature']
          pair = signature.split('=')
          if len(pair) != 2:
              raise SignatureError('Invalid signature format')
      
          version = pair[0]
          if version != 'v1':
              raise SignatureError('Invalid signature version')
      
          try:
              signing_secret = get_signing_secret()
          except ClientError as err:
              raise SignatureError('Can\'t get signing secret') from err
      
          received_sig = pair[1]
          calculated_sig = hmac.new(
               bytes(signing_secret, 'utf-8'),
               msg=bytes(event['body'], 'utf-8'),
               digestmod=hashlib.sha256
          ).hexdigest().lower()
      
          if received_sig != calculated_sig:
              raise SignatureError('Invalid signature')
      
          return None
      
      
      def create_response(status, message, err=None):
          """Print the message and return a formatted response"""
          body = f'{message}: {str(err)}' if err else message
          print(body)
          return {
              'statusCode': status,
              'headers': {
                  'Content-Type': 'text/plain',
              },
              'body': body,
              'isBase64Encoded': False,
          }
      
      
      def lambda_handler(event, context):
          """Entry point for the AWS Lambda function"""
          try:
              validate_signed_message(event)
          except SignatureError as err:
              return create_response(401, 'Signature error', err)
      
          # Event body is a JSON-formatted string, so we need to parse it
          try:
              body = json.loads(event['body'])
          except json.JSONDecodeError as err:
              return create_response(400, 'Error parsing JSON event body', err)
      
          # Right now, event notification messages contain just a single event
          b2_event = body['events'][0];
      
          # Incoming event timestamp is in epoch milliseconds
          # Outgoing message needs ISO 8601
          timestamp = b2_event['eventTimestamp'] / 1000.0
          eventTime = ( 
             datetime.datetime.fromtimestamp(timestamp)
             .strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
          )
      
          # B2 region is in a custom HTTP header
          # Note that AWS changes all HTTP header keys to lowercase
          b2_region = event['headers']['x-b2-region']
      
          # This message is based on the example S3 event notification at
          # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
          message = f'''{{  
          "Records":[  
              {{  
                  "eventVersion":"2.1",
                  "eventSource":"aws:s3",
                  "awsRegion":"{b2_region}",
                  "eventTime":"{eventTime}",
                  "eventName":"ObjectCreated:Put",
                  "userIdentity":{{  
                      "principalId":"{b2_event['accountId']}"
                  }},
                  "requestParameters":{{  
                      "sourceIPAddress":"{event['requestContext']['http']['sourceIp']}"
                  }},
                  "responseElements":{{  
                      "x-amz-request-id":"{event['requestContext']['requestId']}",
                      "x-amz-id-2":"dummy/value/abc123"
                  }},
                  "s3":{{  
                      "s3SchemaVersion":"1.0",
                      "configurationId":"{b2_event['matchedRuleName']}",
                      "bucket":{{  
                          "name":"{b2_event['bucketName']}",
                          "ownerIdentity":{{  
                              "principalId":"{b2_event['accountId']}"
                          }},
                          "arn":"arn:aws:s3:::{b2_event['bucketName']}"
                      }},
                      "object":{{  
                          "key":"{b2_event['objectName']}",
                          "size":{b2_event['objectSize']},
                          "eTag":"dummy_value",
                          "versionId":"{b2_event['objectVersionId']}",
                          "sequencer":"dummy_value"
                      }}
                  }}
              }}
          ]
      }}'''
      
          print(f'Outgoing message: \n{message}')
      
          try:
              response = sqs_client.send_message(
                  QueueUrl=queue_url,
                  MessageBody=message)
          except ClientError as err:
              return create_response(500, 'Error sending message', err)
      
          return create_response(200, 'Success')
    3. Click Deploy.
    4. Navigate to your SQS function’s page, and click Send and receive messages.
    5. Click Poll for messages.
    6. Upload another file to your Backblaze B2 bucket.
      A new message appears at the bottom of the page.
    7. In the message, click the message ID link.
    8. Scroll to the body of the message to examine the JSON-formatted message.
      The following example message shows what you should see in the Body tab:
      Click here for example JSON.
      {  
          "Records":[  
              {  
                  "eventVersion":"2.1",
                  "eventSource":"aws:s3",
                  "awsRegion":"us-west-004",
                  "eventTime":"2024-02-23T22:12:47.000Z",
                  "eventName":"ObjectCreated:Put",
                  "userIdentity":{  
                      "principalId":"15f935cf4dcb"
                  },
                  "requestParameters":{  
                      "sourceIPAddress":"2605:72c0:503:42::c19"
                  },
                  "responseElements":{  
                      "x-amz-request-id":"254d41aa-a6bb-4264-b210-bb4ff9664a54",
                      "x-amz-id-2":"dummy/value/abc123"
                  },
                  "s3":{  
                      "s3SchemaVersion":"1.0",
                      "configurationId":"my-first-event-notification-rule",
                      "bucket":{  
                          "name":"Metadaddy-Tester",
                          "ownerIdentity":{  
                              "principalId":"15f935cf4dcb"
                          },
                          "arn":"arn:aws:s3:::Metadaddy-Tester"
                      },
                      "object":{  
                          "key":"1megabyte-test-file",
                          "size":1048576,
                          "eTag":"dummy_value",
                          "versionId":"4_z71d55f5943b52c7f74ed0c1b_f117370b8bbd1b3e7_d20240223_m221247_c004_v0402017_t0002_u01708726367081",
                          "sequencer":"dummy_value"
                      }
                  }
              }
          ]
      }
    9. If you do not see a message, in your AWS Lambda function’s console page, select the Monitor tab and click View CloudWatch logs.
    10. Examine the log streams to gather insight into why your AWS Lambda function is failing.

    Was this article helpful?