Android中的Handler

2021/03/12 note 共 14136 字,约 41 分钟

Handler是Android中最常用的消息机制,具体指示涉及到Looper、MessageQueue、ThreadLocal、binder、epoll的知识。

0、先说自己的总结

  • Looper负责从MessageQueue中循环取消息并分发,通过ThreadLocal机制保证每个线程只有一个Looper实例,Looper.getMainLooper()所在的线程就是主线程
  • MessageQueue存储维护了一个有序链表,并通过epoll机制实现消息实时处理
  • Handler负责发送Message,并在Looper取到消息后进行消息实际回调处理

以下基于android-30的源码进行分析。

1、从Looper说起

Looper中常用的是两个方法:prepare()loop()

prepare()方法

prepare()方法很简单,将Looper实例加入到ThreadLocal线程变量副本中,如果已经初始化过,会抛出运行时异常Only one Looper may be created per thread

// 在线程中初始化looper的常用方法  
public static void prepare() {  
    prepare(true);  
}  
  
private static void prepare(boolean quitAllowed) {  
    // 通过ThreadLocal的机制,保证每个线程只有一个Looper实例  
    if (sThreadLocal.get() != null) {  
        throw new RuntimeException("Only one Looper may be created per thread");  
    }  
    sThreadLocal.set(new Looper(quitAllowed));  
}  
...  
// 初始化主线程looper的方法,这个方法不需要我们手动调用,在App启动阶段已经有ActivityThread调用过  
public static void prepareMainLooper() {  
    prepare(false);  
    synchronized (Looper.class) {  
        if (sMainLooper != null) {  
            throw new IllegalStateException("The main Looper has already been prepared.");  
        }  
        sMainLooper = myLooper();  
    }  
}  
...  
public static @Nullable Looper myLooper() {  
    return sThreadLocal.get();  
}  
  
// 私有的构造方法,在这里初始化了Looper对应的MessageQ对象,保证一一对应  
private Looper(boolean quitAllowed) {  
    mQueue = new MessageQueue(quitAllowed);  
    mThread = Thread.currentThread();  
}  

可以看到,Looper内通过ThreadLocal机制保证每个线程只能由一个Looper实例和一个MessageQueue实例。
关于ThreadLocal机制,这个不是本文的重点,可以参考文章:Java并发编程:深入剖析ThreadLocal

loop()方法

loop()就是开启循环的地方,循环读取消息并分发,调用这个方法后当前线程会进入阻塞状态,在它后面的代码不会被执行到:

/**  
 * Run the message queue in this thread. Be sure to call  
 * {@link #quit()} to end the loop.  
 */  
public static void loop() {  
    // 获取当前线程的looper实例,基于前面的ThreadLocal原理  
    final Looper me = myLooper();  
    if (me == null) {  
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");  
    }  
    if (me.mInLoop) {  
        Slog.w(TAG, "Loop again would have the queued messages be executed"  
                + " before this one completed.");  
    }  
  
    me.mInLoop = true;  
    // 每一个Looper都有唯一的一个MessageQueue  
    final MessageQueue queue = me.mQueue;  
  
    // Make sure the identity of this thread is that of the local process,  
    // and keep track of what that identity token actually is.  
    Binder.clearCallingIdentity();  
    final long ident = Binder.clearCallingIdentity();  
  
    // Allow overriding a threshold with a system prop. e.g.  
    // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'  
    final int thresholdOverride =  
            SystemProperties.getInt("log.looper."  
                    + Process.myUid() + "."  
                    + Thread.currentThread().getName()  
                    + ".slow", 0);  
  
    boolean slowDeliveryDetected = false;  
  
    // 开启死循环  
    for (;;) {  
        // 从MessageQueue中取下一条消息,next()方法实际是epoll的机制保证没有消息时的线程挂起状态  
        Message msg = queue.next(); // might block  
        if (msg == null) {  
            // No message indicates that the message queue is quitting.  
            return;  
        }  
  
        // This must be in a local variable, in case a UI event sets the logger  
        // 如果我们手动给Handler设置logging回调,可以通过监听Dispatching和Finished之间的间隔判断是否发生ANR  
        final Printer logging = me.mLogging;  
        if (logging != null) {  
            logging.println(">>>>> Dispatching to " + msg.target + " " +  
                    msg.callback + ": " + msg.what);  
        }  
        // Make sure the observer won't change while processing a transaction.  
        final Observer observer = sObserver; // app内不能直接设置observer,hide隐藏方法,这里略过不看  
  
        final long traceTag = me.mTraceTag;  
        long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;  
        long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;  
        if (thresholdOverride > 0) {  
            slowDispatchThresholdMs = thresholdOverride;  
            slowDeliveryThresholdMs = thresholdOverride;  
        }  
        final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);  
        final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);  
  
        final boolean needStartTime = logSlowDelivery || logSlowDispatch;  
        final boolean needEndTime = logSlowDispatch;  
  
        if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {  
            Trace.traceBegin(traceTag, msg.target.getTraceName(msg));  
        }  
  
        final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;  
        final long dispatchEnd;  
        Object token = null;  
        if (observer != null) {  
            token = observer.messageDispatchStarting();  
        }  
        long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);  
        try {  
            // 将msg交给target的Handler去处理消息  
            msg.target.dispatchMessage(msg);  
            if (observer != null) {  
                observer.messageDispatched(token, msg);  
            }  
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;  
        } catch (Exception exception) {  
            if (observer != null) {  
                observer.dispatchingThrewException(token, msg, exception);  
            }  
            throw exception;  
        } finally {  
            ThreadLocalWorkSource.restore(origWorkSource);  
            if (traceTag != 0) {  
                Trace.traceEnd(traceTag);  
            }  
        }  
        if (logSlowDelivery) {  
            if (slowDeliveryDetected) {  
                if ((dispatchStart - msg.when) <= 10) {  
                    Slog.w(TAG, "Drained");  
                    slowDeliveryDetected = false;  
                }  
            } else {  
                if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",  
                        msg)) {  
                    // Once we write a slow delivery log, suppress until the queue drains.  
                    slowDeliveryDetected = true;  
                }  
            }  
        }  
        if (logSlowDispatch) {  
            showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);  
        }  
  
        if (logging != null) {  
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);  
        }  
  
        // Make sure that during the course of dispatching the  
        // identity of the thread wasn't corrupted.  
        final long newIdent = Binder.clearCallingIdentity();  
        if (ident != newIdent) {  
            Log.wtf(TAG, "Thread identity changed from 0x"  
                    + Long.toHexString(ident) + " to 0x"  
                    + Long.toHexString(newIdent) + " while dispatching to "  
                    + msg.target.getClass().getName() + " "  
                    + msg.callback + " what=" + msg.what);  
        }  
  
        // 调用msg的回收方法,将Message内的成员变量置空,并将其加入到缓存链表中,下次调用Message.obtain()方法就可以复用  
        msg.recycleUnchecked();  
    }  
}  

可以看出,Looper的loop方法开启了一个死循环,循环的将MessageQueue中取到的消息分发到target对应的Handler中进行处理。
Message是一个链表结构,通过next变量存储下一个Message,当调用到recycleUnchecked方法时,会将内部变量置空,并加入到Message的静态变量sPool的链表中,供缓存使用。

MainLooper的初始化时机

App在启动时,Zygote进程会反射到ActivityThread类的main方法,在这里进行了MainLooper的初始化。
有关App的启动流程不是本文重点,参考文章:App 启动过程-AndroidOfferKiller

// http://androidxref.com/8.1.0_r33/xref/frameworks/base/core/java/android/app/ActivityThread.java#6459  
public static void main(String[] args) {  
    ...  
    Looper.prepareMainLooper();  
  
    ActivityThread thread = new ActivityThread();  
    thread.attach(false);  
    ...  
    Looper.loop();  
      
   throw new RuntimeException("Main thread loop unexpectedly exited");  
}  
  
// http://androidxref.com/8.1.0_r33/xref/frameworks/base/core/java/android/app/ActivityThread.java#6315  
private void attach(boolean system) {  
    sCurrentActivityThread = this;  
    mSystemThread = system;  
    if (!system) {  
        ...  
        final IActivityManager mgr = ActivityManager.getService();  
        try {  
            mgr.attachApplication(mAppThread);  
        } catch (RemoteException ex) {  
            ...  
        }  
        ...  
    }  
    ...  
}  

main方法很简单,loop方法调用后当前线程会进入阻塞状态,可以认为ActivityThread的main方法执行的线程就是主线程。
attach方法中,和ActivityManagerService(AMS)完成绑定,AMS会在后续中通过binder机制将消息传入到ActivityThread中的内部Handler变量mH中,完成生命周期的回调。
由于binder机制有线程池机制,所以ActivityThread接收到的AMS回调是执行在binder线程中的,再通过handler将消息发送到主线程执行。有关binder线程池机制介绍参考:Binder Driver浅析:Binder线程池

2、Handler源码分析

Handler的源码相对简单一点,它负责发送消息和处理消息。

Handler的构造方法

// 最常用的构造函数是这个  
@Deprecated  
public Handler() {  
    this(null, false);  
}  
  
// 这个函数是hide的,不能调用,主要是因为async这个变量,async=true表示handler发送的所有消息都是异步的,不受同步屏障影响  
// @hide  
public Handler(@Nullable Callback callback, boolean async) {  
    if (FIND_POTENTIAL_LEAKS) {// 默认为false,用来检查非static的Handler实例,防止内存泄漏  
        final Class<? extends Handler> klass = getClass();  
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&  
                (klass.getModifiers() & Modifier.STATIC) == 0) {  
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +  
                klass.getCanonicalName());  
        }  
    }  
  
    // 获取当前线程的looper对象,通过ThreadLocal机制  
    mLooper = Looper.myLooper();  
    if (mLooper == null) {  
        throw new RuntimeException(  
            "Can't create handler inside thread " + Thread.currentThread()  
                    + " that has not called Looper.prepare()");  
    }  
    // 获取和looper一一对应的MessageQueue对象  
    mQueue = mLooper.mQueue;  
    mCallback = callback;  
    mAsynchronous = async;  
}  

post和postDelayed方法

发送消息最常用的就是post和postDelayed方法:

public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {  
    return sendMessageDelayed(getPostMessage(r), delayMillis);  
}  
  
public final boolean post(@NonNull Runnable r) {  
   // 可以看到,post内部调用的也是sendMessageDelayed方法,和postDelayed一致  
   return  sendMessageDelayed(getPostMessage(r), 0);  
}  
  
private static Message getPostMessage(Runnable r) {  
    // 通过obtain方法复用之前的Message对象  
    Message m = Message.obtain();  
    m.callback = r;  
    return m;  
}  
  
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {  
    if (delayMillis < 0) {  
        delayMillis = 0;  
    }  
    // 转换为确定时间  
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);  
}  
  
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {  
    // mQueue是在Handler的构造里面初始化的,拿到的是Looper内的queue实例,线程内唯一  
    MessageQueue queue = mQueue;  
    if (queue == null) {  
        RuntimeException e = new RuntimeException(  
                this + " sendMessageAtTime() called with no mQueue");  
        Log.w("Looper", e.getMessage(), e);  
        return false;  
    }  
    return enqueueMessage(queue, msg, uptimeMillis);  
}  
  
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,  
        long uptimeMillis) {  
    // 将target设置为handler自己,给Looper后面分发消息使用  
    msg.target = this;  
    msg.workSourceUid = ThreadLocalWorkSource.getUid();  
   // 判断是否有异步消息标志,正常开发中这个标志位是不能设置为true的,只有系统函数才能设置,这个可以通过反射或者编译时提供provide jar包的方式调用到  
    // 异步消息主要和同步屏障相对应,这个后面MessageQueue中会说到  
    if (mAsynchronous) {  
        msg.setAsynchronous(true);  
    }  
   // 调用MessageQueue的enqueueMessage方法,将消息加入到队列中  
    return queue.enqueueMessage(msg, uptimeMillis);  
}  

那Handler是怎么分发消息的呢,通过Looper的loop方法代码可以看到,Looper从MessageQueue中取到消息后,会通过msg.target.dispatchMessage(msg)执行Handler的dispatchMessage方法。
那看一下dispatchMessage方法:

public void dispatchMessage(@NonNull Message msg) {  
    // 首先判断msg的callback是否为null,这个就是之前post方法内的Runnable对象  
    // handleCallback方法内部其实调用的就是message.callback.run(),直接执行Runnable内的run方法  
    if (msg.callback != null) {  
        handleCallback(msg);  
    } else {  
        // 再判断handler本身的callback是否为null,如果不为null,交给callback处理  
        if (mCallback != null) {  
            if (mCallback.handleMessage(msg)) {  
                return;  
            }  
        }  
        // 最后是handler内可重写的方法handleMessage  
        handleMessage(msg);  
    }  
}  

Handler在子线程中的使用

在子线程中使用Handler机制来发送消息,有两种方式。

第一种就是在子线程中直接初始化Looper,直接用Handler即可:

new Thread(new Runnable() {  
    @Override  
    public void run() {  
        Looper.prepare();  
        Handler handler = new Handler() {  
            @Override  
            public void handleMessage(@NonNull Message msg) {  
                // 在这里处理消息  
            }  
        };  
          
        Looper.loop();  
    }  
}).start();  

第二种方式是使用HandlerThread,它其实是对上面代码进行的封装,它本身继承自Thread,并在run方法内调用了Looper.prepare()和Looper.loop();:

HandlerThread handlerThread = new HandlerThread("test");  
handlerThread.start(); // 启动一个HandlerThread线程  
Handler handler = new Handler(handlerThread.getLooper()) {  
    @Override  
    public void handleMessage(@NonNull Message msg) {  
        // 在这里处理消息  
    }  
};  

3、MessageQueue源码分析

MessageQueue是消息队列存放处,通过epoll机制提供了消息队列的读取功能。

epoll机制是Linux中kernel常用的IO多路复用机制,参考文章:Android 消息处理以及epoll机制

MessageQueue的构造方法

MessageQueue(boolean quitAllowed) {  
    mQuitAllowed = quitAllowed;  
    mPtr = nativeInit(); // epoll机制的初始化,通过mPtr拿到fd的引用  
}  

enqueueMessage方法

enqueueMessage方法是放置消息的入口方法,这个方法主要是Handler最终调用,通过前面的Handler可以看到,Message消息自己有target和callback保证回调,另外一个参数when表示确定执行的时间。

boolean enqueueMessage(Message msg, long when) {  
    // 通过handler发送的消息必须有target  
    // 没有target的消息是同步屏障消息,这个下面会看到  
    if (msg.target == null) {  
        throw new IllegalArgumentException("Message must have a target.");  
    }  
  
    // 同步锁  
    synchronized (this) {  
        if (msg.isInUse()) {  
            throw new IllegalStateException(msg + " This message is already in use.");  
        }  
  
        // 对于主线程来说,没有退出机制,主线程queue退出也就代表着App无响应或退出了  
        if (mQuitting) {  
            IllegalStateException e = new IllegalStateException(  
                    msg.target + " sending message to a Handler on a dead thread");  
            Log.w(TAG, e.getMessage(), e);  
            msg.recycle();  
            return false;  
        }  
  
        // 将Message标为已经使用  
        msg.markInUse();  
        msg.when = when; // 设置when确定时间  
        Message p = mMessages; // mMessages是链表的根Message  
        boolean needWake;  
        if (p == null || when == 0 || when < p.when) { // 当前消息当做头消息  
            // New head, wake up the event queue if blocked.  
            msg.next = p;  
            mMessages = msg;  
            needWake = mBlocked;  
        } else {  
            // Inserted within the middle of the queue.  Usually we don't have to wake  
            // up the event queue unless there is a barrier at the head of the queue  
            // and the message is the earliest asynchronous message in the queue.  
            needWake = mBlocked && p.target == null && msg.isAsynchronous();  
            Message prev;  
            for (;;) { // 循环,根据when的先后顺序,找到msg应该插入的位置  
                prev = p;  
                p = p.next;  
                if (p == null || when < p.when) {  
                    break;  
                }  
                if (needWake && p.isAsynchronous()) {  
                    needWake = false;  
                }  
            }  
            msg.next = p; // invariant: p == prev.next  
            prev.next = msg;  
        }  
  
        // We can assume mPtr != 0 because mQuitting is false.  
        if (needWake) {  
            nativeWake(mPtr); // epoll机制的唤醒操作  
        }  
    }  
    return true;  
}  

next方法

next方法是取出下一条Message的方法,主要是Looper.loop方法内调用,循环读取下一条消息。

@UnsupportedAppUsage  
Message next() {  
    // Return here if the message loop has already quit and been disposed.  
    // This can happen if the application tries to restart a looper after quit  
    // which is not supported.  
    final long ptr = mPtr; // mPtr是epoll机制的fd标志  
    if (ptr == 0) {  
        return null;  
    }  
  
    int pendingIdleHandlerCount = -1; // -1 only during first iteration  
    int nextPollTimeoutMillis = 0;  
    for (;;) {  
        if (nextPollTimeoutMillis != 0) {  
            Binder.flushPendingCommands();  
        }  
  
        // epoll的阻塞方法,挂起当前线程,直到超时或者被nativeWake(mPtr)唤醒  
        nativePollOnce(ptr, nextPollTimeoutMillis);  
  
        synchronized (this) {  
            // Try to retrieve the next message.  Return if found.  
            final long now = SystemClock.uptimeMillis();  
            Message prevMsg = null;  
            Message msg = mMessages;  
            // msg.target如果是null,表示是同步屏障消息  
            // 那么,取到第一条异步消息  
            // 如果没有同步屏障,那么就取当前第一条消息  
            if (msg != null && msg.target == null) {  
                // Stalled by a barrier.  Find the next asynchronous message in the queue.  
                do {  
                    prevMsg = msg;  
                    msg = msg.next;  
                } while (msg != null && !msg.isAsynchronous());  
            }  
            if (msg != null) {  
                if (now < msg.when) { // 如果当前时间还不到头msg的执行时间,设置下次取消息的休眠时间,进入下一次for循环  
                    // Next message is not ready.  Set a timeout to wake up when it is ready.  
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);  
                } else { // 否则,直接返回当前消息  
                    // Got a message.  
                    mBlocked = false; // 将mBlocked置为false,表示不阻塞,和enqueueMessage中对应  
                    if (prevMsg != null) {  
                        prevMsg.next = msg.next;  
                    } else {  
                        mMessages = msg.next;  
                    }  
                    msg.next = null;  
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);  
                    msg.markInUse();  
                    return msg;  
                }  
            } else {  
                // No more messages.  
                nextPollTimeoutMillis = -1;  
            }  
  
            // Process the quit message now that all pending messages have been handled.  
            if (mQuitting) {  
                dispose();  
                return null;  
            }  
  
            // 如果当前没有消息需要处理,进入到IdleHandler的处理内  
            // 通过pendingIdleHandlerCount这个变量表示是否进行过IdleHandler的处理  
            // If first time idle, then get the number of idlers to run.  
            // Idle handles only run if the queue is empty or if the first message  
            // in the queue (possibly a barrier) is due to be handled in the future.  
            if (pendingIdleHandlerCount < 0  
                    && (mMessages == null || now < mMessages.when)) {  
                pendingIdleHandlerCount = mIdleHandlers.size();  
            }  
            if (pendingIdleHandlerCount <= 0) {  
                // No idle handlers to run.  Loop and wait some more.  
                mBlocked = true;  
                continue;  
            }  
  
            if (mPendingIdleHandlers == null) {  
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];  
            }  
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);  
        } // end synchronized (this)  
  
        // 如果当前没有消息需要处理,进入到IdleHandler的处理内  
        // Run the idle handlers.  
        // We only ever reach this code block during the first iteration.  
        for (int i = 0; i < pendingIdleHandlerCount; i++) {  
            final IdleHandler idler = mPendingIdleHandlers[i];  
            mPendingIdleHandlers[i] = null; // release the reference to the handler  
  
            boolean keep = false;  
            try {  
                keep = idler.queueIdle();  
            } catch (Throwable t) {  
                Log.wtf(TAG, "IdleHandler threw exception", t);  
            }  
  
            if (!keep) {  
                synchronized (this) {  
                    mIdleHandlers.remove(idler);  
                }  
            }  
        }  
  
        // Reset the idle handler count to 0 so we do not run them again.  
        pendingIdleHandlerCount = 0;  
  
        // While calling an idle handler, a new message could have been delivered  
        // so go back and look again for a pending message without waiting.  
        nextPollTimeoutMillis = 0;  
    }  
}  

文档信息

Search

    Table of Contents