RxJava学习笔记

Posted by BY on July 18, 2018

RxJava四要素

  1. 被观察者
  2. 观察者
  3. 订阅
  4. 事件

Demo

// 创建被观察者:create
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("World");
        subscriber.onCompleted();
    }
});

// 通过just方法来创建被观察者
Observable observableJust = Observable.just("Hello", "World");

// 通过from方法来创建被观察者
String[] parameters = {"Hello", "World"};
Observable observableFrom = Observable.from(parameters);
// 创建观察者
Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onCompleted() { // 不再有新的事件由被观察者发出时调用
    }
    
    @Override
    public void onError(Throwable e) { // 处理错误时调用
    }

    @Override
    public void onNext(Object s) { // 被观察者发出新事件时调用 
    }
};

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onStart(String s) {
        super.onStart();
    }
}
// 订阅
public void doRxjava() {
    observable.subscribe(observer);
    observable.subscribe(subscriber);
}

三个关键对象和一个核心方法

  1. Observable(被观察者)
  2. OnSubscribe (相当于观察者模式中,被观察者通知观察者的notifyObserver()方法)
  3. Subscriber(观察者)
  4. subscribe(订阅方法)

源码分析之先从create方法讲起

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }
}

hook是一个代理类,create方法利用hook生成一个被观察者。

RxJava如何创建Subscriber以及如何完成订阅

Subscriber 实现了Observer和Subscription接口

我们回顾一下这两个接口

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public interface Subscription {
    void unsubscribe(); // 解绑
    boolean isUnsubscribed(); // 是否已经解绑
}

我们再回头看Subscriber的成员

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    // represents requested not set yet
    private static final Long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions; // 所有观察者的订阅事件,并实现了Subscription接口(重要成员!!!)
    private final Subscriber<?> subscriber;
    /* protected by `this` */
    private Producer producer;
    /* protected by `this` */
    private long requested = NOT_SET; // default to not set

    ......
}

当创建好了我们的subscriber后,我们就要调用subscribe方法将观察者与被观察者绑定。我们接着看subscribe方法是怎么实现的。

// 在Observable.java中
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
        
    // new Subscriber so onStart it
    subscriber.onStart();
        
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would 
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // in case the subscriber can't listen to exceptions anymore
        if (subscriber.isUnsubscribed()) {
            RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
        } else {
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
        }
        return Subscriptions.unsubscribed();
    }
}

这个方法首先对subscriber和observable.onSubscribe进行非空判断。然后调用subscriber的onStart方法,默认的onStart方法是空,在使用时需要由我们自己来实现。

接着将传入的subscriber转化为SafeSubscriber,SafeSubscriber对我们的subscriber类进行了包装,包括onCompleted方法和onError方法。

接下来,在try中调用了call方法,将subscriber方法传进来。调用完call方法意味着完成了subscriber的订阅。

最后我们hook调用onSubscribeReturn方法将subscriber返回,必要注意subscriber方法必须实现Subscription接口,同时subscriber观察者都持有一个SubscriptionList类(该类存储了这个观察者订阅的所有被观察者列表)。

我们回到前面,再总结回顾一下rxjava最基本的使用流程:

首先我们创建被观察者,调用Observable的create方法。同时,我们在create方法的参数中会new一个Observable.OnSubscribe对象完成我们的被观察者创建。

接着我们会创建观察者Observer或者Subscriber。

最后,我们会调用observable的subscribe方法,传入观察者去完成订阅。

在我们调用subscribe方法后,默认会触发被观察者Observable中OnSubscribe对象的call方法,来完成回调,在call方法中可以执行观察者subscriber的onNext、onCompleted、onError方法。

操作符

变换:就是将时间序列中的对象或整个序列进行加工处理。

map:简单来说就是用来把一个事件转换为另一个事件的。

官方说法:map()函数接受一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。

一个例子

Observable.just("map/image/map.png")
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) {
                return getBitmap(filepath);
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                return showBitmap(bitmap);
            }
        })
})

map操作符源码学习

从我们上一个例子出发,我们先看just方法

public static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

//继续往下走
public static <T> ScalarSynchronousObservable<T> create(T t) {
    return new ScalarSynchronousObservable<T>(t);
}

//继续往下走
protected ScalarSynchronousObservable(final T t) {
    // 在这里我们看到产生了一个被观察者,
    // 然后通过call方法来通知观察者,
    // 通过之前分析可知,只要再生成一个观察者,通过订阅方法,
    // 然后观察者模式关系就形成了。我们后面主要看map的实现 
    super(new OnSubscribe<T>() { 
        @Override
        public void call(Subscriber<? super T> s) {
            s.setProducer(createProducer(s, t));
        }
    });
    this.t = t;
}
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

我们可以看到再map方法中调用了lift方法,它是RxJava操作符的核心,其本质仍是一个操作符,作用是针对我们的事件序列处理再发送。

于是我们知道,需要着重了解下lift的源码

lift操作符源码学习

前面的map方法中,lift包含了一个OperatorMap类,我们先从这个类开始学起。

// OperatorMap实现了Operator接口,我们先看call方法
public final class OperatorMap<T, R> implements Operator<R, T> {
    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;
        
        final Func1<? super T, ? extends R> mapper;

        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t); // (非常重要!!!)
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result); // (非常重要!!!)
        }
        
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaPluginUtils.handleException(e);
                return;
            }
            done = true;
            
            actual.onError(e);
        }
        
        
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

我们可以看到,在call方法中,最重要的操作是调用了transformer的call方法,看一下Func1这个接口定义就知道call来自哪里了。它的返回值就是R,即需要返回的对象,完成了事件的转化。

public interface Func1<T, R> extends Function {
    R call(T t);
}

result = mapper.call(t)完成了事件的转化,从T->R。

actual.onNext(result)将转化后的事件传给观察者进行后续操作。

o.add(parent)将新产生的MapSubscriber加入到o的观察者列表当中。

在OperatorMap的call方法中只返回了Subscriber,并没有完成订阅。订阅操作是在lift中进行的,我们继续看lift。

// lift方法实现
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

可以看出它创建了一个新的Observable被观察者,我们继续看OnSubscribeLift是什么东西。

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st); // 在这里完成了整个订阅,新生成的被观察者就是个传话人,在事件转换中间传递消息
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

flatMap操作符源码学习

一个例子

Observable.just("http://www.baidu,com",
                "https://www.sina.com",
                "https://www.sohu.com/")
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        return createObservable(s);
                    }
                 })
                 .subscribe(new Action1<String>() {
                     @Override
                     public void call(String s) {
                         System.out.println(s);
                     }
                 })
})

flaMap与map最大的区别就是:

  1. 会将一个事件对象转化为Observable
  2. 它不会直接发送这个Observable,而是将这个Observable激活让它自己开始发送事件。
  3. 每一个创建出来的Observable发送的事件,都被汇入同一个Observable

RxJava线程控制:多线程编程准则&RxJava如何处理多线程&Schedulers

Android多线程编程的原则

  1. 不要阻塞UI线程
  2. 不要在UI线程之外访问UI控件

RxJava线程控制

  • Schedulers 线程控制符,用于线程切换
  • (1)Schedulers.immediate() 在当前线程下运行,默认的
  • (2)Schedulers.newThread() 总是启用一个新线程,并在新线程下工作
  • (3)Schedulers.io() 用于进行io操作(网络收发数据或者读写文件),无数量上限的线程池,比newThread效率更高
  • (4)Schedulers.computation() 用于计算密集型任务
  • (5)AndroidSchedulers.mainThread() 在Android主线程中运行

RxJava如何进行线程控制

  1. subscribeOn() Observable.OnSubscribe激活时所在的线程,换句话收就是事件的发出所在的线程
  2. observeOn() 事件消费所在的线程

先说一个结论,后面分析源码:onserveOn支持多次调用,subscribeOn只能调用一次。

subscribeOn源码分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler)); // 重点在这里,create一个OperatorSubscribeOn对象
}

我们接着看OperatorSubscribeOn对象是什么。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                
                source.unsafeSubscribe(s);
            }
        });
    }
}

两个成员变量,分别是传入的被观察者source和线程控制对象scheduler。对于实现了OnSubscribe的OperatorSubscribeOn类,我们需注重看一下call方法都写了什么。call方法的第一行遇到一个Worker类,我们先了解一下这个。

public abstract static class Worker implements Subscription {
    // 调度器执行一个事件的抽象方法,由用户自己实现
    public abstract Subscription schedule(Action0 action);
    
    // 调度器延迟执行一个事件的抽象方法,由用户自己实现
    public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);

    public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
            final long periodInNanos = unit.toNanos(period);
            final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
            final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);

            final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
            final Action0 recursiveAction = new Action0() {
                long count;
                long lastNowNanos = firstNowNanos;
                long startInNanos = firstStartInNanos;
                @Override
                public void call() {
                    if (!mas.isUnsubscribed()) {
                        action.call();
                        
                        long nextTick;
                        
                        long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
                        // If the clock moved in a direction quite a bit, rebase the repetition period
                        if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
                                || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
                            nextTick = nowNanos + periodInNanos;
                            /* 
                             * Shift the start point back by the drift as if the whole thing
                             * started count periods ago.
                             */
                            startInNanos = nextTick - (periodInNanos * (++count));
                        } else {
                            nextTick = startInNanos + (++count * periodInNanos);
                        }
                        lastNowNanos = nowNanos;
                        
                        long delay = nextTick - nowNanos;
                        mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
                    }
                }
            };
            MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
            // Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
            mas.set(s);
            s.set(schedule(recursiveAction, initialDelay, unit));
            return mas;
        }

        public long now() {
            return System.currentTimeMillis();
        }
    }
}

Worker类实现了Subscription接口,用于在单个线程或事件循环上执行动作的顺序调度器。schedulePeriodically()方法定期的执行可取消的事件,这个默认的实现是递归并等待调度事件的完成(代替可能在当前长时间执行的事件)。如果你要复写该方法,应该以一种更好的方式周期性的执行每个调度事件。

我们再回到OperatorSubscribeOn的call方法,当创建完一个inner的Worker类后,inner.schedule()方法中setProducer对线程进行判断。

source.unsafeSubscribe(s)这是最核心的一句,通过之前传入的Observable被观察者调用,表明没有取消订阅。我们看一下它的实现。

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(this, onSubscribe).call(subscriber); // 表明subcribeOn方法已经完成了
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

说了一堆,是时候总结一下SubscribeOn方法了

  1. 会新生成一个Observable
  2. onSubscribe会在目标Subscriber订阅时使用传入的Scheduler的worker作为线程调度执行者
  3. 在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber
  4. 这个Subscriber又会通知到目标Subscriber,从而完成我们的subscribeOn的过程

observeOn源码分析

经过层层重载,我们找到它真正实现的地方。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

我们可以看到在这个函数尾部调用了一个lift方法,这个lift方法中传入了一个OperatorObserveOn对象,我们来看一下这个OperatorObserveOn类。

public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;
    private final boolean delayError;
    private final int bufferSize;

    ......
}

这个OperatorObserveOn类实现了Operator接口。同样我们要着重看一下call方法。

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

我们直接看else中的语句,新建了一个ObserveOnSubscriber对象,这个对象就是我们完成线程切换实际的地方。我们在这里并没有看到线程切换的操作,因该是在这个对象的next方法中进行的。

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    ......

    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(on.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }

    ......
}

首先会将结果保存到offer队列当中,如果没有保存成功会调用onError方法,如果成功保存则会进行线程调度schedule()。我们看一下schedule()的实现。

// OperatorObserveOn.java
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this); // 点进去看一下实现
    }
}

// Scheduler.java
public abstract class Scheduler {
    ......

    public abstract static class Worker implements Subscription {
        public abstract Subscription schedule(Action0 action); // 走到了这个抽象方法
  
        ......
    }

    ......
}

// 我们看一下NewThreadWorker这个具体实现类,是怎么做的。
// NewThreadWorker.java
......

@Override
public Subscription schedule(final Action0 action) {
    return schedule(action, 0, null); // 点进去
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (isUnsubscribed) {
        return Subscriptions.unsubscribed();
    }
    return scheduleActual(action, delayTime, unit); // 实际调用方法,再点进去
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = schedulersHook.onSchedule(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);  // 利用线程池完成事件的处理
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);

    return run;
}

......

于是我们发现无论是SubscribeOn还是ObserveOn都是通过线程池完成线程控制的。

总结一下:

  • subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的。所以多次调用subscribeOn是无效的,因为它只会调用第一次创建的Observable,这个Observable在哪个线程创建会一直保留,不受后面重复调用影响。
  • observeOn是通过operator操作符的形式去完成线程切换的,所以它的作用域和其他操作符一样,是调用observeOn之后的链路。
  • observeOn()指定的是踏之后的操作符所在的线程,通过observeOn()的多次调用,程序实现了线程的多次切换。
  • subscribeOn()的位置放在哪里都可以,但它只是调用一次的,原因就是subscribeOn()是通过新建Observable的方式完成的