Skip to content

Instantly share code, notes, and snippets.

@markpollack
Created December 10, 2014 05:50
Show Gist options
  • Save markpollack/ce922dbedfab986b3067 to your computer and use it in GitHub Desktop.
Save markpollack/ce922dbedfab986b3067 to your computer and use it in GitHub Desktop.
POC on XD+RxJava Stream
/**
* Create a new Subject, if not already present in thread local storage.
* @param data
*/
void process2(String data) {
System.out.println(Thread.currentThread().getName() + "> Received data in handler " + data);
BehaviorSubject<Object> subject = createSubjectIfNeeded();
//wire up
Observable<Object> out = doWork(subject);
final Subscription subscription = out.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(Thread.currentThread().getName() + "> Going to send to message bus " + o);
}
});
//send in data
subject.onNext(data);
//TBD
//subject.onError();
subscription.unsubscribe();
}
/**
* Create a new observable per invocation
* @param data
*/
public void process4(final String data) {
System.out.println(Thread.currentThread().getName() + "> Creating new observable - " + data);
Observable<Object> objectObservable = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
System.out.println(Thread.currentThread().getName() + "> Going to call onNext " + data);
subscriber.onNext(data);
if (false == subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch (Throwable t) {
if (false == subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
}
});
Observable < Object > out = doWork(objectObservable);
final Subscription subscription = out.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(Thread.currentThread().getName() + "> Going to send to message bus " + o);
}
});
subscription.unsubscribe();
}
private BehaviorSubject<Object> createSubjectIfNeeded() {
if (this.currentSubject.get() == null) {
currentSubject.set(BehaviorSubject.create());
}
return this.currentSubject.get();
}
//Driver app
public static void main(String[] args) throws IOException {
IntegrationHandler handler = new IntegrationHandler();
ExecutorService executorService = Executors.newFixedThreadPool(14);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
handler.process2("mark-" + UUID.randomUUID());
}
});
}
System.in.read();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment