Created
April 9, 2025 16:08
-
-
Save dhsrocha/4473a7328953fd18a1fc8ba1f796f40e to your computer and use it in GitHub Desktop.
The Bus class is a thread-safe message bus that implements the Producer-Consumer pattern, allowing asynchronous message handling. It uses a BlockingQueue for queuing messages and an ExecutorService for processing them via an observer.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Objects; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.function.Consumer; | |
/** | |
* A thread-safe message bus that allows for asynchronous message handling. | |
* <p> | |
* This class utilizes the Producer-Consumer pattern, where the producer is responsible for sending | |
* messages to the bus and the consumer (the observer) processes those messages asynchronously. | |
* | |
* @param <T> the type of messages that this bus can handle. | |
* @author <a href="mailto:[email protected]">Diego Rocha</a> | |
*/ | |
final class Bus<T> implements Consumer<T> { | |
private final AtomicBoolean active = new AtomicBoolean(true); | |
private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(); | |
private final ExecutorService exec; | |
private final Observer<T> observer; | |
/** | |
* Constructs a new Bus instance with the specified executor service and observer. | |
* | |
* @param exec the executor service used for dispatching messages. | |
* @param observer the observer that handles the messages. | |
* @throws NullPointerException if exec or observer is null. | |
*/ | |
Bus(final ExecutorService exec, final Observer<T> observer) { | |
this.exec = Objects.requireNonNull(exec); | |
this.observer = Objects.requireNonNull(observer); | |
exec.submit(this::dispatch); | |
} | |
/** | |
* Accepts a message and puts it into the queue for processing. | |
* | |
* @param obj the message to be accepted. | |
* @throws IllegalStateException if the bus is not active. | |
*/ | |
@Override | |
public void accept(final T obj) { | |
try { | |
checkIfUsable(); | |
this.queue.put(obj); | |
} catch (final InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
/** | |
* Continuously dispatches messages from the queue to the observer until the bus is inactive. | |
*/ | |
private void dispatch() { | |
while (active.get()) { | |
try { | |
this.observer.observe(this.queue.take()); | |
} catch (final Exception e) { | |
Thread.currentThread().interrupt(); | |
break; | |
} | |
} | |
this.exec.shutdown(); | |
this.active.set(false); | |
} | |
/** | |
* Checks if the bus is currently active and able to accept messages. | |
* | |
* @return true if the bus is active, false otherwise. | |
*/ | |
private boolean isActive() { | |
return this.active.get() && !(this.exec.isShutdown() && this.exec.isTerminated()); | |
} | |
/** | |
* Checks if the bus is usable and throws an exception if it is not. | |
* | |
* @throws IllegalStateException if the bus is not active. | |
*/ | |
private void checkIfUsable() { | |
if (!isActive()) { | |
throw new IllegalStateException("Executor is shut down."); | |
} | |
} | |
/** | |
* An interface for observing messages received by the Bus. | |
* | |
* @param <T> the type of messages that the observer can handle. | |
* @author <a href="mailto:[email protected]">Diego Rocha</a> | |
* @see Bus | |
*/ | |
interface Observer<T> { | |
/** | |
* Called when a new message is observed. | |
* | |
* @param t the message to observe. | |
*/ | |
void observe(final T t); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment