Skip to content

Instantly share code, notes, and snippets.

@markpollack
Last active August 29, 2015 14:12
Show Gist options
  • Save markpollack/586229a93b03f895954c to your computer and use it in GitHub Desktop.
Save markpollack/586229a93b03f895954c to your computer and use it in GitHub Desktop.
WIP RxJava
package spring.xd.rxjava;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.ResolvableType;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Message handler that feeds incoming Spring Integration messages into an RxJava
 * stream built by a user-supplied {@code Processor}.
 *
 * <p>One {@link BehaviorSubject} is kept per handling thread (keyed by the thread id).
 * The first time a thread delivers a message, the subject is created, handed to
 * {@code processor.process(...)} and the resulting stream is subscribed to. Depending
 * on the processor's declared input type, either the whole {@link Message} or just its
 * payload is pushed into the subject.
 *
 * <p>Work in progress: the processed output is currently only logged, subscriptions are
 * not tracked for clean shutdown, and entries in the per-thread map are never removed.
 *
 * @author mpollack (created 1/6/15)
 */
public class SubjectMessageHandler extends AbstractMessageProducingHandler {

    protected final Log logger = LogFactory.getLog(getClass());

    /** Subjects keyed by the id of the thread that delivers messages to them. */
    private final ConcurrentMap<Long, BehaviorSubject<Object>> subjectMap =
            new ConcurrentHashMap<Long, BehaviorSubject<Object>>();

    /** Guards the "wire the processor to a subject that has no observers yet" step. */
    private final Object monitor = new Object();

    @SuppressWarnings("rawtypes")
    private final Processor processor;

    /** Element type of the {@code Observable} parameter declared by the processor's {@code process} method. */
    private final ResolvableType inputType;

    /**
     * Creates a handler that delegates stream processing to the given processor.
     *
     * @param processor the stream processor; must not be {@code null} and must declare a
     *                  {@code process(Observable)} method
     * @throws IllegalArgumentException if {@code processor} is {@code null} or no
     *                                  {@code process(Observable)} method can be found on it
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public SubjectMessageHandler(Processor processor) {
        Assert.notNull(processor, "processor cannot be null.");
        this.processor = processor;
        Method method = ReflectionUtils.findMethod(this.processor.getClass(), "process", Observable.class);
        // findMethod returns null when nothing matches; fail here with a clear message
        // instead of passing null on to ResolvableType.
        Assert.notNull(method, "No process(Observable) method found on "
                + this.processor.getClass().getName());
        // Nesting level 2 of the Observable<X> parameter is its generic argument X.
        this.inputType = ResolvableType.forMethodParameter(method, 0).getNested(2);
    }

    /**
     * Pushes the message, or its payload, into the calling thread's subject.
     *
     * <p>If the processor's input type accepts the message object itself, the message is
     * emitted; otherwise, if it accepts the payload, the payload is emitted. When neither
     * matches, nothing is emitted and an error is logged.
     *
     * @param message the message to hand to the stream
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        Subject<Object, Object> subjectToUse = getSubject();
        Class<?> expectedType = this.inputType.getRawClass();
        Object payload = message.getPayload();
        if (ClassUtils.isAssignable(expectedType, message.getClass())) {
            subjectToUse.onNext(message);
        }
        else if (ClassUtils.isAssignable(expectedType, payload.getClass())) {
            //TODO handle type conversion of payload to input type if possible
            if (logger.isDebugEnabled()) {
                logger.debug(Thread.currentThread().getName()
                        + "> Inside handleMessageInternal, sending to subject data = " + payload);
            }
            subjectToUse.onNext(payload);
        }
        else {
            // Previously such messages were dropped without any trace.
            logger.error("Dropping message: neither message type " + message.getClass().getName()
                    + " nor payload type " + payload.getClass().getName()
                    + " is assignable to processor input type " + this.inputType);
        }
    }

    /**
     * Returns the subject associated with the calling thread, creating it and wiring it
     * to the processor on first use.
     *
     * @return the subject for the current thread; never {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Subject<Object, Object> getSubject() {
        final Long idToUse = Thread.currentThread().getId();
        BehaviorSubject<Object> subject = subjectMap.get(idToUse);
        if (subject != null) {
            return subject;
        }

        BehaviorSubject<Object> created = BehaviorSubject.create();
        BehaviorSubject<Object> existing = subjectMap.putIfAbsent(idToUse, created);
        // Use whichever instance actually ended up in the map, so the result is never null.
        subject = (existing != null) ? existing : created;

        synchronized (this.monitor) {
            if (!subject.hasObservers()) {
                //user defined stream processing
                Observable<?> outputStream = processor.process(subject);
                // NOTE(review): no onError callback is registered, so how stream errors
                // surface is left to RxJava's default handling - decide on an explicit
                // strategy before adding one here.
                //TBD keep track of subscriptions to unsubscribe cleanly...
                outputStream.subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        if (logger.isDebugEnabled()) {
                            logger.debug(Thread.currentThread().getName()
                                    + "> Going to send to message bus " + o);
                        }
                    }
                });
            }
        }
        return subject;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment