scala编写Akka并发控制线程数量简单示例

scala编写Akka并发控制线程数量简单示例,第1张

scala编写Akka并发控制线程数量简单示例

akka设计模式
首先就是设计一个管理者,来管理控制线程的并发
taskTotal是并发执行的总任务数
taskExecute是已经执行过的任务数
根据传入的并发数量来控制我们一开始要创建多少个工作
通过.actorOf来创建一个worker的ActorRef,注意名字不能重复,并且将自己的ActorRef传过去,用来接收回馈的信息
每创建完一个ActorRef,我们从任务队列中获取一个任务,让它把任务消息发给workerActor去执行任务

worker执行完之后反馈执行情况,将任务执行数量和总任务数对比,不相等则进一步判断任务对列是否还有任务
有则将任务发送过去继续执行,没有则关闭对应的worker,等待其他线程执行完成
所以线程执行完成,则关闭整个system

class TaskMaster extends Actor{
  val taskTotal=29
  var taskExecute=0
  var queue:mutable.Queue[CaseModel.TableModel]=_
  override def receive: Receive = {
    case CaseModel.Table(parallel)=>{
      1 to parallel foreach { i =>
        val worker = context.actorOf(Props(new TaskWorker(self)),s"Worker-$i")
        worker ! CaseModel.Task(queue.dequeue())
      }
    }
    case CaseModel.TaskResult(rf)=>{
      taskExecute =taskExecute+1
      if (taskExecute==taskTotal){
        context.system.shutdown()
      }else{
        if (queue.nonEmpty){
          rf ! CaseModel.Task(queue.dequeue())
        }else{
          context.stop(rf)
        }

      }
    }
  }

  override def preStart(): Unit = {
    queue = Demo.getTableModel
  }
}


设计一个工作者WorkerActor来完成具体的每一项任务
从Master中将传一个任务过来,获取到任务信息,执行任务
执行完之后将任务执行情况返回给Master,并将自己的ActorRef传过去,接收反馈信息

class TaskWorker(mT:ActorRef) extends Actor{

  def doWorker(tableModel: CaseModel.TableModel) = {
    Thread.sleep(tableModel.time)
  }

  override def receive: Receive = {
    case CaseModel.Task(tableModel)=>{
      println("开始执行"+tableModel.tablename+"任务")
      doWorker(tableModel)
      println(tableModel.tablename+"任务执行完成")
      mT ! CaseModel.TaskResult(self)
      //      context.stop(self)
    }
  }
}

消息传递样例

object CaseModel {
  case class TableModel(tablename: String,time:Int,n:Int)
  case class Table(i: Int)
  case class Task(tableModel:TableModel)
  case class TaskResult(rf:ActorRef)
}

一个简单的Demo

object Demo{
  val queue0 = new mutable.Queue[CaseModel.TableModel]
  def main(args: Array[String]): Unit = {

    val t1 = CaseModel.TableModel("a", 4000, 0)
    val t2 = CaseModel.TableModel("b", 4000, 1)
    val t10 =CaseModel.TableModel("m", 4000, 9)
    val t3 = CaseModel.TableModel("c", 4000, 2)
    val t4 = CaseModel.TableModel("d", 4000, 3)
    val t5 = CaseModel.TableModel("e", 4000, 4)
    val t6 = CaseModel.TableModel("f", 4000, 5)
    val t7 = CaseModel.TableModel("g", 4000, 6)
    val t8 = CaseModel.TableModel("h", 3000, 7)
    val t9 = CaseModel.TableModel("i", 2000, 8)
    val t0 = CaseModel.TableModel("j", 1000, 9)
    queue0.enqueue(t0,t2,t3,t4,t6,t7,t8,t9,t10,t0,t1,t2,t3,t4,t6,t7,t8,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0)
    //并行度
    val number=10
    val system = ActorSystem.create("task")
    val mActor = system.actorOf(Props(classOf[TaskMaster]), "mActor")
    mActor ! CaseModel.Table(number)
  }
  def getTableModel: mutable.Queue[CaseModel.TableModel] ={
    queue0
  }
}


 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5703589.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存