Created
January 7, 2021 21:19
-
-
Save RyanSnodgrass/8418101443e258b82eab0a84b482b5b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, date
from itertools import count

import pandas as pd

from panopto_soap import (
    PanoptoSoapCaliper,
    PanoptoSoapFolders,
    PanoptoSoapSessions,
    PanoptoSoapUsers,
)
# Configure the root logger at INFO (numeric level 20), then raise the
# threshold of the 'snowflake' and 'zeep' loggers so only warnings/errors
# from them get through.
logging.basicConfig(level=logging.INFO)
logging.getLogger('snowflake').setLevel(logging.WARNING)
logging.getLogger('zeep').setLevel(logging.ERROR)

# Module-level logger, explicitly pinned to INFO.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class MultiThreadPanopto:
    """Experiments in paging through ``PanoptoSoapUsers.wrap_users_call``.

    Each method tries a different strategy (sequential, thread pool with a
    fixed page range, thread pool with early cancellation) for calling
    ``wrap_users_call(page)`` over a range of page numbers.  The timing and
    result-count figures in the docstrings are the original author's notes.
    """

    @staticmethod
    def _collect_results(futures):
        """Concatenate the results of all non-cancelled futures.

        Args:
            futures: iterable of ``concurrent.futures.Future`` objects (a
                dict keyed by future works, since iterating it yields keys).

        Returns:
            A single ``pandas.DataFrame`` holding every result, in completion
            order; an empty ``DataFrame`` when there is nothing to collect.

        Raises:
            Whatever exception a future's callable raised, re-raised by
            ``Future.result()``.
        """
        frames = []
        for fut in concurrent.futures.as_completed(futures):
            # result() on a cancelled future raises CancelledError.
            if fut.cancelled():
                continue
            result = fut.result()
            # NOTE(review): assumes each page result is a DataFrame or
            # something DataFrame() accepts (e.g. a list of records) --
            # confirm against wrap_users_call.
            if not isinstance(result, pd.DataFrame):
                result = pd.DataFrame(result)
            frames.append(result)
        # pd.concat raises ValueError on an empty list, and replaces
        # DataFrame.append, which was removed in pandas 2.0.
        return pd.concat(frames) if frames else pd.DataFrame()

    def cancel_further_threads(self, futures, number_of_finished_page):
        """Cancel every future whose page number is past the finished page.

        Args:
            futures: dict mapping each future to the page number it fetches.
            number_of_finished_page: futures mapped to a strictly greater
                page number have ``cancel()`` called on them.

        Returns:
            The same ``futures`` dict that was passed in.

        ``Future.cancel()`` has no effect on calls that are already running
        or finished, so only still-queued work is actually dropped.
        """
        for fut, page in futures.items():
            if page > number_of_finished_page:
                fut.cancel()
        return futures

    def call_for_users_concurrent(self):
        """Call ``wrap_users_call`` for pages 0-24 one after another.

        Despite the name this is the sequential baseline: no threads are
        used.  Author's timing notes: 189, 180, 213 seconds.

        Returns:
            The ``PanoptoSoapUsers`` instance the calls were made on.
        """
        panopto = PanoptoSoapUsers()
        # page_range = list(range(1, 120))
        for page in range(0, 25):
            panopto.wrap_users_call(page)
        return panopto

    def multithreading_goes_forever_and_never_stops(self):
        """Submit ``wrap_users_call`` for an unbounded stream of pages.

        WARNING: ``itertools.count()`` never ends, so this keeps submitting
        work and never returns.  Kept as a record of the approach that does
        not work; do not call it outside of experiments.
        """
        panopto = PanoptoSoapUsers()
        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            for page in count():
                executor.submit(panopto.wrap_users_call, page)

    def multi_thread_in_explicit_number_loop(self):
        """Run ``wrap_users_call`` for pages 0-24 on a 10-worker pool.

        The futures are not inspected, so return values and any exception
        raised inside a worker are discarded.  Author's notes: 24 seconds;
        21246 and 23246 results on two runs.

        Returns:
            The ``PanoptoSoapUsers`` instance the calls were made on.
        """
        panopto = PanoptoSoapUsers()
        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            for page in range(0, 25):
                executor.submit(panopto.wrap_users_call, page)
        return panopto

    def multithread_explicit_number_loop_with_futures_result(self):
        """Fetch pages 0-24 on a 10-worker pool and combine the results.

        Author's notes: 23 seconds; 24246 results on both runs.

        Returns:
            A ``pandas.DataFrame`` with the results of every page.
        """
        panopto = PanoptoSoapUsers()
        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            futures = {
                executor.submit(panopto.wrap_users_call, page): page
                for page in range(0, 25)
            }
            return self._collect_results(futures)

    def multithread_cancel_any_further_threads_on_first_return(self):
        """Fetch pages 0-29, cancelling later pages once one call returns.

        After the first future completes, every future with a higher page
        number is cancelled (calls already running cannot be cancelled and
        still finish).

        Returns:
            A ``pandas.DataFrame`` with the results of every call that was
            not cancelled.
        """
        panopto = PanoptoSoapUsers()
        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            futures = {
                executor.submit(panopto.wrap_users_call, page): page
                for page in range(0, 30)
            }
            # Wait for the first thread to complete.
            done, _not_done = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED
            )
            print('first completed came back')
            for fut in done:
                number_of_finished_page = futures[fut]
                print(f'page number of first return: {number_of_finished_page}')
                print('cancelling further threads')
                self.cancel_further_threads(futures, number_of_finished_page)
            return self._collect_results(futures)

    def wip_multithread_set_done_flag_when_api_has_no_more_results(self):
        """Fetch pages 0-29, cancelling later pages once a short page arrives.

        A result whose length differs from ``panopto.max_num`` is treated as
        the end of pagination, and all futures for higher page numbers are
        cancelled.  Each future is reported once, and the loop ends when no
        future is still pending.

        Returns:
            A ``pandas.DataFrame`` with the results of every call that was
            not cancelled.
        """
        panopto = PanoptoSoapUsers()
        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            futures = {
                executor.submit(panopto.wrap_users_call, page): page
                for page in range(0, 30)
            }
            # Only wait on futures not yet seen; waiting on the full mapping
            # every time would return immediately forever once one is done.
            pending = set(futures)
            while pending:
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for fut in done:
                    number_of_finished_page = futures[fut]
                    print(f'page number of return: {number_of_finished_page}')
                    if fut.cancelled():
                        continue
                    result_length = len(fut.result())
                    print(f'length of returned result: {result_length}')
                    # check if we've reached the end of pagination
                    if result_length != panopto.max_num:
                        # cancel any further threads
                        print('cancelling further threads')
                        self.cancel_further_threads(
                            futures, number_of_finished_page
                        )
            return self._collect_results(futures)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment