Rx中的线程切换

初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程
首先需要搞懂在RxJava.subscribeOn()observeOn() 之间的区别:

  • .subscribeOn() 用来指定Observable应该操作的调度器(Scheduler)
  • .observeOn() 指定 Observable在一个指定的调度器(Scheduler)上给观察者发送通知
  • 默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

没理解?

英文原文: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.nn1m7lrb8

翻译: hanks
注: 不是完全翻译,添加了具体例子

例子

1.主线程 / .subscribe() 线程

在 Activity的 onCreate()(主线程) 方法中添加以下代码:

1
2
Observable.just(1,2,3)
.subscribe();

调用情况如下:

图片

实验:

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,3)
.doOnNext(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
}
})
.subscribe(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
}
});

输出结果:

1
2
3
4
5
6
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:3, run In :main

2. .subscribeOn()

即使你在主线程中添加下面的代码,但是整段代码将运行在 .subscribeOn()定义的线程上

1
2
3
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe();

图片

实验:

1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1,2,3)
.doOnNext(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
}
});

输出结果:

1
2
3
4
5
6
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:3, run In :RxNewThreadScheduler-1

3. .observeOn()

加入在主线程中添加下面的代码,首先 Observable 将在 .subscribe() 的线程上创建,但是 .observeOn()方法被调用之后,代码将运行在指定的线程上:

1
2
3
Observable.just(1,2,3)
.observeOn(Schedulers.newThread())
.subscribe();

图片

实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
new Thread() {
@Override public void run() {
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
.getName());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
.getName());
}
});

}
}.start();

输出结果:

1
2
3
4
5
6
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :Thread-155
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:3, run In :main

3. Combined logic

由于操作可以被组合使用,于是有了下面的代码:

1
2
3
4
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe();

图片

实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
new Thread() {
@Override public void run() {
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override public void call(Integer integer) {
Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
.getName());
}
});
}
}.start();

输出结果:

1
2
3
4
5
6
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:3, run In :main

Tips / Gotchas:

1. “UI线程运行异常”

1
2
3
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe(/** logic which touches ui **//); //在newThread中调用

obviously.

2. 逻辑处理放在后台(newThread)

错误姿势:

1
2
3
4
5
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(/** logic which doesn't touch ui **//)
.subscribe();

实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
new Thread() {
@Override public void run() {
Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<String>>() {
@Override public Observable<String> call(String str) {
Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
return Observable.from(str.split("-") ); // 返回平方
}
})
.subscribe(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
.getName());
}
});
}
}.start();

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

正确姿势:

1
2
3
4
5
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.flatMap(/** logic which doesn't touch ui **//)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

第二段代码中 flatMap (或者其他逻辑处理)将运行在后台线程, 如果是在Android中,这样做不会阻塞UI,阻塞UI的话有可能导致ANR之类的异常。这跟 AsyncTask中的 doInBackground()类似,在 doInBackground()中做耗时操作

实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
new Thread() {
@Override public void run() {
Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.newThread())
.flatMap(new Func1<String, Observable<String>>() {
@Override public Observable<String> call(String str) {
Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
return Observable.from(str.split("-") ); // 返回平方
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
.getName());
}
});
}
}.start();

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

3. 最早的 .subscribeOn() 生效

看下面的代码:

1
2
3
4
Observable.just(1,2,3)
.subscribeOn(thread1)
.subscribeOn(thread2)
.subscribe();

Observable 的创建和 .subscribeOn() 的调用都将在 thread1 上面执行,所以没有必要多次调用 .subscribeOn(),因为只有第一次的是有用的。

实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
new Thread() {
@Override public void run() {
Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
.getName());
}
});
}
}.start();

输出结果

1
2
3
4
5
6
7
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxNewThreadScheduler-1

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread() {
@Override public void run() {
Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.subscribe(new Action1<String>() {
@Override public void call(String str) {
Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
.getName());
}
});
}
}.start();

输出结果

1
2
3
4
5
6
12-06 16:52:13.378 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxCachedThreadScheduler-2

Android Rxjava Rxandroid

文章出处 (https://hanks.xyz)