当前位置:网站首页>EventBus源码分析
EventBus源码分析
2022-08-04 05:25:00 【vivianluomin】
作为传递消息的神器,我一直对它十分感兴趣,所以现在来看看它。
一般情况下,我们会在需要传递消息的地方注册一个对象:例如在Activity中
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
}
所以我们就从这个看起吧:
首先会调用EventBus.getDefault()
得到EvenBus对象:
static volatile EventBus defaultInstance;
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
可以看到,这里采用的是double-check的单例模式。
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
//用来保存订阅方法参数类型和Subscription List
subscriptionsByEventType = new HashMap<>();
//用来保存订阅者和被订阅方法的参数类型
typesBySubscriber = new HashMap<>();
//粘性事件
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
//主线程发送器
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
//后台发送器
backgroundPoster = new BackgroundPoster(this);
//异步线程发送器
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ?
builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder =
new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
//异常设置
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
可以看到,在EventBus的构造方法中,主要创建了一个Map对象和三个不同类型的线程类型发送器,可以匹配注解中的threadMode字段。
先看一个用法:
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent1(String text){
Log.d(TAG, "onMessageEvent1: "+text+"-----"+Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onMessageEvent2(String text){
Log.d(TAG, "onMessageEvent2: "+text+"-----"+Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.POSTING)
public void onMessageEvent3(String text){
Log.d(TAG, "onMessageEvent3: "+text+"-----"+Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.ASYNC)
public void onMessageEvent4(String text){
Log.d(TAG, "onMessageEvent4: "+text+"-----"+Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.MAIN_ORDERED)
public void onMessageEvent5(String text){
Log.d(TAG, "onMessageEvent5: "+text+"-----"+Thread.currentThread().getName());
}
new Thread(new Runnable() {
@Override
public void run() {
String s = "aaaaaaaaaaaaaaaa";
EventBus.getDefault().post(s);
}
}).start();
打印结束:
onMessageEvent2: aaaaaaaaaaaaaaaa-----Thread-5
onMessageEvent3: aaaaaaaaaaaaaaaa-----Thread-5
onMessageEvent4: aaaaaaaaaaaaaaaa-----pool-1-thread-1
onMessageEvent1: aaaaaaaaaaaaaaaa-----main
onMessageEvent5: aaaaaaaaaaaaaaaa-----main
可以看到,五个方法都调用了,而且有些运行的线程不一样。
我们先看看threadMode各个字段的含义:
- POSTING:默认的模式,开销最小的模式。事件的处理在事件的发送的那个线程。对应上面的事件3,也就是处理的事件的线程是我们发送事件所在的线程。
- MAIN:事件的处理在UI线程中执行。
- BACKGROUND:事件的处理会在一个后台线程中执行。如果发送事件的线程不是UI线程,就直接用该发送事件的线程。如果发送事件的线程是UI线程,就会使用一个单独的后台线程,将按顺序分发所有的事件。对于上面的事件2.
- ASYNC:事件处理会在单独的线程中执行,它一直与发送事件的线程和主线程独立,主要用于在后台线程中执行耗时操作,每一个事件都会开启一个线程(通过线程池)。应该限制线程数目。
- MAIN_ORDERED:事件的处理在主线程,与MAIN不同在于,用这个模式,事件等排序等待分发。这个保证了发送发不会阻塞。
然后我们看看EventBus的注册事件:
public void register(Object subscriber) {
//得到订阅者的Class
Class<?> subscriberClass = subscriber.getClass();
//找到该CLass下的所有订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//对订阅方法注册
subscribe(subscriber, subscriberMethod);
}
}
}
我们先看是怎么找到订阅方法的:
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//尝试从缓存中获取该订阅者的订阅方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//如果没有找到,就从下面连在方法中获取。
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将获取到的订阅方法放置到缓存中。
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
可以看到先从METHOD_CACHE
从找该类对应的所有订阅方法,METHOD_CACHE
是ConcurrentHashMap
的实例,它是HashMap的一个线程安全,支持高效并发的版本。在默认理想情况下,ConcurrentHashMap
可以支持16个线程执行并发写操作及任意数量线程的读操作。
首先从缓存中获得该订阅者的所有订阅方法,如果没有获取到,就有两种方法去获取,如果ignoreGeneratedIndex
为true,表示忽略注解器生成的MyEventBusIndex
,然后就使用反射来查找;一般情况下该值默认为false,我们就会进入下面的方法:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//通过FindState对象来存储找到的方法信息
//从池中或创建使用
FindState findState = prepareFindState();
//初始化
findState.initForSubscriber(subscriberClass);
//从当前类开始遍历该类的所有父类
while (findState.clazz != null) {
//获得SubscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果使用了MyEventBusIndex,将会到这里并获取订阅方法信息。
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 未使用MyEventBusIndex将会进入这里使用反射获取方法信息
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
这里从池取出或创建FindState
对象,并且调用getSubscriberInfo
查找是否有新添加的SubscriberInfoIndex
,这个是需要添加编辑器支持的,并且在编译器自动 生成,需要手动添加,这里默认的情况下是不存在,所以会进入findUsingReflectionInSingleClass
方法,通过反射来调用。,接着调用moveToSuperclass
方法获取其父类。最后将findState回收。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//得到该类的方法。
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历方法
for (Method method : methods) {
int modifiers = method.getModifiers();
// 如果方法的修饰符是public
//并且不是 Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取参数类型数组
Class<?>[] parameterTypes = method.getParameterTypes();
//如果参数是1个
//说明我们在用EventBus的时候,订阅方法的参数只能有一个
if (parameterTypes.length == 1) {
//判断该方法是否被Subscribe 注解修饰
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//如果被Subscribe 注解修饰,则进入这里。
Class<?> eventType = parameterTypes[0];
//检查是否已经添加了该eventType,eventType是由参数定义的
//如果返回false,说明父类也有这个方法,但是子类重写了它。
if (findState.checkAdd(method, eventType)) {
//得到threadMode
ThreadMode threadMode = subscribeAnnotation.threadMode();
//添加订阅方法
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//抛出参数过多的异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//抛出方法修饰符不正确异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
在这里,通过反射来获取主要分为:
- 获取类的方法
- 遍历类,并且检查
- 检查方法修饰符是不是public
- 检查参数数量是不是等于1
- 检查方法是否含有SubScribe注解
如果上面的三个条件都满足,就需要检查方法和参数类型是否已经存在,是否此方法需要添加threadMode
- 根据检查结果来判断是否要添加threadMode
下面是checkAdd
方法:
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
//map中没有改参数和method
return true;
} else {
if (existing instanceof Method) {
//如果有该eventType的value,就比较在map里面的value是不是Method类型
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
//得到方法所在的类
Class<?> methodClass = method.getDeclaringClass();
//检查是否以前有存在
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
//如果不存在或者methodClass是methodClassOld的子类
//就返回true,表示重新替换成子类的方法
return true;
} else {
// Revert the put, old class is further down the class hierarchy
//如果不是子类,就回退刚才的put
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
在addCheck
方法中,会先判断anyMethodByEventType
存不存key为eventType的一项,如果不存在,就返回true,并且将method添加到了anyMethodByEventType
。如果存在,就看现在方法所在的class是不是之前方法所在class的子类,如果是子类,就替换之前存在的value,否则就还原之前的。
现在需要订阅的方法找到了,
然后对每个订阅的方法调用subscribe
方法:
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//参数类型
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//是否存在有该eventType的subscriptions
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
· //subscriptionsByEventType里面存放的是参数类型相同的所有方法
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
//检查是否已经被注册了
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//根据优先级添加新的订阅方法
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//typesBySubscriber中存放的是某一个订阅者所有的事件类型(通过参数划分)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//添加事件类型,也就是新的参数类型
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
//如果是粘性方法
if (eventInheritance) {
// 构造是默认为true
//stickyEvents是ConcurrentHashMap的实例
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
//如果candidateEventType是eventType的子类或相同
if (eventType.isAssignableFrom(candidateEventType)) {
//得到stickyEvent
Object stickyEvent = entry.getValue();
//这个是特别少见的情况
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
- 订阅方法的过程是一个队方法遍历的过程,每一个方法都要单独订阅。如果已经订阅过方法了,将抛出异常
- 首先得到同类型事件的所有方法,如果不存在,就新建一个
CopyOnWriteArrayList
用来存在同类型事件的所有方法,然后按照优先级添加进去。事件的类型是由参数的类型决定的,并且只有有一个参数。 - 然后得到该订阅者下的所有被订阅的事件类型,将新的事件类型添加进去。
- 检查是否是粘性事件
然后看看它的post方法:
public void post(Object event) {
//使用ThreadLoacl来确保线程中局部变量PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列
List<Object> eventQueue = postingState.eventQueue;
//将最新的事件添加队尾
eventQueue.add(event);
//如果当前没有进行中的事件
if (!postingState.isPosting) {
//设置当前线程是否为主线程
postingState.isMainThread = isMainThread();
//设置正在发送状态
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//队列循环,将所有的事件全部执行
while (!eventQueue.isEmpty()) {
//发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
在调用post
方法的时候,会通过ThreadLocal
来确保一个线程中只有一个PostingThreadState
,然后将此次事件添加到PostingThreadState
中的eventQueue
中,然后循环遍历队列,调用postSingleEvent
发送事件。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//获取事件的类型
Class<?> eventClass = event.getClass();
//subscriptionFound 表示是否找到了接收者
boolean subscriptionFound = false;
//eventInheritance为true
if (eventInheritance) {
//得到同类型事件下所有父类和接口的类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
//对查找后的结果进行遍历
Class<?> clazz = eventTypes.get(h);
//为每个class发送事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//没有接受者的话
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
在上面的方法中,先得到事件的Class对象,然后调用lookupAllEventTypes
方法,根据事件的Class对象得到该Class对象的所有父类类型和接口类型:
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
//eventTypesCache是HashMap的实例
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
//循环遍历
eventTypes.add(clazz);
//添加对应的接口类型
addInterfaces(eventTypes, clazz.getInterfaces());
//得到父类
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
得到eventTypes
后,就为每一个Class对象发送消息:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//得到同eventClass的所有订阅方法
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//循环遍历该订阅方法
for (Subscription subscription : subscriptions) {
//事件赋值
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送消息
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
根据传入的参数类型从subscriptionsByEventType
中获取订阅信息CopyOnWriteArrayList<Subscription> subscriptions
,如果存在的话,对postingState
的event
和subscription
赋值,并且调用postToSubscription
方法,最后将event
和subscription
置为null:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅方法的threadMode来确定如何发送
switch (subscription.subscriberMethod.threadMode) {
//默认,在哪个线程发就在哪个线程执行
case POSTING:
invokeSubscriber(subscription, event);
break;
//在主线程执行
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
//在后台线程执行
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
//异步线程
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
我们一个一个的看吧:
- POSTING:它是直接调用
invokeSubscriber
方法:
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
其实上面方法就是利用反射invoke我们需要的方法。
- MAIN:先判断是不是在主线程,如果在主线程,就好办了,直接调用
invokeSubscriber
方法,利用反射invoke我们需要的方法。如果不在主线程,就通过mainThreadPoster
的enqueue
方法,让subscription, event
入队:mainThreadPoster
的类型是HandlerPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//得到pendingPost 对象
Post pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//入队
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//发送消息
//要知道,这个Looper是Main线程的looper
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
从上面的代码中,可以看出来,如果thradMode是MAIN,如果发送消息的线程不是主线程,那么就会先将需要唤醒的方法的subscription, event
入队,然后然后利用Handler发送消息,在Handler的handleMessage
方法中,调用eventBus.invokeSubscriber(pendingPost);
,利用反射来invoke方法。
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
- MAIN_ORDERED:和MAIN差不多,区别在于它是优先考虑使用
mainThreadPoster
来发送消息,也就是Handler发送消息。 - BACKGROUND:先判断是否在主线程中,如果在主线程中,就利用
backgroundPoster.enqueue(subscription, event);
来发送消息,否则直接调用invokeSubscriber(subscription, event);
来发送消息。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//获取pendingPost 的实例
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//入队
queue.enqueue(pendingPost);
//如果没有执行
if (!executorRunning) {
executorRunning = true;
//使用线程池来执行这个任务
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//invoke方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
通过上面代码,如果threadMode是BACKGROUND,如果发送消息的线程是主线程,就会利用线程池来执行一个Runnable任务,也就是BackgroundPoster
,然后在它的run方法中,通过队列去取pendingPost,然后调用eventBus.invokeSubscriber(pendingPost);
,利用反射invoke方法。
- ASYNC:直接调用了
asyncPoster.enqueue(subscription, event);
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
可以看到它和BACKGROUND
差不多,都是通过线程池来执行一个Runnable任务,然后在run方法中通过eventBus.invokeSubscriber(pendingPost);
,利用反射invoke方法。不同在于只要入队,就会用线程池来执行这一个任务。
EventBus中的executorService
是EventBuilder中的:
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可是看到,核心线程数为0,也就是线程执行完任务后,就死了。
边栏推荐
猜你喜欢
《看见新力量》第四期免费下载!走进十五位科技创业者的精彩故事
MySQL日志篇,MySQL日志之binlog日志,binlog日志详解
Can‘t connect to MySQL server on ‘localhost3306‘ (10061) 简洁明了的解决方法
What is the salary of a software testing student?
[One step in place] Jenkins installation, deployment, startup (complete tutorial)
编程大杂烩(四)
字节最爱问的智力题,你会几道?
3面头条,花7天整理了面试题和学习笔记,已正式入职半个月
Performance testing with Loadrunner
Do you think border-radius is just rounded corners?【Various angles】
随机推荐
C语言 -- 操作符详解
MySql data recovery method personal summary
编程大杂烩(三)
7.15 Day21---MySQL----索引
你以为border-radius只是圆角吗?【各种角度】
The Road to Ad Monetization for Uni-app Mini Program Apps: Full Screen Video Ads
MySQL log articles, binlog log of MySQL log, detailed explanation of binlog log
Embedded system driver primary [3] - _IO model in character device driver foundation
文献管理工具 | Zotero
OpenRefine中的正则表达式
嵌入式系统驱动初级【4】——字符设备驱动基础下_并发控制
想好了吗?
C Expert Programming Chapter 5 Thinking about Linking 5.3 5 Special Secrets of Library Linking
FPGA学习笔记——知识点总结
离线采集怎么看sql执行计划
8.03 Day34---BaseMapper query statement usage
flink cdc一启动,源端Oracle那台服务器的CPU就飙升到80%以上,会是啥原因呢?
【论文阅读笔记】无监督行人重识别中的采样策略
Camera2 闪光灯梳理
4.2 声明式事务概念