使用Java多线程实现任务分发

使用Java多线程实现任务分发,第1张

线程下载由来已久 如 FlashGet NetAnts 等工具 它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围) 首先能读取出请求内容 (即欲下载的文件) 的大小 划分出若干区块 把区块分段分发给每个线程去下载 线程从本段起始处下载数据及至段尾 多个线程下载的内容最终会写入到同一个文件中

只研究有用的 工作中的需求 要把多个任务纳凯分派给Java的多个线程去执行 这其中就会有一个任务列表指派到线程的策略思考 已知 一个待执行的任务列表 指定要启动的线程数 问题是 每个线程实际要执行哪些任务

使用Java多线程实现这种任务分发的策略是 任务列表连续按线程数分段 先保证每线程平均能分配到的任务数 余下的任务从前至后依次附加到线程中——只是数量上 实际每个线程执行的任务都还是连续的 如果出现那种僧多(线程) 粥(任务) 少的情况 实际启动的线程数就等于任务数 一挑一 这里只实现了每个线程各扫自谨宴家门前雪 动作快的完成后眼见别的线程再累都是爱莫能助

实现及演示代码如下 由三个类实现 写在了一个 Java 文件中 TaskDistributor 为任务分发器 Task 为待执行的任务 WorkThread 为自定的工作洞晌唤线程 代码中运用了命令模式 如若能配以监听器 用上观察者模式来控制 UI 显示就更绝妙不过了 就能实现像下载中的区块着色跳跃的动感了 在此定义下一步的着眼点了

代码中有较为详细的注释 看这些注释和执行结果就很容易理解的 main() 是测试方法

package monimport java util ArrayListimport java util List/*** 指派任务列表给线程的分发器* @author Unmi* QQ: Email: * MSN: */public class TaskDistributor {/*** 测试方法* @param args*/public static void main(String[] args) {//初始化要执行的任务列表List taskList = new ArrayList()for (int i = i <i++) {taskList add(new Task(i))}//设定要启动的工作线程数为 个int threadCount = List[] taskListPerThread = distributeTasks(taskList threadCount)System out println( 实际要启动的工作线程数 +taskListPerThread length)for (int i = i <taskListPerThread lengthi++) {Thread workThread = new WorkThread(taskListPerThread[i] i)workThread start()}}/*** 把 List 中的任务分配给每个线程 先平均分配 剩于的依次附加给前面的线程* 返回的数组有多少个元素 (List) 就表明将启动多少个工作线程* @param taskList 待分派的任务列表* @param threadCount 线程数* @return 列表的数组 每个元素中存有该线程要执行的任务列表*/public static List[] distributeTasks(List taskList int threadCount) {// 每个线程至少要执行的任务数 假如不为零则表示每个线程都会分配到任务int minTaskCount = taskList size() / threadCount// 平均分配后还剩下的任务数 不为零则还有任务依个附加到前面的线程中int remainTaskCount = taskList size() % threadCount// 实际要启动的线程数 如果工作线程比任务还多// 自然只需要启动与任务相同个数的工作线程 一对一的执行// 毕竟不打算实现了线程池 所以用不着预先初始化好休眠的线程int actualThreadCount = minTaskCount >? threadCount : remainTaskCount// 要启动的线程数组 以及每个线程要执行的任务列表List[] taskListPerThread = new List[actualThreadCount]int taskIndex = //平均分配后多余任务 每附加给一个线程后的剩余数 重新声明与 remainTaskCount//相同的变量 不然会在执行中改变 remainTaskCount 原有值 产生麻烦int remainIndces = remainTaskCountfor (int i = i <taskListPerThread lengthi++) {taskListPerThread[i] = new ArrayList()// 如果大于零 线程要分配到基本的任务if (minTaskCount >) {for (int j = taskIndexj <minTaskCount + taskIndexj++) {taskListPerThread[i] add(taskList get(j))}taskIndex += minTaskCount}// 假如还有剩下的 则补一个到这个线程中if (remainIndces >) {taskListPerThread[i] add(taskList get(taskIndex++))remainIndces }}// 打印任务的分配情况for (int i = i <taskListPerThread lengthi++) {System out println( 线程 + i + 的任务数 +

taskListPerThread[i] size() + 区间[ + taskListPerThread[i] get( ) getTaskId() + + taskListPerThread[i] get(taskListPerThread[i] size() ) getTaskId() + ] )}return taskListPerThread}}/*** 要执行的任务 可在执行时改变它的某个状态或调用它的某个 *** 作* 例如任务有三个状态 就绪 运行 完成 默认为就绪态* 要进一步完善 可为 Task 加上状态变迁的监听器 因之决定UI的显示*/class Task {public static final int READY = public static final int RUNNING = public static final int FINISHED = private int status//声明一个任务的自有业务含义的变量 用于标识任务private int taskId//任务的初始化方法public Task(int taskId){this status = READYthis taskId = taskId}/*** 执行任务*/public void execute() {// 设置状态为运行中setStatus(Task RUNNING)System out println( 当前线程 ID 是 + Thread currentThread() getName()+ | 任务 ID 是 +this taskId)// 附加一个延时try {Thread sleep( )} catch (InterruptedException e) {e printStackTrace()}// 执行完成 改状态为完成setStatus(FINISHED)}public void setStatus(int status) {this status = status}public int getTaskId() {return taskId}}/*** 自定义的工作线程 持有分派给它执行的任务列表*/class WorkThread extends Thread {//本线程待执行的任务列表 你也可以指为任务索引的起始值private List taskList = nullprivate int threadId/*** 构造工作线程 为其指派任务列表 及命名线程 ID* @param taskList 欲执行的任务列表* @param threadId 线程 ID*/public WorkThread(List taskList int threadId) {this taskList = taskListthis threadId = threadId}/*** 执行被指派的所有任务*/public void run() {for (Task task : taskList) {task execute()}}}

执行结果如下 注意观察每个Java多线程分配到的任务数量及区间 直到所有的线程完成了所分配到的任务后程序结束

线程 的任务数 区间[ ]线程 的任务数 区间[ ]线程 的任务数 区间[ ]线程 的任务数 区间[ ]线程 的任务数 区间[ ]实际要启动的工作线程数 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是 当前线程 ID 是 Thread | 任务 ID 是

上面坦白来只算是基本功夫 贴出来还真见笑了 还有更为复杂的功能

lishixinzhi/Article/program/Java/gj/201311/27443

线程名其实是一个地址,所以宴前你要开晌含清200个线程,就可以做两百次循环,就可以了CreateThread()的返回HANDLE报存在一个数组里就可以了

HANDLE hThread[200]

for(int i = 0i<200i++)

hThread[i] = CreateThread(NULL,0,程序名,NULL,0,NULL)

这老凳样就可以了

     异步通信机制,将工作线程中需更新UI的 *** 作信息 传递到 UI主线程,从而实现 工作线程对UI的更新处理,最终实现异步消息的处理。Handler不仅仅能将子线程的数据传递给主线程,它能实现任意两个线程的数据传递。

(1)Message

    Message 可以在线程之间传递消息。可以在它的内部携带少量数据,用于在不同线程之间进行数据交换。除了 what 字段,还可以使用 arg1 和 arg2 来携带整型数据,使用 obj 来携带 Object 数据。

(2) Handler

    Handler 作为处理中心,用于发送(sendMessage 系列方法)与处理消息(handleMessage 方法)。

(3) MessageQueue

    MessageQueue 用于存放所有通过 Handler 发送的消息。这部分消息会一直存放在消息队列中,直到被处理。每个线程中只会有一个 MessageQueue 对象

(4) Looper

    Looper 用于管理 MessageQueue 队列,Looper对象通过loop()方法开启了一个死循环——for (){},不断地从looper内的MessageQueue中取出Message,并传递到 Handler 的 handleMessage() 方法中。每个线程中只会有一个 Looper 对象。

    AsyncTask 是一种轻量级的任务异步类,可以在后台子线程执行任务,且将执行进度及执行结果传递给 UI 线程。

(1)onPreExecute()

    在 UI 线程上工作,在任务执行 doInBackground() 之前调用。此步骤通常用于设置任务,例如在用户界面中显示进度条。

(2)doInBackground(Params... params)

    在子线程中工作,在 onPreExecute() 方法结束后执行,这一步被用于在后台执行长时间的任务,Params 参数通过 execute(Params) 方法被传递到此方法中。任务执行结束后,将结果传递给 onPostExecute(Result) 方法,同时我们可以通过 publishProgress(Progress) 方法,将执行进度发送桐租明给 onProgressUpdate(Progress) 方法。

(3)onProgressUpdate(Progress... values)

    在 UI 线程上工作,会在 doInBackground() 中调用 publishProgress(Progress) 方法后执行,此方法用于在后台计算仍在执行时(也就是 doInBackgound() 还在执行时)将计算执行进度通过 UI 显示出来。例如,可以通过动画进度条或显示文本字段中的日志,从而方便用户知道后台任务执行的进度。

(4)onPostExecute(Result result)

    在 UI 线程上工作,在任务执行完毕(即 doInBackground(Result) 执行完毕)并将执行结果传过来的时候工作。

使用规则:

(1)AsyncTask 是个抽象类,所以要创建它的子类实现抽象方法

(1)AsyncTask 类必须是在 UI 线程中被加载,但在Android 4.1(API 16)开始,就能被自动加载完成。

(2)AsyncTask 类的实例对象必须在 UI 线程中被创建。

(3)execute() 方法必须是在 UI 线程中被调用。

(4)不要手动调用方法 onPreExecute()、onPostExecute()、doInBackground()、onProgressUpdate()

(5)任务只能执行一次(如果尝试第二次执行,将抛出异常)。即一个AsyncTask对象只能调用一次execute()方法。

原理:

          其源码中原理还是 Thread 与 Handler 的实现,其包含 两个线程池,一个 Handler,如下所示:

名称类型作用

SERIAL_EXECUTOR线程池分发任务,串行分发,一型链次只分发一局告个任务

THREAD_POOL_EXECUTOR线程池执行任务,并行执行,执行的任务由 SERIAL_EXECUTOR 分发

InternalHandlerHandler负责子线程与主线程的沟通,通知主线程做 UI 工作

    一方面减少了每个并行任务独自建立线程的开销,另一方面可以管理多个并发线程的公共资源,从而提高了多线程的效率。所以ThreadPoolExecutor比较适合一组任务的执行。Executors利用工厂模式对ThreadPoolExecutor进行了封装。

Executors提供了四种创建ExecutorService的方法,他们的使用场景如下:

1. Executors.newFixedThreadPool()

    创建一个定长的线程池,每提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不再变化。

当线程处于空闲状态时,它们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。

只有核心线程并且不会被回收,能够更加快速的响应外界的请求。

2. Executors.newCachedThreadPool()

    创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要时,它可以灵活的回收空闲的线程,当需要增加时,它可以灵活的添加新的线程,而不会对池的长度作任何限制

    线程数量不定的线程池,只有非核心线程,最大线程数为 Integer.MAX_VALUE。当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则利用空闲的线程来处理新任务。线程池中的空闲线程具有超时机制,为 60s。

    任务队列相当于一个空集合,导致任何任务都会立即被执行,适合执行大量耗时较少的任务。当整个线程池都处于限制状态时,线程池中的线程都会超时而被停止。

3. Executors.newScheduledThreadPool()

    创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。

    非核心线程数没有限制,并且非核心线程闲置的时候立即回收,主要用于执行定时任务和具有固定周期的重复任务。

4. Executors.newSingleThreadExecutor()

    创建一个单线程化的executor,它只创建唯一的worker线程来执行任务

    只有一个核心线程,保证所有的任务都在一个线程中顺序执行,意义在于不需要处理线程同步的问题。

    一般用于执行后台耗时任务,当任务执行完成会自动停止;同时由于它是一个服务,优先级要远远高于线程,更不容易被系统杀死,因此比较适合执行一些高优先级的后台任务。

使用步骤:创建IntentService的子类,重写onHandleIntent方法,在onHandleIntent中执行耗时任务

    原理:在源码实现上,IntentService封装了HandlerThread和Handler。onHandleIntent方法结束后会调用IntentService的stopSelf(int startId)方法尝试停止服务。

    IntentService的内部是通过消息的方式请求HandlerThread执行任务,HandlerThread内部又是一种使用Handler的Thread,这就意味着IntentService和Looper一样是顺序执行后台任务的

(HandlerThread:封装了Handler + ThreadHandlerThread适合在有需要一个工作线程(非UI线程)+任务的等待队列的形式,优点是不会有堵塞,减少了对性能的消耗,缺点是不能同时进行多个任务的处理,需要等待进行处理。处理效率低,可以当成一个轻量级的线程池来用)


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/12568635.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-26
下一篇 2023-05-26

发表评论

登录后才能评论

评论列表(0条)

保存