Solutions

Lab 1 Solutions

ReduceLambdaPolicy

{
    "Version": "2012-10-17",
    "Statement": [
    {
        "Sid": "ReadFromDynamoDBStream",
        "Effect": "Allow",
        "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
        ],
        "Resource": "arn:aws:dynamodb:*:*:table/ReduceTable/stream/*"
    },
    {
        "Sid": "CreateCloudwatchLogGroup",
        "Effect": "Allow",
        "Action": [
        "logs:CreateLogGroup"
        ],
        "Resource": "arn:aws:logs:*:*:*"
    },

    {
        "Sid": "WriteToCloudwatchLogGroup",
        "Effect": "Allow",
        "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:*:*:log-group:/aws/lambda/ReduceLambda:*"
    },
    {
        "Sid": "WriteToDynamoDBTable",
        "Effect": "Allow",
        "Action": [
        "dynamodb:UpdateItem"
        ],
        "Resource": "arn:aws:dynamodb:*:*:table/AggregateTable"
    },
    {
        "Sid": "ReadFromParameterTable",
        "Effect": "Allow",
        "Action": [
        "dynamodb:GetItem"
        ],
        "Resource": "arn:aws:dynamodb:*:*:table/ParameterTable"
    }
    ]
}

Lab 2 Solutions

Lab 2 StateLambda function complete code

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

# --------------------------------------------------------------------------------------------------
# Imports
# --------------------------------------------------------------------------------------------------

# General Imports
import json
import time
import base64
import random
from decimal import Decimal

# AWS Imports
import boto3
from botocore.exceptions import ClientError

# Project Imports
import functions
import constants

# --------------------------------------------------------------------------------------------------
# Lambda Function
# --------------------------------------------------------------------------------------------------

def lambda_handler(event, context):

    # Print Status at Start
    records = event['Records']
    print('Invoked StateLambda with ' + str(len(records)) + ' record(s).')

    # Initialize DynamoDB
    ddb_ressource = boto3.resource('dynamodb')
    table = ddb_ressource.Table(constants.STATE_TABLE_NAME)

    # Get Failure PCT
    FAILURE_STATE_LAMBDA_PCT = functions.get_parameter(ddb_ressource, "FAILURE_STATE_LAMBDA_PCT", 0)

    # Loop over records
    for record in records:

        # Manually Introduced Random Failure
        if random.uniform(0,100) < FAILURE_STATE_LAMBDA_PCT / len(records):
            # Raise exception
            raise Exception('Manually Introduced Random Failure!')

        # Load Record
        data = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))

        # Get Entries
        record_id           = data[constants.ID_COLUMN_NAME]
        record_hierarchy    = data[constants.HIERARCHY_COLUMN_NAME]
        record_value        = data[constants.VALUE_COLUMN_NAME]
        record_version      = data[constants.VERSION_COLUMN_NAME]
        record_time         = data[constants.TIMESTAMP_COLUMN_NAME]

        # If Record is older than 1 Minute -> Ignore it
        if (time.time() - record_time) > 60:
            continue

        # Write to DDB
        try:
            table.update_item(
                Key = {
                    constants.STATE_TABLE_KEY: record_id
                    },
                ConditionExpression = 'attribute_not_exists(' + constants.STATE_TABLE_KEY + ') OR ' + constants.VERSION_COLUMN_NAME + '< :new_version',
                UpdateExpression = 'SET  #VALUE     = :new_value,' + \
                                        '#VERSION   = :new_version,' + \
                                        '#HIERARCHY = :new_hierarchy,' + \
                                        '#TIMESTAMP = :new_time',
                ExpressionAttributeNames={
                    '#VALUE':       constants.VALUE_COLUMN_NAME,
                    '#VERSION':     constants.VERSION_COLUMN_NAME,
                    '#HIERARCHY':   constants.HIERARCHY_COLUMN_NAME,
                    '#TIMESTAMP':   constants.TIMESTAMP_COLUMN_NAME
                    },
                ExpressionAttributeValues={
                    ':new_version':     record_version,
                    ':new_value':       Decimal(str(record_value)),
                    ':new_hierarchy':   json.dumps(record_hierarchy, sort_keys = True),
                    ':new_time':        Decimal(str(record_time))
                    }
                )
        except ClientError as e:
            if e.response['Error']['Code']=='ConditionalCheckFailedException':
                print('Conditional put failed.' + \
                    ' This is either a duplicate or a more recent version already arrived.')
                print('Id: ',           record_id)
                print('Hierarchy: ',    record_hierarchy)
                print('Value: ',        record_value)
                print('Version: ',      record_version)
                print('Timestamp: ',    record_time)
            else:
                raise e

    # Print Status at End
    print('StateLambda successfully processed ' + str(len(records)) + ' record(s).')
    return {'statusCode': 200}

Lab 2 ReduceLambda function complete code

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

# --------------------------------------------------------------------------------------------------
# Imports
# --------------------------------------------------------------------------------------------------

# General Imports
import json
import hashlib
import random
import time

# AWS Imports
import boto3
from botocore.exceptions import ClientError

# Project Imports
import functions
import constants

# --------------------------------------------------------------------------------------------------
# Lambda Function
# --------------------------------------------------------------------------------------------------

def lambda_handler(event, context):

    # Print Status at Start
    records = event['Records']
    print('Invoked ReduceLambda with ' + str(len(records)) + ' Delta message(s).')

    # Initialize Dict for Total Delta
    totals = dict()

    # Initialize DDB Ressource
    ddb_ressource = boto3.resource('dynamodb')

    # Keep track of number of batches for timestamp mean
    batch_count = 0

    # Iterate over Messages
    for record in event['Records']:

        # Aggregate over Batch of Messages the Lambda was invoked with
        if 'NewImage' in record['dynamodb']:

            # Load Message to Dict
            message = record['dynamodb']['NewImage']['Message']['S'].replace("'",'"')
            data = json.loads(message)

            # Get Batch Count (To Calculate Mean of Timestamp)
            batch_count += 1

            # Iterate over Entries in Message
            for entry in data:
                if (entry == constants.TIMESTAMP_GENERATOR_FIRST or entry == constants.TIMESTAMP_GENERATOR_MEAN):
                    continue
                else:
                    functions.dict_entry_add(totals, entry, data[entry])

    # If this batch contains only deletes: Done
    if not totals:
        print('Skipped batch - no new entries.')
        return {'statusCode': 200}

    # Total Count of New Messages (for Printing)
    total_new_message_count = totals[constants.MESSAGE_COUNT_NAME]

    # Update all Values within one single transaction
    ddb_client = boto3.client('dynamodb')

    # Batch of Items
    batch = [
        { 'Update':
            {
                'TableName' : constants.AGGREGATE_TABLE_NAME,
                'Key' : {constants.AGGREGATE_TABLE_KEY : {'S' : entry}},
                'UpdateExpression' : "ADD #val :val ",
                'ExpressionAttributeValues' : {
                    ':val': {'N' : str(totals[entry])}
                },
                'ExpressionAttributeNames': {
                    "#val" : "Value"
                }
            }
        } for entry in totals.keys()]

    # Calculate hash to ensure this batch hasn't been processed already:
    record_list_hash = hashlib.md5(str(records).encode()).hexdigest()

    response = ddb_client.transact_write_items(
        TransactItems = batch,
        ClientRequestToken = record_list_hash
    )

    # Manually Introduced Random Failure
    if random.uniform(0,100) < functions.get_parameter(ddb_ressource, "FAILURE_REDUCE_LAMBDA_PCT", 0):

        # Raise Exception
        raise Exception('Manually Introduced Random Failure!')

    # Print Status at End
    print('ReduceLambda finished. Updates aggregates with ' + str(total_new_message_count) + ' new message(s) in total.')

    return {'statusCode': 200}