封装了各种类型的线程池,方便直接使用
看下有哪些类型:
默认线程池,搜索模块专用线程池,网络请求专用线程池,U盘更新,同步SDK读写操作线程池,日志打印使用线程池
DEFALUT,SEARCH,NET_WORK,UDISK_DOWNLOAD,SDK_IO,LOG_PRINT
看下有哪些优先级级别
UI_TOP, UI_NORMAL, UI_LOW, DEFAULT, BG_TOP, BG_NORMAL, BG_LOW;
一:实现默认线程池
接下来先实现一个默认线程池:等待队列、优先级比较、核心线程数等都提供默认实现
开放 核心线程数,线程创建的工厂类(可以定制线程的优先级)供 外部定制(你也可以开放更多的参数)
public class TaskPriorityExecutor implements Executor { private static final int CORE_POOL_SIZE = 3; private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 30; private static final Comparator<Runnable> RUNNABLE_COMPARATOR = new Comparator<Runnable>() { @Override public int compare(Runnable lhs, Runnable rhs) { if (lhs instanceof TaskPriorityRunnable && rhs instanceof TaskPriorityRunnable) { return ((TaskPriorityRunnable) lhs).priority.ordinal() - ((TaskPriorityRunnable) rhs).priority.ordinal(); } else { return 0; } } }; private final BlockingQueue<Runnable> mPoolWorkQueue = new PriorityBlockingQueue<Runnable>(MAXIMUM_POOL_SIZE, RUNNABLE_COMPARATOR); private final ThreadPoolExecutor mThreadPoolExecutor; public TaskPriorityExecutor() { this(CORE_POOL_SIZE, new TaskThreadFactory("Defalut")); } public TaskPriorityExecutor(int poolSize, ThreadFactory threadFactory) { mThreadPoolExecutor = new ThreadPoolExecutor( poolSize, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, mPoolWorkQueue, threadFactory); } /** * 该构造器会创建单线程池执行任务 */ public TaskPriorityExecutor(ThreadFactory threadFactory) { mThreadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public int getPoolSize() { return mThreadPoolExecutor.getCorePoolSize(); } public void setPoolSize(int poolSize) { if (poolSize > 0) { mThreadPoolExecutor.setCorePoolSize(poolSize); } // Executors.newCachedThreadPool() // Executors.newFixedThreadPool() // Executors.newScheduledThreadPool() // Executors.newSingleThreadExecutor() // Executors.newSingleThreadScheduledExecutor() } public ThreadPoolExecutor getThreadPoolExecutor() { return mThreadPoolExecutor; } /** * 线程池忙 */ public boolean isBusy() { return mThreadPoolExecutor.getActiveCount() >= mThreadPoolExecutor.getCorePoolSize(); } /** * 线程池超载 */ public boolean isFull() { return 
mThreadPoolExecutor.getActiveCount() >= mThreadPoolExecutor.getCorePoolSize() * 2; } public boolean isShutdown(){ return mThreadPoolExecutor.isShutdown(); } @Override public void execute(final Runnable r) { mThreadPoolExecutor.execute(r); }
二:根据接口定制业务场景的线程池
接下来我们就基于上面开放的接口,按照不同的业务需要来配置各自的线程池
final class TaskProxy< extends Task<ResultType> { private final Task<ResultType> task; private Executor executor; /*package*/ static final InternalHandler INTERNAL_HANDLER = new InternalHandler(); static TaskPriorityExecutor sDefaultExecutor = new TaskPriorityExecutor(); static TaskPriorityExecutor sSearchExecutor = null; static TaskPriorityExecutor sNetExecutor = null; static TaskPriorityExecutor sAE8EngineExecutor = null; static TaskPriorityExecutor sUDiskExecutor = null; static TaskPriorityExecutor sTtsInitializeExecutor = null; static TaskPriorityExecutor sSyncSdkIOHandleExecutor = null; static TaskPriorityExecutor sAdapterExecutor = null; static TaskPriorityExecutor sLoggerExecutor = null; static TaskPriorityExecutor sActivateLogExecutor = null; private ResultType result; private Throwable exception; private CancelledException cancelledException; /*package*/ TaskProxy(Task<ResultType> task) { if (task == null) { throw new IllegalArgumentException("task must not be null"); } this.task = task; this.executor = task.getExecutor(); if (this.executor == null) { this.executor = sDefaultExecutor; } } public static synchronized TaskPriorityExecutor getExecute(TaskExector type){ if (TaskExector.DEFALUT.equals(type)) { if (sDefaultExecutor == null || sDefaultExecutor.isShutdown()){ sDefaultExecutor = new TaskPriorityExecutor(4, new TaskThreadFactory("Default")); } return sDefaultExecutor; }else if(TaskExector.SEARCH.equals(type)) { if (sSearchExecutor == null || sSearchExecutor.isShutdown()){ sSearchExecutor = new TaskPriorityExecutor(2, new TaskThreadFactory("Search")); } return sSearchExecutor; } else if (TaskExector.NET_WORK.equals(type)){ if (sNetExecutor == null || sNetExecutor.isShutdown()){ sNetExecutor = new TaskPriorityExecutor(1, new TaskThreadFactory("Net")); } return sNetExecutor; } else if (TaskExector.AE8_ENGINE.equals(type)){ if (sAE8EngineExecutor == null || sAE8EngineExecutor.isShutdown()){ sAE8EngineExecutor = new TaskPriorityExecutor(1, new 
TaskThreadFactory("AE8_Engine")); } return sAE8EngineExecutor; } else if (TaskExector.UDISK_DOWNLOAD.equals(type)){ if (sUDiskExecutor == null || sUDiskExecutor.isShutdown()){ sUDiskExecutor = new TaskPriorityExecutor(1, new TaskThreadFactory("UDISK_DOWNLOAD")); } return sUDiskExecutor; } else if (TaskExector.SYNC_SDK_IO.equals(type)) { if (sSyncSdkIOHandleExecutor == null || sSyncSdkIOHandleExecutor.isShutdown()) { sSyncSdkIOHandleExecutor = new TaskPriorityExecutor(1, new TaskThreadFactory("SYNC_SDK_IO_HANDLE")); } return sSyncSdkIOHandleExecutor; } else if (TaskExector.USER_BL.equals(type)) { if (sSyncSdkIOHandleExecutor == null || sSyncSdkIOHandleExecutor.isShutdown()) { sSyncSdkIOHandleExecutor = new TaskPriorityExecutor(1, new TaskThreadFactory("USER_BL")); } return sSyncSdkIOHandleExecutor; }else if (TaskExector.ADAPTER.equals(type)) { if (sAdapterExecutor == null || sAdapterExecutor.isShutdown()) { sAdapterExecutor = new TaskPriorityExecutor(1, new TaskThreadFactory("ADAPTER")); } return sAdapterExecutor; } // else if (TaskExector.LOGGER.equals(type)) { // if (sLoggerExecutor == null || sLoggerExecutor.isShutdown()) { // //日志问题出于时序考虑只能单线程模型进行执行 // sLoggerExecutor = new TaskPriorityExecutor(new TaskThreadFactory("LOGGER")); // } // return sLoggerExecutor; // } else if (TaskExector.ACTIVATE_LOG.equals(type)) { if (sActivateLogExecutor == null || sActivateLogExecutor.isShutdown()) { //日志问题出于时序考虑只能单线程模型进行执行 sActivateLogExecutor = new TaskPriorityExecutor(new TaskThreadFactory("ACTIVATE_LOG")); } return sActivateLogExecutor; } return sDefaultExecutor; } public synchronized static void shutDown(TaskExector type){ Logger.d("byron", "[TaskProxy] shutDown:= "+ type); if (TaskExector.SEARCH.equals(type)) { if (sSearchExecutor != null){ sSearchExecutor.getThreadPoolExecutor().shutdown(); sSearchExecutor = null; } } else if (TaskExector.NET_WORK.equals(type)){ if (sNetExecutor != null){ sNetExecutor.getThreadPoolExecutor().shutdown(); sNetExecutor = null; } } else if 
(TaskExector.AE8_ENGINE.equals(type)){ if (sAE8EngineExecutor != null){ sAE8EngineExecutor.getThreadPoolExecutor().shutdown(); sAE8EngineExecutor = null; } } else if (TaskExector.UDISK_DOWNLOAD.equals(type)){ if (sUDiskExecutor != null){ sUDiskExecutor.getThreadPoolExecutor().shutdown(); sUDiskExecutor = null; } } else if (TaskExector.SYNC_SDK_IO.equals(type)) { if (sSyncSdkIOHandleExecutor != null) { sSyncSdkIOHandleExecutor.getThreadPoolExecutor().shutdown(); sSyncSdkIOHandleExecutor = null; } } else if (TaskExector.USER_BL.equals(type)) { if (sSyncSdkIOHandleExecutor != null) { sSyncSdkIOHandleExecutor.getThreadPoolExecutor().shutdown(); sSyncSdkIOHandleExecutor = null; } }else if (TaskExector.ADAPTER.equals(type)) { if (sAdapterExecutor != null) { sAdapterExecutor.getThreadPoolExecutor().shutdown(); sAdapterExecutor = null; } } // else if (TaskExector.LOGGER.equals(type)) { // if (sLoggerExecutor != null) { // sLoggerExecutor.getThreadPoolExecutor().shutdown(); // sLoggerExecutor = null; // } // } else if (TaskExector.ACTIVATE_LOG.equals(type)) { if (sActivateLogExecutor != null) { sActivateLogExecutor.getThreadPoolExecutor().shutdown(); sActivateLogExecutor = null; } } } public static void onDestory(){ Logger.d("byron", "[TaskProxy] onDestory"); if(INTERNAL_HANDLER != null){ INTERNAL_HANDLER.removeCallbacksAndMessages(null); } } @Override protected ResultType doBackground() throws Exception { this.setState(State.Waiting); TaskPriorityRunnable runnable = new TaskPriorityRunnable( task.getPriority(),null, new Runnable() { @Override public void run() { try { Logger.d("proxy", "taskProxy 1"); // trace_start running TaskProxy.this.setState(State.Running); TaskProxy.this.onStart(); result = task.doBackground(); if (TaskProxy.this.state.get() == State.Cancelled) { // 没有在doBackground过程中取消成功 Logger.d("proxy", "taskProxy 1 cancelled"); throw new CancelledException(""); } TaskProxy.this.setState(State.Finished); TaskProxy.this.onFinished(result); } catch 
(CancelledException cex) { Logger.d("proxy", "taskProxy e1 = {?}", cex); TaskProxy.this.setState(State.Cancelled); TaskProxy.this.onCancelled(cex); } catch (Throwable ex) { Logger.d("proxy", "taskProxy e2 = {?}", ex); TaskProxy.this.setState(State.Error); TaskProxy.this.onError(ex, false); } } }); this.executor.execute(runnable); return null; } @Override protected void onFinished(ResultType result) { INTERNAL_HANDLER.obtainMessage(MSG_WHAT_ON_FINISH, this).sendToTarget(); } @Override protected void onError(Throwable ex, boolean isCallbackError) { exception = ex; INTERNAL_HANDLER.obtainMessage(MSG_WHAT_ON_ERROR, this).sendToTarget(); } @Override protected void onStart() { INTERNAL_HANDLER.obtainMessage(MSG_WHAT_ON_START, this).sendToTarget(); } @Override protected void onUpdate(int flag, Object... args) { INTERNAL_HANDLER.obtainMessage(MSG_WHAT_ON_UPDATE, flag, 0, new ArgsObj(this, args)).sendToTarget(); } @Override protected void onCancelled(CancelledException cex) { cancelledException = cex; INTERNAL_HANDLER.obtainMessage(MSG_WHAT_ON_CANCEL, this).sendToTarget(); } private void setState(State state) { this.state.set(state); this.task.state.set(state); } @Override public TaskPriority getPriority() { return task.getPriority(); } @Override public Executor getExecutor() { return task.getExecutor(); } // ########################### inner type ############################# private static class ArgsObj { TaskProxy taskProxy; Object[] args; public ArgsObj(TaskProxy taskProxy, Object[] args) { this.taskProxy = taskProxy; this.args = args; } } private final static int MSG_WHAT_ON_START = 1; private final static int MSG_WHAT_ON_FINISH = 2; private final static int MSG_WHAT_ON_ERROR = 3; private final static int MSG_WHAT_ON_UPDATE = 4; private final static int MSG_WHAT_ON_CANCEL = 5; /*package*/ final static class InternalHandler extends Handler { private InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings("unchecked") @Override public void 
handleMessage(Message msg) { if (msg.obj == null) { throw new IllegalArgumentException("msg must not be null"); } TaskProxy taskProxy = null; Object[] args = null; if (msg.obj instanceof TaskProxy) { taskProxy = (TaskProxy) msg.obj; } else if (msg.obj instanceof ArgsObj) { ArgsObj argsObj = (ArgsObj) msg.obj; taskProxy = argsObj.taskProxy; args = argsObj.args; } if (taskProxy == null) { throw new RuntimeException("msg.obj not instanceof TaskProxy"); } try { switch (msg.what) { case MSG_WHAT_ON_START: { taskProxy.task.onStart(); break; } case MSG_WHAT_ON_FINISH: { taskProxy.task.onFinished(taskProxy.result); break; } case MSG_WHAT_ON_ERROR: { try { taskProxy.task.onError(taskProxy.exception, false); } catch (Throwable ignored) { } break; } case MSG_WHAT_ON_UPDATE: { taskProxy.task.onUpdate(msg.arg1, args); break; } case MSG_WHAT_ON_CANCEL: { taskProxy.task.onCancelled(taskProxy.cancelledException); break; } default: { break; } } } catch (Throwable ex) { taskProxy.setState(State.Error); if (msg.what != MSG_WHAT_ON_ERROR) { taskProxy.task.onError(ex, true); } else { ex.printStackTrace(); } } } } }
这里是带优先级的任务(Runnable)对应的基类;另外,可以通过线程工厂类创建带优先级的线程
/** * 任务线程优先级 */ public class TaskPriorityRunnable implements Runnable { public final TaskPriority priority; private final Runnable runnable; private String name; private String oldThreadName; public TaskPriorityRunnable(TaskPriority priority, String name, Runnable runnable) { this.priority = priority == null ? TaskPriority.DEFAULT : priority; this.runnable = runnable; this.name = name == null ? null : new StringBuilder().append("Executor#").append(name).toString(); } @Override public final void run() { // run会进来多次,防止多次设置线程名称引起问题,故添加对name、oldThreadName判空判断 if (!TextUtils.isEmpty(name) && TextUtils.isEmpty(oldThreadName)){ oldThreadName = Thread.currentThread().getName(); Thread.currentThread().setName(name); } this.runnable.run(); // oldThreadName不为空时才进入设置线程名称 if (!TextUtils.isEmpty(oldThreadName)){ Thread.currentThread().setName(oldThreadName); oldThreadName = null; } } }
三:其他相关类
这里面还有一些业务类的封装,这里就不再一一贴出来了(如有人需要,可以整个贴出来)
/**
 * Base class for a unit of background work with lifecycle callbacks.
 *
 * <p>Subclasses supply {@link #doBackground()}, {@link #onFinished(Object)} and
 * {@link #onError(Throwable, boolean)}; the remaining callbacks default to no-ops.
 *
 * @param <ResultType> type of the value produced by {@link #doBackground()}
 */
public abstract class Task<ResultType> {

    /** Lifecycle states; {@link #value()} gives each state its numeric rank. */
    public enum State {
        Null(0), Waiting(1), Running(2), Finished(3), Cancelled(4), Error(5);

        private final int code;

        State(int code) {
            this.code = code;
        }

        public int value() {
            return code;
        }
    }

    /** Exception type used to signal that a task was cancelled. */
    public static class CancelledException extends RuntimeException {
        public CancelledException(String detailMessage) {
            super(detailMessage);
        }
    }

    /*package*/ final AtomicReference<State> state = new AtomicReference<State>(State.Null);
    /*package*/ TaskProxy taskProxy = null;

    /** Performs the work and returns its result. */
    protected abstract ResultType doBackground() throws Exception;

    /** Callback receiving the result of a completed task. */
    protected abstract void onFinished(ResultType result);

    /**
     * Callback receiving a failure.
     *
     * @param ex              the failure
     * @param isCallbackError flag supplied by the caller of this callback
     */
    protected abstract void onError(Throwable ex, boolean isCallbackError);

    /** Start callback; does nothing by default. */
    protected void onStart() {
    }

    /** Progress callback; does nothing by default. */
    protected void onUpdate(int flag, Object... args) {
    }

    /** Cancellation callback; does nothing by default. */
    protected void onCancelled(CancelledException cex) {
    }

    /** Forwards a progress update to the attached proxy; a no-op when no proxy is attached. */
    public final void update(int flag, Object... args) {
        if (taskProxy == null) {
            return;
        }
        taskProxy.onUpdate(flag, args);
    }

    /** Marks this task as cancelled and, when a proxy is attached, cancels the proxy as well. */
    public final void cancel() {
        this.state.set(State.Cancelled);
        if (taskProxy == null) {
            return;
        }
        taskProxy.cancel();
    }

    /** @return the current lifecycle state */
    public final State getState() {
        return state.get();
    }

    /** @return {@code true} once the state ranks above {@link State#Running} */
    public final boolean isStopped() {
        final State current = this.state.get();
        final int runningRank = State.Running.value();
        return current.value() > runningRank;
    }

    /** @return the task priority; {@code null} by default */
    public TaskPriority getPriority() {
        return null;
    }

    /** @return the executor this task asks to run on; {@code null} by default */
    public Executor getExecutor() {
        return null;
    }
}