Created December 9, 2014 14:31
-
-
Save markpollack/77783f95ee19082649f6 to your computer and use it in GitHub Desktop.
Proof of concept (POC): Spring XD + Reactor stream processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface ReactorModule { | |
/** | |
* A Stream of messages from the message bus or in the case of composed modules, from the | |
* previous XD module. | |
* @param inputStream Input stream of Messages | |
* @return Output stream of messages sent to the message bus or the next module in the case of | |
* composed modules. | |
*/ | |
Stream<Message> process(Stream<Message> inputStream); | |
} | |
/** | |
* A handler that adapts the item at a time delivery in a message handler and | |
* delegates processing to a Reactor Stream via the ReactorModule interface. | |
* The output of the stream is consumed and sent to the output channel. | |
*/ | |
public class ReactorMessageHandler extends AbstractMessageProducingHandler { | |
private final HotStream<Message> stream; | |
private final ReactorModule reactorModule; | |
public ReactorMessageHandler(ReactorModule reactorModule) { | |
Assert.notNull(reactorModule, "ReactorModule cannot be null."); | |
this.reactorModule = reactorModule; | |
//Stream with a SynchronousDispatcher as this handler is | |
//called by Message Listener managed threads | |
this.stream = Streams.defer(); | |
//user defined stream processing | |
Stream<Message> outputStream = reactorModule.process(stream); | |
//TBD | |
//outputStream.when(Throwable.class) | |
//send processed messages on output stream to output channel | |
outputStream.consume(msg -> getOutputChannel().send(msg)); | |
} | |
@Override | |
protected void handleMessageInternal(Message<?> message) throws Exception { | |
stream.broadcastNext(message); | |
} | |
} | |
public class SimpleReactorModule implements ReactorModule { | |
@Override | |
public Stream<Message> process(Stream<Message> inputStream) { | |
return inputStream | |
.map(s -> s.getPayload() + " processed!") | |
.map(s -> MessageBuilder.withPayload(s).build()); | |
} | |
} | |
@Test | |
public void simple() { | |
//A context that contains a PollableChannel of capacity 10. | |
AbstractApplicationContext context = new ClassPathXmlApplicationContext("/META-INF/spring/xd/reactor.xml", ReactorMessageHandlerTests.class); | |
ReactorModuleOriginal reactorModule = new SimpleReactorModuleOriginal(); | |
ReactorMessageHandlerOriginal messageHandler = new ReactorMessageHandlerOriginal(reactorModule); | |
PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class); | |
messageHandler.setOutputChannel(outputChannel); | |
for (int i = 0; i < 5; i++) { | |
Message<String> msg = MessageBuilder.withPayload("hello" + i).build(); | |
messageHandler.handleMessage(msg); | |
} | |
for (int i = 0; i < 5; i++) { | |
System.out.println("Output Channel " + outputChannel.receive().getPayload()); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment