Skip to content

Instantly share code, notes, and snippets.

@boly38
Last active November 27, 2024 11:21
Show Gist options
  • Save boly38/94b97f55f81ac6ee77204736f290f6ec to your computer and use it in GitHub Desktop.
Save boly38/94b97f55f81ac6ee77204736f290f6ec to your computer and use it in GitHub Desktop.
Given an executor, a set of tasks, and a time limit to process them all : -> what is the way to detect & interrupt task properly on timeout ? -> could you explain the diff between two tests regarding thread flag ^^ ?
package com.mycompany;
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@Slf4j
public class WaitForAllFuturesTest {
public enum WaitMode {WITH_SLEEP, WITH_PROCESSING}
public static class WaitFutureUtil {
public static void waitForAllFutures(List<Future<Void>> futures, long timeoutMs) {
long startTime = System.currentTimeMillis();
long endTime = startTime + TimeUnit.MILLISECONDS.toMillis(timeoutMs);
log.debug("waitForAllFutures");
try {
for (Future<Void> future : futures) {
try {
long remainingTime = endTime - System.currentTimeMillis();// wait until limit
if (remainingTime > 0) {
future.get(remainingTime, TimeUnit.MILLISECONDS);// may throw InterruptedException, ExecutionException, TimeoutException
}
} catch (ExecutionException e) {
System.out.println("Exception in future execution: " + e.getCause());
}
// InterruptedException|TimeoutException are caught globally (and not inside for loop) to cancel/interrupt rest of futures
}
} catch (InterruptedException | TimeoutException stopped) {
String cause = stopped instanceof InterruptedException ? "Interrupted" : "Timeout";
List<Future<Void>> notDoneNorCancelledFutures = futures.stream().filter(f -> !f.isDone() && !f.isCancelled()).toList();
log.info("{} while waiting for some future(s) to complete, so cancelling {} not done task(s)",
cause, notDoneNorCancelledFutures.size()
);
notDoneNorCancelledFutures.forEach(f -> f.cancel(true));
}
long totalTime = System.currentTimeMillis() - startTime;
log.debug("{} tasks processed in {} ms", futures.size(), totalTime);
}
}
private final ExecutorService executorService = newFixedThreadPool(2);
@Test
public void testWaitForTimeout_with_sleep() {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
Callable<Void> task = craftATask(i, WaitMode.WITH_SLEEP);
Future<Void> submittedTask = executorService.submit(task);
futures.add(submittedTask);
}
WaitFutureUtil.waitForAllFutures(futures, 6000);
}
@Test
public void testWaitForTimeout_with_processing() {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
Callable<Void> task = craftATask(i, WaitMode.WITH_PROCESSING);
Future<Void> submittedTask = executorService.submit(task);
futures.add(submittedTask);
}
WaitFutureUtil.waitForAllFutures(futures, 6000);
}
private @NonNull Callable<Void> craftATask(int i, WaitMode waitMode) {
final int taskId = i;
return () -> {
if (Thread.currentThread().isInterrupted()) {
logTask(taskId, "SKIPPED");
return null; // Void
}
long startTime = System.currentTimeMillis();
logTask(taskId, " started at " + startTime);
try {
waitChecked(taskId, waitMode, 5);// 5 sec with sleep mode, else less
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
long endTime = System.currentTimeMillis();
logTask(taskId, " finished at " + endTime);
return null; // Void
};
}
private static void logTask(int taskId, String comment) {
System.out.printf("Task [%d][%s] isInterrupted:%s isAlive:%s\n".formatted(
taskId, comment, Thread.currentThread().isInterrupted(), Thread.currentThread().isAlive()
));
System.out.flush();
}
public static void smallProcessingChecked() {
long startTime = System.currentTimeMillis();
@SuppressWarnings("unused")// used for processing time simulation purpose
double sum = 0.0;
for (long i = 0; i < 100000000; i++) {
sum += 10.0;
if (Thread.currentThread().isInterrupted()) {
long durationMs = System.currentTimeMillis() - startTime;
log.debug("smallProcessingChecked Exiting interrupted after {}ms", durationMs);
return;
}
}
long durationMs = System.currentTimeMillis() - startTime;
log.debug("smallProcessingChecked end after {}ms", durationMs);
}
private static void waitChecked(int taskId, WaitMode waitMode, int halfSecEstimation) throws InterruptedException {
for (int i = 0; i < (halfSecEstimation * 2); i++) {
if (Thread.currentThread().isInterrupted()) {
logTask(taskId, " INTERRUPTED (loop begin)");
throw new InterruptedException("PAF ! " + taskId);
}
logTask(taskId, " ...");
if (waitMode == WaitMode.WITH_SLEEP) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
logTask(taskId, " sleep INTERRUPTED !");
throw ie;
}
} else { // (waitMode == WaitMode.WITH_PROCESSING) {
smallProcessingChecked();// may take 500ms but more/less on next iteration or other host :)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment