Created
July 26, 2016 10:16
-
-
Save zoetrope/fad1071c471f8092d179a7b3628c2ef7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.Observer; | |
import rx.Producer; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
import java.util.ArrayList; | |
import java.util.List; | |
public class BackPressureSample { | |
public static void main(String[] args) throws Exception { | |
MyObservable<Integer> subject = MyObservable.create(); | |
MyWorker<Integer> sw = new MyWorker<>(); | |
subject | |
.observeOn(Schedulers.newThread(), 1) // ここでbufferSizeを渡しておかないと、デフォルトのrequest値が128になる。 | |
.filter(x -> x % 2 == 0) | |
.subscribe(sw); | |
subject.onNext(1); | |
Thread.sleep(500); | |
subject.onNext(2); | |
Thread.sleep(500); | |
subject.onNext(3); | |
Thread.sleep(500); | |
subject.onNext(4); | |
Thread.sleep(2000); | |
} | |
} | |
class MyProducer implements Producer { | |
private long req = 0; | |
long getRequest() { | |
System.out.println("MyProducer getRequest: " + req); | |
return req; | |
} | |
void decrement() { | |
req--; | |
System.out.println("MyProducer decrement: " + req); | |
} | |
public void request(long n) { | |
System.out.println("MyProducer request: " + n); | |
req = n; | |
} | |
} | |
class MyState<T> implements Observable.OnSubscribe<T> { | |
private final MyProducer producer; | |
MyState(MyProducer producer) { | |
this.producer = producer; | |
this.subscribers = new ArrayList<>(); | |
} | |
@Override | |
public void call(Subscriber<? super T> subscriber) { | |
System.out.println("MyState call"); | |
subscriber.setProducer(producer); | |
subscribers.add(subscriber); | |
} | |
private List<Subscriber<? super T>> subscribers; | |
void onNext(T x) { | |
if (producer.getRequest() > 0) { | |
producer.decrement(); | |
subscribers.forEach(subscriber -> subscriber.onNext(x)); | |
} else { | |
throw new RuntimeException("ばっくぷれっしゃ!"); | |
} | |
} | |
} | |
class MyObservable<T> extends Observable<T> implements Observer<T> { | |
static <T> MyObservable<T> create() { | |
MyProducer producer = new MyProducer(); | |
MyState<T> state = new MyState<>(producer); | |
return new MyObservable<>(state); | |
} | |
private MyState state; | |
private MyObservable(MyState state) { | |
super(state); | |
this.state = state; | |
} | |
public void onCompleted() { | |
System.out.println("MyObservable onCompleted"); | |
} | |
public void onError(Throwable e) { | |
System.out.println("MyObservable onError: " + e.getMessage()); | |
} | |
public void onNext(T x) { | |
System.out.println("MyObservable onNext: " + x); | |
this.state.onNext(x); | |
} | |
} | |
class MyWorker<T> extends Subscriber<T> { | |
@Override | |
public void onStart() { | |
System.out.println("MyWorker onStart"); | |
request(1); | |
} | |
public void onNext(T o) { | |
try { | |
Thread.sleep(100); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
System.out.println("MyWorker next: " + o); | |
// request(1); | |
} | |
public void onCompleted() { | |
System.out.println("MyWorker onCompleted"); | |
} | |
public void onError(Throwable e) { | |
System.out.println("MyWorker error: " + e.getMessage()); | |
e.printStackTrace(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment