前言

EventBus 可以轻量级的事件订阅和发布, 在一个 app 中或者说在打包在一个 apk 包中的代码中, 是可以订阅, 然后在其他任何地方发送, 然后接收。 其 EventBus 的源码比较简单。 但是很多 App 包括地图淘金也使用了 EventBus, 所以分析其工作原理还是很有必要的呢。
这里仅仅分析简单的普通事件及流程。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

EventBus.getDefault().post(MyEvent())
}

override fun onStart() {
super.onStart()
EventBus.getDefault().register(this)
}

override fun onStop() {
super.onStop()
EventBus.getDefault().unregister(this)
}

@Subscribe(threadMode = ThreadMode.MAIN)
fun myEventBack(event: MyEvent) {
}
}

所有的代码以上即可,以前跟我们分析了。 在 onStart 中订阅, 在 onStop 进行取消订阅, myEventBack 方法为我们的订阅回调的方法来接收消息, onCreate 中发送了消息。

源码分析

发送消息

我们直接上来看 EventBus 的 post 方法吧, 看看是怎么发送消息的。 这样的分析可能有点倒, 但是源码流程过于简单, 就这也吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

首先看着这个 PostingThreadState 是个啥。

1
2
3
4
5
6
7
8
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

这个类是当前发送事件时的一些状态, 比如 isMainThread 即当前发送是否是在主线程中发送的呢, isPosting 即是否正在处理分发等等。 我们也可以看下 postingState 是 ThreaLocal 的, 所以它是不同 Thread 是不同的哈。
我们继续往下看源码, 先把我们要发送的消息放在了队列里, 判断了是否正在分发, 如果分发就退出了, 因为我们已经再分发了, 我们添加到队列里, 上次分发没有结束就会把当前消息给分发出去了。 我们可以看到我们从队列中一个一个取出来后就直接调用了 postSingleEvent。 所以我们的消息最后交给了 postSingleEvent。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

我们可以看到 eventInheritance 变量为 true 的时候可能会构造更多的 class, 然后跟 为 false 最后调用的结果一样。 我们看看 eventInheritance 是做什么呢。 我们看看 lookupAllEventTypes 这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}

我们可以很清晰的看出, 如果为 true 的话, 会吧当前消息的父类及接口都会加入到当前分发的 class 中。 所以 eventInheritance 的作用很明显, 在我们 defaultEvent 中, 该参数为 true, 所以默认消息的父类及接口都会被分发到。
我们继续看 postSingleEventForEventType 方法吧, 通过消息进行下一步的处理及分发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

通过上面的方法, 我们可以看出通过 subscriptionsByEventType 获取到该消息类型对应的方法信息, 所以可以猜测 subscriptionsByEventType 这个存的是 event 和 event 对于的方法信息。 然后通过 postToSubscription 进行下一步处理, 这里主要是对分发的线程做了处理, 没有什么好说的, 直接看 invokeSubscriber 方法, 我们可以很清晰的看到, 在这里我们进行了方法的调用。 所以呢, 我们订阅的回调接收方法就运行了哈。

订阅

我们通过上面发送消息的分析, 我们应该可以猜测这个流程做了什么哈, 应该是通过我们订阅的类, 找到我们关注的回调接收方法, 收集到其的一些信息, 然后存起来即可。 通过发送消息, 我们也可以大胆点我们吧所有的消息对于的关系都存在了 subscriptionsByEventType 缓存中。 我们看看代码验证下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
// ......
}
}

其实上面两个方法也很简单, 表达的也非常清晰。 看着代码我们也验证了我们刚才的猜测。 通过 subscriberMethodFinder 的 findSubscriberMethods 找到当前类中我们需要关注的订阅的方法接收信息。 然后再一个个的放到 subscriptionsByEventType 里面。 至于后面的按照 priority 排序插入等, 这个不再分析了, 很简单。
所以我们来看看是怎么通过订阅的类找到相应的方法呢, 即 subscriptionsByEventType。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

这个主要是加了一个缓存, 其中 ignoreGeneratedIndex 默认是为 false 的哈。 但是这个啥啥东西呢。 我们看看这两个分支后处理的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ignoreGeneratedIndex 为 true 时
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

// ignoreGeneratedIndex 为 false 时
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

通过上面两个分支的处理, 如果 findState.subscriberInfo != null 时, 其值不管是什么都是一样的逻辑处理。

1
2
3
4
while (clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}

我们看名字应该也容易看出, findUsingReflectionInSingleClass 是我们通过反射进行找我们想要的信息。 我们看到 findState.moveToSuperclass(); 又再次查找其父类, 所以在处理一个类中的数据时, 还会把其继承的类中的也包含进来。 然后把所有的方法等信息返回出去。
所以现在不同的就是下面这个方法, 所以我们看看这个是做了什么事情呢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}

private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

通过 getSubscriberInfo 方法我可以看出其设计。 其中有三个 return, 如果前两个 return 都不符合, 那么直接返回 null 了, 返回 null 呢, 我们就跟上面分析的一样了, 直接使用反射去找了, 等下直接使用发反射, 那么前面两个 return 不是通过反射去找的吗? 跟缓存一样, 可以给我们用户去自己定义, 然后提高性能? 看着像对吧。
我们一个个的看看。

1
2
3
4
5
6
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}

如果 findState.subscriberInfo 不为空的话, 所以我们需要 findState.subscriberInfo 不为空的时候, 然后其提供该类的方法信息。 但是看到其赋值等没有发现再哪里可以对其进行赋值。 所以这个先不深究了。
看第二个, 这个就很明显了。

1
2
3
4
5
6
7
8
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}

如果 subscriberInfoIndexes 不为空的话, 就直接返回了哈。 这个值我们可以定义 EventBus 进行设置。
注意我们不管通过那种方法获取的的数据, 在添加 subscriberMethods, 前对我们获取到的数据进行了部分排除, 至于什么样的数据才能添加进去呢, 后面一起说。
最后我们分析下 findUsingReflectionInSingleClass 通过反射找到相应方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
}
}
}

也很清晰哈, 不分析了。 所以我们直接看看 checkAdd 什么符合呢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());

String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

我们主要看下 checkAddWithMethodSignature 里的 methodClassOld.isAssignableFrom(methodClass) 方法, 通过结果可以看到, 如果 methodClass 是 methodClassOld 的子类的话, 就返回了 true, 也就覆盖了。 如果 methodClass 不是 methodClassOld 的子类的话, 那么返回 false 不添加。 这说明了什么, 说明了如果子类继承了父类, 且整两个方法都加了订阅的注解, 那么仅仅添加子类, 父类不添加哈。 也就是如果子类不调用 super 的话, 那么父类是不会被接收的呢, 这点开发时一定要注意呢。

取消订阅

根据上面的分析, 取消订阅就是把当前订阅的类中的方法信息去掉即可。 我们看看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

这个就不再过的过多分析了, 基本就是 remove。