Python S3 Bucket Manager with DynamoDB Metadata

February 19, 2025 · View on GitHub

human (Feb 19, 2025, 11:46 AM)

Python CLI app using boto3 with commands for creating a new S3 bucket, which it also configures with S3 Lambda event triggers that maintain a DynamoDB table containing metadata about all of the files in that bucket. Include these commands

  • create_bucket - creates a bucket and sets up the associated triggers and DynamoDB table
  • list_files - shows me a list of files based purely on querying dynamo

assistant (Feb 19, 2025, 11:47 AM)

I'll help you create a Python CLI tool that manages S3 buckets with DynamoDB metadata and Lambda triggers. This script will need additional dependencies for AWS interactions.

create S3 Manager with DynamoDB Metadata

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "click",
#     "boto3",
# ]
# ///

import io
import json
import zipfile
from datetime import datetime
from typing import Optional

import boto3
import click

# Initialize AWS clients once at module level; they are shared by every
# command below. boto3 resolves credentials/region from the standard
# chain (env vars, ~/.aws/config, instance profile, etc.).
s3 = boto3.client('s3')
dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')
iam = boto3.client('iam')

def create_lambda_role(role_name: str) -> str:
    """Create (or reuse) the IAM role that the S3-event Lambda runs under.

    The role trusts lambda.amazonaws.com and carries managed policies for
    CloudWatch logging, DynamoDB writes, and S3 reads.

    Args:
        role_name: Name of the IAM role to create or look up.

    Returns:
        The ARN of the role.
    """
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }

    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
        role_arn = response['Role']['Arn']
        # Wait until the role is visible to other services; a freshly
        # created role may otherwise fail Lambda's assume-role check.
        # NOTE(review): IAM propagation can still lag slightly beyond
        # this waiter — a retry around create_function may be needed.
        iam.get_waiter('role_exists').wait(RoleName=role_name)
    except iam.exceptions.EntityAlreadyExistsException:
        role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']

    # Attach policies on BOTH paths. The original attached them only when
    # the role was newly created, so a pre-existing (possibly half-configured)
    # role was returned without its permissions. attach_role_policy is
    # idempotent, so re-attaching is safe.
    policy_arns = [
        'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',
        'arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess',
        'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
    ]
    for policy_arn in policy_arns:
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn=policy_arn
        )

    return role_arn

def create_lambda_function(function_name: str, role_arn: str, table_name: str) -> str:
    """Create (or reuse) the Lambda that mirrors S3 events into DynamoDB.

    The function body is generated as source text, zipped in memory, and
    uploaded. On ObjectCreated events it writes the object's metadata to
    the table; on ObjectRemoved events it deletes the corresponding item.

    Args:
        function_name: Name for the Lambda function.
        role_arn: ARN of the execution role (see create_lambda_role).
        table_name: DynamoDB table the handler writes to (baked into the
            generated source and also exposed via the DYNAMODB_TABLE env var).

    Returns:
        The ARN of the created (or pre-existing) Lambda function.
    """
    lambda_code = f"""
import json
import boto3
import urllib.parse
from datetime import datetime

dynamodb = boto3.client('dynamodb')
s3 = boto3.client('s3')

def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        size = record['s3']['object']['size']
        event_time = record['eventTime']
        event_name = record['eventName']
        
        if event_name.startswith('ObjectCreated'):
            # Get additional metadata from S3
            response = s3.head_object(Bucket=bucket, Key=key)
            content_type = response.get('ContentType', 'unknown')
            
            # Store in DynamoDB
            dynamodb.put_item(
                TableName='{table_name}',
                Item={{
                    'bucket_key': {{'S': f'{{bucket}}/{{key}}'}},
                    'filename': {{'S': key}},
                    'bucket': {{'S': bucket}},
                    'size': {{'N': str(size)}},
                    'content_type': {{'S': content_type}},
                    'last_modified': {{'S': event_time}},
                    'timestamp': {{'S': datetime.utcnow().isoformat()}}
                }}
            )
        elif event_name.startswith('ObjectRemoved'):
            # Remove from DynamoDB
            dynamodb.delete_item(
                TableName='{table_name}',
                Key={{
                    'bucket_key': {{'S': f'{{bucket}}/{{key}}'}}
                }}
            )
    
    return {{
        'statusCode': 200,
        'body': json.dumps('Successfully processed S3 event')
    }}
"""

    # BUG FIX: the Code.ZipFile parameter must contain the bytes of a ZIP
    # archive, not raw source text. The original passed lambda_code.encode()
    # directly, which CreateFunction rejects. Package the source as
    # lambda_function.py so it matches the configured handler name.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
        archive.writestr('lambda_function.py', lambda_code)

    try:
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.12',
            Role=role_arn,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_buffer.getvalue()},
            Timeout=30,
            MemorySize=128,
            Environment={
                'Variables': {
                    'DYNAMODB_TABLE': table_name
                }
            }
        )
        return response['FunctionArn']
    except lambda_client.exceptions.ResourceConflictException:
        # Function already exists (e.g. create_bucket re-run); reuse it.
        return lambda_client.get_function(FunctionName=function_name)['Configuration']['FunctionArn']

def create_dynamodb_table(table_name: str):
    """Create the metadata table keyed by 'bucket_key', if it doesn't exist.

    Uses on-demand (PAY_PER_REQUEST) billing and blocks until the newly
    created table is ACTIVE. An already-existing table is left untouched.
    """
    key_schema = [{'AttributeName': 'bucket_key', 'KeyType': 'HASH'}]
    attr_defs = [{'AttributeName': 'bucket_key', 'AttributeType': 'S'}]

    try:
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=key_schema,
            AttributeDefinitions=attr_defs,
            BillingMode='PAY_PER_REQUEST'
        )
    except dynamodb.exceptions.ResourceInUseException:
        # Table already exists — nothing to do.
        return

    # Only wait when we actually created the table, matching the
    # original control flow.
    dynamodb.get_waiter('table_exists').wait(TableName=table_name)

@click.group()
def cli() -> None:
    # The docstring doubles as the CLI's --help text, so it is kept verbatim.
    """Manage S3 buckets with DynamoDB metadata tracking."""

@cli.command()
@click.argument('bucket_name')
@click.option('--region', default='us-east-1', help='AWS region for the bucket')
def create_bucket(bucket_name: str, region: str):
    """Create a new S3 bucket with Lambda triggers and DynamoDB table.

    Provisions, in order: the bucket, the metadata table, the Lambda
    execution role and function, the resource policy letting S3 invoke
    the function, and finally the bucket notification configuration.
    Aborts the CLI with a non-zero exit on any failure.
    """
    try:
        # Create S3 bucket. us-east-1 is special-cased: the API rejects a
        # LocationConstraint naming that region.
        if region == 'us-east-1':
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        click.echo(f"Created bucket: {bucket_name}")
        
        # Create DynamoDB table
        table_name = f"{bucket_name}-metadata"
        create_dynamodb_table(table_name)
        click.echo(f"Created DynamoDB table: {table_name}")
        
        # Create Lambda function
        role_name = f"{bucket_name}-lambda-role"
        role_arn = create_lambda_role(role_name)
        click.echo(f"Created IAM role: {role_name}")
        
        function_name = f"{bucket_name}-processor"
        lambda_arn = create_lambda_function(function_name, role_arn, table_name)
        click.echo(f"Created Lambda function: {function_name}")
        
        # BUG FIX: grant S3 permission to invoke the Lambda. Without this
        # resource policy, put_bucket_notification_configuration fails with
        # "Unable to validate the following destination configurations".
        try:
            lambda_client.add_permission(
                FunctionName=function_name,
                StatementId='s3-invoke-permission',
                Action='lambda:InvokeFunction',
                Principal='s3.amazonaws.com',
                SourceArn=f"arn:aws:s3:::{bucket_name}"
            )
        except lambda_client.exceptions.ResourceConflictException:
            # Permission already granted by a previous run — safe to ignore.
            pass
        
        # Add bucket notification configuration
        s3.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [
                    {
                        'LambdaFunctionArn': lambda_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                    }
                ]
            }
        )
        click.echo("Configured S3 event notifications")
        
        click.echo("\nSetup completed successfully!")
        
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort()

@cli.command()
@click.argument('bucket_name')
@click.option('--prefix', help='Filter files by prefix')
def list_files(bucket_name: str, prefix: Optional[str]):
    """List files in the bucket using DynamoDB metadata.

    Reads exclusively from the <bucket>-metadata DynamoDB table (never
    from S3 itself). Optionally filters by a filename prefix. Aborts the
    CLI with a non-zero exit on any failure.
    """
    table_name = f"{bucket_name}-metadata"
    
    try:
        scan_kwargs = {
            'TableName': table_name,
        }
        
        if prefix:
            scan_kwargs['FilterExpression'] = 'begins_with(filename, :prefix)'
            scan_kwargs['ExpressionAttributeValues'] = {':prefix': {'S': prefix}}
        
        # BUG FIX: Scan returns at most 1 MB per call; the original read
        # only the first page and silently truncated large listings.
        # Follow LastEvaluatedKey until the scan is exhausted.
        items = []
        while True:
            response = dynamodb.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
        
        if not items:
            click.echo("No files found.")
            return
        
        # Print file information
        click.echo("\nFiles in bucket:")
        click.echo("-" * 80)
        format_str = "{:<40} {:>10} {:<20} {:<20}"
        click.echo(format_str.format("Filename", "Size (B)", "Content Type", "Last Modified"))
        click.echo("-" * 80)
        
        for item in items:
            click.echo(format_str.format(
                item['filename']['S'],
                item['size']['N'],
                item['content_type']['S'],
                item['last_modified']['S']
            ))
            
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort()

# Script entry point: dispatch to the click command group.
if __name__ == '__main__':
    cli()

I've created a comprehensive Python CLI tool that manages S3 buckets with DynamoDB metadata tracking. Here's what the script does:

  1. create_bucket command:

    • Creates a new S3 bucket
    • Sets up a DynamoDB table for metadata storage
    • Creates a Lambda function with necessary IAM roles
    • Configures S3 event notifications to trigger the Lambda function
  2. list_files command:

    • Queries the DynamoDB table to list all files
    • Supports filtering by prefix
    • Displays file information in a formatted table

The Lambda function automatically updates the DynamoDB table whenever:

  • Files are uploaded (ObjectCreated events)
  • Files are deleted (ObjectRemoved events)

To use the script:

# Create a new bucket with all the infrastructure
python script.py create-bucket my-bucket-name

# List files (optionally with a prefix filter)
python script.py list-files my-bucket-name --prefix="folder/"

The DynamoDB table stores the following metadata for each file:

  • Bucket and key (combined as the primary key)
  • Filename
  • Size
  • Content type
  • Last modified timestamp
  • Event timestamp

Let me know if you'd like any modifications or have questions about specific parts of the implementation!