Created
January 7, 2021 14:37
-
-
Save RyanSnodgrass/b36e7362393aace723e4c63e69ed0541 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
def cancel_further_threads(self, futures, number_of_finished_page): | |
for fut, page in futures.items(): | |
if page > number_of_finished_page: | |
fut.cancel() | |
return futures |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from unittest import mock | |
import concurrent.futures | |
from cancel_further_threads import cancel_further_threads | |
class TestCancelFurtherThreads(unittest.TestCase): | |
def test_cancelling_future_threads(self): | |
"""When a thread comes back with the finished condition(zero api results)- | |
then it is time to cancel any later threads currently running. | |
Loop through the dict of futures and set to cancel the threads later | |
than the finished thread. | |
The thread that returns with no results could be right next in line with | |
the others that have already finished with full results, or it could be | |
several page numbers down the line due to race conditions. For now just | |
find the rest of the numbers that the thread finished and cancel the | |
later page numbers.""" | |
sample_futures = { | |
concurrent.futures.Future(): page for page in range(0, 7) | |
} | |
for fut, page in sample_futures.items(): | |
if page in range(0, 2): | |
fut.set_result(pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})) | |
if page == 4: | |
fut.set_result(pd.DataFrame()) | |
number_of_finished_page = 4 | |
bygone_conclusion_futures = cancel_further_threads( | |
sample_futures, | |
number_of_finished_page | |
) | |
for fut, page in bygone_conclusion_futures.items(): | |
self.assertTrue(fut.done()) if page == 0 else None | |
self.assertTrue(fut.done()) if page == 1 else None | |
self.assertFalse(fut.done()) if page == 2 else None | |
self.assertFalse(fut.done()) if page == 3 else None | |
self.assertTrue(fut.done()) if page == 4 else None | |
self.assertTrue(fut.cancelled()) if page == 5 else None | |
self.assertTrue(fut.cancelled()) if page == 6 else None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The initial idea was that the multithreader would run infinitely until the first thread returned with a finished status, when that occurred it needed a way to cancel any later still running threads.