Optimizing Boto3: Wrapping AWS SDK for Python for Better Performance
Introduction:
Boto3 is a powerful library for interacting with AWS services using Python. However, by wrapping Boto3 in a custom class, you can optimize the way you interact with AWS services and improve the performance of your application. In this post, we will explore the benefits of wrapping Boto3 and show you how to do it.
1. The Benefits of Wrapping Boto3
Wrapping Boto3 in a custom class provides several benefits. The main one is that it lets you create a client or resource object once and reuse it across many method calls. Creating a client is not free, because Boto3 has to load the service's API definition and resolve credentials and configuration each time, so reuse avoids repeating that work. You can also add custom functionality, such as logging or caching, to the wrapper class to further optimize your interactions with AWS services.
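To make the reuse idea concrete, here is a minimal sketch of a client cache. The ClientCache class and its factory parameter are hypothetical names, not part of Boto3. In real use the factory would be boto3.client; a stand-in factory is used here so the example runs without AWS access.

```python
class ClientCache:
    """Create each (service, region) client once and return the same object afterwards."""

    def __init__(self, factory):
        # factory is any callable shaped like boto3.client:
        # factory(service_name, region_name=...) -> client
        self._factory = factory
        self._clients = {}

    def get(self, service_name, region_name):
        key = (service_name, region_name)
        if key not in self._clients:
            self._clients[key] = self._factory(service_name, region_name=region_name)
        return self._clients[key]


# Stand-in factory (an assumption for this demo) that records every client it builds.
created = []


def fake_factory(service_name, region_name=None):
    created.append((service_name, region_name))
    return object()


cache = ClientCache(fake_factory)
first = cache.get("s3", "us-west-2")
second = cache.get("s3", "us-west-2")  # served from the cache, no second build
```

Passing boto3.client as the factory should give the same behavior against real services, at the cost of holding one client per service and region for the lifetime of the cache.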
2. Creating a Boto3 Wrapper Class
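To create a Boto3 wrapper class, import the boto3 library and define a new class. The class should have two attributes, client and resource, that store the boto3 client and resource respectively. Override the __getattr__ method so that it looks up the requested name on the client first and then on the resource, and returns the matching attribute from whichever has it. Python only calls __getattr__ when normal attribute lookup fails, so the wrapper's own attributes and methods are unaffected. Note that boto3.resource is only available for some services (S3 and DynamoDB among them), so this constructor fails for services that have no resource interface.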
Example:
import boto3


class Boto3Wrapper:
    def __init__(self, service_name, region_name):
        self.client = boto3.client(service_name, region_name=region_name)
        self.resource = boto3.resource(service_name, region_name=region_name)

    def __getattr__(self, name):
        # Check if the method exists in the client
        if hasattr(self.client, name):
            return getattr(self.client, name)
        # Check if the method exists in the resource
        elif hasattr(self.resource, name):
            return getattr(self.resource, name)
        else:
            raise AttributeError("'Boto3Wrapper' object has no attribute '{}'".format(name))

    def set_client(self, service_name, region_name):
        """
        Set the client to a new service and region
        """
        self.client = boto3.client(service_name, region_name=region_name)

    def set_resource(self, service_name, region_name):
        """
        Set the resource to a new service and region
        """
        self.resource = boto3.resource(service_name, region_name=region_name)

    def get_client(self):
        """
        Get the current client
        """
        return self.client

    def get_resource(self):
        """
        Get the current resource
        """
        return self.resource
3. Using the Wrapper Class
Once you’ve created your wrapper class, you can use it to interact with AWS services just like you would with the regular boto3 client and resource. The difference is that you’ll be using the same client and resource object across multiple method calls, which can improve performance.
Example:
wrapper = Boto3Wrapper('s3', 'us-west-2')
response = wrapper.list_buckets()
print(response)
4. Adding Custom Functionality
In addition to improving performance, wrapping Boto3 in a custom class allows you to add custom functionality, such as logging or caching, to the wrapper class. This can further optimize your interactions with AWS services and make your application more efficient.
Example:
class Boto3Wrapper:
    def __init__(self, service_name, region_name):
        self.client = boto3.client(service_name, region_name=region_name)
        self.resource = boto3.resource(service_name, region_name=region_name)
        # Responses saved by cache_response, keyed by method name and arguments
        self.response_cache = {}

    def __getattr__(self, name):
        # Check if the method exists in the client
        if hasattr(self.client, name):
            return getattr(self.client, name)
        # Check if the method exists in the resource
        elif hasattr(self.resource, name):
            return getattr(self.resource, name)
        else:
            raise AttributeError("'Boto3Wrapper' object has no attribute '{}'".format(name))

    def set_client(self, service_name, region_name):
        """
        Set the client to a new service and region
        """
        self.client = boto3.client(service_name, region_name=region_name)

    def set_resource(self, service_name, region_name):
        """
        Set the resource to a new service and region
        """
        self.resource = boto3.resource(service_name, region_name=region_name)

    def get_client(self):
        """
        Get the current client
        """
        return self.client

    def get_resource(self):
        """
        Get the current resource
        """
        return self.resource

    def log_request(self, method, *args, **kwargs):
        """
        Log the request being made
        """
        print(f'Calling {method} with args: {args} and kwargs: {kwargs}')

    def cache_response(self, method, *args, **kwargs):
        """
        Cache the response for a given method call
        """
        # repr() keeps the key hashable even when an argument is a dict or list,
        # for example a DynamoDB Key
        key = (method, repr(args), repr(sorted(kwargs.items())))
        if key in self.response_cache:
            return self.response_cache[key]
        else:
            self.log_request(method, *args, **kwargs)
            response = getattr(self.client, method)(*args, **kwargs)
            self.response_cache[key] = response
            return response
As you can see, this example adds two methods, log_request and cache_response, that log the requests being made and cache their responses. You can add other functionality in the same way, such as validation or custom error handling.
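One way to apply logging and caching to every call, rather than calling helper methods by hand, is to do the work inside __getattr__. The sketch below is an illustration under stated assumptions: LoggingCachingProxy, make_cache_key, and FakeClient are hypothetical names, and the fake client stands in for a real Boto3 client so the example runs offline. It relies on the fact that Boto3 client methods take keyword arguments only.

```python
import functools
import json
import logging

logger = logging.getLogger("boto3_wrapper")


def make_cache_key(method, kwargs):
    """JSON with sorted keys gives a stable string even for nested dicts and lists."""
    return method, json.dumps(kwargs, sort_keys=True, default=str)


class LoggingCachingProxy:
    """Forward method calls to `target`, logging each call and caching chosen methods."""

    def __init__(self, target, cached_methods=()):
        self._target = target
        self._cached_methods = set(cached_methods)
        self._cache = {}

    def __getattr__(self, name):
        # Only reached for names that normal lookup did not find on the proxy.
        if name.startswith("_"):
            raise AttributeError(name)  # avoids recursion if _target is not set yet
        attr = getattr(self._target, name)
        if not callable(attr):
            return attr

        @functools.wraps(attr)
        def call(**kwargs):
            logger.info("Calling %s with kwargs: %s", name, kwargs)
            if name not in self._cached_methods:
                return attr(**kwargs)
            key = make_cache_key(name, kwargs)
            if key not in self._cache:
                self._cache[key] = attr(**kwargs)
            return self._cache[key]

        return call


class FakeClient:
    """Stand-in for a DynamoDB client that counts how often it is really called."""

    def __init__(self):
        self.calls = 0

    def get_item(self, **kwargs):
        self.calls += 1
        return {"Item": kwargs["Key"]}


fake = FakeClient()
proxy = LoggingCachingProxy(fake, cached_methods={"get_item"})
item_key = {"id": {"N": "1"}}
a = proxy.get_item(TableName="mytable", Key=item_key)
b = proxy.get_item(TableName="mytable", Key=item_key)  # answered from the cache
```

Caching only makes sense for read operations whose results may be slightly stale, which is why the sketch caches nothing unless a method is listed in cached_methods.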
Other custom methods
There are several other custom methods you can include in the wrapper class to further optimize your interactions with AWS services using Boto3. Some examples include:
- retry_on_failure: This method could be used to automatically retry a failed API call a certain number of times before giving up.
- handle_throttling: This method could be used to automatically handle throttling errors by implementing an exponential backoff strategy.
- handle_errors: This method could be used to handle specific error codes and exceptions, and return a custom response or raise a custom exception.
- credentials_manager: This method could be used to manage credentials automatically, for example by rotating credentials and handling temporary credentials.
- metrics_tracker: This method could be used to track metrics such as the number of API calls, success rate, and response time, and send them to a monitoring service such as Prometheus.
- batch_operation: This method could be used to perform batch operations for services that support them, such as DynamoDB, which can help save costs and improve performance.
- paginator: This method could be used to paginate over large sets of data automatically and process them in chunks.
- async_operation: This method could be used to perform async operations and improve performance by running multiple requests in parallel.
You can also include other custom methods that suit your specific needs. The idea is to add functionality that will help you to optimize the way you interact with AWS services and make your application more efficient.
Complete Class Code:
import asyncio
import functools
import time

import boto3
import botocore.exceptions


class Boto3Wrapper:
    def __init__(self, service_name, region_name):
        self.client = boto3.client(service_name, region_name=region_name)
        self.resource = boto3.resource(service_name, region_name=region_name)
        self.max_retries = 5
        self.retry_errors = [
            'Throttling',
            'ProvisionedThroughputExceededException'
        ]
        self.custom_errors = {
            'NoSuchKey': 'Key not found',
            'AccessDenied': 'Access Denied'
        }
        self.credentials = None
        self.metrics = {
            'api_calls': 0,
            'successes': 0,
            'success_rate': 0,
            'response_time': 0
        }

    def retry_on_failure(self, method, *args, **kwargs):
        """
        Automatically retry a failed API call a certain number of times before giving up.
        """
        retries = 0
        while True:
            try:
                return getattr(self.client, method)(*args, **kwargs)
            except Exception as e:
                retryable = any(error in str(e) for error in self.retry_errors)
                if not retryable or retries >= self.max_retries:
                    # Not a retryable error, or out of retries: let the caller see it
                    raise
                retries += 1
                time.sleep(2 ** retries)

    def handle_throttling(self, method, *args, **kwargs):
        """
        Automatically handle throttling errors by implementing an exponential backoff strategy.
        """
        delay = 1
        for attempt in range(self.max_retries + 1):
            try:
                return getattr(self.client, method)(*args, **kwargs)
            except botocore.exceptions.ClientError as e:
                throttled = e.response['Error']['Code'] in self.retry_errors
                if not throttled or attempt == self.max_retries:
                    raise
                time.sleep(delay)
                delay *= 2

    def handle_errors(self, method, *args, **kwargs):
        """
        Handle specific error codes and return a custom response or raise a custom exception.
        """
        try:
            return getattr(self.client, method)(*args, **kwargs)
        except botocore.exceptions.ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code in self.custom_errors:
                return self.custom_errors[error_code]
            raise

    def credentials_manager(self):
        """
        Manage the credentials automatically like rotating credentials and handling temporary credentials.
        """
        if not self.credentials:
            self.credentials = boto3.Session().get_credentials()
        if self.credentials is None:
            raise ValueError("No AWS credentials found")
        if self.credentials.method == 'iam-role':
            # Reading refreshable credentials makes botocore renew them when they are close to expiry
            self.credentials.get_frozen_credentials()
        elif self.credentials.method == 'env':
            pass
        else:
            raise ValueError("Invalid credentials method")

    def metrics_tracker(self, method, *args, **kwargs):
        """
        Track various metrics like number of API calls, success rate, and response time.
        """
        start_time = time.time()
        try:
            response = getattr(self.client, method)(*args, **kwargs)
            self.metrics['successes'] += 1
            return response
        finally:
            # Count the call and its duration whether it succeeded or failed
            self.metrics['api_calls'] += 1
            self.metrics['response_time'] += time.time() - start_time

    def send_metrics(self):
        """
        Send metrics to monitoring service like Prometheus.
        """
        if self.metrics['api_calls']:
            self.metrics['success_rate'] = self.metrics['successes'] / self.metrics['api_calls']
        # Code to send metrics to Prometheus

    def batch_operation(self, method, **kwargs):
        """
        Perform batch operations for services that support it like DynamoDB.
        """
        try:
            return getattr(self.client, method)(**kwargs)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                # Placeholder: divide the requests into smaller chunks and perform
                # the operation. Until that is implemented, re-raise the error so
                # it is not silently swallowed.
                raise
            raise

    def paginator(self, method, **kwargs):
        """
        Paginate over large sets of data automatically and process it in chunks.
        """
        paginator = self.client.get_paginator(method)
        for page in paginator.paginate(**kwargs):
            yield page

    async def async_operation(self, method, *requests, **kwargs):
        """
        Perform async operations and improve performance by running multiple requests in parallel.

        Each positional argument is a dict of keyword arguments for one call.
        Plain keyword arguments are treated as a single request.
        Returns a list of responses in request order.
        """
        loop = asyncio.get_running_loop()
        func = getattr(self.client, method)
        calls = list(requests) or [kwargs]
        # run_in_executor passes positional arguments only, so bind kwargs with partial
        tasks = [
            loop.run_in_executor(None, functools.partial(func, **params))
            for params in calls
        ]
        return await asyncio.gather(*tasks)

    def assume_role(self, role_arn, role_session_name):
        """
        Assume a role and return the temporary credentials.
        """
        try:
            session = boto3.Session()
            response = session.client('sts').assume_role(
                RoleArn=role_arn,
                RoleSessionName=role_session_name
            )
            return response['Credentials']
        except botocore.exceptions.NoCredentialsError:
            # No base credentials were available to sign the STS request
            raise
Use each of the custom methods
retry_on_failure:
wrapper = Boto3Wrapper("s3", "us-west-2")
response = wrapper.retry_on_failure("list_objects", Bucket="mybucket")
In this example, the retry_on_failure method is used to automatically retry the list_objects operation on the mybucket S3 bucket if it fails with a throttling error or a ProvisionedThroughputExceededException error.
handle_throttling:
wrapper = Boto3Wrapper("dynamodb", "us-west-2")
response = wrapper.handle_throttling("get_item", TableName="mytable", Key={'id': {'N': '1'}})
In this example, the handle_throttling method is used to automatically handle throttling errors by implementing an exponential backoff strategy for the get_item operation on the mytable DynamoDB table.
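Plain exponential backoff can make many throttled callers retry at the same moments. A common refinement is to cap the delay and pick a random wait below the current bound (jitter). The sketch below shows that idea independently of Boto3; backoff_delays, call_with_backoff, and the flaky stand-in function are hypothetical names used only for illustration.

```python
import random
import time


def backoff_delays(max_retries, base=1.0, cap=30.0):
    """Upper bound of each wait: base, 2*base, 4*base, ... never more than `cap`."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]


def call_with_backoff(func, is_retryable, max_retries=5, sleep=time.sleep):
    """Call func(); on a retryable error wait a random time up to the bound, then retry."""
    for bound in backoff_delays(max_retries):
        try:
            return func()
        except Exception as exc:
            if not is_retryable(exc):
                raise
            sleep(random.uniform(0, bound))
    # Final attempt: any error raised here reaches the caller.
    return func()


# Stand-in for an API call that is throttled twice before succeeding.
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("Throttling")
    return "ok"


waits = []  # collect the waits instead of really sleeping
result = call_with_backoff(flaky, lambda exc: "Throttling" in str(exc), sleep=waits.append)
```

With a real client, func could be a functools.partial around the client method, and is_retryable could inspect the error code of a botocore ClientError.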
handle_errors:
wrapper = Boto3Wrapper("s3", "us-west-2")
try:
    response = wrapper.handle_errors("get_object", Bucket="mybucket", Key="mykey")
except Exception as e:
    print(e)
In this example, the handle_errors method is used to handle specific error codes for the get_object operation on the mybucket S3 bucket. If the error code is ‘NoSuchKey’ or ‘AccessDenied’, it returns a custom error message instead of raising the exception. Any other error is re-raised.
credentials_manager:
wrapper = Boto3Wrapper("s3", "us-west-2")
wrapper.credentials_manager()
response = wrapper.client.list_buckets()
In this example, the credentials_manager method is used to manage credentials automatically. It checks how the credentials were obtained: if the method is ‘iam-role’, it refreshes the credentials when they have expired; if it is ‘env’, it does nothing; for any other method it raises an error.
metrics_tracker:
wrapper = Boto3Wrapper("s3", "us-west-2")
response = wrapper.metrics_tracker("list_objects", Bucket="mybucket")
wrapper.send_metrics()
In this example, the metrics_tracker method is used to track metrics such as the number of API calls, success rate, and response time for the list_objects operation on the mybucket S3 bucket. After that, the send_metrics method is used to send the metrics to a monitoring service such as Prometheus.
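The bookkeeping behind these metrics can be kept separate from the AWS call itself. The following is a minimal sketch, assuming a hypothetical CallMetrics helper that times any block of code with a context manager and derives the success rate only when a summary is requested.

```python
import time
from contextlib import contextmanager


class CallMetrics:
    """Count calls, successes, and total elapsed time for any block of code."""

    def __init__(self):
        self.calls = 0
        self.successes = 0
        self.total_seconds = 0.0

    @contextmanager
    def track(self):
        start = time.perf_counter()
        try:
            yield
            self.successes += 1  # skipped when the block raises
        finally:
            self.calls += 1
            self.total_seconds += time.perf_counter() - start

    def summary(self):
        if self.calls == 0:
            return {"api_calls": 0, "success_rate": 0.0, "avg_response_time": 0.0}
        return {
            "api_calls": self.calls,
            "success_rate": self.successes / self.calls,
            "avg_response_time": self.total_seconds / self.calls,
        }


metrics = CallMetrics()

with metrics.track():
    pass  # a successful call would go here

try:
    with metrics.track():
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

summary = metrics.summary()
```

Keeping raw counters and computing the rate on demand means the summary can be requested any number of times without distorting the numbers.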
batch_operation:
wrapper = Boto3Wrapper("dynamodb", "us-west-2")
response = wrapper.batch_operation("batch_write_item", RequestItems={"mytable": [{"PutRequest": {"Item": {"id": {"N": "1"}}}}]})
In this example, the batch_operation method is used to perform a batch operation, batch_write_item, on the mytable DynamoDB table.
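DynamoDB's batch_write_item accepts at most 25 put or delete requests per call, so larger writes have to be split into chunks first. The helper below sketches that step only; chunked and build_batches are hypothetical names, and the code builds request payloads without calling AWS.

```python
def chunked(items, size=25):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_batches(table_name, put_items, size=25):
    """Turn a list of DynamoDB items into RequestItems payloads for batch_write_item."""
    return [
        {table_name: [{"PutRequest": {"Item": item}} for item in chunk]}
        for chunk in chunked(put_items, size)
    ]


# 60 items become three payloads of 25, 25, and 10 requests.
items = [{"id": {"N": str(i)}} for i in range(60)]
batches = build_batches("mytable", items)
```

Each payload can then be passed as RequestItems. A complete implementation would also resend anything returned under UnprocessedItems in the response, which this sketch leaves out.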
paginator:
wrapper = Boto3Wrapper("s3", "us-west-2")
for page in wrapper.paginator("list_objects_v2", Bucket="mybucket"):
    print(page)
In this example, the paginator method is used to paginate over large sets of data automatically and process it in chunks for the list_objects_v2 operation on the mybucket S3 bucket.
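Each page is a full response dictionary, so callers usually want to flatten the pages into individual items. For list_objects_v2 the objects sit under the Contents key, which is absent when a page has no objects. The sketch below uses hand-written pages in place of real responses; iter_keys is a hypothetical helper name.

```python
def iter_keys(pages):
    """Yield every object key from an iterable of list_objects_v2 pages."""
    for page in pages:
        # "Contents" is missing from pages that contain no objects
        for obj in page.get("Contents", []):
            yield obj["Key"]


# Stand-in pages shaped like list_objects_v2 responses (only the fields used here).
pages = [
    {"Contents": [{"Key": "a.txt"}, {"Key": "b.txt"}], "IsTruncated": True},
    {"Contents": [{"Key": "c.txt"}], "IsTruncated": False},
    {"IsTruncated": False},
]

keys = list(iter_keys(pages))
```

Because iter_keys is a generator, it can consume the wrapper's paginator lazily without holding every page in memory.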
async_operation:
import asyncio
wrapper = Boto3Wrapper("s3", "us-west-2")
response = asyncio.run(wrapper.async_operation("list_objects", Bucket="mybucket"))
In this example, the async_operation method is used to run the list_objects operation on the mybucket S3 bucket without blocking the event loop, which allows multiple requests to run in parallel. asyncio.run starts an event loop, waits for the async call to complete, and returns its result.
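Boto3 calls are blocking, so running them in parallel from asyncio means handing them to worker threads. The sketch below shows the pattern on its own, with a stand-in function in place of a client method; run_parallel and fake_list_objects are hypothetical names.

```python
import asyncio
import functools
import time


async def run_parallel(func, requests):
    """Run func(**params) for every params dict in worker threads and gather the results."""
    loop = asyncio.get_running_loop()
    tasks = [
        # run_in_executor takes positional arguments only, so bind kwargs with partial
        loop.run_in_executor(None, functools.partial(func, **params))
        for params in requests
    ]
    return await asyncio.gather(*tasks)  # results come back in request order


def fake_list_objects(Bucket):
    """Stand-in for a blocking client call."""
    time.sleep(0.05)
    return {"Name": Bucket}


results = asyncio.run(
    run_parallel(fake_list_objects, [{"Bucket": "bucket-a"}, {"Bucket": "bucket-b"}])
)
```

Sharing one client between threads this way is generally considered safe for Boto3 clients, while resource objects are not meant to be shared across threads.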
assume_role:
credentials = wrapper.assume_role(role_arn, role_session_name)
wrapper.client = boto3.client(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken']
)
response = wrapper.client.list_buckets()
The assume_role method takes two parameters, role_arn and role_session_name. It creates a new session with boto3.Session() and uses it to call the assume_role method of the STS client, passing role_arn and role_session_name as arguments. The call returns temporary credentials, which can be used to make further boto3 calls with the assumed role.
You can also use the credentials to create a new session and use it to create the client and resource object.
session = boto3.Session(aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'])
wrapper.client = session.client('s3')
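The credentials returned by assume_role are temporary and include an Expiration timestamp, so a long-running program needs to notice when they are about to lapse and assume the role again. A minimal sketch of that check follows; needs_refresh and the five-minute safety margin are assumptions for illustration, and the sample dictionaries contain only the field the check reads.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(credentials, margin=timedelta(minutes=5), now=None):
    """Return True when the credentials expire within `margin` of the current time."""
    current = now if now is not None else datetime.now(timezone.utc)
    return credentials["Expiration"] - current <= margin


# Fixed reference time so the example is deterministic.
reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = {"Expiration": reference + timedelta(minutes=30)}
stale = {"Expiration": reference + timedelta(minutes=2)}

fresh_result = needs_refresh(fresh, now=reference)
stale_result = needs_refresh(stale, now=reference)
```

When the check returns True, calling assume_role again and rebuilding the client from the new credentials keeps requests from failing mid-run.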
Please note that the examples above are for demonstration purposes only. You will need to adjust the parameters to fit your use case.
Conclusion:
Wrapping Boto3 in a custom class can improve the performance of your application by reusing the same client and resource object across multiple method calls. Additionally, you can add custom functionality to the wrapper class to further optimize your interactions with AWS services. Whether you’re new to AWS or an experienced user, wrapping Boto3 in a custom class is a powerful way to maximize your use of Boto3 and take your skills to the next level.