C++协程线程池

C++协程线程池,第1张

协程线程池

如何等待协程完成
协程表示任务,完成任务时恢复协程,从而获取任务返回值.
以线程中执行任务为例,来看如何定制协程机器的"零件"来实现等待并获取任务返回值:

任务<大小型>读文件(){
  常 动 结果=协待 异步读文件{"../主.c++"};
  协中 结果.大小();
}

整个流程大概这样:
创建协程后,执行协待承诺.初始挂起,由于返回从不挂起,不会挂起协程,执行协程函数体(函数体),然后执行协待异步读文件{"../主.c++"},协待式会产生3个结果:
1,挂起当前协程;
2,执行等待器.挂起协
3,执行代码返回到调用者

协待等待器生成的伪代码如下:

(!等待器.准备好协()){
     挂起协程;//挂起当前协程
     等待器.挂起协(协程句柄);//异步执行任务
     中 到调用者;//返回调用者
     恢复点;//协程恢复时回到这里
 }
 中 等待器.恢复协();//返回异步任务结果

等待器.挂起协传入协程句柄(协程句柄)到执行任务的线程中,线程中执行完成任务时,恢复挂起协程,通过恢复协程来告诉协程任务执行完了,并通过等待器.恢复协()返回任务的结果.
现在已经了解协程是如何表示任务了,接下来要实现同步等待协程的完成,实现思路还是定制"协程机器"零件.
要实现等待语义,可再创建等待协程,这个协程调用者间通过事件来通信,调用者那里阻塞等待事件,等待协程执行时,也是被等待协程完成时,通过事件通知调用者已完成了.
可在等待器下功夫,创建协程后挂起,传入协程句柄被等待协程(A)中,A执行完成后恢复等待协程,等待协程给阻塞的调用者完成通知.

<类 T>
动 同步等待(T 任务){
  承诺<>承诺;
  动 帮助器=[&]()->同步等待任务{
    承诺.置值();
    协中;
  };
  动 帮助器任务=帮助器();
  任务.置下(帮助器任务.取句柄());
  承诺.取未来().等待();
  中 任务.取结果();
}

整 主(){
  动 懒读任务=读文件();
  动 读大小=同步等待(移动(懒读任务));0;
}

看看同步等待实现,这里通过承诺来实现协程调用者间通信,也可换成条件变量等待事件等.先创建给调用者通知表示等待结束同步等待任务协程(B).B是在被等待协程止挂起里恢复的,因为止挂起是在执行协程函数体后调用的,所以止挂起就说明已执行完了协程,在这里恢复同步等待任务是最好的.

#指示 一次
元<型名 T>
构 承诺;<型名 T>[[未丢弃]]任务{
  用 承诺类型=承诺<T>;
  任务()=默认;

  动 符号 协待()常 无异{
    构 可等待{
      极 准备好协()常 无异{
        中 承诺.是准备好();
      }
      用 协程句柄=协程句柄<>;
      协程句柄 挂起协(协程句柄 连续)常 无异{
        承诺.连续=连续;
        中 协程句柄<承诺<T>>::从承诺(承诺);
      }
      T&&恢复协(){
        中 承诺.取结果();
      }

      承诺<T>&承诺;
    };
    中 可等待{*承诺};
  }

  T&&取结果(){
    中 承诺->取结果();
  }

  动 取句柄(){中 承诺->取句柄();}
  空 置下(协程句柄<>h){
    承诺->连续=h;
  }:
  任务(承诺<T>*承诺):承诺{承诺}{}
  承诺<T>*承诺=空针;<型名>友 构 承诺;
};<型名 T>
构 承诺{
  动 取中对象(){
    中 任务<T>{};
  }
  动 取句柄(){
    中 协程句柄<承诺<T>>::从承诺(*);
  }
  从不挂起 初始挂起()无异{{};}
  动 止挂起()无异{
    构 止可等待{
      极 准备好协()常 无异{中 假;}
      空 挂起协(协程句柄<承诺<T>>本协程)无异{&承诺=本协程.承诺();(承诺.连续)
          承诺.连续();
      }
      空 恢复协()常 无异{}
    };

    中 止可等待{};
  }
  空 未处理异常(){终止();}<型名 U>
  空 返回值(U&&)
  {
    结果.元 原位<1>(前向<U>());
  }

  T&&取结果(){(结果.索引()==2)
      再抛异常(<2>(结果));
    中 移动(<1>(结果));
  }

  极 是准备好(){
    中 结果.索引()!=0;
  }

  变量<单态,T,异常针>结果;
  协程句柄<>连续=无 *** 协程();
};

构 同步等待任务{
  构 承诺类型{
    动 取中对象(){
      中 同步等待任务{协程句柄<承诺类型>::从承诺(*)};
    }
    动 初始挂起(){
      中 总是挂起{};
    }
    空 中空(){
    }
    动 止挂起()无异{
      中 从不挂起{};
    }
    空 未处理异常(){
      退出(1);
    }
  };

  动 取句柄(){
    中 句柄_;
  }

  协程句柄<承诺类型>句柄_;
};<类 T>
动 同步等待(T 任务){
  承诺<>承诺;
  动 帮助器=[&]()->同步等待任务{
    承诺.置值();
    协中;
  };

  动 帮助器任务=帮助器();

  任务.置下(帮助器任务.取句柄());
  承诺.取未来().等待();
  中 任务.取结果();
}

协程线程池

相比调度协程到线程中执行,把协程调度到线程池中执行更有意义,实现思路也比较简单,通过协待等待器即可实现,在等待器.挂起协里把协程丢到线程池里就好了,剩下就由线程池调度执行协程就好了.
线程池部分实现也很简单,把前面简单池代码增加个调度方法即可,其它都不变:

#包含"安全队列.h++"
用 协程项=协程句柄<>;
类 协程线程池{:
  显 协程线程池(大小型 线程=线程::硬件并行()){(大小型 i=0;i<线程;++i)
      工作者_.原后([]{(;;){
          协程项 任务;(!队列_.d(任务));(任务)任务();
        }
      });
  }

  动 调度()
  {
    构 等待器
    {
      协程线程池*线程池_;

      常式 极 准备好协()常 无异{中 假;}
      常式 空 恢复协()常 无异{}
      空 挂起协(协程句柄<>协程)常 无异{
        线程池_->入列(协程);
      }
    };
    中 等待器{};
  }

  空 入列(协程项 项){队列_.(移动());}
  ~协程线程池(){
    队列_.停止();(&thd:工作者_){
      thd.合并();
    }
  }:
  队列<协程项>队列_;
  向量<线程>工作者_;
};
//测试代码:

空 测试协程池(){
  协程线程池 池;
  动 任务=调度(,[](整 x){
      输出<<"当前线程标识:"<<本线程::取标识()<<"\n";
      中 x;
    },
    42);
  输出<<同步等待(移动(任务))<<'\n';

  调度(,[](整 x){
        输出<<"当前线程标识:"<<本线程::取标识()<<"\n";
        中 x;
    },42);

  本线程::休息(时间::(2));
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/562265.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-01
下一篇 2022-04-01

发表评论

登录后才能评论

评论列表(0条)

保存