强大的 RxJava 也要进行错误处理

onNext, onComplete, onError,onErrorObservable 序列传递过程中出现异常时被调用,然后终止Observable 序列的传递,以此通知所有的观察者发生了一个不可恢复的错误。但是有好多时候需要我们手动去终止序列,例如当序列传递时不满足某个条件了就不在进行后续的操作,这可能并不是一个异常,需要我们手动抛出。

1
2
3
4
Observable.just(1, 2, 3, 4, null, 6)
.doOnNext(integer1 -> System.out.println(1.0 / integer1))
.subscribe(integer -> System.out.println("integer = " + integer),
throwable -> System.out.println("throwable = " + throwable));

输出

1
2
3
4
5
6
7
8
9
1.0
integer = 1
0.5
integer = 2
0.3333333333333333
integer = 3
0.25
integer = 4
throwable = java.lang.NullPointerException

正常情况下,Observable 序列在传递过程中出现错误会调用 onError 终止序列。 看下面的情况:

1
2
3
4
5
6
7
8
9
10
Observable.just("file1", "file2","file3")
.doOnNext(fileName->{
try {
new FileInputStream(fileName);
} catch (FileNotFoundException e) {
// e.printStackTrace();
}
})
.subscribe(filename -> System.out.println("filename = " + filename),
throwable -> System.out.println("throwable = " + throwable));

输出

1
2
3
filename = file1
filename = file2
filename = file3

因为使用了 FileInputStream 所以必须进行 try ... catch,但是我想在出现 FileNotFoundException 的异常时回调给 ObservableonError 统一处理,这时候就需要将异常抛出。

1
2
3
4
5
6
7
8
9
10
Observable.just("file1", "file2","file3")
.doOnNext(fileName->{
try {
new FileInputStream(fileName);
} catch (FileNotFoundException e) {
throw Exceptions.propagate(new Throwable("文件找不到")); // 抛出异常来中断序列
}
})
.subscribe(filename -> System.out.println("filename = " + filename),
throwable -> System.out.println("throwable = " + throwable));

输出

1
throwable = java.lang.RuntimeException: java.lang.Throwable: 文件找不到

总结

通过 throw 关键字抛出异常

1
2
3
4
5
6
Observable.just("Hello!")
.map(input -> { throw new RuntimeException(); })
.subscribe(
System.out::println,
error -> System.out.println("Error!")
);

当在 FlatMap 中想终止序列,使用 Observable.error()

1
2
3
4
5
6
7
8
Observable.just("Hello!")
.flatMap(input -> {
try {
return Observable.just(transform(input));
} catch (Throwable t) {
return Observable.error(t);
}
})

高级异常处理:

onErrorReturn :当发生错误的时候,发射一个默认值然后结束数据流。所以 Subscriber 看不到异常信息,看到的是正常的数据流结束状态。
onErrorResumeNext :当错误发生的时候,使用另外一个数据流继续发射数据。在返回的 Observable 中是看不到错误信息的。
onExceptionResumeNext :onExceptionResumeNext 和 onErrorResumeNext 的区别是只捕获 Exception;
retry :如果发生了不定性的异常,则通常会重试一下看看是否正常了。 retry 的功能就算重新订阅到事件流,并重头重新开始发射数据。
retryWhen : retryWhen 的参数是一个函数, 该函数的输入参数为一个异常 Observable,返回值为另外一个 Observable。
using :用来管理资源的,如果一个 Observable 需要使用一个资源来发射数据(比如 需要使用一个文件资源,从文件中读取内容),当该 Observable 结束的时候(不管是正常结束还是异常结束)就释放该资源。这样你就不用自己管理资源了, 用 Rx 的方式来管理资源。

1
2
3
4
5
6
7
8
9
10
11
...
.flatMap(...)
.flatMap(...)
.onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
// Here simply return an Observable which will emit the Throwable of your liking
return Observable.error(new Throwable(...));
}
})
.flatMap(...);

参考链接

文章来自: https://hanks.pub