RxJava四要素
- 被观察者
- 观察者
- 订阅
- 事件
Demo
// 创建被观察者:create
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
// 通过just方法来创建被观察者
Observable observableJust = Observable.just("Hello", "World");
// 通过from方法来创建被观察者
String[] parameters = {"Hello", "World"};
Observable observableFrom = Observable.from(parameters);
// 创建观察者
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() { // 不再有新的事件由被观察者发出时调用
}
@Override
public void onError(Throwable e) { // 处理错误时调用
}
@Override
public void onNext(Object s) { // 被观察者发出新事件时调用
}
};
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onStart(String s) {
super.onStart();
}
}
// 订阅
public void doRxjava() {
observable.subscribe(observer);
observable.subscribe(subscriber);
}
三个关键对象和一个核心方法
- Observable(被观察者)
- OnSubscribe (相当于观察者模式中,被观察者通知观察者的notifyObserver()方法)
- Subscriber(观察者)
- subscribe(订阅方法)
源码分析之先从create方法讲起
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
}
hook是一个代理类,create方法利用hook生成一个被观察者。
RxJava如何创建Subscriber以及如何完成订阅
Subscriber 实现了Observer和Subscription接口
我们回顾一下这两个接口
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
public interface Subscription {
void unsubscribe(); // 解绑
boolean isUnsubscribed(); // 是否已经解绑
}
我们再回头看Subscriber的成员
public abstract class Subscriber<T> implements Observer<T>, Subscription {
// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions; // 所有观察者的订阅事件,并实现了Subscription接口(重要成员!!!)
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer producer;
/* protected by `this` */
private long requested = NOT_SET; // default to not set
......
}
当创建好了我们的subscriber后,我们就要调用subscribe方法将观察者与被观察者绑定。我们接着看subscribe方法是怎么实现的。
// 在Observable.java中
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
这个方法首先对subscriber和observable.onSubscribe进行非空判断。然后调用subscriber的onStart方法,默认的onStart方法是空,在使用时需要由我们自己来实现。
接着将传入的subscriber转化为SafeSubscriber,SafeSubscriber对我们的subscriber类进行了包装,包括onCompleted方法和onError方法。
接下来,在try中调用了call方法,将subscriber方法传进来。调用完call方法意味着完成了subscriber的订阅。
最后我们hook调用onSubscribeReturn方法将subscriber返回,必要注意subscriber方法必须实现Subscription接口,同时subscriber观察者都持有一个SubscriptionList类(该类存储了这个观察者订阅的所有被观察者列表)。
我们回到前面,再总结回顾一下rxjava最基本的使用流程:
首先我们创建被观察者,调用Observable的create方法。同时,我们在create方法的参数中会new一个Observable.OnSubscribe对象完成我们的被观察者创建。
接着我们会创建观察者Observer或者Subscriber。
最后,我们会调用observable的subscribe方法,传入观察者去完成订阅。
在我们调用subscribe方法后,默认会触发被观察者Observable中OnSubscribe对象的call方法,来完成回调,在call方法中可以执行观察者subscriber的onNext、onCompleted、onError方法。
操作符
变换:就是将时间序列中的对象或整个序列进行加工处理。
map:简单来说就是用来把一个事件转换为另一个事件的。
官方说法:map()函数接受一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。
一个例子
Observable.just("map/image/map.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) {
return getBitmap(filepath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
return showBitmap(bitmap);
}
})
})
map操作符源码学习
从我们上一个例子出发,我们先看just方法
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
//继续往下走
public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
//继续往下走
protected ScalarSynchronousObservable(final T t) {
// 在这里我们看到产生了一个被观察者,
// 然后通过call方法来通知观察者,
// 通过之前分析可知,只要再生成一个观察者,通过订阅方法,
// 然后观察者模式关系就形成了。我们后面主要看map的实现
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, t));
}
});
this.t = t;
}
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
我们可以看到再map方法中调用了lift方法,它是RxJava操作符的核心,其本质仍是一个操作符,作用是针对我们的事件序列处理再发送。
于是我们知道,需要着重了解下lift的源码
lift操作符源码学习
前面的map方法中,lift包含了一个OperatorMap类,我们先从这个类开始学起。
// OperatorMap实现了Operator接口,我们先看call方法
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t); // (非常重要!!!)
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result); // (非常重要!!!)
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaPluginUtils.handleException(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
我们可以看到,在call方法中,最重要的操作是调用了transformer的call方法,看一下Func1这个接口定义就知道call来自哪里了。它的返回值就是R,即需要返回的对象,完成了事件的转化。
public interface Func1<T, R> extends Function {
R call(T t);
}
result = mapper.call(t)完成了事件的转化,从T->R。
actual.onNext(result)将转化后的事件传给观察者进行后续操作。
o.add(parent)将新产生的MapSubscriber加入到o的观察者列表当中。
在OperatorMap的call方法中只返回了Subscriber,并没有完成订阅。订阅操作是在lift中进行的,我们继续看lift。
// lift方法实现
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
可以看出它创建了一个新的Observable被观察者,我们继续看OnSubscribeLift是什么东西。
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st); // 在这里完成了整个订阅,新生成的被观察者就是个传话人,在事件转换中间传递消息
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
flatMap操作符源码学习
一个例子
Observable.just("http://www.baidu,com",
"https://www.sina.com",
"https://www.sohu.com/")
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return createObservable(s);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
})
})
flaMap与map最大的区别就是:
- 会将一个事件对象转化为Observable
- 它不会直接发送这个Observable,而是将这个Observable激活让它自己开始发送事件。
- 每一个创建出来的Observable发送的事件,都被汇入同一个Observable
RxJava线程控制:多线程编程准则&RxJava如何处理多线程&Schedulers
Android多线程编程的原则
- 不要阻塞UI线程
- 不要在UI线程之外访问UI控件
RxJava线程控制
- Schedulers 线程控制符,用于线程切换
- (1)Schedulers.immediate() 在当前线程下运行,默认的
- (2)Schedulers.newThread() 总是启用一个新线程,并在新线程下工作
- (3)Schedulers.io() 用于进行io操作(网络收发数据或者读写文件),无数量上限的线程池,比newThread效率更高
- (4)Schedulers.computation() 用于计算密集型任务
- (5)AndroidSchedulers.mainThread() 在Android主线程中运行
RxJava如何进行线程控制
- subscribeOn() Observable.OnSubscribe激活时所在的线程,换句话收就是事件的发出所在的线程
- observeOn() 事件消费所在的线程
先说一个结论,后面分析源码:onserveOn支持多次调用,subscribeOn只能调用一次。
subscribeOn源码分析
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler)); // 重点在这里,create一个OperatorSubscribeOn对象
}
我们接着看OperatorSubscribeOn对象是什么。
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
两个成员变量,分别是传入的被观察者source和线程控制对象scheduler。对于实现了OnSubscribe的OperatorSubscribeOn类,我们需注重看一下call方法都写了什么。call方法的第一行遇到一个Worker类,我们先了解一下这个。
public abstract static class Worker implements Subscription {
// 调度器执行一个事件的抽象方法,由用户自己实现
public abstract Subscription schedule(Action0 action);
// 调度器延迟执行一个事件的抽象方法,由用户自己实现
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
long startInNanos = firstStartInNanos;
@Override
public void call() {
if (!mas.isUnsubscribed()) {
action.call();
long nextTick;
long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
nextTick = nowNanos + periodInNanos;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanos = nextTick - (periodInNanos * (++count));
} else {
nextTick = startInNanos + (++count * periodInNanos);
}
lastNowNanos = nowNanos;
long delay = nextTick - nowNanos;
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
// Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
mas.set(s);
s.set(schedule(recursiveAction, initialDelay, unit));
return mas;
}
public long now() {
return System.currentTimeMillis();
}
}
}
Worker类实现了Subscription接口,用于在单个线程或事件循环上执行动作的顺序调度器。schedulePeriodically()方法定期的执行可取消的事件,这个默认的实现是递归并等待调度事件的完成(代替可能在当前长时间执行的事件)。如果你要复写该方法,应该以一种更好的方式周期性的执行每个调度事件。
我们再回到OperatorSubscribeOn的call方法,当创建完一个inner的Worker类后,inner.schedule()方法中setProducer对线程进行判断。
source.unsafeSubscribe(s)这是最核心的一句,通过之前传入的Observable被观察者调用,表明没有取消订阅。我们看一下它的实现。
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber); // 表明subcribeOn方法已经完成了
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
说了一堆,是时候总结一下SubscribeOn方法了
- 会新生成一个Observable
- onSubscribe会在目标Subscriber订阅时使用传入的Scheduler的worker作为线程调度执行者
- 在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber
- 这个Subscriber又会通知到目标Subscriber,从而完成我们的subscribeOn的过程
observeOn源码分析
经过层层重载,我们找到它真正实现的地方。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
我们可以看到在这个函数尾部调用了一个lift方法,这个lift方法中传入了一个OperatorObserveOn对象,我们来看一下这个OperatorObserveOn类。
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
......
}
这个OperatorObserveOn类实现了Operator接口。同样我们要着重看一下call方法。
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
我们直接看else中的语句,新建了一个ObserveOnSubscriber对象,这个对象就是我们完成线程切换实际的地方。我们在这里并没有看到线程切换的操作,因该是在这个对象的next方法中进行的。
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
......
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
......
}
首先会将结果保存到offer队列当中,如果没有保存成功会调用onError方法,如果成功保存则会进行线程调度schedule()。我们看一下schedule()的实现。
// OperatorObserveOn.java
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this); // 点进去看一下实现
}
}
// Scheduler.java
public abstract class Scheduler {
......
public abstract static class Worker implements Subscription {
public abstract Subscription schedule(Action0 action); // 走到了这个抽象方法
......
}
......
}
// 我们看一下NewThreadWorker这个具体实现类,是怎么做的。
// NewThreadWorker.java
......
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null); // 点进去
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit); // 实际调用方法,再点进去
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run); // 利用线程池完成事件的处理
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
......
于是我们发现无论是SubscribeOn还是ObserveOn都是通过线程池完成线程控制的。
总结一下:
- subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的。所以多次调用subscribeOn是无效的,因为它只会调用第一次创建的Observable,这个Observable在哪个线程创建会一直保留,不受后面重复调用影响。
- observeOn是通过operator操作符的形式去完成线程切换的,所以它的作用域和其他操作符一样,是调用observeOn之后的链路。
- observeOn()指定的是踏之后的操作符所在的线程,通过observeOn()的多次调用,程序实现了线程的多次切换。
- subscribeOn()的位置放在哪里都可以,但它只是调用一次的,原因就是subscribeOn()是通过新建Observable的方式完成的