from __future__ import unicode_literals, print_function
import os
import json
import boto3
import botocore
from boto3.session import Session
from libraries.general_tools.file_utils import get_mime_type
[docs]class S3Handler(object):
def __init__(self, bucket_name=None, aws_access_key_id=None, aws_secret_access_key=None,
aws_region_name='us-west-2'):
self.bucket_name = bucket_name
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.aws_region_name = aws_region_name
self.bucket = None
self.client = None
self.resource = None
self.setup_resources()
[docs] def setup_resources(self):
if self.aws_access_key_id and self.aws_secret_access_key:
session = Session(aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region_name)
self.resource = session.resource('s3')
self.client = session.client('s3')
else:
self.resource = boto3.resource('s3',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region_name)
self.client = boto3.client('s3',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region_name)
self.bucket_name = self.bucket_name
self.bucket = None
if self.bucket_name:
self.bucket = self.resource.Bucket(self.bucket_name)
[docs] def download_file(self, key, local_file):
"""
Download file from S3 bucket. Similar to s3.download_file except that does
not play nicely with moto, this however, does.
:param string key: object to download
:param string local_file: file to download to
"""
body = self.resource.Object(bucket_name=self.bucket_name, key=key).get()['Body']
with open(local_file, 'wb') as f:
for chunk in iter(lambda: body.read(1024), b''):
f.write(chunk)
# Downloads all the files in S3 that have a prefix of `key_prefix` from `bucket` to the `local` directory
[docs] def download_dir(self, key_prefix, local):
paginator = self.client.get_paginator('list_objects')
for result in paginator.paginate(Bucket=self.bucket_name, Delimiter='/', Prefix=key_prefix):
if result.get('CommonPrefixes') is not None:
for subdir in result.get('CommonPrefixes'):
self.download_dir(subdir.get('Prefix'), local)
if result.get('Contents') is not None:
for file in result.get('Contents'):
local_file = os.path.join(local, file.get('Key'))
if local_file.endswith('/'):
pass
else:
if not os.path.exists(os.path.dirname(local_file)):
os.makedirs(os.path.dirname(local_file))
self.download_file(file.get('Key'), local_file)
[docs] def key_exists(self, key, bucket_name=None):
if not bucket_name:
bucket = self.bucket
else:
bucket = self.resource.Bucket(bucket_name)
try:
bucket.Object(key=key).load()
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
exists = False
else:
raise
else:
exists = True
return exists
[docs] def key_modified_time(self, key, bucket_name=None):
"""
get last modified time for key
:param key:
:param bucket_name:
:return:
"""
if not bucket_name:
bucket = self.bucket
else:
bucket = self.resource.Bucket(bucket_name)
try:
s3_object = bucket.Object(key=key)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
return None
else:
raise
return s3_object.last_modified
[docs] def copy(self, from_key, from_bucket=None, to_key=None, catch_exception=True):
if not to_key:
to_key = from_key
if not from_bucket:
from_bucket = self.bucket_name
if catch_exception:
try:
return self.resource.Object(bucket_name=self.bucket_name, key=to_key).copy_from(
CopySource='{0}/{1}'.format(from_bucket, from_key))
except:
return False
else:
return self.resource.Object(bucket_name=self.bucket_name, key=to_key).copy_from(
CopySource='{0}/{1}'.format(from_bucket, from_key))
[docs] def replace(self, key, catch_exception=True):
if catch_exception:
try:
return self.resource.Object(bucket_name=self.bucket_name, key=key).copy_from(
CopySource='{0}/{1}'.format(self.bucket_name, key), MetadataDirective='REPLACE')
except:
return False
else:
return self.resource.Object(bucket_name=self.bucket_name, key=key).copy_from(
CopySource='{0}/{1}'.format(self.bucket_name, key), MetadataDirective='REPLACE')
[docs] def upload_file(self, path, key, cache_time=600, content_type=None):
"""
Upload file to S3 storage. Similar to the s3.upload_file, however, that
does not work nicely with moto, whereas this function does.
:param string path: file to upload
:param string key: name of the object in the bucket
"""
with open(path, 'rb') as f:
binary = f.read()
if content_type is None:
content_type = get_mime_type(path)
self.bucket.put_object(
Key=key,
Body=binary,
ContentType=content_type,
CacheControl='max-age={0}'.format(cache_time)
)
[docs] def get_object(self, key):
return self.resource.Object(bucket_name=self.bucket_name, key=key)
[docs] def redirect(self, key, location):
self.bucket.put_object(Key=key, WebsiteRedirectLocation=location, CacheControl='max-age=0')
[docs] def get_file_contents(self, key, catch_exception=True):
if catch_exception:
try:
return self.get_object(key).get()['Body'].read()
except:
return None
else:
return self.get_object(key).get()['Body'].read()
[docs] def get_json(self, key, catch_exception = True):
if catch_exception:
try:
return json.loads(self.get_file_contents(key))
except:
return {}
else:
return json.loads(self.get_file_contents(key, catch_exception))
[docs] def get_objects(self, prefix=None, suffix=None):
filtered = []
objects = self.bucket.objects.filter(Prefix=prefix)
if objects:
if suffix:
for obj in objects:
if obj.key.endswith(suffix):
filtered.append(obj)
else:
filtered = objects
return filtered
[docs] def put_contents(self, key, body, catch_exception=True):
if catch_exception:
try:
return self.get_object(key).put(Body=body)
except:
return None
else:
return self.get_object(key).put(Body=body)
[docs] def delete_file(self, key, catch_exception=True):
if catch_exception:
try:
return self.resource.Object(bucket_name=self.bucket_name, key=key).delete()
except:
return False
else:
return self.resource.Object(bucket_name=self.bucket_name, key=key).delete()
[docs] def create_bucket(self, bucket_name=None, catch_exception=True):
if not bucket_name:
bucket_name = self.bucket_name
if catch_exception:
try:
return self.resource.create_bucket(Bucket=bucket_name)
except:
return None
else:
return self.resource.create_bucket(Bucket=bucket_name)