Skip to content

Instantly share code, notes, and snippets.

@dhsrocha
Created April 9, 2025 16:08
Show Gist options
  • Save dhsrocha/4473a7328953fd18a1fc8ba1f796f40e to your computer and use it in GitHub Desktop.
Save dhsrocha/4473a7328953fd18a1fc8ba1f796f40e to your computer and use it in GitHub Desktop.
The Bus class is a thread-safe message bus that implements the Producer-Consumer pattern, allowing asynchronous message handling. It uses a BlockingQueue for queuing messages and an ExecutorService for processing them via an observer.
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
 * A thread-safe message bus that allows for asynchronous message handling.
 * <p>
 * This class utilizes the Producer-Consumer pattern: producers hand messages to
 * {@link #accept(Object)}, which enqueues them, and a single dispatcher task (submitted to the
 * executor when the bus is constructed) takes them off the queue and delivers them to the
 * {@link Observer} one at a time.
 * <p>
 * The bus is fail-stop: when the dispatcher is interrupted, or the observer throws, dispatching
 * ends, the bus is marked inactive and the executor is shut down. Later calls to
 * {@link #accept(Object)} are rejected with an {@link IllegalStateException}. Because the bus shuts
 * the executor down, the executor should not be shared with unrelated work.
 *
 * @param <T> the type of messages that this bus can handle.
 * @author <a href="mailto:[email protected]">Diego Rocha</a>
 */
final class Bus<T> implements Consumer<T> {

  /** Cleared once by the dispatcher when it stops; never set back to {@code true}. */
  private final AtomicBoolean active = new AtomicBoolean(true);

  /** Unbounded hand-off queue between producers and the dispatcher. */
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  private final ExecutorService exec;
  private final Observer<T> observer;

  /**
   * What the observer threw when it stopped the dispatcher, or {@code null} if the dispatcher is
   * still running or stopped for another reason (e.g. interruption). Written before
   * {@link #active} is cleared so that producers seeing an inactive bus also see the cause.
   */
  private volatile Throwable failure;

  /**
   * Constructs a new Bus instance with the specified executor service and observer, and
   * immediately submits the dispatcher task to the executor.
   *
   * @param exec the executor service used for dispatching messages.
   * @param observer the observer that handles the messages.
   * @throws NullPointerException if exec or observer is null.
   */
  Bus(final ExecutorService exec, final Observer<T> observer) {
    this.exec = Objects.requireNonNull(exec, "exec");
    this.observer = Objects.requireNonNull(observer, "observer");
    // Must stay the last statement: the dispatcher reads the fields assigned above.
    this.exec.submit(this::dispatch);
  }

  /**
   * Accepts a message and puts it into the queue for processing.
   * <p>
   * This method never blocks and does not depend on the caller's interrupt status: the queue is
   * unbounded, so a non-blocking insert is used instead of an interruptible one that would
   * silently drop the message when the calling thread happens to be interrupted.
   *
   * @param obj the message to be accepted.
   * @throws IllegalStateException if the bus is not active.
   * @throws NullPointerException if the bus is active and obj is null.
   */
  @Override
  public void accept(final T obj) {
    checkIfUsable();
    this.queue.add(obj);
  }

  /**
   * Continuously dispatches messages from the queue to the observer until the bus is inactive,
   * the dispatching thread is interrupted, or the observer throws.
   * <p>
   * Whatever the reason for stopping, the bus is marked inactive and the executor is shut down,
   * so producers are rejected instead of queuing messages that nobody will ever deliver.
   */
  private void dispatch() {
    try {
      while (this.active.get()) {
        this.observer.observe(this.queue.take());
      }
    } catch (final InterruptedException e) {
      // Genuine interruption (e.g. shutdownNow): restore the flag for the executor's worker.
      Thread.currentThread().interrupt();
    } catch (final RuntimeException e) {
      // Fail-stop on observer failure; the cause is reported to producers by checkIfUsable().
      this.failure = e;
    } catch (final Error e) {
      // Record for producers, then let the error propagate to the task's Future.
      this.failure = e;
      throw e;
    } finally {
      // Flip the flag first so producers are rejected before the executor starts winding down.
      this.active.set(false);
      this.exec.shutdown();
    }
  }

  /**
   * Checks if the bus is currently active and able to accept messages.
   * <p>
   * An executor that was shut down but has not terminated yet still counts as usable. Per the
   * {@link ExecutorService} contract {@code isTerminated()} is only ever true after a shutdown
   * was requested, so no separate {@code isShutdown()} check is needed.
   *
   * @return true if the bus is active, false otherwise.
   */
  private boolean isActive() {
    return this.active.get() && !this.exec.isTerminated();
  }

  /**
   * Checks if the bus is usable and throws an exception if it is not.
   *
   * @throws IllegalStateException if the bus is not active; its cause is whatever the observer
   *     threw to stop the dispatcher, when that is the reason the bus stopped.
   */
  private void checkIfUsable() {
    if (!isActive()) {
      throw new IllegalStateException("Executor is shut down.", this.failure);
    }
  }

  /**
   * An interface for observing messages received by the Bus.
   *
   * @param <T> the type of messages that the observer can handle.
   * @author <a href="mailto:[email protected]">Diego Rocha</a>
   * @see Bus
   */
  @FunctionalInterface
  interface Observer<T> {
    /**
     * Called when a new message is observed.
     * <p>
     * Throwing from this method stops the bus that invoked it.
     *
     * @param t the message to observe.
     */
    void observe(T t);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment