Last active
November 27, 2024 11:21
-
-
Save boly38/94b97f55f81ac6ee77204736f290f6ec to your computer and use it in GitHub Desktop.
Given an executor, a set of tasks, and a time limit to process them all : -> what is the way to detect & interrupt task properly on timeout ? -> could you explain the diff between two tests regarding thread flag ^^ ?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany; | |
import static java.util.concurrent.Executors.newFixedThreadPool; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import lombok.NonNull; | |
import lombok.extern.slf4j.Slf4j; | |
import org.junit.jupiter.api.Test; | |
@Slf4j | |
public class WaitForAllFuturesTest { | |
public enum WaitMode {WITH_SLEEP, WITH_PROCESSING} | |
public static class WaitFutureUtil { | |
public static void waitForAllFutures(List<Future<Void>> futures, long timeoutMs) { | |
long startTime = System.currentTimeMillis(); | |
long endTime = startTime + TimeUnit.MILLISECONDS.toMillis(timeoutMs); | |
log.debug("waitForAllFutures"); | |
try { | |
for (Future<Void> future : futures) { | |
try { | |
long remainingTime = endTime - System.currentTimeMillis();// wait until limit | |
if (remainingTime > 0) { | |
future.get(remainingTime, TimeUnit.MILLISECONDS);// may throw InterruptedException, ExecutionException, TimeoutException | |
} | |
} catch (ExecutionException e) { | |
System.out.println("Exception in future execution: " + e.getCause()); | |
} | |
// InterruptedException|TimeoutException are caught globally (and not inside for loop) to cancel/interrupt rest of futures | |
} | |
} catch (InterruptedException | TimeoutException stopped) { | |
String cause = stopped instanceof InterruptedException ? "Interrupted" : "Timeout"; | |
List<Future<Void>> notDoneNorCancelledFutures = futures.stream().filter(f -> !f.isDone() && !f.isCancelled()).toList(); | |
log.info("{} while waiting for some future(s) to complete, so cancelling {} not done task(s)", | |
cause, notDoneNorCancelledFutures.size() | |
); | |
notDoneNorCancelledFutures.forEach(f -> f.cancel(true)); | |
} | |
long totalTime = System.currentTimeMillis() - startTime; | |
log.debug("{} tasks processed in {} ms", futures.size(), totalTime); | |
} | |
} | |
private final ExecutorService executorService = newFixedThreadPool(2); | |
@Test | |
public void testWaitForTimeout_with_sleep() { | |
List<Future<Void>> futures = new ArrayList<>(); | |
for (int i = 1; i <= 5; i++) { | |
Callable<Void> task = craftATask(i, WaitMode.WITH_SLEEP); | |
Future<Void> submittedTask = executorService.submit(task); | |
futures.add(submittedTask); | |
} | |
WaitFutureUtil.waitForAllFutures(futures, 6000); | |
} | |
@Test | |
public void testWaitForTimeout_with_processing() { | |
List<Future<Void>> futures = new ArrayList<>(); | |
for (int i = 1; i <= 5; i++) { | |
Callable<Void> task = craftATask(i, WaitMode.WITH_PROCESSING); | |
Future<Void> submittedTask = executorService.submit(task); | |
futures.add(submittedTask); | |
} | |
WaitFutureUtil.waitForAllFutures(futures, 6000); | |
} | |
private @NonNull Callable<Void> craftATask(int i, WaitMode waitMode) { | |
final int taskId = i; | |
return () -> { | |
if (Thread.currentThread().isInterrupted()) { | |
logTask(taskId, "SKIPPED"); | |
return null; // Void | |
} | |
long startTime = System.currentTimeMillis(); | |
logTask(taskId, " started at " + startTime); | |
try { | |
waitChecked(taskId, waitMode, 5);// 5 sec with sleep mode, else less | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
throw e; | |
} | |
long endTime = System.currentTimeMillis(); | |
logTask(taskId, " finished at " + endTime); | |
return null; // Void | |
}; | |
} | |
private static void logTask(int taskId, String comment) { | |
System.out.printf("Task [%d][%s] isInterrupted:%s isAlive:%s\n".formatted( | |
taskId, comment, Thread.currentThread().isInterrupted(), Thread.currentThread().isAlive() | |
)); | |
System.out.flush(); | |
} | |
public static void smallProcessingChecked() { | |
long startTime = System.currentTimeMillis(); | |
@SuppressWarnings("unused")// used for processing time simulation purpose | |
double sum = 0.0; | |
for (long i = 0; i < 100000000; i++) { | |
sum += 10.0; | |
if (Thread.currentThread().isInterrupted()) { | |
long durationMs = System.currentTimeMillis() - startTime; | |
log.debug("smallProcessingChecked Exiting interrupted after {}ms", durationMs); | |
return; | |
} | |
} | |
long durationMs = System.currentTimeMillis() - startTime; | |
log.debug("smallProcessingChecked end after {}ms", durationMs); | |
} | |
private static void waitChecked(int taskId, WaitMode waitMode, int halfSecEstimation) throws InterruptedException { | |
for (int i = 0; i < (halfSecEstimation * 2); i++) { | |
if (Thread.currentThread().isInterrupted()) { | |
logTask(taskId, " INTERRUPTED (loop begin)"); | |
throw new InterruptedException("PAF ! " + taskId); | |
} | |
logTask(taskId, " ..."); | |
if (waitMode == WaitMode.WITH_SLEEP) { | |
try { | |
Thread.sleep(500); | |
} catch (InterruptedException ie) { | |
logTask(taskId, " sleep INTERRUPTED !"); | |
throw ie; | |
} | |
} else { // (waitMode == WaitMode.WITH_PROCESSING) { | |
smallProcessingChecked();// may take 500ms but more/less on next iteration or other host :) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment