文章目录
- muduo网络库学习(2)
- 前言
- 一、oneloop per thread
- 二、EventLoop事件循环流程
- 1.事件循环流程
- 二、Channel类
- 三、Poller类
- 1.select poll epoll
- 2.代码解析
- 总结
前言
本章主要记录muduo库中的事件循环功能实现即EventLoop类实现。
一、oneloop per thread
为了保证每一个线程只有一个loop循环,muduo在每个线程中创建了一个变量记录EventLoop的指针。
namespace mduo{
namespace net{
// 当前线程EventLoop对象指针
// 线程局部存储
__thread EventLoop*t_loopThisThread=0;
//构造函数中
EventLoop::EventLoop(){
if(t_loopThisThread){
//之前改线程已经建立了EventLoop对象
}
else{
t_loopThisThread=this;
}
}
EventLoop::~EventLoop()
{
t_loopInThisThread = NULL;
}
//判断Eventloop对象执行时候是否在当前线程中
bool EventLoop::isInLoopThread(){
return this==t_loopThisThread;
}
}
}
二、EventLoop事件循环流程
1.事件循环流程
由EventLoop通过loop函数从而调用成员Poller的poll()函数得到当前就绪的IO事件activeChannels返回给EventLoop,EventLoop通过调用对应Channel的handleEvent进行事件处理,周而复始!
代码如下:
// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
assert(!looping_);
// 断言当前处于创建该对象的线程中
assertInLoopThread();
looping_ = true;
LOG_TRACE << "EventLoop " << this << " start looping";
//::poll(NULL, 0, 5*1000);
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
//++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
//doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
// XXX pollfds_ shouldn't change
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happended";
fillActiveChannels(numEvents, activeChannels);
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happended";
}
else
{
LOG_SYSERR << "PollPoller::poll()";
}
return now;
}
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}
二、Channel类
Channel类不控制文件描述符的生存周期,Channel中的文件描述符可以是
eventfd,timefd,signalfd。
Channel主要功能:
2.1:这个文件描述符产生的事件增加文件描述符
代码如下:
void setReadCallback(const ReadEventCallback& cb)
{ readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }
void setCloseCallback(const EventCallback& cb)
{ closeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }
2.2:向EventLoop中注册事件
代码如下:
void enableReading() { events_ |= kReadEvent; update(); }
// void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
void PollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0)
{
// index < 0说明是一个新的通道
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
}
else
{
// update existing one
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
// 将一个通道暂时更改为不关注事件,但不从Poller中移除该通道
if (channel->isNoneEvent())
{
// ignore this pollfd
// 暂时忽略该文件描述符的事件
// 这里pfd.fd 可以直接设置为-1
pfd.fd = -channel->fd()-1; // 这样子设置是为了removeChannel优化
}
}
}
// 调用这个函数之前确保调用disableAll
void Channel::remove()
{
assert(isNoneEvent());
loop_->removeChannel(this);
}
void EventLoop::removeChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}
void PollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
size_t n = channels_.erase(channel->fd());
assert(n == 1); (void)n;
if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
{
pollfds_.pop_back();
}
else
{
// 这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_back
int channelAtEnd = pollfds_.back().fd;
iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
if (channelAtEnd < 0)
{
channelAtEnd = -channelAtEnd-1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back();
}
}
3:处理文件描述符对应的事件
代码如下:
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
//只有管道的写端被关闭,读端才会接收到POLLHUP
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
//无效的文件描述符
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
//POLLRDHUP对等方关闭或者半关闭read返回为0
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
三、Poller类
poller类是muduo库中唯一一个基于面向对象思想封装的类,由poller抽象类得到EpollPoller类,PollPoller类,两者区别在于IO复用的方法分别是epoll和poll。
1.select poll epoll首先介绍一下Linux下三种IO复用系统调用的区别与联系
语法区别:
select(int nfds,fd_set* readfds,fd_set writefds,fd_set exception,struct timeval*timeout)
/*nfds:监听文件描述符最大值+1
fd_set:文件描述符数组
timeout:超时事件
struct timeval{
int s;
int us;
}
*/
poll(struct pollfd*pollfds,nfds_t nfds,int timeout)
/*
struct pollfd{
int fd;//文件描述符
int events;//注册的事件
int revents;//实际就绪的事件,由内核填充
}
nfds:被监听事件集合的大小
timeout:超时时间ms
*/
epoll_wait(int epollfd,struct epoll_event*events,int maxevents,int timeout)
/*
epollfd:标识内核事件表
events:返回的就绪事件
struct epoll_event{
epoll_data_t data;
_uint32_t events;
};
typedef unio epoll_data{
void*ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t
maxevents:最多监听多少个事件
timeout:超时时间
*/
返回值:
超过超时时间没有时间发生返回0;
发生错误返回-1,指定errno;
有事件发生返回事件的个数。
区别:
抽象类Poller:四个虚函数,分别是用于析构,更新通道监听事件,删除通道对应的事件以及,开始事件监听循环(返回活跃通道),注意在muduo中将文件描述符以及就绪事件,注册事件,对应的read,write,error处理方法封装成了一个Channel类。
class Poller{
public:
typedef std::vector<Channel*> ChannelList;
Poller(Eventloop*loop);
virtual ~Poller();//抽象类必须设置析构函数为虚函数,避免内存泄漏
/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;
/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;
//创建eventloop对应的Poller对象
static Poller* newDefaultPoller(Eventloop*loop);
//断言必须在创建eventloop的线程进行io复用
void assertInLoopThread();
private:
Eventloop* loop_;//一对一关系
};
PollPoller:
添加通道:通过判断Channel对象的下标来判断之前是否已经加入到pollfd中,若没有则加入了则pollfd数组同时更新map,否则就更新对应pollfd对应events即可。
void PollPoller::updateChannel(Channel* channel){
//断言在创建Eventloop的线程中
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if(channel->index()<0){
//index<0 说明是一个新的通道
assert(Channels_.find(channel->fd())==Channels_.end());
struct pollfd pfd;
pfd.fd=channel->fd();
pfd.events=channel->events();
pfd.revents=0;
Channels_[pfd.fd]=channel;
pollfds_.push_back(pfd);
int idx=pollfds_.size()-1;
channel->set_index(idx);
}
else{
assert(Channels_[channel->fd()]->fd()==channel->fd());
int idx=channel->index();
assert(idx>0&&idx<Channels_.size());
struct pollfd pfd=pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
pfd.events=channel->events();
pfd.revents=0;
// 将一个通道暂时更改为不关注事件,但不从Poller中移除该通道
if(channel->isNoneEvent()){
// ignore this pollfd
// 暂时忽略该文件描述符的事件
// 这里pfd.fd 可以直接设置为-1
pfd.fd=-pfd.fd-1;
}
}
}
**移除通道:**移除通道之前需要先检测对应的通道是否没有注册事件了,若已经没有注册事件了直接将channel对应的pollfd从数组中删除,同时将map对应元素删除
void PollPoller::removeChannel(Channel* channel){
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(Channels_.find(channel->fd()) != Channels_.end());
assert(Channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
size_t n = Channels_.erase(channel->fd());
assert(n == 1);
if(static_cast<size_t>(idx)==pollfds_.size()-1){
pollfds_.pop_back();
}
else{
// 这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_back
int temp_id=pollfds_[pollfds_.size()-1].fd;
iter_swap(pollfds_.begin()+idx,pollfds_.begin()+pollfds_.size()-1);
if (temp_id < 0)
{
temp_id = -temp_id-1;
}
Channels_[temp_id]->set_index(idx);
pollfds_.pop_back();
}
}
**IO复用:**遍历pollfd数组中的revents如果大于0,证明有就绪事件发生,同时更新对应的Channel对象中的revents,并将活跃的通达存于ActiveChannels中,返回给EventLoop,EventLoop对活跃通道进行事件回调。
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels){
int number=::poll(&(*pollfds_.begin()),pollfds_.size(),timeoutMs);
if(number<0){
LOG_SYSERR << "PollPoller::poll()";
}
else if(number==0){
LOG_TRACE << " nothing happended";
}
else{
LOG_TRACE << number << " events happended";
fillActiveChannels(number, activeChannels);
}
return Timestamp::now();
}
void PollPoller::fillActiveChannels(int numevents, ChannelList* activeChannels){
for(PollfdList::iterator it=pollfds_.begin();it!=pollfds_.end();it++){
if(it->revents!=0){
numevents--;
ChannelMap::iterator temp=Channels_.find(it->fd);
assert(it->fd==temp->second->fd());
temp->second->set_revents(it->revents);
activeChannels->push_back(temp->second);
}
}
}
EPollPoller:
EpollPoller相对于PollPoller来说,主要是下标的 *** 作,对于EpollPoller,remove后对于的下标为new,update中如果是更新的话则是add,如果是不关注事件则是delete但是这里没有将通道移除map容器。其余的 *** 作与PollPoller没有区别
enum MOD{
kNew = -1,
kAdded = 1,
kDeleted = 2
};
EpollPoller::EpollPoller(Eventloop*loop):
Poller(loop),
epollfd_(epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if(epollfd_<0){
LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
}
EpollPoller:: ~EpollPoller(){
::close(epollfd_);
}
Timestamp EpollPoller::poll(int timeoutMs, ChannelList* activeChannels){
int number=epoll_wait(epollfd_,&*events_.begin(),events_.size(),timeoutMs);
if(number>0){
LOG_TRACE << number << " events happended";
fillActiveChannels(number, activeChannels);
if (static_cast<size_t>(number) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if(number==0){
LOG_TRACE << " nothing happended";
}
else{
LOG_SYSERR << "EPollPoller::poll()";
}
return Timestamp::now();
}
void EpollPoller::fillActiveChannels(int number,ChannelList*activeChannels){
for(int i=0;i<number;i++){
//ptr指向的用户数据中包含了fd
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
void EpollPoller::updateChannel(Channel* channel){
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
int index=channel->index();
if(index==MOD::kNew||index==MOD::kDeleted){
int fd=channel->fd();
if(index==MOD::kNew){
assert(channels_.find(fd) == channels_.end());
channels_[fd] = channel;
}
else{
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
channel->set_index(MOD::kAdded);
update(EPOLL_CTL_ADD, channel);
}
else{
int fd = channel->fd();
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(index == MOD::kAdded);
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(MOD::kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
void EpollPoller::removeChannel(Channel* channel){
Poller::assertInLoopThread();
int fd = channel->fd();
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(channel->isNoneEvent());
int index = channel->index();
assert(index == kAdded || index == kDeleted);
size_t n = channels_.erase(fd);
assert(n == 1);
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(MOD::kNew);
}
void EpollPoller::update(int operation, Channel* channel){
struct epoll_event event;
memset(&event,0,sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
}
}
}
总结
本章主要讲述了EventLoop,Channel,以及Poller三者之间的关系,主要是一个IO线程对应一个EventLoop,一个EventLoop对应着一个Poller用于事件循环,一个Poller可以关注多个Channel,返回给EventLoop 活跃的ActiveChannels。注意的是EventLoop中的Channel中更新删除以及loop都必须在创建loop的线程中使用,避免跨线程危险。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)