package com.github.sejoung.hystrix;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PubSub {

    public static void main(String[] args) throws InterruptedException {
        // Publisher  <- Observable
        // Subscriber <- Observer

        Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);
        ExecutorService es = Executors.newSingleThreadExecutor();

        Publisher<Integer> p = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                Iterator<Integer> it = iter.iterator();

                subscriber.onSubscribe(new Subscription() {
                    // Set once the stream has completed, failed, or been cancelled.
                    volatile boolean done = false;

                    @Override
                    public void request(long l) {
                        // Requests made after termination or cancellation are ignored.
                        if (done) {
                            return;
                        }
                        es.execute(() -> {
                            // A task queued before termination must not signal again;
                            // without this check onComplete could fire more than once.
                            if (done) {
                                return;
                            }
                            int i = 0;
                            try {
                                while (i++ < l) {
                                    if (it.hasNext()) {
                                        subscriber.onNext(it.next());
                                    } else {
                                        done = true;
                                        subscriber.onComplete();
                                        // Let awaitTermination in main return.
                                        es.shutdown();
                                        break;
                                    }
                                }
                            } catch (RuntimeException e) {
                                done = true;
                                subscriber.onError(e);
                                es.shutdown();
                            }
                        });
                    }

                    @Override
                    public void cancel() {
                        // Stop further emission; tasks already queued become no-ops.
                        done = true;
                        es.shutdown();
                    }
                });
            }
        };

        Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                this.subscription = subscription;
                // Initial demand: ask for the first two items.
                this.subscription.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(Thread.currentThread().getName() + " onNext " + integer);
                // Ask for one more item each time one arrives.
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        p.subscribe(s);
        // Returns once the publisher shuts the executor down on completion, error, or cancel.
        es.awaitTermination(10, TimeUnit.HOURS);
    }
}