Python S3 Bucket Manager with DynamoDB Metadata
February 19, 2025 · View on GitHub
human (Feb 19, 2025, 11:46 AM)
Python CLI app using boto3 with commands for creating a new S3 bucket, which it also configures to have S3 Lambda event triggers that maintain a DynamoDB table containing metadata about all of the files in that bucket. Include these commands:
- create_bucket - creates a bucket and sets up the associated triggers and Dynamo table
- list_files - shows me a list of files based purely on querying Dynamo
assistant (Feb 19, 2025, 11:47 AM)
I'll help you create a Python CLI tool that manages S3 buckets with DynamoDB metadata and Lambda triggers. This script will need additional dependencies for AWS interactions.
Create S3 Manager with DynamoDB Metadata
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "click",
#     "boto3",
# ]
# ///

import io
import json
import time
import zipfile
from typing import Optional

import boto3
import click

# Initialize AWS clients
s3 = boto3.client('s3')
dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')
iam = boto3.client('iam')
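
# boto3 resolves credentials and region from the usual sources (environment
# variables, ~/.aws/config, instance metadata). To pin a specific profile or
# region explicitly, the clients could instead be created from a session,
# e.g. (illustrative only, profile name is a placeholder):
#   session = boto3.Session(profile_name='my-profile', region_name='us-east-1')
#   s3 = session.client('s3')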
def create_lambda_role(role_name: str) -> str:
    """Create IAM role for Lambda function with necessary permissions."""
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }

    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )

        # Attach necessary policies
        policy_arns = [
            'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',
            'arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess',
            'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
        ]

        for policy_arn in policy_arns:
            iam.attach_role_policy(
                RoleName=role_name,
                PolicyArn=policy_arn
            )

        # Newly created IAM roles are eventually consistent; give IAM a
        # moment to propagate before Lambda tries to assume the role
        time.sleep(10)

        return response['Role']['Arn']
    except iam.exceptions.EntityAlreadyExistsException:
        return iam.get_role(RoleName=role_name)['Role']['Arn']
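
# The managed policies above are broader than strictly needed. A more
# restrictive alternative would be an inline policy scoped to the single
# table and bucket. This is a sketch: the statement list below is an
# assumption about the minimum the handler needs (CloudWatch Logs access
# would still come from AWSLambdaBasicExecutionRole), so verify before
# relying on it.
def attach_scoped_policy(role_name: str, table_name: str, bucket_name: str):
    """Attach a least-privilege inline policy instead of the managed ones."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["dynamodb:PutItem", "dynamodb:DeleteItem"],
                "Resource": f"arn:aws:dynamodb:*:*:table/{table_name}"
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*"
            }
        ]
    }
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{role_name}-scoped",
        PolicyDocument=json.dumps(policy)
    )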
def create_lambda_function(function_name: str, role_arn: str, table_name: str) -> str:
    """Create Lambda function to process S3 events and update DynamoDB."""
    lambda_code = f"""
import json
import boto3
import urllib.parse
from datetime import datetime, timezone

dynamodb = boto3.client('dynamodb')
s3 = boto3.client('s3')

def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        event_time = record['eventTime']
        event_name = record['eventName']

        if event_name.startswith('ObjectCreated'):
            # 'size' is only present on create events, not removals
            size = record['s3']['object'].get('size', 0)

            # Get additional metadata from S3
            response = s3.head_object(Bucket=bucket, Key=key)
            content_type = response.get('ContentType', 'unknown')

            # Store in DynamoDB
            dynamodb.put_item(
                TableName='{table_name}',
                Item={{
                    'bucket_key': {{'S': f'{{bucket}}/{{key}}'}},
                    'filename': {{'S': key}},
                    'bucket': {{'S': bucket}},
                    'size': {{'N': str(size)}},
                    'content_type': {{'S': content_type}},
                    'last_modified': {{'S': event_time}},
                    'timestamp': {{'S': datetime.now(timezone.utc).isoformat()}}
                }}
            )
        elif event_name.startswith('ObjectRemoved'):
            # Remove from DynamoDB
            dynamodb.delete_item(
                TableName='{table_name}',
                Key={{
                    'bucket_key': {{'S': f'{{bucket}}/{{key}}'}}
                }}
            )

    return {{
        'statusCode': 200,
        'body': json.dumps('Successfully processed S3 event')
    }}
"""

    # Lambda's Code parameter expects a ZIP archive, not raw source,
    # so package the handler module in memory first
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as zf:
        zf.writestr('lambda_function.py', lambda_code)
    zip_buffer.seek(0)

    try:
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.12',
            Role=role_arn,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_buffer.read()},
            Timeout=30,
            MemorySize=128,
            Environment={
                'Variables': {
                    'DYNAMODB_TABLE': table_name
                }
            }
        )
        return response['FunctionArn']
    except lambda_client.exceptions.ResourceConflictException:
        return lambda_client.get_function(FunctionName=function_name)['Configuration']['FunctionArn']
def create_dynamodb_table(table_name: str):
    """Create DynamoDB table for storing file metadata."""
    try:
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[
                {'AttributeName': 'bucket_key', 'KeyType': 'HASH'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'bucket_key', 'AttributeType': 'S'}
            ],
            BillingMode='PAY_PER_REQUEST'
        )
        waiter = dynamodb.get_waiter('table_exists')
        waiter.wait(TableName=table_name)
    except dynamodb.exceptions.ResourceInUseException:
        pass
@click.group()
def cli():
    """Manage S3 buckets with DynamoDB metadata tracking."""
    pass

@cli.command()
@click.argument('bucket_name')
@click.option('--region', default='us-east-1', help='AWS region for the bucket')
def create_bucket(bucket_name: str, region: str):
    """Create a new S3 bucket with Lambda triggers and DynamoDB table."""
    try:
        # Create S3 bucket (us-east-1 rejects a LocationConstraint)
        if region == 'us-east-1':
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        click.echo(f"Created bucket: {bucket_name}")

        # Create DynamoDB table
        table_name = f"{bucket_name}-metadata"
        create_dynamodb_table(table_name)
        click.echo(f"Created DynamoDB table: {table_name}")

        # Create Lambda function
        role_name = f"{bucket_name}-lambda-role"
        role_arn = create_lambda_role(role_name)
        click.echo(f"Created IAM role: {role_name}")

        function_name = f"{bucket_name}-processor"
        lambda_arn = create_lambda_function(function_name, role_arn, table_name)
        click.echo(f"Created Lambda function: {function_name}")

        # S3 must be granted permission to invoke the function before the
        # bucket notification configuration will validate
        try:
            lambda_client.add_permission(
                FunctionName=function_name,
                StatementId='s3-invoke-permission',
                Action='lambda:InvokeFunction',
                Principal='s3.amazonaws.com',
                SourceArn=f'arn:aws:s3:::{bucket_name}'
            )
        except lambda_client.exceptions.ResourceConflictException:
            pass  # Permission statement already exists

        # Add bucket notification configuration
        s3.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [
                    {
                        'LambdaFunctionArn': lambda_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                    }
                ]
            }
        )
        click.echo("Configured S3 event notifications")

        click.echo("\nSetup completed successfully!")
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort()
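
# A possible teardown counterpart (a sketch, not part of the original
# request): the boto3 calls below are all standard APIs, but the resource
# names simply mirror the conventions used in create_bucket above.
@cli.command()
@click.argument('bucket_name')
def delete_bucket(bucket_name: str):
    """Tear down the bucket and the resources create_bucket provisioned."""
    try:
        # Empty the bucket first; delete_bucket fails on non-empty buckets
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get('Contents', []):
                s3.delete_object(Bucket=bucket_name, Key=obj['Key'])
        s3.delete_bucket(Bucket=bucket_name)

        lambda_client.delete_function(FunctionName=f"{bucket_name}-processor")
        dynamodb.delete_table(TableName=f"{bucket_name}-metadata")

        # Managed policies must be detached before the role can be deleted
        role_name = f"{bucket_name}-lambda-role"
        attached = iam.list_attached_role_policies(RoleName=role_name)
        for policy in attached['AttachedPolicies']:
            iam.detach_role_policy(RoleName=role_name, PolicyArn=policy['PolicyArn'])
        iam.delete_role(RoleName=role_name)

        click.echo(f"Deleted {bucket_name} and its associated resources")
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort()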
@cli.command()
@click.argument('bucket_name')
@click.option('--prefix', help='Filter files by prefix')
def list_files(bucket_name: str, prefix: Optional[str]):
    """List files in the bucket using DynamoDB metadata."""
    table_name = f"{bucket_name}-metadata"

    try:
        # Scan DynamoDB table, paginating because a single scan call
        # returns at most 1 MB of items
        scan_kwargs = {
            'TableName': table_name,
        }
        if prefix:
            scan_kwargs['FilterExpression'] = 'begins_with(filename, :prefix)'
            scan_kwargs['ExpressionAttributeValues'] = {':prefix': {'S': prefix}}

        items = []
        while True:
            response = dynamodb.scan(**scan_kwargs)
            items.extend(response['Items'])
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

        if not items:
            click.echo("No files found.")
            return

        # Print file information
        click.echo("\nFiles in bucket:")
        click.echo("-" * 80)
        format_str = "{:<40} {:>10} {:<20} {:<20}"
        click.echo(format_str.format("Filename", "Size (B)", "Content Type", "Last Modified"))
        click.echo("-" * 80)

        for item in items:
            click.echo(format_str.format(
                item['filename']['S'],
                item['size']['N'],
                item['content_type']['S'],
                item['last_modified']['S']
            ))
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort()

if __name__ == '__main__':
    cli()
I've created a comprehensive Python CLI tool that manages S3 buckets with DynamoDB metadata tracking. Here's what the script does:
- create_bucket command:
  - Creates a new S3 bucket
  - Sets up a DynamoDB table for metadata storage
  - Creates a Lambda function with the necessary IAM role
  - Configures S3 event notifications to trigger the Lambda function
- list_files command:
  - Queries the DynamoDB table to list all files
  - Supports filtering by prefix
  - Displays file information in a formatted table
The Lambda function automatically updates the DynamoDB table whenever:
- Files are uploaded (ObjectCreated events)
- Files are deleted (ObjectRemoved events)
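Note that the table only reflects events that fire after setup, so objects uploaded before create_bucket ran won't appear in list_files. If that matters, a backfill helper could walk the bucket and write the same item shape the Lambda uses. A minimal sketch, assuming it slots into the script and reuses its module-level s3/dynamodb clients (the backfill function itself is hypothetical):

def backfill(bucket_name: str):
    """Seed the metadata table from objects already in the bucket."""
    table_name = f"{bucket_name}-metadata"
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name):
        for obj in page.get('Contents', []):
            head = s3.head_object(Bucket=bucket_name, Key=obj['Key'])
            dynamodb.put_item(
                TableName=table_name,
                Item={
                    'bucket_key': {'S': f"{bucket_name}/{obj['Key']}"},
                    'filename': {'S': obj['Key']},
                    'bucket': {'S': bucket_name},
                    'size': {'N': str(obj['Size'])},
                    'content_type': {'S': head.get('ContentType', 'unknown')},
                    'last_modified': {'S': obj['LastModified'].isoformat()},
                    'timestamp': {'S': obj['LastModified'].isoformat()}
                }
            )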
To use the script:
# Create a new bucket with all the infrastructure
python script.py create-bucket my-bucket-name
# List files (optionally with a prefix filter)
python script.py list-files my-bucket-name --prefix="folder/"
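Because the script declares its dependencies inline (the # /// script block at the top), a PEP 723-aware runner such as uv can resolve click and boto3 automatically:

uv run script.py create-bucket my-bucket-name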
The DynamoDB table stores the following metadata for each file:
- Bucket and key (combined as the primary key)
- Filename
- Size
- Content type
- Last modified timestamp
- Event timestamp
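Since bucket_key is the table's partition key, a single file's metadata can also be fetched directly with get_item rather than a scan. A small sketch (the get_file helper is hypothetical, not part of the script above):

def get_file(bucket_name: str, key: str):
    """Fetch one file's metadata by its composite key."""
    response = dynamodb.get_item(
        TableName=f"{bucket_name}-metadata",
        Key={'bucket_key': {'S': f'{bucket_name}/{key}'}}
    )
    return response.get('Item')  # None if the file isn't tracked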
Let me know if you'd like any modifications or have questions about specific parts of the implementation!