今天就跟大家聊聊有關(guān)RxJava簡單源碼的示例分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

創(chuàng)新互聯(lián)公司專注于盤州網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供盤州營銷型網(wǎng)站建設(shè),盤州網(wǎng)站制作、盤州網(wǎng)頁設(shè)計(jì)、盤州網(wǎng)站官網(wǎng)定制、小程序制作服務(wù),打造盤州網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供盤州網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
demo代碼如下
public class ObservableTest {
public static void main(String[] args) {
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> observer) throws Exception {
observer.onNext("處理的數(shù)字是:" + Math.random() * 100);
observer.onComplete();
}
});
observable.subscribe(new Consumer<Object>() {
@Override
public void accept(Object consumer) throws Exception {
System.out.println("我處理的元素是:" + consumer);
}
});
}
}先看第一行代碼
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> observer) throws Exception {
observer.onNext("處理的數(shù)字是:" + Math.random() * 100);
observer.onComplete();
}
});
//Observable.java
//第1560行
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins里有很多方法可以設(shè)置,
//有點(diǎn)類似于Spring的ApplicationListener,在對(duì)應(yīng)的生命周期中會(huì)被調(diào)用
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//RxJavaPlugins.java
//第1031行
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//如果設(shè)置了對(duì)應(yīng)的方法,就執(zhí)行,否則原樣返回
if (f != null) {
return apply(f, source);
}
return source;
}可以看到RxJavaPlugins中的方法如果不配置的方法,參數(shù)就會(huì)原樣返回,所以O(shè)bservable.create最終得到的就是ObservableCreate這個(gè)類。
再來看第二行代碼
observable.subscribe(new Consumer<Object>() {
@Override
public void accept(Object consumer) throws Exception {
System.out.println("我處理的元素是:" + consumer);
}
});
//Observable.java
//第10869行
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
//Observable.java
//第10958行
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//這里的onNext就是我們自己寫的Consumer類
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
//Observable.java
//第10974行
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//還記得我們的observable變量是什么類型么?ObservableCreate!
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//ObservableCreate.java
//第35行
protected void subscribeActual(Observer<? super T> observer) {
//這里的observer是LambdaObserver
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
//省略部分代碼
}
//LambdaObserver.java
//第47行
public void onSubscribe(Disposable s) {
//設(shè)置AtomicReference的值(LambdaObserver繼承了AtomicReference)
//如果之前已經(jīng)設(shè)置過了(AtomicReference的值不為空),則直接返回false
if (DisposableHelper.setOnce(this, s)) {
try {
//在new LambdaObserver()的時(shí)候我們?cè)O(shè)置了onSubscribe = Functions.emptyConsumer()
//所以這里什么都不做
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
onError(ex);
}
}
}
//ObservableCreate.java
//第35行
protected void subscribeActual(Observer<? super T> observer) {
//省略部分代碼
try {
//還記得source是啥么,就是你在創(chuàng)建Observable的時(shí)候new的ObservableOnSubscribe
//于是終于執(zhí)行到了我們編寫的代碼中
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//ObservableOnSubscribe.java
//第6行
public static void main(String[] args) {
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
//開始執(zhí)行這個(gè)方法
//observer是new CreateEmitter<T>(new LambdaObserver());
@Override
public void subscribe(ObservableEmitter<Object> observer) throws Exception {
observer.onNext("處理的數(shù)字是:" + Math.random() * 100);
observer.onComplete();
}
});
}
//ObservableCreate$CreateEmitter
//第61行
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//這里的observer就是LambdaObserver
//t就是《"處理的數(shù)字是:" + Math.random() * 100》這段字符串
observer.onNext(t);
}
}
//LambdaObserver.java
//第60行
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
//ObservableOnSubscribe.java
//第13行
public static void main(String[] args) {
//省略部分代碼
observable.subscribe(new Consumer<Object>() {
@Override
public void accept(Object consumer) throws Exception {
System.out.println("我處理的元素是:" + consumer);
}
});
}
//ObservableOnSubscribe.java
//第8行
public static void main(String[] args) {
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> observer) throws Exception {
//省略部分代碼
observer.onComplete();
}
});
//省略部分代碼
}
//ObservableCreate.java
//第95行
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//取消訂閱
dispose();
}
}
}
//LambdaObserver.java
//第86行
public void onComplete() {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
//new LambdaObserver的時(shí)候設(shè)置了為空,所以不執(zhí)行操作
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}至此,調(diào)用流程分析完成,可以看到雖然在main方法里我們只寫了幾行代碼,但是內(nèi)部調(diào)用的流程還是很繁雜的
看完上述內(nèi)容,你們對(duì)RxJava簡單源碼的示例分析有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
本文名稱:RxJava簡單源碼的示例分析
標(biāo)題網(wǎng)址:http://chinadenli.net/article20/ggpsjo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、、網(wǎng)站設(shè)計(jì)、響應(yīng)式網(wǎng)站、定制開發(fā)、動(dòng)態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)