akka设计模式
首先就是设计一个管理者,来管理控制线程的并发
taskTotal是并发执行的总任务数
taskExecute是已经执行过的任务数
根据传入的并发数量来控制我们一开始要创建多少个工作
通过.actorOf来创建一个worker的ActorRef,注意名字不能重复,并且将自己的ActorRef传过去,用来接收回馈的信息
每创建完一个ActorRef,我们从任务队列中获取一个任务,让它把任务消息发给workerActor去执行任务
worker执行完之后反馈执行情况,将任务执行数量和总任务数对比,不相等则进一步判断任务对列是否还有任务
有则将任务发送过去继续执行,没有则关闭对应的worker,等待其他线程执行完成
所以线程执行完成,则关闭整个system
class TaskMaster extends Actor{ val taskTotal=29 var taskExecute=0 var queue:mutable.Queue[CaseModel.TableModel]=_ override def receive: Receive = { case CaseModel.Table(parallel)=>{ 1 to parallel foreach { i => val worker = context.actorOf(Props(new TaskWorker(self)),s"Worker-$i") worker ! CaseModel.Task(queue.dequeue()) } } case CaseModel.TaskResult(rf)=>{ taskExecute =taskExecute+1 if (taskExecute==taskTotal){ context.system.shutdown() }else{ if (queue.nonEmpty){ rf ! CaseModel.Task(queue.dequeue()) }else{ context.stop(rf) } } } } override def preStart(): Unit = { queue = Demo.getTableModel } }
设计一个工作者WorkerActor来完成具体的每一项任务
从Master中将传一个任务过来,获取到任务信息,执行任务
执行完之后将任务执行情况返回给Master,并将自己的ActorRef传过去,接收反馈信息
class TaskWorker(mT:ActorRef) extends Actor{ def doWorker(tableModel: CaseModel.TableModel) = { Thread.sleep(tableModel.time) } override def receive: Receive = { case CaseModel.Task(tableModel)=>{ println("开始执行"+tableModel.tablename+"任务") doWorker(tableModel) println(tableModel.tablename+"任务执行完成") mT ! CaseModel.TaskResult(self) // context.stop(self) } } }
消息传递样例
object CaseModel { case class TableModel(tablename: String,time:Int,n:Int) case class Table(i: Int) case class Task(tableModel:TableModel) case class TaskResult(rf:ActorRef) }
一个简单的Demo
object Demo{ val queue0 = new mutable.Queue[CaseModel.TableModel] def main(args: Array[String]): Unit = { val t1 = CaseModel.TableModel("a", 4000, 0) val t2 = CaseModel.TableModel("b", 4000, 1) val t10 =CaseModel.TableModel("m", 4000, 9) val t3 = CaseModel.TableModel("c", 4000, 2) val t4 = CaseModel.TableModel("d", 4000, 3) val t5 = CaseModel.TableModel("e", 4000, 4) val t6 = CaseModel.TableModel("f", 4000, 5) val t7 = CaseModel.TableModel("g", 4000, 6) val t8 = CaseModel.TableModel("h", 3000, 7) val t9 = CaseModel.TableModel("i", 2000, 8) val t0 = CaseModel.TableModel("j", 1000, 9) queue0.enqueue(t0,t2,t3,t4,t6,t7,t8,t9,t10,t0,t1,t2,t3,t4,t6,t7,t8,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0) //并行度 val number=10 val system = ActorSystem.create("task") val mActor = system.actorOf(Props(classOf[TaskMaster]), "mActor") mActor ! CaseModel.Table(number) } def getTableModel: mutable.Queue[CaseModel.TableModel] ={ queue0 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)