import asyncio
import io
import multiprocessing as mp
import os

# For aiobotocore >= 1.0 get_session lives in aiobotocore.session; older
# releases exposed it as aiobotocore.get_session().
from aiobotocore.session import get_session
from PIL import Image

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
async def get_s3_obj(client, s3_bucket, s3_key):
    """Send the request and retrieve the object from S3."""
    resp = await client.get_object(
        Bucket=s3_bucket,
        Key=s3_key,
    )
    obj = await resp['Body'].read()
    return obj
async def download_img(client, s3_bucket, s3_key, save_path):
    """Convert the downloaded object to an image and save it to save_path."""
    obj = await get_s3_obj(client, s3_bucket, s3_key)
    img = Image.open(io.BytesIO(obj))
    img.save(save_path)
async def launch(download_list):
    """Download multiple files concurrently over a single S3 client."""
    # get_session() no longer takes a loop argument in aiobotocore >= 1.0.
    session = get_session()
    async with session.create_client(
        's3',
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
    ) as client:
        tasks = [download_img(client, *f) for f in download_list]
        await asyncio.gather(*tasks)
def single_process(download_list):
    """Work for a single process: run its chunk of downloads on its own event loop."""
    # asyncio.run() creates and closes a fresh event loop in each worker process.
    asyncio.run(launch(download_list))
if __name__ == "__main__":
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]

    # number of worker processes
    no_mp = 8  # or no_mp = mp.cpu_count()

    # number of chunks the download list is split into
    no_tasks = 16

    # Round-robin split: chunk i takes every no_tasks-th item starting at index i.
    download_list_chunk = [download_list[i::no_tasks] for i in range(no_tasks)]
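    # A minimal sketch of what the slicing above produces, using a hypothetical
    # 4-item list split into 2 chunks (not the values in this script):
    #   ["a", "b", "c", "d"][0::2] -> ["a", "c"]
    #   ["a", "b", "c", "d"][1::2] -> ["b", "d"]
    # With only 3 downloads and no_tasks = 16, most chunks are empty lists and
    # the processes that receive them return immediately.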
    # Pool of no_mp processes; each single_process call handles one chunk.
    with mp.Pool(no_mp) as p:
        p.map(single_process, download_list_chunk)
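
# Usage sketch (hypothetical file name; assumes aiobotocore and Pillow are
# installed and the AWS_* environment variables above are exported):
#   python s3_async_download.py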