消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写AndroID应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用AndroID给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环:
一、消息机制使用
通常消息都是有一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:
mHandlerThread = new ServiceThread(TAG,Process.THREAD_PRIORITY_disPLAY,false /*allowIo*/); mHandlerThread.start(); mHandler = new PowerManagerHandler(mHandlerThread.getLooper());
这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
private final class PowerManagerHandler extends Handler { public PowerManagerHandler(Looper looper) { super(looper,null,true /*async*/); } @OverrIDe public voID handleMessage(Message msg) { switch (msg.what) { case MSG_USER_ACTIVITY_TIMEOUT: handleUserActivityTimeout(); break; case MSG_SANDMAN: handleSandman(); break; case MSG_SCREEN_BRIGHTnesS_BOOST_TIMEOUT: handleScreenBrightnessBoostTimeout(); break; case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT: checkWakeLockAquiretoolong(); Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); m.setAsynchronous(true); mHandler.sendMessageDelayed(m,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT); break; } } }
二、消息机制原理
那我们先来看下HandlerThread的主函数run函数:
public voID run() { mTID = Process.myTID(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTID = -1; }
再来看看Lopper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量中。
public static voID prepare() { prepare(true); } private static voID prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
Looper的构造函数中创建了MessageQueue
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,并且把返回值保存在mPtr显然是用long型变量保存的指针
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。
static jlong androID_os_MessageQueue_nativeInit(jnienv* env,jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env,"Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast<jlong>(nativeMessageQueue);}
NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NulL),mPollObj(NulL),mExceptionObj(NulL) { mLooper = Looper::getForThread(); if (mLooper == NulL) { mLooper = new Looper(false); Looper::setForThread(mLooper); }}
然后我们再看下Looper的构造函数,显示调用了eventfd创建了一个fd,eventfd它的主要是用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),mPolling(false),mEpollFd(-1),mEpollRebuildrequired(false),mNextRequestSeq(0),mResponseIndex(0),mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0,EFD_NONBLOCK); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0,"Could not make wake event fd. errno=%d",errno); autoMutex _l(mlock); rebuildEpollLocked();}
2.1 c层创建epoll
我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll
voID Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) {#if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set",this);#endif close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. mEpollFd = epoll_create(EPolL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0,"Could not create epoll instance. errno=%d",errno); struct epoll_event eventItem; memset(& eventItem,sizeof(epoll_event)); // zero out unused members of data fIEld union eventItem.events = EPolliN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd,EPolL_CTL_ADD,mWakeEventFd,& eventItem); LOG_ALWAYS_FATAL_IF(result != 0,"Could not add wake event fd to epoll instance. errno=%d",errno); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd,request.fd,& eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set,errno=%d",errno); } }}
继续回到HandlerThread的run函数,我们继续分析Looper的loop函数
public voID run() { mTID = Process.myTID(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTID = -1; }
我们看看Looper的loop函数:
public static voID loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue;//得到Looper的mQueue // Make sure the IDentity of this thread is that of the local process,// and keep track of what that IDentity token actually is. Binder.clearCallingIDentity(); final long IDent = Binder.clearCallingIDentity(); for (;;) { Message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_wait if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable,in case a UI event sets the logger Printer logging = me.mLogging;//自己打的打印 if (logging != null) { logging.println(">>>>> dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // IDentity of the thread wasn't corrupted. final long newIDent = Binder.clearCallingIDentity(); if (IDent != newIDent) { Log.wtf(TAG,"Thread IDentity changed from 0x" + Long.toHexString(IDent) + " to 0x" + Long.toHexString(newIDent) + " while dispatching to " + msg.target.getClass().getname() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message
Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application trIEs to restart a looper after quit // which is not supported. final long ptr = mPtr;//之前保留的指针 if (ptr == 0) { return null; } int pendingIDleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr,nextPollTimeoutMillis);
下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数
static voID androID_os_MessageQueue_nativePollOnce(jnienv* env,jobject obj,jlong ptr,jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env,obj,timeoutMillis);}
2.2 c层epoll_wait阻塞
pollOnce函数,这个函数前面的while一般都没有只是处理了indent大于0的情况,这种情况一般没有,所以我们可以直接看pollinner函数
int Looper::pollOnce(int timeoutMillis,int* outFd,int* outEvents,voID** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int IDent = response.request.IDent; if (IDent >= 0) { int fd = response.request.fd; int events = response.events; voID* data = response.request.data;#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled IDentifIEr %d: " "fd=%d,events=0x%x,data=%p",this,IDent,fd,events,data);#endif if (outFd != NulL) *outFd = fd; if (outEvents != NulL) *outEvents = events; if (outData != NulL) *outData = data; return IDent; } } if (result != 0) {#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d",result);#endif if (outFd != NulL) *outFd = 0; if (outEvents != NulL) *outEvents = 0; if (outData != NulL) *outData = NulL; return result; } result = pollinner(timeoutMillis); }}
pollinner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mWakeEventFd或者之前addFd的fd有事件过来,才会epoll_wait返回。
int Looper::pollinner(int timeoutMillis) {#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d",timeoutMillis);#endif // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t Now = systemTime(SYstem_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(Now,mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; }#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %" PRID64 "ns,adjusted timeout: timeoutMillis=%d",mNextMessageUptime - Now,timeoutMillis);#endif } // Poll. int result = PolL_WAKE; mResponses.clear();//清空mResponses mResponseIndex = 0; // We are about to IDle. mPolling = true; struct epoll_event eventItems[EPolL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd,eventItems,EPolL_MAX_EVENTS,timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的 // No longer IDling. mPolling = false; // Acquire lock. mlock.lock(); // Rebuild epoll set if needed. if (mEpollRebuildrequired) { mEpollRebuildrequired = false; rebuildEpollLocked(); goto Done; } // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll Failed with an unexpected error,errno); result = PolL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) {#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - timeout",this);#endif result = PolL_TIMEOUT; goto Done; } // Handle all events.#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds",eventCount);#endif for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) {//通知唤醒线程的事件 if (epollEvents & EPolliN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.",epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd);//之前addFd的事件 if (requestIndex >= 0) { int events = 0; if (epollEvents & EPolliN) events |= EVENT_input; if (epollEvents & EPolLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPolLERR) events |= EVENT_ERROR; if (epollEvents & EPolLHUP) events |= EVENT_HANGUP; pushResponse(events,mRequests.valueAt(requestIndex));//放在mResponses中 } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.",epollEvents,fd); } } }Done: ; // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的 nsecs_t Now = systemTime(SYstem_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= Now) { // Remove the envelope from the List. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mlock.unlock();#if DEBUG_PolL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p,what=%d",handler.get(),message.what);#endif handler->handleMessage(message); } // release handler mlock.lock(); mSendingMessage = false; result = PolL_CALLBACK; } else { // The last message left at the head of the queue deter@R_403_6386@s the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mlock.unlock(); // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) {//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调 Response& response = mResponses.editItemAt(i); if (response.request.IDent == PolL_CALLBACK) { int fd = response.request.fd; int events = response.events; voID* data = response.request.data;#if DEBUG_PolL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d,response.request.callback.get(),data);#endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd,data); if (callbackResult == 0) { removeFd(fd,response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = PolL_CALLBACK; } } return result;}
继续分析Looper的loop函数,可以增加自己的打印来调试代码,之前调用Message的target的dispatchMessage来分配消息
for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable,in case a UI event sets the logger Printer logging = me.mLogging;//自己的打印 if (logging != null) { logging.println(">>>>> dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // IDentity of the thread wasn't corrupted. final long newIDent = Binder.clearCallingIDentity(); if (IDent != newIDent) { Log.wtf(TAG,"Thread IDentity changed from 0x" + Long.toHexString(IDent) + " to 0x" + Long.toHexString(newIDent) + " while dispatching to " + msg.target.getClass().getname() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
2.3 增加调试打印
我们先来看自己添加打印,可以通过Lopper的setMessageLogging函数来打印
public voID setMessageLogging(@Nullable Printer printer) { mLogging = printer; } Printer就是一个interface public interface Printer { /** * Write a line of text to the output. There is no need to terminate * the given string with a newline. */ voID println(String x);}
2.4 java层消息分发处理
再来看消息的分发,先是调用Handler的obtainMessage函数
Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
先看obtainMessage调用了Message的obtain函数
public final Message obtainMessage(int what) { return Message.obtain(this,what); }
Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler
public static Message obtain(Handler h,int what) { Message m = obtain();//就是新建一个Message m.target = h; m.what = what; return m; }
我们再联系之前分发消息
msg.target.dispatchMessage(msg);最后就是调用Handler的dispatchMessage函数,最后在Handler中,最后会根据不同的情况对消息进行处理。
public voID dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg);//这种就是用post形式发送,带Runnable的 } else { if (mCallback != null) {//这种是handler传参的时候就是传入了mCallback回调了 if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg);//最后就是在自己实现的handleMessage处理 } }
2.3 java层 消息发送
我们再看下java层的消息发送,主要也是调用Handler的sendMessage post之类函数,最终都会调用下面这个函数
public boolean sendMessageAtTime(Message msg,long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper",e.getMessage(),e); return false; } return enqueueMessage(queue,msg,uptimeMillis); }
我们再来看java层发送消息最终都会调用enqueueMessage函数
private boolean enqueueMessage(MessageQueue queue,Message msg,long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg,uptimeMillis); }
最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数
boolean enqueueMessage(Message msg,long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG,e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean neeDWake; if (p == null || when == 0 || when < p.when) { // New head,wake up the event queue if blocked. msg.next = p; mMessages = msg; neeDWake = mBlocked; } else { // Inserted within the mIDdle of the queue. Usually we don't have to wake // up the event queue unless there is a barrIEr at the head of the queue // and the message is the earlIEst asynchronous message in the queue. neeDWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (neeDWake && p.isAsynchronous()) { neeDWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (neeDWake) { nativeWake(mPtr); } } return true; }
我们看下这个native方法,最后也是调用了Looper的wake函数
static voID androID_os_MessageQueue_nativeWake(jnienv* env,jclass clazz,jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake();}voID NativeMessageQueue::wake() { mLooper->wake();}
Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了继续先发送c层消息,然后处理之前addFd的事件,然后处理java层的消息。
voID Looper::wake() {#if DEBUG_PolL_AND_WAKE ALOGD("%p ~ wake",this);#endif uint64_t inc = 1; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd,&inc,sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { ALOGW("Could not write wake signal,errno); } }}
2.4 c层发送消息
在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数有有一个handler是一个回调,我们把消息放在mMessageEnvelopes中。
voID Looper::sendMessageAtTime(nsecs_t uptime,const sp<MessageHandler>& handler,const Message& message) {#if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%" PRID64 ",handler=%p,uptime,message.what);#endif size_t i = 0; { // acquire lock autoMutex _l(mlock); size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime,handler,message); mMessageEnvelopes.insertAt(messageEnvelope,i,1); // Optimization: If the Looper is currently sending a message,then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decIDe when the next wakeup time should be. In fact,it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // release lock // Wake the poll loop only when we enqueue a new message at the head. if (i == 0) { wake(); }}
当在pollOnce中,在epoll_wait之后,会遍历mMessageEnvelopes中的消息,然后调用其handler的handleMessage函数
while (mMessageEnvelopes.size() != 0) { nsecs_t Now = systemTime(SYstem_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= Now) { // Remove the envelope from the List. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mlock.unlock();#if DEBUG_PolL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p,message.what);#endif handler->handleMessage(message); } // release handler mlock.lock(); mSendingMessage = false; result = PolL_CALLBACK; } else { // The last message left at the head of the queue deter@R_403_6386@s the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } }
有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下
sp<StubMessageHandler> handler = new StubMessageHandler(); mLooper->sendMessageAtTime(Now + ms2ns(100),Message(MSG_TEST1)); StubMessageHandler继承MessageHandler就必须实现handleMessage方法 class StubMessageHandler : public MessageHandler {public: Vector<Message> messages; virtual voID handleMessage(const Message& message) { messages.push(message); }};
我们再顺便看下Message和MessageHandler类
struct Message { Message() : what(0) { } Message(int what) : what(what) { } /* The message type. (interpretation is left up to the handler) */ int what;};/** * Interface for a Looper message handler. * * The Looper holds a strong reference to the message handler whenever it has * a message to deliver to it. Make sure to call Looper::removeMessages * to remove any pending messages destined for the handler so that the handler * can be destroyed. */class MessageHandler : public virtual RefBase {protected: virtual ~MessageHandler() { }public: /** * Handles a message. */ virtual voID handleMessage(const Message& message) = 0;};
2.5 c层addFd
我们也可以在Looper.cpp的addFd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addFd函数,我们注意其中有一个callBack回调
int Looper::addFd(int fd,int IDent,int events,Looper_callbackFunc callback,voID* data) { return addFd(fd,callback ? new SimpleLooperCallback(callback) : NulL,data);}int Looper::addFd(int fd,const sp<LooperCallback>& callback,voID* data) {#if DEBUG_CALLBACKS ALOGD("%p ~ addFd - fd=%d,IDent=%d,callback=%p,callback.get(),data);#endif if (!callback.get()) { if (! mAllowNonCallbacks) { ALOGE("InvalID attempt to set NulL callback but not allowed for this looper."); return -1; } if (IDent < 0) { ALOGE("InvalID attempt to set NulL callback with IDent < 0."); return -1; } } else { IDent = PolL_CALLBACK; } { // acquire lock autoMutex _l(mlock); Request request; request.fd = fd; request.IDent = IDent; request.events = events; request.seq = mNextRequestSeq++; request.callback = callback; request.data = data; if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1 struct epoll_event eventItem; request.initEventItem(&eventItem); ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { int epollResult = epoll_ctl(mEpollFd,& eventItem);//加入epoll if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d,errno); return -1; } mRequests.add(fd,request);//放入mRequests中 } else { int epollResult = epoll_ctl(mEpollFd,EPolL_CTL_MOD,& eventItem);//更新 if (epollResult < 0) { if (errno == ENOENT) { // Tolerate ENOENT because it means that an older file descriptor was // closed before its callback was unregistered and meanwhile a new // file descriptor with the same number has been created and is Now // being registered for the first time. This error may occur naturally // when a callback has the sIDe-effect of closing the file descriptor // before returning and unregistering itself. Callback sequence number // checks further ensure that the race is benign. // // Unfortunately due to kernel limitations we need to rebuild the epoll // set from scratch because it may contain an old file handle that we are // Now unable to remove since its file descriptor is no longer valID. // No such problem would have occurred if we were using the poll system // call instead,but that approach carrIEs others disadvantages.#if DEBUG_CALLBACKS ALOGD("%p ~ addFd - EPolL_CTL_MOD Failed due to file descriptor " "being recycled,falling back on EPolL_CTL_ADD,errno);#endif epollResult = epoll_ctl(mEpollFd,& eventItem); if (epollResult < 0) { ALOGE("Error modifying or adding epoll events for fd %d,errno); return -1; } scheduleEpollRebuildLocked(); } else { ALOGE("Error modifying epoll events for fd %d,errno); return -1; } } mRequests.replaceValueAt(requestIndex,request); } } // release lock return 1;}
在pollOnce函数中,我们先寻找mRequests中匹配的fd,然后在pushResponse中新建一个Response,然后把Response和Request匹配起来。
} else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPolliN) events |= EVENT_input; if (epollEvents & EPolLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPolLERR) events |= EVENT_ERROR; if (epollEvents & EPolLHUP) events |= EVENT_HANGUP; pushResponse(events,mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.",fd); } }
下面我们就会遍历mResponses中的Response,然后调用其request中的回调
for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.IDent == PolL_CALLBACK) { int fd = response.request.fd; int events = response.events; voID* data = response.request.data;#if DEBUG_PolL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d,response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = PolL_CALLBACK; } }
同样我们再来看看Looper_test.cpp是如何使用的?
Pipe pipe; StubCallbackHandler handler(true); handler.setCallback(mLooper,pipe.receiveFd,Looper::EVENT_input);
我们看下handler的setCallback函数
class CallbackHandler {public: voID setCallback(const sp<Looper>& looper,int fd,int events) { looper->addFd(fd,staticHandler,this);//就是调用了looper的addFd函数,并且回调 }protected: virtual ~CallbackHandler() { } virtual int handler(int fd,int events) = 0;private: static int staticHandler(int fd,voID* data) {//这个就是回调函数 return static_cast<CallbackHandler*>(data)->handler(fd,events); }};class StubCallbackHandler : public CallbackHandler {public: int nextResult; int callbackCount; int fd; int events; StubCallbackHandler(int nextResult) : nextResult(nextResult),callbackCount(0),fd(-1),events(-1) { }protected: virtual int handler(int fd,int events) {//这个是通过回调函数再调到这里的 callbackCount += 1; this->fd = fd; this->events = events; return nextResult; }};
我们结合Looper的addFd一起来看,当callback是有的,我们新建一个SimpleLooperCallback
int Looper::addFd(int fd,data);}
这里的Looper_callbackFunc是一个typedef
typedef int (*Looper_callbackFunc)(int fd,voID* data);
我们再来看SimpleLooperCallback
class SimpleLooperCallback : public LooperCallback {protected: virtual ~SimpleLooperCallback();public: SimpleLooperCallback(Looper_callbackFunc callback); virtual int handleEvent(int fd,voID* data);private: Looper_callbackFunc mCallback;};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) : mCallback(callback) {}SimpleLooperCallback::~SimpleLooperCallback() {}int SimpleLooperCallback::handleEvent(int fd,voID* data) { return mCallback(fd,data);}
最后我们是调用callback->handleEvent(fd,data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler 的this指针
因此最后就是调用了staticHandler,而data->handler,就是this->handler,最后是虚函数就调用到了StubCallbackHandler 的handler函数中了。
当然我们也可以不用这么复杂,直接使用第二个addFd函数,当然callBack我们需要自己定义一个类来实现LooperCallBack类就行了,这样就简单多了。
int addFd(int fd,voID* data);
2.6 java层addFd
一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。
我们可以在MessageQueue中的addOnfileDescriptorEventListener来实现这个功能
public voID addOnfileDescriptorEventListener(@NonNull fileDescriptor fd,@OnfileDescriptorEventListener.Events int events,@NonNull OnfileDescriptorEventListener Listener) { if (fd == null) { throw new IllegalArgumentException("fd must not be null"); } if (Listener == null) { throw new IllegalArgumentException("Listener must not be null"); } synchronized (this) { updateOnfileDescriptorEventListenerLocked(fd,Listener); } }
我们再来看看OnfileDescriptorEventListener 这个回调
public interface OnfileDescriptorEventListener { public static final int EVENT_input = 1 << 0; public static final int EVENT_OUTPUT = 1 << 1; public static final int EVENT_ERROR = 1 << 2; /** @hIDe */ @Retention(RetentionPolicy.soURCE) @IntDef(flag=true,value={EVENT_input,EVENT_OUTPUT,EVENT_ERROR}) public @interface Events {} @Events int onfileDescriptorEvents(@NonNull fileDescriptor fd,@Events int events); }
接着调用了updateOnfileDescriptorEventListenerLocked函数
private voID updateOnfileDescriptorEventListenerLocked(fileDescriptor fd,OnfileDescriptorEventListener Listener) { final int fdNum = fd.getInt$(); int index = -1; fileDescriptorRecord record = null; if (mfileDescriptorRecords != null) { index = mfileDescriptorRecords.indexOfKey(fdNum); if (index >= 0) { record = mfileDescriptorRecords.valueAt(index); if (record != null && record.mEvents == events) { return; } } } if (events != 0) { events |= OnfileDescriptorEventListener.EVENT_ERROR; if (record == null) { if (mfileDescriptorRecords == null) { mfileDescriptorRecords = new SparseArray<fileDescriptorRecord>(); } record = new fileDescriptorRecord(fd,Listener);//fd保存在fileDescriptorRecord对象 mfileDescriptorRecords.put(fdNum,record);//mfileDescriptorRecords然后保存在 } else { record.mListener = Listener; record.mEvents = events; record.mSeq += 1; } nativeSetfileDescriptorEvents(mPtr,fdNum,events);//调用native函数 } else if (record != null) { record.mEvents = 0; mfileDescriptorRecords.removeAt(index); } }
native最后调用了NativeMessageQueue的setfileDescriptorEvents函数
static voID androID_os_MessageQueue_nativeSetfileDescriptorEvents(jnienv* env,jint fd,jint events) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->setfileDescriptorEvents(fd,events);}
setfileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback
voID NativeMessageQueue::setfileDescriptorEvents(int fd,int events) { if (events) { int looperEvents = 0; if (events & CALLBACK_EVENT_input) { looperEvents |= Looper::EVENT_input; } if (events & CALLBACK_EVENT_OUTPUT) { looperEvents |= Looper::EVENT_OUTPUT; } mLooper->addFd(fd,Looper::PolL_CALLBACK,looperEvents,reinterpret_cast<voID*>(events)); } else { mLooper->removeFd(fd); }}
果然是,需要实现handleEvent函数
class NativeMessageQueue : public MessageQueue,public LooperCallback {public: NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual voID raiseException(jnienv* env,const char* msg,jthrowable exceptionObj); voID pollOnce(jnienv* env,int timeoutMillis); voID wake(); voID setfileDescriptorEvents(int fd,int events); virtual int handleEvent(int fd,voID* data);
handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数
int NativeMessageQueue::handleEvent(int fd,int looperEvents,voID* data) { int events = 0; if (looperEvents & Looper::EVENT_input) { events |= CALLBACK_EVENT_input; } if (looperEvents & Looper::EVENT_OUTPUT) { events |= CALLBACK_EVENT_OUTPUT; } if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) { events |= CALLBACK_EVENT_ERROR; } int olDWatchedEvents = reinterpret_cast<intptr_t>(data); int newWatchedEvents = mPollEnv->CallintMethod(mPollObj,gMessageQueueClassInfo.dispatchEvents,events); //调用回调 if (!newWatchedEvents) { return 0; // unregister the fd } if (newWatchedEvents != olDWatchedEvents) { setfileDescriptorEvents(fd,newWatchedEvents); } return 1;}
最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数
// Called from native code. private int dispatchEvents(int fd,int events) { // Get the file descriptor record and any state that might change. final fileDescriptorRecord record; final int olDWatchedEvents; final OnfileDescriptorEventListener Listener; final int seq; synchronized (this) { record = mfileDescriptorRecords.get(fd);//通过fd得到fileDescriptorRecord if (record == null) { return 0; // spurIoUs,no Listener registered } olDWatchedEvents = record.mEvents; events &= olDWatchedEvents; // filter events based on current watched set if (events == 0) { return olDWatchedEvents; // spurIoUs,watched events changed } Listener = record.mListener; seq = record.mSeq; } // Invoke the Listener outsIDe of the lock. int newWatchedEvents = Listener.onfileDescriptorEvents(//Listener回调 record.mDescriptor,events); if (newWatchedEvents != 0) { newWatchedEvents |= OnfileDescriptorEventListener.EVENT_ERROR; } // Update the file descriptor record if the Listener changed the set of // events to watch and the Listener itself hasn't been updated since. if (newWatchedEvents != olDWatchedEvents) { synchronized (this) { int index = mfileDescriptorRecords.indexOfKey(fd); if (index >= 0 && mfileDescriptorRecords.valueAt(index) == record && record.mSeq == seq) { record.mEvents = newWatchedEvents; if (newWatchedEvents == 0) { mfileDescriptorRecords.removeAt(index); } } } } // Return the new set of events to watch for native code to take care of. return newWatchedEvents; }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。
总结以上是内存溢出为你收集整理的Android6.0 消息机制原理解析全部内容,希望文章能够帮你解决Android6.0 消息机制原理解析所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)