EventBus源码学习

初步学习EventBus源码实现

Posted by BY on July 10, 2018

写在前面的话:

近期做项目大量用到了EventBus通过收发消息,更新界面上的按钮状态和某些页面的同步。但是随着项目需求的增加,发送的消息类型越来越多多,收发场景越来越复杂,除了合理分包外,如何高效使用EventBus是我考虑的一个问题。另外RxBus和RxJava是否是更合适的控件通信方案,有待进一步考察。本文先探索一下EventBus的源码世界。

目录:

一、EventBus的使用

二、EventBus2源码分析

一、EventBus的使用

1.导入组件

打开App的build.gradle,在dependencies中添加最新的EventBus依赖:

如果需要使用EventBus的索引加速,还需要下面三步操作:

首先在项目的gradle的dependencies中引入apt编译插件:

classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8'

然后再App的build.gradle中应用apt插件,并设置apt生成的索引的包名和类名:

apply plugin: 'com.neenbedankt.android-apt'
apt {
    arguments {
    eventBusIndex "com.study.sangerzhong.studyapp.MyEventBusIndex"
    }
}

接着在App的dependencies中引入EventBusAnnotationProcessor:

apt 'org.greenrobot:eventbus-annotation-processor:3.0.1'

这里需要注意,如果应用了EventBusAnnotationProcessor却没有设置arguments的话,编译时就会报错:

No option eventBusIndex passed to annotation processor

此时需要我们先编译一次,生成索引类。编译成功之后,就会发现在\ProjectName\app\build\generated\source\apt\PakageName\下看到通过注解分析生成的索引类,这样我们便可以在初始化EventBus时应用我们生成的索引了。

2.初始化EventBus

EventBus默认有一个单例,可以通过getDefault()获取,也可以通过EventBus.builder()构造自定义的EventBus,比如要应用我们生成好的索引时:

EventBus mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

如果想把自定义的设置应用到EventBus默认的单例中,则可以用installDefaultEventBus()方法:

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

3.定义事件

所有能被实力化为Object的实例都可以作为事件:

public class DriverEvent { public String info; }

在最新版的eventbus 3中如果用到了索引加速,事件类的修饰符必须为public,不然编译时会报错:

Subscriber method must be public

4.监听事件

首先把作为订阅事件的模块通过EventBus注册监听:

mEventBus.register(this);

在3.0之前,注册监听需要区分是否监听黏性(sticky)事件,监听EventBus事件的模块需要实现以onEvent开头的方法。如今改为在方法上添加注解的形式:

@Subscribe(threadMode = ThreadMode.POSTING, priority = 0, sticky = true)
public void handleEvent(DriverEvent event) {
    Log.d(TAG, event.info);
}

注解有三个参数,threadMode为回调所在的线程,priority为优先级,sticky为是否接收黏性事件。调度单位从类细化到了方法,对方法的命名也没有了要求,方便混淆代码。但注册了监听的模块必须有一个标注了Subscribe注解方法,不然在register时会抛出异常:

Subscriber class XXX and its super classes have no public methods with the @Subscribe annotation

5.发送事件

调用post或者postSticky即可:

mEventBus.post(new DriverEvent("magnet:?xt=urn:btih……"));

在实际项目的使用中,register和unregister通常与Activity和Fragment的生命周期相关,ThreadMode.MainThread可以很好地解决Android的界面刷新必须在UI线程的问题,不需要再回调后用Handler中转(EventBus中已经自动用Handler做了处理),黏性事件可以很好地解决post与register同时执行时的异步问题(这个在原理中会说到),事件的传递也没有序列化与反序列化的性能消耗,足以满足我们大部分情况下的模块间通信需求。

二、EventBus2源码分析

1.EventBus工作原理

首先从整体上梳理一下EventBus的工作原理。

事件发布者post一个event给EventBus,EventBus作为一个大的收发中心,按事件类别分别通知该类事件的众多订阅者执行onEvent方法。

register的订阅逻辑

(1)首先是调用register()方法注册一个订阅者A;

(2)遍历这个订阅者A的全部以onEvent开头的订阅方法;

(3)将A订阅的所有事件作为key,所有能相应key事件的订阅者的集合作为value,存入Map<事件,List<订阅这个事件的订阅者> >

(4)以A的类名为key,所有onEvent参数类型的类名组成的集合为value,存入Map<订阅者,List<订阅的事件> >;

(5)如果是订阅了粘滞事件的订阅者,从粘滞事件缓存区获取之前发送过的粘滞事件,响应这些粘滞事件。

post的发送事件逻辑

(1)取当前线程的发送事件封装数据,并从封装的数据中拿到发送事件的事件队列;

(2)将要发送的事件加入到事件队列中去;

(3)开启循环,每次发送队列中的一条事件给所有订阅了这个事件的订阅者;

(4)如果十子事件可以响应父事件的事件模式,需要先将这个事件的所有父类、接口、父类接口的父类都找到,并让订阅了这些父类信息的订阅者也都响应这个事件。

响应事件的逻辑

(1)发送事件处理完成后会将事件交给负责响应的逻辑部分;

(2)首先判断事件的响应模式,响应模式分为四种:

  • PostThread在哪个线程调用的post()方法,就在哪个线程执行响应方法。
  • MainThread无论在哪个线程调用的post()方法,最终都在主线程执行响应方法。
  • BackgroundThread无论在哪个线程调用的post()方法,最终都在后台线程执行响应方法。它是串行执行的,即一次只执行一个任务,其他任务在队列中处于等待状态。
  • Async无论在哪个线程调用post()方法,最终都在后台线程执行响应方法。它是并行执行的,只要有任务就开一个线程让它执行。

unregister的取消注册逻辑

(1)首先是调用unregister()方法拿到要取消注册的订阅者B;

(2)从这个类订阅的时候存入的Map<订阅者,List<订阅的事件> >中,拿到这个类的订阅事件集合;

(3)遍历订阅事件集合,在注册的时候存入的Map<事件,List<订阅这个事件的订阅者> >中将对应事件的订阅者集合中的这个订阅者移除。

(4)将步骤2中的Map<订阅者,List<订阅的事件> >中这个订阅者相关的Entry移除。

2.EventBus的构建

我们一般通过getDefault()方法获取EventBus的单例,该方法的源码如下:

/** 在App使用的整个过程中可以方便地获取一个EventBus的实例 */
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

EventBus通过Double-Check模式构建了一个线程安全并且懒加载的一个单例。我们继续看EventBus的构造方法:

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
/**
 * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
 * central bus, consider {@link #getDefault()}.
 */
public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
...
}

从上面的代码中我们可以看到EventBus的构造函数参数为EventBusBuilder类,这个类是用来创建EventBus对象的。

/**
 * Creates EventBus instances with custom parameters and also allows to install a custom default EventBus instance.
 * Create a new builder using {@link EventBus#builder()}.
 */
public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    boolean logSubscriberExceptions = true; // 监听异常日志
    boolean logNoSubscriberMessages = true; // 如果没有订阅者,显示一个Log
    boolean sendSubscriberExceptionEvent = true; // 发送监听到异常事件
    boolean sendNoSubscriberEvent = true; // 如果没有订阅者,发送一条默认事件
    boolean throwSubscriberException; // 如果失败则抛出异常
    boolean eventInheritance = true; // event的子类是否也能响应订阅者
    boolean ignoreGeneratedIndex;
    boolean strictMethodVerification;
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;
    Logger logger;
    MainThreadSupport mainThreadSupport;

    EventBusBuilder() {
    }

    ...
}

接着我们看EventBusBuilder的最终构造方法

/**
 * 根据参数创建对象,并赋值给EventBus.defaultInstance, 必须在默认的eventbus对象使用以前调用
 *
 * @throws EventBusException if there's already a default EventBus instance in place
 */
public EventBus installDefaultEventBus() {
    synchronized (EventBus.class) {
        if (EventBus.defaultInstance != null) {
            throw new EventBusException("Default instance already exists." +
                    " It may be only set once before it's used the first time to ensure consistent behavior.");
        }
        EventBus.defaultInstance = build();
        return EventBus.defaultInstance;
    }
}

/** 根据参数创建对象 */
public EventBus build() {
    return new EventBus(this);
}

EventBusBuilder类提供了两种建造方法,之前的getDefault()方法维护了一个单例对象,installDefaultEventBus()方法建造的EventBus对象最终会复制给那个单例对象,但是有一个前提我们之前没有创建过那个单例对象。EventBus.defaultInstance不为null以后程序要抛出异常。第二个方法就是默认的建造者方法了。

再回到我们的EventBus构造方法,根据提供的建造者初始化了一大堆属性。

EventBus(EventBusBuilder builder) {
    logger = builder.getLogger();
    subscriptionsByEventType = new HashMap<>(); // //key为event,value为subscriber列表,这个map就是这个事件有多少的订阅者,也就是事件对应的订阅者
    typesBySubscriber = new HashMap<>(); //keysubscribervalueevent列表,这个map就是这个订阅者有多少的事件,也就是订阅者订阅的事件列表
    stickyEvents = new ConcurrentHashMap<>(); //粘性事件
    mainThreadSupport = builder.getMainThreadSupport(); //MainThreadposter
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this); //Backgroudposter
    asyncPoster = new AsyncPoster(this); //Asyncposter
    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

首先说一下这三个 HasMap。

subscriptionsByEventType 是以 event 为 key,subscriber列表 为 value,当发送 event 的时候,都是去这里找对应的订阅者。

typesBySubscriber 是以 subscriber 为 key,event列表 为 value,当 register() 和 unregister() 的时候都是操作这个map,同时对 subscriptionsByEventType 进行对用操作。

stickyEvents 维护的是粘性事件,粘性事件也就是当 event 发送出去之后再注册粘性事件的话,该粘性事件也能收到之前发送出去的 event。

在初始化的这些字段中有三个至关重要

private final HandlerPoster mainThreadPoster; //前台发送者
private final BackgroundPoster backgroundPoster; //后台发送者
private final AsyncPoster asyncPoster;   //后台发送者(只让队列第一个待订阅者去响应)

这三个Poster类本质上是HandlerPoster类,我们继续分析这个类。

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue; // 发送任务队列
    private final int maxMillisInsideHandleMessage; // 最大发送时间
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        ...
    }

    @Override
    public void handleMessage(Message msg) {
        ...
    }
}

HandlerPoster类的构造函数传入三个参数,分别是EventBus实例、当前线程的Looper和最大发送时间。该类的两个成员函数enqueue()方法是添加订阅者和订阅事件;handleMessage()方法是发送事件。其中有一个非常重要的成员PendingPostQueue(发送任务队列)。我们继续分析PendingPostQueue类:

final class PendingPostQueue {
    private PendingPost head; // 待发送对象队列头节点
    private PendingPost tail; // 待发送对象队列尾节点

    // 入队函数
    synchronized void enqueue(PendingPost pendingPost) {
        ...
    }
   
    // 出队函数
    synchronized PendingPost poll() {
        ...
    }
   
    // 延时出队函数
    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        ...
    }
}

该队列类有两个成员变量,分别是头节点head和尾节点tail。接着我们还需分析队列中的元素PendingPost类:

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); // 对象池

    Object event; // 事件类型
    Subscription subscription; // 订阅者
    PendingPost next; // 队列下一个待发送的对象

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    /**
     * 首先检查复用池中是否有可用,如果有则返回复用,否则返回一个新的
     *
     * @param subscription 订阅者
     * @param event        订阅事件
     * @return 待发送对象
     */
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    /**
     * 回收一个待发送对象,并加入复用池
     *
     * @param pendingPost 待回收的待发送对象
     */
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }
}

通过上面代码可以看出,该类提供了一个的设计,类似于线程池,目的是减小对象创建的开销,档一个对象不用了,我们可以留着它,下次再需要的时候返回这个保留的对象而不是再去创建一个新对象。

队列元素包括一个事件类型,一个订阅者和一个next指针,指向下一个待发送的对象。

obtainPendingPost()方法首先检查复用池中是否有对象可用,如果有则返回复用的对象,否则返回一个新的对象。

releasePendingPost()方法是回收一个待发送的对象,并加入复用池。并且为了防止复用池的无限增长,默认复用池的最大长度是10000,一般情况下足够用了。

到了这里我们终于可以往上层走了,继续分析PendingPostQueue中的入队enqueue()方法出队poll()方法。

synchronized void enqueue(PendingPost pendingPost) {
    if (pendingPost == null) {
        throw new NullPointerException("null cannot be enqueued");
    }
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } else {
        throw new IllegalStateException("Head present, but no tail");
    }
    notifyAll();
}

入队时,先判断tail尾节点是否为空,如果不为空则证明队列中已经有元素,将新元素补到队列最后一个即可;若尾节点为空并且头节点为空,说明队列为空,将头节点和尾节点都指向这个新元素;若尾节点为空头节点不为空,那就是出现异常了,直接报错。

synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
    if (head == null) {
        wait(maxMillisToWait);
    }
    return poll();
}

出队时,移动头节点head到它的下一个next位置,如果队列移空,则将尾节点tail置为null,返回出队的元素。

入队出队方法都声明了synchronized,使它们可以在多线程下也能正常工作。

我们再回到HandlerPoster的入队方法enqueue()方法:

/**
 * @param subscription 订阅者
 * @param event        订阅事件
 */
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

入队方法会根据参数创建待发送对象pendingPost并加入队列,如果此时handleMessage()没有在运行中,则发送一条空消息让handleMessage响应,接下来我们分析handleMessage()方法:

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //如果订阅者没有取消注册,则分发消息
            eventBus.invokeSubscriber(pendingPost);

            //如果在一定时间内仍然没有发完队列中所有的待发送者,则退出
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

handleMessage()不停的在待发送队列queue中去取消息。 需要说明的是在循环之外有个临时boolean变量rescheduled,最后是通过这个值去修改了handlerActive。而 handlerActive 是用来判断当前queue中是否有正在发送对象的任务,看到上面的入队方法enqueue(),如果已经有任务在跑着了,就不需要再去sendMessage()唤起我们的handleMessage()

最终通过eventBus对象的invokeSubscriber()最终发送出去,并回收这个pendingPost,让注册了的订阅者去响应(相当于回调),至于这个发送方法,我们之后再看。

Poster工作原理 Handler、Message、Lopper的工作原理

接着我们看Poster的工作原理

整个EventBus源码的发送接收核心部分已经分析完了。

3.register()方法的分析

分析完EventBus类的构造方法,接着我们分析另一个入口register()方法。

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

我们在注册时调用的register方法,传入一个Object订阅者对象,通过反射获取具体的订阅者对象类。然后调用subscriberMethodFinder的findSubscriberMethods()方法获取该订阅者订阅的所有方法列表。最后开启一个for循环,将该订阅者与它的每个订阅方法进行绑定。

在该方法中有一个SubscriberMethod类,从字面上可以看出它是订阅者方法的类,我们进一步分析:

public class SubscriberMethod {
    final Method method; // 方法名
    final ThreadMode threadMode; // 工作在哪个线程
    final Class<?> eventType; // 参数类型
    final int priority;
    final boolean sticky;
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }

    @Override
    public boolean equals(Object other) {
        ...
    }

    private synchronized void checkMethodString() {
        ...    
    }

    @Override
    public int hashCode() {
        return method.hashCode();
    }
}

该类复写了equals()和hashcode()方法。其中checkMethodString()方法具体如下:

private synchronized void checkMethodString() {
    if (methodString == null) {
        // Method.toString has more overhead, just take relevant parts of the method
        StringBuilder builder = new StringBuilder(64);
        builder.append(method.getDeclaringClass().getName());
        builder.append('#').append(method.getName());
        builder.append('(').append(eventType.getName());
        methodString = builder.toString();
    }
}

checkMethodString()方法就是为了设置变量 methodString 的值,这里new了一个StringBuilder,然后又调用了toString()返回。这个类的目的就是封装了方法名。

在register()方法中我们还遇到了SubscriberMethodFinder类,从字面上看这个类是发现订阅者订阅的方法。我们一般在activity或者fragment的onStart()方法中注册时,传入一个参数this,SubscriberMethodFinder类就是查看传进去的那个this对象里面有没有onEvent()方法。这个类用了大量的反射区查找类中方法名。

先看该类的变量声明

private static final String ON_EVENT_METHOD_NAME = "onEvent";

/**
 * 在较新的类文件,编译器可能会添加方法。那些被称为BRIDGE或SYNTHETIC方法。
 * EventBus必须忽略两者。有修饰符没有公开,但在Java类文件中有格式定义
 */
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
//需要忽略的修饰符
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE |
        SYNTHETIC;

//key:类名,value:该类中需要相应的方法集合
private static final Map<String, List<SubscriberMethod>> methodCache = new HashMap<String,
        List<SubscriberMethod>>();

//跳过校验方法的类(即通过构造函数传入的集合)
private final Map<Class<?>, Class<?>> skipMethodVerificationForClasses;

接下来分析一下findSubscriberMethods()方法,从while循环里开始看:

Method[] methods = clazz.getDeclaredMethods();
for (Method method : methods) {
    String methodName = method.getName();
    if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
        int modifiers = method.getModifiers();//方法的修饰符
        //如果是public, 不是之前定义要忽略的类型
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            //。。。先不看
        }
    }
}
clazz = clazz.getSuperclass();

首先是反射获取到 clazz 的全部方法 methods。 通过对全部的方法遍历,为了效率首先做一次筛选,只关注我们的以 “onEvent” 开头的方法。(现在知道之前在基础用法中我说:其实命名不一定必须是onEvent()的原因了吧,因为只要是onEvent开头的就可以了。) 忽略private类型的,最后如果是公有,并且不是 java编译器 生成的方法名,那么就是我们要的了。

再来看拿到要的方法后是怎么处理的

Class<?>[] parameterTypes = method.getParameterTypes();
//如果只有一个参数
if (parameterTypes.length == 1) {
    String modifierString = methodName.substring(ON_EVENT_METHOD_NAME
            .length());
    ThreadMode threadMode;
    if (modifierString.length() == 0) {
        threadMode = ThreadMode.PostThread;
    } else if (modifierString.equals("MainThread")) {
        threadMode = ThreadMode.MainThread;
    } else if (modifierString.equals("BackgroundThread")) {
        threadMode = ThreadMode.BackgroundThread;
    } else if (modifierString.equals("Async")) {
        threadMode = ThreadMode.Async;
    } else {
        if (skipMethodVerificationForClasses.containsKey(clazz)) {
            continue;
        } else {
            throw new EventBusException("Illegal onEvent method, check " +
                    "for typos: " + method);
        }
    }
    Class<?> eventType = parameterTypes[0];
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(methodName);
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    if (eventTypesFound.add(methodKey)) {
        // 方法名,工作在哪个线程,事件类型
        subscriberMethods.add(new SubscriberMethod(method, threadMode,
                eventType));
    }
}

还是反射,拿到这个方法的全部参数集合,如果是只有一个参数,再去根据不同的方法名赋予不同的线程模式(其实也就是最后响应的方法是工作在哪个线程)。 这里我们看到,其实EventBus不仅仅支持onEvent()的回调,它还支持onEventMainThread()、onEventBackgroundThread()、onEventAsync()这三个方法的回调。 一直到最后,我们看到这个方法把所有的方法名集合作为value,类名作为key存入了 methodCache 这个全局静态变量中。意味着,整个库在运行期间所有遍历的方法都会存在这个 map 中,而不必每次都去做耗时的反射取方法了。

synchronized (methodCache) {
    methodCache.put(key, subscriberMethods);
}
return subscriberMethods;

看了这么久,我们再回到 EventBus#register() 方法。这回可以看懂了,就是拿到指定类名的全部订阅方法(以 onEvent 开头的方法),并对每一个方法调用subscribe()。那么再看subscribe()方法。

4.subscribe()方法的分析

subscribe()方法接受四个参数,分别为:订阅者封装的对象、响应方法名封装的对象、是否为粘滞事件(可理解为广播)、这条事件的优先级。

//根据传入的响应方法名获取到响应事件(参数类型)
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
//通过响应事件作为key,并取得这个事件类型将会响应的全部订阅者
//没个订阅者至少会订阅一个事件,多个订阅者可能订阅同一个事件(多对多)
//key:订阅的事件,value:订阅这个事件的所有订阅者集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

//根据优先级插入到订阅者集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
    if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
        subscriptions.add(i, newSubscription);
        break;
    }
}

//当前订阅者订阅了哪些事件
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
    subscribedEvents = new ArrayList<Class<?>>();
    typesBySubscriber.put(subscriber, subscribedEvents);
}
//key:订阅者对象,value:这个订阅者订阅的事件集合
subscribedEvents.add(eventType);

跳过一些初始化的局部变量(逻辑看注释就够了) 如果传入的事件是有优先级之分的,则会根据优先级,将事件插入所有订阅了事件eventType的类的集合subscriptions中去。看逻辑我们发现,这里并没有对优先级的大小做限制,默认的优先级是0,priority越大,优先级越高。 每个订阅者是可以有多个重载的onEvent()方法的,所以这里多做了一步,将所有订阅者的响应方法保存到subscribedEvents中。 至此,我们就知道了 EventBus 中那几个map的全部含义。同时也回答了上一篇中问的为什么如果EventBus.defaultInstance不为null以后程序要抛出异常,就是因为这几个 map 不同了。 map 变了以后,订阅的事件就全部变为另一个 EventBus 对象的了,就没办法响应之前那个 EventBus 对象的订阅方法了。

最后又是一个感叹:子事件也可以让响应父事件的 onEvent() 。这个有点绕,举个例子,订阅者的onEvent(CharSequence),如果传一个String类型的值进去,默认情况下是不会响应的,但如果我们在构建的时候设置了 eventInheritance 为 true ,那么它就会响应了。

if(sticky)
if (eventInheritance) {
    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
    for (Map.Entry<Class<?>, Object> entry : entries) {
        Class<?> candidateEventType = entry.getKey();
        //如果eventtypecandidateEventType同一个类或是其子类
        if (eventType.isAssignableFrom(candidateEventType)) {
            Object stickyEvent = entry.getValue();
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
} else {
    Object stickyEvent = stickyEvents.get(eventType);
    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}

最后是调用checkPostStickyEventToSubscription()做一次安全判断,就调用postToSubscription()发送事件了。 这里就关联到了我们之前讲的Poster类的作用了。 回答之前的问题:Poster只负责粘滞事件的代码。这里可以回答一部分:如果不是 sticky 事件都直接不执行了,还怎么响应。

private void postToSubscription(...) {
    switch (threadMode) {
        case PostThread:
            //直接调用响应方法
            invokeSubscriber(subscription, event);
            break;
        case MainThread:
            //如果是主线程则直接调用响应事件,否则使用handle去在主线程响应事件
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
            //。。。
    }
}

最后,还记得我们之前没有讲的那个invokeSubscriber(subscription, event);方法吗? 之前我们不知道subscriberMethod是什么,现在我们能看懂了,就是通过反射调用订阅者类subscriber的订阅方法onEventXXX(),并将event作为参数传递进去

subscription.subscriberMethod.method.invoke(subscription.subscriber, event);

Register与Poster工作图

原理图

流程图

5.post()方法的分析

我们继续来看EventBus类,的另一个入口方法post()

//已省略部分代码
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        while (!eventQueue.isEmpty()) {
            postSingleEvent(eventQueue.remove(0), postingState);
        }
        postingState.isPosting = false;
        postingState.isMainThread = false;
    }
}

post() 方法首先从 currentPostingThreadState 对象中取了一个 PostingThreadState ,我们来看看这个 currentPostingThreadState 对象的创建代码。

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new
ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

ThreadLocal 是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,而这段数据是不会与其他线程共享的。其内部原理是通过生成一个它包裹的泛型对象的数组,在不同的线程会有不同的数组索引值,通过这样就可以做到每个线程通过 get() 方法获取的时候,取到的只能是自己线程所对应的数据。 在 EventBus 中, ThreadLocal 所包裹的是一个 PostingThreadState 类,它仅仅是封装了一些事件发送中过程所需的数据。

final static class PostingThreadState {
    //通过post方法参数传入的事件集合
    final List<Object> eventQueue = new ArrayList<Object>(); 
    boolean isPosting; //是否正在执行postSingleEvent()方法
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

回到 post() 方法,我们看到其核心代码是这句:

while (!eventQueue.isEmpty()) {
    postSingleEvent(eventQueue.remove(0), postingState);
}

每次调用post()的时候都会传入一个事件,这个事件会被加入到队列。而每次执行postSingleEvent()都会从队列中取出一个事件,这样不停循环取出事件处理,直到队列全部取完。 再看 postSingleEvent() 方法

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        //获取到eventClass所有父类的集合
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            //左或右只要有一个为真则为真,并赋值给左
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }

        //参考sendNoSubscriberEvent注释
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

还记得 EventBusBuild 中的 eventInheritance是做什么的吗?它表示一个子类事件能否响应父类的 onEvent() 方法。 再往下看 lookupAllEventTypes() 它通过循环和递归一起用,将一个类的父类,接口,父类的接口,父类接口的父类,全部添加到全局静态变量 eventTypes 集合中。之所以用全局静态变量的好处在于用全局静态变量只需要将那耗时又复杂的循环+递归方法执行一次就够了,下次只需要通过 key:事件类名 来判断这个事件是否以及执行过 lookupAllEventTypes() 方法。

6.postSingleEventForEventType()方法的分析

然后我们继续往下,看发送方法 postSingleEventForEventType()

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //所有订阅了eventClass的事件集合
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        //回调subscription的响应方法
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

它首先通过这一句

subscriptions = subscriptionsByEventType.get(eventClass);

获取到所有订阅了 eventClass 的事件集合,之前有讲过, subscriptionsByEventType 是一个以 key:订阅的事件 value:订阅这个事件的所有订阅者集合 的 Map 。 最后通过循环,遍历所有订阅了 eventClass 事件的订阅者,并向每一个订阅者发送事件。 看它的发送事件的方法:

postToSubscription(subscription, event, postingState.isMainThread);

噢,又回到了和之前 Subscribe 流程中处理粘滞事件相同的方法里————对声明不同线程模式的事件做不同的响应方法,最终都是通过invokeSubscriber()反射订阅者类中的以onEvent开头的方法。

6.unregister()方法的分析

我们继续来看EventBus类,的最后一个入口方法unregister()

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            //取消注册subscribereventType事件的响应
            unsubscribeByEventType(subscriber, eventType);
        }
        //subscriber对所有事件都不响应以后,移除订阅者
        typesBySubscriber.remove(subscriber);
    }
}

之前讲过typesBySubscriber key:订阅者对象 value:这个订阅者订阅的事件集合,表示当前订阅者订阅了哪些事件。 首先遍历要取消注册的订阅者订阅的每一个事件,调用unsubscribeByEventType(),从这个事件的所有订阅者集合中将要取消注册的订阅者移除。最后再以:当前订阅者为 key 全部订阅事件集合为 value 的一个 Map 的 Entry 移除,就完成了取消注册的全部过程。