Skip to content

Instantly share code, notes, and snippets.

@markpollack
Created December 9, 2014 14:31
Show Gist options
  • Save markpollack/77783f95ee19082649f6 to your computer and use it in GitHub Desktop.
Save markpollack/77783f95ee19082649f6 to your computer and use it in GitHub Desktop.
POC on XD+Reactor stream processing
public interface ReactorModule {

    /**
     * Transforms the stream of inbound messages into the stream of outbound messages.
     *
     * <p>Inbound messages originate from the message bus or, when modules are composed,
     * from the preceding XD module. Outbound messages are sent to the message bus or,
     * when modules are composed, to the following module.
     *
     * @param inputStream the stream of inbound messages
     * @return the stream of outbound messages
     */
    Stream<Message> process(Stream<Message> inputStream);
}
/**
 * Message handler that bridges one-message-at-a-time delivery to a Reactor Stream.
 *
 * <p>Every message received by the handler is broadcast into a stream, that stream is
 * handed to a {@link ReactorModule} for processing, and whatever the module's returned
 * stream emits is sent to the output channel.
 */
public class ReactorMessageHandler extends AbstractMessageProducingHandler {

    private final HotStream<Message> stream;

    private final ReactorModule reactorModule;

    /**
     * Wires the given module between this handler's input stream and its output channel.
     *
     * @param reactorModule the user-defined stream processing; must not be null
     */
    public ReactorMessageHandler(ReactorModule reactorModule) {
        Assert.notNull(reactorModule, "ReactorModule cannot be null.");
        this.reactorModule = reactorModule;

        // Intended to dispatch synchronously (SynchronousDispatcher), because this
        // handler is invoked on threads managed by the message listener.
        this.stream = Streams.defer();

        // Let the user-defined module build its pipeline on top of the input stream.
        Stream<Message> processed = this.reactorModule.process(this.stream);

        // TBD: error handling, e.g. processed.when(Throwable.class)

        // Forward each processed message to the output channel. The channel is looked up
        // per message rather than once here, since it may not be set yet at construction.
        processed.consume(outbound -> getOutputChannel().send(outbound));
    }

    /**
     * Pushes the incoming message into the stream that the module consumes.
     *
     * @param message the message delivered to this handler
     */
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        this.stream.broadcastNext(message);
    }
}
/**
 * Sample {@link ReactorModule} that appends {@code " processed!"} to every payload.
 */
public class SimpleReactorModule implements ReactorModule {

    /**
     * Maps each inbound message to a newly built message whose payload is the original
     * payload followed by {@code " processed!"}.
     *
     * @param inputStream the stream of inbound messages
     * @return a stream of new messages carrying the suffixed payloads
     */
    @Override
    public Stream<Message> process(Stream<Message> inputStream) {
        return inputStream
                .map(SimpleReactorModule::suffixedPayload)
                .map(SimpleReactorModule::asMessage);
    }

    /** Concatenates the message payload with the fixed suffix. */
    private static String suffixedPayload(Message<?> inbound) {
        return inbound.getPayload() + " processed!";
    }

    /** Wraps the given text in a new message built with MessageBuilder. */
    private static Message<String> asMessage(String payload) {
        return MessageBuilder.withPayload(payload).build();
    }
}
/**
 * Sends a handful of messages through the handler and prints each message that arrives
 * on the output channel.
 *
 * <p>The application context is closed when the test finishes, and each receive is
 * bounded by a timeout so that a missing message fails the test instead of hanging it.
 */
@Test
public void simple() {
    final int messageCount = 5;
    // Upper bound on the wait for each processed message; without it a lost message
    // would block the test forever.
    final long receiveTimeoutMillis = 5000L;

    // A context that contains a PollableChannel of capacity 10.
    // try-with-resources closes the context even when the test fails part-way through.
    try (AbstractApplicationContext context = new ClassPathXmlApplicationContext(
            "/META-INF/spring/xd/reactor.xml", ReactorMessageHandlerTests.class)) {
        // NOTE(review): the *Original types are not defined next to this test; presumably
        // they correspond to ReactorModule / SimpleReactorModule / ReactorMessageHandler
        // - confirm before renaming.
        ReactorModuleOriginal reactorModule = new SimpleReactorModuleOriginal();
        ReactorMessageHandlerOriginal messageHandler = new ReactorMessageHandlerOriginal(reactorModule);
        PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class);
        messageHandler.setOutputChannel(outputChannel);

        for (int i = 0; i < messageCount; i++) {
            Message<String> msg = MessageBuilder.withPayload("hello" + i).build();
            messageHandler.handleMessage(msg);
        }

        for (int i = 0; i < messageCount; i++) {
            // receive(timeout) returns null when nothing arrives in time.
            Message<?> received = outputChannel.receive(receiveTimeoutMillis);
            if (received == null) {
                throw new AssertionError(
                        "Timed out waiting for output message " + (i + 1) + " of " + messageCount);
            }
            System.out.println("Output Channel " + received.getPayload());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment